Skip to content

Commit 540ad20

Browse files
Using a new persister for newer versions
1 parent 2a88e6e commit 540ad20

File tree

8 files changed

+125
-14
lines changed

8 files changed

+125
-14
lines changed

artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/PersisterIDs.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
*/
2424
public class PersisterIDs {
2525

26-
public static final int MAX_PERSISTERS = 5;
26+
public static final int MAX_PERSISTERS = 6;
2727

2828
public static final byte CoreLargeMessagePersister_ID = (byte)0;
2929

@@ -37,4 +37,6 @@ public class PersisterIDs {
3737

3838
public static final byte AMQPMessagePersisterV3_ID = (byte)5;
3939

40+
public static final byte AMQPMessagePersisterV4_ID = (byte)6;
41+
4042
}

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV3.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,7 @@ public AMQPMessagePersisterV3() {
4949
@Override
5050
public int getEncodeSize(Message record) {
5151
int encodeSize = super.getEncodeSize(record) +
52-
DataConstants.SIZE_LONG + // expiration
53-
DataConstants.SIZE_INT; // memory Estimate
52+
DataConstants.SIZE_LONG; // expiration
5453
return encodeSize;
5554
}
5655

@@ -63,7 +62,6 @@ public void encode(ActiveMQBuffer buffer, Message record) {
6362
super.encode(buffer, record);
6463

6564
buffer.writeLong(record.getExpiration());
66-
buffer.writeInt(record.getMemoryEstimate());
6765
}
6866

6967
@Override
@@ -74,11 +72,6 @@ assert record != null && AMQPStandardMessage.class.equals(record.getClass());
7472

7573
((AMQPStandardMessage)record).reloadExpiration(buffer.readLong());
7674

77-
if (buffer.readableBytes() >= DataConstants.SIZE_INT) {
78-
int memoryEstimate = buffer.readInt();
79-
((AMQPStandardMessage)record).setMemoryEstimate(memoryEstimate);
80-
}
81-
8275
return record;
8376
}
8477

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.artemis.protocol.amqp.broker;
18+
19+
import java.lang.invoke.MethodHandles;
20+
21+
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
22+
import org.apache.activemq.artemis.api.core.Message;
23+
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
24+
import org.apache.activemq.artemis.utils.DataConstants;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import static org.apache.activemq.artemis.core.persistence.PersisterIDs.AMQPMessagePersisterV4_ID;
29+
30+
public class AMQPMessagePersisterV4 extends AMQPMessagePersisterV3 {
31+
32+
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
33+
34+
public static final byte ID = AMQPMessagePersisterV4_ID;
35+
36+
public static AMQPMessagePersisterV4 theInstance;
37+
38+
public static AMQPMessagePersisterV4 getInstance() {
39+
if (theInstance == null) {
40+
theInstance = new AMQPMessagePersisterV4();
41+
}
42+
return theInstance;
43+
}
44+
45+
@Override
46+
public byte getID() {
47+
return ID;
48+
}
49+
50+
public AMQPMessagePersisterV4() {
51+
super();
52+
}
53+
54+
55+
@Override
56+
public int getEncodeSize(Message record) {
57+
int encodeSize = super.getEncodeSize(record) +
58+
DataConstants.SIZE_INT + // How many bytes are written on Persister4.
59+
DataConstants.SIZE_INT; // memory Estimate
60+
return encodeSize;
61+
}
62+
63+
64+
/**
65+
* Sub classes must add the first short as the protocol-id
66+
*/
67+
@Override
68+
public void encode(ActiveMQBuffer buffer, Message record) {
69+
super.encode(buffer, record);
70+
71+
// along the version history we had to create V2, V3, and now V4 as I added new information on the journal
72+
// this is because after the Message, other things are persisted on the Journal such as the number of Queues, and the actual queue IDs, what makes it non deterministic
73+
buffer.writeInt(DataConstants.SIZE_INT); // for future use
74+
75+
buffer.writeInt(record.getMemoryEstimate());
76+
}
77+
78+
@Override
79+
public Message decode(ActiveMQBuffer buffer, Message ignore, CoreMessageObjectPools pool) {
80+
Message record = super.decode(buffer, ignore, pool);
81+
82+
assert record != null && AMQPStandardMessage.class.equals(record.getClass());
83+
84+
// This might be useful in the future if we need to add more data on this persister. for now it's just a filler
85+
int sizeV4 = buffer.readInt();
86+
87+
((AMQPStandardMessage)record).setMemoryEstimate(buffer.readInt());
88+
89+
// if in the future you need to add more data, you can then verify how many bytes are still left
90+
// so you won't need to add another version for additional data.
91+
92+
return record;
93+
}
94+
95+
}

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ public long getPersistentSize() throws ActiveMQException {
324324

325325
@Override
326326
public Persister<org.apache.activemq.artemis.api.core.Message> getPersister() {
327-
return AMQPMessagePersisterV3.getInstance();
327+
return AMQPMessagePersisterV4.getInstance();
328328
}
329329

330330
@Override

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
5252
@Override
5353
public Persister<Message>[] getPersister() {
5454

55-
Persister[] persisters = new Persister[]{AMQPMessagePersister.getInstance(), AMQPMessagePersisterV2.getInstance(), AMQPLargeMessagePersister.getInstance(), AMQPMessagePersisterV3.getInstance()};
55+
Persister[] persisters = new Persister[]{AMQPMessagePersister.getInstance(), AMQPMessagePersisterV2.getInstance(), AMQPLargeMessagePersister.getInstance(), AMQPMessagePersisterV3.getInstance(), AMQPMessagePersisterV4.getInstance()};
5656
return persisters;
5757
}
5858

tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.activemq.artemis.tests.compatibility;
1919

20+
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ARTEMIS_2_33_0;
2021
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
2122
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ARTEMIS_2_4_0;
2223
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ARTEMIS_2_1_0;
@@ -47,6 +48,7 @@ public static Collection getParameters() {
4748

4849
combinations.add(new Object[]{ARTEMIS_2_1_0, SNAPSHOT});
4950
combinations.add(new Object[]{ARTEMIS_2_4_0, SNAPSHOT});
51+
combinations.add(new Object[]{ARTEMIS_2_33_0, SNAPSHOT});
5052
// the purpose on this one is just to validate the test itself.
5153
/// if it can't run against itself it won't work at all
5254
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT});
@@ -89,6 +91,20 @@ public void testSendReceive() throws Throwable {
8991
evaluate(receiverClassloader, "meshTest/sendMessages.groovy", receiver, receiver, "receiveMessages");
9092
}
9193

94+
@TestTemplate
95+
public void testSendReceiveAMQP() throws Throwable {
96+
setVariable(senderClassloader, "persistent", true);
97+
startServer(serverFolder, sender, senderClassloader, "journalTest", null, true);
98+
evaluate(senderClassloader, "meshTest/sendMessages.groovy", sender, sender, "sendAckMessages", "AMQP");
99+
stopServer(senderClassloader);
100+
101+
setVariable(receiverClassloader, "persistent", true);
102+
startServer(serverFolder, receiver, receiverClassloader, "journalTest", null, false);
103+
104+
setVariable(receiverClassloader, "latch", null);
105+
evaluate(receiverClassloader, "meshTest/sendMessages.groovy", receiver, receiver, "receiveMessages", "AMQP");
106+
}
107+
92108
@TestTemplate
93109
public void testSendReceivePaging() throws Throwable {
94110
setVariable(senderClassloader, "persistent", true);

tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MultiVersionReplicaTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.activemq.artemis.tests.compatibility;
1919

20+
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ARTEMIS_2_33_0;
2021
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
2122
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ARTEMIS_2_44_0;
2223
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ARTEMIS_2_17_0;
@@ -38,6 +39,7 @@
3839
import org.apache.activemq.artemis.tests.compatibility.base.ClasspathBase;
3940
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
4041
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
42+
import org.apache.activemq.artemis.utils.FileUtil;
4143
import org.apache.qpid.jms.JmsConnectionFactory;
4244
import org.junit.jupiter.api.AfterEach;
4345
import org.junit.jupiter.api.TestTemplate;
@@ -63,13 +65,11 @@ public static Collection getParameters() {
6365
if (getJavaVersion() <= 22) {
6466
// Old 2.x servers fail on JDK23+ without workarounds.
6567
combinations.add(new Object[]{ARTEMIS_2_22_0, SNAPSHOT});
66-
combinations.add(new Object[]{SNAPSHOT, ARTEMIS_2_22_0});
6768
combinations.add(new Object[]{ARTEMIS_2_17_0, SNAPSHOT});
68-
combinations.add(new Object[]{SNAPSHOT, ARTEMIS_2_17_0});
69+
combinations.add(new Object[]{ARTEMIS_2_33_0, SNAPSHOT});
6970
}
7071

7172
combinations.add(new Object[]{ARTEMIS_2_44_0, SNAPSHOT});
72-
combinations.add(new Object[]{SNAPSHOT, ARTEMIS_2_44_0});
7373

7474
// The SNAPSHOT/SNAPSHOT is here as a test validation only, like in other cases where SNAPSHOT/SNAPSHOT is used.
7575
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT});

tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,12 @@ public void durableMessageDataAfterRestart() throws Exception {
6666

6767
afterRestartQueueView.forEach((next) -> {
6868
final AMQPMessage message = (AMQPMessage)next.getMessage();
69+
long memoryEstimate = message.getMemoryEstimate(); // it should not change it
70+
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
71+
message.getApplicationProperties(); // this one should change it
6972
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
73+
// the estimate should be the same even after scanning
74+
assertEquals(memoryEstimate, message.getMemoryEstimate());
7075
messageReference.add(message);
7176
messageSizeAfterRestart.addAndGet(next.getMessage().getMemoryEstimate());
7277
});

0 commit comments

Comments
 (0)