diff --git a/docs/design/job_architecture.md b/docs/design/job_architecture.md index 56dc5b660d..2096d97e62 100644 --- a/docs/design/job_architecture.md +++ b/docs/design/job_architecture.md @@ -19,7 +19,6 @@ The front- and backend have a shared vocabulary of message types and parameters, ```js define(['common/jobCommMessages'], (jcm) => { - // use the JobCommMessages object console.log("job status message type: " + jcm.MESSAGE_TYPE.STATUS) console.log("job ID parameter: " + jcm.PARAM.JOB_ID) @@ -37,16 +36,7 @@ These messages are sent to the `JobCommChannel` on the front end, to get process * `JOB_ID` - a string, the job id OR * `JOB_ID_LIST` - an array of job IDs OR * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) - -`START_UPDATE` - request the status for a job or jobs, but start an update cycle so that it's continually requested. - * `JOB_ID` - a string, the job id OR - * `JOB_ID_LIST` - an array of job IDs OR - * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) - -`STOP_UPDATE` - signal that the front end doesn't need any more updates for the specified job(s), so stop sending them for each loop cycle. Doesn't actually end the job, only requests for updates. 
- * `JOB_ID` - a string, the job id OR - * `JOB_ID_LIST` - an array of job IDs OR - * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) + * `TS` - an int; send only the jobs that have been requested since this timestamp `INFO` - request information about the job(s), specifically app id, spec, input parameters and (if finished) outputs * `JOB_ID` - a string, the job id OR @@ -56,6 +46,7 @@ These messages are sent to the `JobCommChannel` on the front end, to get process `CANCEL` - request that the server cancel the running job(s) * `JOB_ID` - a string, the job id OR * `JOB_ID_LIST` - an array of job IDs + * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) `RETRY` - request that the server rerun a job or set of jobs * `JOB_ID` - a string, the job id OR @@ -68,6 +59,18 @@ These messages are sent to the `JobCommChannel` on the front end, to get process * `num_lines` - the number of lines to request (will get back up to that many if there aren't more) (optional) * `latest` - true if requesting just the latest set of logs (optional) +### To be deprecated -- do not use + +`START_UPDATE` - request the status for a job or jobs, but start an update cycle so that it's continually requested. + * `JOB_ID` - a string, the job id OR + * `JOB_ID_LIST` - an array of job IDs OR + * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) + +`STOP_UPDATE` - signal that the front end doesn't need any more updates for the specified job(s), so stop sending them for each loop cycle. Doesn't actually end the job, only requests for updates. + * `JOB_ID` - a string, the job id OR + * `JOB_ID_LIST` - an array of job IDs OR + * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) + ### Usage Example The comm channel is used through the main Bus object that's instantiated through the global `Runtime` object. That needs to be included in the `define` statement for all AMD modules. 
The bus is then used with its `emit` function (you have the bus *emit* a message to its listeners), and any inputs are passed along with it. @@ -108,30 +111,7 @@ define( ``` ## Bus responses -Messages from the kernel to the front end are received by the `JobCommChannel` module. This takes the responses, unpacks them, and turns them into a response message that is passed back over the bus to any frontend Javascript module that listens to them. The message types are described below, along with the content that gets sent, followed by an example of how to make use of them. - -### Cell-related - -`RUN_STATUS` - updates the run status of the job - this is part of the initial flow of starting a job through the AppManager. - * see `run_status` below for the message format. - -### Job-related - -`INFO` - contains information about the current job. The format is described under `job_info` in the **Messages sent from the kernel to the browser** section below. - -`LOGS` - sent with information about some job logs. The format is described under `job_logs` in the **Messages sent from the kernel to the browser** section below. - -`RETRY` - sent in response to a retry request; contains the original job ID, the ID of the retry if the request was successful, or an error message, if it was not. - -`STATUS` - contains the current job state - * `jobState` - object, describes the job state (see the **Data Structures** section below for the structure) - * `outputWidgetInfo` - object, contains the parameters to be sent to an output widget. This will be different for all widgets, depending on the App that invokes them. - -`ERROR` - sent in response to an error that happened on job information lookup, or another error that happened while processing some other message to the JobManager. - * `raw_request` - object; the request that generated the error - * `source` - string; what triggered the error, e.g. 
an invalid request or an issue completing a request - * any other content that was supplied - this is frequently an error object, from which a name and error message, and potentially other information, are taken. - +Messages from the kernel to the front end are received by the `JobCommChannel` module. This takes the responses, unpacks them, and turns them into a response message that is passed back over the bus to any frontend Javascript module that listens to them. See the [messages sent from the kernel to the browser](#messages-sent-from-the-kernel-to-the-browser) section for full descriptions. ### Usage example As in the Bus requests section above, the front end response handling is done through the Runtime bus. The bus provides both an `on` and a `listen` function, examples will show how to use both. Generally, the `listen` function is more specific and binds the listener to a specific bus channel. These channels can invoke the jobId, or the cellId, to make sure that only information about specific jobs is listened for. @@ -182,7 +162,6 @@ Note that both of these create events that get bound to the DOM, and when the wi ## Kernel Comm Channel On the kernel side, a complementary comm channel is used. This is set up in the `biokbase.narrative.jobs.jobcomm.JobComm` class. On Narrative load, page reload, or kernel restart, this is initialized to handle any messages sent to the kernel. The same controlled vocabulary of terms is used for message types and job parameters as for the frontend. - Note that these are autogenerated by the frontend `JobCommChannel` object, using the `Jupyter.kernel.comm` package. The actual message that the JobComm sees in the kernel has this format: @@ -212,7 +191,8 @@ print(PARAM["JOB_ID_LIST"]) # prints 'job_id_list' For simplicity below, it is assumed that constants have been created from the key/value pairs in MESSAGE_TYPE and PARAM. 
-An example of a job status message sent from the frontend to the kernel: +An example of a job status message sent from the frontend to the kernel, requesting a job status update on a job. + ```json { "msg_id": "some string", @@ -224,44 +204,10 @@ An example of a job status message sent from the frontend to the kernel: } } ``` -Messages expected by the narrative backend: -`STATUS` - request job status - * `JOB_ID` - a string, the job id OR - * `JOB_ID_LIST` - an array of job IDs OR - * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) +## Messages sent from the browser to the kernel -`STATUS_ALL` - request the status of all jobs registered in the job manager - -`START_UPDATE` - request updating job(s) during the update thread, responds with `STATUS` - * `JOB_ID` - a string, the job id OR - * `JOB_ID_LIST` - an array of job IDs OR - * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) - -`STOP_UPDATE` - request halting update for job(s) during the update thread, responds with `STATUS` - * `JOB_ID` - a string, the job id OR - * `JOB_ID_LIST` - an array of job IDs OR - * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) - -`INFO` - request general information about job(s) - * `JOB_ID` - a string, the job id OR - * `JOB_ID_LIST` - an array of job IDs OR - * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) - -`LOGS` - request job log information -* `JOB_ID` - string OR -* `JOB_ID_LIST` - array of strings -* `first_line` - int >= 0, ignored if `latest` is `true` -* `num_lines` - int > 0 -* `latest` - boolean, `true` if requesting just the latest logs - -`CANCEL` - cancel a job or list of jobs; responds with `STATUS` -* `JOB_ID` - string OR -* `JOB_ID_LIST` - array of strings - -`RETRY` - retry a job or list of jobs, responds with `RETRY` -* `JOB_ID` - string OR -* `JOB_ID_LIST` - array of strings +See the [Bus requests](#bus-requests) section. 
## Messages sent from the kernel to the browser These are all caught by the `JobCommChannel` on the browser side, then parsed and sent as the bus messages described above. Like other kernel messages, they have a `msg_type` field, and a `content` field containing data meant for the frontend to use. They have a rough structure like this: @@ -286,6 +232,7 @@ For example: "msg_type": "job_status", "content": { "example_job_id": { + "job_id": "example_job_id", "jobState": { "job_id": "example_job_id", "status": "running", @@ -305,14 +252,14 @@ By design, these should only be seen by the `JobCommChannel` instance, then sent The backend bundles together multiple messages of the same type in an object indexed by key (usually job or cell ID). In nearly all cases, the frontend then separates out the data and sends it out on individual channels for each job or cell. ### `ERROR` -A general job comm error, capturing most errors that get thrown by the kernel +A general job comm error message, capturing exceptions that get thrown by the kernel **content** (this varies, but usually includes the below) - * `raw_request` - the original request message that wound up in an error - * `source` - request type or method that triggered the error + * `source` - request type, arbitrary string, or null, meant to indicate what triggered the error + * `request` - request data, arbitrary string, or null, meant to indicate the comm request that triggered the error Error messages usually also contain data about the error itself: - * `name` - the error type - * `message` - description of the issue + * `name` - the exception type name + * `message` - the exception message **bus** `ERROR` @@ -327,24 +274,32 @@ Dictionary with key(s) job ID and value dictionaries with the following structur * `job_params` - the unstructured set of parameters sent to the execution engine * `batch_id` - id of batch container job -i.e. 
+In case of error, the response has instead the keys: + * `job_id` + * `error` - brief message explaining the issue + +Sample response JSON: ```json { "job_id_1": { - "app_id": ..., - "app_name": ..., + "app_id": string, + "app_name": string, "job_id": "job_id_1", "job_params": ..., "batch_id": "some_batch_id", }, - "job_id_2": { ...contents... } + "job_id_2": { + ... + }, + ..., + "error_id_1": { + "job_id": "error_id_1", + "error": string + }, + ... } ``` -In case of error, the response has the keys: - * `job_id` - * `error` - brief message explaining the issue - **bus** The frontend splits out the job info and distributes it out by job ID. @@ -352,14 +307,22 @@ The frontend splits out the job info and distributes it out by job ID. The current job state. This one is probably most common. **content** -Dictionary with key(s) job ID and value dictionaries with the following structure: - * `jobState` - see **Data Structures** below for details (it's big and shouldn't be repeated all over this document). Regarding error states: non-existent jobs have the status `does_not_exist`, and when the job state cannot be retrieved from EE2 the status `ee2_error` is used - * `outputWidgetInfo` - the parameters to send to output widgets, only available for a completed job +Dictionaries generated from `job.output_state()` with key(s) generally being: + * `job_id` with the value being a dictionary with the following structure: + * `job_id` + * `jobState` - see **Data Structures** below for details (it's big and shouldn't be repeated all over this document). 
+ * `outputWidgetInfo` - the parameters to send to output widgets, only available for a completed job + * `last_checked` - ns epoch that tells the frontend when this `STATUS` request was completed so the frontend can use it to make the next `STATUS` request + +In case of some errors, the response has instead the keys: + * `job_id` + * `error` - brief message explaining the issue Sample response JSON: ```json { "job_id_1": { + "job_id": "job_id_1", "jobState": { "job_id": "job_id_1", "status": "running", @@ -367,14 +330,27 @@ Sample response JSON: }, "outputWidgetInfo": null, // only available for completed jobs }, - "job_id_2": { ...contents... } + "job_id_2": { + "job_id": "job_id_2", + "error": "Cannot find job with ID job_id_2" + }, + "job_id_3": { + "job_id": "job_id_3", + "jobState": { + "job_id": "job_id_3", + "status": "completed", + ... + }, + "outputWidgetInfo": { + ... + }, + "error": , + }, + ..., + "last_checked": 1647888392311312353 } ``` -In case of error, the response has the keys: - * `job_id` - * `error` - brief message explaining the issue - **bus** Job status data is split out into individual jobs by the frontend and distributed. @@ -382,15 +358,19 @@ Job status data is split out into individual jobs by the frontend and distribute ### `STATUS_ALL` The set of all job states for all running jobs, or at least the set that should be updated (those that are complete and not requested by the front end are not included - if a job is sitting in an error or finished state, it doesn't need ot have its app cell updated) -**content** - all of the below are included, but the top-level keys are all job id strings, e.g.: +**content** +Dictionary similar to `STATUS` response. 
All of the below are included, but the top-level keys are all job id strings, e.g.: + * `job_id` + * `jobState` - the job state (see the **Data Structures** section below for details) + * `outputWidgetInfo` - the parameters to send to output widgets, only available for a completed job + +Sample response JSON: ```json { "job_id_1": { ...contents... }, "job_id_2": { ...contents... } } ``` - * `jobState` - the job state (see the **Data Structures** section below for details - * `outputWidgetInfo` - the parameters to send to output widgets, only available for a completed job **bus** - see `STATUS` @@ -415,7 +395,6 @@ In case of error, the response has the keys: The most common log error encountered is that logs are not found -- this can occur if the job has not yet started running or the job was terminated whilst it was still in the job queue. **bus** `LOGS` - Log data is split out into individual jobs and sent to `job_id` (see above) @@ -424,6 +403,18 @@ Sent when one or more jobs are retried **content** Dictionary with key(s) original job ID and value dictionaries with the following structure: +Where the dict values corresponding to "job" or "retry" are the same data structures as for `job_status` +Outer keys: + * `job_id` - string, ID of the job that was retried (the retry parent) + * `job` - string, the job state object of that job + * `retry_id` - string, ID of the new job + * `retry` - string, the job state object of the new job that was launched + +In case of error, the response has instead the keys: + * `job_id` + * `error` - brief message explaining the issue + +Sample response JSON: ```json { "job_id_1": { @@ -435,28 +426,18 @@ Dictionary with key(s) original job ID and value dictionaries with the following "job_id_2": { "job_id": "job_id_2", "job": {"jobState": {"job_id": "job_id_2", "status": status, ...} ...}, - "error": "..." + "error": "..." // from EE2 }, ... 
- job_id_x: { - "job": {"jobState": {"job_id": "job_id_x", "status": "does_not_exist"}}, + "error_id_1": { + "job_id": "error_id_1", "error": "does_not_exist" } } ``` -Where the dict values corresponding to "job" or "retry" are the same data structures as for `job_status` -Outer keys: - * `job_id` - string, ID of the job that was retried (the retry parent) - * `job` - string, the job state object of that job - * `retry_id` - string, ID of the new job - * `retry` - string, the job state object of the new job that was launched -In case of error, the response has the keys: - * `job_id` - * `error` - brief message explaining the issue **bus** `RETRY` - Retry data is split out into individual jobs and sent to `job_id` @@ -484,42 +465,57 @@ All cases: **bus** `RUN_STATUS` sent unchanged to `cell_id` +## Job Management flow -## Job Management flow on backend (in IPython kernel, biokbase.narrative.jobs package) -These steps define the process of creating a new app running job. -1. User clicks "Run" on App Cell in browser. - * Cell provides app_id, cell_id, run_id, and parameters. - * Invokes biokbase.narrative.appmanager.AppManager.run_app. -2. `AppManager.run_app` validates the following bits of information before passing them on to EE2: - * App (based on id, version, and spec). - * Parameters (based on the app spec). -3. `AppManager.run_app` preparation and start: - * Convert app params from user-readable to machine-understandable (via the spec input_mapping). - * Fetch cell id, run id, workspace id, user token. - * Create an agent token on behalf of the user. This effectively makes a new authentication token that has a two-week lifetime, separate from the current login token. For example, if the user's current login token has a remaining lifespan of 1 hour, then the new job will be able to continue long past that. - * Submit all of the above to `NarrativeJobService.run_job`. -4. 
Get response from `NarrativeJobService.run_job` - * Combine with info from step 3, create `biokbase.narrative.jobs.job.Job` object. - * submit new `Job` to `biokbase.narrative.jobs.jobmanager.JobManager` singleton object. -5. `AppManager` tells the `JobComm` channel to (1) fetch the new job status and push it to the browser, and (2) start the job lookup loop for the newly created job. (calls `AppManager.register_new_job`) +### JobManager initialization and startup. -## JobManager initialization and startup. These steps take place whenever the user loads a narrative, or when the kernel is restarted. This ensures that the JobManager in the kernel is kept up-to-date on Job status. -1. User starts kernel (opens a Narrative, or clicks Kernel -> Restart) -2. `jobCommChannel.js` (front end widget) executes the following kernel call: + +* User starts kernel (opens a Narrative, or clicks Kernel -> Restart) +* `jobCommChannel.js` (front end widget) executes the following kernel call: ```py JobManager().initialize_jobs() -JobComm().start_update_loop +JobComm().start_job_status_loop(cell_list=cell_list, init_jobs=True) ``` -3. `JobManager` does: - * Get current user and workspace id. - * `ExecutionEngine2.check_workspace_jobs` with the workspace id - gets the set of jobs in that workspace, and builds them into `Job` objects. -4. `JobComm` does: +* The `JobManager` runs `check_workspace_jobs` on EE2 with the current workspace ID to fetch the list of jobs associated with the workspace. This is filtered to include only those in `cell_list`, and then `Job` objects are built from the data. +* `JobComm` does: * Starts the lookup loop thread. * On the first pass, this looks up status of all jobs and pushes them forward to the browser. * If any jobs are in a terminal state, they'll stopped being looked up automatically. If all jobs are terminated, then the loop thread itself stops. -## JobComm status lookup loop. 
+ +### Running an app + +These steps define the process by which jobs are created by running an app. + +* User clicks "Run" on app or batch cell in the browser. + * Cell provides app_id, cell_id, run_id, and parameters. + * Invokes `biokbase.narrative.appmanager.AppManager.run_app` or `run_app_batch`. +* `AppManager.run_app` validates the following bits of information before passing them on to EE2: + * App (based on id, version, and spec). + * Parameters (based on the app spec). +* `AppManager.run_app` preparation and start: + * Convert app params from user-readable to machine-understandable (via the spec input_mapping). + * Fetch cell id, run id, workspace id, user token. + * Create an agent token on behalf of the user. This effectively makes a new authentication token that has a two-week lifetime, separate from the current login token. For example, if the user's current login token has a remaining lifespan of 1 hour, then the new job will be able to continue long past that. + * Submit all of the above to the Execution Engine (EE2) endpoint `run_job` or `run_job_batch`. +* Get response from EE2 +* Send a `RUN_STATUS` message to the browser, giving the results of the job submission; this will either contain a job ID or list of job IDs, if the submission was successful, or an error if not. +* Assuming the submission was successful, the job IDs are combined with info from the job submission to create `biokbase.narrative.jobs.job.Job` objects. These are registered with the `biokbase.narrative.jobs.jobmanager.JobManager` singleton object. +* On the frontend, the cell receiving the `RUN_STATUS` message will send `STATUS` requests to the backend to track the progress of the job. Other message types (`LOGS`, `CANCEL`, `RETRY`, etc.) are sent as needed. + * Batch cells have an update mechanism that triggers a `STATUS` request a short period after receiving the last update from the backend. + * App cells use the job lookup loop to keep up-to-date with job status. 
+ + +### App initialisation on reload + +These steps are taken when an app with a running (or previously-run) job starts up. +* Job ID(s) are retrieved from the app meta data +* If the job was not in a terminal state when it was saved, the app requests `STATUS` from the backend. +* Job request and response flow follows the pattern above. + + +#### JobComm status lookup loop -- TO BE DEPRECATED 1. Calls `JobComm._lookup_job_status_loop`, which in turn calls `JobComm.lookup_all_job_states`. This gets forwarded to `JobManager.lookup_all_job_states`, and the results pushed to the browser as a comm channel message. 2. Internal to `JobManager.lookup_all_job_states`, the following steps happen: * Build a list of job ids to lookup - those that are flagged for lookup. @@ -536,7 +532,7 @@ JobComm().start_update_loop ## Data Structures ### Job state #### EE2 State -In kernel, as retrieved from EE2.check_job +In kernel, as retrieved from `EE2.check_job` (described by example) ```json { @@ -584,54 +580,35 @@ In kernel, as retrieved from EE2.check_job } ``` #### BE Output State -As sent to browser, includes cell info and run info -``` -{ - outputWidgetInfo: (if not finished, None, else...) 
job.get_viewer_params result - jobState: { - job_id: string, - status: string, - created: epoch ms, - updated: epoch ms, - queued: optional - epoch ms, - finished: optional - epoch ms, - terminated_code: optional - int, - tag: string (release, beta, dev), - parent_job_id: optional - string or null, - run_id: string, - cell_id: string, - errormsg: optional - string, - error (optional): { - code: int, - name: string, - message: string (should be for the user to read), - error: string, (likely a stacktrace) - }, - error_code: optional - int - created: 1583863267000, - batch_id: str, - batch_job: bool, - child_jobs: array, - retry_ids: array, - retry_parent: str - } -} -``` - -When an error occurs while preparing the job state, the output states will have the formats +As generated by `job.output_state()` and sent to browser, includes cell info and run info ```json { - "job_id_0": { - "jobState": {"job_id": "job_id_0", "status": "does_not_exist"} - }, - "job_id_1": { + "job_id": string, + "outputWidgetInfo": (if not finished, None, else...) job.get_viewer_params result, "jobState": { - "job_id": "job_id_1", - "status": "ee2_error", - "updated": 1234567890, // the current epoch time - ... // cached job data - } - }, - ... 
+ "job_id": string, + "status": string, + "batch_id": str, + "batch_job": bool, + "child_jobs": array, + "retry_ids": array, + "retry_parent": str, + "created": epoch ms, + "queued": optional - epoch ms, + "finished": optional - epoch ms, + "updated": epoch ms, + "terminated_code": optional - int, + "error": { // optional + "code": int, + "name": string, + "message": string, (should be for the user to read), + "error": string, (likely a stacktrace) + }, + "run_id": string, + "cell_id": string, + "tag": string (release, beta, dev), + "error_code": optional - int, + "errormsg": optional - string, + } } ``` diff --git a/kbase-extension/static/kbase/config/job_config.json b/kbase-extension/static/kbase/config/job_config.json index bc8f7b4d74..f0198a7eba 100644 --- a/kbase-extension/static/kbase/config/job_config.json +++ b/kbase-extension/static/kbase/config/job_config.json @@ -4,7 +4,8 @@ "CELL_ID": "cell_id", "CELL_ID_LIST": "cell_id_list", "JOB_ID": "job_id", - "JOB_ID_LIST": "job_id_list" + "JOB_ID_LIST": "job_id_list", + "TS": "ts" }, "message_types": { "CANCEL": "cancel_job", diff --git a/src/biokbase/narrative/jobs/job.py b/src/biokbase/narrative/jobs/job.py index fb16b9014b..6d7e0f7b3e 100644 --- a/src/biokbase/narrative/jobs/job.py +++ b/src/biokbase/narrative/jobs/job.py @@ -25,9 +25,7 @@ "scheduler_type", "scheduler_id", ] - EXCLUDED_JOB_STATE_FIELDS = JOB_INIT_EXCLUDED_JOB_STATE_FIELDS + ["job_input"] - OUTPUT_STATE_EXCLUDED_JOB_STATE_FIELDS = EXCLUDED_JOB_STATE_FIELDS + ["user", "wsid"] EXTRA_JOB_STATE_FIELDS = ["batch_id", "child_jobs"] @@ -100,7 +98,7 @@ def __init__(self, ee2_state, extra_data=None, children=None): if ee2_state.get("job_id") is None: raise ValueError("Cannot create a job without a job ID!") - self._acc_state = ee2_state + self._update_state(ee2_state) self.extra_data = extra_data # verify parent-children relationship @@ -325,20 +323,30 @@ def _update_state(self, state: dict) -> None: """ given a state data structure (as emitted by ee2), 
update the stored state in the job object """ - if state: + if not isinstance(state, dict): + raise TypeError("state must be a dict") + # Check job_id match + if self._acc_state: if "job_id" in state and state["job_id"] != self.job_id: raise ValueError( f"Job ID mismatch in _update_state: job ID: {self.job_id}; state ID: {state['job_id']}" ) - state = copy.deepcopy(state) - if self._acc_state is None: - self._acc_state = state - else: - self._acc_state.update(state) + # Check if there would be no change in updating + # i.e., if state <= self._acc_state + if self._acc_state is not None: + if {**self._acc_state, **state} == self._acc_state: + return + + state = copy.deepcopy(state) + if self._acc_state is None: + self._acc_state = state + else: + self._acc_state.update(state) + self.last_updated = time.time_ns() - def state(self, force_refresh=False): + def state(self, force_refresh=False, exclude=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS): """ Queries the job service to see the state of the current job. """ @@ -347,7 +355,7 @@ def state(self, force_refresh=False): state = self.query_ee2_state(self.job_id, init=False) self._update_state(state) - return self._internal_state(JOB_INIT_EXCLUDED_JOB_STATE_FIELDS) + return self._internal_state(exclude) def _internal_state(self, exclude=None): """Wrapper for self._acc_state""" @@ -355,39 +363,55 @@ def _internal_state(self, exclude=None): self._trim_ee2_state(state, exclude) return state - def output_state(self, state=None) -> dict: + def output_state(self, state=None, no_refresh=False) -> dict: """ - :param state: can be queried individually from ee2/cache with self.state(), - but sometimes want it to be queried in bulk from ee2 upstream - :return: dict, with structure - - { - outputWidgetInfo: (if not finished, None, else...) 
job.get_viewer_params result - jobState: { - job_id: string, - status: string, - created: epoch ms, - updated: epoch ms, - queued: optional - epoch ms, - finished: optional - epoc ms, - terminated_code: optional - int, - tag: string (release, beta, dev), - parent_job_id: optional - string or null, - run_id: string, - cell_id: string, - errormsg: optional - string, - error (optional): { - code: int, - name: string, - message: string (should be for the user to read), - error: string, (likely a stacktrace) - }, - error_code: optional - int - } - } + :param state: Supplied when the state is queried beforehand from EE2 in bulk, + or when it is retrieved from a cache. If not supplied, must be + queried with self.state() or self._internal_state() + :return: dict - with structure generally like (not accounting for error modes): + { + "job_id": string, + "jobState": { + "status": string - enum, + "created": epoch ms, + "updated": epoch ms, + "queued": epoch ms, + "running": epoch ms, + "finished": epoch ms, + "batch_job": bool, + "job_output": { + "version": string, + "result": [ + { + "obj_ref": string, + "report_name": string, + "report_ref": string, + } + ], + "id": string + }, + "batch_id": string, + "child_jobs": list, + "retry_ids": list, + "retry_count": int, + "job_id": string, + "created": epoch ms + }, + "outputWidgetInfo": { # None if not finished + "name": string, + "tag": string - (release, beta, dev), + "params": { + "wsName": string, + "obj_ref": string, + "report_name": string, + "report_ref": string + "report_window_line_height": string + } + } + } """ if not state: - state = self.state() + state = self._internal_state() if no_refresh else self.state() else: self._update_state(state) state = self._internal_state() diff --git a/src/biokbase/narrative/jobs/jobcomm.py b/src/biokbase/narrative/jobs/jobcomm.py index 5e5dbc378c..c883168574 100644 --- a/src/biokbase/narrative/jobs/jobcomm.py +++ b/src/biokbase/narrative/jobs/jobcomm.py @@ -1,6 +1,7 @@ import copy 
import threading from typing import List, Union +import time from ipykernel.comm import Comm from biokbase.narrative.jobs.util import load_job_constants from biokbase.narrative.jobs.jobmanager import JobManager @@ -16,7 +17,8 @@ JOBS_NOT_PROVIDED_ERR = "job_id_list not provided" CELLS_NOT_PROVIDED_ERR = "cell_id_list not provided" BATCH_NOT_PROVIDED_ERR = "batch_id not provided" -ONE_INPUT_TYPE_ONLY_ERR = "Please provide one of job_id, job_id_list, or batch_id" +NO_INPUT_TYPE_ERR = "Please provide one of job_id, job_id_list, or batch_id" +ONE_INPUT_TYPE_ONLY_ERR = "Please provide at most one of job_id, job_id_list, or batch_id" INVALID_REQUEST_ERR = "Improperly formatted job channel message!" MISSING_REQUEST_TYPE_ERR = "Missing request type in job channel message!" @@ -28,58 +30,54 @@ class JobRequest(object): """ A small wrapper for job comm channel request data. This generally comes in the form of a packet from the kernel Comm object. - It's expected to be a dict of the format: + It is expected to be a dict of the format: { content: { 'comm_id': , 'data': { - 'request_type': effectively, the function requested. + 'request_type': job function requested 'job_id': optional 'job_id_list': optional + 'batch_id': optional } } } - This little wrapper fills 2 roles: + This little wrapper fills some roles: 1. It validates that the packet is correct and has some expected values. - 2. If there's a job_id field, it makes sure that it's real (by asking the - JobManager - avoids a bunch of duplicate validation code) - - Each JobRequest has at most one of a job_id or job_id_list. If the job comm - channel request data is received with a job_id_list, it may be split up - into multiple JobRequests. Likewise, if the job comm channel request data - is received with a job_id, a JobRequest may be created with a job_id_list - containing that job_id + 2. 
It provides any job ID information with guardrails Provides the following attributes: raw_request dict the unedited request received by the job comm msg_id str unique message id request_type str the function to perform. This isn't - strictly controlled here, but by JobComm._handle_comm_message. + strictly controlled here, but by + JobComm._handle_comm_message. rq_data dict the actual data of the request. Contains the request - type and other parameters specific to the function to be performed + type and other parameters specific to the function + to be performed - Optional: + The IDs of the job(s) to perform the function on (optional): job_id str job_id_list list(str) batch_id str - - The IDs of the job(s) to perform the function on. - """ + INPUT_TYPES = [PARAM["JOB_ID"], PARAM["JOB_ID_LIST"], PARAM["BATCH_ID"]] + def __init__(self, rq: dict): - self.raw_request = copy.deepcopy(rq) + rq = copy.deepcopy(rq) + self.raw_request = rq self.msg_id = rq.get("msg_id") # might be useful later? self.rq_data = rq.get("content", {}).get("data") - if self.rq_data is None: + if not self.rq_data: raise JobRequestException(INVALID_REQUEST_ERR) self.request_type = self.rq_data.get("request_type") - if self.request_type is None: + if not self.request_type: raise JobRequestException(MISSING_REQUEST_TYPE_ERR) input_type_count = 0 - for input_type in [PARAM["JOB_ID"], PARAM["JOB_ID_LIST"], PARAM["BATCH_ID"]]: + for input_type in self.INPUT_TYPES: if input_type in self.rq_data: input_type_count += 1 if input_type_count > 1: @@ -105,6 +103,9 @@ def batch_id(self): return self.rq_data[PARAM["BATCH_ID"]] raise JobRequestException(BATCH_NOT_PROVIDED_ERR) + def has_job_ids(self): + return any([input_type in self.rq_data for input_type in self.INPUT_TYPES]) + def has_batch_id(self): return PARAM["BATCH_ID"] in self.rq_data @@ -114,6 +115,11 @@ def cell_id_list(self): return self.rq_data[PARAM["CELL_ID_LIST"]] raise JobRequestException(CELLS_NOT_PROVIDED_ERR) + @property + def ts(self): + 
"""This param is completely optional""" + return self.rq_data.get(PARAM["TS"]) + class JobComm: """ @@ -181,14 +187,21 @@ def __init__(self): MESSAGE_TYPE["STOP_UPDATE"]: self._modify_job_updates, } - def _get_job_ids(self, req: JobRequest = None): + def _get_job_ids(self, req: JobRequest = None) -> List[str]: + """ + Extract the job IDs from a job request object + + :param req: the job request to take the IDs from, defaults to None + :type req: JobRequest, optional + + :return: list of job IDs + :rtype: List[str] + """ + if not req.has_job_ids(): + raise JobRequestException(NO_INPUT_TYPE_ERR) if req.has_batch_id(): return self._jm.update_batch_job(req.batch_id) - - try: - return req.job_id_list - except Exception as ex: - raise JobRequestException(ONE_INPUT_TYPE_ONLY_ERR) from ex + return req.job_id_list def start_job_status_loop( self, @@ -208,11 +221,12 @@ def start_job_status_loop( self._jm.initialize_jobs(cell_list) except Exception as e: error = { - "error": "Unable to get initial jobs list", + "source": getattr(e, "source", "jc.start_job_status_loop"), + "request": getattr(e, "request", "jc.start_job_status_loop"), + "name": getattr(e, "name", type(e).__name__), "message": getattr(e, "message", UNKNOWN_REASON), + "error": "Unable to get initial jobs list", "code": getattr(e, "code", -1), - "source": getattr(e, "source", "jobmanager"), - "name": getattr(e, "name", type(e).__name__), } self.send_comm_message(MESSAGE_TYPE["ERROR"], error) # if job init failed, set the lookup loop var back to False and return @@ -260,9 +274,12 @@ def _get_all_job_states( def _get_job_states_by_cell_id(self, req: JobRequest = None) -> dict: """ - Fetches status of all jobs associated with the given cell ID(s) - :param req: a JobRequest with the cell_id_list of interest - :returns: dict in the form + Fetches status of all jobs associated with the given cell ID(s). 
+ + :param req: job request object with the cell_id_list param set, defaults to None + :type req: JobRequest, optional + + :return: dictionary in the form { "jobs": { # dict with job IDs as keys and job states as values @@ -276,6 +293,7 @@ def _get_job_states_by_cell_id(self, req: JobRequest = None) -> dict: "cell_two": [ ... ], } } + :rtype: dict """ cell_job_states = self._jm.get_job_states_by_cell_id( cell_id_list=req.cell_id_list @@ -285,41 +303,83 @@ def _get_job_states_by_cell_id(self, req: JobRequest = None) -> dict: def _get_job_info(self, req: JobRequest) -> dict: """ - Look up job info. This is just some high-level generic information about the running - job, including the app id, name, and job parameters. - :param req: a JobRequest with the job_id_list of interest - :returns: a dict keyed with job IDs and with values of dicts with the following keys: - - app_id - str - module/name, - - app_name - str - name of the app as it shows up in the Narrative interface - - batch_id - str - the batch parent ID (if appropriate) - - job_id - str - just re-reporting the id string - - job_params - dict - the params that were passed to that particular job + Gets job information for a list of job IDs. 
+ + Job info for a given job ID is in the form: + { + "app_id": string in the form "/", + "app_name": string, + "job_id": string, + "job_params": dictionary, + "batch_id": string | None, + } + + Jobs that cannot be found in the `_running_jobs` index will return + { + "job_id": string, + "error": "Cannot find job with ID " + } + + :param req: job request with a list of job IDs + :type req: JobRequest + :return: dictionary containing job info for each input job, indexed by job ID + :rtype: dict """ job_id_list = self._get_job_ids(req) job_info = self._jm.get_job_info(job_id_list) self.send_comm_message(MESSAGE_TYPE["INFO"], job_info) return job_info - def __get_job_states(self, job_id_list) -> dict: + def _get_send_job_states(self, job_id_list: list, ts: int = None) -> dict: """ - Look up job states. + Retrieves the job states for the supplied job_ids. + If the ts parameter is present, only jobs that have been updated since that time are returned. + + Jobs that cannot be found in the `_running_jobs` index will return + { + "job_id": string, + "error": "Cannot find job with ID " + } - Returns a dictionary of job state information indexed by job ID. + :param job_id_list: job IDs to retrieve job states for + :type job_id_list: list + :param ts: timestamp, in the format generated by time.time_ns(); defaults to None + :type ts: int, optional + + :return: dictionary of job states, indexed by job ID + :rtype: dict """ - output_states = self._jm.get_job_states(job_id_list) + output_states = self._jm.get_job_states(job_id_list, ts) self.send_comm_message(MESSAGE_TYPE["STATUS"], output_states) return output_states def get_job_state(self, job_id: str) -> dict: """ + Retrieve the job state for a single job. + This differs from the _get_job_state (underscored version) in that it just takes a job_id string, not a JobRequest. 
+ + :param job_id: the job ID to get the state for + :type job_id: string + + :return: dictionary of job states, indexed by job ID + :rtype: dict """ - return self.__get_job_states([job_id]) + return self._get_send_job_states([job_id]) def _get_job_states(self, req: JobRequest) -> dict: + """ + Retrieves the job states for the supplied job_ids. + + :param req: job request with a list of job IDs + :type req: JobRequest + + :return: dictionary of job states, indexed by job ID + :rtype: dict + """ job_id_list = self._get_job_ids(req) - return self.__get_job_states(job_id_list) + return self._get_send_job_states(job_id_list, req.ts) def _modify_job_updates(self, req: JobRequest) -> dict: """ @@ -354,11 +414,16 @@ def _modify_job_updates(self, req: JobRequest) -> dict: def _cancel_jobs(self, req: JobRequest) -> dict: """ - This cancels a running job. - If there are no valid jobs, this raises a JobRequestException. - If there's an error while attempting to cancel, this raises a NarrativeError. - In the end, after a successful cancel, this finishes up by fetching and returning the - job state with the new status. + Cancel a job or list of jobs. After sending the cancellation request, the job states + are refreshed and their new output states returned. + + See JobManager.cancel_jobs() for more details. + + :param req: job request containing job ID or list of job IDs to be cancelled + :type req: JobRequest + + :return: job output states, indexed by job ID + :rtype: dict """ job_id_list = self._get_job_ids(req) cancel_results = self._jm.cancel_jobs(job_id_list) @@ -366,6 +431,17 @@ def _cancel_jobs(self, req: JobRequest) -> dict: return cancel_results def _retry_jobs(self, req: JobRequest) -> dict: + """ + Retry a job or list of jobs. + + See JobManager.retry_jobs() for more details. 
+ + :param req: job request containing job ID or list of job IDs to be retried + :type req: JobRequest + + :return: job retry data, indexed by job ID + :rtype: dict + """ job_id_list = self._get_job_ids(req) retry_results = self._jm.retry_jobs(job_id_list) self.send_comm_message(MESSAGE_TYPE["RETRY"], retry_results) @@ -373,7 +449,15 @@ def _retry_jobs(self, req: JobRequest) -> dict: def _get_job_logs(self, req: JobRequest) -> dict: """ - This returns a set of job logs based on the info in the request. + Fetch the logs for a job or list of jobs. + + See JobManager.get_job_logs_for_list() for more details. + + :param req: job request containing job ID or list of job IDs to fetch logs for + :type req: JobRequest + + :return: job log data, indexed by job ID + :rtype: dict """ job_id_list = self._get_job_ids(req) log_output = self._jm.get_job_logs_for_list( @@ -387,7 +471,8 @@ def _get_job_logs(self, req: JobRequest) -> dict: def _handle_comm_message(self, msg: dict) -> dict: """ - Handles comm messages that come in from the other end of the KBaseJobs channel. + Handle incoming messages on the KBaseJobs channel. + Messages get translated into one or more JobRequest objects, which are then passed to the right handler, based on the request. @@ -395,6 +480,15 @@ def _handle_comm_message(self, msg: dict) -> dict: Any unknown request is returned over the channel with message type 'job_error', and a JobRequestException is raised. + + :param msg: incoming comm message + :type msg: dict + + :raises JobRequestException: if the message type is not recognised + + :return: result of running the appropriate method; generally this is a dictionary + of job data indexed by job ID. + :rtype: dict """ with exc_to_msg(msg): request = JobRequest(msg) @@ -414,6 +508,9 @@ def send_comm_message(self, msg_type: str, content: dict) -> None: Sends a ipykernel.Comm message to the KBaseJobs channel with the given msg_type and content. These just get encoded into the message itself. 
""" + if msg_type == MESSAGE_TYPE["STATUS"]: + content["last_checked"] = time.time_ns() + msg = {"msg_type": msg_type, "content": content} self._comm.send(msg) @@ -421,34 +518,37 @@ def send_error_message( self, req: Union[JobRequest, dict, str], content: dict = None ) -> None: """ - Sends a comm message over the KBaseJobs channel as an error. This will have msg_type set to - ERROR ('job_error'), and include the original request in the message content as - "source". - - req can be the original request message or its JobRequest form. - Since the latter is made from the former, they have the same information. - It can also be a string or None if this context manager is invoked outside of a JC request - - This sends a packet that looks like: + Wrapper for self.send_comm_message generally resulting in a message like: { - request: the original JobRequest data object, function params, or function name - source: the function request that spawned the error - other fields about the error, dependent on the content. + "msg_type": "job_error", + "content": { + "source": request type from original incoming comm request, if available, else an arbitrary str/NoneType, + "request": request data from original incoming comm request, if available, else an arbitrary str/NoneType, + **{any extra error information} + } } + where this method computes "msg_type" and "content" and passes it to self.send_comm_message + + :param req: Can be the original comm request message or its JobRequest instantiation. + Since the latter is made from the former, they have the same information. 
+ It can also be a string or None if this method is invoked outside of a JobComm request + :type req: JobRequest, dict, str, or NoneType + :param content: Additional error information + :type content: dict, optional """ error_content = {} if isinstance(req, JobRequest): - error_content["request"] = req.rq_data error_content["source"] = req.request_type + error_content["request"] = req.rq_data elif isinstance(req, dict): data = req.get("content", {}).get("data", {}) - error_content["request"] = data error_content["source"] = data.get("request_type") + error_content["request"] = data elif isinstance(req, str) or req is None: - error_content["request"] = req error_content["source"] = req + error_content["request"] = req - if content is not None: + if content: error_content.update(content) self.send_comm_message(MESSAGE_TYPE["ERROR"], error_content) @@ -456,7 +556,8 @@ def send_error_message( class exc_to_msg: """ - This is a context manager to wrap around JC code + This is a context manager to wrap around JobComm code in order to catch any exception, + send it back as a comm error message, and then re-raise that exception """ jc = JobComm() @@ -476,17 +577,18 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, exc_tb): """ - If exception is caught, will send job comm message in this format + If an exception is caught during execution in the JobComm code, + this will send back a comm error message like: { - "msg_type": ERROR, + "msg_type": "job_error", "content": { - "source": "request_type", - "job_id": "0123456789abcdef", # or job_id_list. 
optional and mutually exclusive - "name": "ValueError", - "message": "Something happened", - #---------- Below, NarrativeException only ----------- - "code": -1, - "error": "Unable to complete this request" + "source": request type from original incoming comm request, if available, else an arbitrary str/NoneType, + "request": request data from original incoming comm request, if available, else an arbitrary str/NoneType, + "name": exception name, # e.g., ValueError + "message": exception message, # e.g. "Something specifically went wrong!" + #---------- Below, for NarrativeException only ----------- + "error": exception error attribute, # e.g. "Unable to complete this request" + "code": exception code attribute, # e.g., -1 } } Will then re-raise exception diff --git a/src/biokbase/narrative/jobs/jobmanager.py b/src/biokbase/narrative/jobs/jobmanager.py index a3a0fa8e5d..3ae0cba8c1 100644 --- a/src/biokbase/narrative/jobs/jobmanager.py +++ b/src/biokbase/narrative/jobs/jobmanager.py @@ -2,11 +2,13 @@ from jinja2 import Template from datetime import datetime, timezone, timedelta import copy -from typing import List, Tuple +from enum import Enum +from itertools import cycle +from typing import List, Tuple, Union +from collections.abc import Iterable import biokbase.narrative.clients as clients from .job import ( Job, - EXCLUDED_JOB_STATE_FIELDS, JOB_INIT_EXCLUDED_JOB_STATE_FIELDS, ) from biokbase.narrative.common import kblogging @@ -36,7 +38,57 @@ JOBS_MISSING_ERR = "No valid job IDs provided" CELLS_NOT_PROVIDED_ERR = "cell_id_list not provided" -DOES_NOT_EXIST = "does_not_exist" + + +class OutputStateErrMsg(Enum): + """ + For errors that go into the STATUS response. + Enum mapping from error names to formattable error messages. 
+ The first formatting argument of each error message must be the job ID + """ + + NOT_FOUND = "Cannot find job with ID %s" + NOT_UPDATED = "Job with ID %s has not been updated since ts %d" + QUERY_EE2_STATES = "A Job.query_ee2_states error occurred for job with ID %s: %s" + CANCEL = "An EE2.cancel_job error occurred for job with ID %s: %s" + + def gen_err_msg(self, its: List[Union[str, int, Iterable]]): + """ + Create a generator for filled in enum values + + :param its: A list of arguments, where each argument + should be either a literal or an iterable of literals + The iterables will then be zipped and used to format the enum values. + The first argument must be the list of job_ids + """ + if not isinstance(its, list): + raise TypeError( + "Argument its must be of type list" + ) + # check number of arguments matches number of %s or %d to be formatted + num_format = self.value.count("%") + if num_format != len(its): + raise ValueError( + f"{self.__class__.__name__}.{self.name} must be formatted with {num_format} argument(s). 
" + f"Received {len(its)} argument(s)" + ) + + for i, e in enumerate(its): + # if element is a literal, convert to iterable of literal + if isinstance(e, str) or isinstance(e, int): + its[i] = cycle([e]) + + def gen(): + for tup in zip(*its): + yield self.value % tup + + return gen() + + def replace_result(self) -> bool: + names = [e.name for e in list(OutputStateErrMsg)] + replace = [True, True, False, False] + ans = dict(zip(names, replace)) + return ans[self.name] class JobManager(object): @@ -49,9 +101,9 @@ class JobManager(object): __instance = None # keys: job_id, values: { refresh = 1/0, job = Job object } - _running_jobs = dict() + _running_jobs = {} # keys: cell_id, values: set(job_1_id, job_2_id, job_3_id) - _jobs_by_cell_id = dict() + _jobs_by_cell_id = {} _log = kblogging.get_logger(__name__) @@ -74,12 +126,19 @@ def _reorder_parents_children(states: dict) -> dict: def _check_job_list(self, input_ids: List[str] = []) -> Tuple[List[str], List[str]]: """ - Deduplicates the input job list, maintaining insertion order + Deduplicates the input job list, maintaining insertion order. 
Any jobs not present in self._running_jobs are added to an error list - :param input_ids: a list of putative job IDs - :return results: tuple with items "job_ids", containing valid IDs; - and "error_ids", for jobs that the narrative backend does not know about + :param input_ids: list of putative job IDs, defaults to [] + :type input_ids: List[str], optional + + :raises JobRequestException: if the input_ids parameter is not a list + or if there are no valid job IDs supplied + + :return: tuple with items + job_ids - valid job IDs + error_ids - jobs that the narrative backend does not know about + :rtype: Tuple[List[str], List[str]] """ if not isinstance(input_ids, list): raise JobRequestException(f"{JOBS_TYPE_ERR}: {input_ids}") @@ -103,10 +162,10 @@ def register_new_job(self, job: Job, refresh: bool = None) -> None: Registers a new Job with the manager and stores the job locally. This should only be invoked when a new Job gets started. - Parameters: - ----------- - job : biokbase.narrative.jobs.job.Job object - The new Job that was started. + :param job: a Job object for the new job that was started + :type job: Job + :param refresh: whether or not the job should be refreshed, defaults to None + :type refresh: bool, optional """ kblogging.log_event(self._log, "register_new_job", {"job_id": job.job_id}) @@ -127,14 +186,19 @@ def initialize_jobs(self, cell_ids: List[str] = None) -> None: """ Initializes this JobManager. This is expected to be run by a running Narrative, and naturally linked to a workspace. - So it does the following steps. + It runs the following steps: 1. gets the current workspace ID from app_util.system_variable('workspace_id') - 2. get list of jobs with that ws id from ee2 (also gets tag, cell_id, run_id) + 2. get job state data on all jobs with that ws id from ee2 3. initialize the Job objects and add them to the running jobs list 4. start the status lookup loop. 
+ + :param cell_ids: list of cell IDs to filter the existing jobs for, defaults to None + :type cell_ids: List[str], optional + + :raises NarrativeException: if the call to ee2 fails """ ws_id = system_variable("workspace_id") - job_states = dict() + job_states = {} kblogging.log_event(self._log, "JobManager.initialize_jobs", {"ws_id": ws_id}) try: job_states = clients.get("execution_engine2").check_workspace_jobs( @@ -149,7 +213,7 @@ def initialize_jobs(self, cell_ids: List[str] = None) -> None: new_e = transform_job_exception(e, "Unable to initialize jobs") raise new_e - self._running_jobs = dict() + self._running_jobs = {} job_states = self._reorder_parents_children(job_states) for job_state in job_states.values(): child_jobs = None @@ -170,22 +234,22 @@ def initialize_jobs(self, cell_ids: List[str] = None) -> None: self.register_new_job(job, refresh) - def _create_jobs(self, job_ids) -> dict: + def _create_jobs(self, job_ids: List[str]) -> dict: """ + Given a list of job IDs, creates job objects for them and populates the _running_jobs dictionary. TODO: error handling - Given a list of job IDs, creates job objects for them and populates the _running_jobs dictionary + + :param job_ids: job IDs to create job objects for + :type job_ids: List[str] + + :return: dictionary of job states indexed by job ID + :rtype: dict """ job_ids = [job_id for job_id in job_ids if job_id not in self._running_jobs] if not len(job_ids): return {} - job_states = clients.get("execution_engine2").check_jobs( - { - "job_ids": job_ids, - "exclude_fields": JOB_INIT_EXCLUDED_JOB_STATE_FIELDS, - "return_list": 0, - } - ) + job_states = Job.query_ee2_states(job_ids, init=True) for job_state in job_states.values(): # do not set new jobs to be automatically refreshed - if the front end wants them # refreshed, it'll make a request. 
@@ -193,10 +257,17 @@ def _create_jobs(self, job_ids) -> dict: return job_states - def get_job(self, job_id): + def get_job(self, job_id: str) -> Job: """ - Returns a Job with the given job_id. - Raises a JobRequestException if not found. + Retrieve a job from the Job Manager's _running_jobs index. + + :param job_id: the job ID to be retrieved + :type job_id: str + + :raises JobRequestException: if the job cannot be found + + :return: Job object corresponding to that job ID + :rtype: Job """ if job_id not in self._running_jobs: raise JobRequestException(JOB_NOT_REG_ERR, job_id) @@ -206,9 +277,48 @@ def _construct_job_output_state_set( self, job_ids: List[str], states: dict = None ) -> dict: """ + Precondition: job_ids already validated + Builds a set of job states for the list of job ids. - :param job_ids: list of job IDs (may be empty) - :param states: dict, where each value is a state is from EE2 + The output will look like: + { + "job_id_0": { # dict generated by job.output_state() + "job_id": "job_id_0", + "jobState": { # modified job state from EE2 + ... + }, + "outputWidgetInfo": { + ... + } + }, + "job_id_1": { # dict generated by job.output_state() + "job_id": "job_id_1", + "jobState": { # modified job state from EE2 + ... + }, + "outputWidgetInfo": None + }, + ..., + "ee2_error_id_0": { # dict generated by job.output_state() with EE2 error message added + "job_id": "ee2_error_id_0", + "jobState": { # modified job state from EE2 + ... + }, + "outputWidgetInfo": ..., + "error": + }, + ... + } + + :param job_ids: list of job IDs + :type job_ids: List[str] + :param states: dict of job state data from EE2, indexed by job ID, defaults to None + :type states: dict, optional + + :raises JobRequestException: if job_ids is not a list + + :return: dict containing the output_state for each job, indexed by job ID. 
+ :rtype: dict """ if not isinstance(job_ids, list): raise JobRequestException("job_ids must be a list") @@ -216,8 +326,8 @@ def _construct_job_output_state_set( if not len(job_ids): return {} - output_states = dict() - jobs_to_lookup = list() + output_states = {} + jobs_to_lookup = [] # Fetch from cache of terminated jobs, where available. # These are already post-processed and ready to return. @@ -231,17 +341,11 @@ def _construct_job_output_state_set( else: jobs_to_lookup.append(job_id) - fetched_states = dict() + fetched_states = {} # Get the rest of states direct from EE2. if len(jobs_to_lookup): try: - fetched_states = clients.get("execution_engine2").check_jobs( - { - "job_ids": jobs_to_lookup, - "exclude_fields": EXCLUDED_JOB_STATE_FIELDS, - "return_list": 0, - } - ) + fetched_states = Job.query_ee2_states(jobs_to_lookup, init=False) except Exception as e: error_message = str(e) kblogging.log_event( @@ -259,16 +363,78 @@ def _construct_job_output_state_set( output_states[job_id] = job.output_state(fetched_states[job_id]) else: # fetch the current state without updating it - output_states[job_id] = job.output_state({}) - # add an error field with the error message from the failed look up - output_states[job_id]["error"] = error_message + output_states[job_id] = job.output_state(no_refresh=True) + + failed_ids = [job_id for job_id in jobs_to_lookup if job_id not in fetched_states] + if failed_ids: + self.add_errors_to_results( + output_states, failed_ids, OutputStateErrMsg.QUERY_EE2_STATES, error_message + ) return output_states - def get_job_states(self, job_ids: List[str]) -> dict: - job_ids, error_ids = self._check_job_list(job_ids) + def get_job_states(self, job_ids: List[str], ts: int = None) -> dict: + """ + Retrieves the job states for the supplied job_ids, with the option to + replace any jobs that have not been updated since ts with a short stub + + (Omitting some error states from job.output_state()) the output is generally like: + { + "job_id_0": { 
# dict generated by job.output_state() + "job_id": "job_id_0", + "jobState": { # modified job state from EE2 + ... + }, + "outputWidgetInfo": { + ... + } + }, + "job_id_1": { # dict generated by job.output_state() + "job_id": "job_id_1", + "jobState": { # modified job state from EE2 + ... + }, + "outputWidgetInfo": None + }, + ..., + "ee2_error_id_0": { # dict generated by job.output_state() with EE2 error message added + "job_id": "ee2_error_id_0", + "jobState": { # modified job state from EE2 + ... + }, + "outputWidgetInfo": ..., + "error": + }, + ..., + "nonexistent_error_id_0": { # jobs that cannot be found in the `_running_jobs` index + "job_id": "nonexistent_error_id_0", + "error": "Cannot find job with ID nonexistent_error_id_0" + }, + ..., + "not_updated_id_0": { # jobs that have not been updated since ts + "job_id": "not_updated_id_0", + "error": "Job with ID not_updated_id_0 has not been updated since ts 123085709827" + } + } + + :param job_ids: job IDs to retrieve job state data for + :type job_ids: List[str] + :param ts: timestamp (as generated by time.time_ns()) to filter the jobs, defaults to None + :type ts: int, optional + + :return: dictionary of job states, indexed by job ID + :rtype: dict + """ + job_ids, not_found_ids = self._check_job_list(job_ids) output_states = self._construct_job_output_state_set(job_ids) - return self.add_errors_to_results(output_states, error_ids) + not_updated_ids = [] + if ts is not None: + for job_id in output_states: + if self.get_job(job_id).last_updated <= ts: + not_updated_ids.append(job_id) + self.add_errors_to_results(output_states, not_found_ids, OutputStateErrMsg.NOT_FOUND) + self.add_errors_to_results(output_states, not_updated_ids, OutputStateErrMsg.NOT_UPDATED, ts) + return output_states def get_all_job_states(self, ignore_refresh_flag=False) -> dict: """ @@ -276,26 +442,35 @@ def get_all_job_states(self, ignore_refresh_flag=False) -> dict: If ignore_refresh_flag is True, then returns states for all jobs this 
JobManager knows about (i.e. all jobs associated with the workspace). - This returns them all as a dictionary, keyed on the job id. - :param ignore_refresh_flag: boolean - if True, ignore the usual refresh state of the job. - Even if the job is stopped, or completed, fetch and return its state from the service. + :param ignore_refresh_flag: if True, ignore the refresh state of the job -- return the state + regardless of whether the job is stopped or completed. Defaults to False. + :type ignore_refresh_flag: bool, optional + + :return: dictionary of job states, indexed by job ID + :rtype: dict """ - jobs_to_lookup = list() + jobs_to_lookup = [] # grab the list of running job ids, so we don't run into update-while-iterating problems. - for job_id in self._running_jobs.keys(): + for job_id in self._running_jobs: if self._running_jobs[job_id]["refresh"] or ignore_refresh_flag: jobs_to_lookup.append(job_id) if len(jobs_to_lookup) > 0: return self._construct_job_output_state_set(jobs_to_lookup) - return dict() + return {} def _get_job_ids_by_cell_id(self, cell_id_list: List[str] = None) -> tuple: """ - Finds jobs with a cell_id in cell_id_list - Mappings of job ID to cell ID are added when new jobs are registered - Returns a list of job IDs and a mapping of cell IDs to the list of - job IDs associated with the cell. + Finds jobs with a cell_id in cell_id_list. + Mappings of job ID to cell ID are added when new jobs are registered. 
+ + :param cell_id_list: cell IDs to retrieve job state data for + :type cell_id_list: List[str] + + :return: tuple with two components: + job_id_list: list of job IDs associated with the cell IDs supplied + cell_to_job_mapping: mapping of cell IDs to the list of job IDs associated with the cell + :rtype: tuple """ if not cell_id_list: raise JobRequestException(CELLS_NOT_PROVIDED_ERR) @@ -310,9 +485,15 @@ def _get_job_ids_by_cell_id(self, cell_id_list: List[str] = None) -> tuple: def get_job_states_by_cell_id(self, cell_id_list: List[str] = None) -> dict: """ - Fetch job states for jobs with a cell_id in cell_id_list - Returns a dictionary of job states keyed by job ID and a mapping of - cell IDs to the list of job IDs associated with the cell. + Retrieves the job states for jobs associated with the cell_id_list supplied. + + :param cell_id_list: cell IDs to retrieve job state data for + :type cell_id_list: List[str] + + :return: dictionary with two keys: + 'jobs': job states, indexed by job ID + 'mapping': mapping of cell IDs to the list of job IDs associated with the cell + :rtype: dict """ (jobs_to_lookup, cell_to_job_mapping) = self._get_job_ids_by_cell_id( cell_id_list @@ -325,19 +506,31 @@ def get_job_states_by_cell_id(self, cell_id_list: List[str] = None) -> dict: def get_job_info(self, job_ids: List[str]) -> dict: """ - Sends the info over the comm channel as these packets: + Gets job information for a list of job IDs. + + Job info for a given job ID is in the form: + { + "app_id": string in the form "/", + "app_name": string, + "job_id": string, + "job_params": dictionary, + "batch_id": string | None, + } + + Jobs that cannot be found in the `_running_jobs` index will return { - app_id: module/name, - app_name: random string, - job_id: string, - job_params: dictionary, - batch_id: string, + "job_id": string, + "error": "Cannot find job with ID " } - Will set packet to the generic job not found message if job_id doesn't exist. 
+ + :param job_ids: job IDs to retrieve job info for + :type job_ids: List[str] + :return: job info for each job, indexed by job ID + :rtype: dict """ job_ids, error_ids = self._check_job_list(job_ids) - infos = dict() + infos = {} for job_id in job_ids: job = self.get_job(job_id) infos[job_id] = { @@ -347,7 +540,7 @@ def get_job_info(self, job_ids: List[str]) -> dict: "job_id": job_id, "job_params": job.params, } - return self.add_errors_to_results(infos, error_ids) + return self.add_errors_to_results(infos, error_ids, OutputStateErrMsg.NOT_FOUND) def get_job_logs( self, @@ -357,32 +550,47 @@ def get_job_logs( latest: bool = False, ) -> dict: """ - :param job_id: str - the job id from the execution engine - :param first_line: int - the first line to be requested by the log. 0-indexed. If < 0, + Retrieves job logs for the job ID supplied. + + Job logs for a given job ID are in the form: + { + "job_id": string, + "batch_id": string | None, + "first": int - the first line returned, + "latest": bool - whether the latest lines were returned, + "max_lines": int - the number of log lines currently available for that job, + "lines": list - the lines themselves, fresh from the server; these are dicts in the form + "line" - the log line string + "is_error" - either 0 or 1 + } + + If there is an error when retrieving logs (e.g. the job has yet to start or + it is a batch job and does not generate logs), the return structure will be: + { + "job_id": string + "batch_id": string | None + "error": string - error message + } + + :param job_id: the job id from the execution engine + :type job_id: str + :param first_line: the first line to be requested by the log. 0-indexed. If < 0, this will be set to 0 + :type first_line: int, defaults to 0 :param num_lines: int - the maximum number of lines to return. if < 0, will be reset to 0. if None, then will not be considered, and just return all the lines. 
- :param latest: bool - if True, will only return the most recent max_lines - of logs. This overrides the first_line parameter if set to True. If the call made is - get_job_logs(id, first_line=0, num_lines=5, latest=True), and there are 100 - log lines available, then lines 96-100 will be returned. - :returns: dict with keys: - job_id: string - batch_id: string | None - first: int - the first line returned - latest: bool - whether the latest lines were returned - max_lines: int - the number of logs lines currently available for that job - lines: list - the lines themselves, fresh from the server. These are all tiny dicts with keys - "line" - the log line string - "is_error" - either 0 or 1 + :type num_lines: int, defaults to None. + :param latest: if True, will only return the most recent max_lines + of logs. If set to True, overrides the first_line parameter; e.g. for the call + + get_job_logs(id, first_line=0, num_lines=5, latest=True) - If there is an error when retrieving logs (e.g. the job - has yet to start or it is a batch job and does not generate - logs), the return structure will be: - job_id: string - batch_id: string | None - error: string - error message + if there are 100 log lines available, then lines 96-100 will be returned. + :type latest: boolean, defaults to False. + + :return: job log data for each job, indexed by job ID + :rtype: dict """ job = self.get_job(job_id) @@ -426,6 +634,24 @@ def get_job_logs_for_list( ) -> dict: """ Fetch the logs for a list of jobs. Note that the parameters supplied are applied to all jobs. 
+ + Jobs that cannot be found in the `_running_jobs` index will return + { + "job_id": string, + "error": "Cannot find job with ID " + } + + :param job_id_list: list of jobs to fetch logs for + :type job_id_list: List[str] + :param first_line: the first line to be returned, defaults to 0 + :type first_line: int, optional + :param num_lines: number of lines to be returned, defaults to None + :type num_lines: int, optional + :param latest: whether to return the latest log lines; only relevant if num_lines is set. Defaults to False + :type latest: bool, optional + + :return: job log data indexed by job ID; see get_job_logs for details + :rtype: dict """ job_ids, error_ids = self._check_job_list(job_id_list) @@ -433,23 +659,33 @@ def get_job_logs_for_list( for job_id in job_ids: output[job_id] = self.get_job_logs(job_id, first_line, num_lines, latest) - return self.add_errors_to_results(output, error_ids) + return self.add_errors_to_results(output, error_ids, OutputStateErrMsg.NOT_FOUND) def cancel_jobs(self, job_id_list: List[str]) -> dict: """ - Cancel a list of running jobs, placing them in a canceled state - Does NOT delete the jobs. - If the job_ids are not present or are not found in the Narrative, - a JobRequestException is raised. + Cancel a list of jobs and return their new state. After sending the cancellation + request, the job states are refreshed and their new output states returned. 
- Results are returned as a dict of job status objects keyed by job id + Jobs that trigger an error when cancelled will return + { + "job_id": string, + "error": + } + + Jobs that cannot be found in the `_running_jobs` index will return + { + "job_id": string, + "error": "Cannot find job with ID " + } - :param job_id_list: list of strs - :return job_states: dict with keys job IDs and values job state objects + :param job_id_list: job IDs to cancel + :type job_id_list: List[str] + :return: job output states, indexed by job ID + :rtype: dict """ job_ids, error_ids = self._check_job_list(job_id_list) - error_states = dict() + error_states = {} for job_id in job_ids: if not self.get_job(job_id).was_terminal(): error = self._cancel_job(job_id) @@ -457,12 +693,21 @@ def cancel_jobs(self, job_id_list: List[str]) -> dict: error_states[job_id] = error.message job_states = self._construct_job_output_state_set(job_ids) - for job_id in error_states: - job_states[job_id]["error"] = error_states[job_id] - return self.add_errors_to_results(job_states, error_ids) + self.add_errors_to_results(job_states, list(error_states.keys()), OutputStateErrMsg.CANCEL, list(error_states.values())) + self.add_errors_to_results(job_states, error_ids, OutputStateErrMsg.NOT_FOUND) + return job_states def _cancel_job(self, job_id: str) -> None: + """ + Cancel a single job. If an error occurs during cancellation, that error is converted + into a NarrativeException and returned to the caller. + + :param job_id: job ID to be cancelled + :type job_id: str + :return: if present, the exception raised when trying to cancel the job + :rtype: NarrativeException | None + """ # Stop updating the job status while we try to cancel. 
# Set the job to a special state of 'canceling' while we're doing the cancel is_refreshing = self._running_jobs[job_id].get("refresh", False) @@ -479,26 +724,44 @@ def _cancel_job(self, job_id: str) -> None: def retry_jobs(self, job_id_list: List[str]) -> dict: """ - Returns - [ - { - "job_id": job_id, - "job": {"state": {"job_id": job_id, "status": status, ...} ...}, - "retry_id": retry_id, - "retry": {"state": {"job_id": retry_id, "status": status, ...} ...} + Retry a list of job IDs, returning job output states for the jobs to be retried + and the new jobs created by the retry command. + + Retry data for a given job ID is in the form: + { + "job_id": "job_id_1", + "job": { # i.e. a job.output_state() object + "jobState": {"job_id": "job_id_1", "status": status, ...} + ... }, - { - "job": {"state": {"job_id": job_id, "status": status, ...} ...}, - "error": "..." + "retry_id": "retry_id_1", + "retry": { # i.e. a job.output_state() object + "jobState": {"job_id": "retry_id_1", "status": status, ...} + ... } - ... - { - "job": {"state": {"job_id": job_id, "status": DOES_NOT_EXIST}}, - "error": f"Cannot find job with ID {job_id}", - } - ] - where the innermost dictionaries are job states from ee2 and are within the - job states from job.output_state() + } + + If the job cannot be retried (e.g. it is a batch job or the user doesn't have permissions), + the error message from ee2 will be returned: + { + "job_id": string, + "job": { "jobState": { ... }, ... 
}, + "error": "Cannot retry a batch parent job", # from ee2 + } + + Jobs that cannot be found in the `_running_jobs` index will return + { + "job_id": string, + "error": "Cannot find job with ID " + } + + :param job_id_list: list of job IDs + :type job_id_list: List[str] + + :raises NarrativeException: if EE2 returns an error from the retry request + + :return: job retry data indexed by job ID + :rtype: dict """ job_ids, error_ids = self._check_job_list(job_id_list) try: @@ -506,7 +769,8 @@ def retry_jobs(self, job_id_list: List[str]) -> dict: {"job_ids": job_ids} ) except Exception as e: - raise transform_job_exception(e, "Unable to retry job(s)") + raise transform_job_exception(e, "Unable to retry job(s)") from e + # for each retry result, refresh the state of the retried and new jobs orig_ids = [result["job_id"] for result in retry_results] retry_ids = [ @@ -516,30 +780,62 @@ def retry_jobs(self, job_id_list: List[str]) -> dict: retry_states = self._construct_job_output_state_set( retry_ids, self._create_jobs(retry_ids) # add to self._running_jobs index ) - job_states = {**orig_states, **retry_states} results_by_job_id = {} # fill in the job state details for result in retry_results: job_id = result["job_id"] - results_by_job_id[job_id] = {"job_id": job_id, "job": job_states[job_id]} + results_by_job_id[job_id] = {"job_id": job_id, "job": orig_states[job_id]} if "retry_id" in result: retry_id = result["retry_id"] - results_by_job_id[job_id]["retry_id"] = retry_id - results_by_job_id[job_id]["retry"] = job_states[retry_id] + results_by_job_id[job_id].update( + {"retry_id": retry_id, "retry": retry_states[retry_id]} + ) if "error" in result: results_by_job_id[job_id]["error"] = result["error"] - return self.add_errors_to_results(results_by_job_id, error_ids) + return self.add_errors_to_results(results_by_job_id, error_ids, OutputStateErrMsg.NOT_FOUND) - def add_errors_to_results(self, results: dict, error_ids: List[str]) -> dict: - """ - Add the generic "not 
found" error for each job_id in error_ids + @staticmethod + def add_errors_to_results( + results: dict, error_ids: List[str], error_enum: OutputStateErrMsg, *extra_its: Tuple + ) -> dict: """ - for error_id in error_ids: - results[error_id] = { - "job_id": error_id, - "error": f"Cannot find job with ID {error_id}", - } + Add error states to results + + :param results: dictionary of job data (output state, info, retry, etc.) indexed by job ID + :type results: dict + :param error_ids: list of job IDs to which an error should be attached + :type error_ids: List[str] + :param error_enum: an enum instance from OutputStateErrMsg + :type error_enum: OutputStateErrMsg + :param extra_its: any extra arguments to feed into error_enum.gen_err_msg to format the error message + :type extra_its: Tuple + + :return: input results augmented by either extra error dictionaries or errors in existing dictionaries + :rtype: dict + """ + # create generator yielding error messages + gen_err_msg = error_enum.gen_err_msg([error_ids] + list(extra_its)) + + for error_id, err_msg in zip(error_ids, gen_err_msg): + # if there's already an error there + if error_id in results and "error" in results[error_id]: + existing_error = results[error_id]["error"] + # concatenate the errors (works recursively) + err_msg = f"{existing_error}\n{err_msg}" + + if error_enum.replace_result(): + results[error_id] = { + "job_id": error_id, + "error": err_msg, + } + else: + if error_id not in results: + raise ValueError(f"Cannot add error because response dict is missing key {error_id}") + results[error_id].update( + {"error": err_msg} + ) + return results def modify_job_refresh(self, job_ids: List[str], update_refresh: bool) -> None: diff --git a/src/biokbase/narrative/jobs/util.py b/src/biokbase/narrative/jobs/util.py index 0174485a9c..d1752dd307 100644 --- a/src/biokbase/narrative/jobs/util.py +++ b/src/biokbase/narrative/jobs/util.py @@ -15,8 +15,8 @@ def load_job_constants(relative_path_to_file=JOB_CONFIG_FILE_PATH_PARTS): Load the job-related
terms that are shared by front- and back ends. """ full_path = [os.environ["NARRATIVE_DIR"]] + relative_path_to_file - config_json = open(os.path.join(*full_path)).read() - config = json.loads(config_json) + with open(os.path.join(*full_path)) as fh: + config = json.load(fh) REQUIRED = { "message_types": [ "CANCEL", @@ -31,7 +31,7 @@ def load_job_constants(relative_path_to_file=JOB_CONFIG_FILE_PATH_PARTS): "ERROR", "RUN_STATUS", ], - "params": ["BATCH_ID", "CELL_ID_LIST", "JOB_ID", "JOB_ID_LIST"], + "params": ["BATCH_ID", "CELL_ID_LIST", "JOB_ID", "JOB_ID_LIST", "TS"], } # ensure we have all the required message type and param names diff --git a/src/biokbase/narrative/tests/generate_test_results.py b/src/biokbase/narrative/tests/generate_test_results.py index 91fa444d2e..ba9969e2f6 100644 --- a/src/biokbase/narrative/tests/generate_test_results.py +++ b/src/biokbase/narrative/tests/generate_test_results.py @@ -16,9 +16,13 @@ ) """ -generate_test_results.py is used to generate the job message data that the narrative backend produces and that the frontend consumes. It uses data from `ee2_job_test_data_file` and `app_specs_file` and provides expected narrative backend message data, as well as a number of mappings that are used in python tests. +generate_test_results.py is used to generate the job message data that the narrative backend produces and that the frontend consumes. +It uses data from `ee2_job_test_data_file` and `app_specs_file` and provides expected narrative backend message data, +as well as a number of mappings that are used in python tests. -The narrative backend message data is written as JSON to the `response_data_file`, indexed by message type and job ID. By default, if `response_data_file` does not exist when `generate_test_results.py` is run, job message data will be saved there. The `response_data_file` can also be generated by running `generate_test_results.py` with the `--force` argument. 
+The narrative backend message data is written as JSON to the `response_data_file`, indexed by message type and job ID. +By default, if `response_data_file` does not exist when `generate_test_results.py` is run, job message data will be saved there. +The `response_data_file` can also be generated by running `generate_test_results.py` with the `--force` argument. """ diff --git a/src/biokbase/narrative/tests/job_test_constants.py b/src/biokbase/narrative/tests/job_test_constants.py index 68bd9b2a51..7828cd27c5 100644 --- a/src/biokbase/narrative/tests/job_test_constants.py +++ b/src/biokbase/narrative/tests/job_test_constants.py @@ -35,7 +35,14 @@ def get_test_job(job_id): return copy.deepcopy(TEST_JOBS[job_id]) +def get_test_jobs(job_ids): + return {job_id: get_test_job(job_id) for job_id in job_ids} + + CLIENTS = "biokbase.narrative.clients.get" +TIME_NS = "biokbase.narrative.jobs.jobcomm.time.time_ns" + +TEST_EPOCH_NS = 42 # arbitrary epoch ns MAX_LOG_LINES = 10 diff --git a/src/biokbase/narrative/tests/test_job.py b/src/biokbase/narrative/tests/test_job.py index 1b60ea892c..6136b5ca6d 100644 --- a/src/biokbase/narrative/tests/test_job.py +++ b/src/biokbase/narrative/tests/test_job.py @@ -383,7 +383,7 @@ def test_job_update__no_state(self): """ job = create_job_from_ee2(JOB_CREATED) self.assertFalse(job.was_terminal()) - job._update_state(None) + job._update_state({}) self.assertFalse(job.was_terminal()) @mock.patch(CLIENTS, get_mock_client) diff --git a/src/biokbase/narrative/tests/test_jobcomm.py b/src/biokbase/narrative/tests/test_jobcomm.py index 84d1baefab..06698211f8 100644 --- a/src/biokbase/narrative/tests/test_jobcomm.py +++ b/src/biokbase/narrative/tests/test_jobcomm.py @@ -4,9 +4,11 @@ import itertools import re import copy +import time from biokbase.narrative.jobs.jobmanager import ( JobManager, + OutputStateErrMsg, JOB_NOT_REG_ERR, JOB_NOT_BATCH_ERR, JOBS_TYPE_ERR, @@ -32,9 +34,13 @@ transform_job_exception, ) +from 
src.biokbase.narrative.jobs.jobcomm import NO_INPUT_TYPE_ERR + from .util import ConfigTests, validate_job_state from biokbase.narrative.tests.job_test_constants import ( CLIENTS, + TIME_NS, + TEST_EPOCH_NS, MAX_LOG_LINES, JOB_COMPLETED, JOB_CREATED, @@ -60,6 +66,7 @@ BATCH_PARENT_CHILDREN, BATCH_CHILDREN, generate_error, + get_test_jobs, ) from biokbase.narrative.tests.generate_test_results import ( ALL_RESPONSE_DATA, @@ -108,7 +115,7 @@ def make_comm_msg( - msg_type: str, job_id_like, as_job_request: bool, content: dict = None + msg_type: str, job_id_like, as_job_request: bool, content: dict = {} ): job_arguments = {} if type(job_id_like) is dict: @@ -120,10 +127,9 @@ def make_comm_msg( msg = { "msg_id": "some_id", - "content": {"data": {"request_type": msg_type, **job_arguments}}, + "content": {"data": {"request_type": msg_type, **job_arguments, **content}}, } - if content is not None: - msg["content"]["data"].update(content) + if as_job_request: return JobRequest(msg) else: @@ -134,6 +140,20 @@ def get_app_data(*args): return {"info": {"name": APP_NAME}} +def ts_are_close(t0: int, t1: int, tol: float = 1) -> bool: + """Check if two times, in ns, are "close" + + Args: + t0 (int): time in ns + t1 (int): time in ns + tol (float, optional): tolerated discrepancy between the times, in s. Defaults to 1. 
+ + Returns: + bool: Whether the two times differ by no more than the tolerance + """ + return abs(t1 - t0) * 1e-9 <= tol + + class JobCommTestCase(unittest.TestCase): maxDiff = None @@ -366,6 +386,22 @@ def test_req_no_inputs__fail(self): for msg_type in functions: req_dict = make_comm_msg(msg_type, None, False) + err = JobRequestException(NO_INPUT_TYPE_ERR) + with self.assertRaisesRegex(type(err), str(err)): + self.jc._handle_comm_message(req_dict) + self.check_error_message(req_dict, err) + + def test_req_multiple_inputs__fail(self): + functions = [ + CANCEL, + INFO, + LOGS, + RETRY, + STATUS, + ] + + for msg_type in functions: + req_dict = make_comm_msg(msg_type, {"job_id": "something", "batch_id": "another_thing"}, False) err = JobRequestException(ONE_INPUT_TYPE_ONLY_ERR) with self.assertRaisesRegex(type(err), str(err)): self.jc._handle_comm_message(req_dict) @@ -446,11 +482,12 @@ def test_start_job_status_loop__initialise_jobs_error(self): { "msg_type": ERROR, "content": { - "error": "Unable to get initial jobs list", - "message": "check_workspace_jobs failed", - "code": -32000, "source": "ee2", + "request": "jc.start_job_status_loop", "name": "JSONRPCError", + "message": "check_workspace_jobs failed", + "error": "Unable to get initial jobs list", + "code": -32000, }, }, ) @@ -478,6 +515,7 @@ def test_start_job_status_loop__no_jobs_stop_loop(self): # --------------------- @mock.patch(CLIENTS, get_mock_client) + @mock.patch(TIME_NS, lambda: TEST_EPOCH_NS) def check_job_output_states( self, output_states=None, @@ -486,6 +524,7 @@ def check_job_output_states( response_type=STATUS, ok_states=[], error_states=[], + last_checked=TEST_EPOCH_NS, ): """ Handle any request that returns a dictionary of job state objects; this @@ -498,6 +537,7 @@ def check_job_output_states( :param params: params for the comm message (opt) :param ok_states: list of job IDs expected to be in the output :param error_states: list of job IDs expected to return a not found error + :param
last_checked: ts in ns """ if output_states is None: req_dict = make_comm_msg(request_type, params, False) @@ -512,6 +552,11 @@ def check_job_output_states( msg, ) + if response_type == STATUS: + self._check_pop_last_checked(output_states, last_checked) + else: + self.assertNotIn("last_checked", output_states) + for job_id, state in output_states.items(): self.assertEqual(ALL_RESPONSE_DATA[STATUS][job_id], state) if job_id in ok_states: @@ -529,6 +574,7 @@ def test_get_all_job_states__ok(self): # ----------------------- # Lookup single job state # ----------------------- + @mock.patch(TIME_NS, lambda: TEST_EPOCH_NS) def test_get_job_state__1_ok(self): output_states = self.jc.get_job_state(JOB_COMPLETED) self.check_job_output_states( @@ -541,9 +587,22 @@ def test_get_job_state__no_job(self): ): self.jc.get_job_state(None) + def test_lookup_job_state__live_ts(self): + output_states = self.jc.get_job_state(JOB_COMPLETED) + self.assertTrue( + ts_are_close(output_states["last_checked"], time.time_ns()) + ) + # ----------------------- # Lookup select job states # ----------------------- + def _check_pop_last_checked(self, output_states, last_checked=TEST_EPOCH_NS): + self.assertIn("last_checked", output_states) + self.assertTrue( + last_checked == output_states["last_checked"] + or ts_are_close(last_checked, output_states["last_checked"]) + ) + del output_states["last_checked"] def test_get_job_states__job_id__ok(self): self.check_job_output_states( @@ -598,6 +657,7 @@ def test_get_job_states__batch_id__not_batch(self): self.check_batch_id__not_batch_test(STATUS) @mock.patch(CLIENTS, get_mock_client) + @mock.patch(TIME_NS, lambda: TEST_EPOCH_NS) def test_get_job_states__job_id_list__ee2_error(self): exc = Exception("Test exception") exc_message = str(exc) @@ -612,10 +672,12 @@ def mock_check_jobs(params): self.jc._handle_comm_message(req_dict) msg = self.jc._comm.last_message + self._check_pop_last_checked(msg["content"], TEST_EPOCH_NS) + expected = {id: 
copy.deepcopy(ALL_RESPONSE_DATA[STATUS][id]) for id in ALL_JOBS} for job_id in ACTIVE_JOBS: # add in the ee2_error message - expected[job_id]["error"] = exc_message + expected[job_id]["error"] = OutputStateErrMsg.QUERY_EE2_STATES.value % (job_id, exc_message) self.assertEqual( { @@ -625,6 +687,80 @@ def mock_check_jobs(params): msg, ) + @mock.patch(CLIENTS, get_mock_client) + def test_get_job_states__err(self): + """ + """ + # what FE would say was the last time the jobs were checked + NOW = time.time_ns() + + # mix of terminal and not terminal + not_updated_ids = [JOB_COMPLETED, JOB_ERROR, JOB_TERMINATED, JOB_CREATED, JOB_RUNNING] + # not terminal + updated_ids = [BATCH_PARENT, BATCH_RETRY_RUNNING] + + # error ids + not_found_ids = [JOB_NOT_FOUND] + + job_ids = not_updated_ids + updated_ids + active_ids = list(set(job_ids) & set(ACTIVE_JOBS)) + + # output_states will be partitioned as + not_found_ids + terminal_ids = list(set(job_ids) - set(ACTIVE_JOBS)) + not_updated_active_ids = list(set(not_updated_ids) & set(active_ids)) + updated_active_ids = list(set(updated_ids) & set(active_ids)) # (yes, redundant) + + def mock_check_jobs(params): + """Update appropriate job states""" + lookup_ids = params["job_ids"] + self.assertCountEqual(active_ids, lookup_ids) # sanity check + + job_states_ret = get_test_jobs(lookup_ids) + for job_id, job_state in job_states_ret.items(): + # if job was updated, return an updated version + if job_id in updated_active_ids: + job_state["updated"] += 1 + return job_states_ret + + rq = make_comm_msg(STATUS, job_ids + not_found_ids, False, {"ts": NOW}) + with mock.patch.object(MockClients, "check_jobs", side_effect=mock_check_jobs): + output_states = self.jc._handle_comm_message(rq) + + # checks + exp_updated_output_states = { + job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) for job_id in updated_active_ids + } + for job_state in exp_updated_output_states.values(): + job_state["jobState"]["updated"] += 1 + + 
expected = { + # corresponding to not_found_ids + **{ + job_id: { + "job_id": job_id, + "error": OutputStateErrMsg.NOT_FOUND.value % job_id + } + for job_id in not_found_ids + }, + # corresponding to updated_active_ids + **exp_updated_output_states, + # corresponding to not_updated_active_ids and terminal_ids + **{ + job_id: { + "job_id": job_id, + "error": OutputStateErrMsg.NOT_UPDATED.value % (job_id, NOW) + } + for job_id in not_updated_active_ids + terminal_ids + }, + } + + self._check_pop_last_checked(output_states, NOW) + self.assertEqual( + expected, + output_states + ) + # ----------------------- # get cell job states # ----------------------- @@ -827,17 +963,20 @@ def test_cancel_jobs__job_id_list__all_bad_jobs(self): ) @mock.patch(CLIENTS, get_mock_client) + @mock.patch(TIME_NS, lambda: TEST_EPOCH_NS) def test_cancel_jobs__job_id_list__failure(self): # the mock client will throw an error with BATCH_RETRY_RUNNING job_id_list = [JOB_RUNNING, BATCH_RETRY_RUNNING] req_dict = make_comm_msg(CANCEL, job_id_list, False) output = self.jc._handle_comm_message(req_dict) print(output) + self._check_pop_last_checked(output) + expected = { JOB_RUNNING: ALL_RESPONSE_DATA[STATUS][JOB_RUNNING], BATCH_RETRY_RUNNING: { **ALL_RESPONSE_DATA[STATUS][BATCH_RETRY_RUNNING], - "error": CANCEL + " failed", + "error": OutputStateErrMsg.CANCEL.value % (BATCH_RETRY_RUNNING, CANCEL + " failed"), }, } @@ -1288,14 +1427,19 @@ def test_request_ok(self): rq.job_id_list def test_request_no_data(self): - rq_msg = {"msg_id": "some_id", "content": {}} - with self.assertRaisesRegex(JobRequestException, INVALID_REQUEST_ERR): - JobRequest(rq_msg) + rq_msg1 = {"msg_id": "some_id", "content": {}} + rq_msg2 = {"msg_id": "some_id", "content": {"data": {}}} + rq_msg3 = {"msg_id": "some_id", "content": {"data": None}} + rq_msg4 = {"msg_id": "some_id", "content": {"what": "?"}} + for msg in [rq_msg1, rq_msg2, rq_msg3, rq_msg4]: + with self.assertRaisesRegex(JobRequestException, INVALID_REQUEST_ERR): + 
JobRequest(msg) def test_request_no_req(self): - rq_msg = {"msg_id": "some_id", "content": {"data": {"request_type": None}}} - rq_msg2 = {"msg_id": "some_other_id", "content": {"data": {}}} - for msg in [rq_msg, rq_msg2]: + rq_msg1 = {"msg_id": "some_id", "content": {"data": {"request_type": None}}} + rq_msg2 = {"msg_id": "some_id", "content": {"data": {"request_type": ""}}} + rq_msg3 = {"msg_id": "some_id", "content": {"data": {"what": {}}}} + for msg in [rq_msg1, rq_msg2, rq_msg3]: with self.assertRaisesRegex(JobRequestException, MISSING_REQUEST_TYPE_ERR): JobRequest(msg) diff --git a/src/biokbase/narrative/tests/test_jobmanager.py b/src/biokbase/narrative/tests/test_jobmanager.py index a6735b01dd..9edb185280 100644 --- a/src/biokbase/narrative/tests/test_jobmanager.py +++ b/src/biokbase/narrative/tests/test_jobmanager.py @@ -7,10 +7,13 @@ from unittest import mock import re import os +from typing import List, Tuple +import time from IPython.display import HTML from biokbase.narrative.jobs.jobmanager import ( JobManager, + OutputStateErrMsg, JOB_NOT_REG_ERR, JOB_NOT_BATCH_ERR, JOBS_TYPE_ERR, @@ -27,9 +30,6 @@ NarrativeException, JobRequestException, ) - -from .util import ConfigTests - from biokbase.narrative.tests.job_test_constants import ( CLIENTS, JOB_COMPLETED, @@ -41,6 +41,7 @@ BATCH_TERMINATED, BATCH_TERMINATED_RETRIED, BATCH_ERROR_RETRIED, + BATCH_RETRY_RUNNING, JOB_NOT_FOUND, BAD_JOB_ID, ALL_JOBS, @@ -50,17 +51,18 @@ REFRESH_STATE, BATCH_CHILDREN, TEST_JOBS, + TEST_EPOCH_NS, get_test_job, + get_test_jobs, generate_error, ) - from biokbase.narrative.tests.generate_test_results import ( ALL_RESPONSE_DATA, JOBS_BY_CELL_ID, TEST_CELL_ID_LIST, TEST_CELL_IDs, ) - +from .util import ConfigTests from .narrative_mock.mockclients import ( get_mock_client, get_failing_mock_client, @@ -223,7 +225,7 @@ def test__construct_job_output_state_set__empty_list(self): @mock.patch(CLIENTS, get_mock_client) def test__construct_job_output_state_set__ee2_error(self): exc = 
Exception("Test exception") - exc_message = str(exc) + exc_msg = str(exc) def mock_check_jobs(params): raise exc @@ -236,9 +238,12 @@ def mock_check_jobs(params): for job_id in ALL_JOBS } - for job_id in ACTIVE_JOBS: + for job_id, meta_exc_msg in zip( + ACTIVE_JOBS, + OutputStateErrMsg.QUERY_EE2_STATES.gen_err_msg([ACTIVE_JOBS, exc_msg]) + ): # expect there to be an error message added - expected[job_id]["error"] = exc_message + expected[job_id]["error"] = meta_exc_msg self.assertEqual( expected, @@ -640,6 +645,67 @@ def test_get_job_states__empty(self): ): self.jm.get_job_states([]) + @mock.patch(CLIENTS, get_mock_client) + def test_get_job_states__last_updated(self): + """ + Test that only updated jobs return an actual state + and that the rest of the jobs return an error stub state + """ + # what FE would say was the last time the jobs were checked + NOW = time.time_ns() + + # mix of terminal and not terminal + not_updated_ids = [JOB_COMPLETED, JOB_ERROR, JOB_TERMINATED, JOB_CREATED, JOB_RUNNING] + # not terminal + updated_ids = [BATCH_PARENT, BATCH_RETRY_RUNNING] + + job_ids = not_updated_ids + updated_ids + active_ids = list(set(job_ids) & set(ACTIVE_JOBS)) + + # output_states will be partitioned as + terminal_ids = list(set(job_ids) - set(ACTIVE_JOBS)) + not_updated_active_ids = list(set(not_updated_ids) & set(active_ids)) + updated_active_ids = list(set(updated_ids) & set(active_ids)) # (yes, redundant) + + def mock_check_jobs(self_, params): + """Update appropriate job states""" + lookup_ids = params["job_ids"] + self.assertCountEqual(active_ids, lookup_ids) # sanity check + + job_states_ret = get_test_jobs(lookup_ids) + for job_id, job_state in job_states_ret.items(): + # if job was updated, return an updated version + if job_id in updated_active_ids: + job_state["updated"] += 1 + return job_states_ret + + with mock.patch.object(MockClients, "check_jobs", mock_check_jobs): + output_states = self.jm.get_job_states(job_ids, ts=NOW) + + updated_output_states 
= { + job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) for job_id in updated_active_ids + } + for job_state in updated_output_states.values(): + job_state["jobState"]["updated"] += 1 + + expected = { + # corresponding to updated_active_ids + **updated_output_states, + # corresponding to not_updated_active_ids and terminal_ids + **{ + job_id: { + "job_id": job_id, + "error": OutputStateErrMsg.NOT_UPDATED.value % (job_id, NOW) + } + for job_id in not_updated_active_ids + terminal_ids + } + } + + self.assertEqual( + expected, + output_states + ) + def test_update_batch_job__dne(self): with self.assertRaisesRegex( JobRequestException, f"{JOB_NOT_REG_ERR}: {JOB_NOT_FOUND}" @@ -740,6 +806,219 @@ def test_get_job_info(self): infos, {id: ALL_RESPONSE_DATA[MESSAGE_TYPE["INFO"]][id] for id in ALL_JOBS} ) + @mock.patch(CLIENTS, get_mock_client) + def test_add_errors_to_results__concat_errs__integrated(self): + active_ids = [JOB_CREATED, JOB_RUNNING] + terminal_ids = [JOB_COMPLETED] + job_ids = active_ids + terminal_ids + + check_jobs_err = "Something went wrong in EE2.check_jobs" + check_jobs_exc = RuntimeError(check_jobs_err) + cancel_job_errs = { + job_id: err + for job_id, err in zip( + job_ids, [f"EE2.check_job err {num}" for num in ["UNO", "DOS"]] + ) + } + + def mock_check_jobs(self, params): + raise check_jobs_exc + + def mock_cancel_job(self, job_id): + return NarrativeException( + None, cancel_job_errs[job_id], None, None, None + ) + + with mock.patch.object(MockClients, "check_jobs", mock_check_jobs): + with mock.patch.object(JobManager, "_cancel_job", mock_cancel_job): + output_states = self.jm.cancel_jobs(job_ids) + + exp = {job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) for job_id in job_ids} + for job_id in active_ids: + exp[job_id]["error"] = ( + f"A Job.query_ee2_states error occurred for job with ID {job_id}: {check_jobs_err}" + "\n" + f"An EE2.cancel_job error occurred for job with ID {job_id}: 
{cancel_job_errs[job_id]}" + ) + + self.assertEqual( + exp, + output_states + ) + + def test_add_errors_to_results__concat_errs__unit(self): + job_ids = ALL_JOBS + error_ids = [JOB_RUNNING, JOB_COMPLETED] + output_states = get_test_jobs(job_ids) + + check_jobs_err = "Test check_jobs exception" + cancel_job_errs = ["Test cancel_job exception UNO", "Test cancel_job exception DOS"] + + self.jm.add_errors_to_results( + output_states, error_ids, OutputStateErrMsg.QUERY_EE2_STATES, check_jobs_err + ) + self.jm.add_errors_to_results( + output_states, error_ids, OutputStateErrMsg.CANCEL, cancel_job_errs + ) + + for error_id, cancel_job_err in zip(error_ids, cancel_job_errs): + output_state = output_states[error_id] + self.assertIn("error", output_state) + self.assertEqual( + ( + f"A Job.query_ee2_states error occurred for job with ID {error_id}: {check_jobs_err}" + "\n" + f"An EE2.cancel_job error occurred for job with ID {error_id}: {cancel_job_err}" + ), + output_state["error"] + ) + + def test_add_errors_to_results__cannot_add_err(self): + job_ids = [JOB_RUNNING, JOB_COMPLETED] + error_ids = [JOB_CREATED] + output_states = get_test_jobs(job_ids) + + with self.assertRaisesRegex( + ValueError, f"Cannot add error because response dict is missing key {error_ids[0]}" + ): + self.jm.add_errors_to_results( + output_states, error_ids, OutputStateErrMsg.CANCEL, ["Test cancel_job exception`"] + ) + + +class OutputStateErrMsgTest(unittest.TestCase): + """ + Unit tests + """ + + JOB_IDS = [c + str(i) for c, i in zip(list("abc"), range(3))] + ERROR_IDS = JOB_IDS[1:] + CHECK_JOBS_ERR = "ee2.check_jobs rejection" + CANCEL_JOBS_ERR = [ + "ee2.cancel_job rejection UNO", "ee2.cancel_job rejection DOS" + ] + + maxDiff = None + + def get_orig_results(self): + return { + job_id: {"some": "random", "content": "with", "job_id": job_id} + for job_id in self.JOB_IDS + } + + def get_first_orig_result(self): + job_id = self.JOB_IDS[0] + return { + job_id: {"some": "random", "content": "with", 
"job_id": job_id} + } + + def add_errors_to_results( + self, results: dict, error_ids: List[str], error_enum: OutputStateErrMsg, *extra_its: Tuple + ): + """ + Strongly resembles jm.add_errors_to_results + But a pared down happy path method + """ + gen_err_msg = error_enum.gen_err_msg([error_ids] + list(extra_its)) + + for error_id, err_msg in zip(error_ids, gen_err_msg): + if error_enum.replace_result(): + results[error_id] = { + "job_id": error_id, + "error": err_msg, + } + else: + results[error_id].update( + {"error": err_msg} + ) + return results + + def test__NOT_FOUND(self): + results = self.add_errors_to_results( + self.get_orig_results(), self.ERROR_IDS, OutputStateErrMsg.NOT_FOUND + ) + self.assertEqual( + { + **self.get_first_orig_result(), + **{ + job_id: { + "job_id": job_id, + "error": f"Cannot find job with ID {job_id}" + } + for job_id in self.ERROR_IDS + } + }, + results + ) + + def test__NOT_UPDATED(self): + results = self.add_errors_to_results( + self.get_orig_results(), self.ERROR_IDS, OutputStateErrMsg.NOT_UPDATED, TEST_EPOCH_NS + ) + self.assertEqual( + { + **self.get_first_orig_result(), + **{ + job_id: { + "job_id": job_id, + "error": f"Job with ID {job_id} has not been updated since ts {TEST_EPOCH_NS}" + } + for job_id in self.ERROR_IDS + } + }, + results + ) + + def test__QUERY_EE2_STATES(self): + results = self.add_errors_to_results( + self.get_orig_results(), self.ERROR_IDS, OutputStateErrMsg.QUERY_EE2_STATES, self.CHECK_JOBS_ERR + ) + self.assertEqual( + { + **self.get_first_orig_result(), + **{ + job_id: { + "some": "random", "content": "with", "job_id": job_id, + "error": f"A Job.query_ee2_states error occurred for job with ID {job_id}: {self.CHECK_JOBS_ERR}" + } + for job_id in self.ERROR_IDS + } + }, + results + ) + + def test__CANCEL(self): + results = self.add_errors_to_results( + self.get_orig_results(), self.ERROR_IDS, OutputStateErrMsg.CANCEL, self.CANCEL_JOBS_ERR + ) + self.assertEqual( + { + **self.get_first_orig_result(), + 
**{ + job_id: { + "some": "random", "content": "with", "job_id": job_id, + "error": f"An EE2.cancel_job error occurred for job with ID {job_id}: {cancel_job_err}" + } + for job_id, cancel_job_err in zip(self.ERROR_IDS, self.CANCEL_JOBS_ERR) + } + }, + results + ) + + def test_gen_err_msg__wrong_type_arg(self): + with self.assertRaisesRegex( + TypeError, + "Argument its must be of type list" + ): + OutputStateErrMsg.NOT_FOUND.gen_err_msg(42) + + def test_gen_err_msg__wrong_num_format(self): + with self.assertRaisesRegex( + ValueError, + re.escape("OutputStateErrMsg.NOT_FOUND must be formatted with 1 argument(s). Received 2 argument(s)") + ): + OutputStateErrMsg.NOT_FOUND.gen_err_msg([self.ERROR_IDS, "extra_unused_format_arg"]) + if __name__ == "__main__": unittest.main()
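The tests above exercise `OutputStateErrMsg.gen_err_msg` and `OutputStateErrMsg.replace_result`, but the enum definition itself is not part of this excerpt. The following is a minimal sketch of an enum that would satisfy the messages and exceptions asserted in `OutputStateErrMsgTest`. It is an assumption reconstructed from the test expectations, not the actual code in `jobmanager.py`: the `%s` templates, the use of `itertools.repeat` for scalar arguments, and the membership test in `replace_result` are all illustrative choices.

```python
from enum import Enum
from itertools import repeat
from typing import Iterator


class OutputStateErrMsg(Enum):
    # Templates inferred from the strings asserted in the tests (assumption).
    NOT_FOUND = "Cannot find job with ID %s"
    NOT_UPDATED = "Job with ID %s has not been updated since ts %s"
    QUERY_EE2_STATES = "A Job.query_ee2_states error occurred for job with ID %s: %s"
    CANCEL = "An EE2.cancel_job error occurred for job with ID %s: %s"

    def replace_result(self) -> bool:
        # NOT_FOUND and NOT_UPDATED replace the whole result with an error stub;
        # the other members add an "error" key to an existing result.
        return self in (OutputStateErrMsg.NOT_FOUND, OutputStateErrMsg.NOT_UPDATED)

    def gen_err_msg(self, its: list) -> Iterator[str]:
        # Validate eagerly, so errors surface at call time rather than on iteration.
        if not isinstance(its, list):
            raise TypeError("Argument its must be of type list")
        n_expected = self.value.count("%s")
        if len(its) != n_expected:
            raise ValueError(
                f"{type(self).__name__}.{self.name} must be formatted with "
                f"{n_expected} argument(s). Received {len(its)} argument(s)"
            )
        # Lists are consumed element-wise; scalars are repeated for every job ID.
        # At least one element (in practice the job ID list) must be a list,
        # otherwise the zip below would never terminate.
        columns = [it if isinstance(it, list) else repeat(it) for it in its]
        return (self.value % args for args in zip(*columns))


msgs = list(OutputStateErrMsg.CANCEL.gen_err_msg([["a0", "b1"], ["err UNO", "err DOS"]]))
```

Returning a generator expression instead of writing `gen_err_msg` as a generator function keeps the argument checks eager, which is what `test_gen_err_msg__wrong_type_arg` relies on when it expects the `TypeError` from the call itself.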