From 85e8a52afe678ec6fd7e3877fe1e9795c5a9bd4f Mon Sep 17 00:00:00 2001 From: insistence <3055204202@qq.com> Date: Fri, 6 Feb 2026 09:06:31 +0800 Subject: [PATCH 1/4] =?UTF-8?q?feat:=20=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=96=B0=E5=A2=9Eredis=E5=88=86=E5=B8=83=E5=BC=8F?= =?UTF-8?q?=E9=94=81=E4=BB=A5=E5=9C=A8=E5=A4=9Aworker=E4=B8=8B=E6=AD=A3?= =?UTF-8?q?=E5=B8=B8=E8=BF=90=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ruoyi-fastapi-backend/app.py | 1 + ruoyi-fastapi-backend/common/constant.py | 10 + ruoyi-fastapi-backend/config/database.py | 115 ++- ruoyi-fastapi-backend/config/get_db.py | 9 + ruoyi-fastapi-backend/config/get_scheduler.py | 832 +++++++++++++++--- .../module_admin/dao/job_dao.py | 12 + .../module_admin/service/job_service.py | 3 + ruoyi-fastapi-backend/server.py | 145 ++- ruoyi-fastapi-backend/utils/server_util.py | 91 ++ 9 files changed, 1066 insertions(+), 152 deletions(-) diff --git a/ruoyi-fastapi-backend/app.py b/ruoyi-fastapi-backend/app.py index 2d2072d..b9bc3f3 100644 --- a/ruoyi-fastapi-backend/app.py +++ b/ruoyi-fastapi-backend/app.py @@ -12,4 +12,5 @@ port=AppConfig.app_port, root_path=AppConfig.app_root_path, reload=AppConfig.app_reload, + workers=AppConfig.app_workers, ) diff --git a/ruoyi-fastapi-backend/common/constant.py b/ruoyi-fastapi-backend/common/constant.py index da70e15..8323c6a 100644 --- a/ruoyi-fastapi-backend/common/constant.py +++ b/ruoyi-fastapi-backend/common/constant.py @@ -133,6 +133,16 @@ class JobConstant: JOB_WHITE_LIST = ['module_task'] +class LockConstant: + """ + 分布式锁常量 + """ + + APP_STARTUP_LOCK_KEY = 'app:startup:lock' + LOCK_EXPIRE_SECONDS = 60 + LOCK_RENEWAL_INTERVAL = 20 + + class MenuConstant: """ 菜单常量 diff --git a/ruoyi-fastapi-backend/config/database.py b/ruoyi-fastapi-backend/config/database.py index 72e0ce4..e526b42 100644 --- a/ruoyi-fastapi-backend/config/database.py +++ b/ruoyi-fastapi-backend/config/database.py @@ -1,29 +1,108 @@ from urllib.parse import quote_plus -from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, create_async_engine -from sqlalchemy.orm import DeclarativeBase +from sqlalchemy import Engine, create_engine +from sqlalchemy.ext.asyncio import AsyncAttrs, AsyncEngine, async_sessionmaker, create_async_engine +from sqlalchemy.orm import DeclarativeBase, sessionmaker from config.env import DataBaseConfig -ASYNC_SQLALCHEMY_DATABASE_URL = ( - f'mysql+asyncmy://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@' - f'{DataBaseConfig.db_host}:{DataBaseConfig.db_port}/{DataBaseConfig.db_database}' -) -if DataBaseConfig.db_type == 'postgresql': - ASYNC_SQLALCHEMY_DATABASE_URL = ( - f'postgresql+asyncpg://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@' + +def build_async_sqlalchemy_database_url() -> str: + """ + 构建异步 SQLAlchemy 数据库连接 URL + + :return: 异步 SQLAlchemy 数据库连接 URL + """ + if DataBaseConfig.db_type == 'postgresql': + return ( + f'postgresql+asyncpg://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@' + f'{DataBaseConfig.db_host}:{DataBaseConfig.db_port}/{DataBaseConfig.db_database}' + ) + return ( + f'mysql+asyncmy://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@' f'{DataBaseConfig.db_host}:{DataBaseConfig.db_port}/{DataBaseConfig.db_database}' ) -async_engine = create_async_engine( - ASYNC_SQLALCHEMY_DATABASE_URL, - echo=DataBaseConfig.db_echo, - max_overflow=DataBaseConfig.db_max_overflow, - pool_size=DataBaseConfig.db_pool_size, - 
pool_recycle=DataBaseConfig.db_pool_recycle, - pool_timeout=DataBaseConfig.db_pool_timeout, -) -AsyncSessionLocal = async_sessionmaker(autocommit=False, autoflush=False, bind=async_engine) + +ASYNC_SQLALCHEMY_DATABASE_URL = build_async_sqlalchemy_database_url() + + +def build_sync_sqlalchemy_database_url() -> str: + """ + 构建同步 SQLAlchemy 数据库连接 URL + + :return: 同步 SQLAlchemy 数据库连接 URL + """ + if DataBaseConfig.db_type == 'postgresql': + return ( + f'postgresql+psycopg2://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@' + f'{DataBaseConfig.db_host}:{DataBaseConfig.db_port}/{DataBaseConfig.db_database}' + ) + return ( + f'mysql+pymysql://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@' + f'{DataBaseConfig.db_host}:{DataBaseConfig.db_port}/{DataBaseConfig.db_database}' + ) + + +SYNC_SQLALCHEMY_DATABASE_URL = build_sync_sqlalchemy_database_url() + + +def create_async_db_engine(echo: bool | None = None) -> AsyncEngine: + """ + 创建异步 SQLAlchemy Engine + + :param echo: 可选,是否输出 SQLAlchemy SQL 日志 + :return: 异步 SQLAlchemy Engine + """ + return create_async_engine( + ASYNC_SQLALCHEMY_DATABASE_URL, + echo=DataBaseConfig.db_echo if echo is None else echo, + max_overflow=DataBaseConfig.db_max_overflow, + pool_size=DataBaseConfig.db_pool_size, + pool_recycle=DataBaseConfig.db_pool_recycle, + pool_timeout=DataBaseConfig.db_pool_timeout, + ) + + +def create_sync_db_engine(echo: bool | None = None) -> Engine: + """ + 创建同步 SQLAlchemy Engine + + :param echo: 可选,是否输出 SQLAlchemy SQL 日志 + :return: 同步 SQLAlchemy Engine + """ + return create_engine( + SYNC_SQLALCHEMY_DATABASE_URL, + echo=DataBaseConfig.db_echo if echo is None else echo, + max_overflow=DataBaseConfig.db_max_overflow, + pool_size=DataBaseConfig.db_pool_size, + pool_recycle=DataBaseConfig.db_pool_recycle, + pool_timeout=DataBaseConfig.db_pool_timeout, + ) + + +def create_async_session_local(engine: AsyncEngine) -> async_sessionmaker: + """ + 创建异步 Session 工厂 + + :param engine: 异步 SQLAlchemy Engine + :return: 异步 Session 工厂 + """ + return async_sessionmaker(autocommit=False, autoflush=False, bind=engine) + + +def create_sync_session_local(engine: Engine) -> sessionmaker: + """ + 创建同步 Session 工厂 + + :param engine: 同步 SQLAlchemy Engine + :return: 同步 Session 工厂 + """ + return sessionmaker(autocommit=False, autoflush=False, bind=engine) + + +async_engine = create_async_db_engine() +AsyncSessionLocal = create_async_session_local(async_engine) class Base(AsyncAttrs, DeclarativeBase): diff --git a/ruoyi-fastapi-backend/config/get_db.py b/ruoyi-fastapi-backend/config/get_db.py index 16df0ab..43f838a 100644 --- a/ruoyi-fastapi-backend/config/get_db.py +++ b/ruoyi-fastapi-backend/config/get_db.py @@ -26,3 +26,12 @@ async def init_create_table() -> None: async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) logger.info('✅️ 数据库连接成功') + + +async def close_async_engine() -> None: + """ + 应用关闭时释放数据库连接池 + + :return: + """ + await async_engine.dispose() diff --git a/ruoyi-fastapi-backend/config/get_scheduler.py b/ruoyi-fastapi-backend/config/get_scheduler.py index 28045c0..a6c2598 100644 --- a/ruoyi-fastapi-backend/config/get_scheduler.py +++ b/ruoyi-fastapi-backend/config/get_scheduler.py @@ -1,3 +1,4 @@ +import asyncio import importlib import json from asyncio import iscoroutinefunction @@ -16,16 +17,25 @@ from apscheduler.triggers.combining import OrTrigger from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.date import DateTrigger -from sqlalchemy.engine import 
create_engine -from sqlalchemy.orm import sessionmaker +from redis import asyncio as aioredis +from sqlalchemy.engine import Engine +from sqlalchemy.ext.asyncio import AsyncEngine import module_task # noqa: F401 -from config.database import AsyncSessionLocal, quote_plus -from config.env import DataBaseConfig, RedisConfig +from common.constant import LockConstant +from config.database import ( + SYNC_SQLALCHEMY_DATABASE_URL, + create_async_db_engine, + create_async_session_local, + create_sync_db_engine, + create_sync_session_local, +) +from config.env import LogConfig, RedisConfig from module_admin.dao.job_dao import JobDao from module_admin.entity.vo.job_vo import JobLogModel, JobModel from module_admin.service.job_log_service import JobLogService from utils.log_util import logger +from utils.server_util import StartupUtil, WorkerIdUtil # 重写Cron定时 @@ -86,24 +96,6 @@ def __find_recent_workday(cls, day: int) -> int: diff += 1 -SQLALCHEMY_DATABASE_URL = ( - f'mysql+pymysql://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@' - f'{DataBaseConfig.db_host}:{DataBaseConfig.db_port}/{DataBaseConfig.db_database}' -) -if DataBaseConfig.db_type == 'postgresql': - SQLALCHEMY_DATABASE_URL = ( - f'postgresql+psycopg2://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@' - f'{DataBaseConfig.db_host}:{DataBaseConfig.db_port}/{DataBaseConfig.db_database}' - ) -engine = create_engine( - SQLALCHEMY_DATABASE_URL, - echo=DataBaseConfig.db_echo, - max_overflow=DataBaseConfig.db_max_overflow, - pool_size=DataBaseConfig.db_pool_size, - pool_recycle=DataBaseConfig.db_pool_recycle, - pool_timeout=DataBaseConfig.db_pool_timeout, -) -SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) redis_config = { 'host': RedisConfig.redis_host, 'port': RedisConfig.redis_port, @@ -111,15 +103,9 @@ def __find_recent_workday(cls, day: int) -> int: 'password': RedisConfig.redis_password, 'db': RedisConfig.redis_database, } -job_stores = { - 'default': MemoryJobStore(), - 'sqlalchemy': SQLAlchemyJobStore(url=SQLALCHEMY_DATABASE_URL, engine=engine), - 'redis': RedisJobStore(**redis_config), -} executors = {'default': AsyncIOExecutor(), 'processpool': ProcessPoolExecutor(5)} job_defaults = {'coalesce': False, 'max_instance': 1} scheduler = AsyncIOScheduler() -scheduler.configure(jobstores=job_stores, executors=executors, job_defaults=job_defaults) class SchedulerUtil: @@ -127,23 +113,608 @@ class SchedulerUtil: 定时任务相关方法 """ + # 分布式锁相关类变量 + _is_leader: bool = False + _worker_id: str = WorkerIdUtil.get_worker_id(LogConfig.log_worker_id) + _redis: aioredis.Redis | None = None + _job_update_time_cache: dict[str, datetime] = {} + _sync_channel: str = 'scheduler:sync:request' + _sync_listener_task: asyncio.Task | None = None + _lock_lost_task: asyncio.Task | None = None + _sync_task: asyncio.Task | None = None + _sync_pending: bool = False + _sync_lock: asyncio.Lock = asyncio.Lock() + _last_sync_at: datetime | None = None + _sync_debounce_seconds: float = 0.5 + _sync_min_interval_seconds: float = 2.0 + _reacquire_task: asyncio.Task | None = None + _reacquire_interval_seconds: float = 5.0 + _sync_async_engine: AsyncEngine | None = None + _sync_async_sessionmaker: Any | None = None + _disposed_sync_engines: bool = False + + # 懒加载的同步 Engine 和 SessionLocal + _jobstore_engine: Engine | None = None + _listener_engine: Engine | None = None + _session_local: Any | None = None + _scheduler_configured: bool = False + + @classmethod + def _get_jobstore_engine(cls) -> Engine: + """ + 
懒加载获取 jobstore 使用的同步 Engine + + :return: 同步 Engine + """ + if cls._jobstore_engine is None: + cls._jobstore_engine = create_sync_db_engine(echo=False) + return cls._jobstore_engine + + @classmethod + def _get_listener_engine(cls) -> Engine: + """ + 懒加载获取 listener 使用的同步 Engine + + :return: 同步 Engine + """ + if cls._listener_engine is None: + cls._listener_engine = create_sync_db_engine() + return cls._listener_engine + + @classmethod + def _get_session_local(cls) -> Any: + """ + 懒加载获取同步 SessionLocal + + :return: SessionLocal + """ + if cls._session_local is None: + cls._session_local = create_sync_session_local(cls._get_listener_engine()) + return cls._session_local + + @classmethod + def _configure_scheduler(cls) -> None: + """ + 配置 scheduler(懒加载 jobstore) + + :return: None + """ + if cls._scheduler_configured: + return + job_stores = { + 'default': MemoryJobStore(), + 'sqlalchemy': SQLAlchemyJobStore(url=SYNC_SQLALCHEMY_DATABASE_URL, engine=cls._get_jobstore_engine()), + 'redis': RedisJobStore(**redis_config), + } + scheduler.configure(jobstores=job_stores, executors=executors, job_defaults=job_defaults) + cls._scheduler_configured = True + @classmethod - async def init_system_scheduler(cls) -> None: + async def init_system_scheduler(cls, redis: aioredis.Redis) -> None: """ - 应用启动时初始化定时任务 + 应用启动时初始化定时任务(使用分布式锁确保只有一个worker启动scheduler) + :param redis: Redis连接对象 :return: """ - logger.info('🔎 开始启动定时任务...') + cls._redis = redis + logger.info(f'🔎 Worker {cls._worker_id} 尝试获取 Application 锁...') + + acquired = await StartupUtil.acquire_startup_log_gate( + redis=redis, + lock_key=LockConstant.APP_STARTUP_LOCK_KEY, + worker_id=cls._worker_id, + lock_expire_seconds=LockConstant.LOCK_EXPIRE_SECONDS, + ) + + if acquired: + await cls._start_scheduler_as_leader(redis) + else: + cls._is_leader = False + logger.info(f'⏸️ Worker {cls._worker_id} 未持有 Application 锁,跳过 Scheduler 启动') + + @classmethod + async def _start_scheduler_as_leader(cls, redis: aioredis.Redis) -> None: + """ + 以 Leader 身份启动 Scheduler(内部方法,调用前需确保已持有锁) + + :param redis: Redis连接对象 + :return: None + """ + cls._is_leader = True + cls._disposed_sync_engines = False + logger.info(f'🎯 Worker {cls._worker_id} 持有 Application 锁,开始启动定时任务...') + # 懒加载配置 scheduler + cls._configure_scheduler() scheduler.start() - async with AsyncSessionLocal() as session: + + # 加载数据库中的定时任务 + async with cls._get_sync_async_session() as session: job_list = await JobDao.get_job_list_for_scheduler(session) for item in job_list: - cls.remove_scheduler_job(job_id=str(item.job_id)) - cls.add_scheduler_job(item) + cls._add_job_to_scheduler(item) + + # 添加事件监听器 scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL) + + # 添加任务状态同步任务(每30秒从数据库同步一次任务状态) + scheduler.add_job( + func=cls.request_scheduler_sync, + trigger='interval', + seconds=30, + id='_scheduler_job_sync', + name='Scheduler任务同步', + replace_existing=True, + ) + cls._sync_listener_task = asyncio.create_task(cls._listen_sync_channel(redis)) + logger.info('✅️ 系统初始定时任务加载成功') + @classmethod + def on_lock_lost(cls) -> None: + """ + 锁丢失处理入口 + + :return: None + """ + if not cls._is_leader: + return + cls._is_leader = False + logger.warning(f'⚠️ Worker {cls._worker_id} 失去 Application 锁') + if cls._lock_lost_task: + cls._lock_lost_task.cancel() + cls._lock_lost_task = asyncio.create_task(cls._handle_lock_lost()) + + @classmethod + async def _handle_lock_lost(cls) -> None: + """ + 处理锁丢失后的资源释放 + + :return: None + """ + if cls._sync_listener_task: + cls._sync_listener_task.cancel() + try: + await cls._sync_listener_task + 
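+                # Awaiting the cancelled task (and swallowing CancelledError) ensures
+                # the pub/sub listener has fully unwound before the scheduler is shut
+                # down and its engines are disposed below.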
except asyncio.CancelledError: + pass + cls._sync_listener_task = None + if cls._sync_task: + cls._sync_task.cancel() + try: + await cls._sync_task + except asyncio.CancelledError: + pass + cls._sync_task = None + cls._sync_pending = False + if getattr(scheduler, 'running', False): + scheduler.shutdown() + await cls._dispose_sync_async_engine() + cls._dispose_sync_engines() + cls._ensure_reacquire_task() + + @classmethod + async def _sync_jobs_from_database(cls) -> None: + """ + 从数据库同步任务状态,确保多worker环境下任务状态一致 + """ + if not cls._is_leader: + return + + try: + async with cls._get_sync_async_session() as session: + db_jobs_all = await JobDao.get_all_job_list_for_scheduler(session) + db_jobs_enabled = [job for job in db_jobs_all if job.status == '0'] + db_enabled_ids = {str(job.job_id) for job in db_jobs_enabled} + db_job_map = {str(job.job_id): job for job in db_jobs_enabled} + db_job_update_time_map = { + str(job.job_id): job.update_time for job in db_jobs_enabled if job.update_time is not None + } + scheduler_jobs = scheduler.get_jobs() + scheduler_job_map = {job.id: job for job in scheduler_jobs if not job.id.startswith('_')} + scheduler_job_ids = set(scheduler_job_map.keys()) + + jobs_to_remove = scheduler_job_ids - db_enabled_ids + for job_id in jobs_to_remove: + scheduler.remove_job(job_id=job_id) + logger.info(f'🗑️ 同步移除任务: {job_id}') + cls._refresh_job_update_cache(job_id, None) + + jobs_to_add = db_enabled_ids - scheduler_job_ids + for job_id in jobs_to_add: + job_info = db_job_map.get(job_id) + if job_info: + cls._add_job_to_scheduler(job_info) + logger.info(f'➕ 同步添加任务: {job_info.job_name}') + cls._refresh_job_update_cache(job_id, job_info.update_time) + + jobs_to_update = db_enabled_ids & scheduler_job_ids + for job_id in jobs_to_update: + job_info = db_job_map.get(job_id) + scheduler_job = scheduler_job_map.get(job_id) + job_update_time = db_job_update_time_map.get(job_id) + cls._sync_update_job(job_id, job_info, scheduler_job, job_update_time) + + except Exception as e: + logger.error(f'❌ 任务同步异常: {e}') + + @classmethod + def _is_job_config_in_sync(cls, scheduler_job: Job, job_info: JobModel) -> bool: + """ + 判断任务配置是否一致 + + :param scheduler_job: 调度器任务对象 + :param job_info: 数据库任务对象 + :return: 是否一致 + """ + job_state = scheduler_job.__getstate__() + job_kwargs = json.loads(job_info.job_kwargs) if job_info.job_kwargs else None + job_args = job_info.job_args.split(',') if job_info.job_args else None + job_executor = job_info.job_executor + if iscoroutinefunction(cls._import_function(job_info.invoke_target)): + job_executor = 'default' + expected = { + 'name': job_info.job_name, + 'executor': job_executor, + 'jobstore': job_info.job_group, + 'misfire_grace_time': 1000000000000 if job_info.misfire_policy == '3' else None, + 'coalesce': job_info.misfire_policy == '2', + 'max_instances': 3 if job_info.concurrent == '0' else 1, + 'trigger': str(MyCronTrigger.from_crontab(job_info.cron_expression)), + 'args': tuple(job_args) if job_args else None, + 'kwargs': job_kwargs if job_kwargs else None, + 'func': str(cls._import_function(job_info.invoke_target)), + } + current = { + 'name': job_state.get('name'), + 'executor': job_state.get('executor'), + 'jobstore': scheduler_job._jobstore_alias, + 'misfire_grace_time': job_state.get('misfire_grace_time'), + 'coalesce': job_state.get('coalesce'), + 'max_instances': job_state.get('max_instances'), + 'trigger': str(job_state.get('trigger')), + 'args': job_state.get('args'), + 'kwargs': job_state.get('kwargs'), + 'func': str(job_state.get('func')), + } + 
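+        # Plain dict equality: any field that drifted between the DB row and the
+        # live APScheduler job fails the comparison, and the caller then removes
+        # and re-adds the job with the fresh configuration.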
return expected == current + + @classmethod + def _sync_update_job( + cls, job_id: str, job_info: JobModel | None, scheduler_job: Job | None, job_update_time: datetime | None + ) -> None: + """ + 同步更新任务配置 + + :param job_id: 任务ID + :param job_info: 数据库任务对象 + :param scheduler_job: 调度器任务对象 + :param job_update_time: 任务更新时间 + :return: None + """ + if not job_info or not scheduler_job: + return + if cls._should_skip_job_update(job_id, job_update_time): + return + if not cls._is_job_config_in_sync(scheduler_job, job_info): + scheduler.remove_job(job_id=job_id) + cls._add_job_to_scheduler(job_info) + logger.info(f'♻️ 同步更新任务: {job_info.job_name}') + cls._refresh_job_update_cache(job_id, job_update_time) + + @classmethod + def _should_skip_job_update(cls, job_id: str, job_update_time: datetime | None) -> bool: + """ + 判断是否跳过同步更新 + + :param job_id: 任务ID + :param job_update_time: 任务更新时间 + :return: 是否跳过 + """ + if job_update_time is None: + return False + return cls._job_update_time_cache.get(job_id) == job_update_time + + @classmethod + def _refresh_job_update_cache(cls, job_id: str, job_update_time: datetime | None) -> None: + """ + 刷新任务更新时间缓存 + + :param job_id: 任务ID + :param job_update_time: 任务更新时间 + :return: None + """ + if job_update_time is not None: + cls._job_update_time_cache[job_id] = job_update_time + else: + cls._job_update_time_cache.pop(job_id, None) + + @classmethod + async def request_scheduler_sync(cls) -> None: + """ + 请求调度器同步任务状态 + + :return: None + """ + if cls._is_leader: + cls._sync_pending = True + cls._ensure_sync_task() + return + if cls._redis: + await cls._redis.publish(cls._sync_channel, cls._worker_id) + + @classmethod + def _ensure_sync_task(cls) -> None: + """ + 启动同步调度任务 + + :return: None + """ + if cls._sync_task and not cls._sync_task.done(): + return + cls._sync_task = asyncio.create_task(cls._run_sync_loop()) + + @classmethod + def _get_sync_async_session(cls) -> Any: + """ + 获取同步任务使用的异步 Session + + :return: 异步 Session + """ + if not cls._sync_async_sessionmaker: + cls._sync_async_engine = create_async_db_engine(echo=False) + cls._sync_async_sessionmaker = create_async_session_local(cls._sync_async_engine) + return cls._sync_async_sessionmaker() + + @classmethod + async def _dispose_sync_async_engine(cls) -> None: + """ + 释放同步任务使用的异步 Engine + + :return: None + """ + if cls._sync_async_engine: + await cls._sync_async_engine.dispose() + cls._sync_async_engine = None + cls._sync_async_sessionmaker = None + + @classmethod + def _dispose_sync_engines(cls) -> None: + """ + 释放 Scheduler 使用的同步 Engine + + :return: None + """ + if cls._disposed_sync_engines: + return + if cls._jobstore_engine: + cls._jobstore_engine.dispose() + cls._jobstore_engine = None + if cls._listener_engine: + cls._listener_engine.dispose() + cls._listener_engine = None + cls._session_local = None + cls._disposed_sync_engines = True + + @classmethod + def _ensure_reacquire_task(cls) -> None: + """ + 启动锁重新竞争任务 + + :return: None + """ + if not cls._redis: + return + if cls._reacquire_task and not cls._reacquire_task.done(): + return + cls._reacquire_task = asyncio.create_task(cls._run_reacquire_loop()) + + @classmethod + async def _run_reacquire_loop(cls) -> None: + """ + 循环尝试重新获取锁并恢复调度器 + + :return: None + """ + try: + while not cls._is_leader: + if not cls._redis: + await asyncio.sleep(cls._reacquire_interval_seconds) + continue + acquired = await StartupUtil.acquire_startup_log_gate( + redis=cls._redis, + lock_key=LockConstant.APP_STARTUP_LOCK_KEY, + worker_id=cls._worker_id, + 
lock_expire_seconds=LockConstant.LOCK_EXPIRE_SECONDS, + ) + if acquired: + # 直接调用 _start_scheduler_as_leader,避免重复获取锁 + await cls._start_scheduler_as_leader(cls._redis) + return + await asyncio.sleep(cls._reacquire_interval_seconds) + except asyncio.CancelledError: + raise + finally: + cls._reacquire_task = None + + @classmethod + async def _run_sync_loop(cls) -> None: + """ + 执行同步调度循环 + + :return: None + """ + try: + while True: + if not cls._sync_pending: + break + cls._sync_pending = False + await asyncio.sleep(cls._sync_debounce_seconds) + await cls._sync_with_throttle() + except asyncio.CancelledError: + raise + finally: + cls._sync_task = None + + @classmethod + async def _sync_with_throttle(cls) -> None: + """ + 按节流规则执行同步 + + :return: None + """ + async with cls._sync_lock: + if not cls._is_leader: + return + if cls._last_sync_at: + elapsed = datetime.now() - cls._last_sync_at + min_interval = timedelta(seconds=cls._sync_min_interval_seconds) + if elapsed < min_interval: + await asyncio.sleep((min_interval - elapsed).total_seconds()) + await cls._sync_jobs_from_database() + cls._last_sync_at = datetime.now() + + @classmethod + async def _listen_sync_channel(cls, redis: aioredis.Redis) -> None: + """ + 监听同步请求通道 + + :param redis: Redis连接对象 + :return: None + """ + while True: + pubsub = redis.pubsub() + try: + await pubsub.subscribe(cls._sync_channel) + async for message in pubsub.listen(): + if not cls._is_leader: + continue + if message.get('type') != 'message': + continue + await cls.request_scheduler_sync() + except asyncio.CancelledError: + await pubsub.unsubscribe(cls._sync_channel) + await pubsub.close() + raise + except Exception as e: + logger.error(f'❌ Scheduler 同步监听异常: {e},5秒后重试...') + await pubsub.close() + await asyncio.sleep(5) + finally: + try: + await pubsub.close() + except Exception: + pass + + @classmethod + async def _execute_async_job_with_log( + cls, job_func: Callable[..., Any], job_info: JobModel, args: list, kwargs: dict + ) -> None: + """ + 执行异步任务并记录日志 + + :param job_func: 任务函数 + :param job_info: 任务对象信息 + :param args: 位置参数 + :param kwargs: 关键字参数 + :return: None + """ + status = '0' + exception_info = '' + job_executor = job_info.job_executor + if iscoroutinefunction(job_func): + job_executor = 'default' + try: + await job_func(*args, **kwargs) + except Exception as e: + status = '1' + exception_info = str(e) + logger.error(f'❌ 异步执行任务 {job_info.job_name} 失败: {e}') + finally: + cls._record_job_execution_log(job_info, job_executor, status, exception_info) + + @classmethod + def _record_job_execution_log(cls, job_info: JobModel, job_executor: str, status: str, exception_info: str) -> None: + """ + 记录任务执行日志(用于非 Leader Worker 直接执行任务时) + + :param job_info: 任务对象信息 + :param job_executor: 任务执行器 + :param status: 执行状态 0-成功 1-失败 + :param exception_info: 异常信息 + :return: None + """ + try: + job_args = job_info.job_args if job_info.job_args else '' + job_kwargs = job_info.job_kwargs if job_info.job_kwargs else '{}' + job_trigger = str(MyCronTrigger.from_crontab(job_info.cron_expression)) if job_info.cron_expression else '' + job_message = ( + f'事件类型: DirectExecution(非Leader), 任务ID: {job_info.job_id}, ' + f'任务名称: {job_info.job_name}, 执行于{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}' + ) + job_log = JobLogModel( + jobName=job_info.job_name, + jobGroup=job_info.job_group, + jobExecutor=job_executor, + invokeTarget=job_info.invoke_target, + jobArgs=job_args, + jobKwargs=job_kwargs, + jobTrigger=job_trigger, + jobMessage=job_message, + status=status, + 
exceptionInfo=exception_info, + createTime=datetime.now(), + ) + session = cls._get_session_local()() + try: + JobLogService.add_job_log_services(session, job_log) + finally: + session.close() + except Exception as e: + logger.error(f'❌ 记录任务执行日志失败: {e}') + + @classmethod + def _prepare_scheduler_job_add(cls, job_info: JobModel) -> dict[str, Any]: + """ + 构建调度器任务参数 + + :param job_info: 任务对象信息 + :return: 调度器任务参数 + """ + job_func = cls._import_function(job_info.invoke_target) + job_executor = job_info.job_executor + if iscoroutinefunction(job_func): + job_executor = 'default' + return { + 'func': job_func, + 'trigger': MyCronTrigger.from_crontab(job_info.cron_expression), + 'args': job_info.job_args.split(',') if job_info.job_args else None, + 'kwargs': json.loads(job_info.job_kwargs) if job_info.job_kwargs else None, + 'id': str(job_info.job_id), + 'name': job_info.job_name, + 'misfire_grace_time': 1000000000000 if job_info.misfire_policy == '3' else None, + 'coalesce': job_info.misfire_policy == '2', + 'max_instances': 3 if job_info.concurrent == '0' else 1, + 'jobstore': job_info.job_group, + 'executor': job_executor, + } + + @classmethod + def _add_job_to_scheduler(cls, job_info: JobModel) -> None: + """ + 内部方法:将任务添加到调度器(不检查应用锁状态,仅供内部使用) + + :param job_info: 任务对象信息 + """ + try: + # 先移除已存在的同ID任务 + existing_job = scheduler.get_job(job_id=str(job_info.job_id)) + if existing_job: + scheduler.remove_job(job_id=str(job_info.job_id)) + scheduler.add_job(**cls._prepare_scheduler_job_add(job_info)) + except Exception as e: + logger.error(f'❌ 添加任务 {job_info.job_name} 失败: {e}') + @classmethod async def close_system_scheduler(cls) -> None: """ @@ -151,8 +722,46 @@ async def close_system_scheduler(cls) -> None: :return: """ - scheduler.shutdown() - logger.info('✅️ 关闭定时任务成功') + if cls._sync_listener_task: + cls._sync_listener_task.cancel() + try: + await cls._sync_listener_task + except asyncio.CancelledError: + pass + cls._sync_listener_task = None + if cls._sync_task: + cls._sync_task.cancel() + try: + await cls._sync_task + except asyncio.CancelledError: + pass + cls._sync_task = None + cls._sync_pending = False + if cls._reacquire_task: + cls._reacquire_task.cancel() + try: + await cls._reacquire_task + except asyncio.CancelledError: + pass + cls._reacquire_task = None + await cls._dispose_sync_async_engine() + cls._dispose_sync_engines() + if cls._lock_lost_task: + cls._lock_lost_task.cancel() + try: + await cls._lock_lost_task + except asyncio.CancelledError: + pass + cls._lock_lost_task = None + if getattr(scheduler, 'running', False): + scheduler.shutdown() + logger.info('✅️ 关闭定时任务成功') + # 释放锁 + if cls._redis: + current_holder = await cls._redis.get(LockConstant.APP_STARTUP_LOCK_KEY) + if current_holder == cls._worker_id: + await cls._redis.delete(LockConstant.APP_STARTUP_LOCK_KEY) + logger.info(f'🔓 Worker {cls._worker_id} 释放 Application 锁') @classmethod def _import_function(cls, func_path: str) -> Callable[..., Any]: @@ -186,23 +795,10 @@ def add_scheduler_job(cls, job_info: JobModel) -> None: :param job_info: 任务对象信息 :return: """ - job_func = cls._import_function(job_info.invoke_target) - job_executor = job_info.job_executor - if iscoroutinefunction(job_func): - job_executor = 'default' - scheduler.add_job( - func=job_func, - trigger=MyCronTrigger.from_crontab(job_info.cron_expression), - args=job_info.job_args.split(',') if job_info.job_args else None, - kwargs=json.loads(job_info.job_kwargs) if job_info.job_kwargs else None, - id=str(job_info.job_id), - name=job_info.job_name, - 
misfire_grace_time=1000000000000 if job_info.misfire_policy == '3' else None, - coalesce=job_info.misfire_policy == '2', - max_instances=3 if job_info.concurrent == '0' else 1, - jobstore=job_info.job_group, - executor=job_executor, - ) + # 非应用锁 worker 跳过操作(数据库状态是持久化的,持有应用锁时会加载) + if not cls._is_leader: + return + scheduler.add_job(**cls._prepare_scheduler_job_add(job_info)) @classmethod def execute_scheduler_job_once(cls, job_info: JobModel) -> None: @@ -216,6 +812,30 @@ def execute_scheduler_job_once(cls, job_info: JobModel) -> None: job_executor = job_info.job_executor if iscoroutinefunction(job_func): job_executor = 'default' + + # 非应用锁 worker:直接执行函数(不通过 scheduler) + if not cls._is_leader: + logger.info(f'📍 当前 Worker 未持有 Application 锁,直接执行任务 {job_info.job_name}') + args = job_info.job_args.split(',') if job_info.job_args else [] + kwargs = json.loads(job_info.job_kwargs) if job_info.job_kwargs else {} + status = '0' + exception_info = '' + try: + if iscoroutinefunction(job_func): + asyncio.create_task(cls._execute_async_job_with_log(job_func, job_info, args, kwargs)) # noqa: RUF006 + else: + job_func(*args, **kwargs) + except Exception as e: + status = '1' + exception_info = str(e) + logger.error(f'❌ 直接执行任务 {job_info.job_name} 失败: {e}') + finally: + # 同步任务记录日志(异步任务在 _execute_async_job_with_log 中记录) + if not iscoroutinefunction(job_func): + cls._record_job_execution_log(job_info, job_executor, status, exception_info) + return + + # 应用锁 worker:通过 scheduler 执行 job_trigger = DateTrigger() if job_info.status == '0': job_trigger = OrTrigger(triggers=[DateTrigger(), MyCronTrigger.from_crontab(job_info.cron_expression)]) @@ -241,54 +861,70 @@ def remove_scheduler_job(cls, job_id: str | int) -> None: :param job_id: 任务id :return: """ + # 非应用锁 worker 跳过操作(数据库状态是持久化的,持有应用锁时会根据状态加载) + if not cls._is_leader: + return query_job = cls.get_scheduler_job(job_id=job_id) if query_job: scheduler.remove_job(job_id=str(job_id)) @classmethod def scheduler_event_listener(cls, event: SchedulerEvent) -> None: - # 获取事件类型和任务ID - event_type = event.__class__.__name__ - # 获取任务执行异常信息 - status = '0' - exception_info = '' - if event_type == 'JobExecutionEvent' and event.exception: - exception_info = str(event.exception) - status = '1' - if hasattr(event, 'job_id'): - job_id = event.job_id - query_job = cls.get_scheduler_job(job_id=job_id) - if query_job: - query_job_info = query_job.__getstate__() - # 获取任务名称 - job_name = query_job_info.get('name') - # 获取任务组名 - job_group = query_job._jobstore_alias - # 获取任务执行器 - job_executor = query_job_info.get('executor') - # 获取调用目标字符串 - invoke_target = query_job_info.get('func') - # 获取调用函数位置参数 - job_args = ','.join(query_job_info.get('args')) - # 获取调用函数关键字参数 - job_kwargs = json.dumps(query_job_info.get('kwargs')) - # 获取任务触发器 - job_trigger = str(query_job_info.get('trigger')) - # 构造日志消息 - job_message = f'事件类型: {event_type}, 任务ID: {job_id}, 任务名称: {job_name}, 执行于{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}' - job_log = JobLogModel( - jobName=job_name, - jobGroup=job_group, - jobExecutor=job_executor, - invokeTarget=invoke_target, - jobArgs=job_args, - jobKwargs=job_kwargs, - jobTrigger=job_trigger, - jobMessage=job_message, - status=status, - exceptionInfo=exception_info, - createTime=datetime.now(), - ) - session = SessionLocal() - JobLogService.add_job_log_services(session, job_log) - session.close() + """ + 调度器事件监听器,记录任务执行日志 + """ + try: + # 获取事件类型和任务ID + event_type = event.__class__.__name__ + # 获取任务执行异常信息 + status = '0' + exception_info = '' + if event_type == 'JobExecutionEvent' 
and event.exception: + exception_info = str(event.exception) + status = '1' + if hasattr(event, 'job_id'): + job_id = event.job_id + # 跳过内部系统任务(以 _ 开头的任务ID),不记录日志 + if str(job_id).startswith('_'): + return + query_job = cls.get_scheduler_job(job_id=job_id) + if query_job: + query_job_info = query_job.__getstate__() + # 获取任务名称 + job_name = query_job_info.get('name') + # 获取任务组名 + job_group = query_job._jobstore_alias + # 获取任务执行器 + job_executor = query_job_info.get('executor') + # 获取调用目标字符串 + invoke_target = query_job_info.get('func') + # 获取调用函数位置参数(安全处理) + args = query_job_info.get('args') + job_args = ','.join(str(arg) for arg in args) if args else '' + # 获取调用函数关键字参数 + kwargs = query_job_info.get('kwargs') + job_kwargs = json.dumps(kwargs) if kwargs else '{}' + # 获取任务触发器 + job_trigger = str(query_job_info.get('trigger')) + # 构造日志消息 + job_message = f'事件类型: {event_type}, 任务ID: {job_id}, 任务名称: {job_name}, 执行于{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}' + job_log = JobLogModel( + jobName=job_name, + jobGroup=job_group, + jobExecutor=job_executor, + invokeTarget=invoke_target, + jobArgs=job_args, + jobKwargs=job_kwargs, + jobTrigger=job_trigger, + jobMessage=job_message, + status=status, + exceptionInfo=exception_info, + createTime=datetime.now(), + ) + session = cls._get_session_local()() + try: + JobLogService.add_job_log_services(session, job_log) + finally: + session.close() + except Exception as e: + logger.error(f'❌ 调度任务事件监听器异常: {e}') diff --git a/ruoyi-fastapi-backend/module_admin/dao/job_dao.py b/ruoyi-fastapi-backend/module_admin/dao/job_dao.py index 512f87c..52df8e3 100644 --- a/ruoyi-fastapi-backend/module_admin/dao/job_dao.py +++ b/ruoyi-fastapi-backend/module_admin/dao/job_dao.py @@ -97,6 +97,18 @@ async def get_job_list_for_scheduler(cls, db: AsyncSession) -> Sequence[SysJob]: return job_list + @classmethod + async def get_all_job_list_for_scheduler(cls, db: AsyncSession) -> Sequence[SysJob]: + """ + 获取全部定时任务列表信息 + + :param db: orm对象 + :return: 定时任务列表信息对象 + """ + job_list = (await db.execute(select(SysJob).distinct())).scalars().all() + + return job_list + @classmethod async def add_job_dao(cls, db: AsyncSession, job: JobModel) -> SysJob: """ diff --git a/ruoyi-fastapi-backend/module_admin/service/job_service.py b/ruoyi-fastapi-backend/module_admin/service/job_service.py index b785e4a..5e73135 100644 --- a/ruoyi-fastapi-backend/module_admin/service/job_service.py +++ b/ruoyi-fastapi-backend/module_admin/service/job_service.py @@ -83,6 +83,7 @@ async def add_job_services(cls, query_db: AsyncSession, page_object: JobModel) - if job_info.status == '0': SchedulerUtil.add_scheduler_job(job_info=job_info) await query_db.commit() + await SchedulerUtil.request_scheduler_sync() result = {'is_success': True, 'message': '新增成功'} except Exception as e: await query_db.rollback() @@ -144,6 +145,7 @@ async def edit_job_services(cls, query_db: AsyncSession, page_object: EditJobMod job_info = await cls.job_detail_services(query_db, edit_job.get('job_id')) SchedulerUtil.add_scheduler_job(job_info=job_info) await query_db.commit() + await SchedulerUtil.request_scheduler_sync() return CrudResponseModel(is_success=True, message='更新成功') except Exception as e: await query_db.rollback() @@ -183,6 +185,7 @@ async def delete_job_services(cls, query_db: AsyncSession, page_object: DeleteJo await JobDao.delete_job_dao(query_db, JobModel(jobId=job_id)) SchedulerUtil.remove_scheduler_job(job_id=job_id) await query_db.commit() + await SchedulerUtil.request_scheduler_sync() return 
CrudResponseModel(is_success=True, message='删除成功') except Exception as e: await query_db.rollback() diff --git a/ruoyi-fastapi-backend/server.py b/ruoyi-fastapi-backend/server.py index 42bc4bd..d463d9d 100644 --- a/ruoyi-fastapi-backend/server.py +++ b/ruoyi-fastapi-backend/server.py @@ -1,61 +1,134 @@ +import asyncio from collections.abc import AsyncGenerator from contextlib import asynccontextmanager from fastapi import FastAPI +from common.constant import LockConstant from common.router import auto_register_routers from config.env import AppConfig -from config.get_db import init_create_table +from config.get_db import close_async_engine, init_create_table from config.get_redis import RedisUtil from config.get_scheduler import SchedulerUtil from exceptions.handle import handle_exception from middlewares.handle import handle_middleware +from module_admin.service.log_service import LogAggregatorService from sub_applications.handle import handle_sub_applications from utils.common_util import worship from utils.log_util import logger -from utils.server_util import APIDocsUtil, IPUtil +from utils.server_util import APIDocsUtil, IPUtil, StartupUtil + + +async def _start_background_tasks(app: FastAPI) -> None: + """ + 启动应用后台任务 + + :param app: FastAPI对象 + :return: None + """ + await SchedulerUtil.init_system_scheduler(app.state.redis) + app.state.log_aggregator_task = asyncio.create_task(LogAggregatorService.consume_stream(app.state.redis)) + + +async def _stop_background_tasks(app: FastAPI) -> None: + """ + 停止应用后台任务并释放资源 + + :param app: FastAPI对象 + :return: None + """ + log_task = getattr(app.state, 'log_aggregator_task', None) + if log_task: + log_task.cancel() + try: + await log_task + except asyncio.CancelledError: + pass + lock_task = getattr(app.state, 'lock_renewal_task', None) + if lock_task: + lock_task.cancel() + try: + await lock_task + except asyncio.CancelledError: + pass + await RedisUtil.close_redis_pool(app) + await SchedulerUtil.close_system_scheduler() + await close_async_engine() # 生命周期事件 @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: - logger.info(f'⏰️ {AppConfig.app_name}开始启动') - worship() - await init_create_table() - app.state.redis = await RedisUtil.create_redis_pool() - await RedisUtil.init_sys_dict(app.state.redis) - await RedisUtil.init_sys_config(app.state.redis) - await SchedulerUtil.init_system_scheduler() - logger.info(f'🚀 {AppConfig.app_name}启动成功') - host = AppConfig.app_host - port = AppConfig.app_port - if host == '0.0.0.0': - local_ip = IPUtil.get_local_ip() - network_ips = IPUtil.get_network_ips() - else: - local_ip = host - network_ips = [host] - - app_links = [f'🏠 Local: http://{local_ip}:{port}'] - app_links.extend(f'📡 Network: http://{ip}:{port}' for ip in network_ips) - logger.opt(colors=True).info('💻 应用地址:\n' + '\n'.join(app_links)) - - if not AppConfig.app_disable_swagger: - swagger_links = [f'🏠 Local: http://{local_ip}:{port}{APIDocsUtil.docs_url()}'] - swagger_links.extend( - f'📡 Network: http://{ip}:{port}{APIDocsUtil.docs_url()}' for ip in network_ips - ) - logger.opt(colors=True).info('📄 Swagger文档:\n' + '\n'.join(swagger_links)) + """ + 应用生命周期管理 + + :param app: FastAPI对象 + :return: None + """ + app.state.redis = await RedisUtil.create_redis_pool(log_enabled=False) + startup_log_enabled = await StartupUtil.acquire_startup_log_gate( + redis=app.state.redis, + lock_key=LockConstant.APP_STARTUP_LOCK_KEY, + worker_id=SchedulerUtil._worker_id, + lock_expire_seconds=LockConstant.LOCK_EXPIRE_SECONDS, + ) + 
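+    # The gate is a plain Redis SET NX EX lock: the first worker whose SET
+    # succeeds holds the key for LOCK_EXPIRE_SECONDS and owns both the startup
+    # logs and the scheduler; every other worker sees the existing key and
+    # starts quietly.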
app.state.startup_log_enabled = startup_log_enabled - if not AppConfig.app_disable_redoc: - redoc_links = [f'🏠 Local: http://{local_ip}:{port}{APIDocsUtil.redoc_url()}'] - redoc_links.extend( - f'📡 Network: http://{ip}:{port}{APIDocsUtil.redoc_url()}' for ip in network_ips + # 获取锁成功后立即启动锁续期任务,避免初始化时间过长导致锁过期 + if startup_log_enabled: + app.state.lock_renewal_task = StartupUtil.start_lock_renewal( + redis=app.state.redis, + lock_key=LockConstant.APP_STARTUP_LOCK_KEY, + worker_id=SchedulerUtil._worker_id, + lock_expire_seconds=LockConstant.LOCK_EXPIRE_SECONDS, + interval_seconds=LockConstant.LOCK_RENEWAL_INTERVAL, + on_lock_lost=SchedulerUtil.on_lock_lost, ) - logger.opt(colors=True).info('📚 ReDoc文档:\n' + '\n'.join(redoc_links)) + + with logger.contextualize(startup_phase=True, startup_log_enabled=startup_log_enabled): + logger.info(f'⏰️ {AppConfig.app_name}开始启动') + if startup_log_enabled: + worship() + await init_create_table() + await RedisUtil.check_redis_connection(app.state.redis, log_enabled=startup_log_enabled) + await RedisUtil.init_sys_dict(app.state.redis) + await RedisUtil.init_sys_config(app.state.redis) + await _start_background_tasks(app) + + if startup_log_enabled: + # 短暂等待确保下面的启动日志在最后打印 + await asyncio.sleep(0.5) + logger.info(f'🚀 {AppConfig.app_name}启动成功') + host = AppConfig.app_host + port = AppConfig.app_port + if host == '0.0.0.0': + local_ip = IPUtil.get_local_ip() + network_ips = IPUtil.get_network_ips() + else: + local_ip = host + network_ips = [host] + + app_links = [f'🏠 Local: http://{local_ip}:{port}'] + app_links.extend(f'📡 Network: http://{ip}:{port}' for ip in network_ips) + logger.opt(colors=True).info('💻 应用地址:\n' + '\n'.join(app_links)) + + if not AppConfig.app_disable_swagger: + swagger_links = [f'🏠 Local: http://{local_ip}:{port}{APIDocsUtil.docs_url()}'] + swagger_links.extend( + f'📡 Network: http://{ip}:{port}{APIDocsUtil.docs_url()}' for ip in network_ips + ) + logger.opt(colors=True).info('📄 Swagger文档:\n' + '\n'.join(swagger_links)) + + if not AppConfig.app_disable_redoc: + redoc_links = [f'🏠 Local: http://{local_ip}:{port}{APIDocsUtil.redoc_url()}'] + redoc_links.extend( + f'📡 Network: http://{ip}:{port}{APIDocsUtil.redoc_url()}' for ip in network_ips + ) + logger.opt(colors=True).info('📚 ReDoc文档:\n' + '\n'.join(redoc_links)) yield - await RedisUtil.close_redis_pool(app) - await SchedulerUtil.close_system_scheduler() + shutdown_log_enabled = getattr(app.state, 'startup_log_enabled', False) + with logger.contextualize(startup_phase=True, startup_log_enabled=shutdown_log_enabled): + await _stop_background_tasks(app) def create_app() -> FastAPI: diff --git a/ruoyi-fastapi-backend/utils/server_util.py b/ruoyi-fastapi-backend/utils/server_util.py index 873fe89..b868685 100644 --- a/ruoyi-fastapi-backend/utils/server_util.py +++ b/ruoyi-fastapi-backend/utils/server_util.py @@ -1,5 +1,8 @@ +import asyncio import ipaddress +import os import socket +import uuid from collections.abc import Callable import psutil @@ -7,6 +10,7 @@ from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html, get_swagger_ui_oauth2_redirect_html from fastapi.openapi.utils import get_openapi from fastapi.responses import HTMLResponse, JSONResponse +from redis import asyncio as aioredis from config.env import AppConfig @@ -333,6 +337,93 @@ def _register_docs_routes( app.add_route(url, redoc_handler, include_in_schema=False) +class StartupUtil: + """ + 启动门禁工具类 + """ + + @classmethod + async def acquire_startup_log_gate( + cls, redis: aioredis.Redis, lock_key: str, worker_id: str, 
lock_expire_seconds: int + ) -> bool: + """ + 获取启动日志门禁 + + :param redis: Redis连接对象 + :param lock_key: 分布式锁key + :param worker_id: 当前worker标识 + :param lock_expire_seconds: 锁过期时间 + :return: 是否获得启动日志输出权 + """ + acquired = await redis.set(lock_key, worker_id, nx=True, ex=lock_expire_seconds) + if acquired: + return True + current_holder = await redis.get(lock_key) + return current_holder == worker_id + + @classmethod + def start_lock_renewal( + cls, + redis: aioredis.Redis, + lock_key: str, + worker_id: str, + lock_expire_seconds: int, + interval_seconds: int, + on_lock_lost: Callable[[], None] | None = None, + ) -> asyncio.Task: + """ + 启动分布式锁续期任务 + + :param redis: Redis连接对象 + :param lock_key: 分布式锁key + :param worker_id: 当前worker标识 + :param lock_expire_seconds: 锁过期时间 + :param interval_seconds: 续期间隔时间 + :param on_lock_lost: 失去锁时的回调 + :return: 异步任务对象 + """ + + async def _loop() -> None: + while True: + try: + current_holder = await redis.get(lock_key) + if current_holder == worker_id: + await redis.expire(lock_key, lock_expire_seconds) + await asyncio.sleep(interval_seconds) + continue + if on_lock_lost: + on_lock_lost() + break + except Exception: + await asyncio.sleep(interval_seconds) + + return asyncio.create_task(_loop()) + + +class WorkerIdUtil: + """ + Worker标识生成工具类 + """ + + _worker_id: str | None = None + + @classmethod + def get_worker_id(cls, configured_worker_id: str | None) -> str: + """ + 获取当前worker标识 + + :param configured_worker_id: 配置的worker标识 + :return: 当前worker标识 + """ + if cls._worker_id: + return cls._worker_id + worker_id = configured_worker_id + if not worker_id or worker_id.lower() == 'auto': + worker_id = f'{os.getpid()}-{uuid.uuid4().hex[:6]}' + cls._worker_id = worker_id + return worker_id + + class IPUtil: """ IP工具类 From 0b6b041f54a94aeb55c3d29914418fc3650f1f7b Mon Sep 17 00:00:00 2001 From: insistence <3055204202@qq.com> Date: Fri, 6 Feb 2026 09:07:04 +0800 Subject: [PATCH 2/4] =?UTF-8?q?refactor:=20=E9=87=8D=E6=9E=84=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E7=B3=BB=E7=BB=9F=E4=BB=A5=E6=94=AF=E6=8C=81=E5=A4=9A?= =?UTF-8?q?worker?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ruoyi-fastapi-backend/.env.dev | 50 +++- ruoyi-fastapi-backend/.env.dockermy | 50 +++- ruoyi-fastapi-backend/.env.dockerpg | 50 +++- ruoyi-fastapi-backend/.env.prod | 50 +++- .../common/annotation/log_annotation.py | 9 +- ruoyi-fastapi-backend/config/env.py | 39 +++ ruoyi-fastapi-backend/config/get_redis.py | 39 ++- .../middlewares/trace_middleware/ctx.py | 52 +++- .../middlewares/trace_middleware/middle.py | 6 +- .../middlewares/trace_middleware/span.py | 10 +- .../module_admin/service/log_service.py | 230 ++++++++++++++++++ ruoyi-fastapi-backend/utils/log_util.py | 213 +++++++++++++--- 12 files changed, 743 insertions(+), 55 deletions(-) diff --git a/ruoyi-fastapi-backend/.env.dev b/ruoyi-fastapi-backend/.env.dev index 0048481..bcda2e8 100644 --- a/ruoyi-fastapi-backend/.env.dev +++ b/ruoyi-fastapi-backend/.env.dev @@ -13,6 +13,8 @@ APP_PORT = 9099 APP_VERSION= '1.8.1' # 应用是否开启热重载 APP_RELOAD = true +# 应用工作进程数 +APP_WORKERS = 1 # 应用是否开启IP归属区域查询 APP_IP_LOCATION_QUERY = true # 应用是否允许账号同时登录 @@ -67,4 +69,50 @@ REDIS_USERNAME = '' # Redis密码 REDIS_PASSWORD = '' # Redis数据库 -REDIS_DATABASE = 2 \ No newline at end of file +REDIS_DATABASE = 2 + +# -------- 日志配置 -------- +# Redis Stream Key +LOG_STREAM_KEY = 'log:stream' +# Redis Stream 消费组名称 +LOG_STREAM_GROUP = 'log_aggregator' +# Redis Stream 消费者名称前缀 +LOG_STREAM_CONSUMER_PREFIX = 'worker' +# 每次读取的最大消息数量 
+LOG_STREAM_BATCH_SIZE = 100 +# 阻塞读取等待时间(毫秒) +LOG_STREAM_BLOCK_MS = 2000 +# Stream 最大长度(近似裁剪) +LOG_STREAM_MAXLEN = 100000 +# Pending 回收最小空闲时间(毫秒) +LOG_STREAM_CLAIM_IDLE_MS = 60000 +# Pending 回收检查间隔(毫秒) +LOG_STREAM_CLAIM_INTERVAL_MS = 5000 +# 每次回收的最大消息数量 +LOG_STREAM_CLAIM_BATCH_SIZE = 100 +# 去重 Key 过期时间(秒) +LOG_STREAM_DEDUP_TTL = 3600 +# 去重 Key 前缀 +LOG_STREAM_DEDUP_PREFIX = 'log:dedup' +# stdout 输出是否为 JSON +LOGURU_JSON = false +# Loguru 最低输出级别 +LOGURU_LEVEL = 'INFO' +# 是否输出到 stdout +LOGURU_STDOUT = true +# 是否启用文件日志 +LOG_FILE_ENABLED = true +# 文件日志根目录 +LOG_FILE_BASE_DIR = 'logs' +# 文件滚动策略 +LOGURU_ROTATION = '50MB' +# 文件保留策略 +LOGURU_RETENTION = '30 days' +# 文件压缩格式 +LOGURU_COMPRESSION = 'zip' +# 实例标识(用于区分实例) +LOG_INSTANCE_ID = 'dev' +# 服务名称(用于统一标识服务) +LOG_SERVICE_NAME = 'ruoyi-fastapi-backend' +# Worker 标识(auto 自动生成) +LOG_WORKER_ID = 'auto' diff --git a/ruoyi-fastapi-backend/.env.dockermy b/ruoyi-fastapi-backend/.env.dockermy index 7c0796e..008861c 100644 --- a/ruoyi-fastapi-backend/.env.dockermy +++ b/ruoyi-fastapi-backend/.env.dockermy @@ -13,6 +13,8 @@ APP_PORT = 9099 APP_VERSION= '1.8.1' # 应用是否开启热重载 APP_RELOAD = false +# 应用工作进程数 +APP_WORKERS = 1 # 应用是否开启IP归属区域查询 APP_IP_LOCATION_QUERY = true # 应用是否允许账号同时登录 @@ -67,4 +69,50 @@ REDIS_USERNAME = '' # Redis密码 REDIS_PASSWORD = '' # Redis数据库 -REDIS_DATABASE = 2 \ No newline at end of file +REDIS_DATABASE = 2 + +# -------- 日志配置 -------- +# Redis Stream Key +LOG_STREAM_KEY = 'log:stream' +# Redis Stream 消费组名称 +LOG_STREAM_GROUP = 'log_aggregator' +# Redis Stream 消费者名称前缀 +LOG_STREAM_CONSUMER_PREFIX = 'worker' +# 每次读取的最大消息数量 +LOG_STREAM_BATCH_SIZE = 100 +# 阻塞读取等待时间(毫秒) +LOG_STREAM_BLOCK_MS = 2000 +# Stream 最大长度(近似裁剪) +LOG_STREAM_MAXLEN = 100000 +# Pending 回收最小空闲时间(毫秒) +LOG_STREAM_CLAIM_IDLE_MS = 60000 +# Pending 回收检查间隔(毫秒) +LOG_STREAM_CLAIM_INTERVAL_MS = 5000 +# 每次回收的最大消息数量 +LOG_STREAM_CLAIM_BATCH_SIZE = 100 +# 去重 Key 过期时间(秒) +LOG_STREAM_DEDUP_TTL = 3600 +# 去重 Key 前缀 +LOG_STREAM_DEDUP_PREFIX = 'log:dedup' +# stdout 输出是否为 JSON +LOGURU_JSON = false +# Loguru 最低输出级别 +LOGURU_LEVEL = 'INFO' +# 是否输出到 stdout +LOGURU_STDOUT = true +# 是否启用文件日志 +LOG_FILE_ENABLED = true +# 文件日志根目录 +LOG_FILE_BASE_DIR = 'logs' +# 文件滚动策略 +LOGURU_ROTATION = '50MB' +# 文件保留策略 +LOGURU_RETENTION = '30 days' +# 文件压缩格式 +LOGURU_COMPRESSION = 'zip' +# 实例标识(用于区分实例) +LOG_INSTANCE_ID = 'dockermy' +# 服务名称(用于统一标识服务) +LOG_SERVICE_NAME = 'ruoyi-fastapi-backend' +# Worker 标识(auto 自动生成) +LOG_WORKER_ID = 'auto' diff --git a/ruoyi-fastapi-backend/.env.dockerpg b/ruoyi-fastapi-backend/.env.dockerpg index e254e15..a64b08a 100644 --- a/ruoyi-fastapi-backend/.env.dockerpg +++ b/ruoyi-fastapi-backend/.env.dockerpg @@ -13,6 +13,8 @@ APP_PORT = 9099 APP_VERSION= '1.8.1' # 应用是否开启热重载 APP_RELOAD = false +# 应用工作进程数 +APP_WORKERS = 1 # 应用是否开启IP归属区域查询 APP_IP_LOCATION_QUERY = true # 应用是否允许账号同时登录 @@ -67,4 +69,50 @@ REDIS_USERNAME = '' # Redis密码 REDIS_PASSWORD = '' # Redis数据库 -REDIS_DATABASE = 2 \ No newline at end of file +REDIS_DATABASE = 2 + +# -------- 日志配置 -------- +# Redis Stream Key +LOG_STREAM_KEY = 'log:stream' +# Redis Stream 消费组名称 +LOG_STREAM_GROUP = 'log_aggregator' +# Redis Stream 消费者名称前缀 +LOG_STREAM_CONSUMER_PREFIX = 'worker' +# 每次读取的最大消息数量 +LOG_STREAM_BATCH_SIZE = 100 +# 阻塞读取等待时间(毫秒) +LOG_STREAM_BLOCK_MS = 2000 +# Stream 最大长度(近似裁剪) +LOG_STREAM_MAXLEN = 100000 +# Pending 回收最小空闲时间(毫秒) +LOG_STREAM_CLAIM_IDLE_MS = 60000 +# Pending 回收检查间隔(毫秒) +LOG_STREAM_CLAIM_INTERVAL_MS = 5000 +# 每次回收的最大消息数量 +LOG_STREAM_CLAIM_BATCH_SIZE = 100 +# 去重 Key 过期时间(秒) +LOG_STREAM_DEDUP_TTL = 3600 +# 去重 Key 前缀 
+LOG_STREAM_DEDUP_PREFIX = 'log:dedup' +# stdout 输出是否为 JSON +LOGURU_JSON = false +# Loguru 最低输出级别 +LOGURU_LEVEL = 'INFO' +# 是否输出到 stdout +LOGURU_STDOUT = true +# 是否启用文件日志 +LOG_FILE_ENABLED = true +# 文件日志根目录 +LOG_FILE_BASE_DIR = 'logs' +# 文件滚动策略 +LOGURU_ROTATION = '50MB' +# 文件保留策略 +LOGURU_RETENTION = '30 days' +# 文件压缩格式 +LOGURU_COMPRESSION = 'zip' +# 实例标识(用于区分实例) +LOG_INSTANCE_ID = 'dockerpg' +# 服务名称(用于统一标识服务) +LOG_SERVICE_NAME = 'ruoyi-fastapi-backend' +# Worker 标识(auto 自动生成) +LOG_WORKER_ID = 'auto' diff --git a/ruoyi-fastapi-backend/.env.prod b/ruoyi-fastapi-backend/.env.prod index 5237abc..746e35a 100644 --- a/ruoyi-fastapi-backend/.env.prod +++ b/ruoyi-fastapi-backend/.env.prod @@ -13,6 +13,8 @@ APP_PORT = 9099 APP_VERSION= '1.8.1' # 应用是否开启热重载 APP_RELOAD = false +# 应用工作进程数 +APP_WORKERS = 1 # 应用是否开启IP归属区域查询 APP_IP_LOCATION_QUERY = true # 应用是否允许账号同时登录 @@ -67,4 +69,50 @@ REDIS_USERNAME = '' # Redis密码 REDIS_PASSWORD = '' # Redis数据库 -REDIS_DATABASE = 2 \ No newline at end of file +REDIS_DATABASE = 2 + +# -------- 日志配置 -------- +# Redis Stream Key +LOG_STREAM_KEY = 'log:stream' +# Redis Stream 消费组名称 +LOG_STREAM_GROUP = 'log_aggregator' +# Redis Stream 消费者名称前缀 +LOG_STREAM_CONSUMER_PREFIX = 'worker' +# 每次读取的最大消息数量 +LOG_STREAM_BATCH_SIZE = 100 +# 阻塞读取等待时间(毫秒) +LOG_STREAM_BLOCK_MS = 2000 +# Stream 最大长度(近似裁剪) +LOG_STREAM_MAXLEN = 100000 +# Pending 回收最小空闲时间(毫秒) +LOG_STREAM_CLAIM_IDLE_MS = 60000 +# Pending 回收检查间隔(毫秒) +LOG_STREAM_CLAIM_INTERVAL_MS = 5000 +# 每次回收的最大消息数量 +LOG_STREAM_CLAIM_BATCH_SIZE = 100 +# 去重 Key 过期时间(秒) +LOG_STREAM_DEDUP_TTL = 3600 +# 去重 Key 前缀 +LOG_STREAM_DEDUP_PREFIX = 'log:dedup' +# stdout 输出是否为 JSON +LOGURU_JSON = false +# Loguru 最低输出级别 +LOGURU_LEVEL = 'INFO' +# 是否输出到 stdout +LOGURU_STDOUT = true +# 是否启用文件日志 +LOG_FILE_ENABLED = true +# 文件日志根目录 +LOG_FILE_BASE_DIR = 'logs' +# 文件滚动策略 +LOGURU_ROTATION = '50MB' +# 文件保留策略 +LOGURU_RETENTION = '30 days' +# 文件压缩格式 +LOGURU_COMPRESSION = 'zip' +# 实例标识(用于区分实例) +LOG_INSTANCE_ID = 'prod' +# 服务名称(用于统一标识服务) +LOG_SERVICE_NAME = 'ruoyi-fastapi-backend' +# Worker 标识(auto 自动生成) +LOG_WORKER_ID = 'auto' diff --git a/ruoyi-fastapi-backend/common/annotation/log_annotation.py b/ruoyi-fastapi-backend/common/annotation/log_annotation.py index 1ddbe64..1e9ad17 100644 --- a/ruoyi-fastapi-backend/common/annotation/log_annotation.py +++ b/ruoyi-fastapi-backend/common/annotation/log_annotation.py @@ -10,7 +10,6 @@ from async_lru import alru_cache from fastapi import Request from fastapi.responses import JSONResponse, ORJSONResponse, UJSONResponse -from sqlalchemy.ext.asyncio import AsyncSession from starlette.status import HTTP_200_OK from typing_extensions import ParamSpec from user_agents import parse @@ -20,7 +19,7 @@ from config.env import AppConfig from exceptions.exception import LoginException, ServiceException, ServiceWarning from module_admin.entity.vo.log_vo import LogininforModel, OperLogModel -from module_admin.service.log_service import LoginLogService, OperationLogService +from module_admin.service.log_service import LogQueueService from utils.dependency_util import DependencyUtil from utils.log_util import logger from utils.response_util import ResponseUtil @@ -63,8 +62,6 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: request_name_list = get_function_parameters_name_by_type(func, Request) request = get_function_parameters_value_by_name(func, request_name_list[0], *args, **kwargs) DependencyUtil.check_exclude_routes(request, err_msg='当前路由不在认证规则内,不可使用Log装饰器') - session_name_list = get_function_parameters_name_by_type(func, 
AsyncSession) - query_db = get_function_parameters_value_by_name(func, session_name_list[0], *args, **kwargs) request_method = request.method user_agent = request.headers.get('User-Agent') # 获取操作类型 @@ -122,7 +119,7 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: } ) - await LoginLogService.add_login_log_services(query_db, LogininforModel(**login_log)) + await LogQueueService.enqueue_login_log(request, LogininforModel(**login_log), func_path) else: current_user = RequestContext.get_current_user() oper_name = current_user.user.user_name @@ -145,7 +142,7 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: operTime=oper_time, costTime=int(cost_time), ) - await OperationLogService.add_operation_log_services(query_db, operation_log) + await LogQueueService.enqueue_operation_log(request, operation_log, func_path) return result diff --git a/ruoyi-fastapi-backend/config/env.py b/ruoyi-fastapi-backend/config/env.py index 518c068..a0b3064 100644 --- a/ruoyi-fastapi-backend/config/env.py +++ b/ruoyi-fastapi-backend/config/env.py @@ -21,6 +21,7 @@ class AppSettings(BaseSettings): app_port: int = 9099 app_version: str = '1.0.0' app_reload: bool = True + app_workers: int = 1 app_ip_location_query: bool = True app_same_time_login: bool = True app_disable_swagger: bool = False @@ -75,6 +76,36 @@ class RedisSettings(BaseSettings): redis_database: int = 2 +class LogSettings(BaseSettings): + """ + 日志与队列配置 + """ + + log_stream_key: str = 'log:stream' + log_stream_group: str = 'log_aggregator' + log_stream_consumer_prefix: str = 'worker' + log_stream_batch_size: int = 100 + log_stream_block_ms: int = 2000 + log_stream_maxlen: int = 100000 + log_stream_claim_idle_ms: int = 60000 + log_stream_claim_interval_ms: int = 5000 + log_stream_claim_batch_size: int = 100 + log_stream_dedup_ttl: int = 3600 + log_stream_dedup_prefix: str = 'log:dedup' + + loguru_json: bool = False + loguru_level: str = 'INFO' + loguru_stdout: bool = True + log_file_enabled: bool = True + log_file_base_dir: str = 'logs' + loguru_rotation: str = '50MB' + loguru_retention: str = '30 days' + loguru_compression: str = 'zip' + log_instance_id: str = 'prod' + log_service_name: str = 'ruoyi-fastapi-backend' + log_worker_id: str = 'auto' + + class GenSettings: """ 代码生成配置 @@ -184,6 +215,12 @@ def get_redis_config(self) -> RedisSettings: # 实例化Redis配置模型 return RedisSettings() + def get_log_config(self) -> LogSettings: + """ + 获取日志配置 + """ + return LogSettings() + def get_gen_config(self) -> GenSettings: """ 获取代码生成配置 @@ -243,6 +280,8 @@ def parse_cli_args() -> None: DataBaseConfig = get_config.get_database_config() # Redis配置 RedisConfig = get_config.get_redis_config() +# 日志配置 +LogConfig = get_config.get_log_config() # 代码生成配置 GenConfig = get_config.get_gen_config() # 上传配置 diff --git a/ruoyi-fastapi-backend/config/get_redis.py b/ruoyi-fastapi-backend/config/get_redis.py index e477784..559c677 100644 --- a/ruoyi-fastapi-backend/config/get_redis.py +++ b/ruoyi-fastapi-backend/config/get_redis.py @@ -16,13 +16,14 @@ class RedisUtil: """ @classmethod - async def create_redis_pool(cls) -> aioredis.Redis: + async def create_redis_pool(cls, log_enabled: bool = True, log_start_enabled: bool | None = None) -> aioredis.Redis: """ 应用启动时初始化redis连接 + :param log_enabled: 是否输出日志 + :param log_start_enabled: 是否输出开始连接日志 :return: Redis连接对象 """ - logger.info('🔎 开始连接redis...') redis = await aioredis.from_url( url=f'redis://{RedisConfig.redis_host}', port=RedisConfig.redis_port, @@ -32,19 +33,45 @@ async def create_redis_pool(cls) -> aioredis.Redis: 
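+            # decode_responses=True (below) makes redis.get() return str, which is
+            # what the startup-lock helpers rely on when comparing the stored lock
+            # holder against the current worker_id.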
encoding='utf-8', decode_responses=True, ) + if log_start_enabled is None: + log_start_enabled = log_enabled + if log_enabled or log_start_enabled: + await cls.check_redis_connection(redis, log_enabled=log_enabled, log_start_enabled=log_start_enabled) + return redis + + @classmethod + async def check_redis_connection( + cls, redis: aioredis.Redis, log_enabled: bool = True, log_start_enabled: bool | None = None + ) -> None: + """ + 检查redis连接状态 + + :param redis: redis对象 + :param log_enabled: 是否输出日志 + :param log_start_enabled: 是否输出开始连接日志 + :return: None + """ + if log_start_enabled is None: + log_start_enabled = log_enabled + if log_start_enabled: + logger.info('🔎 开始连接redis...') try: connection = await redis.ping() + if not log_enabled: + return if connection: logger.info('✅️ redis连接成功') else: logger.error('❌️ redis连接失败') except AuthenticationError as e: - logger.error(f'❌️ redis用户名或密码错误,详细错误信息:{e}') + if log_enabled: + logger.error(f'❌️ redis用户名或密码错误,详细错误信息:{e}') except RedisTimeoutError as e: - logger.error(f'❌️ redis连接超时,详细错误信息:{e}') + if log_enabled: + logger.error(f'❌️ redis连接超时,详细错误信息:{e}') except RedisError as e: - logger.error(f'❌️ redis连接错误,详细错误信息:{e}') - return redis + if log_enabled: + logger.error(f'❌️ redis连接错误,详细错误信息:{e}') @classmethod async def close_redis_pool(cls, app: FastAPI) -> None: diff --git a/ruoyi-fastapi-backend/middlewares/trace_middleware/ctx.py b/ruoyi-fastapi-backend/middlewares/trace_middleware/ctx.py index 4f98008..f5461d1 100644 --- a/ruoyi-fastapi-backend/middlewares/trace_middleware/ctx.py +++ b/ruoyi-fastapi-backend/middlewares/trace_middleware/ctx.py @@ -1,16 +1,64 @@ import contextvars from uuid import uuid4 +CTX_TRACE_ID: contextvars.ContextVar[str] = contextvars.ContextVar('trace-id', default='') CTX_REQUEST_ID: contextvars.ContextVar[str] = contextvars.ContextVar('request-id', default='') +CTX_SPAN_ID: contextvars.ContextVar[str] = contextvars.ContextVar('span-id', default='') +CTX_REQUEST_PATH: contextvars.ContextVar[str] = contextvars.ContextVar('request-path', default='') +CTX_REQUEST_METHOD: contextvars.ContextVar[str] = contextvars.ContextVar('request-method', default='') class TraceCtx: @staticmethod - def set_id() -> str: + def set_trace_id() -> str: + _id = uuid4().hex + CTX_TRACE_ID.set(_id) + return _id + + @staticmethod + def get_trace_id() -> str: + return CTX_TRACE_ID.get() + + @staticmethod + def set_request_id() -> str: _id = uuid4().hex CTX_REQUEST_ID.set(_id) return _id @staticmethod - def get_id() -> str: + def get_request_id() -> str: return CTX_REQUEST_ID.get() + + @staticmethod + def set_span_id() -> str: + _id = uuid4().hex + CTX_SPAN_ID.set(_id) + return _id + + @staticmethod + def get_span_id() -> str: + return CTX_SPAN_ID.get() + + @staticmethod + def set_request_path(path: str) -> None: + CTX_REQUEST_PATH.set(path) + + @staticmethod + def get_request_path() -> str: + return CTX_REQUEST_PATH.get() + + @staticmethod + def set_request_method(method: str) -> None: + CTX_REQUEST_METHOD.set(method) + + @staticmethod + def get_request_method() -> str: + return CTX_REQUEST_METHOD.get() + + @staticmethod + def clear() -> None: + CTX_TRACE_ID.set('') + CTX_REQUEST_ID.set('') + CTX_SPAN_ID.set('') + CTX_REQUEST_PATH.set('') + CTX_REQUEST_METHOD.set('') diff --git a/ruoyi-fastapi-backend/middlewares/trace_middleware/middle.py b/ruoyi-fastapi-backend/middlewares/trace_middleware/middle.py index 42cd78c..57a9a28 100644 --- a/ruoyi-fastapi-backend/middlewares/trace_middleware/middle.py +++ 
diff --git a/ruoyi-fastapi-backend/middlewares/trace_middleware/middle.py b/ruoyi-fastapi-backend/middlewares/trace_middleware/middle.py
index 42cd78c..57a9a28 100644
--- a/ruoyi-fastapi-backend/middlewares/trace_middleware/middle.py
+++ b/ruoyi-fastapi-backend/middlewares/trace_middleware/middle.py
@@ -2,6 +2,7 @@

 from starlette.types import ASGIApp, Message, Receive, Scope, Send

+from .ctx import TraceCtx
 from .span import Span, get_current_span


@@ -39,4 +40,7 @@ async def handle_outgoing_request(message: 'Message') -> None:
             await span.response(message)
             await send(message)

-        await self.app(scope, handle_outgoing_receive, handle_outgoing_request)
+        try:
+            await self.app(scope, handle_outgoing_receive, handle_outgoing_request)
+        finally:
+            TraceCtx.clear()
diff --git a/ruoyi-fastapi-backend/middlewares/trace_middleware/span.py b/ruoyi-fastapi-backend/middlewares/trace_middleware/span.py
index ef6e46f..ba66000 100644
--- a/ruoyi-fastapi-backend/middlewares/trace_middleware/span.py
+++ b/ruoyi-fastapi-backend/middlewares/trace_middleware/span.py
@@ -19,7 +19,11 @@ async def request_before(self) -> None:
         """
         request_before: 处理header信息等, 如记录请求体信息
         """
-        TraceCtx.set_id()
+        TraceCtx.set_trace_id()
+        TraceCtx.set_request_id()
+        TraceCtx.set_span_id()
+        TraceCtx.set_request_path(self.scope.get('path', ''))
+        TraceCtx.set_request_method(self.scope.get('method', ''))

     async def request_after(self, message: Message) -> Message:
         """
@@ -39,7 +43,9 @@ async def response(self, message: Message) -> Message:
             pass
         """
         if message['type'] == 'http.response.start':
-            message['headers'].append((b'request-id', TraceCtx.get_id().encode()))
+            message['headers'].append((b'request-id', TraceCtx.get_request_id().encode()))
+            message['headers'].append((b'trace-id', TraceCtx.get_trace_id().encode()))
+            message['headers'].append((b'span-id', TraceCtx.get_span_id().encode()))
         return message
diff --git a/ruoyi-fastapi-backend/module_admin/service/log_service.py b/ruoyi-fastapi-backend/module_admin/service/log_service.py
index e411711..edce948 100644
--- a/ruoyi-fastapi-backend/module_admin/service/log_service.py
+++ b/ruoyi-fastapi-backend/module_admin/service/log_service.py
@@ -1,10 +1,19 @@
+import asyncio
+import hashlib
+import json
+import os
+import uuid
 from typing import Any

 from fastapi import Request
+from redis import asyncio as aioredis
 from sqlalchemy.ext.asyncio import AsyncSession

 from common.vo import CrudResponseModel, PageModel
+from config.database import AsyncSessionLocal
+from config.env import LogConfig
 from exceptions.exception import ServiceException
+from middlewares.trace_middleware.ctx import TraceCtx
 from module_admin.dao.log_dao import LoginLogDao, OperationLogDao
 from module_admin.entity.vo.log_vo import (
     DeleteLoginLogModel,
@@ -17,6 +26,7 @@
 )
 from module_admin.service.dict_service import DictDataService
 from utils.excel_util import ExcelUtil
+from utils.log_util import logger


 class OperationLogService:
@@ -261,3 +271,223 @@ async def export_login_log_list_services(login_log_list: list) -> bytes:

         binary_data = ExcelUtil.export_list2excel(login_log_list, mapping_dict)
         return binary_data
+
+
+class LogQueueService:
+    @classmethod
+    def _build_event_id(cls, request_id: str, log_type: str, source: str) -> str:
+        """
+        生成日志事件唯一标识
+
+        :param request_id: 请求唯一标识
+        :param log_type: 日志类型
+        :param source: 日志来源
+        :return: 事件唯一标识
+        """
+        if not request_id:
+            return uuid.uuid4().hex
+        base = f'{request_id}:{log_type}:{source}'
+        return hashlib.md5(base.encode('utf-8')).hexdigest()
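+
+    # A note on the scheme above: hashing (request_id, log_type, source) yields the
+    # same event_id every time the same logical event is re-sent, which is what the
+    # aggregator's SET NX dedup key relies on to drop duplicate deliveries.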
+
+    @classmethod
+    async def _xadd_event(cls, redis: aioredis.Redis, event_type: str, payload: dict, source: str) -> None:
+        """
+        写入日志事件到Redis Streams
+
+        :param redis: Redis连接对象
+        :param event_type: 事件类型
+        :param payload: 事件负载
+        :param source: 日志来源
+        :return: None
+        """
+        request_id = TraceCtx.get_request_id()
+        trace_id = TraceCtx.get_trace_id()
+        span_id = TraceCtx.get_span_id()
+        event_id = cls._build_event_id(request_id, event_type, source)
+        await redis.xadd(
+            LogConfig.log_stream_key,
+            {
+                'event_id': event_id,
+                'event_type': event_type,
+                'request_id': request_id,
+                'trace_id': trace_id,
+                'span_id': span_id,
+                'payload': json.dumps(payload, ensure_ascii=False, default=str),
+            },
+            maxlen=LogConfig.log_stream_maxlen,
+            approximate=True,
+        )
+
+    @classmethod
+    async def enqueue_login_log(cls, request: Request, login_log: LogininforModel, source: str) -> None:
+        """
+        登录日志入队
+
+        :param request: Request对象
+        :param login_log: 登录日志模型
+        :param source: 日志来源
+        :return: None
+        """
+        payload = login_log.model_dump(by_alias=True, exclude_none=True)
+        await cls._xadd_event(request.app.state.redis, 'login', payload, source)
+
+    @classmethod
+    async def enqueue_operation_log(cls, request: Request, operation_log: OperLogModel, source: str) -> None:
+        """
+        操作日志入队
+
+        :param request: Request对象
+        :param operation_log: 操作日志模型
+        :param source: 日志来源
+        :return: None
+        """
+        payload = operation_log.model_dump(by_alias=True, exclude_none=True)
+        await cls._xadd_event(request.app.state.redis, 'operation', payload, source)
+
+
+class LogAggregatorService:
+    @classmethod
+    async def _ensure_group(cls, redis: aioredis.Redis) -> None:
+        """
+        初始化消费组
+
+        :param redis: Redis连接对象
+        :return: None
+        """
+        try:
+            await redis.xgroup_create(
+                name=LogConfig.log_stream_key,
+                groupname=LogConfig.log_stream_group,
+                id='0-0',
+                mkstream=True,
+            )
+        except Exception as exc:
+            if 'BUSYGROUP' not in str(exc):
+                raise
+
+    @classmethod
+    async def _acquire_dedup(cls, redis: aioredis.Redis, event_id: str) -> bool:
+        """
+        获取去重锁
+
+        :param redis: Redis连接对象
+        :param event_id: 事件唯一标识
+        :return: 是否获取成功
+        """
+        if not event_id:
+            return False
+        key = f'{LogConfig.log_stream_dedup_prefix}:{event_id}'
+        # SET NX returns True or None; coerce so the annotated bool always holds
+        return bool(await redis.set(key, '1', nx=True, ex=LogConfig.log_stream_dedup_ttl))
+
+    @classmethod
+    async def _release_dedup(cls, redis: aioredis.Redis, event_id: str) -> None:
+        """
+        释放去重锁
+
+        :param redis: Redis连接对象
+        :param event_id: 事件唯一标识
+        :return: None
+        """
+        if not event_id:
+            return
+        await redis.delete(f'{LogConfig.log_stream_dedup_prefix}:{event_id}')
+
+    @classmethod
+    async def _claim_pending(cls, redis: aioredis.Redis, consumer_name: str) -> None:
+        """
+        认领超时未确认的待处理消息
+
+        :param redis: Redis连接对象
+        :param consumer_name: 消费者名称
+        :return: None
+        """
+        if LogConfig.log_stream_claim_idle_ms <= 0:
+            return
+        start_id = '0-0'
+        while True:
+            result = await redis.xautoclaim(
+                name=LogConfig.log_stream_key,
+                groupname=LogConfig.log_stream_group,
+                consumername=consumer_name,
+                min_idle_time=LogConfig.log_stream_claim_idle_ms,
+                start_id=start_id,
+                count=LogConfig.log_stream_claim_batch_size,
+            )
+            if not result:
+                return
+            next_start_id, messages = result[0], result[1]
+            if messages:
+                await cls._process_messages(redis, LogConfig.log_stream_key, messages)
+            if not messages or next_start_id == start_id:
+                return
+            start_id = next_start_id
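+
+    # _claim_pending above uses XAUTOCLAIM (Redis >= 6.2): entries another consumer
+    # read but never XACKed sit in the group's pending list, and once they have been
+    # idle for log_stream_claim_idle_ms this worker takes them over, so events from
+    # a crashed worker are eventually processed instead of being lost.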
+
+    @classmethod
+    async def consume_stream(cls, redis: aioredis.Redis) -> None:
+        """
+        消费日志队列
+
+        :param redis: Redis连接对象
+        :return: None
+        """
+        await cls._ensure_group(redis)
+        consumer_name = f'{LogConfig.log_stream_consumer_prefix}-{os.getpid()}-{uuid.uuid4().hex[:6]}'
+        last_claim_time = 0.0
+        while True:
+            try:
+                now = asyncio.get_running_loop().time()
+                if now - last_claim_time >= LogConfig.log_stream_claim_interval_ms / 1000:
+                    await cls._claim_pending(redis, consumer_name)
+                    last_claim_time = now
+                result = await redis.xreadgroup(
+                    groupname=LogConfig.log_stream_group,
+                    consumername=consumer_name,
+                    streams={LogConfig.log_stream_key: '>'},
+                    count=LogConfig.log_stream_batch_size,
+                    block=LogConfig.log_stream_block_ms,
+                )
+                if not result:
+                    continue
+                for stream_name, messages in result:
+                    await cls._process_messages(redis, stream_name, messages)
+            except asyncio.CancelledError:
+                raise
+            except Exception as exc:
+                logger.error(f'日志聚合消费异常: {exc}')
+                await asyncio.sleep(1)
+
+    @classmethod
+    async def _process_messages(cls, redis: aioredis.Redis, stream_name: str, messages: list[tuple[str, dict]]) -> None:
+        """
+        处理消息并落库
+
+        :param redis: Redis连接对象
+        :param stream_name: Stream名称
+        :param messages: 消息列表
+        :return: None
+        """
+        if not messages:
+            return
+        async with AsyncSessionLocal() as session:
+            ack_ids: list[str] = []
+            dedup_event_ids: list[str] = []
+            try:
+                for message_id, data in messages:
+                    event_type = data.get('event_type')
+                    event_id = data.get('event_id')
+                    payload_raw = data.get('payload') or '{}'
+                    if event_type not in {'login', 'operation'}:
+                        ack_ids.append(message_id)
+                        continue
+                    acquired = await cls._acquire_dedup(redis, event_id)
+                    if not acquired:
+                        ack_ids.append(message_id)
+                        continue
+                    dedup_event_ids.append(event_id)
+                    payload = json.loads(payload_raw)
+                    if event_type == 'login':
+                        await LoginLogDao.add_login_log_dao(session, LogininforModel(**payload))
+                    elif event_type == 'operation':
+                        await OperationLogDao.add_operation_log_dao(session, OperLogModel(**payload))
+                    ack_ids.append(message_id)
+                if ack_ids:
+                    await session.commit()
+                    await redis.xack(stream_name, LogConfig.log_stream_group, *ack_ids)
+            except Exception:
+                await session.rollback()
+                for event_id in dedup_event_ids:
+                    await cls._release_dedup(redis, event_id)
+                raise
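Taken together, `LogQueueService` and `LogAggregatorService` implement a producer/consumer pipeline on a Redis Stream: web workers XADD log events, a consumer group hands each event to exactly one aggregator, and XACK confirms the database write. The sketch below distills that flow in isolation; it is a minimal runnable sketch against a local Redis, not the service itself, and the names `demo:stream`, `demo_group`, and the localhost URL are illustrative:

import asyncio
import uuid

from redis import asyncio as aioredis


async def main() -> None:
    redis = aioredis.from_url('redis://localhost:6379', decode_responses=True)
    stream, group = 'demo:stream', 'demo_group'
    # Create the consumer group once; BUSYGROUP just means it already exists.
    try:
        await redis.xgroup_create(name=stream, groupname=group, id='0-0', mkstream=True)
    except Exception as exc:
        if 'BUSYGROUP' not in str(exc):
            raise
    # Producer side: XADD with an approximate MAXLEN keeps the stream bounded.
    await redis.xadd(stream, {'event_type': 'login', 'payload': '{}'}, maxlen=1000, approximate=True)
    # Consumer side: '>' requests entries never delivered to this group before.
    consumer = f'worker-{uuid.uuid4().hex[:6]}'
    result = await redis.xreadgroup(
        groupname=group, consumername=consumer, streams={stream: '>'}, count=10, block=1000
    )
    for _stream_name, messages in result or []:
        for message_id, data in messages:
            print(message_id, data)  # stand-in for the database write
            await redis.xack(stream, group, message_id)  # ack only after success
    await redis.aclose()


asyncio.run(main())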
diff --git a/ruoyi-fastapi-backend/utils/log_util.py b/ruoyi-fastapi-backend/utils/log_util.py
index 8996306..0817faf 100644
--- a/ruoyi-fastapi-backend/utils/log_util.py
+++ b/ruoyi-fastapi-backend/utils/log_util.py
@@ -1,59 +1,204 @@
+import json
+import logging
 import os
 import sys
-import time
+from typing import Any

 from loguru import logger as _logger
 from loguru._logger import Logger

+from config.env import AppConfig, LogConfig
 from middlewares.trace_middleware import TraceCtx
+from utils.server_util import WorkerIdUtil
+
+
+class InterceptHandler(logging.Handler):
+    def emit(self, record: logging.LogRecord) -> None:
+        """
+        拦截标准 logging 记录并转发到 Loguru
+
+        :param record: 原生 logging 日志记录
+        :return: None
+        """
+        try:
+            level = _logger.level(record.levelname).name
+        except ValueError:
+            level = record.levelno
+        frame, depth = logging.currentframe(), 2
+        while frame and frame.f_code.co_filename == logging.__file__:
+            frame = frame.f_back
+            depth += 1
+        _logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())


 class LoggerInitializer:
     def __init__(self) -> None:
-        self.log_path = os.path.join(os.getcwd(), 'logs')
-        self.__ensure_log_directory_exists()
-        self.log_path_error = os.path.join(self.log_path, f'{time.strftime("%Y-%m-%d")}_error.log')
+        """
+        初始化日志基础配置与运行时标识
+
+        :return: None
+        """
+        self.worker_id = WorkerIdUtil.get_worker_id(LogConfig.log_worker_id)
+        self.instance_id = LogConfig.log_instance_id
+        self.service_name = LogConfig.log_service_name or AppConfig.app_name
+        self._log_file_enabled = LogConfig.log_file_enabled
+        self._log_base_dir = LogConfig.log_file_base_dir
+        self._ensure_log_directory_exists()
+
+    def _ensure_log_directory_exists(self) -> None:
+        """
+        确保日志目录存在
+
+        :return: None
+        """
+        if not self._log_file_enabled:
+            return
+        if self._log_base_dir and not os.path.exists(self._log_base_dir):
+            os.makedirs(self._log_base_dir, exist_ok=True)
+
+    def _filter(self, record: dict) -> bool:
+        """
+        注入 Trace 上下文并控制启动阶段日志输出
-    def __ensure_log_directory_exists(self) -> None:
+
+        :param record: Loguru 日志记录字典
+        :return: 是否允许输出日志
         """
-        确保日志目录存在,如果不存在则创建
+        record['extra']['trace_id'] = TraceCtx.get_trace_id()
+        record['extra']['request_id'] = TraceCtx.get_request_id()
+        record['extra']['span_id'] = TraceCtx.get_span_id()
+        record['extra']['path'] = TraceCtx.get_request_path()
+        record['extra']['method'] = TraceCtx.get_request_method()
+        record['extra']['worker_id'] = self.worker_id
+        record['extra']['instance_id'] = self.instance_id
+        record['extra']['service'] = self.service_name
+        if record['extra'].get('startup_phase'):
+            return bool(record['extra'].get('startup_log_enabled'))
+        return True
+
+    def _stdout_sink(self, message: Any) -> None:
+        """
+        将 Loguru 日志记录序列化为 JSON 并输出到 stdout
+
+        :param message: Loguru 消息对象
+        :return: None
+        """
+        record = message.record
+        exception = None
+        if record['exception']:
+            exception = {
+                'type': record['exception'].type.__name__ if record['exception'].type else None,
+                'value': str(record['exception'].value) if record['exception'].value else None,
+                'traceback': str(record['exception'].traceback) if record['exception'].traceback else None,
+            }
+        payload = {
+            'timestamp': record['time'].isoformat(),
+            'level': record['level'].name,
+            'message': record['message'],
+            'logger': record['name'],
+            'trace_id': record['extra'].get('trace_id'),
+            'request_id': record['extra'].get('request_id'),
+            'span_id': record['extra'].get('span_id'),
+            'worker_id': record['extra'].get('worker_id'),
+            'instance_id': record['extra'].get('instance_id'),
+            'service': record['extra'].get('service'),
+            'method': record['extra'].get('method'),
+            'path': record['extra'].get('path'),
+            'module': record['module'],
+            'function': record['function'],
+            'line': record['line'],
+            'exception': exception,
+            'extra': record['extra'],
+        }
+        sys.stdout.write(json.dumps(payload, ensure_ascii=False, default=str) + '\n')
+
+    def _info_file_filter(self, record: dict) -> bool:
+        """
+        仅输出 INFO 级别日志到 info 文件
+
+        :param record: Loguru 日志记录字典
+        :return: 是否允许输出日志
         """
-        if not os.path.exists(self.log_path):
-            os.mkdir(self.log_path)
+        return self._filter(record) and record['level'].name == 'INFO'

-    @staticmethod
-    def __filter(log: dict) -> dict:
+    def _error_file_filter(self, record: dict) -> bool:
         """
-        自定义日志过滤器,添加trace_id
+        输出 WARNING 及以上日志到 error 文件
+
+        :param record: Loguru 日志记录字典
+        :return: 是否允许输出日志
+        """
+        return self._filter(record) and record['level'].no >= logging.WARNING
+
+    def _configure_logging(self) -> None:
         """
-        log['trace_id'] = TraceCtx.get_id()
-        return log
+        统一接管标准 logging 与第三方日志输出
+
+        :return: None
+        """
+        logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
+        for logger_name in ('uvicorn', 'uvicorn.error', 'uvicorn.access', 'fastapi'):
+            logging.getLogger(logger_name).handlers = [InterceptHandler()]
+            logging.getLogger(logger_name).propagate = False
+        for logger_name in ('LiteLLM', 'litellm'):
+            logging.getLogger(logger_name).setLevel(logging.WARNING)
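+
+    # With the handlers above swapped in, anything logged through the stdlib logging
+    # module (uvicorn, fastapi, third-party libraries) is re-emitted through Loguru,
+    # so it picks up the trace/worker fields injected by _filter.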

     def init_log(self) -> Logger:
         """
-        初始化日志配置
+        初始化 Loguru 输出与标准 logging 配置
+
+        :return: 已配置的 Loguru Logger 实例
         """
-        # 自定义日志格式
-        format_str = (
-            '{time:YYYY-MM-DD HH:mm:ss.SSS} | '
-            '{trace_id} | '
-            '{level: <8} | '
-            '{name}:{function}:{line} - '
-            '{message}'
-        )
         _logger.remove()
-        # 移除后重新添加sys.stderr, 目的: 控制台输出与文件日志内容和结构一致
-        _logger.add(sys.stderr, filter=self.__filter, format=format_str, enqueue=True)
-        _logger.add(
-            self.log_path_error,
-            filter=self.__filter,
-            format=format_str,
-            rotation='50MB',
-            encoding='utf-8',
-            enqueue=True,
-            compression='zip',
-        )
-
+        info_log_path = os.path.join(self._log_base_dir, '{time:YYYY}', '{time:MM}', '{time:DD}', 'info.log')
+        error_log_path = os.path.join(self._log_base_dir, '{time:YYYY}', '{time:MM}', '{time:DD}', 'error.log')
+        if LogConfig.loguru_stdout:
+            if LogConfig.loguru_json:
+                _logger.add(
+                    self._stdout_sink,
+                    level=LogConfig.loguru_level,
+                    enqueue=True,
+                    filter=self._filter,
+                )
+            else:
+                format_str = (
+                    '{time:YYYY-MM-DD HH:mm:ss.SSS} | '
+                    '{extra[trace_id]} | '
+                    '{extra[span_id]} | '
+                    '{extra[request_id]} | '
+                    '{extra[worker_id]} | '
+                    '{level: <8} | '
+                    '{name}:{function}:{line} - '
+                    '{message}'
+                )
+                _logger.add(
+                    sys.stdout,
+                    level=LogConfig.loguru_level,
+                    enqueue=True,
+                    filter=self._filter,
+                    format=format_str,
+                )
+        if self._log_file_enabled:
+            _logger.add(
+                info_log_path,
+                level='INFO',
+                rotation=LogConfig.loguru_rotation,
+                retention=LogConfig.loguru_retention,
+                compression=LogConfig.loguru_compression,
+                enqueue=True,
+                filter=self._info_file_filter,
+                serialize=LogConfig.loguru_json,
+            )
+            _logger.add(
+                error_log_path,
+                level='WARNING',
+                rotation=LogConfig.loguru_rotation,
+                retention=LogConfig.loguru_retention,
+                compression=LogConfig.loguru_compression,
+                enqueue=True,
+                filter=self._error_file_filter,
+                serialize=LogConfig.loguru_json,
+            )
+        self._configure_logging()
         return _logger

From 710707f04421229046e915b0ddd08138b2d97802 Mon Sep 17 00:00:00 2001
From: insistence <3055204202@qq.com>
Date: Fri, 6 Feb 2026 09:07:15 +0800
Subject: [PATCH 3/4] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8Dlint=E9=94=99?=
 =?UTF-8?q?=E8=AF=AF?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../module_admin/service/captcha_service.py   | 5 +++--
 .../module_admin/service/server_service.py    | 3 ++-
 ruoyi-fastapi-backend/utils/gen_util.py       | 6 +++---
 ruoyi-fastapi-backend/utils/template_util.py  | 2 +-
 4 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/ruoyi-fastapi-backend/module_admin/service/captcha_service.py b/ruoyi-fastapi-backend/module_admin/service/captcha_service.py
index 5187d74..b90caa0 100644
--- a/ruoyi-fastapi-backend/module_admin/service/captcha_service.py
+++ b/ruoyi-fastapi-backend/module_admin/service/captcha_service.py
@@ -1,8 +1,8 @@
 import base64
 import io
-import os
 import random

+import anyio
 from PIL import Image, ImageDraw, ImageFont


@@ -20,7 +20,8 @@ async def create_captcha_image_service(cls) -> list[str, int]:
         draw = ImageDraw.Draw(image)

         # 设置字体
-        font = ImageFont.truetype(os.path.join(os.path.abspath(os.getcwd()), 'assets', 'font', 'Arial.ttf'), size=30)
+        font_path = (await anyio.Path.cwd()) / 'assets' / 'font' / 'Arial.ttf'
+        font = ImageFont.truetype(font_path, size=30)

         # 生成两个0-9之间的随机整数
         num1 = random.randint(0, 9)
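The `anyio.Path` swap works because `anyio.Path` mirrors `pathlib.Path` with async methods and implements `__fspath__`, so its instances can be handed to path-like consumers such as `ImageFont.truetype`. A minimal sketch, with an illustrative path:

import anyio


async def show_font_path() -> None:
    cwd = await anyio.Path.cwd()  # async counterpart of pathlib.Path.cwd()
    font_path = cwd / 'assets' / 'font' / 'Arial.ttf'
    print(font_path)  # an os.PathLike, usable wherever a filename is expected


anyio.run(show_font_path)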
ServerMonitorModel:
         os_name = platform.platform()
         computer_name = platform.node()
         os_arch = platform.machine()
-        user_dir = os.path.abspath(os.getcwd())
+        user_dir = str(await anyio.Path.cwd())
         sys = SysInfo(
             computerIp=computer_ip, computerName=computer_name, osArch=os_arch, osName=os_name, userDir=user_dir
         )
diff --git a/ruoyi-fastapi-backend/utils/gen_util.py b/ruoyi-fastapi-backend/utils/gen_util.py
index c08ceec..9549546 100644
--- a/ruoyi-fastapi-backend/utils/gen_util.py
+++ b/ruoyi-fastapi-backend/utils/gen_util.py
@@ -126,7 +126,7 @@ def get_module_name(cls, package_name: str) -> str:
         param package_name: 包名
         :return: 模块名
         """
-        return package_name.split('.')[-1]
+        return package_name.rsplit('.', maxsplit=1)[-1]

     @classmethod
     def get_business_name(cls, table_name: str) -> str:
@@ -136,7 +136,7 @@ def get_business_name(cls, table_name: str) -> str:
         param table_name: 业务表名
         :return: 业务名
         """
-        return table_name.split('_')[-1]
+        return table_name.rsplit('_', maxsplit=1)[-1]

     @classmethod
     def convert_class_name(cls, table_name: str) -> str:
@@ -186,7 +186,7 @@ def get_db_type(cls, column_type: str) -> str:
         :return: 数据库类型
         """
         if '(' in column_type:
-            return column_type.split('(')[0]
+            return column_type.split('(', maxsplit=1)[0]
         return column_type

     @classmethod
diff --git a/ruoyi-fastapi-backend/utils/template_util.py b/ruoyi-fastapi-backend/utils/template_util.py
index 9983fa5..89cc412 100644
--- a/ruoyi-fastapi-backend/utils/template_util.py
+++ b/ruoyi-fastapi-backend/utils/template_util.py
@@ -293,7 +293,7 @@ def get_db_type(cls, column_type: str) -> str:
         :return: 数据库类型
         """
         if '(' in column_type:
-            return column_type.split('(')[0]
+            return column_type.split('(', maxsplit=1)[0]
         return column_type

     @classmethod

From fa4309f0ec556641d8d4bb2ab1998849341036f3 Mon Sep 17 00:00:00 2001
From: insistence <3055204202@qq.com>
Date: Fri, 6 Feb 2026 09:07:28 +0800
Subject: [PATCH 4/4] =?UTF-8?q?chore:=20=E6=9B=B4=E6=96=B0.gitignore?=
 =?UTF-8?q?=E8=A7=84=E5=88=99?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .gitignore | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/.gitignore b/.gitignore
index 65199d4..220a696 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,7 +1,5 @@
 # Byte-compiled / optimized / DLL files
 __pycache__/
-.idea/
-.vscode/
 *.py[cod]
 *$py.class

@@ -138,3 +136,19 @@ dmypy.json

 # Cython debug symbols
 cython_debug/
+
+# PyCharm
+.idea/
+
+# VSCode
+.vscode/
+
+# AI Editor
+.agent/
+.claude/
+.codebuddy/
+.codex/
+.cursor/
+.opencode/
+.qoder/
+.trae/
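A closing note on the `maxsplit` changes in PATCH 3: when only the first or last segment is needed, bounding the split avoids scanning and allocating every intermediate piece, and for these call sites the result is unchanged. A quick self-contained check (the sample values are illustrative):

# table_name / column_type are illustrative; the gen_util call sites behave identically
table_name = 'sys_user_role'
assert table_name.split('_')[-1] == table_name.rsplit('_', maxsplit=1)[-1] == 'role'

column_type = 'varchar(255)'
assert column_type.split('(')[0] == column_type.split('(', maxsplit=1)[0] == 'varchar'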