@@ -1666,6 +1666,7 @@ def plan_builder(
16661666 # This ensures that no models outside the impacted sub-DAG(s) will be backfilled unexpectedly.
16671667 backfill_models = modified_model_names or None
16681668
1669+ plan_execution_time = execution_time or now ()
16691670 max_interval_end_per_model = None
16701671 default_start , default_end = None , None
16711672 if not run :
@@ -1680,17 +1681,31 @@ def plan_builder(
16801681 max_interval_end_per_model ,
16811682 backfill_models ,
16821683 modified_model_names ,
1683- execution_time or now () ,
1684+ plan_execution_time ,
16841685 )
16851686
1687+ if (
1688+ start
1689+ and default_end
1690+ and to_datetime (start , relative_base = to_datetime (plan_execution_time ))
1691+ > to_datetime (default_end )
1692+ ):
1693+ # If the requested start is newer than prod's latest interval end, fall back to execution time
1694+ # instead of forcing an invalid [start, default_end] range.
1695+ default_start , default_end = None , None
1696+
16861697 # Refresh snapshot intervals to ensure that they are up to date with values reflected in the max_interval_end_per_model.
16871698 self .state_sync .refresh_snapshot_intervals (context_diff .snapshots .values ())
1699+ max_interval_end_per_model = self ._filter_stale_end_overrides (
1700+ max_interval_end_per_model ,
1701+ context_diff .snapshots_by_name ,
1702+ )
16881703
16891704 start_override_per_model = self ._calculate_start_override_per_model (
16901705 min_intervals ,
16911706 start or default_start ,
16921707 end or default_end ,
1693- execution_time or now () ,
1708+ plan_execution_time ,
16941709 backfill_models ,
16951710 snapshots ,
16961711 max_interval_end_per_model ,
@@ -3181,6 +3196,20 @@ def _get_max_interval_end_per_model(
31813196 ).items ()
31823197 }
31833198
3199+ @staticmethod
3200+ def _filter_stale_end_overrides (
3201+ max_interval_end_per_model : t .Dict [str , datetime ],
3202+ snapshots_by_name : t .Dict [str , Snapshot ],
3203+ ) -> t .Dict [str , datetime ]:
3204+ # Drop stale interval ends for snapshots whose new versions have no intervals yet. Otherwise the old
3205+ # prod end is reused as an end_override, causing missing_intervals() to skip the new snapshot entirely
3206+ # when the requested start is newer than that stale end.
3207+ return {
3208+ model_fqn : end
3209+ for model_fqn , end in max_interval_end_per_model .items ()
3210+ if model_fqn not in snapshots_by_name or snapshots_by_name [model_fqn ].intervals
3211+ }
3212+
31843213 @staticmethod
31853214 def _get_models_for_interval_end (
31863215 snapshots : t .Dict [str , Snapshot ], backfill_models : t .Set [str ]
0 commit comments