diff --git a/doc/Makefile.am b/doc/Makefile.am index f6e922bae6c4..17c731598100 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -53,7 +53,8 @@ MAN1_FILES_PRIMARY = \ man1/flux-resource.1 \ man1/flux-pgrep.1 \ man1/flux-cancel.1 \ - man1/flux-watch.1 + man1/flux-watch.1 \ + man1/flux-update.1 # These files are generated as clones of a primary page. # Sphinx handles this automatically if declared in the conf.py diff --git a/doc/man1/flux-update.rst b/doc/man1/flux-update.rst new file mode 100644 index 000000000000..2dc8cacc601e --- /dev/null +++ b/doc/man1/flux-update.rst @@ -0,0 +1,102 @@ +.. flux-help-section: jobs + +============== +flux-update(1) +============== + +SYNOPSIS +======== + +**flux** **update** [*OPTIONS*] JOBID KEY=VALUE [KEY=VALUE...] + +DESCRIPTION +=========== + +flux-update(1) requests an update of one or more attributes for an active +(pending or running) job. Updates are permitted and validated by the job +manager before being applied to the job. + +Keys are expressed as period-delimited strings corresponding to an attribute +path in jobspec. If a key does not start with ``attributes.``, ``tasks.``, +or ``.resources`` (the top-level jobspec keys allowed by RFC 14), then +the key is assumed to be prefixed with ``attributes.system.``, such that:: + + $ flux update f12345 myattr="value" + +would request an update of ``attributes.system.myattr`` to the string value +``"value"``. + +The flux-update(1) command may also support other convenient key aliases. +Key aliases are listed in the SPECIAL KEYS section below. + +Updates will be sent to the job manager update service, which checks that +the current user is permitted to apply all updates, and that all updates +are valid. If multiple updates are specified on the command line, then +all updates are either applied or the request fails. + +.. note:: + Job updates are allowed in the job manager by special plugins on + a case by case basis. At this time, the set of keys that can actually + be updated for a job may be very limited. + +The instance owner may be allowed to update specific attributes of jobs +and bypass validation checks. For example, the duration of a guest job may +be increased beyond currently configured limits if the update request is +performed by the instance owner. When a job is modified in this way, future +updates to the job by the guest user are denied with an error message:: + + job is immutable due to previous instance owner update + +This is necessary to prevent possible unintended bypass of limits or +other checks on a job by a guest. + +The flux-update(1) command may also support special handling of values +for specific keys. Those special cases are documented in the SPECIAL KEYS +section below. + +OPTIONS +======= + +**-n, --dry-run** + Do not send update to job manager, but print the updates in JSON to + stdout. + +**-v, --verbose** + Print updated keys on success. + +SPECIAL KEYS +============ + +*attributes.system.duration*, *duration* + Updates of the job ``duration`` can take the form of of *[+-]FSD*, where + ``+`` or ``-`` indicate an adjustment of the existing duration, and *FSD* + is any string or number in RFC 23 Flux Standard Duration. Examples include + ``60``, ``1m``, ``1.5h``, ``+10m``, ``-1h``. + +*name* + Alias for job name, i.e. ``attributes.system.job.name`` + +EXIT STATUS +=========== + +0 + All updates were successful + +1 + Updates were invalid or not permitted, or the provided JOBID was invalid + or inactive, or the user does not have permission to modify the job + +2 + Syntax or other command line error + +RESOURCES +========= + +Flux: http://flux-framework.org + +RFC 14: Canonical Job Specification: https://flux-framework.readthedocs.io/projects/flux-rfc/en/latest/spec_14.html + +SEE ALSO +======== + +:man1:`flux-jobs`, :man1:`flux-submit`, :man1:`flux-bulksubmit` diff --git a/doc/manpages.py b/doc/manpages.py index 5f3d3b8b95dc..a98885ab8182 100644 --- a/doc/manpages.py +++ b/doc/manpages.py @@ -68,6 +68,7 @@ ('man1/flux', 'flux', 'the Flux resource management framework', [author], 1), ('man1/flux-shell', 'flux-shell', 'the Flux job shell', [author], 1), ('man1/flux-watch', 'flux-watch', 'monitor one or more Flux jobs', [author], 1), + ('man1/flux-update', 'flux-update', 'update active Flux jobs', [author], 1), ('man3/flux_attr_get', 'flux_attr_set', 'get/set Flux broker attributes', [author], 3), ('man3/flux_attr_get', 'flux_attr_get', 'get/set Flux broker attributes', [author], 3), ('man3/flux_aux_set', 'flux_aux_get', 'get/set auxiliary handle data', [author], 3), diff --git a/doc/test/spell.en.pws b/doc/test/spell.en.pws index d3de87aefdae..2e66068caa87 100644 --- a/doc/test/spell.en.pws +++ b/doc/test/spell.en.pws @@ -717,3 +717,4 @@ MemoryHigh MemoryMax MemoryLow MemoryMin +myattr diff --git a/etc/completions/flux.pre b/etc/completions/flux.pre index 26adc2ee0025..03f048453c73 100644 --- a/etc/completions/flux.pre +++ b/etc/completions/flux.pre @@ -159,6 +159,29 @@ _flux_cancel() return 0 } +# flux-update(1) completions +_flux_update() +{ + local cmd=$1 + OPTS="\ + -v, --verbose \ + -n, --dry-run \ + " + if [[ $cur != -* ]]; then + # Attempt to substitute a pending jobid + compopt +o filenames + pending_jobs=$(flux jobs -f pending -no {id}) + COMPREPLY=( $(compgen -W "${pending_jobs}" -- "$cur") ) + return 0 + fi + COMPREPLY=( $(compgen -W "${OPTS} -h --help" -- "$cur") ) + if [[ "${COMPREPLY[@]}" == *= ]]; then + # Add space if there is not a '=' in suggestions + compopt -o nospace + fi + return 0 +} + # flux-mini(1) completions _flux_mini() { @@ -1474,6 +1497,9 @@ _flux_core() watch) _flux_watch $subcmd ;; + update) + _flux_update $subcmd + ;; -*) COMPREPLY=( $(compgen -W "${FLUX_OPTS}" -- "$cur") ) ;; diff --git a/src/bindings/python/flux/job/Jobspec.py b/src/bindings/python/flux/job/Jobspec.py index 6b60b27f4609..5f0f503922e7 100644 --- a/src/bindings/python/flux/job/Jobspec.py +++ b/src/bindings/python/flux/job/Jobspec.py @@ -533,12 +533,29 @@ def add_file(self, path, data, perms=0o0600, encoding=None): files[path] = Fileref(data, perms=perms, encoding=encoding) self.jobspec["attributes"]["system"]["files"] = files - def setattr(self, key, val): + def getattr(self, key): + """ + get attribute from jobspec using dotted key notation, e.g. + system.duration or optionally attributes.system.duration. + Raises KeyError if a component of key does not exist. + """ + if not key.startswith("attributes."): + key = "attributes." + key + value = self.jobspec + for attr in key.split("."): + value = value.get(attr) + if value is None: + raise KeyError + return value + + def setattr(self, key, val): """ set job attribute """ - set_treedict(self.jobspec, "attributes." + key, val) + if not key.startswith("attributes."): + key = "attributes." + key + set_treedict(self.jobspec, key, val) def setattr_shell_option(self, key, val): """ diff --git a/src/cmd/Makefile.am b/src/cmd/Makefile.am index 88732265bba1..1b8db7bf641a 100644 --- a/src/cmd/Makefile.am +++ b/src/cmd/Makefile.am @@ -115,6 +115,7 @@ dist_fluxcmd_SCRIPTS = \ flux-queue.py \ flux-cancel.py \ flux-watch.py \ + flux-update.py \ flux-imp-exec-helper fluxcmd_PROGRAMS = \ diff --git a/src/cmd/flux-update.py b/src/cmd/flux-update.py new file mode 100755 index 000000000000..91c3426a1419 --- /dev/null +++ b/src/cmd/flux-update.py @@ -0,0 +1,225 @@ +############################################################## +# Copyright 2023 Lawrence Livermore National Security, LLC +# (c.f. AUTHORS, NOTICE.LLNS, COPYING) +# +# This file is part of the Flux resource manager framework. +# For details, see https://github.com/flux-framework. +# +# SPDX-License-Identifier: LGPL-3.0 +############################################################## + +import argparse +import json +import logging +import math +import sys + +import flux +import flux.job +import flux.util + +LOGGER = logging.getLogger("flux-update") + + +class JobspecUpdates: + """ + Convenience class for building a jobspec-update payload from a + set of KEY=VALUE pairs on the command line, and a method to send + the update as a request to the job manager. + """ + + # Mapping of short key names, i.e. as given on the command line, + # to full dotted-path location in jobspec. + # + # Note: If a key doesn't exist in this mapping, but also does not start + # with 'attributes.', 'resources.' or 'tasks.', then 'attributes.system' + # is assumed. + # + key_aliases = {"name": "attributes.system.job.name"} + + def __init__(self, jobid, flux_handle=None): + self._flux_handle = flux_handle + self.jobid = jobid + self.updates = None + self.jobspec = None + + @property + def flux_handle(self): + if self._flux_handle is None: + self._flux_handle = flux.Flux() + return self._flux_handle + + def _apply_jobspec_updates(self, eventlog): + """ + Apply jobspec updates from eventlog to internal jobspec: + """ + for entry in eventlog.splitlines(): + event = flux.job.EventLogEvent(entry) + if event.name == "jobspec-update": + for key, value in event.context.items(): + self.jobspec.setattr(key, value) + + def _fetch_jobspec(self, key): + """ + Fetch dotted key 'key' in jobspec for this job, fetching jobspec + and eventlog (to apply jobspec-updates) if necessary. + """ + if self.jobspec is None: + lookup = flux.job.job_kvs_lookup( + self.flux_handle, jobid=self.jobid, keys=["jobspec", "eventlog"] + ) + self.jobspec = flux.job.JobspecV1(**lookup["jobspec"]) + self._apply_jobspec_updates(lookup["eventlog"]) + + return self.jobspec.getattr(key) + + def update_attributes_system_duration(self, value): + """ + Handle a duration update. + + If update begins with "+" or "-", then get duration from jobspec and + increase or decrease by the amount of the remaining argument. O/w, + treat value as an explicit new duration. + """ + result = None + if value.startswith(("-", "+")): + # relative update, fetch value first + duration = self._fetch_jobspec("attributes.system.duration") + if duration == 0: + raise ValueError( + f"duration for {self.jobid} is unlimited, " + f"can't update by {value}" + ) + + arg = flux.util.parse_fsd(value[1:]) + if value.startswith("-"): + result = duration - arg + if result <= 0.0: + duration = flux.util.fsd(duration) + raise ValueError( + f"current duration for {self.jobid} ({duration})" + f" cannot be reduced by {value[1:]}" + ) + else: + result = duration + arg + else: + result = flux.util.parse_fsd(value) + + # An unlimited duration is represented as 0. in jobspec, so + # check for infinity here and replace with 0. + # + if math.isinf(result): + result = 0.0 + return result + + def add_update(self, key, value): + """ + Append an update to the current updates object. + """ + if self.updates is None: + self.updates = {} + + # Handle any special keys aliases + if key in self.key_aliases: + key = self.key_aliases[key] + + # If key doesn't start with attributes, resources, or tasks, + # assume 'attributes.system.' for convenience: + if not key.startswith(("attributes.", "resources.", "tasks.")): + key = f"attributes.system.{key}" + try: + # Use any function update_attributes_system_blah() if + # registered to process the value: + # + function_signature = "update_" + key.replace(".", "_") + value = getattr(self, function_signature)(value) + except AttributeError: + # Otherwise, attempt to load value as JSON: + # + try: + value = json.loads(value) + except json.decoder.JSONDecodeError: + # Otherwise, load value as string: + # + value = str(value) + self.updates[key] = value + + def items(self): + """ + Convenience wrapper to return a copy of the current update + dictionary key, value pairs + """ + return self.updates.items() + + def to_json(self): + return json.dumps(self.updates) + + def send_rpc(self): + payload = {"id": self.jobid, "updates": self.updates} + return self.flux_handle.rpc("job-manager.update", payload) + + +def parse_args(): + parser = argparse.ArgumentParser( + prog="flux-update", formatter_class=flux.util.help_formatter() + ) + parser.add_argument( + "-n", + "--dry-run", + action="store_true", + help="Do not apply any updates, just emit update payload to stdout", + ) + parser.add_argument( + "-v", + "--verbose", + action="store_true", + default=0, + help="Be more verbose. Log updated items after success.", + ) + parser.add_argument( + "jobid", + metavar="JOBID", + type=flux.job.JobID, + help="Target jobid", + ) + parser.add_argument( + "updates", + metavar="KEY=VALUE", + type=str, + nargs="+", + help="Requested jobspec updates in KEY=VALUE form", + ) + return parser.parse_args() + + +@flux.util.CLIMain(LOGGER) +def main(): + sys.stdout = open( + sys.stdout.fileno(), "w", encoding="utf8", errors="surrogateescape" + ) + sys.stderr = open( + sys.stderr.fileno(), "w", encoding="utf8", errors="surrogateescape" + ) + + args = parse_args() + + updates = JobspecUpdates(args.jobid) + + for arg in args.updates: + key, _, value = arg.partition("=") + updates.add_update(key, value) + + if args.dry_run: + print(updates.to_json()) + sys.exit(0) + + updates.send_rpc().get() + if args.verbose: + for key, value in updates.items(): + LOGGER.info(f"updated {key} to {value}") + + +if __name__ == "__main__": + main() + +# vi: ts=4 sw=4 expandtab diff --git a/src/modules/job-manager/Makefile.am b/src/modules/job-manager/Makefile.am index 6646fbe716ea..5385fc31f259 100644 --- a/src/modules/job-manager/Makefile.am +++ b/src/modules/job-manager/Makefile.am @@ -65,11 +65,14 @@ libjob_manager_la_SOURCES = \ jobtap-internal.h \ jobtap.h \ jobtap.c \ + update.h \ + update.c \ plugins/priority-default.c \ plugins/limit-job-size.c \ plugins/limit-duration.c \ plugins/dependency-after.c \ plugins/begin-time.c \ + plugins/update-duration.c \ plugins/validate-duration.c \ plugins/history.c diff --git a/src/modules/job-manager/event.c b/src/modules/job-manager/event.c index 9b7d3894fec6..3d4024fd3876 100644 --- a/src/modules/job-manager/event.c +++ b/src/modules/job-manager/event.c @@ -43,7 +43,6 @@ #include "src/common/libczmqcontainers/czmq_containers.h" #include "src/common/libeventlog/eventlog.h" #include "src/common/libutil/errno_safe.h" -#include "src/common/libutil/jpath.h" #include "src/common/libjob/idf58.h" #include "ccan/ptrint/ptrint.h" #include "ccan/str/str.h" @@ -553,17 +552,11 @@ static int event_handle_dependency (struct job *job, */ static int event_handle_jobspec_update (struct job *job, json_t *context) { - const char *path; - json_t *val; - if (!job->jobspec_redacted || job->state == FLUX_JOB_STATE_RUN - || job->state == FLUX_JOB_STATE_CLEANUP) + || job->state == FLUX_JOB_STATE_CLEANUP + || job_apply_jobspec_updates (job, context) < 0) return -1; - json_object_foreach (context, path, val) { - if (jpath_set (job->jobspec_redacted, path, val) < 0) - return -1; - } return 0; } @@ -675,6 +668,13 @@ int event_job_update (struct job *job, json_t *event) else if (streq (name, "jobspec-update")) { if (event_handle_jobspec_update (job, context) < 0) goto inval; + /* Transition a job in SCHED state back to PRIORITY to trigger + * possible recalculation of job priority, update scheduler with + * new jobspec, etc. Job will transition back to SCHED after a + * priority is assigned. + */ + if (job->state == FLUX_JOB_STATE_SCHED) + job->state = FLUX_JOB_STATE_PRIORITY; } else if (strstarts (name, "dependency-")) { if (job->state == FLUX_JOB_STATE_DEPEND @@ -813,6 +813,33 @@ static int event_jobtap_call (struct event *event, name, idf58 (job->id)); + /* + * Notify plugins not subscribed to all events of a jobspec update + * since this is a more common case. + * + * This callback should occur before the state transition callback + * below, since jobspec-update will transition a job in SCHED back to + * PRIORITY, and plugins should be notified of the jobspec changes + * *before* the `job.state.priority` callback to allow for adjustment + * of internal state normally established before the first time the + * job.state.priority topic is called. + */ + if (streq (name, "jobspec-update")) { + json_t *updates; + if (json_unpack (entry, "{s:o}", "context", &updates) < 0) { + flux_log (event->ctx->h, + LOG_ERR, + "unable to unpack jobspec-update contexto for %s", + idf58 (job->id)); + return -1; + } + (void) jobtap_call (event->ctx->jobtap, + job, + "job.update", + "{s:O}", + "updates", updates); + } + if (job->state != old_state) { /* * Call plugin callback on state change diff --git a/src/modules/job-manager/job-manager.c b/src/modules/job-manager/job-manager.c index 6f75cb8d31db..058a388bfeca 100644 --- a/src/modules/job-manager/job-manager.c +++ b/src/modules/job-manager/job-manager.c @@ -34,6 +34,7 @@ #include "annotate.h" #include "journal.h" #include "getattr.h" +#include "update.h" #include "jobtap-internal.h" #include "job-manager.h" @@ -217,6 +218,10 @@ int mod_main (flux_t *h, int argc, char **argv) flux_log_error (h, "error creating journal interface"); goto done; } + if (!(ctx.update = update_ctx_create (&ctx))) { + flux_log_error (h, "error creating job update interface"); + goto done; + } if (!(ctx.jobtap = jobtap_create (&ctx))) { flux_log (h, LOG_ERR, "error creating jobtap interface"); goto done; @@ -250,6 +255,7 @@ int mod_main (flux_t *h, int argc, char **argv) alloc_ctx_destroy (ctx.alloc); submit_ctx_destroy (ctx.submit); event_ctx_destroy (ctx.event); + update_ctx_destroy (ctx.update); /* job aux containers may call destructors in jobtap plugins, so destroy * jobs before unloading plugins; but don't destroy job hashes until after. */ diff --git a/src/modules/job-manager/job-manager.h b/src/modules/job-manager/job-manager.h index b1d281e1780b..fa769425c7d7 100644 --- a/src/modules/job-manager/job-manager.h +++ b/src/modules/job-manager/job-manager.h @@ -33,6 +33,7 @@ struct job_manager { struct journal *journal; struct purge *purge; struct queue *queue; + struct update *update; struct jobtap *jobtap; }; diff --git a/src/modules/job-manager/job.c b/src/modules/job-manager/job.c index 145dc573806c..ece03198fc04 100644 --- a/src/modules/job-manager/job.c +++ b/src/modules/job-manager/job.c @@ -112,6 +112,10 @@ static int job_flag_set_internal (struct job *job, if (!dry_run) job->flags |= FLUX_JOB_DEBUG; } + else if (streq (flag, "immutable")) { + if (!dry_run) + job->immutable = 1; + } else { errno = EINVAL; return -1; @@ -588,6 +592,62 @@ const char *job_event_queue_print (struct job *job, char *buf, int size) return buf; } +bool validate_jobspec_updates (json_t *updates) +{ + const char *key; + json_t *entry; + json_object_foreach (updates, key, entry) { + if (!strstarts (key, "attributes.")) + return false; + } + return true; +} + +static int jobspec_apply_updates (json_t *jobspec, json_t *updates) +{ + const char *path; + json_t *val; + + if (!jobspec) { + errno = EINVAL; + return -1; + } + json_object_foreach (updates, path, val) { + if (jpath_set (jobspec, path, val) < 0) + return -1; + } + return 0; +} + +int job_apply_jobspec_updates (struct job *job, json_t *updates) +{ + if (jobspec_apply_updates (job->jobspec_redacted, updates) < 0 + || jobspec_redacted_parse_queue (job) < 0) + return -1; + return 0; +} + +json_t *job_jobspec_with_updates (struct job *job, json_t *updates) +{ + json_t *jobspec; + + if (!job->jobspec_redacted) { + errno = EAGAIN; + return NULL; + } + if (!(jobspec = json_deep_copy (job->jobspec_redacted))) { + errno = ENOMEM; + return NULL; + } + if (jobspec_apply_updates (jobspec, updates) < 0) { + int saved_errno = errno; + json_decref (jobspec); + errno = saved_errno; + return NULL; + } + return jobspec; +} + /* * vi:tabstop=4 shiftwidth=4 expandtab */ diff --git a/src/modules/job-manager/job.h b/src/modules/job-manager/job.h index a7907b2019d5..71d1130eeb42 100644 --- a/src/modules/job-manager/job.h +++ b/src/modules/job-manager/job.h @@ -46,6 +46,7 @@ struct job { uint8_t reattach:1; uint8_t eventlog_readonly:1;// job is inactive or invalid uint8_t hold_events:1; // queue events instead of posting immediately + uint8_t immutable:1; // user job updates are disabled uint8_t perilog_active; // if nonzero, prolog/epilog active @@ -127,6 +128,18 @@ int job_event_peek (struct job *job, int *flagsp, json_t **entryp); bool job_event_is_queued (struct job *job, const char *name); const char *job_event_queue_print (struct job *job, char *buf, int size); +/* Validate updates as valid RFC 21 jobspec-update event context: + */ +bool validate_jobspec_updates (json_t *updates); + +/* Apply updates to jobspec + */ +int job_apply_jobspec_updates (struct job *job, json_t *updates); + +/* Return a copy of the jobspec for 'job' with 'updates' applied. + */ +json_t *job_jobspec_with_updates (struct job *job, json_t *updates); + #endif /* _FLUX_JOB_MANAGER_JOB_H */ /* diff --git a/src/modules/job-manager/jobtap-internal.h b/src/modules/job-manager/jobtap-internal.h index a18e28ac528f..aa94a190223a 100644 --- a/src/modules/job-manager/jobtap-internal.h +++ b/src/modules/job-manager/jobtap-internal.h @@ -63,6 +63,37 @@ int jobtap_check_dependencies (struct jobtap *jobtap, bool raise_exception, char **errp); +/* Call `job.update.` callback to verify that a jobspec update of + * 'key' to 'value' is allowed. The flux_msg_cred parameter should be set + * to the credentials of the original requestor. + * + * Returns an error with 'errp' (Caller must free) set if no plugin is + * registered to handle updates of 'key', or if the callback returned an + * error. + * + * If the update needs further validation via `job.validate`, then + * needs_validation will be set nonzero. The caller should be sure to pass + * the updated jobspec to `job.validate` before posting updates to the job + * eventlog. + */ +int jobtap_job_update (struct jobtap *jobtap, + struct flux_msg_cred cred, + struct job *job, + const char *key, + json_t *value, + int *needs_validation, + char **errp); + +/* Call the `job.validate` plugin stack, but using an updated jobspec by + * applying 'updates' to 'job'. + * + * If validation fails, then this function will return -1 with the error + * set in 'errp' (Caller must free). + */ +int jobtap_validate_updates (struct jobtap *jobtap, + struct job *job, + json_t *updates, + char **errp); /* Load a new jobtap from `path`. Path may start with `builtin.` to * attempt to load one of the builtin jobtap plugins. diff --git a/src/modules/job-manager/jobtap.c b/src/modules/job-manager/jobtap.c index 64750474b9e3..fd2df12cb761 100644 --- a/src/modules/job-manager/jobtap.c +++ b/src/modules/job-manager/jobtap.c @@ -49,6 +49,7 @@ extern int limit_duration_plugin_init (flux_plugin_t *p); extern int after_plugin_init (flux_plugin_t *p); extern int begin_time_plugin_init (flux_plugin_t *p); extern int validate_duration_plugin_init (flux_plugin_t *p); +extern int update_duration_plugin_init (flux_plugin_t *p); extern int history_plugin_init (flux_plugin_t *p); struct jobtap_builtin { @@ -63,6 +64,7 @@ static struct jobtap_builtin jobtap_builtins [] = { { ".dependency-after", after_plugin_init }, { ".begin-time", &begin_time_plugin_init }, { ".validate-duration", &validate_duration_plugin_init }, + { ".update-duration", &update_duration_plugin_init }, { ".history", &history_plugin_init }, { 0 }, }; @@ -2110,20 +2112,6 @@ int flux_jobtap_event_post_pack (flux_plugin_t *p, return rc; } -/* RFC 21 jobspec-update event keys must start with "attributes." - * Reject update events with keys that violate the RFC. - */ -static bool validate_jobspec_updates (json_t *o) -{ - const char *key; - json_t *entry; - json_object_foreach (o, key, entry) { - if (!strstarts (key, "attributes.")) - return false; - } - return true; -} - int flux_jobtap_jobspec_update_pack (flux_plugin_t *p, const char *fmt, ...) { int rc = -1; @@ -2322,6 +2310,140 @@ int flux_jobtap_epilog_finish (flux_plugin_t *p, status); } +int jobtap_job_update (struct jobtap *jobtap, + struct flux_msg_cred cred, + struct job *job, + const char *key, + json_t *value, + int *needs_validation, + char **errp) +{ + int rc = -1; + char topic[128]; + int topiclen = sizeof (topic); + flux_plugin_arg_t *args = NULL; + + if (snprintf (topic, topiclen, "job.update.%s", key) >= topiclen) { + error_asprintf (jobtap, job, errp, + "topic string overflow"); + return -1; + } + + if (!(args = jobtap_args_create (jobtap, job)) + || flux_plugin_arg_pack (args, + FLUX_PLUGIN_ARG_IN, + "{s:{s:I s:I} s:s s:O}", + "cred", + "userid", (json_int_t) cred.userid, + "rolemask", (json_int_t) cred.rolemask, + "key", key, + "value", value) < 0 + || flux_plugin_arg_set (args, FLUX_PLUGIN_ARG_OUT, "{}") < 0) { + error_asprintf (jobtap, job, errp, + "jobtap_job_update: failed to create args"); + flux_plugin_arg_destroy (args); + return -1; + } + rc = jobtap_stack_call (jobtap, jobtap->plugins, job, topic, args); + if (rc == 0) { + /* No plugin handles update of this jobspec key, reject the update. + */ + error_asprintf (jobtap, job, errp, "update of %s not supported", key); + rc = -1; + } + else if (rc < 0) { + /* Callback failed, check for provided errmsg */ + const char *errmsg; + if (flux_plugin_arg_unpack (args, + FLUX_PLUGIN_ARG_OUT, + "{s:s}", + "errmsg", &errmsg) < 0) { + errmsg = "update rejected by job-manager plugin"; + } + error_asprintf (jobtap, job, errp, "%s", errmsg); + errno = EPERM; + } + else if (rc > 0 && needs_validation != NULL) { + /* Default is to require further validation by calling job.validate + * with the updated jobspec. However, a plugin may note that the + * update is already validated or should bypass validation by + * setting "validated" in the plugin OUT arguments to a nonzero + * value. + */ + int validated = 0; + if ((rc = flux_plugin_arg_unpack (args, + FLUX_PLUGIN_ARG_OUT, + "{s?i}", + "validated", &validated) < 0)) { + error_asprintf (jobtap, + job, + errp, + "failed to unpack validated flag"); + fprintf (stderr, "arg unpack failed: %s\n", + flux_plugin_arg_strerror (args)); + return -1; + } + *needs_validation = !validated; + } + flux_plugin_arg_destroy (args); + return rc; +} + +int jobtap_validate_updates (struct jobtap *jobtap, + struct job *job, + json_t *updates, + char **errp) +{ + int rc = -1; + json_t *jobspec_updated = NULL; + flux_plugin_arg_t *args = NULL; + + if (!(jobspec_updated = job_jobspec_with_updates (job, updates))) { + error_asprintf (jobtap, job, errp, "update: %s", strerror (errno)); + goto error; + } + + /* Create plugin args, then override jobspec with updated version + */ + if (!(args = jobtap_args_create (jobtap, job)) + || flux_plugin_arg_pack (args, + FLUX_PLUGIN_ARG_IN, + "{s:O}", + "jobspec", jobspec_updated) < 0) { + error_asprintf (jobtap, job, errp, "update: %s", + flux_plugin_arg_strerror (args)); + goto error; + } + + /* Call validation stack + */ + rc = jobtap_stack_call (jobtap, + jobtap->plugins, + job, + "job.validate", + args); + + if (rc < 0) { + const char *errmsg; + /* + * Plugin callback failed, check for errmsg for this job + * If plugin did not provide an error message, then construct + * a generic error "rejected by plugin". + */ + if (flux_plugin_arg_unpack (args, + FLUX_PLUGIN_ARG_OUT, + "{s:s}", + "errmsg", &errmsg) < 0) + errmsg = "rejected by job-manager plugin"; + if ((*errp = strdup (errmsg)) == NULL) + flux_log (jobtap->ctx->h, LOG_ERR, + "jobtap: validate failed to capture errmsg"); + } +error: + ERRNO_SAFE_WRAP (json_decref, jobspec_updated); + flux_plugin_arg_destroy (args); + return rc; +} /* * vi:tabstop=4 shiftwidth=4 expandtab diff --git a/src/modules/job-manager/plugins/limit-duration.c b/src/modules/job-manager/plugins/limit-duration.c index f5bf1aba31ab..ef8719635954 100644 --- a/src/modules/job-manager/plugins/limit-duration.c +++ b/src/modules/job-manager/plugins/limit-duration.c @@ -247,9 +247,6 @@ static int validate_cb (flux_plugin_t *p, goto error; } - if (state != FLUX_JOB_STATE_NEW) // flux restart or plugin reload - return 0; - if (check_limit (ctx, duration, queue, &error) < 0) goto error; diff --git a/src/modules/job-manager/plugins/limit-job-size.c b/src/modules/job-manager/plugins/limit-job-size.c index 4dd3b525bb8e..a27cc49161dc 100644 --- a/src/modules/job-manager/plugins/limit-job-size.c +++ b/src/modules/job-manager/plugins/limit-job-size.c @@ -375,12 +375,6 @@ static int validate_cb (flux_plugin_t *p, goto error; } - /* Jobs that have already been accepted must bypass the limits check. - * This occurs when Flux is restarted with pending jobs in the KVS. - */ - if (state != FLUX_JOB_STATE_NEW) - return 0; - if (jj_get_counts_json (jobspec, &counts) < 0) { errprintf (&error, "%s", counts.error); goto error; diff --git a/src/modules/job-manager/plugins/update-duration.c b/src/modules/job-manager/plugins/update-duration.c new file mode 100644 index 000000000000..6a844f28219a --- /dev/null +++ b/src/modules/job-manager/plugins/update-duration.c @@ -0,0 +1,88 @@ +/************************************************************\ + * Copyright 2023 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* update-duration.c - allow updates of attributes.system.duration for jobs + * + * This plugin implements a 'job.update.attributes.system.duration' + * callback to enable duration updates for pending jobs. + */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#include + +/* + * Allow instance owner to update duration to any value, even if it + * exceeds a configured duration limit. By default, this is true, to + * disable this behavior, reload the `.update-duration` plugin with + * owner-allow-any=0. + */ +static int owner_allow_any = 1; + +static int duration_update_cb (flux_plugin_t *p, + const char *topic, + flux_plugin_arg_t *args, + void *arg) +{ + struct flux_msg_cred cred; + flux_job_state_t state; + double duration; + + if (flux_plugin_arg_unpack (args, + FLUX_PLUGIN_ARG_IN, + "{s:F s:i s:{s:i s:i}}", + "value", &duration, + "state", &state, + "cred", + "userid", &cred.userid, + "rolemask", &cred.rolemask) < 0) { + flux_jobtap_error (p, args, "plugin args unpack failed"); + return -1; + } + if (duration < 0.) { + flux_jobtap_error (p, args, "duration must not be negative"); + return -1; + } + if (state == FLUX_JOB_STATE_RUN + || state == FLUX_JOB_STATE_CLEANUP) { + flux_jobtap_error (p, + args, + "update of duration for running job not supported"); + return -1; + } + if ((cred.rolemask & FLUX_ROLE_OWNER) && owner_allow_any) { + /* If owner is allowed to make any duration adjustment, then + * report that value is validated via out arguments: + */ + flux_plugin_arg_pack (args, + FLUX_PLUGIN_ARG_OUT, + "{s:i}", + "validated", 1); + } + return 0; +} + +int update_duration_plugin_init (flux_plugin_t *p) +{ + flux_plugin_conf_unpack (p, + "{s:i}", + "owner-allow-any", + &owner_allow_any); + return flux_plugin_add_handler (p, + "job.update.attributes.system.duration", + duration_update_cb, + NULL); +} + +// vi:ts=4 sw=4 expandtab diff --git a/src/modules/job-manager/update.c b/src/modules/job-manager/update.c new file mode 100644 index 000000000000..286a5d4406f7 --- /dev/null +++ b/src/modules/job-manager/update.c @@ -0,0 +1,270 @@ +/************************************************************\ + * Copyright 2023 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* update - handle job update requests + * + * UPDATE REQUEST + * + * An update request payload consists of a jobid and dictionary of + * period-delimited keys to update in that job, e.g. + * + * { "id": 123456, "updates": {"attributes.system.duration", 3600.}} + * + * OPERATION + * + * For each update key, a jobtap callback "job.update.KEY" is executed. + * Currently at least one plugin MUST validate the update, therefore + * update keys are only supported if there is a plugin that explicitly + * allows the update by returning 0 from the 'job.update.*' callback. + * + * Note: in the future, some keys MAY be explicitly allowed in an allow + * list directly within this module. + * + * If any update in a request fails to be validated, then the request + * fails immediately. That is, either all updates are applied or none + * are. + * + * Once all updates are validated by callbacks, then updates as applied + * to jobspec are validated by passing an updated jobspec to the + * `job.validate` jobtap plugin stack. + * + * Plugins may request that this validation step be skipped by setting + * the 'validated' flag to 1 in the FLUX_PLUGIN_OUT_ARGS of the + * `job.update.*` callback. The `job.validate` step will only be skipped + * all keys in an update have the validated flag set. + * + * If all steps above are successful, then a `jobspec-update`event is + * posted for the job and a success response sent to the caller. + * + * FUTURE WORK + * + * - Some job updates may require feasibility checks on the resulting + * jobspec. There should be a flag for a plugin to require that the + * result be passed to the scheduler feasibility RPC. + * + * - The above change will require some asynchronous handling be added + * to this service + * + * - Plugins should also somehow be able to initiate asynchronous work + * before validating an update. There is no support for async plugin + * callbacks in jobtap at this time, though. + */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include + +#include "update.h" +#include "job-manager.h" +#include "jobtap-internal.h" +#include "event.h" + +struct update { + struct job_manager *ctx; + flux_msg_handler_t **handlers; +}; + + +static int update_job (struct job_manager *ctx, + struct flux_msg_cred cred, + struct job *job, + json_t *updates, + char **errp) +{ + const char *key; + json_t *value; + int validate = 0; /* validation of result necessary */ + + /* Loop through one or more proposed updates in `updates` object + * and call `job.update.` job plugin(s) to validate each + * update. + */ + json_object_foreach (updates, key, value) { + int needs_validation = 1; + if (jobtap_job_update (ctx->jobtap, + cred, + job, + key, + value, + &needs_validation, + errp) < 0) + return -1; + /* If any jobspec key needs further validation, then all + * keys will be validated at the same time. This means a key + * that might not need further validation when updated alone + * may need to be validated when paired with other keys in a + * single update: + */ + if (needs_validation) + validate = 1; + } + if (validate + && jobtap_validate_updates (ctx->jobtap, + job, + updates, + errp) < 0) + return -1; + + /* If this update was requested by the instance owner, and the + * job owner is not the instance owner, and job validation was + * bypassed (validate != true), then disable future job updates + * as not permitted by marking the job immutable. + * + * The reasons for doing this are two-fold: + * + * - A future update of an unrelated attribute could fail validation + * due to this attribute update. This could result in a confusing + * error message. + * + * - Bypassing validation for individual, previously updated attributes + * could be complex and might open the update process to unintended + * vulnerabilities (e.g. a user update after an instance owner update + * could allow a job access to resources, time limits, etc that are + * not intended for normal users.) + */ + if (!validate + && (cred.rolemask & FLUX_ROLE_OWNER) + && cred.userid != job->userid) { + if (event_job_post_pack (ctx->event, + job, + "set-flags", + 0, + "{s:[s]}", + "flags", "immutable") < 0) { + if (!(*errp = strdup ("failed to set job immutable flag"))) + flux_log_error (ctx->h, + "update: set-flags: strdup errmsg failed"); + return -1; + } + + } + + /* All updates have been allowed by plugins and validated as a unit, + * so now emit jobspec-update event. + */ + if (event_job_post_pack (ctx->event, + job, + "jobspec-update", + 0, + "O", + updates) < 0) { + if (!(*errp = strdup ("failed to pack jobspec-update event"))) + flux_log_error (ctx->h, + "update: failed to create pack error string"); + return -1; + } + return 0; +} + +static void update_handle_request (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + struct update *update = arg; + struct job_manager *ctx = update->ctx; + flux_jobid_t id; + struct job *job; + json_t *updates; + struct flux_msg_cred cred; + const char *errstr = NULL; + char *error = NULL; + + if (flux_request_unpack (msg, + NULL, + "{s:I s:o}", + "id", &id, + "updates", &updates) < 0) + goto error; + + /* Validate updates object, currently all updates MUST + * start with `attributes.`: + */ + if (!validate_jobspec_updates (updates)) { + errstr = "one or more jobspec updates are invalid"; + errno = EINVAL; + goto error; + } + /* Verify jobid exists and is not inactive + */ + if (!(job = zhashx_lookup (ctx->active_jobs, &id))) { + if (!(job = zhashx_lookup (ctx->inactive_jobs, &id))) + errstr = "unknown job id"; + else + errstr = "job is inactive"; + errno = EINVAL; + goto error; + } + /* Fetch the credential from this message and ensure the user + * has authorization to update this job. + */ + if (flux_msg_get_cred (msg, &cred) < 0 + || flux_msg_cred_authorize (cred, job->userid) < 0) { + errstr = "guests may only update their own jobs"; + goto error; + } + if (job->immutable && !(cred.rolemask & FLUX_ROLE_OWNER)) { + errstr = "job is immutable due to previous instance owner update"; + goto error; + } + /* Process the update request + */ + if (update_job (ctx, cred, job, updates, &error) < 0) { + errstr = error; + goto error; + } + if (flux_respond (h, msg, NULL) < 0) + flux_log_error (h, "%s: flux_respond", __FUNCTION__); + return; +error: + if (flux_respond_error (h, msg, errno, errstr) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); + free (error); +} + +void update_ctx_destroy (struct update *update) +{ + if (update) { + int saved_errno = errno; + flux_msg_handler_delvec (update->handlers); + free (update); + errno = saved_errno; + } +} + +static const struct flux_msg_handler_spec htab[] = { + { + FLUX_MSGTYPE_REQUEST, + "job-manager.update", + update_handle_request, + FLUX_ROLE_USER + }, + FLUX_MSGHANDLER_TABLE_END, +}; + +struct update *update_ctx_create (struct job_manager *ctx) +{ + struct update *update; + + if (!(update = calloc (1, sizeof (*update)))) + return NULL; + update->ctx = ctx; + if (flux_msg_handler_addvec (ctx->h, htab, update, &update->handlers) < 0) + goto error; + return update; +error: + update_ctx_destroy (update); + return NULL; +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/src/modules/job-manager/update.h b/src/modules/job-manager/update.h new file mode 100644 index 000000000000..8ef4545ba960 --- /dev/null +++ b/src/modules/job-manager/update.h @@ -0,0 +1,22 @@ +/************************************************************\ + * Copyright 2023 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _FLUX_JOB_MANAGER_UPDATE_H +#define _FLUX_JOB_MANAGER_UPDATE_H + +#include "job-manager.h" + +struct update *update_ctx_create (struct job_manager *ctx); +void update_ctx_destroy (struct update *update); + +#endif /* ! _FLUX_JOB_MANAGER_UPDATE_H */ +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/t/Makefile.am b/t/Makefile.am index 8de06d82e62e..7f31bbbc0a38 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -156,6 +156,7 @@ TESTSCRIPTS = \ t2275-job-duration-validator.t \ t2276-job-requires.t \ t2280-job-memo.t \ + t2290-job-update.t \ t2300-sched-simple.t \ t2302-sched-simple-up-down.t \ t2303-sched-hello.t \ @@ -484,6 +485,7 @@ check_LTLIBRARIES = \ job-manager/plugins/config.la \ job-manager/plugins/jobspec-update.la \ job-manager/plugins/jobspec-update-job-list.la \ + job-manager/plugins/update-test.la \ stats/stats-basic.la \ stats/stats-immediate.la @@ -1005,6 +1007,16 @@ job_manager_plugins_jobspec_update_job_list_la_LIBADD = \ $(top_builddir)/src/common/libflux-internal.la \ $(top_builddir)/src/common/libflux-core.la +job_manager_plugins_update_test_la_SOURCES = \ + job-manager/plugins/update-test.c +job_manager_plugins_update_test_la_CPPFLAGS = \ + $(test_cppflags) +job_manager_plugins_update_test_la_LDFLAGS = \ + $(fluxplugin_ldflags) -module -rpath /nowhere +job_manager_plugins_update_test_la_LIBADD = \ + $(top_builddir)/src/common/libflux-internal.la \ + $(top_builddir)/src/common/libflux-core.la + hwloc_hwloc_convert_SOURCES = hwloc/hwloc-convert.c hwloc_hwloc_convert_CPPFLAGS = $(HWLOC_CFLAGS) $(test_cppflags) hwloc_hwloc_convert_LDADD = $(HWLOC_LIBS) \ diff --git a/t/job-manager/plugins/jobspec-update-job-list.c b/t/job-manager/plugins/jobspec-update-job-list.c index 43440fe9b1a5..2910b797823e 100644 --- a/t/job-manager/plugins/jobspec-update-job-list.c +++ b/t/job-manager/plugins/jobspec-update-job-list.c @@ -61,6 +61,13 @@ static int sched_cb (flux_plugin_t *p, flux_plugin_arg_t *args, void *data) { + static bool updated = false; + /* Avoid emitting this jobspec-update event more than once per test. + * Note: this means the test plugin will only work for one job without + * reloading the plugin + */ + if (updated) + return 0; if (flux_jobtap_jobspec_update_pack (p, "{s:s}", "attributes.system.queue", @@ -71,6 +78,7 @@ static int sched_cb (flux_plugin_t *p, "update failure"); return -1; } + updated = true; return 0; } diff --git a/t/job-manager/plugins/update-test.c b/t/job-manager/plugins/update-test.c new file mode 100644 index 000000000000..073b4b98d069 --- /dev/null +++ b/t/job-manager/plugins/update-test.c @@ -0,0 +1,79 @@ +/************************************************************\ + * Copyright 2023 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* update-test.c - test plugin authorization of job update + * allow updates of the 'test' and 'test2' attributes for test + * purposes. + */ + +#include +#include + +#include "ccan/str/str.h" + +static int update_cb (flux_plugin_t *p, + const char *topic, + flux_plugin_arg_t *args, + void *data) +{ + struct flux_msg_cred cred; + const char *value; + + if (flux_plugin_arg_unpack (args, + FLUX_PLUGIN_ARG_IN, + "{s:s s:{s:i s:i}}", + "value", &value, + "cred", + "userid", &cred.userid, + "rolemask", &cred.rolemask) < 0) + return flux_jobtap_error (p, args, "plugin args unpack failed"); + if (streq (value, "fail-test")) + return flux_jobtap_error (p, args, "rejecting update: fail-test"); + return 0; +} + +static int job_updated (flux_plugin_t *p, + const char *topic, + flux_plugin_arg_t *args, + void *data) +{ + const char *value = NULL; + if (flux_plugin_arg_unpack (args, + FLUX_PLUGIN_ARG_IN, + "{s:{s?s}}", + "updates", + "attributes.system.test", &value) < 0) + return flux_jobtap_error (p, args, "plugin args unpack failed"); + if (value + && flux_jobtap_event_post_pack (p, + FLUX_JOBTAP_CURRENT_JOB, + "update-test", + "{s:s}", + "value", value) < 0) + return flux_jobtap_error (p, args, "flux_job_event_post_pack failed"); + return 0; +} + + +static const struct flux_plugin_handler tab[] = { + { "job.update", job_updated, NULL }, + { "job.update.attributes.system.test", update_cb, NULL }, + { "job.update.attributes.system.test2", update_cb, NULL }, + { 0 }, +}; + +int flux_plugin_init (flux_plugin_t *p) +{ + if (flux_plugin_register (p, "update-test", tab) < 0) + return -1; + return 0; +} + +// vi:ts=4 sw=4 expandtab diff --git a/t/t2290-job-update.t b/t/t2290-job-update.t new file mode 100755 index 000000000000..8be65f725365 --- /dev/null +++ b/t/t2290-job-update.t @@ -0,0 +1,185 @@ +#!/bin/sh +test_description='Test flux update command' + +. $(dirname $0)/sharness.sh + +if flux job submit --help 2>&1 | grep -q sign-type; then + test_set_prereq HAVE_FLUX_SECURITY +fi + +test_under_flux 1 job + +flux setattr log-stderr-level 1 + +runas_guest() { + local userid=$(($(id -u)+1)) + FLUX_HANDLE_USERID=$userid FLUX_HANDLE_ROLEMASK=0x2 "$@" +} + +submit_held_job_as_guest() +{ + local duration=$1 + local userid=$(($(id -u)+1)) + flux run --dry-run -t $duration true | \ + flux python ${SHARNESS_TEST_SRCDIR}/scripts/sign-as.py $userid \ + >job.signed + FLUX_HANDLE_USERID=$userid \ + flux job submit --flags=signed --urgency=0 job.signed +} + +test_expect_success 'flux update requires jobid and keyval args' ' + test_expect_code 2 flux update 1234 && + test_expect_code 2 flux update +' +test_expect_success 'submit jobs for testing' ' + inactive_jobid=$(flux submit --wait-event=clean true) && + jobid=$(flux submit --urgency=hold sleep 60) +' +test_expect_success 'invalid jobid fails' ' + test_expect_code 1 flux update f123 duration=10m 2>invalid-jobid.err && + test_debug "cat invalid-jobid.err" && + grep "unknown job id" invalid-jobid.err +' +test_expect_success 'update to inactive job fails' ' + test_expect_code 1 flux update $inactive_jobid duration=1m \ + 2>inactive-jobid.err && + test_debug "cat inactive-jobid.err" && + grep "job is inactive" inactive-jobid.err +' +test_expect_success 'invalid jobspec key cannot be updated' ' + test_expect_code 1 flux update $jobid foo=bar +' +test_expect_success 'guests can only update their own jobs' ' + test_expect_code 1 runas_guest flux update $jobid duration=1h \ + 2>invalid-user.err && + test_debug "cat invalid-user.err" && + grep "guests may only update their own jobs" invalid-user.err +' +test_expect_success 'update of unlimited duration with relative value fails' ' + test_expect_code 1 flux update $jobid duration=+1m && + test_expect_code 1 flux update $jobid duration=-1m +' +test_expect_success 'update request for negative duration fails' ' + echo "{\"id\": $(flux job id $jobid),\ + \"updates\": {\"attributes.system.duration\": -1.0}\ + }" \ + | ${FLUX_BUILD_DIR}/t/request/rpc job-manager.update 1 # EPERM +' +test_expect_success 'update request with invalid duration type fails' ' + echo "{\"id\": $(flux job id $jobid),\ + \"updates\": {\"attributes.system.duration\": "foo"}\ + }" \ + | ${FLUX_BUILD_DIR}/t/request/rpc job-manager.update 71 # EPROTO +' +test_expect_success 'update of duration of pending job works' ' + flux update $jobid duration=1m && + flux job eventlog $jobid \ + | grep jobspec-update \ + | grep duration=60 +' +test_expect_success 'update of duration accepts relative values' ' + flux update --dry-run $jobid duration=+1m \ + | jq ".\"attributes.system.duration\" == 120." && + flux update --dry-run $jobid duration=-30s \ + | jq ".\"attributes.system.duration\" == 30." +' +test_expect_success 'update with multiple keys fails if one key fails' ' + test_expect_code 1 flux update $jobid duration=12345 name=foo +' +test_expect_success 'update of duration to inf sets duration to 0' ' + flux update --dry-run $jobid duration=inf \ + | jq ".\"attributes.system.duration\" == 0." +' +test_expect_success 'flux update rejects duration <= 0 fails' ' + test_expect_code 1 flux update $jobid duration=-1h +' +test_expect_success 'update affects duration of running job' ' + flux update $jobid duration=0.1s && + flux job urgency $jobid default && + flux job wait-event -t 30 -m type=timeout $jobid exception && + flux job info $jobid R \ + | jq -e "(.execution|(.expiration - .starttime)*10 | round) == 1" +' +test_expect_success 'update fails for running job' ' + jobid=$(flux submit -t1m --wait-event=start sleep 60) && + test_expect_code 1 flux update $jobid duration=90s 2>run.err && + test_debug "cat run.err" && + grep "update of duration for running job not supported" run.err +' +test_expect_success 'add a duration limit and submit a held job' ' + echo policy.limits.duration=\"1m\" | flux config load && + jobid=$(flux submit --urgency=hold -t 1m true) +' +test_expect_success 'instance owner can adjust duration past limits' ' + flux update $jobid duration=1h && + flux job eventlog $jobid \ + | grep jobspec-update \ + | grep duration=3600 +' +test_expect_success FLUX_SECURITY 'guest update of their own job works' ' + guest_jobid=$(submit_held_job_as_guest 1m) && + runas_guest flux update -v $guest_jobid duration=1m && + flux job eventlog $guest_jobid \ + | grep jobspec-update \ + | grep duration=60 +' +test_expect_success FLUX_SECURITY 'guest cannot update job duration past limit' ' + test_expect_code 1 runas_guest flux update -v $guest_jobid duration=1h +' +test_expect_success 'instance owner can adjust duration past limits' ' + flux update $jobid duration=1h && + flux job eventlog $jobid \ + | grep jobspec-update \ + | grep duration=3600 +' +test_expect_success FLUX_SECURITY 'instance owner can adjust guest job duration past limits' ' + flux update $guest_jobid duration=1h && + flux job eventlog $guest_jobid \ + | grep jobspec-update \ + | grep duration=3600 +' +test_expect_success FLUX_SECURITY 'guest job is now immutable' ' + test_expect_code 1 runas_guest \ + flux update -v $guest_jobid duration=1m \ + 2>immutable.err && + test_debug "cat immutable.err" && + grep immutable immutable.err +' +test_expect_success 'adjust duration so future tests pass validation' ' + flux update $jobid duration=1m +' +test_expect_success 'reload update-duration plugin with owner-allow-any=0' ' + flux jobtap remove .update-duration && + flux jobtap load .update-duration owner-allow-any=0 +' +test_expect_success 'update duration above policy limit now fails' ' + test_expect_code 1 flux update $jobid duration=1h 2>limit.err && + test_debug "cat limit.err" && + grep "requested duration exceeds policy limit" limit.err +' +test_expect_success 'update of attributes.system.test fails' ' + test_expect_code 1 flux update $jobid test=foo +' +test_expect_success 'load update-test jobtap plugin' ' + PLUGINPATH=${FLUX_BUILD_DIR}/t/job-manager/plugins/.libs && + flux jobtap load --remove=all $PLUGINPATH/update-test.so +' +test_expect_success 'now update of attributes.system.test works' ' + flux update $jobid test=foo-update && + flux job eventlog $jobid \ + | grep jobspec-update \ + | grep foo-update +' +test_expect_success 'update-test plugin can reject updates' ' + test_expect_code 1 flux update $jobid test=fail-test 2>fail-test.err && + test_debug "cat fail-test.err" && + grep "rejecting update" fail-test.err +' +test_expect_success 'multiple keys can be updated successfully' ' + flux update -v $jobid test=ok test2=ok2 && + flux job eventlog $jobid && + flux job eventlog $jobid \ + | grep jobspec-update \ + | grep "test=\"ok\" attributes.system.test2=\"ok2\"" +' +test_done