Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migration tool - journal and snapshot metadata extraction (#91, #96) #100

Merged
merged 17 commits into from
Oct 4, 2020
14 changes: 12 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ lazy val `akka-persistence-postgres` = project
.in(file("."))
.enablePlugins(ScalaUnidocPlugin)
.disablePlugins(MimaPlugin)
.aggregate(core)
.settings(publish / skip := true)
.aggregate(core, migration)
.settings(
publish / skip := true
)

lazy val core = project
.in(file("core"))
Expand All @@ -16,6 +18,14 @@ lazy val core = project
mimaBinaryIssueFilters ++= Seq()
)

lazy val migration = project
.in(file("migration"))
.disablePlugins(MimaPlugin)
.settings(
name := "akka-persistence-postgres-migration",
libraryDependencies ++= Dependencies.Migration)
.dependsOn(core)

TaskKey[Unit]("verifyCodeFmt") := {
scalafmtCheckAll.all(ScopeFilter(inAnyProject)).result.value.toEither.left.foreach { _ =>
throw new MessageOnlyException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ trait ExtendedPostgresProfile

override val api = MyAPI

object MyAPI
trait MyAPI
extends API
with ArrayImplicits
with SimpleArrayPlainImplicits
Expand All @@ -39,6 +39,7 @@ trait ExtendedPostgresProfile
with JsonImplicits {
implicit val strListTypeMapper = new SimpleArrayJdbcType[String]("text").to(_.toList)
}
object MyAPI extends MyAPI
}

object ExtendedPostgresProfile extends ExtendedPostgresProfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@ object SlickDriver {
*/
object SlickDatabase {

/**
* INTERNAL API
*/
@deprecated(message = "Internal API, will be removed in 4.0.0", since = "3.4.0")
def forConfig(config: Config, slickConfiguration: SlickConfiguration): Database = {
database(config, slickConfiguration, "slick.db")
}

/**
* INTERNAL API
*/
Expand Down
1 change: 0 additions & 1 deletion core/src/test/resources/general.conf
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ akka {
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

actor {
// Required until https://github.com/akka/akka/pull/28333 is available
allow-java-serialization = on
debug {
receive = on // log all messages sent to an actor if that actors receive method is a LoggingReceive
Expand Down
21 changes: 21 additions & 0 deletions migration/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8" ?>
<configuration debug="false">

<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>debug</level>
</filter>
<encoder>
<pattern>%date{ISO8601} - %logger -> %-5level[%thread] %logger{0} - %msg%n</pattern>
</encoder>
</appender>

<!--<logger name="akka.persistence.jdbc" level="debug"/>-->

<logger name="com.zaxxer.hikari" level="warn"/>

<root level="INFO">
<appender-ref ref="console"/>
</root>

</configuration>
7 changes: 7 additions & 0 deletions migration/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
akka-persistence-postgres {
migration {
v2 {
batchSize = 500
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package akka.persistence.postgres.migration

import java.io.PrintWriter
import java.sql.{ Connection, DriverManager }

import akka.actor.ActorSystem
import akka.persistence.postgres.db.SlickExtension
import akka.persistence.postgres.migration.v2.V2__Extract_journal_metadata
import akka.stream.{ Materializer, SystemMaterializer }
import com.typesafe.config.Config
import javax.sql.DataSource
import org.flywaydb.core.Flyway
import org.flywaydb.core.api.configuration.FluentConfiguration
import slick.jdbc.JdbcBackend

import scala.concurrent.Future
import scala.util.Try

class AkkaPersistencePostgresMigration private (flyway: Flyway, onComplete: Try[Int] => Unit)(
implicit system: ActorSystem) {

/**
* Perform journal & snapshot store migrations.
*
* @return Future containing a number of successfully applied migrations.
*/
def run: Future[Int] = {
import system.dispatcher
implicit val met: Materializer = SystemMaterializer(system).materializer

val migrationFut = Future {
flyway.baseline()
flyway.migrate()
}

migrationFut.onComplete(onComplete)

migrationFut
}
}

object AkkaPersistencePostgresMigration {

def configure(config: Config): Builder =
Builder(Flyway.configure.table("persistence_migration_log"), config)

case class Builder private (flywayConfig: FluentConfiguration, config: Config) {

def withMigrationLogTableName(tableName: String): Builder =
copy(flywayConfig = flywayConfig.table(tableName))

def withMigrationLogTableSchema(schema: String): Builder =
copy(flywayConfig = flywayConfig.schemas(schema).defaultSchema(schema))

def build(implicit system: ActorSystem): AkkaPersistencePostgresMigration = {
implicit val met: Materializer = SystemMaterializer(system).materializer

val slickDb = SlickExtension(system).database(config.getConfig("postgres-journal"))
val db = slickDb.database

val flyway = flywayConfig
.dataSource(new DatasourceAdapter(db))
.javaMigrations(new V2__Extract_journal_metadata(config, db))
.load()

new AkkaPersistencePostgresMigration(flyway, _ => db.close())
}
}

private[AkkaPersistencePostgresMigration] class DatasourceAdapter(database: JdbcBackend#Database) extends DataSource {
override def getConnection: Connection = database.createSession().conn
override def getConnection(username: String, password: String) = throw new UnsupportedOperationException()
override def unwrap[T](iface: Class[T]): T =
if (iface.isInstance(this)) this.asInstanceOf[T]
else throw new IllegalArgumentException(getClass.getName + " is not a wrapper for " + iface)
override def isWrapperFor(iface: Class[_]): Boolean = iface.isInstance(this)
override def getLogWriter = throw new UnsupportedOperationException()
override def setLogWriter(out: PrintWriter): Unit = throw new UnsupportedOperationException()
override def setLoginTimeout(seconds: Int): Unit = DriverManager.setLoginTimeout(seconds)
override def getLoginTimeout: Int = DriverManager.getLoginTimeout
override def getParentLogger = throw new UnsupportedOperationException()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package akka.persistence.postgres.migration

import java.io.File

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import org.slf4j.{ Logger, LoggerFactory }

object Main {

val log: Logger = LoggerFactory.getLogger(this.getClass)

def main(args: Array[String]): Unit = {
val configPath = args.head
val configFile = new File(configPath)
if (configFile.exists()) {
val config =
ConfigFactory.parseFile(configFile).withFallback(ConfigFactory.defaultReferenceUnresolved()).resolve()

implicit val system: ActorSystem = ActorSystem("migration-tool-AS", config)
import system.dispatcher

val migration = AkkaPersistencePostgresMigration.configure(config).build
migration.run.onComplete(_ => system.terminate())
} else
log.error(s"Cannot load migration config - '$configPath' does not exist. Aborting.")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package akka.persistence.postgres.migration

import akka.persistence.postgres.db.ExtendedPostgresProfile
import io.circe.{ Json, Printer }
import org.flywaydb.core.api.migration.BaseJavaMigration
import org.slf4j.{ Logger, LoggerFactory }
import slick.jdbc.{ GetResult, SetParameter }

abstract class SlickMigration extends BaseJavaMigration with ExtendedPostgresProfile.MyAPI {

lazy val log: Logger = LoggerFactory.getLogger(this.getClass)

implicit val GetIntList: GetResult[List[Int]] = GetResult(_.nextArray[Int]().toList)
implicit val GetByteArr: GetResult[Array[Byte]] = GetResult(_.nextBytes())
implicit val SetByteArr: SetParameter[Array[Byte]] = SetParameter((arr, pp) => pp.setBytes(arr))
implicit val SetJson: SetParameter[Json] = SetParameter((json, pp) => pp.setString(json.printWith(Printer.noSpaces)))

}
Loading