Skip to content

Commit

Permalink
DATAUP-729
Browse files Browse the repository at this point in the history
add ts to STATUS requests
add last_checked to STATUS responses
unify error modes in response JSON
  • Loading branch information
n1mus committed Apr 16, 2022
1 parent 20428dd commit 542e952
Show file tree
Hide file tree
Showing 11 changed files with 1,305 additions and 471 deletions.
345 changes: 161 additions & 184 deletions docs/design/job_architecture.md

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion kbase-extension/static/kbase/config/job_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"CELL_ID": "cell_id",
"CELL_ID_LIST": "cell_id_list",
"JOB_ID": "job_id",
"JOB_ID_LIST": "job_id_list"
"JOB_ID_LIST": "job_id_list",
"TS": "ts"
},
"message_types": {
"CANCEL": "cancel_job",
Expand Down
106 changes: 65 additions & 41 deletions src/biokbase/narrative/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
"scheduler_type",
"scheduler_id",
]

EXCLUDED_JOB_STATE_FIELDS = JOB_INIT_EXCLUDED_JOB_STATE_FIELDS + ["job_input"]

OUTPUT_STATE_EXCLUDED_JOB_STATE_FIELDS = EXCLUDED_JOB_STATE_FIELDS + ["user", "wsid"]

EXTRA_JOB_STATE_FIELDS = ["batch_id", "child_jobs"]
Expand Down Expand Up @@ -100,7 +98,7 @@ def __init__(self, ee2_state, extra_data=None, children=None):
if ee2_state.get("job_id") is None:
raise ValueError("Cannot create a job without a job ID!")

self._acc_state = ee2_state
self._update_state(ee2_state)
self.extra_data = extra_data

# verify parent-children relationship
Expand Down Expand Up @@ -325,20 +323,30 @@ def _update_state(self, state: dict) -> None:
"""
given a state data structure (as emitted by ee2), update the stored state in the job object
"""
if state:
if not isinstance(state, dict):
raise TypeError("state must be a dict")

# Check job_id match
if self._acc_state:
if "job_id" in state and state["job_id"] != self.job_id:
raise ValueError(
f"Job ID mismatch in _update_state: job ID: {self.job_id}; state ID: {state['job_id']}"
)

state = copy.deepcopy(state)
if self._acc_state is None:
self._acc_state = state
else:
self._acc_state.update(state)
# Check if there would be no change in updating
# i.e., if every key/value pair in state is already present in self._acc_state
if self._acc_state is not None:
if {**self._acc_state, **state} == self._acc_state:
return

state = copy.deepcopy(state)
if self._acc_state is None:
self._acc_state = state
else:
self._acc_state.update(state)
self.last_updated = time.time_ns()

def state(self, force_refresh=False):
def state(self, force_refresh=False, exclude=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS):
"""
Queries the job service to see the state of the current job.
"""
Expand All @@ -347,47 +355,63 @@ def state(self, force_refresh=False):
state = self.query_ee2_state(self.job_id, init=False)
self._update_state(state)

return self._internal_state(JOB_INIT_EXCLUDED_JOB_STATE_FIELDS)
return self._internal_state(exclude)

def _internal_state(self, exclude=None):
"""
Return a deep copy of self._acc_state after passing it through self._trim_ee2_state.

Because the copy is taken before trimming, the object handed back to the
caller is independent of self._acc_state.

:param exclude: forwarded unchanged to self._trim_ee2_state; presumably the
    state fields to leave out of the result (TODO confirm against _trim_ee2_state)
:return: the copied state
"""
state = copy.deepcopy(self._acc_state)
# NOTE(review): the return value is discarded, so _trim_ee2_state is expected
# to modify the copy in place -- confirm against its definition
self._trim_ee2_state(state, exclude)
return state

def output_state(self, state=None) -> dict:
def output_state(self, state=None, no_refresh=False) -> dict:
"""
:param state: can be queried individually from ee2/cache with self.state(),
but sometimes want it to be queried in bulk from ee2 upstream
:return: dict, with structure
{
outputWidgetInfo: (if not finished, None, else...) job.get_viewer_params result
jobState: {
job_id: string,
status: string,
created: epoch ms,
updated: epoch ms,
queued: optional - epoch ms,
finished: optional - epoch ms,
terminated_code: optional - int,
tag: string (release, beta, dev),
parent_job_id: optional - string or null,
run_id: string,
cell_id: string,
errormsg: optional - string,
error (optional): {
code: int,
name: string,
message: string (should be for the user to read),
error: string, (likely a stacktrace)
},
error_code: optional - int
}
}
:param state: Supplied when the state is queried beforehand from EE2 in bulk,
or when it is retrieved from a cache. If not supplied, must be
queried with self.state() or self._internal_state()
:return: dict - with structure generally like (not accounting for error modes):
{
"job_id": string,
"jobState": {
"status": string - enum,
"created": epoch ms,
"updated": epoch ms,
"queued": epoch ms,
"running": epoch ms,
"finished": epoch ms,
"batch_job": bool,
"job_output": {
"version": string,
"result": [
{
"obj_ref": string,
"report_name": string,
"report_ref": string,
}
],
"id": string
},
"batch_id": string,
"child_jobs": list,
"retry_ids": list,
"retry_count": int,
"job_id": string,
"created": epoch ms
},
"outputWidgetInfo": { # None if not finished
"name": string,
"tag": string - (release, beta, dev),
"params": {
"wsName": string,
"obj_ref": string,
"report_name": string,
"report_ref": string
"report_window_line_height": string
}
}
}
"""
if not state:
state = self.state()
state = self._internal_state() if no_refresh else self.state()
else:
self._update_state(state)
state = self._internal_state()
Expand Down
Loading

0 comments on commit 542e952

Please sign in to comment.