Skip to content

Commit df233d9

Browse files
committed
INTERNAL: make lop piped operations process synchronously
1 parent d065b77 commit df233d9

File tree

3 files changed

+138
-66
lines changed

3 files changed

+138
-66
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.ArrayList;
2727
import java.util.Arrays;
2828
import java.util.Collection;
29+
import java.util.Collections;
2930
import java.util.Enumeration;
3031
import java.util.HashMap;
3132
import java.util.HashSet;
@@ -38,6 +39,7 @@
3839
import java.util.concurrent.TimeUnit;
3940
import java.util.concurrent.atomic.AtomicBoolean;
4041
import java.util.concurrent.atomic.AtomicInteger;
42+
import java.util.function.IntFunction;
4143
import java.util.jar.JarFile;
4244
import java.util.jar.Manifest;
4345

@@ -1862,11 +1864,16 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncLopPip
18621864
} else {
18631865
PartitionedList<T> list = new PartitionedList<>(valueList,
18641866
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
1867+
18651868
for (List<T> elementList : list) {
18661869
insertList.add(new ListPipedInsert<>(key, index, elementList, attributesForCreate, tc));
1870+
if (index >= 0) {
1871+
index += elementList.size();
1872+
}
18671873
}
18681874
}
1869-
return asyncCollectionPipedInsert(key, insertList);
1875+
1876+
return syncCollectionPipedInsert(key, Collections.unmodifiableList(insertList));
18701877
}
18711878

18721879
@Override
@@ -3137,6 +3144,79 @@ public void gotStatus(Integer index, OperationStatus status) {
31373144
return rv;
31383145
}
31393146

3147+
/**
3148+
* Pipe insert method for collection items.
3149+
*
3150+
* @param key arcus cache key
3151+
* @param insertList must not be empty.
3152+
* @return future holding the map of element index and the reason why insert operation failed
3153+
*/
3154+
private <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> syncCollectionPipedInsert(
3155+
final String key, final List<CollectionPipedInsert<T>> insertList) {
3156+
final CountDownLatch latch = new CountDownLatch(1);
3157+
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
3158+
new PipedCollectionFuture<>(latch, operationTimeout);
3159+
3160+
final List<Operation> ops = new ArrayList<>(insertList.size());
3161+
IntFunction<OperationCallback> makeCallback = idx -> new CollectionPipedInsertOperation.Callback() {
3162+
// each result status
3163+
public void receivedStatus(OperationStatus status) {
3164+
CollectionOperationStatus cstatus;
3165+
3166+
if (status instanceof CollectionOperationStatus) {
3167+
cstatus = (CollectionOperationStatus) status;
3168+
} else {
3169+
getLogger().warn("Unhandled state: " + status);
3170+
cstatus = new CollectionOperationStatus(status);
3171+
}
3172+
rv.setOperationStatus(cstatus);
3173+
}
3174+
3175+
// complete
3176+
public void complete() {
3177+
if (idx == insertList.size() - 1 || rv.hasErrored() || rv.isCancelled()) {
3178+
latch.countDown();
3179+
} else if (!rv.getOperationStatus().isSuccess()) {
3180+
// if error or cancel occurred by this operation,
3181+
// do not add all remaining operations and mark as cancelled
3182+
for (int chunkIdx = idx + 1; chunkIdx < insertList.size(); chunkIdx++) {
3183+
for (int itemIdx = 0; itemIdx < insertList.get(chunkIdx).getItemCount(); itemIdx++) {
3184+
rv.addEachResult(itemIdx + (chunkIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3185+
new CollectionOperationStatus(new CollectionOperationStatus(
3186+
false, "CANCELED", CollectionResponse.CANCELED)));
3187+
}
3188+
}
3189+
latch.countDown();
3190+
} else {
3191+
// add next operation if this is not last op
3192+
Operation nextOp = ops.get(idx + 1);
3193+
rv.addOperation(nextOp);
3194+
addOp(key, nextOp);
3195+
}
3196+
}
3197+
3198+
// got status
3199+
public void gotStatus(Integer index, OperationStatus status) {
3200+
if (status instanceof CollectionOperationStatus) {
3201+
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3202+
(CollectionOperationStatus) status);
3203+
} else {
3204+
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3205+
new CollectionOperationStatus(status));
3206+
}
3207+
}
3208+
};
3209+
3210+
for (int i = 0; i < insertList.size(); i++) {
3211+
final CollectionPipedInsert<T> insert = insertList.get(i);
3212+
Operation op = opFact.collectionPipedInsert(key, insert, makeCallback.apply(i));
3213+
ops.add(op);
3214+
}
3215+
rv.addOperation(ops.get(0));
3216+
addOp(key, ops.get(0));
3217+
return rv;
3218+
}
3219+
31403220
@Override
31413221
public Future<Map<String, CollectionOperationStatus>> asyncBopInsertBulk(
31423222
List<String> keyList, long bkey, byte[] eFlag, Object value,

src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java

Lines changed: 35 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
package net.spy.memcached.internal;
22

33
import java.util.ArrayList;
4-
import java.util.Collection;
5-
import java.util.HashSet;
4+
import java.util.List;
65
import java.util.Map;
76
import java.util.concurrent.ConcurrentHashMap;
87
import java.util.concurrent.CountDownLatch;
@@ -12,13 +11,15 @@
1211
import java.util.concurrent.atomic.AtomicReference;
1312

1413
import net.spy.memcached.MemcachedConnection;
14+
import net.spy.memcached.collection.CollectionResponse;
1515
import net.spy.memcached.ops.CollectionOperationStatus;
1616
import net.spy.memcached.ops.Operation;
1717
import net.spy.memcached.ops.OperationState;
1818

1919
public class PipedCollectionFuture<K, V>
2020
extends CollectionFuture<Map<K, V>> {
21-
private final Collection<Operation> ops = new ArrayList<>();
21+
// operations that are completed or in progress
22+
private final List<Operation> ops = new ArrayList<>();
2223
private final AtomicReference<CollectionOperationStatus> operationStatus
2324
= new AtomicReference<>(null);
2425

@@ -31,67 +32,56 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) {
3132

3233
@Override
3334
public boolean cancel(boolean ign) {
34-
boolean rv = false;
35-
for (Operation op : ops) {
36-
rv |= op.cancel("by application.");
37-
}
38-
return rv;
35+
return ops.get(ops.size() - 1).cancel("by application.");
3936
}
4037

38+
/**
39+
* if previous op is cancelled, then next ops are not added to the opQueue.
40+
* So we only need to check current op.
41+
*
42+
* @return true if operation is cancelled.
43+
*/
4144
@Override
4245
public boolean isCancelled() {
43-
for (Operation op : ops) {
44-
if (op.isCancelled()) {
45-
return true;
46-
}
47-
}
48-
return false;
46+
return operationStatus.get().getResponse() == CollectionResponse.CANCELED;
47+
}
48+
49+
/**
50+
* if previous op threw exception, then next ops are not added to the opQueue.
51+
* So we only need to check current op.
52+
*
53+
* @return true if operation has errored by exception.
54+
*/
55+
public boolean hasErrored() {
56+
return ops.get(ops.size() - 1).hasErrored();
4957
}
5058

5159
@Override
5260
public boolean isDone() {
53-
for (Operation op : ops) {
54-
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
55-
return false;
56-
}
57-
}
58-
return true;
61+
return latch.getCount() == 0;
5962
}
6063

6164
@Override
6265
public Map<K, V> get(long duration, TimeUnit unit)
6366
throws InterruptedException, TimeoutException, ExecutionException {
64-
6567
long beforeAwait = System.currentTimeMillis();
66-
if (!latch.await(duration, unit)) {
67-
Collection<Operation> timedOutOps = new HashSet<>();
68-
for (Operation op : ops) {
69-
if (op.getState() != OperationState.COMPLETE) {
70-
timedOutOps.add(op);
71-
} else {
72-
MemcachedConnection.opSucceeded(op);
73-
}
74-
}
75-
if (!timedOutOps.isEmpty()) {
76-
// set timeout only once for piped ops requested to single node.
77-
MemcachedConnection.opTimedOut(timedOutOps.iterator().next());
78-
79-
long elapsed = System.currentTimeMillis() - beforeAwait;
80-
throw new CheckedOperationTimeoutException(duration, unit, elapsed, timedOutOps);
81-
}
68+
Operation lastOp = ops.get(ops.size() - 1);
69+
if (!latch.await(duration, unit) && lastOp.getState() != OperationState.COMPLETE) {
70+
MemcachedConnection.opTimedOut(lastOp);
71+
72+
long elapsed = System.currentTimeMillis() - beforeAwait;
73+
throw new CheckedOperationTimeoutException(duration, unit, elapsed, lastOp);
8274
} else {
8375
// continuous timeout counter will be reset only once in pipe
84-
MemcachedConnection.opSucceeded(ops.iterator().next());
76+
MemcachedConnection.opSucceeded(lastOp);
8577
}
8678

87-
for (Operation op : ops) {
88-
if (op != null && op.hasErrored()) {
89-
throw new ExecutionException(op.getException());
90-
}
79+
if (lastOp != null && lastOp.hasErrored()) {
80+
throw new ExecutionException(lastOp.getException());
81+
}
9182

92-
if (op != null && op.isCancelled()) {
93-
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
94-
}
83+
if (lastOp != null && lastOp.isCancelled()) {
84+
throw new ExecutionException(new RuntimeException(lastOp.getCancelCause()));
9585
}
9686

9787
return failedResult;

src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java renamed to src/test/manual/net/spy/memcached/bulkoperation/LopPipedInsertBulkMultipleValueTest.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package net.spy.memcached.bulkoperation;
1818

19+
import java.util.ArrayList;
1920
import java.util.Arrays;
2021
import java.util.List;
2122
import java.util.Map;
@@ -25,13 +26,14 @@
2526

2627
import net.spy.memcached.collection.BaseIntegrationTest;
2728
import net.spy.memcached.collection.CollectionAttributes;
29+
import net.spy.memcached.collection.CollectionResponse;
2830
import net.spy.memcached.ops.CollectionOperationStatus;
2931

3032
import org.junit.Assert;
3133

32-
public class LopInsertBulkMultipleValueTest extends BaseIntegrationTest {
34+
public class LopPipedInsertBulkMultipleValueTest extends BaseIntegrationTest {
3335

34-
private String key = "LopInsertBulkMultipleValueTest";
36+
private String key = "LopPipedInsertBulkMultipleValueTest";
3537

3638
@Override
3739
protected void tearDown() throws Exception {
@@ -42,10 +44,10 @@ protected void tearDown() throws Exception {
4244
public void testInsertAndGet() {
4345
String value = "MyValue";
4446

45-
int valueCount = 500;
46-
Object[] valueList = new Object[valueCount];
47-
for (int i = 0; i < valueList.length; i++) {
48-
valueList[i] = "MyValue";
47+
int valueCount = 510;
48+
List<Object> valueList = new ArrayList<>(valueCount);
49+
for (int i = 0; i < valueCount; i++) {
50+
valueList.add("MyValue" + i);
4951
}
5052

5153
try {
@@ -54,8 +56,7 @@ public void testInsertAndGet() {
5456

5557
// SET
5658
Future<Map<Integer, CollectionOperationStatus>> future = mc
57-
.asyncLopPipedInsertBulk(key, 0, Arrays.asList(valueList),
58-
new CollectionAttributes());
59+
.asyncLopPipedInsertBulk(key, 0, valueList, new CollectionAttributes());
5960
try {
6061
Map<Integer, CollectionOperationStatus> errorList = future.get(
6162
20000L, TimeUnit.MILLISECONDS);
@@ -68,27 +69,27 @@ public void testInsertAndGet() {
6869

6970
// GET
7071
int errorCount = 0;
71-
List<Object> list = null;
72+
List<Object> resultList = null;
7273
Future<List<Object>> f = mc.asyncLopGet(key, 0, valueCount, false,
7374
false);
7475
try {
75-
list = f.get();
76+
resultList = f.get();
7677
} catch (Exception e) {
7778
f.cancel(true);
7879
e.printStackTrace();
7980
Assert.fail(e.getMessage());
8081
}
8182

82-
Assert.assertNotNull("List is null.", list);
83-
Assert.assertTrue("Cached list is empty.", !list.isEmpty());
84-
Assert.assertEquals(valueCount, list.size());
83+
Assert.assertNotNull("List is null.", resultList);
84+
Assert.assertTrue("Cached resultList is empty.", !resultList.isEmpty());
85+
Assert.assertEquals(valueCount, resultList.size());
8586

86-
for (Object o : list) {
87-
if (!value.equals(o)) {
87+
for (int i = 0; i < resultList.size(); i++) {
88+
if (!resultList.get(i).equals(valueList.get(i))) {
8889
errorCount++;
8990
}
9091
}
91-
Assert.assertEquals(valueCount, list.size());
92+
Assert.assertEquals(valueCount, resultList.size());
9293
Assert.assertEquals(0, errorCount);
9394

9495
// REMOVE
@@ -102,9 +103,7 @@ public void testInsertAndGet() {
102103
public void testErrorCount() {
103104
int valueCount = 1200;
104105
Object[] valueList = new Object[valueCount];
105-
for (int i = 0; i < valueList.length; i++) {
106-
valueList[i] = "MyValue";
107-
}
106+
Arrays.fill(valueList, "MyValue");
108107

109108
try {
110109
// SET
@@ -115,7 +114,10 @@ public void testErrorCount() {
115114
Map<Integer, CollectionOperationStatus> map = future.get(1000L,
116115
TimeUnit.MILLISECONDS);
117116
assertEquals(valueCount, map.size());
118-
117+
assertEquals(map.get(mc.getMaxPipedItemCount() - 1).getResponse(),
118+
CollectionResponse.NOT_FOUND);
119+
assertEquals(map.get(mc.getMaxPipedItemCount()).getResponse(),
120+
CollectionResponse.CANCELED);
119121
} catch (Exception e) {
120122
e.printStackTrace();
121123
Assert.fail();

0 commit comments

Comments
 (0)