Skip to content

Commit

Permalink
add Session.execute_concurrent and execute_concurrent_with_args
Browse files Browse the repository at this point in the history
  • Loading branch information
jbellis committed Sep 17, 2024
1 parent 6e2ffd4 commit a7a0019
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 64 deletions.
77 changes: 77 additions & 0 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
import time
from threading import Lock, RLock, Thread, Event
import uuid
import os
import urllib.request, urllib.error
import json

import weakref
from weakref import WeakValueDictionary
Expand Down Expand Up @@ -81,6 +84,7 @@
from cassandra.marshal import int64_pack
from cassandra.timestamps import MonotonicTimestampGenerator
from cassandra.util import _resolve_contact_points_to_string_map, Version
from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args

from cassandra.datastax.insights.reporter import MonitorReporter
from cassandra.datastax.insights.util import version_supports_insights
Expand Down Expand Up @@ -2725,6 +2729,79 @@ def execute_async(self, query, parameters=None, trace=False, custom_payload=None
future.send_request()
return future

def execute_concurrent(self, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False, execution_profile=EXEC_PROFILE_DEFAULT):
    """
    Run a sequence of ``(statement, parameters)`` tuples concurrently on this
    session. Every ``parameters`` item must be a sequence or :const:`None`.

    `concurrency` sets how many statements are executed concurrently. With
    :attr:`.Cluster.protocol_version` 1 or 2, the recommendation is to stay
    below 100 times the number of core connections per host times the number
    of connected hosts (see :meth:`.Cluster.set_core_connections_per_host`).
    Going over that amount may make the event loop thread block on new
    connection creation, substantially impacting throughput. With
    :attr:`~.Cluster.protocol_version` 3 or higher, higher levels of
    concurrency can be tried safely.

    While `raise_on_first_error` stays :const:`True`, execution stops after
    the first failed statement and the corresponding exception is raised.

    `results_generator` selects how results are handed back:

    * :const:`False` -- results are returned only after all requests have
      completed.
    * :const:`True` -- a generator expression is returned. Results are
      yielded as they return instead of materializing the entire list at
      once, which constrains the memory footprint for large result sets.
      The price is marginal CPU overhead (more thread coordination and
      sorting out-of-order results on-the-fly).

    `execution_profile` is the execution profile to use for this request; it
    is passed directly to :meth:`Session.execute_async`.

    The return value is a sequence of
    ``ExecutionResult(success, result_or_exc)`` namedtuples, in the same
    order that the statements were passed in. When ``success`` is
    :const:`False` the statement failed and ``result_or_exc`` is an
    :class:`Exception`; when ``success`` is :const:`True`, ``result_or_exc``
    is the query result.

    Example usage::

        select_statement = session.prepare("SELECT * FROM users WHERE id=?")

        statements_and_params = []
        for user_id in user_ids:
            params = (user_id, )
            statements_and_params.append((select_statement, params))

        results = session.execute_concurrent(statements_and_params, raise_on_first_error=False)

        for (success, result) in results:
            if not success:
                handle_error(result)  # result will be an Exception
            else:
                process_user(result[0])  # result will be a list of rows

    Note: when `generators` are used, consumers must not block or attempt
    further synchronous requests, because no further IO will be processed
    until the consumer returns. This may also produce a deadlock in the IO
    event thread.
    """
    # Inside the method body this name is the module-level helper imported
    # from cassandra.concurrent, not this method; the session goes in first.
    return execute_concurrent(
        self,
        statements_and_parameters,
        concurrency=concurrency,
        raise_on_first_error=raise_on_first_error,
        results_generator=results_generator,
        execution_profile=execution_profile,
    )

def execute_concurrent_with_args(self, statement, parameters, *args, **kwargs):
    """
    Like :meth:`.Session.execute_concurrent`, but takes a single statement
    and a sequence of parameters. Each item in ``parameters`` should be a
    sequence or :const:`None`.

    Any additional positional or keyword arguments are forwarded unchanged
    to :func:`cassandra.concurrent.execute_concurrent_with_args`.

    Example usage::

        statement = session.prepare("INSERT INTO mytable (a, b) VALUES (1, ?)")
        parameters = [(x,) for x in range(1000)]
        session.execute_concurrent_with_args(statement, parameters, concurrency=50)
    """
    # Inside the method body this name is the module-level helper imported
    # from cassandra.concurrent, not this method.
    return execute_concurrent_with_args(
        self, statement, parameters, *args, **kwargs
    )

def execute_graph(self, query, parameters=None, trace=False, execution_profile=EXEC_PROFILE_GRAPH_DEFAULT, execute_as=None):
"""
Executes a Gremlin query string or GraphStatement synchronously,
Expand Down
66 changes: 2 additions & 64 deletions cassandra/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,61 +29,7 @@

def execute_concurrent(session, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False, execution_profile=EXEC_PROFILE_DEFAULT):
"""
Executes a sequence of (statement, parameters) tuples concurrently. Each
``parameters`` item must be a sequence or :const:`None`.
The `concurrency` parameter controls how many statements will be executed
concurrently. When :attr:`.Cluster.protocol_version` is set to 1 or 2,
it is recommended that this be kept below 100 times the number of
core connections per host times the number of connected hosts (see
:meth:`.Cluster.set_core_connections_per_host`). If that amount is exceeded,
the event loop thread may attempt to block on new connection creation,
substantially impacting throughput. If :attr:`~.Cluster.protocol_version`
is 3 or higher, you can safely experiment with higher levels of concurrency.
If `raise_on_first_error` is left as :const:`True`, execution will stop
after the first failed statement and the corresponding exception will be
raised.
`results_generator` controls how the results are returned.
* If :const:`False`, the results are returned only after all requests have completed.
* If :const:`True`, a generator expression is returned. Using a generator results in a constrained
memory footprint when the results set will be large -- results are yielded
as they return instead of materializing the entire list at once. The trade for lower memory
footprint is marginal CPU overhead (more thread coordination and sorting out-of-order results
on-the-fly).
`execution_profile` argument is the execution profile to use for this
request, it is passed directly to :meth:`Session.execute_async`.
A sequence of ``ExecutionResult(success, result_or_exc)`` namedtuples is returned
in the same order that the statements were passed in. If ``success`` is :const:`False`,
there was an error executing the statement, and ``result_or_exc`` will be
an :class:`Exception`. If ``success`` is :const:`True`, ``result_or_exc``
will be the query result.
Example usage::
select_statement = session.prepare("SELECT * FROM users WHERE id=?")
statements_and_params = []
for user_id in user_ids:
params = (user_id, )
statements_and_params.append((select_statement, params))
results = execute_concurrent(
session, statements_and_params, raise_on_first_error=False)
for (success, result) in results:
if not success:
handle_error(result) # result will be an Exception
else:
process_user(result[0]) # result will be a list of rows
Note: in the case that `generators` are used, it is important to ensure the consumers do not
block or attempt further synchronous requests, because no further IO will be processed until
the consumer returns. This may also produce a deadlock in the IO event thread.
See :meth:`.Session.execute_concurrent`.
"""
if concurrency <= 0:
raise ValueError("concurrency must be greater than 0")
Expand Down Expand Up @@ -216,14 +162,6 @@ def _results(self):

def execute_concurrent_with_args(session, statement, parameters, *args, **kwargs):
    """
    Like :func:`~cassandra.concurrent.execute_concurrent`, but takes a single
    statement and a sequence of parameters. Each item in ``parameters``
    should be a sequence or :const:`None`.

    Extra positional and keyword arguments are passed through to
    :func:`~cassandra.concurrent.execute_concurrent`.

    Example usage::

        statement = session.prepare("INSERT INTO mytable (a, b) VALUES (1, ?)")
        parameters = [(x,) for x in range(1000)]
        execute_concurrent_with_args(session, statement, parameters, concurrency=50)

    See :meth:`.Session.execute_concurrent_with_args`.
    """
    # Pair the one statement with each parameters item: cycle() repeats the
    # statement endlessly and zip() stops when ``parameters`` is exhausted.
    repeated_statement = cycle((statement,))
    statements_and_parameters = zip(repeated_statement, parameters)
    return execute_concurrent(session, statements_and_parameters, *args, **kwargs)

0 comments on commit a7a0019

Please sign in to comment.