`

spark 几种transformation 的计算逻辑和测试

阅读更多

测试例子使用的数据:
test01:
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
a a  
b b  
c c  
d d  
e e  
f f  
g g  

test02:
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
11
22
33
44
55
66
a a  
b b  
c c  
d d  
e e  
f f  


1、union(otherRDD)
      union() 将两个rdd简单结合在一起,与mysql中的union 操作类似只不过它是操作的rdd,它不会改变partition中的数据
spark sql 测试:
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
./spark-shell  
sc  
val t01 = sc.textFile("hdfs://user/data_spark/test01")  
val t02 = sc.textFile("hdfs://user/data_spark/test02")  
t01.union(t01) foreach println  
结果:  
a a  
e e  
b b  
a a  
f f  
b b  
c c  
c c  
g g  
d d  
d d  
e e  
f f  
g g  

多次测试union,结果顺序都是随机的,所以,union只是简单的将两个rdd的数据拼接到一起

2、groupByKey(numPartitions)
      普通的RDD 类是没有这个方法的,org.apache.spark.rdd.PairRDDFunctions 这个pairRdd提供这个方法;
       顾名思义,这个方法是将相同的key的records聚合在一起,类似mysql中的groupby操作,通过ShuffledRDD将每个partition中fetch过来,shuffle机制默认用的是hashShuffle,spark1.1版本引入sorted shuffle,速度更快。shuffle操作后面接着mapPartition()操作,生成MapPartitionRDD。这就是groupbykey的结果了。
      同一个key的值聚合以后,将所有的value放到一个arraylist,新的arraylist 作为value
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
val wc = t01.union(t01).flatMap(l=>l.split(" ")).map(w=>(w,1))  
wc foreach println  
结果:  
(e,1)  
(e,1)  
(e,1)  
(e,1)  
(f,1)  
(f,1)  
(f,1)  
(f,1)  
(g,1)  
(g,1)  
(g,1)  
(g,1)  
(a,1)  
(a,1)  
(a,1)  
(a,1)  
(b,1)  
(b,1)  
(b,1)  
(b,1)  
(c,1)  
(c,1)  
(c,1)  
(c,1)  
(d,1)  
(d,1)  
(d,1)  
(d,1)  
wc.groupByKey foreach println  
结果:  
(d,CompactBuffer(1, 1, 1, 1))  
(g,CompactBuffer(1, 1, 1, 1))  
(c,CompactBuffer(1, 1, 1, 1))  
(b,CompactBuffer(1, 1, 1, 1))  
(f,CompactBuffer(1, 1, 1, 1))  
(e,CompactBuffer(1, 1, 1, 1))  
(a,CompactBuffer(1, 1, 1, 1))  
ok,groupByKey 之后,将同一个key的值都放到一个列表中

3、reduceByKey(func,numPartition)
      这个操作的作用类似mapreduce中的reduce操作,对相同的key的值加上func的操作,比如要做wordcount的操作:
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
map(x=>(x,1)).reduceByKey(_+_, 5)  
      reduceByKey默认开启map端的combine,上面的groupByKey默认没有开启map端的combine操作,可以人工设置一下。
接上面的测试
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
wc.reduceByKey(_+_) foreach println  
结果:  
(d,4)  
(b,4)  
(f,4)  
(g,4)  
(c,4)  
(e,4)  
(a,4)  

4、distinct(numPartitions)
      将 parent rdd 的数据去重,放到新的numPartitions,还是要通过shuffle操作,如果是kv pair 的数据  则直接进行shuffle 操作,如果只有key,那么spark先将数据转换成再进行shuffle。其实后面调用的是reduceByKey()
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc.distinct(1) foreach println  
结果:  
(g,1)  
(b,1)  
(f,1)  
(d,1)  
(a,1)  
(e,1)  
(c,1)  

5、cogroup(otherRDD,numPartitions)
     与groupByKey不同的地方,cogroup 是将多个rdd的数据聚合到一起,过程跟groupByKey 类似.
     但是结果是一个包含多个arraylist 的arraylist,每一个rdd 的value放到一个arraylist,然后,将这些arraylist放到一个元素的arraylist的arraylist。
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
val wc01 = t01.flatMap(l=>l.split(" ")).map(w=>(w,1))  
val wc02 = t02.flatMap(l=>l.split(" ")).map(w=>(w,1))  
wc01.cogroup(wc02,1) foreach println  
结果:  
(d,(CompactBuffer(1, 1),CompactBuffer(1, 1)))  
(e,(CompactBuffer(1, 1),CompactBuffer(1, 1)))  
(4,(CompactBuffer(),CompactBuffer(1, 1)))  
(5,(CompactBuffer(),CompactBuffer(1, 1)))  
(a,(CompactBuffer(1, 1),CompactBuffer(1, 1)))  
(6,(CompactBuffer(),CompactBuffer(1, 1)))  
(b,(CompactBuffer(1, 1),CompactBuffer(1, 1)))  
(2,(CompactBuffer(),CompactBuffer(1, 1)))  
(3,(CompactBuffer(),CompactBuffer(1, 1)))  
(f,(CompactBuffer(1, 1),CompactBuffer(1, 1)))  
(1,(CompactBuffer(),CompactBuffer(1, 1)))  
(g,(CompactBuffer(1, 1),CompactBuffer()))  
(c,(CompactBuffer(1, 1),CompactBuffer(1, 1)))  


6、intersection(otherRDD)
     这个操作值保留两个rdd中都包含的数据,首先将rdd的数据转化成,后面调用cogroup()操作。
     然后,  对cogroup结果进行过滤,由前面cogroup 的结果格式介绍可知,会生成包含两个arraylist元素的arraylist,只保留结果中两个arraylist都不为空的,最后取出key,便是最终的结果。
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc01.intersection(wc02) foreach println  
结果:  
(d,1)  
(e,1)  
(b,1)  
(f,1)  
(a,1)  
(c,1)  
只有两个rdd共同的部分 kv 对

7、join(otherRDD, numPartitions)
     将两个RDD[ K, V ] 安装sql中的join方式聚合。类似intersection,先进行cogroup操作,得到 的MappedValuesRDD。
     将 Iterable[v1] 和 Iterable[v2] 做笛卡尔集,并将集合flat()化,生成FlatMappedValuesRDD。
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
wc01.join(wc02,1) foreach println  
结果:  
(d,(1,1))  
(d,(1,1))  
(d,(1,1))  
(d,(1,1))  
(e,(1,1))  
(e,(1,1))  
(e,(1,1))  
(e,(1,1))  
(a,(1,1))  
(a,(1,1))  
(a,(1,1))  
(a,(1,1))  
(b,(1,1))  
(b,(1,1))  
(b,(1,1))  
(b,(1,1))  
(f,(1,1))  
(f,(1,1))  
(f,(1,1))  
(f,(1,1))  
(c,(1,1))  
(c,(1,1))  
(c,(1,1))  
(c,(1,1))  
这个jion应该对应于mysql的inner join,只包含双方共有的数据

8、sortByKey(ascending,numPartitions)
     将RDD [ k, v ] 按照key进行排序,如果ascending=true表示升序,false表示降序。
     先通过shuffle将数据聚合到一起,然后将聚合的数据按照key排序
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc01.sortByKey(true,1) foreach println  
结果:  
(a,1)  
(a,1)  
(b,1)  
(b,1)  
(c,1)  
(c,1)  
(d,1)  
(d,1)  
(e,1)  
(e,1)  
(f,1)  
(f,1)  
(g,1)  
(g,1)  

9、cartesian(otherRDD)
       求两个rdd的笛卡尔集,生成的CartesianRDD中的partition个数为两个rdd的partition的个数乘积。
       逻辑类似join
笛卡尔乘积,这个很简单,不过数据量大的话就不要这么做了

10、coalesce(numPartitions, shuffle = false) 
        合并,对一个rdd,两种方式,一种需要shuffle,一种直接将多个partitions的内容合并到一起,不需要shuffle。
        这个方法的主要作用就是调整 parentRDD 的partition数量。合并因素除了考虑partition的个数外,还应该考虑locality 和 balance的问题
这个操作的逻辑比较难理解:
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
 wc01.coalesce(1) foreach println  
结果:  
14/10/2717:23:42 INFO rdd.HadoopRDD: Input split: hdfs://qunarcluster/user/data_spark/test01:0+14
(a,1)  
(a,1)  
(b,1)  
(b,1)  
(c,1)  
(c,1)  
(d,1)  
(d,1)  
14/10/2717:23:42 INFO rdd.HadoopRDD: Input split: hdfs://qunarcluster/user/data_spark/test01:14+14
(e,1)  
(e,1)  
(f,1)  
(f,1)  
(g,1)  
(g,1)  
 wc01.coalesce(2) foreach println  
结果:  
(a,1)  
(a,1)  
(b,1)  
(b,1)  
(c,1)  
(c,1)  
(e,1)  
(d,1)  
(e,1)  
(d,1)  
(f,1)  
(f,1)  
(g,1)  
(g,1)  

11、repartition(numPartitions)        
        等价于coalesce(numPartitions, shuffle = true)
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc01.repartition(1) foreach println  
(a,1)  
(a,1)  
(b,1)  
(b,1)  
(c,1)  
(c,1)  
(d,1)  
(d,1)  
(e,1)  
(e,1)  
(f,1)  
(f,1)  
(g,1)  
(g,1)  



虽然mapreduce 相当于 spark 的 map +  reduceByKey, 但是 mapreduce中的reduce可以灵活的操作,加入一些自己的逻辑,所以,各有所长。
但是,spark 确实很方便

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics