Add support for closing publisher.Client (elastic#1402)
* Init support for closing publisher.Client

* refactor + fixes in client close

- refactor signaling, moving duplicated code from publisher/outputs to
  libbeat/common/op
- do not wait for outputs to empty the pipelines on shutdown (clients must
  be disconnected before shutdown)
- fix cancel signaler in broker worker (in pipeline)
- properly signal cancel if the publisher.Client has been disconnected

* close canceler done channel

* force beats to connect to publisher

* topbeat close publisher.Client on stop

* Winlogbeat close publisher.client

* Packetbeat close publisher client on stop

* filebeat publish shutdown

* Fix mockbeat

* Fix typos

* Enforce all clients must have disconnected

The publisher pipeline's stop method requires all clients to have disconnected
before the pipeline shuts down. If any client is still connected, a panic is
generated (see the lifecycle sketch after the change summary below).

* update winlogbeat stopping behavior

* clarify filebeat publisher exit on registrar queue

* remove obsolete 'failures' from winlogbeat expvars
Steffen Siering authored and andrewkroh committed Apr 20, 2016
1 parent 8abd826 commit 57477cf
Showing 47 changed files with 889 additions and 710 deletions.
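
The change enforces a new client lifecycle: beats no longer use a shared b.Events client but connect their own publisher.Client and close it again on shutdown, before the publisher pipeline itself is stopped. A minimal sketch of that contract, assuming only the Publisher.Connect and Client.Close calls introduced in this commit; the exampleBeat type and its done channel are hypothetical:

package beater // hypothetical example, not part of the commit

import (
	"github.com/elastic/beats/libbeat/beat"
	"github.com/elastic/beats/libbeat/publisher"
)

type exampleBeat struct {
	client publisher.Client
	done   chan struct{}
}

func (eb *exampleBeat) Run(b *beat.Beat) error {
	// b.Events is gone; every beat connects its own client.
	eb.client = b.Publisher.Connect()
	<-eb.done
	return nil
}

func (eb *exampleBeat) Stop() {
	// Close the client before the publisher pipeline shuts down;
	// the pipeline's stop method panics if clients are still connected.
	eb.client.Close()
	close(eb.done)
}

This is the same ordering followed by the filebeat, topbeat, packetbeat, and winlogbeat changes below.
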
10 changes: 7 additions & 3 deletions filebeat/beater/filebeat.go
@@ -19,6 +19,7 @@ type Filebeat struct {
spooler *Spooler
registrar *crawler.Registrar
crawler *crawler.Crawler
pub logPublisher
done chan struct{}
}

@@ -92,9 +93,9 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}

// Publishes events to the output
pub := newPublisher(fb.FbConfig.Filebeat.PublishAsync,
fb.publisherChan, fb.registrar.Channel, b.Events)
pub.Start()
fb.pub = newPublisher(fb.FbConfig.Filebeat.PublishAsync,
fb.publisherChan, fb.registrar.Channel, b.Publisher.Connect())
fb.pub.Start()

// Blocks progressing
select {
@@ -119,6 +120,9 @@ func (fb *Filebeat) Stop() {
// Stopping spooler will flush items
fb.spooler.Stop()

// Stopping publisher (might drop items)
fb.pub.Stop()

// Stopping registrar will write last state
fb.registrar.Stop()

52 changes: 38 additions & 14 deletions filebeat/beater/publish.go
@@ -29,8 +29,8 @@ type asyncLogPublisher struct {
in, out chan []*input.FileEvent

// list of in-flight batches
active batchList
failing bool
active batchList
stopping bool

done chan struct{}
wg sync.WaitGroup
@@ -48,13 +48,17 @@ type batchList struct {
head, tail *eventsBatch
}

type batchStatus int32

const (
defaultGCTimeout = 1 * time.Second
)

const (
batchSuccess int32 = 1
batchFailed int32 = 2
batchInProgress batchStatus = iota
batchSuccess
batchFailed
batchCanceled
)

func newPublisher(
@@ -115,6 +119,7 @@ func (p *syncLogPublisher) Start() {

func (p *syncLogPublisher) Stop() {
close(p.done)
p.client.Close()
p.wg.Wait()
}

@@ -169,6 +174,7 @@ func (p *asyncLogPublisher) Start() {

func (p *asyncLogPublisher) Stop() {
close(p.done)
p.client.Close()
p.wg.Wait()
}

@@ -177,33 +183,46 @@ func (p *asyncLogPublisher) Stop() {
// as bulk-Events have been received by the spooler
func (p *asyncLogPublisher) collect() bool {
for batch := p.active.head; batch != nil; batch = batch.next {
state := atomic.LoadInt32(&batch.flag)
if state == 0 && !p.failing {
state := batchStatus(atomic.LoadInt32(&batch.flag))
if state == batchInProgress && !p.stopping {
break
}

if state == batchFailed {
// with guaranteed enabled this must not happen.
msg := "Failed to process batch"
logp.Critical(msg)
panic(msg)
}

// remove batch from active list
p.active.head = batch.next
if batch.next == nil {
p.active.tail = nil
}

// Batches get marked as failed, if publisher pipeline is shutting down
// Batches get marked as canceled, if publisher pipeline is shutting down
// In this case we do not want to send any more batches to the registrar
if state == batchFailed {
p.failing = true
if state == batchCanceled {
p.stopping = true
}

if p.failing {
logp.Warn("No registrar update for potentially published batch.")
if p.stopping {
logp.Info("Shutting down - No registrar update for potentially published batch.")

// if in failing state keep cleaning up queue
continue
}

// Tell the registrar that we've successfully sent these events
// Tell the registrar that we've successfully published the last batch of events.
// If the registrar queue is blocking (quite unlikely) but a stop signal has been
// received in the meantime (by closing p.done), we do not wait for the
// registrar to pick up the current batch. Instead we prefer to shut down and
// resend the last published batch on the next restart, taking advantage of
// send-at-least-once semantics in order to speed up cleanup on shutdown.
select {
case <-p.done:
logp.Info("Shutting down - No registrar update for successfully published batch.")
return false
case p.out <- batch.events:
}
@@ -212,12 +231,17 @@ }
}

func (b *eventsBatch) Completed() {
atomic.StoreInt32(&b.flag, batchSuccess)
atomic.StoreInt32(&b.flag, int32(batchSuccess))
}

func (b *eventsBatch) Failed() {
logp.Err("Failed to publish batch. Stop updating registrar.")
atomic.StoreInt32(&b.flag, batchFailed)
atomic.StoreInt32(&b.flag, int32(batchFailed))
}

func (b *eventsBatch) Canceled() {
logp.Info("In-flight batch has been canceled during shutdown")
atomic.StoreInt32(&b.flag, int32(batchCanceled))
}

func (l *batchList) append(b *eventsBatch) {
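
The collect loop above relies on each eventsBatch carrying an atomically updated status flag. A standalone sketch of the pattern (hypothetical names, reduced to the flag handling only): the typed constants keep the states readable, while the underlying int32 stays compatible with sync/atomic, which is why the stores and loads cast between the two.

package main

import (
	"fmt"
	"sync/atomic"
)

type status int32

const (
	inProgress status = iota
	success
	failed
	canceled
)

type batch struct {
	flag int32 // written by the publisher side, read by the collector
}

func (b *batch) Completed() { atomic.StoreInt32(&b.flag, int32(success)) }
func (b *batch) Failed()    { atomic.StoreInt32(&b.flag, int32(failed)) }
func (b *batch) Canceled()  { atomic.StoreInt32(&b.flag, int32(canceled)) }

func (b *batch) state() status { return status(atomic.LoadInt32(&b.flag)) }

func main() {
	b := &batch{}
	fmt.Println(b.state() == inProgress) // true: the zero value maps to in-progress
	b.Completed()
	fmt.Println(b.state() == success) // true once the pipeline signals completion
}

The zero value of the flag doubles as the in-progress state, so a freshly appended batch needs no initialization.
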
10 changes: 5 additions & 5 deletions filebeat/beater/publish_test.go
@@ -8,8 +8,8 @@ import (
"time"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/common/op"
pubtest "github.com/elastic/beats/libbeat/publisher/testing"
"github.com/stretchr/testify/assert"
)

@@ -45,7 +45,7 @@ func TestPublisherModes(t *testing.T) {

pubChan := make(chan []*input.FileEvent, len(test.order)+1)
regChan := make(chan []*input.FileEvent, len(test.order)+1)
client := publisher.ExtChanClient{make(chan publisher.PublishMessage)}
client := pubtest.NewChanClient(0)

pub := newPublisher(test.async, pubChan, regChan, client)
pub.Start()
@@ -57,14 +57,14 @@
events = append(events, tmp)
}

var msgs []publisher.PublishMessage
var msgs []pubtest.PublishMessage
for _ = range test.order {
m := <-client.Channel
msgs = append(msgs, m)
}

for _, i := range test.order {
outputs.SignalCompleted(msgs[i-1].Context.Signal)
op.SigCompleted(msgs[i-1].Context.Signal)
}

var regEvents [][]*input.FileEvent
17 changes: 7 additions & 10 deletions libbeat/beat/beat.go
@@ -89,14 +89,13 @@ type FlagsHandler interface {
// Beat contains the basic beat data and the publisher client used to publish
// events.
type Beat struct {
Name string // Beat name.
Version string // Beat version number. Defaults to the libbeat version when an implementation does not set a version.
UUID uuid.UUID // ID assigned to a Beat instance.
BT Beater // Beater implementation.
RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data.
Config BeatConfig // Common Beat configuration data.
Events publisher.Client // Client used for publishing events.
Publisher *publisher.PublisherType // Publisher
Name string // Beat name.
Version string // Beat version number. Defaults to the libbeat version when an implementation does not set a version.
UUID uuid.UUID // ID assigned to a Beat instance.
BT Beater // Beater implementation.
RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data.
Config BeatConfig // Common Beat configuration data.
Publisher *publisher.Publisher // Publisher

filters *filter.FilterList // Filters
}
@@ -234,8 +233,6 @@ func (bc *instance) setup() error {
}

bc.data.Publisher.RegisterFilter(bc.data.filters)
bc.data.Events = bc.data.Publisher.Client()

err = bc.beater.Setup(bc.data)
if err != nil {
return err
28 changes: 28 additions & 0 deletions libbeat/common/op/cancel.go
@@ -0,0 +1,28 @@
package op

import "sync"

type Canceler struct {
lock sync.RWMutex
done chan struct{}
active bool
}

func NewCanceler() *Canceler {
return &Canceler{
done: make(chan struct{}),
active: true,
}
}

func (c *Canceler) Cancel() {
c.lock.Lock()
c.active = false
c.lock.Unlock()

close(c.done)
}

func (c *Canceler) Done() <-chan struct{} {
return c.done
}
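
A minimal usage sketch for the new Canceler (the worker and request channel here are hypothetical, not taken from the commit): a publisher worker selects on Done() so pending publish requests can be abandoned once the owning client closes and calls Cancel().

package main

import (
	"fmt"
	"time"

	"github.com/elastic/beats/libbeat/common/op"
)

// worker drains a request channel until the canceler fires; after Cancel
// it stops instead of blocking on requests that will never complete.
func worker(c *op.Canceler, requests <-chan string) {
	for {
		select {
		case <-c.Done():
			fmt.Println("client closed, dropping pending requests")
			return
		case req := <-requests:
			fmt.Println("publishing:", req)
		}
	}
}

func main() {
	c := op.NewCanceler()
	requests := make(chan string, 1)
	requests <- "event-1"

	go worker(c, requests)

	time.Sleep(100 * time.Millisecond)
	c.Cancel() // e.g. invoked when publisher.Client.Close is called
	time.Sleep(100 * time.Millisecond)
}
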
