-
Notifications
You must be signed in to change notification settings - Fork 75
[SYNPY-1749]Allow quote, apostrophe and ellipsis in store_row_async #1316
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
3feba11
1c68dac
4a29a16
3ecb6ec
af989c0
4d06d3a
e1b20dc
7ef7110
a4913a6
98689d3
a0af1b6
5002bd6
c874fe4
dab80f0
8644201
3412534
7db7c85
8a75043
d00b30b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,6 +13,7 @@ | |
| from io import BytesIO | ||
| from typing import Any, Dict, List, Optional, Protocol, Tuple, Union | ||
|
|
||
| import pandas as pd | ||
| from tqdm import tqdm | ||
| from tqdm.contrib.logging import logging_redirect_tqdm | ||
| from typing_extensions import Self | ||
|
|
@@ -138,9 +139,10 @@ def row_labels_from_rows(rows: List[Row]) -> List[Row]: | |
| ) | ||
|
|
||
|
|
||
| def convert_dtypes_to_json_serializable(df): | ||
| def convert_dtypes_to_json_serializable(df) -> pd.DataFrame: | ||
| """ | ||
| Convert the dtypes of the int64 and float64 columns to object columns which are JSON serializable types. | ||
| Replace both Ellipsis and pandas NA within nested structures which are not JSON serializable types. | ||
| Also, convert the ROW_ID, ROW_VERSION, and ROW_ID.1 columns to int columns which are JSON serializable types. | ||
| Arguments: | ||
| df: The dataframe to convert the dtypes of. | ||
|
|
@@ -163,16 +165,63 @@ def convert_dtypes_to_json_serializable(df): | |
| "datetime_list_col": [[datetime(2021, 1, 1), datetime(2021, 1, 2), datetime(2021, 1, 3)], [datetime(2021, 1, 4), datetime(2021, 1, 5), datetime(2021, 1, 6)], None, [datetime(2021, 1, 7), datetime(2021, 1, 8), datetime(2021, 1, 9)]], | ||
| "entityid_list_col": [["syn123", "syn456", None], ["syn101", "syn102", "syn103"], None, ["syn104", "syn105", "syn106"]], | ||
| "userid_list_col": [["user1", "user2", "user3"], ["user4", "user5", None], None, ["user7", "user8", "user9"]], | ||
| "json_col_with_quotes": [ | ||
| { | ||
| "id": 1, | ||
| "description": 'Text with "quotes" in the description field', | ||
| "references": [] | ||
| }, | ||
| { | ||
| "id": 2, | ||
| "description": 'Another description with "quoted text" here', | ||
| "references": ["ref1", "ref2"] | ||
| }, | ||
| { | ||
| "id": 3, | ||
| "description": 'Description containing "multiple" quoted "words"', | ||
| "references": [...] | ||
| }, | ||
| { | ||
| "id": 4, | ||
| "description": 'Description containing apostrophes sage\'s', | ||
| "references": [...] | ||
| } | ||
|
|
||
| ], | ||
| }).convert_dtypes() | ||
| df = convert_dtypes_to_json_serializable(df) | ||
| print(df) | ||
| """ | ||
| import pandas as pd | ||
|
|
||
| def _serialize_json_value(x): | ||
| if isinstance(x, (list, dict)): | ||
|
|
||
| def _reformat_special_values(obj): | ||
| if obj is ...: | ||
| return "..." | ||
| if obj is pd.NA: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I tried: and then I saw: As you could see, |
||
| return None | ||
| if isinstance(obj, dict): | ||
| return {k: _reformat_special_values(v) for k, v in obj.items()} | ||
| if isinstance(obj, list): | ||
| return [_reformat_special_values(item) for item in obj] | ||
| return obj | ||
|
|
||
| cleaned_x = _reformat_special_values(x) | ||
| return cleaned_x | ||
| # Handle standalone ellipsis | ||
| if x is ...: | ||
| return "..." | ||
| return x | ||
|
|
||
| for col in df.columns: | ||
| df[col] = ( | ||
| df[col].replace({pd.NA: None}).astype(object) | ||
| ) # this will convert the int64 and float64 columns to object columns | ||
| sample_values = df[col].dropna() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
I tried ^this and all the unit tests could pass. |
||
| if len(sample_values): | ||
| df[col] = df[col].apply(_serialize_json_value) | ||
| # restore the original values of the column especially for the int64 and float64 columns since apply function changes the dtype | ||
| df[col] = df[col].convert_dtypes() | ||
| df[col] = df[col].replace({pd.NA: None}).astype(object) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. the
I think it might be cleaner to have
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As an example, the I am not sure if this is intended. |
||
|
|
||
| # Convert ROW_ prefixed columns back to int (like ROW_ID, ROW_VERSION) | ||
| if col in [ | ||
| "ROW_ID", | ||
|
|
@@ -2809,7 +2858,6 @@ async def main(): | |
| timeout=timeout, | ||
| synapse_client=synapse_client, | ||
| ) | ||
|
|
||
| if download_location: | ||
| return csv_path | ||
|
|
||
|
|
@@ -3168,7 +3216,9 @@ async def store_rows_async( | |
| insert_size_bytes: int = 900 * MB, | ||
| csv_table_descriptor: Optional[CsvTableDescriptor] = None, | ||
| read_csv_kwargs: Optional[Dict[str, Any]] = None, | ||
| to_csv_kwargs: Optional[Dict[str, Any]] = None, | ||
| to_csv_kwargs: Optional[Dict[str, Any]] = { | ||
| "escapechar": "\\", | ||
| }, | ||
| job_timeout: int = 600, | ||
| synapse_client: Optional[Synapse] = None, | ||
| ) -> None: | ||
|
|
@@ -3387,7 +3437,8 @@ async def store_rows_async( | |
| function when writing the data to a CSV file. This is only used when | ||
| the `values` argument is a Pandas DataFrame. See | ||
| <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html> | ||
| for complete list of supported arguments. | ||
| for complete list of supported arguments. The default is {"escapechar": "\\"}. | ||
| Ensure escapechar="\\" is set, along with other relevant kwargs, if your data contains double quotes. | ||
|
|
||
| job_timeout: The maximum amount of time to wait for a job to complete. | ||
| This is used when inserting, and updating rows of data. Each individual | ||
|
|
@@ -3786,6 +3837,7 @@ async def _stream_and_update_from_df( | |
| "AppendableRowSetRequest", | ||
| ] | ||
| ] = None, | ||
| to_csv_kwargs: Optional[Dict[str, Any]] = None, | ||
| ) -> None: | ||
| """ | ||
| Organize the process of reading in and uploading parts of the DataFrame we are | ||
|
|
@@ -3816,6 +3868,8 @@ async def _stream_and_update_from_df( | |
| being uploaded. | ||
| changes: Additional changes to the table that should | ||
| execute within this transaction. | ||
| to_csv_kwargs: Additional arguments to pass to the `pd.DataFrame.to_csv` | ||
| function when writing the data to a CSV file. | ||
| """ | ||
| file_handle_id = await multipart_upload_dataframe_async( | ||
| syn=client, | ||
|
|
@@ -3828,6 +3882,7 @@ async def _stream_and_update_from_df( | |
| line_start=line_start, | ||
| line_end=line_end, | ||
| bytes_to_prepend=header, | ||
| to_csv_kwargs=to_csv_kwargs, | ||
| ) | ||
| # We are using a semaphore here because large tables can take a very long time | ||
| # for the update to complete. This will allow us to wait for the update to | ||
|
|
@@ -4031,8 +4086,8 @@ async def _chunk_and_upload_df( | |
| to_csv_kwargs: Additional arguments to pass to the `pd.DataFrame.to_csv` | ||
| function when writing the data to a CSV file. | ||
| """ | ||
| df = convert_dtypes_to_json_serializable(df) | ||
danlu1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # Loop over the rows of the DF to determine the size/boundaries we'll be uploading | ||
|
|
||
| chunks_to_upload = [] | ||
| size_of_chunk = 0 | ||
| buffer = BytesIO() | ||
|
|
@@ -4142,6 +4197,7 @@ async def _chunk_and_upload_df( | |
| header=header_line, | ||
| changes=changes, | ||
| file_suffix=f"{part}", | ||
| to_csv_kwargs=to_csv_kwargs, | ||
| ) | ||
| ) | ||
| ) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we still need to call
.convert_dtypes()here? sinceconvert_dtypes_to_json_serializablealso callsconvert_dtypesin one of the steps?