diff --git a/cognition_objects/integration.py b/cognition_objects/integration.py
index 53771ce..5e90d10 100644
--- a/cognition_objects/integration.py
+++ b/cognition_objects/integration.py
@@ -185,15 +185,28 @@ def get_integration_progress(
     integration_id: str,
 ) -> float:
     integration = get_by_id(integration_id)
     count_all_records = integration_records_bo.count(integration)
-    all_tasks = get_all_etl_tasks(integration_id)
-    finished_tasks = [task for task in all_tasks if task.state in FINISHED_STATES]
     if (
         count_all_records == 0
         or integration.state == enums.CognitionMarkdownFileState.FAILED.value
     ):
         return 0.0
-    integration_progress = round((len(finished_tasks) / count_all_records) * 100.0, 2)
+
+    all_tasks = get_all_etl_tasks(integration_id)
+    finished_tasks = [task for task in all_tasks if task.state in FINISHED_STATES]
+    count_finished_tasks = len(finished_tasks)
+
+    # backward compatibility: records imported before per-record ETL tasks
+    # carry no etl_task_id and are counted as finished
+    if not all_tasks or len(all_tasks) != count_all_records:
+        all_records, _ = integration_records_bo.get_all_by_integration_id(
+            integration_id
+        )
+        count_finished_tasks += len(
+            [record for record in all_records if not record.etl_task_id]
+        )
+
+    integration_progress = round((count_finished_tasks / count_all_records) * 100.0, 2)
     if integration.state not in FINISHED_STATES:
-        integration_progress = min(integration_progress - 1, 0)
+        integration_progress = max(integration_progress - 1, 0)
     return integration_progress
diff --git a/cognition_objects/markdown_dataset.py b/cognition_objects/markdown_dataset.py
index 74b2eed..9d6c5bd 100644
--- a/cognition_objects/markdown_dataset.py
+++ b/cognition_objects/markdown_dataset.py
@@ -34,7 +34,7 @@ def __get_enriched_query(
     category_origin: Optional[str] = None,
     query_add: Optional[str] = "",
 ) -> str:
-    where_add = " AND (ed.config_ids->>'isDefault')::bool is true"
+    where_add = ""
     if id:
         id = prevent_sql_injection(id, isinstance(id, str))
         where_add += f" AND md.id = '{id}'"
@@ -46,7 +46,8 @@ def __get_enriched_query(
         md.*,
         COALESCE(mf.num_files, 0) AS num_files,
         COALESCE(mf.num_reviewed_files, 0) AS num_reviewed_files,
-        ecp.etl_config
+        ecp.etl_config,
+        ecp.id AS etl_config_id
     FROM cognition.{Tablenames.MARKDOWN_DATASET.value} md
     LEFT JOIN (
         SELECT dataset_id, COUNT(*) as num_files, COUNT(CASE WHEN is_reviewed = TRUE THEN 1 END) AS num_reviewed_files
@@ -56,7 +57,7 @@ def __get_enriched_query(
     LEFT JOIN(
         SELECT md.id, json_array_elements(md.useable_etl_configurations) config_ids
         FROM cognition.{Tablenames.MARKDOWN_DATASET.value} md
-    ) ed ON ed.id = md.id
+    ) ed ON ed.id = md.id AND (ed.config_ids->>'isDefault')::bool is true
     LEFT JOIN(
         SELECT ecp.id, ecp.etl_config
         FROM cognition.{Tablenames.ETL_CONFIG_PRESET.value} ecp
@@ -177,6 +178,7 @@ def update(
     dataset_id: str,
     name: Optional[str] = None,
     description: Optional[str] = None,
+    useable_etl_configurations: Optional[List[Dict[str, Any]]] = None,
     with_commit: bool = True,
 ) -> CognitionMarkdownDataset:
     dataset = get(org_id, dataset_id)
@@ -187,6 +189,9 @@ def update(
     if description:
         dataset.description = description

+    if useable_etl_configurations:
+        dataset.useable_etl_configurations = useable_etl_configurations
+
     general.flush_or_commit(with_commit)

     return dataset
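Note on get_integration_progress: the backward-compatibility branch counts records that predate per-record ETL tasks (no etl_task_id) as finished, and the final clamp keeps an unfinished integration from ever reporting 100%. A minimal standalone sketch of that clamp (plain Python, no business objects; with min() instead of max(), every unfinished integration would report 0):

    def clamp_unfinished(progress: float, finished: bool) -> float:
        # Mirrors the tail of get_integration_progress: unfinished
        # integrations are pinned just below completion, never negative.
        return progress if finished else max(progress - 1, 0)

    assert clamp_unfinished(100.0, finished=True) == 100.0
    assert clamp_unfinished(100.0, finished=False) == 99.0
    assert clamp_unfinished(0.5, finished=False) == 0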
"llm_ops", + "error_message", + ], ) query = f"""SELECT - {mf_select}, {et_select}, LENGTH({mf_prefix}.content) as content_length, - COALESCE({et_prefix}.state, {mf_prefix}.state) state, - COALESCE({et_prefix}.started_at, {mf_prefix}.started_at) started_at, - COALESCE({et_prefix}.finished_at, {mf_prefix}.finished_at) finished_at + {mf_select}, {et_select}, LENGTH({mf_prefix}.content) AS content_length, + COALESCE({et_prefix}.state, {mf_prefix}.state) AS state, + {et_prefix}.meta_data->>'scope_readable' AS scope_readable FROM cognition.markdown_file {mf_prefix} LEFT JOIN global.etl_task {et_prefix} ON {mf_prefix}.etl_task_id = {et_prefix}.id """ diff --git a/etl_utils.py b/etl_utils.py index 39e2901..b9fb99b 100644 --- a/etl_utils.py +++ b/etl_utils.py @@ -301,7 +301,7 @@ def rm_tree(path: Path): if item.is_dir(): rm_tree(item) else: - item.unlink() + item.unlink(missing_ok=True) path.rmdir() etl_cache_dir = ETL_DIR / org_id / download_id @@ -309,9 +309,6 @@ def rm_tree(path: Path): rm_tree(etl_cache_dir) -# TODO: delete_etl_tasks for related file_reference_id - - def get_download_key(org_id: str, download_id: str) -> Path: return Path(org_id) / download_id / "download" diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index 6e8323d..8eb5f22 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -1,20 +1,23 @@ -from typing import Any, List, Optional, Dict, Union +from typing import Any, List, Optional, Dict, Tuple, Union from sqlalchemy.sql.expression import cast from sqlalchemy.orm.attributes import flag_modified from sqlalchemy.dialects.postgresql import UUID +from pathlib import Path import datetime import mimetypes -from submodules.model import enums +from submodules.model import enums, etl_utils from submodules.model.session import session from submodules.model.business_objects import general +from submodules.model.cognition_objects import file_reference as file_reference_co_bo from submodules.model.models import ( EtlTask, CognitionIntegration, IntegrationSharepoint, ) from submodules.model.util import prevent_sql_injection +from submodules.model.etl_utils import get_hashed_string FINISHED_STATES = [ @@ -31,9 +34,47 @@ def get_by_id(id: str) -> EtlTask: return session.query(EtlTask).filter(EtlTask.id == id).first() +def is_valid_extension(file_name: str): + supported_extensions = get_supported_file_extensions() + file_extension = Path(file_name).suffix.lower().strip() + + return file_extension in [ + ext for exts in supported_extensions.values() for ext in exts + ] + + +def is_stale( + etl_task_id: Optional[str] = None, + etl_task: Optional[Dict[str, Any]] = None, +) -> bool: + if not etl_task: + if not etl_task_id: + raise ValueError("Either etl_task_id or etl_task must be provided") + etl_task = get_enriched(etl_task_id) + if not etl_task: + return False + + file_reference = file_reference_co_bo.get_by_id( + etl_task["organization_id"], etl_task["file_reference_id"] + ) + if not file_reference: + return True + + new_full_config, tokenizer = etl_utils.get_full_config_and_tokenizer_from_config_id( + file_reference=file_reference, + etl_config_id=etl_task["etl_config_id"], + markdown_file_id=etl_task["markdown_file_id"], + ) + return etl_task["tokenizer"] == tokenizer and etl_task[ + "full_config_hash" + ] != get_hashed_string(new_full_config) + + def get_all( exclude_failed: Optional[bool] = False, only_active: Optional[bool] = False, + only_inactive: Optional[bool] = False, + created_at_until: Optional[datetime.datetime] = None, ) -> 
diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py
index 6e8323d..8eb5f22 100644
--- a/global_objects/etl_task.py
+++ b/global_objects/etl_task.py
@@ -1,20 +1,23 @@
-from typing import Any, List, Optional, Dict, Union
+from typing import Any, List, Optional, Dict, Tuple, Union
 from sqlalchemy.sql.expression import cast
 from sqlalchemy.orm.attributes import flag_modified
 from sqlalchemy.dialects.postgresql import UUID
+from pathlib import Path
 import datetime
 import mimetypes

-from submodules.model import enums
+from submodules.model import enums, etl_utils
 from submodules.model.session import session
 from submodules.model.business_objects import general
+from submodules.model.cognition_objects import file_reference as file_reference_co_bo
 from submodules.model.models import (
     EtlTask,
     CognitionIntegration,
     IntegrationSharepoint,
 )
 from submodules.model.util import prevent_sql_injection
+from submodules.model.etl_utils import get_hashed_string


 FINISHED_STATES = [
@@ -31,9 +34,49 @@ def get_by_id(id: str) -> EtlTask:
     return session.query(EtlTask).filter(EtlTask.id == id).first()


+def is_valid_extension(file_name: str) -> bool:
+    supported_extensions = get_supported_file_extensions()
+    file_extension = Path(file_name).suffix.lower().strip()
+
+    return file_extension in [
+        ext for exts in supported_extensions.values() for ext in exts
+    ]
+
+
+def is_stale(
+    etl_task_id: Optional[str] = None,
+    etl_task: Optional[Dict[str, Any]] = None,
+) -> bool:
+    if not etl_task:
+        if not etl_task_id:
+            raise ValueError("Either etl_task_id or etl_task must be provided")
+        etl_task = get_enriched(etl_task_id)
+        if not etl_task:
+            return False
+
+    file_reference = file_reference_co_bo.get_by_id(
+        etl_task["organization_id"], etl_task["file_reference_id"]
+    )
+    if not file_reference:
+        return True
+
+    new_full_config, tokenizer = etl_utils.get_full_config_and_tokenizer_from_config_id(
+        file_reference=file_reference,
+        etl_config_id=etl_task["etl_config_id"],
+        markdown_file_id=etl_task["markdown_file_id"],
+    )
+    # stale as soon as either the tokenizer or the config hash drifts
+    return (
+        etl_task["tokenizer"] != tokenizer
+        or etl_task["full_config_hash"] != get_hashed_string(new_full_config)
+    )
+
+
 def get_all(
     exclude_failed: Optional[bool] = False,
     only_active: Optional[bool] = False,
+    only_inactive: Optional[bool] = False,
+    created_at_until: Optional[datetime.datetime] = None,
 ) -> List[EtlTask]:
@@ -42,9 +85,67 @@ def get_all(
     query = session.query(EtlTask)
     if exclude_failed:
         query = query.filter(
             EtlTask.state != enums.CognitionMarkdownFileState.FAILED.value
         )
     if only_active:
         query = query.filter(EtlTask.is_active == True)
+    if only_inactive:
+        query = query.filter(EtlTask.is_active == False)
+    if created_at_until:
+        query = query.filter(EtlTask.created_at <= created_at_until)
     return query.order_by(EtlTask.created_at.desc()).all()


+def get_enriched(etl_task_id: str) -> Dict[str, Any]:
+    etl_tasks = get_all_enriched(etl_task_id=etl_task_id)
+    return etl_tasks[0] if etl_tasks else {}
+
+
+def get_all_enriched(
+    exclude_failed: Optional[bool] = False,
+    only_active: Optional[bool] = False,
+    only_inactive: Optional[bool] = False,
+    only_markdown_files: Optional[bool] = False,
+    where_add: Optional[str] = "",
+    etl_task_id: Optional[str] = None,
+    dataset_id: Optional[str] = None,
+) -> List[Dict[str, Any]]:
+    mf_join = "" if only_markdown_files else "LEFT"
+    if etl_task_id:
+        where_add += " AND et.id::TEXT = '{}'".format(
+            prevent_sql_injection(etl_task_id, True)
+        )
+    if dataset_id:
+        where_add += " AND md.id::TEXT = '{}'".format(
+            prevent_sql_injection(dataset_id, True)
+        )
+    if exclude_failed:
+        where_add += " AND et.state != '{}'".format(
+            enums.CognitionMarkdownFileState.FAILED.value
+        )
+    if only_active:
+        where_add += " AND et.is_active IS TRUE"
+    if only_inactive:
+        where_add += " AND et.is_active IS FALSE"
+
+    query = f"""
+    SELECT
+        et.*,
+        mf.id::TEXT AS markdown_file_id,
+        md.id::TEXT AS dataset_id,
+        md.config_ids->>'id' AS etl_config_id,
+        et.meta_data->>'file_reference_id' AS file_reference_id
+    FROM global.{enums.Tablenames.ETL_TASK.value} et
+    {mf_join} JOIN (
+        SELECT id, etl_task_id, dataset_id
+        FROM cognition.{enums.Tablenames.MARKDOWN_FILE.value}
+    ) mf ON et.id = mf.etl_task_id
+    LEFT JOIN(
+        SELECT id, json_array_elements(useable_etl_configurations) config_ids
+        FROM cognition.{enums.Tablenames.MARKDOWN_DATASET.value}
+    ) md ON md.id = mf.dataset_id AND (md.config_ids->>'isDefault')::BOOL IS true
+    WHERE 1=1 {where_add}
+    ORDER BY et.created_at DESC
+    """
+    return list(map(lambda x: x._asdict(), general.execute_all(query)))
+
+
 def get_all_in_org(
     org_id: str,
     exclude_failed: Optional[bool] = False,
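Note on is_stale: a task is considered stale once the stored tokenizer or the stored full-config hash no longer matches a freshly derived configuration. A self-contained sketch of the predicate; the get_hashed_string shown here (deterministic JSON dump, then SHA-256) is an assumption, the real one lives in submodules.model.etl_utils:

    import hashlib
    import json
    from typing import Any, Dict

    def get_hashed_string(config: Dict[str, Any]) -> str:
        # Assumed behaviour: stable serialisation, then SHA-256.
        payload = json.dumps(config, sort_keys=True).encode("utf-8")
        return hashlib.sha256(payload).hexdigest()

    def is_stale(stored_tokenizer: str, stored_hash: str,
                 tokenizer: str, full_config: Dict[str, Any]) -> bool:
        # Stale as soon as either input drifts from what the task ran with.
        return (
            stored_tokenizer != tokenizer
            or stored_hash != get_hashed_string(full_config)
        )

    cfg = {"chunk_size": 512, "strategy": "markdown"}
    assert not is_stale("cl100k_base", get_hashed_string(cfg), "cl100k_base", cfg)
    assert is_stale("cl100k_base", get_hashed_string(cfg), "o200k_base", cfg)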
@@ -127,17 +228,16 @@ def get_or_create(
     tokenizer: Optional[str] = None,
     full_config: Optional[Dict[str, Any]] = None,
     file_path: Optional[str] = None,
     meta_data: Optional[Dict[str, Any]] = None,
     priority: Optional[int] = -1,
     id: Optional[str] = None,
     with_commit: bool = True,
-):
+) -> Tuple[EtlTask, bool]:
     if id:
-        return get_by_id(id)
+        return get_by_id(id), True
+
+    meta_data = meta_data or {}

-    file_reference_id = meta_data.get("file_reference_id") if meta_data else None
-    integration_id = meta_data.get("integration_id") if meta_data else None
-    markdown_file_id = meta_data.get("markdown_file_id") if meta_data else None
     query: EtlTask = session.query(EtlTask).filter(
         EtlTask.organization_id == org_id,
         EtlTask.original_file_name == original_file_name,
@@ -146,40 +246,40 @@ def get_or_create(
     )

     if file_path:
         query = query.filter(EtlTask.file_path == file_path)
-    if file_reference_id:
+
+    if file_reference_id := meta_data.get("file_reference_id"):
         query = query.filter(
             file_reference_id
             == cast(EtlTask.meta_data.op("->>")("file_reference_id"), UUID)
         )
-    if markdown_file_id:
+    if markdown_file_id := meta_data.get("markdown_file_id"):
         query = query.filter(
             markdown_file_id
             == cast(EtlTask.meta_data.op("->>")("markdown_file_id"), UUID)
         )
-    if integration_id:
+    if integration_id := meta_data.get("integration_id"):
         query = query.filter(
             integration_id
             == cast(EtlTask.meta_data.op("->>")("integration_id"), UUID)
         )

-    # TODO: enhance
-    if with_commit is False:
-        return query.first()
-
     if etl_task := query.first():
-        return etl_task
+        return etl_task, True

-    return create(
-        org_id=org_id,
-        user_id=user_id,
-        original_file_name=original_file_name,
-        file_size_bytes=file_size_bytes,
-        tokenizer=tokenizer,
-        full_config=full_config,
-        meta_data=meta_data,
-        priority=priority,
-        file_path=file_path,
-        id=id,
-        with_commit=with_commit,
+    return (
+        create(
+            org_id=org_id,
+            user_id=user_id,
+            original_file_name=original_file_name,
+            file_size_bytes=file_size_bytes,
+            tokenizer=tokenizer,
+            full_config=full_config,
+            meta_data=meta_data,
+            priority=priority,
+            file_path=file_path,
+            id=id,
+            with_commit=with_commit,
+        ),
+        False,
     )
@@ -205,8 +305,15 @@ def create(
         file_size_bytes=file_size_bytes,
         tokenizer=tokenizer,
         full_config=full_config,
+        full_config_hash=get_hashed_string(full_config),
         meta_data=meta_data,
         priority=priority,
+        llm_ops={
+            "total_llm_calls": 0,
+            "total_tokens_input": 0,
+            "total_tokens_output": 0,
+            "total_cost_eur": 0.0,
+        },
     )
     general.add(etl_task, with_commit)

@@ -221,6 +328,7 @@ def update(
     file_path: Optional[str] = None,
     file_size_bytes: Optional[int] = None,
     full_config: Optional[Dict] = None,
+    tokenizer: Optional[str] = None,
     started_at: Optional[datetime.datetime] = None,
     finished_at: Optional[Union[str, datetime.datetime]] = None,
     state: Optional[enums.CognitionMarkdownFileState] = None,
@@ -228,6 +336,8 @@ def update(
     meta_data: Optional[Dict[str, Any]] = None,
     priority: Optional[int] = None,
     error_message: Optional[str] = None,
+    is_stale: Optional[bool] = None,
+    llm_ops: Optional[Dict[str, Any]] = None,
     overwrite_meta_data: bool = False,
     with_commit: bool = True,
 ) -> Optional[EtlTask]:
@@ -244,10 +354,13 @@ def update(
         etl_task.file_path = file_path
     if file_size_bytes is not None and etl_task.file_size_bytes is None:
         etl_task.file_size_bytes = file_size_bytes
+    if tokenizer is not None:
+        etl_task.tokenizer = tokenizer
     if original_file_name is not None and etl_task.original_file_name is None:
         etl_task.original_file_name = original_file_name
     if full_config is not None:
         etl_task.full_config = full_config
+        etl_task.full_config_hash = get_hashed_string(full_config)
         flag_modified(etl_task, "full_config")
     if started_at is not None:
         etl_task.started_at = started_at
@@ -260,6 +373,15 @@ def update(
         etl_task.state = state.value
     if is_active is not None:
         etl_task.is_active = is_active
+    if is_stale is not None:
+        etl_task.is_stale = is_stale
+    if llm_ops is not None:
+        # rows created before the llm_ops column existed may hold NULL
+        if overwrite_meta_data or etl_task.llm_ops is None:
+            etl_task.llm_ops = llm_ops
+        else:
+            etl_task.llm_ops.update(llm_ops)
+        flag_modified(etl_task, "llm_ops")
     if meta_data is not None:
         if overwrite_meta_data:
             etl_task.meta_data = meta_data
@@ -292,7 +414,6 @@ def execution_finished(id: str) -> bool:


 def delete_many(ids: List[str], with_commit: bool = True) -> None:
-    # TODO: cascade delete cached files
     (
         session.query(EtlTask)
         .filter(EtlTask.id.in_(ids))
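Note on get_or_create: every path now returns a (task, found_existing) tuple, so call sites must unpack it. A hedged usage sketch; ensure_etl_task and the enqueue step are hypothetical, only get_or_create comes from this module:

    from typing import Any, Dict

    def ensure_etl_task(org_id: str, user_id: str, file_name: str,
                        size_bytes: int, meta: Dict[str, Any]):
        # found_existing lets callers skip re-queueing work that already exists.
        etl_task, found_existing = get_or_create(
            org_id=org_id,
            user_id=user_id,
            original_file_name=file_name,
            file_size_bytes=size_bytes,
            meta_data=meta,
        )
        if not found_existing:
            ...  # e.g. enqueue the freshly created task for execution
        return etl_task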
diff --git a/models.py b/models.py
index 71cb78b..38bda5c 100644
--- a/models.py
+++ b/models.py
@@ -1602,7 +1602,7 @@ class CognitionMarkdownFile(Base):

     etl_task_id = Column(
         UUID(as_uuid=True),
-        ForeignKey(f"global.{Tablenames.ETL_TASK.value}.id", ondelete="CASCADE"),
+        ForeignKey(f"global.{Tablenames.ETL_TASK.value}.id", ondelete="SET NULL"),
         index=True,
     )

@@ -2610,10 +2610,16 @@ class EtlTask(Base):
     state = Column(
         String, default=CognitionMarkdownFileState.QUEUE.value
     )  # of type enums.CognitionMarkdownFileState
     is_active = Column(Boolean, default=False)
+    priority = Column(Integer, default=0)
     error_message = Column(String)
     meta_data = Column(JSON)
+    full_config_hash = Column(String, index=True)
+    is_stale = Column(Boolean, default=False)
+    llm_ops = Column(JSON)
+    updated_at = Column(DateTime, onupdate=sql.func.now())
+

 class ConversationShare(Base):
     __tablename__ = Tablenames.CONVERSATION_SHARE.value
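Note on the model changes: the new EtlTask columns and the relaxed markdown_file foreign key both need a schema migration, which this diff does not ship. A hedged Alembic sketch, assuming the project migrates with Alembic; the constraint and index names are placeholders that must match the live schema:

    import sqlalchemy as sa
    from alembic import op

    def upgrade() -> None:
        op.add_column("etl_task", sa.Column("priority", sa.Integer(), server_default="0"), schema="global")
        op.add_column("etl_task", sa.Column("full_config_hash", sa.String()), schema="global")
        op.create_index("ix_etl_task_full_config_hash", "etl_task", ["full_config_hash"], schema="global")
        op.add_column("etl_task", sa.Column("is_stale", sa.Boolean(), server_default=sa.false()), schema="global")
        op.add_column("etl_task", sa.Column("llm_ops", sa.JSON()), schema="global")
        op.add_column("etl_task", sa.Column("updated_at", sa.DateTime()), schema="global")
        # CASCADE -> SET NULL: deleting an ETL task should orphan the
        # markdown file row instead of deleting it.
        op.drop_constraint("markdown_file_etl_task_id_fkey", "markdown_file", schema="cognition", type_="foreignkey")
        op.create_foreign_key(
            "markdown_file_etl_task_id_fkey",
            "markdown_file", "etl_task",
            ["etl_task_id"], ["id"],
            source_schema="cognition", referent_schema="global",
            ondelete="SET NULL",
        )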