diff --git a/CHANGELOG.md b/CHANGELOG.md index 037580b..5dae597 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project follows [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- S3 and GCS import data sources accept an optional `import_format` (`csv`/`ndjson`/`parquet`), emitted as `IMPORT_FORMAT` in the generated `.datasource` and round-tripped by the datafile parser and migration emitter. Lets you ingest files whose extension does not imply the format (for example NDJSON delivered as `.log`), where the connector would otherwise fail with `Format not supported`. + ## [0.1.11] - 2026-06-15 ### Changed diff --git a/src/tinybird_sdk/generator/datasource.py b/src/tinybird_sdk/generator/datasource.py index fe504f1..c7693b9 100644 --- a/src/tinybird_sdk/generator/datasource.py +++ b/src/tinybird_sdk/generator/datasource.py @@ -121,6 +121,8 @@ def _generate_import_config(import_config: Any) -> str: lines.append(f"IMPORT_SCHEDULE {import_config.schedule}") if import_config.from_timestamp: lines.append(f"IMPORT_FROM_TIMESTAMP {import_config.from_timestamp}") + if getattr(import_config, "import_format", None): + lines.append(f'IMPORT_FORMAT "{import_config.import_format}"') return "\n".join(lines) diff --git a/src/tinybird_sdk/migrate/emit_ts.py b/src/tinybird_sdk/migrate/emit_ts.py index 8cde5d1..7b24142 100644 --- a/src/tinybird_sdk/migrate/emit_ts.py +++ b/src/tinybird_sdk/migrate/emit_ts.py @@ -249,6 +249,8 @@ def _emit_datasource(ds: DatasourceModel) -> str: lines.append(f" 'schedule': {_escape_string(ds.s3.schedule)},") if ds.s3.from_timestamp: lines.append(f" 'from_timestamp': {_escape_string(ds.s3.from_timestamp)},") + if ds.s3.import_format: + lines.append(f" 'import_format': {_escape_string(ds.s3.import_format)},") lines.append(" },") if ds.gcs: @@ -260,6 +262,8 @@ def _emit_datasource(ds: DatasourceModel) -> str: lines.append(f" 'schedule': {_escape_string(ds.gcs.schedule)},") if ds.gcs.from_timestamp: lines.append(f" 'from_timestamp': {_escape_string(ds.gcs.from_timestamp)},") + if ds.gcs.import_format: + lines.append(f" 'import_format': {_escape_string(ds.gcs.import_format)},") lines.append(" },") if ds.dynamodb: diff --git a/src/tinybird_sdk/migrate/parse_datasource.py b/src/tinybird_sdk/migrate/parse_datasource.py index 53164e3..889b267 100644 --- a/src/tinybird_sdk/migrate/parse_datasource.py +++ b/src/tinybird_sdk/migrate/parse_datasource.py @@ -282,6 +282,7 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel: import_bucket_uri: str | None = None import_schedule: str | None = None import_from_timestamp: str | None = None + import_format: str | None = None import_table_arn: str | None = None import_export_bucket: str | None = None @@ -416,6 +417,8 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel: import_schedule = parse_quoted_value(value) elif key == "IMPORT_FROM_TIMESTAMP": import_from_timestamp = parse_quoted_value(value) + elif key == "IMPORT_FORMAT": + import_format = parse_quoted_value(value) elif key == "IMPORT_TABLE_ARN": import_table_arn = parse_quoted_value(value) elif key == "IMPORT_EXPORT_BUCKET": @@ -502,7 +505,7 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel: resource.name, "IMPORT_CONNECTION_NAME, IMPORT_TABLE_ARN and IMPORT_EXPORT_BUCKET are required for DynamoDB imports.", ) - if import_bucket_uri or import_schedule or import_from_timestamp: + if import_bucket_uri or import_schedule or import_from_timestamp or import_format: raise MigrationParseError( resource.file_path, "datasource", @@ -517,7 +520,11 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel: imported: DatasourceS3Model | None = None if not dynamodb and ( - import_connection_name or import_bucket_uri or import_schedule or import_from_timestamp + import_connection_name + or import_bucket_uri + or import_schedule + or import_from_timestamp + or import_format ): if not import_connection_name or not import_bucket_uri: raise MigrationParseError( @@ -531,6 +538,7 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel: bucket_uri=import_bucket_uri, schedule=import_schedule, from_timestamp=import_from_timestamp, + import_format=import_format, ) if kafka and (imported or dynamodb): diff --git a/src/tinybird_sdk/migrate/types.py b/src/tinybird_sdk/migrate/types.py index 7363bdb..5b9650e 100644 --- a/src/tinybird_sdk/migrate/types.py +++ b/src/tinybird_sdk/migrate/types.py @@ -63,6 +63,7 @@ class DatasourceS3Model: bucket_uri: str schedule: str | None = None from_timestamp: str | None = None + import_format: str | None = None @dataclass(frozen=True, slots=True) @@ -71,6 +72,7 @@ class DatasourceGCSModel: bucket_uri: str schedule: str | None = None from_timestamp: str | None = None + import_format: str | None = None @dataclass(frozen=True, slots=True) diff --git a/src/tinybird_sdk/schema/datasource.py b/src/tinybird_sdk/schema/datasource.py index c67a214..c9131b6 100644 --- a/src/tinybird_sdk/schema/datasource.py +++ b/src/tinybird_sdk/schema/datasource.py @@ -57,6 +57,7 @@ class S3Config: bucket_uri: str schedule: str | None = None from_timestamp: str | None = None + import_format: str | None = None @dataclass(frozen=True, slots=True) @@ -65,6 +66,7 @@ class GCSConfig: bucket_uri: str schedule: str | None = None from_timestamp: str | None = None + import_format: str | None = None @dataclass(frozen=True, slots=True) diff --git a/tests/test_s3_import_format.py b/tests/test_s3_import_format.py new file mode 100644 index 0000000..0550c1c --- /dev/null +++ b/tests/test_s3_import_format.py @@ -0,0 +1,113 @@ +from __future__ import annotations + +from tinybird_sdk import ( + define_datasource, + define_gcs_connection, + define_s3_connection, + t, +) +from tinybird_sdk.generator.datasource import generate_datasource +from tinybird_sdk.migrate.emit_ts import emit_migration_file_content +from tinybird_sdk.migrate.parse_connection import parse_connection_file +from tinybird_sdk.migrate.parse_datasource import parse_datasource_file +from tinybird_sdk.migrate.types import ResourceFile + + +def test_s3_import_emits_import_format_when_set() -> None: + s3 = define_s3_connection("landing_s3", {"region": "us-east-1", "arn": "arn:aws:iam::1:role/r"}) + datasource = define_datasource( + "events", + { + "schema": {"id": t.string()}, + "s3": { + "connection": s3, + "bucket_uri": "s3://bucket/events/*.log", + "import_format": "ndjson", + }, + }, + ) + + content = generate_datasource(datasource).content + assert "IMPORT_BUCKET_URI s3://bucket/events/*.log" in content + assert 'IMPORT_FORMAT "ndjson"' in content + + +def test_s3_import_omits_import_format_when_unset() -> None: + s3 = define_s3_connection("landing_s3", {"region": "us-east-1", "arn": "arn:aws:iam::1:role/r"}) + datasource = define_datasource( + "events", + { + "schema": {"id": t.string()}, + "s3": {"connection": s3, "bucket_uri": "s3://bucket/events/*.ndjson"}, + }, + ) + + assert "IMPORT_FORMAT" not in generate_datasource(datasource).content + + +def test_gcs_import_emits_import_format_when_set() -> None: + gcs = define_gcs_connection("landing_gcs", {"service_account_credentials_json": "{}"}) + datasource = define_datasource( + "events_gcs", + { + "schema": {"id": t.string()}, + "gcs": { + "connection": gcs, + "bucket_uri": "gs://bucket/events/*.log", + "import_format": "ndjson", + }, + }, + ) + + assert 'IMPORT_FORMAT "ndjson"' in generate_datasource(datasource).content + + +def test_parse_datasource_reads_import_format() -> None: + resource = ResourceFile( + kind="datasource", + file_path="datasources/events.datasource", + absolute_path="/x/datasources/events.datasource", + name="events", + content=( + "SCHEMA >\n" + " `id` String `json:$.id`\n" + "\n" + "IMPORT_CONNECTION_NAME 'landing_s3'\n" + "IMPORT_BUCKET_URI 's3://bucket/events/*.log'\n" + 'IMPORT_FORMAT "ndjson"\n' + ), + ) + + model = parse_datasource_file(resource) + assert model.s3 is not None + assert model.s3.import_format == "ndjson" + + +def test_emit_ts_round_trip_for_s3_import_format() -> None: + connection_resource = ResourceFile( + kind="connection", + file_path="connections/landing_s3.connection", + absolute_path="/x/connections/landing_s3.connection", + name="landing_s3", + content='TYPE s3\nS3_REGION us-east-1\nS3_ARN "arn:aws:iam::1:role/r"\n', + ) + datasource_resource = ResourceFile( + kind="datasource", + file_path="datasources/events.datasource", + absolute_path="/x/datasources/events.datasource", + name="events", + content=( + "SCHEMA >\n" + " `id` String `json:$.id`\n" + "\n" + "IMPORT_CONNECTION_NAME 'landing_s3'\n" + "IMPORT_BUCKET_URI 's3://bucket/events/*.log'\n" + 'IMPORT_FORMAT "ndjson"\n' + ), + ) + + connection = parse_connection_file(connection_resource) + datasource = parse_datasource_file(datasource_resource) + + output = emit_migration_file_content([connection, datasource]) + assert "'import_format': \"ndjson\"" in output