如何创建RDD?
创建RDD有三种方式
基于集合创建RDD:使用sparkContext的parallelize()方法,第一个参数传入集合,第二个参数传入partition数量。Spark会为每个partition执行一个task
1
2
3val spark = SparkSession.builder().getOrCreate()
val arr = Array(1,2,3,4)
val rdd = spark.sparkContext.parallelize(arr,4) //基于Array创建一个4分区的rdd基于本地或HDFS文件创建RDD:使用sparkContext的textFile()方法,第一个参数传入文件路径,第二个参数传入partition数量
1
2val spark = SparkSession.builder().appName("WordCount").getOrCreate()
val text = spark.sparkContext.textFile("/path/words.txt",3)
Spark中对RDD的操作有哪些?
- 在Spark中,对RDD的操作只有两种,Transformation 和 Action
- Transformation
- 是对已有的RDD转化为新的RDD,如flatMap、Map等操作
- lazy特性,在没有执行Action之前,所有的操作都只是得到一个逻辑上的RDD,内存中没有任何数据
- Action
- 是对RDD最后的操作,如foreach,reduce,返回结果给Driver进程等操作
- 只有当执行到Action代码,才会触发之前所有的Transformation算子的执行
Transformation算子实战
1 | val sc = SparkSession.builder().getOrCreate().sparkContext |
Action算子实战
1 | val sc = SparkSession.builder().getOrCreate().sparkContext |