Skip to content

Commit

Permalink
feat: add prometheus url and time created to the prometheus cache and…
Browse files Browse the repository at this point in the history
… invalidate on different prometheus url also (#80)

* feat: add prometheus url and time created to the prometheus cache and invalidate on different prometheus url also

Signed-off-by: Martin Chodur <[email protected]>

* feat: improve cache and prometheus live validations

Signed-off-by: Martin Chodur <[email protected]>

---------

Signed-off-by: Martin Chodur <[email protected]>
  • Loading branch information
FUSAKLA authored Jul 19, 2024
1 parent 00a6a77 commit 18e9894
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 95 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Changed: :warning: **Renamed `hasValidPartialStrategy` to `hasValidPartialResponseStrategy`** as it was documented so it is actually a fix
- Changed: :warning: **Disallow special rule file fields of Thanos, Mimir or Loki by default**
To enable them, you need to set some of the new flags described below
- Changed: The Prometheus results cache format has changed to reduce its size and improve performance. **Delete the old cache file** before upgrading.
  Also, the cache now records its time of creation and the URL of the Prometheus instance it holds data for. From now on, if the URL does not match, the cache is pruned.
- Added: New flags `--support-thanos`, `--support-mimir`, `--support-loki` to enable special rule file fields of Thanos, Mimir or Loki
- Added: :tada: **Support for validation of Loki rules!** Now you can validate Loki rules as well. First two validators are:
- `expressionIsValidLogQL` to check if the expression is a valid LogQL query
Expand All @@ -20,6 +22,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added: Support for the `query_offset` field in the rule group
- Added: New validator `expressionIsValidPromQL` to check if the expression is a valid PromQL query
- Added: Support for bearer token authentication in the `prometheus` section of the config using the `bearerTokenFile` field or by specifying the `PROMETHEUS_BEARER_TOKEN` env variable.
- Added: `maximumMatchingSeries` option to the `expressionSelectorsMatchesAnything` validator to check the maximum number of series any selector can match.
- Added: new config options to the prometheus section of config:
- `queryOffset`: Specify the offset (delay) of the query (useful for consistency when using remote write, for example).
- `queryLookback`: How far into the past to look in queries supporting a time range (just metadata queries for now).
- Fixed: Loading glob patterns in the file paths to rules
- Fixed: Params of the `expressionCanBeEvaluated` validator were ignored, this is now fixed.
- Updated: Prometheus and other dependencies
- CI: Updated Github actions for golangcilint and goreleaser

Expand Down
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,14 @@ prometheus:
bearerTokenFile: bearer_token.txt
# OPTIONAL Timeout for any request on the Prometheus instance
timeout: 30s
# OPTIONAL: name of the file to save cache of the Prometheus calls for speedup
# OPTIONAL name of the file to save cache of the Prometheus calls for speedup
cacheFile: .promruval_cache.json
# OPTIONAL: maximum age how old the cache can be to be used
# OPTIONAL maximum age how old the cache can be to be used
maxCacheAge: 1h
# OPTIONAL offset (delay) of the query evaluation time (useful for consistency when using remote write, for example).
queryOffset: 1m
# OPTIONAL how far into the past to look in queries supporting a time range (just metadata queries for now).
queryLookback: 20m
validationRules:
# Name of the validation rule.
Expand Down
9 changes: 7 additions & 2 deletions docs/validations.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ Possibly you can set maximum allowed query execution time and maximum number of

```yaml
params:
timeSeriesLimit: 100
evaluationDurationLimit: 1m
timeSeriesLimit: 100 # Optional, maximum series returned by the query
evaluationDurationLimit: 1m # Optional, maximum duration of the query evaluation
```

#### `expressionUsesExistingLabels`
Expand All @@ -298,6 +298,11 @@ Fails if any used label is not present in the configured Prometheus instance.
Verifies if any of the selectors in the expression (eg `up{foo="bar"}`) matches actual data in the configured Prometheus
instance.

```yaml
params:
maximumMatchingSeries: 1000 # Optional, maximum number of matching series for a single selector used in the expression
```

#### `expressionWithNoMetricName`

Fails if an expression doesn't use an explicit metric name (also if used as `__name__` label) in all its selectors(eg `up{foo="bar"}`).
Expand Down
21 changes: 17 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main
import (
"fmt"
"os"
"path/filepath"
"strings"
"time"

"github.com/alecthomas/kingpin/v2"
Expand Down Expand Up @@ -31,7 +33,7 @@ var (
versionCmd = app.Command("version", "Print version and build information.")

validateCmd = app.Command("validate", "Validate Prometheus rule files using validation rules from config file.")
filePaths = validateCmd.Arg("path", "File paths to be validated, can be passed as a glob.").Required().Strings()
filePaths = validateCmd.Arg("path", "File paths to be validated, can use even double star globs or ~. Will be expanded if not done by bash.").Required().Strings()
disabledRules = validateCmd.Flag("disable-rule", "Allows to disable any validation rules by it's name. Can be passed multiple times.").Short('d').Strings()
enabledRules = validateCmd.Flag("enable-rule", "Only enable these validation rules. Can be passed multiple times.").Short('e').Strings()
validationOutputFormat = validateCmd.Flag("output", "Format of the output.").Short('o').PlaceHolder("[text,json,yaml]").Default("text").Enum("text", "json", "yaml")
Expand Down Expand Up @@ -144,11 +146,22 @@ func main() {
}
var filesToBeValidated []string
for _, path := range *filePaths {
paths, err := doublestar.Glob(os.DirFS("."), path)
if strings.HasPrefix(path, "~/") {
home, err := os.UserHomeDir()
if err != nil {
exitWithError(fmt.Errorf("failed to get user home directory: %w", err))
}
path = filepath.Join(home, path[2:])
}

base, pattern := doublestar.SplitPattern(path)
paths, err := doublestar.Glob(os.DirFS(base), pattern, doublestar.WithFilesOnly(), doublestar.WithFailOnIOErrors(), doublestar.WithFailOnPatternNotExist())
if err != nil {
exitWithError(err)
exitWithError(fmt.Errorf("failed expanding glob pattern `%s`: %w", path, err))
}
for _, p := range paths {
filesToBeValidated = append(filesToBeValidated, filepath.Join(base, p))
}
filesToBeValidated = append(filesToBeValidated, paths...)
}

if *supportLoki {
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ type PrometheusConfig struct {
CacheFile string `yaml:"cacheFile,omitempty" default:".promruval_cache.json"`
MaxCacheAge time.Duration `yaml:"maxCacheAge,omitempty" default:"1h"`
BearerTokenFile string `yaml:"bearerTokenFile,omitempty"`
QueryOffset time.Duration `yaml:"queryOffset,omitempty" default:"1m"`
QueryLookback time.Duration `yaml:"queryLookback,omitempty" default:"20m"`
}

func (c *PrometheusConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
Expand Down
73 changes: 46 additions & 27 deletions pkg/prometheus/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,65 @@ import (
"os"
"time"

"github.com/prometheus/common/model"

log "github.com/sirupsen/logrus"
)

func newCache(file string, maxAge time.Duration) *cache {
c := &cache{
file: file,
Queries: map[string][]*model.Sample{},
Labels: []string{},
Series: map[string][]model.LabelSet{},
func newCache(file, prometheusURL string, maxAge time.Duration) *cache {
emptyCache := cache{
file: file,
PrometheusURL: prometheusURL,
Created: time.Now(),
QueriesStats: make(map[string]queryStats),
KnownLabels: []string{},
SelectorMatchingSeries: make(map[string]int),
}
info, err := os.Stat(file)
previousCache := emptyCache
f, err := os.Open(file)
if err != nil {
log.Warnf("cache file %s not found, skipping", file)
return c
if !os.IsNotExist(err) {
f, err = os.Create(file)
if err != nil {
log.Warnf("error creating cache file %s: %s", file, err)
return &emptyCache
}
} else {
log.Warnf("error opening cache file %s, skipping: %s", file, err)
return &emptyCache
}
}
cacheAge := time.Since(info.ModTime())
if err := json.NewDecoder(f).Decode(&previousCache); err != nil {
log.Warnf("invalid cache file `%s` format: %s", file, err)
return &emptyCache
}
pruneCache := false
cacheAge := time.Since(previousCache.Created)
if maxAge != 0 && cacheAge > maxAge {
log.Warnf("%s old cache %s is outdated, limit is %s", cacheAge, file, maxAge)
return c
log.Infof("%s old cache %s is outdated, limit is %s", cacheAge, file, maxAge)
pruneCache = true
}
f, err := os.Open(file)
if err != nil {
log.Warnf("error opening cache file %s, skipping: %s", file, err)
return c
if previousCache.PrometheusURL != prometheusURL {
log.Infof("data in cache file %s are from different Prometheus on URL %s, cannot be used for the instance on %s URL", file, previousCache.PrometheusURL, prometheusURL)
pruneCache = true
}
err = json.NewDecoder(f).Decode(c)
if err != nil {
log.Warnf("invalid cache file `%s` format: %s", file, err)
return c
if pruneCache {
log.Warnf("Pruning cache file %s", file)
return &emptyCache
}
return c
return &previousCache
}

type queryStats struct {
Error error `json:"error,omitempty"`
Series int `json:"series"`
Duration time.Duration `json:"duration"`
}
type cache struct {
file string
Queries map[string][]*model.Sample `json:"queries"`
Labels []string `json:"labels"`
Series map[string][]model.LabelSet `json:"series"`
file string
PrometheusURL string `json:"prometheus_url"`
Created time.Time `json:"created"`
QueriesStats map[string]queryStats `json:"queries_stats"`
KnownLabels []string `json:"known_labels"`
SelectorMatchingSeries map[string]int `json:"selector_matching_series"`
}

func (c *cache) Dump() {
Expand Down
135 changes: 83 additions & 52 deletions pkg/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,29 @@ func NewClientWithRoundTripper(promConfig config.PrometheusConfig, tripper http.
}
v1cli := v1.NewAPI(cli)
promClient := Client{
apiClient: v1cli,
url: promConfig.URL,
timeout: promConfig.Timeout,
cache: newCache(promConfig.CacheFile, promConfig.MaxCacheAge),
apiClient: v1cli,
url: promConfig.URL,
timeout: promConfig.Timeout,
queryOffset: promConfig.QueryOffset,
queryLookback: promConfig.QueryLookback,
cache: newCache(promConfig.CacheFile, promConfig.URL, promConfig.MaxCacheAge),
}
return &promClient, nil
}

type Client struct {
apiClient v1.API
url string
timeout time.Duration
cache *cache
apiClient v1.API
url string
timeout time.Duration
queryOffset time.Duration
queryLookback time.Duration
cache *cache
}

func (s *Client) queryTimeRange() (start, end time.Time) {
end = time.Now().Add(-s.queryOffset)
start = end.Add(-s.queryLookback)
return start, end
}

func (s *Client) DumpCache() {
Expand All @@ -94,69 +104,90 @@ func (s *Client) newContext() (context.Context, context.CancelFunc) {
}

func (s *Client) SelectorMatch(selector string) ([]model.LabelSet, error) {
if _, ok := s.cache.Series[selector]; !ok {
ctx, cancel := s.newContext()
defer cancel()
result, warnings, err := s.apiClient.Series(ctx, []string{selector}, time.Now().Add(-time.Minute), time.Now())
if err != nil {
return nil, fmt.Errorf("failed to query series: %w", err)
}
if len(warnings) > 0 {
log.Warnf("Warning querying Prometheus: %s\n", warnings)
}
s.cache.Series[selector] = result
} else {
log.Debugf("using cached series match result for `%s`", selector)
ctx, cancel := s.newContext()
defer cancel()
start := time.Now()
queryStart, queryEnd := s.queryTimeRange()
result, warnings, err := s.apiClient.Series(ctx, []string{selector}, queryStart, queryEnd)
log.Debugf("queried series matching selector `%s` on %s prometheus in %s", selector, s.url, time.Since(start))
if err != nil {
return nil, fmt.Errorf("failed to query series: %w", err)
}
if len(warnings) > 0 {
log.Warnf("Warning querying Prometheus: %s\n", warnings)
}
return s.cache.Series[selector], nil
return result, nil
}

func (s *Client) SelectorMatchingSeries(selector string) (int, error) {
if count, found := s.cache.SelectorMatchingSeries[selector]; found {
return count, nil
}
series, err := s.SelectorMatch(selector)
if err != nil {
return 0, err
}
s.cache.SelectorMatchingSeries[selector] = len(series)
return len(series), nil
}

func (s *Client) Labels() ([]string, error) {
if len(s.cache.Labels) == 0 {
if len(s.cache.KnownLabels) == 0 {
ctx, cancel := s.newContext()
defer cancel()
start := time.Now()
result, warnings, err := s.apiClient.LabelNames(ctx, []string{}, time.Now().Add(-time.Minute), time.Now())
log.Infof("loaded all prometheus label names from %s in %s", s.url, time.Since(start))
queryStart, queryEnd := s.queryTimeRange()
result, warnings, err := s.apiClient.LabelNames(ctx, []string{}, queryStart, queryEnd)
log.Debugf("loaded all prometheus label names from %s in %s", s.url, time.Since(start))
if err != nil {
return nil, err
}
if len(warnings) > 0 {
log.Warnf("Warning querying Prometheus: %s\n", warnings)
}
s.cache.Labels = result
s.cache.KnownLabels = result
}
return s.cache.Labels, nil
return s.cache.KnownLabels, nil
}

func (s *Client) Query(query string) ([]*model.Sample, int, time.Duration, error) {
var duration time.Duration
if _, ok := s.cache.Queries[query]; !ok {
ctx, cancel := s.newContext()
defer cancel()
start := time.Now()
result, warnings, err := s.apiClient.Query(ctx, query, time.Now())
duration = time.Since(start)
log.Infof("query `%s` on %s prometheus took %s", query, s.url, duration)
if err != nil {
return nil, 0, 0, fmt.Errorf("error querying prometheus: %w", err)
}
if len(warnings) > 0 {
log.Warnf("Warning querying Prometheus: %s\n", warnings)
ctx, cancel := s.newContext()
defer cancel()
start := time.Now()
_, queryEnd := s.queryTimeRange()
result, warnings, err := s.apiClient.Query(ctx, query, queryEnd)
duration = time.Since(start)
log.Debugf("query `%s` on %s prometheus took %s", query, s.url, duration)
if err != nil {
return nil, 0, 0, fmt.Errorf("error querying prometheus: %w", err)
}
if len(warnings) > 0 {
log.Warnf("Warning querying Prometheus: %s\n", warnings)
}
switch result.Type() {
case model.ValVector:
vectorResult, ok := result.(model.Vector)
if !ok {
return nil, 0, 0, fmt.Errorf("failed to convert result to model.Vector")
}
switch result.Type() {
case model.ValVector:
vectorResult, ok := result.(model.Vector)
if !ok {
return nil, 0, 0, fmt.Errorf("failed to convert result to model.Vector")
}
s.cache.Queries[query] = vectorResult
default:
return nil, 0, 0, fmt.Errorf("unknown prometheus response type: %s", result)
return vectorResult, len(vectorResult), duration, nil
case model.ValScalar:
scalarResult, ok := result.(*model.Scalar)
if !ok {
return nil, 0, 0, fmt.Errorf("failed to convert result to model.Scalar")
}
} else {
log.Debugf("using cached query result for `%s`", query)
return []*model.Sample{{Value: scalarResult.Value, Timestamp: model.Now()}}, 1, duration, nil
}
return nil, 0, 0, fmt.Errorf("unknown prometheus response type: %s", result)
}

func (s *Client) QueryStats(query string) (int, time.Duration, error) {
if stats, found := s.cache.QueriesStats[query]; found {
return stats.Series, stats.Duration, stats.Error
}
res := s.cache.Queries[query]
return res, len(res), duration, nil
_, series, duration, err := s.Query(query)
stats := queryStats{Series: series, Duration: duration, Error: err}
s.cache.QueriesStats[query] = stats
return stats.Series, stats.Duration, stats.Error
}
Loading

0 comments on commit 18e9894

Please sign in to comment.