Skip to content

Commit

Permalink
[release]Support for Node Events
Browse files Browse the repository at this point in the history
DFSL now supports sending node events to urls configured with environmental variables: `DF_NOTIFY_CREATE_NODE_URL` and `DF_NOTIFY_REMOVE_NODE_URL`. Notifications sent by DFSL are detailed on the [usage](http://swarmlistener.dockerflow.com/usage/) documentation page.
  • Loading branch information
thomasjpfan authored Mar 13, 2018
1 parent 07bcba7 commit 1649857
Show file tree
Hide file tree
Showing 12 changed files with 678 additions and 61 deletions.
2 changes: 1 addition & 1 deletion service/eventnodelistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s NodeListener) ListenForNodeEvents(

}

// validEventNode returns false when event is valid (should be passed through)
// validEventNode returns true when event is valid (should be passed through)
// this will still allow through 4-5 events from changing a worker node
// to a manager node or vise versa.
func (s NodeListener) validEventNode(msg events.Message) bool {
Expand Down
15 changes: 15 additions & 0 deletions service/eventservicelistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"../metrics"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
)
Expand Down Expand Up @@ -38,6 +39,9 @@ func (s SwarmServiceListener) ListenForServiceEvents(eventChan chan<- Event) {
for {
select {
case msg := <-msgStream:
if !s.validEventNode(msg) {
continue
}
eventType := EventTypeCreate
if msg.Action == "remove" {
eventType = EventTypeRemove
Expand All @@ -57,3 +61,14 @@ func (s SwarmServiceListener) ListenForServiceEvents(eventChan chan<- Event) {
}
}()
}

// validEventNode returns true when event is valid (should be passed through)
func (s SwarmServiceListener) validEventNode(msg events.Message) bool {
if msg.Action != "update" {
return true
}
if name, ok := msg.Actor.Attributes["updatestate.new"]; ok && len(name) > 0 {
return false
}
return true
}
8 changes: 4 additions & 4 deletions service/minify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ func (s *MinifyUnitTestSuite) Test_MinifySwarmService_Global() {
},
Global: true,
Replicas: uint64(0),
NodeInfo: &nodeSet,
NodeInfo: nodeSet,
}

ss := SwarmService{service, &nodeSet}
ss := SwarmService{service, nodeSet}
ssMini := MinifySwarmService(ss, "com.df.notify", "com.docker.stack.namespace")

s.Equal(expectMini, ssMini)
Expand Down Expand Up @@ -159,10 +159,10 @@ func (s *MinifyUnitTestSuite) Test_MinifySwarmService_Replicas() {
},
Global: false,
Replicas: uint64(3),
NodeInfo: &nodeSet,
NodeInfo: nodeSet,
}

ss := SwarmService{service, &nodeSet}
ss := SwarmService{service, nodeSet}
ssMini := MinifySwarmService(ss, "com.df.notify", "com.docker.stack.namespace")

s.Equal(expectMini, ssMini)
Expand Down
46 changes: 26 additions & 20 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package service

import (
"context"
"fmt"
"log"
"strings"

"github.com/docker/docker/api/types"
Expand All @@ -22,15 +24,18 @@ type SwarmServiceClient struct {
FilterLabel string
FilterKey string
ScrapeNetLabel string
Log *log.Logger
}

// NewSwarmServiceClient creates a `SwarmServiceClient`
func NewSwarmServiceClient(c *client.Client, filterLabel, scrapNetLabel string) *SwarmServiceClient {
func NewSwarmServiceClient(c *client.Client, filterLabel, scrapNetLabel string, logger *log.Logger) *SwarmServiceClient {
key := strings.SplitN(filterLabel, "=", 2)[0]
return &SwarmServiceClient{DockerClient: c,
FilterLabel: filterLabel,
FilterKey: key,
ScrapeNetLabel: scrapNetLabel}
ScrapeNetLabel: scrapNetLabel,
Log: logger,
}
}

// SwarmServiceInspect returns `SwarmService` from its ID
Expand All @@ -49,7 +54,12 @@ func (c SwarmServiceClient) SwarmServiceInspect(serviceID string, includeNodeIPI

ss := SwarmService{service, nil}
if includeNodeIPInfo {
ss.NodeInfo = c.getNodeInfo(service)
nodeInfo, err := c.getNodeInfo(context.Background(), service)
if err != nil {
c.Log.Printf("%v", err)
} else {
ss.NodeInfo = nodeInfo
}
}
return &ss, nil
}
Expand All @@ -67,23 +77,28 @@ func (c SwarmServiceClient) SwarmServiceList(ctx context.Context, includeNodeIPI
for _, s := range services {
ss := SwarmService{s, nil}
if includeNodeIPInfo {
ss.NodeInfo = c.getNodeInfo(s)
nodeInfo, _ := c.getNodeInfo(ctx, s)
if err != nil {
c.Log.Printf("%v", err)
} else {
ss.NodeInfo = nodeInfo
}
}
swarmServices = append(swarmServices, ss)
}
return swarmServices, nil
}

func (c SwarmServiceClient) getNodeInfo(ss swarm.Service) *NodeIPSet {
func (c SwarmServiceClient) getNodeInfo(ctx context.Context, ss swarm.Service) (NodeIPSet, error) {

networkName, ok := ss.Spec.Labels[c.ScrapeNetLabel]
if !ok {
return nil
return nil, fmt.Errorf("NodeInfo %s label is not defined for service %s", c.ScrapeNetLabel, ss.Spec.Name)
}

taskList, err := c.getTaskList(ss.ID)
taskList, err := GetTaskList(ctx, c.DockerClient, ss.ID)
if err != nil {
return nil
return nil, err
}

nodeInfo := NodeIPSet{}
Expand All @@ -106,7 +121,7 @@ func (c SwarmServiceClient) getNodeInfo(ss swarm.Service) *NodeIPSet {
if nodeName, ok := nodeIPCache[task.NodeID]; ok {
nodeInfo.Add(nodeName, address)
} else {
node, _, err := c.DockerClient.NodeInspectWithRaw(context.Background(), task.NodeID)
node, _, err := c.DockerClient.NodeInspectWithRaw(ctx, task.NodeID)
if err != nil {
continue
}
Expand All @@ -116,16 +131,7 @@ func (c SwarmServiceClient) getNodeInfo(ss swarm.Service) *NodeIPSet {
}

if nodeInfo.Cardinality() == 0 {
return nil
return nil, nil
}
return &nodeInfo
}

func (c SwarmServiceClient) getTaskList(serviceID string) ([]swarm.Task, error) {

filter := filters.NewArgs()
filter.Add("desired-state", "running")
filter.Add("service", serviceID)
return c.DockerClient.TaskList(
context.Background(), types.TaskListOptions{Filters: filter})
return nodeInfo, nil
}
24 changes: 16 additions & 8 deletions service/service_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package service

import (
"bytes"
"context"
"log"
"testing"
"time"

Expand All @@ -10,11 +12,13 @@ import (

type SwarmServiceClientTestSuite struct {
suite.Suite
SClient *SwarmServiceClient
Util1ID string
Util2ID string
Util3ID string
Util4ID string
SClient *SwarmServiceClient
Util1ID string
Util2ID string
Util3ID string
Util4ID string
Logger *log.Logger
LogBytes *bytes.Buffer
}

func TestSwarmServiceClientTestSuite(t *testing.T) {
Expand All @@ -24,7 +28,11 @@ func TestSwarmServiceClientTestSuite(t *testing.T) {
func (s *SwarmServiceClientTestSuite) SetupSuite() {
c, err := NewDockerClientFromEnv()
s.Require().NoError(err)
s.SClient = NewSwarmServiceClient(c, "com.df.notify=true", "com.df.scrapeNetwork")

s.LogBytes = new(bytes.Buffer)
s.Logger = log.New(s.LogBytes, "", 0)

s.SClient = NewSwarmServiceClient(c, "com.df.notify=true", "com.df.scrapeNetwork", s.Logger)

createTestOverlayNetwork("util-network")
createTestService("util-1", []string{"com.df.notify=true", "com.df.scrapeNetwork=util-network"}, false, "", "util-network")
Expand Down Expand Up @@ -84,7 +92,7 @@ func (s *SwarmServiceClientTestSuite) Test_SwarmServiceInspect_NodeInfo_OneRepli
s.Equal(s.Util1ID, util1Service.ID)
s.Require().NotNil(util1Service.NodeInfo)

nodeInfo := *util1Service.NodeInfo
nodeInfo := util1Service.NodeInfo
s.Require().Len(nodeInfo, 1)
}

Expand All @@ -97,7 +105,7 @@ func (s *SwarmServiceClientTestSuite) Test_SwarmServiceInspect_NodeInfo_TwoRepli
s.Equal(s.Util4ID, util4Service.ID)
s.Require().NotNil(util4Service.NodeInfo)

nodeInfo := *util4Service.NodeInfo
nodeInfo := util4Service.NodeInfo
s.Require().Len(nodeInfo, 2)
}

Expand Down
2 changes: 1 addition & 1 deletion service/servicecache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *SwarmServiceCacheTestSuite) Test_InsertAndCheck_NewNodeInfo_ReturnsTrue
newSSMini := getNewSwarmServiceMini()
nodeSet := NodeIPSet{}
nodeSet.Add("node-3", "1.0.2.1")
newSSMini.NodeInfo = &nodeSet
newSSMini.NodeInfo = nodeSet

isUpdated = s.Cache.InsertAndCheck(newSSMini)
s.True(isUpdated)
Expand Down
2 changes: 1 addition & 1 deletion service/swarmlistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewSwarmListenerFromEnv(retries, interval int, logger *log.Logger) (*SwarmL
return nil, err
}
ssListener := NewSwarmServiceListener(dockerClient, logger)
ssClient := NewSwarmServiceClient(dockerClient, ignoreKey, "com.df.scrapeNetwork")
ssClient := NewSwarmServiceClient(dockerClient, ignoreKey, "com.df.scrapeNetwork", logger)
ssCache := NewSwarmServiceCache()

nodeListener := NewNodeListener(dockerClient, logger)
Expand Down
Loading

0 comments on commit 1649857

Please sign in to comment.