网站首页 > 博客文章 正文
温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。
Fayson的github:
https://github.com/fayson/cdhproject
提示:代码块部分可以左右滑动查看噢
1.文档编写目的
在Hadoop集群中使用Spark Streaming处理Kafka的流数据时,需要考虑如果程序发生故障、升级、或宕机重启,如何保证Kafka的数据不丢失不重复呢?在Spark1.3版本之后,使用checkpoint保存当前消费Kafka的Offset,那么在特定的环境下checkpoint配合Kafka可以保证数据不丢失不重复。Spark Streaming的checkpoint在运行的过程中将元数据、每次的RDD状态数据以及相关的jar序列化成一个二进制文件持久化到HDFS中,如果你的Spark作业升级后重新运行会导致启动报错或反序列化失败,即使启动成功代码仍然是更新前的代码。当然我们也可以自己维护Kafka的Offset,本篇文章Fayson主要介绍如何使用Kudu管理Kafka的Offset。
- 文章概述
1.环境准备
2.Spark Streaming示例代码
3.Kafka Offset测试
4.总结
- 测试环境
1.CM和CDH版本为5.15
2.Redhat7.3
3.JDK1.8.0_141
4.MariaDB-5.5.56
2.环境准备
1.准备向Kakfa发送数据的脚本,关于脚本这里就不在过多的介绍前面很多文章都有介绍,具体可以参考Fayson的GitHub:
https://github.com/fayson/cdhproject/tree/master/kafkademo/0283-kafka-shell
(可左右滑动)
根据需要将conf下面的配置文件修改为自己集群的环境即可,发送至Kafka的JSON数据示例如下:
{ "occupation": "生产工作、运输工作和部分体力劳动者", "address": "台东东二路16号-8-8", "city": "长治", "marriage": "1", "sex": "1", "name": "仲淑兰", "mobile_phone_num": "13607268580", "bank_name": "广州银行31", "id": "510105197906185179", "child_num": "1", "fix_phone_num": "15004170180" }
(可左右滑动)
2.登录CM进入SPARK2服务的配置项将spark_kafka_version的kafka版本修改为0.10
3.创建测试使用的Topic
[root@cdh01 0283-kafka-shell]# kafka-topics --create --zookeeper cdh01.fayson.com:2181 --replication-factor 3 --partitions 3 --topic kafka_kudu_topic_offset_3
(可左右滑动)
3.Spark Streaming示例代码
1.使用Maven创建一个Scala工程,pom.xml内容如下:
<dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-spark_2.10</artifactId> <version>1.2.0-cdh5.10.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.6.0-cdh5.12.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.6.0-cdh5.12.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.6.0-cdh5.12.1</version> </dependency>
(可左右滑动)
2.创建0295.properties的配置文件,内容如下:
kafka.brokers=cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092 kafka.topics=kafka_kudu_topic_offset group.id=testgroup kudumaster.list=cdh01.fayson.com,cdh02.fayson.com,cdh03.fayson.com zookeeper.list=cdh01.fayson.com,cdh02.fayson.com,cdh03.fayson.com offset.tablename=kafka_offset_tb
(可左右滑动)
3.创建KafkaOffsetTest.scala文件,内容如下:
package com.cloudera.streaming.nokerberos import java.io.{File, FileInputStream} import java.util.Properties import org.apache.commons.lang.StringUtils import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kudu.spark.kudu.KuduContext import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} import com.cloudera.utils.KafkaOffsetByKudu import scala.util.parsing.json.JSON /** * package: com.cloudera.streaming.nokerberos * describe: 用于测试使用Kudu管理Kakfa的Offset * creat_user: Fayson * email: htechinfo@163.com * creat_date: 2018/8/9 * creat_time: 下午5:50 * 公众号:Hadoop实操 */ object KafkaOffsetTest { Logger.getLogger("*").setLevel(Level.WARN) //设置日志级别 var confPath: String = System.getProperty("user.dir") + File.separator + "conf" /** * 用于存储Kafka消息到Kudu的建表Schema定义 */ val userInfoSchema = StructType( // col name type nullable? StructField("id", StringType , false) :: StructField("name" , StringType, true ) :: StructField("sex" , StringType, true ) :: StructField("city" , StringType, true ) :: StructField("occupation" , StringType, true ) :: StructField("tel" , StringType, true ) :: StructField("fixPhoneNum" , StringType, true ) :: StructField("bankName" , StringType, true ) :: StructField("address" , StringType, true ) :: StructField("marriage" , StringType, true ) :: StructField("childNum", StringType , true ) :: Nil ) /** * 定义一个UserInfo对象 */ case class UserInfo ( id: String, name: String, sex: String, city: String, occupation: String, tel: String, fixPhoneNum: String, bankName: String, address: String, marriage: String, childNum: String ) def main(args: Array[String]): Unit = { //加载配置文件 val properties = new Properties() val file = new File(confPath + File.separator + "0295.properties") if(!file.exists()) { val in = Kafka2Spark2Hbase.getClass.getClassLoader.getResourceAsStream("0295.properties") properties.load(in); } else { properties.load(new FileInputStream(file)) } val offsetTableName = properties.getProperty("offset.tablename") val brokers = properties.getProperty("kafka.brokers") val topics = properties.getProperty("kafka.topics") val groupName = properties.getProperty("group.id") val kuduMaster = properties.getProperty("kudumaster.list") val zklist = properties.getProperty("zookeeper.list") println("kafka.brokers:" + brokers) println("kafka.topics:" + topics) println("kudu.master:" + kuduMaster) println("kafka.brokers:" + brokers) println("kafka.topics:" + topics) if(StringUtils.isEmpty(brokers)|| StringUtils.isEmpty(topics) || StringUtils.isEmpty(groupName)) { System.exit(0) } val topicsSet = topics.split(",").toSet val spark = SparkSession.builder() .appName("Kafka2Spark2Kudu-Offset") .config(new SparkConf()) .getOrCreate() //引入隐式 import spark.implicits._ val kuduContext = new KuduContext(kuduMaster, spark.sparkContext) KafkaOffsetByKudu.init_kudu_tb(kuduContext, offsetTableName) val fromOffsetMap = KafkaOffsetByKudu.getLastCommittedOffsets(topicsSet, groupName, offsetTableName, kuduContext, spark, zklist) val ssc = new StreamingContext(spark.sparkContext, Seconds(5)) //设置Spark时间窗口,每5s处理一次 val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers , "key.deserializer" -> classOf[StringDeserializer] , "value.deserializer" -> classOf[StringDeserializer] , "group.id" -> groupName ) val dStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams, fromOffsetMap)) dStream.foreachRDD(rdd => { //将rdd数据重新封装为Rdd[UserInfo] val newrdd = rdd.map(line => { val jsonObj = JSON.parseFull(line.value()) val map:Map[String,Any] = jsonObj.get.asInstanceOf[Map[String, Any]] new UserInfo( map.get("id").get.asInstanceOf[String], map.get("name").get.asInstanceOf[String], map.get("sex").get.asInstanceOf[String], map.get("city").get.asInstanceOf[String], map.get("occupation").get.asInstanceOf[String], map.get("mobile_phone_num").get.asInstanceOf[String], map.get("fix_phone_num").get.asInstanceOf[String], map.get("bank_name").get.asInstanceOf[String], map.get("address").get.asInstanceOf[String], map.get("marriage").get.asInstanceOf[String], map.get("child_num").get.asInstanceOf[String] ) }) //将RDD转换为DataFrame val userinfoDF = spark.sqlContext.createDataFrame(newrdd) kuduContext.upsertRows(userinfoDF, "user_info") //数据更新成功后,更新Topic Offset数据到Kudu表中 KafkaOffsetByKudu.saveOffset(kuduContext, spark, rdd, groupName, offsetTableName) }) ssc.start() ssc.awaitTermination() } }
(可左右滑动)
4.创建KafkaOffsetByKudu.scala文件,内容如下
package com.cloudera.utils import kafka.utils.ZkUtils import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord} import org.apache.kafka.common.TopicPartition import org.apache.kudu.client.CreateTableOptions import org.apache.kudu.spark.kudu.KuduContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types._ import util.control.Breaks._ import scala.collection.JavaConverters._ /** * package: com.cloudera.utils * describe: 使用Kudu管理Kafka的Offset * creat_user: Fayson * email: htechinfo@163.com * creat_date: 2018/8/6 * creat_time: 下午5:46 * 公众号:Hadoop实操 */ object KafkaOffsetByKudu { /** * 用于存储Offset的建表Schema */ val KafkOffset = StructType( // col name type nullable? StructField("topic", StringType , false) :: StructField("group" , StringType, false ) :: StructField("partition" , IntegerType, false ) :: StructField("offset_id" , LongType, false ) :: StructField("timestamp", LongType, false) :: Nil ) /** * 定义一个KafkOffset对象 * @param topic * @param group * @param partition * @param offset_id * @param timestamp */ case class KafkOffsetInfo ( topic: String, group: String, partition: Int, offset_id: Long, timestamp: Long ) /** * 创建一个用于存放Topic Offset信息的Kudu表 * @param kuduContext * @param tableName */ def init_kudu_tb(kuduContext: KuduContext, tableName: String): Unit = { //判断表是否存在 if(!kuduContext.tableExists(tableName)) { println("create Kudu Table :" + tableName) val createTableOptions = new CreateTableOptions() createTableOptions.addHashPartitions(List("topic","group", "partition").asJava, 8).setNumReplicas(3) kuduContext.createTable(tableName, KafkOffset, Seq("topic","group", "partition"), createTableOptions) } } /** * 获取最后记录的Kafka Topic Offset * @param topics * @param group * @param tableName * @param kuduContext * @param spark * @return */ def getLastCommittedOffsets(topics : Set[String], group: String, tableName: String, kuduContext: KuduContext, spark:SparkSession, zklist: String) : Map[TopicPartition,Long] = { var fromOffsetMap = Map[TopicPartition,Long]() //通过Zookeeper获取topics的Partition //如果kudu表中存储的数据为空,则需要将所有的partition设置为从0开始 val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zklist, 30000, 3000) val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2,false) //通过Zookeeper获取相应Topic的Partition及Offset val zKNumberOfPartitionsForTopic = zkUtils.getPartitionsForTopics(topics.toSeq) import spark.implicits._ val rdd_offset = kuduContext.kuduRDD(spark.sparkContext, tableName, Seq("topic","group", "partition" ,"offset_id")) //用于缓存需要更新的Offset的数据 var list = List[KafkOffsetInfo]() if(rdd_offset.isEmpty()) { //如果查询的Kudu表数据为空则使用ZK获取到的partition,并将所有的partition的offset设置为0 topics.foreach(topic => { zKNumberOfPartitionsForTopic.get(topic).foreach(_.foreach(partition_id => { fromOffsetMap += (new TopicPartition(topic, partition_id) -> 0) list.+:(new KafkOffsetInfo( topic, group, partition_id, 0, System.currentTimeMillis() )) })) }) } else { rdd_offset.map(row => { val tmp_topic = row.getAs[String]("topic") val tmp_group = row.getAs[String]("group") val partition_id = row.getAs[Int]("partition") breakable{ if(!topics.contains(tmp_topic) || !group.equals(tmp_group)) break } zKNumberOfPartitionsForTopic.get(tmp_topic).foreach(_.foreach(tmp_partition_id => { if(tmp_partition_id == partition_id) { fromOffsetMap += (new TopicPartition(tmp_topic, partition_id) -> row.getAs[Long]("offset_id")) } else { fromOffsetMap += (new TopicPartition(tmp_topic, partition_id) -> 0) //将该对象存入Kudu的KafkaOffset表中 list.+:(new KafkOffsetInfo( tmp_topic, tmp_group, tmp_partition_id, 0, System.currentTimeMillis() )) } })) }) } //将相应Topic的Offset信息更新到kudu表,包含新增的 if(!list.isEmpty) { kuduContext.upsertRows(spark.sqlContext.createDataFrame(list), tableName) } fromOffsetMap } /** * 将消费的Kafka Offset保存到Kudu表 * @param kuduContext * @param spark * @param kafka_rdd * @param groupName * @param offsetTableName */ def saveOffset(kuduContext: KuduContext, spark:SparkSession, kafka_rdd:RDD[ConsumerRecord[String, String]], groupName:String, offsetTableName:String):Unit = { val kafkaOffsetDF = spark.sqlContext.createDataFrame( kafka_rdd.map(line => { new KafkaOffsetByKudu.KafkOffsetInfo( line.topic(), groupName, line.partition(), line.offset(), System.currentTimeMillis() ) }) ) kuduContext.upsertRows(kafkaOffsetDF, offsetTableName) } }
(可左右滑动)
5.使用mvn命令编译工程,注意由于是scala工程编译时mvn命令要加scala:compile
mvn clean scala:compile package
(可左右滑动)
6.将编译好的spark2-demo-1.0-SNAPSHOT.jar包上传至服务器
将Spark2应用的配置文件放在conf目录下,内容如下:
0295.properties配置文件内容如下:
4.Kafka Offset测试
初次运行在Kudu表未创建的情况下。
1.在没有运行SparkStreaming的情况下,先向指定的kafka_kudu_topic发送消息
运行两次脚本向Topic中发送1200条消息。
2.启动SparkStreaming作业,查看作业接收到的消息
spark2-submit --class com.cloudera.streaming.nokerberos.KafkaOffsetTest \ --master yarn-client \ --driver-cores 1 \ --driver-memory 2g \ --executor-cores 1 \ --executor-memory 1g \ spark2-demo-1.0-SNAPSHOT.jar
(可左右滑动)
3.启动SparkStreaming作业,查看作业接收到的消息,可以看到Spark处理的消息数量为1200条
4.停掉Spark作业,然后使用脚本再次向Kafka发送600条消息
5.再次启动Spark作业查看,可以看到Spark作业只处理了600条数据
6.通过Hue创建Kudu的表
查看kafka_offset_tb表数据
可以看到kafka_offset_tb表中kafka_kudu_topic_offset_3的Topic每个Partition的offset_id,每个Offset从0开始。
5.总结
1.SparkSteaming默认的checkpoint存储Kafka的Offset信息,因此这里使用自己维护的方式将Kafka的Offset存储到Kudu表中。
2.Kudu表的设计方式以topic、group和partition做为表的Key。
3.在Kafka的Topic增加了Partition后,则需要重启SparkStreaming作业,否则作业无法知道Topic增加了Partition。
提示:代码块部分可以左右滑动查看噢
为天地立心,为生民立命,为往圣继绝学,为万世开太平。
温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。
推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。
原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操
猜你喜欢
- 2024-10-16 10分钟大数据Hadoop基础入门「值得收藏」
- 2024-10-16 0288-如何使用Flume采集Kafka数据写入HBase
- 2024-10-16 0032-如何在CDH启用Kerberos的情况下安装及使用Sentry(二)
- 2024-10-16 使用ClouderaManager的优点、产生背景和应用场景
- 2024-10-16 大数据之Impala入门实战,看这篇就够了
- 2024-10-16 0732-Cloudera Data Center7.0今天正式GA
- 2024-10-16 清华大佬学习经验:Hadoop教程(一) Hadoop入门教程
- 2024-10-16 你了解过Cloudera的整体架构吗?4.89G学习视频带你走进它的世界
- 2024-10-16 大数据Hadoop之——基于Hive的内存型SQL查询引擎Impala
- 2024-10-16 Hive架构及搭建方式(hive的搭建)
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- powershellfor (55)
- messagesource (56)
- aspose.pdf破解版 (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- vue数组concat (56)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)