Aggregate events by time or count #588

Closed
erik-stephens opened this issue Dec 22, 2015 · 1 comment
Labels: discuss (Issue needs further discussion.), Filebeat

Comments

@erik-stephens

Now that filebeat is planning to perform filtering, it would be nice to be able to aggregate events with undefined start or stop patterns. Please refer to this issue for more background and use cases.

Doing this in logstash is possible but has some costs. However, one advantage of doing it in logstash is the ability to dynamically load code. I expect that the details of what the aggregated event looks like will be very subjective, which is why logstash-filter-aggregate's code block is nice. To get a sense of what I'm talking about, here is an example of how I'm using my hack of logstash-filter-aggregate:

filter {
  # For msgs with no start & finish indicators.  Count number of msgs
  # and save to an aggregated event every 10 minutes.
  #
  grok {
    match => {
      message => '(?<task>(?i:rides the rocket|xmit failed|suddenly disconnected|bad exit|wrong id|duplicate request|failed to connect))'
    }
  }
  if [task] {
    aggregate {
      task_id => '%{process} on %{hostname} %{task}'
      timeout => 600
      timeout_tag => 'aggregated'
      code => "
        if map['_expired']
          event['@timestamp'] = map['@timestamp']
          event['message'] = %Q(#{map['_task_id']} #{map['count']} times)
          event['shipper'] = map['shipper']
          event['type'] = map['type']
          event['hostname'] = map['hostname']
          event['process'] = map['process']
          event['severity'] = map['severity'].keys.min
          event['count'] = map['count']
          event.tag('keep')
        else
          map['@timestamp'] = event['@timestamp']
          map['shipper'] = event['shipper']
          map['type'] = event['type']
          map['hostname'] = event['hostname']
          map['process'] = event['process']
          map['severity'] = {} if map['severity'].nil?
          map['severity'][event['severity']] = true
          map['count'] = 0 if map['count'].nil?
          map['count'] += 1
        end
      "
      create_event_when_timeout => true
    }
    if 'aggregated' not in [tags] {
      drop {}
    }
  }
}
@monicasarbu added the discuss (Issue needs further discussion.) and Filebeat labels on Jan 5, 2016
@erik-stephens (Author)

Looks like either timeout or max_lines of the multiline option addresses the most critical part of my use case. 👏 I should now be able to process the multiline message in logstash much more effectively, or maybe do it host-side with something like #451.
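For reference, a minimal sketch of what that might look like in the filebeat prospector config. The paths and pattern below are placeholders, and the option names follow the multiline settings documented for the filebeat 1.x line, so check them against the release you are running:

filebeat:
  prospectors:
    - paths:
        - /var/log/myapp.log          # placeholder path
      multiline:
        # Treat any line that does not start with a timestamp as a
        # continuation of the previous event (placeholder pattern).
        pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
        negate: true
        match: after
        # Flush the aggregated event after 500 lines or 10 minutes,
        # whichever comes first, even without a finish indicator.
        max_lines: 500
        timeout: 10m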
