From b319fcad1ec7024ba430b7d4de1c79b1819270fc Mon Sep 17 00:00:00 2001 From: urso Date: Sat, 11 Jun 2016 15:36:18 +0200 Subject: [PATCH 1/3] ES-output support for gzip compressed content-encoding --- libbeat/outputs/elasticsearch/api_test.go | 6 +- libbeat/outputs/elasticsearch/bulkapi.go | 66 +++-------- .../elasticsearch/bulkapi_integration_test.go | 3 + libbeat/outputs/elasticsearch/client.go | 110 +++++++++++------- .../elasticsearch/client_integration_test.go | 18 ++- libbeat/outputs/elasticsearch/config.go | 48 ++++---- libbeat/outputs/elasticsearch/output.go | 44 ++++--- libbeat/outputs/elasticsearch/output_test.go | 4 +- libbeat/outputs/elasticsearch/topology.go | 8 +- .../logstash/logstash_integration_test.go | 9 +- 10 files changed, 165 insertions(+), 151 deletions(-) diff --git a/libbeat/outputs/elasticsearch/api_test.go b/libbeat/outputs/elasticsearch/api_test.go index a0c74055cd06..32391379081d 100644 --- a/libbeat/outputs/elasticsearch/api_test.go +++ b/libbeat/outputs/elasticsearch/api_test.go @@ -169,5 +169,9 @@ func newTestClient(url string) *Client { } func newTestClientAuth(url, user, pass string) *Client { - return NewClient(url, "", nil, nil, user, pass, nil, 60*time.Second, nil) + client, err := NewClient(url, "", nil, nil, user, pass, nil, 60*time.Second, 3, nil) + if err != nil { + panic(err) + } + return client } diff --git a/libbeat/outputs/elasticsearch/bulkapi.go b/libbeat/outputs/elasticsearch/bulkapi.go index a38f43ed6d4d..429c2b61e27f 100644 --- a/libbeat/outputs/elasticsearch/bulkapi.go +++ b/libbeat/outputs/elasticsearch/bulkapi.go @@ -2,7 +2,6 @@ package elasticsearch import ( "bytes" - "encoding/json" "io" "io/ioutil" "net/http" @@ -25,18 +24,10 @@ type bulkRequest struct { requ *http.Request } -type bulkBody interface { - Reader() io.Reader -} - type bulkResult struct { raw []byte } -type jsonBulkBody struct { - buf *bytes.Buffer -} - // Bulk performs many index/delete operations in a single API call. 
// Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html func (conn *Connection) Bulk( @@ -60,12 +51,13 @@ func (conn *Connection) BulkWith( return nil, nil } - bulkBody := newJSONBulkBody(nil) - if err := bulkEncode(bulkBody, metaBuilder, body); err != nil { + enc := conn.encoder + enc.Reset() + if err := bulkEncode(enc, metaBuilder, body); err != nil { return nil, err } - requ, err := newBulkRequest(conn.URL, index, docType, params, bulkBody) + requ, err := newBulkRequest(conn.URL, index, docType, params, enc) if err != nil { return nil, err } @@ -81,7 +73,7 @@ func newBulkRequest( urlStr string, index, docType string, params map[string]string, - body bulkBody, + body bodyEncoder, ) (*bulkRequest, error) { path, err := makePath(index, docType, "_bulk") if err != nil { @@ -100,12 +92,16 @@ func newBulkRequest( return nil, err } + if body != nil { + body.AddHeader(&requ.Header) + } + return &bulkRequest{ requ: requ, }, nil } -func (r *bulkRequest) Reset(body bulkBody) { +func (r *bulkRequest) Reset(body bodyEncoder) { bdy := body.Reader() rc, ok := bdy.(io.ReadCloser) @@ -124,6 +120,8 @@ func (r *bulkRequest) Reset(body bulkBody) { r.requ.Header = http.Header{} r.requ.Body = rc + + body.AddHeader(&r.requ.Header) } func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, bulkResult, error) { @@ -140,45 +138,11 @@ func readBulkResult(obj []byte) (bulkResult, error) { return bulkResult{obj}, nil } -func newJSONBulkBody(buf *bytes.Buffer) *jsonBulkBody { - if buf == nil { - buf = bytes.NewBuffer(nil) - } - return &jsonBulkBody{buf} -} - -func (b *jsonBulkBody) Reset() { - b.buf.Reset() -} - -func (b *jsonBulkBody) Reader() io.Reader { - return b.buf -} - -func (b *jsonBulkBody) AddRaw(raw interface{}) error { - enc := json.NewEncoder(b.buf) - return enc.Encode(raw) -} - -func (b *jsonBulkBody) Add(meta, obj interface{}) error { - enc := json.NewEncoder(b.buf) - pos := b.buf.Len() - - if err := enc.Encode(meta); err != nil { - b.buf.Truncate(pos) - return err - } - if err := enc.Encode(obj); err != nil { - b.buf.Truncate(pos) - return err - } - return nil -} -func bulkEncode(out *jsonBulkBody, metaBuilder MetaBuilder, body []interface{}) error { +func bulkEncode(out bulkWriter, metaBuilder MetaBuilder, body []interface{}) error { if metaBuilder == nil { for _, obj := range body { if err := out.AddRaw(obj); err != nil { - debug("Failed to encode message: %s", err) + debugf("Failed to encode message: %s", err) return err } } @@ -186,7 +150,7 @@ func bulkEncode(out *jsonBulkBody, metaBuilder MetaBuilder, body []interface{}) for _, obj := range body { meta := metaBuilder(obj) if err := out.Add(meta, obj); err != nil { - debug("Failed to encode message: %s", err) + debugf("Failed to encode event (dropping event): %s", err) } } } diff --git a/libbeat/outputs/elasticsearch/bulkapi_integration_test.go b/libbeat/outputs/elasticsearch/bulkapi_integration_test.go index 5cb4e4193242..d55d4beaf7d2 100644 --- a/libbeat/outputs/elasticsearch/bulkapi_integration_test.go +++ b/libbeat/outputs/elasticsearch/bulkapi_integration_test.go @@ -102,6 +102,7 @@ func TestBulkMoreOperations(t *testing.T) { { "field1": "value1", }, + { "delete": map[string]interface{}{ "_index": index, @@ -109,6 +110,7 @@ func TestBulkMoreOperations(t *testing.T) { "_id": "2", }, }, + { "create": map[string]interface{}{ "_index": index, @@ -119,6 +121,7 @@ func TestBulkMoreOperations(t *testing.T) { { "field1": "value3", }, + { "update": map[string]interface{}{ "_id": "1", diff --git 
a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 5c3eae2e97bc..935ec7f48c7f 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -3,7 +3,6 @@ package elasticsearch import ( "bytes" "crypto/tls" - "encoding/json" "errors" "expvar" "fmt" @@ -13,7 +12,6 @@ import ( "net/url" "time" - "github.com/dustin/go-humanize" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs/mode" @@ -26,11 +24,14 @@ type Client struct { params map[string]string // buffered bulk requests - bulkBody *jsonBulkBody bulkRequ *bulkRequest // buffered json response reader json jsonReader + + // additional configs + compressionLevel int + proxyURL *url.URL } type connectCallback func(client *Client) error @@ -43,6 +44,8 @@ type Connection struct { http *http.Client connected bool onConnectCallback func() error + + encoder bodyEncoder } // Metrics that can be retrieved through the expvar web interface. @@ -75,8 +78,9 @@ func NewClient( username, password string, params map[string]string, timeout time.Duration, + compression int, onConnectCallback connectCallback, -) *Client { +) (*Client, error) { proxy := http.ProxyFromEnvironment if proxyURL != nil { proxy = http.ProxyURL(proxyURL) @@ -94,7 +98,17 @@ func NewClient( bulkRequ, err := newBulkRequest(esURL, "", "", params, nil) if err != nil { - logp.Critical("Elasticsearch output not correctly initialized: %v", err) + return nil, err + } + + var encoder bodyEncoder + if compression == 0 { + encoder = newJSONEncoder(nil) + } else { + encoder, err = newGzipEncoder(compression, nil) + if err != nil { + return nil, err + } } client := &Client{ @@ -110,12 +124,15 @@ func NewClient( }, Timeout: timeout, }, + encoder: encoder, }, index: index, params: params, - bulkBody: newJSONBulkBody(nil), bulkRequ: bulkRequ, + + compressionLevel: compression, + proxyURL: proxyURL, } client.Connection.onConnectCallback = func() error { @@ -124,23 +141,30 @@ func NewClient( } return nil } - return client + + return client, nil } func (client *Client) Clone() *Client { - newClient := &Client{ - Connection: Connection{ - URL: client.URL, - Username: client.Username, - Password: client.Password, - http: &http.Client{ - Transport: client.http.Transport, - }, - connected: false, - }, - index: client.index, - } - return newClient + // When cloning, the connection callback and params are not copied. A + // client's clone is, for example, created for topology-map support. With + // params most likely containing the ingest node pipeline, and the default + // callback trying to install a template, we don't want these to be + // included in the clone. + + transport := client.http.Transport.(*http.Transport) + c, _ := NewClient( + client.URL, + client.index, + client.proxyURL, + transport.TLSClientConfig, + client.Username, + client.Password, + nil, // XXX: do not pass params? + client.http.Timeout, + client.compressionLevel, + nil, // XXX: do not pass connection callback? + ) + return c }
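
For illustration, a minimal sketch of exercising the now-fallible constructor from a hypothetical in-package test (the URL, index name, and level are placeholder values; imports would be testing and time):

    func TestNewClientCompressionLevel(t *testing.T) {
        // A level in gzip's valid range succeeds; construction does not
        // touch the network.
        _, err := NewClient("http://localhost:9200", "testbeat", nil, nil,
            "", "", nil, 60*time.Second, 5, nil)
        if err != nil {
            t.Fatal(err)
        }

        // An out-of-range level such as 42 is rejected by
        // gzip.NewWriterLevel; the error now surfaces from NewClient
        // instead of a logged critical.
        _, err = NewClient("http://localhost:9200", "testbeat", nil, nil,
            "", "", nil, 60*time.Second, 42, nil)
        if err == nil {
            t.Fatal("expected error for invalid compression level")
        }
    }

// PublishEvents sends all events to elasticsearch.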
On error a slice with all @@ -160,12 +184,9 @@ func (client *Client) PublishEvents( return events, ErrNotConnected } - body := client.bulkBody + body := client.encoder body.Reset() - requ := client.bulkRequ - requ.Reset(body) - // encode events into bulk request buffer, dropping failed elements from // events slice events = bulkEncodePublishRequest(body, client.index, events) @@ -173,15 +194,16 @@ return nil, nil } + requ := client.bulkRequ + requ.Reset(body) status, result, sendErr := client.sendBulkRequest(requ) if sendErr != nil { logp.Err("Failed to perform any bulk index operations: %s", sendErr) return events, sendErr } - logp.Debug("elasticsearch", "PublishEvents: %d metrics have been packed into a buffer of %s and published to elasticsearch in %v.", + debugf("PublishEvents: %d events have been published to elasticsearch in %v.", len(events), - humanize.Bytes(uint64(body.buf.Len())), time.Now().Sub(begin)) // check response for transient errors @@ -208,7 +230,7 @@ // bulkEncodePublishRequest encodes all bulk requests and returns slice of events // successfully added to bulk request. func bulkEncodePublishRequest( - body *jsonBulkBody, + body bulkWriter, index string, events []common.MapStr, ) []common.MapStr { @@ -413,7 +435,7 @@ func (client *Client) PublishEvent(event common.MapStr) error { } index := getIndex(event, client.index) - logp.Debug("output_elasticsearch", "Publish event: %s", event) + debugf("Publish event: %s", event) // insert the events one by one status, _, err := client.Index(
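
The next hunk switches LoadTemplate from a raw *bytes.Reader to a decoded map, so the template body flows through the connection's shared encoder like any other request (and is gzipped when compression is enabled). A minimal sketch of the new call shape, assuming a connected client and a placeholder template:

    template := map[string]interface{}{
        "template": "testbeat-*",
        "settings": map[string]interface{}{"number_of_shards": 1},
    }
    if err := client.LoadTemplate("testbeat", template); err != nil {
        // template rejected by ES, or the request failed
    }

@@ -441,9 +463,10 @@ func (client *Client) PublishEvent(event common.MapStr) error { // LoadTemplate loads a template into Elasticsearch overwriting the existing // template if it exists. If you wish to not overwrite an existing template // then use CheckTemplate prior to calling this method. -func (client *Client) LoadTemplate(templateName string, reader *bytes.Reader) error { +func (client *Client) LoadTemplate(templateName string, template map[string]interface{}) error { - status, _, err := client.execRequest("PUT", client.URL+"/_template/"+templateName, reader) + path := "/_template/" + templateName + status, _, err := client.request("PUT", path, nil, template) if err != nil { return fmt.Errorf("Template could not be loaded.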
Error: %s", err) @@ -488,16 +511,16 @@ func (conn *Connection) Connect(timeout time.Duration) error { } func (conn *Connection) Ping(timeout time.Duration) (bool, error) { - debug("ES Ping(url=%v, timeout=%v)", conn.URL, timeout) + debugf("ES Ping(url=%v, timeout=%v)", conn.URL, timeout) conn.http.Timeout = timeout status, _, err := conn.execRequest("HEAD", conn.URL, nil) if err != nil { - debug("Ping request failed with: %v", err) + debugf("Ping request failed with: %v", err) return false, err } - debug("Ping status code: %v", status) + debugf("Ping status code: %v", status) return status < 300, nil } @@ -516,19 +539,17 @@ func (conn *Connection) request( body interface{}, ) (int, []byte, error) { url := makeURL(conn.URL, path, params) - logp.Debug("elasticsearch", "%s %s %v", method, url, body) + debugf("%s %s %v", method, url, body) - var obj []byte - if body != nil { - var err error - obj, err = json.Marshal(body) - if err != nil { - logp.Warn("Failed to json encode body (%v): %#v", err, body) - return 0, nil, ErrJSONEncodeFailed - } + if body == nil { + return conn.execRequest(method, url, nil) } - return conn.execRequest(method, url, bytes.NewReader(obj)) + if err := conn.encoder.Marshal(body); err != nil { + logp.Warn("Failed to json encode body (%v): %#v", err, body) + return 0, nil, ErrJSONEncodeFailed + } + return conn.execRequest(method, url, conn.encoder.Reader()) } func (conn *Connection) execRequest( @@ -540,6 +561,9 @@ func (conn *Connection) execRequest( logp.Warn("Failed to create request", err) return 0, nil, err } + if body != nil { + conn.encoder.AddHeader(&req.Header) + } return conn.execHTTPRequest(req) } diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index ee6c35adb8d8..2b90f94c52e8 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -6,8 +6,6 @@ import ( "testing" "time" - "bytes" - "io/ioutil" "path/filepath" "github.com/elastic/beats/libbeat/common" @@ -42,8 +40,7 @@ func TestLoadTemplate(t *testing.T) { assert.Nil(t, err) templatePath := absPath + "/template.json" - content, err := ioutil.ReadFile(templatePath) - reader := bytes.NewReader(content) + content, err := readTemplate(templatePath) assert.Nil(t, err) // Setup ES @@ -54,7 +51,7 @@ func TestLoadTemplate(t *testing.T) { templateName := "testbeat" // Load template - err = client.LoadTemplate(templateName, reader) + err = client.LoadTemplate(templateName, content) assert.Nil(t, err) // Make sure template was loaded @@ -71,7 +68,9 @@ func TestLoadTemplate(t *testing.T) { func TestLoadInvalidTemplate(t *testing.T) { // Invalid Template - reader := bytes.NewReader([]byte("{json:invalid}")) + template := map[string]interface{}{ + "json": "invalid", + } // Setup ES client := GetTestingElasticsearch() @@ -81,7 +80,7 @@ func TestLoadInvalidTemplate(t *testing.T) { templateName := "invalidtemplate" // Try to load invalid template - err = client.LoadTemplate(templateName, reader) + err = client.LoadTemplate(templateName, template) assert.Error(t, err) // Make sure template was not loaded @@ -106,8 +105,7 @@ func TestLoadBeatsTemplate(t *testing.T) { assert.Nil(t, err) templatePath := absPath + "/" + beat + ".template.json" - content, err := ioutil.ReadFile(templatePath) - reader := bytes.NewReader(content) + content, err := readTemplate(templatePath) assert.Nil(t, err) // Setup ES @@ -118,7 +116,7 @@ func TestLoadBeatsTemplate(t *testing.T) { 
templateName := beat // Load template - err = client.LoadTemplate(templateName, reader) + err = client.LoadTemplate(templateName, content) assert.Nil(t, err) // Make sure template was loaded diff --git a/libbeat/outputs/elasticsearch/config.go b/libbeat/outputs/elasticsearch/config.go index b9e64ed76eae..e512e346c3aa 100644 --- a/libbeat/outputs/elasticsearch/config.go +++ b/libbeat/outputs/elasticsearch/config.go @@ -7,19 +7,20 @@ import ( ) type elasticsearchConfig struct { - Protocol string `config:"protocol"` - Path string `config:"path"` - Params map[string]string `config:"parameters"` - Username string `config:"username"` - Password string `config:"password"` - ProxyURL string `config:"proxy_url"` - Index string `config:"index"` - LoadBalance bool `config:"loadbalance"` - TLS *outputs.TLSConfig `config:"tls"` - MaxRetries int `config:"max_retries"` - Timeout time.Duration `config:"timeout"` - SaveTopology bool `config:"save_topology"` - Template Template `config:"template"` + Protocol string `config:"protocol"` + Path string `config:"path"` + Params map[string]string `config:"parameters"` + Username string `config:"username"` + Password string `config:"password"` + ProxyURL string `config:"proxy_url"` + Index string `config:"index"` + LoadBalance bool `config:"loadbalance"` + CompressionLevel int `config:"compression_level" validate:"min=0, max=9"` + TLS *outputs.TLSConfig `config:"tls"` + MaxRetries int `config:"max_retries"` + Timeout time.Duration `config:"timeout"` + SaveTopology bool `config:"save_topology"` + Template Template `config:"template"` } type Template struct { @@ -34,16 +35,17 @@ const ( var ( defaultConfig = elasticsearchConfig{ - Protocol: "", - Path: "", - ProxyURL: "", - Params: nil, - Username: "", - Password: "", - Timeout: 90 * time.Second, - MaxRetries: 3, - TLS: nil, - LoadBalance: true, + Protocol: "", + Path: "", + ProxyURL: "", + Params: nil, + Username: "", + Password: "", + Timeout: 90 * time.Second, + MaxRetries: 3, + CompressionLevel: 0, + TLS: nil, + LoadBalance: true, } ) diff --git a/libbeat/outputs/elasticsearch/output.go b/libbeat/outputs/elasticsearch/output.go index 6d6817dc8132..8ad73c31ad8a 100644 --- a/libbeat/outputs/elasticsearch/output.go +++ b/libbeat/outputs/elasticsearch/output.go @@ -1,12 +1,12 @@ package elasticsearch import ( - "bytes" "crypto/tls" + "encoding/json" "errors" "fmt" - "io/ioutil" "net/url" + "os" "strings" "sync" "time" @@ -25,8 +25,8 @@ type elasticsearchOutput struct { mode mode.ConnectionMode topology - templateContents []byte - templateMutex sync.Mutex + template map[string]interface{} + templateMutex sync.Mutex } func init() { @@ -34,7 +34,7 @@ func init() { } var ( - debug = logp.MakeDebug("elasticsearch") + debugf = logp.MakeDebug("elasticsearch") ) var ( @@ -140,15 +140,33 @@ func (out *elasticsearchOutput) readTemplate(config Template) error { logp.Info("Loading template enabled. 
Reading template file: %v", templatePath) - var err error - out.templateContents, err = ioutil.ReadFile(templatePath) + template, err := readTemplate(templatePath) if err != nil { return fmt.Errorf("Error loading template %s: %v", templatePath, err) } + + out.template = template } return nil } +func readTemplate(filename string) (map[string]interface{}, error) { + f, err := os.Open(filename) + if err != nil { + return nil, err + } + defer f.Close() + + var template map[string]interface{} + dec := json.NewDecoder(f) + err = dec.Decode(&template) + if err != nil { + return nil, err + } + + return template, nil +} + // loadTemplate checks if the index mapping template should be loaded // In case the template is not already loaded or overwriting is enabled, the // template is written to the index @@ -166,8 +184,7 @@ func (out *elasticsearchOutput) loadTemplate(config Template, client *Client) er logp.Info("Existing template will be overwritten, as overwrite is enabled.") } - reader := bytes.NewReader(out.templateContents) - err := client.LoadTemplate(config.Name, reader) + err := client.LoadTemplate(config.Name, out.template) if err != nil { return fmt.Errorf("Could not load template: %v", err) } @@ -207,17 +224,18 @@ func makeClientFactory( // define a callback to be called on connection var onConnected connectCallback - if len(out.templateContents) > 0 { + if out.template != nil { onConnected = func(client *Client) error { return out.loadTemplate(config.Template, client) } } - client := NewClient( + return NewClient( esURL, config.Index, proxyURL, tls, config.Username, config.Password, - params, config.Timeout, onConnected) - return client, nil + params, config.Timeout, + config.CompressionLevel, + onConnected) } } diff --git a/libbeat/outputs/elasticsearch/output_test.go b/libbeat/outputs/elasticsearch/output_test.go index a13a0f301603..a77d7075c794 100644 --- a/libbeat/outputs/elasticsearch/output_test.go +++ b/libbeat/outputs/elasticsearch/output_test.go @@ -108,7 +108,7 @@ func TestOneEvent(t *testing.T) { r["response"] = "value1" index := fmt.Sprintf("%s-%d.%02d.%02d", output.index, ts.Year(), ts.Month(), ts.Day()) - logp.Debug("output_elasticsearch", "index = %s", index) + debugf("index = %s", index) client := output.randomClient() client.CreateIndex(index, common.MapStr{ @@ -148,7 +148,7 @@ func TestOneEvent(t *testing.T) { t.Errorf("Failed to query elasticsearch for index(%s): %s", index, err) return } - logp.Debug("output_elasticsearch", "resp = %s", resp) + debugf("resp = %s", resp) if resp.Hits.Total != 1 { t.Errorf("Wrong number of results: %d", resp.Hits.Total) }
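
The new readTemplate helper has no dedicated test in this series; a quick sketch of one, with a placeholder fixture path:

    func TestReadTemplate(t *testing.T) {
        // readTemplate fails on missing files and on invalid JSON;
        // "template.json" is a placeholder path.
        template, err := readTemplate("template.json")
        if err != nil {
            t.Fatal(err)
        }
        if _, ok := template["mappings"]; !ok {
            t.Error("expected a mappings section in the decoded template")
        }
    }

diff --git a/libbeat/outputs/elasticsearch/topology.go b/libbeat/outputs/elasticsearch/topology.go index 7befb431f8c3..eb9cd2575396 100644 --- a/libbeat/outputs/elasticsearch/topology.go +++ b/libbeat/outputs/elasticsearch/topology.go @@ -79,8 +79,7 @@ func (t *topology) GetNameByIP(ip string) string { // Each shipper publishes a list of IPs together with its name to Elasticsearch func (t *topology) PublishIPs(name string, localAddrs []string) error { if !t.ttlEnabled { - logp.Debug("output_elasticsearch", - "Not publishing IPs because TTL was not yet confirmed to be enabled") + debugf("Not publishing IPs because TTL was not yet confirmed to be enabled") return nil } @@ -89,8 +88,7 @@ func (t *topology) PublishIPs(name string, localAddrs []string) error { return ErrNotConnected } - logp.Debug("output_elasticsearch", - "Publish IPs %s with expiration time %d", localAddrs, t.TopologyExpire) + debugf("Publish IPs %s with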
expiration time %d", localAddrs, t.TopologyExpire) params := map[string]string{ "ttl": fmt.Sprintf("%dms", t.TopologyExpire), @@ -161,6 +159,6 @@ func loadTopolgyMap(client *Client) (map[string]string, error) { } } - logp.Debug("output_elasticsearch", "Topology map %s", topology) + debugf("Topology map %s", topology) return topology, nil } diff --git a/libbeat/outputs/logstash/logstash_integration_test.go b/libbeat/outputs/logstash/logstash_integration_test.go index f960e5807a98..3ee2309be922 100644 --- a/libbeat/outputs/logstash/logstash_integration_test.go +++ b/libbeat/outputs/logstash/logstash_integration_test.go @@ -73,13 +73,16 @@ func esConnect(t *testing.T, index string) *esConnection { username := os.Getenv("ES_USER") password := os.Getenv("ES_PASS") - client := elasticsearch.NewClient(host, "", nil, nil, username, password, - nil, 60*time.Second, nil) + client, err := elasticsearch.NewClient(host, "", nil, nil, username, password, + nil, 60*time.Second, 0, nil) + if err != nil { + t.Fatal(err) + } // try to drop old index if left over from failed test _, _, _ = client.Delete(index, "", "", nil) // ignore error - _, _, err := client.CreateIndex(index, common.MapStr{ + _, _, err = client.CreateIndex(index, common.MapStr{ "settings": common.MapStr{ "number_of_shards": 1, "number_of_replicas": 0, From 039ecd7037f9f972dbcaaeba5f5cdb026701b5fa Mon Sep 17 00:00:00 2001 From: urso Date: Mon, 13 Jun 2016 10:01:10 +0200 Subject: [PATCH 2/3] Add missing file --- libbeat/outputs/elasticsearch/enc.go | 137 +++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100644 libbeat/outputs/elasticsearch/enc.go diff --git a/libbeat/outputs/elasticsearch/enc.go b/libbeat/outputs/elasticsearch/enc.go new file mode 100644 index 000000000000..d150c3d21e80 --- /dev/null +++ b/libbeat/outputs/elasticsearch/enc.go @@ -0,0 +1,137 @@ +package elasticsearch + +import ( + "bytes" + "compress/gzip" + "encoding/json" + "io" + "net/http" +) + +type bodyEncoder interface { + bulkBodyEncoder + Reader() io.Reader + Marshal(doc interface{}) error +} + +type bulkBodyEncoder interface { + bulkWriter + + AddHeader(*http.Header) + Reset() +} + +type bulkWriter interface { + Add(meta, obj interface{}) error + AddRaw(raw interface{}) error +} + +type jsonEncoder struct { + buf *bytes.Buffer +} + +type gzipEncoder struct { + buf *bytes.Buffer + gzip *gzip.Writer +} + +func newJSONEncoder(buf *bytes.Buffer) *jsonEncoder { + if buf == nil { + buf = bytes.NewBuffer(nil) + } + return &jsonEncoder{buf} +} + +func (b *jsonEncoder) Reset() { + b.buf.Reset() +} + +func (b *jsonEncoder) AddHeader(header *http.Header) { + header.Add("Content-Type", "application/json; charset=UTF-8") +} + +func (b *jsonEncoder) Reader() io.Reader { + return b.buf +} + +func (b *jsonEncoder) Marshal(obj interface{}) error { + b.Reset() + enc := json.NewEncoder(b.buf) + return enc.Encode(obj) +} + +func (b *jsonEncoder) AddRaw(raw interface{}) error { + enc := json.NewEncoder(b.buf) + return enc.Encode(raw) +} + +func (b *jsonEncoder) Add(meta, obj interface{}) error { + enc := json.NewEncoder(b.buf) + pos := b.buf.Len() + + if err := enc.Encode(meta); err != nil { + b.buf.Truncate(pos) + return err + } + if err := enc.Encode(obj); err != nil { + b.buf.Truncate(pos) + return err + } + return nil +} + +func newGzipEncoder(level int, buf *bytes.Buffer) (*gzipEncoder, error) { + if buf == nil { + buf = bytes.NewBuffer(nil) + } + w, err := gzip.NewWriterLevel(buf, level) + if err != nil { + return nil, err + } + + return &gzipEncoder{buf, 
w}, nil +} + +func (b *gzipEncoder) Reset() { + b.buf.Reset() + b.gzip.Reset(b.buf) +} + +func (b *gzipEncoder) Reader() io.Reader { + b.gzip.Close() + return b.buf +} + +func (b *gzipEncoder) AddHeader(header *http.Header) { + header.Add("Content-Type", "application/json; charset=UTF-8") + header.Add("Content-Encoding", "gzip") +} + +func (b *gzipEncoder) Marshal(obj interface{}) error { + b.Reset() + enc := json.NewEncoder(b.gzip) + err := enc.Encode(obj) + return err +} + +func (b *gzipEncoder) AddRaw(raw interface{}) error { + enc := json.NewEncoder(b.gzip) + return enc.Encode(raw) +} + +func (b *gzipEncoder) Add(meta, obj interface{}) error { + enc := json.NewEncoder(b.gzip) + pos := b.buf.Len() + + if err := enc.Encode(meta); err != nil { + b.buf.Truncate(pos) + return err + } + if err := enc.Encode(obj); err != nil { + b.buf.Truncate(pos) + return err + } + + b.gzip.Flush() + return nil +} From 51c94103d0344bc7391f674f31ea20360e9e0de4 Mon Sep 17 00:00:00 2001 From: urso Date: Tue, 14 Jun 2016 15:03:19 +0200 Subject: [PATCH 3/3] Document elasticsearch output compression_level --- filebeat/filebeat.full.yml | 3 +++ libbeat/_beat/config.full.yml | 3 +++ libbeat/docs/outputconfig.asciidoc | 9 ++++++++- metricbeat/metricbeat.full.yml | 3 +++ packetbeat/packetbeat.full.yml | 3 +++ topbeat/topbeat.full.yml | 3 +++ winlogbeat/winlogbeat.full.yml | 3 +++ 7 files changed, 26 insertions(+), 1 deletion(-) diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml index 472951692fdd..be3787ca3154 100644 --- a/filebeat/filebeat.full.yml +++ b/filebeat/filebeat.full.yml @@ -286,6 +286,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] + # Set gzip compression level. + #compression_level: 0 + # Optional protocol and basic auth credentials. #protocol: "https" #username: "admin" diff --git a/libbeat/_beat/config.full.yml b/libbeat/_beat/config.full.yml index 2ecff3f4ffc6..c134cbb77a77 100644 --- a/libbeat/_beat/config.full.yml +++ b/libbeat/_beat/config.full.yml @@ -83,6 +83,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] + # Set gzip compression level. + #compression_level: 0 + # Optional protocol and basic auth credentials. #protocol: "https" #username: "admin" diff --git a/libbeat/docs/outputconfig.asciidoc b/libbeat/docs/outputconfig.asciidoc index 970d32c1b381..ab4362312c62 100644 --- a/libbeat/docs/outputconfig.asciidoc +++ b/libbeat/docs/outputconfig.asciidoc @@ -120,6 +120,13 @@ output.elasticsearch: In the previous example, the Elasticsearch nodes are available at `https://10.45.3.2:9220/elasticsearch` and `https://10.45.3.1:9230/elasticsearch`. +===== compression_level + +The gzip compression level. Setting this value to 0 disables compression. +The compression level must be in the range of 1 (best speed) to 9 (best compression). + +The default value is 0. + ===== worker The number of workers per configured host publishing events to Elasticsearch. This @@ -363,7 +370,7 @@ is used as the default port number. ===== compression_level -The gzip compression level. Setting this value to values less than or equal to 0 disables compression. +The gzip compression level. Setting this value to 0 disables compression. The compression level must be in the range of 1 (best speed) to 9 (best compression). The default value is 3. 
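
As a sanity check of the encoder added in the second patch, a round-trip sketch that could sit next to enc.go as an in-package test (hypothetical; imports would be compress/gzip, encoding/json, and testing):

    func TestGzipEncoderRoundTrip(t *testing.T) {
        enc, err := newGzipEncoder(5, nil)
        if err != nil {
            t.Fatal(err)
        }
        if err := enc.Marshal(map[string]interface{}{"field": "value"}); err != nil {
            t.Fatal(err)
        }

        // Reader() closes the gzip writer, flushing the footer, so the
        // buffer now holds a complete gzip stream.
        r, err := gzip.NewReader(enc.Reader())
        if err != nil {
            t.Fatal(err)
        }
        var doc map[string]interface{}
        if err := json.NewDecoder(r).Decode(&doc); err != nil {
            t.Fatal(err)
        }
        if doc["field"] != "value" {
            t.Errorf("unexpected document: %v", doc)
        }
    }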
diff --git a/metricbeat/metricbeat.full.yml b/metricbeat/metricbeat.full.yml index 3031b23312c6..f6bce4745e9a 100644 --- a/metricbeat/metricbeat.full.yml +++ b/metricbeat/metricbeat.full.yml @@ -217,6 +217,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] + # Set gzip compression level. + #compression_level: 0 + # Optional protocol and basic auth credentials. #protocol: "https" #username: "admin" diff --git a/packetbeat/packetbeat.full.yml b/packetbeat/packetbeat.full.yml index 764dfff49746..0ea862180071 100644 --- a/packetbeat/packetbeat.full.yml +++ b/packetbeat/packetbeat.full.yml @@ -467,6 +467,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] + # Set gzip compression level. + #compression_level: 0 + # Optional protocol and basic auth credentials. #protocol: "https" #username: "admin" diff --git a/topbeat/topbeat.full.yml b/topbeat/topbeat.full.yml index 2f9463b387dc..70fe42575b57 100644 --- a/topbeat/topbeat.full.yml +++ b/topbeat/topbeat.full.yml @@ -114,6 +114,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] + # Set gzip compression level. + #compression_level: 0 + # Optional protocol and basic auth credentials. #protocol: "https" #username: "admin" diff --git a/winlogbeat/winlogbeat.full.yml b/winlogbeat/winlogbeat.full.yml index c7c6c6c0154d..202567f67970 100644 --- a/winlogbeat/winlogbeat.full.yml +++ b/winlogbeat/winlogbeat.full.yml @@ -118,6 +118,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] + # Set gzip compression level. + #compression_level: 0 + # Optional protocol and basic auth credentials. #protocol: "https" #username: "admin"
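
Finally, the headers each encoder attaches are what distinguish compressed from plain requests on the wire; a small sketch of verifying them (again a hypothetical in-package test, importing net/http and testing):

    func TestEncoderHeaders(t *testing.T) {
        jsonHdr := http.Header{}
        newJSONEncoder(nil).AddHeader(&jsonHdr)
        if ct := jsonHdr.Get("Content-Type"); ct != "application/json; charset=UTF-8" {
            t.Errorf("unexpected Content-Type: %s", ct)
        }

        enc, err := newGzipEncoder(1, nil)
        if err != nil {
            t.Fatal(err)
        }
        gzipHdr := http.Header{}
        enc.AddHeader(&gzipHdr)
        if ce := gzipHdr.Get("Content-Encoding"); ce != "gzip" {
            t.Errorf("Content-Encoding not set: %s", ce)
        }
    }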