Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve query building routines of DML event queries, reducing time and allocations #1459

Merged
merged 4 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 44 additions & 8 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ type Applier struct {
migrationContext *base.MigrationContext
finishedMigrating int64
name string

dmlDeleteQueryBuilder *sql.DMLDeleteQueryBuilder
dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder
dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder
}

func NewApplier(migrationContext *base.MigrationContext) *Applier {
Expand Down Expand Up @@ -106,6 +110,37 @@ func (this *Applier) InitDBConnections() (err error) {
return nil
}

// prepareQueries constructs the DELETE, INSERT and UPDATE DML query builders
// for the ghost table and stores them on the applier, so that
// buildDMLEventQuery can reuse them for every binlog event.
// It stops at, and returns, the first constructor error.
func (this *Applier) prepareQueries() (err error) {
	mc := this.migrationContext

	this.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder(
		mc.DatabaseName,
		mc.GetGhostTableName(),
		mc.OriginalTableColumns,
		&mc.UniqueKey.Columns,
	)
	if err != nil {
		return err
	}

	this.dmlInsertQueryBuilder, err = sql.NewDMLInsertQueryBuilder(
		mc.DatabaseName,
		mc.GetGhostTableName(),
		mc.OriginalTableColumns,
		mc.SharedColumns,
		mc.MappedSharedColumns,
	)
	if err != nil {
		return err
	}

	this.dmlUpdateQueryBuilder, err = sql.NewDMLUpdateQueryBuilder(
		mc.DatabaseName,
		mc.GetGhostTableName(),
		mc.OriginalTableColumns,
		mc.SharedColumns,
		mc.MappedSharedColumns,
		&mc.UniqueKey.Columns,
	)
	if err != nil {
		return err
	}

	return nil
}

// validateAndReadGlobalVariables potentially reads server global variables, such as the time_zone and wait_timeout.
func (this *Applier) validateAndReadGlobalVariables() error {
query := `select /* gh-ost */ @@global.time_zone, @@global.wait_timeout`
Expand Down Expand Up @@ -1137,35 +1172,36 @@ func (this *Applier) updateModifiesUniqueKeyColumns(dmlEvent *binlog.BinlogDMLEv

// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
// event entry on the original table.
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (results [](*dmlBuildResult)) {
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlBuildResult {
switch dmlEvent.DML {
case binlog.DeleteDML:
{
query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
return append(results, newDmlBuildResult(query, uniqueKeyArgs, -1, err))
query, uniqueKeyArgs, err := this.dmlDeleteQueryBuilder.BuildQuery(dmlEvent.WhereColumnValues.AbstractValues())
return []*dmlBuildResult{newDmlBuildResult(query, uniqueKeyArgs, -1, err)}
}
case binlog.InsertDML:
{
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
return append(results, newDmlBuildResult(query, sharedArgs, 1, err))
query, sharedArgs, err := this.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues())
return []*dmlBuildResult{newDmlBuildResult(query, sharedArgs, 1, err)}
}
case binlog.UpdateDML:
{
if _, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified {
results := make([]*dmlBuildResult, 0, 2)
dmlEvent.DML = binlog.DeleteDML
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
dmlEvent.DML = binlog.InsertDML
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
return results
}
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
query, sharedArgs, uniqueKeyArgs, err := this.dmlUpdateQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
args := sqlutils.Args()
args = append(args, sharedArgs...)
args = append(args, uniqueKeyArgs...)
return append(results, newDmlBuildResult(query, args, 0, err))
return []*dmlBuildResult{newDmlBuildResult(query, args, 0, err)}
}
}
return append(results, newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)))
return []*dmlBuildResult{newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))}
}

// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
Expand Down
7 changes: 7 additions & 0 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
columnValues := sql.ToColumnValues([]interface{}{123456, 42})

migrationContext := base.NewMigrationContext()
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "test"
migrationContext.OriginalTableColumns = columns
migrationContext.SharedColumns = columns
Expand All @@ -111,6 +112,7 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
}

applier := NewApplier(migrationContext)
applier.prepareQueries()

t.Run("delete", func(t *testing.T) {
binlogEvent := &binlog.BinlogDMLEvent{
Expand Down Expand Up @@ -307,8 +309,13 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.UniqueKey = &sql.UniqueKey{
Name: "primary_key",
Columns: *sql.NewColumnList([]string{"id"}),
}

applier := NewApplier(migrationContext)
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
Expand Down
4 changes: 4 additions & 0 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,10 @@ func (this *Migrator) Migrate() (err error) {
if err := this.inspector.inspectOriginalAndGhostTables(); err != nil {
return err
}
// We can prepare some of the queries on the applier
if err := this.applier.prepareQueries(); err != nil {
return err
}
// Validation complete! We're good to execute this migration
if err := this.hooksExecutor.onValidated(); err != nil {
return err
Expand Down
177 changes: 122 additions & 55 deletions go/sql/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,25 +406,29 @@ func buildUniqueKeyMinMaxValuesPreparedQuery(databaseName, tableName string, uni
return query, nil
}

func BuildDMLDeleteQuery(databaseName, tableName string, tableColumns, uniqueKeyColumns *ColumnList, args []interface{}) (result string, uniqueKeyArgs []interface{}, err error) {
if len(args) != tableColumns.Len() {
return result, uniqueKeyArgs, fmt.Errorf("args count differs from table column count in BuildDMLDeleteQuery")
}
// DMLDeleteQueryBuilder builds DELETE queries for DML events.
// The statement text is created once by NewDMLDeleteQueryBuilder and reused,
// so each event only has to produce its argument list.
type DMLDeleteQueryBuilder struct {
	// tableColumns is the table's column list; BuildQuery checks the argument
	// count against it and uses its Ordinals to locate values.
	tableColumns *ColumnList
	// uniqueKeyColumns are the columns compared in the statement's condition.
	uniqueKeyColumns *ColumnList
	// preparedStatement is the DELETE statement returned by every BuildQuery call.
	preparedStatement string
}

// NewDMLDeleteQueryBuilder creates a new DMLDeleteQueryBuilder.
// It prepares the DELETE query statement.
// Returns an error if no unique key columns are given
// or the prepared statement cannot be built.
func NewDMLDeleteQueryBuilder(databaseName, tableName string, tableColumns, uniqueKeyColumns *ColumnList) (*DMLDeleteQueryBuilder, error) {
if uniqueKeyColumns.Len() == 0 {
return result, uniqueKeyArgs, fmt.Errorf("No unique key columns found in BuildDMLDeleteQuery")
}
for _, column := range uniqueKeyColumns.Columns() {
tableOrdinal := tableColumns.Ordinals[column.Name]
arg := column.convertArg(args[tableOrdinal], true)
uniqueKeyArgs = append(uniqueKeyArgs, arg)
return nil, fmt.Errorf("no unique key columns found in NewDMLDeleteQueryBuilder")
}
databaseName = EscapeName(databaseName)
tableName = EscapeName(tableName)
equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names())
if err != nil {
return result, uniqueKeyArgs, err
return nil, err
}
result = fmt.Sprintf(`

stmt := fmt.Sprintf(`
delete /* gh-ost %s.%s */
from
%s.%s
Expand All @@ -434,35 +438,58 @@ func BuildDMLDeleteQuery(databaseName, tableName string, tableColumns, uniqueKey
databaseName, tableName,
equalsComparison,
)
return result, uniqueKeyArgs, nil

b := &DMLDeleteQueryBuilder{
tableColumns: tableColumns,
uniqueKeyColumns: uniqueKeyColumns,
preparedStatement: stmt,
}
return b, nil
}

func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns *ColumnList, args []interface{}) (result string, sharedArgs []interface{}, err error) {
if len(args) != tableColumns.Len() {
return result, args, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery")
// BuildQuery produces the argument list for a DML event DELETE query.
// args holds one value per table column, in table-ordinal order.
// It returns the prepared statement and the converted unique key values,
// in unique-key column order.
// An error is returned when len(args) does not match the table column count.
func (b *DMLDeleteQueryBuilder) BuildQuery(args []interface{}) (string, []interface{}, error) {
	if expected := b.tableColumns.Len(); len(args) != expected {
		return "", nil, fmt.Errorf("args count differs from table column count in BuildDMLDeleteQuery")
	}

	keyArgs := make([]interface{}, 0, b.uniqueKeyColumns.Len())
	for _, keyColumn := range b.uniqueKeyColumns.Columns() {
		// Pick the value by the column's position in the table.
		ordinal := b.tableColumns.Ordinals[keyColumn.Name]
		keyArgs = append(keyArgs, keyColumn.convertArg(args[ordinal], true))
	}

	return b.preparedStatement, keyArgs, nil
}

// DMLInsertQueryBuilder builds INSERT queries for DML events.
// The statement text is created once by NewDMLInsertQueryBuilder and reused,
// so each event only has to produce its argument list.
type DMLInsertQueryBuilder struct {
	// tableColumns is the table's column list; BuildQuery checks the argument
	// count against it and uses its Ordinals to locate values.
	tableColumns *ColumnList
	// sharedColumns are the columns whose values BuildQuery extracts as arguments.
	sharedColumns *ColumnList
	// preparedStatement is the statement returned by every BuildQuery call.
	preparedStatement string
}

// NewDMLInsertQueryBuilder creates a new DMLInsertQueryBuilder.
// It prepares the INSERT query statement.
// Returns an error if no shared columns are given, the shared columns are not a subset of the table columns,
// or the prepared statement cannot be built.
func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns *ColumnList) (*DMLInsertQueryBuilder, error) {
if !sharedColumns.IsSubsetOf(tableColumns) {
return result, args, fmt.Errorf("shared columns is not a subset of table columns in BuildDMLInsertQuery")
return nil, fmt.Errorf("shared columns is not a subset of table columns in NewDMLInsertQueryBuilder")
}
if sharedColumns.Len() == 0 {
return result, args, fmt.Errorf("No shared columns found in BuildDMLInsertQuery")
return nil, fmt.Errorf("no shared columns found in NewDMLInsertQueryBuilder")
}
databaseName = EscapeName(databaseName)
tableName = EscapeName(tableName)

for _, column := range sharedColumns.Columns() {
tableOrdinal := tableColumns.Ordinals[column.Name]
arg := column.convertArg(args[tableOrdinal], false)
sharedArgs = append(sharedArgs, arg)
}

mappedSharedColumnNames := duplicateNames(mappedSharedColumns.Names())
for i := range mappedSharedColumnNames {
mappedSharedColumnNames[i] = EscapeName(mappedSharedColumnNames[i])
}
preparedValues := buildColumnsPreparedValues(mappedSharedColumns)

result = fmt.Sprintf(`
stmt := fmt.Sprintf(`
replace /* gh-ost %s.%s */
into
%s.%s
Expand All @@ -474,53 +501,63 @@ func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedCol
strings.Join(mappedSharedColumnNames, ", "),
strings.Join(preparedValues, ", "),
)
return result, sharedArgs, nil

return &DMLInsertQueryBuilder{
tableColumns: tableColumns,
sharedColumns: sharedColumns,
preparedStatement: stmt,
}, nil
}

func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns, uniqueKeyColumns *ColumnList, valueArgs, whereArgs []interface{}) (result string, sharedArgs, uniqueKeyArgs []interface{}, err error) {
if len(valueArgs) != tableColumns.Len() {
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("value args count differs from table column count in BuildDMLUpdateQuery")
// BuildQuery builds the arguments array for a DML event INSERT query.
// It returns the query string and the shared arguments array.
// Returns an error if the number of arguments differs from the number of table columns.
func (b *DMLInsertQueryBuilder) BuildQuery(args []interface{}) (string, []interface{}, error) {
if len(args) != b.tableColumns.Len() {
return "", nil, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery")
}
if len(whereArgs) != tableColumns.Len() {
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("where args count differs from table column count in BuildDMLUpdateQuery")
sharedArgs := make([]interface{}, 0, b.sharedColumns.Len())
for _, column := range b.sharedColumns.Columns() {
tableOrdinal := b.tableColumns.Ordinals[column.Name]
arg := column.convertArg(args[tableOrdinal], false)
sharedArgs = append(sharedArgs, arg)
}
return b.preparedStatement, sharedArgs, nil
}

// DMLUpdateQueryBuilder builds UPDATE queries for DML events.
// The statement text is created once by NewDMLUpdateQueryBuilder and reused,
// so each event only has to produce its argument lists.
type DMLUpdateQueryBuilder struct {
	// tableColumns is the table's column list; its Ordinals locate values
	// inside the per-event argument slices.
	tableColumns *ColumnList
	// sharedColumns are the columns whose new values are extracted as arguments.
	sharedColumns *ColumnList
	// uniqueKeyColumns are the columns whose values are extracted for the
	// statement's condition.
	uniqueKeyColumns *ColumnList
	// preparedStatement is the UPDATE statement returned by every BuildQuery call.
	preparedStatement string
}

// NewDMLUpdateQueryBuilder creates a new DMLUpdateQueryBuilder.
// It prepares the UPDATE query statement.
// Returns an error if no shared columns are given, the shared columns are not a subset of the table columns,
// no unique key columns are given or the prepared statement cannot be built.
func NewDMLUpdateQueryBuilder(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns, uniqueKeyColumns *ColumnList) (*DMLUpdateQueryBuilder, error) {
if !sharedColumns.IsSubsetOf(tableColumns) {
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("shared columns is not a subset of table columns in BuildDMLUpdateQuery")
}
if !uniqueKeyColumns.IsSubsetOf(sharedColumns) {
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("unique key columns is not a subset of shared columns in BuildDMLUpdateQuery")
return nil, fmt.Errorf("shared columns is not a subset of table columns in NewDMLUpdateQueryBuilder")
}
if sharedColumns.Len() == 0 {
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("No shared columns found in BuildDMLUpdateQuery")
return nil, fmt.Errorf("no shared columns found in NewDMLUpdateQueryBuilder")
}
if uniqueKeyColumns.Len() == 0 {
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("No unique key columns found in BuildDMLUpdateQuery")
return nil, fmt.Errorf("no unique key columns found in NewDMLUpdateQueryBuilder")
}
databaseName = EscapeName(databaseName)
tableName = EscapeName(tableName)

for _, column := range sharedColumns.Columns() {
tableOrdinal := tableColumns.Ordinals[column.Name]
arg := column.convertArg(valueArgs[tableOrdinal], false)
sharedArgs = append(sharedArgs, arg)
}

for _, column := range uniqueKeyColumns.Columns() {
tableOrdinal := tableColumns.Ordinals[column.Name]
arg := column.convertArg(whereArgs[tableOrdinal], true)
uniqueKeyArgs = append(uniqueKeyArgs, arg)
}

setClause, err := BuildSetPreparedClause(mappedSharedColumns)
if err != nil {
return "", sharedArgs, uniqueKeyArgs, err
return nil, err
}

equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names())
if err != nil {
return "", sharedArgs, uniqueKeyArgs, err
return nil, err
}
result = fmt.Sprintf(`
stmt := fmt.Sprintf(`
update /* gh-ost %s.%s */
%s.%s
set
Expand All @@ -532,5 +569,35 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol
setClause,
equalsComparison,
)
return result, sharedArgs, uniqueKeyArgs, nil
return &DMLUpdateQueryBuilder{
tableColumns: tableColumns,
sharedColumns: sharedColumns,
uniqueKeyColumns: uniqueKeyColumns,
preparedStatement: stmt,
}, nil
}

// BuildQuery builds the argument arrays for a DML event UPDATE query.
//
// valueArgs holds the new row values and whereArgs the values used to locate
// the row; both must contain exactly one entry per table column, in
// table-ordinal order.
//
// It returns the prepared statement, the converted shared-column values taken
// from valueArgs, and the converted unique key values taken from whereArgs.
//
// An error is returned when the unique key columns are not a subset of the
// shared columns, or when either argument slice does not match the table
// column count.
func (b *DMLUpdateQueryBuilder) BuildQuery(valueArgs, whereArgs []interface{}) (string, []interface{}, []interface{}, error) {
	// TODO: move this check back to `NewDMLUpdateQueryBuilder()`, needs fix on generated columns.
	if !b.uniqueKeyColumns.IsSubsetOf(b.sharedColumns) {
		return "", nil, nil, fmt.Errorf("unique key columns is not a subset of shared columns in DMLUpdateQueryBuilder")
	}

	// Both slices are indexed by table ordinal below. Validate their length up
	// front so a malformed event yields an error rather than an index-out-of-range
	// panic, matching the DELETE and INSERT builders.
	if len(valueArgs) != b.tableColumns.Len() {
		return "", nil, nil, fmt.Errorf("value args count differs from table column count in BuildDMLUpdateQuery")
	}
	if len(whereArgs) != b.tableColumns.Len() {
		return "", nil, nil, fmt.Errorf("where args count differs from table column count in BuildDMLUpdateQuery")
	}

	sharedArgs := make([]interface{}, 0, b.sharedColumns.Len())
	for _, column := range b.sharedColumns.Columns() {
		tableOrdinal := b.tableColumns.Ordinals[column.Name]
		arg := column.convertArg(valueArgs[tableOrdinal], false)
		sharedArgs = append(sharedArgs, arg)
	}

	uniqueKeyArgs := make([]interface{}, 0, b.uniqueKeyColumns.Len())
	for _, column := range b.uniqueKeyColumns.Columns() {
		tableOrdinal := b.tableColumns.Ordinals[column.Name]
		arg := column.convertArg(whereArgs[tableOrdinal], true)
		uniqueKeyArgs = append(uniqueKeyArgs, arg)
	}

	return b.preparedStatement, sharedArgs, uniqueKeyArgs, nil
}
Loading