Table Design
Data Model
- Aggregate
- Unique
- Duplicate
Aggregate Model
```sql
CREATE DATABASE IF NOT EXISTS example_db;
CREATE TABLE IF NOT EXISTS example_db.example_tbl_agg1
(
`user_id` LARGEINT NOT NULL COMMENT "user id",
`date` DATE NOT NULL COMMENT "data import time",
`city` VARCHAR(20) COMMENT "city",
`age` SMALLINT COMMENT "age",
`sex` TINYINT COMMENT "gender",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "last visit date time",
`cost` BIGINT SUM DEFAULT "0" COMMENT "user total cost",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "user max dwell time",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "user min dwell time"
)
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
```

Insert data:

```sql
insert into example_db.example_tbl_agg1 values
(10000,"2017-10-01","Beijing",20,0,"2017-10-01 06:00:00",20,10,10),
(10000,"2017-10-01","Beijing",20,0,"2017-10-01 07:00:00",15,2,2),
(10001,"2017-10-01","Beijing",30,1,"2017-10-01 17:05:45",2,22,22),
(10002,"2017-10-02","Shanghai",20,1,"2017-10-02 12:59:12",200,5,5),
(10003,"2017-10-02","Guangzhou",32,0,"2017-10-02 11:20:00",30,11,11),
(10004,"2017-10-01","Shenzhen",35,0,"2017-10-01 10:00:15",100,3,3),
(10004,"2017-10-03","Shenzhen",35,0,"2017-10-03 10:20:22",11,6,6);
```

The two rows with user_id 10000 have identical values for user_id, date, city, age, and sex, so they were aggregated into one row:

- last_visit_date was replaced by the newer value
- cost is the SUM of the two rows
- max_dwell_time keeps the maximum of the two rows
- min_dwell_time keeps the minimum of the two rows

The two rows with user_id 10004 were not aggregated, because their key columns are not all identical.

This aggregation model suits multidimensional analysis scenarios, since data can be pre-aggregated.
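As a quick sanity check (a hedged sketch; the numbers assume only the sample rows above were loaded), querying the table shows the merged result:

```sql
-- rows sharing the same AGGREGATE KEY are merged,
-- so user_id 10000 appears once with cost = 20 + 15 = 35
SELECT user_id, `date`, city, cost, max_dwell_time, min_dwell_time
FROM example_db.example_tbl_agg1
WHERE user_id = 10000;
```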
The main aggregation types:
- SUM: Accumulate the values in multiple rows.
- REPLACE: The newly imported value will replace the previous value.
- MAX: Keep the maximum value.
- MIN: Keep the minimum value.
- REPLACE_IF_NOT_NULL: Non-null value replacement. Unlike REPLACE, it does not replace null values.
- HLL_UNION: Aggregation method for columns of HLL type, using the HyperLogLog algorithm for aggregation.
- BITMAP_UNION: Aggregation method for columns of BITMAP type, performing a union aggregation of bitmaps.
agg_state
- The concrete type is determined by the declared aggregate function
- It is more of an intermediate result
```sql
set enable_agg_state=true;
create table aggstate(
k1 int null,
k2 agg_state sum(int),
k3 agg_state group_concat(string)
)
aggregate key (k1)
distributed BY hash(k1) buckets 3
properties("replication_num" = "1");
```

Usage:

```sql
insert into aggstate values(1,sum_state(1),group_concat_state('a'));
insert into aggstate values(1,sum_state(2),group_concat_state('b'));
insert into aggstate values(1,sum_state(3),group_concat_state('c'));
insert into aggstate values(2,sum_state(4),group_concat_state('d'));
```

The table now contains:

| k1 | k2 | k3 |
|----|----|----|
| 1 | sum(1,2,3) | group_concat_state(a,b,c) |
| 2 | sum(4) | group_concat_state(d) |
Query:

```sql
select sum_merge(k2) from aggstate;
+---------------+
| sum_merge(k2) |
+---------------+
| 10 |
+---------------+
select group_concat_merge(k3) from aggstate;
+------------------------+
| group_concat_merge(k3) |
+------------------------+
| c,b,a,d |
+------------------------+
```

Unique Model

```sql
CREATE TABLE IF NOT EXISTS example_db.example_tbl_unique
(
`user_id` LARGEINT NOT NULL COMMENT "User ID",
`username` VARCHAR (50) NOT NULL COMMENT "Username",
`city` VARCHAR (20) COMMENT "User location city",
`age` SMALLINT COMMENT "User age",
`sex` TINYINT COMMENT "User sex",
`phone` LARGEINT COMMENT "User phone number",
`address` VARCHAR (500) COMMENT "User address",
`register_time` DATETIME COMMENT "User registration time"
)
UNIQUE KEY (`user_id`, `username`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
```

This is equivalent to defining a primary key: several columns form a composite key, and a row with a duplicate key replaces the existing one.
It behaves like REPLACE in the Aggregate model.
Since 1.2, a Merge-on-Write implementation has been added, which works roughly like tombstone writes in an LSM tree and improves read performance.
It performs better than the Merge-on-Read behavior of the Aggregate model.
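A minimal sketch of turning on Merge-on-Write for a Unique table (assuming Doris 1.2+; the table name is hypothetical, and the property below is the one documented for the Unique Key model):

```sql
CREATE TABLE IF NOT EXISTS example_db.example_tbl_unique_mow
(
    `user_id`  LARGEINT    NOT NULL COMMENT "User ID",
    `username` VARCHAR(50) NOT NULL COMMENT "Username",
    `city`     VARCHAR(20)          COMMENT "User location city"
)
UNIQUE KEY(`user_id`, `username`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    -- enables the Merge-on-Write implementation instead of Merge-on-Read
    "enable_unique_key_merge_on_write" = "true"
);
```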
Duplicate Model
```sql
CREATE TABLE IF NOT EXISTS example_db.example_tbl_duplicate
(
`timestamp` DATETIME NOT NULL COMMENT "Log time",
`type` INT NOT NULL COMMENT "Log type",
`error_code` INT COMMENT "Error code",
`error_msg` VARCHAR(1024) COMMENT "Error details",
`op_id` BIGINT COMMENT "Operator ID",
`op_time` DATETIME COMMENT "Operation time"
)
DUPLICATE KEY(`timestamp`, `type`, `error_code`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
```

Duplicate keys are not overwritten; the rows are simply kept as multiple rows.
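A small illustration with hypothetical rows: both rows share the same DUPLICATE KEY, and both are kept:

```sql
INSERT INTO example_db.example_tbl_duplicate VALUES
    ("2017-10-01 08:00:05", 1, 404, "not found page", 101, "2017-10-01 08:00:06"),
    ("2017-10-01 08:00:05", 1, 404, "not found page", 101, "2017-10-01 08:00:07");

-- returns both rows; nothing is aggregated or replaced
SELECT * FROM example_db.example_tbl_duplicate
WHERE `timestamp` = "2017-10-01 08:00:05";
```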
Data Partition
Partitioning levels:

- Table -> Partition -> Tablet
- A tablet is the smallest physical unit; data movement and replication operate on tablets
- A partition is the smallest logical unit; import, delete, and similar operations work on partitions
- Data in different partitions is disjoint
Range partitioning:

```sql
CREATE TABLE IF NOT EXISTS example_db.example_range_tbl
(
`user_id` LARGEINT NOT NULL COMMENT "User ID",
`date` DATE NOT NULL COMMENT "Date when the data are imported",
`timestamp` DATETIME NOT NULL COMMENT "Timestamp when the data are imported",
`city` VARCHAR(20) COMMENT "User location city",
`age` SMALLINT COMMENT "User age",
`sex` TINYINT COMMENT "User gender",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "User last visit time",
`cost` BIGINT SUM DEFAULT "0" COMMENT "Total user consumption",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "Maximum user dwell time",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "Minimum user dwell time"
)
ENGINE=olap
AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
PARTITION BY RANGE(`date`)
(
PARTITION `p201701` VALUES LESS THAN ("2017-02-01"),
PARTITION `p201702` VALUES LESS THAN ("2017-03-01"),
PARTITION `p201703` VALUES LESS THAN ("2017-04-01")
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
PROPERTIES
(
"replication_num" = "3",
"storage_medium" = "SSD",
"storage_cooldown_time" = "2018-01-01 12:00:00"
);
```

List partitioning:

```sql
CREATE TABLE IF NOT EXISTS example_db.example_list_tbl
(
`user_id` LARGEINT NOT NULL COMMENT "User ID",
`date` DATE NOT NULL COMMENT "Date when the data are imported",
`timestamp` DATETIME NOT NULL COMMENT "Timestamp when the data are imported",
`city` VARCHAR(20) COMMENT "User location city",
`age` SMALLINT COMMENT "User Age",
`sex` TINYINT COMMENT "User gender",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "User last visit time",
`cost` BIGINT SUM DEFAULT "0" COMMENT "Total user consumption",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "Maximum user dwell time",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "Minimum user dwell time"
)
ENGINE=olap
AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
PARTITION BY LIST(`city`)
(
PARTITION `p_cn` VALUES IN ("Beijing", "Shanghai", "Hong Kong"),
PARTITION `p_usa` VALUES IN ("New York", "San Francisco"),
PARTITION `p_jp` VALUES IN ("Tokyo")
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
PROPERTIES
(
"replication_num" = "3",
"storage_medium" = "SSD",
"storage_cooldown_time" = "2018-01-01 12:00:00"
);
```

Bucketing

- Similar to bucketing in Hive and Impala
- Based on hash, or random (see the sketch after this list)
- More buckets favor writes, and random bucketing in particular can eliminate data skew; fewer buckets favor reads
- Recommended tablet size: 1 GB - 10 GB
- Two-level partitioning means partitions plus buckets; single-level partitioning means the table only has buckets
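A hedged sketch of the two distribution options (the table below is hypothetical):

```sql
-- hash bucketing: rows with the same user_id always land in the same tablet
-- DISTRIBUTED BY HASH(`user_id`) BUCKETS 16

-- random bucketing: rows are spread evenly across tablets, avoiding skew on hot keys
CREATE TABLE example_db.bucket_demo
(
    `user_id` LARGEINT NOT NULL,
    `event`   VARCHAR(64)
)
DUPLICATE KEY(`user_id`)
DISTRIBUTED BY RANDOM BUCKETS 16
PROPERTIES ("replication_num" = "1");
```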
Rollup
- Rollup can be seen as a materialized index structure for a Table
- materialized in the sense that its data is physically independent in storage
- and indexed in the sense that Rollup can reorder columns to increase the hit rate of prefix indexes as well as reduce Key columns to increase the aggregation level of data.
Concepts

- A rollup is essentially a materialized index
- At query time Doris chooses it automatically; the user cannot select it explicitly
- A rollup is created on a base table, so operations on the base table also affect the rollup
- Rollup storage is independent of the base table; a large number of rollups affects imports but not queries
- All columns in a query, including those in SELECT and WHERE, must appear in the rollup; otherwise the query can only access the base table (see the example after the ADD ROLLUP statement below)
- Updates to the base table are automatically synchronized to the rollup
Operations:

```sql
ALTER TABLE table1 ADD ROLLUP rollup_city(citycode, pv);
```
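For instance (a hedged sketch assuming rollup_city above has been built; userid is a hypothetical column of table1), an aggregation that only touches the rollup's columns can be served from the rollup, while anything else falls back to the base table:

```sql
-- only citycode and pv are referenced, so Doris can answer this from rollup_city
SELECT citycode, SUM(pv) FROM table1 GROUP BY citycode;

-- references a column outside the rollup, so it can only scan the base table
SELECT citycode, userid FROM table1 WHERE citycode = 1;
```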
Check:

```sql
SHOW ALTER TABLE ROLLUP;
DESC tbl_name ALL;
CANCEL ALTER TABLE ROLLUP FROM table1;
```

Index

Built-in indexes (not user-selectable):

- prefix indexes
- ZoneMap indexes

Prefix index example:
| ColumnName | Type |
|------------|------|
| user_id | BIGINT |
| age | INT |
| message | VARCHAR(100) |
| max_dwell_time | DATETIME |
| min_dwell_time | DATETIME |
Up to 36 bytes are used as the prefix index: user_id (8 bytes) + age (4 bytes) + message (a 20-byte prefix); the message column is truncated.
```sql
-- efficient: matches the prefix index
SELECT * FROM table WHERE user_id=1829239 and age=20;
-- less efficient: the predicate does not start with the first prefix column
SELECT * FROM table WHERE age=20;
```

User-defined indexes:
- inverted index
- bloomfilter index
- ngram bloomfilter index
- bitmap index
Inverted Index
Basic syntax options:

- parser: english, chinese, unicode, and other tokenizers
- parser_mode: fine_grained parsing generates more tokens; coarse_grained generates fewer
- support_phrase: whether the index needs to support phrase queries (MATCH_PHRASE)
- char_filter: used in the preprocessing stage
Some examples:

```sql
SELECT TOKENIZE('武汉长江大桥','"parser"="chinese","parser_mode"="fine_grained"');
+-----------------------------------------------------------------------------------+
| tokenize('武汉长江大桥', '"parser"="chinese","parser_mode"="fine_grained"') |
+-----------------------------------------------------------------------------------+
| ["武汉", "武汉长江大桥", "长江", "长江大桥", "大桥"] |
+-----------------------------------------------------------------------------------+
SELECT TOKENIZE('武汉市长江大桥','"parser"="chinese","parser_mode"="coarse_grained"');
+----------------------------------------------------------------------------------------+
| tokenize('武汉市长江大桥', '"parser"="chinese","parser_mode"="coarse_grained"') |
+----------------------------------------------------------------------------------------+
| ["武汉市", "长江大桥"] |
+----------------------------------------------------------------------------------------+
SELECT TOKENIZE('I love CHINA','"parser"="english"');
+------------------------------------------------+
| tokenize('I love CHINA', '"parser"="english"') |
+------------------------------------------------+
| ["i", "love", "china"] |
+------------------------------------------------+
```

An example:

```sql
CREATE DATABASE test_inverted_index;
USE test_inverted_index;
-- define inverted index idx_comment for comment column on table creation
-- USING INVERTED specify using inverted index
-- PROPERTIES("parser" = "english") specify english word parser
CREATE TABLE hackernews_1m
(
`id` BIGINT,
`deleted` TINYINT,
`type` String,
`author` String,
`timestamp` DateTimeV2,
`comment` String,
`dead` TINYINT,
`parent` BIGINT,
`poll` BIGINT,
`children` Array<BIGINT>,
`url` String,
`score` INT,
`title` String,
`parts` Array<INT>,
`descendants` INT,
INDEX idx_comment (`comment`) USING INVERTED PROPERTIES("parser" = "english") COMMENT 'inverted index for comment'
)
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES ("replication_num" = "1");
```

Queries:

```sql
-- ordinary fuzzy (LIKE) queries
SELECT count() FROM hackernews_1m WHERE comment LIKE '%OLAP%';
SELECT count() FROM hackernews_1m WHERE comment LIKE '%OLAP%' AND comment LIKE '%OLTP%';
-- queries that use the inverted index
SELECT count() FROM hackernews_1m WHERE comment MATCH_ANY 'OLAP';
SELECT count() FROM hackernews_1m WHERE comment MATCH_ALL 'OLAP OLTP';
```

Add another index:

```sql
CREATE INDEX idx_timestamp ON hackernews_1m(timestamp) USING INVERTED;
BUILD INDEX idx_timestamp ON hackernews_1m;
SHOW ALTER TABLE COLUMN;
SHOW BUILD INDEX;
SELECT count() FROM hackernews_1m WHERE timestamp > '2007-08-23 04:17:00';
```

Check status:

```sql
SHOW ALTER TABLE COLUMN;
SHOW BUILD INDEX;
```

BloomFilter Index

Create:

```sql
CREATE TABLE IF NOT EXISTS sale_detail_bloom (
sale_date date NOT NULL COMMENT "Sales time",
customer_id int NOT NULL COMMENT "Customer ID",
saler_id int NOT NULL COMMENT "Salesperson",
sku_id int NOT NULL COMMENT "Product ID",
category_id int NOT NULL COMMENT "Product Category",
sale_count int NOT NULL COMMENT "Sales Quantity",
sale_price DECIMAL(12,2) NOT NULL COMMENT "unit price",
sale_amt DECIMAL(20,2) COMMENT "Total sales amount"
)
Duplicate KEY(sale_date, customer_id,saler_id,sku_id,category_id)
PARTITION BY RANGE(sale_date)
(
PARTITION P_202111 VALUES [('2021-11-01'), ('2021-12-01'))
)
DISTRIBUTED BY HASH(saler_id) BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"bloom_filter_columns"="saler_id,category_id",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "MONTH",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-2147483648",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "P_",
"dynamic_partition.replication_num" = "1",
"dynamic_partition.buckets" = "3"
);
```

Check:

```sql
-- check the table schema to see whether the bloom filter index exists
SHOW CREATE TABLE <table_name>;
-- drop
ALTER TABLE <db.table_name> SET ("bloom_filter_columns" = "");
-- modify
ALTER TABLE <db.table_name> SET ("bloom_filter_columns" = "k1,k3");
```

Usage scenarios:

- Suitable for non-prefix filtering
- Works best when the column is filtered frequently and most predicates are equality (=) filters (see the sketch after this list)
- Bitmap indexes suit low-cardinality columns such as gender; bloom filters suit high-cardinality columns such as UserID
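A hedged illustration against the sale_detail_bloom table above (saler_id is listed in bloom_filter_columns):

```sql
-- equality filter on a bloom-filtered, high-cardinality column:
-- data blocks that cannot contain saler_id = 1001 are skipped
SELECT sale_date, sku_id, sale_count
FROM sale_detail_bloom
WHERE saler_id = 1001;
```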
NGram BloomFilter Index
```sql
CREATE TABLE `table3` (
`siteid` int(11) NULL DEFAULT "10" COMMENT "",
`citycode` smallint(6) NULL COMMENT "",
`username` varchar(100) NULL DEFAULT "" COMMENT "",
INDEX idx_ngrambf (`username`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256") COMMENT 'username ngram_bf index'
) ENGINE=OLAP
AGGREGATE KEY(`siteid`, `citycode`, `username`) COMMENT "OLAP"
DISTRIBUTED BY HASH(`siteid`) BUCKETS 10
PROPERTIES (
"replication_num" = "1"
);
-- PROPERTIES("gram_size"="3", "bf_size"="1024") indicate the gram size and the bloom filter size in bytes, respectively.
-- Set gram_size to the length of the LIKE query pattern strings.
-- A suitable bf_size can be found by testing; larger is generally better, and 256 is a reasonable starting point.
-- Usually, if the data's cardinality is small, increasing the bloom filter size improves efficiency.
```

Check:

```sql
show index from example_db.table3;
alter table example_db.table3 drop index idx_ngrambf;
alter table example_db.table3 add index idx_ngrambf(username)
using NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="512") comment 'username ngram_bf index';
```
NGrams
- NGram is a contiguous sequence of n items from a given sample of text or speech.
- An NGram of size 1 is referred to as a “unigram”; size 2 is a “bigram”;
- size 3 is a “trigram”. For example, the word “hello” can be broken down into the following bigrams: he, el, ll, lo.
NGram Bloom Filter Index
- This is a combination of the two concepts, typically used to index text data.
- It uses NGrams to break down the text into searchable parts and then applies a Bloom filter to these NGrams to quickly check for the presence or absence of these NGrams in the index.
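A hedged example against table3 above (idx_ngrambf was created with gram_size 3), where the NGram bloom filter can skip data for LIKE patterns at least that long:

```sql
-- '%doris%' is split into 3-grams ("dor", "ori", "ris");
-- blocks whose bloom filter cannot contain them are skipped
SELECT siteid, citycode
FROM example_db.table3
WHERE username LIKE '%doris%';
```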
Bitmap Index
Operations:

```sql
CREATE INDEX IF NOT EXISTS bitmap_index ON table3 (siteid) USING BITMAP COMMENT 'balabala';
SHOW INDEX FROM example_db.table_name;
DROP INDEX [IF EXISTS] index_name ON [db_name.]table_name;
```

Check status:

```sql
SHOW INDEX FROM table3\G
*************************** 1. row ***************************
Table: default_cluster:test_index.table3
Non_unique:
Key_name: idx_ngrambf
Seq_in_index:
Column_name: username
Collation:
Cardinality:
Sub_part:
Packed:
Null:
Index_type: NGRAM_BF
Comment: username ngram_bf index
Properties: ("gram_size" = "3", "bf_size" = "256")
*************************** 2. row ***************************
Table: default_cluster:test_index.table3
Non_unique:
Key_name: bitmap_index
Seq_in_index:
Column_name: siteid
Collation:
Cardinality:
Sub_part:
Packed:
Null:
Index_type: BITMAP
Comment: balabala
Properties:
```

Basic Use

Hot and cold data: paths tagged HDD store cold data, and paths tagged SSD store hot data, for example:

storage_root_path=/home/disk1/doris,medium:HDD;/home/disk2/doris,medium:SSD
Broker
- Broker is deployed as a plug-in, which is independent of Doris.
- If you need to import data from a third-party storage system, you need to deploy the corresponding Broker.
- By default, Doris provides fs_broker for HDFS reading and object storage (supporting S3 protocol).
- fs_broker is stateless and we recommend that you deploy a Broker for each FE and BE node.
Some administrative operations:

```sql
SET PASSWORD FOR 'root' = PASSWORD('your_password');
CREATE USER 'test' IDENTIFIED BY 'test_passwd';
GRANT ALL ON example_db TO test;
```

Load data:

```shell
curl --location-trusted -u test:test_passwd -H "label:table1_20170707" -H "column_separator:," \
    -T table1_data http://FE_HOST:8030/api/example_db/table1/_stream_load
```

Broker Load

```sql
LOAD LABEL table1_20170708
(
DATA INFILE("hdfs://your.namenode.host:port/dir/table1_data")
INTO TABLE table1
)
WITH BROKER hdfs
(
"username"="hdfs_user",
"password"="hdfs_password"
)
PROPERTIES
(
"timeout"="3600",
"max_filter_ratio"="0.1"
);
SHOW LOAD WHERE LABEL = "table1_20170708";
```

Query variables:

```sql
SHOW VARIABLES;
SHOW VARIABLES LIKE "%mem_limit%";
SET exec_mem_limit = 8589934592;
SHOW VARIABLES LIKE "%query_timeout%";
SET query_timeout = 60;
```
Data Operation
Import
By Scene
| Data Source | Import Method |
|-------------|---------------|
| Object Storage (S3), HDFS | Import data using Broker |
| Local file | Import local data |
| Kafka | Subscribe to Kafka data |
| MySQL, PostgreSQL, Oracle, SQLServer | Sync data via external table |
| Import via JDBC | Sync data using JDBC |
| Import JSON format data | JSON format data import |
Divided by Import Method
| Import method name | Use method |
|--------------------|------------|
| Spark Load | Import external data via Spark |
| Broker Load | Import external storage data via Broker |
| Stream Load | Stream import data (local file and memory data) |
| Routine Load | Import Kafka data |
| Insert Into | External table imports data through INSERT |
| S3 Load | Object storage data import of S3 protocol |
| MySql Load | Local data import of MySql protocol |
Supported Data Formats
| Import Methods | Supported Formats |
|----------------|-------------------|
| Broker Load | parquet, orc, csv, gzip |
| Stream Load | csv, json, parquet, orc |
| Routine Load | csv, json |
| MySql Load | csv |
INSERT is automatically atomic.
Combined with the upstream system, Labels can be used to achieve exactly-once delivery.
Imports can be synchronous or asynchronous.
Array data can be imported, with vectorization support:
```sql
LOAD LABEL label_03_14_49_34_898986_19090452100 (
DATA INFILE("hdfs://test.hdfs.com:9000/user/test/data/sys/load/array_test.data")
INTO TABLE `test_array_table`
COLUMNS TERMINATED BY "|" (`k1`, `a1`, `a2`, `a3`, `a4`, `a5`, `a6`, `a7`, `a8`, `a9`, `a10`, `a11`, `a12`, `a13`, `b14`)
SET(a14=array_union(cast(b14 as array<string>), cast(a13 as array<string>))) WHERE size(a2) > 270)
WITH BROKER "hdfs" ("username"="test_array", "password"="")
PROPERTIES( "max_filter_ratio"="0.8" );
```

Import Scenes

Stream Load

```
PUT /api/{db}/{table}/_stream_load
```

Keep a single stream load under roughly 1 - 2 GB; larger data sets should be split into concurrent batches.

MySQL Load

```sql
LOAD DATA
LOCAL
INFILE '/path/to/local/demo.txt'
INTO TABLE demo.load_local_file_test
```

Import from HDFS:

```sql
LOAD LABEL demo.label_20231212_1
(
DATA INFILE("hdfs://1.2.3.4:8020/user/root/haha.txt")
INTO TABLE `load_hdfs_file_test`
COLUMNS TERMINATED BY "\t"
(id,age,name)
)
with HDFS (
"fs.defaultFS"="hdfs://1.2.3.4:8020",
"hdfs_user"="root"
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);
```

Check the result:

```sql
show load order by createtime desc limit 1\G;
```

S3 import is also supported.

Kafka import:

```sql
CREATE ROUTINE LOAD demo.my_first_routine_load_job ON test_1
COLUMNS TERMINATED BY ","
PROPERTIES
(
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.group.id" = "xxx",
"property.client.id" = "xxx",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
```

Supported external data sources:
- MySQL
- Oracle
- PostgreSQL
- SQLServer
- Hive
- Iceberg
- ElasticSearch
Import:

```sql
INSERT INTO table SELECT ...
INSERT INTO table VALUES(...)
```
column conversion
- Pre-filtering
- Mapping, source (a1,a2,a3) -> dest (a2, a3, a1 * 10)
- Convert the column values in the source file and import them into the table
- Through the case when function, column conversion is performed conditionally
- Convert the null value in the source file to 0 and import it. At the same time, the region id conversion in Example 2 is also performed.
Data Quality
- Filtered Rows: rows rejected due to errors such as bad data format, type mismatch, or length truncation
- Unselected Rows
- Loaded Rows
- Strict Mode
import methods
- broker load
- stream load
- routine load
- INSERT
Import Way
Broker Load
```
+
| 1. user create broker load
v
+----+----+
| |
| FE |
| |
+----+----+
|
| 2. BE etl and load the data
+--------------------------+
| | |
+---v---+ +--v----+ +---v---+
| | | | | |
| BE | | BE | | BE |
| | | | | |
+---+-^-+ +---+-^-+ +--+-^--+
| | | | | |
| | | | | | 3. pull data from broker
+---v-+-+ +---v-+-+ +--v-+--+
| | | | | |
|Broker | |Broker | |Broker |
| | | | | |
+---+-^-+ +---+-^-+ +---+-^-+
| | | | | |
+---v-+-----------v-+----------v-+-+
| HDFS/BOS/AFS cluster |
| |
+----------------------------------+
```

Load a Hive table, using ORC:

```sql
LOAD LABEL dish_2022_03_23
(
DATA INFILE("hdfs://10.220.147.151:8020/user/hive/warehouse/ods.db/ods_demo_orc_detail/*/*")
INTO TABLE doris_ods_test_detail
COLUMNS TERMINATED BY ","
FORMAT AS "orc"
(id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
COLUMNS FROM PATH AS (`day`)
SET
(rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,
commodity_name=commodity_name,commodity_price=commodity_price,member_price =member_price,cost_price=cost_price,unit=unit,
quantity=quantity,actual_price=actual_price)
)
WITH BROKER "broker_name_1"
(
"username" = "hdfs",
"password" = ""
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);
```

Check:

```sql
show load order by createtime desc limit 1\G;
CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";
```

Relevant fe.conf settings:
- min_bytes_per_broker_scanner
- max_bytes_per_broker_scanner
- max_broker_concurrency
Routine Load
The FE splits a routine load job into tasks through the JobScheduler:

```
+---------+
| Client |
+----+----+
|
+-----------------------------+
| FE | |
| +-----------v------------+ |
| | | |
| | Routine Load Job | |
| | | |
| +---+--------+--------+--+ |
| | | | |
| +---v--+ +---v--+ +---v--+ |
| | task | | task | | task | |
| +--+---+ +---+--+ +---+--+ |
| | | | |
+-----------------------------+
| | |
v v v
+---+--+ +--+---+ ++-----+
| BE | | BE | | BE |
+------+ +------+ +------+
```

Kafka import:

```sql
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%doris%"
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);
```

SSL is supported:

```sql
CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
CREATE ROUTINE LOAD db1.job1 on tbl1
PROPERTIES
(
"desired_concurrent_number"="1"
)
FROM KAFKA
(
"kafka_broker_list"= "broker1:9091,broker2:9091",
"kafka_topic" = "my_topic",
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg"
);
```

Kerberos is supported:

```sql
CREATE ROUTINE LOAD db1.job1 on tbl1
PROPERTIES (
"desired_concurrent_number"="1",
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092",
"kafka_topic" = "my_topic",
"property.security.protocol" = "SASL_PLAINTEXT",
"property.sasl.kerberos.service.name" = "kafka",
"property.sasl.kerberos.keytab" = "/etc/krb5.keytab",
"property.sasl.kerberos.principal" = "[email protected]"
);
```

Some related configuration items:
- max_routine_load_task_concurrent_num
- max_routine_load_task_num_per_be
- max_routine_load_job_num
- max_consumer_num_per_group
- max_tolerable_backend_down_num
- period_of_auto_resume_min
Spark Load
Uses an external cluster to sort and aggregate data before writing.
Stream Load and Broker Load are better suited to small amounts of data and consume Doris cluster resources, whereas Spark Load uses Spark cluster resources.
Spark Load is better suited to large-volume imports.
Execution process:
- Fe schedules and submits ETL tasks to spark cluster for execution.
- Spark cluster executes ETL to complete the preprocessing of load data. It includes global dictionary building (bitmap type), partitioning, sorting, aggregation, etc.
- After the ETL task is completed, Fe obtains the data path of each partition that has been preprocessed, and schedules the related be to execute the push task.
- Be reads data through broker and converts it into Doris underlying storage format.
- Fe schedule the effective version and complete the load job.
```
+
| 0. User create spark load job
+----v----+
| FE |---------------------------------+
+----+----+ |
| 3. FE send push tasks |
| 5. FE publish version |
+------------+------------+ |
| | | |
+---v---+ +---v---+ +---v---+ |
| BE | | BE | | BE | |1. FE submit Spark ETL job
+---^---+ +---^---+ +---^---+ |
|4. BE push with broker | |
+---+---+ +---+---+ +---+---+ |
|Broker | |Broker | |Broker | |
+---^---+ +---^---+ +---^---+ |
| | | |
+---+------------+------------+---+ 2.ETL +-------------v---------------+
| HDFS +-------> Spark cluster |
| <-------+ |
+---------------------------------+ +-----------------------------+
```

Create an external resource:

```sql
-- yarn cluster mode
CREATE EXTERNAL RESOURCE "spark0"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.jars" = "xxx.jar,yyy.jar",
"spark.files" = "/tmp/aaa,/tmp/bbb",
"spark.executor.memory" = "1g",
"spark.yarn.queue" = "queue0",
"spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
"spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
"broker" = "broker0",
"broker.username" = "user0",
"broker.password" = "password0"
);
-- spark standalone client mode
CREATE EXTERNAL RESOURCE "spark1"
PROPERTIES
(
"type" = "spark",
"spark.master" = "spark://127.0.0.1:7777",
"spark.submit.deployMode" = "client",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
"broker" = "broker1"
);
-- yarn HA mode
CREATE EXTERNAL RESOURCE sparkHA
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.executor.memory" = "1g",
"spark.yarn.queue" = "default",
"spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
"spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
"spark.hadoop.yarn.resourcemanager.address.rm1" = "xxxx:8032",
"spark.hadoop.yarn.resourcemanager.address.rm2" = "xxxx:8032",
"spark.hadoop.fs.defaultFS" = "hdfs://nameservices01",
"spark.hadoop.dfs.nameservices" = "nameservices01",
"spark.hadoop.dfs.ha.namenodes.nameservices01" = "mynamenode1,mynamenode2",
"spark.hadoop.dfs.namenode.rpc-address.nameservices01.mynamenode1" = "xxxx:8020",
"spark.hadoop.dfs.namenode.rpc-address.nameservices01.mynamenode2" = "xxxx:8020",
"spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"working_dir" = "hdfs://nameservices01/doris_prd_data/sinan/spark_load/",
"broker" = "broker_name",
"broker.username" = "username",
"broker.password" = "",
"broker.dfs.nameservices" = "nameservices01",
"broker.dfs.ha.namenodes.nameservices01" = "mynamenode1, mynamenode2",
"broker.dfs.namenode.rpc-address.nameservices01.mynamenode1" = "xxxx:8020",
"broker.dfs.namenode.rpc-address.nameservices01.mynamenode2" = "xxxx:8020",
"broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
```

Other related commands:

```sql
-- drop spark resource
DROP RESOURCE resource_name
-- show resources
SHOW RESOURCES
SHOW PROC "/resources"
-- privileges
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identity
GRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identity
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name
|
支持 kerberos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
CREATE EXTERNAL RESOURCE "spark_on_kerberos"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.jars" = "xxx.jar,yyy.jar",
"spark.files" = "/tmp/aaa,/tmp/bbb",
"spark.executor.memory" = "1g",
"spark.yarn.queue" = "queue0",
"spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
"spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
"spark.hadoop.hadoop.security.authentication" = "kerberos",
"spark.hadoop.yarn.resourcemanager.principal" = "[email protected]",
"spark.hadoop.yarn.resourcemanager.keytab" = "/home/doris/yarn.keytab",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
"broker" = "broker0",
"broker.hadoop.security.authentication" = "kerberos",
"broker.kerberos_principal" = "[email protected]",
"broker.kerberos_keytab" = "/home/doris/my.keytab"
);
```

Stream Load

Processing flow: the FE redirects the HTTP request to a BE; the client can also submit the load directly to a BE.

```
^ +
| |
| | 1A. User submit load to FE
| |
| +--v-----------+
| | FE |
5. Return result to user | +--+-----------+
| |
| | 2. Redirect to BE
| |
| +--v-----------+
+---+Coordinator BE| 1B. User submit load to BE
+-+-----+----+-+
| | |
+-----+ | +-----+
| | | 3. Distrbute data
| | |
+-v-+ +-v-+ +-v-+
|BE | |BE | |BE |
+---+ +---+ +---+
```

An example:

```shell
curl --location-trusted -u root -T date -H "label:123" http://abc.com:8030/api/test/date/_stream_load
```

Some important load parameters, passed as HTTP headers:

- label: a unique identifier, used to prevent duplicate imports
- column_separator
- line_delimiter
- max_filter_ratio
- where
- partitions
- columns
- format: csv, json, etc.
- exec_mem_limit: defaults to 2 GB
- merge_type: APPEND, DELETE, or MERGE; APPEND is the default
- two_phase_commit
- enclose
- escape
- enable_profile
- memtable_on_sink_node
- partial_columns
An example:

```shell
curl --location-trusted -u user:passwd -H "two_phase_commit:true" -T test.txt http://fe_host:http_port/api/{db}/{table}/_stream_load
{"TxnId": 18036,
"Label": "55c8ffc9-1c40-4d51-b75e-f2265b3602ef",
"TwoPhaseCommit": "true",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 100,
"NumberLoadedRows": 100,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 1031,
"LoadTimeMs": 77,
"BeginTxnTimeMs": 1,
"StreamLoadPutTimeMs": 1,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 58,
"CommitAndPublishTimeMs": 0
}
```

SQL can be embedded in the load request, for example:

```shell
curl --location-trusted -u root: -T test.csv \
    -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\") where age >= 30" \
    http://127.0.0.1:28030/api/_http_stream
```

MySql Load

Uses native MySQL LOAD DATA syntax.

Execution steps:
- FE receives the MySQL Load request from the client and then analyzes the SQL
- FE builds the MySQL Load request into a StreamLoad request
- FE selects a BE node and sends the StreamLoad request to it
- When sending the request, FE will read the local file data from the MySQL client side streamingly, and send it to the HTTP request of StreamLoad asynchronously.
- After the data transfer on the MySQL client side is completed, FE waits for the StreamLoad to complete, and displays the import success or failure information to the client side.
Example:

```sql
-- create the table
CREATE TABLE testdb.t1 (pk INT, v1 INT SUM) AGGREGATE KEY (pk) DISTRIBUTED BY hash (pk) PROPERTIES ('replication_num' = '1');
-- import file from client node
LOAD DATA LOCAL
INFILE 'client_local.csv'
INTO TABLE testdb.t1
PARTITION (partition_a, partition_b, partition_c, partition_d)
COLUMNS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
IGNORE 1 LINES
(k1, k2, v2, v10, v11)
SET (c1 = k1, c2 = k2, c3 = v10, c4 = v11)
PROPERTIES ("strict_mode" = "true")
-- import file from fe server node
LOAD DATA
INFILE '/root/server_local.csv'
INTO TABLE testdb.t1
PARTITION (partition_a, partition_b, partition_c, partition_d)
COLUMNS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
IGNORE 1 LINES
(k1, k2, v2, v10, v11)
SET (c1 = k1, c2 = k2, c3 = v10, c4 = v11)
PROPERTIES ("strict_mode" = "true")
```

Related configuration:
- mysql_load_thread_pool
- mysql_load_server_secure_path
- mysql_load_in_memory_record
S3 Load
Example:

```sql
LOAD LABEL example_db.exmpale_label_1
(
DATA INFILE("s3://your_bucket_name/your_file.txt")
INTO TABLE load_test
COLUMNS TERMINATED BY ","
)
WITH S3
(
"AWS_ENDPOINT" = "AWS_ENDPOINT",
"AWS_ACCESS_KEY" = "AWS_ACCESS_KEY",
"AWS_SECRET_KEY"="AWS_SECRET_KEY",
"AWS_REGION" = "AWS_REGION"
)
PROPERTIES
(
"timeout" = "3600"
);
```

Insert Into

Two forms:
- INSERT INTO tbl SELECT …
- INSERT INTO tbl (col1, col2, …) VALUES (1, 2, …), (1,3, …);
Example:

```sql
INSERT INTO tbl2 WITH LABEL label1 SELECT * FROM tbl3;
INSERT INTO tbl1 VALUES ("qweasdzxcqweasdzxc"), ("a");
```

INSERT ... SELECT:

```sql
INSERT INTO tbl1 WITH LABEL label1
WITH cte1 AS (SELECT * FROM tbl1), cte2 AS (SELECT * FROM tbl2)
SELECT k1 FROM cte1 JOIN cte2 WHERE cte1.k1 = 1;
INSERT INTO tbl1 (k1)
WITH cte1 AS (SELECT * FROM tbl1), cte2 AS (SELECT * FROM tbl2)
SELECT k1 FROM cte1 JOIN cte2 WHERE cte1.k1 = 1;
INSERT INTO tbl1 (k1)
select * from (
WITH cte1 AS (SELECT * FROM tbl1), cte2 AS (SELECT * FROM tbl2)
SELECT k1 FROM cte1 JOIN cte2 WHERE cte1.k1 = 1) as ret
```

Check:

```sql
show last insert\G
*************************** 1. row ***************************
TransactionId: 64067
Label: insert_ba8f33aea9544866-8ed77e2844d0cc9b
Database: default_cluster:db1
Table: t1
TransactionStatus: VISIBLE
LoadedRows: 2
FilteredRows: 0
```

JSON path is supported.
Otherwise there is nothing special about it.

Import Advanced

By default an import only succeeds when more than half of the replicas are written successfully.
Set the min_load_replica_num property to change this:

```sql
CREATE TABLE test_table1
(
k1 INT,
k2 INT
)
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 5
PROPERTIES
(
'replication_num' = '2',
'min_load_replica_num' = '1'
);
```

Modify an existing table:

```sql
ALTER TABLE test_table1
SET ( 'min_load_replica_num' = '1');
```

min_load_replica_num can also be set at the FE level.

Export

Execution process:
- The user submits an Export job to FE.
- FE calculates all the tablets to be exported and groups them based on the parallelism parameter. Each group generates multiple SELECT INTO OUTFILE query plans based on the maximum_number_of_export_partitions parameter.
- Based on the parallelism parameter, an equal number of ExportTaskExecutor are generated, and each ExportTaskExecutor is responsible for a thread, which is scheduled and executed by FE’s Job scheduler framework.
- FE’s Job scheduler schedules and executes the ExportTaskExecutor, and each ExportTaskExecutor serially executes the multiple SELECT INTO OUTFILE query plans it is responsible for.
Export to HDFS:

```sql
EXPORT TABLE db1.tbl1
PARTITION (p1,p2)
[WHERE [expr]]
TO "hdfs://host/path/to/export/"
PROPERTIES
(
"label" = "mylabel",
"column_separator"=",",
"columns" = "col1,col2",
"parallelusm" = "3"
)
WITH BROKER "hdfs"
(
"username" = "user",
"password" = "passwd"
);
```

Export to S3:

```sql
EXPORT TABLE test TO "s3://bucket/path/to/export/dir/"
WITH S3 (
"s3.endpoint" = "http://host",
"s3.access_key" = "AK",
"s3.secret_key"="SK",
"s3.region" = "region"
);
```

Check status:

```sql
show EXPORT\G;
*************************** 1. row ***************************
JobId: 14008
State: FINISHED
Progress: 100%
TaskInfo: {"partitions":[],"max_file_size":"","delete_existing_files":"","columns":"","format":"csv","column_separator":"\t","line_delimiter":"\n","db":"default_cluster:demo","tbl":"student4","tablet_num":30}
Path: hdfs://host/path/to/export/
CreateTime: 2019-06-25 17:08:24
StartTime: 2019-06-25 17:08:28
FinishTime: 2019-06-25 17:08:34
Timeout: 3600
ErrorMsg: NULL
OutfileInfo: [
[
{"fileNumber": "1",
"totalRows": "4",
"fileSize": "34bytes",
"url": "file:///127.0.0.1/Users/fangtiewei/tmp_data/export/f1ab7dcc31744152-bbb4cda2f5c88eac_"
}]
]
```

Cancel:

```sql
CANCEL EXPORT
FROM example_db
WHERE LABEL like "%example%";
```

Related configuration and recommendations:

- Concurrent Export
- exec_mem_limit
- The Export job can export data from Doris Base tables, View, and External tables, but not from Rollup Index.
- Do not export too much data in one job; export partition by partition instead
SELECT INTO OUTFILE example:

```sql
SELECT * FROM tbl
INTO OUTFILE "hdfs://path/to/result_"
FORMAT AS CSV
PROPERTIES
(
"broker.name" = "my_broker",
"column_separator" = ",",
"line_delimiter" = "\n"
);
select * from tbl1 limit 10
INTO OUTFILE "file:///home/work/path/result_";
```

Concurrent export is supported.
Below: an ordinary export, followed by a concurrent export.

```sql
select * from tbl1 limit 10 into outfile "file:///home/work/path/result_";
+------------+-----------+----------+--------------------------------------------------------------------+
| FileNumber | TotalRows | FileSize | URL |
+------------+-----------+----------+--------------------------------------------------------------------+
| 1 | 2 | 8 | file:///192.168.1.10/home/work/path/result_{fragment_instance_id}_ |
+------------+-----------+----------+--------------------------------------------------------------------+
+------------+-----------+----------+--------------------------------------------------------------------+
| FileNumber | TotalRows | FileSize | URL |
+------------+-----------+----------+--------------------------------------------------------------------+
| 1 | 3 | 7 | file:///192.168.1.10/home/work/path/result_{fragment_instance_id}_ |
| 1 | 2 | 4 | file:///192.168.1.11/home/work/path/result_{fragment_instance_id}_ |
+------------+-----------+----------+--------------------------------------------------------------------+
```

Export the table schema:

```shell
mysqldump -h127.0.0.1 -P9030 -uroot --no-tablespaces --databases test --tables table1 --no-data
```

Update and Delete

There are three ways to import the data (merge types: APPEND, DELETE, MERGE).
The implementation adds a hidden __DORIS_DELETE_SIGN__ column:

- remove: expressed as the condition __DORIS_DELETE_SIGN__ != true
- import: the hidden column is set according to the DELETE ON condition
- read: a hidden filter __DORIS_DELETE_SIGN__ != true is added to queries (see the sketch after this list)
- compact: compaction physically removes rows marked as deleted
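A hedged sketch of what the hidden read filter amounts to, reusing example_tbl_unique from above (writing the predicate explicitly normally only works after SET show_hidden_columns=true):

```sql
-- an ordinary SELECT on a Unique table is effectively rewritten to:
SELECT user_id, username, city
FROM example_tbl_unique
WHERE __DORIS_DELETE_SIGN__ != true;
```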
Examples.

Broker load:

```sql
LOAD LABEL db1.label1
(
[MERGE|APPEND|DELETE] DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
INTO TABLE tbl1
COLUMNS TERMINATED BY ","
(tmp_c1,tmp_c2, label_c3)
SET
(
id=tmp_c2,
name=tmp_c1
)
[DELETE ON label_c3=true]
)
WITH BROKER 'broker'
(
"username"="user",
"password"="pass"
)
PROPERTIES
(
"timeout" = "3600"
);
```

Routine load:

```sql
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
[WITH MERGE|APPEND|DELETE]
COLUMNS(k1, k2, k3, v1, v2, label),
WHERE k1> 100 and k2 like "%doris%"
[DELETE ON label=true]
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);
```

Show hidden columns:

```sql
SET show_hidden_columns=true;
desc example_tbl_unique;
+-----------------------+--------------+------+-------+---------+---------+
| Field | Type | Null | Key | Default | Extra |
+-----------------------+--------------+------+-------+---------+---------+
| user_id | LARGEINT | No | true | NULL | |
| username | VARCHAR(50) | No | true | NULL | |
| city | VARCHAR(20) | Yes | false | NULL | REPLACE |
| age | SMALLINT | Yes | false | NULL | REPLACE |
| sex | TINYINT | Yes | false | NULL | REPLACE |
| phone | LARGEINT | Yes | false | NULL | REPLACE |
| address | VARCHAR(500) | Yes | false | NULL | REPLACE |
| register_time | DATETIME | Yes | false | NULL | REPLACE |
| __DORIS_DELETE_SIGN__ | TINYINT | No | false | 0 | REPLACE |
| __DORIS_VERSION_COL__ | BIGINT | No | false | 0 | REPLACE |
+-----------------------+--------------+------+-------+---------+---------+
```

Doris does not do anything special for concurrent updates; it is an OLAP system after all.
Updates are row-based: a whole row is rewritten even if some of its columns are unchanged.
Two concurrent operations updating the same row may produce dirty data.
Update steps:
- Step 1: Read the rows that satisfy WHERE order id=1 (1, 100, ‘pending payment’)
- Step 2: Change the order status of the row from ‘Pending Payment’ to ‘Pending Shipping’ (1, 100, ‘Pending shipment’)
- Step 3: Insert the updated row back into the table to achieve the updated effect.
To improve performance, partial column updates are supported.
The Unique Key model currently supports column updates only in the Merge-on-Write implementation.
The partial column update mechanism is implemented with MVCC.
Usage Recommendations
- High write performance, lower query performance: Aggregate Key model
- High query performance, lower write performance: Unique Key Merge-on-Write
- For read-heavy, write-light workloads, picking the wrong model (e.g. the Aggregate Key model) can mean a 5 - 10x performance difference
Enable:

```sql
set enable_unique_key_partial_update=true
```

Delete operations:

```sql
mysql> delete from test_tbl PARTITION p1 where k1 = 1;
Query OK, 0 rows affected (0.04 sec)
{'label':'delete_e7830c72-eb14-4cb9-bbb6-eebd4511d251', 'status':'COMMITTED',
'txnId':'4005', 'err':'delete job is committed but may be taking effect later' }
```

Parameters:
- tablet_delete_timeout_second
- load_straggler_wait_second
- query_timeout
- max_allowed_in_element_num_of_delete
Check:

```sql
show delete from test_db;
+-----------+---------------+---------------------+-----------------+----------+
| TableName | PartitionName | CreateTime | DeleteCondition | State |
+-----------+---------------+---------------------+-----------------+----------+
| empty_tbl | p3 | 2020-04-15 23:09:35 | k1 EQ "1" | FINISHED |
| test_tbl | p4 | 2020-04-15 23:09:53 | k1 GT "80" | FINISHED |
+-----------+---------------+---------------------+-----------------+----------+
```

Sequence Column

- Solves the out-of-order problem during imports
- Adds a hidden __DORIS_SEQUENCE_COL__ column
- For example, point the hidden column at a date column of the table, so rows are ordered by time
- On import, the row with the larger time overwrites the one with the smaller time
- On read, ordering by time means the later value replaces earlier ones
Example:

```sql
CREATE TABLE test.test_table
(
user_id bigint,
date date,
group_id bigint,
modify_date date,
keyword VARCHAR(128)
)
UNIQUE KEY(user_id, date, group_id)
DISTRIBUTED BY HASH (user_id) BUCKETS 32
PROPERTIES(
"function_column.sequence_col" = 'modify_date',
"replication_num" = "1",
"in_memory" = "false"
);
```

Check the hidden columns:

```sql
SET show_hidden_columns=true;
desc test_table;
+------------------------+--------------+------+-------+---------+---------+
| Field | Type | Null | Key | Default | Extra |
+------------------------+--------------+------+-------+---------+---------+
| user_id | BIGINT | Yes | true | NULL | |
| date | DATE | Yes | true | NULL | |
| group_id | BIGINT | Yes | true | NULL | |
| modify_date | DATE | Yes | false | NULL | REPLACE |
| keyword | VARCHAR(128) | Yes | false | NULL | REPLACE |
| __DORIS_DELETE_SIGN__ | TINYINT | No | false | 0 | REPLACE |
| __DORIS_VERSION_COL__ | BIGINT | No | false | 0 | REPLACE |
| __DORIS_SEQUENCE_COL__ | DATE | Yes | false | NULL | REPLACE |
+------------------------+--------------+------+-------+---------+---------+
```

After loading the data, query it:

```
1 2020-02-22 1 2020-02-21 a
1 2020-02-22 1 2020-02-22 b
1 2020-02-22 1 2020-03-05 c
1 2020-02-22 1 2020-02-26 d
1 2020-02-22 1 2020-02-23 e
1 2020-02-22 1 2020-02-24 b
select * from test_table;
+---------+------------+----------+-------------+---------+
| user_id | date | group_id | modify_date | keyword |
+---------+------------+----------+-------------+---------+
| 1 | 2020-02-22 | 1 | 2020-03-05 | c |
+---------+------------+----------+-------------+---------+
```

Although row c sits in the middle of the input, its modify_date is the largest, so it is the row that is kept.
Advanced Usage
Alter Table
Schema changes include:
- Add and delete columns
- Modify column type
- Adjust column order
- Add and modify Bloom Filter
- Add and delete bitmap index
Basic concept:

```
+----------+
| Load Job |
+----+-----+
|
| Load job generates both origin and new index data
|
| +------------------+ +---------------+
| | Origin Index | | Origin Index |
+------> New Incoming Data| | History Data |
| +------------------+ +------+--------+
| |
| | Convert history data
| |
| +------------------+ +------v--------+
| | New Index | | New Index |
+------> New Incoming Data| | History Data |
+------------------+ +---------------+
```

An example. The original table:

```
+-----------+-------+------+------+------+---------+-------+
| IndexName | Field | Type | Null | Key | Default | Extra |
+-----------+-------+------+------+------+---------+-------+
| tbl1 | k1 | INT | No | true | N/A | |
| | k2 | INT | No | true | N/A | |
| | k3 | INT | No | true | N/A | |
| | | | | | | |
| rollup2 | k2 | INT | No | true | N/A | |
| | | | | | | |
| rollup1 | k1 | INT | No | true | N/A | |
| | k2 | INT | No | true | N/A | |
+-----------+-------+------+------+------+---------+-------+
```

Add columns:

```sql
ALTER TABLE tbl1
ADD COLUMN k4 INT default "1" to rollup1,
ADD COLUMN k4 INT default "1" to rollup2,
ADD COLUMN k5 INT default "1" to rollup2;
```

Result:

```
+-----------+-------+------+------+------+---------+-------+
| IndexName | Field | Type | Null | Key | Default | Extra |
+-----------+-------+------+------+------+---------+-------+
| tbl1 | k1 | INT | No | true | N/A | |
| | k2 | INT | No | true | N/A | |
| | k3 | INT | No | true | N/A | |
| | k4 | INT | No | true | 1 | |
| | k5 | INT | No | true | 1 | |
| | | | | | | |
| rollup2 | k2 | INT | No | true | N/A | |
| | k4 | INT | No | true | 1 | |
| | k5 | INT | No | true | 1 | |
| | | | | | | |
| rollup1 | k1 | INT | No | true | N/A | |
| | k2 | INT | No | true | N/A | |
| | k4 | INT | No | true | 1 | |
+-----------+-------+------+------+------+---------+-------+
```

Notes:

- Only one schema change can be in progress at a time
- Schema changes do not block imports or queries
- The partition column and bucket column cannot be modified
REPLACE operation:

```sql
ALTER TABLE [db.]tbl1 REPLACE WITH tbl2
[PROPERTIES('swap' = 'true')];
```

Explanation:

- If swap = true: table B is renamed to table A, and table A is renamed to table B
- If swap = false: table A is dropped and table B is renamed to table A
Doris Partition
Dynamic partitioning automatically creates upcoming partitions according to the configured rules,
and also decides, by rule, whether to keep or drop historical partitions.
```sql
CREATE TABLE tbl1
(
k1 DATE,
...
)
PARTITION BY RANGE(k1) ()
DISTRIBUTED BY HASH(k1)
PROPERTIES
(
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-7",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "32"
);
```

Explanation:

- time_unit: partitions are created per day
- start: keep the most recent 7 days of partitions; if not set, all history is kept
- end: pre-create partitions for the next 3 days (see the sketch after this list)
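The rules can also be adjusted on an existing table; a hedged sketch using the same property names as the CREATE TABLE above:

```sql
-- pre-create partitions 5 days ahead and keep the last 30 days
ALTER TABLE tbl1 SET (
    "dynamic_partition.end" = "5",
    "dynamic_partition.start" = "-30"
);
```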
Check:

```sql
SHOW DYNAMIC PARTITION TABLES;
+-----------+--------+----------+-------------+------+--------+---------+-----------+----------------+---------------------+--------+------------------------+----------------------+-------------------------+
| TableName | Enable | TimeUnit | Start | End | Prefix | Buckets | StartOf | LastUpdateTime | LastSchedulerTime | State | LastCreatePartitionMsg | LastDropPartitionMsg | ReservedHistoryPeriods |
+-----------+--------+----------+-------------+------+--------+---------+-----------+----------------+---------------------+--------+------------------------+----------------------+-------------------------+
| d3 | true | WEEK | -3 | 3 | p | 1 | MONDAY | N/A | 2020-05-25 14:29:24 | NORMAL | N/A | N/A | [2021-12-01,2021-12-31] |
| d5 | true | DAY | -7 | 3 | p | 32 | N/A | N/A | 2020-05-25 14:29:24 | NORMAL | N/A | N/A | NULL |
| d4 | true | WEEK | -3 | 3 | p | 1 | WEDNESDAY | N/A | 2020-05-25 14:29:24 | NORMAL | N/A | N/A | NULL |
| d6 | true | MONTH | -2147483648 | 2 | p | 8 | 3rd | N/A | 2020-05-25 14:29:24 | NORMAL | N/A | N/A | NULL |
| d2 | true | DAY | -3 | 3 | p | 32 | N/A | N/A | 2020-05-25 14:29:24 | NORMAL | N/A | N/A | NULL |
| d7 | true | MONTH | -2147483648 | 5 | p | 8 | 24th | N/A | 2020-05-25 14:29:24 | NORMAL | N/A | N/A | NULL |
+-----------+--------+----------+-------------+------+--------+---------+-----------+----------------+---------------------+--------+------------------------+----------------------+-------------------------+
```

Temporary partitions:

```sql
ALTER TABLE tbl1 ADD TEMPORARY PARTITION tp1 VALUES LESS THAN ("2020-02-01");
ALTER TABLE tbl2 ADD TEMPORARY PARTITION tp1 VALUES [("2020-01-01"), ("2020-02-01"));
ALTER TABLE tbl1 ADD TEMPORARY PARTITION tp1 VALUES LESS THAN ("2020-02-01")
("replication_num" = "1")
DISTRIBUTED BY HASH (k1) BUCKETS 5;
ALTER TABLE tbl3 ADD TEMPORARY PARTITION tp1 VALUES IN ("Beijing", "Shanghai");
ALTER TABLE tbl4 ADD TEMPORARY PARTITION tp1 VALUES IN ((1, "Beijing"), (1, "Shanghai"));
ALTER TABLE tbl3 ADD TEMPORARY PARTITION tp1 VALUES IN ("Beijing", "Shanghai")
("replication_num" = "1")
DISTRIBUTED BY HASH(k1) BUCKETS 5;
```

Data Cache

Enable the SQL cache:

```
vim fe/conf/fe.conf
cache_enable_sql_mode=true
```

```sql
MySQL [(none)]> set [global] enable_sql_cache=true;
```

Enable Partition Cache

```
vim fe/conf/fe.conf
cache_enable_partition_mode=true
```

```sql
MySQL [(none)]> set [global] enable_partition_cache=true;
```

AutoBucket

Example:

```sql
-- old version of the creation syntax for specifying the number of buckets
DISTRIBUTED BY HASH(site) BUCKETS 20
-- Newer versions use the creation syntax for automatic bucket estimation
DISTRIBUTED BY HASH(site) BUCKETS AUTO
properties("estimate_partition_size" = "100G")
```

Broker

Brokers require only a small amount of memory and take no part in computation; they mainly perform RPC reads and writes.

Supported systems:
- Apache HDFS
- Aliyun OSS
- Baidu Cloud BOS
- Tencent Cloud CHDFS
- Tencent Cloud GFS (since 1.2.0)
- Huawei Cloud OBS (since 1.2.0)
- Amazon S3
- JuiceFS (since 2.0.0)
- GCS (since 2.0.0)
Overall design:

```
+----+ +----+
| FE | | BE |
+-^--+ +--^-+
| |
| |
+-v---------v-+
| Broker |
+------^------+
|
|
+------v------+
|HDFS/BOS/AFS |
+-------------+
```

Resource Management

Used to connect to external compute resources, storage resources, ETL systems, and so on.

Syntax:
- CREATE RESOURCE
- DROP RESOURCE
- SHOW RESOURCES
Supported resources:
- Spark resource: do ETL work
- ODBC resource: query and import data from external tables
Spark example:

```sql
CREATE EXTERNAL RESOURCE "spark0"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.jars" = "xxx.jar,yyy.jar",
"spark.files" = "/tmp/aaa,/tmp/bbb",
"spark.executor.memory" = "1g",
"spark.yarn.queue" = "queue0",
"spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
"spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
"broker" = "broker0",
"broker.username" = "user0",
"broker.password" = "password0"
);
```

ODBC example:

```sql
CREATE EXTERNAL RESOURCE `oracle_odbc`
PROPERTIES (
"type" = "odbc_catalog",
"host" = "192.168.0.1",
"port" = "8086",
"user" = "test",
"password" = "test",
"database" = "test",
"odbc_type" = "oracle",
"driver" = "Oracle 19 ODBC driver"
);
```

Orthogonal BITMAP Calculation

When the data volume is very large (over 100 million rows, around 10 GB), bitmap computation becomes slow.
The data is split into multiple buckets, and each bucket is computed separately.
Two processing stages are needed:

- Group the data by id range, e.g. 1 - 100, 100 - 200, and so on
- Aggregate each group separately; an upper-level operator then merges the partial results (see the sketch after the function list below)
- This reduces the overall computation time considerably

Related functions:
- orthogonal_bitmap_intersect
- orthogonal_bitmap_intersect_count
- orthogonal_bitmap_union_count
- orthogonal_bitmap_expr_calculate
- orthogonal_bitmap_expr_calculate_count
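A hedged usage sketch of one of these functions; the table and columns are hypothetical (user_id is assumed to be a BITMAP value column and tag a bucketing key):

```sql
-- each bucket computes its local union; the upper operator merges the partial results
SELECT orthogonal_bitmap_union_count(user_id)
FROM user_tag_bitmap
WHERE tag IN (13080800, 11110200);
```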
Approximate Deduplication Using HLL
This uses the HyperLogLog approximate (probabilistic) algorithm.
Related functions:
- HLL_UNION_AGG(hll)
- HLL_CARDINALITY
- HLL_HASH
Example:

```sql
create table test_hll(
dt date,
id int,
name char(10),
province char(10),
os char(10),
pv hll hll_union
)
Aggregate KEY (dt,id,name,province,os)
distributed by hash(id) buckets 10
PROPERTIES(
"replication_num" = "1",
"in_memory"="false"
);
```

Test data:

```
2022-05-05,10001,测试01,北京,windows
2022-05-05,10002,测试01,北京,linux
2022-05-05,10003,测试01,北京,macos
2022-05-05,10004,测试01,河北,windows
2022-05-06,10001,测试01,上海,windows
2022-05-06,10002,测试01,上海,linux
2022-05-06,10003,测试01,江苏,macos
2022-05-06,10004,测试01,陕西,windows
```

Load:

```shell
curl --location-trusted -u root: -H "label:label_test_hll_load" \
-H "column_separator:," \
-H "columns:dt,id,name,province,os, pv=hll_hash(id)" -T test_hll.csv \
http://FE_HOST:8030/api/test/test_hll/_stream_load
```

Query:

```sql
select HLL_UNION_AGG(pv) from test_hll;
+---------------------+
| hll_union_agg(`pv`) |
+---------------------+
| 4 |
+---------------------+
SELECT COUNT(DISTINCT pv) FROM test_hll;
+----------------------+
| count(DISTINCT `pv`) |
+----------------------+
| 4 |
+----------------------+
select HLL_UNION_AGG(pv) from test_hll group by dt;
+-------------------+
| hll_union_agg(pv) |
+-------------------+
| 4 |
| 4 |
+-------------------+
```

Variable

Usage:

```sql
SHOW VARIABLES;
SHOW VARIABLES LIKE '%time_zone%';
```

Set at the session level:

```sql
SET exec_mem_limit = 137438953472;
SET forward_to_master = true;
SET time_zone = "Asia/Shanghai";
```

Set at the global level:

```sql
SET GLOBAL exec_mem_limit = 137438953472
```

SQL modes: for example, some databases treat || as string concatenation while others treat it as logical OR (an example follows the block below).

```sql
select @@global.sql_mode
select @@session.sql_mode
show global variables
show session variables
```
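For example (a hedged sketch; PIPES_AS_CONCAT is one of the MySQL-compatible modes), switching how || is interpreted:

```sql
-- treat || as string concatenation instead of logical OR
SET sql_mode = 'PIPES_AS_CONCAT';
SELECT 'a' || 'b';  -- 'ab' under PIPES_AS_CONCAT
```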
When creating a target table based on the original table's schema, some properties and functions do not need to be identical (see the sketch after this list):

- Properties such as storage_policy
- colocate_with
- Functions such as auto bucket
- dynamic partition
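A minimal sketch, assuming this refers to CREATE TABLE ... LIKE (table names are hypothetical):

```sql
-- copy the schema of orig_tbl into new_tbl;
-- properties such as storage_policy or colocate_with need not carry over identically
CREATE TABLE example_db.new_tbl LIKE example_db.orig_tbl;
```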
File Manager
Downloads files such as CA certificates and stores them locally.
It uses BDBJE (Oracle Berkeley DB Java Edition), the distributed embedded database that the FE uses to persist metadata.
Example:

```sql
CREATE FILE "test_hll.csv"
PROPERTIES
(
"url" = "http://192.168.12.71:9999/test_hll.csv",
"catalog" = "internal"
);
```

Check:

```sql
SHOW FILE FROM demo;
+-------+----------------------+----------+--------------+----------+-----------+----------------------------------+
| Id | DbName | Catalog | FileName | FileSize | IsContent | MD5 |
+-------+----------------------+----------+--------------+----------+-----------+----------------------------------+
| 12097 | default_cluster:demo | internal | test_hll.csv | 320 | true | 0b197c5ac2c2ad64a8640b3fb8d7d604 |
+-------+----------------------+----------+--------------+----------+-----------+----------------------------------+
```

Files are kept in memory and also persisted to storage.

Related configuration:
- Small_file_dir
- max_small_file_size_bytes
- max_small_file_number
- be config: Small_file_dir
Cold Hot Separation
Cold data can be stored on S3 or HDFS.

```sql
CREATE RESOURCE "remote_hdfs" PROPERTIES (
"type"="hdfs",
"fs.defaultFS"="fs_host:default_fs_port",
"hadoop.username"="hive",
"hadoop.password"="hive",
"dfs.nameservices" = "my_ha",
"dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
"dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
"dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_prot",
"dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
```

Create a storage policy:

```sql
CREATE STORAGE POLICY test_policy PROPERTIES (
"storage_resource" = "remote_hdfs",
"cooldown_ttl" = "300"
);
```

Use the storage policy:

```sql
CREATE TABLE IF NOT EXISTS create_table_use_created_policy (
k1 BIGINT,
k2 LARGEINT,
v1 VARCHAR(2048)
)
UNIQUE KEY(k1)
DISTRIBUTED BY HASH (k1) BUCKETS 3
PROPERTIES(
"storage_policy" = "test_policy"
);
```

Supported schema changes:
- Add and delete columns
- Modify column type
- Adjust column order
- Add and modify Bloom Filter
- Add and delete bitmap index
Compute Node
By default Doris uses a share-nothing architecture: compute and storage are bound together.
Doris can also query data in a data lake, but in that case its own storage capacity is somewhat wasted.
BEs can be configured as compute-only nodes, which effectively turns this into a share-disk architecture.
BE nodes support two modes:

- hybrid node: has both compute and storage capabilities (the default)
- compute node: compute capability only

Cross-catalog queries are handed to compute nodes by default.
Modify be.conf:

```
be_node_role = computation
```

Check status:

```sql
show backends\G
*************************** 1. row ***************************
BackendId: 10010
Cluster: default_cluster
IP: 10.248.181.219
HeartbeatPort: 9050
BePort: 9060
HttpPort: 8040
BrpcPort: 8060
LastStartTime: 2022-11-30 23:01:40
LastHeartbeat: 2022-12-05 15:01:18
Alive: true
SystemDecommissioned: false
ClusterDecommissioned: false
TabletNum: 753
DataUsedCapacity: 1.955 GB
AvailCapacity: 202.987 GB
TotalCapacity: 491.153 GB
UsedPct: 58.67 %
MaxDiskUsedPct: 58.67 %
RemoteUsedCapacity: 0.000
...
HeartbeatFailureCounter: 0
NodeRole: computation
```

Row to Column

Use the EXPLODE function to generate a virtual table.

Example:
```sql
CREATE TABLE `person` (
`id` int(11) NULL,
`name` text NULL,
`age` int(11) NULL,
`class` int(11) NULL,
`address` text NULL
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
);
```

Insert:

```sql
INSERT INTO person VALUES
(100, 'John', 30, 1, 'Street 1'),
(200, 'Mary', NULL, 1, 'Street 2'),
(300, 'Mike', 80, 3, 'Street 3'),
(400, 'Dan', 50, 4, 'Street 4');
```

Query:

```sql
mysql> SELECT * FROM person
-> LATERAL VIEW EXPLODE(ARRAY(30, 60)) tableName AS c_age;
+------+------+------+-------+----------+-------+
| id | name | age | class | address | c_age |
+------+------+------+-------+----------+-------+
| 100 | John | 30 | 1 | Street 1 | 30 |
| 100 | John | 30 | 1 | Street 1 | 60 |
| 200 | Mary | NULL | 1 | Street 2 | 30 |
| 200 | Mary | NULL | 1 | Street 2 | 60 |
| 300 | Mike | 80 | 3 | Street 3 | 30 |
| 300 | Mike | 80 | 3 | Street 3 | 60 |
| 400 | Dan | 50 | 4 | Street 4 | 30 |
| 400 | Dan | 50 | 4 | Street 4 | 60 |
+------+------+------+-------+----------+-------+
```

Best Practice

Query plan analysis:

```sql
EXPLAIN select * from test_hll as h join test_table as t on h.id = t.user_id where user_id > 1;
```

A more detailed textual form:

```sql
EXPLAIN VERBOSE select ...
```

A graphical query plan, similar to Impala's:

```sql
EXPLAIN GRAPH select ...
```

Similar to printing a Spark logical plan via toString; this variant shows the analyzed plan:

```sql
EXPLAIN ANALYZED PLAN select ...;
```

The plan after query rewriting:

```sql
EXPLAIN REWRITTEN PLAN select ...;
```

The optimized logical plan:

```sql
EXPLAIN OPTIMIZED PLAN select ...;
```

The simplified plan shape:

```sql
EXPLAIN SHAPE PLAN select ...;
```

Plan fragments are connected through the following two operators:
- DataStreamSink
- ExchangeNode
View the query profile:

```sql
SET is_report_success=true;
```

Debug logging: for the FE there are three ways to enable it:

- Modify the configuration file (requires a restart)
- Change it via the Web UI (no restart needed)
- Change it via the HTTP API (no restart needed)

For the configuration-file approach, edit fe.conf:
```
# Only enable Debug log for class org.apache.doris.catalog.Catalog
sys_log_verbose_modules=org.apache.doris.catalog.Catalog
# Open the Debug log of all classes under the package org.apache.doris.catalog
sys_log_verbose_modules=org.apache.doris.catalog
# Enable Debug logs for all classes under package org
sys_log_verbose_modules=org
```

For the BE, modify be.conf:

```
sys_log_verbose_modules=plan_fragment_executor,olap_scan_node
sys_log_verbose_level=3
```

Compaction

- Vertical Compaction
  - Compacts several columns as a group; compared with traditional row-based compaction, memory consumption is about 1/10 and the compression ratio improves by roughly 15%
- Segment Compaction
  - Targets data loading, which can otherwise produce many small segment files

Related configuration:
- enable_vertical_compaction
- vertical_compaction_num_columns_per_group
- vertical_compaction_max_segment_size
- enable_segcompaction
- segcompaction_batch_size
WEB UI
- FE: http://FE_HOST:8030/
- BE: http://BE_HOST:8040/