Redis 除了做数据缓存,做 NoSQL 数据库,也可以当做轻量级消息队列使用,并且提供了基于 List 实现的、基于 Pub/Sub 机制的订阅/发布模式、基于 sorted set 的实现和基于 Stream 类型的实现几种实现方式。其中 List 实现的分非阻塞和阻塞方式,Stream 则是 Redis 5 加入的消息队列。

之前代码已经写过了,只是工程整合搞得比较复杂,所以这里算是写份注释文档。

关联代码地址lin/lin-redis at master · zgshen/lin

使用 List 类型实现

List 就是列表数据结构,用来做消息队列这是最简单直观的了,也是典型的点对点消息模型,先看下 Redis 列表提供的操作命令。

push 压入:

  • LPUSH key value1 [value2 ...] 将一个或多个值插入到列表头部
  • RPUSH key value1 [value2 ...] 将一个或多个值插入到列表尾部

pop 弹出:

  • LPOP key 移除并获取列表的第一个元素
  • RPOP key 移除并获取列表的最后一个元素

阻塞弹出;:

  • BLPOP key1 [key2 ...] timeout 移除并获取列表的第一个元素,若列表为空则阻塞等待
  • BRPOP key1 [key2 ...] timeout 移除并获取列表的最后一个元素,若列表为空则阻塞等待

压入和弹出前面的 L 和 R 表示从队列左端和右端压入和弹出,阻塞弹出的 B 代表就是 blocking 的意思。

使用队列一般遵循先进先出,所以要么左近右出,要么右近左出,框架提供的 RedisTemplate 封装了 Redis 的操作命令,push 和 pop 直接调用就行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Autowired
private RedisTemplate redisTemplate;

//左进
public Long push(String... params) {
Long aLong = redisTemplate.opsForList().leftPushAll(LIST_PUSH_POP_MSG, params);
return aLong;
}

//右出,轮询检测
public String pop() {
String str = redisTemplate.opsForList().rightPop(LIST_PUSH_POP_MSG).toString();
return str;
}

再看下堵塞弹出的异步操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void blockingConsume() {
List<Object> obj = redisTemplate.executePipelined(new RedisCallback<Object>() {
// @Nullable
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
//队列没有元素会阻塞操作,直到队列获取新的元素或超时
//return connection.bRPop(PUB_SUB_TIME_OUT, LIST_PUSH_POP_MSG.getBytes());
return connection.bLPop(PUB_SUB_TIME_OUT, LIST_PUSH_POP_MSG.getBytes());
}
}, new StringRedisSerializer());
for (Object str : obj) {
log.info("blockingConsume : {}", str);
}
}

此外 Redis 还有两个命令 RPOPLPUSH、BRPOPLPUSH(阻塞)可以从一队列获取队列并且写入另一个队列,可以用于简单保证消息可靠性,业务成功处理后再移除另一队列的消息,如果业务处理失败又可以从另一队列恢复。

1
2
3
4
5
6
7
8
9
10
11
12
13
public String rightPopLeftPush() {
String str;
try {
str = redisTemplate.opsForList().rightPopAndLeftPush(LIST_PUSH_POP_MSG, LIST_PUSH_POP_BACKUP_MSG).toString();
// 其他业务,处理失败了还能在 LIST_PUSH_POP_BACKUP_MSG 队列找到备份
} catch (Exception e) {
log.error("业务异常:{}", e.getMessage());
throw new RuntimeException(e);
}
// 先进先出业务完毕出栈,让异常的消息留在队列里
redisTemplate.opsForList().leftPop(LIST_PUSH_POP_BACKUP_MSG);
return str;
}

使用 Sorted Set 实现

Sorted Set 是有序集合,元素唯一不可重复,元素按照 score 值升序排列,支持范围操作,所以适合做简单的延迟消息队列。

添加元素:

  • ZADD key score member [score member ...] 向有序集合中加入一个或多个成员,或更新已存在成员的分数

获取元素:

  • ZRANGE key start stop [WITHSCORES] 按位置范围遍历集合,可附加分数
  • ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT...] 按分数范围遍历集合

以下是简单的生产和消费程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* @param businessId 业务 id(如订单 id 等)
* @param expiredTime 延时时间,单位秒
*/
public void produce(String businessId, long expiredTime) {
redisTemplate.opsForZSet().add(MsgConstant.SORTED_SET_MSG, businessId, System.currentTimeMillis() + expiredTime * 1000);
}

/**
* 简单的消费程序
* 死循环,仅做测试
*/
public void consume() {
while (true) {
//(K key, double min, double max, long offset, long count)
//键,要取区间score最小值,要取区间score最大值,偏移(从哪个位置开始),数量
Set<String> set = redisTemplate.opsForZSet().rangeByScore(MsgConstant.SORTED_SET_MSG, 0, System.currentTimeMillis(), 0, 1);
if (set == null || set.isEmpty()) continue;
log.info(set.toString());
String next = set.iterator().next();
Long remove = redisTemplate.opsForZSet().remove(MsgConstant.SORTED_SET_MSG, next);
if (remove > 0) log.info("{} remove success.", next);
}
}

使用 Pub/Sub 订阅发布模式

发布者把消息发到某个频道,订阅改频道的所有消费者都会收到消息,即消息多播,并且订阅支持模糊匹配频道。这种方式就是常规的消费者-消费者模型,不过与典型的 MQ 还是有区别,Pub/Sub 订阅发布更像是个广播,不能并发消费,不支持持久化,也没有 ACK 确认。

发布命令:

  • PUBLISH channel message : 将消息 message 发布到指定的频道 channel

订阅命令:

  • SUBSCRIBE channel [channel ...] : 订阅一个或多个频道
  • PSUBSCRIBE pattern [pattern ...] : 订阅一个或多个模式,用于模糊匹配频道

Spring 工程的配置类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter adapter, MessageListenerAdapter adapter1) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//主题的监听,adapter 和 adapter1 对应下面两个 bean 实例,有多少
container.addMessageListener(adapter, new PatternTopic(PUB_SUB_MSG));//普通的订阅者
container.addMessageListener(adapter1, new PatternTopic(PUB_SUB_MSG_FUZZY));//模糊匹配的订阅者
return container;
}

/**
* 多个订阅
* @param message
* @return
*/
@Bean
public MessageListenerAdapter adapter(MessageSubscribe message){
// MessageSubscribe 的 onMessage 监听获取订阅数据
return new MessageListenerAdapter(message, "onMessage");
}

@Bean
public MessageListenerAdapter adapter1(MessageSubscribe1 message){
// MessageSubscribe1 的 onMessage
return new MessageListenerAdapter(message, "onMessage");
}

订阅者类:

1
2
3
4
5
6
7
8
9
10
@Slf4j
@Component
public class MessageSubscribe implements MessageListener {

@Override
public void onMessage(Message message, byte[] bytes) {
log.info("sub, topic name: {}, message: {}", new String(bytes), new String(message.getBody()));
}

}

发布者类:

1
2
3
4
5
6
7
8
9
10
11
@Service
public class MessagePublish {

@Autowired
StringRedisTemplate redisTemplate;

public void publish(String channel, String msg) {
redisTemplate.convertAndSend(channel, msg);
}

}

使用 Stream

Redis 5.0 新增了 Stream 的数据结构,与 Pub/Sub 订阅发布模式相比,Redis Stream 提供了消息的持久化和主备复制功能。

添加消息:

1
XADD key ID field value [field value ...] 

其中ID,消息id,可使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性

读取消息:

1
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

milliseconds 设置堵塞秒数,没设置就是非阻塞模式。

创建消费者组:

1
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

key 队列名,不存在就创建;groupname 组名;$ 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

1
2
3
4
5
# 从头开始消费
XGROUP CREATE mystream consumer-group-name 0-0

# 从尾部开始消费
XGROUP CREATE mystream consumer-group-name $

读取消费者组中的消息:

1
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

group 消费组名;consumer 消费者名;count 读取数量;milliseconds 阻塞毫秒数;key 队列名;ID 消息 ID。

例子:

1
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >

看下在 Spring Boot 中的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* 启动初始化配置,注册 listener
*/
@Slf4j
@Component
public class RedisStreamRunner implements ApplicationRunner, DisposableBean {

private StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;
private final ThreadPoolTaskExecutor executor;
private final RedisConnectionFactory redisConnectionFactory;
private final StringRedisTemplate stringRedisTemplate;

public RedisStreamRunner(ThreadPoolTaskExecutor executor, RedisConnectionFactory redisConnectionFactory, StringRedisTemplate stringRedisTemplate) {
this.executor = executor;
this.redisConnectionFactory = redisConnectionFactory;
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
public void run(ApplicationArguments args) throws Exception {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.batchSize(10)// 一次性最多拉取多少条消息
.executor(executor)// 执行消息轮询的执行器
.pollTimeout(Duration.ZERO)// 超时时间,设置为0,表示不超时(超时后会抛出异常)
.build();

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
StreamMessageListenerContainer.create(redisConnectionFactory, options);

initStreamAndGroup(stringRedisTemplate.opsForStream(), STREAM_KEY, STREAM_GROUP);
// receive 方法内部 autoAcknowledge 为 false,需要手动 ack 的
container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER), //消费组和消费者,这里只演示一个消费者
StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),//读取 id 大于消费者组最后消费的所有新到达元素
new TestStreamListener(stringRedisTemplate));//消费消息,业务处理

this.container = container;
this.container.start();
}

/**
* 消费组,不存在则创建
*/
private void initStreamAndGroup(StreamOperations<String, ?, ?> ops, String streamKey, String group) {
String status = "OK";
try {
StreamInfo.XInfoGroups groups = ops.groups(streamKey);
if (groups.stream().noneMatch(xInfoGroup -> group.equals(xInfoGroup.groupName()))) {
status = ops.createGroup(streamKey, group);
}
} catch (Exception exception) {
RecordId initialRecord = ops.add(ObjectRecord.create(streamKey, "Initial Record"));
Assert.notNull(initialRecord, "Cannot initialize stream with key '" + streamKey + "'");
status = ops.createGroup(streamKey, ReadOffset.from(initialRecord), group);
} finally {
Assert.isTrue("OK".equals(status), "Cannot create group with name '" + group + "'");
}
}

@Override
public void destroy() {
this.container.stop();
}

}

TestStreamListener 处理消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j
public class TestStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

StringRedisTemplate redisTemplate;

public TestStreamListener(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}

@Override
public void onMessage(MapRecord<String, String, String> message) {

log.info("MessageId: " + message.getId());
log.info("Stream: " + message.getStream());
log.info("Body: " + message.getValue());
//记得手动确认
redisTemplate.opsForStream().acknowledge(STREAM_GROUP, message);
}
}

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
@Service
public class TestStreamProducer {

@Autowired
StringRedisTemplate redisTemplate;

//发送流信息
public void add(String streamKey, String msg) {
redisTemplate.opsForStream().add(Record.of(msg).withStreamKey(streamKey));
}

}

参考