Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filebeat Crawler and Prospector Refactoring #720

Merged
merged 2 commits into from
Jan 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
*Packetbeat*

*Topbeat*
- Rename proc.cpu.user_p with proc.cpu.total_p as includes CPU time spent in kernel space {pull}631[631]
- Rename proc.cpu.user_p with proc.cpu.total_p as includes CPU time spent in kernel space {pull}631[631]

*Filebeat*

Expand All @@ -43,6 +43,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
*Filebeat*
- Add exclude_files configuration option {pull}563[563]
- Stop filebeat if filebeat is started without any prospectors defined or empty prospectors {pull}644[644] {pull}647[647]
- Improve shutdown of crawler and prospector to wait for clean completion {pull}720[720]
- Set spool_size default value to 2048 {pull}628[628]

*Winlogbeat*
Expand Down
36 changes: 23 additions & 13 deletions filebeat/beat/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ type Filebeat struct {
FbConfig *cfg.Config
// Channel from harvesters to spooler
publisherChan chan []*FileEvent
Spooler *Spooler
spooler *Spooler
registrar *Registrar
cralwer *Crawler
done chan struct{}
}

func New() *Filebeat {
Expand All @@ -44,6 +46,8 @@ func (fb *Filebeat) Config(b *beat.Beat) error {
}

func (fb *Filebeat) Setup(b *beat.Beat) error {
fb.done = make(chan struct{})

return nil
}

Expand All @@ -61,35 +65,40 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

crawl := &Crawler{
fb.cralwer = &Crawler{
Registrar: fb.registrar,
}

// Load the previous log file locations now, for use in prospector
fb.registrar.LoadState()

// Init and Start spooler: Harvesters dump events into the spooler.
fb.Spooler = NewSpooler(fb)
err = fb.Spooler.Config()
fb.spooler = NewSpooler(fb)
err = fb.spooler.Config()

if err != nil {
logp.Err("Could not init spooler: %v", err)
return err
}

// Start up spooler
go fb.Spooler.Run()
go fb.spooler.Run()

// registrar records last acknowledged positions in all files.
go fb.registrar.Run()

err = crawl.Start(fb.FbConfig.Filebeat.Prospectors, fb.Spooler.Channel)
err = fb.cralwer.Start(fb.FbConfig.Filebeat.Prospectors, fb.spooler.Channel)
if err != nil {
return err
}

// Publishes event to output
go Publish(b, fb)

// registrar records last acknowledged positions in all files.
fb.registrar.Run()
// Blocks progressing
select {
case <-fb.done:
}

return nil
}
Expand All @@ -101,17 +110,18 @@ func (fb *Filebeat) Cleanup(b *beat.Beat) error {
// Stop is called on exit for cleanup
func (fb *Filebeat) Stop() {

// Stop harvesters
// Stop prospectors
logp.Info("Stopping filebeat")
// Stop crawler -> stop prospectors -> stop harvesters
fb.cralwer.Stop()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if harvester is blocked by channel forwarding events to the spooler and spooler itself is blocked by libbeat due to ES/LS not responding?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I just tried it out locally and unfortunately it exactly happens what you described. Any suggestion on how we should solve it?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stopping must ripple through the layers. The done queue can be either shared globally or we're using done queue per layer.

In harvester there are effectively 2 places the harvester can block:

  1. reading from file
  2. forwarding event to spooler

Part 2 is quite simple by using select when publishing. Part 1 should be possible by closing the file from another goroutine. You can get check actual stack-trace if filebeat shutdown is hanging by running filebeat with -httpprof :6060 and pointing your browser to http://localhost:6060/debug/pprof

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're basically missing some function in libbeat to close the publisher.Client asynchronously.


// Stopping spooler will flush items
fb.Spooler.Stop()
fb.spooler.Stop()

// Stopping registrar will write last state
fb.registrar.Stop()

// Close channels
//close(fb.publisherChan)
// Stop Filebeat
close(fb.done)
}

func Publish(beat *beat.Beat, fb *Filebeat) {
Expand Down
23 changes: 10 additions & 13 deletions filebeat/beat/spooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

type Spooler struct {
Filebeat *Filebeat
running bool
exit chan struct{}
nextFlushTime time.Time
spool []*input.FileEvent
Channel chan *input.FileEvent
Expand All @@ -19,7 +19,7 @@ type Spooler struct {
func NewSpooler(filebeat *Filebeat) *Spooler {
spooler := &Spooler{
Filebeat: filebeat,
running: false,
exit: make(chan struct{}),
}

config := &spooler.Filebeat.FbConfig.Filebeat
Expand Down Expand Up @@ -66,9 +66,6 @@ func (s *Spooler) Run() {

config := &s.Filebeat.FbConfig.Filebeat

// Enable running
s.running = true

// Sets up ticket channel
ticker := time.NewTicker(config.IdleTimeoutDuration / 2)

Expand All @@ -78,11 +75,10 @@ func (s *Spooler) Run() {

// Loops until running is set to false
for {
if !s.running {
break
}

select {

case <-s.exit:
break
case event := <-s.Channel:
s.spool = append(s.spool, event)

Expand All @@ -99,18 +95,19 @@ func (s *Spooler) Run() {
}
}
}
}

// Stop stops the spooler. Flushes events before stopping
func (s *Spooler) Stop() {
logp.Info("Stopping spooler")
close(s.exit)

// Flush again before exiting spooler and closes channel
logp.Info("Spooler last flush spooler")
s.flush()
close(s.Channel)
}

// Stop stops the spooler. Flushes events before stopping
func (s *Spooler) Stop() {
}

// flush flushes all event and sends them to the publisher
func (s *Spooler) flush() {
// Checks if any new objects
Expand Down
40 changes: 23 additions & 17 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package crawler

import (
"fmt"
"sync"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/input"
Expand All @@ -22,47 +23,52 @@ import (

type Crawler struct {
// Registrar object to persist the state
Registrar *Registrar
running bool
Registrar *Registrar
prospectors []*Prospector
wg sync.WaitGroup
}

func (crawler *Crawler) Start(prospectorConfigs []config.ProspectorConfig, eventChan chan *input.FileEvent) error {

crawler.running = true
func (c *Crawler) Start(prospectorConfigs []config.ProspectorConfig, eventChan chan *input.FileEvent) error {

if len(prospectorConfigs) == 0 {
return fmt.Errorf("No prospectors defined. You must have at least one prospector defined in the config file.")
}

var prospectors []*Prospector

logp.Info("Loading Prospectors: %v", len(prospectorConfigs))

// Prospect the globs/paths given on the command line and launch harvesters
for _, prospectorConfig := range prospectorConfigs {

logp.Debug("prospector", "File Configs: %v", prospectorConfig.Paths)

prospector, err := NewProspector(prospectorConfig, crawler.Registrar, eventChan)
prospectors = append(prospectors, prospector)
prospector, err := NewProspector(prospectorConfig, c.Registrar, eventChan)
c.prospectors = append(c.prospectors, prospector)

if err != nil {
return fmt.Errorf("Error in initing prospector: %s", err)
}
}
logp.Info("Loading Prospectors completed")

logp.Info("Running Prospectors")
for _, prospector := range prospectors {
go prospector.Run()
logp.Info("Loading Prospectors completed. Number of prospectors: %v", len(c.prospectors))

c.wg = sync.WaitGroup{}
for _, prospector := range c.prospectors {
c.wg.Add(1)
go prospector.Run(&c.wg)
}
logp.Info("All prospectors are running")

logp.Info("All prospectors initialised with %d states to persist", len(crawler.Registrar.State))
logp.Info("All prospectors are initialised and running with %d states to persist", len(c.Registrar.State))

return nil
}

func (crawler *Crawler) Stop() {
// TODO: Properly stop prospectors and harvesters
func (c *Crawler) Stop() {
logp.Info("Stopping Crawler")

logp.Info("Stopping %v prospectors", len(c.prospectors))
for _, prospector := range c.prospectors {
prospector.Stop()
}
c.wg.Wait()
logp.Info("Crawler stopped")
}
28 changes: 21 additions & 7 deletions filebeat/crawler/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package crawler

import (
"fmt"
"sync"
"time"

cfg "github.com/elastic/beats/filebeat/config"
Expand All @@ -15,7 +16,7 @@ type Prospector struct {
prospectorer Prospectorer
channel chan *input.FileEvent
registrar *Registrar
running bool
done chan struct{}
}

type Prospectorer interface {
Expand All @@ -28,6 +29,7 @@ func NewProspector(prospectorConfig cfg.ProspectorConfig, registrar *Registrar,
ProspectorConfig: prospectorConfig,
registrar: registrar,
channel: channel,
done: make(chan struct{}),
}

err := prospector.Init()
Expand Down Expand Up @@ -61,6 +63,7 @@ func (p *Prospector) Init() error {
case cfg.LogInputType:
prospectorer, err = NewProspectorLog(p.ProspectorConfig, p.channel, p.registrar)
prospectorer.Init()

default:
return fmt.Errorf("Invalid prospector type: %v", p.ProspectorConfig.Harvester.InputType)
}
Expand All @@ -71,21 +74,32 @@ func (p *Prospector) Init() error {
}

// Starts scanning through all the file paths and fetch the related files. Start a harvester for each file
func (p *Prospector) Run() {
func (p *Prospector) Run(wg *sync.WaitGroup) {

// TODO: Defer the wg.Done() call to block shutdown
// Currently there are 2 cases where shutting down the prospector could be blocked:
// 1. reading from file
// 2. forwarding event to spooler
// As this is not implemented yet, no blocking on prospector shutdown is done.
wg.Done()

p.running = true
logp.Info("Starting prospector of type: %v", p.ProspectorConfig.Harvester.InputType)

for {
p.prospectorer.Run(p.channel)
if !p.running {
break
select {
case <-p.done:
logp.Info("Prospector stopped")
return
default:
logp.Info("Run prospector")
p.prospectorer.Run(p.channel)
}
}
}

func (p *Prospector) Stop() {
// TODO: Stopping is currently not implemented
logp.Info("Stopping Prospector")
close(p.done)
}

// Setup Prospector Config
Expand Down
1 change: 0 additions & 1 deletion filebeat/harvester/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ func readLine(reader processor.LineProcessor) (time.Time, string, int, error) {

// Full line read to be returned
if l.Bytes != 0 && err == nil {
logp.Debug("harvester", "full line read")
return l.Ts, string(l.Content), l.Bytes, err
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this line as it generated too much debu output.

}

Expand Down
13 changes: 5 additions & 8 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,12 @@ func Run(name string, version string, bt Beater) {
}()

// Waits until beats channel is closed
for {
select {
case <-b.exit:
b.Stop()
logp.Info("Exit beat completed")
return
}
select {
case <-b.exit:
b.Stop()
logp.Info("Exit beat completed")
return
}

}

func (b *Beat) Start() error {
Expand Down