Skip to content

Commit

Permalink
fix(policies): Re-revert the policies fix + ingest documents directly…
Browse files Browse the repository at this point in the history
… to search (datahub-project#4733)
  • Loading branch information
Dexter Lee authored and maggiehays committed Aug 1, 2022
1 parent b187eef commit 1e534f0
Show file tree
Hide file tree
Showing 21 changed files with 349 additions and 181 deletions.
Original file line number Diff line number Diff line change
@@ -1,37 +1,31 @@
package com.linkedin.datahub.graphql.resolvers.policy;

import com.linkedin.common.urn.Urn;
import com.datahub.authorization.PolicyFetcher;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.ListPoliciesInput;
import com.linkedin.datahub.graphql.generated.ListPoliciesResult;
import com.linkedin.datahub.graphql.generated.Policy;
import com.linkedin.datahub.graphql.types.policy.mappers.PolicyMapper;
import com.linkedin.entity.EntityResponse;
import com.linkedin.datahub.graphql.resolvers.policy.mappers.PolicyInfoPolicyMapper;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.query.ListUrnsResult;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
import static com.linkedin.metadata.Constants.*;
import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;


public class ListPoliciesResolver implements DataFetcher<CompletableFuture<ListPoliciesResult>> {

private static final Integer DEFAULT_START = 0;
private static final Integer DEFAULT_COUNT = 20;

private final EntityClient _entityClient;
private final PolicyFetcher _policyFetcher;

public ListPoliciesResolver(final EntityClient entityClient) {
_entityClient = entityClient;
_policyFetcher = new PolicyFetcher(entityClient);
}

@Override
Expand All @@ -47,20 +41,16 @@ public CompletableFuture<ListPoliciesResult> get(final DataFetchingEnvironment e
return CompletableFuture.supplyAsync(() -> {
try {
// First, get all policy Urns.
final ListUrnsResult gmsResult = _entityClient.listUrns(POLICY_ENTITY_NAME, start, count, context.getAuthentication());

// Then, get all policies. TODO: Migrate batchGet to return GenericAspects, to avoid requiring a snapshot.
final Map<Urn, EntityResponse> entities = _entityClient.batchGetV2(POLICY_ENTITY_NAME,
new HashSet<>(gmsResult.getEntities()), null, context.getAuthentication());
final PolicyFetcher.PolicyFetchResult policyFetchResult =
_policyFetcher.fetchPolicies(start, count, context.getAuthentication());

// Now that we have entities we can bind this to a result.
final ListPoliciesResult result = new ListPoliciesResult();
result.setStart(gmsResult.getStart());
result.setCount(gmsResult.getCount());
result.setTotal(gmsResult.getTotal());
result.setPolicies(mapEntities(entities.values()));
result.setStart(start);
result.setCount(count);
result.setTotal(policyFetchResult.getTotal());
result.setPolicies(mapEntities(policyFetchResult.getPolicies()));
return result;

} catch (Exception e) {
throw new RuntimeException("Failed to list policies", e);
}
Expand All @@ -69,9 +59,11 @@ public CompletableFuture<ListPoliciesResult> get(final DataFetchingEnvironment e
throw new AuthorizationException("Unauthorized to perform this action. Please contact your DataHub administrator.");
}

private List<Policy> mapEntities(final Collection<EntityResponse> entities) {
return entities.stream()
.map(PolicyMapper::map)
.collect(Collectors.toList());
private List<Policy> mapEntities(final List<PolicyFetcher.Policy> policies) {
return policies.stream().map(policy -> {
Policy mappedPolicy = PolicyInfoPolicyMapper.map(policy.getPolicyInfo());
mappedPolicy.setUrn(policy.getUrn().toString());
return mappedPolicy;
}).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)

// Create the policy info.
final DataHubPolicyInfo info = PolicyUpdateInputInfoMapper.map(input);
info.setLastUpdatedTimestamp(System.currentTimeMillis());

proposal.setEntityType(POLICY_ENTITY_NAME);
proposal.setAspectName(POLICY_INFO_ASPECT_NAME);
proposal.setAspect(GenericRecordUtils.serializeAspect(info));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public CompletableFuture<SearchResults> get(DataFetchingEnvironment environment)
log.debug("Executing search. entity type {}, query {}, filters: {}, start: {}, count: {}", input.getType(),
input.getQuery(), input.getFilters(), start, count);
return UrnSearchResultsMapper.map(
_entityClient.search(entityName, sanitizedQuery, ResolverUtils.buildFilter(input.getFilters()), start,
_entityClient.search(entityName, sanitizedQuery, ResolverUtils.buildFilter(input.getFilters()), null, start,
count, ResolverUtils.getAuthentication(environment)));
} catch (Exception e) {
log.error("Failed to execute search: entity type {}, query {}, filters: {}, start: {}, count: {}",
Expand Down

This file was deleted.

21 changes: 10 additions & 11 deletions datahub-web-react/src/app/policy/PoliciesPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -163,25 +163,18 @@ export const PoliciesPage = () => {
loading: policiesLoading,
error: policiesError,
data: policiesData,
refetch: policiesRefetch,
} = useListPoliciesQuery({
fetchPolicy: 'no-cache',
variables: { input: { start, count: pageSize } },
});

const listPoliciesQuery = 'listPolicies';

// Any time a policy is removed, edited, or created, refetch the list.
const [createPolicy, { error: createPolicyError }] = useCreatePolicyMutation({
refetchQueries: () => [listPoliciesQuery],
});
const [createPolicy, { error: createPolicyError }] = useCreatePolicyMutation();

const [updatePolicy, { error: updatePolicyError }] = useUpdatePolicyMutation({
refetchQueries: () => [listPoliciesQuery],
});
const [updatePolicy, { error: updatePolicyError }] = useUpdatePolicyMutation();

const [deletePolicy, { error: deletePolicyError }] = useDeletePolicyMutation({
refetchQueries: () => [listPoliciesQuery],
});
const [deletePolicy, { error: deletePolicyError }] = useDeletePolicyMutation();

const updateError = createPolicyError || updatePolicyError || deletePolicyError;

Expand Down Expand Up @@ -244,6 +237,9 @@ export const PoliciesPage = () => {
content: `Are you sure you want to remove policy?`,
onOk() {
deletePolicy({ variables: { urn: policy?.urn as string } }); // There must be a focus policy urn.
setTimeout(function () {
policiesRefetch();
}, 2000);
onCancelViewPolicy();
},
onCancel() {},
Expand Down Expand Up @@ -282,6 +278,9 @@ export const PoliciesPage = () => {
createPolicy({ variables: { input: toPolicyInput(savePolicy) } });
}
message.success('Successfully saved policy.');
setTimeout(function () {
policiesRefetch();
}, 2000);
onClosePolicyBuilder();
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.metadata.search.utils;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.LongMap;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
Expand All @@ -14,8 +15,11 @@
import com.linkedin.metadata.utils.SearchUtil;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -40,6 +44,15 @@ private SearchUtils() {

}

public static Optional<String> getDocId(@Nonnull Urn urn) {
try {
return Optional.of(URLEncoder.encode(urn.toString(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
log.error("Failed to encode the urn with error: {}", e.toString());
return Optional.empty();
}
}

/**
* Validates the request params and create a request map out of it.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.transformer.SearchDocumentTransformer;
import com.linkedin.metadata.search.utils.SearchUtils;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import com.linkedin.metadata.timeseries.transformer.TimeseriesAspectTransformer;
Expand Down Expand Up @@ -198,15 +199,13 @@ private void updateSearchService(String entityName, Urn urn, AspectSpec aspectSp
return;
}

String docId;
try {
docId = URLEncoder.encode(urn.toString(), "UTF-8");
} catch (UnsupportedEncodingException e) {
log.error("Failed to encode the urn with error: {}", e.toString());
Optional<String> docId = SearchUtils.getDocId(urn);

if (!docId.isPresent()) {
return;
}

_entitySearchService.upsertDocument(entityName, searchDocument.get(), docId);
_entitySearchService.upsertDocument(entityName, searchDocument.get(), docId.get());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ record DataHubPolicyInfo {
/**
* Display name of the Policy
*/
@Searchable = {
"fieldType": "KEYWORD"
}
displayName: string

/**
Expand Down Expand Up @@ -48,4 +51,11 @@ record DataHubPolicyInfo {
*/
editable: boolean = true

/**
* Timestamp when the policy was last updated
*/
@Searchable = {
"fieldType": "DATETIME"
}
lastUpdatedTimestamp: optional long
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,11 @@
import com.linkedin.common.Ownership;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.authorization.PoliciesConfig;
import com.linkedin.metadata.query.ListUrnsResult;
import com.linkedin.policy.DataHubPolicyInfo;
import com.linkedin.r2.RemoteInvocationException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -25,12 +19,11 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.metadata.Constants.CORP_GROUP_ENTITY_NAME;
import static com.linkedin.metadata.Constants.CORP_USER_ENTITY_NAME;
import static com.linkedin.metadata.Constants.DATAHUB_POLICY_INFO_ASPECT_NAME;
import static com.linkedin.metadata.Constants.POLICY_ENTITY_NAME;


/**
Expand Down Expand Up @@ -80,7 +73,7 @@ public DataHubAuthorizer(
final int refreshIntervalSeconds,
final AuthorizationMode mode) {
_systemAuthentication = systemAuthentication;
_policyRefreshRunnable = new PolicyRefreshRunnable(systemAuthentication, entityClient, _policyCache);
_policyRefreshRunnable = new PolicyRefreshRunnable(systemAuthentication, new PolicyFetcher(entityClient), _policyCache);
_refreshExecutorService.scheduleAtFixedRate(_policyRefreshRunnable, delayIntervalSeconds, refreshIntervalSeconds, TimeUnit.SECONDS);
_mode = mode;
_resourceSpecResolver = new ResourceSpecResolver(systemAuthentication, entityClient);
Expand Down Expand Up @@ -213,21 +206,13 @@ private boolean isRequestGranted(final DataHubPolicyInfo policy, final Authoriza
* entire cache using Policies stored in the backend.
*/
@VisibleForTesting
@RequiredArgsConstructor
static class PolicyRefreshRunnable implements Runnable {

private final Authentication _systemAuthentication;
private final EntityClient _entityClient;
private final PolicyFetcher _policyFetcher;
private final Map<String, List<DataHubPolicyInfo>> _policyCache;

public PolicyRefreshRunnable(
final Authentication systemAuthentication,
final EntityClient entityClient,
final Map<String, List<DataHubPolicyInfo>> policyCache) {
_systemAuthentication = systemAuthentication;
_entityClient = entityClient;
_policyCache = policyCache;
}

@Override
public void run() {
try {
Expand All @@ -240,18 +225,16 @@ public void run() {

while (start < total) {
try {
log.debug(String.format("Batch fetching policies. start: %s, count: %s ", start, count));
final ListUrnsResult policyUrns = _entityClient.listUrns(POLICY_ENTITY_NAME, start, count, _systemAuthentication);
final Map<Urn, EntityResponse> policyEntities = _entityClient.batchGetV2(POLICY_ENTITY_NAME,
new HashSet<>(policyUrns.getEntities()), null, _systemAuthentication);
final PolicyFetcher.PolicyFetchResult
policyFetchResult = _policyFetcher.fetchPolicies(start, count, _systemAuthentication);

addPoliciesToCache(newCache, policyEntities.values());
addPoliciesToCache(newCache, policyFetchResult.getPolicies());

total = policyUrns.getTotal();
total = policyFetchResult.getTotal();
start = start + count;
} catch (RemoteInvocationException e) {
log.error(String.format(
"Failed to retrieve policy urns! Skipping updating policy cache until next refresh. start: %s, count: %s", start, count), e);
} catch (Exception e) {
log.error(
"Failed to retrieve policy urns! Skipping updating policy cache until next refresh. start: {}, count: {}", start, count, e);
return;
}
synchronized (_policyCache) {
Expand All @@ -266,19 +249,8 @@ public void run() {
}

private void addPoliciesToCache(final Map<String, List<DataHubPolicyInfo>> cache,
final Collection<EntityResponse> entityResponses) {
for (final EntityResponse entityResponse : entityResponses) {
addPolicyToCache(cache, entityResponse);
}
}

private void addPolicyToCache(final Map<String, List<DataHubPolicyInfo>> cache, final EntityResponse entityResponse) {
EnvelopedAspectMap aspectMap = entityResponse.getAspects();
if (!aspectMap.containsKey(DATAHUB_POLICY_INFO_ASPECT_NAME)) {
throw new IllegalArgumentException(
String.format("Failed to find DataHubPolicyInfo aspect in DataHubPolicy data %s. Invalid state.", aspectMap));
}
addPolicyToCache(cache, new DataHubPolicyInfo(aspectMap.get(DATAHUB_POLICY_INFO_ASPECT_NAME).getValue().data()));
final List<PolicyFetcher.Policy> policies) {
policies.forEach(policy -> addPolicyToCache(cache, policy.getPolicyInfo()));
}

private void addPolicyToCache(final Map<String, List<DataHubPolicyInfo>> cache, final DataHubPolicyInfo policy) {
Expand Down
Loading

0 comments on commit 1e534f0

Please sign in to comment.