diff --git a/engine/api/api.go b/engine/api/api.go index 0fbed2f846..1669f3dd57 100644 --- a/engine/api/api.go +++ b/engine/api/api.go @@ -215,6 +215,9 @@ type Configuration struct { DefaultRetentionPolicy string `toml:"defaultRetentionPolicy" comment:"Default rule for workflow run retention policy, this rule can be overridden on each workflow.\n Example: 'return run_days_before < 365' keeps runs for one year." json:"defaultRetentionPolicy" default:"return run_days_before < 365"` DisablePurgeDeletion bool `toml:"disablePurgeDeletion" comment:"Allow you to disable the deletion part of the purge. Workflow run will only be marked as delete" json:"disablePurgeDeletion" default:"false"` } `toml:"workflow" comment:"######################\n 'Workflow' global configuration \n######################" json:"workflow"` + EventBus struct { + GlobalKafka event.KafkaConfig `toml:"globalKafka" default:"false" json:"globalKafka"` + } `toml:"events" comment:"######################\n Event bus configuration \n######################" json:"events"` } // DefaultValues is the struc for API Default configuration default values @@ -688,7 +691,7 @@ func (a *API) Serve(ctx context.Context) error { } log.Info(ctx, "Initializing event broker...") - if err := event.Initialize(ctx, a.mustDB(), a.Cache); err != nil { + if err := event.Initialize(ctx, a.mustDB(), a.Cache, a.Config.EventBus.GlobalKafka); err != nil { log.Error(ctx, "error while initializing event system: %s", err) } diff --git a/engine/api/event/event.go b/engine/api/event/event.go index e393dbc97c..e956b18817 100644 --- a/engine/api/event/event.go +++ b/engine/api/event/event.go @@ -2,6 +2,7 @@ package event import ( "context" + "errors" "fmt" "os" "strconv" @@ -18,10 +19,13 @@ import ( ) // cache with go cache -var brokersConnectionCache = gocache.New(10*time.Minute, 6*time.Hour) -var hostname, cdsname string -var brokers []Broker -var subscribers []chan<- sdk.Event +var ( + brokersConnectionCache = gocache.New(10*time.Minute, 6*time.Hour) + hostname, cdsname string + brokers []Broker + globalBroker Broker + subscribers []chan<- sdk.Event +) func init() { subscribers = make([]chan<- sdk.Event, 0) @@ -96,7 +100,11 @@ func ResetEventIntegration(ctx context.Context, db gorp.SqlExecutor, eventIntegr } // Initialize initializes event system -func Initialize(ctx context.Context, db *gorp.DbMap, cache Store) error { +func Initialize(ctx context.Context, db *gorp.DbMap, cache Store, glolbalKafkaConfigs ...KafkaConfig) error { + if len(glolbalKafkaConfigs) > 1 { + return errors.New("only one global kafka global config is supported") + } + store = cache var err error hostname, err = os.Hostname() @@ -113,6 +121,14 @@ func Initialize(ctx context.Context, db *gorp.DbMap, cache Store) error { } } + if len(glolbalKafkaConfigs) == 1 && glolbalKafkaConfigs[0].BrokerAddresses != "" { + globalBroker, err = getBroker(ctx, "kafka", glolbalKafkaConfigs[0]) + if err != nil { + ctx = log.ContextWithStackTrace(ctx, err) + log.Error(ctx, "unable to init builtin kafka broker from config: %v", err) + } + } + return nil } @@ -138,6 +154,13 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) { s <- e } + if globalBroker != nil { + log.Info(ctx, "sending event %q to global broker", e.EventType) + if err := globalBroker.sendEvent(&e); err != nil { + log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err) + } + } + for _, eventIntegrationID := range e.EventIntegrationsID { brokerConnectionKey := strconv.FormatInt(eventIntegrationID, 10) brokerConnection, ok := brokersConnectionCache.Get(brokerConnectionKey) diff --git a/engine/api/event/kafka.go b/engine/api/event/kafka.go index 44e03814f5..db57cb4ffb 100644 --- a/engine/api/event/kafka.go +++ b/engine/api/event/kafka.go @@ -20,16 +20,16 @@ type KafkaClient struct { // KafkaConfig handles all config to connect to Kafka type KafkaConfig struct { - Enabled bool - BrokerAddresses string - User string - Password string - Version string - Topic string - MaxMessageByte int - DisableTLS bool - DisableSASL bool - ClientID string + Enabled bool `toml:"enabled" json:"-" commented:"true" default:"true"` + BrokerAddresses string `toml:"broker" json:"-" commented:"true"` + User string `toml:"user" json:"-" commented:"true"` + Password string `toml:"password" json:"-" commented:"true"` + Version string `toml:"version" json:"-" commented:"true"` + Topic string `toml:"topic" json:"-" commented:"true"` + MaxMessageByte int `toml:"maxMessageByte" json:"-" commented:"true"` + DisableTLS bool `toml:"disableTLS" json:"-" commented:"true"` + DisableSASL bool `toml:"disableSASL" json:"-" commented:"true"` + ClientID string `toml:"clientID" json:"-" commented:"true"` } // initialize returns broker, isInit and err if @@ -43,6 +43,18 @@ func (c *KafkaClient) initialize(ctx context.Context, options interface{}) (Brok conf.Topic == "" { return nil, fmt.Errorf("initKafka> Invalid Kafka Configuration") } + + if conf.MaxMessageByte == 0 { + conf.MaxMessageByte = 10000000 + } + + if conf.ClientID == "" { + conf.ClientID = conf.User + } + if conf.ClientID == "" { + conf.ClientID = "cds" + } + c.options = conf if err := c.initProducer(); err != nil { diff --git a/tests/05_sc_workflow_event_kafka.yml b/tests/05_sc_workflow_event_kafka.yml index 7c5b307da6..be2233ac27 100644 --- a/tests/05_sc_workflow_event_kafka.yml +++ b/tests/05_sc_workflow_event_kafka.yml @@ -13,7 +13,6 @@ testcases: - name: import integrations steps: - - script: {{.cdsctl}} -f {{.cdsctl.config}} admin integration-model import ./fixtures/integrations/kafka-public.yml - script: {{.cdsctl}} -f {{.cdsctl.config}} project integration import ITSCWRKFLW18 ./fixtures/integrations/kafka.yml - script: {{.cdsctl}} -f {{.cdsctl.config}} project integration import ITSCWRKFLW18 ./fixtures/integrations/kafka-hook.yml diff --git a/tests/fixtures/integrations/kafka-public.yml b/tests/fixtures/integrations/kafka-public.yml deleted file mode 100644 index 6cc403cbd9..0000000000 --- a/tests/fixtures/integrations/kafka-public.yml +++ /dev/null @@ -1,23 +0,0 @@ -name: myPublicKafkaIntegration -event: true -public: true -public_configurations: - myPublicKafkaIntegration: - "broker url": - type: string - value: "localhost:9092" - "topic": - type: string - value: "test.eventspublic" - "username": - type: string - value: "" - "password": - type: password - value: "" - "disableTLS": - type: string - value: "true" - "disableSASL": - type: string - value: "true"