• 1. Hive分享 松坡
  • 2. 看着这么上面强大的功能,那Hive到底是什么呢? 其实HIVE就是一个SQL解析引擎,它将SQL语句转译成M/R JOB然后在Hadoop执行,来达到快速开发的目的。拨开HIVE的神秘面纱之后来看它的表其实就是一个Hadoop的目录/文件(HIVE默认表存放路径一般都是在你工作目录的hive目录里面),按表名做文件夹分开,如果你有分区表的话,分区值是子文件夹,可以直接在其它的M/R job里直接应用这部分数据。Hive到底是什么?
  • 3. 为超大数据集设计的计算/扩展能力 based on Hadoop 支持SQL like查询语言 统一的元数据管理为什么选择hive?
  • 4. 简单select word, count(*) from ( select explode(split(sentence. ' ')) word from article ) t group by word为什么选择hive?
  • 5. Client端应用程序 元数据 编程接口Architect
  • 6. Database Table Partition File数据模型
  • 7. Primitive int / bigint / smallint / tinyint boolean double / float string Array Map Struct 没有精度/长度设定 int 4 没有 没有date / datetime 类型数据类型
  • 8. CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name (col_name data_type, ...) [PARTITIONED BY (col_name data_type, ...)] [ [ROW FORMAT row_format] [STORED AS file_format] ] [LOCATION hdfs_path] DDL
  • 9. CREATE TABLE tmp_table #表名 ( title string, # 字段名称 字段类型 minimum_bid double, quantity bigint, have_invoice bigint )COMMENT '注释:XXX' #表注释 PARTITIONED BY(pt STRING) #分区表字段(如果你文件非常之大的话,采用分区表可以快过滤出按分区字段划分的数据) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' # 字段是用什么分割开的 STORED AS SEQUENCEFILE; #用哪种方式存储数据,SEQUENCEFILE是hadoop自带的文件压缩格式一些相关命令 SHOW TABLES; # 查看所有的表 SHOW TABLES '*TMP*'; #支持模糊查询 SHOW PARTITIONS TABLE; #查看表有哪些分区 DESCRIBE TABLE; #查看表结构
  • 10. DDLCTAS CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name (col_name data_type, ...) … AS SELECT …
  • 11. CREATE EXTERNAL TABLE page_view ( viewTime INT, userid BIGINT, page_url STRING, referrer_url STRING, ip STRING COMMENT 'IP Address of the User', country STRING COMMENT 'country of origination‘ ) COMMENT 'This is the staging page view table' ROW FORMAT DELIMITED FIELDS TERMINATED BY '44' LINES TERMINATED BY '12' STORED AS TEXTFILE LOCATION '/user/data/staging/page_view';例子
  • 12. DMLInsert INSERT OVERWRITE TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement FROM from_statement Multiple insert FROM from_statement INSERT OVERWRITE TABLE tablename1 [PARTITION...)] select_statement1 [INSERT OVERWRITE TABLE tablename2 [PARTITION ...] select_statement2] ... Dynamic partitioning INSERT OVERWRITE TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement (HDFS)不支持UPDATE!
  • 13. DML/DDLAdd Partitions ALTER TABLE table_name ADD partition_spec [ LOCATION 'location1' ] partition_spec [ LOCATION 'location2' ] ... partition_spec: : PARTITION (partition_col = partition_col_value, partition_col = partiton_col_value, ...) 用户可以用 ALTER TABLE ADD PARTITION 来向一个表中增加分区。当分区名是字符串时加引号。 ALTER TABLE page_view ADD PARTITION (dt='2008-08-08', country='us') location '/path/to/us/part080808' PARTITION (dt='2008-08-09', country='us') location '/path/to/us/part080809';
  • 14. Queryselect SELECT [ALL | DISTINCT] select_expr, select_expr, ... FROM table_reference [WHERE where_condition] [GROUP BY col_list] [ CLUSTER BY col_list | [DISTRIBUTE BY col_list] [SORT BY col_list] | [ORDER BY col_list] ] [LIMIT number]
  • 15. Loading files into table 当数据被加载至表中时,不会对数据进行任何转换。Load 操作只是将数据复制/移动至 Hive 表对应的位置。 Syntax: LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)] //就是普通的insert 只不过数据来源是通过inpath路径找到, insert之前保证表已经建立完, 并且格式一致(换行,分隔符等)
  • 16. QueryJoin join_table: table_reference JOIN table_factor [join_condition] | table_reference [{LEFT|RIGHT|FULL} OUTER | LEFT SEMI] JOIN table_reference join_condition table_reference: table_factor | join_table table_factor: tbl_name [alias] | table_subquery alias | ( table_references ) join_condition: ON equality_expression ( AND equality_expression )* equality_expression: expression = expression 等值Join 合并Join的原则 NULL值处理
  • 17. QuerySubqueries SELECT ... FROM (subquery) name ... 不支持exist in子查询 select_statement UNION ALL select_statement UNION ALL select_statement ...
  • 18. Lateral viewCREATE VIEW [IF NOT EXISTS] view_name [ (column_name...) ] [TBLPROPERTIES (property_name = property_value, ...)] AS SELECT ...
  • 19. WHERE Clause where condition 是一个布尔表达式。例如,下面的查询语句只返回销售记录大于 10,且归属地属于美国的销售代表。Hive 不支持在WHERE 子句中的 IN,EXIST 或子查询。 SELECT * FROM sales WHERE amount > 10 AND region = "US" HAVING Clause Hive 现在不支持 HAVING 子句。可以将 HAVING 子句转化为一个字查询,例如: SELECT col1 FROM t1 GROUP BY col1 HAVING SUM(col2) > 10 可以用以下查询来表达: SELECT col1 FROM (SELECT col1, SUM(col2) AS col2sum FROM t1 GROUP BY col1) t2 WHERE t2.col2sum > 10 ALL and DISTINCT Clauses 支持
  • 20. 基于Partition的查询 ***** 一般 SELECT 查询会扫描整个表(除非是为了抽样查询)。但是如果一个表使用 PARTITIONED BY 子句建表,查询就可以利用分区剪枝(input pruning)的特性,只扫描一个表中它关心的那一部分。Hive 当前的实现是,只有分区断言出现在离 FROM 子句最近的那个WHERE 子句中,才会启用分区剪枝。例如,如果 page_views 表使用 date 列分区,以下语句只会读取分区为‘2008-03-01’的数据。 SELECT page_views.* FROM page_views WHERE page_views.date >= '2008-03-01' AND page_views.date <= '2008-03-31' LIMIT Clause *** Limit 可以限制查询的记录数。查询的结果是随机选择的。下面的查询语句从 t1 表中随机查询5条记录: SELECT * FROM t1 LIMIT 5 Top k 查询 下面的查询语句查询销售记录最大的 5 个销售代表。 SET mapred.reduce.tasks = 1 SELECT * FROM sales SORT BY amount DESC LIMIT 5
  • 21. Hive 只支持等值连接(equality joins)、外连接(outer joins)和(left semi joins???)。Hive 不支持所有非等值的连接,因为非等值连接非常难转化到 map/reduce 任务。另外,Hive 支持多于 2 个表的连接。 例: SELECT a.* FROM a JOIN b ON (a.id = b.id) SELECT a.* FROM a JOIN b ON (a.id = b.id AND a.department = b.department) 如果join中多个表的 join key 是同一个,则 join 会被转化为单个 map/reduce 任务, 例如: SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1) 被转化为单个 map/reduce 任务,因为 join 中只使用了 b.key1 作为 join key。 JOIN
  • 22. join 时,每次 map/reduce 任务的逻辑是这样的:reducer 会缓存 join 序列中除了最后一个表的所有表的记录,再通过最后一个表将结果序列化到文件系统。这一实现有助于在 reduce 端减少内存的使用量。实践中,应该把最大的那个表(驱动表)写在最后(否则会因为缓存浪费大量内存)。 例如: SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1) 所有表都使用同一个 join key(使用 1 次 map/reduce 任务计算)。Reduce 端会缓存 a 表和 b 表的记录,然后每次取得一个 c 表的记录就计算一次 join 结果,类似的还有: SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2) 这里用了 2 次 map/reduce 任务。第一次缓存 a 表,用 b 表序列化;第二次缓存第一次 map/reduce 任务的结果,然后用 c 表序列化。
  • 23. Join 发生在 WHERE 子句之前。如果你想限制 join 的输出,应该在 WHERE 子句中写过滤条件——或是在 join 子句中写。 这里面一个容易混淆的问题是表分区的情况: SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key) WHERE a.ds='2009-07-07' AND b.ds='2009-07-07' 解决办法: SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key AND b.ds='2009-07-07' AND a.ds='2009-07-07')
  • 24. Merge 例子:
  • 25. Full or Left >>>>>>>>>>>>>>>需要解释一下 Merge是选择Full Outer Join还是Left Outer Join? 这需要开发者对更新的数据比较了解. 通常, 要更新的数据若只有对已有数据的部分更新, 则应选用Left Outer Join; 而当要更新的数据有新数据需要增加时, 则应选用Full Outer Join.Join之后的中间结果o.Id n.id n.value 2 NULL null 4 NULL null NULL null 4 4 NULL null 2 2 NULL null 5 6 2 join 3 4
  • 26. #set($name = "jushi") // 初始变量 select * from users where username = '$name'; // = select * from users where username = 'jushi' 环境变量 在示例中有$env.开头的变量, 这些由HiveQL的运行环境初始的, 可直接使用. 目前提供的默认提供环境变量有: $env.home – hivelet(使用HiveQL编写的后缀为.sql的文件)所在路径上下文环境 $env.date – hivelet运行日期变量, 格式为yyyymmdd $env.date_std – hivelet运行日期变量, 格式为yyyy-mm-dd 环境变量可以在HiveQL的运行参数中增加. // 用define来简化子查询, 以避免复杂的嵌套, 让代码更易读 #define($daily_log)select * from web_log where logdate=$date#end select count(1) from bmw_user u join ($daily_log) l on u.user_id=l.uid; 变量
  • 27. // 用macro消除重复, 让代码更易维护 #macro(isLastDay $gmt_col)to_date($gmt_col) = date_sub('$env.date_std', 1)#end select * from tc_biz_order where #isLastDay("gmt_create"); select * from tc_pay_order where #isLastDay("gmt_create"); 跨文件宏复用 comm.mcr ## 判断是否是前一天 #macro(isLastDay $gmt_col)to_date($gmt_col) = date_sub('$env.date_std', 1)#end ## 判断是否是前三天 #macro(isLastThreeDay $gmt_col)to_date($gmt_col) >= date_sub('$env.date_stb', 3) and to_date($gmt_col) < '$env.date_stb'#end example.sql #parse("comm.mcr") select * from tc_biz_order where #isLastDay("gmt_create"); select * from tc_biz_order where #isLastThreeDay ("gmt_modified"); 宏
  • 28. UDF(User-Defined-Function) 用户可以自定义函数对数据进行处理 如下定义: add jar build/ql/test/test-udfs.jar; CREATE TEMPORARY FUNCTION testlength AS ‘org.apache.hadoop.hive.ql.udf.UDFTestLength’; SELECT testlength(src.value) FROM src; //使用 DROP TEMPORARY FUNCTION testlength; //删除 函数 UDFTestLength.java 为: package org.apache.hadoop.hive.ql.udf; public class UDFTestLength extends UDF { public Integer evaluate(String s) { if (s == null) { return null; } return s.length(); } }
  • 29. Transform ???用户可以自定义 Hive 使用的 Map/Reduce 脚本,比如: FROM ( SELECT TRANSFORM(user_id, page_url, unix_time) USING 'page_url_to_id.py' AS (user_id, page_id, unix_time) FROM mylog DISTRIBUTE BY user_id SORT BY user_id, unix_time) mylog2 SELECT TRANSFORM(user_id, page_id, unix_time) USING 'my_python_session_cutter.py' AS (user_id, session_info);
  • 30. 更好的执行计划数据倾斜 什么是数据倾斜? 数据分布不均匀 倾斜的原因? group by/distinct join
  • 31. Join数据倾斜 Map Join 限制 内存 语义 代价 用法 select /*+ MAPJOIN(tb_alias)*/ Bucketed map join Sort merge bucketed map join 更好的执行计划
  • 32. GroupBy数据倾斜 skewindata优化 用法 set hive.groupby.skewindata=true更好的执行计划
  • 33. 执行优化内存优化 驱动表 使用大表做驱动表,避免内存溢出 Join中最右边的表是驱动表 MapJoin无视Join顺序,使用大表做驱动表 STREAMTABLE
  • 34. I/O优化 Map aggregation 相关参数 hive.map.aggr hive.groupby.mapaggr.checkinterval (100000) hive.map.aggr.hash.percentmemory(0.5) hive.map.aggr.hash.min.reduction(0.5) 注意控制内存执行优化
  • 35. I/O优化 合并小文件 减少后续任务的map数 代价:额外的MR过程 参数: hive.merge.mapfiles hive.merge.mapredfiles hive.merge.size.per.task hive.merge.size.smallfiles.avgsize执行优化
  • 36. hive开发规范(补充) SQL中顶部的注释: #**     [Subject    :  http://confluence.taobao.ali.com:8080/pages/viewpage.action?pageId=128287162\]     [Author       : youliang@taobao.com]     [Created        : 2010-3-11]     [DependsOn    :  ducheng/public/web.sql]     [SmokeEnv    : home=dw_hive/hivelets/developing, date=20100328] *#建表命名规范: 源表用s_开头 临时表用t_开头 结果表用r_开头 跨sql的临时表用m_开头 如:r_youliang_usertag_base 上面这个表明就是一个结果表,花名是youliang 模块名是usertag 然后表的含义是base表示基础表
  • 37. THANK YOU!