filter算子:数据过滤import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}// 将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃 。// 当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜 。object filter {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4))val filterRDD: RDD[Int] = rdd.filter(num => num % 2 != 0)filterRDD.collect().foreach(println)sc.stop()}}
sample算子:数据采样随机抽取import org.apache.spark.{SparkConf, SparkContext}// 根据指定的规则从数据集中抽取数据object sample {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val dataRDD = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1)// 抽取数据不放回(伯努利算法)// 伯努利算法:又叫 0、 1 分布 。例如扔硬币,要么正面,要么反面 。// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要// 第一个参数:抽取的数据是否放回, false:不放回// 第二个参数:抽取的几率,范围只能在[0,1]之间,0:全不取; 1:全取;// 第三个参数:随机数种子val dataRDD1 = dataRDD.sample(false, 0.5)// 抽取数据放回(泊松算法)// 第一个参数:抽取的数据是否放回, true:放回; false:不放回// 第二个参数:重复数据的几率,范围大于等于0,可以大于1 表示每一个元素被期望抽取到的次数// 第三个参数:随机数种子// 例如数据集内有10个,fraction为1的话抽取10个, 0.5的话抽取5个,2的话抽取20个val dataRDD2 = dataRDD.sample(true, 2)println(dataRDD1.collect().mkString(","))println(dataRDD2.collect().mkString(","))sc.stop()}}
distinct算子:数据去重import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object distinct {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))val rdd1: RDD[Int] = rdd.distinct()val rdd2: RDD[Int] = rdd.distinct(2)// 底层相当于这样写val rdd3 = rdd.map(x => (x, null)).reduceByKey((x, _) => x).map(_._1)println(rdd.collect().mkString(","))println(rdd1.collect().mkString(","))println(rdd2.collect().mkString(","))println(rdd3.collect().mkString(","))sc.stop()}}
coalesce算子:数据(shuffle可选)重新分区import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/** * 根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率 * 当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本 */object coalesce {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// 默认3个分区val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)// coalesce方法默认情况下不会将分区的数据打乱重新组合,默认shuffer=false// 这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜,如果想要让数据均衡,可以进行shuffle处理// 缩减成2个分区并shufferval newRDD: RDD[Int] = rdd.coalesce(2, true)newRDD.saveAsTextFile("output")sc.stop()}}
repartition算子:数据shuffle重新分区import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/** * 该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true 。* 无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD, * repartition操作都可以完成,因为无论如何都会经 shuffle 过程 。* 直接用repartition就行,coalesce就别用了 */object repartition {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)// coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义,不起作用 。// 所以如果想要实现扩大分区的效果,需要使用shuffle操作/*** 底层就是coalesce* def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {* coalesce(numPartitions, shuffle = true)* }*/// 缩减分区val newRDD1: RDD[Int] = rdd.repartition(2)// 扩大分区val newRDD2: RDD[Int] = rdd.repartition(4)rdd.saveAsTextFile("output0")newRDD1.saveAsTextFile("output1")newRDD2.saveAsTextFile("output2")sc.stop()}}
经验总结扩展阅读
- 惊艳我的朋友圈个性签名 句句高品位的ins风签名
- 祝我的男孩儿子 文案 很潮又短的生日祝福
- 生日文案给儿子 愿我的大男孩生日快乐的句子
- 我的汤姆猫里的2048怎么玩(能玩2048的汤姆猫)
- JVM学习笔记——垃圾回收篇
- 我的世界怎么去月球无模组无指令(我的世界新版怎么去月球)
- 我的世界怎么去月球,我的世界手机版月球传送门怎么做
- 我的世界怎么去月球模组免费(mc月球模组)
- 我的世界虚无3月球怎么去(我的世界虚无世界怎么找传送门)
- 我的世界怎么去月球(我的世界惊变100天)