From 64a97007bcbbe42e98d9bbec4bb3993286d3650c Mon Sep 17 00:00:00 2001 From: Michael Okoko Date: Mon, 26 Apr 2021 11:44:35 +0100 Subject: [PATCH 01/18] import wal package from grafana/agent (with string interning disabled) Signed-off-by: Michael Okoko --- pkg/rules/remotewrite/series.go | 256 +++++++++++ pkg/rules/remotewrite/util.go | 118 ++++++ pkg/rules/remotewrite/wal.go | 678 ++++++++++++++++++++++++++++++ pkg/rules/remotewrite/wal_test.go | 374 ++++++++++++++++ 4 files changed, 1426 insertions(+) create mode 100644 pkg/rules/remotewrite/series.go create mode 100644 pkg/rules/remotewrite/util.go create mode 100644 pkg/rules/remotewrite/wal.go create mode 100644 pkg/rules/remotewrite/wal_test.go diff --git a/pkg/rules/remotewrite/series.go b/pkg/rules/remotewrite/series.go new file mode 100644 index 0000000000..f75d415ebb --- /dev/null +++ b/pkg/rules/remotewrite/series.go @@ -0,0 +1,256 @@ +package remotewrite + +import ( + "sync" + + "github.com/prometheus/prometheus/pkg/labels" +) + +type memSeries struct { + sync.Mutex + + ref uint64 + lset labels.Labels + lastTs int64 + + // TODO(rfratto): this solution below isn't perfect, and there's still + // the possibility for a series to be deleted before it's + // completely gone from the WAL. Rather, we should have gc return + // a "should delete" map and be given a "deleted" map. + // If a series that is going to be marked for deletion is in the + // "deleted" map, then it should be deleted instead. + // + // The "deleted" map will be populated by the Truncate function. + // It will be cleared with every call to gc. + + // willDelete marks a series as to be deleted on the next garbage + // collection. If it receives a write, willDelete is disabled. + willDelete bool + + // Whether this series has samples waiting to be committed to the WAL + pendingCommit bool +} + +func (s *memSeries) updateTs(ts int64) { + s.lastTs = ts + s.willDelete = false + s.pendingCommit = true +} + +// seriesHashmap is a simple hashmap for memSeries by their label set. It is +// built on top of a regular hashmap and holds a slice of series to resolve +// hash collisions. Its methods require the hash to be submitted with it to +// avoid re-computations throughout the code. +// +// This code is copied from the Prometheus TSDB. +type seriesHashmap map[uint64][]*memSeries + +func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries { + for _, s := range m[hash] { + if labels.Equal(s.lset, lset) { + return s + } + } + return nil +} + +func (m seriesHashmap) set(hash uint64, s *memSeries) { + l := m[hash] + for i, prev := range l { + if labels.Equal(prev.lset, s.lset) { + l[i] = s + return + } + } + m[hash] = append(l, s) +} + +func (m seriesHashmap) del(hash uint64, ref uint64) { + var rem []*memSeries + for _, s := range m[hash] { + if s.ref != ref { + rem = append(rem, s) + } else { + //intern.ReleaseLabels(intern.Global, s.lset) + } + } + if len(rem) == 0 { + delete(m, hash) + } else { + m[hash] = rem + } +} + +const ( + // defaultStripeSize is the default number of entries to allocate in the + // stripeSeries hash map. + defaultStripeSize = 1 << 14 +) + +// stripeSeries locks modulo ranges of IDs and hashes to reduce lock contention. +// The locks are padded to not be on the same cache line. Filling the padded space +// with the maps was profiled to be slower – likely due to the additional pointer +// dereferences. +// +// This code is copied from the Prometheus TSDB. 
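+//
+// As a concrete illustration of the striping (not in the upstream comment):
+// with the default stripe size of 1<<14, the stripe guarding a series is
+// chosen from the low 14 bits of its hash (or of its ref ID), i.e.
+// locks[hash&(stripeSize-1)].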
+type stripeSeries struct {
+	size   int
+	series []map[uint64]*memSeries
+	hashes []seriesHashmap
+	locks  []stripeLock
+}
+
+type stripeLock struct {
+	sync.RWMutex
+	// Padding to avoid multiple locks being on the same cache line.
+	_ [40]byte
+}
+
+func newStripeSeries() *stripeSeries {
+	stripeSize := defaultStripeSize
+	s := &stripeSeries{
+		size:   stripeSize,
+		series: make([]map[uint64]*memSeries, stripeSize),
+		hashes: make([]seriesHashmap, stripeSize),
+		locks:  make([]stripeLock, stripeSize),
+	}
+
+	for i := range s.series {
+		s.series[i] = map[uint64]*memSeries{}
+	}
+	for i := range s.hashes {
+		s.hashes[i] = seriesHashmap{}
+	}
+	return s
+}
+
+// gc garbage collects series that haven't received a write since mint,
+// removing them entirely and returning the set of deleted refs.
+func (s *stripeSeries) gc(mint int64) map[uint64]struct{} {
+	var (
+		deleted = map[uint64]struct{}{}
+	)
+
+	// Run through all series and find series that haven't been written to
+	// since mint. Mark those series as deleted and store their ID.
+	for i := 0; i < s.size; i++ {
+		s.locks[i].Lock()
+
+		for _, series := range s.series[i] {
+			series.Lock()
+			seriesHash := series.lset.Hash()
+
+			// If the series has received a write at or after mint, there's
+			// still data and it's not completely gone yet.
+			if series.lastTs >= mint || series.pendingCommit {
+				series.willDelete = false
+				series.Unlock()
+				continue
+			}
+
+			// The series hasn't received any data and *might* be gone, but
+			// we want to give it an opportunity to come back before marking
+			// it as deleted, so we wait one more GC cycle.
+			if !series.willDelete {
+				series.willDelete = true
+				series.Unlock()
+				continue
+			}
+
+			// The series is gone entirely. We'll need to delete the label
+			// hash (if one exists) so we'll obtain a lock for that too.
+			j := int(seriesHash) & (s.size - 1)
+			if i != j {
+				s.locks[j].Lock()
+			}
+
+			deleted[series.ref] = struct{}{}
+			delete(s.series[i], series.ref)
+			s.hashes[j].del(seriesHash, series.ref)
+
+			if i != j {
+				s.locks[j].Unlock()
+			}
+
+			series.Unlock()
+		}
+
+		s.locks[i].Unlock()
+	}
+
+	return deleted
+}
+
+func (s *stripeSeries) getByID(id uint64) *memSeries {
+	i := id & uint64(s.size-1)
+
+	s.locks[i].RLock()
+	series := s.series[i][id]
+	s.locks[i].RUnlock()
+
+	return series
+}
+
+func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries {
+	i := hash & uint64(s.size-1)
+
+	s.locks[i].RLock()
+	series := s.hashes[i].get(hash, lset)
+	s.locks[i].RUnlock()
+
+	return series
+}
+
+func (s *stripeSeries) set(hash uint64, series *memSeries) {
+	i := hash & uint64(s.size-1)
+	s.locks[i].Lock()
+	s.hashes[i].set(hash, series)
+	s.locks[i].Unlock()
+
+	i = series.ref & uint64(s.size-1)
+	s.locks[i].Lock()
+	s.series[i][series.ref] = series
+	s.locks[i].Unlock()
+}
+
+func (s *stripeSeries) iterator() *stripeSeriesIterator {
+	return &stripeSeriesIterator{s}
+}
+
+// stripeSeriesIterator allows iterating over series through a channel.
+// The channel should always be completely consumed to not leak.
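+//
+// A typical caller (sketch) drains the channel fully, e.g.:
+//
+//	for series := range s.iterator().Channel() {
+//		// ... use series ...
+//	}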
+type stripeSeriesIterator struct { + s *stripeSeries +} + +func (it *stripeSeriesIterator) Channel() <-chan *memSeries { + ret := make(chan *memSeries) + + go func() { + for i := 0; i < it.s.size; i++ { + it.s.locks[i].RLock() + + for _, series := range it.s.series[i] { + series.Lock() + + j := int(series.lset.Hash()) & (it.s.size - 1) + if i != j { + it.s.locks[j].RLock() + } + + ret <- series + + if i != j { + it.s.locks[j].RUnlock() + } + series.Unlock() + } + + it.s.locks[i].RUnlock() + } + + close(ret) + }() + + return ret +} diff --git a/pkg/rules/remotewrite/util.go b/pkg/rules/remotewrite/util.go new file mode 100644 index 0000000000..3a1e593e0f --- /dev/null +++ b/pkg/rules/remotewrite/util.go @@ -0,0 +1,118 @@ +package remotewrite + +import ( + "path/filepath" + "sync" + + "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/tsdb/wal" +) + +type walReplayer struct { + w wal.WriteTo +} + +func (r walReplayer) Replay(dir string) error { + w, err := wal.Open(nil, dir) + if err != nil { + return err + } + + dir, startFrom, err := wal.LastCheckpoint(w.Dir()) + if err != nil && err != record.ErrNotFound { + return err + } + + if err == nil { + sr, err := wal.NewSegmentsReader(dir) + if err != nil { + return err + } + + err = r.replayWAL(wal.NewReader(sr)) + if closeErr := sr.Close(); closeErr != nil && err == nil { + err = closeErr + } + if err != nil { + return err + } + + startFrom++ + } + + _, last, err := wal.Segments(w.Dir()) + if err != nil { + return err + } + + for i := startFrom; i <= last; i++ { + s, err := wal.OpenReadSegment(wal.SegmentName(w.Dir(), i)) + if err != nil { + return err + } + + sr := wal.NewSegmentBufReader(s) + err = r.replayWAL(wal.NewReader(sr)) + if closeErr := sr.Close(); closeErr != nil && err == nil { + err = closeErr + } + if err != nil { + return err + } + } + + return nil +} + +func (r walReplayer) replayWAL(reader *wal.Reader) error { + var dec record.Decoder + + for reader.Next() { + rec := reader.Record() + switch dec.Type(rec) { + case record.Series: + series, err := dec.Series(rec, nil) + if err != nil { + return err + } + r.w.StoreSeries(series, 0) + case record.Samples: + samples, err := dec.Samples(rec, nil) + if err != nil { + return err + } + r.w.Append(samples) + } + } + + return nil +} + +type walDataCollector struct { + mut sync.Mutex + samples []record.RefSample + series []record.RefSeries +} + +func (c *walDataCollector) Append(samples []record.RefSample) bool { + c.mut.Lock() + defer c.mut.Unlock() + + c.samples = append(c.samples, samples...) + return true +} + +func (c *walDataCollector) StoreSeries(series []record.RefSeries, _ int) { + c.mut.Lock() + defer c.mut.Unlock() + + c.series = append(c.series, series...) +} + +func (c *walDataCollector) SeriesReset(_ int) {} + +// SubDirectory returns the subdirectory within a Storage directory used for +// the Prometheus WAL. 
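+// For example, SubDirectory("/tmp/thanos-rule") returns "/tmp/thanos-rule/wal".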
+func SubDirectory(base string) string {
+	return filepath.Join(base, "wal")
+}
diff --git a/pkg/rules/remotewrite/wal.go b/pkg/rules/remotewrite/wal.go
new file mode 100644
index 0000000000..a7851cff09
--- /dev/null
+++ b/pkg/rules/remotewrite/wal.go
@@ -0,0 +1,678 @@
+package remotewrite
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"sync"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/prometheus/pkg/exemplar"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/timestamp"
+	"github.com/prometheus/prometheus/pkg/value"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb"
+	"github.com/prometheus/prometheus/tsdb/record"
+	"github.com/prometheus/prometheus/tsdb/wal"
+)
+
+// ErrWALClosed is an error returned when a WAL operation can't run because the
+// storage has already been closed.
+var ErrWALClosed = fmt.Errorf("WAL storage closed")
+
+type storageMetrics struct {
+	r prometheus.Registerer
+
+	numActiveSeries      prometheus.Gauge
+	numDeletedSeries     prometheus.Gauge
+	totalCreatedSeries   prometheus.Counter
+	totalRemovedSeries   prometheus.Counter
+	totalAppendedSamples prometheus.Counter
+}
+
+func newStorageMetrics(r prometheus.Registerer) *storageMetrics {
+	m := storageMetrics{r: r}
+	m.numActiveSeries = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "agent_wal_storage_active_series",
+		Help: "Current number of active series being tracked by the WAL storage",
+	})
+
+	m.numDeletedSeries = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "agent_wal_storage_deleted_series",
+		Help: "Current number of series marked for deletion from memory",
+	})
+
+	m.totalCreatedSeries = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "agent_wal_storage_created_series_total",
+		Help: "Total number of created series appended to the WAL",
+	})
+
+	m.totalRemovedSeries = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "agent_wal_storage_removed_series_total",
+		Help: "Total number of series removed from the WAL",
+	})
+
+	m.totalAppendedSamples = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "agent_wal_samples_appended_total",
+		Help: "Total number of samples appended to the WAL",
+	})
+
+	if r != nil {
+		r.MustRegister(
+			m.numActiveSeries,
+			m.numDeletedSeries,
+			m.totalCreatedSeries,
+			m.totalRemovedSeries,
+			m.totalAppendedSamples,
+		)
+	}
+
+	return &m
+}
+
+func (m *storageMetrics) Unregister() {
+	if m.r == nil {
+		return
+	}
+	cs := []prometheus.Collector{
+		m.numActiveSeries,
+		m.numDeletedSeries,
+		m.totalCreatedSeries,
+		m.totalRemovedSeries,
+		m.totalAppendedSamples,
+	}
+	for _, c := range cs {
+		m.r.Unregister(c)
+	}
+}
+
+// Storage implements storage.Storage, and just writes to the WAL.
+type Storage struct {
+	// Embed Queryable/ChunkQueryable for compatibility, but don't actually implement it.
+	storage.Queryable
+	storage.ChunkQueryable
+
+	// Operations against the WAL must be protected by a mutex so it doesn't get
+	// closed in the middle of an operation. Other operations are concurrency-safe, so we
+	// use a RWMutex to allow multiple usages of the WAL at once. If the WAL is closed, all
+	// operations that change the WAL must fail.
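+	//
+	// Lock ordering note (as used in the methods below): walMtx is the
+	// outermost lock; the finer-grained mtx and deletedMtx are acquired
+	// either beneath it or on their own, never the other way around.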
+ walMtx sync.RWMutex + walClosed bool + + path string + wal *wal.WAL + logger log.Logger + + appenderPool sync.Pool + bufPool sync.Pool + + mtx sync.RWMutex + nextRef uint64 + series *stripeSeries + + deletedMtx sync.Mutex + deleted map[uint64]int // Deleted series, and what WAL segment they must be kept until. + + metrics *storageMetrics +} + +// NewStorage makes a new Storage. +func NewStorage(logger log.Logger, registerer prometheus.Registerer, path string) (*Storage, error) { + w, err := wal.NewSize(logger, registerer, SubDirectory(path), wal.DefaultSegmentSize, true) + if err != nil { + return nil, err + } + + storage := &Storage{ + path: path, + wal: w, + logger: logger, + deleted: map[uint64]int{}, + series: newStripeSeries(), + metrics: newStorageMetrics(registerer), + + // The first ref ID must be non-zero, as the scraping code treats 0 as a + // non-existent ID and won't cache it. + nextRef: 1, + } + + storage.bufPool.New = func() interface{} { + b := make([]byte, 0, 1024) + return b + } + + storage.appenderPool.New = func() interface{} { + return &appender{ + w: storage, + series: make([]record.RefSeries, 0, 100), + samples: make([]record.RefSample, 0, 100), + } + } + + if err := storage.replayWAL(); err != nil { + level.Warn(storage.logger).Log("msg", "encountered WAL read error, attempting repair", "err", err) + if err := w.Repair(err); err != nil { + return nil, errors.Wrap(err, "repair corrupted WAL") + } + } + + return storage, nil +} + +func (w *Storage) replayWAL() error { + w.walMtx.RLock() + defer w.walMtx.RUnlock() + + if w.walClosed { + return ErrWALClosed + } + + level.Info(w.logger).Log("msg", "replaying WAL, this may take a while", "dir", w.wal.Dir()) + dir, startFrom, err := wal.LastCheckpoint(w.wal.Dir()) + if err != nil && err != record.ErrNotFound { + return errors.Wrap(err, "find last checkpoint") + } + + if err == nil { + sr, err := wal.NewSegmentsReader(dir) + if err != nil { + return errors.Wrap(err, "open checkpoint") + } + defer func() { + if err := sr.Close(); err != nil { + level.Warn(w.logger).Log("msg", "error while closing the wal segments reader", "err", err) + } + }() + + // A corrupted checkpoint is a hard error for now and requires user + // intervention. There's likely little data that can be recovered anyway. + if err := w.loadWAL(wal.NewReader(sr)); err != nil { + return errors.Wrap(err, "backfill checkpoint") + } + startFrom++ + level.Info(w.logger).Log("msg", "WAL checkpoint loaded") + } + + // Find the last segment. + _, last, err := wal.Segments(w.wal.Dir()) + if err != nil { + return errors.Wrap(err, "finding WAL segments") + } + + // Backfill segments from the most recent checkpoint onwards. 
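+	// Segment files are named by zero-padded sequence number, so iterating
+	// from startFrom through last replays them in the order they were written.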
+	for i := startFrom; i <= last; i++ {
+		s, err := wal.OpenReadSegment(wal.SegmentName(w.wal.Dir(), i))
+		if err != nil {
+			return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
+		}
+
+		sr := wal.NewSegmentBufReader(s)
+		err = w.loadWAL(wal.NewReader(sr))
+		if err := sr.Close(); err != nil {
+			level.Warn(w.logger).Log("msg", "error while closing the wal segments reader", "err", err)
+		}
+		if err != nil {
+			return err
+		}
+		level.Info(w.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", last)
+	}
+
+	return nil
+}
+
+func (w *Storage) loadWAL(r *wal.Reader) (err error) {
+	var (
+		dec record.Decoder
+	)
+
+	var (
+		decoded    = make(chan interface{}, 10)
+		errCh      = make(chan error, 1)
+		seriesPool = sync.Pool{
+			New: func() interface{} {
+				return []record.RefSeries{}
+			},
+		}
+		samplesPool = sync.Pool{
+			New: func() interface{} {
+				return []record.RefSample{}
+			},
+		}
+	)
+
+	go func() {
+		defer close(decoded)
+		for r.Next() {
+			rec := r.Record()
+			switch dec.Type(rec) {
+			case record.Series:
+				series := seriesPool.Get().([]record.RefSeries)[:0]
+				series, err = dec.Series(rec, series)
+				if err != nil {
+					errCh <- &wal.CorruptionErr{
+						Err:     errors.Wrap(err, "decode series"),
+						Segment: r.Segment(),
+						Offset:  r.Offset(),
+					}
+					return
+				}
+				decoded <- series
+			case record.Samples:
+				samples := samplesPool.Get().([]record.RefSample)[:0]
+				samples, err = dec.Samples(rec, samples)
+				if err != nil {
+					errCh <- &wal.CorruptionErr{
+						Err:     errors.Wrap(err, "decode samples"),
+						Segment: r.Segment(),
+						Offset:  r.Offset(),
+					}
+					// Return instead of falling through and sending the
+					// partially decoded samples downstream.
+					return
+				}
+				decoded <- samples
+			case record.Tombstones:
+				// We don't care about tombstones
+				continue
+			default:
+				errCh <- &wal.CorruptionErr{
+					Err:     errors.Errorf("invalid record type %v", dec.Type(rec)),
+					Segment: r.Segment(),
+					Offset:  r.Offset(),
+				}
+				return
+			}
+		}
+	}()
+
+	for d := range decoded {
+		switch v := d.(type) {
+		case []record.RefSeries:
+			for _, s := range v {
+				// If this is a new series, create it in memory without a timestamp.
+				// If we read in a sample for it, we'll use the timestamp of the latest
+				// sample. Otherwise, the series is stale and will be deleted once
+				// the truncation is performed.
+				if w.series.getByID(s.Ref) == nil {
+					series := &memSeries{ref: s.Ref, lset: s.Labels, lastTs: 0}
+					w.series.set(s.Labels.Hash(), series)
+
+					w.metrics.numActiveSeries.Inc()
+					w.metrics.totalCreatedSeries.Inc()
+
+					w.mtx.Lock()
+					if w.nextRef <= s.Ref {
+						w.nextRef = s.Ref + 1
+					}
+					w.mtx.Unlock()
+				}
+			}
+
+			//nolint:staticcheck
+			seriesPool.Put(v)
+		case []record.RefSample:
+			for _, s := range v {
+				// Update the lastTs for the series based on the timestamp of
+				// the sample we read back.
+				series := w.series.getByID(s.Ref)
+				if series == nil {
+					level.Warn(w.logger).Log("msg", "found sample referencing non-existing series, skipping")
+					continue
+				}
+
+				series.Lock()
+				if s.T > series.lastTs {
+					series.lastTs = s.T
+				}
+				series.Unlock()
+			}
+
+			//nolint:staticcheck
+			samplesPool.Put(v)
+		default:
+			panic(fmt.Errorf("unexpected decoded type: %T", d))
+		}
+	}
+
+	select {
+	case err := <-errCh:
+		return err
+	default:
+	}
+
+	if r.Err() != nil {
+		return errors.Wrap(r.Err(), "read records")
+	}
+
+	return nil
+}
+
+// Directory returns the path where the WAL storage is held.
+func (w *Storage) Directory() string {
+	return w.path
+}
+
+// Appender returns a new appender against the storage.
+func (w *Storage) Appender(_ context.Context) storage.Appender {
+	return w.appenderPool.Get().(storage.Appender)
+}
+
+// StartTime always returns 0, nil. It is implemented for compatibility with
+// Prometheus, but is unused in the agent.
+func (*Storage) StartTime() (int64, error) {
+	return 0, nil
+}
+
+// Truncate removes all data from the WAL prior to the timestamp specified by
+// mint.
+func (w *Storage) Truncate(mint int64) error {
+	w.walMtx.RLock()
+	defer w.walMtx.RUnlock()
+
+	if w.walClosed {
+		return ErrWALClosed
+	}
+
+	start := time.Now()
+
+	// Garbage collect series that haven't received an update since mint.
+	w.gc(mint)
+	level.Info(w.logger).Log("msg", "series GC completed", "duration", time.Since(start))
+
+	first, last, err := wal.Segments(w.wal.Dir())
+	if err != nil {
+		return errors.Wrap(err, "get segment range")
+	}
+
+	// Start a new segment, so low-ingestion-volume instances don't have more
+	// WAL than needed.
+	err = w.wal.NextSegment()
+	if err != nil {
+		return errors.Wrap(err, "next segment")
+	}
+
+	last-- // Never consider last segment for checkpoint.
+	if last < 0 {
+		return nil // no segments yet.
+	}
+
+	// The lower two-thirds of segments should contain mostly obsolete samples.
+	// If we have fewer than two segments, it's not worth checkpointing yet.
+	last = first + (last-first)*2/3
+	if last <= first {
+		return nil
+	}
+
+	keep := func(id uint64) bool {
+		if w.series.getByID(id) != nil {
+			return true
+		}
+
+		w.deletedMtx.Lock()
+		_, ok := w.deleted[id]
+		w.deletedMtx.Unlock()
+		return ok
+	}
+	if _, err = wal.Checkpoint(w.logger, w.wal, first, last, keep, mint); err != nil {
+		return errors.Wrap(err, "create checkpoint")
+	}
+	if err := w.wal.Truncate(last + 1); err != nil {
+		// If truncating fails, we'll just try again at the next checkpoint.
+		// Leftover segments will just be ignored in the future if there's a checkpoint
+		// that supersedes them.
+		level.Error(w.logger).Log("msg", "truncating segments failed", "err", err)
+	}
+
+	// The checkpoint is written and segments before it are truncated, so we no
+	// longer need to track deleted series that are before it.
+	w.deletedMtx.Lock()
+	for ref, segment := range w.deleted {
+		if segment < first {
+			delete(w.deleted, ref)
+			w.metrics.totalRemovedSeries.Inc()
+		}
+	}
+	w.metrics.numDeletedSeries.Set(float64(len(w.deleted)))
+	w.deletedMtx.Unlock()
+
+	if err := wal.DeleteCheckpoints(w.wal.Dir(), last); err != nil {
+		// Leftover old checkpoints do not cause problems down the line beyond
+		// occupying disk space.
+		// They will just be ignored since a higher checkpoint exists.
+		level.Error(w.logger).Log("msg", "delete old checkpoints", "err", err)
+	}
+
+	level.Info(w.logger).Log("msg", "WAL checkpoint complete",
+		"first", first, "last", last, "duration", time.Since(start))
+	return nil
+}
+
+// gc removes data before the minimum timestamp from the head.
+func (w *Storage) gc(mint int64) {
+	deleted := w.series.gc(mint)
+	w.metrics.numActiveSeries.Sub(float64(len(deleted)))
+
+	_, last, _ := wal.Segments(w.wal.Dir())
+	w.deletedMtx.Lock()
+	defer w.deletedMtx.Unlock()
+
+	// We want to keep series records for any newly deleted series
+	// until we've passed the last recorded segment. The WAL will
+	// still contain sample records with all of the ref IDs until
+	// those segments have been removed by a checkpoint.
+	//
+	// If the series weren't kept on startup when the WAL was replayed,
+	// the samples wouldn't be able to be used since there wouldn't
+	// be any labels for that ref ID.
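+	//
+	// For example, a series GC'd while segment 5 is the newest must keep its
+	// series record until a checkpoint has passed segment 5, because older
+	// segments can still carry samples that reference its ref ID.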
+ for ref := range deleted { + w.deleted[ref] = last + } + + w.metrics.numDeletedSeries.Set(float64(len(w.deleted))) +} + +// WriteStalenessMarkers appends a staleness sample for all active series. +func (w *Storage) WriteStalenessMarkers(remoteTsFunc func() int64) error { + var lastErr error + var lastTs int64 + + app := w.Appender(context.Background()) + it := w.series.iterator() + for series := range it.Channel() { + var ( + ref = series.ref + lset = series.lset + ) + + ts := timestamp.FromTime(time.Now()) + _, err := app.Append(ref, lset, ts, math.Float64frombits(value.StaleNaN)) + if err != nil { + lastErr = err + } + + // Remove millisecond precision; the remote write timestamp we get + // only has second precision. + lastTs = (ts / 1000) * 1000 + } + + if lastErr == nil { + if err := app.Commit(); err != nil { + return fmt.Errorf("failed to commit staleness markers: %w", err) + } + + // Wait for remote write to write the lastTs, but give up after 1m + level.Info(w.logger).Log("msg", "waiting for remote write to write staleness markers...") + + stopCh := time.After(1 * time.Minute) + start := time.Now() + + Outer: + for { + select { + case <-stopCh: + level.Error(w.logger).Log("msg", "timed out waiting for staleness markers to be written") + break Outer + default: + writtenTs := remoteTsFunc() + if writtenTs >= lastTs { + duration := time.Since(start) + level.Info(w.logger).Log("msg", "remote write wrote staleness markers", "duration", duration) + break Outer + } + + level.Info(w.logger).Log("msg", "remote write hasn't written staleness markers yet", "remoteTs", writtenTs, "lastTs", lastTs) + + // Wait a bit before reading again + time.Sleep(5 * time.Second) + } + } + } + + return lastErr +} + +// Close closes the storage and all its underlying resources. +func (w *Storage) Close() error { + w.walMtx.Lock() + defer w.walMtx.Unlock() + + if w.walClosed { + return fmt.Errorf("already closed") + } + w.walClosed = true + + if w.metrics != nil { + w.metrics.Unregister() + } + return w.wal.Close() +} + +type appender struct { + w *Storage + series []record.RefSeries + samples []record.RefSample +} + +func (a *appender) Append(ref uint64, l labels.Labels, t int64, v float64) (uint64, error) { + if ref == 0 { + return a.Add(l, t, v) + } + return ref, a.AddFast(ref, t, v) +} + +func (a *appender) Add(l labels.Labels, t int64, v float64) (uint64, error) { + hash := l.Hash() + series := a.w.series.getByHash(hash, l) + if series != nil { + return series.ref, a.AddFast(series.ref, t, v) + } + + // Ensure no empty or duplicate labels have gotten through. This mirrors the + // equivalent validation code in the TSDB's headAppender. 
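+	// Empty-valued labels are stripped first; a labelset that ends up empty
+	// or that repeats a label name is rejected with tsdb.ErrInvalidSample.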
+ l = l.WithoutEmpty() + if len(l) == 0 { + return 0, errors.Wrap(tsdb.ErrInvalidSample, "empty labelset") + } + + if lbl, dup := l.HasDuplicateLabelNames(); dup { + return 0, errors.Wrap(tsdb.ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, lbl)) + } + + a.w.mtx.Lock() + ref := a.w.nextRef + a.w.nextRef++ + a.w.mtx.Unlock() + + series = &memSeries{ref: ref, lset: l} + series.updateTs(t) + + a.series = append(a.series, record.RefSeries{ + Ref: ref, + Labels: l, + }) + + a.w.series.set(hash, series) + + a.w.metrics.numActiveSeries.Inc() + a.w.metrics.totalCreatedSeries.Inc() + a.w.metrics.totalAppendedSamples.Inc() + + return series.ref, a.AddFast(series.ref, t, v) +} + +func (a *appender) AddFast(ref uint64, t int64, v float64) error { + series := a.w.series.getByID(ref) + if series == nil { + return storage.ErrNotFound + } + series.Lock() + defer series.Unlock() + + // Update last recorded timestamp. Used by Storage.gc to determine if a + // series is dead. + series.updateTs(t) + + a.samples = append(a.samples, record.RefSample{ + Ref: ref, + T: t, + V: v, + }) + + a.w.metrics.totalAppendedSamples.Inc() + return nil +} + +func (a *appender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { + // remote_write doesn't support exemplars yet, so do nothing here. + return 0, nil +} + +// Commit submits the collected samples and purges the batch. +func (a *appender) Commit() error { + a.w.walMtx.RLock() + defer a.w.walMtx.RUnlock() + + if a.w.walClosed { + return ErrWALClosed + } + + var encoder record.Encoder + buf := a.w.bufPool.Get().([]byte) + + if len(a.series) > 0 { + buf = encoder.Series(a.series, buf) + if err := a.w.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + + if len(a.samples) > 0 { + buf = encoder.Samples(a.samples, buf) + if err := a.w.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + + //nolint:staticcheck + a.w.bufPool.Put(buf) + + for _, sample := range a.samples { + series := a.w.series.getByID(sample.Ref) + if series != nil { + series.Lock() + series.pendingCommit = false + series.Unlock() + } + } + + return a.Rollback() +} + +func (a *appender) Rollback() error { + a.series = a.series[:0] + a.samples = a.samples[:0] + a.w.appenderPool.Put(a) + return nil +} diff --git a/pkg/rules/remotewrite/wal_test.go b/pkg/rules/remotewrite/wal_test.go new file mode 100644 index 0000000000..84787544eb --- /dev/null +++ b/pkg/rules/remotewrite/wal_test.go @@ -0,0 +1,374 @@ +package remotewrite + +import ( + "context" + "io/ioutil" + "math" + "os" + "sort" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/value" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/stretchr/testify/require" +) + +func TestStorage_InvalidSeries(t *testing.T) { + walDir, err := ioutil.TempDir(os.TempDir(), "wal") + require.NoError(t, err) + defer os.RemoveAll(walDir) + + s, err := NewStorage(log.NewNopLogger(), nil, walDir) + require.NoError(t, err) + defer func() { + require.NoError(t, s.Close()) + }() + + app := s.Appender(context.Background()) + + _, err = app.Append(0, labels.Labels{}, 0, 0) + require.Error(t, err, "should reject empty labels") + + _, err = app.Append(0, labels.Labels{{Name: "a", Value: "1"}, {Name: "a", Value: "2"}}, 0, 0) + require.Error(t, err, "should reject duplicate labels") + + // Sanity check: valid series + _, err = app.Append(0, labels.Labels{{Name: "a", 
Value: "1"}}, 0, 0) + require.NoError(t, err, "should not reject valid series") +} + +func TestStorage(t *testing.T) { + walDir, err := ioutil.TempDir(os.TempDir(), "wal") + require.NoError(t, err) + defer os.RemoveAll(walDir) + + s, err := NewStorage(log.NewNopLogger(), nil, walDir) + require.NoError(t, err) + defer func() { + require.NoError(t, s.Close()) + }() + + app := s.Appender(context.Background()) + + // Write some samples + payload := seriesList{ + {name: "foo", samples: []sample{{1, 10.0}, {10, 100.0}}}, + {name: "bar", samples: []sample{{2, 20.0}, {20, 200.0}}}, + {name: "baz", samples: []sample{{3, 30.0}, {30, 300.0}}}, + } + for _, metric := range payload { + metric.Write(t, app) + } + + require.NoError(t, app.Commit()) + + collector := walDataCollector{} + replayer := walReplayer{w: &collector} + require.NoError(t, replayer.Replay(s.wal.Dir())) + + names := []string{} + for _, series := range collector.series { + names = append(names, series.Labels.Get("__name__")) + } + require.Equal(t, payload.SeriesNames(), names) + + expectedSamples := payload.ExpectedSamples() + actual := collector.samples + sort.Sort(byRefSample(actual)) + require.Equal(t, expectedSamples, actual) +} + +func TestStorage_ExistingWAL(t *testing.T) { + walDir, err := ioutil.TempDir(os.TempDir(), "wal") + require.NoError(t, err) + defer os.RemoveAll(walDir) + + s, err := NewStorage(log.NewNopLogger(), nil, walDir) + require.NoError(t, err) + + app := s.Appender(context.Background()) + payload := seriesList{ + {name: "foo", samples: []sample{{1, 10.0}, {10, 100.0}}}, + {name: "bar", samples: []sample{{2, 20.0}, {20, 200.0}}}, + {name: "baz", samples: []sample{{3, 30.0}, {30, 300.0}}}, + {name: "blerg", samples: []sample{{4, 40.0}, {40, 400.0}}}, + } + + // Write half of the samples. + for _, metric := range payload[0 : len(payload)/2] { + metric.Write(t, app) + } + + require.NoError(t, app.Commit()) + require.NoError(t, s.Close()) + + // We need to wait a little bit for the previous store to finish + // flushing. + time.Sleep(time.Millisecond * 150) + + // Create a new storage, write the other half of samples. + s, err = NewStorage(log.NewNopLogger(), nil, walDir) + require.NoError(t, err) + defer func() { + require.NoError(t, s.Close()) + }() + + // Verify that the storage picked up existing series when it + // replayed the WAL. + for series := range s.series.iterator().Channel() { + require.Greater(t, series.lastTs, int64(0), "series timestamp not updated") + } + + app = s.Appender(context.Background()) + + for _, metric := range payload[len(payload)/2:] { + metric.Write(t, app) + } + + require.NoError(t, app.Commit()) + + collector := walDataCollector{} + replayer := walReplayer{w: &collector} + require.NoError(t, replayer.Replay(s.wal.Dir())) + + names := []string{} + for _, series := range collector.series { + names = append(names, series.Labels.Get("__name__")) + } + require.Equal(t, payload.SeriesNames(), names) + + expectedSamples := payload.ExpectedSamples() + actual := collector.samples + sort.Sort(byRefSample(actual)) + require.Equal(t, expectedSamples, actual) +} + +func TestStorage_Truncate(t *testing.T) { + // Same as before but now do the following: + // after writing all the data, forcefully create 4 more segments, + // then do a truncate of a timestamp for _some_ of the data. + // then read data back in. Expect to only get the latter half of data. 
+	walDir, err := ioutil.TempDir(os.TempDir(), "wal")
+	require.NoError(t, err)
+	defer os.RemoveAll(walDir)
+
+	s, err := NewStorage(log.NewNopLogger(), nil, walDir)
+	require.NoError(t, err)
+	defer func() {
+		require.NoError(t, s.Close())
+	}()
+
+	app := s.Appender(context.Background())
+
+	payload := seriesList{
+		{name: "foo", samples: []sample{{1, 10.0}, {10, 100.0}}},
+		{name: "bar", samples: []sample{{2, 20.0}, {20, 200.0}}},
+		{name: "baz", samples: []sample{{3, 30.0}, {30, 300.0}}},
+		{name: "blerg", samples: []sample{{4, 40.0}, {40, 400.0}}},
+	}
+
+	for _, metric := range payload {
+		metric.Write(t, app)
+	}
+
+	require.NoError(t, app.Commit())
+
+	// Forcefully create a bunch of new segments so that when we truncate
+	// there are enough segments to be considered for truncation.
+	for i := 0; i < 5; i++ {
+		require.NoError(t, s.wal.NextSegment())
+	}
+
+	// Truncate half of the samples, keeping only the second sample
+	// per series.
+	keepTs := payload[len(payload)-1].samples[0].ts + 1
+	err = s.Truncate(keepTs)
+	require.NoError(t, err)
+
+	payload = payload.Filter(func(s sample) bool {
+		return s.ts >= keepTs
+	})
+	expectedSamples := payload.ExpectedSamples()
+
+	// Read back the WAL, collect series and samples.
+	collector := walDataCollector{}
+	replayer := walReplayer{w: &collector}
+	require.NoError(t, replayer.Replay(s.wal.Dir()))
+
+	names := []string{}
+	for _, series := range collector.series {
+		names = append(names, series.Labels.Get("__name__"))
+	}
+	require.Equal(t, payload.SeriesNames(), names)
+
+	actual := collector.samples
+	sort.Sort(byRefSample(actual))
+	require.Equal(t, expectedSamples, actual)
+}
+
+func TestStorage_WriteStalenessMarkers(t *testing.T) {
+	walDir, err := ioutil.TempDir(os.TempDir(), "wal")
+	require.NoError(t, err)
+	defer os.RemoveAll(walDir)
+
+	s, err := NewStorage(log.NewNopLogger(), nil, walDir)
+	require.NoError(t, err)
+	defer func() {
+		require.NoError(t, s.Close())
+	}()
+
+	app := s.Appender(context.Background())
+
+	// Write some samples
+	payload := seriesList{
+		{name: "foo", samples: []sample{{1, 10.0}, {10, 100.0}}},
+		{name: "bar", samples: []sample{{2, 20.0}, {20, 200.0}}},
+		{name: "baz", samples: []sample{{3, 30.0}, {30, 300.0}}},
+	}
+	for _, metric := range payload {
+		metric.Write(t, app)
+	}
+
+	require.NoError(t, app.Commit())
+
+	// Write staleness markers for every series
+	require.NoError(t, s.WriteStalenessMarkers(func() int64 {
+		// Pass math.MaxInt64 so it seems like everything was written already
+		return math.MaxInt64
+	}))
+
+	// Read back the WAL, collect series and samples.
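+	// walReplayer feeds every series and sample record found in the WAL
+	// into the collector, which simply accumulates them for assertions.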
+	collector := walDataCollector{}
+	replayer := walReplayer{w: &collector}
+	require.NoError(t, replayer.Replay(s.wal.Dir()))
+
+	actual := collector.samples
+	sort.Sort(byRefSample(actual))
+
+	staleMap := map[uint64]bool{}
+	for _, sample := range actual {
+		if _, ok := staleMap[sample.Ref]; !ok {
+			staleMap[sample.Ref] = false
+		}
+		if value.IsStaleNaN(sample.V) {
+			staleMap[sample.Ref] = true
+		}
+	}
+
+	for ref, v := range staleMap {
+		require.True(t, v, "ref %d doesn't have stale marker", ref)
+	}
+}
+
+func TestStorage_TruncateAfterClose(t *testing.T) {
+	walDir, err := ioutil.TempDir(os.TempDir(), "wal")
+	require.NoError(t, err)
+	defer os.RemoveAll(walDir)
+
+	s, err := NewStorage(log.NewNopLogger(), nil, walDir)
+	require.NoError(t, err)
+
+	require.NoError(t, s.Close())
+	require.Equal(t, ErrWALClosed, s.Truncate(0))
+}
+
+type sample struct {
+	ts  int64
+	val float64
+}
+
+type series struct {
+	name    string
+	samples []sample
+
+	ref *uint64
+}
+
+func (s *series) Write(t *testing.T, app storage.Appender) {
+	t.Helper()
+
+	lbls := labels.FromMap(map[string]string{"__name__": s.name})
+
+	offset := 0
+	if s.ref == nil {
+		// Write first sample to get ref ID
+		ref, err := app.Append(0, lbls, s.samples[0].ts, s.samples[0].val)
+		require.NoError(t, err)
+
+		s.ref = &ref
+		offset = 1
+	}
+
+	// Write the remaining samples using the cached ref ID
+	for _, sample := range s.samples[offset:] {
+		_, err := app.Append(*s.ref, lbls, sample.ts, sample.val)
+		require.NoError(t, err)
+	}
+}
+
+type seriesList []*series
+
+// Filter creates a new seriesList with series filtered by a sample
+// keep predicate function.
+func (s seriesList) Filter(fn func(s sample) bool) seriesList {
+	var ret seriesList
+
+	for _, entry := range s {
+		var samples []sample
+
+		for _, sample := range entry.samples {
+			if fn(sample) {
+				samples = append(samples, sample)
+			}
+		}
+
+		if len(samples) > 0 {
+			ret = append(ret, &series{
+				name:    entry.name,
+				ref:     entry.ref,
+				samples: samples,
+			})
+		}
+	}
+
+	return ret
+}
+
+func (s seriesList) SeriesNames() []string {
+	names := make([]string, 0, len(s))
+	for _, series := range s {
+		names = append(names, series.name)
+	}
+	return names
+}
+
+// ExpectedSamples returns the list of expected samples, sorted by ref ID and timestamp
+func (s seriesList) ExpectedSamples() []record.RefSample {
+	expect := []record.RefSample{}
+	for _, series := range s {
+		for _, sample := range series.samples {
+			expect = append(expect, record.RefSample{
+				Ref: *series.ref,
+				T:   sample.ts,
+				V:   sample.val,
+			})
+		}
+	}
+	sort.Sort(byRefSample(expect))
+	return expect
+}
+
+type byRefSample []record.RefSample
+
+func (b byRefSample) Len() int      { return len(b) }
+func (b byRefSample) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
+func (b byRefSample) Less(i, j int) bool {
+	if b[i].Ref == b[j].Ref {
+		return b[i].T < b[j].T
+	}
+	return b[i].Ref < b[j].Ref
+}

From f6514234356599acf6319aece790b17c6ef5a208 Mon Sep 17 00:00:00 2001
From: Michael Okoko
Date: Mon, 26 Apr 2021 11:45:55 +0100
Subject: [PATCH 02/18] Set up remote-write config and test skeleton

Signed-off-by: Michael Okoko
---
 cmd/thanos/config.go           | 12 ++++++
 cmd/thanos/rule.go             | 65 +++++++++++++++++++++++--------
 test/e2e/e2ethanos/services.go | 43 ++++++++++++---------
 test/e2e/rule_test.go          | 70 +++++++++++++++++++++++++++++++++-
 test/e2e/rules_api_test.go     |  4 +-
 5 files changed, 158 insertions(+), 36 deletions(-)

diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go
index 3770389a68..f29e983b10 100644
--- a/cmd/thanos/config.go
+++
b/cmd/thanos/config.go @@ -224,3 +224,15 @@ func (ac *alertMgrConfig) registerFlag(cmd extflag.FlagClause) *alertMgrConfig { ac.alertRelabelConfigPath = extflag.RegisterPathOrContent(cmd, "alert.relabel-config", "YAML file that contains alert relabelling configuration.", extflag.WithEnvSubstitution()) return ac } + +type ruleRWConfig struct { + remoteWrite bool + remoteWriteConfig *extflag.PathOrContent +} + +func (rc *ruleRWConfig) registerFlag(cmd extflag.FlagClause) *ruleRWConfig { + cmd.Flag("remote-write", "If true, directs ruler to remote-write evaluated samples to the server configured by 'remote-write.config'."). + BoolVar(&rc.remoteWrite) + rc.remoteWriteConfig = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write server where samples should be sent to. See https://thanos.io/tip/components/rule.md/#query-api", false) + return rc +} diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 6a6cb9cd67..a837506d9b 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -15,6 +15,9 @@ import ( "strings" "time" + "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/rules/remotewrite" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" @@ -75,6 +78,9 @@ type ruleConfig struct { alertQueryURL *url.URL alertRelabelConfigYAML []byte + rwConfig ruleRWConfig + rwConfigYAML []byte + resendDelay time.Duration evalInterval time.Duration ruleFiles []string @@ -90,6 +96,7 @@ func (rc *ruleConfig) registerFlag(cmd extkingpin.FlagClause) { rc.shipper.registerFlag(cmd) rc.query.registerFlag(cmd) rc.alertmgr.registerFlag(cmd) + rc.rwConfig.registerFlag(cmd) } // registerRule registers a rule command. @@ -164,6 +171,14 @@ func registerRule(app *extkingpin.App) { return errors.New("--query/--query.sd-files and --query.config* parameters cannot be defined at the same time") } + // Parse and check remote-write config if it's enabled + if conf.rwConfig.remoteWrite { + conf.rwConfigYAML, err = conf.rwConfig.remoteWriteConfig.Content() + if err != nil { + return err + } + } + // Parse and check alerting configuration. conf.alertmgrsConfigYAML, err = conf.alertmgr.configPath.Content() if err != nil { @@ -319,25 +334,43 @@ func runRule( // Discover and resolve query addresses. 
addDiscoveryGroups(g, queryClient, conf.query.dnsSDInterval) } - - db, err := tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil) + var ( + appendable storage.Appendable + queryable storage.Queryable + db *tsdb.DB + ) if err != nil { return errors.Wrap(err, "open TSDB") } + if conf.rwConfig.remoteWrite { + rw, err := remotewrite.NewStorage(logger, reg, "jfdlsfsl") + if err != nil { + return errors.Wrap(err, "open WAL storage") + } + appendable = rw + queryable = rw + } else { + db, err := tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil) + if err != nil { + return errors.Wrap(err, "open TSDB") + } - level.Debug(logger).Log("msg", "removing storage lock file if any") - if err := removeLockfileIfAny(logger, conf.dataDir); err != nil { - return errors.Wrap(err, "remove storage lock files") - } + level.Debug(logger).Log("msg", "removing storage lock file if any") + if err := removeLockfileIfAny(logger, conf.dataDir); err != nil { + return errors.Wrap(err, "remove storage lock files") + } - { - done := make(chan struct{}) - g.Add(func() error { - <-done - return db.Close() - }, func(error) { - close(done) - }) + { + done := make(chan struct{}) + g.Add(func() error { + <-done + return db.Close() + }, func(error) { + close(done) + }) + } + appendable = db + queryable = db } // Build the Alertmanager clients. @@ -435,9 +468,9 @@ func runRule( rules.ManagerOptions{ NotifyFunc: notifyFunc, Logger: logger, - Appendable: db, + Appendable: appendable, ExternalURL: nil, - Queryable: db, + Queryable: queryable, ResendDelay: conf.resendDelay, }, queryFuncCreator(logger, queryClients, metrics.duplicatedQuery, metrics.ruleEvalWarnings, conf.query.httpMethod), diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index c15c150cbb..b861052a1f 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -460,7 +460,11 @@ func NewRoutingAndIngestingReceiverWithConfigWatcher(sharedDir, networkName, nam return receiver, nil } -func NewRuler(sharedDir, name, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config) (*Service, error) { +func NewTSDBRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config) (*Service, error) { + return NewRuler(sharedDir, name, ruleSubDir, amCfg, queryCfg, false) +} + +func NewRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config, remoteWrite bool) (*Service, error) { dir := filepath.Join(sharedDir, "data", "rule", name) container := filepath.Join(e2e.ContainerSharedDir, "data", "rule", name) if err := os.MkdirAll(dir, 0750); err != nil { @@ -479,25 +483,30 @@ func NewRuler(sharedDir, name, ruleSubDir string, amCfg []alert.AlertmanagerConf return nil, errors.Wrapf(err, "generate query file: %v", queryCfg) } + ruleArgs := map[string]string{ + "--debug.name": fmt.Sprintf("rule-%v", name), + "--grpc-address": ":9091", + "--grpc-grace-period": "0s", + "--http-address": ":8080", + "--label": fmt.Sprintf(`replica="%s"`, name), + "--data-dir": container, + "--rule-file": filepath.Join(e2e.ContainerSharedDir, ruleSubDir, "*.yaml"), + "--eval-interval": "3s", + "--alertmanagers.config": string(amCfgBytes), + "--alertmanagers.sd-dns-interval": "1s", + "--log.level": infoLogLevel, + "--query.config": string(queryCfgBytes), + "--query.sd-dns-interval": "1s", + "--resend-delay": "5s", + } + if remoteWrite { + ruleArgs["--remote-write"] = "" + } + ruler 
:= NewService( fmt.Sprintf("rule-%v", name), DefaultImage(), - e2e.NewCommand("rule", e2e.BuildArgs(map[string]string{ - "--debug.name": fmt.Sprintf("rule-%v", name), - "--grpc-address": ":9091", - "--grpc-grace-period": "0s", - "--http-address": ":8080", - "--label": fmt.Sprintf(`replica="%s"`, name), - "--data-dir": container, - "--rule-file": filepath.Join(e2e.ContainerSharedDir, ruleSubDir, "*.yaml"), - "--eval-interval": "3s", - "--alertmanagers.config": string(amCfgBytes), - "--alertmanagers.sd-dns-interval": "1s", - "--log.level": infoLogLevel, - "--query.config": string(queryCfgBytes), - "--query.sd-dns-interval": "1s", - "--resend-delay": "5s", - })...), + e2e.NewCommand("rule", e2e.BuildArgs(ruleArgs)...), e2e.NewHTTPReadinessProbe(8080, "/-/ready", 200, 200), 8080, 9091, diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index fb7289ca90..0fc3f40999 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -220,7 +220,7 @@ func TestRule(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(am1, am2)) - r, err := e2ethanos.NewRuler(s.SharedDir(), "1", rulesSubDir, []alert.AlertmanagerConfig{ + r, err := e2ethanos.NewTSDBRuler(s.SharedDir(), "1", rulesSubDir, []alert.AlertmanagerConfig{ { EndpointsConfig: http_util.EndpointsConfig{ FileSDConfigs: []http_util.FileSDConfig{ @@ -435,6 +435,74 @@ func TestRule(t *testing.T) { } } +// TestStatelessRule verifies that Thanos Ruler can be run in stateless mode where it: +// evaluates rules against one/more Queriers. +// record the rule evaluations in a WAL +// the WAL gets replicated to a Receiver endpoint + +func TestStatelessRule(t *testing.T) { + t.Parallel() + + s, err := e2e.NewScenario("e2e_test_rule") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, s)) + + _, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + t.Cleanup(cancel) + + // Prepare work dirs. + rulesSubDir := filepath.Join("rules") + rulesPath := filepath.Join(s.SharedDir(), rulesSubDir) + testutil.Ok(t, os.MkdirAll(rulesPath, os.ModePerm)) + createRuleFiles(t, rulesPath) + amTargetsSubDir := filepath.Join("rules_am_targets") + testutil.Ok(t, os.MkdirAll(filepath.Join(s.SharedDir(), amTargetsSubDir), os.ModePerm)) + queryTargetsSubDir := filepath.Join("rules_query_targets") + testutil.Ok(t, os.MkdirAll(filepath.Join(s.SharedDir(), queryTargetsSubDir), os.ModePerm)) + + am1, err := e2ethanos.NewAlertmanager(s.SharedDir(), "1") + testutil.Ok(t, err) + am2, err := e2ethanos.NewAlertmanager(s.SharedDir(), "2") + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(am1, am2)) + + r, err := e2ethanos.NewRuler(s.SharedDir(), "1", rulesSubDir, []alert.AlertmanagerConfig{ + { + EndpointsConfig: http_util.EndpointsConfig{ + FileSDConfigs: []http_util.FileSDConfig{ + { + // FileSD which will be used to register discover dynamically am1. + Files: []string{filepath.Join(e2e.ContainerSharedDir, amTargetsSubDir, "*.yaml")}, + RefreshInterval: model.Duration(time.Second), + }, + }, + StaticAddresses: []string{ + am2.NetworkHTTPEndpoint(), + }, + Scheme: "http", + }, + Timeout: model.Duration(time.Second), + APIVersion: alert.APIv1, + }, + }, []query.Config{ + { + EndpointsConfig: http_util.EndpointsConfig{ + // We test Statically Addressed queries in other tests. Focus on FileSD here. + FileSDConfigs: []http_util.FileSDConfig{ + { + // FileSD which will be used to register discover dynamically q. 
+ Files: []string{filepath.Join(e2e.ContainerSharedDir, queryTargetsSubDir, "*.yaml")}, + RefreshInterval: model.Duration(time.Second), + }, + }, + Scheme: "http", + }, + }, + }, true) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(r)) +} + // Test Ruler behavior on different storepb.PartialResponseStrategy when having partial response from single `failingStoreAPI`. func TestRulePartialResponse(t *testing.T) { t.Skip("TODO: Allow HTTP ports from binaries running on host to be accessible.") diff --git a/test/e2e/rules_api_test.go b/test/e2e/rules_api_test.go index 86f97cdf18..10f111fa68 100644 --- a/test/e2e/rules_api_test.go +++ b/test/e2e/rules_api_test.go @@ -66,9 +66,9 @@ func TestRulesAPI_Fanout(t *testing.T) { testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2)) // 2x Rulers. - r1, err := e2ethanos.NewRuler(s.SharedDir(), "rule1", thanosRulesSubDir, nil, nil) + r1, err := e2ethanos.NewTSDBRuler(s.SharedDir(), "rule1", thanosRulesSubDir, nil, nil) testutil.Ok(t, err) - r2, err := e2ethanos.NewRuler(s.SharedDir(), "rule2", thanosRulesSubDir, nil, nil) + r2, err := e2ethanos.NewTSDBRuler(s.SharedDir(), "rule2", thanosRulesSubDir, nil, nil) testutil.Ok(t, err) stores := []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), r1.NetworkEndpointFor(s.NetworkName(), 9091), r2.NetworkEndpointFor(s.NetworkName(), 9091)} From 0e2b82f7c67006a0df3fb837dba5eb3673979da2 Mon Sep 17 00:00:00 2001 From: Michael Okoko Date: Tue, 4 May 2021 06:23:20 +0100 Subject: [PATCH 03/18] Setup fanout and related storages for stateless ruler Signed-off-by: Michael Okoko --- cmd/thanos/config.go | 6 +- cmd/thanos/rule.go | 23 +++++-- pkg/rules/remotewrite/remotewrite.go | 98 ++++++++++++++++++++++++++++ 3 files changed, 119 insertions(+), 8 deletions(-) create mode 100644 pkg/rules/remotewrite/remotewrite.go diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index f29e983b10..15fc0eed4f 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -226,13 +226,13 @@ func (ac *alertMgrConfig) registerFlag(cmd extflag.FlagClause) *alertMgrConfig { } type ruleRWConfig struct { - remoteWrite bool - remoteWriteConfig *extflag.PathOrContent + remoteWrite bool + configPath *extflag.PathOrContent } func (rc *ruleRWConfig) registerFlag(cmd extflag.FlagClause) *ruleRWConfig { cmd.Flag("remote-write", "If true, directs ruler to remote-write evaluated samples to the server configured by 'remote-write.config'."). BoolVar(&rc.remoteWrite) - rc.remoteWriteConfig = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write server where samples should be sent to. See https://thanos.io/tip/components/rule.md/#query-api", false) + rc.configPath = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write server where samples should be sent to. 
See https://thanos.io/tip/components/rule.md/#query-api", false) return rc } diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index a837506d9b..9eaed4da44 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -173,7 +173,7 @@ func registerRule(app *extkingpin.App) { // Parse and check remote-write config if it's enabled if conf.rwConfig.remoteWrite { - conf.rwConfigYAML, err = conf.rwConfig.remoteWriteConfig.Content() + conf.rwConfigYAML, err = conf.rwConfig.configPath.Content() if err != nil { return err } @@ -343,12 +343,25 @@ func runRule( return errors.Wrap(err, "open TSDB") } if conf.rwConfig.remoteWrite { - rw, err := remotewrite.NewStorage(logger, reg, "jfdlsfsl") + conf.rwConfigYAML, err = conf.rwConfig.configPath.Content() if err != nil { - return errors.Wrap(err, "open WAL storage") + return err + } + var rwCfg remotewrite.Config + if len(conf.rwConfigYAML) == 0 { + return errors.New("no --remote-write.config was given") + } + rwCfg, err = remotewrite.LoadRemoteWriteConfig(conf.rwConfigYAML) + if err != nil { + return err + } + walDir := filepath.Join(conf.dataDir, rwCfg.Name) + remoteStore, err := remotewrite.NewFanoutStorage(logger, reg, walDir, rwCfg) + if err != nil { + return errors.Wrap(err, "set up remote-write store for ruler") } - appendable = rw - queryable = rw + appendable = remoteStore + queryable = remoteStore } else { db, err := tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil) if err != nil { diff --git a/pkg/rules/remotewrite/remotewrite.go b/pkg/rules/remotewrite/remotewrite.go new file mode 100644 index 0000000000..e007a86742 --- /dev/null +++ b/pkg/rules/remotewrite/remotewrite.go @@ -0,0 +1,98 @@ +package remotewrite + +import ( + "errors" + "fmt" + "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/scrape" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/remote" + "gopkg.in/yaml.v2" + "sync" + "time" +) + +var ( + managerMtx sync.Mutex +) + +type Config struct { + Name string `yaml:"name"` + RemoteStore *config.RemoteWriteConfig `yaml:"remote_write,omitempty"` + ScrapeConfig *config.ScrapeConfig `yaml:"scrape_config,omitempty"` +} + +func LoadRemoteWriteConfig(configYAML []byte) (Config, error) { + var cfg Config + if err := yaml.Unmarshal(configYAML, &cfg); err != nil { + return cfg, err + } + return cfg, nil +} + +func NewFanoutStorage(logger log.Logger, reg prometheus.Registerer, walDir string, rwConfig Config) (storage.Storage, error) { + walStore, err := NewStorage(logger, reg, walDir) + if err != nil { + return nil, err + } + scrapeMgr := &readyScrapeManager{} + remoteStore := remote.NewStorage(logger, reg, walStore.StartTime, walStore.Directory(), 1*time.Minute, scrapeMgr) + err = remoteStore.ApplyConfig(&config.Config{ + GlobalConfig: config.DefaultGlobalConfig, + RemoteWriteConfigs: []*config.RemoteWriteConfig{rwConfig.RemoteStore}, + }) + if err != nil { + return nil, fmt.Errorf("failed applying config to remote storage: %w", err) + } + fanoutStorage := storage.NewFanout(logger, walStore, remoteStore) + + scrapeManager := newScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage) + err = scrapeManager.ApplyConfig(&config.Config{ + GlobalConfig: config.DefaultGlobalConfig, + ScrapeConfigs: []*config.ScrapeConfig{rwConfig.ScrapeConfig}, + }) + if err != nil { + return nil, fmt.Errorf("failed applying config to scrape manager: %w", err) + } + 
return fanoutStorage, nil
+}
+
+func newScrapeManager(logger log.Logger, app storage.Appendable) *scrape.Manager {
+	// scrape.NewManager modifies a global variable in Prometheus. To avoid a
+	// data race of modifying that global, we lock a mutex here briefly.
+	managerMtx.Lock()
+	defer managerMtx.Unlock()
+	return scrape.NewManager(logger, app)
+}
+
+// ErrNotReady is returned when the scrape manager is used but has not been
+// initialized yet.
+var ErrNotReady = errors.New("scrape manager not ready")
+
+// readyScrapeManager allows a scrape manager to be retrieved, even if it's
+// set at a later point in time.
+type readyScrapeManager struct {
+	mtx sync.RWMutex
+	m   *scrape.Manager
+}
+
+// Set the scrape manager.
+func (rm *readyScrapeManager) Set(m *scrape.Manager) {
+	rm.mtx.Lock()
+	defer rm.mtx.Unlock()
+
+	rm.m = m
+}
+
+// Get the scrape manager. If it is not ready, return an error.
+func (rm *readyScrapeManager) Get() (*scrape.Manager, error) {
+	rm.mtx.RLock()
+	defer rm.mtx.RUnlock()
+
+	if rm.m != nil {
+		return rm.m, nil
+	}
+
+	return nil, ErrNotReady
+}

From 02f8981b4fdcf23403bf81819d97b6ed16d2e604 Mon Sep 17 00:00:00 2001
From: Michael Okoko
Date: Tue, 4 May 2021 06:23:42 +0100
Subject: [PATCH 04/18] Optionally run ruler in stateless mode

Signed-off-by: Michael Okoko
---
 scripts/quickstart.sh | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/scripts/quickstart.sh b/scripts/quickstart.sh
index e99ab8fdd9..fb4395e9d2 100755
--- a/scripts/quickstart.sh
+++ b/scripts/quickstart.sh
@@ -245,6 +245,19 @@ QUERIER_JAEGER_CONFIG=$(
 EOF
 )
 
+REMOTE_WRITE_FLAGS=""
+if [ -n "${STATELESS_RULER_ENABLED}" ]; then
+  cat >data/rule-remote-write.yaml <<-EOF
+	name: "thanos-receivers"
+	remote_write:
+	  url: "http://127.0.0.1:10908/api/v1/receive"
+	  name: "receive-0"
+EOF
+
+  REMOTE_WRITE_FLAGS="--remote-write --remote-write.config-file=data/rule-remote-write.yaml"
+fi
+
 # Start Thanos Ruler.
${THANOS_EXECUTABLE} rule \ --data-dir data/ \ @@ -256,6 +269,7 @@ ${THANOS_EXECUTABLE} rule \ --http-address="0.0.0.0:19999" \ --grpc-address="0.0.0.0:19998" \ --label 'rule="true"' \ + "${REMOTE_WRITE_FLAGS}" \ ${OBJSTORECFG} & STORES="${STORES} --store 127.0.0.1:19998" From fc8f1035ebacb18d84721d8c7859a4052eb57782 Mon Sep 17 00:00:00 2001 From: Michael Okoko Date: Mon, 10 May 2021 11:13:28 +0100 Subject: [PATCH 05/18] Set up tests and implementations for configuring remote-write for ruler Signed-off-by: Michael Okoko --- pkg/rules/remotewrite/remotewrite.go | 13 +++--- test/e2e/e2ethanos/services.go | 10 ++++- test/e2e/rule_test.go | 62 +++++++++++++++++++++------- 3 files changed, 62 insertions(+), 23 deletions(-) diff --git a/pkg/rules/remotewrite/remotewrite.go b/pkg/rules/remotewrite/remotewrite.go index e007a86742..c47e3f1968 100644 --- a/pkg/rules/remotewrite/remotewrite.go +++ b/pkg/rules/remotewrite/remotewrite.go @@ -3,6 +3,9 @@ package remotewrite import ( "errors" "fmt" + "sync" + "time" + "github.com/go-kit/kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/config" @@ -10,18 +13,16 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/remote" "gopkg.in/yaml.v2" - "sync" - "time" ) var ( - managerMtx sync.Mutex + managerMtx sync.Mutex ) type Config struct { - Name string `yaml:"name"` - RemoteStore *config.RemoteWriteConfig `yaml:"remote_write,omitempty"` - ScrapeConfig *config.ScrapeConfig `yaml:"scrape_config,omitempty"` + Name string `yaml:"name"` + RemoteStore *config.RemoteWriteConfig `yaml:"remote_write,omitempty"` + ScrapeConfig *config.ScrapeConfig `yaml:"scrape_config,omitempty"` } func LoadRemoteWriteConfig(configYAML []byte) (Config, error) { diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index b861052a1f..b9a0099206 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -6,6 +6,7 @@ package e2ethanos import ( "encoding/json" "fmt" + "github.com/thanos-io/thanos/pkg/rules/remotewrite" "io/ioutil" "os" "path/filepath" @@ -461,10 +462,10 @@ func NewRoutingAndIngestingReceiverWithConfigWatcher(sharedDir, networkName, nam } func NewTSDBRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config) (*Service, error) { - return NewRuler(sharedDir, name, ruleSubDir, amCfg, queryCfg, false) + return NewRuler(sharedDir, name, ruleSubDir, amCfg, queryCfg, false, remotewrite.Config{}) } -func NewRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config, remoteWrite bool) (*Service, error) { +func NewRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config, remoteWrite bool, remoteWriteCfg remotewrite.Config) (*Service, error) { dir := filepath.Join(sharedDir, "data", "rule", name) container := filepath.Join(e2e.ContainerSharedDir, "data", "rule", name) if err := os.MkdirAll(dir, 0750); err != nil { @@ -500,7 +501,12 @@ func NewRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.Al "--resend-delay": "5s", } if remoteWrite { + rwCfgBytes, err := yaml.Marshal(remoteWriteCfg) + if err != nil { + return nil, errors.Wrapf(err, "generate remote write config: %v", remoteWriteCfg) + } ruleArgs["--remote-write"] = "" + ruleArgs["--remote-write.config"] = string(rwCfgBytes) } ruler := NewService( diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index 
0fc3f40999..af6b6777f1 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -8,6 +8,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/thanos-io/thanos/pkg/rules/remotewrite" "io/ioutil" "net/http" "os" @@ -440,32 +441,62 @@ func TestRule(t *testing.T) { // record the rule evaluations in a WAL // the WAL gets replicated to a Receiver endpoint -func TestStatelessRule(t *testing.T) { - t.Parallel() - s, err := e2e.NewScenario("e2e_test_rule") +func TestRule_CanRemoteWriteData(t *testing.T) { + s, err := e2e.NewScenario("e2e_test_rule_remote_write") testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, s)) - _, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - t.Cleanup(cancel) - - // Prepare work dirs. + // create rule files rulesSubDir := filepath.Join("rules") rulesPath := filepath.Join(s.SharedDir(), rulesSubDir) testutil.Ok(t, os.MkdirAll(rulesPath, os.ModePerm)) - createRuleFiles(t, rulesPath) - amTargetsSubDir := filepath.Join("rules_am_targets") + testAlertRuleRecordAbsentMetric := ` +record: test_absent_metric +expr: absent(nonexistent{job='thanos-receive'}) +labels: + severity: page +annotations: + summary: "tesemole hahaha" +` + createRuleFile(t, filepath.Join(rulesPath, "rw_rule-0.yaml"), testAlertRuleRecordAbsentMetric) + amTargetsSubDir := filepath.Join("rw_rules_am_targets") testutil.Ok(t, os.MkdirAll(filepath.Join(s.SharedDir(), amTargetsSubDir), os.ModePerm)) - queryTargetsSubDir := filepath.Join("rules_query_targets") + queryTargetsSubDir := filepath.Join("rw_rules_query_targets") testutil.Ok(t, os.MkdirAll(filepath.Join(s.SharedDir(), queryTargetsSubDir), os.ModePerm)) - am1, err := e2ethanos.NewAlertmanager(s.SharedDir(), "1") + receiver, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1) testutil.Ok(t, err) - am2, err := e2ethanos.NewAlertmanager(s.SharedDir(), "2") + testutil.Ok(t, s.StartAndWaitReady(receiver)) + + querier, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{receiver.GRPCNetworkEndpoint()}, []string{receiver.GRPCNetworkEndpoint()}, nil, nil, nil, nil, "", "") testutil.Ok(t, err) - testutil.Ok(t, s.StartAndWaitReady(am1, am2)) + testutil.Ok(t, s.StartAndWaitReady(querier)) + + ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Minute) + t.Cleanup(cancel) + + // check that querier can talk to the receiver + t.Run("can query from receiver", func(t *testing.T) { + testAbsentAlert := "absent(nonexistent{job='thanos-receive'})" + queryAndAssertSeries(t, ctx, querier.HTTPEndpoint(), testAbsentAlert, promclient.QueryOptions{ + Deduplicate: false, + }, []model.Metric{ + { + "job": "thanos-receive", + }, + }) + }) + am, err := e2ethanos.NewAlertmanager(s.SharedDir(), "1") + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(am)) + + rwURL := e2ethanos.RemoteWriteEndpoint(receiver.NetworkEndpoint(8081)) + fmt.Println(rwURL) + testutil.Ok(t, err) + + fmt.Println("AlertManager URL: ", am.HTTPPort()) r, err := e2ethanos.NewRuler(s.SharedDir(), "1", rulesSubDir, []alert.AlertmanagerConfig{ { EndpointsConfig: http_util.EndpointsConfig{ @@ -477,7 +508,7 @@ func TestStatelessRule(t *testing.T) { }, }, StaticAddresses: []string{ - am2.NetworkHTTPEndpoint(), + am.NetworkHTTPEndpoint(), }, Scheme: "http", }, @@ -498,9 +529,10 @@ func TestStatelessRule(t *testing.T) { Scheme: "http", }, }, - }, true) + }, true, remotewrite.Config{}) testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(r)) + time.Sleep(5 * time.Minute) } // Test Ruler behavior on different storepb.PartialResponseStrategy 
when having partial response from single `failingStoreAPI`. From 44b621912ecc175e3636dfe3eea1a4393c82a1e8 Mon Sep 17 00:00:00 2001 From: Michael Okoko Date: Fri, 14 May 2021 12:48:54 +0100 Subject: [PATCH 06/18] Implement stub querier for WAL storage to fix nil pointer error Signed-off-by: Michael Okoko --- pkg/rules/remotewrite/remotewrite.go | 60 +-------------------- pkg/rules/remotewrite/wal.go | 22 ++++++++ test/e2e/rule_test.go | 78 ++++++++++++---------------- 3 files changed, 56 insertions(+), 104 deletions(-) diff --git a/pkg/rules/remotewrite/remotewrite.go b/pkg/rules/remotewrite/remotewrite.go index c47e3f1968..dd27cb139d 100644 --- a/pkg/rules/remotewrite/remotewrite.go +++ b/pkg/rules/remotewrite/remotewrite.go @@ -1,28 +1,20 @@ package remotewrite import ( - "errors" "fmt" - "sync" "time" "github.com/go-kit/kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/remote" "gopkg.in/yaml.v2" ) -var ( - managerMtx sync.Mutex -) - type Config struct { Name string `yaml:"name"` RemoteStore *config.RemoteWriteConfig `yaml:"remote_write,omitempty"` - ScrapeConfig *config.ScrapeConfig `yaml:"scrape_config,omitempty"` } func LoadRemoteWriteConfig(configYAML []byte) (Config, error) { @@ -38,8 +30,7 @@ func NewFanoutStorage(logger log.Logger, reg prometheus.Registerer, walDir strin if err != nil { return nil, err } - scrapeMgr := &readyScrapeManager{} - remoteStore := remote.NewStorage(logger, reg, walStore.StartTime, walStore.Directory(), 1*time.Minute, scrapeMgr) + remoteStore := remote.NewStorage(logger, reg, walStore.StartTime, walStore.Directory(), 1*time.Minute, nil) err = remoteStore.ApplyConfig(&config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: []*config.RemoteWriteConfig{rwConfig.RemoteStore}, @@ -48,52 +39,5 @@ func NewFanoutStorage(logger log.Logger, reg prometheus.Registerer, walDir strin return nil, fmt.Errorf("failed applying config to remote storage: %w", err) } fanoutStorage := storage.NewFanout(logger, walStore, remoteStore) - - scrapeManager := newScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage) - err = scrapeManager.ApplyConfig(&config.Config{ - GlobalConfig: config.DefaultGlobalConfig, - ScrapeConfigs: []*config.ScrapeConfig{rwConfig.ScrapeConfig}, - }) - if err != nil { - return nil, fmt.Errorf("failed applying config to scrape manager: %w", err) - } return fanoutStorage, nil -} - -func newScrapeManager(logger log.Logger, app storage.Appendable) *scrape.Manager { - // scrape.NewManager modifies a global variable in Prometheus. To avoid a - // data race of modifying that global, we lock a mutex here briefly. - managerMtx.Lock() - defer managerMtx.Unlock() - return scrape.NewManager(logger, app) -} - -// ErrNotReady is returned when the scrape manager is used but has not been -// initialized yet. -var ErrNotReady = errors.New("scrape manager not ready") - -// readyScrapeManager allows a scrape manager to be retrieved. Even if it's set at a later point in time. -type readyScrapeManager struct { - mtx sync.RWMutex - m *scrape.Manager -} - -// Set the scrape manager. -func (rm *readyScrapeManager) Set(m *scrape.Manager) { - rm.mtx.Lock() - defer rm.mtx.Unlock() - - rm.m = m -} - -// Get the scrape manager. If is not ready, return an error. 
If it's not ready, return an error.
-func (rm *readyScrapeManager) Get() (*scrape.Manager, error) {
-	rm.mtx.RLock()
-	defer rm.mtx.RUnlock()
-
-	if rm.m != nil {
-		return rm.m, nil
-	}
-
-	return nil, ErrNotReady
-}
+}
\ No newline at end of file
diff --git a/pkg/rules/remotewrite/wal.go b/pkg/rules/remotewrite/wal.go
index a7851cff09..40ac340960 100644
--- a/pkg/rules/remotewrite/wal.go
+++ b/pkg/rules/remotewrite/wal.go
@@ -357,6 +357,10 @@ func (w *Storage) Appender(_ context.Context) storage.Appender {
 	return w.appenderPool.Get().(storage.Appender)
 }
 
+func (w *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
+	return &remoteWriteQueryable{}, nil
+}
+
 // StartTime always returns 0, nil. It is implemented for compatibility with
 // Prometheus, but is unused in the agent.
 func (*Storage) StartTime() (int64, error) {
@@ -676,3 +680,21 @@ func (a *appender) Rollback() error {
 	a.w.appenderPool.Put(a)
 	return nil
 }
+
+type remoteWriteQueryable struct{}
+
+func (r *remoteWriteQueryable) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
+	return nil, nil, nil
+}
+
+func (r *remoteWriteQueryable) LabelNames() ([]string, storage.Warnings, error) {
+	return nil, nil, nil
+}
+
+func (r *remoteWriteQueryable) Close() error {
+	return nil
+}
+
+func (r *remoteWriteQueryable) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
+	return nil
+}
diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go
index af6b6777f1..a5d8750c0f 100644
--- a/test/e2e/rule_test.go
+++ b/test/e2e/rule_test.go
@@ -8,9 +8,12 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	commoncfg "github.com/prometheus/common/config"
+	"github.com/prometheus/prometheus/config"
 	"github.com/thanos-io/thanos/pkg/rules/remotewrite"
 	"io/ioutil"
 	"net/http"
+	"net/url"
 	"os"
 	"path/filepath"
 	"testing"
@@ -436,67 +439,43 @@ func TestRule(t *testing.T) {
 	}
 }
 
-// TestStatelessRule verifies that Thanos Ruler can be run in stateless mode where it:
-// evaluates rules against one/more Queriers.
-// record the rule evaluations in a WAL
-// the WAL gets replicated to a Receiver endpoint
-
-
 func TestRule_CanRemoteWriteData(t *testing.T) {
+	testAlertRuleRecordAbsentMetric := `
+groups:
+- name: example_record_rules
+  interval: 100ms
+  rules:
+  - record: test_absent_metric
+    expr: absent(nonexistent{job='thanos-receive'})
+`
+
 	s, err := e2e.NewScenario("e2e_test_rule_remote_write")
 	testutil.Ok(t, err)
 	t.Cleanup(e2ethanos.CleanScenario(t, s))
 
-	// create rule files
+	_, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+	t.Cleanup(cancel)
+
+	// Prepare work dirs.
rulesSubDir := filepath.Join("rules") rulesPath := filepath.Join(s.SharedDir(), rulesSubDir) testutil.Ok(t, os.MkdirAll(rulesPath, os.ModePerm)) - testAlertRuleRecordAbsentMetric := ` -record: test_absent_metric -expr: absent(nonexistent{job='thanos-receive'}) -labels: - severity: page -annotations: - summary: "tesemole hahaha" -` - createRuleFile(t, filepath.Join(rulesPath, "rw_rule-0.yaml"), testAlertRuleRecordAbsentMetric) - amTargetsSubDir := filepath.Join("rw_rules_am_targets") + createRuleFile(t, filepath.Join(rulesPath, fmt.Sprintf("rules-0.yaml")), testAlertRuleRecordAbsentMetric) + amTargetsSubDir := filepath.Join("rules_am_targets") testutil.Ok(t, os.MkdirAll(filepath.Join(s.SharedDir(), amTargetsSubDir), os.ModePerm)) - queryTargetsSubDir := filepath.Join("rw_rules_query_targets") + queryTargetsSubDir := filepath.Join("rules_query_targets") testutil.Ok(t, os.MkdirAll(filepath.Join(s.SharedDir(), queryTargetsSubDir), os.ModePerm)) - receiver, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1) testutil.Ok(t, err) - testutil.Ok(t, s.StartAndWaitReady(receiver)) - - querier, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{receiver.GRPCNetworkEndpoint()}, []string{receiver.GRPCNetworkEndpoint()}, nil, nil, nil, nil, "", "") + am2, err := e2ethanos.NewAlertmanager(s.SharedDir(), "2") testutil.Ok(t, err) - testutil.Ok(t, s.StartAndWaitReady(querier)) + testutil.Ok(t, s.StartAndWaitReady(am2)) - ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Minute) - t.Cleanup(cancel) - - // check that querier can talk to the receiver - t.Run("can query from receiver", func(t *testing.T) { - testAbsentAlert := "absent(nonexistent{job='thanos-receive'})" - queryAndAssertSeries(t, ctx, querier.HTTPEndpoint(), testAbsentAlert, promclient.QueryOptions{ - Deduplicate: false, - }, []model.Metric{ - { - "job": "thanos-receive", - }, - }) - }) - am, err := e2ethanos.NewAlertmanager(s.SharedDir(), "1") + //todo: replace am2 with actual receiver + rwURL, err := url.Parse(e2ethanos.RemoteWriteEndpoint(am2.NetworkHTTPEndpoint())) testutil.Ok(t, err) - testutil.Ok(t, s.StartAndWaitReady(am)) - rwURL := e2ethanos.RemoteWriteEndpoint(receiver.NetworkEndpoint(8081)) - fmt.Println(rwURL) - testutil.Ok(t, err) - - fmt.Println("AlertManager URL: ", am.HTTPPort()) r, err := e2ethanos.NewRuler(s.SharedDir(), "1", rulesSubDir, []alert.AlertmanagerConfig{ { EndpointsConfig: http_util.EndpointsConfig{ @@ -508,7 +487,7 @@ annotations: }, }, StaticAddresses: []string{ - am.NetworkHTTPEndpoint(), + am2.NetworkHTTPEndpoint(), }, Scheme: "http", }, @@ -529,9 +508,16 @@ annotations: Scheme: "http", }, }, - }, true, remotewrite.Config{}) + }, true, remotewrite.Config{ + Name: "ruler-rw-receivers", + RemoteStore: &config.RemoteWriteConfig{ + URL: &commoncfg.URL{URL: rwURL}, + Name: "thanos-receiver", + }, + }) testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(r)) + time.Sleep(5 * time.Minute) } From a5e3a786165b87c2af5f5bb861e87588d91e591c Mon Sep 17 00:00:00 2001 From: Michael Okoko Date: Fri, 14 May 2021 14:35:35 +0100 Subject: [PATCH 07/18] Setup e2e test for stateless ruler Signed-off-by: Michael Okoko --- test/e2e/rule_test.go | 71 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 61 insertions(+), 10 deletions(-) diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index a5d8750c0f..51c5cb5528 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -440,6 +440,8 @@ func TestRule(t *testing.T) { } func TestRule_CanRemoteWriteData(t *testing.T) { + 
t.Parallel() + testAlertRuleRecordAbsentMetric := ` groups: - name: example_record_rules @@ -453,7 +455,7 @@ groups: testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, s)) - _, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) t.Cleanup(cancel) // Prepare work dirs. @@ -466,16 +468,19 @@ groups: queryTargetsSubDir := filepath.Join("rules_query_targets") testutil.Ok(t, os.MkdirAll(filepath.Join(s.SharedDir(), queryTargetsSubDir), os.ModePerm)) - testutil.Ok(t, err) - am2, err := e2ethanos.NewAlertmanager(s.SharedDir(), "2") - testutil.Ok(t, err) - testutil.Ok(t, s.StartAndWaitReady(am2)) - - //todo: replace am2 with actual receiver - rwURL, err := url.Parse(e2ethanos.RemoteWriteEndpoint(am2.NetworkHTTPEndpoint())) + am, err := e2ethanos.NewAlertmanager(s.SharedDir(), "1") testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(am)) + receiver, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(receiver)) + rwURL, err := url.Parse(e2ethanos.RemoteWriteEndpoint(receiver.NetworkEndpoint(8081))) + testutil.Ok(t, err) + querier, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{receiver.GRPCNetworkEndpoint()}).Build() + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(querier)) r, err := e2ethanos.NewRuler(s.SharedDir(), "1", rulesSubDir, []alert.AlertmanagerConfig{ { EndpointsConfig: http_util.EndpointsConfig{ @@ -487,7 +492,7 @@ groups: }, }, StaticAddresses: []string{ - am2.NetworkHTTPEndpoint(), + am.NetworkHTTPEndpoint(), }, Scheme: "http", }, @@ -518,7 +523,53 @@ groups: testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(r)) - time.Sleep(5 * time.Minute) + writeTargets(t, filepath.Join(s.SharedDir(), queryTargetsSubDir, "targets.yaml"), querier.NetworkHTTPEndpoint()) + writeTargets(t, filepath.Join(s.SharedDir(), amTargetsSubDir, "targets.yaml"), am.NetworkHTTPEndpoint()) + + t.Run("inject samples into receiver to reset its StoreAPI MinTime", func(t *testing.T) { + // inject data into receiver to reset its minTime (so it doesn't get filtered out by store) + // the sample is injected through a prometheus instance that remote_writes samples into the receiver node + prom, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom", 0, e2ethanos.RemoteWriteEndpoint(receiver.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(prom)) + + queryAndAssertSeries(t, ctx, querier.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{ + Deduplicate: false, + }, []model.Metric{ + { + "job":"myself", + "prometheus": "prom", + "receive": "1", + "replica": "0", + "tenant_id": "default-tenant", + }, + }) + }) + + t.Run("query can contact from receiver", func(t *testing.T) { + testAbsentQuery := "absent(nonexistent{job='thanos-receive'})" + queryAndAssertSeries(t, ctx, querier.HTTPEndpoint(), testAbsentQuery, promclient.QueryOptions{ + Deduplicate: false, + }, []model.Metric{ + { + "job": "thanos-receive", + }, + }) + }) + + t.Run("can fetch remote-written samples from receiver", func(t *testing.T) { + testRecordedSamples := "test_absent_metric" + queryAndAssertSeries(t, ctx, querier.HTTPEndpoint(), testRecordedSamples, promclient.QueryOptions{ + Deduplicate: false, + }, []model.Metric{ + { + "__name__": "test_absent_metric", + "job":"thanos-receive", + "receive": "1", + "tenant_id": "default-tenant", + }, + }) 
+ }) } // Test Ruler behavior on different storepb.PartialResponseStrategy when having partial response from single `failingStoreAPI`. From f05fabd2afc1743b731ad2d1cd56191830c2a91d Mon Sep 17 00:00:00 2001 From: Michael Okoko Date: Sat, 22 May 2021 13:34:45 +0100 Subject: [PATCH 08/18] Add copied code commentary to remotewrite packages Signed-off-by: Michael Okoko --- pkg/rules/remotewrite/remotewrite.go | 8 +++----- pkg/rules/remotewrite/series.go | 4 ++-- pkg/rules/remotewrite/util.go | 2 ++ pkg/rules/remotewrite/wal.go | 2 ++ pkg/rules/remotewrite/wal_test.go | 2 ++ 5 files changed, 11 insertions(+), 7 deletions(-) diff --git a/pkg/rules/remotewrite/remotewrite.go b/pkg/rules/remotewrite/remotewrite.go index dd27cb139d..7a9dc841e4 100644 --- a/pkg/rules/remotewrite/remotewrite.go +++ b/pkg/rules/remotewrite/remotewrite.go @@ -31,13 +31,11 @@ func NewFanoutStorage(logger log.Logger, reg prometheus.Registerer, walDir strin return nil, err } remoteStore := remote.NewStorage(logger, reg, walStore.StartTime, walStore.Directory(), 1*time.Minute, nil) - err = remoteStore.ApplyConfig(&config.Config{ + if err := remoteStore.ApplyConfig(&config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: []*config.RemoteWriteConfig{rwConfig.RemoteStore}, - }) - if err != nil { + }); err != nil { return nil, fmt.Errorf("failed applying config to remote storage: %w", err) } - fanoutStorage := storage.NewFanout(logger, walStore, remoteStore) - return fanoutStorage, nil + return storage.NewFanout(logger, walStore, remoteStore), nil } \ No newline at end of file diff --git a/pkg/rules/remotewrite/series.go b/pkg/rules/remotewrite/series.go index f75d415ebb..d82a31e1a2 100644 --- a/pkg/rules/remotewrite/series.go +++ b/pkg/rules/remotewrite/series.go @@ -1,3 +1,5 @@ +// This is copied from https://github.com/grafana/agent/blob/a23bd5cf27c2ac99695b7449d38fb12444941a1c/pkg/prom/wal/series.go +// TODO(idoqo): Migrate to prometheus package when https://github.com/prometheus/prometheus/pull/8785 is ready. package remotewrite import ( @@ -70,8 +72,6 @@ func (m seriesHashmap) del(hash uint64, ref uint64) { for _, s := range m[hash] { if s.ref != ref { rem = append(rem, s) - } else { - //intern.ReleaseLabels(intern.Global, s.lset) } } if len(rem) == 0 { diff --git a/pkg/rules/remotewrite/util.go b/pkg/rules/remotewrite/util.go index 3a1e593e0f..b6ddb6b804 100644 --- a/pkg/rules/remotewrite/util.go +++ b/pkg/rules/remotewrite/util.go @@ -1,3 +1,5 @@ +// This is copied from https://github.com/grafana/agent/blob/a23bd5cf27c2ac99695b7449d38fb12444941a1c/pkg/prom/wal/util.go +// TODO(idoqo): Migrate to prometheus package when https://github.com/prometheus/prometheus/pull/8785 is ready. package remotewrite import ( diff --git a/pkg/rules/remotewrite/wal.go b/pkg/rules/remotewrite/wal.go index 40ac340960..6aad91b32f 100644 --- a/pkg/rules/remotewrite/wal.go +++ b/pkg/rules/remotewrite/wal.go @@ -1,3 +1,5 @@ +// This is copied from https://github.com/grafana/agent/blob/a23bd5cf27c2ac99695b7449d38fb12444941a1c/pkg/prom/wal/wal.go +// TODO(idoqo): Migrate to prometheus package when https://github.com/prometheus/prometheus/pull/8785 is ready. 
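+//
+// Note: the rules manager still needs a Queryable from this storage (e.g. to
+// restore alert state at startup), but samples only ever leave the WAL via
+// remote_write, so the stub querier at the bottom of this file deliberately
+// serves no data.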
package remotewrite import ( diff --git a/pkg/rules/remotewrite/wal_test.go b/pkg/rules/remotewrite/wal_test.go index 84787544eb..d1f9eb684a 100644 --- a/pkg/rules/remotewrite/wal_test.go +++ b/pkg/rules/remotewrite/wal_test.go @@ -1,3 +1,5 @@ +// This is copied from https://github.com/grafana/agent/blob/a23bd5cf27c2ac99695b7449d38fb12444941a1c/pkg/prom/wal/wal_test.go +// TODO(idoqo): Migrate to prometheus package when https://github.com/prometheus/prometheus/pull/8785 is ready. package remotewrite import ( From 7b87e527c51ad8c2d1c6b914d1b1c83502ac2856 Mon Sep 17 00:00:00 2001 From: Michael Okoko Date: Mon, 24 May 2021 16:06:00 +0100 Subject: [PATCH 09/18] Use static addresses for am and querier Signed-off-by: Michael Okoko --- pkg/rules/remotewrite/remotewrite.go | 6 +-- test/e2e/e2ethanos/services.go | 3 +- test/e2e/rule_test.go | 66 ++++++++++++---------------- 3 files changed, 33 insertions(+), 42 deletions(-) diff --git a/pkg/rules/remotewrite/remotewrite.go b/pkg/rules/remotewrite/remotewrite.go index 7a9dc841e4..8e258fcff3 100644 --- a/pkg/rules/remotewrite/remotewrite.go +++ b/pkg/rules/remotewrite/remotewrite.go @@ -13,8 +13,8 @@ import ( ) type Config struct { - Name string `yaml:"name"` - RemoteStore *config.RemoteWriteConfig `yaml:"remote_write,omitempty"` + Name string `yaml:"name"` + RemoteStore *config.RemoteWriteConfig `yaml:"remote_write,omitempty"` } func LoadRemoteWriteConfig(configYAML []byte) (Config, error) { @@ -38,4 +38,4 @@ func NewFanoutStorage(logger log.Logger, reg prometheus.Registerer, walDir strin return nil, fmt.Errorf("failed applying config to remote storage: %w", err) } return storage.NewFanout(logger, walStore, remoteStore), nil -} \ No newline at end of file +} diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index b9a0099206..c926e8a847 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -6,7 +6,6 @@ package e2ethanos import ( "encoding/json" "fmt" - "github.com/thanos-io/thanos/pkg/rules/remotewrite" "io/ioutil" "os" "path/filepath" @@ -14,6 +13,8 @@ import ( "strings" "time" + "github.com/thanos-io/thanos/pkg/rules/remotewrite" + "github.com/cortexproject/cortex/integration/e2e" "github.com/grafana/dskit/backoff" "github.com/pkg/errors" diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index 51c5cb5528..a21e042005 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -8,17 +8,19 @@ import ( "context" "encoding/json" "fmt" - commoncfg "github.com/prometheus/common/config" - "github.com/prometheus/prometheus/config" - "github.com/thanos-io/thanos/pkg/rules/remotewrite" "io/ioutil" "net/http" "net/url" + "net/http/httptest" "os" "path/filepath" "testing" "time" + commoncfg "github.com/prometheus/common/config" + "github.com/prometheus/prometheus/config" + "github.com/thanos-io/thanos/pkg/rules/remotewrite" + "github.com/cortexproject/cortex/integration/e2e" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/discovery/targetgroup" @@ -439,10 +441,13 @@ func TestRule(t *testing.T) { } } +// TestRule_CanRemoteWriteData checks that Thanos Ruler can be run in stateless mode +// where it remote_writes rule evaluations to a Prometheus remote-write endpoint (typically +// a Thanos Receiver). 
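+//
+// The flow exercised here, roughly: the ruler evaluates a recording rule,
+// appends the result to its WAL-backed storage, remote_write ships it to the
+// receiver, and the querier then reads it back through the receiver's
+// StoreAPI.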
func TestRule_CanRemoteWriteData(t *testing.T) { t.Parallel() - testAlertRuleRecordAbsentMetric := ` + testRuleRecordAbsentMetric := ` groups: - name: example_record_rules interval: 100ms @@ -458,16 +463,10 @@ groups: ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) t.Cleanup(cancel) - // Prepare work dirs. - rulesSubDir := filepath.Join("rules") + rulesSubDir := "rules" rulesPath := filepath.Join(s.SharedDir(), rulesSubDir) testutil.Ok(t, os.MkdirAll(rulesPath, os.ModePerm)) - createRuleFile(t, filepath.Join(rulesPath, fmt.Sprintf("rules-0.yaml")), testAlertRuleRecordAbsentMetric) - amTargetsSubDir := filepath.Join("rules_am_targets") - testutil.Ok(t, os.MkdirAll(filepath.Join(s.SharedDir(), amTargetsSubDir), os.ModePerm)) - queryTargetsSubDir := filepath.Join("rules_query_targets") - testutil.Ok(t, os.MkdirAll(filepath.Join(s.SharedDir(), queryTargetsSubDir), os.ModePerm)) - + createRuleFile(t, filepath.Join(rulesPath, "rules-0.yaml"), testRuleRecordAbsentMetric) am, err := e2ethanos.NewAlertmanager(s.SharedDir(), "1") testutil.Ok(t, err) @@ -484,13 +483,6 @@ groups: r, err := e2ethanos.NewRuler(s.SharedDir(), "1", rulesSubDir, []alert.AlertmanagerConfig{ { EndpointsConfig: http_util.EndpointsConfig{ - FileSDConfigs: []http_util.FileSDConfig{ - { - // FileSD which will be used to register discover dynamically am1. - Files: []string{filepath.Join(e2e.ContainerSharedDir, amTargetsSubDir, "*.yaml")}, - RefreshInterval: model.Duration(time.Second), - }, - }, StaticAddresses: []string{ am.NetworkHTTPEndpoint(), }, @@ -502,30 +494,22 @@ groups: }, []query.Config{ { EndpointsConfig: http_util.EndpointsConfig{ - // We test Statically Addressed queries in other tests. Focus on FileSD here. - FileSDConfigs: []http_util.FileSDConfig{ - { - // FileSD which will be used to register discover dynamically q. 
-					Files: []string{filepath.Join(e2e.ContainerSharedDir, queryTargetsSubDir, "*.yaml")},
-					RefreshInterval: model.Duration(time.Second),
-				},
+				StaticAddresses: []string{
+					querier.NetworkHTTPEndpoint(),
 				},
 				Scheme: "http",
 			},
 		},
-	}, true, remotewrite.Config{
+	}, true, remotewrite.Config{
 		Name: "ruler-rw-receivers",
 		RemoteStore: &config.RemoteWriteConfig{
-			URL: &commoncfg.URL{URL: rwURL},
+			URL:  &commoncfg.URL{URL: rwURL},
 			Name: "thanos-receiver",
 		},
 	})
 	testutil.Ok(t, err)
 	testutil.Ok(t, s.StartAndWaitReady(r))
 
-	writeTargets(t, filepath.Join(s.SharedDir(), queryTargetsSubDir, "targets.yaml"), querier.NetworkHTTPEndpoint())
-	writeTargets(t, filepath.Join(s.SharedDir(), amTargetsSubDir, "targets.yaml"), am.NetworkHTTPEndpoint())
-
 	t.Run("inject samples into receiver to reset its StoreAPI MinTime", func(t *testing.T) {
 		// inject data into receiver to reset its minTime (so it doesn't get filtered out by store)
 		// the sample is injected through a prometheus instance that remote_writes samples into the receiver node
@@ -537,11 +521,11 @@ groups:
 			Deduplicate: false,
 		}, []model.Metric{
 			{
-				"job":"myself",
+				"job":        "myself",
 				"prometheus": "prom",
-				"receive": "1",
-				"replica": "0",
-				"tenant_id": "default-tenant",
+				"receive":    "1",
+				"replica":    "0",
+				"tenant_id":  "default-tenant",
 			},
 		})
 	})
@@ -563,15 +547,21 @@ groups:
 			Deduplicate: false,
 		}, []model.Metric{
 			{
-				"__name__": "test_absent_metric",
-				"job":"thanos-receive",
-				"receive": "1",
+				"__name__":  "test_absent_metric",
+				"job":       "thanos-receive",
+				"receive":   "1",
 				"tenant_id": "default-tenant",
 			},
 		})
 	})
 }
 
+// TestRule_CanPersistWALData checks that in stateless mode, Thanos Ruler can persist rule evaluations
+// which couldn't be sent to the remote write endpoint (e.g. because the receiver isn't available).
+func TestRule_CanPersistWALData(t *testing.T) {
+	// TODO: Implement test with an unavailable remote-write endpoint (receiver).
+}
+
 // Test Ruler behavior on different storepb.PartialResponseStrategy when having partial response from single `failingStoreAPI`.
 func TestRulePartialResponse(t *testing.T) {
 	t.Skip("TODO: Allow HTTP ports from binaries running on host to be accessible.")

From 2729902dc09608b20c22e639108fa236271d8b49 Mon Sep 17 00:00:00 2001
From: Michael Okoko
Date: Tue, 1 Jun 2021 02:10:05 +0100
Subject: [PATCH 10/18] Remove need for separate remote-write flag for stateless ruler

This removes the need to pass a separate `remote-write` flag to ruler to enable stateless mode. Instead, we now check whether a remote-write config is provided and automatically enable stateless mode based on that.

The ruler test is also cleaned up to remove unnecessary tests (i.e. those already covered by other e2e suites).
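For illustration, where a stateless ruler previously needed both flags, e.g.

    thanos rule ... --remote-write --remote-write.config-file data/rule-remote-write.yaml

the remote-write config flag alone is now enough to enable stateless mode:

    thanos rule ... --remote-write.config-file data/rule-remote-write.yaml

(The config path shown is just the one scripts/quickstart.sh uses; any path
works, and `--remote-write.config` with inline YAML behaves the same way.)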
Signed-off-by: Michael Okoko --- cmd/thanos/config.go | 7 ++---- cmd/thanos/rule.go | 10 ++++----- test/e2e/e2ethanos/services.go | 11 ++++++---- test/e2e/rule_test.go | 40 ++++------------------------------ test/e2e/rules_api_test.go | 4 ++-- 5 files changed, 19 insertions(+), 53 deletions(-) diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index 15fc0eed4f..5760984b3d 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -226,13 +226,10 @@ func (ac *alertMgrConfig) registerFlag(cmd extflag.FlagClause) *alertMgrConfig { } type ruleRWConfig struct { - remoteWrite bool - configPath *extflag.PathOrContent + configPath *extflag.PathOrContent } func (rc *ruleRWConfig) registerFlag(cmd extflag.FlagClause) *ruleRWConfig { - cmd.Flag("remote-write", "If true, directs ruler to remote-write evaluated samples to the server configured by 'remote-write.config'."). - BoolVar(&rc.remoteWrite) - rc.configPath = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write server where samples should be sent to. See https://thanos.io/tip/components/rule.md/#query-api", false) + rc.configPath = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write server where samples should be sent to. This automatically enables stateless mode for ruler and no series will be stored in the ruler's TSDB. See https://thanos.io/tip/components/rule.md/#query-api", false) return rc } diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 9eaed4da44..9a95290a1e 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -171,8 +171,8 @@ func registerRule(app *extkingpin.App) { return errors.New("--query/--query.sd-files and --query.config* parameters cannot be defined at the same time") } - // Parse and check remote-write config if it's enabled - if conf.rwConfig.remoteWrite { + // Parse and check remote-write config and enable stateless mode for ruler. 
+ if conf.rwConfig.configPath != nil { conf.rwConfigYAML, err = conf.rwConfig.configPath.Content() if err != nil { return err @@ -339,10 +339,8 @@ func runRule( queryable storage.Queryable db *tsdb.DB ) - if err != nil { - return errors.Wrap(err, "open TSDB") - } - if conf.rwConfig.remoteWrite { + + if conf.rwConfig.configPath != nil { conf.rwConfigYAML, err = conf.rwConfig.configPath.Content() if err != nil { return err diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index c926e8a847..38a68736c4 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -463,10 +463,14 @@ func NewRoutingAndIngestingReceiverWithConfigWatcher(sharedDir, networkName, nam } func NewTSDBRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config) (*Service, error) { - return NewRuler(sharedDir, name, ruleSubDir, amCfg, queryCfg, false, remotewrite.Config{}) + return newRuler(sharedDir, name, ruleSubDir, amCfg, queryCfg, nil) } -func NewRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config, remoteWrite bool, remoteWriteCfg remotewrite.Config) (*Service, error) { +func NewStatelessRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config, remoteWriteCfg *remotewrite.Config) (*Service, error) { + return newRuler(sharedDir, name, ruleSubDir, amCfg, queryCfg, remoteWriteCfg) +} + +func newRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config, remoteWriteCfg *remotewrite.Config) (*Service, error) { dir := filepath.Join(sharedDir, "data", "rule", name) container := filepath.Join(e2e.ContainerSharedDir, "data", "rule", name) if err := os.MkdirAll(dir, 0750); err != nil { @@ -501,12 +505,11 @@ func NewRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.Al "--query.sd-dns-interval": "1s", "--resend-delay": "5s", } - if remoteWrite { + if remoteWriteCfg != nil { rwCfgBytes, err := yaml.Marshal(remoteWriteCfg) if err != nil { return nil, errors.Wrapf(err, "generate remote write config: %v", remoteWriteCfg) } - ruleArgs["--remote-write"] = "" ruleArgs["--remote-write.config"] = string(rwCfgBytes) } diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index a21e042005..e1bad0c9f9 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -11,13 +11,12 @@ import ( "io/ioutil" "net/http" "net/url" - "net/http/httptest" "os" "path/filepath" "testing" "time" - commoncfg "github.com/prometheus/common/config" + common_cfg "github.com/prometheus/common/config" "github.com/prometheus/prometheus/config" "github.com/thanos-io/thanos/pkg/rules/remotewrite" @@ -480,7 +479,7 @@ groups: querier, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{receiver.GRPCNetworkEndpoint()}).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(querier)) - r, err := e2ethanos.NewRuler(s.SharedDir(), "1", rulesSubDir, []alert.AlertmanagerConfig{ + r, err := e2ethanos.NewStatelessRuler(s.SharedDir(), "1", rulesSubDir, []alert.AlertmanagerConfig{ { EndpointsConfig: http_util.EndpointsConfig{ StaticAddresses: []string{ @@ -500,47 +499,16 @@ groups: Scheme: "http", }, }, - }, true, remotewrite.Config{ + }, &remotewrite.Config{ Name: "ruler-rw-receivers", RemoteStore: &config.RemoteWriteConfig{ - URL: &commoncfg.URL{URL: rwURL}, + URL: &common_cfg.URL{URL: rwURL}, Name: "thanos-receiver", }, }) testutil.Ok(t, err) 
testutil.Ok(t, s.StartAndWaitReady(r)) - t.Run("inject samples into receiver to reset its StoreAPI MinTime", func(t *testing.T) { - // inject data into receiver to reset its minTime (so it doesn't get filtered out by store) - // the sample is injected through a prometheus instance that remote_writes samples into the receiver node - prom, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom", 0, e2ethanos.RemoteWriteEndpoint(receiver.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) - testutil.Ok(t, err) - testutil.Ok(t, s.StartAndWaitReady(prom)) - - queryAndAssertSeries(t, ctx, querier.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{ - Deduplicate: false, - }, []model.Metric{ - { - "job": "myself", - "prometheus": "prom", - "receive": "1", - "replica": "0", - "tenant_id": "default-tenant", - }, - }) - }) - - t.Run("query can contact from receiver", func(t *testing.T) { - testAbsentQuery := "absent(nonexistent{job='thanos-receive'})" - queryAndAssertSeries(t, ctx, querier.HTTPEndpoint(), testAbsentQuery, promclient.QueryOptions{ - Deduplicate: false, - }, []model.Metric{ - { - "job": "thanos-receive", - }, - }) - }) - t.Run("can fetch remote-written samples from receiver", func(t *testing.T) { testRecordedSamples := "test_absent_metric" queryAndAssertSeries(t, ctx, querier.HTTPEndpoint(), testRecordedSamples, promclient.QueryOptions{ diff --git a/test/e2e/rules_api_test.go b/test/e2e/rules_api_test.go index 10f111fa68..2a31b2d3a9 100644 --- a/test/e2e/rules_api_test.go +++ b/test/e2e/rules_api_test.go @@ -88,9 +88,9 @@ func TestRulesAPI_Fanout(t *testing.T) { } // Recreate rulers with the corresponding query config. - r1, err = e2ethanos.NewRuler(s.SharedDir(), "rule1", thanosRulesSubDir, nil, queryCfg) + r1, err = e2ethanos.NewTSDBRuler(s.SharedDir(), "rule1", thanosRulesSubDir, nil, queryCfg) testutil.Ok(t, err) - r2, err = e2ethanos.NewRuler(s.SharedDir(), "rule2", thanosRulesSubDir, nil, queryCfg) + r2, err = e2ethanos.NewTSDBRuler(s.SharedDir(), "rule2", thanosRulesSubDir, nil, queryCfg) testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(r1, r2)) From 9e033d09e689da98f13bce220aaae1ff9660f260 Mon Sep 17 00:00:00 2001 From: Michael Okoko Date: Mon, 7 Jun 2021 12:21:10 +0100 Subject: [PATCH 11/18] Generate docs for stateless ruler flags and fix tests Signed-off-by: Michael Okoko --- cmd/thanos/config.go | 9 --------- cmd/thanos/rule.go | 30 ++++++++++------------------ docs/components/rule.md | 19 +++++++++++++++++- pkg/rules/remotewrite/remotewrite.go | 9 +++++++-- pkg/rules/remotewrite/util.go | 5 +++++ scripts/quickstart.sh | 3 +-- test/e2e/rule_test.go | 12 +++++------ 7 files changed, 47 insertions(+), 40 deletions(-) diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index 5760984b3d..3770389a68 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -224,12 +224,3 @@ func (ac *alertMgrConfig) registerFlag(cmd extflag.FlagClause) *alertMgrConfig { ac.alertRelabelConfigPath = extflag.RegisterPathOrContent(cmd, "alert.relabel-config", "YAML file that contains alert relabelling configuration.", extflag.WithEnvSubstitution()) return ac } - -type ruleRWConfig struct { - configPath *extflag.PathOrContent -} - -func (rc *ruleRWConfig) registerFlag(cmd extflag.FlagClause) *ruleRWConfig { - rc.configPath = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write server where samples should be sent to. 
This automatically enables stateless mode for ruler and no series will be stored in the ruler's TSDB. See https://thanos.io/tip/components/rule.md/#query-api", false) - return rc -} diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 9a95290a1e..ac47b6a617 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -78,8 +78,7 @@ type ruleConfig struct { alertQueryURL *url.URL alertRelabelConfigYAML []byte - rwConfig ruleRWConfig - rwConfigYAML []byte + rwConfig *extflag.PathOrContent resendDelay time.Duration evalInterval time.Duration @@ -96,7 +95,6 @@ func (rc *ruleConfig) registerFlag(cmd extkingpin.FlagClause) { rc.shipper.registerFlag(cmd) rc.query.registerFlag(cmd) rc.alertmgr.registerFlag(cmd) - rc.rwConfig.registerFlag(cmd) } // registerRule registers a rule command. @@ -124,6 +122,8 @@ func registerRule(app *extkingpin.App) { cmd.Flag("eval-interval", "The default evaluation interval to use."). Default("30s").DurationVar(&conf.evalInterval) + conf.rwConfig = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write server where samples should be sent to. This automatically enables stateless mode for ruler and no series will be stored in the ruler's TSDB. If an empty config (or file) is provided, the flag is ignored and ruler is run with its own TSDB.", extflag.WithEnvSubstitution()) + reqLogDecision := cmd.Flag("log.request.decision", "Deprecation Warning - This flag would be soon deprecated, and replaced with `request.logging-config`. Request Logging for logging the start and end of requests. By default this flag is disabled. LogFinishCall: Logs the finish call of the requests. LogStartAndFinishCall: Logs the start and finish call of the requests. NoLogCall: Disable request logging.").Default("").Enum("NoLogCall", "LogFinishCall", "LogStartAndFinishCall", "") conf.objStoreConfig = extkingpin.RegisterCommonObjStoreFlags(cmd, "", false) @@ -171,14 +171,6 @@ func registerRule(app *extkingpin.App) { return errors.New("--query/--query.sd-files and --query.config* parameters cannot be defined at the same time") } - // Parse and check remote-write config and enable stateless mode for ruler. - if conf.rwConfig.configPath != nil { - conf.rwConfigYAML, err = conf.rwConfig.configPath.Content() - if err != nil { - return err - } - } - // Parse and check alerting configuration. conf.alertmgrsConfigYAML, err = conf.alertmgr.configPath.Content() if err != nil { @@ -340,16 +332,14 @@ func runRule( db *tsdb.DB ) - if conf.rwConfig.configPath != nil { - conf.rwConfigYAML, err = conf.rwConfig.configPath.Content() - if err != nil { - return err - } + rwCfgYAML, err := conf.rwConfig.Content() + if err != nil { + return err + } + + if len(rwCfgYAML) > 0 { var rwCfg remotewrite.Config - if len(conf.rwConfigYAML) == 0 { - return errors.New("no --remote-write.config was given") - } - rwCfg, err = remotewrite.LoadRemoteWriteConfig(conf.rwConfigYAML) + rwCfg, err = remotewrite.LoadRemoteWriteConfig(rwCfgYAML) if err != nil { return err } diff --git a/docs/components/rule.md b/docs/components/rule.md index 8e9da7773e..0b051a5d05 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -344,7 +344,24 @@ Flags: (repeatable). --query.sd-interval=5m Refresh interval to re-read file SD files. (used as a fallback) - --request.logging-config= + --remote-write.config= + Alternative to 'remote-write.config-file' flag + (mutually exclusive). Content of YAML config + for the remote-write server where samples + should be sent to. 
This automatically enables + stateless mode for ruler and no series will be + stored in the ruler's TSDB. If an empty config + (or file) is provided, the flag is ignored and + ruler is run with its own TSDB. + --remote-write.config-file= + Path to YAML config for the remote-write server + where samples should be sent to. This + automatically enables stateless mode for ruler + and no series will be stored in the ruler's + TSDB. If an empty config (or file) is provided, + the flag is ignored and ruler is run with its + own TSDB. + --request.logging-config= Alternative to 'request.logging-config-file' flag (mutually exclusive). Content of YAML file with request logging configuration. See format diff --git a/pkg/rules/remotewrite/remotewrite.go b/pkg/rules/remotewrite/remotewrite.go index 8e258fcff3..94e540f347 100644 --- a/pkg/rules/remotewrite/remotewrite.go +++ b/pkg/rules/remotewrite/remotewrite.go @@ -1,9 +1,10 @@ package remotewrite import ( - "fmt" "time" + "github.com/pkg/errors" + "github.com/go-kit/kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/config" @@ -12,11 +13,13 @@ import ( "gopkg.in/yaml.v2" ) +// Config represents a remote write configuration for Thanos stateless ruler. type Config struct { Name string `yaml:"name"` RemoteStore *config.RemoteWriteConfig `yaml:"remote_write,omitempty"` } +// LoadRemoteWriteConfig prepares a Config instance from a given YAML config. func LoadRemoteWriteConfig(configYAML []byte) (Config, error) { var cfg Config if err := yaml.Unmarshal(configYAML, &cfg); err != nil { @@ -25,6 +28,8 @@ func LoadRemoteWriteConfig(configYAML []byte) (Config, error) { return cfg, nil } +// NewFanoutStorage creates a storage that fans-out to both the WAL and a configured remote storage. +// The remote storage tails the WAL and sends the metrics it reads using Prometheus' remote_write. func NewFanoutStorage(logger log.Logger, reg prometheus.Registerer, walDir string, rwConfig Config) (storage.Storage, error) { walStore, err := NewStorage(logger, reg, walDir) if err != nil { @@ -35,7 +40,7 @@ func NewFanoutStorage(logger log.Logger, reg prometheus.Registerer, walDir strin GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: []*config.RemoteWriteConfig{rwConfig.RemoteStore}, }); err != nil { - return nil, fmt.Errorf("failed applying config to remote storage: %w", err) + return nil, errors.Wrap(err, "applying config to remote storage") } return storage.NewFanout(logger, walStore, remoteStore), nil } diff --git a/pkg/rules/remotewrite/util.go b/pkg/rules/remotewrite/util.go index b6ddb6b804..e9b6a573d0 100644 --- a/pkg/rules/remotewrite/util.go +++ b/pkg/rules/remotewrite/util.go @@ -104,6 +104,11 @@ func (c *walDataCollector) Append(samples []record.RefSample) bool { return true } +func (c *walDataCollector) AppendExemplars([]record.RefExemplar) bool { + // dummy implementation to make walDataCollector conform to the WriteTo interface + return true +} + func (c *walDataCollector) StoreSeries(series []record.RefSeries, _ int) { c.mut.Lock() defer c.mut.Unlock() diff --git a/scripts/quickstart.sh b/scripts/quickstart.sh index fb4395e9d2..779be02a71 100755 --- a/scripts/quickstart.sh +++ b/scripts/quickstart.sh @@ -254,8 +254,7 @@ if [ -n "${STATELESS_RULER_ENABLED}" ]; then name: "receive-0" EOF - REMOTE_WRITE_FLAGS="--remote-write --remote-write.config-file data/rule-remote-write.yaml - " + REMOTE_WRITE_FLAGS="--remote-write.config-file data/rule-remote-write.yaml" fi # Start Thanos Ruler. 
diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index e1bad0c9f9..8d3700c23e 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -10,7 +10,6 @@ import ( "fmt" "io/ioutil" "net/http" - "net/url" "os" "path/filepath" "testing" @@ -102,6 +101,7 @@ groups: annotations: summary: "I always complain and I have been loaded via sighup signal." ` + amTimeout = model.Duration(10 * time.Second) ) type rulesResp struct { @@ -240,7 +240,7 @@ func TestRule(t *testing.T) { }, Scheme: "http", }, - Timeout: model.Duration(10 * time.Second), + Timeout: amTimeout, APIVersion: alert.APIv1, }, }, []query.Config{ @@ -449,7 +449,7 @@ func TestRule_CanRemoteWriteData(t *testing.T) { testRuleRecordAbsentMetric := ` groups: - name: example_record_rules - interval: 100ms + interval: 1s rules: - record: test_absent_metric expr: absent(nonexistent{job='thanos-receive'}) @@ -474,8 +474,8 @@ groups: receiver, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1) testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(receiver)) - rwURL, err := url.Parse(e2ethanos.RemoteWriteEndpoint(receiver.NetworkEndpoint(8081))) - testutil.Ok(t, err) + rwURL := mustURLParse(t, e2ethanos.RemoteWriteEndpoint(receiver.NetworkEndpoint(8081))) + querier, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{receiver.GRPCNetworkEndpoint()}).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(querier)) @@ -487,7 +487,7 @@ groups: }, Scheme: "http", }, - Timeout: model.Duration(time.Second), + Timeout: amTimeout, APIVersion: alert.APIv1, }, }, []query.Config{ From 64f4c2616770025800b0edf64a8237485c18c67c Mon Sep 17 00:00:00 2001 From: Michael Okoko Date: Thu, 10 Jun 2021 14:07:53 +0100 Subject: [PATCH 12/18] Use promauto for prometheus primitives Signed-off-by: Michael Okoko --- pkg/rules/remotewrite/wal.go | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/pkg/rules/remotewrite/wal.go b/pkg/rules/remotewrite/wal.go index 6aad91b32f..25b9143567 100644 --- a/pkg/rules/remotewrite/wal.go +++ b/pkg/rules/remotewrite/wal.go @@ -1,5 +1,6 @@ // This is copied from https://github.com/grafana/agent/blob/a23bd5cf27c2ac99695b7449d38fb12444941a1c/pkg/prom/wal/wal.go // TODO(idoqo): Migrate to prometheus package when https://github.com/prometheus/prometheus/pull/8785 is ready. 
+ package remotewrite import ( @@ -13,6 +14,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" @@ -39,41 +41,31 @@ type storageMetrics struct { func newStorageMetrics(r prometheus.Registerer) *storageMetrics { m := storageMetrics{r: r} - m.numActiveSeries = prometheus.NewGauge(prometheus.GaugeOpts{ + m.numActiveSeries = promauto.With(r).NewGauge(prometheus.GaugeOpts{ Name: "agent_wal_storage_active_series", Help: "Current number of active series being tracked by the WAL storage", }) - m.numDeletedSeries = prometheus.NewGauge(prometheus.GaugeOpts{ + m.numDeletedSeries = promauto.With(r).NewGauge(prometheus.GaugeOpts{ Name: "agent_wal_storage_deleted_series", Help: "Current number of series marked for deletion from memory", }) - m.totalCreatedSeries = prometheus.NewCounter(prometheus.CounterOpts{ + m.totalCreatedSeries = promauto.With(r).NewCounter(prometheus.CounterOpts{ Name: "agent_wal_storage_created_series_total", Help: "Total number of created series appended to the WAL", }) - m.totalRemovedSeries = prometheus.NewCounter(prometheus.CounterOpts{ + m.totalRemovedSeries = promauto.With(r).NewCounter(prometheus.CounterOpts{ Name: "agent_wal_storage_removed_series_total", Help: "Total number of created series removed from the WAL", }) - m.totalAppendedSamples = prometheus.NewCounter(prometheus.CounterOpts{ + m.totalAppendedSamples = promauto.With(r).NewCounter(prometheus.CounterOpts{ Name: "agent_wal_samples_appended_total", Help: "Total number of samples appended to the WAL", }) - if r != nil { - r.MustRegister( - m.numActiveSeries, - m.numDeletedSeries, - m.totalCreatedSeries, - m.totalRemovedSeries, - m.totalAppendedSamples, - ) - } - return &m } From 0290a3d816168c1168bc584eed0a829b125dedd2 Mon Sep 17 00:00:00 2001 From: Michael Okoko Date: Thu, 10 Jun 2021 14:58:45 +0100 Subject: [PATCH 13/18] Group imports and satisfy go-lint Signed-off-by: Michael Okoko --- cmd/thanos/rule.go | 5 ++--- pkg/rules/remotewrite/wal_test.go | 2 +- test/e2e/e2ethanos/services.go | 3 +-- test/e2e/rule_test.go | 7 +++---- 4 files changed, 7 insertions(+), 10 deletions(-) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index ac47b6a617..cb696db083 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -15,9 +15,6 @@ import ( "strings" "time" - "github.com/prometheus/prometheus/storage" - "github.com/thanos-io/thanos/pkg/rules/remotewrite" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" @@ -33,6 +30,7 @@ import ( "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/rules" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/util/strutil" "github.com/thanos-io/thanos/pkg/errutil" @@ -53,6 +51,7 @@ import ( "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/query" thanosrules "github.com/thanos-io/thanos/pkg/rules" + "github.com/thanos-io/thanos/pkg/rules/remotewrite" "github.com/thanos-io/thanos/pkg/runutil" grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" httpserver "github.com/thanos-io/thanos/pkg/server/http" diff --git a/pkg/rules/remotewrite/wal_test.go 
b/pkg/rules/remotewrite/wal_test.go index d1f9eb684a..555b71bb28 100644 --- a/pkg/rules/remotewrite/wal_test.go +++ b/pkg/rules/remotewrite/wal_test.go @@ -348,7 +348,7 @@ func (s seriesList) SeriesNames() []string { return names } -// ExpectedSamples returns the list of expected samples, sorted by ref ID and timestamp +// ExpectedSamples returns the list of expected samples, sorted by ref ID and timestamp. func (s seriesList) ExpectedSamples() []record.RefSample { expect := []record.RefSample{} for _, series := range s { diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 38a68736c4..7e17a5bdd8 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -13,8 +13,6 @@ import ( "strings" "time" - "github.com/thanos-io/thanos/pkg/rules/remotewrite" - "github.com/cortexproject/cortex/integration/e2e" "github.com/grafana/dskit/backoff" "github.com/pkg/errors" @@ -28,6 +26,7 @@ import ( "github.com/thanos-io/thanos/pkg/query" "github.com/thanos-io/thanos/pkg/queryfrontend" "github.com/thanos-io/thanos/pkg/receive" + "github.com/thanos-io/thanos/pkg/rules/remotewrite" ) const infoLogLevel = "info" diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index 8d3700c23e..413cd8d2e1 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -15,12 +15,10 @@ import ( "testing" "time" - common_cfg "github.com/prometheus/common/config" - "github.com/prometheus/prometheus/config" - "github.com/thanos-io/thanos/pkg/rules/remotewrite" - "github.com/cortexproject/cortex/integration/e2e" + common_cfg "github.com/prometheus/common/config" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" "gopkg.in/yaml.v2" @@ -28,6 +26,7 @@ import ( http_util "github.com/thanos-io/thanos/pkg/http" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/query" + "github.com/thanos-io/thanos/pkg/rules/remotewrite" "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/test/e2e/e2ethanos" From 77ee3379ee3418f5727646c3e3bee681598e31af Mon Sep 17 00:00:00 2001 From: Michael Okoko Date: Tue, 6 Jul 2021 01:32:59 +0100 Subject: [PATCH 14/18] Always return empty series set from WAL storage Signed-off-by: Michael Okoko --- pkg/rules/remotewrite/wal.go | 2 +- test/e2e/rule_test.go | 26 ++++++++++++++------------ 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/pkg/rules/remotewrite/wal.go b/pkg/rules/remotewrite/wal.go index 25b9143567..eaf08f99eb 100644 --- a/pkg/rules/remotewrite/wal.go +++ b/pkg/rules/remotewrite/wal.go @@ -690,5 +690,5 @@ func (r *remoteWriteQueryable) Close() error { } func (r *remoteWriteQueryable) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { - return nil + return storage.EmptySeriesSet() } diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index 413cd8d2e1..8e60285a81 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -99,6 +99,14 @@ groups: severity: page annotations: summary: "I always complain and I have been loaded via sighup signal." 
+` + testRuleRecordAbsentMetric = ` +groups: +- name: example_record_rules + interval: 1s + rules: + - record: test_absent_metric + expr: absent(nonexistent{job='thanos-receive'}) ` amTimeout = model.Duration(10 * time.Second) ) @@ -445,15 +453,6 @@ func TestRule(t *testing.T) { func TestRule_CanRemoteWriteData(t *testing.T) { t.Parallel() - testRuleRecordAbsentMetric := ` -groups: -- name: example_record_rules - interval: 1s - rules: - - record: test_absent_metric - expr: absent(nonexistent{job='thanos-receive'}) -` - s, err := e2e.NewScenario("e2e_test_rule_remote_write") testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, s)) @@ -464,13 +463,16 @@ groups: rulesSubDir := "rules" rulesPath := filepath.Join(s.SharedDir(), rulesSubDir) testutil.Ok(t, os.MkdirAll(rulesPath, os.ModePerm)) - createRuleFile(t, filepath.Join(rulesPath, "rules-0.yaml"), testRuleRecordAbsentMetric) + + for i, rule := range []string{testRuleRecordAbsentMetric, testAlertRuleWarnOnPartialResponse} { + createRuleFile(t, filepath.Join(rulesPath, fmt.Sprintf("rules-%d.yaml", i)), rule) + } am, err := e2ethanos.NewAlertmanager(s.SharedDir(), "1") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(am)) - receiver, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1) + receiver, err := e2ethanos.NewIngestingReceiver(s.SharedDir(), s.NetworkName()) testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(receiver)) rwURL := mustURLParse(t, e2ethanos.RemoteWriteEndpoint(receiver.NetworkEndpoint(8081))) @@ -516,7 +518,7 @@ groups: { "__name__": "test_absent_metric", "job": "thanos-receive", - "receive": "1", + "receive": "e2e_test_rule_remote_write", "tenant_id": "default-tenant", }, }) From 933a875449963e8a5e3c7c0e6855c74b3f900cd3 Mon Sep 17 00:00:00 2001 From: Michael Okoko Date: Wed, 1 Sep 2021 23:29:32 +0100 Subject: [PATCH 15/18] re-generate rule documentation Signed-off-by: Michael Okoko --- docs/components/rule.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/components/rule.md b/docs/components/rule.md index 0b051a5d05..6518674766 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -344,7 +344,7 @@ Flags: (repeatable). --query.sd-interval=5m Refresh interval to re-read file SD files. (used as a fallback) - --remote-write.config= + --remote-write.config= Alternative to 'remote-write.config-file' flag (mutually exclusive). Content of YAML config for the remote-write server where samples @@ -353,7 +353,7 @@ Flags: stored in the ruler's TSDB. If an empty config (or file) is provided, the flag is ignored and ruler is run with its own TSDB. - --remote-write.config-file= + --remote-write.config-file= Path to YAML config for the remote-write server where samples should be sent to. This automatically enables stateless mode for ruler @@ -361,7 +361,7 @@ Flags: TSDB. If an empty config (or file) is provided, the flag is ignored and ruler is run with its own TSDB. - --request.logging-config= + --request.logging-config= Alternative to 'request.logging-config-file' flag (mutually exclusive). Content of YAML file with request logging configuration. 
See format From 41d34088e0c681d0721630664a21e24bbfdc5105 Mon Sep 17 00:00:00 2001 From: Michael Okoko Date: Wed, 1 Sep 2021 23:41:34 +0100 Subject: [PATCH 16/18] copyright headers to satisfy golint Signed-off-by: Michael Okoko --- pkg/rules/remotewrite/remotewrite.go | 3 +++ pkg/rules/remotewrite/series.go | 3 +++ pkg/rules/remotewrite/util.go | 3 +++ pkg/rules/remotewrite/wal.go | 3 +++ pkg/rules/remotewrite/wal_test.go | 3 +++ 5 files changed, 15 insertions(+) diff --git a/pkg/rules/remotewrite/remotewrite.go b/pkg/rules/remotewrite/remotewrite.go index 94e540f347..13cf6378a4 100644 --- a/pkg/rules/remotewrite/remotewrite.go +++ b/pkg/rules/remotewrite/remotewrite.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package remotewrite import ( diff --git a/pkg/rules/remotewrite/series.go b/pkg/rules/remotewrite/series.go index d82a31e1a2..02063d8bfc 100644 --- a/pkg/rules/remotewrite/series.go +++ b/pkg/rules/remotewrite/series.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + // This is copied from https://github.com/grafana/agent/blob/a23bd5cf27c2ac99695b7449d38fb12444941a1c/pkg/prom/wal/series.go // TODO(idoqo): Migrate to prometheus package when https://github.com/prometheus/prometheus/pull/8785 is ready. package remotewrite diff --git a/pkg/rules/remotewrite/util.go b/pkg/rules/remotewrite/util.go index e9b6a573d0..bb3b2ec7c6 100644 --- a/pkg/rules/remotewrite/util.go +++ b/pkg/rules/remotewrite/util.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + // This is copied from https://github.com/grafana/agent/blob/a23bd5cf27c2ac99695b7449d38fb12444941a1c/pkg/prom/wal/util.go // TODO(idoqo): Migrate to prometheus package when https://github.com/prometheus/prometheus/pull/8785 is ready. package remotewrite diff --git a/pkg/rules/remotewrite/wal.go b/pkg/rules/remotewrite/wal.go index eaf08f99eb..bcd7fb409e 100644 --- a/pkg/rules/remotewrite/wal.go +++ b/pkg/rules/remotewrite/wal.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + // This is copied from https://github.com/grafana/agent/blob/a23bd5cf27c2ac99695b7449d38fb12444941a1c/pkg/prom/wal/wal.go // TODO(idoqo): Migrate to prometheus package when https://github.com/prometheus/prometheus/pull/8785 is ready. diff --git a/pkg/rules/remotewrite/wal_test.go b/pkg/rules/remotewrite/wal_test.go index 555b71bb28..1843284081 100644 --- a/pkg/rules/remotewrite/wal_test.go +++ b/pkg/rules/remotewrite/wal_test.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + // This is copied from https://github.com/grafana/agent/blob/a23bd5cf27c2ac99695b7449d38fb12444941a1c/pkg/prom/wal/wal_test.go // TODO(idoqo): Migrate to prometheus package when https://github.com/prometheus/prometheus/pull/8785 is ready. 
package remotewrite From 79b06d31aa0eaeb778ec6f72e9093e8534dd1fbd Mon Sep 17 00:00:00 2001 From: Michael Okoko Date: Fri, 17 Sep 2021 04:19:37 +0100 Subject: [PATCH 17/18] Rename wal storage metrics Signed-off-by: Michael Okoko --- go.mod | 1 + pkg/rules/remotewrite/wal.go | 12 ++++++------ test/e2e/rule_test.go | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 9dc5c9f698..7878f29bcc 100644 --- a/go.mod +++ b/go.mod @@ -61,6 +61,7 @@ require ( github.com/prometheus/common v0.29.0 github.com/prometheus/exporter-toolkit v0.6.0 github.com/prometheus/prometheus v1.8.2-0.20210720123808-b1ed4a0a663d + github.com/stretchr/testify v1.7.0 github.com/tencentyun/cos-go-sdk-v5 v0.7.31 github.com/uber/jaeger-client-go v2.29.1+incompatible github.com/uber/jaeger-lib v2.4.1+incompatible diff --git a/pkg/rules/remotewrite/wal.go b/pkg/rules/remotewrite/wal.go index bcd7fb409e..bf2c0f033a 100644 --- a/pkg/rules/remotewrite/wal.go +++ b/pkg/rules/remotewrite/wal.go @@ -45,27 +45,27 @@ type storageMetrics struct { func newStorageMetrics(r prometheus.Registerer) *storageMetrics { m := storageMetrics{r: r} m.numActiveSeries = promauto.With(r).NewGauge(prometheus.GaugeOpts{ - Name: "agent_wal_storage_active_series", + Name: "thanos_wal_storage_active_series", Help: "Current number of active series being tracked by the WAL storage", }) m.numDeletedSeries = promauto.With(r).NewGauge(prometheus.GaugeOpts{ - Name: "agent_wal_storage_deleted_series", + Name: "thanos_wal_storage_deleted_series", Help: "Current number of series marked for deletion from memory", }) m.totalCreatedSeries = promauto.With(r).NewCounter(prometheus.CounterOpts{ - Name: "agent_wal_storage_created_series_total", + Name: "thanos_wal_storage_created_series_total", Help: "Total number of created series appended to the WAL", }) m.totalRemovedSeries = promauto.With(r).NewCounter(prometheus.CounterOpts{ - Name: "agent_wal_storage_removed_series_total", + Name: "thanos_wal_storage_removed_series_total", Help: "Total number of created series removed from the WAL", }) m.totalAppendedSamples = promauto.With(r).NewCounter(prometheus.CounterOpts{ - Name: "agent_wal_samples_appended_total", + Name: "thanos_wal_samples_appended_total", Help: "Total number of samples appended to the WAL", }) @@ -684,7 +684,7 @@ func (r *remoteWriteQueryable) LabelValues(name string, matchers ...*labels.Matc return nil, nil, nil } -func (r *remoteWriteQueryable) LabelNames() ([]string, storage.Warnings, error) { +func (r *remoteWriteQueryable) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { return nil, nil, nil } diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index 8e60285a81..0a074f4dc9 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -477,7 +477,7 @@ func TestRule_CanRemoteWriteData(t *testing.T) { testutil.Ok(t, s.StartAndWaitReady(receiver)) rwURL := mustURLParse(t, e2ethanos.RemoteWriteEndpoint(receiver.NetworkEndpoint(8081))) - querier, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{receiver.GRPCNetworkEndpoint()}).Build() + querier, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", receiver.GRPCNetworkEndpoint()).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(querier)) r, err := e2ethanos.NewStatelessRuler(s.SharedDir(), "1", rulesSubDir, []alert.AlertmanagerConfig{ From 6b0612ca48ea76ca4c20e655a64ea0faa7c9a25d Mon Sep 17 00:00:00 2001 From: Michael Okoko Date: Fri, 17 Sep 2021 12:05:41 +0100 Subject: [PATCH 18/18] Use Prometheus' 
remote write config instead of rolling another Signed-off-by: Michael Okoko --- cmd/thanos/rule.go | 12 +++++++----- docs/components/rule.md | 25 ++++++++++++++----------- pkg/rules/remotewrite/remotewrite.go | 16 +++++----------- test/e2e/e2ethanos/services.go | 6 +++--- test/e2e/rule_test.go | 10 +++------- 5 files changed, 32 insertions(+), 37 deletions(-) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index cb696db083..5e867c91e5 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -26,6 +26,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/common/route" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/promql" @@ -121,7 +122,7 @@ func registerRule(app *extkingpin.App) { cmd.Flag("eval-interval", "The default evaluation interval to use."). Default("30s").DurationVar(&conf.evalInterval) - conf.rwConfig = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write server where samples should be sent to. This automatically enables stateless mode for ruler and no series will be stored in the ruler's TSDB. If an empty config (or file) is provided, the flag is ignored and ruler is run with its own TSDB.", extflag.WithEnvSubstitution()) + conf.rwConfig = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write server where samples should be sent to (see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). This automatically enables stateless mode for ruler and no series will be stored in the ruler's TSDB. If an empty config (or file) is provided, the flag is ignored and ruler is run with its own TSDB.", extflag.WithEnvSubstitution()) reqLogDecision := cmd.Flag("log.request.decision", "Deprecation Warning - This flag would be soon deprecated, and replaced with `request.logging-config`. Request Logging for logging the start and end of requests. By default this flag is disabled. LogFinishCall: Logs the finish call of the requests. LogStartAndFinishCall: Logs the start and finish call of the requests. NoLogCall: Disable request logging.").Default("").Enum("NoLogCall", "LogFinishCall", "LogStartAndFinishCall", "") @@ -337,20 +338,20 @@ func runRule( } if len(rwCfgYAML) > 0 { - var rwCfg remotewrite.Config + var rwCfg config.RemoteWriteConfig rwCfg, err = remotewrite.LoadRemoteWriteConfig(rwCfgYAML) if err != nil { return err } walDir := filepath.Join(conf.dataDir, rwCfg.Name) - remoteStore, err := remotewrite.NewFanoutStorage(logger, reg, walDir, rwCfg) + remoteStore, err := remotewrite.NewFanoutStorage(logger, reg, walDir, &rwCfg) if err != nil { return errors.Wrap(err, "set up remote-write store for ruler") } appendable = remoteStore queryable = remoteStore } else { - db, err := tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil) + db, err = tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil) if err != nil { return errors.Wrap(err, "open TSDB") } @@ -555,7 +556,7 @@ func runRule( ) // Start gRPC server. 
-	{
+	if db != nil {
 		tsdbStore := store.NewTSDBStore(logger, db, component.Rule, conf.lset)
 
 		tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpc.tlsSrvCert, conf.grpc.tlsSrvKey, conf.grpc.tlsSrvClientCA)
@@ -580,6 +581,7 @@ func runRule(
 			s.Shutdown(err)
 		})
 	}
+
 	// Start UI & metrics HTTP server.
 	{
 		router := route.New()
diff --git a/docs/components/rule.md b/docs/components/rule.md
index 6518674766..b13470a22f 100644
--- a/docs/components/rule.md
+++ b/docs/components/rule.md
@@ -348,19 +348,22 @@ Flags:
                                  Alternative to 'remote-write.config-file'
                                  flag (mutually exclusive). Content of YAML
                                  config for the remote-write server where samples
-                                 should be sent to. This automatically enables
-                                 stateless mode for ruler and no series will be
-                                 stored in the ruler's TSDB. If an empty config
-                                 (or file) is provided, the flag is ignored and
-                                 ruler is run with its own TSDB.
+                                 should be sent to (see
+                                 https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
+                                 This automatically enables stateless mode for
+                                 ruler and no series will be stored in the
+                                 ruler's TSDB. If an empty config (or file) is
+                                 provided, the flag is ignored and ruler is run
+                                 with its own TSDB.
       --remote-write.config-file=<file-path>
                                  Path to YAML config for the remote-write server
-                                 where samples should be sent to. This
-                                 automatically enables stateless mode for ruler
-                                 and no series will be stored in the ruler's
-                                 TSDB. If an empty config (or file) is provided,
-                                 the flag is ignored and ruler is run with its
-                                 own TSDB.
+                                 where samples should be sent to (see
+                                 https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
+                                 This automatically enables stateless mode for
+                                 ruler and no series will be stored in the
+                                 ruler's TSDB. If an empty config (or file) is
+                                 provided, the flag is ignored and ruler is run
+                                 with its own TSDB.
       --request.logging-config=<content>
                                  Alternative to 'request.logging-config-file'
                                  flag (mutually exclusive). Content of YAML file
diff --git a/pkg/rules/remotewrite/remotewrite.go b/pkg/rules/remotewrite/remotewrite.go
index 13cf6378a4..9f1f5f9f46 100644
--- a/pkg/rules/remotewrite/remotewrite.go
+++ b/pkg/rules/remotewrite/remotewrite.go
@@ -16,15 +16,9 @@ import (
 	"gopkg.in/yaml.v2"
 )
 
-// Config represents a remote write configuration for Thanos stateless ruler.
-type Config struct {
-	Name        string                    `yaml:"name"`
-	RemoteStore *config.RemoteWriteConfig `yaml:"remote_write,omitempty"`
-}
-
-// LoadRemoteWriteConfig prepares a Config instance from a given YAML config.
-func LoadRemoteWriteConfig(configYAML []byte) (Config, error) {
-	var cfg Config
+// LoadRemoteWriteConfig prepares a RemoteWriteConfig instance from a given YAML config.
+func LoadRemoteWriteConfig(configYAML []byte) (config.RemoteWriteConfig, error) {
+	var cfg config.RemoteWriteConfig
 	if err := yaml.Unmarshal(configYAML, &cfg); err != nil {
 		return cfg, err
 	}
@@ -33,7 +27,7 @@ func LoadRemoteWriteConfig(configYAML []byte) (Config, error) {
 // NewFanoutStorage creates a storage that fans-out to both the WAL and a configured remote storage.
 // The remote storage tails the WAL and sends the metrics it reads using Prometheus' remote_write.
-func NewFanoutStorage(logger log.Logger, reg prometheus.Registerer, walDir string, rwConfig Config) (storage.Storage, error) { +func NewFanoutStorage(logger log.Logger, reg prometheus.Registerer, walDir string, rwConfig *config.RemoteWriteConfig) (storage.Storage, error) { walStore, err := NewStorage(logger, reg, walDir) if err != nil { return nil, err @@ -41,7 +35,7 @@ func NewFanoutStorage(logger log.Logger, reg prometheus.Registerer, walDir strin remoteStore := remote.NewStorage(logger, reg, walStore.StartTime, walStore.Directory(), 1*time.Minute, nil) if err := remoteStore.ApplyConfig(&config.Config{ GlobalConfig: config.DefaultGlobalConfig, - RemoteWriteConfigs: []*config.RemoteWriteConfig{rwConfig.RemoteStore}, + RemoteWriteConfigs: []*config.RemoteWriteConfig{rwConfig}, }); err != nil { return nil, errors.Wrap(err, "applying config to remote storage") } diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 7e17a5bdd8..d582042aeb 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/dskit/backoff" "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/pkg/relabel" "gopkg.in/yaml.v2" @@ -26,7 +27,6 @@ import ( "github.com/thanos-io/thanos/pkg/query" "github.com/thanos-io/thanos/pkg/queryfrontend" "github.com/thanos-io/thanos/pkg/receive" - "github.com/thanos-io/thanos/pkg/rules/remotewrite" ) const infoLogLevel = "info" @@ -465,11 +465,11 @@ func NewTSDBRuler(sharedDir string, name string, ruleSubDir string, amCfg []aler return newRuler(sharedDir, name, ruleSubDir, amCfg, queryCfg, nil) } -func NewStatelessRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config, remoteWriteCfg *remotewrite.Config) (*Service, error) { +func NewStatelessRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config, remoteWriteCfg *config.RemoteWriteConfig) (*Service, error) { return newRuler(sharedDir, name, ruleSubDir, amCfg, queryCfg, remoteWriteCfg) } -func newRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config, remoteWriteCfg *remotewrite.Config) (*Service, error) { +func newRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config, remoteWriteCfg *config.RemoteWriteConfig) (*Service, error) { dir := filepath.Join(sharedDir, "data", "rule", name) container := filepath.Join(e2e.ContainerSharedDir, "data", "rule", name) if err := os.MkdirAll(dir, 0750); err != nil { diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index 0a074f4dc9..0a8755b9cd 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -26,7 +26,6 @@ import ( http_util "github.com/thanos-io/thanos/pkg/http" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/query" - "github.com/thanos-io/thanos/pkg/rules/remotewrite" "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/test/e2e/e2ethanos" @@ -500,12 +499,9 @@ func TestRule_CanRemoteWriteData(t *testing.T) { Scheme: "http", }, }, - }, &remotewrite.Config{ - Name: "ruler-rw-receivers", - RemoteStore: &config.RemoteWriteConfig{ - URL: &common_cfg.URL{URL: rwURL}, - Name: "thanos-receiver", - }, + }, 
&config.RemoteWriteConfig{ + URL: &common_cfg.URL{URL: rwURL}, + Name: "thanos-receiver", }) testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(r))
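
Usage notes for this series (illustrative sketches, not part of the patches above):

With [PATCH 18/18] applied, --remote-write.config takes a single plain
Prometheus remote_write entry rather than the earlier wrapper struct. A
minimal config might look like the following; the URL is a placeholder, and
the name field doubles as the WAL directory name under --data-dir
(walDir := filepath.Join(conf.dataDir, rwCfg.Name) in runRule):

    name: receiver-rw
    url: "http://thanos-receive.example.com:19291/api/v1/receive"
    remote_timeout: 30s

Wired up in Go, the stateless path condenses to the sketch below. It mirrors
runRule in cmd/thanos/rule.go but is not a verbatim excerpt: the package name
is invented, error handling and shutdown are elided, and the go-kit log
import path is assumed to match the one used elsewhere in this series:

    package rulerutil

    import (
    	"path/filepath"

    	"github.com/go-kit/kit/log"
    	"github.com/prometheus/client_golang/prometheus"
    	"github.com/prometheus/prometheus/storage"

    	"github.com/thanos-io/thanos/pkg/rules/remotewrite"
    )

    // buildStatelessStorage parses the raw --remote-write.config content and
    // builds the fan-out storage the stateless ruler appends to and queries.
    func buildStatelessStorage(logger log.Logger, reg prometheus.Registerer, dataDir string, rwCfgYAML []byte) (storage.Storage, error) {
    	// One Prometheus remote_write entry, unmarshalled as-is.
    	rwCfg, err := remotewrite.LoadRemoteWriteConfig(rwCfgYAML)
    	if err != nil {
    		return nil, err
    	}

    	// Each named config gets its own WAL directory.
    	walDir := filepath.Join(dataDir, rwCfg.Name)

    	// Samples appended here land in the WAL; the remote-write queue
    	// tails the WAL and ships them to the configured endpoint.
    	return remotewrite.NewFanoutStorage(logger, reg, walDir, &rwCfg)
    }

The returned storage.Storage serves as both the rule manager's Appendable and
its Queryable (appendable = remoteStore; queryable = remoteStore in runRule),
which is what makes the ruler stateless: nothing is served from a local TSDB,
and the gRPC store API is only started when db != nil.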