Skip to content

Commit

Permalink
Merge pull request #799 from ruflin/flush-race
Browse files Browse the repository at this point in the history
Prevent race condition in flush methods
  • Loading branch information
andrewkroh committed Jan 25, 2016
2 parents 7a2aeeb + 3f1a1fb commit ba80aa2
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 19 deletions.
40 changes: 29 additions & 11 deletions filebeat/beat/spooler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package beat

import (
"sync"
"time"

cfg "github.com/elastic/beats/filebeat/config"
Expand All @@ -9,23 +10,23 @@ import (
)

type Spooler struct {
Filebeat *Filebeat
exit chan struct{}
nextFlushTime time.Time
spool []*input.FileEvent
Channel chan *input.FileEvent
Filebeat *Filebeat
exit chan struct{}
nextFlushTime time.Time
nextFlushTimeMutex *sync.Mutex
spool []*input.FileEvent
Channel chan *input.FileEvent
}

func NewSpooler(filebeat *Filebeat) *Spooler {
spooler := &Spooler{
Filebeat: filebeat,
exit: make(chan struct{}),
Filebeat: filebeat,
exit: make(chan struct{}),
nextFlushTimeMutex: &sync.Mutex{},
}

config := &spooler.Filebeat.FbConfig.Filebeat

// Set the next flush time
spooler.nextFlushTime = time.Now().Add(config.IdleTimeoutDuration)
spooler.setNextFlushTime()
spooler.Channel = make(chan *input.FileEvent, 16)

return spooler
Expand Down Expand Up @@ -91,7 +92,7 @@ func (s *Spooler) Run() {
}
case <-ticker.C:
// Flush periodically
if time.Now().After(s.nextFlushTime) {
if time.Now().After(s.getNextFlushTime()) {
logp.Debug("spooler", "Flushing spooler because of timeout. Events flushed: %v", len(s.spool))
s.flush()
}
Expand All @@ -101,6 +102,7 @@ func (s *Spooler) Run() {

// Stop stops the spooler. Flushes events before stopping
func (s *Spooler) Stop() {

logp.Info("Stopping spooler")
close(s.exit)

Expand All @@ -112,6 +114,7 @@ func (s *Spooler) Stop() {

// flush flushes all event and sends them to the publisher
func (s *Spooler) flush() {

// Checks if any new objects
if len(s.spool) > 0 {

Expand All @@ -125,5 +128,20 @@ func (s *Spooler) flush() {
// send
s.Filebeat.publisherChan <- tmpCopy
}

s.setNextFlushTime()
}

func (s *Spooler) setNextFlushTime() {
s.nextFlushTimeMutex.Lock()
defer s.nextFlushTimeMutex.Unlock()

s.nextFlushTime = time.Now().Add(s.Filebeat.FbConfig.Filebeat.IdleTimeoutDuration)
}

func (s *Spooler) getNextFlushTime() time.Time {
s.nextFlushTimeMutex.Lock()
defer s.nextFlushTimeMutex.Unlock()

return s.nextFlushTime
}
7 changes: 0 additions & 7 deletions filebeat/crawler/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ type Registrar struct {
State map[string]*FileState
// Channel used by the prospector and crawler to send FileStates to be persisted
Persist chan *input.FileState
running bool

Channel chan []*FileEvent
done chan struct{}
Expand Down Expand Up @@ -82,8 +81,6 @@ func (r *Registrar) LoadState() {
func (r *Registrar) Run() {
logp.Info("Starting Registrar")

r.running = true

// Writes registry on shutdown
defer r.writeRegistry()

Expand Down Expand Up @@ -112,9 +109,6 @@ func (r *Registrar) processEvents(events []*FileEvent) {

// Take the last event found for each file source
for _, event := range events {
if !r.running {
break
}

// skip stdin
if event.InputType == cfg.StdinInputType {
Expand All @@ -127,7 +121,6 @@ func (r *Registrar) processEvents(events []*FileEvent) {

func (r *Registrar) Stop() {
logp.Info("Stopping Registrar")
r.running = false
close(r.done)
// Note: don't block using waitGroup, cause this method is run by async signal handler
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/scripts/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ build: $(GOFILES)
# Create test coverage binary
.PHONY: buildbeat.test
buildbeat.test: $(GOFILES)
go test -c -covermode=atomic -coverpkg ${GOPACKAGES_COMMA_SEP}
go test -c -race -covermode=atomic -coverpkg ${GOPACKAGES_COMMA_SEP}

# Cross-compile beat for the OS'es specified in GOX_OS variable.
# The binaries are placed in the build/bin directory.
Expand Down

0 comments on commit ba80aa2

Please sign in to comment.