r"""Functionality for storing and analyzing an individual storm."""
import re
import numpy as np
import xarray as xr
import pandas as pd
import urllib
import warnings
from datetime import datetime as dt, timedelta
import requests
import copy
# Import internal scripts
from .plot import TrackPlot
from ..tornado import *
from ..recon import ReconDataset
from ..ships import Ships
# Import tools
from .tools import *
from ..utils import *
try:
import zipfile
import gzip
from io import BytesIO
import tarfile
except ImportError:
warnings.warn(
"Warning: The libraries necessary for online NHC forecast retrieval aren't available (gzip, io, tarfile).")
try:
import matplotlib.lines as mlines
import matplotlib.patheffects as path_effects
import matplotlib.pyplot as plt
except ImportError:
warnings.warn(
"Warning: Matplotlib is not installed in your python environment. Plotting functions will not work.")
[docs]class Storm:
r"""
Initializes an instance of Storm, retrieved via ``TrackDataset.get_storm()``.
Parameters
----------
storm : dict
Dict entry of the requested storm.
Other Parameters
----------------
stormTors : dict, optional
Dict entry containing tornado data assicated with the storm. Populated directly from tropycal.tracks.TrackDataset.
Returns
-------
Storm
Instance of a Storm object.
Notes
-----
A Storm object is retrieved from TrackDataset's ``get_storm()`` method. For example, if the dataset read in is the default North Atlantic and the desired storm is Hurricane Michael (2018), it would be retrieved as follows:
.. code-block:: python
from tropycal import tracks
basin = tracks.TrackDataset()
storm = basin.get_storm(('michael',2018))
Now Hurricane Michael's data is stored in the variable ``storm``, which is an instance of Storm and can access all of the methods and attributes of a Storm object.
All the variables associated with a Storm object (e.g., lat, lon, time, vmax) can be accessed in two ways. The first is directly from the Storm object:
>>> storm.lat
array([17.8, 18.1, 18.4, 18.8, 19.1, 19.7, 20.2, 20.9, 21.7, 22.7, 23.7,
24.6, 25.6, 26.6, 27.7, 29. , 30. , 30.2, 31.5, 32.8, 34.1, 35.6,
36.5, 37.3, 39.1, 41.1, 43.1, 44.8, 46.4, 47.6, 48.4, 48.8, 48.6,
47.5, 45.9, 44.4, 42.8, 41.2])
The second is via ``storm.vars``, which returns a dictionary of the variables associated with the Storm object. This is also a quick way to access all of the variables associated with a Storm object:
>>> variable_dict = storm.vars
>>> lat = variable_dict['lat']
>>> lon = variable_dict['lon']
>>> print(variable_dict.keys())
dict_keys(['time', 'extra_obs', 'special', 'type', 'lat', 'lon', 'vmax', 'mslp', 'wmo_basin'])
Storm objects also have numerous attributes with information about the storm. ``storm.attrs`` returns a dictionary of the attributes for this Storm object.
>>> print(storm.attrs)
{'id': 'AL142018',
'operational_id': 'AL142018',
'name': 'MICHAEL',
'year': 2018,
'season': 2018,
'basin': 'north_atlantic',
'source_info': 'NHC Hurricane Database',
'source': 'hurdat',
'ace': 12.5,
'realtime': False,
'invest': False}
"""
def __setitem__(self, key, value):
self.__dict__[key] = value
def __getitem__(self, key):
return self.__dict__[key]
def __repr__(self):
# Label object
summary = ["<tropycal.tracks.Storm>"]
# Format keys for summary
type_array = np.array(self.dict['type'])
idx = np.where((type_array == 'SD') | (type_array == 'SS') | (type_array == 'TD') | (
type_array == 'TS') | (type_array == 'HU') | (type_array == 'TY') | (type_array == 'ST'))[0]
if self.invest and len(idx) == 0:
idx = np.array([True for i in type_array])
if len(idx) == 0:
start_time = 'N/A'
end_time = 'N/A'
max_wind = 'N/A'
min_mslp = 'N/A'
else:
time_tropical = np.array(self.dict['time'])[idx]
start_time = time_tropical[0].strftime("%H00 UTC %d %B %Y")
end_time = time_tropical[-1].strftime("%H00 UTC %d %B %Y")
max_wind = 'N/A' if all_nan(np.array(self.dict['vmax'])[idx]) else int(
np.nanmax(np.array(self.dict['vmax'])[idx]))
min_mslp = 'N/A' if all_nan(np.array(self.dict['mslp'])[idx]) else int(
np.nanmin(np.array(self.dict['mslp'])[idx]))
summary_keys = {
'Maximum Wind': f"{max_wind} knots",
'Minimum Pressure': f"{min_mslp} hPa",
'Start Time': start_time,
'End Time': end_time,
}
# Format keys for coordinates
variable_keys = {}
for key in self.vars.keys():
dtype = type(self.vars[key][0]).__name__
dtype = dtype.replace("_", "")
variable_keys[key] = f"({dtype}) [{self.vars[key][0]} .... {self.vars[key][-1]}]"
# Add storm summary
summary.append("Storm Summary:")
add_space = np.max([len(key) for key in summary_keys.keys()]) + 3
for key in summary_keys.keys():
key_name = key + ":"
summary.append(
f'{" "*4}{key_name:<{add_space}}{summary_keys[key]}')
# Add coordinates
summary.append("\nVariables:")
add_space = np.max([len(key) for key in variable_keys.keys()]) + 3
for key in variable_keys.keys():
key_name = key
summary.append(
f'{" "*4}{key_name:<{add_space}}{variable_keys[key]}')
# Add additional information
summary.append("\nMore Information:")
add_space = np.max([len(key) for key in self.attrs.keys()]) + 3
for key in self.attrs.keys():
key_name = key + ":"
val = '%0.1f' % (
self.attrs[key]) if key == 'ace' else self.attrs[key]
summary.append(f'{" "*4}{key_name:<{add_space}}{val}')
return "\n".join(summary)
def __init__(self, storm, stormTors=None):
# Save the dict entry of the storm
self.dict = storm
# Add other attributes about the storm
keys = self.dict.keys()
self.attrs = {}
self.vars = {}
for key in keys:
if key in ['realtime', 'invest', 'subset']:
continue
if not isinstance(self.dict[key], list) and not isinstance(self.dict[key], dict):
self[key] = self.dict[key]
self.attrs[key] = self.dict[key]
if isinstance(self.dict[key], list) and not isinstance(self.dict[key], dict):
self.vars[key] = np.array(self.dict[key])
self[key] = np.array(self.dict[key])
# Assign tornado data
if stormTors is not None and isinstance(stormTors, dict):
self.stormTors = stormTors['data']
self.tornado_dist_thresh = stormTors['dist_thresh']
self.attrs['Tornado Count'] = len(stormTors['data'])
# Get Archer track data for this storm, if it exists
try:
self.get_archer()
except:
pass
# Initialize recon dataset instance
self.recon = ReconDataset(storm=self)
# Determine if storm object was retrieved via realtime object
if 'realtime' in keys and self.dict['realtime']:
self.realtime = True
self.attrs['realtime'] = True
else:
self.realtime = False
self.attrs['realtime'] = False
# Determine if storm object is an invest
if 'invest' in keys and self.dict['invest']:
self.invest = True
self.attrs['invest'] = True
else:
self.invest = False
self.attrs['invest'] = False
# Determine if storm object is subset
if 'subset' in keys and self.dict['subset']:
self.subset = True
self.attrs['subset'] = True
else:
self.subset = False
self.attrs['subset'] = False
[docs] def sel(self, time=None, lat=None, lon=None, vmax=None, mslp=None,
dvmax_dt=None, dmslp_dt=None, stormtype=None, method='exact'):
r"""
Subset this storm by any of its parameters and return a new storm object.
Parameters
----------
time : datetime.datetime or list/tuple of datetimes
Datetime object for single point, or list/tuple of start time and end time.
Default is None, which returns all points
lat : float/int or list/tuple of float/int
Float/int for single point, or list/tuple of latitude bounds (S,N).
None in either position of a tuple means it is boundless on that side.
lon : float/int or list/tuple of float/int
Float/int for single point, or list/tuple of longitude bounds (W,E).
If either lat or lon is a tuple, the other can be None for no bounds.
If either is a tuple, the other canNOT be a float/int.
vmax : list/tuple of float/int
list/tuple of vmax bounds (min,max).
None in either position of a tuple means it is boundless on that side.
mslp : list/tuple of float/int
list/tuple of mslp bounds (min,max).
None in either position of a tuple means it is boundless on that side.
dvmax_dt : list/tuple of float/int
list/tuple of vmax bounds (min,max). ONLY AVAILABLE AFTER INTERP.
None in either position of a tuple means it is boundless on that side.
dmslp_dt : list/tuple of float/int
list/tuple of mslp bounds (min,max). ONLY AVAILABLE AFTER INTERP.
None in either position of a tuple means it is boundless on that side.
stormtype : list/tuple of str
list/tuple of stormtypes (options: 'LO','EX','TD','SD','TS','SS','HU')
method : str
Applies for single point selection in time and lat/lon.
'exact' requires a point to match exactly with the request. (default)
'nearest' returns the nearest point to the request
'floor' ONLY for time, returns the nearest point before the request
'ceil' ONLY for time, returns the neartest point after the request
Returns
-------
storm object
A new storm object that satisfies the intersection of all subsetting.
"""
# create copy of storm object
new_dict = copy.deepcopy(self.dict)
new_dict['subset'] = True
NEW_STORM = Storm(new_dict)
idx_final = np.arange(len(self.time))
# apply time filter
if time is None:
idx = copy.copy(idx_final)
elif isinstance(time, dt):
time_diff = np.array([(time - i).total_seconds()
for i in NEW_STORM.time])
idx = np.abs(time_diff).argmin()
if time_diff[idx] != 0:
if method == 'exact':
msg = f'no exact match for {time}. Use different time or method.'
raise ValueError(msg)
elif method == 'floor' and time_diff[idx] < 0:
idx += -1
if idx < 0:
msg = f'no points before {time}. Use different time or method.'
raise ValueError(msg)
elif method == 'ceil' and time_diff[idx] > 0:
idx += 1
if idx >= len(time_diff):
msg = f'no points after {time}. Use different time or method.'
raise ValueError(msg)
elif isinstance(time, (tuple, list)) and len(time) == 2:
time0, time1 = time
if time0 is None:
time0 = min(NEW_STORM.time)
elif not isinstance(time0, dt):
msg = 'time bounds must be of type datetime.datetime or None.'
raise TypeError(msg)
if time1 is None:
time1 = max(NEW_STORM.time)
elif not isinstance(time1, dt):
msg = 'time bounds must be of type datetime.datetime or None.'
raise TypeError(msg)
tmptimes = np.array(NEW_STORM.time)
idx = np.where((tmptimes >= time0) & (tmptimes <= time1))[0]
if len(idx) == 0:
msg = f'no points between {time}. Use different time bounds.'
raise ValueError(msg)
else:
msg = 'time must be of type datetime.datetime, tuple/list, or None.'
raise TypeError(msg)
# update idx_final
idx_final = list(set(idx_final) & set(listify(idx)))
# apply lat/lon filter
if lat is None and lon is None:
idx = copy.copy(idx_final)
elif is_number(lat) and is_number(lon):
dist = np.array([great_circle((lat, lon), (x, y)).kilometers for x, y in zip(
NEW_STORM.lon, NEW_STORM.lat)])
idx = np.abs(dist).argmin()
if dist[idx] != 0:
if method == 'exact':
msg = f'no exact match for {lat}/{lon}. Use different location or method.'
raise ValueError(msg)
elif method in ('floor', 'ceil'):
warnings.warn(
'floor and ceil do not apply to lat/lon filtering. Using nearest instead.')
elif (isinstance(lat, (tuple, list)) and len(lat) == 2) or (isinstance(lon, (tuple, list)) and len(lon) == 2):
if not isinstance(lat, (tuple, list)):
lat = (None, None)
if not isinstance(lon, (tuple, list)):
lon = (None, None)
lat0, lat1 = lat
lon0, lon1 = lon
if lat0 is None:
lat0 = min(NEW_STORM.lat)
elif not is_number(lat0):
msg = 'lat/lon bounds must be of type float/int or None.'
raise TypeError(msg)
if lat1 is None:
lat1 = max(NEW_STORM.lat)
elif not is_number(lat1):
msg = 'lat/lon bounds must be of type float/int or None.'
raise TypeError(msg)
if lon0 is None:
lon0 = min(NEW_STORM.lon)
elif not is_number(lon0):
msg = 'lat/lon bounds must be of type float/int or None.'
raise TypeError(msg)
if lon1 is None:
lon1 = max(NEW_STORM.lon)
elif not is_number(lon1):
msg = 'lat/lon bounds must be of type float/int or None.'
raise TypeError(msg)
tmplat, tmplon = np.array(
NEW_STORM.lat), np.array(NEW_STORM.lon) % 360
idx = np.where((tmplat >= lat0) & (tmplat <= lat1) &
(tmplon >= lon0 % 360) & (tmplon <= lon1 % 360))[0]
if len(idx) == 0:
msg = f'no points in {lat}/{lon} box. Use different lat/lon bounds.'
raise ValueError(msg)
else:
msg = 'lat and lon must be of the same type: float/int, tuple/list, or None.'
raise TypeError(msg)
# update idx_final
idx_final = list(set(idx_final) & set(listify(idx)))
# apply vmax filter
if vmax is None:
idx = copy.copy(idx_final)
elif isinstance(vmax, (tuple, list)) and len(vmax) == 2:
vmax0, vmax1 = vmax
if vmax0 is None:
vmax0 = np.nanmin(NEW_STORM.vmax)
elif not is_number(vmax0):
msg = 'vmax bounds must be of type float/int or None.'
raise TypeError(msg)
if vmax1 is None:
vmax1 = np.nanmax(NEW_STORM.vmax)
elif not is_number(vmax1):
msg = 'vmax bounds must be of type float/int or None.'
raise TypeError(msg)
tmpvmax = np.array(NEW_STORM.vmax)
idx = np.where((tmpvmax >= vmax0) & (tmpvmax <= vmax1))[0]
if len(idx) == 0:
msg = f'no points with vmax between {vmax}. Use different vmax bounds.'
raise ValueError(msg)
else:
msg = 'vmax must be of type tuple/list, or None.'
raise TypeError(msg)
# update idx_final
idx_final = list(set(idx_final) & set(listify(idx)))
# apply mslp filter
if mslp is None:
idx = copy.copy(idx_final)
elif isinstance(mslp, (tuple, list)) and len(mslp) == 2:
mslp0, mslp1 = mslp
if mslp0 is None:
mslp0 = np.nanmin(NEW_STORM.mslp)
elif not is_number(mslp0):
msg = 'mslp bounds must be of type float/int or None.'
raise TypeError(msg)
if mslp1 is None:
mslp1 = np.nanmax(NEW_STORM.mslp)
elif not is_number(mslp1):
msg = 'mslp bounds must be of type float/int or None.'
raise TypeError(msg)
tmpmslp = np.array(NEW_STORM.mslp)
idx = np.where((tmpmslp >= mslp0) & (tmpmslp <= mslp1))[0]
if len(idx) == 0:
msg = f'no points with mslp between {mslp}. Use different dmslp_dt bounds.'
raise ValueError(msg)
else:
msg = 'vmax must be of type tuple/list, or None.'
raise TypeError(msg)
# update idx_final
idx_final = list(set(idx_final) & set(listify(idx)))
# apply dvmax_dt filter
if dvmax_dt is None:
idx = copy.copy(idx_final)
elif 'dvmax_dt' not in NEW_STORM.dict.keys():
msg = 'dvmax_dt not in storm data. Create new object with interp first.'
raise KeyError(msg)
elif isinstance(dvmax_dt, (tuple, list)) and len(dvmax_dt) == 2:
dvmax_dt0, dvmax_dt1 = dvmax_dt
if dvmax_dt0 is None:
dvmax_dt0 = np.nanmin(NEW_STORM.dvmax_dt)
elif not is_number(dvmax_dt0):
msg = 'dmslp_dt bounds must be of type float/int or None.'
raise TypeError(msg)
if dvmax_dt1 is None:
dvmax_dt1 = np.nanmax(NEW_STORM.dvmax_dt)
elif not is_number(dvmax_dt1):
msg = 'dmslp_dt bounds must be of type float/int or None.'
raise TypeError(msg)
tmpvmax = np.array(NEW_STORM.dvmax_dt)
idx = np.where((tmpvmax >= dvmax_dt0) & (tmpvmax <= dvmax_dt1))[0]
if len(idx) == 0:
msg = f'no points with dvmax_dt between {dvmax_dt}. Use different dvmax_dt bounds.'
raise ValueError(msg)
# update idx_final
idx_final = list(set(idx_final) & set(listify(idx)))
# apply dmslp_dt filter
if dmslp_dt is None:
idx = copy.copy(idx_final)
elif 'dmslp_dt' not in NEW_STORM.dict.keys():
msg = 'dmslp_dt not in storm data. Create new object with interp first.'
raise KeyError(msg)
elif isinstance(dmslp_dt, (tuple, list)) and len(dmslp_dt) == 2:
dmslp_dt0, dmslp_dt1 = dmslp_dt
if dmslp_dt0 is None:
dmslp_dt0 = np.nanmin(NEW_STORM.dmslp_dt)
elif not is_number(dmslp_dt0):
msg = 'dmslp_dt bounds must be of type float/int or None.'
raise TypeError(msg)
if dmslp_dt1 is None:
dmslp_dt1 = np.nanmax(NEW_STORM.dmslp_dt)
elif not is_number(dmslp_dt1):
msg = 'dmslp_dt bounds must be of type float/int or None.'
raise TypeError(msg)
tmpmslp = np.array(NEW_STORM.dmslp_dt)
idx = np.where((tmpmslp >= dmslp_dt0) & (tmpmslp <= dmslp_dt1))[0]
if len(idx) == 0:
msg = f'no points with dmslp_dt between {dmslp_dt}. Use different dmslp_dt bounds.'
raise ValueError(msg)
# update idx_final
idx_final = list(set(idx_final) & set(listify(idx)))
# apply stormtype filter
if stormtype is None:
idx = copy.copy(idx_final)
elif isinstance(stormtype, (tuple, list, str)):
idx = [i for i, j in enumerate(
NEW_STORM.type) if j in listify(stormtype)]
if len(idx) == 0:
msg = f'no points with type {stormtype}. Use different stormtype.'
raise ValueError(msg)
else:
msg = 'stormtype must be of type tuple/list, str, or None.'
raise TypeError(msg)
# update idx_final
idx_final = sorted(list(set(idx_final) & set(listify(idx))))
# Construct new storm dict with subset elements
for key in NEW_STORM.dict.keys():
if isinstance(NEW_STORM.dict[key], list):
NEW_STORM.dict[key] = [NEW_STORM.dict[key][i]
for i in idx_final]
else:
NEW_STORM.dict[key] = NEW_STORM.dict[key]
# Add other attributes to new storm object
if key == 'realtime':
continue
if not isinstance(NEW_STORM.dict[key], list) and not isinstance(NEW_STORM.dict[key], dict):
NEW_STORM[key] = NEW_STORM.dict[key]
NEW_STORM.attrs[key] = NEW_STORM.dict[key]
if isinstance(NEW_STORM.dict[key], list) and not isinstance(NEW_STORM.dict[key], dict):
NEW_STORM.vars[key] = np.array(NEW_STORM.dict[key])
NEW_STORM[key] = np.array(NEW_STORM.dict[key])
return NEW_STORM
[docs] def interp(self, hours=1, dt_window=24, dt_align='middle', method='linear'):
r"""
Interpolate a storm temporally to a specified time resolution.
Parameters
----------
hours : int or float
Temporal resolution in hours (or fraction of an hour) to interpolate storm data to. Default is 1 hour.
dt_window : int
Time window in hours over which to calculate temporal change data. Default is 24 hours.
dt_align : str
Whether to align the temporal change window as "start", "middle" (default) or "end" of the dt_window time period.
method : str
Interpolation method for lat/lon coordinates passed to scipy. Options are "linear" (default) or "quadratic".
Returns
-------
tropycal.tracks.Storm
New Storm object containing the updated dictionary.
Notes
-----
When interpolating data using a non-linear method, all non-standard hour observations (i.e., not within 00, 06, 12 or 18 UTC) are ignored for latitude & longitude interpolation in order to produce a smoother line.
"""
NEW_STORM = copy.deepcopy(self)
newdict = interp_storm(NEW_STORM.dict, hours,
dt_window, dt_align, method)
for key in newdict.keys():
NEW_STORM.dict[key] = newdict[key]
# Add other attributes to new storm object
for key in NEW_STORM.dict.keys():
if key == 'realtime':
continue
if not isinstance(NEW_STORM.dict[key], (np.ndarray, list)) and not isinstance(NEW_STORM.dict[key], dict):
NEW_STORM[key] = NEW_STORM.dict[key]
NEW_STORM.attrs[key] = NEW_STORM.dict[key]
if isinstance(NEW_STORM.dict[key], (np.ndarray, list)) and not isinstance(NEW_STORM.dict[key], dict):
NEW_STORM.dict[key] = list(NEW_STORM.dict[key])
NEW_STORM.vars[key] = np.array(NEW_STORM.dict[key])
NEW_STORM[key] = np.array(NEW_STORM.dict[key])
return NEW_STORM
[docs] def to_dict(self):
r"""
Returns the dict entry for the storm.
Returns
-------
dict
A dictionary containing information about the storm.
"""
# Return dict
return self.dict
[docs] def to_xarray(self):
r"""
Converts the storm dict into an xarray Dataset object.
Returns
-------
xarray.Dataset
An xarray Dataset object containing information about the storm.
"""
# Set up empty dict for dataset
time = self.dict['time']
ds = {}
attrs = {}
# Add every key containing a list into the dict, otherwise add as an attribute
keys = [k for k in self.dict.keys() if k != 'time']
for key in keys:
if isinstance(self.dict[key], list):
ds[key] = xr.DataArray(self.dict[key], coords=[
time], dims=['time'])
else:
attrs[key] = self.dict[key]
# Convert entire dict to a Dataset
ds = xr.Dataset(ds, attrs=attrs)
# Return dataset
return ds
[docs] def to_dataframe(self, attrs_as_columns=False):
r"""
Converts the storm dict into a pandas DataFrame object.
Parameters
----------
attrs_as_columns : bool
If True, adds Storm object attributes as columns in the DataFrame returned. Default is False.
Returns
-------
pandas.DataFrame
A pandas DataFrame object containing information about the storm.
"""
# Set up empty dict for dataframe
ds = {}
# Add every key containing a list into the dict
keys = [k for k in self.dict.keys()]
for key in keys:
if isinstance(self.dict[key], list):
ds[key] = self.dict[key]
else:
if attrs_as_columns:
ds[key] = self.dict[key]
# Convert entire dict to a DataFrame
ds = pd.DataFrame(ds)
# Return dataset
return ds
[docs] def plot(self, domain="dynamic", plot_all_dots=False, ax=None, cartopy_proj=None, save_path=None, **kwargs):
r"""
Creates a plot of the observed track of the storm.
Parameters
----------
domain : str
Domain for the plot. Default is "dynamic". "dynamic_tropical" is also available. Please refer to :ref:`options-domain` for available domain options.
plot_all_dots : bool
Whether to plot dots for all observations along the track. If false, dots will be plotted every 6 hours. Default is false.
ax : axes
Instance of axes to plot on. If none, one will be generated. Default is none.
cartopy_proj : ccrs
Instance of a cartopy projection to use. If none, one will be generated. Default is none.
save_path : str
Relative or full path of directory to save the image in. If none, image will not be saved.
Other Parameters
----------------
prop : dict
Customization properties of storm track lines. Please refer to :ref:`options-prop` for available options.
map_prop : dict
Customization properties of Cartopy map. Please refer to :ref:`options-map-prop` for available options.
Returns
-------
ax
Instance of axes containing the plot is returned.
"""
# Retrieve kwargs
prop = kwargs.pop('prop', {})
map_prop = kwargs.pop('map_prop', {})
# Create instance of plot object
try:
self.plot_obj
except:
self.plot_obj = TrackPlot()
# Create cartopy projection
if cartopy_proj is not None:
self.plot_obj.proj = cartopy_proj
elif max(self.dict['lon']) > 150 or min(self.dict['lon']) < -150:
self.plot_obj.create_cartopy(
proj='PlateCarree', central_longitude=180.0)
else:
self.plot_obj.create_cartopy(
proj='PlateCarree', central_longitude=0.0)
# Plot storm
plot_ax = self.plot_obj.plot_storms(
[self.dict], domain, plot_all_dots=plot_all_dots, ax=ax, prop=prop, map_prop=map_prop, save_path=save_path)
# Return axis
return plot_ax
[docs] def plot_nhc_forecast(self, forecast, track_labels='fhr', cone_days=5, domain="dynamic_forecast",
ax=None, cartopy_proj=None, save_path=None, **kwargs):
r"""
Creates a plot of the operational NHC forecast track along with observed track data.
Parameters
----------
forecast : int or datetime.datetime
Integer representing the forecast number, or datetime object for the closest issued forecast to this time.
track_labels : str
Label format for forecast dots. Scroll down to notes for available options.
cone_days : int
Number of days to plot the forecast cone. Default is 5 days. Can select 2, 3, 4 or 5 days.
domain : str
Domain for the plot. Default is "dynamic_forecast". Please refer to :ref:`options-domain` for available domain options.
ax : axes
Instance of axes to plot on. If none, one will be generated. Default is none.
cartopy_proj : ccrs
Instance of a cartopy projection to use. If none, one will be generated. Default is none.
save_path : str
Relative or full path of directory to save the image in. If none, image will not be saved.
Other Parameters
----------------
prop : dict
Customization properties of NHC forecast plot. Please refer to :ref:`options-prop-nhc` or scroll down below for available options.
map_prop : dict
Customization properties of Cartopy map. Please refer to :ref:`options-map-prop` for available options.
Returns
-------
ax
Instance of axes containing the plot is returned.
Notes
-----
The following properties are available for customizing ``track_labels``.
.. list-table::
:widths: 25 75
:header-rows: 1
* - Value
- Description
* - ``""``
- No label
* - ``"fhr"``
- Forecast hour (e.g., 60). Default option.
* - ``"fhr_wind_mph"``
- Forecast hour and sustained wind in mph.
* - ``"fhr_wind_kt"``
- Forecast hour and sustained wind in knots.
* - ``"valid_utc"``
- Valid time in UTC.
* - ``"valid_edt"``
- Valid time in Eastern time (EDT).
* - ``"valid_cdt"``
- Valid time in Central time (CDT).
* - ``"valid_mdt"``
- Valid time in Mountain time (MDT).
* - ``"valid_pdt"``
- Valid time in Pacific time (PDT).
* - ``"valid_hst"``
- Valid time in Hawaii time (HST).
The following properties are available for plotting NHC forecasts, via ``prop``.
.. list-table::
:widths: 25 75
:header-rows: 1
* - Property
- Description
* - cone_lw
- Center line width for the cone of uncertainty. Default is 2.0.
* - cone_alpha
- Transparency for the cone of uncertainty. Default is 0.6.
* - cone_res
- Grid resolution for the cone of uncertainty in degrees. Default is 0.05.
"""
# Retrieve kwargs
prop = kwargs.pop('prop', {})
map_prop = kwargs.pop('map_prop', {})
# Check to ensure the data source is HURDAT
if self.source != "hurdat":
raise RuntimeError(
"Error: NHC data can only be accessed when HURDAT is used as the data source.")
# Check to ensure storm is not an invest
if self.invest:
raise RuntimeError(
"Error: NHC does not issue advisories for invests that have not been designated as Potential Tropical Cyclones.")
# Create instance of plot object
try:
self.plot_obj
except:
self.plot_obj = TrackPlot()
# Create cartopy projection
if cartopy_proj is None:
if max(self.dict['lon']) > 130 or min(self.dict['lon']) < -130:
self.plot_obj.create_cartopy(
proj='PlateCarree', central_longitude=180.0)
else:
self.plot_obj.create_cartopy(
proj='PlateCarree', central_longitude=0.0)
# Get forecasts dict saved into storm object, if it hasn't been already
try:
self.forecast_dict
except:
self.get_operational_forecasts()
# Get all NHC forecast entries
nhc_forecasts = self.forecast_dict['OFCL']
carq_forecasts = self.forecast_dict['CARQ']
# Get list of all NHC forecast initializations
nhc_forecast_init = [k for k in nhc_forecasts.keys()]
carq_forecast_init = [k for k in carq_forecasts.keys()]
# Find closest matching time to the provided forecast date, or time
if isinstance(forecast, int):
forecast_dict = nhc_forecasts[nhc_forecast_init[forecast - 1]]
advisory_num = forecast + 0
elif isinstance(forecast, dt):
nhc_forecast_init_dt = [dt.strptime(
k, '%Y%m%d%H') for k in nhc_forecast_init]
time_diff = np.array(
[(i - forecast).days + (i - forecast).seconds / 86400 for i in nhc_forecast_init_dt])
closest_idx = np.abs(time_diff).argmin()
forecast_dict = nhc_forecasts[nhc_forecast_init[closest_idx]]
advisory_num = closest_idx + 1
if np.abs(time_diff[closest_idx]) >= 1.0:
warnings.warn(
"The time provided is outside of the duration of the storm. Returning the closest available NHC forecast.")
else:
raise RuntimeError(
"Error: Input variable 'forecast' must be of type 'int' or 'datetime.datetime'")
# Get observed track as per NHC analyses
track_dict = {
'lat': [],
'lon': [],
'vmax': [],
'type': [],
'mslp': [],
'time': [],
'extra_obs': [],
'special': [],
'ace': 0.0,
}
use_carq = True
for k in nhc_forecast_init:
hrs = nhc_forecasts[k]['fhr']
hrs_carq = carq_forecasts[k]['fhr'] if k in carq_forecast_init else [
]
# Account for old years when hour 0 wasn't included directly
# if 0 not in hrs and k in carq_forecast_init and 0 in hrs_carq:
if self.dict['year'] < 2000 and k in carq_forecast_init and 0 in hrs_carq:
use_carq = True
hr_idx = hrs_carq.index(0)
track_dict['lat'].append(carq_forecasts[k]['lat'][hr_idx])
track_dict['lon'].append(carq_forecasts[k]['lon'][hr_idx])
track_dict['vmax'].append(carq_forecasts[k]['vmax'][hr_idx])
track_dict['mslp'].append(np.nan)
track_dict['time'].append(carq_forecasts[k]['init'])
itype = carq_forecasts[k]['type'][hr_idx]
if itype == "":
itype = get_storm_type(carq_forecasts[k]['vmax'][0], False)
track_dict['type'].append(itype)
hr = carq_forecasts[k]['init'].strftime("%H%M")
track_dict['extra_obs'].append(0) if hr in [
'0300', '0900', '1500', '2100'] else track_dict['extra_obs'].append(1)
track_dict['special'].append("")
else:
use_carq = False
if 3 in hrs:
hr_idx = hrs.index(3)
hr_add = 3
else:
hr_idx = 0
hr_add = 0
track_dict['lat'].append(nhc_forecasts[k]['lat'][hr_idx])
track_dict['lon'].append(nhc_forecasts[k]['lon'][hr_idx])
track_dict['vmax'].append(nhc_forecasts[k]['vmax'][hr_idx])
track_dict['mslp'].append(np.nan)
track_dict['time'].append(
nhc_forecasts[k]['init'] + timedelta(hours=hr_add))
itype = nhc_forecasts[k]['type'][hr_idx]
if itype == "":
itype = get_storm_type(nhc_forecasts[k]['vmax'][0], False)
track_dict['type'].append(itype)
hr = nhc_forecasts[k]['init'].strftime("%H%M")
track_dict['extra_obs'].append(0) if hr in [
'0300', '0900', '1500', '2100'] else track_dict['extra_obs'].append(1)
track_dict['special'].append("")
# Add main elements from storm dict
for key in ['id', 'operational_id', 'name', 'year']:
track_dict[key] = self.dict[key]
# Add carq to forecast dict as hour 0, if available
if use_carq and forecast_dict['init'] in track_dict['time']:
insert_idx = track_dict['time'].index(forecast_dict['init'])
if 0 in forecast_dict['fhr']:
forecast_dict['lat'][0] = track_dict['lat'][insert_idx]
forecast_dict['lon'][0] = track_dict['lon'][insert_idx]
forecast_dict['vmax'][0] = track_dict['vmax'][insert_idx]
forecast_dict['mslp'][0] = track_dict['mslp'][insert_idx]
forecast_dict['type'][0] = track_dict['type'][insert_idx]
else:
forecast_dict['fhr'].insert(0, 0)
forecast_dict['lat'].insert(0, track_dict['lat'][insert_idx])
forecast_dict['lon'].insert(0, track_dict['lon'][insert_idx])
forecast_dict['vmax'].insert(0, track_dict['vmax'][insert_idx])
forecast_dict['mslp'].insert(0, track_dict['mslp'][insert_idx])
forecast_dict['type'].insert(0, track_dict['type'][insert_idx])
# Add other info to forecast dict
forecast_dict['advisory_num'] = advisory_num
forecast_dict['basin'] = self.basin
# Plot storm
plot_ax = self.plot_obj.plot_storm_nhc(
forecast_dict, track_dict, track_labels, cone_days, domain, ax=ax, save_path=save_path, prop=prop, map_prop=map_prop)
# Return axis
return plot_ax
[docs] def plot_models(self, forecast=None, plot_btk=False, domain="dynamic", ax=None, cartopy_proj=None, save_path=None, **kwargs):
r"""
Creates a plot of operational model forecast tracks.
Parameters
----------
forecast : datetime.datetime, optional
Datetime object representing the forecast initialization. If None (default), fetches the latest forecast.
plot_btk : bool, optional
If True, Best Track will be plotted alongside operational forecast models. Default is False.
domain : str
Domain for the plot. Default is "dynamic". Please refer to :ref:`options-domain` for available domain options.
ax : axes
Instance of axes to plot on. If none, one will be generated. Default is none.
cartopy_proj : ccrs
Instance of a cartopy projection to use. If none, one will be generated. Default is none.
save_path : str
Relative or full path of directory to save the image in. If none, image will not be saved.
Other Parameters
----------------
models : dict
Dictionary with **key** = model name (case-insensitive) and **value** = model color. Scroll below for available model names.
prop : dict
Customization properties of forecast lines. Scroll below for available options.
map_prop : dict
Customization properties of Cartopy map. Please refer to :ref:`options-map-prop` for available options.
Returns
-------
ax
Instance of axes containing the plot is returned.
Notes
-----
.. note::
1. For years before the HMON model was available, the HMON key instead defaults to the old GFDL model.
2. For storms in the JTWC area of responsibility, the NHC key defaults to JTWC.
The following model names are available as keys in the "models" dict. These names are case-insensitive. To avoid plotting any of these models, set the value to None instead of a color (e.g., ``models = {'gfs':None}`` or ``models = {'GFS':None}``).
.. list-table::
:widths: 25 75
:header-rows: 1
* - Model Acronym
- Full Model Name
* - CMC
- Canadian Meteorological Centre (CMC)
* - GFS
- Global Forecast System (GFS)
* - UKM
- UK Met Office (UKMET)
* - ECM
- European Centre for Medium-range Weather Forecasts (ECMWF)
* - HMON
- Hurricanes in a Multi-scale Ocean-coupled Non-hydrostatic Model (HMON)
* - HWRF
- Hurricane Weather Research and Forecast (HWRF)
* - HAFSA
- Hurricane Analysis and Forecast System A (HAFS-A)
* - HAFSB
- Hurricane Analysis and Forecast System B (HAFS-B)
* - NHC
- National Hurricane Center (NHC)
The following properties are available for customizing forecast model tracks, via ``prop``.
.. list-table::
:widths: 25 75
:header-rows: 1
* - Property
- Description
* - linewidth
- Line width of forecast model track. Default is 2.5.
* - marker
- Marker type for forecast hours. Options are 'label' (default), 'dot' or None.
* - marker_hours
- List of forecast hours to mark. Default is [24,48,72,96,120,144,168].
"""
# Dictionary mapping model names to the interpolated model key
dict_models = {
'cmc': 'CMC2',
'gfs': 'AVNI',
'ukm': 'UKX2',
'ecm': 'ECO2',
'hmon': 'HMNI',
'hwrf': 'HWFI',
'hafsa': 'HFAI',
'hafsb': 'HFBI',
'nhc': 'OFCI',
}
backup_models = {
'gfs': ['AVNO', 'AVNX'],
'ukm': ['UKM2', 'UKM'],
'cmc': ['CMC'],
'hmon': ['GFDI', 'GFDL'],
'nhc': ['OFCL', 'JTWC'],
'hwrf': ['HWRF'],
'hafsa': ['HFSA'],
'hafsb': ['HFSB'],
}
# Pop kwargs
prop = kwargs.pop('prop', {})
models = kwargs.pop('models', {})
map_prop = kwargs.pop('map_prop', {})
# Create instance of plot object
try:
self.plot_obj
except:
self.plot_obj = TrackPlot()
# -------------------------------------------------------------------------
# Get forecasts dict saved into storm object, if it hasn't been already
try:
self.forecast_dict
except:
self.get_operational_forecasts()
# Fetch latest forecast if None
if forecast is None:
check_keys = ['AVNI', 'OFCI', 'HWFI', 'HFAI']
if 'HWFI' not in self.forecast_dict.keys():
check_keys[2] = 'HWRF'
if 'HWRF' not in self.forecast_dict.keys() and 'HWRF' in check_keys:
check_keys.pop(check_keys.index('HWRF'))
if 'OFCI' not in self.forecast_dict.keys():
check_keys[1] = 'OFCL'
if 'OFCL' not in self.forecast_dict.keys():
check_keys[1] = 'JTWC'
if 'JTWC' not in self.forecast_dict.keys() and 'JTWC' in check_keys:
check_keys.pop(check_keys.index('JTWC'))
if 'AVNI' not in self.forecast_dict.keys():
check_keys[0] = 'AVNO'
if 'AVNO' not in self.forecast_dict.keys():
check_keys[0] = 'AVNX'
if 'AVNX' not in self.forecast_dict.keys() and 'AVNX' in check_keys:
check_keys.pop(check_keys.index('AVNX'))
if 'HFAI' not in self.forecast_dict.keys():
check_keys[3] = 'HFSA'
if 'HFSA' not in self.forecast_dict.keys() and 'HWRF' in check_keys:
check_keys.pop(check_keys.index('HFSA'))
if len(check_keys) == 0:
raise ValueError("No models are available for this storm.")
inits = [dt.strptime(
[k for k in self.forecast_dict[key]][-1], '%Y%m%d%H') for key in check_keys]
forecast = min(inits)
# Error check forecast time
if forecast < self.time[0] or forecast > self.time[-1]:
raise ValueError(
"Requested forecast is outside of the storm's duration.")
# Construct forecast dict
ds = {}
proj_lons = []
forecast_str = forecast.strftime('%Y%m%d%H')
input_keys = [k for k in models.keys()]
input_keys_lower = [k.lower() for k in models.keys()]
for key in dict_models.keys():
# Only proceed if model isn't not requested
if key in input_keys_lower:
idx = input_keys_lower.index(key)
if models[input_keys[idx]] is None:
continue
# Find official key
official_key = dict_models[key]
found = False
if official_key not in self.forecast_dict.keys():
if key in backup_models.keys():
for backup_key in backup_models[key]:
if backup_key in self.forecast_dict.keys():
official_key = backup_key
found = True
break
else:
found = True
# Check for 2 vs. I if needed
if not found or forecast_str not in self.forecast_dict[official_key].keys():
if '2' in official_key:
official_key = dict_models[key].replace('2', 'I')
if official_key not in self.forecast_dict.keys():
if key in backup_models.keys():
found = False
for backup_key_iter in backup_models[key]:
backup_key = backup_key_iter.replace('2', 'I')
if backup_key in self.forecast_dict.keys():
official_key = backup_key
found = True
break
if not found:
continue
else:
continue
else:
continue
# Append forecast data if it exists for this initialization
if forecast_str not in self.forecast_dict[official_key].keys():
continue
enter_key = key + ''
if key.lower() == 'hmon' and 'gf' in official_key.lower():
enter_key = 'gfdl'
if key.lower() == 'nhc' and 'jt' in official_key.lower():
enter_key = 'jtwc'
ds[enter_key] = copy.deepcopy(
self.forecast_dict[official_key][forecast_str])
# Filter out to hour 168
if ds[enter_key]['fhr'][-1] > 168:
idx = ds[enter_key]['fhr'].index(168)
for key in ds[enter_key].keys():
if isinstance(ds[enter_key][key], list):
ds[enter_key][key] = ds[enter_key][key][:idx + 1]
proj_lons += ds[enter_key]['lon']
# Proceed if data exists
if len(ds) == 0:
raise RuntimeError(
"No forecasts are available for the given parameters.")
# Create cartopy projection
if cartopy_proj is not None:
self.plot_obj.proj = cartopy_proj
elif np.nanmax(proj_lons) > 150 or np.nanmin(proj_lons) < -150:
self.plot_obj.create_cartopy(
proj='PlateCarree', central_longitude=180.0)
else:
self.plot_obj.create_cartopy(
proj='PlateCarree', central_longitude=0.0)
# Account for cases crossing dateline
if np.nanmax(proj_lons) > 150 or np.nanmin(proj_lons) < -150:
for key in ds.keys():
new_lons = np.array(ds[key]['lon'])
new_lons[new_lons < 0] = new_lons[new_lons < 0] + 360.0
ds[key]['lon'] = new_lons.tolist()
# Plot storm
plot_ax = self.plot_obj.plot_models(
forecast, plot_btk, self.dict, ds, models, domain, ax=ax, prop=prop, map_prop=map_prop, save_path=save_path)
# Return axis
return plot_ax
[docs] def plot_models_wind(self, forecast=None, plot_btk=False, **kwargs):
r"""
Creates a plot of operational model forecast wind speed.
Parameters
----------
forecast : datetime.datetime, optional
Datetime object representing the forecast initialization. If None (default), fetches the latest forecast.
plot_btk : bool, optional
If True, Best Track will be plotted alongside operational forecast models. Default is False.
Other Parameters
----------------
models : dict
Dictionary with **key** = model name (case-insensitive) and **value** = model color. Scroll below for available model names.
prop : dict
Customization properties of forecast lines. Scroll below for available options.
Returns
-------
ax
Instance of axes containing the plot is returned.
Notes
-----
.. note::
1. For years before the HMON model was available, the HMON key instead defaults to the old GFDL model.
2. For storms in the JTWC area of responsibility, the NHC key defaults to JTWC.
The following model names are available as keys in the "models" dict. These names are case-insensitive. To avoid plotting any of these models, set the value to None instead of a color (e.g., ``models = {'gfs':None}`` or ``models = {'GFS':None}``).
.. list-table::
:widths: 25 75
:header-rows: 1
* - Model Acronym
- Full Model Name
* - CMC
- Canadian Meteorological Centre (CMC)
* - GFS
- Global Forecast System (GFS)
* - UKM
- UK Met Office (UKMET)
* - ECM
- European Centre for Medium-range Weather Forecasts (ECMWF)
* - HMON
- Hurricanes in a Multi-scale Ocean-coupled Non-hydrostatic Model (HMON)
* - HWRF
- Hurricane Weather Research and Forecast (HWRF)
* - HAFSA
- Hurricane Analysis and Forecast System A (HAFS-A)
* - HAFSB
- Hurricane Analysis and Forecast System B (HAFS-B)
* - SHIPS
- Statistical SHIPS model
* - NHC
- National Hurricane Center (NHC)
The following properties are available for customizing the plot, via ``prop``.
.. list-table::
:widths: 25 75
:header-rows: 1
* - Property
- Description
* - linewidth
- Line width of forecast model intensity. Default is 2.0.
"""
# Dictionary mapping model names to the interpolated model key
dict_models = {
'cmc': 'CMC2',
'gfs': 'AVNI',
'ukm': 'UKX2',
'ecm': 'ECO2',
'hmon': 'HMNI',
'hwrf': 'HWFI',
'hafsa': 'HFAI',
'hafsb': 'HFBI',
'ships': 'DSHP',
'nhc': 'OFCI',
}
backup_models = {
'gfs': ['AVNO', 'AVNX'],
'ukm': ['UKM2', 'UKM'],
'hmon': ['GFDI', 'GFDL'],
'nhc': ['OFCL', 'JTWC'],
'hwrf': ['HWRF'],
'hafsa': ['HFSA'],
'hafsb': ['HFSB'],
'ships': ['SHIP'],
}
# Pop kwargs
models = kwargs.pop('models', {})
prop = kwargs.pop('prop', {})
# -------------------------------------------------------------------------
# Get forecasts dict saved into storm object, if it hasn't been already
try:
self.forecast_dict
except:
self.get_operational_forecasts()
# Fetch latest forecast if None
if forecast is None:
check_keys = ['AVNI', 'OFCI', 'HWFI', 'HFAI']
if 'HWFI' not in self.forecast_dict.keys():
check_keys[2] = 'HWRF'
if 'HWRF' not in self.forecast_dict.keys() and 'HWRF' in check_keys:
check_keys.pop(check_keys.index('HWRF'))
if 'OFCI' not in self.forecast_dict.keys():
check_keys[1] = 'OFCL'
if 'OFCL' not in self.forecast_dict.keys():
check_keys[1] = 'JTWC'
if 'JTWC' not in self.forecast_dict.keys() and 'JTWC' in check_keys:
check_keys.pop(check_keys.index('JTWC'))
if 'AVNI' not in self.forecast_dict.keys():
check_keys[0] = 'AVNO'
if 'AVNO' not in self.forecast_dict.keys():
check_keys[0] = 'AVNX'
if 'AVNX' not in self.forecast_dict.keys() and 'AVNX' in check_keys:
check_keys.pop(check_keys.index('AVNX'))
if 'HFAI' not in self.forecast_dict.keys():
check_keys[3] = 'HFSA'
if 'HFSA' not in self.forecast_dict.keys() and 'HWRF' in check_keys:
check_keys.pop(check_keys.index('HFSA'))
if len(check_keys) == 0:
raise ValueError("No models are available for this storm.")
inits = [dt.strptime(
[k for k in self.forecast_dict[key]][-1], '%Y%m%d%H') for key in check_keys]
forecast = min(inits)
# Error check forecast time
if forecast < self.time[0] or forecast > self.time[-1]:
raise ValueError(
"Requested forecast is outside of the storm's duration.")
# Construct forecast dict
ds = {}
forecast_str = forecast.strftime('%Y%m%d%H')
input_keys = [k for k in models.keys()]
input_keys_lower = [k.lower() for k in models.keys()]
for key in dict_models.keys():
# Only proceed if model isn't not requested
if key in input_keys_lower:
idx = input_keys_lower.index(key)
if models[input_keys[idx]] is None:
continue
# Find official key
official_key = dict_models[key]
found = False
if official_key not in self.forecast_dict.keys():
if key in backup_models.keys():
for backup_key in backup_models[key]:
if backup_key in self.forecast_dict.keys():
official_key = backup_key
found = True
break
else:
found = True
# Check for 2 vs. I if needed
if not found or forecast_str not in self.forecast_dict[official_key].keys():
if '2' in official_key:
official_key = dict_models[key].replace('2', 'I')
if official_key not in self.forecast_dict.keys():
if key in backup_models.keys():
found = False
for backup_key_iter in backup_models[key]:
backup_key = backup_key_iter.replace('2', 'I')
if backup_key in self.forecast_dict.keys():
official_key = backup_key
found = True
break
if not found:
continue
else:
continue
else:
continue
# Append forecast data if it exists for this initialization
if forecast_str not in self.forecast_dict[official_key].keys():
continue
enter_key = key + ''
if key.lower() == 'hmon' and 'gf' in official_key.lower():
enter_key = 'gfdl'
if key.lower() == 'nhc' and 'jt' in official_key.lower():
enter_key = 'jtwc'
ds[enter_key] = copy.deepcopy(
self.forecast_dict[official_key][forecast_str])
# Filter out to hour 168
if ds[enter_key]['fhr'][-1] > 168:
idx = ds[enter_key]['fhr'].index(168)
for key in ds[enter_key].keys():
if isinstance(ds[enter_key][key], list):
ds[enter_key][key] = ds[enter_key][key][:idx + 1]
# Proceed if data exists
if len(ds) == 0:
raise RuntimeError(
"No forecasts are available for the given parameters.")
# Set default properties
default_model = {'nhc': 'k',
'ships': 'gray',
'gfs': '#0000ff',
'ecm': '#ff1493',
'cmc': '#1e90ff',
'ukm': '#00ff00',
'hmon': '#ff8c00',
'hwrf': '#66cdaa',
'hafsa': '#C659F9',
'hafsb': '#8915BB'}
default_prop = {'linewidth': 2.0}
# Update properties
for key in models.keys():
default_model[key] = models[key]
for key in prop.keys():
default_prop[key] = prop[key]
# Fix GFDL
if 'gfdl' in ds.keys():
default_model['gfdl'] = default_model['hmon']
if 'jtwc' in ds.keys():
default_model['jtwc'] = default_model['nhc']
# Create figure
fig, ax = plt.subplots(figsize=(11,7.5), dpi=200)
ax.grid()
# Derive plot extrema
max_fhr = max([max(ds[entry]['fhr']) for entry in ds.keys()])
max_wind = np.nanmax([np.nanmax(ds[entry]['vmax']) for entry in ds.keys()])
min_wind = np.nanmin([np.nanmin(ds[entry]['vmax']) for entry in ds.keys()])
# Add category fills
for category in range(0, 6):
bound_upper = category_to_wind(category)
bound_lower = category_to_wind(category-1)
ax.fill_between([0, max_fhr], bound_lower, bound_upper, color=get_colors_sshws(bound_lower), alpha=0.1)
ax.fill_between([0, max_fhr], category_to_wind(5), 250, color=get_colors_sshws(category_to_wind(5)), alpha=0.1)
# Plot models
for key in ds.keys():
# Skip if model not requested
if default_model[key] is None or len(ds[key]['vmax']) <= 1:
continue
# Fix label for HAFS
if 'hafs' in key:
idx = key.index('hafs')
model_label = f"HAFS-{key[idx + len('hafs'):].upper()}"
else:
model_label = key.upper()
ax.plot(ds[key]['fhr'], ds[key]['vmax'], linewidth=default_prop['linewidth'],
zorder=6, color=default_model[key], label=model_label)
# Derive and plot Best Track
if plot_btk:
# Determine range of forecast data in best track
idx_start = self.dict['time'].index(forecast)
end_date = forecast + timedelta(hours=max_fhr)
if end_date in self.dict['time']:
idx_end = self.dict['time'].index(end_date)
else:
idx_end = len(self.dict['time'])
# Plot best track line
vmax = self.dict['vmax'][idx_start:idx_end + 1]
storm_times = self.dict['time'][idx_start:idx_end + 1]
storm_fhr = [(i-forecast).total_seconds()/3600.0 for i in storm_times]
ax.plot(storm_fhr, vmax, linestyle=':', color='k', zorder=5,
linewidth=default_prop['linewidth'], label='Best Track')
max_wind = max(max_wind, np.nanmax(vmax))
min_wind = min(min_wind, np.nanmin(vmax))
# Set plot bounds and labels
y_factor = (max_wind - min_wind) * 0.1
ax.set_xticks(np.arange(0, max_fhr+1, 24))
ax.set_xlim(0, max_fhr)
ax.set_ylim(min_wind - y_factor, max_wind + y_factor)
ax.set_xlabel('Forecast Hour', fontsize=12)
ax.set_ylabel('Sustained Wind (kt)', fontsize=12)
ax.tick_params(axis='both', which='major', labelsize=12)
ax.tick_params(axis='both', which='minor', labelsize=12)
# Add category labels
for category in range(0, 6):
threshold = category_to_wind(category)
color = get_colors_sshws(threshold)
text_color = get_colors_sshws(threshold)
if category == 1:
text_color = get_colors_sshws(category_to_wind(2))
text_label = f'Category {category}' if category > 0 else 'Tropical Storm'
ax.axhline(threshold, linestyle='dotted', color=color, zorder=5, alpha=0.5)
a = ax.text(0.015, (threshold - ax.get_ylim()[0]) / (ax.get_ylim()[1] - ax.get_ylim()[0]) + 0.02,
text_label, color=text_color, alpha=0.8, clip_on=True, zorder=5, transform=ax.transAxes)
a.set_path_effects([path_effects.Stroke(linewidth=5, foreground='w'),
path_effects.Normal()])
ax.set_ylim(min_wind - y_factor, max_wind + y_factor)
# Add legend
l = ax.legend(loc=1, prop={'size': 12})
l.set_zorder(1001)
# Plot title
plot_title = f"Model Forecast Intensity for {self.name.title()}"
ax.set_title(plot_title, fontsize=16, loc='left', fontweight='bold')
title_str = f"Initialized {forecast.strftime('%H%M UTC %d %B %Y')}"
ax.set_title(title_str, fontsize=12, loc='right')
# Add label
a = ax.text(0.99, 0.01, 'Plot generated using Tropycal',
ha='right', va='bottom', fontsize=10, alpha=0.6, transform=ax.transAxes)
a.set_path_effects([path_effects.Stroke(linewidth=5, foreground='w'),
path_effects.Normal()])
# Return axis
return ax
[docs] def plot_ensembles(self, forecast=None, fhr=None, interpolate=True, domain="dynamic", ax=None, cartopy_proj=None, save_path=None, **kwargs):
r"""
Creates a plot of individual GEFS ensemble tracks.
Parameters
----------
forecast : datetime.datetime, optional
Datetime object representing the GEFS run initialization. If None (default), fetches the latest run.
fhr : int, optional
Forecast hour to plot. If None (default), a cumulative plot of all forecast hours will be produced. If an integer, a single plot will be produced.
interpolate : bool, optional
If True, and fhr is None, track density data will be interpolated to hourly. Default is True (1-hourly track density data). False plots density using 6-hourly track data.
domain : str
Domain for the plot. Default is "dynamic". Please refer to :ref:`options-domain` for available domain options.
ax : axes
Instance of axes to plot on. If none, one will be generated. Default is none.
cartopy_proj : ccrs
Instance of a cartopy projection to use. If none, one will be generated. Default is none.
save_path : str
Relative or full path of directory to save the image in. If none, image will not be saved.
Other Parameters
----------------
prop_members : dict
Customization properties of GEFS ensemble member track lines. Scroll down below for available options.
prop_mean : dict
Customization properties of GEFS ensemble mean track. Scroll down below for available options.
prop_gfs : dict
Customization properties of GFS forecast track. Scroll down below for available options.
prop_btk : dict
Customization properties of Best Track line. Scroll down below for available options.
prop_ellipse : dict
Customization properties of GEFS ensemble ellipse. Scroll down below for available options.
prop_density : dict
Customization properties of GEFS ensemble track density. Scroll down below for available options.
map_prop : dict
Customization properties of Cartopy map. Please refer to :ref:`options-map-prop` for available options.
Returns
-------
ax
Instance of axes containing the plot is returned.
Notes
-----
.. note::
The total number of GEFS members available for analysis is as follows:
* **2020 - present** - 31 members
* **2006 - 2019** - 21 members
* **2005 & back** - 5 members
As the density plot and ensemble ellipse require a minimum of 10 ensemble members, they will not be generated for storms from 2005 and earlier.
Additionally, ellipses are not generated if using the default ``fhr=None``, meaning a cumulative track density plot is generated instead.
The ensemble ellipse used in this function follows the methodology of `Hamill et al. (2011)`_, denoting the spread in ensemble member cyclone positions. The size of the ellipse is calculated to contain 90% of ensemble members at any given time. This ellipse can be used to determine the primary type of ensemble variability:
* **Along-track variability** - if the major axis of the ellipse is parallel to the ensemble mean motion vector.
* **Across-track variability** - if the major axis of the ellipse is normal to the ensemble mean motion vector.
.. _Hamill et al. (2011): https://doi.org/10.1175/2010MWR3456.1
The following properties are available for customizing ensemble member tracks, via ``prop_members``.
.. list-table::
:widths: 25 75
:header-rows: 1
* - Property
- Description
* - plot
- Boolean to determine whether to plot ensemble member tracks. Default is True.
* - linewidth
- Forecast track linewidth. Default is 0.2.
* - linecolor
- Forecast track line color. Default is black.
* - color_var
- Variable name to color ensemble members by ('vmax' or 'mslp'). Default is None.
* - cmap
- If ``color_var`` is specified, matplotlib colormap to color the variable by.
* - levels
- If ``color_var`` is specified, list of contour levels to color the variable by.
The following properties are available for customizing ensemble mean track, via ``prop_mean``.
.. list-table::
:widths: 25 75
:header-rows: 1
* - Property
- Description
* - plot
- Boolean to determine whether to plot ensemble mean forecast track. Default is True.
* - linewidth
- Forecast track linewidth. Default is 3.0.
* - linecolor
- Forecast track line color. Default is black.
The following properties are available for customizing GFS forecast track, via ``prop_gfs``.
.. list-table::
:widths: 25 75
:header-rows: 1
* - Property
- Description
* - plot
- Boolean to determine whether to plot GFS forecast track. Default is True.
* - linewidth
- Forecast track linewidth. Default is 3.0.
* - linecolor
- Forecast track line color. Default is red.
The following properties are available for customizing Best Track line, via ``prop_btk``.
.. list-table::
:widths: 25 75
:header-rows: 1
* - Property
- Description
* - plot
- Boolean to determine whether to plot Best Track line. Default is True.
* - linewidth
- Best Track linewidth. Default is 2.5.
* - linecolor
- Best Track line color. Default is blue.
The following properties are available for customizing the ensemble ellipse plot, via ``prop_ellipse``.
.. list-table::
:widths: 25 75
:header-rows: 1
* - Property
- Description
* - plot
- Boolean to determine whether to plot ensemble member ellipse. Default is True.
* - linewidth
- Ellipse linewidth. Default is 3.0.
* - linecolor
- Ellipse line color. Default is blue.
The following properties are available for customizing ensemble member track density, via ``prop_density``.
.. list-table::
:widths: 25 75
:header-rows: 1
* - Property
- Description
* - plot
- Boolean to determine whether to plot ensemble member track density. Default is True.
* - radius
- Radius (in km) for which to calculate track density. Default is 200 km.
* - cmap
- Matplotlib colormap for track density plot. Default is "plasma_r".
* - levels
- List of levels for contour filling track density.
"""
# Pop kwargs
prop_members = kwargs.pop('prop_members', {})
prop_mean = kwargs.pop('prop_mean', {})
prop_gfs = kwargs.pop('prop_gfs', {})
prop_btk = kwargs.pop('prop_btk', {})
prop_ellipse = kwargs.pop('prop_ellipse', {})
prop_density = kwargs.pop('prop_density', {})
map_prop = kwargs.pop('map_prop', {})
# Create instance of plot object
try:
self.plot_obj
except:
self.plot_obj = TrackPlot()
# -------------------------------------------------------------------------
# Get forecasts dict saved into storm object, if it hasn't been already
try:
self.forecast_dict
except:
self.get_operational_forecasts()
# Fetch latest forecast if None
if forecast is None:
inits = []
for key in ['AC00', 'AP01', 'AP02', 'AP03', 'AP04', 'AP05']:
if key in self.forecast_dict.keys():
inits.append(dt.strptime(
[k for k in self.forecast_dict[key]][-1], '%Y%m%d%H'))
if len(inits) > 0:
forecast = min(inits)
else:
raise RuntimeError(
"Error: Could not determine the latest available GEFS forecast.")
# Determine max members by year
nens = 21
if self.year >= 2020 and ('AP21' in self.forecast_dict.keys() or 'AP22' in self.forecast_dict.keys() or 'AP23' in self.forecast_dict.keys()):
nens = 31
# Enforce fhr type
if isinstance(fhr, list):
fhr = fhr[0]
# If this forecast init was recently used, don't re-calculate
init_used = False
try:
if self.gefs_init == forecast:
init_used = True
except:
pass
# Only calculate if needed to
if not init_used:
print("--> Starting to calculate ellipse data")
# Create dict to store all data in
ds = {'gfs': {'fhr': [], 'lat': [], 'lon': [], 'vmax': [], 'mslp': [], 'time': []},
'gefs': {'fhr': [], 'lat': [], 'lon': [], 'vmax': [], 'mslp': [], 'time': [],
'members': [], 'ellipse_lat': [], 'ellipse_lon': []}
}
# String formatting for ensembles
def str2(ens):
if ens == 0:
return "AC00"
if ens < 10:
return f"AP0{ens}"
return f"AP{ens}"
# Get GFS forecast entry (AVNX is valid for RAL a-deck source)
gfs_key = 'AVNO' if 'AVNO' in self.forecast_dict.keys() else 'AVNX'
try:
forecast_gfs = self.forecast_dict[gfs_key][forecast.strftime(
"%Y%m%d%H")]
except:
raise RuntimeError(
"The requested GFS initialization isn't available for this storm.")
# Enter into dict entry
ds['gfs']['fhr'] = [int(i) for i in forecast_gfs['fhr']]
ds['gfs']['lat'] = [np.round(i, 1) for i in forecast_gfs['lat']]
ds['gfs']['lon'] = [np.round(i, 1) for i in forecast_gfs['lon']]
ds['gfs']['vmax'] = [float(i) for i in forecast_gfs['vmax']]
ds['gfs']['mslp'] = forecast_gfs['mslp']
ds['gfs']['time'] = [forecast +
timedelta(hours=i) for i in forecast_gfs['fhr']]
# Retrieve GEFS ensemble data (30 members 2019-present, 20 members prior)
for ens in range(0, nens):
# Create dict entry
ds[f'gefs_{ens}'] = {
'fhr': [],
'lat': [],
'lon': [],
'vmax': [],
'mslp': [],
'time': [],
}
# Retrieve ensemble member data
ens_str = str2(ens)
if ens_str not in self.forecast_dict.keys():
continue
if forecast.strftime("%Y%m%d%H") not in self.forecast_dict[ens_str].keys():
continue
forecast_ens = self.forecast_dict[ens_str][forecast.strftime(
"%Y%m%d%H")]
# Enter into dict entry
ds[f'gefs_{ens}']['fhr'] = [int(i)
for i in forecast_ens['fhr']]
ds[f'gefs_{ens}']['lat'] = [
np.round(i, 1) for i in forecast_ens['lat']]
ds[f'gefs_{ens}']['lon'] = [
np.round(i, 1) for i in forecast_ens['lon']]
ds[f'gefs_{ens}']['vmax'] = [
float(i) for i in forecast_ens['vmax']]
ds[f'gefs_{ens}']['mslp'] = forecast_ens['mslp']
ds[f'gefs_{ens}']['time'] = [forecast +
timedelta(hours=i) for i in forecast_ens['fhr']]
# Construct ensemble mean data
# Iterate through all forecast hours
for iter_fhr in range(0, 246, 6):
# Temporary data arrays
temp_data = {}
for key in ds['gfs'].keys():
if key not in ['time', 'fhr']:
temp_data[key] = []
# Iterate through ensemble member
for ens in range(nens):
# Determine if member has data valid at this forecast hour
if iter_fhr in ds[f'gefs_{ens}']['fhr']:
# Retrieve index
idx = ds[f'gefs_{ens}']['fhr'].index(iter_fhr)
# Append data
for key in ds['gfs'].keys():
if key not in ['time', 'fhr']:
temp_data[key].append(
ds[f'gefs_{ens}'][key][idx])
# Proceed if 20 or more ensemble members
if len(temp_data['lat']) >= 10:
# Append data
for key in ds['gfs'].keys():
if key not in ['time', 'fhr']:
ds['gefs'][key].append(np.nanmean(temp_data[key]))
ds['gefs']['fhr'].append(iter_fhr)
ds['gefs']['time'].append(
forecast + timedelta(hours=iter_fhr))
ds['gefs']['members'].append(len(temp_data['lat']))
# Calculate ellipse data
if prop_ellipse is not None:
try:
ellipse_data = calc_ensemble_ellipse(
temp_data['lon'], temp_data['lat'])
ds['gefs']['ellipse_lon'].append(
ellipse_data['ellipse_lon'])
ds['gefs']['ellipse_lat'].append(
ellipse_data['ellipse_lat'])
except:
ds['gefs']['ellipse_lon'].append([])
ds['gefs']['ellipse_lat'].append([])
else:
ds['gefs']['ellipse_lon'].append([])
ds['gefs']['ellipse_lat'].append([])
# Save data for future use if needed
self.gefs_init = forecast
self.ds = ds
print("--> Done calculating ellipse data")
# Determine lon bounds for cartopy projection
proj_lons = []
for key in self.ds.keys():
proj_lons += self.ds[key]['lon']
if fhr is not None and fhr in self.ds['gefs']['fhr']:
fhr_idx = self.ds['gefs']['fhr'].index(fhr)
proj_lons += self.ds['gefs']['ellipse_lon'][fhr_idx]
# Create cartopy projection
if cartopy_proj is not None:
self.plot_obj.proj = cartopy_proj
elif np.nanmax(proj_lons) > 150 or np.nanmin(proj_lons) < -150:
self.plot_obj.create_cartopy(
proj='PlateCarree', central_longitude=180.0)
else:
self.plot_obj.create_cartopy(
proj='PlateCarree', central_longitude=0.0)
# Account for cases crossing dateline
ds = copy.deepcopy(self.ds)
if np.nanmax(proj_lons) > 150 or np.nanmin(proj_lons) < -150:
for key in ds.keys():
new_lons = np.array(ds[key]['lon'])
new_lons[new_lons < 0] = new_lons[new_lons < 0] + 360.0
ds[key]['lon'] = new_lons.tolist()
# Re-calculate GEFS mean
for iter_hr in ds['gefs']['fhr']:
fhr_idx = ds['gefs']['fhr'].index(iter_hr)
ds['gefs']['lon'][fhr_idx] = np.nanmean([ds[f'gefs_{ens}']['lon'][ds[f'gefs_{ens}']['fhr'].index(
iter_hr)] for ens in range(nens) if iter_hr in ds[f'gefs_{ens}']['fhr']])
# Plot storm
plot_ax = self.plot_obj.plot_ensembles(forecast, self.dict, fhr, interpolate, prop_members, prop_mean,
prop_gfs, prop_btk, prop_ellipse, prop_density, nens, domain,
ds, ax=ax, map_prop=map_prop, save_path=save_path)
# Return axis
return plot_ax
[docs] def list_nhc_discussions(self):
r"""
Retrieves a list of NHC forecast discussions for this storm, archived on https://ftp.nhc.noaa.gov/atcf/archive/.
Returns
-------
dict
Dictionary containing entries for each forecast discussion for this storm.
"""
# Check to ensure the data source is HURDAT
if self.source != "hurdat":
raise RuntimeError(
"Error: NHC data can only be accessed when HURDAT is used as the data source.")
# Check to ensure storm is not an invest
if self.invest:
raise RuntimeError(
"Error: NHC does not issue advisories for invests that have not been designated as Potential Tropical Cyclones.")
# Get storm ID & corresponding data URL
storm_id = self.dict['operational_id']
storm_year = self.dict['year']
# Error check
if storm_id == '':
raise RuntimeError(
"Error: This storm was identified post-operationally. No NHC operational data is available.")
# Get list of available NHC advisories & map discussions
if storm_year == (dt.now()).year:
# Get list of all discussions for all storms this year
try:
url_disco = 'https://ftp.nhc.noaa.gov/atcf/dis/'
page = requests.get(url_disco).text
content = page.split("\n")
files = []
for line in content:
if ".discus." in line and self.id.lower() in line:
filename = line.split('">')[1]
filename = filename.split("</a>")[0]
files.append(filename)
del content
except:
ftp = FTP('ftp.nhc.noaa.gov')
ftp.login()
ftp.cwd('atcf/dis')
files = ftp.nlst()
files = [
i for i in files if ".discus." in i and self.id.lower() in i]
ftp.quit()
# Read in all NHC forecast discussions
discos = {
'id': [],
'utc_time': [],
'url': [],
'mode': 0,
}
for file in files:
# Get info about forecast
file_info = file.split(".")
disco_number = int(file_info[2])
# Open file to get info about time issued
f = urllib.request.urlopen(url_disco + file)
content = f.read()
content = content.decode("utf-8")
content = content.split("\n")
f.close()
# Figure out time issued
hr = content[5].split(" ")[0]
zone = content[5].split(" ")[2]
disco_time = num_to_str2(int(hr)) + \
' '.join(content[5].split(" ")[1:])
format_time = content[5].split(" ")[0]
if len(format_time) == 3:
format_time = "0" + format_time
format_time = format_time + " " + \
' '.join(content[5].split(" ")[1:])
disco_time = dt.strptime(
format_time, f'%I00 %p {zone} %a %b %d %Y')
time_zones = {
'ADT': -3,
'AST': -4,
'EDT': -4,
'EST': -5,
'CDT': -5,
'CST': -6,
'MDT': -6,
'MST': -7,
'PDT': -7,
'PST': -8,
'HDT': -9,
'HST': -10}
offset = time_zones.get(zone, 0)
disco_time = disco_time + timedelta(hours=offset * -1)
# Add times issued
discos['id'].append(disco_number)
discos['utc_time'].append(disco_time)
discos['url'].append(url_disco + file)
elif storm_year < 1992:
raise RuntimeError("NHC discussion data is unavailable.")
elif storm_year < 2000:
# Get directory path of storm and read it in
url_disco = f"https://ftp.nhc.noaa.gov/atcf/archive/{storm_year}/messages/"
url = url_disco + f'{storm_id.lower()}_msg.zip'
try:
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
file_like_object = BytesIO(response.read())
tar = zipfile.ZipFile(file_like_object)
except:
try:
url_disco = f"ftp://ftp.nhc.noaa.gov/atcf/archive/{storm_year}/messages/"
url = url_disco + f'{storm_id.lower()}_msg.zip'
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
file_like_object = BytesIO(response.read())
tar = zipfile.ZipFile(file_like_object)
except:
raise RuntimeError("NHC discussion data is unavailable.")
# Get file list
members = '\n'.join([i for i in tar.namelist()])
nums = "[0123456789]"
search_pattern = f'n{storm_id[0:4].lower()}{str(storm_year)[2:]}.[01]{nums}{nums}'
pattern = re.compile(search_pattern)
filelist = pattern.findall(members)
files = []
for file in filelist:
if file not in files:
files.append(file) # remove duplicates
# Read in all NHC forecast discussions
discos = {
'id': [],
'utc_time': [],
'url': [],
'mode': 4,
}
for file in files:
# Get info about forecast
file_info = file.split(".")
disco_number = int(file_info[1])
# Open file to get info about time issued
members = tar.namelist()
members_names = [i for i in members]
idx = members_names.index(file)
content = (tar.read(members[idx])).decode()
content = content.split("\n")
# Figure out time issued
slice_idx = 5 if storm_year < 1998 else 4
for temp_idx in [slice_idx, slice_idx - 1, slice_idx + 1, slice_idx - 2, slice_idx + 2]:
try:
hr = content[temp_idx].split(" ")[0]
if 'NOON' in content[temp_idx]:
temp_line = content[temp_idx].replace(
"NOON", "12 PM")
zone = temp_line.split(" ")[1]
disco_time = dt.strptime(
temp_line.rstrip(), f'%I %p {zone} %a %b %d %Y')
else:
zone = content[temp_idx].split(" ")[2]
disco_time = dt.strptime(
content[temp_idx].rstrip(), f'%I %p {zone} %a %b %d %Y')
except:
pass
time_zones = {
'ADT': -3,
'AST': -4,
'EDT': -4,
'EST': -5,
'CDT': -5,
'CST': -6,
'MDT': -6,
'MST': -7,
'PDT': -7,
'PST': -8,
'HDT': -9,
'HST': -10}
offset = time_zones.get(zone, 0)
disco_time = disco_time + timedelta(hours=offset * -1)
# Add times issued
discos['id'].append(disco_number)
discos['utc_time'].append(disco_time)
discos['url'].append(file)
response.close()
tar.close()
elif storm_year == 2000:
# Get directory path of storm and read it in
url_disco = f"https://ftp.nhc.noaa.gov/atcf/archive/{storm_year}/messages/"
url = url_disco + f'{storm_id.lower()}.msgs.tar.gz'
try:
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
file_like_object = BytesIO(response.read())
tar = tarfile.open(fileobj=file_like_object)
except:
try:
url_disco = f"ftp://ftp.nhc.noaa.gov/atcf/archive/{storm_year}/messages/"
url = url_disco + f'{storm_id.lower()}.msgs.tar.gz'
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
file_like_object = BytesIO(response.read())
tar = tarfile.open(fileobj=file_like_object)
except:
raise RuntimeError("NHC discussion data is unavailable.")
# Get file list
members = '\n'.join([i.name for i in tar.getmembers()])
nums = "[0123456789]"
search_pattern = f'N{storm_id[0:4]}{str(storm_year)[2:]}.[01]{nums}{nums}'
pattern = re.compile(search_pattern)
filelist = pattern.findall(members)
files = []
for file in filelist:
if file not in files:
files.append(file) # remove duplicates
# Read in all NHC forecast discussions
discos = {
'id': [],
'utc_time': [],
'url': [],
'mode': 3,
}
for file in files:
# Get info about forecast
file_info = file.split(".")
disco_number = int(file_info[1])
# Open file to get info about time issued
members = tar.getmembers()
members_names = [i.name for i in members]
idx = members_names.index(file)
f = tar.extractfile(members[idx])
content = (f.read()).decode()
f.close()
content = content.split("\n")
# Figure out time issued
hr = content[4].split(" ")[0]
zone = content[4].split(" ")[2]
disco_time = num_to_str2(int(hr)) + \
' '.join(content[4].split(" ")[1:])
disco_time = dt.strptime(
content[4], f'%I %p {zone} %a %b %d %Y')
time_zones = {
'ADT': -3,
'AST': -4,
'EDT': -4,
'EST': -5,
'CDT': -5,
'CST': -6,
'MDT': -6,
'MST': -7,
'PDT': -7,
'PST': -8,
'HDT': -9,
'HST': -10}
offset = time_zones.get(zone, 0)
disco_time = disco_time + timedelta(hours=offset * -1)
# Add times issued
discos['id'].append(disco_number)
discos['utc_time'].append(disco_time)
discos['url'].append(file)
response.close()
tar.close()
elif storm_year in range(2001, 2006):
# Get directory path of storm and read it in
try:
url_disco = f"https://ftp.nhc.noaa.gov/atcf/archive/{storm_year}/messages/"
url = url_disco + f'{storm_id.lower()}_msgs.tar.gz'
if storm_year < 2003:
url = url_disco + f'{storm_id.lower()}.msgs.tar.gz'
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
file_like_object = BytesIO(response.read())
tar = tarfile.open(fileobj=file_like_object)
except:
try:
url_disco = f"ftp://ftp.nhc.noaa.gov/atcf/archive/{storm_year}/messages/"
url = url_disco + f'{storm_id.lower()}_msgs.tar.gz'
if storm_year < 2003:
url = url_disco + f'{storm_id.lower()}.msgs.tar.gz'
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
file_like_object = BytesIO(response.read())
tar = tarfile.open(fileobj=file_like_object)
except:
raise RuntimeError("NHC discussion data is unavailable.")
# Get file list
members = '\n'.join([i.name for i in tar.getmembers()])
nums = "[0123456789]"
search_pattern = f'{storm_id.lower()}.discus.[01]{nums}{nums}.{nums}{nums}{nums}{nums}{nums}{nums}{nums}{nums}'
pattern = re.compile(search_pattern)
filelist = pattern.findall(members)
files = []
for file in filelist:
if file not in files:
files.append(file) # remove duplicates
response.close()
tar.close()
# Read in all NHC forecast discussions
discos = {
'id': [],
'utc_time': [],
'url': [],
'mode': 1,
}
for file in files:
# Get info about forecast
file_info = file.split(".")
disco_number = int(file_info[2])
disco_time = file_info[3]
disco_year = storm_year
if disco_time[0:2] == "01" and int(storm_id[2:4]) > 3:
disco_year = storm_year + 1
disco_time = dt.strptime(
str(disco_year) + disco_time, '%Y%m%d%H%M')
discos['id'].append(disco_number)
discos['utc_time'].append(disco_time)
discos['url'].append(file)
if storm_year < 2003:
discos['mode'] = 2
else:
# Retrieve list of NHC discussions for this storm
try:
url_disco = f"https://ftp.nhc.noaa.gov/atcf/archive/{storm_year}/messages/"
path_disco = urllib.request.urlopen(url_disco)
string = path_disco.read().decode('utf-8')
nums = "[0123456789]"
search_pattern = f'{storm_id.lower()}.discus.[01]{nums}{nums}.{nums}{nums}{nums}{nums}{nums}{nums}{nums}{nums}'
pattern = re.compile(search_pattern)
filelist = pattern.findall(string)
files = []
for file in filelist:
if file not in files:
files.append(file) # remove duplicates
path_disco.close()
except:
try:
url_disco = f"ftp://ftp.nhc.noaa.gov/atcf/archive/{storm_year}/messages/"
path_disco = urllib.request.urlopen(url_disco)
string = path_disco.read().decode('utf-8')
nums = "[0123456789]"
search_pattern = f'{storm_id.lower()}.discus.[01]{nums}{nums}.{nums}{nums}{nums}{nums}{nums}{nums}{nums}{nums}'
pattern = re.compile(search_pattern)
filelist = pattern.findall(string)
files = []
for file in filelist:
if file not in files:
files.append(file) # remove duplicates
path_disco.close()
except:
raise RuntimeError("NHC discussion data is unavailable.")
# Read in all NHC forecast discussions
discos = {
'id': [],
'utc_time': [],
'url': [],
'mode': 0,
}
for file in files:
# Get info about forecast
file_info = file.split(".")
disco_number = int(file_info[2])
disco_time = file_info[3]
disco_year = storm_year
if disco_time[0:2] == "01" and int(storm_id[2:4]) > 3:
disco_year = storm_year + 1
disco_time = dt.strptime(
str(disco_year) + disco_time, '%Y%m%d%H%M')
discos['id'].append(disco_number)
discos['utc_time'].append(disco_time)
discos['url'].append(url_disco + file)
# Return dict entry
try:
discos
except:
raise RuntimeError("NHC discussion data is unavailable.")
if len(discos['id']) == 0:
raise RuntimeError("NHC discussion data is unavailable.")
return discos
[docs] def get_nhc_discussion(self, forecast, save_path=None):
r"""
Retrieves a single NHC forecast discussion.
Parameters
----------
forecast : datetime.datetime or int
Datetime object representing the desired forecast discussion time (in UTC), or integer representing the forecast discussion ID. If -1 is passed, the latest forecast discussion is returned.
save_path : str, optional
Directory path to save the forecast discussion text to. If None (default), forecast won't be saved.
Returns
-------
dict
Dictionary containing the forecast discussion text and accompanying information about this discussion.
"""
# Check to ensure the data source is HURDAT
if self.source != "hurdat":
msg = "Error: NHC data can only be accessed when HURDAT is used as the data source."
raise RuntimeError(msg)
# Check to ensure storm is not an invest
if self.invest:
raise RuntimeError(
"Error: NHC does not issue advisories for invests that have not been designated as Potential Tropical Cyclones.")
# Get storm ID & corresponding data URL
storm_id = self.dict['operational_id']
storm_year = self.dict['year']
# Error check
if storm_id == '':
msg = "No NHC operational data is available for this storm."
raise RuntimeError(msg)
# Error check
if not isinstance(forecast, int) and not isinstance(forecast, dt):
msg = "forecast must be of type int or datetime.datetime"
raise TypeError(msg)
# Get list of storm discussions
disco_dict = self.list_nhc_discussions()
if isinstance(forecast, dt):
# Find all discussion times
disco_times = disco_dict['utc_time']
disco_ids = [int(i) for i in disco_dict['id']]
disco_diff = np.array(
[(i - forecast).days + (i - forecast).seconds / 86400 for i in disco_times])
# Find most recent discussion
indices = np.argwhere(disco_diff <= 0)
if len(indices) > 0:
closest_idx = indices[-1][0]
else:
closest_idx = 0
closest_diff = disco_diff[closest_idx]
closest_id = disco_ids[closest_idx]
closest_time = disco_times[closest_idx]
# Raise warning if difference is >=1 day
if np.abs(closest_diff) >= 1.0:
warnings.warn("The time provided is unavailable or outside of the duration of the storm. Use the \"list_nhc_discussions()\" function to retrieve a list of available NHC discussions for this storm. Returning the closest available NHC discussion.")
if isinstance(forecast, int):
# Find closest discussion ID to the one provided
disco_times = disco_dict['utc_time']
disco_ids = [int(i) for i in disco_dict['id']]
if forecast == -1:
closest_idx = -1
else:
disco_diff = np.array([i - forecast for i in disco_ids])
closest_idx = np.abs(disco_diff).argmin()
closest_diff = disco_diff[closest_idx]
# Raise warning if difference is >=1 ids
if np.abs(closest_diff) >= 2.0:
msg = "The ID provided is unavailable or outside of the duration of the storm. Use the \"list_nhc_discussions()\" function to retrieve a list of available NHC discussions for this storm. Returning the closest available NHC discussion."
warnings.warn(msg)
closest_id = disco_ids[closest_idx]
closest_time = disco_times[closest_idx]
# Read content of NHC forecast discussion
if disco_dict['mode'] == 0:
url_disco = disco_dict['url'][closest_idx]
if requests.get(url_disco).status_code != 200:
raise RuntimeError("NHC discussion data is unavailable.")
f = urllib.request.urlopen(url_disco)
content = f.read()
content = content.decode("utf-8")
f.close()
elif disco_dict['mode'] in [1, 2, 3]:
# Get directory path of storm and read it in
url_disco = f"https://ftp.nhc.noaa.gov/atcf/archive/{storm_year}/messages/"
url = url_disco + f'{storm_id.lower()}_msgs.tar.gz'
if disco_dict['mode'] in [2, 3]:
url = url_disco + f'{storm_id.lower()}.msgs.tar.gz'
if requests.get(url).status_code != 200:
raise RuntimeError("NHC discussion data is unavailable.")
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
file_like_object = BytesIO(response.read())
tar = tarfile.open(fileobj=file_like_object)
members = tar.getmembers()
members_names = [i.name for i in members]
idx = members_names.index(disco_dict['url'][closest_idx])
f = tar.extractfile(members[idx])
content = (f.read()).decode()
f.close()
tar.close()
response.close()
elif disco_dict['mode'] in [4]:
# Get directory path of storm and read it in
url_disco = f"https://ftp.nhc.noaa.gov/atcf/archive/{storm_year}/messages/"
url = url_disco + f'{storm_id.lower()}_msg.zip'
if requests.get(url).status_code != 200:
raise RuntimeError("NHC discussion data is unavailable.")
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
file_like_object = BytesIO(response.read())
tar = zipfile.ZipFile(file_like_object)
members = tar.namelist()
members_names = [i for i in members]
idx = members_names.index(disco_dict['url'][closest_idx])
content = (tar.read(members[idx])).decode()
tar.close()
response.close()
# Save file, if specified
if save_path is not None:
closest_time = disco_times[closest_idx].strftime("%Y%m%d_%H%M")
fname = f"nhc_disco_{self.name.lower()}_{self.year}_{closest_time}.txt"
o = open(save_path + fname, "w")
o.write(content)
o.close()
# Return text of NHC forecast discussion
return {'id': closest_id, 'time_issued': closest_time, 'text': content}
[docs] def query_nhc_discussions(self, query):
r"""
Searches for the given word or phrase through all NHC forecast discussions for this storm.
Parameters
----------
query : str or list
String or list representing a word(s) or phrase(s) to search for within the NHC forecast discussions (e.g., "rapid intensification"). Query is case insensitive.
Returns
-------
list
List of dictionaries containing all relevant forecast discussions.
"""
# Check to ensure the data source is HURDAT
if self.source != "hurdat":
msg = "Error: NHC data can only be accessed when HURDAT is used as the data source."
raise RuntimeError(msg)
# Check to ensure storm is not an invest
if self.invest:
raise RuntimeError(
"Error: NHC does not issue advisories for invests that have not been designated as Potential Tropical Cyclones.")
# Get storm ID & corresponding data URL
storm_id = self.dict['operational_id']
# Error check
if storm_id == '':
msg = "No NHC operational data is available for this storm."
raise RuntimeError(msg)
if not isinstance(query, str) and not isinstance(query, list):
msg = "'query' must be of type str or list."
raise TypeError(msg)
if isinstance(query, list):
for i in query:
if not isinstance(i, str):
msg = "Entries of list 'query' must be of type str."
raise TypeError(msg)
# Get list of storm discussions
disco_dict = self.list_nhc_discussions()
# Iterate over every entry to retrieve its discussion text
output = []
for idx, forecast_time in enumerate(disco_dict['utc_time']):
# Get forecast discussion
forecast = self.get_nhc_discussion(forecast=forecast_time)
# Get forecast text and query for word
text = forecast['text'].lower()
# If found, add into list
if isinstance(query, str):
if text.find(query.lower()) >= 0:
output.append(forecast)
else:
found = False
for i_query in query:
if text.find(i_query.lower()) >= 0:
found = True
if found:
output.append(forecast)
# Return list
return output
[docs] def get_operational_forecasts(self):
r"""
Retrieves operational model and NHC forecasts throughout the entire life duration of the storm.
Returns
-------
dict
Dictionary containing all forecast entries.
Notes
-----
This function fetches all available forecasts, whether operational (e.g., NHC or JTWC), deterministic or ensemble model forecasts, from the Best Track a-deck as far back as data allows (1954 for NHC's area of responsibility, 2019 for JTWC).
For example, this code retrieves all forecasts for Hurricane Michael (2018):
.. code-block:: python
#Get Storm object
from tropycal import tracks
basin = tracks.TrackDataset()
storm = basin.get_storm(('michael',2018))
#Retrieve all forecasts
forecasts = storm.get_operational_forecasts()
The resulting dict is structured as follows:
>>> print(forecasts.keys())
dict_keys(['CARQ', 'NAM', 'AC00', 'AEMN', 'AP01', 'AP02', 'AP03', 'AP04', 'AP05', 'AP06', 'AP07', 'AP08', 'AP09',
'AP10', 'AP11', 'AP12', 'AP13', 'AP14', 'AP15', 'AP16', 'AP17', 'AP18', 'AP19', 'AP20', 'AVNO', 'AVNX', 'CLP5',
'CTCX', 'DSHP', 'GFSO', 'HCCA', 'IVCN', 'IVDR', 'LGEM', 'OCD5', 'PRFV', 'SHF5', 'SHIP', 'TABD', 'TABM', 'TABS',
'TCLP', 'XTRP', 'CMC', 'NGX', 'UKX', 'AEMI', 'AHNI', 'AVNI', 'CEMN', 'CHCI', 'CTCI', 'DSPE', 'EGRR', 'LGME',
'NAMI', 'NEMN', 'RVCN', 'RVCX', 'SHPE', 'TBDE', 'TBME', 'TBSE', 'TVCA', 'TVCE', 'TVCN', 'TVCX', 'TVDG', 'UE00',
'UE01', 'UE02', 'UE03', 'UE04', 'UE05', 'UE06', 'UE07', 'UE08', 'UE09', 'UE10', 'UE11', 'UE12', 'UE13', 'UE14',
'UE15', 'UE16', 'UE17', 'UE18', 'UE19', 'UE20', 'UE21', 'UE22', 'UE23', 'UE24', 'UE25', 'UE26', 'UE27', 'UE28',
'UE29', 'UE30', 'UE31', 'UE32', 'UE33', 'UE34', 'UE35', 'UEMN', 'CEMI', 'CMCI', 'COTC', 'EGRI', 'HMON', 'HWRF',
'NGXI', 'NVGM', 'PRV2', 'PRVI', 'UEMI', 'UKXI', 'CEM2', 'CMC2', 'COTI', 'EGR2', 'HHFI', 'HHNI', 'HMNI', 'HWFI',
'ICON', 'IVRI', 'NGX2', 'NVGI', 'OFCP', 'TCOA', 'TCOE', 'TCON', 'UEM2', 'UKX2', 'CHC2', 'CTC2', 'NAM2', 'OFCL',
'OFPI', 'AEM2', 'AHN2', 'AVN2', 'DRCL', 'HHF2', 'HHN2', 'HMN2', 'HWF2', 'NVG2', 'OFCI', 'OFP2', 'FSSE', 'RI25', 'RI30'])
Each of these keys represents a forecast model/ensemble member/center. If we select the GFS (AVNO), we now get a dictionary containing all forecast initializations for this model:
>>> print(forecasts['AVNO'].keys())
dict_keys(['2018100518', '2018100600', '2018100606', '2018100612', '2018100700', '2018100706', '2018100712',
'2018100718', '2018100800', '2018100806', '2018100812', '2018100818', '2018100900', '2018100906', '2018100912',
'2018100918', '2018101000', '2018101006', '2018101012', '2018101018', '2018101100', '2018101106', '2018101112',
'2018101118', '2018101200', '2018101206', '2018101212'])
Providing a forecast initialization, for example 1200 UTC 8 October 2018 (2018100812), we now get a forecast dict containing the GFS initialized at this time:
>>> print(forecasts['AVNO']['2018100812'])
{'fhr': [0, 6, 12, 18, 24, 30, 36, 42, 48, 54, 60, 66, 72, 78, 84, 90, 96, 102, 108, 114, 120, 126, 132, 138, 144],
'lat': [20.8, 21.7, 22.7, 24.0, 25.1, 25.9, 26.9, 27.9, 29.0, 30.0, 31.1, 32.1, 33.1, 34.1, 35.4, 37.2, 38.9, 40.2,
42.4, 44.9, 46.4, 48.2, 49.3, 50.5, 51.3],
'lon': [-85.1, -85.1, -85.2, -85.6, -86.4, -86.8, -86.9, -86.9, -86.6, -86.2, -85.3, -84.4, -83.0, -81.3, -78.9,
-75.9, -72.2, -67.8, -62.5, -56.5, -50.5, -44.4, -38.8, -32.8, -27.9],
'vmax': [57, 62, 61, 67, 74, 69, 75, 78, 80, 76, 47, 38, 34, 36, 38, 41, 52, 58, 55, 51, 48, 44, 37, 36, 37],
'mslp': [984, 982, 979, 974, 972, 970, 966, 964, 957, 959, 969, 982, 988, 993, 994, 990, 984, 978, 978, 975, 979,
984, 987, 989, 991],
'type': ['XX', 'XX', 'XX', 'XX', 'XX', 'XX', 'XX', 'XX', 'XX', 'XX', 'XX', 'XX', 'XX', 'XX', 'XX', 'XX', 'XX',
'XX', 'XX', 'XX', 'XX', 'XX', 'XX', 'XX', 'XX'],
'init': datetime.datetime(2018, 10, 8, 12, 0)}
"""
# Real time ensemble data:
# https://www.ftp.ncep.noaa.gov/data/nccf/com/ens_tracker/prod/
# If forecasts dict already exist, simply return the dict
try:
self.forecast_dict
return self.forecast_dict
except:
pass
# Follow HURDAT procedure
if self.source == "hurdat":
# Get storm ID & corresponding data URL
storm_id = self.dict['operational_id']
storm_year = self.dict['year']
if storm_year <= 2006:
storm_id = self.dict['id']
if storm_year < 1954:
msg = "Forecast data is unavailable for storms prior to 1954."
raise RuntimeError(msg)
# Error check
if storm_id == '':
msg = "No NHC operational data is available for this storm."
raise RuntimeError(msg)
# Check if archive directory exists for requested year, if not redirect to realtime directory
url_models = f"https://ftp.nhc.noaa.gov/atcf/archive/{storm_year}/a{storm_id.lower()}.dat.gz"
if requests.get(url_models).status_code != 200:
url_models = f"https://ftp.nhc.noaa.gov/atcf/aid_public/a{storm_id.lower()}.dat.gz"
# Retrieve model data text
if requests.get(url_models).status_code == 200:
request = urllib.request.Request(url_models)
response = urllib.request.urlopen(request)
sio_buffer = BytesIO(response.read())
gzf = gzip.GzipFile(fileobj=sio_buffer)
data = gzf.read()
content = data.splitlines()
content = [(i.decode()).split(",") for i in content]
content = [i for i in content if len(i) > 10]
response.close()
else:
raise RuntimeError(
"No operational model data is available for this storm.")
# Follow JTWC procedure
else:
url_models_noaa = f"https://www.ssd.noaa.gov/PS/TROP/DATA/ATCF/JTWC/a{self.id.lower()}.dat"
url_models_ucar = f"http://hurricanes.ral.ucar.edu/repository/data/adecks_open/{self.year}/a{self.id.lower()}.dat"
# Retrieve model data text
try:
content = read_url(url_models_noaa, split=True, subsplit=False)
except:
try:
content = read_url(
url_models_ucar, split=True, subsplit=False)
except:
raise RuntimeError(
"No operational model data is available for this storm.")
content = [i.split(",") for i in content]
content = [i for i in content if len(i) > 10]
# Iterate through every line in content:
forecasts = {}
for line in content:
# Get basic components
lineArray = [i.replace(" ", "") for i in line]
try:
basin, number, run_init, n_a, model, fhr, lat, lon, vmax, mslp, stype, rad, windcode, neq, seq, swq, nwq = lineArray[
:17]
use_wind = True
except:
basin, number, run_init, n_a, model, fhr, lat, lon, vmax, mslp, stype = lineArray[
:11]
use_wind = False
# Check init time is within storm time range
run_init_dt = dt.strptime(run_init, '%Y%m%d%H')
if run_init_dt < self.dict['time'][0] - timedelta(hours=6) or run_init_dt > self.dict['time'][-1] + timedelta(hours=6):
continue
# Skip erroneous lines
try:
if int(fhr) > 240:
continue
except:
continue
# Enter into forecast dict
if model not in forecasts.keys():
forecasts[model] = {}
if run_init not in forecasts[model].keys():
forecasts[model][run_init] = {
'init': run_init_dt, 'fhr': [], 'lat': [], 'lon': [], 'vmax': [], 'mslp': [], 'type': [], 'windrad': []
}
# Format lat & lon
fhr = int(fhr)
if "N" in lat:
lat_temp = lat.split("N")[0]
lat = round(float(lat_temp) * 0.1, 1)
elif "S" in lat:
lat_temp = lat.split("S")[0]
lat = round(float(lat_temp) * -0.1, 1)
if "W" in lon:
lon_temp = lon.split("W")[0]
lon = round(float(lon_temp) * -0.1, 1)
elif "E" in lon:
lon_temp = lon.split("E")[0]
lon = round(float(lon_temp) * 0.1, 1)
# Format vmax & MSLP
if vmax == '':
vmax = np.nan
else:
vmax = int(vmax)
if vmax < 10 or vmax > 300:
vmax = np.nan
if mslp == '':
mslp = np.nan
else:
mslp = int(mslp)
if mslp < 1:
mslp = np.nan
# Format wind radii
if use_wind:
try:
rad = int(rad)
if rad in [0, 35]:
rad = 34
neq = np.nan if windcode == '' else int(neq)
seq = np.nan if windcode in ['', 'AAA'] else int(seq)
swq = np.nan if windcode in ['', 'AAA'] else int(swq)
nwq = np.nan if windcode in ['', 'AAA'] else int(nwq)
except:
rad = 34
neq = np.nan
seq = np.nan
swq = np.nan
nwq = np.nan
else:
rad = 34
neq = np.nan
seq = np.nan
swq = np.nan
nwq = np.nan
# Add forecast data to dict if forecast hour isn't already there
if fhr not in forecasts[model][run_init]['fhr']:
if model in ['OFCL', 'OFCI'] and fhr > 120:
pass
else:
if lat == 0.0 and lon == 0.0:
continue
forecasts[model][run_init]['fhr'].append(fhr)
forecasts[model][run_init]['lat'].append(lat)
forecasts[model][run_init]['lon'].append(lon)
forecasts[model][run_init]['vmax'].append(vmax)
forecasts[model][run_init]['mslp'].append(mslp)
forecasts[model][run_init]['windrad'].append(
{rad: [neq, seq, swq, nwq]})
# Get storm type, if it can be determined
if stype in ['', 'DB'] and vmax != 0 and not np.isnan(vmax):
stype = get_storm_type(vmax, False)
forecasts[model][run_init]['type'].append(stype)
else:
ifhr = forecasts[model][run_init]['fhr'].index(fhr)
forecasts[model][run_init]['windrad'][ifhr][rad] = [
neq, seq, swq, nwq]
# Save dict locally
self.forecast_dict = forecasts
# Return dict
return forecasts
[docs] def get_nhc_forecast_dict(self, time):
r"""
Retreive a dictionary of official NHC forecasts for a valid time.
Parameters
----------
time : datetime.datetime
Time of requested forecast.
Returns
-------
dict
Dictionary containing forecast data.
Notes
-----
This dict can be provided to ``utils.generate_nhc_cone()`` to generate the cone of uncertainty. Below is an example forecast dict for Hurricane Michael (2018):
>>> storm.get_nhc_forecast_dict(dt.datetime(2018,10,8,0))
{'fhr': [0, 3, 12, 24, 36, 48, 72, 96, 120],
'lat': [19.8, 20.0, 21.1, 22.7, 24.4, 26.3, 30.4, 34.9, 40.7],
'lon': [-85.4, -85.4, -85.3, -85.6, -86.0, -86.1, -84.5, -78.4, -64.4],
'vmax': [50, 50, 60, 65, 75, 85, 75, 55, 55],
'mslp': [nan, 997, nan, nan, nan, nan, nan, nan, nan],
'type': ['TS', 'TS', 'TS', 'HU', 'HU', 'HU', 'HU', 'TS', 'TS'],
'windrad': [{34: [120, 150, 90, 90], 50: [40, 0, 0, 0]},
{34: [120, 150, 90, 90], 50: [40, 0, 0, 0]},
{34: [120, 150, 90, 90], 50: [40, 40, 0, 0]},
{34: [130, 140, 90, 90], 50: [50, 50, 0, 0], 64: [20, 20, 0, 0]},
{34: [130, 130, 80, 90], 50: [50, 50, 0, 0], 64: [20, 20, 0, 0]},
{34: [130, 130, 70, 90], 50: [60, 60, 30, 40], 64: [25, 25, 15, 25]},
{34: [130, 130, 70, 80], 50: [60, 60, 30, 40]},
{34: [0, 0, 0, 0]},
{34: [0, 0, 0, 0]}],
'init': datetime.datetime(2018, 10, 8, 0, 0)}
As of Tropycal v0.5, ``windrad`` represents the forecast sustained wind radii (34, 50 and 64 knots) organized by [NE quadrant,SE quadrant,SW quadrant,NW quadrant] in nautical miles.
"""
# Check to ensure the data source is HURDAT
if self.source != "hurdat":
raise RuntimeError(
"Error: NHC data can only be accessed when HURDAT is used as the data source.")
# Check to ensure storm is not an invest
if self.invest:
raise RuntimeError(
"Error: NHC does not issue advisories for invests that have not been designated as Potential Tropical Cyclones.")
# Get forecasts dict saved into storm object, if it hasn't been already
try:
self.forecast_dict
except:
self.get_operational_forecasts()
# Get all NHC forecast entries
nhc_forecasts = self.forecast_dict['OFCL']
# Get list of all NHC forecast initializations
nhc_forecast_init = [k for k in nhc_forecasts.keys()]
# Find closest matching time to the provided forecast time
nhc_forecast_init_dt = [dt.strptime(
k, '%Y%m%d%H') for k in nhc_forecast_init]
time_diff = np.array(
[(i - time).days + (i - time).seconds / 86400 for i in nhc_forecast_init_dt])
closest_idx = np.abs(time_diff).argmin()
forecast_dict = nhc_forecasts[nhc_forecast_init[closest_idx]]
if np.abs(time_diff[closest_idx]) >= 1.0:
warnings.warn(
f"The time provided is outside of the duration of the storm. Returning the closest available NHC forecast.")
return forecast_dict
[docs] def download_tcr(self, save_path=""):
r"""
Downloads the NHC offical Tropical Cyclone Report (TCR) for the requested storm to the requested directory. Available only for storms with advisories issued by the National Hurricane Center.
Parameters
----------
save_path : str
Path of directory to download the TCR into. Default is current working directory.
"""
# Check to ensure storm is not an invest
if self.invest:
raise RuntimeError(
"Error: NHC does not issue advisories for invests that have not been designated as Potential Tropical Cyclones.")
# Error check
if self.source != "hurdat":
msg = "NHC data can only be accessed when HURDAT is used as the data source."
raise RuntimeError(msg)
if self.year < 1995:
msg = "Tropical Cyclone Reports are unavailable prior to 1995."
raise RuntimeError(msg)
if not isinstance(save_path, str):
msg = "'save_path' must be of type str."
raise TypeError(msg)
# Format URL
storm_id = self.dict['id'].upper()
storm_name = self.dict['name'].title()
url = f"https://www.nhc.noaa.gov/data/tcr/{storm_id}_{storm_name}.pdf"
# Check to make sure PDF is available
request = requests.get(url)
if request.status_code != 200:
msg = "This tropical cyclone does not have a Tropical Cyclone Report (TCR) available."
raise RuntimeError(msg)
# Retrieve PDF
response = requests.get(url)
full_path = os.path.join(save_path, f"TCR_{storm_id}_{storm_name}.pdf")
with open(full_path, 'wb') as f:
f.write(response.content)
[docs] def plot_tors(self, dist_thresh=1000, Tors=None, domain="dynamic", plotPPH=False, plot_all=False,
ax=None, cartopy_proj=None, save_path=None, **kwargs):
r"""
Creates a plot of the storm and associated tornado tracks.
Parameters
----------
dist_thresh : int
Distance threshold (in kilometers) from the tropical cyclone track over which to attribute tornadoes to the TC. Default is 1000 km.
Tors : pandas.DataFrame
DataFrame containing tornado data associated with the storm. If None, data is automatically retrieved from TornadoDatabase. A dataframe of tornadoes associated with the TC will then be saved to this instance of storm for future use.
domain : str
Domain for the plot. Default is "dynamic". Please refer to :ref:`options-domain` for available domain options.
plotPPH : bool or str
Whether to plot practically perfect forecast (PPH). True defaults to "daily". Default is False.
* **False** - no PPH plot.
* **True** - defaults to "daily".
* **"total"** - probability of a tornado within 25mi of a point during the period of time selected.
* **"daily"** - average probability of a tornado within 25mi of a point during a day starting at 12 UTC.
plot_all : bool
Whether to plot dots for all observations along the track. If false, dots will be plotted every 6 hours. Default is false.
ax : axes
Instance of axes to plot on. If none, one will be generated. Default is none.
cartopy_proj : ccrs
Instance of a cartopy projection to use. If none, one will be generated. Default is none.
save_path : str
Relative or full path of directory to save the image in. If none, image will not be saved.
Other Parameters
----------------
prop : dict
Customization properties of plot.
map_prop : dict
Customization properties of Cartopy map. Please refer to :ref:`options-map-prop` for available options.
Returns
-------
ax
Instance of axes containing the plot is returned.
"""
# Retrieve kwargs
prop = kwargs.pop('prop', {})
map_prop = kwargs.pop('map_prop', {})
# Set default colormap for TC plots to Wistia
try:
prop['PPHcolors']
except:
prop['PPHcolors'] = 'Wistia'
if Tors is None:
try:
self.stormTors
except:
warn_message = "Reading in tornado data for this storm. If you seek to analyze tornado data for multiple storms, run \"TrackDataset.assign_storm_tornadoes()\" to avoid this warning in the future."
warnings.warn(warn_message)
Tors = TornadoDataset()
self.stormTors = Tors.get_storm_tornadoes(self, dist_thresh)
# Error check if no tornadoes are found
if len(self.stormTors) == 0:
raise RuntimeError("No tornadoes were found with this storm.")
# Warning if few tornadoes were found
if len(self.stormTors) < 5:
warn_message = f"{len(self.stormTors)} tornadoes were found with this storm. Default domain to east_conus."
warnings.warn(warn_message)
domain = 'east_conus'
# Create instance of plot object
self.plot_obj_tc = TrackPlot()
try:
self.plot_obj_tor = TornadoPlot()
except:
from ..tornado.plot import TornadoPlot
self.plot_obj_tor = TornadoPlot()
# Create cartopy projection
if cartopy_proj is None:
if max(self.dict['lon']) > 150 or min(self.dict['lon']) < -150:
self.plot_obj_tor.create_cartopy(
proj='PlateCarree', central_longitude=180.0)
self.plot_obj_tc.create_cartopy(
proj='PlateCarree', central_longitude=180.0)
else:
self.plot_obj_tor.create_cartopy(
proj='PlateCarree', central_longitude=0.0)
self.plot_obj_tc.create_cartopy(
proj='PlateCarree', central_longitude=0.0)
# Plot tornadoes
plot_ax, leg_tor, domain = self.plot_obj_tor.plot_tornadoes(self.stormTors, domain, ax=ax, return_ax=True, return_domain=True,
plotPPH=plotPPH, prop=prop, map_prop=map_prop)
tor_title = plot_ax.get_title('left')
# Plot storm
plot_ax = self.plot_obj_tc.plot_storms(
[self.dict], domain=domain, ax=plot_ax, prop=prop, map_prop=map_prop)
plot_ax.add_artist(leg_tor)
storm_title = plot_ax.get_title('left')
plot_ax.set_title(f'{storm_title}\n{tor_title}',
loc='left', fontsize=17, fontweight='bold')
# Save plot
if save_path is not None and isinstance(save_path, str):
plt.savefig(save_path, bbox_inches='tight')
# Return axis
return plot_ax
[docs] def plot_TCtors_rotated(self, dist_thresh=1000, save_path=None):
r"""
Plot tracks of tornadoes relative to the storm motion vector of the tropical cyclone.
Parameters
----------
dist_thresh : int
Distance threshold (in kilometers) from the tropical cyclone track over which to attribute tornadoes to the TC. Default is 1000 km. Ignored if tornado data was passed into Storm from TrackDataset.
save_path : str
Relative or full path of directory to save the image in. If none, image will not be saved.
Returns
-------
ax
Instance of axes containing the plot is returned.
Notes
-----
The motion vector is oriented upwards (in the +y direction).
"""
# Checks to see if stormTors exists
try:
self.stormTors
dist_thresh = self.tornado_dist_thresh
except:
warn_message = "Reading in tornado data for this storm. If you seek to analyze tornado data for multiple storms, run \"TrackDataset.assign_storm_tornadoes()\" to avoid this warning in the future."
warnings.warn(warn_message)
Tors = TornadoDataset()
stormTors = Tors.get_storm_tornadoes(self, dist_thresh)
self.stormTors = Tors.rotateToHeading(self, stormTors)
# Create figure for plotting
plt.figure(figsize=(9, 9), dpi=150)
ax = plt.subplot()
# Default EF color scale
EFcolors = get_colors_ef('default')
# Plot all tornado tracks in motion relative coords
for _, row in self.stormTors.iterrows():
plt.plot([row['rot_xdist_s'], row['rot_xdist_e'] + .01], [row['rot_ydist_s'], row['rot_ydist_e'] + .01],
lw=2, c=EFcolors[row['mag']])
# Plot dist_thresh radius
ax.set_facecolor('#F6F6F6')
circle = plt.Circle((0, 0), dist_thresh, color='w')
ax.add_artist(circle)
an = np.linspace(0, 2 * np.pi, 100)
ax.plot(dist_thresh * np.cos(an), dist_thresh * np.sin(an), 'k')
ax.plot([-dist_thresh, dist_thresh], [0, 0], 'k--', lw=.5)
ax.plot([0, 0], [-dist_thresh, dist_thresh], 'k--', lw=.5)
# Plot motion vector
plt.arrow(0, -dist_thresh * .1, 0, dist_thresh * .2, length_includes_head=True,
head_width=45, head_length=45, fc='k', lw=2, zorder=100)
# Labels
ax.set_aspect('equal', 'box')
ax.set_xlabel('Left/Right of Storm Heading (km)', fontsize=13)
ax.set_ylabel('Behind/Ahead of Storm Heading (km)', fontsize=13)
ax.set_title(
f'{self.name} {self.year} tornadoes relative to heading', fontsize=17)
ax.tick_params(axis='both', which='major', labelsize=11.5)
# Add legend
handles = []
for ef, color in enumerate(EFcolors):
count = len(self.stormTors[self.stormTors['mag'] == ef])
handles.append(mlines.Line2D([], [], linestyle='-',
color=color, label=f'EF-{ef} ({count})'))
ax.legend(handles=handles, loc='lower left', fontsize=11.5)
# Add attribution
ax.text(0.99, 0.01, plot_credit(), fontsize=8, color='k', alpha=0.7,
transform=ax.transAxes, ha='right', va='bottom', zorder=10)
# Save plot
if save_path is not None and isinstance(save_path, str):
plt.savefig(save_path, bbox_inches='tight')
# Return axis or show figure
return ax
[docs] def get_recon(self, path_vdm=None, path_hdobs=None, path_dropsondes=None):
r"""
Retrieves all aircraft reconnaissance data for this storm.
Parameters
----------
path_vdm : str, optional
Filepath of pickle file containing VDM data retrieved from ``vdms.to_pickle()``. If provided, data will be retrieved from the local pickle file instead of the NHC server.
path_hdobs : str, optional
Filepath of pickle file containing HDOBs data retrieved from ``hdobs.to_pickle()``. If provided, data will be retrieved from the local pickle file instead of the NHC server.
path_dropsondes : str, optional
Filepath of pickle file containing dropsonde data retrieved from ``dropsondes.to_pickle()``. If provided, data will be retrieved from the local pickle file instead of the NHC server.
Returns
-------
ReconDataset
Instance of ReconDataset is returned.
Notes
-----
In addition to returning an instance of ``ReconDataset``, this function additionally stores it as an attribute of this Storm object, such that all attributes and methods associated with the ``vdms``, ``hdobs`` and ``dropsondes`` classes can be directly accessed from this Storm object.
One method of accessing the ``hdobs.plot_points()`` method is as follows:
.. code-block:: python
#Get data for Hurricane Michael (2018)
from tropycal import tracks
basin = tracks.TrackDataset()
storm = basin.get_storm(('michael',2018))
#Get all recon data for this storm
storm.get_recon()
#Plot HDOBs points
storm.recon.hdobs.plot_points()
The other method is using the returned ReconDataset instance from this function:
.. code-block:: python
#Get data for Hurricane Michael (2018)
from tropycal import tracks
basin = tracks.TrackDataset()
storm = basin.get_storm(('michael',2018))
#Get all recon data for this storm
recon = storm.get_recon()
#Plot HDOBs points
recon.hdobs.plot_points()
"""
self.recon.get_vdms(data=path_vdm)
self.recon.get_hdobs(data=path_hdobs)
self.recon.get_dropsondes(data=path_dropsondes)
return self.recon
[docs] def search_ships(self):
r"""
Searches for available SHIPS files for this storm, if available.
Returns
-------
list
List of available SHIPS times.
Notes
-----
SHIPS data is available courtesy of the UCAR Research Applications Laboratory (RAL).
These available times can be plugged into ``Storm.get_ships()`` to get an object containing SHIPS data initialized at this time.
"""
# Error check
if self.year <= 2010:
raise ValueError('SHIPS data is unavailable prior to 2011.')
# Format basin name and ID
basin_dict = {
'north_atlantic':'northatlantic',
'east_pacific':'northeastpacific',
'west_pacific':'northwestpacific',
'north_indian':'northindian'
}
basin_name = basin_dict.get(self.basin,'southernhemisphere')
reformatted_id = f'{self.id[:-4]}{self.id[-2:]}'
# Format URL from RAL and retrieve file list
url = f'http://hurricanes.ral.ucar.edu/realtime/plots/{basin_name}/{self.year}/{self.id.lower()}/stext/'
try:
page = requests.get(url).text
except:
raise ValueError('SHIPS data is unavailable for the requested storm.')
content = page.split("\n")
files = []
for line in content:
if '<a href="' in line and '_ships.txt' in line:
filename = (line.split('<a href="')[1]).split('">')[0]
# Remove entries outside of duration of storm
time = dt.strptime(filename[:8],'%y%m%d%H')
if time < self.time[0] or time > self.time[-1]: continue
# Add entries definitely associated with this storm
if reformatted_id == filename.split('_ships')[0][-6:]:
files.append(filename)
# Organize by date and format for printing
return sorted([dt.strptime(i[:8],'%y%m%d%H') for i in files])
[docs] def get_ships(self, time):
r"""
Retrieves a Ships object containing SHIPS data for a requested time.
Parameters
----------
time : datetime.datetime
Requested time of SHIPS forecast.
Returns
-------
tropycal.ships.Ships
Instance of a Ships object containing SHIPS data for the requested time.
Notes
-----
SHIPS data is available courtesy of the UCAR Research Applications Laboratory (RAL).
1. A list of available times for SHIPS data can be retrieved using ``Storm.search_ships()``.
2. On rare occasions, SHIPS data files from UCAR have empty data associated with them. In these cases, a value of None is returned.
"""
# Format URL
basin_dict = {
'north_atlantic':'northatlantic',
'east_pacific':'northeastpacific',
'west_pacific':'northwestpacific',
'north_indian':'northindian'
}
basin_name = basin_dict.get(self.basin,'southernhemisphere')
url = f'http://hurricanes.ral.ucar.edu/realtime/plots/{basin_name}/{self.year}/{self.id.lower()}/stext/'
url += f'{time.strftime("%y%m%d%H")}{self.id[:-4]}{self.id[-2:]}_ships.txt'
# Fetch SHIPS content
try:
content = read_url(url, split=False, subsplit=False)
if len(content) < 10:
warnings.warn('Improper SHIPS entry for this time. Returning a value of None.')
return None
except:
raise ValueError('SHIPS data is unavailable for the requested storm or time.')
return Ships(content)
[docs] def get_ships_analysis(self):
r"""
Creates a custom Storm object consisting of SHIPS initialized analyses. This includes the various diagnostics SHIPS provides, such as sea surface temperatures and ocean heat content, as variables of this object.
Returns
-------
tropycal.tracks.Storm
Custom Storm object containing SHIPS analysis data.
Notes
-----
1. SHIPS does not include MSLP data. As such, all MSLP data for this Storm object is set to NaN.
2. Track and intensity data are based off operational analyses, and as such may differ from the storm's HURDATv2 entry.
3. Storm type may erroneously reflect a tropical cyclone during times when the storm was extratropical, especially towards the end of its lifetime.
"""
# Get SHIPS times
times = self.search_ships()
if len(times) <= 1:
raise RuntimeError('SHIPS data is unavailable for the requested storm.')
# Declare dict
new_dict = {
'time': [],
'mslp': [],
'type': [],
'vmax': [],
'wmo_basin': [],
}
for attr in ['name', 'id', 'operational_id', 'year', 'season', 'basin', 'realtime']:
new_dict[attr] = self[attr]
new_dict['ace'] = 0.0
# Construct data
for time in times:
ships = self.get_ships(time)
if ships is None: continue
if np.isnan(ships.lat[0]) or np.isnan(ships.lon[0]): continue
# Add relevant variables
new_dict['time'].append(time)
new_dict['mslp'].append(np.nan)
for key in ships.dict.keys():
if key in ['fhr', 'vmax_noland_kt', 'vmax_lgem_kt']: continue
# Special handling for storm type
if key == 'storm_type':
subtropical_flag = False
derived_type = 'EX'
try:
if ships.dict['storm_type'][0] == 'SUBT':
subtropical_flag = True
derived_type = get_storm_type(ships.dict['vmax_land_kt'][0], subtropical_flag)
if ships.dict['storm_type'][0] not in ['TROP', 'SUBT']:
derived_type = 'EX'
except:
pass
new_dict['type'].append(derived_type)
# vmax handling
elif key == 'vmax_land_kt':
new_dict['vmax'].append(ships.dict[key][0])
# Normal handling
elif key in new_dict:
new_dict[key].append(ships.dict[key][0])
else:
new_dict[key] = [ships.dict[key][0]]
# Derive ACE
if not np.isnan(new_dict['vmax'][-1]):
new_dict['ace'] += accumulated_cyclone_energy(new_dict['vmax'][-1])
# Derive basin
new_dict['wmo_basin'].append(get_basin(new_dict['lat'][-1],
new_dict['lon'][-1],
self.basin))
# Add other attributes
new_dict['source_info'] = 'SHIPS Analysis'
new_dict['source_method'] = 'UCAR SHIPS Archive'
new_dict['source_url'] = 'https://hurricanes.ral.ucar.edu/'
new_dict['invest'] = False
new_dict['source'] = 'ships'
new_dict['prob_2day'] = 'N/A'
new_dict['prob_7day'] = 'N/A'
new_dict['risk_2day'] = 'N/A'
new_dict['risk_7day'] = 'N/A'
return Storm(new_dict)
[docs] def get_archer(self):
r"""
Retrieves satellite-derived ARCHER track data for this storm, if available.
Returns
-------
dict
Dictionary containing ARCHER data for this storm.
Notes
-----
The ARCHER (Automated Rotational Center Hurricane Eye Retrieval) data is provided courtesy of the `University of Wisconsin`_. This data is at a much higher temporal resolution than the Best Track data.
This function additionally saves the ARCHER data as an attribute of this object (storm.archer).
.. _University of Wisconsin: http://tropic.ssec.wisc.edu/real-time/archerOnline/web/index.shtml
"""
# Format URL
url = f'http://tropic.ssec.wisc.edu/real-time/adt/archive{self.year}/{self.id[2:4]}{self.id[1]}-list.txt'
# Read in data
a = requests.get(url).content.decode("utf-8")
content = [[c.strip() for c in b.split()] for b in a.split('\n')]
# data = [[dt.strptime(line[0]+'/'+line[1][:4],'%Y%b%d/%H%M'),-1*float(line[-4]),float(line[-5])] for line in content[-100:-3]]
archer = {}
for name in ['time', 'lat', 'lon', 'mnCldTmp']:
archer[name] = []
for i, line in enumerate(content):
try:
ndx = ('MWinit' in line[-1])
archer['time'].append(dt.strptime(
line[0] + '/' + line[1][:4], '%Y%b%d/%H%M'))
archer['lat'].append(float(line[-5 - ndx]))
archer['lon'].append(-1 * float(line[-4 - ndx]))
archer['mnCldTmp'].append(float(line[-9 - ndx]))
except:
continue
self.archer = archer
return archer