src.asqi.backends.kubernetes_backend

Kubernetes Job-based implementation of ContainerBackend.

Dispatches ASQI containers as Kubernetes Jobs using the K8s Jobs API. Images are assumed to be available in the cluster – no pull logic is performed.

Each Job runs the workload container alongside a native sidecar container (K8s >= 1.28 initContainers entry with restartPolicy: Always) that mediates S3 I/O via the shared emptyDir volume and a per-Job ConfigMap carrying the workload’s InputRef / OutputDestination (AIP-2207).
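The pod layout described above can be sketched as a plain Job body. This is an illustrative sketch, not the backend's actual manifest: the function name, the container and volume names, the mount paths, and the choice to mount the ConfigMap as a volume are all assumptions. Only the native-sidecar pattern (an initContainers entry with restartPolicy: Always), the shared emptyDir, and the per-Job ConfigMap come from the description.

```python
from typing import Any


def build_job_manifest(
    job_name: str,
    workload_image: str,
    sidecar_image: str,
    io_configmap: str,
    args: list[str],
) -> dict[str, Any]:
    """Return a batch/v1 Job body with a native sidecar (K8s >= 1.28)."""
    # Hypothetical mount shared by the workload and the sidecar.
    shared_mount = {"name": "io-scratch", "mountPath": "/io"}
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": job_name},
        "spec": {
            "template": {
                "spec": {
                    # Job pod templates must use Never or OnFailure.
                    "restartPolicy": "Never",
                    "initContainers": [
                        {
                            "name": "io-sidecar",
                            "image": sidecar_image,
                            # restartPolicy: Always on an init container is
                            # what makes it a native sidecar.
                            "restartPolicy": "Always",
                            "volumeMounts": [
                                shared_mount,
                                # Assumed: the ConfigMap is read from a mount.
                                {"name": "io-config", "mountPath": "/etc/asqi"},
                            ],
                        }
                    ],
                    "containers": [
                        {
                            "name": "workload",
                            "image": workload_image,
                            "args": args,
                            "volumeMounts": [shared_mount],
                        }
                    ],
                    "volumes": [
                        {"name": "io-scratch", "emptyDir": {}},
                        {"name": "io-config", "configMap": {"name": io_configmap}},
                    ],
                }
            }
        },
    }
```

In this layout the workload only ever sees the local mount, which matches the statement that S3 I/O is mediated by the sidecar.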

Required RBAC (see asqi/k8s/rbac.yaml in the installed package):

A ServiceAccount with create / get / watch / delete permissions on batch/v1 Jobs, get / list on core/v1 Pods, and create / delete on core/v1 ConfigMaps in the target namespace.
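As a rough illustration, the permissions listed above map onto a namespaced Role like the one below, written as a Python dict so it can be passed to a client or dumped to YAML. The function name and the Role name are hypothetical, and the shipped asqi/k8s/rbac.yaml remains the authoritative manifest; it may differ from this sketch.

```python
from typing import Any


def build_role(namespace: str, name: str = "asqi-job-runner") -> dict[str, Any]:
    """Return an rbac.authorization.k8s.io/v1 Role with the listed verbs."""
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": name, "namespace": namespace},
        "rules": [
            {
                # batch/v1 Jobs
                "apiGroups": ["batch"],
                "resources": ["jobs"],
                "verbs": ["create", "get", "watch", "delete"],
            },
            {
                # core/v1 Pods (the core API group is the empty string)
                "apiGroups": [""],
                "resources": ["pods"],
                "verbs": ["get", "list"],
            },
            {
                # core/v1 ConfigMaps, used for the per-Job I/O config
                "apiGroups": [""],
                "resources": ["configmaps"],
                "verbs": ["create", "delete"],
            },
        ],
    }
```

A RoleBinding that ties this Role to the ServiceAccount is still needed; it is omitted here for brevity.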

Optional dependency:

Install the Kubernetes Python client with:

pip install 'asqi-engineer[k8s]'

Unsupported Docker-semantics fields (fail-closed):

__volumes: the legacy Docker volume-passing key is rejected by _extract_io_refs (use __inputs / __output with AIP-2207 InputRef / OutputDestination).

host_access: Docker socket / Docker-in-Docker access is not supported in K8s Jobs.

Attributes

Classes

IORefs

Result of extracting AIP-2207 InputRef / OutputDestination from container args.

KubernetesBackend

ContainerBackend implementation that runs containers as Kubernetes Jobs.

Module Contents

src.asqi.backends.kubernetes_backend.logger
class src.asqi.backends.kubernetes_backend.IORefs

Result of extracting AIP-2207 InputRef / OutputDestination from container args.

Attributes:

args: args with __inputs / __output keys stripped from any --test-params / --generation-params JSON payload. This is what the workload container actually receives: the workload sees local mount paths, never raw S3 refs.

inputs: Validated InputRef list (possibly empty) destined for the sidecar’s io.json ConfigMap.

output: Validated OutputDestination (single, optional) destined for the same ConfigMap.

error: Non-None when extraction fails (malformed refs, legacy __volumes key, etc.). Callers MUST fail closed and skip Job creation when error is set (no silent strip).

args: list[str]
inputs: list[asqi.schemas.InputRef]
output: asqi.schemas.OutputDestination | None = None
error: str | None = None
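The extraction contract above can be illustrated with a minimal sketch. It is not the real _extract_io_refs: the names IORefsSketch and extract_io_refs_sketch are hypothetical, refs are kept as plain dicts instead of validated InputRef / OutputDestination models, and it assumes each flag is followed by its JSON payload as a separate argument.

```python
from __future__ import annotations

import json
from dataclasses import dataclass, field
from typing import Any

PARAM_FLAGS = ("--test-params", "--generation-params")


@dataclass
class IORefsSketch:
    args: list[str]
    inputs: list[dict[str, Any]] = field(default_factory=list)
    output: dict[str, Any] | None = None
    error: str | None = None


def extract_io_refs_sketch(args: list[str]) -> IORefsSketch:
    """Strip __inputs / __output from JSON payloads; fail closed on errors."""
    cleaned = list(args)
    inputs: list[dict[str, Any]] = []
    output: dict[str, Any] | None = None

    def fail(message: str) -> IORefsSketch:
        # On failure the original args are returned untouched and no refs
        # are reported, so the caller cannot proceed by accident.
        return IORefsSketch(args=list(args), error=message)

    for i, token in enumerate(args[:-1]):
        if token not in PARAM_FLAGS:
            continue
        try:
            payload = json.loads(args[i + 1])
        except json.JSONDecodeError as exc:
            return fail(f"{token}: invalid JSON ({exc.msg})")
        if not isinstance(payload, dict):
            continue
        if "__volumes" in payload:
            return fail("__volumes is not supported; use __inputs / __output")
        raw_inputs = payload.pop("__inputs", [])
        if not isinstance(raw_inputs, list):
            return fail("__inputs must be a list")
        raw_output = payload.pop("__output", None)
        if raw_output is not None and output is not None:
            return fail("only one __output is allowed")
        inputs.extend(raw_inputs)
        output = raw_output if raw_output is not None else output
        # The workload receives the payload without the I/O keys.
        cleaned[i + 1] = json.dumps(payload)

    return IORefsSketch(args=cleaned, inputs=inputs, output=output)
```

A caller following the documented contract would check error first and skip Job creation whenever it is set.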
class src.asqi.backends.kubernetes_backend.KubernetesBackend(namespace: str = _DEFAULT_NAMESPACE, *, sidecar_image: str | None = None, sidecar_sa_name: str | None = None, sidecar_configmap_name: str | None = None, sidecar_secret_name: str | None = None)

ContainerBackend implementation that runs containers as Kubernetes Jobs.

Each call to run() creates a K8s Job, waits for it to complete, collects its logs, then deletes the Job. Images are assumed to be available in the cluster — no pull logic is performed.

Args:

namespace: Kubernetes namespace in which Jobs are created. Defaults to "default".

RBAC requirement:

The ServiceAccount must have create / get / watch / delete permissions on batch/v1 Jobs, get / list on core/v1 Pods, and create / delete on core/v1 ConfigMaps in namespace. See asqi/k8s/rbac.yaml in the installed package for a ready-to-apply manifest.

run(image: str, args: list[str], container_config: asqi.config.ContainerConfig, environment: dict[str, str] | None = None, name: str | None = None, workflow_id: str = '', manifest: asqi.schemas.Manifest | None = None) -> dict[str, Any]

Create a K8s Job for image, wait for completion, and return results.

Args:

image: Container image reference.

args: Command-line arguments passed to the container. May contain __inputs / __output keys inside --test-params / --generation-params JSON payloads; these are extracted into the per-Job ConfigMap and stripped before reaching the workload (see _extract_io_refs()).

container_config: Execution configuration (timeout, resource limits, etc.).

environment: Optional environment variables injected into the workload container.

name: Optional human-readable hint used to build the Job name.

workflow_id: Workflow identifier attached as a Job label and as the sidecar’s AIP_JOB_HANDLE env var.

manifest: Optional manifest for the container image. host_access=True causes an immediate fail-closed return without creating a Job.

Returns:

Dict with success, exit_code, output, error, container_id.
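The create / wait / collect logs / delete cycle described for run() can be sketched against a small client interface. Everything here is hypothetical scaffolding (JobClient, RecordingClient, run_job_sketch, the default timeout, and the use of the Job name as container_id); only the order of operations and the result keys come from the description above.

```python
from typing import Any, Protocol


class JobClient(Protocol):
    """Hypothetical minimal interface over the K8s Jobs API."""

    def create(self, name: str, image: str, args: list[str]) -> None: ...
    def wait(self, name: str, timeout_s: float) -> int: ...
    def logs(self, name: str) -> str: ...
    def delete(self, name: str) -> None: ...


class RecordingClient:
    """In-memory stand-in that records calls; useful for trying the flow."""

    def __init__(self, exit_code: int = 0, log_text: str = "done") -> None:
        self.calls: list[str] = []
        self._exit_code = exit_code
        self._log_text = log_text

    def create(self, name: str, image: str, args: list[str]) -> None:
        self.calls.append("create")

    def wait(self, name: str, timeout_s: float) -> int:
        self.calls.append("wait")
        return self._exit_code

    def logs(self, name: str) -> str:
        self.calls.append("logs")
        return self._log_text

    def delete(self, name: str) -> None:
        self.calls.append("delete")


def run_job_sketch(
    client: JobClient,
    name: str,
    image: str,
    args: list[str],
    timeout_s: float = 300.0,
) -> dict[str, Any]:
    """Create a Job, wait, collect logs, and always clean up."""
    result: dict[str, Any] = {
        "success": False,
        "exit_code": -1,
        "output": "",
        "error": "",
        "container_id": name,  # assumed: the Job name doubles as the ID
    }
    created = False
    try:
        client.create(name, image, args)
        created = True
        result["exit_code"] = client.wait(name, timeout_s)
        result["output"] = client.logs(name)
        result["success"] = result["exit_code"] == 0
    except Exception as exc:  # report the failure instead of raising
        result["error"] = str(exc)
    finally:
        # Delete only what was actually created.
        if created:
            client.delete(name)
    return result
```

Putting the delete call in a finally block means a timeout or a log-collection failure does not leave the Job behind in the namespace.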

shutdown(workflow_ids: list[str] | None = None) -> None

Delete all ASQI K8s Jobs, optionally scoped to specific workflow IDs.

Args:

workflow_ids: If None, deletes all Jobs labelled service=asqi_engineer in the namespace. Otherwise only Jobs matching the given workflow IDs are deleted.
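One way to express this scoping is a Kubernetes label selector string. The sketch below is an assumption about how such a selector might be built: the function name and the workflow_id label key are hypothetical, and only the service=asqi_engineer label comes from the description above.

```python
from __future__ import annotations


def job_label_selector(workflow_ids: list[str] | None = None) -> str | None:
    """Build a label selector for ASQI Jobs, or None if nothing can match."""
    base = "service=asqi_engineer"
    if workflow_ids is None:
        # No scoping requested: every ASQI Job in the namespace matches.
        return base
    if not workflow_ids:
        # An explicit empty list selects no Jobs, so there is no selector.
        return None
    # Set-based selector syntax: key in (v1,v2,...).
    return f"{base},workflow_id in ({','.join(workflow_ids)})"
```

Treating an empty list differently from None keeps a caller that passes [] from deleting every Job by accident.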

check_images(images: list[str]) -> dict[str, bool]

Return True for every image – K8s does not support pre-flight image checks.

Image availability is determined at pod scheduling time by the kubelet.

pull_images(images: list[str]) -> None

No-op: image pulling is handled by the kubelet (imagePullPolicy).

Images are assumed to be available in the cluster registry.

extract_manifest(image: str, manifest_path: str = ContainerConfig.MANIFEST_PATH) -> asqi.schemas.Manifest | None

Extract manifest.yaml from an image by running a one-shot K8s Job.

The Job runs cat <manifest_path>; the pod stdout is parsed as YAML.

Args:

image: Container image reference.

manifest_path: Path to the manifest file inside the container.

Returns:

Parsed Manifest or None.

Raises:

ManifestExtractionError: If the Job fails or the YAML cannot be parsed.