fix(bigquery): add rate limiting for api calls made #4967

Merged · 2 commits · May 23, 2022
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -50,6 +50,7 @@ def get_long_description():
"termcolor>=1.0.0",
"types-termcolor>=1.0.0",
"psutil>=5.8.0",
"ratelimiter",
# Markupsafe breaking change broke Jinja and some other libs
# Pinning it to a version which works even though we are not using explicitly
# https://github.com/aws/aws-sam-cli/issues/3661
19 changes: 15 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
@@ -15,6 +15,7 @@
from dateutil.relativedelta import relativedelta
from google.cloud.bigquery import Client as BigQueryClient
from google.cloud.logging_v2.client import Client as GCPLoggingClient
from ratelimiter import RateLimiter
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine.reflection import Inspector

@@ -456,9 +457,15 @@ def _get_bigquery_log_entries(
f"Start loading log entries from BigQuery start_time={start_time} and end_time={end_time}"
)
for client in clients:
entries = client.list_entries(
filter_=filter, page_size=self.config.log_page_size
)
if self.config.rate_limit:
Contributor

I think rate-limiting should happen in _create_lineage_map and not here, which only returns a generator.

    with RateLimiter(max_calls=self.config.requests_per_min, period=60):
        entries = client.list_entries(
            filter_=filter, page_size=self.config.log_page_size
        )
else:
    entries = client.list_entries(
        filter_=filter, page_size=self.config.log_page_size
    )
for entry in entries:
    self.report.num_total_log_entries += 1
    yield entry
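
For context on the review comment above: client.list_entries returns a lazy iterator, so a with RateLimiter(...) block around that single call only throttles construction of the iterator, not the paged API requests issued while it is consumed. Below is a minimal sketch of throttling during consumption instead; the rate_limited helper is illustrative (not part of this PR) and pauses once per yielded entry, which is stricter than once per underlying request since one page can hold many entries.

from typing import Iterable, Iterator, TypeVar

from ratelimiter import RateLimiter

T = TypeVar("T")


def rate_limited(entries: Iterable[T], max_calls: int, period: float = 60) -> Iterator[T]:
    # One limiter shared across the whole iteration: entering the context once
    # per yielded item blocks the generator when max_calls items have been
    # produced within `period` seconds, i.e. while the API is actually being
    # paged through, rather than only when the lazy iterator is created.
    limiter = RateLimiter(max_calls=max_calls, period=period)
    for entry in entries:
        with limiter:
            yield entry

Usage in this method would then look like: for entry in rate_limited(client.list_entries(filter_=filter, page_size=self.config.log_page_size), self.config.requests_per_min): ...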
@@ -519,7 +526,11 @@ def _get_exported_bigquery_audit_metadata(
f"Finished loading log entries from BigQueryAuditMetadata in {dataset}"
)

yield from query_job
if self.config.rate_limit:
Contributor

Same here as above.

    with RateLimiter(max_calls=self.config.requests_per_min, period=60):
        yield from query_job
else:
    yield from query_job

# Currently we only parse JobCompleted events but in future we would want to parse other
# events to also create field level lineage.
@@ -15,6 +15,7 @@
from google.cloud.bigquery import Client as BigQueryClient
from google.cloud.logging_v2.client import Client as GCPLoggingClient
from more_itertools import partition
from ratelimiter import RateLimiter

import datahub.emitter.mce_builder as builder
from datahub.configuration.time_window_config import get_time_bucket
@@ -817,7 +818,11 @@ def _get_exported_bigquery_audit_metadata(
logger.info(
f"Finished loading log entries from BigQueryAuditMetadata in {dataset}"
)
yield from query_job
if self.config.rate_limit:
Contributor

I think rate-limiting should not happen here but at the point where the returned generator is used. The results are yielded from here, which means this will only get called when somebody iterates over the generator.

Contributor

Same here, the rate limiting should happen at _create_lineage_map.

    with RateLimiter(max_calls=self.config.requests_per_min, period=60):
        yield from query_job
else:
    yield from query_job

def _get_entry_timestamp(
self, entry: Union[AuditLogEntry, BigQueryAuditMetadata]
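
Regarding the review comments above: because the log entries are produced lazily, the reviewers suggest applying the limiter where the returned generator is actually consumed, for example in _create_lineage_map. A rough free-function sketch of that shape, with the audit-log events abstracted down to (destination, upstreams) pairs; the real method works on richer event objects and lives on the source class, so the names and signature here are assumptions for illustration only.

import collections
from typing import Dict, Iterable, Set, Tuple

from ratelimiter import RateLimiter


def create_lineage_map(
    events: Iterable[Tuple[str, Set[str]]], requests_per_min: int
) -> Dict[str, Set[str]]:
    # Illustrative only: drain the lazy audit-log generator under one shared
    # limiter, so the pauses coincide with the underlying API requests that
    # fetch each page of events.
    lineage_map: Dict[str, Set[str]] = collections.defaultdict(set)
    limiter = RateLimiter(max_calls=requests_per_min, period=60)
    for destination, upstreams in events:
        with limiter:
            lineage_map[destination].update(upstreams)
    return lineage_map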
@@ -884,11 +889,16 @@ def _get_bigquery_log_entries_via_gcp_logging(
] = list()
for client in clients:
try:
list_entries: Iterable[
Union[AuditLogEntry, BigQueryAuditMetadata]
] = client.list_entries(
filter_=filter, page_size=self.config.log_page_size
)
list_entries: Iterable[Union[AuditLogEntry, BigQueryAuditMetadata]]
if self.config.rate_limit:
    with RateLimiter(max_calls=self.config.requests_per_min, period=60):
        list_entries = client.list_entries(
            filter_=filter, page_size=self.config.log_page_size
        )
else:
    list_entries = client.list_entries(
        filter_=filter, page_size=self.config.log_page_size
    )
list_entry_generators_across_clients.append(list_entries)
except Exception as e:
logger.warning(
13 changes: 13 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source_config/bigquery.py
@@ -0,0 +1,13 @@
import pydantic

from datahub.configuration.common import ConfigModel


class BigQueryBaseConfig(ConfigModel):
    rate_limit: bool = pydantic.Field(
        default=False, description="Should we rate limit requests made to the API."
    )
    requests_per_min: int = pydantic.Field(
        default=60,
        description="Used to control number of API calls made per min. Only used when `rate_limit` is set to `True`.",
    )
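
Since BigQueryBaseConfig is a pydantic ConfigModel, the two new options can be exercised directly; a quick sketch of the defaults and of opting in (assuming pydantic v1's parse_obj, which DataHub's ConfigModel exposes):

from datahub.ingestion.source_config.bigquery import BigQueryBaseConfig

# Defaults: rate limiting is off and the budget is 60 requests per minute.
assert BigQueryBaseConfig.parse_obj({}).rate_limit is False
assert BigQueryBaseConfig.parse_obj({}).requests_per_min == 60

# Opting in, e.g. from the corresponding keys in an ingestion recipe.
cfg = BigQueryBaseConfig.parse_obj({"rate_limit": True, "requests_per_min": 30})
assert cfg.rate_limit is True and cfg.requests_per_min == 30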
@@ -8,12 +8,13 @@
from datahub.configuration.common import ConfigurationError
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.ingestion.source.sql.sql_common import SQLAlchemyConfig
from datahub.ingestion.source_config.bigquery import BigQueryBaseConfig
from datahub.ingestion.source_config.usage.bigquery_usage import BigQueryCredential

logger = logging.getLogger(__name__)


class BigQueryConfig(BaseTimeWindowConfig, SQLAlchemyConfig):
class BigQueryConfig(BigQueryBaseConfig, BaseTimeWindowConfig, SQLAlchemyConfig):
scheme: str = "bigquery"
project_id: Optional[str] = pydantic.Field(
default=None,
@@ -11,6 +11,7 @@
from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.configuration.source_common import DatasetSourceConfigBase
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.ingestion.source_config.bigquery import BigQueryBaseConfig

logger = logging.getLogger(__name__)

@@ -57,7 +58,7 @@ def create_credential_temp_file(self) -> str:
return fp.name


class BigQueryUsageConfig(DatasetSourceConfigBase, BaseUsageConfig):
class BigQueryUsageConfig(BigQueryBaseConfig, DatasetSourceConfigBase, BaseUsageConfig):
projects: Optional[List[str]] = pydantic.Field(
default=None,
description="List of project ids to ingest usage from. If not specified, will infer from environment.",