Skip to content

Commit

Permalink
Make clean shutdown of prospector and crawler.
Browse files Browse the repository at this point in the history
* Start registrar first
* Introduce waitgroup for started prospectors. Keep list of prospectors in crawler
* Code cleanup
  • Loading branch information
ruflin committed Jan 14, 2016
1 parent be576ad commit 2cb241d
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 52 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
*Packetbeat*

*Topbeat*
- Rename proc.cpu.user_p with proc.cpu.total_p as includes CPU time spent in kernel space {pull}631[631]
- Rename proc.cpu.user_p with proc.cpu.total_p as includes CPU time spent in kernel space {pull}631[631]

*Filebeat*

Expand All @@ -43,6 +43,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
*Filebeat*
- Add exclude_files configuration option {pull}563[563]
- Stop filebeat if filebeat is started without any prospectors defined or empty prospectors {pull}644[644] {pull}647[647]
- Improve shutdown of crawler and prospector to wait for clean completion {pull}720[720]
- Set spool_size default value to 2048 {pull}628[628]

*Winlogbeat*
Expand Down
36 changes: 23 additions & 13 deletions filebeat/beat/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ type Filebeat struct {
FbConfig *cfg.Config
// Channel from harvesters to spooler
publisherChan chan []*FileEvent
Spooler *Spooler
spooler *Spooler
registrar *Registrar
cralwer *Crawler
done chan struct{}
}

func New() *Filebeat {
Expand All @@ -44,6 +46,8 @@ func (fb *Filebeat) Config(b *beat.Beat) error {
}

func (fb *Filebeat) Setup(b *beat.Beat) error {
fb.done = make(chan struct{})

return nil
}

Expand All @@ -61,35 +65,40 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

crawl := &Crawler{
fb.cralwer = &Crawler{
Registrar: fb.registrar,
}

// Load the previous log file locations now, for use in prospector
fb.registrar.LoadState()

// Init and Start spooler: Harvesters dump events into the spooler.
fb.Spooler = NewSpooler(fb)
err = fb.Spooler.Config()
fb.spooler = NewSpooler(fb)
err = fb.spooler.Config()

if err != nil {
logp.Err("Could not init spooler: %v", err)
return err
}

// Start up spooler
go fb.Spooler.Run()
go fb.spooler.Run()

// registrar records last acknowledged positions in all files.
go fb.registrar.Run()

err = crawl.Start(fb.FbConfig.Filebeat.Prospectors, fb.Spooler.Channel)
err = fb.cralwer.Start(fb.FbConfig.Filebeat.Prospectors, fb.spooler.Channel)
if err != nil {
return err
}

// Publishes event to output
go Publish(b, fb)

// registrar records last acknowledged positions in all files.
fb.registrar.Run()
// Block until shutdown is signalled via the done channel
select {
case <-fb.done:
}

return nil
}
Expand All @@ -101,17 +110,18 @@ func (fb *Filebeat) Cleanup(b *beat.Beat) error {
// Stop is called on exit for cleanup
func (fb *Filebeat) Stop() {

// Stop harvesters
// Stop prospectors
logp.Info("Stopping filebeat")
// Stop crawler -> stop prospectors -> stop harvesters
fb.cralwer.Stop()

// Stopping spooler will flush items
fb.Spooler.Stop()
fb.spooler.Stop()

// Stopping registrar will write last state
fb.registrar.Stop()

// Close channels
//close(fb.publisherChan)
// Stop Filebeat
close(fb.done)
}

func Publish(beat *beat.Beat, fb *Filebeat) {
Expand Down
23 changes: 10 additions & 13 deletions filebeat/beat/spooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

type Spooler struct {
Filebeat *Filebeat
running bool
exit chan struct{}
nextFlushTime time.Time
spool []*input.FileEvent
Channel chan *input.FileEvent
Expand All @@ -19,7 +19,7 @@ type Spooler struct {
func NewSpooler(filebeat *Filebeat) *Spooler {
spooler := &Spooler{
Filebeat: filebeat,
running: false,
exit: make(chan struct{}),
}

config := &spooler.Filebeat.FbConfig.Filebeat
Expand Down Expand Up @@ -66,9 +66,6 @@ func (s *Spooler) Run() {

config := &s.Filebeat.FbConfig.Filebeat

// Enable running
s.running = true

// Sets up ticker channel
ticker := time.NewTicker(config.IdleTimeoutDuration / 2)

Expand All @@ -78,11 +75,10 @@ func (s *Spooler) Run() {

// Loops until the exit channel is closed
for {
if !s.running {
break
}

select {

case <-s.exit:
break
case event := <-s.Channel:
s.spool = append(s.spool, event)

Expand All @@ -99,18 +95,19 @@ func (s *Spooler) Run() {
}
}
}
}

// Stop stops the spooler. Flushes events before stopping
func (s *Spooler) Stop() {
logp.Info("Stopping spooler")
close(s.exit)

// Flush again before exiting spooler and closes channel
logp.Info("Spooler last flush spooler")
s.flush()
close(s.Channel)
}

// Stop stops the spooler. Flushes events before stopping
func (s *Spooler) Stop() {
}

// flush flushes all event and sends them to the publisher
func (s *Spooler) flush() {
// Checks if any new objects
Expand Down
40 changes: 23 additions & 17 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package crawler

import (
"fmt"
"sync"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/input"
Expand All @@ -22,47 +23,52 @@ import (

type Crawler struct {
// Registrar object to persist the state
Registrar *Registrar
running bool
Registrar *Registrar
prospectors []*Prospector
wg sync.WaitGroup
}

func (crawler *Crawler) Start(prospectorConfigs []config.ProspectorConfig, eventChan chan *input.FileEvent) error {

crawler.running = true
func (c *Crawler) Start(prospectorConfigs []config.ProspectorConfig, eventChan chan *input.FileEvent) error {

if len(prospectorConfigs) == 0 {
return fmt.Errorf("No prospectors defined. You must have at least one prospector defined in the config file.")
}

var prospectors []*Prospector

logp.Info("Loading Prospectors: %v", len(prospectorConfigs))

// Prospect the globs/paths given on the command line and launch harvesters
for _, prospectorConfig := range prospectorConfigs {

logp.Debug("prospector", "File Configs: %v", prospectorConfig.Paths)

prospector, err := NewProspector(prospectorConfig, crawler.Registrar, eventChan)
prospectors = append(prospectors, prospector)
prospector, err := NewProspector(prospectorConfig, c.Registrar, eventChan)
c.prospectors = append(c.prospectors, prospector)

if err != nil {
return fmt.Errorf("Error in initing prospector: %s", err)
}
}
logp.Info("Loading Prospectors completed")

logp.Info("Running Prospectors")
for _, prospector := range prospectors {
go prospector.Run()
logp.Info("Loading Prospectors completed. Number of prospectors: %v", len(c.prospectors))

c.wg = sync.WaitGroup{}
for _, prospector := range c.prospectors {
c.wg.Add(1)
go prospector.Run(&c.wg)
}
logp.Info("All prospectors are running")

logp.Info("All prospectors initialised with %d states to persist", len(crawler.Registrar.State))
logp.Info("All prospectors are initialised and running with %d states to persist", len(c.Registrar.State))

return nil
}

func (crawler *Crawler) Stop() {
// TODO: Properly stop prospectors and harvesters
func (c *Crawler) Stop() {
logp.Info("Stopping Crawler")

logp.Info("Stopping %v prospectors", len(c.prospectors))
for _, prospector := range c.prospectors {
prospector.Stop()
}
c.wg.Wait()
logp.Info("Crawler stopped")
}
28 changes: 21 additions & 7 deletions filebeat/crawler/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package crawler

import (
"fmt"
"sync"
"time"

cfg "github.com/elastic/beats/filebeat/config"
Expand All @@ -15,7 +16,7 @@ type Prospector struct {
prospectorer Prospectorer
channel chan *input.FileEvent
registrar *Registrar
running bool
done chan struct{}
}

type Prospectorer interface {
Expand All @@ -28,6 +29,7 @@ func NewProspector(prospectorConfig cfg.ProspectorConfig, registrar *Registrar,
ProspectorConfig: prospectorConfig,
registrar: registrar,
channel: channel,
done: make(chan struct{}),
}

err := prospector.Init()
Expand Down Expand Up @@ -61,6 +63,7 @@ func (p *Prospector) Init() error {
case cfg.LogInputType:
prospectorer, err = NewProspectorLog(p.ProspectorConfig, p.channel, p.registrar)
prospectorer.Init()

default:
return fmt.Errorf("Invalid prospector type: %v", p.ProspectorConfig.Harvester.InputType)
}
Expand All @@ -71,21 +74,32 @@ func (p *Prospector) Init() error {
}

// Starts scanning through all the file paths and fetch the related files. Start a harvester for each file
func (p *Prospector) Run() {
func (p *Prospector) Run(wg *sync.WaitGroup) {

// TODO: Defer the wg.Done() call to block shutdown
// Currently there are 2 cases where shutting down the prospector could be blocked:
// 1. reading from file
// 2. forwarding event to spooler
// As this is not implemented yet, no blocking on prospector shutdown is done.
wg.Done()

p.running = true
logp.Info("Starting prospector of type: %v", p.ProspectorConfig.Harvester.InputType)

for {
p.prospectorer.Run(p.channel)
if !p.running {
break
select {
case <-p.done:
logp.Info("Prospector stopped")
return
default:
logp.Info("Run prospector")
p.prospectorer.Run(p.channel)
}
}
}

func (p *Prospector) Stop() {
// TODO: Stopping is currently not implemented
logp.Info("Stopping Prospector")
close(p.done)
}

// Setup Prospector Config
Expand Down
1 change: 0 additions & 1 deletion filebeat/harvester/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ func readLine(reader processor.LineProcessor) (time.Time, string, int, error) {

// Full line read to be returned
if l.Bytes != 0 && err == nil {
logp.Debug("harvester", "full line read")
return l.Ts, string(l.Content), l.Bytes, err
}

Expand Down

0 comments on commit 2cb241d

Please sign in to comment.