Concepts

The three roles

| Component | Data Stored | Replication | Replication Method |
|---|---|---|---|
| Ozone Manager (OM) | Full namespace metadata | Fully replicated on all OM nodes | Raft (Apache Ratis) |
| Storage Container Manager (SCM) | Full storage metadata | Fully replicated on all SCM nodes | Raft (Apache Ratis) |
| DataNode | Actual data blocks | Replicated per replication factor (e.g., 3 replicas) | Managed by SCM pipelines |
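
To make the division of labor concrete, here is a minimal sketch of a key write/read through the Ozone Java client, called from Scala. The OM address and the vol1/bucket1 names are assumptions for illustration only: namespace operations go to the OM, block/pipeline allocation is coordinated by the SCM, and the bytes themselves land on DataNodes.

import org.apache.hadoop.hdds.conf.OzoneConfiguration
import org.apache.hadoop.ozone.client.OzoneClientFactory

object OzoneClientSketch {
  def main(args: Array[String]): Unit = {
    val conf = new OzoneConfiguration()
    conf.set("ozone.om.address", "localhost:9862") // assumed OM address

    // The RPC client talks to the OM for namespace/metadata operations.
    val client = OzoneClientFactory.getRpcClient(conf)
    val store  = client.getObjectStore

    // Volumes, buckets and keys are namespace objects kept by the OM.
    store.createVolume("vol1")
    val volume = store.getVolume("vol1")
    volume.createBucket("bucket1")
    val bucket = volume.getBucket("bucket1")

    // Writing a key: the OM obtains blocks/pipelines from the SCM,
    // and the payload is streamed to the DataNodes.
    val payload = "hello ozone".getBytes("UTF-8")
    val out = bucket.createKey("hello.txt", payload.length)
    out.write(payload)
    out.close()

    // Reading goes the other way: the OM resolves the key to block
    // locations, then the client reads from the DataNodes.
    val in  = bucket.readKey("hello.txt")
    val buf = new Array[Byte](payload.length)
    in.read(buf)
    in.close()
    println(new String(buf, "UTF-8"))

    client.close()
  }
}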

Architecture

How the OM processes requests

Roughly what metadata the OM stores

Container: c1
  ├── Block: b1 (DataNode1, DataNode2, DataNode3)
  ├── Block: b2 (DataNode1, DataNode2, DataNode3)

Container: c2
  ├── Block: b3 (DataNode4, DataNode5, DataNode6)

What is stored in RocksDB

{
  "container_id": "c1",
  "blocks": ["b1", "b2"],
  "replication_factor": 3,
  "pipeline": "pipeline_1",
  "datanodes": ["datanode1", "datanode2", "datanode3"]
}
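
Records like the one above are not kept as literal JSON files; the OM and SCM persist their metadata as key/value pairs in RocksDB tables. A rough illustration of that idea, using the RocksDB Java API directly from Scala (the key layout and the JSON value are assumptions for illustration, not Ozone's actual on-disk schema, which is protobuf-based):

import org.rocksdb.{Options, RocksDB}

object MetadataInRocksDbSketch {
  def main(args: Array[String]): Unit = {
    RocksDB.loadLibrary()
    val options = new Options().setCreateIfMissing(true)
    val db = RocksDB.open(options, "/tmp/om-metadata-sketch") // assumed local path

    // Metadata is persisted as key -> serialized value.
    val key = "container/c1".getBytes("UTF-8")
    val value =
      """{"container_id":"c1","blocks":["b1","b2"],"replication_factor":3,
        |"pipeline":"pipeline_1","datanodes":["datanode1","datanode2","datanode3"]}"""
        .stripMargin.getBytes("UTF-8")
    db.put(key, value)

    // Lookups are simple point reads on the same key.
    println(new String(db.get(key), "UTF-8"))

    db.close()
    options.close()
  }
}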

How the SCM processes requests

What it stores

Container: c1
  ├── Block: b1 (DataNode1, DataNode2, DataNode3)
  ├── Block: b2 (DataNode1, DataNode2, DataNode3)

Container: c2
  ├── Block: b3 (DataNode4, DataNode5, DataNode6)

What is stored in RocksDB

{
  "block_id": "b1",
  "container_id": "c1",
  "size": "4MB",
  "checksum": "abc123",
  "replicas": [
    {"datanode": "datanode1", "location": "/path/to/block1"},
    {"datanode": "datanode2", "location": "/path/to/block1"},
    {"datanode": "datanode3", "location": "/path/to/block1"}
  ]
}
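
For intuition, the same block record can be modelled as plain data structures: this SCM-side metadata is what lets a read resolve a block id to the DataNodes holding its replicas. The names below are illustrative only, not Ozone's internal classes:

// Illustrative model of the block metadata shown above.
case class Replica(datanode: String, location: String)
case class BlockMeta(blockId: String, containerId: String, sizeBytes: Long,
                     checksum: String, replicas: Seq[Replica])

object BlockLookupSketch {
  private val blockTable: Map[String, BlockMeta] = Map(
    "b1" -> BlockMeta("b1", "c1", 4L * 1024 * 1024, "abc123",
      Seq(Replica("datanode1", "/path/to/block1"),
          Replica("datanode2", "/path/to/block1"),
          Replica("datanode3", "/path/to/block1")))
  )

  // Resolve a block id to the DataNodes that hold its replicas.
  def locate(blockId: String): Seq[String] =
    blockTable.get(blockId).map(_.replicas.map(_.datanode)).getOrElse(Seq.empty)

  def main(args: Array[String]): Unit =
    println(locate("b1")) // List(datanode1, datanode2, datanode3)
}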

The DataNode is the counterpart of an HDFS DataNode: it is where the data itself is stored.

  • A container (5 GB by default) is allocated first
  • Multiple files are packed into the same container and distinguished by their offsets (see the sketch after this list)
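
A minimal sketch of the second point: many small objects are appended into one large container file, and each block is addressed by its (offset, length) inside that file. The file path and the in-memory index are simplified assumptions, not the DataNode's real chunk format.

import java.io.RandomAccessFile

object ContainerAppendSketch {
  final case class BlockLocation(offset: Long, length: Int)

  def main(args: Array[String]): Unit = {
    // One big container file holds many blocks (Ozone's default container size is 5 GB).
    val container = new RandomAccessFile("/tmp/container_c1.data", "rw") // assumed path
    var index = Map.empty[String, BlockLocation]

    def appendBlock(blockId: String, data: Array[Byte]): Unit = {
      val offset = container.length()
      container.seek(offset)
      container.write(data)
      index += blockId -> BlockLocation(offset, data.length) // remember where it landed
    }

    def readBlock(blockId: String): Array[Byte] = {
      val loc = index(blockId)
      val buf = new Array[Byte](loc.length)
      container.seek(loc.offset)
      container.readFully(buf)
      buf
    }

    appendBlock("b1", "file one".getBytes("UTF-8"))
    appendBlock("b2", "file two".getBytes("UTF-8"))
    println(new String(readBlock("b2"), "UTF-8")) // "file two"

    container.close()
  }
}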

Recon provides the management UI.

High availability is implemented with RocksDB + Raft.

The Raft implementation used is Apache's open-source Raft library, Apache Ratis:

https://github.com/apache/ratis

Security: Ozone can be integrated with Ranger and Kerberos,
and it also ships with some built-in ACLs.

File metadata layout

Storage scheme

Comparison of HDFS-replacement storage systems

| Feature | Apache Ozone | Ceph | Cluster Storage (General) | MinIO |
|---|---|---|---|---|
| Storage Type | Object storage (HDFS compatible) | Object, block, file storage | Varies (file, block, etc.) | Object storage (S3 compatible) |
| Hadoop Integration | Full integration | Limited (via S3 gateway) | Varies | None (direct S3 interface) |
| Ease of Deployment | Moderate | Complex | Varies | Easy |
| Performance | Optimized for small/large files | High performance with scalability | Depends on system | High throughput, low latency |
| Use Case | Big data analytics | Multi-purpose storage | HPC, scalable general storage | Cloud-native object storage |

Integration

Ozone integration

Integrating with Spark

spark-submit \
  --master local \
  --conf spark.hadoop.fs.defaultFS=ofs://ozone1 \
  --conf spark.hadoop.ozone.om.address=localhost:9862 \
  --class your.package.YourApp your-spark-app.jar

Code

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Spark with Ozone")
  .config("spark.hadoop.fs.defaultFS", "ofs://ozone1")
  .config("spark.hadoop.ozone.om.address", "localhost:9862")
  .getOrCreate()

val data = Seq(("Alice", 25), ("Bob", 30))
val df = spark.createDataFrame(data).toDF("name", "age")

// Write to Ozone via the ofs:// filesystem
df.write.format("parquet").save("ofs://ozone1/mybucket/spark_data")

// Read back from Ozone
val readDF = spark.read.format("parquet").load("ofs://ozone1/mybucket/spark_data")
readDF.show()

Integrating with Flink

import org.apache.flink.streaming.api.scala._

object FlinkOzoneExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val data = env.fromElements(("Alice", 25), ("Bob", 30))

    // Write to Ozone via the ofs:// filesystem
    data.writeAsCsv("ofs://ozone1/mybucket/flink_data")

    env.execute("Flink Ozone Integration")
  }
}

Presto / Trino

hive.metastore.uri=thrift://localhost:9083
hive.s3.warehouse.dir=ofs://ozone1/mybucket/
fs.defaultFS=ofs://ozone1
ozone.om.address=localhost:9862

Then you can query directly:

SELECT * FROM mybucket.my_table WHERE age > 25;

Impala: since Ozone is HDFS-compatible, integration is just as simple:

SET fs.defaultFS=ofs://ozone1;
SELECT * FROM mybucket.my_table;

Doris

CREATE EXTERNAL TABLE my_external_table (
    id INT,
    name STRING
)
PROPERTIES (
    "fs.defaultFS"="ofs://ozone1",
    "path"="ofs://ozone1/mybucket/doris_data/"
);

SELECT * FROM my_external_table;

MinIO integration

Integrating with Spark

spark.hadoop.fs.s3a.endpoint=http://<minio-host>:9000
spark.hadoop.fs.s3a.access.key=<your-access-key>
spark.hadoop.fs.s3a.secret.key=<your-secret-key>
spark.hadoop.fs.s3a.path.style.access=true
spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem

Code

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Spark with MinIO")
  .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
  .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
  .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
  .config("spark.hadoop.fs.s3a.path.style.access", "true")
  .getOrCreate()

val data = Seq(("Alice", 28), ("Bob", 35))
val df = spark.createDataFrame(data).toDF("name", "age")

// Write to MinIO
df.write.parquet("s3a://mybucket/spark/data")

// Read from MinIO
val readDF = spark.read.parquet("s3a://mybucket/spark/data")
readDF.show()

Integrating with Flink

s3.endpoint: http://<minio-host>:9000
s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>
s3.path-style-access: true
s3.filesystem.impl: org.apache.hadoop.fs.s3a.S3AFileSystem

Code

import org.apache.flink.api.scala._

object FlinkMinIOExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val data = env.fromElements(("Alice", 28), ("Bob", 35))

    // Write to MinIO
    data.writeAsCsv("s3a://mybucket/flink/data")

    env.execute("Flink with MinIO")
  }
}

Presto / Trino

hive.s3.aws-access-key=<your-access-key>
hive.s3.aws-secret-key=<your-secret-key>
hive.s3.endpoint=http://<minio-host>:9000
hive.s3.path-style-access=true
hive.s3.file-system-type=EMRFS

Query

SELECT * FROM mybucket.my_table WHERE age > 30;

Doris integration

CREATE EXTERNAL TABLE my_external_table (
  id INT,
  name STRING
)
PROPERTIES (
  "s3.endpoint" = "http://<minio-host>:9000",
  "s3.access_key" = "<your-access-key>",
  "s3.secret_key" = "<your-secret-key>",
  "s3.path" = "s3a://mybucket/doris/data/"
);

SELECT * FROM my_external_table;

References