Skip to content

Commit 853fb3d

Browse files
committed
INTERNAL: make piped insert operations process synchronously
1 parent 2c31ec4 commit 853fb3d

File tree

11 files changed

+231
-21
lines changed

11 files changed

+231
-21
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

Lines changed: 92 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.concurrent.TimeUnit;
3939
import java.util.concurrent.atomic.AtomicBoolean;
4040
import java.util.concurrent.atomic.AtomicInteger;
41+
import java.util.function.BiFunction;
4142
import java.util.jar.JarFile;
4243
import java.util.jar.Manifest;
4344

@@ -1783,7 +1784,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPip
17831784
insertList.add(new BTreePipedInsert<>(key, elementMap, attributesForCreate, tc));
17841785
}
17851786
}
1786-
return asyncCollectionPipedInsert(key, insertList);
1787+
return syncCollectionPipedInsert(key, insertList);
17871788
}
17881789

17891790
@Override
@@ -1806,7 +1807,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPip
18061807
insertList.add(new ByteArraysBTreePipedInsert<>(key, elementList, attributesForCreate, tc));
18071808
}
18081809
}
1809-
return asyncCollectionPipedInsert(key, insertList);
1810+
return syncCollectionPipedInsert(key, insertList);
18101811
}
18111812

18121813
@Override
@@ -1833,7 +1834,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncMopPip
18331834
insertList.add(new MapPipedInsert<>(key, elementMap, attributesForCreate, tc));
18341835
}
18351836
}
1836-
return asyncCollectionPipedInsert(key, insertList);
1837+
return syncCollectionPipedInsert(key, insertList);
18371838
}
18381839

18391840
@Override
@@ -1859,7 +1860,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncLopPip
18591860
}
18601861
}
18611862
}
1862-
return asyncCollectionPipedInsert(key, insertList);
1863+
return syncCollectionPipedInsert(key, insertList);
18631864
}
18641865

18651866
@Override
@@ -1882,7 +1883,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncSopPip
18821883
insertList.add(new SetPipedInsert<>(key, elementList, attributesForCreate, tc));
18831884
}
18841885
}
1885-
return asyncCollectionPipedInsert(key, insertList);
1886+
return syncCollectionPipedInsert(key, insertList);
18861887
}
18871888

18881889
@Override
@@ -3106,6 +3107,92 @@ public void gotStatus(Integer index, OperationStatus status) {
31063107
return rv;
31073108
}
31083109

3110+
/**
3111+
* insert items into collection synchronously.
3112+
*
3113+
* @param key arcus cache key
3114+
* @param insertList must not be empty.
3115+
* @return future holding the map of element index and the reason why insert operation failed
3116+
*/
3117+
private <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> syncCollectionPipedInsert(
3118+
final String key, final List<CollectionPipedInsert<T>> insertList) {
3119+
final CountDownLatch latch = new CountDownLatch(1);
3120+
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
3121+
new PipedCollectionFuture<>(latch, operationTimeout);
3122+
3123+
BiFunction<Integer, Integer, OperationCallback> makeCallback = (opIdx, itemCount) -> new PipedOperationCallback() {
3124+
3125+
private int currentCommandIdx = -1;
3126+
3127+
public void receivedStatus(OperationStatus status) {
3128+
CollectionOperationStatus cstatus;
3129+
3130+
if (status instanceof CollectionOperationStatus) {
3131+
cstatus = (CollectionOperationStatus) status;
3132+
} else {
3133+
getLogger().warn("Unhandled state: " + status);
3134+
cstatus = new CollectionOperationStatus(status);
3135+
}
3136+
rv.setOperationStatus(cstatus);
3137+
}
3138+
3139+
public void complete() {
3140+
if (rv.getOperationStatus().isSuccess()) {
3141+
int currentOpIdx = rv.getCurrentOpIdx();
3142+
int nextOpIdx = currentOpIdx + 1;
3143+
if (nextOpIdx < insertList.size()) {
3144+
Operation nextOp = rv.getOperation(nextOpIdx);
3145+
if (!nextOp.isCancelled()) {
3146+
addOp(key, nextOp);
3147+
rv.setCurrentOpIdx(nextOpIdx);
3148+
}
3149+
} else {
3150+
latch.countDown();
3151+
}
3152+
} else {
3153+
// If this operation has been errored or cancelled,
3154+
// add the command not executed in a operation as not executed state.
3155+
// The remaining operations will be also not executed state but not added into failed result.
3156+
int nextCommandIdx = 0;
3157+
if (currentCommandIdx < itemCount - 1) {
3158+
// command remained in the same operation object.
3159+
nextCommandIdx = currentCommandIdx + 1 + (opIdx * MAX_PIPED_ITEM_COUNT);
3160+
} else if (opIdx + 1 < insertList.size()) {
3161+
// command remained in the next operation object.
3162+
nextCommandIdx = (opIdx + 1) * MAX_PIPED_ITEM_COUNT;
3163+
}
3164+
if (nextCommandIdx > 0) {
3165+
rv.addEachResult(nextCommandIdx,
3166+
new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED));
3167+
}
3168+
latch.countDown();
3169+
}
3170+
}
3171+
3172+
public void gotStatus(Integer index, OperationStatus status) {
3173+
if (!status.isSuccess()) {
3174+
if (status instanceof CollectionOperationStatus) {
3175+
rv.addEachResult(index + (opIdx * MAX_PIPED_ITEM_COUNT),
3176+
(CollectionOperationStatus) status);
3177+
} else {
3178+
rv.addEachResult(index + (opIdx * MAX_PIPED_ITEM_COUNT),
3179+
new CollectionOperationStatus(status));
3180+
}
3181+
}
3182+
currentCommandIdx = index;
3183+
}
3184+
};
3185+
3186+
for (int i = 0; i < insertList.size(); i++) {
3187+
final CollectionPipedInsert<T> insert = insertList.get(i);
3188+
Operation op = opFact.collectionPipedInsert(key, insert, makeCallback.apply(i, insert.getItemCount()));
3189+
rv.addOperation(op);
3190+
}
3191+
addOp(key, rv.getOperation(0));
3192+
rv.setCurrentOpIdx(0);
3193+
return rv;
3194+
}
3195+
31093196
@Override
31103197
public Future<Map<String, CollectionOperationStatus>> asyncBopInsertBulk(
31113198
List<String> keyList, long bkey, byte[] eFlag, Object value,

src/main/java/net/spy/memcached/collection/CollectionResponse.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public enum CollectionResponse {
4444

4545
UNDEFINED,
4646
CANCELED,
47+
NOT_EXECUTED,
4748

4849
INTERRUPT_EXCEPTION,
4950
EXECUTION_EXCEPTION,

src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.ArrayList;
44
import java.util.Collection;
5+
import java.util.List;
56
import java.util.Map;
67
import java.util.concurrent.ConcurrentHashMap;
78
import java.util.concurrent.CountDownLatch;
@@ -17,9 +18,10 @@
1718

1819
public class PipedCollectionFuture<K, V>
1920
extends CollectionFuture<Map<K, V>> {
20-
private final Collection<Operation> ops = new ArrayList<>();
21+
private final List<Operation> ops = new ArrayList<>();
2122
private final AtomicReference<CollectionOperationStatus> operationStatus
2223
= new AtomicReference<>(null);
24+
private int currentOpIdx = 0;
2325

2426
private final Map<K, V> failedResult =
2527
new ConcurrentHashMap<>();
@@ -37,6 +39,9 @@ public boolean cancel(boolean ign) {
3739
return rv;
3840
}
3941

42+
/**
43+
* @return true if any operation is cancelled.
44+
*/
4045
@Override
4146
public boolean isCancelled() {
4247
for (Operation op : ops) {
@@ -119,4 +124,16 @@ public void addEachResult(K index, V status) {
119124
public void addOperation(Operation op) {
120125
ops.add(op);
121126
}
127+
128+
public Operation getOperation(int idx) {
129+
return ops.get(idx);
130+
}
131+
132+
public void setCurrentOpIdx(int idx) {
133+
this.currentOpIdx = idx;
134+
}
135+
136+
public int getCurrentOpIdx() {
137+
return currentOpIdx;
138+
}
122139
}

src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
/**
3030
* Operation to store collection data in a memcached server.
3131
*/
32-
public final class CollectionPipedInsertOperationImpl extends PipeOperationImpl
32+
public final class CollectionPipedInsertOperationImpl extends SingleKeyPipeOperationImpl
3333
implements CollectionPipedInsertOperation {
3434

3535
public CollectionPipedInsertOperationImpl(String key,

src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,13 @@ abstract class PipeOperationImpl extends OperationImpl {
6262

6363
protected boolean successAll = true;
6464

65-
private final CollectionPipe collectionPipe;
66-
private final PipedOperationCallback cb;
67-
private final List<String> keys;
65+
protected final CollectionPipe collectionPipe;
66+
protected final PipedOperationCallback cb;
67+
protected final List<String> keys;
6868
private final boolean isIdempotent;
6969

70-
private int index = 0;
71-
private boolean readUntilLastLine = false;
70+
protected int index = 0;
71+
protected boolean readUntilLastLine = false;
7272

7373
protected PipeOperationImpl(List<String> keys, CollectionPipe collectionPipe,
7474
OperationCallback cb) {
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package net.spy.memcached.protocol.ascii;
2+
3+
import java.util.List;
4+
5+
import net.spy.memcached.collection.CollectionPipe;
6+
import net.spy.memcached.ops.OperationCallback;
7+
import net.spy.memcached.ops.OperationState;
8+
import net.spy.memcached.ops.OperationStatus;
9+
10+
public abstract class SingleKeyPipeOperationImpl extends PipeOperationImpl {
11+
12+
protected SingleKeyPipeOperationImpl(List<String> keys,
13+
CollectionPipe collectionPipe,
14+
OperationCallback cb) {
15+
super(keys, collectionPipe, cb);
16+
}
17+
18+
@Override
19+
public void handleLine(String line) {
20+
assert getState() == OperationState.READING
21+
: "Read ``" + line + "'' when in " + getState() + " state";
22+
23+
/* ENABLE_REPLICATION if */
24+
if (isWriteOperation() && hasSwitchedOver(line)) {
25+
collectionPipe.setNextOpIndex(index);
26+
prepareSwitchover(line);
27+
return;
28+
}
29+
/* ENABLE_REPLICATION end */
30+
31+
/* ENABLE_MIGRATION if */
32+
if (hasNotMyKey(line)) {
33+
// Only one NOT_MY_KEY is provided in response of
34+
// single key piped operation when redirection.
35+
addRedirectSingleKeyOperation(line, keys.get(0));
36+
if (collectionPipe.isNotPiped()) {
37+
transitionState(OperationState.REDIRECT);
38+
} else {
39+
collectionPipe.setNextOpIndex(index);
40+
}
41+
return;
42+
}
43+
/* ENABLE_MIGRATION end */
44+
45+
if (collectionPipe.isNotPiped()) {
46+
OperationStatus status = checkStatus(line);
47+
if (!status.isSuccess()) {
48+
successAll = false;
49+
}
50+
cb.gotStatus(index, status);
51+
52+
cb.receivedStatus((successAll) ? END : FAILED_END);
53+
transitionState(OperationState.COMPLETE);
54+
return;
55+
}
56+
57+
/*
58+
RESPONSE <count>\r\n
59+
<status of the 1st pipelined command>\r\n
60+
[ ... ]
61+
<status of the last pipelined command>\r\n
62+
END|PIPE_ERROR <error_string>\r\n
63+
*/
64+
if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) {
65+
/* ENABLE_MIGRATION if */
66+
if (needRedirect()) {
67+
transitionState(OperationState.REDIRECT);
68+
return;
69+
}
70+
/* ENABLE_MIGRATION end */
71+
cb.receivedStatus((index == collectionPipe.getItemCount() && successAll) ? END : FAILED_END);
72+
transitionState(OperationState.COMPLETE);
73+
} else if (line.startsWith("RESPONSE ")) {
74+
getLogger().debug("Got line %s", line);
75+
76+
// TODO server should be fixed
77+
line = line.replace(" ", " ");
78+
line = line.replace(" ", " ");
79+
80+
String[] stuff = line.split(" ");
81+
assert "RESPONSE".equals(stuff[0]);
82+
readUntilLastLine = true;
83+
} else {
84+
OperationStatus status = checkStatus(line);
85+
if (!status.isSuccess()) {
86+
successAll = false;
87+
}
88+
cb.gotStatus(index, status);
89+
90+
index++;
91+
}
92+
}
93+
94+
@Override
95+
protected OperationStatus checkStatus(String line) {
96+
return null;
97+
}
98+
99+
}

src/test/manual/net/spy/memcached/bulkoperation/BopInsertBulkMultipleTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.TimeoutException;
2525

26+
import net.spy.memcached.ArcusClient;
2627
import net.spy.memcached.collection.BaseIntegrationTest;
2728
import net.spy.memcached.collection.CollectionAttributes;
2829
import net.spy.memcached.collection.Element;
@@ -122,7 +123,7 @@ void testErrorCount() {
122123

123124
Map<Integer, CollectionOperationStatus> map = future.get(2000L,
124125
TimeUnit.MILLISECONDS);
125-
assertEquals(bkeySize, map.size());
126+
assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size());
126127

127128
} catch (Exception e) {
128129
e.printStackTrace();

src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.TimeoutException;
2525

26+
import net.spy.memcached.ArcusClient;
2627
import net.spy.memcached.collection.BaseIntegrationTest;
2728
import net.spy.memcached.collection.CollectionAttributes;
29+
import net.spy.memcached.collection.CollectionResponse;
2830
import net.spy.memcached.ops.CollectionOperationStatus;
2931

3032
import org.junit.jupiter.api.AfterEach;
@@ -111,9 +113,7 @@ void testInsertAndGet() {
111113
void testErrorCount() {
112114
int valueCount = 1200;
113115
Object[] valueList = new Object[valueCount];
114-
for (int i = 0; i < valueList.length; i++) {
115-
valueList[i] = "MyValue";
116-
}
116+
Arrays.fill(valueList, "MyValue");
117117

118118
try {
119119
// SET
@@ -123,8 +123,11 @@ void testErrorCount() {
123123

124124
Map<Integer, CollectionOperationStatus> map = future.get(1000L,
125125
TimeUnit.MILLISECONDS);
126-
assertEquals(valueCount, map.size());
127-
126+
assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size());
127+
assertEquals(map.get(ArcusClient.MAX_PIPED_ITEM_COUNT - 1).getResponse(),
128+
CollectionResponse.NOT_FOUND);
129+
assertEquals(map.get(ArcusClient.MAX_PIPED_ITEM_COUNT).getResponse(),
130+
CollectionResponse.NOT_EXECUTED);
128131
} catch (Exception e) {
129132
e.printStackTrace();
130133
fail();

src/test/manual/net/spy/memcached/bulkoperation/MopInsertBulkMultipleTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.TimeoutException;
2525

26+
import net.spy.memcached.ArcusClient;
2627
import net.spy.memcached.collection.BaseIntegrationTest;
2728
import net.spy.memcached.collection.CollectionAttributes;
2829
import net.spy.memcached.ops.CollectionOperationStatus;
@@ -116,7 +117,7 @@ void testErrorCount() {
116117

117118
Map<Integer, CollectionOperationStatus> map = future.get(2000L,
118119
TimeUnit.MILLISECONDS);
119-
assertEquals(elementSize, map.size());
120+
assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size());
120121

121122
} catch (Exception e) {
122123
e.printStackTrace();

0 commit comments

Comments
 (0)