Skip to content

Commit

Permalink
Merge pull request #17 from billyevans/Add_Compression
Browse files Browse the repository at this point in the history
Add gzip compression support
  • Loading branch information
fuyufjh authored Sep 9, 2021
2 parents 10df423 + e352744 commit feecd03
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 121 deletions.
27 changes: 26 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package hec

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -52,6 +53,9 @@ type Client struct {

// Mutex to allow threadsafe acknowledgement checking
ackMux sync.Mutex

// Compression type, "" and "gzip" are supported
compression string
}

func NewClient(serverURL string, token string) HEC {
Expand Down Expand Up @@ -88,6 +92,10 @@ func (hec *Client) SetMaxContentLength(size int) {
hec.maxLength = size
}

func (hec *Client) SetCompression(compression string) {
hec.compression = compression
}

func (hec *Client) WriteEventWithContext(ctx context.Context, event *Event) error {
if event.empty() {
return nil // skip empty events
Expand Down Expand Up @@ -314,7 +322,21 @@ func (res *Response) String() string {
func (hec *Client) makeRequest(ctx context.Context, endpoint string, data []byte) (*Response, error) {
retries := 0
RETRY:
req, err := http.NewRequest(http.MethodPost, hec.serverURL+endpoint, bytes.NewReader(data))
var reader io.Reader
if hec.compression == "gzip" {
var buffer bytes.Buffer
gzipWriter := gzip.NewWriter(&buffer)
_, err := gzipWriter.Write(data)
gzipWriter.Close()
if err != nil {
return nil, err
}
reader = &buffer
} else {
reader = bytes.NewReader(data)
}

req, err := http.NewRequest(http.MethodPost, hec.serverURL+endpoint, reader)
if err != nil {
return nil, err
}
Expand All @@ -323,6 +345,9 @@ RETRY:
req.Header.Set("Connection", "keep-alive")
}
req.Header.Set("Authorization", "Splunk "+hec.token)
if hec.compression == "gzip" {
req.Header.Set("Content-Encoding", "gzip")
}
res, err := hec.httpClient.Do(req)
if err != nil {
return nil, err
Expand Down
175 changes: 103 additions & 72 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package hec

import (
"compress/gzip"
"crypto/tls"
"encoding/json"
"net/http"
Expand All @@ -24,11 +25,23 @@ var testHttpClient *http.Client = &http.Client{
Timeout: 100 * time.Millisecond,
}

func jsonEndpoint(t *testing.T) http.Handler {
func jsonEndpoint(t *testing.T, compression string) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
failed := false
input := make(map[string]interface{})
j := json.NewDecoder(r.Body)
content := r.Body
if compression == "gzip" {
var err error
content, err = gzip.NewReader(r.Body)
if err != nil {
t.Errorf("Unexpected error in gzip: %v", err)
}
header := r.Header.Get("Content-Encoding")
if header != "gzip" {
t.Errorf("Content-Encoding header wasn't sent for gzip")
}
}
j := json.NewDecoder(content)
err := j.Decode(&input)
if err != nil {
t.Errorf("Decoding JSON: %v", err)
Expand Down Expand Up @@ -66,20 +79,23 @@ func jsonEndpoint(t *testing.T) http.Handler {
}

func TestHEC_WriteEvent(t *testing.T) {
event := &Event{
Index: String("main"),
Source: String("test-hec-raw"),
SourceType: String("manual"),
Host: String("localhost"),
Time: String("1485237827.123"),
Event: "hello, world",
}
for _, compression := range []string{"", "gzip"} {
event := &Event{
Index: String("main"),
Source: String("test-hec-raw"),
SourceType: String("manual"),
Host: String("localhost"),
Time: String("1485237827.123"),
Event: "hello, world",
}

ts := httptest.NewServer(jsonEndpoint(t))
c := NewClient(ts.URL, testSplunkToken)
c.SetHTTPClient(testHttpClient)
err := c.WriteEvent(event)
assert.NoError(t, err)
ts := httptest.NewServer(jsonEndpoint(t, compression))
c := NewClient(ts.URL, testSplunkToken)
c.SetHTTPClient(testHttpClient)
c.SetCompression(compression)
err := c.WriteEvent(event)
assert.NoError(t, err)
}
}

func TestHEC_WriteEventServerFailure(t *testing.T) {
Expand All @@ -103,23 +119,26 @@ func TestHEC_WriteEventServerFailure(t *testing.T) {
}

func TestHEC_WriteObjectEvent(t *testing.T) {
event := &Event{
Index: String("main"),
Source: String("test-hec-raw"),
SourceType: String("manual"),
Host: String("localhost"),
Time: String("1485237827.123"),
Event: map[string]interface{}{
"str": "hello",
"time": time.Now(),
},
}
for _, compression := range []string{"", "gzip"} {
event := &Event{
Index: String("main"),
Source: String("test-hec-raw"),
SourceType: String("manual"),
Host: String("localhost"),
Time: String("1485237827.123"),
Event: map[string]interface{}{
"str": "hello",
"time": time.Now(),
},
}

ts := httptest.NewServer(jsonEndpoint(t))
c := NewClient(ts.URL, testSplunkToken)
c.SetHTTPClient(testHttpClient)
err := c.WriteEvent(event)
assert.NoError(t, err)
ts := httptest.NewServer(jsonEndpoint(t, compression))
c := NewClient(ts.URL, testSplunkToken)
c.SetHTTPClient(testHttpClient)
c.SetCompression(compression)
err := c.WriteEvent(event)
assert.NoError(t, err)
}
}

func TestHEC_WriteLongEvent(t *testing.T) {
Expand All @@ -132,7 +151,7 @@ func TestHEC_WriteLongEvent(t *testing.T) {
Event: "hello, world",
}

ts := httptest.NewServer(jsonEndpoint(t))
ts := httptest.NewServer(jsonEndpoint(t, ""))
c := NewClient(ts.URL, testSplunkToken)

c.SetHTTPClient(testHttpClient)
Expand All @@ -143,62 +162,74 @@ func TestHEC_WriteLongEvent(t *testing.T) {
}

func TestHEC_WriteEventBatch(t *testing.T) {
events := []*Event{
{Event: "event one"},
{Event: "event two"},
}
for _, compression := range []string{"", "gzip"} {
events := []*Event{
{Event: "event one"},
{Event: "event two"},
}

ts := httptest.NewServer(jsonEndpoint(t))
c := NewClient(ts.URL, testSplunkToken)
ts := httptest.NewServer(jsonEndpoint(t, compression))
c := NewClient(ts.URL, testSplunkToken)

c.SetHTTPClient(testHttpClient)
err := c.WriteBatch(events)
assert.NoError(t, err)
c.SetHTTPClient(testHttpClient)
c.SetCompression(compression)
err := c.WriteBatch(events)
assert.NoError(t, err)
}
}

func TestHEC_WriteLongEventBatch(t *testing.T) {
events := []*Event{
{Event: "event one"},
{Event: "event two"},
}
for _, compression := range []string{"", "gzip"} {
events := []*Event{
{Event: "event one"},
{Event: "event two"},
}

ts := httptest.NewServer(jsonEndpoint(t))
c := NewClient(ts.URL, testSplunkToken)
c.SetHTTPClient(testHttpClient)
c.SetMaxContentLength(25)
err := c.WriteBatch(events)
assert.NoError(t, err)
ts := httptest.NewServer(jsonEndpoint(t, compression))
c := NewClient(ts.URL, testSplunkToken)
c.SetHTTPClient(testHttpClient)
c.SetMaxContentLength(25)
c.SetCompression(compression)
err := c.WriteBatch(events)
assert.NoError(t, err)
}
}

func TestHEC_WriteEventRaw(t *testing.T) {
events := `2017-01-24T06:07:10.488Z Raw event one
for _, compression := range []string{"", "gzip"} {
events := `2017-01-24T06:07:10.488Z Raw event one
2017-01-24T06:07:12.434Z Raw event two`
metadata := EventMetadata{
Source: String("test-hec-raw"),
metadata := EventMetadata{
Source: String("test-hec-raw"),
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"text":"Success","code":0}`))
}))
c := NewClient(ts.URL, testSplunkToken)
c.SetHTTPClient(testHttpClient)
c.SetCompression(compression)
err := c.WriteRaw(strings.NewReader(events), &metadata)
assert.NoError(t, err)
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"text":"Success","code":0}`))
}))
c := NewClient(ts.URL, testSplunkToken)
c.SetHTTPClient(testHttpClient)
err := c.WriteRaw(strings.NewReader(events), &metadata)
assert.NoError(t, err)
}

func TestHEC_WriteLongEventRaw(t *testing.T) {
events := `2017-01-24T06:07:10.488Z Raw event one
for _, compression := range []string{"", "gzip"} {
events := `2017-01-24T06:07:10.488Z Raw event one
2017-01-24T06:07:12.434Z Raw event two`
metadata := EventMetadata{
Source: String("test-hec-raw"),
metadata := EventMetadata{
Source: String("test-hec-raw"),
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"text":"Success","code":0}`))
}))
c := NewClient(ts.URL, testSplunkToken)
c.SetMaxContentLength(40)
c.SetHTTPClient(testHttpClient)
c.SetCompression(compression)
err := c.WriteRaw(strings.NewReader(events), &metadata)
assert.NoError(t, err)
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"text":"Success","code":0}`))
}))
c := NewClient(ts.URL, testSplunkToken)
c.SetMaxContentLength(40)
c.SetHTTPClient(testHttpClient)
err := c.WriteRaw(strings.NewReader(events), &metadata)
assert.NoError(t, err)
}

func TestHEC_WriteRawFailure(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ func (c *Cluster) SetMaxContentLength(size int) {
c.mtx.Unlock()
}

func (c *Cluster) SetCompression(compression string) {
c.mtx.Lock()
for _, client := range c.clients {
client.SetCompression(compression)
}
c.mtx.Unlock()
}

func (c *Cluster) WriteEvent(event *Event) error {
return c.retry(func(client *Client) error {
return client.WriteEvent(event)
Expand Down
Loading

0 comments on commit feecd03

Please sign in to comment.