Table Design

Data Model

  • Aggregate
  • Unique
  • Duplicate

Aggregate Model

CREATE DATABASE IF NOT EXISTS example_db;

CREATE TABLE IF NOT EXISTS example_db.example_tbl_agg1
(
    `user_id` LARGEINT NOT NULL COMMENT "user id",
    `date` DATE NOT NULL COMMENT "data import time",
    `city` VARCHAR(20) COMMENT "city",
    `age` SMALLINT COMMENT "age",
    `sex` TINYINT COMMENT "gender",
    `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "last visit date time",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "user total cost",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "user max dwell time",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "user min dwell time"
)
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

Insert data

insert into example_db.example_tbl_agg1 values
(10000,"2017-10-01","Beijing",20,0,"2017-10-01 06:00:00",20,10,10),
(10000,"2017-10-01","Beijing",20,0,"2017-10-01 07:00:00",15,2,2),
(10001,"2017-10-01","Beijing",30,1,"2017-10-01 17:05:45",2,22,22),
(10002,"2017-10-02","Shanghai",20,1,"2017-10-02 12:59:12",200,5,5),
(10003,"2017-10-02","Guangzhou",32,0,"2017-10-02 11:20:00",30,11,11),
(10004,"2017-10-01","Shenzhen",35,0,"2017-10-01 10:00:15",100,3,3),
(10004,"2017-10-03","Shenzhen",35,0,"2017-10-03 10:20:22",11,6,6);

The two rows for user 10000 have identical user_id, date, city, age, and sex values, so they are aggregated into one row:

  • last_visit_date is replaced by the latest value
  • cost is the sum of the two values
  • max_dwell_time keeps the maximum of the two records
  • min_dwell_time keeps the minimum of the two records

The two rows for user 10004 are not aggregated because some of their key values differ.
This model suits multi-dimensional analysis scenarios, where data can be pre-aggregated.

Main aggregation types (a combined example follows the list)

  • SUM: Accumulate the values in multiple rows.
  • REPLACE: The newly imported value will replace the previous value.
  • MAX: Keep the maximum value.
  • MIN: Keep the minimum value.
  • REPLACE_IF_NOT_NULL: Non-null value replacement. Unlike REPLACE, it does not replace null values.
  • HLL_UNION: Aggregation method for columns of HLL type, using the HyperLogLog algorithm for aggregation.
  • BITMAP_UNION: Aggregation method for columns of BITMAP type, performing a union aggregation of bitmaps.
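
A minimal sketch (hypothetical table and column names) combining several of the aggregation types above in one Aggregate-model table:

CREATE TABLE IF NOT EXISTS example_db.example_tbl_agg_types
(
    `user_id`    LARGEINT NOT NULL COMMENT "user id",
    `date`       DATE NOT NULL COMMENT "data import time",
    `last_phone` VARCHAR(20) REPLACE_IF_NOT_NULL COMMENT "latest non-null phone number",
    `uv_bitmap`  BITMAP BITMAP_UNION COMMENT "exact distinct visitors as a bitmap",
    `uv_hll`     HLL HLL_UNION COMMENT "approximate distinct visitors"
)
AGGREGATE KEY(`user_id`, `date`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1");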

agg_state

  • The concrete type is determined by the declared aggregate function
  • It is more like an intermediate result
set enable_agg_state=true;
create table aggstate(
    k1 int null,
    k2 agg_state sum(int),
    k3 agg_state group_concat(string)
)
aggregate key (k1)
distributed BY hash(k1) buckets 3
properties("replication_num" = "1");

Usage

insert into aggstate values(1,sum_state(1),group_concat_state('a'));
insert into aggstate values(1,sum_state(2),group_concat_state('b'));
insert into aggstate values(1,sum_state(3),group_concat_state('c'));
insert into aggstate values(2,sum_state(4),group_concat_state('d'));

The table now logically contains:

k1   k2           k3
1    sum(1,2,3)   group_concat_state(a,b,c)
2    sum(4)       group_concat_state(d)

Query

select sum_merge(k2) from aggstate;
+---------------+
| sum_merge(k2) |
+---------------+
|            10 |
+---------------+

select group_concat_merge(k3) from aggstate;
+------------------------+
| group_concat_merge(k3) |
+------------------------+
| c,b,a,d                |
+------------------------+

Unique Model

CREATE TABLE IF NOT EXISTS example_db.example_tbl_unique
(
`user_id` LARGEINT NOT NULL COMMENT "User ID",
`username` VARCHAR (50) NOT NULL COMMENT "Username",
`city` VARCHAR (20) COMMENT "User location city",
`age` SMALLINT COMMENT "User age",
`sex` TINYINT COMMENT "User sex",
`phone` LARGEINT COMMENT "User phone number",
`address` VARCHAR (500) COMMENT "User address",
`register_time` DATETIME COMMENT "User registration time"
)
UNIQUE KEY (`user_id`, `username`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

This is equivalent to defining a primary key: several columns form a composite key, and a duplicate key replaces the existing row.
It behaves like REPLACE in the Aggregate model.
Since version 1.2 there is also Merge-on-Write, roughly analogous to writing tombstones in an LSM tree, which improves read performance.
It performs better than the Merge-on-Read behaviour of the Aggregate model; a sketch of enabling it follows.
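
A minimal sketch, assuming a recent Doris version, of enabling Merge-on-Write on a Unique-model table through the enable_unique_key_merge_on_write property (table and column names are placeholders):

CREATE TABLE IF NOT EXISTS example_db.example_tbl_unique_mow
(
    `user_id`  LARGEINT NOT NULL COMMENT "User ID",
    `username` VARCHAR(50) NOT NULL COMMENT "Username",
    `city`     VARCHAR(20) COMMENT "User location city"
)
UNIQUE KEY(`user_id`, `username`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    -- switches the Unique model from Merge-on-Read to Merge-on-Write
    "enable_unique_key_merge_on_write" = "true"
);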

Duplicate Model

CREATE TABLE IF NOT EXISTS example_db.example_tbl_duplicate
(
    `timestamp` DATETIME NOT NULL COMMENT "Log time",
    `type` INT NOT NULL COMMENT "Log type",
    `error_code` INT COMMENT "Error code",
    `error_msg` VARCHAR(1024) COMMENT "Error details",
    `op_id` BIGINT COMMENT "Operator ID",
    `op_time` DATETIME COMMENT "Operation time"
)
DUPLICATE KEY(`timestamp`, `type`, `error_code`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

Duplicate keys are not overwritten; they remain as separate rows.

Data Partition

Partitioning levels

  • Table -> Partition -> Tablet
  • A tablet is the smallest physical unit; data movement and replication operate at the tablet level
  • A partition is the smallest logical unit; import, delete, and similar operations work at the partition level
  • Data in different partitions is disjoint

Range partitioning

CREATE TABLE IF NOT EXISTS example_db.example_range_tbl
(
    `user_id` LARGEINT NOT NULL COMMENT "User ID",
    `date` DATE NOT NULL COMMENT "Date when the data are imported",
    `timestamp` DATETIME NOT NULL COMMENT "Timestamp when the data are imported",
    `city` VARCHAR(20) COMMENT "User location city",
    `age` SMALLINT COMMENT "User age",
    `sex` TINYINT COMMENT "User gender",
    `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "User last visit time",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "Total user consumption",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "Maximum user dwell time",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "Minimum user dwell time"
)
ENGINE=olap
AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
PARTITION BY RANGE(`date`)
(
    PARTITION `p201701` VALUES LESS THAN ("2017-02-01"),
    PARTITION `p201702` VALUES LESS THAN ("2017-03-01"),
    PARTITION `p201703` VALUES LESS THAN ("2017-04-01")
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
PROPERTIES
(
    "replication_num" = "3",
    "storage_medium" = "SSD",
    "storage_cooldown_time" = "2018-01-01 12:00:00"
);

List partitioning

CREATE TABLE IF NOT EXISTS example_db.example_list_tbl
(
    `user_id` LARGEINT NOT NULL COMMENT "User ID",
    `date` DATE NOT NULL COMMENT "Date when the data are imported",
    `timestamp` DATETIME NOT NULL COMMENT "Timestamp when the data are imported",
    `city` VARCHAR(20) COMMENT "User location city",
    `age` SMALLINT COMMENT "User Age",
    `sex` TINYINT COMMENT "User gender",
    `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "User last visit time",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "Total user consumption",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "Maximum user dwell time",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "Minimum user dwell time"
)
ENGINE=olap
AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
PARTITION BY LIST(`city`)
(
    PARTITION `p_cn` VALUES IN ("Beijing", "Shanghai", "Hong Kong"),
    PARTITION `p_usa` VALUES IN ("New York", "San Francisco"),
    PARTITION `p_jp` VALUES IN ("Tokyo")
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
PROPERTIES
(
    "replication_num" = "3",
    "storage_medium" = "SSD",
    "storage_cooldown_time" = "2018-01-01 12:00:00"
);

Bucketing

  • Similar to bucketing in Hive and Impala
  • Based on either hash or random distribution
  • More buckets favor writes, and random distribution in particular eliminates data skew; fewer buckets favor reads
  • The recommended tablet size is 1 GB - 10 GB
  • Two-level partitioning means partitioning plus bucketing; single-level partitioning uses bucketing only (see the sketch after this list)
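
A minimal sketch contrasting the two distribution clauses (hypothetical column name; RANDOM bucketing is assumed to be available in the Doris version in use):

-- hash bucketing: rows with the same user_id always land in the same bucket
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16

-- random bucketing: rows are spread evenly across buckets, which avoids data skew
DISTRIBUTED BY RANDOM BUCKETS 16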

Rollup

Rollup

  • Rollup can be seen as a materialized index structure for a Table
  • materialized in the sense that its data is physically independent in storage
  • and indexed in the sense that Rollup can reorder columns to increase the hit rate of prefix indexes as well as reduce Key columns to increase the aggregation level of data.

Concepts

  • A rollup is essentially a materialized index
  • Doris decides automatically at query time whether to use it; users cannot select it explicitly
  • A rollup is created on top of a base table, so operations on the base table also affect the rollup
  • Rollup storage is independent of the base table; having many rollups affects import but not query performance
  • Every column referenced by a query, in both SELECT and WHERE, must exist in the rollup; otherwise the query can only hit the base table
  • Updates to the base table are automatically synchronized to the rollup

Operations

ALTER TABLE table1 ADD ROLLUP rollup_city(citycode, pv);

Inspect

SHOW ALTER TABLE ROLLUP;
DESC tbl_name ALL;
CANCEL ALTER TABLE ROLLUP FROM table1;

Index

Built-in indexes, always present and not user-selectable

  • prefix indexes
  • ZoneMap indexes

Prefix index example

ColumnName        Type
user_id           BIGINT
age               INT
message           VARCHAR(100)
max_dwell_time    DATETIME
min_dwell_time    DATETIME

The first 36 bytes are used as the prefix index: user_id (8 bytes) + age (4 bytes) + message (20-byte prefix).
The message column is truncated.

-- efficient: hits the prefix index
SELECT * FROM table WHERE user_id=1829239 and age=20
-- less efficient: cannot use the prefix index
SELECT * FROM table WHERE age=20

User-defined indexes

  • inverted index
  • bloomfilter index
  • ngram bloomfilter index
  • bitmap index

Inverted Index

Basic syntax (index PROPERTIES)

  • parser: tokenizer, one of english, chinese, unicode, etc.
  • parser_mode: fine_grained parsing produces more tokens; coarse_grained produces fewer
  • support_phrase: whether the index supports phrase queries with MATCH_PHRASE
  • char_filter: applied in the preprocessing stage (a definition sketch follows this list)
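
A minimal sketch (hypothetical table and column names) of an inverted index definition that sets these properties:

CREATE TABLE IF NOT EXISTS example_db.doc_tbl
(
    `id`      BIGINT,
    `content` STRING,
    -- Chinese tokenizer, fine-grained mode, with phrase query (MATCH_PHRASE) support
    INDEX idx_content (`content`) USING INVERTED
        PROPERTIES("parser" = "chinese", "parser_mode" = "fine_grained", "support_phrase" = "true")
)
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES ("replication_num" = "1");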

Some examples

SELECT TOKENIZE('武汉长江大桥','"parser"="chinese","parser_mode"="fine_grained"');
+-----------------------------------------------------------------------------------+
| tokenize('武汉长江大桥', '"parser"="chinese","parser_mode"="fine_grained"')       |
+-----------------------------------------------------------------------------------+
| ["武汉", "武汉长江大桥", "长江", "长江大桥", "大桥"]                              |
+-----------------------------------------------------------------------------------+

SELECT TOKENIZE('武汉市长江大桥','"parser"="chinese","parser_mode"="coarse_grained"');
+----------------------------------------------------------------------------------------+
| tokenize('武汉市长江大桥', '"parser"="chinese","parser_mode"="coarse_grained"')        |
+----------------------------------------------------------------------------------------+
| ["武汉市", "长江大桥"]                                                                 |
+----------------------------------------------------------------------------------------+

SELECT TOKENIZE('I love CHINA','"parser"="english"');
+------------------------------------------------+
| tokenize('I love CHINA', '"parser"="english"') |
+------------------------------------------------+
| ["i", "love", "china"]                         |
+------------------------------------------------+

An example

CREATE DATABASE test_inverted_index;

USE test_inverted_index;

-- define inverted index idx_comment for comment column on table creation
--   USING INVERTED specify using inverted index
--   PROPERTIES("parser" = "english") specify english word parser
CREATE TABLE hackernews_1m
(
    `id` BIGINT,
    `deleted` TINYINT,
    `type` String,
    `author` String,
    `timestamp` DateTimeV2,
    `comment` String,
    `dead` TINYINT,
    `parent` BIGINT,
    `poll` BIGINT,
    `children` Array<BIGINT>,
    `url` String,
    `score` INT,
    `title` String,
    `parts` Array<INT>,
    `descendants` INT,
    INDEX idx_comment (`comment`) USING INVERTED PROPERTIES("parser" = "english") COMMENT 'inverted index for comment'
)
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES ("replication_num" = "1");

Query

-- plain fuzzy matching with LIKE
SELECT count() FROM hackernews_1m WHERE comment LIKE '%OLAP%';
SELECT count() FROM hackernews_1m WHERE comment LIKE '%OLAP%' AND comment LIKE '%OLTP%';

-- using the inverted index
SELECT count() FROM hackernews_1m WHERE comment MATCH_ANY 'OLAP';
SELECT count() FROM hackernews_1m WHERE comment MATCH_ALL 'OLAP OLTP';

Add an index

CREATE INDEX idx_timestamp ON hackernews_1m(timestamp) USING INVERTED;
BUILD INDEX idx_timestamp ON hackernews_1m;

SHOW ALTER TABLE COLUMN;
SHOW BUILD INDEX;

SELECT count() FROM hackernews_1m WHERE timestamp > '2007-08-23 04:17:00';

Check status

SHOW ALTER TABLE COLUMN;
SHOW BUILD INDEX;

BloomFilter Index

Create

CREATE TABLE IF NOT EXISTS sale_detail_bloom (
    sale_date date NOT NULL COMMENT "Sales time",
    customer_id int NOT NULL COMMENT "Customer ID",
    saler_id int NOT NULL COMMENT "Salesperson",
    sku_id int NOT NULL COMMENT "Product ID",
    category_id int NOT NULL COMMENT "Product Category",
    sale_count int NOT NULL COMMENT "Sales Quantity",
    sale_price DECIMAL(12,2) NOT NULL COMMENT "unit price",
    sale_amt DECIMAL(20,2) COMMENT "Total sales amount"
)
Duplicate KEY(sale_date, customer_id,saler_id,sku_id,category_id)
PARTITION BY RANGE(sale_date)
(
PARTITION P_202111 VALUES [('2021-11-01'), ('2021-12-01'))
)
DISTRIBUTED BY HASH(saler_id) BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"bloom_filter_columns"="saler_id,category_id",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "MONTH",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-2147483648",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "P_",
"dynamic_partition.replication_num" = "1",
"dynamic_partition.buckets" = "3"
);

Check

-- check the table definition to see whether the bloom filter exists
SHOW CREATE TABLE <table_name>;

-- drop
ALTER TABLE <db.table_name> SET ("bloom_filter_columns" = "");

-- modify
ALTER TABLE <db.table_name> SET ("bloom_filter_columns" = "k1,k3");

Usage scenarios

  • Suitable for non-prefix filtering
  • Filter on the most frequently used conditions, where most predicates are equality (=) filters
  • Bitmap indexes suit low-cardinality columns such as gender; bloom filters suit high-cardinality columns such as UserID

NGram BloomFilter Index

CREATE TABLE `table3` (
  `siteid` int(11) NULL DEFAULT "10" COMMENT "",
  `citycode` smallint(6) NULL COMMENT "",
  `username` varchar(100) NULL DEFAULT "" COMMENT "",
  INDEX idx_ngrambf (`username`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256") COMMENT 'username ngram_bf index'
) ENGINE=OLAP
AGGREGATE KEY(`siteid`, `citycode`, `username`) COMMENT "OLAP"
DISTRIBUTED BY HASH(`siteid`) BUCKETS 10
PROPERTIES (
"replication_num" = "1"
);

-- PROPERTIES("gram_size"="3", "bf_size"="1024") set the gram size and the bloom filter size in bytes, respectively.
-- Set the gram size to the length of the LIKE query pattern string.
-- A suitable bloom filter size can be found by testing; generally larger is better, and 256 is a good starting point.
-- If the column's cardinality is small, increasing the bloom filter size improves efficiency.

Check

show index from example_db.table3;
alter table example_db.table3 drop index idx_ngrambf;

alter table example_db.table3 add index idx_ngrambf(username)
    using NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="512") comment 'username ngram_bf index';

NGrams

  • NGram is a contiguous sequence of n items from a given sample of text or speech.
  • An NGram of size 1 is referred to as a “unigram”; size 2 is a “bigram”;
  • size 3 is a “trigram”. For example, the word “hello” can be broken down into the following bigrams: he, el, ll, lo.

NGram Bloom Filter Index

  • This is a combination of the two concepts, typically used to index text data.
  • It uses NGrams to break the text into searchable parts and then applies a Bloom filter to these NGrams to quickly check for their presence or absence in the index (see the query sketch below).
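
A hedged sketch of the kind of query this index is meant to accelerate, reusing table3 and idx_ngrambf from the example above; with gram_size set to 3, the pattern should be at least 3 characters long:

-- substring match that the ngram bloom filter can pre-filter
SELECT * FROM example_db.table3 WHERE username LIKE '%doris%';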

Bitmap Index

Operations

CREATE INDEX IF NOT EXISTS bitmap_index ON table3 (siteid) USING BITMAP COMMENT 'balabala';
SHOW INDEX FROM example_db.table_name;
DROP INDEX [IF EXISTS] index_name ON [db_name.]table_name;

Check status

SHOW INDEX FROM table3\G
*************************** 1. row ***************************
       Table: default_cluster:test_index.table3
  Non_unique: 
    Key_name: idx_ngrambf
Seq_in_index: 
 Column_name: username
   Collation: 
 Cardinality: 
    Sub_part: 
      Packed: 
        Null: 
  Index_type: NGRAM_BF
     Comment: username ngram_bf index
  Properties: ("gram_size" = "3", "bf_size" = "256")
*************************** 2. row ***************************
       Table: default_cluster:test_index.table3
  Non_unique: 
    Key_name: bitmap_index
Seq_in_index: 
 Column_name: siteid
   Collation: 
 Cardinality: 
    Sub_part: 
      Packed: 
        Null: 
  Index_type: BITMAP
     Comment: balabala
  Properties: 

Basic Use

Hot and cold data

-- paths tagged HDD hold cold data
-- paths tagged SSD hold hot data
storage_root_path=/home/disk1/doris,medium:HDD;/home/disk2/doris,medium:SSD

Broker

  • Broker is deployed as a plug-in, which is independent of Doris.
  • If you need to import data from a third-party storage system, you need to deploy the corresponding Broker.
  • By default, Doris provides fs_broker for HDFS reading and object storage (supporting S3 protocol).
  • fs_broker is stateless and we recommend that you deploy a Broker for each FE and BE node.

Some administrative operations

SET PASSWORD FOR 'root' = PASSWORD('your_password');

CREATE USER 'test' IDENTIFIED BY 'test_passwd';
GRANT ALL ON example_db TO test;

load data

curl --location-trusted -u test:test_passwd -H "label:table1_20170707" -H "column_separator:," 
-T table1_data http://FE_HOST:8030/api/example_db/table1/_stream_load

Broker Load

LOAD LABEL table1_20170708
(
    DATA INFILE("hdfs://your.namenode.host:port/dir/table1_data")
    INTO TABLE table1
)
WITH BROKER hdfs 
(
    "username"="hdfs_user",
    "password"="hdfs_password"
)
PROPERTIES
(
    "timeout"="3600",
    "max_filter_ratio"="0.1"
);


SHOW LOAD WHERE LABEL = "table1_20170708";

Query variables

SHOW VARIABLES;
SHOW VARIABLES LIKE "%mem_limit%";
SET exec_mem_limit = 8589934592;
SHOW VARIABLES LIKE "%query_timeout%";
SET query_timeout = 60;

Data Operation

Import

By Scene

Data Source                              Import Method
Object Storage (S3), HDFS                Import data using Broker
Local file                               Import local data
Kafka                                    Subscribe to Kafka data
MySQL, PostgreSQL, Oracle, SQLServer     Sync data via external table
Import via JDBC                          Sync data using JDBC
Import JSON format data                  JSON format data import

Divided by Import Method

Import Method    Usage
Spark Load       Import external data via Spark
Broker Load      Import external storage data via Broker
Stream Load      Stream import of data (local files and in-memory data)
Routine Load     Import Kafka data
Insert Into      Import data from external tables through INSERT
S3 Load          Import object storage data via the S3 protocol
MySql Load       Import local data via the MySQL protocol

Supported Data Formats

Import Method    Supported Formats
Broker Load      parquet, orc, csv, gzip
Stream Load      csv, json, parquet, orc
Routine Load     csv, json
MySql Load       csv

INSERT has atomicity guarantees automatically.
With labels, plus cooperation from the upstream system, exactly-once semantics can be achieved.
Imports can be synchronous or asynchronous.

Arrays can be imported as well, with vectorized execution supported:

LOAD LABEL label_03_14_49_34_898986_19090452100 ( 
  DATA INFILE("hdfs://test.hdfs.com:9000/user/test/data/sys/load/array_test.data") 
  INTO TABLE `test_array_table` 
  COLUMNS TERMINATED BY "|" (`k1`, `a1`, `a2`, `a3`, `a4`, `a5`, `a6`, `a7`, `a8`, `a9`, `a10`, `a11`, `a12`, `a13`, `b14`) 
  SET(a14=array_union(cast(b14 as array<string>), cast(a13 as array<string>))) WHERE size(a2) > 270) 
  WITH BROKER "hdfs" ("username"="test_array", "password"="") 
  PROPERTIES( "max_filter_ratio"="0.8" );

Import Scenes

Stream Load

PUT /api/{db}/{table}/_stream_load

Keep a single load under 1-2 GB; for larger data, split it into concurrent batches.

MySQL load

LOAD DATA
LOCAL
INFILE '/path/to/local/demo.txt'
INTO TABLE demo.load_local_file_test

Import from HDFS

LOAD LABEL demo.label_20231212_1
    (
    DATA INFILE("hdfs://1.2.3.4:8020/user/root/haha.txt")
    INTO TABLE `load_hdfs_file_test`
    COLUMNS TERMINATED BY "\t"            
    (id,age,name)
    )
    with HDFS (
    "fs.defaultFS"="hdfs://1.2.3.4:8020",
    "hdfs_user"="root"
    )
    PROPERTIES
    (
    "timeout"="1200",
    "max_filter_ratio"="0.1"
    );

Check the result

show load order by createtime desc limit 1\G;

S3 import is also supported.

Kafka import

CREATE ROUTINE LOAD demo.my_first_routine_load_job ON test_1
COLUMNS TERMINATED BY ","
PROPERTIES
(
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "property.group.id" = "xxx",
    "property.client.id" = "xxx",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Supported external data sources

  • MySQL
  • Oracle
  • PostgreSQL
  • SQLServer
  • Hive
  • Iceberg
  • ElasticSearch

Import via INSERT

INSERT INTO table SELECT ...
INSERT INTO table VALUES(...)

column conversion

  • Pre-filtering
  • Mapping: source (a1, a2, a3) -> dest (a2, a3, a1 * 10)
  • Convert the column values in the source file and import them into the table
  • Perform column conversion conditionally through the case when function
  • Convert null values in the source file to 0 and import them; at the same time, the region id conversion in Example 2 is also performed (see the sketch after this list)
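
A minimal sketch (hypothetical file, table, and column names) of column mapping and conversion in a broker load, following the source (a1, a2, a3) -> dest (a2, a3, a1 * 10) mapping above:

LOAD LABEL example_db.label_convert_demo
(
    DATA INFILE("hdfs://host:port/user/data/input/file.csv")
    INTO TABLE `tbl_dest`
    COLUMNS TERMINATED BY ","
    (a1, a2, a3)
    -- reorder and transform the source columns into the destination columns
    SET (c1 = a2, c2 = a3, c3 = a1 * 10)
)
WITH BROKER "hdfs" ("username" = "user", "password" = "pass");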

Data Quality

  • Filtered Rows: rows rejected for errors such as bad data format, type mismatch, or length truncation
  • Unselected Rows
  • Loaded Rows
  • Strict Mode (see the properties sketch below)
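
A hedged sketch of the load properties that control data-quality tolerance (the same property names appear in the broker load examples in this document):

PROPERTIES
(
    -- fail the job if more than 10% of the rows are filtered out
    "max_filter_ratio" = "0.1",
    -- strict mode rejects rows whose column conversion turns a non-null value into NULL
    "strict_mode" = "true"
);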

import methods

  • broker load
  • stream load
  • routine load
  • INSERT

Import Way

Broker Load

				 +
                 | 1. user create broker load
                 v
            +----+----+
            |         |
            |   FE    |
            |         |
            +----+----+
                 |
                 | 2. BE etl and load the data
    +--------------------------+
    |            |             |
+---v---+     +--v----+    +---v---+
|       |     |       |    |       |
|  BE   |     |  BE   |    |   BE  |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +--+-^--+
    | |           | |         | |
    | |           | |         | | 3. pull data from broker
+---v-+-+     +---v-+-+    +--v-+--+
|       |     |       |    |       |
|Broker |     |Broker |    |Broker |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +---+-^-+
    | |           | |          | |
+---v-+-----------v-+----------v-+-+
|       HDFS/BOS/AFS cluster       |
|                                  |
+----------------------------------+

load hive table, use ORC

LOAD LABEL dish_2022_03_23
(
    DATA INFILE("hdfs://10.220.147.151:8020/user/hive/warehouse/ods.db/ods_demo_orc_detail/*/*")
    INTO TABLE doris_ods_test_detail
    COLUMNS TERMINATED BY ","
    FORMAT AS "orc"
(id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
    COLUMNS FROM PATH AS (`day`)
   SET
   (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,
   commodity_name=commodity_name,commodity_price=commodity_price,member_price =member_price,cost_price=cost_price,unit=unit,
   quantity=quantity,actual_price=actual_price)
)
WITH BROKER "broker_name_1"
(
"username" = "hdfs",
"password" = ""
)
PROPERTIES
(
    "timeout"="1200",
    "max_filter_ratio"="0.1"
);

Check

show load order by createtime desc limit 1\G;
CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";

fe.conf

  • min_bytes_per_broker_scanner
  • max_bytes_per_broker_scanner
  • max_broker_concurrency

Routine Load

FE's JobScheduler breaks a routine load job down into tasks that run on the BEs.

         +---------+
         |  Client |
         +----+----+
              |
+-----------------------------+
| FE          |               |
| +-----------v------------+  |
| |                        |  |
| |   Routine Load Job     |  |
| |                        |  |
| +---+--------+--------+--+  |
|     |        |        |     |
| +---v--+ +---v--+ +---v--+  |
| | task | | task | | task |  |
| +--+---+ +---+--+ +---+--+  |
|    |         |        |     |
+-----------------------------+
     |         |        |
     v         v        v
 +---+--+   +--+---+   ++-----+
 |  BE  |   |  BE  |   |  BE  |
 +------+   +------+   +------+

Kafka import

CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%doris%"
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);

SSL is supported

CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");

CREATE ROUTINE LOAD db1.job1 on tbl1
PROPERTIES
(
"desired_concurrent_number"="1"
)
FROM KAFKA
(
"kafka_broker_list"= "broker1:9091,broker2:9091",
"kafka_topic" = "my_topic",
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg"
);

Kerberos is supported

CREATE ROUTINE LOAD db1.job1 on tbl1
PROPERTIES (
"desired_concurrent_number"="1",
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092",
"kafka_topic" = "my_topic",
"property.security.protocol" = "SASL_PLAINTEXT",
"property.sasl.kerberos.service.name" = "kafka",
"property.sasl.kerberos.keytab" = "/etc/krb5.keytab",
"property.sasl.kerberos.principal" = "[email protected]"
);

Some configuration options

  • max_routine_load_task_concurrent_num
  • max_routine_load_task_num_per_be
  • max_routine_load_job_num
  • max_consumer_num_per_group
  • max_tolerable_backend_down_num
  • period_of_auto_resume_min

Spark Load

Spark Load uses an external cluster to do the sorting and aggregation before the data is written. Stream Load and Broker Load are better suited to small data volumes and consume Doris cluster resources, whereas Spark Load uses Spark cluster resources and suits large imports. Execution process:

  • Fe schedules and submits ETL tasks to spark cluster for execution.
  • Spark cluster executes ETL to complete the preprocessing of load data. It includes global dictionary building (bitmap type), partitioning, sorting, aggregation, etc.
  • After the ETL task is completed, Fe obtains the data path of each partition that has been preprocessed, and schedules the related be to execute the push task.
  • Be reads data through broker and converts it into Doris underlying storage format.
  • Fe schedule the effective version and complete the load job.
                 +
                 | 0. User create spark load job
            +----v----+
            |   FE    |---------------------------------+
            +----+----+                                 |
                 | 3. FE send push tasks                |
                 | 5. FE publish version                |
    +------------+------------+                         |
    |            |            |                         |
+---v---+    +---v---+    +---v---+                     |
|  BE   |    |  BE   |    |  BE   |                     |1. FE submit Spark ETL job
+---^---+    +---^---+    +---^---+                     |
    |4. BE push with broker   |                         |
+---+---+    +---+---+    +---+---+                     |
|Broker |    |Broker |    |Broker |                     |
+---^---+    +---^---+    +---^---+                     |
    |            |            |                         |
+---+------------+------------+---+ 2.ETL +-------------v---------------+
|               HDFS              +------->       Spark cluster         |
|                                 <-------+                             |
+---------------------------------+       +-----------------------------+

Create an external resource

-- yarn cluster mode
CREATE EXTERNAL RESOURCE "spark0"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.jars" = "xxx.jar,yyy.jar",
"spark.files" = "/tmp/aaa,/tmp/bbb",
"spark.executor.memory" = "1g",
"spark.yarn.queue" = "queue0",
"spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
"spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
"broker" = "broker0",
"broker.username" = "user0",
"broker.password" = "password0"
);

-- spark standalone client mode
CREATE EXTERNAL RESOURCE "spark1"
PROPERTIES
(
"type" = "spark",
"spark.master" = "spark://127.0.0.1:7777",
"spark.submit.deployMode" = "client",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
"broker" = "broker1"
);

-- yarn HA mode
CREATE EXTERNAL RESOURCE sparkHA
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.executor.memory" = "1g",
"spark.yarn.queue" = "default",
"spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
"spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
"spark.hadoop.yarn.resourcemanager.address.rm1" = "xxxx:8032",
"spark.hadoop.yarn.resourcemanager.address.rm2" = "xxxx:8032",
"spark.hadoop.fs.defaultFS" = "hdfs://nameservices01",
"spark.hadoop.dfs.nameservices" = "nameservices01",
"spark.hadoop.dfs.ha.namenodes.nameservices01" = "mynamenode1,mynamenode2",
"spark.hadoop.dfs.namenode.rpc-address.nameservices01.mynamenode1" = "xxxx:8020",
"spark.hadoop.dfs.namenode.rpc-address.nameservices01.mynamenode2" = "xxxx:8020",
"spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"working_dir" = "hdfs://nameservices01/doris_prd_data/sinan/spark_load/",
"broker" = "broker_name",
"broker.username" = "username",
"broker.password" = "",
"broker.dfs.nameservices" = "nameservices01",
"broker.dfs.ha.namenodes.nameservices01" = "mynamenode1, mynamenode2",
"broker.dfs.namenode.rpc-address.nameservices01.mynamenode1" = "xxxx:8020",
"broker.dfs.namenode.rpc-address.nameservices01.mynamenode2" = "xxxx:8020",
"broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);

Other related commands

-- drop spark resource
DROP RESOURCE resource_name

-- show resources
SHOW RESOURCES
SHOW PROC "/resources"

-- privileges
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identity
GRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name

REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identity
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name

Kerberos is supported

CREATE EXTERNAL RESOURCE "spark_on_kerberos"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.jars" = "xxx.jar,yyy.jar",
"spark.files" = "/tmp/aaa,/tmp/bbb",
"spark.executor.memory" = "1g",
"spark.yarn.queue" = "queue0",
"spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
"spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
"spark.hadoop.hadoop.security.authentication" = "kerberos",
"spark.hadoop.yarn.resourcemanager.principal" = "[email protected]",
"spark.hadoop.yarn.resourcemanager.keytab" = "/home/doris/yarn.keytab",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
"broker" = "broker0",
"broker.hadoop.security.authentication" = "kerberos",
"broker.kerberos_principal" = "[email protected]",
"broker.kerberos_keytab" = "/home/doris/my.keytab"
);

Stream Load

Processing flow: FE forwards the HTTP request to a BE; the client can also send the request to a BE directly.

                         ^      +
                         |      |
                         |      | 1A. User submit load to FE
                         |      |
                         |   +--v-----------+
                         |   | FE           |
5. Return result to user |   +--+-----------+
                         |      |
                         |      | 2. Redirect to BE
                         |      |
                         |   +--v-----------+
                         +---+Coordinator BE| 1B. User submit load to BE
                             +-+-----+----+-+
                               |     |    |
                         +-----+     |    +-----+
                         |           |          | 3. Distrbute data
                         |           |          |
                       +-v-+       +-v-+      +-v-+
                       |BE |       |BE |      |BE |
                       +---+       +---+      +---+

An example

curl --location-trusted -u root -T date -H "label:123" http://abc.com:8030/api/test/date/_stream_load

Some important import parameters, passed as HTTP headers:

  • label: a unique identifier that prevents duplicate imports
  • column_separator
  • line_delimiter
  • max_filter_ratio
  • where
  • partitions
  • columns
  • format: csv or json
  • exec_mem_limit: defaults to 2 GB
  • merge_type: one of APPEND, DELETE, and MERGE; APPEND is the default
  • two_phase_commit
  • enclose
  • escape
  • enable_profile
  • memtable_on_sink_node
  • partial_columns

An example (with two_phase_commit)

curl --location-trusted -u user:passwd -H "two_phase_commit:true" -T test.txt http://fe_host:http_port/api/{db}/{table}/_stream_load
{"TxnId": 18036,
"Label": "55c8ffc9-1c40-4d51-b75e-f2265b3602ef",
"TwoPhaseCommit": "true",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 100,
"NumberLoadedRows": 100,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 1031,
"LoadTimeMs": 77,
"BeginTxnTimeMs": 1,
"StreamLoadPutTimeMs": 1,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 58,
"CommitAndPublishTimeMs": 0
}
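
A hedged sketch of the second phase: once the load above has returned a TxnId, the transaction can be committed (or aborted) through the stream load 2PC endpoint; host, port, and credentials are placeholders:

curl -X PUT --location-trusted -u user:passwd \
    -H "txn_id:18036" -H "txn_operation:commit" \
    http://fe_host:http_port/api/{db}/_stream_load_2pc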

SQL can be embedded in the import request; example:

curl --location-trusted -u root: -T test.csv 
-H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream("format" = "CSV", 
"column_separator" = "," ) where age >= 30" http://127.0.0.1:28030/api/_http_stream

MySql Load

Uses native MySQL LOAD DATA syntax. Execution steps:

  • FE receives the MySQL Load request executed by the client and analyses the SQL
  • FE builds the MySQL Load request into a Stream Load request
  • FE selects a BE node and sends the Stream Load request to it
  • While sending the request, FE reads the local file data from the MySQL client side as a stream and forwards it to the Stream Load HTTP request asynchronously
  • After the data transfer on the MySQL client side completes, FE waits for the Stream Load to finish and reports the import success or failure to the client

Example

-- create the table
CREATE TABLE testdb.t1 (pk INT, v1 INT SUM) AGGREGATE KEY (pk) DISTRIBUTED BY hash (pk) PROPERTIES ('replication_num' = '1');

-- import file from client node
LOAD DATA LOCAL
INFILE 'client_local.csv'
INTO TABLE testdb.t1
PARTITION (partition_a, partition_b, partition_c, partition_d)
COLUMNS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
IGNORE 1 LINES
(k1, k2, v2, v10, v11)
SET (c1 = k1, c2 = k2, c3 = v10, c4 = v11)
PROPERTIES ("strict_mode" = "true")

-- import file from fe server node
LOAD DATA
INFILE '/root/server_local.csv'
INTO TABLE testdb.t1
PARTITION (partition_a, partition_b, partition_c, partition_d)
COLUMNS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
IGNORE 1 LINES
(k1, k2, v2, v10, v11)
SET (c1 = k1, c2 = k2, c3 = v10, c4 = v11)
PROPERTIES ("strict_mode" = "true")

Related configuration

  • mysql_load_thread_pool
  • mysql_load_server_secure_path
  • mysql_load_in_memory_record

S3 Load

Example

LOAD LABEL example_db.exmpale_label_1
(
DATA INFILE("s3://your_bucket_name/your_file.txt")
INTO TABLE load_test
COLUMNS TERMINATED BY ","
)
WITH S3
(
"AWS_ENDPOINT" = "AWS_ENDPOINT",
"AWS_ACCESS_KEY" = "AWS_ACCESS_KEY",
"AWS_SECRET_KEY"="AWS_SECRET_KEY",
"AWS_REGION" = "AWS_REGION"
)
PROPERTIES
(
"timeout" = "3600"
);

Insert Into

Two forms

  • INSERT INTO tbl SELECT …
  • INSERT INTO tbl (col1, col2, …) VALUES (1, 2, …), (1,3, …);

Examples

INSERT INTO tbl2 WITH LABEL label1 SELECT * FROM tbl3;
INSERT INTO tbl1 VALUES ("qweasdzxcqweasdzxc"), ("a");

insert select

INSERT INTO tbl1 WITH LABEL label1
WITH cte1 AS (SELECT * FROM tbl1), cte2 AS (SELECT * FROM tbl2)
SELECT k1 FROM cte1 JOIN cte2 WHERE cte1.k1 = 1;


INSERT INTO tbl1 (k1)
WITH cte1 AS (SELECT * FROM tbl1), cte2 AS (SELECT * FROM tbl2)
SELECT k1 FROM cte1 JOIN cte2 WHERE cte1.k1 = 1;

INSERT INTO tbl1 (k1)
select * from (
WITH cte1 AS (SELECT * FROM tbl1), cte2 AS (SELECT * FROM tbl2)
SELECT k1 FROM cte1 JOIN cte2 WHERE cte1.k1 = 1) as ret

Check the result

show last insert\G
*************************** 1. row ***************************
TransactionId: 64067
Label: insert_ba8f33aea9544866-8ed77e2844d0cc9b
Database: default_cluster:db1
Table: t1
TransactionStatus: VISIBLE
LoadedRows: 2
FilteredRows: 0

Importing Data in JSON Format

JSON paths are supported.
Otherwise nothing special; a jsonpaths sketch follows.
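
A hedged sketch of a stream load of JSON data that uses jsonpaths to map JSON fields onto columns; host, credentials, file, and column names are placeholders:

curl --location-trusted -u user:passwd \
    -H "format: json" \
    -H "jsonpaths: [\"$.id\", \"$.city\", \"$.code\"]" \
    -H "columns: id, city, code" \
    -T data.json \
    http://fe_host:8030/api/example_db/example_tbl/_stream_load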

Import Advanced

By default an import only succeeds when more than half of the replicas are written successfully. This can be relaxed with the min_load_replica_num property:

CREATE TABLE test_table1
(
k1 INT,
k2 INT
)
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 5
PROPERTIES
(
'replication_num' = '2',
'min_load_replica_num' = '1'
);

Modify an existing table

ALTER TABLE test_table1
SET ( 'min_load_replica_num' = '1');

min_load_replica_num can also be set at the FE level.

Export

Execution process

  • The user submits an Export job to FE.
  • FE calculates all the tablets to be exported and groups them based on the parallelism parameter. Each group generates multiple SELECT INTO OUTFILE query plans based on the maximum_number_of_export_partitions parameter.
  • Based on the parallelism parameter, an equal number of ExportTaskExecutor are generated, and each ExportTaskExecutor is responsible for a thread, which is scheduled and executed by FE’s Job scheduler framework.
  • FE’s Job scheduler schedules and executes the ExportTaskExecutor, and each ExportTaskExecutor serially executes the multiple SELECT INTO OUTFILE query plans it is responsible for.

Export to HDFS

EXPORT TABLE db1.tbl1 
PARTITION (p1,p2)
[WHERE [expr]]
TO "hdfs://host/path/to/export/" 
PROPERTIES
(
"label" = "mylabel",
"column_separator"=",",
"columns" = "col1,col2",
"parallelusm" = "3"
)
WITH BROKER "hdfs"
(
"username" = "user",
"password" = "passwd"
);

Export to S3

EXPORT TABLE test TO "s3://bucket/path/to/export/dir/"
WITH S3 (
"s3.endpoint" = "http://host",
"s3.access_key" = "AK",
"s3.secret_key"="SK",
"s3.region" = "region"
);

Check status

show EXPORT\G;
*************************** 1. row ***************************
JobId: 14008
State: FINISHED
Progress: 100%
TaskInfo: {"partitions":[],"max_file_size":"","delete_existing_files":"","columns":"","format":"csv","column_separator":"\t","line_delimiter":"\n","db":"default_cluster:demo","tbl":"student4","tablet_num":30}
Path: hdfs://host/path/to/export/
CreateTime: 2019-06-25 17:08:24
StartTime: 2019-06-25 17:08:28
FinishTime: 2019-06-25 17:08:34
Timeout: 3600
ErrorMsg: NULL
OutfileInfo: [
[
{"fileNumber": "1",
"totalRows": "4",
"fileSize": "34bytes",
"url": "file:///127.0.0.1/Users/fangtiewei/tmp_data/export/f1ab7dcc31744152-bbb4cda2f5c88eac_"
}]
]

Cancel

CANCEL EXPORT
FROM example_db
WHERE LABEL like "%example%";

Related configuration and recommendations

  • Concurrent Export
  • exec_mem_limit
  • The Export job can export data from Doris base tables, views, and external tables, but not from Rollup indexes.
  • Do not export too much data in one job; export partition by partition instead.

SELECT INTO OUTFILE example

SELECT * FROM tbl
INTO OUTFILE "hdfs://path/to/result_"
FORMAT AS CSV
PROPERTIES
(
"broker.name" = "my_broker",
"column_separator" = ",",
"line_delimiter" = "\n"
);

select * from tbl1 limit 10 
INTO OUTFILE "file:///home/work/path/result_";

Concurrent export is supported. Below: a normal export, then a concurrent export.

select * from tbl1 limit 10 into outfile "file:///home/work/path/result_";
+------------+-----------+----------+--------------------------------------------------------------------+
| FileNumber | TotalRows | FileSize | URL |
+------------+-----------+----------+--------------------------------------------------------------------+
| 1 | 2 | 8 | file:///192.168.1.10/home/work/path/result_{fragment_instance_id}_ |
+------------+-----------+----------+--------------------------------------------------------------------+

+------------+-----------+----------+--------------------------------------------------------------------+
| FileNumber | TotalRows | FileSize | URL |
+------------+-----------+----------+--------------------------------------------------------------------+
| 1 | 3 | 7 | file:///192.168.1.10/home/work/path/result_{fragment_instance_id}_ |
| 1 | 2 | 4 | file:///192.168.1.11/home/work/path/result_{fragment_instance_id}_ |
+------------+-----------+----------+--------------------------------------------------------------------+

Export the table schema

mysqldump -h127.0.0.1 -P9030 -uroot --no-tablespaces --databases test --tables table1 --no-data

Update and Delete

There are three merge types when importing data:

  • APPEND
  • DELETE
  • MERGE

The implementation adds a hidden column, __DORIS_DELETE_SIGN__:

  • remove: rows are filtered out by the condition __DORIS_DELETE_SIGN__ != true
  • import: the hidden column is set according to the DELETE ON condition
  • read: an implicit filter expression __DORIS_DELETE_SIGN__ != true is added
  • compaction: rows marked as deleted are physically removed (a stream load sketch follows this list)
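
A hedged sketch of marking rows for deletion via stream load headers; merge_type and delete are the headers listed in the Stream Load section, while table, file, and column names are placeholders:

# DELETE: every key in the file is deleted from the Unique table
curl --location-trusted -u user:passwd \
    -H "merge_type: DELETE" \
    -T delete_keys.csv \
    http://fe_host:8030/api/example_db/example_tbl/_stream_load

# MERGE: rows matching the delete condition are deleted, the rest are upserted
curl --location-trusted -u user:passwd \
    -H "merge_type: MERGE" -H "delete: flag=1" -H "columns: user_id, username, flag" \
    -T mixed_rows.csv \
    http://fe_host:8030/api/example_db/example_tbl/_stream_load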

Example: broker load

LOAD LABEL db1.label1
(
[MERGE|APPEND|DELETE] DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
INTO TABLE tbl1
COLUMNS TERMINATED BY ","
(tmp_c1,tmp_c2, label_c3)
SET
(
id=tmp_c2,
name=tmp_c1
)
[DELETE ON label_c3=true]
)
WITH BROKER'broker'
(
"username"="user",
"password"="pass"
)
PROPERTIES
(
"timeout" = "3600"
);

routine load

CREATE ROUTINE LOAD example_db.test1 ON example_tbl
[WITH MERGE|APPEND|DELETE]
COLUMNS(k1, k2, k3, v1, v2, label),
WHERE k1> 100 and k2 like "%doris%"
[DELETE ON label=true]
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);

Show hidden columns

SET show_hidden_columns=true

desc example_tbl_unique;
+-----------------------+--------------+------+-------+---------+---------+
| Field | Type | Null | Key | Default | Extra |
+-----------------------+--------------+------+-------+---------+---------+
| user_id | LARGEINT | No | true | NULL | |
| username | VARCHAR(50) | No | true | NULL | |
| city | VARCHAR(20) | Yes | false | NULL | REPLACE |
| age | SMALLINT | Yes | false | NULL | REPLACE |
| sex | TINYINT | Yes | false | NULL | REPLACE |
| phone | LARGEINT | Yes | false | NULL | REPLACE |
| address | VARCHAR(500) | Yes | false | NULL | REPLACE |
| register_time | DATETIME | Yes | false | NULL | REPLACE |
| __DORIS_DELETE_SIGN__ | TINYINT | No | false | 0 | REPLACE |
| __DORIS_VERSION_COL__ | BIGINT | No | false | 0 | REPLACE |
+-----------------------+--------------+------+-------+---------+---------+

Doris does nothing special for concurrent updates; it is, after all, an OLAP system. Updates are row-based, so even columns that did not change are rewritten, and two concurrent operations updating the same row can produce dirty data. Update steps (a hedged UPDATE sketch follows the list):

  • Step 1: Read the rows that satisfy WHERE order id=1 (1, 100, ‘pending payment’)
  • Step 2: Change the order status of the row from ‘Pending Payment’ to ‘Pending Shipping’ (1, 100, ‘Pending shipment’)
  • Step 3: Insert the updated row back into the table to achieve the updated effect.
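
A hedged sketch of the UPDATE statement corresponding to those three steps (order table and column names are placeholders; UPDATE applies to Unique-model tables):

UPDATE order_tbl
SET order_status = 'Pending Shipping'
WHERE order_id = 1;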

To improve performance, partial-column updates are supported.
The Unique Key model currently supports column updates only in the Merge-on-Write implementation.
The partial-column update mechanism is implemented with MVCC.

Usage Recommendations

  • High write performance, lower query performance: Aggregate Key model
  • High query performance, lower write performance: Unique Key Merge-on-Write
  • In read-heavy, write-light scenarios, picking the wrong model (using the Aggregate Key model) can mean a 5-10x performance difference

Enable partial-column updates

set enable_unique_key_partial_update=true

Delete operation

mysql> delete from test_tbl PARTITION p1 where k1 = 1;
Query OK, 0 rows affected (0.04 sec)
{'label':'delete_e7830c72-eb14-4cb9-bbb6-eebd4511d251', 'status':'COMMITTED', 
'txnId':'4005', 'err':'delete job is committed but may be taking effect later' }

Parameters

  • tablet_delete_timeout_second
  • load_straggler_wait_second
  • query_timeout
  • max_allowed_in_element_num_of_delete

Query

show delete from test_db;
+-----------+---------------+---------------------+-----------------+----------+
| TableName | PartitionName | CreateTime          | DeleteCondition | State    |
+-----------+---------------+---------------------+-----------------+----------+
| empty_tbl | p3            | 2020-04-15 23:09:35 | k1 EQ "1"       | FINISHED |
| test_tbl  | p4            | 2020-04-15 23:09:53 | k1 GT "80"      | FINISHED |
+-----------+---------------+---------------------+-----------------+----------+

Sequence Column

  • Solves out-of-order arrival during import
  • Adds a hidden column __DORIS_SEQUENCE_COL__
  • For example, point the hidden column at a DATE column of the table so that ordering follows time
  • During import, the row with the larger time overwrites the one with the smaller time
  • During read, ordering by time means the latest value overrides earlier ones

Example

CREATE TABLE test.test_table
(
    user_id bigint,
    date date,
    group_id bigint,
    modify_date date,
    keyword VARCHAR(128)
)
UNIQUE KEY(user_id, date, group_id)
DISTRIBUTED BY HASH (user_id) BUCKETS 32
PROPERTIES(
    "function_column.sequence_col" = 'modify_date',
    "replication_num" = "1",
    "in_memory" = "false"
);

View hidden columns

SET show_hidden_columns=true;
desc test_table;
+------------------------+--------------+------+-------+---------+---------+
| Field                  | Type         | Null | Key   | Default | Extra   |
+------------------------+--------------+------+-------+---------+---------+
| user_id                | BIGINT       | Yes  | true  | NULL    |         |
| date                   | DATE         | Yes  | true  | NULL    |         |
| group_id               | BIGINT       | Yes  | true  | NULL    |         |
| modify_date            | DATE         | Yes  | false | NULL    | REPLACE |
| keyword                | VARCHAR(128) | Yes  | false | NULL    | REPLACE |
| __DORIS_DELETE_SIGN__  | TINYINT      | No   | false | 0       | REPLACE |
| __DORIS_VERSION_COL__  | BIGINT       | No   | false | 0       | REPLACE |
| __DORIS_SEQUENCE_COL__ | DATE         | Yes  | false | NULL    | REPLACE |
+------------------------+--------------+------+-------+---------+---------+

View after importing data

1       2020-02-22      1       2020-02-21      a
1       2020-02-22      1       2020-02-22      b
1       2020-02-22      1       2020-03-05      c
1       2020-02-22      1       2020-02-26      d
1       2020-02-22      1       2020-02-23      e
1       2020-02-22      1       2020-02-24      b

 select * from test_table;
+---------+------------+----------+-------------+---------+
| user_id | date       | group_id | modify_date | keyword |
+---------+------------+----------+-------------+---------+
|       1 | 2020-02-22 |        1 | 2020-03-05  | c       |
+---------+------------+----------+-------------+---------+

Although row c sits in the middle of the input, its modify_date is the largest, so it is the row that is kept.

Advanced Usage

Alter Table

Schema changes include the following (a small ALTER TABLE sketch follows the list):

  • Add and delete columns
  • Modify column type
  • Adjust column order
  • Add and modify Bloom Filter
  • Add and delete bitmap index
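
A hedged sketch of ALTER TABLE statements for the operations listed above (table and column names are placeholders):

-- add a column after k2
ALTER TABLE tbl1 ADD COLUMN new_col INT DEFAULT "0" AFTER k2;

-- change a column type
ALTER TABLE tbl1 MODIFY COLUMN new_col BIGINT DEFAULT "0";

-- adjust the column order (all columns must be listed)
ALTER TABLE tbl1 ORDER BY (k1, k3, k2, new_col);

-- add a bloom filter
ALTER TABLE tbl1 SET ("bloom_filter_columns" = "k3");

-- drop the column again
ALTER TABLE tbl1 DROP COLUMN new_col;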

Basic concept

+----------+
| Load Job |
+----+-----+
     |
     | Load job generates both origin and new index data
     |
     |      +------------------+ +---------------+
     |      | Origin Index     | | Origin Index  |
     +------> New Incoming Data| | History Data  |
     |      +------------------+ +------+--------+
     |                                  |
     |                                  | Convert history data
     |                                  |
     |      +------------------+ +------v--------+
     |      | New Index        | | New Index     |
     +------> New Incoming Data| | History Data  |
            +------------------+ +---------------+

An example; the original table:

+-----------+-------+------+------+------+---------+-------+
| IndexName | Field | Type | Null | Key  | Default | Extra |
+-----------+-------+------+------+------+---------+-------+
| tbl1      | k1    | INT  | No   | true | N/A     |       |
|           | k2    | INT  | No   | true | N/A     |       |
|           | k3    | INT  | No   | true | N/A     |       |
|           |       |      |      |      |         |       |
| rollup2   | k2    | INT  | No   | true | N/A     |       |
|           |       |      |      |      |         |       |
| rollup1   | k1    | INT  | No   | true | N/A     |       |
|           | k2    | INT  | No   | true | N/A     |       |
+-----------+-------+------+------+------+---------+-------+

Add columns

ALTER TABLE tbl1
ADD COLUMN k4 INT default "1" to rollup1,
ADD COLUMN k4 INT default "1" to rollup2,
ADD COLUMN k5 INT default "1" to rollup2;

Result

+-----------+-------+------+------+------+---------+-------+
| IndexName | Field | Type | Null | Key  | Default | Extra |
+-----------+-------+------+------+------+---------+-------+
| tbl1      | k1    | INT  | No   | true | N/A     |       |
|           | k2    | INT  | No   | true | N/A     |       |
|           | k3    | INT  | No   | true | N/A     |       |
|           | k4    | INT  | No   | true | 1       |       |
|           | k5    | INT  | No   | true | 1       |       |
|           |       |      |      |      |         |       |
| rollup2   | k2    | INT  | No   | true | N/A     |       |
|           | k4    | INT  | No   | true | 1       |       |
|           | k5    | INT  | No   | true | 1       |       |
|           |       |      |      |      |         |       |
| rollup1   | k1    | INT  | No   | true | N/A     |       |
|           | k2    | INT  | No   | true | N/A     |       |
|           | k4    | INT  | No   | true | 1       |       |
+-----------+-------+------+------+------+---------+-------+

Notes

  • Only one schema change job can run on a table at a time
  • Schema changes do not block imports or queries
  • The partition column and bucket column cannot be modified

Replace operation

ALTER TABLE [db.]tbl1 REPLACE WITH tbl2
[PROPERTIES('swap' = 'true')];

Explanation

  • If swap = true: rename table B to table A and rename table A to table B
  • If swap = false: drop table A, then rename table B to table A

Doris Partition

Dynamic partitioning creates partitions ahead of time according to specified rules,
and also decides by rule whether historical partitions are kept or dropped.

CREATE TABLE tbl1
(
    k1 DATE,
    ...
)
PARTITION BY RANGE(k1) ()
DISTRIBUTED BY HASH(k1)
PROPERTIES
(
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-7",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "32"
);

Explanation

  • time_unit: create partitions by day
  • start: keep the previous 7 days; when unset, all history is kept
  • end: create partitions 3 days ahead (a sketch of changing these rules on an existing table follows)
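
A hedged sketch of adjusting the dynamic partition rules on an existing table with ALTER TABLE SET; the property names match those used above:

ALTER TABLE tbl1 SET
(
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-30",
    "dynamic_partition.end" = "7"
);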

View

SHOW DYNAMIC PARTITION TABLES;
+-----------+--------+----------+-------------+------+--------+---------+-----------+----------------+---------------------+--------+------------------------+----------------------+-------------------------+
| TableName | Enable | TimeUnit | Start       | End  | Prefix | Buckets | StartOf   | LastUpdateTime | LastSchedulerTime   | State  | LastCreatePartitionMsg | LastDropPartitionMsg | ReservedHistoryPeriods  |
+-----------+--------+----------+-------------+------+--------+---------+-----------+----------------+---------------------+--------+------------------------+----------------------+-------------------------+
| d3        | true   | WEEK     | -3          | 3    | p      | 1       | MONDAY    | N/A            | 2020-05-25 14:29:24 | NORMAL | N/A                    | N/A                  | [2021-12-01,2021-12-31] |
| d5        | true   | DAY      | -7          | 3    | p      | 32      | N/A       | N/A            | 2020-05-25 14:29:24 | NORMAL | N/A                    | N/A                  | NULL                    |
| d4        | true   | WEEK     | -3          | 3    | p      | 1       | WEDNESDAY | N/A            | 2020-05-25 14:29:24 | NORMAL | N/A                    | N/A                  | NULL                    |
| d6        | true   | MONTH    | -2147483648 | 2    | p      | 8       | 3rd       | N/A            | 2020-05-25 14:29:24 | NORMAL | N/A                    | N/A                  | NULL                    |
| d2        | true   | DAY      | -3          | 3    | p      | 32      | N/A       | N/A            | 2020-05-25 14:29:24 | NORMAL | N/A                    | N/A                  | NULL                    |
| d7        | true   | MONTH    | -2147483648 | 5    | p      | 8       | 24th      | N/A            | 2020-05-25 14:29:24 | NORMAL | N/A                    | N/A                  | NULL                    |
+-----------+--------+----------+-------------+------+--------+---------+-----------+----------------+---------------------+--------+------------------------+----------------------+-------------------------+

Temporary partitions

ALTER TABLE tbl1 ADD TEMPORARY PARTITION tp1 VALUES LESS THAN ("2020-02-01");

ALTER TABLE tbl2 ADD TEMPORARY PARTITION tp1 VALUES [("2020-01-01"), ("2020-02-01"));

ALTER TABLE tbl1 ADD TEMPORARY PARTITION tp1 VALUES LESS THAN ("2020-02-01")
("replication_num" = "1")
DISTRIBUTED BY HASH (k1) BUCKETS 5;

ALTER TABLE tbl3 ADD TEMPORARY PARTITION tp1 VALUES IN ("Beijing", "Shanghai");

ALTER TABLE tbl4 ADD TEMPORARY PARTITION tp1 VALUES IN ((1, "Beijing"), (1, "Shanghai"));

ALTER TABLE tbl3 ADD TEMPORARY PARTITION tp1 VALUES IN ("Beijing", "Shanghai")
("replication_num" = "1")
DISTRIBUTED BY HASH(k1) BUCKETS 5;

Data Cache

Enable SQL Cache

vim fe/conf/fe.conf
cache_enable_sql_mode=true
MySQL [(none)]> set [global] enable_sql_cache=true;

Enable Partition Cache

vim fe/conf/fe.conf
cache_enable_partition_mode=true
MySQL [(none)]> set [global] enable_partition_cache=true;

AutoBucket

Example

-- old version of the creation syntax for specifying the number of buckets
DISTRIBUTED BY HASH(site) BUCKETS 20

-- Newer versions can infer the number of buckets automatically
DISTRIBUTED BY HASH(site) BUCKETS AUTO
properties("estimate_partition_size" = "100G")

Broker

Needs only a small amount of memory; it does not take part in computation and mainly performs RPC reads and writes.
Supported systems:

  • Apache HDFS
  • Aliyun OSS
  • Baidu Cloud BOS
  • Tencent Cloud CHDFS
  • Tencent Cloud GFS (since 1.2.0)
  • Huawei Cloud OBS (since 1.2.0)
  • Amazon S3
  • JuiceFS (since 2.0.0)
  • GCS (since 2.0.0)

Overall design

+----+   +----+
| FE |   | BE |
+-^--+   +--^-+
  |         |
  |         |
+-v---------v-+
|   Broker    |
+------^------+
       |
       |
+------v------+
|HDFS/BOS/AFS |
+-------------+

Resource Management

Used to connect external compute resources, storage resources, ETL, etc.
Syntax:

  • CREATE RESOURCE
  • DROP RESOURCE
  • SHOW RESOURCES

Supported resources

  • Spark resource: do ETL work
  • ODBC resource: query and import data from external tables

Spark example

CREATE EXTERNAL RESOURCE "spark0"
PROPERTIES
(
  "type" = "spark",
  "spark.master" = "yarn",
  "spark.submit.deployMode" = "cluster",
  "spark.jars" = "xxx.jar,yyy.jar",
  "spark.files" = "/tmp/aaa,/tmp/bbb",
  "spark.executor.memory" = "1g",
  "spark.yarn.queue" = "queue0",
  "spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
  "spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
  "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
  "broker" = "broker0",
  "broker.username" = "user0",
  "broker.password" = "password0"
);

ODBC example

CREATE EXTERNAL RESOURCE `oracle_odbc`
PROPERTIES (
"type" = "odbc_catalog",
"host" = "192.168.0.1",
"port" = "8086",
"user" = "test",
"password" = "test",
"database" = "test",
"odbc_type" = "oracle",
"driver" = "Oracle 19 ODBC driver"
);

Orthogonal BITMAP Calculation

When data volume is large (over 100 million rows or 10 GB), bitmap computation becomes slow.
The data is split into multiple buckets and each bucket is computed separately.
This requires two passes:

  • Group IDs by range, e.g. 1-100, 100-200, and so on, into buckets
  • Aggregate each bucket separately; an upper-level operator then merges the partial results
  • Overall computation time drops substantially

Related functions (a usage sketch follows the list)

  • orthogonal_bitmap_intersect
  • orthogonal_bitmap_intersect_count
  • orthogonal_bitmap_union_count
  • orthogonal_bitmap_expr_calculate
  • orthogonal_bitmap_expr_calculate_count
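
A hedged sketch of the bucketed bitmap layout this relies on, together with one of the functions above; table, column, and the bucketing key hid are placeholders:

-- hid is a bucket id derived from the user id range; bitmaps are pre-grouped per bucket
CREATE TABLE user_tag_bitmap
(
    tag     BIGINT,
    hid     SMALLINT,
    user_id BITMAP BITMAP_UNION
)
AGGREGATE KEY(tag, hid)
DISTRIBUTED BY HASH(hid) BUCKETS 3
PROPERTIES ("replication_num" = "1");

-- each BE counts its own buckets, then the partial results are merged
SELECT orthogonal_bitmap_union_count(user_id) FROM user_tag_bitmap WHERE tag = 10001;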

Approximate Deduplication Using HLL

This uses the HyperLogLog approximate (probabilistic) algorithm.
Related functions:

  • HLL_UNION_AGG(hll)
  • HLL_CARDINALITY
  • HLL_HASH

Example

create table test_hll(
    dt date,
    id int,
    name char(10),
    province char(10),
    os char(10),
    pv hll hll_union
)
Aggregate KEY (dt,id,name,province,os)
distributed by hash(id) buckets 10
PROPERTIES(
    "replication_num" = "1",
    "in_memory"="false"
);

Test data (test_hll.csv)

2022-05-05,10001,测试01,北京,windows
2022-05-05,10002,测试01,北京,linux
2022-05-05,10003,测试01,北京,macos
2022-05-05,10004,测试01,河北,windows
2022-05-06,10001,测试01,上海,windows
2022-05-06,10002,测试01,上海,linux
2022-05-06,10003,测试01,江苏,macos
2022-05-06,10004,测试01,陕西,windows

Load the data via Stream Load

curl --location-trusted -u root: -H "label:label_test_hll_load" \
    -H "column_separator:," \
    -H "columns:dt,id,name,province,os, pv=hll_hash(id)" -T test_hll.csv  \
    http://FE_HOST:8030/api/test/test_hll/_stream_load

Query

select HLL_UNION_AGG(pv) from test_hll;
+---------------------+
| hll_union_agg(`pv`) |
+---------------------+
|                   4 |
+---------------------+

SELECT COUNT(DISTINCT pv) FROM test_hll;
+----------------------+
| count(DISTINCT `pv`) |
+----------------------+
|                    4 |
+----------------------+

select HLL_UNION_AGG(pv) from test_hll group by dt;
+-------------------+
| hll_union_agg(pv) |
+-------------------+
|                 4 |
|                 4 |
+-------------------+

Variable

Usage

SHOW VARIABLES;
SHOW VARIABLES LIKE '%time_zone%';

Set at the session level:

SET exec_mem_limit = 137438953472;
SET forward_to_master = true;
SET time_zone = "Asia/Shanghai";

Set at the global level:

SET GLOBAL exec_mem_limit = 137438953472;

SQL modes: for example, || is string concatenation in some databases and a logical OR in others.

select @@global.sql_mode;
select @@session.sql_mode;

show global variables;
show session variables;
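
As an illustration, assuming the PIPES_AS_CONCAT mode is supported, switching it on changes how || is interpreted:

SET sql_mode = 'PIPES_AS_CONCAT';
-- under this mode || concatenates strings instead of acting as a logical OR
SELECT 'a' || 'b';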

When creating a target table based on the source table's structure, some properties and features do not need to be identical:

  • Properties such as storage_policy
  • colocate_with
  • Features such as auto bucket
  • dynamic partition

File Manager

Used to download small files such as CA certificates and keep them locally.
It relies on BDBJE (Oracle Berkeley DB Java Edition), the distributed embedded database used to persist metadata in the FE.
Example:

    CREATE FILE "test_hll.csv"
    PROPERTIES
    (
        "url" = "http://192.168.12.71:9999/test_hll.csv",
        "catalog" = "internal"
    );

Query

SHOW FILE FROM demo;
+-------+----------------------+----------+--------------+----------+-----------+----------------------------------+
| Id    | DbName               | Catalog  | FileName     | FileSize | IsContent | MD5                              |
+-------+----------------------+----------+--------------+----------+-----------+----------------------------------+
| 12097 | default_cluster:demo | internal | test_hll.csv | 320      | true      | 0b197c5ac2c2ad64a8640b3fb8d7d604 |
+-------+----------------------+----------+--------------+----------+-----------+----------------------------------+
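
A file that is no longer needed can be dropped again; a hedged sketch mirroring the example above:

DROP FILE "test_hll.csv" FROM demo PROPERTIES ("catalog" = "internal");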

Files are kept in memory and are also persisted.
Related configuration:

  • small_file_dir
  • max_small_file_size_bytes
  • max_small_file_number
  • BE config: small_file_dir

Cold Hot Separation

Cold data can be stored on S3 or HDFS.

CREATE RESOURCE "remote_hdfs" PROPERTIES (
        "type"="hdfs",
        "fs.defaultFS"="fs_host:default_fs_port",
        "hadoop.username"="hive",
        "hadoop.password"="hive",
        "dfs.nameservices" = "my_ha",
        "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
        "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
        "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_prot",
        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    );

Create a storage policy

    CREATE STORAGE POLICY test_policy PROPERTIES (
        "storage_resource" = "remote_hdfs",
        "cooldown_ttl" = "300"
    );

Use the storage policy

    CREATE TABLE IF NOT EXISTS create_table_use_created_policy (
    k1 BIGINT,
    k2 LARGEINT,
    v1 VARCHAR(2048)
    )
    UNIQUE KEY(k1)
    DISTRIBUTED BY HASH (k1) BUCKETS 3
    PROPERTIES(
    "storage_policy" = "test_policy"
    );
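
The policy can also be attached to a table that already exists; a hedged sketch, assuming an existing table named tbl1:

ALTER TABLE tbl1 SET ("storage_policy" = "test_policy");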

Supported schema changes:

  • Add and delete columns
  • Modify column type
  • Adjust column order
  • Add and modify Bloom Filter
  • Add and delete bitmap index

Compute Node

Doris's default architecture is share-nothing, with compute and storage coupled on the BE.
Doris can also query data in a data lake, but in that case the local storage is largely wasted.
A BE can be configured as a compute-only node, which effectively turns the deployment into a share-disk style architecture.

BE nodes support two modes:

  • hybrid node: provides both compute and storage (the default)
  • computing node: compute only

Cross-catalog queries are handed to computing nodes by default (see the fe.conf sketch after the be.conf snippet below).
To turn a BE into a computing node, edit be.conf:

be_node_role = computation 
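
On the FE side there are related switches for steering external-table (data lake) queries toward computing nodes; the option names below are an assumption based on the compute node feature and should be checked against your version:

# fe.conf (assumed option names)
prefer_compute_node_for_external_table = true
min_backend_num_for_external_table = 3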

Check the node status

show backends\G

*************************** 1. row ***************************
              BackendId: 10010
                Cluster: default_cluster
                     IP: 10.248.181.219
          HeartbeatPort: 9050
                 BePort: 9060
               HttpPort: 8040
               BrpcPort: 8060
          LastStartTime: 2022-11-30 23:01:40
          LastHeartbeat: 2022-12-05 15:01:18
                  Alive: true
   SystemDecommissioned: false
  ClusterDecommissioned: false
              TabletNum: 753
       DataUsedCapacity: 1.955 GB
          AvailCapacity: 202.987 GB
          TotalCapacity: 491.153 GB
                UsedPct: 58.67 %
         MaxDiskUsedPct: 58.67 %
     RemoteUsedCapacity: 0.000
                    ...
HeartbeatFailureCounter: 0
               NodeRole: computation

Row to Column

Use the EXPLODE function to generate a virtual table. Example:

CREATE TABLE `person` (
  `id` int(11) NULL,
  `name` text NULL,
  `age` int(11) NULL,
  `class` int(11) NULL,
  `address` text NULL
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
);

Insert data

INSERT INTO person VALUES
    (100, 'John', 30, 1, 'Street 1'),
    (200, 'Mary', NULL, 1, 'Street 2'),
    (300, 'Mike', 80, 3, 'Street 3'),
    (400, 'Dan', 50, 4, 'Street 4');

Query

mysql> SELECT * FROM person
    ->     LATERAL VIEW EXPLODE(ARRAY(30, 60)) tableName AS c_age;
+------+------+------+-------+----------+-------+
| id   | name | age  | class | address  | c_age |
+------+------+------+-------+----------+-------+
|  100 | John |   30 |     1 | Street 1 |    30 |
|  100 | John |   30 |     1 | Street 1 |    60 |
|  200 | Mary | NULL |     1 | Street 2 |    30 |
|  200 | Mary | NULL |     1 | Street 2 |    60 |
|  300 | Mike |   80 |     3 | Street 3 |    30 |
|  300 | Mike |   80 |     3 | Street 3 |    60 |
|  400 | Dan  |   50 |     4 | Street 4 |    30 |
|  400 | Dan  |   50 |     4 | Street 4 |    60 |
+------+------+------+-------+----------+-------+
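
Besides a literal array, a real string column can be exploded as well; a hedged sketch using explode_split (the delimiter and aliases are illustrative):

SELECT id, name, part
FROM person
LATERAL VIEW explode_split(address, ' ') tmp AS part;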

Best Practice

Analyzing query plans

EXPLAIN select * from test_hll as h join test_table as t on h.id = t.user_id where user_id > 1;

A more verbose text form:

EXPLAIN VERBOSE select ...

A graphical query plan, similar to Impala's:

EXPLAIN GRAPH select ...

Prints the parsed logical plan, much like Spark's toString of a logical plan:

EXPLAIN PARSED PLAN select ...;

Similar to the Spark-style output above, but this time the plan is the analyzed one:

EXPLAIN ANALYZED PLAN select ...;

The plan after query rewrites have been applied:

EXPLAIN REWRITTEN PLAN select ...;

Shows the optimized logical plan:

EXPLAIN OPTIMIZED PLAN select ...;

The simplified query plan:

EXPLAIN SHAPE PLAN select ...;

Data is passed between plan fragments through the following two operators:

  • DataStreamSink
  • ExchangeNode

View the query profile:

SET is_report_success=true;

Debug logging: there are three ways to enable it on the FE:

  • Modify the config file (requires a restart)
  • Modify it through the web UI (no restart needed)
  • Modify it through the HTTP API

For the config-file approach on the FE, edit fe.conf:

# Only enable Debug log for class org.apache.doris.catalog.Catalog
sys_log_verbose_modules=org.apache.doris.catalog.Catalog

# Open the Debug log of all classes under the package org.apache.doris.catalog
sys_log_verbose_modules=org.apache.doris.catalog

# Enable Debug logs for all classes under package org
sys_log_verbose_modules=org

For the BE, edit be.conf:

sys_log_verbose_modules=plan_fragment_executor,olap_scan_node
sys_log_verbose_level=3

Compaction

  • Vertical Compaction: merges columns into groups and compacts them group by group; compared with traditional row-based compaction, memory consumption is about 1/10 and the compaction ratio improves by about 15%
  • Segment Compaction: used during data loads, which can otherwise generate many small segment files

Related configuration:

  • enable_vertical_compaction
  • vertical_compaction_num_columns_per_group
  • vertical_compaction_max_segment_size
  • enable_segcompaction
  • segcompaction_batch_size

WEB UI

  • FE: http://FE_HOST:8030/
  • BE: http://BE_HOST:8040/