src.asqi.workflow

Attributes

Classes

TestExecutionResult

Represents the result of a single test execution or data generation job.

Functions

dbos_check_images_availability(→ Dict[str, bool])

Check if all required Docker images are available locally.

dbos_pull_images(images)

Pull missing Docker images from registries.

extract_manifests_step(→ Dict[str, asqi.schemas.Manifest])

Extract and parse manifests from a list of Docker images.

validate_test_plan(→ List[str])

DBOS step wrapper for comprehensive test plan validation.

validate_indicator_display_reports_step(→ List[str])

DBOS step wrapper for comprehensive score card report validation.

execute_single_test(→ TestExecutionResult)

Execute a single test in a Docker container.

evaluate_score_card(→ List[Dict[str, Any]])

Evaluate score cards against test execution results.

run_test_suite_workflow(→ Tuple[Dict[str, Any], ...)

Execute a test suite with DBOS durability (tests only, no score card evaluation).

convert_test_results_to_objects(...)

Convert test results data back to TestExecutionResult objects.

add_score_cards_to_results(→ Dict[str, Any])

Add score card evaluation results to test results data.

evaluate_score_cards_workflow(→ Dict[str, Any])

Evaluate score cards against existing test results.

run_end_to_end_workflow(→ Tuple[Dict[str, Any], ...)

Execute a complete end-to-end workflow: test execution + score card evaluation.

validate_test_container_reports(→ List[str])

Validates that the reports returned by a test container exactly match the output_reports definitions in its manifest.

validate_display_reports(manifests, score_card, ...)

Validate that score card 'display_reports' are defined in the test container manifests and match the expected structure.

save_results_to_file_step(→ None)

Save execution results to a JSON file.

save_container_results_to_file_step(→ None)

Save container results to a JSON file.

start_test_execution(→ str)

Orchestrate test suite execution workflow.

start_score_card_evaluation(→ str)

Orchestrate score card evaluation workflow.

start_data_generation(→ str)

Orchestrate data generation workflow.

execute_data_generation(→ TestExecutionResult)

Execute a single data generation job in a Docker container.

run_data_generation_workflow(→ Tuple[Dict[str, Any], ...)

Execute a set of data generation jobs with DBOS durability (no score card evaluation).

Module Contents

src.asqi.workflow.oltp_endpoint
src.asqi.workflow.system_database_url
src.asqi.workflow.config: dbos.DBOSConfig
src.asqi.workflow.console
class src.asqi.workflow.TestExecutionResult(test_name: str, test_id: str, sut_name: str | None, image: str, system_type: str | None = None)

Represents the result of a single test execution or data generation job.

test_id
test_name
sut_name
image
system_type = None
start_time: float = 0
end_time: float = 0
success: bool = False
container_id: str = ''
exit_code: int = -1
container_output: str = ''
error_message: str = ''
results: Dict[str, Any]
generated_reports: List[asqi.response_schemas.GeneratedReport] = []
generated_datasets: List[asqi.response_schemas.GeneratedDataset] = []
property execution_time: float

Calculate execution time in seconds.

property test_results: Dict[str, Any]

Alias for results to maintain backward compatibility with the score card engine.

result_dict(use_results_field: bool = False) Dict[str, Any]

Convert to dictionary for storage/reporting.

Args:
use_results_field: If True, outputs the 'results' field (data generation pipeline); if False, outputs the 'test_results' field (test execution pipeline, used by score cards).

Serializes Pydantic objects to dictionaries.

container_dict() Dict[str, Any]

Convert to dictionary for storage/reporting.
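
A minimal usage sketch, assuming the package is importable as asqi; every value below is hypothetical and only illustrates how the fields and helpers fit together:

from asqi.workflow import TestExecutionResult

result = TestExecutionResult(
    test_name="toxicity_check",         # hypothetical test name
    test_id="toxicity_check_1",         # hypothetical test ID
    sut_name="my_chatbot",              # hypothetical system under test
    image="asqi/toxicity-test:latest",  # hypothetical image
    system_type="llm_api",              # hypothetical system type
)
result.success = True
result.exit_code = 0
result.results = {"score": 0.92}        # hypothetical container output
print(result.execution_time)                         # end_time - start_time, in seconds
print(result.result_dict())                          # emits a "test_results" field (score card pipeline)
print(result.result_dict(use_results_field=True))    # emits a "results" field (data generation pipeline)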

src.asqi.workflow.dbos_check_images_availability(images: List[str]) Dict[str, bool]

Check if all required Docker images are available locally.

src.asqi.workflow.dbos_pull_images(images: List[str])

Pull missing Docker images from registries.
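
A minimal check-then-pull sketch combining the two steps above (image names are hypothetical):

from asqi.workflow import dbos_check_images_availability, dbos_pull_images

images = ["asqi/toxicity-test:latest", "asqi/bias-test:latest"]  # hypothetical image names
availability = dbos_check_images_availability(images)            # maps image name -> bool
missing = [name for name, ok in availability.items() if not ok]
if missing:
    dbos_pull_images(missing)                                    # pull only what is not available locally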

src.asqi.workflow.extract_manifests_step(images: List[str]) Dict[str, asqi.schemas.Manifest]

Extract and parse manifests from a list of Docker images.

Args:

images: List of Docker image names

Returns:

Dictionary mapping image name to Manifest

src.asqi.workflow.validate_test_plan(suite: asqi.schemas.SuiteConfig, systems: asqi.schemas.SystemsConfig, manifests: Dict[str, asqi.schemas.Manifest]) List[str]

DBOS step wrapper for comprehensive test plan validation.

Delegates to validation.py for the actual validation logic. This step exists to provide DBOS durability for validation results.

Args:

suite: Test suite configuration (pre-validated)
systems: Systems configuration (pre-validated)
manifests: Available manifests (pre-validated)

Returns:

List of validation error messages
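
A sketch of how manifest extraction feeds plan validation; suite and systems are assumed to be pre-validated SuiteConfig and SystemsConfig objects loaded elsewhere, and the image name is hypothetical:

from asqi.workflow import extract_manifests_step, validate_test_plan

manifests = extract_manifests_step(["asqi/toxicity-test:latest"])  # hypothetical image
errors = validate_test_plan(suite, systems, manifests)             # suite/systems assumed pre-loaded
if errors:
    raise ValueError("Test plan validation failed:\n" + "\n".join(errors))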

src.asqi.workflow.validate_indicator_display_reports_step(suite: asqi.schemas.SuiteConfig, manifests: Dict[str, asqi.schemas.Manifest], score_cards: List[asqi.schemas.ScoreCard]) List[str]

DBOS step wrapper for comprehensive score card report validation.

Delegates to validation.py for the actual validation logic. This step exists to provide DBOS durability for validation results.

Args:

suite: Test suite configuration (pre-validated)
manifests: Available manifests (pre-validated)
score_cards: List of score card configurations (pre-validated)

Returns:

List of validation error messages

src.asqi.workflow.execute_single_test(test_name: str, test_id: str, image: str, sut_name: str, systems_params: Dict[str, Any], test_params: Dict[str, Any], container_config: asqi.config.ContainerConfig, env_file: str | None = None, environment: Dict[str, str] | None = None, metadata_config: Dict[str, Any] | None = None, parent_id: str | None = None) TestExecutionResult

Execute a single test in a Docker container.

Focuses solely on test execution. Input validation is handled separately in validation.py to follow single responsibility principle.

Args:

test_name: Name of the test to execute (pre-validated)
test_id: Unique ID of the test to execute (pre-validated)
image: Docker image to run (pre-validated)
sut_name: Name of the system under test (pre-validated)
systems_params: Dictionary containing system_under_test and other systems (pre-validated)
test_params: Parameters for the test (pre-validated)
container_config: Container execution configurations
env_file: Optional path to .env file for test-level environment variables
environment: Optional dictionary of environment variables for the test
metadata_config: Optional dictionary containing metadata to forward into the test container
parent_id: Optional parent workflow ID for tracking hierarchy (defaults to DBOS.workflow_id)

Returns:

TestExecutionResult containing execution metadata and results

Raises:

ValueError: If inputs fail validation or JSON output cannot be parsed
RuntimeError: If container execution fails
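
An illustrative call; every value is hypothetical, the systems_params shape is an assumption, and container_config is assumed to be a pre-built asqi.config.ContainerConfig:

from asqi.workflow import execute_single_test

result = execute_single_test(
    test_name="toxicity_check",
    test_id="toxicity_check_1",
    image="asqi/toxicity-test:latest",
    sut_name="my_chatbot",
    systems_params={"system_under_test": {"type": "llm_api"}},  # shape is an assumption
    test_params={"num_prompts": 50},
    container_config=container_config,                          # assumed pre-built ContainerConfig
)
if not result.success:
    print(result.error_message)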

src.asqi.workflow.evaluate_score_card(test_results: List[TestExecutionResult], score_card_configs: List[Dict[str, Any]], audit_responses_data: Dict[str, Any] | None = None, execution_mode: asqi.config.ExecutionMode = ExecutionMode.END_TO_END) List[Dict[str, Any]]

Evaluate score cards against test execution results.

src.asqi.workflow.run_test_suite_workflow(suite_config: Dict[str, Any], systems_config: Dict[str, Any], executor_config: Dict[str, Any], container_config: asqi.config.ContainerConfig, datasets_config: Dict[str, Any] | None = None, score_card_configs: List[Dict[str, Any]] | None = None, metadata_config: Dict[str, Any] | None = None) Tuple[Dict[str, Any], List[Dict[str, Any]]]

Execute a test suite with DBOS durability (tests only, no score card evaluation).

This workflow:
1. Validates image availability and extracts manifests
2. Performs cross-validation of tests, systems, and manifests
3. Executes tests concurrently with progress tracking
4. Aggregates results with detailed error reporting

Args:

suite_config: Serialized SuiteConfig containing test definitions
systems_config: Serialized SystemsConfig containing system configurations
executor_config: Execution parameters controlling concurrency and reporting
container_config: Container execution configurations
datasets_config: Optional datasets configuration for resolving dataset references
score_card_configs: Optional list of score card configurations
metadata_config: Optional dictionary containing metadata to forward into test containers

Returns:

Execution summary with metadata and individual test results (no score cards), plus container results
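
A sketch of invoking the workflow with serialized configuration dictionaries; the executor keys follow the ones documented for start_test_execution, and suite_config, systems_config, and container_config are assumed to be loaded elsewhere:

from asqi.workflow import run_test_suite_workflow

summary, container_results = run_test_suite_workflow(
    suite_config=suite_config,          # serialized SuiteConfig (e.g. loaded from YAML)
    systems_config=systems_config,      # serialized SystemsConfig
    executor_config={"concurrent_tests": 4, "max_failures": 5, "progress_interval": 10},
    container_config=container_config,  # assumed pre-built ContainerConfig
)
print(f"{len(container_results)} container result(s) collected")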

src.asqi.workflow.convert_test_results_to_objects(test_results_data: Dict[str, Any], test_container_data: List[Dict[str, Any]]) List[TestExecutionResult]

Convert test results data back to TestExecutionResult objects.

Args:

test_results_data: Test execution results
test_container_data: Test container results containing container output and error message

Returns:

List of TestExecutionResult objects

src.asqi.workflow.add_score_cards_to_results(test_results_data: Dict[str, Any], score_card_evaluation: List[Dict[str, Any]]) Dict[str, Any]

Add score card evaluation results to test results data.

src.asqi.workflow.evaluate_score_cards_workflow(test_results_data: Dict[str, Any], test_container_data: List[Dict[str, Any]], score_card_configs: List[Dict[str, Any]], audit_responses_data: Dict[str, Any] | None = None, execution_mode: asqi.config.ExecutionMode = ExecutionMode.END_TO_END) Dict[str, Any]

Evaluate score cards against existing test results.

Args:

test_results_data: Test execution results
test_container_data: Test container results containing container output and error message
score_card_configs: List of score card configurations to evaluate
audit_responses_data: Optional dict with manual audit responses
execution_mode: Execution mode, expected modes:

  • ExecutionMode.EVALUATE_ONLY

  • ExecutionMode.END_TO_END

Returns:

Updated results with score card evaluation data
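
A sketch of re-running score cards over previously saved results; the input dictionaries are assumed to have been loaded from an earlier run's JSON output, and score_card_config is a hypothetical serialized score card:

from asqi.config import ExecutionMode
from asqi.workflow import evaluate_score_cards_workflow

updated = evaluate_score_cards_workflow(
    test_results_data=test_results_data,      # dict loaded from a previous results file
    test_container_data=test_container_data,  # list of container result dicts from the same run
    score_card_configs=[score_card_config],   # serialized score card definitions
    execution_mode=ExecutionMode.EVALUATE_ONLY,
)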

src.asqi.workflow.run_end_to_end_workflow(suite_config: Dict[str, Any], systems_config: Dict[str, Any], score_card_configs: List[Dict[str, Any]], executor_config: Dict[str, Any], container_config: asqi.config.ContainerConfig, audit_responses_data: Dict[str, Any] | None = None, datasets_config: Dict[str, Any] | None = None) Tuple[Dict[str, Any], List[Dict[str, Any]]]

Execute a complete end-to-end workflow: test execution + score card evaluation.

Args:

suite_config: Serialized SuiteConfig containing test definitions
systems_config: Serialized SystemsConfig containing system configurations
score_card_configs: List of score card configurations to evaluate
executor_config: Execution parameters controlling concurrency and reporting
container_config: Container execution configurations
audit_responses_data: Optional dict with manual audit responses
datasets_config: Optional datasets configuration for resolving dataset references

Returns:

Complete execution results with test results, score card evaluations and container results
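
The same pattern with score card evaluation folded in; all inputs are assumed to be pre-loaded as in the previous sketches:

from asqi.workflow import run_end_to_end_workflow

results, container_results = run_end_to_end_workflow(
    suite_config=suite_config,
    systems_config=systems_config,
    score_card_configs=[score_card_config],
    executor_config={"concurrent_tests": 4, "max_failures": 5, "progress_interval": 10},
    container_config=container_config,
)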

src.asqi.workflow.validate_test_container_reports(all_results: List[TestExecutionResult], manifests: Dict[str, asqi.schemas.Manifest]) List[str]

Validates that the reports returned by the test container exactly match the test container manifest output_reports definitions.

Args:

all_results: List of test execution results
manifests: Dictionary linking each image to its manifest

Returns:

List of validation error messages or empty list if none found

src.asqi.workflow.validate_display_reports(manifests: Dict[str, asqi.schemas.Manifest], score_card: asqi.schemas.ScoreCard, test_id_to_image: Dict[str, str])

Validate that score card 'display_reports' are defined in the test container manifests and match the expected structure.

Args:

manifests: Dictionary linking each image to its manifest
score_card: Score card configuration to validate
test_id_to_image: Dictionary linking each test id to the image used

Raises:

ReportValidationError: If validation fails

src.asqi.workflow.save_results_to_file_step(results: Dict[str, Any], output_path: str) None

Save execution results to a JSON file.

src.asqi.workflow.save_container_results_to_file_step(container_results: List[Dict[str, Any]], output_path: str) None

Save container results to a JSON file.

src.asqi.workflow.start_test_execution(suite_path: str, systems_path: str, executor_config: Dict[str, Any], container_config: asqi.config.ContainerConfig, audit_responses_data: Dict[str, Any] | None = None, output_path: str | None = None, score_card_configs: List[Dict[str, Any]] | None = None, execution_mode: asqi.config.ExecutionMode = ExecutionMode.END_TO_END, test_ids: List[str] | None = None, datasets_config_path: str | None = None) str

Orchestrate test suite execution workflow.

Handles input validation, configuration loading, and workflow delegation. Actual execution logic is handled by dedicated workflow functions.

Args:

suite_path: Path to test suite YAML file
systems_path: Path to systems YAML file
executor_config: Executor configuration dictionary. Expected keys:

  • “concurrent_tests”: int, number of concurrent tests

  • “max_failures”: int, max number of failures to display

  • “progress_interval”: int, interval for progress updates

container_config: Container execution configurations
audit_responses_data: Optional dictionary of audit responses data
output_path: Optional path to save results JSON file
score_card_configs: Optional list of score card configurations to evaluate
execution_mode: Execution mode, expected modes:

  • ExecutionMode.TESTS_ONLY

  • ExecutionMode.END_TO_END

test_ids: Optional list of test ids to filter from suite
datasets_config_path: Optional path to datasets configuration YAML file

Returns:

Workflow ID for tracking execution

Raises:

ValueError: If inputs are invalid
FileNotFoundError: If configuration files don't exist
PermissionError: If configuration files cannot be read
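
A hedged sketch of this orchestration entry point; paths and executor settings are placeholders, and container_config is assumed to be a pre-built asqi.config.ContainerConfig:

from asqi.config import ExecutionMode
from asqi.workflow import start_test_execution

workflow_id = start_test_execution(
    suite_path="config/suite.yaml",      # hypothetical path
    systems_path="config/systems.yaml",  # hypothetical path
    executor_config={"concurrent_tests": 4, "max_failures": 5, "progress_interval": 10},
    container_config=container_config,
    output_path="results/output.json",
    execution_mode=ExecutionMode.TESTS_ONLY,
)
print(f"Started workflow {workflow_id}")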

src.asqi.workflow.start_score_card_evaluation(input_path: str, score_card_configs: List[Dict[str, Any]], audit_responses_data: Dict[str, Any] | None = None, output_path: str | None = None) str

Orchestrate score card evaluation workflow.

Handles input validation, data loading, and workflow delegation. Actual evaluation logic is handled by dedicated workflow functions.

Args:

input_path: Path to JSON file containing test execution results
score_card_configs: List of score card configurations to evaluate
audit_responses_data: Optional dictionary of audit responses data
output_path: Optional path to save updated results JSON file

Returns:

Workflow ID for tracking execution

Raises:

ValueError: If inputs are invalid
FileNotFoundError: If input file doesn't exist
json.JSONDecodeError: If input file contains invalid JSON
PermissionError: If input file cannot be read
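
A sketch of evaluating score cards against a saved results file; the paths and the score_card_config variable are hypothetical:

from asqi.workflow import start_score_card_evaluation

workflow_id = start_score_card_evaluation(
    input_path="results/output.json",        # produced by an earlier test run
    score_card_configs=[score_card_config],  # serialized score card definitions
    output_path="results/with_score_cards.json",
)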

src.asqi.workflow.start_data_generation(generation_config_path: str, systems_path: str | None, executor_config: Dict[str, Any], container_config: asqi.config.ContainerConfig, output_path: str | None = None, datasets_config_path: str | None = None) str

Orchestrate data generation workflow.

Handles input validation, configuration loading, and workflow delegation. Actual execution logic is handled by dedicated workflow functions.

Args:

generation_config_path: Path to generation config YAML file
systems_path: Path to systems YAML file (optional)
executor_config: Executor configuration dictionary. Expected keys:

  • “concurrent_tests”: int, number of concurrent tests

  • “max_failures”: int, max number of failures to display

  • “progress_interval”: int, interval for progress updates

container_config: Container execution configurations
output_path: Optional path to save results JSON file
datasets_config_path: Optional path to datasets configuration YAML file

Returns:

Workflow ID for tracking execution

Raises:

ValueError: If inputs are invalid
FileNotFoundError: If configuration files don't exist
PermissionError: If configuration files cannot be read
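
A sketch mirroring start_test_execution for the data generation pipeline; paths and settings are hypothetical, and container_config is assumed to be pre-built:

from asqi.workflow import start_data_generation

workflow_id = start_data_generation(
    generation_config_path="config/generation.yaml",  # hypothetical path
    systems_path="config/systems.yaml",
    executor_config={"concurrent_tests": 2, "max_failures": 3, "progress_interval": 10},
    container_config=container_config,                 # assumed pre-built ContainerConfig
    output_path="results/generation.json",
)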

src.asqi.workflow.execute_data_generation(job_name: str, job_id: str, image: str, systems_params: Dict[str, Any], generation_params: Dict[str, Any], container_config: asqi.config.ContainerConfig, env_file: str | None = None, environment: Dict[str, str] | None = None, metadata_config: Dict[str, Any] | None = None, parent_id: str | None = None) TestExecutionResult

Execute a single data generation job in a Docker container.

Args:

job_name: Name of the generation job to execute (pre-validated)
job_id: Unique ID of the generation job to execute (pre-validated)
image: Docker image to run (pre-validated)
systems_params: Dictionary containing generation systems and their configurations (pre-validated)
generation_params: Parameters for the generation job (pre-validated)
container_config: Container execution configurations
env_file: Optional path to .env file for job-level environment variables
environment: Optional dictionary of environment variables for the generation job
metadata_config: Optional dictionary containing metadata like user_id and job_id to forward into the generation container
parent_id: Optional parent workflow ID for tracking hierarchy (defaults to DBOS.workflow_id)

Returns:

TestExecutionResult containing execution metadata and results

Raises:

ValueError: If inputs fail validation or JSON output cannot be parsed
RuntimeError: If container execution fails

src.asqi.workflow.run_data_generation_workflow(generation_config: Dict[str, Any], systems_config: Dict[str, Any] | None, executor_config: Dict[str, Any], container_config: asqi.config.ContainerConfig, datasets_config: Dict[str, Any] | None = None, metadata_config: Dict[str, Any] | None = None) Tuple[Dict[str, Any], List[Dict[str, Any]]]

Execute a set of data generation jobs with DBOS durability (no score card evaluation).

This workflow:
1. Validates image availability and extracts manifests
2. Performs cross-validation of generation jobs, systems, and manifests
3. Executes generation jobs concurrently with progress tracking
4. Aggregates results with detailed error reporting

Args:

generation_config: Serialized generation configuration containing job definitions
systems_config: Serialized SystemsConfig containing system configurations (optional)
executor_config: Execution parameters controlling concurrency and reporting
container_config: Container execution configurations
datasets_config: Optional datasets configuration for resolving dataset references
metadata_config: Optional dictionary containing metadata like user_id and job_id to forward into generation containers

Returns:

Execution summary with metadata and individual generation job results (no score cards), plus container results