Skip to content

Commit 4c1de11

Browse files
uhm0311jhpark816
authored andcommitted
FIX: Split piped operations into single-key and multi-key to resolve bug during migration
1 parent 88b8dab commit 4c1de11

13 files changed

+118
-57
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,12 @@
138138
import net.spy.memcached.ops.CollectionGetOperation;
139139
import net.spy.memcached.ops.CollectionOperationStatus;
140140
import net.spy.memcached.ops.GetAttrOperation;
141+
import net.spy.memcached.ops.MultiKeyPipedOperationCallback;
141142
import net.spy.memcached.ops.Mutator;
142143
import net.spy.memcached.ops.Operation;
143144
import net.spy.memcached.ops.OperationCallback;
144145
import net.spy.memcached.ops.OperationStatus;
145-
import net.spy.memcached.ops.PipedOperationCallback;
146+
import net.spy.memcached.ops.SingleKeyPipedOperationCallback;
146147
import net.spy.memcached.ops.StatusCode;
147148
import net.spy.memcached.ops.StoreType;
148149
import net.spy.memcached.plugin.FrontCacheMemcachedClient;
@@ -1662,7 +1663,7 @@ private <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> syncCollec
16621663
new PipedCollectionFuture<>(latch, operationTimeout);
16631664

16641665
final List<Operation> ops = new ArrayList<>(insertList.size());
1665-
IntFunction<OperationCallback> makeCallback = opIdx -> new PipedOperationCallback() {
1666+
IntFunction<OperationCallback> makeCallback = opIdx -> new SingleKeyPipedOperationCallback() {
16661667

16671668
public void receivedStatus(OperationStatus status) {
16681669
CollectionOperationStatus cstatus = toCollectionOperationStatus(status);
@@ -2031,7 +2032,7 @@ private <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> syncCollec
20312032
new PipedCollectionFuture<>(latch, operationTimeout);
20322033

20332034
final List<Operation> ops = new ArrayList<>(updateList.size());
2034-
IntFunction<OperationCallback> makeCallback = opIdx -> new PipedOperationCallback() {
2035+
IntFunction<OperationCallback> makeCallback = opIdx -> new SingleKeyPipedOperationCallback() {
20352036

20362037
public void receivedStatus(OperationStatus status) {
20372038
CollectionOperationStatus cstatus = toCollectionOperationStatus(status);
@@ -2603,7 +2604,7 @@ <T> CollectionFuture<Map<T, Boolean>> asyncSetPipedExist(
26032604
latch, operationTimeout);
26042605

26052606
Operation op = opFact.collectionPipedExist(key, exist,
2606-
new PipedOperationCallback() {
2607+
new SingleKeyPipedOperationCallback() {
26072608

26082609
private final Map<T, Boolean> result = new HashMap<>();
26092610
private boolean hasAnError = false;
@@ -2878,7 +2879,7 @@ private <T> Future<Map<String, CollectionOperationStatus>> asyncCollectionInsert
28782879

28792880
for (final CollectionBulkInsert<T> insert : insertList) {
28802881
Operation op = opFact.collectionBulkInsert(
2881-
insert, new PipedOperationCallback() {
2882+
insert, new MultiKeyPipedOperationCallback() {
28822883
public void receivedStatus(OperationStatus status) {
28832884
// Nothing to do here because the user MUST search the result Map instance.
28842885
}
@@ -2887,9 +2888,8 @@ public void complete() {
28872888
latch.countDown();
28882889
}
28892890

2890-
public void gotStatus(Integer index, OperationStatus status) {
2891+
public void gotStatus(String key, OperationStatus status) {
28912892
if (!status.isSuccess()) {
2892-
String key = insert.getKey(index);
28932893
CollectionOperationStatus cstatus = toCollectionOperationStatus(status);
28942894
rv.addFailedResult(key, cstatus);
28952895
}

src/main/java/net/spy/memcached/ops/MultiCollectionBulkInsertOperationCallback.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818
package net.spy.memcached.ops;
1919

2020
public class MultiCollectionBulkInsertOperationCallback extends MultiOperationCallback
21-
implements PipedOperationCallback {
21+
implements MultiKeyPipedOperationCallback {
2222

2323
public MultiCollectionBulkInsertOperationCallback(OperationCallback original, int todo) {
2424
super(original, todo);
2525
}
2626

2727
@Override
28-
public void gotStatus(Integer index, OperationStatus status) {
29-
((PipedOperationCallback) originalCallback).gotStatus(index, status);
28+
public void gotStatus(String key, OperationStatus status) {
29+
((MultiKeyPipedOperationCallback) originalCallback).gotStatus(key, status);
3030
}
3131
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package net.spy.memcached.ops;
2+
3+
public interface MultiKeyPipedOperationCallback extends OperationCallback {
4+
void gotStatus(String key, OperationStatus status);
5+
}

src/main/java/net/spy/memcached/ops/PipedOperationCallback.java renamed to src/main/java/net/spy/memcached/ops/SingleKeyPipedOperationCallback.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
package net.spy.memcached.ops;
22

3-
public interface PipedOperationCallback extends OperationCallback {
3+
public interface SingleKeyPipedOperationCallback extends OperationCallback {
44
void gotStatus(Integer index, OperationStatus status);
55
}

src/main/java/net/spy/memcached/protocol/ascii/CollectionBulkInsertOperationImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
/**
2828
* Operation to store collection data in a memcached server.
2929
*/
30-
public final class CollectionBulkInsertOperationImpl extends PipeOperationImpl
30+
public final class CollectionBulkInsertOperationImpl extends MultiKeyPipeOperationImpl
3131
implements CollectionBulkInsertOperation {
3232

3333
public CollectionBulkInsertOperationImpl(CollectionBulkInsert<?> insert, OperationCallback cb) {

src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedExistOperationImpl.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,19 @@
1717
*/
1818
package net.spy.memcached.protocol.ascii;
1919

20-
import java.util.Collections;
21-
2220
import net.spy.memcached.collection.SetPipedExist;
2321
import net.spy.memcached.ops.APIType;
2422
import net.spy.memcached.ops.CollectionPipedExistOperation;
2523
import net.spy.memcached.ops.OperationCallback;
2624
import net.spy.memcached.ops.OperationStatus;
2725
import net.spy.memcached.ops.OperationType;
2826

29-
public final class CollectionPipedExistOperationImpl extends PipeOperationImpl implements
27+
public final class CollectionPipedExistOperationImpl extends SingleKeyPipeOperationImpl implements
3028
CollectionPipedExistOperation {
3129

3230
public CollectionPipedExistOperationImpl(String key,
3331
SetPipedExist<?> collectionExist, OperationCallback cb) {
34-
super(Collections.singletonList(key), collectionExist, cb);
32+
super(key, collectionExist, cb);
3533
setAPIType(APIType.SOP_EXIST);
3634
setOperationType(OperationType.READ);
3735
}

src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
*/
1818
package net.spy.memcached.protocol.ascii;
1919

20-
import java.util.Collections;
21-
2220
import net.spy.memcached.collection.CollectionPipedInsert;
2321
import net.spy.memcached.ops.APIType;
2422
import net.spy.memcached.ops.CollectionPipedInsertOperation;
@@ -29,12 +27,12 @@
2927
/**
3028
* Operation to store collection data in a memcached server.
3129
*/
32-
public final class CollectionPipedInsertOperationImpl extends PipeOperationImpl
30+
public final class CollectionPipedInsertOperationImpl extends SingleKeyPipeOperationImpl
3331
implements CollectionPipedInsertOperation {
3432

3533
public CollectionPipedInsertOperationImpl(String key,
3634
CollectionPipedInsert<?> insert, OperationCallback cb) {
37-
super(Collections.singletonList(key), insert, cb);
35+
super(key, insert, cb);
3836
if (insert instanceof CollectionPipedInsert.ListPipedInsert) {
3937
setAPIType(APIType.LOP_INSERT);
4038
} else if (insert instanceof CollectionPipedInsert.SetPipedInsert) {

src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedUpdateOperationImpl.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
*/
1818
package net.spy.memcached.protocol.ascii;
1919

20-
import java.util.Collections;
21-
2220
import net.spy.memcached.collection.CollectionPipedUpdate;
2321
import net.spy.memcached.collection.CollectionPipedUpdate.BTreePipedUpdate;
2422
import net.spy.memcached.collection.CollectionPipedUpdate.MapPipedUpdate;
@@ -31,12 +29,12 @@
3129
/**
3230
* Operation to update collection data in a memcached server.
3331
*/
34-
public final class CollectionPipedUpdateOperationImpl extends PipeOperationImpl implements
32+
public final class CollectionPipedUpdateOperationImpl extends SingleKeyPipeOperationImpl implements
3533
CollectionPipedUpdateOperation {
3634

3735
public CollectionPipedUpdateOperationImpl(String key,
3836
CollectionPipedUpdate<?> update, OperationCallback cb) {
39-
super(Collections.singletonList(key), update, cb);
37+
super(key, update, cb);
4038
if (update instanceof BTreePipedUpdate) {
4139
setAPIType(APIType.BOP_UPDATE);
4240
} else if (update instanceof MapPipedUpdate) {
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package net.spy.memcached.protocol.ascii;
2+
3+
import java.util.Collection;
4+
import java.util.Collections;
5+
import java.util.List;
6+
7+
import net.spy.memcached.collection.CollectionPipe;
8+
import net.spy.memcached.ops.MultiKeyPipedOperationCallback;
9+
import net.spy.memcached.ops.OperationCallback;
10+
import net.spy.memcached.ops.OperationStatus;
11+
12+
abstract class MultiKeyPipeOperationImpl extends PipeOperationImpl {
13+
protected final List<String> keys;
14+
protected final MultiKeyPipedOperationCallback cb;
15+
16+
protected MultiKeyPipeOperationImpl(List<String> keys, CollectionPipe pipe,
17+
OperationCallback cb) {
18+
super(pipe, cb);
19+
20+
this.keys = keys;
21+
this.cb = (MultiKeyPipedOperationCallback) cb;
22+
}
23+
24+
@Override
25+
protected void gotStatus(OperationStatus status) {
26+
cb.gotStatus(keys.get(index), status);
27+
}
28+
29+
@Override
30+
protected String getKey(Integer index) {
31+
return keys.get(index);
32+
}
33+
34+
public Collection<String> getKeys() {
35+
return Collections.unmodifiableList(keys);
36+
}
37+
}

src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@
22

33
import java.io.IOException;
44
import java.nio.ByteBuffer;
5-
import java.util.Collection;
6-
import java.util.Collections;
7-
import java.util.List;
85

96
import net.spy.memcached.collection.CollectionBulkInsert;
107
import net.spy.memcached.collection.CollectionPipe;
@@ -16,7 +13,6 @@
1613
import net.spy.memcached.ops.OperationException;
1714
import net.spy.memcached.ops.OperationState;
1815
import net.spy.memcached.ops.OperationStatus;
19-
import net.spy.memcached.ops.PipedOperationCallback;
2016
import net.spy.memcached.ops.StatusCode;
2117

2218
abstract class PipeOperationImpl extends OperationImpl {
@@ -61,21 +57,13 @@ abstract class PipeOperationImpl extends OperationImpl {
6157
protected boolean successAll = true;
6258

6359
private final CollectionPipe collectionPipe;
64-
private final PipedOperationCallback cb;
65-
private final List<String> keys;
6660
private final boolean isIdempotent;
6761

68-
private int index = 0;
62+
protected int index = 0;
6963
private boolean readUntilLastLine = false;
7064

71-
protected PipeOperationImpl(List<String> keys, CollectionPipe collectionPipe,
72-
OperationCallback cb) {
65+
protected PipeOperationImpl(CollectionPipe collectionPipe, OperationCallback cb) {
7366
super(cb);
74-
this.cb = (PipedOperationCallback) cb;
75-
if (keys == null || keys.isEmpty()) {
76-
throw new IllegalArgumentException("No keys provided");
77-
}
78-
this.keys = keys;
7967
this.collectionPipe = collectionPipe;
8068
this.isIdempotent = !(collectionPipe instanceof CollectionPipedInsert.ListPipedInsert ||
8169
collectionPipe instanceof CollectionBulkInsert.ListBulkInsert);
@@ -97,7 +85,7 @@ assert getState() == OperationState.READING
9785
/* ENABLE_MIGRATION if */
9886
if (hasNotMyKey(line)) {
9987
if (isBulkOperation()) {
100-
addRedirectMultiKeyOperation(line, keys.get(index));
88+
addRedirectMultiKeyOperation(line, getKey(index));
10189
if (collectionPipe.isNotPiped()) {
10290
transitionState(OperationState.REDIRECT);
10391
} else {
@@ -106,7 +94,7 @@ assert getState() == OperationState.READING
10694
} else {
10795
// Only one NOT_MY_KEY is provided in response of
10896
// single key piped operation when redirection.
109-
addRedirectSingleKeyOperation(line, keys.get(0));
97+
addRedirectSingleKeyOperation(line, getKey(0));
11098
if (collectionPipe.isNotPiped()) {
11199
transitionState(OperationState.REDIRECT);
112100
} else {
@@ -122,7 +110,7 @@ assert getState() == OperationState.READING
122110
if (!status.isSuccess()) {
123111
successAll = false;
124112
}
125-
cb.gotStatus(index, status);
113+
gotStatus(status);
126114

127115
complete((successAll) ? END : FAILED_END);
128116
return;
@@ -164,12 +152,16 @@ assert getState() == OperationState.READING
164152
if (!status.isSuccess()) {
165153
successAll = false;
166154
}
167-
cb.gotStatus(index, status);
155+
gotStatus(status);
168156

169157
index++;
170158
}
171159
}
172160

161+
protected abstract void gotStatus(OperationStatus status);
162+
163+
protected abstract String getKey(Integer index);
164+
173165
@Override
174166
protected void handleError(OperationErrorType eType, String line) throws IOException {
175167
if (!readUntilLastLine) {
@@ -204,10 +196,6 @@ public void initialize() {
204196
}
205197
}
206198

207-
public Collection<String> getKeys() {
208-
return Collections.unmodifiableList(keys);
209-
}
210-
211199
public CollectionPipe getCollectionPipe() {
212200
return collectionPipe;
213201
}

0 commit comments

Comments
 (0)