feat(api): don't send event to public event integrations (#6066)
Signed-off-by: francois samin <[email protected]>
fsamin authored Jan 24, 2022
1 parent 2b56b04 commit 008ec54
Showing 10 changed files with 63 additions and 171 deletions.
37 changes: 1 addition & 36 deletions docs/content/docs/integrations/kafka/kafka_events.md
@@ -56,42 +56,7 @@ Import the integration on your CDS Project with:
cdsctl project integration import PROJECT_KEY project-configuration.yml
```

Then, as a standard user, you can add a [Kafka Hook]({{<relref "/docs/concepts/workflow/hooks/kafka-hook.md">}}) on your workflow.

### Create a Public Kafka Integration for all CDS Projects

As a CDS Administrator, you can also add a Kafka Integration with cdsctl. This lets you offer a Public Kafka Integration, available on all CDS Projects.

Create a file `public-configuration.yml`:

```yml
name: your-kafka-integration
event: true
public: true
public_configurations:
  name-of-integration:
    "broker url":
      type: string
      value: "n1.o1.your-broker:9093,n2.o1.n1.o1.your-broker:9093,n3.o1.n1.o1.your-broker:9093"
    "topic":
      type: string
      value: "your-topic.events"
    "username":
      type: string
      value: "your-topic.cds-reader"
    "password":
      type: password
      value: xxxxxxxx
```
Import the integration with:
```bash
cdsctl admin integration-model import public-configuration.yml
```

Then, as a standard user, you can add a [Kafka Hook]({{<relref "/docs/concepts/workflow/hooks/kafka-hook.md">}}) on your workflow.
Then, as a standard user, you can use your Kafka integration for workflow notifications.

### One Integration, two use cases

5 changes: 4 additions & 1 deletion engine/api/api.go
@@ -210,6 +210,9 @@ type Configuration struct {
DefaultRetentionPolicy string `toml:"defaultRetentionPolicy" comment:"Default rule for workflow run retention policy; this rule can be overridden on each workflow.\n Example: 'return run_days_before < 365' keeps runs for one year." json:"defaultRetentionPolicy" default:"return run_days_before < 365"`
DisablePurgeDeletion bool `toml:"disablePurgeDeletion" comment:"Allows you to disable the deletion part of the purge. Workflow runs will only be marked as deleted" json:"disablePurgeDeletion" default:"false"`
} `toml:"workflow" comment:"######################\n 'Workflow' global configuration \n######################" json:"workflow"`
EventBus struct {
GlobalKafka event.KafkaConfig `toml:"globalKafka" default:"false" json:"globalKafka" mapstructure:"globalKafka"`
} `toml:"events" comment:"######################\n Event bus configuration \n######################" json:"events" mapstructure:"events"`
}

// DefaultValues is the struct for API Default configuration default values
@@ -683,7 +686,7 @@ func (a *API) Serve(ctx context.Context) error {
}

log.Info(ctx, "Initializing event broker...")
if err := event.Initialize(ctx, a.mustDB(), a.Cache); err != nil {
if err := event.Initialize(ctx, a.mustDB(), a.Cache, a.Config.EventBus.GlobalKafka); err != nil {
log.Error(ctx, "error while initializing event system: %s", err)
}

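The new `events` block is read from the API configuration file. As a rough sketch — assuming the `toml` tags declared on `KafkaConfig` in `engine/api/event/kafka.go` below, with placeholder broker, topic, and credentials — the configuration might look like:

```toml
[events]
  [events.globalKafka]
    enabled = true
    broker = "n1.o1.your-broker:9093"  # placeholder broker address
    topic = "cds.events"               # placeholder topic
    user = "cds-writer"                # placeholder SASL user
    password = "xxxxxxxx"
```
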
1 change: 1 addition & 0 deletions engine/api/event/elasticsearch.go
@@ -41,6 +41,7 @@ func PushInElasticSearch(ctx context.Context, db gorp.SqlExecutor, store cache.S
continue
}
e.Payload = nil
log.Info(ctx, "sending event %q to %s services", e.EventType, sdk.TypeElasticsearch)
_, code, errD := services.NewClient(db, esServices).DoJSONRequest(context.Background(), "POST", "/events", e, nil)
if code >= 400 || errD != nil {
log.Error(ctx, "PushInElasticSearch> Unable to send event %s to elasticsearch [%d]: %v", e.EventType, code, errD)
63 changes: 29 additions & 34 deletions engine/api/event/event.go
@@ -2,6 +2,7 @@ package event

import (
"context"
"errors"
"fmt"
"os"
"strconv"
@@ -18,11 +19,13 @@ import (
)

// cache with go cache
var brokersConnectionCache = gocache.New(10*time.Minute, 6*time.Hour)
var publicBrokersConnectionCache = []Broker{}
var hostname, cdsname string
var brokers []Broker
var subscribers []chan<- sdk.Event
var (
brokersConnectionCache = gocache.New(10*time.Minute, 6*time.Hour)
hostname, cdsname string
brokers []Broker
globalBroker Broker
subscribers []chan<- sdk.Event
)

func init() {
subscribers = make([]chan<- sdk.Event, 0)
@@ -45,30 +48,6 @@ func getBroker(ctx context.Context, t string, option interface{}) (Broker, error
return nil, fmt.Errorf("invalid Broker Type %s", t)
}

// ResetPublicIntegrations loads all integrations of type Event and creates Kafka brokers
func ResetPublicIntegrations(ctx context.Context, db *gorp.DbMap) error {
publicBrokersConnectionCache = []Broker{}
filterType := sdk.IntegrationTypeEvent
integrations, err := integration.LoadPublicModelsByTypeWithDecryption(db, &filterType)
if err != nil {
return sdk.WrapError(err, "cannot load public models for event type")
}

for _, integration := range integrations {
for _, cfg := range integration.PublicConfigurations {
kafkaCfg := getKafkaConfig(cfg)
kafkaBroker, err := getBroker(ctx, "kafka", kafkaCfg)
if err != nil {
return sdk.WrapError(err, "cannot get broker for %s and user %s", cfg["broker url"].Value, cfg["username"].Value)
}

publicBrokersConnectionCache = append(publicBrokersConnectionCache, kafkaBroker)
}
}

return nil
}

func getKafkaConfig(cfg sdk.IntegrationConfig) KafkaConfig {
kafkaCfg := KafkaConfig{
Enabled: true,
@@ -121,7 +100,11 @@ func ResetEventIntegration(ctx context.Context, db gorp.SqlExecutor, eventIntegr
}

// Initialize initializes event system
func Initialize(ctx context.Context, db *gorp.DbMap, cache Store) error {
func Initialize(ctx context.Context, db *gorp.DbMap, cache Store, globalKafkaConfigs ...KafkaConfig) error {
if len(globalKafkaConfigs) > 1 {
return errors.New("only one global Kafka config is supported")
}

store = cache
var err error
hostname, err = os.Hostname()
@@ -138,7 +121,15 @@
}
}

return ResetPublicIntegrations(ctx, db)
if len(globalKafkaConfigs) == 1 && globalKafkaConfigs[0].BrokerAddresses != "" {
globalBroker, err = getBroker(ctx, "kafka", globalKafkaConfigs[0])
if err != nil {
ctx = log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "unable to init builtin kafka broker from config: %v", err)
}
}

return nil
}

// Subscribe to CDS events
@@ -163,15 +154,17 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
s <- e
}

// Send into public brokers
for _, b := range publicBrokersConnectionCache {
if err := b.sendEvent(&e); err != nil {
if globalBroker != nil {
log.Info(ctx, "sending event %q to global broker", e.EventType)
if err := globalBroker.sendEvent(&e); err != nil {
log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
}
}

for _, eventIntegrationID := range e.EventIntegrationsID {
brokerConnectionKey := strconv.FormatInt(eventIntegrationID, 10)
brokerConnection, ok := brokersConnectionCache.Get(brokerConnectionKey)
var brokerConfig KafkaConfig
if !ok {
projInt, err := integration.LoadProjectIntegrationByIDWithClearPassword(ctx, db, eventIntegrationID)
if err != nil {
@@ -194,6 +187,7 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
continue
}
brokerConnection = kafkaBroker
brokerConfig = kafkaCfg
}

broker, ok := brokerConnection.(Broker)
@@ -203,6 +197,7 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
}

// Send into external brokers
log.Info(ctx, "sending event %q to %s", e.EventType, brokerConfig.BrokerAddresses)
if err := broker.sendEvent(&e); err != nil {
log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
}
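For illustration, a minimal sketch of how a caller wires the new variadic parameter, mirroring the `a.Config.EventBus.GlobalKafka` call in `api.go` above; all field values here are placeholders, not part of the commit:

```go
cfg := event.KafkaConfig{
	Enabled:         true,
	BrokerAddresses: "n1.o1.your-broker:9093", // placeholder
	Topic:           "cds.events",             // placeholder
	User:            "cds-writer",             // placeholder
	Password:        "xxxxxxxx",               // placeholder
}
// Passing zero configs keeps the previous behavior; passing more than
// one now returns the error introduced in Initialize.
if err := event.Initialize(ctx, db, cache, cfg); err != nil {
	log.Error(ctx, "error while initializing event system: %s", err)
}
```
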
32 changes: 22 additions & 10 deletions engine/api/event/kafka.go
@@ -20,16 +20,16 @@ type KafkaClient struct {

// KafkaConfig handles all config to connect to Kafka
type KafkaConfig struct {
Enabled bool
BrokerAddresses string
User string
Password string
Version string
Topic string
MaxMessageByte int
DisableTLS bool
DisableSASL bool
ClientID string
Enabled bool `toml:"enabled" json:"-" default:"false" mapstructure:"enabled"`
BrokerAddresses string `toml:"broker" json:"-" mapstructure:"broker"`
User string `toml:"user" json:"-" mapstructure:"user"`
Password string `toml:"password" json:"-" mapstructure:"password"`
Version string `toml:"version" json:"-" mapstructure:"version"`
Topic string `toml:"topic" json:"-" mapstructure:"topic"`
MaxMessageByte int `toml:"maxMessageByte" json:"-" mapstructure:"maxMessageByte"`
DisableTLS bool `toml:"disableTLS" json:"-" mapstructure:"disableTLS"`
DisableSASL bool `toml:"disableSASL" json:"-" mapstructure:"disableSASL"`
ClientID string `toml:"clientID" json:"-" mapstructure:"clientID"`
}

// initialize returns broker, isInit and err if
@@ -43,6 +43,18 @@ func (c *KafkaClient) initialize(ctx context.Context, options interface{}) (Brok
conf.Topic == "" {
return nil, fmt.Errorf("initKafka> Invalid Kafka Configuration")
}

if conf.MaxMessageByte == 0 {
conf.MaxMessageByte = 10000000
}

if conf.ClientID == "" {
conf.ClientID = conf.User
}
if conf.ClientID == "" {
conf.ClientID = "cds"
}

c.options = conf

if err := c.initProducer(); err != nil {
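The defaulting added to `initialize` fills in values a global config may omit. A hypothetical helper (for illustration only, not part of the commit) capturing the same fallback order:

```go
// applyDefaults mirrors the fallbacks added to initialize above.
func applyDefaults(conf KafkaConfig) KafkaConfig {
	if conf.MaxMessageByte == 0 {
		conf.MaxMessageByte = 10000000 // ~10 MB default max message size
	}
	if conf.ClientID == "" {
		conf.ClientID = conf.User // prefer the Kafka user as client ID
	}
	if conf.ClientID == "" {
		conf.ClientID = "cds" // final fallback when no user is set
	}
	return conf
}
```
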
18 changes: 0 additions & 18 deletions engine/api/integration.go
@@ -70,11 +70,6 @@ func (api *API) postIntegrationModelHandler() service.Handler {

if m.Public {
go propagatePublicIntegrationModel(ctx, api.mustDB(), api.Cache, *m, getAPIConsumer(ctx))
if m.Event {
if err := event.ResetPublicIntegrations(ctx, api.mustDB()); err != nil {
return sdk.WrapError(err, "error while resetting public integrations")
}
}
}

return service.WriteJSON(w, m, http.StatusCreated)
@@ -127,12 +122,6 @@ func (api *API) putIntegrationModelHandler() service.Handler {
api.GoRoutines.Exec(ctx, "propagatePublicIntegrationModel", func(ctx context.Context) {
propagatePublicIntegrationModel(ctx, api.mustDB(), api.Cache, *m, getAPIConsumer(ctx))
})

if m.Event {
if err := event.ResetPublicIntegrations(ctx, api.mustDB()); err != nil {
return sdk.WrapError(err, "error while resetting public integrations")
}
}
}

return service.WriteJSON(w, m, http.StatusOK)
@@ -231,13 +220,6 @@ func (api *API) deleteIntegrationModelHandler() service.Handler {
return sdk.WrapError(err, "Unable to commit tx")
}

if old.Event && old.Public {
// reset outside the transaction
if err := event.ResetPublicIntegrations(ctx, api.mustDB()); err != nil {
return sdk.WrapError(err, "error while resetting public integrations")
}
}

return nil
}
}
40 changes: 0 additions & 40 deletions engine/api/integration/dao_model.go
@@ -39,46 +39,6 @@ func LoadModels(db gorp.SqlExecutor) ([]sdk.IntegrationModel, error) {
return res, nil
}

func LoadPublicModelsByTypeWithDecryption(db gorp.SqlExecutor, integrationType *sdk.IntegrationType) ([]sdk.IntegrationModel, error) {
q := "SELECT * from integration_model WHERE public = true"
if integrationType != nil {
switch *integrationType {
case sdk.IntegrationTypeEvent:
q += " AND integration_model.event = true"
case sdk.IntegrationTypeCompute:
q += " AND integration_model.compute = true"
case sdk.IntegrationTypeStorage:
q += " AND integration_model.storage = true"
case sdk.IntegrationTypeHook:
q += " AND integration_model.hook = true"
case sdk.IntegrationTypeDeployment:
q += " AND integration_model.deployment = true"
}
}

query := gorpmapping.NewQuery(q)
var pms integrationModelSlice

if err := gorpmapping.GetAll(context.Background(), db, query, &pms, gorpmapping.GetOptions.WithDecryption); err != nil {
return nil, err
}

var res []sdk.IntegrationModel
for _, pm := range pms {
isValid, err := gorpmapping.CheckSignature(pm, pm.Signature)
if err != nil {
return nil, err
}
if !isValid {
log.Error(context.Background(), "integration.LoadModel> model %d data corrupted", pm.ID)
continue
}
res = append(res, pm.IntegrationModel)
}

return res, nil
}

// LoadModel Load a integration model by its ID
func LoadModel(ctx context.Context, db gorp.SqlExecutor, modelID int64) (sdk.IntegrationModel, error) {
query := gorpmapping.NewQuery("SELECT * from integration_model where id = $1").Args(modelID)
4 changes: 0 additions & 4 deletions engine/api/integration/dao_model_test.go
@@ -42,8 +42,4 @@ func TestCRUDModel(t *testing.T) {
require.NoError(t, err)

assert.True(t, len(models) > 1)

filter := sdk.IntegrationTypeEvent
_, err = LoadPublicModelsByTypeWithDecryption(db, &filter)
require.NoError(t, err)
}
11 changes: 6 additions & 5 deletions tests/05_sc_workflow_event_kafka.yml
@@ -13,7 +13,6 @@ testcases:

- name: import integrations
  steps:
  - script: {{.cdsctl}} -f {{.cdsctl.config}} admin integration-model import ./fixtures/integrations/kafka-public.yml
  - script: {{.cdsctl}} -f {{.cdsctl.config}} project integration import ITSCWRKFLW18 ./fixtures/integrations/kafka.yml
  - script: {{.cdsctl}} -f {{.cdsctl.config}} project integration import ITSCWRKFLW18 ./fixtures/integrations/kafka-hook.yml

@@ -24,30 +23,32 @@
- name: check if consumer kafka is started
  steps:
  - script: sleep 15 && {{.cdsctl}} -f {{.cdsctl.config}} admin services status --type=hooks|grep 'Hook Kafka Consumers' | grep OK
    retry: 30
    delay: 10
    timeout: 30

- name: run workflow by sending a kafka event
  steps:
  - script: kafkacat -b localhost:9092 -t test.hook -T -P -l ./fixtures/ITSCWRKFLW18/input-kafka.json
    timeout: 30

- name: check event in topic test.eventsproject
  steps:
  - script: kafkacat -b localhost:9092 -t test.eventsproject -C -o -1 -c 1
    assertions:
    - result.code ShouldEqual 0
    - "result.systemoutjson.type_event ShouldContainSubstring sdk.EventRunWorkflowJob"
    retry: 30
    retry: 10
    delay: 10
    timeout: 100

- name: check event in topic test.eventspublic
  steps:
  - script: kafkacat -b localhost:9092 -t test.eventspublic -C -o -1 -c 1
    assertions:
    - result.code ShouldEqual 0
    - "result.systemoutjson.type_event ShouldContainSubstring sdk.EventRunWorkflowJob"
    retry: 30
    retry: 10
    delay: 10
    timeout: 100

- name: check workflow
  steps:
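To spot-check the global broker by hand, the same kafkacat pattern the test uses can be pointed at whatever topic the global Kafka config declares (broker and topic below are placeholders):

```bash
# Consume the most recent event from the global events topic.
kafkacat -b localhost:9092 -t cds.events -C -o -1 -c 1
```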