Skip to content

Commit

Permalink
consumer(cdc): fix unstable test move_table (#5086)
Browse files Browse the repository at this point in the history
close #2887, close #5090, close #5101
  • Loading branch information
3AceShowHand authored Apr 7, 2022
1 parent 5b9eddd commit 361aa92
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 53 deletions.
2 changes: 1 addition & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
// This condition implies that the DDL resolved-ts has not yet reached checkpointTs,
// which implies that it would be premature to schedule tables or to update status.
// So we return here.
log.Info("barrierTs < checkpointTs, premature to schedule tables or update status",
log.Debug("barrierTs < checkpointTs, premature to schedule tables or update status",
zap.String("changefeed", c.id),
zap.Uint64("barrierTs", barrierTs), zap.Uint64("checkpointTs", checkpointTs))
return nil
Expand Down
4 changes: 2 additions & 2 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,14 +322,14 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {
}

func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (bool, error) {
if n.status == TableStatusStopped {
if n.status.Load() == TableStatusStopped {
return false, cerror.ErrTableProcessorStoppedSafely.GenWithStackByArgs()
}
switch msg.Tp {
case pmessage.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
if event.RawKV.OpType == model.OpTypeResolved {
if n.status == TableStatusInitializing {
if n.status.Load() == TableStatusInitializing {
n.status.Store(TableStatusRunning)
}
failpoint.Inject("ProcessorSyncResolvedError", func() {
Expand Down
6 changes: 2 additions & 4 deletions cdc/sink/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,8 @@ func (c *UnresolvedTxnCache) Append(filter *filter.Filter, rows ...*model.RowCha
// fail-fast check
if len(txns) != 0 && txns[len(txns)-1].commitTs > row.CommitTs {
log.Panic("the commitTs of the emit row is less than the received row",
zap.Stringer("table", row.Table),
zap.Uint64("emit row startTs", row.StartTs),
zap.Uint64("emit row commitTs", row.CommitTs),
zap.Uint64("last received row commitTs", txns[len(txns)-1].commitTs))
zap.Uint64("lastReceivedCommitTs", txns[len(txns)-1].commitTs),
zap.Any("row", row))
}
txns = append(txns, &txnsWithTheSameCommitTs{
commitTs: row.CommitTs,
Expand Down
3 changes: 2 additions & 1 deletion cdc/sink/producer/kafka/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ func (sm *saramaMetricsMonitor) collectBrokers() {
"use historical brokers to collect kafka broker level metrics",
zap.String("changefeed", sm.changefeedID),
zap.Any("role", sm.role),
zap.Duration("duration", time.Since(start)))
zap.Duration("duration", time.Since(start)),
zap.Error(err))
return
}

Expand Down
68 changes: 58 additions & 10 deletions cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/url"
"os"
"os/signal"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -453,9 +454,37 @@ func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

type eventsGroup struct {
events []*model.RowChangedEvent
}

func newEventsGroup() *eventsGroup {
return &eventsGroup{
events: make([]*model.RowChangedEvent, 0),
}
}

func (g *eventsGroup) Append(e *model.RowChangedEvent) {
g.events = append(g.events, e)
}

func (g *eventsGroup) Resolve(resolveTs uint64) []*model.RowChangedEvent {
sort.Slice(g.events, func(i, j int) bool {
return g.events[i].CommitTs < g.events[j].CommitTs
})

i := sort.Search(len(g.events), func(i int) bool {
return g.events[i].CommitTs > resolveTs
})
result := g.events[:i]
g.events = g.events[i:]

return result
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
ctx := context.TODO()
ctx := context.Background()
partition := claim.Partition()
c.sinksMu.Lock()
sink := c.sinks[partition]
Expand All @@ -464,6 +493,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
panic("sink should initialized")
}

eventGroups := make(map[int64]*eventsGroup)
for message := range claim.Messages() {
var (
decoder codec.EventBatchDecoder
Expand Down Expand Up @@ -547,15 +577,16 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
if row.Table.IsPartition {
partitionID = row.Table.TableID
}
row.Table.TableID = c.fakeTableIDGenerator.generateFakeTableID(row.Table.Schema, row.Table.Table, partitionID)
err = sink.EmitRowChangedEvents(ctx, row)
if err != nil {
log.Panic("emit row changed event failed", zap.Error(err))
}
lastCommitTs, ok := sink.tablesMap.Load(row.Table.TableID)
if !ok || lastCommitTs.(uint64) < row.CommitTs {
sink.tablesMap.Store(row.Table.TableID, row.CommitTs)
tableID := c.fakeTableIDGenerator.
generateFakeTableID(row.Table.Schema, row.Table.Table, partitionID)
row.Table.TableID = tableID

group, ok := eventGroups[tableID]
if !ok {
group = newEventsGroup()
eventGroups[tableID] = group
}
group.Append(row)
case model.MqMessageTypeResolved:
ts, err := decoder.NextResolvedEvent()
if err != nil {
Expand All @@ -570,7 +601,24 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
zap.Int32("partition", partition))
}
if ts > resolvedTs {
log.Info("update sink resolved ts",
for tableID, group := range eventGroups {
events := group.Resolve(ts)
if len(events) == 0 {
continue
}
if err := sink.EmitRowChangedEvents(ctx, events...); err != nil {
log.Panic("emit row changed event failed",
zap.Any("events", events),
zap.Error(err),
zap.Int32("partition", partition))
}
commitTs := events[len(events)-1].CommitTs
lastCommitTs, ok := sink.tablesMap.Load(tableID)
if !ok || lastCommitTs.(uint64) < commitTs {
sink.tablesMap.Store(tableID, commitTs)
}
}
log.Debug("update sink resolved ts",
zap.Uint64("ts", ts),
zap.Int32("partition", partition))
atomic.StoreUint64(&sink.resolvedTs, ts)
Expand Down
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.18
require (
github.com/BurntSushi/toml v0.3.1
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/Shopify/sarama v1.32.0
github.com/Shopify/sarama v1.29.0
github.com/apache/pulsar-client-go v0.6.0
github.com/aws/aws-sdk-go v1.35.3
github.com/benbjohnson/clock v1.3.0
Expand Down Expand Up @@ -50,7 +50,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20220303073211-00fea37feb66
github.com/pingcap/kvproto v0.0.0-20220328072018-6e75c12dbd73
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee
github.com/pingcap/tidb v1.1.0-beta.0.20220329110428-fbaaa1117936
github.com/pingcap/tidb v1.1.0-beta.0.20220407075432-5c6248d6ec88
github.com/pingcap/tidb-tools v6.0.0-alpha.0.20220317013353-dfc5146f4746+incompatible
github.com/pingcap/tidb/parser v0.0.0-20220329110428-fbaaa1117936
github.com/prometheus/client_golang v1.11.0
Expand All @@ -66,13 +66,13 @@ require (
github.com/swaggo/gin-swagger v1.2.0
github.com/swaggo/swag v1.6.6-0.20200529100950-7c765ddd0476
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
github.com/tikv/client-go/v2 v2.0.1-0.20220328083738-8489c3e8c3d9
github.com/tikv/client-go/v2 v2.0.1-0.20220329092050-6bf6951325ad
github.com/tikv/pd v1.1.0-beta.0.20220303060546-3695d8164800
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710
github.com/tinylib/msgp v1.1.6
github.com/uber-go/atomic v1.4.0
github.com/vmihailenco/msgpack/v5 v5.3.5
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/xdg/scram v1.0.3
go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/client/v3 v3.5.2
Expand All @@ -85,7 +85,7 @@ require (
go.uber.org/zap v1.21.0
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20220318055525-2edf467146b5
golang.org/x/sys v0.0.0-20220330033206-e17cdc41300f
golang.org/x/text v0.3.7
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
golang.org/x/tools v0.1.10 // indirect
Expand Down Expand Up @@ -133,7 +133,7 @@ require (
github.com/danieljoos/wincred v1.0.2 // indirect
github.com/danjacques/gofslock v0.0.0-20191023191349-0a45f885bc37 // indirect
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect
github.com/dgraph-io/ristretto v0.1.0 // indirect
github.com/dgraph-io/ristretto v0.1.1-0.20211108053508-297c39e6640f // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a // indirect
Expand Down Expand Up @@ -238,7 +238,7 @@ require (
github.com/valyala/fasttemplate v1.2.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f // indirect
github.com/xdg/stringprep v1.0.0 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
github.com/xitongsys/parquet-go v1.6.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
Expand Down
Loading

0 comments on commit 361aa92

Please sign in to comment.