From c570defbd29fb5fc5218db14cb805696dc87b1ea Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Thu, 11 Jun 2026 21:53:04 +0200 Subject: [PATCH] Revert "Fix upstream map index resolution after placeholder expansion (#59691)" This reverts commit a5ffa6c7949e07b13fd410256b99e77621d53b7b. --- .../src/airflow/models/taskinstance.py | 56 +-------- .../tests/unit/models/test_taskinstance.py | 109 ------------------ 2 files changed, 1 insertion(+), 164 deletions(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index aeff2695f1bf0..740596f9d69b6 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -2421,19 +2421,7 @@ def tg2(inp): # and "ti_count == ancestor_ti_count" does not work, since the further # expansion may be of length 1. if not _is_further_mapped_inside(relative, common_ancestor): - # During mapped task group expansion, upstream placeholder task instances - # (map_index = -1) may already have been replaced by their first expanded - # successor (map_index = 0) while downstream task instances are still - # unexpanded and continue resolving dependencies against the placeholder index. - resolved_map_index = ( - 0 - if _should_use_post_expansion_placeholder( - task=task, relative=relative, map_index=ancestor_map_index, run_id=run_id, session=session - ) - else ancestor_map_index - ) - - return resolved_map_index + return ancestor_map_index # Otherwise we need a partial aggregation for values from selected task # instances in the ancestor's expansion context. @@ -2507,48 +2495,6 @@ def _visit_relevant_relatives_for_mapped(mapped_tasks: Iterable[tuple[str, int]] return visited -def _should_use_post_expansion_placeholder( - *, - task: Operator, - relative: Operator, - map_index: int, - run_id: str, - session: Session, -) -> bool: - """ - Determine whether upstream dependency resolution should use map_index = 0. - - Returns True when the upstream placeholder task instance - (map_index = -1) has already been replaced by its post-expansion - successor (map_index = 0). - """ - if map_index != -1: - return False - - rows = session.execute( - select(TaskInstance.task_id, TaskInstance.map_index).where( - TaskInstance.dag_id == relative.dag_id, - TaskInstance.run_id == run_id, - TaskInstance.task_id.in_([task.task_id, relative.task_id]), - TaskInstance.map_index.in_([-1, 0]), - ) - ).all() - - task_to_map_indexes: dict[str, set[int]] = defaultdict(set) - for task_id, mi in rows: - task_to_map_indexes[task_id].add(mi) - - # We only rewrite when: - # 1) the current task is still using the placeholder (-1) - # 2) the upstream placeholder (-1) no longer exists - # 3) the post-expansion placeholder (0) does exist - return ( - -1 in task_to_map_indexes[task.task_id] - and -1 not in task_to_map_indexes[relative.task_id] - and 0 in task_to_map_indexes[relative.task_id] - ) - - class TaskInstanceNote(Base): """For storage of arbitrary notes concerning the task instance.""" diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 987768da12492..b9d8c85f61321 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -3523,115 +3523,6 @@ def g(v): assert result == expected -def test_downstream_placeholder_handles_upstream_post_expansion(dag_maker, session): - """ - Test dynamic task mapping behavior when an upstream placeholder task - (map_index = -1) has been replaced by the first expanded task - (map_index = 0). - - This verifies that downstream mapped dependency resolution: - - preserves placeholder behavior before upstream expansion - - correctly resolves the post-expansion transition - - preserves normal expanded task behavior afterwards - """ - - with dag_maker(session=session) as dag: - - @task - def get_mapping_source(): - return ["one", "two", "three"] - - @task - def mapped_task(x): - output = f"{x}" - return output - - @task_group(prefix_group_id=False) - def the_task_group(x): - start = MockOperator(task_id="start") - upstream = mapped_task(x) - - # Downstream task inside the task group that does not directly - # consume the expand input, but is still mapped via the mapped - # task group context. - downstream = MockOperator(task_id="downstream") - - start >> upstream >> downstream - - mapping_source = get_mapping_source() - mapped_tg = the_task_group.expand(x=mapping_source) - - mapping_source >> mapped_tg - - # Create DAG run and execute prerequisites. - dr = dag_maker.create_dagrun() - - dag_maker.run_ti("get_mapping_source", map_index=-1, dag_run=dr, session=session) - - upstream_task = dag.get_task("mapped_task") - downstream_task = dag.get_task("downstream") - - # Before upstream expansion occurs, mapped dependency resolution - # should retain the existing placeholder semantics since no concrete - # upstream/downstream map index pairing exists yet. - downstream_ti = dr.get_task_instance(task_id="downstream", map_index=-1, session=session) - downstream_ti.refresh_from_task(downstream_task) - - result = downstream_ti.get_relevant_upstream_map_indexes( - upstream=upstream_task, - ti_count=1, - session=session, - ) - - assert result == -3 - - # Force expansion of the upstream mapped task. - upstream_task = dag.get_task("mapped_task") - _, max_index = TaskMap.expand_mapped_task( - upstream_task, - dr.run_id, - session=session, - ) - upstream_expanded_ti_count = max_index + 1 - - downstream_task = dag.get_task("downstream") - - # Grab the downstream placeholder TI. - downstream_ti = dr.get_task_instance(task_id="downstream", map_index=-1, session=session) - downstream_ti.refresh_from_task(downstream_task) - - result = downstream_ti.get_relevant_upstream_map_indexes( - upstream=upstream_task, - ti_count=upstream_expanded_ti_count, - session=session, - ) - - assert result == 0 - - # Now do the same for downstream expanded (map_index = 0) to ensure existing behavior is not broken. - # Force expansion of the downstream mapped task. - _, max_index = TaskMap.expand_mapped_task( - downstream_task, - dr.run_id, - session=session, - ) - downstream_expanded_ti_count = max_index + 1 - - # Grab the first expanded downstream task instance (map_index = 0). - downstream_ti = dr.get_task_instance(task_id="downstream", map_index=0, session=session) - downstream_ti.refresh_from_task(downstream_task) - - result = downstream_ti.get_relevant_upstream_map_indexes( - upstream=upstream_task, - ti_count=downstream_expanded_ti_count, - session=session, - ) - - # Verify behavior remains unchanged once the downstream task - # itself has expanded. - assert result == 0 - - def test_find_relevant_relatives_with_non_mapped_task_as_tuple(dag_maker, session): """Test that specifying a non-mapped task as a tuple doesn't raise NotMapped exception.""" # t1 -> t2 (non-mapped) -> t3