spark 2.0中初步提供了一些内置的source支持。 准备工作: spark应用可以监听某一个目录,而web服务在这个目录上实时产生日志文件,这样对于spark应用来说,日志文件就是实时数据 获得到Source之后的基本数据处理方式和之前学习的DataFrame、DataSet一致,不再赘述 计算结果可以选择输出到多种设备并进行如下设定
创建Source
Socket source (for testing): 从socket连接中读取文本内容。
File source: 以数据流的方式读取一个目录中的文件。支持text、csv、json、parquet等文件类型。
Kafka source: 从Kafka中拉取数据,与0.10或以上的版本兼容,后面单独整合Kafka读取Socket数据
nc -lk 9999
hadoop spark sqoop hadoop spark hive hadoop代码演示
package cn.itcast.structedstreaming import org.apache.spark.SparkContext import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} object WordCount { def main(args: Array[String]): Unit = { //1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSet val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.接收数据 val dataDF: DataFrame = spark.readStream .option("host", "node01") .option("port", 9999) .format("socket") .load() //3.处理数据 import spark.implicits._ val dataDS: Dataset[String] = dataDF.as[String] val wordDS: Dataset[String] = dataDS.flatMap(_.split(" ")) val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc) //result.show() //Queries with streaming sources must be executed with writeStream.start(); result.writeStream .format("console")//往控制台写 .outputMode("complete")//每次将所有的数据写出 .trigger(Trigger.ProcessingTime(0))//触发时间间隔,0表示尽可能的快 //.option("checkpointLocation","./ckp")//设置checkpoint目录,socket不支持数据恢复,所以第二次启动会报错,需要注掉 .start()//开启 .awaitTermination()//等待停止 } }
读取目录下文本数据
Structured Streaming支持的文件类型有text,csv,json,parquet
在people.json文件输入如下数据: {"name":"json","age":23,"hobby":"running"} {"name":"charles","age":32,"hobby":"basketball"} {"name":"tom","age":28,"hobby":"football"} {"name":"lili","age":24,"hobby":"running"} {"name":"bob","age":20,"hobby":"swimming"} 注意:文件必须是被移动到目录中的,且文件名不能有特殊字符
使用Structured Streaming统计年龄小于25岁的人群的爱好排行榜代码演示
package cn.itcast.structedstreaming import org.apache.spark.SparkContext import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} /** * {"name":"json","age":23,"hobby":"running"} * {"name":"charles","age":32,"hobby":"basketball"} * {"name":"tom","age":28,"hobby":"football"} * {"name":"lili","age":24,"hobby":"running"} * {"name":"bob","age":20,"hobby":"swimming"} * 统计年龄小于25岁的人群的爱好排行榜 */ object WordCount2 { def main(args: Array[String]): Unit = { //1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSet val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") val Schema: StructType = new StructType() .add("name","string") .add("age","integer") .add("hobby","string") //2.接收数据 import spark.implicits._ // Schema must be specified when creating a streaming source DataFrame. val dataDF: DataFrame = spark.readStream.schema(Schema).json("D:\data\spark\data") //3.处理数据 val result: Dataset[Row] = dataDF.filter($"age" < 25).groupBy("hobby").count().sort($"count".desc) //4.输出结果 result.writeStream .format("console") .outputMode("complete") .trigger(Trigger.ProcessingTime(0)) .start() .awaitTermination() } }
计算操作
输出
output mode
每当结果表更新时,我们都希望将更改后的结果行写入外部接收器。
这里有三种输出模型:
output sink
使用说明
File sink 输出到路径
//支持parquet文件,以及append模式 writeStream .format("parquet") // can be "orc", "json", "csv", etc. .option("path", "path/to/destination/dir") .start()
Kafka sink 输出到kafka内的一到多个topic
writeStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "updates") .start()
Foreach sink 对输出中的记录运行任意计算。
writeStream .foreach(...) .start()
Console sink (for debugging) 当有触发器时,将输出打印到控制台。
writeStream .format("console") .start()
Memory sink (for debugging) – 输出作为内存表存储在内存中.
writeStream .format("memory") .queryName("tableName") .start()
官网示例代码
// ========== DF with no aggregations ========== val noAggDF = deviceDataDf.select("device").where("signal > 10") // Print new data to console noAggDF.writeStream.format("console").start() // Write new data to Parquet files noAggDF.writeStream.format("parquet").option("checkpointLocation", "path/to/checkpoint/dir").option("path", "path/to/destination/dir").start() // ========== DF with aggregation ========== val aggDF = df.groupBy("device").count() // Print updated aggregations to console aggDF.writeStream.outputMode("complete").format("console").start() // Have all the aggregates in an in-memory table aggDF.writeStream.queryName("aggregates").outputMode("complete").format("memory").start() spark.sql("select * from aggregates").show() // interactively query in-memory table
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算