diff --git a/pyproject.toml b/pyproject.toml index 8843395..0420075 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,6 @@ dependencies = [ "optimum[openvino]>1.26.1", "pip>=25.2", "pybind11>=3.0.3", - "gpu-metrics", "pydantic>=2.11.7", "pynput>=1.8.1", "pytest>=8.4.2", @@ -60,7 +59,6 @@ url = "https://download.pytorch.org/whl/cpu" explicit = true [tool.uv.sources] -gpu-metrics = { path = "gpu-metrics" } torch = { index = "pytorch-cpu" } torchvision = { index = "pytorch-cpu" } diff --git a/setup.bat b/setup.bat index cb8e8b8..b9c8f11 100644 --- a/setup.bat +++ b/setup.bat @@ -9,20 +9,33 @@ if %errorlevel% neq 0 ( set "PATH=%USERPROFILE%\.local\bin;%PATH%" ) +uv sync +call .venv\Scripts\activate.bat + +uv pip install "optimum-intel[openvino] @ git+https://github.com/huggingface/optimum-intel" +uv pip install --pre -U openvino-genai --extra-index-url https://storage.openvinotoolkit.org/simple/wheels/nightly + +echo checking for Intel oneAPI... if not exist "C:\Program Files (x86)\Intel\oneAPI\setvars.bat" ( - echo ERROR: Intel oneAPI not found. + echo Warning: Intel oneAPI not found. Skipping gpu-metrics install. echo install from https://www.intel.com/content/www/us/en/developer/tools/oneapi/base-toolkit.html - pause - exit /b 1 + goto :skip_gpu_metrics ) call "C:\Program Files (x86)\Intel\oneAPI\setvars.bat" intel64 --force -uv sync -call .venv\Scripts\activate.bat +if defined LEVEL_ZERO_V1_SDK_PATH ( + echo %INCLUDE% | findstr /I /C:"%LEVEL_ZERO_V1_SDK_PATH%\include" >nul || set "INCLUDE=%INCLUDE%;%LEVEL_ZERO_V1_SDK_PATH%\include" + echo %LIB% | findstr /I /C:"%LEVEL_ZERO_V1_SDK_PATH%\lib" >nul || set "LIB=%LIB%;%LEVEL_ZERO_V1_SDK_PATH%\lib" +) -uv pip install "optimum-intel[openvino] @ git+https://github.com/huggingface/optimum-intel" -uv pip install --pre -U openvino-genai --extra-index-url https://storage.openvinotoolkit.org/simple/wheels/nightly +echo installing gpu-metrics (soft dependency)... +uv pip install ./gpu-metrics +if %errorlevel% neq 0 ( + echo Warning: gpu-metrics build failed. Intel GPU telemetry will be unavailable. +) + +:skip_gpu_metrics set /p set_key="set OPENARC_API_KEY? (y/N): " if /I not "%set_key%"=="y" goto :skip_key @@ -32,4 +45,4 @@ setx OPENARC_API_KEY "%api_key%" :skip_key openarc --help -pause \ No newline at end of file +pause diff --git a/setup.sh b/setup.sh index 492e319..db0b6fd 100644 --- a/setup.sh +++ b/setup.sh @@ -9,20 +9,23 @@ if ! command -v uv &> /dev/null; then export PATH="$HOME/.local/bin:$PATH" fi -if [ ! -f "/opt/intel/oneapi/setvars.sh" ]; then - echo "ERROR: Intel oneAPI not found." - echo "install from https://www.intel.com/content/www/us/en/developer/tools/oneapi/base-toolkit.html" - exit 1 -fi - -source /opt/intel/oneapi/setvars.sh intel64 --force - uv sync source .venv/bin/activate uv pip install "optimum-intel[openvino] @ git+https://github.com/huggingface/optimum-intel" uv pip install --pre -U openvino-genai --extra-index-url https://storage.openvinotoolkit.org/simple/wheels/nightly +echo "checking for Intel oneAPI..." +if [ ! -f "/opt/intel/oneapi/setvars.sh" ]; then + echo "Warning: Intel oneAPI not found. Skipping gpu-metrics install." + echo "install from https://www.intel.com/content/www/us/en/developer/tools/oneapi/base-toolkit.html" +else + source /opt/intel/oneapi/setvars.sh intel64 --force + + echo "installing gpu-metrics (soft dependency)..." + uv pip install ./gpu-metrics || echo "Warning: gpu-metrics build failed. Intel GPU telemetry will be unavailable." +fi + read -p "set OPENARC_API_KEY? (y/N): " set_key if [[ "$set_key" =~ ^[Yy]$ ]]; then read -p "key (default: openarc-api-key): " api_key diff --git a/src/cli/groups/serve.py b/src/cli/groups/serve.py index dd45aa9..8856762 100644 --- a/src/cli/groups/serve.py +++ b/src/cli/groups/serve.py @@ -21,9 +21,9 @@ def serve(): help=""" - Host to bind the server to """) -@click.option("--port", - type=int, - default=8000, +@click.option("--port", + type=int, + default=8000, show_default=True, help=""" - Port to bind the server to @@ -33,43 +33,45 @@ def serve(): help="Load models on startup. Specify once followed by space-separated model names.") @click.option("--use-api-key", is_flag=True, default=False, help="Require OPENARC_API_KEY for all requests.") +@click.option("-v", "--verbose", count=True, default=0, + help="Increase verbosity: -v for warnings, -vv for info, -vvv for full access logs.") @click.argument('startup_models', nargs=-1, required=False) @click.pass_context -def start(ctx, host, port, load_models, use_api_key, startup_models): +def start(ctx, host, port, load_models, use_api_key, verbose, startup_models): """ - 'start' reads --host and --port from config or defaults to 0.0.0.0:8000 - + Examples: openarc serve start openarc serve start --load-models model1 model2 openarc serve start --lm Dolphin-X1 kokoro whisper """ from ..modules.launch_server import start_server - + # Save server configuration for other CLI commands to use config_path = ctx.obj.server_config.save_server_config(host, port) console.print(f"[dim]Configuration saved to: {config_path}[/dim]") - + # Handle startup models models_to_load = [] if load_models: models_to_load.append(load_models) if startup_models: models_to_load.extend(startup_models) - + if models_to_load: saved_model_names = ctx.obj.server_config.get_model_names() missing = [m for m in models_to_load if m not in saved_model_names] - + if missing: console.print("[yellow]Warning: Models not in config (will be skipped):[/yellow]") for m in missing: console.print(f" • {m}") console.print("[dim]Use 'openarc list' to see saved configurations.[/dim]\n") - + os.environ["OPENARC_STARTUP_MODELS"] = ",".join(models_to_load) console.print(f"[blue]Models to load on startup:[/blue] {', '.join(models_to_load)}\n") - + if use_api_key: if not os.getenv("OPENARC_API_KEY"): console.print("[red]Error: You chose to require an API key but OPENARC_API_KEY has not been set.[/red]") @@ -81,4 +83,4 @@ def start(ctx, host, port, load_models, use_api_key, startup_models): console.print("[blue]OPENARC_API_KEY_REQUIRED=[/blue][yellow]False[/yellow] [dim][Clients do not need to authenticate.][/dim]") console.print(f"[green]Starting OpenArc server on {host}:{port}[/green]") - start_server(host=host, port=port) + start_server(host=host, port=port, verbose=verbose) diff --git a/src/cli/modules/launch_server.py b/src/cli/modules/launch_server.py index b1a9203..6d7bacd 100644 --- a/src/cli/modules/launch_server.py +++ b/src/cli/modules/launch_server.py @@ -5,111 +5,125 @@ # Configure logging log_file = Path(__file__).parent.parent.parent.parent / "openarc.log" -# Create a custom logging configuration for uvicorn -LOG_CONFIG = { - "version": 1, - "disable_existing_loggers": False, - "formatters": { - "default": { - "format": "%(asctime)s - %(levelname)s - %(message)s", - }, - "access": { - "format": "%(asctime)s - %(levelname)s - %(message)s", - }, - }, - "handlers": { - "default": { - "formatter": "default", - "class": "logging.StreamHandler", - "stream": "ext://sys.stderr", - }, - "file": { - "formatter": "default", - "class": "logging.FileHandler", - "filename": str(log_file), - }, - "access": { - "formatter": "access", - "class": "logging.StreamHandler", - "stream": "ext://sys.stdout", +def _level_from_verbose(verbose: int) -> str: + if verbose >= 2: + return "INFO" + if verbose == 1: + return "WARNING" + return "ERROR" + + +def _build_log_config(verbose: int): + app_level = _level_from_verbose(verbose) + access_level = "INFO" if verbose >= 3 else "WARNING" + + return { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "default": { + "format": "%(asctime)s - %(levelname)s - %(message)s", + }, + "access": { + "format": "%(asctime)s - %(levelname)s - %(message)s", + }, }, - "access_file": { - "formatter": "access", - "class": "logging.FileHandler", - "filename": str(log_file), + "handlers": { + "default": { + "formatter": "default", + "class": "logging.StreamHandler", + "stream": "ext://sys.stderr", + }, + "file": { + "formatter": "default", + "class": "logging.FileHandler", + "filename": str(log_file), + }, + "access": { + "formatter": "access", + "class": "logging.StreamHandler", + "stream": "ext://sys.stdout", + }, + "access_file": { + "formatter": "access", + "class": "logging.FileHandler", + "filename": str(log_file), + }, }, - }, - "loggers": { - "uvicorn": { - "handlers": ["default", "file"], - "level": "INFO", - "propagate": False, + "loggers": { + "uvicorn": { + "handlers": ["default", "file"], + "level": "INFO", + "propagate": False, + }, + "uvicorn.error": { + "level": "INFO", + "handlers": ["default", "file"], + "propagate": False, + }, + "uvicorn.access": { + "handlers": ["access", "access_file"], + "level": access_level, + "propagate": False, + }, + "openarc.access": { + "handlers": ["default", "file"], + "level": access_level, + "propagate": False, + }, }, - "uvicorn.error": { - "level": "INFO", + "root": { + "level": app_level, "handlers": ["default", "file"], - "propagate": False, - }, - "uvicorn.access": { - "handlers": ["access", "access_file"], - "level": "WARNING", # Disabled - using custom RequestLoggingMiddleware instead - "propagate": False, }, - }, - "root": { - "level": "INFO", - "handlers": ["default", "file"], - }, -} + } -# Configure root logger -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s', - handlers=[ - logging.StreamHandler(), - logging.FileHandler(log_file) - ] -) logger = logging.getLogger("OpenArc") -def start_server(host: str = "0.0.0.0", port: int = 8001, reload: bool = False): +def start_server(host: str = "0.0.0.0", port: int = 8001, reload: bool = False, verbose: int = 0): """ Launches the OpenArc API server - + Args: host: Host to bind the server to port: Port to bind the server to """ - logger.info(f"Launching {host}:{port}") - logger.info("--------------------------------") - logger.info("OpenArc endpoints:") - logger.info(" - POST /openarc/load Load a model") - logger.info(" - POST /openarc/unload Unload a model") - logger.info(" - GET /openarc/status Get model status") - logger.info(" - GET /openarc/metrics Get hardware telemetry") - logger.info(" - POST /openarc/models/update Update model configuration") - logger.info(" - POST /openarc/bench Run inference benchmark") - logger.info(" - GET /openarc/downloader List active model downloads") - logger.info(" - POST /openarc/downloader Start a model download") - logger.info(" - DELETE /openarc/downloader Cancel a model download") - logger.info(" - POST /openarc/downloader/pause Pause a model download") - logger.info(" - POST /openarc/downloader/resume Resume a model download") - logger.info("--------------------------------") - logger.info("OpenAI compatible endpoints:") - logger.info(" - GET /v1/models") - logger.info(" - POST /v1/chat/completions") - logger.info(" - POST /v1/audio/transcriptions: Whisper only") - logger.info(" - POST /v1/audio/speech: Kokoro only") - logger.info(" - POST /v1/embeddings") - logger.info(" - POST /v1/rerank") - + + app_level_name = _level_from_verbose(verbose) + app_level_num = getattr(logging, app_level_name) + + logger.setLevel(app_level_num) + logging.getLogger().setLevel(app_level_num) + + print(f"Launching {host}:{port}") + print("--------------------------------") + print("OpenArc endpoints:") + print(" - POST /openarc/load Load a model") + print(" - POST /openarc/unload Unload a model") + print(" - GET /openarc/status Get model status") + print(" - GET /openarc/metrics Get hardware telemetry") + print(" - POST /openarc/models/update Update model configuration") + print(" - POST /openarc/bench Run inference benchmark") + print(" - GET /openarc/downloader List active model downloads") + print(" - POST /openarc/downloader Start a model download") + print(" - DELETE /openarc/downloader Cancel a model download") + print(" - POST /openarc/downloader/pause Pause a model download") + print(" - POST /openarc/downloader/resume Resume a model download") + print("--------------------------------") + print("OpenAI compatible endpoints:") + print(" - GET /v1/models") + print(" - POST /v1/chat/completions") + print(" - POST /v1/audio/transcriptions: Whisper only") + print(" - POST /v1/audio/speech: Kokoro only") + print(" - POST /v1/embeddings") + print(" - POST /v1/rerank") + uvicorn.run( "src.server.main:app", host=host, port=port, - log_config=LOG_CONFIG, + log_config=_build_log_config(verbose), reload=reload - ) \ No newline at end of file + ) diff --git a/src/server/main.py b/src/server/main.py index 64ce2e6..ede3f93 100644 --- a/src/server/main.py +++ b/src/server/main.py @@ -20,6 +20,7 @@ from src.server.routes.openarc import router as openarc_router logger = logging.getLogger(__name__) +_access_logger = logging.getLogger("openarc.access") class RequestLoggingMiddleware(BaseHTTPMiddleware): @@ -27,21 +28,21 @@ async def dispatch(self, request: Request, call_next): start_time = time.time() client_ip = request.client.host if request.client else "unknown" - logger.info( + _access_logger.info( f"Request received: {request.method} {request.url.path} from {client_ip}" ) try: response = await call_next(request) process_time = time.time() - start_time - logger.info( + _access_logger.info( f"Request completed: {request.method} {request.url.path} " f"status={response.status_code} duration={process_time:.3f}s" ) return response except Exception as e: process_time = time.time() - start_time - logger.error( + _access_logger.error( f"Request failed: {request.method} {request.url.path} " f"error={str(e)} duration={process_time:.3f}s" ) diff --git a/src/server/model_registry.py b/src/server/model_registry.py index f6824dc..ab52fd3 100644 --- a/src/server/model_registry.py +++ b/src/server/model_registry.py @@ -17,7 +17,6 @@ ) logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) @dataclass(frozen=False, slots=True) class ModelRecord: @@ -52,7 +51,7 @@ def registered_models(self) -> dict: if self.error_message: result["error_message"] = self.error_message return result - + class ModelRegistry: """Tracks loaded models by private model_id. Async-safe.""" @@ -71,7 +70,7 @@ def add_on_unloaded(self, callback: Callable[[ModelRecord], Awaitable[None]]) -> async def register_load(self, loader: ModelLoadConfig) -> str: """Register and load a model, waiting for completion. - + Raises: ValueError: If model name already exists Exception: Any exception during loading is propagated to caller @@ -82,7 +81,7 @@ async def register_load(self, loader: ModelLoadConfig) -> str: if existing_record.model_name == loader.model_name: logger.info(f"Load failed! model_name '{loader.model_name}' already exists") raise ValueError(f"model_name '{loader.model_name}' already registered") - + # Create a model record with LOADING status record = ModelRecord( model_path=loader.model_path, @@ -93,19 +92,19 @@ async def register_load(self, loader: ModelLoadConfig) -> str: runtime_config=loader.runtime_config, status=ModelStatus.LOADING, ) - + # Register the model record immediately async with self._lock: self._models[record.model_id] = record - + # Start loading task loading_task = asyncio.create_task(self._load_task(record.model_id, loader)) - + # Update the record with the task reference async with self._lock: if record.model_id in self._models: self._models[record.model_id].loading_task = loading_task - + # Wait for loading to complete and propagate exceptions try: await loading_task @@ -118,7 +117,7 @@ async def register_load(self, loader: ModelLoadConfig) -> str: raise RuntimeError(f"Model loading failed: {error_msg}") except asyncio.CancelledError: raise RuntimeError("Model loading was cancelled") - + return record.model_id async def register_unload(self, model_name: str) -> bool: @@ -130,20 +129,20 @@ async def register_unload(self, model_name: str) -> bool: if record.model_name == model_name: model_id = mid break - + if model_id is None: return False - + # Start background unload task asyncio.create_task(self._unload_task(model_id)) return True - + async def _load_task(self, model_id: str, load_config: ModelLoadConfig) -> None: """Background task to load a model and update its status.""" try: # Load the model instance model_instance = await create_model_instance(load_config) - + # Update the record with successful loading async with self._lock: if model_id in self._models: @@ -157,7 +156,7 @@ async def _load_task(self, model_id: str, load_config: ModelLoadConfig) -> None: # Fire loaded event callbacks outside the lock for cb in self._on_loaded: asyncio.create_task(cb(record)) - + except Exception as e: # Log the full exception with traceback logger.error(f"Model loading failed for {load_config.model_name}", exc_info=True) @@ -169,7 +168,7 @@ async def _load_task(self, model_id: str, load_config: ModelLoadConfig) -> None: record.status = ModelStatus.FAILED record.error_message = str(e) record.loading_task = None - + async def _unload_task(self, model_id: str) -> None: """Background task to unload a model and clean up resources.""" try: @@ -178,7 +177,7 @@ async def _unload_task(self, model_id: str) -> None: return record = self._models[model_id] model_instance = record.model_instance - + # Call the model's unload_model method if it exists and model is loaded if model_instance and hasattr(model_instance, 'unload_model'): unload_fn = getattr(model_instance, 'unload_model') @@ -191,7 +190,7 @@ async def _unload_task(self, model_id: str) -> None: # Await if coroutine/awaitable if inspect.isawaitable(result): await result - + # Remove from registry async with self._lock: removed_record = None @@ -206,7 +205,7 @@ async def _unload_task(self, model_id: str) -> None: if removed_record is not None: for cb in self._on_unloaded: asyncio.create_task(cb(removed_record)) - + except Exception as e: logger.info(f"Error during model unload: {e}") @@ -237,7 +236,7 @@ async def status(self) -> dict: async def create_model_instance(load_config: ModelLoadConfig) -> Any: """Factory function to create the appropriate model instance based on engine type.""" key = (load_config.engine, load_config.model_type) - + if key not in MODEL_CLASS_REGISTRY: available = [f"{engine.value}/{model.value}" for engine, model in MODEL_CLASS_REGISTRY.keys()] error_msg = ( @@ -246,16 +245,14 @@ async def create_model_instance(load_config: ModelLoadConfig) -> Any: ) logger.info(f"Model load failed: {error_msg}") raise ValueError(error_msg) - + # Dynamic import and instantiation class_path = MODEL_CLASS_REGISTRY[key] module_path, class_name = class_path.rsplit('.', 1) module = importlib.import_module(module_path) model_class = getattr(module, class_name) - + # Create and load model instance model_instance = model_class(load_config) await asyncio.to_thread(model_instance.load_model, load_config) return model_instance - - \ No newline at end of file diff --git a/src/server/routes/openarc.py b/src/server/routes/openarc.py index 2b5c410..c01dce9 100644 --- a/src/server/routes/openarc.py +++ b/src/server/routes/openarc.py @@ -2,6 +2,8 @@ import importlib.metadata import json import logging +from multiprocessing import cpu_count +from operator import is_ from pathlib import Path from typing import Any, Dict, Optional @@ -19,10 +21,136 @@ DownloaderRequest, ) -logger = logging.getLogger(__name__) +import openvino as ov + + +logger = logging.getLogger(__name__) router = APIRouter(prefix="/openarc") +is_gpu_metrics_installed = True + +class GPUInfo(BaseModel): + id: str + name: str + total_vram: Optional[int] = None + used_vram: Optional[int] = None + usage: Optional[float] = None + is_shared: Optional[bool] = None + + +def get_gpu_info_with_metrics(): + gpus = [] + try: + import gpu_metrics + + data = gpu_metrics.get_gpu_metrics() + for idx_str, gpu_data in data.items(): + name = gpu_data.get("name", f"Intel GPU {idx_str}") + total_vram_mb = 0 + used_vram_mb = 0 + is_shared = False + + mem_list = gpu_data.get("memory", []) + if mem_list and len(mem_list) > 0: + total_vram_mb = mem_list[0].get("total", 0) // (1024 * 1024) + used_vram_mb = mem_list[0].get("used", 0) // (1024 * 1024) + else: + import psutil + + vm = psutil.virtual_memory() + total_vram_mb = vm.total // (1024 * 1024) + is_shared = True + + gpus.append( + GPUInfo( + id=f"GPU.{idx_str}", + name=name, + total_vram=total_vram_mb, + used_vram=used_vram_mb, + usage=gpu_data.get("utilization", 0.0), + is_shared=is_shared, + ).model_dump() + ) + except ImportError: + is_gpu_metrics_installed = False + return False, [] + except Exception as e: + logging.error(f"Failed to fetch GPU metrics: {e}") + return False, [] + return True, gpus + + + +def get_cpu_info(): + cpu_info = {"id": "CPU", "name": "System CPU"} + try: + core = ov.Core() + devices = core.available_devices + for device in devices: + if "CPU" in device: + try: + cpu_info["name"] = str(core.get_property(device, "FULL_DEVICE_NAME")) + except Exception: + cpu_info["name"] = device + break + except Exception as e: + logging.error(f"Failed to query CPU info: {e}") + + return cpu_info + +def get_npu_info(): + npus = [] + try: + core = ov.Core() + devices = core.available_devices + for device in devices: + if "NPU" in device: + try: + name = core.get_property(device, "FULL_DEVICE_NAME") + except Exception: + name = device + npus.append({"id": device, "name": str(name)}) + except Exception as e: + logging.error(f"Failed to query NPU info: {e}") + return npus + +def get_gpu_info(): + gpu_metrics_status = False + gpus = [] + + if is_gpu_metrics_installed: + gpu_metrics_status, gpus = get_gpu_info_with_metrics() + + if not gpu_metrics_status: + try: + core = ov.Core() + devices = core.available_devices + for device in devices: + if "GPU" in device: + try: + name = core.get_property(device, "FULL_DEVICE_NAME") + except Exception: + name = device + + vram_bytes = core.get_property(device, "GPU_DEVICE_TOTAL_MEM_SIZE") + total_vram_mb = vram_bytes // (1024 * 1024) + gpus.append( + GPUInfo( + id=device, + name=str(name), + total_vram=total_vram_mb, + used_vram=0, + usage=0, + is_shared=False, + ).dict() + ) + except Exception as e: + logging.error(f"Failed to query GPU info: {e}") + + return gpus, gpu_metrics_status + + @router.post("/load", dependencies=[Depends(verify_api_key)]) async def load_model(load_config: ModelLoadConfig): @@ -150,73 +278,10 @@ async def get_version(): def get_hardware_metrics(): - gpus = [] - cpu_info = {"id": "CPU", "name": "System CPU"} - npus = [] - - try: - import cpuinfo - - info = cpuinfo.get_cpu_info() - cpu_info["name"] = info.get("brand_raw", "System CPU") - except Exception as e: - logging.error(f"Failed to query CPU info: {e}") - - try: - import openvino as ov - - core = ov.Core() - devices = core.available_devices - for device in devices: - if "NPU" in device: - try: - name = core.get_property(device, "FULL_DEVICE_NAME") - except Exception: - name = device - npus.append({"id": device, "name": str(name)}) - except Exception: - pass - - try: - import gpu_metrics - - data = gpu_metrics.get_gpu_metrics() - for idx_str, gpu_data in data.items(): - name = gpu_data.get("name", f"Intel GPU {idx_str}") - total_vram_mb = 0 - used_vram_mb = 0 - is_shared = False - - mem_list = gpu_data.get("memory", []) - if mem_list and len(mem_list) > 0: - total_vram_mb = mem_list[0].get("total", 0) // (1024 * 1024) - used_vram_mb = mem_list[0].get("used", 0) // (1024 * 1024) - else: - import psutil - - vm = psutil.virtual_memory() - total_vram_mb = vm.total // (1024 * 1024) - is_shared = True - - gpus.append( - { - "id": f"GPU.{idx_str}", - "name": str(name), - "total_vram": int(total_vram_mb), - "used_vram": int(used_vram_mb), - "usage": 0.0, - "is_shared": is_shared, - } - ) - except ImportError: - logging.warning( - "gpu_metrics module not found. Intel GPU telemetry will be missing." - ) - except Exception as e: - logging.error(f"Failed to fetch GPU metrics: {e}") - - return {"cpu": cpu_info, "gpus": gpus, "npus": npus} - + cpu_info = get_cpu_info() + gpu_info, gpu_metrics_status = get_gpu_info() + npu_info = get_npu_info() + return cpu_info, gpu_info, npu_info, gpu_metrics_status @router.get("/metrics", dependencies=[Depends(verify_api_key)]) async def get_metrics(): @@ -225,11 +290,12 @@ async def get_metrics(): vm = psutil.virtual_memory() hw_metrics = await asyncio.to_thread(get_hardware_metrics) + cpu_info, gpus, npus, gpu_metrics_status = hw_metrics return { "cpus": [ { - "id": hw_metrics["cpu"]["id"], - "name": hw_metrics["cpu"]["name"], + "id": cpu_info["id"], + "name": cpu_info["name"], "cores": psutil.cpu_count(logical=False) or 1, "threads": psutil.cpu_count(logical=True) or 1, "usage": psutil.cpu_percent(), @@ -237,8 +303,9 @@ async def get_metrics(): ], "total_ram": vm.total // (1024 * 1024), "used_ram": vm.used // (1024 * 1024), - "gpus": hw_metrics["gpus"], - "npus": hw_metrics["npus"], + "gpus": gpus, + "npus": npus, + "gpu_metrics_worked": gpu_metrics_status, } diff --git a/src/server/worker_registry.py b/src/server/worker_registry.py index 2e54bd5..7a7126a 100644 --- a/src/server/worker_registry.py +++ b/src/server/worker_registry.py @@ -25,7 +25,6 @@ from src.server.models.registration import ModelType logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) @dataclass class WorkerPacket: @@ -73,11 +72,11 @@ class WorkerPacket: class InferWorker: """ Handles generation for individual packets. - + Responsibilities: - Execute generation requests using pipelines - + Methods: - infer_llm: Process text-to-text generation requests - infer_vlm: Process image-to-text generation requests @@ -86,7 +85,7 @@ class InferWorker: - infer_emb: Process embedding requests - infer_rerank: Process reranking requests """ - + @staticmethod async def infer_llm(packet: WorkerPacket, llm_instance: OVGenAI_LLM) -> WorkerPacket: """Generate text for a single packet using the OVGenAI_LLM pipeline""" @@ -120,7 +119,7 @@ async def infer_llm(packet: WorkerPacket, llm_instance: OVGenAI_LLM) -> WorkerPa # Signal error to stream if streaming if packet.gen_config.stream and packet.stream_queue is not None: await packet.stream_queue.put(None) - + return packet @staticmethod @@ -156,7 +155,7 @@ async def infer_vlm(packet: WorkerPacket, vlm_model: OVGenAI_VLM) -> WorkerPacke # Signal error to stream if streaming if packet.gen_config.stream and packet.stream_queue is not None: await packet.stream_queue.put(None) - + return packet @staticmethod @@ -184,7 +183,7 @@ async def infer_whisper(packet: WorkerPacket, whisper_model: OVGenAI_Whisper) -> # Store error in packet response packet.response = f"Error: {str(e)}" packet.metrics = None - + return packet @staticmethod @@ -253,7 +252,7 @@ async def infer_kokoro(packet: WorkerPacket, kokoro_model: OV_Kokoro) -> WorkerP packet.metrics = None return packet - + @staticmethod async def infer_qwen3_tts(packet: WorkerPacket, tts_model: OVQwen3TTS) -> WorkerPacket: """Generate speech audio for a single packet using the OVQwen3TTS engine.""" @@ -317,7 +316,7 @@ async def infer_emb(packet: WorkerPacket, emb_instance: Optimum_EMB) -> WorkerPa packet.response = final_data packet.metrics = metrics - + except Exception as e: # Log the full exception with traceback logger.error("EMB inference failed!", exc_info=True) @@ -327,7 +326,7 @@ async def infer_emb(packet: WorkerPacket, emb_instance: Optimum_EMB) -> WorkerPa # Signal error to stream if streaming if packet.gen_config.stream and packet.stream_queue is not None: await packet.stream_queue.put(None) - + return packet @staticmethod @@ -345,7 +344,7 @@ async def infer_rerank(packet: WorkerPacket, rerank_instance: Optimum_RR) -> Wor packet.response = final_data packet.metrics = metrics - + except Exception as e: # Log the full exception with traceback logger.error("Reranking failed!", exc_info=True) @@ -355,15 +354,15 @@ async def infer_rerank(packet: WorkerPacket, rerank_instance: Optimum_RR) -> Wor # Signal error to stream if streaming if packet.gen_config.stream and packet.stream_queue is not None: await packet.stream_queue.put(None) - + return packet - + class QueueWorker: """ Manages inference worker loops for consuming and processing packets from model queues. - + """ - + @staticmethod async def queue_worker_llm(model_name: str, model_queue: asyncio.Queue, llm_model: OVGenAI_LLM, registry: ModelRegistry): """Text model inference worker that processes packets from queue""" @@ -486,7 +485,7 @@ async def queue_worker_kokoro(model_name: str, model_queue: asyncio.Queue, kokor break # Log the text that was converted to speech - + if completed_packet.metrics: logger.info(f"[Kokoro Worker: {model_name}] Metrics: {completed_packet.metrics}") @@ -569,12 +568,12 @@ async def queue_worker_rr(model_name: str, model_queue: asyncio.Queue, rr_model: class WorkerRegistry: """ Central orchestrator for managing per-model inference workers and request routing. - + WorkerRegistry serves as the main coordination layer that bridges the ModelRegistry with the actual inference execution. It automatically spawns and manages dedicated worker tasks for each loaded model, routing generation requests to the appropriate model-specific queues. - + """ @@ -607,7 +606,7 @@ def __init__(self, model_registry: ModelRegistry): self._model_tasks_rerank: Dict[str, asyncio.Task] = {} self._lock = asyncio.Lock() - + # Track active requests for cancellation: request_id -> (model_name, packet) self._active_requests: Dict[str, tuple[str, WorkerPacket]] = {} @@ -687,7 +686,7 @@ async def _on_model_loaded(self, record: ModelRecord) -> None: self._model_queues_emb[record.model_name] = q task = asyncio.create_task(QueueWorker.queue_worker_emb(record.model_name, q, instance, self._model_registry)) self._model_tasks_emb[record.model_name] = task - + elif mt == ModelType.RERANK and isinstance(instance, Optimum_RR): if record.model_name not in self._model_queues_rerank: q: asyncio.Queue = asyncio.Queue() @@ -754,7 +753,7 @@ async def _on_model_unloaded(self, record: ModelRecord) -> None: await q.put(None) if t is not None and not t.done(): t.cancel() - + # Try rerank dicts q = self._model_queues_rerank.pop(record.model_name, None) t = self._model_tasks_rerank.pop(record.model_name, None) @@ -807,7 +806,7 @@ def _get_rerank_queue(self, model_name: str) -> asyncio.Queue: if q is not None: return q raise ValueError(f"Rerank model '{model_name}' is not loaded or no worker is available") - + async def generate(self, model_name: str, gen_config: OVGenAI_GenConfig) -> Dict[str, Any]: """Generate text without streaming.""" request_id = uuid.uuid4().hex @@ -827,7 +826,7 @@ async def stream_generate(self, model_name: str, gen_config: OVGenAI_GenConfig) """Generate text with streaming.""" request_id = uuid.uuid4().hex gen_config.request_id = request_id # Set request_id for cancellation tracking - + stream_queue: asyncio.Queue = asyncio.Queue() result_future: asyncio.Future = asyncio.get_running_loop().create_future() packet = WorkerPacket( @@ -837,11 +836,11 @@ async def stream_generate(self, model_name: str, gen_config: OVGenAI_GenConfig) stream_queue=stream_queue, result_future=result_future, ) - + # Register active request async with self._lock: self._active_requests[request_id] = (model_name, packet) - + try: q = self._get_model_queue(model_name) await q.put(packet) @@ -858,17 +857,17 @@ async def stream_generate(self, model_name: str, gen_config: OVGenAI_GenConfig) async def infer_cancel(self, request_id: str) -> bool: """ Cancel an ongoing inference request by request_id. - + Args: request_id: The request ID to cancel - + Returns: True if cancellation was triggered, False if request not found """ async with self._lock: if request_id in self._active_requests: model_name, _ = self._active_requests[request_id] - + # Look up model instance from ModelRegistry async with self._model_registry._lock: for record in self._model_registry._models.values(): @@ -968,7 +967,7 @@ async def generate_speech_kokoro(self, model_name: str, gen_config: OV_KokoroGen await q.put(packet) completed = await result_future return {"audio_base64": completed.response or "", "metrics": completed.metrics or {}} - + async def embed(self, model_name: str, tok_config: PreTrainedTokenizerConfig) -> Dict[str, Any]: """Create embeddings.""" request_id = uuid.uuid4().hex @@ -983,7 +982,7 @@ async def embed(self, model_name: str, tok_config: PreTrainedTokenizerConfig) -> await q.put(packet) completed = await result_future return {"data": completed.response, "metrics": completed.metrics or {}} - + async def rerank(self, model_name: str, rr_config: RerankerConfig) -> Dict[str, Any]: """Rerank documents.""" request_id = uuid.uuid4().hex @@ -997,4 +996,4 @@ async def rerank(self, model_name: str, rr_config: RerankerConfig) -> Dict[str, q = self._get_rerank_queue(model_name) await q.put(packet) completed = await result_future - return {"data": completed.response, "metrics": completed.metrics or {}} \ No newline at end of file + return {"data": completed.response, "metrics": completed.metrics or {}} diff --git a/uv.lock b/uv.lock index 81f7967..8663bcc 100644 --- a/uv.lock +++ b/uv.lock @@ -960,17 +960,6 @@ http = [ { name = "aiohttp" }, ] -[[package]] -name = "gpu-metrics" -version = "0.1.0" -source = { directory = "gpu-metrics" } -dependencies = [ - { name = "pybind11" }, -] - -[package.metadata] -requires-dist = [{ name = "pybind11" }] - [[package]] name = "grapheme" version = "0.6.0" @@ -2197,7 +2186,6 @@ dependencies = [ { name = "click" }, { name = "ddgs" }, { name = "fastapi" }, - { name = "gpu-metrics" }, { name = "huggingface-hub", extra = ["cli"] }, { name = "ipykernel" }, { name = "ipywidgets" }, @@ -2231,7 +2219,6 @@ requires-dist = [ { name = "click", specifier = ">=8.2.1" }, { name = "ddgs", specifier = ">=9.6.1" }, { name = "fastapi", specifier = ">=0.116.1" }, - { name = "gpu-metrics", directory = "gpu-metrics" }, { name = "huggingface-hub", extras = ["cli"], specifier = ">=0.33.4" }, { name = "ipykernel", specifier = ">=7.0.1" }, { name = "ipywidgets", specifier = ">=8.1.7" },