Skip to content

Commit

Permalink
Merge pull request moby#4818 from sipsma/more-scheduler-debug
Browse files Browse the repository at this point in the history
solver: add more debug logs to track down inconsistent graph state
  • Loading branch information
sipsma authored Apr 3, 2024
2 parents 76596cc + b2925c2 commit dc23e43
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 10 deletions.
42 changes: 42 additions & 0 deletions solver/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,12 +843,44 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory,
addNew := true
if dep.req != nil && !dep.req.Status().Completed {
if dep.req.Request().(*edgeRequest).desiredState != desiredStateDep {
if e.debug {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("dep_index", dep.index).
WithField("dep_req_desired_state", dep.req.Request().(*edgeRequest).desiredState).
WithField("dep_desired_state", desiredStateDep).
WithField("dep_state", dep.state).
Debug("cancel input request")
}
dep.req.Cancel()
} else {
if e.debug {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("dep_index", dep.index).
WithField("dep_req_desired_state", dep.req.Request().(*edgeRequest).desiredState).
WithField("dep_desired_state", desiredStateDep).
WithField("dep_state", dep.state).
Debug("skip input request based on existing request")
}
addNew = false
}
}
if addNew {
if e.debug {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("dep_index", dep.index).
WithField("dep_desired_state", desiredStateDep).
WithField("dep_state", dep.state).
WithField("dep_vertex_name", e.edge.Vertex.Inputs()[dep.index].Vertex.Name()).
WithField("dep_vertex_digest", e.edge.Vertex.Inputs()[dep.index].Vertex.Digest()).
Debug("add input request")
}

req := f.NewInputRequest(e.edge.Vertex.Inputs()[int(dep.index)], &edgeRequest{
currentState: dep.edgeState,
desiredState: desiredStateDep,
Expand All @@ -858,6 +890,16 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory,
dep.req = req
addedNew = true
}
} else if e.debug {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("dep_index", dep.index).
WithField("dep_desired_state", desiredStateDep).
WithField("dep_state", dep.state).
WithField("dep_vertex_name", e.edge.Vertex.Inputs()[dep.index].Vertex.Name()).
WithField("dep_vertex_digest", e.edge.Vertex.Inputs()[dep.index].Vertex.Digest()).
Debug("skip input request based on dep state")
}
// initialize function to compute cache key based on dependency result
if dep.state == edgeStatusComplete && dep.slowCacheReq == nil && (e.slowCacheFunc(dep) != nil || e.preprocessFunc(dep) != nil) && e.cacheMap != nil {
Expand Down
94 changes: 87 additions & 7 deletions solver/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver/errdefs"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/progress/controller"
Expand Down Expand Up @@ -350,7 +351,23 @@ func (jl *Solver) getState(e Edge) *state {
return st
}

func (jl *Solver) getEdge(e Edge) *edge {
func (jl *Solver) getEdge(e Edge) (redge *edge) {
if debugScheduler {
defer func() {
lg := bklog.G(context.TODO()).
WithField("edge_vertex_name", e.Vertex.Name()).
WithField("edge_vertex_digest", e.Vertex.Digest()).
WithField("edge_index", e.Index)
if redge != nil {
lg = lg.
WithField("return_edge_vertex_name", redge.edge.Vertex.Name()).
WithField("return_edge_vertex_digest", redge.edge.Vertex.Digest()).
WithField("return_edge_index", redge.edge.Index)
}
lg.Debug("getEdge return")
}()
}

jl.mu.RLock()
defer jl.mu.RUnlock()

Expand All @@ -362,7 +379,7 @@ func (jl *Solver) getEdge(e Edge) *edge {
}

func (jl *Solver) subBuild(ctx context.Context, e Edge, parent Vertex) (CachedResult, error) {
v, err := jl.load(e.Vertex, parent, nil)
v, err := jl.load(ctx, e.Vertex, parent, nil)
if err != nil {
return nil, err
}
Expand All @@ -374,25 +391,25 @@ func (jl *Solver) Close() {
jl.s.Stop()
}

func (jl *Solver) load(v, parent Vertex, j *Job) (Vertex, error) {
func (jl *Solver) load(ctx context.Context, v, parent Vertex, j *Job) (Vertex, error) {
jl.mu.Lock()
defer jl.mu.Unlock()

cache := map[Vertex]Vertex{}

return jl.loadUnlocked(v, parent, j, cache)
return jl.loadUnlocked(ctx, v, parent, j, cache)
}

// called with solver lock
func (jl *Solver) loadUnlocked(v, parent Vertex, j *Job, cache map[Vertex]Vertex) (Vertex, error) {
func (jl *Solver) loadUnlocked(ctx context.Context, v, parent Vertex, j *Job, cache map[Vertex]Vertex) (Vertex, error) {
if v, ok := cache[v]; ok {
return v, nil
}
origVtx := v

inputs := make([]Edge, len(v.Inputs()))
for i, e := range v.Inputs() {
v, err := jl.loadUnlocked(e.Vertex, parent, j, cache)
v, err := jl.loadUnlocked(ctx, e.Vertex, parent, j, cache)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -449,6 +466,24 @@ func (jl *Solver) loadUnlocked(v, parent Vertex, j *Job, cache map[Vertex]Vertex
origDigest: origVtx.Digest(),
}
jl.actives[dgst] = st

if debugScheduler {
lg := bklog.G(ctx).
WithField("vertex_name", v.Name()).
WithField("vertex_digest", v.Digest())
if j != nil {
lg = lg.WithField("job", j.id)
}
lg.Debug("adding active vertex")
}
} else if debugScheduler {
lg := bklog.G(ctx).
WithField("vertex_name", v.Name()).
WithField("vertex_digest", v.Digest())
if j != nil {
lg = lg.WithField("job", j.id)
}
lg.Debug("reusing active vertex")
}

st.mu.Lock()
Expand All @@ -464,6 +499,17 @@ func (jl *Solver) loadUnlocked(v, parent Vertex, j *Job, cache map[Vertex]Vertex
if _, ok := st.jobs[j]; !ok {
st.jobs[j] = struct{}{}
}
if debugScheduler {
jobIDs := make([]string, 0, len(st.jobs))
for j := range st.jobs {
jobIDs = append(jobIDs, j.id)
}
bklog.G(ctx).
WithField("vertex_name", v.Name()).
WithField("vertex_digest", v.Digest()).
WithField("jobs", jobIDs).
Debug("current jobs for vertex")
}
}
st.mu.Unlock()

Expand Down Expand Up @@ -564,13 +610,39 @@ func (jl *Solver) Get(id string) (*Job, error) {
// called with solver lock
func (jl *Solver) deleteIfUnreferenced(k digest.Digest, st *state) {
if len(st.jobs) == 0 && len(st.parents) == 0 {
if debugScheduler {
bklog.G(context.TODO()).
WithField("vertex_name", st.vtx.Name()).
WithField("vertex_digest", st.vtx.Digest()).
WithField("actives_key", k).
Debug("deleting unreferenced active vertex")
for _, e := range st.edges {
bklog.G(context.TODO()).
WithField("vertex_name", e.edge.Vertex.Name()).
WithField("vertex_digest", e.edge.Vertex.Digest()).
WithField("index", e.edge.Index).
WithField("state", e.state).
Debug("edge in deleted unreferenced state")
}
}
for chKey := range st.childVtx {
chState := jl.actives[chKey]
delete(chState.parents, k)
jl.deleteIfUnreferenced(chKey, chState)
}
st.Release()
delete(jl.actives, k)
} else if debugScheduler {
var jobIDs []string
for j := range st.jobs {
jobIDs = append(jobIDs, j.id)
}
bklog.G(context.TODO()).
WithField("vertex_name", st.vtx.Name()).
WithField("vertex_digest", st.vtx.Digest()).
WithField("actives_key", k).
WithField("jobs", jobIDs).
Debug("not deleting referenced active vertex")
}
}

Expand All @@ -579,7 +651,7 @@ func (j *Job) Build(ctx context.Context, e Edge) (CachedResultWithProvenance, er
j.span = span
}

v, err := j.list.load(e.Vertex, nil, j)
v, err := j.list.load(ctx, e.Vertex, nil, j)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -647,6 +719,14 @@ func (j *Job) Discard() error {
for k, st := range j.list.actives {
st.mu.Lock()
if _, ok := st.jobs[j]; ok {
if debugScheduler {
bklog.G(context.TODO()).
WithField("job", j.id).
WithField("vertex_name", st.vtx.Name()).
WithField("vertex_digest", st.vtx.Digest()).
WithField("actives_key", k).
Debug("deleting job from state")
}
delete(st.jobs, j)
j.list.deleteIfUnreferenced(k, st)
}
Expand Down
42 changes: 39 additions & 3 deletions solver/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,16 +184,47 @@ func (s *scheduler) dispatch(e *edge) {
origEdge := e.index.LoadOrStore(k, e)
if origEdge != nil {
if e.isDep(origEdge) || origEdge.isDep(e) {
bklog.G(context.TODO()).Debugf("skip merge due to dependency")
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("edge_index", e.edge.Index).
WithField("origEdge_vertex_name", origEdge.edge.Vertex.Name()).
WithField("origEdge_vertex_digest", origEdge.edge.Vertex.Digest()).
WithField("origEdge_index", origEdge.edge.Index).
Debug("skip merge due to dependency")
} else {
dest, src := origEdge, e
if s.ef.hasOwner(origEdge.edge, e.edge) {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("edge_index", e.edge.Index).
WithField("origEdge_vertex_name", origEdge.edge.Vertex.Name()).
WithField("origEdge_vertex_digest", origEdge.edge.Vertex.Digest()).
WithField("origEdge_index", origEdge.edge.Index).
Debug("swap merge due to owner")
dest, src = src, dest
}

bklog.G(context.TODO()).Debugf("merging edge %s[%d] to %s[%d]\n", src.edge.Vertex.Name(), src.edge.Index, dest.edge.Vertex.Name(), dest.edge.Index)
bklog.G(context.TODO()).
WithField("source_edge_vertex_name", src.edge.Vertex.Name()).
WithField("source_edge_vertex_digest", src.edge.Vertex.Digest()).
WithField("source_edge_index", src.edge.Index).
WithField("dest_vertex_name", dest.edge.Vertex.Name()).
WithField("dest_vertex_digest", dest.edge.Vertex.Digest()).
WithField("dest_index", dest.edge.Index).
Debug("merging edges")
if s.mergeTo(dest, src) {
s.ef.setEdge(src.edge, dest)
} else {
bklog.G(context.TODO()).
WithField("source_edge_vertex_name", src.edge.Vertex.Name()).
WithField("source_edge_vertex_digest", src.edge.Vertex.Digest()).
WithField("source_edge_index", src.edge.Index).
WithField("dest_vertex_name", dest.edge.Vertex.Name()).
WithField("dest_vertex_digest", dest.edge.Vertex.Digest()).
WithField("dest_index", dest.edge.Index).
Debug("merging edges skipped")
}
}
}
Expand Down Expand Up @@ -367,8 +398,13 @@ type pipeFactory struct {
func (pf *pipeFactory) NewInputRequest(ee Edge, req *edgeRequest) pipe.Receiver {
target := pf.s.ef.getEdge(ee)
if target == nil {
bklog.G(context.TODO()).
WithField("edge_vertex_name", ee.Vertex.Name()).
WithField("edge_vertex_digest", ee.Vertex.Digest()).
WithField("edge_index", ee.Index).
Error("failed to get edge: inconsistent graph state")
return pf.NewFuncRequest(func(_ context.Context) (interface{}, error) {
return nil, errors.Errorf("failed to get edge: inconsistent graph state")
return nil, errors.Errorf("failed to get edge: inconsistent graph state in edge %s %s %d", ee.Vertex.Name(), ee.Vertex.Digest(), ee.Index)
})
}
p := pf.s.newPipe(target, pf.e, pipe.Request{Payload: req})
Expand Down

0 comments on commit dc23e43

Please sign in to comment.