From 2835c7300d498d8ad93414a1433e41508b764da8 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Wed, 10 Dec 2025 22:44:24 +0100 Subject: [PATCH 01/16] fix: add backwards compatibility for integration progress --- cognition_objects/integration.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/cognition_objects/integration.py b/cognition_objects/integration.py index 53771ce..5e90d10 100644 --- a/cognition_objects/integration.py +++ b/cognition_objects/integration.py @@ -185,15 +185,27 @@ def get_integration_progress( ) -> float: integration = get_by_id(integration_id) count_all_records = integration_records_bo.count(integration) - all_tasks = get_all_etl_tasks(integration_id) - finished_tasks = [task for task in all_tasks if task.state in FINISHED_STATES] if ( count_all_records == 0 or integration.state == enums.CognitionMarkdownFileState.FAILED.value ): return 0.0 - integration_progress = round((len(finished_tasks) / count_all_records) * 100.0, 2) + + all_tasks = get_all_etl_tasks(integration_id) + finished_tasks = [task for task in all_tasks if task.state in FINISHED_STATES] + count_finished_tasks = len(finished_tasks) + + # backward compatibility + if not all_tasks or len(all_tasks) != count_all_records: + all_records, _ = integration_records_bo.get_all_by_integration_id( + integration_id + ) + count_finished_tasks += len( + [record for record in all_records if not record.etl_task_id] + ) + + integration_progress = round((count_finished_tasks / count_all_records) * 100.0, 2) if integration.state not in FINISHED_STATES: integration_progress = min(integration_progress - 1, 0) return integration_progress From 65114244ece0d9694b63b85236c928c1017f8173 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Wed, 10 Dec 2025 23:01:39 +0100 Subject: [PATCH 02/16] chore: remove stale comments --- etl_utils.py | 3 --- global_objects/etl_task.py | 1 - 2 files changed, 4 deletions(-) diff --git a/etl_utils.py b/etl_utils.py index 39e2901..5560661 100644 --- a/etl_utils.py +++ b/etl_utils.py @@ -309,9 +309,6 @@ def rm_tree(path: Path): rm_tree(etl_cache_dir) -# TODO: delete_etl_tasks for related file_reference_id - - def get_download_key(org_id: str, download_id: str) -> Path: return Path(org_id) / download_id / "download" diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index 6e8323d..aeb5e7a 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -292,7 +292,6 @@ def execution_finished(id: str) -> bool: def delete_many(ids: List[str], with_commit: bool = True) -> None: - # TODO: cascade delete cached files ( session.query(EtlTask) .filter(EtlTask.id.in_(ids)) From 33959df268f3749095388888b356be15ffa52954 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 11 Dec 2025 15:09:16 +0100 Subject: [PATCH 03/16] perf: update dataset --- cognition_objects/markdown_dataset.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cognition_objects/markdown_dataset.py b/cognition_objects/markdown_dataset.py index 74b2eed..6256352 100644 --- a/cognition_objects/markdown_dataset.py +++ b/cognition_objects/markdown_dataset.py @@ -177,6 +177,7 @@ def update( dataset_id: str, name: Optional[str] = None, description: Optional[str] = None, + useable_etl_configurations: Optional[List[Dict[str, Any]]] = None, with_commit: bool = True, ) -> CognitionMarkdownDataset: dataset = get(org_id, dataset_id) @@ -187,6 +188,9 @@ def update( if description: dataset.description = description + if useable_etl_configurations: + dataset.useable_etl_configurations = useable_etl_configurations + general.flush_or_commit(with_commit) return dataset From 689d45480dfb7cc8ccc6964b3d6469d129812677 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 11 Dec 2025 20:56:56 +0100 Subject: [PATCH 04/16] perf: full_config_hash --- global_objects/etl_task.py | 46 ++++++++++++++++++++------------------ models.py | 3 +++ 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index aeb5e7a..1494aee 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional, Dict, Union +from typing import Any, List, Optional, Dict, Tuple, Union from sqlalchemy.sql.expression import cast from sqlalchemy.orm.attributes import flag_modified from sqlalchemy.dialects.postgresql import UUID @@ -15,6 +15,7 @@ IntegrationSharepoint, ) from submodules.model.util import prevent_sql_injection +from submodules.model.etl_utils import get_hashed_string FINISHED_STATES = [ @@ -131,13 +132,10 @@ def get_or_create( priority: Optional[int] = -1, id: Optional[str] = None, with_commit: bool = True, -): +) -> Tuple[EtlTask, bool]: if id: return get_by_id(id) - file_reference_id = meta_data.get("file_reference_id") if meta_data else None - integration_id = meta_data.get("integration_id") if meta_data else None - markdown_file_id = meta_data.get("markdown_file_id") if meta_data else None query: EtlTask = session.query(EtlTask).filter( EtlTask.organization_id == org_id, EtlTask.original_file_name == original_file_name, @@ -146,6 +144,10 @@ def get_or_create( if file_path: query = query.filter(EtlTask.file_path == file_path) + + file_reference_id = meta_data.get("file_reference_id") if meta_data else None + integration_id = meta_data.get("integration_id") if meta_data else None + markdown_file_id = meta_data.get("markdown_file_id") if meta_data else None if file_reference_id: query = query.filter( file_reference_id @@ -161,25 +163,24 @@ def get_or_create( integration_id == cast(EtlTask.meta_data.op("->>")("integration_id"), UUID) ) - # TODO: enhance - if with_commit is False: - return query.first() - if etl_task := query.first(): - return etl_task + return etl_task, True - return create( - org_id=org_id, - user_id=user_id, - original_file_name=original_file_name, - file_size_bytes=file_size_bytes, - tokenizer=tokenizer, - full_config=full_config, - meta_data=meta_data, - priority=priority, - file_path=file_path, - id=id, - with_commit=with_commit, + return ( + create( + org_id=org_id, + user_id=user_id, + original_file_name=original_file_name, + file_size_bytes=file_size_bytes, + tokenizer=tokenizer, + full_config=full_config, + meta_data=meta_data, + priority=priority, + file_path=file_path, + id=id, + with_commit=with_commit, + ), + False, ) @@ -205,6 +206,7 @@ def create( file_size_bytes=file_size_bytes, tokenizer=tokenizer, full_config=full_config, + full_config_hash=get_hashed_string(full_config), meta_data=meta_data, priority=priority, ) diff --git a/models.py b/models.py index 71cb78b..a8df0a9 100644 --- a/models.py +++ b/models.py @@ -2614,6 +2614,9 @@ class EtlTask(Base): error_message = Column(String) meta_data = Column(JSON) + full_config_hash = Column(String, index=True) + is_stale = Column(Boolean, default=False) + class ConversationShare(Base): __tablename__ = Tablenames.CONVERSATION_SHARE.value From 139db9b4a2ea9071e4b028583f03d37312ff398e Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 11 Dec 2025 22:09:42 +0100 Subject: [PATCH 05/16] perf(etl): is stale check --- cognition_objects/markdown_dataset.py | 7 ++++--- global_objects/etl_task.py | 23 ++++++++++++++++------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/cognition_objects/markdown_dataset.py b/cognition_objects/markdown_dataset.py index 6256352..9d6c5bd 100644 --- a/cognition_objects/markdown_dataset.py +++ b/cognition_objects/markdown_dataset.py @@ -34,7 +34,7 @@ def __get_enriched_query( category_origin: Optional[str] = None, query_add: Optional[str] = "", ) -> str: - where_add = " AND (ed.config_ids->>'isDefault')::bool is true" + where_add = "" if id: id = prevent_sql_injection(id, isinstance(id, str)) where_add += f" AND md.id = '{id}'" @@ -46,7 +46,8 @@ def __get_enriched_query( md.*, COALESCE(mf.num_files, 0) AS num_files, COALESCE(mf.num_reviewed_files, 0) AS num_reviewed_files, - ecp.etl_config + ecp.etl_config, + ecp.id as etl_config_id FROM cognition.{Tablenames.MARKDOWN_DATASET.value} md LEFT JOIN ( SELECT dataset_id, COUNT(*) as num_files, COUNT(CASE WHEN is_reviewed = TRUE THEN 1 END) AS num_reviewed_files @@ -56,7 +57,7 @@ def __get_enriched_query( LEFT JOIN( SELECT md.id, json_array_elements(md.useable_etl_configurations) config_ids FROM cognition.{Tablenames.MARKDOWN_DATASET.value} md - ) ed ON ed.id = md.id + ) ed ON ed.id = md.id AND (ed.config_ids->>'isDefault')::bool is true LEFT JOIN( SELECT ecp.id, ecp.etl_config FROM cognition.{Tablenames.ETL_CONFIG_PRESET.value} ecp diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index 1494aee..492a563 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -32,6 +32,18 @@ def get_by_id(id: str) -> EtlTask: return session.query(EtlTask).filter(EtlTask.id == id).first() +def is_stale( + new_full_config: Dict[str, Any], + etl_task: Optional[EtlTask] = None, + etl_task_id: Optional[str] = None, +) -> bool: + if not etl_task: + if not etl_task_id: + raise Exception("ERROR: Either etl_task or etl_task_id must be provided") + etl_task = get_by_id(etl_task_id) + return etl_task.full_config_hash != get_hashed_string(new_full_config) + + def get_all( exclude_failed: Optional[bool] = False, only_active: Optional[bool] = False, @@ -128,7 +140,7 @@ def get_or_create( tokenizer: Optional[str] = None, full_config: Optional[Dict[str, Any]] = None, file_path: Optional[str] = None, - meta_data: Optional[Dict[str, Any]] = None, + meta_data: Optional[Dict[str, Any]] = {}, priority: Optional[int] = -1, id: Optional[str] = None, with_commit: bool = True, @@ -145,20 +157,17 @@ def get_or_create( if file_path: query = query.filter(EtlTask.file_path == file_path) - file_reference_id = meta_data.get("file_reference_id") if meta_data else None - integration_id = meta_data.get("integration_id") if meta_data else None - markdown_file_id = meta_data.get("markdown_file_id") if meta_data else None - if file_reference_id: + if file_reference_id := meta_data.get("file_reference_id"): query = query.filter( file_reference_id == cast(EtlTask.meta_data.op("->>")("file_reference_id"), UUID) ) - if markdown_file_id: + if markdown_file_id := meta_data.get("markdown_file_id"): query = query.filter( markdown_file_id == cast(EtlTask.meta_data.op("->>")("markdown_file_id"), UUID) ) - if integration_id: + if integration_id := meta_data.get("integration_id"): query = query.filter( integration_id == cast(EtlTask.meta_data.op("->>")("integration_id"), UUID) ) From 623444117424374f085e19fbe34d55e7874ce6d2 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 11 Dec 2025 23:21:27 +0100 Subject: [PATCH 06/16] perf: new etl attrs --- global_objects/etl_task.py | 75 +++++++++++++++++++++++++++++++++++--- models.py | 3 ++ 2 files changed, 72 insertions(+), 6 deletions(-) diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index 492a563..70d9317 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -6,9 +6,10 @@ import datetime import mimetypes -from submodules.model import enums +from submodules.model import enums, etl_utils from submodules.model.session import session from submodules.model.business_objects import general +from submodules.model.cognition_objects import file_reference as file_reference_co_bo from submodules.model.models import ( EtlTask, CognitionIntegration, @@ -33,15 +34,31 @@ def get_by_id(id: str) -> EtlTask: def is_stale( - new_full_config: Dict[str, Any], - etl_task: Optional[EtlTask] = None, etl_task_id: Optional[str] = None, + etl_task: Optional[Dict[str, Any]] = None, ) -> bool: if not etl_task: if not etl_task_id: - raise Exception("ERROR: Either etl_task or etl_task_id must be provided") - etl_task = get_by_id(etl_task_id) - return etl_task.full_config_hash != get_hashed_string(new_full_config) + raise ValueError("Either etl_task_id or etl_task must be provided") + etl_task = get_enriched(etl_task_id) + if not etl_task: + return False + + file_reference = file_reference_co_bo.get_by_id( + etl_task["organization_id"], etl_task["file_reference_id"] + ) + if not file_reference: + return False + + new_full_config, tokenizer = etl_utils.get_full_config_and_tokenizer_from_config_id( + file_reference=file_reference, + etl_config_id=etl_task["etl_config_id"], + markdown_file_id=etl_task["markdown_file_id"], + ) + return ( + etl_task.tokenizer == tokenizer + and etl_task.full_config_hash != get_hashed_string(new_full_config) + ) def get_all( @@ -58,6 +75,52 @@ def get_all( return query.order_by(EtlTask.created_at.desc()).all() +def get_enriched(etl_task_id: str) -> Dict[str, Any]: + etl_tasks = get_all_enriched( + where_add=f" AND et.id = '{prevent_sql_injection(etl_task_id, True)}' ", + ) + return etl_tasks[0] if etl_tasks else {} + + +def get_all_enriched( + exclude_failed: Optional[bool] = False, + only_active: Optional[bool] = False, + only_markdown_files: Optional[bool] = False, + where_add: Optional[str] = "", +) -> List[Dict[str, Any]]: + mf_join = "" + if exclude_failed: + where_add += " AND et.state != '{}'".format( + enums.CognitionMarkdownFileState.FAILED.value + ) + if only_active: + where_add += " AND et.is_active IS TRUE" + if only_markdown_files: + mf_join = "" + else: + mf_join = "LEFT" + + query = f""" + SELECT + et.*, + md.id AS dataset_id, + md.config_ids->>'id' AS etl_config_id, + et.meta_data->>'file_reference_id' AS file_reference_id + FROM global.{enums.Tablenames.ETL_TASK.value} et + {mf_join} JOIN ( + SELECT id, dataset_id + FROM cognition.{enums.Tablenames.MARKDOWN_FILE.value} + ) mf ON et.meta_data->>'markdown_file_id' = mf.id::varchar + LEFT JOIN( + SELECT id, json_array_elements(useable_etl_configurations) config_ids + FROM cognition.{enums.Tablenames.MARKDOWN_DATASET.value} + ) md ON md.id = mf.dataset_id AND (md.config_ids->>'isDefault')::bool is true + WHERE 1=1 {where_add} + ORDER BY et.created_at DESC + """ + return general.execute_all(query) + + def get_all_in_org( org_id: str, exclude_failed: Optional[bool] = False, diff --git a/models.py b/models.py index a8df0a9..7bb8a15 100644 --- a/models.py +++ b/models.py @@ -2610,12 +2610,15 @@ class EtlTask(Base): String, default=CognitionMarkdownFileState.QUEUE.value ) # of type enums.CognitionMarkdownFileState is_active = Column(Boolean, default=False) + priority = Column(Integer, default=0) error_message = Column(String) meta_data = Column(JSON) full_config_hash = Column(String, index=True) is_stale = Column(Boolean, default=False) + num_llm_ops = Column(Integer, default=0) + updated_at = Column(DateTime, onupdate=sql.func.now()) class ConversationShare(Base): From eda6ab5782654d1cb44c580a8ef882766b377899 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 11 Dec 2025 23:34:44 +0100 Subject: [PATCH 07/16] perf: etl task new attrs --- models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models.py b/models.py index 7bb8a15..a31f738 100644 --- a/models.py +++ b/models.py @@ -2617,7 +2617,7 @@ class EtlTask(Base): full_config_hash = Column(String, index=True) is_stale = Column(Boolean, default=False) - num_llm_ops = Column(Integer, default=0) + llm_ops = Column(JSON) updated_at = Column(DateTime, onupdate=sql.func.now()) From 61ce3bd8dc61ee6b698bf31569ed6098f366b783 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 12 Dec 2025 00:32:15 +0100 Subject: [PATCH 08/16] perf: create/update new etl attrs --- global_objects/etl_task.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index 70d9317..dd2bdf0 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -77,7 +77,7 @@ def get_all( def get_enriched(etl_task_id: str) -> Dict[str, Any]: etl_tasks = get_all_enriched( - where_add=f" AND et.id = '{prevent_sql_injection(etl_task_id, True)}' ", + where_add=f" AND et.id = '{prevent_sql_injection(etl_task_id, True)}'", ) return etl_tasks[0] if etl_tasks else {} @@ -103,6 +103,7 @@ def get_all_enriched( query = f""" SELECT et.*, + mf.id AS markdown_file_id, md.id AS dataset_id, md.config_ids->>'id' AS etl_config_id, et.meta_data->>'file_reference_id' AS file_reference_id @@ -281,6 +282,12 @@ def create( full_config_hash=get_hashed_string(full_config), meta_data=meta_data, priority=priority, + llm_ops={ + "total_llm_calls": 0, + "total_tokens_input": 0, + "total_tokens_output": 0, + "total_cost_eur": 0.0, + }, ) general.add(etl_task, with_commit) @@ -302,6 +309,8 @@ def update( meta_data: Optional[Dict[str, Any]] = None, priority: Optional[int] = None, error_message: Optional[str] = None, + is_stale: Optional[bool] = None, + llm_ops: Optional[Dict[str, Any]] = None, overwrite_meta_data: bool = False, with_commit: bool = True, ) -> Optional[EtlTask]: @@ -334,6 +343,14 @@ def update( etl_task.state = state.value if is_active is not None: etl_task.is_active = is_active + if is_stale is not None: + etl_task.is_stale = is_stale + if llm_ops is not None: + if overwrite_meta_data: + etl_task.llm_ops = llm_ops + else: + etl_task.llm_ops.update(llm_ops) + flag_modified(etl_task, "llm_ops") if meta_data is not None: if overwrite_meta_data: etl_task.meta_data = meta_data From 433b35b200a587e3464b244b95fceb3f9fc0af59 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 12 Dec 2025 02:03:26 +0100 Subject: [PATCH 09/16] fix: minor improvements --- global_objects/etl_task.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index dd2bdf0..188b35a 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -77,9 +77,9 @@ def get_all( def get_enriched(etl_task_id: str) -> Dict[str, Any]: etl_tasks = get_all_enriched( - where_add=f" AND et.id = '{prevent_sql_injection(etl_task_id, True)}'", + where_add=f" AND et.id::TEXT = '{prevent_sql_injection(etl_task_id, True)}'", ) - return etl_tasks[0] if etl_tasks else {} + return etl_tasks[0]._asdict() if etl_tasks else {} def get_all_enriched( @@ -103,19 +103,19 @@ def get_all_enriched( query = f""" SELECT et.*, - mf.id AS markdown_file_id, - md.id AS dataset_id, + mf.id::TEXT AS markdown_file_id, + md.id::TEXT AS dataset_id, md.config_ids->>'id' AS etl_config_id, et.meta_data->>'file_reference_id' AS file_reference_id FROM global.{enums.Tablenames.ETL_TASK.value} et {mf_join} JOIN ( SELECT id, dataset_id FROM cognition.{enums.Tablenames.MARKDOWN_FILE.value} - ) mf ON et.meta_data->>'markdown_file_id' = mf.id::varchar + ) mf ON et.meta_data->>'markdown_file_id' = mf.id::TEXT LEFT JOIN( SELECT id, json_array_elements(useable_etl_configurations) config_ids FROM cognition.{enums.Tablenames.MARKDOWN_DATASET.value} - ) md ON md.id = mf.dataset_id AND (md.config_ids->>'isDefault')::bool is true + ) md ON md.id = mf.dataset_id AND (md.config_ids->>'isDefault')::BOOL IS true WHERE 1=1 {where_add} ORDER BY et.created_at DESC """ @@ -302,6 +302,7 @@ def update( file_path: Optional[str] = None, file_size_bytes: Optional[int] = None, full_config: Optional[Dict] = None, + tokenizer: Optional[str] = None, started_at: Optional[datetime.datetime] = None, finished_at: Optional[Union[str, datetime.datetime]] = None, state: Optional[enums.CognitionMarkdownFileState] = None, @@ -327,10 +328,13 @@ def update( etl_task.file_path = file_path if file_size_bytes is not None and etl_task.file_size_bytes is None: etl_task.file_size_bytes = file_size_bytes + if tokenizer is not None: + etl_task.tokenizer = tokenizer if original_file_name is not None and etl_task.original_file_name is None: etl_task.original_file_name = original_file_name if full_config is not None: etl_task.full_config = full_config + etl_task.full_config_hash = get_hashed_string(full_config) flag_modified(etl_task, "full_config") if started_at is not None: etl_task.started_at = started_at From 144dde2af3d2c940ee13df161194c4aa76a0c197 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 12 Dec 2025 02:04:26 +0100 Subject: [PATCH 10/16] chore: conflict resolution --- cognition_objects/markdown_file.py | 2 +- global_objects/etl_task.py | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/cognition_objects/markdown_file.py b/cognition_objects/markdown_file.py index 4a1bd62..4eb2ec6 100644 --- a/cognition_objects/markdown_file.py +++ b/cognition_objects/markdown_file.py @@ -93,7 +93,7 @@ def __get_enriched_query( "etl_task", "global", prefix=et_prefix, - include_columns=["is_active", "error_message"], + include_columns=["is_active", "is_stale", "llm_ops", "error_message"], ) query = f"""SELECT diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index 188b35a..191e1c8 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -76,10 +76,8 @@ def get_all( def get_enriched(etl_task_id: str) -> Dict[str, Any]: - etl_tasks = get_all_enriched( - where_add=f" AND et.id::TEXT = '{prevent_sql_injection(etl_task_id, True)}'", - ) - return etl_tasks[0]._asdict() if etl_tasks else {} + etl_tasks = get_all_enriched(etl_task_id=etl_task_id) + return etl_tasks[0] if etl_tasks else {} def get_all_enriched( @@ -87,8 +85,18 @@ def get_all_enriched( only_active: Optional[bool] = False, only_markdown_files: Optional[bool] = False, where_add: Optional[str] = "", + etl_task_id: Optional[str] = None, + dataset_id: Optional[str] = None, ) -> List[Dict[str, Any]]: mf_join = "" + if etl_task_id: + where_add += " AND et.id::TEXT = '{}'".format( + prevent_sql_injection(etl_task_id, True) + ) + if dataset_id: + where_add += " AND md.id::TEXT = '{}'".format( + prevent_sql_injection(dataset_id, True) + ) if exclude_failed: where_add += " AND et.state != '{}'".format( enums.CognitionMarkdownFileState.FAILED.value From 5200c7faee8b1f06f2bee0ab6d00000331f685a0 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 12 Dec 2025 12:06:36 +0100 Subject: [PATCH 11/16] fix: is_stale --- cognition_objects/markdown_file.py | 4 ++-- global_objects/etl_task.py | 11 +++++------ 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/cognition_objects/markdown_file.py b/cognition_objects/markdown_file.py index 4eb2ec6..77d4b39 100644 --- a/cognition_objects/markdown_file.py +++ b/cognition_objects/markdown_file.py @@ -99,8 +99,8 @@ def __get_enriched_query( query = f"""SELECT {mf_select}, {et_select}, LENGTH({mf_prefix}.content) as content_length, COALESCE({et_prefix}.state, {mf_prefix}.state) state, - COALESCE({et_prefix}.started_at, {mf_prefix}.started_at) started_at, - COALESCE({et_prefix}.finished_at, {mf_prefix}.finished_at) finished_at + {et_prefix}.started_at, + {et_prefix}.finished_at FROM cognition.markdown_file {mf_prefix} LEFT JOIN global.etl_task {et_prefix} ON {mf_prefix}.etl_task_id = {et_prefix}.id """ diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index 191e1c8..ab5a551 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -48,17 +48,16 @@ def is_stale( etl_task["organization_id"], etl_task["file_reference_id"] ) if not file_reference: - return False + return True new_full_config, tokenizer = etl_utils.get_full_config_and_tokenizer_from_config_id( file_reference=file_reference, etl_config_id=etl_task["etl_config_id"], markdown_file_id=etl_task["markdown_file_id"], ) - return ( - etl_task.tokenizer == tokenizer - and etl_task.full_config_hash != get_hashed_string(new_full_config) - ) + return etl_task["tokenizer"] == tokenizer and etl_task[ + "full_config_hash" + ] != get_hashed_string(new_full_config) def get_all( @@ -127,7 +126,7 @@ def get_all_enriched( WHERE 1=1 {where_add} ORDER BY et.created_at DESC """ - return general.execute_all(query) + return list(map(lambda x: x._asdict(), general.execute_all(query))) def get_all_in_org( From c9761e87c80039af986e86f5693032fc969cc1cb Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 12 Dec 2025 14:05:55 +0100 Subject: [PATCH 12/16] perf: fetch parsing scope --- cognition_objects/markdown_file.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/cognition_objects/markdown_file.py b/cognition_objects/markdown_file.py index 77d4b39..be4ca8e 100644 --- a/cognition_objects/markdown_file.py +++ b/cognition_objects/markdown_file.py @@ -93,14 +93,20 @@ def __get_enriched_query( "etl_task", "global", prefix=et_prefix, - include_columns=["is_active", "is_stale", "llm_ops", "error_message"], + include_columns=[ + "started_at", + "finished_at", + "is_active", + "is_stale", + "llm_ops", + "error_message", + ], ) query = f"""SELECT - {mf_select}, {et_select}, LENGTH({mf_prefix}.content) as content_length, - COALESCE({et_prefix}.state, {mf_prefix}.state) state, - {et_prefix}.started_at, - {et_prefix}.finished_at + {mf_select}, {et_select}, LENGTH({mf_prefix}.content) AS content_length, + COALESCE({et_prefix}.state, {mf_prefix}.state) AS state, + {et_prefix}.meta_data->>'scope_readable' AS scope_readable FROM cognition.markdown_file {mf_prefix} LEFT JOIN global.etl_task {et_prefix} ON {mf_prefix}.etl_task_id = {et_prefix}.id """ From 0577f334fbebf4a18fefe3399b5044b2e8865c02 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Mon, 15 Dec 2025 08:39:15 +0100 Subject: [PATCH 13/16] perf: only fetch inactive enriched datasets for stale config check --- global_objects/etl_task.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index ab5a551..a42b275 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -82,6 +82,7 @@ def get_enriched(etl_task_id: str) -> Dict[str, Any]: def get_all_enriched( exclude_failed: Optional[bool] = False, only_active: Optional[bool] = False, + only_inactive: Optional[bool] = False, only_markdown_files: Optional[bool] = False, where_add: Optional[str] = "", etl_task_id: Optional[str] = None, @@ -102,6 +103,8 @@ def get_all_enriched( ) if only_active: where_add += " AND et.is_active IS TRUE" + if only_inactive: + where_add += " AND et.is_active IS FALSE" if only_markdown_files: mf_join = "" else: From eeb8dd0a62f67cf9b145239d49c0f96af90b9ba5 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Mon, 15 Dec 2025 08:58:19 +0100 Subject: [PATCH 14/16] perf: SET NULL on md_file etl_task deletion --- etl_utils.py | 2 +- global_objects/etl_task.py | 8 +++++++- models.py | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/etl_utils.py b/etl_utils.py index 5560661..b9fb99b 100644 --- a/etl_utils.py +++ b/etl_utils.py @@ -301,7 +301,7 @@ def rm_tree(path: Path): if item.is_dir(): rm_tree(item) else: - item.unlink() + item.unlink(missing_ok=True) path.rmdir() etl_cache_dir = ETL_DIR / org_id / download_id diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index a42b275..21e8db0 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -63,6 +63,8 @@ def is_stale( def get_all( exclude_failed: Optional[bool] = False, only_active: Optional[bool] = False, + only_inactive: Optional[bool] = False, + created_at_until: Optional[datetime.datetime] = None, ) -> List[EtlTask]: query = session.query(EtlTask) if exclude_failed: @@ -71,6 +73,10 @@ def get_all( ) if only_active: query = query.filter(EtlTask.is_active == True) + if only_inactive: + query = query.filter(EtlTask.is_active == False) + if created_at_until: + query = query.filter(EtlTask.created_at <= created_at_until) return query.order_by(EtlTask.created_at.desc()).all() @@ -121,7 +127,7 @@ def get_all_enriched( {mf_join} JOIN ( SELECT id, dataset_id FROM cognition.{enums.Tablenames.MARKDOWN_FILE.value} - ) mf ON et.meta_data->>'markdown_file_id' = mf.id::TEXT + ) mf ON et.id = mf.etl_task_id LEFT JOIN( SELECT id, json_array_elements(useable_etl_configurations) config_ids FROM cognition.{enums.Tablenames.MARKDOWN_DATASET.value} diff --git a/models.py b/models.py index a31f738..38bda5c 100644 --- a/models.py +++ b/models.py @@ -1602,7 +1602,7 @@ class CognitionMarkdownFile(Base): etl_task_id = Column( UUID(as_uuid=True), - ForeignKey(f"global.{Tablenames.ETL_TASK.value}.id", ondelete="CASCADE"), + ForeignKey(f"global.{Tablenames.ETL_TASK.value}.id", ondelete="SET NULL"), index=True, ) From 4c97c8d3a89384f0130004fb44562e43721cafcd Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Mon, 15 Dec 2025 10:04:00 +0100 Subject: [PATCH 15/16] fix: get all enriched etl tasks join criteria --- global_objects/etl_task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index 21e8db0..95a6410 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -125,7 +125,7 @@ def get_all_enriched( et.meta_data->>'file_reference_id' AS file_reference_id FROM global.{enums.Tablenames.ETL_TASK.value} et {mf_join} JOIN ( - SELECT id, dataset_id + SELECT id, etl_task_id, dataset_id FROM cognition.{enums.Tablenames.MARKDOWN_FILE.value} ) mf ON et.id = mf.etl_task_id LEFT JOIN( From d097dd260644bb382375a00763faa7ff800e1ca3 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Mon, 15 Dec 2025 11:16:32 +0100 Subject: [PATCH 16/16] perf: adds is_valid_extension check for etl task --- global_objects/etl_task.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/global_objects/etl_task.py b/global_objects/etl_task.py index 95a6410..8eb5f22 100644 --- a/global_objects/etl_task.py +++ b/global_objects/etl_task.py @@ -2,6 +2,7 @@ from sqlalchemy.sql.expression import cast from sqlalchemy.orm.attributes import flag_modified from sqlalchemy.dialects.postgresql import UUID +from pathlib import Path import datetime import mimetypes @@ -33,6 +34,15 @@ def get_by_id(id: str) -> EtlTask: return session.query(EtlTask).filter(EtlTask.id == id).first() +def is_valid_extension(file_name: str): + supported_extensions = get_supported_file_extensions() + file_extension = Path(file_name).suffix.lower().strip() + + return file_extension in [ + ext for exts in supported_extensions.values() for ext in exts + ] + + def is_stale( etl_task_id: Optional[str] = None, etl_task: Optional[Dict[str, Any]] = None,