RocketMQ-客户端编程模型
RocketMQ的运⾏架构rocketmp核心的服务nameserver与brokerbroker它是实际处理业务数据的的服务单元包括客户端发送消息consumer拉取消息都是通过broker来进行存储以及中转这些业务数据的。name server它的作用只是用来维护broker的信息。业务繁忙时为了保证这个整个服务的高可用增加name server服务来维护信息name server会及时维护这个broker服务状态如broker哪些存活地址多少等信息。客户端producer和consumer整体的工作机制是只需要直接指定name server获取broker的服务状态信息然后通过负载均衡算法找到可用的broker服务发送请求、推送信息、接受信息。整体上来说对于客户端编程模型只需要指定name server。但实际上背后请求都是通过和broker交互完成的只有broker才有处理消息的能力nameserver不处理消息。RocketMQ的消息模型在业务层面producer和consumer会通过topic来指定所属的业务类型。而topic实际上它是一个逻辑概念并不实际存储消息它只是逻辑上的一个概念。实际存储的消息是topic下面对应的一系列的message消息队列这些消息队列会尽量平均的分配到多个不同的broker当中 在集群当中根据broker的数量均匀分配这样有利于发挥broker的这种集群的性能优势。客户端消息确认机制1、⽣产端采⽤消息确认加多次重试的机制保证消息正常发送到RocketMQ单向发送消息⽣产者只管往Broker发送消息不关⼼Broker端是否成功接收到消息。特点发送消息效率更⾼。适⽤于⼀些追求消息发送效率⽽允许消息丢失的业务场景。⽐如⽇志。同步发送消息⽣产者在往Broker端发送消息后会阻塞当前线程等待Broker端的相应结果特点很⼤程度上保证消息发送的安全性但是发送效率⽐较低如果⽹速⽐较慢同步发送的耗时就会很⻓异步发送⽣产者在向Broker发送消息时会同时注册⼀个回调函数。接下来⽣产者并不等待Broker的响应。当Broker端有响应数据过来时⾃动触发回调函数进⾏对应的处理。特点较好的兼容消息的安全性以及⽣产者的⾼吞吐需求2、消费者端采⽤状态确认机制保证消费者⼀定能正常处理对应的消息Broker等待消费者返回消息处理状态消费者返回CONSUME_SUCCESS表示消息处理完成消费者返回RECONSUME_LATER则Broker会过⼀段时间再发起消息重试。消息重试机制Broker不会向消费者⽆限制的推送失败的信息。Broker会记录每⼀个消息的重试次数。如果⼀个消息经过很多次重试后消费者依然⽆法正常处理那么Broker会将这个消息推⼊到消费者组对应的死信Topic中。RocketMQ默认的最⼤重试次数是16次。每个消费者组⾃动⽣成⼀个对应的重试Topic。在消息重试时会先移动到对应的重试Topic中。后续Broker只要从这些重试Topic中不断拿出消息往消费者组重新推送即可。重试的消息有单独队列不会影响到Topic下的其他消息。Broker端最终只通过消费者组返回的状态来确定消息有没有处理成功。⾄于消费者组⾃⼰的业务执⾏是否正常Broker端是不管的。同一个消费者组可以订阅多个Topic消费者组内所有实例的消费业务逻辑也必须完全相同。生产者是向Topic发送Topic有多个队列生产者会轮流或按规则选择不同队列队列在集群模式下只会绑定消费者组内的一个消费者。3、消费者组⾃⾏指定起始消费位点创建新的消费者组通过ConsumerFromWhere属性指定这个消费者组的消费起点。属性包括 从对列的最后⼀条消息开始消费CONSUME_FROM_LAST_OFFSET 从对列的第⼀条消息开始消费CONSUME_FROM_FIRST_OFFSET 从某⼀个时间点开始重新消费CONSUME_FROM_TIMESTAMP广播模式详解⼀个消息则会推送给所有消费者实例处理不再关⼼消费者组。示例代码consumer.setMessageModel(MessageModel.BROADCASTING);实现机制将Offset转移到Consumer端⾃⾏保管包括Offset记录以及更新全都放到客户端。Broker推送消息时就不再管ConsumerGroup只要Consumer来拉取消息就返回对应的消息。注意Broker端不维护消费进度如果消费者处理消息失败了将⽆法进⾏消息重试。在服务重启时消费者按照上⼀次消费的进度处理后⾯没有消费过的消息。如果Offset丢了Consuer依然可以拉取消息只能从最新消息读取。消息过滤机制1简单过滤⽣产者发送消息时增加Tag属性消费者通过Tag属性过滤内容2SQL过滤注意点1、Consumer端过滤也可以⾃⾏获取⽤户属性将不感兴趣的消息直接返回不成功的状态跳过该消息就⾏了。2、Broker会在往Consumer推送消息时在Broker端进⾏消息过滤只将Consumer感兴趣的消息推送给Consumer。这样的好处是减少了不必要的⽹络IO但是缺点是加⼤了服务端的压⼒。不过在RocketMQ的良好设计下更建议使⽤消息过滤机制。3、Consumer不感兴趣的消息并不表示直接丢弃。通常是在同⼀个消费者组定制另外的消费者实例消费那些剩下的消息。但是如果⼀直没有另外的Consumer那么Broker端还是会推进Offset。顺序消息机制通过MessageSelector将相同的消息都转发到同⼀个MessageQueue中。消费者使用MessageListenerOrderly1、⽣产者只有将⼀批有顺序的消息放到同⼀个MesasgeQueue上通过MessageQueue的FIFO特性保证这⼀批消息的顺序。如果不指定MessageSelector对象那么⽣产者会采⽤轮询的⽅式将多条消息依次发送到不同的MessageQueue上。2、消费者需要实现MessageListenerOrderly接⼝实际上在broker服务端处理MessageListenerOrderly时会给⼀个MessageQueue加锁拿到MessageQueue上所有的消息然后再去读取下⼀个MessageQueue的消息。注意点1、理解局部有序与全局有序。⼤部分业务场景下我们需要的其实是局部有序。如果要保持全局有序那就只保留⼀个MessageQueue。性能显然⾮常低。2、⽣产者端尽可能将有序消息打散到不同的MessageQueue上避免过于集中导致数据热点竞争。3、消费者端只进⾏有限次数的重试。如果⼀条消息处理失败RocketMQ会将后续消息阻塞住让消费者进⾏重试。但是如果消费者⼀直处理失败超过最⼤重试次数那么RocketMQ就会跳过这⼀条消息处理后⾯的消息这会造成消息乱序。4、消费者端如果确实处理逻辑中出现问题不建议抛出异常可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代。延迟消息指消息发送到RocketMQ后并不期望⽴⻢投递这条消息⽽是延迟⼀定时间后才投递到Consumer进⾏消费。1、指定固定的延迟级别实现方式预设⼀个系统Topic名字叫做SCHEDULE_TOPIC_XXXXX。在这个Topic下预设了18个MessageQueue。这⾥每个对列就对应了⼀种延迟级别。然后每次扫描这18个队列⾥的消息进⾏延迟操作就可以了。2、指定消息发送时间通过时间轮算法实现的。批量消息⽣产者要发送的消息⽐较多时可以将多条消息合并成⼀个批量消息⼀次性发送出去。这样可以减少⽹络IO提升消息发送的吞吐量注意点同⼀批消息的Topic必须相同另外不⽀持延迟消息。还有批量消息的⼤⼩不要超过1M如果太⼤就需要⾃⾏分割。事务消息机制两阶段提交过程⽣产者将消息发送⾄RocketMQ服务端。RocketMQ服务端将消息持久化成功之后向⽣产者返回Ack确认消息已经发送成功此时消息被标记为暂不能投递这种状态下的消息即为半事务消息。⽣产者开始执⾏本地事务逻辑。⽣产者根据本地事务执⾏结果向服务端提交⼆次确认结果Commit或是Rollback服务端收到确认结果后处理逻辑如下⼆次确认结果为Commit服务端将半事务消息标记为可投递并投递给消费者。⼆次确认结果为Rollback服务端将回滚事务不会将半事务消息投递给消费者。在断⽹或者是⽣产者应⽤重启的特殊情况下若服务端未收到发送者提交的⼆次确认结果或服务端收到的⼆次确认结果为Unknown未知状态经过固定时间后服务端将对消息⽣产者即⽣产者集群中任⼀⽣产者实例发起消息回查。⽣产者收到消息回查后需要检查对应消息的本地事务执⾏的最终结果。⽣产者根据检查到的本地事务的最终状态再次提交⼆次确认服务端仍按照步骤 4 对半事务消息进⾏处理。注意点1、半消息是对消费者不可⻅的⼀种消息。实际上RocketMQ的做法是将消息转到了⼀个系统TopicRMQ_SYS_TRANS_HALF_TOPIC。2、事务消息中本地事务回查次数通过参数transactionCheckMax设定默认15次。本地事务回查的间隔通过参数transactionCheckInterval设定默认60秒。超过回查次数后消息将会被丢弃。ACL权限控制体系应⽤场景RocketMQ提供了针对队列、⽤户等不同维度的⾮常全⾯的权限管理机制。通常来说RocketMQ作为⼀个内部服务是不需要进⾏权限控制的但是如果要通过RocketMQ进⾏跨部⻔甚⾄跨公司的合作权限控制的重要性就显现出来了。权限控制体系1、RocketMQ针对每个Topic就有完整的权限控制。⽐如在控制平台中就可以很⽅便的给每个Topic配置权限。perm字段表示Topic的权限。有三个可选项。 2禁写禁订阅4可订阅不能写6可写可订阅2、在Broker端还提供了更详细的权限控制机制。主要是在broker.conf中打开acl的标志aclEnabletrue。然后就可以⽤他提供的plain_acl.yml来进⾏权限配置了。并且这个配置⽂件是热加载的也就是说要修改配置时只要修改配置⽂件就可以了不⽤重启Broker服务。⽂件的配置⽅式也⾮常简单⼀⽬了然。补充1、消息的IDKey和TagMessageId是RocketMQ内部给每条消息分配的唯⼀索引Producer发送的Message对象是没有msgId属性的。Broker端接收到Producer发过来的消息后会给每条消息单独分配⼀个唯⼀的msgId。这个msgID可以作为消息的唯⼀主键来使⽤。在RocketMQ内部也会针对批量消息、事务消息等特殊的消息机制有特殊的msgId分配机制。因此在复杂业务场景下不建议使⽤msgId来作为消息的唯⼀索引⽽建议采⽤下⾯的key属性⾃⾏指定业务层⾯上的唯⼀索引2、key是Message中的补充信息在Producer发送Message消息时同样也是没有key属性的。⽽这⾥设置的key其实是以RocketMQ中消息的补充属性的形式插⼊进去的。针对key这⼀个属性建议在业务中可以添加⼀些带有业务唯⼀性的数据作为MessageId的补充。RocketMQ基于Keys属性实现了消息溯源、消息压缩等⼀系列功能。3、通过Tag进⾏消息过滤性能⾮常⾼Tag属性也是Producer发送的Message对象的固有属性。其作⽤主要是⽤来进⾏消息过滤。实际上RocketMQ的服务端会把消息的Tag信息以某种形式(hashCode)写⼊到检索消息的ConsumeQueue索引中。这样当Consumer消费消息时就可以通过过滤ConsumeQueue索引中的Tag属性快速找到⾃⼰感兴趣的消息。⼀个应⽤尽可能⽤⼀个Topic⽽消息⼦类型则可以⽤tags来标识。tags可以由应⽤⾃由设置只有⽣产者在发送消息设置了tags消费⽅在订阅消息时才可以利⽤tags通过broker做消息过滤message.setTags(“TagA”)。Kafka的⼀⼤问题是Topic过多会造成Partition⽂件过多影响性能。⽽RocketMQ中的Topic完全不会对消息转发性能有影响。但是Topic过多还是会加⼤RocketMQ的元数据维护的性能消耗。所以在使⽤时还是需要对Topic进⾏合理的分配。使⽤Tag区分消息时尽量直接使⽤Tag过滤不要使⽤复杂的SQL过滤。因为消息过滤机制虽然可以减少⽹络IO但是毕竟会加⼤Broker端的消息处理压⼒。所以消息过滤的逻辑还是越简单越好。4、消费者端进⾏幂等控制需要由业务系统⾃⾏保证消息的幂等性消息幂等的必要性**发送时消息重复**当⼀条消息已被成功发送到服务端并完成持久化此时出现了⽹络闪断或者客户端宕机导致服务端对客户端应答失败。 如果此时⽣产者意识到消息发送失败并尝试再次发送消息消费者后续会收到两条内容相同并且 Message ID 也相同的消息。**投递时消息重复**消息消费的场景下消息已投递到消费者并完成业务处理当客户端给服务端反馈应答的时候⽹络闪断。 为了保证消息⾄少被消费⼀次消息队列 RocketMQ 的服务端将在⽹络恢复后再次尝试投递之前已被处理过的消息消费者后续会收到两条内容相同并且Message ID 也相同的消息。负载均衡时消息重复包括但不限于⽹络抖动、Broker 重启以及订阅⽅应⽤重启当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时会触发 Rebalance此时消费者可能会收到重复消息。处理⽅式RocketMQ中是⽆法保证每个消息只被投递⼀次的所以要在业务上⾃⾏来保证消息消费的幂等性RocketMQ的每条消息都有⼀个唯⼀的MessageId这个参数在多次投递的过程中是不会改变的所以业务上可以⽤这个MessageId来作为判断幂等的关键依据。但是这个MessageId是⽆法保证全局唯⼀的也会有冲突的情况。所以在⼀些对幂等性要求严格的场景最好是使⽤业务上唯⼀的⼀个标识⽐较靠谱。例如订单ID。⽽这个业务标识可以使⽤Message的Key来进⾏传递。关注错误消息重试消费者端如果处理消息失败了Broker是会将消息重新进⾏投送的。⽽在重试时RocketMQ实际上会为每个消费者组创建⼀个对应的重试队列。重试的消息会进⼊⼀个 “%RETRY%”ConsumeGroup 的队列中。多关注重试队列可以及时了解消费者端的运⾏情况。这个队列中出现了⼤量的消息就意味着消费者的运⾏出现了问题要及时跟踪进⾏⼲预。RocketMQ默认允许每条消息最多重试16次重试次数如果消息重试16次后仍然失败消息将不再投递。转为进⼊死信队列。然后关于这个重试次数RocketMQ可以进⾏定制。例如通过consumer.setMaxReconsumeTimes(20);将重试次数设定为20次。当定制的重试次数超过16次后消息的重试时间间隔均为2⼩时。配置覆盖消息最⼤重试次数的设置对相同GroupID下的所有Consumer实例有效。并且最后启动的Consumer会覆盖之前启动的Consumer的配置。⼿动处理死信队列当⼀条消息消费失败RocketMQ就会⾃动进⾏消息重试。⽽如果消息超过最⼤重试次数RocketMQ就会认为这个消息有问题。但是此时RocketMQ不会⽴刻将这个有问题的消息丢弃⽽会将其发送到这个消费者组对应的⼀种特殊队列死信队列。通常⼀条消息进⼊了死信队列意味着消息在消费处理的过程中出现了⽐较严重的错误并且⽆法⾃⾏恢复。此时⼀般需要⼈⼯去查看死信队列中的消息对错误原因进⾏排查。然后对死信消息进⾏处理⽐如转发到正常的Topic重新进⾏消费或者丢弃。死信队列的名称是%DLQ%ConsumGroup死信队列的特征⼀个死信队列对应⼀个ConsumGroup⽽不是对应某个消费者实例。如果⼀个ConsumeGroup没有产⽣死信RocketMQ就不会为其创建相应的死信队列。⼀个死信队列包含了这个ConsumeGroup⾥的所有死信消息⽽不区分该消息属于哪个Topic。死信队列中的消息不会再被消费者正常消费。死信队列的有效期跟正常消息相同。默认3天对应broker.conf中的fileReservedTime属性。超过这个最⻓时间的消息都会被删除⽽不管消息是否消费过。注默认创建出来的死信队列他⾥⾯的消息是⽆法读取的在控制台和消费者中都⽆法读取。这是因为这些默认的死信队列他们的权限perm被设置成了2:禁读(这个权限有三种 2:禁读4:禁写,6:可读可写)。需要⼿动将死信队列的权限配置成6才能被消费(可以通过mqadmin指定或者web控制台)。