Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.apple.foundationdb.annotation.API;

import com.apple.foundationdb.record.RecordCursor;
import com.apple.foundationdb.record.RecordCursorContinuation;
import com.apple.foundationdb.relational.api.Continuation;
import com.apple.foundationdb.relational.api.exceptions.ErrorCode;
Expand All @@ -43,26 +44,43 @@
public final class ContinuationImpl implements Continuation {
public static final int CURRENT_VERSION = 1;

public static final ContinuationImpl BEGIN = new ContinuationImpl((byte[]) null);
public static final ContinuationImpl BEGIN = new ContinuationImpl((byte[]) null, Reason.CURSOR_AFTER_LAST);

public static final ContinuationImpl END = new ContinuationImpl(new byte[0]);
public static final ContinuationImpl END = new ContinuationImpl(new byte[0], Reason.CURSOR_AFTER_LAST);

@Nonnull
private final ContinuationProto proto;

// TODO(yhatem) remove semantic nulls.
private ContinuationImpl(@Nullable byte[] continuationBytes) {
private ContinuationImpl(@Nullable byte[] continuationBytes, @Nullable final Reason reason) {
ContinuationProto.Builder builder = ContinuationProto.newBuilder().setVersion(CURRENT_VERSION);
if (continuationBytes != null) {
builder.setExecutionState(ByteString.copyFrom(continuationBytes));
}
if (reason != null) {
builder.setReason(ContinuationProto.Reason.valueOf(reason.name()));
}
proto = builder.build();
}

ContinuationImpl(@Nonnull ContinuationProto proto) {
this.proto = proto;
}

@Nullable
public static Reason reasonFromCursor(@Nullable final RecordCursor.NoNextReason noNextReason) {
if (noNextReason == null) {
return null;
}
return switch (noNextReason) {
case SOURCE_EXHAUSTED -> Reason.CURSOR_AFTER_LAST;
case RETURN_LIMIT_REACHED -> Reason.USER_REQUESTED_CONTINUATION;
case TIME_LIMIT_REACHED -> Reason.TRANSACTION_LIMIT_REACHED;
case SCAN_LIMIT_REACHED -> Reason.QUERY_EXECUTION_LIMIT_REACHED;
case BYTE_LIMIT_REACHED -> Reason.QUERY_EXECUTION_LIMIT_REACHED;
};
}

public int getVersion() {
return proto.getVersion();
}
Expand Down Expand Up @@ -155,36 +173,53 @@

/**
* Create a new continuation from a given (inner) continuation bytes.
*
* @param bytes the inner (cursor continuation) to be placed inside the newly created continuation
*
* @return a continuation that holds the given cursor continuation
*/
public static Continuation fromUnderlyingBytes(@Nullable byte[] bytes, final RecordCursor.NoNextReason reason) {
return fromUnderlyingBytes(bytes, reasonFromCursor(reason));
}

/**
* Create a new continuation from a given (inner) continuation bytes.
*
* @param bytes the inner (cursor continuation) to be placed inside the newly created continuation
*
* @return a continuation that holds the given cursor continuation
*/
public static Continuation fromUnderlyingBytes(@Nullable byte[] bytes) {
public static Continuation fromUnderlyingBytes(@Nullable byte[] bytes, @Nullable final Reason reason) {
if (bytes == null) {
return BEGIN;
} else if (bytes.length == 0) {
return END;
}
return new ContinuationImpl(bytes);
return new ContinuationImpl(bytes, reason);
}

/**
* Create a new continuation from a given (inner) continuation Integer offset.
*
* @param offset the offset to be placed inside the newly created continuation
*
* @return a continuation that holds the given offset
*/
public static Continuation fromInt(int offset) {
public static Continuation fromInt(int offset, final Reason reason) {
assert offset >= 0;
return new ContinuationImpl(Ints.toByteArray(offset));
return new ContinuationImpl(Ints.toByteArray(offset), reason);
}

/**
* Create a new continuation from a given cursor continuation.
*
* @param cursorContinuation the inner cursor continuation to be placed inside the newly created continuation
*
* @return a continuation that holds the given cursor continuation
*/
public static Continuation fromRecordCursorContinuation(RecordCursorContinuation cursorContinuation) {
return cursorContinuation.isEnd() ? END : new ContinuationImpl(cursorContinuation.toBytes());
public static Continuation fromRecordCursorContinuation(RecordCursorContinuation cursorContinuation, final RecordCursor.NoNextReason noNextReason) {
return cursorContinuation.isEnd() ? END : ContinuationImpl.fromUnderlyingBytes(cursorContinuation.toBytes(), noNextReason);
}

Check warning on line 222 in fdb-relational-core/src/main/java/com/apple/foundationdb/relational/recordlayer/ContinuationImpl.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Test Gaps

fdb-relational-core/src/main/java/com/apple/foundationdb/relational/recordlayer/ContinuationImpl.java#L220-L222

[Test Gap] Changed method `fromRecordCursorContinuation` has not been tested. https://fdb.teamscale.io/metrics/code/foundationdb-fdb-record-layer/fdb-relational-core%2Fsrc%2Fmain%2Fjava%2Fcom%2Fapple%2Ffoundationdb%2Frelational%2Frecordlayer%2FContinuationImpl.java?coverage-mode=test-gap&t=FORK_MR%2F4165%2FScottDugas%2Fmixed-mode-option-e1-with-mm%3AHEAD&selection=char-7710-7995&merge-request=FoundationDB%2Ffdb-record-layer%2F4165

/**
* Deserialize and parse a continuation. This would create a continuation from a previously serialized byte array.
Expand All @@ -196,7 +231,15 @@
if (bytes == null) {
return BEGIN;
} else {
return new ContinuationImpl(ContinuationProto.parseFrom(bytes));
final var proto = ContinuationProto.parseFrom(bytes);
// The reason field was introduced in 4.11.1.0. Reject continuations from older versions that
// do not include it, as they may have incompatible serialization semantics.
if (!proto.hasReason()) {
throw new InvalidProtocolBufferException(
"Continuation is missing required 'reason' field; " +
"it may have been generated by a version older than 4.11.1.0");
}
return new ContinuationImpl(proto);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private void fetchNextResult() {
if (noNextReason == RecordCursor.NoNextReason.SOURCE_EXHAUSTED) {
this.continuation = ContinuationImpl.END;
} else {
this.continuation = ContinuationImpl.fromUnderlyingBytes(result.getContinuation().toBytes());
this.continuation = ContinuationImpl.fromUnderlyingBytes(result.getContinuation().toBytes(), noNextReason);
}
}
}
Expand All @@ -104,7 +104,7 @@ public Row next() {
try {
final var row = transform.apply(result.get());
// TODO(sfines,yhatem) pass the Record-Layer Continuation object as-is to avoid copying bytes around.
this.continuation = ContinuationImpl.fromUnderlyingBytes(result.getContinuation().toBytes());
this.continuation = ContinuationImpl.fromUnderlyingBytes(result.getContinuation().toBytes(), result.hasNext() ? null : result.getNoNextReason());
return row;
} catch (RecordCoreException exception) {
throw ExceptionUtil.toRelationalException(exception).toUncheckedWrappedException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package com.apple.foundationdb.relational.recordlayer;

import com.apple.foundationdb.relational.api.Continuation;
import com.apple.foundationdb.relational.api.Options;
import com.apple.foundationdb.relational.continuation.ContinuationProto;
import com.google.common.primitives.Ints;
Expand All @@ -45,19 +46,19 @@ public void testEnd() {

@Test
public void testBytes() {
ContinuationImpl continuation = (ContinuationImpl) ContinuationImpl.fromUnderlyingBytes("Hello".getBytes());
ContinuationImpl continuation = (ContinuationImpl) ContinuationImpl.fromUnderlyingBytes("Hello".getBytes(), Continuation.Reason.CURSOR_AFTER_LAST);
assertContinuation(continuation, false, false, "Hello".getBytes());
}

@Test
public void testInt() {
ContinuationImpl continuation = (ContinuationImpl) ContinuationImpl.fromInt(5);
ContinuationImpl continuation = (ContinuationImpl) ContinuationImpl.fromInt(5, Continuation.Reason.CURSOR_AFTER_LAST);
assertContinuation(continuation, false, false, Ints.toByteArray(5));
}

@Test
public void serializeAndRestore() throws Exception {
ContinuationImpl continuation = (ContinuationImpl) ContinuationImpl.fromUnderlyingBytes("Hello".getBytes());
ContinuationImpl continuation = (ContinuationImpl) ContinuationImpl.fromUnderlyingBytes("Hello".getBytes(), Continuation.Reason.CURSOR_AFTER_LAST);
byte[] bytes = continuation.serialize();
continuation = ContinuationImpl.parseContinuation(bytes);
assertContinuation(continuation, false, false, "Hello".getBytes());
Expand All @@ -69,6 +70,7 @@ public void customProto() throws Exception {
.setVersion(5)
.setExecutionState(ByteString.copyFrom("Blah".getBytes()))
.setBindingHash(1234)
.setReason(ContinuationProto.Reason.USER_REQUESTED_CONTINUATION)
.build();
ContinuationImpl continuation = ContinuationImpl.parseContinuation(proto.toByteArray());
Assertions.assertThat(continuation.atBeginning()).isEqualTo(false);
Expand All @@ -80,7 +82,7 @@ public void customProto() throws Exception {

@Test
public void testNullSameAsBegin() {
ContinuationImpl continuation = (ContinuationImpl) ContinuationImpl.fromUnderlyingBytes(null);
ContinuationImpl continuation = (ContinuationImpl) ContinuationImpl.fromUnderlyingBytes(null, Continuation.Reason.CURSOR_AFTER_LAST);
assertContinuation(continuation, true, false, null);
}

Expand All @@ -101,11 +103,11 @@ private void assertContinuation(ContinuationImpl continuation, boolean atBeginni
// Note that for the same reason it is not possible to get the continuation back from the string property.
public void testContinuationOption() throws Exception {
final byte[] asBytes = { (byte)0xFE, (byte)0xED, (byte)0xBA, (byte)0xC1 };
final ContinuationImpl continuation = (ContinuationImpl) ContinuationImpl.fromUnderlyingBytes(asBytes);
final ContinuationImpl continuation = (ContinuationImpl) ContinuationImpl.fromUnderlyingBytes(asBytes, Continuation.Reason.USER_REQUESTED_CONTINUATION);
final Options options = Options.builder().withOption(Options.Name.CONTINUATION, continuation).build();
final Properties properties = Options.toProperties(options);
// Field 1 (version): varint 1; field 2 (execution_state): len 4
Assertions.assertThat(properties).hasFieldOrPropertyWithValue(Options.Name.CONTINUATION.name(), "08011204FEEDBAC1");
Assertions.assertThat(properties).hasFieldOrPropertyWithValue(Options.Name.CONTINUATION.name(), "08011204FEEDBAC13000");
Assertions.assertThatThrownBy(() -> Options.fromProperties(properties)).isInstanceOf(UnsupportedOperationException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,7 @@ void listMappingsWithContinuation() throws RelationalException, SQLException {
.hasNoNextRow();
continuation = resultSet.getContinuation();
Assertions.assertThat(continuation.getReason())
.as("Continuation reasons are all erroneously null due to: https://github.com/FoundationDB/fdb-record-layer/issues/3227")
.isNull();
.isEqualTo(continuation.atEnd() ? Continuation.Reason.CURSOR_AFTER_LAST : Continuation.Reason.USER_REQUESTED_CONTINUATION);
}
}
}
Expand Down Expand Up @@ -499,8 +498,7 @@ void listResolverStateWithContinuation() throws RelationalException, SQLExceptio
Assertions.assertThat(continuation.atEnd())
.isFalse();
Assertions.assertThat(continuation.getReason())
.as("Continuation reasons are all erroneously null due to: https://github.com/FoundationDB/fdb-record-layer/issues/3227")
.isNull();
.isEqualTo(Continuation.Reason.USER_REQUESTED_CONTINUATION);
}
// There's always only a single record, so just assert that there is not another row returned
// when resumed from a continuation
Expand All @@ -513,8 +511,7 @@ void listResolverStateWithContinuation() throws RelationalException, SQLExceptio
Assertions.assertThat(continuation.atEnd())
.isTrue();
Assertions.assertThat(continuation.getReason())
.as("Continuation reasons are all erroneously null due to: https://github.com/FoundationDB/fdb-record-layer/issues/3227")
.isNull();
.isEqualTo(Continuation.Reason.CURSOR_AFTER_LAST);
}
}
}
Expand Down
Loading