Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/praisonai-agents/praisonaiagents/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
from .tool_execution import ToolExecutionMixin
from .chat_handler import ChatHandlerMixin
from .session_manager import SessionManagerMixin
# New mixins for god class decomposition
from .chat_mixin import ChatMixin
from .execution_mixin import ExecutionMixin
from .memory_mixin import MemoryMixin

# Module-level logger for thread safety errors and debugging
logger = get_logger(__name__)
Expand Down Expand Up @@ -196,7 +200,7 @@ def __init__(self, agent_name: str, total_cost: float, max_budget: float):
f"${total_cost:.4f} >= ${max_budget:.4f}"
)

class Agent(ToolExecutionMixin, ChatHandlerMixin, SessionManagerMixin):
class Agent(ToolExecutionMixin, ChatHandlerMixin, SessionManagerMixin, ChatMixin, ExecutionMixin, MemoryMixin):
# Class-level counter for generating unique display names for nameless agents
_agent_counter = 0
_agent_counter_lock = threading.Lock()
Expand Down
127 changes: 127 additions & 0 deletions src/praisonai-agents/praisonaiagents/agent/chat_mixin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""
Chat and LLM functionality for Agent class.

This module contains methods related to chat, LLM communication, streaming,
and conversation processing. Extracted from the main agent.py file for better maintainability.

Round 1 of agent god class decomposition - targeting ~1500 lines reduction.
"""

import os
import time
import json
import logging
import asyncio
import contextlib
from typing import List, Optional, Any, Dict, Union, Literal, Callable, Generator

from praisonaiagents._logging import get_logger


class ChatMixin:
    """Chat and LLM-communication behaviour mixed into ``Agent``.

    Responsibilities carried by this mixin:

    - the public ``chat()`` / ``achat()`` entry points
    - processing and formatting of LLM output
    - streaming support
    - tool-call handling inside a conversation
    - response templating

    NOTE: most bodies here are placeholders; the concrete logic is still
    being migrated out of ``agent.py`` (god-class decomposition, round 1).
    """

    def chat(self, prompt: str, temperature: float = 1.0, tools: Optional[List[Any]] = None,
             output_json: Optional[Any] = None, output_pydantic: Optional[Any] = None,
             reasoning_steps: bool = False, stream: Optional[bool] = None,
             task_name: Optional[str] = None, task_description: Optional[str] = None,
             task_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None,
             force_retrieval: bool = False, skip_retrieval: bool = False,
             attachments: Optional[List[str]] = None, tool_choice: Optional[str] = None) -> Optional[str]:
        """Run one conversational turn with the agent.

        Args:
            prompt: Text query; this IS recorded in ``chat_history``.
            temperature: Sampling temperature (0.0-2.0).
            tools: Tools made available for this conversation.
            output_json: JSON schema for structured output.
            output_pydantic: Pydantic model for structured output.
            reasoning_steps: Include reasoning steps in the response.
            stream: Stream the response when True.
            task_name: Task name used for context.
            task_description: Task description used for context.
            task_id: Unique task identifier.
            config: Extra configuration options.
            force_retrieval: Force knowledge retrieval for this turn.
            skip_retrieval: Skip knowledge retrieval for this turn.
            attachments: Ephemeral image/file paths used for THIS turn only
                and never stored in history. File paths, URLs, or data URIs
                are accepted.
            tool_choice: Tool choice mode (``'auto'``, ``'required'``,
                ``'none'``); ``'required'`` forces a tool call before the
                model may answer.

        Returns:
            The agent's reply as a string, or ``None`` when blocked by hooks.
        """
        # Delegate to the internal implementation; the extra ``None`` slot
        # is the _trace_emitter argument (no trace wrapping at this call site).
        return self._chat_impl(
            prompt, temperature, tools, output_json, output_pydantic,
            reasoning_steps, stream, task_name, task_description, task_id,
            config, force_retrieval, skip_retrieval, attachments, None, tool_choice
        )

    def _chat_impl(self, prompt, temperature, tools, output_json, output_pydantic,
                   reasoning_steps, stream, task_name, task_description, task_id,
                   config, force_retrieval, skip_retrieval, attachments, _trace_emitter, tool_choice=None):
        """Internal chat body (separated so tracing can wrap it).

        Placeholder: the real implementation still lives in ``agent.py``.
        """
        raise NotImplementedError("This method needs to be moved from agent.py")

    async def achat(self, prompt: str, temperature=1.0, tools=None, output_json=None,
                    output_pydantic=None, reasoning_steps=False, task_name=None,
                    task_description=None, task_id=None, attachments=None):
        """Asynchronous counterpart of :meth:`chat`.

        Placeholder: the real implementation still lives in ``agent.py``.
        """
        raise NotImplementedError("This method needs to be moved from agent.py")

    def _process_agent_output(self, response: Any) -> str:
        """Convert a raw LLM response into the agent's output string.

        Placeholder: the real implementation still lives in ``agent.py``.
        """
        raise NotImplementedError("This method needs to be moved from agent.py")

    def _format_response(self, response: str, **kwargs) -> str:
        """Apply the configured templates to a response string.

        Placeholder: the real implementation still lives in ``agent.py``.
        """
        raise NotImplementedError("This method needs to be moved from agent.py")

    def _handle_tool_calls(self, tool_calls: List[Any]) -> Any:
        """Execute tool calls emitted by the LLM during a chat turn.

        Placeholder: the real implementation still lives in ``agent.py``.
        """
        raise NotImplementedError("This method needs to be moved from agent.py")

    def _build_multimodal_prompt(self, prompt: str, attachments: Optional[List[str]] = None) -> Union[str, List[Dict[str, Any]]]:
        """Combine text and attachments into a multimodal prompt payload.

        With no attachments the plain text prompt passes through unchanged;
        the attachment-merging logic has not been migrated yet.
        """
        if attachments:
            # Placeholder - actual implementation to be moved
            raise NotImplementedError("This method needs to be moved from agent.py")
        return prompt

    def chat_with_context(self, prompt: str, context: Optional[Dict[str, Any]] = None, **kwargs) -> Optional[str]:
        """Chat while supplying extra contextual information.

        Placeholder: the real implementation still lives in ``agent.py``.
        """
        raise NotImplementedError("This method needs to be moved from agent.py")
196 changes: 196 additions & 0 deletions src/praisonai-agents/praisonaiagents/agent/execution_mixin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
"""
Execution and runtime functionality for Agent class.

This module contains methods related to running agents, execution control,
and autonomous operation. Extracted from the main agent.py file for better maintainability.

Round 2 of agent god class decomposition - targeting ~1200 lines reduction.
"""

import os
import time
import logging
import asyncio
import concurrent.futures
from typing import List, Optional, Any, Dict, Union, Literal, Generator, Callable

from praisonaiagents._logging import get_logger


class ExecutionMixin:
    """Execution and runtime behaviour mixed into ``Agent``.

    Responsibilities carried by this mixin:

    - the blocking ``run()`` and async ``arun()`` methods
    - the ``start()`` / ``astart()`` entry points
    - autonomous operation (``run_autonomous``, ``run_until`` and their
      async variants)
    - execution lifecycle hooks (``_start_run`` / ``_end_run``)

    NOTE: most bodies here are placeholders; the concrete logic is still
    being migrated out of ``agent.py`` (god-class decomposition, round 2).
    """

    def run(self, prompt: str, **kwargs: Any) -> Optional[str]:
        """Execute the agent synchronously and block until it finishes.

        Args:
            prompt: Input prompt/query for the agent.
            **kwargs: Forwarded to the underlying execution machinery.

        Returns:
            The agent's response as a string, or ``None`` on failure.
        """
        # Placeholder: the real implementation still lives in agent.py.
        raise NotImplementedError("This method needs to be moved from agent.py")

    async def arun(self, prompt: str, **kwargs):
        """Execute the agent without blocking (async twin of :meth:`run`).

        Args:
            prompt: Input prompt/query for the agent.
            **kwargs: Additional keyword arguments.

        Returns:
            The agent's response.
        """
        # Placeholder: the real implementation still lives in agent.py.
        raise NotImplementedError("This method needs to be moved from agent.py")

    def start(self, prompt: Optional[str] = None, **kwargs: Any) -> Union[str, Generator[str, None, None], None]:
        """Primary entry point for agent execution.

        Supports both streaming and non-streaming modes.

        Args:
            prompt: Optional input prompt; when ``None`` the agent may run
                autonomously.
            **kwargs: Additional configuration options.

        Returns:
            A response string, a generator when streaming, or ``None``.
        """
        # Placeholder: the real implementation still lives in agent.py.
        raise NotImplementedError("This method needs to be moved from agent.py")

    async def astart(self, prompt: str, **kwargs):
        """Async twin of :meth:`start`.

        Args:
            prompt: Input prompt for the agent.
            **kwargs: Additional configuration options.

        Returns:
            The agent's response.
        """
        # Placeholder: the real implementation still lives in agent.py.
        raise NotImplementedError("This method needs to be moved from agent.py")

    def run_autonomous(self, initial_prompt: Optional[str] = None, max_iterations: int = 10,
                       goal: Optional[str] = None, **kwargs) -> Any:
        """Let the agent direct itself over multiple iterations.

        Args:
            initial_prompt: Prompt that seeds the autonomous loop.
            max_iterations: Hard cap on autonomous iterations.
            goal: Optional goal the agent works toward.
            **kwargs: Additional configuration.

        Returns:
            Results of the autonomous run.
        """
        # Placeholder: the real implementation still lives in agent.py.
        raise NotImplementedError("This method needs to be moved from agent.py")

    async def run_autonomous_async(self, initial_prompt: Optional[str] = None, max_iterations: int = 10,
                                   goal: Optional[str] = None, **kwargs) -> Any:
        """Async twin of :meth:`run_autonomous`.

        Args:
            initial_prompt: Prompt that seeds the autonomous loop.
            max_iterations: Hard cap on autonomous iterations.
            goal: Optional goal the agent works toward.
            **kwargs: Additional configuration.

        Returns:
            Results of the autonomous run.
        """
        # Placeholder: the real implementation still lives in agent.py.
        raise NotImplementedError("This method needs to be moved from agent.py")

    def run_until(self, condition: Callable[[], bool], prompt: str, max_iterations: int = 50,
                  **kwargs) -> Any:
        """Repeat execution until ``condition`` is satisfied.

        Args:
            condition: Callable returning ``True`` when execution should stop.
            prompt: Input prompt for each iteration.
            max_iterations: Hard cap on iterations before giving up.
            **kwargs: Additional configuration.

        Returns:
            Results once the condition holds or the cap is reached.
        """
        # Placeholder: the real implementation still lives in agent.py.
        raise NotImplementedError("This method needs to be moved from agent.py")

    async def run_until_async(self, condition: Callable[[], bool], prompt: str,
                              max_iterations: int = 50, **kwargs) -> Any:
        """Async twin of :meth:`run_until`.

        Args:
            condition: Callable returning ``True`` when execution should stop.
            prompt: Input prompt for each iteration.
            max_iterations: Hard cap on iterations before giving up.
            **kwargs: Additional configuration.

        Returns:
            Results once the condition holds or the cap is reached.
        """
        # Placeholder: the real implementation still lives in agent.py.
        raise NotImplementedError("This method needs to be moved from agent.py")

    def _run_verification_hooks(self) -> List[Dict[str, Any]]:
        """Invoke the configured verification hooks during execution.

        Returns:
            A list of per-hook result dictionaries.
        """
        # Placeholder: the real implementation still lives in agent.py.
        raise NotImplementedError("This method needs to be moved from agent.py")

    def _start_run(self, input_content: str) -> None:
        """Lifecycle hook: begin a new execution run.

        Args:
            input_content: The input that initiated this run.
        """
        # Intentionally a no-op until the logic is moved from agent.py.
        pass

    def _end_run(self, output_content: str, status: str = "completed",
                 metrics: Optional[Dict[str, Any]] = None) -> None:
        """Lifecycle hook: finalize the current execution run.

        Args:
            output_content: The output produced by this run.
            status: Completion status (e.g. ``"completed"``, ``"failed"``).
            metrics: Optional execution metrics.
        """
        # Intentionally a no-op until the logic is moved from agent.py.
        pass
Loading