Skip to content

Commit 1cb4403

Browse files
ARTEMIS-5573 and ARTEMIS-5975 Adding RELOAD back
MemoryEstimate is now persisted with the message so we won't have to scan it to recalculate it. If loading a previous version, the server will at that point scan the message.
1 parent 6606896 commit 1cb4403

File tree

10 files changed

+238
-20
lines changed

10 files changed

+238
-20
lines changed

artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/PersisterIDs.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
*/
2424
public class PersisterIDs {
2525

26-
public static final int MAX_PERSISTERS = 5;
26+
public static final int MAX_PERSISTERS = 6;
2727

2828
public static final byte CoreLargeMessagePersister_ID = (byte)0;
2929

@@ -37,4 +37,6 @@ public class PersisterIDs {
3737

3838
public static final byte AMQPMessagePersisterV3_ID = (byte)5;
3939

40+
public static final byte AMQPMessagePersisterV4_ID = (byte)6;
41+
4042
}

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ public int getEncodeSize(Message record) {
5858
try {
5959
int encodeSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_BOOLEAN + buf.writerIndex() +
6060
DataConstants.SIZE_LONG + // expiredTime
61-
DataConstants.SIZE_BOOLEAN; // reencoded
61+
DataConstants.SIZE_BOOLEAN + // reencoded
62+
DataConstants.SIZE_INT; // memoryEstimate
6263

6364
TypedProperties properties = ((AMQPMessage) record).getExtraProperties();
6465

@@ -93,6 +94,7 @@ public void encode(ActiveMQBuffer buffer, Message record) {
9394
buffer.writeBytes(savedEncodeBuffer, 0, savedEncodeBuffer.writerIndex());
9495
buffer.writeLong(record.getExpiration());
9596
buffer.writeBoolean(msgEncode.isReencoded());
97+
buffer.writeInt(msgEncode.getMemoryEstimate());
9698
msgEncode.releaseEncodedBufferAfterWrite(); // we need two releases, as getSavedEncodedBuffer will keep 1 for himself until encoding has happened
9799
// which this is the expected event where we need to release the extra refCounter
98100
}
@@ -134,6 +136,12 @@ public Message decode(ActiveMQBuffer buffer, Message record, CoreMessageObjectPo
134136
largeMessage.setReencoded(reEncoded);
135137
}
136138

139+
if (buffer.readableBytes() >= DataConstants.SIZE_INT) {
140+
largeMessage.setMemoryEstimate(buffer.readInt());
141+
} else {
142+
largeMessage.setMemoryEstimate(AMQPMessage.VALUE_NOT_PRESENT);
143+
}
144+
137145
return largeMessage;
138146
}
139147

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
155155
* developing purposes.
156156
*/
157157
public enum MessageDataScanningStatus {
158-
NOT_SCANNED(0), SCANNED(1);
158+
NOT_SCANNED(0), RELOAD_PERSISTENCE(1), SCANNED(2);
159159

160160
private static final MessageDataScanningStatus[] STATES;
161161

@@ -228,6 +228,10 @@ public void routed() {
228228
this.routed = true;
229229
}
230230

231+
void setMemoryEstimate(int memoryEstimate) {
232+
this.memoryEstimate = memoryEstimate;
233+
}
234+
231235
// The Proton based AMQP message section that are retained in memory, these are the
232236
// mutable portions of the Message as the broker sees it, although AMQP defines that
233237
// the Properties and ApplicationProperties are immutable so care should be taken
@@ -648,6 +652,9 @@ protected synchronized void ensureMessageDataScanned() {
648652
case NOT_SCANNED:
649653
scanMessageData();
650654
break;
655+
case RELOAD_PERSISTENCE:
656+
lazyScanAfterReloadPersistence();
657+
break;
651658
case SCANNED:
652659
// NO-OP
653660
break;
@@ -1044,6 +1051,16 @@ protected int internalPersistSize() {
10441051
@Override
10451052
public abstract void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools);
10461053

1054+
protected synchronized void lazyScanAfterReloadPersistence() {
1055+
assert messageDataScanned == MessageDataScanningStatus.RELOAD_PERSISTENCE.code;
1056+
scanMessageData();
1057+
messageDataScanned = MessageDataScanningStatus.SCANNED.code;
1058+
modified = false;
1059+
// reinitialise memory estimate as message will already be on a queue
1060+
// and lazy decode will want to update
1061+
getMemoryEstimate();
1062+
}
1063+
10471064
@Override
10481065
public abstract long getPersistentSize() throws ActiveMQException;
10491066

@@ -1225,8 +1242,9 @@ public boolean isDurable() {
12251242
if (header != null && header .getDurable() != null) {
12261243
return header.getDurable();
12271244
} else {
1228-
// we will assume it's non-persistent if no header
1229-
return false;
1245+
// if header == null and scanningStatus=RELOAD_PERSISTENCE, it means the message can only be durable
1246+
// even though the parsing hasn't happened yet
1247+
return getDataScanningStatus() == MessageDataScanningStatus.RELOAD_PERSISTENCE;
12301248
}
12311249
}
12321250

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.artemis.protocol.amqp.broker;
18+
19+
import java.lang.invoke.MethodHandles;
20+
21+
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
22+
import org.apache.activemq.artemis.api.core.Message;
23+
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
24+
import org.apache.activemq.artemis.utils.DataConstants;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import static org.apache.activemq.artemis.core.persistence.PersisterIDs.AMQPMessagePersisterV4_ID;
29+
30+
/** V4 adds a size field to determine persister boundaries during paging, enabling forward-compatible
31+
* extensions without additional versioning. */
32+
public class AMQPMessagePersisterV4 extends AMQPMessagePersisterV3 {
33+
34+
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
35+
36+
public static final byte ID = AMQPMessagePersisterV4_ID;
37+
38+
public static AMQPMessagePersisterV4 theInstance;
39+
40+
public static AMQPMessagePersisterV4 getInstance() {
41+
if (theInstance == null) {
42+
theInstance = new AMQPMessagePersisterV4();
43+
}
44+
return theInstance;
45+
}
46+
47+
@Override
48+
public byte getID() {
49+
return ID;
50+
}
51+
52+
public AMQPMessagePersisterV4() {
53+
super();
54+
}
55+
56+
57+
@Override
58+
public int getEncodeSize(Message record) {
59+
int encodeSize = super.getEncodeSize(record) +
60+
DataConstants.SIZE_INT + // How many bytes are written in Persister4.
61+
DataConstants.SIZE_INT; // Memory estimate
62+
return encodeSize;
63+
}
64+
65+
66+
/**
67+
* Sub classes must add the first short as the protocol-id
68+
*/
69+
@Override
70+
public void encode(ActiveMQBuffer buffer, Message record) {
71+
super.encode(buffer, record);
72+
73+
// V2, V3, and V4 were created as new journal fields were added.
74+
// Queue counts and IDs persisted after the Message make the size non-deterministic.
75+
buffer.writeInt(DataConstants.SIZE_INT); // for future use
76+
77+
buffer.writeInt(record.getMemoryEstimate());
78+
}
79+
80+
@Override
81+
public Message decode(ActiveMQBuffer buffer, Message ignore, CoreMessageObjectPools pool) {
82+
Message record = super.decode(buffer, ignore, pool);
83+
84+
assert record != null && AMQPStandardMessage.class.equals(record.getClass());
85+
86+
// This might be useful in the future if we need to add more data to this persister. For now, it's just a filler.
87+
int sizeV4 = buffer.readInt();
88+
89+
((AMQPStandardMessage)record).setMemoryEstimate(buffer.readInt());
90+
91+
// if in the future you need to add more data, you can then verify how many bytes are still left
92+
// so you won't need to add another version for additional data.
93+
94+
return record;
95+
}
96+
97+
}

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java

Lines changed: 76 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,12 @@
3939
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
4040
import org.apache.qpid.proton.amqp.messaging.Properties;
4141
import org.apache.qpid.proton.amqp.messaging.Section;
42+
import org.apache.qpid.proton.codec.DecodeException;
43+
import org.apache.qpid.proton.codec.DecoderImpl;
4244
import org.apache.qpid.proton.codec.EncoderImpl;
45+
import org.apache.qpid.proton.codec.EncodingCodes;
4346
import org.apache.qpid.proton.codec.ReadableBuffer;
47+
import org.apache.qpid.proton.codec.TypeConstructor;
4448
import org.apache.qpid.proton.codec.WritableBuffer;
4549

4650
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
@@ -238,10 +242,79 @@ public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pool
238242

239243
// Message state is now that the underlying buffer is loaded, but the contents not yet scanned
240244
resetMessageData();
241-
scanMessageData(data);
245+
recoverHeaderDataFromEncoding();
242246

243247
modified = false;
244-
messageDataScanned = MessageDataScanningStatus.SCANNED.code;
248+
messageDataScanned = MessageDataScanningStatus.RELOAD_PERSISTENCE.code;
249+
}
250+
251+
private void recoverHeaderDataFromEncoding() {
252+
final DecoderImpl decoder = TLSEncode.getDecoder();
253+
decoder.setBuffer(data);
254+
255+
try {
256+
// At one point the broker could write the header and delivery annotations out of order
257+
// which means a full scan is required for maximum compatibility with that older data
258+
// where delivery annotations could be found ahead of the Header in the encoding.
259+
//
260+
// We manually extract the priority from the Header encoding if present to ensure we do
261+
// not create any unneeded GC overhead during load from storage. We don't directly store
262+
// other values from the header except for a value that is computed based on TTL and or
263+
// absolute expiration time in the Properties section, but that value is stored in the
264+
// data of the persisted message.
265+
for (int section = 0; section < 2 && data.hasRemaining(); section++) {
266+
final TypeConstructor<?> constructor = decoder.readConstructor();
267+
268+
if (Header.class.equals(constructor.getTypeClass())) {
269+
final byte typeCode = data.get();
270+
271+
@SuppressWarnings("unused")
272+
int size = 0;
273+
int count = 0;
274+
275+
switch (typeCode) {
276+
case EncodingCodes.LIST0:
277+
break;
278+
case EncodingCodes.LIST8:
279+
size = data.get() & 0xff;
280+
count = data.get() & 0xff;
281+
break;
282+
case EncodingCodes.LIST32:
283+
size = data.getInt();
284+
count = data.getInt();
285+
break;
286+
default:
287+
throw new DecodeException("Incorrect type found in Header encoding: " + typeCode);
288+
}
289+
290+
// Priority is stored in the second slot of the Header list encoding if present
291+
if (count >= 2) {
292+
decoder.readBoolean(false); // Discard durable for now, it is computed elsewhere.
293+
294+
final byte encodingCode = data.get();
295+
final int priority = switch (encodingCode) {
296+
case EncodingCodes.UBYTE -> data.get() & 0xff;
297+
case EncodingCodes.NULL -> DEFAULT_MESSAGE_PRIORITY;
298+
default ->
299+
throw new DecodeException("Expected UnsignedByte type but found encoding: " + EncodingCodes.toString(encodingCode));
300+
};
301+
302+
// Scaled here so do not call setPriority as that will store the set value in the AMQP header
303+
// and we don't want to create that Header instance at this stage.
304+
this.priority = (byte) Math.min(priority, MAX_MESSAGE_PRIORITY);
305+
}
306+
307+
return;
308+
} else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) {
309+
constructor.skipValue();
310+
} else {
311+
return;
312+
}
313+
}
314+
} finally {
315+
decoder.setBuffer(null);
316+
data.rewind(); // Ensure next scan start at the beginning.
317+
}
245318
}
246319

247320
@Override
@@ -251,7 +324,7 @@ public long getPersistentSize() throws ActiveMQException {
251324

252325
@Override
253326
public Persister<org.apache.activemq.artemis.api.core.Message> getPersister() {
254-
return AMQPMessagePersisterV3.getInstance();
327+
return AMQPMessagePersisterV4.getInstance();
255328
}
256329

257330
@Override

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
5252
@Override
5353
public Persister<Message>[] getPersister() {
5454

55-
Persister[] persisters = new Persister[]{AMQPMessagePersister.getInstance(), AMQPMessagePersisterV2.getInstance(), AMQPLargeMessagePersister.getInstance(), AMQPMessagePersisterV3.getInstance()};
55+
Persister[] persisters = new Persister[]{AMQPMessagePersister.getInstance(), AMQPMessagePersisterV2.getInstance(), AMQPLargeMessagePersister.getInstance(), AMQPMessagePersisterV3.getInstance(), AMQPMessagePersisterV4.getInstance()};
5656
return persisters;
5757
}
5858

artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -216,11 +216,11 @@ public void testHasScheduledDeliveryTimeReloadPersistence() {
216216
// Now reload from encoded data
217217
message.reloadPersistence(encoded, null);
218218

219-
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
219+
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
220220

221221
assertTrue(message.hasScheduledDeliveryTime());
222222

223-
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
223+
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
224224

225225
message.getHeader();
226226

@@ -249,11 +249,11 @@ public void testHasScheduledDeliveryDelayReloadPersistence() {
249249
// Now reload from encoded data
250250
message.reloadPersistence(encoded, null);
251251

252-
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
252+
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
253253

254254
assertTrue(message.hasScheduledDeliveryTime());
255255

256-
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
256+
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
257257

258258
message.getHeader();
259259

@@ -279,11 +279,11 @@ public void testNoScheduledDeliveryTimeOrDelayReloadPersistence() {
279279
// Now reload from encoded data
280280
message.reloadPersistence(encoded, null);
281281

282-
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
282+
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
283283

284284
assertFalse(message.hasScheduledDeliveryTime());
285285

286-
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
286+
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
287287

288288
message.getHeader();
289289

@@ -1658,7 +1658,7 @@ public void testExtraProperty() {
16581658
ActiveMQBuffer buffer = ActiveMQBuffers.pooledBuffer(10 * 1024);
16591659
try {
16601660
decoded.getPersister().encode(buffer, decoded);
1661-
assertEquals(AMQPMessagePersisterV3.getInstance().getID(), buffer.readByte()); // the journal reader will read 1 byte to find the persister
1661+
assertEquals(AMQPMessagePersisterV4.getInstance().getID(), buffer.readByte()); // the journal reader will read 1 byte to find the persister
16621662
AMQPStandardMessage readMessage = (AMQPStandardMessage)decoded.getPersister().decode(buffer, null, null);
16631663
assertEquals(33, readMessage.getMessageID());
16641664
assertEquals("someAddress", readMessage.getAddress());
@@ -2881,7 +2881,7 @@ private boolean isEquals(MessageImpl left, MessageImpl right) {
28812881
assertTrue(isEquals(left.getBody(), right.getBody()));
28822882
assertFootersEquals(left.getFooter(), right.getFooter());
28832883
} catch (Throwable e) {
2884-
logger.info(e.getMessage(), e);
2884+
logger.debug(e.getMessage(), e);
28852885
return false;
28862886
}
28872887

0 commit comments

Comments
 (0)