diff --git a/CHANGELOG.md b/CHANGELOG.md index d9a92d3..3ef556e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,8 +14,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Handling for spot instance eviction - [#85](https://github.com/PrefectHQ/prefect-kubernetes/pull/85) - ### Changed ### Deprecated @@ -26,6 +24,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security +## 0.2.12 + +Released September 18th, 2023. + +### Added + +- Handling for spot instance eviction - [#85](https://github.com/PrefectHQ/prefect-kubernetes/pull/85) +- Logging recent job events when pod scheduling fails - [#88](https://github.com/PrefectHQ/prefect-kubernetes/pull/88) +- Event logging for pod events - [#91](https://github.com/PrefectHQ/prefect-kubernetes/pull/91) + + +### Fixed + +- `env` handling to allow hard coding of environment variables in the base job template - [#94](https://github.com/PrefectHQ/prefect-kubernetes/pull/94) + ## 0.2.11 Released July 20th, 2023. 
diff --git a/prefect_kubernetes/worker.py b/prefect_kubernetes/worker.py index 0dcd91a..daee196 100644 --- a/prefect_kubernetes/worker.py +++ b/prefect_kubernetes/worker.py @@ -95,7 +95,6 @@ import anyio.abc from prefect.blocks.kubernetes import KubernetesClusterConfig -from prefect.docker import get_prefect_image_name from prefect.exceptions import ( InfrastructureError, InfrastructureNotAvailable, @@ -104,6 +103,7 @@ from prefect.server.schemas.core import Flow from prefect.server.schemas.responses import DeploymentResponse from prefect.utilities.asyncutils import run_sync_in_worker_thread +from prefect.utilities.dockerutils import get_prefect_image_name from prefect.utilities.importtools import lazy_import from prefect.utilities.pydantic import JsonPatch from prefect.utilities.templating import find_placeholders @@ -308,9 +308,7 @@ def prepare_for_flow_run( super().prepare_for_flow_run(flow_run, deployment, flow) # Update configuration env and job manifest env self._update_prefect_api_url_if_local_server() - self.job_manifest["spec"]["template"]["spec"]["containers"][0]["env"] = [ - {"name": k, "value": v} for k, v in self.env.items() - ] + self._populate_env_in_manifest() # Update labels in job manifest self._slugify_labels() # Add defaults to job manifest if necessary @@ -318,6 +316,42 @@ def prepare_for_flow_run( self._populate_command_if_not_present() self._populate_generate_name_if_not_present() + def _populate_env_in_manifest(self): + """ + Populates environment variables in the job manifest. + + When `env` is templated as a variable in the job manifest it comes in as a + dictionary. We need to convert it to a list of dictionaries to conform to the + Kubernetes job manifest schema. + + This function also handles the case where the user has removed the `{{ env }}` + placeholder and hard coded a value for `env`. In this case, we need to prepend + our environment variables to the list to ensure Prefect setting propagation. 
+ An example reason a user would remove the `{{ env }}` placeholder is to + hardcode Kubernetes secrets in the base job template. + """ + transformed_env = [{"name": k, "value": v} for k, v in self.env.items()] + + template_env = self.job_manifest["spec"]["template"]["spec"]["containers"][ + 0 + ].get("env") + + # If user has removed `{{ env }}` placeholder and hard coded a value for `env`, + # we need to prepend our environment variables to the list to ensure Prefect + # setting propagation. + if isinstance(template_env, list): + self.job_manifest["spec"]["template"]["spec"]["containers"][0]["env"] = [ + *transformed_env, + *template_env, + ] + # Current templating adds `env` as a dict when the kubernetes manifest requires + # a list of dicts. Might be able to improve this in the future with a better + # default `env` value and better typing. + else: + self.job_manifest["spec"]["template"]["spec"]["containers"][0][ + "env" + ] = transformed_env + def _update_prefect_api_url_if_local_server(self): """If the API URL has been set by the base environment rather than the by the user, update the value to ensure connectivity when using a bridge network by diff --git a/requirements.txt b/requirements.txt index f4647c5..45672dd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -prefect>=2.10.9 +prefect>=2.13.0 kubernetes >= 24.2.0 \ No newline at end of file diff --git a/tests/test_worker.py b/tests/test_worker.py index aa471a6..af255ee 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -21,7 +21,6 @@ ) from kubernetes.config import ConfigException from prefect.client.schemas import FlowRun -from prefect.docker import get_prefect_image_name from prefect.exceptions import ( InfrastructureError, InfrastructureNotAvailable, @@ -35,6 +34,7 @@ get_current_settings, temporary_settings, ) +from prefect.utilities.dockerutils import get_prefect_image_name from pydantic import ValidationError from prefect_kubernetes import KubernetesWorker @@ -259,6 
+259,292 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs): stream_output=True, ), ), + ( + # default base template with custom env + { + "job_configuration": { + "command": "{{ command }}", + "env": "{{ env }}", + "labels": "{{ labels }}", + "name": "{{ name }}", + "namespace": "{{ namespace }}", + "job_manifest": { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "labels": "{{ labels }}", + "namespace": "{{ namespace }}", + "generateName": "{{ name }}-", + }, + "spec": { + "backoffLimit": 0, + "ttlSecondsAfterFinished": "{{ finished_job_ttl }}", + "template": { + "spec": { + "parallelism": 1, + "completions": 1, + "restartPolicy": "Never", + "serviceAccountName": "{{ service_account_name }}", + "containers": [ + { + "name": "prefect-job", + "env": [ + { + "name": "TEST_ENV", + "valueFrom": { + "secretKeyRef": { + "name": "test-secret", + "key": "shhhhh", + } + }, + }, + ], + "image": "{{ image }}", + "imagePullPolicy": "{{ image_pull_policy }}", + "args": "{{ command }}", + } + ], + } + }, + }, + }, + "cluster_config": "{{ cluster_config }}", + "job_watch_timeout_seconds": "{{ job_watch_timeout_seconds }}", + "pod_watch_timeout_seconds": "{{ pod_watch_timeout_seconds }}", + "stream_output": "{{ stream_output }}", + }, + "variables": { + "description": "Default variables for the Kubernetes worker.\n\nThe schema for this class is used to populate the `variables` section of the default\nbase job template.", + "type": "object", + "properties": { + "name": { + "title": "Name", + "description": "Name given to infrastructure created by a worker.", + "type": "string", + }, + "env": { + "title": "Environment Variables", + "description": "Environment variables to set when starting a flow run.", + "type": "object", + "additionalProperties": {"type": "string"}, + }, + "labels": { + "title": "Labels", + "description": "Labels applied to infrastructure created by a worker.", + "type": "object", + "additionalProperties": {"type": "string"}, + }, + 
"command": { + "title": "Command", + "description": "The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker.", + "type": "string", + }, + "namespace": { + "title": "Namespace", + "description": "The Kubernetes namespace to create jobs within.", + "default": "default", + "type": "string", + }, + "image": { + "title": "Image", + "description": "The image reference of a container image to use for created jobs. If not set, the latest Prefect image will be used.", + "example": "docker.io/prefecthq/prefect:2-latest", + "type": "string", + }, + "service_account_name": { + "title": "Service Account Name", + "description": "The Kubernetes service account to use for job creation.", + "type": "string", + }, + "image_pull_policy": { + "title": "Image Pull Policy", + "description": "The Kubernetes image pull policy to use for job containers.", + "default": "IfNotPresent", + "enum": ["IfNotPresent", "Always", "Never"], + "type": "string", + }, + "finished_job_ttl": { + "title": "Finished Job TTL", + "description": "The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely.", + "type": "integer", + }, + "job_watch_timeout_seconds": { + "title": "Job Watch Timeout Seconds", + "description": "Number of seconds to wait for each event emitted by a job before timing out. 
If not set, the worker will wait for each event indefinitely.", + "type": "integer", + }, + "pod_watch_timeout_seconds": { + "title": "Pod Watch Timeout Seconds", + "description": "Number of seconds to watch for pod creation before timing out.", + "default": 60, + "type": "integer", + }, + "stream_output": { + "title": "Stream Output", + "description": "If set, output will be streamed from the job to local standard output.", + "default": True, + "type": "boolean", + }, + "cluster_config": { + "title": "Cluster Config", + "description": "The Kubernetes cluster config to use for job creation.", + "allOf": [{"$ref": "#/definitions/KubernetesClusterConfig"}], + }, + }, + "definitions": { + "KubernetesClusterConfig": { + "title": "KubernetesClusterConfig", + "description": "Stores configuration for interaction with Kubernetes clusters.\n\nSee `from_file` for creation.", + "type": "object", + "properties": { + "config": { + "title": "Config", + "description": "The entire contents of a kubectl config file.", + "type": "object", + }, + "context_name": { + "title": "Context Name", + "description": "The name of the kubectl context to use.", + "type": "string", + }, + }, + "required": ["config", "context_name"], + "block_type_slug": "kubernetes-cluster-config", + "secret_fields": [], + "block_schema_references": {}, + } + }, + }, + }, + {}, + KubernetesWorkerJobConfiguration( + command=None, + env={}, + labels={}, + name=None, + namespace="default", + job_manifest={ + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "namespace": "default", + "generateName": "-", + "labels": {}, + }, + "spec": { + "backoffLimit": 0, + "template": { + "spec": { + "parallelism": 1, + "completions": 1, + "restartPolicy": "Never", + "containers": [ + { + "name": "prefect-job", + "imagePullPolicy": "IfNotPresent", + "env": [ + { + "name": "TEST_ENV", + "valueFrom": { + "secretKeyRef": { + "name": "test-secret", + "key": "shhhhh", + } + }, + }, + ], + } + ], + } + }, + }, + }, + 
cluster_config=None, + job_watch_timeout_seconds=None, + pod_watch_timeout_seconds=60, + stream_output=True, + ), + lambda flow_run, deployment, flow: KubernetesWorkerJobConfiguration( + command="python -m prefect.engine", + env={ + **get_current_settings().to_environment_variables(exclude_unset=True), + "PREFECT__FLOW_RUN_ID": flow_run.id.hex, + }, + labels={ + "prefect.io/flow-run-id": str(flow_run.id), + "prefect.io/flow-run-name": flow_run.name, + "prefect.io/version": _slugify_label_value(prefect.__version__), + "prefect.io/deployment-id": str(deployment.id), + "prefect.io/deployment-name": deployment.name, + "prefect.io/flow-id": str(flow.id), + "prefect.io/flow-name": flow.name, + }, + name=flow_run.name, + namespace="default", + job_manifest={ + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "namespace": "default", + "generateName": f"{flow_run.name}-", + "labels": { + "prefect.io/flow-run-id": str(flow_run.id), + "prefect.io/flow-run-name": flow_run.name, + "prefect.io/version": _slugify_label_value(prefect.__version__), + "prefect.io/deployment-id": str(deployment.id), + "prefect.io/deployment-name": deployment.name, + "prefect.io/flow-id": str(flow.id), + "prefect.io/flow-name": flow.name, + }, + }, + "spec": { + "backoffLimit": 0, + "template": { + "spec": { + "parallelism": 1, + "completions": 1, + "restartPolicy": "Never", + "containers": [ + { + "name": "prefect-job", + "imagePullPolicy": "IfNotPresent", + "env": [ + *[ + {"name": k, "value": v} + for k, v in get_current_settings() + .to_environment_variables( + exclude_unset=True + ) + .items() + ], + { + "name": "PREFECT__FLOW_RUN_ID", + "value": flow_run.id.hex, + }, + { + "name": "TEST_ENV", + "valueFrom": { + "secretKeyRef": { + "name": "test-secret", + "key": "shhhhh", + } + }, + }, + ], + "image": get_prefect_image_name(), + "args": ["python", "-m", "prefect.engine"], + } + ], + } + }, + }, + }, + cluster_config=None, + job_watch_timeout_seconds=None, + 
pod_watch_timeout_seconds=60, + stream_output=True, + ), + ), ( # default base template with no values KubernetesWorker.get_default_base_job_template(),