Skip to content
This repository has been archived by the owner on Nov 12, 2024. It is now read-only.

Adopt 1-indexed logs. With this change, fix many other dependent issues #75

Merged
merged 6 commits into from
Aug 14, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 65 additions & 40 deletions src/main/scala/pl/project13/scala/akka/raft/Follower.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,60 @@ private[raft] trait Follower {
sender() ! LeaderIs(recentlyContactedByLeader, Some(msg))
stay()

// start of election

case Event(msg: RequestVote, m: Meta) if msg.term < m.currentTerm =>
log.info("Rejecting RequestVote msg by {} in {}. Received stale {}.", candidate, m.currentTerm, msg.term)
candidate ! DeclineCandidate(m.currentTerm)
stay()

case Event(msg: RequestVote, m: Meta) if msg.term >= m.currentTerm =>
val msgTerm = msg.term
if (m.canVoteIn(msgTerm)) {
log.info("Voting for {} in {}", candidate, msgTerm)
candidate ! VoteCandidate(msgTerm)
stay() using m.withVote(msgTerm, candidate)
// election
case Event(r @ RequestVote(term, candidate, lastLogTerm, lastLogIndex), m: Meta)
if term > m.currentTerm =>
log.info("Received newer {}. Current term is {}.", term, m.currentTerm)
// Let the next case clause deal with it, after we've updated our Term
m.clusterSelf forward r
stay() using m.withTerm(term)

case Event(RequestVote(term, candidate, lastLogTerm, lastLogIndex), m: Meta)
if m.canVoteIn(term) =>

resetElectionDeadline()
// Check if the log is up-to-date before granting vote.
// Raft determines which of two logs is more up-to-date
// by comparing the index and term of the last entries in the
// logs. If the logs have last entries with different terms, then
// the log with the later term is more up-to-date. If the logs
// end with the same term, then whichever log is longer is
// more up-to-date.
if (lastLogTerm < replicatedLog.lastTerm) {
log.info("Rejecting vote for {}, and {}. Candidate's lastLogTerm: {} < ours: {}",
candidate, term, lastLogTerm, replicatedLog.lastTerm)
sender ! DeclineCandidate(m.currentTerm)
stay()
} else if (lastLogTerm == replicatedLog.lastTerm &&
lastLogIndex < replicatedLog.lastIndex) {
log.info("Rejecting vote for {}, and {}. Candidate's lastLogIndex: {} < ours: {}",
candidate, term, lastLogIndex, replicatedLog.lastIndex)
sender ! DeclineCandidate(m.currentTerm)
stay()
} else {
log.info("Rejecting RequestVote msg by {} in {}. Already voted for {}", candidate, msgTerm, m.currentTerm, m.votes.get(msgTerm))
sender ! DeclineCandidate(msgTerm)
stay() using m.withTerm(msgTerm)
log.info("Voting for {} in {}", candidate, term)
sender ! VoteCandidate(m.currentTerm)

stay() using m.withVote(term, candidate)
}

case Event(RequestVote(term, candidateId, lastLogTerm, lastLogIndex), m: Meta) =>
log.info("Rejecting vote for {}, and {}, currentTerm: {}, already voted for: {}", candidate(), term, m.currentTerm, m.votes.get(term))
sender ! DeclineCandidate(m.currentTerm)
stay()

// end of election

// take writes
case Event(msg: AppendEntries[Command], m: Meta) =>
senderIsCurrentLeader()
appendEntries(msg, m)
// First check the consistency of this request
if (!replicatedLog.containsMatchingEntry(msg.prevLogTerm, msg.prevLogIndex)) {
log.info("Rejecting write (inconsistent log): {} {} {} ", msg.prevLogIndex, msg.prevLogTerm, replicatedLog)
leader ! AppendRejected(m.currentTerm)
stay()
} else {
appendEntries(msg, m)
}

// end of take writes

Expand All @@ -61,35 +90,32 @@ private[raft] trait Follower {
implicit val self = m.clusterSelf // todo this is getting pretty crap, revert to having Cluster awareness a trait IMO

if (leaderIsLagging(msg, m)) {
if (msg.isNotHeartbeat) {
log.info("Rejecting write (Leader is lagging) of: " + msg + "; " + replicatedLog)
leader ! AppendRejected(m.currentTerm, replicatedLog.lastIndex) // no need to respond if only heartbeat
}
stay()

} else if (msg.isHeartbeat) {
acceptHeartbeat()

} else {
log.debug("Appending: " + msg.entries)
leader ! append(msg.entries, m)
replicatedLog = commitUntilLeadersIndex(m, msg)

val meta = maybeUpdateConfiguration(m, msg.entries.map(_.command))
val metaWithUpdatedTerm = meta.copy(currentTerm = replicatedLog.lastTerm)
acceptHeartbeat() using metaWithUpdatedTerm
log.info("Rejecting write (Leader is lagging) of: " + msg + "; " + replicatedLog)
leader ! AppendRejected(m.currentTerm)
return stay()
}

senderIsCurrentLeader()

log.debug("Appending: " + msg.entries)
leader ! append(msg.entries, m)
replicatedLog = commitUntilLeadersIndex(m, msg)

val meta = maybeUpdateConfiguration(m, msg.entries.map(_.command))
val metaWithUpdatedTerm = meta.copy(currentTerm = replicatedLog.lastTerm)
acceptHeartbeat() using metaWithUpdatedTerm
}

/**
 * Tells whether an incoming AppendEntries message comes from a leader that is behind us.
 *
 * @param msg the AppendEntries message received from the (supposed) leader
 * @param m   this member's current metadata
 * @return true when the term carried by `msg` is lower than `m.currentTerm`
 */
def leaderIsLagging(msg: AppendEntries[Command], m: Meta): Boolean = {
  val leaderTerm = msg.term
  val ourTerm = m.currentTerm
  leaderTerm < ourTerm
}

def append(entries: immutable.Seq[Entry[Command]], m: Meta): AppendSuccessful = {
val atIndex = entries.map(_.index).min
log.debug("executing: replicatedLog = replicatedLog.append({}, {})", entries, atIndex)
if (!entries.isEmpty) {
val atIndex = entries.map(_.index).min
log.debug("executing: replicatedLog = replicatedLog.append({}, {})", entries, atIndex-1)

replicatedLog = replicatedLog.append(entries, atIndex)
// log.debug("log after append: " + replicatedLog.entries)
replicatedLog = replicatedLog.append(entries, take=atIndex-1)
}
AppendSuccessful(replicatedLog.lastTerm, replicatedLog.lastIndex)
}

Expand Down Expand Up @@ -133,5 +159,4 @@ private[raft] trait Follower {
// simply ignore applying cluster configurations onto the client state machine,
// it's an internal thing and the client does not care about cluster config change.
}

}
32 changes: 17 additions & 15 deletions src/main/scala/pl/project13/scala/akka/raft/Leader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ private[raft] trait Leader {

// append entries response handling

case Event(AppendRejected(term, index), m: LeaderMeta) if term > m.currentTerm =>
case Event(AppendRejected(term), m: LeaderMeta) if term > m.currentTerm =>
stopHeartbeat()
stepDown(m) // since there seems to be another leader!

case Event(msg: AppendRejected, m: LeaderMeta) =>
case Event(msg: AppendRejected, m: LeaderMeta) if msg.term == m.currentTerm =>
registerAppendRejected(follower(), msg, m)

case Event(msg: AppendSuccessful, m: LeaderMeta) =>
case Event(msg: AppendSuccessful, m: LeaderMeta) if msg.term == m.currentTerm =>
registerAppendSuccessful(follower(), msg, m)

case Event(RequestConfiguration, m: LeaderMeta) =>
Expand All @@ -91,7 +91,7 @@ private[raft] trait Leader {
m.currentTerm,
replicatedLog,
fromIndex = nextIndex.valueFor(follower),
leaderCommitId = replicatedLog.committedIndex
leaderCommitIdx = replicatedLog.committedIndex
)
}

Expand Down Expand Up @@ -119,17 +119,19 @@ private[raft] trait Leader {
m.currentTerm,
replicatedLog,
fromIndex = nextIndex.valueFor(member),
leaderCommitId = replicatedLog.committedIndex
leaderCommitIdx = replicatedLog.committedIndex
)
}
}

def registerAppendRejected(member: ActorRef, msg: AppendRejected, m: LeaderMeta) = {
val AppendRejected(followerTerm, followerIndex) = msg
val AppendRejected(followerTerm) = msg

log.info("Follower {} rejected write: {} @ {}, back out the first index in this term and retry", follower(), followerTerm, followerIndex)
log.info("Follower {} rejected write: {}, back out the first index in this term and retry", follower(), followerTerm)

nextIndex.putIfSmaller(follower(), followerIndex)
if (nextIndex.valueFor(follower()) > 1) {
nextIndex.decrementFor(follower())
}

// todo think if we send here or keep in heartbeat
sendEntries(follower(), m)
Expand All @@ -140,11 +142,12 @@ private[raft] trait Leader {
def registerAppendSuccessful(member: ActorRef, msg: AppendSuccessful, m: LeaderMeta) = {
val AppendSuccessful(followerTerm, followerIndex) = msg

log.info("Follower {} took write in term: {}, index: {}", follower(), followerTerm, nextIndex.valueFor(follower()))
log.info("Follower {} took write in term: {}, next index was: {}", follower(), followerTerm, nextIndex.valueFor(follower()))

// update our tables for this member
nextIndex.put(follower(), followerIndex)
matchIndex.putIfGreater(follower(), nextIndex.valueFor(follower()))
assert(followerIndex <= replicatedLog.lastIndex)
nextIndex.put(follower(), followerIndex + 1)
matchIndex.putIfGreater(follower(), followerIndex)

replicatedLog = maybeCommitEntry(m, matchIndex, replicatedLog)

Expand All @@ -155,10 +158,9 @@ private[raft] trait Leader {
val indexOnMajority = matchIndex.consensusForIndex(m.config)
val willCommit = indexOnMajority > replicatedLog.committedIndex

if (willCommit) log.info("Consensus for persisted index: {}. (Comitted index: {}, will commit now: {})", indexOnMajority, replicatedLog.committedIndex, willCommit)
else log.info("Consensus for persisted index: {}. (Comitted index: {})", indexOnMajority, replicatedLog.committedIndex)

if (willCommit) {
log.info("Consensus for persisted index: {}. (Comitted index: {}, will commit now: {})", indexOnMajority, replicatedLog.committedIndex, willCommit)

val entries = replicatedLog.between(replicatedLog.committedIndex, indexOnMajority)

entries foreach { entry =>
Expand Down Expand Up @@ -208,4 +210,4 @@ private[raft] trait Leader {
meta
}

}
}
4 changes: 2 additions & 2 deletions src/main/scala/pl/project13/scala/akka/raft/RaftActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ abstract class RaftActor extends Actor with LoggingFSM[RaftState, Metadata]

var replicatedLog = ReplicatedLog.empty[Command](raftConfig.defaultAppendEntriesBatchSize)

var nextIndex = LogIndexMap.initialize(Set.empty, replicatedLog.lastIndex)
var nextIndex = LogIndexMap.initialize(Set.empty, replicatedLog.nextIndex)

var matchIndex = LogIndexMap.initialize(Set.empty, -1)
var matchIndex = LogIndexMap.initialize(Set.empty, 0)

// end of raft member state --------------

Expand Down
48 changes: 30 additions & 18 deletions src/main/scala/pl/project13/scala/akka/raft/model/LogIndexMap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@ import pl.project13.scala.akka.raft.{StableClusterConfiguration, JointConsensusC
*/
case class LogIndexMap private (private var backing: Map[ActorRef, Int], private val initializeWith: Int) {

def decrementFor(member: ActorRef): Int = backing(member) match {
case 0 => 0
case n =>
val value = n - 1
backing = backing.updated(member, value)
value
def decrementFor(member: ActorRef): Int = {
val value = backing(member) - 1
backing = backing.updated(member, value)
value
}

def incrementFor(member: ActorRef): Int = {
Expand Down Expand Up @@ -62,38 +60,52 @@ case class LogIndexMap private (private var backing: Map[ActorRef, Int], private
val oldQuorum = indexOnMajority(oldMembers)
val newQuorum = indexOnMajority(newMembers)

if (oldQuorum == -1) newQuorum
else if (newQuorum == -1) oldQuorum
if (oldQuorum == 0) newQuorum
else if (newQuorum == 0) oldQuorum
else math.min(oldQuorum, newQuorum)
}

private def indexOnMajority(include: Set[ActorRef]): Int = {
val indexCountPairs = backing
// Our goal is to find the match index e that has the
// following property:
// - a quorum [N / 2 + 1] of the nodes has match index >= e

// We first sort the match indices
val sortedMatchIndices = backing
.filterKeys(include)
.groupBy(_._2)
.map { case (k, m) => k -> m.size }
.values
.toList
.sorted

indexCountPairs match {
case Nil => -1
case _ =>
indexCountPairs.sortBy(- _._2).head // sort by size
._1
if (sortedMatchIndices.isEmpty) {
return 0
}

assert(sortedMatchIndices.size == include.size)

// The element e we are looking for is now at the (CEILING[N / 2] - 1)st index.
// [ consider three examples:
// 1,1,1,3,3 => correct value: 1
// 1,1,3,3,3 => correct value: 3
// 1,1,3,3 => correct value: 1
// ]
return sortedMatchIndices(LogIndexMap.ceiling(include.size, 2) - 1)
}

/**
 * Returns the index currently stored for `member`.
 *
 * A member that has no entry yet is first registered in `backing` with the
 * `initializeWith` value, and the lookup is then repeated so that value is returned.
 */
@tailrec final def valueFor(member: ActorRef): Int =
  if (backing.contains(member)) {
    backing(member)
  } else {
    // Unknown member: seed it with the default, then retry the lookup.
    backing = backing.updated(member, initializeWith)
    valueFor(member)
  }

}

object LogIndexMap {
/**
 * Builds a LogIndexMap in which every one of the given `members` starts at `initializeWith`.
 * The same value is also handed to the map as its `initializeWith` constructor argument.
 */
def initialize(members: Set[ActorRef], initializeWith: Int) = {
  val startingIndices = members.iterator.map(member => member -> initializeWith).toMap
  new LogIndexMap(startingIndices, initializeWith)
}

/**
 * Integer division rounded up: the smallest integer that is >= `numerator / divisor`.
 *
 * Integer division truncates toward zero, which already is the ceiling whenever the exact
 * quotient is negative. The result therefore only needs to be bumped by one when the
 * division is inexact AND both operands have the same sign (i.e. the exact quotient is
 * positive). Adding one on every inexact division would give e.g. ceiling(-3, 2) == 0
 * instead of -1.
 *
 * @param numerator the value being divided
 * @param divisor   the value to divide by; must not be zero
 * @return `numerator / divisor` rounded toward positive infinity
 * @throws ArithmeticException if `divisor` is zero
 */
def ceiling(numerator: Int, divisor: Int): Int = {
  val quotient = numerator / divisor
  val isExact = numerator % divisor == 0
  val sameSign = (numerator < 0) == (divisor < 0)
  if (!isExact && sameSign) quotient + 1 else quotient
}
}
Loading