Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(policies): Make policies searchable by privilege, type, status or editable fields #9877

Merged
merged 7 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.linkedin.datahub.upgrade.config;

import com.linkedin.datahub.upgrade.system.entity.steps.BackfillPolicyFields;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.SearchService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that exposes the {@link BackfillPolicyFields} upgrade as a bean, feeding
 * it the values of the {@code systemUpdate.policyFields.*} properties.
 */
@Configuration
public class BackfillPolicyFieldsConfig {

  /**
   * Creates the policy-field backfill upgrade.
   *
   * @param entityService entity service handed to the upgrade
   * @param searchService search service handed to the upgrade
   * @param enabled value of {@code systemUpdate.policyFields.enabled}
   * @param reprocessEnabled value of {@code systemUpdate.policyFields.reprocess.enabled}
   * @param batchSize value of {@code systemUpdate.policyFields.batchSize}
   * @return the upgrade built from the arguments above
   */
  @Bean
  public BackfillPolicyFields backfillPolicyFields(
      EntityService<?> entityService,
      SearchService searchService,
      @Value("${systemUpdate.policyFields.enabled}") final boolean enabled,
      @Value("${systemUpdate.policyFields.reprocess.enabled}") final boolean reprocessEnabled,
      @Value("${systemUpdate.policyFields.batchSize}") final Integer batchSize) {
    final BackfillPolicyFields upgrade =
        new BackfillPolicyFields(
            entityService, searchService, enabled, reprocessEnabled, batchSize);
    return upgrade;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices;
import com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices;
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillPolicyFields;
import com.linkedin.datahub.upgrade.system.via.ReindexDataJobViaNodesCLL;
import com.linkedin.gms.factory.common.TopicConventionFactory;
import com.linkedin.gms.factory.config.ConfigurationProvider;
Expand Down Expand Up @@ -40,7 +41,8 @@ public SystemUpdate systemUpdate(
final GitVersion gitVersion,
@Qualifier("revision") String revision,
final BackfillBrowsePathsV2 backfillBrowsePathsV2,
final ReindexDataJobViaNodesCLL reindexDataJobViaNodesCLL) {
final ReindexDataJobViaNodesCLL reindexDataJobViaNodesCLL,
final BackfillPolicyFields backfillPolicyFields) {

String version = String.format("%s-%s", gitVersion.getVersion(), revision);
return new SystemUpdate(
Expand All @@ -49,7 +51,8 @@ public SystemUpdate systemUpdate(
kafkaEventProducer,
version,
backfillBrowsePathsV2,
reindexDataJobViaNodesCLL);
reindexDataJobViaNodesCLL,
backfillPolicyFields);
}

@Value("#{systemEnvironment['DATAHUB_REVISION'] ?: '0'}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices;
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillPolicyFields;
import com.linkedin.datahub.upgrade.system.via.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
import java.util.List;
Expand All @@ -26,11 +27,13 @@ public SystemUpdate(
final KafkaEventProducer kafkaEventProducer,
final String version,
final BackfillBrowsePathsV2 backfillBrowsePathsV2,
final ReindexDataJobViaNodesCLL upgradeViaNodeCll) {
final ReindexDataJobViaNodesCLL upgradeViaNodeCll,
final BackfillPolicyFields backfillPolicyFields) {

_preStartupUpgrades = List.of(buildIndicesJob);
_steps = List.of(new DataHubStartupStep(kafkaEventProducer, version));
_postStartupUpgrades = List.of(cleanIndicesJob, backfillBrowsePathsV2, upgradeViaNodeCll);
_postStartupUpgrades =
List.of(cleanIndicesJob, backfillBrowsePathsV2, upgradeViaNodeCll, backfillPolicyFields);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.linkedin.datahub.upgrade.system.entity.steps;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.Upgrade;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.SearchService;
import java.util.List;

public class BackfillPolicyFields implements Upgrade {
private final List<UpgradeStep> _steps;

public BackfillPolicyFields(
EntityService<?> entityService,
SearchService searchService,
boolean enabled,
boolean reprocessEnabled,
Integer batchSize) {
if (enabled) {
_steps =
ImmutableList.of(
new BackfillPolicyFieldsStep(
entityService, searchService, reprocessEnabled, batchSize));
} else {
_steps = ImmutableList.of();
}
}

@Override
public String id() {
return "BackfillPolicyField";
pedro93 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public List<UpgradeStep> steps() {
return _steps;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
package com.linkedin.datahub.upgrade.system.entity.steps;

import static com.linkedin.metadata.Constants.*;

import com.google.common.collect.ImmutableList;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.DataMap;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.entity.EntityResponse;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.search.ScrollResult;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.policy.DataHubPolicyInfo;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;

/**
 * This upgrade step is responsible for upgrading DataHub policy documents with new searchable
 * fields in ES.
 *
 * <p>It scrolls over the policies selected by a "searchable field is missing" filter and
 * re-emits each policy's info aspect as a RESTATE proposal. Once the scroll is exhausted the run
 * is recorded under {@code UPGRADE_ID_URN}, which {@code skip} consults on later runs.
 */
@Slf4j
public class BackfillPolicyFieldsStep implements UpgradeStep {
// Identifier returned by id() and used to derive the upgrade URN below.
private static final String UPGRADE_ID = "BackfillPolicyFieldsStep";
// URN under which completion of this step is recorded and later looked up.
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);
// When true, skip() never skips, so the step runs even if a previous run was recorded.
private final boolean reprocessEnabled;
// Page size passed to every scroll request; also used for the progress log.
private final Integer batchSize;
// Used to read policy aspects, ingest the restated proposals and record the upgrade result.
private final EntityService<?> entityService;
// Used to scroll over the policies that match the backfill filter.
private final SearchService _searchService;

/**
 * @param entityService service used to read aspects and ingest proposals
 * @param searchService service used to scroll over policies
 * @param reprocessEnabled whether the step must run even if it already ran before
 * @param batchSize number of policies requested per scroll page
 */
public BackfillPolicyFieldsStep(
EntityService<?> entityService,
SearchService searchService,
boolean reprocessEnabled,
Integer batchSize) {
this.entityService = entityService;
this._searchService = searchService;
this.reprocessEnabled = reprocessEnabled;
this.batchSize = batchSize;
}

/** Returns the identifier of this step, i.e. the value of {@code UPGRADE_ID}. */
@Override
public String id() {
return UPGRADE_ID;
}

@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
final AuditStamp auditStamp =
new AuditStamp()
.setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR))
.setTime(System.currentTimeMillis());

String scrollId = null;
int migratedCount = 0;
do {
log.info(
String.format(
"Upgrading batch of policies %s-%s", migratedCount, migratedCount + batchSize));
pedro93 marked this conversation as resolved.
Show resolved Hide resolved
scrollId = backfillPolicies(auditStamp, scrollId);
migratedCount += batchSize;
} while (scrollId != null);

BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, entityService);

return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
};
}

/**
 * Returns whether the upgrade should proceed if the step fails after exceeding the maximum
 * retries. This step always reports {@code true}.
 */
@Override
public boolean isOptional() {
return true;
}

/**
 * Decides whether this step can be skipped.
 *
 * <p>The step is never skipped when the reprocess flag given to the constructor is set. Otherwise
 * it is skipped exactly when the upgrade URN already exists, i.e. a previous run was recorded.
 *
 * @param context upgrade context (not consulted)
 * @return {@code true} when the step already ran and reprocessing is not requested
 */
@Override
public boolean skip(UpgradeContext context) {
  // Short-circuit keeps the existence lookup from running when reprocessing is forced.
  final boolean alreadyRan = !reprocessEnabled && entityService.exists(UPGRADE_ID_URN, true);
  if (alreadyRan) {
    log.info("{} was already run. Skipping.", id());
  }
  return alreadyRan;
}

/**
 * Processes one scroll page of policies that match the backfill filter.
 *
 * @param auditStamp audit stamp attached to every ingested proposal
 * @param scrollId scroll id returned by the previous page, or {@code null} for the first page
 * @return the scroll id for the next page, or {@code null} when the page came back empty
 */
private String backfillPolicies(AuditStamp auditStamp, String scrollId) {

  final Filter filter = backfillPolicyFieldFilter();
  // The two null arguments are left at the service's defaults
  // (presumably sort criterion and keep-alive - confirm against SearchService).
  final ScrollResult scrollResult =
      _searchService.scrollAcrossEntities(
          ImmutableList.of(Constants.POLICY_ENTITY_NAME),
          "*",
          filter,
          null,
          scrollId,
          null,
          batchSize,
          new SearchFlags()
              .setFulltext(true)
              .setSkipCache(true)
              .setSkipHighlighting(true)
              .setSkipAggregates(true));

  if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().isEmpty()) {
    return null;
  }

  for (SearchEntity searchEntity : scrollResult.getEntities()) {
    try {
      ingestPolicyFields(searchEntity.getEntity(), auditStamp);
    } catch (Exception e) {
      // don't stop the whole step because of one bad urn or one bad ingestion
      log.error("Error restating policy info aspect for urn {}", searchEntity.getEntity(), e);
    }
  }

  return scrollResult.getScrollId();
}

/**
 * Builds the filter that selects the policies to backfill: an OR over one "field is missing"
 * clause per searchable field, so a policy matches as soon as any of the fields is absent.
 */
private Filter backfillPolicyFieldFilter() {
  // NOTE(review): the first name is "privilege" (singular); confirm it matches the indexed
  // field name for the policy's privileges.
  final ConjunctiveCriterionArray anyFieldMissing = new ConjunctiveCriterionArray();
  for (String fieldName : ImmutableList.of("privilege", "editable", "state", "type")) {
    anyFieldMissing.add(getCriterionForMissingField(fieldName));
  }
  return new Filter().setOr(anyFieldMissing);
}

private void ingestPolicyFields(Urn urn, AuditStamp auditStamp) {
EntityResponse entityResponse = null;
try {
entityResponse =
entityService.getEntityV2(
urn.getEntityType(), urn, Collections.singleton(DATAHUB_POLICY_INFO_ASPECT_NAME));
} catch (URISyntaxException e) {
log.error(
String.format(
"Error getting DataHub Policy Info for entity with urn %s while restating policy information",
urn),
e);
}

if (entityResponse != null
&& entityResponse.getAspects().containsKey(DATAHUB_POLICY_INFO_ASPECT_NAME)) {
final DataMap dataMap =
entityResponse.getAspects().get(DATAHUB_POLICY_INFO_ASPECT_NAME).getValue().data();
final DataHubPolicyInfo infoAspect = new DataHubPolicyInfo(dataMap);
log.debug(
String.format("Restating policy information for urn %s with value %s", urn, infoAspect));
pedro93 marked this conversation as resolved.
Show resolved Hide resolved
MetadataChangeProposal proposal = new MetadataChangeProposal();
proposal.setEntityUrn(urn);
proposal.setEntityType(urn.getEntityType());
proposal.setAspectName(DATAHUB_POLICY_INFO_ASPECT_NAME);
proposal.setChangeType(ChangeType.RESTATE);
proposal.setSystemMetadata(
new SystemMetadata()
.setRunId(DEFAULT_RUN_ID)
.setLastObserved(System.currentTimeMillis()));
proposal.setAspect(GenericRecordUtils.serializeAspect(infoAspect));
entityService.ingestProposal(proposal, auditStamp, true);
}
}

/**
 * Wraps a single IS_NULL criterion on {@code field} into a conjunctive clause.
 *
 * @param field name of the search field that must be missing
 * @return a conjunction containing only the "field is null" criterion
 */
@NotNull
private static ConjunctiveCriterion getCriterionForMissingField(String field) {
  final Criterion fieldIsNull = new Criterion().setCondition(Condition.IS_NULL).setField(field);
  return new ConjunctiveCriterion()
      .setAnd(new CriterionArray(Collections.singletonList(fieldIsNull)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,17 @@ record DataHubPolicyInfo {
/**
* The type of policy
*/
@Searchable = {
"fieldType": "KEYWORD"
}
type: string

/**
* The state of policy, ACTIVE or INACTIVE
*/
@Searchable = {
"fieldType": "KEYWORD"
}
state: string

/**
Expand All @@ -42,6 +48,12 @@ record DataHubPolicyInfo {
/**
* The privileges that the policy grants.
*/
@Searchable = {
"/*": {
"fieldType": "KEYWORD",
"addToFilters": true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does `addToFilters` allow us to do?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See https://datahubproject.io/docs/next/metadata-modeling/extending-the-metadata-model/#searchable - This is to allow the array values to be used as filters from what I understand.

}
}
privileges: array[string]

/**
Expand All @@ -52,6 +64,9 @@ record DataHubPolicyInfo {
/**
* Whether the policy should be editable via the UI
*/
@Searchable = {
"fieldType": "BOOLEAN"
}
editable: boolean = true

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,11 @@ systemUpdate:
batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_BROWSE_PATHS_V2_BATCH_SIZE:5000}
reprocess:
enabled: ${REPROCESS_DEFAULT_BROWSE_PATHS_V2:false}
policyFields:
enabled: ${BOOTSTRAP_SYSTEM_UPDATE_POLICY_FIELDS_ENABLED:true}
batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_POLICY_FIELDS_BATCH_SIZE:5000}
reprocess:
enabled: ${REPROCESS_DEFAULT_POLICY_FIELDS:false}

structuredProperties:
enabled: ${ENABLE_STRUCTURED_PROPERTIES_HOOK:true} # applies structured properties mappings
Expand Down
Loading