"""Event processing framework for discrete, async operations.

This module provides a base class for event-driven processors that
handle discrete operations asynchronously without blocking the main
pipeline.

Unlike frame processors (continuous, every frame), event processors
are triggered by discrete events like prompt changes or image uploads.

Public API (re-exported from ``processor``):
    - EventProcessor: abstract base class for async event processors
    - ProcessorConfig: per-processor configuration (cancel policy, timeout, name)
    - ProcessorResult: outcome container (success/output/error/cancelled)
    - ProcessorState: lifecycle enum (idle/processing/cancelled)
"""

from .processor import (
    EventProcessor,
    ProcessorConfig,
    ProcessorResult,
    ProcessorState,
)

__all__ = [
    "EventProcessor",
    "ProcessorConfig",
    "ProcessorResult",
    "ProcessorState",
]
+ +Examples: + - Prompt enhancement (text → text) + - Image generation (text → image) + - Captioning (image → text) + - Style extraction (image → embeddings) +""" + +import logging +import threading +from abc import ABC, abstractmethod +from collections.abc import Callable +from dataclasses import dataclass +from enum import Enum +from typing import Generic, TypeVar + +logger = logging.getLogger(__name__) + +# Type variables for input and output +TInput = TypeVar("TInput") +TOutput = TypeVar("TOutput") + + +class ProcessorState(Enum): + """Current state of an event processor.""" + + IDLE = "idle" + PROCESSING = "processing" + CANCELLED = "cancelled" + + +@dataclass +class ProcessorResult(Generic[TOutput]): + """Result from an event processor.""" + + success: bool + output: TOutput | None = None + error: str | None = None + cancelled: bool = False + + +@dataclass +class ProcessorConfig: + """Configuration for an event processor.""" + + cancel_on_new: bool = True # Cancel in-flight work when new event arrives + timeout: float | None = None # Optional timeout in seconds + name: str = "EventProcessor" # Name for logging + + +class EventProcessor(ABC, Generic[TInput, TOutput]): + """Base class for discrete, async event processing. + + Subclasses implement `process()` to handle events. The processor + runs work in a background thread and calls the callback when done. + + Features: + - Non-blocking: doesn't block the main pipeline + - Cancellable: can cancel in-flight work when new event arrives + - Thread-safe: safe to call submit() from any thread + - Independent: multiple processors can run concurrently + + Example: + class PromptEnhancer(EventProcessor[str, str]): + def process(self, prompt: str) -> str: + return enhance_with_llm(prompt) + + enhancer = PromptEnhancer() + enhancer.submit("a cat", callback=lambda r: print(r.output)) + """ + + def __init__(self, config: ProcessorConfig | None = None): + """Initialize the event processor. 
+ + Args: + config: Optional configuration. Uses defaults if not provided. + """ + self.config = config or ProcessorConfig() + self._thread: threading.Thread | None = None + self._cancel_requested = threading.Event() + self._lock = threading.Lock() + self._state = ProcessorState.IDLE + self._current_callback: Callable[[ProcessorResult[TOutput]], None] | None = None + + # Track the latest result for polling (alternative to callbacks) + self._latest_result: ProcessorResult[TOutput] | None = None + self._result_ready = threading.Event() + + @property + def state(self) -> ProcessorState: + """Current processor state.""" + with self._lock: + return self._state + + @property + def is_processing(self) -> bool: + """Whether the processor is currently working.""" + return self.state == ProcessorState.PROCESSING + + @property + def latest_result(self) -> ProcessorResult[TOutput] | None: + """Get the latest result (for polling instead of callbacks).""" + with self._lock: + return self._latest_result + + def submit( + self, + event: TInput, + callback: Callable[[ProcessorResult[TOutput]], None] | None = None, + ) -> bool: + """Submit an event for async processing. + + If cancel_on_new is True and there's work in flight, it will be + cancelled before starting the new work. + + Args: + event: The input event to process. + callback: Optional callback when processing completes. + Called with ProcessorResult containing output or error. + + Returns: + True if submitted, False if rejected. 
+ """ + with self._lock: + # Cancel in-flight work if configured + if self.config.cancel_on_new and self._thread and self._thread.is_alive(): + logger.debug( + f"{self.config.name}: Cancelling in-flight work for new event" + ) + self._cancel_requested.set() + # Don't wait for thread - let it finish in background + + # Reset state + self._cancel_requested.clear() + self._result_ready.clear() + self._state = ProcessorState.PROCESSING + self._current_callback = callback + + # Start worker thread + self._thread = threading.Thread( + target=self._worker, + args=(event,), + daemon=True, + name=f"{self.config.name}-worker", + ) + self._thread.start() + + logger.debug(f"{self.config.name}: Submitted event for processing") + return True + + def cancel(self) -> bool: + """Request cancellation of in-flight work. + + Returns: + True if cancellation was requested, False if nothing to cancel. + """ + with self._lock: + if self._thread and self._thread.is_alive(): + self._cancel_requested.set() + self._state = ProcessorState.CANCELLED + logger.debug(f"{self.config.name}: Cancellation requested") + return True + return False + + def wait(self, timeout: float | None = None) -> ProcessorResult[TOutput] | None: + """Wait for the current processing to complete. + + Args: + timeout: Max seconds to wait. None = wait forever. + + Returns: + The result, or None if timeout. 
+ """ + if self._result_ready.wait(timeout=timeout): + return self._latest_result + return None + + def _worker(self, event: TInput) -> None: + """Worker thread that runs the processing.""" + result: ProcessorResult[TOutput] + + try: + # Check for early cancellation + if self._cancel_requested.is_set(): + result = ProcessorResult(success=False, cancelled=True) + else: + # Run the actual processing + output = self.process(event) + + # Check if cancelled during processing + if self._cancel_requested.is_set(): + result = ProcessorResult(success=False, cancelled=True) + else: + result = ProcessorResult(success=True, output=output) + + except Exception as e: + logger.warning(f"{self.config.name}: Processing failed: {e}") + result = ProcessorResult(success=False, error=str(e)) + + # Store result and update state + with self._lock: + if not self._cancel_requested.is_set(): + self._latest_result = result + self._state = ProcessorState.IDLE + self._result_ready.set() + + # Call callback if provided + if self._current_callback: + try: + self._current_callback(result) + except Exception as e: + logger.error(f"{self.config.name}: Callback error: {e}") + + @abstractmethod + def process(self, event: TInput) -> TOutput: + """Process an event. Override in subclass. + + This method runs in a background thread. It should: + - Be thread-safe + - Check self._cancel_requested periodically for long operations + - Raise exceptions on error (they'll be caught and reported) + + Args: + event: The input to process. + + Returns: + The processed output. + """ + raise NotImplementedError + + def check_cancelled(self) -> bool: + """Check if cancellation was requested. Call periodically in process(). + + Returns: + True if should stop processing. 
+ """ + return self._cancel_requested.is_set() diff --git a/src/scope/core/plugins/__init__.py b/src/scope/core/plugins/__init__.py index 03b4ac6f5..a001be0a3 100644 --- a/src/scope/core/plugins/__init__.py +++ b/src/scope/core/plugins/__init__.py @@ -1,6 +1,19 @@ """Plugin system for Scope.""" from .hookspecs import hookimpl -from .manager import load_plugins, pm, register_plugin_pipelines +from .manager import ( + get_event_processor, + load_plugins, + pm, + register_plugin_event_processors, + register_plugin_pipelines, +) -__all__ = ["hookimpl", "load_plugins", "pm", "register_plugin_pipelines"] +__all__ = [ + "get_event_processor", + "hookimpl", + "load_plugins", + "pm", + "register_plugin_event_processors", + "register_plugin_pipelines", +] diff --git a/src/scope/core/plugins/hookspecs.py b/src/scope/core/plugins/hookspecs.py index 62ce4f672..7ca6308ad 100644 --- a/src/scope/core/plugins/hookspecs.py +++ b/src/scope/core/plugins/hookspecs.py @@ -18,7 +18,24 @@ def register_pipelines(self, register): Usage: register(PipelineClass) Example: - @scope.core.hookimpl - def register_pipelines(register): + @hookimpl + def register_pipelines(self, register): register(MyPipeline) """ + + @hookspec + def register_event_processors(self, register): + """Register event processors for discrete async operations. + + Event processors handle triggered operations (as opposed to continuous + frame processing). They run asynchronously and don't block pipelines. + + Args: + register: Callback to register processors. 
# Registry of event processors contributed by plugins, keyed by name.
# NOTE(review): populated lazily without a lock — assumes registration is
# driven from a single thread at startup; confirm if hooks can fire
# concurrently.
_event_processors: dict[str, EventProcessor] = {}
_event_processors_loaded = False


def register_plugin_event_processors():
    """Run the ``register_event_processors`` hook for every loaded plugin.

    Populates the module-level event processor registry. Calling this more
    than once is a no-op: the hook fires at most once per process.
    """
    global _event_processors_loaded
    if _event_processors_loaded:
        return

    def _register(name: str, processor: EventProcessor):
        """Registration callback handed to each plugin."""
        _event_processors[name] = processor
        logger.info(f"Registered event processor: {name}")

    pm.hook.register_event_processors(register=_register)
    _event_processors_loaded = True


def get_event_processor(name: str) -> EventProcessor | None:
    """Look up a registered event processor by name.

    Lazily triggers plugin registration on first use, so callers don't need
    to call register_plugin_event_processors() themselves.

    Args:
        name: The processor name (e.g., "prompt_enhancer", "image_generator")

    Returns:
        The registered EventProcessor, or None if not found.
    """
    if not _event_processors_loaded:
        register_plugin_event_processors()
    return _event_processors.get(name)
+ """ + enhancer = self._get_prompt_enhancer() + if enhancer is None: + logger.debug( + "[ENHANCE] No prompt enhancer registered, using original prompts" + ) + return + + # Create request as dict (plugin accepts dict or its own request type) + request = { + "prompts": prompts, + "prompts_hash": prompts_hash, + "context": { + "pipeline_id": self.pipeline_id, + "mode": "video" if self._video_mode else "text", + }, + } + + def on_complete(result): + """Callback when enhancement finishes.""" + if result.success and result.output: + with self._enhancement_lock: + # Only update if this result matches current prompts + if result.output.prompts_hash == self._last_prompts_hash: + self._enhanced_prompts = result.output.enhanced_prompts + logger.info( + f"[ENHANCE] Prompts enhanced for {self.pipeline_id}" + ) + elif result.cancelled: + logger.debug("[ENHANCE] Enhancement cancelled") + else: + logger.warning(f"[ENHANCE] Enhancement failed: {result.error}") + + enhancer.submit(request, callback=on_complete) + logger.info(f"[ENHANCE] Submitted enhancement for {self.pipeline_id}") + def worker_loop(self): """Main worker loop that processes frames.""" logger.info(f"Worker thread started for pipeline: {self.pipeline_id}") @@ -358,6 +414,26 @@ def process_chunk(self): # Pass parameters (excluding prepare-only parameters) call_params = dict(self.parameters.items()) + # Trigger async prompt enhancement when prompts change + if "prompts" in call_params and call_params["prompts"]: + prompts_hash = hash(str(call_params["prompts"])) + + if prompts_hash != self._last_prompts_hash: + # Prompts changed - trigger async enhancement + logger.info( + f"[ENHANCE] Prompts changed (old_hash={self._last_prompts_hash}, new_hash={prompts_hash})" + ) + self._trigger_async_enhancement( + call_params["prompts"], prompts_hash + ) + # Update hash to prevent re-triggering + self._last_prompts_hash = prompts_hash + + # Use cached enhanced prompts if available, otherwise use original + with 
self._enhancement_lock: + if self._enhanced_prompts is not None: + call_params["prompts"] = self._enhanced_prompts + # Pass reset_cache as init_cache to pipeline call_params["init_cache"] = not self.is_prepared if reset_cache is not None: