RocketMQ——Consumer篇:向Broker同步消费进展的守时使命ITeye - AG环亚娱乐集团

RocketMQ——Consumer篇:向Broker同步消费进展的守时使命ITeye

2019-01-10 20:59:05 | 作者: 盼烟 | 标签: 消费,进展,一个 | 浏览: 1484

每隔5秒调用一次MQClientInstance.persistAllConsumerOffset()办法将消费进展向Broker同步。遍历MQClientInstance.consumerTable: ConcurrentHashMap String/*group */, MQConsumerInner 变量。关于PushConsumer端和PullConsumer端,处理逻辑是相同的,以DefaultMQPushConsumerImpl为例,调用DefaultMQPushConsumerImpl.persistConsumerOffset()办法。

1、获取DefaultMQPushConsumerImpl.rebalanceImpl变量的processQueueTable:ConcurrentHashMap MessageQueue, ProcessQueue 变量值,取该变量的key值调集,即MessageQueue调集;以该调集为参数调用OffsetStore.persistAll(Set MessageQueue  mqs)办法;

2、若音讯形式是播送(BROADCASTING),即DefaultMQPushConsumerImpl.offsetStore变量初始化为LocalFileOffsetStore目标,在此调用LocalFileOffsetStore.persistAll(Set MessageQueue  mqs)办法,在此办法中遍历LocalFileOffsetStore.offsetTable:ConcurrentHashMap MessageQueue,AtomicLong 变量,将包含在第1步的MessageQueue调集中的MessageQueue目标的消费进展耐久化到consumerOffset.json物理文件中;

3、若音讯形式为集群形式,即DefaultMQPushConsumerImpl.offsetStore变量初始化为RemoteBrokerOffsetStore目标,在此调用RemoteBrokerOffsetStore.persistAll(Set MessageQueue  mqs)办法,在此办法中遍历RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap MessageQueue,AtomicLong 变量;关于包含在第1步的MessageQueue调集中的MessageQueue目标,调用updateConsumeOffsetToBroker(MessageQueuemq, long offset)办法向Broker发送UPDATE_CONSUMER_OFFSET恳求码的消费进展信息;

 

消费进展(offset)

消费进展是指,当一个consumer group里的consumer在消费某个queue里的音讯时,equeue是经过记载消费方位(offset)来知道当时消费到哪里了。以便该consumer重启后持续从该方位开端消费。比方一个topic有4个queue,一个consumer group有4个consumer,则每个consumer分配到一个queue,然后每个consumer别离消费自己的queue里的音讯。equeue会别离记载每个consumer对其queue的消费进展,然后保证每个consumer重启后知道下次从哪里开端持续消费。实际上,或许下次重启后不是由该consumer消费该queue了,而是由group里的其他consumer消费了,这样也不要紧,因为咱们现已记载了这个queue的消费方位了。所以能够看出,消费方位和consumer其实无关,消费方位彻底是queue的一个特点,用来记载当时被消费到哪里了。别的一点很重要的是,一个topic能够被多个consumer group里的consumer订阅。不同consumer group里的consumer即便是消费同一个topic下的同一个queue,那消费进展也是分隔存储的。也便是说,不同的consumer group内的consumer的消费彻底阻隔,互相不受影响。还有一点便是,关于集群消费和播送消费,消费进展耐久化的当地是不同的,集群消费的消费进展是放在broker,也便是音讯行列效劳器上的,而播送消费的消费进展是存储在consumer本地磁盘上的。之所以这样规划是因为,关于集群消费,因为一个queue的顾客或许会替换,因为consumer group下的consumer数量或许会添加或削减,然后就会从头核算每个consumer该消费的queue是哪些,这个能了解的把?所以,当呈现一个queue的consumer变化的时分,新的consumer怎么知道该从哪里开端消费这个queue呢?假如这个queue的消费进展是存储在前一个consumer效劳器上的,那就很难拿到这个消费进展了,因为有或许那个效劳器现已挂了,或许下架了,都有或许。而因为broker关于一切的consumer总是在效劳的,所以,在集群消费的状况下,被订阅的topic的queue的消费方位是存储在broker上的,存储的时分依照不同的consumer group做阻隔,以保证不同的consumer group下的consumer的消费进展互补影响。然后,关于播送消费,因为不会呈现一个queue的consumer会变化的状况,所以咱们没必要让broker来保存消费方位,所以是保存在consumer自己的效劳器上。

 

http://blog.csdn.net/meilong_whpu/article/details/77065587

https://www.cc362.com/content/Npz3glwzPQ.html

版权声明
本文来源于网络,版权归原作者所有,其内容与观点不代表AG环亚娱乐集团立场。转载文章仅为传播更有价值的信息,如采编人员采编有误或者版权原因,请与我们联系,我们核实后立即修改或删除。

猜您喜欢的文章

阅读排行

  • 1

    简历打分排序ITeye

    排序,简历,体系
  • 2

    如何用Redlock完成分布式锁ITeye

    分布式,完成,获取
  • 3
  • 4

    java 批量推送 iosITeye

    推送,测验,内容
  • 5
  • 6
  • 7
  • 8

    递归算法和文件行列算法ITeye

    文件,行列,文件夹
  • 9

    链表结构ITeye

    结点,保存,删去
  • 10