Kafka review — the diagram above covers it!
Common commands
#Start Kafka
/export/servers/kafka/bin/kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties
#Stop Kafka
/export/servers/kafka/bin/kafka-server-stop.sh
#List all topics
/export/servers/kafka/bin/kafka-topics.sh --list --zookeeper node01:2181
#Create a topic
/export/servers/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 3 --topic test
#Describe a specific topic
/export/servers/kafka/bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test
#Delete a topic
/export/servers/kafka/bin/kafka-topics.sh --zookeeper node01:2181 --delete --topic test
#Start a console producer (generally used for testing)
/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092 --topic spark_kafka
#Start a console consumer (generally used for testing)
/export/servers/kafka/bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic spark_kafka --from-beginning
#Consumer connecting directly to the broker addresses
/export/servers/kafka/bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic spark_kafka --from-beginning
Spark-Kafka integration: the two modes explained
★Interview question: Receiver & Direct
In development we often use Spark Streaming to read data from Kafka in real time and then process it. Since Spark 1.3, KafkaUtils has offered two ways to create a DStream:
1. Receiver mode: KafkaUtils.createStream (not used in development; understand it, because it may come up in interviews)
The Receiver runs as a long-lived Task on an Executor waiting for data. A single Receiver has low throughput, so several have to be started and their streams merged by hand (union) before processing, which is cumbersome (see the sketch after this comparison).
If the machine a Receiver runs on goes down, data can be lost, so the WAL (write-ahead log) has to be enabled to keep the data safe, which lowers throughput even further.
Receiver mode connects to Kafka through ZooKeeper and uses the Kafka high-level API; the offsets are stored in ZooKeeper and maintained by the Receiver. To avoid losing data, Spark also keeps a copy of the offsets in its Checkpoint, so the two copies can become inconsistent.
So from whatever angle you look at it, Receiver mode is no longer suitable for development and has been abandoned.
2. Direct mode: KafkaUtils.createDirectStream (used in development; you are expected to master it)
Direct mode connects straight to the Kafka partitions to fetch data; reading every partition directly greatly improves parallelism.
Direct mode uses the Kafka low-level API; the offsets are stored and maintained on our side, by default by Spark in the checkpoint, which removes the inconsistency with ZooKeeper.
Of course the offsets can also be maintained by hand and stored in MySQL or Redis.
So Direct mode can be used in development, and its characteristics plus manual offset handling make Exactly-once (processed exactly once) semantics achievable.
Summary:
Implementation | Message semantics | Problems
Receiver | at most once (each record is processed at most once) | data can be lost
Receiver + WAL | at least once (each record is processed at least once) | no data loss, but records may be consumed repeatedly, and throughput is low
Direct + manual offsets | exactly once (each record is processed exactly once) | no data loss, no duplicate consumption, and high throughput
●Note:
There are two versions of the Spark Streaming + Kafka integration used in development: 0.8 and 0.10+.
Version 0.8 has both the Receiver and the Direct mode (but 0.8 causes many problems in production, and Spark no longer supports it after Spark 2.3).
Version 0.10+ keeps only the Direct mode (Receiver mode is not fit for production), and the 0.10 API has changed (it is more powerful).
●Conclusion:
For both learning and development we go straight to the Direct mode of version 0.10.
But you must be able to explain the difference between Receiver and Direct in an interview.
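Only for the interview comparison, here is a minimal sketch of what the abandoned Receiver approach roughly looked like. It assumes the old spark-streaming-kafka-0-8_2.11 dependency (kept commented out in the pom.xml below) and is not meant for development use; the object name, group id and checkpoint directory are illustrative.
package cn.itcast.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkKafkaReceiverSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("receiverSketch").setMaster("local[*]")
      //the WAL (write-ahead log) has to be switched on by hand to avoid losing received data
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("./receiver-ckp") //a checkpoint directory is required once the WAL is enabled
    //Receiver mode connects through ZooKeeper; the offsets live in ZooKeeper
    val zkQuorum = "node01:2181"
    val topics = Map("spark_kafka" -> 1) //topic -> number of consumer threads inside one receiver
    //one receiver is slow, so several are started and merged by hand with union
    val streams: Seq[ReceiverInputDStream[(String, String)]] = (1 to 3).map { _ =>
      KafkaUtils.createStream(ssc, zkQuorum, "ReceiverSketch", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
    }
    val lines = ssc.union(streams).map(_._2) //records arrive as (key, value) pairs; keep the value
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}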
spark-streaming-kafka-0-10
●Notes
In the spark-streaming-kafka-0-10 version the API has changed somewhat and is more flexible; this is the version used in development.
●pom.xml
<!--<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
●API
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
●Create the topic
/export/servers/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 3 --topic spark_kafka
●Start a producer
/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic spark_kafka
Here comes the code:
package cn.itcast.streaming

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkKafkaDemo {
  def main(args: Array[String]): Unit = {
    //1. Create the StreamingContext
    //spark.master should be set as local[n], n > 1
    val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5)) //5 means the data is cut into one RDD every 5 seconds
    //Prepare the Kafka connection parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SparkKafkaDemo",
      //earliest: if a partition has a committed offset, consume from it; otherwise consume from the beginning
      //latest: if a partition has a committed offset, consume from it; otherwise consume only newly produced data for that partition
      //none: if every partition has a committed offset, consume from after it; if any partition has no committed offset, throw an exception
      //latest is configured here: resume from the stored offset if there is one, otherwise start from newly arriving data
      "auto.offset.reset" -> "latest",
      //false disables auto commit; Spark commits to the Checkpoint for you, or the programmer maintains the offsets manually
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("spark_kafka")
    //2. Use KafkaUtils to connect to Kafka and get the data
    val recordDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent, //location strategy, strongly recommended by the source code; it spreads the Kafka partitions evenly over the Spark Executors
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)) //consumer strategy, strongly recommended by the source code
    //3. Get the VALUE of each record
    val lineDStream: DStream[String] = recordDStream.map(_.value()) //_ is the ConsumerRecord
    val wordDStream: DStream[String] = lineDStream.flatMap(_.split(" ")) //_ is the value that was sent, i.e. one line of data
    val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_, 1))
    val result: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_ + _)
    result.print()
    ssc.start() //start
    ssc.awaitTermination() //wait for a graceful stop
  }
}
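As noted in the comments above, with enable.auto.commit set to false Spark can keep the offsets in a Checkpoint, but that only happens if a checkpoint directory is configured and the StreamingContext is rebuilt from it on restart via getOrCreate. The following is a minimal sketch of that variant under stated assumptions: the local directory ./ckp and the object name SparkKafkaCheckpointDemo are hypothetical, while the topic and connection parameters are the ones used above.
package cn.itcast.streaming

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkKafkaCheckpointDemo {
  //Builds a brand-new StreamingContext; only called when no usable checkpoint exists yet
  def createSsc(): StreamingContext = {
    val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("./ckp") //hypothetical local directory; offsets and lineage are written here
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SparkKafkaDemo",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))
    KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array("spark_kafka"), kafkaParams))
      .map(_.value())
      .print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    //On restart, getOrCreate rebuilds the context (including the stored offsets) from ./ckp if it exists
    val ssc = StreamingContext.getOrCreate("./ckp", createSsc _)
    ssc.start()
    ssc.awaitTermination()
  }
}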
Maintaining Kafka offsets manually
●API
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
●Start a producer
/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic spark_kafka
package cn.itcast.streaming

import java.sql.{DriverManager, ResultSet}

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{OffsetRange, _}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object SparkKafkaDemo2 {
  def main(args: Array[String]): Unit = {
    //1. Create the StreamingContext
    //spark.master should be set as local[n], n > 1
    val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5)) //5 means the data is cut into one RDD every 5 seconds
    //Prepare the Kafka connection parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SparkKafkaDemo",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("spark_kafka")
    //2. Use KafkaUtils to connect to Kafka and get the data
    //Note:
    //If MySQL has no offset recorded, connect directly and consume from latest
    //If MySQL has an offset recorded, consumption should start from that offset
    val offsetMap: mutable.Map[TopicPartition, Long] = OffsetUtil.getOffsetMap("SparkKafkaDemo", "spark_kafka")
    val recordDStream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.size > 0) { //an offset is recorded
      println("MySQL has a recorded offset; consume starting from it")
      KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent, //location strategy, strongly recommended by the source code; it spreads the Kafka partitions evenly over the Spark Executors
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsetMap)) //consumer strategy, strongly recommended by the source code
    } else { //no offset recorded
      println("No offset recorded; connect directly and consume from latest")
      // /export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092 --topic spark_kafka
      KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
    }
    //3. Process the data
    //Note: our goal is to maintain the offsets ourselves, which means that after consuming a small batch of data we should commit its offsets
    //That small batch of data appears in the DStream as an RDD, so we need to operate on the RDDs inside the DStream
    //The APIs for operating on the RDDs inside a DStream are transform (a transformation) and foreachRDD (an action)
    recordDStream.foreachRDD(rdd => {
      if (rdd.count() > 0) { //this batch contains data
        rdd.foreach(record => println("Data received from Kafka: " + record))
        //Data received from Kafka: ConsumerRecord(topic = spark_kafka, partition = 1, offset = 6, CreateTime = 1565400670211, checksum = 1551891492, serialized key size = -1, serialized value size = 43, key = null, value = hadoop spark ...)
        //Note: the printed record contains both the offsets we need to maintain and the data to process
        //The data can now be processed... or transform can be used to process it as before
        //Once the processing code is written, the offsets have to be maintained; to make managing them easier, Spark provides a class that wraps the offset data for us
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (o <- offsetRanges) {
          println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset}")
        }
        //Commit the offsets back to Kafka asynchronously (they are stored in Kafka's internal __consumer_offsets topic)
        //recordDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        //In practice the offsets can be stored in MySQL/Redis
        OffsetUtil.saveOffsetRanges("SparkKafkaDemo", offsetRanges)
      }
    })
    /* val lineDStream: DStream[String] = recordDStream.map(_.value()) //_ is the ConsumerRecord
    val wordDStream: DStream[String] = lineDStream.flatMap(_.split(" ")) //_ is the value that was sent, i.e. one line of data
    val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_, 1))
    val result: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_ + _)
    result.print() */
    ssc.start() //start
    ssc.awaitTermination() //wait for a graceful stop
  }

  /*
  Utility class for maintaining offsets by hand
  First create the following table in MySQL:
  CREATE TABLE `t_offset` (
    `topic` varchar(255) NOT NULL,
    `partition` int(11) NOT NULL,
    `groupid` varchar(255) NOT NULL,
    `offset` bigint(20) DEFAULT NULL,
    PRIMARY KEY (`topic`,`partition`,`groupid`)
  ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  */
  object OffsetUtil {
    //Read the offsets from the database
    def getOffsetMap(groupid: String, topic: String) = {
      val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
      val pstmt = connection.prepareStatement("select * from t_offset where groupid=? and topic=?")
      pstmt.setString(1, groupid)
      pstmt.setString(2, topic)
      val rs: ResultSet = pstmt.executeQuery()
      val offsetMap = mutable.Map[TopicPartition, Long]()
      while (rs.next()) {
        offsetMap += new TopicPartition(rs.getString("topic"), rs.getInt("partition")) -> rs.getLong("offset")
      }
      rs.close()
      pstmt.close()
      connection.close()
      offsetMap
    }

    //Save the offsets to the database
    def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {
      val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
      //replace into: replace the row if it already exists, otherwise insert it
      val pstmt = connection.prepareStatement("replace into t_offset (`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)")
      for (o <- offsetRange) {
        pstmt.setString(1, o.topic)
        pstmt.setInt(2, o.partition)
        pstmt.setString(3, groupid)
        pstmt.setLong(4, o.untilOffset)
        pstmt.executeUpdate()
      }
      pstmt.close()
      connection.close()
    }
  }
}
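The comments above mention that in practice the offsets can be stored in MySQL or Redis. The MySQL version is OffsetUtil; what follows is a rough sketch of a Redis-backed alternative, assuming the redis.clients:jedis client and a Redis instance on localhost:6379 (the object name RedisOffsetUtil and the hash-based key layout are hypothetical, not part of the original code).
package cn.itcast.streaming

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.Jedis

import scala.collection.JavaConverters._
import scala.collection.mutable

object RedisOffsetUtil {
  //Offsets are kept in one hash per consumer group: key = "offset:<groupid>", field = "<topic>-<partition>", value = offset
  def getOffsetMap(groupid: String, topic: String): mutable.Map[TopicPartition, Long] = {
    val jedis = new Jedis("localhost", 6379)
    val offsetMap = mutable.Map[TopicPartition, Long]()
    jedis.hgetAll("offset:" + groupid).asScala
      .filter(_._1.startsWith(topic + "-"))
      .foreach { case (field, offset) =>
        val partition = field.substring(field.lastIndexOf("-") + 1).toInt
        offsetMap += new TopicPartition(topic, partition) -> offset.toLong
      }
    jedis.close()
    offsetMap
  }

  //HSET overwrites the field if it already exists, much like the replace into used for MySQL
  def saveOffsetRanges(groupid: String, offsetRanges: Array[OffsetRange]): Unit = {
    val jedis = new Jedis("localhost", 6379)
    for (o <- offsetRanges) {
      jedis.hset("offset:" + groupid, o.topic + "-" + o.partition, o.untilOffset.toString)
    }
    jedis.close()
  }
}
Keeping one hash per consumer group groups all of that group's partition offsets together, so reading them back in getOffsetMap is a single HGETALL call.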