diff --git a/airflow-core/src/airflow/cli/commands/connection_command.py b/airflow-core/src/airflow/cli/commands/connection_command.py index 8911bc80b99e2..8e98fc1573141 100644 --- a/airflow-core/src/airflow/cli/commands/connection_command.py +++ b/airflow-core/src/airflow/cli/commands/connection_command.py @@ -411,26 +411,38 @@ def _import_helper(file_path: str, overwrite: bool) -> None: :param overwrite: Whether to skip or overwrite on collision. """ connections_dict = load_connections_dict(file_path) + suc_count = fail_count = 0 with create_session() as session: for conn_id, conn in connections_dict.items(): try: helpers.validate_key(conn_id, max_length=200) except Exception as e: print(f"Could not import connection. {e}") + fail_count += 1 continue existing_conn_id = session.scalar(select(Connection.id).where(Connection.conn_id == conn_id)) if existing_conn_id is not None: if not overwrite: print(f"Could not import connection {conn_id}: connection already exists.") + fail_count += 1 continue # The conn_ids match, but the PK of the new entry must also be the same as the old conn.id = existing_conn_id - session.merge(conn) - session.commit() - print(f"Imported connection {conn_id}") + try: + session.merge(conn) + session.commit() + except Exception as e: + print(f"Connection import failed for {conn_id}: {e!r}") + session.rollback() + fail_count += 1 + else: + suc_count += 1 + print(f"{suc_count} of {len(connections_dict)} connections successfully imported.") + if fail_count: + print(f"{fail_count} connection(s) failed to be imported.") @suppress_logs_and_warning