Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop filebeat if filebeat is started without any prospectors defined #644

Merged
merged 1 commit into from
Jan 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion filebeat/beat/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
// Start up spooler
go fb.Spooler.Run()

crawl.Start(fb.FbConfig.Filebeat.Prospectors, fb.Spooler.Channel)
err = crawl.Start(fb.FbConfig.Filebeat.Prospectors, fb.Spooler.Channel)
if err != nil {
return err
}

// Publishes event to output
go Publish(b, fb)
Expand Down
14 changes: 10 additions & 4 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,22 @@ type Crawler struct {
running bool
}

func (crawler *Crawler) Start(files []config.ProspectorConfig, eventChan chan *input.FileEvent) {
func (crawler *Crawler) Start(prospectorConfigs []config.ProspectorConfig, eventChan chan *input.FileEvent) error {

pendingProspectorCnt := 0
crawler.running = true

if len(prospectorConfigs) == 0 {
return fmt.Errorf("No prospectors defined. You must have at least one prospector defined in the config file.")
}

// Prospect the globs/paths given on the command line and launch harvesters
for _, fileconfig := range files {
for _, prospectorConfig := range prospectorConfigs {

logp.Debug("prospector", "File Configs: %v", fileconfig.Paths)
logp.Debug("prospector", "File Configs: %v", prospectorConfig.Paths)

prospector := &Prospector{
ProspectorConfig: fileconfig,
ProspectorConfig: prospectorConfig,
registrar: crawler.Registrar,
}

Expand Down Expand Up @@ -76,6 +80,8 @@ func (crawler *Crawler) Start(files []config.ProspectorConfig, eventChan chan *i
}

logp.Info("All prospectors initialised with %d states to persist", len(crawler.Registrar.State))

return nil
}

func (crawler *Crawler) Stop() {
Expand Down
19 changes: 19 additions & 0 deletions filebeat/crawler/crawler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package crawler

import (
"testing"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/input"
"github.com/stretchr/testify/assert"
)

func TestCrawlerStartError(t *testing.T) {
crawler := Crawler{}
channel := make(chan *input.FileEvent, 1)
prospectorConfigs := []config.ProspectorConfig{}

error := crawler.Start(prospectorConfigs, channel)

assert.Error(t, error)
}
6 changes: 5 additions & 1 deletion filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
############################# Filebeat ######################################
filebeat:
prospectors:
{% if prospectors is not defined %}
{% set prospectors = true %}
{% endif %}
{% if prospectors %}
-
# Paths that should be crawled and fetched
{% if path %}paths:
Expand Down Expand Up @@ -48,7 +52,7 @@ filebeat:
timeout: 1s
max_lines: {{ max_lines|default(500) }}
{% endif %}

{% endif %}
spool_size:
idle_timeout: 0.1s
registry_file: {{ fb.working_dir + '/' }}{{ registryFile|default(".filebeat")}}
Expand Down
18 changes: 18 additions & 0 deletions filebeat/tests/system/test_prospector.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,21 @@ def test_rotating_ignore_older_low_write_rate(self):
max_timeout=5)

proc.kill_and_wait()

def test_shutdown_no_prospectors(self):
self.render_config_template(
prospectors=False,
)

proc = self.start_filebeat(debug_selectors=['*'])

# wait for first "Start next scan" log message
self.wait_until(
lambda: self.log_contains(
"No prospectors defined"),
max_timeout=10)

self.wait_until(
lambda: self.log_contains(
"shutting down"),
max_timeout=10)