Skip to content

Commit

Permalink
Add list Datasets endpoint (#25)
Browse files Browse the repository at this point in the history
* Add list Datasets endpoint
  • Loading branch information
chanadian authored Dec 10, 2019
1 parent 65abdc7 commit 6bb0ab4
Show file tree
Hide file tree
Showing 14 changed files with 669 additions and 126 deletions.
66 changes: 64 additions & 2 deletions datacatalog/pkg/manager/impl/dataset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package impl

import (
"context"
"strconv"
"time"

"github.com/lyft/datacatalog/pkg/common"
"github.com/lyft/datacatalog/pkg/errors"
"github.com/lyft/datacatalog/pkg/manager/impl/validators"
"github.com/lyft/datacatalog/pkg/manager/interfaces"
"github.com/lyft/datacatalog/pkg/repositories"
"github.com/lyft/datacatalog/pkg/repositories/transformers"
datacatalog "github.com/lyft/datacatalog/protos/gen"

"github.com/lyft/datacatalog/pkg/errors"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/promutils/labeled"
Expand All @@ -26,6 +27,8 @@ type datasetMetrics struct {
createErrorCounter labeled.Counter
getSuccessCounter labeled.Counter
getErrorCounter labeled.Counter
listSuccessCounter labeled.Counter
listFailureCounter labeled.Counter
transformerErrorCounter labeled.Counter
validationErrorCounter labeled.Counter
alreadyExistsCounter labeled.Counter
Expand Down Expand Up @@ -129,6 +132,63 @@ func (dm *datasetManager) GetDataset(ctx context.Context, request datacatalog.Ge
}, nil
}

// ListDatasets lists Datasets that match the request's filter expression,
// applying the request's pagination options (offset/limit) to the result.
// Returns InvalidArgument on a bad filter or pagination, and Internal when
// stored models cannot be transformed back into Dataset entities.
func (dm *datasetManager) ListDatasets(ctx context.Context, request datacatalog.ListDatasetsRequest) (*datacatalog.ListDatasetsResponse, error) {
	err := validators.ValidateListDatasetsRequest(&request)
	if err != nil {
		logger.Warningf(ctx, "Invalid list datasets request %v, err: %v", request, err)
		dm.systemMetrics.validationErrorCounter.Inc(ctx)
		return nil, err
	}

	// Translate the gRPC filter expression into repo-level list inputs.
	listInput, err := transformers.FilterToListInput(ctx, common.Dataset, request.GetFilter())
	if err != nil {
		logger.Warningf(ctx, "Invalid list datasets request %v, err: %v", request, err)
		dm.systemMetrics.validationErrorCounter.Inc(ctx)
		return nil, err
	}

	err = transformers.ApplyPagination(request.Pagination, &listInput)
	if err != nil {
		logger.Warningf(ctx, "Invalid pagination options in list datasets request %v, err: %v", request, err)
		dm.systemMetrics.validationErrorCounter.Inc(ctx)
		return nil, err
	}

	// Perform the list with the dataset and listInput filters
	datasetModels, err := dm.repo.DatasetRepo().List(ctx, listInput)
	if err != nil {
		logger.Errorf(ctx, "Unable to list Datasets err: %v", err)
		dm.systemMetrics.listFailureCounter.Inc(ctx)
		return nil, err
	}

	// Convert returned models into the entity list. Collect every
	// transformation error so the caller sees all failures at once.
	datasetList := make([]*datacatalog.Dataset, len(datasetModels))
	transformerErrs := make([]error, 0)
	for idx, datasetModel := range datasetModels {
		dataset, err := transformers.FromDatasetModel(datasetModel)
		if err != nil {
			// Log the source model, not the transformed dataset:
			// dataset may be nil here and dereferencing dataset.Id
			// would panic inside the error path.
			logger.Errorf(ctx, "Unable to transform Dataset %+v err: %v", datasetModel, err)
			transformerErrs = append(transformerErrs, err)
		}

		datasetList[idx] = dataset
	}

	if len(transformerErrs) > 0 {
		dm.systemMetrics.listFailureCounter.Inc(ctx)
		return nil, errors.NewCollectedErrors(codes.Internal, transformerErrs)
	}

	// The next-page token is the absolute offset of the element that
	// follows the last one returned in this response.
	token := strconv.Itoa(int(listInput.Offset) + len(datasetList))

	logger.Debugf(ctx, "Listed %v matching datasets successfully", len(datasetList))
	dm.systemMetrics.listSuccessCounter.Inc(ctx)
	return &datacatalog.ListDatasetsResponse{Datasets: datasetList, NextToken: token}, nil
}

func NewDatasetManager(repo repositories.RepositoryInterface, store *storage.DataStore, datasetScope promutils.Scope) interfaces.DatasetManager {
return &datasetManager{
repo: repo,
Expand All @@ -145,6 +205,8 @@ func NewDatasetManager(repo repositories.RepositoryInterface, store *storage.Dat
validationErrorCounter: labeled.NewCounter("validation_failed_count", "The number of times validation failed", datasetScope, labeled.EmitUnlabeledMetric),
alreadyExistsCounter: labeled.NewCounter("already_exists_count", "The number of times a dataset already exists", datasetScope, labeled.EmitUnlabeledMetric),
doesNotExistCounter: labeled.NewCounter("does_not_exists_count", "The number of times a dataset was not found", datasetScope, labeled.EmitUnlabeledMetric),
listSuccessCounter: labeled.NewCounter("list_success_count", "The number of times list dataset succeeded", datasetScope, labeled.EmitUnlabeledMetric),
listFailureCounter: labeled.NewCounter("list_failure_count", "The number of times list dataset failed", datasetScope, labeled.EmitUnlabeledMetric),
},
}
}
111 changes: 98 additions & 13 deletions datacatalog/pkg/manager/impl/dataset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"context"

"github.com/golang/protobuf/proto"
"github.com/lyft/datacatalog/pkg/common"
"github.com/lyft/datacatalog/pkg/errors"
"github.com/lyft/datacatalog/pkg/repositories/mocks"
"github.com/lyft/datacatalog/pkg/repositories/models"
"github.com/lyft/datacatalog/pkg/repositories/transformers"
datacatalog "github.com/lyft/datacatalog/protos/gen"
"github.com/lyft/flytestdlib/contextutils"
mockScope "github.com/lyft/flytestdlib/promutils"
Expand Down Expand Up @@ -153,18 +155,8 @@ func TestGetDataset(t *testing.T) {
dcRepo := getDataCatalogRepo()
datasetManager := NewDatasetManager(dcRepo, nil, mockScope.NewTestScope())

serializedMetadata, _ := proto.Marshal(expectedDataset.Metadata)
datasetModelResponse := models.Dataset{
DatasetKey: models.DatasetKey{
Project: expectedDataset.Id.Project,
Domain: expectedDataset.Id.Domain,
Version: expectedDataset.Id.Version,
Name: expectedDataset.Id.Name,
UUID: expectedDataset.Id.UUID,
},
SerializedMetadata: serializedMetadata,
PartitionKeys: []models.PartitionKey{{Name: expectedDataset.PartitionKeys[0]}, {Name: expectedDataset.PartitionKeys[1]}},
}
datasetModelResponse, err := transformers.CreateDatasetModel(expectedDataset)
assert.NoError(t, err)

dcRepo.MockDatasetRepo.On("Get",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
Expand All @@ -174,7 +166,7 @@ func TestGetDataset(t *testing.T) {
datasetKey.Project == expectedDataset.Id.Project &&
datasetKey.Domain == expectedDataset.Id.Domain &&
datasetKey.Version == expectedDataset.Id.Version
})).Return(datasetModelResponse, nil)
})).Return(*datasetModelResponse, nil)
request := datacatalog.GetDatasetRequest{Dataset: getTestDataset().Id}
datasetResponse, err := datasetManager.GetDataset(context.Background(), request)
assert.NoError(t, err)
Expand Down Expand Up @@ -204,3 +196,96 @@ func TestGetDataset(t *testing.T) {
})

}

// TestListDatasets exercises the ListDatasets manager endpoint: rejection of
// non-dataset filters, listing with dataset-property filters, and the
// unfiltered default case. All subtests share one mocked repo; each one
// registers its own distinct "List" expectation via mock.MatchedBy.
func TestListDatasets(t *testing.T) {
	ctx := context.Background()
	expectedDataset := getTestDataset()
	dcRepo := getDataCatalogRepo()

	t.Run("List Datasets on invalid filter", func(t *testing.T) {
		datasetManager := NewDatasetManager(dcRepo, nil, mockScope.NewTestScope())
		// Artifact filters are not permitted when listing datasets, so
		// validation should fail before the repo is ever consulted.
		filter := &datacatalog.FilterExpression{
			Filters: []*datacatalog.SinglePropertyFilter{
				{
					PropertyFilter: &datacatalog.SinglePropertyFilter_ArtifactFilter{
						ArtifactFilter: &datacatalog.ArtifactPropertyFilter{
							Property: &datacatalog.ArtifactPropertyFilter_ArtifactId{
								ArtifactId: "test",
							},
						},
					},
				},
			},
		}

		artifactResponse, err := datasetManager.ListDatasets(ctx, datacatalog.ListDatasetsRequest{Filter: filter})
		assert.Error(t, err)
		assert.Nil(t, artifactResponse)
		responseCode := status.Code(err)
		// Invalid filters surface as gRPC InvalidArgument.
		assert.Equal(t, codes.InvalidArgument, responseCode)
	})

	t.Run("List Datasets with Project and Name", func(t *testing.T) {
		datasetManager := NewDatasetManager(dcRepo, nil, mockScope.NewTestScope())
		// Two dataset-property filters (project + domain) — both allowed.
		filter := &datacatalog.FilterExpression{
			Filters: []*datacatalog.SinglePropertyFilter{
				{
					PropertyFilter: &datacatalog.SinglePropertyFilter_DatasetFilter{
						DatasetFilter: &datacatalog.DatasetPropertyFilter{
							Property: &datacatalog.DatasetPropertyFilter_Project{
								Project: "testProject",
							},
						},
					},
				},
				{
					PropertyFilter: &datacatalog.SinglePropertyFilter_DatasetFilter{
						DatasetFilter: &datacatalog.DatasetPropertyFilter{
							Property: &datacatalog.DatasetPropertyFilter_Domain{
								Domain: "testDomain",
							},
						},
					},
				},
			},
		}

		datasetModel, err := transformers.CreateDatasetModel(expectedDataset)
		assert.NoError(t, err)

		// Expect the manager to translate the two filters into two
		// dataset-entity model filters with default pagination
		// (limit 50, offset 0).
		dcRepo.MockDatasetRepo.On("List", mock.Anything,
			mock.MatchedBy(func(listInput models.ListModelsInput) bool {
				return len(listInput.ModelFilters) == 2 &&
					listInput.ModelFilters[0].Entity == common.Dataset &&
					len(listInput.ModelFilters[0].ValueFilters) == 1 &&
					listInput.ModelFilters[1].Entity == common.Dataset &&
					len(listInput.ModelFilters[1].ValueFilters) == 1 &&
					listInput.Limit == 50 &&
					listInput.Offset == 0
			})).Return([]models.Dataset{*datasetModel}, nil)

		datasetResponse, err := datasetManager.ListDatasets(ctx, datacatalog.ListDatasetsRequest{Filter: filter})
		assert.NoError(t, err)
		assert.NotEmpty(t, datasetResponse)
		assert.Len(t, datasetResponse.Datasets, 1)
	})

	t.Run("List Datasets with no filtering", func(t *testing.T) {
		datasetManager := NewDatasetManager(dcRepo, nil, mockScope.NewTestScope())

		datasetModel, err := transformers.CreateDatasetModel(expectedDataset)
		assert.NoError(t, err)

		// With no filter in the request, the repo should be queried with
		// zero model filters and the default pagination values.
		dcRepo.MockDatasetRepo.On("List", mock.Anything,
			mock.MatchedBy(func(listInput models.ListModelsInput) bool {
				return len(listInput.ModelFilters) == 0 &&
					listInput.Limit == 50 &&
					listInput.Offset == 0
			})).Return([]models.Dataset{*datasetModel}, nil)

		datasetResponse, err := datasetManager.ListDatasets(ctx, datacatalog.ListDatasetsRequest{})
		assert.NoError(t, err)
		assert.NotEmpty(t, datasetResponse)
		assert.Len(t, datasetResponse.Datasets, 1)
	})
}
23 changes: 23 additions & 0 deletions datacatalog/pkg/manager/impl/validators/dataset_validator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package validators

import (
"github.com/lyft/datacatalog/pkg/common"
datacatalog "github.com/lyft/datacatalog/protos/gen"
)

Expand Down Expand Up @@ -31,3 +32,25 @@ func ValidateDatasetID(ds *datacatalog.DatasetID) error {
}
return nil
}

// ValidateListDatasetsRequest ensures a ListDatasetsRequest is well formed:
// the pagination options (when present) are valid, and the filter expression
// only references dataset properties.
func ValidateListDatasetsRequest(request *datacatalog.ListDatasetsRequest) error {
	if pagination := request.Pagination; pagination != nil {
		if err := ValidatePagination(*pagination); err != nil {
			return err
		}
	}

	// Datasets cannot be filtered by tag, partitions or artifacts
	for _, propertyFilter := range request.Filter.GetFilters() {
		switch {
		case propertyFilter.GetTagFilter() != nil:
			return NewInvalidFilterError(common.Dataset, common.Tag)
		case propertyFilter.GetPartitionFilter() != nil:
			return NewInvalidFilterError(common.Dataset, common.Partition)
		case propertyFilter.GetArtifactFilter() != nil:
			return NewInvalidFilterError(common.Dataset, common.Artifact)
		}
	}
	return nil
}
1 change: 1 addition & 0 deletions datacatalog/pkg/manager/interfaces/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ import (
type DatasetManager interface {
CreateDataset(ctx context.Context, request idl_datacatalog.CreateDatasetRequest) (*idl_datacatalog.CreateDatasetResponse, error)
GetDataset(ctx context.Context, request idl_datacatalog.GetDatasetRequest) (*idl_datacatalog.GetDatasetResponse, error)
ListDatasets(ctx context.Context, request idl_datacatalog.ListDatasetsRequest) (*idl_datacatalog.ListDatasetsResponse, error)
}
21 changes: 21 additions & 0 deletions datacatalog/pkg/repositories/gormimpl/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/jinzhu/gorm"
"github.com/lyft/datacatalog/pkg/common"
"github.com/lyft/datacatalog/pkg/repositories/errors"
"github.com/lyft/datacatalog/pkg/repositories/interfaces"
"github.com/lyft/datacatalog/pkg/repositories/models"
Expand Down Expand Up @@ -61,3 +62,23 @@ func (h *dataSetRepo) Get(ctx context.Context, in models.DatasetKey) (models.Dat

return ds, nil
}

// List returns the dataset models matching the filters, joins, and pagination
// described by the list input, with their partition keys preloaded.
func (h *dataSetRepo) List(ctx context.Context, in models.ListModelsInput) ([]models.Dataset, error) {
	timer := h.repoMetrics.ListDuration.Start(ctx)
	defer timer.Stop()

	// Build the query with all requested filters and joins applied.
	tx, err := applyListModelsInput(h.db, common.Dataset, in)
	if err != nil {
		return nil, err
	}
	if tx.Error != nil {
		return []models.Dataset{}, h.errorTransformer.ToDataCatalogError(tx.Error)
	}

	// Execute the query, eagerly loading each dataset's partition keys.
	datasets := make([]models.Dataset, 0)
	if tx = tx.Preload("PartitionKeys").Find(&datasets); tx.Error != nil {
		return []models.Dataset{}, h.errorTransformer.ToDataCatalogError(tx.Error)
	}
	return datasets, nil
}
Loading

0 comments on commit 6bb0ab4

Please sign in to comment.