From e3527449b9b76d8a61c53cb44d9efd6577928c32 Mon Sep 17 00:00:00 2001 From: Alexey Pervushin Date: Wed, 8 Sep 2021 14:18:41 -0700 Subject: [PATCH] Add gzip compression support Add go-modules support --- client.go | 27 +++++++- client_test.go | 175 ++++++++++++++++++++++++++++-------------------- cluster.go | 8 +++ cluster_test.go | 105 ++++++++++++++++------------- go.mod | 11 +++ go.sum | 13 ++++ hec.go | 1 + 7 files changed, 219 insertions(+), 121 deletions(-) create mode 100644 go.mod create mode 100644 go.sum diff --git a/client.go b/client.go index db1d543..ce6069b 100644 --- a/client.go +++ b/client.go @@ -2,6 +2,7 @@ package hec import ( "bytes" + "compress/gzip" "context" "encoding/json" "fmt" @@ -52,6 +53,9 @@ type Client struct { // Mutex to allow threadsafe acknowledgement checking ackMux sync.Mutex + + // Compression type, "" and "gzip" are supported + compression string } func NewClient(serverURL string, token string) HEC { @@ -88,6 +92,10 @@ func (hec *Client) SetMaxContentLength(size int) { hec.maxLength = size } +func (hec *Client) SetCompression(compression string) { + hec.compression = compression +} + func (hec *Client) WriteEventWithContext(ctx context.Context, event *Event) error { if event.empty() { return nil // skip empty events @@ -314,7 +322,21 @@ func (res *Response) String() string { func (hec *Client) makeRequest(ctx context.Context, endpoint string, data []byte) (*Response, error) { retries := 0 RETRY: - req, err := http.NewRequest(http.MethodPost, hec.serverURL+endpoint, bytes.NewReader(data)) + var reader io.Reader + if hec.compression == "gzip" { + var buffer bytes.Buffer + gzipWriter := gzip.NewWriter(&buffer) + _, err := gzipWriter.Write(data) + gzipWriter.Close() + if err != nil { + return nil, err + } + reader = &buffer + } else { + reader = bytes.NewReader(data) + } + + req, err := http.NewRequest(http.MethodPost, hec.serverURL+endpoint, reader) if err != nil { return nil, err } @@ -323,6 +345,9 @@ RETRY: req.Header.Set("Connection", "keep-alive") } req.Header.Set("Authorization", "Splunk "+hec.token) + if hec.compression == "gzip" { + req.Header.Set("Content-Encoding", "gzip") + } res, err := hec.httpClient.Do(req) if err != nil { return nil, err diff --git a/client_test.go b/client_test.go index da4f8fa..3f68e30 100644 --- a/client_test.go +++ b/client_test.go @@ -1,6 +1,7 @@ package hec import ( + "compress/gzip" "crypto/tls" "encoding/json" "net/http" @@ -24,11 +25,23 @@ var testHttpClient *http.Client = &http.Client{ Timeout: 100 * time.Millisecond, } -func jsonEndpoint(t *testing.T) http.Handler { +func jsonEndpoint(t *testing.T, compression string) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { failed := false input := make(map[string]interface{}) - j := json.NewDecoder(r.Body) + content := r.Body + if compression == "gzip" { + var err error + content, err = gzip.NewReader(r.Body) + if err != nil { + t.Errorf("Unexpected error in gzip: %v", err) + } + header := r.Header.Get("Content-Encoding") + if header != "gzip" { + t.Errorf("Content-Encoding header wasn't sent for gzip") + } + } + j := json.NewDecoder(content) err := j.Decode(&input) if err != nil { t.Errorf("Decoding JSON: %v", err) @@ -66,20 +79,23 @@ func jsonEndpoint(t *testing.T) http.Handler { } func TestHEC_WriteEvent(t *testing.T) { - event := &Event{ - Index: String("main"), - Source: String("test-hec-raw"), - SourceType: String("manual"), - Host: String("localhost"), - Time: String("1485237827.123"), - Event: "hello, world", - } + for _, compression := range []string{"", "gzip"} { + event := &Event{ + Index: String("main"), + Source: String("test-hec-raw"), + SourceType: String("manual"), + Host: String("localhost"), + Time: String("1485237827.123"), + Event: "hello, world", + } - ts := httptest.NewServer(jsonEndpoint(t)) - c := NewClient(ts.URL, testSplunkToken) - c.SetHTTPClient(testHttpClient) - err := c.WriteEvent(event) - assert.NoError(t, err) + ts := httptest.NewServer(jsonEndpoint(t, compression)) + c := NewClient(ts.URL, testSplunkToken) + c.SetHTTPClient(testHttpClient) + c.SetCompression(compression) + err := c.WriteEvent(event) + assert.NoError(t, err) + } } func TestHEC_WriteEventServerFailure(t *testing.T) { @@ -103,23 +119,26 @@ func TestHEC_WriteEventServerFailure(t *testing.T) { } func TestHEC_WriteObjectEvent(t *testing.T) { - event := &Event{ - Index: String("main"), - Source: String("test-hec-raw"), - SourceType: String("manual"), - Host: String("localhost"), - Time: String("1485237827.123"), - Event: map[string]interface{}{ - "str": "hello", - "time": time.Now(), - }, - } + for _, compression := range []string{"", "gzip"} { + event := &Event{ + Index: String("main"), + Source: String("test-hec-raw"), + SourceType: String("manual"), + Host: String("localhost"), + Time: String("1485237827.123"), + Event: map[string]interface{}{ + "str": "hello", + "time": time.Now(), + }, + } - ts := httptest.NewServer(jsonEndpoint(t)) - c := NewClient(ts.URL, testSplunkToken) - c.SetHTTPClient(testHttpClient) - err := c.WriteEvent(event) - assert.NoError(t, err) + ts := httptest.NewServer(jsonEndpoint(t, compression)) + c := NewClient(ts.URL, testSplunkToken) + c.SetHTTPClient(testHttpClient) + c.SetCompression(compression) + err := c.WriteEvent(event) + assert.NoError(t, err) + } } func TestHEC_WriteLongEvent(t *testing.T) { @@ -132,7 +151,7 @@ func TestHEC_WriteLongEvent(t *testing.T) { Event: "hello, world", } - ts := httptest.NewServer(jsonEndpoint(t)) + ts := httptest.NewServer(jsonEndpoint(t, "")) c := NewClient(ts.URL, testSplunkToken) c.SetHTTPClient(testHttpClient) @@ -143,62 +162,74 @@ func TestHEC_WriteLongEvent(t *testing.T) { } func TestHEC_WriteEventBatch(t *testing.T) { - events := []*Event{ - {Event: "event one"}, - {Event: "event two"}, - } + for _, compression := range []string{"", "gzip"} { + events := []*Event{ + {Event: "event one"}, + {Event: "event two"}, + } - ts := httptest.NewServer(jsonEndpoint(t)) - c := NewClient(ts.URL, testSplunkToken) + ts := httptest.NewServer(jsonEndpoint(t, compression)) + c := NewClient(ts.URL, testSplunkToken) - c.SetHTTPClient(testHttpClient) - err := c.WriteBatch(events) - assert.NoError(t, err) + c.SetHTTPClient(testHttpClient) + c.SetCompression(compression) + err := c.WriteBatch(events) + assert.NoError(t, err) + } } func TestHEC_WriteLongEventBatch(t *testing.T) { - events := []*Event{ - {Event: "event one"}, - {Event: "event two"}, - } + for _, compression := range []string{"", "gzip"} { + events := []*Event{ + {Event: "event one"}, + {Event: "event two"}, + } - ts := httptest.NewServer(jsonEndpoint(t)) - c := NewClient(ts.URL, testSplunkToken) - c.SetHTTPClient(testHttpClient) - c.SetMaxContentLength(25) - err := c.WriteBatch(events) - assert.NoError(t, err) + ts := httptest.NewServer(jsonEndpoint(t, compression)) + c := NewClient(ts.URL, testSplunkToken) + c.SetHTTPClient(testHttpClient) + c.SetMaxContentLength(25) + c.SetCompression(compression) + err := c.WriteBatch(events) + assert.NoError(t, err) + } } func TestHEC_WriteEventRaw(t *testing.T) { - events := `2017-01-24T06:07:10.488Z Raw event one + for _, compression := range []string{"", "gzip"} { + events := `2017-01-24T06:07:10.488Z Raw event one 2017-01-24T06:07:12.434Z Raw event two` - metadata := EventMetadata{ - Source: String("test-hec-raw"), + metadata := EventMetadata{ + Source: String("test-hec-raw"), + } + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{"text":"Success","code":0}`)) + })) + c := NewClient(ts.URL, testSplunkToken) + c.SetHTTPClient(testHttpClient) + c.SetCompression(compression) + err := c.WriteRaw(strings.NewReader(events), &metadata) + assert.NoError(t, err) } - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte(`{"text":"Success","code":0}`)) - })) - c := NewClient(ts.URL, testSplunkToken) - c.SetHTTPClient(testHttpClient) - err := c.WriteRaw(strings.NewReader(events), &metadata) - assert.NoError(t, err) } func TestHEC_WriteLongEventRaw(t *testing.T) { - events := `2017-01-24T06:07:10.488Z Raw event one + for _, compression := range []string{"", "gzip"} { + events := `2017-01-24T06:07:10.488Z Raw event one 2017-01-24T06:07:12.434Z Raw event two` - metadata := EventMetadata{ - Source: String("test-hec-raw"), + metadata := EventMetadata{ + Source: String("test-hec-raw"), + } + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{"text":"Success","code":0}`)) + })) + c := NewClient(ts.URL, testSplunkToken) + c.SetMaxContentLength(40) + c.SetHTTPClient(testHttpClient) + c.SetCompression(compression) + err := c.WriteRaw(strings.NewReader(events), &metadata) + assert.NoError(t, err) } - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte(`{"text":"Success","code":0}`)) - })) - c := NewClient(ts.URL, testSplunkToken) - c.SetMaxContentLength(40) - c.SetHTTPClient(testHttpClient) - err := c.WriteRaw(strings.NewReader(events), &metadata) - assert.NoError(t, err) } func TestHEC_WriteRawFailure(t *testing.T) { diff --git a/cluster.go b/cluster.go index c7f5b08..768198c 100644 --- a/cluster.go +++ b/cluster.go @@ -78,6 +78,14 @@ func (c *Cluster) SetMaxContentLength(size int) { c.mtx.Unlock() } +func (c *Cluster) SetCompression(compression string) { + c.mtx.Lock() + for _, client := range c.clients { + client.SetCompression(compression) + } + c.mtx.Unlock() +} + func (c *Cluster) WriteEvent(event *Event) error { return c.retry(func(client *Client) error { return client.WriteEvent(event) diff --git a/cluster_test.go b/cluster_test.go index e8ae5af..6f1279c 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -14,65 +14,74 @@ var ( ) func TestCluster_WriteEvent(t *testing.T) { - event := &Event{ - Index: String("main"), - Source: String("test-hec-raw"), - SourceType: String("manual"), - Host: String("localhost"), - Time: String("1485237827.123"), - Event: String("hello, world"), - } + for _, compression := range []string{"", "gzip"} { + event := &Event{ + Index: String("main"), + Source: String("test-hec-raw"), + SourceType: String("manual"), + Host: String("localhost"), + Time: String("1485237827.123"), + Event: String("hello, world"), + } - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte(`{"text":"Success","code":0}`)) - })) - c := NewCluster([]string{ts.URL}, testSplunkToken) - c.SetHTTPClient(testHttpClient) - err := c.WriteEvent(event) - assert.NoError(t, err) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{"text":"Success","code":0}`)) + })) + c := NewCluster([]string{ts.URL}, testSplunkToken) + c.SetHTTPClient(testHttpClient) + c.SetCompression(compression) + err := c.WriteEvent(event) + assert.NoError(t, err) + } } func TestCluster_WriteEventBatch(t *testing.T) { - eventBatches := [][]*Event{ - { - {Event: "event one"}, - {Event: "event two"}, - }, - { - {Event: "event foo"}, - {Event: "event bar"}, - }, - } + for _, compression := range []string{"", "gzip"} { + eventBatches := [][]*Event{ + { + {Event: "event one"}, + {Event: "event two"}, + }, + { + {Event: "event foo"}, + {Event: "event bar"}, + }, + } - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte(`{"text":"Success","code":0}`)) - })) - c := NewCluster([]string{ts.URL}, testSplunkToken) - c.SetHTTPClient(testHttpClient) - for _, batch := range eventBatches { - err := c.WriteBatch(batch) - assert.NoError(t, err) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{"text":"Success","code":0}`)) + })) + c := NewCluster([]string{ts.URL}, testSplunkToken) + c.SetHTTPClient(testHttpClient) + c.SetCompression(compression) + for _, batch := range eventBatches { + err := c.WriteBatch(batch) + assert.NoError(t, err) + } } } func TestCluster_WriteEventRaw(t *testing.T) { - eventBlocks := []string{ - `2017-01-24T06:07:10.488Z Raw event one + for _, compression := range []string{"", "gzip"} { + eventBlocks := []string{ + `2017-01-24T06:07:10.488Z Raw event one 2017-01-24T06:07:12.434Z Raw event two`, - `2017-01-24T06:07:10.488Z Raw event foo + `2017-01-24T06:07:10.488Z Raw event foo 2017-01-24T06:07:12.434Z Raw event bar`, - } - metadata := EventMetadata{ - Source: String("test-hec-raw"), - } - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte(`{"text":"Success","code":0}`)) - })) - c := NewCluster([]string{ts.URL}, testSplunkToken) - c.SetHTTPClient(testHttpClient) - for _, block := range eventBlocks { - err := c.WriteRaw(strings.NewReader(block), &metadata) - assert.NoError(t, err) + } + metadata := EventMetadata{ + Source: String("test-hec-raw"), + } + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{"text":"Success","code":0}`)) + })) + c := NewCluster([]string{ts.URL}, testSplunkToken) + c.SetHTTPClient(testHttpClient) + c.SetCompression(compression) + for _, block := range eventBlocks { + err := c.WriteRaw(strings.NewReader(block), &metadata) + assert.NoError(t, err) + } } } diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e11a40f --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module github.com/fuyufjh/splunk-hec-go + +go 1.17 + +require ( + github.com/davecgh/go-spew v1.1.0 // indirect + github.com/google/uuid v1.0.0 + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/testify v1.7.0 + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..19c9388 --- /dev/null +++ b/go.sum @@ -0,0 +1,13 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA= +github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/hec.go b/hec.go index 76de35a..ef24eae 100644 --- a/hec.go +++ b/hec.go @@ -12,6 +12,7 @@ type HEC interface { SetChannel(channel string) SetMaxRetry(retries int) SetMaxContentLength(size int) + SetCompression(compression string) // WriteEvent writes single event via HEC json mode WriteEvent(event *Event) error