Skip to content

Commit 132254c

Browse files
brido4125jhpark816
authored andcommitted
FIX: Send flush op to master node only.
1 parent d79a8d1 commit 132254c

File tree

3 files changed

+23
-6
lines changed

3 files changed

+23
-6
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1902,7 +1902,8 @@ public OperationFuture<Boolean> flush(final String prefix) {
19021902

19031903
@Override
19041904
public OperationFuture<Boolean> flush(final String prefix, final int delay) {
1905-
Collection<MemcachedNode> nodes = getAllNodes();
1905+
Collection<MemcachedNode> nodes = getFlushNodes();
1906+
19061907
final BroadcastFuture<Boolean> rv
19071908
= new BroadcastFuture<>(operationTimeout, Boolean.TRUE, nodes.size());
19081909
final Map<MemcachedNode, Operation> opsMap = new HashMap<>();

src/main/java/net/spy/memcached/ArcusReplKetamaNodeLocator.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.SortedSet;
3333
import java.util.TreeMap;
3434
import java.util.TreeSet;
35+
import java.util.concurrent.ConcurrentHashMap;
3536
import java.util.concurrent.locks.Lock;
3637
import java.util.concurrent.locks.ReentrantLock;
3738

@@ -41,7 +42,7 @@
4142
public final class ArcusReplKetamaNodeLocator extends SpyObject implements NodeLocator {
4243

4344
private final TreeMap<Long, SortedSet<MemcachedReplicaGroup>> ketamaGroups;
44-
private final HashMap<String, MemcachedReplicaGroup> allGroups;
45+
private final ConcurrentHashMap<String, MemcachedReplicaGroup> allGroups;
4546
private final Collection<MemcachedNode> allNodes;
4647

4748
/* ENABLE_MIGRATION if */
@@ -67,7 +68,7 @@ public ArcusReplKetamaNodeLocator(List<MemcachedNode> nodes) {
6768
super();
6869
allNodes = nodes;
6970
ketamaGroups = new TreeMap<>();
70-
allGroups = new HashMap<>();
71+
allGroups = new ConcurrentHashMap<>();
7172

7273
// create all memcached replica group
7374
for (MemcachedNode node : nodes) {
@@ -103,7 +104,7 @@ public ArcusReplKetamaNodeLocator(List<MemcachedNode> nodes) {
103104
}
104105

105106
private ArcusReplKetamaNodeLocator(TreeMap<Long, SortedSet<MemcachedReplicaGroup>> kg,
106-
HashMap<String, MemcachedReplicaGroup> ag,
107+
ConcurrentHashMap<String, MemcachedReplicaGroup> ag,
107108
Collection<MemcachedNode> an) {
108109
super();
109110
ketamaGroups = kg;
@@ -208,7 +209,8 @@ public NodeLocator getReadonlyCopy() {
208209
lock.lock();
209210
try {
210211
TreeMap<Long, SortedSet<MemcachedReplicaGroup>> ketamaCopy = new TreeMap<>();
211-
HashMap<String, MemcachedReplicaGroup> groupsCopy = new HashMap<>(allGroups.size());
212+
ConcurrentHashMap<String, MemcachedReplicaGroup> groupsCopy
213+
= new ConcurrentHashMap<>(allGroups.size());
212214
Collection<MemcachedNode> nodesCopy = new ArrayList<>(allNodes.size());
213215

214216
// Rewrite the values a copy of the map

src/main/java/net/spy/memcached/MemcachedClient.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.concurrent.Future;
4040
import java.util.concurrent.TimeUnit;
4141
import java.util.concurrent.TimeoutException;
42+
import java.util.stream.Collectors;
4243

4344
import net.spy.memcached.auth.AuthDescriptor;
4445
import net.spy.memcached.auth.AuthThreadMonitor;
@@ -1880,7 +1881,8 @@ public void complete() {
18801881
* is too full to accept any more requests
18811882
*/
18821883
public Future<Boolean> flush(final int delay) {
1883-
Collection<MemcachedNode> nodes = getAllNodes();
1884+
Collection<MemcachedNode> nodes = getFlushNodes();
1885+
18841886
final BroadcastFuture<Boolean> rv
18851887
= new BroadcastFuture<>(operationTimeout, Boolean.TRUE, nodes.size());
18861888
final Map<MemcachedNode, Operation> opsMap = new HashMap<>();
@@ -2148,6 +2150,18 @@ protected Collection<MemcachedNode> getAllNodes() {
21482150
return conn.getLocator().getAll();
21492151
}
21502152

2153+
protected Collection<MemcachedNode> getFlushNodes() {
2154+
/* ENABLE_REPLICATION if */
2155+
if (conn.getArcusReplEnabled()) {
2156+
return ((ArcusReplKetamaNodeLocator) getNodeLocator()).getAllGroups().values()
2157+
.stream()
2158+
.map(MemcachedReplicaGroup::getMasterNode)
2159+
.collect(Collectors.toList());
2160+
}
2161+
/* ENABLE_REPLICATION end */
2162+
return conn.getLocator().getAll();
2163+
}
2164+
21512165
/**
21522166
* Turn the list of keys into groups of keys.
21532167
* All keys in a group belong to the same memcached server.

0 commit comments

Comments
 (0)