专业的编程技术博客社区

网站首页 > 博客文章 正文

面试题之kafka:如何提高整体的消费能力

baijin 2024-12-11 10:29:24 博客文章 6 ℃ 0 评论

生产者生产消息的速度大于消费者的消费速度如何解决?kafka有消息保留机制,有些消息在消息被消费之前就有可能被清理掉了,从而导致消息的丢失。

当生产者发送消息的速度大于消费者消费消息的速度时,可以采用多线程的方式实现消息的消费,提高整体的消费能力。多线程的实现方式有多种:

第一种也是最常见的方式:==线程封闭==,即为每个线程实例化一个kafkaconsumer对象,这种线程称为消费线程。每个消费线程可以消费一个或多个分区的消息。这种实现方式的并发度受限于分区的实际个数,当==消费线程的个数==大于分区时,就会有部分消费线程一直处于空闲状态。==优点==是可以顺序地消费各个分区的消息。==缺点==也很明显,每个消费线程都要维护一个独立的tcp连接,如果分区数和消费线程的值都比较大,那么会造成不小的系统开销。

第二种与第一种对应,==多个消费线程同时消费一个分区==,这个通过assign(),seek()方法实现。==优点==是可以打破原有的消费线程个数不能超过分区数的限制,进一步提高消费能力。但这种方式对于==位移提交和顺序控制==的处理就会变得异常复杂,实际使用极少。

第三种:==利用多线程提高消息处理的能力==。因为如果消息处理得非常快,那么poll()拉取的频次也会更高,进而提高整体消费的性能。一般而言,poll()拉取消息的速度是相当快的,而整体消费的瓶颈也正在消息的逻辑处理上。所以将处理消息模块改为多线程的实现方式,能带动整体消费性能的提升。

  • 注意kafkaConsumerThread(消费线程)可以横向扩展来提高整体的消费能力。消费线程中线程池(ThreadPoolExecutor)的最后一个参数拒绝策略设置为callerRunsPolicy,可以有效的防止==消费能力不及==poll()拉取能力时导致的异常。
  • 缺点就是对于==消息的顺序处理==比较困难。这里引入一个共享的offsets来参与提交(Map\<TopicPatition, OffsetAndMetadata\> offsets) ,每一个消息处理完之后都会把位移保存到共享变量offsets中,kafkaConsumerThread在每一次poll()方法之后都读取Offsets中的内容并进行位称提交。
  • 写入offsets时问题:==如何解决并发问题?==注意在实现过程中需要对offsets读写需要进行加锁处理,防止出现并发问题。
for(TopicPartition tp : records.partitions()) {
      List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
      long lastConsumedOffset = tpRecords.get(tpRecords.size() - 1).offset();
      sychronized(offsets) {
        if(!offsets.containsKey(tp)) {
          offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
        } else {
        	long position = offsets.get(tp).offset();
          if(position < lastConsumeOffset - 1) {
            offsets.put(tp, new OffsetMetaData(lastConsumedOffset + 1)) {     
          }
        }
      }
    }
  • 如何解决位移覆盖的问题?
    synchronized(offsets) {
      if(!offsets.isEmpty()) {
        kafkaConsumer.commitAndSync(offsets);
        offsets.clear();
      }
    }


  • 这种移位提交的方式会有数据丢失的风险,什么情况下产生,如何解决?(扩展 滑动窗口解决方案//---todo)


本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表