Added a generic binlog catchup process and reconcile the binlogs #88
Conversation
Instead of passing the Table as a struct member, this makes it pass by argument in the Run function. It makes more sense as it's not really a configuration on how to Run the DataIterator, but what to run the DataIterator on. This also makes the DataIterator slightly easier to use independently of the Ferry.
This does mean this PR is not strictly necessary if we just want to interrupt/resume a run without handling schema changes
I thought we came to agreement to ship full interrupt/resume before venturing onto the schema change plans. What changed your mind?
If we decide to go for full interruption, I would like us to not even have a Reconciler struct. Just feed the Ferry the last state and everything else happens automatically. Am I wrong to assume that's possible?
In the original ghostferry algorithm, what's the difference between starting the binlog streamer from the current position vs an earlier position? Assume there's no schema change.
// This is used during the resume when we are catching up to the binlog events
// missed by Ghostferry while it is down.
type ReconcilationDMLEvent struct {
	*DMLEventBase
Why do you need to embed this? It doesn't make sense to call this struct a *DMLEvent either. It's not really an event itself. You may store the event that produced this as a field on the struct, but calling this a DML event itself is very confusing.
Is the only goal of this struct to provide a different AsSQLString so that it works as-is with the BinlogWriter? I feel like we need to refactor the current BinlogWriter to become a MultiStatementWriter.
type MultiStatementWriter struct{} // current BinlogWriter implementation
type BinlogWriter struct{
writer *MultiStatementWriter
}
type BinlogReconciler struct{
writer *MultiStatementWriter
}
I tried to refactor the BinlogWriter and it turned out to be a bit more difficult and tedious than I would have liked. Since the interface of the DMLEvent is mainly to call AsSQLString, I decided that it's easier to work with this version and refactor later.
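For reference, a minimal sketch of the composition the comment above proposes: both the binlog replayer and the reconciler delegate to one shared MultiStatementWriter. All method names and the in-memory "executed" field are hypothetical stand-ins for illustration, not ghostferry's actual API.

```go
package main

import (
	"fmt"
	"strings"
)

// MultiStatementWriter batches SQL statements and sends them in one round
// trip (the role the current BinlogWriter plays today). The executed slice
// stands in for a real DB handle in this sketch.
type MultiStatementWriter struct {
	executed []string
}

func (w *MultiStatementWriter) WriteStatements(stmts []string) error {
	w.executed = append(w.executed, strings.Join(stmts, ";\n"))
	return nil
}

// BinlogWriter replays streamed DML events through the shared writer.
type BinlogWriter struct{ writer *MultiStatementWriter }

// BinlogReconciler replays DELETE + re-copy statements through the same writer.
type BinlogReconciler struct{ writer *MultiStatementWriter }

func (b *BinlogWriter) WriteEvents(sqls []string) error      { return b.writer.WriteStatements(sqls) }
func (r *BinlogReconciler) ReplaceBatch(sqls []string) error { return r.writer.WriteStatements(sqls) }

func main() {
	shared := &MultiStatementWriter{}
	bw := &BinlogWriter{writer: shared}
	rc := &BinlogReconciler{writer: shared}
	bw.WriteEvents([]string{"UPDATE t SET a = 1 WHERE id = 1"})
	rc.ReplaceBatch([]string{"DELETE FROM t WHERE id = 2", "REPLACE INTO t VALUES (2, 'x')"})
	fmt.Println(len(shared.executed)) // both paths wrote through the one shared writer
}
```

With this shape, neither caller needs to wrap its statements in a fake DML event just to reach the writer.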
	return err
}

batch = append(batch, NewReconciliationDMLEvent(table, pk, row))
wait... why do you need ReconcilationDMLEvent at all? can't you re-use RowBatch here?
// TODO: Step 2: setup the iterative verifier reconciler

// Step 3: initialize the binlog streamer and the throttler
binlogStreamer := f.NewBinlogStreamer()
wouldn't it be cleaner to have this inside the reconciler?
The BinlogReconciler is supposed to catch up on binlog events that occurred while Ghostferry was down. However, if we want to interrupt/resume the verifier, the process of resuming that will also need an instance of the binlog streamer. Since we don't want two binlog streamers, this is managed directly in the Ferry.
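The "one streamer, many consumers" arrangement described above can be sketched as a simple listener fanout: a single streamer owns the replication connection and both the writer and the verifier register callbacks on it. This is a toy illustration, not ghostferry's actual streamer API.

```go
package main

import "fmt"

// Event is a stripped-down stand-in for a streamed DML event.
type Event struct {
	Table string
	PK    int64
}

// Streamer fans each batch of events out to every registered listener,
// so one replication connection can feed both a writer and a verifier.
type Streamer struct {
	listeners []func([]Event) error
}

func (s *Streamer) AddEventListener(l func([]Event) error) {
	s.listeners = append(s.listeners, l)
}

func (s *Streamer) emit(evs []Event) error {
	for _, l := range s.listeners {
		if err := l(evs); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	s := &Streamer{}
	var writerSeen, verifierSeen int
	s.AddEventListener(func(evs []Event) error { writerSeen += len(evs); return nil })
	s.AddEventListener(func(evs []Event) error { verifierSeen += len(evs); return nil })
	s.emit([]Event{{"t", 1}, {"t", 2}})
	fmt.Println(writerSeen, verifierSeen) // both consumers see every event
}
```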
	batch = make([]DMLEvent, 0, r.BatchSize)
}

row, err := r.selectRowFromSource(table, pk)
selecting one row at a time? wouldn't that be very slow for high traffic dbs?
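One way to address this would be to fetch a whole batch of primary keys in a single query. A sketch of building such a query (the column name, quoting, and placeholder style here are illustrative assumptions, not ghostferry's actual SQL):

```go
package main

import (
	"fmt"
	"strings"
)

// buildBatchedSelect builds one SELECT covering many primary keys, so the
// reconciler can fetch rows per batch instead of one round trip per row.
func buildBatchedSelect(schema, table string, pks []int64) (string, []interface{}) {
	placeholders := make([]string, len(pks))
	args := make([]interface{}, len(pks))
	for i, pk := range pks {
		placeholders[i] = "?"
		args[i] = pk
	}
	query := fmt.Sprintf(
		"SELECT * FROM `%s`.`%s` WHERE id IN (%s)",
		schema, table, strings.Join(placeholders, ","),
	)
	return query, args
}

func main() {
	query, args := buildBatchedSelect("db", "users", []int64{1, 5, 9})
	fmt.Println(query)      // one query for three rows
	fmt.Println(len(args))  // 3
}
```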
}

func (r *BinlogReconciler) replaceBatch(batch []DMLEvent) error {
	err := r.BinlogWriter.WriteEvents(batch)
these are not binlog events. see my comment earlier about MultiStatementWriter
defer func() {
	shutdown()
	throttlerWg.Wait()
}()
why is this here? seems very out of place. smells like it should be started before we even get anywhere close to here
	return err
}

r.modifiedRows.AddRow(ev.TableSchema(), pk)
do we really wanna REPLACE for every table? seems like the usual binlog playing logic should handle most tables?
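From the quoted hunk above, modifiedRows appears to collect the primary keys touched while Ghostferry was down, so each changed row is re-copied once no matter how many binlog events touched it. A toy version of such a deduplicating set (the type and method names mirror the snippet but are otherwise assumptions):

```go
package main

import "fmt"

// ModifiedRowSet deduplicates (table, pk) pairs seen in the missed binlog
// events, so each changed row is deleted and re-copied exactly once.
type ModifiedRowSet struct {
	rows map[string]map[int64]struct{}
}

func NewModifiedRowSet() *ModifiedRowSet {
	return &ModifiedRowSet{rows: make(map[string]map[int64]struct{})}
}

func (s *ModifiedRowSet) AddRow(table string, pk int64) {
	if s.rows[table] == nil {
		s.rows[table] = make(map[int64]struct{})
	}
	s.rows[table][pk] = struct{}{}
}

func (s *ModifiedRowSet) Count(table string) int { return len(s.rows[table]) }

func main() {
	set := NewModifiedRowSet()
	// Three events touch table "users", but only two distinct rows.
	set.AddRow("users", 1)
	set.AddRow("users", 1)
	set.AddRow("users", 2)
	fmt.Println(set.Count("users")) // 2
}
```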
The original Ghostferry algorithm is insensitive to the starting location. As long as we don't miss any events, the data copied should be correct. If we don't start at the position of interruption and instead start at a later point, we'll need something like the Reconciler. We would only really want to do this in the event of a schema change, because the events between the position of interruption and the position of resume could contain schema changes and thus change the binlog event structure.
So I did this PR primarily to futureproof the algorithm. It is not strictly necessary if we don't want to handle schema changes for the time being. It might be possible to resume without the reconciler, but I'll have to try it out to see how it looks, especially with respect to the iterative verifier.
This PR adds a generic binlog catchup process prior to resuming Ghostferry (in Ferry.CatchUpOnMissedBinlogsOnResume()). Right now, all we reconcile is the binlog events: we delete any rows that were modified during the downtime of Ghostferry and recopy them from the source. In the future, we'll add mechanisms to reconcile the iterative verifier, and so on.
Note that there are no added integration tests for this PR because the current master code can already interrupt/resume by simply replaying all the binlog entries as-is. The tests that cover that also cover this PR. This does mean this PR is not strictly necessary if we just want to interrupt/resume a run without handling schema changes. The reconciliation process becomes more important with a schema change, as we agreed to delete the table whose schema changed and recopy it completely upon resume.
Also performed a minor refactor such that the DataIterator takes the tables to iterate over as an argument to Run, as opposed to a struct member.
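The delete-then-recopy behavior described above can be sketched against an in-memory stand-in for the source and target tables: every row modified during downtime is deleted from the target, then re-copied from the source if it still exists there (a row deleted at the source simply stays deleted on the target). The function name and map representation are illustrative, not the PR's actual code.

```go
package main

import "fmt"

// reconcile deletes every row modified during downtime from the target,
// then re-copies the current version from the source if the row still exists.
func reconcile(source, target map[int64]string, modifiedPKs []int64) {
	for _, pk := range modifiedPKs {
		delete(target, pk) // DELETE FROM target WHERE pk = ?
		if row, ok := source[pk]; ok {
			target[pk] = row // REPLACE INTO target ... (re-copied from source)
		}
	}
}

func main() {
	source := map[int64]string{1: "new-a", 3: "c"}         // pk 2 was deleted at the source
	target := map[int64]string{1: "old-a", 2: "b", 3: "c"} // target state before catchup
	reconcile(source, target, []int64{1, 2})
	fmt.Println(target[1], len(target)) // row 1 refreshed from source, row 2 gone
}
```

Because the end state depends only on the source's current contents, replaying each modified row once is enough regardless of how many events touched it during the downtime.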