-
Notifications
You must be signed in to change notification settings - Fork 16.9k
Support all bq load job and ext table config options in GCSToBigQueryOperator #64505
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |
| from __future__ import annotations | ||
|
|
||
| import json | ||
| import warnings | ||
| from collections.abc import Sequence | ||
| from typing import TYPE_CHECKING, Any | ||
|
|
||
|
|
@@ -31,11 +32,11 @@ | |
| ExtractJob, | ||
| LoadJob, | ||
| QueryJob, | ||
| SchemaField, | ||
| UnknownJob, | ||
| ) | ||
| from google.cloud.bigquery.table import EncryptionConfiguration, Table, TableReference | ||
|
|
||
| from airflow.exceptions import AirflowProviderDeprecationWarning | ||
| from airflow.providers.common.compat.sdk import AirflowException, conf | ||
| from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob | ||
| from airflow.providers.google.cloud.hooks.gcs import GCSHook | ||
|
|
@@ -136,7 +137,18 @@ class GCSToBigQueryOperator(BaseOperator): | |
| future executions, you can pick up from the max ID. | ||
| :param schema_update_options: Allows the schema of the destination | ||
| table to be updated as a side effect of the load job. | ||
| :param src_fmt_configs: configure optional fields specific to the source format | ||
| :param src_fmt_configs: (Deprecated) configure optional fields specific to the source format. | ||
| Use ``extra_config`` instead. Note when migrating that ``extra_config`` uses the fully-nested API | ||
| structure, so format-specific options must be nested under their parent key | ||
| (e.g., ``{"parquetOptions": {"enableListInference": True}}`` rather than | ||
| ``{"enableListInference": True}``). | ||
| :param extra_config: Dict of additional properties to apply over the BigQuery job configuration. | ||
| When ``external_table=False``, applied over the load job configuration | ||
| (see `JobConfigurationLoad <https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad>`_). | ||
| When ``external_table=True``, applied over the external table configuration | ||
| (see `ExternalDataConfiguration <https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ExternalDataConfiguration>`_). | ||
| Applied after all top-level params, so keys here take precedence over overlapping top-level | ||
| operator params. Nested dicts are replaced entirely, not deep-merged. | ||
| :param external_table: Flag to specify if the destination table should be | ||
| a BigQuery external table. Default Value is False. | ||
| :param time_partitioning: configure optional time partitioning fields i.e. | ||
|
|
@@ -189,6 +201,7 @@ class GCSToBigQueryOperator(BaseOperator): | |
| "destination_project_dataset_table", | ||
| "impersonation_chain", | ||
| "src_fmt_configs", | ||
| "extra_config", | ||
| ) | ||
| template_ext: Sequence[str] = (".sql",) | ||
| ui_color = "#f0eee4" | ||
|
|
@@ -219,6 +232,7 @@ def __init__( | |
| gcp_conn_id="google_cloud_default", | ||
| schema_update_options=(), | ||
| src_fmt_configs=None, | ||
| extra_config: dict | None = None, | ||
| external_table=False, | ||
| time_partitioning=None, | ||
| range_partitioning=None, | ||
|
|
@@ -289,6 +303,17 @@ def __init__( | |
|
|
||
| self.schema_update_options = schema_update_options | ||
| self.src_fmt_configs = src_fmt_configs | ||
| if src_fmt_configs: | ||
|
mlauter marked this conversation as resolved.
|
||
| warnings.warn( | ||
| "The 'src_fmt_configs' parameter is deprecated. Use 'extra_config' instead. " | ||
| "Note: 'extra_config' uses the fully-nested API structure, so format-specific " | ||
| "options must be nested under their parent key " | ||
| "(e.g., {'parquetOptions': {'enableListInference': True}} rather than " | ||
| "{'enableListInference': True}).", | ||
| AirflowProviderDeprecationWarning, | ||
| stacklevel=2, | ||
| ) | ||
|
Comment on lines
305
to
+315
|
||
| self.extra_config = extra_config | ||
| self.time_partitioning = time_partitioning | ||
| self.range_partitioning = range_partitioning | ||
| self.cluster_fields = cluster_fields | ||
|
|
@@ -570,11 +595,15 @@ def _create_external_table(self): | |
| ) | ||
| external_config_api_repr[src_fmt_to_param_mapping[self.source_format]] = self.src_fmt_configs | ||
|
|
||
| external_config = ExternalConfig.from_api_repr(external_config_api_repr) | ||
| if self.schema_fields: | ||
| external_config.schema = [SchemaField.from_api_repr(f) for f in self.schema_fields] | ||
| external_config_api_repr["schema"] = {"fields": self.schema_fields} | ||
| if self.max_bad_records: | ||
| external_config.max_bad_records = self.max_bad_records | ||
| external_config_api_repr["maxBadRecords"] = self.max_bad_records | ||
|
|
||
| if self.extra_config: | ||
| external_config_api_repr.update(self.extra_config) | ||
|
|
||
| external_config = ExternalConfig.from_api_repr(external_config_api_repr) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. moving this below so that extra_config takes precedence over schema_fields and max_bad_records |
||
|
|
||
| # build table definition | ||
| table = Table( | ||
|
|
@@ -728,6 +757,10 @@ def _use_existing_table(self): | |
|
|
||
| if self.allow_jagged_rows: | ||
| self.configuration["load"]["allowJaggedRows"] = self.allow_jagged_rows | ||
|
|
||
| if self.extra_config: | ||
| self.configuration["load"].update(self.extra_config) | ||
|
|
||
| return self.configuration | ||
|
|
||
| def _validate_src_fmt_configs( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no longer needed because we're just including the schema fields in the main ExternalConfig dict under the
schemakey and callingfrom_api_repron the whole thing