Skip to content

SQLAlchemy bulk insert is executed per row #602

@varvarvarvar

Description

@varvarvarvar

Expected behavior

I'm using sqlalchemy@2.0.49 to execute a bulk insert statement through Trino. I expect a bulk insert of 1000 rows to be executed as one query; instead, 1000 separate queries are issued, each inserting 1 row.

# Excerpt of the reproduction script: `engine`, `table_name`, `_SCHEMA`,
# `random`, `string` and `text` are defined in the full script under
# "Steps To Reproduce".
with engine.connect() as conn:
    # 1000 parameter dictionaries, one per row to insert.
    rows = [
        {"probe_id": i, "probe_name": random.choice(string.ascii_letters)}
        for i in range(1000)
    ]
    # Column list and matching ":name" bind placeholders, both derived from
    # the keys of the first row.
    col_list = ", ".join(rows[0].keys())
    placeholders = ", ".join([f":{col}" for col in rows[0].keys()])
    quoted_target_table = f'"{_SCHEMA}"."{table_name}"'
    stmt = f"""
        INSERT INTO {quoted_target_table} ({col_list})
        VALUES ({placeholders})
    """
    # A single execute() call receives the whole list of parameter
    # dictionaries; this is the call reported to run as 1000 separate queries.
    conn.execute(text(stmt), rows)
    conn.commit()

Actual behavior

A bulk insert of 1000 rows results in 1000 queries, each performing a single-row insert.

Steps To Reproduce

import random
import string
import uuid

from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL, Engine

from mysettings import trino_settings

# Catalog name used in the engine URL and in the CREATE SCHEMA statement.
_CATALOG = "test_catalog"
# Schema name used in the engine URL and to qualify the probe table in SQL.
_SCHEMA = "test_schema"


def _build_engine() -> Engine:
    """Create the SQLAlchemy engine used by the reproduction script.

    The connection URL uses the ``trino`` driver name, takes user, host and
    port from ``trino_settings``, and sets ``"<_CATALOG>/<_SCHEMA>"`` as the
    database component. The HTTP scheme from ``trino_settings`` is forwarded
    through ``connect_args``.

    Returns:
        The engine returned by ``create_engine`` with ``pool_pre_ping``
        enabled.
    """
    trino_url = URL.create(
        "trino",
        username=trino_settings.trino_user,
        host=trino_settings.trino_host,
        port=trino_settings.trino_port,
        database=f"{_CATALOG}/{_SCHEMA}",
    )
    # Driver-level options are passed separately from the URL.
    driver_options = {"http_scheme": trino_settings.trino_http_scheme}
    return create_engine(
        trino_url,
        pool_pre_ping=True,
        connect_args=driver_options,
    )


def main(engine: Engine, table_name: str) -> None:
    """Run the bulk-insert probe against a scratch table.

    Ensures the schema and table exist, hands 1000 parameter dictionaries to
    a single ``conn.execute`` call, then counts the table's rows and asserts
    that the count matches the number of rows sent.

    Args:
        engine: Engine from which the connection is opened.
        table_name: Unquoted table name; the table is created in ``_SCHEMA``.

    Raises:
        AssertionError: If the row count after the insert differs from the
            number of generated rows.
    """
    with engine.connect() as conn:
        create_schema_sql = f'CREATE SCHEMA IF NOT EXISTS "{_CATALOG}"."{_SCHEMA}"'
        conn.execute(text(create_schema_sql))
        conn.commit()

        create_table_sql = (
            f'CREATE TABLE IF NOT EXISTS "{_SCHEMA}"."{table_name}" ('
            '"probe_id" INTEGER, "probe_name" VARCHAR)'
        )
        conn.execute(text(create_table_sql))
        conn.commit()

        # One parameter dictionary per row: sequential id, random letter.
        payload = [
            {"probe_id": idx, "probe_name": random.choice(string.ascii_letters)}
            for idx in range(1000)
        ]
        # Column names and ":name" bind placeholders both follow the key
        # order of the first row.
        column_names = list(payload[0])
        col_list = ", ".join(column_names)
        placeholders = ", ".join(f":{name}" for name in column_names)
        quoted_target_table = f'"{_SCHEMA}"."{table_name}"'
        insert_sql = f"""
            INSERT INTO {quoted_target_table} ({col_list})
            VALUES ({placeholders})
        """
        # The full list of parameter sets goes into one execute() call; this
        # is the call the issue is about.
        conn.execute(text(insert_sql), payload)
        conn.commit()

        count_sql = f'SELECT COUNT(*) FROM "{_SCHEMA}"."{table_name}"'
        row_count = int(conn.execute(text(count_sql)).scalar_one())
        conn.commit()
        print(f"Rows after insert: {row_count}")
        assert row_count == len(payload)


if __name__ == "__main__":
    # Script entry point: build the engine, then run the probe against a
    # table whose name carries a random 10-hex-character suffix.
    db_engine = _build_engine()
    run_id = uuid.uuid4().hex[:10]
    print(f"Running queries for table ID {run_id}")
    main(engine=db_engine, table_name=f"tmp_insert_probe_{run_id}")

Log output

No response

Operating System

MacOS Tahoe 26.3.1(a)

Trino Python client version

0.337.0

Trino Server version

479

Python version

3.14.3

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions