Reduce amount of anonymous analytics events #1310

Merged · 3 commits · Nov 17, 2023
Changes from 2 commits
54 changes: 31 additions & 23 deletions cmd/botkube-agent/main.go
@@ -114,21 +114,21 @@ func run(ctx context.Context) (err error) {
logger.Warnf("Configuration validation warnings: %v", confDetails.ValidateWarnings.Error())
}
// Set up analytics reporter
reporter, err := getAnalyticsReporter(conf.Analytics.Disable, logger)
analyticsReporter, err := getAnalyticsReporter(conf.Analytics.Disable, logger)
if err != nil {
return fmt.Errorf("while creating analytics reporter: %w", err)
}
defer func() {
err := reporter.Close()
err := analyticsReporter.Close()
if err != nil {
logger.Errorf("while closing reporter: %s", err.Error())
}
}()
// From now on, recover from any panic, report it, and close the reporter and app.
// The reporter must not be closed earlier so the panic can be reported properly.
defer analytics.ReportPanicIfOccurs(logger, reporter)
defer analytics.ReportPanicIfOccurs(logger, analyticsReporter)

reportFatalError := reportFatalErrFn(logger, reporter, statusReporter)
reportFatalError := reportFatalErrFn(logger, analyticsReporter, statusReporter)
// Prepare K8s clients and mapper
kubeConfig, err := kubex.BuildConfigFromFlags("", conf.Settings.Kubeconfig, conf.Settings.SACredentialsPathPrefix)
if err != nil {
@@ -148,7 +148,7 @@ func run(ctx context.Context) (err error) {
if err = statusReporter.ReportDeploymentConnectionInit(ctx, k8sVer); err != nil {
return reportFatalError("while reporting botkube connection initialization", err)
}
err = reporter.RegisterCurrentIdentity(ctx, k8sCli, remoteCfg.Identifier)
err = analyticsReporter.RegisterCurrentIdentity(ctx, k8sCli, remoteCfg.Identifier)
if err != nil {
return reportFatalError("while registering current identity", err)
}
@@ -183,6 +183,14 @@ func run(ctx context.Context) (err error) {
err = reportFatalError("while waiting for goroutines to finish gracefully", multiErr.ErrorOrNil())
}()

errGroup.Go(func() error {
err := analyticsReporter.Run(ctx)
if err != nil {
logger.Errorf("while closing reporter: %s", err.Error())
}
return err
})

schedulerChan := make(chan string)
pluginHealthStats := plugin.NewHealthStats(conf.Plugins.RestartPolicy.Threshold)
collector := plugin.NewCollector(logger)
@@ -193,7 +201,7 @@ func run(ctx context.Context) (err error) {
healthChecker := health.NewChecker(ctx, conf, pluginHealthStats)
healthSrv := healthChecker.NewServer(logger.WithField(componentLogFieldKey, "Health server"), conf.Settings.HealthPort)
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(logger, reporter)
defer analytics.ReportPanicIfOccurs(logger, analyticsReporter)
return healthSrv.Serve(ctx)
})

@@ -206,7 +214,7 @@ func run(ctx context.Context) (err error) {
// Prometheus metrics
metricsSrv := newMetricsServer(logger.WithField(componentLogFieldKey, "Metrics server"), conf.Settings.MetricsPort)
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(logger, reporter)
defer analytics.ReportPanicIfOccurs(logger, analyticsReporter)
return metricsSrv.Serve(ctx)
})

@@ -218,7 +226,7 @@ func run(ctx context.Context) (err error) {
Log: logger.WithField(componentLogFieldKey, "Executor"),
Cfg: *conf,
CfgManager: cfgManager,
AnalyticsReporter: reporter,
AnalyticsReporter: analyticsReporter,
CommandGuard: cmdGuard,
PluginManager: pluginManager,
BotKubeVersion: botkubeVersion,
@@ -253,62 +261,62 @@ func run(ctx context.Context) (err error) {
scheduleBotNotifier := func(in bot.Bot) {
bots[fmt.Sprintf("%s-%s", commGroupName, in.IntegrationName())] = in
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(commGroupLogger, reporter)
defer analytics.ReportPanicIfOccurs(commGroupLogger, analyticsReporter)
return in.Start(ctx)
})
}

// Run bots
if commGroupCfg.Slack.Enabled {
sb, err := bot.NewSlack(commGroupLogger.WithField(botLogFieldKey, "Slack"), commGroupMeta, commGroupCfg.Slack, executorFactory, reporter)
sb, err := bot.NewSlack(commGroupLogger.WithField(botLogFieldKey, "Slack"), commGroupMeta, commGroupCfg.Slack, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating Slack bot", err)
}
scheduleBotNotifier(sb)
}

if commGroupCfg.SocketSlack.Enabled {
sb, err := bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, reporter)
sb, err := bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating SocketSlack bot", err)
}
scheduleBotNotifier(sb)
}

if commGroupCfg.CloudSlack.Enabled {
sb, err := bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, reporter)
sb, err := bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating CloudSlack bot", err)
}
scheduleBotNotifier(sb)
}

if commGroupCfg.Mattermost.Enabled {
mb, err := bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, reporter)
mb, err := bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating Mattermost bot", err)
}
scheduleBotNotifier(mb)
}

if commGroupCfg.Teams.Enabled {
tb, err := bot.NewTeams(commGroupLogger.WithField(botLogFieldKey, "MS Teams"), commGroupMeta, commGroupCfg.Teams, conf.Settings.ClusterName, executorFactory, reporter)
tb, err := bot.NewTeams(commGroupLogger.WithField(botLogFieldKey, "MS Teams"), commGroupMeta, commGroupCfg.Teams, conf.Settings.ClusterName, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating Teams bot", err)
}
scheduleBotNotifier(tb)
}

if commGroupCfg.CloudTeams.Enabled {
ctb, err := bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, reporter)
ctb, err := bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating CloudSlack bot", err)
}
scheduleBotNotifier(ctb)
}

if commGroupCfg.Discord.Enabled {
db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, reporter)
db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating Discord bot", err)
}
@@ -317,15 +325,15 @@ func run(ctx context.Context) (err error) {

// Run sinks
if commGroupCfg.Elasticsearch.Enabled {
es, err := sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, reporter)
es, err := sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, analyticsReporter)
if err != nil {
return reportFatalError("while creating Elasticsearch sink", err)
}
sinkNotifiers = append(sinkNotifiers, es)
}

if commGroupCfg.Webhook.Enabled {
wh, err := sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, reporter)
wh, err := sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, analyticsReporter)
if err != nil {
return reportFatalError("while creating Webhook sink", err)
}
@@ -352,7 +360,7 @@ func run(ctx context.Context) (err error) {
deployClient,
dynamicCli,
restarter,
reporter,
analyticsReporter,
*conf,
cfgVersion,
cfgManager,
@@ -361,7 +369,7 @@ func run(ctx context.Context) (err error) {
return reportFatalError("while creating config reloader", err)
}
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(logger, reporter)
defer analytics.ReportPanicIfOccurs(logger, analyticsReporter)
return cfgReloader.Do(ctx)
})
}
@@ -384,14 +392,14 @@ func run(ctx context.Context) (err error) {
ghCli.Repositories,
)
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(logger, reporter)
defer analytics.ReportPanicIfOccurs(logger, analyticsReporter)
return upgradeChecker.Run(ctx)
})
}

actionProvider := action.NewProvider(logger.WithField(componentLogFieldKey, "Action Provider"), conf.Actions, executorFactory)

sourcePluginDispatcher := source.NewDispatcher(logger, conf.Settings.ClusterName, bots, sinkNotifiers, pluginManager, actionProvider, reporter, auditReporter, kubeConfig)
sourcePluginDispatcher := source.NewDispatcher(logger, conf.Settings.ClusterName, bots, sinkNotifiers, pluginManager, actionProvider, analyticsReporter, auditReporter, kubeConfig)
scheduler := source.NewScheduler(ctx, logger, conf, sourcePluginDispatcher, schedulerChan)
err = scheduler.Start(ctx)
if err != nil {
@@ -407,7 +415,7 @@ func run(ctx context.Context) (err error) {
)

errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(logger, reporter)
defer analytics.ReportPanicIfOccurs(logger, analyticsReporter)
return incomingWebhookSrv.Serve(ctx)
})
}
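
Note on the main.go changes above: the new goroutine only starts the reporter's Run loop; the loop itself is not part of this diff. Below is a minimal sketch of how such a periodic flush could look, assuming a hypothetical batchedReporter wrapper, an assumed send hook, and a one-hour tick; only the batched.Data methods come from this pull request.

package analytics

import (
    "context"
    "time"

    "github.com/kubeshop/botkube/internal/analytics/batched"
)

// batchedReporter is a hypothetical wrapper used only for illustration;
// the real reporter type is not shown in this diff.
type batchedReporter struct {
    data *batched.Data
    send func(batched.HeartbeatProperties) error // assumed sending hook
}

// Run flushes the batched heartbeat periodically. On a failed flush the
// collected data is kept and the reported time window is widened, so the
// next successful flush still covers the missed period.
func (r *batchedReporter) Run(ctx context.Context) error {
    ticker := time.NewTicker(1 * time.Hour) // assumed flush interval
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return nil
        case <-ticker.C:
            if err := r.send(r.data.HeartbeatProperties()); err != nil {
                r.data.IncrementTimeWindowInHours()
                continue
            }
            r.data.Reset()
        }
    }
}
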
2 changes: 1 addition & 1 deletion go.mod
@@ -48,6 +48,7 @@ require (
github.com/mattermost/mattermost/server/public v0.0.6
github.com/mattn/go-isatty v0.0.19
github.com/mattn/go-shellwords v1.0.12
github.com/mitchellh/mapstructure v1.5.0
github.com/morikuni/aec v1.0.0
github.com/muesli/reflow v0.3.0
github.com/olekukonko/tablewriter v0.0.5
@@ -202,7 +203,6 @@ require (
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/go-testing-interface v1.14.1 // indirect
github.com/mitchellh/go-wordwrap v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/moby/locker v1.0.1 // indirect
github.com/moby/spdystream v0.2.0 // indirect
57 changes: 57 additions & 0 deletions internal/analytics/batched/data.go
@@ -0,0 +1,57 @@
package batched

import "sync"

// Data is a struct that holds data for batched reporting
type Data struct {
mutex sync.RWMutex

defaultTimeWindowInHours int
heartbeatProperties HeartbeatProperties
}

// NewData creates a new Data instance with the given default time window.
func NewData(defaultTimeWindowInHours int) *Data {
return &Data{
defaultTimeWindowInHours: defaultTimeWindowInHours,
heartbeatProperties: HeartbeatProperties{
TimeWindowInHours: defaultTimeWindowInHours,
EventsCount: 0,
Sources: make(map[string]SourceProperties),
}}
}

// IncrementTimeWindowInHours extends the reported time window by one hour.
func (d *Data) IncrementTimeWindowInHours() {
d.mutex.Lock()
defer d.mutex.Unlock()

d.heartbeatProperties.TimeWindowInHours++
}

// Reset clears all batched events and restores the default time window.
func (d *Data) Reset() {
d.mutex.Lock()
defer d.mutex.Unlock()

d.heartbeatProperties.TimeWindowInHours = d.defaultTimeWindowInHours
d.heartbeatProperties.Sources = make(map[string]SourceProperties)
d.heartbeatProperties.EventsCount = 0
}

// HeartbeatProperties returns a snapshot of the currently batched data.
func (d *Data) HeartbeatProperties() HeartbeatProperties {
d.mutex.RLock()
defer d.mutex.RUnlock()

return d.heartbeatProperties
}

// AddSourceEvent records a single source event in the current batch.
func (d *Data) AddSourceEvent(in SourceEvent) {
d.mutex.Lock()
defer d.mutex.Unlock()

key := in.PluginName
sourceProps := d.heartbeatProperties.Sources[key]
sourceProps.Events = append(sourceProps.Events, in)
sourceProps.EventsCount = len(sourceProps.Events)

d.heartbeatProperties.Sources[key] = sourceProps
d.heartbeatProperties.EventsCount++
}
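
data.go references the HeartbeatProperties, SourceProperties, and SourceEvent types, which are defined elsewhere in the batched package and do not appear in this diff. The following is a sketch of type shapes consistent with how they are used above and in the test below; the field types are inferred from usage, so treat them as assumptions.

package batched

import "github.com/kubeshop/botkube/pkg/config"

// HeartbeatProperties aggregates all source events reported in one time window.
type HeartbeatProperties struct {
    TimeWindowInHours int
    EventsCount       int
    Sources           map[string]SourceProperties
}

// SourceProperties groups the events emitted by a single source plugin.
type SourceProperties struct {
    EventsCount int
    Events      []SourceEvent
}

// SourceEvent describes a single anonymized event emitted by a source plugin.
type SourceEvent struct {
    IntegrationType       config.IntegrationType
    Platform              config.CommPlatformIntegration
    PluginName            string
    AnonymizedEventFields map[string]any
    Success               bool
}
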
98 changes: 98 additions & 0 deletions internal/analytics/batched/data_test.go
@@ -0,0 +1,98 @@
package batched

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/kubeshop/botkube/pkg/config"
)

func TestData(t *testing.T) {
// given
event := SourceEvent{
IntegrationType: config.BotIntegrationType,
Platform: config.DiscordCommPlatformIntegration,
PluginName: "botkube/kubernetes",
AnonymizedEventFields: map[string]any{
"foo": "bar",
},
Success: true,
}
event2 := SourceEvent{
IntegrationType: config.SinkIntegrationType,
Platform: config.ElasticsearchCommPlatformIntegration,
PluginName: "botkube/kubernetes",
AnonymizedEventFields: map[string]any{
"els": true,
},
Success: true,
}
event3 := SourceEvent{
IntegrationType: config.BotIntegrationType,
Platform: config.CloudSlackCommPlatformIntegration,
PluginName: "botkube/argocd",
AnonymizedEventFields: nil,
Success: true,
}

// when
data := NewData(1)
// then
assert.Equal(t, 1, data.heartbeatProperties.TimeWindowInHours)

// when
data.AddSourceEvent(event)
data.AddSourceEvent(event2)
// then
expected := HeartbeatProperties{
TimeWindowInHours: 1,
Sources: map[string]SourceProperties{
"botkube/kubernetes": {
EventsCount: 2,
Events: []SourceEvent{
event,
event2,
},
},
},
EventsCount: 2,
}
assert.Equal(t, expected, data.heartbeatProperties)

// when
data.IncrementTimeWindowInHours()
// then
assert.Equal(t, 2, data.heartbeatProperties.TimeWindowInHours)

// when
data.AddSourceEvent(event3)
// then
expected = HeartbeatProperties{
TimeWindowInHours: 2,
Sources: map[string]SourceProperties{
"botkube/kubernetes": {
EventsCount: 2,
Events: []SourceEvent{
event,
event2,
},
},
"botkube/argocd": {
EventsCount: 1,
Events: []SourceEvent{
event3,
},
},
},
EventsCount: 3,
}
assert.Equal(t, expected, data.heartbeatProperties)

// when
data.Reset()
// then
assert.Equal(t, 1, data.heartbeatProperties.TimeWindowInHours)
assert.Equal(t, 0, data.heartbeatProperties.EventsCount)
assert.Len(t, data.heartbeatProperties.Sources, 0)
}
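
Beyond the unit test, the intended runtime flow is roughly: sources record events as they are dispatched, and the periodic heartbeat consumes the batch. A short usage sketch of the recording side follows; only batched.Data and its methods come from this diff, the surrounding wiring is illustrative.

package main

import (
    "fmt"

    "github.com/kubeshop/botkube/internal/analytics/batched"
    "github.com/kubeshop/botkube/pkg/config"
)

func main() {
    // Assumed dispatcher-side wiring: create the batch with a 1-hour
    // default reporting window and record an event as it is handled.
    data := batched.NewData(1)
    data.AddSourceEvent(batched.SourceEvent{
        IntegrationType: config.BotIntegrationType,
        Platform:        config.CloudSlackCommPlatformIntegration,
        PluginName:      "botkube/kubernetes",
        Success:         true,
    })

    // The heartbeat sender later reads the aggregate and resets it on success.
    props := data.HeartbeatProperties()
    fmt.Println(props.EventsCount, props.TimeWindowInHours) // 1 1
    data.Reset()
}
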