Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add execute_concurrent_async and expose execute_concurrent_* in Session #1229

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2725,6 +2725,98 @@ def execute_async(self, query, parameters=None, trace=False, custom_payload=None
future.send_request()
return future

def execute_concurrent(self, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False, execution_profile=EXEC_PROFILE_DEFAULT):
    """
    Concurrently execute a sequence of ``(statement, parameters)`` tuples on
    this session. Every ``parameters`` item must be a sequence or :const:`None`.

    `concurrency` caps how many statements are in flight at once. With
    :attr:`.Cluster.protocol_version` 1 or 2, keep it below 100 times the
    number of core connections per host times the number of connected hosts
    (see :meth:`.Cluster.set_core_connections_per_host`); exceeding that can
    make the event loop thread block on new connection creation and hurt
    throughput. With protocol version 3 or higher, higher concurrency is safe
    to experiment with.

    With `raise_on_first_error` left at :const:`True`, execution stops at the
    first failed statement and its exception is raised.

    `results_generator` selects the result-delivery mode:

    * :const:`False` -- results are returned only once every request has completed.
    * :const:`True` -- a generator expression is returned instead. This keeps the
      memory footprint bounded for large result sets (results are yielded as they
      arrive rather than materialized all at once) at the cost of a little extra
      CPU for thread coordination and re-ordering out-of-order completions.

    `execution_profile` is forwarded directly to :meth:`Session.execute_async`
    for each request.

    Returns a sequence of ``ExecutionResult(success, result_or_exc)`` namedtuples
    in the same order the statements were supplied. ``success`` of :const:`False`
    means the statement failed and ``result_or_exc`` is an :class:`Exception`;
    :const:`True` means ``result_or_exc`` is the query result.

    Example usage::

        select_statement = session.prepare("SELECT * FROM users WHERE id=?")

        statements_and_params = []
        for user_id in user_ids:
            params = (user_id, )
            statements_and_params.append((select_statement, params))

        results = session.execute_concurrent(statements_and_params, raise_on_first_error=False)

        for (success, result) in results:
            if not success:
                handle_error(result)  # result will be an Exception
            else:
                process_user(result[0])  # result will be a list of rows

    Note: when the generator mode is used, consumers must not block or issue
    further synchronous requests while iterating -- no further IO is processed
    until the consumer returns, which can deadlock the IO event thread.
    """
    # Imported lazily to avoid a circular import between cluster and concurrent.
    from cassandra.concurrent import execute_concurrent
    return execute_concurrent(
        self,
        statements_and_parameters,
        concurrency=concurrency,
        raise_on_first_error=raise_on_first_error,
        results_generator=results_generator,
        execution_profile=execution_profile,
    )

def execute_concurrent_with_args(self, statement, parameters, *args, **kwargs):
    """
    Concurrently execute a single statement once per item of ``parameters``.

    Like :meth:`~cassandra.concurrent.execute_concurrent()`, but takes one
    statement and a sequence of parameter sets; each item in ``parameters``
    should be a sequence or :const:`None`. Any extra positional or keyword
    arguments are forwarded unchanged.

    Example usage::

        statement = session.prepare("INSERT INTO mytable (a, b) VALUES (1, ?)")
        parameters = [(x,) for x in range(1000)]
        session.execute_concurrent_with_args(statement, parameters, concurrency=50)
    """
    # Imported lazily to avoid a circular import between cluster and concurrent.
    from cassandra.concurrent import execute_concurrent_with_args as _execute
    return _execute(self, statement, parameters, *args, **kwargs)

def execute_concurrent_async(self, statements_and_parameters, concurrency=100, raise_on_first_error=False, execution_profile=EXEC_PROFILE_DEFAULT):
    """
    Asynchronously executes a sequence of (statement, parameters) tuples concurrently.

    Args:
        statements_and_parameters: Iterable of (CQL statement, bind parameters) tuples.
        concurrency (int, optional): Number of concurrent operations. Default is 100.
        raise_on_first_error (bool, optional): If True, the returned future fails with
            the first error encountered instead of collecting results. Default is False.
        execution_profile (ExecutionProfile, optional): Execution profile to use.
            Default is EXEC_PROFILE_DEFAULT.

    Returns:
        A `concurrent.futures.Future` object that will be completed when all
        operations are done.
    """
    # Imported lazily to avoid a circular import between cluster and concurrent.
    from cassandra.concurrent import execute_concurrent_async
    return execute_concurrent_async(self, statements_and_parameters, concurrency, raise_on_first_error, execution_profile)

def execute_graph(self, query, parameters=None, trace=False, execution_profile=EXEC_PROFILE_GRAPH_DEFAULT, execute_as=None):
"""
Executes a Gremlin query string or GraphStatement synchronously,
Expand Down
112 changes: 47 additions & 65 deletions cassandra/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,77 +13,23 @@
# limitations under the License.


import logging
from collections import namedtuple
from concurrent.futures import Future
from heapq import heappush, heappop
from itertools import cycle
from threading import Condition
import sys

from cassandra.cluster import ResultSet, EXEC_PROFILE_DEFAULT

import logging
log = logging.getLogger(__name__)


ExecutionResult = namedtuple('ExecutionResult', ['success', 'result_or_exc'])

def execute_concurrent(session, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False, execution_profile=EXEC_PROFILE_DEFAULT):
"""
Executes a sequence of (statement, parameters) tuples concurrently. Each
``parameters`` item must be a sequence or :const:`None`.

The `concurrency` parameter controls how many statements will be executed
concurrently. When :attr:`.Cluster.protocol_version` is set to 1 or 2,
it is recommended that this be kept below 100 times the number of
core connections per host times the number of connected hosts (see
:meth:`.Cluster.set_core_connections_per_host`). If that amount is exceeded,
the event loop thread may attempt to block on new connection creation,
substantially impacting throughput. If :attr:`~.Cluster.protocol_version`
is 3 or higher, you can safely experiment with higher levels of concurrency.

If `raise_on_first_error` is left as :const:`True`, execution will stop
after the first failed statement and the corresponding exception will be
raised.

`results_generator` controls how the results are returned.

* If :const:`False`, the results are returned only after all requests have completed.
* If :const:`True`, a generator expression is returned. Using a generator results in a constrained
memory footprint when the results set will be large -- results are yielded
as they return instead of materializing the entire list at once. The trade for lower memory
footprint is marginal CPU overhead (more thread coordination and sorting out-of-order results
on-the-fly).

`execution_profile` argument is the execution profile to use for this
request, it is passed directly to :meth:`Session.execute_async`.

A sequence of ``ExecutionResult(success, result_or_exc)`` namedtuples is returned
in the same order that the statements were passed in. If ``success`` is :const:`False`,
there was an error executing the statement, and ``result_or_exc`` will be
an :class:`Exception`. If ``success`` is :const:`True`, ``result_or_exc``
will be the query result.

Example usage::

select_statement = session.prepare("SELECT * FROM users WHERE id=?")

statements_and_params = []
for user_id in user_ids:
params = (user_id, )
statements_and_params.append((select_statement, params))

results = execute_concurrent(
session, statements_and_params, raise_on_first_error=False)

for (success, result) in results:
if not success:
handle_error(result) # result will be an Exception
else:
process_user(result[0]) # result will be a list of rows

Note: in the case that `generators` are used, it is important to ensure the consumers do not
block or attempt further synchronous requests, because no further IO will be processed until
the consumer returns. This may also produce a deadlock in the IO event thread.
See :meth:`.Session.execute_concurrent`.
"""
if concurrency <= 0:
raise ValueError("concurrency must be greater than 0")
Expand Down Expand Up @@ -216,14 +162,50 @@ def _results(self):

def execute_concurrent_with_args(session, statement, parameters, *args, **kwargs):
    """
    See :meth:`.Session.execute_concurrent_with_args`.

    Pairs the single `statement` with each item of `parameters` (each a
    sequence or :const:`None`) by cycling the statement, then delegates to
    :func:`execute_concurrent`; remaining arguments are forwarded unchanged.
    """
    return execute_concurrent(session, zip(cycle((statement,)), parameters), *args, **kwargs)

Example usage::

statement = session.prepare("INSERT INTO mytable (a, b) VALUES (1, ?)")
parameters = [(x,) for x in range(1000)]
execute_concurrent_with_args(session, statement, parameters, concurrency=50)
class ConcurrentExecutorFutureResults(ConcurrentExecutorListResults):
    # Variant of ConcurrentExecutorListResults that reports completion through a
    # concurrent.futures.Future instead of having the caller block on results.

    def __init__(self, session, statements_and_params, execution_profile, future):
        # future: concurrent.futures.Future resolved from _put_result once the
        # final statement completes.
        super().__init__(session, statements_and_params, execution_profile)
        self.future = future

    def _put_result(self, result, idx, success):
        # Record the result via the base implementation, then resolve the
        # Future if this was the last outstanding statement.
        # NOTE(review): relies on base-class internals (_condition, _current,
        # _exec_count, _results_queue as (index, result) pairs, _exception,
        # _fail_fast) -- confirm against ConcurrentExecutorListResults.
        super()._put_result(result, idx, success)
        with self._condition:
            if self._current == self._exec_count:
                if self._exception and self._fail_fast:
                    # fail-fast mode: surface the first recorded exception
                    self.future.set_exception(self._exception)
                else:
                    # sort by submission index to restore request order
                    sorted_results = [r[1] for r in sorted(self._results_queue)]
                    self.future.set_result(sorted_results)


def execute_concurrent_async(
    session,
    statements_and_parameters,
    concurrency=100,
    raise_on_first_error=False,
    execution_profile=EXEC_PROFILE_DEFAULT
):
    """
    See :meth:`.Session.execute_concurrent_async`.

    Returns a :class:`concurrent.futures.Future` resolved with the ordered
    list of results once every statement completes, or failed with the first
    exception when `raise_on_first_error` is true.
    """
    # The executor resolves this Future from _put_result when the last
    # statement finishes (see ConcurrentExecutorFutureResults).
    future = Future()
    executor = ConcurrentExecutorFutureResults(
        session=session,
        statements_and_params=statements_and_parameters,
        execution_profile=execution_profile,
        future=future
    )

    # Kick off the concurrent execution; any synchronous failure is routed
    # into the Future rather than raised at the call site.
    try:
        executor.execute(concurrency=concurrency, fail_fast=raise_on_first_error)
    except Exception as exc:
        if future.done():
            # The executor already resolved the future; calling set_exception()
            # now would raise InvalidStateError and mask the original error.
            log.exception("execute_concurrent_async failed after the future was resolved")
        else:
            future.set_exception(exc)

    return future
54 changes: 53 additions & 1 deletion tests/unit/test_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import platform

from cassandra.cluster import Cluster, Session
from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args
from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args, execute_concurrent_async
from cassandra.pool import Host
from cassandra.policies import SimpleConvictionPolicy
from tests.unit.utils import mock_session_pools
Expand Down Expand Up @@ -239,6 +239,58 @@ def validate_result_ordering(self, results):
self.assertLess(last_time_added, current_time_added)
last_time_added = current_time_added

def insert_and_validate_list_async(self, reverse, slowdown):
    """
    Drive execute_concurrent_async against a mocked session and verify that
    results come back in submission order.

    A MockResponseResponseFuture stands in for every execute_async() call; a
    TimedCallableInvoker thread then fires the registered callbacks, optionally
    reversed and/or intermittently delayed, and the resolved future's results
    are checked for submission ordering.
    :param reverse: Execute the callbacks in the opposite order that they were submitted
    :param slowdown: Cause intermittent queries to perform slowly
    """
    response_future = MockResponseResponseFuture(reverse=reverse)
    mock_session = Mock()
    mock_session.execute_async.return_value = response_future

    query = "INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"
    statements_and_params = zip(cycle([query]), [(i,) for i in range(100)])

    invoker = TimedCallableInvoker(response_future, slowdown=slowdown)
    invoker.start()
    try:
        results = execute_concurrent_async(mock_session, statements_and_params).result()
        self.validate_result_ordering(results)
    finally:
        invoker.stop()

def test_results_ordering_async_forward(self):
    """
    Verify execute_concurrent_async result ordering when callbacks fire
    in submission order at normal speed.
    """
    self.insert_and_validate_list_async(reverse=False, slowdown=False)

def test_results_ordering_async_reverse(self):
    """
    Verify execute_concurrent_async result ordering when callbacks fire
    in the reverse of submission order.
    """
    self.insert_and_validate_list_async(reverse=True, slowdown=False)

def test_results_ordering_async_forward_slowdown(self):
    """
    Verify execute_concurrent_async result ordering when callbacks fire
    in submission order with slow queries mixed in.
    """
    self.insert_and_validate_list_async(reverse=False, slowdown=True)

def test_results_ordering_async_reverse_slowdown(self):
    """
    Verify execute_concurrent_async result ordering when callbacks fire
    in the reverse of submission order with slow queries mixed in.
    """
    self.insert_and_validate_list_async(reverse=True, slowdown=True)

@mock_session_pools
def test_recursion_limited(self):
"""
Expand Down