Hive Implementation Principles


Hive Principles
New-hire training course | From beginner to expert
Author: 周忱 | Taobao Integrated Products
Weibo: @MinZhou
Email: zhouchen.zm@taobao.com

About me
Taobao Java Team | zhouchen.zm
• Alias: 周忱 (chén)
• Real name: 周敏
• Weibo: @MinZhou
• Twitter: @minzhou
• Joined Taobao in June 2010
• Former leader of Taobao's Hadoop & Hive R&D team
• Currently focused on distributed real-time computing
• Hive Contributor
• Free and open-source software enthusiast

How would you implement the following statement with MR?

  SELECT pageid, age, count(1) FROM pv_users GROUP BY pageid, age;

Input table pv_users:

  pageid  age
  1       25
  2       25
  1       32
  2       25

Expected result:

  pageid  age  count
  1       25   1
  2       25   2
  1       32   1

It is this simple

Map: each mapper reads one split of pv_users and emits <pageid, age> as the key and 1 as the value.

  Split 1 (rows 1,25 and 2,25)    Split 2 (rows 1,32 and 2,25)
  key      value                  key      value
  <1,25>   1                      <1,32>   1
  <2,25>   1                      <2,25>   1

Shuffle / Sort: identical keys are routed to the same reducer.

  Reducer 1                       Reducer 2
  key      value                  key      value
  <1,25>   1                      <2,25>   1
  <1,32>   1                      <2,25>   1

Reduce: each reducer sums the values per key.

  Reducer 1 output                Reducer 2 output
  pageid  age  count              pageid  age  count
  1       25   1                  2       25   2
  1       32   1

Why study Hive's implementation?
• Hive has a gentle learning curve, suits non-specialists, and is widely used across the group
• How many M/R jobs does one Hive SQL statement turn into?
• How can we speed up the execution of Hive SQL?
• What can we do while writing Hive SQL?
• How does Hive convert HiveQL into M/R jobs?
• Which optimizations will Hive apply?
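The Map / Shuffle / Reduce walkthrough for the GROUP BY query can be replayed in a few lines of Python. This is only a minimal in-memory sketch, not how Hive or Hadoop is implemented: the function names, the two-way split, and the partitioner (which routes by pageid so that the reducers match the slide) are all assumptions made for illustration.

```python
from collections import defaultdict

# Rows of the pv_users table from the slide: (pageid, age)
PV_USERS = [(1, 25), (2, 25), (1, 32), (2, 25)]


def map_phase(split):
    """Map step: emit ((pageid, age), 1) for every input row."""
    return [((pageid, age), 1) for pageid, age in split]


def shuffle_sort(mapped, num_reducers):
    """Shuffle/sort step: route each key to one reducer and sort by key."""
    partitions = [[] for _ in range(num_reducers)]
    for key, value in mapped:
        # Hypothetical partitioner: route by pageid, as drawn on the slide.
        partitions[(key[0] - 1) % num_reducers].append((key, value))
    return [sorted(partition) for partition in partitions]


def reduce_phase(partition):
    """Reduce step: sum the values of each key."""
    counts = defaultdict(int)
    for key, value in partition:
        counts[key] += value
    return [(pageid, age, n) for (pageid, age), n in sorted(counts.items())]


def group_by_count(rows, num_splits=2, num_reducers=2):
    """SELECT pageid, age, count(1) ... GROUP BY pageid, age (toy version)."""
    size = max(1, (len(rows) + num_splits - 1) // num_splits)
    splits = [rows[i:i + size] for i in range(0, len(rows), size)]
    mapped = [pair for split in splits for pair in map_phase(split)]
    result = []
    for partition in shuffle_sort(mapped, num_reducers):
        result.extend(reduce_phase(partition))
    return result


if __name__ == "__main__":
    for row in group_by_count(PV_USERS):
        print(row)
```

Calling `group_by_count(PV_USERS)` yields the three result rows of the slide, grouped by the reducer that produced them.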
Component Analysis

Hive Architecture & Execution Flow
[Figure: architecture diagram with the components Client, Driver, Compiler, Metastore, and Hadoop.]

Hive Execution Flow
• The compiler converts Hive SQL into a set of operators (Operator)
• An operator is Hive's smallest processing unit
• Each operator represents one HDFS operation or one MapReduce job

Hive Execution Flow
• Operators

  Operator             Description
  TableScanOperator    Scans the data of a Hive table
  ReduceSinkOperator   Creates the <key, value> pairs that will be sent to the reducer side
  JoinOperator         Joins two sets of data
  SelectOperator       Selects the output columns
  FileSinkOperator     Builds the result data and writes it to a file
  FilterOperator       Filters the input data
  GroupByOperator      GROUP BY clause
  MapJoinOperator      /*+ mapjoin(t) */
  LimitOperator        LIMIT clause
  UnionOperator        UNION clause

Hive Execution Flow
• Hive runs MapReduce tasks through ExecMapper and ExecReducer
• MapReduce can be executed in two modes
  – Local mode
  – Distributed mode

Hive Compiler
• Parser: converts SQL into an abstract syntax tree (AST)
• Semantic Analyzer: converts the AST into query blocks (QB)
• Logical Plan Generator: converts the query blocks into a logical query plan
• Logical Optimizer: rewrites the logical query plan
• Physical Plan Generator: converts the logical plan into a physical plan (M/R jobs)
• Physical Optimizer: chooses the best join strategy

Compilation Flow
Hive QL → Parser → AST → Semantic Analyzer → QB → Logical Plan Gen. → OP Tree → Logical Optimizer → OP Tree → Physical Plan Gen. → Task Tree → Physical Optimizer → Task Tree
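To make "an operator is the smallest processing unit" concrete, here is a hedged Python sketch of operators chained so that each one handles a row and forwards it to its child. The class names echo the operator names in the table, but the code is a toy model and an assumption of this note, not Hive's Java classes; the `PRODUCTS` rows are invented sample data.

```python
class Operator:
    """Toy operator: handles one row at a time and forwards it to its child."""

    def __init__(self, child=None):
        self.child = child

    def process(self, row):
        self.forward(row)

    def forward(self, row):
        if self.child is not None:
            self.child.process(row)


class TableScanOperator(Operator):
    """Reads every row of an in-memory 'table' and pushes it downstream."""

    def __init__(self, rows, child=None):
        super().__init__(child)
        self.rows = rows

    def run(self):
        for row in self.rows:
            self.forward(row)


class FilterOperator(Operator):
    """Forwards only the rows for which the predicate is true."""

    def __init__(self, predicate, child=None):
        super().__init__(child)
        self.predicate = predicate

    def process(self, row):
        if self.predicate(row):
            self.forward(row)


class SelectOperator(Operator):
    """Keeps only the requested output columns."""

    def __init__(self, columns, child=None):
        super().__init__(child)
        self.columns = columns

    def process(self, row):
        self.forward({name: row[name] for name in self.columns})


class FileSinkOperator(Operator):
    """Collects rows in a list that stands in for the output file."""

    def __init__(self):
        super().__init__()
        self.output = []

    def process(self, row):
        self.output.append(row)


# Hypothetical sample rows, not taken from the slides.
PRODUCTS = [
    {"prono": 1, "maker": "honda", "price": 100},
    {"prono": 2, "maker": "toyota", "price": 200},
    {"prono": 3, "maker": "honda", "price": 300},
]


def run_pipeline(rows):
    # Wire the chain starting from the sink: TableScan -> Filter -> Select -> FileSink.
    sink = FileSinkOperator()
    select = SelectOperator(["prono", "price"], child=sink)
    keep_honda = FilterOperator(lambda r: r["maker"] == "honda", child=select)
    TableScanOperator(rows, child=keep_honda).run()
    return sink.output


if __name__ == "__main__":
    print(run_pipeline(PRODUCTS))
```

The push-based design (parent calls `process` on its child) is chosen here only because it keeps each toy operator a few lines long.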
Parser
Hive QL → AST

SQL:

  INSERT OVERWRITE TABLE access_log_temp2
  SELECT a.user, a.prono, p.maker, p.price
  FROM access_log_hbase a JOIN product_hbase p ON (a.prono = p.prono);

AST (the subtrees marked 1, 2, and 3 are the ones handled by the three Semantic Analyzer steps that follow):

  TOK_QUERY
    + TOK_FROM                                  (1)
      + TOK_JOIN
        + TOK_TABREF
          + TOK_TABNAME
            + "access_log_hbase"
          + "a"
        + TOK_TABREF
          + TOK_TABNAME
            + "product_hbase"
          + "p"
        + "="
          + "."
            + TOK_TABLE_OR_COL
              + "a"
            + "prono"
          + "."
            + TOK_TABLE_OR_COL
              + "p"
            + "prono"
    + TOK_INSERT
      + TOK_DESTINATION                         (2)
        + TOK_TAB
          + TOK_TABNAME
            + "access_log_temp2"
      + TOK_SELECT                              (3)
        + TOK_SELEXPR
          + "."
            + TOK_TABLE_OR_COL
              + "a"
            + "user"
        + TOK_SELEXPR
          + "."
            + TOK_TABLE_OR_COL
              + "a"
            + "prono"
        + TOK_SELEXPR
          + "."
            + TOK_TABLE_OR_COL
              + "p"
            + "maker"
        + TOK_SELEXPR
          + "."
            + TOK_TABLE_OR_COL
              + "p"
            + "price"

Semantic Analyzer (1/3)
AST → QB
The TOK_FROM subtree (1) is recorded in the QB as:
• QB ParseInfo, Join Node: the TOK_JOIN subtree (TOK_TABREF …, TOK_TABREF …, "=" …)
• QB MetaData, Alias To Table Info:
  – "a" = Table Info("access_log_hbase")
  – "p" = Table Info("product_hbase")

Semantic Analyzer (2/3)
AST → QB
The TOK_DESTINATION subtree (2) is recorded in the QB as:
• QB ParseInfo, Name To Destination Node: TOK_TAB + TOK_TABNAME + "access_log_temp2"

Semantic Analyzer (3/3)
AST → QB
The TOK_SELECT subtree (3) is recorded in the QB as:
• QB ParseInfo, Name To Select Node: TOK_SELECT with its four TOK_SELEXPR children

Logical Plan Generator (1/4)
QB → OP Tree
• Input: QB MetaData, Alias To Table Info ("a" = Table Info("access_log_hbase"), "p" = Table Info("product_hbase"))
• Output: TableScanOperator("access_log_hbase"), TableScanOperator("product_hbase")

Logical Plan Generator (2/4)
QB → OP Tree
• Input: QB ParseInfo, Join Node (the TOK_JOIN subtree with the condition a.prono = p.prono)
• Output: ReduceSinkOperator("access_log_hbase"), ReduceSinkOperator("product_hbase"), JoinOperator

Logical Plan Generator (3/4)
QB → OP Tree
• Input: QB ParseInfo, Name To Select Node (TOK_SELECT with a.user, a.prono, p.maker, p.price)
• Output: SelectOperator

Logical Plan Generator (4/4)
QB → OP Tree
• Input: QB MetaData, Name To Destination Table Info ("insclause-0" = Table Info("access_log_temp2"))
• Output: FileSinkOperator
Logical Plan Generator (result)
OP Tree:

  TableScanOperator TS_0 → ReduceSinkOperator RS_2 → JoinOperator JOIN_4
  TableScanOperator TS_1 → ReduceSinkOperator RS_3 → JoinOperator JOIN_4
  JoinOperator JOIN_4 → SelectOperator SEL_5 → FileSinkOperator FS_6

Logical Optimizer

  Optimizer                           Description
  LineageGenerator                    Generates the lineage between tables
  ColumnPruner                        Column pruning
  PredicatePushDown                   Predicate pushdown: filters that involve only one table are pushed down to right after the TableScanOperator
  PartitionPruner                     Partition pruning
  PartitionConditionRemover           Before partition pruning, removes irrelevant condition predicates
  GroupByOptimizer                    GROUP BY optimization
  SamplePruner                        Sample pruning
  MapJoinProcessor                    If the user specifies mapjoin, converts the ReduceSinkOperator into a MapSinkOperator
  BucketMapJoinOptimizer              Map join over bucketed tables, which widens the applicability of map join
  SortedMergeBucketMapJoinOptimizer   Sort Merge Join
  UnionProcessor                      Currently only sets a flag when both subqueries are map-only tasks
  JoinReorder                         /*+ STREAMTABLE(A) */
  ReduceSinkDeDuplication             If two reduce sink operators share the same partition/sort columns, they need to be merged

Logical Optimizer (Predicate Push Down)

Original query:

  INSERT OVERWRITE TABLE access_log_temp2
  SELECT a.user, a.prono, p.maker, p.price
  FROM access_log_hbase a JOIN product_hbase p ON (a.prono = p.prono);

Query with a filter on a single table:

  INSERT OVERWRITE TABLE access_log_temp2
  SELECT a.user, a.prono, p.maker, p.price
  FROM access_log_hbase a JOIN product_hbase p ON (a.prono = p.prono)
  WHERE p.maker = 'honda';

The OP Tree of the filtered query contains the operators of the original plan (TS_0, TS_1, RS_2, RS_3, JOIN_4, SEL_5, FS_6) plus two filters:
• FilterOperator FIL_5 (_col8 = 'honda'): the WHERE condition evaluated on the join output column
• FilterOperator FIL_8 (maker = 'honda'): the same condition after predicate pushdown, evaluated right after the table scan of product_hbase

Physical Plan Generator
Op Tree → Task Tree
• MapRedTask (Stage-1/root): TableScanOperator (TS_0), TableScanOperator (TS_1), ReduceSinkOperator (RS_2), ReduceSinkOperator (RS_3), JoinOperator (JOIN_4), SelectOperator (SEL_5), FileSinkOperator (FS_6)
• MoveTask (Stage-0): LoadTableDesc
• StatsTask (Stage-2)

Physical Plan Generator (result)
MapRedTask (Stage-1/root)
• Mapper: TableScanOperator TS_0, TableScanOperator TS_1, ReduceSinkOperator RS_2, ReduceSinkOperator RS_3
• Reducer: JoinOperator JOIN_4, SelectOperator SEL_5, FileSinkOperator FS_6

Physical Optimizer
Task Tree → Task Tree
Located in the java/org/apache/hadoop/hive/ql/optimizer/physical/ directory.

  Resolver             Description
  MapJoinResolver      Handles map joins
  SkewJoinResolver     Handles skewed joins
  CommonJoinResolver   Handles common joins
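The predicate pushdown rewrite described in this part of the deck can be illustrated on a toy operator tree. The sketch below is an assumption-laden model in Python: `Op`, `push_down`, and `describe` are hypothetical names, predicates are simplified to `(alias, column, value)` equality tuples, and the filter is moved rather than duplicated, whereas the slide lists both FIL_5 and FIL_8.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class Op:
    """Toy plan node; `inputs` are the operators that feed this one."""
    kind: str                                  # "TS", "RS", "JOIN", "FIL", "SEL", "FS"
    inputs: List["Op"] = field(default_factory=list)
    alias: Optional[str] = None                # table alias (TS only)
    predicate: Optional[Tuple[str, str, str]] = None  # (alias, column, value), FIL only


def _insert_after_scan(op, alias, predicate):
    """Place a filter directly above the table scan of `alias`; True on success."""
    for idx, inp in enumerate(op.inputs):
        if inp.kind == "TS" and inp.alias == alias:
            op.inputs[idx] = Op("FIL", [inp], predicate=predicate)
            return True
        if _insert_after_scan(inp, alias, predicate):
            return True
    return False


def push_down(op):
    """Move single-table filters so they run right after their table scan."""
    op.inputs = [push_down(inp) for inp in op.inputs]
    if op.kind == "FIL" and op.inputs[0].kind != "TS":
        child = op.inputs[0]
        if _insert_after_scan(child, op.predicate[0], op.predicate):
            return child  # the filter no longer sits above the join
    return op


def describe(op):
    """Render the plan as nested text, outermost operator first."""
    label = op.kind
    if op.kind == "TS":
        label += f"[{op.alias}]"
    if op.kind == "FIL":
        alias, column, value = op.predicate
        label += f"[{alias}.{column}={value}]"
    if not op.inputs:
        return label
    return f"{label}({', '.join(describe(inp) for inp in op.inputs)})"


def build_plan():
    # Plan for: ... FROM a JOIN p ON (a.prono = p.prono) WHERE p.maker = 'honda'
    join = Op("JOIN", [Op("RS", [Op("TS", alias="a")]),
                       Op("RS", [Op("TS", alias="p")])])
    where = Op("FIL", [join], predicate=("p", "maker", "honda"))
    return Op("FS", [Op("SEL", [where])])


if __name__ == "__main__":
    print(describe(build_plan()))
    print(describe(push_down(build_plan())))
```

After the rewrite the filter on `p.maker` sits between the scan of `p` and its reduce sink, so fewer rows are shuffled to the join.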
Physical Optimizer (MapJoinResolver)
Task Tree → Task Tree

Before:
• MapRedTask (Stage-1), Mapper: TableScanOperator TS_1, TableScanOperator TS_0, MapJoinOperator MAPJOIN_7, SelectOperator SEL_5, FileSinkOperator FS_6, SelectOperator SEL_8

After:
• MapredLocalTask (Stage-7): TableScanOperator TS_0, HashTableSinkOperator HASHTABLESINK_11
• MapRedTask (Stage-1), Mapper: TableScanOperator TS_1, MapJoinOperator MAPJOIN_7, SelectOperator SEL_5, FileSinkOperator FS_6, SelectOperator SEL_8

Observing Hive's behavior with EXPLAIN

  hive> explain INSERT OVERWRITE TABLE access_log_temp2
      > SELECT a.user, a.prono, p.maker, p.price
      > FROM access_log_hbase a JOIN product_hbase p ON (a.prono = p.prono);
  OK
  ABSTRACT SYNTAX TREE:
    (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME access_log_hbase) a) (TOK_TABREF (TOK_TABNAME product_hbase) p) (= (. (TOK_TABLE_OR_COL a) prono) (. (TOK_TABLE_OR_COL p) prono)))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME access_log_temp2))) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) user)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) prono)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL p) maker)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL p) price)))))

  STAGE DEPENDENCIES:
    Stage-1 is a root stage
    Stage-0 depends on stages: Stage-1
    Stage-2 depends on stages: Stage-0

  STAGE PLANS:
    Stage: Stage-1
      Map Reduce
        Alias -> Map Operator Tree:
          a
            TableScan
              alias: a
              Reduce Output Operator
                key expressions:
                      expr: prono
                      type: int
                sort order: +
                Map-reduce partition columns:
                      expr: prono
                      type: int
                tag: 0
                value expressions:
                      expr: user
                      type: string
                      expr: prono
                      type: int
          p
            TableScan
              alias: p
              Reduce Output Operator
                key expressions:
                      expr: prono
                      type: int
                sort order: +
                Map-reduce partition columns:
                      expr: prono
                      type: int
                tag: 1
                value expressions:
                      expr: maker
                      type: string
                      expr: price
                      type: int
        Reduce Operator Tree:
          Join Operator
            condition map:
                 Inner Join 0 to 1
            condition expressions:
              0 {VALUE._col0} {VALUE._col2}
              1 {VALUE._col1} {VALUE._col2}
            handleSkewJoin: false
            outputColumnNames: _col0, _col2, _col6, _col7
            Select Operator
              expressions:
                    expr: _col0
                    type: string
                    expr: _col2
                    type: int
                    expr: _col6
                    type: string
                    expr: _col7
                    type: int
              outputColumnNames: _col0, _col1, _col2, _col3
              File Output Operator
                compressed: false
                GlobalTableId: 1
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: default.access_log_temp2

    Stage: Stage-0
      Move Operator
        tables:
            replace: true
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                name: default.access_log_temp2

    Stage: Stage-2
      Stats-Aggr Operator

  Time taken: 0.1 seconds
  hive>

How the EXPLAIN outline maps to the task tree:

  EXPLAIN outline                                                    Task tree
  Stage: Stage-1, Map Reduce                                         MapRedTask (Stage-1/root)
    Map Operator Tree: TableScan, Reduce Output Operator (twice)       Mapper: TableScanOperator TS_1, TableScanOperator TS_0, ReduceSinkOperator RS_2, ReduceSinkOperator RS_3
    Reduce Operator Tree: Join Operator, Select Operator,              Reducer: JoinOperator JOIN_4, SelectOperator SEL_5, FileSinkOperator FS_6
      File Output Operator
  Stage: Stage-0, Move Operator                                      Move Task (Stage-0)
  Stage: Stage-2, Stats-Aggr Operator                                Stats Task (Stage-2)

Join Optimization

Common Join
[Figure: Task A feeds the Common Join Task. Mappers read Table X and Table Y, the data is shuffled, and the reducer performs the join.]

Previous Map Join
[Figure: Task A → MapJoin Task → Task C. Every mapper of the MapJoin Task reads records of the big table data together with the small table data.]

Optimized Map Join
[Figure: Task A → MapReduce Local Task → MapJoin Task → Task C. The MapReduce Local Task reads the small table data, writes HashTable files, and uploads the files to the Distributed Cache (DC); the mappers of the MapJoin Task read the big table records and the HashTable files from the Distributed Cache.]

Converting Common Join into Map Join
[Figure: the flow Task A → CommonJoinTask → Task C is converted into Task A → Conditional Task → Task C, where the Conditional Task holds MapJoinLocalTask branches and the CommonJoinTask.]
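The optimized map join described above (a local task builds a hash table from the small table, then mappers stream the big table and probe it) can be sketched in Python as follows. This is a minimal single-process illustration under stated assumptions: the function names are hypothetical, the sample rows are invented, and the Distributed Cache step is reduced to passing a dictionary around.

```python
from collections import defaultdict


def build_hash_table(small_rows, key):
    """Local-task step: index the small table by its join key."""
    table = defaultdict(list)
    for row in small_rows:
        table[row[key]].append(row)
    return table


def map_join(big_rows, hash_table, key):
    """Mapper step: stream the big table and probe the in-memory hash table.

    No shuffle and no reducer are needed, which is the point of a map join.
    """
    for big in big_rows:
        for small in hash_table.get(big[key], []):
            yield {**big, **small}


# Hypothetical sample rows shaped like the tables in the example query.
ACCESS_LOG = [  # big table
    {"user": "u1", "prono": 1},
    {"user": "u2", "prono": 2},
    {"user": "u3", "prono": 9},  # no matching product, dropped by the inner join
]
PRODUCT = [     # small table
    {"prono": 1, "maker": "honda", "price": 100},
    {"prono": 2, "maker": "toyota", "price": 200},
]

if __name__ == "__main__":
    hash_table = build_hash_table(PRODUCT, "prono")
    for joined in map_join(ACCESS_LOG, hash_table, "prono"):
        print(joined)
```

Because the hash table must fit in each mapper's memory, this approach only works when one side of the join is small, which is why the deck goes on to discuss choosing between map join and common join at run time.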
[Figure, continued: Previous Execution Flow (Task A → CommonJoinTask → Task C) versus Optimized Execution Flow, in which the Conditional Task holds MapJoinLocalTask → MapJoinTask branches next to the CommonJoinTask.]

Compile Time

  SELECT * FROM SRC1 x JOIN SRC2 y ON x.key = y.key;

[Figure: Task A → Conditional Task → Task C. The Conditional Task contains one MapJoinLocalTask → MapJoinTask branch that assumes table x is the big table, one that assumes table y is the big table, and the CommonJoinTask.]

Execution Time

  SELECT * FROM SRC1 x JOIN SRC2 y ON x.key = y.key;

[Figure: Task A → Conditional Task → Task C. If table X is the big table, the corresponding MapJoinLocalTask → MapJoinTask branch runs; if both tables are too big for a map join, the CommonJoinTask runs.]

Backup Task
[Figure: Task A → Conditional Task → Task C. When the MapJoinLocalTask is memory bound, the CommonJoinTask runs as a backup task.]

Performance Bottleneck
• The Distributed Cache is the potential performance bottleneck
  – A large hashtable file slows down the propagation of the Distributed Cache
  – Mappers wait for the hashtable files from the Distributed Cache
• Compress and archive all the hashtable files into a tar file.

Compress and Archive
[Figure: Task A → MapReduce Local Task → MapJoin Task → Task C. The MapReduce Local Task turns the small table data into HashTable files, which are compressed and archived before they go through the Distributed Cache to the mappers that read the big table records.]

Performance Evaluation (hashtable files without vs. with compression)

  Small table                   Big table                         Join condition             Avg. join time without compression   Avg. join time with compression   Performance improvement
  75 K rows; 383K file size     130 M rows; 3.5G file size        1 join key, 2 join value   106 sec                              73 sec                            +45%
  500 K rows; 2.6M file size    130 M rows; 3.5G file size        1 join key, 2 join value   129 sec                              106 sec                           +21%
  75 K rows; 383K file size     16.7 B rows; 459 G file size      1 join key, 2 join value   441 sec                              326 sec                           +35%
  500 K rows; 2.6M file size    16.7 B rows; 459 G file size      1 join key, 2 join value   326 sec                              251 sec                           +30%
  1M rows; 10M file size        16.7 B rows; 459 G file size      1 join key, 3 join value   495 sec                              266 sec                           +86%
  1M rows; 10M file size        16.7 B rows; 459 G file size      2 join key, 2 join value   425 sec                              255 sec                           +67%

Performance Evaluation (previous vs. optimized)

  Small table                   Big table                         Join condition             Previous Common Join   Optimized Common Join   Performance improvement
  75 K rows; 383K file size     130 M rows; 3.5G file size        1 join key, 2 join value   169 sec                79 sec                  +114%
  500 K rows; 2.6M file size    130 M rows; 3.5G file size        1 join key, 2 join value   246 sec                144 sec                 +71%
  75 K rows; 383K file size     16.7 B rows; 459 G file size      1 join key, 2 join value   511 sec                325 sec                 +57%
  500 K rows; 2.6M file size    16.7 B rows; 459 G file size      1 join key, 2 join value   502 sec                305 sec                 +64%
  1M rows; 10M file size        16.7 B rows; 459 G file size      1 join key, 3 join value   653 sec                248 sec                 +163%
  1M rows; 10M file size        16.7 B rows; 459 G file size      2 join key, 2 join value   1117 sec               536 sec                 +108%

Left Semi Join
• Implements IN/EXISTS subqueries

  SELECT A.* FROM A WHERE A.KEY IN (SELECT B.KEY FROM B WHERE B.VALUE > 100);

  is equivalent to:

  SELECT A.* FROM A LEFT SEMI JOIN B ON (A.KEY = B.KEY and B.VALUE > 100);

• Optimizations
  – Map-side GROUP BY, which reduces the amount of data flowing to the reducers
  – The join exits as soon as a match is found

Bucket Map Join

  set hive.optimize.bucketmapjoin = true;

1. Works together with map join.
2. All tables being joined must be bucketed, and the number of buckets of the big table must be an integer multiple of the number of buckets of the small table.
3. The bucketed column must be the join column.

Bucket Map Join Implementation

  SELECT /*+MAPJOIN(a,c)*/ a.*, b.*, c.*
  FROM a JOIN b ON a.key = b.key JOIN c ON a.key = c.key;

Tables a, b, and c are all bucketized by 'key'; a has 2 buckets, b has 2, and c has 1.
1. Spawn mappers based on the big table
2. Only the matching buckets of all small tables are replicated onto each mapper

[Figure: Table b is the big table. Mapper 1 and Mapper 2 process bucket b1 and receive buckets a1 and c1; Mapper 3 processes bucket b2 and receives buckets a2 and c1.]

Normally in production, there will be thousands of buckets!

Skew Join
Data skew during a join causes OOM on the reduce side.

  set hive.optimize.skewjoin = true;
  set hive.skewjoin.key = <threshold>;

[Figure: Job 1 runs A join B over Table A and Table B. Reducer 1 and Reducer 2 join the rows of keys K2 and K3 (a-K2 with b-K2, a-K3 with b-K3), while the rows of the skewed key K1 (a-K1, b-K1) are written to HDFS as HDFS File a-K1 and HDFS File b-K1. Job 2 map-joins a-K1 with b-K1. The outputs of both jobs form the final results.]

Useful Links
• Hive website: http://hive.apache.org
• Wiki: https://cwiki.apache.org/confluence/display/Hive/Home
• JIRA: https://issues.apache.org/jira/browse/HIVE
• SVN: http://svn.apache.org/repos/asf/hive/

Q & A
Author: 周忱 | Taobao Integrated Business
Weibo: @MinZhou
Email: zhouchen.zm@taobao.com
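As a supplement to the Skew Join slide, the two-job idea can be sketched in Python. This is a simplified model and not Hive's implementation: the helper names are hypothetical, skewed keys are detected by counting rows of table A up front (Hive detects them while the join runs), the "HDFS files" are plain lists, the sample rows are invented, and one hash join helper stands in for both the reducer-side join of Job 1 and the map join of Job 2.

```python
from collections import Counter, defaultdict


def hash_join(left, right, key):
    """Inner join used as a stand-in for both join strategies."""
    index = defaultdict(list)
    for row in right:
        index[row[key]].append(row)
    return [{**l, **r} for l in left for r in index.get(l[key], [])]


def skew_join(a_rows, b_rows, key, threshold):
    """Join A and B, handling keys with more than `threshold` rows separately.

    `threshold` plays the role of hive.skewjoin.key in this toy model.
    """
    counts = Counter(row[key] for row in a_rows)
    skewed = {k for k, n in counts.items() if n > threshold}

    # Job 1: regular join over the non-skewed keys only.
    job1 = hash_join([r for r in a_rows if r[key] not in skewed],
                     [r for r in b_rows if r[key] not in skewed], key)

    # Rows of skewed keys are set aside (written to HDFS on the slide) ...
    a_skewed = [r for r in a_rows if r[key] in skewed]
    b_skewed = [r for r in b_rows if r[key] in skewed]

    # ... and Job 2 joins them without sending them all to a single reducer.
    job2 = hash_join(a_skewed, b_skewed, key)

    return job1 + job2, skewed


# Hypothetical sample data: key K1 is skewed in table A.
TABLE_A = [
    {"key": "K1", "a": 1}, {"key": "K1", "a": 2}, {"key": "K1", "a": 3},
    {"key": "K2", "a": 4}, {"key": "K3", "a": 5},
]
TABLE_B = [
    {"key": "K1", "b": "x"}, {"key": "K2", "b": "y"}, {"key": "K3", "b": "z"},
]

if __name__ == "__main__":
    rows, skewed_keys = skew_join(TABLE_A, TABLE_B, "key", threshold=2)
    print(skewed_keys)
    for row in rows:
        print(row)
```

The final result is the same set of rows a plain join would produce; only the way the heavy key is processed differs.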