Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): feed a specific kafka topic with jobs #6070

Merged
merged 11 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,7 @@ type Configuration struct {
DefaultRetentionPolicy string `toml:"defaultRetentionPolicy" comment:"Default rule for workflow run retention policy, this rule can be overridden on each workflow.\n Example: 'return run_days_before < 365' keeps runs for one year." json:"defaultRetentionPolicy" default:"return run_days_before < 365"`
DisablePurgeDeletion bool `toml:"disablePurgeDeletion" comment:"Allow you to disable the deletion part of the purge. Workflow run will only be marked as delete" json:"disablePurgeDeletion" default:"false"`
} `toml:"workflow" comment:"######################\n 'Workflow' global configuration \n######################" json:"workflow"`
EventBus struct {
GlobalKafka event.KafkaConfig `toml:"globalKafka" default:"false" json:"globalKafka" mapstructure:"globalKafka"`
} `toml:"events" comment:"######################\n Event bus configuration \n######################" json:"events" mapstructure:"events"`
EventBus event.Config `toml:"events" comment:"######################\n Event bus configuration \n######################" json:"events" mapstructure:"events"`
}

// DefaultValues is the struc for API Default configuration default values
Expand Down Expand Up @@ -686,7 +684,7 @@ func (a *API) Serve(ctx context.Context) error {
}

log.Info(ctx, "Initializing event broker...")
if err := event.Initialize(ctx, a.mustDB(), a.Cache, a.Config.EventBus.GlobalKafka); err != nil {
if err := event.Initialize(ctx, a.mustDB(), a.Cache, &a.Config.EventBus); err != nil {
log.Error(ctx, "error while initializing event system: %s", err)
}

Expand Down
2 changes: 1 addition & 1 deletion engine/api/application/dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
func TestLoadByNameAsAdmin(t *testing.T) {
db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

_ = event.Initialize(context.Background(), db.DbMap, cache)
_ = event.Initialize(context.Background(), db.DbMap, cache, nil)
key := sdk.RandomString(10)
proj := assets.InsertTestProject(t, db, cache, key, key)
app := sdk.Application{
Expand Down
2 changes: 1 addition & 1 deletion engine/api/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestUpdateAsCodeApplicationHandler(t *testing.T) {
api, db, tsURL := newTestServer(t)

event.OverridePubSubKey("events_pubsub_test")
require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache))
require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache, nil))
require.NoError(t, api.initWebsocket("events_pubsub_test"))

u, jwt := assets.InsertAdminUser(t, db)
Expand Down
2 changes: 1 addition & 1 deletion engine/api/environment_ascode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestUpdateAsCodeEnvironmentHandler(t *testing.T) {
api, db, tsURL := newTestServer(t)

event.OverridePubSubKey("events_pubsub_test")
require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache))
require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache, nil))
require.NoError(t, api.initWebsocket("events_pubsub_test"))

u, jwt := assets.InsertAdminUser(t, db)
Expand Down
70 changes: 53 additions & 17 deletions engine/api/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package event

import (
"context"
"errors"
"encoding/json"
"fmt"
"os"
"strconv"
Expand All @@ -18,12 +18,18 @@ import (
"github.com/ovh/cds/sdk/namesgenerator"
)

type Config struct {
GlobalKafka KafkaConfig `toml:"globalKafka" json:"globalKafka" mapstructure:"globalKafka"`
JobSummaryKafka KafkaConfig `toml:"jobSummaryKafka" json:"jobSummaryKafka" mapstructure:"jobSummaryKafka"`
}

// cache with go cache
var (
brokersConnectionCache = gocache.New(10*time.Minute, 6*time.Hour)
hostname, cdsname string
brokers []Broker
globalBroker Broker
jobSummaryBroker Broker
subscribers []chan<- sdk.Event
)

Expand All @@ -34,7 +40,7 @@ func init() {
// Broker event typed
type Broker interface {
initialize(ctx context.Context, options interface{}) (Broker, error)
sendEvent(event *sdk.Event) error
sendEvent(event interface{}) error
status() string
close(ctx context.Context)
}
Expand Down Expand Up @@ -100,11 +106,7 @@ func ResetEventIntegration(ctx context.Context, db gorp.SqlExecutor, eventIntegr
}

// Initialize initializes event system
func Initialize(ctx context.Context, db *gorp.DbMap, cache Store, glolbalKafkaConfigs ...KafkaConfig) error {
if len(glolbalKafkaConfigs) > 1 {
return errors.New("only one global kafka global config is supported")
}

func Initialize(ctx context.Context, db *gorp.DbMap, cache Store, config *Config) error {
store = cache
var err error
hostname, err = os.Hostname()
Expand All @@ -121,12 +123,26 @@ func Initialize(ctx context.Context, db *gorp.DbMap, cache Store, glolbalKafkaCo
}
}

if len(glolbalKafkaConfigs) == 1 && glolbalKafkaConfigs[0].BrokerAddresses != "" {
globalBroker, err = getBroker(ctx, "kafka", glolbalKafkaConfigs[0])
if config == nil {
return nil
}

if config.GlobalKafka.BrokerAddresses != "" {
globalBroker, err = getBroker(ctx, "kafka", config.GlobalKafka)
if err != nil {
ctx = log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "unable to init builtin kafka broker from config: %v", err)
}
log.Info(ctx, "client to broker %s:%s ready", config.GlobalKafka.BrokerAddresses, config.GlobalKafka.Topic)
fsamin marked this conversation as resolved.
Show resolved Hide resolved
}

if config.JobSummaryKafka.BrokerAddresses != "" {
jobSummaryBroker, err = getBroker(ctx, "kafka", config.JobSummaryKafka)
if err != nil {
ctx = log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "unable to init builtin kafka broker from config: %v", err)
}
log.Info(ctx, "client to broker %s:%s ready", config.JobSummaryKafka.BrokerAddresses, config.GlobalKafka.Topic)
fsamin marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
Expand All @@ -150,15 +166,35 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
return
}

for _, s := range subscribers {
s <- e
// Filter "EventJobSummary" for globalKafka Broker
if e.EventType != "sdk.EventJobSummary" {
for _, s := range subscribers {
s <- e
}
if globalBroker != nil {
log.Info(ctx, "sending event %q to global broker", e.EventType)
if err := globalBroker.sendEvent(&e); err != nil {
log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
}
}
continue
// we don't send other events than EventJobSummary to users kafka
}

if globalBroker != nil {
log.Info(ctx, "sending event %q to global broker", e.EventType)
if err := globalBroker.sendEvent(&e); err != nil {
log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
// We now only send "EventJobSummary" in the jobSummary Broker in project integrations
// if the users send specific kafka integration on their workflows
var ejs sdk.EventJobSummary
if err := json.Unmarshal(e.Payload, &ejs); err != nil {
ctx := log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "unable to unmarshal EventJobSummary")
continue
}
if jobSummaryBroker != nil {
log.Info(ctx, "sending event %+v to job summary broker", ejs)
if err := jobSummaryBroker.sendEvent(ejs); err != nil {
log.Error(ctx, "Error while sending message %s: %v", string(e.Payload), err)
}
continue
}

for _, eventIntegrationID := range e.EventIntegrationsID {
Expand Down Expand Up @@ -198,8 +234,8 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {

// Send into external brokers
log.Info(ctx, "sending event %q to %s", e.EventType, brokerConfig.BrokerAddresses)
if err := broker.sendEvent(&e); err != nil {
log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
if err := broker.sendEvent(ejs); err != nil {
log.Warn(ctx, "Error while sending message %s: %v", string(e.Payload), err)
}
}
}
Expand Down
13 changes: 6 additions & 7 deletions engine/api/event/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (
"strings"

"github.com/Shopify/sarama"
"github.com/pkg/errors"
"github.com/rockbears/log"

"github.com/ovh/cds/sdk"
)

// KafkaClient enbeddes the Kafka connecion
Expand Down Expand Up @@ -104,15 +103,15 @@ func (c *KafkaClient) initProducer() error {
}

// sendOnKafkaTopic send a hook on a topic kafka
func (c *KafkaClient) sendEvent(event *sdk.Event) error {
data, errm := json.Marshal(event)
if errm != nil {
return errm
func (c *KafkaClient) sendEvent(event interface{}) error {
data, err := json.Marshal(event)
if err != nil {
return errors.WithStack(err)
}

msg := &sarama.ProducerMessage{Topic: c.options.Topic, Value: sarama.ByteEncoder(data)}
if _, _, err := c.producer.SendMessage(msg); err != nil {
return err
return errors.WithStack(err)
}
return nil
}
Expand Down
18 changes: 18 additions & 0 deletions engine/api/event/publish_workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,21 @@ func PublishWorkflowNodeJobRun(ctx context.Context, pkey string, wr sdk.Workflow
}
publishRunWorkflow(ctx, e, data)
}

func PublishEventJobSummary(ctx context.Context, e sdk.EventJobSummary, integrations []sdk.WorkflowProjectIntegration) {
eventIntegrationsID := make([]int64, len(integrations))
for i, eventIntegration := range integrations {
eventIntegrationsID[i] = eventIntegration.ProjectIntegrationID
}

bts, _ := json.Marshal(e)
event := sdk.Event{
Timestamp: time.Now(),
Hostname: hostname,
CDSName: cdsname,
EventType: fmt.Sprintf("%T", e),
Payload: bts,
EventIntegrationsID: eventIntegrationsID,
}
_ = publishEvent(ctx, event)
}
2 changes: 1 addition & 1 deletion engine/api/pipeline/pipeline_importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func testImportUpdate(t *testing.T, db gorp.SqlExecutor, store cache.Store, tt t
func TestImportUpdate(t *testing.T) {
db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

_ = event.Initialize(context.Background(), db.DbMap, cache)
_ = event.Initialize(context.Background(), db.DbMap, cache, nil)

if db == nil {
t.FailNow()
Expand Down
2 changes: 1 addition & 1 deletion engine/api/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestUpdateAsCodePipelineHandler(t *testing.T) {
api, db, tsURL := newTestServer(t)

event.OverridePubSubKey("events_pubsub_test")
require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache))
require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache, nil))
require.NoError(t, api.initWebsocket("events_pubsub_test"))

u, jwt := assets.InsertAdminUser(t, db)
Expand Down
2 changes: 1 addition & 1 deletion engine/api/project/dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestExist(t *testing.T) {
func TestLoadAllByRepo(t *testing.T) {
db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

_ = event.Initialize(context.Background(), db.DbMap, cache)
_ = event.Initialize(context.Background(), db.DbMap, cache, nil)

app, _ := application.LoadByName(db, "TestLoadAllByRepo", "TestLoadAllByRepo")
if app != nil {
Expand Down
2 changes: 1 addition & 1 deletion engine/api/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func TestWebsocketNoEventLoose(t *testing.T) {

pubSubKey := "events_pubsub_test_" + sdk.RandomString(10)
event.OverridePubSubKey(pubSubKey)
require.NoError(t, event.Initialize(context.TODO(), api.mustDB(), api.Cache))
require.NoError(t, event.Initialize(context.TODO(), api.mustDB(), api.Cache, nil))
require.NoError(t, api.initWebsocket(pubSubKey))

_, jwt := assets.InsertAdminUser(t, db)
Expand Down
12 changes: 6 additions & 6 deletions engine/api/workflow/dao_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestCanBeRun(t *testing.T) {
func TestPurgeWorkflowRun(t *testing.T) {
db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

_ = event.Initialize(context.TODO(), db.DbMap, cache)
_ = event.Initialize(context.TODO(), db.DbMap, cache, nil)

mockVCSSservice, _ := assets.InsertService(t, db, "TestManualRunBuildParameterMultiApplication", sdk.TypeVCS)
defer func() {
Expand Down Expand Up @@ -257,7 +257,7 @@ vcs_ssh_key: proj-blabla
func TestPurgeWorkflowRunWithRunningStatus(t *testing.T) {
db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

_ = event.Initialize(context.TODO(), db.DbMap, cache)
_ = event.Initialize(context.TODO(), db.DbMap, cache, nil)

u, _ := assets.InsertAdminUser(t, db)
consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser)
Expand Down Expand Up @@ -346,7 +346,7 @@ func TestPurgeWorkflowRunWithRunningStatus(t *testing.T) {
func TestPurgeWorkflowRunWithOneSuccessWorkflowRun(t *testing.T) {
db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

_ = event.Initialize(context.TODO(), db.DbMap, cache)
_ = event.Initialize(context.TODO(), db.DbMap, cache, nil)

mockVCSSservice, _ := assets.InsertService(t, db, "TestManualRunBuildParameterMultiApplication", sdk.TypeVCS)
defer func() {
Expand Down Expand Up @@ -546,7 +546,7 @@ vcs_ssh_key: proj-blabla
func TestPurgeWorkflowRunWithNoSuccessWorkflowRun(t *testing.T) {
db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

_ = event.Initialize(context.TODO(), db.DbMap, cache)
_ = event.Initialize(context.TODO(), db.DbMap, cache, nil)

mockVCSSservice, _ := assets.InsertService(t, db, "TestManualRunBuildParameterMultiApplication", sdk.TypeVCS)
defer func() {
Expand Down Expand Up @@ -718,7 +718,7 @@ vcs_ssh_key: proj-blabla
func TestPurgeWorkflowRunWithoutTags(t *testing.T) {
db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

_ = event.Initialize(context.TODO(), db.DbMap, cache)
_ = event.Initialize(context.TODO(), db.DbMap, cache, nil)

u, _ := assets.InsertAdminUser(t, db)
consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser)
Expand Down Expand Up @@ -804,7 +804,7 @@ func TestPurgeWorkflowRunWithoutTags(t *testing.T) {
func TestPurgeWorkflowRunWithoutTagsBiggerHistoryLength(t *testing.T) {
db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

_ = event.Initialize(context.TODO(), db.DbMap, cache)
_ = event.Initialize(context.TODO(), db.DbMap, cache, nil)

u, _ := assets.InsertAdminUser(t, db)
consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser)
Expand Down
4 changes: 2 additions & 2 deletions engine/api/workflow_ascode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestPostUpdateWorkflowAsCodeHandler(t *testing.T) {
api, db, tsURL := newTestServer(t)

event.OverridePubSubKey("events_pubsub_test")
require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache))
require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache, nil))
require.NoError(t, api.initWebsocket("events_pubsub_test"))

u, jwt := assets.InsertAdminUser(t, db)
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestPostMigrateWorkflowAsCodeHandler(t *testing.T) {
api, db, tsURL := newTestServer(t)

event.OverridePubSubKey("events_pubsub_test")
require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache))
require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache, nil))
require.NoError(t, api.initWebsocket("events_pubsub_test"))

u, jwt := assets.InsertAdminUser(t, db)
Expand Down
Loading