Skip to content

Commit

Permalink
support rolling updates akka#710
Browse files Browse the repository at this point in the history
  • Loading branch information
AndyChen authored and Roiocam committed Oct 25, 2023
1 parent 0ed72af commit c075995
Show file tree
Hide file tree
Showing 16 changed files with 130 additions and 21 deletions.
15 changes: 14 additions & 1 deletion core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ jdbc-journal {
}

event_tag {
tableName = "event_tag"
tableName = "event_tags"
schemaName = ""

columnNames {
Expand All @@ -144,6 +144,19 @@ jdbc-journal {
}
}

legacy_event_tag {
tableName = "event_tag"
schemaName = ""

columnNames {
eventId = "event_id"
tag = "tag"
}
}
# for rolling updates the event_tag table migration.
write-legacy-tag = false
read-legacy-tag = false

# Otherwise it would be a pinned dispatcher, see https://github.com/akka/akka/issues/31058
plugin-dispatcher = "akka.actor.default-dispatcher"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE IF NOT EXISTS "event_tag" (
"event_id" BIGINT NOT NULL,
"tag" VARCHAR NOT NULL,
PRIMARY KEY("event_id", "tag"),
CONSTRAINT fk_event_journal
FOREIGN KEY("event_id")
REFERENCES "event_journal"("ordering")
ON DELETE CASCADE
);
2 changes: 1 addition & 1 deletion core/src/main/resources/schema/h2/h2-create-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS "event_journal" (

CREATE UNIQUE INDEX "event_journal_ordering_idx" on "event_journal" ("ordering");

CREATE TABLE IF NOT EXISTS "event_tag" (
CREATE TABLE IF NOT EXISTS "event_tags" (
"persistence_id" VARCHAR(255) NOT NULL,
"sequence_number" BIGINT NOT NULL,
"tag" VARCHAR NOT NULL,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CREATE TABLE IF NOT EXISTS event_tag (
event_id BIGINT UNSIGNED NOT NULL,
tag VARCHAR(255) NOT NULL,
PRIMARY KEY(event_id, tag),
FOREIGN KEY (event_id)
REFERENCES event_journal(ordering)
ON DELETE CASCADE
)
4 changes: 2 additions & 2 deletions core/src/main/resources/schema/mysql/mysql-create-schema.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TABLE IF NOT EXISTS event_journal(
CREATE TABLE IF NOT EXISTS event_journal (
ordering SERIAL,
deleted BOOLEAN DEFAULT false NOT NULL,
persistence_id VARCHAR(255) NOT NULL,
Expand All @@ -16,7 +16,7 @@ CREATE TABLE IF NOT EXISTS event_journal(

CREATE UNIQUE INDEX event_journal_ordering_idx ON event_journal(ordering);

CREATE TABLE IF NOT EXISTS event_tag (
CREATE TABLE IF NOT EXISTS event_tags (
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
tag VARCHAR(255) NOT NULL,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE EVENT_TAG (
EVENT_ID NUMERIC NOT NULL,
TAG VARCHAR(255) NOT NULL,
PRIMARY KEY(EVENT_ID, TAG),
FOREIGN KEY(EVENT_ID) REFERENCES EVENT_JOURNAL(ORDERING)
ON DELETE CASCADE
)
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ CREATE TABLE EVENT_JOURNAL (
CREATE OR REPLACE TRIGGER EVENT_JOURNAL__ORDERING_TRG before insert on EVENT_JOURNAL REFERENCING NEW AS NEW FOR EACH ROW WHEN (new.ORDERING is null) begin select EVENT_JOURNAL__ORDERING_seq.nextval into :new.ORDERING from sys.dual; end;
/

CREATE TABLE EVENT_TAG (
CREATE TABLE EVENT_TAGS (
PERSISTENCE_ID VARCHAR(255) NOT NULL,
SEQUENCE_NUMBER NUMERIC NOT NULL,
TAG VARCHAR(255) NOT NULL,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE IF NOT EXISTS public.event_tag(
event_id BIGINT,
tag VARCHAR(256),
PRIMARY KEY(event_id, tag),
CONSTRAINT fk_event_journal
FOREIGN KEY(event_id)
REFERENCES event_journal(ordering)
ON DELETE CASCADE
);
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TABLE IF NOT EXISTS public.event_journal(
CREATE TABLE IF NOT EXISTS public.event_journal (
ordering BIGSERIAL,
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
Expand All @@ -21,7 +21,7 @@ CREATE TABLE IF NOT EXISTS public.event_journal(

CREATE UNIQUE INDEX event_journal_ordering_idx ON public.event_journal(ordering);

CREATE TABLE IF NOT EXISTS public.event_tag(
CREATE TABLE IF NOT EXISTS public.event_tags (
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
tag VARCHAR(256),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE event_tag (
"event_id" BIGINT NOT NULL,
"tag" NVARCHAR(255) NOT NULL
PRIMARY KEY ("event_id","tag")
constraint "fk_event_journal"
foreign key("event_id")
references "dbo"."event_journal"("ordering")
on delete CASCADE
);
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TABLE event_journal(
CREATE TABLE event_journal (
"ordering" BIGINT IDENTITY(1,1) NOT NULL,
"deleted" BIT DEFAULT 0 NOT NULL,
"persistence_id" NVARCHAR(255) NOT NULL,
Expand All @@ -17,7 +17,7 @@ CREATE TABLE event_journal(

CREATE UNIQUE INDEX event_journal_ordering_idx ON event_journal(ordering);

CREATE TABLE event_tag (
CREATE TABLE event_tags (
"persistence_id" NVARCHAR(255) NOT NULL,
"sequence_number" NUMERIC(10,0) NOT NULL,
"tag" NVARCHAR(255) NOT NULL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ class EventTagTableColumnNames(config: Config) {
val sequenceNumber: String = cfg.getString("sequenceNumber")
val tag: String = cfg.getString("tag")
}
class LegacyEventTagTableColumnNames(config: Config) {
private val cfg = config.getConfig("tables.legacy_event_tag.columnNames")
val eventId: String = cfg.getString("eventId")
val tag: String = cfg.getString("tag")
}

class LegacyJournalTableConfiguration(config: Config) {
private val cfg = config.getConfig("tables.legacy_journal")
Expand All @@ -75,9 +80,15 @@ class EventJournalTableConfiguration(config: Config) {
}
class EventTagTableConfiguration(config: Config) {
private val cfg = config.getConfig("tables.event_tag")
private val legacyCfg = config.getConfig("tables.legacy_event_tag")
val writeLegacyTag: Boolean = config.getBoolean("tables.write-legacy-tag")
val readLegacyTag: Boolean = config.getBoolean("tables.read-legacy-tag")
val tableName: String = cfg.getString("tableName")
val schemaName: Option[String] = cfg.asStringOption("schemaName")
val columnNames: EventTagTableColumnNames = new EventTagTableColumnNames(config)
val legacyTableName: String = legacyCfg.getString("tableName")
val legacySchemaName: Option[String] = legacyCfg.asStringOption("schemaName")
val legacyColumnNames: LegacyEventTagTableColumnNames = new LegacyEventTagTableColumnNames(config)
}
class LegacySnapshotTableColumnNames(config: Config) {
private val cfg = config.getConfig("tables.legacy_snapshot.columnNames")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package akka.persistence.jdbc.journal.dao

import akka.persistence.jdbc.config.{ EventJournalTableConfiguration, EventTagTableConfiguration }
import akka.persistence.jdbc.journal.dao.JournalTables.{ JournalAkkaSerializationRow, TagRow }
import akka.persistence.jdbc.journal.dao.JournalTables.{ JournalAkkaSerializationRow, LegacyTagRow, TagRow }
import slick.jdbc.JdbcProfile

import scala.concurrent.ExecutionContext
Expand All @@ -20,25 +20,45 @@ class JournalQueries(
import profile.api._

private val JournalTableC = Compiled(JournalTable)
private val insertAndReturn = JournalTable.returning(JournalTable.map(_.ordering))
private val TagTableC = Compiled(TagTable)
private val LegacyTagTableC = Compiled(LegacyTagTable)

def writeJournalRows(xs: Seq[(JournalAkkaSerializationRow, Set[String])])(implicit ec: ExecutionContext): DBIOAction[Any, NoStream, Effect.Write] = {
def writeJournalRows(xs: Seq[(JournalAkkaSerializationRow, Set[String])])(
implicit ec: ExecutionContext): DBIOAction[Any, NoStream, Effect.Write] = {
val sorted = xs.sortBy((event => event._1.sequenceNumber))
// not matter what, always insert event.
val events = sorted.map(_._1)
if (sorted.exists(_._2.nonEmpty)) {
// only if there are any tags
val tagsRows = sorted.flatMap(e => e._2.map(v => TagRow(e._1.persistenceId, e._1.sequenceNumber, v)))
for {
_ <- JournalTableC ++= events
_ <- TagTableC ++= tagsRows
} yield ()
writeEventsAndTags(sorted)
} else {
// optimization avoid some work when not using tags
val events = sorted.map(_._1)
JournalTableC ++= events
}
}

private def writeEventsAndTags(sorted: Seq[(JournalAkkaSerializationRow, Set[String])])(
implicit ec: ExecutionContext): DBIOAction[Any, NoStream, Effect.Write] = {
val (events, tags) = sorted.unzip
val tagRows = sorted.flatMap { case (eventRow, tags) =>
tags.map(t => TagRow(eventRow.persistenceId, eventRow.sequenceNumber, t))
}
if (tagTableCfg.writeLegacyTag) {
for {
ids <- insertAndReturn ++= events
tagInserts = ids.zip(tags).flatMap { case (id, tags) => tags.map(tag => LegacyTagRow(id, tag)) }
_ <- LegacyTagTableC ++= tagInserts
_ <- TagTableC ++= tagRows
} yield ()
} else {
// optimization using batch insert
for {
_ <- JournalTableC ++= events
_ <- TagTableC ++= tagRows
} yield ()
}
}

private def selectAllJournalForPersistenceIdDesc(persistenceId: Rep[String]) =
selectAllJournalForPersistenceId(persistenceId).sortBy(_.sequenceNumber.desc)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package akka.persistence.jdbc.journal.dao

import akka.annotation.InternalApi
import akka.persistence.jdbc.config.{ EventJournalTableConfiguration, EventTagTableConfiguration }
import akka.persistence.jdbc.journal.dao.JournalTables.{ JournalAkkaSerializationRow, TagRow }
import akka.persistence.jdbc.journal.dao.JournalTables.{ JournalAkkaSerializationRow, LegacyTagRow, TagRow }

/**
* INTERNAL API
Expand All @@ -30,6 +30,7 @@ object JournalTables {
metaSerManifest: Option[String])

case class TagRow(persistenceId: String, sequenceNumber: Long, tag: String)
case class LegacyTagRow(eventId: Long, tag: String)
}

/**
Expand All @@ -44,7 +45,6 @@ trait JournalTables {

def journalTableCfg: EventJournalTableConfiguration
def tagTableCfg: EventTagTableConfiguration

class JournalEvents(_tableTag: Tag)
extends Table[JournalAkkaSerializationRow](
_tableTag,
Expand Down Expand Up @@ -104,4 +104,17 @@ trait JournalTables {
}

lazy val TagTable = new TableQuery(tag => new EventTags(tag))

class LegacyEventTags(_tableTag: Tag)
extends Table[LegacyTagRow](_tableTag, tagTableCfg.legacySchemaName, tagTableCfg.legacyTableName) {
override def * = (eventId, tag) <> (LegacyTagRow.tupled, LegacyTagRow.unapply)

val eventId: Rep[Long] = column[Long](tagTableCfg.legacyColumnNames.eventId)
val tag: Rep[String] = column[String](tagTableCfg.legacyColumnNames.tag)

val pk = primaryKey(s"${tagTableCfg.legacyTableName}_pk", (eventId, tag))
val journalEvent = foreignKey(s"fk_legacy_${journalTableCfg.tableName}", eventId, JournalTable)(_.ordering)
}

lazy val LegacyTagTable = new TableQuery(tag => new LegacyEventTags(tag))
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
JournalTable.filter(_.deleted === false)

private def baseTableWithTagsQuery() = {
baseTableQuery().join(TagTable).on((e, t) => e.persistenceId === t.persistenceId && e.sequenceNumber === t.sequenceNumber)
if (tagTableCfg.readLegacyTag) {
baseTableQuery().join(LegacyTagTable).on(_.ordering === _.eventId)
} else {
baseTableQuery()
.join(TagTable)
.on((e, t) => e.persistenceId === t.persistenceId && e.sequenceNumber === t.sequenceNumber)
}
}

val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct _)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ private[jdbc] object SchemaUtilsImpl {
def legacy(configKey: String, config: Config): Boolean =
config.getConfig(configKey).getString("dao") != "akka.persistence.jdbc.journal.dao.DefaultJournalDao"

// TODO How to gracefully create and drop legacy tags table.
def eventTagsLegacy(configKey: String, config: Config): Boolean =
config.getConfig(configKey).getBoolean("tables.use-legacy-tag")

/**
* INTERNAL API
*/
Expand Down

0 comments on commit c075995

Please sign in to comment.