diff --git a/cmd/botkube-agent/main.go b/cmd/botkube-agent/main.go index 5555ff5fa..ea39a9a3e 100644 --- a/cmd/botkube-agent/main.go +++ b/cmd/botkube-agent/main.go @@ -114,21 +114,21 @@ func run(ctx context.Context) (err error) { logger.Warnf("Configuration validation warnings: %v", confDetails.ValidateWarnings.Error()) } // Set up analytics reporter - reporter, err := getAnalyticsReporter(conf.Analytics.Disable, logger) + analyticsReporter, err := getAnalyticsReporter(conf.Analytics.Disable, logger) if err != nil { return fmt.Errorf("while creating analytics reporter: %w", err) } defer func() { - err := reporter.Close() + err := analyticsReporter.Close() if err != nil { logger.Errorf("while closing reporter: %s", err.Error()) } }() // from now on recover from any panic, report it and close reader and app. // The reader must be not closed to report the panic properly. - defer analytics.ReportPanicIfOccurs(logger, reporter) + defer analytics.ReportPanicIfOccurs(logger, analyticsReporter) - reportFatalError := reportFatalErrFn(logger, reporter, statusReporter) + reportFatalError := reportFatalErrFn(logger, analyticsReporter, statusReporter) // Prepare K8s clients and mapper kubeConfig, err := kubex.BuildConfigFromFlags("", conf.Settings.Kubeconfig, conf.Settings.SACredentialsPathPrefix) if err != nil { @@ -148,7 +148,7 @@ func run(ctx context.Context) (err error) { if err = statusReporter.ReportDeploymentConnectionInit(ctx, k8sVer); err != nil { return reportFatalError("while reporting botkube connection initialization", err) } - err = reporter.RegisterCurrentIdentity(ctx, k8sCli, remoteCfg.Identifier) + err = analyticsReporter.RegisterCurrentIdentity(ctx, k8sCli, remoteCfg.Identifier) if err != nil { return reportFatalError("while registering current identity", err) } @@ -183,6 +183,14 @@ func run(ctx context.Context) (err error) { err = reportFatalError("while waiting for goroutines to finish gracefully", multiErr.ErrorOrNil()) }() + errGroup.Go(func() error { + 
err := analyticsReporter.Run(ctx) + if err != nil { + logger.Errorf("while running analytics reporter: %s", err.Error()) + } + return err + }) + schedulerChan := make(chan string) pluginHealthStats := plugin.NewHealthStats(conf.Plugins.RestartPolicy.Threshold) collector := plugin.NewCollector(logger) @@ -193,7 +201,7 @@ func run(ctx context.Context) (err error) { healthChecker := health.NewChecker(ctx, conf, pluginHealthStats) healthSrv := healthChecker.NewServer(logger.WithField(componentLogFieldKey, "Health server"), conf.Settings.HealthPort) errGroup.Go(func() error { - defer analytics.ReportPanicIfOccurs(logger, reporter) + defer analytics.ReportPanicIfOccurs(logger, analyticsReporter) return healthSrv.Serve(ctx) }) @@ -206,7 +214,7 @@ func run(ctx context.Context) (err error) { // Prometheus metrics metricsSrv := newMetricsServer(logger.WithField(componentLogFieldKey, "Metrics server"), conf.Settings.MetricsPort) errGroup.Go(func() error { - defer analytics.ReportPanicIfOccurs(logger, reporter) + defer analytics.ReportPanicIfOccurs(logger, analyticsReporter) return metricsSrv.Serve(ctx) }) @@ -218,7 +226,7 @@ func run(ctx context.Context) (err error) { Log: logger.WithField(componentLogFieldKey, "Executor"), Cfg: *conf, CfgManager: cfgManager, - AnalyticsReporter: reporter, + AnalyticsReporter: analyticsReporter, CommandGuard: cmdGuard, PluginManager: pluginManager, BotKubeVersion: botkubeVersion, @@ -253,14 +261,14 @@ func run(ctx context.Context) (err error) { scheduleBotNotifier := func(in bot.Bot) { bots[fmt.Sprintf("%s-%s", commGroupName, in.IntegrationName())] = in errGroup.Go(func() error { - defer analytics.ReportPanicIfOccurs(commGroupLogger, reporter) + defer analytics.ReportPanicIfOccurs(commGroupLogger, analyticsReporter) return in.Start(ctx) }) } // Run bots if commGroupCfg.Slack.Enabled { - sb, err := bot.NewSlack(commGroupLogger.WithField(botLogFieldKey, "Slack"), commGroupMeta, commGroupCfg.Slack, executorFactory, reporter) + sb, err := 
bot.NewSlack(commGroupLogger.WithField(botLogFieldKey, "Slack"), commGroupMeta, commGroupCfg.Slack, executorFactory, analyticsReporter) if err != nil { return reportFatalError("while creating Slack bot", err) } @@ -268,7 +276,7 @@ func run(ctx context.Context) (err error) { } if commGroupCfg.SocketSlack.Enabled { - sb, err := bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, reporter) + sb, err := bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, analyticsReporter) if err != nil { return reportFatalError("while creating SocketSlack bot", err) } @@ -276,7 +284,7 @@ func run(ctx context.Context) (err error) { } if commGroupCfg.CloudSlack.Enabled { - sb, err := bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, reporter) + sb, err := bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, analyticsReporter) if err != nil { return reportFatalError("while creating CloudSlack bot", err) } @@ -284,7 +292,7 @@ func run(ctx context.Context) (err error) { } if commGroupCfg.Mattermost.Enabled { - mb, err := bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, reporter) + mb, err := bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, analyticsReporter) if err != nil { return reportFatalError("while creating Mattermost bot", err) } @@ -292,7 +300,7 @@ func run(ctx context.Context) (err error) { } if commGroupCfg.Teams.Enabled { - tb, err := bot.NewTeams(commGroupLogger.WithField(botLogFieldKey, "MS Teams"), commGroupMeta, 
commGroupCfg.Teams, conf.Settings.ClusterName, executorFactory, reporter) + tb, err := bot.NewTeams(commGroupLogger.WithField(botLogFieldKey, "MS Teams"), commGroupMeta, commGroupCfg.Teams, conf.Settings.ClusterName, executorFactory, analyticsReporter) if err != nil { return reportFatalError("while creating Teams bot", err) } @@ -300,7 +308,7 @@ func run(ctx context.Context) (err error) { } if commGroupCfg.CloudTeams.Enabled { - ctb, err := bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, reporter) + ctb, err := bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, analyticsReporter) if err != nil { return reportFatalError("while creating CloudSlack bot", err) } @@ -308,7 +316,7 @@ func run(ctx context.Context) (err error) { } if commGroupCfg.Discord.Enabled { - db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, reporter) + db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, analyticsReporter) if err != nil { return reportFatalError("while creating Discord bot", err) } @@ -317,7 +325,7 @@ func run(ctx context.Context) (err error) { // Run sinks if commGroupCfg.Elasticsearch.Enabled { - es, err := sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, reporter) + es, err := sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, analyticsReporter) if err != nil { return reportFatalError("while creating Elasticsearch sink", err) } @@ -325,7 +333,7 @@ func run(ctx context.Context) (err error) { } if commGroupCfg.Webhook.Enabled { - wh, err := 
sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, reporter) + wh, err := sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, analyticsReporter) if err != nil { return reportFatalError("while creating Webhook sink", err) } @@ -352,7 +360,7 @@ func run(ctx context.Context) (err error) { deployClient, dynamicCli, restarter, - reporter, + analyticsReporter, *conf, cfgVersion, cfgManager, @@ -361,7 +369,7 @@ func run(ctx context.Context) (err error) { return reportFatalError("while creating config reloader", err) } errGroup.Go(func() error { - defer analytics.ReportPanicIfOccurs(logger, reporter) + defer analytics.ReportPanicIfOccurs(logger, analyticsReporter) return cfgReloader.Do(ctx) }) } @@ -384,14 +392,14 @@ func run(ctx context.Context) (err error) { ghCli.Repositories, ) errGroup.Go(func() error { - defer analytics.ReportPanicIfOccurs(logger, reporter) + defer analytics.ReportPanicIfOccurs(logger, analyticsReporter) return upgradeChecker.Run(ctx) }) } actionProvider := action.NewProvider(logger.WithField(componentLogFieldKey, "Action Provider"), conf.Actions, executorFactory) - sourcePluginDispatcher := source.NewDispatcher(logger, conf.Settings.ClusterName, bots, sinkNotifiers, pluginManager, actionProvider, reporter, auditReporter, kubeConfig) + sourcePluginDispatcher := source.NewDispatcher(logger, conf.Settings.ClusterName, bots, sinkNotifiers, pluginManager, actionProvider, analyticsReporter, auditReporter, kubeConfig) scheduler := source.NewScheduler(ctx, logger, conf, sourcePluginDispatcher, schedulerChan) err = scheduler.Start(ctx) if err != nil { @@ -407,7 +415,7 @@ func run(ctx context.Context) (err error) { ) errGroup.Go(func() error { - defer analytics.ReportPanicIfOccurs(logger, reporter) + defer analytics.ReportPanicIfOccurs(logger, analyticsReporter) return incomingWebhookSrv.Serve(ctx) }) } diff --git a/go.mod b/go.mod 
index e0e621571..b81b9d1e0 100644
--- a/go.mod
+++ b/go.mod
@@ -48,6 +48,7 @@ require (
 	github.com/mattermost/mattermost/server/public v0.0.6
 	github.com/mattn/go-isatty v0.0.19
 	github.com/mattn/go-shellwords v1.0.12
+	github.com/mitchellh/mapstructure v1.5.0
 	github.com/morikuni/aec v1.0.0
 	github.com/muesli/reflow v0.3.0
 	github.com/olekukonko/tablewriter v0.0.5
@@ -202,7 +203,6 @@ require (
 	github.com/mitchellh/go-homedir v1.1.0 // indirect
 	github.com/mitchellh/go-testing-interface v1.14.1 // indirect
 	github.com/mitchellh/go-wordwrap v1.0.1 // indirect
-	github.com/mitchellh/mapstructure v1.5.0 // indirect
 	github.com/mitchellh/reflectwalk v1.0.2 // indirect
 	github.com/moby/locker v1.0.1 // indirect
 	github.com/moby/spdystream v0.2.0 // indirect
diff --git a/internal/analytics/batched/data.go b/internal/analytics/batched/data.go
new file mode 100644
index 000000000..09a271536
--- /dev/null
+++ b/internal/analytics/batched/data.go
@@ -0,0 +1,97 @@
+package batched
+
+import (
+	"sync"
+)
+
+const (
+
+	// Segment limits the API calls to 32kB per request: https://segment.com/docs/connections/sources/catalog/libraries/server/http-api/
+	// We save 2kB (2048 characters) for general metadata. The rest of 30kB we can spend for sending source event details.
+	// Average event details size is 300 characters. So in theory we could include 30*1024/300=102.4 events.
+	// As the plugin name and additional labels don't have fixed size, we limit the number of events to 75 to be on the safe side.
+	maxEventDetailsCount = 75
+)
+
+// Data holds the analytics data collected between two heartbeat reports.
+// Its methods are safe for concurrent use: every access to the state is guarded by mutex.
+type Data struct {
+	mutex sync.RWMutex
+
+	defaultTimeWindowInHours int
+	heartbeatProperties      HeartbeatProperties
+}
+
+// NewData returns Data with no events collected. The time window starts at defaultTimeWindowInHours
+// and is restored to that value by Reset.
+func NewData(defaultTimeWindowInHours int) *Data {
+	return &Data{
+		defaultTimeWindowInHours: defaultTimeWindowInHours,
+		heartbeatProperties: HeartbeatProperties{
+			TimeWindowInHours: defaultTimeWindowInHours,
+			EventsCount:       0,
+			Sources:           make(map[string]SourceProperties),
+		}}
+}
+
+// IncrementTimeWindowInHours extends the time window covered by the collected data by one hour.
+func (d *Data) IncrementTimeWindowInHours() {
+	d.mutex.Lock()
+	defer d.mutex.Unlock()
+
+	d.heartbeatProperties.TimeWindowInHours++
+}
+
+// Reset drops all collected events and restores the default time window.
+func (d *Data) Reset() {
+	d.mutex.Lock()
+	defer d.mutex.Unlock()
+
+	d.heartbeatProperties.TimeWindowInHours = d.defaultTimeWindowInHours
+	d.heartbeatProperties.Sources = make(map[string]SourceProperties)
+	d.heartbeatProperties.EventsCount = 0
+}
+
+// HeartbeatProperties returns a snapshot of the collected data.
+//
+// The Sources map and the Events slices are copied. Returning the internal map would let the caller
+// read it (e.g. while marshalling) without the lock, at the same time as AddSourceEvent writes to it.
+// The AnonymizedEventFields maps are shared with the stored events; Data itself never modifies them.
+func (d *Data) HeartbeatProperties() HeartbeatProperties {
+	d.mutex.RLock()
+	defer d.mutex.RUnlock()
+
+	snapshot := d.heartbeatProperties
+	if snapshot.Sources == nil {
+		return snapshot
+	}
+
+	snapshot.Sources = make(map[string]SourceProperties, len(d.heartbeatProperties.Sources))
+	for pluginName, props := range d.heartbeatProperties.Sources {
+		if props.Events != nil {
+			// nil stays nil, so it is still marshalled to JSON as null
+			props.Events = append(make([]SourceEvent, 0, len(props.Events)), props.Events...)
+		}
+		snapshot.Sources[pluginName] = props
+	}
+
+	return snapshot
+}
+
+// AddSourceEvent counts the event, both in total and for its plugin. The event details are stored
+// only for the first maxEventDetailsCount events added since NewData or the last Reset.
+func (d *Data) AddSourceEvent(in SourceEvent) {
+	d.mutex.Lock()
+	defer d.mutex.Unlock()
+
+	d.heartbeatProperties.EventsCount++
+
+	key := in.PluginName
+	sourceProps := d.heartbeatProperties.Sources[key]
+	sourceProps.EventsCount++
+	if d.heartbeatProperties.EventsCount <= maxEventDetailsCount {
+		// save event details only if we didn't exceed the limit
+		sourceProps.Events = append(sourceProps.Events, in)
+	}
+	d.heartbeatProperties.Sources[key] = sourceProps
+}
diff --git a/internal/analytics/batched/data_test.go b/internal/analytics/batched/data_test.go
new file mode 100644
index 000000000..485b2e1f5
--- /dev/null
+++ b/internal/analytics/batched/data_test.go
@@ -0,0 +1,159 @@
+package batched
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/kubeshop/botkube/pkg/config"
+)
+
+func TestData(t *testing.T) {
+	// given
+	event := SourceEvent{
+		IntegrationType: config.BotIntegrationType,
+		Platform:
config.DiscordCommPlatformIntegration, + PluginName: "botkube/kubernetes", + AnonymizedEventFields: map[string]any{ + "foo": "bar", + }, + Success: true, + } + event2 := SourceEvent{ + IntegrationType: config.SinkIntegrationType, + Platform: config.ElasticsearchCommPlatformIntegration, + PluginName: "botkube/kubernetes", + AnonymizedEventFields: map[string]any{ + "els": true, + }, + Success: true, + } + event3 := SourceEvent{ + IntegrationType: config.BotIntegrationType, + Platform: config.CloudSlackCommPlatformIntegration, + PluginName: "botkube/argocd", + AnonymizedEventFields: nil, + Success: true, + } + + // when + data := NewData(1) + // then + assert.Equal(t, 1, data.heartbeatProperties.TimeWindowInHours) + + // when + data.AddSourceEvent(event) + data.AddSourceEvent(event2) + // then + expected := HeartbeatProperties{ + TimeWindowInHours: 1, + Sources: map[string]SourceProperties{ + "botkube/kubernetes": { + EventsCount: 2, + Events: []SourceEvent{ + event, + event2, + }, + }, + }, + EventsCount: 2, + } + assert.Equal(t, expected, data.heartbeatProperties) + + // when + data.IncrementTimeWindowInHours() + // then + assert.Equal(t, 2, data.heartbeatProperties.TimeWindowInHours) + + // when + data.AddSourceEvent(event3) + // then + expected = HeartbeatProperties{ + TimeWindowInHours: 2, + Sources: map[string]SourceProperties{ + "botkube/kubernetes": { + EventsCount: 2, + Events: []SourceEvent{ + event, + event2, + }, + }, + "botkube/argocd": { + EventsCount: 1, + Events: []SourceEvent{ + event3, + }, + }, + }, + EventsCount: 3, + } + assert.Equal(t, expected, data.heartbeatProperties) + + // when + data.Reset() + // then + assert.Equal(t, 1, data.heartbeatProperties.TimeWindowInHours) + assert.Equal(t, 0, data.heartbeatProperties.EventsCount) + assert.Len(t, data.heartbeatProperties.Sources, 0) +} + +func TestData_EventDetailsLimit(t *testing.T) { + // given + data := NewData(1) + addEvent1Count := 50 + addEvent2Count := 70 + addEvent3Count := 30 + + 
totalCount := addEvent1Count + addEvent2Count + addEvent3Count + expectedKubernetesEventCount := addEvent1Count + addEvent3Count + expectedKubernetesEventDetailsLen := addEvent1Count + expectedArgoCDEventCount := addEvent2Count + + kubernetesPlugin := "botkube/kubernetes" + argoCDPlugin := "botkube/argocd" + + // when + for i := 0; i < addEvent1Count; i++ { + data.AddSourceEvent(SourceEvent{ + IntegrationType: config.BotIntegrationType, + Platform: config.DiscordCommPlatformIntegration, + PluginName: kubernetesPlugin, + AnonymizedEventFields: map[string]any{ + "foo": "bar", + }, + Success: true, + }) + } + + for i := 0; i < addEvent2Count; i++ { + data.AddSourceEvent(SourceEvent{ + IntegrationType: config.BotIntegrationType, + Platform: config.CloudSlackCommPlatformIntegration, + PluginName: argoCDPlugin, + AnonymizedEventFields: nil, + Success: true, + }) + } + + for i := 0; i < addEvent3Count; i++ { + data.AddSourceEvent(SourceEvent{ + IntegrationType: config.SinkIntegrationType, + Platform: config.ElasticsearchCommPlatformIntegration, + PluginName: kubernetesPlugin, + AnonymizedEventFields: map[string]any{ + "foo": "bar", + }, + Success: true, + }) + } + + // then + assert.Equal(t, totalCount, data.heartbeatProperties.EventsCount) + assert.Len(t, data.heartbeatProperties.Sources, 2) + + assert.Equal(t, expectedKubernetesEventCount, data.heartbeatProperties.Sources[kubernetesPlugin].EventsCount) + assert.Len(t, data.heartbeatProperties.Sources[kubernetesPlugin].Events, expectedKubernetesEventDetailsLen) + + assert.Equal(t, expectedArgoCDEventCount, data.heartbeatProperties.Sources[argoCDPlugin].EventsCount) + assert.Len(t, data.heartbeatProperties.Sources[argoCDPlugin].Events, maxEventDetailsCount-addEvent1Count) +} diff --git a/internal/analytics/batched/heartbeat.go b/internal/analytics/batched/heartbeat.go new file mode 100644 index 000000000..4fe16beaf --- /dev/null +++ b/internal/analytics/batched/heartbeat.go @@ -0,0 +1,23 @@ +package batched + +import 
"github.com/kubeshop/botkube/pkg/config" + +type HeartbeatProperties struct { + TimeWindowInHours int `json:"timeWindowInHours"` + EventsCount int `json:"eventsCount"` + Sources map[string]SourceProperties `json:"sources"` +} + +type SourceProperties struct { + EventsCount int `json:"eventsCount"` + Events []SourceEvent `json:"events"` +} + +type SourceEvent struct { + IntegrationType config.IntegrationType `json:"integrationType"` + Platform config.CommPlatformIntegration `json:"platform"` + PluginName string `json:"pluginName"` + AnonymizedEventFields map[string]any `json:"anonymizedEventFields"` + Success bool `json:"success"` + Error *string `json:"error"` +} diff --git a/internal/analytics/export_test.go b/internal/analytics/export_test.go index 81c4110cf..238a35b5a 100644 --- a/internal/analytics/export_test.go +++ b/internal/analytics/export_test.go @@ -1,5 +1,11 @@ package analytics +import ( + "time" + + "github.com/kubeshop/botkube/internal/analytics/batched" +) + func (r *SegmentReporter) SetIdentity(identity *Identity) { r.identity = identity } @@ -7,3 +13,19 @@ func (r *SegmentReporter) SetIdentity(identity *Identity) { func (r *SegmentReporter) Identity() *Identity { return r.identity } + +func (r *SegmentReporter) SetBatchedData(batchedData BatchedDataStore) { + r.batchedData = batchedData +} + +func (r *SegmentReporter) SetTickDuration(tickDuration time.Duration) { + r.tickDuration = tickDuration +} + +func (r *SegmentReporter) ReportHeartbeatEvent() error { + return r.reportHeartbeatEvent() +} + +func (r *SegmentReporter) HeartbeatProperties() batched.HeartbeatProperties { + return r.batchedData.HeartbeatProperties() +} diff --git a/internal/analytics/noop_reporter.go b/internal/analytics/noop_reporter.go index a0ce0bd55..4b3b556c7 100644 --- a/internal/analytics/noop_reporter.go +++ b/internal/analytics/noop_reporter.go @@ -53,6 +53,11 @@ func (n NoopReporter) ReportFatalError(_ error) error { return nil } +// Run runs the reporter. 
+func (n NoopReporter) Run(_ context.Context) error { + return nil +} + // Close cleans up the reporter resources. func (n NoopReporter) Close() error { return nil diff --git a/internal/analytics/reporter.go b/internal/analytics/reporter.go index 6fc6533f6..9ab3bc012 100644 --- a/internal/analytics/reporter.go +++ b/internal/analytics/reporter.go @@ -32,6 +32,8 @@ type Reporter interface { // ReportFatalError reports a fatal app error. ReportFatalError(err error) error + Run(ctx context.Context) error + // Close cleans up the reporter resources. Close() error } diff --git a/internal/analytics/segment_reporter.go b/internal/analytics/segment_reporter.go index a91837615..2317999ee 100644 --- a/internal/analytics/segment_reporter.go +++ b/internal/analytics/segment_reporter.go @@ -2,14 +2,18 @@ package analytics import ( "context" + "encoding/json" "errors" "fmt" + "time" segment "github.com/segmentio/analytics-go" "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "github.com/kubeshop/botkube/internal/analytics/batched" + "github.com/kubeshop/botkube/internal/ptr" "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/version" ) @@ -22,6 +26,8 @@ const ( // The labels were copied as it is problematic to add k8s.io/kubernetes dependency: https://github.com/kubernetes/kubernetes/issues/79384 controlPlaneNodeLabel = "node-role.kubernetes.io/control-plane" deprecatedControlPlaneNodeLabel = "node-role.kubernetes.io/master" + + defaultTimeWindowInHours = 1 ) var ( @@ -31,19 +37,31 @@ var ( var _ Reporter = &SegmentReporter{} +type BatchedDataStore interface { + AddSourceEvent(event batched.SourceEvent) + HeartbeatProperties() batched.HeartbeatProperties + IncrementTimeWindowInHours() + Reset() +} + // SegmentReporter is a default Reporter implementation that uses Twilio Segment. 
type SegmentReporter struct { log logrus.FieldLogger cli segment.Client identity *Identity + + batchedData BatchedDataStore + tickDuration time.Duration } // NewSegmentReporter creates a new SegmentReporter instance. func NewSegmentReporter(log logrus.FieldLogger, cli segment.Client) *SegmentReporter { return &SegmentReporter{ - log: log, - cli: cli, + log: log, + cli: cli, + batchedData: batched.NewData(defaultTimeWindowInHours), + tickDuration: defaultTimeWindowInHours * time.Hour, } } @@ -101,13 +119,15 @@ func (r *SegmentReporter) ReportSinkEnabled(platform config.CommPlatformIntegrat // ReportHandledEventSuccess reports a successfully handled event using a given communication platform. // The RegisterCurrentIdentity needs to be called first. func (r *SegmentReporter) ReportHandledEventSuccess(event ReportEventInput) error { - return r.reportEvent("Event handled", map[string]interface{}{ - "platform": event.Platform, - "type": event.IntegrationType, - "plugin": event.PluginName, - "event": event.AnonymizedEventFields, - "success": true, + r.batchedData.AddSourceEvent(batched.SourceEvent{ + IntegrationType: event.IntegrationType, + Platform: event.Platform, + PluginName: event.PluginName, + AnonymizedEventFields: event.AnonymizedEventFields, + Success: true, }) + + return nil } // ReportHandledEventError reports a failure while handling event using a given communication platform. 
@@ -117,13 +137,16 @@ func (r *SegmentReporter) ReportHandledEventError(event ReportEventInput, err er return nil } - return r.reportEvent("Event handled", map[string]interface{}{ - "platform": event.Platform, - "type": event.IntegrationType, - "plugin": event.PluginName, - "event": event.AnonymizedEventFields, - "error": err.Error(), + r.batchedData.AddSourceEvent(batched.SourceEvent{ + IntegrationType: event.IntegrationType, + Platform: event.Platform, + PluginName: event.PluginName, + AnonymizedEventFields: event.AnonymizedEventFields, + Success: false, + Error: ptr.FromType(err.Error()), }) + + return nil } // ReportFatalError reports a fatal app error. @@ -158,12 +181,60 @@ func (r *SegmentReporter) ReportFatalError(err error) error { return nil } +// Run runs the reporter. +func (r *SegmentReporter) Run(ctx context.Context) error { + r.log.Debug("Running heartbeat reporting...") + + ticker := time.NewTicker(r.tickDuration) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + err := r.reportHeartbeatEvent() + if err != nil { + return fmt.Errorf("while reporting heartbeat event: %w", err) + } + + return nil + case <-ticker.C: + err := r.reportHeartbeatEvent() + if err != nil { + r.log.WithError(err).Error("Failed to report heartbeat event") + r.batchedData.IncrementTimeWindowInHours() + continue + } + + r.batchedData.Reset() + } + } +} + // Close cleans up the reporter resources. 
func (r *SegmentReporter) Close() error { r.log.Info("Closing...") return r.cli.Close() } +func (r *SegmentReporter) reportHeartbeatEvent() error { + r.log.Debug("Reporting heartbeat event...") + heartbeatProps := r.batchedData.HeartbeatProperties() + + // we can't use mapstructure because of this missing feature: https://github.com/mitchellh/mapstructure/issues/249 + bytes, err := json.Marshal(heartbeatProps) + if err != nil { + return fmt.Errorf("while marshalling heartbeat properties: %w", err) + } + + var props map[string]interface{} + err = json.Unmarshal(bytes, &props) + if err != nil { + return fmt.Errorf("while unmarshalling heartbeat properties: %w", err) + } + + return r.reportEvent("Heartbeat", props) +} + func (r *SegmentReporter) reportEvent(event string, properties map[string]interface{}) error { if r.identity == nil { return errors.New("identity needs to be registered first") diff --git a/internal/analytics/segment_reporter_test.go b/internal/analytics/segment_reporter_test.go index 60cb28fa2..139fc86a9 100644 --- a/internal/analytics/segment_reporter_test.go +++ b/internal/analytics/segment_reporter_test.go @@ -4,7 +4,7 @@ // // To update golden files, run: // -// go test ./internal/analytics/... -test.update-golden +// go test ./internal/analytics -test.update-golden package analytics_test import ( @@ -12,7 +12,9 @@ import ( "encoding/json" "errors" "fmt" + "sync" "testing" + "time" segment "github.com/segmentio/analytics-go" "github.com/stretchr/testify/assert" @@ -166,10 +168,18 @@ func TestSegmentReporter_ReportSinkEnabled(t *testing.T) { compareMessagesAgainstGoldenFile(t, segmentCli.messages) } -func TestSegmentReporter_ReportHandledEventSuccess(t *testing.T) { +// ReportHandledEventSuccess and ReportHandledEventError are tested together as a part of TestSegmentReporter_Run. 
+ +func TestSegmentReporter_Run(t *testing.T) { // given + tick := 50 * time.Millisecond + timeout := 5 * time.Second + sampleErr := errors.New("sample error") + identity := fixIdentity() segmentReporter, segmentCli := fakeSegmentReporterWithIdentity(identity) + segmentReporter.SetTickDuration(tick) + eventDetails := map[string]interface{}{ "type": "create", "apiVersion": "apps/v1", @@ -177,6 +187,17 @@ func TestSegmentReporter_ReportHandledEventSuccess(t *testing.T) { } // when + ctx, cancelFn := context.WithTimeout(context.Background(), timeout) + defer cancelFn() + + wg := sync.WaitGroup{} + var runErr error + wg.Add(1) + go func(ctx context.Context) { + defer wg.Done() + runErr = segmentReporter.Run(ctx) + }(ctx) + err := segmentReporter.ReportHandledEventSuccess(analytics.ReportEventInput{ IntegrationType: config.BotIntegrationType, Platform: config.SlackCommPlatformIntegration, @@ -185,46 +206,35 @@ func TestSegmentReporter_ReportHandledEventSuccess(t *testing.T) { }) require.NoError(t, err) - err = segmentReporter.ReportHandledEventSuccess(analytics.ReportEventInput{ + err = segmentReporter.ReportHandledEventError(analytics.ReportEventInput{ IntegrationType: config.SinkIntegrationType, Platform: config.ElasticsearchCommPlatformIntegration, PluginName: "botkube/kubernetes", AnonymizedEventFields: eventDetails, - }) + }, sampleErr) require.NoError(t, err) - // then - compareMessagesAgainstGoldenFile(t, segmentCli.messages) -} + time.Sleep(tick + 5*time.Millisecond) -func TestSegmentReporter_ReportHandledEventError(t *testing.T) { - // given - identity := fixIdentity() - segmentReporter, segmentCli := fakeSegmentReporterWithIdentity(identity) - eventDetails := map[string]interface{}{ - "type": "create", - "apiVersion": "apps/v1", - "kind": "Deployment", - } - sampleErr := errors.New("sample error") - - // when - err := segmentReporter.ReportHandledEventError(analytics.ReportEventInput{ + err = 
segmentReporter.ReportHandledEventSuccess(analytics.ReportEventInput{ IntegrationType: config.BotIntegrationType, - Platform: config.SlackCommPlatformIntegration, - PluginName: "botkube/kubernetes", + Platform: config.TeamsCommPlatformIntegration, + PluginName: "botkube/argocd", AnonymizedEventFields: eventDetails, - }, sampleErr) + }) require.NoError(t, err) - - err = segmentReporter.ReportHandledEventError(analytics.ReportEventInput{ - IntegrationType: config.SinkIntegrationType, - Platform: config.ElasticsearchCommPlatformIntegration, + err = segmentReporter.ReportHandledEventSuccess(analytics.ReportEventInput{ + IntegrationType: config.BotIntegrationType, + Platform: config.SlackCommPlatformIntegration, PluginName: "botkube/kubernetes", AnonymizedEventFields: eventDetails, - }, sampleErr) + }) require.NoError(t, err) + cancelFn() + wg.Wait() + require.NoError(t, runErr) + // then compareMessagesAgainstGoldenFile(t, segmentCli.messages) } diff --git a/internal/analytics/testdata/TestSegmentReporter_ReportHandledEventError.json b/internal/analytics/testdata/TestSegmentReporter_ReportHandledEventError.json deleted file mode 100644 index d966e9b77..000000000 --- a/internal/analytics/testdata/TestSegmentReporter_ReportHandledEventError.json +++ /dev/null @@ -1,38 +0,0 @@ -[ - { - "type": "track", - "messageId": "0", - "anonymousId": "cluster-id", - "event": "Event handled", - "timestamp": "2009-11-17T20:34:58.651387237Z", - "properties": { - "error": "sample error", - "event": { - "apiVersion": "apps/v1", - "kind": "Deployment", - "type": "create" - }, - "platform": "slack", - "plugin": "botkube/kubernetes", - "type": "bot" - } - }, - { - "type": "track", - "messageId": "1", - "anonymousId": "cluster-id", - "event": "Event handled", - "timestamp": "2009-11-17T20:34:58.651387237Z", - "properties": { - "error": "sample error", - "event": { - "apiVersion": "apps/v1", - "kind": "Deployment", - "type": "create" - }, - "platform": "elasticsearch", - "plugin": 
"botkube/kubernetes", - "type": "sink" - } - } -] \ No newline at end of file diff --git a/internal/analytics/testdata/TestSegmentReporter_ReportHandledEventSuccess.json b/internal/analytics/testdata/TestSegmentReporter_ReportHandledEventSuccess.json deleted file mode 100644 index d18ba3972..000000000 --- a/internal/analytics/testdata/TestSegmentReporter_ReportHandledEventSuccess.json +++ /dev/null @@ -1,38 +0,0 @@ -[ - { - "type": "track", - "messageId": "0", - "anonymousId": "cluster-id", - "event": "Event handled", - "timestamp": "2009-11-17T20:34:58.651387237Z", - "properties": { - "event": { - "apiVersion": "apps/v1", - "kind": "Deployment", - "type": "create" - }, - "platform": "slack", - "plugin": "botkube/kubernetes", - "success": true, - "type": "bot" - } - }, - { - "type": "track", - "messageId": "1", - "anonymousId": "cluster-id", - "event": "Event handled", - "timestamp": "2009-11-17T20:34:58.651387237Z", - "properties": { - "event": { - "apiVersion": "apps/v1", - "kind": "Deployment", - "type": "create" - }, - "platform": "elasticsearch", - "plugin": "botkube/kubernetes", - "success": true, - "type": "sink" - } - } -] \ No newline at end of file diff --git a/internal/analytics/testdata/TestSegmentReporter_ReportHeartbeatEvent.json b/internal/analytics/testdata/TestSegmentReporter_ReportHeartbeatEvent.json new file mode 100644 index 000000000..2c66e8444 --- /dev/null +++ b/internal/analytics/testdata/TestSegmentReporter_ReportHeartbeatEvent.json @@ -0,0 +1,54 @@ +[ + { + "type": "track", + "messageId": "0", + "anonymousId": "cluster-id", + "event": "Heartbeat", + "timestamp": "2009-11-17T20:34:58.651387237Z", + "properties": { + "eventsCount": 3, + "sources": { + "botkube/argocd": { + "events": [ + { + "anonymizedEventFields": { + "baz": 1, + "foo": "bar" + }, + "error": null, + "integrationType": "sink", + "platform": "elasticsearch", + "pluginName": "botkube/argocd", + "success": true + } + ], + "eventsCount": 1 + }, + "botkube/kubernetes": { + 
"events": [ + { + "anonymizedEventFields": null, + "error": "sample error", + "integrationType": "bot", + "platform": "cloudSlack", + "pluginName": "botkube/kubernetes", + "success": false + }, + { + "anonymizedEventFields": { + "foo": "bar" + }, + "error": null, + "integrationType": "bot", + "platform": "discord", + "pluginName": "botkube/kubernetes", + "success": true + } + ], + "eventsCount": 2 + } + }, + "timeWindowInHours": 1 + } + } +] \ No newline at end of file diff --git a/internal/analytics/testdata/TestSegmentReporter_Run.json b/internal/analytics/testdata/TestSegmentReporter_Run.json new file mode 100644 index 000000000..f9ff72819 --- /dev/null +++ b/internal/analytics/testdata/TestSegmentReporter_Run.json @@ -0,0 +1,91 @@ +[ + { + "type": "track", + "messageId": "0", + "anonymousId": "cluster-id", + "event": "Heartbeat", + "timestamp": "2009-11-17T20:34:58.651387237Z", + "properties": { + "eventsCount": 2, + "sources": { + "botkube/kubernetes": { + "events": [ + { + "anonymizedEventFields": { + "apiVersion": "apps/v1", + "kind": "Deployment", + "type": "create" + }, + "error": null, + "integrationType": "bot", + "platform": "slack", + "pluginName": "botkube/kubernetes", + "success": true + }, + { + "anonymizedEventFields": { + "apiVersion": "apps/v1", + "kind": "Deployment", + "type": "create" + }, + "error": "sample error", + "integrationType": "sink", + "platform": "elasticsearch", + "pluginName": "botkube/kubernetes", + "success": false + } + ], + "eventsCount": 2 + } + }, + "timeWindowInHours": 1 + } + }, + { + "type": "track", + "messageId": "1", + "anonymousId": "cluster-id", + "event": "Heartbeat", + "timestamp": "2009-11-17T20:34:58.651387237Z", + "properties": { + "eventsCount": 2, + "sources": { + "botkube/argocd": { + "events": [ + { + "anonymizedEventFields": { + "apiVersion": "apps/v1", + "kind": "Deployment", + "type": "create" + }, + "error": null, + "integrationType": "bot", + "platform": "teams", + "pluginName": "botkube/argocd", + 
"success": true + } + ], + "eventsCount": 1 + }, + "botkube/kubernetes": { + "events": [ + { + "anonymizedEventFields": { + "apiVersion": "apps/v1", + "kind": "Deployment", + "type": "create" + }, + "error": null, + "integrationType": "bot", + "platform": "slack", + "pluginName": "botkube/kubernetes", + "success": true + } + ], + "eventsCount": 1 + } + }, + "timeWindowInHours": 1 + } + } +] \ No newline at end of file