Skip to content

Commit

Permalink
Load the ES template on connect. (#1381)
Browse files Browse the repository at this point in the history
It used to try loading it only once on init, causing bug #1321.
This change moves the call to loadTemplate at connection time, immediately
after successful connection. This has the effect that if overwrite is true,
the template will be loaded on each new established connection.

The template is read on init time and sent to Elasticsearch at connect time.
This means that if the template path is wrong, it will be discovered at
startup (including `-configtest`).

In case there is an error loading the template, the Connect call fails.

This commit includes an integration test for the behaviour.
  • Loading branch information
tsg authored and ruflin committed Apr 19, 2016
1 parent bf03dac commit d601de2
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha1...master[Check the HEAD d
*Affecting all Beats*
- Drain response buffers when pipelining is used by redis output. {pull}1353[1353]
- Unterminated environment variable expressions in config files will now cause an error {pull}1389[1389]
- Fix issue with the automatic template loading when Elasticsearch is not available on Beat start. {issue}1321[1321]

*Packetbeat*

Expand Down
1 change: 1 addition & 0 deletions filebeat/invalid.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sdasda
6 changes: 3 additions & 3 deletions libbeat/outputs/elasticsearch/api_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestOneHostSuccessResp(t *testing.T) {

server := ElasticsearchMock(200, expectedResp)

client := NewClient(server.URL, "", nil, nil, "", "", nil)
client := NewClient(server.URL, "", nil, nil, "", "", nil, nil)

params := map[string]string{
"refresh": "true",
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestOneHost500Resp(t *testing.T) {

server := ElasticsearchMock(http.StatusInternalServerError, []byte("Something wrong happened"))

client := NewClient(server.URL, "", nil, nil, "", "", nil)
client := NewClient(server.URL, "", nil, nil, "", "", nil, nil)
err := client.Connect(1 * time.Second)
if err != nil {
t.Fatalf("Failed to connect: %v", err)
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestOneHost503Resp(t *testing.T) {

server := ElasticsearchMock(503, []byte("Something wrong happened"))

client := NewClient(server.URL, "", nil, nil, "", "", nil)
client := NewClient(server.URL, "", nil, nil, "", "", nil, nil)

params := map[string]string{
"refresh": "true",
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func GetTestingElasticsearch() *Client {
var address = "http://" + GetEsHost() + ":" + GetEsPort()
username := os.Getenv("ES_USER")
pass := os.Getenv("ES_PASS")
return NewClient(address, "", nil, nil, username, pass, nil)
return NewClient(address, "", nil, nil, username, pass, nil, nil)
}

func GetValidQueryResult() QueryResult {
Expand Down
6 changes: 3 additions & 3 deletions libbeat/outputs/elasticsearch/bulkapi_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) {

server := ElasticsearchMock(200, expectedResp)

client := NewClient(server.URL, "", nil, nil, "", "", nil)
client := NewClient(server.URL, "", nil, nil, "", "", nil, nil)

params := map[string]string{
"refresh": "true",
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestOneHost500Resp_Bulk(t *testing.T) {

server := ElasticsearchMock(http.StatusInternalServerError, []byte("Something wrong happened"))

client := NewClient(server.URL, "", nil, nil, "", "", nil)
client := NewClient(server.URL, "", nil, nil, "", "", nil, nil)

params := map[string]string{
"refresh": "true",
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestOneHost503Resp_Bulk(t *testing.T) {

server := ElasticsearchMock(503, []byte("Something wrong happened"))

client := NewClient(server.URL, "", nil, nil, "", "", nil)
client := NewClient(server.URL, "", nil, nil, "", "", nil, nil)

params := map[string]string{
"refresh": "true",
Expand Down
20 changes: 18 additions & 2 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,16 @@ type Client struct {
json jsonReader
}

type connectCallback func(client *Client) error

type Connection struct {
URL string
Username string
Password string

http *http.Client
connected bool
http *http.Client
connected bool
onConnectCallback func() error
}

var (
Expand All @@ -61,6 +64,7 @@ func NewClient(
esURL, index string, proxyURL *url.URL, tls *tls.Config,
username, password string,
params map[string]string,
onConnectCallback connectCallback,
) *Client {
proxy := http.ProxyFromEnvironment
if proxyURL != nil {
Expand All @@ -82,6 +86,13 @@ func NewClient(
index: index,
params: params,
}

client.Connection.onConnectCallback = func() error {
if onConnectCallback != nil {
return onConnectCallback(client)
}
return nil
}
return client
}

Expand Down Expand Up @@ -424,6 +435,11 @@ func (conn *Connection) Connect(timeout time.Duration) error {
if !conn.connected {
return ErrNotConnected
}

err = conn.onConnectCallback()
if err != nil {
return fmt.Errorf("Connection marked as failed because the onConnect callback failed: %v", err)
}
return nil
}

Expand Down
59 changes: 58 additions & 1 deletion libbeat/outputs/elasticsearch/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"io/ioutil"
"path/filepath"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/outputs"
"github.com/stretchr/testify/assert"
)

Expand All @@ -29,7 +31,7 @@ func TestCheckTemplate(t *testing.T) {
assert.Nil(t, err)

// Check for non existant template
assert.False(t, client.CheckTemplate("libbeat"))
assert.False(t, client.CheckTemplate("libbeat-notexists"))
}

func TestLoadTemplate(t *testing.T) {
Expand Down Expand Up @@ -129,3 +131,58 @@ func TestLoadBeatsTemplate(t *testing.T) {
assert.False(t, client.CheckTemplate(templateName))
}
}

// TestOutputLoadTemplate checks that the template is inserted before
// the first event is published.
func TestOutputLoadTemplate(t *testing.T) {

client := GetTestingElasticsearch()
err := client.Connect(5 * time.Second)
if err != nil {
t.Fatal(err)
}

// delete template if it exists
client.request("DELETE", "/_template/libbeat", nil, nil)

// Make sure template is not yet there
assert.False(t, client.CheckTemplate("libbeat"))

tPath, err := filepath.Abs("../../../topbeat/topbeat.template.json")
if err != nil {
t.Fatal(err)
}
config := map[string]interface{}{
"hosts": GetEsHost(),
"template": map[string]interface{}{
"name": "libbeat",
"path": tPath,
},
}

cfg, err := common.NewConfigFrom(config)
if err != nil {
t.Fatal(err)
}

output, err := New(cfg, 0)
if err != nil {
t.Fatal(err)
}
event := common.MapStr{
"@timestamp": common.Time(time.Now()),
"host": "test-host",
"type": "libbeat",
"message": "Test message from libbeat",
}

err = output.PublishEvent(nil, outputs.Options{Guaranteed: true}, event)
if err != nil {
t.Fatal(err)
}

// Guaranteed publish, so the template should be there

assert.True(t, client.CheckTemplate("libbeat"))

}
89 changes: 55 additions & 34 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"bytes"
"crypto/tls"
"errors"
"fmt"
"io/ioutil"
"net/url"
"strings"
"sync"
"time"

"github.com/elastic/beats/libbeat/common"
Expand All @@ -20,6 +22,9 @@ type elasticsearchOutput struct {
index string
mode mode.ConnectionMode
topology

templateContents []byte
templateMutex sync.Mutex
}

func init() {
Expand Down Expand Up @@ -69,7 +74,12 @@ func (out *elasticsearchOutput) init(
return err
}

clients, err := mode.MakeClients(cfg, makeClientFactory(tlsConfig, &config))
err = out.readTemplate(config.Template)
if err != nil {
return err
}

clients, err := mode.MakeClients(cfg, makeClientFactory(tlsConfig, &config, out))
if err != nil {
return err
}
Expand All @@ -91,8 +101,6 @@ func (out *elasticsearchOutput) init(
return err
}

loadTemplate(config.Template, clients)

if config.SaveTopology {
err := out.EnableTTL()
if err != nil {
Expand Down Expand Up @@ -122,52 +130,56 @@ func (out *elasticsearchOutput) init(
return nil
}

// loadTemplate checks if the index mapping template should be loaded
// In case template loading is enabled, template is written to index
func loadTemplate(config Template, clients []mode.ProtocolClient) {
// Check if template should be loaded
// Not being able to load the template will output an error but will not stop execution
if config.Name != "" && len(clients) > 0 {

// Always takes the first client
esClient := clients[0].(*Client)

// readTemplates reads the ES mapping template from the disk, if configured.
func (out *elasticsearchOutput) readTemplate(config Template) error {
if len(config.Name) > 0 {
// Look for the template in the configuration path, if it's not absolute
templatePath := paths.Resolve(paths.Config, config.Path)

logp.Info("Loading template enabled. Trying to load template: %v", templatePath)
logp.Info("Loading template enabled. Reading template file: %v", templatePath)

exists := esClient.CheckTemplate(config.Name)
var err error
out.templateContents, err = ioutil.ReadFile(templatePath)
if err != nil {
return fmt.Errorf("Error loading template %s: %v", templatePath, err)
}
}
return nil
}

// Check if template already exist or should be overwritten
if !exists || config.Overwrite {
// loadTemplate checks if the index mapping template should be loaded
// In case the template is not already loaded or overwritting is enabled, the
// template is written to index
func (out *elasticsearchOutput) loadTemplate(config Template, client *Client) error {
out.templateMutex.Lock()
defer out.templateMutex.Unlock()

if config.Overwrite {
logp.Info("Existing template will be overwritten, as overwrite is enabled.")
}
logp.Info("Trying to load template for client: %s", client)

// Load template from file
content, err := ioutil.ReadFile(templatePath)
if err != nil {
logp.Err("Could not load template from file path: %s; Error: %s", templatePath, err)
} else {
reader := bytes.NewReader(content)
err = esClient.LoadTemplate(config.Name, reader)
// Check if template already exist or should be overwritten
exists := client.CheckTemplate(config.Name)
if !exists || config.Overwrite {

if err != nil {
logp.Err("Could not load template: %v", err)
}
}
} else {
logp.Info("Template already exists and will not be overwritten.")
if config.Overwrite {
logp.Info("Existing template will be overwritten, as overwrite is enabled.")
}

reader := bytes.NewReader(out.templateContents)
err := client.LoadTemplate(config.Name, reader)
if err != nil {
return fmt.Errorf("Could not load template: %v", err)
}
} else {
logp.Info("Template already exists and will not be overwritten.")
}

return nil
}

func makeClientFactory(
tls *tls.Config,
config *elasticsearchConfig,
out *elasticsearchOutput,
) func(string) (mode.ProtocolClient, error) {
return func(host string) (mode.ProtocolClient, error) {
esURL, err := getURL(config.Protocol, config.Path, host)
Expand Down Expand Up @@ -196,10 +208,19 @@ func makeClientFactory(
if len(params) == 0 {
params = nil
}

// define a callback to be called on connection
var onConnected connectCallback
if len(out.templateContents) > 0 {
onConnected = func(client *Client) error {
return out.loadTemplate(config.Template, client)
}
}

client := NewClient(
esURL, config.Index, proxyURL, tls,
config.Username, config.Password,
params)
params, onConnected)
return client, nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/logstash/logstash_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func esConnect(t *testing.T, index string) *esConnection {

username := os.Getenv("ES_USER")
password := os.Getenv("ES_PASS")
client := elasticsearch.NewClient(host, "", nil, nil, username, password, nil)
client := elasticsearch.NewClient(host, "", nil, nil, username, password, nil, nil)

// try to drop old index if left over from failed test
_, _, _ = client.Delete(index, "", "", nil) // ignore error
Expand Down
1 change: 1 addition & 0 deletions libbeat/tests/system/beatname.template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"template": true}
1 change: 1 addition & 0 deletions libbeat/tests/system/mockbeat.template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"template": true}
12 changes: 8 additions & 4 deletions libbeat/tests/system/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ def test_config_test(self):
"""
shutil.copy("../../etc/libbeat.yml",
os.path.join(self.working_dir, "libbeat.yml"))
with open(self.working_dir + "/beatname.template.json", "w") as f:
f.write('{"template": true}')

exit_code = self.run_beat(config="libbeat.yml",
extra_args=["-configtest"])
exit_code = self.run_beat(
config="libbeat.yml",
extra_args=["-configtest",
"-path.config", self.working_dir])

assert exit_code == 0
assert self.log_contains("Config OK") is True
Expand All @@ -70,8 +74,8 @@ def test_version(self):

assert self.log_contains("error loading config file") is False

with open(os.path.join(self.working_dir, "mockbeat.log"), "wb") \
as outputfile:
with open(os.path.join(self.working_dir, "mockbeat.log"), "wb") \
as outputfile:
proc = subprocess.Popen(args,
stdout=outputfile,
stderr=subprocess.STDOUT)
Expand Down

0 comments on commit d601de2

Please sign in to comment.