Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
117692: changefeedccl: introduce quota to parallelio r=jayshrivastava a=jayshrivastava

Problem:

In this (#111829) investigation, it was observed that there was a lot of wasted CPU
performing unions in intsets. This is caused by the `parallelIO` struct which relies
on performing unions to check for conflicting keys. The problem is not with intsets being
slow, but it has to do with how they are used:

`parallelIO` uses a goroutine to both process incoming requests and emit outgoing results.
It accepts incoming requests unconditionally, enqueing them if they cannot be emitted
due to conflicting in flight requests. As outgoing results are processed, each outgoing result
is cross checked with, in the worst case, all enqueued requests to see if they can be emitted.
The cross checking requires unions.

```
incoming request -> request queue -> request handler -> result
                        ^                                 ^
                        | cross check all entries to see  |
                        | if a new request can be emitted |
```

A problem arises when the incoming request queue grows to some critical length where
it significantly slows down the cross checking. This slows down result processing and
ultimately slows down consumption from the request queue. This creates a negative feedback loop
which causes the request queue to grow so large that results take very long to process. This
creates a bottle neck, which throttles the entire changefeed.

See comments #115536 for more details.

The request queue is unbounded. The only reason it doesn't cause an OOM is because the incoming
requests are bounded (by the per-changefeed memory limit).

Solution:

This change solves this problem by setting a quota for the maximum events being processed by the
library at the same time. This change sets a size of 128 requests by default. This setting can be
changed using a new cluster setting `changefeed.parallel_io.request_quota`.

Before this change, the API for the parallelio library was very bad. It required the caller to
select on both the request channel and result channel to prevent deadlock. There were also no
public methods. This made it unclear how to properly use the API. This change makes an explicit
API with public methods. However, it keeps the same 2-channel scheme because removing that would
require a larger refactor. This is left as a TODO.

Closes: #115536
Release note: None
Epic: None

118288: roachtest: make ruby-pg test work on Ubuntu 22.04 r=rafiss a=rafiss

This required updating the version under test, which led to a few new test failures that we track now.

fixes #112109
Release note: None

Co-authored-by: Jayant Shrivastava <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
  • Loading branch information
3 people committed Jan 29, 2024
3 parents 8765e8a + cb674c2 + 167f66a commit 0cb4b7e
Show file tree
Hide file tree
Showing 10 changed files with 515 additions and 250 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ go_library(
"//pkg/util/mon",
"//pkg/util/parquet",
"//pkg/util/protoutil",
"//pkg/util/quotapool",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/span",
Expand Down
67 changes: 46 additions & 21 deletions pkg/ccl/changefeedccl/batching_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/intsets"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

// SinkClient is an interface to an external sink, where messages are written
Expand Down Expand Up @@ -66,9 +69,10 @@ type batchingSink struct {
minFlushFrequency time.Duration
retryOpts retry.Options

ts timeutil.TimeSource
metrics metricsRecorder
knobs batchingSinkKnobs
ts timeutil.TimeSource
metrics metricsRecorder
settings *cluster.Settings
knobs batchingSinkKnobs

// eventCh is the channel used to send requests from the Sink caller routines
// to the batching routine. Messages can either be a flushReq or a rowEvent.
Expand Down Expand Up @@ -157,7 +161,7 @@ func freeRowEvent(e *rowEvent) {
eventPool.Put(e)
}

var batchPool sync.Pool = sync.Pool{
var batchPool = sync.Pool{
New: func() interface{} {
return new(sinkBatch)
},
Expand Down Expand Up @@ -331,19 +335,20 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
s.metrics.recordSinkIOInflightChange(int64(batch.numMessages))
return s.client.Flush(ctx, batch.payload)
}
ioEmitter := newParallelIO(ctx, s.retryOpts, s.ioWorkers, ioHandler, s.metrics)
ioEmitter := NewParallelIO(ctx, s.retryOpts, s.ioWorkers, ioHandler, s.metrics, s.settings)
defer ioEmitter.Close()

// Flushing requires tracking the number of inflight messages and confirming
// completion to the requester once the counter reaches 0.
inflight := 0
var sinkFlushWaiter chan struct{}

handleResult := func(result *ioResult) {
batch, _ := result.request.(*sinkBatch)
handleResult := func(result IOResult) {
req, err := result.Consume()
batch, _ := req.(*sinkBatch)

if result.err != nil {
s.handleError(result.err)
if err != nil {
s.handleError(err)
} else {
s.metrics.recordEmittedBatch(
batch.bufferTime, batch.numMessages, batch.mvcc, batch.numKVBytes, sinkDoesNotCompress,
Expand All @@ -352,12 +357,11 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {

inflight -= batch.numMessages

if (result.err != nil || inflight == 0) && sinkFlushWaiter != nil {
if (err != nil || inflight == 0) && sinkFlushWaiter != nil {
close(sinkFlushWaiter)
sinkFlushWaiter = nil
}

freeIOResult(result)
batch.alloc.Release(ctx)
freeSinkBatchEvent(batch)
}
Expand All @@ -373,22 +377,41 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
return err
}

// Emitting needs to also handle any incoming results to avoid a deadlock
// with trying to emit while the emitter is blocked on returning a result.
for {
req, send, err := ioEmitter.AdmitRequest(ctx, batchBuffer)
if errors.Is(err, ErrNotEnoughQuota) {
// Quota can only be freed by consuming a result.
select {
case <-ctx.Done():
return ctx.Err()
case ioEmitter.requestCh <- batchBuffer:
case result := <-ioEmitter.resultCh:
handleResult(result)
continue
case <-s.doneCh:
return nil
case result := <-ioEmitter.GetResult():
handleResult(result)
}
break

// The request should be emitted after freeing quota since this is
// a single producer scenario.
req, send, err = ioEmitter.AdmitRequest(ctx, batchBuffer)
if errors.Is(err, ErrNotEnoughQuota) {
logcrash.ReportOrPanic(ctx, &s.settings.SV, "expected request to be emitted after waiting for quota")
return errors.AssertionFailedf("expected request to be emitted after waiting for quota")
} else if err != nil {
return err
}
} else if err != nil {
return err
}

return nil
// The request was admitted, it must be sent. There are no concurrent requests being sent which
// would use up the quota.
select {
case <-ctx.Done():
return ctx.Err()
case <-s.doneCh:
return nil
case send <- req:
return nil
}
}

flushAll := func() error {
Expand Down Expand Up @@ -478,7 +501,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
default:
s.handleError(fmt.Errorf("received unknown request of unknown type: %v", r))
}
case result := <-ioEmitter.resultCh:
case result := <-ioEmitter.GetResult():
handleResult(result)
case <-flushTimer.Ch():
flushTimer.MarkRead()
Expand All @@ -505,6 +528,7 @@ func makeBatchingSink(
pacerFactory func() *admission.Pacer,
timeSource timeutil.TimeSource,
metrics metricsRecorder,
settings *cluster.Settings,
) Sink {
sink := &batchingSink{
client: client,
Expand All @@ -515,6 +539,7 @@ func makeBatchingSink(
retryOpts: retryOpts,
ts: timeSource,
metrics: metrics,
settings: settings,
eventCh: make(chan interface{}, flushQueueDepth),
wg: ctxgroup.WithContext(ctx),
hasher: makeHasher(),
Expand Down
Loading

0 comments on commit 0cb4b7e

Please sign in to comment.