returns error messages when trigger reload with http #1848
Changes from 18 commits
@@ -20,12 +20,14 @@ import (
    "github.com/opentracing/opentracing-go"
    "github.com/pkg/errors"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/common/model"
    "github.com/prometheus/common/route"
    "github.com/prometheus/prometheus/pkg/labels"
    "github.com/prometheus/prometheus/promql"
    "github.com/prometheus/prometheus/rules"
    "github.com/prometheus/prometheus/storage/tsdb"
    tsdberrors "github.com/prometheus/prometheus/tsdb/errors"
    "github.com/prometheus/prometheus/util/strutil"
    "github.com/thanos-io/thanos/pkg/alert"
    "github.com/thanos-io/thanos/pkg/block/metadata"
@@ -200,6 +202,55 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) {
    }
}

// RuleMetrics defines thanos rule metrics.
type RuleMetrics struct {
    configSuccess     prometheus.Gauge
    configSuccessTime prometheus.Gauge
    duplicatedQuery   prometheus.Counter
    rulesLoaded       *prometheus.GaugeVec
    ruleEvalWarnings  *prometheus.CounterVec
}

func newRuleMetrics(reg *prometheus.Registry) *RuleMetrics {
    m := new(RuleMetrics)

    factory := promauto.With(reg)
    m.configSuccess = factory.NewGauge(prometheus.GaugeOpts{
        Name: "thanos_rule_config_last_reload_successful",
        Help: "Whether the last configuration reload attempt was successful.",
    })
    m.configSuccessTime = factory.NewGauge(prometheus.GaugeOpts{
        Name: "thanos_rule_config_last_reload_success_timestamp_seconds",
        Help: "Timestamp of the last successful configuration reload.",
    })
    m.duplicatedQuery = factory.NewCounter(prometheus.CounterOpts{
        Name: "thanos_rule_duplicated_query_addresses_total",
        Help: "The number of times a duplicated query addresses is detected from the different configs in rule.",
    })
    m.rulesLoaded = factory.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "thanos_rule_loaded_rules",
            Help: "Loaded rules partitioned by file and group.",
        },
        []string{"strategy", "file", "group"},
    )
    m.ruleEvalWarnings = factory.NewCounterVec(
        prometheus.CounterOpts{
            Name: "thanos_rule_evaluation_with_warnings_total",
            Help: "The total number of rule evaluation that were successful but had warnings which can indicate partial error.",
        }, []string{"strategy"},
    )
    m.ruleEvalWarnings.WithLabelValues(strings.ToLower(storepb.PartialResponseStrategy_ABORT.String()))
    m.ruleEvalWarnings.WithLabelValues(strings.ToLower(storepb.PartialResponseStrategy_WARN.String()))

    return m
}
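For context on the constructor above: promauto.With(reg) returns a factory whose New* constructors register each metric on the given registry as it is created, which is why the explicit reg.MustRegister calls removed further down in this diff are no longer needed. A minimal sketch of that pattern, illustrative only and not part of the change (example_total is a made-up metric):

    package main

    import (
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promauto"
    )

    func main() {
        reg := prometheus.NewRegistry()
        factory := promauto.With(reg)
        // The counter is registered on reg as a side effect of construction,
        // so no separate reg.MustRegister call is required.
        c := factory.NewCounter(prometheus.CounterOpts{
            Name: "example_total",
            Help: "A made-up counter registered via the promauto factory.",
        })
        c.Inc()

        mfs, err := reg.Gather()
        if err != nil {
            panic(err)
        }
        fmt.Println("metric families gathered:", len(mfs)) // 1: example_total is already registered.
    }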

// reloadErr is sent by the reload webhandler and carries back the error message.
type reloadErr struct {
    errMsg chan error
}

Review comment: I would just remove this type and just use
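The review comment above is truncated, but it presumably points at replacing the wrapper type with something like a bare chan chan error, where the error channel sent over the request channel carries the reply. A hedged sketch of that alternative, illustrative only and not what this commit does (all names are made up):

    package main

    import (
        "errors"
        "fmt"
    )

    func main() {
        // One channel carries the reload request; the error channel sent over it carries the reply.
        reloadWebhandler := make(chan chan error)

        // Reload-loop side: receive a reply channel, run the reload, answer on it.
        go func() {
            for errc := range reloadWebhandler {
                errc <- errors.New("simulated reload failure") // stand-in for reloadRules(...)
            }
        }()

        // Caller side (e.g. the /-/reload handler): ask and wait for the result.
        errc := make(chan error)
        reloadWebhandler <- errc
        fmt.Println("reload result:", <-errc)
    }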

// runRule runs a rule evaluation component that continuously evaluates alerting and recording
// rules. It sends alert notifications and writes TSDB data for results like a regular Prometheus server.
func runRule(
@@ -239,39 +290,7 @@ func runRule(
    dnsSDResolver string,
    comp component.Component,
) error {
    configSuccess := prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "thanos_rule_config_last_reload_successful",
        Help: "Whether the last configuration reload attempt was successful.",
    })
    configSuccessTime := prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "thanos_rule_config_last_reload_success_timestamp_seconds",
        Help: "Timestamp of the last successful configuration reload.",
    })
    duplicatedQuery := prometheus.NewCounter(prometheus.CounterOpts{
        Name: "thanos_rule_duplicated_query_addresses_total",
        Help: "The number of times a duplicated query addresses is detected from the different configs in rule",
    })
    rulesLoaded := prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "thanos_rule_loaded_rules",
            Help: "Loaded rules partitioned by file and group",
        },
        []string{"strategy", "file", "group"},
    )
    ruleEvalWarnings := prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "thanos_rule_evaluation_with_warnings_total",
            Help: "The total number of rule evaluation that were successful but had warnings which can indicate partial error.",
        }, []string{"strategy"},
    )
    ruleEvalWarnings.WithLabelValues(strings.ToLower(storepb.PartialResponseStrategy_ABORT.String()))
    ruleEvalWarnings.WithLabelValues(strings.ToLower(storepb.PartialResponseStrategy_WARN.String()))

    reg.MustRegister(configSuccess)
    reg.MustRegister(configSuccessTime)
    reg.MustRegister(duplicatedQuery)
    reg.MustRegister(rulesLoaded)
    reg.MustRegister(ruleEvalWarnings)
    metrics := newRuleMetrics(reg)

    var queryCfg []query.Config
    if len(queryConfigYAML) > 0 {
@@ -435,7 +454,7 @@ func runRule(
        opts := opts
        opts.Registerer = extprom.WrapRegistererWith(prometheus.Labels{"strategy": strings.ToLower(s.String())}, reg)
        opts.Context = ctx
        opts.QueryFunc = queryFunc(logger, queryClients, duplicatedQuery, ruleEvalWarnings, s)
        opts.QueryFunc = queryFunc(logger, queryClients, metrics.duplicatedQuery, metrics.ruleEvalWarnings, s)

        mgr := rules.NewManager(&opts)
        ruleMgr.SetRuleManager(s, mgr)
@@ -472,52 +491,32 @@ func runRule(
    }

    // Handle reload and termination interrupts.
    reload := make(chan struct{}, 1)
    reloadWebhandler := make(chan reloadErr)
    {
        cancel := make(chan struct{})
        reload <- struct{}{} // Initial reload.

        ctx, cancel := context.WithCancel(context.Background())
        g.Add(func() error {
            // Initialize rules.
            if err := reloadRules(logger, ruleFiles, ruleMgr, evalInterval, metrics); err != nil {
                level.Error(logger).Log("msg", "initialize rules failed", "err", err)

Review comment: For the initial load, we can actually fail and return the combined multierror.
Reply: @kakkoyun The maintainer doesn't agree with stopping everything; please check the comment here: #1848 (comment)
Reply: I don't have strong opinions about it, both work for me. In any case, I've pinged @bwplotka on the thread for the final decision.
Reply: Yes, initial can fail indeed (:
Reply: Cool, happy with that.

            }
            for {
                select {
                case <-cancel:
                    return errors.New("canceled")
                case <-reload:
                case <-reloadSignal:
                }

                level.Debug(logger).Log("msg", "configured rule files", "files", strings.Join(ruleFiles, ","))
                var files []string
                for _, pat := range ruleFiles {
                    fs, err := filepath.Glob(pat)
                    if err := reloadRules(logger, ruleFiles, ruleMgr, evalInterval, metrics); err != nil {
                        level.Error(logger).Log("msg", "reload rules by sighup failed", "err", err)

Review comment: Not only SIGHUP TBH, it can be HTTP.
Reply: @bwplotka Yes. This code handles SIGHUP only.
Reply: Would be nice to treat signal and HTTP reload exactly the same way (as it was before). Why not? We could reuse
Reply: @bwplotka If so, we need another channel to receive the error message for the webhandler. So a new struct should wrap reloadSignal and errMsg. Maybe it is redundant for the current implementation.
Reply: Added a new struct to handle the webhandler. Since we have reloadSignal as an input parameter, we need to select on reloadSignal always.
Reply: LGTM!

                    }
                case reloadMsg := <-reloadWebhandler:
                    err := reloadRules(logger, ruleFiles, ruleMgr, evalInterval, metrics)
                    if err != nil {
                        // The only error can be a bad pattern.
                        level.Error(logger).Log("msg", "retrieving rule files failed. Ignoring file.", "pattern", pat, "err", err)
                        continue
                        level.Error(logger).Log("msg", "reload rules by webhandler failed", "err", err)
                    }

                    files = append(files, fs...)
                }

                level.Info(logger).Log("msg", "reload rule files", "numFiles", len(files))

                if err := ruleMgr.Update(evalInterval, files); err != nil {
                    configSuccess.Set(0)
                    level.Error(logger).Log("msg", "reloading rules failed", "err", err)
                    continue
                }

                configSuccess.Set(1)
                configSuccessTime.SetToCurrentTime()

                rulesLoaded.Reset()
                for _, group := range ruleMgr.RuleGroups() {
                    rulesLoaded.WithLabelValues(group.PartialResponseStrategy.String(), group.File(), group.Name()).Set(float64(len(group.Rules())))
                    reloadMsg.errMsg <- err
                case <-ctx.Done():
                    return ctx.Err()
                }

            }
        }, func(error) {
            close(cancel)
            cancel()
        })
    }
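Because the extracted hunk above interleaves removed and added lines, the new control flow is easier to follow in isolation. The following is a condensed, self-contained sketch of what the added lines and the review threads describe: SIGHUP and HTTP reloads both go through the same reloadRules call, and the HTTP path always gets an answer back on its error channel (nil on success) so the handler is never left blocking. The plumbing is simplified and this is not the exact Thanos code:

    package main

    import (
        "context"
        "fmt"
        "os"
        "os/signal"
        "syscall"
    )

    type reloadErr struct {
        errMsg chan error
    }

    // reloadRules is a stand-in for the real reload logic.
    func reloadRules() error { return nil }

    func main() {
        reloadSignal := make(chan os.Signal, 1)
        signal.Notify(reloadSignal, syscall.SIGHUP)
        reloadWebhandler := make(chan reloadErr)

        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        // Initial load. In this revision the error is only logged; the review
        // thread notes the initial load could instead fail hard and return it.
        if err := reloadRules(); err != nil {
            fmt.Println("initialize rules failed:", err)
        }

        for {
            select {
            case <-reloadSignal:
                if err := reloadRules(); err != nil {
                    fmt.Println("reload rules by SIGHUP failed:", err)
                }
            case reloadMsg := <-reloadWebhandler:
                // Reply even when the error is nil so the HTTP handler is released.
                reloadMsg.errMsg <- reloadRules()
            case <-ctx.Done():
                return
            }
        }
    }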
@@ -564,7 +563,11 @@ func runRule(
    }

    router.WithPrefix(webRoutePrefix).Post("/-/reload", func(w http.ResponseWriter, r *http.Request) {
        reload <- struct{}{}
        reloadMsg := reloadErr{make(chan error)}
        reloadWebhandler <- reloadMsg
        if err := <-reloadMsg.errMsg; err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
        }
    })
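A hedged test sketch of the behavior this PR is titled after: a handler wired like the new /-/reload endpoint should surface a reload error as HTTP 500 with the message in the body. This is illustrative only; the reloadErr type is restated locally, the reload loop is faked, and it is not a test taken from the repository:

    package main

    import (
        "errors"
        "net/http"
        "net/http/httptest"
        "testing"
    )

    type reloadErr struct {
        errMsg chan error
    }

    func TestReloadEndpointReturnsError(t *testing.T) {
        reloadWebhandler := make(chan reloadErr)

        // Fake reload loop that always fails.
        go func() {
            msg := <-reloadWebhandler
            msg.errMsg <- errors.New("bad rule file")
        }()

        handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            reloadMsg := reloadErr{errMsg: make(chan error)}
            reloadWebhandler <- reloadMsg
            if err := <-reloadMsg.errMsg; err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
            }
        })

        rec := httptest.NewRecorder()
        handler.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/-/reload", nil))

        if rec.Code != http.StatusInternalServerError {
            t.Fatalf("expected 500, got %d: %s", rec.Code, rec.Body.String())
        }
    }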

    flagsMap := map[string]string{

@@ -758,3 +761,42 @@ func addDiscoveryGroups(g *run.Group, c *http_util.Client, interval time.Duratio
        cancel()
    })
}
func reloadRules(logger log.Logger,
    ruleFiles []string,
    ruleMgr *thanosrule.Manager,
    evalInterval time.Duration,
    metrics *RuleMetrics) error {

Review comment: This function has too many parameters, which makes it harder to read. Most of the parameters are metrics; consider using a struct to collect the metrics, as was done in thanos/cmd/thanos/downsample.go (lines 52 to 55 in 021f623), and pass it around.
Reply: @kakkoyun Committed. Please help to review. Thanks.
Reply: Looks good now 👍
    level.Debug(logger).Log("msg", "configured rule files", "files", strings.Join(ruleFiles, ","))
    var (
        errs  tsdberrors.MultiError
        files []string
    )
    for _, pat := range ruleFiles {
        fs, err := filepath.Glob(pat)
        if err != nil {
            // The only error can be a bad pattern.
            errs.Add(errors.Wrapf(err, "retrieving rule files failed. Ignoring file. pattern %s", pat))
            continue
        }

        files = append(files, fs...)
    }

    level.Info(logger).Log("msg", "reload rule files", "numFiles", len(files))

    if err := ruleMgr.Update(evalInterval, files); err != nil {
        metrics.configSuccess.Set(0)
        errs.Add(errors.Wrap(err, "reloading rules failed"))
        return errs.Err()
    }

    metrics.configSuccess.Set(1)
    metrics.configSuccessTime.Set(float64(time.Now().UnixNano()) / 1e9)

    metrics.rulesLoaded.Reset()
    for _, group := range ruleMgr.RuleGroups() {
        metrics.rulesLoaded.WithLabelValues(group.PartialResponseStrategy.String(), group.File(), group.Name()).Set(float64(len(group.Rules())))
    }
    return errs.Err()
}
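The reloadRules helper above leans on tsdberrors.MultiError to aggregate per-file problems while still reporting success when nothing failed. A small sketch of that behavior, assuming the github.com/prometheus/prometheus/tsdb/errors package imported at the top of this diff; the error texts and pattern below are made up:

    package main

    import (
        "errors"
        "fmt"

        tsdberrors "github.com/prometheus/prometheus/tsdb/errors"
    )

    func main() {
        var errs tsdberrors.MultiError

        // Nothing added yet: Err() is nil, so `return errs.Err()` signals success.
        fmt.Println(errs.Err())

        // Each bad glob pattern or update failure is collected instead of aborting the reload.
        errs.Add(errors.New("retrieving rule files failed. Ignoring file. pattern /rules/[bad"))
        errs.Add(errors.New("reloading rules failed"))

        // One combined error comes back, which the /-/reload handler can return as HTTP 500.
        fmt.Println(errs.Err())
    }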
Review comment: 👍 Thanks!