diff --git a/storage/cached_rawstore.go b/storage/cached_rawstore.go index 1bfdc06..a462356 100644 --- a/storage/cached_rawstore.go +++ b/storage/cached_rawstore.go @@ -50,7 +50,7 @@ func (s *cachedRawStore) Head(ctx context.Context, reference DataReference) (Met // ReadRaw retrieves a byte array from the Blob store or an error func (s *cachedRawStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) { - ctx, span := telemetryutils.NewSpan(ctx, "blobstore", "cachedRawStore.ReadRaw") + ctx, span := telemetryutils.NewSpan(ctx, "blobstore-client", "flytestdlib.storage.cachedRawStore/ReadRaw") defer span.End() key := []byte(reference) @@ -88,7 +88,7 @@ func (s *cachedRawStore) ReadRaw(ctx context.Context, reference DataReference) ( // WriteRaw stores a raw byte array. func (s *cachedRawStore) WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error { - ctx, span := telemetryutils.NewSpan(ctx, "blobstore", "cachedRawStore.WriteRaw") + ctx, span := telemetryutils.NewSpan(ctx, "blobstore-client", "flytestdlib.storage.cachedRawStore/WriteRaw") defer span.End() var buf bytes.Buffer diff --git a/storage/protobuf_store.go b/storage/protobuf_store.go index adcfba9..6847e91 100644 --- a/storage/protobuf_store.go +++ b/storage/protobuf_store.go @@ -33,7 +33,7 @@ type DefaultProtobufStore struct { } func (s DefaultProtobufStore) ReadProtobuf(ctx context.Context, reference DataReference, msg proto.Message) error { - ctx, span := telemetryutils.NewSpan(ctx, "blobstore", "DefaultProtobufStore.ReadProtobuf") + ctx, span := telemetryutils.NewSpan(ctx, "blobstore-client", "flytestdlib.storage.DefaultProtobufStore/ReadProtobuf") defer span.End() rc, err := s.ReadRaw(ctx, reference) @@ -67,7 +67,7 @@ func (s DefaultProtobufStore) ReadProtobuf(ctx context.Context, reference DataRe } func (s DefaultProtobufStore) WriteProtobuf(ctx context.Context, reference DataReference, opts Options, msg proto.Message) error { - ctx, span := telemetryutils.NewSpan(ctx, "blobstore", "DefaultProtobufStore.WriteProtobuf") + ctx, span := telemetryutils.NewSpan(ctx, "blobstore-client", "flytestdlib.storage.DefaultProtobufStore/WriteProtobuf") defer span.End() t := s.metrics.MarshalTime.Start() diff --git a/telemetryutils/factory.go b/telemetryutils/factory.go index e4b4fba..ab412d7 100644 --- a/telemetryutils/factory.go +++ b/telemetryutils/factory.go @@ -74,7 +74,17 @@ func RegisterTracerProvider(serviceName string, config *Config) error { } opts = append(opts, trace.WithResource(telemetryResource)) - tracerProviders[serviceName] = trace.NewTracerProvider(opts...) + tracerProvider := trace.NewTracerProvider(opts...) + tracerProviders[serviceName] = tracerProvider return nil } + +func GetTracerProvider(serviceName string) rawtrace.TracerProvider { + if t, ok := tracerProviders[serviceName]; ok { + return t + } + + // TODO @hamersaw - add warning "tracerProvider 'foo' not registered" + return noopTracerProvider +} diff --git a/telemetryutils/k8s.go b/telemetryutils/k8s.go index aaff0c0..68aed08 100644 --- a/telemetryutils/k8s.go +++ b/telemetryutils/k8s.go @@ -2,11 +2,17 @@ package telemetryutils import ( "context" + "fmt" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" ) +const ( + K8S_SERVICE_NAME = "k8s-client" + PACKAGE = "controller-runtime.pkg.client" +) + type K8sCacheWrapper struct { cache.Cache } @@ -16,13 +22,13 @@ func WrapK8sCache(c cache.Cache) cache.Cache { } func (c *K8sCacheWrapper) Get(ctx context.Context, key client.ObjectKey, obj client.Object) error { - ctx, span := NewSpan(ctx, "kubernetes", "Cache.Get") + ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.Cache/Get", PACKAGE)) defer span.End() return c.Cache.Get(ctx, key, obj) } func (c *K8sCacheWrapper) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { - ctx, span := NewSpan(ctx, "kubernetes", "Cache.List") + ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.Cache/List", PACKAGE)) defer span.End() return c.Cache.List(ctx, list, opts...) } @@ -38,43 +44,43 @@ func WrapK8sClient(c client.Client) client.Client { } func (c *K8sClientWrapper) Get(ctx context.Context, key client.ObjectKey, obj client.Object) error { - ctx, span := NewSpan(ctx, "kubernetes", "Client.Get") + ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.Client/Get", PACKAGE)) defer span.End() return c.Client.Get(ctx, key, obj) } func (c *K8sClientWrapper) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { - ctx, span := NewSpan(ctx, "kubernetes", "Client.List") + ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.Client/List", PACKAGE)) defer span.End() return c.Client.List(ctx, list, opts...) } func (c *K8sClientWrapper) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { - ctx, span := NewSpan(ctx, "kubernetes", "Client.Create") + ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.Client/Create", PACKAGE)) defer span.End() return c.Client.Create(ctx, obj, opts...) } func (c *K8sClientWrapper) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { - ctx, span := NewSpan(ctx, "kubernetes", "Client.Delete") + ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.Client/Delete", PACKAGE)) defer span.End() return c.Client.Delete(ctx, obj, opts...) } func (c *K8sClientWrapper) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { - ctx, span := NewSpan(ctx, "kubernetes", "Client.Update") + ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.Client/Update", PACKAGE)) defer span.End() return c.Client.Update(ctx, obj, opts...) } func (c *K8sClientWrapper) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { - ctx, span := NewSpan(ctx, "kubernetes", "Client.Patch") + ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.Client/Patch", PACKAGE)) defer span.End() return c.Client.Patch(ctx, obj, patch, opts...) } func (c *K8sClientWrapper) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error { - ctx, span := NewSpan(ctx, "kubernetes", "Client.DeleteAllOf") + ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.Client/DeleteAllOf", PACKAGE)) defer span.End() return c.Client.DeleteAllOf(ctx, obj, opts...) } @@ -88,13 +94,13 @@ type K8sStatusWriterWrapper struct { } func (s *K8sStatusWriterWrapper) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { - ctx, span := NewSpan(ctx, "kubernetes", "StatusWriter.Update") + ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.StatusWriter/Update", PACKAGE)) defer span.End() return s.StatusWriter.Update(ctx, obj, opts...) } func (s *K8sStatusWriterWrapper) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { - ctx, span := NewSpan(ctx, "kubernetes", "StatusWriter.Patch") + ctx, span := NewSpan(ctx, K8S_SERVICE_NAME, fmt.Sprintf("%s.StatusWriter/Patch", PACKAGE)) defer span.End() return s.StatusWriter.Patch(ctx, obj, patch, opts...) } diff --git a/telemetryutils/utils.go b/telemetryutils/utils.go index 4ad92b9..4243180 100644 --- a/telemetryutils/utils.go +++ b/telemetryutils/utils.go @@ -23,14 +23,6 @@ import ( func NewSpan(ctx context.Context, serviceName string, spanName string) (context.Context, trace.Span) { // TODO @hamersaw - can check ctx.IDK to see if tracing is enabled on - // if not -> use a NoopTracerProvider - var tracerProvider trace.TracerProvider - if t, ok := tracerProviders[serviceName]; ok { - tracerProvider = t - } else { - // TODO @hamersaw - add warning "tracerProvider 'foo' not registered" - tracerProvider = noopTracerProvider - } var attributes []attribute.KeyValue for key, value := range contextutils.GetLogFields(ctx) { @@ -39,5 +31,7 @@ func NewSpan(ctx context.Context, serviceName string, spanName string) (context. } } + tracerProvider := GetTracerProvider(serviceName) return tracerProvider.Tracer("default").Start(ctx, spanName, trace.WithAttributes(attributes...)) + // TODO @hamersaw - tracer should be the library name - ex. github.com/flyteorg/flytepropeller }