作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 需求:创建一个1-10数组的RDD,将所有元素*2形成新的RDD 作用:类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。 需求:创建一个RDD,使每个元素*2组成新的RDD 作用:类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]; 需求:创建一个RDD,使每个元素跟所在分区形成一个元组组成一个新的RDD 作用:类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) 需求:创建一个元素为1-5的RDD,运用flatMap创建一个新的RDD,新的RDD为原RDD的每个元素的2倍(2,4,6,8,10) 作用:将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]] 需求:创建一个4个分区的RDD,并将每个分区的数据放到一个数组 作用:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。 需求:创建一个RDD,按照元素模以2的值进行分组。 作用:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。 需求:创建一个RDD(由字符串组成),过滤出一个新RDD(包含”xiao”子串) 作用:以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。 需求:创建一个RDD(1-10),从中选择放回和不放回抽样 作用:对源RDD进行去重后返回一个新的RDD。默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它。 需求:创建一个RDD,使用distinct()对其去重。 作用:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。 需求:创建一个4个分区的RDD,对其缩减分区 作用:根据分区数,重新通过网络随机洗牌所有数据。 需求:创建一个4个分区的RDD,对其重新分区 coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。 repartition实际上是调用的coalesce,默认是进行shuffle的。源码如下: 作用:使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。 需求:创建一个RDD,按照不同的规则进行排序 作用:管道,针对每个分区,都执行一个shell脚本,返回输出的RDD。 需求:编写一个脚本,使用管道将脚本作用于RDD上。 1、 2、对于Scala集合调用parallelize( 3、对于textFile(文件,分区数) defaultMinPartitions 4、rdd的分区数 对于本地文件: 对于HDFS文件: 所以如果分配的核数为多个,且从文件中读取数据创建RDD,即使hdfs文件只有1个切片,最后的Spark的RDD的partition数也有可能是2目录
RDD整体上分为Value类型和Key-Value类型
map(func)案例
(1)创建 scala> var source = sc.parallelize(1 to 10) source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24 (2)打印 scala> source.collect() res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) (3)将所有元素*2 scala> val mapadd = source.map(_ * 2) mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26 (4)打印最终结果 scala> mapadd.collect() res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
mapPartitions(func) 案例
(1)创建一个RDD scala> val rdd = sc.parallelize(Array(1,2,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24 (2)使每个元素*2组成新的RDD scala> rdd.mapPartitions(x=>x.map(_*2)) res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:27 (3)打印新的RDD scala> res3.collect res4: Array[Int] = Array(2, 4, 6, 8)
mapPartitionsWithIndex(func) 案例
(1)创建一个RDD scala> val rdd = sc.parallelize(Array(1,2,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24 (2)使每个元素跟所在分区形成一个元组组成一个新的RDD scala> val indexRdd = rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_)))) indexRdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at mapPartitionsWithIndex at <console>:26 (3)打印新的RDD scala> indexRdd.collect res2: Array[(Int, Int)] = Array((0,1), (0,2), (1,3), (1,4))
flatMap(func) 案例
(1)创建 scala> val sourceFlat = sc.parallelize(1 to 5) sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24 (2)打印 scala> sourceFlat.collect() res11: Array[Int] = Array(1, 2, 3, 4, 5) (3)根据原RDD创建新RDD(1->1,2->1,2……5->1,2,3,4,5) scala> val flatMap = sourceFlat.flatMap(1 to _) flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at <console>:26 (4)打印新RDD scala> flatMap.collect() res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)
map()和mapPartition()的区别
glom案例
(1)创建 scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24 (2)将每个分区的数据放到一个数组并收集到Driver端打印 scala> rdd.glom().collect() res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))
groupBy(func)案例
(1)创建 scala> val rdd = sc.parallelize(1 to 4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24 (2)按照元素模以2的值进行分组 scala> val group = rdd.groupBy(_%2) group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:26 (3)打印结果 scala> group.collect res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))
filter(func) 案例
(1)创建 scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi")) sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24 (2)打印 scala> sourceFilter.collect() res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi) (3)过滤出含” xiao”子串的形成一个新的RDD scala> val filter = sourceFilter.filter(_.contains("xiao")) filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26 (4)打印新RDD scala> filter.collect() res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)
sample(withReplacement, fraction, seed) 案例
(1)创建RDD scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24 (2)打印 scala> rdd.collect() res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) (3)放回抽样 scala> var sample1 = rdd.sample(true,0.4,2) sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[21] at sample at <console>:26 (4)打印放回抽样结果 scala> sample1.collect() res16: Array[Int] = Array(1, 2, 2, 7, 7, 8, 9) (5)不放回抽样 scala> var sample2 = rdd.sample(false,0.2,3) sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[22] at sample at <console>:26 (6)打印不放回抽样结果 scala> sample2.collect() res17: Array[Int] = Array(1, 9)
distinct([numTasks])) 案例
(1)创建一个RDD scala> val distinctRdd = sc.parallelize(List(1,2,1,5,2,9,6,1)) distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24 (2)对RDD进行去重(不指定并行度) scala> val unionRDD = distinctRdd.distinct() unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at distinct at <console>:26 (3)打印去重后生成的新RDD scala> unionRDD.collect() res20: Array[Int] = Array(1, 9, 5, 6, 2) (4)对RDD(指定并行度为2) scala> val unionRDD = distinctRdd.distinct(2) unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at distinct at <console>:26 (5)打印去重后生成的新RDD scala> unionRDD.collect() res21: Array[Int] = Array(6, 2, 1, 9, 5)
coalesce(numPartitions) 案例
(1)创建一个RDD scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at <console>:24 (2)查看RDD的分区数 scala> rdd.partitions.size res20: Int = 4 (3)对RDD重新分区 scala> val coalesceRDD = rdd.coalesce(3) coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[55] at coalesce at <console>:26 (4)查看新RDD的分区数 scala> coalesceRDD.partitions.size res21: Int = 3
repartition(numPartitions) 案例
(1)创建一个RDD scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at <console>:24 (2)查看RDD的分区数 scala> rdd.partitions.size res22: Int = 4 (3)对RDD重新分区 scala> val rerdd = rdd.repartition(2) rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[60] at repartition at <console>:26 (4)查看新RDD的分区数 scala> rerdd.partitions.size res23: Int = 2
coalesce和repartition的区别
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) }
sortBy(func,[ascending], [numTasks]) 案例
(1)创建一个RDD scala> val rdd = sc.parallelize(List(2,1,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24 (2)按照自身大小排序 scala> rdd.sortBy(x => x).collect() res11: Array[Int] = Array(1, 2, 3, 4) (3)按照与3余数的大小排序 scala> rdd.sortBy(x => x%3).collect() res12: Array[Int] = Array(3, 4, 1, 2)
pipe(command, [envVars]) 案例
注意:脚本需要放在Worker节点可以访问到的位置
(1)编写一个脚本 Shell脚本 #!/bin/sh echo "AA" while read LINE; do echo ">>>"${LINE} done (2)创建一个只有一个分区的RDD scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),1) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24 (3)将脚本作用该RDD并打印 scala> rdd.pipe("/opt/module/spark/pipe.sh").collect() res18: Array[String] = Array(AA, >>>hi, >>>Hello, >>>how, >>>are, >>>you) (4)创建一个有两个分区的RDD scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),2) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[52] at parallelize at <console>:24 (5)将脚本作用该RDD并打印 scala> rdd.pipe("/opt/module/spark/pipe.sh").collect() res19: Array[String] = Array(AA, >>>hi, >>>Hello, AA, >>>how, >>>are, >>>you)
RDD的分区原则
启动的时候指定的CPU核数
确定了一个参数值:
集合,分区数
)方法
如果没有指定分区数,就使用spark.default.parallelism,
如果指定了就使用指定的分区数(不要指定大于spark.default.parallelism)
如果没有指定分区数sc.defaultMinPartitions=min(defaultParallelism,2)
如果指定了就使用指定的分区数sc.defaultMinPartitions=指定的分区数
rdd的分区数 = max(本地file的分片数, sc.defaultMinPartitions)
rdd的分区数 = max(hdfs文件的block数目, sc.defaultMinPartitions)
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算