Autorefresh for storm Noaa #1682
base: master
Conversation
Summary of Changes

Hello @Megha18jain, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request establishes an automated and robust pipeline for ingesting NOAA storm event data. It covers the entire lifecycle: from fetching raw data via FTP and organizing it into smaller, processable units, to cleaning and transforming it into a structured format. The inclusion of a manifest file ensures that this critical data source can be regularly updated, providing timely and accurate storm information.

Highlights
Code Review
This pull request introduces a new data import pipeline for NOAA storm data, including scripts for downloading, sharding, and processing. The implementation is comprehensive and includes good features like parallel processing and download retries. However, I've identified several critical issues related to data correctness and schema mapping, such as typos in the TMCF and CSV header, a bug that would cause crop_damage to be stored as a tuple, and an issue preventing the script from correctly skipping already downloaded files. Addressing these is crucial for the pipeline to function correctly. I've also included some suggestions to improve code robustness and maintainability.
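The summary above credits the pipeline with download retries. The PR's own retry code is not quoted here, so as a point of reference, a minimal retry helper along these lines (all names hypothetical, not taken from the PR) might look like:

```python
import logging
import time


def download_with_retries(fetch_fn, attempts=3, delay=1.0):
    """Call fetch_fn(), retrying with a fixed delay on failure.

    fetch_fn is a hypothetical zero-argument callable that performs one
    download and raises OSError on failure; names are illustrative only.
    """
    for attempt in range(1, attempts + 1):
        try:
            return fetch_fn()
        except OSError as e:
            logging.warning("Attempt %d/%d failed: %s", attempt, attempts, e)
            if attempt == attempts:
                raise
            time.sleep(delay)
```

A real FTP downloader would pass a closure over the remote path and local destination as `fetch_fn`.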
```python
local_gz_path = os.path.join(data_dir, filename)
local_csv_path = local_gz_path[:-3]  # Remove .gz extension

if local_csv_path in existing_files:
```
The check for existing files is incorrect. local_csv_path is a full file path (e.g., noaa_storm_data/some_file.csv), while existing_files contains only file basenames (e.g., {'some_file.csv'}). The in check will therefore always evaluate to false, causing files to be re-downloaded on every run. You should check against the filename without the .gz extension.
```diff
- if local_csv_path in existing_files:
+ if filename[:-3] in existing_files:
```
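To see why the original check never matches, compare a full path against a set of basenames (the directory and file names below are made up for illustration):

```python
import os

existing_files = {"StormEvents_2020.csv"}  # basenames only, as built by the PR
filename = "StormEvents_2020.csv.gz"
local_csv_path = os.path.join("noaa_storm_data", filename)[:-3]

print(local_csv_path in existing_files)  # False: a full path never matches a basename
print(filename[:-3] in existing_files)   # True: the corrected check compares basenames
```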
```
Node: E:storm_noaa->E0
dcid: C:storm_noaa->stormEpisode
typeOf: dcs:StormEpisode
name: C:storm_noaa->stromEpisodeName
```
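The body of this review comment is missing from the capture, but the review summary flags typos in the TMCF, and `stromEpisodeName` looks like a misspelling of `stormEpisodeName` (the matching CSV header would need the same fix). A presumed correction:

```
name: C:storm_noaa->stormEpisodeName
```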
```
indirectInjuries C:storm_noaa->indirectInjuries
directDeaths C:storm_noaa->directDeaths
```
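The comment body is missing here as well, but these quoted lines lack the colon separator after each property name that the surrounding TMCF lines use (`property: value`). Assuming that is what the reviewer flagged, the corrected form would be:

```
indirectInjuries: C:storm_noaa->indirectInjuries
directDeaths: C:storm_noaa->directDeaths
```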
```
maxClassification: C:storm_noaa->maxClassification
lengthTraveled: C:storm_noaa->lengthTraveled
width: C:storm_noaa->width
lengthTraveled: C:storm_noaa->lengthTraveled
```
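The comment body is missing, but the issue in the quoted block is visible: `lengthTraveled` is mapped twice in the same node. Assuming that is what the reviewer flagged, the duplicate line can be dropped:

```
maxClassification: C:storm_noaa->maxClassification
lengthTraveled: C:storm_noaa->lengthTraveled
width: C:storm_noaa->width
```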
```python
startLocation, endLocation = start_loc_str, end_loc_str

property_damage = cost_to_int(row.get("DAMAGE_PROPERTY", "0"))
crop_damage = cost_to_int(row.get("DAMAGE_CROPS", "0")),
```
A trailing comma at the end of this line causes crop_damage to be a tuple (e.g., (1000,)) instead of an integer. This will result in incorrectly formatted data in the output CSV. Please remove the comma.
```diff
- crop_damage = cost_to_int(row.get("DAMAGE_CROPS", "0")),
+ crop_damage = cost_to_int(row.get("DAMAGE_CROPS", "0"))
```
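The trailing-comma pitfall is easy to reproduce in isolation: a bare trailing comma after an expression turns the assignment into a one-element tuple.

```python
crop_damage = int("1000"),   # trailing comma: right-hand side is a 1-tuple
print(crop_damage)           # (1000,)

crop_damage = int("1000")    # without the comma: a plain int
print(crop_damage)           # 1000
```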
```python
property_damage_value = f"[USDollar {property_damage}]" if property_damage else ""
crop_damage_value = f"[USDollar {crop_damage}]" if crop_damage else ""

unit = "MilesPerHour" if row.get("MAGNitude_TYPE") else "Inch"
```
There is a typo in the dictionary key: MAGNitude_TYPE should be MAGNITUDE_TYPE. This will cause row.get("MAGNitude_TYPE") to always return None, leading to the unit variable being incorrectly assigned in many cases.
```diff
- unit = "MilesPerHour" if row.get("MAGNitude_TYPE") else "Inch"
+ unit = "MilesPerHour" if row.get("MAGNITUDE_TYPE") else "Inch"
```
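Dictionary lookups are case-sensitive, so the misspelled key silently returns `None` rather than raising, which is why this bug is easy to miss. A quick illustration with a made-up row:

```python
row = {"MAGNITUDE_TYPE": "EG"}        # hypothetical CSV row

print(row.get("MAGNitude_TYPE"))      # None: the misspelled key never matches
print(row.get("MAGNITUDE_TYPE"))      # EG

# The misspelled lookup is always falsy, so unit would always be "Inch".
unit = "MilesPerHour" if row.get("MAGNitude_TYPE") else "Inch"
print(unit)                           # Inch
```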
```python
try:
    # Check if temp dir is empty before trying to remove it
    if not os.listdir(FLAGS.temp_dir):
        os.rmdir(FLAGS.temp_dir)
        logging.info(f"Successfully removed temporary directory {FLAGS.temp_dir}")
except OSError as e:
    logging.error(f"Error removing temporary directory {FLAGS.temp_dir}: {e}")
```
The logic for cleaning up the temporary directory can be simplified and made more robust. Instead of checking if the directory is empty before removing it (which is prone to race conditions), you can use shutil.rmtree to recursively delete the directory. This is safer and handles cases where some temporary files might not have been deleted due to errors.
```diff
- try:
-     # Check if temp dir is empty before trying to remove it
-     if not os.listdir(FLAGS.temp_dir):
-         os.rmdir(FLAGS.temp_dir)
-         logging.info(f"Successfully removed temporary directory {FLAGS.temp_dir}")
- except OSError as e:
-     logging.error(f"Error removing temporary directory {FLAGS.temp_dir}: {e}")
+ try:
+     shutil.rmtree(FLAGS.temp_dir)
+     logging.info(f"Successfully removed temporary directory {FLAGS.temp_dir}")
+ except OSError as e:
+     logging.error(f"Error removing temporary directory {FLAGS.temp_dir}: {e}")
```
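The difference is easy to demonstrate with a throwaway directory: `os.rmdir` refuses a non-empty directory, while `shutil.rmtree` removes it along with any leftover files.

```python
import os
import shutil
import tempfile

temp_dir = tempfile.mkdtemp()
open(os.path.join(temp_dir, "leftover.csv"), "w").close()

try:
    os.rmdir(temp_dir)             # raises OSError: directory not empty
except OSError as e:
    print(f"os.rmdir failed: {e}")

shutil.rmtree(temp_dir)            # removes the directory and its contents
print(os.path.exists(temp_dir))    # False
```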
```python
if shard_key not in shard_files:
    shard_files[shard_key] = open(out_path, 'w', newline='')
    writer = csv.writer(shard_files[shard_key])
    writer.writerow(header)
    shard_files[shard_key].writer = writer  # Attach writer to file handle
    logging.info(f"Created shard file: {out_path}")

shard_files[shard_key].writer.writerow(row)
```
Attaching a writer attribute to a file object is an unconventional pattern that can be confusing for future maintenance. A more standard and clearer approach would be to store a tuple of (file_handle, writer) in your dictionary. This makes the code easier to understand and maintain.
You would also need to update how you close the files later on line 78 to something like for f, _ in shard_files.values(): f.close().
```diff
- if shard_key not in shard_files:
-     shard_files[shard_key] = open(out_path, 'w', newline='')
-     writer = csv.writer(shard_files[shard_key])
-     writer.writerow(header)
-     shard_files[shard_key].writer = writer  # Attach writer to file handle
-     logging.info(f"Created shard file: {out_path}")
- shard_files[shard_key].writer.writerow(row)
+ if shard_key not in shard_files:
+     outfile = open(out_path, 'w', newline='')
+     writer = csv.writer(outfile)
+     writer.writerow(header)
+     shard_files[shard_key] = (outfile, writer)
+     logging.info(f"Created shard file: {out_path}")
+ shard_files[shard_key][1].writerow(row)
```
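A self-contained sketch of the `(file_handle, writer)` pattern, with `io.StringIO` standing in for real shard files and made-up header and row data:

```python
import csv
import io

header = ["EVENT_ID", "STATE"]
rows = [("2020", ["1", "TX"]), ("2021", ["2", "OK"]), ("2020", ["3", "KS"])]

shard_files = {}
for shard_key, row in rows:
    if shard_key not in shard_files:
        outfile = io.StringIO()  # stands in for open(out_path, 'w', newline='')
        writer = csv.writer(outfile)
        writer.writerow(header)
        shard_files[shard_key] = (outfile, writer)
    shard_files[shard_key][1].writerow(row)

# Closing all handles mirrors the suggestion above:
# for f, _ in shard_files.values(): f.close()
print(shard_files["2020"][0].getvalue())
```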
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
No description provided.