Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
updated to use new NewSpan definition
Browse files Browse the repository at this point in the history
Signed-off-by: Dan Rammer <[email protected]>
  • Loading branch information
hamersaw committed Nov 15, 2022
1 parent 4f2c320 commit 13cc4de
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 24 deletions.
4 changes: 2 additions & 2 deletions storage/cached_rawstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *cachedRawStore) Head(ctx context.Context, reference DataReference) (Met

// ReadRaw retrieves a byte array from the Blob store or an error
func (s *cachedRawStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) {
ctx, span := telemetryutils.NewSpan(ctx, "blobstore", "cachedRawStore.ReadRaw")
ctx, span := telemetryutils.NewSpan(ctx, "blobstore-client", "flytestdlib.storage.cachedRawStore/ReadRaw")
defer span.End()

key := []byte(reference)
Expand Down Expand Up @@ -88,7 +88,7 @@ func (s *cachedRawStore) ReadRaw(ctx context.Context, reference DataReference) (

// WriteRaw stores a raw byte array.
func (s *cachedRawStore) WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error {
ctx, span := telemetryutils.NewSpan(ctx, "blobstore", "cachedRawStore.WriteRaw")
ctx, span := telemetryutils.NewSpan(ctx, "blobstore-client", "flytestdlib.storage.cachedRawStore/WriteRaw")
defer span.End()

var buf bytes.Buffer
Expand Down
4 changes: 2 additions & 2 deletions storage/protobuf_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type DefaultProtobufStore struct {
}

func (s DefaultProtobufStore) ReadProtobuf(ctx context.Context, reference DataReference, msg proto.Message) error {
ctx, span := telemetryutils.NewSpan(ctx, "blobstore", "DefaultProtobufStore.ReadProtobuf")
ctx, span := telemetryutils.NewSpan(ctx, "blobstore-client", "flytestdlib.storage.DefaultProtobufStore/ReadProtobuf")
defer span.End()

rc, err := s.ReadRaw(ctx, reference)
Expand Down Expand Up @@ -67,7 +67,7 @@ func (s DefaultProtobufStore) ReadProtobuf(ctx context.Context, reference DataRe
}

func (s DefaultProtobufStore) WriteProtobuf(ctx context.Context, reference DataReference, opts Options, msg proto.Message) error {
ctx, span := telemetryutils.NewSpan(ctx, "blobstore", "DefaultProtobufStore.WriteProtobuf")
ctx, span := telemetryutils.NewSpan(ctx, "blobstore-client", "flytestdlib.storage.DefaultProtobufStore/WriteProtobuf")
defer span.End()

t := s.metrics.MarshalTime.Start()
Expand Down
12 changes: 11 additions & 1 deletion telemetryutils/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,17 @@ func RegisterTracerProvider(serviceName string, config *Config) error {
}

opts = append(opts, trace.WithResource(telemetryResource))
tracerProviders[serviceName] = trace.NewTracerProvider(opts...)
tracerProvider := trace.NewTracerProvider(opts...)

tracerProviders[serviceName] = tracerProvider
return nil
}

func GetTracerProvider(serviceName string) rawtrace.TracerProvider {
if t, ok := tracerProviders[serviceName]; ok {
return t
}

// TODO @hamersaw - add warning "tracerProvider 'foo' not registered"
return noopTracerProvider
}
28 changes: 17 additions & 11 deletions telemetryutils/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ package telemetryutils

import (
"context"
"fmt"

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
K8S_SERVICE_NAME = "k8s-client"
PACKAGE = "controller-runtime.pkg.client"
)

type K8sCacheWrapper struct {
cache.Cache
}
Expand All @@ -16,13 +22,13 @@ func WrapK8sCache(c cache.Cache) cache.Cache {
}

func (c *K8sCacheWrapper) Get(ctx context.Context, key client.ObjectKey, obj client.Object) error {
ctx, span := NewSpan(ctx, "kubernetes", "Cache.Get")
ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.Cache/Get", PACKAGE))
defer span.End()
return c.Cache.Get(ctx, key, obj)
}

func (c *K8sCacheWrapper) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
ctx, span := NewSpan(ctx, "kubernetes", "Cache.List")
ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.Cache/List", PACKAGE))
defer span.End()
return c.Cache.List(ctx, list, opts...)
}
Expand All @@ -38,43 +44,43 @@ func WrapK8sClient(c client.Client) client.Client {
}

func (c *K8sClientWrapper) Get(ctx context.Context, key client.ObjectKey, obj client.Object) error {
ctx, span := NewSpan(ctx, "kubernetes", "Client.Get")
ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.Client/Get", PACKAGE))
defer span.End()
return c.Client.Get(ctx, key, obj)
}

func (c *K8sClientWrapper) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
ctx, span := NewSpan(ctx, "kubernetes", "Client.List")
ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.Client/List", PACKAGE))
defer span.End()
return c.Client.List(ctx, list, opts...)
}

func (c *K8sClientWrapper) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
ctx, span := NewSpan(ctx, "kubernetes", "Client.Create")
ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.Client/Create", PACKAGE))
defer span.End()
return c.Client.Create(ctx, obj, opts...)
}

func (c *K8sClientWrapper) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
ctx, span := NewSpan(ctx, "kubernetes", "Client.Delete")
ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.Client/Delete", PACKAGE))
defer span.End()
return c.Client.Delete(ctx, obj, opts...)
}

func (c *K8sClientWrapper) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
ctx, span := NewSpan(ctx, "kubernetes", "Client.Update")
ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.Client/Update", PACKAGE))
defer span.End()
return c.Client.Update(ctx, obj, opts...)
}

func (c *K8sClientWrapper) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
ctx, span := NewSpan(ctx, "kubernetes", "Client.Patch")
ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.Client/Patch", PACKAGE))
defer span.End()
return c.Client.Patch(ctx, obj, patch, opts...)
}

func (c *K8sClientWrapper) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error {
ctx, span := NewSpan(ctx, "kubernetes", "Client.DeleteAllOf")
ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.Client/DeleteAllOf", PACKAGE))
defer span.End()
return c.Client.DeleteAllOf(ctx, obj, opts...)
}
Expand All @@ -88,13 +94,13 @@ type K8sStatusWriterWrapper struct {
}

func (s *K8sStatusWriterWrapper) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
ctx, span := NewSpan(ctx, "kubernetes", "StatusWriter.Update")
ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.StatusWriter/Update", PACKAGE))
defer span.End()
return s.StatusWriter.Update(ctx, obj, opts...)
}

func (s *K8sStatusWriterWrapper) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
ctx, span := NewSpan(ctx, "kubernetes", "StatusWriter.Patch")
ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.StatusWriter/Patch", PACKAGE))
defer span.End()
return s.StatusWriter.Patch(ctx, obj, patch, opts...)
}
10 changes: 2 additions & 8 deletions telemetryutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,6 @@ import (

func NewSpan(ctx context.Context, serviceName string, spanName string) (context.Context, trace.Span) {
// TODO @hamersaw - can check ctx.IDK to see if tracing is enabled on
// if not -> use a NoopTracerProvider
var tracerProvider trace.TracerProvider
if t, ok := tracerProviders[serviceName]; ok {
tracerProvider = t
} else {
// TODO @hamersaw - add warning "tracerProvider 'foo' not registered"
tracerProvider = noopTracerProvider
}

var attributes []attribute.KeyValue
for key, value := range contextutils.GetLogFields(ctx) {
Expand All @@ -39,5 +31,7 @@ func NewSpan(ctx context.Context, serviceName string, spanName string) (context.
}
}

tracerProvider := GetTracerProvider(serviceName)
return tracerProvider.Tracer("default").Start(ctx, spanName, trace.WithAttributes(attributes...))
// TODO @hamersaw - tracer should be the library name - ex. github.com/flyteorg/flytepropeller
}

0 comments on commit 13cc4de

Please sign in to comment.