This repository has been archived by the owner on Apr 15, 2022. It is now read-only.

Refactor to use concurrent.futures #12

Merged
merged 44 commits on Jun 26, 2017
Commits: 44 total (changes shown from 25 commits)
cd1b76d
Move function fully qualified name saving from the Client's responsib…
aronasorman May 23, 2017
a9fb3fa
Add SuccessMessage type
aronasorman May 24, 2017
4129774
Simplify Worker backend by using concurrent.futures.
aronasorman May 24, 2017
72cbc9a
Implement InfiniteLoopThread, a generic thread that runs a function f…
aronasorman May 24, 2017
d1f9e40
Add timeout parameter to messaging.pop abstractmethod
aronasorman May 24, 2017
b6c8a62
fix job completion marking
aronasorman May 24, 2017
07d5f5c
flesh out worker sending of success message.
aronasorman May 24, 2017
f332128
Add the queued job state, and give a better string representation of …
aronasorman May 25, 2017
a16ff34
fix bug InfiniteLoopThread only sleeps when we get an exception.
aronasorman May 25, 2017
8c53e89
properly mark jobs as running, then take them off the queue.
aronasorman May 25, 2017
d3e9a86
catch a possible timeout exception from waiting for a job update.
aronasorman May 25, 2017
881679b
Extend get_lambda_to_execute to support passing in a function for pro…
aronasorman May 25, 2017
77a2b53
Create a ProgressMessage class for updating our progress.
aronasorman May 25, 2017
b440efa
Create a new FailureMessage class
aronasorman May 25, 2017
a0fe416
Allow job progress updates.
aronasorman May 25, 2017
7505575
Fetch the job AFTER we get an update about it.
aronasorman May 25, 2017
e343908
Implement failure notification for jobs.
aronasorman May 25, 2017
5b69f13
rename InfiniteLoopThread.cancel to stop
aronasorman May 26, 2017
8b06b90
refactor Scheduler to use InfiniteLoopThread instead of defining its …
aronasorman May 26, 2017
2fbdaab
add docstrings for the client classes
aronasorman May 26, 2017
d4bcf4a
remove unused functions and classes
aronasorman May 26, 2017
31c5280
add comment documentation for the job's states
aronasorman May 26, 2017
920a380
Properly set the initial values of traceback and exception
aronasorman May 26, 2017
664bab4
add docstrings for Job.
aronasorman May 26, 2017
1875cab
Add initial changelog.
aronasorman May 26, 2017
96d7d8e
Merge remote-tracking branch 'origin/develop' into refactor-futures
aronasorman May 31, 2017
129c2eb
get tests passing on python27
aronasorman Jun 5, 2017
f4b5cdf
Add sqlalchemy and remove enum34
aronasorman Jun 5, 2017
24602c5
truly remove enum dependency
aronasorman Jun 7, 2017
a851417
second pass on a SQL-based job storage backend.
aronasorman Jun 8, 2017
2462622
implement barbequeue.exceptions.TimeoutError
aronasorman Jun 9, 2017
8635de4
Fix test that expects MessageType to be an enum
aronasorman Jun 9, 2017
ec8f693
use NotImplementedError, not NotImplemented
aronasorman Jun 9, 2017
1b49bb2
make sure read sessions are closed.
aronasorman Jun 9, 2017
d02a858
more fixes to get stuff working on sqlalchemy
aronasorman Jun 12, 2017
e92205c
add client.all_jobs() and client.clear()
aronasorman Jun 12, 2017
92db474
Properly set orm_job.obj value.
aronasorman Jun 12, 2017
90dca67
allow failed jobs to track the exception and traceback
aronasorman Jun 13, 2017
8f74ba8
Fix clearing of jobs.
aronasorman Jun 13, 2017
be50717
Implement SimpleClient to allow users to specify the worker process t…
aronasorman Jun 14, 2017
1554030
remove pdb call from simpletest
aronasorman Jun 14, 2017
391bb9f
Always have concurrent-futures as a dependency.
aronasorman Jun 21, 2017
b2687fa
Add wait and wait_for_completion on the client class
aronasorman Jun 22, 2017
92b020b
conditionally import processpool and threadpool.
aronasorman Jun 26, 2017
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,13 @@
# Change Log


All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Added
- The first pass of the small queueing system.
- An InMemClient that spawns thread-based workers, and stores jobs in-memory.
Empty file added src/__init__.py
Empty file.
72 changes: 43 additions & 29 deletions src/barbequeue/client.py
@@ -1,4 +1,3 @@
import importlib
import uuid

from barbequeue.common.classes import Job
@@ -16,50 +15,56 @@ def __init__(self, app, namespace, **config):
def schedule(self, func, *args, **kwargs):
"""
Schedules a function func for execution.
"""

updates_progress = kwargs.pop('updates_progress', False)
The only other special parameter is track_progress. If passed in and not None, func will be passed a
keyword parameter called update_progress:

def update_progress(progress, total_progress, stage=""):

The running function can call the update_progress function to notify interested parties of the function's
current progress.

All other parameters are directly passed to the function when it starts running.

:type func: callable or str
:param func: A callable object that will be scheduled for running.
:return: a string representing the job_id.
"""

# turn our function object into its fully qualified name if needed
if callable(func):
funcname = self.stringify_func(func)
# if the func is already a job object, just schedule that directly.
if isinstance(func, Job):
job = func
# else, turn it into a job first.
else:
funcname = func
job = Job(func, *args, **kwargs)

job = Job(funcname, *args, **kwargs)
job.track_progress = kwargs.pop('track_progress', False)
job_id = self.storage.schedule_job(job)
return job_id

def cancel(self, job_id):
"""
Mark a job as canceled and remove it from the list of jobs to be executed.
Send a message to our workers to stop a job.

:param job_id: the job_id of the Job to cancel.
"""
self.storage.cancel_job(job_id)

def status(self, job_id):
"""
Gets the status of a job given by job_id.
"""
return self.storage.get_job(job_id)

@staticmethod
def stringify_func(func):
assert callable(func), "function {} passed to stringify_func isn't a function!".format(func)
Returns a Job object corresponding to the job_id. From there, you can query for the following attributes:

fqn = "{module}.{funcname}".format(module=func.__module__, funcname=func.__name__)
return fqn
- function string to run
- its current state (see Job.State for the list of states)
- progress (returning an int), total_progress (returning an int), and percentage_progress
(derived from running job.progress/total_progress)
- the job.exception and job.traceback, if the job's function raised an exception

@staticmethod
def import_stringified_func(funcstring):
assert isinstance(funcstring, str)

modulestring, funcname = funcstring.rsplit('.', 1)

mod = importlib.import_module(modulestring)

func = getattr(mod, funcname)
return func
:param job_id: the job_id to get the Job object for
:return: the Job object corresponding to the job_id
"""
return self.storage.get_job(job_id)


class InMemClient(Client):
@@ -80,13 +85,22 @@ def __init__(self, app, namespace, *args, **kwargs):
self._storage = storage_inmem.Backend(app, namespace)
self._messaging = messaging_inmem.Backend()
self._workers = inmem.Backend(incoming_message_mailbox=self.worker_mailbox_name,
outgoing_message_mailbox=self.scheduler_mailbox_name)
self._scheduler = Scheduler(self._storage, self._messaging, self._workers,
outgoing_message_mailbox=self.scheduler_mailbox_name,
msgbackend=self._messaging)
self._scheduler = Scheduler(self._storage, self._messaging,
worker_mailbox=self.worker_mailbox_name,
incoming_mailbox=self.scheduler_mailbox_name)

super(InMemClient, self).__init__(app, namespace, storage_backend=storage_inmem, *args, **kwargs)

def shutdown(self):
"""
Shutdown the client and all of its managed resources:

- the workers
- the scheduler threads

:return: None
"""
self._scheduler.shutdown()
self._workers.shutdown()
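Taken together, the schedule, status, and shutdown docstrings above imply a client-side flow along the lines of the sketch below. It is a minimal illustration, not code from this PR: the app/namespace values are placeholders, time.sleep stands in for real work, and it assumes the in-memory worker pool starts as soon as the InMemClient is constructed.

```python
import time

from barbequeue.client import InMemClient

def import_channel(channel_id, update_progress=None):
    # A long-running task. When scheduled with track_progress=True, the worker
    # passes in update_progress(progress, total_progress, stage="") so the task
    # can report how far along it is.
    total = 10
    for step in range(total):
        time.sleep(0.1)  # stand-in for one unit of real work
        if update_progress:
            update_progress(step + 1, total)

client = InMemClient(app="myapp", namespace="imports")
job_id = client.schedule(import_channel, "channel-abc", track_progress=True)

time.sleep(0.5)  # give the worker a moment to pick the job up
job = client.status(job_id)  # returns the Job object for this job_id
print(job.state, job.progress, job.total_progress)

client.shutdown()
```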
171 changes: 86 additions & 85 deletions src/barbequeue/common/classes.py
@@ -1,113 +1,114 @@
import abc
import enum
import importlib
import logging
import threading
from collections import namedtuple
from functools import partial

from barbequeue import humanhash
from barbequeue.common.utils import import_stringified_func, stringify_func

logger = logging.getLogger(__name__)


class Job(object):
"""
Job represents a function whose execution has been deferred through the Client's schedule function.

Jobs are stored on the storage backend for persistence through restarts, and are scheduled for running
to the workers.
"""
class State(enum.Enum):

"""
the Job.State object enumerates a Job's possible valid states.

SCHEDULED means the Job has been accepted by the client, but has not been
sent to the workers for running.

QUEUED means the Job has been sent to the workers for running, but has not
been run yet (to our knowledge).

RUNNING means that one of the workers has started running the job, but is not
complete yet. If the job has been set to track progress, then the job's progress
and total_progress fields should be continuously updated.

FAILED means that the job's function has raised an exception during runtime.
The job's exception and traceback fields should be set.

CANCELED means that the job has been canceled from running.

COMPLETED means that the function has run to completion. The job's result field
should be set with the function's return value.
"""

SCHEDULED = 0
# STARTED = 1
QUEUED = 1
RUNNING = 2
FAILED = 3
CANCELED = 4
COMPLETED = 5

def __init__(self, func_string, *args, **kwargs):
def __init__(self, func, *args, **kwargs):
"""
Create a new Job that will run func given the arguments passed to Job(). If the track_progress keyword parameter
is given, the worker will pass an update_progress function to update interested parties about the function's
progress. See Client.__doc__ for update_progress's function parameters.

:param func: func can be a callable object, in which case it is turned into an importable string,
or it can be an importable string already.
"""
self.job_id = kwargs.pop('job_id', None)
self.state = kwargs.pop('state', self.State.SCHEDULED)
self.func = func_string
self.traceback = kwargs.pop('traceback', '')
self.exception = kwargs.pop('exception', '')
self.traceback = ""
self.exception = None
self.track_progress = kwargs.pop('track_progress', False)
self.progress = 0
self.total_progress = 0
self.args = args
self.kwargs = kwargs

def get_lambda_to_execute(self):
fqn = self.func
modulename, funcname = fqn.rsplit('.', 1)
mod = importlib.import_module(modulename)
assert hasattr(
mod, funcname), \
"Module {} does not have attribute {}".format(
mod, funcname)

func = getattr(mod, funcname)

y = lambda: func(*self.args, **self.kwargs)
return y
if callable(func):
funcstring = stringify_func(func)
elif isinstance(func, str):
funcstring = func
else:
raise Exception(
"Error in creating job. We do not know how to "
"handle a function of type {}".format(type(func)))

def serialize(self):
pass


class ProgressData(namedtuple("_ProgressData", ["id", "order", "data"])):
pass
self.func = funcstring

def get_lambda_to_execute(self):
"""
return a function that executes the function assigned to this job.

If job.track_progress is None (the default), the returned function accepts no argument
and simply needs to be called. If job.track_progress is True, an update_progress function
is passed in that can be used by the function to provide feedback progress back to the
job scheduling system.

:return: a function that executes the original function assigned to this job.
"""
func = import_stringified_func(self.func)

class Function(namedtuple("_Function", ["module", "funcname"])):
def serialize(self):
# Since this is all in memory, there is no need to serialize anything.
raise NotImplementedError()
if self.track_progress:
y = lambda p: func(update_progress=partial(p, self.job_id), *self.args, **self.kwargs)
else:
y = lambda: func(*self.args, **self.kwargs)

return y

class BaseCloseableThread(threading.Thread):
"""
A base class for a thread that monitors an Event as a signal for shutting down, and a main_loop that otherwise loop
until the shutdown event is received.
"""
__metaclass__ = abc.ABCMeta

DEFAULT_TIMEOUT_SECONDS = 0.2

def __init__(self, shutdown_event, thread_name, *args, **kwargs):
assert isinstance(shutdown_event, threading.Event)

self.shutdown_event = shutdown_event
self.thread_name = thread_name
self.thread_id = self._generate_thread_id()
self.logger = logging.getLogger("{module}.{name}[{id}]".format(module=__name__,
name=self.thread_name,
id=self.thread_id))
self.full_thread_name = "{thread_name}-{thread_id}".format(thread_name=self.thread_name,
thread_id=self.thread_id)
super(BaseCloseableThread, self).__init__(name=self.full_thread_name, *args, **kwargs)

def run(self):
self.logger.info("Started new {name} thread ID#{id}".format(name=self.thread_name,
id=self.thread_id))

while True:
if self.shutdown_event.wait(self.DEFAULT_TIMEOUT_SECONDS):
self.logger.warning("{name} shut down event received; closing.".format(name=self.thread_name))
self.shutdown()
break
else:
self.main_loop(timeout=self.DEFAULT_TIMEOUT_SECONDS)
continue

@abc.abstractmethod
def main_loop(self, timeout):
@property
def percentage_progress(self):
"""
The main loop of a thread. Run this loop if we haven't received any shutdown events in the last
timeout seconds. Normally this is used to read from a queue; you are encouraged to return from
this function if the timeout parameter has elapsed, to allow the thread to continue to check
for the shutdown event.
:param timeout: a parameter determining how long you can wait for a timeout.
:return: None
Returns a float between 0 and 1, representing the current job's progress in its task.

:return: float corresponding to the total percentage progress of the job.
"""
pass

@staticmethod
def _generate_thread_id():
uid, _ = humanhash.uuid()
return uid

def shutdown(self):
# stub method, override if you need a more complex shut down procedure.
pass
return float(self.progress) / self.total_progress


def __repr__(self):
return "<Job id: {id} state: {state} progress: {p}/{total} func: {func}>".format(
id=self.job_id,
state=self.state.name,
func=self.func,
p=self.progress,
total=self.total_progress
)
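The reworked Job class above stores the scheduled callable as an importable string and rebuilds it inside get_lambda_to_execute; with track_progress set, the returned lambda takes the progress callback as its only argument and binds job_id to it with functools.partial. The sketch below exercises that round trip by calling the lambda directly, the way a worker presumably would; the add function and record_progress callback are illustrative stand-ins, and it assumes the module they live in is importable so stringify_func can resolve them.

```python
from barbequeue.common.classes import Job

def add(a, b, update_progress=None):
    # Report a single unit of progress if the worker supplied a callback.
    if update_progress:
        update_progress(1, 1)
    return a + b

# The callable is converted to its fully qualified name at construction time,
# so the Job can be persisted and handed to a worker.
job = Job(add, 2, 3, track_progress=True)
print(job.func)  # e.g. "__main__.add"

def record_progress(job_id, progress, total_progress, stage=""):
    # Receives job_id first because get_lambda_to_execute partially applies it.
    print("job {}: {}/{}".format(job_id, progress, total_progress))

# With track_progress=True the lambda expects the progress callback as its argument.
run = job.get_lambda_to_execute()
result = run(record_progress)
print(result)  # -> 5
```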