diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..1928786 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,27 @@ +# Changelog + +## 1.0.0 + +### Breaking Changes + +- **`read_dlo()` and `read_dmo()` now return DataFrames with all-lowercase column names.** + + Column names returned by both `QueryAPIDataCloudReader` and `SFCLIDataCloudReader` are now lowercased to match the column names produced by the deployed Data Cloud environment (e.g., `unitprice__c` instead of `UnitPrice__c`). + + **Why:** In the deployed environment, column names are normalized to lowercase by the underlying Iceberg metadata layer. The local SDK previously returned the original API casing, causing "column does not exist" errors when scripts were deployed. This change aligns local behavior with the cloud. + + **Migration:** Update any column references in your local scripts to use lowercase: + + ```python + # Before + df.withColumn("Description__c", upper(col("Description__c"))) + df.drop("KQ_Id__c") + df["UnitPrice__c"] + + # After + df.withColumn("description__c", upper(col("description__c"))) + df.drop("kq_id__c") + df["unitprice__c"] + ``` + + Scripts already running in Data Cloud are unaffected — the cloud has always returned lowercase column names. 
diff --git a/src/datacustomcode/io/reader/utils.py b/src/datacustomcode/io/reader/utils.py index 737a76c..b8e65e6 100644 --- a/src/datacustomcode/io/reader/utils.py +++ b/src/datacustomcode/io/reader/utils.py @@ -49,5 +49,5 @@ def _pandas_to_spark_schema( spark_type = TimestampType() else: spark_type = PANDAS_TYPE_MAPPING.get(str(dtype), StringType()) - fields.append(StructField(column, spark_type, nullable)) + fields.append(StructField(column.lower(), spark_type, nullable)) return StructType(fields) diff --git a/src/datacustomcode/templates/script/payload/entrypoint.py b/src/datacustomcode/templates/script/payload/entrypoint.py index 9365fb9..10ba1d7 100644 --- a/src/datacustomcode/templates/script/payload/entrypoint.py +++ b/src/datacustomcode/templates/script/payload/entrypoint.py @@ -10,11 +10,11 @@ def main(): df = client.read_dlo("Account_std__dll") # Perform transformations on the DataFrame - df_upper1 = df.withColumn("Description__c", upper(col("Description__c"))) + df_upper1 = df.withColumn("description__c", upper(col("description__c"))) # Drop specific columns related to relationships - df_upper1 = df_upper1.drop("SfdcOrganizationId__c") - df_upper1 = df_upper1.drop("KQ_Id__c") + df_upper1 = df_upper1.drop("sfdcorganizationid__c") + df_upper1 = df_upper1.drop("kq_id__c") # Save the transformed DataFrame dlo_name = "Account_std_copy__dll" diff --git a/tests/io/reader/test_query_api.py b/tests/io/reader/test_query_api.py index 6bb8b5a..8e2e77a 100644 --- a/tests/io/reader/test_query_api.py +++ b/tests/io/reader/test_query_api.py @@ -60,6 +60,22 @@ def test_pandas_to_spark_schema_nullable(self): schema = _pandas_to_spark_schema(df, nullable=False) assert not schema.fields[0].nullable + def test_pandas_to_spark_schema_lowercases_columns(self): + """Column names from the API are lowercased to match Data Cloud.""" + df = pd.DataFrame({"UnitPrice__c": [1.0], "Quantity__c": [2], "Name__c": ["a"]}) + schema = _pandas_to_spark_schema(df) + assert [f.name for f in 
schema.fields] == [ + "unitprice__c", + "quantity__c", + "name__c", + ] + + def test_pandas_to_spark_schema_already_lowercase_is_idempotent(self): + """Already-lowercase column names are returned unchanged.""" + df = pd.DataFrame({"unitprice__c": [1.0], "quantity__c": [2]}) + schema = _pandas_to_spark_schema(df) + assert [f.name for f in schema.fields] == ["unitprice__c", "quantity__c"] + def test_pandas_to_spark_schema_datetime_types(self): """Test conversion of pandas datetime types to Spark TimestampType.""" @@ -147,8 +163,8 @@ def mock_spark_session(self): @pytest.fixture def mock_pandas_dataframe(self): - """Create a sample pandas DataFrame for testing.""" - return pd.DataFrame({"col1": [1, 2], "col2": ["a", "b"]}) + """Sample pandas DataFrame with PascalCase columns, as the QueryAPI returns.""" + return pd.DataFrame({"Col1__c": [1, 2], "Col2__c": ["a", "b"]}) @pytest.fixture def mock_connection(self, mock_pandas_dataframe): @@ -301,3 +317,27 @@ def test_read_dmo_with_custom_row_limit( mock_connection.get_pandas_dataframe.assert_called_once_with( SQL_QUERY_TEMPLATE.format("test_dmo", 25) ) + + def test_read_dlo_schema_is_lowercase( + self, reader_without_init, mock_connection, mock_pandas_dataframe + ): + """read_dlo returns a schema with all-lowercase field names even when the + QueryAPI returns PascalCase column names.""" + reader_without_init._conn = mock_connection + + reader_without_init.read_dlo("test_dlo") + + _, schema_arg = reader_without_init.spark.createDataFrame.call_args[0] + assert all(f.name == f.name.lower() for f in schema_arg.fields) + + def test_read_dmo_schema_is_lowercase( + self, reader_without_init, mock_connection, mock_pandas_dataframe + ): + """read_dmo returns a schema with all-lowercase field names even when the + QueryAPI returns PascalCase column names.""" + reader_without_init._conn = mock_connection + + reader_without_init.read_dmo("test_dmo") + + _, schema_arg = reader_without_init.spark.createDataFrame.call_args[0] + assert 
all(f.name == f.name.lower() for f in schema_arg.fields) diff --git a/tests/io/reader/test_sf_cli.py b/tests/io/reader/test_sf_cli.py index 3a94b0b..a9e4dff 100644 --- a/tests/io/reader/test_sf_cli.py +++ b/tests/io/reader/test_sf_cli.py @@ -311,7 +311,8 @@ def reader(self): @pytest.fixture def sample_df(self): - return pd.DataFrame({"id": [1, 2], "name": ["a", "b"]}) + """DataFrame with PascalCase columns, as the REST API metadata returns.""" + return pd.DataFrame({"Id__c": [1, 2], "Name__c": ["a", "b"]}) @pytest.mark.parametrize( "method,obj_name", @@ -348,6 +349,17 @@ def test_auto_infers_schema_when_none_given(self, reader, sample_df, method): _, schema_arg = reader.spark.createDataFrame.call_args[0] assert isinstance(schema_arg, StructType) + @pytest.mark.parametrize("method", ["read_dlo", "read_dmo"]) + def test_auto_infers_schema_lowercases_pascal_case_columns( + self, reader, sample_df, method + ): + """Schema is lowercased so local results match Data Cloud column names.""" + with patch.object(reader, "_execute_query", return_value=sample_df): + getattr(reader, method)("SomeObj") + + _, schema_arg = reader.spark.createDataFrame.call_args[0] + assert all(f.name == f.name.lower() for f in schema_arg.fields) + @pytest.mark.parametrize("method", ["read_dlo", "read_dmo"]) def test_uses_provided_schema(self, reader, sample_df, method): from pyspark.sql.types import (