Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,22 @@

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.datamate.common.domain.enums.EdgeType;
import com.datamate.common.domain.enums.NodeType;
import com.datamate.common.domain.model.LineageEdge;
import com.datamate.common.domain.model.LineageNode;
import com.datamate.common.domain.service.LineageService;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.KnowledgeBaseErrorCode;
import com.datamate.common.interfaces.PagedResponse;
import com.datamate.common.interfaces.PagingQuery;
import com.datamate.common.setting.domain.entity.ModelConfig;
import com.datamate.common.setting.domain.repository.ModelConfigRepository;
import com.datamate.common.setting.infrastructure.client.ModelClient;
import com.datamate.datamanagement.domain.model.dataset.Dataset;
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository;
import com.datamate.rag.indexer.domain.model.FileStatus;
import com.datamate.rag.indexer.domain.model.KnowledgeBase;
import com.datamate.rag.indexer.domain.model.RagChunk;
Expand Down Expand Up @@ -36,7 +45,10 @@

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
* 知识库服务类
Expand All @@ -51,7 +63,10 @@ public class KnowledgeBaseService {
private final RagFileRepository ragFileRepository;
private final ApplicationEventPublisher eventPublisher;
private final ModelConfigRepository modelConfigRepository;
private final DatasetRepository datasetRepository;
private final DatasetFileRepository datasetFileRepository;
private final MilvusService milvusService;
private final LineageService lineageService;

/**
* 创建知识库
Expand Down Expand Up @@ -151,6 +166,7 @@ public void addFiles(AddFilesReq request) {
}).toList();
ragFileRepository.saveBatch(ragFiles, 100);
eventPublisher.publishEvent(new DataInsertedEvent(knowledgeBase, request));
updateLineageGraph(knowledgeBase, request.getFiles());
}

public PagedResponse<RagFile> listFiles(String knowledgeBaseId, RagFileReq request) {
Expand Down Expand Up @@ -222,4 +238,55 @@ public List<SearchResp.SearchResult> retrieve(RetrieveReq request) {
});
return searchResults;
}
}

/**
 * Records lineage between source datasets and a knowledge base when files are added to it.
 *
 * <p>Resolves the distinct datasets that own the given files and, for every dataset that can
 * be loaded, asks the lineage service to generate a "dataset -> knowledge base" graph entry.
 * Files and datasets that cannot be found are skipped.
 *
 * @param knowledgeBase the knowledge base the files are being added to
 * @param files the files selected from datasets
 */
private void updateLineageGraph(KnowledgeBase knowledgeBase, List<AddFilesReq.FileInfo> files) {
    // Reuse the lineage node stored under the knowledge base id, or describe a new one.
    final LineageNode targetNode = Optional
            .ofNullable(lineageService.getNodeById(knowledgeBase.getId()))
            .orElseGet(() -> {
                LineageNode created = new LineageNode();
                created.setId(knowledgeBase.getId());
                created.setNodeType(NodeType.KNOWLEDGE_BASE);
                created.setName(knowledgeBase.getName());
                created.setDescription(knowledgeBase.getDescription());
                return created;
            });

    // Distinct ids of the datasets that own the selected files.
    Set<String> sourceDatasetIds = files.stream()
            .map(selected -> datasetFileRepository.getById(selected.id()))
            .filter(Objects::nonNull)
            .map(DatasetFile::getDatasetId)
            .filter(Objects::nonNull)
            .collect(Collectors.toSet());

    // One lineage entry per source dataset that still exists.
    for (String sourceDatasetId : sourceDatasetIds) {
        Dataset source = datasetRepository.getById(sourceDatasetId);
        if (source != null) {
            // Node describing the source dataset.
            LineageNode sourceNode = new LineageNode();
            sourceNode.setId(source.getId());
            sourceNode.setNodeType(NodeType.DATASET);
            sourceNode.setName(source.getName());
            sourceNode.setDescription(source.getDescription());

            // Edge from the dataset to the knowledge base; the knowledge base id is the process id.
            LineageEdge link = new LineageEdge();
            link.setProcessId(knowledgeBase.getId());
            link.setName("");
            link.setEdgeType(EdgeType.KNOWLEDGE_BASE);
            link.setDescription("Add the files from dataset to the knowledge base.");
            link.setFromNodeId(source.getId());
            link.setToNodeId(knowledgeBase.getId());

            // Hand the node/edge/node triple to the lineage service.
            lineageService.generateGraph(sourceNode, link, targetNode);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ public enum EdgeType {
DATA_CLEANING,
DATA_LABELING,
DATA_SYNTHESIS,
DATA_RATIO
DATA_RATIO,
KNOWLEDGE_BASE,
}
Original file line number Diff line number Diff line change
Expand Up @@ -496,8 +496,10 @@ async def export_synthesis_task_to_dataset(
- 仅写入文件,不再创建数据集。
"""
exporter = SynthesisDatasetExporter(db)
generation = GenerationService(db)
try:
dataset = await exporter.export_task_to_dataset(task_id, dataset_id)
await generation.add_synthesis_to_graph(db, task_id, dataset_id)
except SynthesisExportError as e:
logger.error(
"Failed to export synthesis task %s to dataset %s: %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.models.base_entity import LineageNode, LineageEdge
from app.db.models.data_synthesis import (
DataSynthInstance,
DataSynthesisFileInstance,
DataSynthesisChunkInstance,
SynthesisData,
)
from app.db.models.dataset_management import DatasetFiles
from app.db.models.dataset_management import DatasetFiles, Dataset
from app.db.session import logger
from app.module.generation.schema.generation import Config, SyntheConfig
from app.module.generation.service.prompt import (
Expand All @@ -26,6 +27,8 @@
from app.module.shared.util.model_chat import extract_json_substring
from app.module.shared.llm import LLMFactory
from app.module.system.service.common_service import get_model_by_id
from app.module.shared.common.lineage import LineageService
from app.module.shared.schema import NodeType, EdgeType


def _filter_docs(split_docs, chunk_size):
Expand Down Expand Up @@ -657,3 +660,52 @@ async def _increment_processed_chunks(self, file_task_id: str, delta: int) -> No
file_task.processed_chunks = new_value
await self.db.commit()
await self.db.refresh(file_task)

async def add_synthesis_to_graph(self, db: AsyncSession, task_id: str, dest_dataset_id: str) -> None:
    """Record data-synthesis lineage: source dataset -> destination dataset via a DATA_SYNTHESIS edge.

    The source dataset is looked up through one of the task's source files
    (the query is limited to a single row). If the task, the source dataset
    or the destination dataset cannot be found, a warning is logged and
    nothing is written.

    Any exception raised while building the graph is caught and logged; it
    is not re-raised to the caller.

    Args:
        db: Session used for the source-dataset query and by the lineage service.
        task_id: Id of the synthesis task; also used as the edge's process id.
        dest_dataset_id: Id of the dataset the synthesis result was exported to.
    """
    try:
        # Load the task, the source dataset and the destination dataset.
        task = await self.db.get(DataSynthInstance, task_id)
        src_dataset_result = await db.execute(
            select(DatasetFiles.dataset_id)
            .join(DataSynthesisFileInstance, DatasetFiles.id == DataSynthesisFileInstance.source_file_id)
            .where(DataSynthesisFileInstance.synthesis_instance_id == task_id)
            .limit(1)
        )
        src_dataset_id = src_dataset_result.scalar_one_or_none()
        # Only look the dataset up when the query actually produced an id.
        src_dataset = None
        if src_dataset_id is not None:
            src_dataset = await self.db.get(Dataset, src_dataset_id)
        dst_dataset = await self.db.get(Dataset, dest_dataset_id)

        if not task or not dst_dataset:
            logger.warning("Missing task or destination dataset for lineage graph")
            return
        if src_dataset is None:
            # Without a source dataset there is no "from" node to attach the edge to.
            logger.warning(f"Missing source dataset for lineage graph of synthesis task {task_id}")
            return

        src_node = LineageNode(
            id=src_dataset.id,
            node_type=NodeType.DATASET.value,
            name=src_dataset.name,
            description=src_dataset.description
        )
        dest_node = LineageNode(
            id=dst_dataset.id,
            node_type=NodeType.DATASET.value,
            name=dst_dataset.name,
            description=dst_dataset.description
        )
        synthesis_edge = LineageEdge(
            process_id=task_id,
            name=task.name,
            edge_type=EdgeType.DATA_SYNTHESIS.value,
            description=task.description,
            from_node_id=src_node.id,
            to_node_id=dest_node.id
        )

        # Generate the lineage graph and persist it.
        lineage_service = LineageService(db=db)
        await lineage_service.generate_graph(src_node, synthesis_edge, dest_node)
        await self.db.commit()

        # Log from the node objects; the previous message referenced an undefined name.
        logger.info(f"Added synthesis lineage: {src_node.name} -> {dest_node.name}")
    except Exception as exc:
        logger.error(f"Failed to add synthesis lineage: {exc}")
58 changes: 56 additions & 2 deletions runtime/datamate-python/app/module/ratio/service/ratio_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.logging import get_logger
from app.db.models.base_entity import LineageNode, LineageEdge
from app.db.models.ratio_task import RatioInstance, RatioRelation
from app.db.models import Dataset, DatasetFiles
from app.db.session import AsyncSessionLocal
from app.module.dataset.schema.dataset_file import DatasetFileTag
from app.module.shared.schema import TaskStatus
from app.module.shared.common.lineage import LineageService
from app.module.shared.schema import TaskStatus, NodeType, EdgeType
from app.module.ratio.schema.ratio_task import FilterCondition

logger = get_logger(__name__)
Expand Down Expand Up @@ -126,7 +128,12 @@ async def execute_dataset_ratio_task(instance_id: str) -> None:
# Done
instance.status = TaskStatus.COMPLETED.name
logger.info(f"Dataset ratio execution completed: instance={instance_id}, files={added_count}, size={added_size}, {instance.status}")

await RatioTaskService._add_task_to_graph(
session=session,
src_relations=relations,
task=instance,
dst_dataset=target_ds,
)
except Exception as e:
logger.exception(f"Dataset ratio execution failed for {instance_id}: {e}")
try:
Expand Down Expand Up @@ -326,3 +333,50 @@ def get_all_tags(tags) -> list[dict]:
for tag_data in file_tag.get_tags():
all_tags.append(tag_data)
return all_tags

@staticmethod
async def _add_task_to_graph(
    session: AsyncSession,
    src_relations: List[RatioRelation],
    task: RatioInstance,
    dst_dataset: Dataset,
) -> None:
    """
    Add the datasets of a finished ratio task to the lineage graph.

    For every source relation one entry is generated:
    source dataset(DATASET) -> target dataset(DATASET) via a DATA_RATIO edge.

    Relations whose source dataset cannot be loaded are skipped with a
    warning. Any exception is caught and logged, and the session is rolled
    back; it is not re-raised to the caller.

    Args:
        session: Session used to load source datasets and by the lineage service.
        src_relations: Relations that name the source datasets of the task.
        task: The ratio task; its id, name and description are copied onto each edge.
        dst_dataset: The dataset produced by the task.
    """
    try:
        if not src_relations:
            logger.warning("Source ratio relations is empty when building lineage graph")
            return

        lineage_service = LineageService(db=session)
        dst_node = LineageNode(
            id=dst_dataset.id,
            node_type=NodeType.DATASET.value,
            name=dst_dataset.name,
            description=dst_dataset.description,
        )
        for rel in src_relations:
            ds: Optional[Dataset] = await session.get(Dataset, rel.source_dataset_id)
            if ds is None:
                # A missing source dataset must not abort lineage for the remaining relations.
                logger.warning(
                    "Source dataset %s not found when building lineage graph, skipped",
                    rel.source_dataset_id,
                )
                continue
            src_node = LineageNode(
                id=rel.source_dataset_id,
                node_type=NodeType.DATASET.value,
                name=ds.name,
                description=ds.description,
            )
            ratio_edge = LineageEdge(
                process_id=task.id,
                name=task.name,
                edge_type=EdgeType.DATA_RATIO.value,
                description=task.description,
                from_node_id=rel.source_dataset_id,
                to_node_id=dst_node.id,
            )
            await lineage_service.generate_graph(src_node, ratio_edge, dst_node)
            logger.info("Add dataset lineage graph: %s -> %s -> %s", src_node.id, ratio_edge.id, dst_node.id)
        await session.commit()
        logger.info("Add dataset lineage graph success")
    except Exception as exc:
        logger.error("Failed to add dataset lineage graph: %s", exc)
        # NOTE(review): this rolls back everything pending on the shared session,
        # not only lineage rows — confirm the caller has already committed its own changes.
        await session.rollback()
Loading