Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): don't send event to public event integrations #6066

Merged
merged 7 commits into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 1 addition & 36 deletions docs/content/docs/integrations/kafka/kafka_events.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,42 +56,7 @@ Import the integration on your CDS Project with:
cdsctl project integration import PROJECT_KEY project-configuration.yml
```

Then, as a standard user, you can add a [Kafka Hook]({{<relref "/docs/concepts/workflow/hooks/kafka-hook.md">}}) on your workflow.

### Create a Public Kafka Integration for whole CDS Projects

You can also add a Kafka Integration with cdsctl. As a CDS Administrator,
this allows you to propose a Public Kafka Integration, available on all CDS Projects.

Create a file `public-configuration.yml`:

```yml
name: your-kafka-integration
event: true
public: true
public_configurations:
name-of-integration:
"broker url":
type: string
value: "n1.o1.your-broker:9093,n2.o1.n1.o1.your-broker:9093,n3.o1.n1.o1.your-broker:9093"
"topic":
type: string
value: "your-topic.events"
"username":
type: string
value: "your-topic.cds-reader"
"password":
type: password
value: xxxxxxxx
```

Import the integration with :

```bash
cdsctl admin integration-model import public-configuration.yml
```

Then, as a standard user, you can add a [Kafka Hook]({{<relref "/docs/concepts/workflow/hooks/kafka-hook.md">}}) on your workflow.
Then, as a standard user, you can use your kafka integration for workflow notifications.

### One Integration, two use case

Expand Down
5 changes: 4 additions & 1 deletion engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ type Configuration struct {
DefaultRetentionPolicy string `toml:"defaultRetentionPolicy" comment:"Default rule for workflow run retention policy, this rule can be overridden on each workflow.\n Example: 'return run_days_before < 365' keeps runs for one year." json:"defaultRetentionPolicy" default:"return run_days_before < 365"`
DisablePurgeDeletion bool `toml:"disablePurgeDeletion" comment:"Allow you to disable the deletion part of the purge. Workflow run will only be marked as delete" json:"disablePurgeDeletion" default:"false"`
} `toml:"workflow" comment:"######################\n 'Workflow' global configuration \n######################" json:"workflow"`
EventBus struct {
GlobalKafka event.KafkaConfig `toml:"globalKafka" default:"false" json:"globalKafka" mapstructure:"globalKafka"`
} `toml:"events" comment:"######################\n Event bus configuration \n######################" json:"events" mapstructure:"events"`
}

// DefaultValues is the struc for API Default configuration default values
Expand Down Expand Up @@ -688,7 +691,7 @@ func (a *API) Serve(ctx context.Context) error {
}

log.Info(ctx, "Initializing event broker...")
if err := event.Initialize(ctx, a.mustDB(), a.Cache); err != nil {
if err := event.Initialize(ctx, a.mustDB(), a.Cache, a.Config.EventBus.GlobalKafka); err != nil {
log.Error(ctx, "error while initializing event system: %s", err)
}

Expand Down
1 change: 1 addition & 0 deletions engine/api/event/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func PushInElasticSearch(ctx context.Context, db gorp.SqlExecutor, store cache.S
continue
}
e.Payload = nil
log.Info(ctx, "sending event %q to %s services", e.EventType, sdk.TypeElasticsearch)
_, code, errD := services.NewClient(db, esServices).DoJSONRequest(context.Background(), "POST", "/events", e, nil)
if code >= 400 || errD != nil {
log.Error(ctx, "PushInElasticSearch> Unable to send event %s to elasticsearch [%d]: %v", e.EventType, code, errD)
Expand Down
63 changes: 29 additions & 34 deletions engine/api/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package event

import (
"context"
"errors"
"fmt"
"os"
"strconv"
Expand All @@ -18,11 +19,13 @@ import (
)

// cache with go cache
var brokersConnectionCache = gocache.New(10*time.Minute, 6*time.Hour)
var publicBrokersConnectionCache = []Broker{}
var hostname, cdsname string
var brokers []Broker
var subscribers []chan<- sdk.Event
var (
brokersConnectionCache = gocache.New(10*time.Minute, 6*time.Hour)
hostname, cdsname string
brokers []Broker
globalBroker Broker
subscribers []chan<- sdk.Event
)

func init() {
subscribers = make([]chan<- sdk.Event, 0)
Expand All @@ -45,30 +48,6 @@ func getBroker(ctx context.Context, t string, option interface{}) (Broker, error
return nil, fmt.Errorf("invalid Broker Type %s", t)
}

// ResetPublicIntegrations load all integration of type Event and creates kafka brokers
func ResetPublicIntegrations(ctx context.Context, db *gorp.DbMap) error {
publicBrokersConnectionCache = []Broker{}
filterType := sdk.IntegrationTypeEvent
integrations, err := integration.LoadPublicModelsByTypeWithDecryption(db, &filterType)
if err != nil {
return sdk.WrapError(err, "cannot load public models for event type")
}

for _, integration := range integrations {
for _, cfg := range integration.PublicConfigurations {
kafkaCfg := getKafkaConfig(cfg)
kafkaBroker, err := getBroker(ctx, "kafka", kafkaCfg)
if err != nil {
return sdk.WrapError(err, "cannot get broker for %s and user %s", cfg["broker url"].Value, cfg["username"].Value)
}

publicBrokersConnectionCache = append(publicBrokersConnectionCache, kafkaBroker)
}
}

return nil
}

func getKafkaConfig(cfg sdk.IntegrationConfig) KafkaConfig {
kafkaCfg := KafkaConfig{
Enabled: true,
Expand Down Expand Up @@ -121,7 +100,11 @@ func ResetEventIntegration(ctx context.Context, db gorp.SqlExecutor, eventIntegr
}

// Initialize initializes event system
func Initialize(ctx context.Context, db *gorp.DbMap, cache Store) error {
func Initialize(ctx context.Context, db *gorp.DbMap, cache Store, glolbalKafkaConfigs ...KafkaConfig) error {
if len(glolbalKafkaConfigs) > 1 {
return errors.New("only one global kafka global config is supported")
}

store = cache
var err error
hostname, err = os.Hostname()
Expand All @@ -138,7 +121,15 @@ func Initialize(ctx context.Context, db *gorp.DbMap, cache Store) error {
}
}

return ResetPublicIntegrations(ctx, db)
if len(glolbalKafkaConfigs) == 1 && glolbalKafkaConfigs[0].BrokerAddresses != "" {
globalBroker, err = getBroker(ctx, "kafka", glolbalKafkaConfigs[0])
if err != nil {
ctx = log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "unable to init builtin kafka broker from config: %v", err)
}
}

return nil
}

// Subscribe to CDS events
Expand All @@ -163,15 +154,17 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
s <- e
}

// Send into public brokers
for _, b := range publicBrokersConnectionCache {
if err := b.sendEvent(&e); err != nil {
if globalBroker != nil {
log.Info(ctx, "sending event %q to global broker", e.EventType)
if err := globalBroker.sendEvent(&e); err != nil {
log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
}
}

for _, eventIntegrationID := range e.EventIntegrationsID {
brokerConnectionKey := strconv.FormatInt(eventIntegrationID, 10)
brokerConnection, ok := brokersConnectionCache.Get(brokerConnectionKey)
var brokerConfig KafkaConfig
if !ok {
projInt, err := integration.LoadProjectIntegrationByIDWithClearPassword(ctx, db, eventIntegrationID)
if err != nil {
Expand All @@ -194,6 +187,7 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
continue
}
brokerConnection = kafkaBroker
brokerConfig = kafkaCfg
}

broker, ok := brokerConnection.(Broker)
Expand All @@ -203,6 +197,7 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
}

// Send into external brokers
log.Info(ctx, "sending event %q to %s", e.EventType, brokerConfig.BrokerAddresses)
if err := broker.sendEvent(&e); err != nil {
log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
}
Expand Down
32 changes: 22 additions & 10 deletions engine/api/event/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ type KafkaClient struct {

// KafkaConfig handles all config to connect to Kafka
type KafkaConfig struct {
Enabled bool
BrokerAddresses string
User string
Password string
Version string
Topic string
MaxMessageByte int
DisableTLS bool
DisableSASL bool
ClientID string
Enabled bool `toml:"enabled" json:"-" default:"false" mapstructure:"enabled"`
BrokerAddresses string `toml:"broker" json:"-" mapstructure:"broker"`
User string `toml:"user" json:"-" mapstructure:"user"`
Password string `toml:"password" json:"-" mapstructure:"password"`
Version string `toml:"version" json:"-" mapstructure:"version"`
Topic string `toml:"topic" json:"-" mapstructure:"topic"`
MaxMessageByte int `toml:"maxMessageByte" json:"-" mapstructure:"maxMessageByte"`
DisableTLS bool `toml:"disableTLS" json:"-" mapstructure:"disableTLS"`
DisableSASL bool `toml:"disableSASL" json:"-" mapstructure:"disableSASL"`
ClientID string `toml:"clientID" json:"-" mapstructure:"clientID"`
}

// initialize returns broker, isInit and err if
Expand All @@ -43,6 +43,18 @@ func (c *KafkaClient) initialize(ctx context.Context, options interface{}) (Brok
conf.Topic == "" {
return nil, fmt.Errorf("initKafka> Invalid Kafka Configuration")
}

if conf.MaxMessageByte == 0 {
conf.MaxMessageByte = 10000000
}

if conf.ClientID == "" {
conf.ClientID = conf.User
}
if conf.ClientID == "" {
conf.ClientID = "cds"
}

c.options = conf

if err := c.initProducer(); err != nil {
Expand Down
18 changes: 0 additions & 18 deletions engine/api/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,6 @@ func (api *API) postIntegrationModelHandler() service.Handler {

if m.Public {
go propagatePublicIntegrationModel(ctx, api.mustDB(), api.Cache, *m, getAPIConsumer(ctx))
if m.Event {
if err := event.ResetPublicIntegrations(ctx, api.mustDB()); err != nil {
return sdk.WrapError(err, "error while resetting public integrations")
}
}
}

return service.WriteJSON(w, m, http.StatusCreated)
Expand Down Expand Up @@ -127,12 +122,6 @@ func (api *API) putIntegrationModelHandler() service.Handler {
api.GoRoutines.Exec(ctx, "propagatePublicIntegrationModel", func(ctx context.Context) {
propagatePublicIntegrationModel(ctx, api.mustDB(), api.Cache, *m, getAPIConsumer(ctx))
})

if m.Event {
if err := event.ResetPublicIntegrations(ctx, api.mustDB()); err != nil {
return sdk.WrapError(err, "error while resetting public integrations")
}
}
}

return service.WriteJSON(w, m, http.StatusOK)
Expand Down Expand Up @@ -231,13 +220,6 @@ func (api *API) deleteIntegrationModelHandler() service.Handler {
return sdk.WrapError(err, "Unable to commit tx")
}

if old.Event && old.Public {
// reset outside the transaction
if err := event.ResetPublicIntegrations(ctx, api.mustDB()); err != nil {
return sdk.WrapError(err, "error while resetting public integrations")
}
}

return nil
}
}
40 changes: 0 additions & 40 deletions engine/api/integration/dao_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,46 +39,6 @@ func LoadModels(db gorp.SqlExecutor) ([]sdk.IntegrationModel, error) {
return res, nil
}

func LoadPublicModelsByTypeWithDecryption(db gorp.SqlExecutor, integrationType *sdk.IntegrationType) ([]sdk.IntegrationModel, error) {
q := "SELECT * from integration_model WHERE public = true"
if integrationType != nil {
switch *integrationType {
case sdk.IntegrationTypeEvent:
q += " AND integration_model.event = true"
case sdk.IntegrationTypeCompute:
q += " AND integration_model.compute = true"
case sdk.IntegrationTypeStorage:
q += " AND integration_model.storage = true"
case sdk.IntegrationTypeHook:
q += " AND integration_model.hook = true"
case sdk.IntegrationTypeDeployment:
q += " AND integration_model.deployment = true"
}
}

query := gorpmapping.NewQuery(q)
var pms integrationModelSlice

if err := gorpmapping.GetAll(context.Background(), db, query, &pms, gorpmapping.GetOptions.WithDecryption); err != nil {
return nil, err
}

var res []sdk.IntegrationModel
for _, pm := range pms {
isValid, err := gorpmapping.CheckSignature(pm, pm.Signature)
if err != nil {
return nil, err
}
if !isValid {
log.Error(context.Background(), "integration.LoadModel> model %d data corrupted", pm.ID)
continue
}
res = append(res, pm.IntegrationModel)
}

return res, nil
}

// LoadModel Load a integration model by its ID
func LoadModel(ctx context.Context, db gorp.SqlExecutor, modelID int64) (sdk.IntegrationModel, error) {
query := gorpmapping.NewQuery("SELECT * from integration_model where id = $1").Args(modelID)
Expand Down
4 changes: 0 additions & 4 deletions engine/api/integration/dao_model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,4 @@ func TestCRUDModel(t *testing.T) {
require.NoError(t, err)

assert.True(t, len(models) > 1)

filter := sdk.IntegrationTypeEvent
_, err = LoadPublicModelsByTypeWithDecryption(db, &filter)
require.NoError(t, err)
}
11 changes: 6 additions & 5 deletions tests/05_sc_workflow_event_kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ testcases:

- name: import integrations
steps:
- script: {{.cdsctl}} -f {{.cdsctl.config}} admin integration-model import ./fixtures/integrations/kafka-public.yml
- script: {{.cdsctl}} -f {{.cdsctl.config}} project integration import ITSCWRKFLW18 ./fixtures/integrations/kafka.yml
- script: {{.cdsctl}} -f {{.cdsctl.config}} project integration import ITSCWRKFLW18 ./fixtures/integrations/kafka-hook.yml

Expand All @@ -24,30 +23,32 @@ testcases:
- name: check if consumer kafka is started
steps:
- script: sleep 15 && {{.cdsctl}} -f {{.cdsctl.config}} admin services status --type=hooks|grep 'Hook Kafka Consumers' | grep OK
retry: 30
delay: 10
timeout: 30

- name: run workflow by sending a kafka event
steps:
- script: kafkacat -b localhost:9092 -t test.hook -T -P -l ./fixtures/ITSCWRKFLW18/input-kafka.json
timeout: 30

- name: check event in topic test.eventsproject
steps:
- script: kafkacat -b localhost:9092 -t test.eventsproject -C -o -1 -c 1
assertions:
- result.code ShouldEqual 0
- "result.systemoutjson.type_event ShouldContainSubstring sdk.EventRunWorkflowJob"
retry: 30
retry: 10
delay: 10
timeout: 100

- name: check event in topic test.eventspublic
steps:
- script: kafkacat -b localhost:9092 -t test.eventspublic -C -o -1 -c 1
assertions:
- result.code ShouldEqual 0
- "result.systemoutjson.type_event ShouldContainSubstring sdk.EventRunWorkflowJob"
retry: 30
retry: 10
delay: 10
timeout: 100

- name: check workflow
steps:
Expand Down
Loading