Skip to content

Commit 0a2220b

Browse files
committed
INTERNAL: make piped insert operations process synchronously
1 parent 707c894 commit 0a2220b

File tree

11 files changed

+247
-58
lines changed

11 files changed

+247
-58
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

Lines changed: 90 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.concurrent.TimeUnit;
3939
import java.util.concurrent.atomic.AtomicBoolean;
4040
import java.util.concurrent.atomic.AtomicInteger;
41+
import java.util.function.BiFunction;
4142
import java.util.jar.JarFile;
4243
import java.util.jar.Manifest;
4344

@@ -1783,7 +1784,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPip
17831784
insertList.add(new BTreePipedInsert<>(key, elementMap, attributesForCreate, tc));
17841785
}
17851786
}
1786-
return asyncCollectionPipedInsert(key, insertList);
1787+
return syncCollectionPipedInsert(key, insertList);
17871788
}
17881789

17891790
@Override
@@ -1806,7 +1807,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPip
18061807
insertList.add(new ByteArraysBTreePipedInsert<>(key, elementList, attributesForCreate, tc));
18071808
}
18081809
}
1809-
return asyncCollectionPipedInsert(key, insertList);
1810+
return syncCollectionPipedInsert(key, insertList);
18101811
}
18111812

18121813
@Override
@@ -1833,7 +1834,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncMopPip
18331834
insertList.add(new MapPipedInsert<>(key, elementMap, attributesForCreate, tc));
18341835
}
18351836
}
1836-
return asyncCollectionPipedInsert(key, insertList);
1837+
return syncCollectionPipedInsert(key, insertList);
18371838
}
18381839

18391840
@Override
@@ -1859,7 +1860,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncLopPip
18591860
}
18601861
}
18611862
}
1862-
return asyncCollectionPipedInsert(key, insertList);
1863+
return syncCollectionPipedInsert(key, insertList);
18631864
}
18641865

18651866
@Override
@@ -1882,7 +1883,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncSopPip
18821883
insertList.add(new SetPipedInsert<>(key, elementList, attributesForCreate, tc));
18831884
}
18841885
}
1885-
return asyncCollectionPipedInsert(key, insertList);
1886+
return syncCollectionPipedInsert(key, insertList);
18861887
}
18871888

18881889
@Override
@@ -3106,6 +3107,90 @@ public void gotStatus(Integer index, OperationStatus status) {
31063107
return rv;
31073108
}
31083109

3110+
/**
3111+
* insert items into collection synchronously.
3112+
*
3113+
* @param key arcus cache key
3114+
* @param insertList must not be empty.
3115+
* @return future holding the map of element index and the reason why insert operation failed
3116+
*/
3117+
private <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> syncCollectionPipedInsert(
3118+
final String key, final List<CollectionPipedInsert<T>> insertList) {
3119+
final CountDownLatch latch = new CountDownLatch(1);
3120+
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
3121+
new PipedCollectionFuture<>(latch, operationTimeout);
3122+
3123+
final List<Operation> ops = new ArrayList<>(insertList.size());
3124+
BiFunction<Integer, Integer, OperationCallback> makeCallback = (opIdx, itemCount) -> new PipedOperationCallback() {
3125+
3126+
private int lastExecutedIndex;
3127+
3128+
public void receivedStatus(OperationStatus status) {
3129+
CollectionOperationStatus cstatus;
3130+
3131+
if (status instanceof CollectionOperationStatus) {
3132+
cstatus = (CollectionOperationStatus) status;
3133+
} else {
3134+
getLogger().warn("Unhandled state: " + status);
3135+
cstatus = new CollectionOperationStatus(status);
3136+
}
3137+
rv.setOperationStatus(cstatus);
3138+
}
3139+
3140+
public void complete() {
3141+
if (rv.getOperationStatus().isSuccess()) {
3142+
if (opIdx + 1 < ops.size()) {
3143+
// If operations are succeed and next operation exists, then add it.
3144+
Operation nextOp = ops.get(opIdx + 1);
3145+
rv.addOperation(nextOp);
3146+
addOp(key, nextOp);
3147+
} else {
3148+
latch.countDown();
3149+
}
3150+
} else {
3151+
// If this operation has been errored or cancelled,
3152+
// add the command not executed in a operation as not executed state.
3153+
// The remaining operations will be also not executed state but not added into failed result.
3154+
int nextIndex = 0;
3155+
if (lastExecutedIndex < itemCount - 1) {
3156+
// command remained in the same operation object.
3157+
nextIndex = lastExecutedIndex + 1 + (opIdx * MAX_PIPED_ITEM_COUNT);
3158+
} else if (opIdx + 1 < ops.size()) {
3159+
// command remained in the next operation object.
3160+
nextIndex = (opIdx + 1) * MAX_PIPED_ITEM_COUNT;
3161+
}
3162+
if (nextIndex > 0) {
3163+
rv.addEachResult(nextIndex,
3164+
new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED));
3165+
}
3166+
latch.countDown();
3167+
}
3168+
}
3169+
3170+
public void gotStatus(Integer index, OperationStatus status) {
3171+
if (!status.isSuccess()) {
3172+
if (status instanceof CollectionOperationStatus) {
3173+
rv.addEachResult(index + (opIdx * MAX_PIPED_ITEM_COUNT),
3174+
(CollectionOperationStatus) status);
3175+
} else {
3176+
rv.addEachResult(index + (opIdx * MAX_PIPED_ITEM_COUNT),
3177+
new CollectionOperationStatus(status));
3178+
}
3179+
}
3180+
this.lastExecutedIndex = index;
3181+
}
3182+
};
3183+
3184+
for (int i = 0; i < insertList.size(); i++) {
3185+
final CollectionPipedInsert<T> insert = insertList.get(i);
3186+
Operation op = opFact.collectionPipedInsert(key, insert, makeCallback.apply(i, insert.getItemCount()));
3187+
ops.add(op);
3188+
}
3189+
rv.addOperation(ops.get(0));
3190+
addOp(key, ops.get(0));
3191+
return rv;
3192+
}
3193+
31093194
@Override
31103195
public Future<Map<String, CollectionOperationStatus>> asyncBopInsertBulk(
31113196
List<String> keyList, long bkey, byte[] eFlag, Object value,

src/main/java/net/spy/memcached/collection/CollectionResponse.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public enum CollectionResponse {
4444

4545
UNDEFINED,
4646
CANCELED,
47+
NOT_EXECUTED,
4748

4849
INTERRUPT_EXCEPTION,
4950
EXECUTION_EXCEPTION,

src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java

Lines changed: 37 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package net.spy.memcached.internal;
22

33
import java.util.ArrayList;
4-
import java.util.Collection;
4+
import java.util.List;
55
import java.util.Map;
66
import java.util.concurrent.ConcurrentHashMap;
77
import java.util.concurrent.CountDownLatch;
@@ -11,13 +11,15 @@
1111
import java.util.concurrent.atomic.AtomicReference;
1212

1313
import net.spy.memcached.MemcachedConnection;
14+
import net.spy.memcached.collection.CollectionResponse;
1415
import net.spy.memcached.ops.CollectionOperationStatus;
1516
import net.spy.memcached.ops.Operation;
1617
import net.spy.memcached.ops.OperationState;
1718

1819
public class PipedCollectionFuture<K, V>
1920
extends CollectionFuture<Map<K, V>> {
20-
private final Collection<Operation> ops = new ArrayList<>();
21+
// operations that are completed or in progress
22+
private final List<Operation> ops = new ArrayList<>();
2123
private final AtomicReference<CollectionOperationStatus> operationStatus
2224
= new AtomicReference<>(null);
2325

@@ -30,66 +32,63 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) {
3032

3133
@Override
3234
public boolean cancel(boolean ign) {
33-
boolean rv = false;
34-
for (Operation op : ops) {
35-
rv |= op.cancel("by application.");
36-
}
37-
return rv;
35+
return ops.get(ops.size() - 1).cancel("by application.");
3836
}
3937

38+
/**
39+
* if previous op is cancelled, then next ops are not added to the opQueue.
40+
* So we only need to check current op.
41+
*
42+
* @return true if operation is cancelled.
43+
*/
4044
@Override
4145
public boolean isCancelled() {
42-
for (Operation op : ops) {
43-
if (op.isCancelled()) {
44-
return true;
45-
}
46-
}
47-
return false;
46+
return operationStatus.get().getResponse() == CollectionResponse.CANCELED;
47+
}
48+
49+
/**
50+
* if previous op threw exception, then next ops are not added to the opQueue.
51+
* So we only need to check current op.
52+
*
53+
* @return true if operation has errored by exception.
54+
*/
55+
public boolean hasErrored() {
56+
return ops.get(ops.size() - 1).hasErrored();
4857
}
4958

5059
@Override
5160
public boolean isDone() {
52-
for (Operation op : ops) {
53-
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
54-
return false;
55-
}
56-
}
57-
return true;
61+
return latch.getCount() == 0;
5862
}
5963

6064
@Override
6165
public Map<K, V> get(long duration, TimeUnit unit)
6266
throws InterruptedException, TimeoutException, ExecutionException {
6367

6468
long beforeAwait = System.currentTimeMillis();
69+
Operation lastExecutedOp;
6570
if (!latch.await(duration, unit)) {
66-
Collection<Operation> timedOutOps = new ArrayList<>();
67-
for (Operation op : ops) {
68-
if (op.getState() != OperationState.COMPLETE) {
69-
timedOutOps.add(op);
70-
} else {
71-
MemcachedConnection.opSucceeded(op);
72-
}
73-
}
74-
if (!timedOutOps.isEmpty()) {
75-
// set timeout only once for piped ops requested to single node.
76-
MemcachedConnection.opTimedOut(timedOutOps.iterator().next());
71+
lastExecutedOp = ops.get(ops.size() - 1);
72+
if (lastExecutedOp.getState() != OperationState.COMPLETE) {
73+
MemcachedConnection.opTimedOut(lastExecutedOp);
7774

7875
long elapsed = System.currentTimeMillis() - beforeAwait;
79-
throw new CheckedOperationTimeoutException(duration, unit, elapsed, timedOutOps);
76+
throw new CheckedOperationTimeoutException(duration, unit, elapsed, lastExecutedOp);
8077
}
8178
} else {
8279
// continuous timeout counter will be reset only once in pipe
83-
MemcachedConnection.opSucceeded(ops.iterator().next());
80+
lastExecutedOp = ops.get(ops.size() - 1);
81+
MemcachedConnection.opSucceeded(lastExecutedOp);
8482
}
8583

86-
for (Operation op : ops) {
87-
if (op != null && op.hasErrored()) {
88-
throw new ExecutionException(op.getException());
84+
// If previous op has errored or cancelled, operations are not executed anymore.
85+
// Therefore, we only need to check last executed op.
86+
if (lastExecutedOp != null) {
87+
if (lastExecutedOp.hasErrored()) {
88+
throw new ExecutionException(lastExecutedOp.getException());
8989
}
90-
91-
if (op != null && op.isCancelled()) {
92-
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
90+
if (lastExecutedOp.isCancelled()) {
91+
throw new ExecutionException(new RuntimeException(lastExecutedOp.getCancelCause()));
9392
}
9493
}
9594

src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
/**
3030
* Operation to store collection data in a memcached server.
3131
*/
32-
public final class CollectionPipedInsertOperationImpl extends PipeOperationImpl
32+
public final class CollectionPipedInsertOperationImpl extends SingleKeyPipeOperationImpl
3333
implements CollectionPipedInsertOperation {
3434

3535
public CollectionPipedInsertOperationImpl(String key,

src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,13 @@ abstract class PipeOperationImpl extends OperationImpl {
6262

6363
protected boolean successAll = true;
6464

65-
private final CollectionPipe collectionPipe;
66-
private final PipedOperationCallback cb;
67-
private final List<String> keys;
65+
protected final CollectionPipe collectionPipe;
66+
protected final PipedOperationCallback cb;
67+
protected final List<String> keys;
6868
private final boolean isIdempotent;
6969

70-
private int index = 0;
71-
private boolean readUntilLastLine = false;
70+
protected int index = 0;
71+
protected boolean readUntilLastLine = false;
7272

7373
protected PipeOperationImpl(List<String> keys, CollectionPipe collectionPipe,
7474
OperationCallback cb) {

0 commit comments

Comments
 (0)