Skip to content

Commit a711c34

Browse files
committed
INTERNAL: make piped insert operations process synchronously
1 parent 024bc9c commit a711c34

File tree

11 files changed

+369
-146
lines changed

11 files changed

+369
-146
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

Lines changed: 275 additions & 121 deletions
Large diffs are not rendered by default.

src/main/java/net/spy/memcached/collection/CollectionResponse.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public enum CollectionResponse {
4444

4545
UNDEFINED,
4646
CANCELED,
47+
STOPPED,
4748

4849
INTERRUPT_EXCEPTION,
4950
EXECUTION_EXCEPTION,

src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.ArrayList;
44
import java.util.Collection;
5+
import java.util.List;
56
import java.util.Map;
67
import java.util.concurrent.ConcurrentHashMap;
78
import java.util.concurrent.CountDownLatch;
@@ -17,10 +18,9 @@
1718

1819
public class PipedCollectionFuture<K, V>
1920
extends CollectionFuture<Map<K, V>> {
20-
private final Collection<Operation> ops = new ArrayList<>();
21+
private final List<Operation> ops = new ArrayList<>();
2122
private final AtomicReference<CollectionOperationStatus> operationStatus
2223
= new AtomicReference<>(null);
23-
2424
private final Map<K, V> failedResult =
2525
new ConcurrentHashMap<>();
2626

@@ -49,12 +49,7 @@ public boolean isCancelled() {
4949

5050
@Override
5151
public boolean isDone() {
52-
for (Operation op : ops) {
53-
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
54-
return false;
55-
}
56-
}
57-
return true;
52+
return latch.getCount() == 0;
5853
}
5954

6055
@Override
@@ -74,7 +69,6 @@ public Map<K, V> get(long duration, TimeUnit unit)
7469
if (!timedOutOps.isEmpty()) {
7570
// set timeout only once for piped ops requested to single node.
7671
MemcachedConnection.opTimedOut(timedOutOps.iterator().next());
77-
7872
long elapsed = System.currentTimeMillis() - beforeAwait;
7973
throw new CheckedOperationTimeoutException(duration, unit, elapsed, timedOutOps);
8074
}
@@ -116,7 +110,7 @@ public void addEachResult(K index, V status) {
116110
failedResult.put(index, status);
117111
}
118112

119-
public void addOperation(Operation op) {
120-
ops.add(op);
113+
public void addOperations(Collection<Operation> ops) {
114+
this.ops.addAll(ops);
121115
}
122116
}

src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,9 @@ assert getState() == OperationState.READING
125125
successAll = false;
126126
}
127127
cb.gotStatus(index, status);
128+
index++;
128129

129-
cb.receivedStatus((successAll) ? END : FAILED_END);
130+
cb.receivedStatus(getEndStatus());
130131
transitionState(OperationState.COMPLETE);
131132
return;
132133
}
@@ -145,7 +146,7 @@ assert getState() == OperationState.READING
145146
return;
146147
}
147148
/* ENABLE_MIGRATION end */
148-
cb.receivedStatus((successAll) ? END : FAILED_END);
149+
cb.receivedStatus(getEndStatus());
149150
transitionState(OperationState.COMPLETE);
150151
} else if (line.startsWith("RESPONSE ")) {
151152
getLogger().debug("Got line %s", line);
@@ -168,6 +169,11 @@ assert getState() == OperationState.READING
168169
}
169170
}
170171

172+
private OperationStatus getEndStatus() {
173+
return (exception == null && index == collectionPipe.getItemCount() && successAll)
174+
? END : FAILED_END;
175+
}
176+
171177
@Override
172178
protected void handleError(OperationErrorType eType, String line) throws IOException {
173179
if (!readUntilLastLine) {

src/test/java/net/spy/memcached/protocol/ascii/BaseOpTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import static org.junit.jupiter.api.Assertions.assertSame;
4343
import static org.junit.jupiter.api.Assertions.assertTrue;
4444
import static org.junit.jupiter.api.Assertions.assertThrows;
45+
import static org.junit.jupiter.api.Assertions.fail;
4546

4647
/**
4748
* Test the basic operation buffer handling stuff.
@@ -154,6 +155,45 @@ public void gotStatus(Integer index, OperationStatus status) {
154155
assertThrows(OperationException.class, () -> op.readFromBuffer(b));
155156
}
156157

158+
159+
@Test
160+
void doNotCallReceivedStatusWhenErrorBeforeResponse() throws Exception {
161+
String key = "testPipeLine";
162+
CollectionPipedInsert.ListPipedInsert<String> insert =
163+
new CollectionPipedInsert.ListPipedInsert<>(key, 0,
164+
Arrays.asList("a", "b"), null, null);
165+
166+
OperationCallback cb = new PipedOperationCallback() {
167+
@Override
168+
public void receivedStatus(OperationStatus status) {
169+
fail("should not be called");
170+
}
171+
172+
@Override
173+
public void complete() {
174+
// called
175+
}
176+
177+
@Override
178+
public void gotStatus(Integer index, OperationStatus status) {
179+
fail("should not be called");
180+
}
181+
};
182+
CollectionPipedInsertOperationImpl op =
183+
new CollectionPipedInsertOperationImpl("test", insert, cb);
184+
LinkedBlockingQueue<Operation> queue = new LinkedBlockingQueue<>();
185+
op.setHandlingNode(new AsciiMemcachedNodeImpl("testnode", new InetSocketAddress(11211),
186+
60, queue, queue, queue, 0L));
187+
188+
ByteBuffer b = ByteBuffer.allocate(40);
189+
// pipe operation but error came before 'RESPONSE <count>'
190+
String line1 = "CLIENT_ERROR blah\r\n";
191+
op.writeComplete();
192+
b.put(line1.getBytes());
193+
b.flip();
194+
assertThrows(OperationException.class, () -> op.readFromBuffer(b));
195+
}
196+
157197
private static class SimpleOp extends OperationImpl {
158198

159199
private final LinkedList<String> lines = new LinkedList<>();

src/test/manual/net/spy/memcached/bulkoperation/BopInsertBulkMultipleTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.TimeoutException;
2525

26+
import net.spy.memcached.ArcusClient;
2627
import net.spy.memcached.collection.BaseIntegrationTest;
2728
import net.spy.memcached.collection.CollectionAttributes;
2829
import net.spy.memcached.collection.Element;
@@ -122,7 +123,7 @@ void testErrorCount() {
122123

123124
Map<Integer, CollectionOperationStatus> map = future.get(2000L,
124125
TimeUnit.MILLISECONDS);
125-
assertEquals(bkeySize, map.size());
126+
assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size());
126127

127128
} catch (Exception e) {
128129
e.printStackTrace();

src/test/manual/net/spy/memcached/bulkoperation/BopPipeUpdateTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Map;
2222
import java.util.concurrent.TimeUnit;
2323

24+
import net.spy.memcached.ArcusClient;
2425
import net.spy.memcached.collection.BaseIntegrationTest;
2526
import net.spy.memcached.collection.CollectionAttributes;
2627
import net.spy.memcached.collection.CollectionResponse;
@@ -233,13 +234,13 @@ void testBopPipeUpdateNotFoundElement() {
233234

234235
// System.out.println(System.currentTimeMillis() - start + "ms");
235236

236-
assertEquals(600, map2.size());
237+
assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map2.size());
237238
assertEquals(CollectionResponse.NOT_FOUND_ELEMENT, map2.get(0)
238239
.getResponse());
239240

240241
for (long i = 600; i < elementCount; i++) {
241242
assertEquals(
242-
"updated" + i,
243+
"value" + i,
243244
mc.asyncBopGet(KEY, i, ElementFlagFilter.DO_NOT_FILTER,
244245
false, false).get(1000L, TimeUnit.MILLISECONDS)
245246
.get(i).getValue());
@@ -274,7 +275,7 @@ void testBopPipeUpdateNotFoundKey() {
274275

275276
// System.out.println(System.currentTimeMillis() - start + "ms");
276277

277-
assertEquals(elementCount, map2.size());
278+
assertEquals(501, map2.size());
278279
assertEquals(CollectionResponse.NOT_FOUND, map2.get(0)
279280
.getResponse());
280281

src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.TimeoutException;
2525

26+
import net.spy.memcached.ArcusClient;
2627
import net.spy.memcached.collection.BaseIntegrationTest;
2728
import net.spy.memcached.collection.CollectionAttributes;
29+
import net.spy.memcached.collection.CollectionResponse;
2830
import net.spy.memcached.ops.CollectionOperationStatus;
2931

3032
import org.junit.jupiter.api.AfterEach;
@@ -111,9 +113,7 @@ void testInsertAndGet() {
111113
void testErrorCount() {
112114
int valueCount = 1200;
113115
Object[] valueList = new Object[valueCount];
114-
for (int i = 0; i < valueList.length; i++) {
115-
valueList[i] = "MyValue";
116-
}
116+
Arrays.fill(valueList, "MyValue");
117117

118118
try {
119119
// SET
@@ -123,8 +123,11 @@ void testErrorCount() {
123123

124124
Map<Integer, CollectionOperationStatus> map = future.get(1000L,
125125
TimeUnit.MILLISECONDS);
126-
assertEquals(valueCount, map.size());
127-
126+
assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size());
127+
assertEquals(map.get(ArcusClient.MAX_PIPED_ITEM_COUNT - 1).getResponse(),
128+
CollectionResponse.NOT_FOUND);
129+
assertEquals(map.get(ArcusClient.MAX_PIPED_ITEM_COUNT).getResponse(),
130+
CollectionResponse.STOPPED);
128131
} catch (Exception e) {
129132
e.printStackTrace();
130133
fail();

src/test/manual/net/spy/memcached/bulkoperation/MopInsertBulkMultipleTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.TimeoutException;
2525

26+
import net.spy.memcached.ArcusClient;
2627
import net.spy.memcached.collection.BaseIntegrationTest;
2728
import net.spy.memcached.collection.CollectionAttributes;
2829
import net.spy.memcached.ops.CollectionOperationStatus;
@@ -116,7 +117,7 @@ void testErrorCount() {
116117

117118
Map<Integer, CollectionOperationStatus> map = future.get(2000L,
118119
TimeUnit.MILLISECONDS);
119-
assertEquals(elementSize, map.size());
120+
assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size());
120121

121122
} catch (Exception e) {
122123
e.printStackTrace();

src/test/manual/net/spy/memcached/bulkoperation/PipeInsertTest.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.TreeMap;
2323
import java.util.concurrent.TimeUnit;
2424

25+
import net.spy.memcached.ArcusClient;
2526
import net.spy.memcached.collection.BaseIntegrationTest;
2627
import net.spy.memcached.collection.CollectionAttributes;
2728
import net.spy.memcached.collection.Element;
@@ -34,6 +35,7 @@
3435
import org.junit.jupiter.api.Test;
3536

3637
import static org.junit.jupiter.api.Assertions.assertEquals;
38+
import static org.junit.jupiter.api.Assertions.assertFalse;
3739
import static org.junit.jupiter.api.Assertions.assertNull;
3840
import static org.junit.jupiter.api.Assertions.assertTrue;
3941
import static org.junit.jupiter.api.Assertions.fail;
@@ -253,7 +255,7 @@ void testMopPipeInsert() {
253255
Map<Integer, CollectionOperationStatus> map = future.get(5000L,
254256
TimeUnit.MILLISECONDS);
255257

256-
assertEquals(1000, map.size());
258+
assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size());
257259

258260
Map<String, Object> rmap = mc.asyncMopGet(KEY, false, false)
259261
.get();
@@ -265,4 +267,24 @@ void testMopPipeInsert() {
265267
}
266268
}
267269

270+
@Test
271+
void cancel() {
272+
int elementCount = 600;
273+
274+
Map<String, Object> elements = new TreeMap<>();
275+
276+
for (int i = 0; i < elementCount; i++) {
277+
elements.put(String.valueOf(i), "value" + i);
278+
}
279+
CollectionAttributes attr = new CollectionAttributes();
280+
281+
CollectionFuture<Map<Integer, CollectionOperationStatus>> future = mc
282+
.asyncMopPipedInsertBulk(KEY, elements, attr);
283+
284+
future.cancel(false);
285+
286+
assertTrue(future.isCancelled());
287+
assertFalse(future.cancel(false));
288+
}
289+
268290
}

0 commit comments

Comments
 (0)