Skip to content

Commit

Permalink
feat(open assertion spec): MVP for Snowflake DMF Assertions: update m…
Browse files Browse the repository at this point in the history
…odels, add assertions cli with snowflake integration (datahub-project#10602)
  • Loading branch information
mayurinehate authored and sleeperdeep committed Jun 25, 2024
1 parent ba9ebc9 commit e8ca3dd
Show file tree
Hide file tree
Showing 77 changed files with 5,351 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ private Constants() {}
// File names of GraphQL schema definitions; every value below ends in ".graphql".
public static final String LINEAGE_SCHEMA_FILE = "lineage.graphql";
public static final String PROPERTIES_SCHEMA_FILE = "properties.graphql";
public static final String FORMS_SCHEMA_FILE = "forms.graphql";
public static final String ASSERTIONS_SCHEMA_FILE = "assertions.graphql";
public static final String INCIDENTS_SCHEMA_FILE = "incident.graphql";
public static final String CONNECTIONS_SCHEMA_FILE = "connection.graphql";
// NOTE(review): presumably the separator placed between browse path segments; confirm against callers.
public static final String BROWSE_PATH_DELIMITER = "/";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,12 @@
import com.linkedin.datahub.graphql.resolvers.assertion.AssertionRunEventResolver;
import com.linkedin.datahub.graphql.resolvers.assertion.DeleteAssertionResolver;
import com.linkedin.datahub.graphql.resolvers.assertion.EntityAssertionsResolver;
import com.linkedin.datahub.graphql.resolvers.auth.*;
import com.linkedin.datahub.graphql.resolvers.auth.CreateAccessTokenResolver;
import com.linkedin.datahub.graphql.resolvers.auth.DebugAccessResolver;
import com.linkedin.datahub.graphql.resolvers.auth.GetAccessTokenMetadataResolver;
import com.linkedin.datahub.graphql.resolvers.auth.GetAccessTokenResolver;
import com.linkedin.datahub.graphql.resolvers.auth.ListAccessTokensResolver;
import com.linkedin.datahub.graphql.resolvers.auth.RevokeAccessTokenResolver;
import com.linkedin.datahub.graphql.resolvers.browse.BrowsePathsResolver;
import com.linkedin.datahub.graphql.resolvers.browse.BrowseResolver;
import com.linkedin.datahub.graphql.resolvers.browse.EntityBrowsePathsResolver;
Expand Down Expand Up @@ -814,6 +819,7 @@ public GraphQLEngine.Builder builder() {
.addSchema(fileBasedSchema(PROPERTIES_SCHEMA_FILE))
.addSchema(fileBasedSchema(FORMS_SCHEMA_FILE))
.addSchema(fileBasedSchema(CONNECTIONS_SCHEMA_FILE))
.addSchema(fileBasedSchema(ASSERTIONS_SCHEMA_FILE))
.addSchema(fileBasedSchema(INCIDENTS_SCHEMA_FILE));

for (GmsGraphQLPlugin plugin : this.graphQLPlugins) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ public CompletableFuture<AssertionRunEventsResult> get(DataFetchingEnvironment e
&& AssertionResultType.SUCCESS.equals(
runEvent.getResult().getType()))
.count()));
result.setErrored(
Math.toIntExact(
runEvents.stream()
.filter(
runEvent ->
AssertionRunStatus.COMPLETE.equals(runEvent.getStatus())
&& runEvent.getResult() != null
&& AssertionResultType.ERROR.equals(
runEvent.getResult().getType()))
.count()));
result.setRunEvents(runEvents);
return result;
} catch (RemoteInvocationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static com.linkedin.metadata.Constants.GLOBAL_TAGS_ASPECT_NAME;

import com.linkedin.assertion.AssertionAction;
import com.linkedin.assertion.AssertionActions;
import com.linkedin.assertion.AssertionInfo;
import com.linkedin.common.DataPlatformInstance;
import com.linkedin.common.GlobalTags;
Expand All @@ -10,24 +12,40 @@
import com.linkedin.data.DataMap;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.Assertion;
import com.linkedin.datahub.graphql.generated.AssertionActionType;
import com.linkedin.datahub.graphql.generated.AssertionSource;
import com.linkedin.datahub.graphql.generated.AssertionSourceType;
import com.linkedin.datahub.graphql.generated.AssertionStdAggregation;
import com.linkedin.datahub.graphql.generated.AssertionStdOperator;
import com.linkedin.datahub.graphql.generated.AssertionStdParameter;
import com.linkedin.datahub.graphql.generated.AssertionStdParameterType;
import com.linkedin.datahub.graphql.generated.AssertionStdParameters;
import com.linkedin.datahub.graphql.generated.AssertionType;
import com.linkedin.datahub.graphql.generated.AuditStamp;
import com.linkedin.datahub.graphql.generated.DataPlatform;
import com.linkedin.datahub.graphql.generated.DatasetAssertionInfo;
import com.linkedin.datahub.graphql.generated.DatasetAssertionScope;
import com.linkedin.datahub.graphql.generated.DateInterval;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.FieldAssertionInfo;
import com.linkedin.datahub.graphql.generated.FixedIntervalSchedule;
import com.linkedin.datahub.graphql.generated.FreshnessAssertionInfo;
import com.linkedin.datahub.graphql.generated.SchemaAssertionCompatibility;
import com.linkedin.datahub.graphql.generated.SchemaAssertionField;
import com.linkedin.datahub.graphql.generated.SchemaAssertionInfo;
import com.linkedin.datahub.graphql.generated.SchemaFieldRef;
import com.linkedin.datahub.graphql.generated.SqlAssertionInfo;
import com.linkedin.datahub.graphql.generated.VolumeAssertionInfo;
import com.linkedin.datahub.graphql.types.common.mappers.DataPlatformInstanceAspectMapper;
import com.linkedin.datahub.graphql.types.common.mappers.StringMapMapper;
import com.linkedin.datahub.graphql.types.dataset.mappers.SchemaFieldMapper;
import com.linkedin.datahub.graphql.types.dataset.mappers.SchemaMetadataMapper;
import com.linkedin.datahub.graphql.types.tag.mappers.GlobalTagsMapper;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.metadata.Constants;
import com.linkedin.schema.SchemaField;
import java.util.Collections;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand All @@ -48,6 +66,14 @@ public static Assertion map(@Nullable QueryContext context, final EntityResponse
result.setInfo(
mapAssertionInfo(context, new AssertionInfo(envelopedAssertionInfo.getValue().data())));
}

final EnvelopedAspect envelopedAssertionActions =
aspects.get(Constants.ASSERTION_ACTIONS_ASPECT_NAME);
if (envelopedAssertionActions != null) {
result.setActions(
mapAssertionActions(new AssertionActions(envelopedAssertionActions.getValue().data())));
}

final EnvelopedAspect envelopedPlatformInstance =
aspects.get(Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME);
if (envelopedPlatformInstance != null) {
Expand Down Expand Up @@ -83,20 +109,93 @@ private static com.linkedin.datahub.graphql.generated.Status mapStatus(Status st
return result;
}

private static com.linkedin.datahub.graphql.generated.AssertionInfo mapAssertionInfo(
public static com.linkedin.datahub.graphql.generated.AssertionInfo mapAssertionInfo(
@Nullable QueryContext context, final AssertionInfo gmsAssertionInfo) {
final com.linkedin.datahub.graphql.generated.AssertionInfo assertionInfo =
new com.linkedin.datahub.graphql.generated.AssertionInfo();
assertionInfo.setType(AssertionType.valueOf(gmsAssertionInfo.getType().name()));

if (gmsAssertionInfo.hasLastUpdated()) {
assertionInfo.setLastUpdated(
new AuditStamp(
gmsAssertionInfo.getLastUpdated().getTime(),
gmsAssertionInfo.getLastUpdated().getActor().toString()));
}
if (gmsAssertionInfo.hasDatasetAssertion()) {
DatasetAssertionInfo datasetAssertion =
mapDatasetAssertionInfo(context, gmsAssertionInfo.getDatasetAssertion());
assertionInfo.setDatasetAssertion(datasetAssertion);
}
assertionInfo.setDescription(gmsAssertionInfo.getDescription());
// Description
if (gmsAssertionInfo.hasDescription()) {
assertionInfo.setDescription(gmsAssertionInfo.getDescription());
}
// FRESHNESS Assertions
if (gmsAssertionInfo.hasFreshnessAssertion()) {
FreshnessAssertionInfo freshnessAssertionInfo =
FreshnessAssertionMapper.mapFreshnessAssertionInfo(
context, gmsAssertionInfo.getFreshnessAssertion());
assertionInfo.setFreshnessAssertion(freshnessAssertionInfo);
}
// VOLUME Assertions
if (gmsAssertionInfo.hasVolumeAssertion()) {
VolumeAssertionInfo volumeAssertionInfo =
VolumeAssertionMapper.mapVolumeAssertionInfo(
context, gmsAssertionInfo.getVolumeAssertion());
assertionInfo.setVolumeAssertion(volumeAssertionInfo);
}
// SQL Assertions
if (gmsAssertionInfo.hasSqlAssertion()) {
SqlAssertionInfo sqlAssertionInfo =
SqlAssertionMapper.mapSqlAssertionInfo(gmsAssertionInfo.getSqlAssertion());
assertionInfo.setSqlAssertion(sqlAssertionInfo);
}
// FIELD Assertions
if (gmsAssertionInfo.hasFieldAssertion()) {
FieldAssertionInfo fieldAssertionInfo =
FieldAssertionMapper.mapFieldAssertionInfo(context, gmsAssertionInfo.getFieldAssertion());
assertionInfo.setFieldAssertion(fieldAssertionInfo);
}
// SCHEMA Assertions
if (gmsAssertionInfo.hasSchemaAssertion()) {
SchemaAssertionInfo schemaAssertionInfo =
mapSchemaAssertionInfo(context, gmsAssertionInfo.getSchemaAssertion());
assertionInfo.setSchemaAssertion(schemaAssertionInfo);
}
// Source Type
if (gmsAssertionInfo.hasSource()) {
assertionInfo.setSource(mapSource(gmsAssertionInfo.getSource()));
}
return assertionInfo;
}

/**
 * Builds the GraphQL {@code AssertionActions} object from the GMS actions aspect.
 *
 * <p>The on-failure and on-success lists are each converted element by element through
 * {@link #mapAssertionAction}; a list is left unset when the GMS record does not carry it.
 *
 * @param gmsAssertionActions the GMS assertion actions record
 * @return a newly created GraphQL assertion actions object
 */
private static com.linkedin.datahub.graphql.generated.AssertionActions mapAssertionActions(
    final AssertionActions gmsAssertionActions) {
  final com.linkedin.datahub.graphql.generated.AssertionActions mappedActions =
      new com.linkedin.datahub.graphql.generated.AssertionActions();

  // Actions to run when the assertion fails.
  if (gmsAssertionActions.hasOnFailure()) {
    mappedActions.setOnFailure(
        gmsAssertionActions.getOnFailure().stream()
            .map(failureAction -> mapAssertionAction(failureAction))
            .collect(Collectors.toList()));
  }

  // Actions to run when the assertion succeeds.
  if (gmsAssertionActions.hasOnSuccess()) {
    mappedActions.setOnSuccess(
        gmsAssertionActions.getOnSuccess().stream()
            .map(successAction -> mapAssertionAction(successAction))
            .collect(Collectors.toList()));
  }

  return mappedActions;
}

/**
 * Converts a single GMS {@link AssertionAction} into the GraphQL generated type.
 *
 * <p>Only the action type is carried over; it is resolved by enum constant name.
 *
 * @param gmsAssertionAction the GMS action to convert
 * @return a newly created GraphQL assertion action
 */
private static com.linkedin.datahub.graphql.generated.AssertionAction mapAssertionAction(
    final AssertionAction gmsAssertionAction) {
  final String actionTypeName = gmsAssertionAction.getType().toString();
  final com.linkedin.datahub.graphql.generated.AssertionAction mappedAction =
      new com.linkedin.datahub.graphql.generated.AssertionAction();
  mappedAction.setType(AssertionActionType.valueOf(actionTypeName));
  return mappedAction;
}

private static DatasetAssertionInfo mapDatasetAssertionInfo(
@Nullable QueryContext context,
final com.linkedin.assertion.DatasetAssertionInfo gmsDatasetAssertion) {
Expand Down Expand Up @@ -152,7 +251,7 @@ private static SchemaFieldRef mapDatasetSchemaField(final Urn schemaFieldUrn) {
return new SchemaFieldRef(schemaFieldUrn.toString(), schemaFieldUrn.getEntityKey().get(1));
}

private static AssertionStdParameters mapParameters(
protected static AssertionStdParameters mapParameters(
final com.linkedin.assertion.AssertionStdParameters params) {
final AssertionStdParameters result = new AssertionStdParameters();
if (params.hasValue()) {
Expand All @@ -175,5 +274,61 @@ private static AssertionStdParameter mapParameter(
return result;
}

private AssertionMapper() {}
/**
 * Converts a GMS fixed-interval schedule into the GraphQL {@link FixedIntervalSchedule}.
 *
 * <p>The unit is translated by enum constant name into {@link DateInterval}; the multiple is
 * copied as-is.
 *
 * @param gmsFixedIntervalSchedule the GMS schedule to convert
 * @return a newly created GraphQL schedule
 */
protected static FixedIntervalSchedule mapFixedIntervalSchedule(
    com.linkedin.assertion.FixedIntervalSchedule gmsFixedIntervalSchedule) {
  final FixedIntervalSchedule schedule = new FixedIntervalSchedule();
  final DateInterval unit = DateInterval.valueOf(gmsFixedIntervalSchedule.getUnit().name());
  schedule.setUnit(unit);
  schedule.setMultiple(gmsFixedIntervalSchedule.getMultiple());
  return schedule;
}

/**
 * Converts the GMS assertion source into the GraphQL {@link AssertionSource}.
 *
 * <p>The source type is always set; the creation audit stamp is set only when the GMS record
 * has one.
 *
 * @param gmsSource the GMS assertion source
 * @return a newly created GraphQL assertion source
 */
private static AssertionSource mapSource(final com.linkedin.assertion.AssertionSource gmsSource) {
  final AssertionSource source = new AssertionSource();
  final AssertionSourceType sourceType =
      AssertionSourceType.valueOf(gmsSource.getType().toString());
  source.setType(sourceType);

  // Nothing more to copy when there is no creation stamp.
  if (!gmsSource.hasCreated()) {
    return source;
  }

  source.setCreated(
      new AuditStamp(
          gmsSource.getCreated().getTime(), gmsSource.getCreated().getActor().toString()));
  return source;
}

/**
 * Copies the path, type and native type of a GMS schema field spec into the GraphQL
 * generated {@code SchemaFieldSpec}.
 *
 * @param gmsField the GMS schema field spec
 * @return a newly created GraphQL schema field spec
 */
protected static com.linkedin.datahub.graphql.generated.SchemaFieldSpec mapSchemaFieldSpec(
    final com.linkedin.schema.SchemaFieldSpec gmsField) {
  final com.linkedin.datahub.graphql.generated.SchemaFieldSpec fieldSpec =
      new com.linkedin.datahub.graphql.generated.SchemaFieldSpec();

  fieldSpec.setPath(gmsField.getPath());
  fieldSpec.setType(gmsField.getType());
  fieldSpec.setNativeType(gmsField.getNativeType());

  return fieldSpec;
}

/**
 * Converts a GMS schema assertion into the GraphQL {@link SchemaAssertionInfo}.
 *
 * <p>Sets the compatibility mode, the entity urn string, the mapped schema metadata and a
 * flattened list of the schema's fields.
 *
 * @param context query context handed to {@code SchemaMetadataMapper}; may be null
 * @param gmsSchemaAssertionInfo the GMS schema assertion record
 * @return a newly created GraphQL schema assertion info
 */
private static SchemaAssertionInfo mapSchemaAssertionInfo(
    @Nullable final QueryContext context,
    final com.linkedin.assertion.SchemaAssertionInfo gmsSchemaAssertionInfo) {
  final SchemaAssertionInfo schemaAssertion = new SchemaAssertionInfo();

  final SchemaAssertionCompatibility compatibility =
      SchemaAssertionCompatibility.valueOf(gmsSchemaAssertionInfo.getCompatibility().name());
  schemaAssertion.setCompatibility(compatibility);

  final String entityUrn = gmsSchemaAssertionInfo.getEntity().toString();
  schemaAssertion.setEntityUrn(entityUrn);

  // NOTE(review): the trailing 0L is passed through unchanged; its meaning is defined by
  // SchemaMetadataMapper and is not visible here.
  schemaAssertion.setSchema(
      SchemaMetadataMapper.INSTANCE.apply(
          context, gmsSchemaAssertionInfo.getSchema(), gmsSchemaAssertionInfo.getEntity(), 0L));

  schemaAssertion.setFields(
      gmsSchemaAssertionInfo.getSchema().getFields().stream()
          .map(schemaField -> mapSchemaField(schemaField))
          .collect(Collectors.toList()));

  return schemaAssertion;
}

/**
 * Converts one GMS {@link SchemaField} into a GraphQL {@link SchemaAssertionField}.
 *
 * <p>The field path and mapped data type are always set; the native type is set only when the
 * GMS field has a native data type.
 *
 * @param gmsField the GMS schema field
 * @return a newly created GraphQL schema assertion field
 */
private static SchemaAssertionField mapSchemaField(final SchemaField gmsField) {
  final SchemaAssertionField assertionField = new SchemaAssertionField();
  assertionField.setPath(gmsField.getFieldPath());

  final SchemaFieldMapper fieldMapper = new SchemaFieldMapper();
  assertionField.setType(fieldMapper.mapSchemaFieldDataType(gmsField.getType()));

  if (!gmsField.hasNativeDataType()) {
    return assertionField;
  }
  assertionField.setNativeType(gmsField.getNativeDataType());
  return assertionField;
}

// Protected rather than private: FieldAssertionMapper extends this class, and a subclass
// constructor must be able to invoke its superclass constructor.
protected AssertionMapper() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public class AssertionType
Constants.ASSERTION_KEY_ASPECT_NAME,
Constants.ASSERTION_INFO_ASPECT_NAME,
Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME,
Constants.GLOBAL_TAGS_ASPECT_NAME);

Constants.GLOBAL_TAGS_ASPECT_NAME,
Constants.ASSERTION_ACTIONS_ASPECT_NAME);
private final EntityClient _entityClient;

public AssertionType(final EntityClient entityClient) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.linkedin.datahub.graphql.types.assertion;

import com.linkedin.assertion.FieldAssertionInfo;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.AssertionStdOperator;
import com.linkedin.datahub.graphql.generated.FieldAssertionType;
import com.linkedin.datahub.graphql.generated.FieldMetricType;
import com.linkedin.datahub.graphql.generated.FieldTransformType;
import com.linkedin.datahub.graphql.generated.FieldValuesFailThresholdType;
import com.linkedin.datahub.graphql.types.dataset.mappers.DatasetFilterMapper;
import javax.annotation.Nullable;

/**
 * Static mapper from the GMS {@code FieldAssertionInfo} model to the GraphQL generated
 * {@code FieldAssertionInfo}.
 *
 * <p>Extends {@link AssertionMapper} so it can call that class's protected static helpers
 * ({@code mapSchemaFieldSpec} and {@code mapParameters}). Not instantiable.
 */
public class FieldAssertionMapper extends AssertionMapper {

  /**
   * Converts a GMS field assertion into its GraphQL representation.
   *
   * <p>Entity urn and assertion type are always set. The filter, field-values assertion and
   * field-metric assertion are each set only when present on the GMS record.
   *
   * @param context query context forwarded to {@code DatasetFilterMapper}; may be null
   * @param gmsFieldAssertionInfo the GMS field assertion record
   * @return a newly created GraphQL field assertion info
   */
  public static com.linkedin.datahub.graphql.generated.FieldAssertionInfo mapFieldAssertionInfo(
      @Nullable final QueryContext context, final FieldAssertionInfo gmsFieldAssertionInfo) {
    final com.linkedin.datahub.graphql.generated.FieldAssertionInfo fieldAssertion =
        new com.linkedin.datahub.graphql.generated.FieldAssertionInfo();

    final String entityUrn = gmsFieldAssertionInfo.getEntity().toString();
    fieldAssertion.setEntityUrn(entityUrn);

    final FieldAssertionType assertionType =
        FieldAssertionType.valueOf(gmsFieldAssertionInfo.getType().name());
    fieldAssertion.setType(assertionType);

    // Optional row filter.
    if (gmsFieldAssertionInfo.hasFilter()) {
      fieldAssertion.setFilter(
          DatasetFilterMapper.map(context, gmsFieldAssertionInfo.getFilter()));
    }

    // Optional values-based definition.
    if (gmsFieldAssertionInfo.hasFieldValuesAssertion()) {
      fieldAssertion.setFieldValuesAssertion(
          mapFieldValuesAssertion(gmsFieldAssertionInfo.getFieldValuesAssertion()));
    }

    // Optional metric-based definition.
    if (gmsFieldAssertionInfo.hasFieldMetricAssertion()) {
      fieldAssertion.setFieldMetricAssertion(
          mapFieldMetricAssertion(gmsFieldAssertionInfo.getFieldMetricAssertion()));
    }

    return fieldAssertion;
  }

  /**
   * Converts a GMS field-values assertion: field, operator, fail threshold and the
   * exclude-nulls flag are always copied; transform and parameters only when present.
   */
  private static com.linkedin.datahub.graphql.generated.FieldValuesAssertion
      mapFieldValuesAssertion(final com.linkedin.assertion.FieldValuesAssertion gmsValues) {
    final com.linkedin.datahub.graphql.generated.FieldValuesAssertion valuesAssertion =
        new com.linkedin.datahub.graphql.generated.FieldValuesAssertion();

    valuesAssertion.setField(mapSchemaFieldSpec(gmsValues.getField()));

    final AssertionStdOperator operator =
        AssertionStdOperator.valueOf(gmsValues.getOperator().name());
    valuesAssertion.setOperator(operator);

    valuesAssertion.setFailThreshold(mapFieldValuesFailThreshold(gmsValues.getFailThreshold()));
    valuesAssertion.setExcludeNulls(gmsValues.isExcludeNulls());

    if (gmsValues.hasTransform()) {
      valuesAssertion.setTransform(mapFieldTransform(gmsValues.getTransform()));
    }
    if (gmsValues.hasParameters()) {
      valuesAssertion.setParameters(mapParameters(gmsValues.getParameters()));
    }

    return valuesAssertion;
  }

  /**
   * Converts a GMS field-metric assertion: field, metric and operator are always copied;
   * parameters only when present.
   */
  private static com.linkedin.datahub.graphql.generated.FieldMetricAssertion
      mapFieldMetricAssertion(final com.linkedin.assertion.FieldMetricAssertion gmsMetric) {
    final com.linkedin.datahub.graphql.generated.FieldMetricAssertion metricAssertion =
        new com.linkedin.datahub.graphql.generated.FieldMetricAssertion();

    metricAssertion.setField(mapSchemaFieldSpec(gmsMetric.getField()));

    final FieldMetricType metricType = FieldMetricType.valueOf(gmsMetric.getMetric().name());
    metricAssertion.setMetric(metricType);

    final AssertionStdOperator operator =
        AssertionStdOperator.valueOf(gmsMetric.getOperator().name());
    metricAssertion.setOperator(operator);

    if (gmsMetric.hasParameters()) {
      metricAssertion.setParameters(mapParameters(gmsMetric.getParameters()));
    }

    return metricAssertion;
  }

  /** Converts a GMS field transform; only its type is carried over, by enum constant name. */
  private static com.linkedin.datahub.graphql.generated.FieldTransform mapFieldTransform(
      final com.linkedin.assertion.FieldTransform gmsTransform) {
    final com.linkedin.datahub.graphql.generated.FieldTransform transform =
        new com.linkedin.datahub.graphql.generated.FieldTransform();
    final FieldTransformType transformType =
        FieldTransformType.valueOf(gmsTransform.getType().name());
    transform.setType(transformType);
    return transform;
  }

  /** Converts a GMS fail threshold, copying its type (by enum constant name) and its value. */
  private static com.linkedin.datahub.graphql.generated.FieldValuesFailThreshold
      mapFieldValuesFailThreshold(
          final com.linkedin.assertion.FieldValuesFailThreshold gmsThreshold) {
    final com.linkedin.datahub.graphql.generated.FieldValuesFailThreshold threshold =
        new com.linkedin.datahub.graphql.generated.FieldValuesFailThreshold();
    final FieldValuesFailThresholdType thresholdType =
        FieldValuesFailThresholdType.valueOf(gmsThreshold.getType().name());
    threshold.setType(thresholdType);
    threshold.setValue(gmsThreshold.getValue());
    return threshold;
  }

  // Static-only utility class; never instantiated.
  private FieldAssertionMapper() {}
}
Loading

0 comments on commit e8ca3dd

Please sign in to comment.