feat: usage stats (part 1) #2750

Merged: 82 commits, Jun 25, 2021
750a4f3
start usage stats
hsheth2 Jun 10, 2021
42aec23
more work on usage stats pipelining
hsheth2 Jun 10, 2021
e27ce48
refactor delayed_iter
hsheth2 Jun 10, 2021
98755a4
minor updates
hsheth2 Jun 11, 2021
de83ab9
Merge branch 'master' into usage-stats
hsheth2 Jun 11, 2021
e0efbcd
add aggregation
hsheth2 Jun 11, 2021
f786c1a
handle partitions
hsheth2 Jun 11, 2021
7cab7ca
Merge branch 'master' into usage-stats
hsheth2 Jun 14, 2021
236c9b5
add types for cachetools
hsheth2 Jun 14, 2021
5468984
start java side write path
hsheth2 Jun 15, 2021
b2499e3
Merge branch 'master' into usage-stats
hsheth2 Jun 15, 2021
04aea8c
add some snapshots
hsheth2 Jun 15, 2021
38b7056
start writing pdl
hsheth2 Jun 15, 2021
05c70c3
usage stats ingest interface
hsheth2 Jun 15, 2021
628ced8
more models
hsheth2 Jun 15, 2021
51f3d8a
keep building out write path
hsheth2 Jun 16, 2021
b67fd29
user in document
hsheth2 Jun 16, 2021
83100d9
building es document
hsheth2 Jun 16, 2021
69c0d3a
more index stuff
hsheth2 Jun 16, 2021
18aa95b
refactor index building
hsheth2 Jun 16, 2021
6b4254d
composition instead of inheritance
hsheth2 Jun 16, 2021
9c563bc
checkstyle
hsheth2 Jun 16, 2021
fd2a4b3
configure index builders
hsheth2 Jun 16, 2021
fb7614f
call configure
hsheth2 Jun 16, 2021
ae05e57
write path probably done?
hsheth2 Jun 16, 2021
6dc9828
Merge branch 'master' into usage-stats
hsheth2 Jun 17, 2021
58eb057
expose usage aggregation to python
hsheth2 Jun 17, 2021
c28a38d
fix bug + stop suppressing error messages
hsheth2 Jun 17, 2021
9a0179d
add usage work unit
hsheth2 Jun 17, 2021
9d9314f
make pipeline more generic
hsheth2 Jun 17, 2021
74eb484
more framework changes
hsheth2 Jun 17, 2021
3da4aea
make bigquery a real source
hsheth2 Jun 17, 2021
27b6b7f
add file-based test for usage data
hsheth2 Jun 18, 2021
6d190d9
add datahub rest sink tests
hsheth2 Jun 18, 2021
ca699e7
better log levels
hsheth2 Jun 18, 2021
2448dad
add more aggs tracking
hsheth2 Jun 19, 2021
dc03d34
start query interface
hsheth2 Jun 19, 2021
57d7afc
more query handling logic
hsheth2 Jun 19, 2021
7386bc0
Merge branch 'master' into usage-stats
hsheth2 Jun 21, 2021
76b1d31
bucket read path
hsheth2 Jun 21, 2021
0781866
fix number parsing
hsheth2 Jun 21, 2021
079141e
update models
hsheth2 Jun 21, 2021
4748bac
update test
hsheth2 Jun 21, 2021
6cab940
fix imports
hsheth2 Jun 21, 2021
5c18e1e
adding initial usage stats code to the header
gabe-lyons Jun 21, 2021
9cbc5a8
populate a couple more fields
hsheth2 Jun 22, 2021
31beac4
Merge branch 'usage-stats' of github.com:acryldata/datahub-fork into …
gabe-lyons Jun 22, 2021
d66f906
pdl type updates
gabe-lyons Jun 22, 2021
cd4e4b3
Merge branch 'usage-stats' of github.com:acryldata/datahub-fork into …
hsheth2 Jun 22, 2021
815ff32
Merge branch 'master' into usage-stats
hsheth2 Jun 22, 2021
d1a87c2
adding usage stats client
gabe-lyons Jun 22, 2021
6902d77
update models
hsheth2 Jun 22, 2021
2d42f6b
Merge branch 'usage-stats' of github.com:acryldata/datahub-fork into …
hsheth2 Jun 22, 2021
6eeb2bf
add usage stats file source and update tests
hsheth2 Jun 22, 2021
d712f4d
Merge branch 'master' into usage-stats
hsheth2 Jun 22, 2021
8aa29ab
refactor doc generation into elastic provider
hsheth2 Jun 22, 2021
c67b5de
refactor some constants out
hsheth2 Jun 22, 2021
f84412e
Merge branch 'master' into usage-stats
hsheth2 Jun 22, 2021
6b9dc96
adding usage type
gabe-lyons Jun 22, 2021
646514c
fix imports
hsheth2 Jun 22, 2021
296870d
adding mappers
gabe-lyons Jun 22, 2021
d601cfb
start building aggregations
hsheth2 Jun 22, 2021
c0bcecc
tweak aggs to work
hsheth2 Jun 22, 2021
0898f2b
mce_file -> file rename
hsheth2 Jun 22, 2021
9430e02
finish graphql mapper side
gabe-lyons Jun 22, 2021
ef9344d
Merge branch 'usage-stats' of github.com:acryldata/datahub-fork into …
gabe-lyons Jun 22, 2021
e81a778
fix usage serde issue
hsheth2 Jun 23, 2021
7d4e5a5
usage query
gabe-lyons Jun 23, 2021
be28e74
Merge branch 'usage-stats' of github.com:acryldata/datahub-fork into …
gabe-lyons Jun 23, 2021
0420cf1
add queryRange interface
hsheth2 Jun 23, 2021
4b52049
Merge branch 'usage-stats' of github.com:acryldata/datahub-fork into …
hsheth2 Jun 23, 2021
6a2b384
usage stats in the UI
gabe-lyons Jun 23, 2021
324f128
Merge branch 'usage-stats' of github.com:acryldata/datahub-fork into …
gabe-lyons Jun 23, 2021
67d4c7e
start snowflake usage stats
hsheth2 Jun 23, 2021
a2fe031
Merge branch 'master' into usage-stats
hsheth2 Jun 23, 2021
51a5f22
Merge branch 'usage-stats' of github.com:acryldata/datahub-fork into …
hsheth2 Jun 23, 2021
8063df8
using new usage api
gabe-lyons Jun 23, 2021
bb1d84f
refactor into usage common
hsheth2 Jun 23, 2021
5d49c4e
Merge branch 'usage-stats' of github.com:acryldata/datahub-fork into …
hsheth2 Jun 23, 2021
5abe981
rollback snowflake for now
hsheth2 Jun 23, 2021
4b70bdc
fixing mocks
gabe-lyons Jun 23, 2021
05a2130
Merge branch 'usage-stats' of github.com:acryldata/datahub-fork int…
gabe-lyons Jun 23, 2021
@@ -7,6 +7,7 @@
import com.linkedin.lineage.client.Relationships;
import com.linkedin.metadata.restli.DefaultRestliClientFactory;
import com.linkedin.restli.client.Client;
import com.linkedin.usage.UsageClient;
import com.linkedin.util.Configuration;

/**
@@ -36,6 +37,7 @@ public class GmsClientFactory {
private static Relationships _relationships;
private static EntityClient _entities;
private static AspectClient _aspects;
private static UsageClient _usage;


private GmsClientFactory() { }
@@ -94,4 +96,15 @@ public static AspectClient getAspectsClient() {
}
return _aspects;
}

public static UsageClient getUsageClient() {
if (_usage == null) {
synchronized (GmsClientFactory.class) {
if (_usage == null) {
_usage = new UsageClient(REST_CLIENT);
}
}
}
return _usage;
}
}
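
Review note on the lazy getter above: it follows the double-checked locking pattern used for the other clients, with a non-volatile static field. Under the Java memory model, that idiom is only guaranteed safe when the field is `volatile`. Below is a minimal sketch of the volatile variant. `FakeUsageClient` and `LazyClientFactory` are hypothetical stand-ins invented for illustration, not classes from this PR.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for UsageClient; counts constructions for the demo.
final class FakeUsageClient {
    static final AtomicInteger CREATED = new AtomicInteger();

    FakeUsageClient() {
        CREATED.incrementAndGet();
    }
}

// Hypothetical factory showing double-checked locking with a volatile field.
final class LazyClientFactory {
    // volatile ensures a thread that sees a non-null reference
    // also sees the fully constructed object behind it.
    private static volatile FakeUsageClient usage;

    private LazyClientFactory() { }

    static FakeUsageClient getUsageClient() {
        FakeUsageClient local = usage;      // one volatile read on the fast path
        if (local == null) {
            synchronized (LazyClientFactory.class) {
                local = usage;              // re-check under the lock
                if (local == null) {
                    local = new FakeUsageClient();
                    usage = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(LazyClientFactory::getUsageClient);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("instances created: " + FakeUsageClient.CREATED.get());
    }
}
```

Whether the existing factory fields should change is a separate question for the maintainers; the sketch only shows the shape of the safer idiom.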
@@ -13,9 +13,12 @@
import com.linkedin.datahub.graphql.generated.RelatedDataset;
import com.linkedin.datahub.graphql.generated.SearchResult;
import com.linkedin.datahub.graphql.generated.InstitutionalMemoryMetadata;
import com.linkedin.datahub.graphql.generated.UsageQueryResult;
import com.linkedin.datahub.graphql.generated.UserUsageCounts;
import com.linkedin.datahub.graphql.resolvers.load.AspectResolver;
import com.linkedin.datahub.graphql.resolvers.load.EntityTypeResolver;
import com.linkedin.datahub.graphql.resolvers.load.LoadableTypeBatchResolver;
import com.linkedin.datahub.graphql.resolvers.load.UsageTypeResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.MutableTypeResolver;
import com.linkedin.datahub.graphql.resolvers.type.AspectInterfaceTypeResolver;
import com.linkedin.datahub.graphql.resolvers.type.HyperParameterValueTypeResolver;
@@ -54,6 +57,7 @@
import com.linkedin.datahub.graphql.types.lineage.DataFlowDataJobsRelationshipsType;
import com.linkedin.datahub.graphql.types.glossary.GlossaryTermType;

import com.linkedin.datahub.graphql.types.usage.UsageType;
import graphql.execution.DataFetcherResult;
import graphql.schema.idl.RuntimeWiring;
import org.apache.commons.io.IOUtils;
@@ -103,6 +107,7 @@ public class GmsGraphQLEngine {
);
public static final GlossaryTermType GLOSSARY_TERM_TYPE = new GlossaryTermType(GmsClientFactory.getEntitiesClient());
public static final AspectType ASPECT_TYPE = new AspectType(GmsClientFactory.getAspectsClient());
public static final UsageType USAGE_TYPE = new UsageType(GmsClientFactory.getUsageClient());

/**
* Configures the graph objects that can be fetched by primary key.
@@ -200,10 +205,11 @@ public static void configureRuntimeWiring(final RuntimeWiring.Builder builder) {

public static GraphQLEngine.Builder builder() {
return GraphQLEngine.builder()
.addSchema(schema())
.addDataLoaders(loaderSuppliers(LOADABLE_TYPES))
.addDataLoader("Aspect", (context) -> createAspectLoader(context))
.addDataLoader("UsageQueryResult", (context) -> createUsageLoader(context))
.configureRuntimeWiring(GmsGraphQLEngine::configureRuntimeWiring);
}

public static GraphQLEngine get() {
@@ -219,52 +225,52 @@ public static GraphQLEngine get() {

private static void configureQueryResolvers(final RuntimeWiring.Builder builder) {
builder.type("Query", typeWiring -> typeWiring
.dataFetcher("search", new AuthenticatedResolver<>(
new SearchResolver(SEARCHABLE_TYPES)))
.dataFetcher("autoComplete", new AuthenticatedResolver<>(
new AutoCompleteResolver(SEARCHABLE_TYPES)))
.dataFetcher("autoCompleteForAll", new AuthenticatedResolver<>(
new AutoCompleteForAllResolver(SEARCHABLE_TYPES)))
.dataFetcher("browse", new AuthenticatedResolver<>(
new BrowseResolver(BROWSABLE_TYPES)))
.dataFetcher("browsePaths", new AuthenticatedResolver<>(
new BrowsePathsResolver(BROWSABLE_TYPES)))
.dataFetcher("dataset", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(
DATASET_TYPE,
(env) -> env.getArgument(URN_FIELD_NAME))))
.dataFetcher("corpUser", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(
CORP_USER_TYPE,
(env) -> env.getArgument(URN_FIELD_NAME))))
.dataFetcher("corpGroup", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(
CORP_GROUP_TYPE,
(env) -> env.getArgument(URN_FIELD_NAME))))
.dataFetcher("dashboard", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(
DASHBOARD_TYPE,
(env) -> env.getArgument(URN_FIELD_NAME))))
.dataFetcher("chart", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(
CHART_TYPE,
(env) -> env.getArgument(URN_FIELD_NAME))))
.dataFetcher("tag", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(
TAG_TYPE,
(env) -> env.getArgument(URN_FIELD_NAME))))
.dataFetcher("dataFlow", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(
DATA_FLOW_TYPE,
(env) -> env.getArgument(URN_FIELD_NAME))))
.dataFetcher("dataJob", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(
DATA_JOB_TYPE,
(env) -> env.getArgument(URN_FIELD_NAME))))
.dataFetcher("glossaryTerm", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(
GLOSSARY_TERM_TYPE,
(env) -> env.getArgument(URN_FIELD_NAME))))
);
}

@@ -300,7 +306,8 @@ private static void configureDatasetResolvers(final RuntimeWiring.Builder builde
UPSTREAM_LINEAGE_TYPE,
(env) -> ((Entity) env.getSource()).getUrn()))
)
.dataFetcher("usageStats", new AuthenticatedResolver<>(new UsageTypeResolver()))
.dataFetcher("schemaMetadata", new AuthenticatedResolver<>(
new AspectResolver())
)
)
@@ -311,6 +318,13 @@ private static void configureDatasetResolvers(final RuntimeWiring.Builder builde
(env) -> ((Owner) env.getSource()).getOwner()))
)
)
.type("UserUsageCounts", typeWiring -> typeWiring
.dataFetcher("user", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(
CORP_USER_TYPE,
(env) -> ((UserUsageCounts) env.getSource()).getUser().getUrn()))
)
)
.type("RelatedDataset", typeWiring -> typeWiring
.dataFetcher("dataset", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(
@@ -554,6 +568,18 @@ private static DataLoader<VersionedAspectKey, DataFetcherResult<Aspect>> createA
}), loaderOptions);
}

private static DataLoader<UsageStatsKey, DataFetcherResult<UsageQueryResult>> createUsageLoader(final QueryContext queryContext) {
BatchLoaderContextProvider contextProvider = () -> queryContext;
DataLoaderOptions loaderOptions = DataLoaderOptions.newOptions().setBatchLoaderContextProvider(contextProvider);
return DataLoader.newDataLoader((keys, context) -> CompletableFuture.supplyAsync(() -> {
try {
return USAGE_TYPE.batchLoad(keys, context.getContext());
} catch (Exception e) {
throw new RuntimeException("Failed to retrieve usage stats", e);
}
}), loaderOptions);
}

private GmsGraphQLEngine() { }

}
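
Review note on wrapping exceptions in loaders such as `createUsageLoader`: `String.format` ignores arguments that have no matching format specifier, so an exception passed to it is neither printed in the message nor recorded as the cause. The two-argument `RuntimeException` constructor keeps the cause. A minimal sketch of the difference follows; `CauseDemo` and its method names are hypothetical, chosen only for illustration.

```java
// Hypothetical demo class contrasting two ways of wrapping an exception.
final class CauseDemo {
    private CauseDemo() { }

    // The extra argument has no matching specifier, so String.format drops it:
    // the original exception appears neither in the message nor as the cause.
    static RuntimeException wrapViaFormat(Exception e) {
        return new RuntimeException(String.format("Failed to retrieve usage stats", e));
    }

    // The (message, cause) constructor preserves the original exception,
    // so its stack trace shows up under "Caused by:" when logged.
    static RuntimeException wrapWithCause(Exception e) {
        return new RuntimeException("Failed to retrieve usage stats", e);
    }

    public static void main(String[] args) {
        Exception original = new IllegalStateException("backend unavailable");
        System.out.println("via format: cause = " + wrapViaFormat(original).getCause());
        System.out.println("with cause: cause = " + wrapWithCause(original).getCause());
    }
}
```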
@@ -0,0 +1,16 @@
package com.linkedin.datahub.graphql;

import com.linkedin.usage.UsageTimeRange;
import lombok.Data;


@Data
public class UsageStatsKey {
private String resource;
private UsageTimeRange range;

public UsageStatsKey(String resource, UsageTimeRange range) {
this.resource = resource;
this.range = range;
}
}
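
Review note on `UsageStatsKey`: Lombok's `@Data` generates `equals` and `hashCode` from the fields, which matters when a key is used in a map-backed cache, since two keys built from the same resource and range should count as one entry. The sketch below writes those methods by hand to show the effect. `Range`, `Key`, and `KeyDemo` are hypothetical stand-ins, and the enum values are assumptions rather than the real `UsageTimeRange` constants.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for UsageTimeRange; the constants are assumed.
enum Range { DAY, WEEK }

// Hypothetical value-style key with hand-written equals/hashCode,
// roughly what @Data would generate for the two fields.
final class Key {
    private final String resource;
    private final Range range;

    Key(String resource, Range range) {
        this.resource = resource;
        this.range = range;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Key)) {
            return false;
        }
        Key other = (Key) o;
        return Objects.equals(resource, other.resource) && range == other.range;
    }

    @Override
    public int hashCode() {
        return Objects.hash(resource, range);
    }
}

final class KeyDemo {
    private KeyDemo() { }

    // Inserts three keys, two of which are logically equal.
    static int distinctEntries() {
        Map<Key, String> cache = new HashMap<>();
        cache.put(new Key("urn:li:dataset:example", Range.DAY), "first");
        cache.put(new Key("urn:li:dataset:example", Range.DAY), "second"); // equal key, overwrites
        cache.put(new Key("urn:li:dataset:example", Range.WEEK), "third"); // different range, new entry
        return cache.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct cache entries: " + distinctEntries());
    }
}
```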
@@ -0,0 +1,37 @@
package com.linkedin.datahub.graphql.resolvers.load;

import com.linkedin.datahub.graphql.UsageStatsKey;

import com.linkedin.datahub.graphql.types.LoadableType;
import com.linkedin.pegasus2avro.usage.UsageQueryResult;
import com.linkedin.usage.UsageTimeRange;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.concurrent.CompletableFuture;
import org.dataloader.DataLoader;


/**
* GraphQL resolver responsible for
*
* 1. Reading the "resource" and "range" arguments from the query.
* 2. Loading the matching {@link UsageQueryResult} for that resource and time range.
*
* Note that this resolver expects a {@link DataLoader} to be registered
* under the name "UsageQueryResult".
*
*/
public class UsageTypeResolver implements DataFetcher<CompletableFuture<UsageQueryResult>> {

@Override
public CompletableFuture<UsageQueryResult> get(DataFetchingEnvironment environment) {
final DataLoader<UsageStatsKey, UsageQueryResult> loader = environment.getDataLoaderRegistry().getDataLoader("UsageQueryResult");

String resource = environment.getArgument("resource");
UsageTimeRange duration = UsageTimeRange.valueOf(environment.getArgument("range"));

UsageStatsKey key = new UsageStatsKey(resource, duration);

return loader.load(key);
}
}
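
Review note on the resolver above: it only queues a key and returns a future, while the registered loader later resolves all queued keys in one batch call. The sketch below illustrates that queue-then-dispatch idea with a deliberately tiny loader. `MiniBatchLoader` is a hypothetical simplification written for this note and is not the java-dataloader API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical, simplified batching loader (not the java-dataloader API).
final class MiniBatchLoader<K, V> {
    private final Function<List<K>, List<V>> batchFn;
    private final Map<K, CompletableFuture<V>> pending = new LinkedHashMap<>();
    private int batchCalls = 0;

    MiniBatchLoader(Function<List<K>, List<V>> batchFn) {
        this.batchFn = batchFn;
    }

    // Queue a key and hand back a future; equal keys share one future.
    CompletableFuture<V> load(K key) {
        return pending.computeIfAbsent(key, k -> new CompletableFuture<>());
    }

    // Resolve every queued key with a single call to the batch function,
    // which must return one value per key in the same order.
    void dispatch() {
        if (pending.isEmpty()) {
            return;
        }
        Map<K, CompletableFuture<V>> batch = new LinkedHashMap<>(pending);
        pending.clear();
        List<K> keys = new ArrayList<>(batch.keySet());
        List<V> values = batchFn.apply(keys);
        batchCalls++;
        for (int i = 0; i < keys.size(); i++) {
            batch.get(keys.get(i)).complete(values.get(i));
        }
    }

    int batchCalls() {
        return batchCalls;
    }
}

final class MiniBatchLoaderDemo {
    private MiniBatchLoaderDemo() { }

    // Loads two keys, dispatches once, and reports the results.
    static String run() {
        MiniBatchLoader<String, Integer> loader = new MiniBatchLoader<>(
            keys -> keys.stream().map(String::length).collect(Collectors.toList()));
        CompletableFuture<Integer> a = loader.load("dataset-a");
        CompletableFuture<Integer> b = loader.load("dataset-bb");
        loader.dispatch();
        return a.join() + " " + b.join() + " batches=" + loader.batchCalls();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the real engine the dispatch step is driven by the GraphQL execution rather than called by hand, so treat the explicit `dispatch()` here as an illustration device.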
@@ -0,0 +1,34 @@
package com.linkedin.datahub.graphql.types.usage;

import com.linkedin.datahub.graphql.generated.UsageAggregation;
import com.linkedin.datahub.graphql.generated.WindowDuration;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import javax.annotation.Nonnull;


public class UsageAggregationMapper implements
ModelMapper<com.linkedin.usage.UsageAggregation, UsageAggregation> {

public static final UsageAggregationMapper INSTANCE = new UsageAggregationMapper();

public static UsageAggregation map(@Nonnull final com.linkedin.usage.UsageAggregation pdlUsageAggregation) {
return INSTANCE.apply(pdlUsageAggregation);
}

@Override
public UsageAggregation apply(@Nonnull final com.linkedin.usage.UsageAggregation pdlUsageAggregation) {
UsageAggregation result = new UsageAggregation();
result.setBucket(pdlUsageAggregation.getBucket());

if (pdlUsageAggregation.hasDuration()) {
result.setDuration(WindowDuration.valueOf(pdlUsageAggregation.getDuration().toString()));
}
if (pdlUsageAggregation.hasResource()) {
result.setResource(pdlUsageAggregation.getResource().toString());
}
if (pdlUsageAggregation.hasMetrics()) {
result.setMetrics(UsageAggregationMetricsMapper.map(pdlUsageAggregation.getMetrics()));
}
return result;
}
}
@@ -0,0 +1,33 @@
package com.linkedin.datahub.graphql.types.usage;

import com.linkedin.datahub.graphql.generated.UsageAggregationMetrics;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;


public class UsageAggregationMetricsMapper implements
ModelMapper<com.linkedin.usage.UsageAggregationMetrics, UsageAggregationMetrics> {

public static final UsageAggregationMetricsMapper INSTANCE = new UsageAggregationMetricsMapper();

public static UsageAggregationMetrics map(@Nonnull final com.linkedin.usage.UsageAggregationMetrics usageAggregationMetrics) {
return INSTANCE.apply(usageAggregationMetrics);
}

@Override
public UsageAggregationMetrics apply(@Nonnull final com.linkedin.usage.UsageAggregationMetrics usageAggregationMetrics) {
UsageAggregationMetrics result = new UsageAggregationMetrics();
result.setTotalSqlQueries(usageAggregationMetrics.getTotalSqlQueries());
result.setUniqueUserCount(usageAggregationMetrics.getUniqueUserCount());
result.setTopSqlQueries(usageAggregationMetrics.getTopSqlQueries());
if (usageAggregationMetrics.hasUsers()) {
result.setUsers(usageAggregationMetrics.getUsers()
.stream()
.map(aggregation -> UserUsageCountsMapper.map(aggregation))
.collect(Collectors.toList()));
}

return result;
}
}
@@ -0,0 +1,31 @@
package com.linkedin.datahub.graphql.types.usage;

import com.linkedin.datahub.graphql.generated.UsageQueryResultAggregations;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;


public class UsageQueryResultAggregationMapper implements
ModelMapper<com.linkedin.usage.UsageQueryResultAggregations, UsageQueryResultAggregations> {

public static final UsageQueryResultAggregationMapper INSTANCE = new UsageQueryResultAggregationMapper();

public static UsageQueryResultAggregations map(@Nonnull final com.linkedin.usage.UsageQueryResultAggregations pdlUsageResultAggregations) {
return INSTANCE.apply(pdlUsageResultAggregations);
}

@Override
public UsageQueryResultAggregations apply(@Nonnull final com.linkedin.usage.UsageQueryResultAggregations pdlUsageResultAggregations) {
UsageQueryResultAggregations result = new UsageQueryResultAggregations();
result.setTotalSqlQueries(pdlUsageResultAggregations.getTotalSqlQueries());
result.setUniqueUserCount(pdlUsageResultAggregations.getUniqueUserCount());
if (pdlUsageResultAggregations.hasUsers()) {
result.setUsers(pdlUsageResultAggregations.getUsers()
.stream()
.map(aggregation -> UserUsageCountsMapper.map(aggregation))
.collect(Collectors.toList()));
}
return result;
}
}
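
Review note on the mappers in this PR: all three share one shape, namely a stateless singleton `INSTANCE`, a static `map` helper that delegates to it, presence checks before copying optional fields, and a stream to convert nested lists. A minimal self-contained sketch of that shape follows. `Mapper`, `SourceCount`, `TargetCount`, and `CountMapper` are hypothetical names invented for illustration and do not mirror the real PDL or GraphQL classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical mirror of a mapper contract: convert an input model to an output model.
interface Mapper<I, O> {
    O apply(I input);
}

// Hypothetical source model; count is nullable to imitate an optional field.
final class SourceCount {
    final String userUrn;
    final Integer count;

    SourceCount(String userUrn, Integer count) {
        this.userUrn = userUrn;
        this.count = count;
    }
}

// Hypothetical target model with mutable fields, like a generated GraphQL type.
final class TargetCount {
    String userUrn;
    Integer count;
}

final class CountMapper implements Mapper<SourceCount, TargetCount> {
    static final CountMapper INSTANCE = new CountMapper();

    // Static convenience entry point delegating to the shared, stateless instance.
    static TargetCount map(SourceCount input) {
        return INSTANCE.apply(input);
    }

    @Override
    public TargetCount apply(SourceCount input) {
        TargetCount result = new TargetCount();
        result.userUrn = input.userUrn;
        if (input.count != null) {   // copy optional fields only when present
            result.count = input.count;
        }
        return result;
    }

    // Converts a list element by element, using a method reference in place of a lambda.
    static List<TargetCount> mapAll(List<SourceCount> inputs) {
        return inputs.stream().map(CountMapper::map).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<TargetCount> out = mapAll(Arrays.asList(
            new SourceCount("urn:li:corpuser:a", 3),
            new SourceCount("urn:li:corpuser:b", null)));
        for (TargetCount t : out) {
            System.out.println(t.userUrn + " -> " + t.count);
        }
    }
}
```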