diff --git a/.gitignore b/.gitignore
index 65199d4..220a696 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,7 +1,5 @@
# Byte-compiled / optimized / DLL files
__pycache__/
-.idea/
-.vscode/
*.py[cod]
*$py.class
@@ -138,3 +136,19 @@ dmypy.json
# Cython debug symbols
cython_debug/
+
+# PyCharm
+.idea/
+
+# VSCode
+.vscode/
+
+# AI editors
+.agent/
+.claude/
+.codebuddy/
+.codex/
+.cursor/
+.opencode/
+.qoder/
+.trae/
diff --git a/ruoyi-fastapi-backend/.env.dev b/ruoyi-fastapi-backend/.env.dev
index 0048481..bcda2e8 100644
--- a/ruoyi-fastapi-backend/.env.dev
+++ b/ruoyi-fastapi-backend/.env.dev
@@ -13,6 +13,8 @@ APP_PORT = 9099
APP_VERSION = '1.8.1'
# Whether the application enables hot reload
APP_RELOAD = true
+# Number of application worker processes
+APP_WORKERS = 1
# Whether the application enables IP geolocation lookup
APP_IP_LOCATION_QUERY = true
# Whether the application allows the same account to log in concurrently
@@ -67,4 +69,50 @@ REDIS_USERNAME = ''
# Redis password
REDIS_PASSWORD = ''
# Redis database
-REDIS_DATABASE = 2
\ No newline at end of file
+REDIS_DATABASE = 2
+
+# -------- Logging configuration --------
+# Redis Stream key
+LOG_STREAM_KEY = 'log:stream'
+# Redis Stream consumer group name
+LOG_STREAM_GROUP = 'log_aggregator'
+# Redis Stream consumer name prefix
+LOG_STREAM_CONSUMER_PREFIX = 'worker'
+# Maximum number of messages read per batch
+LOG_STREAM_BATCH_SIZE = 100
+# Blocking read timeout (milliseconds)
+LOG_STREAM_BLOCK_MS = 2000
+# Maximum stream length (approximate trimming)
+LOG_STREAM_MAXLEN = 100000
+# Minimum idle time before reclaiming pending messages (milliseconds)
+LOG_STREAM_CLAIM_IDLE_MS = 60000
+# Interval between pending-reclaim checks (milliseconds)
+LOG_STREAM_CLAIM_INTERVAL_MS = 5000
+# Maximum number of messages reclaimed per batch
+LOG_STREAM_CLAIM_BATCH_SIZE = 100
+# Deduplication key TTL (seconds)
+LOG_STREAM_DEDUP_TTL = 3600
+# Deduplication key prefix
+LOG_STREAM_DEDUP_PREFIX = 'log:dedup'
+# Whether stdout output is JSON
+LOGURU_JSON = false
+# Minimum Loguru log level
+LOGURU_LEVEL = 'INFO'
+# Whether to log to stdout
+LOGURU_STDOUT = true
+# Whether to enable file logging
+LOG_FILE_ENABLED = true
+# Root directory for log files
+LOG_FILE_BASE_DIR = 'logs'
+# File rotation policy
+LOGURU_ROTATION = '50MB'
+# File retention policy
+LOGURU_RETENTION = '30 days'
+# File compression format
+LOGURU_COMPRESSION = 'zip'
+# Instance identifier (used to distinguish instances)
+LOG_INSTANCE_ID = 'dev'
+# Service name (uniform identifier for this service)
+LOG_SERVICE_NAME = 'ruoyi-fastapi-backend'
+# Worker identifier ('auto' generates one automatically)
+LOG_WORKER_ID = 'auto'
diff --git a/ruoyi-fastapi-backend/.env.dockermy b/ruoyi-fastapi-backend/.env.dockermy
index 7c0796e..008861c 100644
--- a/ruoyi-fastapi-backend/.env.dockermy
+++ b/ruoyi-fastapi-backend/.env.dockermy
@@ -13,6 +13,8 @@ APP_PORT = 9099
APP_VERSION = '1.8.1'
# Whether the application enables hot reload
APP_RELOAD = false
+# Number of application worker processes
+APP_WORKERS = 1
# Whether the application enables IP geolocation lookup
APP_IP_LOCATION_QUERY = true
# Whether the application allows the same account to log in concurrently
@@ -67,4 +69,50 @@ REDIS_USERNAME = ''
# Redis password
REDIS_PASSWORD = ''
# Redis database
-REDIS_DATABASE = 2
\ No newline at end of file
+REDIS_DATABASE = 2
+
+# -------- Logging configuration --------
+# Redis Stream key
+LOG_STREAM_KEY = 'log:stream'
+# Redis Stream consumer group name
+LOG_STREAM_GROUP = 'log_aggregator'
+# Redis Stream consumer name prefix
+LOG_STREAM_CONSUMER_PREFIX = 'worker'
+# Maximum number of messages read per batch
+LOG_STREAM_BATCH_SIZE = 100
+# Blocking read timeout (milliseconds)
+LOG_STREAM_BLOCK_MS = 2000
+# Maximum stream length (approximate trimming)
+LOG_STREAM_MAXLEN = 100000
+# Minimum idle time before reclaiming pending messages (milliseconds)
+LOG_STREAM_CLAIM_IDLE_MS = 60000
+# Interval between pending-reclaim checks (milliseconds)
+LOG_STREAM_CLAIM_INTERVAL_MS = 5000
+# Maximum number of messages reclaimed per batch
+LOG_STREAM_CLAIM_BATCH_SIZE = 100
+# Deduplication key TTL (seconds)
+LOG_STREAM_DEDUP_TTL = 3600
+# Deduplication key prefix
+LOG_STREAM_DEDUP_PREFIX = 'log:dedup'
+# Whether stdout output is JSON
+LOGURU_JSON = false
+# Minimum Loguru log level
+LOGURU_LEVEL = 'INFO'
+# Whether to log to stdout
+LOGURU_STDOUT = true
+# Whether to enable file logging
+LOG_FILE_ENABLED = true
+# Root directory for log files
+LOG_FILE_BASE_DIR = 'logs'
+# File rotation policy
+LOGURU_ROTATION = '50MB'
+# File retention policy
+LOGURU_RETENTION = '30 days'
+# File compression format
+LOGURU_COMPRESSION = 'zip'
+# Instance identifier (used to distinguish instances)
+LOG_INSTANCE_ID = 'dockermy'
+# Service name (uniform identifier for this service)
+LOG_SERVICE_NAME = 'ruoyi-fastapi-backend'
+# Worker identifier ('auto' generates one automatically)
+LOG_WORKER_ID = 'auto'
diff --git a/ruoyi-fastapi-backend/.env.dockerpg b/ruoyi-fastapi-backend/.env.dockerpg
index e254e15..a64b08a 100644
--- a/ruoyi-fastapi-backend/.env.dockerpg
+++ b/ruoyi-fastapi-backend/.env.dockerpg
@@ -13,6 +13,8 @@ APP_PORT = 9099
APP_VERSION = '1.8.1'
# Whether the application enables hot reload
APP_RELOAD = false
+# Number of application worker processes
+APP_WORKERS = 1
# Whether the application enables IP geolocation lookup
APP_IP_LOCATION_QUERY = true
# Whether the application allows the same account to log in concurrently
@@ -67,4 +69,50 @@ REDIS_USERNAME = ''
# Redis password
REDIS_PASSWORD = ''
# Redis database
-REDIS_DATABASE = 2
\ No newline at end of file
+REDIS_DATABASE = 2
+
+# -------- Logging configuration --------
+# Redis Stream key
+LOG_STREAM_KEY = 'log:stream'
+# Redis Stream consumer group name
+LOG_STREAM_GROUP = 'log_aggregator'
+# Redis Stream consumer name prefix
+LOG_STREAM_CONSUMER_PREFIX = 'worker'
+# Maximum number of messages read per batch
+LOG_STREAM_BATCH_SIZE = 100
+# Blocking read timeout (milliseconds)
+LOG_STREAM_BLOCK_MS = 2000
+# Maximum stream length (approximate trimming)
+LOG_STREAM_MAXLEN = 100000
+# Minimum idle time before reclaiming pending messages (milliseconds)
+LOG_STREAM_CLAIM_IDLE_MS = 60000
+# Interval between pending-reclaim checks (milliseconds)
+LOG_STREAM_CLAIM_INTERVAL_MS = 5000
+# Maximum number of messages reclaimed per batch
+LOG_STREAM_CLAIM_BATCH_SIZE = 100
+# Deduplication key TTL (seconds)
+LOG_STREAM_DEDUP_TTL = 3600
+# Deduplication key prefix
+LOG_STREAM_DEDUP_PREFIX = 'log:dedup'
+# Whether stdout output is JSON
+LOGURU_JSON = false
+# Minimum Loguru log level
+LOGURU_LEVEL = 'INFO'
+# Whether to log to stdout
+LOGURU_STDOUT = true
+# Whether to enable file logging
+LOG_FILE_ENABLED = true
+# Root directory for log files
+LOG_FILE_BASE_DIR = 'logs'
+# File rotation policy
+LOGURU_ROTATION = '50MB'
+# File retention policy
+LOGURU_RETENTION = '30 days'
+# File compression format
+LOGURU_COMPRESSION = 'zip'
+# Instance identifier (used to distinguish instances)
+LOG_INSTANCE_ID = 'dockerpg'
+# Service name (uniform identifier for this service)
+LOG_SERVICE_NAME = 'ruoyi-fastapi-backend'
+# Worker identifier ('auto' generates one automatically)
+LOG_WORKER_ID = 'auto'
diff --git a/ruoyi-fastapi-backend/.env.prod b/ruoyi-fastapi-backend/.env.prod
index 5237abc..746e35a 100644
--- a/ruoyi-fastapi-backend/.env.prod
+++ b/ruoyi-fastapi-backend/.env.prod
@@ -13,6 +13,8 @@ APP_PORT = 9099
APP_VERSION = '1.8.1'
# Whether the application enables hot reload
APP_RELOAD = false
+# Number of application worker processes
+APP_WORKERS = 1
# Whether the application enables IP geolocation lookup
APP_IP_LOCATION_QUERY = true
# Whether the application allows the same account to log in concurrently
@@ -67,4 +69,50 @@ REDIS_USERNAME = ''
# Redis password
REDIS_PASSWORD = ''
# Redis database
-REDIS_DATABASE = 2
\ No newline at end of file
+REDIS_DATABASE = 2
+
+# -------- Logging configuration --------
+# Redis Stream key
+LOG_STREAM_KEY = 'log:stream'
+# Redis Stream consumer group name
+LOG_STREAM_GROUP = 'log_aggregator'
+# Redis Stream consumer name prefix
+LOG_STREAM_CONSUMER_PREFIX = 'worker'
+# Maximum number of messages read per batch
+LOG_STREAM_BATCH_SIZE = 100
+# Blocking read timeout (milliseconds)
+LOG_STREAM_BLOCK_MS = 2000
+# Maximum stream length (approximate trimming)
+LOG_STREAM_MAXLEN = 100000
+# Minimum idle time before reclaiming pending messages (milliseconds)
+LOG_STREAM_CLAIM_IDLE_MS = 60000
+# Interval between pending-reclaim checks (milliseconds)
+LOG_STREAM_CLAIM_INTERVAL_MS = 5000
+# Maximum number of messages reclaimed per batch
+LOG_STREAM_CLAIM_BATCH_SIZE = 100
+# Deduplication key TTL (seconds)
+LOG_STREAM_DEDUP_TTL = 3600
+# Deduplication key prefix
+LOG_STREAM_DEDUP_PREFIX = 'log:dedup'
+# Whether stdout output is JSON
+LOGURU_JSON = false
+# Minimum Loguru log level
+LOGURU_LEVEL = 'INFO'
+# Whether to log to stdout
+LOGURU_STDOUT = true
+# Whether to enable file logging
+LOG_FILE_ENABLED = true
+# Root directory for log files
+LOG_FILE_BASE_DIR = 'logs'
+# File rotation policy
+LOGURU_ROTATION = '50MB'
+# File retention policy
+LOGURU_RETENTION = '30 days'
+# File compression format
+LOGURU_COMPRESSION = 'zip'
+# Instance identifier (used to distinguish instances)
+LOG_INSTANCE_ID = 'prod'
+# Service name (uniform identifier for this service)
+LOG_SERVICE_NAME = 'ruoyi-fastapi-backend'
+# Worker identifier ('auto' generates one automatically)
+LOG_WORKER_ID = 'auto'
diff --git a/ruoyi-fastapi-backend/app.py b/ruoyi-fastapi-backend/app.py
index 2d2072d..b9bc3f3 100644
--- a/ruoyi-fastapi-backend/app.py
+++ b/ruoyi-fastapi-backend/app.py
@@ -12,4 +12,5 @@
port=AppConfig.app_port,
root_path=AppConfig.app_root_path,
reload=AppConfig.app_reload,
+ workers=AppConfig.app_workers,
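+        # note: uvicorn ignores the workers option when reload is enabled, so set APP_RELOAD=false for multi-worker runs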
)
diff --git a/ruoyi-fastapi-backend/common/annotation/log_annotation.py b/ruoyi-fastapi-backend/common/annotation/log_annotation.py
index 1ddbe64..1e9ad17 100644
--- a/ruoyi-fastapi-backend/common/annotation/log_annotation.py
+++ b/ruoyi-fastapi-backend/common/annotation/log_annotation.py
@@ -10,7 +10,6 @@
from async_lru import alru_cache
from fastapi import Request
from fastapi.responses import JSONResponse, ORJSONResponse, UJSONResponse
-from sqlalchemy.ext.asyncio import AsyncSession
from starlette.status import HTTP_200_OK
from typing_extensions import ParamSpec
from user_agents import parse
@@ -20,7 +19,7 @@
from config.env import AppConfig
from exceptions.exception import LoginException, ServiceException, ServiceWarning
from module_admin.entity.vo.log_vo import LogininforModel, OperLogModel
-from module_admin.service.log_service import LoginLogService, OperationLogService
+from module_admin.service.log_service import LogQueueService
from utils.dependency_util import DependencyUtil
from utils.log_util import logger
from utils.response_util import ResponseUtil
@@ -63,8 +62,6 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
request_name_list = get_function_parameters_name_by_type(func, Request)
request = get_function_parameters_value_by_name(func, request_name_list[0], *args, **kwargs)
            DependencyUtil.check_exclude_routes(request, err_msg='The current route is outside the authentication rules, so the Log decorator cannot be used')
- session_name_list = get_function_parameters_name_by_type(func, AsyncSession)
- query_db = get_function_parameters_value_by_name(func, session_name_list[0], *args, **kwargs)
request_method = request.method
user_agent = request.headers.get('User-Agent')
            # Get the operation type
@@ -122,7 +119,7 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
}
)
- await LoginLogService.add_login_log_services(query_db, LogininforModel(**login_log))
+ await LogQueueService.enqueue_login_log(request, LogininforModel(**login_log), func_path)
else:
current_user = RequestContext.get_current_user()
oper_name = current_user.user.user_name
@@ -145,7 +142,7 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
operTime=oper_time,
costTime=int(cost_time),
)
- await OperationLogService.add_operation_log_services(query_db, operation_log)
+ await LogQueueService.enqueue_operation_log(request, operation_log, func_path)
return result
diff --git a/ruoyi-fastapi-backend/common/constant.py b/ruoyi-fastapi-backend/common/constant.py
index da70e15..8323c6a 100644
--- a/ruoyi-fastapi-backend/common/constant.py
+++ b/ruoyi-fastapi-backend/common/constant.py
@@ -133,6 +133,16 @@ class JobConstant:
JOB_WHITE_LIST = ['module_task']
+class LockConstant:
+ """
+    Distributed lock constants
+ """
+
+ APP_STARTUP_LOCK_KEY = 'app:startup:lock'
+ LOCK_EXPIRE_SECONDS = 60
+ LOCK_RENEWAL_INTERVAL = 20
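+    # the renewal interval stays well below the TTL so the holder can renew before the lock expires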
+
+
class MenuConstant:
"""
    Menu constants
diff --git a/ruoyi-fastapi-backend/config/database.py b/ruoyi-fastapi-backend/config/database.py
index 72e0ce4..e526b42 100644
--- a/ruoyi-fastapi-backend/config/database.py
+++ b/ruoyi-fastapi-backend/config/database.py
@@ -1,29 +1,108 @@
from urllib.parse import quote_plus
-from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, create_async_engine
-from sqlalchemy.orm import DeclarativeBase
+from sqlalchemy import Engine, create_engine
+from sqlalchemy.ext.asyncio import AsyncAttrs, AsyncEngine, async_sessionmaker, create_async_engine
+from sqlalchemy.orm import DeclarativeBase, sessionmaker
from config.env import DataBaseConfig
-ASYNC_SQLALCHEMY_DATABASE_URL = (
- f'mysql+asyncmy://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@'
- f'{DataBaseConfig.db_host}:{DataBaseConfig.db_port}/{DataBaseConfig.db_database}'
-)
-if DataBaseConfig.db_type == 'postgresql':
- ASYNC_SQLALCHEMY_DATABASE_URL = (
- f'postgresql+asyncpg://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@'
+
+def build_async_sqlalchemy_database_url() -> str:
+ """
+    Build the async SQLAlchemy database connection URL
+
+    :return: async SQLAlchemy database connection URL
+ """
+ if DataBaseConfig.db_type == 'postgresql':
+ return (
+ f'postgresql+asyncpg://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@'
+ f'{DataBaseConfig.db_host}:{DataBaseConfig.db_port}/{DataBaseConfig.db_database}'
+ )
+ return (
+ f'mysql+asyncmy://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@'
f'{DataBaseConfig.db_host}:{DataBaseConfig.db_port}/{DataBaseConfig.db_database}'
)
-async_engine = create_async_engine(
- ASYNC_SQLALCHEMY_DATABASE_URL,
- echo=DataBaseConfig.db_echo,
- max_overflow=DataBaseConfig.db_max_overflow,
- pool_size=DataBaseConfig.db_pool_size,
- pool_recycle=DataBaseConfig.db_pool_recycle,
- pool_timeout=DataBaseConfig.db_pool_timeout,
-)
-AsyncSessionLocal = async_sessionmaker(autocommit=False, autoflush=False, bind=async_engine)
+
+ASYNC_SQLALCHEMY_DATABASE_URL = build_async_sqlalchemy_database_url()
+
+
+def build_sync_sqlalchemy_database_url() -> str:
+ """
+    Build the sync SQLAlchemy database connection URL
+
+    :return: sync SQLAlchemy database connection URL
+ """
+ if DataBaseConfig.db_type == 'postgresql':
+ return (
+ f'postgresql+psycopg2://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@'
+ f'{DataBaseConfig.db_host}:{DataBaseConfig.db_port}/{DataBaseConfig.db_database}'
+ )
+ return (
+ f'mysql+pymysql://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@'
+ f'{DataBaseConfig.db_host}:{DataBaseConfig.db_port}/{DataBaseConfig.db_database}'
+ )
+
+
+SYNC_SQLALCHEMY_DATABASE_URL = build_sync_sqlalchemy_database_url()
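+# the sync URL backs APScheduler's SQLAlchemyJobStore and other blocking consumers (see get_scheduler.py)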
+
+
+def create_async_db_engine(echo: bool | None = None) -> AsyncEngine:
+ """
+    Create an async SQLAlchemy Engine
+
+    :param echo: optional, whether to log SQLAlchemy SQL statements
+    :return: async SQLAlchemy Engine
+ """
+ return create_async_engine(
+ ASYNC_SQLALCHEMY_DATABASE_URL,
+ echo=DataBaseConfig.db_echo if echo is None else echo,
+ max_overflow=DataBaseConfig.db_max_overflow,
+ pool_size=DataBaseConfig.db_pool_size,
+ pool_recycle=DataBaseConfig.db_pool_recycle,
+ pool_timeout=DataBaseConfig.db_pool_timeout,
+ )
+
+
+def create_sync_db_engine(echo: bool | None = None) -> Engine:
+ """
+    Create a sync SQLAlchemy Engine
+
+    :param echo: optional, whether to log SQLAlchemy SQL statements
+    :return: sync SQLAlchemy Engine
+ """
+ return create_engine(
+ SYNC_SQLALCHEMY_DATABASE_URL,
+ echo=DataBaseConfig.db_echo if echo is None else echo,
+ max_overflow=DataBaseConfig.db_max_overflow,
+ pool_size=DataBaseConfig.db_pool_size,
+ pool_recycle=DataBaseConfig.db_pool_recycle,
+ pool_timeout=DataBaseConfig.db_pool_timeout,
+ )
+
+
+def create_async_session_local(engine: AsyncEngine) -> async_sessionmaker:
+ """
+    Create an async Session factory
+
+    :param engine: async SQLAlchemy Engine
+    :return: async Session factory
+ """
+ return async_sessionmaker(autocommit=False, autoflush=False, bind=engine)
+
+
+def create_sync_session_local(engine: Engine) -> sessionmaker:
+ """
+    Create a sync Session factory
+
+    :param engine: sync SQLAlchemy Engine
+    :return: sync Session factory
+ """
+ return sessionmaker(autocommit=False, autoflush=False, bind=engine)
+
+
+async_engine = create_async_db_engine()
+AsyncSessionLocal = create_async_session_local(async_engine)
class Base(AsyncAttrs, DeclarativeBase):
diff --git a/ruoyi-fastapi-backend/config/env.py b/ruoyi-fastapi-backend/config/env.py
index 518c068..a0b3064 100644
--- a/ruoyi-fastapi-backend/config/env.py
+++ b/ruoyi-fastapi-backend/config/env.py
@@ -21,6 +21,7 @@ class AppSettings(BaseSettings):
app_port: int = 9099
app_version: str = '1.0.0'
app_reload: bool = True
+ app_workers: int = 1
app_ip_location_query: bool = True
app_same_time_login: bool = True
app_disable_swagger: bool = False
@@ -75,6 +76,36 @@ class RedisSettings(BaseSettings):
redis_database: int = 2
+class LogSettings(BaseSettings):
+ """
+    Logging and queue configuration
+ """
+
+ log_stream_key: str = 'log:stream'
+ log_stream_group: str = 'log_aggregator'
+ log_stream_consumer_prefix: str = 'worker'
+ log_stream_batch_size: int = 100
+ log_stream_block_ms: int = 2000
+ log_stream_maxlen: int = 100000
+ log_stream_claim_idle_ms: int = 60000
+ log_stream_claim_interval_ms: int = 5000
+ log_stream_claim_batch_size: int = 100
+ log_stream_dedup_ttl: int = 3600
+ log_stream_dedup_prefix: str = 'log:dedup'
+
+ loguru_json: bool = False
+ loguru_level: str = 'INFO'
+ loguru_stdout: bool = True
+ log_file_enabled: bool = True
+ log_file_base_dir: str = 'logs'
+ loguru_rotation: str = '50MB'
+ loguru_retention: str = '30 days'
+ loguru_compression: str = 'zip'
+ log_instance_id: str = 'prod'
+ log_service_name: str = 'ruoyi-fastapi-backend'
+ log_worker_id: str = 'auto'
+
+
class GenSettings:
"""
    Code generation configuration
@@ -184,6 +215,12 @@ def get_redis_config(self) -> RedisSettings:
        # Instantiate the Redis settings model
return RedisSettings()
+ def get_log_config(self) -> LogSettings:
+ """
+        Get the logging configuration
+ """
+ return LogSettings()
+
def get_gen_config(self) -> GenSettings:
"""
        Get the code generation configuration
@@ -243,6 +280,8 @@ def parse_cli_args() -> None:
DataBaseConfig = get_config.get_database_config()
# Redis configuration
RedisConfig = get_config.get_redis_config()
+# Logging configuration
+LogConfig = get_config.get_log_config()
# Code generation configuration
GenConfig = get_config.get_gen_config()
# Upload configuration
diff --git a/ruoyi-fastapi-backend/config/get_db.py b/ruoyi-fastapi-backend/config/get_db.py
index 16df0ab..43f838a 100644
--- a/ruoyi-fastapi-backend/config/get_db.py
+++ b/ruoyi-fastapi-backend/config/get_db.py
@@ -26,3 +26,12 @@ async def init_create_table() -> None:
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
        logger.info('✅️ Database connection succeeded')
+
+
+async def close_async_engine() -> None:
+ """
+    Dispose of the database connection pool on application shutdown
+
+ :return:
+ """
+ await async_engine.dispose()
diff --git a/ruoyi-fastapi-backend/config/get_redis.py b/ruoyi-fastapi-backend/config/get_redis.py
index e477784..559c677 100644
--- a/ruoyi-fastapi-backend/config/get_redis.py
+++ b/ruoyi-fastapi-backend/config/get_redis.py
@@ -16,13 +16,14 @@ class RedisUtil:
"""
@classmethod
- async def create_redis_pool(cls) -> aioredis.Redis:
+ async def create_redis_pool(cls, log_enabled: bool = True, log_start_enabled: bool | None = None) -> aioredis.Redis:
"""
        Initialize the Redis connection at application startup
+        :param log_enabled: whether to log the connection result
+        :param log_start_enabled: whether to log the initial "connecting" message
        :return: Redis connection object
        """
-        logger.info('🔎 Connecting to Redis...')
redis = await aioredis.from_url(
url=f'redis://{RedisConfig.redis_host}',
port=RedisConfig.redis_port,
@@ -32,19 +33,45 @@ async def create_redis_pool(cls) -> aioredis.Redis:
encoding='utf-8',
decode_responses=True,
)
+ if log_start_enabled is None:
+ log_start_enabled = log_enabled
+ if log_enabled or log_start_enabled:
+ await cls.check_redis_connection(redis, log_enabled=log_enabled, log_start_enabled=log_start_enabled)
+ return redis
+
+ @classmethod
+ async def check_redis_connection(
+ cls, redis: aioredis.Redis, log_enabled: bool = True, log_start_enabled: bool | None = None
+ ) -> None:
+ """
+        Check the Redis connection status
+
+        :param redis: Redis client object
+        :param log_enabled: whether to log the connection result
+        :param log_start_enabled: whether to log the initial "connecting" message
+ :return: None
+ """
+ if log_start_enabled is None:
+ log_start_enabled = log_enabled
+ if log_start_enabled:
+            logger.info('🔎 Connecting to Redis...')
try:
connection = await redis.ping()
+ if not log_enabled:
+ return
if connection:
                logger.info('✅️ Redis connection succeeded')
            else:
                logger.error('❌️ Redis connection failed')
except AuthenticationError as e:
-            logger.error(f'❌️ Redis username or password is incorrect, details: {e}')
+            if log_enabled:
+                logger.error(f'❌️ Redis username or password is incorrect, details: {e}')
except RedisTimeoutError as e:
-            logger.error(f'❌️ Redis connection timed out, details: {e}')
+            if log_enabled:
+                logger.error(f'❌️ Redis connection timed out, details: {e}')
except RedisError as e:
-            logger.error(f'❌️ Redis connection error, details: {e}')
-        return redis
+            if log_enabled:
+                logger.error(f'❌️ Redis connection error, details: {e}')
@classmethod
async def close_redis_pool(cls, app: FastAPI) -> None:
diff --git a/ruoyi-fastapi-backend/config/get_scheduler.py b/ruoyi-fastapi-backend/config/get_scheduler.py
index 28045c0..a6c2598 100644
--- a/ruoyi-fastapi-backend/config/get_scheduler.py
+++ b/ruoyi-fastapi-backend/config/get_scheduler.py
@@ -1,3 +1,4 @@
+import asyncio
import importlib
import json
from asyncio import iscoroutinefunction
@@ -16,16 +17,25 @@
from apscheduler.triggers.combining import OrTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
-from sqlalchemy.engine import create_engine
-from sqlalchemy.orm import sessionmaker
+from redis import asyncio as aioredis
+from sqlalchemy.engine import Engine
+from sqlalchemy.ext.asyncio import AsyncEngine
import module_task # noqa: F401
-from config.database import AsyncSessionLocal, quote_plus
-from config.env import DataBaseConfig, RedisConfig
+from common.constant import LockConstant
+from config.database import (
+ SYNC_SQLALCHEMY_DATABASE_URL,
+ create_async_db_engine,
+ create_async_session_local,
+ create_sync_db_engine,
+ create_sync_session_local,
+)
+from config.env import LogConfig, RedisConfig
from module_admin.dao.job_dao import JobDao
from module_admin.entity.vo.job_vo import JobLogModel, JobModel
from module_admin.service.job_log_service import JobLogService
from utils.log_util import logger
+from utils.server_util import StartupUtil, WorkerIdUtil
# Custom cron trigger
@@ -86,24 +96,6 @@ def __find_recent_workday(cls, day: int) -> int:
diff += 1
-SQLALCHEMY_DATABASE_URL = (
- f'mysql+pymysql://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@'
- f'{DataBaseConfig.db_host}:{DataBaseConfig.db_port}/{DataBaseConfig.db_database}'
-)
-if DataBaseConfig.db_type == 'postgresql':
- SQLALCHEMY_DATABASE_URL = (
- f'postgresql+psycopg2://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@'
- f'{DataBaseConfig.db_host}:{DataBaseConfig.db_port}/{DataBaseConfig.db_database}'
- )
-engine = create_engine(
- SQLALCHEMY_DATABASE_URL,
- echo=DataBaseConfig.db_echo,
- max_overflow=DataBaseConfig.db_max_overflow,
- pool_size=DataBaseConfig.db_pool_size,
- pool_recycle=DataBaseConfig.db_pool_recycle,
- pool_timeout=DataBaseConfig.db_pool_timeout,
-)
-SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
redis_config = {
'host': RedisConfig.redis_host,
'port': RedisConfig.redis_port,
@@ -111,15 +103,9 @@ def __find_recent_workday(cls, day: int) -> int:
'password': RedisConfig.redis_password,
'db': RedisConfig.redis_database,
}
-job_stores = {
- 'default': MemoryJobStore(),
- 'sqlalchemy': SQLAlchemyJobStore(url=SQLALCHEMY_DATABASE_URL, engine=engine),
- 'redis': RedisJobStore(**redis_config),
-}
executors = {'default': AsyncIOExecutor(), 'processpool': ProcessPoolExecutor(5)}
job_defaults = {'coalesce': False, 'max_instance': 1}
scheduler = AsyncIOScheduler()
-scheduler.configure(jobstores=job_stores, executors=executors, job_defaults=job_defaults)
class SchedulerUtil:
@@ -127,23 +113,608 @@ class SchedulerUtil:
    Scheduled-task related methods
"""
+    # Class-level state for the distributed lock
+ _is_leader: bool = False
+ _worker_id: str = WorkerIdUtil.get_worker_id(LogConfig.log_worker_id)
+ _redis: aioredis.Redis | None = None
+ _job_update_time_cache: dict[str, datetime] = {}
+ _sync_channel: str = 'scheduler:sync:request'
+ _sync_listener_task: asyncio.Task | None = None
+ _lock_lost_task: asyncio.Task | None = None
+ _sync_task: asyncio.Task | None = None
+ _sync_pending: bool = False
+ _sync_lock: asyncio.Lock = asyncio.Lock()
+ _last_sync_at: datetime | None = None
+ _sync_debounce_seconds: float = 0.5
+ _sync_min_interval_seconds: float = 2.0
+ _reacquire_task: asyncio.Task | None = None
+ _reacquire_interval_seconds: float = 5.0
+ _sync_async_engine: AsyncEngine | None = None
+ _sync_async_sessionmaker: Any | None = None
+ _disposed_sync_engines: bool = False
+
+    # Lazily created sync Engine and SessionLocal
+ _jobstore_engine: Engine | None = None
+ _listener_engine: Engine | None = None
+ _session_local: Any | None = None
+ _scheduler_configured: bool = False
+
+ @classmethod
+ def _get_jobstore_engine(cls) -> Engine:
+ """
+        Lazily create the sync Engine used by the jobstore
+
+        :return: sync Engine
+ """
+ if cls._jobstore_engine is None:
+ cls._jobstore_engine = create_sync_db_engine(echo=False)
+ return cls._jobstore_engine
+
+ @classmethod
+ def _get_listener_engine(cls) -> Engine:
+ """
+        Lazily create the sync Engine used by the event listener
+
+        :return: sync Engine
+ """
+ if cls._listener_engine is None:
+ cls._listener_engine = create_sync_db_engine()
+ return cls._listener_engine
+
+ @classmethod
+ def _get_session_local(cls) -> Any:
+ """
+        Lazily create the sync SessionLocal
+
+ :return: SessionLocal
+ """
+ if cls._session_local is None:
+ cls._session_local = create_sync_session_local(cls._get_listener_engine())
+ return cls._session_local
+
+ @classmethod
+ def _configure_scheduler(cls) -> None:
+ """
+        Configure the scheduler (lazily creating jobstores)
+
+ :return: None
+ """
+ if cls._scheduler_configured:
+ return
+ job_stores = {
+ 'default': MemoryJobStore(),
+ 'sqlalchemy': SQLAlchemyJobStore(url=SYNC_SQLALCHEMY_DATABASE_URL, engine=cls._get_jobstore_engine()),
+ 'redis': RedisJobStore(**redis_config),
+ }
+ scheduler.configure(jobstores=job_stores, executors=executors, job_defaults=job_defaults)
+ cls._scheduler_configured = True
+
@classmethod
- async def init_system_scheduler(cls) -> None:
+ async def init_system_scheduler(cls, redis: aioredis.Redis) -> None:
"""
-        Initialize scheduled tasks at application startup
+        Initialize scheduled tasks at application startup (a distributed lock ensures only one worker starts the scheduler)
+        :param redis: Redis connection object
:return:
"""
-        logger.info('🔎 Starting scheduled tasks...')
+        cls._redis = redis
+        logger.info(f'🔎 Worker {cls._worker_id} attempting to acquire the Application lock...')
+
+ acquired = await StartupUtil.acquire_startup_log_gate(
+ redis=redis,
+ lock_key=LockConstant.APP_STARTUP_LOCK_KEY,
+ worker_id=cls._worker_id,
+ lock_expire_seconds=LockConstant.LOCK_EXPIRE_SECONDS,
+ )
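+        # only the worker that wins the startup gate becomes the scheduler leader; the others stay on standby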
+
+ if acquired:
+ await cls._start_scheduler_as_leader(redis)
+ else:
+ cls._is_leader = False
+            logger.info(f'⏸️ Worker {cls._worker_id} does not hold the Application lock, skipping scheduler startup')
+
+ @classmethod
+ async def _start_scheduler_as_leader(cls, redis: aioredis.Redis) -> None:
+ """
+        Start the scheduler as leader (internal method; the caller must already hold the lock)
+
+        :param redis: Redis connection object
+ :return: None
+ """
+ cls._is_leader = True
+ cls._disposed_sync_engines = False
+        logger.info(f'🎯 Worker {cls._worker_id} holds the Application lock, starting scheduled tasks...')
+        # lazily configure the scheduler
+ cls._configure_scheduler()
scheduler.start()
- async with AsyncSessionLocal() as session:
+
+        # Load scheduled jobs from the database
+ async with cls._get_sync_async_session() as session:
job_list = await JobDao.get_job_list_for_scheduler(session)
for item in job_list:
- cls.remove_scheduler_job(job_id=str(item.job_id))
- cls.add_scheduler_job(item)
+ cls._add_job_to_scheduler(item)
+
+        # Register the event listener
scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL)
+
+        # Add a job-state sync job (re-syncs job state from the database every 30 seconds)
+        scheduler.add_job(
+            func=cls.request_scheduler_sync,
+            trigger='interval',
+            seconds=30,
+            id='_scheduler_job_sync',
+            name='Scheduler job sync',
+ replace_existing=True,
+ )
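+        # the leading underscore in the job id keeps this internal job out of the event-listener job logs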
+ cls._sync_listener_task = asyncio.create_task(cls._listen_sync_channel(redis))
+
        logger.info('✅️ Initial system scheduled jobs loaded')
+ @classmethod
+ def on_lock_lost(cls) -> None:
+ """
+        Entry point for handling loss of the lock
+
+ :return: None
+ """
+ if not cls._is_leader:
+ return
+ cls._is_leader = False
+        logger.warning(f'⚠️ Worker {cls._worker_id} lost the Application lock')
+ if cls._lock_lost_task:
+ cls._lock_lost_task.cancel()
+ cls._lock_lost_task = asyncio.create_task(cls._handle_lock_lost())
+
+ @classmethod
+ async def _handle_lock_lost(cls) -> None:
+ """
+        Release resources after the lock is lost
+
+ :return: None
+ """
+ if cls._sync_listener_task:
+ cls._sync_listener_task.cancel()
+ try:
+ await cls._sync_listener_task
+ except asyncio.CancelledError:
+ pass
+ cls._sync_listener_task = None
+ if cls._sync_task:
+ cls._sync_task.cancel()
+ try:
+ await cls._sync_task
+ except asyncio.CancelledError:
+ pass
+ cls._sync_task = None
+ cls._sync_pending = False
+ if getattr(scheduler, 'running', False):
+ scheduler.shutdown()
+ await cls._dispose_sync_async_engine()
+ cls._dispose_sync_engines()
+ cls._ensure_reacquire_task()
+
+ @classmethod
+ async def _sync_jobs_from_database(cls) -> None:
+ """
+        Synchronize job state from the database so it stays consistent across workers
+ """
+ if not cls._is_leader:
+ return
+
+ try:
+ async with cls._get_sync_async_session() as session:
+ db_jobs_all = await JobDao.get_all_job_list_for_scheduler(session)
+ db_jobs_enabled = [job for job in db_jobs_all if job.status == '0']
+ db_enabled_ids = {str(job.job_id) for job in db_jobs_enabled}
+ db_job_map = {str(job.job_id): job for job in db_jobs_enabled}
+ db_job_update_time_map = {
+ str(job.job_id): job.update_time for job in db_jobs_enabled if job.update_time is not None
+ }
+ scheduler_jobs = scheduler.get_jobs()
+ scheduler_job_map = {job.id: job for job in scheduler_jobs if not job.id.startswith('_')}
+ scheduler_job_ids = set(scheduler_job_map.keys())
+
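+                # three-way reconciliation: remove jobs disabled or deleted in the DB, add new ones, update changed ones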
+ jobs_to_remove = scheduler_job_ids - db_enabled_ids
+ for job_id in jobs_to_remove:
+ scheduler.remove_job(job_id=job_id)
+                    logger.info(f'🗑️ Sync removed job: {job_id}')
+ cls._refresh_job_update_cache(job_id, None)
+
+ jobs_to_add = db_enabled_ids - scheduler_job_ids
+ for job_id in jobs_to_add:
+ job_info = db_job_map.get(job_id)
+ if job_info:
+ cls._add_job_to_scheduler(job_info)
+                        logger.info(f'➕ Sync added job: {job_info.job_name}')
+ cls._refresh_job_update_cache(job_id, job_info.update_time)
+
+ jobs_to_update = db_enabled_ids & scheduler_job_ids
+ for job_id in jobs_to_update:
+ job_info = db_job_map.get(job_id)
+ scheduler_job = scheduler_job_map.get(job_id)
+ job_update_time = db_job_update_time_map.get(job_id)
+ cls._sync_update_job(job_id, job_info, scheduler_job, job_update_time)
+
+ except Exception as e:
+            logger.error(f'❌ Job sync error: {e}')
+
+ @classmethod
+ def _is_job_config_in_sync(cls, scheduler_job: Job, job_info: JobModel) -> bool:
+ """
+        Determine whether a job's configuration matches the database
+
+        :param scheduler_job: scheduler job object
+        :param job_info: database job object
+        :return: whether they match
+ """
+ job_state = scheduler_job.__getstate__()
+ job_kwargs = json.loads(job_info.job_kwargs) if job_info.job_kwargs else None
+ job_args = job_info.job_args.split(',') if job_info.job_args else None
+ job_executor = job_info.job_executor
+ if iscoroutinefunction(cls._import_function(job_info.invoke_target)):
+ job_executor = 'default'
+ expected = {
+ 'name': job_info.job_name,
+ 'executor': job_executor,
+ 'jobstore': job_info.job_group,
+ 'misfire_grace_time': 1000000000000 if job_info.misfire_policy == '3' else None,
+ 'coalesce': job_info.misfire_policy == '2',
+ 'max_instances': 3 if job_info.concurrent == '0' else 1,
+ 'trigger': str(MyCronTrigger.from_crontab(job_info.cron_expression)),
+ 'args': tuple(job_args) if job_args else None,
+ 'kwargs': job_kwargs if job_kwargs else None,
+ 'func': str(cls._import_function(job_info.invoke_target)),
+ }
+ current = {
+ 'name': job_state.get('name'),
+ 'executor': job_state.get('executor'),
+ 'jobstore': scheduler_job._jobstore_alias,
+ 'misfire_grace_time': job_state.get('misfire_grace_time'),
+ 'coalesce': job_state.get('coalesce'),
+ 'max_instances': job_state.get('max_instances'),
+ 'trigger': str(job_state.get('trigger')),
+ 'args': job_state.get('args'),
+ 'kwargs': job_state.get('kwargs'),
+ 'func': str(job_state.get('func')),
+ }
+ return expected == current
+
+ @classmethod
+ def _sync_update_job(
+ cls, job_id: str, job_info: JobModel | None, scheduler_job: Job | None, job_update_time: datetime | None
+ ) -> None:
+ """
+        Synchronize an updated job configuration
+
+        :param job_id: job ID
+        :param job_info: database job object
+        :param scheduler_job: scheduler job object
+        :param job_update_time: job update time
+ :return: None
+ """
+ if not job_info or not scheduler_job:
+ return
+ if cls._should_skip_job_update(job_id, job_update_time):
+ return
+ if not cls._is_job_config_in_sync(scheduler_job, job_info):
+ scheduler.remove_job(job_id=job_id)
+ cls._add_job_to_scheduler(job_info)
+            logger.info(f'♻️ Sync updated job: {job_info.job_name}')
+ cls._refresh_job_update_cache(job_id, job_update_time)
+
+ @classmethod
+ def _should_skip_job_update(cls, job_id: str, job_update_time: datetime | None) -> bool:
+ """
+        Determine whether to skip the sync update
+
+        :param job_id: job ID
+        :param job_update_time: job update time
+        :return: whether to skip
+ """
+ if job_update_time is None:
+ return False
+ return cls._job_update_time_cache.get(job_id) == job_update_time
+
+ @classmethod
+ def _refresh_job_update_cache(cls, job_id: str, job_update_time: datetime | None) -> None:
+ """
+        Refresh the job update-time cache
+
+        :param job_id: job ID
+        :param job_update_time: job update time
+ :return: None
+ """
+ if job_update_time is not None:
+ cls._job_update_time_cache[job_id] = job_update_time
+ else:
+ cls._job_update_time_cache.pop(job_id, None)
+
+ @classmethod
+ async def request_scheduler_sync(cls) -> None:
+ """
+        Request a scheduler job-state sync
+
+ :return: None
+ """
+ if cls._is_leader:
+ cls._sync_pending = True
+ cls._ensure_sync_task()
+ return
+ if cls._redis:
+ await cls._redis.publish(cls._sync_channel, cls._worker_id)
+
+ @classmethod
+ def _ensure_sync_task(cls) -> None:
+ """
+        Start the sync task if it is not already running
+
+ :return: None
+ """
+ if cls._sync_task and not cls._sync_task.done():
+ return
+ cls._sync_task = asyncio.create_task(cls._run_sync_loop())
+
+ @classmethod
+ def _get_sync_async_session(cls) -> Any:
+ """
+        Get the async Session used by the sync task
+
+        :return: async Session
+ """
+ if not cls._sync_async_sessionmaker:
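+            # a dedicated engine lets the leader's sync loop be disposed independently of the app-wide pool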
+ cls._sync_async_engine = create_async_db_engine(echo=False)
+ cls._sync_async_sessionmaker = create_async_session_local(cls._sync_async_engine)
+ return cls._sync_async_sessionmaker()
+
+ @classmethod
+ async def _dispose_sync_async_engine(cls) -> None:
+ """
+        Dispose of the async Engine used by the sync task
+
+ :return: None
+ """
+ if cls._sync_async_engine:
+ await cls._sync_async_engine.dispose()
+ cls._sync_async_engine = None
+ cls._sync_async_sessionmaker = None
+
+ @classmethod
+ def _dispose_sync_engines(cls) -> None:
+ """
+        Dispose of the sync Engines used by the scheduler
+
+ :return: None
+ """
+ if cls._disposed_sync_engines:
+ return
+ if cls._jobstore_engine:
+ cls._jobstore_engine.dispose()
+ cls._jobstore_engine = None
+ if cls._listener_engine:
+ cls._listener_engine.dispose()
+ cls._listener_engine = None
+ cls._session_local = None
+ cls._disposed_sync_engines = True
+
+ @classmethod
+ def _ensure_reacquire_task(cls) -> None:
+ """
+        Start the lock re-acquisition task
+
+ :return: None
+ """
+ if not cls._redis:
+ return
+ if cls._reacquire_task and not cls._reacquire_task.done():
+ return
+ cls._reacquire_task = asyncio.create_task(cls._run_reacquire_loop())
+
+ @classmethod
+ async def _run_reacquire_loop(cls) -> None:
+ """
+        Loop attempting to re-acquire the lock and restore the scheduler
+
+ :return: None
+ """
+ try:
+ while not cls._is_leader:
+ if not cls._redis:
+ await asyncio.sleep(cls._reacquire_interval_seconds)
+ continue
+ acquired = await StartupUtil.acquire_startup_log_gate(
+ redis=cls._redis,
+ lock_key=LockConstant.APP_STARTUP_LOCK_KEY,
+ worker_id=cls._worker_id,
+ lock_expire_seconds=LockConstant.LOCK_EXPIRE_SECONDS,
+ )
+ if acquired:
+                    # call _start_scheduler_as_leader directly to avoid acquiring the lock twice
+ await cls._start_scheduler_as_leader(cls._redis)
+ return
+ await asyncio.sleep(cls._reacquire_interval_seconds)
+ except asyncio.CancelledError:
+ raise
+ finally:
+ cls._reacquire_task = None
+
+ @classmethod
+ async def _run_sync_loop(cls) -> None:
+ """
+        Run the sync scheduling loop
+
+ :return: None
+ """
+ try:
+ while True:
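+                # coalesce bursts: clear the pending flag, wait out the debounce window, then sync once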
+ if not cls._sync_pending:
+ break
+ cls._sync_pending = False
+ await asyncio.sleep(cls._sync_debounce_seconds)
+ await cls._sync_with_throttle()
+ except asyncio.CancelledError:
+ raise
+ finally:
+ cls._sync_task = None
+
+ @classmethod
+ async def _sync_with_throttle(cls) -> None:
+ """
+        Run the sync subject to throttling rules
+
+ :return: None
+ """
+ async with cls._sync_lock:
+ if not cls._is_leader:
+ return
+ if cls._last_sync_at:
+ elapsed = datetime.now() - cls._last_sync_at
+ min_interval = timedelta(seconds=cls._sync_min_interval_seconds)
+ if elapsed < min_interval:
+ await asyncio.sleep((min_interval - elapsed).total_seconds())
+ await cls._sync_jobs_from_database()
+ cls._last_sync_at = datetime.now()
+
+ @classmethod
+ async def _listen_sync_channel(cls, redis: aioredis.Redis) -> None:
+ """
+        Listen on the sync request channel
+
+        :param redis: Redis connection object
+ :return: None
+ """
+ while True:
+ pubsub = redis.pubsub()
+ try:
+ await pubsub.subscribe(cls._sync_channel)
+ async for message in pubsub.listen():
+ if not cls._is_leader:
+ continue
+ if message.get('type') != 'message':
+ continue
+ await cls.request_scheduler_sync()
+ except asyncio.CancelledError:
+ await pubsub.unsubscribe(cls._sync_channel)
+ await pubsub.close()
+ raise
+ except Exception as e:
+                logger.error(f'❌ Scheduler sync listener error: {e}, retrying in 5 seconds...')
+ await pubsub.close()
+ await asyncio.sleep(5)
+ finally:
+ try:
+ await pubsub.close()
+ except Exception:
+ pass
+
+ @classmethod
+ async def _execute_async_job_with_log(
+ cls, job_func: Callable[..., Any], job_info: JobModel, args: list, kwargs: dict
+ ) -> None:
+ """
+        Execute an async job and record its log
+
+        :param job_func: job function
+        :param job_info: job info object
+        :param args: positional arguments
+        :param kwargs: keyword arguments
+ :return: None
+ """
+ status = '0'
+ exception_info = ''
+ job_executor = job_info.job_executor
+ if iscoroutinefunction(job_func):
+ job_executor = 'default'
+ try:
+ await job_func(*args, **kwargs)
+ except Exception as e:
+ status = '1'
+ exception_info = str(e)
+            logger.error(f'❌ Async execution of job {job_info.job_name} failed: {e}')
+ finally:
+ cls._record_job_execution_log(job_info, job_executor, status, exception_info)
+
+ @classmethod
+ def _record_job_execution_log(cls, job_info: JobModel, job_executor: str, status: str, exception_info: str) -> None:
+ """
+        Record a job execution log (used when a non-leader worker executes a job directly)
+
+        :param job_info: job info object
+        :param job_executor: job executor
+        :param status: execution status, 0 = success, 1 = failure
+        :param exception_info: exception details
+ :return: None
+ """
+ try:
+ job_args = job_info.job_args if job_info.job_args else ''
+ job_kwargs = job_info.job_kwargs if job_info.job_kwargs else '{}'
+ job_trigger = str(MyCronTrigger.from_crontab(job_info.cron_expression)) if job_info.cron_expression else ''
+ job_message = (
+                f'Event type: DirectExecution (non-leader), job ID: {job_info.job_id}, '
+                f'job name: {job_info.job_name}, executed at {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
+ )
+ job_log = JobLogModel(
+ jobName=job_info.job_name,
+ jobGroup=job_info.job_group,
+ jobExecutor=job_executor,
+ invokeTarget=job_info.invoke_target,
+ jobArgs=job_args,
+ jobKwargs=job_kwargs,
+ jobTrigger=job_trigger,
+ jobMessage=job_message,
+ status=status,
+ exceptionInfo=exception_info,
+ createTime=datetime.now(),
+ )
+ session = cls._get_session_local()()
+ try:
+ JobLogService.add_job_log_services(session, job_log)
+ finally:
+ session.close()
+ except Exception as e:
+            logger.error(f'❌ Failed to record job execution log: {e}')
+
+ @classmethod
+ def _prepare_scheduler_job_add(cls, job_info: JobModel) -> dict[str, Any]:
+ """
+        Build the scheduler job parameters
+
+        :param job_info: job info object
+        :return: scheduler job parameters
+ """
+ job_func = cls._import_function(job_info.invoke_target)
+ job_executor = job_info.job_executor
+ if iscoroutinefunction(job_func):
+ job_executor = 'default'
+ return {
+ 'func': job_func,
+ 'trigger': MyCronTrigger.from_crontab(job_info.cron_expression),
+ 'args': job_info.job_args.split(',') if job_info.job_args else None,
+ 'kwargs': json.loads(job_info.job_kwargs) if job_info.job_kwargs else None,
+ 'id': str(job_info.job_id),
+ 'name': job_info.job_name,
+ 'misfire_grace_time': 1000000000000 if job_info.misfire_policy == '3' else None,
+ 'coalesce': job_info.misfire_policy == '2',
+ 'max_instances': 3 if job_info.concurrent == '0' else 1,
+ 'jobstore': job_info.job_group,
+ 'executor': job_executor,
+ }
+
+ @classmethod
+ def _add_job_to_scheduler(cls, job_info: JobModel) -> None:
+ """
+        Internal method: add a job to the scheduler (does not check lock state; internal use only)
+
+        :param job_info: job info object
+ """
+ try:
+            # remove any existing job with the same ID first
+ existing_job = scheduler.get_job(job_id=str(job_info.job_id))
+ if existing_job:
+ scheduler.remove_job(job_id=str(job_info.job_id))
+ scheduler.add_job(**cls._prepare_scheduler_job_add(job_info))
+ except Exception as e:
+            logger.error(f'❌ Failed to add job {job_info.job_name}: {e}')
+
@classmethod
async def close_system_scheduler(cls) -> None:
"""
@@ -151,8 +722,46 @@ async def close_system_scheduler(cls) -> None:
:return:
"""
- scheduler.shutdown()
-        logger.info('✅️ Scheduler shut down successfully')
+ if cls._sync_listener_task:
+ cls._sync_listener_task.cancel()
+ try:
+ await cls._sync_listener_task
+ except asyncio.CancelledError:
+ pass
+ cls._sync_listener_task = None
+ if cls._sync_task:
+ cls._sync_task.cancel()
+ try:
+ await cls._sync_task
+ except asyncio.CancelledError:
+ pass
+ cls._sync_task = None
+ cls._sync_pending = False
+ if cls._reacquire_task:
+ cls._reacquire_task.cancel()
+ try:
+ await cls._reacquire_task
+ except asyncio.CancelledError:
+ pass
+ cls._reacquire_task = None
+ await cls._dispose_sync_async_engine()
+ cls._dispose_sync_engines()
+ if cls._lock_lost_task:
+ cls._lock_lost_task.cancel()
+ try:
+ await cls._lock_lost_task
+ except asyncio.CancelledError:
+ pass
+ cls._lock_lost_task = None
+ if getattr(scheduler, 'running', False):
+ scheduler.shutdown()
+            logger.info('✅️ Scheduler shut down successfully')
+        # release the lock (only if this worker still holds it)
+        if cls._redis:
+            current_holder = await cls._redis.get(LockConstant.APP_STARTUP_LOCK_KEY)
+            if current_holder == cls._worker_id:
+                await cls._redis.delete(LockConstant.APP_STARTUP_LOCK_KEY)
+                logger.info(f'🔓 Worker {cls._worker_id} released the Application lock')
@classmethod
def _import_function(cls, func_path: str) -> Callable[..., Any]:
@@ -186,23 +795,10 @@ def add_scheduler_job(cls, job_info: JobModel) -> None:
:param job_info: 任务对象信息
:return:
"""
- job_func = cls._import_function(job_info.invoke_target)
- job_executor = job_info.job_executor
- if iscoroutinefunction(job_func):
- job_executor = 'default'
- scheduler.add_job(
- func=job_func,
- trigger=MyCronTrigger.from_crontab(job_info.cron_expression),
- args=job_info.job_args.split(',') if job_info.job_args else None,
- kwargs=json.loads(job_info.job_kwargs) if job_info.job_kwargs else None,
- id=str(job_info.job_id),
- name=job_info.job_name,
- misfire_grace_time=1000000000000 if job_info.misfire_policy == '3' else None,
- coalesce=job_info.misfire_policy == '2',
- max_instances=3 if job_info.concurrent == '0' else 1,
- jobstore=job_info.job_group,
- executor=job_executor,
- )
+        # Non-leader workers skip this (job state is persisted in the DB; the lock holder loads it)
+ if not cls._is_leader:
+ return
+ scheduler.add_job(**cls._prepare_scheduler_job_add(job_info))
@classmethod
def execute_scheduler_job_once(cls, job_info: JobModel) -> None:
@@ -216,6 +812,30 @@ def execute_scheduler_job_once(cls, job_info: JobModel) -> None:
job_executor = job_info.job_executor
if iscoroutinefunction(job_func):
job_executor = 'default'
+
+        # Non-leader worker: execute the function directly (bypassing the scheduler)
+        if not cls._is_leader:
+            logger.info(f'📍 Current worker does not hold the Application lock, executing job {job_info.job_name} directly')
+ args = job_info.job_args.split(',') if job_info.job_args else []
+ kwargs = json.loads(job_info.job_kwargs) if job_info.job_kwargs else {}
+ status = '0'
+ exception_info = ''
+ try:
+ if iscoroutinefunction(job_func):
+ asyncio.create_task(cls._execute_async_job_with_log(job_func, job_info, args, kwargs)) # noqa: RUF006
+ else:
+ job_func(*args, **kwargs)
+ except Exception as e:
+ status = '1'
+ exception_info = str(e)
+                logger.error(f'❌ Direct execution of job {job_info.job_name} failed: {e}')
+ finally:
+            # Log sync jobs here (async jobs are logged in _execute_async_job_with_log)
+ if not iscoroutinefunction(job_func):
+ cls._record_job_execution_log(job_info, job_executor, status, exception_info)
+ return
+
+        # Lock-holding worker: execute via the scheduler
job_trigger = DateTrigger()
if job_info.status == '0':
job_trigger = OrTrigger(triggers=[DateTrigger(), MyCronTrigger.from_crontab(job_info.cron_expression)])
@@ -241,54 +861,70 @@ def remove_scheduler_job(cls, job_id: str | int) -> None:
:param job_id: 任务id
:return:
"""
+        # Non-leader workers skip this (job state is persisted; the lock holder loads it by status)
+ if not cls._is_leader:
+ return
query_job = cls.get_scheduler_job(job_id=job_id)
if query_job:
scheduler.remove_job(job_id=str(job_id))
@classmethod
def scheduler_event_listener(cls, event: SchedulerEvent) -> None:
-        # Get the event type and job ID
-        event_type = event.__class__.__name__
-        # Get job execution exception info
-        status = '0'
-        exception_info = ''
-        if event_type == 'JobExecutionEvent' and event.exception:
-            exception_info = str(event.exception)
-            status = '1'
-        if hasattr(event, 'job_id'):
-            job_id = event.job_id
-            query_job = cls.get_scheduler_job(job_id=job_id)
-            if query_job:
-                query_job_info = query_job.__getstate__()
-                # Get the job name
-                job_name = query_job_info.get('name')
-                # Get the job group
-                job_group = query_job._jobstore_alias
-                # Get the job executor
-                job_executor = query_job_info.get('executor')
-                # Get the invocation target string
-                invoke_target = query_job_info.get('func')
-                # Get the positional arguments
-                job_args = ','.join(query_job_info.get('args'))
-                # Get the keyword arguments
-                job_kwargs = json.dumps(query_job_info.get('kwargs'))
-                # Get the job trigger
-                job_trigger = str(query_job_info.get('trigger'))
-                # Build the log message
-                job_message = f'Event type: {event_type}, job ID: {job_id}, job name: {job_name}, executed at {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
- job_log = JobLogModel(
- jobName=job_name,
- jobGroup=job_group,
- jobExecutor=job_executor,
- invokeTarget=invoke_target,
- jobArgs=job_args,
- jobKwargs=job_kwargs,
- jobTrigger=job_trigger,
- jobMessage=job_message,
- status=status,
- exceptionInfo=exception_info,
- createTime=datetime.now(),
- )
- session = SessionLocal()
- JobLogService.add_job_log_services(session, job_log)
- session.close()
+ """
+        Scheduler event listener; records job execution logs
+ """
+ try:
+            # Get the event type and job ID
+            event_type = event.__class__.__name__
+            # Get job execution exception info
+            status = '0'
+            exception_info = ''
+            if event_type == 'JobExecutionEvent' and event.exception:
+                exception_info = str(event.exception)
+                status = '1'
+            if hasattr(event, 'job_id'):
+                job_id = event.job_id
+                # Skip internal system jobs (job IDs starting with _); they are not logged
+                if str(job_id).startswith('_'):
+                    return
+                query_job = cls.get_scheduler_job(job_id=job_id)
+                if query_job:
+                    query_job_info = query_job.__getstate__()
+                    # Get the job name
+                    job_name = query_job_info.get('name')
+                    # Get the job group
+                    job_group = query_job._jobstore_alias
+                    # Get the job executor
+                    job_executor = query_job_info.get('executor')
+                    # Get the invocation target string
+                    invoke_target = query_job_info.get('func')
+                    # Get the positional arguments (handled safely)
+                    args = query_job_info.get('args')
+                    job_args = ','.join(str(arg) for arg in args) if args else ''
+                    # Get the keyword arguments
+                    kwargs = query_job_info.get('kwargs')
+                    job_kwargs = json.dumps(kwargs) if kwargs else '{}'
+                    # Get the job trigger
+                    job_trigger = str(query_job_info.get('trigger'))
+                    # Build the log message
+                    job_message = f'Event type: {event_type}, job ID: {job_id}, job name: {job_name}, executed at {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
+ job_log = JobLogModel(
+ jobName=job_name,
+ jobGroup=job_group,
+ jobExecutor=job_executor,
+ invokeTarget=invoke_target,
+ jobArgs=job_args,
+ jobKwargs=job_kwargs,
+ jobTrigger=job_trigger,
+ jobMessage=job_message,
+ status=status,
+ exceptionInfo=exception_info,
+ createTime=datetime.now(),
+ )
+ session = cls._get_session_local()()
+ try:
+ JobLogService.add_job_log_services(session, job_log)
+ finally:
+ session.close()
+ except Exception as e:
+            logger.error(f'❌ Scheduler event listener error: {e}')
diff --git a/ruoyi-fastapi-backend/middlewares/trace_middleware/ctx.py b/ruoyi-fastapi-backend/middlewares/trace_middleware/ctx.py
index 4f98008..f5461d1 100644
--- a/ruoyi-fastapi-backend/middlewares/trace_middleware/ctx.py
+++ b/ruoyi-fastapi-backend/middlewares/trace_middleware/ctx.py
@@ -1,16 +1,64 @@
import contextvars
from uuid import uuid4
+CTX_TRACE_ID: contextvars.ContextVar[str] = contextvars.ContextVar('trace-id', default='')
CTX_REQUEST_ID: contextvars.ContextVar[str] = contextvars.ContextVar('request-id', default='')
+CTX_SPAN_ID: contextvars.ContextVar[str] = contextvars.ContextVar('span-id', default='')
+CTX_REQUEST_PATH: contextvars.ContextVar[str] = contextvars.ContextVar('request-path', default='')
+CTX_REQUEST_METHOD: contextvars.ContextVar[str] = contextvars.ContextVar('request-method', default='')
class TraceCtx:
@staticmethod
- def set_id() -> str:
+ def set_trace_id() -> str:
+ _id = uuid4().hex
+ CTX_TRACE_ID.set(_id)
+ return _id
+
+ @staticmethod
+ def get_trace_id() -> str:
+ return CTX_TRACE_ID.get()
+
+ @staticmethod
+ def set_request_id() -> str:
_id = uuid4().hex
CTX_REQUEST_ID.set(_id)
return _id
@staticmethod
- def get_id() -> str:
+ def get_request_id() -> str:
return CTX_REQUEST_ID.get()
+
+ @staticmethod
+ def set_span_id() -> str:
+ _id = uuid4().hex
+ CTX_SPAN_ID.set(_id)
+ return _id
+
+ @staticmethod
+ def get_span_id() -> str:
+ return CTX_SPAN_ID.get()
+
+ @staticmethod
+ def set_request_path(path: str) -> None:
+ CTX_REQUEST_PATH.set(path)
+
+ @staticmethod
+ def get_request_path() -> str:
+ return CTX_REQUEST_PATH.get()
+
+ @staticmethod
+ def set_request_method(method: str) -> None:
+ CTX_REQUEST_METHOD.set(method)
+
+ @staticmethod
+ def get_request_method() -> str:
+ return CTX_REQUEST_METHOD.get()
+
+ @staticmethod
+ def clear() -> None:
+ CTX_TRACE_ID.set('')
+ CTX_REQUEST_ID.set('')
+ CTX_SPAN_ID.set('')
+ CTX_REQUEST_PATH.set('')
+ CTX_REQUEST_METHOD.set('')
diff --git a/ruoyi-fastapi-backend/middlewares/trace_middleware/middle.py b/ruoyi-fastapi-backend/middlewares/trace_middleware/middle.py
index 42cd78c..57a9a28 100644
--- a/ruoyi-fastapi-backend/middlewares/trace_middleware/middle.py
+++ b/ruoyi-fastapi-backend/middlewares/trace_middleware/middle.py
@@ -2,6 +2,7 @@
from starlette.types import ASGIApp, Message, Receive, Scope, Send
+from .ctx import TraceCtx
from .span import Span, get_current_span
@@ -39,4 +40,7 @@ async def handle_outgoing_request(message: 'Message') -> None:
await span.response(message)
await send(message)
- await self.app(scope, handle_outgoing_receive, handle_outgoing_request)
+ try:
+ await self.app(scope, handle_outgoing_receive, handle_outgoing_request)
+ finally:
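+            # reset all trace context vars so values cannot leak into the next request handled on this task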
+ TraceCtx.clear()
diff --git a/ruoyi-fastapi-backend/middlewares/trace_middleware/span.py b/ruoyi-fastapi-backend/middlewares/trace_middleware/span.py
index ef6e46f..ba66000 100644
--- a/ruoyi-fastapi-backend/middlewares/trace_middleware/span.py
+++ b/ruoyi-fastapi-backend/middlewares/trace_middleware/span.py
@@ -19,7 +19,11 @@ async def request_before(self) -> None:
"""
        request_before: process header information etc., such as recording request body details
"""
- TraceCtx.set_id()
+ TraceCtx.set_trace_id()
+ TraceCtx.set_request_id()
+ TraceCtx.set_span_id()
+ TraceCtx.set_request_path(self.scope.get('path', ''))
+ TraceCtx.set_request_method(self.scope.get('method', ''))
async def request_after(self, message: Message) -> Message:
"""
@@ -39,7 +43,9 @@ async def response(self, message: Message) -> Message:
pass
"""
if message['type'] == 'http.response.start':
- message['headers'].append((b'request-id', TraceCtx.get_id().encode()))
+ message['headers'].append((b'request-id', TraceCtx.get_request_id().encode()))
+ message['headers'].append((b'trace-id', TraceCtx.get_trace_id().encode()))
+ message['headers'].append((b'span-id', TraceCtx.get_span_id().encode()))
return message
diff --git a/ruoyi-fastapi-backend/module_admin/dao/job_dao.py b/ruoyi-fastapi-backend/module_admin/dao/job_dao.py
index 512f87c..52df8e3 100644
--- a/ruoyi-fastapi-backend/module_admin/dao/job_dao.py
+++ b/ruoyi-fastapi-backend/module_admin/dao/job_dao.py
@@ -97,6 +97,18 @@ async def get_job_list_for_scheduler(cls, db: AsyncSession) -> Sequence[SysJob]:
return job_list
+ @classmethod
+ async def get_all_job_list_for_scheduler(cls, db: AsyncSession) -> Sequence[SysJob]:
+ """
+        Get the full list of scheduled jobs
+
+        :param db: ORM session object
+        :return: list of scheduled job objects
+ """
+ job_list = (await db.execute(select(SysJob).distinct())).scalars().all()
+
+ return job_list
+
@classmethod
async def add_job_dao(cls, db: AsyncSession, job: JobModel) -> SysJob:
"""
diff --git a/ruoyi-fastapi-backend/module_admin/service/captcha_service.py b/ruoyi-fastapi-backend/module_admin/service/captcha_service.py
index 5187d74..b90caa0 100644
--- a/ruoyi-fastapi-backend/module_admin/service/captcha_service.py
+++ b/ruoyi-fastapi-backend/module_admin/service/captcha_service.py
@@ -1,8 +1,8 @@
import base64
import io
-import os
import random
+import anyio
from PIL import Image, ImageDraw, ImageFont
@@ -20,7 +20,8 @@ async def create_captcha_image_service(cls) -> list[str, int]:
draw = ImageDraw.Draw(image)
        # Set the font
- font = ImageFont.truetype(os.path.join(os.path.abspath(os.getcwd()), 'assets', 'font', 'Arial.ttf'), size=30)
+ font_path = (await anyio.Path.cwd()) / 'assets' / 'font' / 'Arial.ttf'
+ font = ImageFont.truetype(font_path, size=30)
        # Generate two random integers between 0 and 9
num1 = random.randint(0, 9)
diff --git a/ruoyi-fastapi-backend/module_admin/service/job_service.py b/ruoyi-fastapi-backend/module_admin/service/job_service.py
index b785e4a..5e73135 100644
--- a/ruoyi-fastapi-backend/module_admin/service/job_service.py
+++ b/ruoyi-fastapi-backend/module_admin/service/job_service.py
@@ -83,6 +83,7 @@ async def add_job_services(cls, query_db: AsyncSession, page_object: JobModel) -
if job_info.status == '0':
SchedulerUtil.add_scheduler_job(job_info=job_info)
await query_db.commit()
+ await SchedulerUtil.request_scheduler_sync()
            result = {'is_success': True, 'message': 'Added successfully'}
except Exception as e:
await query_db.rollback()
@@ -144,6 +145,7 @@ async def edit_job_services(cls, query_db: AsyncSession, page_object: EditJobMod
job_info = await cls.job_detail_services(query_db, edit_job.get('job_id'))
SchedulerUtil.add_scheduler_job(job_info=job_info)
await query_db.commit()
+ await SchedulerUtil.request_scheduler_sync()
            return CrudResponseModel(is_success=True, message='Updated successfully')
except Exception as e:
await query_db.rollback()
@@ -183,6 +185,7 @@ async def delete_job_services(cls, query_db: AsyncSession, page_object: DeleteJo
await JobDao.delete_job_dao(query_db, JobModel(jobId=job_id))
SchedulerUtil.remove_scheduler_job(job_id=job_id)
await query_db.commit()
+ await SchedulerUtil.request_scheduler_sync()
            return CrudResponseModel(is_success=True, message='Deleted successfully')
except Exception as e:
await query_db.rollback()
diff --git a/ruoyi-fastapi-backend/module_admin/service/log_service.py b/ruoyi-fastapi-backend/module_admin/service/log_service.py
index e411711..edce948 100644
--- a/ruoyi-fastapi-backend/module_admin/service/log_service.py
+++ b/ruoyi-fastapi-backend/module_admin/service/log_service.py
@@ -1,10 +1,19 @@
+import asyncio
+import hashlib
+import json
+import os
+import uuid
from typing import Any
from fastapi import Request
+from redis import asyncio as aioredis
from sqlalchemy.ext.asyncio import AsyncSession
from common.vo import CrudResponseModel, PageModel
+from config.database import AsyncSessionLocal
+from config.env import LogConfig
from exceptions.exception import ServiceException
+from middlewares.trace_middleware.ctx import TraceCtx
from module_admin.dao.log_dao import LoginLogDao, OperationLogDao
from module_admin.entity.vo.log_vo import (
DeleteLoginLogModel,
@@ -17,6 +26,7 @@
)
from module_admin.service.dict_service import DictDataService
from utils.excel_util import ExcelUtil
+from utils.log_util import logger
class OperationLogService:
@@ -261,3 +271,223 @@ async def export_login_log_list_services(login_log_list: list) -> bytes:
binary_data = ExcelUtil.export_list2excel(login_log_list, mapping_dict)
return binary_data
+
+
+class LogQueueService:
+ @classmethod
+ def _build_event_id(cls, request_id: str, log_type: str, source: str) -> str:
+ """
+        Build a unique identifier for a log event
+
+        :param request_id: unique request identifier
+        :param log_type: log type
+        :param source: log source
+        :return: unique event identifier
+ """
+ if not request_id:
+ return uuid.uuid4().hex
+ base = f'{request_id}:{log_type}:{source}'
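+        # MD5 here is a stable content hash for deduplication, not a security measure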
+ return hashlib.md5(base.encode('utf-8')).hexdigest()
+
+ @classmethod
+ async def _xadd_event(cls, redis: aioredis.Redis, event_type: str, payload: dict, source: str) -> None:
+ """
+        Write a log event to Redis Streams
+
+        :param redis: Redis connection object
+        :param event_type: event type
+        :param payload: event payload
+        :param source: log source
+ :return: None
+ """
+ request_id = TraceCtx.get_request_id()
+ trace_id = TraceCtx.get_trace_id()
+ span_id = TraceCtx.get_span_id()
+ event_id = cls._build_event_id(request_id, event_type, source)
+ await redis.xadd(
+ LogConfig.log_stream_key,
+ {
+ 'event_id': event_id,
+ 'event_type': event_type,
+ 'request_id': request_id,
+ 'trace_id': trace_id,
+ 'span_id': span_id,
+ 'payload': json.dumps(payload, ensure_ascii=False, default=str),
+ },
+ maxlen=LogConfig.log_stream_maxlen,
+ approximate=True,
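+            # approximate=True maps to XADD ... MAXLEN ~, letting Redis trim whole macro nodes cheaply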
+ )
+
+ @classmethod
+ async def enqueue_login_log(cls, request: Request, login_log: LogininforModel, source: str) -> None:
+ """
+        Enqueue a login log
+
+        :param request: Request object
+        :param login_log: login log model
+        :param source: log source
+ :return: None
+ """
+ payload = login_log.model_dump(by_alias=True, exclude_none=True)
+ await cls._xadd_event(request.app.state.redis, 'login', payload, source)
+
+ @classmethod
+ async def enqueue_operation_log(cls, request: Request, operation_log: OperLogModel, source: str) -> None:
+ """
+        Enqueue an operation log
+
+        :param request: Request object
+        :param operation_log: operation log model
+        :param source: log source
+ :return: None
+ """
+ payload = operation_log.model_dump(by_alias=True, exclude_none=True)
+ await cls._xadd_event(request.app.state.redis, 'operation', payload, source)
+
+
+class LogAggregatorService:
+ @classmethod
+ async def _ensure_group(cls, redis: aioredis.Redis) -> None:
+ """
+        Initialize the consumer group
+
+        :param redis: Redis connection object
+ :return: None
+ """
+ try:
+ await redis.xgroup_create(
+ name=LogConfig.log_stream_key,
+ groupname=LogConfig.log_stream_group,
+ id='0-0',
+ mkstream=True,
+ )
+ except Exception as exc:
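+            # XGROUP CREATE raises BUSYGROUP when the group already exists, which is expected on restart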
+ if 'BUSYGROUP' not in str(exc):
+ raise
+
+ @classmethod
+ async def _acquire_dedup(cls, redis: aioredis.Redis, event_id: str) -> bool:
+ """
+        Acquire the deduplication lock
+
+        :param redis: Redis connection object
+        :param event_id: unique event identifier
+        :return: whether the lock was acquired
+ """
+ if not event_id:
+ return False
+ key = f'{LogConfig.log_stream_dedup_prefix}:{event_id}'
+ return await redis.set(key, '1', nx=True, ex=LogConfig.log_stream_dedup_ttl)
+
+ @classmethod
+ async def _release_dedup(cls, redis: aioredis.Redis, event_id: str) -> None:
+ """
+        Release the deduplication lock
+
+        :param redis: Redis connection object
+        :param event_id: unique event identifier
+ :return: None
+ """
+ if not event_id:
+ return
+ await redis.delete(f'{LogConfig.log_stream_dedup_prefix}:{event_id}')
+
+ @classmethod
+ async def _claim_pending(cls, redis: aioredis.Redis, consumer_name: str) -> None:
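+        """
+        Reclaim messages left pending by dead consumers via XAUTOCLAIM and process them
+
+        :param redis: Redis connection object
+        :param consumer_name: consumer name
+        :return: None
+        """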
+ if LogConfig.log_stream_claim_idle_ms <= 0:
+ return
+ start_id = '0-0'
+ while True:
+ result = await redis.xautoclaim(
+ name=LogConfig.log_stream_key,
+ groupname=LogConfig.log_stream_group,
+ consumername=consumer_name,
+ min_idle_time=LogConfig.log_stream_claim_idle_ms,
+ start_id=start_id,
+ count=LogConfig.log_stream_claim_batch_size,
+ )
+ if not result:
+ return
+ next_start_id, messages = result[0], result[1]
+ if messages:
+ await cls._process_messages(redis, LogConfig.log_stream_key, messages)
+ if not messages or next_start_id == start_id:
+ return
+ start_id = next_start_id
+
+ @classmethod
+ async def consume_stream(cls, redis: aioredis.Redis) -> None:
+ """
+        Consume the log stream
+
+        :param redis: Redis connection
+ :return: None
+ """
+ await cls._ensure_group(redis)
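+        # A unique consumer name per process lets pending entries be attributed and reclaimed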
+ consumer_name = f'{LogConfig.log_stream_consumer_prefix}-{os.getpid()}-{uuid.uuid4().hex[:6]}'
+ last_claim_time = 0.0
+ while True:
+ try:
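+                # Periodically reclaim messages left pending by consumers that died mid-processing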
+ now = asyncio.get_running_loop().time()
+ if now - last_claim_time >= LogConfig.log_stream_claim_interval_ms / 1000:
+ await cls._claim_pending(redis, consumer_name)
+ last_claim_time = now
+ result = await redis.xreadgroup(
+ groupname=LogConfig.log_stream_group,
+ consumername=consumer_name,
+ streams={LogConfig.log_stream_key: '>'},
+ count=LogConfig.log_stream_batch_size,
+ block=LogConfig.log_stream_block_ms,
+ )
+ if not result:
+ continue
+ for stream_name, messages in result:
+ await cls._process_messages(redis, stream_name, messages)
+ except asyncio.CancelledError:
+ raise
+ except Exception as exc:
+ logger.error(f'日志聚合消费异常: {exc}')
+ await asyncio.sleep(1)
+
+ @classmethod
+ async def _process_messages(cls, redis: aioredis.Redis, stream_name: str, messages: list[tuple[str, dict]]) -> None:
+ """
+        Process messages and persist them to the database
+
+        :param redis: Redis connection
+        :param stream_name: Stream name
+        :param messages: Message list
+ :return: None
+ """
+ if not messages:
+ return
+ async with AsyncSessionLocal() as session:
+ ack_ids: list[str] = []
+ dedup_event_ids: list[str] = []
+ try:
+ for message_id, data in messages:
+ event_type = data.get('event_type')
+ event_id = data.get('event_id')
+ payload_raw = data.get('payload') or '{}'
+ if event_type not in {'login', 'operation'}:
+ ack_ids.append(message_id)
+ continue
+ acquired = await cls._acquire_dedup(redis, event_id)
+ if not acquired:
+ ack_ids.append(message_id)
+ continue
+ dedup_event_ids.append(event_id)
+ payload = json.loads(payload_raw)
+ if event_type == 'login':
+ await LoginLogDao.add_login_log_dao(session, LogininforModel(**payload))
+ elif event_type == 'operation':
+ await OperationLogDao.add_operation_log_dao(session, OperLogModel(**payload))
+ ack_ids.append(message_id)
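+            # Commit before XACK: a crash in between causes a redelivery,
+            # which the dedup key then absorbs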
+ if ack_ids:
+ await session.commit()
+ await redis.xack(stream_name, LogConfig.log_stream_group, *ack_ids)
+ except Exception:
+ await session.rollback()
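+                # Release dedup keys so redelivered messages can be retried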
+ for event_id in dedup_event_ids:
+ await cls._release_dedup(redis, event_id)
+ raise
diff --git a/ruoyi-fastapi-backend/module_admin/service/server_service.py b/ruoyi-fastapi-backend/module_admin/service/server_service.py
index 5a39698..ca84090 100644
--- a/ruoyi-fastapi-backend/module_admin/service/server_service.py
+++ b/ruoyi-fastapi-backend/module_admin/service/server_service.py
@@ -3,6 +3,7 @@
import socket
import time
+import anyio
import psutil
from module_admin.entity.vo.server_vo import CpuInfo, MemoryInfo, PyInfo, ServerMonitorModel, SysFiles, SysInfo
@@ -41,7 +42,7 @@ async def get_server_monitor_info() -> ServerMonitorModel:
os_name = platform.platform()
computer_name = platform.node()
os_arch = platform.machine()
- user_dir = os.path.abspath(os.getcwd())
+ user_dir = str(await anyio.Path.cwd())
sys = SysInfo(
computerIp=computer_ip, computerName=computer_name, osArch=os_arch, osName=os_name, userDir=user_dir
)
diff --git a/ruoyi-fastapi-backend/server.py b/ruoyi-fastapi-backend/server.py
index 42bc4bd..d463d9d 100644
--- a/ruoyi-fastapi-backend/server.py
+++ b/ruoyi-fastapi-backend/server.py
@@ -1,61 +1,134 @@
+import asyncio
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from fastapi import FastAPI
+from common.constant import LockConstant
from common.router import auto_register_routers
from config.env import AppConfig
-from config.get_db import init_create_table
+from config.get_db import close_async_engine, init_create_table
from config.get_redis import RedisUtil
from config.get_scheduler import SchedulerUtil
from exceptions.handle import handle_exception
from middlewares.handle import handle_middleware
+from module_admin.service.log_service import LogAggregatorService
from sub_applications.handle import handle_sub_applications
from utils.common_util import worship
from utils.log_util import logger
-from utils.server_util import APIDocsUtil, IPUtil
+from utils.server_util import APIDocsUtil, IPUtil, StartupUtil
+
+
+async def _start_background_tasks(app: FastAPI) -> None:
+ """
+    Start application background tasks
+
+    :param app: FastAPI application
+ :return: None
+ """
+ await SchedulerUtil.init_system_scheduler(app.state.redis)
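+    # Consume the Redis Streams log queue for the app's whole lifetime; cancelled on shutdown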
+ app.state.log_aggregator_task = asyncio.create_task(LogAggregatorService.consume_stream(app.state.redis))
+
+
+async def _stop_background_tasks(app: FastAPI) -> None:
+ """
+    Stop background tasks and release resources
+
+    :param app: FastAPI application
+ :return: None
+ """
+ log_task = getattr(app.state, 'log_aggregator_task', None)
+ if log_task:
+ log_task.cancel()
+ try:
+ await log_task
+ except asyncio.CancelledError:
+ pass
+ lock_task = getattr(app.state, 'lock_renewal_task', None)
+ if lock_task:
+ lock_task.cancel()
+ try:
+ await lock_task
+ except asyncio.CancelledError:
+ pass
+ await RedisUtil.close_redis_pool(app)
+ await SchedulerUtil.close_system_scheduler()
+ await close_async_engine()
# 生命周期事件
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
- logger.info(f'⏰️ {AppConfig.app_name}开始启动')
- worship()
- await init_create_table()
- app.state.redis = await RedisUtil.create_redis_pool()
- await RedisUtil.init_sys_dict(app.state.redis)
- await RedisUtil.init_sys_config(app.state.redis)
- await SchedulerUtil.init_system_scheduler()
- logger.info(f'🚀 {AppConfig.app_name}启动成功')
- host = AppConfig.app_host
- port = AppConfig.app_port
- if host == '0.0.0.0':
- local_ip = IPUtil.get_local_ip()
- network_ips = IPUtil.get_network_ips()
- else:
- local_ip = host
- network_ips = [host]
-
- app_links = [f'🏠 Local: http://{local_ip}:{port}']
- app_links.extend(f'📡 Network: http://{ip}:{port}' for ip in network_ips)
- logger.opt(colors=True).info('💻 应用地址:\n' + '\n'.join(app_links))
-
- if not AppConfig.app_disable_swagger:
- swagger_links = [f'🏠 Local: http://{local_ip}:{port}{APIDocsUtil.docs_url()}']
- swagger_links.extend(
- f'📡 Network: http://{ip}:{port}{APIDocsUtil.docs_url()}' for ip in network_ips
- )
- logger.opt(colors=True).info('📄 Swagger文档:\n' + '\n'.join(swagger_links))
+ """
+    Manage the application lifespan
+
+    :param app: FastAPI application
+ :return: None
+ """
+ app.state.redis = await RedisUtil.create_redis_pool(log_enabled=False)
+ startup_log_enabled = await StartupUtil.acquire_startup_log_gate(
+ redis=app.state.redis,
+ lock_key=LockConstant.APP_STARTUP_LOCK_KEY,
+ worker_id=SchedulerUtil._worker_id,
+ lock_expire_seconds=LockConstant.LOCK_EXPIRE_SECONDS,
+ )
+ app.state.startup_log_enabled = startup_log_enabled
- if not AppConfig.app_disable_redoc:
- redoc_links = [f'🏠 Local: http://{local_ip}:{port}{APIDocsUtil.redoc_url()}']
- redoc_links.extend(
- f'📡 Network: http://{ip}:{port}{APIDocsUtil.redoc_url()}' for ip in network_ips
+    # Start lock renewal right after acquiring the lock, so a long initialization cannot let it expire
+ if startup_log_enabled:
+ app.state.lock_renewal_task = StartupUtil.start_lock_renewal(
+ redis=app.state.redis,
+ lock_key=LockConstant.APP_STARTUP_LOCK_KEY,
+ worker_id=SchedulerUtil._worker_id,
+ lock_expire_seconds=LockConstant.LOCK_EXPIRE_SECONDS,
+ interval_seconds=LockConstant.LOCK_RENEWAL_INTERVAL,
+ on_lock_lost=SchedulerUtil.on_lock_lost,
)
- logger.opt(colors=True).info('📚 ReDoc文档:\n' + '\n'.join(redoc_links))
+
+ with logger.contextualize(startup_phase=True, startup_log_enabled=startup_log_enabled):
+ logger.info(f'⏰️ {AppConfig.app_name}开始启动')
+ if startup_log_enabled:
+ worship()
+ await init_create_table()
+ await RedisUtil.check_redis_connection(app.state.redis, log_enabled=startup_log_enabled)
+ await RedisUtil.init_sys_dict(app.state.redis)
+ await RedisUtil.init_sys_config(app.state.redis)
+ await _start_background_tasks(app)
+
+ if startup_log_enabled:
+        # Brief wait so the startup banner below is printed last
+ await asyncio.sleep(0.5)
+ logger.info(f'🚀 {AppConfig.app_name}启动成功')
+ host = AppConfig.app_host
+ port = AppConfig.app_port
+ if host == '0.0.0.0':
+ local_ip = IPUtil.get_local_ip()
+ network_ips = IPUtil.get_network_ips()
+ else:
+ local_ip = host
+ network_ips = [host]
+
+ app_links = [f'🏠 Local: http://{local_ip}:{port}']
+ app_links.extend(f'📡 Network: http://{ip}:{port}' for ip in network_ips)
+ logger.opt(colors=True).info('💻 应用地址:\n' + '\n'.join(app_links))
+
+ if not AppConfig.app_disable_swagger:
+ swagger_links = [f'🏠 Local: http://{local_ip}:{port}{APIDocsUtil.docs_url()}']
+ swagger_links.extend(
+ f'📡 Network: http://{ip}:{port}{APIDocsUtil.docs_url()}' for ip in network_ips
+ )
+ logger.opt(colors=True).info('📄 Swagger文档:\n' + '\n'.join(swagger_links))
+
+ if not AppConfig.app_disable_redoc:
+ redoc_links = [f'🏠 Local: http://{local_ip}:{port}{APIDocsUtil.redoc_url()}']
+ redoc_links.extend(
+ f'📡 Network: http://{ip}:{port}{APIDocsUtil.redoc_url()}' for ip in network_ips
+ )
+ logger.opt(colors=True).info('📚 ReDoc文档:\n' + '\n'.join(redoc_links))
yield
- await RedisUtil.close_redis_pool(app)
- await SchedulerUtil.close_system_scheduler()
+ shutdown_log_enabled = getattr(app.state, 'startup_log_enabled', False)
+ with logger.contextualize(startup_phase=True, startup_log_enabled=shutdown_log_enabled):
+ await _stop_background_tasks(app)
def create_app() -> FastAPI:
diff --git a/ruoyi-fastapi-backend/utils/gen_util.py b/ruoyi-fastapi-backend/utils/gen_util.py
index c08ceec..9549546 100644
--- a/ruoyi-fastapi-backend/utils/gen_util.py
+++ b/ruoyi-fastapi-backend/utils/gen_util.py
@@ -126,7 +126,7 @@ def get_module_name(cls, package_name: str) -> str:
param package_name: 包名
:return: 模块名
"""
- return package_name.split('.')[-1]
+ return package_name.rsplit('.', maxsplit=1)[-1]
@classmethod
def get_business_name(cls, table_name: str) -> str:
@@ -136,7 +136,7 @@ def get_business_name(cls, table_name: str) -> str:
param table_name: 业务表名
:return: 业务名
"""
- return table_name.split('_')[-1]
+ return table_name.rsplit('_', maxsplit=1)[-1]
@classmethod
def convert_class_name(cls, table_name: str) -> str:
@@ -186,7 +186,7 @@ def get_db_type(cls, column_type: str) -> str:
:return: 数据库类型
"""
if '(' in column_type:
- return column_type.split('(')[0]
+ return column_type.split('(', maxsplit=1)[0]
return column_type
@classmethod
diff --git a/ruoyi-fastapi-backend/utils/log_util.py b/ruoyi-fastapi-backend/utils/log_util.py
index 8996306..0817faf 100644
--- a/ruoyi-fastapi-backend/utils/log_util.py
+++ b/ruoyi-fastapi-backend/utils/log_util.py
@@ -1,59 +1,204 @@
+import json
+import logging
import os
import sys
-import time
+from typing import Any
from loguru import logger as _logger
from loguru._logger import Logger
+from config.env import AppConfig, LogConfig
from middlewares.trace_middleware import TraceCtx
+from utils.server_util import WorkerIdUtil
+
+
+class InterceptHandler(logging.Handler):
+ def emit(self, record: logging.LogRecord) -> None:
+ """
+        Intercept standard logging records and forward them to Loguru
+
+        :param record: Native logging record
+ :return: None
+ """
+ try:
+ level = _logger.level(record.levelname).name
+ except ValueError:
+ level = record.levelno
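+        # Walk up the stack past the logging module so Loguru reports the real call site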
+ frame, depth = logging.currentframe(), 2
+ while frame and frame.f_code.co_filename == logging.__file__:
+ frame = frame.f_back
+ depth += 1
+ _logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())
class LoggerInitializer:
def __init__(self) -> None:
- self.log_path = os.path.join(os.getcwd(), 'logs')
- self.__ensure_log_directory_exists()
- self.log_path_error = os.path.join(self.log_path, f'{time.strftime("%Y-%m-%d")}_error.log')
+ """
+        Initialize base logging configuration and runtime identifiers
+
+ :return: None
+ """
+ self.worker_id = WorkerIdUtil.get_worker_id(LogConfig.log_worker_id)
+ self.instance_id = LogConfig.log_instance_id
+ self.service_name = LogConfig.log_service_name or AppConfig.app_name
+ self._log_file_enabled = LogConfig.log_file_enabled
+ self._log_base_dir = LogConfig.log_file_base_dir
+ self._ensure_log_directory_exists()
+
+ def _ensure_log_directory_exists(self) -> None:
+ """
+        Ensure the log directory exists
+
+ :return: None
+ """
+ if not self._log_file_enabled:
+ return
+ if self._log_base_dir and not os.path.exists(self._log_base_dir):
+ os.makedirs(self._log_base_dir, exist_ok=True)
+
+ def _filter(self, record: dict) -> bool:
+ """
+        Inject trace context and gate startup-phase log output
- def __ensure_log_directory_exists(self) -> None:
+        :param record: Loguru record dict
+        :return: Whether the record should be emitted
"""
- 确保日志目录存在,如果不存在则创建
+ record['extra']['trace_id'] = TraceCtx.get_trace_id()
+ record['extra']['request_id'] = TraceCtx.get_request_id()
+ record['extra']['span_id'] = TraceCtx.get_span_id()
+ record['extra']['path'] = TraceCtx.get_request_path()
+ record['extra']['method'] = TraceCtx.get_request_method()
+ record['extra']['worker_id'] = self.worker_id
+ record['extra']['instance_id'] = self.instance_id
+ record['extra']['service'] = self.service_name
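+        # With multiple workers, only the gate-holding worker emits startup-phase records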
+ if record['extra'].get('startup_phase'):
+ return bool(record['extra'].get('startup_log_enabled'))
+ return True
+
+ def _stdout_sink(self, message: Any) -> None:
+ """
+        Serialize a Loguru record to JSON and write it to stdout
+
+        :param message: Loguru message object
+ :return: None
+ """
+ record = message.record
+ exception = None
+ if record['exception']:
+ exception = {
+ 'type': record['exception'].type.__name__ if record['exception'].type else None,
+ 'value': str(record['exception'].value) if record['exception'].value else None,
+ 'traceback': str(record['exception'].traceback) if record['exception'].traceback else None,
+ }
+ payload = {
+ 'timestamp': record['time'].isoformat(),
+ 'level': record['level'].name,
+ 'message': record['message'],
+ 'logger': record['name'],
+ 'trace_id': record['extra'].get('trace_id'),
+ 'request_id': record['extra'].get('request_id'),
+ 'span_id': record['extra'].get('span_id'),
+ 'worker_id': record['extra'].get('worker_id'),
+ 'instance_id': record['extra'].get('instance_id'),
+ 'service': record['extra'].get('service'),
+ 'method': record['extra'].get('method'),
+ 'path': record['extra'].get('path'),
+ 'module': record['module'],
+ 'function': record['function'],
+ 'line': record['line'],
+ 'exception': exception,
+ 'extra': record['extra'],
+ }
+ sys.stdout.write(json.dumps(payload, ensure_ascii=False, default=str) + '\n')
+
+ def _info_file_filter(self, record: dict) -> bool:
+ """
+        Route only INFO-level records to the info file
+
+        :param record: Loguru record dict
+        :return: Whether the record should be emitted
"""
- if not os.path.exists(self.log_path):
- os.mkdir(self.log_path)
+ return self._filter(record) and record['level'].name == 'INFO'
- @staticmethod
- def __filter(log: dict) -> dict:
+ def _error_file_filter(self, record: dict) -> bool:
"""
- 自定义日志过滤器,添加trace_id
+        Route WARNING-and-above records to the error file
+
+        :param record: Loguru record dict
+        :return: Whether the record should be emitted
+ """
+ return self._filter(record) and record['level'].no >= logging.WARNING
+
+ def _configure_logging(self) -> None:
"""
- log['trace_id'] = TraceCtx.get_id()
- return log
+        Route standard logging and third-party loggers through Loguru
+
+ :return: None
+ """
+ logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
+ for logger_name in ('uvicorn', 'uvicorn.error', 'uvicorn.access', 'fastapi'):
+ logging.getLogger(logger_name).handlers = [InterceptHandler()]
+ logging.getLogger(logger_name).propagate = False
+ for logger_name in ('LiteLLM', 'litellm'):
+ logging.getLogger(logger_name).setLevel(logging.WARNING)
def init_log(self) -> Logger:
"""
- 初始化日志配置
+        Initialize Loguru sinks and standard logging configuration
+
+        :return: The configured Loguru Logger instance
"""
- # 自定义日志格式
- format_str = (
- '{time:YYYY-MM-DD HH:mm:ss.SSS} | '
- '{trace_id} | '
- '{level: <8} | '
- '{name}:{function}:{line} - '
- '{message}'
- )
_logger.remove()
- # 移除后重新添加sys.stderr, 目的: 控制台输出与文件日志内容和结构一致
- _logger.add(sys.stderr, filter=self.__filter, format=format_str, enqueue=True)
- _logger.add(
- self.log_path_error,
- filter=self.__filter,
- format=format_str,
- rotation='50MB',
- encoding='utf-8',
- enqueue=True,
- compression='zip',
- )
-
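+        # Loguru expands {time:...} tokens on rotation, yielding logs/<YYYY>/<MM>/<DD>/*.log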
+ info_log_path = os.path.join(self._log_base_dir, '{time:YYYY}', '{time:MM}', '{time:DD}', 'info.log')
+ error_log_path = os.path.join(self._log_base_dir, '{time:YYYY}', '{time:MM}', '{time:DD}', 'error.log')
+ if LogConfig.loguru_stdout:
+ if LogConfig.loguru_json:
+ _logger.add(
+ self._stdout_sink,
+ level=LogConfig.loguru_level,
+ enqueue=True,
+ filter=self._filter,
+ )
+ else:
+ format_str = (
+ '{time:YYYY-MM-DD HH:mm:ss.SSS} | '
+ '{extra[trace_id]} | '
+ '{extra[span_id]} | '
+ '{extra[request_id]} | '
+ '{extra[worker_id]} | '
+ '{level: <8} | '
+ '{name}:{function}:{line} - '
+ '{message}'
+ )
+ _logger.add(
+ sys.stdout,
+ level=LogConfig.loguru_level,
+ enqueue=True,
+ filter=self._filter,
+ format=format_str,
+ )
+ if self._log_file_enabled:
+ _logger.add(
+ info_log_path,
+ level='INFO',
+ rotation=LogConfig.loguru_rotation,
+ retention=LogConfig.loguru_retention,
+ compression=LogConfig.loguru_compression,
+ enqueue=True,
+ filter=self._info_file_filter,
+ serialize=LogConfig.loguru_json,
+ )
+ _logger.add(
+ error_log_path,
+ level='WARNING',
+ rotation=LogConfig.loguru_rotation,
+ retention=LogConfig.loguru_retention,
+ compression=LogConfig.loguru_compression,
+ enqueue=True,
+ filter=self._error_file_filter,
+ serialize=LogConfig.loguru_json,
+ )
+ self._configure_logging()
return _logger
diff --git a/ruoyi-fastapi-backend/utils/server_util.py b/ruoyi-fastapi-backend/utils/server_util.py
index 873fe89..b868685 100644
--- a/ruoyi-fastapi-backend/utils/server_util.py
+++ b/ruoyi-fastapi-backend/utils/server_util.py
@@ -1,5 +1,8 @@
+import asyncio
import ipaddress
+import os
import socket
+import uuid
from collections.abc import Callable
import psutil
@@ -7,6 +10,7 @@
from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html, get_swagger_ui_oauth2_redirect_html
from fastapi.openapi.utils import get_openapi
from fastapi.responses import HTMLResponse, JSONResponse
+from redis import asyncio as aioredis
from config.env import AppConfig
@@ -333,6 +337,93 @@ def _register_docs_routes(
app.add_route(url, redoc_handler, include_in_schema=False)
+class StartupUtil:
+ """
+    Startup gate utility class
+ """
+
+ @classmethod
+ async def acquire_startup_log_gate(
+ cls, redis: aioredis.Redis, lock_key: str, worker_id: str, lock_expire_seconds: int
+ ) -> bool:
+ """
+        Acquire the startup log gate
+
+        :param redis: Redis connection
+        :param lock_key: Distributed lock key
+        :param worker_id: Current worker id
+        :param lock_expire_seconds: Lock expiry in seconds
+        :return: Whether this worker owns the startup log output
+ """
+ acquired = await redis.set(lock_key, worker_id, nx=True, ex=lock_expire_seconds)
+ if acquired:
+ return True
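+        # The lock is held: it may still be ours (e.g. a fast restart), so compare holders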
+ current_holder = await redis.get(lock_key)
+ return current_holder == worker_id
+
+ @classmethod
+ def start_lock_renewal(
+ cls,
+ redis: aioredis.Redis,
+ lock_key: str,
+ worker_id: str,
+ lock_expire_seconds: int,
+ interval_seconds: int,
+ on_lock_lost: Callable[[], None] | None = None,
+ ) -> asyncio.Task:
+ """
+        Start the distributed-lock renewal task
+
+        :param redis: Redis connection
+        :param lock_key: Distributed lock key
+        :param worker_id: Current worker id
+        :param lock_expire_seconds: Lock expiry in seconds
+        :param interval_seconds: Renewal interval in seconds
+        :param on_lock_lost: Callback invoked when the lock is lost
+        :return: The renewal asyncio task
+ """
+
+ async def _loop() -> None:
+ while True:
+ try:
+ current_holder = await redis.get(lock_key)
+ if current_holder == worker_id:
+ await redis.expire(lock_key, lock_expire_seconds)
+ await asyncio.sleep(interval_seconds)
+ continue
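+                    # Another worker holds the lock now; notify and stop renewing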
+ if on_lock_lost:
+ on_lock_lost()
+ break
+ except Exception:
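+                    # Transient Redis errors: back off and retry on the next tick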
+ await asyncio.sleep(interval_seconds)
+
+ return asyncio.create_task(_loop())
+
+
+class WorkerIdUtil:
+ """
+ Worker标识生成工具类
+ """
+
+ _worker_id: str | None = None
+
+ @classmethod
+ def get_worker_id(cls, configured_worker_id: str | None) -> str:
+ """
+        Get the current worker id
+
+        :param configured_worker_id: Configured worker id
+        :return: Current worker id
+ """
+ if cls._worker_id:
+ return cls._worker_id
+ worker_id = configured_worker_id
+ if not worker_id or worker_id.lower() == 'auto':
+ worker_id = f'{os.getpid()}-{uuid.uuid4().hex[:6]}'
+ cls._worker_id = worker_id
+ return worker_id
+
+
class IPUtil:
"""
IP工具类
diff --git a/ruoyi-fastapi-backend/utils/template_util.py b/ruoyi-fastapi-backend/utils/template_util.py
index 9983fa5..89cc412 100644
--- a/ruoyi-fastapi-backend/utils/template_util.py
+++ b/ruoyi-fastapi-backend/utils/template_util.py
@@ -293,7 +293,7 @@ def get_db_type(cls, column_type: str) -> str:
:return: 数据库类型
"""
if '(' in column_type:
- return column_type.split('(')[0]
+ return column_type.split('(', maxsplit=1)[0]
return column_type
@classmethod