Rollback tool for Raft shard #187

Merged
merged 25 commits into from
Nov 30, 2022
Changes from 14 commits
13d40a5
Reorganize build.sbt
Nov 15, 2022
c93553d
Rollback tool for akka-entity-replication
Nov 15, 2022
b06baa0
Revise CHANGELOG
Nov 15, 2022
56b67fa
Remove unused import
Nov 16, 2022
c058b4f
Merge remote-tracking branch 'origin/master' into rollback-tool
Nov 22, 2022
4904d0e
Merge remote-tracking branch 'origin/master' into rollback-tool
Nov 22, 2022
02ca14b
Wait for persistence plugin initializations before tests
Nov 22, 2022
f0ec088
PersistenceCassandraConfigProvider provides Persistence Cassandra config
Nov 22, 2022
6e14fcd
Merge remote-tracking branch 'origin/typed-cluster-replication-settin…
Nov 22, 2022
11df3a6
Revise LogBack settings for tests
Nov 22, 2022
71383ca
Fix INFO log messages of RaftShardRollback
Nov 22, 2022
c1462f3
Add integration tests for CassandraRaftShardRollback
Nov 22, 2022
f3186ef
Add rollback guide
Nov 22, 2022
555321e
Rename sub-project for rollback tool
Nov 22, 2022
814d62d
Merge branch 'master' into rollback-tool
xirc Nov 28, 2022
43a8d86
Revise README
Nov 28, 2022
e474387
Use enterBarrier timeout instead
Nov 28, 2022
cccfd04
Describe avoiding sending requests to disabled entities
Nov 28, 2022
bfb017c
Merge branch 'master' into rollback-tool
xirc Nov 29, 2022
e019adf
Revise build for akka-entity-replication-rollback-tool-cassandra
Nov 29, 2022
03dab8f
Disable binary compatibility check on rollbackToolCassandra project
Nov 29, 2022
02084ce
Format build.sbt
Nov 29, 2022
7fad100
Set jvmOptions in MultiJvm
Nov 29, 2022
62c695c
Restrict the number of concurrently executing multi-JVM tests in all …
Nov 29, 2022
92751c4
Merge branch 'master' into rollback-tool
negokaz Nov 30, 2022
6 changes: 5 additions & 1 deletion .gitignore
Expand Up @@ -9,4 +9,8 @@ sbt-cache/

# For Windows
# protobridge are installed below directory
/null/
/null/

# For rollback-cassandra
rollback-cassandra/.toDelete
.toDelete
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[PR#173](https://github.com/lerna-stack/akka-entity-replication/pull/173),
[PR#188](https://github.com/lerna-stack/akka-entity-replication/pull/188),
[PR#189](https://github.com/lerna-stack/akka-entity-replication/pull/189)
- Add a rollback tool for Raft shard [PR#187](https://github.com/lerna-stack/akka-entity-replication/pull/187)

### Changed
- Enhance leader's replication response handling [PR#160](https://github.com/lerna-stack/akka-entity-replication/pull/160)
82 changes: 59 additions & 23 deletions build.sbt
Expand Up @@ -2,7 +2,28 @@ import org.scalafmt.sbt.ScalafmtPlugin.scalafmtConfigSettings

resolvers += "dnvriend" at "https://dl.bintray.com/dnvriend/maven"

lazy val akkaVersion = "2.6.17"
lazy val akkaVersion = "2.6.17"
lazy val akkaPersistenceCassandraVersion = "1.0.5"

ThisBuild / scalaVersion := "2.13.4"
ThisBuild / scalacOptions ++= Seq(
"-feature",
"-unchecked",
"-Xlint",
"-Yrangepos",
"-Ywarn-unused:imports",
)
ThisBuild / scalacOptions ++= sys.props.get("lerna.enable.discipline").map(_ => "-Xfatal-warnings").toSeq
ThisBuild / scalafixScalaBinaryVersion := CrossVersion.binaryScalaVersion(scalaVersion.value)
// https://scalacenter.github.io/scalafix/docs/users/installation.html#sbt
ThisBuild / semanticdbEnabled := true
ThisBuild / semanticdbVersion := scalafixSemanticdb.revision
// doc
ThisBuild / Compile / doc / autoAPIMappings := true
ThisBuild / git.remoteRepo := "git@github.com:lerna-stack/akka-entity-replication.git"
// test coverage
ThisBuild / coverageMinimum := 80
ThisBuild / coverageFailOnMinimum := true

lazy val lerna = (project in file("."))
.enablePlugins(
Expand All @@ -12,23 +33,6 @@ lazy val lerna = (project in file("."))
)
.configs(MultiJvm)
.settings(
inThisBuild(
List(
scalaVersion := "2.13.4",
scalacOptions ++= Seq(
"-feature",
"-unchecked",
"-Xlint",
"-Yrangepos",
"-Ywarn-unused:imports",
),
scalacOptions ++= sys.props.get("lerna.enable.discipline").map(_ => "-Xfatal-warnings").toSeq,
scalafixScalaBinaryVersion := CrossVersion.binaryScalaVersion(scalaVersion.value),
// https://scalacenter.github.io/scalafix/docs/users/installation.html#sbt
semanticdbEnabled := true,
semanticdbVersion := scalafixSemanticdb.revision,
),
),
name := "akka-entity-replication",
fork in Test := true,
parallelExecution in Test := false,
Expand Down Expand Up @@ -68,12 +72,7 @@ lazy val lerna = (project in file("."))
),
),
),
// doc
Compile / doc / autoAPIMappings := true,
git.remoteRepo := "git@github.com:lerna-stack/akka-entity-replication.git",
// test-coverage
coverageMinimum := 80,
coverageFailOnMinimum := true,
coverageExcludedPackages := Seq(
"lerna\\.akka\\.entityreplication\\.protobuf\\.msg\\..*",
).mkString(";"),
Expand All @@ -86,6 +85,43 @@ lazy val lerna = (project in file("."))
mimaReportSignatureProblems := true, // check also generic parameters
)

lazy val rollbackToolCassandra = (project in file("rollback-tool-cassandra"))
.dependsOn(lerna)
.enablePlugins(MultiJvmPlugin)
.configs(MultiJvm)
.settings(
name := "akka-entity-replication-rollback-tool-cassandra",
fork in Test := true,
parallelExecution in Test := false,
javaOptions in Test ++= sbtJavaOptions,
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-persistence" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence-query" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence-cassandra" % akkaPersistenceCassandraVersion,
"com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % akkaPersistenceCassandraVersion % Test,
"ch.qos.logback" % "logback-classic" % "1.2.3" % Test,
"org.scalatest" %% "scalatest" % "3.0.9" % Test,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-multi-node-testkit" % akkaVersion % Test,
"org.scalamock" %% "scalamock" % "5.2.0" % Test,
),
inConfig(MultiJvm)(
scalafmtConfigSettings
++ scalafixConfigSettings(MultiJvm)
++ Seq(
scalatestOptions ++= Seq(
"-u",
(baseDirectory.value) + "/target/multi-jvm-test-reports",
),
),
),
// MiMa
mimaPreviousArtifacts := previousStableVersion.value.map(organization.value %% moduleName.value % _).toSet,
mimaReportSignatureProblems := true, // check also generic parameters
)

/**
* This is used to pass specific system properties (mostly from CI environment variables)
* to the forked process by sbt
176 changes: 176 additions & 0 deletions docs/rollback_guide.md
@@ -0,0 +1,176 @@
# Rollback Guide

## Prerequisites

Rollback requires the following information:

* The ID of the Raft shard to roll back
* Target UNIX timestamp: the rollback tool will roll back the Raft shard to this timestamp (called the rollback point
below). Use the leader's timestamp if the clocks of the cluster nodes might be out of sync.
* The leader's Raft role at the rollback point: you can determine the leader by searching INFO logs.
See [Determine the leader by searching INFO logs](#determine-the-leader-by-searching-info-logs) for details.

The clock synchronization gap between any two nodes must be less than a configured tolerance
(`clock-out-of-sync-tolerance`), which defaults to 10 seconds. To change this tolerance, see
[Configure the rollback tool](#configure-the-rollback-tool) for details.
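
As a rough illustration of this prerequisite, the sketch below compares observed clock offsets against the tolerance. The `maxClockGap` helper and the sample offsets are hypothetical and are not part of the rollback tool; only the 10-second default comes from this guide.

```scala
import java.time.Duration

// Hypothetical helper (not part of the rollback tool): the largest gap between
// any two nodes, given each node's clock offset from a common reference clock.
def maxClockGap(offsets: Seq[Duration]): Duration =
  if (offsets.isEmpty) Duration.ZERO
  else {
    val millis = offsets.map(_.toMillis)
    Duration.ofMillis(millis.max - millis.min)
  }

// Default tolerance described in this guide (clock-out-of-sync-tolerance = 10s).
val tolerance: Duration = Duration.ofSeconds(10)

// Made-up sample offsets for three nodes.
val offsets: Seq[Duration] =
  Seq(Duration.ofMillis(-1200), Duration.ofMillis(300), Duration.ofMillis(2500))

val gap: Duration = maxClockGap(offsets)

// The gap must be strictly less than the tolerance.
val withinTolerance: Boolean = gap.compareTo(tolerance) < 0
```

If the measured gap is not within the tolerance, raising `clock-out-of-sync-tolerance` is an option, at the cost of more persistence operations during the rollback.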

## Rollback Procedures

### 1. Disable the target Raft shard

To ensure that the target Raft shard stops during the rollback, deploy a new configuration to disable the shard.
Use `ClusterReplicationSettings.withDisabledShards` to disable the specific shards.

For example, to disable Raft shard `1` use the following code:

```scala
import akka.actor.typed.ActorSystem
import lerna.akka.entityreplication.typed._

val system: ActorSystem[_] = ???
val clusterReplication = ClusterReplication(system)

val settings =
ClusterReplicationSettings(system)
.withDisabledShards(Set("1"))

val entity = ReplicatedEntity(???)(???).withSettings(settings)
clusterReplication.init(entity)
```

### 2. Execute rollback for the target shard

`CassandraRaftShardRollback` rolls back a given Raft shard to a given UNIX timestamp, as shown below:

```scala
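import akka.Done
import akka.actor.typed.ActorSystem
import com.typesafe.config._
import java.time.Instant
import lerna.akka.entityreplication.typed._
import lerna.akka.entityreplication.rollback.cassandra._

val system: ActorSystem[_] = ???
// The for-comprehension below needs an implicit ExecutionContext
implicit val ec: scala.concurrent.ExecutionContext = system.executionContext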

val toTimestamp: Instant = ???
val typeName: String = ???
val targetShardId: String = ???
val multiRaftRoles: Set[String] = ClusterReplicationSettings(system).raftSettings.multiRaftRoles
val targetLeaderRaftRole: String = ???

val rollback = CassandraRaftShardRollback(system)
for {
rollbackSetup <- rollback.prepareRollback(
typeName,
targetShardId,
multiRaftRoles,
targetLeaderRaftRole,
toTimestamp,
)
_ <- ??? // review the setup if needed
_ <- rollback.rollback(rollbackSetup)
} yield Done
```
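
The `toTimestamp` value in the example above is left as `???`. A minimal sketch of deriving it from a UNIX timestamp, assuming the rollback point is known in epoch seconds (the concrete value here is made up for illustration):

```scala
import java.time.Instant

// Hypothetical rollback point, given as UNIX time in seconds.
val rollbackPointEpochSeconds: Long = 1669766400L

// Instant.ofEpochSecond expects seconds; use Instant.ofEpochMilli for milliseconds.
val toTimestamp: Instant = Instant.ofEpochSecond(rollbackPointEpochSeconds)

// The same instant expressed in milliseconds.
val sameInstant: Instant = Instant.ofEpochMilli(rollbackPointEpochSeconds * 1000L)
```

Mixing up seconds and milliseconds yields a rollback point far from the intended one, so it is worth printing the resulting `Instant` and checking it before calling `prepareRollback`.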

Note that after this rollback, some events remain uncommitted (not yet replicated to a majority of nodes). The
procedures described in the following sections commit these events.

### 3. Enable the target Raft shard and the sticky leader

So that the newly elected leader replicates the events that should be committed but are not yet committed, deploy the
following configurations:

* Enable the sticky leader for the target Raft shard. Use `ClusterReplicationSettings.withStickyLeaders` to enable the
sticky leader for the specific Raft shards.
* Enable the target Raft shard. Use `ClusterReplicationSettings.withDisabledShards` with a set that no longer contains
the target Raft shard.

For example, to set Raft role `replica-group-1` as the sticky leader for Raft shard `1` and enable the Raft shard, use
the following code:

```scala
import akka.actor.typed.ActorSystem
import lerna.akka.entityreplication.typed._

val system: ActorSystem[_] = ???
val clusterReplication = ClusterReplication(system)

val settings =
ClusterReplicationSettings(system)
.withDisabledShards(Set.empty)
.withStickyLeaders(Map("1" -> "replica-group-1"))

val entity = ReplicatedEntity(???)(???).withSettings(settings)
clusterReplication.init(entity)
```

After this deployment, the leader at the rollback point is elected again. This election ensures that the leader
replicates the events that should be committed. Note that you must enable the shard only together with the sticky
leader configuration. If you deploy without the sticky leader, the newly elected leader might truncate events that
should be committed, which means some events committed before the rollback will be lost.

### 4. Disable the sticky leader

Once the events that should be committed have been replicated, deploy a new configuration to disable the sticky
leader. Use `ClusterReplicationSettings.withStickyLeaders` to disable the sticky leader for the specific shards.

For example, to disable the sticky leader for Raft shard `1`, use the following code:

```scala
import akka.actor.typed.ActorSystem
import lerna.akka.entityreplication.typed._

val system: ActorSystem[_] = ???
val clusterReplication = ClusterReplication(system)

val settings =
ClusterReplicationSettings(system)
.withStickyLeaders(Map.empty)

val entity = ReplicatedEntity(???)(???).withSettings(settings)
clusterReplication.init(entity)
```
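
The order of the deployments in steps 1, 3, and 4 can be summarized with a small model. `ShardSettings` below is a hypothetical stand-in used only for illustration; it is not the library's `ClusterReplicationSettings`, and the check is a sketch of the rule stated in step 3, not something the library enforces.

```scala
// Hypothetical model for illustration only; the real settings type is
// ClusterReplicationSettings, which is not reproduced here.
final case class ShardSettings(disabledShards: Set[String], stickyLeaders: Map[String, String])

val shardId    = "1"
val leaderRole = "replica-group-1"

// Settings deployed at each step, in order. Step 2 (running the rollback tool)
// happens while the settings from step 1 are still in place.
val steps: Seq[(String, ShardSettings)] = Seq(
  "1. disable shard"          -> ShardSettings(Set(shardId), Map.empty),
  "3. enable + sticky leader" -> ShardSettings(Set.empty, Map(shardId -> leaderRole)),
  "4. disable sticky leader"  -> ShardSettings(Set.empty, Map.empty),
)

// The first deployment that re-enables the shard has to name the sticky leader.
val firstEnabled: Option[ShardSettings] =
  steps.map(_._2).find(s => !s.disabledShards.contains(shardId))
val safe: Boolean =
  firstEnabled.exists(_.stickyLeaders.get(shardId).contains(leaderRole))
```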

## Configure the rollback tool

The rollback tool has several configuration properties. Refer
to [reference.conf](../rollback-tool-cassandra/src/main/resources/reference.conf) for details.

You can also configure the rollback tool programmatically with a custom configuration object, as shown below:

```scala
import akka.actor.typed.ActorSystem
import com.typesafe.config._
import lerna.akka.entityreplication.rollback.cassandra._

val system: ActorSystem[_] = ???

val customConfig: Config = ConfigFactory.parseString(
"""
|dry-run = false
|log-progress-every = 200
|clock-out-of-sync-tolerance = 20s
|read-parallelism = 2
|write-parallelism = 2
|cassandra.raft-persistence-plugin-location = "custom.akka.persistence.cassandra.plugin"
|cassandra.raft-eventsourced-persistence-plugin-location = "custom.akka.persistence.cassandra.plugin"
|""".stripMargin)

val settings = CassandraRaftShardRollbackSettings(system, customConfig)
val rollback = CassandraRaftShardRollback(system, settings)
```

## Determine the leader by searching INFO logs

You can search INFO logs to determine the leader at the given time point.

Suppose that you have the following leader-election logs (for shard `22`):

```
$ zcat application_*.log.gz | grep -iF 'elected' | awk -F'\t' '{ if ($4 ~ /\/22$/) { print $0 } }'
00:19:57.944 INFO lerna.akka.entityreplication.raft.RaftActor akka://System@*.*.*.*:25520/system/sharding/raft-shard-*-replica-group-1/22/22 [Leader] New leader was elected (term: Term(2), lastLogTerm: Term(0), lastLogIndex: 0)
00:30:09.426 INFO lerna.akka.entityreplication.raft.RaftActor akka://System@*.*.*.*:25520/system/sharding/raft-shard-*-replica-group-3/22/22 [Leader] New leader was elected (term: Term(3), lastLogTerm: Term(2), lastLogIndex: 1662)
00:30:15.165 INFO lerna.akka.entityreplication.raft.RaftActor akka://System@*.*.*.*:25520/system/sharding/raft-shard-*-replica-group-3/22/22 [Leader] New leader was elected (term: Term(6), lastLogTerm: Term(3), lastLogIndex: 1663)
00:30:42.244 INFO lerna.akka.entityreplication.raft.RaftActor akka://System@*.*.*.*:25520/system/sharding/raft-shard-*-replica-group-3/22/22 [Leader] New leader was elected (term: Term(10), lastLogTerm: Term(6), lastLogIndex: 1747)
```

The above logs indicate that:

* The leader's Raft role is `replica-group-3` if you want to roll back to `00:30:10.000`.
* The leader's Raft role is `replica-group-1` if you want to roll back to `00:25:00.000`.
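
The same lookup can be sketched in code: the leader at the rollback point is the one elected most recently at or before it. The `Election` type, the regular expression, and both helpers are hypothetical and not part of the rollback tool. The sketch assumes the log format shown above, Raft roles named `replica-group-N`, and that all lines belong to the same day, since the sample logs carry only a time of day.

```scala
import java.time.LocalTime

// Hypothetical, simplified view of one leader-election log line.
final case class Election(time: LocalTime, raftRole: String)

// Assumes the line starts with HH:mm:ss.SSS and that the Raft role appears as
// "-replica-group-N/" in the actor path, as in the sample logs above.
val ElectionLine =
  """^(\d{2}:\d{2}:\d{2}\.\d{3})\s.*-(replica-group-\d+)/.*New leader was elected.*$""".r

def parseElection(line: String): Option[Election] = line match {
  case ElectionLine(time, role) => Some(Election(LocalTime.parse(time), role))
  case _                        => None
}

// Picks the election that happened most recently at or before the rollback point.
def leaderAt(elections: Seq[Election], rollbackPoint: LocalTime): Option[String] =
  elections
    .filter(e => !e.time.isAfter(rollbackPoint))
    .sortBy(_.time.toNanoOfDay)
    .lastOption
    .map(_.raftRole)

// The first two sample lines from above.
val logLines = Seq(
  "00:19:57.944 INFO lerna.akka.entityreplication.raft.RaftActor akka://System@*.*.*.*:25520/system/sharding/raft-shard-*-replica-group-1/22/22 [Leader] New leader was elected (term: Term(2), lastLogTerm: Term(0), lastLogIndex: 0)",
  "00:30:09.426 INFO lerna.akka.entityreplication.raft.RaftActor akka://System@*.*.*.*:25520/system/sharding/raft-shard-*-replica-group-3/22/22 [Leader] New leader was elected (term: Term(3), lastLogTerm: Term(2), lastLogIndex: 1662)",
)

val elections: Seq[Election] = logLines.flatMap(parseElection)
val leaderAt003010: Option[String] = leaderAt(elections, LocalTime.parse("00:30:10.000"))
val leaderAt002500: Option[String] = leaderAt(elections, LocalTime.parse("00:25:00.000"))
```

A result of `None` means no election was logged at or before the rollback point, in which case older logs need to be searched.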
30 changes: 30 additions & 0 deletions rollback-tool-cassandra/src/main/resources/reference.conf
@@ -0,0 +1,30 @@
# Settings for the Rollback tool
lerna.akka.entityreplication.rollback {
# The rollback tool executes no write operations by default (`true`). Instead, it logs messages at the INFO level.
# Use `false` to execute the write operations (deletes, inserts, updates).
# Note that the tool runs read operations even if this value is true.
dry-run = true

# The rollback tool logs its progress once per this number of rollback operations.
# It logs every rollback operation if this setting value is 1.
log-progress-every = 100

# This setting value must be greater than the clock synchronization gap between all Akka nodes.
# If this value is higher, the rollback tool requires more persistence operations.
clock-out-of-sync-tolerance = 10s

# How many read queries are executed in parallel
read-parallelism = 1

# How many write queries are executed in parallel
write-parallelism = 1

# Full configuration path of Akka Persistence Cassandra plugin to use for
# `lerna.akka.entityreplication.raft.persistence`
cassandra.raft-persistence-plugin-location = "akka.persistence.cassandra"

# Full configuration path of Akka Persistence Cassandra plugin to use for
# `lerna.akka.entityreplication.raft.eventsourced.persistence`
cassandra.raft-eventsourced-persistence-plugin-location = "akka.persistence.cassandra"

}
@@ -0,0 +1,44 @@
package akka.persistence.cassandra.lerna

import akka.actor.ActorSystem
import akka.persistence.PersistentRepr
import akka.persistence.cassandra.Extractors
import akka.persistence.cassandra.journal.CassandraJournal
import akka.persistence.cassandra.lerna.Extractor.TaggedPersistentRepr
import akka.persistence.query.TimeBasedUUID
import akka.serialization.{ Serialization, SerializationExtension }
import com.datastax.oss.driver.api.core.cql.Row

import scala.concurrent.{ ExecutionContext, Future }

object Extractor {
final case class TaggedPersistentRepr(repr: PersistentRepr, offset: TimeBasedUUID, tags: Set[String])
}

/** Provides extractor from rows to event envelopes
*
* Since it depends on some internal APIs of Akka Persistence Cassandra, this class is under namespace
* `akka.persistence.cassandra`.
*/
final class Extractor(system: ActorSystem) {

private val eventDeserializer: CassandraJournal.EventDeserializer =
new CassandraJournal.EventDeserializer(system)
private val serialization: Serialization =
SerializationExtension(system)
private val extractor: Extractors.Extractor[Extractors.TaggedPersistentRepr] =
Extractors.taggedPersistentRepr(eventDeserializer, serialization)

/** Returns `TaggedPersistentRepr` extracted from the given row
*
* If `async` is true, internal deserialization is executed asynchronously.
*/
def extract(row: Row, async: Boolean)(implicit executionContext: ExecutionContext): Future[TaggedPersistentRepr] = {
extractor
.extract(row, async)
.map { repr =>
TaggedPersistentRepr(repr.pr, TimeBasedUUID(repr.offset), repr.tags)
}
}

}