测试例子使用的数据:
test01:
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
a a
b b
c c
d d
e e
f f
g g
test02:
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
11
22
33
44
55
66
a a
b b
c c
d d
e e
f f
1、union(otherRDD)
union() 将两个rdd简单结合在一起,与mysql中的union 操作类似只不过它是操作的rdd,它不会改变partition中的数据
spark sql 测试:
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
./spark-shell
sc
val t01 = sc.textFile("hdfs://user/data_spark/test01")
val t02 = sc.textFile("hdfs://user/data_spark/test02")
t01.union(t01) foreach println
结果:
a a
e e
b b
a a
f f
b b
c c
c c
g g
d d
d d
e e
f f
g g
多次测试union,结果顺序都是随机的,所以,union只是简单的将两个rdd的数据拼接到一起
2、groupByKey(numPartitions)
普通的RDD 类是没有这个方法的,org.apache.spark.rdd.PairRDDFunctions 这个pairRdd提供这个方法;
顾名思义,这个方法是将相同的key的records聚合在一起,类似mysql中的groupby操作,通过ShuffledRDD将每个partition中fetch过来,shuffle机制默认用的是hashShuffle,spark1.1版本引入sorted shuffle,速度更快。shuffle操作后面接着mapPartition()操作,生成MapPartitionRDD。这就是groupbykey的结果了。
同一个key的值聚合以后,将所有的value放到一个arraylist,新的arraylist 作为value
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
val wc = t01.union(t01).flatMap(l=>l.split(" ")).map(w=>(w,1))
wc foreach println
结果:
(e,1)
(e,1)
(e,1)
(e,1)
(f,1)
(f,1)
(f,1)
(f,1)
(g,1)
(g,1)
(g,1)
(g,1)
(a,1)
(a,1)
(a,1)
(a,1)
(b,1)
(b,1)
(b,1)
(b,1)
(c,1)
(c,1)
(c,1)
(c,1)
(d,1)
(d,1)
(d,1)
(d,1)
wc.groupByKey foreach println
结果:
(d,CompactBuffer(1, 1, 1, 1))
(g,CompactBuffer(1, 1, 1, 1))
(c,CompactBuffer(1, 1, 1, 1))
(b,CompactBuffer(1, 1, 1, 1))
(f,CompactBuffer(1, 1, 1, 1))
(e,CompactBuffer(1, 1, 1, 1))
(a,CompactBuffer(1, 1, 1, 1))
ok,groupByKey 之后,将同一个key的值都放到一个列表中
3、reduceByKey(func,numPartition)
这个操作的作用类似mapreduce中的reduce操作,对相同的key的值加上func的操作,比如要做wordcount的操作:
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
map(x=>(x,1)).reduceByKey(_+_, 5)
reduceByKey默认开启map端的combine,上面的groupByKey默认没有开启map端的combine操作,可以人工设置一下。
接上面的测试
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
wc.reduceByKey(_+_) foreach println
结果:
(d,4)
(b,4)
(f,4)
(g,4)
(c,4)
(e,4)
(a,4)
4、distinct(numPartitions)
将 parent rdd 的数据去重,放到新的numPartitions,还是要通过shuffle操作,如果是kv pair 的数据 则直接进行shuffle 操作,如果只有key,那么spark先将数据转换成再进行shuffle。其实后面调用的是reduceByKey()
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc.distinct(1) foreach println
结果:
(g,1)
(b,1)
(f,1)
(d,1)
(a,1)
(e,1)
(c,1)
5、cogroup(otherRDD,numPartitions)
与groupByKey不同的地方,cogroup 是将多个rdd的数据聚合到一起,过程跟groupByKey 类似.
但是结果是一个包含多个arraylist 的arraylist,每一个rdd 的value放到一个arraylist,然后,将这些arraylist放到一个元素的arraylist的arraylist。
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
val wc01 = t01.flatMap(l=>l.split(" ")).map(w=>(w,1))
val wc02 = t02.flatMap(l=>l.split(" ")).map(w=>(w,1))
wc01.cogroup(wc02,1) foreach println
结果:
(d,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(e,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(4,(CompactBuffer(),CompactBuffer(1, 1)))
(5,(CompactBuffer(),CompactBuffer(1, 1)))
(a,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(6,(CompactBuffer(),CompactBuffer(1, 1)))
(b,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(2,(CompactBuffer(),CompactBuffer(1, 1)))
(3,(CompactBuffer(),CompactBuffer(1, 1)))
(f,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(1,(CompactBuffer(),CompactBuffer(1, 1)))
(g,(CompactBuffer(1, 1),CompactBuffer()))
(c,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
6、intersection(otherRDD)
这个操作值保留两个rdd中都包含的数据,首先将rdd的数据转化成,后面调用cogroup()操作。
然后, 对cogroup结果进行过滤,由前面cogroup 的结果格式介绍可知,会生成包含两个arraylist元素的arraylist,只保留结果中两个arraylist都不为空的,最后取出key,便是最终的结果。
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc01.intersection(wc02) foreach println
结果:
(d,1)
(e,1)
(b,1)
(f,1)
(a,1)
(c,1)
只有两个rdd共同的部分 kv 对
7、join(otherRDD, numPartitions)
将两个RDD[ K, V ] 安装sql中的join方式聚合。类似intersection,先进行cogroup操作,得到 的MappedValuesRDD。
将 Iterable[v1] 和 Iterable[v2] 做笛卡尔集,并将集合flat()化,生成FlatMappedValuesRDD。
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
wc01.join(wc02,1) foreach println
结果:
(d,(1,1))
(d,(1,1))
(d,(1,1))
(d,(1,1))
(e,(1,1))
(e,(1,1))
(e,(1,1))
(e,(1,1))
(a,(1,1))
(a,(1,1))
(a,(1,1))
(a,(1,1))
(b,(1,1))
(b,(1,1))
(b,(1,1))
(b,(1,1))
(f,(1,1))
(f,(1,1))
(f,(1,1))
(f,(1,1))
(c,(1,1))
(c,(1,1))
(c,(1,1))
(c,(1,1))
这个jion应该对应于mysql的inner join,只包含双方共有的数据
8、sortByKey(ascending,numPartitions)
将RDD [ k, v ] 按照key进行排序,如果ascending=true表示升序,false表示降序。
先通过shuffle将数据聚合到一起,然后将聚合的数据按照key排序
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc01.sortByKey(true,1) foreach println
结果:
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(d,1)
(d,1)
(e,1)
(e,1)
(f,1)
(f,1)
(g,1)
(g,1)
9、cartesian(otherRDD)
求两个rdd的笛卡尔集,生成的CartesianRDD中的partition个数为两个rdd的partition的个数乘积。
逻辑类似join
笛卡尔乘积,这个很简单,不过数据量大的话就不要这么做了
10、coalesce(numPartitions, shuffle = false)
合并,对一个rdd,两种方式,一种需要shuffle,一种直接将多个partitions的内容合并到一起,不需要shuffle。
这个方法的主要作用就是调整 parentRDD 的partition数量。合并因素除了考虑partition的个数外,还应该考虑locality 和 balance的问题
这个操作的逻辑比较难理解:
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
wc01.coalesce(1) foreach println
结果:
14/10/2717:23:42 INFO rdd.HadoopRDD: Input split: hdfs://qunarcluster/user/data_spark/test01:0+14
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(d,1)
(d,1)
14/10/2717:23:42 INFO rdd.HadoopRDD: Input split: hdfs://qunarcluster/user/data_spark/test01:14+14
(e,1)
(e,1)
(f,1)
(f,1)
(g,1)
(g,1)
wc01.coalesce(2) foreach println
结果:
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(e,1)
(d,1)
(e,1)
(d,1)
(f,1)
(f,1)
(g,1)
(g,1)
11、repartition(numPartitions)
等价于coalesce(numPartitions, shuffle = true)
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc01.repartition(1) foreach println
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(d,1)
(d,1)
(e,1)
(e,1)
(f,1)
(f,1)
(g,1)
(g,1)
虽然mapreduce 相当于 spark 的 map + reduceByKey, 但是 mapreduce中的reduce可以灵活的操作,加入一些自己的逻辑,所以,各有所长。
但是,spark 确实很方便
分享到:
相关推荐
Spark_Transformation和Action算子.md
Spark核心概念简介: Spark使用maven进行打包(减少jar包大小): Spark中的(弹性分布式数据集)简称RDD: ...SparkStreaming中的正常操作(每读2秒就计算一次): Spark中的local[2]: Spark中的处理流程图像:
spark transformation & action 算子速查表,大数据实时和离线数据处理方向,希望对大家学习和工作有所帮助。
第3章 Spark计算模型 3.1 Spark程序模型 3.2 弹性分布式数据集 3.2.1 RDD简介 3.2.2 RDD与分布式共享内存的异同 3.2.3 Spark的数据存储 3.3 Spark算子分类及功能 33.3.1 Value型Transformation算子 3.3.2 Key-Value...
Feature Extraction and Transformation - RDD-based API(特征的提取和转换) Frequent Pattern Mining - RDD-based API(频繁模式挖掘) Evaluation metrics - RDD-based API(评估指标) PMML model export -...
Informatica Expression Transformation组件 详解
Informatica Union Transformation组件 详解
6. 解释Spark中的转换(Transformation)和动作(Action)。 7. Spark支持哪些语言进行数据处理? 8. 什么是Spark的Lineage(血统)? 9. 解释广播变量和累加器。 10. Spark SQL是什么,它有什么优势? 11. 什么是...
You will explore RDD and its associated common Action and Transformation Java APIs, set up a production-like clustered environment, and work with Spark SQL. Moving on, you will perform near-real-time...
本篇文档介绍了我们在生产中常用的transformation和action
Spark RDD 算子说明,分别讲述了Transformation和Action这两类的算子。
Data-Driven Transformation-Leveraging Big Data at SHOWTIME with Apache Spark Tangram-Distributed Scheduling Framework for Apache Spark at Facebook Near Real-Time Analytics with Apache Spark Improving...
Vista Transformation Pack 9.0.1
Informatica Joiner Transformation组件 详解
fab-transformation.zip,太多无法一一验证是否可用,程序如果跑不起来需要自调,部分代码功能进行参考学习。
在java应用程序中集成应用kettle,实现调用资源库上和本地的transformation和job。
Informatica Java Transformation
Hauseholder and Givens Transformation应用。
关于spark较为简单的算子讲义 和相对的用法,基于scala语言
本文将围绕Spark的基本概念、使用方法和功能进行讲解,帮助您快速上手Spark。 知识领域:大数据处理、Scala编程、SparkContext、RDD、actions 技术关键词:Spark、大数据、Scala、SparkContext、RDD、actions 内容...