Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

constraintenforcer: Trigger task restarts when appropriate #1958

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions manager/orchestrator/constraintenforcer/constraint_enforcer.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package constraintenforcer

import (
"time"

"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/constraint"
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/protobuf/ptypes"
)

// ConstraintEnforcer watches for updates to nodes and shuts down tasks that no
Expand Down Expand Up @@ -43,22 +46,22 @@ func (ce *ConstraintEnforcer) Run() {
log.L.WithError(err).Error("failed to check nodes for noncompliant tasks")
} else {
for _, node := range nodes {
ce.shutdownNoncompliantTasks(node)
ce.rejectNoncompliantTasks(node)
}
}

for {
select {
case event := <-watcher:
node := event.(state.EventUpdateNode).Node
ce.shutdownNoncompliantTasks(node)
ce.rejectNoncompliantTasks(node)
case <-ce.stopChan:
return
}
}
}

func (ce *ConstraintEnforcer) shutdownNoncompliantTasks(node *api.Node) {
func (ce *ConstraintEnforcer) rejectNoncompliantTasks(node *api.Node) {
// If the availability is "drain", the orchestrator will
// shut down all tasks.
// If the availability is "pause", we shouldn't touch
Expand Down Expand Up @@ -134,7 +137,16 @@ func (ce *ConstraintEnforcer) shutdownNoncompliantTasks(node *api.Node) {
return nil
}

t.DesiredState = api.TaskStateShutdown
// We set the observed state to
// REJECTED, rather than the desired
// state. Desired state is owned by the
// orchestrator, and setting it directly
// will bypass actions such as
// restarting the task on another node
// (if applicable).
t.Status.State = api.TaskStateRejected
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change function name from shutdownNoncompliantTasks to rejectNoncompliantTasks?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

t.Status.Message = "assigned node no longer meets constraints"
t.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
return store.UpdateTask(tx, t)
})
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,10 @@ func TestConstraintEnforcer(t *testing.T) {

go constraintEnforcer.Run()

// id0 should be killed immediately
shutdown1 := testutils.WatchShutdownTask(t, watch)
// id0 should be rejected immediately
shutdown1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, "id0", shutdown1.ID)
assert.Equal(t, api.TaskStateRejected, shutdown1.Status.State)

// Change node id1 to a manager
err = s.Update(func(tx store.Tx) error {
Expand All @@ -147,8 +148,9 @@ func TestConstraintEnforcer(t *testing.T) {
})
assert.NoError(t, err)

shutdown2 := testutils.WatchShutdownTask(t, watch)
shutdown2 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, "id2", shutdown2.ID)
assert.Equal(t, api.TaskStateRejected, shutdown2.Status.State)

// Change resources on node id2
err = s.Update(func(tx store.Tx) error {
Expand All @@ -162,6 +164,7 @@ func TestConstraintEnforcer(t *testing.T) {
})
assert.NoError(t, err)

shutdown3 := testutils.WatchShutdownTask(t, watch)
shutdown3 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, "id4", shutdown3.ID)
assert.Equal(t, api.TaskStateRejected, shutdown3.Status.State)
}
12 changes: 12 additions & 0 deletions manager/orchestrator/global/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,10 +513,22 @@ func (g *Orchestrator) tickTasks(ctx context.Context) {
if t == nil || t.DesiredState > api.TaskStateRunning {
return nil
}

service := store.GetService(tx, t.ServiceID)
if service == nil {
return nil
}

node, nodeExists := g.nodes[t.NodeID]
serviceEntry, serviceExists := g.globalServices[t.ServiceID]
if !nodeExists || !serviceExists {
return nil
}
if !constraint.NodeMatches(serviceEntry.constraints, node) {
t.DesiredState = api.TaskStateShutdown
return store.UpdateTask(tx, t)
}

return g.restarts.Restart(ctx, tx, g.cluster, service, *t)
})
if err != nil {
Expand Down