diff --git a/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/application/KnowledgeBaseService.java b/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/application/KnowledgeBaseService.java index b9f7f26c..95940111 100644 --- a/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/application/KnowledgeBaseService.java +++ b/backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/application/KnowledgeBaseService.java @@ -2,6 +2,11 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.datamate.common.domain.enums.EdgeType; +import com.datamate.common.domain.enums.NodeType; +import com.datamate.common.domain.model.LineageEdge; +import com.datamate.common.domain.model.LineageNode; +import com.datamate.common.domain.service.LineageService; import com.datamate.common.infrastructure.exception.BusinessException; import com.datamate.common.infrastructure.exception.KnowledgeBaseErrorCode; import com.datamate.common.interfaces.PagedResponse; @@ -9,6 +14,10 @@ import com.datamate.common.setting.domain.entity.ModelConfig; import com.datamate.common.setting.domain.repository.ModelConfigRepository; import com.datamate.common.setting.infrastructure.client.ModelClient; +import com.datamate.datamanagement.domain.model.dataset.Dataset; +import com.datamate.datamanagement.domain.model.dataset.DatasetFile; +import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository; +import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository; import com.datamate.rag.indexer.domain.model.FileStatus; import com.datamate.rag.indexer.domain.model.KnowledgeBase; import com.datamate.rag.indexer.domain.model.RagChunk; @@ -36,7 +45,10 @@ import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Optional; +import java.util.Set; 
+import java.util.stream.Collectors; /** * 知识库服务类 @@ -51,7 +63,10 @@ public class KnowledgeBaseService { private final RagFileRepository ragFileRepository; private final ApplicationEventPublisher eventPublisher; private final ModelConfigRepository modelConfigRepository; + private final DatasetRepository datasetRepository; + private final DatasetFileRepository datasetFileRepository; private final MilvusService milvusService; + private final LineageService lineageService; /** * 创建知识库 @@ -151,6 +166,7 @@ public void addFiles(AddFilesReq request) { }).toList(); ragFileRepository.saveBatch(ragFiles, 100); eventPublisher.publishEvent(new DataInsertedEvent(knowledgeBase, request)); + updateLineageGraph(knowledgeBase, request.getFiles()); } public PagedResponse listFiles(String knowledgeBaseId, RagFileReq request) { @@ -222,4 +238,55 @@ public List retrieve(RetrieveReq request) { }); return searchResults; } -} \ No newline at end of file + + /** + * 向知识库添加文件的时候,将相关数据集加入血缘图 + * + * @param knowledgeBase 知识库 + * @param files 数据集中选择的文件 + */ + private void updateLineageGraph(KnowledgeBase knowledgeBase, List files) { + LineageNode kbNode = lineageService.getNodeById(knowledgeBase.getId()); + if (kbNode == null) { + kbNode = new LineageNode(); + kbNode.setId(knowledgeBase.getId()); + kbNode.setNodeType(NodeType.KNOWLEDGE_BASE); + kbNode.setName(knowledgeBase.getName()); + kbNode.setDescription(knowledgeBase.getDescription()); + } + + // 获取所有唯一的数据集ID + Set datasetIds = files.stream() + .map(fileInfo -> { + DatasetFile datasetFile = datasetFileRepository.getById(fileInfo.id()); + return datasetFile != null ? 
datasetFile.getDatasetId() : null; + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + // 为每个数据集创建血缘关系 + for (String datasetId : datasetIds) { + Dataset dataset = datasetRepository.getById(datasetId); + if (dataset == null) { continue; } + + // 创建源数据集节点 + LineageNode datasetNode = new LineageNode(); + datasetNode.setId(dataset.getId()); + datasetNode.setNodeType(NodeType.DATASET); + datasetNode.setName(dataset.getName()); + datasetNode.setDescription(dataset.getDescription()); + + // 创建血缘边 + LineageEdge edge = new LineageEdge(); + edge.setProcessId(knowledgeBase.getId()); + edge.setName(knowledgeBase.getName()); + edge.setEdgeType(EdgeType.KNOWLEDGE_BASE); + edge.setDescription("Add the files from dataset to the knowledge base."); + edge.setFromNodeId(dataset.getId()); + edge.setToNodeId(knowledgeBase.getId()); + + // 生成血缘图 + lineageService.generateGraph(datasetNode, edge, kbNode); + } + } +} diff --git a/backend/shared/domain-common/src/main/java/com/datamate/common/domain/enums/EdgeType.java b/backend/shared/domain-common/src/main/java/com/datamate/common/domain/enums/EdgeType.java index 928415b7..7c298a27 100644 --- a/backend/shared/domain-common/src/main/java/com/datamate/common/domain/enums/EdgeType.java +++ b/backend/shared/domain-common/src/main/java/com/datamate/common/domain/enums/EdgeType.java @@ -10,5 +10,6 @@ public enum EdgeType { DATA_CLEANING, DATA_LABELING, DATA_SYNTHESIS, - DATA_RATIO + DATA_RATIO, + KNOWLEDGE_BASE, } diff --git a/runtime/datamate-python/app/module/generation/interface/generation_api.py b/runtime/datamate-python/app/module/generation/interface/generation_api.py index bee0fba9..dfbd6b8d 100644 --- a/runtime/datamate-python/app/module/generation/interface/generation_api.py +++ b/runtime/datamate-python/app/module/generation/interface/generation_api.py @@ -496,8 +496,10 @@ async def export_synthesis_task_to_dataset( - 仅写入文件,不再创建数据集。 """ exporter = SynthesisDatasetExporter(db) + generation = GenerationService(db) try: dataset = await 
exporter.export_task_to_dataset(task_id, dataset_id) + await generation.add_synthesis_to_graph(db, task_id, dataset_id) except SynthesisExportError as e: logger.error( "Failed to export synthesis task %s to dataset %s: %s", diff --git a/runtime/datamate-python/app/module/generation/service/generation_service.py b/runtime/datamate-python/app/module/generation/service/generation_service.py index f5d0ad98..3dec30df 100644 --- a/runtime/datamate-python/app/module/generation/service/generation_service.py +++ b/runtime/datamate-python/app/module/generation/service/generation_service.py @@ -8,13 +8,14 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession +from app.db.models.base_entity import LineageNode, LineageEdge from app.db.models.data_synthesis import ( DataSynthInstance, DataSynthesisFileInstance, DataSynthesisChunkInstance, SynthesisData, ) -from app.db.models.dataset_management import DatasetFiles +from app.db.models.dataset_management import DatasetFiles, Dataset from app.db.session import logger from app.module.generation.schema.generation import Config, SyntheConfig from app.module.generation.service.prompt import ( @@ -26,6 +27,8 @@ from app.module.shared.util.model_chat import extract_json_substring from app.module.shared.llm import LLMFactory from app.module.system.service.common_service import get_model_by_id +from app.module.shared.common.lineage import LineageService +from app.module.shared.schema import NodeType, EdgeType def _filter_docs(split_docs, chunk_size): @@ -657,3 +660,52 @@ async def _increment_processed_chunks(self, file_task_id: str, delta: int) -> No file_task.processed_chunks = new_value await self.db.commit() await self.db.refresh(file_task) + + async def add_synthesis_to_graph(self, db: AsyncSession, task_id: str, dest_dataset_id: str) -> None: + """记录数据合成血缘关系:源数据集 -> 合成数据集 via DATA_SYNTHESIS""" + try: + # 获取任务和目标数据集信息 + task = await self.db.get(DataSynthInstance, task_id) + src_dataset_result = await 
db.execute( + select(DatasetFiles.dataset_id) + .join(DataSynthesisFileInstance, DatasetFiles.id == DataSynthesisFileInstance.source_file_id) + .where(DataSynthesisFileInstance.synthesis_instance_id == task_id) + .limit(1) + ) + src_dataset_id = src_dataset_result.scalar_one_or_none() + src_dataset = await self.db.get(Dataset, src_dataset_id) if src_dataset_id else None + dst_dataset = await self.db.get(Dataset, dest_dataset_id) + + if not task or not src_dataset or not dst_dataset: + logger.warning("Missing task, source dataset or destination dataset for lineage graph") + return + + src_node = LineageNode( + id=src_dataset.id, + node_type=NodeType.DATASET.value, + name=src_dataset.name, + description=src_dataset.description + ) + dest_node = LineageNode( + id=dst_dataset.id, + node_type=NodeType.DATASET.value, + name=dst_dataset.name, + description=dst_dataset.description + ) + synthesis_edge = LineageEdge( + process_id=task_id, + name=task.name, + edge_type=EdgeType.DATA_SYNTHESIS.value, + description=task.description, + from_node_id=src_node.id, + to_node_id=dst_dataset.id + ) + + # 生成血缘图 + lineage_service = LineageService(db=db) + await lineage_service.generate_graph(src_node, synthesis_edge, dest_node) + await self.db.commit() + + logger.info(f"Added synthesis lineage: {src_node.name} -> {dst_dataset.name}") + except Exception as exc: + logger.error(f"Failed to add synthesis lineage: {exc}") diff --git a/runtime/datamate-python/app/module/ratio/service/ratio_task.py b/runtime/datamate-python/app/module/ratio/service/ratio_task.py index 10247dab..9a865f7e 100644 --- a/runtime/datamate-python/app/module/ratio/service/ratio_task.py +++ b/runtime/datamate-python/app/module/ratio/service/ratio_task.py @@ -10,11 +10,13 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.core.logging import get_logger +from app.db.models.base_entity import LineageNode, LineageEdge from app.db.models.ratio_task import RatioInstance, RatioRelation from app.db.models import Dataset, DatasetFiles from app.db.session import AsyncSessionLocal 
from app.module.dataset.schema.dataset_file import DatasetFileTag -from app.module.shared.schema import TaskStatus +from app.module.shared.common.lineage import LineageService +from app.module.shared.schema import TaskStatus, NodeType, EdgeType from app.module.ratio.schema.ratio_task import FilterCondition logger = get_logger(__name__) @@ -126,7 +128,12 @@ async def execute_dataset_ratio_task(instance_id: str) -> None: # Done instance.status = TaskStatus.COMPLETED.name logger.info(f"Dataset ratio execution completed: instance={instance_id}, files={added_count}, size={added_size}, {instance.status}") - + await RatioTaskService._add_task_to_graph( + session=session, + src_relations=relations, + task=instance, + dst_dataset=target_ds, + ) except Exception as e: logger.exception(f"Dataset ratio execution failed for {instance_id}: {e}") try: @@ -326,3 +333,50 @@ def get_all_tags(tags) -> list[dict]: for tag_data in file_tag.get_tags(): all_tags.append(tag_data) return all_tags + + @staticmethod + async def _add_task_to_graph( + session: AsyncSession, + src_relations: List[RatioRelation], + task: RatioInstance, + dst_dataset: Dataset, + ) -> None: + """ + 在比例抽取任务完成后,将数据集加入血缘图。 + ratio_task(DATASOURCE) -> dataset(DATASET) via DATA_RATIO edge + """ + try: + if not src_relations: + logger.warning("Source ratio relations is empty when building lineage graph") + return + + lineage_service = LineageService(db=session) + dst_node = LineageNode( + id=dst_dataset.id, + node_type=NodeType.DATASET.value, + name=dst_dataset.name, + description=dst_dataset.description, + ) + for rel in src_relations: + ds: Optional[Dataset] = await session.get(Dataset, rel.source_dataset_id) + src_node = LineageNode( + id=rel.source_dataset_id, + node_type=NodeType.DATASET.value, + name=ds.name if ds else rel.source_dataset_id, + description=ds.description if ds else None, + ) + ratio_edge = LineageEdge( + process_id=task.id, + name=task.name, + edge_type=EdgeType.DATA_RATIO.value, + description=task.description, + 
from_node_id=rel.source_dataset_id, + to_node_id=dst_node.id, + ) + await lineage_service.generate_graph(src_node, ratio_edge, dst_node) + logger.info("Add dataset lineage graph: %s -> %s -> %s", src_node.id, ratio_edge.process_id, dst_node.id) + await session.commit() + logger.info("Add dataset lineage graph success") + except Exception as exc: + logger.error("Failed to add dataset lineage graph: %s", exc) + await session.rollback()