Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rollback preparation fails if required data have been deleted #210

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `CassandraPersistenceQueries.currentEventsBefore` skips deleted partitions
[#208](https://github.com/lerna-stack/akka-entity-replication/issues/208),
[PR#209](https://github.com/lerna-stack/akka-entity-replication/pull/209)
- Rollback preparation fails if required data have been deleted
[#206](https://github.com/lerna-stack/akka-entity-replication/issues/206),
[PR#210](https://github.com/lerna-stack/akka-entity-replication/pull/210)

### Fixed
- Dead letters about ReplicationRegion$Passivate continue
Expand Down
8 changes: 3 additions & 5 deletions docs/rollback_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ details.
## WARNING

`akka-entity-replication` (v.2.3.0 or later) supports deleting old events and snapshots. Once events or snapshots
are deleted, a rollback to a timestamp that requires such deleted events or snapshots is impossible. At the time of
writing, the rollback tool can't detect such deletions yet. If such a timestamp is specified, the rollback tool will
delete all events and snapshots of the target Raft shard, or persistent actors of the target Raft shard will be
inconsistent state. Please ensure that a rollback is possible by inspecting data stores if either event or snapshot
deletion is enabled.
have been deleted, a rollback to a timestamp that requires such deleted events or snapshots is impossible. The rollback
tool can detect such deletions. If such a timestamp is specified, the rollback tool fails during preparation and doesn't
issue any deletion of data for the rollback.

## Rollback Procedures

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# PersistenceQueries#TaggedEventEnvelope is private:
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.rollback.PersistenceQueries#TaggedEventEnvelope.copy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.akka.entityreplication.rollback.PersistenceQueries#TaggedEventEnvelope.copy$default$5")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.akka.entityreplication.rollback.PersistenceQueries#TaggedEventEnvelope.copy$default$6")
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.rollback.PersistenceQueries#TaggedEventEnvelope.this")
ProblemFilters.exclude[MissingTypesProblem]("lerna.akka.entityreplication.rollback.PersistenceQueries$TaggedEventEnvelope$")
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.rollback.PersistenceQueries#TaggedEventEnvelope.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("lerna.akka.entityreplication.rollback.PersistenceQueries#TaggedEventEnvelope.unapply")

# PersistentActorRollback is private:
ProblemFilters.exclude[ReversedMissingMethodProblem]("lerna.akka.entityreplication.rollback.PersistentActorRollback.findRollbackRequirements")

# RaftEventSourcedPersistence is private:
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.rollback.RaftEventSourcedPersistence.this")

# RaftPersistence is private:
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.rollback.RaftPersistence.this")

# CassandraSnapshotSettings is private:
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.rollback.cassandra.CassandraSnapshotSettings.this")
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package lerna.akka.entityreplication.rollback

import akka.Done
import akka.actor.{ ActorSystem, ClassicActorSystemProvider }

import java.time.Instant
import scala.concurrent.Future

/** @inheritdoc */
private final class DefaultRollbackRequirementsVerifier(
systemProvider: ClassicActorSystemProvider,
rollback: PersistentActorRollback,
timestampHintFinder: RollbackTimestampHintFinder,
) extends RollbackRequirementsVerifier {

private implicit val system: ActorSystem =
systemProvider.classicSystem

import system.dispatcher

/** @inheritdoc */
override def verify(
persistenceId: String,
toSequenceNr: Option[SequenceNr],
toTimestampHintOpt: Option[Instant],
): Future[Done] = {
rollback.findRollbackRequirements(persistenceId).flatMap { requirements =>
assert(requirements.persistenceId == persistenceId)
toSequenceNr match {
case Some(sequenceNr) =>
if (sequenceNr >= requirements.lowestSequenceNr) {
Future.successful(Done)
} else {
tryFindRollbackTimestampHint(requirements, toTimestampHintOpt)
.flatMap { rollbackTimestampHintOpt =>
Future.failed(
new RollbackRequirementsNotFulfilled(
s"Rollback to sequence number [${sequenceNr.value}] for the persistent actor [${persistenceId}] is impossible" +
s" since the sequence number should be greater than or equal to [${requirements.lowestSequenceNr.value}]." +
rollbackTimestampHintOpt.fold("")(" " + _),
),
)
}
}
case None =>
if (requirements.lowestSequenceNr == SequenceNr(1)) {
Future.successful(Done)
} else {
tryFindRollbackTimestampHint(requirements, toTimestampHintOpt)
.flatMap { rollbackTimestampHintOpt =>
Future.failed(
new RollbackRequirementsNotFulfilled(
s"Deleting all data for the persistent actor [${persistenceId}] is impossible" +
" since already deleted events might contain required data for consistency with other persistent actors." +
rollbackTimestampHintOpt.fold("")(" " + _),
),
)
}
}
}
}
}

private def tryFindRollbackTimestampHint(
requirements: PersistentActorRollback.RollbackRequirements,
toTimestampHintOpt: Option[Instant],
): Future[Option[String]] = {
toTimestampHintOpt match {
case Some(toTimestamp) =>
timestampHintFinder
.findTimestampHint(requirements)
.transform { hintOrError =>
hintOrError
.map { hint =>
s"Hint: rollback timestamp [$toTimestamp] should be newer than timestamp [${hint.timestamp}] of sequence number [${hint.sequenceNr.value}], at least."
}.recover { cause =>
s"Hint: not available due to [${cause.getClass.getCanonicalName}]: ${cause.getMessage}]."
}
.map(Option.apply)
}
case None =>
// Not needed for non-timestamp-based rollback.
Future.successful(None)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package lerna.akka.entityreplication.rollback
import akka.actor.{ ActorSystem, ClassicActorSystemProvider }
import akka.stream.scaladsl.Sink

import java.time.Instant
import scala.concurrent.Future

/** @inheritdoc */
private final class LinearRollbackTimestampHintFinder(
systemProvider: ClassicActorSystemProvider,
queries: PersistenceQueries,
) extends RollbackTimestampHintFinder {

private implicit val system: ActorSystem =
systemProvider.classicSystem

import system.dispatcher

/** @inheritdoc */
override def findTimestampHint(
requirements: PersistentActorRollback.RollbackRequirements,
): Future[RollbackTimestampHintFinder.TimestampHint] = {
// NOTE: In most cases, an event with `lowestSequenceNr` or `lowestSequenceNr+1` exists.
// TODO Search a timestamp hint from snapshots since the persistent actor can delete the event with `lowestSequenceNr`.
queries
.currentEventsAfter(requirements.persistenceId, requirements.lowestSequenceNr)
.runWith(Sink.headOption)
.flatMap {
case Some(hintEvent) =>
val hint = RollbackTimestampHintFinder.TimestampHint(
requirements.persistenceId,
hintEvent.sequenceNr,
Instant.ofEpochMilli(hintEvent.timestamp),
)
Future.successful(hint)
case None =>
Future.failed(
new RollbackTimestampHintNotFound(
s"no events of persistenceId=[${requirements.persistenceId}] with a sequence number" +
s" greater than or equal to lowestSequenceNr=[${requirements.lowestSequenceNr.value}]",
),
)
}
}

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package lerna.akka.entityreplication.rollback

import akka.actor.{ ActorSystem, ClassicActorSystemProvider }
import akka.persistence.query.TimeBasedUUID
import akka.stream.scaladsl.{ Sink, Source }
import com.datastax.oss.driver.api.core.uuid.Uuids
import lerna.akka.entityreplication.rollback.PersistenceQueries.TaggedEventEnvelope

import java.time.Instant
Expand Down Expand Up @@ -34,15 +32,7 @@ private final class LinearSequenceNrSearchStrategy(
highestSequenceNr.fold(Source.empty[TaggedEventEnvelope])(queries.currentEventsBefore(persistenceId, _))
})
currentEventsBefore
.dropWhile { envelope =>
envelope.offset match {
case TimeBasedUUID(uuid) =>
val eventTimestampInMillis = Uuids.unixTimestamp(uuid)
eventTimestampInMillis > targetTimestampMillis
case _ =>
throw new IllegalStateException("event offset should always be TimeBasedUUID")
}
}
.dropWhile(_.timestamp > targetTimestampMillis)
.map(_.sequenceNr)
.runWith(Sink.headOption)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ private object PersistenceQueries {
sequenceNr: SequenceNr,
event: Any,
offset: Offset,
timestamp: Long,
tags: Set[String],
writerUuid: String,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,23 @@ import scala.concurrent.Future
* provide such compensation, tool users might have to conduct such compensation.
*/
private trait PersistentActorRollback {
import PersistentActorRollback._

/** Returns `true` if this rollback is running in dry-run mode, `false` otherwise */
def isDryRun: Boolean

/** Returns [[PersistenceQueries]] this rollback uses */
def persistenceQueries: PersistenceQueries

/** Finds rollback requirements for the persistent actor
*
* If any rollback is impossible, this method returns a failed `Future` containing a [[RollbackRequirementsNotFound]].
*/
def findRollbackRequirements(persistenceId: String): Future[RollbackRequirements]

/** Rolls back the persistent actor to the given sequence number
*
* This method doesn't verify that the rollback is actually possible. Use [[findRollbackRequirements]] to confirm that.
*
* Since restrictions depends on concrete implementations, see documents of concrete implementation to use.
*/
Expand All @@ -31,3 +40,16 @@ private trait PersistentActorRollback {
def deleteAll(persistenceId: String): Future[Done]

}

private object PersistentActorRollback {

/** Rollback requirements for the persistent actor with `persistenceId`
*
* The persistent actor can be rolled back to a sequence number greater than or equal to `lowestSequenceNr`.
*/
final case class RollbackRequirements(
persistenceId: String,
lowestSequenceNr: SequenceNr,
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ package lerna.akka.entityreplication.rollback
/** Persistence operations for persistence plugin `lerna.akka.entityreplication.raft.eventsourced.persistence` */
private class RaftEventSourcedPersistence(
val persistentActorRollback: PersistentActorRollback,
val requirementsVerifier: RollbackRequirementsVerifier,
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ private class RaftPersistence(
val persistentActorRollback: PersistentActorRollback,
val raftShardPersistenceQueries: RaftShardPersistenceQueries,
val sequenceNrSearchStrategy: SequenceNrSearchStrategy,
val requirementsVerifier: RollbackRequirementsVerifier,
)
Loading