diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py index 007b7487cb6a4..8572b2378a3bb 100644 --- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py +++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py @@ -1,9 +1,9 @@ import concurrent.futures import contextlib +import dataclasses import functools import logging import uuid -from dataclasses import dataclass from enum import auto from typing import Optional, Union @@ -29,6 +29,7 @@ MetadataChangeProposal, ) from datahub.utilities.advanced_thread_executor import PartitionExecutor +from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.server_config_util import set_gms_config logger = logging.getLogger(__name__) @@ -44,15 +45,17 @@ class DatahubRestSinkConfig(DatahubClientConfig): # These only apply in async mode. max_threads: int = 15 - max_pending_requests: int = 500 + max_pending_requests: int = 2000 -@dataclass +@dataclasses.dataclass class DataHubRestSinkReport(SinkReport): - max_threads: int = -1 - gms_version: str = "" + max_threads: Optional[int] = None + gms_version: Optional[str] = None pending_requests: int = 0 + main_thread_blocking_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer) + def compute_stats(self) -> None: super().compute_stats() @@ -105,7 +108,7 @@ def __post_init__(self) -> None: self.report.gms_version = ( gms_config.get("versions", {}) .get("acryldata/datahub", {}) - .get("version", "") + .get("version", None) ) self.report.max_threads = self.config.max_threads logger.debug("Setting env variables to override config") @@ -189,25 +192,28 @@ def write_record_async( ], write_callback: WriteCallback, ) -> None: - record = record_envelope.record - if self.config.mode == SyncOrAsync.ASYNC: - partition_key = _get_partition_key(record_envelope) - self.executor.submit( - partition_key, - self._emit_wrapper, - record, - done_callback=functools.partial( - self._write_done_callback, record_envelope, write_callback - ), - ) - self.report.pending_requests += 1 - else: - # execute synchronously - try: - self._emit_wrapper(record) - write_callback.on_success(record_envelope, success_metadata={}) - except Exception as e: - write_callback.on_failure(record_envelope, e, failure_metadata={}) + # Because the default is async mode and most sources are slower than the sink, this + # should only have a high value if the sink is actually a bottleneck. + with self.report.main_thread_blocking_timer: + record = record_envelope.record + if self.config.mode == SyncOrAsync.ASYNC: + partition_key = _get_partition_key(record_envelope) + self.executor.submit( + partition_key, + self._emit_wrapper, + record, + done_callback=functools.partial( + self._write_done_callback, record_envelope, write_callback + ), + ) + self.report.pending_requests += 1 + else: + # execute synchronously + try: + self._emit_wrapper(record) + write_callback.on_success(record_envelope, success_metadata={}) + except Exception as e: + write_callback.on_failure(record_envelope, e, failure_metadata={}) def emit_async( self,