spark 2.0中初步提供了一些内置的source支持。 Socket source (for testing): 从socket连接中读取文本内容。 File source: 以数据流的方式读取一个目录中的文件。支持text、csv、json、parquet等文件类型。 Kafka source: 从Kafka中拉取数据,与0.10或以上的版本兼容,后面单独整合Kafka ●准备工作 nc -lk 9999 hadoop spark sqoop hadoop spark hive hadoop spark应用可以监听某一个目录,而web服务在这个目录上实时产生日志文件,这样对于spark应用来说,日志文件就是实时数据 Structured Streaming支持的文件类型有text,csv,json,parquet ●准备工作 在people.json文件输入如下数据: {“name”:”json”,”age”:23,”hobby”:”running”} {“name”:”charles”,”age”:32,”hobby”:”basketball”} {“name”:”tom”,”age”:28,”hobby”:”football”} {“name”:”lili”,”age”:24,”hobby”:”running”} {“name”:”bob”,”age”:20,”hobby”:”swimming”} 注意:文件必须是被移动到目录中的,且文件名不能有特殊字符 ●需求 使用Structured Streaming统计年龄小于25岁的人群的爱好排行榜 ●使用说明 Kafka sink 输出到kafka内的一到多个topic Foreach sink 对输出中的记录运行任意计算。 Console sink (for debugging) 当有触发器时,将输出打印到控制台。 Memory sink (for debugging) – 输出作为内存表存储在内存中. ●官网示例代码 Structured Streaming输出详解
1.output mode:以哪种方式将result table的数据写入sink 2.format/output sink的一些细节:数据格式、位置等。 3.query name:指定查询的标识。类似tempview的名字 4.trigger interval:触发间隔,如果不指定,默认会尽可能快速地处理数据 5.checkpoint地址:一般是hdfs上的目录。注意:Socket不支持数据恢复,如果设置了,第二次启动会报错 ,Kafka支持
output mode
1.Append mode:输出新增的行,默认模式。每次更新结果集时,只将新添加到结果集的结果行输出到接收器。仅支持添加到结果表中的行永远不会更改的查询。因此,此模式保证每行仅输出一次。例如,仅查询select,where,map,flatMap,filter,join等会支持追加模式。不支持聚合 2.Complete mode: 所有内容都输出,每次触发后,整个结果表将输出到接收器。聚合查询支持此功能。仅适用于包含聚合操作的查询。 3.Update mode: 输出更新的行,每次更新结果集时,仅将被更新的结果行输出到接收器(自Spark 2.1.1起可用),不支持排序
读取Socket实时数据
创建Source
读取Socket数据
代码来咯
package cn.itcast.structedstreaming import org.apache.spark.SparkContext import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} object WordCount { def main(args: Array[String]): Unit = { //1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSet val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.接收数据 val dataDF: DataFrame = spark.readStream .option("host", "node01") .option("port", 9999) .format("socket") .load() //3.处理数据 import spark.implicits._ val dataDS: Dataset[String] = dataDF.as[String] val wordDS: Dataset[String] = dataDS.flatMap(_.split(" ")) val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc) //result.show() //Queries with streaming sources must be executed with writeStream.start(); result.writeStream .format("console")//往控制台写 .outputMode("complete")//每次将所有的数据写出 .trigger(Trigger.ProcessingTime(0))//触发时间间隔,0表示尽可能的快 //.option("checkpointLocation","./ckp")//设置checkpoint目录,socket不支持数据恢复,所以第二次启动会报错,需要注掉 .start()//开启 .awaitTermination()//等待停止 } }
读取目录下文本数据
代码来咯
package cn.itcast.structedstreaming import org.apache.spark.SparkContext import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} /** * {"name":"json","age":23,"hobby":"running"} * {"name":"charles","age":32,"hobby":"basketball"} * {"name":"tom","age":28,"hobby":"football"} * {"name":"lili","age":24,"hobby":"running"} * {"name":"bob","age":20,"hobby":"swimming"} * 统计年龄小于25岁的人群的爱好排行榜 */ object WordCount2 { def main(args: Array[String]): Unit = { //1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSet val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") val Schema: StructType = new StructType() .add("name","string") .add("age","integer") .add("hobby","string") //2.接收数据 import spark.implicits._ // Schema must be specified when creating a streaming source DataFrame. val dataDF: DataFrame = spark.readStream.schema(Schema).json("D:\data\spark\data") //3.处理数据 val result: Dataset[Row] = dataDF.filter($"age" < 25).groupBy("hobby").count().sort($"count".desc) //4.输出结果 result.writeStream .format("console") .outputMode("complete") .trigger(Trigger.ProcessingTime(0)) .start() .awaitTermination() } }
输出 output sink
File sink 输出到路径
支持parquet文件,以及append模式
writeStream .format("parquet") // can be "orc", "json", "csv", etc. .option("path", "path/to/destination/dir") .start()
writeStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "updates") .start()
writeStream .foreach(...) .start()
writeStream .format("console") .start()
writeStream .format("memory") .queryName("tableName") .start()
// ========== DF with no aggregations ========== val noAggDF = deviceDataDf.select("device").where("signal > 10") // Print new data to console noAggDF.writeStream.format("console").start() // Write new data to Parquet files noAggDF.writeStream.format("parquet").option("checkpointLocation", "path/to/checkpoint/dir").option("path", "path/to/destination/dir").start() // ========== DF with aggregation ========== val aggDF = df.groupBy("device").count() // Print updated aggregations to console aggDF.writeStream.outputMode("complete").format("console").start() // Have all the aggregates in an in-memory table aggDF.writeStream.queryName("aggregates").outputMode("complete").format("memory").start() spark.sql("select * from aggregates").show() // interactively query in-memory table
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算