教育行業(yè)A股IPO第一股(股票代碼 003032)

全國咨詢/投訴熱線:400-618-4000

GroupingComparator應用案例

更新時間:2015年12月29日14時58分 來源:傳智播客云計算學科 瀏覽次數:

MapReduce中的GroupingComparator應用案例
在日常的數據統(tǒng)計分析中,常常會有類似如下的求分組最大值統(tǒng)計需求,用到的數據示例如下:
itemid    amount    date    …
10001    136.6    2015-1-12    …
10001    165.5    2015-1-12    …
10002    122.5    2015-1-12    …
10002    166.88    2015-1-12    …
10003    189.65    2015-1-12    …
10003    198.62    2015-1-13    …
10001    278.6    2015-1-13    …
10001    143.6    2015-1-13    …

需求是求出整個數據集中每一種商品銷售額最大的單筆訂單,結果如下:
10001    278.60
10002    166.88
10003    198.62
... ...
如果用傳統(tǒng)sql來求解,這是極其簡單的:
select itemid,max(amount) from t_order group by itemid;

而用mapreduce程序,該如何實現呢?最簡單的辦法是:
1、在mapper中將日志的每一行解析成鍵值對: “key: itemid ,value:amount”
2、經過shuffle之后,相同itemid的數據會發(fā)送給同一個reducer
3、然后,我們就可以在reducer中遍歷某個item的一組values,
4、這一組values對于amount來說是無序的,進而需要在reducer中緩存這一組values,然后排序從而取到這一組values中的最大值。

這個辦法固然可行,但是效率不是很高,因為在reducer中針對一組values取最大amount,需要在內存中進行緩存并排序,在數據量大的情況下,會耗費相當多的內存空間和cpu運算資源,甚至可能會內存溢出。

現在,就讓我們來思考另一種實現方式,如果能讓數據到達reducer時的次序是針對amount的倒序,則我們可以直接取改組values的第一個值即可,如何實現呢?
1、首先,我們構造一個bean<itemid,amount> implements WritableComparable作為mapper輸出的key來傳遞數據,在其compareTo()方法中定義邏輯:按照itemid升序及amount降序,這樣一來,mapper輸出的數據就會按照amount降序排列,示例如下:
<10001,278.60>
<10001,165.50>
<10001,136.60>
<10002,166.88>
<10002,122.5>
.......
2、但是,這樣一來,又帶來一個棘手的問題——相同item的bean在shuffle時不一定發(fā)往同一個reducer!因為每一個bean(就算是相同itemid)都是一個不同的對象,而默認HashPartitioner分區(qū)的邏輯是用bean的hashcode計算分區(qū)號。從而,需要自定義一個ItemPartitioner,實現將相同itemid的bean發(fā)往同一個reducer,代碼如下所示:
class ItemPartitioner extends Partitioner{
int getPartition(bean,numreducertasks){
      return bean.getItemid.hashCode() % numreducertasks;
}
}

這樣一來,可以保證相同item的數據會到達同一個reducer,并且是按照amount降序排序,如下所示:
<10001,278.60>
<10001,165.50>
<10001,136.60>
.......

3、接下來,就是如何取到這一組values中的最大值。
在默認情況下,reducer會將拿到的數據按照相同key進行聚合,然后對聚合起來的每一組數據調用一次reduce方法,此處麻煩的問題是,這里的每一個key都是一個對象,從而,就算是相同itemid的數據,也不會聚合到一組,而是會逐一地調用reduce()方法進行處理,這樣一來,我們也就沒辦法取到最大值了;
4、要解決這個問題,就得借助GroupingComparator了,其工作機制是這樣:
當mapper輸出的相同partition的kv數據到達一個Reducer后,會有一個聚合的過程,即將“相同”key的kv聚合到一起(其實質是利用GroupingComparator來對key進行比較),然后將這一組聚合好的kv中最前面的一個kv的key傳給reduce方法的入參key,將一個用來遍歷這一組kv數據的values的迭代器iterator傳給reduce方法的入參iterator。

5、從而,我們可以自定義一個GroupingComparator來定義哪些kv可以聚合成一組,代碼示例如下:
public class GroupingComparator extends WritableComparator{
protected  GroupingComparator() {
super(Bean.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
Bean kv1 = (Bean) a;
Bean kv2 = (Bean) b;
int cmp = kv1.getItemid().compareTo(kv2.getItemid());
return cmp;
}
}


6、這樣一來,雖然不同的bean是不同的對象,但是在進行聚合的時候,根據GroupingComparator ,只要是itemid相同的bean都會算成一組聚合kv,然后這一組聚合kv的最前面一個kv(也就是amount值最大的那一個)會傳入reduce方法的入參key,從而,在我們的reduce方法中,只要直接輸出這個key就ok了:
@Override
protected void reduce(Bean bean,Iterable<NullWritable> arg1,Context context)throws IOException, InterruptedException {
context.write(bean, NullWritable.get());
}

當然,要想讓這個GroupingComparator 生效,還需要在job中進行注冊:
job.setGroupingComparatorClass(GroupingComparator.class);


綜上所述,該案例需要自定義這幾個元素:
     自定義的復合key
     自定義的partitioner
     自定義的GroupingComparator
0 分享到:
和我們在線交談!