Skip to content

Commit

Permalink
fix: optimized migrate step
Browse files Browse the repository at this point in the history
  • Loading branch information
Roiocam committed Oct 25, 2023
1 parent a54afb9 commit cfb6e7a
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,9 @@ import akka.persistence.jdbc.query.{

class PostgresScalaEventsByTagMigrationTest
extends EventsByTagMigrationTest("postgres-application.conf")
with PostgresCleaner {

override def alterEventIdToNullable(): Unit =
alterColumn(changeToDialect = "DROP NOT NULL")
}
with PostgresCleaner {}

class MySQLScalaEventByTagMigrationTest extends EventsByTagMigrationTest("mysql-application.conf") with MysqlCleaner {
override def alterEventIdToNullable(): Unit =
alterColumn(alterDialect = "MODIFY COLUMN", changeToDialect = "BIGINT UNSIGNED NULL")

override def dropLegacyFKConstraint(): Unit =
dropConstraint(constraintType = "FOREIGN KEY", constraintDialect = "FOREIGN KEY")
Expand All @@ -47,9 +41,6 @@ class OracleScalaEventByTagMigrationTest
alterColumn(alterDialect = "MODIFY", changeToDialect = "NOT NULL")
}

override def alterEventIdToNullable(): Unit =
alterColumn(alterDialect = "MODIFY", changeToDialect = "NULL")

override def dropLegacyFKConstraint(): Unit =
dropConstraint(constraintTableName = "USER_CONSTRAINTS", constraintType = "R")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,16 @@ abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(co
def addNewColumn(): Unit = {}

/**
* drop legacy not compatible inserted rows.
* fill new column for exists rows.
*/
def deleteLegacyRows(): Unit = {
def migrateLegacyRows(): Unit = {
withStatement { stmt =>
stmt.execute(s"""
|DELETE FROM ${tagTableCfg.tableName}
|WHERE ${tagTableCfg.columnNames.persistenceId} IS NULL
|AND ${tagTableCfg.columnNames.sequenceNumber} IS NULL
|UPDATE ${tagTableCfg.tableName}
|SET ${tagTableCfg.columnNames.persistenceId} = ${journalTableName}.${journalTableCfg.columnNames.persistenceId},
| ${tagTableCfg.columnNames.sequenceNumber} = ${journalTableName}.${journalTableCfg.columnNames.sequenceNumber}
|FROM ${journalTableName}
|WHERE ${tagTableCfg.tableName}.${tagTableCfg.columnNames.eventId} = ${journalTableName}.${journalTableCfg.columnNames.ordering}
|""".stripMargin)
}
}
Expand Down Expand Up @@ -142,12 +144,6 @@ abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(co
def addNewFKConstraint(): Unit =
addFKConstraint()

/**
* alter the event_id to nullable, so we can skip the InsertAndReturn.
*/
def alterEventIdToNullable(): Unit =
alterColumn()

// override this, so we can reset the value.
def withRollingUpdateActorSystem(f: ActorSystem => Unit): Unit = {
val legacyTagKeyConfig = legacyTagKeyConfigOverride.foldLeft(ConfigFactory.load(config)) {
Expand All @@ -161,9 +157,8 @@ abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(co
}

it should "migrate event tag to new way" in {
// 1. Mock legacy data on here, but actually using redundant write and read.
// 1. Mock legacy tag column on here, but actually using new tag write.
withRollingUpdateActorSystem { implicit system =>
pendingIfOracleWithLegacy()

val journalOps = new ScalaJdbcReadJournalOperations(system)
withTestActors(replyToMessages = true) { (actor1, actor2, actor3) =>
Expand All @@ -181,10 +176,11 @@ abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(co
}(system)
}

// Assume that the user has completed the addition of the new column, then we don't need to maintain
// Assume that the user could alter table for the addition of the new column manually, then we don't need to maintain
// the legacy table schema creation.
if (newDao) {
addNewColumn();
migrateLegacyRows();
}

// 2. write and read redundancy
Expand All @@ -208,19 +204,16 @@ abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(co
}(system)
}

// 3. Delete the rows inserted in the old way and alter the event_id to nullable so that we can migrate to the read and write from the new PK.
// 3. Migrate the old constraints so that we can change read and write from the new PK.
if (newDao) {
deleteLegacyRows();
dropLegacyFKConstraint();
dropLegacyPKConstraint()
addNewPKConstraint()
addNewFKConstraint()
alterEventIdToNullable();
}

// 4. check the migration completed.
withActorSystem { implicit system =>
pendingIfOracleWithLegacy()

val journalOps = new ScalaJdbcReadJournalOperations(system)
withTestActors(replyToMessages = true) { (actor1, actor2, actor3) =>
Expand Down

0 comments on commit cfb6e7a

Please sign in to comment.