From 666804ce7960478df07c7c7850f37ee59a02a8ac Mon Sep 17 00:00:00 2001 From: Dylan Baker Date: Wed, 17 Apr 2024 11:26:40 +0100 Subject: [PATCH 1/3] Updated spectacles modules for assert concurrency configuration --- spectacles/cli.py | 16 +++++++++++++++- spectacles/runner.py | 4 +++- spectacles/validators/data_test.py | 6 ++++-- 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/spectacles/cli.py b/spectacles/cli.py index 349a7536..3d07f510 100644 --- a/spectacles/cli.py +++ b/spectacles/cli.py @@ -27,6 +27,7 @@ from spectacles.logger import set_file_handler from spectacles.runner import Runner from spectacles.utils import log_duration +from spectacles.validators.data_test import QUERY_SLOT_LIMIT __version__ = importlib.metadata.version("spectacles") @@ -709,6 +710,16 @@ def _build_assert_subparser( _build_validator_subparser(subparser_action, subparser) _build_select_subparser(subparser_action, subparser) + subparser.add_argument( + "--concurrency", + type=int, + default=QUERY_SLOT_LIMIT, + help=( + "Specify the number of concurrent queries you want to have running " + f"against your data warehouse. The default is {QUERY_SLOT_LIMIT}." + ), + ) + def _build_content_subparser( subparser_action: argparse._SubParsersAction, # type: ignore[type-arg] @@ -896,6 +907,7 @@ async def run_assert( api_version: float, remote_reset: bool, pin_imports: Dict[str, str], + concurrency: int, ) -> None: # Don't trust env to ignore .netrc credentials async_client = httpx.AsyncClient(trust_env=False) @@ -905,7 +917,9 @@ async def run_assert( ) runner = Runner(client, project, remote_reset, pin_imports) - results = await runner.validate_data_tests(ref, filters) + results = await runner.validate_data_tests( + ref, filters, query_slot_limit=concurrency + ) finally: await async_client.aclose() diff --git a/spectacles/runner.py b/spectacles/runner.py index a53a1dd4..16b4566a 100644 --- a/spectacles/runner.py +++ b/spectacles/runner.py @@ -17,6 +17,7 @@ LookMLValidator, SqlValidator, ) +from spectacles.validators.data_test import QUERY_SLOT_LIMIT from spectacles.validators.sql import ( DEFAULT_CHUNK_SIZE, DEFAULT_QUERY_CONCURRENCY, @@ -444,6 +445,7 @@ async def validate_data_tests( self, ref: Optional[str] = None, filters: Optional[List[str]] = None, + query_slot_limit: int = QUERY_SLOT_LIMIT, ) -> JsonDict: if filters is None: filters = ["*/*"] @@ -462,7 +464,7 @@ async def validate_data_tests( f"{'explore' if explore_count == 1 else 'explores'}" ) tests = await validator.get_tests(project) - await validator.validate(tests) + await validator.validate(tests, query_slot_limit) results = project.get_results(validator="data_test") return results diff --git a/spectacles/validators/data_test.py b/spectacles/validators/data_test.py index 065183ec..61a58304 100644 --- a/spectacles/validators/data_test.py +++ b/spectacles/validators/data_test.py @@ -94,9 +94,11 @@ async def get_tests(self, project: Project) -> List[DataTest]: return selected_tests - async def validate(self, tests: List[DataTest]) -> List[DataTestError]: + async def validate( + self, tests: List[DataTest], query_slot_limit: int = QUERY_SLOT_LIMIT + ) -> List[DataTestError]: data_test_errors: List[DataTestError] = [] - query_slot = asyncio.Semaphore(QUERY_SLOT_LIMIT) + query_slot = asyncio.Semaphore(query_slot_limit) async def run_test(test: DataTest, query_slot: asyncio.Semaphore) -> None: async with query_slot: From 587f1dcdce10828ddb7edef35b38beed42e9b1f3 Mon Sep 17 00:00:00 2001 From: Dylan Baker Date: Wed, 17 Apr 2024 11:31:59 +0100 Subject: [PATCH 2/3] fix tests --- spectacles/cli.py | 1 + 1 file changed, 1 insertion(+) diff --git a/spectacles/cli.py b/spectacles/cli.py index 3d07f510..db5bc1d4 100644 --- a/spectacles/cli.py +++ b/spectacles/cli.py @@ -342,6 +342,7 @@ def main() -> None: api_version=args.api_version, remote_reset=args.remote_reset, pin_imports=pin_imports, + concurrency=args.concurrency, ) ) elif args.command == "content": From 0a8e7d3425fe938b6c6a01d43d01c2ff6bcec346 Mon Sep 17 00:00:00 2001 From: Dylan Baker Date: Thu, 18 Apr 2024 11:11:06 +0100 Subject: [PATCH 3/3] rename to concurrency --- spectacles/cli.py | 10 ++++------ spectacles/runner.py | 6 +++--- spectacles/validators/data_test.py | 8 +++++--- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/spectacles/cli.py b/spectacles/cli.py index db5bc1d4..c18de683 100644 --- a/spectacles/cli.py +++ b/spectacles/cli.py @@ -27,7 +27,7 @@ from spectacles.logger import set_file_handler from spectacles.runner import Runner from spectacles.utils import log_duration -from spectacles.validators.data_test import QUERY_SLOT_LIMIT +from spectacles.validators.data_test import DATA_TEST_CONCURRENCY __version__ = importlib.metadata.version("spectacles") @@ -714,10 +714,10 @@ def _build_assert_subparser( subparser.add_argument( "--concurrency", type=int, - default=QUERY_SLOT_LIMIT, + default=DATA_TEST_CONCURRENCY, help=( "Specify the number of concurrent queries you want to have running " - f"against your data warehouse. The default is {QUERY_SLOT_LIMIT}." + f"against your data warehouse. The default is {DATA_TEST_CONCURRENCY}." ), ) @@ -918,9 +918,7 @@ async def run_assert( ) runner = Runner(client, project, remote_reset, pin_imports) - results = await runner.validate_data_tests( - ref, filters, query_slot_limit=concurrency - ) + results = await runner.validate_data_tests(ref, filters, concurrency) finally: await async_client.aclose() diff --git a/spectacles/runner.py b/spectacles/runner.py index 16b4566a..73c25e44 100644 --- a/spectacles/runner.py +++ b/spectacles/runner.py @@ -17,7 +17,7 @@ LookMLValidator, SqlValidator, ) -from spectacles.validators.data_test import QUERY_SLOT_LIMIT +from spectacles.validators.data_test import DATA_TEST_CONCURRENCY from spectacles.validators.sql import ( DEFAULT_CHUNK_SIZE, DEFAULT_QUERY_CONCURRENCY, @@ -445,7 +445,7 @@ async def validate_data_tests( self, ref: Optional[str] = None, filters: Optional[List[str]] = None, - query_slot_limit: int = QUERY_SLOT_LIMIT, + concurrency: int = DATA_TEST_CONCURRENCY, ) -> JsonDict: if filters is None: filters = ["*/*"] @@ -464,7 +464,7 @@ async def validate_data_tests( f"{'explore' if explore_count == 1 else 'explores'}" ) tests = await validator.get_tests(project) - await validator.validate(tests, query_slot_limit) + await validator.validate(tests, concurrency) results = project.get_results(validator="data_test") return results diff --git a/spectacles/validators/data_test.py b/spectacles/validators/data_test.py index 61a58304..8aa4cda8 100644 --- a/spectacles/validators/data_test.py +++ b/spectacles/validators/data_test.py @@ -6,7 +6,9 @@ from spectacles.exceptions import DataTestError, SpectaclesException from spectacles.lookml import Explore, Project -QUERY_SLOT_LIMIT = 15 # This is the per-user query limit in Looker for most instances +DATA_TEST_CONCURRENCY = ( + 15 # This is the per-user query limit in Looker for most instances +) @dataclass @@ -95,10 +97,10 @@ async def get_tests(self, project: Project) -> List[DataTest]: return selected_tests async def validate( - self, tests: List[DataTest], query_slot_limit: int = QUERY_SLOT_LIMIT + self, tests: List[DataTest], concurrency: int = DATA_TEST_CONCURRENCY ) -> List[DataTestError]: data_test_errors: List[DataTestError] = [] - query_slot = asyncio.Semaphore(query_slot_limit) + query_slot = asyncio.Semaphore(concurrency) async def run_test(test: DataTest, query_slot: asyncio.Semaphore) -> None: async with query_slot: