From a84facf52f78d0ba65ced4e39ccc007a925d275d Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 17 May 2021 13:51:40 +0530 Subject: [PATCH 01/17] added e2e and unit test for scatter_errors_as_warnings Signed-off-by: Harshit Gangal --- go/test/endtoend/vtgate/main_test.go | 8 +- go/test/endtoend/vtgate/misc_test.go | 41 +++++++++ go/vt/vtgate/engine/route.go | 4 +- go/vt/vtgate/executor_select_test.go | 92 +++++++++++++++++++ .../planbuilder/testdata/select_cases.txt | 10 +- 5 files changed, 149 insertions(+), 6 deletions(-) diff --git a/go/test/endtoend/vtgate/main_test.go b/go/test/endtoend/vtgate/main_test.go index e3c4817a800..3947809b41a 100644 --- a/go/test/endtoend/vtgate/main_test.go +++ b/go/test/endtoend/vtgate/main_test.go @@ -419,10 +419,14 @@ func TestMain(m *testing.M) { } // Start vtgate - err = clusterInstance.StartVtgate() - if err != nil { + vtgateInstance := clusterInstance.NewVtgateInstance() + vtgateInstance.TabletTypesToWait = "MASTER,REPLICA,RDONLY" + if err := vtgateInstance.Setup(); err != nil { return 1 } + // ensure it is torn down during cluster TearDown + clusterInstance.VtgateProcess = *vtgateInstance + vtParams = mysql.ConnParams{ Host: clusterInstance.Hostname, Port: clusterInstance.VtgateMySQLPort, diff --git a/go/test/endtoend/vtgate/misc_test.go b/go/test/endtoend/vtgate/misc_test.go index 5ce8ece4f57..80fa206d0ee 100644 --- a/go/test/endtoend/vtgate/misc_test.go +++ b/go/test/endtoend/vtgate/misc_test.go @@ -640,6 +640,47 @@ func TestShowGtid(t *testing.T) { } } +func TestScatterErrsAsWarns(t *testing.T) { + conn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + defer conn.Close() + + exec(t, conn, `insert into t1(id1, id2) values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)`) + defer func() { + exec(t, conn, "use @master") + exec(t, conn, `delete from t1`) + }() + + exec(t, conn, "use `ks:-80@rdonly`") + qrM80 := exec(t, conn, `select id1 from t1`) + assert.EqualValues(t, 4, len(qrM80.Rows)) + + exec(t, conn, "use `ks:80-@rdonly`") + qr80M := exec(t, conn, `select id1 from t1`) + assert.EqualValues(t, 1, len(qr80M.Rows)) + + exec(t, conn, `use ks@rdonly`) + qrT := exec(t, conn, `select id1 from t1`) + assert.EqualValues(t, 5, len(qrT.Rows)) + + rdonly := clusterInstance.Keyspaces[0].Shards[0].Rdonly() + err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonly.Alias, "spare") + require.NoError(t, err) + defer clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonly.Alias, "rdonly") + + for _, mode := range []string{"oltp", "olap"} { + t.Run(mode, func(t *testing.T) { + exec(t, conn, fmt.Sprintf("set workload = %s", mode)) + + _, err = conn.ExecuteFetch(`select id1 from t1`, 1000, true) + require.Error(t, err) + + qr := exec(t, conn, `select /*vt+ SCATTER_ERRORS_AS_WARNINGS */ id1 from t1`) + assert.EqualValues(t, 1, len(qr.Rows)) + }) + } +} + func TestQueryAndSubQWithLimit(t *testing.T) { conn, err := mysql.Connect(context.Background(), &vtParams) require.NoError(t, err) diff --git a/go/vt/vtgate/engine/route.go b/go/vt/vtgate/engine/route.go index c8e2a40cdd9..b7696e1fc6e 100644 --- a/go/vt/vtgate/engine/route.go +++ b/go/vt/vtgate/engine/route.go @@ -801,7 +801,9 @@ func (route *Route) description() PrimitiveDescription { if route.TruncateColumnCount > 0 { other["ResultColumns"] = route.TruncateColumnCount } - + if route.ScatterErrorsAsWarnings { + other["ScatterErrorsAsWarnings"] = true + } return PrimitiveDescription{ OperatorType: "Route", Variant: routeName[route.Opcode], diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index dfad656d9fc..30179ebdc5f 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -1139,6 +1139,98 @@ func TestSelectScatterPartial(t *testing.T) { testQueryLog(t, logChan, "TestExecute", "SELECT", "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user", 8) } +func TestSelectScatterPartialOLAP(t *testing.T) { + // Special setup: Don't use createLegacyExecutorEnv. + cell := "aa" + hc := discovery.NewFakeLegacyHealthCheck() + s := createSandbox("TestExecutor") + s.VSchema = executorVSchema + getSandbox(KsTestUnsharded).VSchema = unshardedVSchema + serv := new(sandboxTopo) + resolver := newTestLegacyResolver(hc, serv, cell) + shards := []string{"-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-"} + var conns []*sandboxconn.SandboxConn + for _, shard := range shards { + sbc := hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_MASTER, true, 1, nil) + conns = append(conns, sbc) + } + + executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig) + logChan := QueryLogger.Subscribe("Test") + defer QueryLogger.Unsubscribe(logChan) + + // Fail 1 of N without the directive fails the whole operation + conns[2].MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1000 + results, err := executorStream(executor, "select id from user") + assert.EqualError(t, err, "target: TestExecutor.40-60.master: RESOURCE_EXHAUSTED error") + assert.Equal(t, vtrpcpb.Code_RESOURCE_EXHAUSTED, vterrors.Code(err)) + assert.Nil(t, results) + testQueryLog(t, logChan, "TestExecuteStream", "SELECT", "select id from user", 8) + + // Fail 1 of N with the directive succeeds with 7 rows + results, err = executorStream(executor, "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user") + require.NoError(t, err) + assert.EqualValues(t, 7, len(results.Rows)) + testQueryLog(t, logChan, "TestExecuteStream", "SELECT", "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user", 8) + + // Even if all shards fail the operation succeeds with 0 rows + conns[0].MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1000 + conns[1].MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1000 + conns[3].MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1000 + conns[4].MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1000 + conns[5].MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1000 + conns[6].MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1000 + conns[7].MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1000 + + results, err = executorStream(executor, "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user") + require.NoError(t, err) + require.Empty(t, results.Rows) + testQueryLog(t, logChan, "TestExecuteStream", "SELECT", "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user", 8) +} + +func TestSelectScatterPartialOLAP2(t *testing.T) { + // Special setup: Don't use createLegacyExecutorEnv. + cell := "aa" + hc := discovery.NewFakeHealthCheck() + s := createSandbox("TestExecutor") + s.VSchema = executorVSchema + getSandbox(KsTestUnsharded).VSchema = unshardedVSchema + serv := new(sandboxTopo) + resolver := newTestResolver(hc, serv, cell) + shards := []string{"-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-"} + var conns []*sandboxconn.SandboxConn + for _, shard := range shards { + sbc := hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_MASTER, true, 1, nil) + conns = append(conns, sbc) + } + + executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig) + logChan := QueryLogger.Subscribe("Test") + defer QueryLogger.Unsubscribe(logChan) + + // Fail 1 of N without the directive fails the whole operation + tablet0 := conns[2].Tablet() + ths := hc.GetHealthyTabletStats(&querypb.Target{ + Keyspace: tablet0.GetKeyspace(), + Shard: tablet0.GetShard(), + TabletType: tablet0.GetType(), + }) + sbc0Th := ths[0] + sbc0Th.Serving = false + + results, err := executorStream(executor, "select id from user") + assert.EqualError(t, err, `target: TestExecutor.40-60.master: no healthy tablet available for 'keyspace:"TestExecutor" shard:"40-60" tablet_type:MASTER '`) + assert.Equal(t, vtrpcpb.Code_UNAVAILABLE, vterrors.Code(err)) + assert.Nil(t, results) + testQueryLog(t, logChan, "TestExecuteStream", "SELECT", "select id from user", 8) + + // Fail 1 of N with the directive succeeds with 7 rows + results, err = executorStream(executor, "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user") + require.NoError(t, err) + assert.EqualValues(t, 7, len(results.Rows)) + testQueryLog(t, logChan, "TestExecuteStream", "SELECT", "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user", 8) +} + func TestStreamSelectScatter(t *testing.T) { // Special setup: Don't use createLegacyExecutorEnv. cell := "aa" diff --git a/go/vt/vtgate/planbuilder/testdata/select_cases.txt b/go/vt/vtgate/planbuilder/testdata/select_cases.txt index ea3ad14b0a8..2b82ab9f813 100644 --- a/go/vt/vtgate/planbuilder/testdata/select_cases.txt +++ b/go/vt/vtgate/planbuilder/testdata/select_cases.txt @@ -121,10 +121,10 @@ Gen4 plan same as above } # select with partial scatter directive -"select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ * from user" +"select /*vt+ SCATTER_ERRORS_AS_WARNINGS */ * from user" { "QueryType": "SELECT", - "Original": "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ * from user", + "Original": "select /*vt+ SCATTER_ERRORS_AS_WARNINGS */ * from user", "Instructions": { "OperatorType": "Route", "Variant": "SelectScatter", @@ -133,7 +133,8 @@ Gen4 plan same as above "Sharded": true }, "FieldQuery": "select * from `user` where 1 != 1", - "Query": "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ * from `user`", + "Query": "select /*vt+ SCATTER_ERRORS_AS_WARNINGS */ * from `user`", + "ScatterErrorsAsWarnings": true, "Table": "`user`" } } @@ -157,6 +158,7 @@ Gen4 plan same as above }, "FieldQuery": "select count(*) from `user` where 1 != 1", "Query": "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ count(*) from `user`", + "ScatterErrorsAsWarnings": true, "Table": "`user`" } ] @@ -182,6 +184,7 @@ Gen4 plan same as above }, "FieldQuery": "select count(*) from `user` where 1 != 1", "Query": "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ count(*) from `user`", + "ScatterErrorsAsWarnings": true, "Table": "`user`" } ] @@ -206,6 +209,7 @@ Gen4 plan same as above }, "FieldQuery": "select * from `user` where 1 != 1", "Query": "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ * from `user` limit :__upper_limit", + "ScatterErrorsAsWarnings": true, "Table": "`user`" } ] From d7d79bd6b8076c96d69e827f7d433278ce2cc137 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Mon, 17 May 2021 11:27:19 +0200 Subject: [PATCH 02/17] rewrote test Signed-off-by: Andres Taylor --- .../vtgate/errors_as_warnings/main_test.go | 483 ++++++++++++++++++ go/test/endtoend/vtgate/misc_test.go | 41 -- 2 files changed, 483 insertions(+), 41 deletions(-) create mode 100644 go/test/endtoend/vtgate/errors_as_warnings/main_test.go diff --git a/go/test/endtoend/vtgate/errors_as_warnings/main_test.go b/go/test/endtoend/vtgate/errors_as_warnings/main_test.go new file mode 100644 index 00000000000..4e68540e825 --- /dev/null +++ b/go/test/endtoend/vtgate/errors_as_warnings/main_test.go @@ -0,0 +1,483 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtgate + +import ( + "context" + "flag" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + KeyspaceName = "ks" + Cell = "test" + SchemaSQL = `create table t1( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; + +create table t1_id2_idx( + id2 bigint, + keyspace_id varbinary(10), + primary key(id2) +) Engine=InnoDB; + +create table vstream_test( + id bigint, + val bigint, + primary key(id) +) Engine=InnoDB; + +create table aggr_test( + id bigint, + val1 varchar(16), + val2 bigint, + primary key(id) +) Engine=InnoDB; + +create table t2( + id3 bigint, + id4 bigint, + primary key(id3) +) Engine=InnoDB; + +create table t2_id4_idx( + id bigint not null auto_increment, + id4 bigint, + id3 bigint, + primary key(id), + key idx_id4(id4) +) Engine=InnoDB; + +create table t3( + id5 bigint, + id6 bigint, + id7 bigint, + primary key(id5) +) Engine=InnoDB; + +create table t3_id7_idx( + id bigint not null auto_increment, + id7 bigint, + id6 bigint, + primary key(id) +) Engine=InnoDB; + +create table t4( + id1 bigint, + id2 varchar(10), + primary key(id1) +) ENGINE=InnoDB DEFAULT charset=utf8mb4 COLLATE=utf8mb4_general_ci; + +create table t4_id2_idx( + id2 varchar(10), + id1 bigint, + keyspace_id varbinary(50), + primary key(id2, id1) +) Engine=InnoDB DEFAULT charset=utf8mb4 COLLATE=utf8mb4_general_ci; + +create table t5_null_vindex( + id bigint not null, + idx varchar(50), + primary key(id) +) Engine=InnoDB; + +create table t6( + id1 bigint, + id2 varchar(10), + primary key(id1) +) Engine=InnoDB; + +create table t6_id2_idx( + id2 varchar(10), + id1 bigint, + keyspace_id varbinary(50), + primary key(id1), + key(id2) +) Engine=InnoDB; + +create table t7_xxhash( + uid varchar(50), + phone bigint, + msg varchar(100), + primary key(uid) +) Engine=InnoDB; + +create table t7_xxhash_idx( + phone bigint, + keyspace_id varbinary(50), + primary key(phone, keyspace_id) +) Engine=InnoDB; + +create table t7_fk( + id bigint, + t7_uid varchar(50), + primary key(id), + CONSTRAINT t7_fk_ibfk_1 foreign key (t7_uid) references t7_xxhash(uid) + on delete set null on update cascade +) Engine=InnoDB; +` + + VSchema = ` +{ + "sharded": true, + "vindexes": { + "unicode_loose_xxhash" : { + "type": "unicode_loose_xxhash" + }, + "unicode_loose_md5" : { + "type": "unicode_loose_md5" + }, + "hash": { + "type": "hash" + }, + "xxhash": { + "type": "xxhash" + }, + "t1_id2_vdx": { + "type": "consistent_lookup_unique", + "params": { + "table": "t1_id2_idx", + "from": "id2", + "to": "keyspace_id" + }, + "owner": "t1" + }, + "t2_id4_idx": { + "type": "lookup_hash", + "params": { + "table": "t2_id4_idx", + "from": "id4", + "to": "id3", + "autocommit": "true" + }, + "owner": "t2" + }, + "t3_id7_vdx": { + "type": "lookup_hash", + "params": { + "table": "t3_id7_idx", + "from": "id7", + "to": "id6" + }, + "owner": "t3" + }, + "t4_id2_vdx": { + "type": "consistent_lookup", + "params": { + "table": "t4_id2_idx", + "from": "id2,id1", + "to": "keyspace_id" + }, + "owner": "t4" + }, + "t6_id2_vdx": { + "type": "consistent_lookup", + "params": { + "table": "t6_id2_idx", + "from": "id2,id1", + "to": "keyspace_id", + "ignore_nulls": "true" + }, + "owner": "t6" + }, + "t7_xxhash_vdx": { + "type": "consistent_lookup", + "params": { + "table": "t7_xxhash_idx", + "from": "phone", + "to": "keyspace_id", + "ignore_nulls": "true" + }, + "owner": "t7_xxhash" + } + }, + "tables": { + "t1": { + "column_vindexes": [ + { + "column": "id1", + "name": "hash" + }, + { + "column": "id2", + "name": "t1_id2_vdx" + } + ] + }, + "t1_id2_idx": { + "column_vindexes": [ + { + "column": "id2", + "name": "hash" + } + ] + }, + "t2": { + "column_vindexes": [ + { + "column": "id3", + "name": "hash" + }, + { + "column": "id4", + "name": "t2_id4_idx" + } + ] + }, + "t2_id4_idx": { + "column_vindexes": [ + { + "column": "id4", + "name": "hash" + } + ] + }, + "t3": { + "column_vindexes": [ + { + "column": "id6", + "name": "hash" + }, + { + "column": "id7", + "name": "t3_id7_vdx" + } + ] + }, + "t3_id7_idx": { + "column_vindexes": [ + { + "column": "id7", + "name": "hash" + } + ] + }, + "t4": { + "column_vindexes": [ + { + "column": "id1", + "name": "hash" + }, + { + "columns": ["id2", "id1"], + "name": "t4_id2_vdx" + } + ] + }, + "t4_id2_idx": { + "column_vindexes": [ + { + "column": "id2", + "name": "unicode_loose_md5" + } + ] + }, + "t6": { + "column_vindexes": [ + { + "column": "id1", + "name": "hash" + }, + { + "columns": ["id2", "id1"], + "name": "t6_id2_vdx" + } + ] + }, + "t6_id2_idx": { + "column_vindexes": [ + { + "column": "id2", + "name": "xxhash" + } + ] + }, + "t5_null_vindex": { + "column_vindexes": [ + { + "column": "idx", + "name": "xxhash" + } + ] + }, + "vstream_test": { + "column_vindexes": [ + { + "column": "id", + "name": "hash" + } + ] + }, + "aggr_test": { + "column_vindexes": [ + { + "column": "id", + "name": "hash" + } + ], + "columns": [ + { + "name": "val1", + "type": "VARCHAR" + } + ] + }, + "t7_xxhash": { + "column_vindexes": [ + { + "column": "uid", + "name": "unicode_loose_xxhash" + }, + { + "column": "phone", + "name": "t7_xxhash_vdx" + } + ] + }, + "t7_xxhash_idx": { + "column_vindexes": [ + { + "column": "phone", + "name": "unicode_loose_xxhash" + } + ] + }, + "t7_fk": { + "column_vindexes": [ + { + "column": "t7_uid", + "name": "unicode_loose_xxhash" + } + ] + } + } +}` + routingRules = ` +{"rules": [ + { + "from_table": "ks.t1000", + "to_tables": ["ks.t1"] + } +]} +` +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(Cell, "localhost") + defer clusterInstance.Teardown() + + // Start topo server + err := clusterInstance.StartTopo() + if err != nil { + return 1 + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: KeyspaceName, + SchemaSQL: SchemaSQL, + VSchema: VSchema, + } + err = clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, true) + if err != nil { + return 1 + } + + err = clusterInstance.VtctlclientProcess.ApplyRoutingRules(routingRules) + if err != nil { + return 1 + } + + err = clusterInstance.VtctlclientProcess.ExecuteCommand("RebuildVSchemaGraph") + if err != nil { + return 1 + } + + // Start vtgate + vtgateInstance := clusterInstance.NewVtgateInstance() + vtgateInstance.TabletTypesToWait = "MASTER,REPLICA,RDONLY" + if err := vtgateInstance.Setup(); err != nil { + return 1 + } + // ensure it is torn down during cluster TearDown + clusterInstance.VtgateProcess = *vtgateInstance + + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + return m.Run() + }() + os.Exit(exitCode) +} + +func TestScatterErrsAsWarns(t *testing.T) { + oltp, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + defer oltp.Close() + + olap, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + defer olap.Close() + exec(t, olap, "set workload = olap") + exec(t, olap, "use @replica") + + exec(t, oltp, "use @master") + exec(t, oltp, `insert into t1(id1, id2) values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)`) + exec(t, oltp, "set workload = oltp") + exec(t, oltp, "use @replica") + + defer func() { + exec(t, oltp, "use @master") + exec(t, oltp, `delete from t1`) + }() + + require.NoError(t, // stop one tablet from the first shard + clusterInstance.Keyspaces[0].Shards[0].Replica().MysqlctlProcess.Stop()) + + query := `select /*vt+ SCATTER_ERRORS_AS_WARNINGS */ id1 from t1` + qr, err := oltp.ExecuteFetch(query, 1000, true) + assert.NoError(t, err) + assert.NotEmpty(t, qr.Rows) + + qr, err = olap.ExecuteFetch(query, 1000, true) + require.NoError(t, err) + assert.NotEmpty(t, qr.Rows) +} + +func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { + t.Helper() + qr, err := conn.ExecuteFetch(query, 1000, true) + require.NoError(t, err, "for query: "+query) + return qr +} diff --git a/go/test/endtoend/vtgate/misc_test.go b/go/test/endtoend/vtgate/misc_test.go index 80fa206d0ee..5ce8ece4f57 100644 --- a/go/test/endtoend/vtgate/misc_test.go +++ b/go/test/endtoend/vtgate/misc_test.go @@ -640,47 +640,6 @@ func TestShowGtid(t *testing.T) { } } -func TestScatterErrsAsWarns(t *testing.T) { - conn, err := mysql.Connect(context.Background(), &vtParams) - require.NoError(t, err) - defer conn.Close() - - exec(t, conn, `insert into t1(id1, id2) values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)`) - defer func() { - exec(t, conn, "use @master") - exec(t, conn, `delete from t1`) - }() - - exec(t, conn, "use `ks:-80@rdonly`") - qrM80 := exec(t, conn, `select id1 from t1`) - assert.EqualValues(t, 4, len(qrM80.Rows)) - - exec(t, conn, "use `ks:80-@rdonly`") - qr80M := exec(t, conn, `select id1 from t1`) - assert.EqualValues(t, 1, len(qr80M.Rows)) - - exec(t, conn, `use ks@rdonly`) - qrT := exec(t, conn, `select id1 from t1`) - assert.EqualValues(t, 5, len(qrT.Rows)) - - rdonly := clusterInstance.Keyspaces[0].Shards[0].Rdonly() - err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonly.Alias, "spare") - require.NoError(t, err) - defer clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonly.Alias, "rdonly") - - for _, mode := range []string{"oltp", "olap"} { - t.Run(mode, func(t *testing.T) { - exec(t, conn, fmt.Sprintf("set workload = %s", mode)) - - _, err = conn.ExecuteFetch(`select id1 from t1`, 1000, true) - require.Error(t, err) - - qr := exec(t, conn, `select /*vt+ SCATTER_ERRORS_AS_WARNINGS */ id1 from t1`) - assert.EqualValues(t, 1, len(qr.Rows)) - }) - } -} - func TestQueryAndSubQWithLimit(t *testing.T) { conn, err := mysql.Connect(context.Background(), &vtParams) require.NoError(t, err) From 312d51d897bee7aab9acbf66f9f790cf8b0c713e Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 17 May 2021 19:18:11 +0530 Subject: [PATCH 03/17] minimize the test setup Signed-off-by: Harshit Gangal --- .../vtgate/errors_as_warnings/main_test.go | 342 +----------------- 1 file changed, 3 insertions(+), 339 deletions(-) diff --git a/go/test/endtoend/vtgate/errors_as_warnings/main_test.go b/go/test/endtoend/vtgate/errors_as_warnings/main_test.go index 4e68540e825..ee6425110e2 100644 --- a/go/test/endtoend/vtgate/errors_as_warnings/main_test.go +++ b/go/test/endtoend/vtgate/errors_as_warnings/main_test.go @@ -39,182 +39,14 @@ var ( id1 bigint, id2 bigint, primary key(id1) -) Engine=InnoDB; - -create table t1_id2_idx( - id2 bigint, - keyspace_id varbinary(10), - primary key(id2) -) Engine=InnoDB; - -create table vstream_test( - id bigint, - val bigint, - primary key(id) -) Engine=InnoDB; - -create table aggr_test( - id bigint, - val1 varchar(16), - val2 bigint, - primary key(id) -) Engine=InnoDB; - -create table t2( - id3 bigint, - id4 bigint, - primary key(id3) -) Engine=InnoDB; - -create table t2_id4_idx( - id bigint not null auto_increment, - id4 bigint, - id3 bigint, - primary key(id), - key idx_id4(id4) -) Engine=InnoDB; - -create table t3( - id5 bigint, - id6 bigint, - id7 bigint, - primary key(id5) -) Engine=InnoDB; - -create table t3_id7_idx( - id bigint not null auto_increment, - id7 bigint, - id6 bigint, - primary key(id) -) Engine=InnoDB; - -create table t4( - id1 bigint, - id2 varchar(10), - primary key(id1) -) ENGINE=InnoDB DEFAULT charset=utf8mb4 COLLATE=utf8mb4_general_ci; - -create table t4_id2_idx( - id2 varchar(10), - id1 bigint, - keyspace_id varbinary(50), - primary key(id2, id1) -) Engine=InnoDB DEFAULT charset=utf8mb4 COLLATE=utf8mb4_general_ci; - -create table t5_null_vindex( - id bigint not null, - idx varchar(50), - primary key(id) -) Engine=InnoDB; - -create table t6( - id1 bigint, - id2 varchar(10), - primary key(id1) -) Engine=InnoDB; - -create table t6_id2_idx( - id2 varchar(10), - id1 bigint, - keyspace_id varbinary(50), - primary key(id1), - key(id2) -) Engine=InnoDB; - -create table t7_xxhash( - uid varchar(50), - phone bigint, - msg varchar(100), - primary key(uid) -) Engine=InnoDB; - -create table t7_xxhash_idx( - phone bigint, - keyspace_id varbinary(50), - primary key(phone, keyspace_id) -) Engine=InnoDB; - -create table t7_fk( - id bigint, - t7_uid varchar(50), - primary key(id), - CONSTRAINT t7_fk_ibfk_1 foreign key (t7_uid) references t7_xxhash(uid) - on delete set null on update cascade -) Engine=InnoDB; -` +) Engine=InnoDB;` VSchema = ` { "sharded": true, "vindexes": { - "unicode_loose_xxhash" : { - "type": "unicode_loose_xxhash" - }, - "unicode_loose_md5" : { - "type": "unicode_loose_md5" - }, - "hash": { - "type": "hash" - }, "xxhash": { "type": "xxhash" - }, - "t1_id2_vdx": { - "type": "consistent_lookup_unique", - "params": { - "table": "t1_id2_idx", - "from": "id2", - "to": "keyspace_id" - }, - "owner": "t1" - }, - "t2_id4_idx": { - "type": "lookup_hash", - "params": { - "table": "t2_id4_idx", - "from": "id4", - "to": "id3", - "autocommit": "true" - }, - "owner": "t2" - }, - "t3_id7_vdx": { - "type": "lookup_hash", - "params": { - "table": "t3_id7_idx", - "from": "id7", - "to": "id6" - }, - "owner": "t3" - }, - "t4_id2_vdx": { - "type": "consistent_lookup", - "params": { - "table": "t4_id2_idx", - "from": "id2,id1", - "to": "keyspace_id" - }, - "owner": "t4" - }, - "t6_id2_vdx": { - "type": "consistent_lookup", - "params": { - "table": "t6_id2_idx", - "from": "id2,id1", - "to": "keyspace_id", - "ignore_nulls": "true" - }, - "owner": "t6" - }, - "t7_xxhash_vdx": { - "type": "consistent_lookup", - "params": { - "table": "t7_xxhash_idx", - "from": "phone", - "to": "keyspace_id", - "ignore_nulls": "true" - }, - "owner": "t7_xxhash" } }, "tables": { @@ -222,170 +54,12 @@ create table t7_fk( "column_vindexes": [ { "column": "id1", - "name": "hash" - }, - { - "column": "id2", - "name": "t1_id2_vdx" - } - ] - }, - "t1_id2_idx": { - "column_vindexes": [ - { - "column": "id2", - "name": "hash" - } - ] - }, - "t2": { - "column_vindexes": [ - { - "column": "id3", - "name": "hash" - }, - { - "column": "id4", - "name": "t2_id4_idx" - } - ] - }, - "t2_id4_idx": { - "column_vindexes": [ - { - "column": "id4", - "name": "hash" - } - ] - }, - "t3": { - "column_vindexes": [ - { - "column": "id6", - "name": "hash" - }, - { - "column": "id7", - "name": "t3_id7_vdx" - } - ] - }, - "t3_id7_idx": { - "column_vindexes": [ - { - "column": "id7", - "name": "hash" - } - ] - }, - "t4": { - "column_vindexes": [ - { - "column": "id1", - "name": "hash" - }, - { - "columns": ["id2", "id1"], - "name": "t4_id2_vdx" - } - ] - }, - "t4_id2_idx": { - "column_vindexes": [ - { - "column": "id2", - "name": "unicode_loose_md5" - } - ] - }, - "t6": { - "column_vindexes": [ - { - "column": "id1", - "name": "hash" - }, - { - "columns": ["id2", "id1"], - "name": "t6_id2_vdx" - } - ] - }, - "t6_id2_idx": { - "column_vindexes": [ - { - "column": "id2", "name": "xxhash" } ] - }, - "t5_null_vindex": { - "column_vindexes": [ - { - "column": "idx", - "name": "xxhash" - } - ] - }, - "vstream_test": { - "column_vindexes": [ - { - "column": "id", - "name": "hash" - } - ] - }, - "aggr_test": { - "column_vindexes": [ - { - "column": "id", - "name": "hash" - } - ], - "columns": [ - { - "name": "val1", - "type": "VARCHAR" - } - ] - }, - "t7_xxhash": { - "column_vindexes": [ - { - "column": "uid", - "name": "unicode_loose_xxhash" - }, - { - "column": "phone", - "name": "t7_xxhash_vdx" - } - ] - }, - "t7_xxhash_idx": { - "column_vindexes": [ - { - "column": "phone", - "name": "unicode_loose_xxhash" - } - ] - }, - "t7_fk": { - "column_vindexes": [ - { - "column": "t7_uid", - "name": "unicode_loose_xxhash" - } - ] } } }` - routingRules = ` -{"rules": [ - { - "from_table": "ks.t1000", - "to_tables": ["ks.t1"] - } -]} -` ) func TestMain(m *testing.M) { @@ -408,17 +82,7 @@ func TestMain(m *testing.M) { SchemaSQL: SchemaSQL, VSchema: VSchema, } - err = clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, true) - if err != nil { - return 1 - } - - err = clusterInstance.VtctlclientProcess.ApplyRoutingRules(routingRules) - if err != nil { - return 1 - } - - err = clusterInstance.VtctlclientProcess.ExecuteCommand("RebuildVSchemaGraph") + err = clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false) if err != nil { return 1 } @@ -467,7 +131,7 @@ func TestScatterErrsAsWarns(t *testing.T) { query := `select /*vt+ SCATTER_ERRORS_AS_WARNINGS */ id1 from t1` qr, err := oltp.ExecuteFetch(query, 1000, true) - assert.NoError(t, err) + require.NoError(t, err) assert.NotEmpty(t, qr.Rows) qr, err = olap.ExecuteFetch(query, 1000, true) From 5c24a3e91012420716f7f01b8b0c66477d3a9f0e Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 17 May 2021 19:34:08 +0530 Subject: [PATCH 04/17] add test to CI Signed-off-by: Harshit Gangal --- test/config.json | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/test/config.json b/test/config.json index c309b9df21c..36b315882ce 100644 --- a/test/config.json +++ b/test/config.json @@ -696,6 +696,15 @@ "RetryMax": 0, "Tags": [] }, + "errs_as_warns": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/errors_as_warnings"], + "Command": [], + "Manual": false, + "Shard": "17", + "RetryMax": 0, + "Tags": [] + }, "web_test": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vtctldweb"], From f4f407dc05e8b7a253e8ff5f325ace093471253f Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 17 May 2021 20:29:44 +0530 Subject: [PATCH 05/17] added test expectations Signed-off-by: Harshit Gangal --- .../vtgate/errors_as_warnings/main_test.go | 71 +++++++++++-------- test/config.json | 2 +- 2 files changed, 43 insertions(+), 30 deletions(-) diff --git a/go/test/endtoend/vtgate/errors_as_warnings/main_test.go b/go/test/endtoend/vtgate/errors_as_warnings/main_test.go index ee6425110e2..f35417d10cd 100644 --- a/go/test/endtoend/vtgate/errors_as_warnings/main_test.go +++ b/go/test/endtoend/vtgate/errors_as_warnings/main_test.go @@ -1,5 +1,5 @@ /* -Copyright 2019 The Vitess Authors. +Copyright 2021 The Vitess Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -19,9 +19,12 @@ package vtgate import ( "context" "flag" + "fmt" "os" "testing" + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -45,8 +48,8 @@ var ( { "sharded": true, "vindexes": { - "xxhash": { - "type": "xxhash" + "hash": { + "type": "hash" } }, "tables": { @@ -54,7 +57,7 @@ var ( "column_vindexes": [ { "column": "id1", - "name": "xxhash" + "name": "hash" } ] } @@ -82,19 +85,14 @@ func TestMain(m *testing.M) { SchemaSQL: SchemaSQL, VSchema: VSchema, } - err = clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false) - if err != nil { + if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil { return 1 } // Start vtgate - vtgateInstance := clusterInstance.NewVtgateInstance() - vtgateInstance.TabletTypesToWait = "MASTER,REPLICA,RDONLY" - if err := vtgateInstance.Setup(); err != nil { + if err := clusterInstance.StartVtgate(); err != nil { return 1 } - // ensure it is torn down during cluster TearDown - clusterInstance.VtgateProcess = *vtgateInstance vtParams = mysql.ConnParams{ Host: clusterInstance.Hostname, @@ -113,35 +111,50 @@ func TestScatterErrsAsWarns(t *testing.T) { olap, err := mysql.Connect(context.Background(), &vtParams) require.NoError(t, err) defer olap.Close() - exec(t, olap, "set workload = olap") - exec(t, olap, "use @replica") - - exec(t, oltp, "use @master") - exec(t, oltp, `insert into t1(id1, id2) values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)`) - exec(t, oltp, "set workload = oltp") - exec(t, oltp, "use @replica") + checkedExec(t, oltp, `insert into t1(id1, id2) values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)`) defer func() { - exec(t, oltp, "use @master") - exec(t, oltp, `delete from t1`) + checkedExec(t, oltp, "use @master") + checkedExec(t, oltp, `delete from t1`) }() - require.NoError(t, // stop one tablet from the first shard + // connection setup + checkedExec(t, oltp, "use @replica") + checkedExec(t, oltp, "set workload = oltp") + checkedExec(t, olap, "use @replica") + checkedExec(t, olap, "set workload = olap") + + // stop one tablet from the first shard + require.NoError(t, clusterInstance.Keyspaces[0].Shards[0].Replica().MysqlctlProcess.Stop()) query := `select /*vt+ SCATTER_ERRORS_AS_WARNINGS */ id1 from t1` - qr, err := oltp.ExecuteFetch(query, 1000, true) - require.NoError(t, err) - assert.NotEmpty(t, qr.Rows) - qr, err = olap.ExecuteFetch(query, 1000, true) - require.NoError(t, err) - assert.NotEmpty(t, qr.Rows) + assertMatches(t, oltp, query, `[[INT64(4)]]`) + assertMatches(t, olap, query, `[[INT64(4)]]`) + + // change tablet type + assert.NoError(t, + clusterInstance.VtctlclientProcess.ExecuteCommand( + "ChangeTabletType", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "spare")) + + assertMatches(t, oltp, query, `[[INT64(4)]]`) + assertMatches(t, olap, query, `[[INT64(4)]]`) } -func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { +func checkedExec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { t.Helper() qr, err := conn.ExecuteFetch(query, 1000, true) - require.NoError(t, err, "for query: "+query) + require.NoError(t, err) return qr } + +func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) { + t.Helper() + qr := checkedExec(t, conn, query) + got := fmt.Sprintf("%v", qr.Rows) + diff := cmp.Diff(expected, got) + if diff != "" { + t.Errorf("Query: %s (-want +got):\n%s", query, diff) + } +} diff --git a/test/config.json b/test/config.json index 36b315882ce..430c42a421a 100644 --- a/test/config.json +++ b/test/config.json @@ -698,7 +698,7 @@ }, "errs_as_warns": { "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/errors_as_warnings"], + "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/errors_as_warnings"], "Command": [], "Manual": false, "Shard": "17", From 3475620ee980d1e937a287ceb3456ca42515e256 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 17 May 2021 21:58:26 +0530 Subject: [PATCH 06/17] handle merge sort in streaming to log errors as warns if set Signed-off-by: Harshit Gangal --- .../vtgate/errors_as_warnings/main_test.go | 15 ++++++---- go/vt/vtgate/engine/merge_sort.go | 30 +++++++++++++++---- go/vt/vtgate/engine/route.go | 5 ++-- go/vt/vtgate/executor_select_test.go | 12 ++++++++ 4 files changed, 49 insertions(+), 13 deletions(-) diff --git a/go/test/endtoend/vtgate/errors_as_warnings/main_test.go b/go/test/endtoend/vtgate/errors_as_warnings/main_test.go index f35417d10cd..77169f9efc7 100644 --- a/go/test/endtoend/vtgate/errors_as_warnings/main_test.go +++ b/go/test/endtoend/vtgate/errors_as_warnings/main_test.go @@ -128,18 +128,23 @@ func TestScatterErrsAsWarns(t *testing.T) { require.NoError(t, clusterInstance.Keyspaces[0].Shards[0].Replica().MysqlctlProcess.Stop()) - query := `select /*vt+ SCATTER_ERRORS_AS_WARNINGS */ id1 from t1` + query1 := `select /*vt+ SCATTER_ERRORS_AS_WARNINGS */ id1 from t1` + query2 := `select /*vt+ SCATTER_ERRORS_AS_WARNINGS */ id1 from t1 order by id1` - assertMatches(t, oltp, query, `[[INT64(4)]]`) - assertMatches(t, olap, query, `[[INT64(4)]]`) + assertMatches(t, oltp, query1, `[[INT64(4)]]`) + assertMatches(t, olap, query1, `[[INT64(4)]]`) + assertMatches(t, oltp, query2, `[[INT64(4)]]`) + assertMatches(t, olap, query2, `[[INT64(4)]]`) // change tablet type assert.NoError(t, clusterInstance.VtctlclientProcess.ExecuteCommand( "ChangeTabletType", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "spare")) - assertMatches(t, oltp, query, `[[INT64(4)]]`) - assertMatches(t, olap, query, `[[INT64(4)]]`) + assertMatches(t, oltp, query1, `[[INT64(4)]]`) + assertMatches(t, olap, query1, `[[INT64(4)]]`) + assertMatches(t, oltp, query2, `[[INT64(4)]]`) + assertMatches(t, olap, query2, `[[INT64(4)]]`) } func checkedExec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { diff --git a/go/vt/vtgate/engine/merge_sort.go b/go/vt/vtgate/engine/merge_sort.go index a93b2ba4515..476cb8e4e1f 100644 --- a/go/vt/vtgate/engine/merge_sort.go +++ b/go/vt/vtgate/engine/merge_sort.go @@ -47,8 +47,9 @@ var _ Primitive = (*MergeSort)(nil) // be used like other Primitives in VTGate. However, it satisfies the Primitive API // so that vdiff can use it. In that situation, only StreamExecute is used. type MergeSort struct { - Primitives []StreamExecutor - OrderBy []OrderbyParams + Primitives []StreamExecutor + OrderBy []OrderbyParams + ScatterErrorsAsWarnings bool noInputs noTxNeeded } @@ -84,12 +85,24 @@ func (ms *MergeSort) StreamExecute(vcursor VCursor, bindVars map[string]*querypb wantfields = false } - // Fetch field info from just one stream. - fields := <-handles[0].fields - // If fields is nil, it means there was an error. + var fields []*querypb.Field + if ms.ScatterErrorsAsWarnings { + for _, handle := range handles { + // Fetch field info from just one stream. + fields = <-handle.fields + // If fields is nil, it means there was an error. + if fields != nil { + break + } + } + } else { + // Fetch field info from just one stream. + fields = <-handles[0].fields + } if fields == nil { return handles[0].err } + if err := callback(&sqltypes.Result{Fields: fields}); err != nil { return err } @@ -100,6 +113,7 @@ func (ms *MergeSort) StreamExecute(vcursor VCursor, bindVars map[string]*querypb comparers: comparers, } + var errs []error // Prime the heap. One element must be pulled from // each stream. for i, handle := range handles { @@ -107,6 +121,10 @@ func (ms *MergeSort) StreamExecute(vcursor VCursor, bindVars map[string]*querypb case row, ok := <-handle.row: if !ok { if handle.err != nil { + if ms.ScatterErrorsAsWarnings { + errs = append(errs, handle.err) + break + } return handle.err } // It's possible that a stream returns no rows. @@ -154,7 +172,7 @@ func (ms *MergeSort) StreamExecute(vcursor VCursor, bindVars map[string]*querypb return ctx.Err() } } - return nil + return vterrors.Aggregate(errs) } func (ms *MergeSort) description() PrimitiveDescription { diff --git a/go/vt/vtgate/engine/route.go b/go/vt/vtgate/engine/route.go index b7696e1fc6e..b247becc5ac 100644 --- a/go/vt/vtgate/engine/route.go +++ b/go/vt/vtgate/engine/route.go @@ -375,8 +375,9 @@ func (route *Route) mergeSort(vcursor VCursor, bindVars map[string]*querypb.Bind }) } ms := MergeSort{ - Primitives: prims, - OrderBy: route.OrderBy, + Primitives: prims, + OrderBy: route.OrderBy, + ScatterErrorsAsWarnings: route.ScatterErrorsAsWarnings, } err := ms.StreamExecute(vcursor, bindVars, wantfields, func(qr *sqltypes.Result) error { return callback(qr.Truncate(route.TruncateColumnCount)) diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 30179ebdc5f..649eac9ca89 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -1229,6 +1229,18 @@ func TestSelectScatterPartialOLAP2(t *testing.T) { require.NoError(t, err) assert.EqualValues(t, 7, len(results.Rows)) testQueryLog(t, logChan, "TestExecuteStream", "SELECT", "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user", 8) + + // order by + results, err = executorStream(executor, "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user order by id") + require.NoError(t, err) + assert.EqualValues(t, 7, len(results.Rows)) + testQueryLog(t, logChan, "TestExecuteStream", "SELECT", "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user order by id", 8) + + // order by and limit + results, err = executorStream(executor, "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user order by id limit 5") + require.NoError(t, err) + assert.EqualValues(t, 5, len(results.Rows)) + testQueryLog(t, logChan, "TestExecuteStream", "SELECT", "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user order by id limit 5", 8) } func TestStreamSelectScatter(t *testing.T) { From 61265cfe86023f7b241050b4ecdf6106d4d452f1 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Mon, 17 May 2021 21:21:04 +0200 Subject: [PATCH 07/17] codegen Signed-off-by: Andres Taylor --- go/vt/vtgate/engine/cached_size.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtgate/engine/cached_size.go b/go/vt/vtgate/engine/cached_size.go index ef8d5b448bb..29f2b923b6b 100644 --- a/go/vt/vtgate/engine/cached_size.go +++ b/go/vt/vtgate/engine/cached_size.go @@ -339,7 +339,7 @@ func (cached *MergeSort) CachedSize(alloc bool) int64 { } size := int64(0) if alloc { - size += int64(48) + size += int64(49) } // field Primitives []vitess.io/vitess/go/vt/vtgate/engine.StreamExecutor { From 9eae5b6be8f942045537e5db56b43866e8236ee4 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Tue, 18 May 2021 12:08:00 +0200 Subject: [PATCH 08/17] ScatterErrorsAsWarnings should not kick in when all shards fail Signed-off-by: Andres Taylor --- go/vt/vtgate/engine/fake_vcursor_test.go | 9 ++++--- go/vt/vtgate/engine/primitive.go | 2 +- go/vt/vtgate/engine/route.go | 32 ++++++++++++------------ go/vt/vtgate/engine/send.go | 3 ++- go/vt/vtgate/engine/shard_route.go | 6 +++-- go/vt/vtgate/executor.go | 2 +- go/vt/vtgate/executor_select_test.go | 17 ++++++------- go/vt/vtgate/legacy_scatter_conn_test.go | 4 +-- go/vt/vtgate/scatter_conn.go | 4 +-- go/vt/vtgate/vcursor_impl.go | 4 +-- 10 files changed, 42 insertions(+), 41 deletions(-) diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index d5a2cbc7d8d..e6d4c6befcf 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -230,7 +230,7 @@ func (t *noopVCursor) ExecuteStandalone(query string, bindvars map[string]*query panic("unimplemented") } -func (t *noopVCursor) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error { +func (t *noopVCursor) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) []error { panic("unimplemented") } @@ -401,15 +401,16 @@ func (f *loggingVCursor) ExecuteStandalone(query string, bindvars map[string]*qu return f.nextResult() } -func (f *loggingVCursor) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error { +func (f *loggingVCursor) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) []error { f.mu.Lock() f.log = append(f.log, fmt.Sprintf("StreamExecuteMulti %s %s", query, printResolvedShardsBindVars(rss, bindVars))) r, err := f.nextResult() f.mu.Unlock() if err != nil { - return err + return []error{err} } - return callback(r) + + return []error{callback(r)} } func (f *loggingVCursor) ResolveDestinations(keyspace string, ids []*querypb.Value, destinations []key.Destination) ([]*srvtopo.ResolvedShard, [][]*querypb.Value, error) { diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index 7561b341898..3ad6420f99a 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -74,7 +74,7 @@ type ( // Shard-level functions. ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) (*sqltypes.Result, []error) ExecuteStandalone(query string, bindvars map[string]*querypb.BindVariable, rs *srvtopo.ResolvedShard) (*sqltypes.Result, error) - StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error + StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) []error // Keyspace ID level functions. ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error) diff --git a/go/vt/vtgate/engine/route.go b/go/vt/vtgate/engine/route.go index b247becc5ac..32427348556 100644 --- a/go/vt/vtgate/engine/route.go +++ b/go/vt/vtgate/engine/route.go @@ -282,18 +282,16 @@ func (route *Route) execute(vcursor VCursor, bindVars map[string]*querypb.BindVa result, errs := vcursor.ExecuteMultiShard(rss, queries, false /* rollbackOnError */, false /* autocommit */) if errs != nil { - if route.ScatterErrorsAsWarnings { - partialSuccessScatterQueries.Add(1) + if !route.ScatterErrorsAsWarnings || len(errs) == len(rss) { + return nil, vterrors.Aggregate(errs) + } + partialSuccessScatterQueries.Add(1) - for _, err := range errs { - if err != nil { - serr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) - vcursor.Session().RecordWarning(&querypb.QueryWarning{Code: uint32(serr.Num), Message: err.Error()}) - } + for _, err := range errs { + if err != nil { + serr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) + vcursor.Session().RecordWarning(&querypb.QueryWarning{Code: uint32(serr.Num), Message: err.Error()}) } - // fall through - } else { - return nil, vterrors.Aggregate(errs) } } if len(route.OrderBy) == 0 { @@ -347,16 +345,18 @@ func (route *Route) StreamExecute(vcursor VCursor, bindVars map[string]*querypb. } if len(route.OrderBy) == 0 { - err = vcursor.StreamExecuteMulti(route.Query, rss, bvs, func(qr *sqltypes.Result) error { + errs := vcursor.StreamExecuteMulti(route.Query, rss, bvs, func(qr *sqltypes.Result) error { return callback(qr.Truncate(route.TruncateColumnCount)) }) - if err != nil { - if !route.ScatterErrorsAsWarnings { - return err + if len(errs) > 0 { + if !route.ScatterErrorsAsWarnings || len(errs) == len(rss) { + return vterrors.Aggregate(errs) } partialSuccessScatterQueries.Add(1) - sErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) - vcursor.Session().RecordWarning(&querypb.QueryWarning{Code: uint32(sErr.Num), Message: err.Error()}) + for _, err := range errs { + sErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) + vcursor.Session().RecordWarning(&querypb.QueryWarning{Code: uint32(sErr.Num), Message: err.Error()}) + } } return nil } diff --git a/go/vt/vtgate/engine/send.go b/go/vt/vtgate/engine/send.go index 9a7fffa9664..9c2d7f33022 100644 --- a/go/vt/vtgate/engine/send.go +++ b/go/vt/vtgate/engine/send.go @@ -152,7 +152,8 @@ func (s *Send) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindV } multiBindVars[i] = bv } - return vcursor.StreamExecuteMulti(s.Query, rss, multiBindVars, callback) + errors := vcursor.StreamExecuteMulti(s.Query, rss, multiBindVars, callback) + return vterrors.Aggregate(errors) } // GetFields implements Primitive interface diff --git a/go/vt/vtgate/engine/shard_route.go b/go/vt/vtgate/engine/shard_route.go index 1d695489c03..2389facbff3 100644 --- a/go/vt/vtgate/engine/shard_route.go +++ b/go/vt/vtgate/engine/shard_route.go @@ -20,6 +20,7 @@ import ( "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/srvtopo" + "vitess.io/vitess/go/vt/vterrors" ) var _ StreamExecutor = (*shardRoute)(nil) @@ -33,6 +34,7 @@ type shardRoute struct { } // StreamExecute performs a streaming exec. -func (sr *shardRoute) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { - return vcursor.StreamExecuteMulti(sr.query, []*srvtopo.ResolvedShard{sr.rs}, []map[string]*querypb.BindVariable{sr.bv}, callback) +func (sr *shardRoute) StreamExecute(vcursor VCursor, _ map[string]*querypb.BindVariable, _ bool, callback func(*sqltypes.Result) error) error { + errors := vcursor.StreamExecuteMulti(sr.query, []*srvtopo.ResolvedShard{sr.rs}, []map[string]*querypb.BindVariable{sr.bv}, callback) + return vterrors.Aggregate(errors) } diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 4be1cf66c6c..b03845ee056 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -1476,7 +1476,7 @@ func (e *Executor) ExecuteMultiShard(ctx context.Context, rss []*srvtopo.Resolve } // StreamExecuteMulti implements the IExecutor interface -func (e *Executor) StreamExecuteMulti(ctx context.Context, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, options *querypb.ExecuteOptions, callback func(reply *sqltypes.Result) error) error { +func (e *Executor) StreamExecuteMulti(ctx context.Context, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, options *querypb.ExecuteOptions, callback func(reply *sqltypes.Result) error) []error { return e.scatterConn.StreamExecuteMulti(ctx, query, rss, vars, options, callback) } diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 649eac9ca89..0c9834dd79f 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -1122,7 +1122,7 @@ func TestSelectScatterPartial(t *testing.T) { } testQueryLog(t, logChan, "TestExecute", "SELECT", "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user", 8) - // Even if all shards fail the operation succeeds with 0 rows + // When all shards fail, the execution should also fail conns[0].MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1000 conns[1].MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1000 conns[3].MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1000 @@ -1131,11 +1131,9 @@ func TestSelectScatterPartial(t *testing.T) { conns[6].MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1000 conns[7].MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1000 - results, err = executorExec(executor, "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user", nil) - require.NoError(t, err) - if results == nil || len(results.Rows) != 0 { - t.Errorf("want 0 result rows, got %v", results) - } + _, err = executorExec(executor, "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user", nil) + require.Error(t, err) + testQueryLog(t, logChan, "TestExecute", "SELECT", "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user", 8) } @@ -1173,7 +1171,7 @@ func TestSelectScatterPartialOLAP(t *testing.T) { assert.EqualValues(t, 7, len(results.Rows)) testQueryLog(t, logChan, "TestExecuteStream", "SELECT", "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user", 8) - // Even if all shards fail the operation succeeds with 0 rows + // If all shards fail, the operation should also fail conns[0].MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1000 conns[1].MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1000 conns[3].MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1000 @@ -1182,9 +1180,8 @@ func TestSelectScatterPartialOLAP(t *testing.T) { conns[6].MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1000 conns[7].MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1000 - results, err = executorStream(executor, "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user") - require.NoError(t, err) - require.Empty(t, results.Rows) + _, err = executorStream(executor, "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user") + require.Error(t, err) testQueryLog(t, logChan, "TestExecuteStream", "SELECT", "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user", 8) } diff --git a/go/vt/vtgate/legacy_scatter_conn_test.go b/go/vt/vtgate/legacy_scatter_conn_test.go index 4c48cd2bf04..a3f46b89c31 100644 --- a/go/vt/vtgate/legacy_scatter_conn_test.go +++ b/go/vt/vtgate/legacy_scatter_conn_test.go @@ -151,11 +151,11 @@ func TestScatterConnStreamExecuteMulti(t *testing.T) { } bvs := make([]map[string]*querypb.BindVariable, len(rss)) qr := new(sqltypes.Result) - err = sc.StreamExecuteMulti(ctx, "query", rss, bvs, nil, func(r *sqltypes.Result) error { + errors := sc.StreamExecuteMulti(ctx, "query", rss, bvs, nil, func(r *sqltypes.Result) error { qr.AppendResult(r) return nil }) - return qr, err + return qr, vterrors.Aggregate(errors) }) } diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index fafd1f124f2..b2246cec40f 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -371,7 +371,7 @@ func (stc *ScatterConn) StreamExecuteMulti( bindVars []map[string]*querypb.BindVariable, options *querypb.ExecuteOptions, callback func(reply *sqltypes.Result) error, -) error { +) []error { // mu protects fieldSent, callback and replyErr var mu sync.Mutex fieldSent := false @@ -381,7 +381,7 @@ func (stc *ScatterConn) StreamExecuteMulti( return stc.processOneStreamingResult(&mu, &fieldSent, qr, callback) }) }) - return allErrors.AggrError(vterrors.Aggregate) + return allErrors.GetErrors() } // timeTracker is a convenience wrapper used by MessageStream diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 6efc72b1e4f..2259c2f7ec1 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -69,7 +69,7 @@ var _ vindexes.VCursor = (*vcursorImpl)(nil) type iExecute interface { Execute(ctx context.Context, method string, session *SafeSession, s string, vars map[string]*querypb.BindVariable) (*sqltypes.Result, error) ExecuteMultiShard(ctx context.Context, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool) (qr *sqltypes.Result, errs []error) - StreamExecuteMulti(ctx context.Context, s string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, options *querypb.ExecuteOptions, callback func(reply *sqltypes.Result) error) error + StreamExecuteMulti(ctx context.Context, s string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, options *querypb.ExecuteOptions, callback func(reply *sqltypes.Result) error) []error ExecuteLock(ctx context.Context, rs *srvtopo.ResolvedShard, query *querypb.BoundQuery, session *SafeSession) (*sqltypes.Result, error) Commit(ctx context.Context, safeSession *SafeSession) error ExecuteMessageStream(ctx context.Context, rss []*srvtopo.ResolvedShard, name string, callback func(*sqltypes.Result) error) error @@ -451,7 +451,7 @@ func (vc *vcursorImpl) ExecuteStandalone(query string, bindVars map[string]*quer } // StreamExeculteMulti is the streaming version of ExecuteMultiShard. -func (vc *vcursorImpl) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error { +func (vc *vcursorImpl) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) []error { atomic.AddUint64(&vc.logStats.ShardQueries, uint64(len(rss))) return vc.executor.StreamExecuteMulti(vc.ctx, vc.marginComments.Leading+query+vc.marginComments.Trailing, rss, bindVars, vc.safeSession.Options, callback) } From 8bcd0ab41c69ae294599a5b222d8ae3009add75d Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 18 May 2021 16:42:29 +0530 Subject: [PATCH 09/17] plan showwarnings Signed-off-by: Harshit Gangal --- go/vt/sqlparser/ast_funcs.go | 2 + go/vt/sqlparser/constants.go | 2 + go/vt/sqlparser/parse_test.go | 3 +- go/vt/sqlparser/sql.go | 2 +- go/vt/sqlparser/sql.y | 2 +- go/vt/vtgate/engine/fake_vcursor_test.go | 4 ++ go/vt/vtgate/engine/primitive.go | 1 + go/vt/vtgate/engine/session.go | 63 ++++++++++++++++++++++++ go/vt/vtgate/executor.go | 22 --------- go/vt/vtgate/planbuilder/show.go | 32 ++++++++++++ go/vt/vtgate/vcursor_impl.go | 5 ++ 11 files changed, 112 insertions(+), 26 deletions(-) create mode 100644 go/vt/vtgate/engine/session.go diff --git a/go/vt/sqlparser/ast_funcs.go b/go/vt/sqlparser/ast_funcs.go index 1e3dd08ca10..d9ead9fad5c 100644 --- a/go/vt/sqlparser/ast_funcs.go +++ b/go/vt/sqlparser/ast_funcs.go @@ -1259,6 +1259,8 @@ func (ty ShowCommandType) ToString() string { return VGtidExecGlobalStr case VitessMigrations: return VitessMigrationsStr + case Warnings: + return WarningsStr case Keyspace: return KeyspaceStr default: diff --git a/go/vt/sqlparser/constants.go b/go/vt/sqlparser/constants.go index 9978de0c4b2..8cb1e63779c 100644 --- a/go/vt/sqlparser/constants.go +++ b/go/vt/sqlparser/constants.go @@ -240,6 +240,7 @@ const ( VGtidExecGlobalStr = " global vgtid_executed" KeyspaceStr = " keyspaces" VitessMigrationsStr = " vitess_migrations" + WarningsStr = " warnings" // DropKeyType strings PrimaryKeyTypeStr = "primary key" @@ -498,6 +499,7 @@ const ( VariableSession VGtidExecGlobal VitessMigrations + Warnings Keyspace ) diff --git a/go/vt/sqlparser/parse_test.go b/go/vt/sqlparser/parse_test.go index b062dd3a880..1ccbb79a4ce 100644 --- a/go/vt/sqlparser/parse_test.go +++ b/go/vt/sqlparser/parse_test.go @@ -1595,8 +1595,7 @@ var ( }, { input: "alter vitess_migration cancel all", }, { - input: "show warnings", - output: "show warnings", + input: "show warnings", }, { input: "select warnings from t", output: "select `warnings` from t", diff --git a/go/vt/sqlparser/sql.go b/go/vt/sqlparser/sql.go index b65a7d607dd..4c0faebdac3 100644 --- a/go/vt/sqlparser/sql.go +++ b/go/vt/sqlparser/sql.go @@ -9081,7 +9081,7 @@ yydefault: var yyLOCAL Statement //line sql.y:2531 { - yyLOCAL = &Show{&ShowLegacy{Type: string(yyDollar[2].str), Scope: ImplicitScope}} + yyLOCAL = &Show{&ShowBasic{Command: Warnings}} } yyVAL.union = yyLOCAL case 468: diff --git a/go/vt/sqlparser/sql.y b/go/vt/sqlparser/sql.y index bd6b0fdfda1..78aaa987afc 100644 --- a/go/vt/sqlparser/sql.y +++ b/go/vt/sqlparser/sql.y @@ -2529,7 +2529,7 @@ show_statement: } | SHOW WARNINGS { - $$ = &Show{&ShowLegacy{Type: string($2), Scope: ImplicitScope}} + $$ = &Show{&ShowBasic{Command: Warnings}} } /* vitess_topo supports SHOW VITESS_SHARDS / SHOW VITESS_TABLETS */ | SHOW vitess_topo like_or_where_opt diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index e6d4c6befcf..76aadb8fb67 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -54,6 +54,10 @@ type noopVCursor struct { ctx context.Context } +func (t *noopVCursor) GetWarnings() []*querypb.QueryWarning { + panic("implement me") +} + func (t *noopVCursor) VStream(rss []*srvtopo.ResolvedShard, filter *binlogdatapb.Filter, gtid string, callback func(evs []*binlogdatapb.VEvent) error) error { panic("implement me") } diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index 3ad6420f99a..ab08f72951e 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -152,6 +152,7 @@ type ( // HasCreatedTempTable will mark the session as having created temp tables HasCreatedTempTable() + GetWarnings() []*querypb.QueryWarning } // Plan represents the execution strategy for a given query. diff --git a/go/vt/vtgate/engine/session.go b/go/vt/vtgate/engine/session.go new file mode 100644 index 00000000000..e6fab7add3f --- /dev/null +++ b/go/vt/vtgate/engine/session.go @@ -0,0 +1,63 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" +) + +type Session struct { + Action func(sa SessionActions) (*sqltypes.Result, error) + + noInputs + noTxNeeded +} + +var _ Primitive = (*Session)(nil) + +func (s Session) RouteType() string { + return "SHOW" +} + +func (s Session) GetKeyspaceName() string { + return "" +} + +func (s Session) GetTableName() string { + return "" +} + +func (s Session) Execute(vcursor VCursor, _ map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) { + return s.Action(vcursor.Session()) +} + +func (s Session) StreamExecute(vcursor VCursor, _ map[string]*querypb.BindVariable, _ bool, callback func(*sqltypes.Result) error) error { + qr, err := s.Action(vcursor.Session()) + if err != nil { + return err + } + return callback(qr) +} + +func (s Session) GetFields(_ VCursor, _ map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + panic("implement me") +} + +func (s Session) description() PrimitiveDescription { + panic("implement me") +} diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index b03845ee056..da0578e22c6 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -836,28 +836,6 @@ func (e *Executor) handleShow(ctx context.Context, safeSession *SafeSession, sql Fields: buildVarCharFields("Keyspace", "Name", "Type", "Params", "Owner"), Rows: rows, }, nil - case sqlparser.KeywordString(sqlparser.WARNINGS): - fields := []*querypb.Field{ - {Name: "Level", Type: sqltypes.VarChar}, - {Name: "Code", Type: sqltypes.Uint16}, - {Name: "Message", Type: sqltypes.VarChar}, - } - rows := make([][]sqltypes.Value, 0) - - if safeSession.Warnings != nil { - for _, warning := range safeSession.Warnings { - rows = append(rows, []sqltypes.Value{ - sqltypes.NewVarChar("Warning"), - sqltypes.NewUint32(warning.Code), - sqltypes.NewVarChar(warning.Message), - }) - } - } - - return &sqltypes.Result{ - Fields: fields, - Rows: rows, - }, nil } // Any other show statement is passed through diff --git a/go/vt/vtgate/planbuilder/show.go b/go/vt/vtgate/planbuilder/show.go index 8333844478c..b8c0f2752a7 100644 --- a/go/vt/vtgate/planbuilder/show.go +++ b/go/vt/vtgate/planbuilder/show.go @@ -74,6 +74,8 @@ func buildShowBasicPlan(show *sqlparser.ShowBasic, vschema ContextVSchema) (engi return buildShowVGtidPlan(show, vschema) case sqlparser.GtidExecGlobal: return buildShowGtidPlan(show, vschema) + case sqlparser.Warnings: + return buildWarnings(show, vschema) } return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unknown show query type %s", show.Command.ToString()) @@ -535,3 +537,33 @@ func buildShowGtidPlan(show *sqlparser.ShowBasic, vschema ContextVSchema) (engin ShardNameNeeded: true, }, nil } + +func buildWarnings(show *sqlparser.ShowBasic, vschema ContextVSchema) (engine.Primitive, error) { + + f := func(sa engine.SessionActions) (*sqltypes.Result, error) { + fields := []*querypb.Field{ + {Name: "Level", Type: sqltypes.VarChar}, + {Name: "Code", Type: sqltypes.Uint16}, + {Name: "Message", Type: sqltypes.VarChar}, + } + + warns := sa.GetWarnings() + rows := make([][]sqltypes.Value, 0, len(warns)) + + for _, warn := range warns { + rows = append(rows, []sqltypes.Value{ + sqltypes.NewVarChar("Warning"), + sqltypes.NewUint32(warn.Code), + sqltypes.NewVarChar(warn.Message), + }) + } + return &sqltypes.Result{ + Fields: fields, + Rows: rows, + }, nil + } + + return &engine.Session{ + Action: f, + }, nil +} diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 2259c2f7ec1..284fd9114cc 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -692,6 +692,11 @@ func (vc *vcursorImpl) HasCreatedTempTable() { vc.safeSession.GetOrCreateOptions().HasCreatedTempTables = true } +// GetWarnings implements the SessionActions interface +func (vc *vcursorImpl) GetWarnings() []*querypb.QueryWarning { + return vc.safeSession.GetWarnings() +} + // GetDBDDLPluginName implements the VCursor interface func (vc *vcursorImpl) GetDBDDLPluginName() string { return *dbDDLPlugin From 3ffa14ce070962e8dee135116bd465103dfd0c95 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Tue, 18 May 2021 13:28:16 +0200 Subject: [PATCH 10/17] clean up code a little Signed-off-by: Andres Taylor --- .../endtoend/mysqlserver/mysql_server_test.go | 14 ++-- go/vt/vtgate/engine/session.go | 63 -------------- go/vt/vtgate/engine/session_primitive.go | 83 +++++++++++++++++++ go/vt/vtgate/planbuilder/show.go | 8 +- .../planbuilder/testdata/show_cases.txt | 10 +++ 5 files changed, 103 insertions(+), 75 deletions(-) delete mode 100644 go/vt/vtgate/engine/session.go create mode 100644 go/vt/vtgate/engine/session_primitive.go diff --git a/go/test/endtoend/mysqlserver/mysql_server_test.go b/go/test/endtoend/mysqlserver/mysql_server_test.go index 2c8172c36c6..8eb4cc0ec9e 100644 --- a/go/test/endtoend/mysqlserver/mysql_server_test.go +++ b/go/test/endtoend/mysqlserver/mysql_server_test.go @@ -142,16 +142,16 @@ func TestWarnings(t *testing.T) { ctx := context.Background() conn, err := mysql.Connect(ctx, &vtParams) - require.Nilf(t, err, "unable to connect mysql: %v", err) + require.NoError(t, err) defer conn.Close() // validate warning with invalid_field error as warning qr, err := conn.ExecuteFetch("SELECT /*vt+ SCATTER_ERRORS_AS_WARNINGS */ invalid_field from vt_insert_test;", 1, false) - require.Nilf(t, err, "select error : %v", err) + require.NoError(t, err) assert.Empty(t, qr.Rows, "number of rows") qr, err = conn.ExecuteFetch("SHOW WARNINGS;", 1, false) - require.Nilf(t, err, "SHOW WARNINGS; execution failed: %v", err) + require.NoError(t, err, "SHOW WARNINGS") assert.EqualValues(t, 1, len(qr.Rows), "number of rows") assert.Contains(t, qr.Rows[0][0].String(), "VARCHAR(\"Warning\")", qr.Rows) assert.Contains(t, qr.Rows[0][1].String(), "UINT16(1054)", qr.Rows) @@ -159,11 +159,11 @@ func TestWarnings(t *testing.T) { // validate warning with query_timeout error as warning qr, err = conn.ExecuteFetch("SELECT /*vt+ SCATTER_ERRORS_AS_WARNINGS QUERY_TIMEOUT_MS=1 */ sleep(1) from vt_insert_test;", 1, false) - require.Nilf(t, err, "insertion error : %v", err) + require.NoError(t, err) assert.Empty(t, qr.Rows, "number of rows") qr, err = conn.ExecuteFetch("SHOW WARNINGS;", 1, false) - require.Nilf(t, err, "SHOW WARNINGS; execution failed: %v", err) + require.NoError(t, err, "SHOW WARNINGS") assert.EqualValues(t, 1, len(qr.Rows), "number of rows") assert.Contains(t, qr.Rows[0][0].String(), "VARCHAR(\"Warning\")", qr.Rows) assert.Contains(t, qr.Rows[0][1].String(), "UINT16(1317)", qr.Rows) @@ -171,10 +171,10 @@ func TestWarnings(t *testing.T) { // validate with 0 warnings _, err = conn.ExecuteFetch("SELECT 1 from vt_insert_test limit 1", 1, false) - require.Nilf(t, err, "select error: %v", err) + require.NoError(t, err) qr, err = conn.ExecuteFetch("SHOW WARNINGS;", 1, false) - require.Nilf(t, err, "SHOW WARNINGS; execution failed: %v", err) + require.NoError(t, err) assert.Empty(t, len(qr.Rows), "number of rows") } diff --git a/go/vt/vtgate/engine/session.go b/go/vt/vtgate/engine/session.go deleted file mode 100644 index e6fab7add3f..00000000000 --- a/go/vt/vtgate/engine/session.go +++ /dev/null @@ -1,63 +0,0 @@ -/* -Copyright 2021 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package engine - -import ( - "vitess.io/vitess/go/sqltypes" - querypb "vitess.io/vitess/go/vt/proto/query" -) - -type Session struct { - Action func(sa SessionActions) (*sqltypes.Result, error) - - noInputs - noTxNeeded -} - -var _ Primitive = (*Session)(nil) - -func (s Session) RouteType() string { - return "SHOW" -} - -func (s Session) GetKeyspaceName() string { - return "" -} - -func (s Session) GetTableName() string { - return "" -} - -func (s Session) Execute(vcursor VCursor, _ map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) { - return s.Action(vcursor.Session()) -} - -func (s Session) StreamExecute(vcursor VCursor, _ map[string]*querypb.BindVariable, _ bool, callback func(*sqltypes.Result) error) error { - qr, err := s.Action(vcursor.Session()) - if err != nil { - return err - } - return callback(qr) -} - -func (s Session) GetFields(_ VCursor, _ map[string]*querypb.BindVariable) (*sqltypes.Result, error) { - panic("implement me") -} - -func (s Session) description() PrimitiveDescription { - panic("implement me") -} diff --git a/go/vt/vtgate/engine/session_primitive.go b/go/vt/vtgate/engine/session_primitive.go new file mode 100644 index 00000000000..22e17e3b74c --- /dev/null +++ b/go/vt/vtgate/engine/session_primitive.go @@ -0,0 +1,83 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" +) + +// SessionPrimitive the session primitive is a very small primitive used +// when we have simple engine code that needs to interact with the Session +type SessionPrimitive struct { + action func(sa SessionActions) (*sqltypes.Result, error) + name string + + noInputs + noTxNeeded +} + +var _ Primitive = (*SessionPrimitive)(nil) + +// NewSessionPrimitive creates a SessionPrimitive +func NewSessionPrimitive(name string, action func(sa SessionActions) (*sqltypes.Result, error)) *SessionPrimitive { + return &SessionPrimitive{ + action: action, + name: name, + } +} + +// RouteType implements the Primitive interface +func (s *SessionPrimitive) RouteType() string { + return "SHOW" +} + +// GetKeyspaceName implements the Primitive interface +func (s *SessionPrimitive) GetKeyspaceName() string { + return "" +} + +// GetTableName implements the Primitive interface +func (s *SessionPrimitive) GetTableName() string { + return "" +} + +// Execute implements the Primitive interface +func (s *SessionPrimitive) Execute(vcursor VCursor, _ map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) { + return s.action(vcursor.Session()) +} + +// StreamExecute implements the Primitive interface +func (s *SessionPrimitive) StreamExecute(vcursor VCursor, _ map[string]*querypb.BindVariable, _ bool, callback func(*sqltypes.Result) error) error { + qr, err := s.action(vcursor.Session()) + if err != nil { + return err + } + return callback(qr) +} + +// GetFields implements the Primitive interface +func (s *SessionPrimitive) GetFields(_ VCursor, _ map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + panic("implement me") +} + +// description implements the Primitive interface +func (s *SessionPrimitive) description() PrimitiveDescription { + return PrimitiveDescription{ + OperatorType: s.name, + } +} diff --git a/go/vt/vtgate/planbuilder/show.go b/go/vt/vtgate/planbuilder/show.go index b8c0f2752a7..bce8a701d7b 100644 --- a/go/vt/vtgate/planbuilder/show.go +++ b/go/vt/vtgate/planbuilder/show.go @@ -75,7 +75,7 @@ func buildShowBasicPlan(show *sqlparser.ShowBasic, vschema ContextVSchema) (engi case sqlparser.GtidExecGlobal: return buildShowGtidPlan(show, vschema) case sqlparser.Warnings: - return buildWarnings(show, vschema) + return buildWarnings() } return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unknown show query type %s", show.Command.ToString()) @@ -538,7 +538,7 @@ func buildShowGtidPlan(show *sqlparser.ShowBasic, vschema ContextVSchema) (engin }, nil } -func buildWarnings(show *sqlparser.ShowBasic, vschema ContextVSchema) (engine.Primitive, error) { +func buildWarnings() (engine.Primitive, error) { f := func(sa engine.SessionActions) (*sqltypes.Result, error) { fields := []*querypb.Field{ @@ -563,7 +563,5 @@ func buildWarnings(show *sqlparser.ShowBasic, vschema ContextVSchema) (engine.Pr }, nil } - return &engine.Session{ - Action: f, - }, nil + return engine.NewSessionPrimitive("SHOW WARNINGS", f), nil } diff --git a/go/vt/vtgate/planbuilder/testdata/show_cases.txt b/go/vt/vtgate/planbuilder/testdata/show_cases.txt index 8c36621c8e2..f0ab459c4da 100644 --- a/go/vt/vtgate/planbuilder/testdata/show_cases.txt +++ b/go/vt/vtgate/planbuilder/testdata/show_cases.txt @@ -553,3 +553,13 @@ "ShardNameNeeded": true } } + +# show warnings +"show warnings" +{ + "QueryType": "SHOW", + "Original": "show warnings", + "Instructions": { + "OperatorType": "SHOW WARNINGS" + } +} From 2f2e59d09061909a2269ff82fd6d3a7fbfe08fc7 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Wed, 19 May 2021 07:43:57 +0200 Subject: [PATCH 11/17] filter out empty errors Signed-off-by: Andres Taylor --- .../vtgate/errors_as_warnings/main_test.go | 66 +++--- go/vt/vtgate/engine/route.go | 21 +- go/vt/vtgate/engine/route_test.go | 198 ++++++++---------- 3 files changed, 141 insertions(+), 144 deletions(-) diff --git a/go/test/endtoend/vtgate/errors_as_warnings/main_test.go b/go/test/endtoend/vtgate/errors_as_warnings/main_test.go index 77169f9efc7..96bb59c4ee9 100644 --- a/go/test/endtoend/vtgate/errors_as_warnings/main_test.go +++ b/go/test/endtoend/vtgate/errors_as_warnings/main_test.go @@ -21,11 +21,11 @@ import ( "flag" "fmt" "os" + "strings" "testing" "github.com/google/go-cmp/cmp" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" @@ -118,33 +118,38 @@ func TestScatterErrsAsWarns(t *testing.T) { checkedExec(t, oltp, `delete from t1`) }() - // connection setup - checkedExec(t, oltp, "use @replica") - checkedExec(t, oltp, "set workload = oltp") - checkedExec(t, olap, "use @replica") - checkedExec(t, olap, "set workload = olap") - - // stop one tablet from the first shard - require.NoError(t, - clusterInstance.Keyspaces[0].Shards[0].Replica().MysqlctlProcess.Stop()) - query1 := `select /*vt+ SCATTER_ERRORS_AS_WARNINGS */ id1 from t1` query2 := `select /*vt+ SCATTER_ERRORS_AS_WARNINGS */ id1 from t1 order by id1` + showQ := "show warnings" - assertMatches(t, oltp, query1, `[[INT64(4)]]`) - assertMatches(t, olap, query1, `[[INT64(4)]]`) - assertMatches(t, oltp, query2, `[[INT64(4)]]`) - assertMatches(t, olap, query2, `[[INT64(4)]]`) + // stop the mysql on one tablet, query will fail at vttablet level + require.NoError(t, + clusterInstance.Keyspaces[0].Shards[0].Replica().MysqlctlProcess.Stop()) - // change tablet type - assert.NoError(t, - clusterInstance.VtctlclientProcess.ExecuteCommand( - "ChangeTabletType", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "spare")) + modes := []struct { + conn *mysql.Conn + m string + }{ + {m: "oltp", conn: oltp}, + {m: "olap", conn: olap}, + } - assertMatches(t, oltp, query1, `[[INT64(4)]]`) - assertMatches(t, olap, query1, `[[INT64(4)]]`) - assertMatches(t, oltp, query2, `[[INT64(4)]]`) - assertMatches(t, olap, query2, `[[INT64(4)]]`) + for _, mode := range modes { + t.Run(mode.m, func(t *testing.T) { + // connection setup + checkedExec(t, mode.conn, "use @replica") + checkedExec(t, mode.conn, fmt.Sprintf("set workload = %s", mode.m)) + + assertMatches(t, mode.conn, query1, `[[INT64(4)]]`) + assertContainsOneOf(t, mode.conn, showQ, "no valid tablet", "no healthy tablet") + assertMatches(t, mode.conn, query2, `[[INT64(4)]]`) + assertContainsOneOf(t, mode.conn, showQ, "no valid tablet", "no healthy tablet") + + // invalid_field should throw error and not warning + _, err = mode.conn.ExecuteFetch("SELECT /*vt+ SCATTER_ERRORS_AS_WARNINGS */ invalid_field from t1;", 1, false) + require.EqualError(t, err, "target: test_keyspace.0.master: vttablet: rpc error: code = NotFound desc = Unknown column 'invalid_field' in 'field list' (errno 1054) (sqlstate 42S22) (CallerID: vtgate client 1): Sql: \"select /*vt+ SCATTER_ERRORS_AS_WARNINGS */ invalid_field from vt_insert_test\", BindVars: {} (errno 1054) (sqlstate 42S22) during query: SELECT /*vt+ SCATTER_ERRORS_AS_WARNINGS */ invalid_field from vt_insert_test") + }) + } } func checkedExec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { @@ -160,6 +165,19 @@ func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) { got := fmt.Sprintf("%v", qr.Rows) diff := cmp.Diff(expected, got) if diff != "" { - t.Errorf("Query: %s (-want +got):\n%s", query, diff) + t.Errorf("Query: %s (-want +got):\n%s\n%s", query, diff, got) } } + +func assertContainsOneOf(t *testing.T, conn *mysql.Conn, query string, expected ...string) { + t.Helper() + qr := checkedExec(t, conn, query) + got := fmt.Sprintf("%v", qr.Rows) + for _, s := range expected { + if strings.Contains(got, s) { + return + } + } + + t.Errorf("%s\n did not match any of %v", got, expected) +} diff --git a/go/vt/vtgate/engine/route.go b/go/vt/vtgate/engine/route.go index 32427348556..f62b9aaeb6d 100644 --- a/go/vt/vtgate/engine/route.go +++ b/go/vt/vtgate/engine/route.go @@ -264,7 +264,7 @@ func (route *Route) execute(vcursor VCursor, bindVars map[string]*querypb.BindVa rss, bvs, err = nil, nil, nil default: // Unreachable. - return nil, fmt.Errorf("unsupported query route: %v", route) + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported query route: %v", route) } if err != nil { return nil, err @@ -282,18 +282,19 @@ func (route *Route) execute(vcursor VCursor, bindVars map[string]*querypb.BindVa result, errs := vcursor.ExecuteMultiShard(rss, queries, false /* rollbackOnError */, false /* autocommit */) if errs != nil { + errs = filterOutNilErrors(errs) if !route.ScatterErrorsAsWarnings || len(errs) == len(rss) { return nil, vterrors.Aggregate(errs) } + partialSuccessScatterQueries.Add(1) for _, err := range errs { - if err != nil { - serr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) - vcursor.Session().RecordWarning(&querypb.QueryWarning{Code: uint32(serr.Num), Message: err.Error()}) - } + serr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) + vcursor.Session().RecordWarning(&querypb.QueryWarning{Code: uint32(serr.Num), Message: err.Error()}) } } + if len(route.OrderBy) == 0 { return result, nil } @@ -301,6 +302,16 @@ func (route *Route) execute(vcursor VCursor, bindVars map[string]*querypb.BindVa return route.sort(result) } +func filterOutNilErrors(errs []error) []error { + var errors []error + for _, err := range errs { + if err != nil { + errors = append(errors, err) + } + } + return errors +} + // StreamExecute performs a streaming exec. func (route *Route) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { var rss []*srvtopo.ResolvedShard diff --git a/go/vt/vtgate/engine/route_test.go b/go/vt/vtgate/engine/route_test.go index 2bb18aee82a..cc160b241df 100644 --- a/go/vt/vtgate/engine/route_test.go +++ b/go/vt/vtgate/engine/route_test.go @@ -20,6 +20,8 @@ import ( "errors" "testing" + "github.com/stretchr/testify/assert" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" @@ -1138,129 +1140,95 @@ func TestParamsFail(t *testing.T) { } func TestExecFail(t *testing.T) { - // Unsharded error - sel := NewRoute( - SelectUnsharded, - &vindexes.Keyspace{ - Name: "ks", - Sharded: false, - }, - "dummy_select", - "dummy_select_field", - ) - vc := &loggingVCursor{shards: []string{"0"}, resultErr: vterrors.NewErrorf(vtrpcpb.Code_CANCELED, vterrors.QueryInterrupted, "query timeout")} - _, err := sel.Execute(vc, map[string]*querypb.BindVariable{}, false) - require.EqualError(t, err, `query timeout`) - vc.ExpectWarnings(t, nil) - - vc.Rewind() - _, err = wrapStreamExecute(sel, vc, map[string]*querypb.BindVariable{}, false) - require.EqualError(t, err, `query timeout`) + t.Run("unsharded", func(t *testing.T) { + // Unsharded error + sel := NewRoute( + SelectUnsharded, + &vindexes.Keyspace{ + Name: "ks", + Sharded: false, + }, + "dummy_select", + "dummy_select_field", + ) - // Scatter fails if one of N fails without ScatterErrorsAsWarnings - sel = NewRoute( - SelectScatter, - &vindexes.Keyspace{ - Name: "ks", - Sharded: true, - }, - "dummy_select", - "dummy_select_field", - ) + vc := &loggingVCursor{shards: []string{"0"}, resultErr: vterrors.NewErrorf(vtrpcpb.Code_CANCELED, vterrors.QueryInterrupted, "query timeout")} + _, err := sel.Execute(vc, map[string]*querypb.BindVariable{}, false) + require.EqualError(t, err, `query timeout`) + assert.Empty(t, vc.warnings) - vc = &loggingVCursor{ - shards: []string{"-20", "20-"}, - results: []*sqltypes.Result{defaultSelectResult}, - multiShardErrs: []error{ - errors.New("result error -20"), - }, - } - _, err = sel.Execute(vc, map[string]*querypb.BindVariable{}, false) - require.EqualError(t, err, `result error -20`) - vc.ExpectWarnings(t, nil) - vc.ExpectLog(t, []string{ - `ResolveDestinations ks [] Destinations:DestinationAllShards()`, - `ExecuteMultiShard ks.-20: dummy_select {} ks.20-: dummy_select {} false false`, + vc.Rewind() + _, err = wrapStreamExecute(sel, vc, map[string]*querypb.BindVariable{}, false) + require.EqualError(t, err, `query timeout`) }) - // Scatter succeeds if all shards fail with ScatterErrorsAsWarnings - sel = NewRoute( - SelectScatter, - &vindexes.Keyspace{ - Name: "ks", - Sharded: true, - }, - "dummy_select", - "dummy_select_field", - ) - sel.ScatterErrorsAsWarnings = true - - vc = &loggingVCursor{ - shards: []string{"-20", "20-"}, - results: []*sqltypes.Result{defaultSelectResult}, - multiShardErrs: []error{ - mysql.NewSQLError(mysql.ERQueryInterrupted, "", "query timeout -20"), - errors.New("not a sql error 20-"), - }, - } - _, err = sel.Execute(vc, map[string]*querypb.BindVariable{}, false) - require.NoError(t, err, "unexpected ScatterErrorsAsWarnings error %v", err) + t.Run("normal route with no scatter errors as warnings", func(t *testing.T) { + // Scatter fails if one of N fails without ScatterErrorsAsWarnings + sel := NewRoute( + SelectScatter, + &vindexes.Keyspace{ + Name: "ks", + Sharded: true, + }, + "dummy_select", + "dummy_select_field", + ) - // Ensure that the error code is preserved from SQLErrors and that it - // turns into ERUnknownError for all others - vc.ExpectWarnings(t, []*querypb.QueryWarning{ - {Code: mysql.ERQueryInterrupted, Message: "query timeout -20 (errno 1317) (sqlstate HY000)"}, - {Code: mysql.ERUnknownError, Message: "not a sql error 20-"}, - }) - vc.ExpectLog(t, []string{ - `ResolveDestinations ks [] Destinations:DestinationAllShards()`, - `ExecuteMultiShard ks.-20: dummy_select {} ks.20-: dummy_select {} false false`, + vc := &loggingVCursor{ + shards: []string{"-20", "20-"}, + results: []*sqltypes.Result{defaultSelectResult}, + multiShardErrs: []error{ + errors.New("result error -20"), + }, + } + _, err := sel.Execute(vc, map[string]*querypb.BindVariable{}, false) + require.EqualError(t, err, `result error -20`) + vc.ExpectWarnings(t, nil) + vc.ExpectLog(t, []string{ + `ResolveDestinations ks [] Destinations:DestinationAllShards()`, + `ExecuteMultiShard ks.-20: dummy_select {} ks.20-: dummy_select {} false false`, + }) }) - vc.Rewind() - vc.results = nil - vc.resultErr = mysql.NewSQLError(mysql.ERQueryInterrupted, "", "query timeout -20") - _, err = wrapStreamExecute(sel, vc, map[string]*querypb.BindVariable{}, false) - require.NoError(t, err, "unexpected ScatterErrorsAsWarnings error %v", err) - vc.ExpectWarnings(t, []*querypb.QueryWarning{{Code: mysql.ERQueryInterrupted, Message: "query timeout -20 (errno 1317) (sqlstate HY000)"}}) - - // Scatter succeeds if one of N fails with ScatterErrorsAsWarnings - sel = NewRoute( - SelectScatter, - &vindexes.Keyspace{ - Name: "ks", - Sharded: true, - }, - "dummy_select", - "dummy_select_field", - ) - sel.ScatterErrorsAsWarnings = true + t.Run("ScatterErrorsAsWarnings", func(t *testing.T) { + // Scatter succeeds if one of N fails with ScatterErrorsAsWarnings + sel := NewRoute( + SelectScatter, + &vindexes.Keyspace{ + Name: "ks", + Sharded: true, + }, + "dummy_select", + "dummy_select_field", + ) + sel.ScatterErrorsAsWarnings = true + + vc := &loggingVCursor{ + shards: []string{"-20", "20-"}, + results: []*sqltypes.Result{defaultSelectResult}, + multiShardErrs: []error{ + errors.New("result error -20"), + nil, + }, + } + result, err := sel.Execute(vc, map[string]*querypb.BindVariable{}, false) + require.NoError(t, err, "unexpected ScatterErrorsAsWarnings error %v", err) + vc.ExpectLog(t, []string{ + `ResolveDestinations ks [] Destinations:DestinationAllShards()`, + `ExecuteMultiShard ks.-20: dummy_select {} ks.20-: dummy_select {} false false`, + }) + expectResult(t, "sel.Execute", result, defaultSelectResult) - vc = &loggingVCursor{ - shards: []string{"-20", "20-"}, - results: []*sqltypes.Result{defaultSelectResult}, - multiShardErrs: []error{ - errors.New("result error -20"), - nil, - }, - } - result, err := sel.Execute(vc, map[string]*querypb.BindVariable{}, false) - require.NoError(t, err, "unexpected ScatterErrorsAsWarnings error %v", err) - vc.ExpectLog(t, []string{ - `ResolveDestinations ks [] Destinations:DestinationAllShards()`, - `ExecuteMultiShard ks.-20: dummy_select {} ks.20-: dummy_select {} false false`, + vc.Rewind() + vc.resultErr = mysql.NewSQLError(mysql.ERQueryInterrupted, "", "query timeout -20") + // test when there is order by column + sel.OrderBy = []OrderbyParams{{ + WeightStringCol: -1, + Col: 0, + }} + _, err = wrapStreamExecute(sel, vc, map[string]*querypb.BindVariable{}, false) + require.NoError(t, err, "unexpected ScatterErrorsAsWarnings error %v", err) + vc.ExpectWarnings(t, []*querypb.QueryWarning{{Code: mysql.ERQueryInterrupted, Message: "query timeout -20 (errno 1317) (sqlstate HY000)"}}) }) - expectResult(t, "sel.Execute", result, defaultSelectResult) - - vc.Rewind() - vc.resultErr = mysql.NewSQLError(mysql.ERQueryInterrupted, "", "query timeout -20") - // test when there is order by column - sel.OrderBy = []OrderbyParams{{ - WeightStringCol: -1, - Col: 0, - }} - _, err = wrapStreamExecute(sel, vc, map[string]*querypb.BindVariable{}, false) - require.NoError(t, err, "unexpected ScatterErrorsAsWarnings error %v", err) - vc.ExpectWarnings(t, []*querypb.QueryWarning{{Code: mysql.ERQueryInterrupted, Message: "query timeout -20 (errno 1317) (sqlstate HY000)"}}) } From 6447c84b4ea51598e5891a669d83527aa63b57b8 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Wed, 19 May 2021 11:29:26 +0530 Subject: [PATCH 12/17] fix error checking Signed-off-by: Harshit Gangal --- go/test/endtoend/vtgate/errors_as_warnings/main_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/go/test/endtoend/vtgate/errors_as_warnings/main_test.go b/go/test/endtoend/vtgate/errors_as_warnings/main_test.go index 96bb59c4ee9..21438bbf7e0 100644 --- a/go/test/endtoend/vtgate/errors_as_warnings/main_test.go +++ b/go/test/endtoend/vtgate/errors_as_warnings/main_test.go @@ -141,13 +141,15 @@ func TestScatterErrsAsWarns(t *testing.T) { checkedExec(t, mode.conn, fmt.Sprintf("set workload = %s", mode.m)) assertMatches(t, mode.conn, query1, `[[INT64(4)]]`) - assertContainsOneOf(t, mode.conn, showQ, "no valid tablet", "no healthy tablet") + assertContainsOneOf(t, mode.conn, showQ, "no valid tablet", "no healthy tablet", "mysql.sock: connect: no such file or directory") assertMatches(t, mode.conn, query2, `[[INT64(4)]]`) - assertContainsOneOf(t, mode.conn, showQ, "no valid tablet", "no healthy tablet") + assertContainsOneOf(t, mode.conn, showQ, "no valid tablet", "no healthy tablet", "mysql.sock: connect: no such file or directory") // invalid_field should throw error and not warning _, err = mode.conn.ExecuteFetch("SELECT /*vt+ SCATTER_ERRORS_AS_WARNINGS */ invalid_field from t1;", 1, false) - require.EqualError(t, err, "target: test_keyspace.0.master: vttablet: rpc error: code = NotFound desc = Unknown column 'invalid_field' in 'field list' (errno 1054) (sqlstate 42S22) (CallerID: vtgate client 1): Sql: \"select /*vt+ SCATTER_ERRORS_AS_WARNINGS */ invalid_field from vt_insert_test\", BindVars: {} (errno 1054) (sqlstate 42S22) during query: SELECT /*vt+ SCATTER_ERRORS_AS_WARNINGS */ invalid_field from vt_insert_test") + require.Error(t, err) + serr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) + require.Equal(t, 1054, serr.Number()) }) } } From 1bade57badf91905c5235acf8c2ca0f11b028938 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Wed, 19 May 2021 08:26:18 +0200 Subject: [PATCH 13/17] codegen Signed-off-by: Andres Taylor --- go/vt/vtgate/engine/cached_size.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/go/vt/vtgate/engine/cached_size.go b/go/vt/vtgate/engine/cached_size.go index 29f2b923b6b..e3d585eb755 100644 --- a/go/vt/vtgate/engine/cached_size.go +++ b/go/vt/vtgate/engine/cached_size.go @@ -661,6 +661,18 @@ func (cached *Send) CachedSize(alloc bool) int64 { size += int64(len(cached.Query)) return size } +func (cached *SessionPrimitive) CachedSize(alloc bool) int64 { + if cached == nil { + return int64(0) + } + size := int64(0) + if alloc { + size += int64(24) + } + // field name string + size += int64(len(cached.name)) + return size +} func (cached *Set) CachedSize(alloc bool) int64 { if cached == nil { return int64(0) From 6fe8c55e6fb00047f1b66f38659257d94877dc6b Mon Sep 17 00:00:00 2001 From: GuptaManan100 Date: Wed, 19 May 2021 14:25:10 +0530 Subject: [PATCH 14/17] fixed an end to end test Signed-off-by: GuptaManan100 --- go/test/endtoend/mysqlserver/main_test.go | 15 +++++++++++- .../endtoend/mysqlserver/mysql_server_test.go | 23 +++++++++---------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/go/test/endtoend/mysqlserver/main_test.go b/go/test/endtoend/mysqlserver/main_test.go index 16afa7f96b2..83ce6eda40b 100644 --- a/go/test/endtoend/mysqlserver/main_test.go +++ b/go/test/endtoend/mysqlserver/main_test.go @@ -41,7 +41,14 @@ var ( keyspace_id bigint(20) unsigned NOT NULL, data longblob, primary key (id) - ) Engine=InnoDB` + ) Engine=InnoDB; +` + createProcSQL = `use vt_test_keyspace; +CREATE PROCEDURE testing() +BEGIN + delete from vt_insert_test; +END; +` ) func TestMain(m *testing.M) { @@ -98,6 +105,7 @@ func TestMain(m *testing.M) { "-mysql_auth_server_impl", "static", "-mysql_auth_server_static_file", clusterInstance.TmpDirectory + mysqlAuthServerStatic, "-mysql_server_version", "8.0.16-7", + "-warn_sharded_only=true", } clusterInstance.VtTabletExtraArgs = []string{ @@ -126,6 +134,11 @@ func TestMain(m *testing.M) { Pass: "testpassword1", } + masterProcess := clusterInstance.Keyspaces[0].Shards[0].MasterTablet().VttabletProcess + if _, err := masterProcess.QueryTablet(createProcSQL, keyspaceName, false); err != nil { + return 1, err + } + return m.Run(), nil }() if err != nil { diff --git a/go/test/endtoend/mysqlserver/mysql_server_test.go b/go/test/endtoend/mysqlserver/mysql_server_test.go index 8eb4cc0ec9e..17271dde980 100644 --- a/go/test/endtoend/mysqlserver/mysql_server_test.go +++ b/go/test/endtoend/mysqlserver/mysql_server_test.go @@ -146,7 +146,7 @@ func TestWarnings(t *testing.T) { defer conn.Close() // validate warning with invalid_field error as warning - qr, err := conn.ExecuteFetch("SELECT /*vt+ SCATTER_ERRORS_AS_WARNINGS */ invalid_field from vt_insert_test;", 1, false) + qr, err := conn.ExecuteFetch("CALL testing()", 1, false) require.NoError(t, err) assert.Empty(t, qr.Rows, "number of rows") @@ -154,22 +154,21 @@ func TestWarnings(t *testing.T) { require.NoError(t, err, "SHOW WARNINGS") assert.EqualValues(t, 1, len(qr.Rows), "number of rows") assert.Contains(t, qr.Rows[0][0].String(), "VARCHAR(\"Warning\")", qr.Rows) - assert.Contains(t, qr.Rows[0][1].String(), "UINT16(1054)", qr.Rows) - assert.Contains(t, qr.Rows[0][2].String(), "Unknown column", qr.Rows) + assert.Contains(t, qr.Rows[0][1].String(), "UINT16(1235)", qr.Rows) + assert.Contains(t, qr.Rows[0][2].String(), "'CALL' not supported in sharded mode", qr.Rows) - // validate warning with query_timeout error as warning - qr, err = conn.ExecuteFetch("SELECT /*vt+ SCATTER_ERRORS_AS_WARNINGS QUERY_TIMEOUT_MS=1 */ sleep(1) from vt_insert_test;", 1, false) + // validate with 0 warnings + _, err = conn.ExecuteFetch("SELECT 1 from vt_insert_test limit 1", 1, false) require.NoError(t, err) - assert.Empty(t, qr.Rows, "number of rows") qr, err = conn.ExecuteFetch("SHOW WARNINGS;", 1, false) - require.NoError(t, err, "SHOW WARNINGS") - assert.EqualValues(t, 1, len(qr.Rows), "number of rows") - assert.Contains(t, qr.Rows[0][0].String(), "VARCHAR(\"Warning\")", qr.Rows) - assert.Contains(t, qr.Rows[0][1].String(), "UINT16(1317)", qr.Rows) - assert.Contains(t, qr.Rows[0][2].String(), "context deadline exceeded", qr.Rows) + require.NoError(t, err) + assert.Empty(t, len(qr.Rows), "number of rows") - // validate with 0 warnings + // verify that show warnings are empty if another statement is run before calling it + qr, err = conn.ExecuteFetch("CALL testing()", 1, false) + require.NoError(t, err) + assert.Empty(t, qr.Rows, "number of rows") _, err = conn.ExecuteFetch("SELECT 1 from vt_insert_test limit 1", 1, false) require.NoError(t, err) From 2cd845c3e56ef81f6ae5fe8b682fcc2c5171e3b9 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Thu, 20 May 2021 06:32:12 +0200 Subject: [PATCH 15/17] address review comments Signed-off-by: Andres Taylor --- .../endtoend/mysqlserver/mysql_server_test.go | 6 +- go/test/endtoend/vtgate/main_test.go | 7 +- go/vt/vtgate/engine/merge_sort.go | 82 +++++++++++++------ go/vt/vtgate/engine/route.go | 11 +-- go/vt/vtgate/engine/session_primitive.go | 4 +- go/vt/vtgate/executor_select_test.go | 8 +- 6 files changed, 73 insertions(+), 45 deletions(-) diff --git a/go/test/endtoend/mysqlserver/mysql_server_test.go b/go/test/endtoend/mysqlserver/mysql_server_test.go index 17271dde980..9fb6344778b 100644 --- a/go/test/endtoend/mysqlserver/mysql_server_test.go +++ b/go/test/endtoend/mysqlserver/mysql_server_test.go @@ -145,7 +145,7 @@ func TestWarnings(t *testing.T) { require.NoError(t, err) defer conn.Close() - // validate warning with invalid_field error as warning + // using CALL will produce a warning saying this only works in unsharded qr, err := conn.ExecuteFetch("CALL testing()", 1, false) require.NoError(t, err) assert.Empty(t, qr.Rows, "number of rows") @@ -163,7 +163,7 @@ func TestWarnings(t *testing.T) { qr, err = conn.ExecuteFetch("SHOW WARNINGS;", 1, false) require.NoError(t, err) - assert.Empty(t, len(qr.Rows), "number of rows") + assert.Empty(t, qr.Rows) // verify that show warnings are empty if another statement is run before calling it qr, err = conn.ExecuteFetch("CALL testing()", 1, false) @@ -174,7 +174,7 @@ func TestWarnings(t *testing.T) { qr, err = conn.ExecuteFetch("SHOW WARNINGS;", 1, false) require.NoError(t, err) - assert.Empty(t, len(qr.Rows), "number of rows") + assert.Empty(t, qr.Rows) } // TestSelectWithUnauthorizedUser verifies that an unauthorized user diff --git a/go/test/endtoend/vtgate/main_test.go b/go/test/endtoend/vtgate/main_test.go index 3947809b41a..07099641c2a 100644 --- a/go/test/endtoend/vtgate/main_test.go +++ b/go/test/endtoend/vtgate/main_test.go @@ -419,13 +419,10 @@ func TestMain(m *testing.M) { } // Start vtgate - vtgateInstance := clusterInstance.NewVtgateInstance() - vtgateInstance.TabletTypesToWait = "MASTER,REPLICA,RDONLY" - if err := vtgateInstance.Setup(); err != nil { + err = clusterInstance.StartVtgate() + if err != nil { return 1 } - // ensure it is torn down during cluster TearDown - clusterInstance.VtgateProcess = *vtgateInstance vtParams = mysql.ConnParams{ Host: clusterInstance.Hostname, diff --git a/go/vt/vtgate/engine/merge_sort.go b/go/vt/vtgate/engine/merge_sort.go index 476cb8e4e1f..83dc64fae33 100644 --- a/go/vt/vtgate/engine/merge_sort.go +++ b/go/vt/vtgate/engine/merge_sort.go @@ -20,6 +20,8 @@ import ( "container/heap" "io" + "vitess.io/vitess/go/mysql" + "context" "vitess.io/vitess/go/sqltypes" @@ -77,34 +79,22 @@ func (ms *MergeSort) GetFields(vcursor VCursor, bindVars map[string]*querypb.Bin func (ms *MergeSort) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { ctx, cancel := context.WithCancel(vcursor.Context()) defer cancel() - + gotFields := wantfields handles := make([]*streamHandle, len(ms.Primitives)) for i, input := range ms.Primitives { - handles[i] = runOneStream(ctx, vcursor, input, bindVars, wantfields) - // Need fields only from first handle, if wantfields was true. - wantfields = false - } - - var fields []*querypb.Field - if ms.ScatterErrorsAsWarnings { - for _, handle := range handles { - // Fetch field info from just one stream. - fields = <-handle.fields - // If fields is nil, it means there was an error. - if fields != nil { - break - } + handles[i] = runOneStream(ctx, vcursor, input, bindVars, gotFields) + if !ms.ScatterErrorsAsWarnings { + // we only need the fields from the first input, unless we allow ScatterErrorsAsWarnings. + // in that case, we need to ask all the inputs for fields - we don't know which will return anything + gotFields = false } - } else { - // Fetch field info from just one stream. - fields = <-handles[0].fields - } - if fields == nil { - return handles[0].err } - if err := callback(&sqltypes.Result{Fields: fields}); err != nil { - return err + if wantfields { + err := ms.getStreamingFields(handles, callback) + if err != nil { + return err + } } comparers := extractSlices(ms.OrderBy) @@ -172,7 +162,51 @@ func (ms *MergeSort) StreamExecute(vcursor VCursor, bindVars map[string]*querypb return ctx.Err() } } - return vterrors.Aggregate(errs) + + err := vterrors.Aggregate(errs) + if err != nil && ms.ScatterErrorsAsWarnings && len(errs) < len(handles) { + // we got errors, but not all shards failed, so we can hide the error and just warn instead + partialSuccessScatterQueries.Add(1) + sErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) + vcursor.Session().RecordWarning(&querypb.QueryWarning{Code: uint32(sErr.Num), Message: err.Error()}) + return nil + } + return err +} + +func (ms *MergeSort) getStreamingFields(handles []*streamHandle, callback func(*sqltypes.Result) error) error { + var fields []*querypb.Field + + if ms.ScatterErrorsAsWarnings { + for _, handle := range handles { + // Fetch field info from just one stream. + fields = <-handle.fields + // If fields is nil, it means there was an error. + if fields != nil { + break + } + } + } else { + // Fetch field info from just one stream. + fields = <-handles[0].fields + } + if fields == nil { + // something went wrong. need to figure out where the error can be + if !ms.ScatterErrorsAsWarnings { + return handles[0].err + } + + var errs []error + for _, handle := range handles { + errs = append(errs, handle.err) + } + return vterrors.Aggregate(errs) + } + + if err := callback(&sqltypes.Result{Fields: fields}); err != nil { + return err + } + return nil } func (ms *MergeSort) description() PrimitiveDescription { diff --git a/go/vt/vtgate/engine/route.go b/go/vt/vtgate/engine/route.go index f62b9aaeb6d..f3f169bc477 100644 --- a/go/vt/vtgate/engine/route.go +++ b/go/vt/vtgate/engine/route.go @@ -390,18 +390,9 @@ func (route *Route) mergeSort(vcursor VCursor, bindVars map[string]*querypb.Bind OrderBy: route.OrderBy, ScatterErrorsAsWarnings: route.ScatterErrorsAsWarnings, } - err := ms.StreamExecute(vcursor, bindVars, wantfields, func(qr *sqltypes.Result) error { + return ms.StreamExecute(vcursor, bindVars, wantfields, func(qr *sqltypes.Result) error { return callback(qr.Truncate(route.TruncateColumnCount)) }) - if err != nil { - if !route.ScatterErrorsAsWarnings { - return err - } - partialSuccessScatterQueries.Add(1) - sErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) - vcursor.Session().RecordWarning(&querypb.QueryWarning{Code: uint32(sErr.Num), Message: err.Error()}) - } - return nil } // GetFields fetches the field info. diff --git a/go/vt/vtgate/engine/session_primitive.go b/go/vt/vtgate/engine/session_primitive.go index 22e17e3b74c..f32d2e5f3ae 100644 --- a/go/vt/vtgate/engine/session_primitive.go +++ b/go/vt/vtgate/engine/session_primitive.go @@ -19,6 +19,8 @@ package engine import ( "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" ) // SessionPrimitive the session primitive is a very small primitive used @@ -72,7 +74,7 @@ func (s *SessionPrimitive) StreamExecute(vcursor VCursor, _ map[string]*querypb. // GetFields implements the Primitive interface func (s *SessionPrimitive) GetFields(_ VCursor, _ map[string]*querypb.BindVariable) (*sqltypes.Result, error) { - panic("implement me") + return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "not supported for this primitive") } // description implements the Primitive interface diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 0c9834dd79f..9813a66f9c1 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -111,7 +111,6 @@ func TestSelectDBA(t *testing.T) { "__vtschemaname": sqltypes.StringBindVariable("vt_ks"), }}} utils.MustMatch(t, wantQueries, sbc1.Queries) - } func TestUnsharded(t *testing.T) { @@ -1133,8 +1132,10 @@ func TestSelectScatterPartial(t *testing.T) { _, err = executorExec(executor, "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user", nil) require.Error(t, err) - testQueryLog(t, logChan, "TestExecute", "SELECT", "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user", 8) + + _, err = executorExec(executor, "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user order by id", nil) + require.Error(t, err) } func TestSelectScatterPartialOLAP(t *testing.T) { @@ -1183,6 +1184,9 @@ func TestSelectScatterPartialOLAP(t *testing.T) { _, err = executorStream(executor, "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user") require.Error(t, err) testQueryLog(t, logChan, "TestExecuteStream", "SELECT", "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user", 8) + + _, err = executorStream(executor, "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ id from user order by id") + require.Error(t, err) } func TestSelectScatterPartialOLAP2(t *testing.T) { From 721c070dfb9b7a4f0d33cdc03d3010d930fc2953 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Thu, 20 May 2021 07:53:10 +0200 Subject: [PATCH 16/17] make test less fragile Signed-off-by: Andres Taylor --- go/vt/vtgate/engine/route_test.go | 2 +- go/vt/vtgate/executor_select_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/go/vt/vtgate/engine/route_test.go b/go/vt/vtgate/engine/route_test.go index cc160b241df..e1a562b4d9e 100644 --- a/go/vt/vtgate/engine/route_test.go +++ b/go/vt/vtgate/engine/route_test.go @@ -1098,7 +1098,7 @@ func TestRouteStreamSortTruncate(t *testing.T) { ), }, } - result, err := wrapStreamExecute(sel, vc, map[string]*querypb.BindVariable{}, false) + result, err := wrapStreamExecute(sel, vc, map[string]*querypb.BindVariable{}, true) require.NoError(t, err) vc.ExpectLog(t, []string{ `ResolveDestinations ks [] Destinations:DestinationAnyShard()`, diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 9813a66f9c1..f8b9eb25e5a 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -1220,7 +1220,8 @@ func TestSelectScatterPartialOLAP2(t *testing.T) { sbc0Th.Serving = false results, err := executorStream(executor, "select id from user") - assert.EqualError(t, err, `target: TestExecutor.40-60.master: no healthy tablet available for 'keyspace:"TestExecutor" shard:"40-60" tablet_type:MASTER '`) + require.Error(t, err) + assert.Contains(t, err.Error(), `no healthy tablet available for 'keyspace:"TestExecutor" shard:"40-60"`) assert.Equal(t, vtrpcpb.Code_UNAVAILABLE, vterrors.Code(err)) assert.Nil(t, results) testQueryLog(t, logChan, "TestExecuteStream", "SELECT", "select id from user", 8) From 7c85cedba6dc62cdfd7bfa3750ce3305495687d8 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Thu, 20 May 2021 07:57:47 +0200 Subject: [PATCH 17/17] ask for fields if we depend on them Signed-off-by: Andres Taylor --- go/vt/vtgate/engine/ordered_aggregate.go | 4 +++- go/vt/wrangler/vdiff.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/go/vt/vtgate/engine/ordered_aggregate.go b/go/vt/vtgate/engine/ordered_aggregate.go index e67239004bb..9da023aaa65 100644 --- a/go/vt/vtgate/engine/ordered_aggregate.go +++ b/go/vt/vtgate/engine/ordered_aggregate.go @@ -379,7 +379,9 @@ func (oa *OrderedAggregate) merge(fields []*querypb.Field, row1, row2 []sqltypes var err error switch aggr.Opcode { case AggregateCount, AggregateSum: - result[aggr.Col] = evalengine.NullsafeAdd(row1[aggr.Col], row2[aggr.Col], fields[aggr.Col].Type) + value := row1[aggr.Col] + v2 := row2[aggr.Col] + result[aggr.Col] = evalengine.NullsafeAdd(value, v2, fields[aggr.Col].Type) case AggregateMin: result[aggr.Col], err = evalengine.Min(row1[aggr.Col], row2[aggr.Col]) case AggregateMax: diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index e08a1b6cb01..672ecc3b632 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -819,7 +819,7 @@ func newPrimitiveExecutor(ctx context.Context, prim engine.Primitive) *primitive vcursor := &contextVCursor{ctx: ctx} go func() { defer close(pe.resultch) - pe.err = pe.prim.StreamExecute(vcursor, make(map[string]*querypb.BindVariable), false, func(qr *sqltypes.Result) error { + pe.err = pe.prim.StreamExecute(vcursor, make(map[string]*querypb.BindVariable), true, func(qr *sqltypes.Result) error { select { case pe.resultch <- qr: case <-ctx.Done():