Skip to content

Commit ce2f9fc

Browse files
committed
FIX: Handling abnormal swithover from zk.
1 parent 99a2dda commit ce2f9fc

File tree

2 files changed

+42
-1
lines changed

2 files changed

+42
-1
lines changed

src/main/java/net/spy/memcached/MemcachedConnection.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,17 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
471471
Set<ArcusReplNodeAddress> newSlaveAddrs = getSlaveAddrsFromGroupAddrs(newGroupAddrs);
472472

473473
if (oldMasterAddr.isSameAddress(newMasterAddr)) {
474+
if (oldGroup.getMasterCandidate() != null) {
475+
/**
476+
* Handling the below case.
477+
* old group : [oldMaster, oldSlave]
478+
* old group after switchover response : [oldMaster, oldSlave-masterCandidate]
479+
* new group from zk cache list: [oldMaster, X]
480+
*/
481+
taskList.add(new MoveOperationTask(oldGroup.getMasterCandidate(),
482+
oldMasterNode, false));
483+
oldGroup.clearMasterCandidate();
484+
}
474485
// add newly added slave node
475486
for (ArcusReplNodeAddress newSlaveAddr : newSlaveAddrs) {
476487
if (!oldSlaveAddrs.contains(newSlaveAddr)) {
@@ -490,8 +501,25 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
490501
} else if (oldSlaveAddrs.contains(newMasterAddr)) {
491502
if (newSlaveAddrs.contains(oldMasterAddr)) {
492503
// Switchover
493-
if (oldGroup.getMasterCandidate() != null) {
504+
MemcachedNode oldMasterCandidate = oldGroup.getMasterCandidate();
505+
if (oldMasterCandidate != null) {
506+
ArcusReplNodeAddress masterFromZk = (ArcusReplNodeAddress) oldGroup
507+
.getSlaveNodeBy(newMasterAddr.getIPPort()).getSocketAddress();
494508
changeRoleGroups.add(oldGroup);
509+
if (!masterFromZk.isSameAddress(
510+
((ArcusReplNodeAddress) oldMasterCandidate.getSocketAddress()))) {
511+
/**
512+
* Moves ops from oldMasterCandidate set by cache server to newMasterCandidate.
513+
* Handling the below case.
514+
* old group : [oldMaster, oldSlave1, oldSlave2]
515+
* old group after switchover response :
516+
* [oldMaster, oldSlave1-masterCandidate, oldSlave2]
517+
* new group from zk cache list: [slave1, X, newMaster]
518+
*/
519+
oldGroup.setMasterCandidateByAddr(newMasterAddr.getIPPort());
520+
taskList.add(new MoveOperationTask(
521+
oldMasterCandidate, oldGroup.getMasterCandidate(), false));
522+
}
495523
} else {
496524
// ZK event occurs before cache server response.
497525
oldGroup.setMasterCandidateByAddr(newMasterAddr.getIPPort());

src/main/java/net/spy/memcached/MemcachedReplicaGroup.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ public void setMasterCandidate() {
8686
}
8787
}
8888

89+
public void clearMasterCandidate() {
90+
this.masterCandidate = null;
91+
}
92+
8993
public void setMasterCandidateByAddr(String address) {
9094
for (MemcachedNode node : this.getSlaveNodes()) {
9195
if (address.equals(((ArcusReplNodeAddress) node.getSocketAddress()).getIPPort())) {
@@ -181,5 +185,14 @@ private MemcachedNode getNextActiveSlaveNodeNoRotate() {
181185
public static String getGroupNameFromNode(final MemcachedNode node) {
182186
return ((ArcusReplNodeAddress) node.getSocketAddress()).getGroupName();
183187
}
188+
189+
public MemcachedNode getSlaveNodeBy(String address) {
190+
for (MemcachedNode node : this.getSlaveNodes()) {
191+
if (address.equals(((ArcusReplNodeAddress) node.getSocketAddress()).getIPPort())) {
192+
return node;
193+
}
194+
}
195+
return null;
196+
}
184197
}
185198
/* ENABLE_REPLICATION end */

0 commit comments

Comments
 (0)