- 引入依赖
- 发送消息
- 消费消息
- 消息的可靠性(不丢失、高可用)
- 延时消息
- 消息过滤
- 顺序消息
- 全局顺序消息
- 部分顺序消息
- 消息积压
- 消息重复(保证消息的幂等性)
- 消息 重试
- 事务消息
- 问题
- MQ集群挂掉
org.springframework.boot spring-boot-starter-web org.apache.rocketmq rocketmq-client 4.9.3
发送消息// 同步发送
SendResult sendResult = producer.send(msg);
// 异步发送,指定回调
producer.send(msg, new SendCallback() {// 当producer接收到MQ发送来的ACK后就会触发该回调方法的执行
@Override
public void onSuccess(SendResult sendResult) {System.out.println(sendResult);
}
@Override
public void onException(Throwable e) {e.printStackTrace();
}
});
// 消息发送的状态
public enum SendStatus {SEND_OK, // 发送成功
FLUSH_DISK_TIMEOUT, // 刷盘超时。当Broker设置的刷盘策略为同步刷盘时才可能出现这种异常状态。异步刷盘不会出现
FLUSH_SLAVE_TIMEOUT, // Slave同步超时。当Broker集群设置的Master-Slave的复制方式为同步复制时才可能出现这种异常状态。异步复制不会出现
SLAVE_NOT_AVAILABLE, // 没有可用的Slave。当Broker集群设置为Master-Slave的复制方式为同步复制时才可能出现这种异常状态。异步复制不会出现
}
消费消息DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer ("cg");
consumer.setNamesrvAddr ("rocketmqOS:9876");
// 指定 从第一条消息开始消费
consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe ("someTopicA", "*");
// 指定 每次可以 消费10条消息,默认为1
consumer.setConsumeMessageBatchMaxSize (10);
// 指定 每次 可以从Broker拉取40条消息, 默认为32
consumer.setPullBatchSize (40);
consumer.registerMessageListener (new MessageListenerConcurrently () {@Override
public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println (msg);
}
// 消费成功的返回结果
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// 消费异常时的返回结果
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start ();
消息的可靠性(不丢失、高可用)消息可能在哪些阶段丢失呢?可能会在这三个阶段发生丢失:生产阶段、存储阶段、消费阶段
生产阶段:事务消息,写到OS Cache就返回 成功发送!
存储阶段:调整 MQ的刷盘策略,我们需要调整broker.conf配置文件,将其中的flushDiskType配置设置为:SYNC_FLUSH,默认他的值是ASYNC_FLUSH,即默认是异步刷盘
消费阶段
不能 异步 消费消息
// 开启监听,用于接收消息
consumer.registerMessageListener (new MessageListenerConcurrently () {@Override
public ConsumeConcurrentlyStatus consumeMessage(Listlist, ConsumeConcurrentlyContext consumeConcurrentlyContext) {// 业务 处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动接收消息的服务
consumer.start ();
也可以 使用如下结构:
目前RocketMQ支持的延时级别:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
生产者:
// 这是 订单系统的生产者
DefaultMQProducer producer = new DefaultMQProducer ("OrderGroup");
// 启动生产者
producer.start ();
Message msg = new Message (
"CreateOrderInformTopic",
"create success".getBytes ());
// 设置消息为延时消息,延迟级别为 16
msg.setDelayTimeLevel (16);
producer.send (msg);
消费者:
// 订单扫描服务 的消费者
DefaultMQPushConsumer consumer =
new DefaultMQPushConsumer ("OrderScanServiceGroup");
// 订阅 订单创建 通知Topic
consumer.subscribe ("CreateOrderInformTopic", "*");
// 注册 消息监听者
consumer.registerMessageListener (new MessageListenerConcurrently () {@Override
public ConsumeConcurrentlyStatus consumeMessage(Listlist,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {// 将 msg转为 订单对象
Order order = buildOrder (msg);
// 根据Id 查询数据库
order = getById (order.getId ());
// 如果 已支付
if (order.getStatus () == 1) {}
// 如果 未支付
else {}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
消息过滤- 根据Tag过滤
// 创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("aGroup");
// 订阅主题,tag为 a,b,c的都监听
consumer.subscribe ("orderInfoTopic", "a || b || c");
- SQL 表达式过滤
修改broker.conf 配置文件才能生效
// 创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("bGroup");
// 只有订阅的消息有这个属性a, a >=0 and a<= 3
consumer.subscribe ("TopicTest", MessageSelector.bySql ("a between 0 and 3"));
consumer.registerMessageListener (new MessageListenerConcurrently () {@Override
public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) {// 处理业务逻辑
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start ();
RocketMQ还是 支持比较丰富的数据过滤语法的,如下所示:
(1)数值比较,比如:>,>=,<,<=,BETWEEN,=;
(2)字符比较,比如:=,<>,IN;
(3)IS NULL 或者 IS NOT NULL;
(4)逻辑符号 AND,OR,NOT;
(5)数值,比如:123,3.1415;
(6)字符,比如:‘abc’,必须用单引号包裹起来;
(7)NULL,特殊的常量
(8)布尔值,TRUE 或 FALSE
顺序消息是指消息的消费顺序和产生顺序相同,在有些业务逻辑下,必须保证顺序,比如订单的生成、付款、发货,这个消息必须按顺序处理才行。
「消息乱序解决方案 不能和重试队列混用。」
全局顺序消息创建一个 Topic ,默认八个写队列,八个读队列
要保证全局顺序消息, 需要先把 Topic 的读写队列数设置为 一,然后Producer Consumer 的并发设置,也要是一
部分顺序消息部分顺序消息相对比较好实现
,生产端需要做到把同 ID 的消息发送到同一个 Message Queue ;在消费过程中,要做到从同一个Message Queue读取的消息顺序处理
DefaultMQProducer producer = new DefaultMQProducer ();
producer.setSendLatencyFaultEnable (true);
producer.setSendMsgTimeout (50);
producer.setNamesrvAddr ("192.168.111.101:9876");
producer.start ();
Message message = new Message ("Topic", "hello world".getBytes ());
SendResult sendResult = producer.send (message, new MessageQueueSelector () {@Override
public MessageQueue select(Listmqs,
Message message,
Object arg) {Long orderId = (Long) arg; // 根据 订单id选择发送 queue
long index = orderId % mqs.size (); // 用订单id 对MessageQueue数量取模
return mqs.get ((int) index); // 返回一个MessageQueue
}
}, orderId //这里传入订单id
);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("cg");
consumer.setNamesrvAddr ("192.168.111.101:9876");
// 从头 开始消费
consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe ("Topic", "*");
consumer.registerMessageListener (new MessageListenerOrderly () {@Override
public ConsumeOrderlyStatus consumeMessage(Listmsgs,
ConsumeOrderlyContext context) { context.setAutoCommit (true);
try { for (MessageExt msg : msgs) { // 对有序的消息 进行处理
}
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) { // 如果 消息处理 有问题
// 让这批消息 暂停一会儿 再继续处理这批消息
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
});
consumer.start ();
消息积压发生了消息积压,这时候就得想办法赶紧把积压的消息消费完,就得考虑提高消费能力,一般有两种办法
- 消费者扩容:如果当前Topic的Message Queue的数量大于消费者数量,就可以对消费者进行扩容,增加消费者,来提高消费能力,尽快把积压的消息消费玩。
- 消息迁移Queue扩容:如果当前Topic的Message Queue的数量小于或者等于消费者数量,这种情况,再扩容消费者就没什么用,就得考虑扩容Message Queue。可以新建一个临时的Topic,临时的Topic多设置一些Message Queue,然后先用一些消费者把消费的数据丢到临时的Topic,因为不用业务处理,只是转发一下消息,还是很快的。接下来用扩容的消费者去消费新的Topic里的数据,消费完了之后,恢复原状
处理消息重复问题,主要有业务端自己保证,主要的方式有两种:业务去重 和 消息去重。
业务去重:保证 业务消费的幂等性即可
消息去重:依据在 生产者方设置消息的 messageKey,然后 每一条消息 在消费方依据这个唯一的 messageKey,进行幂等判断
流程:** insert 失败,说明已经消费---->捕获异常返回已消费,insert 成功---->处理业务 提交事务后 再确认成功消费~~ **
// 设置 同步发送失败时 重试发送的次数,默认为2次
producer.setRetryTimesWhenSendFailed(3);
// 设置 发送 超时时限为5s,默认3s
producer.setSendMsgTimeout(5000);
// 指定 异步发送失败后 不进行重试发送
producer.setRetryTimesWhenSendAsyncFailed(0);
// 顺序消息消费失败的消费重试时间间隔,单位毫秒,默认为1000,其取值范围为[10,30000]
// 重试期间 应用会出现 消息消费被阻塞
consumer.setSuspendCurrentQueueTimeMillis(100);
// 修改消费重试次数,默认16次
consumer.setMaxReconsumeTimes(10);
消息 刷盘失败策略
消息刷盘超时(Master或Slave)或slave不可用(slave在做数据同步时向master返回状态不是SEND_OK)时,默认是不会将消息尝试发送到其他Broker的。不过,对于重要消息可以通过在Broker的配置文件设置retryAnotherBrokerWhenNotStoreOK
属性为true
来开启
重试队列里面的消息会再次发给消费组,默认 最多重试 16 次,如果重试 16 次失败则进入「死信队列」
「死信队列:」
对于死信队列,一般我们可以专门开一个后台线程,订阅这个死信队列,对死信队列中的消息,一直不停的尝试。
消息量比较大,不建议同步,影响消息消费速度,造成消息堆积
消费者处理失败后,立刻写入重试表,有个 定时任务专门重试
消费失败,自己给同一个topic发一条消息—>对消息顺序要求不高的场景可以使用
事务消息原理
Rocketmq 未收到rollback、commit也会 补偿回调,MQ也会有补偿机制 :checkLocalTransaction方法
让我们自己处理
TransactionListenerImpl 事务监听器
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class TransactionListenerImpl implements TransactionListener {// 发送成功给 broker后,可以 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {// 执行 订单本地事务
try {// 如果 本地事务都执行成功了,返回 commit
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {// 本地事务 执行失败,回滚 本地事务
// 更新 broker中的消息状态为 删除
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 因为超时 等原因,没有返回 commit或者 rollback
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {// 这里默认是 回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
生产者:
// 接受 RocketMQ 回调的一个 监听器接口
// 会执行 订单本地事务,commit、rollback,回调查询 等逻辑
TransactionListenerImpl transactionListener = new TransactionListenerImpl ();
TransactionMQProducer producer = new TransactionMQProducer ();
ThreadPoolExecutor executorService = new ThreadPoolExecutor (
2,
5,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000)
);
// 设置 对应的线程池,负责执行 回调请求
producer.setExecutorService (executorService);
// 开启 容错机制
producer.setSendLatencyFaultEnable (true);
// 设置 事务监听器
producer.setTransactionListener (transactionListener);
// 设置 发送失败时,最多重试几次
producer.setRetryTimesWhenSendFailed (2);
// 构建 消息体
Message message = new Message (
"PayOrderSuccessTopic",
"Tag",
"MyKey",
"Pay Success".getBytes ());
// 可以 查询发送结果
SendResult sendResult = producer.send (message, 10);
问题
MQ集群挂掉降级方案通常的思路:
发送消息到MQ代码里去try catch捕获异常,如果你发现发送消息到MQ有异常,此时你需要进行重试
重试了,比如超过3次还是失败,说明此时可能就是你的MQ集群彻底崩溃了
此时你必须把这条重要的消息写入到本地存储中去,可以是写入数据库里
要不停的尝试发送消息到MQ去
发现MQ集群恢复了,你 必须有一个后台线程可以 把之前持久化存储的消息都查询出来,然后依次 按照顺序 发送到MQ集群里去
这里要有一个很关键的注意点,就是你把消息写入存储中 暂存时,一定要保证他的顺序
,比如按照顺序一条一条的写入本地磁盘文件去
暂存消息
**流量 太多:**解决⽅法就是 对线上系统扩容双段缓冲的⼤⼩,从 512kb 扩容到⼀个缓冲区 10mb。
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
新闻名称:RocketMQ-创新互联
网页网址:http://scpingwu.com/article/djsdpo.html