Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: 修复dml回滚语句未忽略计算列的问题(#324) #328

Merged
merged 1 commit into from
Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions session/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,22 @@ type FieldInfo struct {
Privileges string `gorm:"Column:Privileges"`
Comment string `gorm:"Column:Comment"`

IsDeleted bool `gorm:"-"`
IsNew bool `gorm:"-"`
IsDeleted bool `gorm:"-"`
IsNew bool `gorm:"-"`
isGenerated *bool `gorm:"-"`

Tp *types.FieldType `gorm:"-"`
}

func (f *FieldInfo) IsGenerated() bool {
if f.isGenerated == nil {
v := strings.Contains(f.Extra, "VIRTUAL GENERATED") ||
strings.Contains(f.Extra, "STORED GENERATED")
f.isGenerated = &v
}
return *f.isGenerated
}

// TableInfo 表结构.
// 表结构实现了快照功能,在表结构变更前,会复制快照,在快照上做变更
// 在解析binlog时,基于执行时的快照做binlog解析,以实现删除列时的binlog解析
Expand Down Expand Up @@ -238,6 +248,9 @@ type TableInfo struct {

// 字符集&排序规则
Collation string

// 有效列数,移除已删除列和生成列
effectiveFieldCount int
}

// IndexInfo 索引信息
Expand Down Expand Up @@ -277,6 +290,25 @@ type DBInfo struct {
IsNew bool
}

// EffectiveFieldCount 有效列数,移除已删除列和生成列
func (t *TableInfo) EffectiveFieldCount() (count int) {
if t == nil {
return
}

if t.effectiveFieldCount > 0 {
return t.effectiveFieldCount
}

for _, f := range t.Fields {
if !f.IsDeleted && !f.IsGenerated() {
count++
}
}
t.effectiveFieldCount = count
return
}

func (t *TableInfo) copy() *TableInfo {
p := &TableInfo{}

Expand Down
21 changes: 18 additions & 3 deletions session/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,12 +601,12 @@ func (s *session) generateInsertSql(t *TableInfo, e *replication.RowsEvent,
c := "`%s`"
template := "INSERT INTO `%s`.`%s`(%s) VALUES(%s)"
for i, col := range t.Fields {
if i < int(e.ColumnCount) {
if i < int(e.ColumnCount) && !col.IsGenerated() {
columnNames = append(columnNames, fmt.Sprintf(c, col.Field))
}
}

paramValues := strings.Repeat("?,", int(e.ColumnCount))
paramValues := strings.Repeat("?,", t.EffectiveFieldCount())
paramValues = strings.TrimRight(paramValues, ",")

sql := fmt.Sprintf(template, e.Table.Schema, e.Table.Table,
Expand All @@ -616,6 +616,9 @@ func (s *session) generateInsertSql(t *TableInfo, e *replication.RowsEvent,

var vv []driver.Value
for i, d := range rows {
if t.Fields[i].IsGenerated() {
continue
}
if t.Fields[i].isUnsigned() {
d = processValue(d, GetDataTypeBase(t.Fields[i].Type))
}
Expand Down Expand Up @@ -777,7 +780,7 @@ func (s *session) generateUpdateSql(t *TableInfo, e *replication.RowsEvent,
// 日志是minimal模式时, 只取有值的新列
// && uint8(e.ColumnBitmap2[i/8])&(1<<(uint(i)%8)) == uint8(e.ColumnBitmap2[i/8])

if i < int(e.ColumnCount) {
if i < int(e.ColumnCount) && !col.IsGenerated() {
sets = append(sets, fmt.Sprintf(setValue, col.Field))
}
}
Expand Down Expand Up @@ -815,6 +818,9 @@ func (s *session) generateUpdateSql(t *TableInfo, e *replication.RowsEvent,
}
// 最小化模式下,列如果相等则省略
if !equal {
if t.Fields[j].IsGenerated() {
continue
}
if t.Fields[j].isUnsigned() {
d = processValue(d, GetDataTypeBase(t.Fields[j].Type))
}
Expand All @@ -824,6 +830,9 @@ func (s *session) generateUpdateSql(t *TableInfo, e *replication.RowsEvent,
}
}
} else {
if t.Fields[j].IsGenerated() {
continue
}
if t.Fields[j].isUnsigned() {
d = processValue(d, GetDataTypeBase(t.Fields[j].Type))
}
Expand All @@ -836,6 +845,9 @@ func (s *session) generateUpdateSql(t *TableInfo, e *replication.RowsEvent,
for j, d := range rows {
if t.hasPrimary {
if _, ok := t.primarys[j]; ok {
if t.Fields[j].IsGenerated() {
continue
}
if t.Fields[j].isUnsigned() {
d = processValue(d, GetDataTypeBase(t.Fields[j].Type))
}
Expand All @@ -850,6 +862,9 @@ func (s *session) generateUpdateSql(t *TableInfo, e *replication.RowsEvent,
}
}
} else {
if t.Fields[j].IsGenerated() {
continue
}
if t.Fields[j].isUnsigned() {
d = processValue(d, GetDataTypeBase(t.Fields[j].Type))
}
Expand Down
7 changes: 7 additions & 0 deletions session/session_inception.go
Original file line number Diff line number Diff line change
Expand Up @@ -7544,6 +7544,13 @@ func (s *session) buildNewColumnToCache(t *TableInfo, field *ast.ColumnDef) *Fie
field.Tp.Flag |= mysql.OnUpdateNowFlag
case ast.ColumnOptionCollate:
c.Collation = op.StrValue
case ast.ColumnOptionGenerated:
if op.Stored {
c.Extra += "STORED"
} else {
c.Extra += "VIRTUAL"
}
c.Extra += " GENERATED"
}
}

Expand Down
37 changes: 37 additions & 0 deletions session/session_inception_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,3 +1055,40 @@ func (s *testSessionIncBackupSuite) TestEmptyUseDB(c *C) {

s.mustRunExec(c, `drop table if exists test_inc.t1;`)
}

func (s *testSessionIncBackupSuite) TestGenerateColumns(c *C) {
config.GetGlobalConfig().Inc.CheckColumnComment = false
config.GetGlobalConfig().Inc.CheckTableComment = false
config.GetGlobalConfig().Inc.EnableDropTable = true

sql := ""

// 删除后添加列
sql = "drop table if exists t1;"
s.mustRunExec(c, sql)

sql = `-- 创建测试表
CREATE TABLE t1 (
id int(10) unsigned NOT NULL comment "主键",
c1 int(10) NOT NULL default 1 comment "c1",
jdoc json DEFAULT NULL comment "json类型字段",
project varchar(30) GENERATED ALWAYS AS (json_unquote(json_extract(jdoc,'$."project"'))) VIRTUAL NOT NULL comment "json生成虚拟列",
c2 int(11) GENERATED ALWAYS AS ((c1 + 1)) STORED comment "json生成虚拟列"
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 comment '生成虚拟列表delete回滚测试';

-- 写入测试数据
INSERT INTO test_inc.t1(id,jdoc,c1) VALUES(1,'{"project": "goInception"}',100);

update test_inc.t1 set c1 = 200 where id > 0;

-- delete删除测试
DELETE FROM t1 WHERE id=1;`
s.mustRunBackup(c, sql)

s.assertRows(c, s.rows[1:],
"DROP TABLE `test_inc`.`t1`;",
"DELETE FROM `test_inc`.`t1` WHERE `id`=1 AND `c1`=100 AND `jdoc`='{\\\"project\\\":\\\"goInception\\\"}' AND `project`='goInception' AND `c2`=101;",
"UPDATE `test_inc`.`t1` SET `id`=1, `c1`=100, `jdoc`='{\\\"project\\\":\\\"goInception\\\"}' WHERE `id`=1 AND `c1`=200 AND `jdoc`='{\\\"project\\\":\\\"goInception\\\"}';",
"INSERT INTO `test_inc`.`t1`(`id`,`c1`,`jdoc`) VALUES(1,200,'{\\\"project\\\":\\\"goInception\\\"}');",
)
}