Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持。 SequenceFile文件是Hadoop为存储二进制形式的key-value对而设计的一种平面文件(Flat File)。
RDD数据源
普通文本文件
sc.textFile("./dir/*.txt") 如果传递的是目录,则会将该目录下的所有文件读取为一个RDD;文件路径支持通配符。 但是这种方式在读取大量小文件时效率并不高,此时应该使用wholeTextFiles: def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)] 返回值为RDD[(String, String)],其中Key是文件的路径,Value是文件的内容。
JDBC[掌握]
package com.itcast

import java.sql.{Connection, DriverManager}

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Reads rows of the MySQL `user` table through [[org.apache.spark.rdd.JdbcRDD]]
 * and prints them on the driver.
 */
object JdbcSelect {

  /**
   * Entry point: builds a local SparkContext, loads the rows whose `id` lies in
   * the inclusive range 0..100 and prints the collected result.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkContext.
    val conf = new SparkConf().setAppName("Select").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    /**
     * Opens a single new JDBC connection (this is NOT a connection pool).
     *
     * @return a fresh connection to the `bigdata` database
     */
    def getSelect(): Connection =
      DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")

    try {
      // JdbcRDD arguments:
      //   sc                      the SparkContext
      //   getConnection           function that opens a database connection
      //   sql                     query with exactly two `?` placeholders
      //   lowerBound / upperBound inclusive id range, split across partitions
      //   numPartitions           number of partitions
      //   mapRow                  converts one ResultSet row into the element type
      //
      // JdbcRDD binds the INCLUSIVE bounds of each partition to the two
      // placeholders, so the comparison operators must be inclusive as well.
      // With strict `>` / `<` every row that sits on a partition boundary
      // (ids 0, 19, 20, 39, 40, ... 100 for this range) would be dropped.
      val studentRdd: JdbcRDD[(Int, String, String, String, String)] = new JdbcRDD(
        sc,
        () => getSelect(),
        "select * from user where id >= ? and id <= ?",
        0,   // lower bound (inclusive)
        100, // upper bound (inclusive)
        5,   // number of partitions
        rs => {
          // Pack the current row into a tuple.
          val id: Int = rs.getInt("id")
          val username: String = rs.getString("username")
          val birthday: String = rs.getString("birthday")
          val sex: String = rs.getString("sex")
          val address: String = rs.getString("address")
          (id, username, birthday, sex, address)
        })

      println(studentRdd.collect().toBuffer)
    } finally {
      // Release the context even when the query fails.
      sc.stop()
    }
  }
}
package com.itcast

import java.sql.DriverManager

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Writes a small in-memory RDD of users into the MySQL `user` table,
 * using one JDBC connection per partition.
 */
object JdbcInsert {

  /**
   * Entry point: builds a local SparkContext, creates the sample rows and
   * inserts them partition by partition.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration and context.
    val conf = new SparkConf().setAppName("Insert").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    try {
      // Rows to insert: (username, birthday, sex, address).
      val data: RDD[(String, String, String, String)] = sc.parallelize(List(
        ("小明", "2019-07-01", "女", "河北省"),
        ("小花", "2019-05-01", "男", "河南省"),
        ("小气", "2020-04-07", "女", "河北省")))

      // foreachPartition lets each partition share one connection instead of
      // opening one per record.
      data.foreachPartition(rows => saveToMySQL(rows))
    } finally {
      sc.stop()
    }
  }

  /**
   * Inserts every row of one partition into the `user` table.
   *
   * A single connection and a single prepared statement are reused for the
   * whole partition; rows are queued with `addBatch` and sent with one
   * `executeBatch` call. Statement and connection are closed even when an
   * insert fails.
   *
   * @param partitionData rows of the partition as (username, birthday, sex, address)
   */
  private def saveToMySQL(partitionData: Iterator[(String, String, String, String)]): Unit = {
    val conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
    try {
      // `id` is passed as NULL; presumably the column is AUTO_INCREMENT — confirm against the schema.
      val sql = "INSERT INTO `user` (`id`, `username`, `birthday`,`sex`,`address`) VALUES (NULL, ?, ?,?,?);"
      val ps = conn.prepareStatement(sql)
      try {
        partitionData.foreach { case (username, birthday, sex, address) =>
          ps.setString(1, username)
          ps.setString(2, birthday)
          ps.setString(3, sex)
          ps.setString(4, address)
          ps.addBatch()
        }
        ps.executeBatch()
      } finally {
        ps.close()
      }
    } finally {
      conn.close()
    }
  }
}
HadoopAPI[了解]
HadoopRDD、newAPIHadoopRDD、saveAsHadoopFile、saveAsNewAPIHadoopFile 是底层API
其他的API接口(如textFile、sequenceFile、saveAsTextFile等)都是为了方便Spark程序开发者而提供的,它们是对上述底层接口的封装。
package com.itcast

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Demonstrates the low-level Hadoop API of Spark: writes a pair RDD to HDFS with
 * `saveAsNewAPIHadoopFile` and reads it back with `newAPIHadoopFile`.
 */
object HadoopApi {

  /**
   * Entry point: writes three (id, name) pairs to HDFS as text, reads the
   * files back and prints every line.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Set the Hadoop user BEFORE the SparkContext exists. Hadoop resolves the
    // login user once and caches it, and creating the context may already
    // trigger that lookup, so setting the property afterwards can be ignored.
    System.setProperty("HADOOP_USER_NAME", "root")

    val config = new SparkConf().setAppName("DataSourceTest").setMaster("local[*]")
    val sc = new SparkContext(config)
    sc.setLogLevel("WARN")

    try {
      // 1. Hadoop API
      println("HadoopAPI")

      // Convert to the Writable types that are declared as key/value classes
      // below, so the declared classes match the actual records.
      val dataRDD = sc
        .parallelize(Array((1, "hadoop"), (2, "hive"), (3, "spark")))
        .map { case (id, name) => (new LongWritable(id.toLong), new Text(name)) }

      dataRDD.saveAsNewAPIHadoopFile(
        "hdfs://node01:8020/spark_hadoop/",
        classOf[LongWritable],
        classOf[Text],
        classOf[TextOutputFormat[LongWritable, Text]])

      val inputRDD = sc.newAPIHadoopFile(
        "hdfs://node01:8020/spark_hadoop/*",
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text],
        conf = sc.hadoopConfiguration)

      // The value of each record is the text of one line.
      inputRDD.map(_._2.toString).foreach(println)

      // ---- Disabled examples -------------------------------------------------
      // Windows paths are written with forward slashes: a literal such as
      // "D:\data" contains invalid escape sequences and does not compile.
      //
      // // 2. Read many small files
      // println("读取小文件")
      // val filesRDD: RDD[(String, String)] = sc.wholeTextFiles("D:/data/spark/files", minPartitions = 3)
      // val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\r\n"))
      // val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
      // wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
      //
      // // 3. SequenceFile
      // println("SequenceFile")
      // val dataRDD2: RDD[(Int, String)] = sc.parallelize(List((2, "aa"), (3, "bb"), (4, "cc"), (5, "dd"), (6, "ee")))
      // dataRDD2.saveAsSequenceFile("D:/data/spark/SequenceFile")
      // val sdata: RDD[(Int, String)] = sc.sequenceFile[Int, String]("D:/data/spark/SequenceFile/*")
      // sdata.collect().foreach(println)
      //
      // // 4. ObjectFile
      // println("ObjectFile")
      // val dataRDD3 = sc.parallelize(List((2, "aa"), (3, "bb"), (4, "cc"), (5, "dd"), (6, "ee")))
      // dataRDD3.saveAsObjectFile("D:/data/spark/ObjectFile")
      // val objRDD = sc.objectFile[(Int, String)]("D:/data/spark/ObjectFile/*")
      // objRDD.collect().foreach(println)
      // -------------------------------------------------------------------------
    } finally {
      sc.stop()
    }
  }
}
SequenceFile文件[了解]
读: sc.sequenceFile[keyClass, valueClass](path); 写: RDD.saveAsSequenceFile(path)。 要求键和值能够自动转换为Writable类型。
对象文件[了解]
对象文件是将对象序列化后保存的文件。 读: sc.objectFile[T](path) //因为是序列化所以要指定元素类型,例如 sc.objectFile[(Int, String)](path); 写: RDD.saveAsObjectFile(path)
HBase[了解]
由于 org.apache.hadoop.hbase.mapreduce.TableInputFormat 类的实现,Spark 可以通过Hadoop输入格式访问HBase。 这个输入格式会返回键值对数据, 其中键的类型为org.apache.hadoop.hbase.io.ImmutableBytesWritable, 而值的类型为org.apache.hadoop.hbase.client.Result。 https://github.com/teeyog/blog/issues/22
package com.itcast

import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * (Re)creates the HBase table `student` with column family `info` and loads
 * it from a tab-separated text file through `saveAsHadoopDataset`.
 */
object HbaseCreate {

  /**
   * Entry point: drops and recreates the `student` table, then writes one
   * `Put` per input line.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val config = new SparkConf().setAppName("DataSourceTest").setMaster("local[*]")
    val sc = new SparkContext(config)
    sc.setLogLevel("WARN")

    try {
      // Connect to HBase.
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")

      // Describe the table and its single column family.
      val studentTable = TableName.valueOf("student")
      val tableDescr = new HTableDescriptor(studentTable)
      tableDescr.addFamily(new HColumnDescriptor(Bytes.toBytes("info")))

      // The admin handle holds a cluster connection, so close it when done.
      val admin = new HBaseAdmin(conf)
      try {
        // Drop the table first when it already exists.
        if (admin.tableExists(studentTable)) {
          admin.disableTable(studentTable)
          admin.deleteTable(studentTable)
        }
        admin.createTable(tableDescr)
      } finally {
        admin.close()
      }

      val textFile: RDD[String] = sc.textFile("input20200408/student.txt")

      val puts: RDD[(ImmutableBytesWritable, Put)] = textFile.map { line =>
        // Fields are TAB-separated. Splitting on the plain letter "t" would
        // instead cut every value that contains a 't'.
        // NOTE(review): the four-column layout name/class/sex/province is taken
        // from the qualifiers below — confirm against the real input file.
        val datas = line.split("\t")
        val family = Bytes.toBytes("info")
        val rowkey = Bytes.toBytes(datas(0))

        val put = new Put(rowkey)
        // The first field is used both as row key and as the `name` column.
        put.addColumn(family, Bytes.toBytes("name"), Bytes.toBytes(datas(0)))
        put.addColumn(family, Bytes.toBytes("class"), Bytes.toBytes(datas(1)))
        put.addColumn(family, Bytes.toBytes("sex"), Bytes.toBytes(datas(2)))
        put.addColumn(family, Bytes.toBytes("province"), Bytes.toBytes(datas(3)))
        (new ImmutableBytesWritable(rowkey), put)
      }

      // Route the output through the (old-API) HBase TableOutputFormat.
      val jobConf = new JobConf(conf)
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, "student")

      puts.saveAsHadoopDataset(jobConf)
      println("Spark向Hbase写入数据成功")
    } finally {
      sc.stop()
    }
  }
}
package com.itcast

import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Reads the HBase table `student` into an RDD through `TableInputFormat`
 * and prints the row count and every row.
 */
object HbaseInput {

  /**
   * Entry point: scans the `student` table, prints the number of rows and
   * then the `info` columns of each row.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val config = new SparkConf().setAppName("DataSourceTest").setMaster("local[*]")
    val sc = new SparkContext(config)
    sc.setLogLevel("WARN")

    try {
      // Connect to HBase.
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")

      // Select the table to read.
      conf.set(TableInputFormat.INPUT_TABLE, "student")

      val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
        conf,
        classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])

      val count = hbaseRDD.count()
      println("HbaseRDD RDD Count:" + count)

      hbaseRDD.foreach { case (_, result) =>
        // Bytes.toBytes always encodes as UTF-8, unlike String.getBytes,
        // which depends on the platform default charset.
        val family = Bytes.toBytes("info")

        // Reads one column of the `info` family as a string.
        def column(qualifier: String): String =
          Bytes.toString(result.getValue(family, Bytes.toBytes(qualifier)))

        val key = Bytes.toString(result.getRow)
        val name = column("name")
        val classz = column("class")
        val sex = column("sex")
        val province = column("province")
        println(s"Row Key:${key} name:${name} Class:${classz} Sex:${sex} Province:${province}")
      }
    } finally {
      sc.stop()
    }
  }
}
扩展阅读
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算