perf: check for experiments and auto operations before subscribing to pubsub #626
947e7dc
to
9ae10e4
Compare
@@ -0,0 +1,4 @@ | |||
select count(*) | |||
from experiment | |||
where deleted = 0 |
The conditions must match the ones used when listing the experiments:
the status and the 2-day buffer after the experiment has stopped.
Use this as a reference:
https://github.com/bucketeer-io/bucketeer/blob/main/pkg/eventpersisterdwh/persister/evaluation_event.go#L227-L279
Can you try this?
The current condition fetches far more data than necessary.
SELECT (SELECT count(*) FROM experiment WHERE status = 1) + (SELECT count(*) FROM experiment WHERE status = 2 and DATE_ADD(from_unixtime(stop_at), INTERVAL 2 DAY) > NOW() ) as count;
Looks good, let me try it!
One thing I should mention, though: because this query performs a calculation on the column
in the WHERE clause, it will not use the index that we created.
I think we can use this subquery to count experiments that are stopped but still within the 2-day buffer:
SELECT count(*)
FROM experiment
WHERE status = 2
AND stop_at > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 2 DAY))
This way we do not perform any calculation on the stop_at column.
Sounds good!
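For reference, a minimal Go sketch of the same idea: the 2-day cutoff is computed once on the constant side, so the bare stop_at column is what gets compared. Everything here is illustrative and not taken from the PR: the names cutoffUnix, inBuffer, and countStoppedInBufferSQL are hypothetical, the cutoff is passed as a bound parameter instead of being computed in SQL, and the buffer is assumed to be exactly 48 hours.

```go
package main

import (
	"fmt"
	"time"
)

// bufferAfterStop is the 2-day window discussed in this thread (assumed to be exactly 48h).
const bufferAfterStop = 48 * time.Hour

// countStoppedInBufferSQL compares the bare stop_at column with a parameter,
// so no function is applied to the column itself. Hypothetical query text.
const countStoppedInBufferSQL = `
SELECT COUNT(*)
FROM experiment
WHERE status = 2
  AND stop_at > ?`

// cutoffUnix returns the Unix timestamp that lies bufferAfterStop before nowUnix.
func cutoffUnix(nowUnix int64) int64 {
	return time.Unix(nowUnix, 0).Add(-bufferAfterStop).Unix()
}

// inBuffer mirrors the SQL predicate for a single stop_at value.
func inBuffer(stopAt, nowUnix int64) bool {
	return stopAt > cutoffUnix(nowUnix)
}

func main() {
	now := time.Now().Unix()
	fmt.Println(countStoppedInBufferSQL)
	fmt.Println("cutoff parameter:", cutoffUnix(now))
	fmt.Println("stopped one hour ago, still in buffer:", inBuffer(now-3600, now))
}
```

Whether the index is actually used still depends on the index definition and the optimizer, so checking the plan with EXPLAIN is worthwhile.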
@@ -0,0 +1,4 @@ | |||
SELECT (SELECT count(*) FROM experiment WHERE status = 1) + (SELECT count(*) |
Can you format the query?
SELECT
(
SELECT
count(*)
FROM
experiment
WHERE
status = 1
) + (
SELECT
count(*)
FROM
experiment
WHERE
status = 2
AND stop_at > UNIX_TIMESTAMP(
DATE_SUB(NOW(), INTERVAL 2 DAY)
)
) as count;
Sure!
@@ -173,23 +173,29 @@ func (p *persister) Run() error { | |||
continue | |||
} | |||
if exist { | |||
p.logger.Debug("There are not been triggered auto ops rules") |
nit: There are untriggered auto ops rules
} | ||
} else { | ||
p.logger.Debug("There are no not been triggered auto ops rules") |
nit: There are no untriggered auto ops rules
@@ -204,7 +208,9 @@ func (p *PersisterDWH) Run() error { | |||
} | |||
timer.Reset(p.opts.checkInterval) | |||
case <-p.ctx.Done(): | |||
p.logger.Info("Context is done") |
Please change it to debug level.
if p.IsRunning() { | ||
p.logger.Info("Puller is running, stop pulling messages") |
Same here.
p.rateLimitedPuller = puller.NewRateLimitedPuller(p.puller, p.opts.maxMPS) | ||
p.group = errgroup.Group{} | ||
} | ||
} | ||
timer.Reset(p.opts.checkInterval) | ||
case <-p.ctx.Done(): | ||
p.logger.Info("Context is done") |
Please change it to debug level.
if p.IsRunning() { | ||
p.logger.Info("Puller is running, stop pulling messages") |
Same here
@@ -177,21 +225,65 @@ func (p *PersisterDWH) Check(ctx context.Context) health.Status { | |||
p.logger.Error("Unhealthy due to context Done is closed", zap.Error(p.ctx.Err())) | |||
return health.Unhealthy | |||
default: | |||
if p.group.FinishedCount() > 0 { | |||
p.logger.Error("Unhealthy", zap.Int32("FinishedCount", p.group.FinishedCount())) | |||
if p.group.FailedCount() > 0 { |
Since the persister will run periodically, it's essential to monitor FailedCount instead of FinishedCount to prevent the persister from being stopped after a health check.
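To illustrate the distinction, here is a small self-contained sketch. It is not the project's errgroup package: countingGroup, healthy, okTask, and failingTask are hypothetical names, and the sketch only assumes that a finished task and a failed task are counted separately.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// countingGroup is a hypothetical stand-in for a group that tracks
// how many tasks have finished and how many of those returned an error.
type countingGroup struct {
	wg       sync.WaitGroup
	finished int32
	failed   int32
}

// Go runs f in its own goroutine and records the outcome.
func (g *countingGroup) Go(f func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		err := f()
		atomic.AddInt32(&g.finished, 1)
		if err != nil {
			atomic.AddInt32(&g.failed, 1)
		}
	}()
}

func (g *countingGroup) Wait()                { g.wg.Wait() }
func (g *countingGroup) FinishedCount() int32 { return atomic.LoadInt32(&g.finished) }
func (g *countingGroup) FailedCount() int32   { return atomic.LoadInt32(&g.failed) }

// healthy treats a normal finish as fine; only failures count against health.
func healthy(g *countingGroup) bool { return g.FailedCount() == 0 }

func okTask() error      { return nil }
func failingTask() error { return errors.New("pull failed") }

func main() {
	g := &countingGroup{}
	g.Go(okTask)
	g.Go(failingTask)
	g.Wait()
	// prints "finished: 2 failed: 1 healthy: false"
	fmt.Println("finished:", g.FinishedCount(), "failed:", g.FailedCount(), "healthy:", healthy(g))
}
```

With a periodic puller, tasks finishing normally is expected, so a check based on the finished count would flag a working service as unhealthy.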
@@ -161,21 +210,65 @@ func (p *persister) Check(ctx context.Context) health.Status { | |||
p.logger.Error("Unhealthy due to context Done is closed", zap.Error(p.ctx.Err())) | |||
return health.Unhealthy | |||
default: | |||
if p.group.FinishedCount() > 0 { | |||
p.logger.Error("Unhealthy", zap.Int32("FinishedCount", p.group.FinishedCount())) | |||
if p.group.FailedCount() > 0 { |
Same here.
1d4c16b
to
1142e19
Compare
d3e2277
to
17f6ad0
Compare
for { | ||
select { | ||
case <-timer.C: | ||
// check if there are running experiment |
// check if there are running experiment | |
// check if there are running experiments |
if exist { | ||
p.logger.Debug("There are running experiments") | ||
if !p.IsRunning() { | ||
p.group = errgroup.Group{} |
This should go below err = p.createNewPuller(). Otherwise, if creating the puller fails, p.group is left set.
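A rough sketch of the ordering being suggested, using hypothetical types (worker, puller, groupGeneration) rather than the persister's real fields: run the step that can fail first, and only touch the rest of the state once it has succeeded.

```go
package main

import (
	"errors"
	"fmt"
)

// puller and worker are hypothetical stand-ins for the persister's state.
type puller struct{ name string }

type worker struct {
	puller          *puller
	groupGeneration int // stands in for resetting p.group
}

// restart creates the puller first; the group is only reset on success,
// so a failed creation leaves the worker exactly as it was.
func (w *worker) restart(create func() (*puller, error)) error {
	p, err := create()
	if err != nil {
		return err
	}
	w.puller = p
	w.groupGeneration++
	return nil
}

var errCreate = errors.New("create puller failed")

func failingCreate() (*puller, error) { return nil, errCreate }
func workingCreate() (*puller, error) { return &puller{name: "demo"}, nil }

func main() {
	w := &worker{}
	fmt.Println(w.restart(failingCreate), w.groupGeneration)
	fmt.Println(w.restart(workingCreate), w.groupGeneration)
}
```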
p.logger.Debug("Puller is running, stop pulling messages") | ||
p.unsubscribe() | ||
} | ||
// delete subscription if subscription exists |
// delete subscription if subscription exists | |
// delete subscription if it exists |
select { | ||
case <-subscription: | ||
p.isRunning = true | ||
p.logger.Debug("Puller start subscribing") |
p.logger.Debug("Puller start subscribing") | |
p.logger.Debug("Puller started subscribing") |
p.unsubscribe() | ||
return nil | ||
} | ||
p.logger.Error("Puller pulling messages error", zap.Error(err)) |
p.logger.Error("Puller pulling messages error", zap.Error(err)) | |
p.logger.Error("Failed to pull messages", zap.Error(err)) |
err := p.rateLimitedPuller.Run(ctx) | ||
if err != nil { | ||
if strings.Contains(err.Error(), pubsubErrNotFound) { | ||
p.logger.Debug("Subscription does not exist", |
p.logger.Debug("Subscription does not exist", | |
p.logger.Debug("Failed to pull messages. Subscription does not exist", |
} | ||
err := p.group.Wait() | ||
if err != nil { | ||
p.logger.Error("Running puller error", zap.Error(err)) |
p.logger.Error("Running puller error", zap.Error(err)) | |
p.logger.Error("Failed while running pull messages", zap.Error(err)) |
@@ -246,7 +251,45 @@ func (c *Client) subscription(id, topicID string) (*pubsub.Subscription, error) | |||
if err == nil { | |||
return sub, nil | |||
} | |||
if strings.Contains(err.Error(), rpcErrAlreadyExists) { | |||
c.logger.Debug("Subscription already exists, use it directly", |
c.logger.Debug("Subscription already exists, use it directly", | |
c.logger.Debug("Subscription already exists. Using the current one", |
Can you add this line before the continue here?
lastErr = err
We should save the last error. Otherwise, when it returns the timeout error, it won't have any error log.
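As a sketch of that pattern (not the code in pkg/pubsub): the hypothetical retry helper below gives up after a fixed number of attempts rather than on a timeout, and wraps the last underlying error so the final message says why the attempts failed. The names retry, alwaysFail, and errNotReady are made up for the example.

```go
package main

import (
	"errors"
	"fmt"
)

var errNotReady = errors.New("subscription not ready")

// retry calls op up to attempts times and remembers the most recent failure.
func retry(attempts int, op func() error) error {
	var lastErr error
	for i := 0; i < attempts; i++ {
		if err := op(); err != nil {
			lastErr = err // save the reason before trying again
			continue
		}
		return nil
	}
	if lastErr == nil {
		// Only reachable when attempts <= 0, so there is nothing to wrap.
		return fmt.Errorf("gave up after %d attempts", attempts)
	}
	return fmt.Errorf("gave up after %d attempts: %w", attempts, lastErr)
}

func alwaysFail() error { return errNotReady }

func main() {
	// prints "gave up after 3 attempts: subscription not ready"
	fmt.Println(retry(3, alwaysFail))
}
```

Because the last error is wrapped with %w, callers can still match it with errors.Is.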
pkg/pubsub/pubsub.go
sub := c.Client.Subscription(id) | ||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | ||
defer cancel() | ||
err := sub.Delete(ctx) |
err := sub.Delete(ctx) | |
return sub.Delete(ctx) |
b97e6ef
to
cdea535
Compare
experiment | ||
WHERE | ||
status = 2 | ||
AND stop_at > UNIX_TIMESTAMP( |
@Ubisoft-potato, can you fix the indent in these two lines?
Fixed.
@@ -0,0 +1,4 @@ | |||
select count(*) |
To stay consistent with the other queries, please use uppercase only.
Thank you!
Part of #612
Summary
persisters
: