feat(ingest): measure sink bottlenecking #10628

Merged 1 commit on Jun 4, 2024
56 changes: 31 additions & 25 deletions metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
@@ -1,9 +1,9 @@
 import concurrent.futures
 import contextlib
+import dataclasses
 import functools
 import logging
 import uuid
-from dataclasses import dataclass
 from enum import auto
 from typing import Optional, Union

@@ -29,6 +29,7 @@
     MetadataChangeProposal,
 )
 from datahub.utilities.advanced_thread_executor import PartitionExecutor
+from datahub.utilities.perf_timer import PerfTimer
 from datahub.utilities.server_config_util import set_gms_config

 logger = logging.getLogger(__name__)
@@ -44,15 +45,17 @@ class DatahubRestSinkConfig(DatahubClientConfig):

     # These only apply in async mode.
     max_threads: int = 15
-    max_pending_requests: int = 500
+    max_pending_requests: int = 2000


-@dataclass
+@dataclasses.dataclass
 class DataHubRestSinkReport(SinkReport):
-    max_threads: int = -1
-    gms_version: str = ""
+    max_threads: Optional[int] = None
+    gms_version: Optional[str] = None
     pending_requests: int = 0

+    main_thread_blocking_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer)
+
     def compute_stats(self) -> None:
         super().compute_stats()
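Note: the PerfTimer class itself is not shown in this diff, and the new report
field uses dataclasses.field(default_factory=PerfTimer) so that each report
instance gets its own timer rather than one shared class-level default. As a
rough sketch of the behavior the field relies on, assuming PerfTimer is a
context manager that accumulates wall-clock time across repeated "with" blocks
(a hypothetical stand-in, not DataHub's actual implementation):

import time


class PerfTimerSketch:
    """Hypothetical stand-in for datahub.utilities.perf_timer.PerfTimer:
    accumulates wall-clock seconds spent inside successive `with` blocks."""

    def __init__(self) -> None:
        self._total_seconds = 0.0
        self._start = 0.0

    def __enter__(self) -> "PerfTimerSketch":
        self._start = time.perf_counter()
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Add this block's duration to the running total; exceptions propagate.
        self._total_seconds += time.perf_counter() - self._start

    def elapsed_seconds(self) -> float:
        return self._total_seconds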

@@ -105,7 +108,7 @@ def __post_init__(self) -> None:
         self.report.gms_version = (
             gms_config.get("versions", {})
             .get("acryldata/datahub", {})
-            .get("version", "")
+            .get("version", None)
         )
         self.report.max_threads = self.config.max_threads
         logger.debug("Setting env variables to override config")
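For context on the defaulting change above: chaining .get(..., {}) at each
level means the final lookup can never raise KeyError, and the new None default
makes an absent server version surface as None (matching the report field's new
Optional[str] type) instead of an empty string. A small self-contained
illustration of that lookup:

# Illustration only: a server config payload with no version information.
gms_config: dict = {}

version = (
    gms_config.get("versions", {})
    .get("acryldata/datahub", {})
    .get("version", None)
)
assert version is None  # with the old default this would have been ""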
@@ -189,25 +192,28 @@ def write_record_async(
         ],
         write_callback: WriteCallback,
     ) -> None:
-        record = record_envelope.record
-        if self.config.mode == SyncOrAsync.ASYNC:
-            partition_key = _get_partition_key(record_envelope)
-            self.executor.submit(
-                partition_key,
-                self._emit_wrapper,
-                record,
-                done_callback=functools.partial(
-                    self._write_done_callback, record_envelope, write_callback
-                ),
-            )
-            self.report.pending_requests += 1
-        else:
-            # execute synchronously
-            try:
-                self._emit_wrapper(record)
-                write_callback.on_success(record_envelope, success_metadata={})
-            except Exception as e:
-                write_callback.on_failure(record_envelope, e, failure_metadata={})
+        # Because the default is async mode and most sources are slower than the sink, this
+        # should only have a high value if the sink is actually a bottleneck.
+        with self.report.main_thread_blocking_timer:
+            record = record_envelope.record
+            if self.config.mode == SyncOrAsync.ASYNC:
+                partition_key = _get_partition_key(record_envelope)
+                self.executor.submit(
+                    partition_key,
+                    self._emit_wrapper,
+                    record,
+                    done_callback=functools.partial(
+                        self._write_done_callback, record_envelope, write_callback
+                    ),
+                )
+                self.report.pending_requests += 1
+            else:
+                # execute synchronously
+                try:
+                    self._emit_wrapper(record)
+                    write_callback.on_success(record_envelope, success_metadata={})
+                except Exception as e:
+                    write_callback.on_failure(record_envelope, e, failure_metadata={})

     def emit_async(
         self,
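One reason the timed block above can take non-trivial main-thread time even in
async mode: submissions are bounded by max_pending_requests (raised from 500 to
2000 in this diff), so once that many requests are in flight, submitting blocks
until a worker frees a slot. The PartitionExecutor internals are not shown
here; a minimal sketch of that backpressure pattern, using a semaphore-bounded
thread pool (illustrative names, not DataHub's actual implementation):

import concurrent.futures
import threading
from typing import Any, Callable


class BoundedExecutorSketch:
    """Illustrative bounded executor: submit() blocks the calling thread once
    max_pending tasks are in flight, which is the kind of backpressure that
    main_thread_blocking_timer would capture."""

    def __init__(self, max_threads: int, max_pending: int) -> None:
        self._pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_threads)
        self._slots = threading.BoundedSemaphore(max_pending)

    def submit(
        self, fn: Callable[..., Any], *args: Any
    ) -> "concurrent.futures.Future[Any]":
        self._slots.acquire()  # blocks here while the pending queue is full
        future = self._pool.submit(fn, *args)
        future.add_done_callback(lambda _: self._slots.release())
        return future

A consistently high main_thread_blocking_timer reading therefore means the
source thread is regularly waiting on exactly this kind of acquire, i.e. the
sink cannot keep up with the source.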