Skip to content

Commit

Permalink
Merge pull request #1459 from github/danieljoos-dml-query-builders
Browse files Browse the repository at this point in the history
Improve query building routines of DML event queries, reducing time and allocations
  • Loading branch information
arthurschreiber authored Oct 23, 2024
2 parents 30f28c2 + 5c0829a commit a834c00
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 85 deletions.
52 changes: 44 additions & 8 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ type Applier struct {
migrationContext *base.MigrationContext
finishedMigrating int64
name string

dmlDeleteQueryBuilder *sql.DMLDeleteQueryBuilder
dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder
dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder
}

func NewApplier(migrationContext *base.MigrationContext) *Applier {
Expand Down Expand Up @@ -106,6 +110,37 @@ func (this *Applier) InitDBConnections() (err error) {
return nil
}

// prepareQueries creates the DELETE, INSERT and UPDATE query builders that
// buildDMLEventQuery uses, and stores them on the applier. All three target
// the ghost table of the migration context.
// It returns an error when the migration context has no unique key set, or
// when any of the builder constructors fails.
func (this *Applier) prepareQueries() (err error) {
	// UniqueKey is a pointer; taking the address of its Columns field below
	// would panic on nil, so report the problem as an error instead.
	if this.migrationContext.UniqueKey == nil {
		return fmt.Errorf("no unique key set on migration context in prepareQueries")
	}
	if this.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder(
		this.migrationContext.DatabaseName,
		this.migrationContext.GetGhostTableName(),
		this.migrationContext.OriginalTableColumns,
		&this.migrationContext.UniqueKey.Columns,
	); err != nil {
		return err
	}
	if this.dmlInsertQueryBuilder, err = sql.NewDMLInsertQueryBuilder(
		this.migrationContext.DatabaseName,
		this.migrationContext.GetGhostTableName(),
		this.migrationContext.OriginalTableColumns,
		this.migrationContext.SharedColumns,
		this.migrationContext.MappedSharedColumns,
	); err != nil {
		return err
	}
	if this.dmlUpdateQueryBuilder, err = sql.NewDMLUpdateQueryBuilder(
		this.migrationContext.DatabaseName,
		this.migrationContext.GetGhostTableName(),
		this.migrationContext.OriginalTableColumns,
		this.migrationContext.SharedColumns,
		this.migrationContext.MappedSharedColumns,
		&this.migrationContext.UniqueKey.Columns,
	); err != nil {
		return err
	}
	return nil
}

// validateAndReadGlobalVariables potentially reads server global variables, such as the time_zone and wait_timeout.
func (this *Applier) validateAndReadGlobalVariables() error {
query := `select /* gh-ost */ @@global.time_zone, @@global.wait_timeout`
Expand Down Expand Up @@ -1137,35 +1172,36 @@ func (this *Applier) updateModifiesUniqueKeyColumns(dmlEvent *binlog.BinlogDMLEv

// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
// event entry on the original table.
// It returns one build result per statement to run: a single result for DELETE and INSERT events
// and for UPDATE events not flagged by updateModifiesUniqueKeyColumns, two results (delete first,
// then insert) for UPDATE events that are flagged, and a single error result for any other event type.
// Errors from the query builders are not returned directly; they are passed into the build result.
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (results [](*dmlBuildResult)) {
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlBuildResult {
switch dmlEvent.DML {
case binlog.DeleteDML:
{
query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
return append(results, newDmlBuildResult(query, uniqueKeyArgs, -1, err))
// NOTE(review): the third argument to newDmlBuildResult (-1 here, 1 for inserts, 0 for updates)
// is presumably the expected row-count delta — confirm against newDmlBuildResult.
query, uniqueKeyArgs, err := this.dmlDeleteQueryBuilder.BuildQuery(dmlEvent.WhereColumnValues.AbstractValues())
return []*dmlBuildResult{newDmlBuildResult(query, uniqueKeyArgs, -1, err)}
}
case binlog.InsertDML:
{
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
return append(results, newDmlBuildResult(query, sharedArgs, 1, err))
query, sharedArgs, err := this.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues())
return []*dmlBuildResult{newDmlBuildResult(query, sharedArgs, 1, err)}
}
case binlog.UpdateDML:
{
// When updateModifiesUniqueKeyColumns flags the event, it is applied as a delete followed by
// an insert. dmlEvent.DML is overwritten in place to re-dispatch the same event through the
// two branches above, and is left as InsertDML when this function returns.
if _, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified {
results := make([]*dmlBuildResult, 0, 2)
dmlEvent.DML = binlog.DeleteDML
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
dmlEvent.DML = binlog.InsertDML
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
return results
}
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
query, sharedArgs, uniqueKeyArgs, err := this.dmlUpdateQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
// The statement receives a single argument list: shared column values first, unique key values after.
args := sqlutils.Args()
args = append(args, sharedArgs...)
args = append(args, uniqueKeyArgs...)
return append(results, newDmlBuildResult(query, args, 0, err))
return []*dmlBuildResult{newDmlBuildResult(query, args, 0, err)}
}
}
// Reached only when dmlEvent.DML matched none of the cases above.
return append(results, newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)))
return []*dmlBuildResult{newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))}
}

// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
Expand Down
7 changes: 7 additions & 0 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
columnValues := sql.ToColumnValues([]interface{}{123456, 42})

migrationContext := base.NewMigrationContext()
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "test"
migrationContext.OriginalTableColumns = columns
migrationContext.SharedColumns = columns
Expand All @@ -111,6 +112,7 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
}

applier := NewApplier(migrationContext)
applier.prepareQueries()

t.Run("delete", func(t *testing.T) {
binlogEvent := &binlog.BinlogDMLEvent{
Expand Down Expand Up @@ -307,8 +309,13 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.UniqueKey = &sql.UniqueKey{
Name: "primary_key",
Columns: *sql.NewColumnList([]string{"id"}),
}

applier := NewApplier(migrationContext)
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
Expand Down
4 changes: 4 additions & 0 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,10 @@ func (this *Migrator) Migrate() (err error) {
if err := this.inspector.inspectOriginalAndGhostTables(); err != nil {
return err
}
// We can prepare some of the queries on the applier
if err := this.applier.prepareQueries(); err != nil {
return err
}
// Validation complete! We're good to execute this migration
if err := this.hooksExecutor.onValidated(); err != nil {
return err
Expand Down
177 changes: 122 additions & 55 deletions go/sql/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,25 +406,29 @@ func buildUniqueKeyMinMaxValuesPreparedQuery(databaseName, tableName string, uni
return query, nil
}

func BuildDMLDeleteQuery(databaseName, tableName string, tableColumns, uniqueKeyColumns *ColumnList, args []interface{}) (result string, uniqueKeyArgs []interface{}, err error) {
if len(args) != tableColumns.Len() {
return result, uniqueKeyArgs, fmt.Errorf("args count differs from table column count in BuildDMLDeleteQuery")
}
// DMLDeleteQueryBuilder produces DELETE queries for DML events.
// The statement text is created once by NewDMLDeleteQueryBuilder and kept on
// the builder, so BuildQuery only has to assemble the arguments.
type DMLDeleteQueryBuilder struct {
	// tableColumns maps a column name to the position of its value in the args passed to BuildQuery.
	tableColumns *ColumnList
	// uniqueKeyColumns are the columns whose values become the query arguments.
	uniqueKeyColumns *ColumnList
	// preparedStatement is the DELETE statement text returned by every BuildQuery call.
	preparedStatement string
}

// NewDMLDeleteQueryBuilder creates a new DMLDeleteQueryBuilder.
// It prepares the DELETE query statement.
// Returns an error if no unique key columns are given
// or the prepared statement cannot be built.
func NewDMLDeleteQueryBuilder(databaseName, tableName string, tableColumns, uniqueKeyColumns *ColumnList) (*DMLDeleteQueryBuilder, error) {
if uniqueKeyColumns.Len() == 0 {
return result, uniqueKeyArgs, fmt.Errorf("No unique key columns found in BuildDMLDeleteQuery")
}
for _, column := range uniqueKeyColumns.Columns() {
tableOrdinal := tableColumns.Ordinals[column.Name]
arg := column.convertArg(args[tableOrdinal], true)
uniqueKeyArgs = append(uniqueKeyArgs, arg)
return nil, fmt.Errorf("no unique key columns found in NewDMLDeleteQueryBuilder")
}
databaseName = EscapeName(databaseName)
tableName = EscapeName(tableName)
equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names())
if err != nil {
return result, uniqueKeyArgs, err
return nil, err
}
result = fmt.Sprintf(`

stmt := fmt.Sprintf(`
delete /* gh-ost %s.%s */
from
%s.%s
Expand All @@ -434,35 +438,58 @@ func BuildDMLDeleteQuery(databaseName, tableName string, tableColumns, uniqueKey
databaseName, tableName,
equalsComparison,
)
return result, uniqueKeyArgs, nil

b := &DMLDeleteQueryBuilder{
tableColumns: tableColumns,
uniqueKeyColumns: uniqueKeyColumns,
preparedStatement: stmt,
}
return b, nil
}

func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns *ColumnList, args []interface{}) (result string, sharedArgs []interface{}, err error) {
if len(args) != tableColumns.Len() {
return result, args, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery")
// BuildQuery assembles the arguments for a DML event DELETE query.
// args must hold exactly one value per table column; otherwise an error is returned.
// On success it returns the statement stored on the builder together with the
// converted unique key values, in unique key column order.
func (b *DMLDeleteQueryBuilder) BuildQuery(args []interface{}) (string, []interface{}, error) {
	if b.tableColumns.Len() != len(args) {
		return "", nil, fmt.Errorf("args count differs from table column count in BuildDMLDeleteQuery")
	}

	keyValues := make([]interface{}, 0, b.uniqueKeyColumns.Len())
	for _, keyColumn := range b.uniqueKeyColumns.Columns() {
		// Look up where this key column's value sits among the table columns.
		position := b.tableColumns.Ordinals[keyColumn.Name]
		keyValues = append(keyValues, keyColumn.convertArg(args[position], true))
	}
	return b.preparedStatement, keyValues, nil
}

// DMLInsertQueryBuilder produces INSERT queries for DML events.
// The statement text is created once by NewDMLInsertQueryBuilder and kept on
// the builder, so BuildQuery only has to assemble the arguments.
type DMLInsertQueryBuilder struct {
	// tableColumns maps a column name to the position of its value in the args passed to BuildQuery.
	tableColumns *ColumnList
	// sharedColumns are the columns whose values become the query arguments.
	sharedColumns *ColumnList
	// preparedStatement is the statement text returned by every BuildQuery call.
	preparedStatement string
}

// NewDMLInsertQueryBuilder creates a new DMLInsertQueryBuilder.
// It prepares the INSERT query statement.
// Returns an error if no shared columns are given, the shared columns are not a subset of the table columns,
// or the prepared statement cannot be built.
func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns *ColumnList) (*DMLInsertQueryBuilder, error) {
if !sharedColumns.IsSubsetOf(tableColumns) {
return result, args, fmt.Errorf("shared columns is not a subset of table columns in BuildDMLInsertQuery")
return nil, fmt.Errorf("shared columns is not a subset of table columns in NewDMLInsertQueryBuilder")
}
if sharedColumns.Len() == 0 {
return result, args, fmt.Errorf("No shared columns found in BuildDMLInsertQuery")
return nil, fmt.Errorf("no shared columns found in NewDMLInsertQueryBuilder")
}
databaseName = EscapeName(databaseName)
tableName = EscapeName(tableName)

for _, column := range sharedColumns.Columns() {
tableOrdinal := tableColumns.Ordinals[column.Name]
arg := column.convertArg(args[tableOrdinal], false)
sharedArgs = append(sharedArgs, arg)
}

mappedSharedColumnNames := duplicateNames(mappedSharedColumns.Names())
for i := range mappedSharedColumnNames {
mappedSharedColumnNames[i] = EscapeName(mappedSharedColumnNames[i])
}
preparedValues := buildColumnsPreparedValues(mappedSharedColumns)

result = fmt.Sprintf(`
stmt := fmt.Sprintf(`
replace /* gh-ost %s.%s */
into
%s.%s
Expand All @@ -474,53 +501,63 @@ func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedCol
strings.Join(mappedSharedColumnNames, ", "),
strings.Join(preparedValues, ", "),
)
return result, sharedArgs, nil

return &DMLInsertQueryBuilder{
tableColumns: tableColumns,
sharedColumns: sharedColumns,
preparedStatement: stmt,
}, nil
}

func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns, uniqueKeyColumns *ColumnList, valueArgs, whereArgs []interface{}) (result string, sharedArgs, uniqueKeyArgs []interface{}, err error) {
if len(valueArgs) != tableColumns.Len() {
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("value args count differs from table column count in BuildDMLUpdateQuery")
// BuildQuery builds the arguments array for a DML event INSERT query.
// args must hold exactly one value per table column; values are picked out of it
// by each shared column's ordinal in tableColumns.
// It returns the query string stored on the builder and the shared arguments array,
// ordered like sharedColumns.
// Returns an error if the number of arguments differs from the number of table columns.
func (b *DMLInsertQueryBuilder) BuildQuery(args []interface{}) (string, []interface{}, error) {
if len(args) != b.tableColumns.Len() {
return "", nil, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery")
}
if len(whereArgs) != tableColumns.Len() {
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("where args count differs from table column count in BuildDMLUpdateQuery")
// Capacity is reserved up front so the appends below do not regrow the slice.
sharedArgs := make([]interface{}, 0, b.sharedColumns.Len())
for _, column := range b.sharedColumns.Columns() {
tableOrdinal := b.tableColumns.Ordinals[column.Name]
// NOTE(review): the second convertArg argument is false here while the unique key loops
// in this file pass true; presumably it flags unique key values — confirm against convertArg.
arg := column.convertArg(args[tableOrdinal], false)
sharedArgs = append(sharedArgs, arg)
}
return b.preparedStatement, sharedArgs, nil
}

// DMLUpdateQueryBuilder produces UPDATE queries for DML events.
// The statement text is created once by NewDMLUpdateQueryBuilder and kept on
// the builder, so BuildQuery only has to assemble the arguments.
type DMLUpdateQueryBuilder struct {
	// tableColumns maps a column name to the position of its value in the args passed to BuildQuery.
	tableColumns *ColumnList
	// sharedColumns are the columns whose new values become the shared arguments.
	sharedColumns *ColumnList
	// uniqueKeyColumns are the columns whose values become the unique key arguments.
	uniqueKeyColumns *ColumnList
	// preparedStatement is the UPDATE statement text returned by every BuildQuery call.
	preparedStatement string
}

// NewDMLUpdateQueryBuilder creates a new DMLUpdateQueryBuilder.
// It prepares the UPDATE query statement.
// Returns an error if no shared columns are given, the shared columns are not a subset of the table columns,
// no unique key columns are given or the prepared statement cannot be built.
func NewDMLUpdateQueryBuilder(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns, uniqueKeyColumns *ColumnList) (*DMLUpdateQueryBuilder, error) {
if !sharedColumns.IsSubsetOf(tableColumns) {
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("shared columns is not a subset of table columns in BuildDMLUpdateQuery")
}
if !uniqueKeyColumns.IsSubsetOf(sharedColumns) {
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("unique key columns is not a subset of shared columns in BuildDMLUpdateQuery")
return nil, fmt.Errorf("shared columns is not a subset of table columns in NewDMLUpdateQueryBuilder")
}
if sharedColumns.Len() == 0 {
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("No shared columns found in BuildDMLUpdateQuery")
return nil, fmt.Errorf("no shared columns found in NewDMLUpdateQueryBuilder")
}
if uniqueKeyColumns.Len() == 0 {
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("No unique key columns found in BuildDMLUpdateQuery")
return nil, fmt.Errorf("no unique key columns found in NewDMLUpdateQueryBuilder")
}
databaseName = EscapeName(databaseName)
tableName = EscapeName(tableName)

for _, column := range sharedColumns.Columns() {
tableOrdinal := tableColumns.Ordinals[column.Name]
arg := column.convertArg(valueArgs[tableOrdinal], false)
sharedArgs = append(sharedArgs, arg)
}

for _, column := range uniqueKeyColumns.Columns() {
tableOrdinal := tableColumns.Ordinals[column.Name]
arg := column.convertArg(whereArgs[tableOrdinal], true)
uniqueKeyArgs = append(uniqueKeyArgs, arg)
}

setClause, err := BuildSetPreparedClause(mappedSharedColumns)
if err != nil {
return "", sharedArgs, uniqueKeyArgs, err
return nil, err
}

equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names())
if err != nil {
return "", sharedArgs, uniqueKeyArgs, err
return nil, err
}
result = fmt.Sprintf(`
stmt := fmt.Sprintf(`
update /* gh-ost %s.%s */
%s.%s
set
Expand All @@ -532,5 +569,35 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol
setClause,
equalsComparison,
)
return result, sharedArgs, uniqueKeyArgs, nil
return &DMLUpdateQueryBuilder{
tableColumns: tableColumns,
sharedColumns: sharedColumns,
uniqueKeyColumns: uniqueKeyColumns,
preparedStatement: stmt,
}, nil
}

// BuildQuery builds the arguments arrays for a DML event UPDATE query.
// valueArgs and whereArgs must each hold exactly one value per table column;
// values are picked out of them by column ordinal in tableColumns.
// It returns the query string stored on the builder, the shared arguments array
// (taken from valueArgs, ordered like sharedColumns), and the unique key
// arguments array (taken from whereArgs, ordered like uniqueKeyColumns).
// Returns an error if either argument count differs from the number of table
// columns, or if the unique key columns are not a subset of the shared columns.
func (b *DMLUpdateQueryBuilder) BuildQuery(valueArgs, whereArgs []interface{}) (string, []interface{}, []interface{}, error) {
	// Both slices are indexed by table column ordinal below; reject a length
	// mismatch here so a malformed event yields an error instead of an
	// index-out-of-range panic. This mirrors the DELETE and INSERT builders.
	if len(valueArgs) != b.tableColumns.Len() {
		return "", nil, nil, fmt.Errorf("value args count differs from table column count in DMLUpdateQueryBuilder")
	}
	if len(whereArgs) != b.tableColumns.Len() {
		return "", nil, nil, fmt.Errorf("where args count differs from table column count in DMLUpdateQueryBuilder")
	}
	// TODO: move this check back to `NewDMLUpdateQueryBuilder()`, needs fix on generated columns.
	if !b.uniqueKeyColumns.IsSubsetOf(b.sharedColumns) {
		return "", nil, nil, fmt.Errorf("unique key columns is not a subset of shared columns in DMLUpdateQueryBuilder")
	}

	sharedArgs := make([]interface{}, 0, b.sharedColumns.Len())
	for _, column := range b.sharedColumns.Columns() {
		tableOrdinal := b.tableColumns.Ordinals[column.Name]
		arg := column.convertArg(valueArgs[tableOrdinal], false)
		sharedArgs = append(sharedArgs, arg)
	}

	uniqueKeyArgs := make([]interface{}, 0, b.uniqueKeyColumns.Len())
	for _, column := range b.uniqueKeyColumns.Columns() {
		tableOrdinal := b.tableColumns.Ordinals[column.Name]
		arg := column.convertArg(whereArgs[tableOrdinal], true)
		uniqueKeyArgs = append(uniqueKeyArgs, arg)
	}

	return b.preparedStatement, sharedArgs, uniqueKeyArgs, nil
}
Loading

0 comments on commit a834c00

Please sign in to comment.