
Commit 7397fd5

Elasticsearch output support for gzip compressed content-encoding (#1835)

* Add missing file
* Document elasticsearch output compression_level
Steffen Siering authored and ruflin committed Jun 15, 2016
1 parent ec5c4c4 commit 7397fd5
Showing 18 changed files with 328 additions and 152 deletions.
3 changes: 3 additions & 0 deletions filebeat/filebeat.full.yml
@@ -286,6 +286,9 @@ output.elasticsearch:
   # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200
   hosts: ["localhost:9200"]
 
+  # Set gzip compression level.
+  #compression_level: 0
+
   # Optional protocol and basic auth credentials.
   #protocol: "https"
   #username: "admin"
3 changes: 3 additions & 0 deletions libbeat/_beat/config.full.yml
@@ -83,6 +83,9 @@ output.elasticsearch:
   # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200
   hosts: ["localhost:9200"]
 
+  # Set gzip compression level.
+  #compression_level: 0
+
   # Optional protocol and basic auth credentials.
   #protocol: "https"
   #username: "admin"
9 changes: 8 additions & 1 deletion libbeat/docs/outputconfig.asciidoc
@@ -120,6 +120,13 @@ output.elasticsearch:
 In the previous example, the Elasticsearch nodes are available at `https://10.45.3.2:9220/elasticsearch` and
 `https://10.45.3.1:9230/elasticsearch`.
 
+===== compression_level
+
+The gzip compression level. Setting this value to 0 disables compression.
+The compression level must be in the range of 1 (best speed) to 9 (best compression).
+
+The default value is 0.
+
 ===== worker
 
 The number of workers per configured host publishing events to Elasticsearch. This
@@ -363,7 +370,7 @@ is used as the default port number.
 
 ===== compression_level
 
-The gzip compression level. Setting this value to values less than or equal to 0 disables compression.
+The gzip compression level. Setting this value to 0 disables compression.
 The compression level must be in the range of 1 (best speed) to 9 (best compression).
 
 The default value is 3.
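As a quick illustration of the option documented above, enabling compression in a Beat's config might look like the following. The level of 3 is an arbitrary example value, not the Elasticsearch output's default (which, per the new docs, is 0):

output.elasticsearch:
  hosts: ["localhost:9200"]

  # Trade CPU for bandwidth on bulk requests: 1 = best speed, 9 = best compression.
  compression_level: 3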
6 changes: 5 additions & 1 deletion libbeat/outputs/elasticsearch/api_test.go
@@ -169,5 +169,9 @@ func newTestClient(url string) *Client {
 }
 
 func newTestClientAuth(url, user, pass string) *Client {
-	return NewClient(url, "", nil, nil, user, pass, nil, 60*time.Second, nil)
+	client, err := NewClient(url, "", nil, nil, user, pass, nil, 60*time.Second, 3, nil)
+	if err != nil {
+		panic(err)
+	}
+	return client
 }
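A side note on this hunk: newTestClientAuth used to return NewClient's result directly, but NewClient now also returns an error, presumably so an out-of-range compression level (the new 3 argument) can be rejected at construction time. A minimal, self-contained sketch of that validation pattern, assuming the range check is delegated to the standard library (illustrative names, not the committed code):

package main

import (
	"compress/gzip"
	"fmt"
	"io/ioutil"
)

// validateCompressionLevel mirrors the kind of up-front check a constructor
// can perform: gzip.NewWriterLevel rejects out-of-range levels.
func validateCompressionLevel(level int) error {
	_, err := gzip.NewWriterLevel(ioutil.Discard, level)
	return err
}

func main() {
	fmt.Println(validateCompressionLevel(3))  // <nil>
	fmt.Println(validateCompressionLevel(42)) // gzip: invalid compression level: 42
}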
66 changes: 15 additions & 51 deletions libbeat/outputs/elasticsearch/bulkapi.go
@@ -2,7 +2,6 @@ package elasticsearch
 
 import (
 	"bytes"
-	"encoding/json"
 	"io"
 	"io/ioutil"
 	"net/http"
@@ -25,18 +24,10 @@ type bulkRequest struct {
 	requ *http.Request
 }
 
-type bulkBody interface {
-	Reader() io.Reader
-}
-
 type bulkResult struct {
 	raw []byte
 }
 
-type jsonBulkBody struct {
-	buf *bytes.Buffer
-}
-
 // Bulk performs many index/delete operations in a single API call.
 // Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
 func (conn *Connection) Bulk(
@@ -60,12 +51,13 @@ func (conn *Connection) BulkWith(
 		return nil, nil
 	}
 
-	bulkBody := newJSONBulkBody(nil)
-	if err := bulkEncode(bulkBody, metaBuilder, body); err != nil {
+	enc := conn.encoder
+	enc.Reset()
+	if err := bulkEncode(enc, metaBuilder, body); err != nil {
 		return nil, err
 	}
 
-	requ, err := newBulkRequest(conn.URL, index, docType, params, bulkBody)
+	requ, err := newBulkRequest(conn.URL, index, docType, params, enc)
 	if err != nil {
 		return nil, err
 	}
@@ -81,7 +73,7 @@ func newBulkRequest(
 	urlStr string,
 	index, docType string,
 	params map[string]string,
-	body bulkBody,
+	body bodyEncoder,
 ) (*bulkRequest, error) {
 	path, err := makePath(index, docType, "_bulk")
 	if err != nil {
@@ -100,12 +92,16 @@
 		return nil, err
 	}
 
+	if body != nil {
+		body.AddHeader(&requ.Header)
+	}
+
 	return &bulkRequest{
 		requ: requ,
 	}, nil
 }
 
-func (r *bulkRequest) Reset(body bulkBody) {
+func (r *bulkRequest) Reset(body bodyEncoder) {
 	bdy := body.Reader()
 
 	rc, ok := bdy.(io.ReadCloser)
@@ -124,6 +120,8 @@ func (r *bulkRequest) Reset(body bulkBody) {
 
 	r.requ.Header = http.Header{}
 	r.requ.Body = rc
+
+	body.AddHeader(&r.requ.Header)
 }
 
 func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, bulkResult, error) {
@@ -140,53 +138,19 @@ func readBulkResult(obj []byte) (bulkResult, error) {
 	return bulkResult{obj}, nil
 }
 
-func newJSONBulkBody(buf *bytes.Buffer) *jsonBulkBody {
-	if buf == nil {
-		buf = bytes.NewBuffer(nil)
-	}
-	return &jsonBulkBody{buf}
-}
-
-func (b *jsonBulkBody) Reset() {
-	b.buf.Reset()
-}
-
-func (b *jsonBulkBody) Reader() io.Reader {
-	return b.buf
-}
-
-func (b *jsonBulkBody) AddRaw(raw interface{}) error {
-	enc := json.NewEncoder(b.buf)
-	return enc.Encode(raw)
-}
-
-func (b *jsonBulkBody) Add(meta, obj interface{}) error {
-	enc := json.NewEncoder(b.buf)
-	pos := b.buf.Len()
-
-	if err := enc.Encode(meta); err != nil {
-		b.buf.Truncate(pos)
-		return err
-	}
-	if err := enc.Encode(obj); err != nil {
-		b.buf.Truncate(pos)
-		return err
-	}
-	return nil
-}
-func bulkEncode(out *jsonBulkBody, metaBuilder MetaBuilder, body []interface{}) error {
+func bulkEncode(out bulkWriter, metaBuilder MetaBuilder, body []interface{}) error {
 	if metaBuilder == nil {
 		for _, obj := range body {
 			if err := out.AddRaw(obj); err != nil {
-				debug("Failed to encode message: %s", err)
+				debugf("Failed to encode message: %s", err)
 				return err
 			}
 		}
 	} else {
 		for _, obj := range body {
 			meta := metaBuilder(obj)
 			if err := out.Add(meta, obj); err != nil {
-				debug("Failed to encode message: %s", err)
+				debugf("Failed to encode event (dropping event): %s", err)
 			}
 		}
 	}
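The bodyEncoder and bulkWriter implementations this file now relies on live in a part of the commit not rendered on this page, so here is a rough, self-contained sketch of what a gzip-backed body encoder has to do: stream NDJSON action/document pairs through compress/gzip and advertise the encoding on the request. The interface shape (Reset/Reader/AddHeader/AddRaw/Add) is inferred from the call sites above; everything else is illustrative, not the committed code.

package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
)

// gzipEncoder buffers the NDJSON bulk payload through a gzip writer.
type gzipEncoder struct {
	buf *bytes.Buffer
	gz  *gzip.Writer
	enc *json.Encoder
}

func newGzipEncoder(level int) (*gzipEncoder, error) {
	buf := bytes.NewBuffer(nil)
	gz, err := gzip.NewWriterLevel(buf, level) // rejects invalid levels
	if err != nil {
		return nil, err
	}
	return &gzipEncoder{buf: buf, gz: gz, enc: json.NewEncoder(gz)}, nil
}

// Reset drops any previously encoded payload so the encoder can be reused.
func (e *gzipEncoder) Reset() {
	e.buf.Reset()
	e.gz.Reset(e.buf)
}

// Reader flushes the compressed stream and exposes the request body.
func (e *gzipEncoder) Reader() io.Reader {
	e.gz.Close()
	return e.buf
}

// AddHeader marks the request body as gzip-compressed.
func (e *gzipEncoder) AddHeader(header *http.Header) {
	header.Add("Content-Type", "application/json; charset=UTF-8")
	header.Add("Content-Encoding", "gzip")
}

// AddRaw appends a single JSON document (one NDJSON line).
func (e *gzipEncoder) AddRaw(raw interface{}) error {
	return e.enc.Encode(raw)
}

// Add appends an action/metadata line followed by its document line,
// the pairing the bulk API expects.
func (e *gzipEncoder) Add(meta, obj interface{}) error {
	if err := e.enc.Encode(meta); err != nil {
		return err
	}
	return e.enc.Encode(obj)
}

func main() {
	enc, err := newGzipEncoder(3)
	if err != nil {
		panic(err)
	}
	meta := map[string]interface{}{"index": map[string]interface{}{"_index": "test"}}
	doc := map[string]interface{}{"field1": "value1"}
	if err := enc.Add(meta, doc); err != nil {
		panic(err)
	}
	body, _ := ioutil.ReadAll(enc.Reader())
	fmt.Printf("gzip-compressed bulk body: %d bytes\n", len(body))
}

Note how Reader closes the gzip stream so the trailing checksum is flushed before the HTTP request consumes the buffer; skipping that flush is the classic bug in this pattern, and Reset reopens the writer so one encoder can be reused across bulk requests.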
3 changes: 3 additions & 0 deletions libbeat/outputs/elasticsearch/bulkapi_integration_test.go
@@ -102,13 +102,15 @@ func TestBulkMoreOperations(t *testing.T) {
 		{
 			"field1": "value1",
 		},
+
 		{
 			"delete": map[string]interface{}{
 				"_index": index,
 				"_type":  "type1",
 				"_id":    "2",
 			},
 		},
+
 		{
 			"create": map[string]interface{}{
 				"_index": index,
@@ -119,6 +121,7 @@ func TestBulkMoreOperations(t *testing.T) {
 		{
 			"field1": "value3",
 		},
+
 		{
 			"update": map[string]interface{}{
 				"_id":    "1",
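For context on what these operations serialize to: the bulk endpoint (linked from bulkapi.go above) consumes newline-delimited JSON in which each action's metadata line is followed by its source line, except delete, which carries no source. A hypothetical payload in that shape ("test" stands in for the test's index name, which is elided on this page):

{"index":{"_index":"test","_type":"type1","_id":"1"}}
{"field1":"value1"}
{"delete":{"_index":"test","_type":"type1","_id":"2"}}
{"create":{"_index":"test","_type":"type1","_id":"3"}}
{"field1":"value3"}

With compression_level above 0, this entire body is gzip-compressed before being sent.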