Pulsar (1): Receiving Messages#
consumer FLOW#
In Pulsar, message consumption is driven by the SDK: the consumer sends a command.FLOW to the broker to ask for messages.
The SDK maintains an incomingMessages queue that buffers messages received from the broker, and tracks a permit counter initialized to the maximum length of incomingMessages. Each FLOW carries a concrete number, messagePermits, telling the broker how many more messages the consumer can accept, and the counter is decremented accordingly (permit -= messagePermits). In effect this is a cross-process semaphore.
The protobuf definition (see the Pulsar source on GitHub):
message CommandFlow {
    required uint64 consumer_id = 1;

    // Max number of messages to prefetch, in addition
    // of any number previously specified
    required uint32 messagePermits = 2;
}
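The permit accounting described above can be sketched as a small simulation. This is not the real SDK code: the names (receiverQueueSize, availablePermits) and the "flush permits at half the queue size" threshold mirror the Java client's general behavior, but the class itself is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// A toy model of the cross-process semaphore: the consumer grants the
// broker permits via FLOW, then returns them as the application drains
// the local buffer.
public class PermitSketch {
    final int receiverQueueSize;
    final BlockingQueue<String> incomingMessages;
    int availablePermits = 0;   // permits accumulated since the last FLOW
    int lastFlowPermits = -1;   // what the most recent FLOW asked for

    public PermitSketch(int receiverQueueSize) {
        this.receiverQueueSize = receiverQueueSize;
        this.incomingMessages = new ArrayBlockingQueue<>(receiverQueueSize);
        // Initial FLOW: ask for a full queue's worth of messages.
        sendFlow(receiverQueueSize);
    }

    void sendFlow(int messagePermits) {
        // In the real client this writes a CommandFlow to the broker.
        lastFlowPermits = messagePermits;
    }

    // Broker pushed a message: buffer it.
    public void messageReceived(String msg) {
        incomingMessages.offer(msg);
    }

    // Application-facing receive(): draining a message frees a slot; once
    // enough slots accumulate, the permits are flushed to the broker.
    public String receive() {
        String msg = incomingMessages.poll();
        if (msg != null && ++availablePermits >= receiverQueueSize / 2) {
            sendFlow(availablePermits);
            availablePermits = 0;
        }
        return msg;
    }
}
```

With receiverQueueSize = 4 the sketch flushes a FLOW for 2 permits after every second receive(), so the broker never has more than 4 messages outstanding toward this consumer.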
(Diagram omitted: the ubiquitous illustration of the consumer sending FLOW permits and the broker pushing messages back.)

broker send#
After the broker processes the FLOW command, the dispatcher asks the cursor (ultimately the storage layer) for a number of entries, mainly through the readMoreEntries() method. From the user's, i.e. the consumer's, point of view, receive() yields individual messages; an entry, by contrast, is a storage-level concept.
Concretely, with batching enabled one entry may actually hold a whole batch of messages, while ACKs are per message. When a batched entry needs to be redelivered, the entire entry is re-pushed: the dispatcher does not spend CPU unpacking, filtering, and repacking entries; it trades network IO and client CPU instead, leaving unpacking and filtering to the SDK.
Once the dispatcher has the entries, it runs some tedious permit arithmetic to settle on an appropriate number of messages, then ships them to the consumer via command.Message. Taking the Shared subscription mode as an example:
protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
    // Core logic:
    while (entriesToDispatch > 0 && isAtleastOneConsumerAvailable()) {
        Consumer c = getNextConsumer();
        if (c == null) {
            // No available consumer: release the remaining entries, rewind the cursor, and return false
            entries.subList(start, entries.size()).forEach(Entry::release);
            cursor.rewind();
            lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
            return false;
        }
        int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1;
        int maxEntriesInThisBatch = getMaxEntriesInThisBatch(
                remainingMessages,
                c.getMaxUnackedMessages(),
                c.getUnackedMessages(),
                avgBatchSizePerMsg,
                availablePermits,
                serviceConfig.getDispatcherMaxRoundRobinBatchSize()
        );
        int end = Math.min(start + maxEntriesInThisBatch, entries.size());
        List<Entry> entriesForThisConsumer = entries.subList(start, end);
        if (readType == ReadType.Replay) {
            entriesForThisConsumer.forEach(entry ->
                    redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId()));
        }
        SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
        EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
        EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
        totalEntries += filterEntriesForConsumer(
                metadataArray, start, entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks,
                cursor, readType == ReadType.Replay, c);
        totalEntriesProcessed += entriesForThisConsumer.size();
        c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks,
                sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(),
                sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
        int msgSent = sendMessageInfo.getTotalMessages();
        remainingMessages -= msgSent;
        start += maxEntriesInThisBatch;
        entriesToDispatch -= maxEntriesInThisBatch;
        // Adjust global available permits: messages sent minus the batch indexes already acked
        TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
                -(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
        totalMessagesSent += sendMessageInfo.getTotalMessages();
        totalBytesSent += sendMessageInfo.getTotalBytes();
    }
    lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
    acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
    if (entriesToDispatch > 0) {
        // Stash the unsent entries in the replay queue for a later retry
        entries.subList(start, entries.size()).forEach(this::addEntryToReplay);
    }
    return true;
}
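The permit update near the end of the loop deserves a worked example. With hypothetical numbers (this is illustrative arithmetic, not Pulsar code): a replayed entry holds a 5-message batch of which the consumer has already acked 2 indexes; those indexes will be filtered out client-side, so only the remaining messages should cost permits.

```java
// Illustrates the sign and magnitude of
// TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -(msgSent - ackedIndexes)).
public class PermitMath {
    public static int permitsConsumed(int msgSent, int ackedIndexes) {
        // msgSent        ~ sendMessageInfo.getTotalMessages()
        // ackedIndexes   ~ batchIndexesAcks.getTotalAckedIndexCount()
        return msgSent - ackedIndexes;
    }

    public static void main(String[] args) {
        // A replayed 5-message batch with 2 indexes already acked:
        System.out.println(permitsConsumed(5, 2)); // prints 3
    }
}
```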
We focus on this call:
c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks,
        sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(),
        sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
Messages are handed over per consumer as a List. Note the batchIndexesAcks parameter; its elements correspond to this field in the protobuf:
message CommandMessage {
    required uint64 consumer_id = 1;
    required MessageIdData message_id = 2;
    optional uint32 redelivery_count = 3 [default = 0];
    repeated int64 ack_set = 4; // here
    optional uint64 consumer_epoch = 5;
}
After each entry is binary-encoded, the code eventually reaches the message-construction logic below, where metadataAndPayload is the entry's binary form:
public ByteBufPair newMessageAndIntercept(long consumerId, long ledgerId, long entryId, int partition,
        int redeliveryCount, ByteBuf metadataAndPayload, long[] ackSet, String topic, long epoch) {
    // Here command is a CommandMessage
    BaseCommand command = Commands.newMessageCommand(consumerId, ledgerId, entryId, partition, redeliveryCount,
            ackSet, epoch);
    ByteBufPair res = Commands.serializeCommandMessageWithSize(command, metadataAndPayload);
    // Interceptor logic omitted
    return res;
}
So a single CommandMessage actually carries one entry.
consumer recv#
The consumer's receive() returns a single message, but commands are sent per entry, so the SDK's messageReceived() must unpack each entry into individual messages and push them into incomingMessages.
During this split, ackSet is used to filter out messages. Naturally, for a brand-new entry ackSet == null, and the entire batch is read out.
void receiveIndividualMessagesFromBatch(...) {
    for (int i = 0; i < batchSize; ++i) {
        // Read each item in the batch out as an individual message
        final MessageImpl<T> message = newSingleMessage(i, batchSize, brokerEntryMetadata, msgMetadata,
                singleMessageMetadata, uncompressedPayload, batchMessage, schema, true,
                ackBitSet, ackSetInMessageId, redeliveryCount, consumerEpoch, isEncrypted);
        //...
        executeNotifyCallback(message); // enqueue
    }
}
MessageImpl<V> newSingleMessage(...) {
    // Skip messages that have already been acked
    if (isSingleMessageAcked(ackBitSet, index)) {
        return null;
    }
    // ...
}
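The ack_set filtering can be sketched end to end. Pulsar carries ack_set as a repeated int64 on the wire; a natural way to model it (an assumption for this sketch, not the SDK's exact code path) is to decode the long[] with java.util.BitSet, where a set bit means "not yet acked" and a cleared bit means "already acked", matching the isSingleMessageAcked check above.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Sketch of batch unpacking with ack_set filtering. The sample values are
// hypothetical; only the set-bit-means-unacked convention comes from the
// isSingleMessageAcked logic quoted above.
public class AckSetSketch {
    // Mirrors the check in newSingleMessage: a null ackSet (fresh entry)
    // means nothing is acked yet.
    static boolean isSingleMessageAcked(BitSet ackBitSet, int index) {
        return ackBitSet != null && !ackBitSet.get(index);
    }

    // Returns the batch indexes that survive filtering, i.e. the ones that
    // would be turned into individual messages and enqueued.
    static List<Integer> survivingIndexes(long[] ackSet, int batchSize) {
        BitSet ackBitSet = (ackSet == null) ? null : BitSet.valueOf(ackSet);
        List<Integer> survivors = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            if (!isSingleMessageAcked(ackBitSet, i)) {
                survivors.add(i);
            }
        }
        return survivors;
    }
}
```

For a fresh entry (ackSet == null), all five indexes of a 5-message batch survive; with ackSet = {0b10110}, indexes 0 and 3 are dropped as already acked and only 1, 2, and 4 are enqueued.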
There is also a ZeroQueueConsumer that runs without a receive buffer; it is not covered here.
When the application finally calls receive(), it simply takes a single message off the queue with incomingMessages.poll().
