本篇文章操作基于IDEA的本地测试,如果你还不会如何在IDEA中运行spark,请参考这篇文章 先说说这篇文章说了什么, 如果你对spark有所了解,应该知道spark的几大内置模块: RDD入门程序 在项目的父目录下创建in目录,创建word.txt 图解: rdd简介 RDD 实质上是一种更为通用的迭代并行计算框架,用户可以显示控制计算的中间结果,然后将其自由运用于之后的计算。 在大数据实际应用开发中存在许多迭代算法,如机器学习、图算法等,和交互式数据挖掘工具。这些应用场景的共同之处是在不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。 RDD 正是为了满足这种需求而设计的。虽然 MapReduce 具有自动容错、负载平衡和可拓展性的优点,但是其最大的缺点是采用非循环式的数据流模型,使得在迭代计算时要进行大量的磁盘 I/O 操作。 通过使用 RDD,用户不必担心底层数据的分布式特性,只需要将具体的应用逻辑表达为一系列转换处理,就可以实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘 I/O 和数据序列化的开销。 从集合中创建 从外部创建 注:下划线表示自身 1 map算子 2 mapPartitions(func) 算子 3 mapPartitionsWithIndex(func) **4. flatMap(func) ** (所以func应该返回一个序列,而不是单一元素) 5 glom案例 6 groupBy(func)&&filter 字面意思 7. sample(withReplacement, fraction, seed) 作用:随机抽样 参数说明 需求:创建一个RDD(1-10),从中选择放回和不放回抽样 8. distinct([numTasks])) 案例 9. coalesce(numPartitions) &&repartision 10.sortBy 1. 集合 union(otherDataset)–计算并集 2 cartesian(otherDataset) 结果: 3 . zip(otherDataset)案例 1 partitionBy 案例 2 groupByKey案例 3 reduceByKey(func, [numTasks]) 案例 现在反过头来看经典的统计单词案列,你是否看懂了? 4 reduceByKey和groupByKey的区别 5 aggregateByKey案例 参数:(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U) 6. foldByKey案例 7 sortByKey([ascending], [numTasks]) 8 mapValues案例 9 join(otherDataset, [numTasks]) 案例 10 cogroup(otherDataset, [numTasks]) 案例 样本如下: 加油!!快结束了 1 基本 collect()–在驱动程序中,以数组的形式返回数据集的所有元素。 count()–返回RDD中元素的个数 first ()–返回第一个元素 take(n)–返回前n个元素 takeOrdered(n)—返回该RDD排序后的前n个元素组成的数组 countByKey()—-针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 1 reduce(func)案例 3 . aggregate案例 fold(num)(func)–折叠操作,aggregate的简化操作,seqop和combop一样。 文件操作 恭喜你,终于读完了这个又臭又长的教程!!! 下一篇文章–RDD进阶知识(待更新,学习中)
前言
这篇文章就是基于IDEA,详解spark-Core(围绕RDD展开)
即:
目录
0. 入门经典程序,统计字符
看不懂?没关系,先把后面看懂了,再回来看这个 def main(args: Array[String]): Unit = { //上下文 var conf=new SparkConf().setAppName("WordCount").setMaster("local"); var sc=new SparkContext(conf); sc.setLogLevel("ERROR") //1.读取文件,将文件内容一行一行读取 var lines:RDD[String] = sc.textFile("in/word.txt"); //2将一行一行数据分解成一个个单词 var words:RDD[String]=lines.flatMap(_.split(" ")); //3.将单词数据进行结构转化,便于统计 var wordToOne=words.map((_,1)); //4.两两聚合 var sum=wordToOne.reduceByKey(_+_) //5打印 var result=sum.collect(); result.foreach(println); }
1. RDD
通俗点来讲,可以将 RDD 理解为一个分布式对象集合,本质上是一个只读的分区记录集合。每个 RDD 可以分成多个分区,每个分区就是一个数据集片段。一个 RDD 的不同分区可以保存到集群中的不同结点上,从而可以在集群中的不同结点上进行并行计算。
2.RDD- 创建操作
1)使用parallelize()从集合创建
val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))
2)使用makeRDD()从集合创建
val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6,7,8)) def main(args: Array[String]): Unit = { //上下文 var conf=new SparkConf().setAppName("WordCount").setMaster("local"); var sc=new SparkContext(conf); sc.setLogLevel("ERROR") //1.使用makeRDD创建 var r1=sc.makeRDD(Array(1,2,3,4,5,6)); r1.collect().foreach(println); //2.使用parallelize创建 var r2=sc.parallelize(Array(1,2,3,4,5,6)); r2.collect().foreach(println); }
包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等,
如前面写过的—-var lines:RDD[String] = sc.textFile(“in/word.txt”);3. 转换操作–value类型
若出现x._1,x._2,则x表示key-value键值对
def main(args: Array[String]): Unit = { //上下文 var conf=new SparkConf().setAppName("WordCount").setMaster("local"); var sc=new SparkContext(conf); sc.setLogLevel("ERROR") //创建 var arr=sc.makeRDD(1 to 10); var map=arr.map(_*2); //打印 map.collect().foreach(println); }
def main(args: Array[String]): Unit = { //上下文 var conf=new SparkConf().setAppName("WordCount").setMaster("local"); var sc=new SparkContext(conf); sc.setLogLevel("ERROR") //创建 var arr=sc.makeRDD(1 to 10); // var map=arr.mapPartitions(x=>x.map(_*2)); map.collect().foreach(println); }
def main(args: Array[String]): Unit = { //上下文 var conf=new SparkConf().setAppName("WordCount").setMaster("local"); var sc=new SparkContext(conf); sc.setLogLevel("ERROR") //创建 var arr=sc.makeRDD(1 to 5); // var map=arr.mapPartitionsWithIndex((index,item)=>(item.map((index,_)))); map.collect().foreach(println); }
def main(args: Array[String]): Unit = { //上下文 var conf=new SparkConf().setAppName("WordCount").setMaster("local"); var sc=new SparkContext(conf); sc.setLogLevel("ERROR") //创建 var arr=sc.makeRDD(Array(List(1 , 3),List(2 , 4))); var map=arr.flatMap(x=>x); map.collect().foreach(println); }
def main(args: Array[String]): Unit = { //上下文 var conf=new SparkConf().setAppName("WordCount").setMaster("local"); var sc=new SparkContext(conf); sc.setLogLevel("ERROR") //创建4个分区 var arr=sc.makeRDD(1 to 16,4); var map=arr.glom(); map.collect().foreach(arr=>{ println(arr.mkString(",")); }); }
//groupby var arr=sc.makeRDD(1 to 10); var map=arr.groupBy(_%2); map.collect().foreach(println); //filter var arr1=sc.makeRDD(Array("xiaoming","xiaohong","xiaohuang")); var map1=arr1.filter(x=>(x.contains("ming"))); map1.collect().foreach(println);
var arr=sc.makeRDD(1 to 10); var map=arr.sample(false,0.4,1); map.collect().foreach(println);
var arr=sc.makeRDD(Array(1,2,2,3,4,4,5,6,6)); var map=arr.distinct(); map.collect().mkString(",");
var arr=sc.makeRDD(1 to 16,4); var map=arr.coalesce(3); var m=map.repartition(4); println(map.partitions.size); println(m.partitions.size);
var arr=sc.makeRDD(1 to 16); //按照自身大小从小到大 var map=arr.sortBy(x=>x).collect(); //降序 var map1=arr.sortBy(x=>(x,false)).collect(); //取余3后降序 var map2=arr.sortBy(x=>(x%3,false)).collect();
4转化操作–双value类型
subtract (otherDataset) –计算差集
intersection(otherDataset)–计算交集 var arr1=sc.makeRDD(1 to 3); var arr2=sc.makeRDD(2 to 4); var map=arr1.union(arr2); map.collect().foreach(println); println("-----") var map1=arr1.intersection(arr2); map1.collect().foreach(println);
var arr1=sc.makeRDD(1 to 3); var arr2=sc.makeRDD(2 to 4); var map=arr1.cartesian(arr2); map.collect().foreach(println);
(1,2) (1,3) (1,4) (2,2) (2,3) (2,4) (3,2) (3,3) (3,4)
var arr1=sc.makeRDD(Array("a","b","c")); var arr2=sc.makeRDD(Array(1,2,3)); var map=arr1.zip(arr2); map.collect().foreach(println); 结果: (a,1) (b,2) (c,3)
5 RDD转化操作–key-value类型
var arr1=sc.makeRDD(Array((1,"a"),(2,"b"),(3,"c"),(4,"d")),4); var map=arr1.partitionBy(new org.apache.spark.HashPartitioner(2)); map.collect().foreach(println); --------------------- 结果: (2,b) (4,d) (1,a) (3,c)
//1.获取k-v型rdd var words = Array("one", "two", "two", "three", "three", "three"); var rdd=sc.makeRDD(words).map(x=>(x,1)); //2.根据k进行分组 var group=rdd.groupByKey(); //3.将分组的值进行求和 var result=group.map(x=>(x._1,x._2.sum)); result.collect().foreach(println); ----result (two,2) (one,1) (three,3)
//1.获取k-v型rdd var rdd=sc.parallelize(List(("female",1),("male",5),("female",5),("male",2))); var result=rdd.reduceByKey((x,y)=>(x+y)); result.collect().foreach(println); ------ (male,7) (female,6)
```java def main(args: Array[String]): Unit = { //上下文 var conf=new SparkConf().setAppName("WordCount").setMaster("local"); var sc=new SparkContext(conf); sc.setLogLevel("ERROR") //1.读取文件,将文件内容一行一行读取 var lines:RDD[String] = sc.textFile("in/word.txt"); //2将一行一行数据分解成一个个单词 var words:RDD[String]=lines.flatMap(_.split(" ")); //3.将单词数据进行结构转化,便于统计 var wordToOne=words.map((_,1)); //4.两两聚合 var sum=wordToOne.reduceByKey(_+_) //5打印 var result=sum.collect(); result.foreach(println); }
(1)zeroValue:给每一个分区中的每一个key一个初始值;
(2)seqOp:函数用于在每一个分区中用初始值逐步迭代value;
(3)combOp:函数用于合并每个分区中的结果。
//1.获取k-v型rdd val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2) //取出每个分区相同key对应值的最大值,然后相加 val result=rdd.aggregateByKey(0)(math.max(_,_),_+_); result.collect().foreach(println); ------ (b,3) (a,3) (c,12)
参数:(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
1.作用:和reduceByKey类似
val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c"))); rdd3.mapValues((value)=>(value+"|||||")).collect().foreach(println);
val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"))); val rdd1=sc.parallelize(Array((1,4),(2,5),(3,6))); rdd.join(rdd1).collect().foreach(println); ---------------------- (1,(a,4)) (3,(c,6)) (2,(b,5))
val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"))); val rdd1=sc.parallelize(Array((1,4),(2,5),(3,6))); rdd.cogroup(rdd1).collect().foreach(println); ------- (1,(CompactBuffer(a),CompactBuffer(4))) (3,(CompactBuffer(c),CompactBuffer(6))) (2,(CompactBuffer(b),CompactBuffer(5)))
6. RDD转化操作综合案列–统计广告点击数量
asdf aaa 4 asdf aaa 3 asdf aaa 4 asdf aaa 2 asdf aaa 1 qwer asdf 1 qwer asdf 2 qwer asdf 3 qwer asdf 1 qwer asdf 2
def main(args: Array[String]): Unit = { //上下文 var conf=new SparkConf().setAppName("WordCount").setMaster("local"); var sc=new SparkContext(conf); sc.setLogLevel("ERROR") //读文件 val rdd=sc.textFile("in/word.txt"); //转化成((province,AD),1) val r1=rdd.map(x=>{ val line=x.split(" "); ((line(0),line(2)),1); }) //计算每个省每个广告被点击总数 var r2=r1.reduceByKey(_+_); //转化成 (province,(AD,times)) val r3=r2.map(x=>(x._1._1,(x._1._2,x._2))); //分组 var r4=r3.groupByKey(); //计算前三名的广告 val r5=r4.mapValues(xml=>{ xml.toList.sortWith((x,y)=>{ x._2>y._2 }).take(3); }) //打印 r5.collect().foreach(println); }
7. RDD行为操作
val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5))); val map=rdd2.reduce((x,y)=>{ (x._1+y._1,x._2+y._2); }); println(map); ------- (aacd,12)
val rdd=sc.makeRDD(1 to 10,2); var r=rdd.aggregate(0)(_+_,_+_); println(r); ---- 55
countByKey()案例
作用:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
作用:将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
作用:用于将RDD中的元素序列化成对象,存储到文件中。
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算