Python SDK Tutorial Part 3
After the above implementation, we have a functional multi-threaded client that implements basic rate limiting. However, it is neither very robust nor very performant: it is limited to the same number of processing threads as the allotted rate (in requests per second). A more robust and performant approach is to allow as many processing threads as desired, so that time spent waiting on I/O is put to use, while still throttling requests to maintain a consistent rate. To implement this approach, we can leverage the token bucket algorithm, which was originally designed for the similar problem of limiting network bandwidth.
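To make the idea concrete before wiring it into the multiprocessing client, here is a minimal single-process sketch of a token bucket. It is not the implementation used in this tutorial: the TokenBucket class, its try_acquire method, and the injectable clock parameter are hypothetical names chosen for illustration, and tokens are credited lazily from elapsed time rather than by a separate refill thread.

```python
import time


class TokenBucket:
    """Single-process token bucket: `rate` tokens per second, at most `capacity` stored."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = float(rate)
        self.capacity = float(capacity)
        self.clock = clock              # injectable so the bucket can be tested without sleeping
        self.tokens = float(capacity)   # start full, like a pre-seeded bucket
        self.last = clock()

    def try_acquire(self):
        """Take one token if available; return True on success, False otherwise."""
        now = self.clock()
        # credit tokens for the elapsed time, never exceeding capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


bucket = TokenBucket(rate=4, capacity=4)
granted = sum(bucket.try_acquire() for _ in range(10))
print("granted", granted, "of 10 immediate requests")
```

A caller that is refused a token simply waits and retries, which is the same contract the processing threads follow later in this part.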
For the token-bucket implementation, start by copying the existing rate_limiting_simple.py script to rate_limiting_robust.py. Now let's add a variable to represent the number of waiting threads per processing thread:
...
# set allotted number of requests per second
# this is defined here as it could be retrieved through some external mechanism
__requests_per_second = 4
__waiting_threads = 10
...
Next, let's add our new token-bucket concurrency control constructs:
...
# 3. Define concurrency specific objects
# stats objects
lock_stats = mp.Lock()
counter = mp.Value('i', 0)
avg_req_time = mp.Value('f', 0)
time_start = mp.Value('f', 999999999999999)
time_end = mp.Value('f', 0)
# concurrency control objects
lock_bucket = mp.Lock()
bucket = mp.Queue()
# pre-seed bucket
for i in range(__requests_per_second):
    bucket.put('token')
...
In addition to the previously added stats objects, we are now adding multiprocessing constructs for the purpose of rate limiting. The Queue object acts as the bucket, with a Lock to gate modifications to its contents. We also pre-seed the bucket with tokens here.
Now let's define a new function that will act as our bucket-refilling thread. In a nutshell, the token-bucket algorithm works as follows: multiple threads take tokens from the bucket in order to run, while a separate thread replenishes tokens into the bucket at a pre-defined rate (in this case, n tokens per second, which corresponds to the rate of n requests per second). Let's call this function bucket_refill_thread:
def bucket_refill_thread(lock_bucket, bucket):
    while True:
        # (re-)establish parameters
        interval = 1.0
        tokens = __requests_per_second
        # sleep for interval
        # the section below breaks intervals apart into smaller chunks in order to smooth out bursting
        # for example, 8 rps can be broken down into: 2 requests / 250 ms
        if __requests_per_second > 1:
            # search divisors from largest to smallest (note the -1 step)
            for i in range(__requests_per_second - 1, 1, -1):
                if __requests_per_second % i == 0:
                    interval = 1 / float(i)
                    tokens = __requests_per_second // i  # integer number of tokens
                    break
        time.sleep(interval)
        # check for poison pill
        with lock_bucket:  # the lock is released even when we break out on shutdown
            size = bucket.qsize()
            shutdown = False
            for i in range(size):
                try:
                    token = bucket.get(block=False)
                    if 'shutdown' == token:
                        shutdown = True
                        break
                    else:
                        bucket.put(token)
                except Exception:
                    pass
            if shutdown:
                break
            # don't let the bucket exceed token capacity
            while bucket.qsize() < min(size + tokens, __requests_per_second):
                bucket.put('token')
Like the other thread functions above, it looks for a 'poison pill' control signal to shut down, but it also puts tokens into the bucket queue, capped at the allotted rps rate. It also doesn't simply put in the maximum allotted tokens each second; it tries to divide them into smaller chunks in order to smooth out request bursting. Doing this maintains a smoother overall request rate and lower response latencies (which yields better overall throughput). For example, with a rate of 12 rps, it will end up putting in 2 tokens every 1/6 of a second, which makes for much smoother response handling than 12 tokens (at once) every second.
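The chunking rule can be checked in isolation. The sketch below extracts it into a pure function; refill_schedule is a hypothetical helper name that does not exist in the tutorial code, and it assumes the intent is to pick the largest proper divisor of the rate, so that each refill adds as few tokens as possible.

```python
def refill_schedule(requests_per_second):
    """Return (interval_seconds, tokens_per_interval) for a given rate."""
    interval = 1.0
    tokens = requests_per_second
    # try the largest proper divisor first so that each chunk is as small as possible
    for i in range(requests_per_second - 1, 1, -1):
        if requests_per_second % i == 0:
            interval = 1.0 / i
            tokens = requests_per_second // i
            break
    return interval, tokens


for rps in (4, 7, 8, 12):
    interval, tokens = refill_schedule(rps)
    print(f"{rps} rps -> {tokens} token(s) every {interval * 1000:.0f} ms")
# 4 rps -> 2 token(s) every 500 ms
# 7 rps -> 7 token(s) every 1000 ms
# 8 rps -> 2 token(s) every 250 ms
# 12 rps -> 2 token(s) every 167 ms
```

Note that a prime rate such as 7 has no divisor between 2 and 6, so it falls back to one refill of 7 tokens per second and gets no smoothing from this rule.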
We also need to start and stop the bucket_refill_thread just like the other threads in the test_api function:
...
# first start bucket refill thread
refill_thread = mp.Process(target=bucket_refill_thread, args=(lock_bucket, bucket))
refill_thread.start()
pool = []
for i in range(__requests_per_second * __waiting_threads):  # waiting threads to keep the pipeline full
    # pass in necessary parameters to thread, including client key, etc.
    p = mp.Process(target=image_process_thread,
                   args=(url, client_key, queue, results,
                         lock_stats, counter, avg_req_time, time_start, time_end,
                         lock_bucket, bucket))
    pool.append(p)
    p.start()
...
# 5. clean-up after queue has been processed with "poison pill"
while not queue.empty():
    # wait for queue to be processed
    time.sleep(1)
for i in pool:
    # seed shutdown messages / poison pills
    queue.put(dict(id=-1, url='shutdown', model='shutdown'))
for p in pool:
    # enforce clean shutdown of threads
    p.join()
# stop bucket refill thread
bucket.put('shutdown')
refill_thread.join()
...
Note that we also multiply the number of allotted threads by the previously defined __waiting_threads multiplier, spawning numerous waiting threads to maximize our client's throughput. We also pass our new bucket (and corresponding mutex) to each image_process_thread in the existing invocation loop.
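A rough way to see why extra waiting threads help is Little's law: the average number of requests in flight equals the request rate multiplied by the average request latency. The sketch below is a back-of-the-envelope estimate only. The function names are hypothetical, the 1.033 second latency is an assumed figure, and the model ignores time spent outside the HTTP request and assumes latency does not change with concurrency.

```python
import math


def required_in_flight(requests_per_second, avg_latency_seconds):
    """Little's law: average requests in flight = rate * average latency."""
    return requests_per_second * avg_latency_seconds


def achievable_rate(workers, requests_per_second, avg_latency_seconds):
    """Each worker completes at most 1 / latency requests per second; the rate limit caps the total."""
    return min(requests_per_second, workers / avg_latency_seconds)


rps = 4
latency = 1.033  # seconds, an assumed average request time

print(math.ceil(required_in_flight(rps, latency)))  # 5 workers needed to sustain 4 rps
print(round(achievable_rate(4, rps, latency), 2))   # 3.87 rps with only 4 workers
print(achievable_rate(40, rps, latency))            # 4 rps, capped by the rate limit
```

Under these assumptions, a pool sized exactly to the rate falls slightly short as soon as requests take longer than one second, which is what the multiplier is meant to absorb.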
The image_process_thread can now be enhanced to utilize these new rate limiting constructs:
def image_process_thread(url, client_key, queue, results,
                         lock_stats, counter, avg_req_time, time_start, time_end,
                         lock_bucket, bucket):
    while True:
        # acquire token in order to process
        lock_bucket.acquire()
        token = None
        try:
            # don't do anything with the token, just remove it, as it acts as our "access rights"
            token = bucket.get(block=False)
        except Exception:
            # the bucket had no token to hand out; treat this as "no token"
            pass
        # first release lock
        lock_bucket.release()
        # then proceed or sleep
        if not token:
            time.sleep(1 / float(__requests_per_second))
            continue
        # get image URL entry to process
...
Note that, similarly to how the earlier implementation handled 429s, if a token is not available, the processing thread sleeps for an arbitrary 1 / n seconds (where n is the number of requests per second) before trying again.
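The acquire-or-back-off step can also be factored into a small helper. The sketch below is illustrative rather than part of the tutorial code: try_take_token is a hypothetical name, and it uses threading.Lock and queue.Queue from the standard library instead of their multiprocessing counterparts so that the example runs deterministically in one process. multiprocessing.Queue.get raises the same queue.Empty exception when called with block=False on an empty queue.

```python
import threading
from queue import Empty, Queue


def try_take_token(lock_bucket, bucket):
    """Return True if a token was removed from the bucket, False if it was empty."""
    with lock_bucket:  # released automatically, even if an exception is raised
        try:
            bucket.get(block=False)
            return True
        except Empty:
            return False


lock_bucket = threading.Lock()
bucket = Queue()
for _ in range(2):
    bucket.put('token')

print([try_take_token(lock_bucket, bucket) for _ in range(3)])  # [True, True, False]
```

Importing Empty by name, rather than importing the queue module, avoids a clash with the queue parameter that image_process_thread already receives.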
We also no longer need to explicitly sleep when we get a 429 code, so we can remove the previously existing time.sleep() calls:
...
elif resp.status_code == 429:
    # handle over-rate limit retrying
    print(msg.format(
        http=resp.status_code,
        limit=resp.headers['X-RateLimit-Remaining-second'],
        thread=mp.current_process().name,
        msg='surpassed rate limit, trying again')
    )
    # re-queue entry and try again
    queue.put(entry)
...
Finally, let's add this 3rd implementation to our existing run.py test runner:
...
print('3. running robust rate limiting example')
rate_limiting_robust.run(client_key)
Now if you run the test suite, you should see higher throughput for this more robust implementation, such as:
[60] requests processed in [16.0] seconds with average time [1033.0] ms, total throughput: [3.75] rps
And that's it! At this point, we have implemented a basic client, a simple rate-limited client, and a more robust, optimized rate-limited client that implements the token-bucket algorithm.
If you run into any issues or have any suggestions for the code examples, please feel free to create an issue in GitHub! Thanks for reading our tutorial!
© 2019 Restb.ai [Mercurio Platform SL]. All rights reserved.