30 changes: 30 additions & 0 deletions statvar_imports/storm_noaa/README.md
@@ -0,0 +1,30 @@
# NOAA Storm Events Import

This directory contains scripts to download, process, and import storm event data from the National Oceanic and Atmospheric Administration (NOAA).

## Scripts

- `download_script.py`: Downloads the raw storm data from the NOAA website.
- `process_noaa_data_withoutapi.py`: Processes the raw CSV files, cleans the data, and prepares it for GeoID resolution.
- `add_geoids_to_storm_data.py`: Adds GeoIDs to the processed storm data and generates the final cleaned CSV file for import.

## Data

- `output/storm_noaa.tmcf`: Template MCF file for the storm data.
- `cleaned_storm_data_with_geoids.csv`: The final cleaned and processed data with GeoIDs, ready for import. This is generated by `add_geoids_to_storm_data.py`.

## Manifest

- `manifest.json`: Contains the import specifications for the Data Commons import system.
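
A minimal sketch of what the import specification in `manifest.json` might look like is shown below. The field names follow the common layout of Data Commons import-automation manifests, but every value here (import name, emails, URL, schedule) is an illustrative assumption rather than the contents of the actual file:

```json
{
  "import_specifications": [
    {
      "import_name": "NOAAStormEvents",
      "curator_emails": ["..."],
      "provenance_url": "https://www.ncdc.noaa.gov/stormevents/",
      "provenance_description": "NOAA Storm Events Database",
      "scripts": [
        "download_script.py",
        "process_noaa_data_withoutapi.py",
        "add_geoids_to_storm_data.py"
      ],
      "import_inputs": [
        {
          "template_mcf": "output/storm_noaa.tmcf",
          "cleaned_csv": "cleaned_storm_data_with_geoids.csv"
        }
      ],
      "cron_schedule": "0 5 1,15 * *"
    }
  ]
}
```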

## Running the import

To run the full import process, execute the scripts in the following order:

1. `python3 download_script.py`
2. `python3 process_noaa_data_withoutapi.py`
3. `python3 add_geoids_to_storm_data.py`

## Autorefresh

This import is configured for semi-monthly autorefresh.
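
Assuming the refresh schedule is driven by the manifest's `cron_schedule` field (as in the sketch above), a semi-monthly refresh would correspond to a cron expression such as `0 5 1,15 * *`, i.e. one run on the 1st and one on the 15th of each month.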
194 changes: 194 additions & 0 deletions statvar_imports/storm_noaa/add_geoids_to_storm_data.py
@@ -0,0 +1,194 @@

import csv
import os
import sys
import json
import re
from absl import logging
from absl import app

# Get the directory of the current script to define a project root
_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.abspath(os.path.join(_MODULE_DIR, '..', '..', '..'))

# Add project root and data/util to sys.path for module imports
sys.path.insert(0, PROJECT_ROOT)
sys.path.insert(0, os.path.join(PROJECT_ROOT, 'data', 'util'))

try:
# Import the file_util module for GCS operations
from data.util import file_util
from data.util import latlng2place_mapsapi
except ImportError as e:
logging.fatal(f"Failed to import a required utility module: {e}. Ensure that data/util/file_util.py and data/util/latlng2place_mapsapi.py exist.")
sys.exit(1)

# --- GCS Configuration ---
GCS_BUCKET_NAME = "unresolved_mcf"
GCS_INPUT_PREFIX = "storms/latest"
LOCATION_MASTER_FILENAME = "location_full_list.csv"
API_KEY_FILENAME = "api_key.json"

# Define the local working directory for downloaded files
LOCAL_WORKING_DIR = os.path.join(_MODULE_DIR, 'gcs_input')


def setup_local_directories(directory_path: str):
"""Ensures that the specified local directory exists."""
try:
os.makedirs(directory_path, exist_ok=True)
logging.info(f"Created/Ensured local working directory: {directory_path}")
except OSError as e:
logging.fatal(f"Error creating directory {directory_path}: {e}")


def download_gcs_file(
gcs_bucket: str,
gcs_prefix: str,
filename: str,
local_target_dir: str
) -> str:
"""
Downloads a file from GCS using file_util.
Returns the local path to the downloaded file.
"""
logging.info(f"--- Starting GCS File Download for {filename} ---")
gcs_source_path = f"gs://{gcs_bucket}/{gcs_prefix}/{filename}"
local_destination_path = os.path.join(local_target_dir, filename)

try:
file_util.file_copy(gcs_source_path, local_destination_path)
logging.info(f"Successfully copied '{gcs_source_path}' to '{local_destination_path}'")
return local_destination_path
except Exception as e:
logging.fatal(f"Error copying '{gcs_source_path}' to '{local_destination_path}': {e}")
return None


def get_geoid_from_api(lat: str, lon: str, resolver) -> str:
"""
Queries the Maps API to find the geoid for the given coordinates.
Returns the geoid if found, otherwise returns "NOT_FOUND".
"""
if not lat or not lon:
logging.warning(f"Missing lat/lon for API lookup.")
return "NOT_FOUND"

try:
geoids = resolver.resolve(lat, lon)
if geoids:
return geoids[0] # Return the first resolved geoid
else:
logging.warning(f"API resolver returned no geoids for lat/lon: {lat}, {lon}")
return "NOT_FOUND"
except Exception as e:
logging.error(f"Error querying API for coordinates '{lat}, {lon}': {e}")
return "NOT_FOUND"


def process_storm_data(location_master_file, storm_data_file, output_file, resolver):
"""
    Processes storm data by adding an 'observationAbout' column with geoid information.
If a geoid is not found in the master file, it queries an external API using BEGIN_LAT and BEGIN_LON.
Rows without a resolvable geoid are dropped.

Args:
location_master_file (str): Path to the location master CSV file.
storm_data_file (str): Path to the cleaned storm data CSV file.
output_file (str): Path to the output CSV file.
resolver: An initialized latlng2place_mapsapi.Resolver object.
"""
# Load location master data into a dictionary
location_to_geoid = {}
with open(location_master_file, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
header = next(reader) # Skip header
for row in reader:
if len(row) >= 2:
location_to_geoid[row[0]] = row[1]

# Process storm data and add observationAbout column
with open(storm_data_file, 'r', encoding='utf-8') as infile, \
open(output_file, 'w', newline='', encoding='utf-8') as outfile:
reader = csv.reader(infile)
writer = csv.writer(outfile)

try:
header = next(reader)
try:
begin_lat_idx = header.index("BEGIN_LAT")
begin_lon_idx = header.index("BEGIN_LON")
except ValueError as e:
logging.error(f"Error: Missing required column in storm data file: {e}")
return

writer.writerow(header + ["observationAbout"])

for row in reader:
if len(row) > begin_lat_idx and len(row) > begin_lon_idx:
lat = row[begin_lat_idx]
lon = row[begin_lon_idx]

# The key for the master file is a formatted string
location_key = f"[LatLong {lat} {lon}]"
geoid = location_to_geoid.get(location_key)

if not geoid:
geoid = get_geoid_from_api(lat, lon, resolver)

if geoid and geoid != "NOT_FOUND":
writer.writerow(row + [geoid])
else:
logging.warning(f"Geoid not found for location: '{lat}, {lon}'. Dropping row.")
else:
logging.warning(f"Row is shorter than expected. Dropping row: {row}")
except StopIteration:
logging.warning("Warning: Storm data file is empty.")



def main(argv):
"""Main function to orchestrate the data processing."""
del argv # Unused argument

setup_local_directories(LOCAL_WORKING_DIR)

# Download the API key and set it for the resolver
local_api_key_path = download_gcs_file(
gcs_bucket=GCS_BUCKET_NAME,
gcs_prefix=GCS_INPUT_PREFIX,
filename=API_KEY_FILENAME,
local_target_dir=LOCAL_WORKING_DIR
)

api_key = None
if local_api_key_path:
with open(local_api_key_path, 'r') as f:
api_key_data = json.load(f)
api_key = api_key_data.get('api-key')

if not api_key:
logging.fatal("Could not load the API key. Exiting.")
return

# Initialize the resolver
resolver = latlng2place_mapsapi.Resolver(api_key=api_key)

# Download the location master file
local_location_master_path = download_gcs_file(
gcs_bucket=GCS_BUCKET_NAME,
gcs_prefix=GCS_INPUT_PREFIX,
filename=LOCATION_MASTER_FILENAME,
local_target_dir=LOCAL_WORKING_DIR
)

if local_location_master_path:
# Define input and output file paths
cleaned_storm_data_path = 'noaa_storm_output/cleaned_storm_data.csv'
output_path = 'cleaned_storm_data_with_geoids.csv'

process_storm_data(local_location_master_path, cleaned_storm_data_path, output_path, resolver)
print(f"Processing complete. Output written to {output_path}")

if __name__ == "__main__":
app.run(main)
146 changes: 146 additions & 0 deletions statvar_imports/storm_noaa/download_script.py
@@ -0,0 +1,146 @@

import os
import shutil
import sys
import gzip
import re
from ftplib import FTP, error_perm
from absl import logging
import time

# Add the project root directory to the Python path.
_SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
_PROJECT_ROOT = os.path.abspath(os.path.join(_SCRIPTS_DIR, '..', '..'))
sys.path.append(_PROJECT_ROOT)
sys.path.append(os.path.join(_PROJECT_ROOT, 'util'))
from util import file_util

DATA_DIR = "noaa_storm_data"
FTP_HOST = "ftp.ncdc.noaa.gov"
FTP_PATH = "/pub/data/swdi/stormevents/csvfiles/"
DOWNLOAD_RETRIES = 10
INITIAL_RETRY_DELAY_SECONDS = 5


def get_existing_files(data_dir):
"""
Returns a set of existing CSV filenames in the data directory.
"""
existing_files = set()
for filename in os.listdir(data_dir):
if filename.endswith('.csv'):
existing_files.add(filename)
return existing_files


def download_and_unzip_data(data_dir, existing_files):
"""
Downloads and unzips the NOAA storm data using ftplib, with retries for individual file downloads.
"""
logging.info(f"Connecting to FTP server: {FTP_HOST}...")
try:
with FTP(FTP_HOST, timeout=120) as ftp:
ftp.login()
ftp.cwd(FTP_PATH)
filenames = [f for f in ftp.nlst() if f.endswith('.csv.gz')]
logging.info(f"Found {len(filenames)} files in {FTP_PATH}.")
downloaded_count = 0
failed_downloads = []

for filename in filenames:
local_gz_path = os.path.join(data_dir, filename)
local_csv_path = local_gz_path[:-3] # Remove .gz extension

if local_csv_path in existing_files:
Reviewer comment (Contributor, severity: high):

The check for existing files is incorrect. `local_csv_path` is a full file path (e.g., `noaa_storm_data/some_file.csv`), while `existing_files` contains only file basenames (e.g., `{'some_file.csv'}`). The `in` check will therefore always evaluate to false, causing files to be re-downloaded on every run. You should check against the filename without the `.gz` extension.

Suggested change:
-if local_csv_path in existing_files:
+if filename[:-3] in existing_files:

logging.info(f"Skipping {filename} as it already exists.")
downloaded_count += 1
continue

delay = INITIAL_RETRY_DELAY_SECONDS
for attempt in range(DOWNLOAD_RETRIES):
try:
logging.info(f"Downloading {filename} (attempt {attempt + 1}/{DOWNLOAD_RETRIES})...")
with file_util.FileIO(local_gz_path, 'wb') as f:
ftp.retrbinary(f"RETR {filename}", f.write)

logging.info(f"Unzipping {filename}...")
with gzip.open(local_gz_path, 'rb') as f_in:
with file_util.FileIO(local_csv_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)

os.remove(local_gz_path) # Clean up the gzipped file
downloaded_count += 1
break # Success, break the retry loop
except (TimeoutError, error_perm) as e:
logging.warning(f"Attempt {attempt + 1} failed for {filename}: {e}")
if attempt < DOWNLOAD_RETRIES - 1:
logging.info(f"Retrying in {delay} seconds...")
time.sleep(delay)
delay *= 2 # Exponentially increase the delay
else:
logging.error(f"Failed to download {filename} after {DOWNLOAD_RETRIES} attempts.")
failed_downloads.append(filename)
if os.path.exists(local_gz_path):
os.remove(local_gz_path)

if downloaded_count != len(filenames):
logging.error(f"Download incomplete. Expected {len(filenames)} files, but only downloaded {downloaded_count}.")
if failed_downloads:
logging.error(f"The following files failed to download: {', '.join(failed_downloads)}")
raise RuntimeError(
"Failed to download files after multiple retries.")
else:
logging.info("Download and extraction completed successfully.")

except error_perm as e:
logging.error(f"FTP permission error: {e}")
logging.error("Please check your FTP credentials and permissions.")
except TimeoutError:
logging.error(f"Connection to {FTP_HOST} timed out.")
logging.error("This could be due to a network issue, a firewall blocking the connection, or the server being temporarily unavailable.")
logging.error("You can try running the script again later or manually downloading the data from:")
logging.error(f"ftp://{FTP_HOST}{FTP_PATH}")
except Exception as e:
logging.error(f"An unexpected error occurred during the FTP process: {e}")


def create_missing_location_files(data_dir):
"""
Creates empty location files for years that have details files but no
location files.
"""
logging.info("Checking for and creating missing location files...")
header = "YEARMONTH,EPISODE_ID,EVENT_ID,LOCATION_INDEX,RANGE,AZIMUTH,LOCATION,LATITUDE,LONGITUDE,LAT2,LON2"

details_files = file_util.file_get_matching(
os.path.join(data_dir, "StormEvents_details*_d*.csv"))
locations_files = file_util.file_get_matching(
os.path.join(data_dir, "StormEvents_locations*_d*.csv"))

def extract_year(filename):
match = re.search(r"_d(\d{4})_", filename)
return match.group(1) if match else None

detail_years = {extract_year(f) for f in details_files if extract_year(f)}
location_years = {extract_year(f) for f in locations_files if extract_year(f)}

missing_years = detail_years - location_years

for year in sorted(list(missing_years)):
location_filename = os.path.join(
data_dir, f"StormEvents_locations_d{year}_generated.csv")
with file_util.FileIO(location_filename, "w") as f:
f.write(header + "\n")
logging.info(f"Created missing locations file: {location_filename}")

logging.info("Missing location file check complete.")


if __name__ == "__main__":
logging.set_verbosity(logging.INFO)
if not os.path.exists(DATA_DIR):
os.makedirs(DATA_DIR)

existing_files = get_existing_files(DATA_DIR)
download_and_unzip_data(DATA_DIR, existing_files)
create_missing_location_files(DATA_DIR)