From bb4a0448b78b2e29adda3d99a0e2eacc3c34ec54 Mon Sep 17 00:00:00 2001 From: Rohit Nair P Date: Mon, 30 Mar 2026 23:30:33 +0530 Subject: [PATCH 1/5] Improve CLI import command performance by avoiding N+1 commits --- airflow-core/newsfragments/import_cli.improvement.rst | 1 + airflow-core/src/airflow/cli/commands/connection_command.py | 1 - airflow-core/src/airflow/cli/commands/variable_command.py | 6 +++--- 3 files changed, 4 insertions(+), 4 deletions(-) create mode 100644 airflow-core/newsfragments/import_cli.improvement.rst diff --git a/airflow-core/newsfragments/import_cli.improvement.rst b/airflow-core/newsfragments/import_cli.improvement.rst new file mode 100644 index 0000000000000..813edafba9413 --- /dev/null +++ b/airflow-core/newsfragments/import_cli.improvement.rst @@ -0,0 +1 @@ +Improve CLI import command performance by avoiding N+1 commits diff --git a/airflow-core/src/airflow/cli/commands/connection_command.py b/airflow-core/src/airflow/cli/commands/connection_command.py index 8911bc80b99e2..c858e80e90c8c 100644 --- a/airflow-core/src/airflow/cli/commands/connection_command.py +++ b/airflow-core/src/airflow/cli/commands/connection_command.py @@ -429,7 +429,6 @@ def _import_helper(file_path: str, overwrite: bool) -> None: conn.id = existing_conn_id session.merge(conn) - session.commit() print(f"Imported connection {conn_id}") diff --git a/airflow-core/src/airflow/cli/commands/variable_command.py b/airflow-core/src/airflow/cli/commands/variable_command.py index 85166cb6e79d3..e929693142e2e 100644 --- a/airflow-core/src/airflow/cli/commands/variable_command.py +++ b/airflow-core/src/airflow/cli/commands/variable_command.py @@ -36,7 +36,7 @@ from airflow.utils import cli as cli_utils from airflow.utils.cli import suppress_logs_and_warning from airflow.utils.providers_configuration_loader import providers_configuration_loaded -from airflow.utils.session import create_session, provide_session +from airflow.utils.session import NEW_SESSION, create_session, provide_session class VariableDisplayMapper: @@ -122,7 +122,7 @@ def variables_delete(args): @cli_utils.action_cli @providers_configuration_loaded @provide_session -def variables_import(args, session): +def variables_import(args, *, session=NEW_SESSION): """Import variables from a given file.""" if not os.path.exists(args.file): raise SystemExit("Missing variables file.") @@ -153,7 +153,7 @@ def variables_import(args, session): description = None if isinstance(v, dict) and v.get("value"): # verify that var configuration has value value, description = v["value"], v.get("description") - Variable.set(k, value, description, serialize_json=not isinstance(value, str)) + Variable.set(k, value, description, serialize_json=not isinstance(value, str), session=session) except Exception as e: print(f"Variable import failed: {e!r}") fail_count += 1 From 444c52e87c864ad9b2239c1161910e811774cefe Mon Sep 17 00:00:00 2001 From: Rohit Nair P Date: Tue, 31 Mar 2026 01:24:45 +0530 Subject: [PATCH 2/5] Rename newsfragment file to match PR 64498 --- .../{import_cli.improvement.rst => 64498.improvement.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename airflow-core/newsfragments/{import_cli.improvement.rst => 64498.improvement.rst} (100%) diff --git a/airflow-core/newsfragments/import_cli.improvement.rst b/airflow-core/newsfragments/64498.improvement.rst similarity index 100% rename from airflow-core/newsfragments/import_cli.improvement.rst rename to airflow-core/newsfragments/64498.improvement.rst From 1ad233b806d369716afd067ec8c1b69d95861098 Mon Sep 17 00:00:00 2001 From: Rohit Nair P Date: Tue, 31 Mar 2026 01:27:33 +0530 Subject: [PATCH 3/5] Remove newsfragment as per reviewer feedback --- airflow-core/newsfragments/64498.improvement.rst | 1 - 1 file changed, 1 deletion(-) delete mode 100644 airflow-core/newsfragments/64498.improvement.rst diff --git a/airflow-core/newsfragments/64498.improvement.rst b/airflow-core/newsfragments/64498.improvement.rst deleted file mode 100644 index 813edafba9413..0000000000000 --- a/airflow-core/newsfragments/64498.improvement.rst +++ /dev/null @@ -1 +0,0 @@ -Improve CLI import command performance by avoiding N+1 commits From f235093422268d9ad08fd6b4ab47e292cb6d84de Mon Sep 17 00:00:00 2001 From: Rohit Nair P Date: Tue, 7 Apr 2026 09:49:42 +0530 Subject: [PATCH 4/5] Standardize error handling in CLI connection import --- .../airflow/cli/commands/connection_command.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/cli/commands/connection_command.py b/airflow-core/src/airflow/cli/commands/connection_command.py index 8911bc80b99e2..8e98fc1573141 100644 --- a/airflow-core/src/airflow/cli/commands/connection_command.py +++ b/airflow-core/src/airflow/cli/commands/connection_command.py @@ -411,26 +411,38 @@ def _import_helper(file_path: str, overwrite: bool) -> None: :param overwrite: Whether to skip or overwrite on collision. """ connections_dict = load_connections_dict(file_path) + suc_count = fail_count = 0 with create_session() as session: for conn_id, conn in connections_dict.items(): try: helpers.validate_key(conn_id, max_length=200) except Exception as e: print(f"Could not import connection. {e}") + fail_count += 1 continue existing_conn_id = session.scalar(select(Connection.id).where(Connection.conn_id == conn_id)) if existing_conn_id is not None: if not overwrite: print(f"Could not import connection {conn_id}: connection already exists.") + fail_count += 1 continue # The conn_ids match, but the PK of the new entry must also be the same as the old conn.id = existing_conn_id - session.merge(conn) - session.commit() - print(f"Imported connection {conn_id}") + try: + session.merge(conn) + session.commit() + except Exception as e: + print(f"Connection import failed for {conn_id}: {e!r}") + session.rollback() + fail_count += 1 + else: + suc_count += 1 + print(f"{suc_count} of {len(connections_dict)} connections successfully imported.") + if fail_count: + print(f"{fail_count} connection(s) failed to be imported.") @suppress_logs_and_warning From 6cad37056b5e54fe636968b34d6ca9334d1a700f Mon Sep 17 00:00:00 2001 From: Rohit Nair P Date: Tue, 7 Apr 2026 09:56:35 +0530 Subject: [PATCH 5/5] Revert variable_command.py to upstream --- airflow-core/src/airflow/cli/commands/variable_command.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/cli/commands/variable_command.py b/airflow-core/src/airflow/cli/commands/variable_command.py index e929693142e2e..85166cb6e79d3 100644 --- a/airflow-core/src/airflow/cli/commands/variable_command.py +++ b/airflow-core/src/airflow/cli/commands/variable_command.py @@ -36,7 +36,7 @@ from airflow.utils import cli as cli_utils from airflow.utils.cli import suppress_logs_and_warning from airflow.utils.providers_configuration_loader import providers_configuration_loaded -from airflow.utils.session import NEW_SESSION, create_session, provide_session +from airflow.utils.session import create_session, provide_session class VariableDisplayMapper: @@ -122,7 +122,7 @@ def variables_delete(args): @cli_utils.action_cli @providers_configuration_loaded @provide_session -def variables_import(args, *, session=NEW_SESSION): +def variables_import(args, session): """Import variables from a given file.""" if not os.path.exists(args.file): raise SystemExit("Missing variables file.") @@ -153,7 +153,7 @@ def variables_import(args, *, session=NEW_SESSION): description = None if isinstance(v, dict) and v.get("value"): # verify that var configuration has value value, description = v["value"], v.get("description") - Variable.set(k, value, description, serialize_json=not isinstance(value, str), session=session) + Variable.set(k, value, description, serialize_json=not isinstance(value, str)) except Exception as e: print(f"Variable import failed: {e!r}") fail_count += 1