diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 8e2954d8ff0..957d8696fb9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1027,6 +1027,17 @@ private void reloadServerSslContext() { return result; } + /** + * Register pre-put hooks and the send-message-back hook into the message + * store. The hooks are executed in order before every {@code putMessage}: + *
+ * Core responsibilities: + *
+ * The cache structure is as follows: {
+ * groupId@topicId@queueId: {
+ * active: ConcurrentSkipListMap
Popped messages are stored here by + * {@link PopConsumerService#popAsync}. The background {@link #run()} thread + * periodically scans the cache and processes expired records: + *
Each {@code groupId@topicId@queueId} entry is backed by a
+ * {@link ConsumerRecords} instance containing two
+ * {@link ConcurrentSkipListMap}s — one for active records and one for
+ * records staged for removal.
+ */
public class PopConsumerCache extends ServiceThread {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
@@ -46,6 +81,14 @@ public class PopConsumerCache extends ServiceThread {
private final Consumer Used by {@link #writeRecords} to add popped messages,
+ * {@link #deleteRecords} to remove acked messages, and
+ * {@link #cleanupRecords} to process expired records.
+ */
private final ConcurrentMap Each record is inserted into the {@link ConsumerRecords} for its
+ * {@code groupId@topicId@queueId}. If no entry exists for that key, a
+ * new one is created. The cache size estimate is incremented.
+ *
+ * @param consumerRecordList the popped records to cache
+ */
public void writeRecords(List Uses two {@link ConcurrentSkipListMap}s to separate active and
+ * expiring records for safe two-phase cleanup:
+ * Populated by {@link #stageExpiredRecords} and drained by
+ * {@link PopConsumerCache#cleanupRecords}. Sorted by offset
+ * so that {@link #getMinOffset} can include these records in
+ * the minimum offset computation.
+ */
private final ConcurrentSkipListMap Records are added via {@link #write} when messages are popped,
+ * removed via {@link #delete} when an ack arrives, and moved to
+ * {@link #removeTreeMap} via {@link #stageExpiredRecords} when
+ * the visibility timeout or stay-buffer time expires.
+ *
+ * Sorted by offset for efficient minimum-offset queries
+ * ({@link #getMinOffset}).
+ */
private final ConcurrentSkipListMap Used by the KVStore-based ack path ({@code popConsumerKVServiceEnable=true}).
+ * When a message is popped, a record is written here. When the consumer acks
+ * the message or the visibility timeout expires, the record is deleted or
+ * revived. The default implementation is {@code PopConsumerRocksdbStore}.
+ *
+ * This interface supports three operations:
+ * The visibility timeout ({@code popTime + invisibleTime}) determines when
+ * a popped-but-unacked message becomes eligible for revival. Set by the
+ * consumer (default 60s via {@code DefaultMQPushConsumer#setPopInvisibleTime}).
+ * Can be changed by proxy with config.
+ * Can be extended via {@code ChangeInvisibleTime}.
+ */
@JSONField(ordinal = 5)
private long invisibleTime;
@@ -67,9 +76,31 @@ public int getCode() {
@JSONField(ordinal = 7)
private int attemptTimes;
+ /**
+ * Client-generated idempotency key for FIFO ordered consumption.
+ *
+ * Possible values:
+ * When {@code true}, the reconsume count is not incremented on
+ * revive, so the message will not be prematurely sent to the DLQ due to
+ * repeated visibility timeout extensions. Set via
+ * {@code ChangeInvisibleTimeRequestHeader#isSuspend}.
+ */
@JSONField(ordinal = 9)
private boolean suspend;
@@ -102,7 +133,19 @@ public long getVisibilityTimeout() {
}
/**
- * Key: timestamp(8) + groupId + topicId + queueId + offset
+ * Build the RocksDB key for this record.
+ *
+ * Format:
+ * The {@code visibilityTimeout} is placed first so that records are ordered
+ * by expiration time in RocksDB's SST files. This allows
+ * {@code PopConsumerRocksdbStore#scanExpiredRecords} to use a bounded iterator
+ * to scan only the relevant time window without a full table scan.
+ *
+ * NACK(changeInvisibleTime) will create a new record, and the old one will be deleted.
*/
@JSONField(serialize = false)
public byte[] getKeyBytes() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java
index dc68f9d9fe5..fcd5826853e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java
@@ -38,6 +38,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * RocksDB-backed implementation of {@link PopConsumerKVStore} for the
+ * KVStore-based Pop ack path.
+ *
+ * Stores Pop consumer records in a dedicated {@code "popState"} column
+ * family. Each record is keyed by {@code visibilityTimeout|groupId@topicId@queueId@offset}
+ * so that {@link #scanExpiredRecords} can efficiently scan only expired
+ * records within a time window without a full table scan.
+ *
+ * Write and delete operations use synchronous flush and WAL for
+ * durability — Pop visibility state is the sole source of truth in the
+ * KVStore path and must survive crashes.
+ */
public class PopConsumerRocksdbStore extends AbstractRocksDBStorage implements PopConsumerKVStore {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
@@ -55,31 +68,71 @@ public PopConsumerRocksdbStore(String filePath, long blockCacheSize, long writeB
this.writeBufferSize = writeBufferSize;
}
- // https://www.cnblogs.com/renjc/p/rocksdb-class-db.html
- // https://github.com/johnzeng/rocksdb-doc-cn/blob/master/doc/RocksDB-Tuning-Guide.md
+ /**
+ * Configure RocksDB options for Pop consumer record storage.
+ *
+ * Unlike the parent class defaults, write and delete options enable
+ * WAL and synchronous flush — Pop visibility state is the sole source
+ * of truth and must survive crashes. Compaction is configured to be
+ * aggressive so that expired-then-deleted records are purged promptly,
+ * reclaiming disk space.
+ *
+ * @see rocksdb-class-db
+ * @see RocksDB-Tuning-Guide
+ */
protected void initOptions() {
+ // durability-first: enable WAL and sync flush for pop state recovery
this.options = RocksDBOptionsFactory.createDBOptions();
this.writeOptions = new WriteOptions();
+ // fsync every write to disk
this.writeOptions.setSync(true);
+ // enable WAL
this.writeOptions.setDisableWAL(false);
+ // allow writing throttling under pressure
this.writeOptions.setNoSlowdown(false);
+ // delete must be durable too — otherwise ack can be lost and message revived incorrectly
this.deleteOptions = new WriteOptions();
this.deleteOptions.setSync(true);
this.deleteOptions.setDisableWAL(false);
this.deleteOptions.setNoSlowdown(false);
+ // aggressive compaction to purge expired pop records and reclaim space
this.compactRangeOptions = new CompactRangeOptions();
+ // force compact bottom level
this.compactRangeOptions.setBottommostLevelCompaction(
CompactRangeOptions.BottommostLevelCompaction.kForce);
+ // allow compaction to pause writes
this.compactRangeOptions.setAllowWriteStall(true);
+ // manual compaction runs in parallel with auto-compaction.
+ // Appropriate here because expired Pop records generate tombstones continuously,
+ // and cleanup should not starve RocksDB's normal background work
this.compactRangeOptions.setExclusiveManualCompaction(false);
+ // Allows compaction to move data across levels
this.compactRangeOptions.setChangeLevel(true);
+ // -1 delegates level selection to RocksDB's internal heuristics
this.compactRangeOptions.setTargetLevel(-1);
+ // Splits the compaction work into at most 4 parallel sub-tasks
this.compactRangeOptions.setMaxSubcompactions(4);
}
+ /**
+ * Initialise the RocksDB instance with a dedicated column family for Pop state.
+ *
+ * Two column families are created:
+ * Called by {@link AbstractRocksDBStorage#start()} before the storage
+ * is marked as loaded. Returns {@code false} if any step fails, preventing
+ * all subsequent read/write operations via {@link #hold()}.
+ *
+ * @return {@code true} if the database was opened successfully
+ */
@Override
protected boolean postLoad() {
try {
@@ -111,6 +164,16 @@ public String getFilePath() {
return this.dbPath;
}
+ /**
+ * Batch-write consumer records to RocksDB via a single {@link WriteBatch}.
+ * Key: (popTime + invisibleTime) + groupId + topicId + queueId + offset
+ * value: PopConsumerRecord.toJsonBytes
+ *
+ * Each record is serialized with its visibility-timeout-prefixed key
+ * so that {@link #scanExpiredRecords} can efficiently scan by time range.
+ *
+ * @param consumerRecordList the records to persist
+ */
@Override
public void writeRecords(List Deletion uses the same durability guarantees as writes ({@code sync=true},
+ * WAL enabled)
+ *
+ * @param consumerRecordList the records to remove
+ */
@Override
public void deleteRecords(List Because each record's key is prefixed with {@code visibilityTimeout},
+ * this method uses a RocksDB iterator bounded by {@code [lower, upper)} to
+ * efficiently scan only the relevant time window without a full table scan.
+ *
+ * @param lower inclusive lower bound of the visibility timeout (ms)
+ * @param upper exclusive upper bound of the visibility timeout (ms)
+ * @param maxCount maximum number of records to return
+ * @return up to {@code maxCount} expired records, or an empty list
+ */
@Override
- // https://github.com/facebook/rocksdb/issues/10300
public List If messages were found:
+ * The consumer offset is then committed:
+ * For FIFO consumers, the offset is read from the regular consumer offset.
+ * For non-FIFO consumers, a separate pull offset is used (compatibility with
+ * pull consumer switchover).
+ *
+ * If no offset is stored (first pop), it is initialized via
+ * {@code PopMessageProcessor#getInitOffset} based on {@code initMode}
+ * (beginning or end of the queue).
+ *
+ * If a reset offset exists (offset reset command issued), the cache is
+ * cleared, FIFO lock unlock, and the reset offset takes effect
+ * immediately.
+ *
+ * @param groupId consumer group id
+ * @param topicId topic name
+ * @param queueId queue id
+ * @param initMode consume init mode (min/max)
+ * @param fifo whether this is a FIFO ordered consumption
+ * @return the consume offset to start popping from
+ */
public long getPopOffset(String groupId, String topicId, int queueId, int initMode, boolean fifo) {
// For FIFO messages, the pull offset is not used.
@@ -213,6 +267,7 @@ public long getPopOffset(String groupId, String topicId, int queueId, int initMo
this.brokerController.getConsumerOffsetManager().queryOffset(groupId, topicId, queueId) :
this.brokerController.getConsumerOffsetManager().queryPullOffset(groupId, topicId, queueId);
+ // init offset
if (offset < 0L) {
try {
offset = this.brokerController.getPopMessageProcessor()
@@ -223,6 +278,8 @@ public long getPopOffset(String groupId, String topicId, int queueId, int initMo
throw new RuntimeException(e);
}
}
+
+ // get reset offset
Long resetOffset =
this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topicId, groupId, queueId);
if (resetOffset != null) {
@@ -231,9 +288,29 @@ public long getPopOffset(String groupId, String topicId, int queueId, int initMo
this.brokerController.getConsumerOffsetManager()
.commitOffset("ResetPopOffset", groupId, topicId, queueId, resetOffset);
}
+
return resetOffset != null ? resetOffset : offset;
}
+ /**
+ * Fetch messages from the store with automatic offset correction.
+ * No external callers, except unit tests.
+ *
+ * If the stored offset is behind the actual consume queue offset
+ * ({@code OFFSET_TOO_SMALL}, {@code OFFSET_OVERFLOW_BADLY},
+ * {@code OFFSET_FOUND_NULL}), the offset is corrected and a retry is
+ * issued with the corrected offset. This prevents duplicate messages
+ * when the Pop buffer offset has not yet been committed.
+ *
+ * @param clientHost the client address
+ * @param groupId consumer group id
+ * @param topicId topic name
+ * @param queueId queue id
+ * @param offset the consume offset to start from
+ * @param batchSize max number of messages
+ * @param filter message filter
+ * @return a future completing with the fetch result
+ */
public CompletableFuture Chained via {@link CompletableFuture#thenCompose} from
+ * {@link #getMessageFromTopicAsync}. When the batch is already full
+ * ({@code remain <= 0}), the pending count is added to the context and
+ * the chain stops. Otherwise, messages are fetched from the store and
+ * the result is merged into the context via {@link #handleGetMessageResult}.
+ *
+ * Early termination can occur inside this method when:
+ * Each queue is visited once. For each queue the
+ * {@link #getMessageAsync(CompletableFuture, String, String, String, int, int, MessageFilter, PopConsumerRecord.RetryType)}
+ * method is chained via {@link CompletableFuture#thenCompose}. The chain carries
+ * the accumulated result through all queues, stopping early when the batch is
+ * filled, the queue is blocked, or the inflight threshold is reached.
+ *
+ * Queue iteration order respects {@code priorityOrderAsc} and uses
+ * {@code requestCount} as a round-robin offset for load balancing.
+ *
+ * @param future the accumulator future
+ * @param clientHost the client address
+ * @param groupId consumer group id
+ * @param topicId topic name
+ * @param requestCount round-robin counter for queue selection
+ * @param batchSize max number of messages to return
+ * @param filter message filter expression
+ * @param retryType whether this is a retry topic V1/V2
+ * @return a future completing with the pop result context
+ */
protected CompletableFuture This method coordinates the full Pop lifecycle:
+ * The deletion is a two-step fallback:
+ * memo: Notify polling request when receive orderly ack
+ *
+ * @param popTime the original pop time of the message
+ * @param invisibleTime the original visibility timeout
+ * @param groupId consumer group id
+ * @param topicId topic name
+ * @param queueId queue id
+ * @param offset the acked offset
+ * @return a future that completes with {@code true} on success
+ */
public CompletableFuture refer: ChangeInvisibleTimeProcessor.appendCheckPointThenAckOrigin
+ * This is the KVStore equivalent of {@code ChangeInvisibleTimeProcessor#appendCheckPointThenAckOrigin}.
+ *
+ * A new record with the updated timeout is written to the KVStore, and the
+ * old record (identified by the original {@code popTime + invisibleTime}) is
+ * deleted from the cache and KVStore.
+ *
+ * If the new and old records have the same visibility timeout (e.g. the
+ * consumer extended by the same duration it already had), the delete one is
+ * skipped because the write one already overwrites the old record in RocksDB.
+ *
+ * @param popTime the original pop time
+ * @param invisibleTime the original visibility timeout
+ * @param changedPopTime the new pop time (typically current time)
+ * @param changedInvisibleTime the new visibility timeout
+ * @param groupId consumer group id
+ * @param topicId topic name
+ * @param queueId queue id
+ * @param offset the message offset
+ * @param suspend whether to suspend (nack without incrementing reconsume count)
+ */
public void changeInvisibilityDuration(long popTime, long invisibleTime, long changedPopTime,
long changedInvisibleTime, String groupId, String topicId,
int queueId, long offset, boolean suspend) {
@@ -511,6 +722,7 @@ public void changeInvisibilityDuration(long popTime, long invisibleTime, long ch
// No need to generate new records when the group does not exist,
// because these retry messages will not be consumed by anyone.
+ // default value of popReviveSkipIfGroupAbsent is true
boolean skipWrite = brokerConfig.isPopReviveSkipIfGroupAbsent() &&
!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(groupId);
@@ -528,19 +740,41 @@ public void changeInvisibilityDuration(long popTime, long invisibleTime, long ch
}
// If the new CK has the same key as the old CK (same visibilityTimeout),
- // the write already overwrites the old record in RocksDB, skip delete
+ // the write one already overwrites the old record in RocksDB, skip delete
// to avoid removing the newly written record.
if (skipWrite || ckRecord.getVisibilityTimeout() != ackRecord.getVisibilityTimeout()) {
this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord));
}
}
+ /**
+ * Read the original message from storage for revival.
+ * No external callers, except unit tests.
+ *
+ * Used by {@link #revive(PopConsumerRecord)} when a visibility timeout
+ * expires. Delegates to {@link org.apache.rocketmq.broker.EscapeBridge}
+ * which can read from either the local store or a remote broker's store.
+ *
+ * @param consumerRecord the expired record
+ * @return a triple of (message, info, needRetry)
+ */
// Use broker escape bridge to support remote read
public CompletableFuture Skips the record if the consumer group no longer exists.
+ * Otherwise, reads the original message,
+ * and re-publishes it via {@link #reviveRetry}.
+ *
+ * @param record the expired record to revive
+ * @return a future completing with {@code true} on success
+ */
public CompletableFuture This is the core revival loop called by {@link #run()}:
+ * Each iteration:
+ * Looks up the {@link OrderInfo} for the given topic-group-queue triple.
+ * Delegates to {@link OrderInfo#needBlock} which returns {@code true} if
+ * any message in the current batch is still within its invisible window
+ * and has not yet been ACKed — meaning another consumer is already
+ * working on this queue's ordered batch.
+ */
@Override
public boolean checkBlock(String attemptId, String topic, String group, int queueId, long invisibleTime) {
String key = buildKey(topic, group);
@@ -200,6 +216,7 @@ public void start() {
/**
* mark message is consumed finished. return the consumer offset
+ * called after message ack
*
* @param topic topic
* @param group group
@@ -400,6 +417,7 @@ public static class OrderInfo {
private long popTime;
/**
* the invisibleTime when pop message
+ * it was set the first time when message pop
*/
@JSONField(name = "i")
private Long invisibleTime;
@@ -411,8 +429,12 @@ public static class OrderInfo {
@JSONField(name = "o")
private List Returns {@code true} (must block) only if all of the following hold:
+ * When a consumer Pop s a batch of ordered messages from a queue, the
+ * queue is effectively "locked" until the consumer ACKs or the invisible
+ * time expires. This class uses a {@link HashedWheelTimer} to fire a
+ * notification at the predicted lock-free time, so that other consumers
+ * blocked in long-polling for the same queue can be woken up immediately
+ * rather than waiting for the polling timeout.
+ *
+ * Two notification paths are supported:
+ * Functionally gated by
+ * {@code BrokerConfig#isEnableNotifyAfterPopOrderLockRelease}.
+ */
public class QueueLevelConsumerOrderInfoLockManager {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 65f5f79aec4..3bf35343119 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -26,6 +26,8 @@
import org.apache.rocketmq.broker.lite.LiteMetadataUtil;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.pop.PopConsumerLockService;
+import org.apache.rocketmq.broker.pop.PopConsumerService;
+import org.apache.rocketmq.broker.pop.PopConsumerService;
import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.PopAckConstants;
@@ -54,6 +56,27 @@
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;
+/**
+ * Processes consumer ack messages in Pop consumption mode.
+ *
+ * Handles both single ({@link RequestCode#ACK_MESSAGE}) and batch
+ * ({@link RequestCode#BATCH_ACK_MESSAGE}) acks. Each ack is processed
+ * through one of two paths:
+ * Orderly ack is handled separately by {@link #ackOrderly} /
+ * {@link #ackOrderlyNew}, which update the consumer order info and advance
+ * the consumer offset while notifying any long-polling waiters.
+ *
+ * This class also owns and manages the {@link PopReviveService} instances
+ * for the file-based revive path.
+ */
public class AckMessageProcessor implements NettyRequestProcessor {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
@@ -116,13 +139,36 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx,
return this.processRequest(ctx.channel(), request, true);
}
+ /**
+ * Process an ack request (single or batch).
+ *
+ * Routes to one of two paths based on {@code popConsumerKVServiceEnable}:
+ * Orderly acks ({@code rqId == POP_ORDER_REVIVE_QUEUE}) are handled by
+ * {@link #ackOrderly} / {@link #ackOrderlyNew} instead.
+ *
+ * @param channel the Netty channel of the requesting client
+ * @param request the incoming request
+ * @param brokerAllowSuspend whether the broker may suspend the request
+ * @return the response to send back to the client
+ * @throws RemotingCommandException if the request cannot be decoded
+ */
private RemotingCommand processRequest(final Channel channel, RemotingCommand request,
boolean brokerAllowSuspend) throws RemotingCommandException {
+ // init context params
AckMessageRequestHeader requestHeader;
BatchAckMessageRequestBody reqBody = null;
final RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
response.setOpaque(request.getOpaque());
+
if (request.getCode() == RequestCode.ACK_MESSAGE) {
+ // decode and validate request
requestHeader = (AckMessageRequestHeader) request.decodeCommandCustomHeader(AckMessageRequestHeader.class);
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
@@ -147,6 +193,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
return ackLiteResponse;
}
+ // get and validate offset
long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
long maxOffset;
try {
@@ -162,12 +209,15 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
response.setRemark(errorInfo);
return response;
}
+
+ // append ack, default mode is queue based merge, call appendAck
if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) {
appendAckNew(requestHeader, null, response, channel, null);
} else {
appendAck(requestHeader, null, response, channel, null);
}
} else if (request.getCode() == RequestCode.BATCH_ACK_MESSAGE) {
+ // decode and validate request
if (request.getBody() != null) {
reqBody = BatchAckMessageRequestBody.decode(request.getBody(), BatchAckMessageRequestBody.class);
}
@@ -175,7 +225,10 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
response.setCode(ResponseCode.NO_MESSAGE);
return response;
}
+
+ // process each ack
for (BatchAck bAck : reqBody.getAcks()) {
+ // default value of popConsumerKVServiceEnable is false
if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) {
appendAckNew(null, bAck, response, channel, reqBody.getBrokerName());
} else {
@@ -183,6 +236,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
}
}
} else {
+ // unsupported request, logging and return
POP_LOGGER.error("AckMessageProcessor failed to process RequestCode: {}, consumer: {} ", request.getCode(), RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark(String.format("AckMessageProcessor failed to process RequestCode: %d", request.getCode()));
@@ -191,8 +245,31 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
return response;
}
+ /**
+ * Append an ack (single or batch) in the file-based path.
+ *
+ * For single ack: parses the extra info from the request header,
+ * routes orderly acks to {@link #ackOrderly}, or creates a single {@link AckMsg}.
+ *
+ * For batch ack: expands the {@link BitSet} from the
+ * {@link BatchAck} into individual offsets, routes orderly acks individually,
+ * and packs the remaining offsets into a {@link BatchAckMsg}.
+ *
+ * The ack is first offered to {@link PopBufferMergeService#addAk}.
+ * If the buffer merge is not available, the ack is serialized as JSON and
+ * written to the revive topic with tag {@link PopAckConstants#ACK_TAG}
+ * or {@link PopAckConstants#BATCH_ACK_TAG}.
+ *
+ * @param requestHeader the single-ack request header (null for batch)
+ * @param batchAck the batch ack body (null for single)
+ * @param response the response to modify on error
+ * @param channel the Netty channel
+ * @param brokerName the broker name
+ * @throws RemotingCommandException if offset validation fails
+ */
private void appendAck(final AckMessageRequestHeader requestHeader, final BatchAck batchAck,
final RemotingCommand response, final Channel channel, String brokerName) throws RemotingCommandException {
+ // init context params
String[] extraInfo;
String consumeGroup, topic;
int qId, rqId;
@@ -200,8 +277,11 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
long popTime, invisibleTime;
AckMsg ackMsg;
int ackCount = 0;
+
+ // ack orderly or set context params
if (batchAck == null) {
// single ack
+ // set context params
extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo());
brokerName = ExtraInfoUtil.getBrokerName(extraInfo);
consumeGroup = requestHeader.getConsumerGroup();
@@ -213,15 +293,18 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
popTime = ExtraInfoUtil.getPopTime(extraInfo);
invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);
+ // ack orderly if revive queue
if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
ackOrderly(topic, consumeGroup, qId, ackOffset, popTime, invisibleTime, channel, response);
return;
}
+ // set ackMsg and ackCount
ackMsg = new AckMsg();
ackCount = 1;
} else {
// batch ack
+ // set context params
consumeGroup = batchAck.getConsumerGroup();
topic = ExtraInfoUtil.getRealTopic(batchAck.getTopic(), batchAck.getConsumerGroup(), batchAck.getRetry());
qId = batchAck.getQueueId();
@@ -231,6 +314,7 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
popTime = batchAck.getPopTime();
invisibleTime = batchAck.getInvisibleTime();
+ // offset check
long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, qId);
long maxOffset;
try {
@@ -243,6 +327,7 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
return;
}
+ // ack orderly or add offset to batchAckMsg
BatchAckMsg batchAckMsg = new BatchAckMsg();
BitSet bitSet = batchAck.getBitSet();
for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
@@ -259,10 +344,13 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
batchAckMsg.getAckOffsetList().add(offset);
}
}
+
+ // skip if empty or is revive queue
if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE || batchAckMsg.getAckOffsetList().isEmpty()) {
return;
}
+ // set ackMsg and ackCount
ackMsg = batchAckMsg;
ackCount = batchAckMsg.getAckOffsetList().size();
}
@@ -270,6 +358,7 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
this.brokerController.getBrokerStatsManager().incBrokerAckNums(ackCount);
this.brokerController.getBrokerStatsManager().incGroupAckNums(consumeGroup, topic, ackCount);
+ // set ackMsg
ackMsg.setConsumerGroup(consumeGroup);
ackMsg.setTopic(topic);
ackMsg.setQueueId(qId);
@@ -278,11 +367,13 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
ackMsg.setPopTime(popTime);
ackMsg.setBrokerName(brokerName);
+ // add ackMsg
if (this.brokerController.getPopMessageProcessor().getPopBufferMergeService().addAk(rqId, ackMsg)) {
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount);
return;
}
+ // create revive message by ackMsg, if add ackMsg failed
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(reviveTopic);
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(StandardCharsets.UTF_8));
@@ -300,7 +391,9 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
msgInner.setDeliverTimeMs(popTime + invisibleTime);
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
- if (brokerController.getBrokerConfig().isAppendAckAsync()) {
+
+ // store revive message
+ if (brokerController.getBrokerConfig().isAppendAckAsync()) { // default is false
int finalAckCount = ackCount;
this.brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult -> {
handlePutMessageResult(putMessageResult, ackMsg, topic, consumeGroup, popTime, qId, finalAckCount);
@@ -320,6 +413,7 @@ private void appendAckNew(final AckMessageRequestHeader requestHeader, final Bat
final RemotingCommand response, final Channel channel, String brokerName) throws RemotingCommandException {
if (requestHeader != null && batchAck == null) {
+ // init context params
String[] extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo());
String groupId = requestHeader.getConsumerGroup();
String topicId = requestHeader.getTopic();
@@ -329,6 +423,7 @@ private void appendAckNew(final AckMessageRequestHeader requestHeader, final Bat
long invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);
int reviveQueueId = ExtraInfoUtil.getReviveQid(extraInfo);
+
if (reviveQueueId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
ackOrderlyNew(topicId, groupId, queueId, ackOffset, popTime, invisibleTime, channel, response);
} else {
@@ -339,6 +434,7 @@ private void appendAckNew(final AckMessageRequestHeader requestHeader, final Bat
this.brokerController.getBrokerStatsManager().incBrokerAckNums(1);
this.brokerController.getBrokerStatsManager().incGroupAckNums(groupId, topicId, 1);
} else {
+ // init context params
String groupId = batchAck.getConsumerGroup();
String topicId = ExtraInfoUtil.getRealTopic(
batchAck.getTopic(), batchAck.getConsumerGroup(), batchAck.getRetry());
@@ -349,6 +445,7 @@ private void appendAckNew(final AckMessageRequestHeader requestHeader, final Bat
long invisibleTime = batchAck.getInvisibleTime();
try {
+ // get minOffset and maxOffset
long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topicId, queueId);
long maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topicId, queueId);
if (minOffset == -1 || maxOffset == -1) {
@@ -360,6 +457,7 @@ private void appendAckNew(final AckMessageRequestHeader requestHeader, final Bat
// Maintain consistency with the old implementation code style
BitSet bitSet = batchAck.getBitSet();
for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
+ // validate offset
if (i == Integer.MAX_VALUE) {
break;
}
@@ -396,23 +494,53 @@ private void handlePutMessageResult(PutMessageResult putMessageResult, AckMsg ac
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount);
}
+ /**
+ * Handle an ack for an ordered Pop message in the file-based path.
+ *
+ * The flow is:
+ * If {@code commitAndNext} returns {@code -1}, the response is set to
+ * {@code MESSAGE_ILLEGAL} since the offset was not found in the in-flight
+ * order batch. A return value of {@code -2} (popTime mismatch) is
+ * silently ignored — the batch has already been superseded.
+ */
protected void ackOrderly(String topic, String consumeGroup, int qId, long ackOffset, long popTime,
long invisibleTime, Channel channel, RemotingCommand response) {
+ // check offset
String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + PopAckConstants.SPLIT + qId;
long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
if (ackOffset < oldOffset) {
return;
}
+
+ // lock queue
while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) {
}
+
try {
+ // double check offset with lock
oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
if (ackOffset < oldOffset) {
return;
}
+
+ // release orderInfo lock
long nextOffset = brokerController.getConsumerOrderInfoManager().commitAndNext(
topic, consumeGroup, qId, ackOffset, popTime);
+
if (nextOffset > -1) {
+ // commit offset
if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset(topic, consumeGroup, qId)) {
this.brokerController.getConsumerOffsetManager().commitOffset(
channel.remoteAddress().toString(), consumeGroup, topic, qId, nextOffset);
@@ -420,7 +548,7 @@ protected void ackOrderly(String topic, String consumeGroup, int qId, long ackOf
if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic, consumeGroup, qId, invisibleTime)) {
this.brokerController.getPopMessageProcessor().notifyMessageArriving(topic, qId, consumeGroup);
}
- } else if (nextOffset == -1) {
+ } else if (nextOffset == -1) { // return error
String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s",
lockKey, oldOffset, ackOffset, nextOffset, channel.remoteAddress());
POP_LOGGER.warn(errorInfo);
@@ -428,12 +556,25 @@ protected void ackOrderly(String topic, String consumeGroup, int qId, long ackOf
response.setRemark(errorInfo);
return;
}
- } finally {
+ } finally { // unlock queue
this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
}
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, 1);
}
+ /**
+ * Handle an ack for an ordered Pop message in the KVStore path.
+ *
+ * Mirror of {@link #ackOrderly} but uses the {@link PopConsumerService}
+ * infrastructure: lock service is {@link PopConsumerLockService} keyed by
+ * {@code group@topic} (coarser than the per-queue lock in the file-based
+ * path), and the result of {@code commitAndNext} may be logged when
+ * {@code popConsumerKVServiceLog} is enabled.
+ *
+ * Behavior is otherwise identical: fast-reject, spin-lock, double-check,
+ * advance {@code OrderInfo} commit bit, persist offset, and notify the
+ * long-polling requester when the queue is no longer blocked.
+ */
protected void ackOrderlyNew(String topic, String consumeGroup, int qId, long ackOffset, long popTime,
long invisibleTime, Channel channel, RemotingCommand response) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index 02deeb18a7a..2001cb62e3e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -27,6 +27,7 @@
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.MemoryConsumerOrderInfoManager;
import org.apache.rocketmq.broker.pop.PopConsumerLockService;
+import org.apache.rocketmq.broker.pop.PopConsumerService;
import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.TopicConfig;
@@ -52,6 +53,23 @@
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
+/**
+ * Processes the nack {@code ChangeInvisibleTime} request from consumers.
+ *
+ * When a consumer needs more time to process a message (or wants to
+ * suspend/nack it), this processor updates the message's visibility
+ * timeout. The implementation varies by the ack mode:
+ * For orderly consumption, the next visible time is updated directly in
+ * the {@link ConsumerOrderInfoManager} without writing to the revive topic.
+ */
public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
private final BrokerController brokerController;
@@ -71,8 +89,12 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx,
private RemotingCommand processRequest(final Channel channel, RemotingCommand request,
boolean brokerAllowSuspend) throws RemotingCommandException {
+ // process request async
CompletableFuture Routes to the appropriate handler based on message type:
+ * Called after the new checkpoint has been written successfully. This method
+ * writes an {@link PopAckConstants#ACK_TAG} message that matches the
+ * original checkpoint's merge key. When {@link PopReviveService} processes this
+ * ack, it sets the corresponding bit in the old CK's bitMap, causing
+ * the old CK to be treated as fully acked and skipped during revive.
+ *
+ * If {@link PopBufferMergeService#addAk} accepts the ack (buffer
+ * merge enabled), it is merged in memory without writing to the store.
+ *
+ * @param requestHeader the original request header
+ * @param extraInfo the extra info from the original pop request
+ * @return a future that completes with {@code true} on success
+ */
private CompletableFuture This is the core of the file-based non-orderly ChangeInvisibleTime path:
+ * Two incoming paths:
+ * For commit:
+ * For rollback: deletes the prepared half message without writing.
+ *
+ * @param ctx the Netty channel context
+ * @param request the end-transaction request
+ * @return the response
+ * @throws RemotingCommandException if the request cannot be decoded
+ */
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
@@ -68,6 +96,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
return response;
}
+ // validate transaction flag and logging
if (requestHeader.getFromTransactionCheck()) {
switch (requestHeader.getCommitOrRollback()) {
case MessageSysFlag.TRANSACTION_NOT_TYPE: {
@@ -127,8 +156,10 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
return null;
}
}
+
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
+ // get prepare message from prepare topic
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
if (rejectCommitOrRollback(requestHeader, result.getPrepareMessage())) {
@@ -137,6 +168,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
requestHeader.getMsgId(), requestHeader.getCommitLogOffset());
return response;
}
+
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
@@ -145,8 +177,10 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
+ // enqueue message to original topic
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
+ // delete prepare message
deletePrepareMessage(result);
// successful committed, then total num of half-messages minus 1
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getTopic(), -1);
@@ -164,6 +198,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
+ // get prepare message from prepare topic
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
if (rejectCommitOrRollback(requestHeader, result.getPrepareMessage())) {
@@ -172,8 +207,10 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
requestHeader.getMsgId(), requestHeader.getCommitLogOffset());
return response;
}
+
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
+ // delete prepare message
deletePrepareMessage(result);
// roll back, then total num of half-messages minus 1
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC), -1);
@@ -189,6 +226,21 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
return response;
}
+ /**
+ * Delete a prepared (half) message after transaction commit or rollback.
+ *
+ * Deletion strategy depends on the half-message storage:
+ * Supports four actions on each {@link LiteSubscriptionDTO} entry:
+ * Quota and ACL errors surface as
+ * {@link ResponseCode#LITE_SUBSCRIPTION_QUOTA_EXCEEDED} and
+ * {@link ResponseCode#ILLEGAL_OPERATION} respectively.
+ */
public class LiteSubscriptionCtlProcessor implements NettyRequestProcessor {
protected final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LITE_LOGGER_NAME);
@@ -48,6 +68,12 @@ public LiteSubscriptionCtlProcessor(BrokerController brokerController, LiteSubsc
this.liteSubscriptionRegistry = liteSubscriptionRegistry;
}
+ /**
+ * Process a batch of subscription control requests. Each entry is validated
+ * and dispatched to {@link LiteSubscriptionRegistry} according to its
+ * action. Blank fields cause the entry to be skipped with a warning rather
+ * than failing the whole batch.
+ */
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
if (request.getBody() == null) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 5373eaea333..b17c1c7e410 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -44,10 +44,56 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * File based Ack buffer merge service.
+ *
+ * buffer checkpoint in memory then enqueue them into system revive queue then wait to be acked.
+ *
+ * Two in-memory data structures drive the merge logic:
+ * The background {@link #scan()} thread periodically evaluates each buffered CK:
+ * This service is enabled by {@code enablePopBufferMerge} and only runs on
+ * a master or a slave acting as master. When {@code enablePopBatchAck} is set,
+ * multiple ack offsets are packed into a single {@link BatchAckMsg}.
+ */
public class PopBufferMergeService extends ServiceThread {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
+ /**
+ * In-memory map of check points.
+ * Key: topic + group + queueId + startOffset + popTime + brokerName
+ * Value: check point wrapper
+ * use cases:
+ * - scan: iterate buffer
+ * - addAckMsg: get check point from buffer and mark ack state of Check Point
+ */
ConcurrentHashMap For each {@code topic@cid@queueId} queue, the method peeks the head (oldest)
+ * wrapper and checks whether it is ready to commit:
+ * If the head is ready, it is committed and removed. Processing continues
+ * to the next wrapper in the same queue. If the head is not ready, the loop
+ * breaks — this ensures strict FIFO order and prevents consumer offset
+ * regression.
+ *
+ * Called at the end of {@link #scan()} after the buffer has been processed.
+ *
+ * @return the total number of remaining wrappers across all queues (for logging)
+ */
private int scanCommitOffset() {
Iterator Three types of entries are removed:
+ * For each entry in {@link #buffer}:
+ * After processing the buffer, calls {@link #scanCommitOffset()} to commit offsets
+ * for finished checkpoints in FIFO order.
+ *
+ * If the scan duration exceeds {@code popCkStayBufferTimeOut - 1000ms}, the service
+ * temporarily stops accepting new CKs ({@link #serving} = false) to avoid backlog.
+ */
private void scan() {
long startTime = System.currentTimeMillis();
AtomicInteger count = new AtomicInteger(0);
@@ -244,7 +352,6 @@ private void scan() {
continue;
}
-
// just process offset(already stored at pull thread), or buffer ck(not stored and ack finish)
if (pointWrapper.isJustOffset() && pointWrapper.isCkStored() || isCkDone(pointWrapper)
|| isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {
@@ -259,6 +366,7 @@ private void scan() {
PopCheckPoint point = pointWrapper.getCk();
long now = System.currentTimeMillis();
+ // check whether check point is timeout
boolean removeCk = !this.serving;
// ck will be timeout
if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) {
@@ -275,17 +383,18 @@ private void scan() {
}
// double check
- if (isCkDone(pointWrapper)) {
+ if (isCkDone(pointWrapper)) { // all checkpoint are acked, do nothing
continue;
- } else if (pointWrapper.isJustOffset()) {
+ } else if (pointWrapper.isJustOffset()) { // store checkpoint
// just offset should be in store.
if (pointWrapper.getReviveQueueOffset() < 0) {
putCkToStore(pointWrapper, this.brokerController.getBrokerConfig().isAppendCkAsync());
countCk++;
}
continue;
- } else if (removeCk) {
+ } else if (removeCk) { // store checkpoint if needed
// put buffer ak to store
+ // revive queue offset < 0 means checkpoint was not stored
if (pointWrapper.getReviveQueueOffset() < 0) {
putCkToStore(pointWrapper, this.brokerController.getBrokerConfig().isAppendCkAsync());
countCk++;
@@ -295,11 +404,13 @@ private void scan() {
continue;
}
- if (brokerController.getBrokerConfig().isEnablePopBatchAck()) {
+ // store checkpoint
+ if (brokerController.getBrokerConfig().isEnablePopBatchAck()) { // default is false
List Uses a CAS (compare-and-swap) loop to ensure thread safety without locking.
+ * If the bit is already set, this method returns immediately (no-op).
+ *
+ * @param setBits the atomic bitmask to update
+ * @param index the bit position (0-based)
+ */
private void markBitCAS(AtomicInteger setBits, int index) {
while (true) {
int bits = setBits.get();
@@ -384,6 +508,22 @@ private void markBitCAS(AtomicInteger setBits, int index) {
}
}
+ /**
+ * Commit the consumer offset for the checkpoint's {@code topic@cid@queueId}.
+ *
+ * Called from {@link #scanCommitOffset()} after the checkpoint is confirmed
+ * as finished (all acks received or CK stored). The offset is advanced to
+ * {@link PopCheckPointWrapper#nextBeginOffset}, which is the offset of the
+ * first message after this batch.
+ *
+ * The operation is guarded by {@link PopMessageProcessor.QueueLockManager}
+ * to prevent concurrent offset updates on the same queue.
+ *
+ * @param wrapper the finished checkpoint wrapper
+ * @return {@code true} if the offset was committed or no commit is needed
+ * ({@code nextBeginOffset < 0}); {@code false} if the lock could
+ * not be acquired (caller should retry later)
+ */
private boolean commitOffset(final PopCheckPointWrapper wrapper) {
if (wrapper.getNextBeginOffset() < 0) {
return true;
@@ -413,8 +553,25 @@ private boolean commitOffset(final PopCheckPointWrapper wrapper) {
return true;
}
+ /**
+ * Enqueue the checkpoint wrapper into the per-{@code topic@cid@queueId} offset queue
+ * for sequential offset committing.
+ *
+ * The queue is maintained in FIFO order. The {@link #scanCommitOffset()} method
+ * drains the queue from the head, ensuring that offsets are committed in the same
+ * order as the checkpoints were created, which prevents consumer offset regression.
+ *
+ * The {@link QueueWithTime#time} is also updated to the CK's pop time so that
+ * {@link #scanGarbage()} can identify and remove stale entries after 5 minutes of
+ * inactivity.
+ *
+ * @param pointWrapper the checkpoint wrapper to enqueue
+ * @return true if the element was added to the queue successfully
+ */
private boolean putOffsetQueue(PopCheckPointWrapper pointWrapper) {
QueueWithTime The ack is not written to the revive topic immediately. Instead, a flag is
+ * set in {@link PopCheckPointWrapper#bits} via {@link #markBitCAS}.
+ * The pending ack will later be flushed to storage by {@link #scan()} when the
+ * checkpoint is evicted (timeout / buffer full / service stopping).
+ *
+ * Rejection conditions (return false):
+ * Every sub-message has a corresponding bit in
+ * {@link PopCheckPointWrapper#bits}. This method returns {@code true} when
+ * all bits are set, meaning the CK can be removed from the buffer without
+ * writing any ack to the revive topic (clean completion).
+ *
+ * @param pointWrapper the checkpoint wrapper to check
+ * @return {@code true} if every sub-message has been acked
+ */
private boolean isCkDone(PopCheckPointWrapper pointWrapper) {
byte num = pointWrapper.getCk().getNum();
for (byte i = 0; i < num; i++) {
@@ -807,6 +1053,18 @@ private boolean isCkDone(PopCheckPointWrapper pointWrapper) {
return true;
}
+ /**
+ * Check whether all acked sub-messages have been fully persisted.
+ *
+ * Uses XOR: {@code bits ^ toStoreBits}. A bit is set in the result when
+ * the corresponding sub-message has been acked ({@code bits}) but not yet
+ * persisted ({@code toStoreBits}). Returns {@code true} only when every
+ * acked message has also been persisted, meaning the checkpoint is ready
+ * for final cleanup.
+ *
+ * @param pointWrapper the checkpoint wrapper to check
+ * @return {@code true} if no ack remains to be persisted
+ */
private boolean isCkDoneForFinish(PopCheckPointWrapper pointWrapper) {
byte num = pointWrapper.getCk().getNum();
int bits = pointWrapper.getBits().get() ^ pointWrapper.getToStoreBits().get();
@@ -842,17 +1100,46 @@ public LinkedBlockingDeque Three-state indicator:
+ * When {@code true}:
+ * This is the core processor for the Pop consumption mode. It handles:
+ * This class also owns the {@link PopLongPollingService},
+ * {@link PopBufferMergeService}, and {@link QueueLockManager} instances
+ * used by the file-based ack path.
+ */
public class PopMessageProcessor implements NettyRequestProcessor {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
@@ -217,13 +236,36 @@ public void notifyMessageArriving(final String topic, final int queueId, final S
topic, queueId, cid, false, null, 0L, null, null);
}
+ /**
+ * Process a PopMessage request.
+ *
+ * This method handles the full Pop lifecycle:
+ * Queues are visited sequentially (respecting {@code priorityOrderAsc}).
+ * For each queue a {@link #popMsgFromQueue} call is chained via
+ * {@code CompletableFuture#thenCompose}. The chained future carries the
+ * remaining number of messages still needed ({@code restNum}).
+ *
+ * Early termination can occur inside {@link #popMsgFromQueue} when:
+ * This method is called as a step in a {@link CompletableFuture} chain
+ * (see {@link #popMsgFromTopic}). The {@code restNum} argument is the
+ * number of messages still needed — when it drops to {@code 0} or below,
+ * subsequent calls in the chain may short-circuit early.
+ *
+ * The method has several early-termination paths (all return
+ * immediately with the current {@code restNum}):
+ * Otherwise, it asynchronously fetches messages from the store, handles
+ * offset correction, updates order-consume tracking / checkpoint data, and
+ * merges the results into {@code getMessageResult}.
+ *
+ * @param topic topic name
+ * @param attemptId attempt id for idempotent consumption
+ * @param isRetry whether this is a retry topic
+ * @param getMessageResult accumulator for messages popped so far
+ * @param requestHeader pop request parameters
+ * @param queueId target queue id
+ * @param restNum number of messages still needed before the batch
+ * size is satisfied
+ * @param reviveQid revive queue id for checkpoint
+ * @param channel netty channel of the requesting client
+ * @param popTime pop invocation timestamp
+ * @param messageFilter expression filter applied to each message
+ * @param startOffsetInfo buffer for offset tracing info
+ * @param msgOffsetInfo buffer for per-message offset tracing info
+ * @param orderCountInfo buffer for order-consume count info
+ * @return a future completing with the remaining number of messages needed
+ */
private CompletableFuture There is only one public method for business: run Each revive queue has its own dedicated {@code PopReviveService} instance.
+ * The service periodically:
+ * This is the file-based revive path (CK + Ack messages are stored in
+ * the system revive topic). It is complemented by the KVStore-based path in
+ * {@code PopConsumerService} which handles the {@code PopConsumerKVStore} flow.
+ */
public class PopReviveService extends ServiceThread {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
private final int[] ckRewriteIntervalsInSeconds = new int[] { 10, 20, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200 };
@@ -74,6 +93,25 @@ public class PopReviveService extends ServiceThread {
private long currentReviveMessageTimestamp = -1;
private volatile boolean shouldRunPopRevive = false;
+ /**
+ * Tracks checkpoints that are currently being revived.
+ *
+ * Key — the checkpoint being processed.
+ * Value — a pair of (startTime, completed), where:
+ * The map is sorted by {@link PopCheckPoint#compareTo} (by startOffset).
+ * This ordering is used to drain completed entries from the head, ensuring
+ * the revive topic offset is committed strictly in sequence.
+ *
+ * Concurrency is limited to at most 3 entries at a time (see
+ * {@link #mergeAndRevive}). If an entry stays incomplete for over 30
+ * seconds, it is considered hung and is skipped via {@link #rePutCK}.
+ */
private final NavigableMap Constructs a new {@link MessageExtBrokerInner} from the original
+ * message, increments the reconsume count (unless suspended), sets the
+ * first-pop time and origin group properties, and writes it to the
+ * appropriate retry topic (V1 or V2 depending on configuration).
+ *
+ * If the retry topic does not exist, it is created automatically
+ * via {@link #addRetryTopicIfNotExist}.
+ *
+ * @param popCheckPoint the checkpoint that triggered the revive
+ * @param messageExt the original message to re-publish
+ * @return {@code true} if the message was written successfully
+ */
private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) {
+ // convert checkpoint to inner message
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
if (!popCheckPoint.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
msgInner.setTopic(KeyBuilder.buildPopRetryTopic(popCheckPoint.getTopic(), popCheckPoint.getCId(), brokerController.getBrokerConfig().isEnableRetryTopicV2()));
@@ -133,9 +187,15 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt)
}
msgInner.getProperties().put(MessageConst.PROPERTY_ORIGIN_GROUP, popCheckPoint.getCId());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+
+ // set topic and queueId
addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId());
msgInner.setQueueId(getRetryQueueId(msgInner.getTopic(), messageExt));
+
+ // store message
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+
+ // logging and metric
brokerController.getBrokerMetricsManager().getPopMetricsManager().incPopReviveRetryMessageCount(popCheckPoint, putMessageResult.getPutMessageStatus());
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},retry msg, ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ",
@@ -205,6 +265,17 @@ private int getRetryQueueId(String retryTopic, MessageExt messageExt) {
return oriQueueId;
}
+ /**
+ * Pull a batch of messages from the revive topic at the given offset.
+ *
+ * If the offset becomes illegal (e.g. the revive topic was truncated),
+ * the revive offset is corrected to {@code nextBeginOffset - 1} so that
+ * the next scan starts from a valid position.
+ *
+ * @param offset the queue offset to start reading from
+ * @param queueId the revive queue id
+ * @return a list of decoded messages, or {@code null} if at the tail
+ */
protected List This method reads messages from the revive topic starting from the
+ * current offset. Each message is classified by its tag:
+ * AckMsg that arrive after their checkpoint has already been processed
+ * ({@code enableSkipLongAwaitingAck}) are handled by creating a mock CK
+ * via {@link #mockCkForAck} so that the revive offset can still be
+ * committed correctly.
+ *
+ * The scan stops when any of:
+ * When an ack arrives long after its CK has been consumed (e.g. network
+ * delay), the CK is no longer in the scan map. If {@code enableSkipLongAwaitingAck}
+ * is enabled, this method creates a synthetic CK so that the revive offset
+ * can still be advanced correctly in {@link #mergeAndRevive}.
+ *
+ * @param messageExt the revive topic message that carried the ack
+ * @param ackMsg the decoded ack
+ * @param mergeKey the merge key for the CK lookup
+ * @param mockPointMap map to collect the mock CKs
+ * @return {@code true} if a mock CK was created
+ */
private boolean mockCkForAck(MessageExt messageExt, AckMsg ackMsg, String mergeKey, HashMap The mock CK has {@code num = 0} and empty bitMap, meaning no actual
+ * messages to revive. Its only purpose is to carry the {@code reviveOffset}
+ * so that the revive consumer offset can be committed past this ack.
+ *
+ * @param ackMsg the ack message
+ * @param reviveOffset the queue offset of the ack message in the revive topic
+ * @return a mock checkpoint with no sub-messages
+ */
private PopCheckPoint createMockCkForAck(AckMsg ackMsg, long reviveOffset) {
PopCheckPoint point = new PopCheckPoint();
point.setStartOffset(ackMsg.getStartOffset());
@@ -496,7 +634,26 @@ private PopCheckPoint createMockCkForAck(AckMsg ackMsg, long reviveOffset) {
return point;
}
+ /**
+ * Process collected checkpoints and revive all un-acked sub-messages.
+ *
+ * Checkpoints are sorted by revive offset. For each one:
+ * After processing, the revive topic offset is advanced past all
+ * processed checkpoints.
+ *
+ * @param consumeReviveObj the container with collected CKs and scan state
+ * @throws Throwable if any revive operation fails
+ */
protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwable {
+ // sort checkpoints and init newOffset
ArrayList For each sub-message whose bit is not set in the bitMap, the original
+ * message is fetched via {@link #getBizMessage} and re-published to the
+ * retry topic via {@link #reviveRetry}. All revive attempts run
+ * concurrently via {@link CompletableFuture#allOf}.
+ *
+ * After all attempts complete:
+ * When a sub-message cannot be revived (e.g. the original message is
+ * temporarily unavailable), the CK is re-published with:
+ * If {@code rePutTimes} exceeds the backoff table length and
+ * {@code skipWhenCKRePutReachMaxTimes} is set, the CK is dropped.
+ *
+ * @param oldCK the original checkpoint that failed to revive
+ * @param pair the failed offset and result (object1 = offset, object2 = result)
+ */
private void rePutCK(PopCheckPoint oldCK, Pair Each iteration:
+ * Before sending, the message headers are restored to the real
+ * business topic and queue so the producer can identify the original
+ * message. The network request carries commitLogOffset, msgId,
+ * transactionId, and queue offset for the producer to look up the local
+ * transaction state. If the producer's channel is no longer connected,
+ * the check is skipped with a warning.
+ */
public void sendCheckMessage(MessageExt msgExt) throws Exception {
+ // format request header and message
CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
checkTransactionStateRequestHeader.setTopic(msgExt.getTopic());
checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
@@ -60,17 +72,33 @@ public void sendCheckMessage(MessageExt msgExt) throws Exception {
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
+
+ // find channel, channel can send message to client
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
+
if (channel != null) {
+ // invoke channel.writeAndFlush() -> GrpcClientChannel.processCheckTransaction()
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
}
}
+ /**
+ * Asynchronously contact the producer to check the status of a half
+ * message (commit, rollback, or unknown).
+ *
+ * This is invoked by the transaction check loop when a half message
+ * has aged past its immunity window without an OP record. The actual
+ * network request is dispatched to the dedicated
+ * {@code Transaction-msg-check-thread} pool so the caller is not blocked.
+ * If the pool is full, the {@code CallerRunsPolicy} will execute the task
+ * on the caller thread, applying back-pressure to the check loop.
+ */
public void resolveHalfMsg(final MessageExt msgExt) {
if (executorService != null) {
+ // executorService thread pool(2~5 threads)
executorService.execute(new Runnable() {
@Override
public void run() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageCheckService.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageCheckService.java
index 52209c3fbdb..33c75166256 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageCheckService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageCheckService.java
@@ -22,6 +22,16 @@
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+/**
+ * Background service that periodically triggers transaction status checks.
+ * only one public method: run, and calls TransactionalMessageService.check()
+ *
+ * Runs at a configurable interval ({@code transactionCheckInterval}). Each
+ * iteration calls
+ * {@link TransactionalMessageService#check(long, int, AbstractTransactionalMessageCheckListener)}
+ * which scans the half-message topic for unresolved transactions and either
+ * initiates a broker-side check-back or discards expired messages.
+ */
public class TransactionalMessageCheckService extends ServiceThread {
private static final Logger log = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
index 6770561823f..aac3addcfbe 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
@@ -37,6 +37,15 @@ public DefaultTransactionalMessageCheckListener() {
super();
}
+ /**
+ * Move a half message to the dead-letter system topic
+ * {@code TRANS_CHECK_MAXTIME_TOPIC} when it has been checked too many
+ * times without a definitive commit or rollback.
+ *
+ * Once moved, the message is no longer tracked by the transaction check
+ * loop and will never be delivered to the consumer. This prevents endless
+ * rechecking of messages whose producer is permanently unable to respond.
+ */
@Override
public void resolveDiscardMsg(MessageExt msgExt) {
log.error("MsgExt:{} has been checked too many times, so discard it by moving it to system topic TRANS_CHECK_MAXTIME_TOPIC", msgExt);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/MessageQueueOpContext.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/MessageQueueOpContext.java
index e8e5f13de6b..1057749e5a3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/MessageQueueOpContext.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/MessageQueueOpContext.java
@@ -19,11 +19,25 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * Write buffer for transaction OP (operation) queue offsets.
+ *
+ * In the transaction message flow, commit/rollback operations produce OP
+ * records that are written to the {@code RMQ_SYS_TRANS_OP_HALF_TOPIC}.
+ * Instead of writing each OP individually, offsets are buffered here and
+ * flushed in batches to reduce I/O.
+ *
+ * The {@link #contextQueue} holds batched offset strings, while
+ * {@link #totalSize} tracks the accumulated count and
+ * {@link #lastWriteTimestamp} controls flush timing.
+ */
public class MessageQueueOpContext {
private AtomicInteger totalSize = new AtomicInteger(0);
private volatile long lastWriteTimestamp;
+ // offset1, offset2, offsetN, ...
private LinkedBlockingQueue The method:
+ * This is used when re-putting a half message back to the HALF topic
+ * during the transaction check-back process. The
+ * {@code PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET} property records the
+ * original queue offset so that later checks can determine whether the
+ * producer has already committed or rolled back the message within the
+ * immunity window.
+ *
+ * @param msgExt the original half message
+ * @return a new half message with the prepared queue offset preserved
+ */
public MessageExtBrokerInner renewImmunityHalfMessageInner(MessageExt msgExt) {
MessageExtBrokerInner msgInner = renewHalfMessageInner(msgExt);
String queueOffsetFromPrepare = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
index 2f05bee0040..9e3d46653f3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
@@ -67,6 +67,10 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
private static final int SLEEP_WHILE_NO_OP = 1000;
+ /**
+ * deleted offset queue map
+ * only one key: 0
+ */
private final ConcurrentHashMap Each time the message is checked, the {@code TRANSACTION_CHECK_TIMES}
+ * property is incremented. When it reaches {@code transactionCheckMax},
+ * the message is considered expired and will be discarded via
+ * {@link AbstractTransactionalMessageCheckListener#resolveDiscardMsg}.
+ *
+ * @param msgExt the prepared message being checked
+ * @param transactionCheckMax maximum allowed check attempts
+ * @return {@code true} if the message should be discarded
+ */
private boolean needDiscard(MessageExt msgExt, int transactionCheckMax) {
String checkTimes = msgExt.getProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES);
int checkTime = 1;
@@ -120,6 +138,19 @@ private boolean needDiscard(MessageExt msgExt, int transactionCheckMax) {
return false;
}
+ /**
+ * Check whether the prepared message should be skipped because its birth
+ * time exceeds the commit log's file reserved time.
+ * isExpired maybe a better method name
+ *
+ * If the message has been in the half topic longer than
+ * {@code fileReservedTime} hours, the corresponding commit log data may
+ * have already been deleted. The message is skipped rather than checked
+ * to avoid unnecessary IO and potential errors.
+ *
+ * @param msgExt the prepared message being checked
+ * @return {@code true} if the message should be skipped
+ */
private boolean needSkip(MessageExt msgExt) {
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
if (valueOfCurrentMinusBorn
@@ -158,10 +189,37 @@ private boolean putBackHalfMsgQueue(MessageExt msgExt, long offset) {
}
}
+ /**
+ * Scan the half-message topic and compare with the OP topic to find
+ * unresolved transactions. called by independent thread(TransactionalMessageCheckService).
+ *
+ * For each queue in the HALF topic:
+ * The immunity time is the minimum duration the broker must wait
+ * before initiating a transaction status check-back to the producer.
+ * If the producer specifies a custom value via
+ * {@code PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS}, it is used (converted
+ * from seconds to millis). Otherwise, the default
+ * {@code transactionTimeout} is returned.
+ *
+ * @param checkImmunityTimeStr the custom immunity time string, may be null
+ * @param transactionTimeout the default transaction timeout
+ * @return the immunity time in milliseconds
+ */
private long getImmunityTime(String checkImmunityTimeStr, long transactionTimeout) {
long checkImmunityTime;
@@ -378,6 +486,7 @@ private long getImmunityTime(String checkImmunityTimeStr, long transactionTimeou
*/
private PullResult fillOpRemoveMap(HashMap There are three cases:
+ * The transaction checker later reads the OP topic and skips any half
+ * message whose offset appears in the OP stream.
+ *
+ * @param messageExt the prepared (half) message to delete
+ * @return {@code true} if the OP record was written successfully
+ */
@Override
public boolean deletePrepareMessage(MessageExt messageExt) {
Integer queueId = messageExt.getQueueId();
MessageQueueOpContext mqContext = deleteContext.get(queueId);
+
+ // init mq op context if not exist
if (mqContext == null) {
mqContext = new MessageQueueOpContext(System.currentTimeMillis(), 20000);
MessageQueueOpContext old = deleteContext.putIfAbsent(queueId, mqContext);
@@ -605,11 +770,17 @@ public boolean deletePrepareMessage(MessageExt messageExt) {
}
}
+ // the body of OP_Message is the offset of Half_Message
+ // every Half_Message store a lot of offset, split by comma
+ // default number of offset is 4096
String data = messageExt.getQueueOffset() + TransactionalMessageUtil.OFFSET_SEPARATOR;
try {
+ // add offset to context queue
boolean res = mqContext.getContextQueue().offer(data, 100, TimeUnit.MILLISECONDS);
+ // if offer succeed, wait for batch write
if (res) {
int totalSize = mqContext.getTotalSize().addAndGet(data.length());
+ // default value of transactionOpMsgMaxSize is 4096
if (totalSize > transactionalMessageBridge.getBrokerController().getBrokerConfig().getTransactionOpMsgMaxSize()) {
this.transactionalOpBatchService.wakeup();
}
@@ -620,6 +791,7 @@ public boolean deletePrepareMessage(MessageExt messageExt) {
} catch (InterruptedException ignore) {
}
+ // if failed to enqueue offset to memory queue, write to OP topic
Message msg = getOpMessage(queueId, data);
if (this.transactionalMessageBridge.writeOp(queueId, msg)) {
log.warn("Force add remove op data. queueId={}", queueId);
@@ -653,6 +825,17 @@ public void close() {
this.getTransactionMetrics().persist();
}
+ /**
+ * build op message with data in deleteContext.get(queueId)
+ * - topic: op_topic
+ * - tag: REMOVE_TAG
+ * - body: moreData(prepareOffset + ",")
+ * + prepareOffset in deleteContext.get(queueId)
+ *
+ * @param queueId prepare message queueId
+ * @param moreData prepare message offset list
+ * @return op message
+ */
public Message getOpMessage(int queueId, String moreData) {
String opTopic = TransactionalMessageUtil.buildOpTopic();
MessageQueueOpContext mqContext = deleteContext.get(queueId);
@@ -695,17 +878,36 @@ public Message getOpMessage(int queueId, String moreData) {
return new Message(opTopic, TransactionalMessageUtil.REMOVE_TAG,
sb.toString().getBytes(TransactionalMessageUtil.CHARSET));
}
+
+ /**
+ * Flush buffered delete offsets for all queues to the OP topic.
+ * Called by independent thread(TransactionalOpBatchService)
+ *
+ * Iterates over each per-queue {@link MessageQueueOpContext}. If the
+ * buffer has data and the time since the last write exceeds
+ * {@code transactionOpBatchInterval} (or the buffer is oversized), the
+ * buffered offsets are drained via {@link #getOpMessage}, combined into
+ * a single OP message, and written via
+ * {@link TransactionalMessageBridge#writeOp}.
+ *
+ * Called by {@link TransactionalOpBatchService#onWaitEnd()}.
+ *
+ * @return the earliest wakeup timestamp for the next flush, or 0 if no
+ * waiting is needed
+ */
public long batchSendOpMessage() {
- long startTime = System.currentTimeMillis();
try {
+ long startTime = System.currentTimeMillis();
long firstTimestamp = startTime;
Map Half-message delete offsets are accumulated in per-queue
+ * {@link MessageQueueOpContext} buffers. This service wakes up either when the
+ * buffer exceeds {@code transactionOpMsgMaxSize} or when the time-based
+ * {@code transactionOpBatchInterval} elapses, calling
+ * {@link TransactionalMessageServiceImpl#batchSendOpMessage()} to batch-write
+ * the buffered offsets as a single OP message.
+ */
public class TransactionalOpBatchService extends ServiceThread {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java
index dbd3575d69c..372f04d5585 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java
@@ -136,23 +136,31 @@ private void checkTransRecordsStatus(List All methods are static and called sequentially from
+ * {@code SendMessageProcessor#asyncSendMessage}:
+ * If any step returns a non-null {@link PutMessageResult}, the operation is
+ * aborted immediately.
+ */
public class HookUtils {
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -58,6 +76,13 @@ public class HookUtils {
*/
private static final Integer MAX_TOPIC_LENGTH = 255;
+ /**
+ * Pre-put message validation: guards against writes when the store is
+ * shut down, in slave mode (non-duplication), not writable, topic too long,
+ * body null, or OS page cache busy.
+ *
+ * @return null if the check passes, or a rejection {@link PutMessageResult}
+ */
public static PutMessageResult checkBeforePutMessage(BrokerController brokerController, final MessageExt msg) {
if (brokerController.getMessageStore().isShutdown()) {
LOG.warn("message store has shutdown, so putMessage is forbidden");
@@ -109,6 +134,14 @@ public static PutMessageResult checkBeforePutMessage(BrokerController brokerCont
return null;
}
+ /**
+ * Check inner-batch sysFlag consistency
+ * There is no inner-batch after v5.0.0
+ *
+ * @param brokerController brokerController(object container)
+ * @param msg msg
+ * @return putMessageResult
+ */
public static PutMessageResult checkInnerBatch(BrokerController brokerController, final MessageExt msg) {
if (msg.getProperties().containsKey(MessageConst.PROPERTY_INNER_NUM)
&& !MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
@@ -127,12 +160,31 @@ public static PutMessageResult checkInnerBatch(BrokerController brokerController
return null;
}
+ /**
+ * Route timer or delay-level messages to the appropriate system topic.
+ *
+ * For non-transaction or committed messages, two checks run in order:
+ * Parses the delivery time from {@code PROPERTY_TIMER_DELAY_SEC},
+ * {@code PROPERTY_TIMER_DELAY_MS}, or {@code PROPERTY_TIMER_DELIVER_MS}.
+ * The time is aligned to {@code timerPrecisionMs} boundaries to match
+ * the TimerWheel tick resolution.
+ *
+ * The original topic and queue are saved as properties.
+ * topic was changed to {@link TimerMessageStore#TIMER_TOPIC},
+ * queue was changed to 0
+ *
+ * Rejection conditions:
+ * Each message is sent with {@code waitStoreMsgOK=false} and a 3s timeout.
+ * Messages are removed from the list on success; on any failure the entire
+ * batch is aborted and {@code false} is returned.
+ */
public static boolean sendMessageBack(BrokerController brokerController, List Provides common CRUD operations ({@code put}, {@code get}, {@code delete},
+ * {@code batchPut}, {@code rangeDelete}, {@code iterate}), lifecycle management
+ * ({@code start}, {@code shutdown}), automatic recovery on corruption
+ * ({@code scheduleReloadRocksdb}), and manual compaction scheduling.
+ *
+ * Subclasses define column families in {@link #postLoad()} and handle
+ * their own cleanup in {@link #preShutdown()}.
+ */
public abstract class AbstractRocksDBStorage {
protected static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME);
@@ -76,10 +87,14 @@ public abstract class AbstractRocksDBStorage {
protected RocksDB db;
protected DBOptions options;
+ // WriteOptions with WAL disabled for high-throughput index writes that can be rebuilt
protected WriteOptions writeOptions;
+ // WriteOptions with WAL enabled for durability-critical writes (trans, timer checkpoints)
protected WriteOptions ableWalWriteOptions;
+ // ReadOptions using prefix seek (fast, index-friendly)
protected ReadOptions readOptions;
+ // ReadOptions using total-order seek (slower, required for range scans without prefix)
protected ReadOptions totalOrderReadOptions;
protected CompactionOptions compactionOptions;
@@ -93,8 +108,10 @@ public abstract class AbstractRocksDBStorage {
protected volatile boolean loaded;
protected CompressionType compressionType = CompressionType.LZ4_COMPRESSION;
+ // Set to true when a reload is scheduled, causing hold() to reject new operations
private volatile boolean closed;
+ // Guard to ensure only one reload attempt at a time
private final Semaphore reloadPermit = new Semaphore(1);
private final ScheduledExecutorService reloadScheduler = ThreadUtils.newScheduledThreadPool(1, new ThreadFactoryImpl("RocksDBStorageReloadService_"));
private final ThreadPoolExecutor manualCompactionThread = (ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor(
@@ -175,6 +192,16 @@ protected void initFlushOptions() {
this.flushOptions = new FlushOptions();
}
+ /**
+ * check RocksDB status. isReady maybe a better name.
+ *
+ * Called before every read/write operation. Returns {@code true} if the
+ * database is fully loaded, the handle is non-null, and the instance has not
+ * been closed (e.g. due to a scheduled reload). Subclasses may override
+ * {@link #release()} to pair with this call (e.g. for reference counting).
+ *
+ * @return {@code true} if the database is ready for operations
+ */
public boolean hold() {
if (!this.loaded || this.db == null || this.closed) {
LOGGER.error("hold rocksdb Failed. {}", this.dbPath);
@@ -333,6 +360,17 @@ public void iterate(ColumnFamilyHandle columnFamilyHandle, final byte[] prefix,
iterate(columnFamilyHandle, prefix, null, null, callback);
}
+ /**
+ * Iterate over keys in a column family with optional prefix and range bounds.
+ *
+ * If a prefix is given without an explicit start, the prefix serves as
+ * the lower bound and iteration continues until keys deviate from the prefix.
+ * If a start key is given, it takes precedence over the prefix for seeking.
+ *
+ * @param prefix optional lower-bound prefix; iteration stops when key deviates
+ * @param start optional explicit start key (overrides prefix for seek)
+ * @param end optional upper bound (exclusive)
+ */
public void iterate(ColumnFamilyHandle columnFamilyHandle, byte[] prefix,
final byte[] start, final byte[] end, BiConsumer An {@code int} bitmask is used to track the ack state of up to 32 sub-messages
+ * within a single Pop checkpoint (see {@code PopCheckPoint}).
+ */
public class DataConverter {
public static final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
+ /**
+ * Convert a {@code long} to an 8-byte array (big-endian).
+ */
public static byte[] Long2Byte(Long v) {
ByteBuffer tmp = ByteBuffer.allocate(8);
tmp.putLong(v);
return tmp.array();
}
+ /**
+ * Set or clear the bit at {@code index} in an int bitmask.
+ * Uses {@code 1L} (long literal) to avoid signed-int overflow when {@code index == 31}.
+ *
+ * @param value the original bitmask
+ * @param index the bit position (0-based, 0..31)
+ * @param flag {@code true} to set, {@code false} to clear
+ * @return the updated bitmask
+ */
public static int setBit(int value, int index, boolean flag) {
if (flag) {
return (int) (value | (1L << index));
@@ -36,6 +54,13 @@ public static int setBit(int value, int index, boolean flag) {
}
}
+ /**
+ * Test whether the bit at {@code index} is set in an int bitmask.
+ *
+ * @param value the bitmask
+ * @param index the bit position (0-based, 0..31)
+ * @return {@code true} if the bit is 1
+ */
public static boolean getBit(int value, int index) {
return (value & (1L << index)) != 0;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
index 1b38a19ae6a..94e4f4ffc8b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
@@ -55,6 +55,14 @@
public class ProxyStartup {
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+ /**
+ * proxy components container, manager components with method start/shutdown/...
+ * - gRPC thread pool executor
+ * - message processor (wrap broker controller)
+ * - grpc server
+ * - remoting protocol server
+ * - ...
+ */
private static final ProxyStartAndShutdown PROXY_START_AND_SHUTDOWN = new ProxyStartAndShutdown();
private static class ProxyStartAndShutdown extends AbstractStartAndShutdown {
@@ -73,8 +81,10 @@ public static void main(String[] args) {
// init thread pool monitor for proxy.
initThreadPoolMonitor();
+ // init business thread pool for grpc server
ThreadPoolExecutor executor = createServerExecutor();
+ // create message processor, wrap broker controller in local mode
MessagingProcessor messagingProcessor = createMessagingProcessor();
// tls cert update
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 30681098358..6c606d06a4a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -264,7 +264,7 @@ public class ProxyConfig implements ConfigFile {
private String remotingAccessAddr = "";
private int remotingListenPort = 8080;
- // related to proxy's send strategy in cluster mode.
+ // related to proxy's sending strategy in cluster mode.
private boolean sendLatencyEnable = false;
private boolean startDetectorEnable = false;
private int detectTimeout = 200;
@@ -272,9 +272,38 @@ public class ProxyConfig implements ConfigFile {
private int remotingHeartbeatThreadPoolNums = 2 * PROCESSOR_NUMBER;
private int remotingTopicRouteThreadPoolNums = 2 * PROCESSOR_NUMBER;
+ /**
+ * thread pool number for
+ * 1. send message(and send message v2)
+ * 2. send batch message
+ * 3. consume send message back
+ * 4. end transaction
+ * 5. recall message
+ */
private int remotingSendMessageThreadPoolNums = 4 * PROCESSOR_NUMBER;
+ /**
+ * thread pool number for
+ * 1. pull message
+ * 2. lite pull message
+ * 3. pop message
+ */
private int remotingPullMessageThreadPoolNums = 4 * PROCESSOR_NUMBER;
+ /**
+ * thread pool number for
+ * 1. update consumer offset
+ * 2. ack message
+ * 3. change message invisible time
+ * 4. get consumer connection list
+ */
private int remotingUpdateOffsetThreadPoolNums = 4 * PROCESSOR_NUMBER;
+ /**
+ * thread pool number for
+ * 1. unregister client
+ * 2. check client config
+ * 3. get consumer list by group
+ * 4. get min/max offset, query consume offset, search offset by timestamp
+ * 5. lock/unlock batch mq
+ */
private int remotingDefaultThreadPoolNums = 4 * PROCESSOR_NUMBER;
private int remotingHeartbeatThreadPoolQueueCapacity = 50000;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
index 3429ad54e27..12508d32108 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
@@ -73,6 +73,15 @@
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseWriter;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+/**
+ * RocketMQ gRPC protocol implementation
+ *
+ * Uses double-checked locking with {@link #telemetryWriteLock} to
+ * safely handle concurrent writes. If the underlying gRPC stream is
+ * closed or throws {@link StatusRuntimeException} /
+ * {@link IllegalStateException} (e.g. client disconnected), the observer
+ * is cleared so subsequent writes are silently skipped.
+ */
public void writeTelemetryCommand(TelemetryCommand command) {
StreamObserver When auto-renew is enabled ({@code enableProxyAutoRenew}), the proxy
+ * periodically extends the invisible time of delivered but unacked messages
+ * so that they are not revived while the consumer is still processing them.
+ *
+ * This method extracts the {@code PROPERTY_POP_CK} from each popped
+ * message, wraps it into a {@link MessageReceiptHandle}, and registers it
+ * via {@link MessagingProcessor#addReceiptHandle}. The returned
+ * {@link Runnable} is executed after the response has been written to the
+ * client stream.
+ *
+ * @param ctx the proxy context
+ * @param request the original receive-message request
+ * @param group consumer group
+ * @param topic topic name
+ * @param popResult the pop result returned from the broker
+ * @param writer the response stream writer
+ * @return a runnable to execute after the response write, or {@code null}
+ * if no messages were found
+ */
private Runnable handleAutoRenew(ProxyContext ctx, ReceiveMessageRequest request,
String group, String topic, PopResult popResult, ReceiveMessageResponseStreamWriter writer
) {
+ // check result status
if (!PopStatus.FOUND.equals(popResult.getPopStatus())) {
return null;
}
+ // get socket channel
GrpcClientChannel clientChannel = grpcChannelManager.getChannel(ctx.getClientID());
if (clientChannel == null) {
GrpcProxyException e = new GrpcProxyException(Code.MESSAGE_NOT_FOUND,
@@ -207,6 +246,7 @@ private Runnable handleAutoRenew(ProxyContext ctx, ReceiveMessageRequest request
writer.processThrowableWhenWriteMessage(e, ctx, request, messageExt));
throw e;
}
+
return () -> {
List Owns a {@link DefaultReceiptHandleManager} and wires its
+ * {@link RenewEvent} listener to {@link MessagingProcessor#changeInvisibleTime}.
+ * When a receipt handle is about to expire, the manager fires a {@code RENEW}
+ * event which this processor translates into a
+ * {@code ChangeInvisibleTime} call.
+ *
+ * When the renewal limit is reached, a {@code STOP_RENEW} event fires
+ * which nacks the message via {@code changeInvisibleTime} with the group's
+ * retry policy delay.
+ */
public class ReceiptHandleProcessor extends AbstractProcessor {
protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected DefaultReceiptHandleManager receiptHandleManager;
+ /**
+ * Wire the receipt handle manager to the messaging processor.
+ *
+ * pass StateEventListener to DefaultReceiptHandleManager
+ * so that when DefaultReceiptHandleManager find the message is expired,
+ * call StateEventListener to change the invisible time of the message.
+ *
+ * Creates an event listener that translates all {@link RenewEvent}
+ * types ({@code RENEW}, {@code STOP_RENEW}, {@code CLEAR_GROUP}) into
+ * {@link MessagingProcessor#changeInvisibleTime} calls, which update
+ * the message's visibility timeout on the broker.
+ *
+ * @param messagingProcessor the core messaging processor
+ * @param serviceManager the service manager providing metadata and consumer services
+ */
public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager) {
super(messagingProcessor, serviceManager);
+
+ // create event listener
StateEventListener When auto-renew is enabled, popped messages are registered here with their
+ * {@code PROPERTY_POP_CK} data. A periodic {@link #scheduledExecutorService} scans
+ * all registered handles and extends the invisible time for messages that are
+ * about to expire. When the total renewal duration exceeds
+ * {@code renewMaxTimeMillis}, the message is nack'd and returned to the broker.
+ *
+ * Handles are grouped by {@link ReceiptHandleGroupKey} (channel + consumer group)
+ * and cleaned up automatically when a gRPC client disconnects.
+ */
public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implements ReceiptHandleManager {
protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected final MetadataService metadataService;
@@ -77,6 +89,8 @@ public DefaultReceiptHandleManager(MetadataService metadataService, ConsumerMana
this.consumerManager = consumerManager;
this.eventListener = eventListener;
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+
+ // by default, minThreadNum is 2, maxThreadNum is 4
this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor(
proxyConfig.getRenewThreadPoolNums(),
proxyConfig.getRenewMaxThreadPoolNums(),
@@ -84,6 +98,8 @@ public DefaultReceiptHandleManager(MetadataService metadataService, ConsumerMana
"RenewalWorkerThread",
proxyConfig.getRenewThreadPoolQueueCapacity()
);
+
+ // by default, minThreadNum is 2, maxThreadNum is 4
this.returnHandleGroupWorkerService = ThreadPoolMonitor.createAndMonitor(
proxyConfig.getReturnHandleGroupThreadPoolNums(),
proxyConfig.getReturnHandleGroupThreadPoolNums() * 2,
@@ -91,6 +107,8 @@ public DefaultReceiptHandleManager(MetadataService metadataService, ConsumerMana
"ReturnHandleGroupWorkerThread",
proxyConfig.getRenewThreadPoolQueueCapacity()
);
+
+ // clear receipt by group when consumer unregister
consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() {
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
@@ -115,11 +133,15 @@ public void shutdown() {
}
});
+
this.receiptHandleGroupMap = new ConcurrentHashMap<>();
this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> log.warn("add renew task failed. queueSize:{}", executor.getQueue().size()));
+
+ // add periodic scan task
this.appendStartAndShutdown(new StartAndShutdown() {
@Override
public void start() throws Exception {
+ // by default, interval is 5000ms
scheduledExecutorService.scheduleWithFixedDelay(() -> scheduleRenewTask(), 0,
ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS);
}
@@ -154,6 +176,23 @@ protected boolean clientIsOffline(ReceiptHandleGroupKey groupKey) {
return this.consumerManager.findChannel(groupKey.getGroup(), groupKey.getChannel()) == null;
}
+ /**
+ * Periodic scan of all receipt handle groups, called by the
+ * {@link #scheduledExecutorService} at a fixed interval.
+ *
+ * For each group:
+ * The scan runs synchronously in the scheduler thread; the actual
+ * renewal work is dispatched asynchronously to the worker pool.
+ */
protected void scheduleRenewTask() {
Stopwatch stopwatch = Stopwatch.createStarted();
try {
@@ -191,15 +230,39 @@ protected void renewMessage(ProxyContext context, ReceiptHandleGroupKey key, Rec
}
}
+ /**
+ * Renew a single message's visibility timeout, or stop if the renewal
+ * limit has been reached.
+ *
+ * Decision logic:
+ * The method:
+ * Disk flush and HA replication run in parallel via
+ * {@link CompletableFuture#thenCombine}. If either fails, the combined
+ * result is updated with the failure status — both must succeed for
+ * the overall result to be {@code PUT_OK}.
+ *
+ * @param putMessageResult the append result to update
+ * @param messageExt the original message (needed by flush)
+ * @param needAckNums number of slave acks required (0/1 = no HA)
+ * @param needHandleHA whether HA replication is configured
+ * @return a future completing with the merged result
+ */
private CompletableFuture The difference between getData is:
+ * getMessage add process: setInCache
+ *
+ * Finds the mapped file containing the offset and selects a buffer
+ * for the given size. The returned buffer includes cache-status metadata
+ * for cold-data flow control.
+ *
+ * @param offset physical offset in the commit log
+ * @param size number of bytes to read
+ * @return the mapped buffer, or {@code null} if the file is unavailable
+ */
public SelectMappedBufferResult getMessage(final long offset, final int size) {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index aee767dae2f..9bfbfca6961 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -211,6 +211,13 @@ public class DefaultMessageStore implements MessageStore {
private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0);
+ /**
+ * BatchDispatchRequest queue
+ * offer by ConcurrentReputMessageService.createBatchDispatchRequest()
+ * poll by MainBatchDispatchRequestService.pollBatchDispatchRequest()
+ *
+ * if enableBuildConsumeQueueConcurrently is false, It is useless
+ */
private final ConcurrentLinkedQueue Before writing, any registered {@link PutMessageHook} instances are
+ * invoked — a non-null result from a hook short-circuits the process.
+ * Inner-batch message flags are validated
+ * then the actual write is delegated to {@link CommitLog#asyncPutMessage}.
+ *
+ * @param msg the message to write
+ * @return a future that completes with the put result
+ */
@Override
public CompletableFuture The method:
+ * The lookup strategy:
+ * When a batch of messages is popped, the queue offsets of the messages may not
+ * be contiguous (e.g. batch messages, ConsumeQueue compaction, filter mismatch gaps).
+ * This list records {@code actualQueueOffset - startOffset} for each message in the
+ * batch, so that the system can correctly map an ack offset back to its index within
+ * the checkpoint via {@link #indexOfAck}, and reconstruct the original offset via
+ * {@link #ackOffsetByIndex}.
+ *
+ * When this field is null or empty (old-version CK), offsets are assumed to be
+ * {@code startOffset + index}.
+ */
@JSONField(name = "d")
private List The index is used to look up the corresponding bit in the {@link #bitMap}
+ * (or in {@code PopCheckPointWrapper.bits}) and to retrieve the original
+ * queue offset via {@link #ackOffsetByIndex}.
+ *
+ * @param ackOffset the queue offset being acked
+ * @return the sub-message index (0-based), or -1 if the offset is not found
+ * in this checkpoint
+ */
public int indexOfAck(long ackOffset) {
if (ackOffset < startOffset) {
return -1;
}
- // old version of checkpoint
+ // old version of checkpoint, this will not happen in 5.*
if (queueOffsetDiff == null || queueOffsetDiff.isEmpty()) {
if (ackOffset - startOffset < num) {
@@ -184,8 +233,16 @@ public int indexOfAck(long ackOffset) {
return queueOffsetDiff.indexOf((int) (ackOffset - startOffset));
}
+ /**
+ * get original queue offset by index.
+ * the method name is miss-leading, it should be getQueueOffsetByIndex.
+ * queueOffset = startOffset + queueOffsetDiff[index]
+ *
+ * @param index sub-message index within this checkpoint (0-based)
+ * @return the original queue offset in the consume queue
+ */
public long ackOffsetByIndex(byte index) {
- // old version of checkpoint
+ // old version of checkpoint, this will not happen in 5.*
if (queueOffsetDiff == null || queueOffsetDiff.isEmpty()) {
return startOffset + index;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/MessageRocksDBStorage.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/MessageRocksDBStorage.java
index d55596a293c..68844f095e1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/MessageRocksDBStorage.java
+++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/MessageRocksDBStorage.java
@@ -57,19 +57,37 @@
import static org.apache.rocketmq.store.timer.rocksdb.TimerRocksDBRecord.TIMER_ROCKSDB_PUT;
import static org.apache.rocketmq.store.timer.rocksdb.TimerRocksDBRecord.TIMER_ROCKSDB_UPDATE;
+/**
+ * RocksDB-based storage engine for index, timer, and transaction data.
+ *
+ * Manages three column families:
+ * The index key is structured as {@code hour | KEY_SPLIT | topic | KEY_SPLIT |
+ * indexType | KEY_SPLIT | key | KEY_SPLIT | offsetPy}.
+ * Iterates hour-by-hour within {@code [beginTime, endTime]}, extracting
+ * the trailing offset bytes from each matching key.
+ *
+ * @param lastKey cursor for pagination, format "hour|topic|indexType|key|offsetPy"
+ */
public List For {@code TIMER_ROCKSDB_UPDATE}, the write is skipped if the key was
+ * recently deleted (tracked in {@link #DELETE_KEY_CACHE_FOR_TIMER}) to
+ * avoid resurrecting a stale timer entry.
+ */
public void writeRecordsForTimer(byte[] columnFamily, List Half-message records are put and track the maximum offsetPy for
+ * {@code LAST_OFFSET_PY}. OP (commit/rollback) records are deleted from
+ * the column family. The metadata key {@code LAST_OFFSET_PY} is updated
+ * atomically in the same write batch.
+ */
public void writeRecordsForTrans(byte[] columnFamily, List Three tiers of flow control based on the number of messages already
+ * scheduled for the given delivery time:
+ * Replaces the CommitLog-based HALF/OP Topic approach with a RocksDB
+ * column family ({@link MessageRocksDBStorage#TRANS_COLUMN_FAMILY}). When
+ * a half or OP message is dispatched from the CommitLog, this store builds
+ * an index entry (or tombstone) in RocksDB, enabling O(1) state lookups
+ * during transaction check-back instead of scanning HALF/OP queues.
+ *
+ * Index building is asynchronous: {@link #buildTransIndex} enqueues
+ * {@link TransRocksDBRecord}s into a bounded blocking queue, and a
+ * background {@link TransIndexBuildService} batches them into RocksDB
+ * writes.
+ */
public class TransMessageRocksDBStore implements CommitLogDispatchStore {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final Logger logError = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
@@ -64,8 +78,10 @@ public class TransMessageRocksDBStore implements CommitLogDispatchStore {
private final MessageRocksDBStorage messageRocksDBStorage;
private final BrokerStatsManager brokerStatsManager;
private final SocketAddress storeHost;
+ // Thread-local buffer for reading messages from CommitLog, grown on demand
private ThreadLocal Skips records whose CommitLog offset is already covered by the RocksDB
+ * {@code lastOffsetPy} watermark (idempotent on re-dispatch).
+ *
+ * For {@code RMQ_SYS_ROCKSDB_TRANS_HALF_TOPIC}: creates a half-message
+ * record (offsetPy, topic, uniqKey, size, checkTimes=0).
+ * For {@code RMQ_SYS_ROCKSDB_TRANS_OP_HALF_TOPIC}: creates a tombstone
+ * record referencing the half-message's offset via
+ * {@code PROPERTY_TRANS_OFFSET}.
+ */
public void buildTransIndex(DispatchRequest dispatchRequest) {
+ // validate request and init context params
if (null == dispatchRequest || dispatchRequest.getCommitLogOffset() < 0L || dispatchRequest.getMsgSize() <= 0 || state != RUNNING || null == this.originTransMsgQueue) {
logError.error("TransMessageRocksDBStore buildTransIndex error, dispatchRequest: {}, state: {}, originTransMsgQueue: {}", dispatchRequest, state, originTransMsgQueue);
return;
@@ -119,6 +148,8 @@ public void buildTransIndex(DispatchRequest dispatchRequest) {
int reqMsgSize = dispatchRequest.getMsgSize();
try {
MessageExt msgInner = getMessage(reqOffsetPy, reqMsgSize);
+
+ // parse and validate msgInner
if (null == msgInner) {
logError.error("TransMessageRocksDBStore buildTransIndex error, msgInner is not found, reqOffsetPy: {}, reqMsgSize: {}", reqOffsetPy, reqMsgSize);
return;
@@ -131,6 +162,7 @@ public void buildTransIndex(DispatchRequest dispatchRequest) {
}
TransRocksDBRecord transRocksDBRecord = null;
String reqTopic = dispatchRequest.getTopic();
+
if (TopicValidator.RMQ_SYS_ROCKSDB_TRANS_HALF_TOPIC.equals(reqTopic)) {
transRocksDBRecord = new TransRocksDBRecord(reqOffsetPy, topic, uniqKey, reqMsgSize, 0);
} else if (TopicValidator.RMQ_SYS_ROCKSDB_TRANS_OP_HALF_TOPIC.equals(reqTopic)) {
@@ -148,6 +180,7 @@ public void buildTransIndex(DispatchRequest dispatchRequest) {
logError.error("TransMessageRocksDBStore buildTransIndex error, transOffsetPy: {}, error: {}", transOffsetPy, e.getMessage());
}
}
+
if (null != transRocksDBRecord) {
while (!originTransMsgQueue.offer(transRocksDBRecord, 3, TimeUnit.SECONDS)) {
if (System.currentTimeMillis() % 1000 == 0) {
@@ -160,6 +193,13 @@ public void buildTransIndex(DispatchRequest dispatchRequest) {
}
}
+ /**
+ * Persist an OP (commit/rollback) message to CommitLog. The message is
+ * written to {@code RMQ_SYS_ROCKSDB_TRANS_OP_HALF_TOPIC} with
+ * {@code PROPERTY_TRANS_OFFSET} pointing back to the half message's
+ * commitLog offset. When the CommitLog dispatcher processes it,
+ * {@link #buildTransIndex} converts it into a RocksDB tombstone delete.
+ */
public void deletePrepareMessage(MessageExt messageExt) {
if (null == messageExt) {
logError.error("TransMessageRocksDBStore deletePrepareMessage error, messageExt is null");
@@ -214,6 +254,13 @@ public MessageRocksDBStorage getMessageRocksDBStorage() {
return messageRocksDBStorage;
}
+ /**
+ * Build an OP message for the RocksDB trans OP half topic.
+ *
+ * Body is a single fill byte (actual data is in properties).
+ * {@code PROPERTY_TRANS_OFFSET} carries the half message's commitLog
+ * offset so {@link #buildTransIndex} can issue a RocksDB delete.
+ */
private MessageExtBrokerInner makeOpMessageInner(MessageExt messageExt) {
if (null == messageExt) {
logError.error("TransMessageRocksDBStore makeOpMessageInner messageExt is null");
@@ -246,6 +293,11 @@ private MessageExtBrokerInner makeOpMessageInner(MessageExt messageExt) {
}
}
+ /**
+ * Look up how many times a half message has been checked by the
+ * transaction checker. Returns null if the record does not exist
+ * (e.g. already committed/rolled back or never indexed).
+ */
public Integer getCheckTimes(String topic, String uniqKey, Long offsetPy) {
if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(uniqKey) || null == offsetPy || offsetPy < 0L) {
return null;
@@ -262,6 +314,13 @@ public Integer getCheckTimes(String topic, String uniqKey, Long offsetPy) {
}
}
+ /**
+ * Called during CommitLog recovery to decide whether a mapped file's
+ * data has already been indexed in RocksDB.
+ *
+ * @return true if the file's phyOffset is covered by the trans column
+ * family's lastOffsetPy watermark, meaning no re-dispatch needed.
+ */
public boolean isMappedFileMatchedRecover(long phyOffset, long storeTimestamp,
boolean recoverNormally) throws RocksDBException {
if (!storeConfig.isTransRocksDBEnable()) {
@@ -287,6 +346,14 @@ private String getServiceThreadName() {
return brokerIdentifier;
}
+ /**
+ * Background service that drains {@link #originTransMsgQueue} in batches
+ * and writes them to the trans column family via
+ * {@link MessageRocksDBStorage#writeRecordsForTrans}.
+ *
+ * Polls up to {@link #BATCH_SIZE} records per iteration. Continues
+ * draining even after shutdown is requested to avoid data loss.
+ */
public class TransIndexBuildService extends ServiceThread {
private final Logger log = TransMessageRocksDBStore.log;
private List Key format: {@code offsetPy + "@" + topic + "@" + uniqKey} Two record types share this structure:
+ * When {@code isOp=true}, this record signals a RocksDB delete
+ * (tombstone) — no value is persisted.
+ */
public TransRocksDBRecord(long offsetPy, String topic, String uniqKey, boolean isOp) {
this.offsetPy = offsetPy;
this.topic = topic;
@@ -53,6 +92,12 @@ public TransRocksDBRecord(long offsetPy, String topic, String uniqKey, boolean i
public TransRocksDBRecord() {}
+ /**
+ * get rocksDB record key:
+ * offsetPy + KEY_SPLIT + topic + KEY_SPLIT + uniqKey(transactionId)
+ *
+ * @return get key bytes
+ */
public byte[] getKeyBytes() {
if (offsetPy < 0L || StringUtils.isEmpty(topic) || StringUtils.isEmpty(uniqKey)) {
return null;
@@ -62,6 +107,12 @@ public byte[] getKeyBytes() {
return ByteBuffer.allocate(keyLength).putLong(offsetPy).put(keySuffixBytes).array();
}
+ /**
+ * Serialize value as {@code [checkTimes (4 bytes) | sizePy (4 bytes)]}.
+ * Total encoded length is {@link #VALUE_LENGTH} (8 bytes).
+ *
+ * @return encoded value bytes, or null if checkTimes or sizePy is invalid
+ */
public byte[] getValueBytes() {
if (checkTimes < 0 || sizePy <= 0) {
logError.error("TransRocksDBRecord getValueBytes error, checkTimes: {}, sizePy: {}", checkTimes, sizePy);
@@ -70,6 +121,12 @@ public byte[] getValueBytes() {
return ByteBuffer.allocate(VALUE_LENGTH).putInt(checkTimes).putInt(sizePy).array();
}
+ /**
+ * Deserialize a record from its RocksDB key-value pair.
+ *
+ * Key layout: {@code [offsetPy (8 bytes)][suffix ("@" + topic + "@" + uniqKey)]}
+ *
+ */
protected static class ConsumerRecords {
private final String groupId;
private final String topicId;
private final int queueId;
private final BrokerConfig brokerConfig;
+ /**
+ * Staged records awaiting cleanup (revival or KVStore write).
+ *
+ *
+ *
+ */
public interface PopConsumerKVStore {
/**
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
index d10b584ef69..73c85311614 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
@@ -58,6 +58,15 @@ public int getCode() {
@JSONField(ordinal = 4)
private int retryFlag;
+ /**
+ * Message visibility timeout in milliseconds.
+ *
+ *
+ *
+ */
@JSONField(ordinal = 8)
private String attemptId;
+ /**
+ * Whether the consumer has suspended (nacked) this message.
+ *
+ *
+ * visibilityTimeout(8B) + groupId + '@' + topicId + '@' + queueId(4B) + '@' + offset(8B)
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * @param context the pop context to update
+ * @param result the result from the message store
+ * @param topicId topic name
+ * @param queueId queue id
+ * @param retryType whether this is a retry topic V1/V2
+ * @param offset the original consume offset used for this fetch
+ * @return the updated pop context
+ */
public PopConsumerContext handleGetMessageResult(PopConsumerContext context, GetMessageResult result,
String topicId, int queueId, PopConsumerRecord.RetryType retryType, long offset) {
@@ -205,6 +236,29 @@ public PopConsumerContext handleGetMessageResult(PopConsumerContext context, Get
return context;
}
+ /**
+ * Retrieve the starting consume offset for a pop request.
+ * should be private, no external callers.
+ *
+ *
+ *
+ *
+ * @param future the accumulator future carrying the pop context
+ * @param clientHost the client address
+ * @param groupId consumer group id
+ * @param topicId topic name
+ * @param queueId queue id
+ * @param batchSize max number of messages still needed
+ * @param filter message filter
+ * @param retryType whether this is a retry topic V1/V2
+ * @return a future completing with the pop context updated with results
+ */
protected CompletableFuture
+ *
+ *
+ * @param clientHost the client address
+ * @param popTime the pop invocation timestamp
+ * @param invisibleTime the message visibility timeout
+ * @param groupId consumer group id
+ * @param topicId topic name
+ * @param queueId queue id (-1 for all queues)
+ * @param batchSize max number of messages to return
+ * @param fifo whether this is a FIFO ordered consumption
+ * @param attemptId attempt id for idempotent consumption
+ * @param initMode consume init mode (min/max)
+ * @param filter message filter expression
+ * @return a future that completes with the pop result context
+ */
public CompletableFuture
+ *
+ *
+ *
+ *
+ *
+ * @param currentTime tracks the last scanned visibility timeout (for incremental progress)
+ * @param maxCount maximum number of records to process per batch(load from config: 16 * 1024)
+ * @return the number of consumed (revived) records
+ */
public long revive(AtomicLong currentTime, int maxCount) {
Stopwatch stopwatch = Stopwatch.createStarted();
long upperTime = System.currentTimeMillis() - 50L;
+
+ // scan expired records between [currentTime-3s, now-50ms)]
List
+ *
+ */
@Override
public void run() {
this.consumerRunning.set(true);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerManager.java
index 6f496fa13b3..88ac91149b8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerManager.java
@@ -84,7 +84,9 @@ protected void updateLockFreeTimestamp(String topic, String group, int queueId,
/**
* update the message list received
+ * called after message pop
*
+ * @param attemptId attemptId
* @param isRetry is retry topic or not
* @param topic topic
* @param group group
@@ -97,6 +99,7 @@ protected void updateLockFreeTimestamp(String topic, String group, int queueId,
public void update(String attemptId, boolean isRetry, String topic, String group, int queueId, long popTime,
long invisibleTime,
List
+ *
+ */
+ @JSONField(serialize = false, deserialize = false)
+ public boolean needBlock(String attemptId, long currentInvisibleTime) {
+ // all offsets are not consumed, do not block
if (offsetList == null || offsetList.isEmpty()) {
return false;
}
+
+ // same request, do not block
if (this.attemptId != null && this.attemptId.equals(attemptId)) {
return false;
}
+
int num = offsetList.size();
int i = 0;
if (this.invisibleTime == null || this.invisibleTime <= 0) {
this.invisibleTime = currentInvisibleTime;
}
long currentTime = System.currentTimeMillis();
+
for (; i < num; i++) {
if (isNotAck(i)) {
+ // calculate nextVisibleTime
long nextVisibleTime = popTime + invisibleTime;
if (offsetNextVisibleTime != null) {
Long time = offsetNextVisibleTime.get(this.getQueueOffset(i));
@@ -550,10 +593,14 @@ public boolean needBlock(String attemptId, long currentInvisibleTime) {
nextVisibleTime = time;
}
}
+
+ // if offset is not expired, block
if (currentTime < nextVisibleTime) {
return true;
}
}
+
+ // if acked, do nothing
}
return false;
}
@@ -671,6 +718,7 @@ public boolean isNotAck(int offsetIndex) {
@JSONField(serialize = false, deserialize = false)
public void mergeOffsetConsumedCount(String preAttemptId, List
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * @param channel the Netty channel
+ * @param request the incoming request
+ * @param brokerAllowSuspend whether the broker may suspend
+ * @return a future that completes with the response
+ * @throws RemotingCommandException if the request cannot be decoded
+ */
public CompletableFuture
+ *
+ *
+ * @param requestHeader the original request header
+ * @param reviveQid the revive queue to write to
+ * @param queueId the original queue id
+ * @param offset the message offset being extended
+ * @param popTime the new pop time (current time)
+ * @param extraInfo the extra info from the original pop request
+ * @return a future that completes with {@code true} on success
+ */
private CompletableFuture
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * @param result the operation result containing the prepared message
+ */
private void deletePrepareMessage(OperationResult result) {
if (null == result || null == result.getPrepareMessage()) {
LOGGER.error("deletePrepareMessage param error, result is null or prepareMessage is null");
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/LiteSubscriptionCtlProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/LiteSubscriptionCtlProcessor.java
index bcf0df41270..1a72f5a7b2c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/LiteSubscriptionCtlProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/LiteSubscriptionCtlProcessor.java
@@ -37,6 +37,26 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Handles subscription control requests for Lite Topics, dispatched to
+ * {@link LiteSubscriptionRegistry} which manages client → topic → lmq set
+ * mappings.
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ */
private void scanGarbage() {
Iterator
+ *
+ *
+ *
+ *
+ *
+ * @param reviveQid revive queue id (used only for logging)
+ * @param ackMsg the ack message from the consumer
+ * @return true if the ack was merged successfully
+ */
public boolean addAk(int reviveQid, AckMsg ackMsg) {
+ // validate env
if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {
return false;
}
if (!serving) {
return false;
}
+
try {
+ // get and validate checkpoint
PopCheckPointWrapper pointWrapper = this.buffer.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName());
if (pointWrapper == null) {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
@@ -568,7 +768,8 @@ public boolean addAk(int reviveQid, AckMsg ackMsg) {
return false;
}
- if (ackMsg instanceof BatchAckMsg) {
+ // merge ackMsg with checkpoint
+ if (ackMsg instanceof BatchAckMsg) { // merge batch ackMsg
for (Long ackOffset : ((BatchAckMsg) ackMsg).getAckOffsetList()) {
int indexOfAck = point.indexOfAck(ackOffset);
if (indexOfAck > -1) {
@@ -577,7 +778,7 @@ public boolean addAk(int reviveQid, AckMsg ackMsg) {
POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point);
}
}
- } else {
+ } else { // merge ackMsg
int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
if (indexOfAck > -1) {
markBitCAS(pointWrapper.getBits(), indexOfAck);
@@ -587,6 +788,7 @@ public boolean addAk(int reviveQid, AckMsg ackMsg) {
}
}
+ // logging
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("[PopBuffer]add ack, rqId={}, {}, {}", reviveQid, pointWrapper, ackMsg);
}
@@ -608,6 +810,12 @@ public void clearOffsetQueue(String lockKey) {
this.commitOffsets.remove(lockKey);
}
+ /**
+ * write message(checkpoint) to revive topic, then update pointWrapper related info.
+ *
+ * @param pointWrapper checkpoint
+ * @param runInCurrent async or sync
+ */
private void putCkToStore(final PopCheckPointWrapper pointWrapper, final boolean runInCurrent) {
if (pointWrapper.getReviveQueueOffset() >= 0) {
return;
@@ -617,6 +825,7 @@ private void putCkToStore(final PopCheckPointWrapper pointWrapper, final boolean
// Indicates that ck message is storing
pointWrapper.setReviveQueueOffset(Long.MAX_VALUE);
+ // default value of isAppendCkAsync is false
if (brokerController.getBrokerConfig().isAppendCkAsync() && runInCurrent) {
brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult -> {
handleCkMessagePutResult(putMessageResult, pointWrapper);
@@ -655,7 +864,21 @@ private void handleCkMessagePutResult(PutMessageResult putMessageResult, final P
}
}
+ /**
+ * Persist message which created by checkpoint to the revive topic.
+ *
+ *
+ *
+ *
+ * @param pointWrapper the checkpoint wrapper containing the original CK
+ * @param msgIndex the sub-message index within the CK batch to ack
+ * @param count atomic counter incremented on successful persistence
+ */
private void putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgIndex, AtomicInteger count) {
+ // build ackMsg and Message by checkpoint
PopCheckPoint point = pointWrapper.getCk();
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
final AckMsg ackMsg = new AckMsg();
@@ -679,7 +902,8 @@ private void putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgInde
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
- if (brokerController.getBrokerConfig().isAppendAckAsync()) {
+ // store message then change store status of the checkpoint
+ if (brokerController.getBrokerConfig().isAppendAckAsync()) { // default value is false
brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult -> {
handleAckPutMessageResult(ackMsg, putMessageResult, pointWrapper, count, msgIndex);
}).exceptionally(throwable -> {
@@ -687,11 +911,22 @@ private void putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgInde
return null;
});
} else {
+ // store message
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+ // change store status of the checkpoint
handleAckPutMessageResult(ackMsg, putMessageResult, pointWrapper, count, msgIndex);
}
}
+ /**
+ * update store status of checkpoint if revive message stored successfully.
+ *
+ * @param ackMsg the ack message that was persisted
+ * @param putMessageResult the result returned by the store
+ * @param pointWrapper the checkpoint wrapper being processed
+ * @param count atomic counter incremented on success
+ * @param msgIndex the sub-message index that was persisted
+ */
private void handleAckPutMessageResult(AckMsg ackMsg, PutMessageResult putMessageResult,
PopCheckPointWrapper pointWrapper, AtomicInteger count, byte msgIndex) {
brokerController.getBrokerMetricsManager().getPopMetricsManager().incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus());
@@ -797,6 +1032,17 @@ private boolean cancelCkTimer(final PopCheckPointWrapper pointWrapper) {
return true;
}
+ /**
+ * Check whether all sub-messages in the checkpoint have been acked.
+ *
+ *
+ *
+ */
private volatile long reviveQueueOffset;
private final PopCheckPoint ck;
- // bit for concurrent
+ // store ack states of messages, one byte for each message
private final AtomicInteger bits;
- // bit for stored buffer ak
+ // bits for stored buffer ak, one byte for each message
private final AtomicInteger toStoreBits;
+ // nextOffset of original topic
private final long nextBeginOffset;
+ // topic@group@queueId
private final String lockKey;
+ // topic + group + queueId + startOffset + popTime + brokerName
private final String mergeKey;
+ /**
+ * Whether this checkpoint should be written to the revive topic directly.
+ *
+ *
+ *
+ *
+ * @see PopBufferMergeService#addCkJustOffset
+ * @see PopBufferMergeService#addCkMock
+ */
private final boolean justOffset;
+ // whether check point has stored in revive queue
private volatile boolean ckStored = false;
public PopCheckPointWrapper(int reviveQueueId, long reviveQueueOffset, PopCheckPoint point,
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 55cabe6f5e5..dfbe2f40bfd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -33,6 +33,7 @@
import org.apache.rocketmq.broker.longpolling.PopRequest;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.broker.pop.PopConsumerContext;
+import org.apache.rocketmq.broker.pop.PopConsumerService;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
@@ -99,6 +100,24 @@
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT;
+/**
+ * Processes PopMessage requests from consumers.
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * @param ctx the Netty channel handler context
+ * @param request the incoming PopMessage request
+ * @return the response, or {@code null} if the response is sent asynchronously
+ * (zero-copy path or long-polling suspension)
+ * @throws RemotingCommandException if the request cannot be decoded
+ */
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
+ // init request and response
final long beginTimeMills = this.brokerController.getMessageStore().now();
Channel channel = ctx.channel();
+
RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
response.setOpaque(request.getOpaque());
@@ -235,6 +277,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC
}
final PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader();
+ // validation
// Pop mode only supports consumption in cluster load balancing mode
brokerController.getConsumerManager().compensateBasicConsumerInfo(
requestHeader.getConsumerGroup(), ConsumeType.CONSUME_POP, MessageModel.CLUSTERING);
@@ -314,6 +357,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC
return response;
}
+ // init filter
BrokerConfig brokerConfig = brokerController.getBrokerConfig();
SubscriptionData subscriptionData = null;
ExpressionMessageFilter messageFilter = null;
@@ -377,6 +421,9 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC
ExpressionMessageFilter finalMessageFilter = messageFilter;
SubscriptionData finalSubscriptionData = subscriptionData;
+ // There are two type of ack mode:
+ // 1. ack by KV service
+ // 2. ack by file merge service, default mode
if (brokerConfig.isPopConsumerKVServiceEnable()) {
CompletableFuture
+ *
+ *
+ * @param topicConfig topic configuration; {@code null} skips all queues
+ * @param isRetry whether the topic is a retry topic
+ * @param getMessageResult accumulator for the messages popped so far
+ * @param requestHeader pop request parameters
+ * @param reviveQid revive queue id
+ * @param channel netty channel of the requesting client
+ * @param popTime pop timestamp
+ * @param messageFilter expression filter applied to each message
+ * @param startOffsetInfo buffer for offset tracing info
+ * @param msgOffsetInfo buffer for per-message offset tracing info
+ * @param orderCountInfo buffer for order-consume count info
+ * @param randomQ random queue offset for round-robin load balancing
+ * @param getMessageFuture future that carries the remaining message count
+ * @return a future completing with the remaining number of messages needed
+ */
private CompletableFuture
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * @param consumeReviveObj the mutable container that receives the collected
+ * CKs and the computed {@code endTime}
+ */
protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
+ // init context parameters
HashMap
+ *
+ *
+ *
+ *
+ *
+ * @param popCheckPoint the checkpoint whose un-acked messages should be revived
+ */
private void reviveMsgFromCk(PopCheckPoint popCheckPoint) {
+ // env check and init
if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip retry, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
return;
}
inflightReviveRequestMap.put(popCheckPoint, new Pair<>(System.currentTimeMillis(), false));
List
+ *
+ *
+ *
+ *
+ */
@Override
public void run() {
int slow = 1;
while (!this.isStopped()) {
try {
+ // env check
if (System.currentTimeMillis() < brokerController.getShouldStartTime()) {
POP_LOGGER.info("PopReviveService Ready to run after {}", brokerController.getShouldStartTime());
this.waitForRunning(1000);
@@ -676,6 +903,8 @@ public void run() {
}
POP_LOGGER.info("start revive topic={}, reviveQueueId={}", reviveTopic, queueId);
+
+ // consume revive message
ConsumeReviveObj consumeReviveObj = new ConsumeReviveObj();
consumeReviveMessage(consumeReviveObj);
@@ -684,8 +913,10 @@ public void run() {
continue;
}
+ // merge checkpoint and ackMsg then revive
mergeAndRevive(consumeReviveObj);
+ // wait and logging
ArrayList
+ *
+ *
+ * @param msgInner the original transactional message
+ * @return the transformed half message
+ */
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
+ // set transactionId
String uniqId = msgInner.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqId != null && !uniqId.isEmpty()) {
MessageAccessor.putProperty(msgInner, TransactionalMessageUtil.TRANSACTION_ID, uniqId);
}
+
+ // store real topic and queueId to properties
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
+
+ // Clears the transaction sys-flag to prevent re-interception
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
+
+ // set transactional topic
+ // 1. TopicValidator.RMQ_SYS_ROCKSDB_TRANS_HALF_TOPIC if rocksdb enable
+ // 2. TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC
if (null != store.getMessageStoreConfig() && store.getMessageStoreConfig().isTransRocksDBEnable() && !store.getMessageStoreConfig().isTransWriteOriginTransHalfEnable()) {
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopicForRocksDB());
} else {
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
}
+
+ // set queueId and propertiesString
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
@@ -260,6 +290,19 @@ public boolean putMessage(MessageExtBrokerInner messageInner) {
}
}
+ /**
+ * Renew a half message and set property MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET if not exists
+ *
+ *
+ *
+ *
+ * @param transactionTimeout the transaction timeout in milliseconds
+ * @param transactionCheckMax maximum number of times to check a transaction
+ * @param listener callback for resolved or discarded messages
+ */
@Override
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener) {
try {
+ // fetch message queues of the half-message topic, one queue by default
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set
+ *
+ *
+ *
*
* @param removeMap Op message map to determine whether a half message was responded by producer.
* @param doneOpOffset Op Message which has been checked.
* @param msgExt Half message
- * @return Return true if put success, otherwise return false.
+ * @return true if the message can be skipped (completed or re-queued),
+ * false if the offset is illegal
*/
private boolean checkPrepareQueueOffset(HashMap
+ *
+ *
+ *
+ *
+ *
+ * @return non-null {@link PutMessageResult} if the message was rejected
+ */
public static PutMessageResult handleScheduleMessage(BrokerController brokerController,
final MessageExtBrokerInner msg) {
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
+ // normal message or committed message can be delayed
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
+ // is timer topic
if (!isRolledTimerMessage(msg)) {
+ // double check, has delay level or, is timer topic and has delivery time
if (checkIfTimerMessage(msg)) {
if (!brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
//wheel timer is not enabled, reject the message
@@ -144,7 +196,7 @@ public static PutMessageResult handleScheduleMessage(BrokerController brokerCont
}
}
}
- // Delay Delivery
+ // Delay Delivery, useless with default config
if (msg.getDelayTimeLevel() > 0) {
transformDelayLevelMessage(brokerController, msg);
}
@@ -152,6 +204,13 @@ public static PutMessageResult handleScheduleMessage(BrokerController brokerCont
return null;
}
+ /**
+ * Enforce Light Message Queue (LMQ) quota: reject the message if the
+ * number of LMQ consume queues would exceed the configured maximum and
+ * the target queue does not already exist.
+ *
+ * @return null if the check passes, or a rejection {@link PutMessageResult}
+ */
public static PutMessageResult handleLmqQuota(BrokerController brokerController, final MessageExtBrokerInner msg) {
if (!brokerController.getMessageStoreConfig().isEnableLmqQuota()
|| !brokerController.getMessageStoreConfig().isEnableLmq()
@@ -201,10 +260,34 @@ public static boolean checkIfTimerMessage(MessageExtBrokerInner msg) {
return null != msg.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS) || null != msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS) || null != msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC);
}
+ /**
+ * Transform a timer message and redirect it to the timer wheel topic.
+ *
+ *
+ *
+ *
+ * @param brokerController the broker controller
+ * @param msg the message to transform
+ * @return a non-null {@link PutMessageResult} if the message is rejected
+ */
private static PutMessageResult transformTimerMessage(BrokerController brokerController,
MessageExtBrokerInner msg) {
//do transform
int delayLevel = msg.getDelayTimeLevel();
+
+ // calculate deliver time
long deliverMs;
try {
if (msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) {
@@ -217,21 +300,28 @@ private static PutMessageResult transformTimerMessage(BrokerController brokerCon
} catch (Exception e) {
return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
}
+
if (deliverMs > System.currentTimeMillis()) {
+ // default value of timerMaxDelaySec is 3600 * 24 * 3
if (delayLevel <= 0 && deliverMs - System.currentTimeMillis() > brokerController.getMessageStoreConfig().getTimerMaxDelaySec() * 1000L) {
return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
}
+ // precision operation
int timerPrecisionMs = brokerController.getMessageStoreConfig().getTimerPrecisionMs();
if (deliverMs % timerPrecisionMs == 0) {
+ // Exactly on boundary → move one tick earlier
deliverMs -= timerPrecisionMs;
} else {
+ // Not on boundary → round down to nearest tick
deliverMs = deliverMs / timerPrecisionMs * timerPrecisionMs;
}
+ // flow control, always skip with default config
if (brokerController.getTimerMessageStore().isReject(deliverMs)) {
return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_FLOW_CONTROL, null);
}
+
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TIMER_OUT_MS, deliverMs + "");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
@@ -259,6 +349,14 @@ public static void transformDelayLevelMessage(BrokerController brokerController,
msg.setQueueId(ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()));
}
+ /**
+ * Forward messages to another broker (typically the retry / dead-letter
+ * queue destination). Used as the {@link SendMessageBackHook} implementation.
+ *
+ *
+ *
+ */
public class GrpcMessagingApplication extends MessagingServiceGrpc.MessagingServiceImplBase implements StartAndShutdown {
private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -168,6 +177,16 @@ protected Status convertExceptionToStatus(Throwable t) {
return ResponseBuilder.getInstance().buildStatus(t);
}
+ /**
+ * submit grpc task to related thread pool.
+ *
+ * @param executor thread pool
+ * @param context context
+ * @param request grpc request
+ * @param runnable process task
+ * @param responseObserver grpc response observer
+ * @param statusResponseCreator error response creator
+ */
protected
+ *
+ */
@Override
public StreamObserver> sendMessage(ProxyContext ctx, QueueSelector queueSelector,
String producerGroup, int sysFlag, List
> future = new CompletableFuture<>();
@@ -96,6 +104,7 @@ public CompletableFuture
> sendMessage(ProxyContext ctx, QueueSe
SendMessageRequestHeader requestHeader = buildSendMessageRequestHeader(messageList, producerGroup, sysFlag, messageQueue.getQueueId());
AddressableMessageQueue finalMessageQueue = messageQueue;
+ // call SendMessageProcessor of broker
future = this.serviceManager.getMessageService().sendMessage(
ctx,
messageQueue,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index bc3730aed9a..555b78f1906 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -29,17 +29,50 @@
import org.apache.rocketmq.proxy.service.ServiceManager;
import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager;
+/**
+ * Bridges receipt handle renewal events to the messaging processor.
+ *
+ *
> sendMessage(
ProxyContext ctx,
AddressableMessageQueue messageQueue,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
index f9dfd825337..9f168a3128b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -60,6 +60,18 @@
import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+/**
+ * Manages receipt handles for gRPC proxy auto-renewal of message visibility timeouts.
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * @param context the proxy context
+ * @param key the receipt handle group key
+ * @param messageReceiptHandle the handle to renew
+ * @return a future completing with the updated handle (or {@code null} if
+ * renewal is stopped)
+ */
protected CompletableFuture
+ *
+ *
+ * @param msg the message to write
+ * @return a future that completes with the append result
+ */
public CompletableFuture
+ *
+ *
+ * @param group consumer group
+ * @param topic topic name
+ * @param queueId queue id
+ * @param offset starting offset in the consume queue
+ * @param maxMsgNums maximum number of messages to return
+ * @param maxTotalMsgSize maximum total message body size
+ * @param messageFilter message filter (maybe null)
+ * @return the pull result with status, messages, and next offset
+ */
@Override
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums, final int maxTotalMsgSize, final MessageFilter messageFilter) {
@@ -876,6 +925,7 @@ public GetMessageResult getMessage(final String group, final String topic, final
return null;
}
+ // try to get from compaction store
Optionalfalse).
+ *
+ *
+ *
+ * @param offset physical offset to find
+ * @param returnFirstOnNotFound if {@code true}, returns the first mapped
+ * file when the offset is outside the range
+ * @return the mapped file, or {@code null} if not found and
+ * {@code returnFirstOnNotFound} is {@code false}
*/
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
try {
MappedFile firstMappedFile = this.getFirstMappedFile();
MappedFile lastMappedFile = this.getLastMappedFile();
if (firstMappedFile != null && lastMappedFile != null) {
+ // offset is not in range of [firstOffset, lastOffset]
if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
offset,
@@ -705,6 +718,7 @@ public MappedFile findMappedFileByOffset(final long offset, final boolean return
this.mappedFileSize,
this.mappedFiles.size());
} else {
+ // get file by index
int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
MappedFile targetFile = null;
try {
@@ -717,6 +731,7 @@ public MappedFile findMappedFileByOffset(final long offset, final boolean return
return targetFile;
}
+ // iterate to find file
for (MappedFile tmpMappedFile : this.mappedFiles) {
if (offset >= tmpMappedFile.getFileFromOffset()
&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
index 5c38cfe92a9..b96dfd98882 100644
--- a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
@@ -19,6 +19,12 @@
import java.nio.ByteBuffer;
import org.apache.rocketmq.store.logfile.MappedFile;
+/**
+ * result while select mapped file
+ * - mapped file
+ * - offset and size
+ * - whether it is in memory
+ */
public class SelectMappedBufferResult {
private final long startOffset;
diff --git a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
index 803ebc68957..e4ed5c085e8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
+++ b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
@@ -21,24 +21,62 @@
import java.util.ArrayList;
import java.util.List;
+/**
+ * state check info for multi-messages pop from consume queue
+ */
public class PopCheckPoint implements Comparable
+ *
+ * The database directory is {@code ${storePathRootDir}/rocksdbstore}.
+ */
public class MessageRocksDBStorage extends AbstractRocksDBStorage {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final Logger logError = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
private static final String ROCKSDB_MESSAGE_DIRECTORY = "rocksdbstore";
+ // Column family identifiers
public static final byte[] TIMER_COLUMN_FAMILY = "timer".getBytes(StandardCharsets.UTF_8);
public static final byte[] TRANS_COLUMN_FAMILY = "trans".getBytes(StandardCharsets.UTF_8);
+
+ // Metadata keys stored inside each column family
private static final byte[] LAST_OFFSET_PY = "lastOffsetPy".getBytes(StandardCharsets.UTF_8);
private static final byte[] LAST_STORE_TIMESTAMP = "lastStoreTimeStamp".getBytes(StandardCharsets.UTF_8);
+
+ // Suffix filled with 0xFF for range-delete upper bound
private static final byte[] END_SUFFIX_BYTES = new byte[512];
static {
Arrays.fill(END_SUFFIX_BYTES, (byte) 0xFF);
}
+
+ // Allowed checkpoint keys for the timer column family
private static final Set
+ *
+ *
+ * @param deliverTimeMs the target delivery timestamp
+ * @return {@code true} if the message should be rejected
+ */
public boolean isReject(long deliverTimeMs) {
long congestNum = timerWheel.getNum(deliverTimeMs);
if (congestNum <= storeConfig.getTimerCongestNumEachSlot()) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/transaction/TransMessageRocksDBStore.java b/store/src/main/java/org/apache/rocketmq/store/transaction/TransMessageRocksDBStore.java
index 4166f2a3077..6816e93dfad 100644
--- a/store/src/main/java/org/apache/rocketmq/store/transaction/TransMessageRocksDBStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/transaction/TransMessageRocksDBStore.java
@@ -48,6 +48,20 @@
import org.rocksdb.RocksDBException;
import static org.apache.rocketmq.store.rocksdb.MessageRocksDBStorage.TRANS_COLUMN_FAMILY;
+/**
+ * RocksDB-based transactional half-message index store.
+ *
+ *
+ * Value format: {@code [checkTimes (int) | sizePy (int)]} — 8 bytes total
+ *
+ *
+ *
+ */
public class TransRocksDBRecord {
private static final Logger logError = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
public static final int VALUE_LENGTH = Integer.BYTES + Integer.BYTES;
private static final String KEY_SPLIT = "@";
+
+ // CommitLog physical offset prefix — primary sort key for RocksDB ordering
protected long offsetPy;
+ // Real business topic of the transaction
private String topic;
+ // Unique transaction ID (UniqID / transactionId)
private String uniqKey;
+ // How many times the transaction checker has probed this record
private int checkTimes = 0;
+ // Message body size in CommitLog, used to read the original message on check
private int sizePy;
+ // True if this record came from an OP message (commit/rollback tombstone)
private boolean isOp;
+ // True if the record should be deleted after exceeding max check times
private boolean delete;
+ // Transient holder for the deserialized MessageExt, not persisted
private MessageExt messageExt;
+ /**
+ * Create a half record for initial indexing from CommitLog dispatch.
+ *
+ * @param offsetPy CommitLog phy offset
+ * @param topic original business topic
+ * @param uniqKey transaction ID
+ * @param sizePy message body size in CommitLog
+ * @param checkTimes initial check counter (usually 0)
+ */
public TransRocksDBRecord(long offsetPy, String topic, String uniqKey, int sizePy, int checkTimes) {
this.offsetPy = offsetPy;
this.topic = topic;
@@ -44,6 +77,12 @@ public TransRocksDBRecord(long offsetPy, String topic, String uniqKey, int sizeP
this.checkTimes = checkTimes;
}
+ /**
+ * Create an OP record referencing a half record by its offsetPy.
+ *
+ *
+ * Value layout: {@code [checkTimes (4 bytes) | sizePy (4 bytes)]}
+ */
public static TransRocksDBRecord decode(byte[] key, byte[] value) {
if (null == key || key.length <= Long.BYTES || null == value || value.length != VALUE_LENGTH) {
logError.error("TransRocksDBRecord decode param error, key: {}, value: {}", key, value);