基于MySQL 8的特性实现拉模型的消息队列

概述

mysql 8 新特性

MySQL 8增加了 Locking Read Concurrency with NOWAIT and SKIP LOCKED 的实现
官方文档:mysql8 15.7.2.4 Locking Reads

消息队列

  普通的消息队列就是由生产者与消费者组成,多数场景下数据都会先落库然后将消息发送到消息队列,等待消费者进行消费。消费者在消费的时候往往需要保证执行的成功性,如果失败则需要进行n次重试。在某些场景下还需要事务消息的存在,需要保证消息与数据库写入同时成功。
  在一些小型项目内完全没有必要引入体量巨大的消息队列中间件,但多数项目都有数据库的依赖。所以直接使用mysql进行消息队列实现可以简化某些项目的体量。

实现

原理

使用单表记录存放消息
结构:

字段 类型 用途
id int(11) 主键
topic varchar(64) 主题
key varchar(64) 消息唯一性
status int(11) 消费状态
retry int(11) 剩余重试次数
start_time bigint(20) 任务最小开始执行的时间
exception varchar(11) 错误内容
msg varchar(256) 消息主体

建立唯一索引1(topic, key, retry),建立普通索引2(topic, status, start_time)
状态分为:0(未消费),1(消费失败),2(消费成功)

处理逻辑

查找可以消费的消息

先开启事务,查找可以消费的消息,指定Topic,指定状态是待消费,开始时间小于等于当前时间

1
SELECT * FROM task WHERE topic='topic1' status=0 AND start_time<=1608212942 LIMIT 1 FOR UPDATE SKIP LOCKED;

消费成功

更新状态为3,表示消费成功

1
UPDATE task SET status=3 WHERE id=1000;

消费失败

更新状态为2,表示当前消息消费失败
检查当前retry是否大于0,若大于0,则插入一条新的消息,状态为0,retry-1

1
UPDATE task SET status=2 WHERE id=1000;

异常退出

若消费者在消费期间异常退出,则事务会被rollback,那么最多当前消费行为被当做无效行为,但可以保证会被下一个消费者重试,但无法保证重试次数是有效的。但异常退出本身就是少数情况,正常情况下的代码逻辑异常都会被捕获,不会导致该情况发生。

注意点

由于是通过事务来进行的行锁,所以在逻辑调用的时候要防止内部逻辑中有数据库操作导致的关联rollback,最好开启多个session进行数据库操作

性能问题

消息拉取的SQL会走索引2,所以在mysql8的innodb引擎中,会命中行锁,所以不会产生并发问题,但由于整个消费期间事务不会被释放,所以会有大量的事务和链接保持,会数据库有一定的性能影响,具体影响有多少未进行测试。这个设计也仅仅是一个轻量级的拉模式消息队列实现,至少保证了事务一定会被消费成功。
后续有时间会进行简单的性能测试。

额外

基于这个表还可以实现消费结果的存放