Skip to content

Commit

Permalink
feat(blame) - add schema history blame UI
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya-radhakrishnan committed May 2, 2022
1 parent 1afbc49 commit 5a7e22f
Show file tree
Hide file tree
Showing 24 changed files with 1,165 additions and 30 deletions.
1 change: 1 addition & 0 deletions datahub-graphql-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ graphqlCodegen {
"$projectDir/src/main/resources/recommendation.graphql".toString(),
"$projectDir/src/main/resources/ingestion.graphql".toString(),
"$projectDir/src/main/resources/auth.graphql".toString(),
"$projectDir/src/main/resources/timeline.graphql".toString(),
]
outputDir = new File("$projectDir/src/mainGeneratedGraphQL/java")
packageName = "com.linkedin.datahub.graphql.generated"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class Constants {
public static final String ANALYTICS_SCHEMA_FILE = "analytics.graphql";
public static final String RECOMMENDATIONS_SCHEMA_FILE = "recommendation.graphql";
public static final String INGESTION_SCHEMA_FILE = "ingestion.graphql";
public static final String TIMELINE_SCHEMA_FILE = "timeline.graphql";
public static final String BROWSE_PATH_DELIMITER = "/";
public static final String VERSION_STAMP_FIELD_NAME = "versionStamp";
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@
import com.linkedin.datahub.graphql.resolvers.group.ListGroupsResolver;
import com.linkedin.datahub.graphql.resolvers.group.RemoveGroupMembersResolver;
import com.linkedin.datahub.graphql.resolvers.group.RemoveGroupResolver;
import com.linkedin.datahub.graphql.resolvers.jobs.EntityRunsResolver;
import com.linkedin.datahub.graphql.resolvers.jobs.DataJobRunsResolver;
import com.linkedin.datahub.graphql.resolvers.user.UpdateUserStatusResolver;
import com.linkedin.datahub.graphql.resolvers.policy.GetGrantedPrivilegesResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.CancelIngestionExecutionRequestResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.CreateIngestionExecutionRequestResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.GetIngestionExecutionRequestResolver;
Expand All @@ -92,6 +88,8 @@
import com.linkedin.datahub.graphql.resolvers.ingest.source.GetIngestionSourceResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.source.ListIngestionSourcesResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.source.UpsertIngestionSourceResolver;
import com.linkedin.datahub.graphql.resolvers.jobs.DataJobRunsResolver;
import com.linkedin.datahub.graphql.resolvers.jobs.EntityRunsResolver;
import com.linkedin.datahub.graphql.resolvers.load.AspectResolver;
import com.linkedin.datahub.graphql.resolvers.load.EntityLineageResultResolver;
import com.linkedin.datahub.graphql.resolvers.load.EntityRelationshipsResultResolver;
Expand All @@ -113,6 +111,7 @@
import com.linkedin.datahub.graphql.resolvers.mutate.RemoveTermResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.UpdateDescriptionResolver;
import com.linkedin.datahub.graphql.resolvers.policy.DeletePolicyResolver;
import com.linkedin.datahub.graphql.resolvers.policy.GetGrantedPrivilegesResolver;
import com.linkedin.datahub.graphql.resolvers.policy.ListPoliciesResolver;
import com.linkedin.datahub.graphql.resolvers.policy.UpsertPolicyResolver;
import com.linkedin.datahub.graphql.resolvers.recommendation.ListRecommendationsResolver;
Expand All @@ -122,6 +121,7 @@
import com.linkedin.datahub.graphql.resolvers.search.SearchAcrossLineageResolver;
import com.linkedin.datahub.graphql.resolvers.search.SearchResolver;
import com.linkedin.datahub.graphql.resolvers.tag.SetTagColorResolver;
import com.linkedin.datahub.graphql.resolvers.timeline.GetSchemaBlameResolver;
import com.linkedin.datahub.graphql.resolvers.type.AspectInterfaceTypeResolver;
import com.linkedin.datahub.graphql.resolvers.type.EntityInterfaceTypeResolver;
import com.linkedin.datahub.graphql.resolvers.type.HyperParameterValueTypeResolver;
Expand All @@ -130,8 +130,8 @@
import com.linkedin.datahub.graphql.resolvers.type.TimeSeriesAspectInterfaceTypeResolver;
import com.linkedin.datahub.graphql.resolvers.user.ListUsersResolver;
import com.linkedin.datahub.graphql.resolvers.user.RemoveUserResolver;
import com.linkedin.datahub.graphql.resolvers.user.UpdateUserStatusResolver;
import com.linkedin.datahub.graphql.types.BrowsableEntityType;
import com.linkedin.datahub.graphql.types.dataprocessinst.mappers.DataProcessInstanceRunEventMapper;
import com.linkedin.datahub.graphql.types.EntityType;
import com.linkedin.datahub.graphql.types.LoadableType;
import com.linkedin.datahub.graphql.types.SearchableEntityType;
Expand All @@ -144,10 +144,10 @@
import com.linkedin.datahub.graphql.types.corpuser.CorpUserType;
import com.linkedin.datahub.graphql.types.dashboard.DashboardType;
import com.linkedin.datahub.graphql.types.dataset.VersionedDatasetType;
import com.linkedin.datahub.graphql.types.notebook.NotebookType;
import com.linkedin.datahub.graphql.types.dataflow.DataFlowType;
import com.linkedin.datahub.graphql.types.datajob.DataJobType;
import com.linkedin.datahub.graphql.types.dataplatform.DataPlatformType;
import com.linkedin.datahub.graphql.types.dataprocessinst.mappers.DataProcessInstanceRunEventMapper;
import com.linkedin.datahub.graphql.types.dataset.DatasetType;
import com.linkedin.datahub.graphql.types.dataset.mappers.DatasetProfileMapper;
import com.linkedin.datahub.graphql.types.domain.DomainType;
Expand All @@ -157,6 +157,7 @@
import com.linkedin.datahub.graphql.types.mlmodel.MLModelGroupType;
import com.linkedin.datahub.graphql.types.mlmodel.MLModelType;
import com.linkedin.datahub.graphql.types.mlmodel.MLPrimaryKeyType;
import com.linkedin.datahub.graphql.types.notebook.NotebookType;
import com.linkedin.datahub.graphql.types.tag.TagType;
import com.linkedin.datahub.graphql.types.usage.UsageType;
import com.linkedin.entity.client.EntityClient;
Expand All @@ -167,6 +168,7 @@
import com.linkedin.metadata.recommendation.RecommendationsService;
import com.linkedin.metadata.secret.SecretService;
import com.linkedin.metadata.telemetry.TelemetryConfiguration;
import com.linkedin.metadata.timeline.TimelineService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import com.linkedin.metadata.version.GitVersion;
import com.linkedin.usage.UsageClient;
Expand Down Expand Up @@ -195,7 +197,7 @@

import static com.linkedin.datahub.graphql.Constants.*;
import static com.linkedin.metadata.Constants.*;
import static graphql.Scalars.GraphQLLong;
import static graphql.Scalars.*;


/**
Expand All @@ -217,6 +219,7 @@ public class GmsGraphQLEngine {
private final GitVersion gitVersion;
private final boolean supportsImpactAnalysis;
private final TimeseriesAspectService timeseriesAspectService;
private final TimelineService timelineService;

private final IngestionConfiguration ingestionConfiguration;
private final AuthenticationConfiguration authenticationConfiguration;
Expand Down Expand Up @@ -288,6 +291,7 @@ public GmsGraphQLEngine(
final AuthenticationConfiguration authenticationConfiguration,
final AuthorizationConfiguration authorizationConfiguration,
final GitVersion gitVersion,
final TimelineService timelineService,
final boolean supportsImpactAnalysis,
final VisualConfiguration visualConfiguration,
final TelemetryConfiguration telemetryConfiguration
Expand All @@ -306,6 +310,7 @@ public GmsGraphQLEngine(
this.gitVersion = gitVersion;
this.supportsImpactAnalysis = supportsImpactAnalysis;
this.timeseriesAspectService = timeseriesAspectService;
this.timelineService = timelineService;

this.ingestionConfiguration = Objects.requireNonNull(ingestionConfiguration);
this.authenticationConfiguration = Objects.requireNonNull(authenticationConfiguration);
Expand Down Expand Up @@ -421,6 +426,7 @@ public GraphQLEngine.Builder builder() {
.addSchema(fileBasedSchema(ANALYTICS_SCHEMA_FILE))
.addSchema(fileBasedSchema(RECOMMENDATIONS_SCHEMA_FILE))
.addSchema(fileBasedSchema(INGESTION_SCHEMA_FILE))
.addSchema(fileBasedSchema(TIMELINE_SCHEMA_FILE))
.addDataLoaders(loaderSuppliers(loadableTypes))
.addDataLoader("Aspect", context -> createDataLoader(aspectType, context))
.addDataLoader("UsageQueryResult", context -> createDataLoader(usageType, context))
Expand Down Expand Up @@ -524,6 +530,7 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("listIngestionSources", new ListIngestionSourcesResolver(this.entityClient))
.dataFetcher("ingestionSource", new GetIngestionSourceResolver(this.entityClient))
.dataFetcher("executionRequest", new GetIngestionExecutionRequestResolver(this.entityClient))
.dataFetcher("getSchemaBlame", new GetSchemaBlameResolver(this.timelineService))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public CompletableFuture<AppConfig> get(final DataFetchingEnvironment environmen

final ManagedIngestionConfig ingestionConfig = new ManagedIngestionConfig();
ingestionConfig.setEnabled(_ingestionConfiguration.isEnabled());
appConfig.setAuthConfig(authConfig);

appConfig.setAnalyticsConfig(analyticsConfig);
appConfig.setPoliciesConfig(policiesConfig);
appConfig.setIdentityManagementConfig(identityManagementConfig);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.linkedin.datahub.graphql.resolvers.timeline;

import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.generated.GetSchemaBlameInput;
import com.linkedin.datahub.graphql.generated.GetSchemaBlameResult;
import com.linkedin.datahub.graphql.types.timeline.mappers.SchemaFieldBlameMapper;
import com.linkedin.metadata.timeline.TimelineService;
import com.linkedin.metadata.timeline.data.ChangeCategory;
import com.linkedin.metadata.timeline.data.ChangeTransaction;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;


/**
 * GraphQL resolver backing the {@code getSchemaBlame} query. Fetches the
 * TECHNICAL_SCHEMA timeline for a dataset and maps it to the most recent
 * change made to each column at each semantic version of the schema.
 *
 * <p>Returns {@code null} (rather than propagating) when the URN is invalid
 * or the timeline lookup fails; the failure is logged.
 */
@Slf4j
public class GetSchemaBlameResolver implements DataFetcher<CompletableFuture<GetSchemaBlameResult>> {
  private final TimelineService _timelineService;

  public GetSchemaBlameResolver(TimelineService timelineService) {
    _timelineService = timelineService;
  }

  @Override
  public CompletableFuture<GetSchemaBlameResult> get(final DataFetchingEnvironment environment) throws Exception {
    final GetSchemaBlameInput input = bindArgument(environment.getArgument("input"), GetSchemaBlameInput.class);

    final String datasetUrnString = input.getDatasetUrn();
    // A start/end time of 0 requests the dataset's full change history from the timeline service.
    final long startTime = 0;
    final long endTime = 0;
    // Fix: the original `input.getVersion() == null ? null : input.getVersion()` was a no-op ternary.
    final String version = input.getVersion();

    return CompletableFuture.supplyAsync(() -> {
      try {
        // Schema blame only cares about technical schema changes.
        final Set<ChangeCategory> changeCategorySet = new HashSet<>();
        changeCategorySet.add(ChangeCategory.TECHNICAL_SCHEMA);
        final Urn datasetUrn = Urn.createFromString(datasetUrnString);
        final List<ChangeTransaction> changeTransactionList =
            _timelineService.getTimeline(datasetUrn, changeCategorySet, startTime, endTime, null, null, false);
        return SchemaFieldBlameMapper.map(changeTransactionList, version);
      } catch (URISyntaxException u) {
        // Fix: use SLF4J parameterized logging instead of String.format.
        log.error("Failed to list schema blame data, likely due to the Urn {} being invalid", datasetUrnString, u);
        return null;
      } catch (Exception e) {
        log.error("Failed to list schema blame data", e);
        return null;
      }
    });
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package com.linkedin.datahub.graphql.types.timeline.mappers;

import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.generated.ChangeOperationType;
import com.linkedin.datahub.graphql.generated.GetSchemaBlameResult;
import com.linkedin.datahub.graphql.generated.SchemaFieldBlame;
import com.linkedin.datahub.graphql.generated.SchemaFieldChange;
import com.linkedin.datahub.graphql.generated.SemanticVersionStruct;
import com.linkedin.metadata.key.SchemaFieldKey;
import com.linkedin.metadata.timeline.data.ChangeCategory;
import com.linkedin.metadata.timeline.data.ChangeEvent;
import com.linkedin.metadata.timeline.data.ChangeTransaction;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.util.Pair;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.parquet.SemanticVersion;


// Class for converting ChangeTransactions received from the Timeline API to SchemaFieldBlame structs for every schema
// at every semantic version.
@Slf4j
public class SchemaFieldBlameMapper {

  /**
   * Maps timeline ChangeTransactions to a GetSchemaBlameResult containing, for each schema field,
   * the most recent change made to it at or before the requested version.
   *
   * @param changeTransactions timeline transactions in ascending version order (last = latest)
   * @param versionCutoff semantic version ("x.y.z") to blame at; null means the latest version
   * @return the blame result, or null when there are no transactions. When the cutoff cannot be
   *         parsed, or no transaction is at or below the cutoff, the result carries only the
   *         latest-version metadata.
   */
  public static GetSchemaBlameResult map(List<ChangeTransaction> changeTransactions, @Nullable String versionCutoff) {
    if (changeTransactions.isEmpty()) {
      return null;
    }

    Map<String, SchemaFieldBlame> schemaBlameMap = new HashMap<>();
    GetSchemaBlameResult result = new GetSchemaBlameResult();

    // The last transaction in the list is the latest version of the schema.
    String latestSemanticVersionString =
        truncateSemanticVersion(changeTransactions.get(changeTransactions.size() - 1).getSemVer());
    long latestSemanticVersionTimestamp = changeTransactions.get(changeTransactions.size() - 1).getTimestamp();
    String latestVersionStamp = changeTransactions.get(changeTransactions.size() - 1).getVersionStamp();
    result.setLatestVersion(
        new SemanticVersionStruct(latestSemanticVersionString, latestSemanticVersionTimestamp, latestVersionStamp));

    String semanticVersionFilterString = versionCutoff == null ? latestSemanticVersionString : versionCutoff;
    Optional<SemanticVersion> semanticVersionFilterOptional = createSemanticVersion(semanticVersionFilterString);
    if (!semanticVersionFilterOptional.isPresent()) {
      return result;
    }

    SemanticVersion semanticVersionFilter = semanticVersionFilterOptional.get();

    // Keep only transactions at or below the cutoff, newest first.
    List<ChangeTransaction> reversedChangeTransactions = changeTransactions.stream()
        .map(SchemaFieldBlameMapper::semanticVersionChangeTransactionPair)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .filter(semanticVersionChangeTransactionPair ->
            semanticVersionChangeTransactionPair.getFirst().compareTo(semanticVersionFilter) <= 0)
        .sorted(Collections.reverseOrder(Comparator.comparing(Pair::getFirst)))
        .map(Pair::getSecond)
        .collect(Collectors.toList());

    // Fix: guard against an empty list (all versions unparseable or newer than the cutoff);
    // the original called get(0) unconditionally and could throw IndexOutOfBoundsException.
    if (reversedChangeTransactions.isEmpty()) {
      return result;
    }

    // The first remaining transaction is the selected (blamed-at) version.
    String selectedSemanticVersion = truncateSemanticVersion(reversedChangeTransactions.get(0).getSemVer());
    long selectedSemanticVersionTimestamp = reversedChangeTransactions.get(0).getTimestamp();
    String selectedVersionStamp = reversedChangeTransactions.get(0).getVersionStamp();
    result.setVersion(
        new SemanticVersionStruct(selectedSemanticVersion, selectedSemanticVersionTimestamp, selectedVersionStamp));

    List<SemanticVersionStruct> semanticVersionStructList = new ArrayList<>();
    for (ChangeTransaction changeTransaction : reversedChangeTransactions) {
      SemanticVersionStruct semanticVersionStruct =
          new SemanticVersionStruct(truncateSemanticVersion(changeTransaction.getSemVer()),
              changeTransaction.getTimestamp(), changeTransaction.getVersionStamp());
      semanticVersionStructList.add(semanticVersionStruct);

      for (ChangeEvent changeEvent : changeTransaction.getChangeEvents()) {
        if (changeEvent.getCategory() != ChangeCategory.TECHNICAL_SCHEMA) {
          continue;
        }

        // Since we iterate newest-first, the first event seen per schema field urn is the
        // most recent change for that field; skip urns we have already blamed.
        String schemaUrn = changeEvent.getModifier();
        if (schemaUrn == null || schemaBlameMap.containsKey(schemaUrn)) {
          continue;
        }

        SchemaFieldBlame schemaFieldBlame = new SchemaFieldBlame();

        SchemaFieldKey schemaFieldKey;
        try {
          schemaFieldKey = (SchemaFieldKey) EntityKeyUtils.convertUrnToEntityKey(Urn.createFromString(schemaUrn),
              new SchemaFieldKey().schema());
        } catch (Exception e) {
          // Fix: use SLF4J parameterized logging instead of String.format.
          log.debug("Could not generate schema urn for {}", schemaUrn);
          continue;
        }

        String fieldPath = schemaFieldKey.getFieldPath();
        schemaFieldBlame.setFieldPath(fieldPath);

        SchemaFieldChange schemaFieldChange =
            getLastSchemaFieldChange(changeEvent, changeTransaction.getTimestamp(), changeTransaction.getSemVer(),
                changeTransaction.getVersionStamp());
        schemaFieldBlame.setSchemaFieldChange(schemaFieldChange);

        schemaBlameMap.put(schemaUrn, schemaFieldBlame);
      }
    }

    // Fields whose most recent change is a removal no longer exist at this version, so drop them.
    result.setSchemaFieldBlameList(schemaBlameMap.values()
        .stream()
        .filter(schemaFieldBlame -> !schemaFieldBlame.getSchemaFieldChange()
            .getChangeType()
            .equals(ChangeOperationType.REMOVE))
        .collect(Collectors.toList()));
    result.setSemanticVersionList(semanticVersionStructList);
    return result;
  }

  // Pairs a transaction with its parsed semantic version; empty when the version cannot be parsed.
  private static Optional<Pair<SemanticVersion, ChangeTransaction>> semanticVersionChangeTransactionPair(
      ChangeTransaction changeTransaction) {
    Optional<SemanticVersion> semanticVersion = createSemanticVersion(changeTransaction.getSemVer());
    return semanticVersion.map(version -> Pair.of(version, changeTransaction));
  }

  // Parses a semantic version string (after stripping the "-computed" suffix); empty on parse failure.
  private static Optional<SemanticVersion> createSemanticVersion(String semanticVersionString) {
    String truncatedSemanticVersion = truncateSemanticVersion(semanticVersionString);
    try {
      SemanticVersion semanticVersion = SemanticVersion.parse(truncatedSemanticVersion);
      return Optional.of(semanticVersion);
    } catch (SemanticVersion.SemanticVersionParseException e) {
      return Optional.empty();
    }
  }

  // The SemanticVersion is currently returned from the ChangeTransactions in the format "x.y.z-computed". This function
  // removes the suffix "computed".
  private static String truncateSemanticVersion(String semanticVersion) {
    String suffix = "-computed";
    return semanticVersion.endsWith(suffix) ? semanticVersion.substring(0, semanticVersion.lastIndexOf(suffix))
        : semanticVersion;
  }

  // Builds the SchemaFieldChange struct describing the given change event, including a
  // human-readable "Added/Modified/Removed in vX.Y.Z" summary string.
  private static SchemaFieldChange getLastSchemaFieldChange(ChangeEvent changeEvent, long timestamp,
      String semanticVersion, String versionStamp) {
    SchemaFieldChange schemaFieldChange = new SchemaFieldChange();
    schemaFieldChange.setTimestampMillis(timestamp);
    // Fix: reuse the shared truncation helper instead of re-implementing the "-computed"
    // suffix stripping inline (the original duplicated that logic below).
    String translatedSemanticVersion = truncateSemanticVersion(semanticVersion);
    schemaFieldChange.setSemanticVersion(translatedSemanticVersion);
    schemaFieldChange.setChangeType(
        ChangeOperationType.valueOf(ChangeOperationType.class, changeEvent.getOperation().toString()));
    schemaFieldChange.setVersionStamp(versionStamp);

    String translatedChangeOperationType;
    switch (changeEvent.getOperation()) {
      case ADD:
        translatedChangeOperationType = "Added";
        break;
      case MODIFY:
        translatedChangeOperationType = "Modified";
        break;
      case REMOVE:
        translatedChangeOperationType = "Removed";
        break;
      default:
        translatedChangeOperationType = "Unknown change made";
        log.warn(translatedChangeOperationType);
        break;
    }

    String lastSchemaFieldChange = String.format("%s in v%s", translatedChangeOperationType, translatedSemanticVersion);
    schemaFieldChange.setLastSchemaFieldChange(lastSchemaFieldChange);

    return schemaFieldChange;
  }

  // Utility class; not instantiable.
  private SchemaFieldBlameMapper() {
  }
}
Loading

0 comments on commit 5a7e22f

Please sign in to comment.