r"""Utility functions that are used across modules.
Ensure these do not reference any function from colors.py. If a function does need to reference another function from colors.py,
add it in that file.
Public utility functions should be added to documentation in the '/docs/_templates/overrides/tropycal.utils.rst' file."""
import shapely.geometry as sgeom
import math
import numpy as np
from datetime import datetime as dt
import requests
import urllib
import warnings
import scipy.interpolate as interp
import re
import shapefile
import zipfile
from io import BytesIO
from .. import constants
# ===========================================================================================================
# Public utilities
# These are used internally and have use externally. Add these to documentation.
# ===========================================================================================================
[docs]def wind_to_category(wind_speed):
r"""
Convert sustained wind speed in knots to Saffir-Simpson Hurricane Wind Scale category.
Parameters
----------
wind_speed : int
Sustained wind speed in knots.
Returns
-------
int
Category corresponding to the sustained wind. 0 is tropical storm, -1 is tropical depression.
"""
# NaN handling
if np.isnan(wind_speed):
return np.nan
# Category 5 hurricane
if wind_speed >= 137:
return 5
# Category 4 hurricane
elif wind_speed >= 113:
return 4
# Category 3 hurricane
elif wind_speed >= 96:
return 3
# Category 2 hurricane
elif wind_speed >= 83:
return 2
# Category 1 hurricane
elif wind_speed >= 64:
return 1
# Tropical storm
elif wind_speed >= 34:
return 0
# Tropical depression
else:
return -1
[docs]def category_to_wind(category):
r"""
Convert Saffir-Simpson Hurricane Wind Scale category to minimum threshold sustained wind speed in knots.
Parameters
----------
category : int
Saffir-Simpson Hurricane Wind Scale category. Use 0 for tropical storm, -1 for tropical depression.
Returns
-------
int
Sustained wind speed in knots corresponding to the minimum threshold of the requested category.
"""
# Construct dictionary of thresholds
conversion = {-1: 5,
0: 34,
1: 64,
2: 83,
3: 96,
4: 113,
5: 137}
# Return category
return conversion.get(category, np.nan)
[docs]def classify_subtropical(storm_type):
r"""
Check whether a tropical cyclone was purely subtropical.
Parameters
----------
storm_type : list or numpy.ndarray
List or array containing storm types.
Returns
-------
bool
Boolean identifying whether the tropical cyclone was purely subtropical.
"""
# Ensure storm_type is a numpy array
storm_type_check = np.array(storm_type)
# Ensure all storm types are uppercase
storm_track_check = [i.upper() for i in storm_type_check]
# Check for subtropical depression status
if 'SD' in storm_type_check:
if 'SD' in storm_type_check and True not in np.isin(storm_track_check, ['TD', 'TS', 'HU']):
return True
# Check for subtropical storm status
if 'SS' in storm_type_check and True not in np.isin(storm_track_check, ['TD', 'TS', 'HU']):
return True
# Otherwise, it was a tropical cyclone at some point in its life cycle
else:
return False
[docs]def get_storm_classification(wind_speed, subtropical_flag, basin):
r"""
Retrieve the tropical cyclone classification given its subtropical status and current basin.
These strings take the format of "Tropical Storm", "Hurricane", "Typhoon", etc.
Parameters
----------
wind_speed : int
Integer denoting sustained wind speed in knots.
subtropical_flag : bool
Boolean denoting whether the cyclone is subtropical or not.
basin : str
String denoting basin in which the tropical cyclone is located.
Returns
-------
str
String denoting the classification of the tropical cyclone.
Notes
-----
.. warning::
This function currently does not differentiate between 1-minute, 3-minute and 10-minute sustained wind speeds.
"""
# North Atlantic and East Pacific basins
if basin in constants.NHC_BASINS:
if wind_speed == 0:
return "Unknown"
elif wind_speed < 34:
if subtropical_flag:
return "Subtropical Depression"
else:
return "Tropical Depression"
elif wind_speed < 63:
if subtropical_flag:
return "Subtropical Storm"
else:
return "Tropical Storm"
else:
return "Hurricane"
# West Pacific basin
elif basin == 'west_pacific':
if wind_speed == 0:
return "Unknown"
elif wind_speed < 34:
if subtropical_flag:
return "Subtropical Depression"
else:
return "Tropical Depression"
elif wind_speed < 63:
if subtropical_flag:
return "Subtropical Storm"
else:
return "Tropical Storm"
elif wind_speed < 130:
return "Typhoon"
else:
return "Super Typhoon"
# Australia and South Pacific basins
elif basin == 'australia' or basin == 'south_pacific':
if wind_speed == 0:
return "Unknown"
elif wind_speed < 63:
return "Tropical Cyclone"
else:
return "Severe Tropical Cyclone"
# North Indian Ocean
elif basin == 'north_indian':
if wind_speed == 0:
return "Unknown"
elif wind_speed < 28:
return "Depression"
elif wind_speed < 34:
return "Deep Depression"
elif wind_speed < 48:
return "Cyclonic Storm"
elif wind_speed < 64:
return "Severe Cyclonic Storm"
elif wind_speed < 90:
return "Very Severe Cyclonic Storm"
elif wind_speed < 120:
return "Extremely Severe Cyclonic Storm"
else:
return "Super Cyclonic Storm"
# South Indian Ocean
elif basin == 'south_indian':
if wind_speed == 0:
return "Unknown"
elif wind_speed < 28:
return "Tropical Disturbance"
elif wind_speed < 34:
return "Tropical Depression"
elif wind_speed < 48:
return "Moderate Tropical Storm"
elif wind_speed < 64:
return "Severe Tropical Storm"
elif wind_speed < 90:
return "Tropical Cyclone"
elif wind_speed < 115:
return "Intense Tropical Cyclone"
else:
return "Very Intense Tropical Cyclone"
# Otherwise, return a generic "Cyclone" classification
else:
return "Cyclone"
[docs]def get_storm_type(wind_speed, subtropical_flag, typhoon=False):
r"""
Retrieve the 2-character tropical cyclone type (e.g., "TD", "TS", "HU") given its subtropical status.
Parameters
----------
wind_speed : int
Integer denoting sustained wind speed in knots.
subtropical_flag : bool
Boolean denoting whether the cyclone is subtropical or not.
typhoon : bool, optional
Boolean denoting whether typhoon (True) or hurricane (False) classification should be used for wind speeds at or above 64 kt. Default is False.
Returns
-------
str
String denoting the tropical cyclone type.
Notes
-----
The available types and their descriptions are as follows:
.. list-table::
:widths: 25 75
:header-rows: 1
* - Property
- Description
* - SD
- Subtropical Depression
* - SS
- Subtropical Storm
* - TD
- Tropical Depression
* - TS
- Tropical Storm
* - HU
- Hurricane
* - TY
- Typhoon
* - ST
- Super Typhoon
"""
# Tropical depression
if wind_speed < 34:
if subtropical_flag:
return "SD"
else:
return "TD"
# Tropical storm
elif wind_speed < 63:
if subtropical_flag:
return "SS"
else:
return "TS"
# Hurricane
elif not typhoon:
return "HU"
# Typhoon
elif wind_speed < 130:
return "TY"
# Super Typhoon
else:
return "ST"
[docs]def get_basin(lat, lon, source_basin=""):
r"""
Returns the current basin of the tropical cyclone.
Parameters
----------
lat : int or float
Latitude of the storm.
lon : int or float
Longitude of the storm.
Other Parameters
----------------
source_basin : str, optional
String representing the origin storm basin (e.g., "north_atlantic", "east_pacific").
Returns
-------
str
String representing the current basin (e.g., "north_atlantic", "east_pacific").
Notes
-----
For storms in the North Atlantic or East Pacific basin, ``source_basin`` must be provided. This is because storms located over Mexico or Central America could be in either basin depending on where they originated (e.g., storms originated in the Atlantic basin are considered to be within the Atlantic basin while over Mexico or Central America until emerging in the Pacific Ocean).
"""
# Error check
if not is_number(lat):
msg = "\"lat\" must be of type int or float."
raise TypeError(msg)
if not is_number(lon):
msg = "\"lon\" must be of type int or float."
raise TypeError(msg)
# Fix longitude
if lon < 0.0:
lon = lon + 360.0
# Northern hemisphere check
if lat >= 0.0:
if lon < 100.0:
if lat < 40.0:
return "north_indian"
else:
if lon < 70.0:
return "north_atlantic"
else:
return "west_pacific"
elif lon <= 180.0:
return "west_pacific"
else:
if source_basin == "north_atlantic":
if constants.PATH_PACIFIC.contains_point((lat, lon)):
return "east_pacific"
else:
return "north_atlantic"
elif source_basin == "east_pacific":
if constants.PATH_ATLANTIC.contains_point((lat, lon)):
return "north_atlantic"
else:
return "east_pacific"
else:
msg = "Cannot determine whether storm is in North Atlantic or East Pacific basins."
raise RuntimeError(msg)
# Southern hemisphere check
else:
if lon < 20.0:
return "south_atlantic"
elif lon < 90.0:
return "south_indian"
elif lon < 160.0:
return "australia"
elif lon < 280.0:
return "south_pacific"
else:
return "south_atlantic"
[docs]def knots_to_mph(wind_speed):
r"""
Convert wind from knots to miles per hour, in increments of 5 as used by NHC.
Parameters
----------
wind_speed : int
Sustained wind in knots.
Returns
-------
int
Sustained wind in miles per hour.
"""
# Ensure input is rounded down to nearest multiple of 5
wind_speed = wind_speed - (wind_speed % 5)
# Define knots and mph conversions
kts = [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100,
105, 110, 115, 120, 125, 130, 135, 140, 145, 150, 155, 160, 165, 170, 175, 180, 185]
mphs = [0, 10, 15, 20, 25, 30, 35, 40, 45, 50, 60, 65, 70, 75, 80, 85, 90, 100, 105, 110,
115, 120, 125, 130, 140, 145, 150, 155, 160, 165, 175, 180, 185, 190, 195, 200, 205, 210]
# If value is in list, return converted value
if wind_speed in kts:
return mphs[kts.index(wind_speed)]
# Otherwise, return original value
return wind_speed
[docs]def accumulated_cyclone_energy(wind_speed, hours=6):
r"""
Calculate Accumulated Cyclone Energy (ACE) based on sustained wind speed in knots.
Parameters
----------
wind_speed : int or list, numpy.ndarray
Sustained wind in knots.
hours : int, optional
Duration in hours over which the sustained wind was observed. Default is 6 hours.
Returns
-------
float
Accumulated cyclone energy.
Notes
-----
As defined in `Bell et al. (2000)`_, Accumulated Cyclone Energy (ACE) is calculated as follows:
.. math:: ACE = 10^{-4} \sum v^{2}_{max}
As shown above, ACE is the sum of the squares of the estimated maximum sustained wind speed (in knots). By default, this assumes data is provided every 6 hours, as is the standard in HURDATv2 and NHC's Best Track, though this function provides an option to use a different hour duration.
.. _Bell et al. (2000): https://journals.ametsoc.org/view/journals/bams/81/6/1520-0477_2000_81_s1_caf_2_0_co_2.xml
"""
# Determine types
if isinstance(wind_speed, (np.ndarray, list)):
wind_speed = np.copy(wind_speed)
# Calculate ACE
ace = ((10**-4) * (wind_speed**2)) * (hours/6.0)
# Coerce to zero if wind speed less than TS force
if isinstance(ace, (np.ndarray, list)):
ace[wind_speed < 34] = 0.0
return np.round(ace, 4)
else:
if wind_speed < 34:
ace = 0.0
return round(ace, 4)
[docs]def dropsonde_mslp_estimate(mslp, surface_wind):
r"""
Apply a NHC rule of thumb for estimating a TC's minimum central MSLP. This is intended for a dropsonde released in the eye, accounting for drifting by factoring in the surface wind in knots.
Parameters
----------
mslp : int or float
Dropsonde surface MSLP, in hPa.
surface_wind : int or float
Surface wind as measured by the dropsonde. This **must** be surface wind; this cannot be substituted by a level just above the surface.
Returns
-------
float
Estimated TC minimum central MSLP using the NHC estimation method.
Notes
-----
Source is from NHC presentation:
https://www.nhc.noaa.gov/outreach/presentations/nhc2013_aircraftData.pdf
"""
return mslp - (surface_wind / 10.0)
[docs]def get_two_current():
r"""
Retrieve the latest NHC Tropical Weather Outlook (TWO).
Returns
-------
dict
A dictionary of shapefiles for points, areas and lines.
Notes
-----
The shapefiles returned are modified versions of Cartopy's BasicReader, allowing to read in shapefiles directly from URL without having to download the shapefile locally first.
"""
# Retrieve NHC shapefiles for development areas
shapefiles = {}
for name in ['areas', 'lines', 'points']:
try:
# Read in shapefile zip from NHC
url = 'https://www.nhc.noaa.gov/xgtwo/gtwo_shapefiles.zip'
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
file_like_object = BytesIO(response.read())
tar = zipfile.ZipFile(file_like_object)
# Get file list (points, areas)
members = '\n'.join([i for i in tar.namelist()])
nums = "[0123456789]"
search_pattern = f'gtwo_{name}_20{nums}{nums}{nums}{nums}{nums}{nums}{nums}{nums}{nums}{nums}.shp'
pattern = re.compile(search_pattern)
filelist = pattern.findall(members)
files = []
for file in filelist:
if file not in files:
files.append(file.split(".shp")[0]) # remove duplicates
# Retrieve necessary components for shapefile
members = tar.namelist()
members_names = [i for i in members]
data = {'shp': 0, 'dbf': 0, 'prj': 0, 'shx': 0}
for key in data.keys():
idx = members_names.index(files[0]+"."+key)
data[key] = BytesIO(tar.read(members[idx]))
# Read in shapefile
orig_reader = shapefile.Reader(
shp=data['shp'], dbf=data['dbf'], prj=data['prj'], shx=data['shx'])
shapefiles[name] = BasicReader(orig_reader)
except:
shapefiles[name] = None
return shapefiles
[docs]def get_two_archive(time):
r"""
Retrieve an archived NHC Tropical Weather Outlook (TWO). If none available within 30 hours of the specified time, an empty dict is returned.
Parameters
----------
time : datetime
Valid time for archived shapefile.
Returns
-------
dict
A dictionary of shapefiles for points, areas and lines.
Notes
-----
The shapefiles returned are modified versions of Cartopy's BasicReader, allowing to read in shapefiles directly from URL without having to download the shapefile locally first.
TWO shapefiles are available courtesy of the National Hurricane Center beginning 28 July 2010.
"""
# Determine TWO URL and info based on requested time
if time >= dt(2023, 5, 1):
directory_url = 'https://www.nhc.noaa.gov/gis/gtwo/archive/'
elif time >= dt(2014, 6, 1):
directory_url = 'https://www.nhc.noaa.gov/gis/gtwo_5day/archive/'
elif time >= dt(2010, 7, 28):
directory_url = 'https://www.nhc.noaa.gov/gis/gtwo_2day/archive/'
else:
return {'areas': None, 'lines': None, 'points': None}
# Fetch list of TWOs based on requested time
page = requests.get(directory_url).text
content = page.split("\n")
files = []
for line in content:
if '<a href="' in line and 'zip">' in line:
filename = line.split('zip">')[1]
filename = filename.split("</a>")[0]
if '_' not in filename:
continue
if 'gtwo' not in filename.split('_')[0]:
files.append(filename)
del content
# Find closest NHC shapefile if within 24 hours
dates = [dt.strptime(i.split("_")[0], '%Y%m%d%H%M') for i in files]
diff = [(time-i).total_seconds()/3600 for i in dates]
diff = [i for i in diff if i >= 0]
# Continue if less than 24 hours difference
if len(diff) > 0 and np.nanmin(diff) <= 30:
two_date = dates[diff.index(np.nanmin(diff))].strftime('%Y%m%d%H%M')
# Retrieve NHC shapefiles for development areas
shapefiles = {}
for name in ['areas', 'lines', 'points']:
try:
# Read in shapefile zip from NHC
file_url = f'{directory_url}{two_date}_gtwo.zip'
request = urllib.request.Request(file_url)
response = urllib.request.urlopen(request)
file_like_object = BytesIO(response.read())
tar = zipfile.ZipFile(file_like_object)
# Get file list (points, areas)
members = '\n'.join([i for i in tar.namelist()])
nums = "[0123456789]"
search_pattern = f'gtwo_{name}_20{nums}{nums}{nums}{nums}{nums}{nums}{nums}{nums}{nums}{nums}.shp'
pattern = re.compile(search_pattern)
filelist = pattern.findall(members)
files = []
for file in filelist:
if file not in files:
# remove duplicates
files.append(file.split(".shp")[0])
# Alternatively, check files for older format (generally 2014 and earlier)
if len(files) == 0:
if name in ['lines', 'points']:
shapefiles[name] = None
continue
search_pattern = f'20{nums}{nums}{nums}{nums}{nums}{nums}{nums}{nums}{nums}{nums}_gtwo.shp'
pattern = re.compile(search_pattern)
filelist = pattern.findall(members)
for file in filelist:
if file not in files:
# remove duplicates
files.append(file.split(".shp")[0])
# Retrieve necessary components for shapefile
members = tar.namelist()
members_names = [i for i in members]
data = {'shp': 0, 'dbf': 0, 'prj': 0, 'shx': 0}
for key in data.keys():
idx = members_names.index(files[0]+"."+key)
data[key] = BytesIO(tar.read(members[idx]))
# Read in shapefile
orig_reader = shapefile.Reader(
shp=data['shp'], dbf=data['dbf'], prj=data['prj'], shx=data['shx'])
shapefiles[name] = BasicReader(orig_reader)
except:
shapefiles[name] = None
else:
shapefiles = {'areas': None, 'lines': None, 'points': None}
return shapefiles
[docs]def nhc_cone_radii(year, basin, forecast_hour=None):
r"""
Retrieve the official NHC Cone of Uncertainty radii by basin, year and forecast hour(s). Units are in nautical miles.
Parameters
----------
year : int
Valid year for cone of uncertainty radii.
basin : str
Basin for cone of uncertainty radii. If basin is invalid, return value will be an empty dict. Please refer to :ref:`options-domain` for available basin options.
forecast_hour : int or list, optional
Forecast hour(s) to retrieve the cone of uncertainty for. If empty, all available forecast hours will be retrieved.
Returns
-------
dict
Dictionary with forecast hour(s) as the keys, and the cone radius in nautical miles for each respective forecast hour as the values.
Notes
-----
1. NHC cone radii are available beginning 2008 onward. Radii for years before 2008 will be defaulted to 2008, and if the current year's radii are not available yet, the radii for the most recent year will be returned.
2. NHC began producing cone radii for forecast hour 60 in 2020. Years before 2020 do not have a forecast hour 60.
"""
# Source: https://www.nhc.noaa.gov/verification/verify3.shtml
# Source 2: https://www.nhc.noaa.gov/aboutcone.shtml
# Radii are in nautical miles
cone_climo_hr = [3, 12, 24, 36, 48, 72, 96, 120]
# Basin check
if basin not in ['north_atlantic', 'east_pacific']:
return {}
# Fix for 2020 and later that incorporates 60 hour forecasts
if year >= 2020:
cone_climo_hr = [3, 12, 24, 36, 48, 60, 72, 96, 120]
# Forecast hour check
if forecast_hour is None:
forecast_hour = cone_climo_hr
elif isinstance(forecast_hour, int):
if forecast_hour not in cone_climo_hr:
raise ValueError(
f"Forecast hour {forecast_hour} is invalid. Available forecast hours for {year} are: {cone_climo_hr}")
else:
forecast_hour = [forecast_hour]
elif isinstance(forecast_hour, list):
forecast_hour = [i for i in forecast_hour if i in cone_climo_hr]
if len(forecast_hour) == 0:
raise ValueError(
f"Requested forecast hours are invalid. Available forecast hours for {year} are: {cone_climo_hr}")
else:
raise TypeError("forecast_hour must be of type int or list")
# Year check
if year > np.max([k for k in constants.CONE_SIZE_ATL.keys()]):
year = [k for k in constants.CONE_SIZE_ATL.keys()][0]
warnings.warn(
f"No cone information is available for the requested year. Defaulting to {year} cone.")
elif year not in constants.CONE_SIZE_ATL.keys():
year = 2008
warnings.warn(
"No cone information is available for the requested year. Defaulting to 2008 cone.")
# Retrieve data
cone_radii = {}
for hour in list(np.sort(forecast_hour)):
hour_index = cone_climo_hr.index(hour)
if basin == 'north_atlantic':
cone_radii[hour] = constants.CONE_SIZE_ATL[year][hour_index]
elif basin == 'east_pacific':
cone_radii[hour] = constants.CONE_SIZE_PAC[year][hour_index]
return cone_radii
[docs]def generate_nhc_cone(forecast, basin, shift_lons=False, cone_days=5, cone_year=None, grid_res=0.05, return_xarray=False):
r"""
Generates a gridded cone of uncertainty using forecast data from NHC.
Parameters
----------
forecast : dict
Dictionary containing forecast data
basin : str
Basin for cone of uncertainty radii. Please refer to :ref:`options-domain` for available basin options.
Other Parameters
----------------
shift_lons : bool, optional
If true, grid will be shifted to +0 to +360 degrees longitude. Default is False (-180 to +180 degrees).
cone_days : int, optional
Number of forecast days to generate the cone through. Default is 5 days.
cone_year : int, optional
Year valid for cone radii. If None, this fuction will attempt to retrieve the year from the forecast dict.
grid_res : int or float, optional
Horizontal resolution of the cone of uncertainty grid in degrees. Default is 0.05 degrees.
return_xarray : bool, optional
If True, returns output as an xarray Dataset. Default is False, returning output as a dictionary.
Returns
-------
dict or xarray.Dataset
Depending on `return_xarray`, returns either a dictionary or an xarray Dataset containing the gridded cone of uncertainty and its accompanying attributes.
Notes
-----
Forecast dicts can be retrieved for realtime storm objects using ``RealtimeStorm.get_forecast_realtime()``, and for archived storms using ``Storm.get_nhc_forecast_dict()``.
"""
# Check forecast dict has all required keys
check_dict = [True if i in forecast.keys() else False for i in [
'fhr', 'lat', 'lon', 'init']]
if False in check_dict:
raise ValueError(
"forecast dict must contain keys 'fhr', 'lat', 'lon' and 'init'. You may retrieve a forecast dict for a Storm object through 'storm.get_operational_forecasts()'.")
# Check forecast basin
if basin not in constants.ALL_BASINS:
raise ValueError("basin cannot be identified.")
# Retrieve cone of uncertainty year
if cone_year is None:
cone_year = forecast['init'].year
if cone_year > np.max([k for k in constants.CONE_SIZE_ATL.keys()]):
cone_year = [k for k in constants.CONE_SIZE_ATL.keys()][0]
warnings.warn(
f"No cone information is available for the requested year. Defaulting to {cone_year} cone.")
elif cone_year not in constants.CONE_SIZE_ATL.keys():
cone_year = 2008
warnings.warn(
"No cone information is available for the requested year. Defaulting to 2008 cone.")
# Retrieve cone size and hours for given year
if basin in ['north_atlantic', 'east_pacific']:
output = nhc_cone_radii(cone_year, basin)
cone_climo_hr = [k for k in output.keys()]
cone_size = [output[k] for k in output.keys()]
else:
cone_climo_hr = [3, 12, 24, 36, 48, 72, 96, 120]
cone_size = 0
# Function for interpolating between 2 times
def temporal_interpolation(value, orig_times, target_times):
f = interp.interp1d(orig_times, value)
ynew = f(target_times)
return ynew
# Function for finding nearest value in an array
def find_nearest(array, val):
return array[np.abs(array - val).argmin()]
# Function for adding a radius surrounding a point
def add_radius(lats, lons, vlat, vlon, rad, grid_res=0.05):
# construct new array expanding slightly over rad from lat/lon center
grid_fac = (rad*4)/111.0
# Make grid surrounding position coordinate & radius of circle
nlon = np.arange(find_nearest(lons, vlon-grid_fac),
find_nearest(lons, vlon+grid_fac+grid_res), grid_res)
nlat = np.arange(find_nearest(lats, vlat-grid_fac),
find_nearest(lats, vlat+grid_fac+grid_res), grid_res)
lons, lats = np.meshgrid(nlon, nlat)
return_arr = np.zeros((lons.shape))
# Calculate distance from vlat/vlon at each gridpoint
r_earth = 6.371 * 10**6
dlat = np.subtract(np.radians(lats), np.radians(vlat))
dlon = np.subtract(np.radians(lons), np.radians(vlon))
a = np.sin(dlat/2) * np.sin(dlat/2) + np.cos(np.radians(lats)) * \
np.cos(np.radians(vlat)) * np.sin(dlon/2) * np.sin(dlon/2)
c = 2 * np.arctan(np.sqrt(a), np.sqrt(1-a))
dist = (r_earth * c)/1000.0
dist = dist * 0.621371 # to miles
dist = dist * 0.868976 # to nautical miles
# Mask out values less than radius
return_arr[dist <= rad] = 1
# Attach small array into larger subset array
small_coords = {'lat': nlat, 'lon': nlon}
return return_arr, small_coords
# --------------------------------------------------------------------
# Check if fhr3 is available, then get forecast data
flag_12 = 0
if forecast['fhr'][0] == 12:
flag_12 = 1
cone_climo_hr = cone_climo_hr[1:]
fcst_lon = forecast['lon']
fcst_lat = forecast['lat']
fhr = forecast['fhr']
t = np.array(forecast['fhr'])/6.0
subtract_by = t[0]
t = t - t[0]
interp_fhr_idx = np.arange(t[0], t[-1]+0.1, 0.1) - t[0]
elif 3 in forecast['fhr'] and 1 in forecast['fhr'] and 0 in forecast['fhr']:
fcst_lon = forecast['lon'][2:]
fcst_lat = forecast['lat'][2:]
fhr = forecast['fhr'][2:]
t = np.array(fhr)/6.0
interp_fhr_idx = np.arange(t[0], t[-1]+0.01, 0.1)
elif 3 in forecast['fhr'] and 0 in forecast['fhr']:
idx = np.array([i for i, j in enumerate(
forecast['fhr']) if j in cone_climo_hr])
fcst_lon = np.array(forecast['lon'])[idx]
fcst_lat = np.array(forecast['lat'])[idx]
fhr = np.array(forecast['fhr'])[idx]
t = np.array(fhr)/6.0
interp_fhr_idx = np.arange(t[0], t[-1]+0.01, 0.1)
elif forecast['fhr'][1] < 12:
cone_climo_hr[0] = 0
fcst_lon = [forecast['lon'][0]]+forecast['lon'][2:]
fcst_lat = [forecast['lat'][0]]+forecast['lat'][2:]
fhr = [forecast['fhr'][0]]+forecast['fhr'][2:]
t = np.array(fhr)/6.0
interp_fhr_idx = np.arange(t[0]/6.0, t[-1]+0.1, 0.1)
else:
cone_climo_hr[0] = 0
fcst_lon = forecast['lon']
fcst_lat = forecast['lat']
fhr = forecast['fhr']
t = np.array(fhr)/6.0
interp_fhr_idx = np.arange(t[0], t[-1]+0.1, 0.1)
# Determine index of forecast day cap
if (cone_days*24) in fhr:
cone_day_cap = list(fhr).index(cone_days*24)+1
fcst_lon = fcst_lon[:cone_day_cap]
fcst_lat = fcst_lat[:cone_day_cap]
fhr = fhr[:cone_day_cap]
t = np.array(fhr)/6.0
interp_fhr_idx = np.arange(interp_fhr_idx[0], t[-1]+0.1, 0.1)
else:
cone_day_cap = len(fhr)
# Account for dateline
if shift_lons:
temp_lon = np.array(fcst_lon)
temp_lon[temp_lon < 0] = temp_lon[temp_lon < 0]+360.0
fcst_lon = temp_lon.tolist()
# Interpolate forecast data temporally and spatially
interp_kind = 'quadratic'
if len(t) == 2:
interp_kind = 'linear' # Interpolate linearly if only 2 forecast points
x1 = interp.interp1d(t, fcst_lon, kind=interp_kind)
y1 = interp.interp1d(t, fcst_lat, kind=interp_kind)
interp_fhr = interp_fhr_idx * 6
interp_lon = x1(interp_fhr_idx)
interp_lat = y1(interp_fhr_idx)
# Return if no cone specified
if cone_size == 0:
return_dict = {'center_lon': interp_lon, 'center_lat': interp_lat}
if return_xarray:
import xarray as xr
return xr.Dataset(return_dict)
else:
return return_dict
# Interpolate cone radius temporally
cone_climo_hr = cone_climo_hr[:cone_day_cap]
cone_size = cone_size[:cone_day_cap]
cone_climo_fhrs = np.array(cone_climo_hr)
if flag_12 == 1:
interp_fhr += (subtract_by*6.0)
cone_climo_fhrs = cone_climo_fhrs[1:]
idxs = np.nonzero(np.in1d(np.array(fhr), np.array(cone_climo_hr)))
temp_arr = np.array(cone_size)[idxs]
interp_rad = np.apply_along_axis(lambda n: temporal_interpolation(
n, fhr, interp_fhr), axis=0, arr=temp_arr)
# Initialize 0.05 degree grid
gridlats = np.arange(min(interp_lat)-7, max(interp_lat)+7, grid_res)
gridlons = np.arange(min(interp_lon)-7, max(interp_lon)+7, grid_res)
gridlons2d, gridlats2d = np.meshgrid(gridlons, gridlats)
# Iterate through fhr, calculate cone & add into grid
large_coords = {'lat': gridlats, 'lon': gridlons}
griddata = np.zeros((gridlats2d.shape))
for i, (ilat, ilon, irad) in enumerate(zip(interp_lat, interp_lon, interp_rad)):
temp_grid, small_coords = add_radius(
gridlats, gridlons, ilat, ilon, irad, grid_res=grid_res)
plug_grid = np.zeros((griddata.shape))
plug_grid = plug_array(temp_grid, plug_grid,
small_coords, large_coords)
griddata = np.maximum(griddata, plug_grid)
if return_xarray:
import xarray as xr
cone = xr.DataArray(griddata, coords=[gridlats, gridlons], dims=[
'grid_lat', 'grid_lon'])
return_ds = {
'cone': cone,
'center_lon': interp_lon,
'center_lat': interp_lat
}
return_ds = xr.Dataset(return_ds)
return_ds.attrs['year'] = cone_year
return return_ds
else:
return_dict = {'lat': gridlats, 'lon': gridlons, 'lat2d': gridlats2d, 'lon2d': gridlons2d, 'cone': griddata,
'center_lon': interp_lon, 'center_lat': interp_lat, 'year': cone_year}
return return_dict
[docs]def calc_ensemble_ellipse(member_lons, member_lats):
r"""
Calculate an ellipse representing ensemble member location spread. This function follows the methodology of Hamill et al. (2011).
Parameters
----------
member_lons : list
List containing longitudes of ensemble members valid at a single time.
member_lats : list
List containing latitudes of ensemble members valid at a single time.
Returns
-------
dict
Dictionary containing the longitude and latitude of the ellipse.
Notes
-----
The ensemble ellipse used in this function follows the methodology of `Hamill et al. (2011)`_, denoting the spread in ensemble member cyclone positions. The size of the ellipse is calculated to contain 90% of ensemble members at any given time. This ellipse can be used to determine the primary type of ensemble variability:
* **Along-track variability** - if the major axis of the ellipse is parallel to the ensemble mean motion vector.
* **Across-track variability** - if the major axis of the ellipse is normal to the ensemble mean motion vector.
.. _Hamill et al. (2011): https://doi.org/10.1175/2010MWR3456.1
This code is adapted from NCAR Command Language (NCL) code courtesy of Ryan Torn and Thomas Hamill.
Examples
--------
>>> lons = [-60, -59, -59, -61, -67, -58, -60, -57]
>>> lats = [40, 40, 41, 39, 37, 42, 40, 41]
>>> ellipse = utils.calc_ensemble_ellipse(lons, lats)
>>> print(ellipse.keys())
dict_keys(['ellipse_lon', 'ellipse_lat'])
"""
# Compute ensemble mean lon & lat
mean_lon = np.average(member_lons)
mean_lat = np.average(member_lats)
Pb = [[0, 0], [0, 0]]
for i in range(len(member_lats)):
Pb[0][0] = Pb[0][0] + (member_lons[i]-mean_lon) * \
(member_lons[i]-mean_lon)
Pb[1][1] = Pb[1][1] + (member_lats[i]-mean_lat) * \
(member_lats[i]-mean_lat)
Pb[1][0] = Pb[1][0] + (member_lats[i]-mean_lat) * \
(member_lons[i]-mean_lon)
Pb[0][1] = Pb[1][0]
Pb = np.array(Pb) / float(len(member_lats)-1)
rho = Pb[1][0] / (np.sqrt(Pb[0][0]) * np.sqrt(Pb[1][1]))
sigmax = np.sqrt(Pb[0][0])
sigmay = np.sqrt(Pb[1][1])
fac = 1.0 / (2.0 * (1 - rho**2))
# Calculate lon & lat coordinates of ellipse
ellipse_lon = []
ellipse_lat = []
increment = np.pi/180.0
for radians in np.arange(0, (2.0*np.pi)+increment, increment):
xstart = np.cos(radians)
ystart = np.sin(radians)
for rdistance in np.arange(0., 2400.):
xloc = xstart * rdistance/80.0
yloc = ystart * rdistance/80.0
prob = np.exp(-1.0 * fac * ((xloc/sigmax)**2 + (yloc/sigmay)
** 2 - 2.0 * rho * (xloc/sigmax)*(yloc/sigmay)))
if prob < 0.256:
ellipse_lon.append(xloc + mean_lon)
ellipse_lat.append(yloc + mean_lat)
break
# Return ellipse data
return {'ellipse_lon': ellipse_lon, 'ellipse_lat': ellipse_lat}
[docs]def create_storm_dict(filepath, storm_name, storm_id, delimiter=',', time_format='%Y%m%d%H', **kwargs):
r"""
Creates a storm dict from custom user-provided data.
Parameters
----------
filepath : str
Relative or absolute file path containing custom storm data.
storm_name : str
Storm name for custom storm entry.
storm_id : str
Storm ID for custom storm entry.
Other Parameters
----------------
delimiter : str
Delimiter separating columns for text files. Default is ",".
time_format : str
Time format for which to convert times to Datetime objects. Default is ``"%Y%m%d%H"`` (e.g., ``"2015070112"`` for 1200 UTC 1 July 2015).
time_header : str
Name of time dimension. If not provided, function will attempt to locate it internally.
lat_header : str
Name of latitude dimension. If not provided, function will attempt to locate it internally.
lon_header : str
Name of longitude dimension. If not provided, function will attempt to locate it internally.
vmax_header : str
Name of maximum sustained wind (knots) dimension. If not provided, function will attempt to locate it internally.
mslp_header : str
Name of minimum MSLP (hPa) dimension. If not provided, function will attempt to locate it internally.
type_header : str
Name of storm type dimension. If not provided, function will attempt to locate it internally. If not part of entry data, storm type will be derived based on provided wind speed values.
Returns
-------
dict
Dictionary containing formatted custom storm data.
Notes
-----
This function creates a formatted storm data dictionary using custom user-provided data. The constraints for the parser are as follows:
1. Rows that begin with a ``#`` or ``\`` are automatically ignored.
2. The first non-commented row must be a header row.
3. The header row must contain entries for time, latitude, longitude, maximum sustained wind (knots), and minimum MSLP (hPa). The order of these columns does not matter.
4. Preferred header names are "time", "lat", "lon", "vmax" and "mslp" for the main 5 categories, but custom header names can be provided. Refer to the "Other Parameters" section above.
5. Providing a "type" column (e.g., "TS", "HU", "EX") is not required, but is recommended especially if dealing with subtropical or non-tropical types.
Below is an example file which we'll call ``data.txt``::
# The row below is a header row. The order of the columns doesn't matter.
time,lat,lon,vmax,mslp,type
2021080518,19.4,-59.9,25,1014,DB
2021080600,19.7,-60.2,25,1014,DB
2021080606,20.1,-60.5,30,1012,DB
2021080612,20.5,-60.8,30,1011,TD
2021080618,20.7,-61.3,30,1011,TD
2021080700,20.8,-61.8,35,1008,TS
2021080706,20.8,-62.6,35,1007,TS
2021080712,21.0,-63.3,40,1004,TS
2021080718,21.3,-64.1,45,1002,TS
2021080800,21.6,-65.1,55,998,TS
2021080806,22.0,-66.1,60,994,TS
2021080812,22.5,-66.9,65,989,HU
2021080818,23.2,-67.4,65,988,HU
2021080900,23.9,-67.6,60,992,TS
2021080906,25.0,-67.5,55,994,TS
2021080912,26.5,-67.1,55,993,TS
2021080918,28.0,-66.4,50,992,TS
2021081000,29.5,-65.6,50,994,TS
2021081006,31.4,-64.0,45,996,TS
2021081012,33.4,-62.0,45,997,EX
2021081018,35.4,-59.5,50,997,EX
Reading it into the parser returns the following dict:
>>> from tropycal import utils
>>> storm_dict = utils.create_storm_dict(filename='data.txt', storm_name='Test', storm_id='AL502021')
>>> print(storm_dict)
{'id': 'AL502021',
'operational_id': 'AL502021',
'name': 'Test',
'source_info': 'Custom User Data',
'source': 'custom',
'time': [datetime.datetime(2021, 8, 5, 18, 0),
datetime.datetime(2021, 8, 6, 0, 0),
....
datetime.datetime(2021, 8, 10, 12, 0),
datetime.datetime(2021, 8, 10, 18, 0)],
'extra_obs': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
'special': ['','',....,'',''],
'type': ['DB','DB','DB','TD','TD','TS','TS','TS','TS','TS','TS','HU','HU','TS','TS','TS','TS','TS','TS','EX','EX'],
'lat': [19.4,19.7,20.1,20.5,20.7,20.8,20.8,21.0,21.3,21.6,22.0,22.5,23.2,23.9,25.0,26.5,28.0,29.5,31.4,33.4,35.4],
'lon': [-59.9,-60.2,-60.5,-60.8,-61.3,-61.8,-62.6,-63.3,-64.1,-65.1,-66.1,-66.9,-67.4,-67.6,-67.5,-67.1,-66.4,-65.6,-64.0,-62.0,-59.5],
'vmax': [25.0,25.0,30.0,30.0,30.0,35.0,35.0,40.0,45.0,55.0,60.0,65.0,65.0,60.0,55.0,55.0,50.0,50.0,45.0,45.0,50.0],
'mslp': [1014,1014,1012,1011,1011,1008,1007,1004,1002,998,994,989,988,992,994,993,992,994,996,997,997],
'wmo_basin': ['north_atlantic','north_atlantic',....,'north_atlantic','north_atlantic'],
'ace': 3.7825,
'basin': 'north_atlantic',
'year': 2021,
'season': 2021}
This custom data can then be plugged into a Storm object:
>>> from tropycal import tracks
>>> storm = tracks.Storm(storm_dict)
>>> print(storm)
<tropycal.tracks.Storm>
Storm Summary:
Maximum Wind: 65 knots
Minimum Pressure: 988 hPa
Start Time: 1200 UTC 06 August 2021
End Time: 0600 UTC 10 August 2021
.
Variables:
time (datetime) [2021-08-05 18:00:00 .... 2021-08-10 18:00:00]
extra_obs (int32) [0 .... 0]
special (str) [ .... ]
type (str) [DB .... EX]
lat (float64) [19.4 .... 35.4]
lon (float64) [-59.9 .... -59.5]
vmax (float64) [25.0 .... 50.0]
mslp (float64) [1014.0 .... 997.0]
wmo_basin (str) [north_atlantic .... north_atlantic]
.
More Information:
id: AL502021
operational_id: AL502021
name: Test
source_info: Custom User Data
source: custom
ace: 3.8
basin: north_atlantic
year: 2021
season: 2021
realtime: False
invest: False
"""
# Pop kwargs
time_header = kwargs.pop('time_header', None)
lat_header = kwargs.pop('lat_header', None)
lon_header = kwargs.pop('lon_header', None)
vmax_header = kwargs.pop('vmax_header', None)
mslp_header = kwargs.pop('mslp_header', None)
type_header = kwargs.pop('type_header', None)
# Check for file extension
netcdf = True if filepath[-3:] == '.nc' else False
# Create empty dict
data = {'id': storm_id,
'operational_id': storm_id,
'name': storm_name,
'source_info': 'Custom User Data',
'source': 'custom',
'time': [],
'extra_obs': [],
'special': [],
'type': [],
'lat': [],
'lon': [],
'vmax': [],
'mslp': [],
'wmo_basin': [],
'ace': 0.0}
# Parse text file
if not netcdf:
# Store header data
header = {}
# Read file content
f = open(filepath, "r")
content = f.readlines()
f.close()
for line in content:
if len(line) < 5:
continue
if line[0] in ['#', '/']:
continue
line = line.split("\n")[0]
line = line.split("\r")[0]
# Split line
if delimiter in ['', ' ']:
delimiter = None
lineArray = line.split(delimiter)
lineArray = [i.replace(" ", "") for i in lineArray]
# Determine header
if len(header) == 0:
for value in lineArray:
if value == time_header or value.lower() in ['time', 'date', 'valid_time', 'valid_date']:
header['time'] = [value, lineArray.index(value)]
elif value == lat_header or value.lower() in ['lat', 'latitude', 'lat_0']:
header['lat'] = [value, lineArray.index(value)]
elif value == lon_header or value.lower() in ['lon', 'longitude', 'lon_0']:
header['lon'] = [value, lineArray.index(value)]
elif value == vmax_header or value.lower() in ['vmax', 'wind', 'wspd', 'max_wind', 'wind_speed']:
header['vmax'] = [value, lineArray.index(value)]
elif value == mslp_header or value.lower() in ['mslp', 'slp', 'min_mslp', 'pressure', 'pres', 'minimum_mslp']:
header['mslp'] = [value, lineArray.index(value)]
elif value == type_header or value.lower() in ['type', 'storm_type']:
header['type'] = [value, lineArray.index(value)]
found = [i in header.keys()
for i in ['time', 'lat', 'lon', 'vmax', 'mslp']]
if False in found:
raise ValueError(
"Data must have columns for 'time', 'lat', 'lon', 'vmax' and 'mslp'.")
continue
# Enter standard data into dict
enter_date = dt.strptime(
lineArray[header.get('time')[1]], time_format)
if enter_date in data['time']:
raise ValueError(
"Error: Multiple entries detected for the same valid time.")
data['time'].append(enter_date)
data['lat'].append(float(lineArray[header.get('lat')[1]]))
data['lon'].append(float(lineArray[header.get('lon')[1]]))
data['vmax'].append(float(lineArray[header.get('vmax')[1]]))
data['mslp'].append(float(lineArray[header.get('mslp')[1]]))
# Derive storm type if needed
if 'type' in header.keys():
temp_type = lineArray[header.get('type')[1]]
data['type'].append(temp_type)
else:
data['type'].append(get_storm_type(data['vmax'][-1], False))
# Derive ACE
if data['time'][-1].strftime('%H%M') in constants.STANDARD_HOURS and data['type'][-1] in constants.NAMED_TROPICAL_STORM_TYPES:
data['ace'] += accumulated_cyclone_energy(data['vmax'][-1])
# Derive basin
if len(data['wmo_basin']) == 0:
data['wmo_basin'].append(
get_basin(data['lat'][-1], data['lon'][-1], 'north_atlantic'))
data['basin'] = data['wmo_basin'][-1]
else:
data['wmo_basin'].append(
get_basin(data['lat'][-1], data['lon'][-1], data['basin']))
# Other entries
extra_obs = 0 if data['time'][-1].strftime(
'%H%M') in constants.STANDARD_HOURS else 1
data['extra_obs'].append(extra_obs)
data['special'].append('')
# Add other info
data['year'] = data['time'][0].year
data['season'] = data['time'][0].year
# Return dict
return data
[docs]def ships_parser(content):
r"""
Parses SHIPS text data into multiple sorted dictionaries.
Parameters
----------
content : str
SHIPS file content.
Returns
-------
dict
Dictionary containing parsed SHIPS data.
Notes
-----
This function is referenced internally when creating SHIPS objects, but can also be used as a standalone function.
"""
data = {}
data_ri = {}
data_attrs = {}
def split_first_group(line):
subset_line = line[15:]
chunk_size = 6
return [(subset_line[i:i+chunk_size]).replace(' ','') for i in range(0, len(subset_line), chunk_size)]
def split_prob(line):
line_subset_1 = int((line.split('threshold=')[1]).split('%')[0])
line_subset_2 = float((line.split('% is')[1]).split('times')[0])
if 'mean (' in line:
line_subset_3 = float((line.split('mean (')[1]).split('%')[0])
else:
line_subset_3 = float((line.split('mean(')[1]).split('%')[0])
return line_subset_1, line_subset_2, line_subset_3
content = content.split('\n')
for line in content:
# Attempt to retrieve storm name and forecast init
if len(line.strip()) > 5 and line.strip()[0] == '*' and 'UTC' in line and 'storm_name' not in data_attrs:
line_array = line.split()
data_attrs['forecast_init'] = dt.strptime(f'{line_array[3]} {line_array[4]}','%m/%d/%y %H')
storm_name = line_array[1]
if storm_name.upper() in ['UNNAMED', 'INVEST', 'UNKNOWN']:
# Determine suffix
storm_id = line_array[2]
storm_suffix = storm_id[1]
if storm_id[0] in ['C', 'E', 'W', 'I', 'S']:
storm_suffix = storm_id[0]
storm_name = f'{(line_array[2])[2:4]}{storm_suffix}'
data_attrs['storm_name'] = storm_name
# Parse first group into dict
first_group = {
'TIME (HR)': ['fhr',int],
'LAT (DEG N)': ['lat',float],
'LONG(DEG W)': ['lon',float],
'V (KT) NO LAND': ['vmax_noland_kt',int],
'V (KT) LAND': ['vmax_land_kt',int],
'V (KT) LGEM': ['vmax_lgem_kt',int],
'Storm Type': ['storm_type',str],
'SHEAR (KT)': ['shear_kt',int],
'SHEAR ADJ (KT)': ['shear_adj_kt',int],
'SHEAR DIR': ['shear_dir',int],
'SST (C)': ['sst_c',float],
'POT. INT. (KT)': ['vmax_pot_kt',int],
'200 MB T (C)': ['200mb_temp_c',float],
'TH_E DEV (C)': ['thetae_dev_c',int],
'700-500 MB RH': ['700_500_rh',int],
'MODEL VTX (KT)': ['model_vortex_kt',int],
'850 MB ENV VOR': ['850mb_env_vort',int],
'200 MB DIV': ['200mb_div',int],
'700-850 TADV': ['700_850_tadv',int],
'LAND (KM)': ['dist_land_km',int],
'STM SPEED (KT)': ['storm_speed_kt',int],
'HEAT CONTENT': ['heat_content',int]
}
checks = ['N/A','LOST','XX.X','XXX.X','DIS']
for key in first_group.keys():
if line.startswith(key):
data[first_group[key][0]] = [first_group[key][1](i) if i.upper() not in checks
else np.nan for i in split_first_group(line)]
if key == 'LONG(DEG W)':
data[first_group[key][0]] = [i*-1 if i < 180 else (i - 360) * -1
for i in data[first_group[key][0]]]
# Parse attributes
if 'INITIAL HEADING/SPEED (DEG/KT)' in line:
line_subset = line.split('INITIAL HEADING/SPEED (DEG/KT):')[1][:10]
data_attrs['storm_bearing_deg'] = int(line_subset.split('/')[0])
data_attrs['storm_motion_kt'] = int(line_subset.split('/')[1])
if 'T-12 MAX WIND' in line:
data_attrs['max_wind_t-12_kt'] = int(line.split('T-12 MAX WIND:')[1][:10])
if 'PRESSURE OF STEERING LEVEL (MB)' in line:
line_subset = line.split('PRESSURE OF STEERING LEVEL (MB):')[1]
line_subset_mean = (line_subset.split('MEAN=')[1]).split(')')[0]
data_attrs['steering_level_pres_hpa'] = int(line_subset.split('(')[0])
data_attrs['steering_level_pres_mean_hpa'] = int(line_subset_mean)
if 'GOES IR BRIGHTNESS TEMP. STD DEV.' in line:
line_subset = line.split('50-200 KM RAD:')[1]
line_subset_mean = (line_subset.split('MEAN=')[1]).split(')')[0]
data_attrs['brightness_temp_stdev'] = float(line_subset.split('(')[0])
data_attrs['brightness_temp_stdev_mean'] = float(line_subset_mean)
if 'GOES IR PIXELS WITH T' in line:
line_subset = line.split('50-200 KM RAD:')[1]
line_subset_mean = (line_subset.split('MEAN=')[1]).split(')')[0]
data_attrs['pixels_below_-20c'] = float(line_subset.split('(')[0])
data_attrs['pixels_below_-20c_mean'] = float(line_subset_mean)
# Rapid intensification probabilities
ri_group = {
'Prob RI for 20kt/ 12hr RI': '20kt/12hr',
'Prob RI for 25kt/ 24hr RI': '25kt/24hr',
'Prob RI for 30kt/ 24hr RI': '30kt/24hr',
'Prob RI for 35kt/ 24hr RI': '35kt/24hr',
'Prob RI for 40kt/ 24hr RI': '40kt/24hr',
'Prob RI for 45kt/ 36hr RI': '45kt/36hr',
'Prob RI for 55kt/ 48hr RI': '55kt/48hr',
'Prob RI for 65kt/ 72hr RI': '65kt/72hr',
'RI for 25 kt RI': '25kt/24hr',
'RI for 30 kt RI': '30kt/24hr',
'RI for 35 kt RI': '35kt/24hr',
'RI for 40 kt RI': '40kt/24hr',
}
for key in ri_group.keys():
if key in line:
prob, times, climo = split_prob(line)
data_ri[ri_group[key]] = {
'probability': prob if prob != 999 else np.nan,
'climo_mean': climo,
'prob / climo': times if times != 999 else np.nan,
}
# Add current location to attributes
data_attrs['lat'] = data['lat'][0]
data_attrs['lon'] = data['lon'][0]
return {
'data': data,
'data_ri': data_ri,
'data_attrs': data_attrs,
}
# ===========================================================================================================
# Private utilities
# These are primarily intended to be used internally. Do not add these to documentation.
# ===========================================================================================================
# Function for plugging small array into larger array
def plug_array(small, large, small_coords, large_coords):
r"""
Plug small array into large array with matching coords.
Parameters
----------
small : numpy.ndarray
Small array to be plugged into the larger array.
large : numpy.ndarray
Large array for the small array to be plugged into.
small_coords : dict
Dictionary containing 'lat' and 'lon' keys, whose values are numpy.ndarrays of lat & lon for the small array.
large_coords : dict
Dictionary containing 'lat' and 'lon' keys, whose values are numpy.ndarrays of lat & lon for the large array.
Returns
-------
numpy.ndarray
An array of the same dimensions as "large", with the small array plugged inside the large array.
"""
small_lat = np.round(small_coords['lat'], 2)
small_lon = np.round(small_coords['lon'], 2)
large_lat = np.round(large_coords['lat'], 2)
large_lon = np.round(large_coords['lon'], 2)
small_minlat = np.nanmin(small_lat)
small_maxlat = np.nanmax(small_lat)
small_minlon = np.nanmin(small_lon)
small_maxlon = np.nanmax(small_lon)
if small_minlat in large_lat:
minlat = np.where(large_lat == small_minlat)[0][0]
else:
minlat = min(large_lat)
if small_maxlat in large_lat:
maxlat = np.where(large_lat == small_maxlat)[0][0]
else:
maxlat = max(large_lat)
if small_minlon in large_lon:
minlon = np.where(large_lon == small_minlon)[0][0]
else:
minlon = min(large_lon)
if small_maxlon in large_lon:
maxlon = np.where(large_lon == small_maxlon)[0][0]
else:
maxlon = max(large_lon)
large[minlat:maxlat+1, minlon:maxlon+1] = small
return large
def calc_distance(lats2d, lons2d, lat, lon):
r"""
Calculates distance (km) for each gridpoint in a 2D array from a provided coordinate.
Parameters
----------
lats2d : numpy.ndarray
2D array containing latitude in degrees
lons2d : numpy.ndarray
2D array containing longitude in degrees
lat : float or int
Latitude of requested coordinate
lon : float or int
Longitude of requested coordinate
Returns
-------
list
First element returned is an empty 2D array of dimension (lons,lats). Second element returned is an array of the same shape containing the distance from the requested coordinate in km.
"""
# Define empty array
return_arr = np.zeros((lats2d.shape))
# Calculate distance from lat/lon at each gridpoint
r_earth = 6.371 * 10**6
dlat = np.subtract(np.radians(lats2d), np.radians(lat))
dlon = np.subtract(np.radians(lons2d), np.radians(lon))
a = np.sin(dlat/2) * np.sin(dlat/2) + np.cos(np.radians(lats2d)) * np.cos(np.radians(lat)) * np.sin(dlon/2) * np.sin(dlon/2)
c = 2 * np.arctan(np.sqrt(a), np.sqrt(1-a))
dist = (r_earth * c)/1000.0
return return_arr, dist
def add_radius(lats2d, lons2d, lat, lon, rad):
r"""
Determines whether a requested coordinate is within a specified radius in a 2D array.
Parameters
----------
lats : numpy.ndarray
2D array containing latitude in degrees
lons : numpy.ndarray
2D array containing longitude in degrees
lat : float or int
Latitude of requested coordinate
lon : float or int
Longitude of requested coordinate
rad : float or int
Requested radius in kilometers
res : float or int, optional
Resolution of grid to create. Default is 0.25 degrees.
Returns
-------
numpy.ndarray
2D array containing 1 where the requested coordinate is within the requested radius, and 0 otherwise.
"""
# Define empty array
return_arr = np.zeros((lats2d.shape))
# Calculate distance from lat/lon at each gridpoint
r_earth = 6.371 * 10**6
dlat = np.subtract(np.radians(lats2d), np.radians(lat))
dlon = np.subtract(np.radians(lons2d), np.radians(lon))
a = np.sin(dlat*0.5) * np.sin(dlat*0.5) + np.cos(np.radians(lats2d)) * np.cos(np.radians(lat)) * np.sin(dlon*0.5) * np.sin(dlon*0.5)
c = 2 * np.arctan(np.sqrt(a), np.sqrt(1-a))
dist = (r_earth * c) * 0.001
# Mask out values less than radius
return_arr[dist > rad] = 0
return_arr[dist <= rad] = 1
return return_arr
def add_radius_quick(lats, lons, lat, lon, rad, res=0.25):
r"""
Determines whether a requested coordinate is within a specified radius in a 2D array. Performs a faster calculation than the default "add_radius" function that is a good approximation outside of the poles.
Parameters
----------
lats : numpy.ndarray
1D array containing latitude in degrees
lons : numpy.ndarray
1D array containing longitude in degrees
lat : float or int
Latitude of requested coordinate
lon : float or int
Longitude of requested coordinate
rad : float or int
Requested radius in kilometers
res : float or int, optional
Resolution of grid to create. Default is 0.25 degrees.
Returns
-------
numpy.ndarray
2D array containing 1 where the requested coordinate is within the requested radius, and 0 otherwise.
"""
lons2d, lats2d = np.meshgrid(lons, lats)
return_arr = np.zeros((lats2d.shape))
dist = np.zeros((lats2d.shape)) + 9999
if lon is None or lat is None:
return return_arr
new_lats = np.arange(np.round(lat-5), np.round(lat+5+res), res)
if np.nanmin(lats) >= 0:
if np.nanmin(new_lats) < 0:
new_lats = np.arange(0, np.round(lat+5+res), res)
else:
if np.nanmax(new_lats) > 0:
new_lats = np.arange(np.round(lat-5), 0+res, res)
new_lons = np.arange(np.round(lon-10), np.round(lon+10+res), res)
new_lons_2d, new_lats_2d = np.meshgrid(new_lons, new_lats)
new_arr, new_dist = calc_distance(new_lats_2d, new_lons_2d, lat, lon)
return_arr = plug_array(new_arr, return_arr, {
'lat': new_lats, 'lon': new_lons}, {'lat': lats, 'lon': lons})
return_dist = plug_array(new_dist, dist, {'lat': new_lats, 'lon': new_lons}, {
'lat': lats, 'lon': lons})
# Mask out values less than radius
return_arr[return_dist > rad] = 0
return_arr[return_dist <= rad] = 1
return return_arr
def all_nan(arr):
r"""
Determine whether the entire array is filled with NaNs.
Parameters
----------
arr : list or numpy.ndarray
List or array to be checked.
Returns
-------
bool
Returns whether the array is filled with all NaNs.
"""
# Convert array to numpy array
arr_copy = np.array(arr)
# Check if there are non-NaN values in the array
if len(arr_copy[~np.isnan(arr_copy)]) == 0:
return True
else:
return False
def is_number(value):
r"""
Determine whether the provided value is a number.
Parameters
----------
value
A value to check the type of.
Returns
-------
bool
Returns True if the value is a number, otherwise False.
"""
return isinstance(value, (int, np.integer, float, np.floating))
def category_label_to_wind(category):
r"""
Convert Saffir-Simpson Hurricane Wind Scale category label to minimum threshold sustained wind speed in knots. Internal function.
Parameters
----------
category : int
Saffir-Simpson Hurricane Wind Scale category. Use 0 for tropical storm, -1 for tropical depression.
Returns
-------
int
Sustained wind speed in knots corresponding to the minimum threshold of the requested category.
"""
# Convert category to lowercase
category_lowercase = category.lower()
# Return thresholds based on category label
if category_lowercase == 'td' or category_lowercase == 'sd':
return category_to_wind(0) - 1
elif category_lowercase == 'ts' or category_lowercase == 'ss':
return category_to_wind(0)
elif category_lowercase == 'c1':
return category_to_wind(1)
elif category_lowercase == 'c2':
return category_to_wind(2)
elif category_lowercase == 'c3':
return category_to_wind(3)
elif category_lowercase == 'c4':
return category_to_wind(4)
else:
return category_to_wind(5)
def dynamic_map_extent(min_lon, max_lon, min_lat, max_lat, recon=False):
r"""
Sets up a dynamic map extent with an aspect ratio of 3:2 given latitude and longitude bounds.
Parameters:
-----------
min_lon : float
Minimum longitude bound.
max_lon : float
Maximum longitude bound.
min_lat : float
Minimum latitude bound.
max_lat : float
Maximum latitude bound.
recon : bool
Zoom in plots closer for recon plots.
Returns:
--------
list
List containing new west, east, north, south map bounds, respectively.
"""
# Get lat/lon bounds
bound_w = min_lon+0.0
bound_e = max_lon+0.0
bound_s = min_lat+0.0
bound_n = max_lat+0.0
# If only one coordinate point, artificially induce a spread
if bound_w == bound_e:
bound_w = bound_w - 0.6
bound_e = bound_e + 0.6
if bound_s == bound_n:
bound_n = bound_n + 0.6
bound_s = bound_s - 0.6
# Function for fixing map ratio
def fix_map_ratio(bound_w, bound_e, bound_n, bound_s, nthres=1.45):
xrng = abs(bound_w-bound_e)
yrng = abs(bound_n-bound_s)
diff = float(xrng) / float(yrng)
if diff < nthres: # plot too tall, need to make it wider
goal_diff = nthres * (yrng)
factor = abs(xrng - goal_diff) / 2.0
bound_w = bound_w - factor
bound_e = bound_e + factor
elif diff > nthres: # plot too wide, need to make it taller
goal_diff = xrng / nthres
factor = abs(yrng - goal_diff) / 2.0
bound_s = bound_s - factor
bound_n = bound_n + factor
return bound_w, bound_e, bound_n, bound_s
# First round of fixing ratio
bound_w, bound_e, bound_n, bound_s = fix_map_ratio(
bound_w, bound_e, bound_n, bound_s, 1.45)
# Adjust map width depending on extent of storm
xrng = abs(bound_e-bound_w)
yrng = abs(bound_n-bound_s)
factor = 0.1
if min(xrng, yrng) < 15.0:
factor = 0.2
if min(xrng, yrng) < 12.0:
factor = 0.4
if min(xrng, yrng) < 10.0:
factor = 0.6
if min(xrng, yrng) < 8.0:
factor = 0.75
if min(xrng, yrng) < 6.0:
factor = 0.9
if recon:
factor = factor * 0.3
bound_w = bound_w-(xrng*factor)
bound_e = bound_e+(xrng*factor)
bound_s = bound_s-(yrng*factor)
bound_n = bound_n+(yrng*factor)
# Second round of fixing ratio
bound_w, bound_e, bound_n, bound_s = fix_map_ratio(
bound_w, bound_e, bound_n, bound_s, 1.45)
# Return map bounds
return bound_w, bound_e, bound_s, bound_n
def read_url(url, split=True, subsplit=True):
f = urllib.request.urlopen(url)
content = f.read()
content = content.decode("utf-8")
if split:
content = content.split("\n")
if subsplit:
content = [(i.replace(" ", "")).split(",") for i in content]
f.close()
return content
class Distance:
def __init__(self, dist, units='kilometers'):
# Conversion fractions (numerator_denominator)
mi_km = 0.621371
nmi_km = 0.539957
m_km = 1000.
ft_km = 3280.84
if units in ['kilometers', 'km']:
self.kilometers = dist
elif units in ['miles', 'mi']:
self.kilometers = dist / mi_km
elif units in ['nauticalmiles', 'nautical', 'nmi']:
self.kilometers = dist / nmi_km
elif units in ['feet', 'ft']:
self.kilometers = dist / ft_km
elif units in ['meters', 'm']:
self.kilometers = dist / m_km
self.miles = self.kilometers * mi_km
self.nautical = self.kilometers * nmi_km
self.meters = self.kilometers * m_km
self.feet = self.kilometers * ft_km
class great_circle(Distance):
def __init__(self, start_point, end_point, **kwargs):
r"""
Parameters
----------
start_point : tuple or int
Starting pair of coordinates, in order of (latitde, longitude)
end_point : tuple or int
Starting pair of coordinates, in order of (latitde, longitude)
radius : float, optional
Radius of Earth. Default is 6371.009 km.
Returns
-------
great_circle
Instance of a great_circle object. To retrieve the distance, add the requested unit at the end (e.g., "great_circle(start,end).kilometers").
Notes
-----
Use spherical geometry to calculate the surface distance between two points. Uses the mean earth radius as defined by the International Union of Geodesy and Geophysics, approx 6371.009 km (for WGS-84), resulting in an error of up to about 0.5%. Otherwise set which radius of the earth to use by specifying a 'radius' keyword argument. It must be in kilometers.
"""
# Set Earth's radius
self.RADIUS = kwargs.pop('radius', 6371.009)
# Compute
dist = self.measure(start_point, end_point)
Distance.__init__(self, dist, units='kilometers')
def measure(self, start_point, end_point):
# Retrieve latitude and longitude coordinates from input pairs
lat1, lon1 = math.radians(
start_point[0]), math.radians(start_point[1] % 360)
lat2, lon2 = math.radians(
end_point[0]), math.radians(end_point[1] % 360)
# Compute sin and cos of coordinates
sin_lat1, cos_lat1 = math.sin(lat1), math.cos(lat1)
sin_lat2, cos_lat2 = math.sin(lat2), math.cos(lat2)
# Compute sin and cos of delta longitude
delta_lon = lon2 - lon1
cos_delta_lon, sin_delta_lon = math.cos(delta_lon), math.sin(delta_lon)
# Compute great circle distance
d = math.atan2(math.sqrt((cos_lat2 * sin_delta_lon) ** 2 +
(cos_lat1 * sin_lat2 -
sin_lat1 * cos_lat2 * cos_delta_lon) ** 2),
sin_lat1 * sin_lat2 + cos_lat1 * cos_lat2 * cos_delta_lon)
# Return great circle distance
return self.RADIUS * d
r"""
The two classes below are a modified version of Cartopy's shapereader functionality, specified to directly read in an already-existing Shapely object as opposed to expecting a local shapefile to be read in.
"""
class Record:
"""
A single logical entry from a shapefile, combining the attributes with their associated geometry. Adapted from Cartopy's Record class.
"""
def __init__(self, shape, attributes, fields):
self._shape = shape
self._bounds = None
if hasattr(shape, 'bbox'):
self._bounds = tuple(shape.bbox)
self._geometry = None
self.attributes = attributes
self._fields = fields
def __repr__(self):
return f'<Record: {self.geometry!r}, {self.attributes!r}, <fields>>'
def __str__(self):
return f'Record({self.geometry}, {self.attributes}, <fields>)'
@property
def bounds(self):
"""
The bounds of this Record's :meth:`~Record.geometry`.
"""
if self._bounds is None:
self._bounds = self.geometry.bounds
return self._bounds
@property
def geometry(self):
"""
A shapely.geometry instance for this Record.
The geometry may be ``None`` if a null shape is defined in the
shapefile.
"""
if not self._geometry and self._shape.shapeType != shapefile.NULL:
self._geometry = sgeom.shape(self._shape)
return self._geometry
class BasicReader:
"""
This is a modified version of Cartopy's BasicReader class. This allows to read in a shapefile fetched online without it being stored as a file locally.
"""
def __init__(self, reader):
# Validate the filename/shapefile
self._reader = reader
if reader.shp is None or reader.shx is None or reader.dbf is None:
raise ValueError("Unable to open shapefile")
self._fields = self._reader.fields
def close(self):
return self._reader.close()
def __len__(self):
return len(self._reader)
def geometries(self):
"""
Return an iterator of shapely geometries from the shapefile.
"""
to_return = []
for shape in self._reader.iterShapes():
# Skip the shape that can not be represented as geometry.
if shape.shapeType != shapefile.NULL:
to_return.append(sgeom.shape(shape))
return to_return
def records(self):
"""
Return an iterator of :class:`~Record` instances.
"""
# Ignore the "DeletionFlag" field which always comes first
to_return = []
fields = self._reader.fields[1:]
for shape_record in self._reader.iterShapeRecords():
attributes = shape_record.record.as_dict()
to_return.append(Record(shape_record.shape, attributes, fields))
return to_return