Skip to content
This repository has been archived by the owner on Nov 12, 2024. It is now read-only.

Commit

Permalink
Merge pull request #75 from colin-scott/1-indexing
Browse files Browse the repository at this point in the history
Adopt 1-indexed logs. With this change, fix many other dependent issues
  • Loading branch information
ktoso committed Aug 14, 2015
2 parents 4463499 + ffdd20d commit 2f1959f
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 164 deletions.
105 changes: 65 additions & 40 deletions src/main/scala/pl/project13/scala/akka/raft/Follower.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,60 @@ private[raft] trait Follower {
sender() ! LeaderIs(recentlyContactedByLeader, Some(msg))
stay()

// start of election

case Event(msg: RequestVote, m: Meta) if msg.term < m.currentTerm =>
log.info("Rejecting RequestVote msg by {} in {}. Received stale {}.", candidate, m.currentTerm, msg.term)
candidate ! DeclineCandidate(m.currentTerm)
stay()

case Event(msg: RequestVote, m: Meta) if msg.term >= m.currentTerm =>
val msgTerm = msg.term
if (m.canVoteIn(msgTerm)) {
log.info("Voting for {} in {}", candidate, msgTerm)
candidate ! VoteCandidate(msgTerm)
stay() using m.withVote(msgTerm, candidate)
// election
case Event(r @ RequestVote(term, candidate, lastLogTerm, lastLogIndex), m: Meta)
if term > m.currentTerm =>
log.info("Received newer {}. Current term is {}.", term, m.currentTerm)
// Let the next case clause deal with it, after we've updated our Term
m.clusterSelf forward r
stay() using m.withTerm(term)

case Event(RequestVote(term, candidate, lastLogTerm, lastLogIndex), m: Meta)
if m.canVoteIn(term) =>

resetElectionDeadline()
// Check if the log is up-to-date before granting vote.
// Raft determines which of two logs is more up-to-date
// by comparing the index and term of the last entries in the
// logs. If the logs have last entries with different terms, then
// the log with the later term is more up-to-date. If the logs
// end with the same term, then whichever log is longer is
// more up-to-date.
if (lastLogTerm < replicatedLog.lastTerm) {
log.info("Rejecting vote for {}, and {}. Candidate's lastLogTerm: {} < ours: {}",
candidate, term, lastLogTerm, replicatedLog.lastTerm)
sender ! DeclineCandidate(m.currentTerm)
stay()
} else if (lastLogTerm == replicatedLog.lastTerm &&
lastLogIndex < replicatedLog.lastIndex) {
log.info("Rejecting vote for {}, and {}. Candidate's lastLogIndex: {} < ours: {}",
candidate, term, lastLogIndex, replicatedLog.lastIndex)
sender ! DeclineCandidate(m.currentTerm)
stay()
} else {
log.info("Rejecting RequestVote msg by {} in {}. Already voted for {}", candidate, msgTerm, m.currentTerm, m.votes.get(msgTerm))
sender ! DeclineCandidate(msgTerm)
stay() using m.withTerm(msgTerm)
log.info("Voting for {} in {}", candidate, term)
sender ! VoteCandidate(m.currentTerm)

stay() using m.withVote(term, candidate)
}

case Event(RequestVote(term, candidateId, lastLogTerm, lastLogIndex), m: Meta) =>
log.info("Rejecting vote for {}, and {}, currentTerm: {}, already voted for: {}", candidate(), term, m.currentTerm, m.votes.get(term))
sender ! DeclineCandidate(m.currentTerm)
stay()

// end of election

// take writes
case Event(msg: AppendEntries[Command], m: Meta) =>
senderIsCurrentLeader()
appendEntries(msg, m)
// First check the consistency of this request
if (!replicatedLog.containsMatchingEntry(msg.prevLogTerm, msg.prevLogIndex)) {
log.info("Rejecting write (inconsistent log): {} {} {} ", msg.prevLogIndex, msg.prevLogTerm, replicatedLog)
leader ! AppendRejected(m.currentTerm)
stay()
} else {
appendEntries(msg, m)
}

// end of take writes

Expand All @@ -61,35 +90,32 @@ private[raft] trait Follower {
implicit val self = m.clusterSelf // todo this is getting pretty crap, revert to having Cluster awareness a trait IMO

if (leaderIsLagging(msg, m)) {
if (msg.isNotHeartbeat) {
log.info("Rejecting write (Leader is lagging) of: " + msg + "; " + replicatedLog)
leader ! AppendRejected(m.currentTerm, replicatedLog.lastIndex) // no need to respond if only heartbeat
}
stay()

} else if (msg.isHeartbeat) {
acceptHeartbeat()

} else {
log.debug("Appending: " + msg.entries)
leader ! append(msg.entries, m)
replicatedLog = commitUntilLeadersIndex(m, msg)

val meta = maybeUpdateConfiguration(m, msg.entries.map(_.command))
val metaWithUpdatedTerm = meta.copy(currentTerm = replicatedLog.lastTerm)
acceptHeartbeat() using metaWithUpdatedTerm
log.info("Rejecting write (Leader is lagging) of: " + msg + "; " + replicatedLog)
leader ! AppendRejected(m.currentTerm)
return stay()
}

senderIsCurrentLeader()

log.debug("Appending: " + msg.entries)
leader ! append(msg.entries, m)
replicatedLog = commitUntilLeadersIndex(m, msg)

val meta = maybeUpdateConfiguration(m, msg.entries.map(_.command))
val metaWithUpdatedTerm = meta.copy(currentTerm = replicatedLog.lastTerm)
acceptHeartbeat() using metaWithUpdatedTerm
}

def leaderIsLagging(msg: AppendEntries[Command], m: Meta): Boolean =
msg.term < m.currentTerm

def append(entries: immutable.Seq[Entry[Command]], m: Meta): AppendSuccessful = {
val atIndex = entries.map(_.index).min
log.debug("executing: replicatedLog = replicatedLog.append({}, {})", entries, atIndex)
if (!entries.isEmpty) {
val atIndex = entries.map(_.index).min
log.debug("executing: replicatedLog = replicatedLog.append({}, {})", entries, atIndex-1)

replicatedLog = replicatedLog.append(entries, atIndex)
// log.debug("log after append: " + replicatedLog.entries)
replicatedLog = replicatedLog.append(entries, take=atIndex-1)
}
AppendSuccessful(replicatedLog.lastTerm, replicatedLog.lastIndex)
}

Expand Down Expand Up @@ -133,5 +159,4 @@ private[raft] trait Follower {
// simply ignore applying cluster configurations onto the client state machine,
// it's an internal thing and the client does not care about cluster config change.
}

}
32 changes: 17 additions & 15 deletions src/main/scala/pl/project13/scala/akka/raft/Leader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ private[raft] trait Leader {

// append entries response handling

case Event(AppendRejected(term, index), m: LeaderMeta) if term > m.currentTerm =>
case Event(AppendRejected(term), m: LeaderMeta) if term > m.currentTerm =>
stopHeartbeat()
stepDown(m) // since there seems to be another leader!

case Event(msg: AppendRejected, m: LeaderMeta) =>
case Event(msg: AppendRejected, m: LeaderMeta) if msg.term == m.currentTerm =>
registerAppendRejected(follower(), msg, m)

case Event(msg: AppendSuccessful, m: LeaderMeta) =>
case Event(msg: AppendSuccessful, m: LeaderMeta) if msg.term == m.currentTerm =>
registerAppendSuccessful(follower(), msg, m)

case Event(RequestConfiguration, m: LeaderMeta) =>
Expand All @@ -91,7 +91,7 @@ private[raft] trait Leader {
m.currentTerm,
replicatedLog,
fromIndex = nextIndex.valueFor(follower),
leaderCommitId = replicatedLog.committedIndex
leaderCommitIdx = replicatedLog.committedIndex
)
}

Expand Down Expand Up @@ -119,17 +119,19 @@ private[raft] trait Leader {
m.currentTerm,
replicatedLog,
fromIndex = nextIndex.valueFor(member),
leaderCommitId = replicatedLog.committedIndex
leaderCommitIdx = replicatedLog.committedIndex
)
}
}

def registerAppendRejected(member: ActorRef, msg: AppendRejected, m: LeaderMeta) = {
val AppendRejected(followerTerm, followerIndex) = msg
val AppendRejected(followerTerm) = msg

log.info("Follower {} rejected write: {} @ {}, back out the first index in this term and retry", follower(), followerTerm, followerIndex)
log.info("Follower {} rejected write: {}, back out the first index in this term and retry", follower(), followerTerm)

nextIndex.putIfSmaller(follower(), followerIndex)
if (nextIndex.valueFor(follower()) > 1) {
nextIndex.decrementFor(follower())
}

// todo think if we send here or keep in heartbeat
sendEntries(follower(), m)
Expand All @@ -140,11 +142,12 @@ private[raft] trait Leader {
def registerAppendSuccessful(member: ActorRef, msg: AppendSuccessful, m: LeaderMeta) = {
val AppendSuccessful(followerTerm, followerIndex) = msg

log.info("Follower {} took write in term: {}, index: {}", follower(), followerTerm, nextIndex.valueFor(follower()))
log.info("Follower {} took write in term: {}, next index was: {}", follower(), followerTerm, nextIndex.valueFor(follower()))

// update our tables for this member
nextIndex.put(follower(), followerIndex)
matchIndex.putIfGreater(follower(), nextIndex.valueFor(follower()))
assert(followerIndex <= replicatedLog.lastIndex)
nextIndex.put(follower(), followerIndex + 1)
matchIndex.putIfGreater(follower(), followerIndex)

replicatedLog = maybeCommitEntry(m, matchIndex, replicatedLog)

Expand All @@ -155,10 +158,9 @@ private[raft] trait Leader {
val indexOnMajority = matchIndex.consensusForIndex(m.config)
val willCommit = indexOnMajority > replicatedLog.committedIndex

if (willCommit) log.info("Consensus for persisted index: {}. (Comitted index: {}, will commit now: {})", indexOnMajority, replicatedLog.committedIndex, willCommit)
else log.info("Consensus for persisted index: {}. (Comitted index: {})", indexOnMajority, replicatedLog.committedIndex)

if (willCommit) {
log.info("Consensus for persisted index: {}. (Comitted index: {}, will commit now: {})", indexOnMajority, replicatedLog.committedIndex, willCommit)

val entries = replicatedLog.between(replicatedLog.committedIndex, indexOnMajority)

entries foreach { entry =>
Expand Down Expand Up @@ -208,4 +210,4 @@ private[raft] trait Leader {
meta
}

}
}
4 changes: 2 additions & 2 deletions src/main/scala/pl/project13/scala/akka/raft/RaftActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ abstract class RaftActor extends Actor with LoggingFSM[RaftState, Metadata]

var replicatedLog = ReplicatedLog.empty[Command](raftConfig.defaultAppendEntriesBatchSize)

var nextIndex = LogIndexMap.initialize(Set.empty, replicatedLog.lastIndex)
var nextIndex = LogIndexMap.initialize(Set.empty, replicatedLog.nextIndex)

var matchIndex = LogIndexMap.initialize(Set.empty, -1)
var matchIndex = LogIndexMap.initialize(Set.empty, 0)

// end of raft member state --------------

Expand Down
48 changes: 30 additions & 18 deletions src/main/scala/pl/project13/scala/akka/raft/model/LogIndexMap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@ import pl.project13.scala.akka.raft.{StableClusterConfiguration, JointConsensusC
*/
case class LogIndexMap private (private var backing: Map[ActorRef, Int], private val initializeWith: Int) {

def decrementFor(member: ActorRef): Int = backing(member) match {
case 0 => 0
case n =>
val value = n - 1
backing = backing.updated(member, value)
value
def decrementFor(member: ActorRef): Int = {
val value = backing(member) - 1
backing = backing.updated(member, value)
value
}

def incrementFor(member: ActorRef): Int = {
Expand Down Expand Up @@ -62,38 +60,52 @@ case class LogIndexMap private (private var backing: Map[ActorRef, Int], private
val oldQuorum = indexOnMajority(oldMembers)
val newQuorum = indexOnMajority(newMembers)

if (oldQuorum == -1) newQuorum
else if (newQuorum == -1) oldQuorum
if (oldQuorum == 0) newQuorum
else if (newQuorum == 0) oldQuorum
else math.min(oldQuorum, newQuorum)
}

private def indexOnMajority(include: Set[ActorRef]): Int = {
val indexCountPairs = backing
// Our goal is to find the match index e that has the
// following property:
// - a quorum [N / 2 + 1] of the nodes has match index >= e

// We first sort the match indices
val sortedMatchIndices = backing
.filterKeys(include)
.groupBy(_._2)
.map { case (k, m) => k -> m.size }
.values
.toList
.sorted

indexCountPairs match {
case Nil => -1
case _ =>
indexCountPairs.sortBy(- _._2).head // sort by size
._1
if (sortedMatchIndices.isEmpty) {
return 0
}

assert(sortedMatchIndices.size == include.size)

// The element e we are looking is now at the (CEILING[N / 2] - 1)st index.
// [ consider three examples:
// 1,1,1,3,3 => correct value: 1
// 1,1,3,3,3 => correct value: 3
// 1,1,3,3 => correct value: 1
// ]
return sortedMatchIndices(LogIndexMap.ceiling(include.size, 2) - 1)
}

@tailrec final def valueFor(member: ActorRef): Int = backing.get(member) match {
case None =>
backing = backing.updated(member, initializeWith)
valueFor(member)
case Some(value) =>
value
}

}

object LogIndexMap {
def initialize(members: Set[ActorRef], initializeWith: Int) =
new LogIndexMap(Map(members.toList.map(_ -> initializeWith): _*), initializeWith)

def ceiling(numerator: Int, divisor: Int): Int = {
if (numerator % divisor == 0) numerator / divisor else (numerator / divisor) + 1
}
}
Loading

0 comments on commit 2f1959f

Please sign in to comment.