Skip to content

Commit e7a888a

Browse files
committed
INTERNAL: read while END/PIPE_ERROR received in the pipe operation
1 parent d84094a commit e7a888a

File tree

5 files changed

+166
-71
lines changed

5 files changed

+166
-71
lines changed

src/main/java/net/spy/memcached/internal/BulkOperationFuture.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.ArrayList;
44
import java.util.Collection;
55
import java.util.HashMap;
6+
import java.util.List;
67
import java.util.Map;
78
import java.util.concurrent.CountDownLatch;
89
import java.util.concurrent.ExecutionException;
@@ -90,16 +91,21 @@ public Map<String, T> get(long duration,
9091
MemcachedConnection.opsSucceeded(ops);
9192
}
9293

94+
List<Exception> exceptions = new ArrayList<>();
9395
for (Operation op : ops) {
9496
if (op != null && op.hasErrored()) {
95-
throw new ExecutionException(op.getException());
97+
exceptions.add(op.getException());
9698
}
9799

98100
if (op != null && op.isCancelled()) {
99-
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
101+
exceptions.add(new RuntimeException(op.getCancelCause()));
100102
}
101103
}
102104

105+
if (!exceptions.isEmpty()) {
106+
throw new CompositeException(exceptions);
107+
}
108+
103109
return failedResult;
104110
}
105111

src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import net.spy.memcached.ops.OperationType;
3737
import net.spy.memcached.ops.StatusCode;
3838

39+
import static net.spy.memcached.ops.OperationErrorType.CLIENT;
40+
3941
/**
4042
* Base class for protocol-specific operation implementations.
4143
*/
@@ -51,7 +53,7 @@ public abstract class BaseOperationImpl extends SpyObject {
5153
private boolean cancelled = false;
5254
private final AtomicBoolean callbacked = new AtomicBoolean(false);
5355
private String cancelCause = null;
54-
private OperationException exception = null;
56+
protected OperationException exception = null;
5557
private OperationCallback callback = null;
5658
private volatile MemcachedNode handlingNode = null;
5759

@@ -239,31 +241,25 @@ public final void writeComplete() {
239241
public abstract void readFromBuffer(ByteBuffer data) throws IOException;
240242

241243
protected void handleError(OperationErrorType eType, String line)
242-
throws IOException {
244+
throws OperationException {
243245
getLogger().error("Error: %s by %s", line, this);
244-
switch (eType) {
245-
case GENERAL:
246-
case SERVER:
247-
exception = new OperationException(eType, line + " @ " + handlingNode.getNodeName());
248-
break;
249-
case CLIENT:
250-
if (line.contains("bad command line format")) {
251-
initialize();
252-
byte[] bytes = new byte[cmd.remaining()];
253-
cmd.get(bytes);
254-
255-
String[] cmdLines = new String(bytes).split("\r\n");
256-
getLogger().error("Bad command: %s", cmdLines[0]);
257-
}
258-
exception = new OperationException(eType, line + " @ " + handlingNode.getNodeName());
259-
break;
260-
default:
261-
assert false;
262-
}
246+
exception = createException(eType, line);
263247
transitionState(OperationState.COMPLETE);
264248
throw exception;
265249
}
266250

251+
protected final OperationException createException(OperationErrorType eType, String line) {
252+
if (eType == CLIENT && line.contains("bad command line format")) {
253+
initialize();
254+
byte[] bytes = new byte[cmd.remaining()];
255+
cmd.get(bytes);
256+
257+
String[] cmdLines = new String(bytes).split("\r\n");
258+
getLogger().error("Bad command: %s", cmdLines[0]);
259+
}
260+
return new OperationException(eType, line + " @ " + handlingNode.getNodeName());
261+
}
262+
267263
public void handleRead(ByteBuffer data) {
268264
assert false;
269265
}

src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java

Lines changed: 81 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import net.spy.memcached.ops.CollectionOperationStatus;
2828
import net.spy.memcached.ops.CollectionPipedInsertOperation;
2929
import net.spy.memcached.ops.OperationCallback;
30+
import net.spy.memcached.ops.OperationErrorType;
3031
import net.spy.memcached.ops.OperationState;
3132
import net.spy.memcached.ops.OperationStatus;
3233
import net.spy.memcached.ops.OperationType;
@@ -89,33 +90,29 @@ public CollectionPipedInsertOperationImpl(String key,
8990
}
9091
setOperationType(OperationType.WRITE);
9192
}
92-
9393
@Override
9494
public void handleLine(String line) {
9595
assert getState() == OperationState.READING
9696
: "Read ``" + line + "'' when in " + getState() + " state";
9797

98-
/* ENABLE_REPLICATION if */
99-
if (hasSwitchedOver(line)) {
100-
this.insert.setNextOpIndex(index);
101-
prepareSwitchover(line);
102-
return;
103-
}
104-
/* ENABLE_REPLICATION end */
105-
/* ENABLE_MIGRATION if */
106-
if (hasNotMyKey(line)) {
107-
// Only one NOT_MY_KEY is provided in response of single key piped operation when redirection.
108-
addRedirectSingleKeyOperation(line, key);
109-
if (insert.isNotPiped()) {
98+
if (insert.isNotPiped()) {
99+
// insert object contains only one command.
100+
101+
/* ENABLE_REPLICATION if */
102+
if (hasSwitchedOver(line)) {
103+
prepareSwitchover(line);
104+
return;
105+
}
106+
/* ENABLE_REPLICATION end */
107+
108+
/* ENABLE_MIGRATION if */
109+
if (hasNotMyKey(line)) {
110+
addRedirectSingleKeyOperation(line, key);
110111
transitionState(OperationState.REDIRECT);
111-
} else {
112-
insert.setNextOpIndex(index);
112+
return;
113113
}
114-
return;
115-
}
116-
/* ENABLE_MIGRATION end */
114+
/* ENABLE_MIGRATION end */
117115

118-
if (insert.isNotPiped()) {
119116
OperationStatus status = matchStatus(line, STORED, CREATED_STORED,
120117
NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE,
121118
TYPE_MISMATCH, BKEY_MISMATCH);
@@ -126,26 +123,8 @@ assert getState() == OperationState.READING
126123
cb.receivedStatus(FAILED_END);
127124
}
128125
transitionState(OperationState.COMPLETE);
129-
return;
130-
}
131-
132-
/*
133-
RESPONSE <count>\r\n
134-
<status of the 1st pipelined command>\r\n
135-
[ ... ]
136-
<status of the last pipelined command>\r\n
137-
END|PIPE_ERROR <error_string>\r\n
138-
*/
139-
if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) {
140-
/* ENABLE_MIGRATION if */
141-
if (needRedirect()) {
142-
transitionState(OperationState.REDIRECT);
143-
return;
144-
}
145-
/* ENABLE_MIGRATION end */
146-
cb.receivedStatus((successAll) ? END : FAILED_END);
147-
transitionState(OperationState.COMPLETE);
148126
} else if (line.startsWith("RESPONSE ")) {
127+
// insert object contains multiple commands
149128
getLogger().debug("Got line %s", line);
150129

151130
// TODO server should be fixed
@@ -155,16 +134,72 @@ assert getState() == OperationState.READING
155134
String[] stuff = line.split(" ");
156135
assert "RESPONSE".equals(stuff[0]);
157136
count = Integer.parseInt(stuff[1]);
158-
} else {
159-
OperationStatus status = matchStatus(line, STORED, CREATED_STORED,
160-
NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE,
161-
TYPE_MISMATCH, BKEY_MISMATCH);
137+
setReadType(OperationReadType.DATA);
138+
}
139+
}
162140

163-
if (!status.isSuccess()) {
164-
cb.gotStatus(index, status);
165-
successAll = false;
141+
@Override
142+
public void handleRead(ByteBuffer bb) {
143+
while (bb.remaining() > 0) {
144+
try {
145+
String line = getLineFromBuffer(bb);
146+
if (line == null) {
147+
break;
148+
}
149+
OperationErrorType eType = classifyError(line);
150+
if (eType != null) {
151+
this.exception = createException(eType, line);
152+
continue;
153+
}
154+
155+
/* ENABLE_REPLICATION if */
156+
if (hasSwitchedOver(line)) {
157+
this.insert.setNextOpIndex(index);
158+
prepareSwitchover(line);
159+
return;
160+
}
161+
/* ENABLE_REPLICATION end */
162+
163+
/* ENABLE_MIGRATION if */
164+
if (hasNotMyKey(line)) {
165+
// Only one NOT_MY_KEY is provided in
166+
// response of single key piped operation when redirection.
167+
addRedirectSingleKeyOperation(line, key);
168+
if (insert.isNotPiped()) {
169+
transitionState(OperationState.REDIRECT);
170+
} else {
171+
insert.setNextOpIndex(index);
172+
}
173+
return;
174+
}
175+
/* ENABLE_MIGRATION end */
176+
177+
if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) {
178+
/* ENABLE_MIGRATION if */
179+
if (needRedirect()) {
180+
transitionState(OperationState.REDIRECT);
181+
return;
182+
}
183+
/* ENABLE_MIGRATION end */
184+
cb.receivedStatus((successAll) ? END : FAILED_END);
185+
transitionState(OperationState.COMPLETE);
186+
return;
187+
} else {
188+
OperationStatus status = matchStatus(line, STORED, CREATED_STORED,
189+
NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE,
190+
TYPE_MISMATCH, BKEY_MISMATCH);
191+
192+
if (!status.isSuccess()) {
193+
cb.gotStatus(index, status);
194+
successAll = false;
195+
}
196+
index++;
197+
}
198+
} catch (Exception e) {
199+
getLogger().error("Failed to parse line: %s", e.getMessage());
200+
transitionState(OperationState.COMPLETE);
201+
return;
166202
}
167-
index++;
168203
}
169204
}
170205

src/main/java/net/spy/memcached/protocol/ascii/OperationImpl.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ protected final void setArguments(ByteBuffer bb, Object... args) {
104104
bb.put(CRLF);
105105
}
106106

107-
private String getLineFromBuffer(ByteBuffer data) throws UnsupportedEncodingException {
107+
protected final String getLineFromBuffer(ByteBuffer data) throws UnsupportedEncodingException {
108108
boolean lineFound = false;
109109
while (data.remaining() > 0) {
110110
byte b = data.get();
@@ -128,7 +128,7 @@ private String getLineFromBuffer(ByteBuffer data) throws UnsupportedEncodingExce
128128
return null;
129129
}
130130

131-
private OperationErrorType classifyError(String line) {
131+
protected final OperationErrorType classifyError(String line) {
132132
OperationErrorType rv = null;
133133
if (line.startsWith("ERROR")) {
134134
rv = OperationErrorType.GENERAL;
@@ -163,6 +163,9 @@ public void readFromBuffer(ByteBuffer data) throws IOException {
163163
} else { // OperationReadType.DATA
164164
handleRead(data);
165165
}
166+
if (hasErrored() && isPipeOperation() && getState() == OperationState.COMPLETE) {
167+
throw getException();
168+
}
166169
}
167170
}
168171

src/test/java/net/spy/memcached/protocol/ascii/BaseOpTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,24 @@
1919

2020
package net.spy.memcached.protocol.ascii;
2121

22+
import java.net.InetSocketAddress;
2223
import java.nio.Buffer;
2324
import java.nio.ByteBuffer;
2425
import java.util.Arrays;
2526
import java.util.LinkedList;
2627
import java.util.List;
28+
import java.util.concurrent.LinkedBlockingQueue;
29+
30+
import net.spy.memcached.collection.CollectionPipedInsert;
31+
import net.spy.memcached.ops.CollectionPipedInsertOperation;
32+
import net.spy.memcached.ops.Operation;
33+
import net.spy.memcached.ops.OperationCallback;
34+
import net.spy.memcached.ops.OperationException;
35+
import net.spy.memcached.ops.OperationStatus;
2736

2837
import org.junit.jupiter.api.Test;
2938

39+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
3040
import static org.junit.jupiter.api.Assertions.assertEquals;
3141
import static org.junit.jupiter.api.Assertions.assertNull;
3242
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -99,6 +109,51 @@ void testPartialLine() throws Exception {
99109
assertEquals("this is a test", op.getCurrentLine());
100110
}
101111

112+
@Test
113+
void throwExceptionAfterReadingEndOrPipeError() throws Exception {
114+
String key = "testPipeLine";
115+
CollectionPipedInsert.ListPipedInsert<String> insert =
116+
new CollectionPipedInsert.ListPipedInsert<>(key, 0,
117+
Arrays.asList("a", "b"), null, null);
118+
OperationCallback cb = new CollectionPipedInsertOperation.Callback() {
119+
@Override
120+
public void receivedStatus(OperationStatus status) {
121+
}
122+
123+
@Override
124+
public void complete() {
125+
}
126+
127+
@Override
128+
public void gotStatus(Integer index, OperationStatus status) {
129+
}
130+
};
131+
CollectionPipedInsertOperationImpl op =
132+
new CollectionPipedInsertOperationImpl("test", insert, cb);
133+
LinkedBlockingQueue<Operation> queue = new LinkedBlockingQueue<>();
134+
op.setHandlingNode(new AsciiMemcachedNodeImpl("testnode", new InetSocketAddress(11211),
135+
60, queue, queue, queue, 0L));
136+
137+
ByteBuffer b = ByteBuffer.allocate(40);
138+
String line1 = "RESPONSE 2\r\n";
139+
op.writeComplete();
140+
b.put(line1.getBytes());
141+
b.flip();
142+
assertDoesNotThrow(() -> op.readFromBuffer(b));
143+
b.clear();
144+
145+
String line2 = "SERVER_ERROR out of memory\r\n";
146+
b.put(line2.getBytes());
147+
b.flip();
148+
assertDoesNotThrow(() -> op.readFromBuffer(b));
149+
b.clear();
150+
151+
String line4 = "PIPE_ERROR failed\r\n";
152+
b.put(line4.getBytes());
153+
b.flip();
154+
assertThrows(OperationException.class, () -> op.readFromBuffer(b));
155+
}
156+
102157
private static class SimpleOp extends OperationImpl {
103158

104159
private final LinkedList<String> lines = new LinkedList<>();

0 commit comments

Comments
 (0)