Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: verify there are no missing migrations before migrating #89

Merged
merged 1 commit into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 38 additions & 43 deletions storage/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,57 +41,25 @@ func getSchemaVersions(ctx context.Context, db *pg.DB) (int, int, error) {
return 0, 0, xerrors.Errorf("unable to determine schema version: %w", err)
}

// Current desired schema version is based on the highest migration version
var desiredVersion int64
latestVersion := getLatestSchemaVersion()
return int(dbVersion), latestVersion, nil
}

// Latest schema version is based on the highest migration version
func getLatestSchemaVersion() int {
var latestVersion int64
ms := migrations.DefaultCollection.Migrations()
for _, m := range ms {
if m.Version > desiredVersion {
desiredVersion = m.Version
if m.Version > latestVersion {
latestVersion = m.Version
}
}

return int(dbVersion), int(desiredVersion), nil
return int(latestVersion)
}

// MigrateSchema migrates the database schema to the latest version based on the list of migrations available
func (d *Database) MigrateSchema(ctx context.Context) error {
db, err := connect(ctx, d.opt)
if err != nil {
return xerrors.Errorf("connect: %w", err)
}
defer db.Close()

dbVersion, latestVersion, err := getSchemaVersions(ctx, db)
if err != nil {
return xerrors.Errorf("get schema versions: %w", err)
}
log.Infof("current database schema is version %d", dbVersion)

if dbVersion == latestVersion {
log.Info("current database schema is at latest version, no migration needed")
return nil
}

// Acquire an exclusive lock on the schema so we know no other instances are running
if err := SchemaLock.LockExclusive(ctx, db); err != nil {
return xerrors.Errorf("acquiring schema lock: %w", err)
}

// Remember to release the lock
defer func() {
err := SchemaLock.UnlockExclusive(ctx, db)
if err != nil {
log.Errorf("failed to release exclusive lock: %v", err)
}
}()

log.Infof("running schema migration from version %d to version %d", dbVersion, latestVersion)
_, newDBVersion, err := migrations.Run(db, "up")
if err != nil {
return xerrors.Errorf("run migration: %w", err)
}
log.Infof("current database schema is now version %d", newDBVersion)
return nil
return d.MigrateSchemaTo(ctx, getLatestSchemaVersion())
}

// MigrateSchema migrates the database schema to a specific version. Note that downgrading a schema to an earlier
Expand All @@ -117,6 +85,10 @@ func (d *Database) MigrateSchemaTo(ctx context.Context, target int) error {
return xerrors.Errorf("database schema is already at version %d", dbVersion)
}

if err := checkMigrationSequence(ctx, dbVersion, target); err != nil {
return xerrors.Errorf("check migration sequence: %w", err)
}

// Acquire an exclusive lock on the schema so we know no other instances are running
if err := SchemaLock.LockExclusive(ctx, db); err != nil {
return xerrors.Errorf("acquiring schema lock: %w", err)
Expand Down Expand Up @@ -154,3 +126,26 @@ func (d *Database) MigrateSchemaTo(ctx context.Context, target int) error {

return nil
}

func checkMigrationSequence(ctx context.Context, from, to int) error {
versions := map[int64]bool{}
ms := migrations.DefaultCollection.Migrations()
for _, m := range ms {
if versions[m.Version] {
return xerrors.Errorf("duplication migration for schema version %d", m.Version)
}
versions[m.Version] = true
}

if from > to {
to, from = from, to
}

for i := from; i <= to; i++ {
if !versions[int64(i)] {
return xerrors.Errorf("missing migration for schema version %d", i)
}
}

return nil
}
12 changes: 4 additions & 8 deletions storage/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"
"time"

"github.com/go-pg/migrations/v8"
"github.com/go-pg/pg/v10"
"github.com/go-pg/pg/v10/orm"
"github.com/stretchr/testify/assert"
Expand All @@ -19,13 +18,10 @@ import (
"github.com/filecoin-project/sentinel-visor/testutil"
)

func TestNoDuplicateSchemaMigrations(t *testing.T) {
versions := map[int64]bool{}
ms := migrations.DefaultCollection.Migrations()
for _, m := range ms {
require.False(t, versions[m.Version], "Duplication migration for schema version: %d", m.Version)
versions[m.Version] = true
}
func TestConsistentSchemaMigrationSequence(t *testing.T) {
latestVersion := getLatestSchemaVersion()
err := checkMigrationSequence(context.Background(), 1, latestVersion)
require.NoError(t, err)
}

func TestSchemaIsCurrent(t *testing.T) {
Expand Down