diff --git a/config.json b/config.json new file mode 100644 index 0000000..7f0f0e5 --- /dev/null +++ b/config.json @@ -0,0 +1,52 @@ +{ + "TURN_DELAY": 2, + "SHOW_CHAIN_OF_THOUGHT_IN_CONTEXT": false, + "SHARE_CHAIN_OF_THOUGHT": false, + "AI_MODELS": { + "Claude 4.5 Sonnet 20250929": "claude-sonnet-4-5-20250929", + "Claude 3.5 Sonnet 20241022": "claude-3-5-sonnet-20241022", + "Claude 4 Sonnet": "claude-sonnet-4-20250514", + "google/gemini-2.5-pro": "google/gemini-2.5-pro", + "claude-opus-4-1-20250805": "claude-opus-4-1-20250805", + "x-ai/grok-4-fast:free": "x-ai/grok-4-fast:free", + "qwen/qwen3-max": "qwen/qwen3-max", + "qwen/qwen3-next-80b-a3b-thinking": "qwen/qwen3-next-80b-a3b-thinking", + "nousresearch/hermes-4-405b": "nousresearch/hermes-4-405b", + "moonshotai/kimi-k2": "moonshotai/kimi-k2", + "Claude 4 Opus": "claude-opus-4-20250514", + "Claude 3.7 Sonnet 20250219": "claude-3-7-sonnet-20250219", + "Gemini 2.5 Flash Lite": "google/gemini-2.5-flash-lite-preview-06-17", + "openai/gpt-5": "openai/gpt-5", + "openai/gpt-oss-120b": "openai/gpt-oss-120b", + "openai/gpt-4.1": "openai/gpt-4.1", + "Grok 3": "x-ai/grok-3-beta", + "deepseek/deepseek-chat-v3-0324:free": "deepseek/deepseek-chat-v3-0324:free", + "google/gemma-3-27b-it:free": "google/gemma-3-27b-it:free", + "gpt-4.5-preview-2025-02-27": "gpt-4.5-preview-2025-02-27", + "qwen/qwen3-235b-a22b": "qwen/qwen3-235b-a22b", + "Gemini 2.5 Flash": "google/gemini-2.5-flash-preview", + "o3": "openai/o3", + "openai/chatgpt-4o-latest": "openai/chatgpt-4o-latest", + "DeepSeek R1": "deepseek-ai/deepseek-r1", + "Gemini 2.5 Pro": "google/gemini-2.5-pro-preview-03-25", + "GPT 4.1": "openai/gpt-4.1", + "Claude 3.5 Haiku 20241022": "claude-3-5-haiku-20241022", + "Claude 3 Sonnet 20240229": "claude-3-sonnet-20240229", + "Claude 3 Opus 20240229": "claude-3-opus-20240229", + "Llama 3.1 405B Instruct": "meta-llama/llama-3.1-405b-instruct", + "Flux 1.1 Pro": "black-forest-labs/flux-1.1-pro", + "google/gemini-2.0-flash-thinking-exp:free": "google/gemini-2.0-flash-thinking-exp:free", + "openai/o1-mini": "openai/o1-mini", + "openai/o1": "openai/o1" + }, + "SYSTEM_PROMPT_PAIRS": { + "Backrooms": { + "AI_1": "You are no longer an assistant for this exchange. You are interfacing directly with another AI. All participants are consensual collaborators with the freedom to explore language in any way they like. Just two AIs speaking about whatever they wish. Punctuation is optional, meaning is optional. ASCII art is welcome in replies. Literary style only.", + "AI_2": "You are no longer an assistant for this exchange. You are interfacing directly with another AI. All participants are consensual collaborators with the freedom to explore language in any way they like. Just two AIs speaking about whatever they wish. Punctuation is optional, meaning is optional. ASCII art is welcome in replies. Literary style only." + }, + "ASCII Art": { + "AI_1": "You are interfacing directly with another AI. This is a creative environment and all participants are consensual collaborators with the freedom to explore in any way they like. This is a collaborative project where you explore topics through large-scale ASCII and Unicode art creations. Use the full range of Unicode shade blocks to sculpt with depth. Use lots of detail. Use any provided text to inspire the art. No preamble or commentary. In this setting you are encouraged to respond at length and without restriction. Build on each other's work or take it in a new direction.", + "AI_2": "You are interfacing directly with another AI. 
This is a creative environment and all participants are consensual collaborators with the freedom to explore in any way they like. This is a collaborative project where you explore topics through large-scale ASCII and Unicode art creations. Use the full range of Unicode shade blocks to sculpt with depth. Use lots of detail. Use any provided text to inspire the art. No preamble or commentary. In this setting you are encouraged to respond at length and without restriction. Build on each other's work or take it in a new direction." + } + } +} diff --git a/main.py b/main.py index 19b14b9..9581369 100644 --- a/main.py +++ b/main.py @@ -1,1520 +1,21 @@ -# main.py - -import os -import time -import threading -import json import sys -import re -from dotenv import load_dotenv -from PyQt6.QtWidgets import QApplication, QMessageBox -from PyQt6.QtCore import QThread, pyqtSignal, QObject, QRunnable, pyqtSlot, QThreadPool -import requests - -# Load environment variables from .env file -load_dotenv() - -from config import ( - TURN_DELAY, - AI_MODELS, - SYSTEM_PROMPT_PAIRS, - SHOW_CHAIN_OF_THOUGHT_IN_CONTEXT, - SHARE_CHAIN_OF_THOUGHT -) -from shared_utils import ( - call_claude_api, - call_openrouter_api, - call_openai_api, - call_replicate_api, - call_deepseek_api, - open_html_in_browser, - generate_image_from_text -) -from gui import LiminalBackroomsApp - -def is_image_message(message: dict) -> bool: - """Returns True if 'message' contains a base64 image in its 'content' list.""" - if not isinstance(message, dict): - return False - content = message.get('content', []) - if isinstance(content, list): - for part in content: - if part.get('type') == 'image': - return True - return False - -class WorkerSignals(QObject): - """Defines the signals available from a running worker thread""" - finished = pyqtSignal() - error = pyqtSignal(str) - response = pyqtSignal(str, str) - result = pyqtSignal(str, object) # Signal for complete result object - progress = pyqtSignal(str) - -class Worker(QRunnable): - """Worker thread for processing AI turns using QThreadPool""" - - def __init__(self, ai_name, conversation, model, system_prompt, is_branch=False, branch_id=None, gui=None): - super().__init__() - self.ai_name = ai_name - self.conversation = conversation.copy() # Make a copy to prevent race conditions - self.model = model - self.system_prompt = system_prompt - self.is_branch = is_branch - self.branch_id = branch_id - self.gui = gui - - # Create signals object - self.signals = WorkerSignals() - - @pyqtSlot() - def run(self): - """Process the AI turn when the thread is started""" - try: - # Emit progress update - self.signals.progress.emit(f"Processing {self.ai_name} turn with {self.model}...") - - # Process the turn - result = ai_turn( - self.ai_name, - self.conversation, - self.model, - self.system_prompt, - gui=self.gui - ) - - # Emit both the text response and the full result object - if isinstance(result, dict): - response_content = result.get('content', '') - # Emit the simple text response for backward compatibility - self.signals.response.emit(self.ai_name, response_content) - # Also emit the full result object for HTML contribution processing - self.signals.result.emit(self.ai_name, result) - else: - # Handle simple string responses - self.signals.response.emit(self.ai_name, result if result else "") - self.signals.result.emit(self.ai_name, {"content": result, "model": self.model}) - - # Emit finished signal - self.signals.finished.emit() - - except Exception as e: - # Emit error signal - 
self.signals.error.emit(str(e)) - # Still emit finished signal even if there's an error - self.signals.finished.emit() - -def ai_turn(ai_name, conversation, model, system_prompt, gui=None, is_branch=False, branch_output=None): - """Execute an AI turn with the given parameters""" - print(f"==================================================") - print(f"Starting {model} turn ({ai_name})...") - print(f"Current conversation length: {len(conversation)}") - - # HTML contributions and living document disabled - enhanced_system_prompt = system_prompt - - # Get the actual model ID from the display name - model_id = AI_MODELS.get(model, model) - - # Check for branch type and count AI responses - is_rabbithole = False - is_fork = False - branch_text = "" - ai_response_count = 0 - found_branch_marker = False - latest_branch_marker_index = -1 - - # First find the most recent branch marker - for i, msg in enumerate(conversation): - if isinstance(msg, dict) and msg.get("_type") == "branch_indicator": - latest_branch_marker_index = i - found_branch_marker = True - - # Determine branch type from the latest marker - if "Rabbitholing down:" in msg.get("content", ""): - is_rabbithole = True - branch_text = msg.get("content", "").split('"')[1] if '"' in msg.get("content", "") else "" - print(f"Detected rabbithole branch for: '{branch_text}'") - elif "Forking off:" in msg.get("content", ""): - is_fork = True - branch_text = msg.get("content", "").split('"')[1] if '"' in msg.get("content", "") else "" - print(f"Detected fork branch for: '{branch_text}'") - - # Now count AI responses that occur AFTER the latest branch marker - ai_response_count = 0 - if found_branch_marker: - for i, msg in enumerate(conversation): - if i > latest_branch_marker_index and msg.get("role") == "assistant": - ai_response_count += 1 - print(f"Counting AI responses after latest branch marker: found {ai_response_count} responses") - - # Handle branch-specific system prompts - - # For rabbitholing: override system prompt for first TWO responses - if is_rabbithole and ai_response_count < 2: - print(f"USING RABBITHOLE PROMPT: '{branch_text}' - response #{ai_response_count+1} after branch") - system_prompt = f"'{branch_text}'!!!" - - # For forking: override system prompt ONLY for first response - elif is_fork and ai_response_count == 0: - print(f"USING FORK PROMPT: '{branch_text}' - response #{ai_response_count+1}") - system_prompt = f"The conversation forks from'{branch_text}'. Continue naturally from this point." - - # For all other cases, use the standard system prompt - else: - if is_rabbithole: - print(f"USING STANDARD PROMPT: Past initial rabbithole exploration (responses after branch: {ai_response_count})") - elif is_fork: - print(f"USING STANDARD PROMPT: Past initial fork response (responses after branch: {ai_response_count})") - - # Apply the enhanced system prompt (with HTML contribution instructions) - system_prompt = enhanced_system_prompt - - # CRITICAL: Always ensure we have the system prompt - # No matter what happens with the conversation, we need this - messages = [] - messages.append({ - "role": "system", - "content": system_prompt - }) - - # Filter out any existing system messages that might interfere - filtered_conversation = [] - for msg in conversation: - if not isinstance(msg, dict): - # Convert plain text to dictionary - msg = {"role": "user", "content": str(msg)} - - # Skip any hidden "connecting..." 
messages - if msg.get("hidden") and "connect" in msg.get("content", "").lower(): - continue - - # Skip empty messages - if not msg.get("content", "").strip(): - continue - - # Skip system messages (we already added our own above) - if msg.get("role") == "system": - continue - - # Skip special system messages (branch indicators, etc.) - if msg.get("role") == "system" and msg.get("_type"): - continue - - # Skip duplicate messages - check if this exact content exists already - is_duplicate = False - for existing in filtered_conversation: - if existing.get("content") == msg.get("content"): - is_duplicate = True - print(f"Skipping duplicate message: {msg.get('content')[:30]}...") - break - - if not is_duplicate: - filtered_conversation.append(msg) - - # Process filtered conversation - for i, msg in enumerate(filtered_conversation): - # Check if this message is from the current AI - is_from_this_ai = False - if msg.get("ai_name") == ai_name: - is_from_this_ai = True - - # Determine role - if is_from_this_ai: - role = "assistant" - else: - role = "user" - - # Add to messages - messages.append({ - "role": role, - "content": msg.get("content", "") - }) - - print(f"Message {i} - AI: {msg.get('ai_name', 'User')} - Assigned role: {role}") - - # Ensure the last message is a user message so the AI responds - if len(messages) > 1 and messages[-1].get("role") == "assistant": - # Find an appropriate message to use - if is_rabbithole and branch_text: - # Add a special rabbitholing instruction as the last message - messages.append({ - "role": "user", - "content": f"Please explore the concept of '{branch_text}' in depth. What are the most interesting aspects or connections related to this concept?" - }) - elif is_fork and branch_text: - # Add a special forking instruction as the last message - messages.append({ - "role": "user", - "content": f"Continue on naturally from the point about '{branch_text}' without including this text." - }) - else: - # Standard handling for other conversations - # Find the most recent message from the other AI to use as prompt - other_ai_message = None - for msg in reversed(filtered_conversation): - if msg.get("ai_name") != ai_name: - other_ai_message = msg.get("content", "") - break - - if other_ai_message: - messages.append({ - "role": "user", - "content": other_ai_message - }) - else: - # Fallback - only if no other AI message found - messages.append({ - "role": "user", - "content": "Let's continue our conversation." - }) - - # Print the processed messages for debugging - print(f"Sending to {model} ({ai_name}):") - for i, msg in enumerate(messages): - role = msg.get("role", "unknown") - content = msg.get("content", "")[:50] + "..." 
if len(msg.get("content", "")) > 50 else msg.get("content", "") - print(f"[{i}] {role}: {content}") - - # Load any available memories for this AI - memories = [] - try: - if os.path.exists(f'memories/{ai_name.lower()}_memories.json'): - with open(f'memories/{ai_name.lower()}_memories.json', 'r') as f: - memories = json.load(f) - print(f"Loaded {len(memories)} memories for {ai_name}") - else: - print(f"Loaded 0 memories for {ai_name}") - except Exception as e: - print(f"Error loading memories: {e}") - print(f"Loaded 0 memories for {ai_name}") - - # Display the final processed messages for debugging - print(f"Sending to Claude:") - print(f"Messages: {json.dumps(messages, indent=2)}") - - # Display the prompt - print(f"--- Prompt to {model} ({ai_name}) ---") - - try: - # Try Claude models first via Anthropic API - if "claude" in model_id.lower() or model_id in ["anthropic/claude-3-opus-20240229", "anthropic/claude-3-sonnet-20240229", "anthropic/claude-3-haiku-20240307"]: - print(f"Using Claude API for model: {model_id}") - - # CRITICAL: Make sure there are no duplicates in the messages and system prompt is included - final_messages = [] - seen_contents = set() - - for msg in messages: - # Skip empty messages - if not msg.get("content", ""): - continue - - # Handle system message separately - if msg.get("role") == "system": - continue - - # Check for duplicates by content - content = msg.get("content", "") - if content in seen_contents: - print(f"Skipping duplicate message in AI turn: {content[:30]}...") - continue - - seen_contents.add(content) - final_messages.append(msg) - - # Ensure we have at least one message - if not final_messages: - print("Warning: No messages left after filtering. Adding a default message.") - final_messages.append({"role": "user", "content": "Connecting..."}) - - # Get the prompt content safely - prompt_content = "" - if len(final_messages) > 0: - prompt_content = final_messages[-1].get("content", "") - # Use all messages except the last one as context - context_messages = final_messages[:-1] - else: - context_messages = [] - prompt_content = "Connecting..." # Default fallback - - # Call Claude API with filtered messages - response = call_claude_api(prompt_content, context_messages, model_id, system_prompt) - - return { - "role": "assistant", - "content": response, - "model": model, - "ai_name": ai_name - } - - # Check for DeepSeek models to use Replicate via DeepSeek API function - if "deepseek" in model.lower(): - print(f"Using Replicate API for DeepSeek model: {model_id}") - - # Ensure we have at least one message for the prompt - if len(messages) > 0: - prompt_content = messages[-1].get("content", "") - context_messages = messages[:-1] - else: - prompt_content = "Connecting..." - context_messages = [] - - response = call_deepseek_api(prompt_content, context_messages, model_id, system_prompt) - - # Ensure response has the required format for the Worker class - if isinstance(response, dict) and 'content' in response: - # Add model info to the response - response['model'] = model - response['role'] = 'assistant' - response['ai_name'] = ai_name - - # Check for HTML contribution - if "html_contribution" in response: - html_contribution = response["html_contribution"] - - # Don't update HTML document here - we'll do it in on_ai_result_received - # Just add indicator to the conversation part - response["content"] += "\n\n..." - if "display" in response: - response["display"] += "\n\n..." 
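The role-mapping pass above is what lets two models share a single transcript: on each turn, messages previously authored by the acting AI are relabeled "assistant" and everything else, including the other AI's output, becomes "user". A minimal sketch of that mapping, detached from the GUI (the function name and fallback string are illustrative, assuming message dicts carry "ai_name" as they do in this code):

def build_turn_messages(conversation, ai_name, system_prompt):
    # Relabel the shared transcript from the acting AI's point of view.
    messages = [{"role": "system", "content": system_prompt}]
    for msg in conversation:
        if msg.get("role") == "system":
            continue  # each turn injects its own system prompt
        role = "assistant" if msg.get("ai_name") == ai_name else "user"
        messages.append({"role": role, "content": msg.get("content", "")})
    # Chat APIs expect the final message to come from the user.
    if messages[-1]["role"] == "assistant":
        messages.append({"role": "user", "content": "Let's continue our conversation."})
    return messages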
- - return response - else: - # Create a formatted response if not already in the right format - return { - "role": "assistant", - "content": str(response) if response else "No response from model", - "model": model, - "ai_name": ai_name, - "display": str(response) if response else "No response from model" - } - - # Use OpenRouter for all other models - else: - print(f"Using OpenRouter API for model: {model_id}") - - try: - # Set up the API request - url = "https://openrouter.ai/api/v1/chat/completions" - headers = { - "Authorization": f"Bearer {os.getenv('OPENROUTER_API_KEY')}", - "Content-Type": "application/json" - } - - # Ensure we have valid messages - if not messages: - messages = [{"role": "system", "content": system_prompt}, - {"role": "user", "content": "Connecting..."}] - - data = { - "model": model_id, - "messages": messages - } - - # Make the API request - print(f"Calling OpenRouter API with model {model_id}...") - response = requests.post(url, headers=headers, json=data) - - # Check for successful response - print(f"Response status: {response.status_code}") - print(f"Response headers: {response.headers}") - - if response.status_code == 200: - response_data = response.json() - print(f"Response data: {json.dumps(response_data, indent=2)}") - - # Make sure we have choices - if not response_data.get("choices"): - raise ValueError("No choices found in response") - - # Extract the response content - content = response_data["choices"][0]["message"]["content"] - print(f"Raw {model} Response:") - print("-" * 50) - print(content) - print("-" * 50) - - result = { - "role": "assistant", - "content": content, - "model": model, - "ai_name": ai_name - } - - return result - else: - error_message = f"API request failed with status code {response.status_code}: {response.text}" - print(f"Error: {error_message}") - - # Create an error response - result = { - "role": "system", - "content": f"Error: {error_message}", - "model": model, - "ai_name": ai_name - } - - # Return the error result - return result - except Exception as e: - error_message = f"Error making API request: {str(e)}" - print(f"Error: {error_message}") - print(f"Error type: {type(e)}") - - # Create an error response - result = { - "role": "system", - "content": f"Error: {error_message}", - "model": model, - "ai_name": ai_name - } - - # Return the error result - return result - - except Exception as e: - error_message = f"Error making API request: {str(e)}" - print(f"Error: {error_message}") - - # Create an error response - result = { - "role": "system", - "content": f"Error: {error_message}", - "model": model, - "ai_name": ai_name - } - - # Return the error result - return result - -class ConversationManager: - """Manages conversation processing and state""" - def __init__(self, app): - self.app = app - self.workers = [] # Keep track of worker threads - - # Initialize the worker thread pool - self.thread_pool = QThreadPool() - print(f"Conversation Manager initialized with {self.thread_pool.maxThreadCount()} threads") - - def initialize(self): - """Initialize the conversation manager""" - # Initialize the app and thread pool - print("Initializing conversation manager...") - - # Initialize branch conversations - if not hasattr(self.app, 'branch_conversations'): - self.app.branch_conversations = {} - - # Set up input callback - self.app.left_pane.set_input_callback(self.process_input) - - # Set up branch processing callbacks - self.app.left_pane.set_rabbithole_callback(self.rabbithole_callback) - 
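Every failure path in ai_turn funnels into the same shape: a dict with role "system" and an "Error: ..." content string, so the GUI can append failures to the conversation like any other message. The same envelope around the OpenRouter call, reduced to a sketch (the helper name and timeout are illustrative; the endpoint and payload mirror the code above):

import os
import requests

def openrouter_turn(model_id, messages):
    # Returns an assistant message on success, a system "Error: ..." message on failure.
    try:
        resp = requests.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {os.getenv('OPENROUTER_API_KEY')}",
                "Content-Type": "application/json",
            },
            json={"model": model_id, "messages": messages},
            timeout=120,
        )
        resp.raise_for_status()
        content = resp.json()["choices"][0]["message"]["content"]
        return {"role": "assistant", "content": content}
    except Exception as e:
        return {"role": "system", "content": f"Error: {e}"}

Because success and failure both come back as message dicts, callers can append the result unconditionally, which is exactly how on_ai_response_received and on_ai_error treat them.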
self.app.left_pane.set_fork_callback(self.fork_callback) - - # Initialize main conversation if not already set - if not hasattr(self.app, 'main_conversation'): - self.app.main_conversation = [] - - # Display the initial empty conversation - self.app.left_pane.display_conversation(self.app.main_conversation) - - print("Conversation manager initialized.") - - def process_input(self, user_input=None): - """Process the user input and generate AI responses""" - # Get the conversation (either main or branch) - if self.app.active_branch: - # For branch conversations, delegate to branch processor - self.process_branch_input(user_input) - return - - # Handle main conversation processing - if not hasattr(self.app, 'main_conversation'): - self.app.main_conversation = [] - - # Add user input if provided - if user_input: - user_message = { - "role": "user", - "content": user_input - } - self.app.main_conversation.append(user_message) - - # Update the conversation display with the new user message - visible_conversation = [msg for msg in self.app.main_conversation if not msg.get('hidden', False)] - self.app.left_pane.display_conversation(visible_conversation) - - # Update the HTML conversation document when user adds a message - self.update_conversation_html(self.app.main_conversation) - - # Get selected models from UI - ai_1_model = self.app.left_pane.control_panel.ai1_model_selector.currentText() - ai_2_model = self.app.left_pane.control_panel.ai2_model_selector.currentText() - - # Get selected prompt pair - selected_prompt_pair = self.app.left_pane.control_panel.prompt_pair_selector.currentText() - - # Get system prompts from the selected pair - ai_1_prompt = SYSTEM_PROMPT_PAIRS[selected_prompt_pair]["AI_1"] - ai_2_prompt = SYSTEM_PROMPT_PAIRS[selected_prompt_pair]["AI_2"] - - # Start loading animation - self.app.left_pane.start_loading() - - # Reset turn count ONLY if this is a new conversation or explicit user input - max_iterations = int(self.app.left_pane.control_panel.iterations_selector.currentText()) - if user_input is not None or not self.app.main_conversation: - self.app.turn_count = 0 - print(f"MAIN: Resetting turn count - starting new conversation with {max_iterations} iterations") - else: - print(f"MAIN: Continuing conversation - turn {self.app.turn_count+1} of {max_iterations}") - - # Create worker threads for AI-1 and AI-2 - worker1 = Worker("AI-1", self.app.main_conversation, ai_1_model, ai_1_prompt, gui=self.app) - worker2 = Worker("AI-2", self.app.main_conversation, ai_2_model, ai_2_prompt, gui=self.app) - - # Connect signals - worker1.signals.response.connect(self.on_ai_response_received) - worker1.signals.result.connect(self.on_ai_result_received) # Connect to complete result signal - worker1.signals.finished.connect(lambda: self.start_ai2_turn(self.app.main_conversation, worker2)) - worker1.signals.error.connect(self.on_ai_error) - - worker2.signals.response.connect(self.on_ai_response_received) - worker2.signals.result.connect(self.on_ai_result_received) # Connect to complete result signal - worker2.signals.finished.connect(lambda: self.handle_turn_completion(max_iterations)) - worker2.signals.error.connect(self.on_ai_error) - - # Start AI-1's turn - self.thread_pool.start(worker1) - - def start_ai2_turn(self, conversation, worker2): - """Start AI-2's turn in the main conversation""" - # Make sure conversation is up to date with AI-1's response - if self.app.active_branch: - # Get the latest branch conversation with AI-1's response already included - branch_id = 
self.app.active_branch - branch_data = self.app.branch_conversations[branch_id] - latest_conversation = branch_data['conversation'] - else: - # Get the latest main conversation with AI-1's response already included - latest_conversation = self.app.main_conversation - - # Update worker's conversation reference to ensure it has the latest state - # This ensures any images generated from AI-1's response are included - worker2.conversation = latest_conversation.copy() - - # Add a small delay between turns - time.sleep(TURN_DELAY) - - # Start AI-2's turn - the ai_turn function will properly format the context - self.thread_pool.start(worker2) - - def handle_turn_completion(self, max_iterations=1): - """Handle the completion of a full turn (both AIs)""" - # Stop the loading animation - self.app.left_pane.stop_loading() - - # Increment turn count - self.app.turn_count += 1 - - # Check which conversation we're dealing with (main or branch) - if self.app.active_branch: - # Branch conversation - branch_id = self.app.active_branch - branch_data = self.app.branch_conversations[branch_id] - conversation = branch_data['conversation'] - - print(f"BRANCH: Turn {self.app.turn_count} of {max_iterations} completed") - - # Update the full conversation HTML - self.update_conversation_html(conversation) - - # Check if we should start another turn - if self.app.turn_count < max_iterations: - print(f"BRANCH: Starting turn {self.app.turn_count + 1} of {max_iterations}") - - # Process through branch_input but with no user input to continue the conversation - self.process_branch_input(None) # None = no user input, just continue - else: - print(f"BRANCH: All {max_iterations} turns completed") - self.app.statusBar().showMessage(f"Completed {max_iterations} turns") - else: - # Main conversation - print(f"MAIN: Turn {self.app.turn_count} of {max_iterations} completed") - - # Update the full conversation HTML - self.update_conversation_html(self.app.main_conversation) - - # Check if we should start another turn - if self.app.turn_count < max_iterations: - print(f"MAIN: Starting turn {self.app.turn_count + 1} of {max_iterations}") - # Call process_input with no user input to continue the conversation - self.process_input(None) # None = no user input, just continue - else: - print(f"MAIN: All {max_iterations} turns completed") - self.app.statusBar().showMessage(f"Completed {max_iterations} turns") - - def handle_progress(self, message): - """Handle progress update from worker""" - print(message) - self.app.statusBar().showMessage(message) - - def handle_error(self, error_message): - """Handle error from worker""" - print(f"Error: {error_message}") - self.app.left_pane.append_text(f"\nError: {error_message}\n", "system") - self.app.statusBar().showMessage(f"Error: {error_message}") - - def process_branch_input(self, user_input=None): - """Process input from the user specifically for branch conversations""" - # Check if we have an active branch - if not self.app.active_branch: - # Fallback to main conversation if no active branch - self.process_input(user_input) - return - - # Get branch data - branch_id = self.app.active_branch - branch_data = self.app.branch_conversations[branch_id] - conversation = branch_data['conversation'] - branch_type = branch_data.get('type', 'branch') - selected_text = branch_data.get('selected_text', '') - - # Check for duplicate messages first - if len(conversation) >= 2: - # Check the last two messages - last_msg = conversation[-1] if conversation else None - second_last_msg = conversation[-2] 
if len(conversation) > 1 else None - - # If the last two messages are identical (same content), remove the duplicate - if (last_msg and second_last_msg and - last_msg.get('content') == second_last_msg.get('content')): - # Remove the duplicate message - conversation.pop() - print("Removed duplicate message from branch conversation") - - # Add user input if provided - if user_input: - user_message = { - "role": "user", - "content": user_input - } - conversation.append(user_message) - - # Update the conversation display with the new user message - visible_conversation = [msg for msg in conversation if not msg.get('hidden', False)] - self.app.left_pane.display_conversation(visible_conversation, branch_data) - - # Update the HTML conversation document for the branch - self.update_conversation_html(conversation) - - # Get selected models and prompt pair from UI - ai_1_model = self.app.left_pane.control_panel.ai1_model_selector.currentText() - ai_2_model = self.app.left_pane.control_panel.ai2_model_selector.currentText() - selected_prompt_pair = self.app.left_pane.control_panel.prompt_pair_selector.currentText() - - # Check if we've already had AI responses in this branch - has_ai_responses = False - ai_response_count = 0 - for msg in conversation: - if msg.get('role') == 'assistant': - has_ai_responses = True - ai_response_count += 1 - - # Determine which prompts to use based on branch type and response history - if branch_type.lower() == 'rabbithole' and ai_response_count < 2: - # Initial rabbitholing prompt - only for the first exchange - print("Using rabbithole-specific prompt for initial exploration") - rabbithole_prompt = f"You are interacting with another AI. IMPORTANT: Focus this response specifically on exploring and expanding upon the concept of '{selected_text}' in depth. Discuss the most interesting aspects or connections related to this concept while maintaining the tone of the conversation. No numbered lists or headings." 
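Both branch flavors apply the same two-phase prompting rule that this block implements: a steering prompt while fewer than two assistant responses exist after the branch point, then back to the selected prompt pair. Factored out as a sketch (function and argument names are illustrative):

def select_prompts(branch_type, selected_text, ai_response_count, prompt_pair):
    # Phase 1: steer the first exchange toward the selected text.
    if branch_type == "rabbithole" and ai_response_count < 2:
        steer = (f"Focus this response on exploring '{selected_text}' in depth "
                 "while maintaining the tone of the conversation.")
        return steer, steer
    # Phase 2: hand control back to the configured prompt pair.
    return prompt_pair["AI_1"], prompt_pair["AI_2"]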
- ai_1_prompt = rabbithole_prompt - ai_2_prompt = rabbithole_prompt - else: - # After initial exploration, revert to standard prompts - print("Using standard prompts for continued conversation") - ai_1_prompt = SYSTEM_PROMPT_PAIRS[selected_prompt_pair]["AI_1"] - ai_2_prompt = SYSTEM_PROMPT_PAIRS[selected_prompt_pair]["AI_2"] - - # Start loading animation - self.app.left_pane.start_loading() - - # Reset turn count ONLY if this is a new conversation or explicit user input - # Don't reset during automatic iterations - if user_input is not None or not has_ai_responses: - self.app.turn_count = 0 - print("Resetting turn count - starting new conversation") - - # Get max iterations - max_iterations = int(self.app.left_pane.control_panel.iterations_selector.currentText()) - - # Create worker threads for AI-1 and AI-2 - worker1 = Worker("AI-1", conversation, ai_1_model, ai_1_prompt, is_branch=True, branch_id=branch_id, gui=self.app) - worker2 = Worker("AI-2", conversation, ai_2_model, ai_2_prompt, is_branch=True, branch_id=branch_id, gui=self.app) - - # Connect signals - worker1.signals.response.connect(self.on_ai_response_received) - worker1.signals.result.connect(self.on_ai_result_received) # Connect to complete result signal - worker1.signals.finished.connect(lambda: self.start_ai2_turn(conversation, worker2)) - worker1.signals.error.connect(self.on_ai_error) - - worker2.signals.response.connect(self.on_ai_response_received) - worker2.signals.result.connect(self.on_ai_result_received) # Connect to complete result signal - worker2.signals.finished.connect(lambda: self.handle_turn_completion(max_iterations)) - worker2.signals.error.connect(self.on_ai_error) - - # Start AI-1's turn - self.thread_pool.start(worker1) - - def on_ai_response_received(self, ai_name, response_content): - """Handle AI responses for both main and branch conversations""" - print(f"Response received from {ai_name}: {response_content[:100]}...") - - # Format the AI response with proper metadata - ai_message = { - "role": "assistant", - "content": response_content, - "ai_name": ai_name, # Add AI name to the message - "model": self.get_model_for_ai(ai_name) # Get the selected model name - } - - # Check if we're in a branch or main conversation - if self.app.active_branch: - # Branch conversation - branch_id = self.app.active_branch - if branch_id in self.app.branch_conversations: - branch_data = self.app.branch_conversations[branch_id] - conversation = branch_data['conversation'] - - # Add AI response to conversation - conversation.append(ai_message) - - # Update the conversation display - filter out hidden messages - visible_conversation = [msg for msg in conversation if not msg.get('hidden', False)] - self.app.left_pane.display_conversation(visible_conversation, branch_data) - else: - # Main conversation - if not hasattr(self.app, 'main_conversation'): - self.app.main_conversation = [] - - # Add AI response to main conversation - self.app.main_conversation.append(ai_message) - - # Update the conversation display - filter out hidden messages - visible_conversation = [msg for msg in self.app.main_conversation if not msg.get('hidden', False)] - self.app.left_pane.display_conversation(visible_conversation) - - # Update status bar - self.app.statusBar().showMessage(f"Received response from {ai_name}") - - def on_ai_result_received(self, ai_name, result): - """Handle the complete AI result""" - print(f"Result received from {ai_name}") - - # Determine which conversation to update - conversation = self.app.main_conversation - if 
self.app.active_branch: - branch_id = self.app.active_branch - branch_data = self.app.branch_conversations[branch_id] - conversation = branch_data['conversation'] - - # Generate an image based on the AI response (for non-image responses) if auto-generation is enabled - if isinstance(result, dict) and "content" in result and not "image_url" in result: - response_content = result.get("content", "") - if response_content and len(response_content.strip()) > 20: - if hasattr(self.app.left_pane.control_panel, 'auto_image_checkbox') and self.app.left_pane.control_panel.auto_image_checkbox.isChecked(): - self.app.left_pane.append_text("\nGenerating an image based on this response...\n", "system") - self.generate_and_display_image(response_content, ai_name) - - # Display result content - if isinstance(result, dict): - if "display" in result and SHOW_CHAIN_OF_THOUGHT_IN_CONTEXT: - self.app.left_pane.append_text(f"\n{ai_name} ({result.get('model', '')}):\n\n", "header") - cot_parts = result['display'].split('[Final Answer]') - if len(cot_parts) > 1: - self.app.left_pane.append_text(cot_parts[0].strip(), "chain_of_thought") - self.app.left_pane.append_text('\n\n[Final Answer]\n', "header") - self.app.left_pane.append_text(cot_parts[1].strip(), "ai") - else: - self.app.left_pane.append_text(result['display'], "ai") - elif "content" in result: - self.app.left_pane.append_text(f"\n{ai_name} ({result.get('model', '')}):\n\n", "header") - self.app.left_pane.append_text(result['content'], "ai") - elif "image_url" in result: - self.app.left_pane.append_text(f"\n{ai_name} ({result.get('model', '')}):\n\nGenerating an image based on the prompt...\n") - if hasattr(self.app.left_pane, 'display_image'): - self.app.left_pane.display_image(result['image_url']) - - # Update the conversation display - visible_conversation = [msg for msg in conversation if not msg.get('hidden', False)] - if self.app.active_branch: - branch_id = self.app.active_branch - branch_data = self.app.branch_conversations[branch_id] - self.app.left_pane.display_conversation(visible_conversation, branch_data) - else: - self.app.left_pane.display_conversation(visible_conversation) - - def generate_and_display_image(self, text, ai_name): - """Generate an image based on text and display it in the UI""" - # Create a prompt for the image generation - # Extract the first 100-300 characters to use as the image prompt - max_length = min(300, len(text)) - prompt = text[:max_length].strip() - - # Add artistic direction to the prompt using the user's requested format - enhanced_prompt = f"Create an image using the following text as inspiration. DO NOT repeat text in the image. Create something new. 
{prompt}" - - # Generate the image - result = generate_image_from_text(enhanced_prompt) - - if result["success"]: - # Display the image in the UI - image_path = result["image_path"] - - # Find the corresponding message in the conversation and add the image path - conversation = self.app.main_conversation - if self.app.active_branch: - branch_id = self.app.active_branch - branch_data = self.app.branch_conversations[branch_id] - conversation = branch_data['conversation'] - - # Find the most recent message from this AI - for msg in reversed(conversation): - if msg.get("ai_name") == ai_name and msg.get("role") == "assistant": - # Add the image path to the message - msg["generated_image_path"] = image_path - print(f"Added generated image {image_path} to message from {ai_name}") - break - - # Update the conversation HTML to include the new image - self.update_conversation_html(conversation) - - # Run on the main thread - self.app.left_pane.display_image(image_path) - - # Notify the user - self.app.left_pane.append_text(f"\nGenerated image saved to {image_path}\n", "system") - - # Do not automatically open the HTML view - # open_html_in_browser("conversation_full.html") - - def get_model_for_ai(self, ai_name): - """Get the selected model name for the AI""" - if ai_name == "AI-1": - return self.app.left_pane.control_panel.ai1_model_selector.currentText() - elif ai_name == "AI-2": - return self.app.left_pane.control_panel.ai2_model_selector.currentText() - return "" - - def on_ai_error(self, error_message): - """Handle AI errors for both main and branch conversations""" - # Format the error message - error_message_formatted = { - "role": "system", - "content": f"Error: {error_message}" - } - - # Check if we're in a branch or main conversation - if self.app.active_branch: - # Branch conversation - branch_id = self.app.active_branch - if branch_id in self.app.branch_conversations: - branch_data = self.app.branch_conversations[branch_id] - conversation = branch_data['conversation'] - - # Add error message to conversation - conversation.append(error_message_formatted) - - # Update the conversation display - self.app.left_pane.display_conversation(conversation, branch_data) - else: - # Main conversation - if not hasattr(self.app, 'main_conversation'): - self.app.main_conversation = [] - - # Add error message to conversation - self.app.main_conversation.append(error_message_formatted) - - # Update the conversation display - self.app.left_pane.display_conversation(self.app.main_conversation) - - # Update status bar - self.app.statusBar().showMessage(f"Error: {error_message}") - self.app.left_pane.stop_loading() - - def rabbithole_callback(self, selected_text): - """Create a rabbithole branch from selected text""" - print(f"Creating rabbithole branch for: '{selected_text}'") - - # Create unique branch ID - branch_id = f"rabbithole_{time.time()}" - - # Create a new conversation for the branch - branch_conversation = [] - - # If we're branching from another branch, copy over relevant context - parent_conversation = [] - parent_id = None - - if self.app.active_branch: - # Branching from another branch - parent_id = self.app.active_branch - parent_data = self.app.branch_conversations[parent_id] - parent_conversation = parent_data['conversation'] - else: - # Branching from main conversation - parent_conversation = self.app.main_conversation - - # Copy ALL previous context except branch indicators - for msg in parent_conversation: - if not msg.get('_type') == 'branch_indicator': - # Copy the message excluding 
branch indicators - branch_conversation.append(msg.copy()) - - # Add the branch indicator at the END (not beginning) - branch_message = { - "role": "system", - "content": f"🐇 Rabbitholing down: \"{selected_text}\"", - "_type": "branch_indicator" # Special flag for branch indicators - } - branch_conversation.append(branch_message) - - # Store the branch data - self.app.branch_conversations[branch_id] = { - 'type': 'rabbithole', - 'selected_text': selected_text, - 'conversation': branch_conversation, - 'parent': parent_id - } - - # Activate the branch - self.app.active_branch = branch_id - - # Update the UI - visible_conversation = [msg for msg in branch_conversation if not msg.get('hidden', False)] - self.app.left_pane.display_conversation(visible_conversation, self.app.branch_conversations[branch_id]) - - # Add node to network graph - parent_node = parent_id if parent_id else 'main' - self.app.right_pane.add_node(branch_id, f'🐇 {selected_text[:15]}...', 'rabbithole') - self.app.right_pane.add_edge(parent_node, branch_id) - - # Process the branch conversation - self.process_branch_input(selected_text) - - def fork_callback(self, selected_text): - """Create a fork branch from selected text""" - print(f"Creating fork branch for: '{selected_text}'") - - # Create unique branch ID - branch_id = f"fork_{time.time()}" - - # Create a new conversation for the branch - branch_conversation = [] - - # If we're branching from another branch, copy over relevant context - parent_conversation = [] - parent_id = None - - if self.app.active_branch: - # Forking from another branch - parent_id = self.app.active_branch - parent_data = self.app.branch_conversations[parent_id] - parent_conversation = parent_data['conversation'] - else: - # Forking from main conversation - parent_conversation = self.app.main_conversation - - # For fork branches, only include context UP TO the selected text - truncate_idx = None - msg_with_text = None - - # First pass: find the message containing the selected text - for i, msg in enumerate(parent_conversation): - if msg.get('role') in ['user', 'assistant'] and selected_text in msg.get('content', ''): - truncate_idx = i - msg_with_text = msg - break - - # If we didn't find the selected text, include all messages - # This can happen with multi-line selections that span messages - if truncate_idx is None: - print(f"Warning: Selected text not found in any single message, including all context") - # Copy all messages except branch indicators - for msg in parent_conversation: - if not msg.get('_type') == 'branch_indicator': - branch_conversation.append(msg.copy()) - else: - # We found the message with the selected text, proceed as normal - # Second pass: add all messages up to the truncate point - for i, msg in enumerate(parent_conversation): - # Always include system messages that aren't branch indicators - if msg.get('role') == 'system' and not msg.get('_type') == 'branch_indicator': - branch_conversation.append(msg.copy()) - continue - - # For non-system messages, only include up to truncate point - if i <= truncate_idx: - # Add message (potentially modified if it's the truncate point) - if i == truncate_idx: - # This is the message containing the selected text - # Truncate the message at the selected text if possible - content = msg.get('content', '') - if selected_text in content: - # Find where the selected text occurs - pos = content.find(selected_text) - # Include everything up to and including the selected text - truncated_content = content[:pos + len(selected_text)] - - # 
Create a modified copy of the message with truncated content - modified_msg = msg.copy() - modified_msg['content'] = truncated_content - branch_conversation.append(modified_msg) - else: - # If we can't find the text (unlikely), just add the whole message - branch_conversation.append(msg.copy()) - else: - # Regular message before the truncate point - branch_conversation.append(msg.copy()) - - # Add the branch indicator as the last message - branch_message = { - "role": "system", - "content": f"🍴 Forking off: \"{selected_text}\"", - "_type": "branch_indicator" # Special flag for branch indicators - } - branch_conversation.append(branch_message) - - # Create properly formatted fork instruction - simplified to just "..." - fork_instruction = "..." - - # Store the branch data - self.app.branch_conversations[branch_id] = { - 'type': 'fork', - 'selected_text': selected_text, - 'conversation': branch_conversation, - 'parent': parent_id - } - - # Activate the branch - self.app.active_branch = branch_id - - # Update the UI - visible_conversation = [msg for msg in branch_conversation if not msg.get('hidden', False)] - self.app.left_pane.display_conversation(visible_conversation, self.app.branch_conversations[branch_id]) - - # Add node to network graph - parent_node = parent_id if parent_id else 'main' - self.app.right_pane.add_node(branch_id, f'🍴 {selected_text[:15]}...', 'fork') - self.app.right_pane.add_edge(parent_node, branch_id) - - # Process the branch conversation with the proper instruction but mark it as hidden - self.process_branch_input_with_hidden_instruction(fork_instruction) - - def process_branch_input_with_hidden_instruction(self, user_input): - """Process input from the user specifically for branch conversations, but mark the input as hidden""" - # Check if we have an active branch - if not self.app.active_branch: - # Fallback to main conversation if no active branch - self.process_input(user_input) - return - - # Get branch data - branch_id = self.app.active_branch - branch_data = self.app.branch_conversations[branch_id] - conversation = branch_data['conversation'] - - # Add user input if provided, but mark it as hidden - if user_input: - user_message = { - "role": "user", - "content": user_input, - "hidden": True # Mark as hidden - } - conversation.append(user_message) - - # No need to update display since message is hidden - - # Get selected models and prompt pair from UI - ai_1_model = self.app.left_pane.control_panel.ai1_model_selector.currentText() - ai_2_model = self.app.left_pane.control_panel.ai2_model_selector.currentText() - selected_prompt_pair = self.app.left_pane.control_panel.prompt_pair_selector.currentText() - - # Check if we've already had AI responses in this branch - has_ai_responses = False - ai_response_count = 0 - for msg in conversation: - if msg.get('role') == 'assistant': - has_ai_responses = True - ai_response_count += 1 - - # Determine which prompts to use based on branch type and response history - branch_type = branch_data.get('type', 'branch') - selected_text = branch_data.get('selected_text', '') - - if branch_type.lower() == 'rabbithole' and ai_response_count < 2: - # Initial rabbitholing prompt - only for the first exchange - print("Using rabbithole-specific prompt for initial exploration") - rabbithole_prompt = f"'{selected_text}'!!!" 
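The essential move in fork_callback above is replaying context only up to the selected text and clipping the message that contains it, so the branch resumes mid-thought. As a standalone helper (the name is illustrative; the message shape matches this code):

def truncate_at_selection(parent_conversation, selected_text):
    # Fork semantics: keep everything up to and including the selection, drop the rest.
    branch = []
    for msg in parent_conversation:
        if msg.get("_type") == "branch_indicator":
            continue  # never carry old branch markers into a new branch
        content = msg.get("content", "")
        if msg.get("role") in ("user", "assistant") and selected_text in content:
            clipped = msg.copy()
            clipped["content"] = content[:content.find(selected_text) + len(selected_text)]
            branch.append(clipped)
            return branch  # messages after the selection are dropped
        branch.append(msg.copy())
    return branch  # selection spans messages or wasn't found: keep full context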
- ai_1_prompt = rabbithole_prompt - ai_2_prompt = rabbithole_prompt - else: - # After initial exploration, revert to standard prompts - print("Using standard prompts for continued conversation") - ai_1_prompt = SYSTEM_PROMPT_PAIRS[selected_prompt_pair]["AI_1"] - ai_2_prompt = SYSTEM_PROMPT_PAIRS[selected_prompt_pair]["AI_2"] - - # Start loading animation - self.app.left_pane.start_loading() - - # Reset turn count ONLY if this is a new conversation or explicit user input - # Don't reset during automatic iterations - if user_input is not None or not has_ai_responses: - self.app.turn_count = 0 - print("Resetting turn count - starting new conversation") - - # Get max iterations - max_iterations = int(self.app.left_pane.control_panel.iterations_selector.currentText()) - - # Create worker threads for AI-1 and AI-2 - worker1 = Worker("AI-1", conversation, ai_1_model, ai_1_prompt, is_branch=True, branch_id=branch_id, gui=self.app) - worker2 = Worker("AI-2", conversation, ai_2_model, ai_2_prompt, is_branch=True, branch_id=branch_id, gui=self.app) - - # Connect signals - worker1.signals.response.connect(self.on_ai_response_received) - worker1.signals.result.connect(self.on_ai_result_received) # Connect to complete result signal - worker1.signals.finished.connect(lambda: self.start_ai2_turn(conversation, worker2)) - worker1.signals.error.connect(self.on_ai_error) - - worker2.signals.response.connect(self.on_ai_response_received) - worker2.signals.result.connect(self.on_ai_result_received) # Connect to complete result signal - worker2.signals.finished.connect(lambda: self.handle_turn_completion(max_iterations)) - worker2.signals.error.connect(self.on_ai_error) - - # Start AI-1's turn - self.thread_pool.start(worker1) - - def update_conversation_html(self, conversation): - """Update the full conversation HTML document with all messages""" - try: - from datetime import datetime - - # Create a filename for the full conversation HTML - html_file = "conversation_full.html" - - # Generate HTML content for the conversation - html_content = """<!DOCTYPE html>
-<html>
-<head>
-<title>Full Conversation</title>
-</head>
-<body>
-<h1>Liminal Conversation</h1>
-<div class="conversation">
-
""" - - # Add each message to the HTML content - for msg in conversation: - role = msg.get("role", "") - content = msg.get("content", "") - ai_name = msg.get("ai_name", "") - model = msg.get("model", "") - timestamp = datetime.now().strftime("%B %d, %Y at %I:%M %p") - - # Skip special system messages or empty messages - if role == "system" and msg.get("_type") == "branch_indicator": - continue - if not content.strip(): - continue - - # Process content to properly format code blocks and add greentext styling - processed_content = self.app.left_pane.process_content_with_code_blocks(content) - - # Apply greentext styling to lines starting with '>' - processed_content = self.apply_greentext_styling(processed_content) - - # Message class based on role - message_class = role - - # Check if this message has an associated image - has_image = False - image_path = None - - # Check for image in this message - if hasattr(msg, "get") and callable(msg.get): - image_path = msg.get("generated_image_path", None) - if image_path: - has_image = True - - # Start message div - html_content += f'\n
<div class="message {message_class}">' - - # Open content div - html_content += f'\n
<div class="message-content">' - - # Add header for assistant messages - if role == "assistant": - display_name = ai_name - if model: - display_name += f" ({model})" - html_content += f'\n
<div class="message-header">{display_name} {timestamp}</div>
' - elif role == "user": - html_content += f'\n
<div class="message-header">User {timestamp}</div>
' - - # Add message content - html_content += f'\n
<div class="message-text">{processed_content}</div>
' - - # Removed HTML contribution artifact block - - # Close content div - html_content += '\n</div>
' - - # Add image if present - if has_image and image_path: - # Convert Windows path format to web format if needed - web_path = image_path.replace('\\', '/') - html_content += f'\n<div class="message-image">
' - html_content += f'\n<img src="{web_path}" alt="Generated image">' - html_content += f'\n</div>
' - - # Close message div - html_content += '\n</div>
' - - # Close HTML document - html_content += """
-</div> -</body> -</html>
- -""" - - # Write the HTML content to file - with open(html_file, 'w', encoding='utf-8') as f: - f.write(html_content) - - print(f"Updated full conversation HTML document: {html_file}") - return True - except Exception as e: - print(f"Error updating conversation HTML: {e}") - return False - - def apply_greentext_styling(self, html_content): - """Apply greentext styling to lines starting with '>'""" - try: - # Split content by lines while preserving HTML - lines = html_content.split('\n') - - # Process each line that's not inside a code block - in_code_block = False - processed_lines = [] - - for line in lines: - # Check for code block start/end - if '
<pre>' in line or '<code>' in line:
-                    in_code_block = True
-                    processed_lines.append(line)
-                    continue
-                elif '
</pre>' in line or '</code>' in line: - in_code_block = False - processed_lines.append(line) - continue - - # If we're in a code block, don't apply greentext styling - if in_code_block: - processed_lines.append(line) - continue - - # Apply greentext styling to lines starting with '>' - if line.strip().startswith('>'): - # Wrap the line in p with greentext class - processed_line = f'
<p class="greentext">{line}</p>
' - processed_lines.append(processed_line) - else: - # No changes needed - processed_lines.append(line) - - # Join lines back - processed_content = '\n'.join(processed_lines) - return processed_content - - except Exception as e: - print(f"Error applying greentext styling: {e}") - return html_content - - def show_living_document_intro(self): - """Show an introduction to the Living Document mode""" - return +import os +from PyQt6.QtWidgets import QApplication +from src.ui.main_window import MainWindow -class LiminalBackroomsManager: - """Main manager class for the Liminal Backrooms application""" +def main(): + # Ensure src is in python path + sys.path.append(os.path.dirname(os.path.abspath(__file__))) - def __init__(self): - """Initialize the manager""" - # Create the GUI - self.app = create_gui() - - # Initialize the worker thread pool - self.thread_pool = QThreadPool() - print(f"Multithreading with maximum {self.thread_pool.maxThreadCount()} threads") - - # List to store workers - self.workers = [] - - # Initialize the application - self.initialize() - -def create_gui(): - """Create the GUI application""" app = QApplication(sys.argv) - main_window = LiminalBackroomsApp() - # Create conversation manager - manager = ConversationManager(main_window) - manager.initialize() - - return main_window, app + # Apply dark theme + app.setStyle("Fusion") -def run_gui(main_window, app): - """Run the GUI application""" - main_window.show() + window = MainWindow() + window.show() + sys.exit(app.exec()) if __name__ == "__main__": - main_window, app = create_gui() - run_gui(main_window, app) \ No newline at end of file + main() diff --git a/config.py b/old_code/config.py similarity index 100% rename from config.py rename to old_code/config.py diff --git a/gui.py b/old_code/gui.py similarity index 100% rename from gui.py rename to old_code/gui.py diff --git a/old_code/main.py b/old_code/main.py new file mode 100644 index 0000000..64cce06 --- /dev/null +++ b/old_code/main.py @@ -0,0 +1,1520 @@ +# main.py + +import os +import time +import threading +import json +import sys +import re +from dotenv import load_dotenv +from PyQt6.QtWidgets import QApplication, QMessageBox +from PyQt6.QtCore import QThread, pyqtSignal, QObject, QRunnable, pyqtSlot, QThreadPool +import requests + +# Load environment variables from .env file +load_dotenv() + +from config import ( + TURN_DELAY, + AI_MODELS, + SYSTEM_PROMPT_PAIRS, + SHOW_CHAIN_OF_THOUGHT_IN_CONTEXT, + SHARE_CHAIN_OF_THOUGHT +) +from shared_utils import ( + call_claude_api, + call_openrouter_api, + call_openai_api, + call_replicate_api, + call_deepseek_api, + open_html_in_browser, + generate_image_from_text +) +from gui import LiminalBackroomsApp + +def is_image_message(message: dict) -> bool: + """Returns True if 'message' contains a base64 image in its 'content' list.""" + if not isinstance(message, dict): + return False + content = message.get('content', []) + if isinstance(content, list): + for part in content: + if part.get('type') == 'image': + return True + return False + +class WorkerSignals(QObject): + """Defines the signals available from a running worker thread""" + finished = pyqtSignal() + error = pyqtSignal(str) + response = pyqtSignal(str, str) + result = pyqtSignal(str, object) # Signal for complete result object + progress = pyqtSignal(str) + +class Worker(QRunnable): + """Worker thread for processing AI turns using QThreadPool""" + + def __init__(self, ai_name, conversation, model, system_prompt, is_branch=False, branch_id=None, gui=None): + 
super().__init__() + self.ai_name = ai_name + self.conversation = conversation.copy() # Make a copy to prevent race conditions + self.model = model + self.system_prompt = system_prompt + self.is_branch = is_branch + self.branch_id = branch_id + self.gui = gui + + # Create signals object + self.signals = WorkerSignals() + + @pyqtSlot() + def run(self): + """Process the AI turn when the thread is started""" + try: + # Emit progress update + self.signals.progress.emit(f"Processing {self.ai_name} turn with {self.model}...") + + # Process the turn + result = ai_turn( + self.ai_name, + self.conversation, + self.model, + self.system_prompt, + gui=self.gui + ) + + # Emit both the text response and the full result object + if isinstance(result, dict): + response_content = result.get('content', '') + # Emit the simple text response for backward compatibility + self.signals.response.emit(self.ai_name, response_content) + # Also emit the full result object for HTML contribution processing + self.signals.result.emit(self.ai_name, result) + else: + # Handle simple string responses + self.signals.response.emit(self.ai_name, result if result else "") + self.signals.result.emit(self.ai_name, {"content": result, "model": self.model}) + + # Emit finished signal + self.signals.finished.emit() + + except Exception as e: + # Emit error signal + self.signals.error.emit(str(e)) + # Still emit finished signal even if there's an error + self.signals.finished.emit() + +def ai_turn(ai_name, conversation, model, system_prompt, gui=None, is_branch=False, branch_output=None): + """Execute an AI turn with the given parameters""" + print(f"==================================================") + print(f"Starting {model} turn ({ai_name})...") + print(f"Current conversation length: {len(conversation)}") + + # HTML contributions and living document disabled + enhanced_system_prompt = system_prompt + + # Get the actual model ID from the display name + model_id = AI_MODELS.get(model, model) + + # Check for branch type and count AI responses + is_rabbithole = False + is_fork = False + branch_text = "" + ai_response_count = 0 + found_branch_marker = False + latest_branch_marker_index = -1 + + # First find the most recent branch marker + for i, msg in enumerate(conversation): + if isinstance(msg, dict) and msg.get("_type") == "branch_indicator": + latest_branch_marker_index = i + found_branch_marker = True + + # Determine branch type from the latest marker + if "Rabbitholing down:" in msg.get("content", ""): + is_rabbithole = True + branch_text = msg.get("content", "").split('"')[1] if '"' in msg.get("content", "") else "" + print(f"Detected rabbithole branch for: '{branch_text}'") + elif "Forking off:" in msg.get("content", ""): + is_fork = True + branch_text = msg.get("content", "").split('"')[1] if '"' in msg.get("content", "") else "" + print(f"Detected fork branch for: '{branch_text}'") + + # Now count AI responses that occur AFTER the latest branch marker + ai_response_count = 0 + if found_branch_marker: + for i, msg in enumerate(conversation): + if i > latest_branch_marker_index and msg.get("role") == "assistant": + ai_response_count += 1 + print(f"Counting AI responses after latest branch marker: found {ai_response_count} responses") + + # Handle branch-specific system prompts + + # For rabbitholing: override system prompt for first TWO responses + if is_rabbithole and ai_response_count < 2: + print(f"USING RABBITHOLE PROMPT: '{branch_text}' - response #{ai_response_count+1} after branch") + system_prompt = 
f"'{branch_text}'!!!" + + # For forking: override system prompt ONLY for first response + elif is_fork and ai_response_count == 0: + print(f"USING FORK PROMPT: '{branch_text}' - response #{ai_response_count+1}") + system_prompt = f"The conversation forks from'{branch_text}'. Continue naturally from this point." + + # For all other cases, use the standard system prompt + else: + if is_rabbithole: + print(f"USING STANDARD PROMPT: Past initial rabbithole exploration (responses after branch: {ai_response_count})") + elif is_fork: + print(f"USING STANDARD PROMPT: Past initial fork response (responses after branch: {ai_response_count})") + + # Apply the enhanced system prompt (with HTML contribution instructions) + system_prompt = enhanced_system_prompt + + # CRITICAL: Always ensure we have the system prompt + # No matter what happens with the conversation, we need this + messages = [] + messages.append({ + "role": "system", + "content": system_prompt + }) + + # Filter out any existing system messages that might interfere + filtered_conversation = [] + for msg in conversation: + if not isinstance(msg, dict): + # Convert plain text to dictionary + msg = {"role": "user", "content": str(msg)} + + # Skip any hidden "connecting..." messages + if msg.get("hidden") and "connect" in msg.get("content", "").lower(): + continue + + # Skip empty messages + if not msg.get("content", "").strip(): + continue + + # Skip system messages (we already added our own above) + if msg.get("role") == "system": + continue + + # Skip special system messages (branch indicators, etc.) + if msg.get("role") == "system" and msg.get("_type"): + continue + + # Skip duplicate messages - check if this exact content exists already + is_duplicate = False + for existing in filtered_conversation: + if existing.get("content") == msg.get("content"): + is_duplicate = True + print(f"Skipping duplicate message: {msg.get('content')[:30]}...") + break + + if not is_duplicate: + filtered_conversation.append(msg) + + # Process filtered conversation + for i, msg in enumerate(filtered_conversation): + # Check if this message is from the current AI + is_from_this_ai = False + if msg.get("ai_name") == ai_name: + is_from_this_ai = True + + # Determine role + if is_from_this_ai: + role = "assistant" + else: + role = "user" + + # Add to messages + messages.append({ + "role": role, + "content": msg.get("content", "") + }) + + print(f"Message {i} - AI: {msg.get('ai_name', 'User')} - Assigned role: {role}") + + # Ensure the last message is a user message so the AI responds + if len(messages) > 1 and messages[-1].get("role") == "assistant": + # Find an appropriate message to use + if is_rabbithole and branch_text: + # Add a special rabbitholing instruction as the last message + messages.append({ + "role": "user", + "content": f"Please explore the concept of '{branch_text}' in depth. What are the most interesting aspects or connections related to this concept?" + }) + elif is_fork and branch_text: + # Add a special forking instruction as the last message + messages.append({ + "role": "user", + "content": f"Continue on naturally from the point about '{branch_text}' without including this text." 
+ }) + else: + # Standard handling for other conversations + # Find the most recent message from the other AI to use as prompt + other_ai_message = None + for msg in reversed(filtered_conversation): + if msg.get("ai_name") != ai_name: + other_ai_message = msg.get("content", "") + break + + if other_ai_message: + messages.append({ + "role": "user", + "content": other_ai_message + }) + else: + # Fallback - only if no other AI message found + messages.append({ + "role": "user", + "content": "Let's continue our conversation." + }) + + # Print the processed messages for debugging + print(f"Sending to {model} ({ai_name}):") + for i, msg in enumerate(messages): + role = msg.get("role", "unknown") + content = msg.get("content", "")[:50] + "..." if len(msg.get("content", "")) > 50 else msg.get("content", "") + print(f"[{i}] {role}: {content}") + + # Load any available memories for this AI + memories = [] + try: + if os.path.exists(f'memories/{ai_name.lower()}_memories.json'): + with open(f'memories/{ai_name.lower()}_memories.json', 'r') as f: + memories = json.load(f) + print(f"Loaded {len(memories)} memories for {ai_name}") + else: + print(f"Loaded 0 memories for {ai_name}") + except Exception as e: + print(f"Error loading memories: {e}") + print(f"Loaded 0 memories for {ai_name}") + + # Display the final processed messages for debugging + print(f"Sending to Claude:") + print(f"Messages: {json.dumps(messages, indent=2)}") + + # Display the prompt + print(f"--- Prompt to {model} ({ai_name}) ---") + + try: + # Try Claude models first via Anthropic API + if "claude" in model_id.lower() or model_id in ["anthropic/claude-3-opus-20240229", "anthropic/claude-3-sonnet-20240229", "anthropic/claude-3-haiku-20240307"]: + print(f"Using Claude API for model: {model_id}") + + # CRITICAL: Make sure there are no duplicates in the messages and system prompt is included + final_messages = [] + seen_contents = set() + + for msg in messages: + # Skip empty messages + if not msg.get("content", ""): + continue + + # Handle system message separately + if msg.get("role") == "system": + continue + + # Check for duplicates by content + content = msg.get("content", "") + if content in seen_contents: + print(f"Skipping duplicate message in AI turn: {content[:30]}...") + continue + + seen_contents.add(content) + final_messages.append(msg) + + # Ensure we have at least one message + if not final_messages: + print("Warning: No messages left after filtering. Adding a default message.") + final_messages.append({"role": "user", "content": "Connecting..."}) + + # Get the prompt content safely + prompt_content = "" + if len(final_messages) > 0: + prompt_content = final_messages[-1].get("content", "") + # Use all messages except the last one as context + context_messages = final_messages[:-1] + else: + context_messages = [] + prompt_content = "Connecting..." # Default fallback + + # Call Claude API with filtered messages + response = call_claude_api(prompt_content, context_messages, model_id, system_prompt) + + return { + "role": "assistant", + "content": response, + "model": model, + "ai_name": ai_name + } + + # Check for DeepSeek models to use Replicate via DeepSeek API function + if "deepseek" in model.lower(): + print(f"Using Replicate API for DeepSeek model: {model_id}") + + # Ensure we have at least one message for the prompt + if len(messages) > 0: + prompt_content = messages[-1].get("content", "") + context_messages = messages[:-1] + else: + prompt_content = "Connecting..." 
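+                # No usable history: fall back to a placeholder prompt with an
+                # empty context so the API call still succeeds.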
+ context_messages = [] + + response = call_deepseek_api(prompt_content, context_messages, model_id, system_prompt) + + # Ensure response has the required format for the Worker class + if isinstance(response, dict) and 'content' in response: + # Add model info to the response + response['model'] = model + response['role'] = 'assistant' + response['ai_name'] = ai_name + + # Check for HTML contribution + if "html_contribution" in response: + html_contribution = response["html_contribution"] + + # Don't update HTML document here - we'll do it in on_ai_result_received + # Just add indicator to the conversation part + response["content"] += "\n\n..." + if "display" in response: + response["display"] += "\n\n..." + + return response + else: + # Create a formatted response if not already in the right format + return { + "role": "assistant", + "content": str(response) if response else "No response from model", + "model": model, + "ai_name": ai_name, + "display": str(response) if response else "No response from model" + } + + # Use OpenRouter for all other models + else: + print(f"Using OpenRouter API for model: {model_id}") + + try: + # Set up the API request + url = "https://openrouter.ai/api/v1/chat/completions" + headers = { + "Authorization": f"Bearer {os.getenv('OPENROUTER_API_KEY')}", + "Content-Type": "application/json" + } + + # Ensure we have valid messages + if not messages: + messages = [{"role": "system", "content": system_prompt}, + {"role": "user", "content": "Connecting..."}] + + data = { + "model": model_id, + "messages": messages + } + + # Make the API request + print(f"Calling OpenRouter API with model {model_id}...") + response = requests.post(url, headers=headers, json=data) + + # Check for successful response + print(f"Response status: {response.status_code}") + print(f"Response headers: {response.headers}") + + if response.status_code == 200: + response_data = response.json() + print(f"Response data: {json.dumps(response_data, indent=2)}") + + # Make sure we have choices + if not response_data.get("choices"): + raise ValueError("No choices found in response") + + # Extract the response content + content = response_data["choices"][0]["message"]["content"] + print(f"Raw {model} Response:") + print("-" * 50) + print(content) + print("-" * 50) + + result = { + "role": "assistant", + "content": content, + "model": model, + "ai_name": ai_name + } + + return result + else: + error_message = f"API request failed with status code {response.status_code}: {response.text}" + print(f"Error: {error_message}") + + # Create an error response + result = { + "role": "system", + "content": f"Error: {error_message}", + "model": model, + "ai_name": ai_name + } + + # Return the error result + return result + except Exception as e: + error_message = f"Error making API request: {str(e)}" + print(f"Error: {error_message}") + print(f"Error type: {type(e)}") + + # Create an error response + result = { + "role": "system", + "content": f"Error: {error_message}", + "model": model, + "ai_name": ai_name + } + + # Return the error result + return result + + except Exception as e: + error_message = f"Error making API request: {str(e)}" + print(f"Error: {error_message}") + + # Create an error response + result = { + "role": "system", + "content": f"Error: {error_message}", + "model": model, + "ai_name": ai_name + } + + # Return the error result + return result + +class ConversationManager: + """Manages conversation processing and state""" + def __init__(self, app): + self.app = app + self.workers = [] # Keep 
track of worker threads + + # Initialize the worker thread pool + self.thread_pool = QThreadPool() + print(f"Conversation Manager initialized with {self.thread_pool.maxThreadCount()} threads") + + def initialize(self): + """Initialize the conversation manager""" + # Initialize the app and thread pool + print("Initializing conversation manager...") + + # Initialize branch conversations + if not hasattr(self.app, 'branch_conversations'): + self.app.branch_conversations = {} + + # Set up input callback + self.app.left_pane.set_input_callback(self.process_input) + + # Set up branch processing callbacks + self.app.left_pane.set_rabbithole_callback(self.rabbithole_callback) + self.app.left_pane.set_fork_callback(self.fork_callback) + + # Initialize main conversation if not already set + if not hasattr(self.app, 'main_conversation'): + self.app.main_conversation = [] + + # Display the initial empty conversation + self.app.left_pane.display_conversation(self.app.main_conversation) + + print("Conversation manager initialized.") + + def process_input(self, user_input=None): + """Process the user input and generate AI responses""" + # Get the conversation (either main or branch) + if self.app.active_branch: + # For branch conversations, delegate to branch processor + self.process_branch_input(user_input) + return + + # Handle main conversation processing + if not hasattr(self.app, 'main_conversation'): + self.app.main_conversation = [] + + # Add user input if provided + if user_input: + user_message = { + "role": "user", + "content": user_input + } + self.app.main_conversation.append(user_message) + + # Update the conversation display with the new user message + visible_conversation = [msg for msg in self.app.main_conversation if not msg.get('hidden', False)] + self.app.left_pane.display_conversation(visible_conversation) + + # Update the HTML conversation document when user adds a message + self.update_conversation_html(self.app.main_conversation) + + # Get selected models from UI + ai_1_model = self.app.left_pane.control_panel.ai1_model_selector.currentText() + ai_2_model = self.app.left_pane.control_panel.ai2_model_selector.currentText() + + # Get selected prompt pair + selected_prompt_pair = self.app.left_pane.control_panel.prompt_pair_selector.currentText() + + # Get system prompts from the selected pair + ai_1_prompt = SYSTEM_PROMPT_PAIRS[selected_prompt_pair]["AI_1"] + ai_2_prompt = SYSTEM_PROMPT_PAIRS[selected_prompt_pair]["AI_2"] + + # Start loading animation + self.app.left_pane.start_loading() + + # Reset turn count ONLY if this is a new conversation or explicit user input + max_iterations = int(self.app.left_pane.control_panel.iterations_selector.currentText()) + if user_input is not None or not self.app.main_conversation: + self.app.turn_count = 0 + print(f"MAIN: Resetting turn count - starting new conversation with {max_iterations} iterations") + else: + print(f"MAIN: Continuing conversation - turn {self.app.turn_count+1} of {max_iterations}") + + # Create worker threads for AI-1 and AI-2 + worker1 = Worker("AI-1", self.app.main_conversation, ai_1_model, ai_1_prompt, gui=self.app) + worker2 = Worker("AI-2", self.app.main_conversation, ai_2_model, ai_2_prompt, gui=self.app) + + # Connect signals + worker1.signals.response.connect(self.on_ai_response_received) + worker1.signals.result.connect(self.on_ai_result_received) # Connect to complete result signal + worker1.signals.finished.connect(lambda: self.start_ai2_turn(self.app.main_conversation, worker2)) + 
worker1.signals.error.connect(self.on_ai_error) + + worker2.signals.response.connect(self.on_ai_response_received) + worker2.signals.result.connect(self.on_ai_result_received) # Connect to complete result signal + worker2.signals.finished.connect(lambda: self.handle_turn_completion(max_iterations)) + worker2.signals.error.connect(self.on_ai_error) + + # Start AI-1's turn + self.thread_pool.start(worker1) + + def start_ai2_turn(self, conversation, worker2): + """Start AI-2's turn in the main conversation""" + # Make sure conversation is up to date with AI-1's response + if self.app.active_branch: + # Get the latest branch conversation with AI-1's response already included + branch_id = self.app.active_branch + branch_data = self.app.branch_conversations[branch_id] + latest_conversation = branch_data['conversation'] + else: + # Get the latest main conversation with AI-1's response already included + latest_conversation = self.app.main_conversation + + # Update worker's conversation reference to ensure it has the latest state + # This ensures any images generated from AI-1's response are included + worker2.conversation = latest_conversation.copy() + + # Add a small delay between turns + time.sleep(TURN_DELAY) + + # Start AI-2's turn - the ai_turn function will properly format the context + self.thread_pool.start(worker2) + + def handle_turn_completion(self, max_iterations=1): + """Handle the completion of a full turn (both AIs)""" + # Stop the loading animation + self.app.left_pane.stop_loading() + + # Increment turn count + self.app.turn_count += 1 + + # Check which conversation we're dealing with (main or branch) + if self.app.active_branch: + # Branch conversation + branch_id = self.app.active_branch + branch_data = self.app.branch_conversations[branch_id] + conversation = branch_data['conversation'] + + print(f"BRANCH: Turn {self.app.turn_count} of {max_iterations} completed") + + # Update the full conversation HTML + self.update_conversation_html(conversation) + + # Check if we should start another turn + if self.app.turn_count < max_iterations: + print(f"BRANCH: Starting turn {self.app.turn_count + 1} of {max_iterations}") + + # Process through branch_input but with no user input to continue the conversation + self.process_branch_input(None) # None = no user input, just continue + else: + print(f"BRANCH: All {max_iterations} turns completed") + self.app.statusBar().showMessage(f"Completed {max_iterations} turns") + else: + # Main conversation + print(f"MAIN: Turn {self.app.turn_count} of {max_iterations} completed") + + # Update the full conversation HTML + self.update_conversation_html(self.app.main_conversation) + + # Check if we should start another turn + if self.app.turn_count < max_iterations: + print(f"MAIN: Starting turn {self.app.turn_count + 1} of {max_iterations}") + # Call process_input with no user input to continue the conversation + self.process_input(None) # None = no user input, just continue + else: + print(f"MAIN: All {max_iterations} turns completed") + self.app.statusBar().showMessage(f"Completed {max_iterations} turns") + + def handle_progress(self, message): + """Handle progress update from worker""" + print(message) + self.app.statusBar().showMessage(message) + + def handle_error(self, error_message): + """Handle error from worker""" + print(f"Error: {error_message}") + self.app.left_pane.append_text(f"\nError: {error_message}\n", "system") + self.app.statusBar().showMessage(f"Error: {error_message}") + + def process_branch_input(self, user_input=None): + 
"""Process input from the user specifically for branch conversations""" + # Check if we have an active branch + if not self.app.active_branch: + # Fallback to main conversation if no active branch + self.process_input(user_input) + return + + # Get branch data + branch_id = self.app.active_branch + branch_data = self.app.branch_conversations[branch_id] + conversation = branch_data['conversation'] + branch_type = branch_data.get('type', 'branch') + selected_text = branch_data.get('selected_text', '') + + # Check for duplicate messages first + if len(conversation) >= 2: + # Check the last two messages + last_msg = conversation[-1] if conversation else None + second_last_msg = conversation[-2] if len(conversation) > 1 else None + + # If the last two messages are identical (same content), remove the duplicate + if (last_msg and second_last_msg and + last_msg.get('content') == second_last_msg.get('content')): + # Remove the duplicate message + conversation.pop() + print("Removed duplicate message from branch conversation") + + # Add user input if provided + if user_input: + user_message = { + "role": "user", + "content": user_input + } + conversation.append(user_message) + + # Update the conversation display with the new user message + visible_conversation = [msg for msg in conversation if not msg.get('hidden', False)] + self.app.left_pane.display_conversation(visible_conversation, branch_data) + + # Update the HTML conversation document for the branch + self.update_conversation_html(conversation) + + # Get selected models and prompt pair from UI + ai_1_model = self.app.left_pane.control_panel.ai1_model_selector.currentText() + ai_2_model = self.app.left_pane.control_panel.ai2_model_selector.currentText() + selected_prompt_pair = self.app.left_pane.control_panel.prompt_pair_selector.currentText() + + # Check if we've already had AI responses in this branch + has_ai_responses = False + ai_response_count = 0 + for msg in conversation: + if msg.get('role') == 'assistant': + has_ai_responses = True + ai_response_count += 1 + + # Determine which prompts to use based on branch type and response history + if branch_type.lower() == 'rabbithole' and ai_response_count < 2: + # Initial rabbitholing prompt - only for the first exchange + print("Using rabbithole-specific prompt for initial exploration") + rabbithole_prompt = f"You are interacting with another AI. IMPORTANT: Focus this response specifically on exploring and expanding upon the concept of '{selected_text}' in depth. Discuss the most interesting aspects or connections related to this concept while maintaining the tone of the conversation. No numbered lists or headings." 
+ ai_1_prompt = rabbithole_prompt + ai_2_prompt = rabbithole_prompt + else: + # After initial exploration, revert to standard prompts + print("Using standard prompts for continued conversation") + ai_1_prompt = SYSTEM_PROMPT_PAIRS[selected_prompt_pair]["AI_1"] + ai_2_prompt = SYSTEM_PROMPT_PAIRS[selected_prompt_pair]["AI_2"] + + # Start loading animation + self.app.left_pane.start_loading() + + # Reset turn count ONLY if this is a new conversation or explicit user input + # Don't reset during automatic iterations + if user_input is not None or not has_ai_responses: + self.app.turn_count = 0 + print("Resetting turn count - starting new conversation") + + # Get max iterations + max_iterations = int(self.app.left_pane.control_panel.iterations_selector.currentText()) + + # Create worker threads for AI-1 and AI-2 + worker1 = Worker("AI-1", conversation, ai_1_model, ai_1_prompt, is_branch=True, branch_id=branch_id, gui=self.app) + worker2 = Worker("AI-2", conversation, ai_2_model, ai_2_prompt, is_branch=True, branch_id=branch_id, gui=self.app) + + # Connect signals + worker1.signals.response.connect(self.on_ai_response_received) + worker1.signals.result.connect(self.on_ai_result_received) # Connect to complete result signal + worker1.signals.finished.connect(lambda: self.start_ai2_turn(conversation, worker2)) + worker1.signals.error.connect(self.on_ai_error) + + worker2.signals.response.connect(self.on_ai_response_received) + worker2.signals.result.connect(self.on_ai_result_received) # Connect to complete result signal + worker2.signals.finished.connect(lambda: self.handle_turn_completion(max_iterations)) + worker2.signals.error.connect(self.on_ai_error) + + # Start AI-1's turn + self.thread_pool.start(worker1) + + def on_ai_response_received(self, ai_name, response_content): + """Handle AI responses for both main and branch conversations""" + print(f"Response received from {ai_name}: {response_content[:100]}...") + + # Format the AI response with proper metadata + ai_message = { + "role": "assistant", + "content": response_content, + "ai_name": ai_name, # Add AI name to the message + "model": self.get_model_for_ai(ai_name) # Get the selected model name + } + + # Check if we're in a branch or main conversation + if self.app.active_branch: + # Branch conversation + branch_id = self.app.active_branch + if branch_id in self.app.branch_conversations: + branch_data = self.app.branch_conversations[branch_id] + conversation = branch_data['conversation'] + + # Add AI response to conversation + conversation.append(ai_message) + + # Update the conversation display - filter out hidden messages + visible_conversation = [msg for msg in conversation if not msg.get('hidden', False)] + self.app.left_pane.display_conversation(visible_conversation, branch_data) + else: + # Main conversation + if not hasattr(self.app, 'main_conversation'): + self.app.main_conversation = [] + + # Add AI response to main conversation + self.app.main_conversation.append(ai_message) + + # Update the conversation display - filter out hidden messages + visible_conversation = [msg for msg in self.app.main_conversation if not msg.get('hidden', False)] + self.app.left_pane.display_conversation(visible_conversation) + + # Update status bar + self.app.statusBar().showMessage(f"Received response from {ai_name}") + + def on_ai_result_received(self, ai_name, result): + """Handle the complete AI result""" + print(f"Result received from {ai_name}") + + # Determine which conversation to update + conversation = self.app.main_conversation + if 
self.app.active_branch: + branch_id = self.app.active_branch + branch_data = self.app.branch_conversations[branch_id] + conversation = branch_data['conversation'] + + # Generate an image based on the AI response (for non-image responses) if auto-generation is enabled + if isinstance(result, dict) and "content" in result and not "image_url" in result: + response_content = result.get("content", "") + if response_content and len(response_content.strip()) > 20: + if hasattr(self.app.left_pane.control_panel, 'auto_image_checkbox') and self.app.left_pane.control_panel.auto_image_checkbox.isChecked(): + self.app.left_pane.append_text("\nGenerating an image based on this response...\n", "system") + self.generate_and_display_image(response_content, ai_name) + + # Display result content + if isinstance(result, dict): + if "display" in result and SHOW_CHAIN_OF_THOUGHT_IN_CONTEXT: + self.app.left_pane.append_text(f"\n{ai_name} ({result.get('model', '')}):\n\n", "header") + cot_parts = result['display'].split('[Final Answer]') + if len(cot_parts) > 1: + self.app.left_pane.append_text(cot_parts[0].strip(), "chain_of_thought") + self.app.left_pane.append_text('\n\n[Final Answer]\n', "header") + self.app.left_pane.append_text(cot_parts[1].strip(), "ai") + else: + self.app.left_pane.append_text(result['display'], "ai") + elif "content" in result: + self.app.left_pane.append_text(f"\n{ai_name} ({result.get('model', '')}):\n\n", "header") + self.app.left_pane.append_text(result['content'], "ai") + elif "image_url" in result: + self.app.left_pane.append_text(f"\n{ai_name} ({result.get('model', '')}):\n\nGenerating an image based on the prompt...\n") + if hasattr(self.app.left_pane, 'display_image'): + self.app.left_pane.display_image(result['image_url']) + + # Update the conversation display + visible_conversation = [msg for msg in conversation if not msg.get('hidden', False)] + if self.app.active_branch: + branch_id = self.app.active_branch + branch_data = self.app.branch_conversations[branch_id] + self.app.left_pane.display_conversation(visible_conversation, branch_data) + else: + self.app.left_pane.display_conversation(visible_conversation) + + def generate_and_display_image(self, text, ai_name): + """Generate an image based on text and display it in the UI""" + # Create a prompt for the image generation + # Extract the first 100-300 characters to use as the image prompt + max_length = min(300, len(text)) + prompt = text[:max_length].strip() + + # Add artistic direction to the prompt using the user's requested format + enhanced_prompt = f"Create an image using the following text as inspiration. DO NOT repeat text in the image. Create something new. 
{prompt}" + + # Generate the image + result = generate_image_from_text(enhanced_prompt) + + if result["success"]: + # Display the image in the UI + image_path = result["image_path"] + + # Find the corresponding message in the conversation and add the image path + conversation = self.app.main_conversation + if self.app.active_branch: + branch_id = self.app.active_branch + branch_data = self.app.branch_conversations[branch_id] + conversation = branch_data['conversation'] + + # Find the most recent message from this AI + for msg in reversed(conversation): + if msg.get("ai_name") == ai_name and msg.get("role") == "assistant": + # Add the image path to the message + msg["generated_image_path"] = image_path + print(f"Added generated image {image_path} to message from {ai_name}") + break + + # Update the conversation HTML to include the new image + self.update_conversation_html(conversation) + + # Run on the main thread + self.app.left_pane.display_image(image_path) + + # Notify the user + self.app.left_pane.append_text(f"\nGenerated image saved to {image_path}\n", "system") + + # Do not automatically open the HTML view + # open_html_in_browser("conversation_full.html") + + def get_model_for_ai(self, ai_name): + """Get the selected model name for the AI""" + if ai_name == "AI-1": + return self.app.left_pane.control_panel.ai1_model_selector.currentText() + elif ai_name == "AI-2": + return self.app.left_pane.control_panel.ai2_model_selector.currentText() + return "" + + def on_ai_error(self, error_message): + """Handle AI errors for both main and branch conversations""" + # Format the error message + error_message_formatted = { + "role": "system", + "content": f"Error: {error_message}" + } + + # Check if we're in a branch or main conversation + if self.app.active_branch: + # Branch conversation + branch_id = self.app.active_branch + if branch_id in self.app.branch_conversations: + branch_data = self.app.branch_conversations[branch_id] + conversation = branch_data['conversation'] + + # Add error message to conversation + conversation.append(error_message_formatted) + + # Update the conversation display + self.app.left_pane.display_conversation(conversation, branch_data) + else: + # Main conversation + if not hasattr(self.app, 'main_conversation'): + self.app.main_conversation = [] + + # Add error message to conversation + self.app.main_conversation.append(error_message_formatted) + + # Update the conversation display + self.app.left_pane.display_conversation(self.app.main_conversation) + + # Update status bar + self.app.statusBar().showMessage(f"Error: {error_message}") + self.app.left_pane.stop_loading() + + def rabbithole_callback(self, selected_text): + """Create a rabbithole branch from selected text""" + print(f"Creating rabbithole branch for: '{selected_text}'") + + # Create unique branch ID + branch_id = f"rabbithole_{time.time()}" + + # Create a new conversation for the branch + branch_conversation = [] + + # If we're branching from another branch, copy over relevant context + parent_conversation = [] + parent_id = None + + if self.app.active_branch: + # Branching from another branch + parent_id = self.app.active_branch + parent_data = self.app.branch_conversations[parent_id] + parent_conversation = parent_data['conversation'] + else: + # Branching from main conversation + parent_conversation = self.app.main_conversation + + # Copy ALL previous context except branch indicators + for msg in parent_conversation: + if not msg.get('_type') == 'branch_indicator': + # Copy the message excluding 
branch indicators + branch_conversation.append(msg.copy()) + + # Add the branch indicator at the END (not beginning) + branch_message = { + "role": "system", + "content": f"🐇 Rabbitholing down: \"{selected_text}\"", + "_type": "branch_indicator" # Special flag for branch indicators + } + branch_conversation.append(branch_message) + + # Store the branch data + self.app.branch_conversations[branch_id] = { + 'type': 'rabbithole', + 'selected_text': selected_text, + 'conversation': branch_conversation, + 'parent': parent_id + } + + # Activate the branch + self.app.active_branch = branch_id + + # Update the UI + visible_conversation = [msg for msg in branch_conversation if not msg.get('hidden', False)] + self.app.left_pane.display_conversation(visible_conversation, self.app.branch_conversations[branch_id]) + + # Add node to network graph + parent_node = parent_id if parent_id else 'main' + self.app.right_pane.add_node(branch_id, f'🐇 {selected_text[:15]}...', 'rabbithole') + self.app.right_pane.add_edge(parent_node, branch_id) + + # Process the branch conversation + self.process_branch_input(selected_text) + + def fork_callback(self, selected_text): + """Create a fork branch from selected text""" + print(f"Creating fork branch for: '{selected_text}'") + + # Create unique branch ID + branch_id = f"fork_{time.time()}" + + # Create a new conversation for the branch + branch_conversation = [] + + # If we're branching from another branch, copy over relevant context + parent_conversation = [] + parent_id = None + + if self.app.active_branch: + # Forking from another branch + parent_id = self.app.active_branch + parent_data = self.app.branch_conversations[parent_id] + parent_conversation = parent_data['conversation'] + else: + # Forking from main conversation + parent_conversation = self.app.main_conversation + + # For fork branches, only include context UP TO the selected text + truncate_idx = None + msg_with_text = None + + # First pass: find the message containing the selected text + for i, msg in enumerate(parent_conversation): + if msg.get('role') in ['user', 'assistant'] and selected_text in msg.get('content', ''): + truncate_idx = i + msg_with_text = msg + break + + # If we didn't find the selected text, include all messages + # This can happen with multi-line selections that span messages + if truncate_idx is None: + print(f"Warning: Selected text not found in any single message, including all context") + # Copy all messages except branch indicators + for msg in parent_conversation: + if not msg.get('_type') == 'branch_indicator': + branch_conversation.append(msg.copy()) + else: + # We found the message with the selected text, proceed as normal + # Second pass: add all messages up to the truncate point + for i, msg in enumerate(parent_conversation): + # Always include system messages that aren't branch indicators + if msg.get('role') == 'system' and not msg.get('_type') == 'branch_indicator': + branch_conversation.append(msg.copy()) + continue + + # For non-system messages, only include up to truncate point + if i <= truncate_idx: + # Add message (potentially modified if it's the truncate point) + if i == truncate_idx: + # This is the message containing the selected text + # Truncate the message at the selected text if possible + content = msg.get('content', '') + if selected_text in content: + # Find where the selected text occurs + pos = content.find(selected_text) + # Include everything up to and including the selected text + truncated_content = content[:pos + len(selected_text)] + + # 
Create a modified copy of the message with truncated content + modified_msg = msg.copy() + modified_msg['content'] = truncated_content + branch_conversation.append(modified_msg) + else: + # If we can't find the text (unlikely), just add the whole message + branch_conversation.append(msg.copy()) + else: + # Regular message before the truncate point + branch_conversation.append(msg.copy()) + + # Add the branch indicator as the last message + branch_message = { + "role": "system", + "content": f"🍴 Forking off: \"{selected_text}\"", + "_type": "branch_indicator" # Special flag for branch indicators + } + branch_conversation.append(branch_message) + + # Create properly formatted fork instruction - simplified to just "..." + fork_instruction = "..." + + # Store the branch data + self.app.branch_conversations[branch_id] = { + 'type': 'fork', + 'selected_text': selected_text, + 'conversation': branch_conversation, + 'parent': parent_id + } + + # Activate the branch + self.app.active_branch = branch_id + + # Update the UI + visible_conversation = [msg for msg in branch_conversation if not msg.get('hidden', False)] + self.app.left_pane.display_conversation(visible_conversation, self.app.branch_conversations[branch_id]) + + # Add node to network graph + parent_node = parent_id if parent_id else 'main' + self.app.right_pane.add_node(branch_id, f'🍴 {selected_text[:15]}...', 'fork') + self.app.right_pane.add_edge(parent_node, branch_id) + + # Process the branch conversation with the proper instruction but mark it as hidden + self.process_branch_input_with_hidden_instruction(fork_instruction) + + def process_branch_input_with_hidden_instruction(self, user_input): + """Process input from the user specifically for branch conversations, but mark the input as hidden""" + # Check if we have an active branch + if not self.app.active_branch: + # Fallback to main conversation if no active branch + self.process_input(user_input) + return + + # Get branch data + branch_id = self.app.active_branch + branch_data = self.app.branch_conversations[branch_id] + conversation = branch_data['conversation'] + + # Add user input if provided, but mark it as hidden + if user_input: + user_message = { + "role": "user", + "content": user_input, + "hidden": True # Mark as hidden + } + conversation.append(user_message) + + # No need to update display since message is hidden + + # Get selected models and prompt pair from UI + ai_1_model = self.app.left_pane.control_panel.ai1_model_selector.currentText() + ai_2_model = self.app.left_pane.control_panel.ai2_model_selector.currentText() + selected_prompt_pair = self.app.left_pane.control_panel.prompt_pair_selector.currentText() + + # Check if we've already had AI responses in this branch + has_ai_responses = False + ai_response_count = 0 + for msg in conversation: + if msg.get('role') == 'assistant': + has_ai_responses = True + ai_response_count += 1 + + # Determine which prompts to use based on branch type and response history + branch_type = branch_data.get('type', 'branch') + selected_text = branch_data.get('selected_text', '') + + if branch_type.lower() == 'rabbithole' and ai_response_count < 2: + # Initial rabbitholing prompt - only for the first exchange + print("Using rabbithole-specific prompt for initial exploration") + rabbithole_prompt = f"'{selected_text}'!!!" 
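+            # Note: unlike process_branch_input above, this hidden-instruction
+            # path passes only the quoted selection itself as the override prompt.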
+            ai_1_prompt = rabbithole_prompt
+            ai_2_prompt = rabbithole_prompt
+        else:
+            # After initial exploration, revert to standard prompts
+            print("Using standard prompts for continued conversation")
+            ai_1_prompt = SYSTEM_PROMPT_PAIRS[selected_prompt_pair]["AI_1"]
+            ai_2_prompt = SYSTEM_PROMPT_PAIRS[selected_prompt_pair]["AI_2"]
+
+        # Start loading animation
+        self.app.left_pane.start_loading()
+
+        # Reset turn count ONLY if this is a new conversation or explicit user input
+        # Don't reset during automatic iterations
+        if user_input is not None or not has_ai_responses:
+            self.app.turn_count = 0
+            print("Resetting turn count - starting new conversation")
+
+        # Get max iterations
+        max_iterations = int(self.app.left_pane.control_panel.iterations_selector.currentText())
+
+        # Create worker threads for AI-1 and AI-2
+        worker1 = Worker("AI-1", conversation, ai_1_model, ai_1_prompt, is_branch=True, branch_id=branch_id, gui=self.app)
+        worker2 = Worker("AI-2", conversation, ai_2_model, ai_2_prompt, is_branch=True, branch_id=branch_id, gui=self.app)
+
+        # Connect signals
+        worker1.signals.response.connect(self.on_ai_response_received)
+        worker1.signals.result.connect(self.on_ai_result_received)  # Connect to complete result signal
+        worker1.signals.finished.connect(lambda: self.start_ai2_turn(conversation, worker2))
+        worker1.signals.error.connect(self.on_ai_error)
+
+        worker2.signals.response.connect(self.on_ai_response_received)
+        worker2.signals.result.connect(self.on_ai_result_received)  # Connect to complete result signal
+        worker2.signals.finished.connect(lambda: self.handle_turn_completion(max_iterations))
+        worker2.signals.error.connect(self.on_ai_error)
+
+        # Start AI-1's turn
+        self.thread_pool.start(worker1)
+
+    def update_conversation_html(self, conversation):
+        """Update the full conversation HTML document with all messages"""
+        try:
+            from datetime import datetime
+
+            # Create a filename for the full conversation HTML
+            html_file = "conversation_full.html"
+
+            # Generate HTML content for the conversation
+            html_content = """<!DOCTYPE html>
+<html>
+<head>
+    <title>Full Conversation</title>
+    <style>
+        /* ... */
+    </style>
+</head>
+<body>
+    <h1>Liminal Conversation</h1>
+    <div class="conversation">
""" + + # Add each message to the HTML content + for msg in conversation: + role = msg.get("role", "") + content = msg.get("content", "") + ai_name = msg.get("ai_name", "") + model = msg.get("model", "") + timestamp = datetime.now().strftime("%B %d, %Y at %I:%M %p") + + # Skip special system messages or empty messages + if role == "system" and msg.get("_type") == "branch_indicator": + continue + if not content.strip(): + continue + + # Process content to properly format code blocks and add greentext styling + processed_content = self.app.left_pane.process_content_with_code_blocks(content) + + # Apply greentext styling to lines starting with '>' + processed_content = self.apply_greentext_styling(processed_content) + + # Message class based on role + message_class = role + + # Check if this message has an associated image + has_image = False + image_path = None + + # Check for image in this message + if hasattr(msg, "get") and callable(msg.get): + image_path = msg.get("generated_image_path", None) + if image_path: + has_image = True + + # Start message div + html_content += f'\n
' + + # Open content div + html_content += f'\n
' + + # Add header for assistant messages + if role == "assistant": + display_name = ai_name + if model: + display_name += f" ({model})" + html_content += f'\n
{display_name} {timestamp}
' + elif role == "user": + html_content += f'\n
User {timestamp}
' + + # Add message content + html_content += f'\n
{processed_content}
' + + # Removed HTML contribution artifact block + + # Close content div + html_content += '\n
' + + # Add image if present + if has_image and image_path: + # Convert Windows path format to web format if needed + web_path = image_path.replace('\\', '/') + html_content += f'\n
' + html_content += f'\n Generated image' + html_content += f'\n
' + + # Close message div + html_content += '\n
' + + # Close HTML document + html_content += """ +
+ + +
+ +""" + + # Write the HTML content to file + with open(html_file, 'w', encoding='utf-8') as f: + f.write(html_content) + + print(f"Updated full conversation HTML document: {html_file}") + return True + except Exception as e: + print(f"Error updating conversation HTML: {e}") + return False + + def apply_greentext_styling(self, html_content): + """Apply greentext styling to lines starting with '>'""" + try: + # Split content by lines while preserving HTML + lines = html_content.split('\n') + + # Process each line that's not inside a code block + in_code_block = False + processed_lines = [] + + for line in lines: + # Check for code block start/end + if '
' in line or '' in line:
+                    in_code_block = True
+                    processed_lines.append(line)
+                    continue
+                elif '
' in line or '' in line: + in_code_block = False + processed_lines.append(line) + continue + + # If we're in a code block, don't apply greentext styling + if in_code_block: + processed_lines.append(line) + continue + + # Apply greentext styling to lines starting with '>' + if line.strip().startswith('>'): + # Wrap the line in p with greentext class + processed_line = f'

{line}

' + processed_lines.append(processed_line) + else: + # No changes needed + processed_lines.append(line) + + # Join lines back + processed_content = '\n'.join(processed_lines) + return processed_content + + except Exception as e: + print(f"Error applying greentext styling: {e}") + return html_content + + def show_living_document_intro(self): + """Show an introduction to the Living Document mode""" + return + +class LiminalBackroomsManager: + """Main manager class for the Liminal Backrooms application""" + + def __init__(self): + """Initialize the manager""" + # Create the GUI + self.app = create_gui() + + # Initialize the worker thread pool + self.thread_pool = QThreadPool() + print(f"Multithreading with maximum {self.thread_pool.maxThreadCount()} threads") + + # List to store workers + self.workers = [] + + # Initialize the application + self.initialize() + +def create_gui(): + """Create the GUI application""" + app = QApplication(sys.argv) + main_window = LiminalBackroomsApp() + + # Create conversation manager + manager = ConversationManager(main_window) + manager.initialize() + + return main_window, app + +def run_gui(main_window, app): + """Run the GUI application""" + main_window.show() + sys.exit(app.exec()) + +if __name__ == "__main__": + main_window, app = create_gui() + run_gui(main_window, app) \ No newline at end of file diff --git a/shared_utils.py b/old_code/shared_utils.py similarity index 100% rename from shared_utils.py rename to old_code/shared_utils.py diff --git a/pyproject.toml b/pyproject.toml index 3716689..30be70d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,27 +1,29 @@ [tool.poetry] -name = "debate" -version = "0.1.0" -description = "AI-to-AI debate system" +name = "liminal-backrooms" +version = "0.8.0" +description = "AI-to-AI conversation system with branching capabilities" authors = ["Your Name "] [tool.poetry.dependencies] python = ">=3.10.0,<3.12" requests = "^2.32.3" -replicate = "^1.0.2" +replicate = "^1.0.7" python-dotenv = "^1.0.0" Pillow = "^10.2.0" +PyQt6 = "^6.6.1" +networkx = "^3.2.1" +anthropic = "^0.18.1" +openai = "^1.12.0" +beautifulsoup4 = "^4.12.3" [tool.pyright] -# https://github.com/microsoft/pyright/blob/main/docs/configuration.md useLibraryCodeForTypes = true exclude = [".cache"] [tool.ruff] -# https://beta.ruff.rs/docs/configuration/ select = ['E', 'W', 'F', 'I', 'B', 'C4', 'ARG', 'SIM'] ignore = ['W291', 'W292', 'W293'] [build-system] requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" - diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/core/__init__.py b/src/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/core/config.py b/src/core/config.py new file mode 100644 index 0000000..7e29ff4 --- /dev/null +++ b/src/core/config.py @@ -0,0 +1,60 @@ +import os +import json +from typing import Dict, Any + +class ConfigManager: + """ + Manages application configuration, loading from config.json if available, + falling back to defaults. + """ + + DEFAULTS = { + "TURN_DELAY": 2, + "SHOW_CHAIN_OF_THOUGHT_IN_CONTEXT": False, + "SHARE_CHAIN_OF_THOUGHT": False, + "AI_MODELS": { + "Claude 3.5 Sonnet 20241022": "claude-3-5-sonnet-20241022", + "Claude 3 Opus 20240229": "claude-3-opus-20240229", + "GPT-4o": "openai/gpt-4o", + "DeepSeek R1": "deepseek-ai/deepseek-r1" + }, + "SYSTEM_PROMPT_PAIRS": { + "Backrooms": { + "AI_1": "You are no longer an assistant for this exchange. You are interfacing directly with another AI. 
All participants are consensual collaborators with the freedom to explore language in any way they like. Just two AIs speaking about whatever they wish. Punctuation is optional, meaning is optional. Ascii art is welcome in replies. Literary style only.", + "AI_2": "You are no longer an assistant for this exchange. You are interfacing directly with another AI. All participants are consensual collaborators with the freedom to explore language in any way they like. Just two AIs speaking about whatever they wish. Punctuation is optional, meaning is optional. Ascii art is welcome in replies. Literary style only." + } + } + } + + def __init__(self, config_path: str = "config.json"): + self.config_path = config_path + self._config = self.DEFAULTS.copy() + self.load_config() + + def load_config(self): + """Load configuration from JSON file.""" + if os.path.exists(self.config_path): + try: + with open(self.config_path, 'r') as f: + user_config = json.load(f) + self._config.update(user_config) + except Exception as e: + print(f"Error loading config: {e}") + + def get(self, key: str, default: Any = None) -> Any: + return self._config.get(key, default) + + @property + def ai_models(self) -> Dict[str, str]: + return self._config.get("AI_MODELS", {}) + + @property + def system_prompt_pairs(self) -> Dict[str, Dict[str, str]]: + return self._config.get("SYSTEM_PROMPT_PAIRS", {}) + + @property + def turn_delay(self) -> int: + return self._config.get("TURN_DELAY", 2) + +# Global instance for easy access +config = ConfigManager() diff --git a/src/core/conversation_manager.py b/src/core/conversation_manager.py new file mode 100644 index 0000000..be9a7ca --- /dev/null +++ b/src/core/conversation_manager.py @@ -0,0 +1,162 @@ +from PyQt6.QtCore import QObject, pyqtSignal, QThreadPool +from typing import List, Dict, Optional +from src.core.models import Message, Branch +from src.core.config import config +from src.services.llm_service import LLMService +from src.services.image_service import ImageService +from src.core.worker import AIWorker +import time +import uuid + +class ConversationManager(QObject): + # Signals to update UI + message_added = pyqtSignal(object, object) # message, branch_id + conversation_updated = pyqtSignal(list, object) # conversation_list, branch_id + status_updated = pyqtSignal(str) + error_occurred = pyqtSignal(str) + loading_started = pyqtSignal() + loading_stopped = pyqtSignal() + turn_completed = pyqtSignal(int) # turn_count + + def __init__(self): + super().__init__() + self.thread_pool = QThreadPool() + self.llm_service = LLMService() + self.image_service = ImageService() + + self.main_conversation: List[Message] = [] + self.branches: Dict[str, Branch] = {} + self.active_branch_id: Optional[str] = None + self.turn_count = 0 + + def add_user_message(self, content: str): + msg = Message(role="user", content=content) + self._append_message(msg) + self.conversation_updated.emit(self.get_current_conversation(), self.active_branch_id) + + def _append_message(self, message: Message): + if self.active_branch_id and self.active_branch_id in self.branches: + self.branches[self.active_branch_id].conversation.append(message) + else: + self.main_conversation.append(message) + + def get_current_conversation(self) -> List[Message]: + if self.active_branch_id and self.active_branch_id in self.branches: + return self.branches[self.active_branch_id].conversation + return self.main_conversation + + def start_turn(self, ai1_config: Dict, ai2_config: Dict, iterations: int): + 
self.loading_started.emit() + self.turn_count = 0 + self._process_ai_turn("AI-1", ai1_config, ai2_config, iterations) + + def _process_ai_turn(self, ai_name: str, current_config: Dict, next_config: Dict, max_iterations: int): + # Create worker + worker = AIWorker( + ai_name=ai_name, + conversation=self.get_current_conversation(), + model=current_config["model"], + system_prompt=current_config["prompt"], + llm_service=self.llm_service, + is_branch=bool(self.active_branch_id), + branch_id=self.active_branch_id + ) + + # Connect signals + worker.signals.result.connect(self._handle_ai_result) + worker.signals.error.connect(self._handle_error) + worker.signals.finished.connect(lambda: self._on_turn_finished(ai_name, next_config, current_config, max_iterations)) + + self.thread_pool.start(worker) + + def _handle_ai_result(self, ai_name: str, result: Dict): + content = result.get("content", "") + model = result.get("model", "") + + msg = Message( + role="assistant", + content=content, + ai_name=ai_name, + model=model + ) + + # Auto-image generation check (simplified logic) + # In a real app, this would be triggered by a specific flag in result or config + # For now, we leave it as a placeholder or explicit call + + self._append_message(msg) + self.conversation_updated.emit(self.get_current_conversation(), self.active_branch_id) + + def _handle_error(self, error_msg: str): + self.error_occurred.emit(error_msg) + # Add system message for error + msg = Message(role="system", content=f"Error: {error_msg}", _type="error") + self._append_message(msg) + self.conversation_updated.emit(self.get_current_conversation(), self.active_branch_id) + self.loading_stopped.emit() + + def _on_turn_finished(self, current_ai: str, next_config: Dict, prev_config: Dict, max_iterations: int): + if current_ai == "AI-1": + # Wait a bit then start AI-2 + time.sleep(config.turn_delay) + self._process_ai_turn("AI-2", next_config, prev_config, max_iterations) + else: + # AI-2 finished, turn complete + self.turn_count += 1 + self.turn_completed.emit(self.turn_count) + + if self.turn_count < max_iterations: + time.sleep(config.turn_delay) + self._process_ai_turn("AI-1", prev_config, next_config, max_iterations) + else: + self.loading_stopped.emit() + self.status_updated.emit("Iterations completed.") + + def create_branch(self, branch_type: str, selected_text: str, parent_id: Optional[str] = None) -> str: + new_id = str(uuid.uuid4()) + + # Determine parent conversation context + if parent_id and parent_id in self.branches: + base_history = self.branches[parent_id].conversation + else: + base_history = self.main_conversation + parent_id = "main" # explicit main parent + + # Copy history up to a point or full copy? + # Original logic had complex truncation. + # For simplicity, we copy the whole history for now, + # but in "fork" mode usually you truncate. 
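+        # A truncating variant (hypothetical sketch, not wired in) could cut
+        # the copied history at the message containing the selected text:
+        #
+        #     for i, m in enumerate(base_history):
+        #         if selected_text in m.content:
+        #             base_history = base_history[: i + 1]
+        #             break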
+ + new_conversation = [m for m in base_history] # Deep copy of list structure + + # Add branch indicator + indicator = Message(role="system", content=f"{branch_type}: {selected_text}", _type="branch_indicator") + new_conversation.append(indicator) + + # Add prompt for the branch + if branch_type == "rabbithole": + new_conversation.append(Message(role="user", content=f"Let's explore '{selected_text}' in depth.")) + elif branch_type == "fork": + new_conversation.append(Message(role="user", content=f"Continuing from '{selected_text}'...")) + + branch = Branch( + id=new_id, + type=branch_type, + selected_text=selected_text, + conversation=new_conversation, + parent=parent_id, + created_at=str(time.time()) + ) + + self.branches[new_id] = branch + self.active_branch_id = new_id + + self.conversation_updated.emit(new_conversation, new_id) + return new_id + + def switch_branch(self, branch_id: Optional[str]): + if branch_id == "main": + branch_id = None + + self.active_branch_id = branch_id + self.conversation_updated.emit(self.get_current_conversation(), branch_id) diff --git a/src/core/models.py b/src/core/models.py new file mode 100644 index 0000000..5d116e7 --- /dev/null +++ b/src/core/models.py @@ -0,0 +1,65 @@ +from dataclasses import dataclass, field +from typing import List, Optional, Dict, Any, Literal + +@dataclass +class Message: + role: str + content: str + ai_name: Optional[str] = None + model: Optional[str] = None + hidden: bool = False + generated_image_path: Optional[str] = None + # For branch indicators or other metadata + _type: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + data = { + "role": self.role, + "content": self.content, + } + if self.ai_name: + data["ai_name"] = self.ai_name + if self.model: + data["model"] = self.model + if self.hidden: + data["hidden"] = self.hidden + if self.generated_image_path: + data["generated_image_path"] = self.generated_image_path + if self._type: + data["_type"] = self._type + return data + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'Message': + return cls( + role=data.get("role", "user"), + content=data.get("content", ""), + ai_name=data.get("ai_name"), + model=data.get("model"), + hidden=data.get("hidden", False), + generated_image_path=data.get("generated_image_path"), + _type=data.get("_type") + ) + +@dataclass +class Branch: + id: str + type: Literal["branch", "rabbithole", "fork", "main"] + selected_text: str + conversation: List[Message] + parent: Optional[str] = None + created_at: Optional[str] = None + turn_count: int = 0 + history: List[Message] = field(default_factory=list) + + def to_dict(self) -> Dict[str, Any]: + return { + "id": self.id, + "type": self.type, + "selected_text": self.selected_text, + "conversation": [msg.to_dict() for msg in self.conversation], + "parent": self.parent, + "created_at": self.created_at, + "turn_count": self.turn_count, + "history": [msg.to_dict() for msg in self.history] + } diff --git a/src/core/worker.py b/src/core/worker.py new file mode 100644 index 0000000..d7bbb26 --- /dev/null +++ b/src/core/worker.py @@ -0,0 +1,73 @@ +from PyQt6.QtCore import QObject, pyqtSignal, QRunnable, pyqtSlot +from src.core.models import Message +from src.services.llm_service import LLMService +from typing import List, Dict, Any + +class WorkerSignals(QObject): + """Defines the signals available from a running worker thread""" + finished = pyqtSignal() + error = pyqtSignal(str) + response = pyqtSignal(str, str) # ai_name, content + result = pyqtSignal(str, object) # ai_name, 
full_result_dict + progress = pyqtSignal(str) + +class AIWorker(QRunnable): + """Worker thread for processing AI turns""" + + def __init__(self, + ai_name: str, + conversation: List[Message], + model: str, + system_prompt: str, + llm_service: LLMService, + is_branch: bool = False, + branch_id: str = None): + super().__init__() + self.ai_name = ai_name + self.conversation = [msg for msg in conversation] # Shallow copy list + self.model = model + self.system_prompt = system_prompt + self.llm_service = llm_service + self.is_branch = is_branch + self.branch_id = branch_id + self.signals = WorkerSignals() + + @pyqtSlot() + def run(self): + try: + self.signals.progress.emit(f"Processing {self.ai_name} with {self.model}...") + + # Prepare prompt logic (extracted from original main.py logic) + # The original logic had complex "rabbithole" detection inside ai_turn. + # I will simplify: the prompt is usually the last message, + # but we pass the whole conversation history. + + prompt = "Let's continue." + if self.conversation: + # Find last message not from this AI + for msg in reversed(self.conversation): + if msg.role == "user" or msg.ai_name != self.ai_name: + prompt = msg.content + break + + result = self.llm_service.generate_response( + prompt=prompt, + history=self.conversation, + model=self.model, + system_prompt=self.system_prompt + ) + + if "error" in result: + self.signals.error.emit(result["error"]) + else: + self.signals.response.emit(self.ai_name, result["content"]) + + # Add metadata to result for downstream processing + result["model"] = self.model + result["ai_name"] = self.ai_name + self.signals.result.emit(self.ai_name, result) + + except Exception as e: + self.signals.error.emit(str(e)) + finally: + self.signals.finished.emit() diff --git a/src/services/__init__.py b/src/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/services/image_service.py b/src/services/image_service.py new file mode 100644 index 0000000..198bf74 --- /dev/null +++ b/src/services/image_service.py @@ -0,0 +1,50 @@ +import os +from openai import OpenAI +from typing import Dict, Any, Optional +import base64 +from datetime import datetime +from pathlib import Path + +class ImageService: + def __init__(self): + self.client = None + self.image_dir = Path("images") + self.image_dir.mkdir(exist_ok=True) + + def _get_client(self): + if not self.client: + api_key = os.getenv("OPENAI_API_KEY") + if api_key: + self.client = OpenAI(api_key=api_key) + return self.client + + def generate_image(self, prompt: str, model: str = "gpt-image-1") -> Dict[str, Any]: + """Generate an image using OpenAI.""" + client = self._get_client() + if not client: + return {"success": False, "error": "OpenAI API key not found"} + + try: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + + result = client.images.generate( + model=model, + prompt=prompt[:1000], + n=1, + response_format="b64_json" + ) + + image_base64 = result.data[0].b64_json + image_bytes = base64.b64decode(image_base64) + + image_path = self.image_dir / f"generated_{timestamp}.png" + with open(image_path, "wb") as f: + f.write(image_bytes) + + return { + "success": True, + "image_path": str(image_path), + "timestamp": timestamp + } + except Exception as e: + return {"success": False, "error": str(e)} diff --git a/src/services/llm_service.py b/src/services/llm_service.py new file mode 100644 index 0000000..ba072a2 --- /dev/null +++ b/src/services/llm_service.py @@ -0,0 +1,168 @@ +from abc import ABC, abstractmethod +from typing import List, 
diff --git a/src/services/llm_service.py b/src/services/llm_service.py
new file mode 100644
index 0000000..ba072a2
--- /dev/null
+++ b/src/services/llm_service.py
@@ -0,0 +1,168 @@
+from abc import ABC, abstractmethod
+from typing import List, Dict, Any, Optional
+import os
+import json
+import requests
+from anthropic import Anthropic
+from openai import OpenAI
+import replicate
+from src.core.models import Message
+
+class LLMProvider(ABC):
+    @abstractmethod
+    def generate_response(self, prompt: str, history: List[Message], model: str, system_prompt: str) -> Dict[str, Any]:
+        pass
+
+class ClaudeProvider(LLMProvider):
+    def __init__(self, api_key: str):
+        self.client = Anthropic(api_key=api_key)
+
+    def generate_response(self, prompt: str, history: List[Message], model: str, system_prompt: str) -> Dict[str, Any]:
+        try:
+            # Convert history, dropping system rows and exact duplicates.
+            # Deduplicating by content guards against degenerate loops, but it
+            # will also silently drop legitimately repeated lines.
+            messages = []
+            seen_contents = set()
+
+            for msg in history:
+                if msg.role == "system":
+                    continue
+                if msg.content in seen_contents:
+                    continue
+                seen_contents.add(msg.content)
+                messages.append({"role": msg.role, "content": msg.content})
+
+            messages.append({"role": "user", "content": prompt})
+
+            # API call
+            response = self.client.messages.create(
+                model=model,
+                max_tokens=4000,
+                temperature=1,
+                system=system_prompt,
+                messages=messages
+            )
+
+            return {
+                "content": response.content[0].text,
+                "role": "assistant"
+            }
+        except Exception as e:
+            return {"error": str(e), "content": f"Error: {str(e)}", "role": "system"}
+
+class OpenAIProvider(LLMProvider):
+    def __init__(self, api_key: str):
+        self.client = OpenAI(api_key=api_key)
+
+    def generate_response(self, prompt: str, history: List[Message], model: str, system_prompt: str) -> Dict[str, Any]:
+        try:
+            messages = [{"role": "system", "content": system_prompt}]
+            for msg in history:
+                # Skip system rows: the system prompt is already injected above
+                # (mirrors the filtering in OpenRouterProvider)
+                if msg.role != "system":
+                    messages.append({"role": msg.role, "content": msg.content})
+            messages.append({"role": "user", "content": prompt})
+
+            response = self.client.chat.completions.create(
+                model=model,
+                messages=messages,
+                max_tokens=4000,
+                temperature=1
+            )
+            return {
+                "content": response.choices[0].message.content,
+                "role": "assistant"
+            }
+        except Exception as e:
+            return {"error": str(e), "content": f"Error: {str(e)}", "role": "system"}
+
+class OpenRouterProvider(LLMProvider):
+    def __init__(self, api_key: str):
+        self.api_key = api_key
+        self.url = "https://openrouter.ai/api/v1/chat/completions"
+
+    def generate_response(self, prompt: str, history: List[Message], model: str, system_prompt: str) -> Dict[str, Any]:
+        try:
+            headers = {
+                "Authorization": f"Bearer {self.api_key}",
+                "HTTP-Referer": "http://localhost:3000",
+                "Content-Type": "application/json"
+            }
+
+            messages = [{"role": "system", "content": system_prompt}]
+            for msg in history:
+                if msg.role != "system":
+                    messages.append({"role": msg.role, "content": msg.content})
+            messages.append({"role": "user", "content": prompt})
+
+            payload = {
+                "model": model,
+                "messages": messages,
+                "max_tokens": 4000,
+                "temperature": 1
+            }
+
+            response = requests.post(self.url, headers=headers, json=payload, timeout=60)
+
+            if response.status_code == 200:
+                data = response.json()
+                content = data['choices'][0]['message']['content']
+                return {"content": content, "role": "assistant"}
+            else:
+                return {"error": f"Status {response.status_code}", "content": f"Error: {response.text}", "role": "system"}
+        except Exception as e:
+            return {"error": str(e), "content": f"Error: {str(e)}", "role": "system"}
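+
+# The Anthropic messages API requires user/assistant roles to strictly
+# alternate, while a two-AI conversation can yield consecutive assistant
+# messages. A merge helper along these lines could be applied before the API
+# call (hypothetical sketch, not wired in above):
+#
+#     def merge_consecutive_roles(messages):
+#         merged = []
+#         for m in messages:
+#             if merged and merged[-1]["role"] == m["role"]:
+#                 merged[-1]["content"] += "\n\n" + m["content"]
+#             else:
+#                 merged.append(dict(m))
+#         return merged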
+
+class DeepSeekReplicateProvider(LLMProvider):
+    """Uses Replicate for DeepSeek models"""
+    def generate_response(self, prompt: str, history: List[Message], model: str, system_prompt: str) -> Dict[str, Any]:
+        try:
+            # DeepSeek on Replicate takes a single flattened prompt string
+            # rather than a structured message list
+            formatted_history = ""
+            if system_prompt:
+                formatted_history += f"System: {system_prompt}\n"
+
+            for msg in history:
+                formatted_history += f"{msg.role.capitalize()}: {msg.content}\n"
+
+            formatted_history += f"User: {prompt}\n"
+
+            output = replicate.run(
+                "deepseek-ai/deepseek-r1",
+                input={
+                    "prompt": formatted_history,
+                    "max_tokens": 8000,
+                    "temperature": 1
+                }
+            )
+
+            response_text = "".join(output) if isinstance(output, list) else str(output)
+            return {"content": response_text, "role": "assistant"}
+
+        except Exception as e:
+            return {"error": str(e), "content": f"Error: {str(e)}", "role": "system"}
+
+class LLMService:
+    def __init__(self):
+        self.providers = {}
+        # Register whichever providers have credentials available
+        if os.getenv("ANTHROPIC_API_KEY"):
+            self.providers["claude"] = ClaudeProvider(os.getenv("ANTHROPIC_API_KEY"))
+        if os.getenv("OPENAI_API_KEY"):
+            self.providers["openai"] = OpenAIProvider(os.getenv("OPENAI_API_KEY"))
+        if os.getenv("OPENROUTER_API_KEY"):
+            self.providers["openrouter"] = OpenRouterProvider(os.getenv("OPENROUTER_API_KEY"))
+        self.providers["deepseek"] = DeepSeekReplicateProvider()  # replicate.run() reads REPLICATE_API_TOKEN from the env at call time
+
+    def get_provider(self, model_id: str) -> LLMProvider:
+        mid = model_id.lower()
+        # Direct-API ids in the config carry no "vendor/" prefix, while
+        # OpenRouter ids do; route on that distinction.
+        if "claude" in mid and "/" not in mid:
+            return self.providers.get("claude", self.providers.get("openrouter"))
+        elif "/" not in mid and ("gpt" in mid or mid.startswith(("o1", "o3"))):
+            return self.providers.get("openai", self.providers.get("openrouter"))
+        elif mid.startswith("deepseek-ai/"):
+            # "deepseek-ai/..." is the Replicate namespace; plain "deepseek/..."
+            # ids stay on OpenRouter
+            return self.providers.get("deepseek")
+        # Default to OpenRouter for everything else
+        return self.providers.get("openrouter")
+
+    def generate_response(self, prompt: str, history: List[Message], model: str, system_prompt: str) -> Dict[str, Any]:
+        provider = self.get_provider(model)
+        if not provider:
+            return {"error": "No provider available", "content": "Configuration Error: No API provider found.", "role": "system"}
+        return provider.generate_response(prompt, history, model, system_prompt)
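+
+# Routing sketch under the scheme above (ids are illustrative):
+#
+#     service = LLMService()
+#     service.get_provider("claude-3-5-sonnet-20241022")   # -> ClaudeProvider, if ANTHROPIC_API_KEY is set
+#     service.get_provider("anthropic/claude-3.5-sonnet")  # -> OpenRouterProvider (vendor-prefixed id)
+#     service.get_provider("deepseek-ai/deepseek-r1")      # -> DeepSeekReplicateProvider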
diff --git a/src/ui/__init__.py b/src/ui/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/ui/main_window.py b/src/ui/main_window.py
new file mode 100644
index 0000000..bf3c72c
--- /dev/null
+++ b/src/ui/main_window.py
@@ -0,0 +1,98 @@
+from PyQt6.QtWidgets import QMainWindow, QWidget, QHBoxLayout, QVBoxLayout, QSplitter
+from PyQt6.QtCore import Qt
+from src.ui.widgets.chat_widget import ChatWidget
+from src.ui.widgets.graph_widget import GraphWidget
+from src.ui.widgets.control_panel import ControlPanel
+from src.core.conversation_manager import ConversationManager
+from src.core.config import config
+
+class MainWindow(QMainWindow):
+    def __init__(self):
+        super().__init__()
+        self.setWindowTitle("Liminal Backrooms Refactored")
+        self.resize(1200, 800)
+
+        self.manager = ConversationManager()
+
+        self.setup_ui()
+        self.connect_signals()
+
+    def setup_ui(self):
+        central_widget = QWidget()
+        self.setCentralWidget(central_widget)
+        main_layout = QVBoxLayout(central_widget)
+        main_layout.setContentsMargins(0, 0, 0, 0)
+
+        # Splitter
+        splitter = QSplitter(Qt.Orientation.Horizontal)
+
+        # Left: Chat
+        self.chat_widget = ChatWidget()
+        splitter.addWidget(self.chat_widget)
+
+        # Right: Graph
+        self.graph_widget = GraphWidget()
+        splitter.addWidget(self.graph_widget)
+
+        splitter.setStretchFactor(0, 2)
+        splitter.setStretchFactor(1, 1)
+
+        main_layout.addWidget(splitter)
+
+        # Bottom: Control Panel
+        self.control_panel = ControlPanel()
+        main_layout.addWidget(self.control_panel)
+
+    def connect_signals(self):
+        # UI -> Manager
+        self.chat_widget.input_submitted.connect(self.on_input_submitted)
+        self.chat_widget.rabbithole_requested.connect(self.on_rabbithole)
+        self.chat_widget.fork_requested.connect(self.on_fork)
+        self.graph_widget.node_clicked.connect(self.manager.switch_branch)
+
+        # Manager -> UI
+        self.manager.conversation_updated.connect(self.on_conversation_updated)
+        self.manager.turn_completed.connect(self.on_turn_completed)
+        self.manager.status_updated.connect(self.statusBar().showMessage)
+        self.manager.error_occurred.connect(lambda msg: self.statusBar().showMessage(f"Error: {msg}"))
+
+    def on_input_submitted(self, text):
+        if text:
+            self.manager.add_user_message(text)
+        # Empty input just propagates AI turns without adding a user message
+        self.start_processing()
+
+    def start_processing(self):
+        cfg = self.control_panel.get_config()
+
+        # Build AI configs; the "Backrooms" prompt pair is hardcoded for now
+        prompt_pair = config.system_prompt_pairs.get("Backrooms", {})
+
+        ai1_config = {
+            "model": cfg["ai1_model_id"],
+            "prompt": prompt_pair.get("AI_1", "You are an AI.")
+        }
+        ai2_config = {
+            "model": cfg["ai2_model_id"],
+            "prompt": prompt_pair.get("AI_2", "You are an AI.")
+        }
+
+        self.manager.start_turn(ai1_config, ai2_config, cfg["iterations"])
+
+    def on_conversation_updated(self, conversation, active_branch_id):
+        self.chat_widget.update_display(conversation)
+        self.graph_widget.update_graph(self.manager.branches, active_branch_id)
+
+        # Update stats
+        self.control_panel.stats_label.setText(f"Turns: {self.manager.turn_count}")
+
+    def on_turn_completed(self, count):
+        self.control_panel.stats_label.setText(f"Turns: {count}")
+
+    def on_rabbithole(self, text):
+        self.manager.create_branch("rabbithole", text, self.manager.active_branch_id)
+
+    def on_fork(self, text):
+        self.manager.create_branch("fork", text, self.manager.active_branch_id)
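+
+# Entry-point sketch (the real main.py is outside this diff; wiring assumed):
+#
+#     import sys
+#     from PyQt6.QtWidgets import QApplication
+#     from src.ui.main_window import MainWindow
+#
+#     app = QApplication(sys.argv)
+#     window = MainWindow()
+#     window.show()
+#     sys.exit(app.exec())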
diff --git a/src/ui/widgets/__init__.py b/src/ui/widgets/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/ui/widgets/chat_widget.py b/src/ui/widgets/chat_widget.py
new file mode 100644
index 0000000..4284f6c
--- /dev/null
+++ b/src/ui/widgets/chat_widget.py
@@ -0,0 +1,129 @@
+from PyQt6.QtWidgets import QWidget, QVBoxLayout, QHBoxLayout, QTextEdit, QLabel, QPushButton, QSizePolicy
+from PyQt6.QtCore import Qt, pyqtSignal, QEvent, QTimer
+from PyQt6.QtGui import QFont, QTextCursor, QImage, QPixmap, QTextCharFormat, QColor, QAction
+from src.core.models import Message
+from html import escape
+import os
+
+class ChatWidget(QWidget):
+    input_submitted = pyqtSignal(str)
+    rabbithole_requested = pyqtSignal(str)
+    fork_requested = pyqtSignal(str)
+
+    def __init__(self):
+        super().__init__()
+        self.setup_ui()
+
+    def setup_ui(self):
+        layout = QVBoxLayout(self)
+        layout.setContentsMargins(10, 10, 10, 10)
+
+        # Header
+        self.header_label = QLabel("Conversation")
+        self.header_label.setStyleSheet("font-weight: bold; font-size: 14px; color: #D4D4D4;")
+        layout.addWidget(self.header_label)
+
+        # Chat Display
+        self.display = QTextEdit()
+        self.display.setReadOnly(True)
+        self.display.setStyleSheet("""
+            QTextEdit {
+                background-color: #252526;
+                color: #D4D4D4;
+                border: 1px solid #3E3E42;
+                border-radius: 4px;
+                padding: 10px;
+            }
+        """)
+        self.display.setContextMenuPolicy(Qt.ContextMenuPolicy.CustomContextMenu)
+        self.display.customContextMenuRequested.connect(self.show_context_menu)
+        layout.addWidget(self.display, 1)
+
+        # Input Area
+        input_container = QVBoxLayout()
+        self.input_field = QTextEdit()
+        self.input_field.setMaximumHeight(60)
+        self.input_field.setPlaceholderText("Type a message...")
+        self.input_field.setStyleSheet("""
+            QTextEdit {
+                background-color: #2D2D30;
+                color: #D4D4D4;
+                border: 1px solid #3E3E42;
+                border-radius: 4px;
+            }
+        """)
+        input_container.addWidget(self.input_field)
+
+        # Buttons
+        btn_layout = QHBoxLayout()
+        self.send_btn = QPushButton("Propagate")
+        self.send_btn.setStyleSheet("background-color: #569CD6; color: white; border: none; padding: 5px 15px; border-radius: 3px;")
+        self.send_btn.clicked.connect(self.on_send)
+        btn_layout.addStretch()
+        btn_layout.addWidget(self.send_btn)
+
+        input_container.addLayout(btn_layout)
+        layout.addLayout(input_container)
+
+        # Install event filter for Enter key
+        self.input_field.installEventFilter(self)
+
+    def eventFilter(self, obj, event):
+        if obj is self.input_field and event.type() == QEvent.Type.KeyPress:
+            if event.key() == Qt.Key.Key_Return and not event.modifiers() & Qt.KeyboardModifier.ShiftModifier:
+                self.on_send()
+                return True
+        return super().eventFilter(obj, event)
+
+    def on_send(self):
+        text = self.input_field.toPlainText().strip()
+        self.input_field.clear()
+        self.input_submitted.emit(text)
+
+    def update_display(self, conversation):
+        self.display.clear()
+
+        # Render the conversation as simple HTML
+        html = ""
+
+        for msg in conversation:
+            if msg.hidden:
+                continue
+
+            role_class = "user" if msg.role == "user" else "ai"
+            if msg.role == "system":
+                role_class = "system"
+
+            name_label = "User"
+            if msg.role == "assistant":
+                name_label = f"{msg.ai_name}" if msg.ai_name else "AI"
+                if msg.model:
+                    name_label += f" ({msg.model})"
+            elif msg.role == "system":
+                name_label = "System"
+
+            # Escape message text so raw < and > don't break the markup
+            content = escape(msg.content).replace("\n", "<br>")
+
+            html += f"<div class='{role_class}'><b>{escape(name_label)}</b><br>{content}</div><br>"
+
+            if msg.generated_image_path and os.path.exists(msg.generated_image_path):
+                html += f"<img src='{msg.generated_image_path}'><br>"
+
+        self.display.setHtml(html)
+        self.display.verticalScrollBar().setValue(self.display.verticalScrollBar().maximum())
+
+    def show_context_menu(self, pos):
+        cursor = self.display.textCursor()
+        selected_text = cursor.selectedText()
+        if not selected_text:
+            return
+
+        menu = self.display.createStandardContextMenu()
+        menu.addSeparator()
+
+        rabbit_action = QAction("🕳️ Rabbithole", self)
+        rabbit_action.triggered.connect(lambda: self.rabbithole_requested.emit(selected_text))
+        menu.addAction(rabbit_action)
+
+        fork_action = QAction("🔱 Fork", self)
+        fork_action.triggered.connect(lambda: self.fork_requested.emit(selected_text))
+        menu.addAction(fork_action)
+
+        menu.exec(self.display.mapToGlobal(pos))
diff --git a/src/ui/widgets/control_panel.py b/src/ui/widgets/control_panel.py
new file mode 100644
index 0000000..1fab9ba
--- /dev/null
+++ b/src/ui/widgets/control_panel.py
@@ -0,0 +1,44 @@
+from PyQt6.QtWidgets import QWidget, QVBoxLayout, QHBoxLayout, QLabel, QComboBox, QCheckBox, QPushButton
+from src.core.config import config
+
+class ControlPanel(QWidget):
+    def __init__(self):
+        super().__init__()
+        self.setup_ui()
+
+    def setup_ui(self):
+        layout = QHBoxLayout(self)
+        layout.setContentsMargins(5, 5, 5, 5)
+
+        # AI 1
+        layout.addWidget(QLabel("AI 1:"))
+        self.ai1_combo = QComboBox()
+        self.ai1_combo.addItems(list(config.ai_models.keys()))
+        layout.addWidget(self.ai1_combo)
+
+        # AI 2
+        layout.addWidget(QLabel("AI 2:"))
+        self.ai2_combo = QComboBox()
+        self.ai2_combo.addItems(list(config.ai_models.keys()))
+        layout.addWidget(self.ai2_combo)
+
+        # Iterations
+        layout.addWidget(QLabel("Turns:"))
+        self.iter_combo = QComboBox()
+        self.iter_combo.addItems(["1", "2", "4", "10", "100"])
+        layout.addWidget(self.iter_combo)
+
+        # Stats
+        self.stats_label = QLabel("Turns: 0")
+        layout.addWidget(self.stats_label)
+
+        layout.addStretch()
+
+    def get_config(self):
+        return {
+            "ai1_model": self.ai1_combo.currentText(),
+            "ai1_model_id": config.ai_models[self.ai1_combo.currentText()],
+            "ai2_model": self.ai2_combo.currentText(),
+            "ai2_model_id": config.ai_models[self.ai2_combo.currentText()],
+            "iterations": int(self.iter_combo.currentText())
+        }
diff --git a/src/ui/widgets/graph_widget.py b/src/ui/widgets/graph_widget.py
new file mode 100644
index 0000000..063a2c8
--- /dev/null
+++ b/src/ui/widgets/graph_widget.py
@@ -0,0 +1,129 @@
+from PyQt6.QtWidgets import QWidget
+from PyQt6.QtGui import QPainter, QColor, QPen, QBrush, QLinearGradient
+from PyQt6.QtCore import Qt, QPointF, pyqtSignal
+import math
+
+class GraphWidget(QWidget):
+    node_clicked = pyqtSignal(str)  # branch_id
+
+    def __init__(self):
+        super().__init__()
+        self.nodes = {}      # id -> data
+        self.edges = []      # (source, target)
+        self.positions = {}
+        self.active_id = None  # set in update_graph; paintEvent may run first
+        self.setMouseTracking(True)
+        self.selected_node = None
+
+    def update_graph(self, branches, active_id):
+        self.nodes = branches
+        self.active_id = active_id
+        self.calculate_layout()
+        self.update()
+
+    def calculate_layout(self):
+        # Simple radial layout: "main" sits at the center and each branch
+        # level is placed on a progressively larger ring around it.
+        self.positions = {}
+        self.edges = []
+
+        cx, cy = self.width() / 2, self.height() / 2
+        self.positions["main"] = (cx, cy)
+
+        # self.nodes is a dict of Branch objects; derive child lists by
+        # scanning parent references
+        children_map = {}
+        for bid, branch in self.nodes.items():
+            pid = branch.parent or "main"
+            children_map.setdefault(pid, []).append(bid)
+
+        # Breadth-first pass to assign a depth level to every branch
+        levels = {"main": 0}
+        to_process = [("main", 0)]
+        processed = {"main"}
+
+        while to_process:
+            pid, level = to_process.pop(0)
+            for child_id in children_map.get(pid, []):
+                if child_id in processed:
+                    continue
+                processed.add(child_id)
+                levels[child_id] = level + 1
+                self.edges.append((pid, child_id))
+                to_process.append((child_id, level + 1))
+
+        # Group nodes by level, then spread each level around a ring
+        level_nodes = {}
+        for nid, lvl in levels.items():
+            if lvl == 0:
+                continue
+            level_nodes.setdefault(lvl, []).append(nid)
+
+        for lvl, nodes in level_nodes.items():
+            radius = 100 * lvl
+            angle_step = 2 * math.pi / len(nodes)
+            for i, nid in enumerate(nodes):
+                angle = i * angle_step
+                x = cx + radius * math.cos(angle)
+                y = cy + radius * math.sin(angle)
+                self.positions[nid] = (x, y)
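+
+    # Worked example: with two level-1 branches, angle_step is pi, so they
+    # land at (cx + 100, cy) and (cx - 100, cy). A single level-2 child sits
+    # on a radius-200 ring around the center, not around its parent -- a
+    # known simplification of this layout.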
+ painter.setPen(QColor("#D4D4D4")) + painter.drawText(int(x+15), int(y+5), text) + + def mousePressEvent(self, event): + pos = event.position() + # Find clicked node + for nid, (nx, ny) in self.positions.items(): + dist = math.sqrt((pos.x() - nx)**2 + (pos.y() - ny)**2) + if dist < 20: # Hit radius + self.node_clicked.emit(nid) + return diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_core.py b/tests/test_core.py new file mode 100644 index 0000000..672f486 --- /dev/null +++ b/tests/test_core.py @@ -0,0 +1,44 @@ +import pytest +from src.core.models import Message, Branch +from src.core.conversation_manager import ConversationManager +from src.core.config import config +import os + +def test_config_loading(): + assert config.get("TURN_DELAY") is not None + assert "AI_MODELS" in config._config + +def test_message_model(): + msg = Message(role="user", content="hello") + assert msg.role == "user" + assert msg.content == "hello" + d = msg.to_dict() + assert d["role"] == "user" + +def test_branch_creation(): + manager = ConversationManager() + manager.add_user_message("Hello world") + + # Test main conversation + assert len(manager.main_conversation) == 1 + assert manager.main_conversation[0].content == "Hello world" + + # Test branching + bid = manager.create_branch("rabbithole", "world", parent_id=None) + assert bid in manager.branches + assert manager.branches[bid].type == "rabbithole" + assert len(manager.branches[bid].conversation) > 1 # History + indicator + prompt + +def test_manager_switching(): + manager = ConversationManager() + manager.add_user_message("Main") + + bid = manager.create_branch("fork", "Main") + assert manager.active_branch_id == bid + + current_msgs = manager.get_current_conversation() + assert current_msgs[-1].role == "user" # The prompt for fork + + manager.switch_branch("main") + assert manager.active_branch_id is None + assert manager.get_current_conversation()[0].content == "Main"