Spark Streaming消费Kafka的数据有两种模式:Receiver和Direct模式,学习时候重点关注下Direct即可,因为在最新读取方式中已经不支持Receiver。 在Spark 1.3之前,Spark Streaming消费Kafka中的数据采用基于Kafka高级消费API实现的Receiver模式,如图1所示。首先是Receiver从Kafka中消费数据并存储到Spark Executor中,然后由Spark Streaming启动的Job将处理数据。为了保证高可用,可以启用Spark Streaming的预写日志机制(Write Ahead Log,WAL),将接收到的Kafka数据同步写到外部存储中(如HDFS)。当系统发生故障适合,可以重新将数据加载到系统中,从而避免出现数据丢失的问题。 在Spark 1.3时,Spark采用无Receiver的Direct模式,如图2所示。首先Spark周期性地查询Kafka中自己订阅的Topic下每个分区的最新offset,通过对比上一批次的offset,从而获取本批次offset的范围。然后启动处理数据的Job, 最后使用Kafka低级消费API去拉取本批次的数据。 在最新的API中,Receiver模式已经被废弃。Spark的官网上是推荐Direct模式,理由如下: Direct模式中,将创建与要使用的Kafka分区一样多的RDD分区,所有这些分区都将从Kafka并行读取数据。因此,Kafka和RDD分区之间存在一对一的映射,这更易于理解和调整。 2) 高效率 Receiver模式中为了保证高可用,需要启用WAL机制,在KafkaUtils.createStream()中设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER,将数据复制一份到外存中。在处理大数据情况下,很容易成为系统瓶颈。 Direct模式中,为了保证不丢失数据,不需要使用WAL这种重量级的保存机制,只需要将上一批次的Offset保存即可。如果系统挂掉,Spark会重新读取上一次保存的Offset,从而保证数据的零丢失。 3) 占用资源低 Receiver不处理数据,只是持续不断的接受数据,与其他Exector是异步的,但却会占用系统资源。在分配相同cup和内存资源的前提下,Receiver会占用Cpu时间,影响Spark的处理速度,如果数据量大,Receiver会占用更大内存。 以前博主一般是用虚拟机来从头到尾来安装各种大数据组件,每次学半天都是在安装,但是公司中大数据平台都是运维已经安装好的,我们每天要做的更多是在平台上开发业务,因此学习时候使用Docker快速安装就显得很有意义。 这里2个ip分别是zk和kafka所在设备的ip。 使用docker最好将镜像仓库配成阿里云,否词下载会很慢,配置方法网上很多,本文不再累述。 在上Offset管理代码之前,有个重要概念需要交代下,就是消费语义。Spark Streaming从Kafka broker中拉取数据时候,有三种消费语意: 从字面上看就是会有重复数据。Spark从Kafka拉取数据进行处理后,如果offset还没有保存而系统挂掉时候,就会重新从上一次的Offset处开始消费,会造成至少消费一次。 Kafka broker存储的数据只会被消费一次。实现这种模式主要有2种方法 简单点说就是数据可能发生丢失。比如使用自动提交offset时,因为是定时提交offset,有可能刚拉去到一批数据,还没进行处理,系统就自动帮你提交了offset,而你在处理过程中挂了,这就造成了数据丢失。 1)创建topic 2)向Kafka中写入数据 登陆redis的客户端,查看key为Hello的值,可以发现kafka 生产者一共写入了200个”Hello” 1)Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式
1 Spark Streaming读取Kafka的两种模式
1.1 Receiver模式
图 1(来自百度图片)1.2 Direct模式
图 2(来自百度图片)1.3 两种接收方式的对比
1) 简化并行度
Receiver模式中,Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以在KafkaUtils.createStream()中提高partition的数量,只会增加一个Receiver中读取partition的线程数,不会增加Spark处理数据的并行度2 使用Docker安装实验环境
2.1 安装Reids
docker pull redislabs/rebloom docker run -d -p 6379:6379 redislabs/rebloom
2.2 安装Zookeeper
docker pull wurstmeister/zookeeper docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
2.3 安装Kafka
docker pull wurstmeister/kafka docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.0.150:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.150:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
2.4 查看运行的容器
docker ps
2.5 注意事项
3 消费语义
3.1 至少消费一次
3.2 恰好消费一次
a 在至少消费一次的基础上,下游系统自己做幂等。比如有2条uuid一样的数据被Spark处理且存在数据库,下游系统读取的时候要根据uuid去重。
b 使用数据库的事务。保证数据处理入库和提交offset这2个操作处于同一个事务之中。3.3 最多消费一次
4 使用Redis管理Kafka Offset(支持多Topiic)
4.1 向Kafka中写入数据
import org.apache.kafka.clients.admin.*; import java.util.Collections; import java.util.Properties; import java.util.concurrent.ExecutionException; /** * 创建主题 */ public class CreatTopicService { public static final String brokerList = "192.168.2.50:9092"; public static final String topic = "first"; public static void main(String[] args) { Properties properties = new Properties(); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); AdminClient client = AdminClient.create(properties); NewTopic newTopic = new NewTopic(topic, 2, (short) 1); CreateTopicsResult result = client.createTopics(Collections.singleton(newTopic)); try { result.all().get(); } catch (InterruptedException | ExecutionException e){ e.printStackTrace(); }finally{ if(client != null){ client.close() ; } } } }
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; public class ProducerService { public static final String brokerList = "localhost:9092"; public static final String topic = "first"; public static void main(String[] args) { Properties properties = new Properties(); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("bootstrap.servers", brokerList); //自己直生产者客户端参数并创建KafkaProducer实例 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); //构建所需妥发送的消息 ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Hello Spark!"); try { //发送消息3种方法 //1)发后即忘 // producer.send(record); //2)同步 // Future<RecordMetadata> future = producer.send(record); // RecordMetadata metadata = future.get(); // System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset()); //3)异步 for (int i = 0; i < 200; i++){ producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null){ exception.printStackTrace(); }else{ System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset()); } } }); } }catch (Exception e){ e.printStackTrace(); }finally { //关闭生产着客户端实例 if(producer != null){ producer.close(); } } } }
4.2 Spark消费Kafka的代码
import com.dhhy.redis.RedisUtil import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer import org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.slf4j.LoggerFactory /** * 使用redis管理kafka Offset */ object KafkaOffseManageDemo2 { val LOG = LoggerFactory.getLogger(KafkaOffseManageDemo.getClass) /** * 从Redis获取上一次保存的偏移量 * @param topics * @return */ def getOffsets(topics: Array[String]): Map[TopicPartition, Long] = { val fromOffsets = collection.mutable.Map.empty[TopicPartition, Long] import scala.collection.JavaConversions._ val jedis = RedisUtil.getResource() topics.foreach(topic => { jedis.hgetAll(topic).foreach(kv =>{ fromOffsets += (new TopicPartition(topic, kv._1.toInt) -> kv._2.toLong) }) }) RedisUtil.returnResource(jedis) fromOffsets.toMap } def main(args: Array[String]): Unit = { val LOG = LoggerFactory.getLogger(KafkaOffseManageDemo2.getClass) //设置日志级别 Logger.getLogger("org").setLevel(Level.INFO) //配置项 val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaOffseManageDemo") //创建流式上下文,设置批处理间隔 val ssc = new StreamingContext(conf, Seconds(5)) //kafka配置参数 val kafkaParams = Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092", //kafka集群地址 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], //序列化类型,此处为字符类型 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], //序列化类型,此处为字符类型 ConsumerConfig.GROUP_ID_CONFIG -> "KafkaOffseManageDemo1",//Kafka消费组l ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest", //读取最新offset ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)// 是否向定时向zookeeper写入每个分区的offse ) //消费的topic //如果消费多个topic,则val topics = Array(topic1,topic2,…) val topics = Array("first") val offsets = getOffsets(topics) // 初始化输入流 val stream = if(offsets.isEmpty){ KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ) }else{ KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams, offsets)) } stream.foreachRDD{rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreachPartition { iter => val jedis = RedisUtil.getResource() val pipeline = jedis.pipelined() iter.foreach { msg => msg.value().split(" ").foreach (f => if (f == "Hello"){ pipeline.incr("Hello") } ) } val o = offsetRanges(TaskContext.get.partitionId) //打印偏移量信息 println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") //保存Kafka的偏移量 pipeline.hset(o.topic, o.partition + "", o.untilOffset + "") pipeline.sync() RedisUtil.returnResource(jedis) } } ssc.start() ssc.awaitTermination() } }
4.3 测试结果
5 参考文献
https://blog.csdn.net/kwu_ganymede/article/details/50314901
2) Apache spark官方文档
https://spark.apache.org/docs/latest/streaming-kafka-integration.html
3) 耿嘉安,Spark内核设计的艺术:架构设计与实现 第1版. 2018, 机械工业出版社.
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算