专业的编程技术博客社区

网站首页 > 博客文章 正文

Spring Data Redis Stream的使用(spring.redis)

baijin 2024-08-21 11:24:17 博客文章 11 ℃ 0 评论

一、背景

Stream类型是 redis5之后新增的类型,在这篇文章中,我们实现使用Spring boot data redis来消费Redis Stream中的数据。实现独立消费和消费组消费。

二、整合步骤

1、引入jar包

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.11.1</version>
  </dependency>
</dependencies>

主要是上方的这个包,其他的不相关的包在此处省略导入。

2、配置RedisTemplate依赖

@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        // 这个地方不可使用 json 序列化,如果使用的是ObjectRecord传输对象时,可能会有问题,会出现一个 java.lang.IllegalArgumentException: Value must not be null! 错误
        redisTemplate.setHashValueSerializer(RedisSerializer.string());
        return redisTemplate;
    }
}

注意:

此处需要注意 setHashValueSerializer 的序列化的方式,具体注意事项后期再说。

3、准备一个实体对象

这个实体对象是需要发送到Stream中的对象。

@Getter
@Setter
@ToString
public class Book {
    private String title;
    private String author;
    public static Book create() {
        com.github.javafaker.Book fakerBook = Faker.instance().book();
        Book book = new Book();
        book.setTitle(fakerBook.title());
        book.setAuthor(fakerBook.author());
        return book;
    }
}

每次调用create方法时,会自动产生一个Book的对象,对象模拟数据是使用javafaker来模拟生成的。

4、编写一个常量类,配置Stream的名称

/**
 * 常量
 */
public class Cosntants {
    public static final String STREAM_KEY_001 = "stream-001";
}

5、编写一个生产者,向Stream中生产数据

1、编写一个生产者,向Stream中产生ObjectRecord类型的数据

/**
 * 消息生产者
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class StreamProducer {
    private final RedisTemplate<String, Object> redisTemplate;
    public void sendRecord(String streamKey) {
        Book book = Book.create();
        log.info("产生一本书的信息:[{}]", book);
        ObjectRecord<String, Book> record = StreamRecords.newRecord()
                .in(streamKey)
                .ofObject(book)
                .withId(RecordId.autoGenerate());
        RecordId recordId = redisTemplate.opsForStream()
                .add(record);
        log.info("返回的record-id:[{}]", recordId);
    }
}

2、每隔5s就生产一个数据到Stream中

/**
 * 周期性的向流中产生消息
 */
@Component
@AllArgsConstructor
public class CycleGeneratorStreamMessageRunner implements ApplicationRunner {
    private final StreamProducer streamProducer;
    @Override
    public void run(ApplicationArguments args) {
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(() -> streamProducer.sendRecord(STREAM_KEY_001),
                        0, 5, TimeUnit.SECONDS);
    }
}

三、独立消费

独立消费指的是脱离消费组的直接消费Stream中的消息,是使用 xread方法读取流中的数据,流中的数据在读取后并不会被删除,还是存在的。如果多个程序同时使用xread读取,都是可以读取到消息的。

1、实现从头开始消费-xread实现

此处实现的是从Stream的第一个消息开始消费

package com.huan.study.redis.stream.consumer.xread;

import com.huan.study.redis.constan.Cosntants;
import com.huan.study.redis.entity.Book;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * 脱离消费组-直接消费Stream中的数据,可以获取到Stream中所有的消息
 */
@Component
@Slf4j
public class XreadNonBlockConsumer01 implements InitializingBean, DisposableBean {

    private ThreadPoolExecutor threadPoolExecutor;

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    private volatile boolean stop = false;

    @Override
    public void afterPropertiesSet() {
        // 初始化线程池
        threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("xread-nonblock-01");
            return thread;
        });

        StreamReadOptions streamReadOptions = StreamReadOptions.empty()
                // 如果没有数据,则阻塞1s 阻塞时间需要小于`spring.redis.timeout`配置的时间
                .block(Duration.ofMillis(1000))
                // 一直阻塞直到获取数据,可能会报超时异常
                // .block(Duration.ofMillis(0))
                // 1次获取10个数据
                .count(10);

        StringBuilder readOffset = new StringBuilder("0-0");
        threadPoolExecutor.execute(() -> {
            while (!stop) {
                // 使用xread读取数据时,需要记录下最后一次读取到offset,然后当作下次读取的offset,否则读取出来的数据会有问题
                List<ObjectRecord<String, Book>> objectRecords = redisTemplate.opsForStream()
                        .read(Book.class, streamReadOptions, StreamOffset.create(Cosntants.STREAM_KEY_001, ReadOffset.from(readOffset.toString())));

                if (CollectionUtils.isEmpty(objectRecords)) {
                    log.warn("没有获取到数据");
                    continue;
                }

                for (ObjectRecord<String, Book> objectRecord : objectRecords) {
                    log.info("获取到的数据信息 id:[{}] book:[{}]", objectRecord.getId(), objectRecord.getValue());
                    readOffset.setLength(0);
                    readOffset.append(objectRecord.getId());
                }
            }
        });
    }

    @Override

    public void destroy() throws Exception {
        stop = true;
        threadPoolExecutor.shutdown();
        threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
    }
}

注意:

下一次读取数据时,offset 是上一次最后获取到的id的值,否则可能会出现漏数据。

2、StreamMessageListenerContainer实现独立消费

见下方的消费组消费的代码

四、消费组消费

1、实现StreamListener接口

实现这个接口的目的是为了,消费Stream中的数据。需要注意在注册时使用的是streamMessageListenerContainer.receiveAutoAck()还是streamMessageListenerContainer.receive()方法,如果是第二个,则需要手动ack,手动ack的代码:redisTemplate.opsForStream().acknowledge("key","group","recordId");

/**
 * 通过监听器异步消费
 *
 * @author huan.fu 2021/11/10 - 下午5:51
 */

@Slf4j
@Getter
@Setter
public class AsyncConsumeStreamListener implements StreamListener<String, ObjectRecord<String, Book>> {

    /**
     * 消费者类型:独立消费、消费组消费
     */
    private String consumerType;

    /**
     * 消费组
     */
    private String group;

    /**
     * 消费组中的某个消费者
     */

    private String consumerName;

    public AsyncConsumeStreamListener(String consumerType, String group, String consumerName) {
        this.consumerType = consumerType;
        this.group = group;
        this.consumerName = consumerName;
    }

    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public void onMessage(ObjectRecord<String, Book> message) {
        String stream = message.getStream();
        RecordId id = message.getId();
        Book value = message.getValue();
        if (StringUtils.isBlank(group)) {
            log.info("[{}]: 接收到一个消息 stream:[{}],id:[{}],value:[{}]", consumerType, stream, id, value);
        } else {
            log.info("[{}] group:[{}] consumerName:[{}] 接收到一个消息 stream:[{}],id:[{}],value:[{}]", consumerType,
                    group, consumerName, stream, id, value);
        }

        // 当是消费组消费时,如果不是自动ack,则需要在这个地方手动ack
        // redisTemplate.opsForStream()
        //         .acknowledge("key","group","recordId");
    
}

2、获取消费或消费消息过程中错误的处理

/**
 * StreamPollTask 获取消息或对应的listener消费消息过程中发生了异常
 *
 * @author huan.fu 2021/11/11 - 下午3:44
 */

@Slf4j
public class CustomErrorHandler implements ErrorHandler {

    @Override
    public void handleError(Throwable t) {
        log.error("发生了异常", t);
    }
}

3、消费组配置

/**
 * redis stream 消费组配置
 *
 * @author huan.fu 2021/11/11 - 下午12:22
 */

@Configuration
public class RedisStreamConfiguration {

    @Resource
    private RedisConnectionFactory redisConnectionFactory;

    /**
     * 可以同时支持 独立消费 和 消费者组 消费
     * <p>
     * 可以支持动态的 增加和删除 消费者
     * <p>
     * 消费组需要预先创建出来
     *
     * @return StreamMessageListenerContainer
     */

    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer() {

        AtomicInteger index = new AtomicInteger(1);

        int processors = Runtime.getRuntime().availableProcessors();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Book>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // 一次最多获取多少条消息
                        .batchSize(10)
                        // 运行 Stream 的 poll task
                        .executor(executor)
                        // 可以理解为 Stream Key 的序列化方式
                        .keySerializer(RedisSerializer.string())
                        // 可以理解为 Stream 后方的字段的 key 的序列化方式
                        .hashKeySerializer(RedisSerializer.string())
                        // 可以理解为 Stream 后方的字段的 value 的序列化方式
                        .hashValueSerializer(RedisSerializer.string())
                        // Stream 中没有消息时,阻塞多长时间,需要比 `spring.redis.timeout` 的时间小
                        .pollTimeout(Duration.ofSeconds(1))
                        // ObjectRecord 时,将 对象的 filed 和 value 转换成一个 Map 比如:将Book对象转换成map
                        .objectMapper(new ObjectHashMapper())
                        // 获取消息的过程或获取到消息给具体的消息者处理的过程中,发生了异常的处理
                        .errorHandler(new CustomErrorHandler())
                        // 将发送到Stream中的Record转换成ObjectRecord,转换成具体的类型是这个地方指定的类型
                        .targetType(Book.class)
                        .build();

        StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // 独立消费
        String streamKey = Cosntants.STREAM_KEY_001;
        streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey),
                new AsyncConsumeStreamListener("独立消费", null, null));
        // 消费组A,不自动ack
        // 从消费组中没有分配给消费者的消息开始消费
        streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-a"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("消费组消费", "group-a", "consumer-a"));
        // 从消费组中没有分配给消费者的消息开始消费
        streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-b"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("消费组消费A", "group-a", "consumer-b"));

        // 消费组B,自动ack
        streamMessageListenerContainer.receiveAutoAck(Consumer.from("group-b", "consumer-a"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("消费组消费B", "group-b", "consumer-bb"));
        // 如果需要对某个消费者进行个性化配置在调用register方法的时候传递`StreamReadRequest`对象

        return streamMessageListenerContainer;
    }
}

注意:

提前建立好消费组


127.0.0.1:6379> xgroup create stream-001 group-a $

OK

127.0.0.1:6379> xgroup create stream-001 group-b $

OK

1、独有消费配置


 streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey), new AsyncConsumeStreamListener("独立消费", null, null));

不传递Consumer即可。

2、配置消费组-不自动ack消息


streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-b"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("消费组消费A", "group-a", "consumer-b"));

1、需要注意ReadOffset的取值。

2、需要注意group需要提前创建好。

3、配置消费组-自动ack消息


streamMessageListenerContainer.receiveAutoAck()

五、序列化策略




六、ReadOffset策略

消费消息时的Read Offset 策略

七、注意事项

1、读取消息的超时时间

当我们使用 StreamReadOptions.empty().block(Duration.ofMillis(1000)) 配置阻塞时间时,这个配置的阻塞时间必须要比 spring.redis.timeout配置的时间短,否则可能会报超时异常。

2、ObjectRecord反序列化错误

如果我们在读取消息时发生如下异常,那么排查思路如下:


java.lang.IllegalArgumentException: Value must not be null!
    at org.springframework.util.Assert.notNull(Assert.java:201)
    at org.springframework.data.redis.connection.stream.Record.of(Record.java:81)
    at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147)
    at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:138)
    at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecords(StreamObjectMapper.java:164)
    at org.springframework.data.redis.core.StreamOperations.map(StreamOperations.java:594)
    at org.springframework.data.redis.core.StreamOperations.read(StreamOperations.java:413)
    at com.huan.study.redis.stream.consumer.xread.XreadNonBlockConsumer02.lambda$afterPropertiesSet$1(XreadNonBlockConsumer02.java:61)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

1、检测 RedisTemplate的HashValueSerializer的序列化方式,最好不要使用json可以使用RedisSerializer.string()。

2、检查redisTemplate.opsForStream()中配置的HashMapper,默认是ObjectHashMapper这个是把对象字段和值序列化成byte[]格式。

提供一个可用的配置


# RedisTemplate的hash value 使用string类型的序列化方式
redisTemplate.setHashValueSerializer(RedisSerializer.string());

# 这个方法opsForStream()里面使用默认的ObjectHashMapper
redisTemplate.opsForStream()

关于上面的这个错误,我在Spring Data Redis的官方仓库提了一个 issue,得到官方的回复是,这是一个bug,后期会修复的。

3、使用xread顺序读取数据漏数据

如果我们使用xread读取数据发现有些数据漏掉了,这个时候我们需要检查第二次读取时配置的StreamOffset是否合法,这个值需要是上一次读取的最后一个值。

举例说明:

1、SteamOffset传递的是 $ 表示读取最新的一个数据。

2、处理上一步读取到的数据,此时另外的生产者又向Stream中插入了几个数据,这个时候读取到的数据还没有处理完。

3、再次读取Stream中的数据,还是传递的$,那么表示还是读取最新的数据。那么在上一步流入到Stream中的数据,这个消费者就读取不到了,因为它读取的是最新的数据。

4、StreamMessageListenerContainer的使用

1、可以动态的添加和删除消费者

2、可以进行消费组消费

3、可以直接独立消费

4、如果传输ObjectRecord的时候,需要注意一下序列化方式。参考上面的代码。

八、完整代码

https://gitee.com/huan1993/spring-cloud-parent/tree/master/redis/redis-stream

九、参考文档

1、https://docs.spring.io/spring-data/redis/docs/2.5.5/reference/html/#redis.streams

2、https://github.com/spring-projects/spring-data-redis/issues/2198

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表