-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit 185faa2
Showing
3 changed files
with
191 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
# OPENTSDB metrics push GO client | ||
|
||
Client has 2 options to send metrics: | ||
- Enqueue metrics, send when batchSize collected and flush buffer. Use Push to force send current buffer. | ||
- Send single Metric immediately. | ||
|
||
*Important* | ||
|
||
Do not forger invoke `Close` on service down, to send unfilled buffer | ||
|
||
## Usage | ||
|
||
Import: | ||
|
||
`go get github.com/n10ty/opentsdb-go-push` | ||
|
||
Example: | ||
|
||
```go | ||
func main() { | ||
client := opentsdb.NewClient("http://localhost:4242", opentsdb.WithBatchSize(30)) | ||
err := client.Enqueue(opentsdb.Metric{ | ||
Timestamp: time.Now().Truncate(time.Minute).Unix(), | ||
Metric: "http_response_time", | ||
Value: 39.3, | ||
Tags: map[string]string{ | ||
"server": "server1", | ||
}, | ||
}) | ||
if err != nil { | ||
//... | ||
} | ||
client.Push() | ||
} | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
package opentsdb | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"io/ioutil" | ||
"net/http" | ||
) | ||
|
||
const defaultBatchSize = 20 | ||
|
||
// Client has 2 options to send metrics: | ||
// - Enqueue metrics, send when batchSize collected and flush buffer. Use Push to force send current buffer. | ||
// - Send single Metric immediately. | ||
type Client struct { | ||
host string | ||
authUser string | ||
authPass string | ||
buffer []Metric | ||
batchSize int | ||
} | ||
|
||
type Metric struct { | ||
Timestamp int64 `json:"timestamp"` | ||
Metric string `json:"metric"` | ||
Value any `json:"value"` | ||
Tags map[string]string `json:"tags"` | ||
} | ||
|
||
type config struct { | ||
authUsername string | ||
authPassword string | ||
batchSize int | ||
} | ||
|
||
func NewClient(host string, options ...Option) *Client { | ||
config := &config{ | ||
authUsername: "", | ||
authPassword: "", | ||
batchSize: defaultBatchSize, | ||
} | ||
for _, o := range options { | ||
o(config) | ||
} | ||
|
||
return &Client{ | ||
host: host, | ||
authUser: config.authUsername, | ||
authPass: config.authPassword, | ||
batchSize: config.batchSize, | ||
} | ||
} | ||
|
||
// Enqueue send metric to a buffer. Metrics are sent when buffer reaches batchSize number. | ||
func (c *Client) Enqueue(metric Metric) error { | ||
if metric.Tags == nil { | ||
return errors.New("tags can not be nil") | ||
} | ||
c.buffer = append(c.buffer, metric) | ||
if len(c.buffer) >= c.batchSize { | ||
err := c.send(c.buffer) | ||
c.buffer = []Metric{} | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// Send single Metric immediately | ||
func (c *Client) Send(metric Metric) error { | ||
if metric.Tags == nil { | ||
return errors.New("tags can not be nil") | ||
} | ||
return c.send([]Metric{metric}) | ||
} | ||
|
||
func (c *Client) send(metric []Metric) error { | ||
url := fmt.Sprintf("%s/api/put", c.host) | ||
m, err := json.Marshal(metric) | ||
if err != nil { | ||
return err | ||
} | ||
req, err := http.NewRequest(http.MethodPut, url, body(m)) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if c.authUser != "" { | ||
req.SetBasicAuth(c.authUser, c.authPass) | ||
} | ||
req.Header.Add("Content-Type", "application/json") | ||
res, err := http.DefaultClient.Do(req) | ||
if err != nil { | ||
return err | ||
} | ||
if res.StatusCode >= 400 { | ||
b, err := ioutil.ReadAll(res.Body) | ||
if err != nil { | ||
return err | ||
} | ||
return fmt.Errorf("%s: %s", res.Status, string(b)) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Push buffer and clean it | ||
func (c *Client) Push() error { | ||
if len(c.buffer) == 0 { | ||
return nil | ||
} | ||
err := c.send(c.buffer) | ||
c.buffer = []Metric{} | ||
if err != nil { | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
// Close should be used on service down to prevent an unfilled buffer to be gone | ||
func (c *Client) Close() error { | ||
return c.send(c.buffer) | ||
} | ||
|
||
func body(buf []byte) io.Reader { | ||
return bytes.NewBuffer(buf) | ||
} | ||
|
||
type Option func(*config) error | ||
|
||
// WithAuth setup BasicAuth username and password to use in request | ||
func WithAuth(username, password string) Option { | ||
return func(c *config) error { | ||
c.authUsername = username | ||
c.authPassword = password | ||
return nil | ||
} | ||
} | ||
|
||
// WithBatchSize change default number of buffer size to push | ||
func WithBatchSize(n int) Option { | ||
return func(c *config) error { | ||
if n < 1 || n > 1024 { | ||
return errors.New("batch size should be between 1 and 1024") | ||
} | ||
c.batchSize = n | ||
return nil | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
module github.com/n10ty/opentsdb-go-push | ||
|
||
go 1.18 |