From 008ec54d9e8582031f475d936ea1f8374f13b73a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fran=C3=A7ois=20Samin?=
Date: Mon, 24 Jan 2022 15:31:35 +0100
Subject: [PATCH] feat(api): don't send event to public event integrations (#6066)

Signed-off-by: francois samin
---
 .../docs/integrations/kafka/kafka_events.md   | 37 +----------
 engine/api/api.go                             |  5 +-
 engine/api/event/elasticsearch.go             |  1 +
 engine/api/event/event.go                     | 63 +++++++++----------
 engine/api/event/kafka.go                     | 32 +++++++---
 engine/api/integration.go                     | 18 ------
 engine/api/integration/dao_model.go           | 40 ------------
 engine/api/integration/dao_model_test.go      |  4 --
 tests/05_sc_workflow_event_kafka.yml          | 11 ++--
 tests/fixtures/integrations/kafka-public.yml  | 23 -------
 10 files changed, 63 insertions(+), 171 deletions(-)
 delete mode 100644 tests/fixtures/integrations/kafka-public.yml

diff --git a/docs/content/docs/integrations/kafka/kafka_events.md b/docs/content/docs/integrations/kafka/kafka_events.md
index 1c1a7ed4ca..d4a4f32ed3 100644
--- a/docs/content/docs/integrations/kafka/kafka_events.md
+++ b/docs/content/docs/integrations/kafka/kafka_events.md
@@ -56,42 +56,7 @@ Import the integration on your CDS Project with:
 cdsctl project integration import PROJECT_KEY project-configuration.yml
 ```
 
-Then, as a standard user, you can add a [Kafka Hook]({{}}) on your workflow.
-
-### Create a Public Kafka Integration for whole CDS Projects
-
-You can also add a Kafka Integration with cdsctl. As a CDS Administrator,
-this allows you to propose a Public Kafka Integration, available on all CDS Projects.
-
-Create a file `public-configuration.yml`:
-
-```yml
-name: your-kafka-integration
-event: true
-public: true
-public_configurations:
-  name-of-integration:
-    "broker url":
-      type: string
-      value: "n1.o1.your-broker:9093,n2.o1.n1.o1.your-broker:9093,n3.o1.n1.o1.your-broker:9093"
-    "topic":
-      type: string
-      value: "your-topic.events"
-    "username":
-      type: string
-      value: "your-topic.cds-reader"
-    "password":
-      type: password
-      value: xxxxxxxx
-```
-
-Import the integration with :
-
-```bash
-cdsctl admin integration-model import public-configuration.yml
-```
-
-Then, as a standard user, you can add a [Kafka Hook]({{}}) on your workflow.
+Then, as a standard user, you can use your Kafka integration for workflow notifications.
 
 ### One Integration, two use case
 
diff --git a/engine/api/api.go b/engine/api/api.go
index 15baa88603..957aa8d13f 100644
--- a/engine/api/api.go
+++ b/engine/api/api.go
@@ -210,6 +210,9 @@ type Configuration struct {
 		DefaultRetentionPolicy  string `toml:"defaultRetentionPolicy" comment:"Default rule for workflow run retention policy, this rule can be overridden on each workflow.\n Example: 'return run_days_before < 365' keeps runs for one year." json:"defaultRetentionPolicy" default:"return run_days_before < 365"`
 		DisablePurgeDeletion    bool   `toml:"disablePurgeDeletion" comment:"Allow you to disable the deletion part of the purge. 
Workflow run will only be marked as delete" json:"disablePurgeDeletion" default:"false"` } `toml:"workflow" comment:"######################\n 'Workflow' global configuration \n######################" json:"workflow"` + EventBus struct { + GlobalKafka event.KafkaConfig `toml:"globalKafka" default:"false" json:"globalKafka" mapstructure:"globalKafka"` + } `toml:"events" comment:"######################\n Event bus configuration \n######################" json:"events" mapstructure:"events"` } // DefaultValues is the struc for API Default configuration default values @@ -683,7 +686,7 @@ func (a *API) Serve(ctx context.Context) error { } log.Info(ctx, "Initializing event broker...") - if err := event.Initialize(ctx, a.mustDB(), a.Cache); err != nil { + if err := event.Initialize(ctx, a.mustDB(), a.Cache, a.Config.EventBus.GlobalKafka); err != nil { log.Error(ctx, "error while initializing event system: %s", err) } diff --git a/engine/api/event/elasticsearch.go b/engine/api/event/elasticsearch.go index b9e71b112c..adae153f8a 100644 --- a/engine/api/event/elasticsearch.go +++ b/engine/api/event/elasticsearch.go @@ -41,6 +41,7 @@ func PushInElasticSearch(ctx context.Context, db gorp.SqlExecutor, store cache.S continue } e.Payload = nil + log.Info(ctx, "sending event %q to %s services", e.EventType, sdk.TypeElasticsearch) _, code, errD := services.NewClient(db, esServices).DoJSONRequest(context.Background(), "POST", "/events", e, nil) if code >= 400 || errD != nil { log.Error(ctx, "PushInElasticSearch> Unable to send event %s to elasticsearch [%d]: %v", e.EventType, code, errD) diff --git a/engine/api/event/event.go b/engine/api/event/event.go index 64304eb3af..e956b18817 100644 --- a/engine/api/event/event.go +++ b/engine/api/event/event.go @@ -2,6 +2,7 @@ package event import ( "context" + "errors" "fmt" "os" "strconv" @@ -18,11 +19,13 @@ import ( ) // cache with go cache -var brokersConnectionCache = gocache.New(10*time.Minute, 6*time.Hour) -var publicBrokersConnectionCache = []Broker{} -var hostname, cdsname string -var brokers []Broker -var subscribers []chan<- sdk.Event +var ( + brokersConnectionCache = gocache.New(10*time.Minute, 6*time.Hour) + hostname, cdsname string + brokers []Broker + globalBroker Broker + subscribers []chan<- sdk.Event +) func init() { subscribers = make([]chan<- sdk.Event, 0) @@ -45,30 +48,6 @@ func getBroker(ctx context.Context, t string, option interface{}) (Broker, error return nil, fmt.Errorf("invalid Broker Type %s", t) } -// ResetPublicIntegrations load all integration of type Event and creates kafka brokers -func ResetPublicIntegrations(ctx context.Context, db *gorp.DbMap) error { - publicBrokersConnectionCache = []Broker{} - filterType := sdk.IntegrationTypeEvent - integrations, err := integration.LoadPublicModelsByTypeWithDecryption(db, &filterType) - if err != nil { - return sdk.WrapError(err, "cannot load public models for event type") - } - - for _, integration := range integrations { - for _, cfg := range integration.PublicConfigurations { - kafkaCfg := getKafkaConfig(cfg) - kafkaBroker, err := getBroker(ctx, "kafka", kafkaCfg) - if err != nil { - return sdk.WrapError(err, "cannot get broker for %s and user %s", cfg["broker url"].Value, cfg["username"].Value) - } - - publicBrokersConnectionCache = append(publicBrokersConnectionCache, kafkaBroker) - } - } - - return nil -} - func getKafkaConfig(cfg sdk.IntegrationConfig) KafkaConfig { kafkaCfg := KafkaConfig{ Enabled: true, @@ -121,7 +100,11 @@ func ResetEventIntegration(ctx context.Context, db 
gorp.SqlExecutor, eventIntegr
 }
 
 // Initialize initializes event system
-func Initialize(ctx context.Context, db *gorp.DbMap, cache Store) error {
+func Initialize(ctx context.Context, db *gorp.DbMap, cache Store, globalKafkaConfigs ...KafkaConfig) error {
+	if len(globalKafkaConfigs) > 1 {
+		return errors.New("only one global kafka config is supported")
+	}
+
 	store = cache
 	var err error
 	hostname, err = os.Hostname()
@@ -138,7 +121,15 @@ func Initialize(ctx context.Context, db *gorp.DbMap, cache Store) error {
 		}
 	}
 
-	return ResetPublicIntegrations(ctx, db)
+	if len(globalKafkaConfigs) == 1 && globalKafkaConfigs[0].BrokerAddresses != "" {
+		globalBroker, err = getBroker(ctx, "kafka", globalKafkaConfigs[0])
+		if err != nil {
+			ctx = log.ContextWithStackTrace(ctx, err)
+			log.Error(ctx, "unable to init builtin kafka broker from config: %v", err)
+		}
+	}
+
+	return nil
 }
 
 // Subscribe to CDS events
@@ -163,15 +154,17 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
 			s <- e
 		}
 
-		// Send into public brokers
-		for _, b := range publicBrokersConnectionCache {
-			if err := b.sendEvent(&e); err != nil {
+		if globalBroker != nil {
+			log.Info(ctx, "sending event %q to global broker", e.EventType)
+			if err := globalBroker.sendEvent(&e); err != nil {
 				log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
 			}
 		}
+
 		for _, eventIntegrationID := range e.EventIntegrationsID {
 			brokerConnectionKey := strconv.FormatInt(eventIntegrationID, 10)
 			brokerConnection, ok := brokersConnectionCache.Get(brokerConnectionKey)
+			var brokerConfig KafkaConfig
 			if !ok {
 				projInt, err := integration.LoadProjectIntegrationByIDWithClearPassword(ctx, db, eventIntegrationID)
 				if err != nil {
@@ -194,6 +187,7 @@
 				continue
 			}
 			brokerConnection = kafkaBroker
+			brokerConfig = kafkaCfg
 		}
 
 		broker, ok := brokerConnection.(Broker)
@@ -203,6 +197,7 @@
 			continue
 		}
 
 		// Send into external brokers
+		log.Info(ctx, "sending event %q to %s", e.EventType, brokerConfig.BrokerAddresses)
 		if err := broker.sendEvent(&e); err != nil {
 			log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
 		}
diff --git a/engine/api/event/kafka.go b/engine/api/event/kafka.go
index 44e03814f5..fce0993e4f 100644
--- a/engine/api/event/kafka.go
+++ b/engine/api/event/kafka.go
@@ -20,16 +20,16 @@ type KafkaClient struct {
 
 // KafkaConfig handles all config to connect to Kafka
 type KafkaConfig struct {
-	Enabled         bool
-	BrokerAddresses string
-	User            string
-	Password        string
-	Version         string
-	Topic           string
-	MaxMessageByte  int
-	DisableTLS      bool
-	DisableSASL     bool
-	ClientID        string
+	Enabled         bool   `toml:"enabled" json:"-" default:"false" mapstructure:"enabled"`
+	BrokerAddresses string `toml:"broker" json:"-" mapstructure:"broker"`
+	User            string `toml:"user" json:"-" mapstructure:"user"`
+	Password        string `toml:"password" json:"-" mapstructure:"password"`
+	Version         string `toml:"version" json:"-" mapstructure:"version"`
+	Topic           string `toml:"topic" json:"-" mapstructure:"topic"`
+	MaxMessageByte  int    `toml:"maxMessageByte" json:"-" mapstructure:"maxMessageByte"`
+	DisableTLS      bool   `toml:"disableTLS" json:"-" mapstructure:"disableTLS"`
+	DisableSASL     bool   `toml:"disableSASL" json:"-" mapstructure:"disableSASL"`
+	ClientID        string `toml:"clientID" 
json:"-" mapstructure:"clientID"` } // initialize returns broker, isInit and err if @@ -43,6 +43,18 @@ func (c *KafkaClient) initialize(ctx context.Context, options interface{}) (Brok conf.Topic == "" { return nil, fmt.Errorf("initKafka> Invalid Kafka Configuration") } + + if conf.MaxMessageByte == 0 { + conf.MaxMessageByte = 10000000 + } + + if conf.ClientID == "" { + conf.ClientID = conf.User + } + if conf.ClientID == "" { + conf.ClientID = "cds" + } + c.options = conf if err := c.initProducer(); err != nil { diff --git a/engine/api/integration.go b/engine/api/integration.go index ea548b275c..944c3a2d73 100644 --- a/engine/api/integration.go +++ b/engine/api/integration.go @@ -70,11 +70,6 @@ func (api *API) postIntegrationModelHandler() service.Handler { if m.Public { go propagatePublicIntegrationModel(ctx, api.mustDB(), api.Cache, *m, getAPIConsumer(ctx)) - if m.Event { - if err := event.ResetPublicIntegrations(ctx, api.mustDB()); err != nil { - return sdk.WrapError(err, "error while resetting public integrations") - } - } } return service.WriteJSON(w, m, http.StatusCreated) @@ -127,12 +122,6 @@ func (api *API) putIntegrationModelHandler() service.Handler { api.GoRoutines.Exec(ctx, "propagatePublicIntegrationModel", func(ctx context.Context) { propagatePublicIntegrationModel(ctx, api.mustDB(), api.Cache, *m, getAPIConsumer(ctx)) }) - - if m.Event { - if err := event.ResetPublicIntegrations(ctx, api.mustDB()); err != nil { - return sdk.WrapError(err, "error while resetting public integrations") - } - } } return service.WriteJSON(w, m, http.StatusOK) @@ -231,13 +220,6 @@ func (api *API) deleteIntegrationModelHandler() service.Handler { return sdk.WrapError(err, "Unable to commit tx") } - if old.Event && old.Public { - // reset outside the transaction - if err := event.ResetPublicIntegrations(ctx, api.mustDB()); err != nil { - return sdk.WrapError(err, "error while resetting public integrations") - } - } - return nil } } diff --git a/engine/api/integration/dao_model.go b/engine/api/integration/dao_model.go index f46e767b89..4217b3261e 100644 --- a/engine/api/integration/dao_model.go +++ b/engine/api/integration/dao_model.go @@ -39,46 +39,6 @@ func LoadModels(db gorp.SqlExecutor) ([]sdk.IntegrationModel, error) { return res, nil } -func LoadPublicModelsByTypeWithDecryption(db gorp.SqlExecutor, integrationType *sdk.IntegrationType) ([]sdk.IntegrationModel, error) { - q := "SELECT * from integration_model WHERE public = true" - if integrationType != nil { - switch *integrationType { - case sdk.IntegrationTypeEvent: - q += " AND integration_model.event = true" - case sdk.IntegrationTypeCompute: - q += " AND integration_model.compute = true" - case sdk.IntegrationTypeStorage: - q += " AND integration_model.storage = true" - case sdk.IntegrationTypeHook: - q += " AND integration_model.hook = true" - case sdk.IntegrationTypeDeployment: - q += " AND integration_model.deployment = true" - } - } - - query := gorpmapping.NewQuery(q) - var pms integrationModelSlice - - if err := gorpmapping.GetAll(context.Background(), db, query, &pms, gorpmapping.GetOptions.WithDecryption); err != nil { - return nil, err - } - - var res []sdk.IntegrationModel - for _, pm := range pms { - isValid, err := gorpmapping.CheckSignature(pm, pm.Signature) - if err != nil { - return nil, err - } - if !isValid { - log.Error(context.Background(), "integration.LoadModel> model %d data corrupted", pm.ID) - continue - } - res = append(res, pm.IntegrationModel) - } - - return res, nil -} - // LoadModel Load a integration model by 
its ID func LoadModel(ctx context.Context, db gorp.SqlExecutor, modelID int64) (sdk.IntegrationModel, error) { query := gorpmapping.NewQuery("SELECT * from integration_model where id = $1").Args(modelID) diff --git a/engine/api/integration/dao_model_test.go b/engine/api/integration/dao_model_test.go index 44e1db0ecd..f70aea857d 100644 --- a/engine/api/integration/dao_model_test.go +++ b/engine/api/integration/dao_model_test.go @@ -42,8 +42,4 @@ func TestCRUDModel(t *testing.T) { require.NoError(t, err) assert.True(t, len(models) > 1) - - filter := sdk.IntegrationTypeEvent - _, err = LoadPublicModelsByTypeWithDecryption(db, &filter) - require.NoError(t, err) } diff --git a/tests/05_sc_workflow_event_kafka.yml b/tests/05_sc_workflow_event_kafka.yml index 7c5b307da6..0fdf8e50ec 100644 --- a/tests/05_sc_workflow_event_kafka.yml +++ b/tests/05_sc_workflow_event_kafka.yml @@ -13,7 +13,6 @@ testcases: - name: import integrations steps: - - script: {{.cdsctl}} -f {{.cdsctl.config}} admin integration-model import ./fixtures/integrations/kafka-public.yml - script: {{.cdsctl}} -f {{.cdsctl.config}} project integration import ITSCWRKFLW18 ./fixtures/integrations/kafka.yml - script: {{.cdsctl}} -f {{.cdsctl.config}} project integration import ITSCWRKFLW18 ./fixtures/integrations/kafka-hook.yml @@ -24,12 +23,12 @@ testcases: - name: check if consumer kafka is started steps: - script: sleep 15 && {{.cdsctl}} -f {{.cdsctl.config}} admin services status --type=hooks|grep 'Hook Kafka Consumers' | grep OK - retry: 30 - delay: 10 + timeout: 30 - name: run workflow by sending a kafka event steps: - script: kafkacat -b localhost:9092 -t test.hook -T -P -l ./fixtures/ITSCWRKFLW18/input-kafka.json + timeout: 30 - name: check event in topic test.eventsproject steps: @@ -37,8 +36,9 @@ testcases: assertions: - result.code ShouldEqual 0 - "result.systemoutjson.type_event ShouldContainSubstring sdk.EventRunWorkflowJob" - retry: 30 + retry: 10 delay: 10 + timeout: 100 - name: check event in topic test.eventspublic steps: @@ -46,8 +46,9 @@ testcases: assertions: - result.code ShouldEqual 0 - "result.systemoutjson.type_event ShouldContainSubstring sdk.EventRunWorkflowJob" - retry: 30 + retry: 10 delay: 10 + timeout: 100 - name: check workflow steps: diff --git a/tests/fixtures/integrations/kafka-public.yml b/tests/fixtures/integrations/kafka-public.yml deleted file mode 100644 index 6cc403cbd9..0000000000 --- a/tests/fixtures/integrations/kafka-public.yml +++ /dev/null @@ -1,23 +0,0 @@ -name: myPublicKafkaIntegration -event: true -public: true -public_configurations: - myPublicKafkaIntegration: - "broker url": - type: string - value: "localhost:9092" - "topic": - type: string - value: "test.eventspublic" - "username": - type: string - value: "" - "password": - type: password - value: "" - "disableTLS": - type: string - value: "true" - "disableSASL": - type: string - value: "true"
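
For anyone wiring this up after the upgrade: the global event bus replaces the public event integrations removed above, and is configured once in the API configuration file rather than per integration model. Below is a minimal sketch of the corresponding section, assembled from the `toml`/`mapstructure` tags this patch adds to `event.KafkaConfig` and from the fallbacks in `initialize()`; the broker addresses, topic, and credentials are placeholder values, not taken from the patch.

```toml
# Hypothetical excerpt of the API configuration file (placeholder values).
# Key names follow the toml tags added to event.KafkaConfig in this patch.
[events.globalKafka]
  enabled = true
  broker = "n1.your-broker:9093,n2.your-broker:9093"
  topic = "cds.events"
  user = "cds-reader"
  password = "xxxxxxxx"
  # Optional Kafka protocol version; leave empty for the client default.
  version = ""
  # 0 falls back to 10000000 bytes in initialize().
  maxMessageByte = 10000000
  disableTLS = false
  disableSASL = false
  # Left empty, clientID falls back to user, then to "cds".
  clientID = ""
```

If `broker` is left empty, `Initialize` skips the global broker entirely, so existing deployments keep their current behavior; delivery can then be checked end to end with a kafkacat consumer, mirroring the kafkacat producer call in the test scenario above.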