Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the _ttl param from the Elasticsearch topology code #1907

Merged
merged 1 commit into from
Jun 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d

*Affecting all Beats*

- The topology_expire option of the Elasticserach output was removed. {pull}1907[1907]

*Metricbeat*

*Packetbeat*
Expand Down
8 changes: 3 additions & 5 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,9 @@ filebeat.prospectors:

# Expiration time (in seconds) of the IPs published by a shipper to the topology map.
# All the IPs will be deleted afterwards. Note, that the value must be higher than
# refresh_topology_freq. The default is 15 seconds.
# refresh_topology_freq. This setting is used only by the Redis output. The other
# outputs don't support expiring entries.
# The default is 15 seconds.
#topology_expire: 15

# Internal queue size for single events in processing pipeline
Expand Down Expand Up @@ -333,10 +335,6 @@ output.elasticsearch:
# false. This option makes sense only for Packetbeat.
#save_topology: false

# The time to live in seconds for the topology information that is stored in
# Elasticsearch. The default is 15 seconds.
#topology_expire: 15

# A template is used to set the mapping in Elasticsearch
# By default template loading is enabled and the template is loaded.
# These settings can be adjusted to load your own template or overwrite existing ones
Expand Down
8 changes: 3 additions & 5 deletions libbeat/_meta/config.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@

# Expiration time (in seconds) of the IPs published by a shipper to the topology map.
# All the IPs will be deleted afterwards. Note, that the value must be higher than
# refresh_topology_freq. The default is 15 seconds.
# refresh_topology_freq. This setting is used only by the Redis output. The other
# outputs don't support expiring entries.
# The default is 15 seconds.
#topology_expire: 15

# Internal queue size for single events in processing pipeline
Expand Down Expand Up @@ -130,10 +132,6 @@ output.elasticsearch:
# false. This option makes sense only for Packetbeat.
#save_topology: false

# The time to live in seconds for the topology information that is stored in
# Elasticsearch. The default is 15 seconds.
#topology_expire: 15

# A template is used to set the mapping in Elasticsearch
# By default template loading is enabled and the template is loaded.
# These settings can be adjusted to load your own template or overwrite existing ones
Expand Down
12 changes: 8 additions & 4 deletions libbeat/docs/generalconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,14 @@ IP addresses to the topology map. The default is 10 seconds.

===== topology_expire

The expiration time for the topology in seconds. This is
useful in case a Beat stops publishing its IP addresses. The IP addresses
are removed automatically from the topology map after expiration. The default
is 15 seconds.
The expiration time for the topology in seconds. This is useful in case a Beat
stops publishing its IP addresses. The IP addresses are removed automatically
from the topology map after expiration.

This setting is used only by the Redis output. The other outputs don't support
expiring entries.

The default is 15 seconds.

===== queue_size

Expand Down
7 changes: 1 addition & 6 deletions libbeat/docs/outputconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ The maximum number of events to bulk in a single Elasticsearch bulk API index re

If the Beat sends single events, the events are collected into batches. If the Beat publishes
a large batch of events (larger than the value specified by `bulk_max_size`), the batch is
split.
split.

Specifying a larger batch size can improve performance by lowering the overhead of sending events.
However big batch sizes can also increase processing times, which might result in
Expand Down Expand Up @@ -257,11 +257,6 @@ false.

This option is relevant for Packetbeat only.

===== topology_expire

The time to live in seconds for the topology information that is stored in
Elasticsearch. The default is 15 seconds.

===== tls

Configuration options for TLS parameters like the certificate authority to use
Expand Down
23 changes: 0 additions & 23 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,29 +103,6 @@ func (out *elasticsearchOutput) init(
return err
}

if config.SaveTopology {
err := out.EnableTTL()
if err != nil {
logp.Err("Fail to set _ttl mapping: %s", err)
// keep trying in the background
go func() {
for {
err := out.EnableTTL()
if err == nil {
break
}
logp.Err("Fail to set _ttl mapping: %s", err)
time.Sleep(5 * time.Second)
}
}()
}
}

out.TopologyExpire = 15000
if topologyExpire != 0 {
out.TopologyExpire = topologyExpire * 1000 // millisec
}

out.mode = m
out.index = config.Index

Expand Down
24 changes: 0 additions & 24 deletions libbeat/outputs/elasticsearch/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ func createElasticsearchConnection(flushInterval int, bulkSize int) elasticsearc

func TestTopologyInES(t *testing.T) {

t.Skip("This tests is skipped because it currently fails with 5.0.0-alpha4. Probably because _ttl was removed")

if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"topology", "output_elasticsearch"})
}
Expand Down Expand Up @@ -310,25 +308,3 @@ func TestBulkEvents(t *testing.T) {
output = createElasticsearchConnection(50, 5)
testBulkWithParams(t, output)
}

func TestEnableTTL(t *testing.T) {

t.Skip("This tests is skipped as ttl is not compatible with 5.0.0-alpha4 as _ttl was removed")
if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"topology", "output_elasticsearch", "elasticsearch"})
}

output := createElasticsearchConnection(0, 0)
output.randomClient().Delete(".packetbeat-topology", "", "", nil)

err := output.EnableTTL()
if err != nil {
t.Errorf("Fail to enable TTL: %s", err)
}

// should succeed also when index already exists
err = output.EnableTTL()
if err != nil {
t.Errorf("Fail to enable TTL: %s", err)
}
}
41 changes: 2 additions & 39 deletions libbeat/outputs/elasticsearch/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package elasticsearch

import (
"encoding/json"
"fmt"
"math/rand"
"strconv"
"strings"
Expand All @@ -15,9 +14,7 @@ import (
type topology struct {
clients []mode.ProtocolClient

TopologyExpire int
TopologyMap atomic.Value // Value holds a map[string][string]
ttlEnabled bool
TopologyMap atomic.Value // Value holds a map[string][string]
}

type publishedTopology struct {
Expand All @@ -36,34 +33,6 @@ func (t *topology) randomClient() *Client {
}
}

// Enable using ttl as paramters in a server-ip doc type
func (t *topology) EnableTTL() error {
client := t.randomClient()
if client == nil {
return ErrNotConnected
}

setting := map[string]interface{}{
"server-ip": map[string]interface{}{
"_ttl": map[string]string{"enabled": "true", "default": "15s"},
},
}

// make sure the .packetbeat-topology index exists
// Ignore error here, as CreateIndex will error (400 Bad Request) if index
// already exists. If index could not be created, next api call to index will
// fail anyway.
index := ".packetbeat-topology"
_, _, _ = client.CreateIndex(index, nil)
_, _, err := client.Index(index, "server-ip", "_mapping", nil, setting)
if err != nil {
return err
}

t.ttlEnabled = true
return nil
}

// Get the name of a shipper by its IP address from the local topology map
func (t *topology) GetNameByIP(ip string) string {
topologyMap, ok := t.TopologyMap.Load().(map[string]string)
Expand All @@ -78,20 +47,14 @@ func (t *topology) GetNameByIP(ip string) string {

// Each shipper publishes a list of IPs together with its name to Elasticsearch
func (t *topology) PublishIPs(name string, localAddrs []string) error {
if !t.ttlEnabled {
debugf("Not publishing IPs because TTL was not yet confirmed to be enabled")
return nil
}

client := t.randomClient()
if client == nil {
return ErrNotConnected
}

debugf("Publish IPs %s with expiration time %d", localAddrs, t.TopologyExpire)
debugf("Publish IPs: %s", localAddrs)

params := map[string]string{
"ttl": fmt.Sprintf("%dms", t.TopologyExpire),
"refresh": "true",
}
_, _, err := client.Index(
Expand Down
8 changes: 3 additions & 5 deletions metricbeat/metricbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ metricbeat.modules:

# Expiration time (in seconds) of the IPs published by a shipper to the topology map.
# All the IPs will be deleted afterwards. Note, that the value must be higher than
# refresh_topology_freq. The default is 15 seconds.
# refresh_topology_freq. This setting is used only by the Redis output. The other
# outputs don't support expiring entries.
# The default is 15 seconds.
#topology_expire: 15

# Internal queue size for single events in processing pipeline
Expand Down Expand Up @@ -275,10 +277,6 @@ output.elasticsearch:
# false. This option makes sense only for Packetbeat.
#save_topology: false

# The time to live in seconds for the topology information that is stored in
# Elasticsearch. The default is 15 seconds.
#topology_expire: 15

# A template is used to set the mapping in Elasticsearch
# By default template loading is enabled and the template is loaded.
# These settings can be adjusted to load your own template or overwrite existing ones
Expand Down
8 changes: 3 additions & 5 deletions packetbeat/packetbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,9 @@ packetbeat.protocols.nfs:

# Expiration time (in seconds) of the IPs published by a shipper to the topology map.
# All the IPs will be deleted afterwards. Note, that the value must be higher than
# refresh_topology_freq. The default is 15 seconds.
# refresh_topology_freq. This setting is used only by the Redis output. The other
# outputs don't support expiring entries.
# The default is 15 seconds.
#topology_expire: 15

# Internal queue size for single events in processing pipeline
Expand Down Expand Up @@ -514,10 +516,6 @@ output.elasticsearch:
# false. This option makes sense only for Packetbeat.
#save_topology: false

# The time to live in seconds for the topology information that is stored in
# Elasticsearch. The default is 15 seconds.
#topology_expire: 15

# A template is used to set the mapping in Elasticsearch
# By default template loading is enabled and the template is loaded.
# These settings can be adjusted to load your own template or overwrite existing ones
Expand Down
8 changes: 3 additions & 5 deletions winlogbeat/winlogbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ winlogbeat.event_logs:

# Expiration time (in seconds) of the IPs published by a shipper to the topology map.
# All the IPs will be deleted afterwards. Note, that the value must be higher than
# refresh_topology_freq. The default is 15 seconds.
# refresh_topology_freq. This setting is used only by the Redis output. The other
# outputs don't support expiring entries.
# The default is 15 seconds.
#topology_expire: 15

# Internal queue size for single events in processing pipeline
Expand Down Expand Up @@ -165,10 +167,6 @@ output.elasticsearch:
# false. This option makes sense only for Packetbeat.
#save_topology: false

# The time to live in seconds for the topology information that is stored in
# Elasticsearch. The default is 15 seconds.
#topology_expire: 15

# A template is used to set the mapping in Elasticsearch
# By default template loading is enabled and the template is loaded.
# These settings can be adjusted to load your own template or overwrite existing ones
Expand Down