Skip to content

Commit

Permalink
add E2E test for exemplars
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <[email protected]>
  • Loading branch information
yeya24 committed May 11, 2021
1 parent 48c8c0f commit c120f0a
Show file tree
Hide file tree
Showing 11 changed files with 287 additions and 146 deletions.
2 changes: 1 addition & 1 deletion test/e2e/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func TestCompactWithStoreGateway(t *testing.T) {
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified"))

q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{str.GRPCNetworkEndpoint()}, nil, nil, nil, nil, nil, "", "")
q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{str.GRPCNetworkEndpoint()}).Build()
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(q))

Expand Down
115 changes: 96 additions & 19 deletions test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/cortexproject/cortex/integration/e2e"
Expand All @@ -36,6 +37,10 @@ var defaultBackoffConfig = util.BackoffConfig{
MaxRetries: 50,
}

const (
FeatureExemplarStorage = "exemplar-storage"
)

// TODO(bwplotka): Run against multiple?
func DefaultPrometheusImage() string {
return "quay.io/prometheus/prometheus:v2.26.0"
Expand All @@ -55,7 +60,7 @@ func DefaultImage() string {
return "thanos"
}

func NewPrometheus(sharedDir string, name string, config, promImage string) (*e2e.HTTPService, string, error) {
func NewPrometheus(sharedDir string, name string, config string, promImage string, enableFeatures ...string) (*e2e.HTTPService, string, error) {
dir := filepath.Join(sharedDir, "data", "prometheus", name)
container := filepath.Join(e2e.ContainerSharedDir, "data", "prometheus", name)
if err := os.MkdirAll(dir, 0750); err != nil {
Expand All @@ -73,6 +78,10 @@ func NewPrometheus(sharedDir string, name string, config, promImage string) (*e2
"--log.level": infoLogLevel,
"--web.listen-address": ":9090",
})

if len(enableFeatures) > 0 {
args = append(args, fmt.Sprintf("--enable-feature=%s", strings.Join(enableFeatures, ",")))
}
prom := e2e.NewHTTPService(
fmt.Sprintf("prometheus-%s", name),
promImage,
Expand All @@ -86,8 +95,8 @@ func NewPrometheus(sharedDir string, name string, config, promImage string) (*e2
return prom, container, nil
}

func NewPrometheusWithSidecar(sharedDir string, netName string, name string, config, promImage string) (*e2e.HTTPService, *Service, error) {
prom, dataDir, err := NewPrometheus(sharedDir, name, config, promImage)
func NewPrometheusWithSidecar(sharedDir string, netName string, name string, config, promImage string, enableFeatures ...string) (*e2e.HTTPService, *Service, error) {
prom, dataDir, err := NewPrometheus(sharedDir, name, config, promImage, enableFeatures...)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -115,11 +124,75 @@ func NewPrometheusWithSidecar(sharedDir string, netName string, name string, con
return prom, sidecar, nil
}

func NewQuerier(sharedDir, name string, storeAddresses, fileSDStoreAddresses, ruleAddresses, targetAddresses []string, metadataAddresses, exemplarAddresses []string, routePrefix, externalPrefix string) (*Service, error) {
type QuerierBuilder struct {
sharedDir string
name string
routePrefix string
externalPrefix string

storeAddresses []string
fileSDStoreAddresses []string
ruleAddresses []string
metadataAddresses []string
targetAddresses []string
exemplarAddresses []string

tracingConfig string
}

func NewQuerierBuilder(sharedDir, name string, storeAddresses []string) *QuerierBuilder {
return &QuerierBuilder{
sharedDir: sharedDir,
name: name,
storeAddresses: storeAddresses,
}
}

func (q *QuerierBuilder) WithFileSDStoreAddresses(fileSDStoreAddresses []string) *QuerierBuilder {
q.fileSDStoreAddresses = fileSDStoreAddresses
return q
}

func (q *QuerierBuilder) WithRuleAddresses(ruleAddresses []string) *QuerierBuilder {
q.ruleAddresses = ruleAddresses
return q
}

func (q *QuerierBuilder) WithTargetAddresses(targetAddresses []string) *QuerierBuilder {
q.targetAddresses = targetAddresses
return q
}

func (q *QuerierBuilder) WithExemplarAddresses(exemplarAddresses []string) *QuerierBuilder {
q.exemplarAddresses = exemplarAddresses
return q
}

func (q *QuerierBuilder) WithMetadataAddresses(metadataAddresses []string) *QuerierBuilder {
q.metadataAddresses = metadataAddresses
return q
}

func (q *QuerierBuilder) WithRoutePrefix(routePrefix string) *QuerierBuilder {
q.routePrefix = routePrefix
return q
}

func (q *QuerierBuilder) WithExternalPrefix(externalPrefix string) *QuerierBuilder {
q.externalPrefix = externalPrefix
return q
}

func (q *QuerierBuilder) WithTracingConfig(tracingConfig string) *QuerierBuilder {
q.tracingConfig = tracingConfig
return q
}

func (q *QuerierBuilder) Build() (*Service, error) {
const replicaLabel = "replica"

args := e2e.BuildArgs(map[string]string{
"--debug.name": fmt.Sprintf("querier-%v", name),
"--debug.name": fmt.Sprintf("querier-%v", q.name),
"--grpc-address": ":9091",
"--grpc-grace-period": "0s",
"--http-address": ":8080",
Expand All @@ -129,35 +202,35 @@ func NewQuerier(sharedDir, name string, storeAddresses, fileSDStoreAddresses, ru
"--query.max-concurrent": "1",
"--store.sd-interval": "5s",
})
for _, addr := range storeAddresses {
for _, addr := range q.storeAddresses {
args = append(args, "--store="+addr)
}

for _, addr := range ruleAddresses {
for _, addr := range q.ruleAddresses {
args = append(args, "--rule="+addr)
}

for _, addr := range targetAddresses {
for _, addr := range q.targetAddresses {
args = append(args, "--target="+addr)
}

for _, addr := range metadataAddresses {
for _, addr := range q.metadataAddresses {
args = append(args, "--metadata="+addr)
}

for _, addr := range exemplarAddresses {
for _, addr := range q.exemplarAddresses {
args = append(args, "--exemplar="+addr)
}

if len(fileSDStoreAddresses) > 0 {
queryFileSDDir := filepath.Join(sharedDir, "data", "querier", name)
container := filepath.Join(e2e.ContainerSharedDir, "data", "querier", name)
if len(q.fileSDStoreAddresses) > 0 {
queryFileSDDir := filepath.Join(q.sharedDir, "data", "querier", q.name)
container := filepath.Join(e2e.ContainerSharedDir, "data", "querier", q.name)
if err := os.MkdirAll(queryFileSDDir, 0750); err != nil {
return nil, errors.Wrap(err, "create query dir failed")
}

fileSD := []*targetgroup.Group{{}}
for _, a := range fileSDStoreAddresses {
for _, a := range q.fileSDStoreAddresses {
fileSD[0].Targets = append(fileSD[0].Targets, model.LabelSet{model.AddressLabel: model.LabelValue(a)})
}

Expand All @@ -173,16 +246,20 @@ func NewQuerier(sharedDir, name string, storeAddresses, fileSDStoreAddresses, ru
args = append(args, "--store.sd-files="+filepath.Join(container, "filesd.yaml"))
}

if routePrefix != "" {
args = append(args, "--web.route-prefix="+routePrefix)
if q.routePrefix != "" {
args = append(args, "--web.route-prefix="+q.routePrefix)
}

if externalPrefix != "" {
args = append(args, "--web.external-prefix="+externalPrefix)
if q.externalPrefix != "" {
args = append(args, "--web.external-prefix="+q.externalPrefix)
}

if q.tracingConfig != "" {
args = append(args, "--tracing.config="+q.tracingConfig)
}

querier := NewService(
fmt.Sprintf("querier-%v", name),
fmt.Sprintf("querier-%v", q.name),
DefaultImage(),
e2e.NewCommand("query", args...),
e2e.NewHTTPReadinessProbe(8080, "/-/ready", 200, 200),
Expand Down
146 changes: 132 additions & 14 deletions test/e2e/exemplars_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,22 @@ package e2e_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/cortexproject/cortex/integration/e2e"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/test/e2e/e2ethanos"
)

const (
traceIDLabel = "traceID"
)

func TestExemplarsAPI_Fanout(t *testing.T) {
t.Parallel()

Expand All @@ -29,6 +37,7 @@ func TestExemplarsAPI_Fanout(t *testing.T) {
"prom1",
defaultPromConfig("ha", 0, "", ""),
e2ethanos.DefaultPrometheusImage(),
e2ethanos.FeatureExemplarStorage,
)
testutil.Ok(t, err)
prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(
Expand All @@ -37,29 +46,138 @@ func TestExemplarsAPI_Fanout(t *testing.T) {
"prom2",
defaultPromConfig("ha", 1, "", ""),
e2ethanos.DefaultPrometheusImage(),
e2ethanos.FeatureExemplarStorage,
)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2))

q, err := e2ethanos.NewQuerier(
s.SharedDir(),
"query",
[]string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint()},
nil,
nil,
nil,
nil,
[]string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint()},
"",
"",
)
tracingCfg := fmt.Sprintf(`type: JAEGER
config:
sampler_type: const
sampler_param: 1
service_name: %s`, s.NetworkName()+"-query")

stores := []string{sidecar1.NetworkEndpointFor(s.NetworkName(), 9091), sidecar2.NetworkEndpointFor(s.NetworkName(), 9091)}
q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "query", stores).
WithExemplarAddresses(stores).
WithTracingConfig(tracingCfg).
Build()
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(q))

_, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
// Recreate Prometheus and sidecar with Thanos query scrape target.
prom1, sidecar1, err = e2ethanos.NewPrometheusWithSidecar(
s.SharedDir(),
netName,
"prom1",
defaultPromConfig("ha", 0, "", "", "localhost:9090", q.NetworkHTTPEndpoint()),
e2ethanos.DefaultPrometheusImage(),
"exemplar-storage",
)
testutil.Ok(t, err)
prom2, sidecar2, err = e2ethanos.NewPrometheusWithSidecar(
s.SharedDir(),
netName,
"prom2",
defaultPromConfig("ha", 1, "", "", "localhost:9090", q.NetworkHTTPEndpoint()),
e2ethanos.DefaultPrometheusImage(),
"exemplar-storage",
)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2))

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
t.Cleanup(cancel)

testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics))
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"thanos_query_exemplar_apis_dns_provider_results"}, e2e.WaitMissingMetrics))

now := time.Now()
start := timestamp.FromTime(now.Add(-time.Hour))
end := timestamp.FromTime(now.Add(time.Hour))

// Send HTTP requests to thanos query to trigger exemplars.
labelNames(t, ctx, q.HTTPEndpoint(), nil, start, end, func(res []string) bool {
return true
})

t.Run("Basic exemplars query", func(t *testing.T) {
requiredSeriesLabels := map[string]string{
"__name__": "http_request_duration_seconds_bucket",
"handler": "label_names",
"job": "myself",
"method": "get",
"prometheus": "ha",
}
queryExemplars(t, ctx, q.HTTPEndpoint(), `http_request_duration_seconds_bucket{handler="label_names"}`,
start, end, func(data []*exemplarspb.ExemplarData) bool {
if len(data) != 1 {
return false
}

// Compare series labels.
seriesLabels := labelpb.ZLabelSetsToPromLabelSets(data[0].SeriesLabels)
for _, lbls := range seriesLabels {
for k, v := range requiredSeriesLabels {
if lbls.Get(k) != v {
return false
}
}
}

// Make sure the exemplar contains the correct traceID label.
for _, exemplar := range data[0].Exemplars {
for _, lbls := range labelpb.ZLabelSetsToPromLabelSets(exemplar.Labels) {
if !lbls.Has(traceIDLabel) {
return false
}
}
}
return true
})
})

t.Run("Exemplars query with matched external label", func(t *testing.T) {
requiredSeriesLabels := map[string]string{
"__name__": "http_request_duration_seconds_bucket",
"handler": "label_names",
"job": "myself",
"method": "get",
"prometheus": "ha",
}
// Here replica is an external label.
queryExemplars(t, ctx, q.HTTPEndpoint(), `http_request_duration_seconds_bucket{handler="label_names", replica="0"}`,
start, end, func(data []*exemplarspb.ExemplarData) bool {
if len(data) != 1 {
return false
}

// Compare series labels.
seriesLabels := labelpb.ZLabelSetsToPromLabelSets(data[0].SeriesLabels)
for _, lbls := range seriesLabels {
for k, v := range requiredSeriesLabels {
if lbls.Get(k) != v {
return false
}
}
}

// Make sure the exemplar contains the correct traceID label.
for _, exemplar := range data[0].Exemplars {
for _, lbls := range labelpb.ZLabelSetsToPromLabelSets(exemplar.Labels) {
if !lbls.Has(traceIDLabel) {
return false
}
}
}
return true
})
})

t.Run("Exemplars query doesn't match external label", func(t *testing.T) {
// Here replica is an external label, but it doesn't match.
queryExemplars(t, ctx, q.HTTPEndpoint(), `http_request_duration_seconds_bucket{handler="label_names", replica="foo"}`,
start, end, func(data []*exemplarspb.ExemplarData) bool {
return len(data) == 0
})
})
}
Loading

0 comments on commit c120f0a

Please sign in to comment.