Spark Streaming and Kafka Direct Approach Integration: Custom Offset Management in Practice
In the earlier article 《解析SparkStreaming和Kafka集成的两种方式》 (an analysis of the two ways to integrate Spark Streaming with Kafka), we introduced the two main integration approaches, the Receiver-based Approach and the Direct Approach, compared their trade-offs, and discussed the integration support available across different Spark and Kafka versions. This article digs into how to implement custom offset management when Spark Streaming consumes Kafka data via the Direct Approach.
The entry point for receiving data through the Direct Approach is the KafkaUtils.createDirectStream method. When this method is called, it first creates a KafkaCluster instance, which communicates with the Kafka cluster and fetches its partition metadata, and then constructs a DirectKafkaInputDStream covering the subscribed topics. Each DirectKafkaInputDStream holds its own KafkaCluster instance.
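One detail worth noting before reading the code below: KafkaCluster's metadata calls (getPartitions, getConsumerOffsets, and so on) return Scala Either values, with Left carrying accumulated errors and Right carrying the result, which is why the management code checks isLeft everywhere. The following is a minimal, dependency-free sketch of that convention; the names and bodies here are hypothetical stand-ins, not the real Spark classes:

```scala
// Dependency-free sketch of the Either-based error convention used by
// KafkaCluster; getPartitions here is a hypothetical stand-in.
object EitherStyle {
  type Err = List[String]

  // A metadata call either fails with accumulated errors (Left)
  // or succeeds with a value (Right).
  def getPartitions(topics: Set[String]): Either[Err, Set[(String, Int)]] =
    if (topics.isEmpty) Left(List("no topics given"))
    else Right(topics.map(t => (t, 0)))

  // Callers check isLeft, fail fast on errors, and unwrap with right.get,
  // exactly the pattern the KafkaManager code below follows.
  def partitionCount(topics: Set[String]): Int = {
    val e = getPartitions(topics)
    if (e.isLeft)
      throw new RuntimeException(s"get kafka partition failed: ${e.left.get}")
    e.right.get.size
  }
}
```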
At each batch interval, the compute method of DirectKafkaInputDStream is invoked. It queries the latest leader offset of every partition, optionally clamps the resulting until-offsets according to the configured rate limit (spark.streaming.kafka.maxRatePerPartition), builds a KafkaRDD spanning the range from the current offsets to the until-offsets, and finally advances the current offsets for the next batch.
For how KafkaRDD partitions map to Kafka partitions, see 《重要 | Spark分区并行度决定机制》 (on how Spark partition parallelism is determined).
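The per-partition offset-range computation performed by compute can be sketched in plain Scala. This is a simplified model under stated assumptions, not the actual Spark internals; OffsetRangeCalc and its parameter names are hypothetical:

```scala
// Simplified model of how compute derives one offset range per partition,
// clamping the until-offset so a single batch reads at most
// maxMessagesPerPartition messages (in real Spark this bound comes from
// spark.streaming.kafka.maxRatePerPartition times the batch duration).
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object OffsetRangeCalc {
  def ranges(current: Map[(String, Int), Long],
             latestLeader: Map[(String, Int), Long],
             maxMessagesPerPartition: Option[Long]): Seq[OffsetRange] =
    current.toSeq.map { case ((topic, part), from) =>
      val latest = latestLeader((topic, part))
      val until = maxMessagesPerPartition match {
        case Some(max) => math.min(from + max, latest) // rate-limited batch
        case None      => latest                       // read up to the head
      }
      OffsetRange(topic, part, from, until)
    }
}
```

With a current offset of 100, a latest leader offset of 1000, and a cap of 200 messages, the batch covers offsets 100 to 300; without a cap it covers 100 to 1000.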
Core Logic of Custom Offset Management with the Direct Approach
The key code for implementing custom offset management is shown below:
```scala
package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset

class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {

  private val kc = new KafkaCluster(kafkaParams)

  // Before creating the stream, bring the consumer offsets up to date.
  def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach { topic =>
      var hasConsumed = true
      // Fetch the partitions of the topic
      val partitionsE = kc.getPartitions(Set(topic))
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      // Fetch the consumer offsets of the group
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft)
        hasConsumed = false
      if (hasConsumed) {
        // The group has consumed before: guard against expired offsets.
        val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
        if (earliestLeaderOffsetsE.isLeft)
          throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
        val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
        val consumerOffsets = consumerOffsetsE.right.get
        // If a stored offset has fallen behind the earliest offset still held
        // by the brokers (the data was deleted by retention), reset it.
        var offsets: Map[TopicAndPartition, Long] = Map()
        consumerOffsets.foreach { case (tp, n) =>
          val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
          if (n < earliestLeaderOffset) {
            println(s"consumer group: $groupId, topic: ${tp.topic}, partition: ${tp.partition} " +
              s"offset is out of date, resetting to $earliestLeaderOffset")
            offsets += (tp -> earliestLeaderOffset)
          }
        }
        if (offsets.nonEmpty)
          kc.setConsumerOffsets(groupId, offsets)
      } else {
        // The group has never consumed: initialize the offsets from the
        // auto.offset.reset policy.
        val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        val leaderOffsets: Map[TopicAndPartition, LeaderOffset] =
          if (reset == Some("smallest")) {
            val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
            if (leaderOffsetsE.isLeft)
              throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
            leaderOffsetsE.right.get
          } else {
            val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
            if (leaderOffsetsE.isLeft)
              throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
            leaderOffsetsE.right.get
          }
        val offsets = leaderOffsets.map { case (tp, lo) => (tp, lo.offset) }
        kc.setConsumerOffsets(groupId, offsets)
      }
    }
  }

  // After each batch succeeds, persist the consumed offsets back to ZooKeeper.
  def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
    val groupId = kafkaParams("group.id")
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetsList.foreach { offsets =>
      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
      if (o.isLeft)
        println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
    }
  }
}
```

The main logic of this custom offset management covers the following aspects:
a. fetching the Kafka partition information
b. fetching the group's consumer offsets from Kafka
c. updating the consumer offsets stored in ZooKeeper
d. handling the reset of expired or uninitialized offsets
With this logic in place, data in a Kafka topic can be consumed from exactly where the group left off: even when the stored offsets have expired because the underlying data was deleted by retention, the offsets are corrected, avoiding both data loss and duplicate consumption.
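The decision at the heart of setOrUpdateOffsets can be distilled into a pure function. This is a simplified, dependency-free sketch with hypothetical names (OffsetResolver, resolve), not a Spark or Kafka API:

```scala
// Simplified model of the offset-resolution decision in setOrUpdateOffsets:
// given the group's previously committed offsets (if any), the brokers'
// earliest and latest available offsets, and the auto.offset.reset policy,
// return the offsets the stream should start from.
object OffsetResolver {
  type TP = (String, Int) // (topic, partition)

  def resolve(consumed: Option[Map[TP, Long]],
              earliest: Map[TP, Long],
              latest: Map[TP, Long],
              reset: String): Map[TP, Long] = consumed match {
    case Some(offsets) =>
      // Previously consumed: push expired offsets forward to the earliest
      // offset still held by the brokers; keep valid offsets unchanged.
      offsets.map { case (tp, n) => tp -> math.max(n, earliest(tp)) }
    case None =>
      // Never consumed: start from the earliest or latest offsets
      // according to the reset policy ("smallest" vs. anything else).
      if (reset == "smallest") earliest else latest
  }
}
```

For example, a committed offset of 10 on a partition whose earliest available offset is 50 resolves to 50, while a committed offset of 100 is kept as-is.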
In practice, this logic can be extended and adapted to business needs, for example by adding a mechanism for handling data loss or by optimizing the ZooKeeper connections.