博客
关于我
spark读取kafka数据_SparkStreaming和Kafka基于Direct Approach管理offset(一)
阅读量:798 次
发布时间:2023-04-02

本文共 4897 字,大约阅读时间需要 16 分钟。

SparkStreaming与Kafka的Direct Approach集成:自定义Offset管理实践

在之前的文章《解析SparkStreaming和Kafka集成的两种方式》中,我们详细介绍了SparkStreaming和Kafka集成的两种主要方式:Receiver based Approach和Direct Approach,并对比了各自的优劣势。此外,还讨论了针对不同版本的Spark和Kafka集成进行的支持。此次文章将深入探讨SparkStreaming与Kafka采用Direct Approach进行数据处理时,如何实现自定义Offset管理。

SparkStreaming通过Direct Approach接收数据的入口是KafkaUtils.createDirectStream方法。在调用此方法时,会先创建KafkaCluster实例。KafkaCluster负责与Kafka集群通信,并获取Kafka的分区信息,创建DirectKafkaInputDStream。每个DirectKafkaInputDStream对应一个主题(topic),并且每个DirectKafkaInputDStream都会持有一个KafkaCluster实例。

在计算周期结束后,会调用DirectKafkaInputDStream的compute方法,执行以下操作:

  • 获取对应Kafka Partition的untilOffset,以确定需要获取数据的区间。
  • 构建KafkaRDD实例。每个计算周期内,DirectKafkaInputDStream和KafkaRDD是一一对应的。
  • 将相关的offset信息报告给InputInfoTracker。
  • 返回该RDD。
  • 关于KafkaRDD与Kafka分区对应关系,可以参考《重要 | Spark分区并行度决定机制》。

    SparkStreaming与Kafka通过Direct方式集成,自定义Offset管理的核心逻辑

  • 业务逻辑处理
  • 以下是实现自定义Offset管理的关键代码示例:

    package org.apache.spark.streaming.kafka;public class KafkaManager extends Serializable {    private final KafkaCluster kc;    private final Map
    kafkaParams; KafkaManager(Map
    kafkaParams) { this.kc = new KafkaCluster(kafkaParams); this.kafkaParams = kafkaParams; } // 创建数据流之前,根据实际情况更新消费offsets def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = { topics.foreach { topic => var hasConsumed = true // 获取每一个topic分区 val partitionsE = kc.getPartitions(Set(topic)) if (partitionsE.isLeft) { throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}") } val partitions = partitionsE.right.get // 获取消费偏移量 val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions) if (consumerOffsetsE.isLeft) { hasConsumed = false } if (hasConsumed) { // 获取earliestLeaderOffsets val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions) if (earliestLeaderOffsetsE.isLeft) { throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}") } val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get // 获取当前消费偏移量 val consumerOffsets = consumerOffsetsE.right.get // 更新offsets var offsets: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long]() consumerOffsets.foreach { case (tp, n) => val earliestLeaderOffset = earliestLeaderOffsets(tp).offset if (n < earliestLeaderOffset) { offsets += (tp -> earliestLeaderOffset) println(s"consumer group: $groupId, topic: ${tp.topic}, partition: ${tp.partition} offsets已过时,更新为: $earliestLeaderOffset") } } if (!offsets.isEmpty) { kc.setConsumerOffsets(groupId, offsets.toMap) } } else { // 处理offset过期重置的情况 val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase) val leaderOffsets: Map[TopicAndPartition, LeaderOffset] = if (reset == Some("smallest")) { val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions) if (leaderOffsetsE.isLeft) { throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}") } leaderOffsetsE.right.get } else { val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions) if (leaderOffsetsE.isLeft) { throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}") } leaderOffsetsE.right.get } val offsets = leaderOffsets.map { case (tp, lo) => (tp, lo.offset) } kc.setConsumerOffsets(groupId, offsets) } } } // 更新zookeeper上的消费offsets def updateZKOffsets(rdd: RDD[(String, String)]): Unit = { val groupId = kafkaParams("group.id") val offsetList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges offsetList.foreach { offset => val topicAndPartition = TopicAndPartition(offset.topic, offset.partition) val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offset.untilOffset))) if (o.isLeft) { println(s"Error updating the offset to Kafka cluster: ${o.left.get}") } } }}
    1. offset管理核心逻辑
    2. 在实现自定义Offset管理时,主要逻辑包括以下几个方面:

      a. 获取Kafka分区信息b. 获取Kafka消费偏移量c. 更新Zookeeper上的消费offsetsd. 处理offset过期重置的情况

      通过上述逻辑,可以实现对Kafka主题中数据的精确消费,即使在数据已经过期的情况下,也能正确处理偏移量,避免数据丢失或重复消费。

      在实际应用中,可以根据业务需求对上述逻辑进行扩展和修改,如添加数据丢失的处理机制、优化Zookeeper连接等。

      关注微信公众号:大数据学习与分享,获取更多技术干货

    转载地址:http://ilefk.baihongyu.com/

    你可能感兴趣的文章
    oracle 课堂笔记
    查看>>
    Oracle 返回结果集的 存储过程
    查看>>
    Oracle 递归
    查看>>
    Oracle 递归函数与拼接
    查看>>
    oracle 逻辑优化,提升高度,综合SQL上下文进行逻辑优化
    查看>>
    oracle 闪回关闭,关闭闪回即disable flashback的操作步骤
    查看>>
    oracle 限制用户并行,insert /*parallel */ 到不同用户,并行起不来的问题
    查看>>
    oracle--用户,权限,角色的管理
    查看>>
    Oracle-定时任务-JOB
    查看>>
    oracle.dataaccess 连接池,asp.net使用Oracle.DataAccess.dll连接Oracle
    查看>>
    oracle00205报错,Oracle控制文件损坏报错场景
    查看>>
    Oracle10g EM乱码之快速解决
    查看>>
    Oracle10g下载地址--多平台下的32位和64位
    查看>>
    Oracle10g安装了11g的ODAC后,PL/SQL连接提示TNS:无法解析指定的连接标识符
    查看>>
    oracle11g dataguard物理备库搭建(关闭主库cp数据文件到备库)
    查看>>
    Oracle11G基本操作
    查看>>
    Oracle11g服务详细介绍及哪些服务是必须开启的?
    查看>>
    Oracle11g静默安装dbca,netca报错处理--直接跟换操作系统
    查看>>
    oracle12安装软件后安装数据库,然后需要自己配置监听
    查看>>
    Oracle——08PL/SQL简介,基本程序结构和语句
    查看>>