Autorefresh for storm NOAA #1682
base: master
File: README.md

# NOAA Storm Events Import

This directory contains scripts to download, process, and import storm event data from the National Oceanic and Atmospheric Administration (NOAA).
## Scripts

- `download_script.py`: Downloads the raw storm data from the NOAA website.
- `process_noaa_data_withoutapi.py`: Processes the raw CSV files, cleans the data, and prepares it for GeoID resolution.
- `add_geoids_to_storm_data.py`: Adds GeoIDs to the processed storm data and generates the final cleaned CSV file for import.
## Data

- `output/storm_noaa.tmcf`: Template MCF file for the storm data.
- `cleaned_storm_data_with_geoids.csv`: The final cleaned and processed data with GeoIDs, ready for import. This is generated by `add_geoids_to_storm_data.py`.
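The template MCF file itself is not part of this diff. For orientation only, the sketch below shows the general shape a Data Commons template MCF takes for a CSV like this one; the entity name, variable, and column references are illustrative assumptions, not the actual contents of `output/storm_noaa.tmcf`:

```
Node: E:StormNoaa->E0                             # hypothetical entity name
typeOf: dcs:StatVarObservation
variableMeasured: C:StormNoaa->variableMeasured   # assumed column
observationAbout: C:StormNoaa->observationAbout   # column appended by add_geoids_to_storm_data.py
observationDate: C:StormNoaa->observationDate     # assumed column
value: C:StormNoaa->value                         # assumed column
```

Each row of the cleaned CSV then instantiates one observation node via this template.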
## Manifest

- `manifest.json`: Contains the import specifications for the Data Commons import system.
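The manifest contents are not shown in this PR. As a rough, hypothetical sketch only (the JSON field names below are assumptions about the import system's schema, not copied from `manifest.json`), an import specification might tie the pieces above together like this, with scheduling discussed under Autorefresh below:

```json
{
  "import_specifications": [
    {
      "import_name": "NOAA_StormEvents",
      "provenance_url": "https://www.ncdc.noaa.gov/stormevents/",
      "scripts": [
        "download_script.py",
        "process_noaa_data_withoutapi.py",
        "add_geoids_to_storm_data.py"
      ],
      "import_inputs": [
        {
          "template_mcf": "output/storm_noaa.tmcf",
          "cleaned_csv": "cleaned_storm_data_with_geoids.csv"
        }
      ]
    }
  ]
}
```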
## Running the import

To run the full import process, execute the scripts in the following order:

1. `python3 download_script.py`
2. `python3 process_noaa_data_withoutapi.py`
3. `python3 add_geoids_to_storm_data.py`
## Autorefresh

This import is configured for semi-monthly autorefresh.
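A semi-monthly cadence is commonly expressed as a cron-style schedule inside the import specification. The fragment below is only an illustration; both the field name and the exact timing are assumptions, not taken from this PR:

```json
{
  "cron_schedule": "0 5 1,16 * *"
}
```

Read as: run at 05:00 on the 1st and 16th of each month.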
File: add_geoids_to_storm_data.py
import csv
import json
import os
import sys

from absl import app
from absl import logging

# Get the directory of the current script to define a project root.
_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.abspath(os.path.join(_MODULE_DIR, '..', '..', '..'))

# Add project root and data/util to sys.path for module imports.
sys.path.insert(0, PROJECT_ROOT)
sys.path.insert(0, os.path.join(PROJECT_ROOT, 'data', 'util'))

try:
    # Import file_util for GCS operations and the lat/lng-to-place resolver.
    from data.util import file_util
    from data.util import latlng2place_mapsapi
except ImportError as e:
    logging.fatal(
        f"Failed to import a required utility module: {e}. Ensure that "
        "data/util/file_util.py and data/util/latlng2place_mapsapi.py exist.")
    sys.exit(1)

# --- GCS Configuration ---
GCS_BUCKET_NAME = "unresolved_mcf"
GCS_INPUT_PREFIX = "storms/latest"
LOCATION_MASTER_FILENAME = "location_full_list.csv"
API_KEY_FILENAME = "api_key.json"

# Local working directory for downloaded files.
LOCAL_WORKING_DIR = os.path.join(_MODULE_DIR, 'gcs_input')


def setup_local_directories(directory_path: str):
    """Ensures that the specified local directory exists."""
    try:
        os.makedirs(directory_path, exist_ok=True)
        logging.info(f"Created/ensured local working directory: {directory_path}")
    except OSError as e:
        logging.fatal(f"Error creating directory {directory_path}: {e}")


def download_gcs_file(gcs_bucket: str, gcs_prefix: str, filename: str,
                      local_target_dir: str) -> str:
    """Downloads a file from GCS using file_util.

    Returns the local path to the downloaded file, or None on failure.
    """
    logging.info(f"--- Starting GCS file download for {filename} ---")
    gcs_source_path = f"gs://{gcs_bucket}/{gcs_prefix}/{filename}"
    local_destination_path = os.path.join(local_target_dir, filename)

    try:
        file_util.file_copy(gcs_source_path, local_destination_path)
        logging.info(
            f"Successfully copied '{gcs_source_path}' to '{local_destination_path}'")
        return local_destination_path
    except Exception as e:
        logging.fatal(
            f"Error copying '{gcs_source_path}' to '{local_destination_path}': {e}")
        return None


def get_geoid_from_api(lat: str, lon: str, resolver) -> str:
    """Queries the Maps API to find the geoid for the given coordinates.

    Returns the geoid if found, otherwise "NOT_FOUND".
    """
    if not lat or not lon:
        logging.warning("Missing lat/lon for API lookup.")
        return "NOT_FOUND"

    try:
        geoids = resolver.resolve(lat, lon)
        if geoids:
            return geoids[0]  # Return the first resolved geoid.
        logging.warning(f"API resolver returned no geoids for lat/lon: {lat}, {lon}")
        return "NOT_FOUND"
    except Exception as e:
        logging.error(f"Error querying API for coordinates '{lat}, {lon}': {e}")
        return "NOT_FOUND"


def process_storm_data(location_master_file, storm_data_file, output_file, resolver):
    """Processes storm data by adding an 'observationAbout' column with geoid information.

    If a geoid is not found in the master file, the Maps API is queried using
    BEGIN_LAT and BEGIN_LON. Rows without a resolvable geoid are dropped.

    Args:
        location_master_file (str): Path to the location master CSV file.
        storm_data_file (str): Path to the cleaned storm data CSV file.
        output_file (str): Path to the output CSV file.
        resolver: An initialized latlng2place_mapsapi.Resolver object.
    """
    # Load location master data into a dictionary keyed by location string.
    location_to_geoid = {}
    with open(location_master_file, 'r', encoding='utf-8') as f:
        reader = csv.reader(f)
        next(reader)  # Skip header.
        for row in reader:
            if len(row) >= 2:
                location_to_geoid[row[0]] = row[1]

    # Process storm data and add the observationAbout column.
    with open(storm_data_file, 'r', encoding='utf-8') as infile, \
            open(output_file, 'w', newline='', encoding='utf-8') as outfile:
        reader = csv.reader(infile)
        writer = csv.writer(outfile)

        try:
            header = next(reader)
            try:
                begin_lat_idx = header.index("BEGIN_LAT")
                begin_lon_idx = header.index("BEGIN_LON")
            except ValueError as e:
                logging.error(f"Missing required column in storm data file: {e}")
                return

            writer.writerow(header + ["observationAbout"])

            for row in reader:
                if len(row) > begin_lat_idx and len(row) > begin_lon_idx:
                    lat = row[begin_lat_idx]
                    lon = row[begin_lon_idx]

                    # The key for the master file is a formatted string.
                    location_key = f"[LatLong {lat} {lon}]"
                    geoid = location_to_geoid.get(location_key)

                    if not geoid:
                        geoid = get_geoid_from_api(lat, lon, resolver)

                    if geoid and geoid != "NOT_FOUND":
                        writer.writerow(row + [geoid])
                    else:
                        logging.warning(
                            f"Geoid not found for location: '{lat}, {lon}'. Dropping row.")
                else:
                    logging.warning(f"Row is shorter than expected. Dropping row: {row}")
        except StopIteration:
            logging.warning("Storm data file is empty.")


def main(argv):
    """Main function to orchestrate the data processing."""
    del argv  # Unused.

    setup_local_directories(LOCAL_WORKING_DIR)

    # Download the API key and set it for the resolver.
    local_api_key_path = download_gcs_file(
        gcs_bucket=GCS_BUCKET_NAME,
        gcs_prefix=GCS_INPUT_PREFIX,
        filename=API_KEY_FILENAME,
        local_target_dir=LOCAL_WORKING_DIR)

    api_key = None
    if local_api_key_path:
        with open(local_api_key_path, 'r') as f:
            api_key_data = json.load(f)
            api_key = api_key_data.get('api-key')

    if not api_key:
        logging.fatal("Could not load the API key. Exiting.")
        return

    # Initialize the resolver.
    resolver = latlng2place_mapsapi.Resolver(api_key=api_key)

    # Download the location master file.
    local_location_master_path = download_gcs_file(
        gcs_bucket=GCS_BUCKET_NAME,
        gcs_prefix=GCS_INPUT_PREFIX,
        filename=LOCATION_MASTER_FILENAME,
        local_target_dir=LOCAL_WORKING_DIR)

    if local_location_master_path:
        # Cleaned storm data input and the final output path.
        cleaned_storm_data_path = 'noaa_storm_output/cleaned_storm_data.csv'
        output_path = 'cleaned_storm_data_with_geoids.csv'

        process_storm_data(local_location_master_path, cleaned_storm_data_path,
                           output_path, resolver)
        logging.info(f"Processing complete. Output written to {output_path}")


if __name__ == "__main__":
    app.run(main)
File: download_script.py
import gzip
import os
import re
import shutil
import sys
import time
from ftplib import FTP, error_perm

from absl import logging

# Add the project root directory to the Python path.
_SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
_PROJECT_ROOT = os.path.abspath(os.path.join(_SCRIPTS_DIR, '..', '..'))
sys.path.append(_PROJECT_ROOT)
sys.path.append(os.path.join(_PROJECT_ROOT, 'util'))
from util import file_util

DATA_DIR = "noaa_storm_data"
FTP_HOST = "ftp.ncdc.noaa.gov"
FTP_PATH = "/pub/data/swdi/stormevents/csvfiles/"
DOWNLOAD_RETRIES = 10
INITIAL_RETRY_DELAY_SECONDS = 5


def get_existing_files(data_dir):
    """Returns a set of existing CSV filenames in the data directory."""
    existing_files = set()
    for filename in os.listdir(data_dir):
        if filename.endswith('.csv'):
            existing_files.add(filename)
    return existing_files


def download_and_unzip_data(data_dir, existing_files):
    """Downloads and unzips the NOAA storm data using ftplib, retrying individual file downloads."""
    logging.info(f"Connecting to FTP server: {FTP_HOST}...")
    try:
        with FTP(FTP_HOST, timeout=120) as ftp:
            ftp.login()
            ftp.cwd(FTP_PATH)
            filenames = [f for f in ftp.nlst() if f.endswith('.csv.gz')]
            logging.info(f"Found {len(filenames)} files in {FTP_PATH}.")
            downloaded_count = 0
            failed_downloads = []

            for filename in filenames:
                local_gz_path = os.path.join(data_dir, filename)
                local_csv_path = local_gz_path[:-3]  # Remove the .gz extension.

                # existing_files holds bare filenames, so compare against the
                # basename rather than the full local path (per review feedback
                # that the original existing-file check was incorrect).
                if os.path.basename(local_csv_path) in existing_files:
                    logging.info(f"Skipping {filename} as it already exists.")
                    downloaded_count += 1
                    continue

                delay = INITIAL_RETRY_DELAY_SECONDS
                for attempt in range(DOWNLOAD_RETRIES):
                    try:
                        logging.info(
                            f"Downloading {filename} (attempt {attempt + 1}/{DOWNLOAD_RETRIES})...")
                        with file_util.FileIO(local_gz_path, 'wb') as f:
                            ftp.retrbinary(f"RETR {filename}", f.write)

                        logging.info(f"Unzipping {filename}...")
                        with gzip.open(local_gz_path, 'rb') as f_in:
                            with file_util.FileIO(local_csv_path, 'wb') as f_out:
                                shutil.copyfileobj(f_in, f_out)

                        os.remove(local_gz_path)  # Clean up the gzipped file.
                        downloaded_count += 1
                        break  # Success; stop retrying.
                    except (TimeoutError, error_perm) as e:
                        logging.warning(f"Attempt {attempt + 1} failed for {filename}: {e}")
                        if attempt < DOWNLOAD_RETRIES - 1:
                            logging.info(f"Retrying in {delay} seconds...")
                            time.sleep(delay)
                            delay *= 2  # Exponential backoff.
                        else:
                            logging.error(
                                f"Failed to download {filename} after {DOWNLOAD_RETRIES} attempts.")
                            failed_downloads.append(filename)
                            if os.path.exists(local_gz_path):
                                os.remove(local_gz_path)

            if downloaded_count != len(filenames):
                logging.error(
                    f"Download incomplete. Expected {len(filenames)} files, "
                    f"but only downloaded {downloaded_count}.")
                if failed_downloads:
                    logging.error(
                        f"The following files failed to download: {', '.join(failed_downloads)}")
                raise RuntimeError("Failed to download files after multiple retries.")
            else:
                logging.info("Download and extraction completed successfully.")

    except error_perm as e:
        logging.error(f"FTP permission error: {e}")
        logging.error("Please check your FTP credentials and permissions.")
    except TimeoutError:
        logging.error(f"Connection to {FTP_HOST} timed out.")
        logging.error(
            "This could be due to a network issue, a firewall blocking the "
            "connection, or the server being temporarily unavailable.")
        logging.error("You can try running the script again later or manually downloading the data from:")
        logging.error(f"ftp://{FTP_HOST}{FTP_PATH}")
    except Exception as e:
        logging.error(f"An unexpected error occurred during the FTP process: {e}")
        raise  # Re-raise so an incomplete download fails the pipeline.


def create_missing_location_files(data_dir):
    """Creates empty location files for years that have details files but no location files."""
    logging.info("Checking for and creating missing location files...")
    header = ("YEARMONTH,EPISODE_ID,EVENT_ID,LOCATION_INDEX,RANGE,AZIMUTH,"
              "LOCATION,LATITUDE,LONGITUDE,LAT2,LON2")

    details_files = file_util.file_get_matching(
        os.path.join(data_dir, "StormEvents_details*_d*.csv"))
    locations_files = file_util.file_get_matching(
        os.path.join(data_dir, "StormEvents_locations*_d*.csv"))

    def extract_year(filename):
        match = re.search(r"_d(\d{4})_", filename)
        return match.group(1) if match else None

    detail_years = {extract_year(f) for f in details_files if extract_year(f)}
    location_years = {extract_year(f) for f in locations_files if extract_year(f)}

    missing_years = detail_years - location_years

    for year in sorted(missing_years):
        location_filename = os.path.join(
            data_dir, f"StormEvents_locations_d{year}_generated.csv")
        with file_util.FileIO(location_filename, "w") as f:
            f.write(header + "\n")
        logging.info(f"Created missing locations file: {location_filename}")

    logging.info("Missing location file check complete.")


if __name__ == "__main__":
    logging.set_verbosity(logging.INFO)
    if not os.path.exists(DATA_DIR):
        os.makedirs(DATA_DIR)

    existing_files = get_existing_files(DATA_DIR)
    download_and_unzip_data(DATA_DIR, existing_files)
    create_missing_location_files(DATA_DIR)