Skip to content

Commit

Permalink
Cherry-pick moby/swarmkit#1651
Browse files Browse the repository at this point in the history
To identify the issue in allocator.

Signed-off-by: Madhu Venugopal <[email protected]>
  • Loading branch information
mavenugo committed Oct 19, 2016
1 parent eeea167 commit 699f3ca
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 32 deletions.
2 changes: 1 addition & 1 deletion hack/vendor.sh
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ clone git github.com/docker/docker-credential-helpers v0.3.0
clone git github.com/docker/containerd 837e8c5e1cad013ed57f5c2090c8591c10cbbdae

# cluster
clone git github.com/docker/swarmkit 7e63bdefb94e5bea2641e8bdebae2cfa61a0ed44
clone git github.com/docker/swarmkit 5fc74cad8d204ef9818d3ef3b78d8ee962f2f852 https://github.com/mavenugo/swarmkit
clone git github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
clone git github.com/gogo/protobuf v0.3
clone git github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
Expand Down
1 change: 0 additions & 1 deletion vendor/src/github.com/docker/swarmkit/agent/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,6 @@ func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{})
// Using listen address instead of advertised address because this is a
// local connection.
addr := n.config.ListenControlAPI
// opts = append(opts, grpc.WithInsecure())
conn, err := grpc.Dial(addr, opts...)
if err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func (a *Allocator) Run(ctx context.Context) error {
aaCopy := aa
actor := func() error {
wg.Add(1)
defer wg.Done()

// init might return an allocator specific context
// which is a child of the passed in context to hold
// allocator specific state
Expand All @@ -133,10 +135,10 @@ func (a *Allocator) Run(ctx context.Context) error {
// if we are failing in the init of
// this allocator.
aa.cancel()
wg.Done()
return err
}

wg.Add(1)
go func() {
defer wg.Done()
a.run(ctx, aaCopy)
Expand Down
77 changes: 49 additions & 28 deletions vendor/src/github.com/docker/swarmkit/manager/allocator/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type networkContext struct {
unallocatedNetworks map[string]*api.Network
}

func (a *Allocator) doNetworkInit(ctx context.Context) error {
func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
na, err := networkallocator.New()
if err != nil {
return err
Expand All @@ -81,6 +81,13 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
unallocatedNetworks: make(map[string]*api.Network),
ingressNetwork: newIngressNetwork(),
}
a.netCtx = nc
defer func() {
// Clear a.netCtx if initialization was unsuccessful.
if err != nil {
a.netCtx = nil
}
}()

// Check if we have the ingress network. If not found create
// it before reading all network objects for allocation.
Expand Down Expand Up @@ -125,7 +132,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
// that the we can get the preferred subnet for ingress
// network.
if !na.IsAllocated(nc.ingressNetwork) {
if err := a.allocateNetwork(ctx, nc, nc.ingressNetwork); err != nil {
if err := a.allocateNetwork(ctx, nc.ingressNetwork); err != nil {
log.G(ctx).WithError(err).Error("failed allocating ingress network during init")
}

Expand Down Expand Up @@ -155,7 +162,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
continue
}

if err := a.allocateNetwork(ctx, nc, n); err != nil {
if err := a.allocateNetwork(ctx, n); err != nil {
log.G(ctx).WithError(err).Errorf("failed allocating network %s during init", n.ID)
}
}
Expand All @@ -179,7 +186,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
}

node.Attachment.Network = nc.ingressNetwork.Copy()
if err := a.allocateNode(ctx, nc, node); err != nil {
if err := a.allocateNode(ctx, node); err != nil {
log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s during init", node.ID)
}
}
Expand All @@ -198,7 +205,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
continue
}

if err := a.allocateService(ctx, nc, s); err != nil {
if err := a.allocateService(ctx, s); err != nil {
log.G(ctx).WithError(err).Errorf("failed allocating service %s during init", s.ID)
}
}
Expand Down Expand Up @@ -260,7 +267,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
}

err := batch.Update(func(tx store.Tx) error {
_, err := a.allocateTask(ctx, nc, tx, t)
_, err := a.allocateTask(ctx, tx, t)
return err
})
if err != nil {
Expand All @@ -274,7 +281,6 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
return err
}

a.netCtx = nc
return nil
}

Expand All @@ -288,7 +294,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
break
}

if err := a.allocateNetwork(ctx, nc, n); err != nil {
if err := a.allocateNetwork(ctx, n); err != nil {
log.G(ctx).WithError(err).Errorf("Failed allocation for network %s", n.ID)
break
}
Expand All @@ -309,7 +315,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
break
}

if err := a.allocateService(ctx, nc, s); err != nil {
if err := a.allocateService(ctx, s); err != nil {
log.G(ctx).WithError(err).Errorf("Failed allocation for service %s", s.ID)
break
}
Expand All @@ -320,7 +326,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
break
}

if err := a.allocateService(ctx, nc, s); err != nil {
if err := a.allocateService(ctx, s); err != nil {
log.G(ctx).WithError(err).Errorf("Failed allocation during update of service %s", s.ID)
break
}
Expand All @@ -335,18 +341,18 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
// it's still there.
delete(nc.unallocatedServices, s.ID)
case state.EventCreateNode, state.EventUpdateNode, state.EventDeleteNode:
a.doNodeAlloc(ctx, nc, ev)
a.doNodeAlloc(ctx, ev)
case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask:
a.doTaskAlloc(ctx, nc, ev)
a.doTaskAlloc(ctx, ev)
case state.EventCommit:
a.procUnallocatedNetworks(ctx, nc)
a.procUnallocatedServices(ctx, nc)
a.procUnallocatedTasksNetwork(ctx, nc)
a.procUnallocatedNetworks(ctx)
a.procUnallocatedServices(ctx)
a.procUnallocatedTasksNetwork(ctx)
return
}
}

func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev events.Event) {
func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
var (
isDelete bool
node *api.Node
Expand All @@ -362,6 +368,8 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev even
node = v.Node.Copy()
}

nc := a.netCtx

if isDelete {
if nc.nwkAllocator.IsNodeAllocated(node) {
if err := nc.nwkAllocator.DeallocateNode(node); err != nil {
Expand All @@ -377,7 +385,7 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev even
}

node.Attachment.Network = nc.ingressNetwork.Copy()
if err := a.allocateNode(ctx, nc, node); err != nil {
if err := a.allocateNode(ctx, node); err != nil {
log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID)
}
}
Expand Down Expand Up @@ -464,7 +472,7 @@ func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) {
taskUpdateNetworks(t, networks)
}

func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev events.Event) {
func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
var (
isDelete bool
t *api.Task
Expand All @@ -480,6 +488,8 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even
t = v.Task.Copy()
}

nc := a.netCtx

// If the task has stopped running or it's being deleted then
// we should free the network resources associated with the
// task right away.
Expand Down Expand Up @@ -530,7 +540,9 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even
nc.unallocatedTasks[t.ID] = t
}

func (a *Allocator) allocateNode(ctx context.Context, nc *networkContext, node *api.Node) error {
func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
nc := a.netCtx

if err := nc.nwkAllocator.AllocateNode(node); err != nil {
return err
}
Expand Down Expand Up @@ -563,7 +575,9 @@ func (a *Allocator) allocateNode(ctx context.Context, nc *networkContext, node *
return nil
}

func (a *Allocator) allocateService(ctx context.Context, nc *networkContext, s *api.Service) error {
func (a *Allocator) allocateService(ctx context.Context, s *api.Service) error {
nc := a.netCtx

if s.Spec.Endpoint != nil {
// service has user-defined endpoint
if s.Endpoint == nil {
Expand Down Expand Up @@ -648,7 +662,9 @@ func (a *Allocator) allocateService(ctx context.Context, nc *networkContext, s *
return nil
}

func (a *Allocator) allocateNetwork(ctx context.Context, nc *networkContext, n *api.Network) error {
func (a *Allocator) allocateNetwork(ctx context.Context, n *api.Network) error {
nc := a.netCtx

if err := nc.nwkAllocator.Allocate(n); err != nil {
nc.unallocatedNetworks[n.ID] = n
return errors.Wrapf(err, "failed during network allocation for network %s", n.ID)
Expand All @@ -670,7 +686,7 @@ func (a *Allocator) allocateNetwork(ctx context.Context, nc *networkContext, n *
return nil
}

func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx store.Tx, t *api.Task) (*api.Task, error) {
func (a *Allocator) allocateTask(ctx context.Context, tx store.Tx, t *api.Task) (*api.Task, error) {
taskUpdated := false

// Get the latest task state from the store before updating.
Expand All @@ -679,6 +695,8 @@ func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx sto
return nil, fmt.Errorf("could not find task %s while trying to update network allocation", t.ID)
}

nc := a.netCtx

// We might be here even if a task allocation has already
// happened but wasn't successfully committed to store. In such
// cases skip allocation and go straight ahead to updating the
Expand Down Expand Up @@ -738,10 +756,11 @@ func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx sto
return storeT, nil
}

func (a *Allocator) procUnallocatedNetworks(ctx context.Context, nc *networkContext) {
func (a *Allocator) procUnallocatedNetworks(ctx context.Context) {
nc := a.netCtx
for _, n := range nc.unallocatedNetworks {
if !nc.nwkAllocator.IsAllocated(n) {
if err := a.allocateNetwork(ctx, nc, n); err != nil {
if err := a.allocateNetwork(ctx, n); err != nil {
log.G(ctx).Debugf("Failed allocation of unallocated network %s: %v", n.ID, err)
continue
}
Expand All @@ -751,10 +770,11 @@ func (a *Allocator) procUnallocatedNetworks(ctx context.Context, nc *networkCont
}
}

func (a *Allocator) procUnallocatedServices(ctx context.Context, nc *networkContext) {
func (a *Allocator) procUnallocatedServices(ctx context.Context) {
nc := a.netCtx
for _, s := range nc.unallocatedServices {
if !nc.nwkAllocator.IsServiceAllocated(s) {
if err := a.allocateService(ctx, nc, s); err != nil {
if err := a.allocateService(ctx, s); err != nil {
log.G(ctx).Debugf("Failed allocation of unallocated service %s: %v", s.ID, err)
continue
}
Expand All @@ -764,15 +784,16 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context, nc *networkCont
}
}

func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context, nc *networkContext) {
func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
nc := a.netCtx
tasks := make([]*api.Task, 0, len(nc.unallocatedTasks))

committed, err := a.store.Batch(func(batch *store.Batch) error {
for _, t := range nc.unallocatedTasks {
var allocatedT *api.Task
err := batch.Update(func(tx store.Tx) error {
var err error
allocatedT, err = a.allocateTask(ctx, nc, tx, t)
allocatedT, err = a.allocateTask(ctx, tx, t)
return err
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ func (na *NetworkAllocator) allocatePools(n *api.Network) (map[string]string, er
ic.Subnet = poolIP.String()
}

if ic.Gateway == "" {
if ic.Gateway == "" && gwIP != nil {
ic.Gateway = gwIP.IP.String()
}

Expand Down

0 comments on commit 699f3ca

Please sign in to comment.