当前位置:首页 » 存储配置 » rocketmq存储

rocketmq存储

发布时间: 2024-09-18 18:29:06

Ⅰ RocketMQ第五讲

broker是RocketMQ的核心,核心工作就是接收生成这的消息,进行存储。同时,收到消费者的请求后,从磁盘读取内容,把结果返回给消费者。

消息主体以及元数据的存储主体,存储Procer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

CommitLog文件中保存了消息的全量内容。不同的Topic的消息,在CommitLog都是顺序存放的。就是来一个消息,不管Topic是什么,直接追加的CommitLog中。

broker启动了一个专门的线程来构建索引,把CommitLog中的消息,构建了两种类型的索引。ConsumerQueue和Index。正常消费的时候,是根据Topic来消费,会用到ConsumerQueue索引。

也可根据返回的offsetMsgId,解析出ip,端口和CommitLog中的物理消息偏移量,直接去CommitLog中取数据。

引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。

其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。

IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是: {fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。

按照Message Key查询消息的时候,会用到这个索引文件。

IndexFile索引文件为用户提供通过“按照Message Key查询消息”的消息索引查询服务,IndexFile文件的存储位置是: {fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于40+500W 4+2000W 20= 420000040个字节大小。如果消息的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入操作。如果消息设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。

其中的索引数据包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共20 Byte。NextIndex offset 即前面读出来的 slotValue,如果有 hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。Timestamp记录的是消息storeTimestamp之间的差,并不是一个绝对的时间。整个Index File的结构如图,40 Byte 的Header用于保存一些总的统计信息,4 500W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。20 2000W 是真正的索引数据,即一个 Index File 可以保存 2000W个索引。

“按照Message Key查询消息”的方式,RocketMQ的具体做法是,主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程就是用topic和key找到IndexFile索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。

RocketMQ中有两个核心模块,remoting模块和store模块。remoting模块在NameServer,Proce,Consumer和Broker都用到。store只在Broker中用到,包含了存储文件操作的API,对消息实体的操作是通过DefaultMessageStore进行操作。

属性和方法很多,就不往这里放了。

文件存储实现类,包括多个内部类

· 对于文件夹下的一个文件

上面介绍了broker的核心业务流程和架构,关键接口和类,启动流程。最后介绍一下broker的线程模型,只有知道了线程模型,才能大概知道前面介绍的那些事如何协同工作的,对broker才能有一个立体的认识。

RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。关于Reactor线程模型,可以看看我之前写的这篇文档: Reactor线程模型

上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为8)去做。而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码code去processorTable这个本地缓存变量中找到对应的 processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。

上面的图和这段画是从官方文档抄过来的,但是文字和图对应的不是很好,画的也不够详细,但是主要流程是这个样子。以后有时间了,我重新安装自己的理解,画一张更详细的图。

AsyncAppender-Worker-Thread-0:异步打印日志,logback使用,应该是守护线程

FileWatchService:

NettyEventExecutor:

NettyNIOBoss_:一个

NettyServerNIOSelector_:默认为三个

NSScheledThread:定时任务线程

ServerHouseKeepingService:守护线程

ThreadDeathWatch-2-1:守护线程,Netty用,已经废弃

RemotingExecutorThread(1-8):工作线程池,没有共用NettyServerNIOSelector_,直接初始化8个线程

AsyncAppender-Worker-Thread-0:异步打印日志,logback使用,共九个:

RocketmqBrokerAppender_inner

RocketmqFilterAppender_inner

RocketmqProtectionAppender_inner

RocketmqRemotingAppender_inner

RocketmqRebalanceLockAppender_inner

RocketmqStoreAppender_inner

RocketmqStoreErrorAppender_inner

RocketmqWaterMarkAppender_inner

RocketmqTransactionAppender_inner

SendMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_MESSAGE

PullMessageThread_:remotingServer.registerProcessor(RequestCode.PULL_MESSAGE

ProcessReplyMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE

QueryMessageThread_:remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE

AdminBrokerThread_:remotingServer.registerDefaultProcessor

ClientManageThread_:remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT

HeartbeatThread_:remotingServer.registerProcessor(RequestCode.HEART_BEAT

EndTransactionThread_:remotingServer.registerProcessor(RequestCode.END_TRANSACTION

ConsumerManageThread_:remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,RequestCode.UPDATE_CONSUMER_OFFSET,RequestCode.QUERY_CONSUMER_OFFSET

brokerOutApi_thread_:BrokerController.registerBrokerAll(true, false, true);

==================================================================

BrokerControllerScheledThread:=>

BrokerController.this.getBrokerStats().record();

BrokerController.this.consumerOffsetManager.persist();

BrokerController.this.consumerFilterManager.persist();

BrokerController.this.protectBroker();

BrokerController.this.printWaterMark();

log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());

BrokerController.this.brokerOuterAPI.fetchNameServerAddr();

BrokerController.this.printMasterAndSlaveDiff();

BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());

:=>

:=>

FilterServerManager.this.createFilterServer();

:=>

ClientHousekeepingService.this.scanExceptionChannel();

PullRequestHoldService

FileWatchService

AllocateMappedFileService

AcceptSocketService

BrokerStatsThread1

Ⅱ RocketMQ 之 IoT 消息解析:物联网需要什么样的消息技术

随着技术趋势的变迁,RocketMQ 5.0 作为关键角色,助力物联网(IoT)的崛起。面对物联网的快速发展和数据规模挑战,消息中间件的需求也在升级。物联网设备数量预计将达到200亿,实时数据占比90%,这推动了对消息技术的新需求,如低延迟处理和边缘计算的结合。

物联网消息技术的核心在于连接设备、处理数据流,并支持边缘计算。RocketMQ 5.0 通过子产品RocketMQ - MQTT,采用MQTT协议,考虑到物联网设备的弱网和低算力特性,提供轻量级的发布订阅功能。其架构特点包括存算分离和端云一体化,以降低存储成本并支持大规模连接的实时推送。

物联网场景对消息技术的诉求包括支持设备间和设备与云端的通信,以及实时流处理。RocketMQ - MQTT通过读放大为主的存储模型,结合MQTT协议的复杂订阅机制,实现高效的消息传递。同时,它引入端云一体化存储,允许服务端和物联网设备共享消息,简化集成,提高效率。

为了支持百万级队列,RocketMQ采用Rocksdb引擎,提供高性能的顺序写入和原子性操作,保证数据一致性。推送模型则采用推拉结合的方式,通过Proxy节点进行消息匹配和可靠触达,兼顾性能和扩展性。

总的来说,RocketMQ 5.0的物联网优化旨在提供一个高效、兼容和可扩展的解决方案,适应物联网技术的快速发展和挑战,帮助企业在数字化转型中简化架构,降低成本,提升用户体验。

Ⅲ RocketMQ架构分析

RocketMQ是阿里巴巴捐赠给appache的MQ开源组件,从架构上我们分析一下。

kafka是依靠Zookeeper进行集群选举的,在rocketMQ的同样位置上是NameServer,这个Nameserver仅仅是注册服务,没有选举能力。每个broker都和NameServer进行连接,通过心跳维持状态。

procer和consumer定时到Nameserver拉取broker信息,并且和自己所消费的broker建立连接。这就和微服务的体系一模一样了。

那么rocketMQ的集群选举怎么实现的呢,通过集成了Dledge实现,Dledge是个jar包,实现了raft算法

如图,topic可在多个broker上形成分片,procer可写数据到不通的分片,分片信息也可以由不同的group进行消费。

如下介绍存储,rocketMQ可配置主备,形成主备复制。

http://rocketmq.apache.org/rocketmq/how-to-support-more-queues-in-rocketmq/ 介绍了rocketMQ存储设计初衷,和kafka存储不同,kafka在每个partition中存储了数据,而RocketMQ将实际消息集中存储,在messageQueue中存储的是元数据信息,通过元数据信息可以索引到CommitLog。

对于保存的数据,每天会删除数据;如果磁盘满,超过设置阈值,则不允许写入数据。

RocketMQ的设计确保了消息的并发处理能力,但是有时候,消息是有状态的,即有顺序,RocketMQ怎么实现呢?

发送到临时缓存,到达延迟时间后由delay service路由给topic。

如果消费返回了consumer_later,则如上述延迟消息一样,会延迟一段时间,进入死信队列,消费死信队列,重新处理。

如果业务规模小,不会改源码,就选用RabbitMQ;如果业务规模大,不允许丢消息,追求效率高,用RocketMQ;如果业务规模大,运行少量丢消息,吞吐量大,用Kafka;如果用于大数据,毫无疑问选kafka。

热点内容
删除云存储空间图库 发布:2024-09-19 19:59:18 浏览:638
荣耀三四的存储状况 发布:2024-09-19 19:54:19 浏览:99
事例数据库 发布:2024-09-19 19:54:11 浏览:206
php盒子 发布:2024-09-19 19:46:31 浏览:330
手机配置低和平精英如何提升技术 发布:2024-09-19 19:43:55 浏览:704
软件连不上服务器ip 发布:2024-09-19 19:15:49 浏览:723
服务器数据库如何建立 发布:2024-09-19 19:15:07 浏览:366
解压软件游戏 发布:2024-09-19 18:51:54 浏览:155
16新速腾配置怎么样有改进吗 发布:2024-09-19 18:47:20 浏览:338
脚本不管用 发布:2024-09-19 18:46:30 浏览:98