refactor: modernize engine with Pydantic models, modular architecture, and session recovery#166
refactor: modernize engine with Pydantic models, modular architecture, and session recovery#166anticomputer wants to merge 17 commits intomainfrom
Conversation
…r CLI - Extract __main__.py into cli.py, runner.py, mcp_lifecycle.py, models.py - Add Pydantic v2 grammar models for all YAML document types - Replace argparse with Typer CLI, add project.scripts entry point - Reduce ruff ignore list from 59 to 22 rules, fix lint issues - Add 29 model tests against real YAML files (68/68 pass) - Full grammar backwards compatibility preserved
Replace all raw dict access with typed model attributes throughout the engine. AvailableTools now returns Pydantic model instances, runner.py uses typed fields, and tests validate via model attributes.
Split mcp_utils into mcp_transport and mcp_prompt. Extract prompt_parser to break cli/runner circular import. Decompose run_main into focused helpers. Add type hints and docstrings across all modules. Add __all__ exports, pyproject keywords and classifiers.
Add api_type field to ModelConfigDocument (chat_completions|responses). Thread api_type through runner -> deploy_task_agents -> TaskAgent. TaskAgent switches between OpenAIChatCompletionsModel and OpenAIResponsesModel based on api_type. Default: chat_completions.
model_settings in model_config can now set api_type, endpoint, and token per model. token is an env var name to resolve. Allows mixing chat_completions and responses API across models and routing to different endpoints within a single taskflow.
Document api_type, endpoint, and token per-model overrides. Update architecture module listing in README. Add per-model settings reference table to GRAMMAR.md.
- Fix mutable default args in TaskAgent.__init__ (list -> None) - Replace bare except Exception with specific exception types - Add __all__ exports to all 14 submodules - Add docstrings to all hook methods in agent.py - Add return type to banner.py and prompt_parser.py - Add module docstrings to capi.py, env_utils.py, shell_utils.py - Remove B006 ruff suppression (no longer needed)
Add --debug/-d flag and TASK_AGENT_DEBUG env var. Default: one-line error chain. Debug: full traceback.
Add TaskflowSession model for task-level checkpointing. Save progress after each task, auto-retry failed tasks 3x. New --resume flag to continue from last checkpoint. Includes 9 tests for session persistence.
Covers: shell tasks, repeat_prompt, async iteration, model_config, globals, inputs, env, MCP toolboxes, headless, blocked_tools, reusable tasks (uses), reusable prompts (include), and handoffs.
| """ | ||
| Convert the endpoint to its full URL. | ||
| """ | ||
| def to_url(self) -> str: |
Check notice
Code scanning / CodeQL
Explicit returns mixed with implicit (fall through) returns Note
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI 3 days ago
In general, to fix “explicit returns mixed with implicit returns” you ensure that all code paths in a function either (a) return a value explicitly or (b) clearly never return (e.g., always raise), and you avoid leaving a path that implicitly returns None. For AI_API_ENDPOINT_ENUM.to_url, the function already either returns a string or raises a ValueError for unsupported endpoints. To satisfy CodeQL and make the intent explicit, we can add a final return statement after the match block that is never actually reached but documents the return type and eliminates any perceived implicit None return.
Concretely, in src/seclab_taskflow_agent/capi.py, directly after the match self: block (after the case _: arm that raises ValueError), add an explicit return str(self) (or any sensible default str expression). This line will be unreachable in practice because all cases above either return or raise, so it does not change existing behavior. No new imports or helper methods are required.
| @@ -39,8 +39,9 @@ | ||
| return f"https://{self}/v1" | ||
| case _: | ||
| raise ValueError(f"Unsupported endpoint: {self}") | ||
| # Fallback return to satisfy static analysis; this line should be unreachable. | ||
| return str(self) | ||
|
|
||
|
|
||
| COPILOT_INTEGRATION_ID = "vscode-chat" | ||
|
|
||
|
|
|
|
||
|
|
||
| def supports_tool_calls(model: str, models: dict) -> bool: | ||
| def supports_tool_calls(model: str, models: dict[str, dict]) -> bool: |
Check notice
Code scanning / CodeQL
Explicit returns mixed with implicit (fall through) returns Note
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI 3 days ago
In general, to fix "explicit returns mixed with implicit returns" you ensure that all execution paths from a function either (a) explicitly return a value compatible with the type hint, or (b) raise an exception, and you avoid relying on the implicit None at the end of the function. For supports_tool_calls, every case returns a bool or raises a ValueError, but there is no explicit return after the match. Adding a final explicit return False ensures that, even if a new netloc value slips through without matching any case (e.g., due to future changes), the function will still return a bool rather than None.
Concretely, in src/seclab_taskflow_agent/capi.py, after the match netloc: block in supports_tool_calls, add a final return False at the end of the function body. No imports or new helpers are needed. This keeps existing semantics for all currently possible netloc values (since the line is unreachable in the current code paths) while satisfying the static analysis rule and the type annotation.
| @@ -127,6 +127,7 @@ | ||
| f"Unsupported Model Endpoint: {api_endpoint}\n" | ||
| f"Supported endpoints: {[e.to_url() for e in AI_API_ENDPOINT_ENUM]}" | ||
| ) | ||
| return False | ||
|
|
||
|
|
||
| def list_tool_call_models(token: str) -> dict[str, dict]: |
| from .mcp_transport import ( # noqa: F401 | ||
| AsyncDebugMCPServerStdio as AsyncDebugMCPServerStdio, | ||
| ReconnectingMCPServerStdio as ReconnectingMCPServerStdio, | ||
| StreamableMCPThread as StreamableMCPThread, | ||
| ) |
Check notice
Code scanning / CodeQL
Unused import Note
Copilot Autofix
AI 3 days ago
Copilot could not generate an autofix suggestion
Copilot could not generate an autofix suggestion for this alert. Try pushing a new commit or if the problem persists contact support.
There was a problem hiding this comment.
Pull request overview
Refactors the SecLab Taskflow Agent from a monolithic entrypoint into a modular engine with Pydantic v2-validated YAML grammar models, a Typer CLI, per-model API configuration (Chat Completions vs Responses), and task-level checkpoint/resume support.
Changes:
- Introduces typed Pydantic document models and updates YAML loading to validate at parse time.
- Replaces the old argparse-based entrypoint with a Typer CLI and moves execution logic into a dedicated runner with MCP lifecycle modules.
- Adds session checkpointing/resume plus new examples/docs/tests for the modernized architecture and per-model API settings.
Reviewed changes
Copilot reviewed 35 out of 35 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_yaml_parser.py | Updates YAML parsing assertions to use Pydantic document attributes. |
| tests/test_template_utils.py | Switches reusable taskflow prompt access to typed models. |
| tests/test_session.py | Adds unit tests for session checkpoint/load/list behavior. |
| tests/test_models.py | Adds validation tests for Pydantic grammar models and real YAML fixtures. |
| tests/test_cli_parser.py | Updates legacy prompt parsing tests for renamed unpack variables and typed taskflow access. |
| src/seclab_taskflow_agent/template_utils.py | Adds typing/exports and adapts prompt loading to Pydantic PromptDocument. |
| src/seclab_taskflow_agent/shell_utils.py | Adds typing/docs and improves shell error messaging. |
| src/seclab_taskflow_agent/session.py | Implements taskflow session persistence for checkpoint/resume. |
| src/seclab_taskflow_agent/runner.py | New modular execution engine with model resolution, reusable tasks, prompt expansion, retries, and session handling. |
| src/seclab_taskflow_agent/render_utils.py | Adds typing/exports and docs for async output buffering. |
| src/seclab_taskflow_agent/prompt_parser.py | Extracts legacy argparse parsing used by runtime/test compatibility. |
| src/seclab_taskflow_agent/path_utils.py | Centralizes platformdirs data-dir creation and reuses it for MCP data storage. |
| src/seclab_taskflow_agent/models.py | Defines the YAML grammar via Pydantic v2 models (taskflow/personality/toolbox/model_config/prompt). |
| src/seclab_taskflow_agent/mcp_utils.py | Slims MCP utilities and re-exports transport/prompt; updates toolbox param resolution for typed models. |
| src/seclab_taskflow_agent/mcp_transport.py | Extracts MCP transport implementations (stdio wrappers + streamable process thread). |
| src/seclab_taskflow_agent/mcp_servers/memcache/memcache_backend/dictionary_file.py | Tightens type comparisons for memcache “add_state”. |
| src/seclab_taskflow_agent/mcp_servers/codeql/jsonrpyc/init.py | Replaces bare except with except Exception. |
| src/seclab_taskflow_agent/mcp_servers/codeql/client.py | Adds logging import (used for client logging). |
| src/seclab_taskflow_agent/mcp_prompt.py | Extracts system prompt construction helper. |
| src/seclab_taskflow_agent/mcp_lifecycle.py | Extracts MCP server connect/cleanup lifecycle management. |
| src/seclab_taskflow_agent/env_utils.py | Adds typing/docs and exports for env swap + temporary env context manager. |
| src/seclab_taskflow_agent/cli.py | Adds Typer CLI with --debug/--resume, concise error printing, and logging setup. |
| src/seclab_taskflow_agent/capi.py | Adds module docs/exports and typing improvements for token/endpoint/model listing. |
| src/seclab_taskflow_agent/banner.py | Adds typing/exports and docstring for startup banner. |
| src/seclab_taskflow_agent/available_tools.py | Rewrites YAML loader to cache + validate documents via Pydantic models. |
| src/seclab_taskflow_agent/agent.py | Extends TaskAgent to support Responses API, per-model endpoint/token overrides, and adds exports/docs. |
| src/seclab_taskflow_agent/main.py | Replaces monolithic implementation with a small entrypoint delegating to CLI/runner. |
| src/seclab_taskflow_agent/init.py | Adds package-level docs and re-exports key public types. |
| pyproject.toml | Updates metadata, adds console script entrypoint, and tightens Ruff configuration. |
| examples/taskflows/example_responses_api.yaml | Adds an example taskflow configured to use the Responses API. |
| examples/taskflows/echo_responses_api.yaml | Adds a Responses API echo taskflow example. |
| examples/taskflows/comprehensive_test.yaml | Adds a comprehensive taskflow exercising the full grammar and engine features. |
| examples/model_configs/responses_api.yaml | Adds model_config demonstrating per-model api_type/endpoint/token overrides. |
| doc/GRAMMAR.md | Documents per-model settings and engine-handled model_settings keys. |
| README.md | Documents the new architecture, API types, session recovery, and error output behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
- fix uninitialized args and inconsistent return shape in prompt_parser - add explicit returns after exhaustive match statements in capi - narrow BaseException to Exception in mcp_transport thread - add comment for empty except in cleanup shutdown path - iterate over copy in mcp_lifecycle cleanup to avoid mutation during iteration - remove unused _ENGINE_SETTING_KEYS constant from runner - remove unused import in test_session - default TaskDefinition.name/description to empty string for per-index naming - use explicit re-export pattern (as X) in __main__ and cli
| break | ||
|
|
||
| # Re-export for backwards compatibility — some tests import from __main__ | ||
| from .prompt_parser import parse_prompt_args as parse_prompt_args # noqa: E402 |
Check notice
Code scanning / CodeQL
Unused import Note
Copilot Autofix
AI 3 days ago
Copilot could not generate an autofix suggestion
Copilot could not generate an autofix suggestion for this alert. Try pushing a new commit or if the problem persists contact support.
|
|
||
| # Re-export for backwards compatibility — some tests import from __main__ | ||
| from .prompt_parser import parse_prompt_args as parse_prompt_args # noqa: E402 | ||
| from .runner import deploy_task_agents as deploy_task_agents # noqa: E402 |
Check notice
Code scanning / CodeQL
Unused import Note
Copilot Autofix
AI 3 days ago
Copilot could not generate an autofix suggestion
Copilot could not generate an autofix suggestion for this alert. Try pushing a new commit or if the problem persists contact support.
| # Re-export for backwards compatibility — some tests import from __main__ | ||
| from .prompt_parser import parse_prompt_args as parse_prompt_args # noqa: E402 | ||
| from .runner import deploy_task_agents as deploy_task_agents # noqa: E402 | ||
| from .runner import run_main as run_main # noqa: E402 |
Check notice
Code scanning / CodeQL
Unused import Note
Copilot Autofix
AI 3 days ago
Copilot could not generate an autofix suggestion
Copilot could not generate an autofix suggestion for this alert. Try pushing a new commit or if the problem persists contact support.
| # Legacy compatibility shim — implementation moved to prompt_parser.py | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
| from .prompt_parser import parse_prompt_args as parse_prompt_args # noqa: E402 |
Check notice
Code scanning / CodeQL
Unused import Note
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI 4 days ago
To fix the unused import, remove the import statement for parse_prompt_args from cli.py. In general, unused-import issues are resolved by either deleting the import or using the imported symbol; here there is no use and no indication that it should be re-exported (it is not in __all__), so the best fix is deletion.
Concretely, in src/seclab_taskflow_agent/cli.py, at the end of the file under the "Legacy compatibility shim" comment, delete the line:
from .prompt_parser import parse_prompt_args as parse_prompt_args # noqa: E402No additional methods, imports, or definitions are needed. This change does not alter existing runtime behavior within cli.py, because the imported name was never referenced.
| @@ -178,4 +178,3 @@ | ||
| # Legacy compatibility shim — implementation moved to prompt_parser.py | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
| from .prompt_parser import parse_prompt_args as parse_prompt_args # noqa: E402 |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 35 out of 35 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
- remove global OpenAI client/api mutations (race condition with concurrent agents) - add empty resolved_agents validation before deploy_task_agents - store toolbox name on MCPServerEntry instead of accessing private _name - update last_tool_results unconditionally in session.record_task - move raises into match default cases to fix mixed return warnings - catch non-SystemExit exceptions in prompt parser
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 36 out of 36 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
src/seclab_taskflow_agent/runner.py
Outdated
| # Checkpoint after successful task | ||
| session.record_task( | ||
| index=task_index, | ||
| name=task_name, | ||
| success=task_complete, | ||
| tool_results=list(last_mcp_tool_results), | ||
| ) |
There was a problem hiding this comment.
The session checkpoint records the task as completed (session.record_task(... success=task_complete ...)) before the must_complete and not task_complete failure handling below. Since TaskflowSession.next_task_index is based on completed_tasks, a failed must_complete task will be skipped when resuming, preventing recovery/retry of the failed task. Consider only recording tasks as completed when they succeed (or adjust next_task_index to only advance past successful tasks), and ensure the must_complete failure path does not advance the resume cursor past the failing task.
| for g in args[0].globals: | ||
| if "=" not in g: | ||
| logging.error(f"Invalid global variable format: {g}. Expected KEY=VALUE") | ||
| return None, None, None, None, None, help_msg |
There was a problem hiding this comment.
On invalid -g/--global syntax (missing '='), parse_prompt_args() returns (None, None, None, None, None, help_msg), making the returned prompt None. This contradicts the annotated return type (which expects a str prompt in the error tuple) and can propagate None into callers that expect a string prompt (e.g., the runner’s embedded -p parsing path). Consider returning an empty string for the prompt (consistent with the other error paths in this function) or raising a structured error instead.
| return None, None, None, None, None, help_msg | |
| return None, None, None, None, "", help_msg |
| f"Cannot load {toolname} because {pkg_dir} is not a valid directory." | ||
| ) | ||
| filepath = pkg_dir.joinpath(filename + ".yaml") | ||
| with open(filepath) as fh: |
There was a problem hiding this comment.
importlib.resources.files(...).joinpath(...) returns a Traversable, which is not guaranteed to be an os.PathLike. Using open(filepath) can fail for non-filesystem resources (e.g., when the package is loaded from a zip/wheel), raising TypeError. To make resource loading robust across packaging formats, use filepath.open() (Traversable API) or importlib.resources.as_file() / read_text() instead of builtin open() on the traversable.
| with open(filepath) as fh: | |
| with filepath.open() as fh: |
| logging.warning("repeat_prompt enabled but no {{ result }} in prompt") | ||
| try: | ||
| last_result = json.loads(last_mcp_tool_results.pop()) | ||
| text = last_result.get("text", "") | ||
| try: | ||
| iterable_result = json.loads(text) | ||
| except json.JSONDecodeError as exc: |
There was a problem hiding this comment.
_build_prompts_to_run() consumes the last tool result via last_mcp_tool_results.pop() before validating/parsing it. If JSON parsing fails (or the result is non-iterable), an exception is raised and the popped entry is lost; with the task auto-retry loop this will make subsequent attempts fail with IndexError (no last result), and it also breaks resume semantics because the last successful tool result is no longer available. Consider peeking at the last element (or re-inserting it on exception) and only removing it from the list after successful parsing/validation.
- don't advance resume cursor past failed must_complete tasks - peek at last tool result before consuming (safe for retry) - use Traversable.open() for zip/wheel compatibility - return empty string (not None) for prompt on invalid globals
The engine code was a single 671-line
__main__.pywith raw dict access everywhere, no type safety, and no error recovery. This rewrites the internals while keeping the YAML grammar fully backwards-compatible. All existing taskflows (both example and production seclab-taskflows) work without changes.What changed
Architecture.
__main__.pyis now 25 lines. The logic lives in focused modules:models.py-- Pydantic v2 models for all five YAML document types (taskflow, personality, toolbox, model_config, prompt). Validation happens at parse time instead of scattering.get()calls everywhere.runner.py-- execution engine, extracted from__main__and decomposed into_resolve_model_config(),_merge_reusable_task(),_resolve_task_model(),_build_prompts_to_run().cli.py-- Typer-based CLI replacing argparse. Adds--debugand--resumeflags.agent.py-- TaskAgent wrapper, now supports per-model API type, endpoint, and token overrides.mcp_lifecycle.py,mcp_transport.py,mcp_prompt.py-- decomposed from the old 462-linemcp_utils.py.session.py-- task-level checkpoint/resume with auto-retry.prompt_parser.py-- extracted to break a circular import between cli and runner.Pydantic grammar models. Every YAML file is validated against a typed model at load time.
extra="allow"on all models so new fields do not break old parsers. Reserved word conflicts handled via aliases (async->async_task,model_config->model_config_ref). Version normalization (1->"1.0") with validation.Per-model API configuration.
model_settingsin a model_config can now includeapi_type(chat_completions or responses),endpoint, andtoken(env var name). These are extracted by the engine before passing remaining settings to the SDK. This lets you mix endpoints and API types across models in a single taskflow.Session recovery. Taskflow runs are checkpointed after each task. On unrecoverable failure, the session ID is printed and
--resume <id>picks up from the last successful task. Failed tasks auto-retry 3 times with backoff before giving up.Error output. Exceptions now print a concise one-line chain by default instead of dumping 80-line tracebacks through httpx/httpcore/openai internals.
--debugorTASK_AGENT_DEBUG=1gets you the full trace.Code quality. Type hints and docstrings on all public functions.
__all__exports on every module. Fixed mutable default args inTaskAgent.__init__. Replaced bareexcept Exceptionwith specific types. Zero ruff errors across the entire codebase. Ruff ignores reduced from 59 to 21.What did not change
The YAML grammar is 100% backwards-compatible. No taskflow files need modification. All field names, nesting, template syntax, and behavior are preserved.
Testing