AMQP

AMQP (Advanced Message Queuing Protocol) defines the structure of message queues, their capability set, and the wire format of protocol commands.

amqp-architecture.png

A message queue system has the following roles (a sketch illustrating them with the RabbitMQ Java client follows the list):

  • Publisher
    The message producer. It assembles the message body and properties and delivers the message to an Exchange. There can be multiple publishers.
  • Exchange
    The message router. It delivers messages to the matching Queue(s) according to rules (Bindings). There can be multiple bindings, or the default one can be used.

    • Direct Exchange
      Routes to the matching queue by routing key, exact match
    • Fanout Exchange
      Broadcasts messages to all bound queues
    • Topic Exchange
      Routes to matching queues by topic pattern, wildcard match
  • Queue
    The message queue. It stores messages and delivers them to Consumers. Queues can be created and destroyed dynamically.

  • Consumer
    The message consumer. It consumes messages from a Queue and, if required, sends an ACK back to it. There can be multiple consumers.
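
To make these roles concrete, here is a minimal sketch using the RabbitMQ Java client (com.rabbitmq.client). The exchange, queue, and routing-key names are invented for the example and it assumes a broker running on localhost.

import com.rabbitmq.client.*;

import java.nio.charset.StandardCharsets;

public class AmqpRolesDemo {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumes a local broker
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {

            // Exchange: routes messages to queues according to bindings
            channel.exchangeDeclare("orders.direct", BuiltinExchangeType.DIRECT, true);
            // Queue: stores messages and delivers them to consumers
            channel.queueDeclare("orders.created", true, false, false, null);
            // Binding: the routing rule between exchange and queue
            channel.queueBind("orders.created", "orders.direct", "order.created");

            // Consumer: receives messages from the queue and ACKs each one
            channel.basicConsume("orders.created", false, (tag, delivery) -> {
                System.out.println(new String(delivery.getBody(), StandardCharsets.UTF_8));
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }, tag -> { });

            // Publisher: assembles a message and sends it to the exchange with a routing key
            channel.basicPublish("orders.direct", "order.created", null,
                    "order #42 created".getBytes(StandardCharsets.UTF_8));

            Thread.sleep(1000); // give the asynchronous consumer a moment before closing
        }
    }
}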

There are three messaging patterns:

  • Shared
    The producer publishes messages, and multiple identical consumers share the state of one message queue. This is the most common pattern, used for load balancing across instances in distributed systems.
  • Pub-Sub (publish-subscribe)
    The producer publishes a message, and it is broadcast to all subscribed consumers. EDA (event-driven architecture) typically uses this pattern.
  • Request-Reply
    The producer publishes a message; the consumer consumes it and returns a reply to the producer. RPC services use this pattern.

The AMQP protocol specification is available at https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf
Similar messaging protocols include STOMP (Streaming Text Oriented Messaging Protocol) and MQTT (Message Queuing Telemetry Transport).

RabbitMQ

RabbitMQ is an implementation of AMQP, written in Erlang. Tutorials: https://www.rabbitmq.com/tutorials

Deploying a RabbitMQ cluster is straightforward: run the join_cluster command on a standalone node (nodes are disk nodes by default; disk nodes are usually mixed with RAM nodes, joined with --ram, to improve performance):
rabbitmqctl join_cluster rabbit@master

An ordinary cluster is only a cluster of the service itself; it does not replicate or back up message queues. A mirroring policy can be configured on an ordinary cluster to control queue replication. Newer versions recommend Quorum Queues instead (durable queues replicated via the Raft protocol).
Below is the architecture of a mirrored cluster:
rabbitmq-architecture.png
A RabbitMQ cluster does not support horizontal scaling of a queue, i.e. there is no partitioning mechanism like RocketMQ's or Kafka's. The RabbitMQ service on Alibaba Cloud, however, has been modified slightly to support horizontal scaling.

RabbitMQ is centered on the Exchange. Producers publish messages to an Exchange, and consumers consume the messages routed through it; whether consumers use independent queues or share a queue is determined entirely by the messaging pattern they choose.

The following message delivery patterns are supported (a Pub-Sub sketch with the Java client follows the list):

  1. The default queue gives the Shared pattern, with round-robin load balancing
    rabbitmq-queue.png
  2. A Fanout Exchange feeding multiple queues implements the Pub-Sub pattern
    rabbitmq-pub-sub.png
  3. A Direct Exchange plus Bindings composes various routing patterns
    rabbitmq-routing.png
  4. RPC pattern (not recommended)
    The client sends a message to a request queue; the server consumes it and sends the reply to a reply queue; the client picks up the reply whose correlationId matches its request.
    rabbitmq-rpc.png
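
As a concrete example of pattern 2, here is a minimal Pub-Sub sketch with the RabbitMQ Java client: each subscriber binds its own server-named queue to a fanout exchange, so every subscriber gets a copy of each message. The exchange name and broker address are placeholders.

import com.rabbitmq.client.*;

import java.nio.charset.StandardCharsets;

public class FanoutPubSubDemo {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection conn = factory.newConnection()) {
            Channel pub = conn.createChannel();
            pub.exchangeDeclare("events.fanout", BuiltinExchangeType.FANOUT, true);

            // Each subscriber binds its own server-named queue to the fanout exchange,
            // so every subscriber receives a copy of every message (Pub-Sub).
            for (int i = 0; i < 2; i++) {
                Channel sub = conn.createChannel();
                String queue = sub.queueDeclare().getQueue(); // exclusive, auto-delete queue
                sub.queueBind(queue, "events.fanout", "");
                final int id = i;
                sub.basicConsume(queue, true, (tag, delivery) ->
                        System.out.println("subscriber " + id + " got: "
                                + new String(delivery.getBody(), StandardCharsets.UTF_8)),
                        tag -> { });
            }

            // The routing key is ignored by a fanout exchange; the message is broadcast.
            pub.basicPublish("events.fanout", "", null,
                    "user.signup".getBytes(StandardCharsets.UTF_8));
            Thread.sleep(1000);
        }
    }
}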

Some measures for reliable message delivery (a sketch of measures 2-4 follows the list):

  1. Deploy in cluster mode
  2. Create reliable queues
    Quorum Queues are recommended:
    x-queue-type=quorum
    x-overflow=reject-publish-dlx
    dead-letter-strategy=at-least-once
  3. Producers publish persistent messages in confirm mode
  4. Consumers ACK each message only after processing it
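
A sketch of measures 2-4 using the RabbitMQ Java client: declare a quorum queue with the arguments listed above, publish a persistent message with publisher confirms, and ACK manually after processing. Queue and message names are illustrative; the at-least-once dead-letter strategy is normally configured via policy together with a dead-letter exchange, so it is only noted in a comment.

import com.rabbitmq.client.*;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class ReliablePublishDemo {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {

            // 2. A reliable (quorum) queue with the arguments listed above.
            //    The at-least-once dead-letter strategy is normally set via policy,
            //    together with a dead-letter exchange, so it is omitted here.
            Map<String, Object> queueArgs = new HashMap<>();
            queueArgs.put("x-queue-type", "quorum");
            queueArgs.put("x-overflow", "reject-publish-dlx");
            channel.queueDeclare("orders.reliable", true, false, false, queueArgs);

            // 3. Publisher confirms plus persistent delivery mode.
            channel.confirmSelect();
            channel.basicPublish("", "orders.reliable",
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    "order #42".getBytes(StandardCharsets.UTF_8));
            channel.waitForConfirmsOrDie(5_000); // throws if the broker does not confirm in time

            // 4. The consumer ACKs only after it has fully processed the message.
            channel.basicConsume("orders.reliable", false, (tag, delivery) -> {
                System.out.println(new String(delivery.getBody(), StandardCharsets.UTF_8));
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }, tag -> { });

            Thread.sleep(1000);
        }
    }
}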

RocketMQ

RocketMQ is written in Java. Docs: https://rocketmq.apache.org/docs/

rocketmq-architecture.png
In the new 5.0 release the Broker module separates storage from compute, moving the compute part into a stateless Proxy module.

  • Local mode
    In Local mode the Proxy module is still deployed in the same process as the Broker, almost identical to a 4.x deployment.
  • Cluster mode
    Proxy and Broker are deployed separately; all access that went to the Broker before 5.0 is routed to the Proxy, and the Broker acts purely as an internal storage service.

RocketMQ has no Exchange or Bindings, but it adds a subscription relationship (SubscriptionRelationShip) to support multiple consumer instances.
rocketmq-concept.png

Supported message delivery patterns (a client-side sketch follows the list):

  1. Point-to-point
    rocketmq-point2point.png
  2. Pub-Sub
    rocketmq-pub-sub.png
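
A minimal client-side sketch of these two patterns with the RocketMQ Java client (imports follow the 4.x client; the name server address, topic, and group names are placeholders). Switching the consumer's MessageModel between CLUSTERING and BROADCASTING selects point-to-point load balancing or Pub-Sub broadcast.

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class RocketMqPatternsDemo {
    public static void main(String[] args) throws Exception {
        // Producer: publish one message to a topic.
        DefaultMQProducer producer = new DefaultMQProducer("demo-producer-group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        producer.send(new Message("DemoTopic", "TagA", "hello".getBytes()));
        producer.shutdown();

        // Consumer: CLUSTERING spreads messages across the instances of a group
        // (point-to-point); BROADCASTING delivers every message to every instance (Pub-Sub).
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo-consumer-group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe("DemoTopic", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, ctx) -> {
            msgs.forEach(m -> System.out.println(new String(m.getBody())));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}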

RocketMQ Core Design

rocketmq-concept.png
RocketMQ's primary scenario is handling small messages at high throughput; low latency is the core optimization goal.

The core storage code lives in org.apache.rocketmq.store:

  • CommitLog
    The message content file. Messages of all topics handled by a Broker are written into a single CommitLog, appended sequentially as a series of fixed-size 1 GB files.
  • ConsumeQueue
    Each subscribed queue (per topic and queue id) has a ConsumeQueue, which stores only the offsets of messages in the CommitLog. When consuming, the consumer first reads the offset from the ConsumeQueue and then fetches the message body from the CommitLog at that offset. ConsumeQueue entries are fixed-length, so locating a message is O(1) (see the sketch after this list).
┌─────────────────────────────┬──────────────┬────────────────┐
│ CommitLog Physical Offset   │  Body Size   │  Tag HashCode  │
│         (8 Bytes)           │  (4 Bytes)   │    (8 Bytes)   │
├─────────────────────────────┴──────────────┴────────────────┤
│                    Store Unit (20 Bytes)                     │
└──────────────────────────────────────────────────────────────┘
  • IndexFile
    The hash index file. It records the mapping from message key/id to CommitLog offset, so that message bodies can be looked up by message id.
┌────────────────┬───────────────────┬─────────────┬──────────────────┐
│  Key HashCode  │  Physical Offset  │  Time Diff  │  Next Index Pos  │
│   (4 Bytes)    │     (8 Bytes)     │  (4 Bytes)  │    (4 Bytes)     │
├────────────────┴───────────────────┴─────────────┴──────────────────┤
│                     Index Store Unit (20 Bytes)                      │
└──────────────────────────────────────────────────────────────────────┘
  • RocksDBConsumeQueueOffsetTable
    Persistently stores the position up to which each consumer has consumed.
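
To make the O(1) lookup concrete, here is a sketch (not RocketMQ source code) of how a logical queue offset maps to a ConsumeQueue store unit and then to the CommitLog. It relies only on the fixed 20-byte store unit layout shown above.

import java.nio.ByteBuffer;

public class ConsumeQueueLookupSketch {
    // A ConsumeQueue store unit is fixed-length:
    // 8-byte CommitLog physical offset + 4-byte body size + 8-byte tag hash code = 20 bytes.
    static final int CQ_STORE_UNIT_SIZE = 8 + 4 + 8;

    // Given the consumer's logical offset in the queue:
    // 1. entry position in the ConsumeQueue = logicalOffset * 20 (O(1), fixed-length entries)
    // 2. the entry yields the CommitLog physical offset and the message size
    // 3. read 'size' bytes from the CommitLog at that physical offset
    static ByteBuffer readMessage(ByteBuffer consumeQueue, ByteBuffer commitLog, long logicalOffset) {
        int unitPos = (int) (logicalOffset * CQ_STORE_UNIT_SIZE);
        long commitLogOffset = consumeQueue.getLong(unitPos);
        int size = consumeQueue.getInt(unitPos + 8);
        // the tag hash code at unitPos + 12 is used for broker-side tag filtering

        ByteBuffer msg = commitLog.duplicate();
        msg.position((int) commitLogOffset);
        msg.limit((int) commitLogOffset + size);
        return msg.slice();
    }
}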

RocketMQ Source Code

The Broker's startup entry point is distribution\bin\runbroker.sh, which sets some JVM parameters and then launches the core class org.apache.rocketmq.broker.BrokerController.
The startup sequence is as follows:

  • Initialization
      public boolean initialize() throws CloneNotSupportedException {
    
          boolean result = this.initializeMetadata(); // initialize metadata
          if (!result) {
              return false;
          }
    
          result = this.initializeMessageStore(); // initialize the message store
          if (!result) {
              return false;
          }
    
          return this.recoverAndInitService(); // recover state and initialize services
      }
    
  • Start the various internal components and command processors (NettyRequestProcessor)
    1. MessageStore //core queue storage
    2. TimerMessageStore //second-level timer (delayed) messages
    3. ReplicasManager //replication
    4. RemotingServer //the main Netty server, NettyRemotingServer
    5. BrokerAttachedPlugin //plugins
    6. PopMessageProcessor //the POP consumption mode introduced in 5.0; PUSH mode can be upgraded to it
    7. AckMessageProcessor //ACK command
    8. NotificationProcessor //notifications in POP mode
    9. TopicQueueMappingCleanService //topic-to-queue mapping
    10. FileWatchService //watches disk file changes
    11. PullRequestHoldService //pseudo-PUSH (long-polling) mode
    12. ClientHousekeepingService //manages client connection state (exceptions, close, idle timeout, etc.)
    13. BrokerStatsManager //statistics
    14. BrokerFastFailure //fails fast with SYSTEM_BUSY when the system is overloaded
    15. BroadcastOffsetManager //offset storage for broadcast-mode subscriptions
    16. EscapeBridge
    17. TopicRouteInfoManager //topic route and subscription info
    18. BrokerPreOnlineService
    19. ColdDataPullRequestHoldService //cold data tiering
    20. ColdDataCgCtrService //cold data tiering
    21. QueryAssignmentProcessor //queue assignment (rebalance)
  • Register the topic list with the name server
  • Start the heartbeat thread to the name server
  • Start the heartbeat thread between master and slave nodes

NettyRemotingServer is a Netty server; its startup code is in remoting\src\main\java\org\apache\rocketmq\remoting\netty\NettyRemotingServer.java

public void start() {
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactoryImpl("NettyServerCodecThread_"));

    prepareSharableHandlers();

    serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) // EpollServerSocketChannel uses edge-triggered epoll
        .option(ChannelOption.SO_BACKLOG, 1024)
        .option(ChannelOption.SO_REUSEADDR, true)
        .childOption(ChannelOption.SO_KEEPALIVE, false)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .localAddress(new InetSocketAddress(this.nettyServerConfig.getBindAddress(),
            this.nettyServerConfig.getListenPort()))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                configChannel(ch);
            }
        });

    addCustomConfig(serverBootstrap); // apply additional socket options

    try {
        ChannelFuture sync = serverBootstrap.bind().sync(); // bind and start the server
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        if (0 == nettyServerConfig.getListenPort()) {
            this.nettyServerConfig.setListenPort(addr.getPort());
        }
        log.info("RemotingServer started, listening {}:{}", this.nettyServerConfig.getBindAddress(),
            this.nettyServerConfig.getListenPort());
        this.remotingServerTable.put(this.nettyServerConfig.getListenPort(), this);
    } catch (Exception e) {
        throw new IllegalStateException(String.format("Failed to bind to %s:%d", nettyServerConfig.getBindAddress(),
            nettyServerConfig.getListenPort()), e);
    }

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }

    TimerTask timerScanResponseTable = new TimerTask() {
        @Override
        public void run(Timeout timeout) {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            } finally {
                timer.newTimeout(this, 1000, TimeUnit.MILLISECONDS);
            }
        }
    };
    this.timer.newTimeout(timerScanResponseTable, 1000 * 3, TimeUnit.MILLISECONDS); // periodically scan for timed-out responses

    scheduledExecutorService.scheduleWithFixedDelay(() -> {
        try {
            NettyRemotingServer.this.printRemotingCodeDistribution();
        } catch (Throwable e) {
            TRAFFIC_LOGGER.error("NettyRemotingServer print remoting code distribution exception", e);
        }
    }, 1, 1, TimeUnit.SECONDS);
}
protected ChannelPipeline configChannel(SocketChannel ch) {
    return ch.pipeline()
        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler())
        .addLast(defaultEventExecutorGroup,
            encoder,
            new NettyDecoder(),
            distributionHandler,
            new IdleStateHandler(0, 0,
                nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
            connectionManageHandler,
            serverHandler
        );
}

The message handling flow is as follows:

  • Publish a message to the CommitLog
    The 100+ commands supported by RocketMQ are listed in
    remoting\src\main\java\org\apache\rocketmq\remoting\protocol\RequestCode.java
    The processor for message publishing is SendMessageProcessor; by default (a plugin interface exists) it calls DefaultMessageStore::putMessage to store the message. Internally, DefaultMessageStore is backed by the CommitLog. Once the message is written to the CommitLog, the broker replies to the producer immediately.

    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
      // Set the storage time
      if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
          msg.setStoreTimestamp(System.currentTimeMillis());
      }
      // Set the message body CRC (consider the most appropriate setting on the client)
      msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
      if (enabledAppendPropCRC) {
          // delete crc32 properties if exist
          msg.deleteProperty(MessageConst.PROPERTY_CRC32);
      }
      // Back to Results
      AppendMessageResult result = null;
    
      StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    
      String topic = msg.getTopic();
      msg.setVersion(MessageVersion.MESSAGE_VERSION_V1);
      boolean autoMessageVersionOnTopicLen =
          this.defaultMessageStore.getMessageStoreConfig().isAutoMessageVersionOnTopicLen();
      if (autoMessageVersionOnTopicLen && topic.length() > Byte.MAX_VALUE) {
          msg.setVersion(MessageVersion.MESSAGE_VERSION_V2);
      }
    
      InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
      if (bornSocketAddress.getAddress() instanceof Inet6Address) {
          msg.setBornHostV6Flag();
      }
    
      InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
      if (storeSocketAddress.getAddress() instanceof Inet6Address) {
          msg.setStoreHostAddressV6Flag();
      }
    
      PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
      updateMaxMessageSize(putMessageThreadLocal);
      String topicQueueKey = generateKey(putMessageThreadLocal.getKeyBuilder(), msg);
      long elapsedTimeInLock = 0;
      MappedFile unlockMappedFile = null;
      MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    
      long currOffset;
      if (mappedFile == null) {
          currOffset = 0;
      } else {
          currOffset = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
      }
    
      int needAckNums = this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas();
      boolean needHandleHA = needHandleHA(msg);
    
      if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
          if (this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset) < this.defaultMessageStore.getMessageStoreConfig().getMinInSyncReplicas()) {
              return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
          }
          if (this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
              // -1 means all ack in SyncStateSet
              needAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET;
          }
      } else if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableSlaveActingMaster()) {
          int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
              this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset));
          needAckNums = calcNeedAckNums(inSyncReplicas);
          if (needAckNums > inSyncReplicas) {
              // Tell the producer, don't have enough slaves to handle the send request
              return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
          }
      }
    
      topicQueueLock.lock(topicQueueKey);
      try {
    
          boolean needAssignOffset = true;
          if (defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()
              && defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
              needAssignOffset = false;
          }
          if (needAssignOffset) {
              defaultMessageStore.assignOffset(msg);
          }
    
          PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
          if (encodeResult != null) {
              return CompletableFuture.completedFuture(encodeResult);
          }
          msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
          PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey);
    
          putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
          try {
              long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
              this.beginTimeInLock = beginLockTimestamp;
    
              // Here settings are stored timestamp, in order to ensure an orderly
              // global
              if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                  msg.setStoreTimestamp(beginLockTimestamp);
              }
    
              if (null == mappedFile || mappedFile.isFull()) {
                  mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
                  if (isCloseReadAhead()) {
                      setFileReadMode(mappedFile, LibC.MADV_RANDOM);
                  }
              }
              if (null == mappedFile) {
                  log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                  beginTimeInLock = 0;
                  return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));
              }
    
              result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
              switch (result.getStatus()) {
                  case PUT_OK:
                      onCommitLogAppend(msg, result, mappedFile);
                      break;
                  case END_OF_FILE:
                      onCommitLogAppend(msg, result, mappedFile);
                      unlockMappedFile = mappedFile;
                      // Create a new file, re-write the message
                      mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                      if (null == mappedFile) {
                          // XXX: warn and notify me
                          log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                          beginTimeInLock = 0;
                          return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
                      }
                      if (isCloseReadAhead()) {
                          setFileReadMode(mappedFile, LibC.MADV_RANDOM);
                      }
                      result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                      if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
                          onCommitLogAppend(msg, result, mappedFile);
                      }
                      break;
                  case MESSAGE_SIZE_EXCEEDED:
                  case PROPERTIES_SIZE_EXCEEDED:
                      beginTimeInLock = 0;
                      return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                  case UNKNOWN_ERROR:
                  default:
                      beginTimeInLock = 0;
                      return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
              }
    
              elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
              beginTimeInLock = 0;
          } finally {
              putMessageLock.unlock();
          }
          // Increase queue offset when messages are successfully written
          if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
              this.defaultMessageStore.increaseOffset(msg, getMessageNum(msg));
          }
      } catch (RocksDBException e) {
          return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
      } finally {
          topicQueueLock.unlock(topicQueueKey);
      }
    
      if (elapsedTimeInLock > 500) {
          log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
      }
    
      if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
          this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
      }
    
      PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    
      // Statistics
      storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(result.getMsgNum());
      storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
    
      return handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA);
    }
    
  • Scan the CommitLog and dispatch messages to the ConsumeQueues
    DefaultMessageStore starts a background thread, ReputMessageService, which scans the CommitLog files and dispatches messages to the individual ConsumeQueues.

    public void doReput() {
      if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
          LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
              this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
          this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
      }
      for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
    
          SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
    
          if (result == null) {
              break;
          }
    
          try {
              this.reputFromOffset = result.getStartOffset();
    
              for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
                  DispatchRequest dispatchRequest =
                      DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false, false);
                  int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
    
                  if (reputFromOffset + size > DefaultMessageStore.this.getConfirmOffset()) {
                      doNext = false;
                      break;
                  }
    
                  if (dispatchRequest.isSuccess()) {
                      if (size > 0) {
                          DefaultMessageStore.this.doDispatch(dispatchRequest);
    
                          if (!notifyMessageArriveInBatch) {
                              notifyMessageArriveIfNecessary(dispatchRequest);
                          }
    
                          this.reputFromOffset += size;
                          readSize += size;
                          if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
                              DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                              DefaultMessageStore.this.storeStatsService
                                  .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(dispatchRequest.getBatchSize());
                              DefaultMessageStore.this.storeStatsService
                                  .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                  .add(dispatchRequest.getMsgSize());
                          }
                      } else if (size == 0) {
                          this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                          readSize = result.getSize();
                      }
                  } else {
                      if (size > 0) {
                          LOGGER.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                          this.reputFromOffset += size;
                      } else {
                          doNext = false;
                          // If user open the dledger pattern or the broker is master node,
                          // it will not ignore the exception and fix the reputFromOffset variable
                          if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                              DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                              LOGGER.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                  this.reputFromOffset);
                              this.reputFromOffset += result.getSize() - readSize;
                          }
                      }
                  }
              }
          } catch (RocksDBException e) {
              ERROR_LOG.info("dispatch message to cq exception. reputFromOffset: {}", this.reputFromOffset, e);
              return;
          } finally {
              result.release();
          }
    
          finishCommitLogDispatch();
      }
    }
    
  • The consumer pulls messages from the ConsumeQueue
    1. The client calls DefaultLitePullConsumer.poll
    2. A PULL_MESSAGE command is sent to the Broker
    3. The Broker handles it in
      public RemotingCommand PullMessageProcessor::processRequest(final Channel channel, RemotingCommand request)

      1. Look up the subscription SubscriptionGroupConfig via request.consumerGroup, initializing it if absent
      2. Look up the TopicConfig and the queue mapping TopicQueueMappingContext via request.topic, initializing them if absent
      3. Fetch the message from the ConsumeQueue via request.queueOffset
        public CompletableFuture<PutMessageResult> EscapeBridge::asyncPutMessage(MessageExtBrokerInner messageExt)
      4. Write the response back to the remote peer
        public static void NettyRemotingAbstract::writeResponse(Channel channel, RemotingCommand request, @Nullable RemotingCommand response)
      5. Record the subscription offset
        public void ConsumerOffsetManager::commitOffset(final String clientHost, final String group, final String topic, final int queueId,
        final long offset)
  • Push messages from the ConsumeQueue to the consumer
    1. The client calls DefaultMQPushConsumer.registerMessageListener
    2. A PULL_MESSAGE command is sent to the Broker (this is not a real push; the push is simulated by long polling)
    3. The Broker handles it as follows
      1. While handling PullMessageProcessor::processRequest, if the queue is empty (ResponseCode.PULL_NOT_FOUND), the request is parked via PullRequestHoldService::suspendPullRequest.
      2. A background thread periodically checks the held requests
        protected void PullRequestHoldService::checkHoldRequest()
      3. A synthesized PULL_MESSAGE command re-enters PullMessageProcessor::processRequest
  • RPC pattern
    The RPC pattern is a higher-level abstraction: two Pub-Sub flows simulate a single RPC call via public Message DefaultMQProducer::request(final Message msg, final long timeout) (see the client-side sketch after this list).

    1. The producer sends the request message carrying the requester identifiers CORRELATION_ID, REPLY_TO_CLIENT, and TTL
    2. After consuming, the consumer publishes the result to a system topic: ${CLUSTER_NAME}_REPLY_TOPIC
    3. When handling the system topic, the Broker returns the message to the producer identified by REPLY_TO_CLIENT
    4. The producer matches the reply to its request via CORRELATION_ID; the request call then completes and returns the result to the caller.
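
A client-side sketch of this request-reply flow (available since RocketMQ 4.7). Topic, group names, and the name server address are placeholders, and the reply is built with MessageUtil.createReplyMessage as in the upstream request-reply example.

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class RocketMqRpcSketch {
    public static void main(String[] args) throws Exception {
        // "Server" side: consume the request and publish a reply; the reply copies
        // CORRELATION_ID / REPLY_TO_CLIENT from the request so the Broker can route it back.
        DefaultMQProducer replyProducer = new DefaultMQProducer("rpc-reply-producer-group");
        replyProducer.setNamesrvAddr("127.0.0.1:9876");
        replyProducer.start();

        DefaultMQPushConsumer server = new DefaultMQPushConsumer("rpc-server-group");
        server.setNamesrvAddr("127.0.0.1:9876");
        server.subscribe("RpcRequestTopic", "*");
        server.registerMessageListener((MessageListenerConcurrently) (msgs, ctx) -> {
            try {
                for (MessageExt req : msgs) {
                    Message reply = MessageUtil.createReplyMessage(req, "pong".getBytes());
                    replyProducer.send(reply, 3000);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        server.start();

        // "Client" side: request() blocks until the reply with the matching CORRELATION_ID arrives.
        DefaultMQProducer client = new DefaultMQProducer("rpc-client-group");
        client.setNamesrvAddr("127.0.0.1:9876");
        client.start();
        Message response = client.request(new Message("RpcRequestTopic", "ping".getBytes()), 3000);
        System.out.println("reply: " + new String(response.getBody()));
    }
}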

Kafka

Kafka is written in Scala and Java. Docs: https://kafka.apache.org/documentation/

kafka-architecture.png

Kafka is centered on the queue: each queue (partition) corresponds to an append-only log file on disk. Producers and consumers share that file, so the offset of a consumer within the log is a central concept.

Kafka raises throughput through partitioning, but once a topic is partitioned several guarantees are lost, such as total message ordering (ordering is only preserved within a single partition):

Topics are partitioned, meaning a topic is spread over a number of "buckets" located on different Kafka brokers. This distributed placement of your data is very important for scalability because it allows client applications to both read and write the data from/to many brokers at the same time. When a new event is published to a topic, it is actually appended to one of the topic's partitions. Events with the same event key (e.g., a customer or vehicle ID) are written to the same partition, and Kafka guarantees that any consumer of a given topic-partition will always read that partition's events in exactly the same order as they were written.
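
A sketch with the Kafka Java client illustrating the behavior quoted above: records with the same key land in the same partition, and the consumer tracks its position as a (partition, offset) pair. Topic name, group id, and broker address are placeholders.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class KafkaPartitionDemo {
    public static void main(String[] args) {
        Properties prodProps = new Properties();
        prodProps.put("bootstrap.servers", "localhost:9092");
        prodProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prodProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Records with the same key (e.g. a customer id) hash to the same partition,
        // so ordering is preserved per partition, not across the whole topic.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(prodProps)) {
            producer.send(new ProducerRecord<>("orders", "customer-42", "created"));
            producer.send(new ProducerRecord<>("orders", "customer-42", "paid"));
        }

        Properties consProps = new Properties();
        consProps.put("bootstrap.servers", "localhost:9092");
        consProps.put("group.id", "billing");
        consProps.put("auto.offset.reset", "earliest");
        consProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consProps)) {
            consumer.subscribe(List.of("orders"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> r : records) {
                // The consumer's position in the log is the (partition, offset) pair.
                System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                        r.partition(), r.offset(), r.key(), r.value());
            }
            consumer.commitSync(); // persist the consumed offsets for this consumer group
        }
    }
}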

Kafka grew out of a log-processing system; message-handling latency is not its primary design goal, which makes it a good fit for big-data scenarios.


