hive的多表连接,都会转换成多个MR job,每一个MR job在hive中均称为Join阶段。按照join程序最后一个表应该尽量是大表,因为join前一阶段生成的数据会存在于Reducer 的buffer中,通过stream最后面的表,直接从Reducer中读取已经缓冲的中间数据结果,与后面的大表进行连接时,只需要从buffer中读取缓存的key,与大表中的指定key进行连接,速度更快,也避免内存缓冲区溢出。
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1);
另外,也可以通过一些hint信息来启发join操作,即指定那个表作为大表:
SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)
这样,就会先对表b和c进行join。
hive本身支持的子查询非常有限,Hive不支持where子句中的子查询,只允许子查询在from中出现
错误写法:
insert into table branch_atmzc_sumSelect XT_OP_TRL, SA_TX_DT,"取款-存款",b.cr_tx_amt- a.cr_tx_amt as cr_tx_amt from branch_atmzc a join branch_atmzc b on (a.XT_OP_TRL = b.XT_OP_TRL and a.SA_TX_DT = b.SA_TX_DT and a.tran_cd = 'ATM存款' and b.tran_cd = 'ATM取款'), counts from branch_atmzcgroup by XT_OP_TRL, SA_TX_DT,cr_tx_amt,counts;
正确写法:
insert into table branch_atmzc_sumSelect a.XT_OP_TRL, a.SA_TX_DT,"取款-存款",b.cr_tx_amt- a.cr_tx_amt ,b.counts+a.counts from branch_atmzc a join branch_atmzc b on (a.XT_OP_TRL = b.XT_OP_TRL and a.SA_TX_DT = b.SA_TX_DT and a.tran_cd = 'ATM存款' and b.tran_cd = 'ATM取款')
下面详细介绍各种连接方式的应用和效率
hive的join类型及其就是把MR 中的几种方式都封装实现了,其中代表性的有join on、 left semi join 是使用频率最高的。
join on 属于common join(shuffle join/reduce join),而left semi join 属于map join(broadcast join)的一一种变体。实现上原理有差异。
Common Join
最普通的join,即reduce side join,最没效率的一种方式,由一个mapreduce job完成。
实现原理:先对大表和小表分别进行map操作,在map shuffle的阶段每一个map output key 变成了table_name_tag_prefix + join_column_value , 但是在进行partition 的时候它仍然只使用join_column_value 进行hash.
每一个reduce 接受所有的map 传过来的split , 在reduce 的shuffle 阶段,它将map output key 前面的table_name_tag_prefix 给舍弃掉进行比较. 因为reduce 的个数可以由小表的大小进行决定,所以对于每一个节点的reduce 一定可以将小表的split 放入内存变成hashtable. 然后将大表的每一条记录进行一条一条的比较.
基于条件的 LEFT OUTER JOIN 优化
左连接时,左表中出现的join字段都保留,右表没有连接上的都为空。对于带where条件的join语句,如下:
SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key)WHERE a.ds='2009-07-07' AND b.ds='2009-07-07'
执行顺序是,首先完成两个表的join,然后再通过where条件过滤,这样在join时会输出大量结果,耗时。
进一步优化:把where条件放在on后,比如:
SELECT a.val, b.val FROM a LEFT OUTER JOIN bON (a.key=b.key AND b.ds='2009-07-07' AND a.ds='2009-07-07'
这样子,在join时,会对不满足条件的记录先预先过滤,效果更好。
左半连接(LEFT SEMI JOIN)
采用半连接的原因是:对于reduce side join,跨机器的传输量非常大,如果能够在map端过滤掉不需要进行join操作的数据,可以节省IO,提高效率。
实现原理:选取一个小表,假设是File1,将其参与join的可以抽取出来,保存到File3,一般都很好可以直接放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker,然后可以将File2中不在File3中记录过滤掉,不参加join操作,然后剩下的reduce操作还是和reduce side join是一样的。
所以根据原理,更好理解:
(1)用了LEFT SEMI JOIN子句以后,右边的表在JOIN操作以外就不可见了,表b只能出现在on子句后面,不能出现在select和where语句中了,相当于右表只有join key参与关联计算了。
(2)对待右表中重复(key)时,因为left semi join是in(keySet)的关系,左表会直接跳过,而如果是join on 则会一直遍历。
左半连接类似IN/EXISTS的查询语句,对比如下:
SQL:SELECT a.key, a.value FROM a WHERE a.key IN (SELECT b.key FROM b);Hive:SELECT a.key, a.val FROM a LEFT SEMI JOIN b ON (a.key = b.key)
注:b是其小表,b中只有join key 参与运算,不能出现在select 和where的筛选中。
关于子查询,这里提一下,Hive支持情况如下:
- 在0.12版本,只支持FROM子句中的子查询;
- 在0.13版本,也支持WHERE子句中的子查询。
join on 与 left semi on 比较输出特殊情况:
Left semi join:select a.level2, a.name2, cast((a.alipay_fee) as double) as pay, cast(0 as double) as pay2 from test1 a left semi join test2 b on (a.level2 = b.cat_id2 and a.brand_id = b.brand_id and b.cat_id2 > 0 and b.brand_id > 0 and b.max_price = 0 )Join on:select a.level2, a.name2, cast((a.alipay_fee) as double) as pay, cast(0 as double) as pay 2 from test1 a join test2 b on (a.level2 = b.cat_id2 and a.brand_id = b.brand_id) where b.cat_id2 > 0 and b.brand_id > 0 and b.max_price = 0
陷阱:统计得到结果不一致,这是一个陷阱,因为子表中test2 b中存在重复的数据。当join on 时,a ,b表会关联到两条记录,在on上条件符合。
而当使用left semi join时,当A表中的记录,在B表上产生符合条件就返回,不会再继续查找B表的记录,所以即使有重复,也不会产生多条记录。
所以大多数情况下,两种方式是对等的,只有在有重复的记录时,要小心一点。
Map Side Join
MapJoin 即是 Map任务输出后,不需要将数据拷贝到Reducer节点,降低的数据在网络节点之间传输的开销。
多表连接,如果只有一个表比较大,其他表都很小,则JOIN操作会转换成一个只包含Map的Job,例如SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a JOIN b ON a.key = b.key;
所以对于表a的每一个map,都能够完全读取表b的数据。这里,表a和b不允许FULL OUTER JOIN、RIGHT OUTER JOIN。
BUCKET Map Side JOIN
我们先看两个表a和b的DDL,表a为:
CREATE TABLE a(key INT, othera STRING)CLUSTERED BY(key) INTO 4 BUCKETSROW FORMAT DELIMITEDFIELDS TERMINATED BY '\001'COLLECTION ITEMS TERMINATED BY '\002'MAP KEYS TERMINATED BY '\003'STORED AS SEQUENCEFILE;
表b为:
CREATE TABLE b(key INT, otherb STRING)CLUSTERED BY(key) INTO 32 BUCKETSROW FORMAT DELIMITEDFIELDS TERMINATED BY '\001'COLLECTION ITEMS TERMINATED BY '\002'MAP KEYS TERMINATED BY '\003'STORED AS SEQUENCEFILE;
SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a JOIN b ON a.key = b.key
并且表a有4个BUCKET,表b有32个BUCKET,默认情况下,对于表a的每一个BUCKET,都会去获取表b中的每一个BUCKET来进行JOIN,这回造成一定的开销,因为只有表b中满足JOIN条件的BUCKET才会真正与表a的BUCKET进行连接。
这种默认行为可以进行优化,通过改变默认JOIN行为,只需要设置变量:set hive.optimize.bucketmapjoin = true
这样,JOIN的过程是,表a的BUCKET 1只会与表b中的BUCKET 1进行JOIN,而不再考虑表b中的其他BUCKET 2~32。
如果上述表具有相同的BUCKET,如都是32个,而且还是排序的,亦即,在表定义中在CLUSTERED BY(key)后面增加如下约束:SORTED BY(key)
则上述JOIN语句会执行一个Sort-Merge-Bucket (SMB) JOIN,同样需要设置如下参数来改变默认行为,优化JOIN时只遍历相关的BUCKET即可:
sethive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin = true
set hive.optimize.bucketmapjoin.sortedmerge = true;
例子:
SELECT t1.产品类型, COUNT(DISTINCT (IF(t2.用户ID IS NULL, NULL, t1.用户ID))) AS KEEP_UVFROM( SELECT 产品类型, 用户ID FROM 事实表 WHERE (`DATE` >= 20140201 AND `DATE` <= 20140228)) t1LEFT OUTER JOIN( SELECT 产品类型, 用户ID FROM 事实表 WHERE (`DATE` >= 20140101 AND `DATE` <= 20140131)) t2 ON (t1.产品类型 = t2.产品类型 AND t1.用户ID = t2.用户ID)GROUP BY t1.产品类型
本身表包含的字段信息多,时间跨度大。对于这种对于IN / EXISTS子查询(准确地说,这里是非相关子查询)有一种高效的实现,就是LEFT SEMI JOIN:
LEFT SEMI JOIN implements the uncorrelated IN/EXISTS subquery semantics in an efficient way.
left join
SELECT 产品类型, COUNT(DISTINCT t1.用户ID) AS KEEP_UVFROM( SELECT 产品类型, 用户ID FROM 事实表 WHERE (`DATE` >= 20140201 AND `DATE` <= 20140228)) t1LEFT SEMI JOIN( SELECT 产品类型, 用户ID FROM 事实表 WHERE (`DATE` >= 20140101 AND `DATE` <= 20140131)) t2 ON (t1.产品类型 = t2.产品类型 AND t1.用户ID = t2.用户ID)GROUP BY 产品类型
参考链接:,
配置参数也是hive优化的重要方面:参考,