Skip to content

Commit

Permalink
chore(api): don't send event to public event integrations
Browse files Browse the repository at this point in the history
Signed-off-by: francois  samin <[email protected]>
  • Loading branch information
fsamin committed Jan 18, 2022
1 parent 5708807 commit 801419c
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 94 deletions.
36 changes: 4 additions & 32 deletions engine/api/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

// cache with go cache
var brokersConnectionCache = gocache.New(10*time.Minute, 6*time.Hour)
var publicBrokersConnectionCache = []Broker{}
var hostname, cdsname string
var brokers []Broker
var subscribers []chan<- sdk.Event
Expand All @@ -45,30 +44,6 @@ func getBroker(ctx context.Context, t string, option interface{}) (Broker, error
return nil, fmt.Errorf("invalid Broker Type %s", t)
}

// ResetPublicIntegrations load all integration of type Event and creates kafka brokers
func ResetPublicIntegrations(ctx context.Context, db *gorp.DbMap) error {
publicBrokersConnectionCache = []Broker{}
filterType := sdk.IntegrationTypeEvent
integrations, err := integration.LoadPublicModelsByTypeWithDecryption(db, &filterType)
if err != nil {
return sdk.WrapError(err, "cannot load public models for event type")
}

for _, integration := range integrations {
for _, cfg := range integration.PublicConfigurations {
kafkaCfg := getKafkaConfig(cfg)
kafkaBroker, err := getBroker(ctx, "kafka", kafkaCfg)
if err != nil {
return sdk.WrapError(err, "cannot get broker for %s and user %s", cfg["broker url"].Value, cfg["username"].Value)
}

publicBrokersConnectionCache = append(publicBrokersConnectionCache, kafkaBroker)
}
}

return nil
}

func getKafkaConfig(cfg sdk.IntegrationConfig) KafkaConfig {
kafkaCfg := KafkaConfig{
Enabled: true,
Expand Down Expand Up @@ -138,7 +113,7 @@ func Initialize(ctx context.Context, db *gorp.DbMap, cache Store) error {
}
}

return ResetPublicIntegrations(ctx, db)
return nil
}

// Subscribe to CDS events
Expand All @@ -163,15 +138,10 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
s <- e
}

// Send into public brokers
for _, b := range publicBrokersConnectionCache {
if err := b.sendEvent(&e); err != nil {
log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
}
}
for _, eventIntegrationID := range e.EventIntegrationsID {
brokerConnectionKey := strconv.FormatInt(eventIntegrationID, 10)
brokerConnection, ok := brokersConnectionCache.Get(brokerConnectionKey)
var brokerConfig KafkaConfig
if !ok {
projInt, err := integration.LoadProjectIntegrationByIDWithClearPassword(ctx, db, eventIntegrationID)
if err != nil {
Expand All @@ -194,6 +164,7 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
continue
}
brokerConnection = kafkaBroker
brokerConfig = kafkaCfg
}

broker, ok := brokerConnection.(Broker)
Expand All @@ -203,6 +174,7 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
}

// Send into external brokers
log.Info(ctx, "sending event %q to %s", e.EventType, brokerConfig.BrokerAddresses)
if err := broker.sendEvent(&e); err != nil {
log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
}
Expand Down
18 changes: 0 additions & 18 deletions engine/api/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,6 @@ func (api *API) postIntegrationModelHandler() service.Handler {

if m.Public {
go propagatePublicIntegrationModel(ctx, api.mustDB(), api.Cache, *m, getAPIConsumer(ctx))
if m.Event {
if err := event.ResetPublicIntegrations(ctx, api.mustDB()); err != nil {
return sdk.WrapError(err, "error while resetting public integrations")
}
}
}

return service.WriteJSON(w, m, http.StatusCreated)
Expand Down Expand Up @@ -127,12 +122,6 @@ func (api *API) putIntegrationModelHandler() service.Handler {
api.GoRoutines.Exec(ctx, "propagatePublicIntegrationModel", func(ctx context.Context) {
propagatePublicIntegrationModel(ctx, api.mustDB(), api.Cache, *m, getAPIConsumer(ctx))
})

if m.Event {
if err := event.ResetPublicIntegrations(ctx, api.mustDB()); err != nil {
return sdk.WrapError(err, "error while resetting public integrations")
}
}
}

return service.WriteJSON(w, m, http.StatusOK)
Expand Down Expand Up @@ -231,13 +220,6 @@ func (api *API) deleteIntegrationModelHandler() service.Handler {
return sdk.WrapError(err, "Unable to commit tx")
}

if old.Event && old.Public {
// reset outside the transaction
if err := event.ResetPublicIntegrations(ctx, api.mustDB()); err != nil {
return sdk.WrapError(err, "error while resetting public integrations")
}
}

return nil
}
}
40 changes: 0 additions & 40 deletions engine/api/integration/dao_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,46 +39,6 @@ func LoadModels(db gorp.SqlExecutor) ([]sdk.IntegrationModel, error) {
return res, nil
}

func LoadPublicModelsByTypeWithDecryption(db gorp.SqlExecutor, integrationType *sdk.IntegrationType) ([]sdk.IntegrationModel, error) {
q := "SELECT * from integration_model WHERE public = true"
if integrationType != nil {
switch *integrationType {
case sdk.IntegrationTypeEvent:
q += " AND integration_model.event = true"
case sdk.IntegrationTypeCompute:
q += " AND integration_model.compute = true"
case sdk.IntegrationTypeStorage:
q += " AND integration_model.storage = true"
case sdk.IntegrationTypeHook:
q += " AND integration_model.hook = true"
case sdk.IntegrationTypeDeployment:
q += " AND integration_model.deployment = true"
}
}

query := gorpmapping.NewQuery(q)
var pms integrationModelSlice

if err := gorpmapping.GetAll(context.Background(), db, query, &pms, gorpmapping.GetOptions.WithDecryption); err != nil {
return nil, err
}

var res []sdk.IntegrationModel
for _, pm := range pms {
isValid, err := gorpmapping.CheckSignature(pm, pm.Signature)
if err != nil {
return nil, err
}
if !isValid {
log.Error(context.Background(), "integration.LoadModel> model %d data corrupted", pm.ID)
continue
}
res = append(res, pm.IntegrationModel)
}

return res, nil
}

// LoadModel Load a integration model by its ID
func LoadModel(ctx context.Context, db gorp.SqlExecutor, modelID int64) (sdk.IntegrationModel, error) {
query := gorpmapping.NewQuery("SELECT * from integration_model where id = $1").Args(modelID)
Expand Down
4 changes: 0 additions & 4 deletions engine/api/integration/dao_model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,4 @@ func TestCRUDModel(t *testing.T) {
require.NoError(t, err)

assert.True(t, len(models) > 1)

filter := sdk.IntegrationTypeEvent
_, err = LoadPublicModelsByTypeWithDecryption(db, &filter)
require.NoError(t, err)
}

0 comments on commit 801419c

Please sign in to comment.