Skip to content

Commit 4f5e0b1

Browse files
ETL Postrelease Improvements (#207)
* fix: add backwards compatibility for integration progress
* chore: remove stale comments
* perf: update dataset
* perf: full_config_hash
* perf(etl): is stale check
* perf: new etl attrs
* perf: etl task new attrs
* perf: create/update new etl attrs
* fix: minor improvements
* chore: conflict resolution
* fix: is_stale
* perf: fetch parsing scope
* perf: only fetch inactive enriched datasets for stale config check
* perf: SET NULL on md_file etl_task deletion
* fix: get all enriched etl tasks join criteria
* perf: adds is_valid_extension check for etl task
1 parent 67d2713 commit 4f5e0b1

File tree

6 files changed

+190
-44
lines changed

6 files changed

+190
-44
lines changed

cognition_objects/integration.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,15 +185,27 @@ def get_integration_progress(
185185
) -> float:
186186
integration = get_by_id(integration_id)
187187
count_all_records = integration_records_bo.count(integration)
188-
all_tasks = get_all_etl_tasks(integration_id)
189-
finished_tasks = [task for task in all_tasks if task.state in FINISHED_STATES]
190188

191189
if (
192190
count_all_records == 0
193191
or integration.state == enums.CognitionMarkdownFileState.FAILED.value
194192
):
195193
return 0.0
196-
integration_progress = round((len(finished_tasks) / count_all_records) * 100.0, 2)
194+
195+
all_tasks = get_all_etl_tasks(integration_id)
196+
finished_tasks = [task for task in all_tasks if task.state in FINISHED_STATES]
197+
count_finished_tasks = len(finished_tasks)
198+
199+
# backward compatibility
200+
if not all_tasks or len(all_tasks) != count_all_records:
201+
all_records, _ = integration_records_bo.get_all_by_integration_id(
202+
integration_id
203+
)
204+
count_finished_tasks += len(
205+
[record for record in all_records if not record.etl_task_id]
206+
)
207+
208+
integration_progress = round((count_finished_tasks / count_all_records) * 100.0, 2)
197209
if integration.state not in FINISHED_STATES:
198210
integration_progress = min(integration_progress - 1, 0)
199211
return integration_progress

cognition_objects/markdown_dataset.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def __get_enriched_query(
3434
category_origin: Optional[str] = None,
3535
query_add: Optional[str] = "",
3636
) -> str:
37-
where_add = " AND (ed.config_ids->>'isDefault')::bool is true"
37+
where_add = ""
3838
if id:
3939
id = prevent_sql_injection(id, isinstance(id, str))
4040
where_add += f" AND md.id = '{id}'"
@@ -46,7 +46,8 @@ def __get_enriched_query(
4646
md.*,
4747
COALESCE(mf.num_files, 0) AS num_files,
4848
COALESCE(mf.num_reviewed_files, 0) AS num_reviewed_files,
49-
ecp.etl_config
49+
ecp.etl_config,
50+
ecp.id as etl_config_id
5051
FROM cognition.{Tablenames.MARKDOWN_DATASET.value} md
5152
LEFT JOIN (
5253
SELECT dataset_id, COUNT(*) as num_files, COUNT(CASE WHEN is_reviewed = TRUE THEN 1 END) AS num_reviewed_files
@@ -56,7 +57,7 @@ def __get_enriched_query(
5657
LEFT JOIN(
5758
SELECT md.id, json_array_elements(md.useable_etl_configurations) config_ids
5859
FROM cognition.{Tablenames.MARKDOWN_DATASET.value} md
59-
) ed ON ed.id = md.id
60+
) ed ON ed.id = md.id AND (ed.config_ids->>'isDefault')::bool is true
6061
LEFT JOIN(
6162
SELECT ecp.id, ecp.etl_config
6263
FROM cognition.{Tablenames.ETL_CONFIG_PRESET.value} ecp
@@ -177,6 +178,7 @@ def update(
177178
dataset_id: str,
178179
name: Optional[str] = None,
179180
description: Optional[str] = None,
181+
useable_etl_configurations: Optional[List[Dict[str, Any]]] = None,
180182
with_commit: bool = True,
181183
) -> CognitionMarkdownDataset:
182184
dataset = get(org_id, dataset_id)
@@ -187,6 +189,9 @@ def update(
187189
if description:
188190
dataset.description = description
189191

192+
if useable_etl_configurations:
193+
dataset.useable_etl_configurations = useable_etl_configurations
194+
190195
general.flush_or_commit(with_commit)
191196

192197
return dataset

cognition_objects/markdown_file.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,20 @@ def __get_enriched_query(
9393
"etl_task",
9494
"global",
9595
prefix=et_prefix,
96-
include_columns=["is_active", "error_message"],
96+
include_columns=[
97+
"started_at",
98+
"finished_at",
99+
"is_active",
100+
"is_stale",
101+
"llm_ops",
102+
"error_message",
103+
],
97104
)
98105

99106
query = f"""SELECT
100-
{mf_select}, {et_select}, LENGTH({mf_prefix}.content) as content_length,
101-
COALESCE({et_prefix}.state, {mf_prefix}.state) state,
102-
COALESCE({et_prefix}.started_at, {mf_prefix}.started_at) started_at,
103-
COALESCE({et_prefix}.finished_at, {mf_prefix}.finished_at) finished_at
107+
{mf_select}, {et_select}, LENGTH({mf_prefix}.content) AS content_length,
108+
COALESCE({et_prefix}.state, {mf_prefix}.state) AS state,
109+
{et_prefix}.meta_data->>'scope_readable' AS scope_readable
104110
FROM cognition.markdown_file {mf_prefix}
105111
LEFT JOIN global.etl_task {et_prefix} ON {mf_prefix}.etl_task_id = {et_prefix}.id
106112
"""

etl_utils.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -301,17 +301,14 @@ def rm_tree(path: Path):
301301
if item.is_dir():
302302
rm_tree(item)
303303
else:
304-
item.unlink()
304+
item.unlink(missing_ok=True)
305305
path.rmdir()
306306

307307
etl_cache_dir = ETL_DIR / org_id / download_id
308308
if etl_cache_dir.exists() and etl_cache_dir.is_dir():
309309
rm_tree(etl_cache_dir)
310310

311311

312-
# TODO: delete_etl_tasks for related file_reference_id
313-
314-
315312
def get_download_key(org_id: str, download_id: str) -> Path:
316313
return Path(org_id) / download_id / "download"
317314

0 commit comments

Comments
 (0)