fix(cli): graph - get_aspect_v2 method fails to deserialize aspects correctly (#4971)
shirshanka authored May 22, 2022
1 parent 87ac8a3 commit 0f7ff79
Showing 3 changed files with 189 additions and 1 deletion.
5 changes: 4 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -11,6 +11,7 @@
 from datahub.configuration.common import ConfigModel, OperationalError
 from datahub.emitter.mce_builder import Aspect
 from datahub.emitter.rest_emitter import DatahubRestEmitter
+from datahub.emitter.serialization_helper import post_json_transform
 from datahub.metadata.schema_classes import (
     DatasetUsageStatisticsClass,
     GlobalTagsClass,
@@ -145,7 +146,9 @@ def get_aspect_v2(
         aspect_type_name = record_schema.fullname.replace(".pegasus2avro", "")
         aspect_json = response_json.get("aspect", {}).get(aspect_type_name)
         if aspect_json:
-            return aspect_type.from_obj(aspect_json, tuples=True)
+            # need to apply a transform to the response to match rest.li and avro serialization
+            post_json_obj = post_json_transform(aspect_json)
+            return aspect_type.from_obj(post_json_obj)
         else:
             raise OperationalError(
                 f"Failed to find {aspect_type_name} in response {response_json}"
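For background on the change: GMS returns aspects in rest.li JSON form, where single-key unions are keyed by "com.linkedin.*" class names, while the generated schema classes deserialize avro-style JSON keyed by "com.linkedin.pegasus2avro.*" names. The new post_json_transform call rewrites the response before from_obj runs. A minimal sketch of that rewrite, assuming the key-renaming behavior described above — the input fragment is illustrative, not a verbatim GMS response:

from datahub.emitter.serialization_helper import post_json_transform

# Illustrative rest.li-style union fragment (hypothetical payload): the
# member is keyed by its "com.linkedin.*" name, which from_obj does not
# recognize directly.
restli_union = {"com.linkedin.schema.KafkaSchema": {"documentSchema": "..."}}

# Assumption: post_json_transform renames union keys to the avro-style
# "com.linkedin.pegasus2avro.*" form that the schema classes expect.
avro_union = post_json_transform(restli_union)
assert "com.linkedin.pegasus2avro.schema.KafkaSchema" in avro_union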
42 changes: 42 additions & 0 deletions smoke-test/tests/cli/datahub_graph_test.py
@@ -0,0 +1,42 @@
import time

import pytest
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import SchemaMetadataClass, KafkaSchemaClass
from tests.utils import (
    FRONTEND_ENDPOINT,
    GMS_ENDPOINT,
    delete_urns_from_file,
    ingest_file_via_rest,
)


@pytest.fixture(scope="module", autouse=False)
def ingest_cleanup_data(request):
    print("ingesting graph test data")
    ingest_file_via_rest("tests/cli/graph_data.json")
    yield
    print("removing graph test data")
    delete_urns_from_file("tests/cli/graph_data.json")


@pytest.mark.dependency()
def test_healthchecks(wait_for_healthchecks):
    # Call to wait_for_healthchecks fixture will do the actual functionality.
    pass


@pytest.mark.dependency(depends=["test_healthchecks"])
def test_get_aspect_v2(frontend_session, ingest_cleanup_data):
    graph: DataHubGraph = DataHubGraph(DatahubClientConfig())
    urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,test-rollback,PROD)"
    schema_metadata: SchemaMetadataClass = graph.get_aspect_v2(
        urn, aspect="schemaMetadata", aspect_type=SchemaMetadataClass
    )

    assert schema_metadata is not None
    assert schema_metadata.platform == "urn:li:dataPlatform:kafka"
    assert isinstance(schema_metadata.platformSchema, KafkaSchemaClass)
    k_schema: KafkaSchemaClass = schema_metadata.platformSchema
    assert k_schema.documentSchema == "{\"type\":\"record\",\"name\":\"SampleKafkaSchema\",\"namespace\":\"com.linkedin.dataset\",\"doc\":\"Sample Kafka dataset\",\"fields\":[{\"name\":\"field_foo\",\"type\":[\"string\"]},{\"name\":\"field_bar\",\"type\":[\"boolean\"]}]}"
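The test above exercises the fix end to end through the smoke-test fixtures. Outside that harness, the same check can be run as a standalone sketch, assuming a locally reachable GMS — the explicit server URL is an assumption for illustration (the test itself relies on DatahubClientConfig defaults):

from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import KafkaSchemaClass, SchemaMetadataClass

# Assumes a running GMS instance; the URL here is illustrative.
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,test-rollback,PROD)"
schema_metadata = graph.get_aspect_v2(
    urn, aspect="schemaMetadata", aspect_type=SchemaMetadataClass
)

# Before this fix, deserializing the response failed; with the
# post_json_transform step, nested unions come back as typed objects.
if schema_metadata is not None and isinstance(
    schema_metadata.platformSchema, KafkaSchemaClass
):
    print(schema_metadata.platformSchema.documentSchema)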

143 changes: 143 additions & 0 deletions smoke-test/tests/cli/graph_data.json
@@ -0,0 +1,143 @@
[
  {
    "auditHeader": null,
    "proposedSnapshot": {
      "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
        "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,test-rollback,PROD)",
        "aspects": [
          {
            "com.linkedin.pegasus2avro.common.BrowsePaths": {
              "paths": ["/prod/kafka/SampleKafkaDataset"]
            }
          },
          {
            "com.linkedin.pegasus2avro.dataset.DatasetProperties": {
              "description": null,
              "uri": null,
              "tags": [],
              "customProperties": {
                "prop1": "fakeprop",
                "prop2": "pikachu"
              }
            }
          },
          {
            "com.linkedin.pegasus2avro.common.Ownership": {
              "owners": [
                {
                  "owner": "urn:li:corpuser:jdoe",
                  "type": "DATAOWNER",
                  "source": null
                },
                {
                  "owner": "urn:li:corpuser:datahub",
                  "type": "DATAOWNER",
                  "source": null
                }
              ],
              "lastModified": {
                "time": 1581407189000,
                "actor": "urn:li:corpuser:jdoe",
                "impersonator": null
              }
            }
          },
          {
            "com.linkedin.pegasus2avro.common.InstitutionalMemory": {
              "elements": [
                {
                  "url": "https://www.linkedin.com",
                  "description": "Sample doc",
                  "createStamp": {
                    "time": 1581407189000,
                    "actor": "urn:li:corpuser:jdoe",
                    "impersonator": null
                  }
                }
              ]
            }
          },
          {
            "com.linkedin.pegasus2avro.schema.SchemaMetadata": {
              "schemaName": "SampleKafkaSchema",
              "platform": "urn:li:dataPlatform:kafka",
              "version": 0,
              "created": {
                "time": 1581407189000,
                "actor": "urn:li:corpuser:jdoe",
                "impersonator": null
              },
              "lastModified": {
                "time": 1581407189000,
                "actor": "urn:li:corpuser:jdoe",
                "impersonator": null
              },
              "deleted": null,
              "dataset": null,
              "cluster": null,
              "hash": "",
              "platformSchema": {
                "com.linkedin.pegasus2avro.schema.KafkaSchema": {
                  "documentSchema": "{\"type\":\"record\",\"name\":\"SampleKafkaSchema\",\"namespace\":\"com.linkedin.dataset\",\"doc\":\"Sample Kafka dataset\",\"fields\":[{\"name\":\"field_foo\",\"type\":[\"string\"]},{\"name\":\"field_bar\",\"type\":[\"boolean\"]}]}"
                }
              },
              "fields": [
                {
                  "fieldPath": "[version=2.0].[type=boolean].field_foo_2",
                  "jsonPath": null,
                  "nullable": false,
                  "description": {
                    "string": "Foo field description"
                  },
                  "type": {
                    "type": {
                      "com.linkedin.pegasus2avro.schema.BooleanType": {}
                    }
                  },
                  "nativeDataType": "varchar(100)",
                  "globalTags": {
                    "tags": [{ "tag": "urn:li:tag:NeedsDocumentation" }]
                  },
                  "recursive": false
                },
                {
                  "fieldPath": "[version=2.0].[type=boolean].field_bar",
                  "jsonPath": null,
                  "nullable": false,
                  "description": {
                    "string": "Bar field description"
                  },
                  "type": {
                    "type": {
                      "com.linkedin.pegasus2avro.schema.BooleanType": {}
                    }
                  },
                  "nativeDataType": "boolean",
                  "recursive": false
                },
                {
                  "fieldPath": "[version=2.0].[key=True].[type=int].id",
                  "jsonPath": null,
                  "nullable": false,
                  "description": {
                    "string": "Id specifying which partition the message should go to"
                  },
                  "type": {
                    "type": {
                      "com.linkedin.pegasus2avro.schema.BooleanType": {}
                    }
                  },
                  "nativeDataType": "boolean",
                  "recursive": false
                }
              ],
              "primaryKeys": null,
              "foreignKeysSpecs": null
            }
          }
        ]
      }
    },
    "proposedDelta": null
  }
]
