- 1. 曹 鲁
2017.9.2CarbonData
Partition 功能介绍上汽集团
CarbonData实践分享与
- 2. 关于我曹 鲁
2010年毕业于武汉大学计算机学院
曾负责某金融行业公司BI、ETL系统开发,某互联网电商行业公司数据仓库的容量管理、性能调优等
2016年加入上汽集团数据业务部
负责大数据平台架构设计与开发
关注大数据技术与开源社区
Mail:caolu@saicmotor.com
- 3. 上汽集团数据业务部主要负责
规划和实施数据管理体系
建设大数据基础架构和分析平台
拓展和提升集团内企业数据业务能力
推动人工智能技术在集团业务中的应用
为上汽集团战略转型和创新发展提供支持和服务。
- 4. 议题CarbonData Partition 功能介绍
CarbonData背景、文件结构、建索引过程简介
Partition功能将带来的改变
Partition Table的建表语句
Partition Table的数据加载过程
Partition Table的查询过程
Partition 的新增(add)、拆分(split)及删除(drop)
上汽集团的CarbonData实践
- 5. CarbonData Partition 功能介绍Apache CarbonData
是大数据平台上一种带索引的列式存储数据格式2016年6月由华为公司贡献至apache基金会,成为apache孵化项目
2017年4月正式毕业为apache顶级项目
- 6. CarbonData Partition 功能介绍 —— 文件结构简介Blocklet: 文件内的数据块
Column chunk: Blocklet内的列数据
元数据和索引信息
Header:Version,Schema
Footer: Blocklet Offset, Index & statistics Carbon Data File Blocklet 1Column 1 Chunk
Column 2 Chunk…Column n Chunk Blocklet NFile Footer…Blocklet Offset, Index & StatsFile HeaderVersionSchema Page1…Page2Page3
- 7. CarbonData Partition 功能介绍 —— 索引生成过程字典编码 + 排序MDK => Multi Dimension KeyYearsQuartersMonthsTerritoryCountryQuantitySales2003QTR1JanEMEAGermany14211,4322003QTR1JanAPACChina54154,7022003QTR1JanEMEASpain44344,6222003QTR1FebEMEADenmark54558,8712003QTR1FebEMEAItaly67556,1812003QTR1MarAPACIndia529,7492003QTR1MarEMEAUK57051,0182003QTR1MarJapanJapan56155,2452003QTR2AprAPACAustralia52550,3982003QTR2AprEMEAGermany14411,532[1,1,1,1,1] : [142,11432]
[1,1,1,3,2] : [541,54702]
[1,1,1,1,3] : [443,44622]
[1,1,2,1,4] : [545,58871]
[1,1,2,1,5] : [675,56181]
[1,1,3,3,6] : [52,9749]
[1,1,3,1,7] : [570,51018]
[1,1,3,2,8] : [561,55245]
[1,2,4,3,9] : [525,50398]
[1,2,4,1,1] : [144,11532]
Blocklet (Columnar view)Sort
(MDK Index)[1,1,1,1,1] : [142,11432]
[1,1,1,1,3] : [443,44622]
[1,1,1,3,2] : [541,54702]
[1,1,2,1,4] : [545,58871]
[1,1,2,1,5] : [675,56181]
[1,1,3,1,7] : [570,51018]
[1,1,3,2,8] : [561,55245] [1,1,3,3,6] : [52,9749] [1,2,4,1,1] : [144,11532]
[1,2,4,3,9] : [525,50398]
Sorted MDK Index1
1
1
1
1
1
1
1
1
11
1
1
1
1
1
1
1
2
21
1
1
2
2
3
3
3
4
41
1
3
1
1
1
2
3
1
3142
443
541
545
675
570
561
52
144
52511432
44622
54702
58871
56181
51018
55245
9749
11532
50398C1 C2 C3 C4 C5 C6 C71
3
2
4
5
7
8
6
1
9Encoding
- 8. Blocklet 1111111120001112125000112111120001122125000113111120001132125000Blocklet 2121323110001223231100012332311000131434200013153410001324342000Blocklet 3132534100013343420001335341000141411200001424112000014341120000Blocklet 4211111120002112125000212111120002122125000213111120002132125000Blocklet IndexBlocklet1Start Key1End Key1Start Key1End Key4Start Key1End Key2Start Key3End Key4Start Key1End Key1Start Key2End Key2Start Key3End Key3Start Key4End Key4File FooterBlockletC1(Min, Max)….C7(Min, Max)Blocklet4Start Key4End Key4C1(Min, Max)….C7(Min, Max)C1(Min,Max)…C7(Min,Max)C1(Min,Max)…C7(Min,Max)C1(Min,Max)…C7(Min,Max)C1(Min,Max)…C7(Min,Max)CarbonData Partition 功能介绍 —— 索引生成过程
- 9. CarbonData对比Parquet查询性能测试(Dec. 2016, CarbonData 0.2.0)CarbonData Partition 功能介绍 —— 初次性能测试没有排序维度过滤条件的聚合查询,即无法使用Btree索引
有排序维度列作为过滤条件,但聚合列在MDKey中排序较为靠后的聚合查询
相对较慢的查询场景:
- 10. CarbonData Partition 功能介绍为什么我们要做Partition功能?Partition功能将带来哪些改变?or
- 11. 改变1:数据将基于Partition列更为集中存储,查询时可过滤掉大量block,减少spark task数量
P1P2P3P4查询引擎CarbonData Partition 功能介绍 —— Partition带来的改变
- 12. CarbonData Partition 功能介绍 —— Partition带来的改变Col ACol BCol CCol DCol E.
.
.
.
.
.
.
.
.
.
.
.
.Col ACol BCol CCol E.
.
.
.
.
..
.
.
.
.
.Col DP1P2改变2:可以使其他列在排序中更靠前,提升查询性能
- 13. CarbonData Partition 功能介绍目前支持三种类型Partition Table:
Hash Partition
Range Partition
List Partition
Range Interval(Developing)
- 14. CarbonData Partition 功能介绍CREATE TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name data_type , ...)]
PARTITIONED BY (partition_col_name data_type)
STORED BY 'carbondata'
[TBLPROPERTIES ('PARTITION_TYPE'='HASH', 'PARTITION_NUM'='N' ...)]
//N is the number of hash partitionsHash Partition DDL Syntaxcreate table if not exists hash_partition_table(
col_A String,
col_B Int,
col_C Long,
col_D Decimal(10,2),
col_F Timestamp )
partitioned by (col_E Long)
stored by 'carbondata' tblproperties('partition_type'='Hash','partition_num'='9')Example
- 15. CarbonData Partition 功能介绍CREATE TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name data_type , ...)]
PARTITIONED BY (partition_col_name data_type)
STORED BY 'carbondata'
[TBLPROPERTIES ('PARTITION_TYPE'='RANGE', 'RANGE_INFO'='2014-01-01, 2015-01-01, 2016-01-01' ...)]Range Partition DDL Syntaxcreate table if not exists range_partition_table(
col_A String,
col_B Int,
col_C Long,
col_D Decimal(10,2),
col_E Long )
partitioned by (col_F Timestamp)
stored by 'carbondata'
tblproperties('partition_type'='Range',
'range_info'='2015-01-01, 2016-01-01, 2017-01-01, 2017-02-01')ExampleRangeInfo 必须为升序
- 16. CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name data_type , ...)]
PARTITIONED BY (partition_col_name data_type)
STORED BY 'carbondata'
[TBLPROPERTIES ('PARTITION_TYPE'='LIST', 'LIST_INFO'='A, B, C' ...)]List Partition DDL Syntaxcreate table if not exists list_partition_table(
col_B Int,
col_C Long,
col_D Decimal(10,2),
col_E Long,
col_F Timestamp )
partitioned by (col_A String)
stored by 'carbondata'
tblproperties('partition_type'='List', 'list_info'='aaaa, bbbb, (cccc, dddd), eeee')ExampleCarbonData Partition 功能介绍ListInfo支持One Level Group
- 17. Syntax:CarbonData Partition 功能介绍 —— Show Partition SHOW PARTITIONS [db_name.]table_name+--------------------+
|partition |
+--------------------+
|vin = HASH_NUMBER(5)|
+--------------------++--------------------------+
|partition |
+--------------------------+
|0, country = DEFAULT |
|1, country = China |
|2, country = UK, US |
|3, country = Japan |
|7, country = Canada |
|8, country = Russia |
|9, country = Fiji |
|5, country = Korea |
|6, country = India |
+--------------------------++-------------------------------------+
|partition |
+-------------------------------------+
|0, logdate = DEFAULT |
|1, logdate < 2014/01/01 |
|2, 2014/01/01 <= logdate < 2015/01/01|
|3, 2015/01/01 <= logdate < 2016/01/01|
+-------------------------------------+Partition IdHash PartitionRange PartitionList Partition
- 18. CarbonData Partition 功能介绍 ——数据加载过程DataNodeDataNodeDataNodeReaderReaderReaderHDFSScan RDDrow => (partitionValue, row)ShuffleWriterWriterWriterWriterLoad RDDRdd.partitionby(partitioner)
- 19. CarbonData Partition 功能介绍 ——查询过程1. 根据SQL中的过滤条件=, <=, <, >, >=, in, not in以及表达式右值确定命中的partitionIdExample:
某表A以年龄为分区字段,创建range partition table
分区信息为(’range_info’ = ‘10, 20, 30, 40, 50, 60, 70, 80’)
SQL过滤条件: age >= 20 and age < 40
得到命中的分区ID为3, 42. 如果有其他在排过序的维度列有过滤条件,则在driver端根据Btree索引获取blocklet 所在的文件名,如没有则获取全部,再根据文件名中的partitionId,筛选得到需要读取的文件,最后再下发spark task进行读取Partition Id
- 20. CarbonData Partition 功能介绍 —— Add Partition Syntax:ALTER TABLE [db_name].table_name ADD PARTITION('new_partition')0123N…...Max1. 修改PartitionInfo2. 读取默认分区(PartitionId为0)数据并根据新的PartitionInfo重新导入数据00Max3. 删除旧的默认分区数据文件Process:0123N…...Max
- 21. CarbonData Partition 功能介绍 —— Split Partition Syntax:ALTER TABLE [db_name].table_name SPLIT PARTITION(partition_id) INTO('new_partition1', 'new_partition2'...)Process:1. 修改PartitionInfo012356478012564Note:不能Split 0分区(默认分区)
- 22. CarbonData Partition 功能介绍 —— Split Partition Process:2.读取目标分区数据并根据新的PartitionInfo重新导入数据RangePartitionListPartitionAB[A, B) =>
[A, C), [C, D), [D, B)CD3.删除旧分区数据文件
- 23. CarbonData Partition 功能介绍 —— Drop Partition Syntax://Drop partition definition only and keep data
ALTER TABLE [db_name].table_name DROP PARTITION(partition_id)
//Drop both partition definition and data
ALTER TABLE [db_name].table_name DROP PARTITION(partition_id) WITH DATAProcess:1. 修改PartitionInfo01235640125643不能Drop默认分区
- 24. CarbonData Partition 功能介绍 —— Drop Partition Process:2a. Drop Partition 但保留数据 —— RangePartition0123564012564Drop非最后partition
数据merge到 next partitionDrop最后一个partition
数据merge到default partiton0123564012534
- 25. CarbonData Partition 功能介绍 —— Drop Partition Process:2a. Drop Partition 但保留数据 —— ListPartition01235640126352b. Drop Partition 不保留数据数据merge到default partiton0123564012645
- 26. 上汽集团CarbonData实践分享集群环境:1台Spark client server 6台DataNode server启用资源:
spark-shell
--master yarn
--deploy-mode client
--num-executors 6
--driver-memory 10g
--executor-memory 50g
--executor-cores 5测试数据样本:
荣威RX5 2017年1月1日~1月30日数据
每天3~5亿条,19~30G
30天共109亿条,约667G
- 27. 上汽集团CarbonData实践分享CREATE TABLE IF NOT EXISTS rx5_carbon_partition_test(
Id string,
create_time timestamp,
Field_C Int,
Field_D String,
…
...
Field_XX
)
PARTITIONED BY (create_time_hour int)
STORED BY 'carbondata'
TBLPROPERTIES('PARTITION_TYPE' = 'RANGE',
'RANGE_INFO' = '1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24', 'SORT_COLUMNS' = 'Id, Field_C, Field_D ')建表语句:共51个字段,其中绝大部分为维度
- 28. 上汽集团CarbonData实践分享
- 29. 上汽集团CarbonData实践分享select Date(create_time), count(distinct ID)
from default.rx5_carbon_partition_test
where create_time_hour >= 17 and create_time_hour < 20
group by Date(create_time)性能测试Case1:无排序维度列作为过滤条件,有partition列上的范围过滤条件的聚合查询无法应用索引
或在索引中排序位置较为靠后查询时间对比Spark Task数对比
- 30. 上汽集团CarbonData实践分享性能测试Case2:有维度列作为过滤条件,有Partition列过滤条件的聚合查询select Date(create_time), count(*)
from default.rx5_carbon_partition_test
where create_time_hour >= 17 and create_time_hour < 20
and id = ’XXXX0000000001’
group by Date(create_time)查询时间对比Spark Task数对比
- 31. 上汽集团CarbonData实践分享性能测试Case3:没有过滤条件的聚合查询
性能测试Case4:没有过滤条件的聚合查询,聚合列为partition列select Date(create_time), count(*)
from default.rx5_carbon_partition_test
group by Date(create_time)select create_time_hour, count(*)
from default.rx5_carbon_partition_test
group by create_time_hourselect Date(create_time), count(*)
from default.rx5_carbon_nonpartition_test
group by Date(create_time)select hour(create_time), count(*)
from default.rx5_carbon_nonpartition_test
group by hour(create_time)查询时间对比Spark Task数对比
- 32. select * from default.rx5_carbon_partition_test
where create_time_hour>= 17 and create_time_hour < 20
and id= ’XXX127893791231023'
and date(create_time) = '2017-01-04'上汽集团CarbonData实践分享性能测试5:明细数据查询
- 33. CarbonData Partition的性能调优1. 选择最合适的Partition列
结合业务,了解哪些列会作为常用的过滤条件
结合数据,可粗略统计一下数据分布情况,避免选择数据倾斜严重的列
如有多种选择,可再结合数据加载效率考虑
2. 尽可能的使用Partition列作为过滤条件
例如Partition列为A,你根据业务需求在Column B上有筛选条件,但你注意到A与B列之间存在某种固定的mapping关系,这时就可以根据B列的过滤条件再新增一个partition列的过滤条件,以提高查询效率。
- 34. CarbonData Partition TODO ListRange Interval Partition
Value-Based Partition
Alter Table Merge Partition
Optimization for join between partition tables
…...
- 35. 欢迎加入Apache CarbonData社区:
发邮件至 dev-subscribe@carbondata.apache.org加入邮件组
关于使用CarbonData过程中遇到任何问题均可发邮件至
dev@carbondata.apache.org
官网:http://carbondata.apache.org/
扫描二维码获取
上汽集团数据业务部开放职位谢谢