From 9cbe4334634b40b01cdf764453928c62093e4f08 Mon Sep 17 00:00:00 2001 From: AndyChen Date: Thu, 14 Sep 2023 10:45:59 +0800 Subject: [PATCH 1/2] perf: Reducing memory cost while replaying --- .../akka/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala | 2 +- .../jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala b/core/src/main/scala/akka/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala index 1fe61dcd3..ad4089fb2 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala @@ -118,7 +118,7 @@ class JdbcAsyncWriteJournal(config: Config) extends AsyncWriteJournal { journalDao .messagesWithBatch(persistenceId, fromSequenceNr, toSequenceNr, journalConfig.daoConfig.replayBatchSize, None) .take(max) - .mapAsync(1)(reprAndOrdNr => Future.fromTry(reprAndOrdNr)) + .collect { case Success(reprAndOrdNr) => reprAndOrdNr } .runForeach { case (repr, _) => recoveryCallback(repr) } diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala index f1174d64b..65627a6ed 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala @@ -12,7 +12,6 @@ import akka.persistence.jdbc.journal.dao.FlowControl.{ Continue, ContinueDelayed import akka.stream.Materializer import akka.stream.scaladsl.{ Sink, Source } -import scala.collection.immutable.Seq import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.duration.FiniteDuration import scala.util.{ Failure, Success, Try } @@ -67,7 +66,7 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages { akka.pattern.after(delay, scheduler)(retrieveNextBatch()) } } - .mapConcat(identity(_)) + .mapConcat(identity) } } From 1d40958b427c731a5125995f459248cd10c4abe8 Mon Sep 17 00:00:00 2001 From: AndyChen Date: Thu, 14 Sep 2023 19:52:14 +0800 Subject: [PATCH 2/2] fix: rethrow exception on failure --- .../persistence/jdbc/journal/JdbcAsyncWriteJournal.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala b/core/src/main/scala/akka/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala index ad4089fb2..08a6886a7 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala @@ -118,9 +118,10 @@ class JdbcAsyncWriteJournal(config: Config) extends AsyncWriteJournal { journalDao .messagesWithBatch(persistenceId, fromSequenceNr, toSequenceNr, journalConfig.daoConfig.replayBatchSize, None) .take(max) - .collect { case Success(reprAndOrdNr) => reprAndOrdNr } - .runForeach { case (repr, _) => - recoveryCallback(repr) + .runForeach { + case Success((repr, _)) => + recoveryCallback(repr) + case Failure(ex) => throw ex } .map(_ => ())