Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project follows [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- S3 and GCS import data sources accept an optional `import_format` (`csv`/`ndjson`/`parquet`), emitted as `IMPORT_FORMAT` in the generated `.datasource` and round-tripped by the datafile parser and migration emitter. This lets you ingest files whose extension does not imply the format (for example, NDJSON delivered as `.log`), where the connector would otherwise fail with `Format not supported`.

## [0.1.11] - 2026-06-15

### Changed
Expand Down
2 changes: 2 additions & 0 deletions src/tinybird_sdk/generator/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ def _generate_import_config(import_config: Any) -> str:
lines.append(f"IMPORT_SCHEDULE {import_config.schedule}")
if import_config.from_timestamp:
lines.append(f"IMPORT_FROM_TIMESTAMP {import_config.from_timestamp}")
if getattr(import_config, "import_format", None):
lines.append(f'IMPORT_FORMAT "{import_config.import_format}"')
return "\n".join(lines)


Expand Down
4 changes: 4 additions & 0 deletions src/tinybird_sdk/migrate/emit_ts.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ def _emit_datasource(ds: DatasourceModel) -> str:
lines.append(f" 'schedule': {_escape_string(ds.s3.schedule)},")
if ds.s3.from_timestamp:
lines.append(f" 'from_timestamp': {_escape_string(ds.s3.from_timestamp)},")
if ds.s3.import_format:
lines.append(f" 'import_format': {_escape_string(ds.s3.import_format)},")
lines.append(" },")

if ds.gcs:
Expand All @@ -260,6 +262,8 @@ def _emit_datasource(ds: DatasourceModel) -> str:
lines.append(f" 'schedule': {_escape_string(ds.gcs.schedule)},")
if ds.gcs.from_timestamp:
lines.append(f" 'from_timestamp': {_escape_string(ds.gcs.from_timestamp)},")
if ds.gcs.import_format:
lines.append(f" 'import_format': {_escape_string(ds.gcs.import_format)},")
lines.append(" },")

if ds.dynamodb:
Expand Down
12 changes: 10 additions & 2 deletions src/tinybird_sdk/migrate/parse_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel:
import_bucket_uri: str | None = None
import_schedule: str | None = None
import_from_timestamp: str | None = None
import_format: str | None = None
import_table_arn: str | None = None
import_export_bucket: str | None = None

Expand Down Expand Up @@ -416,6 +417,8 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel:
import_schedule = parse_quoted_value(value)
elif key == "IMPORT_FROM_TIMESTAMP":
import_from_timestamp = parse_quoted_value(value)
elif key == "IMPORT_FORMAT":
import_format = parse_quoted_value(value)
elif key == "IMPORT_TABLE_ARN":
import_table_arn = parse_quoted_value(value)
elif key == "IMPORT_EXPORT_BUCKET":
Expand Down Expand Up @@ -502,7 +505,7 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel:
resource.name,
"IMPORT_CONNECTION_NAME, IMPORT_TABLE_ARN and IMPORT_EXPORT_BUCKET are required for DynamoDB imports.",
)
if import_bucket_uri or import_schedule or import_from_timestamp:
if import_bucket_uri or import_schedule or import_from_timestamp or import_format:
raise MigrationParseError(
resource.file_path,
"datasource",
Expand All @@ -517,7 +520,11 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel:

imported: DatasourceS3Model | None = None
if not dynamodb and (
import_connection_name or import_bucket_uri or import_schedule or import_from_timestamp
import_connection_name
or import_bucket_uri
or import_schedule
or import_from_timestamp
or import_format
):
if not import_connection_name or not import_bucket_uri:
raise MigrationParseError(
Expand All @@ -531,6 +538,7 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel:
bucket_uri=import_bucket_uri,
schedule=import_schedule,
from_timestamp=import_from_timestamp,
import_format=import_format,
)

if kafka and (imported or dynamodb):
Expand Down
2 changes: 2 additions & 0 deletions src/tinybird_sdk/migrate/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class DatasourceS3Model:
bucket_uri: str
schedule: str | None = None
from_timestamp: str | None = None
import_format: str | None = None


@dataclass(frozen=True, slots=True)
Expand All @@ -71,6 +72,7 @@ class DatasourceGCSModel:
bucket_uri: str
schedule: str | None = None
from_timestamp: str | None = None
import_format: str | None = None


@dataclass(frozen=True, slots=True)
Expand Down
2 changes: 2 additions & 0 deletions src/tinybird_sdk/schema/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class S3Config:
bucket_uri: str
schedule: str | None = None
from_timestamp: str | None = None
import_format: str | None = None


@dataclass(frozen=True, slots=True)
Expand All @@ -65,6 +66,7 @@ class GCSConfig:
bucket_uri: str
schedule: str | None = None
from_timestamp: str | None = None
import_format: str | None = None


@dataclass(frozen=True, slots=True)
Expand Down
113 changes: 113 additions & 0 deletions tests/test_s3_import_format.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
from __future__ import annotations

from tinybird_sdk import (
define_datasource,
define_gcs_connection,
define_s3_connection,
t,
)
from tinybird_sdk.generator.datasource import generate_datasource
from tinybird_sdk.migrate.emit_ts import emit_migration_file_content
from tinybird_sdk.migrate.parse_connection import parse_connection_file
from tinybird_sdk.migrate.parse_datasource import parse_datasource_file
from tinybird_sdk.migrate.types import ResourceFile


def test_s3_import_emits_import_format_when_set() -> None:
    """An S3 datasource that sets ``import_format`` renders an IMPORT_FORMAT line."""
    connection = define_s3_connection(
        "landing_s3",
        {"region": "us-east-1", "arn": "arn:aws:iam::1:role/r"},
    )
    # The bucket URI uses a ``.log`` glob while the format is given explicitly.
    s3_options = {
        "connection": connection,
        "bucket_uri": "s3://bucket/events/*.log",
        "import_format": "ndjson",
    }
    events = define_datasource(
        "events",
        {"schema": {"id": t.string()}, "s3": s3_options},
    )

    rendered = generate_datasource(events).content

    assert "IMPORT_BUCKET_URI s3://bucket/events/*.log" in rendered
    assert 'IMPORT_FORMAT "ndjson"' in rendered


def test_s3_import_omits_import_format_when_unset() -> None:
    """Without ``import_format`` the rendered datasource has no IMPORT_FORMAT text."""
    connection = define_s3_connection(
        "landing_s3",
        {"region": "us-east-1", "arn": "arn:aws:iam::1:role/r"},
    )
    definition = {
        "schema": {"id": t.string()},
        "s3": {"connection": connection, "bucket_uri": "s3://bucket/events/*.ndjson"},
    }

    rendered = generate_datasource(define_datasource("events", definition)).content

    assert "IMPORT_FORMAT" not in rendered


def test_gcs_import_emits_import_format_when_set() -> None:
    """A GCS datasource that sets ``import_format`` renders an IMPORT_FORMAT line."""
    connection = define_gcs_connection(
        "landing_gcs",
        {"service_account_credentials_json": "{}"},
    )
    gcs_options = {
        "connection": connection,
        "bucket_uri": "gs://bucket/events/*.log",
        "import_format": "ndjson",
    }
    events_gcs = define_datasource(
        "events_gcs",
        {"schema": {"id": t.string()}, "gcs": gcs_options},
    )

    rendered = generate_datasource(events_gcs).content

    assert 'IMPORT_FORMAT "ndjson"' in rendered


def test_parse_datasource_reads_import_format() -> None:
    """Parsing a datafile with IMPORT_FORMAT populates ``s3.import_format``."""
    # Datafile text assembled piece by piece; each piece carries its own newline.
    datafile_pieces = (
        "SCHEMA >\n",
        " `id` String `json:$.id`\n",
        "\n",
        "IMPORT_CONNECTION_NAME 'landing_s3'\n",
        "IMPORT_BUCKET_URI 's3://bucket/events/*.log'\n",
        'IMPORT_FORMAT "ndjson"\n',
    )
    parsed = parse_datasource_file(
        ResourceFile(
            kind="datasource",
            file_path="datasources/events.datasource",
            absolute_path="/x/datasources/events.datasource",
            name="events",
            content="".join(datafile_pieces),
        )
    )

    assert parsed.s3 is not None
    assert parsed.s3.import_format == "ndjson"


def test_emit_ts_round_trip_for_s3_import_format() -> None:
    """A parsed S3 datasource with IMPORT_FORMAT is emitted with ``import_format``."""
    datasource_text = "".join(
        (
            "SCHEMA >\n",
            " `id` String `json:$.id`\n",
            "\n",
            "IMPORT_CONNECTION_NAME 'landing_s3'\n",
            "IMPORT_BUCKET_URI 's3://bucket/events/*.log'\n",
            'IMPORT_FORMAT "ndjson"\n',
        )
    )

    # Parse the connection first, then the datasource that refers to it by name.
    parsed_connection = parse_connection_file(
        ResourceFile(
            kind="connection",
            file_path="connections/landing_s3.connection",
            absolute_path="/x/connections/landing_s3.connection",
            name="landing_s3",
            content='TYPE s3\nS3_REGION us-east-1\nS3_ARN "arn:aws:iam::1:role/r"\n',
        )
    )
    parsed_datasource = parse_datasource_file(
        ResourceFile(
            kind="datasource",
            file_path="datasources/events.datasource",
            absolute_path="/x/datasources/events.datasource",
            name="events",
            content=datasource_text,
        )
    )

    emitted = emit_migration_file_content([parsed_connection, parsed_datasource])

    assert "'import_format': \"ndjson\"" in emitted
Loading