Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leader step down when removing itself #248

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion doc/manual/using.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,20 @@ The steps to add a member are as follows (say we have `RAFT.members="A,B,C"` and
* A new member `D` can now be started (its XML config needs to have the correct `members` attribute !)

Notice that membership changes survive through restarts. If a node must be removed or added, an operation must be
submitted, only restarting does not affect membership.
submitted, only shutting down the node does not affect membership.

Contrary to the reconfiguration defined in the Raft article, jgroups-raft only applies the membership change after the operation is committed.
This does not violate safety, as there is still an intersection between the old and new configuration.

WARNING: Currently, there are no safeguards to guarantee the quorum is maintained after a node is removed.
It is advised to check the leader's view before applying a removal operation until safeguards are included.

==== Removing the leader

The leader can self-remove from the configuration.
After the operation is committed and the leader applies it to the state machine, the leader will step down and trigger a new election.
This causes any enqueued or outstanding request to be completed exceptionally since the leader transitions to a follower.

Removing the leader in a two-node cluster and the leader crashes before the second node applies the operation to the state machine renders the cluster unavailable.
The same problem applies to any other operation when the cluster has less than a majority of nodes available.
The recommendation is to utilize more nodes in the cluster.
3 changes: 3 additions & 0 deletions src/org/jgroups/protocols/raft/InternalCommand.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.jgroups.protocols.raft;

import org.jgroups.protocols.raft.election.BaseElection;
import org.jgroups.util.Bits;
import org.jgroups.util.Streamable;

Expand Down Expand Up @@ -43,6 +44,8 @@ public Object execute(RAFT raft) throws Exception {
break;
case removeServer:
raft._removeServer(name);
BaseElection be = raft.getProtocolStack().findProtocol(BaseElection.class);
if (be != null) be.raftServerRemoved(name);
break;
}
return null;
Expand Down
6 changes: 6 additions & 0 deletions src/org/jgroups/protocols/raft/Leader.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public void handleAppendEntriesResponse(Address sender, long term, AppendResult
log.trace("%s: received AppendEntries response from %s for term %d: %s", raft.getAddress(), sender, term, result);
switch(result.result) {
case OK:
// Make sure that non-raft members do not count towards majority.
if (!raft.isRaftMember(sender)) {
if (log.isDebugEnabled()) log.debug("%s: dropping vote of non-member %s/%s", raft.getAddress(), sender, raft.members());
break;
}

raft.commit_table.update(sender, result.index(), result.index() + 1, result.commit_index, false);
boolean done = reqtab.add(result.index, sender_raft_id, this.majority);
if(done) {
Expand Down
10 changes: 10 additions & 0 deletions src/org/jgroups/protocols/raft/PersistentState.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ public List<String> getMembers() {
return new ArrayList<>(members);
}

/**
* Verify if the given member is a member of the current members list.
*
* @param raftId: The Raft ID to verify.
* @return <code>true</code> if the Raft ID is a current member. <code>false</code>, otherwise.
*/
public boolean containsMember(String raftId) {
return members.contains(raftId);
}

public void setMembers(Collection<String> value) {
members.clear();
members.addAll(new HashSet<>(value));
Expand Down
13 changes: 13 additions & 0 deletions src/org/jgroups/protocols/raft/RAFT.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.jgroups.raft.util.CommitTable;
import org.jgroups.raft.util.LogCache;
import org.jgroups.raft.util.RequestTable;
import org.jgroups.raft.util.Utils;
import org.jgroups.stack.Protocol;
import org.jgroups.util.*;

Expand Down Expand Up @@ -1072,6 +1073,18 @@ protected void snapshotIfNeeded(int bytes_added) {
}
}

/**
* Verify if the node identified by the given address is a Raft member.
*
* @param address: The node address to verify.
* @return <code>true</code> if the address is present in the current members. <code>false</code>, otherwise.
* @see Utils#extractRaftId(Address)
*/
public boolean isRaftMember(Address address) {
String raftId = Utils.extractRaftId(address);
return raftId != null && internal_state.containsMember(raftId);
}

/**
* Applies log entries [commit_index+1 .. to_inclusive] to the state machine and notifies clients in RequestTable.
* @param to_inclusive The end index (inclusive) of the log entries to apply
Expand Down
32 changes: 31 additions & 1 deletion src/org/jgroups/protocols/raft/election/BaseElection.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@
import org.jgroups.protocols.raft.LogEntry;
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.protocols.raft.RaftHeader;
import org.jgroups.raft.util.Utils;
import org.jgroups.stack.Protocol;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.ResponseCollector;
import org.jgroups.util.Runner;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static org.jgroups.Message.Flag.OOB;
import static org.jgroups.Message.TransientFlag.DONT_LOOPBACK;
Expand Down Expand Up @@ -268,7 +272,7 @@ protected boolean isHigher(long last_term, long last_index) {
protected void runVotingProcess() {
long new_term=raft.createNewTerm();
raft.votedFor(null);
votes.reset(view.getMembersRaw());
votes.reset(findRaftMembersInView(view));
num_voting_rounds++;
long start=System.currentTimeMillis();
sendVoteRequest(new_term);
Expand All @@ -291,6 +295,32 @@ protected void runVotingProcess() {
local_addr, votes.getValidResults(), time, majority);
}

public void raftServerRemoved(String raftId) {
// I am the leader and I removed myself.
// I should step down and start the election thread without myself.
if (raftId.equals(raft.raftId()) && raft.isLeader()) {
log.debug("%s: removed myself as leader (%s), starting voting thread", local_addr, raftId);
raft.setLeaderAndTerm(null);
startVotingThread();
}
}

private Address[] findRaftMembersInView(View view) {
Set<Address> mbrs = new HashSet<>();
List<String> currentMembers = raft.members();
for(Address address : view) {
String raftId = Utils.extractRaftId(address);
if (raftId == null) {
log.warn("%s: raft-id not found for address %s (%s)", local_addr, address, address.getClass());
continue;
}

if (currentMembers.contains(raftId))
mbrs.add(address);
}
return mbrs.toArray(Address[]::new);
}

public synchronized BaseElection startVotingThread() {
if(!isVotingThreadRunning())
voting_thread.start();
Expand Down
22 changes: 22 additions & 0 deletions src/org/jgroups/raft/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import org.jgroups.View;
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.raft.testfwk.RaftTestUtils;
import org.jgroups.util.ExtendedUUID;
import org.jgroups.util.Util;

/**
* @author Bela Ban
Expand Down Expand Up @@ -47,4 +49,24 @@ public static void deleteLog(RAFT r) throws Exception {
RaftTestUtils.deleteRaftLog(r);
}

/**
* Extract the Raft ID from the given address instance.
* <p>
* During creation, the node embed it's ID in the address. It needs to be an instance of
* {@link ExtendedUUID} to hold the information.
* </p>
*
* @param address: The {@link Address} to extract the Raft ID.
* @return The Raft ID embedded in the address, or <code>null</code>, otherwise.
*/
public static String extractRaftId(Address address) {
if (!(address instanceof ExtendedUUID))
return null;

ExtendedUUID uuid = (ExtendedUUID) address;
byte[] value = uuid.get(RAFT.raft_id_key);
return value == null
? null
: Util.bytesToString(value);
}
}
Loading