RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
52.源代码解读-RocketMQ消息写入机制

一. 前言

RocketMQ采用内存和磁盘存储来存储消息。那现在来分析一下消息存储的流程

成都创新互联公司主要从事网站设计、网站建设、网页设计、企业做网站、公司建网站等业务。立足成都服务白碱滩,10余年网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:028-86922220

二. 代码流程

在Broker启动的时候会拉起相关服务
流程如下:

52.源代码解读-RocketMQ消息写入机制

流程图引用网址
http://blog.csdn.net/akfly/article/details/53447000

三. 代码流程

由于是Broker来存储消息,那么消息入口的代码应该是在Broker里面,而Broker的入口是BrokerStartup,以及重要的BrokerController。
具体流程可以参考Broker启动源代码分析。

Broker启动流程

以发送消息为例

1. Broker启动注册发送消息处理器

Broker启动的时候,会注册一个SendMessageProcesser来响应netty的发送消息请求,如下:

public void registerProcessor() {
        /**
         * SendMessageProcessor
         */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
}

2. 消息处理器处理发送者发送过来的消息

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

    @Override
    public RemotingCommand proce***equest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        SendMessageContext mqtraceContext;
                ...
        switch (request.getCode()) {
            response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
        }
    }
}       

继续看sendMessage..

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
        ...
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}               

调用MessageStore.putMessage(msgInner)


网站栏目:52.源代码解读-RocketMQ消息写入机制
标题路径:http://scpingwu.com/article/pojohp.html