Skip to content

Commit

Permalink
Merge pull request #48
Browse files Browse the repository at this point in the history
Uses queues for notifications and adds node notifications
  • Loading branch information
thomasjpfan authored Mar 12, 2018
1 parent 6988292 commit 07bcba7
Show file tree
Hide file tree
Showing 46 changed files with 4,141 additions and 2,060 deletions.
7 changes: 3 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.9-alpine3.6 AS build
FROM golang:1.10.0-alpine3.7 AS build

RUN apk add --update git
ADD . /src
Expand All @@ -8,12 +8,11 @@ RUN go build -v -o docker-flow-swarm-listener



FROM alpine:3.6
MAINTAINER Viktor Farcic <[email protected]>
FROM alpine:3.7
LABEL maintainer="Viktor Farcic <[email protected]>"

ENV DF_DOCKER_HOST="unix:///var/run/docker.sock" \
DF_NOTIFICATION_URL="" \
DF_INTERVAL="5" \
DF_RETRY="50" \
DF_RETRY_INTERVAL="5" \
DF_NOTIFY_LABEL="com.df.notify" \
Expand Down
14 changes: 4 additions & 10 deletions Dockerfile.test
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
FROM golang:1.9
FROM docker:17.12.1-ce

MAINTAINER Viktor Farcic <[email protected]>

RUN apt-get update && \
apt-get install -y apt-transport-https ca-certificates curl software-properties-common expect && \
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add - && \
add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/debian $(lsb_release -cs) stable" && \
apt-get update && \
apt-get -y install docker-ce
RUN apk add --no-cache gcc musl-dev openssl git go expect curl && \
go install cmd/...

COPY . /src
WORKDIR /src
RUN go get -d -v -t
RUN chmod +x /src/run-tests.sh

CMD ["sh", "-c", "/src/run-tests.sh"]
CMD ["sh", "-c", "/src/run-tests.sh"]
1 change: 0 additions & 1 deletion args.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ type args struct {

func getArgs() *args {
return &args{
Interval: getValue(5, "DF_INTERVAL"),
Retry: getValue(1, "DF_RETRY"),
RetryInterval: getValue(0, "DF_RETRY_INTERVAL"),
}
Expand Down
15 changes: 2 additions & 13 deletions args_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package main

import (
"github.com/stretchr/testify/suite"
"math/rand"
"os"
"strconv"
"testing"

"github.com/stretchr/testify/suite"
)

type ArgsTestSuite struct {
Expand All @@ -24,22 +25,10 @@ func TestArgsUnitTestSuite(t *testing.T) {
func (s *ArgsTestSuite) Test_GetArgs_ReturnsDefaultValues() {
args := getArgs()

s.Equal(5, args.Interval)
s.Equal(1, args.Retry)
s.Equal(0, args.RetryInterval)
}

func (s *ArgsTestSuite) Test_GetArgs_ReturnsIntervalFromEnv() {
expected := rand.Int()
intervalOrig := os.Getenv("DF_INTERVAL")
defer func() { os.Setenv("DF_INTERVAL", intervalOrig) }()
os.Setenv("DF_INTERVAL", strconv.Itoa(expected))

args := getArgs()

s.Equal(expected, args.Interval)
}

func (s *ArgsTestSuite) Test_GetArgs_ReturnsRetryFromEnv() {
expected := rand.Int()
intervalOrig := os.Getenv("DF_RETRY")
Expand Down
14 changes: 14 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,23 @@ services:
image: vfarcic/docker-flow-swarm-listener-test
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- $PWD:/src
networks:
- dfsl_network

docs:
image: cilerler/mkdocs
volumes:
- .:/docs
command: bash -c "pip install pygments && pip install pymdown-extensions && mkdocs build"

tests_local:
image: vfarcic/docker-flow-swarm-listener-test
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- $PWD:/src
networks:
- dfsl_network

networks:
dfsl_network:
9 changes: 5 additions & 4 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ The following environment variables can be used when creating the `swarm-listene
|Name |Description |
|-------------------|-------------------------------------------------------------------------------|
|DF_DOCKER_HOST |Path to the Docker socket<br>**Default**: `unix:///var/run/docker.sock` |
|DF_NOTIFY_CREATE_SERVICE_URL|Comma separated list of URLs that will be used to send notification requests when a service is created. If `com.df.notifyService` service labels is present, only URLs related to that service will be used. The `com.df.notifyService` label can have multiple values separated with comma (`,`).<br>**Example**: `url1,url2`|
|DF_NOTIFY_LABEL |Label that is used to distinguish whether a service should trigger a notification<br>**Default**: `com.df.notify`<br>**Example**: `com.df.notifyDev`|
|DF_NOTIFY_CREATE_SERVICE_URL|Comma separated list of URLs that will be used to send notification requests when a service is created. If `com.df.notifyService` service labels is present, only URLs related to that service will be used. The `com.df.notifyService` label can have multiple values separated with comma (`,`).<br>**Example**: `url1,url2`|
|DF_NOTIFY_REMOVE_SERVICE_URL|Comma separated list of URLs that will be used to send notification requests when a service is removed.<br>**Example**: `url1,url2`|
|DF_INTERVAL |Interval (in seconds) between service discovery requests<br>**Default**: `5`<br>**Example**: `10`|
|DF_RETRY |Number of notification request retries<br>**Default**: `50`<br>**Example**: `100`|
|DF_RETRY_INTERVAL |Interval (in seconds) between notification request retries<br>**Default**: `5`<br>**Example**: `10`|
|DF_INCLUDE_NODE_IP_INFO|Include node and ip information for service in notification.<br>**Default**:`false`|
|DF_NOTIFY_CREATE_NODE_URL |Comma separated list of URLs that will be used to send notification requests when a node is created or updated.<br>**Example**: `url1,url2`|
|DF_NOTIFY_REMOVE_NODE_URL |Comma separated list of URLs that will be used to send notification requests when a node is remove.<br>**Example**: `url1,url2`|
|DF_RETRY |Number of notification request retries<br>**Default**: `50`<br>**Example**: `100`|
|DF_RETRY_INTERVAL |Time between each notificationo request retry, in seconds.<br>**Default**: `5`<br>**Example**:`10`|
49 changes: 49 additions & 0 deletions docs/usage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Usage

## Notification Format

*Docker Flow Swarm Listener*, sends GET notifcations to configured URLs when a service or node is created, updated or removed. Please consult the [configuration](config.md) page on how to configure the URLs.

### Service Notification

When a service is created or updated a notification will be sent to **[DF_NOTIFY_CREATE_SERVICE_URL]** with the following parameters:

| Query | Description | Example |
|-------------|------------------------------------------------------------------------|---------|
| serviceName | Name of service. If `com.df.shortName` is true, and the service is part of a stack the stack name will be trimed off. | `go-demo` |
| replicas | Number of replicas of service. If the service is global, this parameter will be excluded.| `3` |
| nodeInfo | An array of node with its ip on an overlay network. The network is defined with the label: `com.df.scrapeNetwork`. This parameter is included when environment variable, `DF_INCLUDE_NODE_IP_INFO`, is true. | `[["node-3","10.0.0.23"], ["node-2", "10.0.0.22"]]` |

All service labels prefixed by `com.df.` will be added to the notification. For example, a service with label `com.df.hello=world` will translate to parameter: `hello=world`.

When a service is removed, a notification will be sent to **[DF_NOTIFY_REMOVE_SERVICE_URL]**. Only the `serviceName` parameter is included.

### Node Notification

When a node is created or updated a notification will be sent to **[DF_NOTIFY_CREATE_NODE_UR]** with the following parameters:

| Query | Description | Example |
|-------|-------------|---------|
| id | The ID of node given by docker | `2pe2xpkrx780xrhujws42a73w` |
| hostname | Hostname of node | `ap1.hostname.com` |
| address | Address of node | `10.0.0.1` |
| versionIndex | The version index of node | `24` |
| state | State of node. [`unknown`, `down`, `ready`, `disconnected`] | `down` |
| role | Role of node. [`worker`, `manager`] | `worker` |
| availability | Availability of node. [`active`, `pause`, `drain` ]| `active` |

All service labels prefixed by `com.df.` will be added to the notification. For example, a node with label `com.df.hello=world` will translate to parameter: `hello=world`.

When a node is removed, a notification will be sent to **[DF_NOTIFY_REMOVE_NODE_URl]**. Only the `id`, `hostname`, and `address` parameters are included.

## API

*Docker Flow Swarm Listener* exposes a API to query series and to send notifications.

### Get Services

The *Get Services* endpoint is used to query all running services with the `DF_NOTIFY_LABEL` label. A `GET` request to **[SWARM_IP]:[SWARM_PORT]/v1/docker-flow-swarm-listener/get-services** returns a json representation of these services.

### Notify Services

*DFSL* normally sends out notifcations when a service is created, updated, or removed. The *Notify Services* endpoint will force *DFSL* to send out notifications for all running services with the `DF_NOTIFY_LABEL` label. A `GET` request to **[SWARM_IP]:[SWARM_PORT]/v1/docker-flow-swarm-listener/notify-services** sends out the notifications.
77 changes: 15 additions & 62 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,76 +1,29 @@
package main

import (
"./metrics"
"log"
"os"

"./service"
)

func main() {
logPrintf("Starting Docker Flow: Swarm Listener")
s := service.NewServiceFromEnv()
n := service.NewNotificationFromEnv()
el := service.NewEventListenerFromEnv()
serve := NewServe(s, n)
go serve.Run()
l := log.New(os.Stdout, "", log.LstdFlags)

l.Printf("Starting Docker Flow: Swarm Listener")
args := getArgs()
if len(n.CreateServiceAddr) == 0 {
return
}

logPrintf("Sending notifications for running services")
allServices, err := s.GetServices()
swarmListener, err := service.NewSwarmListenerFromEnv(args.Retry, args.RetryInterval, l)
if err != nil {
metrics.RecordError("GetServices")
}

newServices, err := s.GetNewServices(allServices)
if err != nil {
metrics.RecordError("GetNewServices")
}
err = n.ServicesCreate(
newServices,
args.Retry,
args.RetryInterval,
)
if err != nil {
metrics.RecordError("ServicesCreate")
l.Printf("Failed to initialize Docker Flow: Swarm Listener")
l.Printf("ERROR: %v", err)
return
}

logPrintf("Start listening to docker service events")
events, errs := el.ListenForEvents()
for {
select {
case event := <-events:
if event.Action == "create" || event.Action == "update" {
eventServices, err := s.GetServicesFromID(event.ServiceID)
if err != nil {
metrics.RecordError("GetServicesFromID")
}
newServices, err := s.GetNewServices(eventServices)
if err != nil {
metrics.RecordError("GetNewServices")
}
err = n.ServicesCreate(
newServices,
args.Retry,
args.RetryInterval,
)
if err != nil {
metrics.RecordError("ServicesCreate")
}
l.Printf("Sending notifications for running services and nodes")
swarmListener.NotifyServices(true)
swarmListener.NotifyNodes(true)

} else if event.Action == "remove" {
err = n.ServicesRemove(&[]string{event.ServiceID}, args.Retry, args.RetryInterval)
metrics.RecordService(len(service.CachedServices))
if err != nil {
metrics.RecordError("ServicesRemove")
}
}
case <-errs:
metrics.RecordError("ListenForEvents")
// Restart listening for events
events, errs = el.ListenForEvents()
}
}
swarmListener.Run()
serve := NewServe(swarmListener, l)
l.Fatal(serve.Run())
}
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pages:
- Home: index.md
- Tutorial: tutorial.md
- Configuration: config.md
- Usage: usage.md
- About:
- Release Notes: release-notes.md
- Feedback and Contribution: feedback-and-contribution.md
Expand Down
2 changes: 1 addition & 1 deletion run-tests.sh
Original file line number Diff line number Diff line change
@@ -1 +1 @@
go test --cover ./... --run UnitTest
go test --cover ./... -p 1
36 changes: 20 additions & 16 deletions serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"encoding/json"
"log"
"net/http"

"./metrics"
Expand All @@ -14,22 +15,22 @@ var httpWriterSetContentType = func(w http.ResponseWriter, value string) {
w.Header().Set("Content-Type", value)
}

// Serve is the instance structure
type Serve struct {
Service service.Servicer
Notification service.Sender
}

//Response message
type Response struct {
Status string
}

// Serve is the instance structure
type Serve struct {
SwarmListener service.SwarmListening
Log *log.Logger
}

// NewServe returns a new instance of the `Serve`
func NewServe(service service.Servicer, notification service.Sender) *Serve {
func NewServe(swarmListener service.SwarmListening, logger *log.Logger) *Serve {
return &Serve{
Service: service,
Notification: notification,
SwarmListener: swarmListener,
Log: logger,
}
}

Expand All @@ -45,8 +46,7 @@ func (m *Serve) Run() error {

// NotifyServices notifies all configured endpoints of new, updated, or removed services
func (m *Serve) NotifyServices(w http.ResponseWriter, req *http.Request) {
services, _ := m.Service.GetServices()
go m.Notification.ServicesCreate(services, 10, 5)
m.SwarmListener.NotifyServices(false)
js, _ := json.Marshal(Response{Status: "OK"})
httpWriterSetContentType(w, "application/json")
w.WriteHeader(http.StatusOK)
Expand All @@ -55,11 +55,15 @@ func (m *Serve) NotifyServices(w http.ResponseWriter, req *http.Request) {

// GetServices retrieves all services with the `com.df.notify` label set to `true`
func (m *Serve) GetServices(w http.ResponseWriter, req *http.Request) {
services, _ := m.Service.GetServices()
parameters := m.Service.GetServicesParameters(services)
bytes, error := json.Marshal(parameters)
if error != nil {
logPrintf("ERROR: Unable to prepare response: %s", error)
parameters, err := m.SwarmListener.GetServicesParameters(req.Context())
if err != nil {
m.Log.Printf("ERROR: Unable to prepare response: %s", err)
metrics.RecordError("serveGetServices")
w.WriteHeader(http.StatusInternalServerError)
}
bytes, err := json.Marshal(parameters)
if err != nil {
m.Log.Printf("ERROR: Unable to prepare response: %s", err)
metrics.RecordError("serveGetServices")
w.WriteHeader(http.StatusInternalServerError)
} else {
Expand Down
Loading

0 comments on commit 07bcba7

Please sign in to comment.