From 08952f8d1c815de8eb1965a2dbb2093122b4d770 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Sat, 31 Jan 2026 17:09:11 +0800 Subject: [PATCH 01/20] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=88=A0=E9=99=A4?= =?UTF-8?q?=E7=AE=97=E5=AD=90=E6=A0=A1=E9=AA=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../infrastructure/persistence/Impl/OperatorRepositoryImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/persistence/Impl/OperatorRepositoryImpl.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/persistence/Impl/OperatorRepositoryImpl.java index 7b43869b..a36b2649 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/persistence/Impl/OperatorRepositoryImpl.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/persistence/Impl/OperatorRepositoryImpl.java @@ -48,7 +48,7 @@ public int countOperatorByStar(boolean isStar) { @Override public boolean operatorInTemplateOrRunning(String operatorId) { - return mapper.operatorInTemplate(operatorId) > 0 && mapper.operatorInUnstopTask(operatorId) > 0; + return mapper.operatorInTemplate(operatorId) > 0 || mapper.operatorInUnstopTask(operatorId) > 0; } @Override From e0da089a61ec6246e615d0830606b33726e8c1fa Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Sat, 31 Jan 2026 18:04:29 +0800 Subject: [PATCH 02/20] =?UTF-8?q?=E7=AE=97=E5=AD=90=E5=B8=82=E5=9C=BApytho?= =?UTF-8?q?n=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gateway/ApiGatewayApplication.java | 4 + .../datamate-python/app/db/models/__init__.py | 16 + .../app/db/models/chunk_upload.py | 38 ++ .../datamate-python/app/db/models/operator.py | 70 ++ .../datamate-python/app/module/__init__.py | 4 + .../app/module/operator/README.md | 138 ++++ .../app/module/operator/__init__.py | 4 + .../app/module/operator/constants.py | 50 ++ .../app/module/operator/exceptions.py | 72 +++ .../app/module/operator/interface/__init__.py | 9 + .../operator/interface/category_routes.py | 43 ++ .../operator/interface/operator_routes.py | 270 ++++++++ .../app/module/operator/parsers/__init__.py | 15 + .../operator/parsers/abstract_parser.py | 97 +++ .../module/operator/parsers/parser_holder.py | 52 ++ .../app/module/operator/parsers/tar_parser.py | 41 ++ .../app/module/operator/parsers/zip_parser.py | 41 ++ .../module/operator/repository/__init__.py | 15 + .../category_relation_repository.py | 77 +++ .../repository/category_repository.py | 23 + .../repository/operator_release_repository.py | 72 +++ .../repository/operator_repository.py | 121 ++++ .../app/module/operator/schema/__init__.py | 29 + .../app/module/operator/schema/category.py | 40 ++ .../app/module/operator/schema/operator.py | 72 +++ .../app/module/operator/schema/release.py | 22 + .../app/module/operator/service/__init__.py | 11 + .../operator/service/category_service.py | 101 +++ .../operator/service/operator_service.py | 599 ++++++++++++++++++ .../app/module/shared/__init__.py | 21 + .../module/shared/chunk_upload_repository.py | 95 +++ .../app/module/shared/chunks_saver.py | 146 +++++ .../app/module/shared/file_models.py | 38 ++ .../app/module/shared/file_service.py | 187 ++++++ scripts/db/data-operator-init.sql | 13 +- 
scripts/images/backend-python/Dockerfile     |   1 +
 36 files changed, 2646 insertions(+), 1 deletion(-)
 create mode 100644 runtime/datamate-python/app/db/models/chunk_upload.py
 create mode 100644 runtime/datamate-python/app/db/models/operator.py
 create mode 100644 runtime/datamate-python/app/module/operator/README.md
 create mode 100644 runtime/datamate-python/app/module/operator/__init__.py
 create mode 100644 runtime/datamate-python/app/module/operator/constants.py
 create mode 100644 runtime/datamate-python/app/module/operator/exceptions.py
 create mode 100644 runtime/datamate-python/app/module/operator/interface/__init__.py
 create mode 100644 runtime/datamate-python/app/module/operator/interface/category_routes.py
 create mode 100644 runtime/datamate-python/app/module/operator/interface/operator_routes.py
 create mode 100644 runtime/datamate-python/app/module/operator/parsers/__init__.py
 create mode 100644 runtime/datamate-python/app/module/operator/parsers/abstract_parser.py
 create mode 100644 runtime/datamate-python/app/module/operator/parsers/parser_holder.py
 create mode 100644 runtime/datamate-python/app/module/operator/parsers/tar_parser.py
 create mode 100644 runtime/datamate-python/app/module/operator/parsers/zip_parser.py
 create mode 100644 runtime/datamate-python/app/module/operator/repository/__init__.py
 create mode 100644 runtime/datamate-python/app/module/operator/repository/category_relation_repository.py
 create mode 100644 runtime/datamate-python/app/module/operator/repository/category_repository.py
 create mode 100644 runtime/datamate-python/app/module/operator/repository/operator_release_repository.py
 create mode 100644 runtime/datamate-python/app/module/operator/repository/operator_repository.py
 create mode 100644 runtime/datamate-python/app/module/operator/schema/__init__.py
 create mode 100644 runtime/datamate-python/app/module/operator/schema/category.py
 create mode 100644 runtime/datamate-python/app/module/operator/schema/operator.py
 create mode 100644 runtime/datamate-python/app/module/operator/schema/release.py
 create mode 100644 runtime/datamate-python/app/module/operator/service/__init__.py
 create mode 100644 runtime/datamate-python/app/module/operator/service/category_service.py
 create mode 100644 runtime/datamate-python/app/module/operator/service/operator_service.py
 create mode 100644 runtime/datamate-python/app/module/shared/chunk_upload_repository.py
 create mode 100644 runtime/datamate-python/app/module/shared/chunks_saver.py
 create mode 100644 runtime/datamate-python/app/module/shared/file_models.py
 create mode 100644 runtime/datamate-python/app/module/shared/file_service.py

diff --git a/backend/api-gateway/src/main/java/com/datamate/gateway/ApiGatewayApplication.java b/backend/api-gateway/src/main/java/com/datamate/gateway/ApiGatewayApplication.java
index ee504973..de9e1f28 100644
--- a/backend/api-gateway/src/main/java/com/datamate/gateway/ApiGatewayApplication.java
+++ b/backend/api-gateway/src/main/java/com/datamate/gateway/ApiGatewayApplication.java
@@ -45,6 +45,10 @@ public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
                 .route("python-service", r -> r.path("/api/rag/**", "api/models/**")
                         .uri("http://datamate-backend-python:18000"))
 
+                // 算子市场服务路由
+                .route("data-operator", r -> r.path("/api/operators/**", "/api/categories/**")
+                        .uri("http://datamate-backend-python:18000"))
+
                 .route("deer-flow-frontend", r -> r.path("/chat/**")
                         .uri("http://deer-flow-frontend:3000"))
 
diff --git a/runtime/datamate-python/app/db/models/__init__.py
b/runtime/datamate-python/app/db/models/__init__.py index 2b83de26..060e4b64 100644 --- a/runtime/datamate-python/app/db/models/__init__.py +++ b/runtime/datamate-python/app/db/models/__init__.py @@ -21,6 +21,17 @@ EvaluationItem ) +from .operator import ( + Operator, + Category, + CategoryRelation, + OperatorRelease +) + +from .chunk_upload import ( + ChunkUploadPreRequest +) + __all__ = [ "Dataset", "DatasetTag", @@ -32,4 +43,9 @@ "LabelingProject", "EvaluationTask", "EvaluationItem", + "Operator", + "Category", + "CategoryRelation", + "OperatorRelease", + "ChunkUploadPreRequest", ] diff --git a/runtime/datamate-python/app/db/models/chunk_upload.py b/runtime/datamate-python/app/db/models/chunk_upload.py new file mode 100644 index 00000000..5b5a2b0c --- /dev/null +++ b/runtime/datamate-python/app/db/models/chunk_upload.py @@ -0,0 +1,38 @@ +""" +Chunk Upload Database Model +分片上传数据库模型 +""" +from sqlalchemy import Column, String, Integer, DateTime +from sqlalchemy.sql import func + +from app.db.models.base_entity import Base, BaseEntity + + +class ChunkUploadPreRequest(BaseEntity): + """分片上传预请求""" + __tablename__ = "t_chunk_upload_request" + + id = Column(String(36), primary_key=True, comment="请求ID") + total_file_num = Column(Integer, nullable=False, comment="总文件数") + uploaded_file_num = Column(Integer, nullable=True, comment="已上传文件数") + upload_path = Column(String(512), nullable=False, comment="文件路径") + timeout = Column(DateTime, nullable=False, comment="上传请求超时时间") + service_id = Column(String(64), nullable=True, comment="上传请求所属服务ID") + check_info = Column(String(512), nullable=True, comment="业务信息") + + def increment_uploaded_file_num(self): + """增加已上传文件数""" + if self.uploaded_file_num is None: + self.uploaded_file_num = 1 + else: + self.uploaded_file_num += 1 + + def is_upload_complete(self) -> bool: + """检查是否已完成上传""" + return (self.uploaded_file_num is not None and + self.uploaded_file_num == self.total_file_num) + + def is_request_timeout(self) -> bool: + """检查是否已超时""" + from datetime import datetime, timezone + return self.timeout is not None and datetime.now(timezone.utc) > self.timeout diff --git a/runtime/datamate-python/app/db/models/operator.py b/runtime/datamate-python/app/db/models/operator.py new file mode 100644 index 00000000..57362461 --- /dev/null +++ b/runtime/datamate-python/app/db/models/operator.py @@ -0,0 +1,70 @@ +""" +Operator Market Data Models +算子市场数据模型 +""" +from sqlalchemy import Column, String, Integer, Boolean, BigInteger, Text, JSON, TIMESTAMP, Index +from sqlalchemy.sql import func + +from app.db.models.base_entity import Base, BaseEntity + + +class Operator(BaseEntity): + """算子实体""" + __tablename__ = "t_operator" + + id = Column(String(36), primary_key=True, index=True, comment="算子ID") + name = Column(String(255), nullable=False, comment="算子名称") + description = Column(Text, nullable=True, comment="算子描述") + version = Column(String(50), nullable=False, comment="算子版本") + inputs = Column(Text, nullable=True, comment="输入定义(JSON)") + outputs = Column(Text, nullable=True, comment="输出定义(JSON)") + runtime = Column(Text, nullable=True, comment="运行时配置(JSON)") + settings = Column(Text, nullable=True, comment="算子设置(JSON)") + file_name = Column(String(255), nullable=True, comment="文件名") + file_size = Column(BigInteger, nullable=True, comment="文件大小(字节)") + metrics = Column(Text, nullable=True, comment="算子指标(JSON)") + usage_count = Column(Integer, default=0, nullable=False, comment="使用次数") + is_star = Column(Boolean, default=False, nullable=False, comment="是否收藏") + + 
__table_args__ = (
+        Index("idx_is_star", "is_star"),
+    )
+
+
+class Category(BaseEntity):
+    """算子分类实体"""
+    __tablename__ = "t_operator_category"
+
+    id = Column(String(36), primary_key=True, index=True, comment="分类ID")
+    name = Column(String(255), nullable=False, comment="分类名称")
+    value = Column(String(255), nullable=True, comment="分类值")
+    type = Column(String(50), nullable=True, comment="分类类型")
+    parent_id = Column(String(36), nullable=False, default="0", comment="父分类ID")
+
+
+class CategoryRelation(BaseEntity):
+    """算子分类关系实体"""
+    __tablename__ = "t_operator_category_relation"
+
+    category_id = Column(String(36), primary_key=True, comment="分类ID")
+    operator_id = Column(String(36), primary_key=True, comment="算子ID")
+
+    __table_args__ = (
+        Index("idx_category_id", "category_id"),
+        Index("idx_operator_id", "operator_id"),
+    )
+
+
+class OperatorRelease(BaseEntity):
+    """算子发布版本实体"""
+    __tablename__ = "t_operator_release"
+
+    id = Column(String(36), primary_key=True, comment="算子ID")
+    version = Column(String(50), primary_key=True, comment="版本号")
+    release_date = Column(TIMESTAMP, nullable=False, default=func.now(), comment="发布时间")
+    changelog = Column(JSON, nullable=True, comment="更新日志列表")
+
+
+# Ignore data scope for operator models
+for model in [Operator, Category, CategoryRelation, OperatorRelease]:
+    model.__ignore_data_scope__ = True
diff --git a/runtime/datamate-python/app/module/__init__.py b/runtime/datamate-python/app/module/__init__.py
index 7d3c482b..9437b11d 100644
--- a/runtime/datamate-python/app/module/__init__.py
+++ b/runtime/datamate-python/app/module/__init__.py
@@ -7,6 +7,8 @@
 from .evaluation.interface import router as evaluation_router
 from .collection.interface import router as collection_route
 from .rag.interface.rag_interface import router as rag_router
+from .operator.interface import operator_router
+from .operator.interface import category_router
 
 router = APIRouter(
     prefix="/api"
@@ -19,5 +21,7 @@
 router.include_router(evaluation_router)
 router.include_router(collection_route)
 router.include_router(rag_router)
+router.include_router(operator_router)
+router.include_router(category_router)
 
 __all__ = ["router"]
diff --git a/runtime/datamate-python/app/module/operator/README.md b/runtime/datamate-python/app/module/operator/README.md
new file mode 100644
index 00000000..703e8ed3
--- /dev/null
+++ b/runtime/datamate-python/app/module/operator/README.md
@@ -0,0 +1,138 @@
+# Operator Market Service - Python Implementation
+
+## 概述
+
+这是 `operator-market-service` 的 Python 实现,已集成到 `runtime/datamate-python` 项目中。
+
+## 功能
+
+- **算子管理**:创建、查询、更新、删除算子
+- **分类管理**:树状分类结构查询
+- **文件上传**:支持算子文件上传和解析(支持 tar/zip 格式)
+- **MCP 工具集成**:通过 fastapi-mcp 提供 MCP 工具接口
+
+## 目录结构
+
+```
+app/module/operator/
+├── __init__.py                  # 模块入口
+├── constants.py                 # 常量定义
+├── exceptions.py                # 异常定义
+├── schema/                      # Pydantic Schema 定义
+│   ├── __init__.py
+│   ├── operator.py              # 算子相关 Schema
+│   ├── category.py              # 分类相关 Schema
+│   └── release.py               # 发布版本 Schema
+├── parsers/                     # 文件解析器
+│   ├── __init__.py
+│   ├── abstract_parser.py       # 抽象解析器基类
+│   ├── tar_parser.py            # TAR 文件解析器
+│   ├── zip_parser.py            # ZIP 文件解析器
+│   └── parser_holder.py         # 解析器持有者
+├── repository/                  # 数据访问层
+│   ├── __init__.py
+│   ├── operator_repository.py
+│   ├── category_repository.py
+│   ├── category_relation_repository.py
+│   └── operator_release_repository.py
+├── service/                     # 服务层
+│   ├── __init__.py
+│   ├── operator_service.py
+│   └── category_service.py
+└── interface/                   # API 接口层
+    ├── __init__.py
+    ├── operator_routes.py
+    └── category_routes.py
+```
+
+## API 端点
+
+### 算子相关 (`/api/operators`)
+
+| 方法 | 路径 | 描述 |
+|------|--------|------|
+| POST | `/list` | 查询算子列表(支持分页、分类过滤、关键词搜索) |
+| GET | `/{operator_id}` | 获取算子详情 |
+| PUT | `/{operator_id}` | 更新算子信息 |
+| POST | `/create` | 创建新算子 |
+| POST | `/upload` | 上传算子文件 |
+| POST | `/upload/pre-upload` | 预上传(获取请求 ID) |
+| POST | `/upload/chunk` | 分块上传 |
+| DELETE | `/{operator_id}` | 删除算子 |
+| GET | `/examples/download` | 下载示例算子 |
+
+### 分类相关 (`/api/categories`)
+
+| 方法 | 路径 | 描述 |
+|------|--------|------|
+| GET | `/tree` | 获取分类树状结构 |
+
+## 数据库表
+
+- `t_operator` - 算子表
+- `t_operator_category` - 分类表
+- `t_operator_category_relation` - 分类关系表
+- `t_operator_release` - 算子发布版本表
+- `v_operator` - 算子视图(包含分类信息)
+
+## 文件格式支持
+
+算子文件需包含 `metadata.yml` 文件,格式如下:
+
+```yaml
+raw_id: "operator-id"
+name: "算子名称"
+description: "算子描述"
+version: "1.0.0"
+language: "python"  # python, java
+modal: "text"  # text, image, audio, video
+vendor: "datamate"  # datamate, data-juicer, or other
+inputs: {...}
+outputs: {...}
+runtime: {...}
+settings: {...}
+metrics: {...}
+release:
+  - "更新日志1"
+  - "更新日志2"
+```
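+
+### 解析示例(示意)
+
+下面是一个最小示意,展示如何用 `ParserHolder` 从算子压缩包中读取并解析
+`metadata.yml`(假设包为 tar 格式,`/tmp/operator.tar` 仅为示例路径):
+
+```python
+from app.module.operator.parsers import ParserHolder
+
+holder = ParserHolder()
+# parse_yaml_from_archive(file_type, archive_path, entry_path) 返回 OperatorDto,
+# 其中 language/modal/vendor 会按 constants.CATEGORY_MAP 映射为分类 ID
+operator = holder.parse_yaml_from_archive("tar", "/tmp/operator.tar", "metadata.yml")
+print(operator.name, operator.version, operator.categories)
+```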
+
+## 待实现功能
+
+- [ ] 算子收藏功能完善
+- [ ] 标签过滤功能
+
+## 使用示例
+
+### 查询算子列表
+
+```bash
+curl -X POST "http://localhost:18000/api/operators/list" \
+  -H "Content-Type: application/json" \
+  -d '{
+    "page": 0,
+    "size": 10,
+    "keyword": "test",
+    "isStar": false
+  }'
+```
+
+### 获取分类树
+
+```bash
+curl -X GET "http://localhost:18000/api/categories/tree"
+```
+
+### 创建算子
+
+```bash
+curl -X POST "http://localhost:18000/api/operators/create" \
+  -H "Content-Type: application/json" \
+  -d '{
+    "id": "new-operator-id",
+    "name": "新算子",
+    "description": "这是一个新算子",
+    "version": "1.0.0",
+    "fileName": "operator.tar"
+  }'
+```
diff --git a/runtime/datamate-python/app/module/operator/__init__.py b/runtime/datamate-python/app/module/operator/__init__.py
new file mode 100644
index 00000000..1ac84e31
--- /dev/null
+++ b/runtime/datamate-python/app/module/operator/__init__.py
@@ -0,0 +1,4 @@
+"""
+Operator Market Service Module
+算子市场服务模块
+"""
diff --git a/runtime/datamate-python/app/module/operator/constants.py b/runtime/datamate-python/app/module/operator/constants.py
new file mode 100644
index 00000000..e6d83ee9
--- /dev/null
+++ b/runtime/datamate-python/app/module/operator/constants.py
@@ -0,0 +1,50 @@
+"""
+Operator Market Constants
+算子市场常量定义
+"""
+
+# Service ID
+SERVICE_ID = "operator"
+
+# YAML metadata path
+YAML_PATH = "metadata.yml"
+
+# Example operator file path
+EXAMPLE_OPERATOR_PATH = "/app/test_operator.tar"
+
+# Category IDs
+CATEGORY_PYTHON = "python"
+CATEGORY_PYTHON_ID = "9eda9d5d-072b-499b-916c-797a0a8750e1"
+
+CATEGORY_JAVA = "java"
+CATEGORY_JAVA_ID = "b5bfc548-8ef6-417c-b8a6-a4197c078249"
+
+CATEGORY_CUSTOMIZED_ID = "ec2cdd17-8b93-4a81-88c4-ac9e98d10757"
+CATEGORY_TEXT_ID = "d8a5df7a-52a9-42c2-83c4-01062e60f597"
+CATEGORY_IMAGE_ID = "de36b61c-9e8a-4422-8c31-d30585c7100f"
+CATEGORY_AUDIO_ID = "42dd9392-73e4-458c-81ff-41751ada47b5"
+CATEGORY_VIDEO_ID = "a233d584-73c8-4188-ad5d-8f7c8dda9c27"
+CATEGORY_ALL_ID = "4d7dbd77-0a92-44f3-9056-2cd62d4a71e4"
+CATEGORY_STAR_ID = "51847c24-bba9-11f0-888b-5b143cb738aa"
+CATEGORY_PREDEFINED_ID = "96a3b07a-3439-4557-a835-525faad60ca3"
+CATEGORY_DATAMATE_ID = "431e7798-5426-4e1a-aae6-b9905a836b34"
+CATEGORY_DATA_JUICER_ID = "79b385b4-fde8-4617-bcba-02a176938996"
+CATEGORY_OTHER_VENDOR_ID = "f00eaa3e-96c1-4de4-96cd-9848ef5429ec"
+
+# Category mapping
+CATEGORY_MAP = {
+    CATEGORY_PYTHON: CATEGORY_PYTHON_ID,
+    
CATEGORY_JAVA: CATEGORY_JAVA_ID, + "text": CATEGORY_TEXT_ID, + "image": CATEGORY_IMAGE_ID, + "audio": CATEGORY_AUDIO_ID, + "video": CATEGORY_VIDEO_ID, + "all": CATEGORY_ALL_ID, + "datamate": CATEGORY_DATAMATE_ID, + "data-juicer": CATEGORY_DATA_JUICER_ID, +} + +# File paths +OPERATOR_BASE_PATH = "/operators" +UPLOAD_DIR = "upload" +EXTRACT_DIR = "extract" diff --git a/runtime/datamate-python/app/module/operator/exceptions.py b/runtime/datamate-python/app/module/operator/exceptions.py new file mode 100644 index 00000000..6eca13f5 --- /dev/null +++ b/runtime/datamate-python/app/module/operator/exceptions.py @@ -0,0 +1,72 @@ +""" +Operator Market Exceptions +算子市场异常定义 +""" +from enum import Enum +from typing import Optional + + +class OperatorErrorCode: + """算子错误码""" + def __init__(self, message: str, error_code: str): + self.message = message + self.error_code = error_code + + +class OperatorException(RuntimeError): + """算子异常基类""" + def __init__(self, operator_error_code: OperatorErrorCode): + self.message = operator_error_code.message + self.error_code = operator_error_code.error_code + super().__init__(self.message) + + +class OperatorErrorCodeEnum(Enum): + """算子错误码枚举""" + FIELD_NOT_FOUND = OperatorErrorCode( + "必填字段缺失", "OPERATOR_FIELD_NOT_FOUND" + ) + SETTINGS_PARSE_FAILED = OperatorErrorCode( + "设置解析失败", "OPERATOR_SETTINGS_PARSE_FAILED" + ) + OPERATOR_IN_INSTANCE = OperatorErrorCode( + "算子正在使用中", "OPERATOR_IN_INSTANCE" + ) + CANT_DELETE_PREDEFINED_OPERATOR = OperatorErrorCode( + "无法删除预定义算子", "CANT_DELETE_PREDEFINED_OPERATOR" + ) + + +class FieldNotFoundError(OperatorException): + """必填字段缺失""" + def __init__(self, field_name: str): + super().__init__( + OperatorErrorCodeEnum.FIELD_NOT_FOUND.value + ) + self.message = f"Required field '{field_name}' is missing" + self.field_name = field_name + + +class SettingsParseError(OperatorException): + """设置解析失败""" + def __init__(self, detail: Optional[str] = None): + super().__init__( + OperatorErrorCodeEnum.SETTINGS_PARSE_FAILED.value + ) + self.detail = detail + + +class OperatorInInstanceError(OperatorException): + """算子正在使用中""" + def __init__(self): + super().__init__( + OperatorErrorCodeEnum.OPERATOR_IN_INSTANCE.value + ) + + +class CannotDeletePredefinedOperatorError(OperatorException): + """无法删除预定义算子""" + def __init__(self): + super().__init__( + OperatorErrorCodeEnum.CANT_DELETE_PREDEFINED_OPERATOR.value + ) diff --git a/runtime/datamate-python/app/module/operator/interface/__init__.py b/runtime/datamate-python/app/module/operator/interface/__init__.py new file mode 100644 index 00000000..f83ad24f --- /dev/null +++ b/runtime/datamate-python/app/module/operator/interface/__init__.py @@ -0,0 +1,9 @@ +""" +Operator Market API Interfaces +算子市场 API 接口层 +""" +from .operator_routes import router as operator_router +from .category_routes import router as category_router + + +__all__ = ["operator_router", "category_router"] diff --git a/runtime/datamate-python/app/module/operator/interface/category_routes.py b/runtime/datamate-python/app/module/operator/interface/category_routes.py new file mode 100644 index 00000000..ed4207e0 --- /dev/null +++ b/runtime/datamate-python/app/module/operator/interface/category_routes.py @@ -0,0 +1,43 @@ +""" +Category API Routes +分类 API 路由 +""" +from fastapi import APIRouter, Depends + +from app.db.session import get_db +from app.module.shared.schema import StandardResponse +from app.module.operator.schema import CategoryTreePagedResponse +from app.module.operator.service import CategoryService +from 
app.module.operator.repository import (
+    CategoryRepository,
+    CategoryRelationRepository,
+)
+from app.module.operator.repository.operator_repository import OperatorRepository
+from app.db.models.operator import Category, CategoryRelation, Operator
+
+router = APIRouter(prefix="/categories", tags=["Category"])
+
+
+def get_category_service() -> CategoryService:
+    """获取分类服务实例"""
+    return CategoryService(
+        category_repo=CategoryRepository(Category),
+        category_relation_repo=CategoryRelationRepository(CategoryRelation),
+        operator_repo=OperatorRepository(Operator),
+    )
+
+
+@router.get(
+    "/tree",
+    response_model=StandardResponse[CategoryTreePagedResponse],
+    summary="获取分类树",
+    description="获取算子树状分类结构,包含分组维度(如语言、模态)及资源统计数量",
+    tags=['mcp']
+)
+async def get_category_tree(
+    service: CategoryService = Depends(get_category_service),
+    db=Depends(get_db)
+):
+    """获取分类树"""
+    result = await service.get_all_categories(db)
+    return StandardResponse(code=200, message="success", data=result)
diff --git a/runtime/datamate-python/app/module/operator/interface/operator_routes.py b/runtime/datamate-python/app/module/operator/interface/operator_routes.py
new file mode 100644
index 00000000..8a1911d2
--- /dev/null
+++ b/runtime/datamate-python/app/module/operator/interface/operator_routes.py
@@ -0,0 +1,270 @@
+"""
+Operator API Routes
+算子 API 路由
+"""
+from pathlib import Path
+from typing import List, Optional
+
+from fastapi import APIRouter, Depends, HTTPException, UploadFile, Form, File
+from fastapi.responses import FileResponse
+
+from app.db.session import get_db
+from app.module.shared.schema import StandardResponse, PaginatedData
+from app.module.operator.schema import (
+    OperatorDto,
+    OperatorUpdateDto,
+    OperatorListRequest,
+    PreUploadResponse,
+)
+from app.module.operator.service import OperatorService
+from app.module.operator.repository import (
+    OperatorRepository,
+    CategoryRelationRepository,
+    OperatorReleaseRepository,
+)
+from app.module.operator.parsers import ParserHolder
+from app.db.models.operator import Operator, CategoryRelation, OperatorRelease
+from app.core.logging import get_logger
+from app.module.shared.file_service import FileService
+from app.module.shared.chunk_upload_repository import ChunkUploadRepository
+from app.db.models.chunk_upload import ChunkUploadPreRequest
+
+logger = get_logger(__name__)
+
+router = APIRouter(prefix="/operators", tags=["Operator"])
+
+def get_operator_service() -> OperatorService:
+    """获取算子服务实例"""
+    return OperatorService(
+        operator_repo=OperatorRepository(Operator),
+        category_relation_repo=CategoryRelationRepository(CategoryRelation),
+        operator_release_repo=OperatorReleaseRepository(OperatorRelease),
+        parser_holder=ParserHolder(),
+        file_service=FileService(ChunkUploadRepository()),
+    )
+
+
+@router.post(
+    "/list",
+    response_model=StandardResponse[PaginatedData[OperatorDto]],
+    summary="查询算子列表",
+    description="根据参数查询算子列表(支持分页、分类过滤、关键词搜索)",
+    tags=['mcp']
+)
+async def list_operators(
+    request: OperatorListRequest,
+    service: OperatorService = Depends(get_operator_service),
+    db=Depends(get_db)
+):
+    """查询算子列表"""
+    operators = await service.get_operators(
+        page=request.page,
+        size=request.size,
+        categories=request.categories,
+        keyword=request.keyword,
+        is_star=request.is_star,
+        db=db
+    )
+
+    count = await service.count_operators(
+        categories=request.categories,
+        keyword=request.keyword,
+        is_star=request.is_star,
+        db=db
+    )
+
+    total_pages = (count + request.size - 1) // request.size  # Ceiling division
+
+    return StandardResponse(
+        code=200,
+        message="success",
+        data=PaginatedData(
+            page=request.page,
+            size=request.size,
+            total_elements=count,
+            total_pages=total_pages,
+            content=operators,
+        )
+    )
+
+
+@router.get(
+    "/{operator_id}",
+    response_model=StandardResponse[OperatorDto],
+    summary="获取算子详情",
+    description="根据 ID 获取算子详细信息"
+)
+async def get_operator(
+    operator_id: str,
+    service: OperatorService = Depends(get_operator_service),
+    db=Depends(get_db)
+):
+    """获取算子详情"""
+    try:
+        operator = await service.get_operator_by_id(operator_id, db)
+        return StandardResponse(code=200, message="success", data=operator)
+    except ValueError as e:
+        raise HTTPException(status_code=404, detail=str(e))
+
+
+@router.put(
+    "/{operator_id}",
+    response_model=StandardResponse[OperatorDto],
+    summary="更新算子",
+    description="更新算子信息"
+)
+async def update_operator(
+    operator_id: str,
+    request: OperatorUpdateDto,
+    service: OperatorService = Depends(get_operator_service),
+    db=Depends(get_db)
+):
+    """更新算子"""
+    try:
+        operator = await service.update_operator(operator_id, request, db)
+        await db.commit()
+        return StandardResponse(code=200, message="success", data=operator)
+    except Exception as e:
+        logger.exception(f"Failed to update operator {operator_id}: {request}")
+        await db.rollback()
+        raise HTTPException(status_code=400, detail=str(e))
+
+
+@router.post(
+    "/create",
+    response_model=StandardResponse[OperatorDto],
+    summary="创建算子",
+    description="创建新算子"
+)
+async def create_operator(
+    request: OperatorDto,
+    service: OperatorService = Depends(get_operator_service),
+    db=Depends(get_db)
+):
+    """创建算子"""
+    try:
+        operator = await service.create_operator(request, db)
+        await db.commit()
+        return StandardResponse(code=200, message="success", data=operator)
+    except Exception as e:
+        await db.rollback()
+        raise HTTPException(status_code=400, detail=str(e))
+
+
+@router.post(
+    "/upload",
+    response_model=StandardResponse[OperatorDto],
+    summary="上传算子",
+    description="上传算子文件并解析元数据"
+)
+async def upload_operator(
+    file_name: str,
+    service: OperatorService = Depends(get_operator_service),
+    db=Depends(get_db)
+):
+    """上传算子"""
+    try:
+        operator = await service.upload_operator(file_name, db)
+        return StandardResponse(code=200, message="success", data=operator)
+    except Exception as e:
+        raise HTTPException(status_code=400, detail=str(e))
+
+
+@router.post(
+    "/upload/pre-upload",
+    response_model=StandardResponse[PreUploadResponse],
+    summary="预上传",
+    description="获取预上传 ID,用于分块上传"
+)
+async def pre_upload(
+    service: OperatorService = Depends(get_operator_service),
+    db=Depends(get_db)
+):
+    """预上传"""
+    result = await service.pre_upload(db)
+    return StandardResponse(
+        code=200,
+        message="success",
+        data=PreUploadResponse(req_id=result["req_id"])
+    )
+
+
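+# 分块上传协议(示意,与 shared.file_service 的分片逻辑对应):客户端先调用
+# /upload/pre-upload 取得 req_id,再按 chunk_no 依次调用 /upload/chunk 上传各
+# 分块;服务端依据 ChunkUploadPreRequest 统计进度,收齐 total_chunk_num 个分块
+# 后合并文件,最后由 /upload 按 file_name 解析 metadata.yml 并入库。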
+@router.post(
+    "/upload/chunk",
+    response_model=StandardResponse[dict],
+    summary="分块上传",
+    description="分块上传算子文件"
+)
+async def chunk_upload(
+    req_id: str = Form(..., description="预上传ID"),
+    file_no: int = Form(1, description="文件编号"),
+    file_name: str = Form(..., description="文件名"),
+    total_chunk_num: int = Form(1, description="总分块数"),
+    chunk_no: int = Form(1, description="当前分块号"),
+    file: UploadFile = File(...),
+    check_sum_hex: Optional[str] = Form(None, description="校验和"),
+    service: OperatorService = Depends(get_operator_service),
+    db=Depends(get_db)
+):
+    """分块上传"""
+    try:
+        file_content = await file.read()
+        result = await service.chunk_upload(
+            req_id=req_id,
+            file_no=file_no,
+            file_name=file_name,
+            total_chunk_num=total_chunk_num,
+            chunk_no=chunk_no,
+            
check_sum_hex=check_sum_hex, + file_content=file_content, + db=db + ) + await db.commit() + return StandardResponse(code=200, message="success", data=result.dict()) + except Exception as e: + await db.rollback() + raise HTTPException(status_code=400, detail=str(e)) + + +@router.delete( + "/{operator_id}", + response_model=StandardResponse[None], + summary="删除算子", + description="删除算子" +) +async def delete_operator( + operator_id: str, + service: OperatorService = Depends(get_operator_service), + db=Depends(get_db) +): + """删除算子""" + try: + await service.delete_operator(operator_id, db) + await db.commit() + return StandardResponse(code=200, message="success", data=None) + except Exception as e: + await db.rollback() + raise HTTPException(status_code=400, detail=str(e)) + + +@router.get( + "/examples/download", + response_class=FileResponse, + summary="下载示例算子", + description="下载示例算子文件" +) +async def download_example_operator( + service: OperatorService = Depends(get_operator_service) +): + """下载示例算子""" + from app.module.operator.constants import EXAMPLE_OPERATOR_PATH + example_path = EXAMPLE_OPERATOR_PATH + try: + file_path = service.download_example_operator(example_path) + return FileResponse( + path=str(file_path), + filename=file_path.name, + media_type="application/octet-stream" + ) + except FileNotFoundError: + raise HTTPException(status_code=404, detail="Example file not found") diff --git a/runtime/datamate-python/app/module/operator/parsers/__init__.py b/runtime/datamate-python/app/module/operator/parsers/__init__.py new file mode 100644 index 00000000..db3c0504 --- /dev/null +++ b/runtime/datamate-python/app/module/operator/parsers/__init__.py @@ -0,0 +1,15 @@ +""" +Operator File Parsers +算子文件解析器 +""" +from .abstract_parser import AbstractParser +from .tar_parser import TarParser +from .zip_parser import ZipParser +from .parser_holder import ParserHolder + +__all__ = [ + "AbstractParser", + "TarParser", + "ZipParser", + "ParserHolder", +] diff --git a/runtime/datamate-python/app/module/operator/parsers/abstract_parser.py b/runtime/datamate-python/app/module/operator/parsers/abstract_parser.py new file mode 100644 index 00000000..27e9aa3c --- /dev/null +++ b/runtime/datamate-python/app/module/operator/parsers/abstract_parser.py @@ -0,0 +1,97 @@ +""" +Abstract Parser +抽象解析器基类 +""" +import json +import yaml +from abc import ABC, abstractmethod +from typing import Dict, Any, Optional + +from app.module.operator.schema import OperatorDto, OperatorReleaseDto +from app.module.operator.constants import CATEGORY_MAP, CATEGORY_OTHER_VENDOR_ID, CATEGORY_CUSTOMIZED_ID +from app.module.operator.exceptions import FieldNotFoundError + + +class AbstractParser(ABC): + """算子文件解析器抽象基类""" + + @abstractmethod + def parse_yaml_from_archive(self, archive_path: str, entry_path: str) -> OperatorDto: + """ + 从压缩包内读取指定路径的 yaml 文件并解析为 OperatorDto + + Args: + archive_path: 压缩包路径(zip 或 tar) + entry_path: 压缩包内部的文件路径,例如 "config/app.yaml" + + Returns: + 解析后的 OperatorDto + """ + pass + + @abstractmethod + def extract_to(self, archive_path: str, target_dir: str) -> None: + """ + 将压缩包解压到目标目录(保持相对路径) + + Args: + archive_path: 压缩包路径 + target_dir: 目标目录 + """ + pass + + def parse_yaml(self, yaml_content: str) -> OperatorDto: + """解析 YAML 内容为 OperatorDto""" + content: Dict[str, Any] = yaml.safe_load(yaml_content) + + operator = OperatorDto( + id=self._to_string(content.get("raw_id")), + name=self._to_string(content.get("name")), + description=self._to_string(content.get("description")), + 
version=self._to_string(content.get("version")), + inputs=self._to_json(content.get("inputs")), + outputs=self._to_json(content.get("outputs")), + runtime=self._to_json(content.get("runtime")), + settings=self._to_json(content.get("settings")), + metrics=self._to_json(content.get("metrics")), + ) + + # Handle changelog + changelog = content.get("release") + if isinstance(changelog, list): + operator_release = OperatorReleaseDto(changelog=changelog) + else: + operator_release = OperatorReleaseDto(changelog=[]) + operator.releases = [operator_release] + + # Build categories + categories = [ + CATEGORY_MAP.get(self._to_lower(content.get("language")), ""), + CATEGORY_MAP.get(self._to_lower(content.get("modal")), ""), + CATEGORY_MAP.get(self._to_lower(content.get("vendor")), CATEGORY_OTHER_VENDOR_ID), + CATEGORY_CUSTOMIZED_ID, + ] + operator.categories = categories + + return operator + + def _to_string(self, obj: Any) -> str: + """转换为字符串""" + if obj is None: + raise FieldNotFoundError("field") + return str(obj) + + def _to_lower(self, obj: Any) -> str: + """转换为小写字符串""" + if obj is None: + raise FieldNotFoundError("field") + return str(obj).lower() + + def _to_json(self, obj: Any) -> Optional[str]: + """转换为 JSON 字符串""" + if obj is None: + return None + try: + return json.dumps(obj) + except (TypeError, ValueError) as e: + raise ValueError(f"Failed to serialize to JSON: {e}") diff --git a/runtime/datamate-python/app/module/operator/parsers/parser_holder.py b/runtime/datamate-python/app/module/operator/parsers/parser_holder.py new file mode 100644 index 00000000..e4a79d63 --- /dev/null +++ b/runtime/datamate-python/app/module/operator/parsers/parser_holder.py @@ -0,0 +1,52 @@ +""" +Parser Holder +解析器持有者,根据文件类型选择合适的解析器 +""" +import os +from typing import Dict, Type + +from app.module.operator.parsers.abstract_parser import AbstractParser +from app.module.operator.parsers.tar_parser import TarParser +from app.module.operator.parsers.zip_parser import ZipParser +from app.module.operator.schema import OperatorDto + + +class ParserHolder: + """解析器持有者,根据文件类型选择解析器""" + + def __init__(self): + self._parsers: Dict[str, AbstractParser] = { + "tar": TarParser(), + "gz": TarParser(), + "tgz": TarParser(), + "zip": ZipParser(), + } + + def get_parser(self, file_path: str) -> AbstractParser: + """根据文件扩展名获取解析器""" + _, ext = os.path.splitext(file_path) + file_type = ext.lstrip('.').lower() + + if file_type not in self._parsers: + raise ValueError(f"Unsupported file type: {file_type}") + + return self._parsers[file_type] + + def parse_yaml_from_archive( + self, + file_type: str, + archive_path: str, + entry_path: str + ) -> OperatorDto: + """从压缩包解析 YAML""" + if file_type not in self._parsers: + raise ValueError(f"Unsupported file type: {file_type}") + + return self._parsers[file_type].parse_yaml_from_archive(archive_path, entry_path) + + def extract_to(self, file_type: str, archive_path: str, target_dir: str) -> None: + """解压文件到目标目录""" + if file_type not in self._parsers: + raise ValueError(f"Unsupported file type: {file_type}") + + self._parsers[file_type].extract_to(archive_path, target_dir) diff --git a/runtime/datamate-python/app/module/operator/parsers/tar_parser.py b/runtime/datamate-python/app/module/operator/parsers/tar_parser.py new file mode 100644 index 00000000..e2618cfa --- /dev/null +++ b/runtime/datamate-python/app/module/operator/parsers/tar_parser.py @@ -0,0 +1,41 @@ +""" +Tar File Parser +TAR 文件解析器 +""" +import tarfile +import os +from typing import Optional + +from 
app.module.operator.parsers.abstract_parser import AbstractParser +from app.module.operator.schema import OperatorDto + + +class TarParser(AbstractParser): + """TAR 压缩包解析器""" + + def parse_yaml_from_archive(self, archive_path: str, entry_path: str) -> OperatorDto: + """从 TAR 文件中解析 YAML""" + try: + with tarfile.open(archive_path, 'r:*') as tar: + for member in tar.getmembers(): + if member.name == entry_path or member.name.endswith(f"/{entry_path}"): + file = tar.extractfile(member) + if file: + content = file.read().decode('utf-8') + return self.parse_yaml(content) + raise FileNotFoundError(f"File '{entry_path}' not found in archive") + except (tarfile.TarError, EOFError) as e: + raise ValueError(f"Failed to parse TAR file: {e}") + + def extract_to(self, archive_path: str, target_dir: str) -> None: + """解压 TAR 文件到目标目录""" + try: + os.makedirs(target_dir, exist_ok=True) + with tarfile.open(archive_path, 'r:*') as tar: + # Safety check: prevent path traversal + for member in tar.getmembers(): + if os.path.isabs(member.name) or ".." in member.name.split("/"): + raise ValueError(f"Unsafe path in archive: {member.name}") + tar.extractall(target_dir) + except (tarfile.TarError, EOFError) as e: + raise ValueError(f"Failed to extract TAR file: {e}") diff --git a/runtime/datamate-python/app/module/operator/parsers/zip_parser.py b/runtime/datamate-python/app/module/operator/parsers/zip_parser.py new file mode 100644 index 00000000..a1741efe --- /dev/null +++ b/runtime/datamate-python/app/module/operator/parsers/zip_parser.py @@ -0,0 +1,41 @@ +""" +Zip File Parser +ZIP 文件解析器 +""" +import zipfile +import os +from typing import Optional + +from app.module.operator.parsers.abstract_parser import AbstractParser +from app.module.operator.schema import OperatorDto + + +class ZipParser(AbstractParser): + """ZIP 压缩包解析器""" + + def parse_yaml_from_archive(self, archive_path: str, entry_path: str) -> OperatorDto: + """从 ZIP 文件中解析 YAML""" + try: + with zipfile.ZipFile(archive_path, 'r') as zf: + # Check all possible paths + for name in zf.namelist(): + if name == entry_path or name.endswith(f"/{entry_path}"): + with zf.open(name) as file: + content = file.read().decode('utf-8') + return self.parse_yaml(content) + raise FileNotFoundError(f"File '{entry_path}' not found in archive") + except (zipfile.BadZipFile, zipfile.LargeZipFile) as e: + raise ValueError(f"Failed to parse ZIP file: {e}") + + def extract_to(self, archive_path: str, target_dir: str) -> None: + """解压 ZIP 文件到目标目录""" + try: + os.makedirs(target_dir, exist_ok=True) + with zipfile.ZipFile(archive_path, 'r') as zf: + # Safety check: prevent path traversal + for name in zf.namelist(): + if os.path.isabs(name) or ".." 
in name.split("/"): + raise ValueError(f"Unsafe path in archive: {name}") + zf.extractall(target_dir) + except (zipfile.BadZipFile, zipfile.LargeZipFile) as e: + raise ValueError(f"Failed to extract ZIP file: {e}") diff --git a/runtime/datamate-python/app/module/operator/repository/__init__.py b/runtime/datamate-python/app/module/operator/repository/__init__.py new file mode 100644 index 00000000..67859d72 --- /dev/null +++ b/runtime/datamate-python/app/module/operator/repository/__init__.py @@ -0,0 +1,15 @@ +""" +Operator Market Repositories +算子市场数据访问层 +""" +from .operator_repository import OperatorRepository +from .category_repository import CategoryRepository +from .category_relation_repository import CategoryRelationRepository +from .operator_release_repository import OperatorReleaseRepository + +__all__ = [ + "OperatorRepository", + "CategoryRepository", + "CategoryRelationRepository", + "OperatorReleaseRepository", +] diff --git a/runtime/datamate-python/app/module/operator/repository/category_relation_repository.py b/runtime/datamate-python/app/module/operator/repository/category_relation_repository.py new file mode 100644 index 00000000..1edd5868 --- /dev/null +++ b/runtime/datamate-python/app/module/operator/repository/category_relation_repository.py @@ -0,0 +1,77 @@ +""" +Category Relation Repository +分类关系数据访问层 +""" +from typing import List + +from sqlalchemy import select, delete, and_ +from sqlalchemy.ext.asyncio import AsyncSession + +from app.db.models.operator import CategoryRelation +from app.module.operator.constants import CATEGORY_PREDEFINED_ID + + +class CategoryRelationRepository: + """分类关系数据访问层""" + + def __init__(self, model: CategoryRelation): + self.model = model + + async def find_all(self, db: AsyncSession) -> List[CategoryRelation]: + """查询所有分类关系""" + result = await db.execute(select(self.model)) + return result.scalars().all() + + async def batch_insert( + self, + operator_id: str, + category_ids: List[str], + db: AsyncSession + ) -> None: + """批量插入分类关系""" + for category_id in category_ids: + entity = CategoryRelation( + category_id=category_id, + operator_id=operator_id + ) + db.add(entity) + + async def batch_update( + self, + operator_id: str, + category_ids: List[str], + db: AsyncSession + ) -> None: + """批量更新分类关系(先删除后插入)""" + # Delete existing relations + await db.execute( + delete(self.model) + .where(self.model.operator_id == operator_id) + ) + # Insert new relations + for category_id in category_ids: + entity = CategoryRelation( + category_id=category_id, + operator_id=operator_id + ) + db.add(entity) + + async def delete_by_operator_id(self, operator_id: str, db: AsyncSession) -> None: + """根据算子ID删除分类关系""" + await db.execute( + delete(self.model) + .where(self.model.operator_id == operator_id) + ) + + async def operator_is_predefined(self, operator_id: str, db: AsyncSession) -> bool: + """检查算子是否为预定义算子""" + result = await db.execute( + select(self.model) + .where( + and_( + self.model.operator_id == operator_id, + self.model.category_id == CATEGORY_PREDEFINED_ID + ) + ) + ) + return result.first() is not None diff --git a/runtime/datamate-python/app/module/operator/repository/category_repository.py b/runtime/datamate-python/app/module/operator/repository/category_repository.py new file mode 100644 index 00000000..b5434d34 --- /dev/null +++ b/runtime/datamate-python/app/module/operator/repository/category_repository.py @@ -0,0 +1,23 @@ +""" +Category Repository +分类数据访问层 +""" +from typing import List + +from sqlalchemy import select +from 
sqlalchemy.ext.asyncio import AsyncSession + +from app.db.models.operator import Category +from app.module.operator.schema import CategoryDto + + +class CategoryRepository: + """分类数据访问层""" + + def __init__(self, model: Category): + self.model = model + + async def find_all(self, db: AsyncSession) -> List[Category]: + """查询所有分类""" + result = await db.execute(select(self.model)) + return result.scalars().all() diff --git a/runtime/datamate-python/app/module/operator/repository/operator_release_repository.py b/runtime/datamate-python/app/module/operator/repository/operator_release_repository.py new file mode 100644 index 00000000..bcab7be8 --- /dev/null +++ b/runtime/datamate-python/app/module/operator/repository/operator_release_repository.py @@ -0,0 +1,72 @@ +""" +Operator Release Repository +算子发布版本数据访问层 +""" +from typing import List + +from sqlalchemy import select, delete, and_ +from sqlalchemy.ext.asyncio import AsyncSession + +from app.db.models.operator import OperatorRelease +from app.module.operator.schema import OperatorReleaseDto + + +class OperatorReleaseRepository: + """算子发布版本数据访问层""" + + def __init__(self, model: OperatorRelease): + self.model = model + + async def find_all_by_operator_id( + self, + operator_id: str, + db: AsyncSession + ) -> List[OperatorRelease]: + """查询算子的所有发布版本""" + result = await db.execute( + select(OperatorRelease) + .where(OperatorRelease.id == operator_id) + .order_by(OperatorRelease.release_date.desc()) + ) + return result.scalars().all() + + async def insert( + self, + dto: OperatorReleaseDto, + db: AsyncSession + ) -> None: + """插入发布版本""" + entity = OperatorRelease( + id=dto.id, + version=dto.version, + release_date=dto.release_date, + changelog=dto.changelog + ) + db.add(entity) + + async def update( + self, + dto: OperatorReleaseDto, + db: AsyncSession + ) -> None: + """更新发布版本""" + result = await db.execute( + select(OperatorRelease) + .where( + and_( + OperatorRelease.id == dto.id, + OperatorRelease.version == dto.version + ) + ) + ) + entity = result.scalar_one_or_none() + if entity: + entity.changelog = dto.changelog + entity.release_date = dto.release_date + + async def delete(self, operator_id: str, db: AsyncSession) -> None: + """删除算子的所有发布版本""" + await db.execute( + delete(OperatorRelease) + .where(OperatorRelease.id == operator_id) + ) diff --git a/runtime/datamate-python/app/module/operator/repository/operator_repository.py b/runtime/datamate-python/app/module/operator/repository/operator_repository.py new file mode 100644 index 00000000..990f7eb3 --- /dev/null +++ b/runtime/datamate-python/app/module/operator/repository/operator_repository.py @@ -0,0 +1,121 @@ +""" +Operator Repository +算子数据访问层 +""" +import json +from typing import List, Optional +from datetime import datetime, timezone + +from sqlalchemy import select, text, update +from sqlalchemy.ext.asyncio import AsyncSession + +from app.db.models.operator import Operator +from app.module.operator.schema import OperatorDto + + +class OperatorRepository: + """算子数据访问层""" + + def __init__(self, model: Operator): + self.model = model + + async def find_all(self, db: AsyncSession) -> List[Operator]: + """查询所有算子""" + result = await db.execute(select(Operator)) + return result.scalars().all() + + async def insert(self, dto: OperatorDto, db: AsyncSession) -> None: + """插入算子""" + entity = Operator( + id=dto.id, + name=dto.name, + description=dto.description, + version=dto.version, + inputs=dto.inputs, + outputs=dto.outputs, + runtime=dto.runtime, + settings=dto.settings, + 
file_name=dto.file_name, + file_size=dto.file_size, + metrics=dto.metrics, + usage_count=dto.usage_count or 0, + is_star=dto.is_star or False, + ) + db.add(entity) + + async def update(self, dto: OperatorDto, db: AsyncSession) -> None: + """更新算子""" + await db.execute( + update(Operator) + .where(Operator.id == dto.id) + .values( + name=dto.name, + description=dto.description, + version=dto.version, + inputs=dto.inputs, + outputs=dto.outputs, + runtime=dto.runtime, + settings=dto.settings, + file_name=dto.file_name, + file_size=dto.file_size, + metrics=dto.metrics, + is_star=dto.is_star, + updated_at=datetime.utcnow(), + ) + ) + + async def delete(self, operator_id: str, db: AsyncSession) -> None: + """删除算子""" + entity = await db.get(Operator, operator_id) + if entity: + await db.delete(entity) + + async def count_by_star(self, is_star: bool, db: AsyncSession) -> int: + """统计收藏算子数量""" + result = await db.execute( + select(text("COUNT(*)")) + .select_from(Operator) + .where(Operator.is_star == is_star) + ) + return result.scalar() or 0 + + async def operator_in_template(self, operator_id: str, db: AsyncSession) -> bool: + """检查算子是否在模板中""" + result = await db.execute( + text(""" + SELECT COUNT(*) FROM t_operator_instance oi + JOIN t_clean_template t ON oi.instance_id = t.id + WHERE oi.operator_id = :operator_id + """), + {"operator_id": operator_id} + ) + return (result.scalar() or 0) > 0 + + async def operator_in_unstop_task(self, operator_id: str, db: AsyncSession) -> bool: + """检查算子是否在未完成的任务中""" + result = await db.execute( + text(""" + SELECT COUNT(*) FROM t_operator_instance oi + JOIN t_clean_task t ON oi.instance_id = t.id + WHERE oi.operator_id = :operator_id AND t.status != 'COMPLETED' + """), + {"operator_id": operator_id} + ) + return (result.scalar() or 0) > 0 + + async def increment_usage_count( + self, + operator_ids: List[str], + db: AsyncSession + ) -> None: + """增加算子使用次数""" + if not operator_ids: + return + await db.execute( + update(Operator) + .where(Operator.id.in_(operator_ids)) + .values( + usage_count=Operator.usage_count + 1, + updated_at=datetime.now(timezone.utc), + ) + ) diff --git a/runtime/datamate-python/app/module/operator/schema/__init__.py b/runtime/datamate-python/app/module/operator/schema/__init__.py new file mode 100644 index 00000000..a084cbaf --- /dev/null +++ b/runtime/datamate-python/app/module/operator/schema/__init__.py @@ -0,0 +1,29 @@ +""" +Operator Market Schemas +算子市场 Schema 定义 +""" +from .operator import ( + OperatorDto, + OperatorListRequest, + PreUploadResponse, + OperatorUpdateDto, +) +from .category import ( + CategoryDto, + CategoryTreeResponse, + CategoryTreePagedResponse, + CategoryRelationDto, +) +from .release import OperatorReleaseDto + +__all__ = [ + "OperatorDto", + "OperatorListRequest", + "PreUploadResponse", + "CategoryDto", + "CategoryTreeResponse", + "CategoryTreePagedResponse", + "CategoryRelationDto", + "OperatorReleaseDto", + "OperatorUpdateDto", +] diff --git a/runtime/datamate-python/app/module/operator/schema/category.py b/runtime/datamate-python/app/module/operator/schema/category.py new file mode 100644 index 00000000..afd6e3c5 --- /dev/null +++ b/runtime/datamate-python/app/module/operator/schema/category.py @@ -0,0 +1,40 @@ +""" +Category Schemas +分类 Schema 定义 +""" +from typing import List, Optional +from datetime import datetime +from pydantic import BaseModel, Field + +from app.module.shared.schema import BaseResponseModel + + +class CategoryDto(BaseResponseModel): + """分类 DTO""" + id: str = Field(..., 
description="分类ID") + name: str = Field(..., description="分类名称") + value: Optional[str] = Field(None, description="分类值") + type: Optional[str] = Field(None, description="分类类型") + parent_id: Optional[str] = Field(None, description="父分类ID") + count: Optional[int] = Field(0, description="算子数量") + created_at: Optional[datetime] = Field(None, description="创建时间") + + +class CategoryTreeResponse(BaseResponseModel): + """分类树响应""" + id: str = Field(..., description="分类ID") + name: str = Field(..., description="分类名称") + count: int = Field(0, description="算子总数") + categories: List[CategoryDto] = Field(default_factory=list, description="子分类列表") + + +class CategoryTreePagedResponse(BaseResponseModel): + """分类树分页响应""" + star_count: int = Field(0, description="收藏的算子数量") + categories: List[CategoryTreeResponse] = Field(default_factory=list, description="分类树列表") + + +class CategoryRelationDto(BaseResponseModel): + """分类关系 DTO""" + category_id: str = Field(..., description="分类ID") + operator_id: str = Field(..., description="算子ID") diff --git a/runtime/datamate-python/app/module/operator/schema/operator.py b/runtime/datamate-python/app/module/operator/schema/operator.py new file mode 100644 index 00000000..c53ed864 --- /dev/null +++ b/runtime/datamate-python/app/module/operator/schema/operator.py @@ -0,0 +1,72 @@ +""" +Operator Schemas +算子 Schema 定义 +""" +from __future__ import annotations + +from typing import List, Optional, Dict, Any +from datetime import datetime +from pydantic import BaseModel, Field + +from app.module.shared.schema import BaseResponseModel +from .release import OperatorReleaseDto + + +class OperatorDto(BaseResponseModel): + """算子 DTO""" + id: str = Field(..., description="算子ID") + name: str = Field(..., description="算子名称") + description: Optional[str] = Field(None, description="算子描述") + version: str = Field(..., description="算子版本") + inputs: Optional[str] = Field(None, description="输入定义(JSON)") + outputs: Optional[str] = Field(None, description="输出定义(JSON)") + runtime: Optional[str] = Field(None, description="运行时配置(JSON)") + settings: Optional[str] = Field(None, description="算子设置(JSON)") + file_name: Optional[str] = Field(None, description="文件名") + file_size: Optional[int] = Field(None, description="文件大小(字节)") + metrics: Optional[str] = Field(None, description="算子指标(JSON)") + usage_count: Optional[int] = Field(None, description="使用次数") + is_star: Optional[bool] = Field(None, description="是否收藏") + categories: Optional[List[str]] = Field(None, description="分类ID列表") + overrides: Optional[Dict[str, Any]] = Field(None, description="设置覆盖值") + requirements: Optional[List[str]] = Field(None, description="Python 依赖列表") + readme: Optional[str] = Field(None, description="README 内容") + releases: Optional[List[OperatorReleaseDto]] = Field(None, description="发布版本列表") + created_at: Optional[datetime] = Field(None, description="创建时间") + updated_at: Optional[datetime] = Field(None, description="更新时间") + + +class OperatorListRequest(BaseResponseModel): + """算子列表查询请求""" + page: int = Field(1, ge=0, description="页码(从0开始)") + size: int = Field(10, ge=1, le=100, description="页大小") + categories: List[List[str]] = Field(default_factory=list, description="分类ID列表(每个父分类下的id放到一个列表中)") + keyword: Optional[str] = Field(None, description="搜索关键词") + label_name: Optional[str] = Field(None, description="标签名称(暂不支持)") + is_star: Optional[bool] = Field(None, description="是否收藏") + + +class PreUploadResponse(BaseResponseModel): + """预上传响应""" + req_id: str = Field(..., description="请求ID") + + +class 
OperatorUpdateDto(BaseResponseModel): + """算子更新 DTO(所有字段可选)""" + name: Optional[str] = Field(None, description="算子名称") + description: Optional[str] = Field(None, description="算子描述") + version: Optional[str] = Field(None, description="算子版本") + inputs: Optional[str] = Field(None, description="输入定义(JSON)") + outputs: Optional[str] = Field(None, description="输出定义(JSON)") + runtime: Optional[str] = Field(None, description="运行时配置(JSON)") + settings: Optional[str] = Field(None, description="算子设置(JSON)") + file_name: Optional[str] = Field(None, description="文件名") + file_size: Optional[int] = Field(None, description="文件大小(字节)") + metrics: Optional[str] = Field(None, description="算子指标(JSON)") + usage_count: Optional[int] = Field(None, description="使用次数") + is_star: Optional[bool] = Field(None, description="是否收藏") + categories: Optional[List[str]] = Field(None, description="分类ID列表") + overrides: Optional[Dict[str, Any]] = Field(None, description="设置覆盖值") + requirements: Optional[List[str]] = Field(None, description="Python 依赖列表") + readme: Optional[str] = Field(None, description="README 内容") + releases: Optional[List[OperatorReleaseDto]] = Field(None, description="发布版本列表") diff --git a/runtime/datamate-python/app/module/operator/schema/release.py b/runtime/datamate-python/app/module/operator/schema/release.py new file mode 100644 index 00000000..f91297ee --- /dev/null +++ b/runtime/datamate-python/app/module/operator/schema/release.py @@ -0,0 +1,22 @@ +""" +Operator Release Schemas +算子发布版本 Schema 定义 +""" +from __future__ import annotations + +from typing import List, Optional +from datetime import datetime +from pydantic import BaseModel, Field + +from app.module.shared.schema import BaseResponseModel + + +class OperatorReleaseDto(BaseResponseModel): + """算子发布版本 DTO""" + id: str = Field(..., description="算子ID") + version: str = Field(..., description="版本号") + release_date: Optional[datetime] = Field(None, description="发布时间") + changelog: Optional[List[str]] = Field(None, description="更新日志列表") + + +__all__ = ["OperatorReleaseDto"] diff --git a/runtime/datamate-python/app/module/operator/service/__init__.py b/runtime/datamate-python/app/module/operator/service/__init__.py new file mode 100644 index 00000000..3e1c1d0c --- /dev/null +++ b/runtime/datamate-python/app/module/operator/service/__init__.py @@ -0,0 +1,11 @@ +""" +Operator Market Services +算子市场服务层 +""" +from .operator_service import OperatorService +from .category_service import CategoryService + +__all__ = [ + "OperatorService", + "CategoryService", +] diff --git a/runtime/datamate-python/app/module/operator/service/category_service.py b/runtime/datamate-python/app/module/operator/service/category_service.py new file mode 100644 index 00000000..47a654b6 --- /dev/null +++ b/runtime/datamate-python/app/module/operator/service/category_service.py @@ -0,0 +1,101 @@ +""" +Category Service +分类服务层 +""" +from typing import List + +from sqlalchemy.ext.asyncio import AsyncSession + +from app.module.operator.repository import ( + CategoryRepository, + CategoryRelationRepository, +) +from app.module.operator.schema import ( + CategoryDto, + CategoryTreeResponse, + CategoryTreePagedResponse, +) +from app.db.models.operator import Operator +from app.module.operator.repository.operator_repository import OperatorRepository + + +class CategoryService: + """分类服务""" + + def __init__( + self, + category_repo: CategoryRepository, + category_relation_repo: CategoryRelationRepository, + operator_repo: OperatorRepository, + ): + self.category_repo = category_repo + 
self.category_relation_repo = category_relation_repo + self.operator_repo = operator_repo + + async def get_all_categories( + self, + db: AsyncSession + ) -> CategoryTreePagedResponse: + """获取所有分类(树状结构)""" + # Get all categories + all_categories = await self.category_repo.find_all(db) + category_map = {c.id: c for c in all_categories} + + # Get all relations and count operators per category + all_relations = await self.category_relation_repo.find_all(db) + relation_map = {} + for rel in all_relations: + if rel.category_id not in relation_map: + relation_map[rel.category_id] = 0 + relation_map[rel.category_id] += 1 + + # Group by parent_id + grouped_by_parent = {} + for cat in all_categories: + if cat.parent_id != "0": + if cat.parent_id not in grouped_by_parent: + grouped_by_parent[cat.parent_id] = [] + grouped_by_parent[cat.parent_id].append(cat) + + # Build category trees + parent_ids = sorted( + grouped_by_parent.keys(), + key=lambda pid: category_map[pid].created_at or 0 + ) + + category_trees = [] + for parent_id in parent_ids: + group = grouped_by_parent[parent_id] + parent_category = category_map[parent_id] + + # Build DTOs for children + child_dtos = [] + total_count = 0 + for cat in sorted(group, key=lambda c: c.created_at or 0): + cat_dto = CategoryDto( + id=cat.id, + name=cat.name, + value=cat.value, + type=cat.type, + parent_id=cat.parent_id, + count=relation_map.get(cat.id, 0), + created_at=cat.created_at, + ) + child_dtos.append(cat_dto) + total_count += cat_dto.count + + tree = CategoryTreeResponse( + id=parent_id, + name=parent_category.name, + count=total_count, + categories=child_dtos, + ) + category_trees.append(tree) + + # Get star count + star_count = await self.operator_repo.count_by_star(True, db) + + return CategoryTreePagedResponse( + star_count=star_count, + categories=category_trees, + ) diff --git a/runtime/datamate-python/app/module/operator/service/operator_service.py b/runtime/datamate-python/app/module/operator/service/operator_service.py new file mode 100644 index 00000000..17127e58 --- /dev/null +++ b/runtime/datamate-python/app/module/operator/service/operator_service.py @@ -0,0 +1,599 @@ +""" +Operator Service +算子服务层 +""" +import json +import os +import uuid +import shutil +from pathlib import Path +from typing import List, Optional, Dict, Any, TYPE_CHECKING + +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, text, func + +from app.core.logging import get_logger +from app.module.operator.repository import ( + OperatorRepository, + CategoryRelationRepository, + OperatorReleaseRepository, +) +from app.module.operator.schema import ( + OperatorDto, + OperatorUpdateDto, + OperatorReleaseDto, +) +from app.module.operator.parsers import ParserHolder +from app.module.operator.constants import ( + OPERATOR_BASE_PATH, + UPLOAD_DIR, + EXTRACT_DIR, + YAML_PATH, + SERVICE_ID, +) +from app.module.operator.exceptions import ( + SettingsParseError, + OperatorInInstanceError, + CannotDeletePredefinedOperatorError, +) +from app.module.shared.file_service import FileService +from app.module.shared.file_models import ( + ChunkUploadRequestDto, + FileUploadResult, +) + +logger = get_logger(__name__) + + +class OperatorService: + """算子服务""" + + def __init__( + self, + operator_repo: OperatorRepository, + category_relation_repo: CategoryRelationRepository, + operator_release_repo: OperatorReleaseRepository, + parser_holder: ParserHolder, + file_service: FileService, + ): + self.operator_repo = operator_repo + self.category_relation_repo = 
category_relation_repo
+        self.operator_release_repo = operator_release_repo
+        self.parser_holder = parser_holder
+        self.file_service = file_service
+
+    async def get_operators(
+        self,
+        page: int,
+        size: int,
+        categories: List[List[str]],
+        keyword: Optional[str],
+        is_star: Optional[bool],
+        db: AsyncSession
+    ) -> List[OperatorDto]:
+        """Query operators with pagination."""
+        offset = page * size
+
+        # Build query with categories filter
+        conditions = []
+        params = {"limit": size, "offset": offset}
+
+        if is_star is not None:
+            conditions.append("ov.is_star = :is_star")
+            params["is_star"] = is_star
+
+        if keyword:
+            conditions.append(
+                "(ov.operator_name ILIKE :keyword OR ov.description ILIKE :keyword)"
+            )
+            params["keyword"] = f"%{keyword}%"
+
+        where_clause = ""
+        if conditions:
+            where_clause = "WHERE " + " AND ".join(conditions)
+
+        # Handle categories grouping
+        group_by = "GROUP BY ov.operator_id, ov.operator_name, ov.description, ov.version, " \
+                   "ov.inputs, ov.outputs, ov.runtime, ov.settings, ov.is_star, " \
+                   "ov.file_size, ov.usage_count, ov.created_at, ov.updated_at, ov.created_by, ov.updated_by"
+
+        having_clause = ""
+        if categories:
+            # Flatten all category IDs for the WHERE filter
+            all_category_ids = [cat_id for sublist in categories for cat_id in sublist]
+            if all_category_ids:
+                where_clause += " AND category_id = ANY(:category_ids)" if where_clause else "WHERE category_id = ANY(:category_ids)"
+                params["category_ids"] = all_category_ids
+
+                # Build the HAVING clause for category groups; bind each group as an
+                # array parameter instead of interpolating raw IDs into the SQL string
+                having_clauses = []
+                for i, cat_group in enumerate(categories):
+                    params[f"cat_group_{i}"] = cat_group
+                    having_clauses.append(
+                        f"SUM(CASE WHEN category_id = ANY(:cat_group_{i}) THEN 1 ELSE 0 END) > 0"
+                    )
+                having_clause = "HAVING " + " AND ".join(having_clauses)
+
+        query = f"""
+            SELECT
+                ov.operator_id AS id,
+                ov.operator_name AS name,
+                ov.description,
+                ov.version,
+                ov.inputs,
+                ov.outputs,
+                ov.runtime,
+                ov.settings,
+                ov.is_star,
+                ov.file_size,
+                ov.usage_count,
+                ov.created_at,
+                ov.updated_at
+            FROM v_operator ov
+            {where_clause}
+            {group_by}
+            {having_clause}
+            ORDER BY ov.created_at DESC
+            LIMIT :limit OFFSET :offset
+        """
+
+        result = await db.execute(text(query), params)
+        rows = result.fetchall()
+
+        # Convert to DTOs
+        operators = []
+        for row in rows:
+            operators.append(OperatorDto(
+                id=row.id,
+                name=row.name,
+                description=row.description,
+                version=row.version,
+                inputs=row.inputs,
+                outputs=row.outputs,
+                runtime=row.runtime,
+                settings=row.settings,
+                file_name=None,
+                file_size=row.file_size,
+                metrics=None,
+                usage_count=row.usage_count,
+                is_star=row.is_star,
+                created_at=row.created_at,
+                updated_at=row.updated_at,
+            ))
+
+        return operators
+
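+    # Illustrative note (not executed; "cat-a" etc. are hypothetical category
+    # IDs): a call such as
+    #   get_operators(page=0, size=10,
+    #                 categories=[["cat-a", "cat-b"], ["cat-c"]],
+    #                 keyword=None, is_star=None, db=db)
+    # binds cat_group_0 = ["cat-a", "cat-b"] and cat_group_1 = ["cat-c"], rendering
+    #   HAVING SUM(CASE WHEN category_id = ANY(:cat_group_0) THEN 1 ELSE 0 END) > 0
+    #      AND SUM(CASE WHEN category_id = ANY(:cat_group_1) THEN 1 ELSE 0 END) > 0
+    # so an operator matches only if it carries at least one category from
+    # every group.
+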
+    async def count_operators(
+        self,
+        categories: List[List[str]],
+        keyword: Optional[str],
+        is_star: Optional[bool],
+        db: AsyncSession
+    ) -> int:
+        """Count operators matching the given filters."""
+        conditions = []
+        params = {}
+
+        if is_star is not None:
+            conditions.append("is_star = :is_star")
+            params["is_star"] = is_star
+
+        if keyword:
+            conditions.append(
+                "(operator_name ILIKE :keyword OR description ILIKE :keyword)"
+            )
+            params["keyword"] = f"%{keyword}%"
+
+        where_clause = ""
+        if conditions:
+            where_clause = "WHERE " + " AND ".join(conditions)
+
+        # Handle categories grouping
+        group_by = "GROUP BY operator_id, operator_name, description, version, inputs, outputs, " \
+                   "runtime, settings, is_star, file_size, usage_count, created_at, updated_at, " \
+                   "created_by, updated_by"
+
+        having_clause = ""
+        if categories:
+            # Flatten all category IDs for the WHERE filter
+            all_category_ids = [cat_id for sublist in categories for cat_id in sublist]
+            if all_category_ids:
+                where_clause += " AND category_id = ANY(:category_ids)" if where_clause else "WHERE category_id = ANY(:category_ids)"
+                params["category_ids"] = all_category_ids
+
+                # Build the HAVING clause for category groups; bind each group as an
+                # array parameter instead of interpolating raw IDs into the SQL string
+                having_clauses = []
+                for i, cat_group in enumerate(categories):
+                    params[f"cat_group_{i}"] = cat_group
+                    having_clauses.append(
+                        f"SUM(CASE WHEN category_id = ANY(:cat_group_{i}) THEN 1 ELSE 0 END) > 0"
+                    )
+                having_clause = "HAVING " + " AND ".join(having_clauses)
+
+        query = f"""
+            SELECT COUNT(*) as count
+            FROM (
+                SELECT operator_id
+                FROM v_operator
+                {where_clause}
+                {group_by}
+                {having_clause}
+            ) AS t
+        """
+
+        result = await db.execute(text(query), params)
+        return result.scalar() or 0
+
+    async def get_operator_by_id(
+        self,
+        operator_id: str,
+        db: AsyncSession
+    ) -> OperatorDto:
+        """Get operator details by ID."""
+        result = await db.execute(
+            text("SELECT * FROM v_operator WHERE operator_id = :operator_id"),
+            {"operator_id": operator_id}
+        )
+        row = result.fetchone()
+
+        if not row:
+            raise ValueError(f"Operator {operator_id} not found")
+
+        # Build DTO
+        operator = OperatorDto(
+            id=row.operator_id,
+            name=row.operator_name,
+            description=row.description,
+            version=row.version,
+            inputs=row.inputs,
+            outputs=row.outputs,
+            runtime=row.runtime,
+            settings=row.settings,
+            file_name=row.file_name,
+            file_size=row.file_size,
+            metrics=row.metrics,
+            usage_count=row.usage_count,
+            is_star=row.is_star,
+            created_at=row.created_at,
+            updated_at=row.updated_at,
+        )
+
+        # Read requirements and readme if file exists
+        if row.file_name:
+            extract_path = self._get_extract_path(
+                self._get_stem(row.file_name)
+            )
+            operator.requirements = self._read_requirements(extract_path)
+            operator.readme = self._get_readme_content(extract_path)
+
+        operator.file_name = None  # Don't expose file_name to callers
+
+        # Load releases
+        releases = await self.operator_release_repo.find_all_by_operator_id(
+            operator_id, db
+        )
+        operator.releases = [
+            OperatorReleaseDto(
+                id=release.id,
+                version=release.version,
+                release_date=release.release_date,
+                changelog=release.changelog
+            )
+            for release in releases
+        ]
+
+        return operator
+
+    async def create_operator(
+        self,
+        req: OperatorDto,
+        db: AsyncSession
+    ) -> OperatorDto:
+        """Create an operator."""
+        from datetime import datetime, timezone
+
+        # Generate ID if not provided
+        if not req.id:
+            req.id = str(uuid.uuid4())
+
+        # Override settings
+        self._override_settings(req)
+
+        # Insert operator
+        await self.operator_repo.insert(req, db)
+
+        # Insert category relations
+        if req.categories:
+            await self.category_relation_repo.batch_insert(
+                req.id, req.categories, db
+            )
+
+        # Insert release
+        if req.releases:
+            release = req.releases[0]
+            release.id = req.id
+            release.version = req.version
+            release.release_date = datetime.now(timezone.utc)
+            await self.operator_release_repo.insert(release, db)
+
+        # Extract files
+        if req.file_name:
+            self.parser_holder.extract_to(
+                self._get_file_type(req.file_name),
+                self._get_upload_path(req.file_name),
+                self._get_extract_path(self._get_stem(req.file_name))
+            )
+
+        await db.flush()
+        return await self.get_operator_by_id(req.id, db)
+
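+    # Illustrative example (hypothetical values): creating
+    #   OperatorDto(name="text-cleaner", version="1.0.0",
+    #               file_name="text_cleaner.zip", categories=["cat-a"],
+    #               releases=[OperatorReleaseDto(id="", version="",
+    #                                            changelog=["initial release"])])
+    # writes one operator row, one category relation, and one release row keyed
+    # by the generated operator ID and version "1.0.0", then unpacks
+    # text_cleaner.zip into the extract directory.
+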
+    async def update_operator(
+        self,
+        operator_id: str,
+        req: OperatorUpdateDto,
+        db: AsyncSession
+    ) -> OperatorDto:
+        """Update an operator; only fields provided in the request are changed."""
+        from datetime import datetime, timezone
+
+        # Get existing operator
+        existing = await self.get_operator_by_id(operator_id, db)
+
+        # Merge update request into existing operator
+        # Only update fields that are provided (not None)
+        if req.name is not None:
+            existing.name = req.name
+        if req.description is not None:
+            existing.description = req.description
+        if req.version is not None:
+            existing.version = req.version
+        if req.inputs is not None:
+            existing.inputs = req.inputs
+        if req.outputs is not None:
+            existing.outputs = req.outputs
+        if req.runtime is not None:
+            existing.runtime = req.runtime
+        if req.settings is not None:
+            existing.settings = req.settings
+        if req.file_name is not None:
+            existing.file_name = req.file_name
+        if req.file_size is not None:
+            existing.file_size = req.file_size
+        if req.metrics is not None:
+            existing.metrics = req.metrics
+        if req.usage_count is not None:
+            existing.usage_count = req.usage_count
+        if req.is_star is not None:
+            existing.is_star = req.is_star
+        if req.categories is not None:
+            existing.categories = req.categories
+        if req.overrides is not None:
+            existing.overrides = req.overrides
+
+        # Override settings
+        self._override_settings(existing)
+
+        # Update operator
+        await self.operator_repo.update(existing, db)
+
+        # Update category relations
+        if req.categories is not None:
+            await self.category_relation_repo.batch_update(
+                operator_id, req.categories, db
+            )
+
+        # Update release: the same version updates the existing row, a new
+        # version inserts a new one
+        logger.debug(f"update_operator releases payload: {req.releases}")
+        if req.releases is not None and len(req.releases) > 0:
+            release = req.releases[0]
+            if release.version is None:
+                release.version = existing.version
+            release.id = operator_id
+            release.release_date = datetime.now(timezone.utc)
+            if existing.version == release.version:
+                await self.operator_release_repo.update(release, db)
+            else:
+                await self.operator_release_repo.insert(release, db)
+
+        # Extract files
+        if req.file_name is not None:
+            self.parser_holder.extract_to(
+                self._get_file_type(req.file_name),
+                self._get_upload_path(req.file_name),
+                self._get_extract_path(self._get_stem(req.file_name))
+            )
+
+        await db.flush()
+        return await self.get_operator_by_id(operator_id, db)
+
+    async def delete_operator(
+        self,
+        operator_id: str,
+        db: AsyncSession
+    ) -> None:
+        """Delete an operator."""
+        # Block deletion when the operator is referenced by a template
+        # OR by a task that has not stopped yet
+        in_template = await self.operator_repo.operator_in_template(operator_id, db)
+        in_unstop_task = await self.operator_repo.operator_in_unstop_task(operator_id, db)
+        if in_template or in_unstop_task:
+            raise OperatorInInstanceError()
+
+        # Check if operator is predefined
+        is_predefined = await self.category_relation_repo.operator_is_predefined(
+            operator_id, db
+        )
+        if is_predefined:
+            raise CannotDeletePredefinedOperatorError()
+
+        # Ensure the operator exists; get_operator_by_id blanks file_name in
+        # its response, so query the stored file name directly for cleanup
+        await self.get_operator_by_id(operator_id, db)
+        result = await db.execute(
+            text("SELECT file_name FROM v_operator WHERE operator_id = :operator_id"),
+            {"operator_id": operator_id}
+        )
+        file_name = result.scalar()
+
+        # Delete from database
+        await self.operator_repo.delete(operator_id, db)
+        await self.category_relation_repo.delete_by_operator_id(operator_id, db)
+        await self.operator_release_repo.delete(operator_id, db)
+
+        # Delete extracted files
+        if file_name:
+            extract_path = self._get_extract_path(self._get_stem(file_name))
+            shutil.rmtree(extract_path, ignore_errors=True)
+
+    async def upload_operator(
+        self,
+        file_name: str,
+        db: AsyncSession
+    ) -> OperatorDto:
+        """Upload an operator archive and parse its metadata."""
+        return self.parser_holder.parse_yaml_from_archive(
+            self._get_file_type(file_name),
+            self._get_upload_path(file_name),
+            YAML_PATH
+        )
+
+    async def pre_upload(self, db: AsyncSession) -> Dict[str, str]:
+        """Start a chunked upload session and return the request ID."""
+        # OPERATOR_BASE_PATH and UPLOAD_DIR are imported at module level
+        upload_path = os.path.join(OPERATOR_BASE_PATH, UPLOAD_DIR)
+        req_id = await self.file_service.pre_upload(
+            upload_path=upload_path,
+            service_id=SERVICE_ID,
+            check_info=None
+        )
+        return {"req_id": req_id}
+
+    async def chunk_upload(
+        self,
+        req_id: str,
+        file_no: int,
+        file_name: str,
+        total_chunk_num: int,
+        chunk_no: int,
+        check_sum_hex: Optional[str],
+        file_content: bytes,
+        db: AsyncSession
+    ) -> FileUploadResult:
+        """Upload one chunk of a file."""
+        upload_path = os.path.join(OPERATOR_BASE_PATH, UPLOAD_DIR)
+
+        chunk_request = ChunkUploadRequestDto(
+            req_id=req_id,
+            file_no=file_no,
+            file_name=file_name,
+            total_chunk_num=total_chunk_num,
+            chunk_no=chunk_no,
+            check_sum_hex=check_sum_hex,
+        )
+
+        return await self.file_service.chunk_upload(
+            chunk_request, upload_path, file_content, db
+        )
+
+    def download_example_operator(self, file_path: str) -> Path:
+        """Resolve the example operator file for download."""
+        path = Path(file_path)
+        if not path.exists():
+            raise FileNotFoundError(f"File not found: {file_path}")
+        return path
+
+    def _override_settings(self, operator: OperatorDto) -> None:
+        """Override the defaultVal entries in settings with the overrides values."""
+        if not operator.settings or not operator.overrides:
+            return
+
+        try:
+            settings = json.loads(operator.settings)
+            for key, value in operator.overrides.items():
+                if key not in settings:
+                    continue
+
+                setting = settings[key]
+                setting_type = setting.get("type")
+
+                match setting_type:
+                    case "slider" | "switch" | "select" | "input" | "radio":
+                        setting["defaultVal"] = value
+                    case "checkbox":
+                        setting["defaultVal"] = self._convert_to_list_string(value)
+                    case "range":
+                        self._update_properties(setting, value)
+
+                settings[key] = setting
+
+            operator.settings = json.dumps(settings)
+        except json.JSONDecodeError as e:
+            raise SettingsParseError(str(e)) from e
+
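+    # Illustrative example (hypothetical setting keys and values): with
+    #   settings  = '{"threshold": {"type": "slider", "defaultVal": 0.5},
+    #                 "langs": {"type": "checkbox", "defaultVal": ""}}'
+    #   overrides = {"threshold": 0.8, "langs": ["zh", "en"]}
+    # the merge above produces
+    #   '{"threshold": {"type": "slider", "defaultVal": 0.8},
+    #     "langs": {"type": "checkbox", "defaultVal": "zh,en"}}'
+    # i.e. scalar widgets take the override as-is, checkbox lists are joined
+    # into a comma-separated string, and range values update the properties.
+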
+    def _convert_to_list_string(self, value: Any) -> str:
+        """Convert a value to a comma-separated string."""
+        if value is None:
+            return ""
+        if isinstance(value, list):
+            return ",".join(str(v) for v in value)
+        return str(value)
+
+    def _update_properties(self, setting: Dict[str, Any], value: Any) -> None:
+        """Update the properties of a range-type setting."""
+        if not isinstance(value, list):
+            return
+
+        properties = setting.get("properties", [])
+        if not isinstance(properties, list) or len(properties) != len(value):
+            return
+
+        for i, prop in enumerate(properties):
+            if isinstance(prop, dict):
+                prop["defaultVal"] = value[i]
+
+        setting["properties"] = properties
+
+    def _read_requirements(self, extract_path: str) -> List[str]:
+        """Read requirements.txt from the extracted operator directory."""
+        requirements_path = Path(extract_path) / "requirements.txt"
+        if not requirements_path.exists():
+            return []
+
+        requirements = []
+        try:
+            with open(requirements_path, 'r', encoding='utf-8') as f:
+                for line in f:
+                    line = line.strip()
+                    if line and not line.startswith('#'):
+                        requirements.append(line)
+        except Exception as e:
+            logger.warning(f"Failed to read requirements: {e}")
+        return requirements
+
+    def _get_readme_content(self, extract_path: str) -> str:
+        """Read the README content from the extracted operator directory."""
+        dir_path = Path(extract_path)
+        if not dir_path.exists():
+            return ""
+
+        candidates = ["README.md", "readme.md", "Readme.md"]
+        for filename in candidates:
+            readme_path = dir_path / filename
+            if readme_path.exists():
+                try:
+                    return readme_path.read_text(encoding='utf-8')
+                except Exception as e:
+                    logger.warning(f"Failed to read README: {e}")
+        return ""
+
+    def _get_file_type(self, file_name: str) -> str:
+        """Get the file type (extension)."""
+        return file_name.rsplit('.', 1)[-1].lower() if '.' in file_name else ""
+
+    def _get_stem(self, file_name: str) -> str:
+        """Get the file name without its extension."""
+        return file_name.rsplit('.', 1)[0] if '.' in file_name else file_name
+
+    def _get_upload_path(self, file_name: str) -> str:
+        """Get the uploaded file path."""
+        return os.path.join(OPERATOR_BASE_PATH, UPLOAD_DIR, file_name)
+
+    def _get_extract_path(self, file_stem: str) -> str:
+        """Get the extraction path."""
+        return os.path.join(OPERATOR_BASE_PATH, EXTRACT_DIR, file_stem)
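+
+# Illustrative path layout (the actual values of OPERATOR_BASE_PATH, UPLOAD_DIR
+# and EXTRACT_DIR come from app.module.operator.constants; "my_op.zip" is a
+# hypothetical archive name):
+#   <OPERATOR_BASE_PATH>/<UPLOAD_DIR>/my_op.zip    <- uploaded archive
+#   <OPERATOR_BASE_PATH>/<EXTRACT_DIR>/my_op/      <- extracted contents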
diff --git a/runtime/datamate-python/app/module/shared/__init__.py b/runtime/datamate-python/app/module/shared/__init__.py
index e69de29b..fd0d7a1a 100644
--- a/runtime/datamate-python/app/module/shared/__init__.py
+++ b/runtime/datamate-python/app/module/shared/__init__.py
@@ -0,0 +1,21 @@
+"""
+Shared Module Init
+Shared module initialization.
+"""
+from .file_service import FileService
+from .file_models import (
+    ChunkUploadPreRequestDto,
+    ChunkUploadRequestDto,
+    FileUploadResult,
+)
+from .chunks_saver import ChunksSaver
+from .chunk_upload_repository import ChunkUploadRepository
+
+__all__ = [
+    "FileService",
+    "ChunkUploadPreRequestDto",
+    "ChunkUploadRequestDto",
+    "FileUploadResult",
+    "ChunksSaver",
+    "ChunkUploadRepository",
+]
diff --git a/runtime/datamate-python/app/module/shared/chunk_upload_repository.py b/runtime/datamate-python/app/module/shared/chunk_upload_repository.py
new file mode 100644
index 00000000..8a0c717d
--- /dev/null
+++ b/runtime/datamate-python/app/module/shared/chunk_upload_repository.py
@@ -0,0 +1,95 @@
+"""
+Chunk Upload Repository
+Data access layer for chunked uploads.
+"""
+from typing import Optional, List
+
+from sqlalchemy import select, update, delete
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.db.models.chunk_upload import ChunkUploadPreRequest
+from app.core.logging import get_logger
+
+logger = get_logger(__name__)
+
+
+class ChunkUploadRepository:
+    """Data access layer for chunked uploads."""
+
+    async def find_by_id(
+        self,
+        req_id: str,
+        db: AsyncSession
+    ) -> Optional[ChunkUploadPreRequest]:
+        """Find a pre-upload request by ID."""
+        result = await db.execute(
+            select(ChunkUploadPreRequest).where(ChunkUploadPreRequest.id == req_id)
+        )
+        return result.scalar_one_or_none()
+
+    async def find_by_service_id(
+        self,
+        service_id: str,
+        db: AsyncSession
+    ) -> List[ChunkUploadPreRequest]:
+        """Find pre-upload requests by service ID."""
+        result = await db.execute(
+            select(ChunkUploadPreRequest).where(
+                ChunkUploadPreRequest.service_id == service_id
+            )
+        )
+        return result.scalars().all()
+
+    async def find_all(self, db: AsyncSession) -> List[ChunkUploadPreRequest]:
+        """Find all pre-upload requests."""
+        result = await db.execute(select(ChunkUploadPreRequest))
+        return result.scalars().all()
+
+    async def insert(
+        self,
+        request: ChunkUploadPreRequest,
+        db: AsyncSession
+    ) -> None:
+        """Insert a pre-upload request."""
+        db.add(request)
+
+    async def update(
+        self,
+        request: ChunkUploadPreRequest,
+        db: AsyncSession
+    ) -> int:
+        """Update a pre-upload request; returns the affected row count."""
+        result = await db.execute(
+            update(ChunkUploadPreRequest)
+            .where(ChunkUploadPreRequest.id == request.id)
+            .values(
+                uploaded_file_num=request.uploaded_file_num,
+                timeout=request.timeout,
+            )
+        )
+        return result.rowcount
+
+    async def delete_by_id(
+        self,
+        req_id: str,
+        db: AsyncSession
+    ) -> int:
+        """Delete a pre-upload request by ID."""
+        result = await db.execute(
+            delete(ChunkUploadPreRequest).where(ChunkUploadPreRequest.id == req_id)
+        )
+        return result.rowcount
+
+    async def delete_by_service_id(
+        self,
+        service_id: str,
+        db: AsyncSession
+    ) -> int:
+        """Delete pre-upload requests by service ID."""
+        result = await db.execute(
+            delete(ChunkUploadPreRequest).where(
+                ChunkUploadPreRequest.service_id == service_id
+            )
+        )
+        return result.rowcount
diff --git a/runtime/datamate-python/app/module/shared/chunks_saver.py b/runtime/datamate-python/app/module/shared/chunks_saver.py
new file mode 100644
index 00000000..554b263b
--- /dev/null
+++ b/runtime/datamate-python/app/module/shared/chunks_saver.py
@@ -0,0 +1,146 @@
+"""
+Chunks Saver
+Saves uploaded file chunks and assembles them into the final file.
+"""
+import shutil
+from pathlib import Path
+from typing import Optional
+from datetime import datetime, timezone
+
+from app.core.logging import get_logger
+from app.module.shared.file_models import ChunkUploadRequestDto
+
+logger = get_logger(__name__)
+
+
+class ChunksSaver:
+    """Chunk saver."""
+
+    TEMP_DIR_NAME_FORMAT = "req_%s_chunks"
+
+    @staticmethod
+    def save(
+        file_upload_request: ChunkUploadRequestDto,
+        pre_upload_req_id: str,
+        upload_path: str,
+        file_content: bytes
+    ) -> Optional[Path]:
+        """
+        Save a chunk.
+
+        Args:
+            file_upload_request: the chunk upload request
+            pre_upload_req_id: the pre-upload request ID
+            upload_path: the base upload path
+            file_content: the chunk content (bytes)
+
+        Returns:
+            The final file path, or None if this is not the last chunk.
+        """
+        start_time = datetime.now(timezone.utc)
+
+        temp_dir = Path(upload_path) / (
+            ChunksSaver.TEMP_DIR_NAME_FORMAT % pre_upload_req_id
+        )
+        temp_dir.mkdir(parents=True, exist_ok=True)
+
+        temp_file = temp_dir / str(file_upload_request.file_no)
+
+        # Chunks are appended in arrival order, so callers must send them sequentially
+        ChunksSaver._append_to_target_file(temp_file, file_content)
+
+        if file_upload_request.total_chunk_num != file_upload_request.chunk_no:
+            elapsed = (datetime.now(timezone.utc) - start_time).total_seconds()
+            logger.debug(f"save chunk {file_upload_request.chunk_no} cost {elapsed}s")
+            return None
+
+        final_file = Path(upload_path) / file_upload_request.file_name
+
+        try:
+            temp_file.rename(final_file)
+        except OSError as e:
+            logger.error(
+                f"failed to mv file: {temp_file.name}, req id: {pre_upload_req_id}, error: {e}"
+            )
+            raise ValueError("failed to move file to target dir") from e
+
+        elapsed = (datetime.now(timezone.utc) - start_time).total_seconds()
+        logger.debug(f"save chunk {file_upload_request.chunk_no} cost {elapsed}s")
+
+        return final_file
+
+    @staticmethod
+    def save_file(
+        file_upload_request: ChunkUploadRequestDto,
+        upload_path: str,
+        file_content: bytes
+    ) -> Path:
+        """
+        Save a whole file (no chunking).
+
+        Args:
+            file_upload_request: the upload request
+            upload_path: the upload path
+            file_content: the file content (bytes)
+
+        Returns:
+            The saved file path.
+        """
+        target_file = Path(upload_path) / file_upload_request.file_name
+
+        logger.info(f"file path {target_file}, file size {len(file_content)}")
+
+        try:
+            target_file.parent.mkdir(parents=True, exist_ok=True)
+            target_file.write_bytes(file_content)
+        except OSError as e:
+            logger.error(f"failed to save file: {target_file}, error: {e}")
+            raise ValueError("failed to save file") from e
+
+        return target_file
+
+    @staticmethod
+    def delete_folder(folder_path: str) -> None:
+        """
+        Delete everything under the given path; the folder itself is kept.
+
+        Args:
+            folder_path: the folder path
+        """
+        folder = Path(folder_path)
+
+        if not folder.exists():
+            logger.info(f"folder {folder_path} does not exist")
+            return
+
+        try:
+            for item in folder.glob("*"):
+                if item.is_file():
+                    item.unlink()
+                elif item.is_dir():
+                    # shutil.rmtree removes the whole subtree in one call; the
+                    # previous manual recursion left non-empty directories
+                    # behind, so rmdir failed on nested trees
+                    shutil.rmtree(item)
+        except OSError as e:
+            logger.error(f"failed to delete folder: {folder_path}, error: {e}")
+            raise ValueError("failed to delete folder") from e
+
+    @staticmethod
+    def _append_to_target_file(target_file: Path, content: bytes) -> None:
+        """
+        Append content to the end of the target file
+
+        
Args: + target_file: 目标文件 + content: 要追加的内容 + """ + try: + with open(target_file, "ab") as f: + f.write(content) + except OSError as e: + logger.error(f"failed to append to file: {target_file}, error: {e}") + raise ValueError("failed to append content to file") from e diff --git a/runtime/datamate-python/app/module/shared/file_models.py b/runtime/datamate-python/app/module/shared/file_models.py new file mode 100644 index 00000000..c4e98775 --- /dev/null +++ b/runtime/datamate-python/app/module/shared/file_models.py @@ -0,0 +1,38 @@ +""" +File Models +文件相关模型定义 +""" +from pathlib import Path +from typing import Optional +from pydantic import BaseModel, Field +from datetime import datetime + + +class ChunkUploadPreRequestDto(BaseModel): + """分片上传预请求DTO""" + id: str = Field(..., description="请求ID") + total_file_num: int = Field(..., description="总文件数", ge=1) + uploaded_file_num: Optional[int] = Field(None, description="已上传文件数", ge=0) + upload_path: str = Field(..., description="文件路径") + timeout: Optional[datetime] = Field(None, description="上传请求超时时间") + service_id: Optional[str] = Field(None, description="上传请求所属服务ID") + check_info: Optional[str] = Field(None, description="业务信息") + + +class ChunkUploadRequestDto(BaseModel): + """分片上传请求DTO""" + req_id: str = Field(..., description="预上传返回的ID") + file_no: int = Field(1, description="文件编号", ge=1) + file_name: str = Field(..., description="文件名称") + total_chunk_num: int = Field(1, description="总分块数量", ge=1) + chunk_no: int = Field(1, description="当前分块编号", ge=1) + file_size: Optional[int] = Field(None, description="文件大小", ge=0) + check_sum_hex: Optional[str] = Field(None, description="文件校验和(十六进制字符串)") + + +class FileUploadResult(BaseModel): + """文件上传结果""" + is_all_files_uploaded: bool = Field(..., description="是否所有文件已上传") + check_info: Optional[str] = Field(None, description="业务上传信息") + saved_file_path: Optional[str] = Field(None, description="保存的文件路径") + file_name: str = Field(..., description="文件名称") diff --git a/runtime/datamate-python/app/module/shared/file_service.py b/runtime/datamate-python/app/module/shared/file_service.py new file mode 100644 index 00000000..1c859c85 --- /dev/null +++ b/runtime/datamate-python/app/module/shared/file_service.py @@ -0,0 +1,187 @@ +""" +File Service +文件服务,处理文件上传、分片上传等功能 +""" +import os +import uuid +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +from fastapi import UploadFile + +from app.core.logging import get_logger +from app.module.shared.file_models import ( + ChunkUploadPreRequestDto, + ChunkUploadRequestDto, + FileUploadResult, +) +from app.module.shared.chunks_saver import ChunksSaver +from app.module.shared.chunk_upload_repository import ChunkUploadRepository +from app.db.models.chunk_upload import ChunkUploadPreRequest + +logger = get_logger(__name__) + + +class FileService: + """文件服务""" + + DEFAULT_TIMEOUT_SECONDS = 120 + + def __init__( + self, + chunk_upload_repo: ChunkUploadRepository, + ): + self.chunk_upload_repo = chunk_upload_repo + + async def pre_upload( + self, + upload_path: str, + service_id: str, + check_info: Optional[str] = None + ) -> str: + """ + 预上传 + + Args: + upload_path: 上传路径 + service_id: 服务ID + check_info: 业务信息 + + Returns: + 预上传请求ID + """ + req_id = str(uuid.uuid4()) + timeout = datetime.now(timezone.utc).replace( + microsecond=0 + ) + timezone.timedelta(seconds=self.DEFAULT_TIMEOUT_SECONDS) + + pre_request = ChunkUploadPreRequest( + id=req_id, + total_file_num=1, + uploaded_file_num=0, + upload_path=upload_path, + 
timeout=timeout, + service_id=service_id, + check_info=check_info, + ) + + await self.chunk_upload_repo.insert(pre_request) + return req_id + + async def chunk_upload( + self, + upload_request: ChunkUploadRequestDto, + upload_path: str, + file_content: bytes, + db_session, + ) -> FileUploadResult: + """ + 分片上传 + + Args: + upload_request: 上传请求 + upload_path: 上传路径 + file_content: 文件内容 + db_session: 数据库会话 + + Returns: + 上传结果 + """ + upload_request.file_size = len(file_content) + + pre_request = await self.chunk_upload_repo.find_by_id( + upload_request.req_id, db_session + ) + + if pre_request is None: + logger.error(f"pre-upload request not found: {upload_request.req_id}") + raise ValueError("Pre-upload request not found") + + if pre_request.is_upload_complete(): + logger.error(f"upload already complete: {upload_request.req_id}") + raise ValueError("Upload already complete") + + if pre_request.is_request_timeout(): + logger.error(f"upload request timeout: {upload_request.req_id}") + raise ValueError("Upload request timeout") + + saved_file_path = None + + if upload_request.total_chunk_num > 1: + saved_file_path = await self._upload_chunk( + upload_request, pre_request, upload_path, file_content + ) + else: + saved_file_path = await self._upload_file( + upload_request, pre_request, upload_path, file_content + ) + + update_count = await self.chunk_upload_repo.update(pre_request, db_session) + + if update_count == 0: + logger.error(f"failed to update pre-request: {upload_request.req_id}") + raise ValueError("Failed to update pre-upload request") + + is_finish = pre_request.uploaded_file_num == pre_request.total_file_num + + if is_finish: + temp_dir = os.path.join( + upload_path, + ChunksSaver.TEMP_DIR_NAME_FORMAT % pre_request.id + ) + try: + ChunksSaver.delete_folder(temp_dir) + except Exception as e: + logger.warning(f"failed to delete temp dir: {temp_dir}, error: {e}") + + await self.chunk_upload_repo.delete_by_id(pre_request.id, db_session) + + return FileUploadResult( + is_all_files_uploaded=is_finish, + check_info=pre_request.check_info, + saved_file_path=str(saved_file_path) if saved_file_path else None, + file_name=upload_request.file_name, + ) + + async def _upload_file( + self, + upload_request: ChunkUploadRequestDto, + pre_request: ChunkUploadPreRequest, + upload_path: str, + file_content: bytes + ) -> Path: + """上传单文件""" + saved_file = ChunksSaver.save_file( + upload_request, upload_path, file_content + ) + + from datetime import timezone + pre_request.timeout = datetime.now(timezone.utc).replace( + microsecond=0 + ) + timezone.timedelta(seconds=self.DEFAULT_TIMEOUT_SECONDS) + pre_request.increment_uploaded_file_num() + + return saved_file + + async def _upload_chunk( + self, + upload_request: ChunkUploadRequestDto, + pre_request: ChunkUploadPreRequest, + upload_path: str, + file_content: bytes + ) -> Optional[Path]: + """上传分片""" + saved_file = ChunksSaver.save( + upload_request, pre_request.id, upload_path, file_content + ) + + if saved_file is not None: + pre_request.increment_uploaded_file_num() + return saved_file + + from datetime import timezone + pre_request.timeout = datetime.now(timezone.utc).replace( + microsecond=0 + ) + timezone.timedelta(seconds=self.DEFAULT_TIMEOUT_SECONDS) + return None diff --git a/scripts/db/data-operator-init.sql b/scripts/db/data-operator-init.sql index 0587b841..c85380a0 100644 --- a/scripts/db/data-operator-init.sql +++ b/scripts/db/data-operator-init.sql @@ -49,6 +49,10 @@ CREATE TABLE IF NOT EXISTS t_operator_release version VARCHAR(255), 
release_date TIMESTAMP, changelog JSON, + created_by VARCHAR(255), + updated_by VARCHAR(255), + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id, version) ); @@ -60,7 +64,10 @@ CREATE TABLE IF NOT EXISTS t_operator_category value VARCHAR(64) UNIQUE, type VARCHAR(64), parent_id VARCHAR(64), - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + created_by VARCHAR(255), + updated_by VARCHAR(255), + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); COMMENT ON TABLE t_operator_category IS '算子分类表'; @@ -76,6 +83,10 @@ CREATE TABLE IF NOT EXISTS t_operator_category_relation ( category_id VARCHAR(64), operator_id VARCHAR(64), + created_by VARCHAR(255), + updated_by VARCHAR(255), + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (category_id, operator_id) ); diff --git a/scripts/images/backend-python/Dockerfile b/scripts/images/backend-python/Dockerfile index 4d276dd0..826a0531 100644 --- a/scripts/images/backend-python/Dockerfile +++ b/scripts/images/backend-python/Dockerfile @@ -55,6 +55,7 @@ ENV NLTK_DATA=/usr/local/nltk_data # Copy the rest of the application COPY runtime/datamate-python /app +COPY runtime/ops/examples/test_operator/test_operator.tar /app/test_operator.tar COPY --from=datax-builder /DataX/target/datax/datax /opt/datax RUN cp /opt/datax/plugin/reader/mysqlreader/libs/mysql* /opt/datax/plugin/reader/starrocksreader/libs/ From 09b8564f39362871b75105f162b0cc3c503edd2e Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Wed, 4 Feb 2026 10:11:50 +0800 Subject: [PATCH 03/20] =?UTF-8?q?=E7=AE=97=E5=AD=90=E5=B8=82=E5=9C=BApytho?= =?UTF-8?q?n=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/module/operator/service/operator_service.py | 1 + .../app/module/shared/file_service.py | 13 ++++++------- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/runtime/datamate-python/app/module/operator/service/operator_service.py b/runtime/datamate-python/app/module/operator/service/operator_service.py index 17127e58..094c49f5 100644 --- a/runtime/datamate-python/app/module/operator/service/operator_service.py +++ b/runtime/datamate-python/app/module/operator/service/operator_service.py @@ -458,6 +458,7 @@ async def pre_upload(self, db: AsyncSession) -> Dict[str, str]: req_id = await self.file_service.pre_upload( upload_path=upload_path, service_id=SERVICE_ID, + db_session=db, check_info=None ) return {"req_id": req_id} diff --git a/runtime/datamate-python/app/module/shared/file_service.py b/runtime/datamate-python/app/module/shared/file_service.py index 1c859c85..e51db024 100644 --- a/runtime/datamate-python/app/module/shared/file_service.py +++ b/runtime/datamate-python/app/module/shared/file_service.py @@ -4,7 +4,7 @@ """ import os import uuid -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Optional @@ -38,6 +38,7 @@ async def pre_upload( self, upload_path: str, service_id: str, + db_session, check_info: Optional[str] = None ) -> str: """ @@ -54,7 +55,7 @@ async def pre_upload( req_id = str(uuid.uuid4()) timeout = datetime.now(timezone.utc).replace( microsecond=0 - ) + timezone.timedelta(seconds=self.DEFAULT_TIMEOUT_SECONDS) + ) + timedelta(seconds=self.DEFAULT_TIMEOUT_SECONDS) pre_request = ChunkUploadPreRequest( id=req_id, @@ -66,7 +67,7 @@ 
async def pre_upload( check_info=check_info, ) - await self.chunk_upload_repo.insert(pre_request) + await self.chunk_upload_repo.insert(pre_request, db_session) return req_id async def chunk_upload( @@ -156,10 +157,9 @@ async def _upload_file( upload_request, upload_path, file_content ) - from datetime import timezone pre_request.timeout = datetime.now(timezone.utc).replace( microsecond=0 - ) + timezone.timedelta(seconds=self.DEFAULT_TIMEOUT_SECONDS) + ) + timedelta(seconds=self.DEFAULT_TIMEOUT_SECONDS) pre_request.increment_uploaded_file_num() return saved_file @@ -180,8 +180,7 @@ async def _upload_chunk( pre_request.increment_uploaded_file_num() return saved_file - from datetime import timezone pre_request.timeout = datetime.now(timezone.utc).replace( microsecond=0 - ) + timezone.timedelta(seconds=self.DEFAULT_TIMEOUT_SECONDS) + ) + timedelta(seconds=self.DEFAULT_TIMEOUT_SECONDS) return None From 492cd00cc507f05a89b18deef2439e01578d56f1 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Wed, 4 Feb 2026 15:11:54 +0800 Subject: [PATCH 04/20] =?UTF-8?q?=E7=AE=97=E5=AD=90=E5=B8=82=E5=9C=BApytho?= =?UTF-8?q?n=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- deployment/docker/datamate/docker-compose.yml | 2 + .../Detail/components/ChangeLog.tsx | 2 +- .../pages/OperatorMarket/operator.const.tsx | 6 +- .../app/db/models/chunk_upload.py | 8 +-- .../operator/interface/operator_routes.py | 67 ++++++++++--------- .../operator/parsers/abstract_parser.py | 31 +++++++-- .../module/operator/parsers/parser_holder.py | 13 +++- .../app/module/operator/parsers/tar_parser.py | 10 ++- .../app/module/operator/parsers/zip_parser.py | 11 ++- .../category_relation_repository.py | 16 ++--- .../app/module/operator/schema/operator.py | 2 +- .../operator/service/operator_service.py | 65 ++++++++++++------ .../app/module/shared/file_service.py | 17 ++--- scripts/db/data-operator-init.sql | 3 - 14 files changed, 157 insertions(+), 96 deletions(-) diff --git a/deployment/docker/datamate/docker-compose.yml b/deployment/docker/datamate/docker-compose.yml index 9f3ec006..eb49a0ce 100644 --- a/deployment/docker/datamate/docker-compose.yml +++ b/deployment/docker/datamate/docker-compose.yml @@ -35,6 +35,8 @@ services: - flow_volume:/flow - log_volume:/var/log/datamate - graph_data_volume:/data/rag_storage + - operator-upload-volume:/operators/upload + - operator-runtime-volume:/operators/extract networks: [ datamate ] depends_on: - datamate-database diff --git a/frontend/src/pages/OperatorMarket/Detail/components/ChangeLog.tsx b/frontend/src/pages/OperatorMarket/Detail/components/ChangeLog.tsx index c9e77ad0..cf3f100c 100644 --- a/frontend/src/pages/OperatorMarket/Detail/components/ChangeLog.tsx +++ b/frontend/src/pages/OperatorMarket/Detail/components/ChangeLog.tsx @@ -20,7 +20,7 @@ export default function ChangeLog({ operator }) { )}