• 1. 曹 鲁 2017.9.2CarbonData Partition 功能介绍上汽集团 CarbonData实践分享与
  • 2. 关于我曹 鲁 2010年毕业于武汉大学计算机学院 曾负责某金融行业公司BI、ETL系统开发,某互联网电商行业公司数据仓库的容量管理、性能调优等 2016年加入上汽集团数据业务部 负责大数据平台架构设计与开发 关注大数据技术与开源社区 Mail:caolu@saicmotor.com
  • 3. 上汽集团数据业务部主要负责 规划和实施数据管理体系 建设大数据基础架构和分析平台 拓展和提升集团内企业数据业务能力 推动人工智能技术在集团业务中的应用 为上汽集团战略转型和创新发展提供支持和服务。
  • 4. 议题CarbonData Partition 功能介绍 CarbonData背景、文件结构、建索引过程简介 Partition功能将带来的改变 Partition Table的建表语句 Partition Table的数据加载过程 Partition Table的查询过程 Partition 的新增(add)、拆分(split)及删除(drop) 上汽集团的CarbonData实践
  • 5. CarbonData Partition 功能介绍Apache CarbonData 是大数据平台上一种带索引的列式存储数据格式2016年6月由华为公司贡献至apache基金会,成为apache孵化项目 2017年4月正式毕业为apache顶级项目
  • 6. CarbonData Partition 功能介绍 —— 文件结构简介Blocklet: 文件内的数据块 Column chunk: Blocklet内的列数据 元数据和索引信息 Header:Version,Schema Footer: Blocklet Offset, Index & statistics Carbon Data File Blocklet 1Column 1 Chunk Column 2 Chunk…Column n Chunk Blocklet NFile Footer…Blocklet Offset, Index & StatsFile HeaderVersionSchema Page1…Page2Page3
  • 7. CarbonData Partition 功能介绍 —— 索引生成过程字典编码 + 排序MDK => Multi Dimension KeyYearsQuartersMonthsTerritoryCountryQuantitySales2003QTR1JanEMEAGermany14211,4322003QTR1JanAPACChina54154,7022003QTR1JanEMEASpain44344,6222003QTR1FebEMEADenmark54558,8712003QTR1FebEMEAItaly67556,1812003QTR1MarAPACIndia529,7492003QTR1MarEMEAUK57051,0182003QTR1MarJapanJapan56155,2452003QTR2AprAPACAustralia52550,3982003QTR2AprEMEAGermany14411,532[1,1,1,1,1] : [142,11432] [1,1,1,3,2] : [541,54702] [1,1,1,1,3] : [443,44622] [1,1,2,1,4] : [545,58871] [1,1,2,1,5] : [675,56181] [1,1,3,3,6] : [52,9749] [1,1,3,1,7] : [570,51018] [1,1,3,2,8] : [561,55245] [1,2,4,3,9] : [525,50398] [1,2,4,1,1] : [144,11532] Blocklet (Columnar view)Sort (MDK Index)[1,1,1,1,1] : [142,11432] [1,1,1,1,3] : [443,44622] [1,1,1,3,2] : [541,54702] [1,1,2,1,4] : [545,58871] [1,1,2,1,5] : [675,56181] [1,1,3,1,7] : [570,51018] [1,1,3,2,8] : [561,55245] [1,1,3,3,6] : [52,9749] [1,2,4,1,1] : [144,11532] [1,2,4,3,9] : [525,50398] Sorted MDK Index1 1 1 1 1 1 1 1 1 11 1 1 1 1 1 1 1 2 21 1 1 2 2 3 3 3 4 41 1 3 1 1 1 2 3 1 3142 443 541 545 675 570 561 52 144 52511432 44622 54702 58871 56181 51018 55245 9749 11532 50398C1 C2 C3 C4 C5 C6 C71 3 2 4 5 7 8 6 1 9Encoding
  • 8. Blocklet 1111111120001112125000112111120001122125000113111120001132125000Blocklet 2121323110001223231100012332311000131434200013153410001324342000Blocklet 3132534100013343420001335341000141411200001424112000014341120000Blocklet 4211111120002112125000212111120002122125000213111120002132125000Blocklet IndexBlocklet1Start Key1End Key1Start Key1End Key4Start Key1End Key2Start Key3End Key4Start Key1End Key1Start Key2End Key2Start Key3End Key3Start Key4End Key4File FooterBlockletC1(Min, Max)….C7(Min, Max)Blocklet4Start Key4End Key4C1(Min, Max)….C7(Min, Max)C1(Min,Max)…C7(Min,Max)C1(Min,Max)…C7(Min,Max)C1(Min,Max)…C7(Min,Max)C1(Min,Max)…C7(Min,Max)CarbonData Partition 功能介绍 —— 索引生成过程
  • 9. CarbonData对比Parquet查询性能测试(Dec. 2016, CarbonData 0.2.0)CarbonData Partition 功能介绍 —— 初次性能测试没有排序维度过滤条件的聚合查询,即无法使用Btree索引 有排序维度列作为过滤条件,但聚合列在MDKey中排序较为靠后的聚合查询 相对较慢的查询场景:
  • 10. CarbonData Partition 功能介绍为什么我们要做Partition功能?Partition功能将带来哪些改变?or
  • 11. 改变1:数据将基于Partition列更为集中存储,查询时可过滤掉大量block,减少spark task数量 P1P2P3P4查询引擎CarbonData Partition 功能介绍 —— Partition带来的改变
  • 12. CarbonData Partition 功能介绍 —— Partition带来的改变Col ACol BCol CCol DCol E. . . . . . . . . . . . .Col ACol BCol CCol E. . . . . .. . . . . .Col DP1P2改变2:可以使其他列在排序中更靠前,提升查询性能
  • 13. CarbonData Partition 功能介绍目前支持三种类型Partition Table: Hash Partition Range Partition List Partition Range Interval(Developing)
  • 14. CarbonData Partition 功能介绍CREATE TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name data_type , ...)] PARTITIONED BY (partition_col_name data_type) STORED BY 'carbondata' [TBLPROPERTIES ('PARTITION_TYPE'='HASH', 'PARTITION_NUM'='N' ...)] //N is the number of hash partitionsHash Partition DDL Syntaxcreate table if not exists hash_partition_table( col_A String, col_B Int, col_C Long, col_D Decimal(10,2), col_F Timestamp ) partitioned by (col_E Long) stored by 'carbondata' tblproperties('partition_type'='Hash','partition_num'='9')Example
  • 15. CarbonData Partition 功能介绍CREATE TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name data_type , ...)] PARTITIONED BY (partition_col_name data_type) STORED BY 'carbondata' [TBLPROPERTIES ('PARTITION_TYPE'='RANGE', 'RANGE_INFO'='2014-01-01, 2015-01-01, 2016-01-01' ...)]Range Partition DDL Syntaxcreate table if not exists range_partition_table( col_A String, col_B Int, col_C Long, col_D Decimal(10,2), col_E Long ) partitioned by (col_F Timestamp) stored by 'carbondata' tblproperties('partition_type'='Range', 'range_info'='2015-01-01, 2016-01-01, 2017-01-01, 2017-02-01')ExampleRangeInfo 必须为升序
  • 16. CREATE TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name data_type , ...)] PARTITIONED BY (partition_col_name data_type) STORED BY 'carbondata' [TBLPROPERTIES ('PARTITION_TYPE'='LIST', 'LIST_INFO'='A, B, C' ...)]List Partition DDL Syntaxcreate table if not exists list_partition_table( col_B Int, col_C Long, col_D Decimal(10,2), col_E Long, col_F Timestamp ) partitioned by (col_A String) stored by 'carbondata' tblproperties('partition_type'='List', 'list_info'='aaaa, bbbb, (cccc, dddd), eeee')ExampleCarbonData Partition 功能介绍ListInfo支持One Level Group
  • 17. Syntax:CarbonData Partition 功能介绍 —— Show Partition SHOW PARTITIONS [db_name.]table_name+--------------------+ |partition | +--------------------+ |vin = HASH_NUMBER(5)| +--------------------++--------------------------+ |partition | +--------------------------+ |0, country = DEFAULT | |1, country = China | |2, country = UK, US | |3, country = Japan | |7, country = Canada | |8, country = Russia | |9, country = Fiji | |5, country = Korea | |6, country = India | +--------------------------++-------------------------------------+ |partition | +-------------------------------------+ |0, logdate = DEFAULT | |1, logdate < 2014/01/01 | |2, 2014/01/01 <= logdate < 2015/01/01| |3, 2015/01/01 <= logdate < 2016/01/01| +-------------------------------------+Partition IdHash PartitionRange PartitionList Partition
  • 18. CarbonData Partition 功能介绍 ——数据加载过程DataNodeDataNodeDataNodeReaderReaderReaderHDFSScan RDDrow => (partitionValue, row)ShuffleWriterWriterWriterWriterLoad RDDRdd.partitionby(partitioner)
  • 19. CarbonData Partition 功能介绍 ——查询过程1. 根据SQL中的过滤条件=, <=, <, >, >=, in, not in以及表达式右值确定命中的partitionIdExample: 某表A以年龄为分区字段,创建range partition table 分区信息为(’range_info’ = ‘10, 20, 30, 40, 50, 60, 70, 80’) SQL过滤条件: age >= 20 and age < 40 得到命中的分区ID为3, 42. 如果有其他在排过序的维度列有过滤条件,则在driver端根据Btree索引获取blocklet 所在的文件名,如没有则获取全部,再根据文件名中的partitionId,筛选得到需要读取的文件,最后再下发spark task进行读取Partition Id
  • 20. CarbonData Partition 功能介绍 —— Add Partition Syntax:ALTER TABLE [db_name].table_name ADD PARTITION('new_partition')0123N…...Max1. 修改PartitionInfo2. 读取默认分区(PartitionId为0)数据并根据新的PartitionInfo重新导入数据00Max3. 删除旧的默认分区数据文件Process:0123N…...Max
  • 21. CarbonData Partition 功能介绍 —— Split Partition Syntax:ALTER TABLE [db_name].table_name SPLIT PARTITION(partition_id) INTO('new_partition1', 'new_partition2'...)Process:1. 修改PartitionInfo012356478012564Note:不能Split 0分区(默认分区)
  • 22. CarbonData Partition 功能介绍 —— Split Partition Process:2.读取目标分区数据并根据新的PartitionInfo重新导入数据RangePartitionListPartitionAB[A, B) => [A, C), [C, D), [D, B)CD3.删除旧分区数据文件
  • 23. CarbonData Partition 功能介绍 —— Drop Partition Syntax://Drop partition definition only and keep data ALTER TABLE [db_name].table_name DROP PARTITION(partition_id) //Drop both partition definition and data ALTER TABLE [db_name].table_name DROP PARTITION(partition_id) WITH DATAProcess:1. 修改PartitionInfo01235640125643不能Drop默认分区
  • 24. CarbonData Partition 功能介绍 —— Drop Partition Process:2a. Drop Partition 但保留数据 —— RangePartition0123564012564Drop非最后partition 数据merge到 next partitionDrop最后一个partition 数据merge到default partiton0123564012534
  • 25. CarbonData Partition 功能介绍 —— Drop Partition Process:2a. Drop Partition 但保留数据 —— ListPartition01235640126352b. Drop Partition 不保留数据数据merge到default partiton0123564012645
  • 26. 上汽集团CarbonData实践分享集群环境:1台Spark client server 6台DataNode server启用资源: spark-shell --master yarn --deploy-mode client --num-executors 6 --driver-memory 10g --executor-memory 50g --executor-cores 5测试数据样本: 荣威RX5 2017年1月1日~1月30日数据 每天3~5亿条,19~30G 30天共109亿条,约667G
  • 27. 上汽集团CarbonData实践分享CREATE TABLE IF NOT EXISTS rx5_carbon_partition_test( Id string, create_time timestamp, Field_C Int, Field_D String, … ... Field_XX ) PARTITIONED BY (create_time_hour int) STORED BY 'carbondata' TBLPROPERTIES('PARTITION_TYPE' = 'RANGE', 'RANGE_INFO' = '1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24', 'SORT_COLUMNS' = 'Id, Field_C, Field_D ')建表语句:共51个字段,其中绝大部分为维度
  • 28. 上汽集团CarbonData实践分享
  • 29. 上汽集团CarbonData实践分享select Date(create_time), count(distinct ID) from default.rx5_carbon_partition_test where create_time_hour >= 17 and create_time_hour < 20 group by Date(create_time)性能测试Case1:无排序维度列作为过滤条件,有partition列上的范围过滤条件的聚合查询无法应用索引 或在索引中排序位置较为靠后查询时间对比Spark Task数对比
  • 30. 上汽集团CarbonData实践分享性能测试Case2:有维度列作为过滤条件,有Partition列过滤条件的聚合查询select Date(create_time), count(*) from default.rx5_carbon_partition_test where create_time_hour >= 17 and create_time_hour < 20 and id = ’XXXX0000000001’ group by Date(create_time)查询时间对比Spark Task数对比
  • 31. 上汽集团CarbonData实践分享性能测试Case3:没有过滤条件的聚合查询 性能测试Case4:没有过滤条件的聚合查询,聚合列为partition列select Date(create_time), count(*) from default.rx5_carbon_partition_test group by Date(create_time)select create_time_hour, count(*) from default.rx5_carbon_partition_test group by create_time_hourselect Date(create_time), count(*) from default.rx5_carbon_nonpartition_test group by Date(create_time)select hour(create_time), count(*) from default.rx5_carbon_nonpartition_test group by hour(create_time)查询时间对比Spark Task数对比
  • 32. select * from default.rx5_carbon_partition_test where create_time_hour>= 17 and create_time_hour < 20 and id= ’XXX127893791231023' and date(create_time) = '2017-01-04'上汽集团CarbonData实践分享性能测试5:明细数据查询
  • 33. CarbonData Partition的性能调优1. 选择最合适的Partition列 结合业务,了解哪些列会作为常用的过滤条件 结合数据,可粗略统计一下数据分布情况,避免选择数据倾斜严重的列 如有多种选择,可再结合数据加载效率考虑 2. 尽可能的使用Partition列作为过滤条件 例如Partition列为A,你根据业务需求在Column B上有筛选条件,但你注意到A与B列之间存在某种固定的mapping关系,这时就可以根据B列的过滤条件再新增一个partition列的过滤条件,以提高查询效率。
  • 34. CarbonData Partition TODO ListRange Interval Partition Value-Based Partition Alter Table Merge Partition Optimization for join between partition tables …...
  • 35. 欢迎加入Apache CarbonData社区: 发邮件至 dev-subscribe@carbondata.apache.org加入邮件组 关于使用CarbonData过程中遇到任何问题均可发邮件至 dev@carbondata.apache.org 官网:http://carbondata.apache.org/ 扫描二维码获取 上汽集团数据业务部开放职位谢谢