Skip to content

Commit

Permalink
Merge #113896
Browse files Browse the repository at this point in the history
113896: pkg/workload: ensure `Close` gets called r=renatolabs,annrpom a=chrisseto

#### e2a8bbd pkg/workload: ensure `Close` gets called

Prior to this change, `QueryLoad.Close` would not be called if a
workload worker returned an error. This callback is often used to
perform cleanup or flush log files; actions which are even more
important to perform in the case of an error.

This commit hoists the execution of `Close` into a `defer` to ensure
they're appropriately called in all reasonable exit scenarios. It also
relocates the `schemachange` workload's usage of `PostRun` to
`QueryLoad.Close`.

Epic: CRDB-19168
Release note (cli change): workload commands now appropriately invoke
`.Close` in the case of an error.

Co-authored-by: Chris Seto <[email protected]>
  • Loading branch information
craig[bot] and chrisseto committed Nov 14, 2023
2 parents 8b5bffd + e2a8bbd commit ce3f78b
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pkg/bench/tpcc/subprocess_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ var (
pgURL, ok := envutil.EnvString(pgurlEnvVar, 0)
require.True(t, ok)
ql := makeQueryLoad(t, pgURL)
defer ql.Close(context.Background())
defer func() { _ = ql.Close(context.Background()) }()
eventAddr, ok := envutil.EnvString(eventEnvVar, 0)
require.True(t, ok)
sendEvent(t, eventAddr, runStartEvent)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/tenant_backup_nemesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestTenantBackupNemesis(t *testing.T) {
}
defer func() {
if ops.Close != nil {
ops.Close(ctx)
_ = ops.Close(ctx)
}
}()
fn := ops.WorkerFns[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,5 @@ func TestWorkload(t *testing.T) {
g.Go(workerFn(gCtx, ql.WorkerFns[i]))
}
require.NoError(t, g.Wait())
ql.Close(ctx)
require.NoError(t, ql.Close(ctx))
}
4 changes: 2 additions & 2 deletions pkg/workload/bank/bank.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ func (b *bank) Ops(

ql := workload.QueryLoad{
SQLDatabase: sqlDatabase,
Close: func(_ context.Context) {
_ = db.Close()
Close: func(_ context.Context) error {
return db.Close()
},
}
for i := 0; i < b.connFlags.Concurrency; i++ {
Expand Down
13 changes: 10 additions & 3 deletions pkg/workload/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,16 @@ func runRun(gen workload.Generator, urls []string, dbName string) error {
rampDone = make(chan struct{})
}

// If ops.Close is specified, defer it to ensure that it is run before
// exiting.
if ops.Close != nil {
defer func() {
if err := ops.Close(ctx); err != nil {
fmt.Printf("failed .Close: %v\n", err)
}
}()
}

workersCtx, cancelWorkers := context.WithCancel(ctx)
defer cancelWorkers()
var wg sync.WaitGroup
Expand Down Expand Up @@ -586,9 +596,6 @@ func runRun(gen workload.Generator, urls []string, dbName string) error {

case <-done:
cancelWorkers()
if ops.Close != nil {
ops.Close(ctx)
}

startElapsed := timeutil.Since(start)
resultTick := histogram.Tick{Name: ops.ResultHist}
Expand Down
3 changes: 2 additions & 1 deletion pkg/workload/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,12 +727,13 @@ func (o *kvOp) tryHandleWriteErr(name string, start time.Time, err error) error
return err
}

func (o *kvOp) close(context.Context) {
func (o *kvOp) close(context.Context) error {
if empty := o.numEmptyResults.Load(); empty != 0 {
fmt.Printf("Number of reads that didn't return any results: %d.\n", empty)
}
fmt.Printf("Write sequence could be resumed by passing --write-seq=%s to the next run.\n",
o.g.state())
return nil
}

type sequence struct {
Expand Down
20 changes: 7 additions & 13 deletions pkg/workload/schemachange/schemachange.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,6 @@ func (s *schemaChange) Tables() []workload.Table {
return nil
}

// Hooks implements the workload.Hookser interface.
func (s *schemaChange) Hooks() workload.Hooks {
return workload.Hooks{
PostRun: func(_ time.Duration) error {
return s.closeJSONLogFile()
},
}
}

// Ops implements the workload.Opser interface.
func (s *schemaChange) Ops(
ctx context.Context, urls []string, reg *histogram.Registry,
Expand Down Expand Up @@ -199,7 +190,13 @@ func (s *schemaChange) Ops(
}
declarativeOps := newDeck(rng, declarativeOpWeights...)

ql := workload.QueryLoad{SQLDatabase: sqlDatabase}
ql := workload.QueryLoad{
SQLDatabase: sqlDatabase,
Close: func(ctx context.Context) error {
pool.Close()
return s.closeJSONLogFile()
},
}

var artifactsLog *atomicLog
if s.logFilePath != "" {
Expand Down Expand Up @@ -251,9 +248,6 @@ func (s *schemaChange) Ops(
s.workers = append(s.workers, w)

ql.WorkerFns = append(ql.WorkerFns, w.run)
ql.Close = func(ctx2 context.Context) {
pool.Close()
}
}
return ql, nil
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/workload/tpcc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,12 +898,13 @@ func (w *tpcc) Ops(
}

// Close idle connections.
ql.Close = func(context context.Context) {
ql.Close = func(context context.Context) error {
for _, conn := range conns {
if err := conn.Close(ctx); err != nil {
log.Warningf(ctx, "%v", err)
}
}
return nil
}
return ql, nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,11 +389,11 @@ type QueryLoad struct {
WorkerFns []func(context.Context) error

// Close, if set, is called before the process exits, giving workloads a
// chance to print some information.
// chance to print some information or perform cleanup.
// It's guaranteed that the ctx passed to WorkerFns (if they're still running)
// has been canceled by the time this is called (so an implementer can
// synchronize with the WorkerFns if need be).
Close func(context.Context)
Close func(context.Context) error

// ResultHist is the name of the NamedHistogram to use for the benchmark
// formatted results output at the end of `./workload run`. The empty string
Expand Down

0 comments on commit ce3f78b

Please sign in to comment.