Skip to content

Commit 2c1940d

Browse files
committed
OffsetOption support for LiteSubscription
Change-Id: If8174ba1e35c899e8fa9067923fea7ecb69291e1
1 parent 324bb30 commit 2c1940d

File tree

12 files changed

+329
-56
lines changed

12 files changed

+329
-56
lines changed

WORKSPACE

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ maven_install(
7171
"org.bouncycastle:bcpkix-jdk15on:1.69",
7272
"com.google.code.gson:gson:2.8.9",
7373
"com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2",
74-
"org.apache.rocketmq:rocketmq-proto:2.1.0",
74+
"org.apache.rocketmq:rocketmq-proto:2.1.1",
7575
"com.google.protobuf:protobuf-java:3.20.1",
7676
"com.google.protobuf:protobuf-java-util:3.20.1",
7777
"com.conversantmedia:disruptor:1.2.10",

broker/src/main/java/org/apache/rocketmq/broker/lite/LiteCtlListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public interface LiteCtlListener {
2121

2222
void onRegister(String clientId, String group, String lmqName);
2323

24-
void onUnregister(String clientId, String group, String lmqName, boolean resetOffset);
24+
void onUnregister(String clientId, String group, String lmqName);
2525

2626
void onRemoveAll(String clientId, String group);
2727

broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -491,15 +491,7 @@ public void onRegister(String clientId, String group, String lmqName) {
491491
}
492492

493493
@Override
494-
public void onUnregister(String clientId, String group, String lmqName, boolean resetOffset) {
495-
if (resetOffset) {
496-
long consumerOffset = consumerOffsetManager.queryOffset(group, lmqName, 0);
497-
LOGGER.info("unregister and reset offset. {}, {}, {}, {}", group, clientId, lmqName, consumerOffset);
498-
if (consumerOffset > 0) {
499-
consumerOffsetManager.assignResetOffset(lmqName, group, 0, 0);
500-
consumerOrderInfoManager.remove(lmqName, group);
501-
}
502-
}
494+
public void onUnregister(String clientId, String group, String lmqName) {
503495
}
504496

505497
/**

broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistry.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Set;
2424
import org.apache.rocketmq.common.entity.ClientGroup;
2525
import org.apache.rocketmq.common.lite.LiteSubscription;
26+
import org.apache.rocketmq.common.lite.OffsetOption;
2627

2728
public interface LiteSubscriptionRegistry {
2829

@@ -32,7 +33,7 @@ public interface LiteSubscriptionRegistry {
3233

3334
int getActiveSubscriptionNum();
3435

35-
void addPartialSubscription(String clientId, String group, String topic, Set<String> lmqNameSet, boolean exclusive);
36+
void addPartialSubscription(String clientId, String group, String topic, Set<String> lmqNameSet, OffsetOption offsetOption);
3637

3738
void removePartialSubscription(String clientId, String group, String topic, Set<String> lmqNameSet);
3839

broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImpl.java

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.rocketmq.common.entity.ClientGroup;
3636
import org.apache.rocketmq.common.lite.LiteSubscription;
3737
import org.apache.rocketmq.common.lite.LiteUtil;
38+
import org.apache.rocketmq.common.lite.OffsetOption;
3839
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
3940
import org.apache.rocketmq.logging.org.slf4j.Logger;
4041
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -68,7 +69,7 @@ public void updateClientChannel(String clientId, Channel channel) {
6869

6970
@Override
7071
public void addPartialSubscription(String clientId, String group, String topic, Set<String> lmqNameSet,
71-
boolean exclusive) {
72+
OffsetOption offsetOption) {
7273
long maxCount = brokerController.getBrokerConfig().getMaxLiteSubscriptionCount();
7374
if (getActiveSubscriptionNum() >= maxCount) {
7475
// No need to check existence, if reach here, it must be new.
@@ -84,9 +85,10 @@ public void addPartialSubscription(String clientId, String group, String topic,
8485
}
8586
thisSub.addLiteTopic(lmqName);
8687
// First remove the old subscription
87-
if (exclusive) {
88+
if (LiteMetadataUtil.isSubLiteExclusive(group, brokerController)) {
8889
excludeClientByLmqName(clientId, group, lmqName);
8990
}
91+
resetOffset(lmqName, group, clientId, offsetOption);
9092
addTopicGroup(clientGroup, lmqName);
9193
}
9294
}
@@ -198,7 +200,11 @@ protected void removeTopicGroup(ClientGroup clientGroup, String lmqName, boolean
198200
if (topicGroupSet.remove(clientGroup)) {
199201
activeNum.decrementAndGet();
200202
for (LiteCtlListener listener : listeners) {
201-
listener.onUnregister(clientGroup.clientId, clientGroup.group, lmqName, resetOffset);
203+
listener.onUnregister(clientGroup.clientId, clientGroup.group, lmqName);
204+
}
205+
if (resetOffset) {
206+
resetOffset(lmqName, clientGroup.group, clientGroup.clientId,
207+
new OffsetOption(OffsetOption.Type.POLICY, OffsetOption.POLICY_MIN_VALUE));
202208
}
203209
}
204210
if (topicGroupSet.isEmpty()) {
@@ -235,12 +241,12 @@ protected void excludeClientByLmqName(String newClientId, String group, String l
235241
* Notify the client to remove the liteTopic subscription from its local memory
236242
*/
237243
private void notifyUnsubscribeLite(String clientId, String group, String lmqName) {
238-
String parentTopic = LiteUtil.getParentTopic(lmqName);
244+
String topic = LiteUtil.getParentTopic(lmqName);
239245
String liteTopic = LiteUtil.getLiteTopic(lmqName);
240246
Channel channel = clientChannels.get(clientId);
241247
if (channel == null) {
242248
LOGGER.warn("notifyUnsubscribeLite but channel is null, liteTopic:{}, group:{}, topic:{}, clientId:{},",
243-
liteTopic, group, parentTopic, clientId);
249+
liteTopic, group, topic, clientId);
244250
return;
245251
}
246252

@@ -249,7 +255,7 @@ private void notifyUnsubscribeLite(String clientId, String group, String lmqName
249255
header.setConsumerGroup(group);
250256
header.setLiteTopic(liteTopic);
251257
brokerController.getBroker2Client().notifyUnsubscribeLite(channel, header);
252-
LOGGER.info("notifyUnsubscribeLite liteTopic:{}, group:{}, topic:{}, clientId:{}", liteTopic, group, parentTopic, clientId);
258+
LOGGER.info("notifyUnsubscribeLite liteTopic:{}, group:{}, topic:{}, clientId:{}", liteTopic, group, topic, clientId);
253259
}
254260

255261
@Override
@@ -270,6 +276,41 @@ public List<String> getAllClientIdByGroup(String group) {
270276
.collect(Collectors.toList());
271277
}
272278

279+
protected void resetOffset(String lmqName, String group, String clientId, OffsetOption offsetOption) {
280+
if (null == offsetOption) {
281+
return;
282+
}
283+
Long targetOffset = null;
284+
long currentOffset = brokerController.getConsumerOffsetManager().queryOffset(group, lmqName, 0);
285+
switch (offsetOption.getType()) {
286+
case POLICY:
287+
if (OffsetOption.POLICY_MIN_VALUE == offsetOption.getValue()) {
288+
targetOffset = 0L;
289+
} else if (OffsetOption.POLICY_MAX_VALUE == offsetOption.getValue()) {
290+
targetOffset = liteLifecycleManager.getMaxOffsetInQueue(lmqName);
291+
}
292+
break;
293+
case OFFSET:
294+
targetOffset = offsetOption.getValue();
295+
break;
296+
case TAIL_N:
297+
if (currentOffset >= 0) { // only when consumer offset exists
298+
targetOffset = Math.max(0L, currentOffset - offsetOption.getValue());
299+
}
300+
break;
301+
case TIMESTAMP:
302+
// timestamp option is disabled silently for now
303+
break;
304+
}
305+
306+
LOGGER.info("try to reset lite offset. {}, {}, {}, {}, current:{}, target:{}",
307+
group, lmqName, clientId, offsetOption, currentOffset, targetOffset);
308+
if (targetOffset != null && currentOffset != targetOffset) {
309+
brokerController.getConsumerOffsetManager().assignResetOffset(lmqName, group, 0, targetOffset);
310+
brokerController.getPopLiteMessageProcessor().getConsumerOrderInfoManager().remove(lmqName, group);
311+
}
312+
}
313+
273314
private LiteSubscription getOrCreateLiteSubscription(String clientId, String group, String topic) {
274315
LiteSubscription curLiteSubscription = ConcurrentHashMapUtils.computeIfAbsent(client2Subscription, clientId,
275316
k -> new LiteSubscription().setGroup(group).setTopic(topic));

broker/src/main/java/org/apache/rocketmq/broker/processor/LiteSubscriptionCtlProcessor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
8686
case PARTIAL_ADD:
8787
checkConsumeEnable(group);
8888
this.liteSubscriptionRegistry.updateClientChannel(clientId, ctx.channel());
89-
boolean isExclusive = LiteMetadataUtil.isSubLiteExclusive(group, brokerController);
90-
this.liteSubscriptionRegistry.addPartialSubscription(clientId, group, topic, lmqNameSet, isExclusive);
89+
this.liteSubscriptionRegistry.addPartialSubscription(clientId, group, topic, lmqNameSet, entry.getOffsetOption());
9190
break;
9291
case PARTIAL_REMOVE:
9392
this.liteSubscriptionRegistry.removePartialSubscription(clientId, group, topic, lmqNameSet);

0 commit comments

Comments
 (0)