Skip to content

Commit

Permalink
Introduce "Lock Strategy" option
Browse files Browse the repository at this point in the history
With this commit, we allow using an in-application lock in ghostferry,
instead of using the source DB as lock. The lock is required to avoid
race conditions between the data iteration/copy and the binlog writer.

The default behavior is preserved; a new option "LockStrategy" allows
moving the lock from the source DB into ghostferry, or disabling the
lock altogether.

This fixes Shopify#169

Change-Id: I20f1d2a189078a3877f831c7a98e8ca956620cc7
  • Loading branch information
Clemens Kolbitsch committed May 3, 2020
1 parent 50a7760 commit ceaba06
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 4 deletions.
17 changes: 17 additions & 0 deletions binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

sql "github.com/Shopify/ghostferry/sqlwrapper"
"sync"

"github.com/sirupsen/logrus"
)
Expand All @@ -16,6 +17,7 @@ type BinlogWriter struct {

BatchSize int
WriteRetries int
LockStrategy string

ErrorHandler ErrorHandler
StateTracker *StateTracker
Expand Down Expand Up @@ -79,6 +81,7 @@ func (b *BinlogWriter) writeEvents(events []DMLEvent) error {
WaitForThrottle(b.Throttler)

queryBuffer := []byte(sql.AnnotateStmt("BEGIN;\n", b.DB.Marginalia))
locksToObtain := make(map[string]*sync.RWMutex)

for _, ev := range events {
eventDatabaseName := ev.Database()
Expand All @@ -98,13 +101,27 @@ func (b *BinlogWriter) writeEvents(events []DMLEvent) error {

queryBuffer = append(queryBuffer, sql.AnnotateStmt(sqlStmt, b.DB.Marginalia)...)
queryBuffer = append(queryBuffer, ";\n"...)

if b.LockStrategy == LockStrategyInGhostferry {
fullTableName := ev.TableSchema().Table.String()
if _, found := locksToObtain[fullTableName]; !found {
locksToObtain[fullTableName] = b.StateTracker.GetTableLock(fullTableName)
}
}
}

queryBuffer = append(queryBuffer, "COMMIT"...)

startEv := events[0]
endEv := events[len(events)-1]
query := string(queryBuffer)

for _, lock := range locksToObtain {
if lock != nil {
lock.Lock()
defer lock.Unlock()
}
}
_, err := b.DB.Exec(query)
if err != nil {
return fmt.Errorf("exec query at pos %v -> %v (%d bytes): %v", startEv.BinlogPosition(), endEv.BinlogPosition(), len(query), err)
Expand Down
25 changes: 25 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ const (
VerifierTypeNoVerification = "NoVerification"

DefaultMarginalia = "application:ghostferry"

LockStrategySourceDB = "LockOnSourceDB"
LockStrategyInGhostferry = "LockInGhostferry"
LockStrategyNone = "None"
)

type TLSConfig struct {
Expand Down Expand Up @@ -412,6 +416,21 @@ type Config struct {
// Optional: defaults to false
AutomaticCutover bool

// This specifies how to prevent races between the data copy and binlog
// streaming. Possible values are:
// - LockOnSourceDB: obtain a table lock on the source table while copying
// data, which will prevent any type of data modification on the source
// DB; this is the strictest method but may interfere with the
// application trying to insert data,
// - LockInGhostferry: obtain a lock in ghostferry, preventing updates to
// the target DB while copying data; this should be sufficient in most
// scenarios, and
// - None: do not perform locking, assume the application does not update
// or delete data in a way that races may occur.
//
// Optional: defaults to "LockOnSourceDB"
LockStrategy string

// This specifies whether or not Ferry.Run will handle SIGINT and SIGTERM
// by dumping the current state to stdout and the error HTTP callback.
// The dumped state can be used to resume Ghostferry.
Expand Down Expand Up @@ -538,6 +557,12 @@ func (c *Config) ValidateConfig() error {
}
}

if c.LockStrategy == "" {
c.LockStrategy = LockStrategySourceDB
} else if c.LockStrategy != LockStrategySourceDB && c.LockStrategy != LockStrategyInGhostferry && c.LockStrategy != LockStrategyNone {
return fmt.Errorf("Invalid LockStrategy specified (set to %s)", c.LockStrategy)
}

if c.DBWriteRetries == 0 {
c.DBWriteRetries = 5
}
Expand Down
24 changes: 22 additions & 2 deletions cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
sql "github.com/Shopify/ghostferry/sqlwrapper"
"strings"
"sync"

"github.com/Masterminds/squirrel"
"github.com/siddontang/go-mysql/schema"
Expand All @@ -18,9 +19,24 @@ type SqlPreparer interface {

// SqlDBWithFakeRollback wraps a *sql.DB and adds a Rollback method, so a plain
// database handle can stand in where a transaction-like value is expected. No
// database transaction is opened. When an in-process lock is attached,
// Rollback releases that lock.
type SqlDBWithFakeRollback struct {
	*sql.DB
	// lock is the optional in-process lock held on behalf of this wrapper.
	// It is nil when no lock was supplied or after Rollback has released it.
	lock *sync.RWMutex
}

// NewSqlDBWithFakeRollback returns a wrapper around db. If lock is non-nil it
// is write-locked before this function returns (blocking until it becomes
// available), and the caller must call Rollback to release it.
func NewSqlDBWithFakeRollback(db *sql.DB, lock *sync.RWMutex) *SqlDBWithFakeRollback {
	tx := &SqlDBWithFakeRollback{
		DB:   db,
		lock: lock,
	}
	if lock != nil {
		lock.Lock()
	}
	return tx
}

// Rollback releases the lock acquired by NewSqlDBWithFakeRollback, if any,
// and always returns nil. It performs no database operation.
//
// Calling Rollback more than once is safe: only the first call releases the
// lock, later calls are no-ops.
func (d *SqlDBWithFakeRollback) Rollback() error {
	if d.lock != nil {
		d.lock.Unlock()
		// Forget the lock so a repeated Rollback cannot unlock an already
		// unlocked mutex, which is a fatal runtime error.
		d.lock = nil
	}
	return nil
}

Expand Down Expand Up @@ -53,9 +69,12 @@ func (c *CursorConfig) NewCursor(table *TableSchema, startPaginationKey, maxPagi
}

// returns a new Cursor with an embedded copy of itself
func (c *CursorConfig) NewCursorWithoutRowLock(table *TableSchema, startPaginationKey, maxPaginationKey uint64) *Cursor {
func (c *CursorConfig) NewCursorWithoutRowLock(table *TableSchema, startPaginationKey, maxPaginationKey uint64, tableLock *sync.RWMutex) *Cursor {
	// Start from a regular cursor and then switch off row locking.
	cur := c.NewCursor(table, startPaginationKey, maxPaginationKey)

	// NOTE: The in-process table lock is only attached to cursors that have
	// row locking disabled, to avoid a potential deadlock. A nil tableLock
	// means no in-process locking is requested.
	cur.tableLock = tableLock
	cur.RowLock = false

	return cur
}

Expand All @@ -68,6 +87,7 @@ type Cursor struct {

paginationKeyColumn *schema.TableColumn
lastSuccessfulPaginationKey uint64
tableLock *sync.RWMutex
logger *logrus.Entry
}

Expand Down Expand Up @@ -101,7 +121,7 @@ func (c *Cursor) Each(f func(*RowBatch) error) error {
return err
}
} else {
tx = &SqlDBWithFakeRollback{c.DB}
tx = NewSqlDBWithFakeRollback(c.DB, c.tableLock)
}

batch, paginationKeypos, err = c.Fetch(tx)
Expand Down
33 changes: 32 additions & 1 deletion data_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type DataIterator struct {
ErrorHandler ErrorHandler
CursorConfig *CursorConfig
StateTracker *StateTracker
LockStrategy string

targetPaginationKeys *sync.Map
batchListeners []func(*RowBatch) error
Expand Down Expand Up @@ -88,7 +89,37 @@ func (d *DataIterator) Run(tables []*TableSchema) {
return
}

cursor := d.CursorConfig.NewCursor(table, startPaginationKey, targetPaginationKeyInterface.(uint64))
// NOTE: Using a lock to synchronize data iteration and binlog writing is
// necessary. It is possible that we read data on the source while the
// binlog receives an update to the same data.
//
// Example event sequence:
// 1) application writes table row version "v1" to the source
// 2) data iterator reads v1
// 3) application updates row v1 to become v2
// 4) binlog reader receives UPDATE command v1 -> v2
// 5) binlog writer executes UPDATE v1 -> v2: this is a NOP due to how the
// writer formats UPDATE statements (v1 does not exist in the target, so
// the UPDATE has no rows to operate on)
// 6) batch writer inserts v1
// Outcome: Source contains v2 while target contains v1.
//
// There are similar events for DELETE statements. INSERT should be safe.
//
// To avoid the problem, we use a lock from steps 2 to 6 to ensure the
// source data is not modified between reading from the source and writing
// the batch to the target.
var cursor *Cursor
if d.LockStrategy == LockStrategySourceDB {
cursor = d.CursorConfig.NewCursor(table, startPaginationKey, targetPaginationKeyInterface.(uint64))
} else {
var tableLock *sync.RWMutex
if d.LockStrategy == LockStrategyInGhostferry {
tableLock = d.StateTracker.GetTableLock(table.Table.String())
}
cursor = d.CursorConfig.NewCursorWithoutRowLock(table, startPaginationKey, targetPaginationKeyInterface.(uint64), tableLock)
}

if d.SelectFingerprint {
if len(cursor.ColumnsToSelect) == 0 {
cursor.ColumnsToSelect = []string{"*"}
Expand Down
1 change: 1 addition & 0 deletions examples/copydb/run-on-replica.conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
},

"RunFerryFromReplica": true,
"LockStrategy": "LockInGhostferry",
"SourceReplicationMaster": {
"Host": "127.0.0.1",
"Port": 29291,
Expand Down
2 changes: 2 additions & 0 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (f *Ferry) NewDataIterator() *DataIterator {
ReadRetries: f.Config.DBReadRetries,
},
StateTracker: f.StateTracker,
LockStrategy: f.Config.LockStrategy,
}

if f.CopyFilter != nil {
Expand Down Expand Up @@ -149,6 +150,7 @@ func (f *Ferry) NewBinlogWriter() *BinlogWriter {

BatchSize: f.Config.BinlogEventBatchSize,
WriteRetries: f.Config.DBWriteRetries,
LockStrategy: f.Config.LockStrategy,

ErrorHandler: f.ErrorHandler,
StateTracker: f.StateTracker,
Expand Down
2 changes: 1 addition & 1 deletion iterative_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (v *IterativeVerifier) iterateAllTables(mismatchedPaginationKeyFunc func(ui
func (v *IterativeVerifier) iterateTableFingerprints(table *TableSchema, mismatchedPaginationKeyFunc func(uint64, *TableSchema) error) error {
// The cursor will stop iterating when it cannot find anymore rows,
// so it will not iterate until MaxUint64.
cursor := v.CursorConfig.NewCursorWithoutRowLock(table, 0, math.MaxUint64)
cursor := v.CursorConfig.NewCursorWithoutRowLock(table, 0, math.MaxUint64, nil)

// It only needs the PaginationKeys, not the entire row.
cursor.ColumnsToSelect = []string{fmt.Sprintf("`%s`", table.GetPaginationColumn().Name)}
Expand Down
22 changes: 22 additions & 0 deletions state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type StateTracker struct {

lastSuccessfulPaginationKeys map[string]uint64
completedTables map[string]bool
tableLocks map[string]*sync.RWMutex

iterationSpeedLog *ring.Ring
}
Expand All @@ -100,6 +101,7 @@ func NewStateTracker(speedLogCount int) *StateTracker {

lastSuccessfulPaginationKeys: make(map[string]uint64),
completedTables: make(map[string]bool),
tableLocks: make(map[string]*sync.RWMutex),
iterationSpeedLog: newSpeedLogRing(speedLogCount),
}
}
Expand Down Expand Up @@ -178,6 +180,26 @@ func (s *StateTracker) IsTableComplete(table string) bool {
return s.completedTables[table]
}

// GetTableLock returns the lock associated with the given table name, creating
// and remembering it on first use. It returns nil when the table is already
// marked as completed, so callers must treat a nil result as "no locking
// needed". Access to the tracker's maps is guarded by CopyRWMutex.
func (s *StateTracker) GetTableLock(table string) *sync.RWMutex {
	s.CopyRWMutex.Lock()
	defer s.CopyRWMutex.Unlock()

	// Table locks only exist to synchronize the data copy with binlog
	// writing. For a table that is already completed this is reduced to a
	// nil lock.
	if s.completedTables[table] {
		return nil
	}

	tableLock, known := s.tableLocks[table]
	if !known {
		tableLock = new(sync.RWMutex)
		s.tableLocks[table] = tableLock
	}
	return tableLock
}

// This is reasonably accurate if the rows copied are distributed uniformly
// between paginationKey = 0 -> max(paginationKey). It would not be accurate if the distribution is
// concentrated in a particular region.
Expand Down
4 changes: 4 additions & 0 deletions test/helpers/ghostferry_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ def start_ghostferry(resuming_state = nil)
environment["GHOSTFERRY_MARGINALIA"] = @config[:marginalia]
end

if @config[:lock_strategy]
environment["GHOSTFERRY_LOCK_STRATEGY"] = @config[:lock_strategy]
end

@logger.info("starting ghostferry test binary #{@compiled_binary_path}")
Open3.popen3(environment, @compiled_binary_path) do |stdin, stdout, stderr, wait_thr|
stdin.puts(resuming_state) unless resuming_state.nil?
Expand Down
9 changes: 9 additions & 0 deletions test/integration/trivial_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,13 @@ def test_logged_query_omits_columns
end
end
end

# Runs a full ferry with the in-ghostferry lock strategy and checks that the
# test table ends up identical.
def test_lock_strategy_in_ghostferry
  seed_simple_database_with_single_table

  ferry_config = { lock_strategy: "LockInGhostferry" }
  ferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: ferry_config)
  ferry.run

  assert_test_table_is_identical
end
end
4 changes: 4 additions & 0 deletions test/lib/go/integrationferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ func NewStandardConfig() (*ghostferry.Config, error) {
}
}

if lockStrategy := os.Getenv("GHOSTFERRY_LOCK_STRATEGY"); lockStrategy != "" {
config.LockStrategy = lockStrategy
}

return config, config.ValidateConfig()
}

Expand Down

0 comments on commit ceaba06

Please sign in to comment.