Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): feed a specific kafka topic with jobs #6070

Merged
merged 11 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,7 @@ type Configuration struct {
DefaultRetentionPolicy string `toml:"defaultRetentionPolicy" comment:"Default rule for workflow run retention policy, this rule can be overridden on each workflow.\n Example: 'return run_days_before < 365' keeps runs for one year." json:"defaultRetentionPolicy" default:"return run_days_before < 365"`
DisablePurgeDeletion bool `toml:"disablePurgeDeletion" comment:"Allow you to disable the deletion part of the purge. Workflow run will only be marked as delete" json:"disablePurgeDeletion" default:"false"`
} `toml:"workflow" comment:"######################\n 'Workflow' global configuration \n######################" json:"workflow"`
EventBus struct {
GlobalKafka event.KafkaConfig `toml:"globalKafka" default:"false" json:"globalKafka" mapstructure:"globalKafka"`
} `toml:"events" comment:"######################\n Event bus configuration \n######################" json:"events" mapstructure:"events"`
EventBus event.Config `toml:"events" comment:"######################\n Event bus configuration \n######################" json:"events" mapstructure:"events"`
}

// DefaultValues is the struc for API Default configuration default values
Expand Down Expand Up @@ -686,7 +684,7 @@ func (a *API) Serve(ctx context.Context) error {
}

log.Info(ctx, "Initializing event broker...")
if err := event.Initialize(ctx, a.mustDB(), a.Cache, a.Config.EventBus.GlobalKafka); err != nil {
if err := event.Initialize(ctx, a.mustDB(), a.Cache, &a.Config.EventBus); err != nil {
log.Error(ctx, "error while initializing event system: %s", err)
}

Expand Down
2 changes: 1 addition & 1 deletion engine/api/application/dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
func TestLoadByNameAsAdmin(t *testing.T) {
db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

_ = event.Initialize(context.Background(), db.DbMap, cache)
_ = event.Initialize(context.Background(), db.DbMap, cache, nil)
key := sdk.RandomString(10)
proj := assets.InsertTestProject(t, db, cache, key, key)
app := sdk.Application{
Expand Down
2 changes: 1 addition & 1 deletion engine/api/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestUpdateAsCodeApplicationHandler(t *testing.T) {
api, db, tsURL := newTestServer(t)

event.OverridePubSubKey("events_pubsub_test")
require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache))
require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache, nil))
require.NoError(t, api.initWebsocket("events_pubsub_test"))

u, jwt := assets.InsertAdminUser(t, db)
Expand Down
2 changes: 1 addition & 1 deletion engine/api/environment_ascode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestUpdateAsCodeEnvironmentHandler(t *testing.T) {
api, db, tsURL := newTestServer(t)

event.OverridePubSubKey("events_pubsub_test")
require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache))
require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache, nil))
require.NoError(t, api.initWebsocket("events_pubsub_test"))

u, jwt := assets.InsertAdminUser(t, db)
Expand Down
84 changes: 59 additions & 25 deletions engine/api/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package event

import (
"context"
"errors"
"encoding/json"
"fmt"
"os"
"strconv"
Expand All @@ -15,15 +15,22 @@ import (

"github.com/ovh/cds/engine/api/integration"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/event"
"github.com/ovh/cds/sdk/namesgenerator"
)

type Config struct {
GlobalKafka event.KafkaConfig `toml:"globalKafka" json:"globalKafka" mapstructure:"globalKafka"`
JobSummaryKafka event.KafkaConfig `toml:"jobSummaryKafka" json:"jobSummaryKafka" mapstructure:"jobSummaryKafka"`
}

// cache with go cache
var (
brokersConnectionCache = gocache.New(10*time.Minute, 6*time.Hour)
hostname, cdsname string
brokers []Broker
globalBroker Broker
jobSummaryBroker Broker
subscribers []chan<- sdk.Event
)

Expand All @@ -34,7 +41,7 @@ func init() {
// Broker event typed
type Broker interface {
initialize(ctx context.Context, options interface{}) (Broker, error)
sendEvent(event *sdk.Event) error
sendEvent(event interface{}) error
status() string
close(ctx context.Context)
}
Expand All @@ -48,8 +55,8 @@ func getBroker(ctx context.Context, t string, option interface{}) (Broker, error
return nil, fmt.Errorf("invalid Broker Type %s", t)
}

func getKafkaConfig(cfg sdk.IntegrationConfig) KafkaConfig {
kafkaCfg := KafkaConfig{
func getKafkaConfig(cfg sdk.IntegrationConfig) event.KafkaConfig {
kafkaCfg := event.KafkaConfig{
Enabled: true,
BrokerAddresses: cfg["broker url"].Value,
Topic: cfg["topic"].Value,
Expand Down Expand Up @@ -100,11 +107,7 @@ func ResetEventIntegration(ctx context.Context, db gorp.SqlExecutor, eventIntegr
}

// Initialize initializes event system
func Initialize(ctx context.Context, db *gorp.DbMap, cache Store, glolbalKafkaConfigs ...KafkaConfig) error {
if len(glolbalKafkaConfigs) > 1 {
return errors.New("only one global kafka global config is supported")
}

func Initialize(ctx context.Context, db *gorp.DbMap, cache Store, config *Config) error {
store = cache
var err error
hostname, err = os.Hostname()
Expand All @@ -121,11 +124,27 @@ func Initialize(ctx context.Context, db *gorp.DbMap, cache Store, glolbalKafkaCo
}
}

if len(glolbalKafkaConfigs) == 1 && glolbalKafkaConfigs[0].BrokerAddresses != "" {
globalBroker, err = getBroker(ctx, "kafka", glolbalKafkaConfigs[0])
if config == nil {
return nil
}

if config.GlobalKafka.BrokerAddresses != "" {
globalBroker, err = getBroker(ctx, "kafka", config.GlobalKafka)
if err != nil {
ctx = log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "unable to init builtin kafka broker from config: %v", err)
} else {
log.Info(ctx, "client to broker %s:%s ready", config.GlobalKafka.BrokerAddresses, config.GlobalKafka.Topic)
}
}

if config.JobSummaryKafka.BrokerAddresses != "" {
jobSummaryBroker, err = getBroker(ctx, "kafka", config.JobSummaryKafka)
if err != nil {
ctx = log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "unable to init builtin kafka broker from config: %v", err)
} else {
log.Info(ctx, "client to broker %s:%s ready", config.JobSummaryKafka.BrokerAddresses, config.GlobalKafka.Topic)
}
}

Expand All @@ -150,32 +169,47 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
return
}

for _, s := range subscribers {
s <- e
// Filter "EventJobSummary" for globalKafka Broker
if e.EventType != "sdk.EventJobSummary" {
for _, s := range subscribers {
s <- e
}
if globalBroker != nil {
log.Info(ctx, "sending event %q to global broker", e.EventType)
if err := globalBroker.sendEvent(&e); err != nil {
log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
}
}
continue
// we don't send other events than EventJobSummary to users kafka
}

if globalBroker != nil {
log.Info(ctx, "sending event %q to global broker", e.EventType)
if err := globalBroker.sendEvent(&e); err != nil {
log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
// We now only send "EventJobSummary" in the jobSummary Broker in project integrations
// if the users send specific kafka integration on their workflows
var ejs sdk.EventJobSummary
if err := json.Unmarshal(e.Payload, &ejs); err != nil {
ctx := log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "unable to unmarshal EventJobSummary")
continue
}
if jobSummaryBroker != nil {
log.Info(ctx, "sending event %+v to job summary broker", ejs)
if err := jobSummaryBroker.sendEvent(ejs); err != nil {
log.Error(ctx, "Error while sending message %s: %v", string(e.Payload), err)
}
}

for _, eventIntegrationID := range e.EventIntegrationsID {
brokerConnectionKey := strconv.FormatInt(eventIntegrationID, 10)
brokerConnection, ok := brokersConnectionCache.Get(brokerConnectionKey)
var brokerConfig KafkaConfig
var brokerConfig event.KafkaConfig
if !ok {
projInt, err := integration.LoadProjectIntegrationByIDWithClearPassword(ctx, db, eventIntegrationID)
if err != nil {
log.Error(ctx, "Event.DequeueEvent> Cannot load project integration for project %s and id %d and type event: %v", e.ProjectKey, eventIntegrationID, err)
continue
}

if projInt.Model.Public {
continue
}

kafkaCfg := getKafkaConfig(projInt.Config)
kafkaBroker, err := getBroker(ctx, "kafka", kafkaCfg)
if err != nil {
Expand All @@ -197,9 +231,9 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
}

// Send into external brokers
log.Info(ctx, "sending event %q to %s", e.EventType, brokerConfig.BrokerAddresses)
if err := broker.sendEvent(&e); err != nil {
log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
log.Info(ctx, "sending event %q to integration broker: %s", e.EventType, brokerConfig.BrokerAddresses)
if err := broker.sendEvent(ejs); err != nil {
log.Warn(ctx, "Error while sending message %s: %v", string(e.Payload), err)
}
}
}
Expand Down
32 changes: 9 additions & 23 deletions engine/api/event/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,20 @@ import (
"strings"

"github.com/Shopify/sarama"
"github.com/ovh/cds/sdk/event"
"github.com/pkg/errors"
"github.com/rockbears/log"

"github.com/ovh/cds/sdk"
)

// KafkaClient enbeddes the Kafka connecion
type KafkaClient struct {
options KafkaConfig
options event.KafkaConfig
producer sarama.SyncProducer
}

// KafkaConfig handles all config to connect to Kafka
type KafkaConfig struct {
Enabled bool `toml:"enabled" json:"-" default:"false" mapstructure:"enabled"`
BrokerAddresses string `toml:"broker" json:"-" mapstructure:"broker"`
User string `toml:"user" json:"-" mapstructure:"user"`
Password string `toml:"password" json:"-" mapstructure:"password"`
Version string `toml:"version" json:"-" mapstructure:"version"`
Topic string `toml:"topic" json:"-" mapstructure:"topic"`
MaxMessageByte int `toml:"maxMessageByte" json:"-" mapstructure:"maxMessageByte"`
DisableTLS bool `toml:"disableTLS" json:"-" mapstructure:"disableTLS"`
DisableSASL bool `toml:"disableSASL" json:"-" mapstructure:"disableSASL"`
ClientID string `toml:"clientID" json:"-" mapstructure:"clientID"`
}

// initialize returns broker, isInit and err if
func (c *KafkaClient) initialize(ctx context.Context, options interface{}) (Broker, error) {
conf, ok := options.(KafkaConfig)
conf, ok := options.(event.KafkaConfig)
if !ok {
return nil, fmt.Errorf("invalid Kafka Initialization")
}
Expand Down Expand Up @@ -104,15 +90,15 @@ func (c *KafkaClient) initProducer() error {
}

// sendOnKafkaTopic send a hook on a topic kafka
func (c *KafkaClient) sendEvent(event *sdk.Event) error {
data, errm := json.Marshal(event)
if errm != nil {
return errm
func (c *KafkaClient) sendEvent(event interface{}) error {
data, err := json.Marshal(event)
if err != nil {
return errors.WithStack(err)
}

msg := &sarama.ProducerMessage{Topic: c.options.Topic, Value: sarama.ByteEncoder(data)}
if _, _, err := c.producer.SendMessage(msg); err != nil {
return err
return errors.WithStack(err)
}
return nil
}
Expand Down
18 changes: 18 additions & 0 deletions engine/api/event/publish_workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,21 @@ func PublishWorkflowNodeJobRun(ctx context.Context, pkey string, wr sdk.Workflow
}
publishRunWorkflow(ctx, e, data)
}

func PublishEventJobSummary(ctx context.Context, e sdk.EventJobSummary, integrations []sdk.WorkflowProjectIntegration) {
eventIntegrationsID := make([]int64, len(integrations))
for i, eventIntegration := range integrations {
eventIntegrationsID[i] = eventIntegration.ProjectIntegrationID
}

bts, _ := json.Marshal(e)
event := sdk.Event{
Timestamp: time.Now(),
Hostname: hostname,
CDSName: cdsname,
EventType: fmt.Sprintf("%T", e),
Payload: bts,
EventIntegrationsID: eventIntegrationsID,
}
_ = publishEvent(ctx, event)
}
2 changes: 1 addition & 1 deletion engine/api/pipeline/pipeline_importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func testImportUpdate(t *testing.T, db gorp.SqlExecutor, store cache.Store, tt t
func TestImportUpdate(t *testing.T) {
db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

_ = event.Initialize(context.Background(), db.DbMap, cache)
_ = event.Initialize(context.Background(), db.DbMap, cache, nil)

if db == nil {
t.FailNow()
Expand Down
2 changes: 1 addition & 1 deletion engine/api/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestUpdateAsCodePipelineHandler(t *testing.T) {
api, db, tsURL := newTestServer(t)

event.OverridePubSubKey("events_pubsub_test")
require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache))
require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache, nil))
require.NoError(t, api.initWebsocket("events_pubsub_test"))

u, jwt := assets.InsertAdminUser(t, db)
Expand Down
2 changes: 1 addition & 1 deletion engine/api/project/dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestExist(t *testing.T) {
func TestLoadAllByRepo(t *testing.T) {
db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

_ = event.Initialize(context.Background(), db.DbMap, cache)
_ = event.Initialize(context.Background(), db.DbMap, cache, nil)

app, _ := application.LoadByName(db, "TestLoadAllByRepo", "TestLoadAllByRepo")
if app != nil {
Expand Down
2 changes: 1 addition & 1 deletion engine/api/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func TestWebsocketNoEventLoose(t *testing.T) {

pubSubKey := "events_pubsub_test_" + sdk.RandomString(10)
event.OverridePubSubKey(pubSubKey)
require.NoError(t, event.Initialize(context.TODO(), api.mustDB(), api.Cache))
require.NoError(t, event.Initialize(context.TODO(), api.mustDB(), api.Cache, nil))
require.NoError(t, api.initWebsocket(pubSubKey))

_, jwt := assets.InsertAdminUser(t, db)
Expand Down
12 changes: 6 additions & 6 deletions engine/api/workflow/dao_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestCanBeRun(t *testing.T) {
func TestPurgeWorkflowRun(t *testing.T) {
db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

_ = event.Initialize(context.TODO(), db.DbMap, cache)
_ = event.Initialize(context.TODO(), db.DbMap, cache, nil)

mockVCSSservice, _ := assets.InsertService(t, db, "TestManualRunBuildParameterMultiApplication", sdk.TypeVCS)
defer func() {
Expand Down Expand Up @@ -257,7 +257,7 @@ vcs_ssh_key: proj-blabla
func TestPurgeWorkflowRunWithRunningStatus(t *testing.T) {
db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

_ = event.Initialize(context.TODO(), db.DbMap, cache)
_ = event.Initialize(context.TODO(), db.DbMap, cache, nil)

u, _ := assets.InsertAdminUser(t, db)
consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser)
Expand Down Expand Up @@ -346,7 +346,7 @@ func TestPurgeWorkflowRunWithRunningStatus(t *testing.T) {
func TestPurgeWorkflowRunWithOneSuccessWorkflowRun(t *testing.T) {
db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

_ = event.Initialize(context.TODO(), db.DbMap, cache)
_ = event.Initialize(context.TODO(), db.DbMap, cache, nil)

mockVCSSservice, _ := assets.InsertService(t, db, "TestManualRunBuildParameterMultiApplication", sdk.TypeVCS)
defer func() {
Expand Down Expand Up @@ -546,7 +546,7 @@ vcs_ssh_key: proj-blabla
func TestPurgeWorkflowRunWithNoSuccessWorkflowRun(t *testing.T) {
db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

_ = event.Initialize(context.TODO(), db.DbMap, cache)
_ = event.Initialize(context.TODO(), db.DbMap, cache, nil)

mockVCSSservice, _ := assets.InsertService(t, db, "TestManualRunBuildParameterMultiApplication", sdk.TypeVCS)
defer func() {
Expand Down Expand Up @@ -718,7 +718,7 @@ vcs_ssh_key: proj-blabla
func TestPurgeWorkflowRunWithoutTags(t *testing.T) {
db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

_ = event.Initialize(context.TODO(), db.DbMap, cache)
_ = event.Initialize(context.TODO(), db.DbMap, cache, nil)

u, _ := assets.InsertAdminUser(t, db)
consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser)
Expand Down Expand Up @@ -804,7 +804,7 @@ func TestPurgeWorkflowRunWithoutTags(t *testing.T) {
func TestPurgeWorkflowRunWithoutTagsBiggerHistoryLength(t *testing.T) {
db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

_ = event.Initialize(context.TODO(), db.DbMap, cache)
_ = event.Initialize(context.TODO(), db.DbMap, cache, nil)

u, _ := assets.InsertAdminUser(t, db)
consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser)
Expand Down
Loading