Hive Extreme Optimization: Explain Plan Categories
1. Types of explain plans
Here we will discuss the following types of explain plans:
- select-from-where
- select-function(col)-from and select-from-where-function(col)
- select-aggr_function()-from-group by
- select-window_function-from
1.1 Prerequisites
You should be familiar with the basic MapReduce execution flow. (The MapReduce flow diagram that originally accompanied this section, which the text read at two levels, is omitted here.)
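Every plan in this article is produced with Hive's EXPLAIN statement. As a quick reference, here is a minimal sketch, reusing a table from the examples below; EXPLAIN EXTENDED is standard Hive syntax that adds extra detail such as file paths:

```sql
-- Prefix a query with explain to print its plan instead of executing it.
explain
select * from odata.stock_asset_holding dt
where dt.busi_date='2017-03-17';

-- explain extended prints the same operator tree with extra detail,
-- e.g. HDFS paths and serde properties for each stage.
explain extended
select * from odata.stock_asset_holding dt
where dt.busi_date='2017-03-17';
```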
1.2 The select-from-where type
This is the simplest type. Line 8 of the plan below shows that there is only a single Map operation. Why map-only? Drawing on the background from 1.1, there are two reasons:
- When Hadoop runs a computation, it ships the computation logic to the machines where the data lives.
- The query involves only retrieval (select) and filtering (where), and no data on other servers needs to be processed. Each Map handles its own assigned split and can emit results directly at the Map output, so there is no need to pass through a shuffle to a Reduce stage before producing output.
```sql
explain
select * from odata.stock_asset_holding dt
where dt.busi_date='2017-03-17' and dt.trade_id='11592';
```

```
 1  STAGE DEPENDENCIES:
 2    Stage-1 is a root stage
 3    Stage-0 depends on stages: Stage-1
 4
 5  STAGE PLANS:
 6    Stage: Stage-1
 7      Map Reduce
 8        Map Operator Tree:
 9            TableScan
10              alias: dt
11              filterExpr: ((busi_date = '2017-03-17') and (trade_id = '11592')) (type: boolean)
12              Statistics: Num rows: 455199 Data size: 11644188 Basic stats: COMPLETE Column stats: NONE
13              Filter Operator
14                predicate: (trade_id = '11592') (type: boolean)
15                Statistics: Num rows: 227599 Data size: 5822081 Basic stats: COMPLETE Column stats: NONE
16                Select Operator
17                  expressions: '11592' (type: string), secu_acc_id (type: string), prd_no (type: string), sys_code (type: string), qty (type: bigint), prsnt_qty (type: bigint), asset_amend_qty (type: bigint), mkt_val (type: double), last_price (type: double), undivi_last_price (type: double), scale_factor (type: double), '2017-03-17' (type: string)
18                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11
19                  Statistics: Num rows: 227599 Data size: 5822081 Basic stats: COMPLETE Column stats: NONE
20                  File Output Operator
21                    compressed: false
22                    Statistics: Num rows: 227599 Data size: 5822081 Basic stats: COMPLETE Column stats: NONE
23                    table:
24                        input format: org.apache.hadoop.mapred.TextInputFormat
25                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
26                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
```
The log output from executing the query above:
```
INFO : Completed compiling command(queryId=hive_20180731210404_3a460178-9700-4254-8827-afe0faab7352); Time taken: 0.093 seconds
INFO : Executing command(queryId=hive_20180731210404_3a460178-9700-4254-8827-afe0faab7352): select * from odata.stock_asset_holding dt
where dt.busi_date='2017-03-17' and dt.trade_id='11592'
INFO : Query ID = hive_20180731210404_3a460178-9700-4254-8827-afe0faab7352
INFO : Total jobs = 1
INFO : Launching Job 1 out of 1
INFO : Starting task [Stage-1:MAPRED] in serial mode
INFO : Number of reduce tasks is set to 0 since there's no reduce operator
INFO : number of splits:3
INFO : Submitting tokens for job: job_1533027632802_0015
INFO : The url to track the job: http://bigdata-03:8088/proxy/application_1533027632802_0015/
INFO : Starting Job = job_1533027632802_0015, Tracking URL = http://bigdata-03:8088/proxy/application_1533027632802_0015/
INFO : Kill Command = /opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/hadoop/bin/hadoop job -kill job_1533027632802_0015
INFO : Hadoop job information for Stage-1: number of mappers: 3; number of reducers: 0
INFO : 2018-07-31 21:04:46,415 Stage-1 map = 0%, reduce = 0%
INFO : 2018-07-31 21:04:54,822 Stage-1 map = 33%, reduce = 0%, Cumulative CPU 3.6 sec
INFO : 2018-07-31 21:04:56,901 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 15.27 sec
INFO : MapReduce Total cumulative CPU time: 15 seconds 270 msec
INFO : Ended Job = job_1533027632802_0015
INFO : MapReduce Jobs Launched:
INFO : Stage-Stage-1: Map: 3 Cumulative CPU: 15.27 sec HDFS Read: 11926505 HDFS Write: 186 SUCCESS
INFO : Total MapReduce CPU Time Spent: 15 seconds 270 msec
INFO : Completed executing command(queryId=hive_20180731210404_3a460178-9700-4254-8827-afe0faab7352); Time taken: 19.055 seconds
INFO : OK
```
Look closely at the log above: the moment map reaches 100%, the job ends, confirming that there is only a Map phase, consistent with the explain plan. Aside: if you have used Sqoop and examined its job output carefully, you will have seen similar log lines where the job finishes as soon as map hits 100%. Why? When Sqoop imports query results from a relational database into HDFS, it essentially just starts Map operators that open JDBC connections, query the database, and write the data to HDFS. Nothing in that process requires grouping, sorting, or any other aggregation-style operation, so no shuffle or reduce is needed.
1.3 The select-function(col)-from and select-from-where-function(col) types
The functions here specifically mean functions other than aggregate and window functions, e.g. concat, cast, nvl, case when, etc. Let's look at two examples.
```sql
-- Example 1
explain
select concat('11592', prd_no),
       CASE WHEN ((prd_no <> 0.0)) THEN (prd_no) ELSE (-1) END
from adatatest.stock_cust_return_by_prd_test
where busi_date='2017-03-16'
```

```
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: stock_cust_return_by_prd_test
            filterExpr: (busi_date = '2017-03-16') (type: boolean)
            Statistics: Num rows: 27732 Data size: 2773224 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: concat('11592', prd_no) (type: string), CASE WHEN ((prd_no <> 0.0)) THEN (prd_no) ELSE (-1) END (type: string)
              outputColumnNames: _col0, _col1
              Statistics: Num rows: 27732 Data size: 2773224 Basic stats: COMPLETE Column stats: NONE
              File Output Operator
                compressed: false
                Statistics: Num rows: 27732 Data size: 2773224 Basic stats: COMPLETE Column stats: NONE
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
```
In the plan above, everything except the expressions is identical to the plan in 1.2. In other words, adding a function to the select columns does not change how the plan runs.
```sql
-- Example 2
explain
select concat('11592', prd_no),
       CASE WHEN ((prd_no <> 0.0)) THEN (prd_no) ELSE (-1) END
from adatatest.stock_cust_return_by_prd_test
where busi_date='2017-03-16' and substring(trade_id,0,3)='115'
```

```
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: stock_cust_return_by_prd_test
            filterExpr: ((busi_date = '2017-03-16') and (substring(trade_id, 0, 3) = '115')) (type: boolean)
            Statistics: Num rows: 13866 Data size: 2773224 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (substring(trade_id, 0, 3) = '115') (type: boolean)
              Statistics: Num rows: 6933 Data size: 1386612 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: concat('11592', prd_no) (type: string), CASE WHEN ((prd_no <> 0.0)) THEN (prd_no) ELSE (-1) END (type: string)
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 6933 Data size: 1386612 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  Statistics: Num rows: 6933 Data size: 1386612 Basic stats: COMPLETE Column stats: NONE
                  table:
                      input format: org.apache.hadoop.mapred.TextInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
```
Putting the example in 1.2 together with the two examples above: whether or not the select columns or the where conditions contain functions, the query plan tree is almost identical to a plain select ... from .... This is very different from relational databases. In the relational world a rule of thumb gets passed around: avoid wrapping where conditions in functions, because it prevents the use of indexes. A more precise statement is that wrapping a where condition in a function changes the execution plan. As shown above, that does not happen in Hive. On the contrary, we strongly recommend that whatever can be handled in the Map Operator be handled there, rather than deferred to later stages. So we recommend applying functions inside where or select to transform and filter the data early, as the sketch below shows; this is a very important point in Hive optimization. The reason why will be covered later when we walk through the MapReduce model in detail.
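As a sketch of this rule of thumb, reusing the tables from the examples above: push filters, including function-wrapped ones, into the innermost query so they are evaluated in the Map operator and only surviving rows ever reach a shuffle.

```sql
-- The subquery's where clause (function call included) runs inside the
-- Map Operator Tree, so rows are discarded before the group by shuffle;
-- only pre-filtered, pre-aggregated data crosses the network.
select t.trade_id, t.cnt
from (
  select trade_id, count(prd_no) as cnt
  from adatatest.stock_cust_return_by_prd_test
  where busi_date = '2017-03-16'
    and substring(trade_id, 0, 3) = '115'   -- filtered at the Map stage
  group by trade_id
) t;
```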
1.4 The select-aggr_function()-from-group by type
Aggregate functions perform aggregation over data: rather than simply fetching rows and filtering or reformatting them one at a time, they must combine many rows, which inevitably means gathering data scattered across the distributed storage system (e.g. HDFS, HBase, etc.) onto individual nodes. Put simply, aggregation involves cross-machine data transfer: local Map processing, output written to disk for the shuffle, then delivery to the reduce nodes.

As an analogy, suppose a grade director is told to count how many boys and how many girls in the grade are currently in class (a count operation), but some classes are on the playground while others are in different rooms on different floors. Having the director run from class to class counting heads is obviously slow and laborious. What instead? Each class (a Map node) counts itself first, then reports its result to the director (the Reduce node) by phone or network. Since boys and girls must be counted separately, the report takes the form "boys: XX, girls: XX"; that is, what arrives at the director has a key of boy/girl and a value of the count. Hopefully this short example conveys the idea.

At a high level, compared with the examples in 1.1-1.3, this type introduces network transfer and the storage and retrieval of data on remote nodes. How well you understand and optimize these two aspects largely determines the performance of a Hive job. Here we give a brief reading of the plan; later we will combine it with YARN logs to analyze the optimization opportunities for this type in depth. Consider the following example.
```sql
explain
select trade_id, count(prd_no)
from adatatest.stock_cust_return_by_prd_test
where busi_date='2017-03-16'
group by trade_id
```

```
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: stock_cust_return_by_prd_test
            filterExpr: (busi_date = '2017-03-16') (type: boolean)
            Statistics: Num rows: 13866 Data size: 2773224 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: prd_no (type: string), trade_id (type: string)
              outputColumnNames: prd_no, trade_id
              Statistics: Num rows: 13866 Data size: 2773224 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count(prd_no)
                keys: trade_id (type: string)
                mode: hash
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 13866 Data size: 2773224 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: string)
                  sort order: +
                  Map-reduce partition columns: _col0 (type: string)
                  Statistics: Num rows: 13866 Data size: 2773224 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col1 (type: bigint)
      Execution mode: vectorized
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          keys: KEY._col0 (type: string)
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 6933 Data size: 1386612 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 6933 Data size: 1386612 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
```
Compared with the previous sections, the plan above contains two new parts:
- Map Operator Tree->TableScan->Select Operator->Group By Operator (see also the note on map-side aggregation after this list), where:
  - aggregations: the aggregate being computed, corresponding to count in select count(prd_no).
  - keys: the key emitted on the Map side, corresponding to the group by column, here trade_id.
  - outputColumnNames: the output column names, here the two placeholders _col0 and _col1; their number matches the number of selected columns.
  - Reduce Output Operator: this does not mean a Reduce task has already started. The "reduce" here is purely literal, shrinking the data according to the expression in aggregations and finally emitting a single value column _col1 of type bigint.
  - Map-reduce partition columns: the column(s) by which the final output is partitioned.
- Reduce Operator Tree: this genuinely describes what the reduce nodes do.
  - aggregations: note that count is now count(VALUE._col0), unlike count(prd_no) in the Map Operator Tree; it counts the first column of VALUE, i.e. _col1 from the Map Operator Tree->Group By Operator.
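One more detail in the plan above: the map-side Group By Operator runs with mode: hash (a partial, per-mapper aggregation), while the reduce side runs mode: mergepartial (merging those partials). Map-side aggregation is governed by the hive.map.aggr setting; a small sketch, assuming its usual default of true:

```sql
-- With hive.map.aggr=true (the default), each mapper pre-aggregates in a
-- hash table, so the shuffle carries partial counts ("mode: hash" above)
-- instead of one record per raw row.
set hive.map.aggr=true;

explain
select trade_id, count(prd_no)
from adatatest.stock_cust_return_by_prd_test
where busi_date='2017-03-16'
group by trade_id;

-- Setting it to false removes the map-side Group By Operator from the
-- plan and ships every raw row to the reducers.
set hive.map.aggr=false;
```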
1.5 The select-window_function-from type
This type of plan can be hard to understand if you have never written any MapReduce programs; conversely, once you do understand it, it offers real insight for writing your own MapReduce code or implementing other distributed computation operators. With that, let's look at the following example:
```sql
explain
select prd_no, row_number() over(partition by trade_id order by mkt_val) s
from odata.stock_asset_holding
where busi_date='2017-03-16'
```

```
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: stock_asset_holding
            filterExpr: (busi_date = '2017-03-16') (type: boolean)
            Statistics: Num rows: 37244 Data size: 7746942 Basic stats: COMPLETE Column stats: NONE
            Reduce Output Operator
              key expressions: trade_id (type: string), mkt_val (type: double)
              sort order: ++
              Map-reduce partition columns: trade_id (type: string)
              Statistics: Num rows: 37244 Data size: 7746942 Basic stats: COMPLETE Column stats: NONE
              value expressions: prd_no (type: string)
      Execution mode: vectorized
      Reduce Operator Tree:
        Select Operator
          expressions: KEY.reducesinkkey0 (type: string), VALUE._col1 (type: string), KEY.reducesinkkey1 (type: double)
          outputColumnNames: _col0, _col2, _col7
          Statistics: Num rows: 37244 Data size: 7746942 Basic stats: COMPLETE Column stats: NONE
          PTF Operator
            Function definitions:
                Input definition
                  input alias: ptf_0
                  output shape: _col0: string, _col2: string, _col7: double
                  type: WINDOWING
                Windowing table definition
                  input alias: ptf_1
                  name: windowingtablefunction
                  order by: _col7
                  partition by: _col0
                  raw input shape:
                  window functions:
                      window function definition
                        alias: _wcol0
                        name: row_number
                        window function: GenericUDAFRowNumberEvaluator
                        window frame: PRECEDING(MAX)~FOLLOWING(MAX)
                        isPivotResult: true
            Statistics: Num rows: 37244 Data size: 7746942 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: _col2 (type: string), _wcol0 (type: int)
              outputColumnNames: _col0, _col1
              Statistics: Num rows: 37244 Data size: 7746942 Basic stats: COMPLETE Column stats: NONE
              File Output Operator
                compressed: false
                Statistics: Num rows: 37244 Data size: 7746942 Basic stats: COMPLETE Column stats: NONE
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
```
In the example above, pay particular attention to the following points:
- Map Operator Tree->Reduce Output Operator: the key expressions turn out to be the two columns referenced in row_number() over(partition by trade_id order by mkt_val), namely trade_id and mkt_val, while value expressions becomes prd_no. In other words, the Map output takes the form key: (trade_id, mkt_val), value: prd_no, yet Map-reduce partition columns contains only trade_id. (The sketch after this list shows how these keys track the OVER clause.)
- Reduce Operator Tree->Select Operator:
  - expressions shows three values: two from the key (reducesinkkey0, reducesinkkey1) and one from the value.
  - outputColumnNames lists the columns this stage finally outputs. Since MapReduce has no notion of column names, placeholders are used: _col0, _col2, _col7 stand for the Map-stage inputs trade_id, prd_no, and mkt_val respectively.
- PTF Operator->Windowing table definition: Hive routes all rows sharing partition by _col0 (trade_id) to a single reducer; at that point the reducer conceptually holds one windowing table, sorted by order by: _col7 (mkt_val). What exactly does the window function compute there: rank() over, row_number(), or sum() over? That is answered under window functions: name: row_number identifies the function, the Java class actually invoked is GenericUDAFRowNumberEvaluator, and window frame describes the range of rows the window function processes. The function finally emits its result as the column _wcol0.
- PTF Operator->Select Operator: emits the final output.
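A quick way to convince yourself that the shuffle key simply mirrors the OVER clause is to vary it and re-run EXPLAIN. A hypothetical variation on the query above (not taken from the original plan):

```sql
-- Ordering by prd_no instead of mkt_val: in the resulting plan the
-- Reduce Output Operator's key expressions become trade_id, prd_no;
-- mkt_val is no longer shuffled at all, and Map-reduce partition
-- columns stays trade_id. The key is always (partition by, order by).
explain
select prd_no,
       row_number() over(partition by trade_id order by prd_no) s
from odata.stock_asset_holding
where busi_date='2017-03-16';
```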