Skip to content

Commit

Permalink
Merge pull request #3 from lyft/metrics
Browse files Browse the repository at this point in the history
Add Observability through Metrics and debug Logging
  • Loading branch information
chanadian authored Aug 29, 2019
2 parents 34744ad + 6d2fc42 commit 0da0ffb
Show file tree
Hide file tree
Showing 24 changed files with 426 additions and 65 deletions.
11 changes: 7 additions & 4 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 7 additions & 10 deletions cmd/entrypoints/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@ package entrypoints

import (
"context"
"fmt"
"html"
"net"
"net/http"

"github.com/lyft/datacatalog/pkg/config"
"github.com/lyft/datacatalog/pkg/rpc/datacatalogservice"
datacatalog "github.com/lyft/datacatalog/protos/gen"
"github.com/lyft/flytestdlib/contextutils"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/promutils/labeled"
"github.com/spf13/cobra"
"google.golang.org/grpc"
)
Expand All @@ -28,7 +24,7 @@ var serveCmd = &cobra.Command{
go func() {
err := serveHealthcheck(ctx, cfg)
if err != nil {
logger.Errorf(ctx, "Unable to serve http", config.GetConfig().GetGrpcHostAddress(), err)
logger.Errorf(ctx, "Unable to serve http", config.GetConfig().GetHTTPHostAddress(), err)
}
}()

Expand All @@ -38,8 +34,6 @@ var serveCmd = &cobra.Command{

func init() {
RootCmd.AddCommand(serveCmd)

labeled.SetMetricKeys(contextutils.AppNameKey)
}

// Create and start the gRPC server
Expand All @@ -63,10 +57,13 @@ func newGRPCServer(_ context.Context) *grpc.Server {
}

func serveHealthcheck(ctx context.Context, cfg *config.Config) error {
http.HandleFunc("/healthcheck", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Healthcheck success on %v", html.EscapeString(r.URL.Path))
mux := http.NewServeMux()

// Register Healthcheck
mux.HandleFunc("/healthcheck", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})

logger.Infof(ctx, "Serving DataCatalog http on port %v", cfg.GetHTTPHostAddress())
return http.ListenAndServe(cfg.GetHTTPHostAddress(), nil)
return http.ListenAndServe(cfg.GetHTTPHostAddress(), mux)
}
4 changes: 0 additions & 4 deletions cmd/entrypoints/serve_dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import (
"github.com/lyft/datacatalog/pkg/config"
"github.com/lyft/datacatalog/pkg/rpc/datacatalogservice"
datacatalog "github.com/lyft/datacatalog/protos/gen"
"github.com/lyft/flytestdlib/contextutils"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/promutils/labeled"
"github.com/spf13/cobra"
"google.golang.org/grpc"
)
Expand All @@ -26,8 +24,6 @@ var serveDummyCmd = &cobra.Command{

func init() {
RootCmd.AddCommand(serveDummyCmd)

labeled.SetMetricKeys(contextutils.AppNameKey)
}

// Create and start the gRPC server and http healthcheck endpoint
Expand Down
4 changes: 3 additions & 1 deletion datacatalog_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
# Real configuration when running inside K8s (local or otherwise) lives in a ConfigMap
# Look in the artifacts directory in the flyte repo for what's actually run
application:
grpcPort: 8089
grpcPort: 8081
httpPort: 8080
datacatalog:
storage-prefix: "metadata"
metrics-scope: "datacatalog"
profiler-port: 10254
storage:
connection:
access-key: minio
Expand Down
10 changes: 10 additions & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,13 @@ func NewDataCatalogError(code codes.Code, message string) error {
func NewDataCatalogErrorf(code codes.Code, format string, a ...interface{}) error {
return NewDataCatalogError(code, fmt.Sprintf(format, a...))
}

func IsAlreadyExistsError(err error) bool {
dcErr, ok := err.(DataCatalogError)
return ok && dcErr.GRPCStatus().Code() == codes.AlreadyExists
}

func IsDoesNotExistError(err error) bool {
dcErr, ok := err.(DataCatalogError)
return ok && dcErr.GRPCStatus().Code() == codes.NotFound
}
22 changes: 22 additions & 0 deletions pkg/errors/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package errors

import (
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
)

func TestAlreadyExists(t *testing.T) {
alreadyExistsErr := NewDataCatalogError(codes.AlreadyExists, "already exists")
notFoundErr := NewDataCatalogError(codes.NotFound, "not found")
assert.True(t, IsAlreadyExistsError(alreadyExistsErr))
assert.False(t, IsAlreadyExistsError(notFoundErr))
}

func TestNotFoundErr(t *testing.T) {
alreadyExistsErr := NewDataCatalogError(codes.AlreadyExists, "already exists")
notFoundErr := NewDataCatalogError(codes.NotFound, "not found")
assert.False(t, IsDoesNotExistError(alreadyExistsErr))
assert.True(t, IsDoesNotExistError(notFoundErr))
}
80 changes: 80 additions & 0 deletions pkg/manager/impl/artifact_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,52 @@ import (
"github.com/lyft/datacatalog/pkg/repositories/models"
"github.com/lyft/datacatalog/pkg/repositories/transformers"

"github.com/lyft/flytestdlib/contextutils"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/promutils/labeled"
"github.com/lyft/flytestdlib/storage"
"google.golang.org/grpc/codes"
)

type artifactMetrics struct {
scope promutils.Scope
createSuccessCounter labeled.Counter
createFailureCounter labeled.Counter
getSuccessCounter labeled.Counter
getFailureCounter labeled.Counter
createDataFailureCounter labeled.Counter
createDataSuccessCounter labeled.Counter
transformerErrorCounter labeled.Counter
validationErrorCounter labeled.Counter
alreadyExistsCounter labeled.Counter
doesNotExistCounter labeled.Counter
}

type artifactManager struct {
repo repositories.RepositoryInterface
artifactStore ArtifactDataStore
systemMetrics artifactMetrics
}

// Create an Artifact along with the associated ArtifactData. The ArtifactData will be stored in an offloaded location.
func (m *artifactManager) CreateArtifact(ctx context.Context, request datacatalog.CreateArtifactRequest) (*datacatalog.CreateArtifactResponse, error) {
artifact := request.Artifact
err := validators.ValidateArtifact(artifact)
if err != nil {
logger.Warningf(ctx, "Invalid create artifact request %v, err: %v", request, err)
m.systemMetrics.validationErrorCounter.Inc(ctx)
return nil, err
}

ctx = contextutils.WithProjectDomain(ctx, artifact.Dataset.Project, artifact.Dataset.Domain)
datasetKey := transformers.FromDatasetID(*artifact.Dataset)

// The dataset must exist for the artifact, let's verify that first
_, err = m.repo.DatasetRepo().Get(ctx, datasetKey)
if err != nil {
logger.Warnf(ctx, "Failed to get dataset for artifact creation %v, err: %v", datasetKey, err)
m.systemMetrics.createFailureCounter.Inc(ctx)
return nil, err
}

Expand All @@ -43,22 +66,40 @@ func (m *artifactManager) CreateArtifact(ctx context.Context, request datacatalo
for i, artifactData := range request.Artifact.Data {
dataLocation, err := m.artifactStore.PutData(ctx, *artifact, *artifactData)
if err != nil {
logger.Errorf(ctx, "Failed to store artifact data err: %v", err)
m.systemMetrics.createDataFailureCounter.Inc(ctx)
return nil, err
}

artifactDataModels[i].Name = artifactData.Name
artifactDataModels[i].Location = dataLocation.String()
m.systemMetrics.createDataSuccessCounter.Inc(ctx)
}

logger.Debugf(ctx, "Stored %v data for artifact %+v", len(artifactDataModels), artifact.Id)

artifactModel, err := transformers.CreateArtifactModel(request, artifactDataModels)
if err != nil {
logger.Errorf(ctx, "Failed to transform artifact err: %v", err)
m.systemMetrics.transformerErrorCounter.Inc(ctx)
return nil, err
}

err = m.repo.ArtifactRepo().Create(ctx, artifactModel)
if err != nil {
if errors.IsAlreadyExistsError(err) {
logger.Warnf(ctx, "Artifact already exists key: %+v, err %v", artifact.Id, err)
m.systemMetrics.alreadyExistsCounter.Inc(ctx)
} else {
logger.Errorf(ctx, "Failed to create artifact %v, err: %v", artifactDataModels, err)
m.systemMetrics.createFailureCounter.Inc(ctx)
}
return nil, err
}

logger.Debugf(ctx, "Successfully created artifact id: %v", artifact.Id)

m.systemMetrics.createSuccessCounter.Inc(ctx)
return &datacatalog.CreateArtifactResponse{}, nil
}

Expand All @@ -67,23 +108,42 @@ func (m *artifactManager) GetArtifact(ctx context.Context, request datacatalog.G
datasetID := request.Dataset
err := validators.ValidateGetArtifactRequest(request)
if err != nil {
logger.Warningf(ctx, "Invalid get artifact request %v, err: %v", request, err)
m.systemMetrics.validationErrorCounter.Inc(ctx)
return nil, err
}

ctx = contextutils.WithProjectDomain(ctx, datasetID.Project, datasetID.Domain)
var artifactModel models.Artifact
switch request.QueryHandle.(type) {
case *datacatalog.GetArtifactRequest_ArtifactId:
logger.Debugf(ctx, "Get artifact by id %v", request.GetArtifactId())
artifactKey := transformers.ToArtifactKey(*datasetID, request.GetArtifactId())
artifactModel, err = m.repo.ArtifactRepo().Get(ctx, artifactKey)

if err != nil {
if errors.IsDoesNotExistError(err) {
logger.Warnf(ctx, "Artifact does not exist id: %+v, err %v", request.GetArtifactId(), err)
m.systemMetrics.doesNotExistCounter.Inc(ctx)
} else {
logger.Errorf(ctx, "Unable to retrieve artifact by id: %+v, err %v", request.GetArtifactId(), err)
m.systemMetrics.getFailureCounter.Inc(ctx)
}
return nil, err
}
case *datacatalog.GetArtifactRequest_TagName:
logger.Debugf(ctx, "Get artifact by id %v", request.GetTagName())
tagKey := transformers.ToTagKey(*datasetID, request.GetTagName())
tag, err := m.repo.TagRepo().Get(ctx, tagKey)

if err != nil {
if errors.IsDoesNotExistError(err) {
logger.Warnf(ctx, "Artifact does not exist tag: %+v, err %v", request.GetTagName(), err)
m.systemMetrics.doesNotExistCounter.Inc(ctx)
} else {
logger.Errorf(ctx, "Unable to retrieve Artifact by tag %v, err: %v", request.GetTagName(), err)
m.systemMetrics.getFailureCounter.Inc(ctx)
}
return nil, err
}

Expand All @@ -96,13 +156,16 @@ func (m *artifactManager) GetArtifact(ctx context.Context, request datacatalog.G

artifact, err := transformers.FromArtifactModel(artifactModel)
if err != nil {
logger.Errorf(ctx, "Error in transforming get artifact request %+v, err %v", artifactModel, err)
m.systemMetrics.transformerErrorCounter.Inc(ctx)
return nil, err
}

artifactDataList := make([]*datacatalog.ArtifactData, len(artifactModel.ArtifactData))
for i, artifactData := range artifactModel.ArtifactData {
value, err := m.artifactStore.GetData(ctx, artifactData)
if err != nil {
logger.Errorf(ctx, "Error in getting artifact data from datastore %+v, err %v", artifactData.Location, err)
return nil, err
}

Expand All @@ -113,14 +176,31 @@ func (m *artifactManager) GetArtifact(ctx context.Context, request datacatalog.G
}
artifact.Data = artifactDataList

logger.Debugf(ctx, "Retrieved artifact dataset %v, id: %v", artifact.Dataset, artifact.Id)
m.systemMetrics.getSuccessCounter.Inc(ctx)
return &datacatalog.GetArtifactResponse{
Artifact: &artifact,
}, nil
}

func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.DataStore, storagePrefix storage.DataReference, artifactScope promutils.Scope) interfaces.ArtifactManager {
artifactMetrics := artifactMetrics{
scope: artifactScope,
createSuccessCounter: labeled.NewCounter("create_artifact_success_count", "The number of times create artifact was called", artifactScope, labeled.EmitUnlabeledMetric),
getSuccessCounter: labeled.NewCounter("get_artifact_success_count", "The number of times get artifact was called", artifactScope, labeled.EmitUnlabeledMetric),
createFailureCounter: labeled.NewCounter("create_artifact_failure_count", "The number of times create artifact failed", artifactScope, labeled.EmitUnlabeledMetric),
getFailureCounter: labeled.NewCounter("get_artifact_failure_count", "The number of times get artifact failed", artifactScope, labeled.EmitUnlabeledMetric),
createDataFailureCounter: labeled.NewCounter("create_artifact_data_failure_count", "The number of times create artifact data failed", artifactScope, labeled.EmitUnlabeledMetric),
createDataSuccessCounter: labeled.NewCounter("create_artifact_data_succeeded_count", "The number of times create artifact data succeeded", artifactScope, labeled.EmitUnlabeledMetric),
transformerErrorCounter: labeled.NewCounter("transformer_failed_count", "The number of times transformations failed", artifactScope, labeled.EmitUnlabeledMetric),
validationErrorCounter: labeled.NewCounter("validation_failed_count", "The number of times validation failed", artifactScope, labeled.EmitUnlabeledMetric),
alreadyExistsCounter: labeled.NewCounter("already_exists_count", "The number of times an artifact already exists", artifactScope, labeled.EmitUnlabeledMetric),
doesNotExistCounter: labeled.NewCounter("does_not_exists_count", "The number of times an artifact was not found", artifactScope, labeled.EmitUnlabeledMetric),
}

return &artifactManager{
repo: repo,
artifactStore: NewArtifactDataStore(store, storagePrefix),
systemMetrics: artifactMetrics,
}
}
5 changes: 4 additions & 1 deletion pkg/manager/impl/artifact_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ import (
"google.golang.org/grpc/status"
)

func createInmemoryDataStore(t testing.TB, scope mockScope.Scope) *storage.DataStore {
func init() {
labeled.SetMetricKeys(contextutils.AppNameKey)
}

func createInmemoryDataStore(t testing.TB, scope mockScope.Scope) *storage.DataStore {
cfg := storage.Config{
Type: storage.TypeMemory,
}
Expand Down
Loading

0 comments on commit 0da0ffb

Please sign in to comment.