From 5331d3d34254e4fea85767fc9211d25ca7df9f48 Mon Sep 17 00:00:00 2001 From: Sam Minot Date: Fri, 17 Apr 2026 16:08:30 -0500 Subject: [PATCH 1/6] Move over debug workflow branch (#202) --- cirro/cli/__init__.py | 3 +- cirro/cli/cli.py | 18 +- cirro/cli/controller.py | 403 +++++++++++++++++++++- cirro/cli/models.py | 14 + cirro/models/file.py | 13 + cirro/sdk/dataset.py | 156 ++++++++- cirro/sdk/nextflow_utils.py | 67 ++++ cirro/sdk/task.py | 566 +++++++++++++++++++++++++++++++ cirro/services/execution.py | 4 + cirro/services/file.py | 23 ++ tests/test_dataset_tasks.py | 159 +++++++++ tests/test_nextflow_utils.py | 143 ++++++++ tests/test_preprocess.py | 1 + tests/test_process_definition.py | 5 + tests/test_task.py | 380 +++++++++++++++++++++ 15 files changed, 1946 insertions(+), 9 deletions(-) create mode 100644 cirro/sdk/nextflow_utils.py create mode 100644 cirro/sdk/task.py create mode 100644 tests/test_dataset_tasks.py create mode 100644 tests/test_nextflow_utils.py create mode 100644 tests/test_task.py diff --git a/cirro/cli/__init__.py b/cirro/cli/__init__.py index 28f0cadd..6348d1f6 100644 --- a/cirro/cli/__init__.py +++ b/cirro/cli/__init__.py @@ -1,6 +1,6 @@ from cirro.cli.controller import run_ingest, run_download, run_configure, run_list_datasets from cirro.cli.controller import run_create_pipeline_config, run_validate_folder -from cirro.cli.controller import run_list_projects, run_list_files +from cirro.cli.controller import run_list_projects, run_list_files, run_debug __all__ = [ 'run_ingest', @@ -11,4 +11,5 @@ 'run_validate_folder', 'run_list_projects', 'run_list_files', + 'run_debug', ] diff --git a/cirro/cli/cli.py b/cirro/cli/cli.py index f6bf6c66..d612b593 100644 --- a/cirro/cli/cli.py +++ b/cirro/cli/cli.py @@ -6,7 +6,7 @@ from cirro.cli import run_create_pipeline_config, run_validate_folder from cirro.cli import run_ingest, run_download, run_configure, run_list_datasets -from cirro.cli.controller import handle_error, run_upload_reference, run_list_projects, run_list_files +from cirro.cli.controller import handle_error, run_upload_reference, run_list_projects, run_list_files, run_debug from cirro.cli.interactive.utils import InputError @@ -142,6 +142,22 @@ def upload_reference(**kwargs): run_upload_reference(kwargs, interactive=kwargs.get('interactive')) +# no_args_is_help=False: running 'cirro debug' with no arguments enters interactive mode +@run.command(help='Debug a failed workflow execution', no_args_is_help=False) +@click.option('--project', + help='Name or ID of the project', + default=None) +@click.option('--dataset', + help='Name or ID of the dataset', + default=None) +@click.option('-i', '--interactive', + help='Gather arguments interactively', + is_flag=True, default=False) +def debug(**kwargs): + check_required_args(kwargs) + run_debug(kwargs, interactive=kwargs.get('interactive')) + + @run.command(help='Configure authentication') def configure(): run_configure() diff --git a/cirro/cli/controller.py b/cirro/cli/controller.py index ab056114..e80ac9aa 100644 --- a/cirro/cli/controller.py +++ b/cirro/cli/controller.py @@ -3,25 +3,31 @@ import os import sys from pathlib import Path +from typing import List, Optional, Set +from cirro.sdk.task import DataPortalTask from cirro_api_client.v1.models import UploadDatasetRequest, Status, Executor from cirro.cirro_client import CirroApi from cirro.cli.interactive.auth_args import gather_auth_config +from cirro.cli.interactive.common_args import ask_project from cirro.cli.interactive.create_pipeline_config import gather_create_pipeline_config_arguments -from cirro.cli.interactive.download_args import gather_download_arguments, ask_dataset_files -from cirro.cli.interactive.download_args import gather_download_arguments_dataset +from cirro.cli.interactive.download_args import gather_download_arguments, ask_dataset_files, \ + ask_dataset, gather_download_arguments_dataset from cirro.cli.interactive.list_dataset_args import gather_list_arguments from cirro.cli.interactive.upload_args import gather_upload_arguments from cirro.cli.interactive.upload_reference_args import gather_reference_upload_arguments -from cirro.cli.interactive.utils import get_id_from_name, get_item_from_name_or_id, InputError, validate_files +from cirro.cli.interactive.utils import get_id_from_name, get_item_from_name_or_id, InputError, \ + validate_files, ask_yes_no, ask from cirro.cli.interactive.validate_args import gather_validate_arguments, gather_validate_arguments_dataset from cirro.cli.models import ListArguments, UploadArguments, DownloadArguments, CreatePipelineConfigArguments, \ - UploadReferenceArguments, ValidateArguments, ListFilesArguments + UploadReferenceArguments, ValidateArguments, ListFilesArguments, DebugArguments from cirro.config import UserConfig, save_user_config, load_user_config from cirro.file_utils import get_files_in_directory from cirro.models.process import PipelineDefinition, ConfigAppStatus, CONFIG_APP_URL +from cirro.sdk.dataset import DataPortalDataset from cirro.services.service_helpers import list_all_datasets +from cirro.utils import convert_size # Log to STDOUT log_formatter = logging.Formatter( @@ -324,6 +330,395 @@ def run_create_pipeline_config(input_params: CreatePipelineConfigArguments, inte f"{CONFIG_APP_URL}") +def run_debug(input_params: DebugArguments, interactive=False): + """ + Debug a failed workflow execution. + + Displays the execution log, identifies the primary failed task, and + shows its logs, inputs, and outputs. In interactive mode the user can + drill into the input chain to trace back the root cause. + """ + cirro = _init_cirro_client() + projects = _get_projects(cirro) + + if interactive: + project_name = ask_project(projects, input_params.get('project')) + input_params['project'] = get_id_from_name(projects, project_name) + datasets = list_all_datasets(project_id=input_params['project'], client=cirro) + datasets = [d for d in datasets if d.status != Status.RUNNING] + input_params['dataset'] = ask_dataset(datasets, input_params.get('dataset'), msg_action='debug') + else: + input_params['project'] = get_id_from_name(projects, input_params['project']) + datasets = cirro.datasets.list(input_params['project']) + input_params['dataset'] = get_id_from_name(datasets, input_params['dataset']) + dataset_obj = get_item_from_name_or_id(datasets, input_params['dataset']) + if dataset_obj and dataset_obj.status == Status.RUNNING: + raise InputError( + f"Dataset '{dataset_obj.name}' ({dataset_obj.id}) is currently RUNNING. " + "The debug command is only available for completed or failed datasets." + ) + + project_id = input_params['project'] + dataset_id = input_params['dataset'] + + dataset_detail = cirro.datasets.get(project_id=project_id, dataset_id=dataset_id) + sdk_dataset = DataPortalDataset(dataset=dataset_detail, client=cirro) + + # --- Execution log --- + execution_log = sdk_dataset.logs + log_lines = execution_log.splitlines() + + print("\n=== Execution Log (last 50 lines) ===") + print('\n'.join(log_lines[-50:])) + + # Only search for a failed task when the dataset actually failed. + if sdk_dataset.status != Status.FAILED: + if interactive: + if log_lines and ask_yes_no('Show full execution log?'): + print(execution_log) + return + + # --- Primary failed task --- + try: + if interactive: + print("\nSearching for the primary failed task (this may take a moment)...") + failed_task = sdk_dataset.primary_failed_task + except Exception as e: + print(f"\nCould not load task trace: {e}") + if interactive and log_lines and ask_yes_no('Show full execution log?'): + print(execution_log) + return + + if interactive: + if failed_task is None: + print("\nNo failed tasks found in this execution.") + if log_lines and ask_yes_no('Show full execution log?'): + print(execution_log) + return + + choices = [ + f"Show task info: {failed_task.name}", + "Show full execution log", + _DONE, + ] + while True: + choice = ask('select', 'Primary failed task found. What would you like to do?', choices=choices) + if choice.startswith("Show task info"): + _task_menu(failed_task, depth=0) + elif choice == "Show full execution log": + print(execution_log) + else: + break + else: + if failed_task is None: + print("\nNo failed tasks found in this execution.") + return + + _print_task_debug_recursive( + failed_task, + max_depth=input_params.get('max_depth'), + max_tasks=input_params.get('max_tasks'), + show_script=input_params.get('show_script', True), + show_log=input_params.get('show_log', True), + show_files=input_params.get('show_files', True), + ) + + +def _print_task_debug(task, depth: int = 0, + show_script: bool = True, + show_log: bool = True, + show_files: bool = True) -> None: + """Print all debug info for one task, indented according to its depth in the input chain.""" + indent = " " * depth + label = "Primary Failed Task" if depth == 0 else f"Source Task [depth {depth}]" + _print_task_header(task, indent, label) + + if show_script: + task_script = task.script + print(f"\n{indent}--- Task Script ---") + print('\n'.join(indent + line for line in (task_script or "(empty)").splitlines())) + + if show_log: + task_log = task.logs + print(f"\n{indent}--- Task Log ---") + print('\n'.join(indent + line for line in (task_log or "(empty)").splitlines())) + + if show_files: + inputs = task.inputs + print(f"\n{indent}--- Inputs ({len(inputs)}) ---") + for f in inputs: + source = f"from task: {f.source_task.name}" if f.source_task else "staged input" + try: + size_str = convert_size(f.size) + except Exception: + size_str = "unknown size" + print(f"{indent} {f.name} ({size_str}) [{source}]") + + outputs = task.outputs + print(f"\n{indent}--- Outputs ({len(outputs)}) ---") + for f in outputs: + try: + size_str = convert_size(f.size) + except Exception: + size_str = "unknown size" + print(f"{indent} {f.name} ({size_str})") + + +def _print_task_debug_recursive( + task, + max_depth: Optional[int], + max_tasks: Optional[int], + show_script: bool = True, + show_log: bool = True, + show_files: bool = True, + _depth: int = 0, + _seen: Optional[Set[str]] = None, + _counter: Optional[List[int]] = None +) -> None: + """ + Print debug info for a task and then recurse into the tasks that created + each of its input files. + + Deduplicates tasks (a task that produced multiple inputs is only printed + once). Stops early when ``max_depth`` or ``max_tasks`` is reached and + prints a notice so the user knows output was capped. + """ + if _seen is None: + _seen = set() + if _counter is None: + _counter = [0] + + if task.name in _seen: + return + + if max_tasks is not None and _counter[0] >= max_tasks: + indent = " " * _depth + print(f"\n{indent}[max-tasks limit reached — stopping recursion]") + return + + _seen.add(task.name) + _counter[0] += 1 + + _print_task_debug(task, depth=_depth, + show_script=show_script, + show_log=show_log, + show_files=show_files) + + if max_depth is not None and _depth >= max_depth: + source_tasks = [ + f.source_task for f in task.inputs + if f.source_task and f.source_task.name not in _seen + ] + if source_tasks: + indent = " " * (_depth + 1) + names = ', '.join(t.name for t in source_tasks) + print(f"\n{indent}[max-depth limit reached — not expanding: {names}]") + return + + for f in task.inputs: + if f.source_task and f.source_task.name not in _seen: + _print_task_debug_recursive( + f.source_task, max_depth, max_tasks, + show_script=show_script, + show_log=show_log, + show_files=show_files, + _depth=_depth + 1, _seen=_seen, _counter=_counter + ) + + +_BACK = "Back" +_DONE = "Done" +# Binary formats that cannot be meaningfully displayed as text +_BINARY_EXTENSIONS = {'.bam', '.cram', '.bai', '.crai', '.bcf', '.idx'} + + +def _print_task_header(task: DataPortalTask, indent: str, label: str) -> None: + print(f"\n{indent}=== {label} ===") + print(f"{indent}Name: {task.name}") + print(f"{indent}Status: {task.status}") + print(f"{indent}Exit Code: {task.exit_code}") + print(f"{indent}Hash: {task.hash}") + print(f"{indent}Work Dir: {task.work_dir}") + + +def _task_menu(task: DataPortalTask, depth: int = 0) -> None: + """ + Menu-driven exploration of a single task. + + The user can show the script/log, browse inputs and outputs, and drill + into any source task that produced an input file. The menu loops until + the user selects Back / Done. + """ + indent = " " * depth + label = "Primary Failed Task" if depth == 0 else "Source Task" + _print_task_header(task, indent, label) + + inputs = task.inputs + outputs = task.outputs + + while True: + choices = [ + "Show task script", + "Show task log", + f"Browse inputs ({len(inputs)})", + f"Browse outputs ({len(outputs)})", + _DONE if depth == 0 else _BACK, + ] + choice = ask('select', 'What would you like to do?', choices=choices) + + if choice == "Show task script": + content = task.script + print(f"\n{indent}--- Task Script ---") + print(content if content else "(empty)") + + elif choice == "Show task log": + content = task.logs + print(f"\n{indent}--- Task Log ---") + print(content if content else "(empty)") + + elif choice.startswith("Browse inputs"): + _browse_files_menu(inputs, "input", depth) + + elif choice.startswith("Browse outputs"): + _browse_files_menu(outputs, "output", depth) + + else: # Done / Back + break + + +def _browse_files_menu(files, kind: str, depth: int) -> None: + """ + Let the user pick a file from a list, then enter its file menu. + + ``kind`` is ``'input'`` or ``'output'``, used only for the prompt label. + When there is only one file the selection step is skipped and the file + menu opens immediately. + """ + indent = " " * depth + if not files: + print(f"\n{indent}No {kind} files available.") + return + + if len(files) == 1: + _file_menu(files[0], depth) + return + + # Build display labels — disambiguate duplicates by appending a counter + seen: dict = {} + labels = [] + for f in files: + seen[f.name] = seen.get(f.name, 0) + 1 + counts: dict = {} + for f in files: + if seen[f.name] > 1: + counts[f.name] = counts.get(f.name, 0) + 1 + label = f"{f.name} [{counts[f.name]}]" + else: + label = f.name + source = f"from task: {f.source_task.name}" if f.source_task else "staged input" + try: + size_str = convert_size(f.size) + except Exception: + size_str = "unknown size" + labels.append(f"{label} ({size_str}) [{source}]") + + choices = labels + [_BACK] + + while True: + choice = ask('select', f'Select a {kind} file to inspect', choices=choices) + if choice == _BACK: + break + + idx = labels.index(choice) + _file_menu(files[idx], depth) + + +def _file_read_options(name: str): + """Return the list of read-action strings appropriate for a given filename.""" + lower = name.lower() + # Strip compression suffix to check underlying type + for ext in ('.gz', '.bz2', '.zst'): + if lower.endswith(ext): + lower = lower[:-len(ext)] + break + + suffix = Path(lower).suffix + + if suffix in _BINARY_EXTENSIONS: + return [] # no readable options for binary formats + + options = [] + if suffix in ('.csv', '.tsv'): + options.append("Read as CSV (first 10 rows)") + if suffix == '.json': + options.append("Read as JSON") + options.append("Read as text (first 100 lines)") + return options + + +def _file_menu(wf, depth: int) -> None: + """Menu for inspecting a single WorkDirFile: read contents or drill into source task.""" + indent = " " * depth + source = f"from task: {wf.source_task.name}" if wf.source_task else "staged input" + try: + size_str = convert_size(wf.size) + except Exception: + size_str = "unknown size" + print(f"\n{indent}File: {wf.name} ({size_str}) [{source}]") + + read_options = _file_read_options(wf.name) + if not read_options and not wf.source_task: + print(f"{indent}(binary file — no readable options)") + return + + choices = list(read_options) + if wf.source_task: + choices.append(f"Drill into source task: {wf.source_task.name}") + choices.append(_BACK) + + while True: + choice = ask('select', f'What would you like to do with {wf.name!r}?', + choices=choices) + + if choice == _BACK: + break + + elif choice.startswith("Read as CSV"): + try: + df = wf.read_csv() + print(df.head(10).to_string()) + except Exception as e: + print(f"Could not read as CSV: {e}") + + elif choice.startswith("Read as JSON"): + try: + data = wf.read_json() + output = json.dumps(data, indent=2) + # Cap output at ~200 lines so the terminal isn't flooded + lines = output.splitlines() + if len(lines) > 200: + print('\n'.join(lines[:200])) + print(f"... ({len(lines) - 200} more lines)") + else: + print(output) + except Exception as e: + print(f"Could not read as JSON: {e}") + + elif choice.startswith("Read as text"): + try: + lines = wf.readlines() + if len(lines) > 100: + print('\n'.join(lines[:100])) + print(f"... ({len(lines) - 100} more lines)") + else: + print('\n'.join(lines)) + except Exception as e: + print(f"Could not read as text: {e}") + + elif choice.startswith("Drill into source task"): + _task_menu(wf.source_task, depth=depth + 1) + + def _init_cirro_client(): _check_configure() cirro = CirroApi(user_agent="Cirro CLI") diff --git a/cirro/cli/models.py b/cirro/cli/models.py index 1acc4690..a162f415 100644 --- a/cirro/cli/models.py +++ b/cirro/cli/models.py @@ -54,3 +54,17 @@ class ListFilesArguments(TypedDict): dataset: str interactive: bool file_limit: int + + +class _DebugArgumentsBase(TypedDict): + project: str + dataset: str + interactive: bool + + +class DebugArguments(_DebugArgumentsBase, total=False): + max_depth: Optional[int] + max_tasks: Optional[int] + show_script: bool + show_log: bool + show_files: bool diff --git a/cirro/models/file.py b/cirro/models/file.py index cdd2f12e..fb0d18b1 100644 --- a/cirro/models/file.py +++ b/cirro/models/file.py @@ -68,6 +68,19 @@ def upload_dataset(cls, project_id: str, dataset_id: str, base_url: str, token_l project_id=project_id ) + @classmethod + def scratch_download(cls, project_id: str, dataset_id: str, base_url: str, token_lifetime_override: int = None): + """Create an access context for reading files from the Nextflow scratch bucket.""" + return cls( + file_access_request=ProjectFileAccessRequest( + access_type=ProjectAccessType.READ_SCRATCH, + dataset_id=dataset_id, + token_lifetime_hours=token_lifetime_override + ), + base_url=base_url, + project_id=project_id + ) + @classmethod def upload_reference(cls, project_id: str, base_url: str): return cls( diff --git a/cirro/sdk/dataset.py b/cirro/sdk/dataset.py index 34aea03a..1084970b 100644 --- a/cirro/sdk/dataset.py +++ b/cirro/sdk/dataset.py @@ -1,11 +1,17 @@ +import csv import datetime +from functools import cached_property import re +from io import StringIO from pathlib import Path -from typing import Union, List, Optional, Any +from typing import Union, List, Optional, Any, TYPE_CHECKING + +if TYPE_CHECKING: + from cirro.sdk.task import DataPortalTask from cirro_api_client.v1.api.processes import validate_file_requirements -from cirro_api_client.v1.models import Dataset, DatasetDetail, RunAnalysisRequest, ProcessDetail, Status, \ - RunAnalysisRequestParams, Tag, ArtifactType, NamedItem, ValidateFileRequirementsRequest +from cirro_api_client.v1.models import Dataset, DatasetDetail, Executor, RunAnalysisRequest, ProcessDetail, \ + Status, RunAnalysisRequestParams, Tag, ArtifactType, NamedItem, ValidateFileRequirementsRequest from cirro.cirro_client import CirroApi from cirro.file_utils import bytes_to_human_readable, filter_files_by_pattern @@ -161,6 +167,14 @@ def process(self) -> ProcessDetail: """ return self._client.processes.get(self.process_id) + @cached_property + def executor(self) -> Executor: + """ + Executor type for the process that created this dataset + (e.g. ``Executor.NEXTFLOW``, ``Executor.CROMWELL``). + """ + return self.process.executor + @property def project_id(self) -> str: """ID of the project containing the dataset""" @@ -241,6 +255,142 @@ def created_at(self) -> datetime.datetime: """Timestamp of dataset creation""" return self._data.created_at + @cached_property + def logs(self) -> str: + """ + Return the top-level Nextflow execution log for this dataset. + + Fetches the log from CloudWatch via the Cirro API. Returns an empty + string if no log events are available (e.g. the job has not started + yet, or the dataset was not created by a Nextflow workflow). + + Returns: + str: Execution log text, or an empty string if unavailable. + """ + try: + return self._client.execution.get_execution_logs( + project_id=self.project_id, + dataset_id=self.id + ) + except Exception: + return '' + + @cached_property + def tasks(self) -> List['DataPortalTask']: + """ + List of tasks from the workflow execution. + + Task metadata and the parsing logic depend on the executor: + + - **Nextflow**: read from the ``WORKFLOW_TRACE`` TSV artifact. + - **Cromwell**: not yet implemented (raises ``NotImplementedError``). + + Input and output files for each task are fetched from S3 on demand. + + Returns: + `List[DataPortalTask]` + + Raises: + DataPortalInputError: If the required trace artifact is missing. + NotImplementedError: If task inspection is not yet implemented for + this executor. + """ + return self._load_tasks() + + def _load_tasks(self) -> List['DataPortalTask']: + """Dispatch task loading to the executor-specific implementation.""" + if self.executor == Executor.NEXTFLOW: + return self._load_tasks_nextflow() + elif self.executor == Executor.CROMWELL: + return self._load_tasks_cromwell() + else: + raise DataPortalInputError( + f"Task inspection is not supported for executor '{self.executor}'" + ) + + def _load_tasks_nextflow(self) -> List['DataPortalTask']: + """Load tasks from the Nextflow WORKFLOW_TRACE TSV artifact.""" + from cirro.sdk.task import DataPortalTask + + try: + trace_file = self.get_artifact(ArtifactType.WORKFLOW_TRACE) + except DataPortalAssetNotFound: + raise DataPortalInputError( + "WORKFLOW_TRACE artifact not found for this Nextflow dataset" + ) + + try: + content = trace_file.read() + except Exception as e: + raise DataPortalInputError( + f"Could not read the workflow trace artifact: {e}" + ) from e + + if not content.strip(): + return [] + + reader = csv.DictReader(StringIO(content), delimiter='\t') + + # Build all tasks with a shared reference list so each task can look up + # sibling tasks when resolving input source_task links. + all_tasks_ref: List = [] + tasks = [] + for row in reader: + task = DataPortalTask( + trace_row=row, + client=self._client, + project_id=self.project_id, + dataset_id=self.id, + all_tasks_ref=all_tasks_ref + ) + tasks.append(task) + + # Populate the shared list after all tasks are constructed so that + # lazy input resolution can see the complete set. + all_tasks_ref.extend(tasks) + return tasks + + def _load_tasks_cromwell(self) -> List['DataPortalTask']: + """Load tasks for a Cromwell workflow execution (not yet implemented).""" + raise NotImplementedError( + "Task inspection for Cromwell workflows is not yet implemented" + ) + + @property + def primary_failed_task(self) -> Optional['DataPortalTask']: + """ + Find the root-cause failed task in this workflow execution. + + Returns ``None`` gracefully in all non-error situations: + + - The executor is not Nextflow (currently only implemented for Nextflow). + - The dataset has no task trace (still queued or just started). + - The trace is empty (no tasks ran). + - No tasks have a ``FAILED`` status (the workflow succeeded or was + stopped before any task actually failed). + + The executor check is performed first to avoid an unnecessary API call + when the executor does not support task inspection. + + Returns: + `cirro.sdk.task.DataPortalTask`, or ``None`` if no failed task is found. + """ + from cirro.sdk.nextflow_utils import find_primary_failed_task + + if self.executor != Executor.NEXTFLOW: + return None + + try: + tasks = self.tasks + except (DataPortalInputError, NotImplementedError): + return None + + if not tasks: + return None + + execution_log = self.logs + return find_primary_failed_task(tasks, execution_log) + def _get_detail(self): if not isinstance(self._data, DatasetDetail): self._data = self._client.datasets.get(project_id=self.project_id, dataset_id=self.id) diff --git a/cirro/sdk/nextflow_utils.py b/cirro/sdk/nextflow_utils.py new file mode 100644 index 00000000..668d08df --- /dev/null +++ b/cirro/sdk/nextflow_utils.py @@ -0,0 +1,67 @@ +import re +from typing import List, Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from cirro.sdk.task import DataPortalTask + + +def parse_inputs_from_command_run(content: str) -> List[str]: + """ + Parse S3 source URIs from a Nextflow .command.run staging block. + + Nextflow stages inputs with lines like: + aws s3 cp --only-show-errors s3://bucket/path/file.bam ./file.bam + or without flags: + aws s3 cp s3://bucket/path/file.bam ./file.bam + + Returns the list of S3 URIs found. + """ + return re.findall(r'aws s3 cp(?:\s+--\S+)*\s+(s3://\S+)\s+\S', content) + + +def find_primary_failed_task( + tasks: List['DataPortalTask'], + execution_log: str +) -> Optional['DataPortalTask']: + """ + Identify the root-cause failed task in a Nextflow workflow execution. + + Strategy: + 1. Filter tasks where status == "FAILED" and exit_code is not None and != 0. + 2. If none, fall back to any task with status == "FAILED". + 3. Parse execution_log for "Error executing process > 'TASK_NAME'" to cross-reference + the task list (exact match first, then substring match). + 4. Fall back to the FAILED task with the lowest task_id (ran earliest). + + Returns None if no failed task is found. + """ + # Step 1: tasks that actually failed with a non-zero exit code + hard_failed = [ + t for t in tasks + if t.status == "FAILED" and t.exit_code is not None and t.exit_code != 0 + ] + + # Step 2: fall back to any FAILED task if the above is empty + candidate_pool = hard_failed if hard_failed else [t for t in tasks if t.status == "FAILED"] + + if not candidate_pool: + return None + + if len(candidate_pool) == 1: + return candidate_pool[0] + + # Step 3: try to cross-reference the execution log + log_match = re.search(r"Error executing process > '([^']+)'", execution_log) + if log_match: + log_task_name = log_match.group(1) + # Exact match first + for task in candidate_pool: + if task.name == log_task_name: + return task + # Partial match + for task in candidate_pool: + if log_task_name in task.name or task.name in log_task_name: + return task + + # Step 4: fall back to earliest failing task + return min(candidate_pool, key=lambda t: t.task_id) diff --git a/cirro/sdk/task.py b/cirro/sdk/task.py new file mode 100644 index 00000000..c409d77c --- /dev/null +++ b/cirro/sdk/task.py @@ -0,0 +1,566 @@ +import csv +from functools import cached_property +import gzip +import json +from io import BytesIO, StringIO +from pathlib import PurePath +import re +from typing import Any, List, Optional, TYPE_CHECKING + +from cirro.models.file import FileAccessContext +from cirro.models.s3_path import S3Path +from cirro.sdk.exceptions import DataPortalAssetNotFound +from cirro.sdk.nextflow_utils import parse_inputs_from_command_run + +if TYPE_CHECKING: + from cirro.cirro_client import CirroApi + from pandas import DataFrame + + +class WorkDirFile: + """ + A file that lives in a Nextflow work directory or a dataset staging area. + + Each WorkDirFile either originated from another task's work directory + (``source_task`` is set) or was a primary/staged input to the workflow + (``source_task`` is ``None``). + """ + + def __init__( + self, + s3_uri: str, + client: 'CirroApi', + project_id: str, + size: Optional[int] = None, + source_task: Optional['DataPortalTask'] = None, + dataset_id: str = '' + ): + """ + Obtained from a task's ``inputs`` or ``outputs`` property. + + ```python + for task in dataset.tasks: + for f in task.inputs: + print(f.name, f.source_task) + ``` + """ + self._s3_uri = s3_uri + self._client = client + self._project_id = project_id + self._dataset_id = dataset_id + self._size = size + self._source_task = source_task + self._s3_path = S3Path(s3_uri) + + @property + def source_task(self) -> Optional['DataPortalTask']: + """The task that produced this file, or ``None`` for staged/primary inputs.""" + return self._source_task + + @property + def name(self) -> str: + """Filename (last component of the S3 URI).""" + return PurePath(self._s3_uri).name + + @property + def size(self) -> int: + """File size in bytes (fetched lazily via head_object if not pre-populated).""" + if self._size is None: + try: + s3 = self._get_s3_client() + resp = s3.head_object(Bucket=self._s3_path.bucket, Key=self._s3_path.key) + self._size = resp['ContentLength'] + except Exception as e: + raise DataPortalAssetNotFound( + f"Could not determine size of {self.name!r} — " + f"the work directory may have been cleaned up: {e}" + ) from e + return self._size + + def _access_context(self) -> FileAccessContext: + """Return the appropriate FileAccessContext for this file's location.""" + if self._dataset_id: + return FileAccessContext.scratch_download( + project_id=self._project_id, + dataset_id=self._dataset_id, + base_url=self._s3_path.base + ) + return FileAccessContext.download( + project_id=self._project_id, + base_url=self._s3_path.base + ) + + def _get(self) -> bytes: + """Return the raw bytes of the file.""" + try: + return self._client.file.get_file_from_path(self._access_context(), self._s3_path.key) + except Exception as e: + raise DataPortalAssetNotFound( + f"Could not read {self.name!r} — " + f"the work directory may have been cleaned up: {e}" + ) from e + + def read(self, encoding: str = 'utf-8', compression: Optional[str] = None) -> str: + """ + Read the file contents as text. + + Args: + encoding (str): Character encoding (default 'utf-8'). + compression (str): ``'gzip'`` to decompress on the fly, or ``None`` + (default) to read as-is. + """ + raw = self._get() + if compression is None: + return raw.decode(encoding, errors='replace') + if compression == 'gzip': + with gzip.open(BytesIO(raw), 'rt', encoding=encoding) as fh: + return fh.read() + raise ValueError(f"Unsupported compression: {compression!r} (use 'gzip' or None)") + + def readlines(self, encoding: str = 'utf-8', compression: Optional[str] = None) -> List[str]: + """Read the file contents as a list of lines.""" + return self.read(encoding=encoding, compression=compression).splitlines() + + def read_json(self, encoding: str = 'utf-8') -> Any: + """ + Parse the file as JSON. + + Returns whatever the top-level JSON value is (dict, list, etc.). + """ + try: + return json.loads(self.read(encoding=encoding)) + except json.JSONDecodeError as e: + raise ValueError(f"Could not parse {self.name!r} as JSON: {e}") from e + + def read_csv(self, compression: str = 'infer', encoding: str = 'utf-8', + **kwargs) -> 'DataFrame': + """ + Parse the file as a Pandas DataFrame. + + The default separator is a comma; pass ``sep='\\t'`` for TSV files. + Compression is inferred from the file extension by default, but can be + overridden with ``compression='gzip'`` or ``compression=None``. + + All additional keyword arguments are forwarded to + ``pandas.read_csv``. + """ + try: + import pandas + except ImportError: + raise ImportError( + "pandas is required to read CSV files. " + "Install it with: pip install pandas" + ) + + if compression == 'infer': + name = self.name + if name.endswith('.gz'): + compression = dict(method='gzip') + elif name.endswith('.bz2'): + compression = dict(method='bz2') + elif name.endswith('.xz'): + compression = dict(method='xz') + elif name.endswith('.zst'): + compression = dict(method='zstd') + else: + compression = None + + raw = self._get() + if compression is not None: + handle = BytesIO(raw) + try: + return pandas.read_csv(handle, compression=compression, **kwargs) + finally: + handle.close() + else: + handle = StringIO(raw.decode(encoding)) + try: + return pandas.read_csv(handle, **kwargs) + finally: + handle.close() + + def _get_s3_client(self): + return self._client.file.get_aws_s3_client(self._access_context()) + + def __str__(self): + return self.name + + def __repr__(self): + return f'WorkDirFile(name={self.name!r})' + + +class DataPortalTask: + """ + Represents a single task from a Nextflow workflow execution. + + Task metadata (name, status, exit code, work directory, etc.) is read + from the workflow trace artifact. Log contents and input/output files are + fetched from the task's S3 work directory on demand. + """ + + def __init__( + self, + trace_row: dict, + client: 'CirroApi', + project_id: str, + dataset_id: str = '', + all_tasks_ref: Optional[list] = None + ): + """ + Obtained from a dataset's ``tasks`` property. + + ```python + for task in dataset.tasks: + print(task.name, task.status) + print(task.logs) + ``` + + Args: + trace_row (dict): A row from the Nextflow trace TSV, parsed as a dict. + client (CirroApi): Authenticated CirroApi client. + project_id (str): ID of the project that owns this dataset. + dataset_id (str): ID of the dataset (execution) that owns this task. + all_tasks_ref (list): A shared list that will contain all tasks once they + are all built. Used by ``inputs`` to resolve ``source_task``. + """ + self._trace = trace_row + self._client = client + self._project_id = project_id + self._dataset_id = dataset_id + self._all_tasks_ref: list = all_tasks_ref if all_tasks_ref is not None else [] + + # ------------------------------------------------------------------ # + # Trace-derived properties # + # ------------------------------------------------------------------ # + + @property + def task_id(self) -> int: + """Sequential task identifier from the trace.""" + try: + return int(self._trace.get('task_id', 0)) + except (ValueError, TypeError): + return 0 + + @property + def name(self) -> str: + """Full task name, e.g. ``NFCORE_RNASEQ:RNASEQ:TRIMGALORE (sample1)``.""" + return self._trace.get('name', '') + + @property + def status(self) -> str: + """Task status string from the trace, e.g. ``COMPLETED``, ``FAILED``, ``ABORTED``.""" + return self._trace.get('status', '') + + @property + def hash(self) -> str: + """Short hash prefix used by Nextflow, e.g. ``99/b42c07``.""" + return self._trace.get('hash', '') + + @property + def work_dir(self) -> str: + """Full S3 URI of the task's work directory.""" + return self._trace.get('workdir', '') + + @property + def native_id(self) -> str: + """Native job ID on the underlying executor (e.g. AWS Batch job ID).""" + return self._trace.get('native_id', '') + + @property + def exit_code(self) -> Optional[int]: + """Process exit code, or ``None`` if the task did not reach completion.""" + val = self._trace.get('exit', '') + if val in ('', None, '-'): + return None + try: + return int(val) + except (ValueError, TypeError): + return None + + # ------------------------------------------------------------------ # + # Work-directory file access # + # ------------------------------------------------------------------ # + + def _get_access_context(self) -> FileAccessContext: + if not self.work_dir: + raise DataPortalAssetNotFound( + f"Task {self.name!r} has no work directory recorded in the trace" + ) + s3_path = S3Path(self.work_dir) + if self._dataset_id: + return FileAccessContext.scratch_download( + project_id=self._project_id, + dataset_id=self._dataset_id, + base_url=s3_path.base + ) + return FileAccessContext.download( + project_id=self._project_id, + base_url=s3_path.base + ) + + def _read_work_file(self, filename: str) -> str: + """ + Read a file from the task's work directory. + + Returns an empty string if the work directory has been cleaned up or + the file does not exist. + """ + if not self.work_dir: + return '' + try: + s3_path = S3Path(self.work_dir) + key = f'{s3_path.key}/{filename}' + access_context = self._get_access_context() + return self._client.file.get_file_from_path( + access_context, key + ).decode('utf-8', errors='replace') + except Exception: + return '' + + @cached_property + def logs(self) -> str: + """ + Return the task log (combined stdout/stderr of the task process). + + Fetches via the Cirro execution API when a native job ID is available, + which works even when the S3 scratch bucket is not directly accessible. + Falls back to reading ``.command.log`` from the S3 work directory. + Returns an empty string if neither source can be read. + """ + if self._dataset_id and self.native_id: + try: + return self._client.execution.get_task_logs( + project_id=self._project_id, + dataset_id=self._dataset_id, + task_id=self.native_id + ) + except Exception: + pass + return self._read_work_file('.command.log') + + @cached_property + def script(self) -> str: + """ + Return the contents of ``.command.sh`` from the task's work directory. + + This is the actual shell script that Nextflow executed — the user's + pipeline code for this task. Falls back to parsing the script from the + ``WORKFLOW_LOGS`` artifact when the work directory is not accessible + (scratch bucket requires elevated permissions). + Returns an empty string if the script cannot be obtained. + """ + content = self._read_work_file('.command.sh') + if content: + return content + return self._script_from_workflow_log() + + def _script_from_workflow_log(self) -> str: + """ + Parse this task's shell script from the WORKFLOW_LOGS artifact. + + When a Nextflow task fails the head-node log includes a block: + + Error executing process > 'TASK_NAME' + ... + Command executed: +