From 801419c03f48a48160e1581a703dd12c4d229992 Mon Sep 17 00:00:00 2001 From: francois samin Date: Tue, 18 Jan 2022 16:51:43 +0100 Subject: [PATCH] chore(api): don't send event to public event integrations Signed-off-by: francois samin --- engine/api/event/event.go | 36 +++------------------ engine/api/integration.go | 18 ----------- engine/api/integration/dao_model.go | 40 ------------------------ engine/api/integration/dao_model_test.go | 4 --- 4 files changed, 4 insertions(+), 94 deletions(-) diff --git a/engine/api/event/event.go b/engine/api/event/event.go index 64304eb3af..e393dbc97c 100644 --- a/engine/api/event/event.go +++ b/engine/api/event/event.go @@ -19,7 +19,6 @@ import ( // cache with go cache var brokersConnectionCache = gocache.New(10*time.Minute, 6*time.Hour) -var publicBrokersConnectionCache = []Broker{} var hostname, cdsname string var brokers []Broker var subscribers []chan<- sdk.Event @@ -45,30 +44,6 @@ func getBroker(ctx context.Context, t string, option interface{}) (Broker, error return nil, fmt.Errorf("invalid Broker Type %s", t) } -// ResetPublicIntegrations load all integration of type Event and creates kafka brokers -func ResetPublicIntegrations(ctx context.Context, db *gorp.DbMap) error { - publicBrokersConnectionCache = []Broker{} - filterType := sdk.IntegrationTypeEvent - integrations, err := integration.LoadPublicModelsByTypeWithDecryption(db, &filterType) - if err != nil { - return sdk.WrapError(err, "cannot load public models for event type") - } - - for _, integration := range integrations { - for _, cfg := range integration.PublicConfigurations { - kafkaCfg := getKafkaConfig(cfg) - kafkaBroker, err := getBroker(ctx, "kafka", kafkaCfg) - if err != nil { - return sdk.WrapError(err, "cannot get broker for %s and user %s", cfg["broker url"].Value, cfg["username"].Value) - } - - publicBrokersConnectionCache = append(publicBrokersConnectionCache, kafkaBroker) - } - } - - return nil -} - func getKafkaConfig(cfg sdk.IntegrationConfig) KafkaConfig { kafkaCfg := KafkaConfig{ Enabled: true, @@ -138,7 +113,7 @@ func Initialize(ctx context.Context, db *gorp.DbMap, cache Store) error { } } - return ResetPublicIntegrations(ctx, db) + return nil } // Subscribe to CDS events @@ -163,15 +138,10 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) { s <- e } - // Send into public brokers - for _, b := range publicBrokersConnectionCache { - if err := b.sendEvent(&e); err != nil { - log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err) - } - } for _, eventIntegrationID := range e.EventIntegrationsID { brokerConnectionKey := strconv.FormatInt(eventIntegrationID, 10) brokerConnection, ok := brokersConnectionCache.Get(brokerConnectionKey) + var brokerConfig KafkaConfig if !ok { projInt, err := integration.LoadProjectIntegrationByIDWithClearPassword(ctx, db, eventIntegrationID) if err != nil { @@ -194,6 +164,7 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) { continue } brokerConnection = kafkaBroker + brokerConfig = kafkaCfg } broker, ok := brokerConnection.(Broker) @@ -203,6 +174,7 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) { } // Send into external brokers + log.Info(ctx, "sending event %q to %s", e.EventType, brokerConfig.BrokerAddresses) if err := broker.sendEvent(&e); err != nil { log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err) } diff --git a/engine/api/integration.go b/engine/api/integration.go index ea548b275c..944c3a2d73 100644 --- a/engine/api/integration.go +++ b/engine/api/integration.go @@ -70,11 +70,6 @@ func (api *API) postIntegrationModelHandler() service.Handler { if m.Public { go propagatePublicIntegrationModel(ctx, api.mustDB(), api.Cache, *m, getAPIConsumer(ctx)) - if m.Event { - if err := event.ResetPublicIntegrations(ctx, api.mustDB()); err != nil { - return sdk.WrapError(err, "error while resetting public integrations") - } - } } return service.WriteJSON(w, m, http.StatusCreated) @@ -127,12 +122,6 @@ func (api *API) putIntegrationModelHandler() service.Handler { api.GoRoutines.Exec(ctx, "propagatePublicIntegrationModel", func(ctx context.Context) { propagatePublicIntegrationModel(ctx, api.mustDB(), api.Cache, *m, getAPIConsumer(ctx)) }) - - if m.Event { - if err := event.ResetPublicIntegrations(ctx, api.mustDB()); err != nil { - return sdk.WrapError(err, "error while resetting public integrations") - } - } } return service.WriteJSON(w, m, http.StatusOK) @@ -231,13 +220,6 @@ func (api *API) deleteIntegrationModelHandler() service.Handler { return sdk.WrapError(err, "Unable to commit tx") } - if old.Event && old.Public { - // reset outside the transaction - if err := event.ResetPublicIntegrations(ctx, api.mustDB()); err != nil { - return sdk.WrapError(err, "error while resetting public integrations") - } - } - return nil } } diff --git a/engine/api/integration/dao_model.go b/engine/api/integration/dao_model.go index f46e767b89..4217b3261e 100644 --- a/engine/api/integration/dao_model.go +++ b/engine/api/integration/dao_model.go @@ -39,46 +39,6 @@ func LoadModels(db gorp.SqlExecutor) ([]sdk.IntegrationModel, error) { return res, nil } -func LoadPublicModelsByTypeWithDecryption(db gorp.SqlExecutor, integrationType *sdk.IntegrationType) ([]sdk.IntegrationModel, error) { - q := "SELECT * from integration_model WHERE public = true" - if integrationType != nil { - switch *integrationType { - case sdk.IntegrationTypeEvent: - q += " AND integration_model.event = true" - case sdk.IntegrationTypeCompute: - q += " AND integration_model.compute = true" - case sdk.IntegrationTypeStorage: - q += " AND integration_model.storage = true" - case sdk.IntegrationTypeHook: - q += " AND integration_model.hook = true" - case sdk.IntegrationTypeDeployment: - q += " AND integration_model.deployment = true" - } - } - - query := gorpmapping.NewQuery(q) - var pms integrationModelSlice - - if err := gorpmapping.GetAll(context.Background(), db, query, &pms, gorpmapping.GetOptions.WithDecryption); err != nil { - return nil, err - } - - var res []sdk.IntegrationModel - for _, pm := range pms { - isValid, err := gorpmapping.CheckSignature(pm, pm.Signature) - if err != nil { - return nil, err - } - if !isValid { - log.Error(context.Background(), "integration.LoadModel> model %d data corrupted", pm.ID) - continue - } - res = append(res, pm.IntegrationModel) - } - - return res, nil -} - // LoadModel Load a integration model by its ID func LoadModel(ctx context.Context, db gorp.SqlExecutor, modelID int64) (sdk.IntegrationModel, error) { query := gorpmapping.NewQuery("SELECT * from integration_model where id = $1").Args(modelID) diff --git a/engine/api/integration/dao_model_test.go b/engine/api/integration/dao_model_test.go index 44e1db0ecd..f70aea857d 100644 --- a/engine/api/integration/dao_model_test.go +++ b/engine/api/integration/dao_model_test.go @@ -42,8 +42,4 @@ func TestCRUDModel(t *testing.T) { require.NoError(t, err) assert.True(t, len(models) > 1) - - filter := sdk.IntegrationTypeEvent - _, err = LoadPublicModelsByTypeWithDecryption(db, &filter) - require.NoError(t, err) }