From 3476d702bcfb40443f34fc1ce3aaea19134d8cb6 Mon Sep 17 00:00:00 2001 From: Filipa Andrade Date: Mon, 29 Jun 2026 09:32:36 +0100 Subject: [PATCH 1/2] Add import_format to S3/GCS import data sources S3 and GCS import data sources accept an optional import_format (csv/ndjson/parquet), emitted as IMPORT_FORMAT in the generated .datasource. This lets you ingest files whose extension does not imply the format (for example NDJSON delivered as .log), which otherwise fail with "Format not supported". Co-Authored-By: Claude Opus 4.8 (1M context) --- CHANGELOG.md | 6 +++ src/tinybird_sdk/generator/datasource.py | 2 + src/tinybird_sdk/schema/datasource.py | 2 + tests/test_s3_import_format.py | 58 ++++++++++++++++++++++++ 4 files changed, 68 insertions(+) create mode 100644 tests/test_s3_import_format.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 037580b..0a1ecba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project follows [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- S3 and GCS import data sources accept an optional `import_format` (`csv`/`ndjson`/`parquet`), emitted as `IMPORT_FORMAT` in the generated `.datasource`. Lets you ingest files whose extension does not imply the format (for example NDJSON delivered as `.log`), where the connector would otherwise fail with `Format not supported`. + ## [0.1.11] - 2026-06-15 ### Changed diff --git a/src/tinybird_sdk/generator/datasource.py b/src/tinybird_sdk/generator/datasource.py index fe504f1..c7693b9 100644 --- a/src/tinybird_sdk/generator/datasource.py +++ b/src/tinybird_sdk/generator/datasource.py @@ -121,6 +121,8 @@ def _generate_import_config(import_config: Any) -> str: lines.append(f"IMPORT_SCHEDULE {import_config.schedule}") if import_config.from_timestamp: lines.append(f"IMPORT_FROM_TIMESTAMP {import_config.from_timestamp}") + if getattr(import_config, "import_format", None): + lines.append(f'IMPORT_FORMAT "{import_config.import_format}"') return "\n".join(lines) diff --git a/src/tinybird_sdk/schema/datasource.py b/src/tinybird_sdk/schema/datasource.py index c67a214..c9131b6 100644 --- a/src/tinybird_sdk/schema/datasource.py +++ b/src/tinybird_sdk/schema/datasource.py @@ -57,6 +57,7 @@ class S3Config: bucket_uri: str schedule: str | None = None from_timestamp: str | None = None + import_format: str | None = None @dataclass(frozen=True, slots=True) @@ -65,6 +66,7 @@ class GCSConfig: bucket_uri: str schedule: str | None = None from_timestamp: str | None = None + import_format: str | None = None @dataclass(frozen=True, slots=True) diff --git a/tests/test_s3_import_format.py b/tests/test_s3_import_format.py new file mode 100644 index 0000000..0fd8490 --- /dev/null +++ b/tests/test_s3_import_format.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from tinybird_sdk import ( + define_datasource, + define_gcs_connection, + define_s3_connection, + t, +) +from tinybird_sdk.generator.datasource import generate_datasource + + +def test_s3_import_emits_import_format_when_set() -> None: + s3 = define_s3_connection("landing_s3", {"region": "us-east-1", "arn": "arn:aws:iam::1:role/r"}) + datasource = define_datasource( + "events", + { + "schema": {"id": t.string()}, + "s3": { + "connection": s3, + "bucket_uri": "s3://bucket/events/*.log", + "import_format": "ndjson", + }, + }, + ) + + content = generate_datasource(datasource).content + assert "IMPORT_BUCKET_URI s3://bucket/events/*.log" in content + assert 'IMPORT_FORMAT "ndjson"' in content + + +def test_s3_import_omits_import_format_when_unset() -> None: + s3 = define_s3_connection("landing_s3", {"region": "us-east-1", "arn": "arn:aws:iam::1:role/r"}) + datasource = define_datasource( + "events", + { + "schema": {"id": t.string()}, + "s3": {"connection": s3, "bucket_uri": "s3://bucket/events/*.ndjson"}, + }, + ) + + assert "IMPORT_FORMAT" not in generate_datasource(datasource).content + + +def test_gcs_import_emits_import_format_when_set() -> None: + gcs = define_gcs_connection("landing_gcs", {"service_account_credentials_json": "{}"}) + datasource = define_datasource( + "events_gcs", + { + "schema": {"id": t.string()}, + "gcs": { + "connection": gcs, + "bucket_uri": "gs://bucket/events/*.log", + "import_format": "ndjson", + }, + }, + ) + + assert 'IMPORT_FORMAT "ndjson"' in generate_datasource(datasource).content From ef8f4d16c6009ec0a890d4d6ed6d7761377814c5 Mon Sep 17 00:00:00 2001 From: Filipa Andrade Date: Mon, 29 Jun 2026 10:19:49 +0100 Subject: [PATCH 2/2] Round-trip import_format through the migrate parser and emitter Mirror from_timestamp end-to-end: parse IMPORT_FORMAT, carry it on the S3/GCS migrate models, emit it from emit_ts, and add parse + round-trip tests. Keeps generator/parser/emitter parity for the new field. Co-Authored-By: Claude Opus 4.8 (1M context) --- CHANGELOG.md | 2 +- src/tinybird_sdk/migrate/emit_ts.py | 4 ++ src/tinybird_sdk/migrate/parse_datasource.py | 12 ++++- src/tinybird_sdk/migrate/types.py | 2 + tests/test_s3_import_format.py | 55 ++++++++++++++++++++ 5 files changed, 72 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a1ecba..5dae597 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ and this project follows [Semantic Versioning](https://semver.org/spec/v2.0.0.ht ### Added -- S3 and GCS import data sources accept an optional `import_format` (`csv`/`ndjson`/`parquet`), emitted as `IMPORT_FORMAT` in the generated `.datasource`. Lets you ingest files whose extension does not imply the format (for example NDJSON delivered as `.log`), where the connector would otherwise fail with `Format not supported`. +- S3 and GCS import data sources accept an optional `import_format` (`csv`/`ndjson`/`parquet`), emitted as `IMPORT_FORMAT` in the generated `.datasource` and round-tripped by the datafile parser and migration emitter. Lets you ingest files whose extension does not imply the format (for example NDJSON delivered as `.log`), where the connector would otherwise fail with `Format not supported`. ## [0.1.11] - 2026-06-15 diff --git a/src/tinybird_sdk/migrate/emit_ts.py b/src/tinybird_sdk/migrate/emit_ts.py index 8cde5d1..7b24142 100644 --- a/src/tinybird_sdk/migrate/emit_ts.py +++ b/src/tinybird_sdk/migrate/emit_ts.py @@ -249,6 +249,8 @@ def _emit_datasource(ds: DatasourceModel) -> str: lines.append(f" 'schedule': {_escape_string(ds.s3.schedule)},") if ds.s3.from_timestamp: lines.append(f" 'from_timestamp': {_escape_string(ds.s3.from_timestamp)},") + if ds.s3.import_format: + lines.append(f" 'import_format': {_escape_string(ds.s3.import_format)},") lines.append(" },") if ds.gcs: @@ -260,6 +262,8 @@ def _emit_datasource(ds: DatasourceModel) -> str: lines.append(f" 'schedule': {_escape_string(ds.gcs.schedule)},") if ds.gcs.from_timestamp: lines.append(f" 'from_timestamp': {_escape_string(ds.gcs.from_timestamp)},") + if ds.gcs.import_format: + lines.append(f" 'import_format': {_escape_string(ds.gcs.import_format)},") lines.append(" },") if ds.dynamodb: diff --git a/src/tinybird_sdk/migrate/parse_datasource.py b/src/tinybird_sdk/migrate/parse_datasource.py index 53164e3..889b267 100644 --- a/src/tinybird_sdk/migrate/parse_datasource.py +++ b/src/tinybird_sdk/migrate/parse_datasource.py @@ -282,6 +282,7 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel: import_bucket_uri: str | None = None import_schedule: str | None = None import_from_timestamp: str | None = None + import_format: str | None = None import_table_arn: str | None = None import_export_bucket: str | None = None @@ -416,6 +417,8 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel: import_schedule = parse_quoted_value(value) elif key == "IMPORT_FROM_TIMESTAMP": import_from_timestamp = parse_quoted_value(value) + elif key == "IMPORT_FORMAT": + import_format = parse_quoted_value(value) elif key == "IMPORT_TABLE_ARN": import_table_arn = parse_quoted_value(value) elif key == "IMPORT_EXPORT_BUCKET": @@ -502,7 +505,7 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel: resource.name, "IMPORT_CONNECTION_NAME, IMPORT_TABLE_ARN and IMPORT_EXPORT_BUCKET are required for DynamoDB imports.", ) - if import_bucket_uri or import_schedule or import_from_timestamp: + if import_bucket_uri or import_schedule or import_from_timestamp or import_format: raise MigrationParseError( resource.file_path, "datasource", @@ -517,7 +520,11 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel: imported: DatasourceS3Model | None = None if not dynamodb and ( - import_connection_name or import_bucket_uri or import_schedule or import_from_timestamp + import_connection_name + or import_bucket_uri + or import_schedule + or import_from_timestamp + or import_format ): if not import_connection_name or not import_bucket_uri: raise MigrationParseError( @@ -531,6 +538,7 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel: bucket_uri=import_bucket_uri, schedule=import_schedule, from_timestamp=import_from_timestamp, + import_format=import_format, ) if kafka and (imported or dynamodb): diff --git a/src/tinybird_sdk/migrate/types.py b/src/tinybird_sdk/migrate/types.py index 7363bdb..5b9650e 100644 --- a/src/tinybird_sdk/migrate/types.py +++ b/src/tinybird_sdk/migrate/types.py @@ -63,6 +63,7 @@ class DatasourceS3Model: bucket_uri: str schedule: str | None = None from_timestamp: str | None = None + import_format: str | None = None @dataclass(frozen=True, slots=True) @@ -71,6 +72,7 @@ class DatasourceGCSModel: bucket_uri: str schedule: str | None = None from_timestamp: str | None = None + import_format: str | None = None @dataclass(frozen=True, slots=True) diff --git a/tests/test_s3_import_format.py b/tests/test_s3_import_format.py index 0fd8490..0550c1c 100644 --- a/tests/test_s3_import_format.py +++ b/tests/test_s3_import_format.py @@ -7,6 +7,10 @@ t, ) from tinybird_sdk.generator.datasource import generate_datasource +from tinybird_sdk.migrate.emit_ts import emit_migration_file_content +from tinybird_sdk.migrate.parse_connection import parse_connection_file +from tinybird_sdk.migrate.parse_datasource import parse_datasource_file +from tinybird_sdk.migrate.types import ResourceFile def test_s3_import_emits_import_format_when_set() -> None: @@ -56,3 +60,54 @@ def test_gcs_import_emits_import_format_when_set() -> None: ) assert 'IMPORT_FORMAT "ndjson"' in generate_datasource(datasource).content + + +def test_parse_datasource_reads_import_format() -> None: + resource = ResourceFile( + kind="datasource", + file_path="datasources/events.datasource", + absolute_path="/x/datasources/events.datasource", + name="events", + content=( + "SCHEMA >\n" + " `id` String `json:$.id`\n" + "\n" + "IMPORT_CONNECTION_NAME 'landing_s3'\n" + "IMPORT_BUCKET_URI 's3://bucket/events/*.log'\n" + 'IMPORT_FORMAT "ndjson"\n' + ), + ) + + model = parse_datasource_file(resource) + assert model.s3 is not None + assert model.s3.import_format == "ndjson" + + +def test_emit_ts_round_trip_for_s3_import_format() -> None: + connection_resource = ResourceFile( + kind="connection", + file_path="connections/landing_s3.connection", + absolute_path="/x/connections/landing_s3.connection", + name="landing_s3", + content='TYPE s3\nS3_REGION us-east-1\nS3_ARN "arn:aws:iam::1:role/r"\n', + ) + datasource_resource = ResourceFile( + kind="datasource", + file_path="datasources/events.datasource", + absolute_path="/x/datasources/events.datasource", + name="events", + content=( + "SCHEMA >\n" + " `id` String `json:$.id`\n" + "\n" + "IMPORT_CONNECTION_NAME 'landing_s3'\n" + "IMPORT_BUCKET_URI 's3://bucket/events/*.log'\n" + 'IMPORT_FORMAT "ndjson"\n' + ), + ) + + connection = parse_connection_file(connection_resource) + datasource = parse_datasource_file(datasource_resource) + + output = emit_migration_file_content([connection, datasource]) + assert "'import_format': \"ndjson\"" in output