水平扩展、高可用、海量数据堆积、单机TPS过10万、毫秒级投递
并发消息、串行消息、广播消息、延迟消息、事务消费、失败重试、超时控制
XXL-MQ是一款轻量级分布式消息队列,拥有 “水平扩展、高可用、海量数据堆积、单机TPS过10万、毫秒级投递” 等特性,
支持 “并发消息、串行消息、广播消息、延迟消息、事务消费、失败重试、超时控制” 等消息特性。现已开放源代码,开箱即用。
于2015年中,我在github上创建XXL-MQ项目仓库并提交第一个commit,随之进行系统结构设计,UI选型,交互设计……
至今,XXL-MQ已接入多家公司的线上产品线,截止2016-09-18为止,XXL-MQ已接入的公司包括不限于:
- 1、农信互联
- ……
更多接入的公司,欢迎在 登记地址 登记,登记仅仅为了产品推广。
欢迎大家的关注和使用,XXL-MQ也将拥抱变化,持续发展。
目前流行的ActiveMQ、RabbitMQ和ZeroMQ等消息队列的软件中,大多为了实现AMQP,STOMP,XMPP之类的协议,变得极其重量级(如新版本Activemq建议分配内存达1G+),但在很多Web应用中的实际情况是:我们只是想找到一个缓解高并发请求的解决方案,一个轻量级的消息队列实现方式才是我们真正需要的。
源码仓库地址 | Release Download |
---|---|
https://github.com/xuxueli/xxl-mq | Download |
https://gitee.com/xuxueli0323/xxl-mq | Download |
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-mq-client</artifactId>
<version>{最新Release版本}</version>
</dependency>
请下载项目源码并解压,获取 “消息中心数据库初始化SQL脚本” 并执行即可
“消息中心数据库初始化SQL脚本” 位置为:
/xxl-mq/doc/db/xxl-mq-mysql.sql
消息中心支持集群部署,集群情况下各节点务必连接同一个mysql实例;
注意:消息中心数据库,原生兼容支持 “MySQL、TIDB” 两种存储方式,前者支持千万级消息堆积,后者支持百亿级别消息堆积(TIDB理论上无上限);
可视情况选择使用,当选择TIDB时,仅需要修改消息中心数据库连接jdbc地址配置即可,其他部分如SQL和驱动兼容MySQL和TIDB使用,不需修改。
解压源码,按照maven格式将源码导入IDE, 使用maven进行编译即可,源码结构如下:
- /xxl-mq-admin :消息中心,提供消息Broker、服务注册、消息在线管理功能;
- /xxl-mq-client :客户端核心依赖, 提供API开发Producer和Consumer;
- /xxl-mq-samples :接入项目参考示例, 可自行参考学习并使用;
- /xxl-mq-samples-frameless :无框架示例项目,不依赖第三方框架,只需main方法即可启动运行;
- /xxl-mq-samples-springboot :springboot版本示例项目;
消息中心配置文件地址:
/xxl-mq/xxl-mq-admin/src/main/resources/application.properties
消息中心配置内容说明:
### 数据库配置
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-mq?Unicode=true&characterEncoding=UTF-8
### 告警邮箱发送方配置
spring.mail.username=xxx@qq.com
spring.mail.password=xxx
### 注册心跳时间
xxl.mq.registry.beattime=10
### 注册信息磁盘存储目录,务必拥有读写权限;
xxl.mq.registry.data.filepath=/data/applogs/xxl-mq/registrydata
### 消息中心Broker服务RPC通讯地址,为空则自动获取
xxl-mq.rpc.remoting.ip=
### 消息中心Broker服务RPC通讯端口
xxl-mq.rpc.remoting.port=7080
### 日志保存天数,超过该阈值的成功消息将会被自动清理;大于等于3时生效
xxl.mq.log.logretentiondays=3
### 登陆信息配置
xxl.mq.login.username=admin
xxl.mq.login.password=123456
如果已经正确进行上述配置,可将项目编译打包部署。
消息中心访问地址:http://localhost:8080/xxl-mq-admin (该地址接入方项目将会使用到,作为注册地址),登录后运行界面如下图所示
至此“消息中心”项目已经部署成功。
消息中心支持集群部署,提升消息系统容灾和可用性。
消息中心集群部署时,几点要求和建议:
// Docker地址:https://hub.docker.com/r/xuxueli/xxl-mq-admin/
docker pull xuxueli/xxl-mq-admin
docker run -p 8080:8080 -v /tmp:/data/applogs --name xxl-mq-admin -d xuxueli/xxl-mq-admin
/**
* 如需自定义 mysql 等配置,可通过 "PARAMS" 指定,参数格式 RAMS="--key=value --key2=value2" ;
* 配置项参考文件:/xxl-mq/xxl-mq-admin/src/main/resources/application.properties
*/
docker run -e PARAMS="--spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-mq?Unicode=true&characterEncoding=UTF-8" -p 8080:8080 -p 7080 -v /tmp:/data/applogs --name xxl-mq-admin -d xuxueli/xxl-mq-admin
接入XXL-MQ项目:"xxl-mq-samples-springboot" (提供多种版本示例项目供参考选择,现以springboot版本为例讲解)
作用:生产消息、消费消息;可直接部署,也可以将集成到现有业务项目中。
确认pom文件中引入了 “xxl-mq-client” 的maven依赖;
消息接入方配置,配置文件地址:
/xxl-mq/xxl-mq-samples/xxl-mq-samples-springboot/src/main/resources/application.properties
消息接入方配置,配置内容说明:
# 消息中心跟地址;支持配置多个,建议域名方式配置;
xxl.mq.admin.address=http://localhost:8080/xxl-mq-admin
@Bean
public XxlMqSpringClientFactory getXxlMqConsumer(){
XxlMqSpringClientFactory xxlMqSpringClientFactory = new XxlMqSpringClientFactory();
xxlMqSpringClientFactory.setAdminAddress(adminAddress);
return xxlMqSpringClientFactory;
}
如果已经正确进行上述配置,可将项目编译打包部署。
springboot版本示例项目,访问地址:http://localhost:8081/
至此“消息接入方”示例项目已经部署结束。
消息接入方支持集群部署,提升消息系统可用性,同时提升消息处理能力。
消息接入方集群部署时,要求和建议:
/**
* 生产消息:并行消息
*/
XxlMqProducer.produce(new XxlMqMessage(topic, data));
/**
* 生产消息:串行消费( ShardingId 保持一致即可;如秒杀消息,可将 ShardingId 设置为商品ID,则该商户全部消息固定在一台机器消费;)
*/
XxlMqMessage mqMessage = new XxlMqMessage();
mqMessage.setTopic(topic);
mqMessage.setData(data);
mqMessage.setShardingId(1);
XxlMqProducer.produce(mqMessage);
/**
* 生产消息:广播消费( 消费者 IMqConsumer 注解的 group 属性修改不一致即可;一条消息将会广播给该主题全部在线 group,每个group都会消费,单个group只会消费一次; )
*/
XxlMqProducer.broadcast(new XxlMqMessage(topic, data));
/**
* 生产消息:延时消费( EffectTime 设置为固定时间点即可;如订单30min超时取消,可将 EffectTime 设置为30min后的时间点,到时将会自动消费;)
*/
XxlMqMessage mqMessage = new XxlMqMessage();
mqMessage.setTopic(topic);
mqMessage.setData(data);
mqMessage.setEffectTime(effectTime);
XxlMqProducer.produce(mqMessage);
/**
* 生产消息:失败重试消费( RetryCount 设置重试次数即可;如发送短信消息,第三方服务不稳定时失败很常见,可设置 RetryCount 为3,失败是将会自动重试指定次数;)
*/
XxlMqMessage mqMessage = new XxlMqMessage();
mqMessage.setTopic(topic);
mqMessage.setData(data);
mqMessage.setRetryCount(3);
XxlMqProducer.produce(mqMessage);
……
更多消息属性、场景,可参考章节 “4.2 Message设计”;
@MqConsumer(topic = "topic_1")
@Service
public class DemoAMqComsumer implements IMqConsumer {
private Logger logger = LoggerFactory.getLogger(DemoAMqComsumer.class);
@Override
public MqResult consume(String data) throws Exception {
logger.info("[DemoAMqComsumer] 消费一条消息:{}", data);
return MqResult.SUCCESS;
}
}
系统中每个消费者以 “IMqConsumer” 的形式存在, 规定如下:
- 1、每个 "IMqConsumer" 需要继承 "com.xxl.mq.client.consumer.IMqConsumer" 接口;
- 2、需要扫描为Spring的Bean实例, 需加上 "@Service" 注解并被Spring扫描;
- 3、需要加上注解 "com.xxl.mq.client.consumer.annotation.MqConsumer"。该注解 "value" 值为订阅的消息主题, "type" 值为消息类型(TOPIC广播消息、QUEUE并发消息队列 和 SERIAL_QUEUE串行消息队列);
更多消费者属性、场景,可参考章节 “4.6 Consumer设计”;
首选启动消息中心,然后启动 “springboot版本示例项目”;
访问部署成功的 “springboot版本示例项目” 地址,浏览器访问展示如下如下:
该示例项目已经提供了多个消息生产与消费的实例:
进入消息中心 “消息记录” 菜单,消息列表如下:
逐个查看消息流转日志如下:
可以注意 “锁定消息” 的 “消费者信息”,可以查看到当前消费者在集群中的排序 “rank”。
逐个查看每条消息对应消费者的 “rank” 属性,可以看到上面4条消息平局分配给不同 “rank” 的消费者,即平均分配给了不同消费者。测试正常;
操作步骤同 “并行消息”。最后一步逐个查看每条消息对应消费者的 “rank” 属性,会发现全部一致,即固定分配给了一个消费者。测试正常
进入消息中心 “消息记录” 菜单,消息列表如下:
一条广播消息将会广播给该主题全部在线group,该消息主题存在2个消息group,所以会每个group创建一条,即两条消息。测试正常。
进入消息中心 “消息记录” 菜单,可以查看消息 “生效时间”属性为 5min 之后,最终该消息在 5min 之后被消费执行。测定正常。
点击按钮后,页面下方展示文案 “Cost = 1055”,说明在 1055ms 之内客户端发送了 1000 条消息;
但是,由于测试代码中采用异步方式发送,消息发送事件与是否成功需要在消息中心中确认。
进入消息中心 “消息记录” 菜单,如下图,可以看到 10000 条消息创建事件最大为 “2018-12-02 04:51:54”,最小为 “2018-12-02 04:51:55”。说明在 1s 左右客户端成功发送了 10000 条消息,且 100% 投递成功,即单机TPS过万;
然后进入 “运行报表” 界面,如下图,点击成功比例图可知,成功消费 10000 条,比例 100%。说明客户端发送的 10000 条消息 100% 消费成功。
如延时消息、重试消息 …… 可自行参考示例代码测试;
运行报表界面,展示消息中心系统信息,如业务线、消息主题、消息数量等;支持日期分布图、成功比例图方式查看;
消息主题界面,可查看在线消息主题列表;底层会周期性扫描消息记录,发型并录入新的消息主题,并展示在这里;
消息主题界面,支持为消息主题设置一些附属参数,提供一些增强功能;如负责人、告警邮箱等;
消息主题属性:
消息记录界面,可查看在线消息记录;支持筛选、查看消息流转轨迹;
消息新增如下图所示,消息属性说明,可参考章节 “4.2 Message设计”;
业务先界面,可查看在线业务线列表,并管理维护;可通过自定义业务线,绑定消息主题,从而方便消息主题的分组管理;
消息核心属性 | 说明 |
---|---|
topic | 消息主题 |
group | 消息分组, 分组一致时消息仅消费一次;存在多个分组时,多个分组时【广播消费】; |
data | 消息数据 |
retryCount | 重试次数, 执行失败且大于0时生效,每重试一次减一; |
shardingId | 分片ID, 大于0时启用,否则使用消息ID;消费者通过该参数进行消息分片消费;分片ID不一致时分片【并发消费】、一致时【串行消费】; |
timeout | 超时时间,单位秒;大于0时生效,处于锁定运行状态且运行超时时,将主动标记运行失败; |
effectTime | 生效时间, new Date()立即执行, 否则在生效时间点之后开始执行; |
Broker(消息代理中心):系统核心组成模块, 负责接受消息生产者Producer推送生产的消息, 同时负责提供RPC服务供消费者Consumer使用来消费消息;
Broker支持集群部署, 集群节点之间地位平等, 集群部署情况下可大大提高系统的消息吞吐量。
Broker通过内置注册中心实现集群功能, 各节点在启动时会自动注册到注册中心, Producer或Consumer在生产消息或者消费消息时,将会通过内置注册中心自动感知到在线的Broker节点。
Broker在接收到Produce的生产消息的RPC调用时, 并不会立即存储该消息, 而是立即push到内存队列中, 同时立即响应RPC调用。 内存队列将会异步将队列中的消息数据存储到Mysql中。
Broker在接收到 “消息锁定” 等同步RPC调用时, 将会触发同步调用, 采用乐观锁方式锁定消息;
Registry Center(注册中心)主要分为两个子模块: Broker注册中心、Consumer注册中心;
Producer(消息生产者), 兼容“异步批量多线程生产”+“同步生产”两种方式,提升消息发送性能;
底层通讯全异步化:消息新增 + 消息新增接受 + 消息回调 + 消息回调接受;仅批量PULL消息与锁消息非异步;
MqConsumer注解属性 | 说明 |
---|---|
group | 消息分组;为空时自动赋值UUID多分组【广播消费】; |
topic | 消息主题 |
transaction | 事务开关,开启消息事务性保证只会成功执行一次;关闭时可能重复消费,性能较优; |
消费者通过 “多线程轮训 + 消息分片 + PULL + 消息锁定” 的方式来实现:
支持设置消息的延迟生效时间, 到达设置的生效时间时该消息才会被消费;适用于延时消费场景,如订单超时取消等;
消费者开启事务开关后,消息事务性保证只会成功执行一次;
支持设置消息的重试次数, 在消息执行失败后将会按照设置的值进行消息重试执行,直至重试次数耗尽或者执行成功;
支持自定义消息超时时间,消息消费超时将会主动中断;
消息中心数据库,原生兼容支持 “MySQL、TIDB” 两种存储方式,前者支持千万级消息堆积,后者支持百亿级别消息堆积(TIDB理论上无上限);
可视情况选择使用,当选择TIDB时,仅需要修改消息中心数据库连接jdbc地址配置即可,其他部分如SQL和驱动兼容MySQL和TIDB使用,不需修改。
欢迎参与项目贡献!比如提交PR修复一个bug,或者新建 Issue 讨论新特性或者变更。
更多接入的公司,欢迎在 登记地址 登记,登记仅仅为了产品推广。
产品开源免费,并且将持续提供免费的社区技术支持。个人或企业内部可自由的接入和使用。
无论金额多少都足够表达您这份心意,非常感谢 :) 前往捐赠