CassandraPersistenceQueries should find the highest sequence number even if deleted partition exists #207

Closed
xirc opened this issue May 17, 2023 · 0 comments · Fixed by #209
xirc commented May 17, 2023

For #201

CassandraPersistenceQueries doesn't yet support deleted partitions, that is, partitions left empty because all of their events have been deleted. It can handle a single empty partition but not two or more consecutive empty partitions:

  /** Finds the highest sequence number from the given partition or above
    *
    * If there are no events whose partition numbers are greater than or equal to the given partition, this method
    * returns `Future.successful(None)`.
    *
    * Since `akka.persistence.AtomicWrite` can skip at most one entire partition, the partition gap (an empty partition
    * exists between non-empty partitions) could occur. This method can handle such a partition gap.
    *
    * @see [[findHighestPartitionNr]]
    */
  def findHighestSequenceNr(
      persistenceId: String,
      from: PartitionNr,
  ): Future[Option[SequenceNr]] = {
    val allowableConsecutiveEmptyPartitionCount: Int = 1
    def find(
        currentPartitionNr: PartitionNr,
        lastHighestSequenceNr: Option[SequenceNr],
        consecutiveEmptyPartitionCount: Int,
    ): Future[Option[SequenceNr]] = {
      selectHighestSequenceNr(persistenceId, currentPartitionNr)
        .flatMap {
          case Some(highestSequenceNr) =>
            assert(highestSequenceNr.value > 0)
            find(currentPartitionNr + 1, Some(highestSequenceNr), 0)
          case None =>
            if (consecutiveEmptyPartitionCount < allowableConsecutiveEmptyPartitionCount) {
              find(currentPartitionNr + 1, lastHighestSequenceNr, consecutiveEmptyPartitionCount + 1)
            } else {
              Future.successful(lastHighestSequenceNr)
            }
        }
    }
    find(from, None, 0)
  }

For example, CassandraPersistenceQueries.findHighestSequenceNr can mistakenly return a Future containing None when the search passes through two or more consecutive empty partitions (partitions on which all events have been deleted).

Suppose that:

  • Partition 0 (sequence numbers: 1 ~ 10): This partition is empty. All events on this partition have been deleted.
  • Partition 1 (sequence numbers: 11 ~ 20): This partition is empty. All events on this partition have been deleted.
  • Partition 2 (sequence numbers: 21 ~ 30): Events with sequence numbers from 24 up to 26 exist. Events with sequence numbers from 21 up to 23 have been deleted.

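CassandraPersistenceQueries.findHighestSequenceNr(persistenceId=???,from=PartitionNr(0)) currently returns a Future containing None, but it should return a Future containing the highest sequence number, 26. The search gives up at partition 1, the second consecutive empty partition, because only one consecutive empty partition is allowed, so it never reaches partition 2.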
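The scenario above can be reproduced with a minimal in-memory sketch. It is not the project's code: the object name `EmptyPartitionGapDemo`, the `journal` map standing in for the Cassandra table, the plain `Long` values replacing `PartitionNr` and `SequenceNr`, and the `allowedEmpty` parameter are all assumptions made for illustration. Only the search logic mirrors the method quoted above.

```scala
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object EmptyPartitionGapDemo {
  // Runs callbacks on the calling thread so the demo is deterministic.
  implicit val sameThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = throw cause
  }

  // Hypothetical stand-in for the journal table: partition number -> highest
  // remaining sequence number. Partitions 0 and 1 are absent (fully deleted).
  val journal: Map[Long, Long] = Map(2L -> 26L)

  def selectHighestSequenceNr(partitionNr: Long): Future[Option[Long]] =
    Future.successful(journal.get(partitionNr))

  // Same search as in the issue, with the allowed gap made a parameter.
  def findHighestSequenceNr(from: Long, allowedEmpty: Int): Option[Long] = {
    def find(current: Long, last: Option[Long], emptyCount: Int): Future[Option[Long]] =
      selectHighestSequenceNr(current).flatMap {
        case Some(highest)                     => find(current + 1, Some(highest), 0)
        case None if emptyCount < allowedEmpty => find(current + 1, last, emptyCount + 1)
        case None                              => Future.successful(last)
      }
    Await.result(find(from, None, 0), 3.seconds)
  }
}
```

With `allowedEmpty = 1`, as in the quoted method, `findHighestSequenceNr(from = 0L, allowedEmpty = 1)` yields `None`, while starting from partition 1 yields `Some(26)`. Raising the allowance only moves the problem, since any number of partitions may have been deleted.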

Because LinearSequenceNrSearchStrategy (https://github.com/lerna-stack/akka-entity-replication/blob/v2.2.0/rollback-tool-cassandra/src/main/scala/lerna/akka/entityreplication/rollback/LinearSequenceNrSearchStrategy.scala) indirectly depends on CassandraPersistenceQueries.findHighestSequenceNr, a rollback might be impossible if two or more consecutive deleted partitions exist.

To address this issue, CassandraPersistenceQueries can fetch the highest deleted sequence number (called deleted_to) from the metadata table (akka.metadata) and then skip deleted partitions.
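One way to picture the skipping step is the sketch below. It is only an illustration and not necessarily how the fix was implemented. The object name `SkipDeletedPartitions` and both function names are hypothetical. The partition size of 10 comes from the example in this issue, and the formula `(sequenceNr - 1) / partitionSize` is an assumption that matches that example (sequence numbers 1 ~ 10 fall into partition 0). Reading `deleted_to` from the metadata table is left out.

```scala
object SkipDeletedPartitions {
  // Assumed mapping from a sequence number to its partition number;
  // consistent with the example above (1 ~ 10 -> 0, 11 ~ 20 -> 1, 21 ~ 30 -> 2).
  def partitionNrOf(sequenceNr: Long, partitionSize: Long): Long =
    (sequenceNr - 1L) / partitionSize

  // First partition that can still contain events: the one holding
  // deletedTo + 1, unless the caller asked to start from a later partition.
  def firstCandidatePartition(from: Long, deletedTo: Long, partitionSize: Long): Long =
    math.max(from, partitionNrOf(deletedTo + 1L, partitionSize))
}
```

In the example above, `deleted_to` is 23, so `firstCandidatePartition(from = 0L, deletedTo = 23L, partitionSize = 10L)` gives partition 2, and the search would start there instead of at the empty partition 0.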

Akka Persistence Cassandra Journal Schema is described as:
