From 9b7e720a59eea1328ee6fff3d7f6d61d78ac103a Mon Sep 17 00:00:00 2001 From: Callum Jones Date: Sun, 18 Dec 2016 19:10:02 -0800 Subject: [PATCH 1/6] WIP: Statsd --- backends.go | 6 + cloudwatch.go | 6 +- lambda.go | 13 +- main.go | 24 +- statsd.go | 46 ++ .../DataDog/datadog-go/CHANGELOG.md | 34 ++ .../github.com/DataDog/datadog-go/LICENSE.txt | 19 + .../github.com/DataDog/datadog-go/README.md | 32 + .../DataDog/datadog-go/statsd/README.md | 52 ++ .../DataDog/datadog-go/statsd/statsd.go | 577 ++++++++++++++++++ vendor/vendor.json | 14 +- 11 files changed, 819 insertions(+), 4 deletions(-) create mode 100644 backends.go create mode 100644 statsd.go create mode 100644 vendor/github.com/DataDog/datadog-go/CHANGELOG.md create mode 100644 vendor/github.com/DataDog/datadog-go/LICENSE.txt create mode 100644 vendor/github.com/DataDog/datadog-go/README.md create mode 100644 vendor/github.com/DataDog/datadog-go/statsd/README.md create mode 100644 vendor/github.com/DataDog/datadog-go/statsd/statsd.go diff --git a/backends.go b/backends.go new file mode 100644 index 00000000..0ba89c21 --- /dev/null +++ b/backends.go @@ -0,0 +1,6 @@ +package main + +// Backend is a receiver of metrics +type Backend interface { + Collect(r *result) error +} diff --git a/cloudwatch.go b/cloudwatch.go index a79b8933..cea8bc91 100644 --- a/cloudwatch.go +++ b/cloudwatch.go @@ -9,7 +9,11 @@ import ( "github.com/buildkite/buildkite-metrics/collector" ) -func cloudwatchSend(r *collector.Result) error { +// CloudWatchBackend sends metrics to AWS CloudWatch +type CloudWatchBackend struct { +} + +func (cb *CloudWatchBackend) Collect(r *collector.Result) error { svc := cloudwatch.New(session.New()) metrics := []*cloudwatch.MetricDatum{} diff --git a/lambda.go b/lambda.go index 951ab937..f6985468 100644 --- a/lambda.go +++ b/lambda.go @@ -18,6 +18,7 @@ func handle(evt json.RawMessage, ctx *runtime.Context) (interface{}, error) { org := os.Getenv("BUILDKITE_ORG") token := os.Getenv("BUILDKITE_TOKEN") + 
backendOpt := os.Getenv("BUILDKITE_BACKEND") config, err := buildkite.NewTokenConfig(token, false) if err != nil { @@ -32,6 +33,16 @@ func handle(evt json.RawMessage, ctx *runtime.Context) (interface{}, error) { Historical: time.Hour * 24, }) + var backend Backend + if backendOpt == "statsd" { + backend, err = NewStatsdClient(os.Getenv("STATSD_HOST")) + if err != nil { + return nil, err + } + } else { + backend = &CloudWatchBackend{} + } + res, err := col.Collect() if err != nil { return nil, err @@ -39,7 +50,7 @@ func handle(evt json.RawMessage, ctx *runtime.Context) (interface{}, error) { res.Dump() - err = cloudwatchSend(res) + err = backend.Collect(res) if err != nil { return nil, err } diff --git a/main.go b/main.go index b114f901..dc822ada 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "log" "os" + "strings" "time" "github.com/buildkite/buildkite-metrics/collector" @@ -15,6 +16,8 @@ import ( // Version is passed in via ldflags var Version string +var backend Backend + func main() { var ( accessToken = flag.String("token", "", "A Buildkite API Access Token") @@ -26,6 +29,10 @@ func main() { quiet = flag.Bool("quiet", false, "Only print errors") dryRun = flag.Bool("dry-run", false, "Whether to only print metrics") + // backend config + backendOpt = flag.String("backend", "cloudwatch", "Specify the backend to send metrics to. 
cloudwatch, statsd") + statsdHost = flag.String("statsd-host", "127.0.0.1:8125", "Specify the Statsd server") + // filters queue = flag.String("queue", "", "Only include a specific queue") ) @@ -47,6 +54,21 @@ func main() { os.Exit(1) } + lowerBackendOpt := strings.ToLower(*backendOpt) + if lowerBackendOpt == "cloudwatch" { + backend = &CloudWatchBackend{} + } else if lowerBackendOpt == "statsd" { + var err error + backend, err = NewStatsdClient(*statsdHost) + if err != nil { + fmt.Printf("Error starting Statsd, err: %v\n", err) + os.Exit(1) + } + } else { + fmt.Println("Must provide a supported backend: cloudwatch, statsd") + os.Exit(1) + } + if *quiet { log.SetOutput(ioutil.Discard) } @@ -82,7 +104,7 @@ func main() { } if !*dryRun { - err = cloudwatchSend(res) + err = backend.Collect(res) if err != nil { return err } diff --git a/statsd.go b/statsd.go new file mode 100644 index 00000000..050827c8 --- /dev/null +++ b/statsd.go @@ -0,0 +1,46 @@ +package main + +import "github.com/DataDog/datadog-go/statsd" + +// Statsd sends metrics to Statsd (Datadog spec) +type Statsd struct { + client *statsd.Client +} + +func NewStatsdClient(host string) (*Statsd, error) { + c, err := statsd.NewBuffered(host, 100) + if err != nil { + return nil, err + } + // prefix every metric with the app name + c.Namespace = "buildkite." 
+ return &Statsd{ + client: c, + }, nil +} + +func (cb *Statsd) Collect(r *result) error { + for name, value := range r.totals { + if err := cb.client.Gauge(name, float64(value), []string{}, 1.0); err != nil { + return err + } + } + + for queue, counts := range r.queues { + for name, value := range counts { + if err := cb.client.Gauge("queues."+name, float64(value), []string{"queue:" + queue}, 1.0); err != nil { + return err + } + } + } + + for pipeline, counts := range r.pipelines { + for name, value := range counts { + if err := cb.client.Gauge("pipelines."+name, float64(value), []string{"pipeline:" + pipeline}, 1.0); err != nil { + return err + } + } + } + + return nil +} diff --git a/vendor/github.com/DataDog/datadog-go/CHANGELOG.md b/vendor/github.com/DataDog/datadog-go/CHANGELOG.md new file mode 100644 index 00000000..5853c6ed --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/CHANGELOG.md @@ -0,0 +1,34 @@ +Changes +======= + +# 1.0.0 / 2016-08-22 + +### Details +We hadn't been properly versioning this project. We will begin to do so with this +`1.0.0` release. We had some contributions in the past and would like to thank the +contributors [@aviau][], [@sschepens][], [@jovanbrakus][], [@abtris][], [@tummychow][], [@gphat][], [@diasjorge][], +[@victortrac][], [@seiffert][] and [@w-vi][], in no particular order, for their work. + +Below, for reference, the latest improvements made in 07/2016 - 08/2016 + +### Notes + +* [FEATURE] Implemented support for service checks. See [#17][] and [#5][]. (Thanks [@jovanbrakus][] and [@diasjorge][]). +* [FEATURE] Add Incr, Decr, Timing and more docs.. See [#15][]. (Thanks [@gphat][]) +* [BUGFIX] Do not append to shared slice. See [#16][]. 
(Thanks [@tummychow][]) + + +[#5]: https://github.com/DataDog/datadog-go/issues/5 +[#16]: https://github.com/DataDog/datadog-go/issues/16 +[#17]: https://github.com/DataDog/datadog-go/issues/17 +[#15]: https://github.com/DataDog/datadog-go/issues/15 +[@abtris]: https://github.com/abtris +[@aviau]: https://github.com/aviau +[@diasjorge]: https://github.com/diasjorge +[@gphat]: https://github.com/gphat +[@jovanbrakus]: https://github.com/jovanbrakus +[@seiffert]: https://github.com/seiffert +[@sschepens]: https://github.com/sschepens +[@tummychow]: https://github.com/tummychow +[@victortrac]: https://github.com/victortrac +[@w-vi]: https://github.com/w-vi diff --git a/vendor/github.com/DataDog/datadog-go/LICENSE.txt b/vendor/github.com/DataDog/datadog-go/LICENSE.txt new file mode 100644 index 00000000..97cd06d7 --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/LICENSE.txt @@ -0,0 +1,19 @@ +Copyright (c) 2015 Datadog, Inc + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/vendor/github.com/DataDog/datadog-go/README.md b/vendor/github.com/DataDog/datadog-go/README.md new file mode 100644 index 00000000..dce6271d --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/README.md @@ -0,0 +1,32 @@ +# Overview + +Packages in `datadog-go` provide Go clients for various APIs at [DataDog](http://datadoghq.com). + +## Statsd + +[![Godoc](http://img.shields.io/badge/godoc-reference-blue.svg?style=flat)](https://godoc.org/github.com/DataDog/datadog-go/statsd) +[![license](http://img.shields.io/badge/license-MIT-red.svg?style=flat)](http://opensource.org/licenses/MIT) + +The [statsd](https://github.com/DataDog/datadog-go/tree/master/statsd) package provides a client for +[dogstatsd](http://docs.datadoghq.com/guides/dogstatsd/): + +```go +import "github.com/DataDog/datadog-go/statsd" + +func main() { + c, err := statsd.New("127.0.0.1:8125") + if err != nil { + log.Fatal(err) + } + // prefix every metric with the app name + c.Namespace = "flubber." + // send the EC2 availability zone as a tag with every metric + c.Tags = append(c.Tags, "us-east-1a") + err = c.Gauge("request.duration", 1.2, nil, 1) + // ... +} +``` + +## License + +All code distributed under the [MIT License](http://opensource.org/licenses/MIT) unless otherwise specified. diff --git a/vendor/github.com/DataDog/datadog-go/statsd/README.md b/vendor/github.com/DataDog/datadog-go/statsd/README.md new file mode 100644 index 00000000..2e897776 --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/statsd/README.md @@ -0,0 +1,52 @@ +## Overview + +Package `statsd` provides a Go [dogstatsd](http://docs.datadoghq.com/guides/dogstatsd/) client. Dogstatsd extends Statsd, adding tags +and histograms. + +## Get the code + + $ go get github.com/DataDog/datadog-go/statsd + +## Usage + +```go +// Create the client +c, err := statsd.New("127.0.0.1:8125") +if err != nil { + log.Fatal(err) +} +// Prefix every metric with the app name +c.Namespace = "flubber." 
+// Send the EC2 availability zone as a tag with every metric +c.Tags = append(c.Tags, "us-east-1a") + +// Do some metrics! +err = c.Gauge("request.queue_depth", 12, nil, 1) +err = c.Timing("request.duration", duration, nil, 1) // Uses a time.Duration! +err = c.TimeInMilliseconds("request", 12, nil, 1) +err = c.Incr("request.count_total", nil, 1) +err = c.Decr("request.count_total", nil, 1) +err = c.Count("request.count_total", 2, nil, 1) +``` + +## Buffering Client + +DogStatsD accepts packets with multiple statsd payloads in them. Using the BufferingClient via `NewBufferingClient` will buffer up commands and send them when the buffer is reached or after 100msec. + +## Development + +Run the tests with: + + $ go test + +## Documentation + +Please see: http://godoc.org/github.com/DataDog/datadog-go/statsd + +## License + +go-dogstatsd is released under the [MIT license](http://www.opensource.org/licenses/mit-license.php). + +## Credits + +Original code by [ooyala](https://github.com/ooyala/go-dogstatsd). diff --git a/vendor/github.com/DataDog/datadog-go/statsd/statsd.go b/vendor/github.com/DataDog/datadog-go/statsd/statsd.go new file mode 100644 index 00000000..958aa444 --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/statsd/statsd.go @@ -0,0 +1,577 @@ +// Copyright 2013 Ooyala, Inc. + +/* +Package statsd provides a Go dogstatsd client. Dogstatsd extends the popular statsd, +adding tags and histograms and pushing upstream to Datadog. + +Refer to http://docs.datadoghq.com/guides/dogstatsd/ for information about DogStatsD. + +Example Usage: + + // Create the client + c, err := statsd.New("127.0.0.1:8125") + if err != nil { + log.Fatal(err) + } + // Prefix every metric with the app name + c.Namespace = "flubber." + // Send the EC2 availability zone as a tag with every metric + c.Tags = append(c.Tags, "us-east-1a") + err = c.Gauge("request.duration", 1.2, nil, 1) + +statsd is based on go-statsd-client. 
+*/ +package statsd + +import ( + "bytes" + "errors" + "fmt" + "io" + "math/rand" + "net" + "strconv" + "strings" + "sync" + "time" +) + +/* +OptimalPayloadSize defines the optimal payload size for a UDP datagram, 1432 bytes +is optimal for regular networks with an MTU of 1500 so datagrams don't get +fragmented. It's generally recommended not to fragment UDP datagrams as losing +a single fragment will cause the entire datagram to be lost. + +This can be increased if your network has a greater MTU or you don't mind UDP +datagrams getting fragmented. The practical limit is MaxUDPPayloadSize +*/ +const OptimalPayloadSize = 1432 + +/* +MaxUDPPayloadSize defines the maximum payload size for a UDP datagram. +Its value comes from the calculation: 65535 bytes Max UDP datagram size - +8byte UDP header - 60byte max IP headers +any number greater than that will see frames being cut out. +*/ +const MaxUDPPayloadSize = 65467 + +// A Client is a handle for sending udp messages to dogstatsd. It is safe to +// use one Client from multiple goroutines simultaneously. +type Client struct { + conn net.Conn + // Namespace to prepend to all statsd calls + Namespace string + // Tags are global tags to be added to every statsd call + Tags []string + // BufferLength is the length of the buffer in commands. + bufferLength int + flushTime time.Duration + commands []string + buffer bytes.Buffer + stop bool + sync.Mutex +} + +// New returns a pointer to a new Client given an addr in the format "hostname:port". +func New(addr string) (*Client, error) { + udpAddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + return nil, err + } + conn, err := net.DialUDP("udp", nil, udpAddr) + if err != nil { + return nil, err + } + client := &Client{conn: conn} + return client, nil +} + +// NewBuffered returns a Client that buffers its output and sends it in chunks. +// Buflen is the length of the buffer in number of commands. 
+func NewBuffered(addr string, buflen int) (*Client, error) { + client, err := New(addr) + if err != nil { + return nil, err + } + client.bufferLength = buflen + client.commands = make([]string, 0, buflen) + client.flushTime = time.Millisecond * 100 + go client.watch() + return client, nil +} + +// format a message from its name, value, tags and rate. Also adds global +// namespace and tags. +func (c *Client) format(name, value string, tags []string, rate float64) string { + var buf bytes.Buffer + if c.Namespace != "" { + buf.WriteString(c.Namespace) + } + buf.WriteString(name) + buf.WriteString(":") + buf.WriteString(value) + if rate < 1 { + buf.WriteString(`|@`) + buf.WriteString(strconv.FormatFloat(rate, 'f', -1, 64)) + } + + writeTagString(&buf, c.Tags, tags) + + return buf.String() +} + +func (c *Client) watch() { + for _ = range time.Tick(c.flushTime) { + if c.stop { + return + } + c.Lock() + if len(c.commands) > 0 { + // FIXME: eating error here + c.flush() + } + c.Unlock() + } +} + +func (c *Client) append(cmd string) error { + c.Lock() + defer c.Unlock() + c.commands = append(c.commands, cmd) + // if we should flush, lets do it + if len(c.commands) == c.bufferLength { + if err := c.flush(); err != nil { + return err + } + } + return nil +} + +func (c *Client) joinMaxSize(cmds []string, sep string, maxSize int) ([][]byte, []int) { + c.buffer.Reset() //clear buffer + + var frames [][]byte + var ncmds []int + sepBytes := []byte(sep) + sepLen := len(sep) + + elem := 0 + for _, cmd := range cmds { + needed := len(cmd) + + if elem != 0 { + needed = needed + sepLen + } + + if c.buffer.Len()+needed <= maxSize { + if elem != 0 { + c.buffer.Write(sepBytes) + } + c.buffer.WriteString(cmd) + elem++ + } else { + frames = append(frames, copyAndResetBuffer(&c.buffer)) + ncmds = append(ncmds, elem) + // if cmd is bigger than maxSize it will get flushed on next loop + c.buffer.WriteString(cmd) + elem = 1 + } + } + + //add whatever is left! 
if there's actually something + if c.buffer.Len() > 0 { + frames = append(frames, copyAndResetBuffer(&c.buffer)) + ncmds = append(ncmds, elem) + } + + return frames, ncmds +} + +func copyAndResetBuffer(buf *bytes.Buffer) []byte { + tmpBuf := make([]byte, buf.Len()) + copy(tmpBuf, buf.Bytes()) + buf.Reset() + return tmpBuf +} + +// flush the commands in the buffer. Lock must be held by caller. +func (c *Client) flush() error { + frames, flushable := c.joinMaxSize(c.commands, "\n", OptimalPayloadSize) + var err error + cmdsFlushed := 0 + for i, data := range frames { + _, e := c.conn.Write(data) + if e != nil { + err = e + break + } + cmdsFlushed += flushable[i] + } + + // clear the slice with a slice op, doesn't realloc + if cmdsFlushed == len(c.commands) { + c.commands = c.commands[:0] + } else { + //this case will cause a future realloc... + // drop problematic command though (sorry). + c.commands = c.commands[cmdsFlushed+1:] + } + return err +} + +func (c *Client) sendMsg(msg string) error { + // return an error if message is bigger than MaxUDPPayloadSize + if len(msg) > MaxUDPPayloadSize { + return errors.New("message size exceeds MaxUDPPayloadSize") + } + + // if this client is buffered, then we'll just append this + if c.bufferLength > 0 { + return c.append(msg) + } + + _, err := c.conn.Write([]byte(msg)) + return err +} + +// send handles sampling and sends the message over UDP. It also adds global namespace prefixes and tags. +func (c *Client) send(name, value string, tags []string, rate float64) error { + if c == nil { + return nil + } + if rate < 1 && rand.Float64() > rate { + return nil + } + data := c.format(name, value, tags, rate) + return c.sendMsg(data) +} + +// Gauge measures the value of a metric at a particular time. +func (c *Client) Gauge(name string, value float64, tags []string, rate float64) error { + stat := fmt.Sprintf("%f|g", value) + return c.send(name, stat, tags, rate) +} + +// Count tracks how many times something happened per second. 
+func (c *Client) Count(name string, value int64, tags []string, rate float64) error { + stat := fmt.Sprintf("%d|c", value) + return c.send(name, stat, tags, rate) +} + +// Histogram tracks the statistical distribution of a set of values. +func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error { + stat := fmt.Sprintf("%f|h", value) + return c.send(name, stat, tags, rate) +} + +// Decr is just Count of 1 +func (c *Client) Decr(name string, tags []string, rate float64) error { + return c.send(name, "-1|c", tags, rate) +} + +// Incr is just Count of 1 +func (c *Client) Incr(name string, tags []string, rate float64) error { + return c.send(name, "1|c", tags, rate) +} + +// Set counts the number of unique elements in a group. +func (c *Client) Set(name string, value string, tags []string, rate float64) error { + stat := fmt.Sprintf("%s|s", value) + return c.send(name, stat, tags, rate) +} + +// Timing sends timing information, it is an alias for TimeInMilliseconds +func (c *Client) Timing(name string, value time.Duration, tags []string, rate float64) error { + return c.TimeInMilliseconds(name, value.Seconds()*1000, tags, rate) +} + +// TimeInMilliseconds sends timing information in milliseconds. +// It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing) +func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, rate float64) error { + stat := fmt.Sprintf("%f|ms", value) + return c.send(name, stat, tags, rate) +} + +// Event sends the provided Event. +func (c *Client) Event(e *Event) error { + stat, err := e.Encode(c.Tags...) + if err != nil { + return err + } + return c.sendMsg(stat) +} + +// SimpleEvent sends an event with the provided title and text. +func (c *Client) SimpleEvent(title, text string) error { + e := NewEvent(title, text) + return c.Event(e) +} + +// ServiceCheck sends the provided ServiceCheck. 
+func (c *Client) ServiceCheck(sc *ServiceCheck) error { + stat, err := sc.Encode(c.Tags...) + if err != nil { + return err + } + return c.sendMsg(stat) +} + +// SimpleServiceCheck sends an serviceCheck with the provided name and status. +func (c *Client) SimpleServiceCheck(name string, status ServiceCheckStatus) error { + sc := NewServiceCheck(name, status) + return c.ServiceCheck(sc) +} + +// Close the client connection. +func (c *Client) Close() error { + if c == nil { + return nil + } + c.stop = true + return c.conn.Close() +} + +// Events support + +type eventAlertType string + +const ( + // Info is the "info" AlertType for events + Info eventAlertType = "info" + // Error is the "error" AlertType for events + Error eventAlertType = "error" + // Warning is the "warning" AlertType for events + Warning eventAlertType = "warning" + // Success is the "success" AlertType for events + Success eventAlertType = "success" +) + +type eventPriority string + +const ( + // Normal is the "normal" Priority for events + Normal eventPriority = "normal" + // Low is the "low" Priority for events + Low eventPriority = "low" +) + +// An Event is an object that can be posted to your DataDog event stream. +type Event struct { + // Title of the event. Required. + Title string + // Text is the description of the event. Required. + Text string + // Timestamp is a timestamp for the event. If not provided, the dogstatsd + // server will set this to the current time. + Timestamp time.Time + // Hostname for the event. + Hostname string + // AggregationKey groups this event with others of the same key. + AggregationKey string + // Priority of the event. Can be statsd.Low or statsd.Normal. + Priority eventPriority + // SourceTypeName is a source type for the event. + SourceTypeName string + // AlertType can be statsd.Info, statsd.Error, statsd.Warning, or statsd.Success. + // If absent, the default value applied by the dogstatsd server is Info. 
+ AlertType eventAlertType + // Tags for the event. + Tags []string +} + +// NewEvent creates a new event with the given title and text. Error checking +// against these values is done at send-time, or upon running e.Check. +func NewEvent(title, text string) *Event { + return &Event{ + Title: title, + Text: text, + } +} + +// Check verifies that an event is valid. +func (e Event) Check() error { + if len(e.Title) == 0 { + return fmt.Errorf("statsd.Event title is required") + } + if len(e.Text) == 0 { + return fmt.Errorf("statsd.Event text is required") + } + return nil +} + +// Encode returns the dogstatsd wire protocol representation for an event. +// Tags may be passed which will be added to the encoded output but not to +// the Event's list of tags, eg. for default tags. +func (e Event) Encode(tags ...string) (string, error) { + err := e.Check() + if err != nil { + return "", err + } + text := e.escapedText() + + var buffer bytes.Buffer + buffer.WriteString("_e{") + buffer.WriteString(strconv.FormatInt(int64(len(e.Title)), 10)) + buffer.WriteRune(',') + buffer.WriteString(strconv.FormatInt(int64(len(text)), 10)) + buffer.WriteString("}:") + buffer.WriteString(e.Title) + buffer.WriteRune('|') + buffer.WriteString(text) + + if !e.Timestamp.IsZero() { + buffer.WriteString("|d:") + buffer.WriteString(strconv.FormatInt(int64(e.Timestamp.Unix()), 10)) + } + + if len(e.Hostname) != 0 { + buffer.WriteString("|h:") + buffer.WriteString(e.Hostname) + } + + if len(e.AggregationKey) != 0 { + buffer.WriteString("|k:") + buffer.WriteString(e.AggregationKey) + + } + + if len(e.Priority) != 0 { + buffer.WriteString("|p:") + buffer.WriteString(string(e.Priority)) + } + + if len(e.SourceTypeName) != 0 { + buffer.WriteString("|s:") + buffer.WriteString(e.SourceTypeName) + } + + if len(e.AlertType) != 0 { + buffer.WriteString("|t:") + buffer.WriteString(string(e.AlertType)) + } + + writeTagString(&buffer, tags, e.Tags) + + return buffer.String(), nil +} + +// ServiceCheck support + 
+type ServiceCheckStatus byte + +const ( + // Ok is the "ok" ServiceCheck status + Ok ServiceCheckStatus = 0 + // Warn is the "warning" ServiceCheck status + Warn ServiceCheckStatus = 1 + // Critical is the "critical" ServiceCheck status + Critical ServiceCheckStatus = 2 + // Unknown is the "unknown" ServiceCheck status + Unknown ServiceCheckStatus = 3 +) + +// An ServiceCheck is an object that contains status of DataDog service check. +type ServiceCheck struct { + // Name of the service check. Required. + Name string + // Status of service check. Required. + Status ServiceCheckStatus + // Timestamp is a timestamp for the serviceCheck. If not provided, the dogstatsd + // server will set this to the current time. + Timestamp time.Time + // Hostname for the serviceCheck. + Hostname string + // A message describing the current state of the serviceCheck. + Message string + // Tags for the serviceCheck. + Tags []string +} + +// NewServiceCheck creates a new serviceCheck with the given name and status. Error checking +// against these values is done at send-time, or upon running sc.Check. +func NewServiceCheck(name string, status ServiceCheckStatus) *ServiceCheck { + return &ServiceCheck{ + Name: name, + Status: status, + } +} + +// Check verifies that an event is valid. +func (sc ServiceCheck) Check() error { + if len(sc.Name) == 0 { + return fmt.Errorf("statsd.ServiceCheck name is required") + } + if byte(sc.Status) < 0 || byte(sc.Status) > 3 { + return fmt.Errorf("statsd.ServiceCheck status has invalid value") + } + return nil +} + +// Encode returns the dogstatsd wire protocol representation for an serviceCheck. +// Tags may be passed which will be added to the encoded output but not to +// the Event's list of tags, eg. for default tags. 
+func (sc ServiceCheck) Encode(tags ...string) (string, error) { + err := sc.Check() + if err != nil { + return "", err + } + message := sc.escapedMessage() + + var buffer bytes.Buffer + buffer.WriteString("_sc|") + buffer.WriteString(sc.Name) + buffer.WriteRune('|') + buffer.WriteString(strconv.FormatInt(int64(sc.Status), 10)) + + if !sc.Timestamp.IsZero() { + buffer.WriteString("|d:") + buffer.WriteString(strconv.FormatInt(int64(sc.Timestamp.Unix()), 10)) + } + + if len(sc.Hostname) != 0 { + buffer.WriteString("|h:") + buffer.WriteString(sc.Hostname) + } + + writeTagString(&buffer, tags, sc.Tags) + + if len(message) != 0 { + buffer.WriteString("|m:") + buffer.WriteString(message) + } + + return buffer.String(), nil +} + +func (e Event) escapedText() string { + return strings.Replace(e.Text, "\n", "\\n", -1) +} + +func (sc ServiceCheck) escapedMessage() string { + msg := strings.Replace(sc.Message, "\n", "\\n", -1) + return strings.Replace(msg, "m:", `m\:`, -1) +} + +func removeNewlines(str string) string { + return strings.Replace(str, "\n", "", -1) +} + +func writeTagString(w io.Writer, tagList1, tagList2 []string) { + // the tag lists may be shared with other callers, so we cannot modify + // them in any way (which means we cannot append to them either) + // therefore we must make an entirely separate copy just for this call + totalLen := len(tagList1) + len(tagList2) + if totalLen == 0 { + return + } + tags := make([]string, 0, totalLen) + tags = append(tags, tagList1...) + tags = append(tags, tagList2...) 
+ + io.WriteString(w, "|#") + io.WriteString(w, removeNewlines(tags[0])) + for _, tag := range tags[1:] { + io.WriteString(w, ",") + io.WriteString(w, removeNewlines(tag)) + } +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 1e62c2bb..cc6309be 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -2,6 +2,18 @@ "comment": "", "ignore": "test", "package": [ + { + "checksumSHA1": "f2M9re5MrJn/Mr7nadVlM/a0OP4=", + "path": "github.com/DataDog/datadog-go", + "revision": "6ea09a7540648568ce58b3d00eb1da133c2dcdd7", + "revisionTime": "2016-12-13T18:18:37Z" + }, + { + "checksumSHA1": "ckEen3vhU+2+XyPlvdop+BLNzQk=", + "path": "github.com/DataDog/datadog-go/statsd", + "revision": "6ea09a7540648568ce58b3d00eb1da133c2dcdd7", + "revisionTime": "2016-12-13T18:18:37Z" + }, { "checksumSHA1": "/j5eaUxSjyT802BWlCZ8eWdxJZQ=", "path": "github.com/eawsy/aws-lambda-go/service/lambda/runtime", @@ -9,5 +21,5 @@ "revisionTime": "2016-11-16T16:19:46Z" } ], - "rootPath": "github.com/buildkite/buildkite-metrics" + "rootPath": "github.com/callumj/buildkite-metrics" } From 932245fdec06d57df7a6fda646969b206d7ec67a Mon Sep 17 00:00:00 2001 From: Callum Jones Date: Sun, 18 Dec 2016 19:18:41 -0800 Subject: [PATCH 2/6] Fixes --- backends.go | 4 +++- statsd.go | 13 ++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/backends.go b/backends.go index 0ba89c21..247df93a 100644 --- a/backends.go +++ b/backends.go @@ -1,6 +1,8 @@ package main +import "github.com/buildkite/buildkite-metrics/collector" + // Backend is a receiver of metrics type Backend interface { - Collect(r *result) error + Collect(r *collector.Result) error } diff --git a/statsd.go b/statsd.go index 050827c8..867f32cf 100644 --- a/statsd.go +++ b/statsd.go @@ -1,6 +1,9 @@ package main -import "github.com/DataDog/datadog-go/statsd" +import ( + "github.com/DataDog/datadog-go/statsd" + "github.com/buildkite/buildkite-metrics/collector" +) // Statsd sends metrics to Statsd (Datadog spec) type Statsd 
struct { @@ -19,14 +22,14 @@ func NewStatsdClient(host string) (*Statsd, error) { }, nil } -func (cb *Statsd) Collect(r *result) error { - for name, value := range r.totals { +func (cb *Statsd) Collect(r *collector.Result) error { + for name, value := range r.Totals { if err := cb.client.Gauge(name, float64(value), []string{}, 1.0); err != nil { return err } } - for queue, counts := range r.queues { + for queue, counts := range r.Queues { for name, value := range counts { if err := cb.client.Gauge("queues."+name, float64(value), []string{"queue:" + queue}, 1.0); err != nil { return err @@ -34,7 +37,7 @@ func (cb *Statsd) Collect(r *result) error { } } - for pipeline, counts := range r.pipelines { + for pipeline, counts := range r.Pipelines { for name, value := range counts { if err := cb.client.Gauge("pipelines."+name, float64(value), []string{"pipeline:" + pipeline}, 1.0); err != nil { return err From 008926c4fe1f663ddf711cff8f5db8bd93b95512 Mon Sep 17 00:00:00 2001 From: Callum Jones Date: Sun, 18 Dec 2016 19:43:39 -0800 Subject: [PATCH 3/6] Fix name --- lambda.go | 2 +- main.go | 8 ++++---- statsd.go | 10 +++++----- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/lambda.go b/lambda.go index f6985468..883586cb 100644 --- a/lambda.go +++ b/lambda.go @@ -35,7 +35,7 @@ func handle(evt json.RawMessage, ctx *runtime.Context) (interface{}, error) { var backend Backend if backendOpt == "statsd" { - backend, err = NewStatsdClient(os.Getenv("STATSD_HOST")) + backend, err = NewStatsDClient(os.Getenv("STATSD_HOST")) if err != nil { return nil, err } diff --git a/main.go b/main.go index dc822ada..4abcf71f 100644 --- a/main.go +++ b/main.go @@ -30,8 +30,8 @@ func main() { dryRun = flag.Bool("dry-run", false, "Whether to only print metrics") // backend config - backendOpt = flag.String("backend", "cloudwatch", "Specify the backend to send metrics to. 
cloudwatch, statsd") - statsdHost = flag.String("statsd-host", "127.0.0.1:8125", "Specify the Statsd server") + backendOpt = flag.String("backend", "cloudwatch", "Specify the backend to send metrics to: cloudwatch, statsd") + statsdHost = flag.String("statsd-host", "127.0.0.1:8125", "Specify the StatsD server") // filters queue = flag.String("queue", "", "Only include a specific queue") @@ -59,9 +59,9 @@ func main() { backend = &CloudWatchBackend{} } else if lowerBackendOpt == "statsd" { var err error - backend, err = NewStatsdClient(*statsdHost) + backend, err = NewStatsDClient(*statsdHost) if err != nil { - fmt.Printf("Error starting Statsd, err: %v\n", err) + fmt.Printf("Error starting StatsD, err: %v\n", err) os.Exit(1) } } else { diff --git a/statsd.go b/statsd.go index 867f32cf..a6c2a678 100644 --- a/statsd.go +++ b/statsd.go @@ -5,24 +5,24 @@ import ( "github.com/buildkite/buildkite-metrics/collector" ) -// Statsd sends metrics to Statsd (Datadog spec) -type Statsd struct { +// StatsD sends metrics to StatsD (Datadog spec) +type StatsD struct { client *statsd.Client } -func NewStatsdClient(host string) (*Statsd, error) { +func NewStatsDClient(host string) (*StatsD, error) { c, err := statsd.NewBuffered(host, 100) if err != nil { return nil, err } // prefix every metric with the app name c.Namespace = "buildkite." 
- return &Statsd{ + return &StatsD{ client: c, }, nil } -func (cb *Statsd) Collect(r *collector.Result) error { +func (cb *StatsD) Collect(r *collector.Result) error { for name, value := range r.Totals { if err := cb.client.Gauge(name, float64(value), []string{}, 1.0); err != nil { return err From a9f6fff686bfc0e259dc11e3ba9e79dabb733ba1 Mon Sep 17 00:00:00 2001 From: Callum Jones Date: Sun, 18 Dec 2016 19:48:51 -0800 Subject: [PATCH 4/6] Re-org --- backends.go => backend/backends.go | 2 +- cloudwatch.go => backend/cloudwatch.go | 6 +++++- statsd.go => backend/statsd.go | 4 ++-- collector/collector.go | 1 + lambda.go | 9 +++++---- main.go | 9 +++++---- 6 files changed, 19 insertions(+), 12 deletions(-) rename backends.go => backend/backends.go (90%) rename cloudwatch.go => backend/cloudwatch.go (95%) rename statsd.go => backend/statsd.go (93%) diff --git a/backends.go b/backend/backends.go similarity index 90% rename from backends.go rename to backend/backends.go index 247df93a..d285140d 100644 --- a/backends.go +++ b/backend/backends.go @@ -1,4 +1,4 @@ -package main +package backend import "github.com/buildkite/buildkite-metrics/collector" diff --git a/cloudwatch.go b/backend/cloudwatch.go similarity index 95% rename from cloudwatch.go rename to backend/cloudwatch.go index cea8bc91..384b4fe0 100644 --- a/cloudwatch.go +++ b/backend/cloudwatch.go @@ -1,4 +1,4 @@ -package main +package backend import ( "log" @@ -13,6 +13,10 @@ import ( type CloudWatchBackend struct { } +func NewCloudWatchBackend() *CloudWatchBackend { + return &CloudWatchBackend{} +} + func (cb *CloudWatchBackend) Collect(r *collector.Result) error { svc := cloudwatch.New(session.New()) diff --git a/statsd.go b/backend/statsd.go similarity index 93% rename from statsd.go rename to backend/statsd.go index a6c2a678..c3084c01 100644 --- a/statsd.go +++ b/backend/statsd.go @@ -1,4 +1,4 @@ -package main +package backend import ( "github.com/DataDog/datadog-go/statsd" @@ -10,7 +10,7 @@ type StatsD struct 
{ client *statsd.Client } -func NewStatsDClient(host string) (*StatsD, error) { +func NewStatsDBackend(host string) (*StatsD, error) { c, err := statsd.NewBuffered(host, 100) if err != nil { return nil, err diff --git a/collector/collector.go b/collector/collector.go index e748a393..cf256b6d 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -43,6 +43,7 @@ func New(c *bk.Client, opts Opts) *Collector { return &Collector{ Opts: opts, buildService: c.Builds, + agentService: c.Agents, } } diff --git a/lambda.go b/lambda.go index 883586cb..fd1609cf 100644 --- a/lambda.go +++ b/lambda.go @@ -7,6 +7,7 @@ import ( "os" "time" + "github.com/buildkite/buildkite-metrics/backend" "github.com/buildkite/buildkite-metrics/collector" "github.com/eawsy/aws-lambda-go/service/lambda/runtime" "gopkg.in/buildkite/go-buildkite.v2/buildkite" @@ -33,14 +34,14 @@ func handle(evt json.RawMessage, ctx *runtime.Context) (interface{}, error) { Historical: time.Hour * 24, }) - var backend Backend + var bk backend.Backend if backendOpt == "statsd" { - backend, err = NewStatsDClient(os.Getenv("STATSD_HOST")) + bk, err = backend.NewStatsDBackend(os.Getenv("STATSD_HOST")) if err != nil { return nil, err } } else { - backend = &CloudWatchBackend{} + bk = &backend.CloudWatchBackend{} } res, err := col.Collect() @@ -50,7 +51,7 @@ func handle(evt json.RawMessage, ctx *runtime.Context) (interface{}, error) { res.Dump() - err = backend.Collect(res) + err = bk.Collect(res) if err != nil { return nil, err } diff --git a/main.go b/main.go index 4abcf71f..e827af35 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/buildkite/buildkite-metrics/backend" "github.com/buildkite/buildkite-metrics/collector" "gopkg.in/buildkite/go-buildkite.v2/buildkite" ) @@ -16,7 +17,7 @@ import ( // Version is passed in via ldflags var Version string -var backend Backend +var bk backend.Backend func main() { var ( @@ -56,10 +57,10 @@ func main() { lowerBackendOpt := 
strings.ToLower(*backendOpt) if lowerBackendOpt == "cloudwatch" { - backend = &CloudWatchBackend{} + bk = backend.NewCloudWatchBackend() } else if lowerBackendOpt == "statsd" { var err error - backend, err = NewStatsDClient(*statsdHost) + bk, err = backend.NewStatsDBackend(*statsdHost) if err != nil { fmt.Printf("Error starting StatsD, err: %v\n", err) os.Exit(1) @@ -104,7 +105,7 @@ func main() { } if !*dryRun { - err = backend.Collect(res) + err = bk.Collect(res) if err != nil { return err } From 34e55e60973e2b32659ada7db9db5bd6be80b6e7 Mon Sep 17 00:00:00 2001 From: Callum Jones Date: Sun, 18 Dec 2016 19:56:34 -0800 Subject: [PATCH 5/6] Optional tagging --- backend/statsd.go | 28 +++++++++++++++++++++++----- lambda.go | 3 ++- main.go | 3 ++- 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/backend/statsd.go b/backend/statsd.go index c3084c01..29f48246 100644 --- a/backend/statsd.go +++ b/backend/statsd.go @@ -7,10 +7,11 @@ import ( // StatsD sends metrics to StatsD (Datadog spec) type StatsD struct { - client *statsd.Client + client *statsd.Client + tagsSupported bool } -func NewStatsDBackend(host string) (*StatsD, error) { +func NewStatsDBackend(host string, tagsSupported bool) (*StatsD, error) { c, err := statsd.NewBuffered(host, 100) if err != nil { return nil, err @@ -18,7 +19,8 @@ func NewStatsDBackend(host string) (*StatsD, error) { // prefix every metric with the app name c.Namespace = "buildkite." return &StatsD{ - client: c, + client: c, + tagsSupported: tagsSupported, }, nil } @@ -31,7 +33,15 @@ func (cb *StatsD) Collect(r *collector.Result) error { for queue, counts := range r.Queues { for name, value := range counts { - if err := cb.client.Gauge("queues."+name, float64(value), []string{"queue:" + queue}, 1.0); err != nil { + var finalName string + tags := []string{} + if cb.tagsSupported { + finalName = "queues." + name + tags = []string{"queue:" + queue} + } else { + finalName = "queues." + queue + "." 
+ name + } + if err := cb.client.Gauge(finalName, float64(value), tags, 1.0); err != nil { return err } } @@ -39,7 +49,15 @@ func (cb *StatsD) Collect(r *collector.Result) error { for pipeline, counts := range r.Pipelines { for name, value := range counts { - if err := cb.client.Gauge("pipelines."+name, float64(value), []string{"pipeline:" + pipeline}, 1.0); err != nil { + var finalName string + tags := []string{} + if cb.tagsSupported { + finalName = "pipeline." + name + tags = []string{"pipeline:" + pipeline} + } else { + finalName = "pipeline." + pipeline + "." + name + } + if err := cb.client.Gauge(finalName, float64(value), tags, 1.0); err != nil { return err } } diff --git a/lambda.go b/lambda.go index fd1609cf..f428058e 100644 --- a/lambda.go +++ b/lambda.go @@ -5,6 +5,7 @@ import ( "encoding/json" "log" "os" + "strings" "time" "github.com/buildkite/buildkite-metrics/backend" @@ -36,7 +37,7 @@ func handle(evt json.RawMessage, ctx *runtime.Context) (interface{}, error) { var bk backend.Backend if backendOpt == "statsd" { - bk, err = backend.NewStatsDBackend(os.Getenv("STATSD_HOST")) + bk, err = backend.NewStatsDBackend(os.Getenv("STATSD_HOST"), strings.ToLower(os.Getenv("STATSD_TAGS")) == "true") if err != nil { return nil, err } diff --git a/main.go b/main.go index e827af35..6d65b91c 100644 --- a/main.go +++ b/main.go @@ -33,6 +33,7 @@ func main() { // backend config backendOpt = flag.String("backend", "cloudwatch", "Specify the backend to send metrics to: cloudwatch, statsd") statsdHost = flag.String("statsd-host", "127.0.0.1:8125", "Specify the StatsD server") + statsdTags = flag.Bool("statsd-tags", false, "Whether your StatsD server supports tagging like Datadog") // filters queue = flag.String("queue", "", "Only include a specific queue") @@ -60,7 +61,7 @@ func main() { bk = backend.NewCloudWatchBackend() } else if lowerBackendOpt == "statsd" { var err error - bk, err = backend.NewStatsDBackend(*statsdHost) + bk, err = 
backend.NewStatsDBackend(*statsdHost, *statsdTags) if err != nil { fmt.Printf("Error starting StatsD, err: %v\n", err) os.Exit(1) From 7be4a2e76e72a58d27ce3df7fef69d47add4031f Mon Sep 17 00:00:00 2001 From: Callum Jones Date: Sun, 18 Dec 2016 20:10:38 -0800 Subject: [PATCH 6/6] Add README --- README.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index d4fa5dc7..1b901d07 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Buildkite Metrics -A command-line tool for collecting [Buildkite](https://buildkite.com/) build/job statistics for external metrics systems. Currently [AWS Cloudwatch](http://aws.amazon.com/cloudwatch/) is supported. +A command-line tool for collecting [Buildkite](https://buildkite.com/) build/job statistics for external metrics systems. Currently [AWS CloudWatch](http://aws.amazon.com/cloudwatch/) and [StatsD](https://github.com/etsy/statsd) are supported. [![Build status](https://badge.buildkite.com/80d04fcde3a306bef44e77aadb1f1ffdc20ebb3c8f1f585a60.svg)](https://buildkite.com/buildkite/buildkite-metrics) @@ -12,6 +12,13 @@ Either download the latest binary from [buildkite-metrics/buildkite-metrics-Linu go get github.com/buildkite/buildkite-metrics ``` +### Backends + +By default, metrics are submitted to CloudWatch, but the backend can be switched to StatsD with the command-line argument `-backend statsd`. The StatsD backend supports the following arguments: + +* `-statsd-host HOST`: The StatsD host and port (defaults to `127.0.0.1:8125`). +* `-statsd-tags`: Some StatsD servers, such as the agent provided by Datadog, support tags. If specified, metrics are tagged by `queue` and `pipeline`; otherwise the queue/pipeline name is included in the metric name. Only enable this option if you know your StatsD server supports tags. + ## Development You can build and run the binary tool locally with golang installed: