r"""Functionality for storing and analyzing an individual realtime storm."""
import numpy as np
import scipy.interpolate as interp
import urllib
import warnings
from datetime import datetime as dt, timedelta
import requests
from ftplib import FTP
from ..tracks import *
from ..tracks.tools import *
from ..tracks.plot import TrackPlot
from ..utils import *
from .. import constants
from ..recon import ReconDataset
from ..ships import Ships
[docs]class RealtimeStorm(Storm):
r"""
Initializes an instance of RealtimeStorm. This object inherits all the methods and functionality of ``tropycal.tracks.Storm``, but with additional methods unique to this object, all containing the word "realtime" as part of the function name.
Parameters
----------
storm : dict
Dict entry of the requested storm.
Returns
-------
RealtimeStorm
Instance of a RealtimeStorm object.
Notes
-----
A RealtimeStorm object is retrieved from a Realtime object's ``get_storm()`` method, or directly as an attribute of the Realtime object. For example, if an active storm has an ID of 'EP012022', it can be retrieved as such:
.. code-block:: python
from tropycal import realtime
realtime_obj = realtime.Realtime()
storm = realtime_obj.get_storm('EP012022')
Now this storm's data is stored in the variable ``storm``, which is an instance of RealtimeStorm and can access all of the methods and attributes of a RealtimeStorm object.
All the variables associated with a RealtimeStorm object (e.g., lat, lon, time, vmax) can be accessed in two ways. The first is directly from the RealtimeStorm object:
>>> storm.lat
array([ 9.8, 10.3, 10.8, 11.4, 11.9, 12.1, 12.2, 12.4, 12.6, 12.8, 13. ,
12.9, 12.8, 12.9, 13.2, 13.6, 13.8, 13.9, 14. , 14. , 14.3, 14.6,
15.1, 15.4])
The second is via ``storm.vars``, which returns a dictionary of the variables associated with the RealtimeStorm object. This is also a quick way to access all of the variables associated with a RealtimeStorm object:
>>> variable_dict = storm.vars
>>> lat = variable_dict['lat']
>>> lon = variable_dict['lon']
>>> print(variable_dict.keys())
dict_keys(['time', 'extra_obs', 'special', 'type', 'lat', 'lon', 'vmax', 'mslp', 'wmo_basin'])
RealtimeStorm objects also have numerous attributes with information about the storm. ``storm.attrs`` returns a dictionary of the attributes for this RealtimeStorm object.
It should be noted that RealtimeStorm objects have additional attributes that Storm objects do not, specifically for 2 and 5 day NHC formation probability. These only display values for invests within NHC's area of responsibility; tropical cyclones or invests in JTWC's area of responsibility display "N/A".
>>> print(storm.attrs)
{'id': 'EP012022',
'operational_id': 'EP012022',
'name': 'AGATHA',
'year': 2022,
'season': 2022,
'basin': 'east_pacific',
'source_info': 'NHC Hurricane Database',
'invest': False,
'source_method': "NHC's Automated Tropical Cyclone Forecasting System (ATCF)",
'source_url': 'https://ftp.nhc.noaa.gov/atcf/btk/',
'source': 'hurdat',
'ace': 6.055,
'prob_2day': 'N/A',
'prob_5day': 'N/A',
'risk_2day': 'N/A',
'risk_5day': 'N/A',
'realtime': True}
"""
def __setitem__(self, key, value):
self.__dict__[key] = value
def __getitem__(self, key):
return self.__dict__[key]
def __repr__(self):
# Label object
summary = ["<tropycal.realtime.RealtimeStorm>"]
# Format keys for summary
type_array = np.array(self.dict['type'])
idx = np.where((type_array == 'SD') | (type_array == 'SS') | (type_array == 'TD') | (
type_array == 'TS') | (type_array == 'HU') | (type_array == 'TY') | (type_array == 'ST'))[0]
if self.invest and len(idx) == 0:
idx = np.array([True for i in type_array])
if len(idx) == 0:
start_time = 'N/A'
end_time = 'N/A'
max_wind = 'N/A'
min_mslp = 'N/A'
else:
time_tropical = np.array(self.dict['time'])[idx]
start_time = time_tropical[0].strftime("%H00 UTC %d %B %Y")
end_time = time_tropical[-1].strftime("%H00 UTC %d %B %Y")
max_wind = 'N/A' if all_nan(np.array(self.dict['vmax'])[idx]) else int(np.nanmax(np.array(self.dict['vmax'])[idx]))
min_mslp = 'N/A' if all_nan(np.array(self.dict['mslp'])[idx]) else int(np.nanmin(np.array(self.dict['mslp'])[idx]))
summary_keys = {
'Maximum Wind': f"{max_wind} knots",
'Minimum Pressure': f"{min_mslp} hPa",
'Start Time': start_time,
'End Time': end_time,
}
# Format keys for coordinates
variable_keys = {}
for key in self.vars.keys():
dtype = type(self.vars[key][0]).__name__
dtype = dtype.replace("_", "")
variable_keys[key] = f"({dtype}) [{self.vars[key][0]} .... {self.vars[key][-1]}]"
# Add storm summary
summary.append("Storm Summary:")
add_space = np.max([len(key) for key in summary_keys.keys()])+3
for key in summary_keys.keys():
key_name = key+":"
summary.append(
f'{" "*4}{key_name:<{add_space}}{summary_keys[key]}')
# Add coordinates
summary.append("\nVariables:")
add_space = np.max([len(key) for key in variable_keys.keys()])+3
for key in variable_keys.keys():
key_name = key
summary.append(
f'{" "*4}{key_name:<{add_space}}{variable_keys[key]}')
# Add additional information
summary.append("\nMore Information:")
add_space = np.max([len(key) for key in self.attrs.keys()])+3
for key in self.attrs.keys():
key_name = key+":"
val = '%0.1f' % (
self.attrs[key]) if key == 'ace' else self.attrs[key]
summary.append(f'{" "*4}{key_name:<{add_space}}{val}')
return "\n".join(summary)
def __init__(self, storm, stormTors=None):
# Save the dict entry of the storm
self.dict = storm
# Add other attributes about the storm
keys = self.dict.keys()
self.attrs = {}
self.vars = {}
for key in keys:
if key == 'realtime':
continue
if not isinstance(self.dict[key], list) and not isinstance(self.dict[key], dict):
self[key] = self.dict[key]
self.attrs[key] = self.dict[key]
if isinstance(self.dict[key], list) and not isinstance(self.dict[key], dict):
self.vars[key] = np.array(self.dict[key])
self[key] = np.array(self.dict[key])
# Assign tornado data
if stormTors is not None and isinstance(stormTors, dict):
self.stormTors = stormTors['data']
self.tornado_dist_thresh = stormTors['dist_thresh']
self.attrs['Tornado Count'] = len(stormTors['data'])
# Get Archer track data for this storm, if it exists
try:
self.get_archer()
except:
pass
# Initialize recon dataset instance
self.recon = ReconDataset(storm=self)
# Determine if storm object was retrieved via realtime object
if 'realtime' in keys and self.dict['realtime']:
self.realtime = True
self.attrs['realtime'] = True
else:
self.realtime = False
self.attrs['realtime'] = False
[docs] def download_graphic_realtime(self, save_path=""):
r"""
Download the latest official forecast track graphic. Available for both NHC and JTWC sources.
Parameters
----------
save_path : str
Filepath to save the image in. If blank, default is current working directory.
"""
# Check to ensure storm is not an invest
if self.invest:
raise RuntimeError(
"Error: NHC does not issue advisories for invests that have not been designated as Potential Tropical Cyclones.")
# Determine data source
if self.source == 'hurdat':
part1 = f"AT{self.id[2:4]}" if self.id[0:2] == "AL" else self.id[0:4]
url = f"https://www.nhc.noaa.gov/storm_graphics/{part1}/{self.id}_5day_cone_with_line_and_wind.png"
else:
url = f"https://www.nrlmry.navy.mil/atcf_web/docs/current_storms/{self.id.lower()}.gif"
url_ext = url.split(".")[-1]
# Try to download file
if requests.get(url).status_code != 200:
raise RuntimeError(
"Official forecast graphic is unavailable for this storm.")
# Download file
response = requests.get(url)
full_path = os.path.join(save_path, f"Forecast_{self.id}.{url_ext}")
with open(full_path, 'wb') as f:
f.write(response.content)
[docs] def get_discussion_realtime(self):
r"""
Retrieve the latest available forecast discussion. For JTWC storms, the Prognostic Reasoning product is retrieved.
Returns
-------
dict
Dict entry containing the latest official forecast discussion.
"""
# Check to ensure storm is not an invest
if self.invest:
raise RuntimeError(
"Error: NHC does not issue advisories for invests that have not been designated as Potential Tropical Cyclones.")
# Get latest forecast discussion for HURDAT source storm objects
if self.source == "hurdat":
return self.get_nhc_discussion(forecast=-1)
# Get latest forecast discussion for JTWC source storm objects
elif self.source == 'jtwc':
# Read in discussion file
url = f"https://www.metoc.navy.mil/jtwc/products/{self.id[0:2].lower()}{self.id[2:4]}{self.id[6:8]}prog.txt"
f = urllib.request.urlopen(url)
content = f.read()
content = content.decode("utf-8")
f.close()
return content
# Otherwise, return error message
else:
msg = "No realtime forecast discussion is available for this storm."
raise RuntimeError(msg)
[docs] def get_forecast_realtime(self, ssl_certificate=True):
r"""
Retrieve a dictionary containing the latest official forecast. Available for both NHC and JTWC sources.
Parameters
----------
ssl_certificate : boolean, optional
If a JTWC forecast, this determines whether to disable SSL certificate when retrieving data from JTWC. Default is True. Use False *ONLY* if True causes an SSL certification error.
Returns
-------
dict
Dictionary containing the latest official forecast.
Notes
-----
This dictionary includes a calculation for accumulated cyclone energy (ACE), cumulatively for the storm's lifespan through each forecast hour. This is done by linearly interpolating the forecast to 6-hour intervals and calculating 6-hourly ACE at each interval. For storms where forecast tropical cyclone type is available, ACE is not calculated for forecast periods that are neither tropical nor subtropical.
"""
# Check to ensure storm is not an invest
if self.invest:
raise RuntimeError(
"Error: NHC does not issue advisories for invests that have not been designated as Potential Tropical Cyclones.")
# NHC forecast data
if self.source == 'hurdat':
# Get forecast for this storm
try:
content = read_url(
f"https://ftp.nhc.noaa.gov/atcf/fst/{self.id.lower()}.fst")
except:
try:
content = read_url(
f"ftp://ftp.nhc.noaa.gov/atcf/fst/{self.id.lower()}.fst")
except:
raise RuntimeError(
"NHC forecast data is unavailable for this storm.")
# Iterate through every line in content:
forecasts = {}
for line in content:
# Get basic components
lineArray = [i.replace(" ", "") for i in line]
if len(lineArray) < 11:
continue
try:
basin, number, run_init, n_a, model, fhr, lat, lon, vmax, mslp, stype, rad, windcode, neq, seq, swq, nwq = lineArray[:17]
use_wind = True
except:
basin, number, run_init, n_a, model, fhr, lat, lon, vmax, mslp, stype = lineArray[:11]
use_wind = False
if model not in ["OFCL", "OFCI"]:
continue
if len(forecasts) == 0:
forecasts = {
'init': dt.strptime(run_init, '%Y%m%d%H'),
'fhr': [],
'lat': [],
'lon': [],
'vmax': [],
'mslp': [],
'type': [],
'windrad': [],
'cumulative_ace': [],
'cumulative_ace_fhr': [],
}
# Format lat & lon
fhr = int(fhr)
if "N" in lat:
lat_temp = lat.split("N")[0]
lat = np.round(float(lat_temp) * 0.1, 1)
elif "S" in lat:
lat_temp = lat.split("S")[0]
lat = np.round(float(lat_temp) * -0.1, 1)
if "W" in lon:
lon_temp = lon.split("W")[0]
lon = np.round(float(lon_temp) * -0.1, 1)
elif "E" in lon:
lon_temp = lon.split("E")[0]
lon = np.round(float(lon_temp) * 0.1, 1)
# Format vmax & MSLP
if vmax == '':
vmax = np.nan
else:
vmax = int(vmax)
if vmax < 10 or vmax > 300:
vmax = np.nan
if mslp == '':
mslp = np.nan
else:
mslp = int(mslp)
if mslp < 1:
mslp = np.nan
# Format wind radii
if use_wind:
try:
rad = int(rad)
if rad in [0, 35]:
rad = 34
neq = np.nan if windcode == '' else int(neq)
seq = np.nan if windcode in ['', 'AAA'] else int(seq)
swq = np.nan if windcode in ['', 'AAA'] else int(swq)
nwq = np.nan if windcode in ['', 'AAA'] else int(nwq)
except:
rad = 34
neq = np.nan
seq = np.nan
swq = np.nan
nwq = np.nan
else:
rad = 34
neq = np.nan
seq = np.nan
swq = np.nan
nwq = np.nan
# Add forecast data to dict if forecast hour isn't already there
if fhr not in forecasts['fhr']:
if model in ['OFCL', 'OFCI'] and fhr > 120:
pass
else:
if lat == 0.0 and lon == 0.0:
continue
forecasts['fhr'].append(fhr)
forecasts['lat'].append(lat)
forecasts['lon'].append(lon)
forecasts['vmax'].append(vmax)
forecasts['mslp'].append(mslp)
forecasts['windrad'].append(
{rad: [neq, seq, swq, nwq]})
# Get storm type, if it can be determined
if stype in ['', 'DB'] and vmax != 0 and not np.isnan(vmax):
stype = get_storm_type(vmax, False)
forecasts['type'].append(stype)
else:
ifhr = forecasts['fhr'].index(fhr)
forecasts['windrad'][ifhr][rad] = [neq, seq, swq, nwq]
# Retrieve JTWC forecast otherwise
else:
# Get forecast for this storm
if self.jtwc_source in ['jtwc', 'ucar']:
url = f"https://www.nrlmry.navy.mil/atcf_web/docs/current_storms/{self.id.lower()}.sum"
else:
url = f"https://www.ssd.noaa.gov/PS/TROP/DATA/ATCF/JTWC/{self.id.lower()}.fst"
if not ssl_certificate and self.jtwc_source in ['jtwc', 'ucar']:
import ssl
if requests.get(url, verify=False).status_code != 200:
raise RuntimeError(
"JTWC forecast data is unavailable for this storm.")
else:
if requests.get(url).status_code != 200:
raise RuntimeError(
"JTWC forecast data is unavailable for this storm.")
# Read file content
if not ssl_certificate and self.jtwc_source in ['jtwc', 'ucar']:
import ssl
f = urllib.request.urlopen(
url, context=ssl._create_unverified_context())
else:
f = urllib.request.urlopen(url)
content = f.read()
content = content.decode("utf-8")
content = content.split("\n")
f.close()
# Find starting index
start_idx = 0
for idx, line in enumerate(content):
lineArray = line.split(" ")
if 'WARNING' in lineArray[0] and len(lineArray) > 2:
start_idx = idx
break
if self.jtwc_source in ['jtwc', 'ucar']:
# Iterate through every line in content:
run_init = content[start_idx+2].split(" ")[0]
forecasts = {}
for line in content[start_idx+3:]:
# Exit once done retrieving forecast
if line == "AMP":
break
# Get basic components
lineArray = line.split(" ")
if len(lineArray) < 4:
continue
# Exit once done retrieving forecast
if lineArray[0] == "AMP":
break
if len(forecasts) == 0:
forecasts = {
'init': dt.strptime(run_init, '%Y%m%d%H'),
'fhr': [],
'lat': [],
'lon': [],
'vmax': [],
'mslp': [],
'windrad': [],
'cumulative_ace': [],
'cumulative_ace_fhr': [],
'type': []
}
# Forecast hour
fhr = int(lineArray[0].split("T")[1])
# Format lat & lon
lat = lineArray[1]
lon = lineArray[2]
if "N" in lat:
lat_temp = lat.split("N")[0]
lat = np.round(float(lat_temp) * 0.1, 1)
elif "S" in lat:
lat_temp = lat.split("S")[0]
lat = np.round(float(lat_temp) * -0.1, 1)
if "W" in lon:
lon_temp = lon.split("W")[0]
lon = np.round(float(lon_temp) * -0.1, 1)
elif "E" in lon:
lon_temp = lon.split("E")[0]
lon = np.round(float(lon_temp) * 0.1, 1)
# Format vmax & MSLP
vmax = int(lineArray[3])
if vmax < 10 or vmax > 300:
vmax = np.nan
mslp = np.nan
# Format wind radii
windrad = {}
for rad in (34, 50, 64):
try:
irad = list(lineArray).index(f'R{rad:03}')
windrad[rad] = [int(lineArray[irad+j])
for j in (1, 4, 7, 10)]
except:
continue
# Add forecast data to dict if forecast hour isn't already there
if fhr not in forecasts['fhr']:
if lat == 0.0 and lon == 0.0:
continue
forecasts['fhr'].append(fhr)
forecasts['lat'].append(lat)
forecasts['lon'].append(lon)
forecasts['vmax'].append(vmax)
forecasts['mslp'].append(mslp)
forecasts['windrad'].append(windrad)
# Get storm type, if it can be determined
stype = get_storm_type(vmax, False)
forecasts['type'].append(stype)
else:
content = [(i.replace(" ", "")).split(",") for i in content]
# Iterate through every line in content:
forecasts = {}
for line in content:
# Get basic components
lineArray = [i.replace(" ", "") for i in line]
if len(lineArray) < 11:
continue
try:
basin, number, run_init, n_a, model, fhr, lat, lon, vmax, mslp, stype, rad, windcode, neq, seq, swq, nwq = lineArray[ :17]
use_wind = True
except:
basin, number, run_init, n_a, model, fhr, lat, lon, vmax, mslp, stype = lineArray[:11]
use_wind = False
if model not in ["JTWC"]:
continue
if len(forecasts) == 0:
forecasts = {
'init': dt.strptime(run_init, '%Y%m%d%H'),
'fhr': [],
'lat': [],
'lon': [],
'vmax': [],
'mslp': [],
'type': [],
'windrad': [],
'cumulative_ace': [],
'cumulative_ace_fhr': []
}
# Format lat & lon
fhr = int(fhr)
if "N" in lat:
lat_temp = lat.split("N")[0]
lat = np.round(float(lat_temp) * 0.1, 1)
elif "S" in lat:
lat_temp = lat.split("S")[0]
lat = np.round(float(lat_temp) * -0.1, 1)
if "W" in lon:
lon_temp = lon.split("W")[0]
lon = np.round(float(lon_temp) * -0.1, 1)
elif "E" in lon:
lon_temp = lon.split("E")[0]
lon = np.round(float(lon_temp) * 0.1, 1)
# Format vmax & MSLP
if vmax == '':
vmax = np.nan
else:
vmax = int(vmax)
if vmax < 10 or vmax > 300:
vmax = np.nan
if mslp == '':
mslp = np.nan
else:
mslp = int(mslp)
if mslp < 1:
mslp = np.nan
# Format wind radii
if use_wind:
try:
rad = int(rad)
if rad in [0, 35]:
rad = 34
neq = np.nan if windcode == '' else int(neq)
seq = np.nan if windcode in [
'', 'AAA'] else int(seq)
swq = np.nan if windcode in [
'', 'AAA'] else int(swq)
nwq = np.nan if windcode in [
'', 'AAA'] else int(nwq)
except:
rad = 34
neq = np.nan
seq = np.nan
swq = np.nan
nwq = np.nan
else:
rad = 34
neq = np.nan
seq = np.nan
swq = np.nan
nwq = np.nan
# Add forecast data to dict if forecast hour isn't already there
if fhr not in forecasts['fhr']:
if model in ['OFCL', 'OFCI'] and fhr > 120:
pass
else:
if lat == 0.0 and lon == 0.0:
continue
forecasts['fhr'].append(fhr)
forecasts['lat'].append(lat)
forecasts['lon'].append(lon)
forecasts['vmax'].append(vmax)
forecasts['mslp'].append(mslp)
forecasts['windrad'].append(
{rad: [neq, seq, swq, nwq]})
# Get storm type, if it can be determined
if stype in ['', 'DB'] and vmax != 0 and not np.isnan(vmax):
stype = get_storm_type(vmax, False)
forecasts['type'].append(stype)
else:
ifhr = forecasts['fhr'].index(fhr)
forecasts['windrad'][ifhr][rad] = [neq, seq, swq, nwq]
# Determine ACE thus far (prior to initial forecast hour)
ace = 0.0
for i in range(len(self.time)):
if self.time[i] >= forecasts['init']:
continue
if self.type[i] not in constants.NAMED_TROPICAL_STORM_TYPES:
continue
ace += accumulated_cyclone_energy(self.vmax[i], hours=6)
# Add initial forecast hour ACE
ace += accumulated_cyclone_energy(forecasts['vmax'][0], hours=6)
forecasts['cumulative_ace_fhr'].append(0)
forecasts['cumulative_ace'].append(np.round(ace, 1))
# Interpolate forecast to 6-hour increments
def temporal_interpolation(value, orig_times, target_times, kind='linear'):
f = interp.interp1d(orig_times, value, kind=kind,
fill_value='extrapolate')
ynew = f(target_times)
return ynew
# Construct a 6-hour time range
interp_fhr = range(0, forecasts['fhr'][-1]+1, 6)
interp_vmax = temporal_interpolation(
forecasts['vmax'], forecasts['fhr'], interp_fhr)
# Interpolate storm type
interp_type = []
for dummy_i, (i_hour, i_vmax) in enumerate(zip(interp_fhr, interp_vmax)):
use_i = 0
for i in range(len(forecasts['fhr'])):
if forecasts['fhr'][i] > i_hour:
break
use_i = int(i + 0.0)
i_type = forecasts['type'][use_i]
if i_type in constants.TROPICAL_STORM_TYPES:
i_type = get_storm_type(i_vmax, False)
interp_type.append(i_type)
# Add forecast ACE
for i, (i_fhr, i_vmax, i_type) in enumerate(zip(interp_fhr[1:], interp_vmax[1:], interp_type[1:])):
# Add ACE if storm is a TC
if i_type in constants.NAMED_TROPICAL_STORM_TYPES:
ace += accumulated_cyclone_energy(i_vmax, hours=6)
# Add ACE to array
if i_fhr in forecasts['fhr']:
forecasts['cumulative_ace'].append(np.round(ace, 1))
forecasts['cumulative_ace_fhr'].append(i_fhr)
# Save forecast as attribute
self.latest_forecast = forecasts
return self.latest_forecast
[docs] def plot_forecast_realtime(self, track_labels='fhr', cone_days=5, domain="dynamic_forecast",
ax=None, cartopy_proj=None, save_path=None, ssl_certificate=True, **kwargs):
r"""
Plots the latest available official forecast. Available for both NHC and JTWC sources.
Parameters
----------
track_labels : str
Label format for forecast dots. Scroll down to notes for available options.
cone_days : int
Number of days to plot the forecast cone. Default is 5 days. Can select 2, 3, 4 or 5 days.
domain : str
Domain for the plot. Default is "dynamic_forecast". Please refer to :ref:`options-domain` for available domain options.
ax : axes
Instance of axes to plot on. If none, one will be generated. Default is none.
cartopy_proj : ccrs
Instance of a cartopy projection to use. If none, one will be generated. Default is none.
save_path : str
Relative or full path of directory to save the image in. If none, image will not be saved.
Other Parameters
----------------
ssl_certificate : boolean, optional
If a JTWC forecast, this determines whether to disable SSL certificate when retrieving data from JTWC. Default is True. Use False *ONLY* if True causes an SSL certification error.
prop : dict
Customization properties of NHC forecast plot. Please refer to :ref:`options-prop-nhc` or scroll down below for available options.
map_prop : dict
Customization properties of Cartopy map. Please refer to :ref:`options-map-prop` for available options.
Returns
-------
ax
Instance of axes containing the plot is returned.
Notes
-----
The following properties are available for customizing ``track_labels``.
.. list-table::
:widths: 25 75
:header-rows: 1
* - Value
- Description
* - ``""``
- No label
* - ``"fhr"``
- Forecast hour (e.g., 60). Default option.
* - ``"fhr_wind_mph"``
- Forecast hour and sustained wind in mph.
* - ``"fhr_wind_kt"``
- Forecast hour and sustained wind in knots.
* - ``"valid_utc"``
- Valid time in UTC.
* - ``"valid_edt"``
- Valid time in Eastern time (EDT).
* - ``"valid_cdt"``
- Valid time in Central time (CDT).
* - ``"valid_mdt"``
- Valid time in Mountain time (MDT).
* - ``"valid_pdt"``
- Valid time in Pacific time (PDT).
* - ``"valid_hst"``
- Valid time in Hawaii time (HST).
The following properties are available for plotting NHC forecasts, via ``prop``.
.. list-table::
:widths: 25 75
:header-rows: 1
* - Property
- Description
* - cone_lw
- Center line width for the cone of uncertainty. Default is 2.0.
* - cone_alpha
- Transparency for the cone of uncertainty. Default is 0.6.
* - cone_res
- Grid resolution for the cone of uncertainty in degrees. Default is 0.05.
"""
# Retrieve kwargs
prop = kwargs.pop('prop', {})
map_prop = kwargs.pop('map_prop', {})
# Check to ensure storm is not an invest
if self.invest:
raise RuntimeError(
"Error: NHC does not issue advisories for invests that have not been designated as Potential Tropical Cyclones.")
# Create instance of plot object
try:
self.plot_obj
except:
self.plot_obj = TrackPlot()
# Create cartopy projection
if cartopy_proj is None:
if max(self.dict['lon']) > 130 or min(self.dict['lon']) < -130:
self.plot_obj.create_cartopy(
proj='PlateCarree', central_longitude=180.0)
else:
self.plot_obj.create_cartopy(
proj='PlateCarree', central_longitude=0.0)
# Get forecast for this storm
try:
nhc_forecasts = (self.latest_forecast).copy()
except:
nhc_forecasts = self.get_forecast_realtime(
ssl_certificate=ssl_certificate)
# Add other info to forecast dict
nhc_forecasts['advisory_num'] = -1
nhc_forecasts['basin'] = self.basin
if self.source != "hurdat":
nhc_forecasts['cone'] = False
# Plot storm
plot_ax = self.plot_obj.plot_storm_nhc(
nhc_forecasts, self.dict, track_labels, cone_days, domain, ax=ax, save_path=save_path, prop=prop, map_prop=map_prop)
# Return axis
return plot_ax
[docs] def get_realtime_info(self, source='all'):
r"""
Returns a dict containing the latest available information about the storm. This function uses NHC Public Advisories, so it will differ from available Best Track data.
Parameters
----------
source : str
Data source to use. Default is "all". Available options are:
* **"all"** = Latest from either public advisory or best track. Both NHC & JTWC.
* **"public_advisory"** = Latest public advisory. NHC only.
* **"best_track"** = Latest Best Track file data. Both NHC & JTWC.
Returns
-------
dict
Dictionary containing current storm information.
"""
# Error check
if not isinstance(source, str):
msg = "\"source\" must be of type str."
raise TypeError(msg)
if source not in ['all', 'public_advisory', 'best_track']:
msg = "\"source\" must be 'all', 'public_advisory', or 'best_track'."
raise ValueError(msg)
if source == 'public_advisory' and self.source != 'hurdat':
msg = "A source of 'public_advisory' can only be used for storms in NHC's area of responsibility."
raise RuntimeError(msg)
# Check to ensure storm is not an invest
if self.invest:
if self.source == 'hurdat':
msg = "NHC does not issue public advisories on invests. Defaulting to best track method."
warnings.warn(msg)
source = 'best_track'
# Declare empty dict
current_advisory = {}
# If source is all, determine which method to use
if source == 'all':
if self.source == 'hurdat':
# Check to see which is the latest advisory
latest_btk = self.time[-1]
# Get latest available public advisory
advisory_found = False
try:
content = read_url(
f"https://ftp.nhc.noaa.gov/atcf/adv/{self.id.lower()}_info.xml", subsplit=False)
advisory_found = True
except:
try:
content = read_url(
f"ftp://ftp.nhc.noaa.gov/atcf/adv/{self.id.lower()}_info.xml", subsplit=False)
advisory_found = True
except:
pass
# Get UTC time of advisory
if advisory_found:
results = [i for i in content if 'messageDateTimeUTC' in i][0]
result = (results.split(">")[1]).split("<")[0]
latest_advisory = dt.strptime(result, '%Y%m%d %I:%M:%S %p UTC')
# Check which one to use
if latest_btk > latest_advisory:
source = 'best_track'
else:
source = 'public_advisory'
else:
source = 'best_track'
else:
source = 'best_track'
# If public advisory, retrieve this data
if source == 'public_advisory':
# Add source
current_advisory['source'] = 'NHC Public Advisory'
# Get latest available public advisory
try:
content = read_url(
f"https://ftp.nhc.noaa.gov/atcf/adv/{self.id.lower()}_info.xml", subsplit=False)
except:
content = read_url(
f"ftp://ftp.nhc.noaa.gov/atcf/adv/{self.id.lower()}_info.xml", subsplit=False)
# Get public advisory number
results = [i for i in content if 'advisoryNumber' in i][0]
result = (results.split(">")[1]).split("<")[0]
current_advisory['advisory_number'] = result
# Get UTC time of advisory
results = [i for i in content if 'messageDateTimeUTC' in i][0]
result = (results.split(">")[1]).split("<")[0]
result = dt.strptime(result, '%Y%m%d %I:%M:%S %p UTC')
current_advisory['time_utc'] = result
# Get storm type
results = [i for i in content if 'systemType' in i][0]
result = (results.split(">")[1]).split("<")[0]
current_advisory['type'] = result.title()
# Get storm name
results = [i for i in content if 'systemName' in i][0]
result = (results.split(">")[1]).split("<")[0]
current_advisory['name'] = result.title()
# Get coordinates
results = [i for i in content if 'centerLocLatitude' in i][0]
result = (results.split(">")[1]).split("<")[0]
current_advisory['lat'] = float(result)
results = [i for i in content if 'centerLocLongitude' in i][0]
result = (results.split(">")[1]).split("<")[0]
current_advisory['lon'] = float(result)
# Get sustained wind speed
results = [i for i in content if 'systemIntensityMph' in i][0]
result = (results.split(">")[1]).split("<")[0]
current_advisory['wind_mph'] = int(result)
results = [i for i in content if 'systemIntensityKph' in i][0]
result = (results.split(">")[1]).split("<")[0]
current_advisory['wind_kph'] = int(result)
results = [i for i in content if 'systemIntensityKts' in i][0]
result = (results.split(">")[1]).split("<")[0]
current_advisory['wind_kt'] = int(result)
# Get MSLP
results = [i for i in content if 'systemMslpMb' in i][0]
result = (results.split(">")[1]).split("<")[0]
current_advisory['mslp'] = int(result)
# Get storm category
current_advisory['category'] = wind_to_category(
current_advisory['wind_kt'])
# Get storm direction
results = [i for i in content if 'systemDirectionOfMotion' in i][0]
result = (results.split(">")[1]).split("<")[0]
current_advisory['motion_direction'] = result.split(" OR ")[0]
results = [i for i in content if 'systemDirectionOfMotion' in i][0]
result = (results.split(">")[1]).split("<")[0]
try:
current_advisory['motion_direction_degrees'] = int(
(result.split(" OR ")[1]).split(" DEGREES")[0])
except:
current_advisory['motion_direction_degrees'] = 0
# Get storm speed
results = [i for i in content if 'systemSpeedMph' in i][0]
result = (results.split(">")[1]).split("<")[0]
current_advisory['motion_mph'] = int(result)
results = [i for i in content if 'systemSpeedKph' in i][0]
result = (results.split(">")[1]).split("<")[0]
current_advisory['motion_kph'] = int(result)
results = [i for i in content if 'systemSpeedKts' in i][0]
result = (results.split(">")[1]).split("<")[0]
current_advisory['motion_kt'] = int(result)
# Best track data
else:
# Add source
if self.source == 'hurdat':
current_advisory['source'] = 'NHC Best Track'
else:
current_advisory['source'] = 'JTWC Best Track'
# Get public advisory number
current_advisory['advisory_number'] = 'n/a'
# Get UTC time of advisory
current_advisory['time_utc'] = self.time[-1]
# Get storm type
subtrop_flag = self.type[-1] in constants.SUBTROPICAL_ONLY_STORM_TYPES
current_advisory['type'] = get_storm_classification(
self.vmax[-1], subtrop_flag, self.wmo_basin[-1])
# Check for non-tropical storm types
if self.type[-1] not in constants.TROPICAL_STORM_TYPES:
if all(type not in self.type for type in constants.TROPICAL_STORM_TYPES):
if self.invest:
current_advisory['type'] = 'Invest'
else:
current_advisory['type'] = 'Potential Tropical Cyclone'
else:
current_advisory['type'] = 'Post-Tropical Cyclone'
elif self.source in ['jtwc', 'ucar']:
if self.invest:
current_advisory['type'] = 'Invest'
# Get storm name
current_advisory['name'] = self.name.title()
# Get coordinates
current_advisory['lat'] = self.lat[-1]
current_advisory['lon'] = self.lon[-1]
# Get sustained wind speed
if np.isnan(self.vmax[-1]):
current_advisory['wind_mph'] = np.nan
current_advisory['wind_kph'] = np.nan
current_advisory['wind_kt'] = np.nan
else:
current_advisory['wind_mph'] = knots_to_mph(self.vmax[-1])
current_advisory['wind_kph'] = int(self.vmax[-1] * 1.852)
current_advisory['wind_kt'] = self.vmax[-1]
# Get MSLP
current_advisory['mslp'] = np.nan if np.isnan(self.mslp[-1]) else int(self.mslp[-1])
# Get storm category
current_advisory['category'] = wind_to_category(
current_advisory['wind_kt'])
# Determine motion direction and degrees
try:
# Cannot calculate motion if there's only one data point
if len(self.lon) == 1:
# Get storm direction
current_advisory['motion_direction'] = 'n/a'
current_advisory['motion_direction_degrees'] = 'n/a'
# Get storm speed
current_advisory['motion_mph'] = 'n/a'
current_advisory['motion_kph'] = 'n/a'
current_advisory['motion_kt'] = 'n/a'
# Otherwise, use great_circle to calculate
else:
# Get points
start_point = (self.lat[-2], self.lon[-2])
end_point = (self.lat[-1], self.lon[-1])
# Get time since last update
hour_diff = (
self.time[-1] - self.time[-2]).total_seconds() / 3600.0
# Calculate zonal and meridional position change in km
x = great_circle(
(self.lat[-2], self.lon[-2]), (self.lat[-2], self.lon[-1])).kilometers
if self.lon[-1] < self.lon[-2]:
x = x * -1
y = great_circle(
(self.lat[-2], self.lon[-2]), (self.lat[-1], self.lon[-2])).kilometers
if self.lat[-1] < self.lat[-2]:
y = y * -1
# Calculate motion direction vector
idir = np.degrees(np.arctan2(x, y))
if idir < 0:
idir += 360.0
# Calculate motion direction string
def deg_str(d):
dirs = ["N", "NNE", "NE", "ENE", "E", "ESE", "SE", "SSE",
"S", "SSW", "SW", "WSW", "W", "WNW", "NW", "NNW"]
ix = int((d + 11.25)/22.5)
return dirs[ix % 16]
dirs = deg_str(idir)
# Update storm direction
current_advisory['motion_direction'] = dirs
current_advisory['motion_direction_degrees'] = int(
np.round(idir, 0))
# Get storm speed
current_advisory['motion_mph'] = int(
np.round(great_circle(start_point, end_point).miles / float(hour_diff), 0))
current_advisory['motion_kph'] = int(np.round(great_circle(
start_point, end_point).kilometers / float(hour_diff), 0))
current_advisory['motion_kt'] = int(
np.round(current_advisory['motion_mph'] * 0.868976, 0))
# Otherwise, can't calculate motion
except:
# Get storm direction
current_advisory['motion_direction'] = 'n/a'
current_advisory['motion_direction_degrees'] = 'n/a'
# Get storm speed
current_advisory['motion_mph'] = 'n/a'
current_advisory['motion_kph'] = 'n/a'
current_advisory['motion_kt'] = 'n/a'
# Return dict
return current_advisory
def __get_public_advisory(self):
# Get list of all public advisories for this storm
url_disco = 'https://ftp.nhc.noaa.gov/atcf/pub/'
try:
page = requests.get(url_disco).text
content = page.split("\n")
files = []
for line in content:
if ".public" in line and self.id.lower() in line:
filename = line.split('">')[1]
filename = filename.split("</a>")[0]
files.append(filename)
del content
except:
ftp = FTP('ftp.nhc.noaa.gov')
ftp.login()
ftp.cwd('atcf/pub')
files = ftp.nlst()
out = ftp.quit()
# Keep only largest number
numbers = [int(i.split(".")[-1]) for i in files]
max_number = np.nanmax(numbers)
if max_number >= 100:
max_number = str(max_number)
elif max_number >= 10:
max_number = f"0{max_number}"
else:
max_number = f"00{max_number}"
files = [i for i in files if f".{max_number}" in i]
# Determine if there's an intermediate advisory available
if len(files) > 1:
advisory_letter = []
for file in files:
if 'public_' in file:
letter = (file.split("public_")[1]).split(".")[0]
advisory_letter.append(letter)
max_letter = max(advisory_letter)
files = [i for i in files if f".public_{max_letter}" in i]
# Read file containing advisory
content = read_url(url_disco + files[0], subsplit=False)
# Figure out time issued
hr = content[6].split(" ")[0]
zone = content[6].split(" ")[2]
disco_time = num_to_str2(int(hr)) + ' '.join(content[6].split(" ")[1:])
format_time = content[6].split(" ")[0]
if len(format_time) == 3:
format_time = "0" + format_time
format_time = format_time + " " + ' '.join(content[6].split(" ")[1:])
disco_time = dt.strptime(format_time, f'%I00 %p {zone} %a %b %d %Y')
time_zones = {
'ADT': -3,
'AST': -4,
'EDT': -4,
'EST': -5,
'CDT': -5,
'CST': -6,
'MDT': -6,
'MST': -7,
'PDT': -7,
'PST': -8,
'HDT': -9,
'HST': -10
}
offset = time_zones.get(zone, 0)
disco_time = disco_time + timedelta(hours=offset*-1)
[docs] def get_ships_realtime(self):
r"""
Retrieves a Ships object containing the latest SHIPS data.
Returns
-------
tropycal.ships.Ships
Instance of a Ships object containing the latest SHIPS data.
Notes
-----
1. While the SHIPS data source used for ``Storm.get_ships()`` updates on a frequent basis, it may lag slightly behind the latest available data on the NHC ATCF server, especially for newly formed storms. This function helps to mitigate this slight delay.
2. On rare occasions, SHIPS data files from UCAR have empty data associated with them. In these cases, a value of None is returned.
"""
reformatted_id = f'{self.id[:-4]}{self.id[-2:]}'
# Retrieve list of SHIPS files from NHC ATCF
try:
page = requests.get('https://ftp.nhc.noaa.gov/atcf/stext/').text
except:
raise RuntimeError('Unable to fetch SHIPS data from NHC ATCF.')
content = page.split("\n")
files = []
for line in content:
if '<a href="' in line and '_ships.txt' in line:
filename = (line.split('<a href="')[1]).split('">')[0]
# Add entries definitely associated with this storm
if reformatted_id == filename.split('_ships')[0][-6:]:
files.append(filename)
# Organize by date
sorted_files = sorted([dt.strptime(i[:8],'%y%m%d%H') for i in files])
# Fetch latest file
filename = f'{sorted_files[-1].strftime("%y%m%d%H")}{reformatted_id}_ships.txt'
content = read_url(f'https://ftp.nhc.noaa.gov/atcf/stext/{filename}', split=False, subsplit=False)
# Make sure file has content
if len(content) < 10:
warnings.warn('Improper SHIPS entry for this time. Returning a value of None.')
return None
# Return Ships instance
return Ships(content)
def __get_ensembles_eps(self):
# This function is currently not functioning. The path to retrieve EPS ensemble data is:
# ftp://wmo:essential@dissemination.ecmwf.int/20200518120000/
# A_JSXX01ECEP181200_C_ECMP_20200518120000_tropical_cyclone_track_AMPHAN_86p3degE_14degN_bufr4.bin
return