13 changes: 10 additions & 3 deletions cognition_objects/integration.py
@@ -183,13 +183,20 @@ def get_all_etl_tasks(
def get_integration_progress(
integration_id: str,
) -> float:
count_all_records = integration_records_bo.count(integration_id)
integration = get_by_id(integration_id)
count_all_records = integration_records_bo.count(integration)
all_tasks = get_all_etl_tasks(integration_id)
finished_tasks = [task for task in all_tasks if task.state in FINISHED_STATES]

if count_all_records == 0:
if (
count_all_records == 0
or integration.state == enums.CognitionMarkdownFileState.FAILED.value
):
return 0.0
return round((len(finished_tasks) / count_all_records) * 100.0, 2)
integration_progress = round((len(finished_tasks) / count_all_records) * 100.0, 2)
if integration.state not in FINISHED_STATES:
integration_progress = max(integration_progress - 1, 0)
return integration_progress


def count_org_integrations(org_id: str) -> Dict[str, int]:
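The reworked progress calculation reports 0.0 for failed integrations and holds the value just below 100% until the integration's own state is terminal. A self-contained sketch of those semantics (FINISHED_STATES is abbreviated here; the real constant lists the terminal task states):

```python
# Illustrative sketch, not the actual module: mirrors the branch logic of
# get_integration_progress with plain values instead of business objects.
FINISHED_STATES = {"FINISHED"}  # assumption: abbreviated terminal-state set

def integration_progress(task_states, record_count, integration_state):
    if record_count == 0 or integration_state == "FAILED":
        return 0.0
    finished = sum(1 for state in task_states if state in FINISHED_STATES)
    progress = round((finished / record_count) * 100.0, 2)
    if integration_state not in FINISHED_STATES:
        # hold just below 100% until the integration itself reaches a terminal state
        progress = max(progress - 1, 0)
    return progress

assert integration_progress(["FINISHED"] * 4, 4, "FINISHED") == 100.0
assert integration_progress(["FINISHED"] * 4, 4, "RUNNING") == 99.0
assert integration_progress([], 10, "FAILED") == 0.0
```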
13 changes: 1 addition & 12 deletions enums.py
@@ -400,17 +400,6 @@ class Pages(Enum):
SETTINGS = "settings"


class DOCS(Enum):
UPLOADING_DATA = "https://docs.kern.ai/refinery/project-creation-and-data-upload"
KNOWLEDGE_BASE = "https://docs.kern.ai/refinery/heuristics#labeling-functions"
WORKFLOW = "https://docs.kern.ai/refinery/manual-labeling#labeling-workflow"
CREATING_PROJECTS = "https://docs.kern.ai/refinery/project-creation-and-data-upload#project-creation-workflow"
WEAK_SUPERVISION = "https://docs.kern.ai/refinery/weak-supervision"
CREATE_EMBEDDINGS = "https://docs.kern.ai/refinery/embedding-integration"
INFORMATION_SOURCES = "https://docs.kern.ai/refinery/heuristics#labeling-functions"
DATA_BROWSER = "https://docs.kern.ai/refinery/data-management"


class SliceTypes(Enum):
STATIC_DEFAULT = "STATIC_DEFAULT"
STATIC_OUTLIER = "STATIC_OUTLIER"
@@ -1091,7 +1080,7 @@ def get_supported_file_extensions(self) -> List[str]:
".webp",
".avif",
]
return ["txt"]
return [".txt"]

@staticmethod
def from_extension(value: str):
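The leading dot in ".txt" matters because extension checks typically compare against `Path.suffix`, which includes the dot; a minimal illustration (the helper here is hypothetical):

```python
from pathlib import Path

SUPPORTED_EXTENSIONS = [".txt"]  # the old "txt" would never match Path.suffix

def is_supported(file_name: str) -> bool:
    # Path("notes.txt").suffix == ".txt" -- dot included
    return Path(file_name).suffix.lower() in SUPPORTED_EXTENSIONS

assert is_supported("notes.txt")
assert not is_supported("notes.pdf")
```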
137 changes: 35 additions & 102 deletions etl_utils.py
@@ -11,8 +11,6 @@
FileReference,
CognitionIntegration,
IntegrationSharepoint,
CognitionMarkdownDataset,
CognitionMarkdownFile,
)

ETL_DIR = Path(os.getenv("ETL_DIR", "/app/data/etl"))
@@ -24,14 +22,20 @@ def get_full_config_and_tokenizer_from_config_id(
etl_config_id: Optional[str] = None, # or in file_reference.meta_data
content_type: Optional[str] = None, # or in file_reference.content_type
chunk_size: Optional[int] = 1000,
# only set for markdown datasets
markdown_file_id: Optional[str] = None, # or in file_reference.meta_data
# only set for chat messages
project_id: Optional[str] = None,
conversation_id: Optional[str] = None,
project_id: Optional[str] = None, # or in file_reference.meta_data
conversation_id: Optional[str] = None, # or in file_reference.meta_data
) -> Tuple[Dict[str, Any], str]:
for_dataset = False
for_project = False
if project_id and conversation_id:
# project related load
for_project = True
elif markdown_file_id:
# dataset related load
for_dataset = True

etl_preset_item = etl_config_presets_db_co.get(
etl_config_id or file_reference.meta_data.get("etl_config_id")
@@ -46,6 +50,11 @@
"llmIdentifier": llm_indicator_extract,
"overwriteVisionPrompt": extraction_config.get("overwriteVisionPrompt"),
}
elif extraction_config.get("azureDiApiBase"):
llm_config = {
"azureDiApiBase": extraction_config["azureDiApiBase"],
"azureDiEnvVarId": extraction_config["azureDiEnvVarId"],
}
full_config = [
{
"task_type": enums.CognitionMarkdownFileState.EXTRACTING.value,
@@ -56,7 +65,7 @@
"minio_path": file_reference.minio_path,
"fallback": None, # later filled by config of project
},
**llm_config,
"llm_config": llm_config,
},
]

@@ -68,8 +77,9 @@
"llmIdentifier": transformation_config.get("llmIdentifier"),
}

# splitting strategy "CHUNK" needs llm_config to execute `split_large_sections_via_llm`
splitting_config = {
"llm_config": transformation_llm_config, # splitting strategy "CHUNK" needs llm_config to execute `split_large_sections_via_llm`
"llm_config": transformation_llm_config,
"task_type": enums.CognitionMarkdownFileState.SPLITTING.value,
"task_config": {
"use_cache": True,
@@ -79,8 +89,6 @@
}

if transformation_type == "COMMON_ETL":
# add default splitting for common etl

full_config.append(splitting_config)
transformers = [
{ # NOTE: __call_gpt_with_key only reads user_prompt
@@ -119,6 +127,7 @@
},
}
)

if for_project:
full_config.append(
{
@@ -139,109 +148,24 @@
},
},
)
else:
elif for_dataset:
full_config.append(
{
"task_type": enums.CognitionMarkdownFileState.LOADING.value,
"task_config": {
"markdown_file": {
"enabled": True,
"id": file_reference.meta_data["markdown_file_id"],
"id": (
markdown_file_id
or file_reference.meta_data["markdown_file_id"]
),
}
},
},
)
return full_config, etl_preset_item.etl_config.get("tokenizer")
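
The dispatch at the top of the function now decides which LOADING step gets appended; a standalone sketch of that branch (names are illustrative):

```python
from typing import Optional

def classify_load(
    markdown_file_id: Optional[str] = None,
    project_id: Optional[str] = None,
    conversation_id: Optional[str] = None,
) -> str:
    # Mirrors the for_project / for_dataset flags set at the top of
    # get_full_config_and_tokenizer_from_config_id.
    if project_id and conversation_id:
        return "project"      # appends the chat-message LOADING step
    if markdown_file_id:
        return "dataset"      # appends the markdown_file LOADING step
    return "integration"      # no LOADING step is appended here

assert classify_load(project_id="p", conversation_id="c") == "project"
assert classify_load(markdown_file_id="m") == "dataset"
assert classify_load() == "integration"
```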


def get_full_config_for_markdown_file(
file_reference: FileReference,
markdown_dataset: CognitionMarkdownDataset,
markdown_file: CognitionMarkdownFile,
chunk_size: Optional[int] = 1000,
) -> List[Dict[str, Any]]:
extraction_llm_config, transformation_llm_config = __get_llm_config_from_dataset(
markdown_dataset
)
extractor = markdown_file.meta_data.get("extractor")
if extractor is None:
print(
f"WARNING: {__name__} - no extractor found in markdown_file meta_data for {file_reference.original_file_name}, will infer default"
)

full_config = [
{
"llm_config": extraction_llm_config,
"task_type": enums.CognitionMarkdownFileState.EXTRACTING.value,
"task_config": {
"use_cache": True,
"extractor": extractor,
"minio_path": file_reference.minio_path,
"fallback": None, # later filled by config of project
},
},
{
"llm_config": extraction_llm_config,
"task_type": enums.CognitionMarkdownFileState.SPLITTING.value,
"task_config": {
"use_cache": True,
"strategy": enums.ETLSplitStrategy.CHUNK.value,
"chunk_size": chunk_size,
},
},
{
"llm_config": transformation_llm_config,
"task_type": enums.CognitionMarkdownFileState.TRANSFORMING.value,
"task_config": {
"use_cache": True,
"transformers": [
{ # NOTE: __call_gpt_with_key only reads user_prompt
"enabled": False,
"name": enums.ETLTransformer.CLEANSE.value,
"system_prompt": None,
"user_prompt": None,
},
{
"enabled": True,
"name": enums.ETLTransformer.TEXT_TO_TABLE.value,
"system_prompt": None,
"user_prompt": None,
},
{
"enabled": False,
"name": enums.ETLTransformer.SUMMARIZE.value,
"system_prompt": None,
"user_prompt": None,
},
],
},
},
{
"task_type": enums.CognitionMarkdownFileState.LOADING.value,
"task_config": {
"markdown_file": {
"enabled": True,
"id": str(markdown_file.id),
},
},
},
]
return full_config


def __get_llm_config_from_dataset(
markdown_dataset: CognitionMarkdownDataset,
) -> Tuple[Dict[str, Any], str]:
extraction_llm_config = markdown_dataset.llm_config.get("extraction", {})
transformation_llm_config = markdown_dataset.llm_config.get("transformation", {})
if not extraction_llm_config or not transformation_llm_config:
raise ValueError(
f"Dataset with id {markdown_dataset.id} has incomplete llm_config"
)

return extraction_llm_config, transformation_llm_config


def get_full_config_for_integration(
integration: CognitionIntegration,
record: IntegrationSharepoint,
@@ -385,6 +309,9 @@ def rm_tree(path: Path):
rm_tree(etl_cache_dir)


# TODO: delete_etl_tasks for related file_reference_id


def get_download_key(org_id: str, download_id: str) -> Path:
return Path(org_id) / download_id / "download"

@@ -490,10 +417,16 @@ def get_transformation_key(
return transformation_key


def get_hashed_string(*args, delimiter: str = "_") -> str:
hash_string = delimiter.join(map(str, args))
hasher = hashlib.new("sha256")
hasher.update(hash_string.encode())
def get_hashed_string(*args, delimiter: str = "_", from_bytes: bool = False) -> str:
if not from_bytes:
_hash = delimiter.join(map(str, args)).encode()
else:
try:
_hash = next(map(bytes, args))
except StopIteration:
raise ValueError("ERROR: A 'bytes' argument is required to hash")

hasher = hashlib.sha256(_hash)
return hasher.hexdigest()


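The extended hashing helper now accepts raw bytes as well as joinable string arguments; a self-contained copy with illustrative calls for both modes:

```python
import hashlib

def get_hashed_string(*args, delimiter: str = "_", from_bytes: bool = False) -> str:
    if not from_bytes:
        # string mode: stringify and join all args before hashing
        message = delimiter.join(map(str, args)).encode()
    else:
        # bytes mode: hash the first bytes-like argument directly
        try:
            message = next(map(bytes, args))
        except StopIteration:
            raise ValueError("ERROR: A 'bytes' argument is required to hash")
    return hashlib.sha256(message).hexdigest()

assert get_hashed_string("org", "file") == hashlib.sha256(b"org_file").hexdigest()
assert get_hashed_string(b"payload", from_bytes=True) == hashlib.sha256(b"payload").hexdigest()
```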
66 changes: 66 additions & 0 deletions global_objects/etl_task.py
@@ -1,5 +1,7 @@
from typing import Any, List, Optional, Dict, Union
from sqlalchemy.sql.expression import cast
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy.dialects.postgresql import UUID

import datetime
import mimetypes
@@ -117,6 +119,70 @@ def get_supported_file_extensions() -> Dict[str, List[str]]:
return file_extensions


def get_or_create(
org_id: str,
user_id: str,
original_file_name: str,
file_size_bytes: int,
tokenizer: Optional[str] = None,
full_config: Optional[Dict[str, Any]] = None,
file_path: Optional[str] = None,
meta_data: Optional[Dict[str, Any]] = None,
priority: Optional[int] = -1,
id: Optional[str] = None,
with_commit: bool = True,
):
if id:
return get_by_id(id)

file_reference_id = meta_data.get("file_reference_id") if meta_data else None
integration_id = meta_data.get("integration_id") if meta_data else None
markdown_file_id = meta_data.get("markdown_file_id") if meta_data else None
query = session.query(EtlTask).filter(
EtlTask.organization_id == org_id,
EtlTask.original_file_name == original_file_name,
EtlTask.file_size_bytes == file_size_bytes,
)

if file_path:
query = query.filter(EtlTask.file_path == file_path)
if file_reference_id:
query = query.filter(
file_reference_id
== cast(EtlTask.meta_data.op("->>")("file_reference_id"), UUID)
)
if markdown_file_id:
query = query.filter(
markdown_file_id
== cast(EtlTask.meta_data.op("->>")("markdown_file_id"), UUID)
)
if integration_id:
query = query.filter(
integration_id == cast(EtlTask.meta_data.op("->>")("integration_id"), UUID)
)

# TODO: enhance
if with_commit is False:
return query.first()

if etl_task := query.first():
return etl_task

return create(
org_id=org_id,
user_id=user_id,
original_file_name=original_file_name,
file_size_bytes=file_size_bytes,
tokenizer=tokenizer,
full_config=full_config,
meta_data=meta_data,
priority=priority,
file_path=file_path,
id=id,
with_commit=with_commit,
)
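
get_or_create treats organization, file name, and size — narrowed by any ids present in meta_data — as the dedup key; a rough in-memory sketch of that idea (the dict store stands in for the EtlTask table, not the real session API):

```python
from typing import Any, Dict, Optional, Tuple

_TASKS: Dict[Tuple, Dict[str, Any]] = {}  # stand-in for the EtlTask table

def get_or_create_task(
    org_id: str,
    original_file_name: str,
    file_size_bytes: int,
    meta_data: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    meta_data = meta_data or {}
    # same filters as the query: org + name + size, plus optional meta_data ids
    key = (
        org_id,
        original_file_name,
        file_size_bytes,
        meta_data.get("file_reference_id"),
        meta_data.get("markdown_file_id"),
        meta_data.get("integration_id"),
    )
    if key not in _TASKS:
        _TASKS[key] = {"org_id": org_id, "file": original_file_name, "meta": meta_data}
    return _TASKS[key]

first = get_or_create_task("org-1", "report.pdf", 1024, {"integration_id": "i-1"})
second = get_or_create_task("org-1", "report.pdf", 1024, {"integration_id": "i-1"})
assert first is second  # the second call reuses the existing task
```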


def create(
org_id: str,
user_id: str,
16 changes: 11 additions & 5 deletions integration_objects/manager.py
@@ -14,6 +14,7 @@
IntegrationPdf,
IntegrationGithubIssue,
IntegrationGithubFile,
CognitionIntegration,
)


@@ -31,12 +32,12 @@ def get(
return query.order_by(IntegrationModel.created_at.desc()).all()


def count(integration_id: str) -> Union[List[object], object]:
IntegrationModel = integration_model(integration_id)
def count(integration: CognitionIntegration) -> int:
IntegrationModel = integration_model(integration=integration)
return (
session.query(IntegrationModel)
.filter(
IntegrationModel.integration_id == integration_id,
IntegrationModel.integration_id == integration.id,
)
.count()
)
@@ -105,8 +106,13 @@ def get_all_by_integration_id(
)


def integration_model(integration_id: str) -> Type:
integration = integration_db_bo.get_by_id(integration_id)
def integration_model(
integration_id: Optional[str] = None,
integration: Optional[CognitionIntegration] = None,
) -> Type:
if not integration_id and not integration:
raise ValueError("Either integration_id or integration must be provided")
integration = integration or integration_db_bo.get_by_id(integration_id)
if integration.type == CognitionIntegrationType.SHAREPOINT.value:
return IntegrationSharepoint
elif integration.type == CognitionIntegrationType.PDF.value:
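Passing the already-loaded integration lets count() skip the extra lookup that the old integration_id-only signature forced; a schematic, self-contained version of the dispatch:

```python
from dataclasses import dataclass
from typing import Optional, Type

class IntegrationSharepoint: ...
class IntegrationPdf: ...

@dataclass
class Integration:  # stand-in for CognitionIntegration
    id: str
    type: str

_DB = {"i-1": Integration("i-1", "SHAREPOINT")}  # stand-in for integration_db_bo

def integration_model(
    integration_id: Optional[str] = None,
    integration: Optional[Integration] = None,
) -> Type:
    if not integration_id and not integration:
        raise ValueError("Either integration_id or integration must be provided")
    # reuse the object when the caller already has it; fetch otherwise
    integration = integration or _DB[integration_id]
    return {"SHAREPOINT": IntegrationSharepoint, "PDF": IntegrationPdf}[integration.type]

assert integration_model(integration_id="i-1") is IntegrationSharepoint
assert integration_model(integration=Integration("i-2", "PDF")) is IntegrationPdf
```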