Skip to content

Commit 9eb5496

Browse files
committed
INTERNAL: make lop piped operations process synchronously
1 parent e99869f commit 9eb5496

File tree

3 files changed

+148
-66
lines changed

3 files changed

+148
-66
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.ArrayList;
2727
import java.util.Arrays;
2828
import java.util.Collection;
29+
import java.util.Collections;
2930
import java.util.Enumeration;
3031
import java.util.HashMap;
3132
import java.util.HashSet;
@@ -1863,11 +1864,16 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncLopPip
18631864
} else {
18641865
PartitionedList<T> list = new PartitionedList<>(valueList,
18651866
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
1867+
18661868
for (List<T> elementList : list) {
18671869
insertList.add(new ListPipedInsert<>(key, index, elementList, attributesForCreate, tc));
1870+
if (index >= 0) {
1871+
index += elementList.size();
1872+
}
18681873
}
18691874
}
1870-
return asyncCollectionPipedInsert(key, insertList);
1875+
1876+
return syncCollectionPipedInsert(key, Collections.unmodifiableList(insertList));
18711877
}
18721878

18731879
@Override
@@ -3170,6 +3176,80 @@ public void gotStatus(Integer index, OperationStatus status) {
31703176
return rv;
31713177
}
31723178

3179+
/**
3180+
* Pipe insert method for collection items.
3181+
*
3182+
* @param key arcus cache key
3183+
* @param insertList must not be empty.
3184+
* @return future holding the map of element index and the reason why insert operation failed
3185+
*/
3186+
private <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> syncCollectionPipedInsert(
3187+
final String key, final List<CollectionPipedInsert<T>> insertList) {
3188+
final CountDownLatch latch = new CountDownLatch(1);
3189+
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
3190+
new PipedCollectionFuture<>(latch, operationTimeout);
3191+
3192+
for (int i = 0; i < insertList.size(); i++) {
3193+
final CollectionPipedInsert<T> insert = insertList.get(i);
3194+
final int idx = i;
3195+
Operation op = opFact.collectionPipedInsert(key, insert,
3196+
new CollectionPipedInsertOperation.Callback() {
3197+
// each result status
3198+
public void receivedStatus(OperationStatus status) {
3199+
CollectionOperationStatus cstatus;
3200+
3201+
if (status instanceof CollectionOperationStatus) {
3202+
cstatus = (CollectionOperationStatus) status;
3203+
} else {
3204+
getLogger().warn("Unhandled state: " + status);
3205+
cstatus = new CollectionOperationStatus(status);
3206+
}
3207+
rv.setOperationStatus(cstatus);
3208+
}
3209+
3210+
// complete
3211+
public void complete() {
3212+
if (idx == insertList.size() - 1
3213+
|| rv.hasErrored()
3214+
|| rv.getOperationStatus().getResponse() == CollectionResponse.CANCELED) {
3215+
// countdown if this is last op
3216+
latch.countDown();
3217+
} else if (!rv.getOperationStatus().isSuccess()) {
3218+
// if error or cancel occurred by this operation,
3219+
// do not add all remaining operations and mark as cancelled
3220+
for (int chunkIdx = idx + 1; chunkIdx < insertList.size(); chunkIdx++) {
3221+
for (int itemIdx = 0; itemIdx < insertList.get(chunkIdx).getItemCount(); itemIdx++) {
3222+
rv.addEachResult(itemIdx + (chunkIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3223+
new CollectionOperationStatus(new CollectionOperationStatus(
3224+
false, "CANCELED", CollectionResponse.CANCELED)));
3225+
}
3226+
}
3227+
latch.countDown();
3228+
} else {
3229+
// add next operation if this is not last op
3230+
rv.incrCurrentOpIdx();
3231+
Operation nextOp = rv.getOp(idx + 1);
3232+
addOp(key, nextOp);
3233+
}
3234+
}
3235+
3236+
// got status
3237+
public void gotStatus(Integer index, OperationStatus status) {
3238+
if (status instanceof CollectionOperationStatus) {
3239+
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3240+
(CollectionOperationStatus) status);
3241+
} else {
3242+
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3243+
new CollectionOperationStatus(status));
3244+
}
3245+
}
3246+
});
3247+
rv.addOperation(op);
3248+
}
3249+
addOp(key, rv.getOp(0));
3250+
return rv;
3251+
}
3252+
31733253
@Override
31743254
public Future<Map<String, CollectionOperationStatus>> asyncBopInsertBulk(
31753255
List<String> keyList, long bkey, byte[] eFlag, Object value,
Lines changed: 45 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package net.spy.memcached.internal;
22

3-
import java.util.Collection;
4-
import java.util.HashSet;
3+
import java.util.ArrayList;
4+
import java.util.List;
55
import java.util.Map;
66
import java.util.concurrent.ConcurrentHashMap;
7-
import java.util.concurrent.ConcurrentLinkedQueue;
87
import java.util.concurrent.CountDownLatch;
98
import java.util.concurrent.ExecutionException;
109
import java.util.concurrent.TimeUnit;
1110
import java.util.concurrent.TimeoutException;
11+
import java.util.concurrent.atomic.AtomicInteger;
1212
import java.util.concurrent.atomic.AtomicReference;
1313

1414
import net.spy.memcached.MemcachedConnection;
@@ -18,9 +18,10 @@
1818

1919
public class PipedCollectionFuture<K, V>
2020
extends CollectionFuture<Map<K, V>> {
21-
private final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<>();
21+
private final List<Operation> ops = new ArrayList<>();
2222
private final AtomicReference<CollectionOperationStatus> operationStatus
2323
= new AtomicReference<>(null);
24+
private final AtomicInteger currentOpIdx = new AtomicInteger(0);
2425

2526
private final Map<K, V> failedResult =
2627
new ConcurrentHashMap<>();
@@ -31,67 +32,58 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) {
3132

3233
@Override
3334
public boolean cancel(boolean ign) {
34-
boolean rv = false;
35-
for (Operation op : ops) {
36-
rv |= op.cancel("by application.");
37-
}
38-
return rv;
35+
return ops.get(currentOpIdx.get()).cancel("by application.");
3936
}
4037

38+
/**
39+
* if previous op is cancelled, then next ops are not added to the opQueue.
40+
* So we only need to check current op.
41+
*
42+
* @return true if operation is cancelled.
43+
*/
4144
@Override
4245
public boolean isCancelled() {
43-
for (Operation op : ops) {
44-
if (op.isCancelled()) {
45-
return true;
46-
}
47-
}
48-
return false;
46+
return ops.get(currentOpIdx.get()).isCancelled();
47+
}
48+
49+
/**
50+
* if previous op threw exception, then next ops are not added to the opQueue.
51+
* So we only need to check current op.
52+
*
53+
* @return true if operation has errored by exception.
54+
*/
55+
public boolean hasErrored() {
56+
return ops.get(currentOpIdx.get()).hasErrored();
4957
}
5058

5159
@Override
5260
public boolean isDone() {
53-
for (Operation op : ops) {
54-
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
55-
return false;
56-
}
57-
}
58-
return true;
61+
return latch.getCount() == 0;
5962
}
6063

6164
@Override
6265
public Map<K, V> get(long duration, TimeUnit unit)
6366
throws InterruptedException, TimeoutException, ExecutionException {
6467

6568
long beforeAwait = System.currentTimeMillis();
66-
if (!latch.await(duration, unit)) {
67-
Collection<Operation> timedOutOps = new HashSet<>();
68-
for (Operation op : ops) {
69-
if (op.getState() != OperationState.COMPLETE) {
70-
timedOutOps.add(op);
71-
} else {
72-
MemcachedConnection.opSucceeded(op);
73-
}
74-
}
75-
if (!timedOutOps.isEmpty()) {
76-
// set timeout only once for piped ops requested to single node.
77-
MemcachedConnection.opTimedOut(timedOutOps.iterator().next());
78-
79-
long elapsed = System.currentTimeMillis() - beforeAwait;
80-
throw new CheckedOperationTimeoutException(duration, unit, elapsed, timedOutOps);
81-
}
69+
Operation lastOp = ops.get(ops.size() - 1);
70+
Operation currentOp = ops.get(currentOpIdx.get());
71+
if (!latch.await(duration, unit) && lastOp.getState() != OperationState.COMPLETE) {
72+
MemcachedConnection.opTimedOut(lastOp);
73+
74+
long elapsed = System.currentTimeMillis() - beforeAwait;
75+
throw new CheckedOperationTimeoutException(duration, unit, elapsed, currentOp);
8276
} else {
8377
// continuous timeout counter will be reset only once in pipe
84-
MemcachedConnection.opSucceeded(ops.iterator().next());
78+
MemcachedConnection.opSucceeded(lastOp);
8579
}
8680

87-
for (Operation op : ops) {
88-
if (op != null && op.hasErrored()) {
89-
throw new ExecutionException(op.getException());
90-
}
81+
if (currentOp != null && currentOp.hasErrored()) {
82+
throw new ExecutionException(currentOp.getException());
83+
}
9184

92-
if (op != null && op.isCancelled()) {
93-
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
94-
}
85+
if (currentOp != null && currentOp.isCancelled()) {
86+
throw new ExecutionException(new RuntimeException(currentOp.getCancelCause()));
9587
}
9688

9789
return failedResult;
@@ -120,4 +112,12 @@ public void addEachResult(K index, V status) {
120112
public void addOperation(Operation op) {
121113
ops.add(op);
122114
}
115+
116+
public Operation getOp(int index) {
117+
return this.ops.get(index);
118+
}
119+
120+
public void incrCurrentOpIdx() {
121+
this.currentOpIdx.incrementAndGet();
122+
}
123123
}

src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java renamed to src/test/manual/net/spy/memcached/bulkoperation/LopPipedInsertBulkMultipleValueTest.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package net.spy.memcached.bulkoperation;
1818

19+
import java.util.ArrayList;
1920
import java.util.Arrays;
2021
import java.util.List;
2122
import java.util.Map;
@@ -25,13 +26,14 @@
2526

2627
import net.spy.memcached.collection.BaseIntegrationTest;
2728
import net.spy.memcached.collection.CollectionAttributes;
29+
import net.spy.memcached.collection.CollectionResponse;
2830
import net.spy.memcached.ops.CollectionOperationStatus;
2931

3032
import org.junit.Assert;
3133

32-
public class LopInsertBulkMultipleValueTest extends BaseIntegrationTest {
34+
public class LopPipedInsertBulkMultipleValueTest extends BaseIntegrationTest {
3335

34-
private String key = "LopInsertBulkMultipleValueTest";
36+
private String key = "LopPipedInsertBulkMultipleValueTest";
3537

3638
@Override
3739
protected void tearDown() throws Exception {
@@ -42,10 +44,10 @@ protected void tearDown() throws Exception {
4244
public void testInsertAndGet() {
4345
String value = "MyValue";
4446

45-
int valueCount = 500;
46-
Object[] valueList = new Object[valueCount];
47-
for (int i = 0; i < valueList.length; i++) {
48-
valueList[i] = "MyValue";
47+
int valueCount = 510;
48+
List<Object> valueList = new ArrayList<>(valueCount);
49+
for (int i = 0; i < valueCount; i++) {
50+
valueList.add("MyValue" + i);
4951
}
5052

5153
try {
@@ -54,8 +56,7 @@ public void testInsertAndGet() {
5456

5557
// SET
5658
Future<Map<Integer, CollectionOperationStatus>> future = mc
57-
.asyncLopPipedInsertBulk(key, 0, Arrays.asList(valueList),
58-
new CollectionAttributes());
59+
.asyncLopPipedInsertBulk(key, 0, valueList, new CollectionAttributes());
5960
try {
6061
Map<Integer, CollectionOperationStatus> errorList = future.get(
6162
20000L, TimeUnit.MILLISECONDS);
@@ -68,27 +69,27 @@ public void testInsertAndGet() {
6869

6970
// GET
7071
int errorCount = 0;
71-
List<Object> list = null;
72+
List<Object> resultList = null;
7273
Future<List<Object>> f = mc.asyncLopGet(key, 0, valueCount, false,
7374
false);
7475
try {
75-
list = f.get();
76+
resultList = f.get();
7677
} catch (Exception e) {
7778
f.cancel(true);
7879
e.printStackTrace();
7980
Assert.fail(e.getMessage());
8081
}
8182

82-
Assert.assertNotNull("List is null.", list);
83-
Assert.assertTrue("Cached list is empty.", !list.isEmpty());
84-
Assert.assertEquals(valueCount, list.size());
83+
Assert.assertNotNull("List is null.", resultList);
84+
Assert.assertTrue("Cached resultList is empty.", !resultList.isEmpty());
85+
Assert.assertEquals(valueCount, resultList.size());
8586

86-
for (Object o : list) {
87-
if (!value.equals(o)) {
87+
for (int i = 0; i < resultList.size(); i++) {
88+
if (!resultList.get(i).equals(valueList.get(i))) {
8889
errorCount++;
8990
}
9091
}
91-
Assert.assertEquals(valueCount, list.size());
92+
Assert.assertEquals(valueCount, resultList.size());
9293
Assert.assertEquals(0, errorCount);
9394

9495
// REMOVE
@@ -102,9 +103,7 @@ public void testInsertAndGet() {
102103
public void testErrorCount() {
103104
int valueCount = 1200;
104105
Object[] valueList = new Object[valueCount];
105-
for (int i = 0; i < valueList.length; i++) {
106-
valueList[i] = "MyValue";
107-
}
106+
Arrays.fill(valueList, "MyValue");
108107

109108
try {
110109
// SET
@@ -115,7 +114,10 @@ public void testErrorCount() {
115114
Map<Integer, CollectionOperationStatus> map = future.get(1000L,
116115
TimeUnit.MILLISECONDS);
117116
assertEquals(valueCount, map.size());
118-
117+
assertEquals(map.get(mc.getMaxPipedItemCount() - 1).getResponse(),
118+
CollectionResponse.NOT_FOUND);
119+
assertEquals(map.get(mc.getMaxPipedItemCount()).getResponse(),
120+
CollectionResponse.CANCELED);
119121
} catch (Exception e) {
120122
e.printStackTrace();
121123
Assert.fail();

0 commit comments

Comments
 (0)