专业的编程技术博客社区

网站首页 > 博客文章 正文

0362-如何在Spark Streaming应用中使用Kudu管理Kafka的Offset

baijin 2024-10-16 07:40:18 博客文章 11 ℃ 0 评论

温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。

Fayson的github:

https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1.文档编写目的


在Hadoop集群中使用Spark Streaming处理Kafka的流数据时,需要考虑如果程序发生故障、升级、或宕机重启,如何保证Kafka的数据不丢失不重复呢?在Spark1.3版本之后,使用checkpoint保存当前消费Kafka的Offset,那么在特定的环境下checkpoint配合Kafka可以保证数据不丢失不重复。Spark Streaming的checkpoint在运行的过程中将元数据、每次的RDD状态数据以及相关的jar序列化成一个二进制文件持久化到HDFS中,如果你的Spark作业升级后重新运行会导致启动报错或反序列化失败,即使启动成功代码仍然是更新前的代码。当然我们也可以自己维护Kafka的Offset,本篇文章Fayson主要介绍如何使用Kudu管理Kafka的Offset。

  • 文章概述

1.环境准备

2.Spark Streaming示例代码

3.Kafka Offset测试

4.总结

  • 测试环境

1.CM和CDH版本为5.15

2.Redhat7.3

3.JDK1.8.0_141

4.MariaDB-5.5.56

2.环境准备


1.准备向Kakfa发送数据的脚本,关于脚本这里就不在过多的介绍前面很多文章都有介绍,具体可以参考Fayson的GitHub:

https://github.com/fayson/cdhproject/tree/master/kafkademo/0283-kafka-shell

(可左右滑动)

根据需要将conf下面的配置文件修改为自己集群的环境即可,发送至Kafka的JSON数据示例如下:

{
 "occupation": "生产工作、运输工作和部分体力劳动者",
 "address": "台东东二路16号-8-8",
 "city": "长治",
 "marriage": "1",
 "sex": "1",
 "name": "仲淑兰",
 "mobile_phone_num": "13607268580",
 "bank_name": "广州银行31",
 "id": "510105197906185179",
 "child_num": "1",
 "fix_phone_num": "15004170180"
}

(可左右滑动)

2.登录CM进入SPARK2服务的配置项将spark_kafka_version的kafka版本修改为0.10

3.创建测试使用的Topic

[root@cdh01 0283-kafka-shell]# kafka-topics --create --zookeeper cdh01.fayson.com:2181 --replication-factor 3 --partitions 3 --topic kafka_kudu_topic_offset_3

(可左右滑动)

3.Spark Streaming示例代码


1.使用Maven创建一个Scala工程,pom.xml内容如下:

<dependency>
 <groupId>org.apache.kudu</groupId>
 <artifactId>kudu-spark_2.10</artifactId>
 <version>1.2.0-cdh5.10.0</version>
</dependency>
<dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-streaming-kafka_2.10</artifactId>
 <version>1.6.0-cdh5.12.1</version>
</dependency>
<dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-streaming_2.10</artifactId>
 <version>1.6.0-cdh5.12.1</version>
</dependency>
<dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-sql_2.10</artifactId>
 <version>1.6.0-cdh5.12.1</version>
</dependency>

(可左右滑动)

2.创建0295.properties的配置文件,内容如下:

kafka.brokers=cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092
kafka.topics=kafka_kudu_topic_offset
group.id=testgroup
kudumaster.list=cdh01.fayson.com,cdh02.fayson.com,cdh03.fayson.com
zookeeper.list=cdh01.fayson.com,cdh02.fayson.com,cdh03.fayson.com
offset.tablename=kafka_offset_tb

(可左右滑动)

3.创建KafkaOffsetTest.scala文件,内容如下:

package com.cloudera.streaming.nokerberos
import java.io.{File, FileInputStream}
import java.util.Properties
import org.apache.commons.lang.StringUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.cloudera.utils.KafkaOffsetByKudu
import scala.util.parsing.json.JSON
/**
 * package: com.cloudera.streaming.nokerberos
 * describe: 用于测试使用Kudu管理Kakfa的Offset
 * creat_user: Fayson 
 * email: htechinfo@163.com
 * creat_date: 2018/8/9
 * creat_time: 下午5:50
 * 公众号:Hadoop实操
 */
object KafkaOffsetTest {
 Logger.getLogger("*").setLevel(Level.WARN) //设置日志级别
 var confPath: String = System.getProperty("user.dir") + File.separator + "conf"
 /**
 * 用于存储Kafka消息到Kudu的建表Schema定义
 */
 val userInfoSchema = StructType(
 // col name type nullable?
 StructField("id", StringType , false) ::
 StructField("name" , StringType, true ) ::
 StructField("sex" , StringType, true ) ::
 StructField("city" , StringType, true ) ::
 StructField("occupation" , StringType, true ) ::
 StructField("tel" , StringType, true ) ::
 StructField("fixPhoneNum" , StringType, true ) ::
 StructField("bankName" , StringType, true ) ::
 StructField("address" , StringType, true ) ::
 StructField("marriage" , StringType, true ) ::
 StructField("childNum", StringType , true ) :: Nil
 )
 /**
 * 定义一个UserInfo对象
 */
 case class UserInfo (
 id: String,
 name: String,
 sex: String,
 city: String,
 occupation: String,
 tel: String,
 fixPhoneNum: String,
 bankName: String,
 address: String,
 marriage: String,
 childNum: String
 )
 def main(args: Array[String]): Unit = {
 //加载配置文件
 val properties = new Properties()
 val file = new File(confPath + File.separator + "0295.properties")
 if(!file.exists()) {
 val in = Kafka2Spark2Hbase.getClass.getClassLoader.getResourceAsStream("0295.properties")
 properties.load(in);
 } else {
 properties.load(new FileInputStream(file))
 }
 val offsetTableName = properties.getProperty("offset.tablename")
 val brokers = properties.getProperty("kafka.brokers")
 val topics = properties.getProperty("kafka.topics")
 val groupName = properties.getProperty("group.id")
 val kuduMaster = properties.getProperty("kudumaster.list")
 val zklist = properties.getProperty("zookeeper.list")
 println("kafka.brokers:" + brokers)
 println("kafka.topics:" + topics)
 println("kudu.master:" + kuduMaster)
 println("kafka.brokers:" + brokers)
 println("kafka.topics:" + topics)
 if(StringUtils.isEmpty(brokers)|| StringUtils.isEmpty(topics) || StringUtils.isEmpty(groupName)) {
 System.exit(0)
 }
 val topicsSet = topics.split(",").toSet
 val spark = SparkSession.builder()
 .appName("Kafka2Spark2Kudu-Offset")
 .config(new SparkConf())
 .getOrCreate()
 //引入隐式
 import spark.implicits._
 val kuduContext = new KuduContext(kuduMaster, spark.sparkContext)
 KafkaOffsetByKudu.init_kudu_tb(kuduContext, offsetTableName)
 val fromOffsetMap = KafkaOffsetByKudu.getLastCommittedOffsets(topicsSet, groupName, offsetTableName, kuduContext, spark, zklist)
 val ssc = new StreamingContext(spark.sparkContext, Seconds(5)) //设置Spark时间窗口,每5s处理一次
 val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers
 , "key.deserializer" -> classOf[StringDeserializer]
 , "value.deserializer" -> classOf[StringDeserializer]
 , "group.id" -> groupName
 )
 val dStream = KafkaUtils.createDirectStream[String, String](ssc,
 LocationStrategies.PreferConsistent,
 ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams, fromOffsetMap))
 dStream.foreachRDD(rdd => {
 //将rdd数据重新封装为Rdd[UserInfo]
 val newrdd = rdd.map(line => {
 val jsonObj = JSON.parseFull(line.value())
 val map:Map[String,Any] = jsonObj.get.asInstanceOf[Map[String, Any]]
 new UserInfo(
 map.get("id").get.asInstanceOf[String],
 map.get("name").get.asInstanceOf[String],
 map.get("sex").get.asInstanceOf[String],
 map.get("city").get.asInstanceOf[String],
 map.get("occupation").get.asInstanceOf[String],
 map.get("mobile_phone_num").get.asInstanceOf[String],
 map.get("fix_phone_num").get.asInstanceOf[String],
 map.get("bank_name").get.asInstanceOf[String],
 map.get("address").get.asInstanceOf[String],
 map.get("marriage").get.asInstanceOf[String],
 map.get("child_num").get.asInstanceOf[String]
 )
 })
 //将RDD转换为DataFrame
 val userinfoDF = spark.sqlContext.createDataFrame(newrdd)
 kuduContext.upsertRows(userinfoDF, "user_info")
 //数据更新成功后,更新Topic Offset数据到Kudu表中
 KafkaOffsetByKudu.saveOffset(kuduContext, spark, rdd, groupName, offsetTableName)
 })
 ssc.start()
 ssc.awaitTermination()
 }
}

(可左右滑动)

4.创建KafkaOffsetByKudu.scala文件,内容如下

package com.cloudera.utils
import kafka.utils.ZkUtils
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import util.control.Breaks._
import scala.collection.JavaConverters._
/**
 * package: com.cloudera.utils
 * describe: 使用Kudu管理Kafka的Offset
 * creat_user: Fayson 
 * email: htechinfo@163.com
 * creat_date: 2018/8/6
 * creat_time: 下午5:46
 * 公众号:Hadoop实操
 */
object KafkaOffsetByKudu {
 /**
 * 用于存储Offset的建表Schema
 */
 val KafkOffset = StructType(
 // col name type nullable?
 StructField("topic", StringType , false) ::
 StructField("group" , StringType, false ) ::
 StructField("partition" , IntegerType, false ) ::
 StructField("offset_id" , LongType, false ) ::
 StructField("timestamp", LongType, false) :: Nil
 )
 /**
 * 定义一个KafkOffset对象
 * @param topic
 * @param group
 * @param partition
 * @param offset_id
 * @param timestamp
 */
 case class KafkOffsetInfo (
 topic: String,
 group: String,
 partition: Int,
 offset_id: Long,
 timestamp: Long
 )
 /**
 * 创建一个用于存放Topic Offset信息的Kudu表
 * @param kuduContext
 * @param tableName
 */
 def init_kudu_tb(kuduContext: KuduContext, tableName: String): Unit = {
 //判断表是否存在
 if(!kuduContext.tableExists(tableName)) {
 println("create Kudu Table :" + tableName)
 val createTableOptions = new CreateTableOptions()
 createTableOptions.addHashPartitions(List("topic","group", "partition").asJava, 8).setNumReplicas(3)
 kuduContext.createTable(tableName, KafkOffset, Seq("topic","group", "partition"), createTableOptions)
 }
 }
 /**
 * 获取最后记录的Kafka Topic Offset
 * @param topics
 * @param group
 * @param tableName
 * @param kuduContext
 * @param spark
 * @return
 */
 def getLastCommittedOffsets(topics : Set[String], group: String, tableName: String, kuduContext: KuduContext, spark:SparkSession, zklist: String) : Map[TopicPartition,Long] = {
 var fromOffsetMap = Map[TopicPartition,Long]()
 //通过Zookeeper获取topics的Partition
 //如果kudu表中存储的数据为空,则需要将所有的partition设置为从0开始
 val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zklist, 30000, 3000)
 val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2,false)
 //通过Zookeeper获取相应Topic的Partition及Offset
 val zKNumberOfPartitionsForTopic = zkUtils.getPartitionsForTopics(topics.toSeq)
 import spark.implicits._
 val rdd_offset = kuduContext.kuduRDD(spark.sparkContext, tableName, Seq("topic","group", "partition" ,"offset_id"))
 //用于缓存需要更新的Offset的数据
 var list = List[KafkOffsetInfo]()
 if(rdd_offset.isEmpty()) { //如果查询的Kudu表数据为空则使用ZK获取到的partition,并将所有的partition的offset设置为0
 topics.foreach(topic => {
 zKNumberOfPartitionsForTopic.get(topic).foreach(_.foreach(partition_id => {
 fromOffsetMap += (new TopicPartition(topic, partition_id) -> 0)
 list.+:(new KafkOffsetInfo(
 topic,
 group,
 partition_id,
 0,
 System.currentTimeMillis()
 ))
 }))
 })
 } else {
 rdd_offset.map(row => {
 val tmp_topic = row.getAs[String]("topic")
 val tmp_group = row.getAs[String]("group")
 val partition_id = row.getAs[Int]("partition")
 breakable{
 if(!topics.contains(tmp_topic) || !group.equals(tmp_group)) break
 }
 zKNumberOfPartitionsForTopic.get(tmp_topic).foreach(_.foreach(tmp_partition_id => {
 if(tmp_partition_id == partition_id) {
 fromOffsetMap += (new TopicPartition(tmp_topic, partition_id) -> row.getAs[Long]("offset_id"))
 } else {
 fromOffsetMap += (new TopicPartition(tmp_topic, partition_id) -> 0)
 //将该对象存入Kudu的KafkaOffset表中
 list.+:(new KafkOffsetInfo(
 tmp_topic,
 tmp_group,
 tmp_partition_id,
 0,
 System.currentTimeMillis()
 ))
 }
 }))
 })
 }
 //将相应Topic的Offset信息更新到kudu表,包含新增的
 if(!list.isEmpty) {
 kuduContext.upsertRows(spark.sqlContext.createDataFrame(list), tableName)
 }
 fromOffsetMap
 }
 /**
 * 将消费的Kafka Offset保存到Kudu表
 * @param kuduContext
 * @param spark
 * @param kafka_rdd
 * @param groupName
 * @param offsetTableName
 */
 def saveOffset(kuduContext: KuduContext, spark:SparkSession, kafka_rdd:RDD[ConsumerRecord[String, String]], groupName:String, offsetTableName:String):Unit = {
 val kafkaOffsetDF = spark.sqlContext.createDataFrame(
 kafka_rdd.map(line => {
 new KafkaOffsetByKudu.KafkOffsetInfo(
 line.topic(),
 groupName,
 line.partition(),
 line.offset(),
 System.currentTimeMillis()
 )
 })
 )
 kuduContext.upsertRows(kafkaOffsetDF, offsetTableName)
 }
}

(可左右滑动)

5.使用mvn命令编译工程,注意由于是scala工程编译时mvn命令要加scala:compile

mvn clean scala:compile package

(可左右滑动)

6.将编译好的spark2-demo-1.0-SNAPSHOT.jar包上传至服务器

将Spark2应用的配置文件放在conf目录下,内容如下:

0295.properties配置文件内容如下:

4.Kafka Offset测试


初次运行在Kudu表未创建的情况下。

1.在没有运行SparkStreaming的情况下,先向指定的kafka_kudu_topic发送消息

运行两次脚本向Topic中发送1200条消息。

2.启动SparkStreaming作业,查看作业接收到的消息

spark2-submit --class com.cloudera.streaming.nokerberos.KafkaOffsetTest \
--master yarn-client \
--driver-cores 1 \
--driver-memory 2g \
--executor-cores 1 \
--executor-memory 1g \
spark2-demo-1.0-SNAPSHOT.jar

(可左右滑动)

3.启动SparkStreaming作业,查看作业接收到的消息,可以看到Spark处理的消息数量为1200条

4.停掉Spark作业,然后使用脚本再次向Kafka发送600条消息

5.再次启动Spark作业查看,可以看到Spark作业只处理了600条数据

6.通过Hue创建Kudu的表

查看kafka_offset_tb表数据

可以看到kafka_offset_tb表中kafka_kudu_topic_offset_3的Topic每个Partition的offset_id,每个Offset从0开始。

5.总结


1.SparkSteaming默认的checkpoint存储Kafka的Offset信息,因此这里使用自己维护的方式将Kafka的Offset存储到Kudu表中。

2.Kudu表的设计方式以topic、group和partition做为表的Key。

3.在Kafka的Topic增加了Partition后,则需要重启SparkStreaming作业,否则作业无法知道Topic增加了Partition。

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。

温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表