Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Changelog

## 1.0.0

### Breaking Changes

- **`read_dlo()` and `read_dmo()` now return DataFrames with all-lowercase column names.**

Column names returned by both `QueryAPIDataCloudReader` and `SFCLIDataCloudReader` are now lowercased to match the column names produced by the deployed Data Cloud environment (e.g., `unitprice__c` instead of `UnitPrice__c`).

**Why:** In the deployed environment, column names are normalized to lowercase by the underlying Iceberg metadata layer. The local SDK previously returned the original API casing, causing "column does not exist" errors when scripts were deployed. This change aligns local behavior with the cloud.

**Migration:** Update any column references in your local scripts to use lowercase:

```python
# Before
df.withColumn("Description__c", upper(col("Description__c")))
df.drop("KQ_Id__c")
df["UnitPrice__c"]

# After
df.withColumn("description__c", upper(col("description__c")))
df.drop("kq_id__c")
df["unitprice__c"]
```

Scripts already running in Data Cloud are unaffected — the cloud always returned lowercase column names.
2 changes: 1 addition & 1 deletion src/datacustomcode/io/reader/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,5 @@ def _pandas_to_spark_schema(
spark_type = TimestampType()
else:
spark_type = PANDAS_TYPE_MAPPING.get(str(dtype), StringType())
fields.append(StructField(column, spark_type, nullable))
fields.append(StructField(column.lower(), spark_type, nullable))
return StructType(fields)
6 changes: 3 additions & 3 deletions src/datacustomcode/templates/script/payload/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ def main():
df = client.read_dlo("Account_std__dll")

# Perform transformations on the DataFrame
df_upper1 = df.withColumn("Description__c", upper(col("Description__c")))
df_upper1 = df.withColumn("description__c", upper(col("description__c")))

# Drop specific columns related to relationships
df_upper1 = df_upper1.drop("SfdcOrganizationId__c")
df_upper1 = df_upper1.drop("KQ_Id__c")
df_upper1 = df_upper1.drop("sfdcorganizationid__c")
df_upper1 = df_upper1.drop("kq_id__c")

# Save the transformed DataFrame
dlo_name = "Account_std_copy__dll"
Expand Down
44 changes: 42 additions & 2 deletions tests/io/reader/test_query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ def test_pandas_to_spark_schema_nullable(self):
schema = _pandas_to_spark_schema(df, nullable=False)
assert not schema.fields[0].nullable

def test_pandas_to_spark_schema_lowercases_columns(self):
    """Column names from the API are lowercased to match Data Cloud."""
    frame = pd.DataFrame(
        {"UnitPrice__c": [1.0], "Quantity__c": [2], "Name__c": ["a"]}
    )
    schema = _pandas_to_spark_schema(frame)
    # Every PascalCase API column must come back lowercased, order preserved.
    expected = ["unitprice__c", "quantity__c", "name__c"]
    assert [field.name for field in schema.fields] == expected

def test_pandas_to_spark_schema_already_lowercase_is_idempotent(self):
    """Already-lowercase column names are returned unchanged."""
    frame = pd.DataFrame({"unitprice__c": [1.0], "quantity__c": [2]})
    schema = _pandas_to_spark_schema(frame)
    # Lowercasing must be a no-op on names that are already lowercase.
    assert [field.name for field in schema.fields] == [
        "unitprice__c",
        "quantity__c",
    ]

def test_pandas_to_spark_schema_datetime_types(self):
"""Test conversion of pandas datetime types to Spark TimestampType."""

Expand Down Expand Up @@ -147,8 +163,8 @@ def mock_spark_session(self):

@pytest.fixture
def mock_pandas_dataframe(self):
"""Create a sample pandas DataFrame for testing."""
return pd.DataFrame({"col1": [1, 2], "col2": ["a", "b"]})
"""Sample pandas DataFrame with PascalCase columns, as the QueryAPI returns."""
return pd.DataFrame({"Col1__c": [1, 2], "Col2__c": ["a", "b"]})

@pytest.fixture
def mock_connection(self, mock_pandas_dataframe):
Expand Down Expand Up @@ -301,3 +317,27 @@ def test_read_dmo_with_custom_row_limit(
mock_connection.get_pandas_dataframe.assert_called_once_with(
SQL_QUERY_TEMPLATE.format("test_dmo", 25)
)

def test_read_dlo_schema_is_lowercase(
    self, reader_without_init, mock_connection, mock_pandas_dataframe
):
    """read_dlo returns a schema with all-lowercase field names even when the
    QueryAPI returns PascalCase column names."""
    reader_without_init._conn = mock_connection

    reader_without_init.read_dlo("test_dlo")

    # Inspect the schema handed to createDataFrame: no field name may
    # differ from its own lowercased form.
    _, schema = reader_without_init.spark.createDataFrame.call_args[0]
    names = [field.name for field in schema.fields]
    assert names == [name.lower() for name in names]

def test_read_dmo_schema_is_lowercase(
    self, reader_without_init, mock_connection, mock_pandas_dataframe
):
    """read_dmo returns a schema with all-lowercase field names even when the
    QueryAPI returns PascalCase column names."""
    reader_without_init._conn = mock_connection

    reader_without_init.read_dmo("test_dmo")

    # Inspect the schema handed to createDataFrame: no field name may
    # differ from its own lowercased form.
    _, schema = reader_without_init.spark.createDataFrame.call_args[0]
    names = [field.name for field in schema.fields]
    assert names == [name.lower() for name in names]
14 changes: 13 additions & 1 deletion tests/io/reader/test_sf_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ def reader(self):

@pytest.fixture
def sample_df(self):
return pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
"""DataFrame with PascalCase columns, as the REST API metadata returns."""
return pd.DataFrame({"Id__c": [1, 2], "Name__c": ["a", "b"]})

@pytest.mark.parametrize(
"method,obj_name",
Expand Down Expand Up @@ -348,6 +349,17 @@ def test_auto_infers_schema_when_none_given(self, reader, sample_df, method):
_, schema_arg = reader.spark.createDataFrame.call_args[0]
assert isinstance(schema_arg, StructType)

@pytest.mark.parametrize("method", ["read_dlo", "read_dmo"])
def test_auto_infers_schema_lowercases_pascal_case_columns(
    self, reader, sample_df, method
):
    """Schema is lowercased so local results match Data Cloud column names."""
    read_fn = getattr(reader, method)
    with patch.object(reader, "_execute_query", return_value=sample_df):
        read_fn("SomeObj")

    # The inferred schema passed to createDataFrame must contain only
    # lowercase field names.
    _, schema = reader.spark.createDataFrame.call_args[0]
    assert all(field.name == field.name.lower() for field in schema.fields)

@pytest.mark.parametrize("method", ["read_dlo", "read_dmo"])
def test_uses_provided_schema(self, reader, sample_df, method):
from pyspark.sql.types import (
Expand Down
Loading