Skip to content

Commit 2a88e6e

Browse files
Reverting back the reload, adding memory estimate on persisters
1 parent 6606896 commit 2a88e6e

File tree

5 files changed

+120
-14
lines changed

5 files changed

+120
-14
lines changed

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ public int getEncodeSize(Message record) {
5858
try {
5959
int encodeSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_BOOLEAN + buf.writerIndex() +
6060
DataConstants.SIZE_LONG + // expiredTime
61-
DataConstants.SIZE_BOOLEAN; // reencoded
61+
DataConstants.SIZE_BOOLEAN + // reencoded
62+
DataConstants.SIZE_INT; // memoryEstimate
6263

6364
TypedProperties properties = ((AMQPMessage) record).getExtraProperties();
6465

@@ -93,6 +94,7 @@ public void encode(ActiveMQBuffer buffer, Message record) {
9394
buffer.writeBytes(savedEncodeBuffer, 0, savedEncodeBuffer.writerIndex());
9495
buffer.writeLong(record.getExpiration());
9596
buffer.writeBoolean(msgEncode.isReencoded());
97+
buffer.writeInt(msgEncode.getMemoryEstimate());
9698
msgEncode.releaseEncodedBufferAfterWrite(); // we need two releases, as getSavedEncodedBuffer will keep 1 for himself until encoding has happened
9799
// which this is the expected event where we need to release the extra refCounter
98100
}
@@ -134,6 +136,12 @@ public Message decode(ActiveMQBuffer buffer, Message record, CoreMessageObjectPo
134136
largeMessage.setReencoded(reEncoded);
135137
}
136138

139+
if (buffer.readableBytes() >= DataConstants.SIZE_INT) {
140+
largeMessage.setMemoryEstimate(buffer.readInt());
141+
} else {
142+
largeMessage.setMemoryEstimate(AMQPMessage.VALUE_NOT_PRESENT);
143+
}
144+
137145
return largeMessage;
138146
}
139147

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
155155
* developing purposes.
156156
*/
157157
public enum MessageDataScanningStatus {
158-
NOT_SCANNED(0), SCANNED(1);
158+
NOT_SCANNED(0), RELOAD_PERSISTENCE(1), SCANNED(2);
159159

160160
private static final MessageDataScanningStatus[] STATES;
161161

@@ -228,6 +228,10 @@ public void routed() {
228228
this.routed = true;
229229
}
230230

231+
void setMemoryEstimate(int memoryEstimate) {
232+
this.memoryEstimate = memoryEstimate;
233+
}
234+
231235
// The Proton based AMQP message section that are retained in memory, these are the
232236
// mutable portions of the Message as the broker sees it, although AMQP defines that
233237
// the Properties and ApplicationProperties are immutable so care should be taken
@@ -648,6 +652,9 @@ protected synchronized void ensureMessageDataScanned() {
648652
case NOT_SCANNED:
649653
scanMessageData();
650654
break;
655+
case RELOAD_PERSISTENCE:
656+
lazyScanAfterReloadPersistence();
657+
break;
651658
case SCANNED:
652659
// NO-OP
653660
break;
@@ -1044,6 +1051,16 @@ protected int internalPersistSize() {
10441051
@Override
10451052
public abstract void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools);
10461053

1054+
protected synchronized void lazyScanAfterReloadPersistence() {
1055+
assert messageDataScanned == MessageDataScanningStatus.RELOAD_PERSISTENCE.code;
1056+
scanMessageData();
1057+
messageDataScanned = MessageDataScanningStatus.SCANNED.code;
1058+
modified = false;
1059+
// reinitialise memory estimate as message will already be on a queue
1060+
// and lazy decode will want to update
1061+
getMemoryEstimate();
1062+
}
1063+
10471064
@Override
10481065
public abstract long getPersistentSize() throws ActiveMQException;
10491066

@@ -1225,8 +1242,9 @@ public boolean isDurable() {
12251242
if (header != null && header .getDurable() != null) {
12261243
return header.getDurable();
12271244
} else {
1228-
// we will assume it's non-persistent if no header
1229-
return false;
1245+
// if header == null and scanningStatus=RELOAD_PERSISTENCE, it means the message can only be durable
1246+
// even though the parsing hasn't happened yet
1247+
return getDataScanningStatus() == MessageDataScanningStatus.RELOAD_PERSISTENCE;
12301248
}
12311249
}
12321250

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV3.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ public AMQPMessagePersisterV3() {
4949
@Override
5050
public int getEncodeSize(Message record) {
5151
int encodeSize = super.getEncodeSize(record) +
52-
DataConstants.SIZE_LONG; // expiration
52+
DataConstants.SIZE_LONG + // expiration
53+
DataConstants.SIZE_INT; // memory Estimate
5354
return encodeSize;
5455
}
5556

@@ -62,6 +63,7 @@ public void encode(ActiveMQBuffer buffer, Message record) {
6263
super.encode(buffer, record);
6364

6465
buffer.writeLong(record.getExpiration());
66+
buffer.writeInt(record.getMemoryEstimate());
6567
}
6668

6769
@Override
@@ -72,6 +74,11 @@ assert record != null && AMQPStandardMessage.class.equals(record.getClass());
7274

7375
((AMQPStandardMessage)record).reloadExpiration(buffer.readLong());
7476

77+
if (buffer.readableBytes() >= DataConstants.SIZE_INT) {
78+
int memoryEstimate = buffer.readInt();
79+
((AMQPStandardMessage)record).setMemoryEstimate(memoryEstimate);
80+
}
81+
7582
return record;
7683
}
7784

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,12 @@
3939
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
4040
import org.apache.qpid.proton.amqp.messaging.Properties;
4141
import org.apache.qpid.proton.amqp.messaging.Section;
42+
import org.apache.qpid.proton.codec.DecodeException;
43+
import org.apache.qpid.proton.codec.DecoderImpl;
4244
import org.apache.qpid.proton.codec.EncoderImpl;
45+
import org.apache.qpid.proton.codec.EncodingCodes;
4346
import org.apache.qpid.proton.codec.ReadableBuffer;
47+
import org.apache.qpid.proton.codec.TypeConstructor;
4448
import org.apache.qpid.proton.codec.WritableBuffer;
4549

4650
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
@@ -238,10 +242,79 @@ public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pool
238242

239243
// Message state is now that the underlying buffer is loaded, but the contents not yet scanned
240244
resetMessageData();
241-
scanMessageData(data);
245+
recoverHeaderDataFromEncoding();
242246

243247
modified = false;
244-
messageDataScanned = MessageDataScanningStatus.SCANNED.code;
248+
messageDataScanned = MessageDataScanningStatus.RELOAD_PERSISTENCE.code;
249+
}
250+
251+
private void recoverHeaderDataFromEncoding() {
252+
final DecoderImpl decoder = TLSEncode.getDecoder();
253+
decoder.setBuffer(data);
254+
255+
try {
256+
// At one point the broker could write the header and delivery annotations out of order
257+
// which means a full scan is required for maximum compatibility with that older data
258+
// where delivery annotations could be found ahead of the Header in the encoding.
259+
//
260+
// We manually extract the priority from the Header encoding if present to ensure we do
261+
// not create any unneeded GC overhead during load from storage. We don't directly store
262+
// other values from the header except for a value that is computed based on TTL and or
263+
// absolute expiration time in the Properties section, but that value is stored in the
264+
// data of the persisted message.
265+
for (int section = 0; section < 2 && data.hasRemaining(); section++) {
266+
final TypeConstructor<?> constructor = decoder.readConstructor();
267+
268+
if (Header.class.equals(constructor.getTypeClass())) {
269+
final byte typeCode = data.get();
270+
271+
@SuppressWarnings("unused")
272+
int size = 0;
273+
int count = 0;
274+
275+
switch (typeCode) {
276+
case EncodingCodes.LIST0:
277+
break;
278+
case EncodingCodes.LIST8:
279+
size = data.get() & 0xff;
280+
count = data.get() & 0xff;
281+
break;
282+
case EncodingCodes.LIST32:
283+
size = data.getInt();
284+
count = data.getInt();
285+
break;
286+
default:
287+
throw new DecodeException("Incorrect type found in Header encoding: " + typeCode);
288+
}
289+
290+
// Priority is stored in the second slot of the Header list encoding if present
291+
if (count >= 2) {
292+
decoder.readBoolean(false); // Discard durable for now, it is computed elsewhere.
293+
294+
final byte encodingCode = data.get();
295+
final int priority = switch (encodingCode) {
296+
case EncodingCodes.UBYTE -> data.get() & 0xff;
297+
case EncodingCodes.NULL -> DEFAULT_MESSAGE_PRIORITY;
298+
default ->
299+
throw new DecodeException("Expected UnsignedByte type but found encoding: " + EncodingCodes.toString(encodingCode));
300+
};
301+
302+
// Scaled here so do not call setPriority as that will store the set value in the AMQP header
303+
// and we don't want to create that Header instance at this stage.
304+
this.priority = (byte) Math.min(priority, MAX_MESSAGE_PRIORITY);
305+
}
306+
307+
return;
308+
} else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) {
309+
constructor.skipValue();
310+
} else {
311+
return;
312+
}
313+
}
314+
} finally {
315+
decoder.setBuffer(null);
316+
data.rewind(); // Ensure next scan start at the beginning.
317+
}
245318
}
246319

247320
@Override

artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -216,11 +216,11 @@ public void testHasScheduledDeliveryTimeReloadPersistence() {
216216
// Now reload from encoded data
217217
message.reloadPersistence(encoded, null);
218218

219-
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
219+
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
220220

221221
assertTrue(message.hasScheduledDeliveryTime());
222222

223-
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
223+
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
224224

225225
message.getHeader();
226226

@@ -249,11 +249,11 @@ public void testHasScheduledDeliveryDelayReloadPersistence() {
249249
// Now reload from encoded data
250250
message.reloadPersistence(encoded, null);
251251

252-
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
252+
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
253253

254254
assertTrue(message.hasScheduledDeliveryTime());
255255

256-
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
256+
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
257257

258258
message.getHeader();
259259

@@ -279,11 +279,11 @@ public void testNoScheduledDeliveryTimeOrDelayReloadPersistence() {
279279
// Now reload from encoded data
280280
message.reloadPersistence(encoded, null);
281281

282-
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
282+
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
283283

284284
assertFalse(message.hasScheduledDeliveryTime());
285285

286-
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
286+
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
287287

288288
message.getHeader();
289289

@@ -2881,7 +2881,7 @@ private boolean isEquals(MessageImpl left, MessageImpl right) {
28812881
assertTrue(isEquals(left.getBody(), right.getBody()));
28822882
assertFootersEquals(left.getFooter(), right.getFooter());
28832883
} catch (Throwable e) {
2884-
logger.info(e.getMessage(), e);
2884+
logger.debug(e.getMessage(), e);
28852885
return false;
28862886
}
28872887

0 commit comments

Comments
 (0)