Skip to content

Commit

Permalink
fix: public integration is now global
Browse files Browse the repository at this point in the history
Signed-off-by: francois  samin <[email protected]>
  • Loading branch information
fsamin committed Jan 21, 2022
1 parent e834a00 commit de73b71
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 40 deletions.
5 changes: 4 additions & 1 deletion engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ type Configuration struct {
DefaultRetentionPolicy string `toml:"defaultRetentionPolicy" comment:"Default rule for workflow run retention policy, this rule can be overridden on each workflow.\n Example: 'return run_days_before < 365' keeps runs for one year." json:"defaultRetentionPolicy" default:"return run_days_before < 365"`
DisablePurgeDeletion bool `toml:"disablePurgeDeletion" comment:"Allow you to disable the deletion part of the purge. Workflow run will only be marked as delete" json:"disablePurgeDeletion" default:"false"`
} `toml:"workflow" comment:"######################\n 'Workflow' global configuration \n######################" json:"workflow"`
EventBus struct {
GlobalKafka event.KafkaConfig `toml:"globalKafka" default:"false" json:"globalKafka"`
} `toml:"events" comment:"######################\n Event bus configuration \n######################" json:"events"`
}

// DefaultValues is the struc for API Default configuration default values
Expand Down Expand Up @@ -688,7 +691,7 @@ func (a *API) Serve(ctx context.Context) error {
}

log.Info(ctx, "Initializing event broker...")
if err := event.Initialize(ctx, a.mustDB(), a.Cache); err != nil {
if err := event.Initialize(ctx, a.mustDB(), a.Cache, a.Config.EventBus.GlobalKafka); err != nil {
log.Error(ctx, "error while initializing event system: %s", err)
}

Expand Down
33 changes: 28 additions & 5 deletions engine/api/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package event

import (
"context"
"errors"
"fmt"
"os"
"strconv"
Expand All @@ -18,10 +19,13 @@ import (
)

// cache with go cache
var brokersConnectionCache = gocache.New(10*time.Minute, 6*time.Hour)
var hostname, cdsname string
var brokers []Broker
var subscribers []chan<- sdk.Event
var (
brokersConnectionCache = gocache.New(10*time.Minute, 6*time.Hour)
hostname, cdsname string
brokers []Broker
globalBroker Broker
subscribers []chan<- sdk.Event
)

func init() {
subscribers = make([]chan<- sdk.Event, 0)
Expand Down Expand Up @@ -96,7 +100,11 @@ func ResetEventIntegration(ctx context.Context, db gorp.SqlExecutor, eventIntegr
}

// Initialize initializes event system
func Initialize(ctx context.Context, db *gorp.DbMap, cache Store) error {
func Initialize(ctx context.Context, db *gorp.DbMap, cache Store, glolbalKafkaConfigs ...KafkaConfig) error {
if len(glolbalKafkaConfigs) > 1 {
return errors.New("only one global kafka global config is supported")
}

store = cache
var err error
hostname, err = os.Hostname()
Expand All @@ -113,6 +121,14 @@ func Initialize(ctx context.Context, db *gorp.DbMap, cache Store) error {
}
}

if len(glolbalKafkaConfigs) == 1 && glolbalKafkaConfigs[0].BrokerAddresses != "" {
globalBroker, err = getBroker(ctx, "kafka", glolbalKafkaConfigs[0])
if err != nil {
ctx = log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "unable to init builtin kafka broker from config: %v", err)
}
}

return nil
}

Expand All @@ -138,6 +154,13 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
s <- e
}

if globalBroker != nil {
log.Info(ctx, "sending event %q to global broker", e.EventType)
if err := globalBroker.sendEvent(&e); err != nil {
log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
}
}

for _, eventIntegrationID := range e.EventIntegrationsID {
brokerConnectionKey := strconv.FormatInt(eventIntegrationID, 10)
brokerConnection, ok := brokersConnectionCache.Get(brokerConnectionKey)
Expand Down
32 changes: 22 additions & 10 deletions engine/api/event/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ type KafkaClient struct {

// KafkaConfig handles all config to connect to Kafka
type KafkaConfig struct {
Enabled bool
BrokerAddresses string
User string
Password string
Version string
Topic string
MaxMessageByte int
DisableTLS bool
DisableSASL bool
ClientID string
Enabled bool `toml:"enabled" json:"-" commented:"true" default:"true"`
BrokerAddresses string `toml:"broker" json:"-" commented:"true"`
User string `toml:"user" json:"-" commented:"true"`
Password string `toml:"password" json:"-" commented:"true"`
Version string `toml:"version" json:"-" commented:"true"`
Topic string `toml:"topic" json:"-" commented:"true"`
MaxMessageByte int `toml:"maxMessageByte" json:"-" commented:"true"`
DisableTLS bool `toml:"disableTLS" json:"-" commented:"true"`
DisableSASL bool `toml:"disableSASL" json:"-" commented:"true"`
ClientID string `toml:"clientID" json:"-" commented:"true"`
}

// initialize returns broker, isInit and err if
Expand All @@ -43,6 +43,18 @@ func (c *KafkaClient) initialize(ctx context.Context, options interface{}) (Brok
conf.Topic == "" {
return nil, fmt.Errorf("initKafka> Invalid Kafka Configuration")
}

if conf.MaxMessageByte == 0 {
conf.MaxMessageByte = 10000000
}

if conf.ClientID == "" {
conf.ClientID = conf.User
}
if conf.ClientID == "" {
conf.ClientID = "cds"
}

c.options = conf

if err := c.initProducer(); err != nil {
Expand Down
1 change: 0 additions & 1 deletion tests/05_sc_workflow_event_kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ testcases:

- name: import integrations
steps:
- script: {{.cdsctl}} -f {{.cdsctl.config}} admin integration-model import ./fixtures/integrations/kafka-public.yml
- script: {{.cdsctl}} -f {{.cdsctl.config}} project integration import ITSCWRKFLW18 ./fixtures/integrations/kafka.yml
- script: {{.cdsctl}} -f {{.cdsctl.config}} project integration import ITSCWRKFLW18 ./fixtures/integrations/kafka-hook.yml

Expand Down
23 changes: 0 additions & 23 deletions tests/fixtures/integrations/kafka-public.yml

This file was deleted.

0 comments on commit de73b71

Please sign in to comment.