Skip to content

Commit

Permalink
goroutine-release
Browse files Browse the repository at this point in the history
Signed-off-by: Tonis Tiigi <[email protected]>
  • Loading branch information
tonistiigi committed May 30, 2023
1 parent db69cd3 commit 451e18c
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 33 deletions.
68 changes: 38 additions & 30 deletions executor/resources/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package resources

import (
"bufio"
"context"
"os"
"path/filepath"
"strconv"
Expand All @@ -28,21 +27,22 @@ var initOnce sync.Once
var isCgroupV2 bool

type cgroupRecord struct {
once sync.Once
ns string
sampler *Sub[*types.Sample]
closeSampler func() error
samples []*types.Sample
err error
done chan struct{}
monitor *Monitor
netSampler NetworkSampler
startCPUStat *procfs.CPUStat
sysCPUStat *types.SysCPUStat
once sync.Once
ns string
sampler *Sub[*types.Sample]
closeSampler func() error
samples []*types.Sample
err error
done chan struct{}
monitor *Monitor
netSampler NetworkSampler
startCPUStat *procfs.CPUStat
sysCPUStat *types.SysCPUStat
afterReleaseHook func() error
}

func (r *cgroupRecord) Wait() error {
go r.close()
go r.Close()
<-r.done
return r.err
}
Expand All @@ -56,15 +56,7 @@ func (r *cgroupRecord) Start() {
r.closeSampler = s.Close
}

func (r *cgroupRecord) CloseAsync(next func(context.Context) error) error {
go func() {
r.close()
next(context.TODO())
}()
return nil
}

func (r *cgroupRecord) close() {
func (r *cgroupRecord) Close() error {
r.once.Do(func() {
defer close(r.done)
go func() {
Expand Down Expand Up @@ -101,7 +93,15 @@ func (r *cgroupRecord) close() {
r.sysCPUStat = cpu
}
}

if r.afterReleaseHook != nil {
err := r.afterReleaseHook()
if r.err == nil {
r.err = err
}
}
})
return r.err
}

func (r *cgroupRecord) sample(tm time.Time) (*types.Sample, error) {
Expand Down Expand Up @@ -150,6 +150,7 @@ func (r *cgroupRecord) Samples() (*types.Samples, error) {
}

type nopRecord struct {
afterReleaseHook func() error
}

func (r *nopRecord) Wait() error {
Expand All @@ -160,8 +161,11 @@ func (r *nopRecord) Samples() (*types.Samples, error) {
return nil, nil
}

func (r *nopRecord) CloseAsync(next func(context.Context) error) error {
return next(context.TODO())
func (r *nopRecord) Close() error {
if r.afterReleaseHook != nil {
r.afterReleaseHook()
}
return nil
}

func (r *nopRecord) Start() {
Expand All @@ -180,6 +184,7 @@ type NetworkSampler interface {

type RecordOpt struct {
NetworkSampler NetworkSampler
ReleaseFunc func() error
}

func (m *Monitor) RecordNamespace(ns string, opt RecordOpt) (types.Recorder, error) {
Expand All @@ -190,13 +195,16 @@ func (m *Monitor) RecordNamespace(ns string, opt RecordOpt) (types.Recorder, err
default:
}
if !isCgroupV2 || isClosed {
return &nopRecord{}, nil
return &nopRecord{
afterReleaseHook: opt.ReleaseFunc,
}, nil
}
r := &cgroupRecord{
ns: ns,
done: make(chan struct{}),
monitor: m,
netSampler: opt.NetworkSampler,
ns: ns,
done: make(chan struct{}),
monitor: m,
netSampler: opt.NetworkSampler,
afterReleaseHook: opt.ReleaseFunc,
}
m.mu.Lock()
m.records[ns] = r
Expand All @@ -210,7 +218,7 @@ func (m *Monitor) Close() error {
defer m.mu.Unlock()

for _, r := range m.records {
r.close()
r.Close()
}
return nil
}
Expand Down
3 changes: 1 addition & 2 deletions executor/resources/types/types.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package types

import (
"context"
"time"

"github.com/moby/buildkit/util/network"
)

type Recorder interface {
Start()
CloseAsync(func(context.Context) error) error
Close() error
Wait() error
Samples() (*Samples, error)
}
Expand Down
10 changes: 9 additions & 1 deletion executor/runcexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,15 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount,

bklog.G(ctx).Debugf("> creating %s %v", id, meta.Args)

recorderReleaseHook := func(ctx context.Context) error { return nil }

cgroupPath := spec.Linux.CgroupsPath
if cgroupPath != "" {
rec, err = w.resmon.RecordNamespace(cgroupPath, resources.RecordOpt{
NetworkSampler: namespace,
ReleaseFunc: func() error {
return recorderReleaseHook(context.TODO())
},
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -341,9 +346,12 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount,

if rec == nil {
return nil, releaseContainer(context.TODO())
} else {
recorderReleaseHook = releaseContainer
go rec.Close()
}

return rec, rec.CloseAsync(releaseContainer)
return rec, nil
}

func exitError(ctx context.Context, err error) error {
Expand Down

0 comments on commit 451e18c

Please sign in to comment.