Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from __future__ import annotations

import json
import warnings
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any

Expand All @@ -31,11 +32,11 @@
ExtractJob,
LoadJob,
QueryJob,
SchemaField,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer needed: the schema fields are now included directly in the main ExternalConfig dict under the "schema" key, and from_api_repr is called on the whole dict.

UnknownJob,
)
from google.cloud.bigquery.table import EncryptionConfiguration, Table, TableReference

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.common.compat.sdk import AirflowException, conf
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob
from airflow.providers.google.cloud.hooks.gcs import GCSHook
Expand Down Expand Up @@ -136,7 +137,18 @@ class GCSToBigQueryOperator(BaseOperator):
future executions, you can pick up from the max ID.
:param schema_update_options: Allows the schema of the destination
table to be updated as a side effect of the load job.
:param src_fmt_configs: configure optional fields specific to the source format
:param src_fmt_configs: (Deprecated) configure optional fields specific to the source format.
Use ``extra_config`` instead. Note when migrating that ``extra_config`` uses the fully-nested API
structure, so format-specific options must be nested under their parent key
(e.g., ``{"parquetOptions": {"enableListInference": True}}`` rather than
``{"enableListInference": True}``).
:param extra_config: Dict of additional properties to apply over the BigQuery job configuration.
When ``external_table=False``, applied over the load job configuration
(see `JobConfigurationLoad <https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad>`_).
When ``external_table=True``, applied over the external table configuration
(see `ExternalDataConfiguration <https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ExternalDataConfiguration>`_).
Applied after all top-level params, so keys here take precedence over overlapping top-level
operator params. Nested dicts are replaced entirely, not deep-merged.
:param external_table: Flag to specify if the destination table should be
a BigQuery external table. Default Value is False.
:param time_partitioning: configure optional time partitioning fields i.e.
Expand Down Expand Up @@ -189,6 +201,7 @@ class GCSToBigQueryOperator(BaseOperator):
"destination_project_dataset_table",
"impersonation_chain",
"src_fmt_configs",
"extra_config",
)
template_ext: Sequence[str] = (".sql",)
ui_color = "#f0eee4"
Expand Down Expand Up @@ -219,6 +232,7 @@ def __init__(
gcp_conn_id="google_cloud_default",
schema_update_options=(),
src_fmt_configs=None,
extra_config: dict | None = None,
external_table=False,
time_partitioning=None,
range_partitioning=None,
Expand Down Expand Up @@ -289,6 +303,17 @@ def __init__(

self.schema_update_options = schema_update_options
self.src_fmt_configs = src_fmt_configs
if src_fmt_configs:
Comment thread
mlauter marked this conversation as resolved.
warnings.warn(
"The 'src_fmt_configs' parameter is deprecated. Use 'extra_config' instead. "
"Note: 'extra_config' uses the fully-nested API structure, so format-specific "
"options must be nested under their parent key "
"(e.g., {'parquetOptions': {'enableListInference': True}} rather than "
"{'enableListInference': True}).",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
Comment on lines 305 to +315
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The deprecation warning only fires when src_fmt_configs is truthy. If a DAG explicitly passes an empty dict (src_fmt_configs={}), this won’t warn even though the deprecated parameter is used. Consider tracking whether the argument was provided (e.g., src_fmt_configs_provided = src_fmt_configs is not None before normalizing None to {}) and warning whenever it was provided, regardless of emptiness.

Copilot uses AI. Check for mistakes.
self.extra_config = extra_config
self.time_partitioning = time_partitioning
self.range_partitioning = range_partitioning
self.cluster_fields = cluster_fields
Expand Down Expand Up @@ -570,11 +595,15 @@ def _create_external_table(self):
)
external_config_api_repr[src_fmt_to_param_mapping[self.source_format]] = self.src_fmt_configs

external_config = ExternalConfig.from_api_repr(external_config_api_repr)
if self.schema_fields:
external_config.schema = [SchemaField.from_api_repr(f) for f in self.schema_fields]
external_config_api_repr["schema"] = {"fields": self.schema_fields}
if self.max_bad_records:
external_config.max_bad_records = self.max_bad_records
external_config_api_repr["maxBadRecords"] = self.max_bad_records

if self.extra_config:
external_config_api_repr.update(self.extra_config)

external_config = ExternalConfig.from_api_repr(external_config_api_repr)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moving this below so that extra_config takes precedence over schema_fields and max_bad_records


# build table definition
table = Table(
Expand Down Expand Up @@ -728,6 +757,10 @@ def _use_existing_table(self):

if self.allow_jagged_rows:
self.configuration["load"]["allowJaggedRows"] = self.allow_jagged_rows

if self.extra_config:
self.configuration["load"].update(self.extra_config)

return self.configuration

def _validate_src_fmt_configs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from google.cloud.exceptions import Conflict
from sqlalchemy import select

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models.trigger import Trigger
from airflow.providers.common.compat.openlineage.facet import (
ColumnLineageDatasetFacet,
Expand Down Expand Up @@ -1739,20 +1740,21 @@ def test_external_table_should_accept_parquet_format_and_options(self, hook):
hook.return_value.generate_job_id.return_value = REAL_JOB_ID
hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)

operator = GCSToBigQueryOperator(
task_id=TASK_ID,
bucket=TEST_BUCKET,
source_objects=TEST_SOURCE_OBJECTS,
destination_project_dataset_table=TEST_EXPLICIT_DEST,
schema_fields=SCHEMA_FIELDS,
write_disposition=WRITE_DISPOSITION,
external_table=True,
project_id=JOB_PROJECT_ID,
source_format="PARQUET",
src_fmt_configs={
"enableListInference": True,
},
)
with pytest.warns(AirflowProviderDeprecationWarning, match="src_fmt_configs"):
operator = GCSToBigQueryOperator(
task_id=TASK_ID,
bucket=TEST_BUCKET,
source_objects=TEST_SOURCE_OBJECTS,
destination_project_dataset_table=TEST_EXPLICIT_DEST,
schema_fields=SCHEMA_FIELDS,
write_disposition=WRITE_DISPOSITION,
external_table=True,
project_id=JOB_PROJECT_ID,
source_format="PARQUET",
src_fmt_configs={
"enableListInference": True,
},
)

operator.execute(context=MagicMock())

Expand Down Expand Up @@ -1841,19 +1843,20 @@ def test_without_external_table_should_accept_parquet_format_and_options(self, h
]
hook.return_value.generate_job_id.return_value = REAL_JOB_ID
hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
operator = GCSToBigQueryOperator(
task_id=TASK_ID,
bucket=TEST_BUCKET,
source_objects=TEST_SOURCE_OBJECTS,
write_disposition=WRITE_DISPOSITION,
destination_project_dataset_table=TEST_EXPLICIT_DEST,
external_table=False,
project_id=JOB_PROJECT_ID,
source_format="PARQUET",
src_fmt_configs={
"enableListInference": True,
},
)
with pytest.warns(AirflowProviderDeprecationWarning, match="src_fmt_configs"):
operator = GCSToBigQueryOperator(
task_id=TASK_ID,
bucket=TEST_BUCKET,
source_objects=TEST_SOURCE_OBJECTS,
write_disposition=WRITE_DISPOSITION,
destination_project_dataset_table=TEST_EXPLICIT_DEST,
external_table=False,
project_id=JOB_PROJECT_ID,
source_format="PARQUET",
src_fmt_configs={
"enableListInference": True,
},
)

operator.execute(context=MagicMock())

Expand Down Expand Up @@ -1888,6 +1891,208 @@ def test_without_external_table_should_accept_parquet_format_and_options(self, h

hook.return_value.insert_job.assert_has_calls(calls)

@mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def test_extra_config_is_merged_into_load_config(self, hook):
    """Keys from ``extra_config`` must appear in the submitted load-job configuration."""
    mocked_hook = hook.return_value
    mocked_hook.insert_job.return_value = MagicMock(job_id=REAL_JOB_ID, error_result=False)
    mocked_hook.generate_job_id.return_value = REAL_JOB_ID
    mocked_hook.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)

    op = GCSToBigQueryOperator(
        task_id=TASK_ID,
        bucket=TEST_BUCKET,
        source_objects=TEST_SOURCE_OBJECTS,
        destination_project_dataset_table=TEST_EXPLICIT_DEST,
        write_disposition=WRITE_DISPOSITION,
        project_id=JOB_PROJECT_ID,
        extra_config={"columnNameCharacterMap": "STRICT"},
    )
    op.execute(context=MagicMock())

    # The configuration is passed to insert_job as a keyword argument.
    submitted = mocked_hook.insert_job.call_args[1]["configuration"]
    assert submitted["load"]["columnNameCharacterMap"] == "STRICT"

@mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def test_extra_config_takes_precedence_over_top_level_params(self, hook):
    """When ``extra_config`` overlaps a top-level operator param, ``extra_config`` wins."""
    mocked_hook = hook.return_value
    mocked_hook.insert_job.return_value = MagicMock(job_id=REAL_JOB_ID, error_result=False)
    mocked_hook.generate_job_id.return_value = REAL_JOB_ID
    mocked_hook.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)

    op = GCSToBigQueryOperator(
        task_id=TASK_ID,
        bucket=TEST_BUCKET,
        source_objects=TEST_SOURCE_OBJECTS,
        destination_project_dataset_table=TEST_EXPLICIT_DEST,
        write_disposition=WRITE_DISPOSITION,
        project_id=JOB_PROJECT_ID,
        autodetect=True,
        extra_config={"autodetect": False},
    )
    op.execute(context=MagicMock())

    # extra_config's value must override the top-level autodetect=True.
    submitted = mocked_hook.insert_job.call_args[1]["configuration"]
    assert submitted["load"]["autodetect"] is False

@mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def test_extra_config_with_nested_format_options(self, hook):
    """Format-specific options nested under their parent key pass through unchanged."""
    mocked_hook = hook.return_value
    mocked_hook.insert_job.return_value = MagicMock(job_id=REAL_JOB_ID, error_result=False)
    mocked_hook.generate_job_id.return_value = REAL_JOB_ID
    mocked_hook.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)

    op = GCSToBigQueryOperator(
        task_id=TASK_ID,
        bucket=TEST_BUCKET,
        source_objects=TEST_SOURCE_OBJECTS,
        destination_project_dataset_table=TEST_EXPLICIT_DEST,
        write_disposition=WRITE_DISPOSITION,
        project_id=JOB_PROJECT_ID,
        source_format="PARQUET",
        extra_config={"parquetOptions": {"enableListInference": True}},
    )
    op.execute(context=MagicMock())

    submitted = mocked_hook.insert_job.call_args[1]["configuration"]
    assert submitted["load"]["parquetOptions"] == {"enableListInference": True}

@mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def test_extra_config_is_merged_into_external_config(self, hook):
    """With ``external_table=True``, ``extra_config`` is merged into externalDataConfiguration."""
    mocked_hook = hook.return_value
    mocked_hook.generate_job_id.return_value = REAL_JOB_ID
    mocked_hook.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)

    op = GCSToBigQueryOperator(
        task_id=TASK_ID,
        bucket=TEST_BUCKET,
        source_objects=TEST_SOURCE_OBJECTS,
        destination_project_dataset_table=TEST_EXPLICIT_DEST,
        schema_fields=SCHEMA_FIELDS,
        write_disposition=WRITE_DISPOSITION,
        external_table=True,
        project_id=JOB_PROJECT_ID,
        extra_config={"maxBadRecords": 10},
    )
    op.execute(context=MagicMock())

    # External tables go through create_table rather than insert_job.
    resource = mocked_hook.create_table.call_args[1]["table_resource"]
    assert resource["externalDataConfiguration"]["maxBadRecords"] == 10

@mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def test_max_bad_records_set_from_top_level_param_in_external_table(self, hook):
    """The top-level ``max_bad_records`` param still populates the external config."""
    mocked_hook = hook.return_value
    mocked_hook.generate_job_id.return_value = REAL_JOB_ID
    mocked_hook.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)

    op = GCSToBigQueryOperator(
        task_id=TASK_ID,
        bucket=TEST_BUCKET,
        source_objects=TEST_SOURCE_OBJECTS,
        destination_project_dataset_table=TEST_EXPLICIT_DEST,
        schema_fields=SCHEMA_FIELDS,
        write_disposition=WRITE_DISPOSITION,
        external_table=True,
        project_id=JOB_PROJECT_ID,
        max_bad_records=5,
    )
    op.execute(context=MagicMock())

    resource = mocked_hook.create_table.call_args[1]["table_resource"]
    assert resource["externalDataConfiguration"]["maxBadRecords"] == 5

@mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def test_extra_config_takes_precedence_over_max_bad_records_in_external_table(self, hook):
    """``extra_config`` overrides the top-level ``max_bad_records`` for external tables."""
    mocked_hook = hook.return_value
    mocked_hook.generate_job_id.return_value = REAL_JOB_ID
    mocked_hook.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)

    op = GCSToBigQueryOperator(
        task_id=TASK_ID,
        bucket=TEST_BUCKET,
        source_objects=TEST_SOURCE_OBJECTS,
        destination_project_dataset_table=TEST_EXPLICIT_DEST,
        schema_fields=SCHEMA_FIELDS,
        write_disposition=WRITE_DISPOSITION,
        external_table=True,
        project_id=JOB_PROJECT_ID,
        max_bad_records=5,
        extra_config={"maxBadRecords": 10},
    )
    op.execute(context=MagicMock())

    # extra_config (10) must win over the top-level param (5).
    resource = mocked_hook.create_table.call_args[1]["table_resource"]
    assert resource["externalDataConfiguration"]["maxBadRecords"] == 10

@mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def test_extra_config_takes_precedence_over_schema_fields_in_external_table(self, hook):
    """A ``schema`` key in ``extra_config`` replaces the top-level ``schema_fields``."""
    mocked_hook = hook.return_value
    mocked_hook.generate_job_id.return_value = REAL_JOB_ID
    mocked_hook.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)

    replacement_schema = [{"name": "override_col", "type": "INTEGER", "mode": "NULLABLE"}]
    op = GCSToBigQueryOperator(
        task_id=TASK_ID,
        bucket=TEST_BUCKET,
        source_objects=TEST_SOURCE_OBJECTS,
        destination_project_dataset_table=TEST_EXPLICIT_DEST,
        schema_fields=SCHEMA_FIELDS,
        write_disposition=WRITE_DISPOSITION,
        external_table=True,
        project_id=JOB_PROJECT_ID,
        extra_config={"schema": {"fields": replacement_schema}},
    )
    op.execute(context=MagicMock())

    resource = mocked_hook.create_table.call_args[1]["table_resource"]
    assert resource["externalDataConfiguration"]["schema"]["fields"] == replacement_schema

@mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def test_src_fmt_configs_emits_deprecation_warning(self, hook):
    """Passing ``src_fmt_configs`` warns but keeps working for backward compatibility."""
    mocked_hook = hook.return_value
    mocked_hook.insert_job.return_value = MagicMock(job_id=REAL_JOB_ID, error_result=False)
    mocked_hook.generate_job_id.return_value = REAL_JOB_ID
    mocked_hook.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)

    # Constructing the operator with the deprecated param must raise the warning.
    with pytest.warns(AirflowProviderDeprecationWarning, match="src_fmt_configs"):
        op = GCSToBigQueryOperator(
            task_id=TASK_ID,
            bucket=TEST_BUCKET,
            source_objects=TEST_SOURCE_OBJECTS,
            destination_project_dataset_table=TEST_EXPLICIT_DEST,
            write_disposition=WRITE_DISPOSITION,
            project_id=JOB_PROJECT_ID,
            src_fmt_configs={"skipLeadingRows": 1},
        )

    op.execute(context=MagicMock())

    submitted = mocked_hook.insert_job.call_args[1]["configuration"]
    assert submitted["load"]["skipLeadingRows"] == 1

@mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def test_src_fmt_configs_and_extra_config_both_applied_with_precedence(self, hook):
    """Both params apply; ``extra_config`` wins on any key they share."""
    mocked_hook = hook.return_value
    mocked_hook.insert_job.return_value = MagicMock(job_id=REAL_JOB_ID, error_result=False)
    mocked_hook.generate_job_id.return_value = REAL_JOB_ID
    mocked_hook.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)

    # src_fmt_configs is deprecated, so constructing with it must warn.
    with pytest.warns(AirflowProviderDeprecationWarning, match="src_fmt_configs"):
        op = GCSToBigQueryOperator(
            task_id=TASK_ID,
            bucket=TEST_BUCKET,
            source_objects=TEST_SOURCE_OBJECTS,
            destination_project_dataset_table=TEST_EXPLICIT_DEST,
            write_disposition=WRITE_DISPOSITION,
            project_id=JOB_PROJECT_ID,
            src_fmt_configs={"skipLeadingRows": 1},
            extra_config={"skipLeadingRows": 5, "columnNameCharacterMap": "STRICT"},
        )

    op.execute(context=MagicMock())

    submitted = mocked_hook.insert_job.call_args[1]["configuration"]
    # Overlapping key: extra_config's value (5) takes precedence over src_fmt_configs' (1).
    assert submitted["load"]["skipLeadingRows"] == 5
    # Non-overlapping extra_config key is applied as-is.
    assert submitted["load"]["columnNameCharacterMap"] == "STRICT"


@pytest.fixture
def create_task_instance(create_task_instance_of_operator, session):
Expand Down
Loading