Skip to content

Commit

Permalink
Remove the _ttl param from the Elasticsearch topology code
Browse files Browse the repository at this point in the history
Ttl is no longer supported by Elasticsearch, so we cannot use it.
The alternative is to use curator or similar to cleanup old values.
  • Loading branch information
Tudor Golubenco committed Jun 24, 2016
1 parent dedc109 commit f00cc96
Show file tree
Hide file tree
Showing 11 changed files with 28 additions and 121 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d

*Affecting all Beats*

- The topology_expire option of the Elasticserach output was removed. {pull}1907[1907]

*Metricbeat*

*Packetbeat*
Expand Down
8 changes: 3 additions & 5 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,9 @@ filebeat.prospectors:

# Expiration time (in seconds) of the IPs published by a shipper to the topology map.
# All the IPs will be deleted afterwards. Note, that the value must be higher than
# refresh_topology_freq. The default is 15 seconds.
# refresh_topology_freq. This setting is used only by the Redis output. The other
# outputs don't support expiring entries.
# The default is 15 seconds.
#topology_expire: 15

# Internal queue size for single events in processing pipeline
Expand Down Expand Up @@ -333,10 +335,6 @@ output.elasticsearch:
# false. This option makes sense only for Packetbeat.
#save_topology: false

# The time to live in seconds for the topology information that is stored in
# Elasticsearch. The default is 15 seconds.
#topology_expire: 15

# A template is used to set the mapping in Elasticsearch
# By default template loading is enabled and the template is loaded.
# These settings can be adjusted to load your own template or overwrite existing ones
Expand Down
8 changes: 3 additions & 5 deletions libbeat/_meta/config.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@

# Expiration time (in seconds) of the IPs published by a shipper to the topology map.
# All the IPs will be deleted afterwards. Note, that the value must be higher than
# refresh_topology_freq. The default is 15 seconds.
# refresh_topology_freq. This setting is used only by the Redis output. The other
# outputs don't support expiring entries.
# The default is 15 seconds.
#topology_expire: 15

# Internal queue size for single events in processing pipeline
Expand Down Expand Up @@ -130,10 +132,6 @@ output.elasticsearch:
# false. This option makes sense only for Packetbeat.
#save_topology: false

# The time to live in seconds for the topology information that is stored in
# Elasticsearch. The default is 15 seconds.
#topology_expire: 15

# A template is used to set the mapping in Elasticsearch
# By default template loading is enabled and the template is loaded.
# These settings can be adjusted to load your own template or overwrite existing ones
Expand Down
12 changes: 8 additions & 4 deletions libbeat/docs/generalconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,14 @@ IP addresses to the topology map. The default is 10 seconds.

===== topology_expire

The expiration time for the topology in seconds. This is
useful in case a Beat stops publishing its IP addresses. The IP addresses
are removed automatically from the topology map after expiration. The default
is 15 seconds.
The expiration time for the topology in seconds. This is useful in case a Beat
stops publishing its IP addresses. The IP addresses are removed automatically
from the topology map after expiration.

This setting is used only by the Redis output. The other outputs don't support
expiring entries.

The default is 15 seconds.

===== queue_size

Expand Down
7 changes: 1 addition & 6 deletions libbeat/docs/outputconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ The maximum number of events to bulk in a single Elasticsearch bulk API index re

If the Beat sends single events, the events are collected into batches. If the Beat publishes
a large batch of events (larger than the value specified by `bulk_max_size`), the batch is
split.
split.

Specifying a larger batch size can improve performance by lowering the overhead of sending events.
However big batch sizes can also increase processing times, which might result in
Expand Down Expand Up @@ -257,11 +257,6 @@ false.

This option is relevant for Packetbeat only.

===== topology_expire

The time to live in seconds for the topology information that is stored in
Elasticsearch. The default is 15 seconds.

===== tls

Configuration options for TLS parameters like the certificate authority to use
Expand Down
23 changes: 0 additions & 23 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,29 +103,6 @@ func (out *elasticsearchOutput) init(
return err
}

if config.SaveTopology {
err := out.EnableTTL()
if err != nil {
logp.Err("Fail to set _ttl mapping: %s", err)
// keep trying in the background
go func() {
for {
err := out.EnableTTL()
if err == nil {
break
}
logp.Err("Fail to set _ttl mapping: %s", err)
time.Sleep(5 * time.Second)
}
}()
}
}

out.TopologyExpire = 15000
if topologyExpire != 0 {
out.TopologyExpire = topologyExpire * 1000 // millisec
}

out.mode = m
out.index = config.Index

Expand Down
24 changes: 0 additions & 24 deletions libbeat/outputs/elasticsearch/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ func createElasticsearchConnection(flushInterval int, bulkSize int) elasticsearc

func TestTopologyInES(t *testing.T) {

t.Skip("This tests is skipped because it currently fails with 5.0.0-alpha4. Probably because _ttl was removed")

if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"topology", "output_elasticsearch"})
}
Expand Down Expand Up @@ -310,25 +308,3 @@ func TestBulkEvents(t *testing.T) {
output = createElasticsearchConnection(50, 5)
testBulkWithParams(t, output)
}

func TestEnableTTL(t *testing.T) {

t.Skip("This tests is skipped as ttl is not compatible with 5.0.0-alpha4 as _ttl was removed")
if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"topology", "output_elasticsearch", "elasticsearch"})
}

output := createElasticsearchConnection(0, 0)
output.randomClient().Delete(".packetbeat-topology", "", "", nil)

err := output.EnableTTL()
if err != nil {
t.Errorf("Fail to enable TTL: %s", err)
}

// should succeed also when index already exists
err = output.EnableTTL()
if err != nil {
t.Errorf("Fail to enable TTL: %s", err)
}
}
41 changes: 2 additions & 39 deletions libbeat/outputs/elasticsearch/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package elasticsearch

import (
"encoding/json"
"fmt"
"math/rand"
"strconv"
"strings"
Expand All @@ -15,9 +14,7 @@ import (
type topology struct {
clients []mode.ProtocolClient

TopologyExpire int
TopologyMap atomic.Value // Value holds a map[string][string]
ttlEnabled bool
TopologyMap atomic.Value // Value holds a map[string][string]
}

type publishedTopology struct {
Expand All @@ -36,34 +33,6 @@ func (t *topology) randomClient() *Client {
}
}

// Enable using ttl as paramters in a server-ip doc type
func (t *topology) EnableTTL() error {
client := t.randomClient()
if client == nil {
return ErrNotConnected
}

setting := map[string]interface{}{
"server-ip": map[string]interface{}{
"_ttl": map[string]string{"enabled": "true", "default": "15s"},
},
}

// make sure the .packetbeat-topology index exists
// Ignore error here, as CreateIndex will error (400 Bad Request) if index
// already exists. If index could not be created, next api call to index will
// fail anyway.
index := ".packetbeat-topology"
_, _, _ = client.CreateIndex(index, nil)
_, _, err := client.Index(index, "server-ip", "_mapping", nil, setting)
if err != nil {
return err
}

t.ttlEnabled = true
return nil
}

// Get the name of a shipper by its IP address from the local topology map
func (t *topology) GetNameByIP(ip string) string {
topologyMap, ok := t.TopologyMap.Load().(map[string]string)
Expand All @@ -78,20 +47,14 @@ func (t *topology) GetNameByIP(ip string) string {

// Each shipper publishes a list of IPs together with its name to Elasticsearch
func (t *topology) PublishIPs(name string, localAddrs []string) error {
if !t.ttlEnabled {
debugf("Not publishing IPs because TTL was not yet confirmed to be enabled")
return nil
}

client := t.randomClient()
if client == nil {
return ErrNotConnected
}

debugf("Publish IPs %s with expiration time %d", localAddrs, t.TopologyExpire)
debugf("Publish IPs: %s", localAddrs)

params := map[string]string{
"ttl": fmt.Sprintf("%dms", t.TopologyExpire),
"refresh": "true",
}
_, _, err := client.Index(
Expand Down
8 changes: 3 additions & 5 deletions metricbeat/metricbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ metricbeat.modules:

# Expiration time (in seconds) of the IPs published by a shipper to the topology map.
# All the IPs will be deleted afterwards. Note, that the value must be higher than
# refresh_topology_freq. The default is 15 seconds.
# refresh_topology_freq. This setting is used only by the Redis output. The other
# outputs don't support expiring entries.
# The default is 15 seconds.
#topology_expire: 15

# Internal queue size for single events in processing pipeline
Expand Down Expand Up @@ -275,10 +277,6 @@ output.elasticsearch:
# false. This option makes sense only for Packetbeat.
#save_topology: false

# The time to live in seconds for the topology information that is stored in
# Elasticsearch. The default is 15 seconds.
#topology_expire: 15

# A template is used to set the mapping in Elasticsearch
# By default template loading is enabled and the template is loaded.
# These settings can be adjusted to load your own template or overwrite existing ones
Expand Down
8 changes: 3 additions & 5 deletions packetbeat/packetbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,9 @@ packetbeat.protocols.nfs:

# Expiration time (in seconds) of the IPs published by a shipper to the topology map.
# All the IPs will be deleted afterwards. Note, that the value must be higher than
# refresh_topology_freq. The default is 15 seconds.
# refresh_topology_freq. This setting is used only by the Redis output. The other
# outputs don't support expiring entries.
# The default is 15 seconds.
#topology_expire: 15

# Internal queue size for single events in processing pipeline
Expand Down Expand Up @@ -514,10 +516,6 @@ output.elasticsearch:
# false. This option makes sense only for Packetbeat.
#save_topology: false

# The time to live in seconds for the topology information that is stored in
# Elasticsearch. The default is 15 seconds.
#topology_expire: 15

# A template is used to set the mapping in Elasticsearch
# By default template loading is enabled and the template is loaded.
# These settings can be adjusted to load your own template or overwrite existing ones
Expand Down
8 changes: 3 additions & 5 deletions winlogbeat/winlogbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ winlogbeat.event_logs:

# Expiration time (in seconds) of the IPs published by a shipper to the topology map.
# All the IPs will be deleted afterwards. Note, that the value must be higher than
# refresh_topology_freq. The default is 15 seconds.
# refresh_topology_freq. This setting is used only by the Redis output. The other
# outputs don't support expiring entries.
# The default is 15 seconds.
#topology_expire: 15

# Internal queue size for single events in processing pipeline
Expand Down Expand Up @@ -165,10 +167,6 @@ output.elasticsearch:
# false. This option makes sense only for Packetbeat.
#save_topology: false

# The time to live in seconds for the topology information that is stored in
# Elasticsearch. The default is 15 seconds.
#topology_expire: 15

# A template is used to set the mapping in Elasticsearch
# By default template loading is enabled and the template is loaded.
# These settings can be adjusted to load your own template or overwrite existing ones
Expand Down

0 comments on commit f00cc96

Please sign in to comment.