Skip to content

Commit ae69814

Browse files
authored
[ISSUE #9923] Transactional messages should not send custom delayed messages (#9924)
* [ISSUE #9923] Transactional messages should not send custom delayed messages * Transactional messages do not support delayed delivery
1 parent 4c66580 commit ae69814

File tree

2 files changed

+23
-6
lines changed

2 files changed

+23
-6
lines changed

client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1434,11 +1434,7 @@ public TransactionSendResult sendMessageInTransaction(final Message msg,
14341434
throw new MQClientException("tranExecutor is null", null);
14351435
}
14361436

1437-
// ignore DelayTimeLevel parameter
1438-
if (msg.getDelayTimeLevel() != 0) {
1439-
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
1440-
}
1441-
1437+
ensureNotDelayedForTransactional(msg);
14421438
Validators.checkMessage(msg, this.defaultMQProducer);
14431439

14441440
SendResult sendResult = null;
@@ -1495,7 +1491,7 @@ public TransactionSendResult sendMessageInTransaction(final Message msg,
14951491
try {
14961492
this.endTransaction(msg, sendResult, localTransactionState, localException);
14971493
} catch (Exception e) {
1498-
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
1494+
log.warn("local transaction execute {}, but end broker transaction failed", localTransactionState, e);
14991495
}
15001496

15011497
TransactionSendResult transactionSendResult = new TransactionSendResult();
@@ -1508,6 +1504,15 @@ public TransactionSendResult sendMessageInTransaction(final Message msg,
15081504
return transactionSendResult;
15091505
}
15101506

1507+
private void ensureNotDelayedForTransactional(final Message msg) throws MQClientException {
1508+
if (msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null
1509+
|| msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS) != null
1510+
|| msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null
1511+
|| msg.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS) != null) {
1512+
throw new MQClientException("Transactional messages do not support delayed delivery", null);
1513+
}
1514+
}
1515+
15111516
/**
15121517
* DEFAULT SYNC -------------------------------------------------------
15131518
*/

client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,18 @@ public Object answer(InvocationOnMock mock) throws Throwable {
168168
assertThat(ctx.getMessage().getTopic()).isEqualTo(topic);
169169
}
170170

171+
@Test(expected = MQClientException.class)
172+
public void testSendMessageInTransaction_NoListener_ThrowsException() throws MQClientException {
173+
producer.setTransactionListener(null);
174+
producer.sendMessageInTransaction(message, null);
175+
}
176+
177+
@Test(expected = MQClientException.class)
178+
public void testSendMessageInTransaction_DelayMsg_ThrowsException() throws MQClientException {
179+
message.setDelayTimeLevel(3);
180+
producer.sendMessageInTransaction(message, null);
181+
}
182+
171183
@After
172184
public void terminate() {
173185
producer.shutdown();

0 commit comments

Comments
 (0)