mapPartitions算子:数据转换(分区批处理)import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/** * mapPartitions VS map * * map 传入的是分区中的每个元素,是对每个元素就进行一次转换和改变,但不会减少或增多元素 * mapPartitions 传入的参数是Iterator返回值也是Iterator,所传入的计算逻辑是对一个Iterator进行一次运算,可以增加或减少元素 * * * Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高 。* 但是 mapPartitions 算子会长时间占用内存,这样会导致内存OOM 。而map会在内存不够时进行GC 。* * 详细参考 https://blog.csdn.net/AnameJL/article/details/121689987 */object mapPartitions {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)// mapPartitions: 可以以分区为单位进行数据转换操作,但是会将整个分区的数据加载到内存进行引用 。// 在内存较小,数据量较大的场合下,容易出现内存溢出 。val mpRDD: RDD[Int] = rdd.mapPartitions(iter => {println("批处理当前分区数据")iter.map(_ * 2)})mpRDD.collect().foreach(println)sc.stop()}}
mapPartitionsWithIndex算子:分区索引 + 数据迭代器import org.apache.spark.{SparkConf, SparkContext}// 分区索引object mapPartitionsWithIndex {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)val mpiRDD = rdd.mapPartitionsWithIndex(//(分区索引, 数据迭代器)(index, iter) => {println("index:" + index, "iter[" + iter.mkString(",") + "]")})mpiRDD.collect().foreach(println)sc.stop()}}
flatMap算子:数据扁平化import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}// 将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射object flatMap {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd: RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4)))// 多个list合并成一个listval flatRDD: RDD[Int] = rdd.flatMap(list => list)flatRDD.collect().foreach(println)sc.stop()}}
glom算子:分区内数据合并import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}// 将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变object glom {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)// 把每一个分区内数据合并成Arrayval glomRDD: RDD[Array[Int]] = rdd.glom()glomRDD.collect().foreach(array => {println(array.mkString(","))})sc.stop()}}
groupBy算子:数据分组import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}// 将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle 。// 极限情况下,数据可能被分在同一个分区中一个组的数据在一个分区中,但是并不是说一个分区中只有一个组,分组和分区没有必然的关系object groupBy {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)// groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组,相同的key值的数据会放置在一个组中// val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(num => num % 2)val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)groupRDD.collect().foreach(println)sc.stop()}}
经验总结扩展阅读
- 惊艳我的朋友圈个性签名 句句高品位的ins风签名
- 祝我的男孩儿子 文案 很潮又短的生日祝福
- 生日文案给儿子 愿我的大男孩生日快乐的句子
- 我的汤姆猫里的2048怎么玩(能玩2048的汤姆猫)
- JVM学习笔记——垃圾回收篇
- 我的世界怎么去月球无模组无指令(我的世界新版怎么去月球)
- 我的世界怎么去月球,我的世界手机版月球传送门怎么做
- 我的世界怎么去月球模组免费(mc月球模组)
- 我的世界虚无3月球怎么去(我的世界虚无世界怎么找传送门)
- 我的世界怎么去月球(我的世界惊变100天)