feat(SDK) Add StructuredPropertyPatchBuilder in python sdk and provide sample CRUD files #10824

Merged
AspectTemplateEngine.java
@@ -12,6 +12,7 @@
import static com.linkedin.metadata.Constants.GLOSSARY_TERMS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.OWNERSHIP_ASPECT_NAME;
import static com.linkedin.metadata.Constants.STRUCTURED_PROPERTIES_ASPECT_NAME;
import static com.linkedin.metadata.Constants.STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME;
import static com.linkedin.metadata.Constants.UPSTREAM_LINEAGE_ASPECT_NAME;

import com.fasterxml.jackson.core.JsonProcessingException;
@@ -46,7 +47,8 @@ public class AspectTemplateEngine {
DATA_JOB_INPUT_OUTPUT_ASPECT_NAME,
CHART_INFO_ASPECT_NAME,
DASHBOARD_INFO_ASPECT_NAME,
STRUCTURED_PROPERTIES_ASPECT_NAME)
STRUCTURED_PROPERTIES_ASPECT_NAME,
STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME)
.collect(Collectors.toSet());

private final Map<String, Template<? extends RecordTemplate>> _aspectTemplateMap;
New file: StructuredPropertyDefinitionTemplate.java
@@ -0,0 +1,86 @@
package com.linkedin.metadata.aspect.patch.template.structuredproperty;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.aspect.patch.template.CompoundKeyTemplate;
import com.linkedin.structured.StructuredPropertyDefinition;
import java.util.Collections;
import javax.annotation.Nonnull;

public class StructuredPropertyDefinitionTemplate
extends CompoundKeyTemplate<StructuredPropertyDefinition> {

private static final String ENTITY_TYPES_FIELD_NAME = "entityTypes";
private static final String ALLOWED_VALUES_FIELD_NAME = "allowedValues";
private static final String VALUE_FIELD_NAME = "value";
private static final String UNIT_SEPARATOR_DELIMITER = "␟";

@Override
public StructuredPropertyDefinition getSubtype(RecordTemplate recordTemplate)
throws ClassCastException {
if (recordTemplate instanceof StructuredPropertyDefinition) {
return (StructuredPropertyDefinition) recordTemplate;
}
throw new ClassCastException("Unable to cast RecordTemplate to StructuredPropertyDefinition");
}
Comment on lines +21 to +28
Ensure proper exception handling.

The getSubtype method throws a ClassCastException with a generic message. It might be beneficial to include the class name of the recordTemplate to provide more context in the exception message.

- throw new ClassCastException("Unable to cast RecordTemplate to StructuredPropertyDefinition");
+ throw new ClassCastException("Unable to cast " + recordTemplate.getClass().getName() + " to StructuredPropertyDefinition");

@Override
public Class<StructuredPropertyDefinition> getTemplateType() {
return StructuredPropertyDefinition.class;
}

@Nonnull
@Override
public StructuredPropertyDefinition getDefault() {
StructuredPropertyDefinition definition = new StructuredPropertyDefinition();
definition.setQualifiedName("");
definition.setValueType(UrnUtils.getUrn("urn:li:dataType:datahub.string"));
definition.setEntityTypes(new UrnArray());

return definition;
}
Comment on lines +35 to +44
Consider making default values configurable.

The getDefault method sets a hardcoded default value for valueType. Consider making this configurable to allow for more flexibility.

public StructuredPropertyDefinition getDefault(String defaultValueType) {
    StructuredPropertyDefinition definition = new StructuredPropertyDefinition();
    definition.setQualifiedName("");
    definition.setValueType(UrnUtils.getUrn(defaultValueType));
    definition.setEntityTypes(new UrnArray());
    return definition;
}


@Nonnull
@Override
public JsonNode transformFields(JsonNode baseNode) {
JsonNode transformedNode =
arrayFieldToMap(baseNode, ENTITY_TYPES_FIELD_NAME, Collections.emptyList());

if (transformedNode.get(ALLOWED_VALUES_FIELD_NAME) == null) {
return transformedNode;
}

// allowedValues has a nested key - value.string or value.number depending on type. Mapping
// needs this nested key
JsonNode allowedValues = transformedNode.get(ALLOWED_VALUES_FIELD_NAME);
if (((ArrayNode) allowedValues).size() > 0) {
JsonNode allowedValue = ((ArrayNode) allowedValues).get(0);
JsonNode value = allowedValue.get(VALUE_FIELD_NAME);
String secondaryKeyName = value.fieldNames().next();
return arrayFieldToMap(
transformedNode,
ALLOWED_VALUES_FIELD_NAME,
Collections.singletonList(
VALUE_FIELD_NAME + UNIT_SEPARATOR_DELIMITER + secondaryKeyName));
}

return arrayFieldToMap(
transformedNode, ALLOWED_VALUES_FIELD_NAME, Collections.singletonList(VALUE_FIELD_NAME));
}

@Nonnull
@Override
public JsonNode rebaseFields(JsonNode patched) {
JsonNode patchedNode =
transformedMapToArray(patched, ENTITY_TYPES_FIELD_NAME, Collections.emptyList());

if (patchedNode.get(ALLOWED_VALUES_FIELD_NAME) == null) {
return patchedNode;
}
return transformedMapToArray(
patchedNode, ALLOWED_VALUES_FIELD_NAME, Collections.singletonList(VALUE_FIELD_NAME));
}
}
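
For context on the nested key handling in transformFields above, here is a hedged sketch of the aspect shape it operates on (illustrative payload only; the qualified name and allowed values are borrowed from the Python sample later in this PR, and the exact serialization is defined by the aspect model, not by this sketch):

# Approximate JSON shape of a structuredPropertyDefinition aspect, written as a
# Python dict for readability. Each allowedValues entry nests its literal under
# a typed key (the template's comment calls these value.string / value.number),
# and transformFields joins "value", the unit separator "␟", and that inner key
# to build the composite key used when mapping the array for patching.
definition_aspect = {
    "qualifiedName": "io.acryl.dataManagement.replicationSLA",
    "valueType": "urn:li:dataType:datahub.number",
    "entityTypes": ["urn:li:entityType:datahub.dataset"],
    "allowedValues": [
        {"value": {"number": 30}, "description": "30 days"},
        {"value": {"number": 90}, "description": "90 days"},
    ],
}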
@@ -20,6 +20,7 @@
import com.linkedin.metadata.aspect.patch.template.dataset.DatasetPropertiesTemplate;
import com.linkedin.metadata.aspect.patch.template.dataset.EditableSchemaMetadataTemplate;
import com.linkedin.metadata.aspect.patch.template.dataset.UpstreamLineageTemplate;
import com.linkedin.metadata.aspect.patch.template.structuredproperty.StructuredPropertyDefinitionTemplate;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.DefaultEntitySpec;
import com.linkedin.metadata.models.EntitySpec;
@@ -87,6 +88,8 @@ private AspectTemplateEngine populateTemplateEngine(Map<String, AspectSpec> aspe
aspectSpecTemplateMap.put(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME, new DataJobInputOutputTemplate());
aspectSpecTemplateMap.put(
STRUCTURED_PROPERTIES_ASPECT_NAME, new StructuredPropertiesTemplate());
aspectSpecTemplateMap.put(
STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME, new StructuredPropertyDefinitionTemplate());
return new AspectTemplateEngine(aspectSpecTemplateMap);
}

New file: Python sample (add a structured property to a dataset)
@@ -0,0 +1,24 @@
import logging

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.specific.dataset import DatasetPatchBuilder

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Create rest emitter
rest_emitter = DataHubRestEmitter(gms_server="http://localhost:8080")
Comment on lines +10 to +11
Make GMS server URL configurable.

Hardcoding the GMS server URL can be problematic for different environments. Consider using an environment variable or a configuration file.

- rest_emitter = DataHubRestEmitter(gms_server="http://localhost:8080")
+ import os
+ gms_server = os.getenv("GMS_SERVER", "http://localhost:8080")
+ rest_emitter = DataHubRestEmitter(gms_server=gms_server)

dataset_urn = make_dataset_urn(platform="hive", name="fct_users_created", env="PROD")
Make dataset URN parameters configurable.

Hardcoding the dataset URN parameters can be problematic for different environments. Consider using environment variables or a configuration file.

- dataset_urn = make_dataset_urn(platform="hive", name="fct_users_created", env="PROD")
+ platform = os.getenv("DATASET_PLATFORM", "hive")
+ name = os.getenv("DATASET_NAME", "fct_users_created")
+ env = os.getenv("DATASET_ENV", "PROD")
+ dataset_urn = make_dataset_urn(platform=platform, name=name, env=env)

for patch_mcp in (
DatasetPatchBuilder(dataset_urn)
.add_structured_property("io.acryl.dataManagement.replicationSLA", 12)
.build()
):
rest_emitter.emit(patch_mcp)
Comment on lines +16 to +21
Make structured property name and value configurable.

Hardcoding the structured property name and value can be problematic for different use cases. Consider using environment variables or a configuration file.

- for patch_mcp in (
-    DatasetPatchBuilder(dataset_urn)
-    .add_structured_property("io.acryl.dataManagement.replicationSLA", 12)
-    .build()
- ):
+ structured_property = os.getenv("STRUCTURED_PROPERTY", "io.acryl.dataManagement.replicationSLA")
+ structured_property_value = int(os.getenv("STRUCTURED_PROPERTY_VALUE", 12))
+ for patch_mcp in (
+    DatasetPatchBuilder(dataset_urn)
+    .add_structured_property(structured_property, structured_property_value)
+    .build()
+ ):

log.info(f"Added cluster_name, retention_time properties to dataset {dataset_urn}")
New file: Python sample (remove a structured property from a dataset)
@@ -0,0 +1,24 @@
import logging

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.specific.dataset import DatasetPatchBuilder

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Create rest emitter
rest_emitter = DataHubRestEmitter(gms_server="http://localhost:8080")
Comment on lines +10 to +11
Make GMS server URL configurable.

Hardcoding the GMS server URL can be problematic for different environments. Consider using an environment variable or a configuration file.

- rest_emitter = DataHubRestEmitter(gms_server="http://localhost:8080")
+ import os
+ gms_server = os.getenv("GMS_SERVER", "http://localhost:8080")
+ rest_emitter = DataHubRestEmitter(gms_server=gms_server)

dataset_urn = make_dataset_urn(platform="hive", name="fct_users_created", env="PROD")
Make dataset URN parameters configurable.

Hardcoding the dataset URN parameters can be problematic for different environments. Consider using environment variables or a configuration file.

- dataset_urn = make_dataset_urn(platform="hive", name="fct_users_created", env="PROD")
+ platform = os.getenv("DATASET_PLATFORM", "hive")
+ name = os.getenv("DATASET_NAME", "fct_users_created")
+ env = os.getenv("DATASET_ENV", "PROD")
+ dataset_urn = make_dataset_urn(platform=platform, name=name, env=env)

for patch_mcp in (
DatasetPatchBuilder(dataset_urn)
.remove_structured_property("io.acryl.dataManagement.replicationSLA")
.build()
):
rest_emitter.emit(patch_mcp)
Comment on lines +16 to +21
Make structured property name configurable.

Hardcoding the structured property name can be problematic for different use cases. Consider using an environment variable or a configuration file.

- for patch_mcp in (
-    DatasetPatchBuilder(dataset_urn)
-    .remove_structured_property("io.acryl.dataManagement.replicationSLA")
-    .build()
- ):
+ structured_property = os.getenv("STRUCTURED_PROPERTY", "io.acryl.dataManagement.replicationSLA")
+ for patch_mcp in (
+    DatasetPatchBuilder(dataset_urn)
+    .remove_structured_property(structured_property)
+    .build()
+ ):

log.info(f"Added cluster_name, retention_time properties to dataset {dataset_urn}")
New file: Python sample (set/update a structured property on a dataset)
@@ -0,0 +1,24 @@
import logging

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.specific.dataset import DatasetPatchBuilder

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Create rest emitter
rest_emitter = DataHubRestEmitter(gms_server="http://localhost:8080")
Comment on lines +10 to +11
Make GMS server URL configurable.

Hardcoding the GMS server URL can be problematic for different environments. Consider using an environment variable or a configuration file.

- rest_emitter = DataHubRestEmitter(gms_server="http://localhost:8080")
+ import os
+ gms_server = os.getenv("GMS_SERVER", "http://localhost:8080")
+ rest_emitter = DataHubRestEmitter(gms_server=gms_server)

dataset_urn = make_dataset_urn(platform="hive", name="fct_users_created", env="PROD")
Make dataset URN parameters configurable.

Hardcoding the dataset URN parameters can be problematic for different environments. Consider using environment variables or a configuration file.

- dataset_urn = make_dataset_urn(platform="hive", name="fct_users_created", env="PROD")
+ platform = os.getenv("DATASET_PLATFORM", "hive")
+ name = os.getenv("DATASET_NAME", "fct_users_created")
+ env = os.getenv("DATASET_ENV", "PROD")
+ dataset_urn = make_dataset_urn(platform=platform, name=name, env=env)

for patch_mcp in (
DatasetPatchBuilder(dataset_urn)
.set_structured_property("io.acryl.dataManagement.replicationSLA", 120)
.build()
):
rest_emitter.emit(patch_mcp)
Comment on lines +16 to +21
Make structured property name and value configurable.

Hardcoding the structured property name and value can be problematic for different use cases. Consider using environment variables or a configuration file.

- for patch_mcp in (
-    DatasetPatchBuilder(dataset_urn)
-    .set_structured_property("io.acryl.dataManagement.replicationSLA", 120)
-    .build()
- ):
+ structured_property = os.getenv("STRUCTURED_PROPERTY", "io.acryl.dataManagement.replicationSLA")
+ structured_property_value = int(os.getenv("STRUCTURED_PROPERTY_VALUE", 120))
+ for patch_mcp in (
+    DatasetPatchBuilder(dataset_urn)
+    .set_structured_property(structured_property, structured_property_value)
+    .build()
+ ):

log.info(f"Added cluster_name, retention_time properties to dataset {dataset_urn}")
New file: Python sample (create structured property definitions)
@@ -0,0 +1,98 @@
import logging

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Imports for metadata model classes
from datahub.metadata.schema_classes import (
PropertyValueClass,
StructuredPropertyDefinitionClass,
)
from datahub.metadata.urns import StructuredPropertyUrn

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Create rest emitter
rest_emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# first, let's make an open ended structured property that allows one text value
text_property_urn = StructuredPropertyUrn("openTextProperty")
text_property_definition = StructuredPropertyDefinitionClass(
qualifiedName="io.acryl.openTextProperty",
displayName="Open Text Property",
valueType="urn:li:dataType:datahub.string",
cardinality="SINGLE",
entityTypes=[
"urn:li:entityType:datahub.dataset",
"urn:li:entityType:datahub.container",
],
description="This structured property allows a signle open ended response as a value",
immutable=False,
)

event_prop_1: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityUrn=str(text_property_urn),
aspect=text_property_definition,
)
rest_emitter.emit(event_prop_1)
Comment on lines +19 to +38
Fix the typo in the description.

There is a typo in the description of the text_property_definition. "signle" should be "single".

- description="This structured property allows a signle open ended response as a value",
+ description="This structured property allows a single open ended response as a value",

# next, let's make a property that allows for multiple datahub entity urns as values
# This example property could be used to reference other users or groups in datahub
urn_property_urn = StructuredPropertyUrn("dataSteward")
urn_property_definition = StructuredPropertyDefinitionClass(
qualifiedName="io.acryl.dataManagement.dataSteward",
displayName="Data Steward",
valueType="urn:li:dataType:datahub.urn",
cardinality="MULTIPLE",
entityTypes=["urn:li:entityType:datahub.dataset"],
description="The data stewards of this asset are in charge of ensuring data cleanliness and governance",
immutable=True,
typeQualifier={
"allowedTypes": [
"urn:li:entityType:datahub.corpuser",
"urn:li:entityType:datahub.corpGroup",
]
}, # this line ensures only user or group urns can be assigned as values
)

event_prop_2: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityUrn=str(urn_property_urn),
aspect=urn_property_definition,
)
rest_emitter.emit(event_prop_2)

# finally, let's make a single select number property with a few allowed options
number_property_urn = StructuredPropertyUrn("replicationSLA")
number_property_definition = StructuredPropertyDefinitionClass(
qualifiedName="io.acryl.dataManagement.replicationSLA",
displayName="Retention Time",
valueType="urn:li:dataType:datahub.number",
cardinality="SINGLE",
entityTypes=[
"urn:li:entityType:datahub.dataset",
"urn:li:entityType:datahub.dataFlow",
],
description="SLA for how long data can be delayed before replicating to the destination cluster",
immutable=False,
allowedValues=[
PropertyValueClass(
value=30,
description="30 days, usually reserved for datasets that are ephemeral and contain pii",
),
PropertyValueClass(
value=90,
description="Use this for datasets that drive monthly reporting but contain pii",
),
PropertyValueClass(
value=365,
description="Use this for non-sensitive data that can be retained for longer",
),
],
)

event_prop_3: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityUrn=str(number_property_urn),
aspect=number_property_definition,
)
rest_emitter.emit(event_prop_3)
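
To tie this back to the dataset samples earlier in this PR, here is a hedged sketch of assigning one of the allowed values defined above to a dataset, reusing the emitter created in this file and mirroring the DatasetPatchBuilder calls shown in the other samples:

# Hedged follow-up sketch (not part of this file): attach one of the allowed
# values defined above to a dataset via the patch builder shown elsewhere in
# this PR. The dataset URN mirrors the other samples.
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.specific.dataset import DatasetPatchBuilder

dataset_urn = make_dataset_urn(platform="hive", name="fct_users_created", env="PROD")
for patch_mcp in (
    DatasetPatchBuilder(dataset_urn)
    .set_structured_property("io.acryl.dataManagement.replicationSLA", 90)  # 90 is an allowed value above
    .build()
):
    rest_emitter.emit(patch_mcp)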
New file: Python sample (update a structured property definition with StructuredPropertyPatchBuilder)
@@ -0,0 +1,43 @@
import logging
from typing import Union

from datahub.configuration.kafka import KafkaProducerConnectionConfig
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.metadata.urns import StructuredPropertyUrn
from datahub.specific.structured_property import StructuredPropertyPatchBuilder

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


# Get an emitter, either REST or Kafka; this example shows both options
def get_emitter() -> Union[DataHubRestEmitter, DatahubKafkaEmitter]:
USE_REST_EMITTER = True
if USE_REST_EMITTER:
gms_endpoint = "http://localhost:8080"
return DataHubRestEmitter(gms_server=gms_endpoint)
else:
kafka_server = "localhost:9092"
schema_registry_url = "http://localhost:8081"
return DatahubKafkaEmitter(
config=KafkaEmitterConfig(
connection=KafkaProducerConnectionConfig(
bootstrap=kafka_server, schema_registry_url=schema_registry_url
)
)
)


# input your unique structured property ID
property_urn = StructuredPropertyUrn("dataSteward")

with get_emitter() as emitter:
for patch_mcp in (
StructuredPropertyPatchBuilder(str(property_urn))
.set_display_name("test display name")
.set_cardinality("MULTIPLE")
.add_entity_type("urn:li:entityType:datahub.dataJob")
.build()
):
emitter.emit(patch_mcp)