diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java index ef581fbe142bc..077230ba29849 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java @@ -30,6 +30,7 @@ import com.linkedin.datahub.graphql.generated.CorpUserInfo; import com.linkedin.datahub.graphql.generated.Dashboard; import com.linkedin.datahub.graphql.generated.DashboardInfo; +import com.linkedin.datahub.graphql.generated.DashboardUserUsageCounts; import com.linkedin.datahub.graphql.generated.DataFlow; import com.linkedin.datahub.graphql.generated.DataJob; import com.linkedin.datahub.graphql.generated.DataJobInputOutput; @@ -81,6 +82,7 @@ import com.linkedin.datahub.graphql.resolvers.config.AppConfigResolver; import com.linkedin.datahub.graphql.resolvers.container.ContainerEntitiesResolver; import com.linkedin.datahub.graphql.resolvers.container.ParentContainersResolver; +import com.linkedin.datahub.graphql.resolvers.dashboard.DashboardUsageStatsResolver; import com.linkedin.datahub.graphql.resolvers.dataset.DatasetHealthResolver; import com.linkedin.datahub.graphql.resolvers.deprecation.UpdateDeprecationResolver; import com.linkedin.datahub.graphql.resolvers.domain.CreateDomainResolver; @@ -1015,6 +1017,7 @@ private void configureDashboardResolvers(final RuntimeWiring.Builder builder) { }) ) .dataFetcher("parentContainers", new ParentContainersResolver(entityClient)) + .dataFetcher("usageStats", new DashboardUsageStatsResolver(timeseriesAspectService)) ); builder.type("DashboardInfo", typeWiring -> typeWiring .dataFetcher("charts", new LoadableTypeBatchResolver<>(chartType, @@ -1022,6 +1025,11 @@ private void configureDashboardResolvers(final RuntimeWiring.Builder builder) { .map(Chart::getUrn) .collect(Collectors.toList()))) ); + builder.type("DashboardUserUsageCounts", typeWiring -> typeWiring + .dataFetcher("user", new LoadableTypeResolver<>( + corpUserType, + (env) -> ((DashboardUserUsageCounts) env.getSource()).getUser().getUrn())) + ); } /** diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/dashboard/DashboardUsageStatsResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/dashboard/DashboardUsageStatsResolver.java new file mode 100644 index 0000000000000..b57e70128e4ab --- /dev/null +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/dashboard/DashboardUsageStatsResolver.java @@ -0,0 +1,350 @@ +package com.linkedin.datahub.graphql.resolvers.dashboard; + +import com.google.common.collect.ImmutableList; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.StringArray; +import com.linkedin.datahub.graphql.generated.CorpUser; +import com.linkedin.datahub.graphql.generated.DashboardUsageAggregation; +import com.linkedin.datahub.graphql.generated.DashboardUsageAggregationMetrics; +import com.linkedin.datahub.graphql.generated.DashboardUsageMetrics; +import com.linkedin.datahub.graphql.generated.DashboardUsageQueryResult; +import com.linkedin.datahub.graphql.generated.DashboardUsageQueryResultAggregations; +import com.linkedin.datahub.graphql.generated.DashboardUserUsageCounts; +import com.linkedin.datahub.graphql.generated.Entity; +import com.linkedin.datahub.graphql.generated.WindowDuration; +import com.linkedin.datahub.graphql.types.dashboard.mappers.DashboardUsageMetricMapper; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.aspect.EnvelopedAspect; +import com.linkedin.metadata.query.filter.Condition; +import com.linkedin.metadata.query.filter.ConjunctiveCriterion; +import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray; +import com.linkedin.metadata.query.filter.Criterion; +import com.linkedin.metadata.query.filter.CriterionArray; +import com.linkedin.metadata.query.filter.Filter; +import com.linkedin.metadata.timeseries.TimeseriesAspectService; +import com.linkedin.timeseries.AggregationSpec; +import com.linkedin.timeseries.AggregationType; +import com.linkedin.timeseries.CalendarInterval; +import com.linkedin.timeseries.GenericTable; +import com.linkedin.timeseries.GroupingBucket; +import com.linkedin.timeseries.GroupingBucketType; +import com.linkedin.timeseries.TimeWindowSize; +import graphql.schema.DataFetcher; +import graphql.schema.DataFetchingEnvironment; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + + +/** + * Resolver used for resolving the usage statistics of a Dashboard. + *

+ * Returns daily as well as absolute usage metrics of Dashboard + */ +@Slf4j +public class DashboardUsageStatsResolver implements DataFetcher> { + private static final String ES_FIELD_URN = "urn"; + private static final String ES_FIELD_TIMESTAMP = "timestampMillis"; + private static final String ES_FIELD_EVENT_GRANULARITY = "eventGranularity"; + private static final String ES_NULL_VALUE = "NULL"; + private final TimeseriesAspectService timeseriesAspectService; + + public DashboardUsageStatsResolver(TimeseriesAspectService timeseriesAspectService) { + this.timeseriesAspectService = timeseriesAspectService; + } + + @Override + public CompletableFuture get(DataFetchingEnvironment environment) throws Exception { + final String dashboardUrn = ((Entity) environment.getSource()).getUrn(); + final Long maybeStartTimeMillis = environment.getArgumentOrDefault("startTimeMillis", null); + final Long maybeEndTimeMillis = environment.getArgumentOrDefault("endTimeMillis", null); + // Max number of aspects to return for absolute dashboard usage. + final Integer maybeLimit = environment.getArgumentOrDefault("limit", null); + + return CompletableFuture.supplyAsync(() -> { + DashboardUsageQueryResult usageQueryResult = new DashboardUsageQueryResult(); + + // Time Bucket Stats + Filter bucketStatsFilter = createBucketUsageStatsFilter(dashboardUrn, maybeStartTimeMillis, maybeEndTimeMillis); + List dailyUsageBuckets = getBuckets(bucketStatsFilter, dashboardUrn); + DashboardUsageQueryResultAggregations aggregations = getAggregations(bucketStatsFilter, dailyUsageBuckets); + + usageQueryResult.setBuckets(dailyUsageBuckets); + usageQueryResult.setAggregations(aggregations); + + // Absolute usage metrics + List dashboardUsageMetrics = + getDashboardUsageMetrics(dashboardUrn, maybeStartTimeMillis, maybeEndTimeMillis, maybeLimit); + usageQueryResult.setMetrics(dashboardUsageMetrics); + return usageQueryResult; + }); + } + + private List getDashboardUsageMetrics(String dashboardUrn, Long maybeStartTimeMillis, + Long maybeEndTimeMillis, Integer maybeLimit) { + List dashboardUsageMetrics; + try { + Filter filter = new Filter(); + final ArrayList criteria = new ArrayList<>(); + + // Add filter for absence of eventGranularity - only consider absolute stats + Criterion excludeTimeBucketsCriterion = + new Criterion().setField(ES_FIELD_EVENT_GRANULARITY).setCondition(Condition.IS_NULL).setValue(""); + criteria.add(excludeTimeBucketsCriterion); + filter.setOr(new ConjunctiveCriterionArray( + ImmutableList.of(new ConjunctiveCriterion().setAnd(new CriterionArray(criteria))))); + + List aspects = + timeseriesAspectService.getAspectValues(Urn.createFromString(dashboardUrn), Constants.DASHBOARD_ENTITY_NAME, + Constants.DASHBOARD_USAGE_STATISTICS_ASPECT_NAME, maybeStartTimeMillis, maybeEndTimeMillis, maybeLimit, + null, filter); + dashboardUsageMetrics = aspects.stream().map(DashboardUsageMetricMapper::map).collect(Collectors.toList()); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid resource", e); + } + return dashboardUsageMetrics; + } + + private DashboardUsageQueryResultAggregations getAggregations(Filter bucketStatsFilter, + final List dailyUsageBuckets) { + List userUsageCounts = getUserUsageCounts(bucketStatsFilter); + DashboardUsageQueryResultAggregations aggregations = new DashboardUsageQueryResultAggregations(); + aggregations.setUsers(userUsageCounts); + aggregations.setUniqueUserCount(userUsageCounts.size()); + + // Compute total viewsCount and executionsCount for queries time range from the buckets itself. + // We want to avoid issuing an additional query with a sum aggregation. + Integer totalViewsCount = null; + Integer totalExecutionsCount = null; + for (DashboardUsageAggregation bucket : dailyUsageBuckets) { + if (bucket.getMetrics().getExecutionsCount() != null) { + if (totalExecutionsCount == null) { + totalExecutionsCount = 0; + } + totalExecutionsCount += bucket.getMetrics().getExecutionsCount(); + } + if (bucket.getMetrics().getViewsCount() != null) { + if (totalViewsCount == null) { + totalViewsCount = 0; + } + totalViewsCount += bucket.getMetrics().getViewsCount(); + } + } + + aggregations.setExecutionsCount(totalExecutionsCount); + aggregations.setViewsCount(totalViewsCount); + return aggregations; + } + + private List getBuckets(Filter bucketStatsFilter, String dashboardUrn) { + AggregationSpec usersCountAggregation = + new AggregationSpec().setAggregationType(AggregationType.SUM).setFieldPath("uniqueUserCount"); + AggregationSpec viewsCountAggregation = + new AggregationSpec().setAggregationType(AggregationType.SUM).setFieldPath("viewsCount"); + AggregationSpec executionsCountAggregation = + new AggregationSpec().setAggregationType(AggregationType.SUM).setFieldPath("executionsCount"); + + AggregationSpec usersCountCardinalityAggregation = + new AggregationSpec().setAggregationType(AggregationType.CARDINALITY).setFieldPath("uniqueUserCount"); + AggregationSpec viewsCountCardinalityAggregation = + new AggregationSpec().setAggregationType(AggregationType.CARDINALITY).setFieldPath("viewsCount"); + AggregationSpec executionsCountCardinalityAggregation = + new AggregationSpec().setAggregationType(AggregationType.CARDINALITY).setFieldPath("executionsCount"); + + AggregationSpec[] aggregationSpecs = + new AggregationSpec[]{usersCountAggregation, viewsCountAggregation, executionsCountAggregation, + usersCountCardinalityAggregation, viewsCountCardinalityAggregation, executionsCountCardinalityAggregation}; + GenericTable dailyStats = timeseriesAspectService.getAggregatedStats(Constants.DASHBOARD_ENTITY_NAME, + Constants.DASHBOARD_USAGE_STATISTICS_ASPECT_NAME, aggregationSpecs, bucketStatsFilter, + createUsageGroupingBuckets(CalendarInterval.DAY)); + List buckets = new ArrayList<>(); + + StringArray columnNames = dailyStats.getColumnNames(); + Integer idxTimestampMillis = columnNames.indexOf("timestampMillis"); + Integer idxUserCountSum = columnNames.indexOf("sum_uniqueUserCount"); + Integer idxViewsCountSum = columnNames.indexOf("sum_viewsCount"); + Integer idxExecutionsCountSum = columnNames.indexOf("sum_executionsCount"); + Integer idxUserCountCardinality = columnNames.indexOf("cardinality_uniqueUserCount"); + Integer idxViewsCountCardinality = columnNames.indexOf("cardinality_viewsCount"); + Integer idxExecutionsCountCardinality = columnNames.indexOf("cardinality_executionsCount"); + + for (StringArray row : dailyStats.getRows()) { + DashboardUsageAggregation usageAggregation = new DashboardUsageAggregation(); + usageAggregation.setBucket(Long.valueOf(row.get(idxTimestampMillis))); + usageAggregation.setDuration(WindowDuration.DAY); + usageAggregation.setResource(dashboardUrn); + + DashboardUsageAggregationMetrics usageAggregationMetrics = new DashboardUsageAggregationMetrics(); + + // Note: Currently SUM AggregationType returns 0 (zero) value even if all values in timeseries field being aggregated + // are NULL (missing). For example sum of execution counts come up as 0 if all values in executions count timeseries + // are NULL. To overcome this, we extract CARDINALITY for the same timeseries field. Cardinality of 0 identifies + // above scenario. For such scenario, we set sum as NULL. + if (!row.get(idxUserCountSum).equals(ES_NULL_VALUE) && !row.get(idxUserCountCardinality).equals(ES_NULL_VALUE)) { + try { + if (Integer.valueOf(row.get(idxUserCountCardinality)) != 0) { + usageAggregationMetrics.setUniqueUserCount(Integer.valueOf(row.get(idxUserCountSum))); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Failed to convert uniqueUserCount from ES to int", e); + } + } + if (!row.get(idxViewsCountSum).equals(ES_NULL_VALUE) && !row.get(idxViewsCountCardinality) + .equals(ES_NULL_VALUE)) { + try { + if (Integer.valueOf(row.get(idxViewsCountCardinality)) != 0) { + usageAggregationMetrics.setViewsCount(Integer.valueOf(row.get(idxViewsCountSum))); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Failed to convert viewsCount from ES to int", e); + } + } + if (!row.get(idxExecutionsCountSum).equals(ES_NULL_VALUE) && !row.get(idxExecutionsCountCardinality) + .equals(ES_NULL_VALUE)) { + try { + if (Integer.valueOf(row.get(idxExecutionsCountCardinality)) != 0) { + usageAggregationMetrics.setExecutionsCount(Integer.valueOf(row.get(idxExecutionsCountSum))); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Failed to convert executionsCount from ES to object", e); + } + } + usageAggregation.setMetrics(usageAggregationMetrics); + buckets.add(usageAggregation); + } + return buckets; + } + + private List getUserUsageCounts(Filter filter) { + // Sum aggregation on userCounts.count + AggregationSpec sumUsageCountsCountAggSpec = + new AggregationSpec().setAggregationType(AggregationType.SUM).setFieldPath("userCounts.usageCount"); + AggregationSpec sumViewCountsCountAggSpec = + new AggregationSpec().setAggregationType(AggregationType.SUM).setFieldPath("userCounts.viewsCount"); + AggregationSpec sumExecutionCountsCountAggSpec = + new AggregationSpec().setAggregationType(AggregationType.SUM).setFieldPath("userCounts.executionsCount"); + + AggregationSpec usageCountsCardinalityAggSpec = + new AggregationSpec().setAggregationType(AggregationType.CARDINALITY).setFieldPath("userCounts.usageCount"); + AggregationSpec viewCountsCardinalityAggSpec = + new AggregationSpec().setAggregationType(AggregationType.CARDINALITY).setFieldPath("userCounts.viewsCount"); + AggregationSpec executionCountsCardinalityAggSpec = + new AggregationSpec().setAggregationType(AggregationType.CARDINALITY) + .setFieldPath("userCounts.executionsCount"); + AggregationSpec[] aggregationSpecs = + new AggregationSpec[]{sumUsageCountsCountAggSpec, sumViewCountsCountAggSpec, sumExecutionCountsCountAggSpec, + usageCountsCardinalityAggSpec, viewCountsCardinalityAggSpec, executionCountsCardinalityAggSpec}; + + // String grouping bucket on userCounts.user + GroupingBucket userGroupingBucket = + new GroupingBucket().setKey("userCounts.user").setType(GroupingBucketType.STRING_GROUPING_BUCKET); + GroupingBucket[] groupingBuckets = new GroupingBucket[]{userGroupingBucket}; + + // Query backend + GenericTable result = timeseriesAspectService.getAggregatedStats(Constants.DASHBOARD_ENTITY_NAME, + Constants.DASHBOARD_USAGE_STATISTICS_ASPECT_NAME, aggregationSpecs, filter, groupingBuckets); + + StringArray columnNames = result.getColumnNames(); + + Integer idxUser = columnNames.indexOf("userCounts.user"); + Integer idxUsageCountSum = columnNames.indexOf("sum_userCounts.usageCount"); + Integer idxViewsCountSum = columnNames.indexOf("sum_userCounts.viewsCount"); + Integer idxExecutionsCountSum = columnNames.indexOf("sum_userCounts.executionsCount"); + Integer idxUsageCountCardinality = columnNames.indexOf("cardinality_userCounts.usageCount"); + Integer idxViewsCountCardinality = columnNames.indexOf("cardinality_userCounts.viewsCount"); + Integer idxExecutionsCountCardinality = columnNames.indexOf("cardinality_userCounts.executionsCount"); + + // Process response + List userUsageCounts = new ArrayList<>(); + for (StringArray row : result.getRows()) { + DashboardUserUsageCounts userUsageCount = new DashboardUserUsageCounts(); + + CorpUser partialUser = new CorpUser(); + partialUser.setUrn(row.get(idxUser)); + userUsageCount.setUser(partialUser); + + // Note: Currently SUM AggregationType returns 0 (zero) value even if all values in timeseries field being aggregated + // are NULL (missing). For example sum of execution counts come up as 0 if all values in executions count timeseries + // are NULL. To overcome this, we extract CARDINALITY for the same timeseries field. Cardinality of 0 identifies + // above scenario. For such scenario, we set sum as NULL. + if (!row.get(idxUsageCountSum).equals(ES_NULL_VALUE) && !row.get(idxUsageCountCardinality) + .equals(ES_NULL_VALUE)) { + try { + if (Integer.valueOf(row.get(idxUsageCountCardinality)) != 0) { + userUsageCount.setUsageCount(Integer.valueOf(row.get(idxUsageCountSum))); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Failed to convert user usage count from ES to int", e); + } + } + if (!row.get(idxViewsCountSum).equals(ES_NULL_VALUE) && row.get(idxViewsCountCardinality).equals(ES_NULL_VALUE)) { + try { + if (Integer.valueOf(row.get(idxViewsCountCardinality)) != 0) { + userUsageCount.setViewsCount(Integer.valueOf(row.get(idxViewsCountSum))); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Failed to convert user views count from ES to int", e); + } + } + if (!row.get(idxExecutionsCountSum).equals(ES_NULL_VALUE) && !row.get(idxExecutionsCountCardinality) + .equals(ES_NULL_VALUE)) { + try { + if (Integer.valueOf(row.get(idxExecutionsCountCardinality)) != 0) { + userUsageCount.setExecutionsCount(Integer.valueOf(row.get(idxExecutionsCountSum))); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Failed to convert user executions count from ES to int", e); + } + } + userUsageCounts.add(userUsageCount); + } + return userUsageCounts; + } + + private GroupingBucket[] createUsageGroupingBuckets(CalendarInterval calenderInterval) { + GroupingBucket timestampBucket = new GroupingBucket(); + timestampBucket.setKey(ES_FIELD_TIMESTAMP) + .setType(GroupingBucketType.DATE_GROUPING_BUCKET) + .setTimeWindowSize(new TimeWindowSize().setMultiple(1).setUnit(calenderInterval)); + return new GroupingBucket[]{timestampBucket}; + } + + private Filter createBucketUsageStatsFilter(String dashboardUrn, Long startTime, Long endTime) { + Filter filter = new Filter(); + final ArrayList criteria = new ArrayList<>(); + + // Add filter for urn == dashboardUrn + Criterion dashboardUrnCriterion = + new Criterion().setField(ES_FIELD_URN).setCondition(Condition.EQUAL).setValue(dashboardUrn); + criteria.add(dashboardUrnCriterion); + + if (startTime != null) { + // Add filter for start time + Criterion startTimeCriterion = new Criterion().setField(ES_FIELD_TIMESTAMP) + .setCondition(Condition.GREATER_THAN_OR_EQUAL_TO) + .setValue(Long.toString(startTime)); + criteria.add(startTimeCriterion); + } + + if (endTime != null) { + // Add filter for end time + Criterion endTimeCriterion = new Criterion().setField(ES_FIELD_TIMESTAMP) + .setCondition(Condition.LESS_THAN_OR_EQUAL_TO) + .setValue(Long.toString(endTime)); + criteria.add(endTimeCriterion); + } + + // Add filter for presence of eventGranularity - only consider bucket stats and not absolute stats + // since unit is mandatory, we assume if eventGranularity contains unit, then it is not null + Criterion onlyTimeBucketsCriterion = + new Criterion().setField(ES_FIELD_EVENT_GRANULARITY).setCondition(Condition.CONTAIN).setValue("unit"); + criteria.add(onlyTimeBucketsCriterion); + + filter.setOr(new ConjunctiveCriterionArray( + ImmutableList.of(new ConjunctiveCriterion().setAnd(new CriterionArray(criteria))))); + return filter; + } +} diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dashboard/mappers/DashboardUsageMetricMapper.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dashboard/mappers/DashboardUsageMetricMapper.java new file mode 100644 index 0000000000000..d257aef4be565 --- /dev/null +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dashboard/mappers/DashboardUsageMetricMapper.java @@ -0,0 +1,34 @@ +package com.linkedin.datahub.graphql.types.dashboard.mappers; + +import com.linkedin.datahub.graphql.generated.DashboardUsageMetrics; +import com.linkedin.datahub.graphql.types.mappers.TimeSeriesAspectMapper; +import com.linkedin.metadata.aspect.EnvelopedAspect; +import com.linkedin.metadata.utils.GenericRecordUtils; +import javax.annotation.Nonnull; + + +public class DashboardUsageMetricMapper implements TimeSeriesAspectMapper { + + public static final DashboardUsageMetricMapper INSTANCE = new DashboardUsageMetricMapper(); + + public static DashboardUsageMetrics map(@Nonnull final EnvelopedAspect envelopedAspect) { + return INSTANCE.apply(envelopedAspect); + } + + @Override + public DashboardUsageMetrics apply(EnvelopedAspect envelopedAspect) { + com.linkedin.dashboard.DashboardUsageStatistics gmsDashboardUsageStatistics = + GenericRecordUtils.deserializeAspect(envelopedAspect.getAspect().getValue(), + envelopedAspect.getAspect().getContentType(), com.linkedin.dashboard.DashboardUsageStatistics.class); + + final com.linkedin.datahub.graphql.generated.DashboardUsageMetrics dashboardUsageMetrics = + new com.linkedin.datahub.graphql.generated.DashboardUsageMetrics(); + dashboardUsageMetrics.setLastViewed(gmsDashboardUsageStatistics.getLastViewedAt()); + dashboardUsageMetrics.setViewsCount(gmsDashboardUsageStatistics.getViewsCount()); + dashboardUsageMetrics.setExecutionsCount(gmsDashboardUsageStatistics.getExecutionsCount()); + dashboardUsageMetrics.setFavoritesCount(gmsDashboardUsageStatistics.getFavoritesCount()); + dashboardUsageMetrics.setTimestampMillis(gmsDashboardUsageStatistics.getTimestampMillis()); + + return dashboardUsageMetrics; + } +} diff --git a/datahub-graphql-core/src/main/resources/entity.graphql b/datahub-graphql-core/src/main/resources/entity.graphql index cb35bc72ffc23..fc5341d187ee6 100644 --- a/datahub-graphql-core/src/main/resources/entity.graphql +++ b/datahub-graphql-core/src/main/resources/entity.graphql @@ -4121,6 +4121,12 @@ type Dashboard implements EntityWithRelationships & Entity { """ lineage(input: LineageInput!): EntityLineageResult + + """ + Experimental (Subject to breaking change) -- Statistics about how this Dashboard is used + """ + usageStats(startTimeMillis: Long, endTimeMillis: Long, limit: Int): DashboardUsageQueryResult + """ Deprecated, use properties field instead Additional read only information about the dashboard @@ -5247,6 +5253,158 @@ type FieldUsageCounts { count: Int } +""" +Information about individual user usage of a Dashboard +""" +type DashboardUserUsageCounts { + """ + The user of the Dashboard + """ + user: CorpUser + + """ + number of times dashboard has been viewed by the user + """ + viewsCount: Int + + """ + number of dashboard executions by the user + """ + executionsCount: Int + + """ + Normalized numeric metric representing user's dashboard usage + Higher value represents more usage + """ + usageCount: Int +} + +""" +The result of a dashboard usage query +""" +type DashboardUsageQueryResult { + """ + A set of relevant time windows for use in displaying usage statistics + """ + buckets: [DashboardUsageAggregation] + + """ + A set of rolled up aggregations about the dashboard usage + """ + aggregations: DashboardUsageQueryResultAggregations + + """ + A set of absolute dashboard usage metrics + """ + metrics: [DashboardUsageMetrics!] + +} + +""" +A set of rolled up aggregations about the Dashboard usage +""" +type DashboardUsageQueryResultAggregations { + """ + The count of unique Dashboard users within the queried time range + """ + uniqueUserCount: Int + + """ + The specific per user usage counts within the queried time range + """ + users: [DashboardUserUsageCounts] + + """ + The total number of dashboard views within the queried time range + """ + viewsCount: Int + + """ + The total number of dashboard executions within the queried time range + """ + executionsCount: Int + +} + + +""" +A set of absolute dashboard usage metrics +""" +type DashboardUsageMetrics implements TimeSeriesAspect { + """ + The time at which the metrics were reported + """ + timestampMillis: Long! + + """ + The total number of times dashboard has been favorited + FIXME: Qualifies as Popularity Metric rather than Usage Metric? + """ + favoritesCount: Int + + """ + The total number of dashboard views + """ + viewsCount: Int + + """ + The total number of dashboard execution + """ + executionsCount: Int + + """ + The time when this dashboard was last viewed + """ + lastViewed: Long + +} + +""" +An aggregation of Dashboard usage statistics +""" +type DashboardUsageAggregation { + """ + The time window start time + """ + bucket: Long + + """ + The time window span + """ + duration: WindowDuration + + """ + The resource urn associated with the usage information, eg a Dashboard urn + """ + resource: String + + """ + The rolled up usage metrics + """ + metrics: DashboardUsageAggregationMetrics +} + +""" +Rolled up metrics about Dashboard usage over time +""" +type DashboardUsageAggregationMetrics { + """ + The unique number of dashboard users within the time range + """ + uniqueUserCount: Int + + """ + The total number of dashboard views within the time range + """ + viewsCount: Int + + """ + The total number of dashboard executions within the time range + """ + executionsCount: Int + +} + """ The duration of a fixed window of time """ @@ -5273,7 +5431,7 @@ enum WindowDuration { } """ -A time range used in fetching Dataset Usage statistics +A time range used in fetching Usage statistics """ enum TimeRange { """ diff --git a/metadata-ingestion/examples/library/dashboard_usage.py b/metadata-ingestion/examples/library/dashboard_usage.py new file mode 100644 index 0000000000000..ad3bd7c687b16 --- /dev/null +++ b/metadata-ingestion/examples/library/dashboard_usage.py @@ -0,0 +1,164 @@ +# Imports for urn construction utility methods +from datetime import datetime +from typing import List + +from datahub.emitter.mce_builder import make_dashboard_urn, make_user_urn +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.rest_emitter import DatahubRestEmitter + +# Imports for metadata model classes +from datahub.metadata.schema_classes import ( + CalendarIntervalClass, + ChangeTypeClass, + DashboardUsageStatisticsClass, + DashboardUserUsageCountsClass, + TimeWindowSizeClass, +) + +# Create rest emitter +rest_emitter = DatahubRestEmitter(gms_server="http://localhost:8080") + +usage_day_1_user_counts: List[DashboardUserUsageCountsClass] = [ + DashboardUserUsageCountsClass( + user=make_user_urn("user1"), executionsCount=3, usageCount=3 + ), + DashboardUserUsageCountsClass( + user=make_user_urn("user2"), executionsCount=2, usageCount=2 + ), +] + +usage_day_1: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( + entityType="dashboard", + changeType=ChangeTypeClass.UPSERT, + entityUrn=make_dashboard_urn("looker", "dashboards.999999"), + aspectName="dashboardUsageStatistics", + aspect=DashboardUsageStatisticsClass( + timestampMillis=round( + datetime.strptime("2022-02-09", "%Y-%m-%d").timestamp() * 1000 + ), + eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY), + uniqueUserCount=2, + executionsCount=5, + userCounts=usage_day_1_user_counts, + ), +) + +absolute_usage_as_of_day_1: MetadataChangeProposalWrapper = ( + MetadataChangeProposalWrapper( + entityType="dashboard", + changeType=ChangeTypeClass.UPSERT, + entityUrn=make_dashboard_urn("looker", "dashboards.999999"), + aspectName="dashboardUsageStatistics", + aspect=DashboardUsageStatisticsClass( + timestampMillis=round( + datetime.strptime("2022-02-09", "%Y-%m-%d").timestamp() * 1000 + ), + favoritesCount=100, + viewsCount=25, + lastViewedAt=round( + datetime.strptime( + "2022-02-09 04:45:30", "%Y-%m-%d %H:%M:%S" + ).timestamp() + * 1000 + ), + ), + ) +) + +rest_emitter.emit(usage_day_1) +rest_emitter.emit(absolute_usage_as_of_day_1) + +usage_day_2_user_counts: List[DashboardUserUsageCountsClass] = [ + DashboardUserUsageCountsClass( + user=make_user_urn("user1"), executionsCount=4, usageCount=4 + ), + DashboardUserUsageCountsClass( + user=make_user_urn("user2"), executionsCount=6, usageCount=6 + ), +] +usage_day_2: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( + entityType="dashboard", + changeType=ChangeTypeClass.UPSERT, + entityUrn=make_dashboard_urn("looker", "dashboards.999999"), + aspectName="dashboardUsageStatistics", + aspect=DashboardUsageStatisticsClass( + timestampMillis=round( + datetime.strptime("2022-02-10", "%Y-%m-%d").timestamp() * 1000 + ), + eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY), + uniqueUserCount=2, + executionsCount=10, + userCounts=usage_day_2_user_counts, + ), +) + +absolute_usage_as_of_day_2: MetadataChangeProposalWrapper = ( + MetadataChangeProposalWrapper( + entityType="dashboard", + changeType=ChangeTypeClass.UPSERT, + entityUrn=make_dashboard_urn("looker", "dashboards.999999"), + aspectName="dashboardUsageStatistics", + aspect=DashboardUsageStatisticsClass( + timestampMillis=round( + datetime.strptime("2022-02-10", "%Y-%m-%d").timestamp() * 1000 + ), + favoritesCount=100, + viewsCount=27, + lastViewedAt=round( + datetime.strptime( + "2022-02-10 10:45:30", "%Y-%m-%d %H:%M:%S" + ).timestamp() + * 1000 + ), + ), + ) +) + +rest_emitter.emit(usage_day_2) +rest_emitter.emit(absolute_usage_as_of_day_2) + +usage_day_3_user_counts: List[DashboardUserUsageCountsClass] = [ + DashboardUserUsageCountsClass( + user=make_user_urn("user1"), executionsCount=2, usageCount=2 + ), +] +usage_day_3: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( + entityType="dashboard", + changeType=ChangeTypeClass.UPSERT, + entityUrn=make_dashboard_urn("looker", "dashboards.999999"), + aspectName="dashboardUsageStatistics", + aspect=DashboardUsageStatisticsClass( + timestampMillis=round( + datetime.strptime("2022-02-11", "%Y-%m-%d").timestamp() * 1000 + ), + eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY), + uniqueUserCount=1, + executionsCount=2, + userCounts=usage_day_3_user_counts, + ), +) + +absolute_usage_as_of_day_3: MetadataChangeProposalWrapper = ( + MetadataChangeProposalWrapper( + entityType="dashboard", + changeType=ChangeTypeClass.UPSERT, + entityUrn=make_dashboard_urn("looker", "dashboards.999999"), + aspectName="dashboardUsageStatistics", + aspect=DashboardUsageStatisticsClass( + timestampMillis=round( + datetime.strptime("2022-02-11", "%Y-%m-%d").timestamp() * 1000 + ), + favoritesCount=102, + viewsCount=30, + lastViewedAt=round( + datetime.strptime( + "2022-02-11 02:45:30", "%Y-%m-%d %H:%M:%S" + ).timestamp() + * 1000 + ), + ), + ) +) + +rest_emitter.emit(usage_day_3) +rest_emitter.emit(absolute_usage_as_of_day_3) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker.py b/metadata-ingestion/src/datahub/ingestion/source/looker.py index bf6e9ee45b8ee..1712abaa546a7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker.py @@ -63,16 +63,48 @@ from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.metadata.schema_classes import ( BrowsePathsClass, + CalendarIntervalClass, + ChangeTypeClass, ChartInfoClass, ChartTypeClass, DashboardInfoClass, + DashboardUsageStatisticsClass, + DashboardUserUsageCountsClass, OwnerClass, OwnershipClass, OwnershipTypeClass, + TimeWindowSizeClass, ) logger = logging.getLogger(__name__) +usage_queries: Dict[str, Dict] = { + # Query - group by dashboard and date, find unique users, dashboard runs count + "counts_per_day_per_dashboard": { + "model": "system__activity", + "view": "history", + "fields": [ + "history.dashboard_id", + "history.created_date", + "history.dashboard_user", + "history.dashboard_run_count", + ], + "filters": {}, + }, + # Query - group by user, dashboard and date, find runs count + "counts_per_day_per_user_per_dashboard": { + "model": "system__activity", + "view": "history", + "fields": [ + "history.created_date", + "history.dashboard_id", + "history.dashboard_run_count", + "user.id", + ], + "filters": {}, + }, +} + class TransportOptionsConfig(ConfigModel): timeout: int @@ -158,6 +190,15 @@ class LookerDashboardSourceConfig(LookerAPIConfig, LookerCommonConfig): None, description="Optional URL to use when constructing external URLs to Looker if the `base_url` is not the correct one to use. For example, `https://looker-public.company.com`. If not provided, the external base URL will default to `base_url`.", ) + extract_usage_history: bool = Field( + False, + description="Experimental (Subject to breaking change) -- Whether to ingest usage statistics for dashboards. Setting this to True will query looker system activity explores to fetch historical dashboard usage.", + ) + # TODO - stateful ingestion to autodetect usage history interval + extract_usage_history_for_interval: str = Field( + "1 day ago", + description="Experimental (Subject to breaking change) -- Used only if extract_usage_history is set to True. Interval to extract looker dashboard usage history for . https://docs.looker.com/reference/filter-expressions#date_and_time", + ) @validator("external_base_url", pre=True, always=True) def external_url_defaults_to_api_config_base_url( @@ -308,6 +349,9 @@ class LookerDashboard: last_updated_by: Optional[LookerUser] = None deleted_at: Optional[datetime.datetime] = None deleted_by: Optional[LookerUser] = None + favorite_count: Optional[int] = None + view_count: Optional[int] = None + last_viewed_at: Optional[datetime.datetime] = None def url(self, base_url): # If the base_url contains a port number (like https://company.looker.com:19999) remove the port number @@ -447,7 +491,7 @@ def _get_fields_from_query(self, query: Optional[Query]) -> List[str]: def _get_looker_dashboard_element( # noqa: C901 self, element: DashboardElement ) -> Optional[LookerDashboardElement]: - # Dashboard elements can use raw queries against explores + # Dashboard elements can use raw usage_queries against explores explores: List[str] fields: List[str] @@ -697,12 +741,14 @@ def fetch_one_explore( def _make_dashboard_and_chart_mces( self, looker_dashboard: LookerDashboard - ) -> List[MetadataChangeEvent]: + ) -> Iterable[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]: chart_mces = [ self._make_chart_mce(element, looker_dashboard) for element in looker_dashboard.dashboard_elements if element.type == "vis" ] + for chart_mce in chart_mces: + yield chart_mce dashboard_urn = builder.make_dashboard_urn( self.source_config.platform_name, looker_dashboard.get_urn_dashboard_id() @@ -734,8 +780,27 @@ def _make_dashboard_and_chart_mces( dashboard_snapshot.aspects.append(Status(removed=looker_dashboard.is_deleted)) dashboard_mce = MetadataChangeEvent(proposedSnapshot=dashboard_snapshot) - - return chart_mces + [dashboard_mce] + yield dashboard_mce + + if self.source_config.extract_usage_history: + # Emit snapshot values of dashboard usage - do this always ? + dashboard_usage_mcp = MetadataChangeProposalWrapper( + entityType="dashboard", + entityUrn=dashboard_urn, + changeType=ChangeTypeClass.UPSERT, + aspectName="dashboardUsageStatistics", + aspect=DashboardUsageStatisticsClass( + timestampMillis=round(datetime.datetime.now().timestamp() * 1000), + favoritesCount=looker_dashboard.favorite_count, + viewsCount=looker_dashboard.view_count, + lastViewedAt=round( + looker_dashboard.last_viewed_at.timestamp() * 1000 + ) + if looker_dashboard.last_viewed_at + else None, + ), + ) + yield dashboard_usage_mcp def get_ownership( self, looker_dashboard: LookerDashboard @@ -859,6 +924,9 @@ def _get_looker_dashboard( last_updated_by=self._get_looker_user(dashboard.last_updater_id), deleted_at=dashboard.deleted_at, deleted_by=self._get_looker_user(dashboard.deleter_id), + favorite_count=dashboard.favorite_count, + view_count=dashboard.view_count, + last_viewed_at=dashboard.last_viewed_at, ) return looker_dashboard @@ -908,6 +976,12 @@ def process_dashboard( "deleted_at", "deleter_id", ] + if self.source_config.extract_usage_history: + fields += [ + "favorite_count", + "view_count", + "last_viewed_at", + ] dashboard_object: Dashboard = self.client.dashboard( dashboard_id=dashboard_id, fields=",".join(fields), @@ -939,10 +1013,117 @@ def process_dashboard( # for mce in mces: workunits = [ MetadataWorkUnit(id=f"looker-{mce.proposedSnapshot.urn}", mce=mce) + if isinstance(mce, MetadataChangeEvent) + else MetadataWorkUnit( + id=f"looker-{mce.aspectName}-{mce.entityUrn}", mcp=mce + ) for mce in mces ] return workunits, dashboard_id, start_time, datetime.datetime.now() + def extract_usage_history_from_system_activity( + self, dashboard_ids: List[str] + ) -> Iterable[MetadataChangeProposalWrapper]: + + # key tuple (dashboard_id, date) + dashboard_usages: Dict[tuple, DashboardUsageStatisticsClass] = dict() + + common_filters = { + "history.dashboard_id": ",".join(dashboard_ids), + "history.created_date": self.source_config.extract_usage_history_for_interval, + } + for query in usage_queries.values(): + query["filters"].update(common_filters) + + self._populate_dashboard_counts(dashboard_usages) + + self._populate_userwise_runs_counts(dashboard_usages) + + for key, val in dashboard_usages.items(): + yield MetadataChangeProposalWrapper( + entityType="dashboard", + entityUrn=builder.make_dashboard_urn( + self.source_config.platform_name, + f"dashboards.{key[0]}", # in sync with LookerDashboard.get_urn_dashboard_id + ), + changeType=ChangeTypeClass.UPSERT, + aspectName="dashboardUsageStatistics", + aspect=val, + ) + + def _populate_userwise_runs_counts(self, dashboard_usages): + userwise_count_rows = LookerUtil.run_inline_query( + self.client, usage_queries["counts_per_day_per_user_per_dashboard"] + ) + + for row in userwise_count_rows: + user: Optional[LookerUser] = ( + self.user_registry.get_by_id( + row["user.id"], + self.source_config.transport_options.get_transport_options() + if self.source_config.transport_options is not None + else None, + ) + if row["user.id"] is not None + else None + ) + if user is None: + logger.warning( + f"Unable to resolve user with id {row['user.id']}, skipping" + ) + continue + + user_urn: Optional[str] = user._get_urn( + self.source_config.strip_user_ids_from_email + ) + + if user_urn is None: + logger.warning( + f"Unable to resolve urn for user with id {row['user.id']}, skipping" + ) + continue + + user_usage: DashboardUserUsageCountsClass = DashboardUserUsageCountsClass( + user=user_urn, + executionsCount=row["history.dashboard_run_count"], + usageCount=row["history.dashboard_run_count"], + ) + + usage_mcp_prev = dashboard_usages.get( + (row["history.dashboard_id"], row["history.created_date"]) + ) + if usage_mcp_prev is None: + # Unreachable + logger.warning( + f"User counts found but no users for {row['history.dashboard_id']} on date {row['history.created_date']}" + ) + continue + + if usage_mcp_prev.userCounts is None: + usage_mcp_prev.userCounts = [user_usage] + else: + usage_mcp_prev.userCounts.append(user_usage) + + def _populate_dashboard_counts(self, dashboard_usages): + count_rows = LookerUtil.run_inline_query( + self.client, + usage_queries["counts_per_day_per_dashboard"], + ) + for row in count_rows: + dashboard_usages[ + (row["history.dashboard_id"], row["history.created_date"]) + ] = DashboardUsageStatisticsClass( + timestampMillis=round( + datetime.datetime.strptime(row["history.created_date"], "%Y-%m-%d") + .replace(tzinfo=datetime.timezone.utc) + .timestamp() + * 1000 + ), + eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY), + uniqueUserCount=row["history.dashboard_user"], + executionsCount=row["history.dashboard_run_count"], + ) + def get_workunits(self) -> Iterable[MetadataWorkUnit]: dashboards = self.client.all_dashboards( fields="id", @@ -1029,5 +1210,17 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: self.reporter.report_workunit(workunit) yield workunit + if self.source_config.extract_usage_history and dashboard_ids is not None: + usage_mcps = self.extract_usage_history_from_system_activity( + dashboard_ids # type:ignore + ) + for usage_mcp in usage_mcps: + workunit = MetadataWorkUnit( + id=f"looker-{usage_mcp.aspectName}-{usage_mcp.entityUrn}-{usage_mcp.aspect.timestampMillis}", # type:ignore + mcp=usage_mcp, + ) + self.reporter.report_workunit(workunit) + yield workunit + def get_report(self) -> SourceReport: return self.reporter diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker_common.py b/metadata-ingestion/src/datahub/ingestion/source/looker_common.py index fce797d019b9e..2535ca214867a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker_common.py @@ -1,3 +1,4 @@ +import json import logging import re from dataclasses import dataclass @@ -8,6 +9,7 @@ from looker_sdk.error import SDKError from looker_sdk.rtl.transport import TransportOptions from looker_sdk.sdk.api31.methods import Looker31SDK +from looker_sdk.sdk.api31.models import WriteQuery from pydantic import BaseModel, Field from pydantic.class_validators import validator @@ -457,6 +459,42 @@ def _display_name(name: str) -> str: """Returns a display name that corresponds to the Looker conventions""" return name.replace("_", " ").title() if name else name + @staticmethod + def create_query_request(q: dict, limit: Optional[str] = None) -> WriteQuery: + return WriteQuery( + model=q["model"], + view=q["view"], + fields=q.get("fields"), + filters=q.get("filters"), + filter_expression=q.get("filter_expressions"), + sorts=q.get("sorts"), + limit=q.get("limit") or limit, + column_limit=q.get("column_limit"), + vis_config={"type": "looker_column"}, + filter_config=q.get("filter_config"), + query_timezone="UTC", + ) + + @staticmethod + def run_inline_query(client: Looker31SDK, q: dict) -> List: + + response_sql = client.run_inline_query( + result_format="sql", + body=LookerUtil.create_query_request(q), + ) + logger.debug("=================Query=================") + logger.debug(response_sql) + + response_json = client.run_inline_query( + result_format="json", + body=LookerUtil.create_query_request(q), + ) + + logger.debug("=================Response=================") + data = json.loads(response_json) + logger.debug(f"length {len(data)}") + return data + @dataclass class LookerExplore: diff --git a/metadata-ingestion/tests/integration/looker/looker_mces_usage_history.json b/metadata-ingestion/tests/integration/looker/looker_mces_usage_history.json new file mode 100644 index 0000000000000..c784e94a6e14c --- /dev/null +++ b/metadata-ingestion/tests/integration/looker/looker_mces_usage_history.json @@ -0,0 +1,387 @@ +[ +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot": { + "urn": "urn:li:dashboard:(looker,dashboards.1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.dashboard.DashboardInfo": { + "customProperties": {}, + "externalUrl": null, + "title": "foo", + "description": "lorem ipsum", + "charts": [], + "datasets": [], + "lastModified": { + "created": { + "time": 1586847600000, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + }, + "lastModified": { + "time": 1586847600000, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + }, + "deleted": null + }, + "dashboardUrl": "https://looker.company.com/dashboards/1", + "access": null, + "lastRefreshed": null + } + }, + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "looker-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(looker,dashboards.1)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dashboardUsageStatistics", + "aspect": { + "value": "{\"timestampMillis\": 1586847600000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"viewsCount\": 25, \"favoritesCount\": 5, \"lastViewedAt\": 1586847600000}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "looker-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/looker/lkml_samples/explores/data.my_view" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "looker.explore.label": "My Explore View", + "looker.explore.file": "test_source_file.lkml" + }, + "externalUrl": "https://looker.company.com/explore/data/my_view", + "name": "My Explore View", + "qualifiedName": null, + "description": "lorem ipsum", + "uri": null, + "tags": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.UpstreamLineage": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.underlying_view,PROD)", + "type": "VIEW" + } + ], + "fineGrainedLineages": null + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "my_view", + "platform": "urn:li:dataPlatform:looker", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + }, + "deleted": null, + "dataset": null, + "cluster": null, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + "fieldPath": "dim1", + "jsonPath": null, + "nullable": false, + "description": "", + "created": null, + "lastModified": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension", + "context": null + } + ] + }, + "glossaryTerms": null, + "isPartOfKey": false, + "isPartitioningKey": null, + "jsonProps": null + } + ], + "primaryKeys": [], + "foreignKeysSpecs": null, + "foreignKeys": null + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "looker-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "value": "{\"typeNames\": [\"explore\"]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "looker-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { + "urn": "urn:li:tag:Dimension", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpuser:datahub", + "type": "DATAOWNER", + "source": null + } + ], + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + } + } + }, + { + "com.linkedin.pegasus2avro.tag.TagProperties": { + "name": "Dimension", + "description": "A tag that is applied to all dimension fields.", + "colorHex": null + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "looker-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { + "urn": "urn:li:tag:Temporal", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpuser:datahub", + "type": "DATAOWNER", + "source": null + } + ], + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + } + } + }, + { + "com.linkedin.pegasus2avro.tag.TagProperties": { + "name": "Temporal", + "description": "A tag that is applied to all time-based (temporal) fields such as timestamps or durations.", + "colorHex": null + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "looker-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { + "urn": "urn:li:tag:Measure", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpuser:datahub", + "type": "DATAOWNER", + "source": null + } + ], + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + } + } + }, + { + "com.linkedin.pegasus2avro.tag.TagProperties": { + "name": "Measure", + "description": "A tag that is applied to all measures (metrics). Measures are typically the columns that you aggregate on", + "colorHex": null + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "looker-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(looker,dashboards.11)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dashboardUsageStatistics", + "aspect": { + "value": "{\"timestampMillis\": 1656979200000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"executionsCount\": 14, \"uniqueUserCount\": 1, \"userCounts\": [{\"user\": \"urn:li:corpuser:test@looker.com\", \"executionsCount\": 14, \"usageCount\": 14}]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "looker-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(looker,dashboards.12)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dashboardUsageStatistics", + "aspect": { + "value": "{\"timestampMillis\": 1656979200000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"executionsCount\": 14, \"uniqueUserCount\": 1, \"userCounts\": [{\"user\": \"urn:li:corpuser:test@looker.com\", \"executionsCount\": 14, \"usageCount\": 14}]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "looker-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(looker,dashboards.37)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dashboardUsageStatistics", + "aspect": { + "value": "{\"timestampMillis\": 1656979200000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"executionsCount\": 5, \"uniqueUserCount\": 1, \"userCounts\": [{\"user\": \"urn:li:corpuser:test@looker.com\", \"executionsCount\": 5, \"usageCount\": 5}]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "looker-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/looker/test_looker.py b/metadata-ingestion/tests/integration/looker/test_looker.py index 0f878145b2f97..598990c35da93 100644 --- a/metadata-ingestion/tests/integration/looker/test_looker.py +++ b/metadata-ingestion/tests/integration/looker/test_looker.py @@ -1,3 +1,4 @@ +import json import time from datetime import datetime from unittest import mock @@ -11,9 +12,12 @@ LookmlModelExploreFieldset, LookmlModelExploreJoins, Query, + User, + WriteQuery, ) from datahub.ingestion.run.pipeline import Pipeline +from datahub.ingestion.source.looker import usage_queries from tests.test_helpers import mce_helpers FROZEN_TIME = "2020-04-14 07:00:00" @@ -286,6 +290,69 @@ def setup_mock_explore(mocked_client): ) +def setup_mock_user(mocked_client): + mocked_client.user.return_value = User(id=1, email="test@looker.com") + + +def side_effect_query_inline(result_format: str, body: WriteQuery) -> str: + query_type = None + if result_format == "sql": + return "" # Placeholder for sql text + for query_name, query_template in usage_queries.items(): + if body.fields == query_template["fields"]: + query_type = query_name + + if query_type == "counts_per_day_per_dashboard": + return json.dumps( + [ + { + "history.dashboard_id": "11", + "history.created_date": "2022-07-05", + "history.dashboard_user": 1, + "history.dashboard_run_count": 14, + }, + { + "history.dashboard_id": "12", + "history.created_date": "2022-07-05", + "history.dashboard_user": 1, + "history.dashboard_run_count": 14, + }, + { + "history.dashboard_id": "37", + "history.created_date": "2022-07-05", + "history.dashboard_user": 1, + "history.dashboard_run_count": 5, + }, + ] + ) + + if query_type == "counts_per_day_per_user_per_dashboard": + return json.dumps( + [ + { + "history.created_date": "2022-07-05", + "history.dashboard_id": "11", + "user.id": 1, + "history.dashboard_run_count": 14, + }, + { + "history.created_date": "2022-07-05", + "history.dashboard_id": "12", + "user.id": 1, + "history.dashboard_run_count": 14, + }, + { + "history.created_date": "2022-07-05", + "history.dashboard_id": "37", + "user.id": 1, + "history.dashboard_run_count": 5, + }, + ] + ) + + raise Exception("Unknown Query") + + @freeze_time(FROZEN_TIME) def test_looker_ingest_allow_pattern(pytestconfig, tmp_path, mock_time): mocked_client = mock.MagicMock() @@ -355,3 +422,68 @@ def test_looker_ingest_allow_pattern(pytestconfig, tmp_path, mock_time): output_path=tmp_path / "looker_mces.json", golden_path=f"{test_resources_dir}/{mce_out_file}", ) + + +@freeze_time(FROZEN_TIME) +def test_looker_ingest_usage_history(pytestconfig, tmp_path, mock_time): + mocked_client = mock.MagicMock() + with mock.patch("looker_sdk.init31") as mock_sdk: + mock_sdk.return_value = mocked_client + mocked_client.all_dashboards.return_value = [Dashboard(id="1")] + mocked_client.dashboard.return_value = Dashboard( + id="1", + title="foo", + created_at=datetime.utcfromtimestamp(time.time()), + updated_at=datetime.utcfromtimestamp(time.time()), + description="lorem ipsum", + favorite_count=5, + view_count=25, + last_viewed_at=datetime.utcfromtimestamp(time.time()), + dashboard_elements=[ + DashboardElement( + id="2", + type="", + subtitle_text="Some text", + query=Query( + model="data", + view="my_view", + dynamic_fields='[{"table_calculation":"calc","label":"foobar","expression":"offset(${my_table.value},1)","value_format":null,"value_format_name":"eur","_kind_hint":"measure","_type_hint":"number"}]', + ), + ) + ], + ) + mocked_client.run_inline_query.side_effect = side_effect_query_inline + setup_mock_explore(mocked_client) + setup_mock_user(mocked_client) + + test_resources_dir = pytestconfig.rootpath / "tests/integration/looker" + + pipeline = Pipeline.create( + { + "run_id": "looker-test", + "source": { + "type": "looker", + "config": { + "base_url": "https://looker.company.com", + "client_id": "foo", + "client_secret": "bar", + "extract_usage_history": True, + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/looker_mces.json", + }, + }, + } + ) + pipeline.run() + pipeline.raise_from_status() + mce_out_file = "looker_mces_usage_history.json" + + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / "looker_mces.json", + golden_path=f"{test_resources_dir}/{mce_out_file}", + ) diff --git a/metadata-ingestion/tests/integration/s3/test_s3.py b/metadata-ingestion/tests/integration/s3/test_s3.py index f20567bdbdf82..d4e840c7e837e 100644 --- a/metadata-ingestion/tests/integration/s3/test_s3.py +++ b/metadata-ingestion/tests/integration/s3/test_s3.py @@ -3,7 +3,7 @@ import os import pytest -from boto3 import Session +from boto3.session import Session from moto import mock_s3 from pydantic import ValidationError diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java index 658c6945b3be8..bc3066628abcd 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java @@ -60,7 +60,8 @@ public static BoolQueryBuilder buildFilterQuery(@Nullable Filter filter) { log.warn("Received query Filter with a deprecated field 'criteria'. Use 'or' instead."); final BoolQueryBuilder andQueryBuilder = new BoolQueryBuilder(); filter.getCriteria().forEach(criterion -> { - if (!criterion.getValue().trim().isEmpty() || criterion.hasValues()) { + if (!criterion.getValue().trim().isEmpty() || criterion.hasValues() + || criterion.getCondition() == Condition.IS_NULL) { andQueryBuilder.must(getQueryBuilderFromCriterion(criterion)); } }); @@ -73,7 +74,8 @@ public static BoolQueryBuilder buildFilterQuery(@Nullable Filter filter) { public static BoolQueryBuilder buildConjunctiveFilterQuery(@Nonnull ConjunctiveCriterion conjunctiveCriterion) { final BoolQueryBuilder andQueryBuilder = new BoolQueryBuilder(); conjunctiveCriterion.getAnd().forEach(criterion -> { - if (!criterion.getValue().trim().isEmpty() || criterion.hasValues()) { + if (!criterion.getValue().trim().isEmpty() || criterion.hasValues() + || criterion.getCondition() == Condition.IS_NULL) { andQueryBuilder.must(getQueryBuilderFromCriterion(criterion)); } }); @@ -120,6 +122,8 @@ public static QueryBuilder getQueryBuilderFromCriterion(@Nonnull Criterion crite Arrays.stream(criterion.getValue().trim().split("\\s*,\\s*")) .forEach(elem -> filters.should(QueryBuilders.matchQuery(criterion.getField(), elem))); return filters; + } else if (condition == Condition.IS_NULL) { + return QueryBuilders.boolQuery().mustNot(QueryBuilders.existsQuery(criterion.getField())); } else if (condition == Condition.GREATER_THAN) { return QueryBuilders.rangeQuery(criterion.getField()).gt(criterion.getValue().trim()); } else if (condition == Condition.GREATER_THAN_OR_EQUAL_TO) { diff --git a/metadata-models/src/main/pegasus/com/linkedin/dashboard/DashboardUsageStatistics.pdl b/metadata-models/src/main/pegasus/com/linkedin/dashboard/DashboardUsageStatistics.pdl index 468afd34ca526..b45989847c674 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/dashboard/DashboardUsageStatistics.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/dashboard/DashboardUsageStatistics.pdl @@ -3,7 +3,7 @@ namespace com.linkedin.dashboard import com.linkedin.timeseries.TimeseriesAspectBase /** - * Stats corresponding to dashboard's usage. + * Experimental (Subject to breaking change) -- Stats corresponding to dashboard's usage. * * If this aspect represents the latest snapshot of the statistics about a Dashboard, the eventGranularity field should be null. * If this aspect represents a bucketed window of usage statistics (e.g. over a day), then the eventGranularity field should be set accordingly. diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/query/filter/Condition.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/query/filter/Condition.pdl index 93924f7c66fcc..35d3a7e823f46 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/metadata/query/filter/Condition.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/query/filter/Condition.pdl @@ -20,6 +20,11 @@ enum Condition { */ EQUAL + /** + * Represent the relation: field is null, e.g. platform is null + */ + IS_NULL + /** * Represent the relation greater than, e.g. ownerCount > 5 */ diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.analytics.analytics.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.analytics.analytics.snapshot.json index e2b51e8d62d54..0f54e05ccd5e4 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.analytics.analytics.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.analytics.analytics.snapshot.json @@ -56,7 +56,7 @@ "type" : "enum", "name" : "Condition", "doc" : "The matching condition in a filter criterion", - "symbols" : [ "CONTAIN", "END_WITH", "EQUAL", "GREATER_THAN", "GREATER_THAN_OR_EQUAL_TO", "IN", "LESS_THAN", "LESS_THAN_OR_EQUAL_TO", "START_WITH" ], + "symbols" : [ "CONTAIN", "END_WITH", "EQUAL", "IS_NULL", "GREATER_THAN", "GREATER_THAN_OR_EQUAL_TO", "IN", "LESS_THAN", "LESS_THAN_OR_EQUAL_TO", "START_WITH" ], "symbolDocs" : { "CONTAIN" : "Represent the relation: String field contains value, e.g. name contains Profile", "END_WITH" : "Represent the relation: String field ends with value, e.g. name ends with Event", @@ -64,6 +64,7 @@ "GREATER_THAN" : "Represent the relation greater than, e.g. ownerCount > 5", "GREATER_THAN_OR_EQUAL_TO" : "Represent the relation greater than or equal to, e.g. ownerCount >= 5", "IN" : "Represent the relation: String field is one of the array values to, e.g. name in [\"Profile\", \"Event\"]", + "IS_NULL" : "Represent the relation: field is null, e.g. platform is null", "LESS_THAN" : "Represent the relation less than, e.g. ownerCount < 3", "LESS_THAN_OR_EQUAL_TO" : "Represent the relation less than or equal to, e.g. ownerCount <= 3", "START_WITH" : "Represent the relation: String field starts with value, e.g. name starts with PageView" diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json index 927dfabaa12ea..d47cd8b790c16 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json @@ -146,7 +146,7 @@ "type" : "enum", "name" : "Condition", "doc" : "The matching condition in a filter criterion", - "symbols" : [ "CONTAIN", "END_WITH", "EQUAL", "GREATER_THAN", "GREATER_THAN_OR_EQUAL_TO", "IN", "LESS_THAN", "LESS_THAN_OR_EQUAL_TO", "START_WITH" ], + "symbols" : [ "CONTAIN", "END_WITH", "EQUAL", "IS_NULL", "GREATER_THAN", "GREATER_THAN_OR_EQUAL_TO", "IN", "LESS_THAN", "LESS_THAN_OR_EQUAL_TO", "START_WITH" ], "symbolDocs" : { "CONTAIN" : "Represent the relation: String field contains value, e.g. name contains Profile", "END_WITH" : "Represent the relation: String field ends with value, e.g. name ends with Event", @@ -154,6 +154,7 @@ "GREATER_THAN" : "Represent the relation greater than, e.g. ownerCount > 5", "GREATER_THAN_OR_EQUAL_TO" : "Represent the relation greater than or equal to, e.g. ownerCount >= 5", "IN" : "Represent the relation: String field is one of the array values to, e.g. name in [\"Profile\", \"Event\"]", + "IS_NULL" : "Represent the relation: field is null, e.g. platform is null", "LESS_THAN" : "Represent the relation less than, e.g. ownerCount < 3", "LESS_THAN_OR_EQUAL_TO" : "Represent the relation less than or equal to, e.g. ownerCount <= 3", "START_WITH" : "Represent the relation: String field starts with value, e.g. name starts with PageView" diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json index 365a1d1a597a2..4b971a0fca9da 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json @@ -5157,7 +5157,7 @@ "name" : "Condition", "namespace" : "com.linkedin.metadata.query.filter", "doc" : "The matching condition in a filter criterion", - "symbols" : [ "CONTAIN", "END_WITH", "EQUAL", "GREATER_THAN", "GREATER_THAN_OR_EQUAL_TO", "IN", "LESS_THAN", "LESS_THAN_OR_EQUAL_TO", "START_WITH" ], + "symbols" : [ "CONTAIN", "END_WITH", "EQUAL", "IS_NULL", "GREATER_THAN", "GREATER_THAN_OR_EQUAL_TO", "IN", "LESS_THAN", "LESS_THAN_OR_EQUAL_TO", "START_WITH" ], "symbolDocs" : { "CONTAIN" : "Represent the relation: String field contains value, e.g. name contains Profile", "END_WITH" : "Represent the relation: String field ends with value, e.g. name ends with Event", @@ -5165,6 +5165,7 @@ "GREATER_THAN" : "Represent the relation greater than, e.g. ownerCount > 5", "GREATER_THAN_OR_EQUAL_TO" : "Represent the relation greater than or equal to, e.g. ownerCount >= 5", "IN" : "Represent the relation: String field is one of the array values to, e.g. name in [\"Profile\", \"Event\"]", + "IS_NULL" : "Represent the relation: field is null, e.g. platform is null", "LESS_THAN" : "Represent the relation less than, e.g. ownerCount < 3", "LESS_THAN_OR_EQUAL_TO" : "Represent the relation less than or equal to, e.g. ownerCount <= 3", "START_WITH" : "Represent the relation: String field starts with value, e.g. name starts with PageView" diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/Constants.java b/metadata-utils/src/main/java/com/linkedin/metadata/Constants.java index 945422ec92429..74ce31899c25a 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/Constants.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/Constants.java @@ -102,6 +102,7 @@ public class Constants { public static final String DASHBOARD_KEY_ASPECT_NAME = "dashboardKey"; public static final String DASHBOARD_INFO_ASPECT_NAME = "dashboardInfo"; public static final String EDITABLE_DASHBOARD_PROPERTIES_ASPECT_NAME = "editableDashboardProperties"; + public static final String DASHBOARD_USAGE_STATISTICS_ASPECT_NAME = "dashboardUsageStatistics"; // Notebook public static final String NOTEBOOK_KEY_ASPECT_NAME = "notebookKey";