feat(ingest) Allow ingestion of Elasticsearch index template #5444

Merged · 2 commits · Aug 4, 2022
28 changes: 17 additions & 11 deletions metadata-ingestion/archived/source_docs/elastic_search.md
@@ -41,6 +41,9 @@ source:
    index_pattern:
      allow: [".*some_index_name_pattern*"]
      deny: [".*skip_index_name_pattern*"]
    ingest_index_templates: False
    index_template_pattern:
      allow: [".*some_index_template_name_pattern*"]

sink:
  # sink configs
@@ -51,17 +54,20 @@ sink:
Note that a `.` is used to denote nested fields in the YAML recipe.
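To make that dot notation concrete, here is a small illustration (plain Python, not DataHub code): a dotted field name such as `index_template_pattern.allow` in the table below denotes a nested recipe key, not a literal flat field.

```python
# Illustrative only: dotted names in the field table denote nested recipe keys.
config = {
    "index_template_pattern": {
        "allow": [".*some_index_template_name_pattern*"],
    }
}
# "index_template_pattern.allow" refers to:
print(config["index_template_pattern"]["allow"])
```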


| Field                          | Required | Default            | Description                                                            |
|--------------------------------|----------|--------------------|------------------------------------------------------------------------|
| `host`                         | ✅       | `"localhost:9092"` | The Elasticsearch host URI.                                            |
| `username`                     |          | None               | The username credential.                                               |
| `password`                     |          | None               | The password credential.                                               |
| `url_prefix`                   |          | `""`               | Some enterprises run multiple Elasticsearch clusters behind a single endpoint; `url_prefix` routes requests to the appropriate cluster. |
| `env`                          |          | `"PROD"`           | Environment to use in namespace when constructing URNs.               |
| `platform_instance`            |          | None               | The platform instance to use while constructing URNs.                 |
| `index_pattern.allow`          |          |                    | List of regex patterns for indexes to include in ingestion.           |
| `index_pattern.deny`           |          |                    | List of regex patterns for indexes to exclude from ingestion.         |
| `index_pattern.ignoreCase`     |          | `True`             | Whether regex matching should ignore case.                            |
| `ingest_index_templates`       |          | `False`            | Whether index templates should be ingested.                           |
| `index_template_pattern.allow` |          |                    | List of regex patterns for index templates to include in ingestion.   |
| `index_template_pattern.deny`  |          |                    | List of regex patterns for index templates to exclude from ingestion. |

## Compatibility

@@ -23,6 +23,9 @@ source:
    index_pattern:
      allow: [".*some_index_name_pattern*"]
      deny: [".*skip_index_name_pattern*"]
    ingest_index_templates: False
    index_template_pattern:
      allow: [".*some_index_template_name_pattern*"]

sink:
  # sink configs
@@ -11,7 +11,11 @@ source:
    client_key: "./path/client.key"
    ssl_assert_hostname: False
    ssl_assert_fingerprint: "./path/cert.fingerprint"

    ingest_index_templates: False
    # index_template_pattern:
    #   allow:
    #     - "^.+"

sink:
  type: "datahub-rest"
  config:
100 changes: 74 additions & 26 deletions metadata-ingestion/src/datahub/ingestion/source/elastic_search.py
@@ -85,10 +85,12 @@ class ElasticToSchemaFieldConverter:
"match_only_text": StringTypeClass,
"completion": StringTypeClass,
"search_as_you_type": StringTypeClass,
"ip": StringTypeClass,
# Records
"object": RecordTypeClass,
"flattened": RecordTypeClass,
"nested": RecordTypeClass,
"geo_point": RecordTypeClass,
# Arrays
"histogram": ArrayTypeClass,
"aggregate_metric_double": ArrayTypeClass,
@@ -224,6 +226,13 @@ class ElasticsearchSourceConfig(DatasetSourceConfigBase):
        default=AllowDenyPattern(allow=[".*"], deny=["^_.*", "^ilm-history.*"]),
        description="regex patterns for indexes to filter in ingestion.",
    )
    ingest_index_templates: bool = Field(
        default=False, description="Ingests ES index templates if enabled."
    )
    index_template_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern(allow=[".*"], deny=["^_.*"]),
        description="The regex patterns for filtering index templates to ingest.",
    )
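A note on the default above: `index_template_pattern` denies `^_.*`, so internal templates are skipped out of the box. A simplified stand-in for `AllowDenyPattern.allowed()` (an assumption about its semantics, not the actual implementation) behaves like this:

```python
import re

# Simplified stand-in: a name is ingested when it matches any allow regex
# and no deny regex.
def allowed(name: str, allow=(".*",), deny=("^_.*",)) -> bool:
    return any(re.match(p, name) for p in allow) and not any(
        re.match(p, name) for p in deny
    )

print(allowed("logs_template"))  # True  -> ingested
print(allowed("_internal"))      # False -> skipped by the default "^_.*" deny
```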

    @validator("host")
    def host_colon_port_comma(cls, host_val: str) -> str:
Expand Down Expand Up @@ -304,7 +313,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
            self.report.report_index_scanned(index)

            if self.source_config.index_pattern.allowed(index):
                for mcp in self._extract_mcps(index, is_index=True):
                    wu = MetadataWorkUnit(id=f"index-{index}", mcp=mcp)
                    self.report.report_workunit(wu)
                    yield wu
@@ -315,6 +324,14 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
            wu = MetadataWorkUnit(id=f"index-{index}", mcp=mcp)
            self.report.report_workunit(wu)
            yield wu
        if self.source_config.ingest_index_templates:
            templates = self.client.indices.get_template()
            for template in templates:
                if self.source_config.index_template_pattern.allowed(template):
                    for mcp in self._extract_mcps(template, is_index=False):
                        wu = MetadataWorkUnit(id=f"template-{template}", mcp=mcp)
                        self.report.report_workunit(wu)
                        yield wu
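For readers unfamiliar with the client call in the template loop above: elasticsearch-py's `indices.get_template()` returns a dict keyed by template name, so iterating it yields the names that `index_template_pattern.allowed()` checks. A minimal sketch, assuming a cluster reachable at `localhost:9200` and a hypothetical template:

```python
from elasticsearch import Elasticsearch

client = Elasticsearch(hosts=["localhost:9200"])
templates = client.indices.get_template()
# e.g. {"logs_template": {"index_patterns": ["logs-*"], "settings": {...}}}
for name in templates:  # dict iteration yields template names
    print(name, templates[name].get("index_patterns", []))
```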

    def _get_data_stream_index_count_mcps(
        self,
@@ -336,19 +353,26 @@ def _get_data_stream_index_count_mcps(
                changeType=ChangeTypeClass.UPSERT,
            )

    def _extract_mcps(
        self, index: str, is_index: bool = True
    ) -> Iterable[MetadataChangeProposalWrapper]:
        logger.debug(f"index='{index}', is_index={is_index}")

        if is_index:
            raw_index = self.client.indices.get(index=index)
            raw_index_metadata = raw_index[index]

            # 0. Dedup data_streams.
            data_stream = raw_index_metadata.get("data_stream")
            if data_stream:
                index = data_stream
                self.data_stream_partition_count[index] += 1
                if self.data_stream_partition_count[index] > 1:
                    # This is a duplicate, skip processing it further.
                    return
        else:
            raw_index = self.client.indices.get_template(name=index)
            raw_index_metadata = raw_index[index]
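The dedup in step 0 is worth spelling out: every backing index of a data stream reports the same `data_stream` name, so only the first backing index is processed further, while later ones merely bump the partition counter that `_get_data_stream_index_count_mcps` reports afterwards. A self-contained sketch of that logic (hypothetical names, simplified from the source):

```python
from collections import defaultdict

data_stream_partition_count: defaultdict = defaultdict(int)

def should_process(data_stream: str = "") -> bool:
    # Plain indices (no data stream) are always processed.
    if not data_stream:
        return True
    data_stream_partition_count[data_stream] += 1
    return data_stream_partition_count[data_stream] == 1

print(should_process("logs"))  # True  -> first backing index, emit metadata
print(should_process("logs"))  # False -> duplicate, skipped
print(data_stream_partition_count["logs"])  # 2 partitions counted
```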

        # 1. Construct and emit the schemaMetadata aspect
        # 1.1 Generate the schema fields from ES mappings.
@@ -401,23 +425,47 @@ def _extract_mcps(self, index: str) -> Iterable[MetadataChangeProposalWrapper]:
            entityUrn=dataset_urn,
            aspectName="subTypes",
            aspect=SubTypesClass(
                typeNames=[
                    "Index Template"
                    if not is_index
                    else "Index"
                    if not data_stream
                    else "Datastream"
                ]
            ),
            changeType=ChangeTypeClass.UPSERT,
        )
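The chained conditional above is compact but easy to misread; an equivalent explicit form (a sketch with a hypothetical helper name) is:

```python
from typing import Optional

def subtype(is_index: bool, data_stream: Optional[str]) -> str:
    # Equivalent to the chained conditional in the SubTypes aspect above.
    if not is_index:
        return "Index Template"
    if not data_stream:
        return "Index"
    return "Datastream"

assert subtype(False, None) == "Index Template"
assert subtype(True, None) == "Index"
assert subtype(True, "logs") == "Datastream"
```

Note that when `is_index` is `False`, the `data_stream` branch is never evaluated, which is why `data_stream` being unassigned in the template path cannot raise a `NameError` here.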

        # 4. Construct and emit properties if needed. Will attempt to get the following properties.
        custom_properties: Dict[str, str] = {}
        # 4.1 aliases
        index_aliases: List[str] = list(raw_index_metadata.get("aliases", {}).keys())
        if index_aliases:
            custom_properties["aliases"] = ",".join(index_aliases)
        # 4.2 index_patterns
        index_patterns: List[str] = raw_index_metadata.get("index_patterns", [])
        if index_patterns:
            custom_properties["index_patterns"] = ",".join(index_patterns)

        # 4.3 number_of_shards
        index_settings: Dict[str, Any] = raw_index_metadata.get("settings", {}).get(
            "index", {}
        )
        num_shards: str = index_settings.get("number_of_shards", "")
        if num_shards:
            custom_properties["num_shards"] = num_shards
        # 4.4 number_of_replicas
        num_replicas: str = index_settings.get("number_of_replicas", "")
        if num_replicas:
            custom_properties["num_replicas"] = num_replicas

        yield MetadataChangeProposalWrapper(
            entityType="dataset",
            entityUrn=dataset_urn,
            aspectName="datasetProperties",
            aspect=DatasetPropertiesClass(customProperties=custom_properties),
            changeType=ChangeTypeClass.UPSERT,
        )
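To make the property assembly concrete, a worked sketch with hypothetical template metadata (note that Elasticsearch returns shard and replica counts as strings, which is why `num_shards` and `num_replicas` are typed `str`):

```python
raw_index_metadata = {
    "index_patterns": ["logs-*", "metrics-*"],
    "settings": {"index": {"number_of_shards": "3", "number_of_replicas": "1"}},
}
# The code above would assemble:
# {"index_patterns": "logs-*,metrics-*", "num_shards": "3", "num_replicas": "1"}
```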

        # 5. Construct and emit platform instance aspect
        if self.source_config.platform_instance: