Skip to content

Commit

Permalink
echo upgrade + otel support
Browse files Browse the repository at this point in the history
  • Loading branch information
mcorbin committed Jan 9, 2025
1 parent 9cb0e77 commit c3866e0
Show file tree
Hide file tree
Showing 887 changed files with 152,352 additions and 12,905 deletions.
32 changes: 31 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"fmt"
"log"
"os"
Expand All @@ -10,9 +11,13 @@ import (
"gopkg.in/yaml.v2"

"github.com/appclacks/cabourotte/daemon"

"github.com/pkg/errors"
"github.com/urfave/cli/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -56,6 +61,31 @@ func Main() {
if err != nil {
return errors.Wrapf(err, "Fail to start the logger")
}
ctx := context.Background()
exp, err := otlptracehttp.New(ctx)
if err != nil {
return err
}

r := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName("cabourotte"),
)

shutdownFn := func() {}

if os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") != "" || os.Getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") != "" {
logger.Info("starting opentelemetry traces export")
tracerProvider := trace.NewTracerProvider(trace.WithBatcher(exp), trace.WithResource(r))
otel.SetTracerProvider(tracerProvider)
shutdownFn = func() {
err := tracerProvider.Shutdown(context.Background())
if err != nil {
panic(err)
}
}
}
defer shutdownFn()
// nolint
defer logger.Sync()
daemonComponent, err := daemon.New(logger, &config)
Expand Down
1 change: 1 addition & 0 deletions daemon/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Component struct {
// New creates and start a new daemon component
func New(logger *zap.Logger, config *Configuration) (*Component, error) {
logger.Info("Starting the Cabourotte daemon")

prom, err := prometheus.New()
if err != nil {
return nil, err
Expand Down
37 changes: 29 additions & 8 deletions discovery/http/root.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
package http

import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/http/httptrace"
"time"

"github.com/appclacks/cabourotte/healthcheck"
"github.com/appclacks/cabourotte/tls"
"github.com/pkg/errors"
prom "github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
"gopkg.in/tomb.v2"

"github.com/appclacks/cabourotte/healthcheck"
"github.com/appclacks/cabourotte/tls"
)

// HTTPDiscovery the http discovery struct
Expand Down Expand Up @@ -57,8 +63,13 @@ func New(logger *zap.Logger, config *Configuration, checkComponent *healthcheck.
Config: config,
URL: url,
Client: &http.Client{
Transport: transport,
Timeout: time.Second * 5,
Transport: otelhttp.NewTransport(
transport,
otelhttp.WithClientTrace(func(ctx context.Context) *httptrace.ClientTrace {
return otelhttptrace.NewClientTrace(ctx)
}),
),
Timeout: time.Second * 5,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
Expand All @@ -67,8 +78,8 @@ func New(logger *zap.Logger, config *Configuration, checkComponent *healthcheck.
return &component, nil
}

func (c *HTTPDiscovery) request() error {
req, err := http.NewRequest("GET", c.URL, nil)
func (c *HTTPDiscovery) request(ctx context.Context) error {
req, err := http.NewRequestWithContext(ctx, "GET", c.URL, nil)
if err != nil {
return errors.Wrapf(err, "HTTP discovery: fail to create request for %s", c.URL)
}
Expand Down Expand Up @@ -120,16 +131,26 @@ func (c *HTTPDiscovery) Start() error {
for {
select {
case <-c.tick.C:
tracer := otel.Tracer("discovery")
ctx, span := tracer.Start(context.Background(), "discovery")
span.SetAttributes(attribute.String("cabourotte.discovery.name", c.Config.Name))
span.SetAttributes(attribute.String("cabourotte.discovery.type", "http"))
c.Logger.Debug(fmt.Sprintf("HTTP discovery: polling %s", c.URL))
start := time.Now()
status := "success"
err := c.request()
err := c.request(ctx)
duration := time.Since(start)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "discovery failure")
status = "failure"
msg := fmt.Sprintf("HTTP discovery error: %s", err.Error())
c.Logger.Error(msg)
} else {
span.SetStatus(codes.Ok, "discovery successful")
}
span.SetAttributes(attribute.String("cabourotte.discovery.status", status))
span.End()
c.requestHistogram.With(prom.Labels{"name": c.Config.Name}).Observe(duration.Seconds())
c.responseCounter.With(prom.Labels{"status": status, "name": c.Config.Name}).Inc()
case <-c.t.Dying():
Expand Down
5 changes: 3 additions & 2 deletions discovery/http/root_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package http

import (
"context"
"encoding/json"
"net"
"net/http"
Expand Down Expand Up @@ -124,7 +125,7 @@ func TestRequest(t *testing.T) {
if err != nil {
t.Fatalf("Fail to create the HTTP discovery component :\n%v", err)
}
err = discovery.request()
err = discovery.request(context.Background())
if err != nil {
t.Fatalf("HTTP discovery request failed\n%v", err)
}
Expand All @@ -135,7 +136,7 @@ func TestRequest(t *testing.T) {
if checks[0].Base().Name != "foo" {
t.Fatalf("Invalid healthcheck name %s", checks[0].Base().Name)
}
err = discovery.request()
err = discovery.request(context.Background())
if err != nil {
t.Fatalf("HTTP discovery request failed\n%v", err)
}
Expand Down
17 changes: 13 additions & 4 deletions exporter/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ package exporter

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httptrace"
"time"

"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/appclacks/cabourotte/healthcheck"
"github.com/appclacks/cabourotte/tls"
"go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

// HTTPConfiguration The configuration for the HTTP exporter.
Expand Down Expand Up @@ -86,8 +90,13 @@ func NewHTTPExporter(logger *zap.Logger, config *HTTPConfiguration) (*HTTPExport
Config: config,
URL: url,
Client: &http.Client{
Transport: transport,
Timeout: time.Second * 3,
Transport: otelhttp.NewTransport(
transport,
otelhttp.WithClientTrace(func(ctx context.Context) *httptrace.ClientTrace {
return otelhttptrace.NewClientTrace(ctx)
}),
),
Timeout: time.Second * 3,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
Expand Down Expand Up @@ -134,14 +143,14 @@ func (c *HTTPExporter) GetConfig() interface{} {
}

// Push pushes events to the HTTP destination
func (c *HTTPExporter) Push(result *healthcheck.Result) error {
func (c *HTTPExporter) Push(ctx context.Context, result *healthcheck.Result) error {
var jsonBytes []byte
payload := []*healthcheck.Result{result}
jsonBytes, err := json.Marshal(payload)
if err != nil {
return errors.Wrapf(err, "Fail to convert result to json:\n%v", result)
}
req, err := http.NewRequest("POST", c.URL, bytes.NewBuffer(jsonBytes))
req, err := http.NewRequestWithContext(ctx, "POST", c.URL, bytes.NewBuffer(jsonBytes))
if err != nil {
return errors.Wrapf(err, "HTTP exporter: fail to create request for %s", c.URL)
}
Expand Down
3 changes: 2 additions & 1 deletion exporter/http_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package exporter

import (
"context"
"net/http"
"net/http/httptest"
"strconv"
Expand Down Expand Up @@ -39,7 +40,7 @@ func TestHTTPExporter(t *testing.T) {
if err != nil {
t.Fatalf("Fail to start the http exporter:\n%v", err)
}
err = exporter.Push(&healthcheck.Result{
err = exporter.Push(context.Background(), &healthcheck.Result{
Name: "foo",
Success: true,
HealthcheckTimestamp: time.Now().Unix(),
Expand Down
3 changes: 2 additions & 1 deletion exporter/riemann.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package exporter

import (
"context"
"fmt"
"net"
"time"
Expand Down Expand Up @@ -145,7 +146,7 @@ func (c *RiemannExporter) IsStarted() bool {
}

// Push pushes events to the desination
func (c *RiemannExporter) Push(result *healthcheck.Result) error {
func (c *RiemannExporter) Push(_ context.Context, result *healthcheck.Result) error {
state := "ok"
if !result.Success {
state = "critical"
Expand Down
27 changes: 21 additions & 6 deletions exporter/root.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package exporter

import (
"context"
"fmt"
"sync"
"time"

"github.com/appclacks/cabourotte/healthcheck"
"github.com/appclacks/cabourotte/memorystore"
"github.com/appclacks/cabourotte/prometheus"
"github.com/pkg/errors"
prom "github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
"gopkg.in/tomb.v2"

"github.com/appclacks/cabourotte/healthcheck"
"github.com/appclacks/cabourotte/memorystore"
"github.com/appclacks/cabourotte/prometheus"
)

// Exporter the exporter interface
Expand All @@ -23,7 +26,7 @@ type Exporter interface {
IsStarted() bool
Name() string
GetConfig() interface{}
Push(*healthcheck.Result) error
Push(context.Context, *healthcheck.Result) error
}

// Component the exporter component
Expand Down Expand Up @@ -122,7 +125,9 @@ func (c *Component) Start() error {
})
go func() {
defer c.wg.Done()
tracer := otel.Tracer("exporter")
for message := range c.ChanResult {
ctx, span := tracer.Start(context.Background(), "export")
c.MemoryStore.Add(message)
if message.Success {
c.Logger.Debug("Healthcheck successful",
Expand All @@ -140,21 +145,28 @@ func (c *Component) Start() error {
}
for k := range c.Exporters {
exporter := c.Exporters[k]
ctx, exporterSpan := tracer.Start(ctx, "exporter")
exporterSpan.SetAttributes(attribute.String("cabourotte.exporter.name", exporter.Name()))
if exporter.IsStarted() {
start := time.Now()
err := exporter.Push(message)
err := exporter.Push(ctx, message)
duration := time.Since(start)
status := "success"
name := exporter.Name()
if err != nil {
c.Logger.Error(fmt.Sprintf("Failed to push healthchecks result for exporter %s: %s", name, err.Error()))
status = "failure"
exporterSpan.RecordError(err)
exporterSpan.SetStatus(codes.Error, "exporter failure")
err := exporter.Stop()
if err != nil {
// do not return error
// on purpose
exporterSpan.RecordError(err)
c.Logger.Error(fmt.Sprintf("Fail to close the exporter %s: %s", name, err.Error()))
}
} else {
span.SetStatus(codes.Ok, "successfully exported results")
}
c.exporterHistogram.With(prom.Labels{"name": name, "status": status}).Observe(duration.Seconds())
}
Expand All @@ -163,10 +175,13 @@ func (c *Component) Start() error {
if err != nil {
// do not return error
// on purpose
exporterSpan.SetStatus(codes.Error, "exporter failure")
span.RecordError(err)
c.Logger.Error(fmt.Sprintf("fail to reconnect the exporter %s: %s", exporter.Name(), err.Error()))
}
}
}
span.End()
}
c.Logger.Info("Exporter routine stopped")

Expand Down
Loading

0 comments on commit c3866e0

Please sign in to comment.