-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #210 from lerna-stack/rollback-preparation-fails-i…
…f-required-data-have-been-deleted Rollback preparation fails if required data have been deleted
- Loading branch information
Showing
38 changed files
with
2,656 additions
and
503 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
20 changes: 20 additions & 0 deletions
20
...0.backwards.excludes/pr-210-rollback-preparation-fails-if-required-data-have-been-deleted
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
# PersistenceQueries#TaggedEventEnvelope is private: | ||
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.rollback.PersistenceQueries#TaggedEventEnvelope.copy") | ||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.akka.entityreplication.rollback.PersistenceQueries#TaggedEventEnvelope.copy$default$5") | ||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.akka.entityreplication.rollback.PersistenceQueries#TaggedEventEnvelope.copy$default$6") | ||
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.rollback.PersistenceQueries#TaggedEventEnvelope.this") | ||
ProblemFilters.exclude[MissingTypesProblem]("lerna.akka.entityreplication.rollback.PersistenceQueries$TaggedEventEnvelope$") | ||
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.rollback.PersistenceQueries#TaggedEventEnvelope.apply") | ||
ProblemFilters.exclude[IncompatibleSignatureProblem]("lerna.akka.entityreplication.rollback.PersistenceQueries#TaggedEventEnvelope.unapply") | ||
|
||
# PersistentActorRollback is private: | ||
ProblemFilters.exclude[ReversedMissingMethodProblem]("lerna.akka.entityreplication.rollback.PersistentActorRollback.findRollbackRequirements") | ||
|
||
# RaftEventSourcedPersistence is private: | ||
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.rollback.RaftEventSourcedPersistence.this") | ||
|
||
# RaftPersistence is private: | ||
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.rollback.RaftPersistence.this") | ||
|
||
# CassandraSnapshotSettings is private: | ||
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.rollback.cassandra.CassandraSnapshotSettings.this") |
87 changes: 87 additions & 0 deletions
87
...ain/scala/lerna/akka/entityreplication/rollback/DefaultRollbackRequirementsVerifier.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
package lerna.akka.entityreplication.rollback | ||
|
||
import akka.Done | ||
import akka.actor.{ ActorSystem, ClassicActorSystemProvider } | ||
|
||
import java.time.Instant | ||
import scala.concurrent.Future | ||
|
||
/** @inheritdoc */ | ||
private final class DefaultRollbackRequirementsVerifier( | ||
systemProvider: ClassicActorSystemProvider, | ||
rollback: PersistentActorRollback, | ||
timestampHintFinder: RollbackTimestampHintFinder, | ||
) extends RollbackRequirementsVerifier { | ||
|
||
private implicit val system: ActorSystem = | ||
systemProvider.classicSystem | ||
|
||
import system.dispatcher | ||
|
||
/** @inheritdoc */ | ||
override def verify( | ||
persistenceId: String, | ||
toSequenceNr: Option[SequenceNr], | ||
toTimestampHintOpt: Option[Instant], | ||
): Future[Done] = { | ||
rollback.findRollbackRequirements(persistenceId).flatMap { requirements => | ||
assert(requirements.persistenceId == persistenceId) | ||
toSequenceNr match { | ||
case Some(sequenceNr) => | ||
if (sequenceNr >= requirements.lowestSequenceNr) { | ||
Future.successful(Done) | ||
} else { | ||
tryFindRollbackTimestampHint(requirements, toTimestampHintOpt) | ||
.flatMap { rollbackTimestampHintOpt => | ||
Future.failed( | ||
new RollbackRequirementsNotFulfilled( | ||
s"Rollback to sequence number [${sequenceNr.value}] for the persistent actor [${persistenceId}] is impossible" + | ||
s" since the sequence number should be greater than or equal to [${requirements.lowestSequenceNr.value}]." + | ||
rollbackTimestampHintOpt.fold("")(" " + _), | ||
), | ||
) | ||
} | ||
} | ||
case None => | ||
if (requirements.lowestSequenceNr == SequenceNr(1)) { | ||
Future.successful(Done) | ||
} else { | ||
tryFindRollbackTimestampHint(requirements, toTimestampHintOpt) | ||
.flatMap { rollbackTimestampHintOpt => | ||
Future.failed( | ||
new RollbackRequirementsNotFulfilled( | ||
s"Deleting all data for the persistent actor [${persistenceId}] is impossible" + | ||
" since already deleted events might contain required data for consistency with other persistent actors." + | ||
rollbackTimestampHintOpt.fold("")(" " + _), | ||
), | ||
) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
private def tryFindRollbackTimestampHint( | ||
requirements: PersistentActorRollback.RollbackRequirements, | ||
toTimestampHintOpt: Option[Instant], | ||
): Future[Option[String]] = { | ||
toTimestampHintOpt match { | ||
case Some(toTimestamp) => | ||
timestampHintFinder | ||
.findTimestampHint(requirements) | ||
.transform { hintOrError => | ||
hintOrError | ||
.map { hint => | ||
s"Hint: rollback timestamp [$toTimestamp] should be newer than timestamp [${hint.timestamp}] of sequence number [${hint.sequenceNr.value}], at least." | ||
}.recover { cause => | ||
s"Hint: not available due to [${cause.getClass.getCanonicalName}]: ${cause.getMessage}]." | ||
} | ||
.map(Option.apply) | ||
} | ||
case None => | ||
// Not needed for non-timestamp-based rollback. | ||
Future.successful(None) | ||
} | ||
} | ||
|
||
} |
46 changes: 46 additions & 0 deletions
46
.../main/scala/lerna/akka/entityreplication/rollback/LinearRollbackTimestampHintFinder.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package lerna.akka.entityreplication.rollback | ||
import akka.actor.{ ActorSystem, ClassicActorSystemProvider } | ||
import akka.stream.scaladsl.Sink | ||
|
||
import java.time.Instant | ||
import scala.concurrent.Future | ||
|
||
/** @inheritdoc */ | ||
private final class LinearRollbackTimestampHintFinder( | ||
systemProvider: ClassicActorSystemProvider, | ||
queries: PersistenceQueries, | ||
) extends RollbackTimestampHintFinder { | ||
|
||
private implicit val system: ActorSystem = | ||
systemProvider.classicSystem | ||
|
||
import system.dispatcher | ||
|
||
/** @inheritdoc */ | ||
override def findTimestampHint( | ||
requirements: PersistentActorRollback.RollbackRequirements, | ||
): Future[RollbackTimestampHintFinder.TimestampHint] = { | ||
// NOTE: In most cases, an event with `lowestSequenceNr` or `lowestSequenceNr+1` exists. | ||
// TODO Search a timestamp hint from snapshots since the persistent actor can delete the event with `lowestSequenceNr`. | ||
queries | ||
.currentEventsAfter(requirements.persistenceId, requirements.lowestSequenceNr) | ||
.runWith(Sink.headOption) | ||
.flatMap { | ||
case Some(hintEvent) => | ||
val hint = RollbackTimestampHintFinder.TimestampHint( | ||
requirements.persistenceId, | ||
hintEvent.sequenceNr, | ||
Instant.ofEpochMilli(hintEvent.timestamp), | ||
) | ||
Future.successful(hint) | ||
case None => | ||
Future.failed( | ||
new RollbackTimestampHintNotFound( | ||
s"no events of persistenceId=[${requirements.persistenceId}] with a sequence number" + | ||
s" greater than or equal to lowestSequenceNr=[${requirements.lowestSequenceNr.value}]", | ||
), | ||
) | ||
} | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.