我的Spark学习笔记( 四 )

sortBy算子:数据排序import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/** * 该操作用于排序数据 。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列 。* 排序后新产生的 RDD 的分区数与原 RDD 的分区数一致 。中间存在shuffle的过程 。*/object sortBy {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// 例子1val rdd = sc.makeRDD(List(6, 2, 4, 5, 3, 1), 2)val newRDD: RDD[Int] = rdd.sortBy(n => n)println(newRDD.collect().mkString(","))newRDD.saveAsTextFile("output")// 例子2val rdd2 = sc.makeRDD(List(("1", 1), ("3", 2), ("2", 3)), 2)// sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序,第二个参数可以改变排序的方式// sortBy默认情况下,不会改变分区 。但是中间存在shuffle操作val newRDD1 = rdd2.sortBy(t => t._1.toInt, false) // 降序val newRDD2 = rdd2.sortBy(t => t._1.toInt, true) // 升序newRDD1.collect().foreach(println)newRDD2.collect().foreach(println)sc.stop()}}intersection union subtract zip:两个数据源 交 并 差 拉链/** * 两个数据源 交 并 差 拉链 */object intersection_union_subtract_zip {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// 交集,并集和差集要求两个数据源数据类型保持一致val rdd1 = sc.makeRDD(List(1, 2, 3, 4))val rdd2 = sc.makeRDD(List(3, 4, 5, 6))// 交集 : 【3,4】val rdd3: RDD[Int] = rdd1.intersection(rdd2)println(rdd3.collect().mkString(","))// 并集 : 【1,2,3,4,3,4,5,6】val rdd4: RDD[Int] = rdd1.union(rdd2)println(rdd4.collect().mkString(","))// 差集 : 【1,2】val rdd5: RDD[Int] = rdd1.subtract(rdd2)println(rdd5.collect().mkString(","))// 拉链 : 【1-3,2-4,3-5,4-6】val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)println(rdd6.collect().mkString(","))// 拉链操作两个数据源的类型可以不一致,但要求分区中数据数量保持一致val rdd7 = sc.makeRDD(List("a", "b", "c", "d"))val rdd8 = rdd1.zip(rdd7)println(rdd8.collect().mkString(","))sc.stop()}}partitionBy算子:数据按照指定规则重新进行分区import org.apache.spark.rdd.RDDimport org.apache.spark.{HashPartitioner, SparkConf, SparkContext}/** * partitionBy:数据按照指定规则重新进行分区 。Spark 默认的分区器是 HashPartitioner * repartition coalesce:将分区增加或缩小,数据是无规则的 */object partitionBy {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)// PairRDDFunctions才支持partitionBy,所以需要先转换成mapRDDval mapRDD: RDD[(Int, Int)] = rdd.map(num => (num, 1))// partitionBy根据指定的分区规则对数据进行重分区val newRDD = mapRDD.partitionBy(new HashPartitioner(2))newRDD.partitionBy(new HashPartitioner(2))newRDD.saveAsTextFile("output")sc.stop()}}reduceByKey算子:按相同key聚合import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/** * 可以将数据按照相同的 Key 对 Value 进行聚合 */object reduceByKey {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))// reduceByKey : 相同的key的数据进行value数据的聚合操作// scala语言中一般的聚合操作都是两两聚合,spark基于scala开发的,所以它的聚合也是两两聚合// reduceByKey中如果key的数据只有一个,是不会参与运算的 。val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey((x: Int, y: Int) => {println(s"x = ${x}, y = ${y}")x + y})reduceRDD.collect().foreach(println)sc.stop()}}

经验总结扩展阅读