Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ jobs:
run: pip install uv

- name: Install package
run: uv sync --locked --all-extras --group ci --verbose
run: |
uv sync --all-extras --group ci --group dev --verbose

# BigQuery start
# - id: 'auth'
Expand All @@ -50,17 +51,22 @@ jobs:
# - name: 'Use gcloud CLI'
# run: "gcloud config configurations list"

- name: 'Create datadiff SA'
run: "echo '${{ secrets.BQ_SA }}' > bq_sa.json"

# BigQuery end

- name: Run unit tests
env:
DATADIFF_SNOWFLAKE_URI: '${{ secrets.DATADIFF_SNOWFLAKE_URI }}'
DATADIFF_PRESTO_URI: '${{ secrets.DATADIFF_PRESTO_URI }}'
DATADIFF_TRINO_URI: '${{ secrets.DATADIFF_TRINO_URI }}'
# DATADIFF_BIGQUERY_URI: '${{ secrets.DATADIFF_BIGQUERY_URI }}'
GOOGLE_APPLICATION_CREDENTIALS: 'bq_sa.json'
DATADIFF_BIGQUERY_URI: '${{ secrets.DATADIFF_BIGQUERY_URI }}'
DATADIFF_CLICKHOUSE_URI: 'clickhouse://clickhouse:Password1@localhost:9000/clickhouse'
DATADIFF_REDSHIFT_URI: '${{ secrets.DATADIFF_REDSHIFT_URI }}'
MOTHERDUCK_TOKEN: '${{ secrets.MOTHERDUCK_TOKEN }}'
run: |
chmod +x tests/waiting_for_stack_up.sh
./tests/waiting_for_stack_up.sh && TEST_ACROSS_ALL_DBS=0 uv run unittest-parallel -j 16
./tests/waiting_for_stack_up.sh
TEST_ACROSS_ALL_DBS=0 uv run unittest-parallel -j 16
13 changes: 10 additions & 3 deletions data_diff/databases/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,12 @@ def query(self, sql_ast: Union[Expr, Generator], res_type: type = None, log_mess
It's a cleaner approach than exposing cursors, but may not be enough in all cases.
"""

sql_code: Union[str, ThreadLocalInterpreter]

compiler = Compiler(self)

if self.is_closed:
raise ConnectError("This database connection is closed.")
if isinstance(sql_ast, Generator):
sql_code = ThreadLocalInterpreter(compiler, sql_ast)
elif isinstance(sql_ast, list):
Expand All @@ -973,8 +978,9 @@ def query(self, sql_ast: Union[Expr, Generator], res_type: type = None, log_mess
if res_type is None:
res_type = sql_ast.type
sql_code = self.compile(sql_ast)
if sql_code is SKIP:
return SKIP

if sql_code is SKIP or sql_code == "":
return QueryResult([]) # Return empty QueryResult if no-op

if log_message:
logger.debug("Running SQL (%s): %s \n%s", self.name, log_message, sql_code)
Expand All @@ -987,13 +993,14 @@ def query(self, sql_ast: Union[Expr, Generator], res_type: type = None, log_mess
for row in explain:
# Most returned a 1-tuple. Presto returns a string
if isinstance(row, tuple):
(row,) = row
(row) = row
logger.debug("EXPLAIN: %s", row)
answer = input("Continue? [y/n] ")
if answer.lower() not in ["y", "yes"]:
sys.exit(1)

res = self._query(sql_code)

if res_type is list:
return list(res)
elif res_type is int:
Expand Down
17 changes: 15 additions & 2 deletions data_diff/databases/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
UnknownColType,
Time,
Date,
TimestampTZ, # Added this import
)
from data_diff.databases.base import (
BaseDialect,
Expand Down Expand Up @@ -96,11 +97,20 @@ def to_string(self, s: str) -> str:
return f"cast({s} as string)"

def type_repr(self, t) -> str:
    """Return the BigQuery type name to use for ``t``.

    Temporal column-type instances map to bare BigQuery type names, with no
    precision suffix. The Python classes ``str`` and ``float`` map to
    ``STRING`` and ``FLOAT64``. Anything else is delegated to the parent
    implementation.
    """
    # BigQuery's TIMESTAMP and DATETIME types do not accept a precision,
    # so only the bare type name is emitted for temporal types.
    temporal_names = (
        ((Timestamp, TimestampTZ), "TIMESTAMP"),
        (Datetime, "DATETIME"),
        (Date, "DATE"),
        (Time, "TIME"),
    )
    for col_types, bq_name in temporal_names:
        if isinstance(t, col_types):
            return bq_name

    primitive_names = {str: "STRING", float: "FLOAT64"}
    try:
        return primitive_names[t]
    except KeyError:
        # Not a type handled here; let the base dialect decide.
        return super().type_repr(t)


def parse_type(self, table_path: DbPath, info: RawColumnInfo) -> ColType:
col_type = super().parse_type(table_path, info)
if not isinstance(col_type, UnknownColType):
Expand Down Expand Up @@ -151,7 +161,7 @@ def to_comparable(self, value: str, coltype: ColType) -> str:
return super().to_comparable(value, coltype)

def set_timezone_to_utc(self) -> str:
raise NotImplementedError()
return ""

def parse_table_name(self, name: str) -> DbPath:
path = parse_table_name(name)
Expand Down Expand Up @@ -252,12 +262,15 @@ def __init__(self, project, *, dataset, bigquery_credentials=None, **kw) -> None
target_scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

self._client = bigquery.Client(project=project, credentials=credentials, **kw)
default_config = bigquery.QueryJobConfig(default_dataset=f"{project}.{dataset}")

self._client = bigquery.Client(project=project, credentials=credentials, default_query_job_config=default_config, **kw)
Comment thread
ramyamasani marked this conversation as resolved.
self.project = project
self.dataset = dataset

self.default_schema = dataset


def _normalize_returned_value(self, value):
if isinstance(value, bytes):
return value.decode()
Expand Down
2 changes: 1 addition & 1 deletion data_diff/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.11.1"
__version__ = "0.12.1"
3 changes: 3 additions & 0 deletions dev/dev.env
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ CLICKHOUSE_USER=clickhouse
CLICKHOUSE_PASSWORD=Password1
CLICKHOUSE_DB=clickhouse
CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1

GOOGLE_APPLICATION_CREDENTIALS=bq.sa
DATADIFF_BIGQUERY_URI=bigquery://dms-analytics-v2-data-diff/datadiff_tests
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ services:
container_name: dd-clickhouse
image: clickhouse/clickhouse-server:21.12.3.32
restart: always
user: "101:101"
volumes:
- clickhouse-data:/var/lib/clickhouse:delegated
ulimits:
Expand Down
7 changes: 5 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "data-diff"
version = "0.12.0"
version = "0.12.1"
description = "Command-line tool and Python library to efficiently diff rows across two different databases."
readme = "README.md"
requires-python = ">=3.9,<4.0"
Expand All @@ -23,12 +23,13 @@ classifiers = [
"Typing :: Typed",
]
dependencies = [
"setuptools>=69,<71",
"pydantic<2.0",
"dsnparse<0.2.0",
"click==8.1.7",
"rich",
"toml>=0.10.2",
"dbt-core>=1.0.0,<2.0.0",
"dbt-core>=1.7.0,<2.0.0",
"keyring",
"tabulate==0.9.0",
"urllib3<2",
Expand All @@ -50,6 +51,7 @@ trino = ["trino>=0.314.0"]
clickhouse = ["clickhouse-driver"]
vertica = ["vertica-python"]
duckdb = ["duckdb"]
bigquery = ["google-cloud-bigquery"]
all-dbs = [
"preql>=0.2.19",
"mysql-connector-python==8.0.29",
Expand All @@ -63,6 +65,7 @@ all-dbs = [
"clickhouse-driver",
"vertica-python",
"duckdb",
"google-cloud-bigquery",
]

[project.scripts]
Expand Down
6 changes: 5 additions & 1 deletion tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ def run_datadiff_cli(*args):
logging.error(e.stderr)
raise
if stderr:
raise Exception(stderr)
stderr_str = stderr.decode()
if "Traceback" in stderr_str or "Error" in stderr_str or "Exception" in stderr_str:
# Ignore FutureWarning
if "FutureWarning" not in stderr_str:
raise Exception(stderr)
return stdout.splitlines()


Expand Down
Loading
Loading