Skip to content

Commit 324bb30

Browse files
committed
rebase onto develop
Change-Id: I360bfb5d2ee12d2f5cc7a72343872c12c1bc5bd5
1 parent 3da0f42 commit 324bb30

File tree

14 files changed

+96
-87
lines changed

14 files changed

+96
-87
lines changed

broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.broker.config.v1;
1818

19-
import com.alibaba.fastjson.JSON;
19+
import com.alibaba.fastjson2.JSON;
2020
import com.alibaba.fastjson2.JSONWriter;
2121
import java.nio.file.Path;
2222
import java.nio.file.Paths;

broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
package org.apache.rocketmq.broker.lite;
1919

2020
import com.google.common.collect.Sets;
21+
import java.util.Collections;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.Set;
2125
import org.apache.rocketmq.broker.BrokerController;
2226
import org.apache.rocketmq.common.MixAll;
2327
import org.apache.rocketmq.common.Pair;
@@ -28,11 +32,6 @@
2832
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
2933
import org.apache.rocketmq.store.MessageStore;
3034

31-
import java.util.Collections;
32-
import java.util.List;
33-
import java.util.Map;
34-
import java.util.Set;
35-
3635
import static org.apache.rocketmq.broker.offset.ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR;
3736

3837
/**
@@ -183,7 +182,7 @@ public void deleteLmq(String parentTopic, String lmqName) {
183182
String topicAtGroup = lmqName + TOPIC_GROUP_SEPARATOR + group;
184183
brokerController.getConsumerOffsetManager().getOffsetTable().remove(topicAtGroup);
185184
brokerController.getConsumerOffsetManager().removeConsumerOffset(topicAtGroup); // no iteration
186-
brokerController.getPopLiteMessageProcessor().getConsumerOrderInfoManager().getTable().remove(topicAtGroup);
185+
brokerController.getPopLiteMessageProcessor().getConsumerOrderInfoManager().remove(lmqName, group);
187186
});
188187
brokerController.getMessageStore().deleteTopics(Sets.newHashSet(lmqName));
189188
boolean sharding = brokerName.equals(liteSharding.shardingByLmqName(parentTopic, lmqName));

broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,6 @@
2020
import com.google.common.annotations.VisibleForTesting;
2121
import com.google.common.cache.Cache;
2222
import com.google.common.cache.CacheBuilder;
23-
import org.apache.commons.collections.CollectionUtils;
24-
import org.apache.rocketmq.broker.BrokerController;
25-
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
26-
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
27-
import org.apache.rocketmq.common.ServiceThread;
28-
import org.apache.rocketmq.common.constant.LoggerName;
29-
import org.apache.rocketmq.common.entity.ClientGroup;
30-
import org.apache.rocketmq.common.lite.LiteSubscription;
31-
import org.apache.rocketmq.common.lite.LiteUtil;
32-
import org.apache.rocketmq.logging.org.slf4j.Logger;
33-
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
34-
3523
import java.util.ArrayList;
3624
import java.util.Collections;
3725
import java.util.Comparator;
@@ -48,6 +36,17 @@
4836
import java.util.concurrent.LinkedBlockingQueue;
4937
import java.util.concurrent.ThreadLocalRandom;
5038
import java.util.concurrent.TimeUnit;
39+
import org.apache.commons.collections.CollectionUtils;
40+
import org.apache.rocketmq.broker.BrokerController;
41+
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
42+
import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
43+
import org.apache.rocketmq.common.ServiceThread;
44+
import org.apache.rocketmq.common.constant.LoggerName;
45+
import org.apache.rocketmq.common.entity.ClientGroup;
46+
import org.apache.rocketmq.common.lite.LiteSubscription;
47+
import org.apache.rocketmq.common.lite.LiteUtil;
48+
import org.apache.rocketmq.logging.org.slf4j.Logger;
49+
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
5150

5251
public class LiteEventDispatcher extends ServiceThread {
5352

@@ -498,7 +497,7 @@ public void onUnregister(String clientId, String group, String lmqName, boolean
498497
LOGGER.info("unregister and reset offset. {}, {}, {}, {}", group, clientId, lmqName, consumerOffset);
499498
if (consumerOffset > 0) {
500499
consumerOffsetManager.assignResetOffset(lmqName, group, 0, 0);
501-
consumerOrderInfoManager.getTable().remove(lmqName + "@" + group); // may cause race condition.
500+
consumerOrderInfoManager.remove(lmqName, group);
502501
}
503502
}
504503
}

broker/src/main/java/org/apache/rocketmq/broker/offset/MemoryConsumerOrderInfoManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.rocketmq.broker.offset;
1919

2020
import org.apache.rocketmq.broker.BrokerController;
21+
import org.apache.rocketmq.broker.pop.orderly.QueueLevelConsumerManager;
2122

2223
/**
2324
* Memory-based Consumer Order Information Manager for Lite Topics
@@ -30,7 +31,7 @@
3031
* <p>
3132
* We may make structural adjustments and optimizations to reduce overhead and memory footprint.
3233
*/
33-
public class MemoryConsumerOrderInfoManager extends ConsumerOrderInfoManager {
34+
public class MemoryConsumerOrderInfoManager extends QueueLevelConsumerManager {
3435

3536
public MemoryConsumerOrderInfoManager(BrokerController brokerController) {
3637
super(brokerController);

broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManager.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,20 @@ void update(String attemptId, boolean isRetry, String topic, String group, int q
6868
*/
6969
boolean checkBlock(String attemptId, String topic, String group, int queueId, long invisibleTime);
7070

71+
/**
72+
* Remove the specified topic and group
73+
* Usually called during topic deletion
74+
*
75+
* @param topic Topic name
76+
* @param group Consumer group name
77+
*/
78+
void remove(String topic, String group);
79+
80+
/**
81+
* Get order info count
82+
*/
83+
int getOrderInfoCount();
84+
7185
/**
7286
* Commit message and calculate next consumption offset
7387
* Called when consumer ACKs messages

broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerManager.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,16 @@ public void clearBlock(String topic, String group, int queueId) {
179179
});
180180
}
181181

182+
@Override
183+
public void remove(String topic, String group) {
184+
table.remove(buildKey(topic, group));
185+
}
186+
187+
@Override
188+
public int getOrderInfoCount() {
189+
return table.size();
190+
}
191+
182192
@Override
183193
public OrderedConsumptionLevel getOrderedConsumptionLevel() {
184194
return OrderedConsumptionLevel.QUEUE;
@@ -383,7 +393,7 @@ public CompletableFuture<GetMessageResult> getAvailableMessageResult(String atte
383393
}
384394

385395
@VisibleForTesting
386-
QueueLevelConsumerOrderInfoLockManager getConsumerOrderInfoLockManager() {
396+
protected QueueLevelConsumerOrderInfoLockManager getConsumerOrderInfoLockManager() {
387397
return queueLevelConsumerOrderInfoLockManager;
388398
}
389399

broker/src/main/java/org/apache/rocketmq/broker/processor/LiteManagerProcessor.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919

2020
import com.google.common.annotations.VisibleForTesting;
2121
import io.netty.channel.ChannelHandlerContext;
22+
import java.util.HashSet;
2223
import java.util.List;
23-
24+
import java.util.Map;
25+
import java.util.Set;
2426
import org.apache.commons.lang3.StringUtils;
2527
import org.apache.rocketmq.broker.BrokerController;
2628
import org.apache.rocketmq.broker.lite.AbstractLiteLifecycleManager;
@@ -55,10 +57,6 @@
5557
import org.apache.rocketmq.remoting.protocol.header.TriggerLiteDispatchRequestHeader;
5658
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
5759

58-
import java.util.HashSet;
59-
import java.util.Map;
60-
import java.util.Set;
61-
6260
public class LiteManagerProcessor implements NettyRequestProcessor {
6361
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LITE_LOGGER_NAME);
6462

@@ -105,7 +103,7 @@ protected RemotingCommand getBrokerLiteInfo(ChannelHandlerContext ctx,
105103
body.setMaxLmqNum(brokerController.getMessageStoreConfig().getMaxLmqConsumeQueueNum());
106104
body.setCurrentLmqNum(brokerController.getMessageStore().getQueueStore().getLmqNum());
107105
body.setLiteSubscriptionCount(brokerController.getLiteSubscriptionRegistry().getActiveSubscriptionNum());
108-
body.setOrderInfoCount(brokerController.getPopLiteMessageProcessor().getConsumerOrderInfoManager().getTable().size());
106+
body.setOrderInfoCount(brokerController.getPopLiteMessageProcessor().getConsumerOrderInfoManager().getOrderInfoCount());
109107
body.setCqTableSize(brokerController.getMessageStore().getQueueStore().getConsumeQueueTable().size());
110108
body.setOffsetTableSize(brokerController.getConsumerOffsetManager().getOffsetTable().size());
111109
body.setEventMapSize(brokerController.getLiteEventDispatcher().getEventMapSize());

broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,23 @@
2020
import io.netty.channel.Channel;
2121
import io.netty.channel.ChannelHandlerContext;
2222
import io.opentelemetry.api.common.Attributes;
23+
import java.nio.ByteBuffer;
24+
import java.util.Collections;
25+
import java.util.HashSet;
26+
import java.util.Iterator;
27+
import java.util.List;
28+
import java.util.Objects;
29+
import java.util.Set;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicLong;
2332
import org.apache.rocketmq.broker.BrokerController;
2433
import org.apache.rocketmq.broker.lite.LiteEventDispatcher;
2534
import org.apache.rocketmq.broker.longpolling.PollingResult;
2635
import org.apache.rocketmq.broker.longpolling.PopLiteLongPollingService;
2736
import org.apache.rocketmq.broker.metrics.LiteConsumerLagCalculator;
28-
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
2937
import org.apache.rocketmq.broker.offset.MemoryConsumerOrderInfoManager;
3038
import org.apache.rocketmq.broker.pop.PopConsumerLockService;
39+
import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
3140
import org.apache.rocketmq.common.KeyBuilder;
3241
import org.apache.rocketmq.common.MixAll;
3342
import org.apache.rocketmq.common.Pair;
@@ -55,16 +64,6 @@
5564
import org.apache.rocketmq.store.SelectMappedBufferResult;
5665
import org.apache.rocketmq.store.exception.ConsumeQueueException;
5766

58-
import java.nio.ByteBuffer;
59-
import java.util.Collections;
60-
import java.util.HashSet;
61-
import java.util.Iterator;
62-
import java.util.List;
63-
import java.util.Objects;
64-
import java.util.Set;
65-
import java.util.concurrent.TimeUnit;
66-
import java.util.concurrent.atomic.AtomicLong;
67-
6867
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
6968
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY;
7069
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
@@ -341,7 +340,7 @@ public Pair<StringBuilder, GetMessageResult> handleGetMessageResult(GetMessageRe
341340
StringBuilder orderCountInfo = new StringBuilder();
342341
if (GetMessageStatus.FOUND.equals(result.getStatus()) && !result.getMessageQueueOffset().isEmpty()) {
343342
consumerOrderInfoManager.update(attemptId, false, lmqName, group, 0,
344-
popTime, invisibleTime, result.getMessageQueueOffset(), orderCountInfo);
343+
popTime, invisibleTime, result.getMessageQueueOffset(), orderCountInfo, null);
345344
recordPopLiteMetrics(result, parentTopic, group);
346345
orderCountInfo = transformOrderCountInfo(orderCountInfo, result.getMessageCount());
347346
}

broker/src/test/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManagerTest.java

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,9 @@
2121
import java.util.List;
2222
import java.util.concurrent.ConcurrentHashMap;
2323
import java.util.concurrent.ConcurrentMap;
24-
2524
import org.apache.rocketmq.broker.BrokerController;
2625
import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
27-
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
26+
import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
2827
import org.apache.rocketmq.broker.processor.PopLiteMessageProcessor;
2928
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
3029
import org.apache.rocketmq.broker.topic.TopicConfigManager;
@@ -48,12 +47,12 @@
4847
import static org.apache.rocketmq.broker.offset.ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR;
4948
import static org.mockito.ArgumentMatchers.anyLong;
5049
import static org.mockito.ArgumentMatchers.anyString;
51-
import static org.mockito.Mockito.when;
5250
import static org.mockito.Mockito.anyInt;
53-
import static org.mockito.Mockito.verify;
54-
import static org.mockito.Mockito.times;
55-
import static org.mockito.Mockito.never;
5651
import static org.mockito.Mockito.atLeastOnce;
52+
import static org.mockito.Mockito.never;
53+
import static org.mockito.Mockito.times;
54+
import static org.mockito.Mockito.verify;
55+
import static org.mockito.Mockito.when;
5756

5857
@RunWith(MockitoJUnitRunner.class)
5958
public class AbstractLiteLifecycleManagerTest {
@@ -86,8 +85,6 @@ public class AbstractLiteLifecycleManagerTest {
8685
private final TopicConfig topicConfig = new TopicConfig(PARENT_TOPIC, 1, 1);
8786
private final SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
8887
private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>();
89-
private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumerOrderInfoManager.OrderInfo>>
90-
orderInfoTable = new ConcurrentHashMap<>();
9188

9289
@Before
9390
public void setUp() {
@@ -115,8 +112,7 @@ public void setUp() {
115112
when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(groupTable);
116113

117114
when(consumerOffsetManager.getOffsetTable()).thenReturn(offsetTable);
118-
when(consumerOrderInfoManager.getTable()).thenReturn(orderInfoTable);
119-
115+
when(consumerOffsetManager.getPullOffsetTable()).thenReturn(offsetTable);
120116
TestLiteLifecycleManager testObject = new TestLiteLifecycleManager(brokerController, liteSharding);
121117
lifecycleManager = Mockito.spy(testObject);
122118
lifecycleManager.init();
@@ -127,7 +123,6 @@ public void reset() {
127123
topicConfig.getAttributes().clear();
128124
groupConfig.getAttributes().clear();
129125
offsetTable.clear();
130-
orderInfoTable.clear();
131126
}
132127

133128
@Test
@@ -201,29 +196,24 @@ public void testDeleteLmq() {
201196
String removeKey = EXIST_LMQ_NAME + TOPIC_GROUP_SEPARATOR + GROUP;
202197
offsetTable.put(otherKey, new ConcurrentHashMap<>());
203198
offsetTable.put(removeKey, new ConcurrentHashMap<>());
204-
orderInfoTable.put(otherKey, new ConcurrentHashMap<>());
205-
orderInfoTable.put(removeKey, new ConcurrentHashMap<>());
206199

207200
// sharding to this broker
208201
when(liteSharding.shardingByLmqName(PARENT_TOPIC, EXIST_LMQ_NAME)).thenReturn(brokerConfig.getBrokerName());
209202
lifecycleManager.deleteLmq(PARENT_TOPIC, EXIST_LMQ_NAME);
210203

211204
Assert.assertTrue(offsetTable.containsKey(otherKey));
212205
Assert.assertFalse(offsetTable.containsKey(removeKey));
213-
Assert.assertTrue(orderInfoTable.containsKey(otherKey));
214-
Assert.assertFalse(orderInfoTable.containsKey(removeKey));
215206
verify(consumerOffsetManager).removeConsumerOffset(removeKey);
216207
verify(messageStore).deleteTopics(Collections.singleton(EXIST_LMQ_NAME));
217208
verify(liteSubscriptionRegistry).cleanSubscription(EXIST_LMQ_NAME, false);
209+
verify(consumerOrderInfoManager, times(1)).remove(EXIST_LMQ_NAME, GROUP);
218210

219211
// not sharding to this broker
220212
when(liteSharding.shardingByLmqName(PARENT_TOPIC, EXIST_LMQ_NAME)).thenReturn("otherBrokerName");
221213
lifecycleManager.deleteLmq(PARENT_TOPIC, EXIST_LMQ_NAME);
222214

223215
Assert.assertTrue(offsetTable.containsKey(otherKey));
224216
Assert.assertFalse(offsetTable.containsKey(removeKey));
225-
Assert.assertTrue(orderInfoTable.containsKey(otherKey));
226-
Assert.assertFalse(orderInfoTable.containsKey(removeKey));
227217
verify(consumerOffsetManager, times(2)).removeConsumerOffset(removeKey);
228218
verify(messageStore, times(2)).deleteTopics(Collections.singleton(EXIST_LMQ_NAME));
229219
verify(liteSubscriptionRegistry, times(2)).cleanSubscription(EXIST_LMQ_NAME, false);
@@ -287,4 +277,4 @@ public List<String> collectByParentTopic(String parentTopic) {
287277
return PARENT_TOPIC.equals(parentTopic) ? Collections.singletonList(EXIST_LMQ_NAME) : Collections.emptyList();
288278
}
289279
}
290-
}
280+
}

broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,21 @@
1818
package org.apache.rocketmq.broker.lite;
1919

2020
import com.google.common.cache.Cache;
21+
import java.util.Arrays;
22+
import java.util.Collections;
23+
import java.util.HashSet;
24+
import java.util.Iterator;
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.Set;
28+
import java.util.UUID;
29+
import java.util.concurrent.ConcurrentMap;
30+
import java.util.concurrent.ConcurrentSkipListSet;
2131
import org.apache.commons.lang3.reflect.FieldUtils;
2232
import org.apache.rocketmq.broker.BrokerController;
2333
import org.apache.rocketmq.broker.longpolling.PopLiteLongPollingService;
2434
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
25-
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
35+
import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
2636
import org.apache.rocketmq.broker.processor.PopLiteMessageProcessor;
2737
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
2838
import org.apache.rocketmq.common.BrokerConfig;
@@ -38,17 +48,6 @@
3848
import org.mockito.Mockito;
3949
import org.mockito.junit.MockitoJUnitRunner;
4050

41-
import java.util.Arrays;
42-
import java.util.Collections;
43-
import java.util.HashSet;
44-
import java.util.Iterator;
45-
import java.util.List;
46-
import java.util.Map;
47-
import java.util.Set;
48-
import java.util.UUID;
49-
import java.util.concurrent.ConcurrentMap;
50-
import java.util.concurrent.ConcurrentSkipListSet;
51-
5251
import static org.apache.rocketmq.broker.lite.LiteEventDispatcher.COMPARATOR;
5352
import static org.mockito.ArgumentMatchers.anyString;
5453
import static org.mockito.Mockito.never;

0 commit comments

Comments
 (0)