LeaderData.clients has leaking possibility #154

Open
xirc opened this issue Jul 5, 2022 · 0 comments

Situation

  1. A leader receives a Replicate message from an entity and starts replicating the entry of that message for the entity.
    • The leader registers this replication (ClientContext with LogEntryIndex) in LeaderData.clients.
  2. The leader becomes a follower for some reason before it completes the replication.
    • At this point there is another leader.
  3. The new leader completes the replication.
  4. The old leader (now a follower) sends Replica to the entity.
    • The entry (ClientContext with LogEntryIndex) in LeaderData.clients is never removed.
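
The steps above can be reproduced with a small, self-contained model. This is only a sketch: `MemberData`, `replicate`, `followLeaderCommit`, and `commitAsLeader` are hypothetical, simplified stand-ins for the real Raft data classes, and they assume that the follower path applies committed entries without touching `clients`, as the related source code suggests.

```scala
// Simplified, hypothetical model of the scenario; not the library's real types.
object ClientsLeakSketch {
  final case class LogEntry(index: Int, event: String)
  final case class ClientContext(name: String)

  final case class MemberData(
      log: Vector[LogEntry] = Vector.empty,
      commitIndex: Int = 0,
      lastApplied: Int = 0,
      clients: Map[Int, ClientContext] = Map.empty,
  ) {
    // Entries that are committed but not applied yet.
    private def applicable: Vector[LogEntry] =
      log.filter(e => e.index > lastApplied && e.index <= commitIndex)

    // Step 1: the leader appends the entry and registers the client for it.
    def replicate(event: String, client: ClientContext): MemberData = {
      val entry = LogEntry(log.size + 1, event)
      copy(log = log :+ entry, clients = clients + (entry.index -> client))
    }

    // Follower path (step 4): applies committed entries, never touches `clients`.
    def followLeaderCommit(leaderCommit: Int): MemberData = {
      val updated = copy(commitIndex = math.max(commitIndex, leaderCommit))
      val applied = updated.applicable
      updated.copy(lastApplied = applied.lastOption.map(_.index).getOrElse(updated.lastApplied))
    }

    // Leader path: applies committed entries and removes the notified clients.
    def commitAsLeader(index: Int): MemberData = {
      val updated = copy(commitIndex = math.max(commitIndex, index))
      val applied = updated.applicable
      updated.copy(
        lastApplied = applied.lastOption.map(_.index).getOrElse(updated.lastApplied),
        clients = updated.clients -- applied.map(_.index),
      )
    }
  }

  def main(args: Array[String]): Unit = {
    val registered = MemberData().replicate("event-1", ClientContext("entity-1"))

    // Steps 2-4: the member stepped down, so the commit arrives via the follower path.
    val asFollower = registered.followLeaderCommit(leaderCommit = 1)
    println(s"follower path, clients left: ${asFollower.clients.size}")

    // For comparison: the same commit handled while still the leader.
    val asLeader = registered.commitAsLeader(index = 1)
    println(s"leader path, clients left: ${asLeader.clients.size}")
  }
}
```

In this model the entry is applied on both paths, but only the leader path removes the registered client, so the follower path keeps the `ClientContext` indefinitely.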

Related source code

  • case FollowedLeaderCommit(leaderMember, leaderCommit) =>
      currentData
        .detectLeaderMember(leaderMember)
        .followLeaderCommit(leaderCommit)
        .applyCommittedLogEntries { logEntries =>
          logEntries.foreach { logEntry =>
            applyToReplicationActor(logEntry)
          }
        }
    case Committed(logEntryIndex) =>
      currentData
        .commit(logEntryIndex)
        .handleCommittedLogEntriesAndClients { entries =>
          entries.foreach {
            case (logEntry, Some(client)) =>
              if (log.isDebugEnabled)
                log.debug("=== [Leader] committed {} and will notify it to {} ===", logEntry, client)
              client.ref.tell(
                ReplicationSucceeded(logEntry.event.event, logEntry.index, client.instanceId),
                client.originSender.getOrElse(ActorRef.noSender),
              )
            case (logEntry, None) =>
              // A commit during recovery, or a commit of log entries that were still uncommitted when this member was promoted to leader
              applyToReplicationActor(logEntry)
          }
        }
  • def handleCommittedLogEntriesAndClients(handler: Seq[(LogEntry, Option[ClientContext])] => Unit): RaftMemberData = {
      val applicableLogEntries = selectApplicableLogEntries
      handler(applicableLogEntries.map(e => (e, clients.get(e.index))))
      updateVolatileState(lastApplied = applicableLogEntries.lastOption.map(_.index).getOrElse(lastApplied))
        .updateLeaderVolatileState(clients = clients -- applicableLogEntries.map(_.index)) // Remove notified clients to save memory
    }
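
The clean-up in handleCommittedLogEntriesAndClients boils down to removing the applied indices from the `clients` map. One possible direction, offered only as an assumption and not as the project's decided fix, would be to run the same removal when a follower applies committed entries. The helper `dropAppliedClients` and the simplified `ClientContext` below are hypothetical names used for illustration:

```scala
// Hypothetical sketch: simplified stand-ins, not the library's real types.
object ClientsCleanupSketch {
  final case class ClientContext(name: String)

  // Remove the client contexts registered for entries that have just been applied.
  // `--` on an immutable Map ignores keys that are absent, so calling this on a
  // member that never registered a client for those indices is a no-op.
  def dropAppliedClients(
      clients: Map[Int, ClientContext],
      appliedIndices: Seq[Int],
  ): Map[Int, ClientContext] =
    clients -- appliedIndices

  def main(args: Array[String]): Unit = {
    val clients = Map(1 -> ClientContext("entity-1"), 2 -> ClientContext("entity-2"))
    // Index 3 was applied without a registered client; index 2 is not applied yet.
    val remaining = dropAppliedClients(clients, Seq(1, 3))
    println(remaining.keySet)
  }
}
```

Whether the stale contexts should be dropped silently like this, or cleared when the member steps down from leader, is a design decision this sketch leaves open.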