Support max_concurrency for running Looker queries
The situation was that when we ran Spectacles against our Looker
instance with 128 explores in our model, it overwhelmed the instance
to the point that it stopped responding and eventually crashed. I've
diagnosed that the cause was simply the number of queries running at
once: pretty much all 128 queries were scheduled immediately.

So this adds a --max-concurrency flag which you can use to control
how much load you want to put on your Looker instance.

I've tried it against our staging server with a value of 10 and it
works nicely without crashing anything.
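
The change boils down to the standard bounded-concurrency pattern:
never have more than N queries in flight at once. A minimal standalone
sketch of that idea using asyncio.Semaphore (the commit itself
throttles with asyncio.wait, and the query names below are made up):

import asyncio

async def run_query(name: str) -> str:
    # Stand-in for firing a Looker query and waiting for it to finish.
    await asyncio.sleep(0.1)
    return f"{name}: ok"

async def run_all(queries: list, max_concurrency: int) -> list:
    # Cap the number of queries in flight; 0 means "no limit".
    limit = asyncio.Semaphore(max_concurrency or len(queries))

    async def throttled(name: str) -> str:
        async with limit:
            return await run_query(name)

    return await asyncio.gather(*(throttled(q) for q in queries))

results = asyncio.run(run_all([f"explore_{i}" for i in range(128)], 10))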
Igor Serko committed Nov 14, 2019
1 parent a21092c commit 106fedd
Showing 3 changed files with 105 additions and 50 deletions.
12 changes: 11 additions & 1 deletion spectacles/cli.py
@@ -175,6 +175,7 @@ def main():
args.api_version,
args.mode,
args.remote_reset,
args.max_concurrency,
)
elif args.command == "assert":
run_assert(
@@ -352,6 +353,14 @@ def _build_sql_subparser(
user's branch to the revision of the branch that is on the remote. \
WARNING: This will delete any uncommited changes in the user's workspace.",
)
subparser.add_argument(
"--max-concurrency",
default=0,
type=int,
help="Specify how many concurrent queries you want to have running \
against Looker. If not specified or 0, it will execute as many \
queries as you have explores/dimensions.",
)


def _build_assert_subparser(
@@ -414,6 +423,7 @@ def run_sql(
api_version,
mode,
remote_reset,
max_concurrency,
) -> None:
"""Runs and validates the SQL for each selected LookML dimension."""
runner = Runner(
@@ -426,7 +436,7 @@
api_version,
remote_reset,
)
errors = runner.validate_sql(explores, mode)
errors = runner.validate_sql(explores, mode, max_concurrency)
if errors:
for error in sorted(errors, key=lambda x: x["path"]):
printer.print_sql_error(error)
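For reference, how the new flag behaves when parsed in isolation (a
throwaway parser, not the project's actual CLI wiring):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--max-concurrency", default=0, type=int)

# An explicit value caps the number of simultaneous Looker queries.
assert parser.parse_args(["--max-concurrency", "10"]).max_concurrency == 10
# Omitting the flag keeps the default of 0, i.e. run every query at once.
assert parser.parse_args([]).max_concurrency == 0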
6 changes: 4 additions & 2 deletions spectacles/runner.py
@@ -37,10 +37,12 @@ def __init__(
)
self.client.update_session(project, branch, remote_reset)

def validate_sql(self, selectors: List[str], mode: str = "batch") -> List[dict]:
def validate_sql(
self, selectors: List[str], mode: str = "batch", max_concurrent: int = 0
) -> List[dict]:
sql_validator = SqlValidator(self.client, self.project)
sql_validator.build_project(selectors)
errors = sql_validator.validate(mode)
errors = sql_validator.validate(mode, max_concurrent)
return [vars(error) for error in errors]

def validate_data_tests(self):
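A hypothetical call site for the extended signature; the Runner
construction is elided and the selector value is illustrative:

# `runner` is an already-configured spectacles.runner.Runner instance.
errors = runner.validate_sql(
    selectors=["my_model/*"],  # illustrative selector, not from the diff
    mode="batch",
    max_concurrent=10,         # at most 10 Looker queries in flight
)
for error in sorted(errors, key=lambda e: e["path"]):
    print(error["path"])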
137 changes: 90 additions & 47 deletions spectacles/validators.py
@@ -1,6 +1,5 @@
from typing import List, Sequence, DefaultDict, Tuple
from typing import Callable, List, Sequence, DefaultDict, Set, Tuple
import asyncio
import time
from abc import ABC, abstractmethod
from collections import defaultdict
import aiohttp
@@ -192,13 +191,16 @@ def build_project(self, selectors: List[str]) -> None:

self.project.models = selected_models

def validate(self, mode: str = "batch") -> List[SqlError]:
def validate(self, mode: str = "batch", max_concurrency: int = 0) -> List[SqlError]:
"""Queries selected explores and returns any errors.
Args:
batch: When true, runs one query per explore (using all dimensions). When
false, runs one query per dimension. Batch mode increases query speed
but can only return the first error encountered for each dimension.
max_concurrency: When greater than 0, at most <max_concurrency> Looker
queries run at a time; we wait for running queries to finish before
creating new ones.
Returns:
List[SqlError]: SqlErrors encountered while querying the explore.
@@ -211,9 +213,10 @@ def validate(self, mode: str = "batch") -> List[SqlError]:
f"[{mode} mode]"
)

errors = self._query(mode)
loop = asyncio.get_event_loop()
errors = list(loop.run_until_complete(self._query(mode, max_concurrency)))
if mode == "hybrid" and self.project.errored:
errors = self._query(mode)
errors = list(loop.run_until_complete(self._query(mode, max_concurrency)))

for model in sorted(self.project.models, key=lambda x: x.name):
for explore in sorted(model.explores, key=lambda x: x.name):
@@ -228,53 +231,93 @@ def validate(self, mode: str = "batch") -> List[SqlError]:

return errors

def _query(self, mode: str = "batch") -> List[SqlError]:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def _wait_for_query_to_complete(
self, query_task_ids: list, is_final: bool = False
) -> Tuple[list, List[SqlError]]:
errors = []
remaining_query_task_ids = query_task_ids
while remaining_query_task_ids:
remaining_query_task_ids, more_errors = self._get_query_results(
remaining_query_task_ids
)
errors.extend(more_errors)
if not is_final and len(query_task_ids) > len(remaining_query_task_ids):
return remaining_query_task_ids, errors
await asyncio.sleep(0.5)
return remaining_query_task_ids, errors

async def _process_finished_tasks(self, tasks, is_final=False):
return_when = asyncio.FIRST_COMPLETED
if is_final:
return_when = asyncio.ALL_COMPLETED
_done, tasks = await asyncio.wait(tasks, return_when=return_when)
query_ids = []
while _done:
task = _done.pop()
result = task.result()
query_ids.append(result)
return tasks, query_ids

async def _query(
self, mode: str = "batch", max_concurrency: int = 0
) -> List[SqlError]:
session = aiohttp.ClientSession(
loop=loop, headers=self.client.session.headers, timeout=self.timeout
headers=self.client.session.headers, timeout=self.timeout
)
tasks = []
task_calls: List[Tuple[Callable, Tuple]] = []
# first we gather all parameters for the tasks
for model in self.project.models:
for explore in model.explores:
if explore.dimensions:
if mode == "batch" or (mode == "hybrid" and not explore.queried):
logger.debug("Querying one explore at at time")
task = loop.create_task(
self._query_explore(session, model, explore)
)
tasks.append(task)
elif mode == "single" or (mode == "hybrid" and explore.errored):
logger.debug("Querying one dimension at at time")
for dimension in explore.dimensions:
task = loop.create_task(
self._query_dimension(
session, model, explore, dimension
)
if mode == "batch" or (mode == "hybrid" and not explore.queried):
task_calls.append((self._query_explore, (session, model, explore)))
elif mode == "single" or (mode == "hybrid" and explore.errored):
for dimension in explore.dimensions:
task_calls.append(
(
self._query_dimension,
(session, model, explore, dimension),
)
tasks.append(task)

query_task_ids = list(loop.run_until_complete(asyncio.gather(*tasks)))
loop.run_until_complete(session.close())
loop.run_until_complete(asyncio.sleep(0.250))
loop.close()

MAX_QUERY_FETCH = 250

tasks_to_check = query_task_ids[:MAX_QUERY_FETCH]
del query_task_ids[:MAX_QUERY_FETCH]
logger.debug(f"{len(query_task_ids)} left in queue")
tasks_to_check, errors = self._get_query_results(tasks_to_check)

while tasks_to_check or query_task_ids:
number_of_tasks_to_add = MAX_QUERY_FETCH - len(tasks_to_check)
tasks_to_check.extend(query_task_ids[:number_of_tasks_to_add])
del query_task_ids[:number_of_tasks_to_add]
logger.debug(f"{len(query_task_ids)} left in queue")
tasks_to_check, more_errors = self._get_query_results(tasks_to_check)
errors.extend(more_errors)
if tasks_to_check or query_task_ids:
time.sleep(0.5)
)

tasks: Set = set()
query_task_ids: List = []
errors = []
logger.debug(
"we are going to schedule %s tasks at concurrency of %s",
len(task_calls),
max_concurrency,
)
while task_calls:
logger.debug(
"max_concurrency=%s pending_tasks=%s running_tasks=%s "
"query_task_ids=%s",
max_concurrency,
len(task_calls),
len(tasks),
len(query_task_ids),
)
if max_concurrency > 0 and len(tasks) >= max_concurrency:
tasks, _query_ids = await self._process_finished_tasks(tasks)
query_task_ids.extend(_query_ids)

if len(query_task_ids) >= max_concurrency:
query_task_ids, more_errors = await self._wait_for_query_to_complete(
query_task_ids
)
errors.extend(more_errors)
else:
func, args = task_calls.pop(0)
task = asyncio.create_task(func(*args))
tasks.add(task)
# we've now triggered all queries and just need a final check
tasks, _query_ids = await self._process_finished_tasks(tasks, is_final=True)
query_task_ids.extend(_query_ids)
query_task_ids, more_errors = await self._wait_for_query_to_complete(
query_task_ids, is_final=True
)
errors.extend(more_errors)

await session.close()

return errors

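The scheduling loop in the new _query reduces to the pattern below; a
simplified standalone sketch (it drops the second half, where the
validator polls Looker for finished query task results):

import asyncio

async def schedule(calls, max_concurrency: int) -> list:
    # `calls` is a list of (coroutine_function, args) pairs, like task_calls above.
    tasks, results = set(), []
    while calls:
        if max_concurrency > 0 and len(tasks) >= max_concurrency:
            # At the limit: wait for at least one task before starting another.
            done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            results.extend(task.result() for task in done)
        else:
            func, args = calls.pop(0)
            tasks.add(asyncio.create_task(func(*args)))
    if tasks:
        # Everything has been triggered; drain the remaining tasks.
        done, _ = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
        results.extend(task.result() for task in done)
    return results

async def fake_query(name: str) -> str:
    await asyncio.sleep(0.05)
    return name

calls = [(fake_query, (f"query_{i}",)) for i in range(12)]
print(asyncio.run(schedule(calls, max_concurrency=4)))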
