Skip to content

Commit 00d28cd

Browse files
committed
INTERNAL: read while END/PIPE_ERROR received in the pipe operation
1 parent 381b25a commit 00d28cd

File tree

9 files changed

+147
-12
lines changed

9 files changed

+147
-12
lines changed

src/main/java/net/spy/memcached/internal/BulkOperationFuture.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.ArrayList;
44
import java.util.Collection;
55
import java.util.HashMap;
6+
import java.util.List;
67
import java.util.Map;
78
import java.util.concurrent.CountDownLatch;
89
import java.util.concurrent.ExecutionException;
@@ -90,16 +91,21 @@ public Map<String, T> get(long duration,
9091
MemcachedConnection.opsSucceeded(ops);
9192
}
9293

94+
List<Exception> exceptions = new ArrayList<>();
9395
for (Operation op : ops) {
9496
if (op != null && op.hasErrored()) {
95-
throw new ExecutionException(op.getException());
97+
exceptions.addAll(op.getExceptions());
9698
}
9799

98100
if (op != null && op.isCancelled()) {
99-
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
101+
exceptions.add(new RuntimeException(op.getCancelCause()));
100102
}
101103
}
102104

105+
if (!exceptions.isEmpty()) {
106+
throw new CompositeException(exceptions);
107+
}
108+
103109
return failedResult;
104110
}
105111

src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.ArrayList;
44
import java.util.Collection;
5+
import java.util.List;
56
import java.util.Map;
67
import java.util.concurrent.ConcurrentHashMap;
78
import java.util.concurrent.CountDownLatch;
@@ -83,16 +84,21 @@ public Map<K, V> get(long duration, TimeUnit unit)
8384
MemcachedConnection.opSucceeded(ops.iterator().next());
8485
}
8586

87+
List<Exception> exceptions = new ArrayList<>();
8688
for (Operation op : ops) {
8789
if (op != null && op.hasErrored()) {
88-
throw new ExecutionException(op.getException());
90+
exceptions.addAll(op.getExceptions());
8991
}
9092

9193
if (op != null && op.isCancelled()) {
92-
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
94+
exceptions.add(new RuntimeException(op.getCancelCause()));
9395
}
9496
}
9597

98+
if (!exceptions.isEmpty()) {
99+
throw new CompositeException(exceptions);
100+
}
101+
96102
return failedResult;
97103
}
98104

src/main/java/net/spy/memcached/ops/Operation.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.IOException;
2121
import java.nio.ByteBuffer;
22+
import java.util.List;
2223

2324
import net.spy.memcached.MemcachedNode;
2425
import net.spy.memcached.RedirectHandler;
@@ -44,6 +45,11 @@ public interface Operation {
4445
*/
4546
OperationException getException();
4647

48+
/**
49+
* Get the all exceptions that occurred (or empty if no exception occurred).
50+
*/
51+
List<OperationException> getExceptions();
52+
4753
/**
4854
* Get the callback for this get operation.
4955
*/

src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import java.io.IOException;
2121
import java.nio.Buffer;
2222
import java.nio.ByteBuffer;
23+
import java.util.ArrayList;
24+
import java.util.Collections;
25+
import java.util.List;
2326
import java.util.concurrent.atomic.AtomicBoolean;
2427

2528
import net.spy.memcached.MemcachedNode;
@@ -46,12 +49,12 @@ public abstract class BaseOperationImpl extends SpyObject {
4649
*/
4750
public static final OperationStatus CANCELLED =
4851
new CancelledOperationStatus();
52+
private final AtomicBoolean callbacked = new AtomicBoolean(false);
53+
private final List<OperationException> exceptions = new ArrayList<>();
4954
private OperationState state = OperationState.WRITE_QUEUED;
5055
private ByteBuffer cmd = null;
5156
private boolean cancelled = false;
52-
private final AtomicBoolean callbacked = new AtomicBoolean(false);
5357
private String cancelCause = null;
54-
private OperationException exception = null;
5558
protected OperationCallback callback = null;
5659
private volatile MemcachedNode handlingNode = null;
5760

@@ -85,11 +88,22 @@ public final boolean isCancelled() {
8588
}
8689

8790
public final boolean hasErrored() {
88-
return exception != null;
91+
return !exceptions.isEmpty();
8992
}
9093

9194
public final OperationException getException() {
92-
return exception;
95+
if (exceptions.isEmpty()) {
96+
return null;
97+
}
98+
return exceptions.get(0);
99+
}
100+
101+
public final List<OperationException> getExceptions() {
102+
return Collections.unmodifiableList(exceptions);
103+
}
104+
105+
protected final void addException(OperationException e) {
106+
exceptions.add(e);
93107
}
94108

95109
public final boolean cancel(String cause) {
@@ -244,7 +258,7 @@ protected void handleError(OperationErrorType eType, String line)
244258
switch (eType) {
245259
case GENERAL:
246260
case SERVER:
247-
exception = new OperationException(eType, line + " @ " + handlingNode.getNodeName());
261+
exceptions.add(new OperationException(eType, line + " @ " + handlingNode.getNodeName()));
248262
break;
249263
case CLIENT:
250264
if (line.contains("bad command line format")) {
@@ -255,13 +269,16 @@ protected void handleError(OperationErrorType eType, String line)
255269
String[] cmdLines = new String(bytes).split("\r\n");
256270
getLogger().error("Bad command: %s", cmdLines[0]);
257271
}
258-
exception = new OperationException(eType, line + " @ " + handlingNode.getNodeName());
272+
exceptions.add(new OperationException(eType, line + " @ " + handlingNode.getNodeName()));
259273
break;
260274
default:
261275
assert false;
262276
}
263-
transitionState(OperationState.COMPLETE);
264-
throw exception;
277+
278+
if (!isPipeOperation()) {
279+
transitionState(OperationState.COMPLETE);
280+
throw exceptions.get(0);
281+
}
265282
}
266283

267284
public void handleRead(ByteBuffer data) {

src/main/java/net/spy/memcached/protocol/ascii/CollectionBulkInsertOperationImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import net.spy.memcached.ops.CollectionBulkInsertOperation;
2727
import net.spy.memcached.ops.CollectionOperationStatus;
2828
import net.spy.memcached.ops.OperationCallback;
29+
import net.spy.memcached.ops.OperationErrorType;
30+
import net.spy.memcached.ops.OperationException;
2931
import net.spy.memcached.ops.OperationState;
3032
import net.spy.memcached.ops.OperationStatus;
3133
import net.spy.memcached.ops.OperationType;
@@ -135,6 +137,12 @@ assert getState() == OperationState.READING
135137
/* ENABLE_MIGRATION end */
136138
cb.receivedStatus((successAll) ? END : FAILED_END);
137139
transitionState(OperationState.COMPLETE);
140+
} else if (count > 0 && index + getExceptions().size() >= count) {
141+
// If END|PIPE_ERROR not received after reading all responses,
142+
// then throw exception to reconnect.
143+
addException(new OperationException(
144+
OperationErrorType.SERVER, "END|PIPE_ERROR not received"));
145+
transitionState(OperationState.COMPLETE);
138146
} else if (line.startsWith("RESPONSE ")) {
139147
getLogger().debug("Got line %s", line);
140148

src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import net.spy.memcached.ops.CollectionOperationStatus;
2828
import net.spy.memcached.ops.CollectionPipedInsertOperation;
2929
import net.spy.memcached.ops.OperationCallback;
30+
import net.spy.memcached.ops.OperationErrorType;
31+
import net.spy.memcached.ops.OperationException;
3032
import net.spy.memcached.ops.OperationState;
3133
import net.spy.memcached.ops.OperationStatus;
3234
import net.spy.memcached.ops.OperationType;
@@ -145,6 +147,12 @@ assert getState() == OperationState.READING
145147
/* ENABLE_MIGRATION end */
146148
cb.receivedStatus((successAll) ? END : FAILED_END);
147149
transitionState(OperationState.COMPLETE);
150+
} else if (count > 0 && index + getExceptions().size() >= count) {
151+
// If END|PIPE_ERROR not received after reading all responses,
152+
// then throw exception to reconnect.
153+
addException(new OperationException(
154+
OperationErrorType.SERVER, "END|PIPE_ERROR not received"));
155+
transitionState(OperationState.COMPLETE);
148156
} else if (line.startsWith("RESPONSE ")) {
149157
getLogger().debug("Got line %s", line);
150158

src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedUpdateOperationImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import net.spy.memcached.ops.CollectionOperationStatus;
3030
import net.spy.memcached.ops.CollectionPipedUpdateOperation;
3131
import net.spy.memcached.ops.OperationCallback;
32+
import net.spy.memcached.ops.OperationErrorType;
33+
import net.spy.memcached.ops.OperationException;
3234
import net.spy.memcached.ops.OperationState;
3335
import net.spy.memcached.ops.OperationStatus;
3436
import net.spy.memcached.ops.OperationType;
@@ -141,6 +143,12 @@ assert getState() == OperationState.READING : "Read ``" + line
141143
/* ENABLE_MIGRATION end */
142144
cb.receivedStatus((successAll) ? END : FAILED_END);
143145
transitionState(OperationState.COMPLETE);
146+
} else if (count > 0 && index + getExceptions().size() >= count) {
147+
// If END|PIPE_ERROR not received after reading all responses,
148+
// then throw exception to reconnect.
149+
addException(new OperationException(
150+
OperationErrorType.SERVER, "END|PIPE_ERROR not received"));
151+
transitionState(OperationState.COMPLETE);
144152
} else if (line.startsWith("RESPONSE ")) {
145153
getLogger().debug("Got line %s", line);
146154

src/main/java/net/spy/memcached/protocol/ascii/OperationImpl.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@
2222
import java.io.IOException;
2323
import java.io.UnsupportedEncodingException;
2424
import java.nio.ByteBuffer;
25+
import java.util.ArrayList;
2526

2627
import net.spy.memcached.KeyUtil;
28+
import net.spy.memcached.internal.CompositeException;
2729
import net.spy.memcached.ops.Operation;
2830
import net.spy.memcached.ops.OperationCallback;
2931
import net.spy.memcached.ops.OperationErrorType;
32+
import net.spy.memcached.ops.OperationException;
3033
import net.spy.memcached.ops.OperationState;
3134
import net.spy.memcached.ops.OperationStatus;
3235
import net.spy.memcached.ops.StatusCode;
@@ -160,12 +163,23 @@ public void readFromBuffer(ByteBuffer data) throws IOException {
160163
} else {
161164
handleLine(line);
162165
}
166+
167+
if (isPipeOperation()) {
168+
validatePipeEnd();
169+
}
163170
} else { // OperationReadType.DATA
164171
handleRead(data);
165172
}
166173
}
167174
}
168175

176+
private void validatePipeEnd() throws OperationException {
177+
if (hasErrored() && getState() == OperationState.COMPLETE) {
178+
throw new OperationException(OperationErrorType.SERVER,
179+
new CompositeException(new ArrayList<>(getExceptions())).getMessage());
180+
}
181+
}
182+
169183
public abstract void handleLine(String line);
170184

171185
protected boolean hasSwitchedOver(String line) {

src/test/java/net/spy/memcached/protocol/ascii/BaseOpTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,24 @@
1919

2020
package net.spy.memcached.protocol.ascii;
2121

22+
import java.net.InetSocketAddress;
2223
import java.nio.Buffer;
2324
import java.nio.ByteBuffer;
2425
import java.util.Arrays;
2526
import java.util.LinkedList;
2627
import java.util.List;
28+
import java.util.concurrent.LinkedBlockingQueue;
29+
30+
import net.spy.memcached.collection.CollectionPipedInsert;
31+
import net.spy.memcached.ops.CollectionPipedInsertOperation;
32+
import net.spy.memcached.ops.Operation;
33+
import net.spy.memcached.ops.OperationCallback;
34+
import net.spy.memcached.ops.OperationException;
35+
import net.spy.memcached.ops.OperationStatus;
2736

2837
import org.junit.jupiter.api.Test;
2938

39+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
3040
import static org.junit.jupiter.api.Assertions.assertEquals;
3141
import static org.junit.jupiter.api.Assertions.assertNull;
3242
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -99,6 +109,58 @@ void testPartialLine() throws Exception {
99109
assertEquals("this is a test", op.getCurrentLine());
100110
}
101111

112+
@Test
113+
void throwExceptionIfEndOrPipeErrorNotReceived() throws Exception {
114+
String key = "testPipeLine";
115+
CollectionPipedInsert.ListPipedInsert<String> insert =
116+
new CollectionPipedInsert.ListPipedInsert<>(key, 0,
117+
Arrays.asList("a", "b"), null, null);
118+
OperationCallback cb = new CollectionPipedInsertOperation.Callback() {
119+
@Override
120+
public void receivedStatus(OperationStatus status) {
121+
}
122+
123+
@Override
124+
public void complete() {
125+
}
126+
127+
@Override
128+
public void gotStatus(Integer index, OperationStatus status) {
129+
}
130+
};
131+
CollectionPipedInsertOperationImpl op =
132+
new CollectionPipedInsertOperationImpl("test", insert, cb);
133+
LinkedBlockingQueue<Operation> queue = new LinkedBlockingQueue<>();
134+
op.setHandlingNode(new AsciiMemcachedNodeImpl("testnode", new InetSocketAddress(11211),
135+
60, queue, queue, queue, 0L));
136+
137+
ByteBuffer b = ByteBuffer.allocate(40);
138+
String line1 = "RESPONSE 2\r\n";
139+
op.writeComplete();
140+
b.put(line1.getBytes());
141+
b.flip();
142+
assertDoesNotThrow(() -> op.readFromBuffer(b));
143+
b.clear();
144+
145+
String line2 = "SERVER_ERROR out of memory\r\n";
146+
b.put(line2.getBytes());
147+
b.flip();
148+
assertDoesNotThrow(() -> op.readFromBuffer(b));
149+
b.clear();
150+
151+
String line3 = "CLIENT_ERROR too large value\r\n";
152+
b.put(line3.getBytes());
153+
b.flip();
154+
assertDoesNotThrow(() -> op.readFromBuffer(b));
155+
b.clear();
156+
157+
String line4 = "RESPONSE 2\r\n";
158+
b.put(line4.getBytes());
159+
b.flip();
160+
op.readFromBuffer(b);
161+
// assertThrows(OperationException.class, () -> op.readFromBuffer(b));
162+
}
163+
102164
private static class SimpleOp extends OperationImpl {
103165

104166
private final LinkedList<String> lines = new LinkedList<>();

0 commit comments

Comments
 (0)