Pulsar(2) —— ACK#
正如收消息可以由sdk批,ack也可以由sdk批。
consumer ACK#
consumer调用ack之后,会在sdk暂存起来, 以batch消息为例,计算ackSet (将对应的为置0) 并更新到pending队列中。
CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv msgId) {
ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
MessageIdAdvUtils.discardBatch(msgId), __ -> {
final BitSet ackSet = msgId.getAckSet();
final ConcurrentBitSetRecyclable value;
if (ackSet != null) {
synchronized (ackSet) {
if (!ackSet.isEmpty()) {
value = ConcurrentBitSetRecyclable.create(ackSet);
} else {
value = ConcurrentBitSetRecyclable.create();
value.set(0, msgId.getBatchSize());
}
}
} else {
value = ConcurrentBitSetRecyclable.create();
value.set(0, msgId.getBatchSize());
}
return value;
});
bitSet.clear(msgId.getBatchIndex());
return CompletableFuture.completedFuture(null);
}
sdk需要根据ackSet滤掉已经被ack的message。因此对于batch消息,在ack时,设置bit位可以让broker知道这一个entry中的哪些batch index是被ack过的。
sdk进行消息的生产和消费(主要是ack)时会有以下表现:
- 生产者
- 不开启batch,每条消息生产之后都没有
batch size,entry id都是独立的 - 开启batch消息并设置
batch size,则取决于生产速率,如果达到了timeout则该批消息会提前发出,size小于设定的batch size
- 不开启batch,每条消息生产之后都没有
- 消费者
- 非batch消息,receive得到都是entry独立的单条消息,ack没有特殊处理
- batch消息,receive得到的是一个entry里按
batch index解码出的单条消息- ack之后如上面提到的,同一个entry重发之后会在sdk过滤掉已ack的,避免重复消费
- command ack不会携带
batch size!!! ack set基于org.apache.pulsar.common.util.collections.ConcurrentBitSet,底层是java.util.BitSet,会自动truncate,即抹掉高位0以节省空间。- pb的
ack set结构是Vec<i64>,或者说long[] - 例如
batch size = 65, 已ack的index为64(最后一个,刚好是第二个long的最低位),原本的表示应该是\[0b1..1, 0b0..000 0\] (超过batch size的位置当成已ack,所以为0),这时候高位0自动抹除只剩下了\[0b1..1\]。 - 当整个batch都ack之后,因为全0,导致
ack set为空, 只看ack set和batch size的话跟非batch消息没什么区别 !!! - 因此无法通过
ack set来反推batch size
- pb的
通常情况下调用这个函数来构造ack命令,入参只带了ack set
public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, AckType ackType,
ValidationError validationError, Map<String, Long> properties, long requestId) {
return newAck(consumerId, ledgerId, entryId, ackSet, ackType, validationError,
properties, -1L, -1L, requestId, -1); // 最后一个参数是batch size = -1, 即不设置
}
这种完全依赖broker记录batch size的行为,可以确保ack的时候不会因为sdk导致batch size混乱。但是在broker恢复后entry缓存还未重建,为了尚未完成的batch ack而要把entry读出才能得到batch size,牺牲了性能和灵活性。
broker cursor delete#
broker收到ack后,会通过cursor模块移动相应的指针和更新记录。每个subscription下可能有多个consumer,但是只会有一个cursor,即topic/partition/subscription为单位进行记录。
和ack相关的字段大致如下:
public class ManagedCursorImpl implements ManagedCursor {
Position markDeletePosition; // 记录最大连续位置(下一个位置是空洞位置)
RangeSetWrapper<Position> individualDeletedMessages; // 单条ACK的消息集合,离散的位置
ConcurrentSkipListMap<Position, BitSet> batchDeletedIndexes; // batch消息的子消息ack情况
}
class Position {
int ledgerId;
int entryId;
long[] ackSet;
}
这三个字段从下往上是一个层级关系
batchDeletedIndexes记录了batch消息的ack情况,实际上存储的是ackSet,这里ackSet的位为0代表已经ackindividualDeletedMessages记录了离散的ack记录(区间表示),如果是batch消息,只有在batchDeletedIndexes中的ackSet全为0,才会从中转移到individualDeletedMessages上;如果是非batch消息,那么ack会直接记录。markDeletePosition记录了可以被安全删除的位置,即position最小的空洞的前一个位置,或者说从头开始连续的最大的位置,当空洞合并成了新的区间,且区间的左端点-1是markDeletePosition,那么这个区间的右端点 变会成为新的markDeletePosition
后台会定期的对这些重要的元信息做持久化,便于错误恢复或者节点转移时可以得到尽可能新的ack进度。
consumer REDELIVER#
sdk定时发送重推请求#
在sdk内部维护了一个UnAckedMessageRedeliveryTracker,用于统计需要自动定时重推的消息,由名字可以看出其追踪尚未被用户ack的消息。定期扫描所有尚未ack的消息,如果超时则收集起来,并统一批量发送Redeliver UnackedMessages给broker处理。
private void triggerRedelivery(ConsumerBase<?> consumerBase) {
if (ackTimeoutMessages.isEmpty()) {
return;
}
Set<MessageId> messageIds = TL_MESSAGE_IDS_SET.get();
messageIds.clear();
try {
long now = System.currentTimeMillis();
ackTimeoutMessages.forEach((messageId, timestamp) -> {
if (timestamp <= now) {
addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase);
messageIds.add(messageId);
}
});
if (!messageIds.isEmpty()) {
log.info("[{}] {} messages will be re-delivered", consumerBase, messageIds.size());
Iterator<MessageId> iterator = messageIds.iterator();
while (iterator.hasNext()) {
MessageId messageId = iterator.next();
ackTimeoutMessages.remove(messageId);
}
}
} finally {
if (messageIds.size() > 0) {
consumerBase.onAckTimeoutSend(messageIds);
consumerBase.redeliverUnacknowledgedMessages(messageIds);
}
}
}
broker接收到重推请求后,则会将这批message id推送到dispatcher中,并增加其重推计数,并写入pb对应的redelivery_count字段中。
public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
positions.forEach(redeliveryTracker::incrementAndGetRedeliveryCount);
redeliverUnacknowledgedMessages(consumer);
}
sdk收到之后进行检查,如果订阅了RLQ/DLQ规则且重试次数redelivery_count达到了上限,那么会稍后转投到RLQ/DLQ中。
// ConsumerImpl#messageReceived
if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null) {
if (redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
Collections.singletonList(message));
if (redeliveryCount > deadLetterPolicy.getMaxRedeliverCount()) {
// count超过!!之后,这里继续发重推请求,兜底策略,因为一般count就不会超过
redeliverUnacknowledgedMessages(Collections.singleton(message.getMessageId()));
// The message is skipped due to reaching the max redelivery count,
// so we need to increase the available permits
increaseAvailablePermits(cnx);
return;
}
}
}
nack#
nack由用户主动调用consumer.negativeAcknowledge(msg),同样有一个NegativeAcksTracker进行记录,并定时扫描需要重推的消息。也是通过RedeliverUnackedMessages这条命令。
需要注意的是,broker中的重推计数器不会持久化,因此broker挂掉或者一个subscription的所有consumer离线之后,重推计数会归零。
RLQ#
RetryLetterQueue即重试队列,与SDK维护的重试逻辑不同,本质上是一个全新的队列,broker无感。具体而言RLQ表现为:
- 配置dlq规则时使用
enableRetry(true)开启 - 使用
consumer.reconsumerLater(msg)标记消息自动转发,当redelivery_count超过阈值则先转投RLQ - 会订阅一个全新的topic,如果未指定则根据规则自动订阅
- 全新topic意味着与原来独立,broker对待RLQ和对待普通topic是一样的,不会特殊处理。
- 可视作sdk侧有一个新的producer和新的consumer,重投就是生产新消息
- 每次重投会
ack原消息,并生产一条新消息(payload一致)投递,如果设置了timeout则是延迟消息 - RLQ的消息通过
properties进行计数,因此broker可以做到无感{ REAL_TOPIC="persistent://my-property/my-ns/test, ORIGIN_MESSAGE_ID=314:28:-1, RETRY_TOPIC="persistent://my-property/my-ns/my-subscription-retry, RECONSUMETIMES=16 # 每次重投+1 } RECONSUMETIMES超过设定之后会转投DLQ(如果有的话)
DLQ#
DeadLetterQueue即死信队列,本质上与RLQ相似,都是订阅新topic。与RLQ的区别在于:
- 只会sdk内部自动订阅
producer用于转投,consumer需要用户手动订阅 - 没有
RECONSUMETIMES,因为不会重试

下面给出doReconsumeLater()的核心逻辑
// 如果重试次数超过阈值,转投DLQ
if (reconsumeTimes > this.deadLetterPolicy.getMaxRedeliverCount()
&& StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
initDeadLetterProducerIfNeeded().thenAcceptAsync(dlqProducer -> {
try {
TypedMessageBuilder<byte[]> typedMessageBuilderNew =
dlqProducer.newMessage(
Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
.value(retryMessage.getData())
.properties(propertiesMap);
copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
copyMessageEventTime(message, typedMessageBuilderNew);
// 投往DLQ
typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
consumerDlqMessagesCounter.increment();
// 成功后ack当前消息
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> {
result.complete(null);
}).exceptionally(ex -> {
result.completeExceptionally(ex);
return null;
});
}).exceptionally(ex -> {
result.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
result.completeExceptionally(e);
}
}, internalPinnedExecutor).exceptionally(ex -> {
result.completeExceptionally(ex);
return null;
});
} else {
// 重试次数还没耗尽,继续重投
assert retryMessage != null;
initRetryLetterProducerIfNeeded().thenAcceptAsync(rtlProducer -> {
try {
TypedMessageBuilder<byte[]> typedMessageBuilderNew = rtlProducer
.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
.value(retryMessage.getData())
.properties(propertiesMap);
if (delayTime > 0) {
typedMessageBuilderNew.deliverAfter(delayTime, unit);
}
copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
copyMessageEventTime(message, typedMessageBuilderNew);
// 重新发往RLQ
typedMessageBuilderNew.sendAsync()
.thenCompose(
__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) // ack旧消息
.thenAccept(v -> {
result.complete(null);
})
.exceptionally(ex -> {
result.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
result.completeExceptionally(e);
}
}, internalPinnedExecutor).exceptionally(ex -> {
result.completeExceptionally(ex);
return null;
});
}
