Skip to content

Commit

Permalink
chore(examples): quick start example of testing kafka-based apis (#3004)
Browse files Browse the repository at this point in the history
* Adding PoC of tracing with Kafka

* Updated example with producer

* Added consumer to kafka example

* Adding Kafka example

* Update README
  • Loading branch information
danielbdias authored Aug 2, 2023
1 parent 56376d3 commit 9ac8991
Show file tree
Hide file tree
Showing 22 changed files with 1,896 additions and 0 deletions.
1 change: 1 addition & 0 deletions examples/quick-start-go-and-kafka/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.DS_Store
11 changes: 11 additions & 0 deletions examples/quick-start-go-and-kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Quick Start - Go API with Kafka messaging

> [Read the detailed recipe for setting up OpenTelemetry Collector with Tractest in our documentation.](https://docs.tracetest.io/examples-tutorials/recipes/running-tracetest-without-a-trace-data-store)
This is a simple quick start on how to configure two Go lang apps that interacts with [Apache Kafka](https://kafka.apache.org/): a `producer-api` and a `consumer-api` , and how to test if the messaging is properly working with a trace-based test.

If you want to run this example, just execute `docker compose up` on this folder.

To execute a Trace-based test with Tracetest against this structure just run `tracetest run test -f test.yaml`.

Feel free to check out the [docs](https://docs.tracetest.io/), and join our [Discord Community](https://discord.gg/8MtcMrQNbX) for more info!
14 changes: 14 additions & 0 deletions examples/quick-start-go-and-kafka/consumer-worker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM golang:alpine as builder
ENV GO111MODULE=on
RUN apk update && apk add --no-cache git

WORKDIR /app
COPY go.mod ./
COPY go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./bin/main .

FROM scratch
COPY --from=builder /app/bin/main .
CMD ["./main"]
19 changes: 19 additions & 0 deletions examples/quick-start-go-and-kafka/consumer-worker/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package config

import "os"

type Config struct {
OtelExporterEndpoint string
OtelServiceName string
KafkaBrokerUrl string
KafkaTopic string
}

func Current() *Config {
return &Config{
OtelExporterEndpoint: os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"),
OtelServiceName: os.Getenv("OTEL_SERVICE_NAME"),
KafkaBrokerUrl: os.Getenv("KAFKA_BROKER_URL"),
KafkaTopic: os.Getenv("KAFKA_TOPIC"),
}
}
50 changes: 50 additions & 0 deletions examples/quick-start-go-and-kafka/consumer-worker/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
module github.com/kubeshop/tracetest/examples/quick-start-go-and-kafka/consumer-worker

go 1.20

require (
github.com/Shopify/sarama v1.38.1
go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.42.0
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.16.0
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/trace v1.16.0
go.uber.org/zap v1.24.0
google.golang.org/grpc v1.56.2
)

require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.15.14 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)
512 changes: 512 additions & 0 deletions examples/quick-start-go-and-kafka/consumer-worker/go.sum

Large diffs are not rendered by default.

69 changes: 69 additions & 0 deletions examples/quick-start-go-and-kafka/consumer-worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"

"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/kubeshop/tracetest/examples/quick-start-go-and-kafka/consumer-worker/config"
"github.com/kubeshop/tracetest/examples/quick-start-go-and-kafka/consumer-worker/streaming"
"github.com/kubeshop/tracetest/examples/quick-start-go-and-kafka/consumer-worker/telemetry"
)

func main() {
currentConfig := config.Current()

ctx := context.Background()

logger, err := zap.NewDevelopment()
if err != nil {
fmt.Printf("error creating zap logger, error: %v", err)
return
}
defer logger.Sync()

logger.Info("Setting up worker...")

logger.Info("Initializing OpenTelemetry...")
tracer, err := telemetry.Setup(ctx, currentConfig.OtelExporterEndpoint, currentConfig.OtelServiceName)
if err != nil {
logger.Error("Unable to setup OpenTelemetry", zap.Error(err))
return
}
logger.Info("OpenTelemetry initialized.")

logger.Info("Initializing Kafka reader...")
reader, err := streaming.GetKafkaReader(currentConfig.KafkaBrokerUrl, currentConfig.KafkaTopic)
if err != nil {
logger.Error("Unable to setup Kafka reader", zap.Error(err))
return
}
logger.Info("Kafka reader initialized.")

logger.Info("Starting worker...")

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()

err = reader.Read(ctx, getMessageReader(tracer, logger))
if err != nil {
logger.Error("Unable to read messages from Kafka", zap.Error(err))
return
}

<-ctx.Done()
logger.Info("Worker stop signal detected")
}

func getMessageReader(tracer trace.Tracer, logger *zap.Logger) func(context.Context, string, string) {
return func(readerContext context.Context, topic, message string) {
_, span := tracer.Start(readerContext, "Process incoming message")
defer span.End()

logger.Info("Incoming message", zap.String("topic", topic), zap.String("message", message))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package streaming

import (
"context"
"fmt"

"github.com/Shopify/sarama"
"go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama"
"go.opentelemetry.io/otel"
)

const consumerGroupName = "reader"

type Reader struct {
consumerGroup sarama.ConsumerGroup
topic string
}

func GetKafkaReader(brokerUrl, topic string) (*Reader, error) {
config := sarama.NewConfig()
config.Version = sarama.V2_5_0_0
config.Consumer.Return.Errors = true

brokerUrls := []string{brokerUrl}

consumerGroup, err := sarama.NewConsumerGroup(brokerUrls, consumerGroupName, config)
if err != nil {
return nil, fmt.Errorf("error when starting consumer group: %w", err)
}

return &Reader{
consumerGroup: consumerGroup,
topic: topic,
}, nil
}

func (r *Reader) Read(ctx context.Context, messageReader func(context.Context, string, string)) error {
consumerGroupHandler := internalConsumer{
messageReader: messageReader,
}

// Wrap instrumentation
handler := otelsarama.WrapConsumerGroupHandler(&consumerGroupHandler)

err := r.consumerGroup.Consume(ctx, []string{r.topic}, handler)
if err != nil {
return fmt.Errorf("error when consuming via handler: %w", err)
}

return nil
}

// based on https://github.com/open-telemetry/opentelemetry-go-contrib/blob/main/instrumentation/github.com/Shopify/sarama/otelsarama/example/consumer/consumer.go

// Represents a Sarama consumer group consumer.
type internalConsumer struct {
messageReader func(context.Context, string, string)
}

// Setup is run at the beginning of a new session, before ConsumeClaim.
func (consumer *internalConsumer) Setup(sarama.ConsumerGroupSession) error {
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (consumer *internalConsumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *internalConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29

for message := range claim.Messages() {
ctx := otel.GetTextMapPropagator().Extract(context.Background(), otelsarama.NewConsumerMessageCarrier(message))

fmt.Println("Headers: ", message.Headers)
consumer.messageReader(ctx, string(message.Topic), string(message.Value))
}

return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package telemetry

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdkTrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const spanExporterTimeout = 1 * time.Minute

func Setup(ctx context.Context, otelExporterEndpoint, serviceName string) (trace.Tracer, error) {
spanExporter, err := newSpanExporter(ctx, otelExporterEndpoint)
if err != nil {
return nil, fmt.Errorf("failed to setup span exporter: %w", err)
}

traceProvider, err := newTraceProvider(ctx, spanExporter, serviceName)
if err != nil {
return nil, fmt.Errorf("failed to setup trace provider: %w", err)
}

return traceProvider.Tracer(serviceName), nil
}

func newSpanExporter(ctx context.Context, otelExporterEndpoint string) (sdkTrace.SpanExporter, error) {
ctx, cancel := context.WithTimeout(ctx, spanExporterTimeout)
defer cancel()

conn, err := grpc.DialContext(ctx, otelExporterEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err)
}

traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn))
if err != nil {
return nil, fmt.Errorf("failed to create trace exporter: %w", err)
}

return traceExporter, nil
}

func newTraceProvider(ctx context.Context, spanExporter sdkTrace.SpanExporter, serviceName string) (*sdkTrace.TracerProvider, error) {
defaultResource := resource.Default()

extraResources, _ := resource.New(
context.Background(),
resource.WithOS(),
resource.WithProcess(),
resource.WithContainer(),
resource.WithHost(),
)

mergedResource, err := resource.Merge(
defaultResource,
extraResources,
)
if err != nil {
return nil, fmt.Errorf("failed to create otel resource: %w", err)
}

tp := sdkTrace.NewTracerProvider(
sdkTrace.WithBatcher(spanExporter),
sdkTrace.WithResource(mergedResource),
)

otel.SetTracerProvider(tp)

otel.SetTextMapPropagator(
propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
),
)

return tp, nil
}
Loading

0 comments on commit 9ac8991

Please sign in to comment.