src.asqi.workflow¶
Attributes¶

| oltp_endpoint |  |
| system_database_url |  |
| config |  |
| console |  |

Classes¶

| TestExecutionResult | Represents the result of a single test execution or data generation job. |

Functions¶

| dbos_check_images_availability | Check if all required Docker images are available locally. |
| dbos_pull_images | Pull missing Docker images from registries. |
| extract_manifests_step | Extract and parse manifests from a list of Docker images. |
| validate_test_plan | DBOS step wrapper for comprehensive test plan validation. |
| validate_indicator_display_reports_step | DBOS step wrapper for comprehensive score card report validation. |
| execute_single_test | Execute a single test in a Docker container. |
| evaluate_score_card | Evaluate score cards against test execution results. |
| run_test_suite_workflow | Execute a test suite with DBOS durability (tests only, no score card evaluation). |
| convert_test_results_to_objects | Convert test results data back to TestExecutionResult objects. |
| add_score_cards_to_results | Add score card evaluation results to test results data. |
| evaluate_score_cards_workflow | Evaluate score cards against existing test results. |
| run_end_to_end_workflow | Execute a complete end-to-end workflow: test execution + score card evaluation. |
| validate_test_container_reports | Validate that the reports returned by the test container exactly match the test container manifest output_reports definitions. |
| validate_display_reports | Validate that score card 'display_reports' are defined in the test container manifests and match the expected structure. |
| save_results_to_file_step | Save execution results to a JSON file. |
| save_container_results_to_file_step | Save container results to a JSON file. |
| start_test_execution | Orchestrate test suite execution workflow. |
| start_score_card_evaluation | Orchestrate score card evaluation workflow. |
| start_data_generation | Orchestrate data generation workflow. |
| execute_data_generation | Execute a single data generation job in a Docker container. |
| run_data_generation_workflow | Execute a data generation run with DBOS durability (generation jobs only, no score card evaluation). |
Module Contents¶
- src.asqi.workflow.oltp_endpoint¶
- src.asqi.workflow.system_database_url¶
- src.asqi.workflow.config: dbos.DBOSConfig¶
- src.asqi.workflow.console¶
- class src.asqi.workflow.TestExecutionResult(test_name: str, test_id: str, sut_name: str | None, image: str, system_type: str | None = None)¶
Represents the result of a single test execution or data generation job.
- test_id¶
- test_name¶
- sut_name¶
- image¶
- system_type = None¶
- start_time: float = 0¶
- end_time: float = 0¶
- success: bool = False¶
- container_id: str = ''¶
- exit_code: int = -1¶
- container_output: str = ''¶
- error_message: str = ''¶
- results: Dict[str, Any]¶
- generated_reports: List[asqi.response_schemas.GeneratedReport] = []¶
- generated_datasets: List[asqi.response_schemas.GeneratedDataset] = []¶
- property execution_time: float¶
Calculate execution time in seconds.
- property test_results: Dict[str, Any]¶
Alias for results to maintain backward compatibility with score card engine.
- result_dict(use_results_field: bool = False) Dict[str, Any]¶
Convert to dictionary for storage/reporting.
- Args:
use_results_field: If True, outputs 'results' field (data generation pipeline). If False, outputs 'test_results' field (test execution pipeline, used by score cards).

Serializes Pydantic objects to dictionaries.
- container_dict() Dict[str, Any]¶
Convert to dictionary for storage/reporting.
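For orientation, a minimal sketch of constructing and serializing a result object, using only the fields and signatures documented above. The concrete names, image tag, and timestamps are illustrative assumptions, and the comment on execution_time assumes the property is derived from the two timestamps:

```python
from src.asqi.workflow import TestExecutionResult

# Illustrative values; real results are produced by the execution steps below.
result = TestExecutionResult(
    test_name="toxicity_check",            # assumed test name
    test_id="toxicity_check_001",          # assumed test ID
    sut_name="my_chatbot",                 # assumed system under test
    image="asqi/toxicity-tester:latest",   # assumed image tag
)
result.start_time = 100.0
result.end_time = 112.5
result.success = True

print(result.execution_time)  # 12.5, assuming the property computes end_time - start_time

# 'test_results' field for the test execution pipeline (used by score cards):
summary = result.result_dict()
# 'results' field for the data generation pipeline:
generation_summary = result.result_dict(use_results_field=True)
```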
- src.asqi.workflow.dbos_check_images_availability(images: List[str]) Dict[str, bool]¶
Check if all required Docker images are available locally.
- src.asqi.workflow.dbos_pull_images(images: List[str])¶
Pull missing Docker images from registries.
- src.asqi.workflow.extract_manifests_step(images: List[str]) Dict[str, asqi.schemas.Manifest]¶
Extract and parse manifests from a list of Docker images.
- Args:
images: List of Docker image names
- Returns:
Dictionary mapping image name to Manifest
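Taken together, the three steps above support a check-then-pull-then-inspect flow. A minimal sketch, with illustrative image names:

```python
from src.asqi.workflow import (
    dbos_check_images_availability,
    dbos_pull_images,
    extract_manifests_step,
)

images = ["asqi/toxicity-tester:latest", "asqi/bias-tester:latest"]  # assumed names

availability = dbos_check_images_availability(images)  # image name -> bool
missing = [image for image, present in availability.items() if not present]
if missing:
    dbos_pull_images(missing)

manifests = extract_manifests_step(images)  # Dict[str, Manifest], keyed by image name
```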
- src.asqi.workflow.validate_test_plan(suite: asqi.schemas.SuiteConfig, systems: asqi.schemas.SystemsConfig, manifests: Dict[str, asqi.schemas.Manifest]) List[str]¶
DBOS step wrapper for comprehensive test plan validation.
Delegates to validation.py for the actual validation logic. This step exists to provide DBOS durability for validation results.
- Args:
suite: Test suite configuration (pre-validated)
systems: Systems configuration (pre-validated)
manifests: Available manifests (pre-validated)
- Returns:
List of validation error messages
- src.asqi.workflow.validate_indicator_display_reports_step(suite: asqi.schemas.SuiteConfig, manifests: Dict[str, asqi.schemas.Manifest], score_cards: List[asqi.schemas.ScoreCard]) List[str]¶
DBOS step wrapper for comprehensive score card report validation.
Delegates to validation.py for the actual validation logic. This step exists to provide DBOS durability for validation results.
- Args:
suite: Test suite configuration (pre-validated)
manifests: Available manifests (pre-validated)
score_cards: List of score card configurations (pre-validated)
- Returns:
List of validation error messages
- src.asqi.workflow.execute_single_test(test_name: str, test_id: str, image: str, sut_name: str, systems_params: Dict[str, Any], test_params: Dict[str, Any], container_config: asqi.config.ContainerConfig, env_file: str | None = None, environment: Dict[str, str] | None = None, metadata_config: Dict[str, Any] | None = None, parent_id: str | None = None) TestExecutionResult¶
Execute a single test in a Docker container.
Focuses solely on test execution. Input validation is handled separately in validation.py to follow single responsibility principle.
- Args:
test_name: Name of the test to execute (pre-validated)
test_id: Unique ID of the test to execute (pre-validated)
image: Docker image to run (pre-validated)
sut_name: Name of the system under test (pre-validated)
systems_params: Dictionary containing system_under_test and other systems (pre-validated)
test_params: Parameters for the test (pre-validated)
container_config: Container execution configurations
env_file: Optional path to .env file for test-level environment variables
environment: Optional dictionary of environment variables for the test
metadata_config: Optional dictionary containing metadata to forward into the test container
parent_id: Optional parent workflow ID for tracking hierarchy (defaults to DBOS.workflow_id)
- Returns:
TestExecutionResult containing execution metadata and results
- Raises:
ValueError: If inputs fail validation or JSON output cannot be parsed
RuntimeError: If container execution fails
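A sketch of a direct call, normally issued from within the workflows below. It assumes ContainerConfig can be constructed with defaults (see asqi.config), and the parameter dictionaries are illustrative; the exact shape of systems_params beyond the documented system_under_test key is an assumption:

```python
from asqi.config import ContainerConfig
from src.asqi.workflow import execute_single_test

container_config = ContainerConfig()  # assumed default construction; see asqi.config

result = execute_single_test(
    test_name="toxicity_check",
    test_id="toxicity_check_001",
    image="asqi/toxicity-tester:latest",                        # assumed image
    sut_name="my_chatbot",
    systems_params={"system_under_test": {"type": "llm_api"}},  # illustrative shape
    test_params={"num_prompts": 50},                            # illustrative
    container_config=container_config,
    environment={"LOG_LEVEL": "DEBUG"},                         # optional, illustrative
)
if not result.success:
    print(result.error_message)
```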
- src.asqi.workflow.evaluate_score_card(test_results: List[TestExecutionResult], score_card_configs: List[Dict[str, Any]], audit_responses_data: Dict[str, Any] | None = None, execution_mode: asqi.config.ExecutionMode = ExecutionMode.END_TO_END) List[Dict[str, Any]]¶
Evaluate score cards against test execution results.
- src.asqi.workflow.run_test_suite_workflow(suite_config: Dict[str, Any], systems_config: Dict[str, Any], executor_config: Dict[str, Any], container_config: asqi.config.ContainerConfig, datasets_config: Dict[str, Any] | None = None, score_card_configs: List[Dict[str, Any]] | None = None, metadata_config: Dict[str, Any] | None = None) Tuple[Dict[str, Any], List[Dict[str, Any]]]¶
Execute a test suite with DBOS durability (tests only, no score card evaluation).
This workflow:
1. Validates image availability and extracts manifests
2. Performs cross-validation of tests, systems, and manifests
3. Executes tests concurrently with progress tracking
4. Aggregates results with detailed error reporting
- Args:
suite_config: Serialized SuiteConfig containing test definitions
systems_config: Serialized SystemsConfig containing system configurations
executor_config: Execution parameters controlling concurrency and reporting
container_config: Container execution configurations
datasets_config: Optional datasets configuration for resolving dataset references
score_card_configs: Optional list of score card configurations
metadata_config: Optional dictionary containing metadata to forward into test containers
- Returns:
Execution summary with metadata, individual test results (no score cards), and container results
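A sketch of invoking this workflow with serialized configurations loaded from YAML. The file names are illustrative, and the executor_config keys are taken from the start_test_execution documentation below and assumed to apply here as well:

```python
import yaml
from asqi.config import ContainerConfig
from src.asqi.workflow import run_test_suite_workflow

with open("suite.yaml") as f:      # assumed file name
    suite_config = yaml.safe_load(f)
with open("systems.yaml") as f:    # assumed file name
    systems_config = yaml.safe_load(f)

summary, container_results = run_test_suite_workflow(
    suite_config=suite_config,
    systems_config=systems_config,
    executor_config={"concurrent_tests": 4, "max_failures": 10, "progress_interval": 5},
    container_config=ContainerConfig(),  # assumed default construction
)
```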
- src.asqi.workflow.convert_test_results_to_objects(test_results_data: Dict[str, Any], test_container_data: List[Dict[str, Any]]) List[TestExecutionResult]¶
Convert test results data back to TestExecutionResult objects.
- Args:
test_results_data: Test execution results
test_container_data: Test container results containing container output and error message
- Returns:
List of TestExecutionResult objects
- src.asqi.workflow.add_score_cards_to_results(test_results_data: Dict[str, Any], score_card_evaluation: List[Dict[str, Any]]) Dict[str, Any]¶
Add score card evaluation results to test results data.
- src.asqi.workflow.evaluate_score_cards_workflow(test_results_data: Dict[str, Any], test_container_data: List[Dict[str, Any]], score_card_configs: List[Dict[str, Any]], audit_responses_data: Dict[str, Any] | None = None, execution_mode: asqi.config.ExecutionMode = ExecutionMode.END_TO_END) Dict[str, Any]¶
Evaluate score cards against existing test results.
- Args:
test_results_data: Test execution results
test_container_data: Test container results containing container output and error message
score_card_configs: List of score card configurations to evaluate
audit_responses_data: Optional dict with manual audit responses
execution_mode: Execution mode; expected modes: ExecutionMode.EVALUATE_ONLY, ExecutionMode.END_TO_END
- Returns:
Updated results with score card evaluation data
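A sketch of re-evaluating score cards against results saved by an earlier run; the file name and the score card dictionary contents are illustrative assumptions:

```python
import json
from asqi.config import ExecutionMode
from src.asqi.workflow import evaluate_score_cards_workflow

with open("test_results.json") as f:  # e.g. produced by save_results_to_file_step
    test_results_data = json.load(f)

updated = evaluate_score_cards_workflow(
    test_results_data=test_results_data,
    test_container_data=[],                      # container results, if available
    score_card_configs=[{"name": "baseline"}],   # illustrative score card dict
    execution_mode=ExecutionMode.EVALUATE_ONLY,
)
```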
- src.asqi.workflow.run_end_to_end_workflow(suite_config: Dict[str, Any], systems_config: Dict[str, Any], score_card_configs: List[Dict[str, Any]], executor_config: Dict[str, Any], container_config: asqi.config.ContainerConfig, audit_responses_data: Dict[str, Any] | None = None, datasets_config: Dict[str, Any] | None = None) Tuple[Dict[str, Any], List[Dict[str, Any]]]¶
Execute a complete end-to-end workflow: test execution + score card evaluation.
- Args:
suite_config: Serialized SuiteConfig containing test definitions
systems_config: Serialized SystemsConfig containing system configurations
score_card_configs: List of score card configurations to evaluate
executor_config: Execution parameters controlling concurrency and reporting
container_config: Container execution configurations
audit_responses_data: Optional dict with manual audit responses
datasets_config: Optional datasets configuration for resolving dataset references
- Returns:
Complete execution results with test results, score card evaluations and container results
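An end-to-end sketch that chains this workflow with the save steps documented further below. Paths, score card contents, and the default ContainerConfig construction are assumptions:

```python
import yaml
from asqi.config import ContainerConfig
from src.asqi.workflow import (
    run_end_to_end_workflow,
    save_results_to_file_step,
    save_container_results_to_file_step,
)

with open("suite.yaml") as f:
    suite_config = yaml.safe_load(f)
with open("systems.yaml") as f:
    systems_config = yaml.safe_load(f)

results, container_results = run_end_to_end_workflow(
    suite_config=suite_config,
    systems_config=systems_config,
    score_card_configs=[{"name": "baseline"}],  # illustrative
    executor_config={"concurrent_tests": 4},    # assumed key, per start_test_execution
    container_config=ContainerConfig(),         # assumed default construction
)
save_results_to_file_step(results, "results.json")
save_container_results_to_file_step(container_results, "container_results.json")
```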
- src.asqi.workflow.validate_test_container_reports(all_results: List[TestExecutionResult], manifests: Dict[str, asqi.schemas.Manifest]) List[str]¶
Validates that the reports returned by the test container exactly match the test container manifest output_reports definitions.
- Args:
all_results: List of test execution results
manifests: Dictionary linking each image to its manifest
- Returns:
List of validation error messages or empty list if none found
- src.asqi.workflow.validate_display_reports(manifests: Dict[str, asqi.schemas.Manifest], score_card: asqi.schemas.ScoreCard, test_id_to_image: Dict[str, str])¶
Validate that score card 'display_reports' are defined in the test container manifests and match the expected structure.
- Args:
manifests: Dictionary linking each image to its manifest
score_card: Score card configuration to validate
test_id_to_image: Dictionary linking each test id to the image used
- Raises:
ReportValidationError: If validation fails
- src.asqi.workflow.save_results_to_file_step(results: Dict[str, Any], output_path: str) None¶
Save execution results to a JSON file.
- src.asqi.workflow.save_container_results_to_file_step(container_results: List[Dict[str, Any]], output_path: str) None¶
Save container results to a JSON file.
- src.asqi.workflow.start_test_execution(suite_path: str, systems_path: str, executor_config: Dict[str, Any], container_config: asqi.config.ContainerConfig, audit_responses_data: Dict[str, Any] | None = None, output_path: str | None = None, score_card_configs: List[Dict[str, Any]] | None = None, execution_mode: asqi.config.ExecutionMode = ExecutionMode.END_TO_END, test_ids: List[str] | None = None, datasets_config_path: str | None = None) str¶
Orchestrate test suite execution workflow.
Handles input validation, configuration loading, and workflow delegation. Actual execution logic is handled by dedicated workflow functions.
- Args:
suite_path: Path to test suite YAML file
systems_path: Path to systems YAML file
executor_config: Executor configuration dictionary. Expected keys:
  "concurrent_tests": int, number of concurrent tests
  "max_failures": int, max number of failures to display
  "progress_interval": int, interval for progress updates
container_config: Container execution configurations
audit_responses_data: Optional dictionary of audit responses data
output_path: Optional path to save results JSON file
score_card_configs: Optional list of score card configurations to evaluate
execution_mode: Execution mode; expected modes: ExecutionMode.TESTS_ONLY, ExecutionMode.END_TO_END
test_ids: Optional list of test ids to filter from suite
datasets_config_path: Optional path to datasets configuration YAML file
- Returns:
Workflow ID for tracking execution
- Raises:
ValueError: If inputs are invalid
FileNotFoundError: If configuration files don't exist
PermissionError: If configuration files cannot be read
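A sketch of this orchestration entry point using the documented executor_config keys; the file paths, test ID, and default ContainerConfig construction are illustrative assumptions:

```python
from asqi.config import ContainerConfig, ExecutionMode
from src.asqi.workflow import start_test_execution

workflow_id = start_test_execution(
    suite_path="configs/suite.yaml",        # illustrative path
    systems_path="configs/systems.yaml",    # illustrative path
    executor_config={
        "concurrent_tests": 4,
        "max_failures": 10,
        "progress_interval": 5,
    },
    container_config=ContainerConfig(),     # assumed default construction
    output_path="results.json",
    execution_mode=ExecutionMode.TESTS_ONLY,
    test_ids=["toxicity_check_001"],        # optional filter, illustrative ID
)
print(f"Started workflow {workflow_id}")
```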
- src.asqi.workflow.start_score_card_evaluation(input_path: str, score_card_configs: List[Dict[str, Any]], audit_responses_data: Dict[str, Any] | None = None, output_path: str | None = None) str¶
Orchestrate score card evaluation workflow.
Handles input validation, data loading, and workflow delegation. Actual evaluation logic is handled by dedicated workflow functions.
- Args:
input_path: Path to JSON file containing test execution results
score_card_configs: List of score card configurations to evaluate
audit_responses_data: Optional dictionary of audit responses data
output_path: Optional path to save updated results JSON file
- Returns:
Workflow ID for tracking execution
- Raises:
ValueError: If inputs are invalid
FileNotFoundError: If input file doesn't exist
json.JSONDecodeError: If input file contains invalid JSON
PermissionError: If input file cannot be read
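A minimal sketch of evaluating score cards against a saved results file; the paths and score card dictionary are illustrative:

```python
from src.asqi.workflow import start_score_card_evaluation

workflow_id = start_score_card_evaluation(
    input_path="results.json",                  # results from a previous run
    score_card_configs=[{"name": "baseline"}],  # illustrative score card dict
    output_path="results_with_scores.json",
)
print(f"Started workflow {workflow_id}")
```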
- src.asqi.workflow.start_data_generation(generation_config_path: str, systems_path: str | None, executor_config: Dict[str, Any], container_config: asqi.config.ContainerConfig, output_path: str | None = None, datasets_config_path: str | None = None) str¶
Orchestrate data generation workflow.
Handles input validation, configuration loading, and workflow delegation. Actual execution logic is handled by dedicated workflow functions.
- Args:
generation_config_path: Path to generation config YAML file
systems_path: Path to systems YAML file (optional)
executor_config: Executor configuration dictionary. Expected keys:
  "concurrent_tests": int, number of concurrent tests
  "max_failures": int, max number of failures to display
  "progress_interval": int, interval for progress updates
container_config: Container execution configurations
output_path: Optional path to save results JSON file
datasets_config_path: Optional path to datasets configuration YAML file
- Returns:
Workflow ID for tracking execution
- Raises:
ValueError: If inputs are invalid
FileNotFoundError: If configuration files don't exist
PermissionError: If configuration files cannot be read
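A sketch of starting a data generation run; paths are illustrative, and the default ContainerConfig construction is an assumption:

```python
from asqi.config import ContainerConfig
from src.asqi.workflow import start_data_generation

workflow_id = start_data_generation(
    generation_config_path="configs/generation.yaml",  # illustrative path
    systems_path="configs/systems.yaml",               # optional; may be None
    executor_config={"concurrent_tests": 2},           # documented key
    container_config=ContainerConfig(),                # assumed default construction
    output_path="generated.json",
)
print(f"Started workflow {workflow_id}")
```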
- src.asqi.workflow.execute_data_generation(job_name: str, job_id: str, image: str, systems_params: Dict[str, Any], generation_params: Dict[str, Any], container_config: asqi.config.ContainerConfig, env_file: str | None = None, environment: Dict[str, str] | None = None, metadata_config: Dict[str, Any] | None = None, parent_id: str | None = None) TestExecutionResult¶
Execute a single data generation job in a Docker container.
- Args:
job_name: Name of the generation job to execute (pre-validated)
job_id: Unique ID of the generation job to execute (pre-validated)
image: Docker image to run (pre-validated)
systems_params: Dictionary containing generation systems and their configurations (pre-validated)
generation_params: Parameters for the generation job (pre-validated)
container_config: Container execution configurations
env_file: Optional path to .env file for job-level environment variables
environment: Optional dictionary of environment variables for the generation job
metadata_config: Optional dictionary containing metadata like user_id and job_id to forward into the generation container
parent_id: Optional parent workflow ID for tracking hierarchy (defaults to DBOS.workflow_id)
- Returns:
TestExecutionResult containing execution metadata and results
- Raises:
ValueError: If inputs fail validation or JSON output cannot be parsed
RuntimeError: If container execution fails
- src.asqi.workflow.run_data_generation_workflow(generation_config: Dict[str, Any], systems_config: Dict[str, Any] | None, executor_config: Dict[str, Any], container_config: asqi.config.ContainerConfig, datasets_config: Dict[str, Any] | None = None, metadata_config: Dict[str, Any] | None = None) Tuple[Dict[str, Any], List[Dict[str, Any]]]¶
Execute a data generation run with DBOS durability (generation jobs only, no score card evaluation).
This workflow:
1. Validates image availability and extracts manifests
2. Performs cross-validation of jobs, systems, and manifests
3. Executes generation jobs concurrently with progress tracking
4. Aggregates results with detailed error reporting
- Args:
generation_config: Serialized generation configuration containing job definitions
systems_config: Serialized SystemsConfig containing system configurations (optional)
executor_config: Execution parameters controlling concurrency and reporting
container_config: Container execution configurations
datasets_config: Optional datasets configuration for resolving dataset references
metadata_config: Optional dictionary containing metadata like user_id and job_id to forward into generation containers
- Returns:
Execution summary with metadata, individual job results (no score cards), and container results