Skip to content

Commit

Permalink
perf: check for experiments and auto operarions before subscribing to…
Browse files Browse the repository at this point in the history
… pubsub (#626)
  • Loading branch information
Ubisoft-potato authored Feb 13, 2024
1 parent bf9b698 commit b4f98d3
Show file tree
Hide file tree
Showing 23 changed files with 815 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ spec:
env:
- name: BUCKETEER_EVENT_PERSISTER_DWH_SERVICE_NAME
value: "{{ template "event-persister-evaluation-events-dwh.fullname" . }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_MYSQL_USER
value: "{{ .Values.env.mysqlUser }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_MYSQL_PASS
value: "{{ .Values.env.mysqlPass }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_MYSQL_HOST
value: "{{ .Values.env.mysqlHost }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_MYSQL_PORT
value: "{{ .Values.env.mysqlPort }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_MYSQL_DB_NAME
value: "{{ .Values.env.mysqlDbName }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_PROJECT
value: "{{ .Values.env.project }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_FEATURE_SERVICE
Expand All @@ -73,6 +83,8 @@ spec:
value: "{{ .Values.env.flushSize }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_FLUSH_INTERVAL
value: "{{ .Values.env.flushInterval }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_CHECK_INTERVAL
value: "{{ .Values.env.checkInterval }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_FLUSH_TIMEOUT
value: "{{ .Values.env.flushTimeout }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_PULLER_NUM_GOROUTINES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,19 @@ env:
port: 9090
metricsPort: 9002
timezone: UTC
mysqlUser:
mysqlPass:
mysqlHost:
mysqlPort: 3306
mysqlDbName:
# egress services
experimentService: localhost:9001
featureService: localhost:9001
# option
maxMps: "1000"
numWorkers: 5
flushSize: 100
checkInterval: 60s
flushInterval: 2s
flushTimeout: 30s
# pubsub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ spec:
value: "{{ template "event-persister-evaluation-events-ops.fullname" . }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_PROJECT
value: "{{ .Values.env.project }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_MYSQL_USER
value: "{{ .Values.env.mysqlUser }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_MYSQL_PASS
value: "{{ .Values.env.mysqlPass }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_MYSQL_HOST
value: "{{ .Values.env.mysqlHost }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_MYSQL_PORT
value: "{{ .Values.env.mysqlPort }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_MYSQL_DB_NAME
value: "{{ .Values.env.mysqlDbName }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_FEATURE_SERVICE
value: "{{ .Values.env.featureService }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_AUTO_OPS_SERVICE
Expand All @@ -71,6 +81,8 @@ spec:
value: "{{ .Values.env.flushSize }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_FLUSH_INTERVAL
value: "{{ .Values.env.flushInterval }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_CHECK_INTERVAL
value: "{{ .Values.env.checkInterval }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_FLUSH_TIMEOUT
value: "{{ .Values.env.flushTimeout }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_PULLER_NUM_GOROUTINES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ env:
# egress services
featureService: localhost:9001
autoOpsService: localhost:9001
mysqlUser:
mysqlPass:
mysqlHost:
mysqlPort: 3306
mysqlDbName:
# rpc
port: 9090
metricsPort: 9002
Expand All @@ -32,6 +37,7 @@ env:
maxMps: "1000"
numWorkers: 5
flushSize: 100
checkInterval: 60s
flushInterval: 2s
flushTimeout: 30s

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ spec:
env:
- name: BUCKETEER_EVENT_PERSISTER_DWH_SERVICE_NAME
value: "{{ template "event-persister-goal-events-dwh.fullname" . }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_MYSQL_USER
value: "{{ .Values.env.mysqlUser }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_MYSQL_PASS
value: "{{ .Values.env.mysqlPass }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_MYSQL_HOST
value: "{{ .Values.env.mysqlHost }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_MYSQL_PORT
value: "{{ .Values.env.mysqlPort }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_MYSQL_DB_NAME
value: "{{ .Values.env.mysqlDbName }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_PROJECT
value: "{{ .Values.env.project }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_FEATURE_SERVICE
Expand All @@ -73,6 +83,8 @@ spec:
value: "{{ .Values.env.flushSize }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_FLUSH_INTERVAL
value: "{{ .Values.env.flushInterval }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_CHECK_INTERVAL
value: "{{ .Values.env.checkInterval }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_FLUSH_TIMEOUT
value: "{{ .Values.env.flushTimeout }}"
- name: BUCKETEER_EVENT_PERSISTER_DWH_PULLER_NUM_GOROUTINES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ env:
metricsPort: 9002
timezone: UTC
# egress services
mysqlUser:
mysqlPass:
mysqlHost:
mysqlPort: 3306
mysqlDbName:
experimentService: localhost:9001
featureService: localhost:9001
# option
maxMps: "1000"
numWorkers: 5
flushSize: 100
checkInterval: 60s
flushInterval: 2s
flushTimeout: 30s
# pubsub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ spec:
value: "{{ template "event-persister-goal-events-ops.fullname" . }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_PROJECT
value: "{{ .Values.env.project }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_MYSQL_USER
value: "{{ .Values.env.mysqlUser }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_MYSQL_PASS
value: "{{ .Values.env.mysqlPass }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_MYSQL_HOST
value: "{{ .Values.env.mysqlHost }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_MYSQL_PORT
value: "{{ .Values.env.mysqlPort }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_MYSQL_DB_NAME
value: "{{ .Values.env.mysqlDbName }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_FEATURE_SERVICE
value: "{{ .Values.env.featureService }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_AUTO_OPS_SERVICE
Expand All @@ -71,6 +81,8 @@ spec:
value: "{{ .Values.env.flushSize }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_FLUSH_INTERVAL
value: "{{ .Values.env.flushInterval }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_CHECK_INTERVAL
value: "{{ .Values.env.checkInterval }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_FLUSH_TIMEOUT
value: "{{ .Values.env.flushTimeout }}"
- name: BUCKETEER_EVENT_PERSISTER_OPS_PULLER_NUM_GOROUTINES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ env:
# egress services
featureService: localhost:9001
autoOpsService: localhost:9001
mysqlUser:
mysqlPass:
mysqlHost:
mysqlPort: 3306
mysqlDbName:
# rpc
port: 9090
metricsPort: 9002
Expand All @@ -32,6 +37,7 @@ env:
maxMps: "1000"
numWorkers: 5
flushSize: 100
checkInterval: 60s
flushInterval: 2s
flushTimeout: 30s

Expand Down
64 changes: 54 additions & 10 deletions pkg/eventpersisterdwh/cmd/server/eventpersisterdwh.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import (
"github.com/bucketeer-io/bucketeer/pkg/locale"
"github.com/bucketeer-io/bucketeer/pkg/metrics"
"github.com/bucketeer-io/bucketeer/pkg/pubsub"
"github.com/bucketeer-io/bucketeer/pkg/pubsub/puller"
redisv3 "github.com/bucketeer-io/bucketeer/pkg/redis/v3"
"github.com/bucketeer-io/bucketeer/pkg/rpc"
"github.com/bucketeer-io/bucketeer/pkg/rpc/client"
"github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql"
)

const (
Expand All @@ -52,6 +52,7 @@ type server struct {
maxMPS *int
numWorkers *int
flushSize *int
checkInterval *time.Duration
flushInterval *time.Duration
flushTimeout *time.Duration
timezone *string
Expand All @@ -77,6 +78,12 @@ type server struct {
redisAddr *string
redisPoolMaxIdle *int
redisPoolMaxActive *int
// mysql
mysqlUser *string
mysqlPass *string
mysqlHost *string
mysqlPort *int
mysqlDbName *string
}

func RegisterServerCommand(r cli.CommandRegistry, p cli.ParentCommand) cli.Command {
Expand All @@ -92,6 +99,10 @@ func RegisterServerCommand(r cli.CommandRegistry, p cli.ParentCommand) cli.Comma
"flush-size",
"Maximum number of messages to batch before writing to datastore.",
).Default("50").Int(),
checkInterval: cmd.Flag(
"check-interval",
"Interval to check if there are experiments to be handled.",
).Required().Duration(),
flushInterval: cmd.Flag("flush-interval", "Maximum interval between two flushes.").Default("5s").Duration(),
flushTimeout: cmd.Flag("flush-timeout", "Maximum time for a flush to finish.").Default("20s").Duration(),
timezone: cmd.Flag("timezone", "Time zone").Required().String(),
Expand Down Expand Up @@ -132,14 +143,24 @@ func RegisterServerCommand(r cli.CommandRegistry, p cli.ParentCommand) cli.Comma
"redis-pool-max-active",
"Maximum number of connections allocated by the pool at a given time.",
).Required().Int(),
mysqlUser: cmd.Flag("mysql-user", "MySQL user.").Required().String(),
mysqlPass: cmd.Flag("mysql-pass", "MySQL password.").Required().String(),
mysqlHost: cmd.Flag("mysql-host", "MySQL host.").Required().String(),
mysqlPort: cmd.Flag("mysql-port", "MySQL port.").Required().Int(),
mysqlDbName: cmd.Flag("mysql-db-name", "MySQL database name.").Required().String(),
}
r.RegisterCommand(server)
return server
}

func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.Logger) error {
registerer := metrics.DefaultRegisterer()
puller, err := s.createPuller(ctx, logger)
// mysqlClient
mysqlClient, err := s.createMySQLClient(ctx, registerer, logger)
if err != nil {
return err
}
pubsubClient, err := s.createPubsubClient(ctx, logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -202,12 +223,19 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
}

p := persister.NewPersisterDWH(
puller,
pubsubClient,
registerer,
writer,
mysqlClient,
*s.subscription,
*s.topic,
*s.pullerNumGoroutines,
*s.pullerMaxOutstandingMessages,
*s.pullerMaxOutstandingBytes,
persister.WithMaxMPS(*s.maxMPS),
persister.WithNumWorkers(*s.numWorkers),
persister.WithFlushSize(*s.flushSize),
persister.WithCheckInterval(*s.checkInterval),
persister.WithFlushInterval(*s.flushInterval),
persister.WithFlushTimeout(*s.flushTimeout),
persister.WithMetrics(registerer),
Expand Down Expand Up @@ -247,19 +275,18 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
return nil
}

func (s *server) createPuller(ctx context.Context, logger *zap.Logger) (puller.Puller, error) {
func (s *server) createPubsubClient(
ctx context.Context,
logger *zap.Logger,
) (*pubsub.Client, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
client, err := pubsub.NewClient(ctx, *s.project, pubsub.WithLogger(logger))
pubsubClient, err := pubsub.NewClient(ctx, *s.project, pubsub.WithLogger(logger))
if err != nil {
logger.Error("Failed to create PubSub client", zap.Error(err))
return nil, err
}
return client.CreatePuller(*s.subscription, *s.topic,
pubsub.WithNumGoroutines(*s.pullerNumGoroutines),
pubsub.WithMaxOutstandingMessages(*s.pullerMaxOutstandingMessages),
pubsub.WithMaxOutstandingBytes(*s.pullerMaxOutstandingBytes),
)
return pubsubClient, nil
}

func (s *server) newBigQueryWriter(
Expand Down Expand Up @@ -304,3 +331,20 @@ func (s *server) newBigQueryWriter(
}
return writer, err
}

func (s *server) createMySQLClient(
ctx context.Context,
registerer metrics.Registerer,
logger *zap.Logger,
) (mysql.Client, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
return mysql.NewClient(
ctx,
*s.mysqlUser, *s.mysqlPass, *s.mysqlHost,
*s.mysqlPort,
*s.mysqlDbName,
mysql.WithLogger(logger),
mysql.WithMetrics(registerer),
)
}
Loading

0 comments on commit b4f98d3

Please sign in to comment.