Merge pull request #97 from SwissBorg/extract-metadata-journal
Extract journal event metadata into a separate 'metadata' column
mkubala authored Sep 14, 2020
2 parents b50d2fc + 733e218 commit 1aeb333
Showing 16 changed files with 175 additions and 33 deletions.
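
Each journal row now stores, next to the serialized event payload, a jsonb document describing how that payload was serialized. Based on the Metadata codec introduced in ByteArrayJournalSerializer below, a stored value is expected to look roughly like this (the field values are illustrative, not taken from the commit):

{
  "serId": 1,
  "serManifest": "",
  "eventManifest": "",
  "writerUuid": "a2b6e2cb-41c1-4d08-9d4e-5fe348e0bfc8",
  "timestamp": 1600072800000
}
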
1 change: 1 addition & 0 deletions core/src/main/resources/reference.conf
@@ -122,6 +122,7 @@ postgres-journal {
created = "created"
tags = "tags"
message = "message"
metadata = "metadata"
}
}

@@ -29,6 +29,7 @@ class JournalTableColumnNames(config: Config) {
val created: String = cfg.as[String]("created", "created")
val tags: String = cfg.as[String]("tags", "tags")
val message: String = cfg.as[String]("message", "message")
val metadata: String = cfg.as[String]("metadata", "metadata")
override def toString: String = s"JournalTableColumnNames($persistenceId,$sequenceNumber,$created,$tags,$message)"
}

@@ -12,7 +12,11 @@ trait ExtendedPostgresProfile
with PgHStoreSupport
with PgSearchSupport
with PgNetSupport
with PgLTreeSupport {
with PgLTreeSupport
with PgJsonSupport
with array.PgArrayJdbcTypes
with PgCirceJsonSupport {

def pgjson = "jsonb" // jsonb support is in postgres 9.4.0 onward; for 9.3.x use "json"

// Add back `capabilities.insertOrUpdate` to enable native `upsert` support; for postgres 9.5+
@@ -32,6 +36,9 @@ trait ExtendedPostgresProfile
with HStoreImplicits
with SearchImplicits
with SearchAssistants
with JsonImplicits {
implicit val strListTypeMapper = new SimpleArrayJdbcType[String]("text").to(_.toList)
}
}

object ExtendedPostgresProfile extends ExtendedPostgresProfile
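
The circe-based JSON support added above is what later lets the journal tables declare a Json column directly. A minimal sketch of how a table definition can use it once the extended profile's api is imported (the table, class and column names here are made up for illustration and are not part of the commit):

import akka.persistence.postgres.db.ExtendedPostgresProfile.api._
import io.circe.Json

class MetadataDemoTable(tag: Tag) extends Table[(Long, Json)](tag, "metadata_demo") {
  // With JsonImplicits mixed into the profile's API, circe's Json maps to a jsonb column.
  def id = column[Long]("id", O.AutoInc, O.PrimaryKey)
  def doc = column[Json]("doc")
  def * = (id, doc)
}
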
@@ -10,17 +10,17 @@ import akka.actor.Scheduler
import akka.persistence.postgres.config.JournalConfig
import akka.persistence.postgres.serialization.FlowPersistentReprSerializer
import akka.persistence.postgres.tag.TagIdResolver
import akka.persistence.{AtomicWrite, PersistentRepr}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
import akka.{Done, NotUsed}
import org.slf4j.{Logger, LoggerFactory}
import akka.persistence.{ AtomicWrite, PersistentRepr }
import akka.stream.scaladsl.{ Keep, Sink, Source }
import akka.stream.{ Materializer, OverflowStrategy, QueueOfferResult }
import akka.{ Done, NotUsed }
import org.slf4j.{ Logger, LoggerFactory }
import slick.jdbc.JdbcBackend._

import scala.collection.immutable._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.util.{ Failure, Success, Try }

/**
* The DefaultJournalDao contains all the knowledge to persist and load serialized journal entries
@@ -129,7 +129,7 @@ trait BaseByteArrayJournalDao extends JournalDaoWithUpdates with BaseJournalDaoW
def update(persistenceId: String, sequenceNr: Long, payload: AnyRef): Future[Done] = {
val write = PersistentRepr(payload, sequenceNr, persistenceId)
serializer.serialize(write).transformWith {
case Success(t) => db.run(queries.update(persistenceId, sequenceNr, t.message).map(_ => Done))
case Success(t) => db.run(queries.update(persistenceId, sequenceNr, t.message, t.metadata).map(_ => Done))
case Failure(_) =>
throw new IllegalArgumentException(
s"Failed to serialize ${write.getClass} for update of [$persistenceId] @ [$sequenceNr]")
@@ -7,9 +7,11 @@ package akka.persistence.postgres
package journal.dao

import akka.persistence.PersistentRepr
import akka.persistence.postgres.journal.dao.ByteArrayJournalSerializer.Metadata
import akka.persistence.postgres.serialization.FlowPersistentReprSerializer
import akka.persistence.postgres.tag.TagIdResolver
import akka.serialization.Serialization
import akka.serialization.{ Serialization, Serializers }
import io.circe.{ Decoder, Encoder }

import scala.collection.immutable._
import scala.concurrent.{ ExecutionContext, Future }
@@ -20,25 +22,62 @@ class ByteArrayJournalSerializer(serialization: Serialization, tagConverter: Tag
extends FlowPersistentReprSerializer[JournalRow] {

override def serialize(persistentRepr: PersistentRepr, tags: Set[String]): Future[JournalRow] = {
val convertedTagsFut =
import io.circe.syntax._
val convertedTagsFut = {
if (tags.nonEmpty) tagConverter.getOrAssignIdsFor(tags).map(_.values)
else Future.successful(Nil)
val serializedEventFut: Future[Array[Byte]] = Future.fromTry(serialization.serialize(persistentRepr))
}
val payload: AnyRef = persistentRepr.payload.asInstanceOf[AnyRef]
val serializedEventFut: Future[Array[Byte]] = Future.fromTry(serialization.serialize(payload))
for {
serializer <- Future.fromTry(Try(serialization.findSerializerFor(payload)))
convertedTags <- convertedTagsFut
serializedEvent <- serializedEventFut
} yield {
val serId = serializer.identifier
val serManifest = Serializers.manifestFor(serializer, payload)
val meta =
Metadata(serId, serManifest, persistentRepr.manifest, persistentRepr.writerUuid, persistentRepr.timestamp)
JournalRow(
Long.MinValue,
persistentRepr.deleted,
persistentRepr.persistenceId,
persistentRepr.sequenceNr,
serializedEvent,
convertedTags.toList)
convertedTags.toList,
meta.asJson)
}
}

override def deserialize(journalRow: JournalRow): Try[(PersistentRepr, Long)] =
serialization.deserialize(journalRow.message, classOf[PersistentRepr]).map((_, journalRow.ordering))
for {
metadata <- journalRow.metadata.as[Metadata].toTry
e <- serialization.deserialize(journalRow.message, metadata.serId, metadata.serManifest)
} yield {
(
PersistentRepr(
e,
journalRow.sequenceNumber,
journalRow.persistenceId,
metadata.eventManifest,
// not used, marked as deprecated (https://github.com/akka/akka/issues/27278)
deleted = false,
// not used, marked as deprecated (https://github.com/akka/akka/issues/27278)
sender = null,
metadata.writerUuid).withTimestamp(metadata.timestamp),
journalRow.ordering)
}

}

object ByteArrayJournalSerializer {
case class Metadata(serId: Int, serManifest: String, eventManifest: String, writerUuid: String, timestamp: Long)

object Metadata {
implicit val encoder: Encoder[Metadata] =
Encoder.forProduct5("serId", "serManifest", "eventManifest", "writerUuid", "timestamp")(e =>
(e.serId, e.serManifest, e.eventManifest, e.writerUuid, e.timestamp))
implicit val decoder: Decoder[Metadata] =
Decoder.forProduct5("serId", "serManifest", "eventManifest", "writerUuid", "timestamp")(Metadata.apply)
}
}
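
A minimal round-trip sketch for the codecs defined above (the field values are illustrative, not taken from the commit):

import akka.persistence.postgres.journal.dao.ByteArrayJournalSerializer.Metadata
import io.circe.syntax._

val meta = Metadata(
  serId = 1,
  serManifest = "",
  eventManifest = "",
  writerUuid = "a2b6e2cb-41c1-4d08-9d4e-5fe348e0bfc8",
  timestamp = 1600072800000L)
val json = meta.asJson           // the document persisted in the jsonb column
assert(json.as[Metadata] == Right(meta))
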
@@ -6,6 +6,7 @@
package akka.persistence.postgres
package journal.dao

import io.circe.Json
import slick.lifted.TableQuery
import slick.sql.FixedSqlAction

@@ -32,10 +33,11 @@ class JournalQueries(journalTable: TableQuery[JournalTable]) {
def update(
persistenceId: String,
seqNr: Long,
replacement: Array[Byte]): FixedSqlAction[Int, NoStream, Effect.Write] = {
replacement: Array[Byte],
metadata: Json): FixedSqlAction[Int, NoStream, Effect.Write] = {
val baseQuery = journalTable.filter(_.persistenceId === persistenceId).filter(_.sequenceNumber === seqNr)

baseQuery.map(_.message).update(replacement)
baseQuery.map(r => (r.message, r.metadata)).update((replacement, metadata))
}

def markJournalMessagesAsDeleted(persistenceId: String, maxSequenceNr: Long) =
@@ -8,6 +8,7 @@ package journal.dao

import akka.persistence.postgres.config.JournalTableConfiguration
import akka.persistence.postgres.db.ExtendedPostgresProfile.api._
import io.circe.Json

trait JournalTable extends Table[JournalRow] {
def ordering: Rep[Long]
@@ -16,6 +17,7 @@ trait JournalTable extends Table[JournalRow] {
def deleted: Rep[Boolean]
def tags: Rep[List[Int]]
def message: Rep[Array[Byte]]
def metadata: Rep[Json]
}

abstract class BaseJournalTable(_tableTag: Tag, journalTableCfg: JournalTableConfiguration)
@@ -27,7 +29,7 @@ abstract class BaseJournalTable(_tableTag: Tag, journalTableCfg: JournalTableCon

class FlatJournalTable private[dao] (_tableTag: Tag, journalTableCfg: JournalTableConfiguration)
extends BaseJournalTable(_tableTag, journalTableCfg) {
def * = (ordering, deleted, persistenceId, sequenceNumber, message, tags) <> (JournalRow.tupled, JournalRow.unapply)
def * = (ordering, deleted, persistenceId, sequenceNumber, message, tags, metadata) <> (JournalRow.tupled, JournalRow.unapply)

val ordering: Rep[Long] = column[Long](journalTableCfg.columnNames.ordering, O.AutoInc)
val persistenceId: Rep[String] =
@@ -36,6 +38,8 @@ class FlatJournalTable private[dao] (_tableTag: Tag, journalTableCfg: JournalTab
val deleted: Rep[Boolean] = column[Boolean](journalTableCfg.columnNames.deleted, O.Default(false))
val tags: Rep[List[Int]] = column[List[Int]](journalTableCfg.columnNames.tags)
val message: Rep[Array[Byte]] = column[Array[Byte]](journalTableCfg.columnNames.message)
val metadata: Rep[Json] = column[Json](journalTableCfg.columnNames.metadata)

val pk = primaryKey(s"${tableName}_pk", (persistenceId, sequenceNumber))
val orderingIdx = index(s"${tableName}_ordering_idx", ordering, unique = true)
val tagsIdx = index(s"${tableName}_tags_idx", tags)
@@ -48,7 +52,7 @@ object FlatJournalTable {

class PartitionedJournalTable private (_tableTag: Tag, journalTableCfg: JournalTableConfiguration)
extends BaseJournalTable(_tableTag, journalTableCfg) {
def * = (ordering, deleted, persistenceId, sequenceNumber, message, tags) <> (JournalRow.tupled, JournalRow.unapply)
def * = (ordering, deleted, persistenceId, sequenceNumber, message, tags, metadata) <> (JournalRow.tupled, JournalRow.unapply)

val ordering: Rep[Long] = column[Long](journalTableCfg.columnNames.ordering)
val persistenceId: Rep[String] =
@@ -57,6 +61,8 @@ class PartitionedJournalTable private (_tableTag: Tag, journalTableCfg: JournalT
val deleted: Rep[Boolean] = column[Boolean](journalTableCfg.columnNames.deleted, O.Default(false))
val tags: Rep[List[Int]] = column[List[Int]](journalTableCfg.columnNames.tags)
val message: Rep[Array[Byte]] = column[Array[Byte]](journalTableCfg.columnNames.message)
val metadata: Rep[Json] = column[Json](journalTableCfg.columnNames.metadata)

val pk = primaryKey(s"${tableName}_pk", (persistenceId, sequenceNumber, ordering))
val tagsIdx = index(s"${tableName}_tags_idx", tags)
}
5 changes: 4 additions & 1 deletion core/src/main/scala/akka/persistence/postgres/package.scala
@@ -5,12 +5,15 @@

package akka.persistence

import io.circe.Json

package object postgres {
final case class JournalRow(
ordering: Long,
deleted: Boolean,
persistenceId: String,
sequenceNumber: Long,
message: Array[Byte],
tags: List[Int])
tags: List[Int],
metadata: Json)
}
@@ -34,6 +34,7 @@ CREATE TABLE IF NOT EXISTS public.journal
persistence_id TEXT NOT NULL,
message BYTEA NOT NULL,
tags int[],
metadata jsonb,
PRIMARY KEY (persistence_id, sequence_number)
) PARTITION BY LIST (persistence_id);

@@ -34,6 +34,7 @@ CREATE TABLE IF NOT EXISTS public.journal
persistence_id TEXT NOT NULL,
message BYTEA NOT NULL,
tags int[],
metadata jsonb,
PRIMARY KEY (persistence_id, sequence_number, ordering)
) PARTITION BY RANGE (ordering);

1 change: 1 addition & 0 deletions core/src/test/resources/schema/postgres/plain-schema.sql
@@ -8,6 +8,7 @@ CREATE TABLE IF NOT EXISTS public.journal
persistence_id TEXT NOT NULL,
message BYTEA NOT NULL,
tags int[],
metadata jsonb,
PRIMARY KEY (persistence_id, sequence_number)
);
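
The bundled schema files only cover fresh installs. For an existing journal table, a hand-written migration along these lines would be needed; this is a sketch assuming the default public.journal table and column names, and existing rows would additionally need their metadata backfilled before the new deserializer can read them:

ALTER TABLE public.journal ADD COLUMN metadata jsonb;
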

@@ -6,17 +6,25 @@
package akka.persistence.postgres
package journal.dao

import java.nio.charset.Charset
import java.time.{ LocalDateTime, ZoneOffset }
import java.util.UUID

import akka.persistence.journal.Tagged
import akka.persistence.postgres.journal.dao.ByteArrayJournalSerializer.Metadata
import akka.persistence.postgres.journal.dao.FakeTagIdResolver.unwanted1
import akka.persistence.postgres.tag.TagIdResolver
import akka.persistence.{ AtomicWrite, PersistentRepr }
import akka.serialization.Serializers
import io.circe.Json
import org.scalatest.EitherValues
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.must.Matchers

import scala.collection.immutable._
import scala.concurrent.Future

class ByteArrayJournalSerializerTest extends SharedActorSystemTestSpec with ScalaFutures {
class ByteArrayJournalSerializerTest extends SharedActorSystemTestSpec with ScalaFutures with EitherValues {
it should "serialize a serializable message and indicate whether or not the serialization succeeded" in {
val serializer = new ByteArrayJournalSerializer(serialization, new FakeTagIdResolver())
val result = serializer.serialize(Seq(AtomicWrite(PersistentRepr("foo"))))
@@ -50,14 +58,65 @@ class ByteArrayJournalSerializerTest extends SharedActorSystemTestSpec with Scal
result.last.futureValue should not be empty
}

it should "not serialize a serializable message a non-serializable tag(s)" in {
it should "not serialize a serializable message tagged with a non-serializable tag(s)" in {
val failingEventTagConverter =
new FakeTagIdResolver(getOrAssignIdsForF = _ => Future.failed(new RuntimeException("Fake exception")))
val serializer = new ByteArrayJournalSerializer(serialization, failingEventTagConverter)
val result = serializer.serialize(Seq(AtomicWrite(PersistentRepr(Tagged("foo", Set("bar", "baz"))))))
result should have size 1
result.head.failed.futureValue shouldBe a[Throwable]
}

it should "serialize metadata" in {
val serializer = new ByteArrayJournalSerializer(serialization, new FakeTagIdResolver())
val payload = "foo"
val repr = PersistentRepr(payload)
val result = serializer.serialize(Seq(AtomicWrite(repr)))
val serializedRows = result.head.futureValue
serializedRows should have size 1

val meta = serializedRows.head.metadata.as[Metadata].right.value
val payloadSer = serialization.serializerFor(payload.getClass)
meta should equal {
Metadata(
serId = payloadSer.identifier,
serManifest = Serializers.manifestFor(payloadSer, payload),
eventManifest = repr.manifest,
writerUuid = repr.writerUuid,
timestamp = repr.timestamp)
}
}

it should "deserialize metadata" in {
val serializer = new ByteArrayJournalSerializer(serialization, new FakeTagIdResolver())

val payload = "foo"
val payloadSer = serialization.serializerFor(payload.getClass)
val serId = payloadSer.identifier
val serManifest = Serializers.manifestFor(payloadSer, payload)
val eventManifest = "event manifest"
val writerUuid = UUID.randomUUID().toString
val timestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)

val meta = Json.fromFields(
List(
"serId" -> Json.fromLong(serId),
"serManifest" -> Json.fromString(serManifest),
"eventManifest" -> Json.fromString(eventManifest),
"writerUuid" -> Json.fromString(writerUuid),
"timestamp" -> Json.fromLong(timestamp)))
val row = JournalRow(1L, false, "my-7", 2137L, payload.getBytes(Charset.forName("UTF-8")), Nil, meta)

val result = serializer.deserialize(row)

val (repr, ordering) = result.success.value

ordering should equal(1L)
repr should equal {
PersistentRepr(payload, 2137L, "my-7", eventManifest, false, null, writerUuid)
}
}

}

class FakeTagIdResolver(