# Source code for tropycal.recon.dataset

import os
import numpy as np
from datetime import datetime as dt, timedelta
import pandas as pd
import requests
import pickle
import copy
import urllib3
import warnings

from scipy.interpolate import interp1d
from scipy.ndimage import gaussian_filter1d as gfilt1d
from scipy.ndimage import minimum_filter
import matplotlib.dates as mdates

try:
    import matplotlib.colors as mcolors
    import matplotlib.pyplot as plt
    import cartopy.crs as ccrs
except ImportError:
    warnings.warn(
        "Warning: Matplotlib or Cartopy is not installed in your python environment. Plotting functions will not work.")

from .plot import *
from ..plot import Plot
from .realtime import Mission

# Import tools
from .tools import *
from ..utils import *


class ReconDataset:

    r"""
    Creates an instance of a ReconDataset object containing all recon data for a single storm.

    Parameters
    ----------
    storm : tropycal.tracks.Storm
        Requested Storm object.

    Returns
    -------
    ReconDataset
        An instance of ReconDataset.

    Notes
    -----
    .. warning::

        Recon data is currently only available from 1989 onwards.

    ReconDataset and its subclasses (hdobs, dropsondes and vdms) make up the **storm-centric** part of the recon module, meaning that recon data is retrieved specifically for tropical cyclones, and all recon missions for the requested storm are additionally transformed to storm-centric coordinates. This differs from realtime recon functionality, which is **mission-centric**.

    This storm-centric functionality allows for additional recon analysis and visualization functions, such as derived hovmollers and spatial maps. As of Tropycal v0.4, recon data can only be retrieved for tropical cyclones, not for invests.

    ReconDataset contains no data the first time it is initialized, but contains methods to retrieve the three sub-classes of recon:

    .. list-table::
       :widths: 25 75
       :header-rows: 1

       * - Class
         - Description
       * - hdobs
         - Class containing all High Density Observations (HDOBs) for this Storm.
       * - dropsondes
         - Class containing all dropsondes for this Storm.
       * - vdms
         - Class containing all Vortex Data Messages (VDMs) for this Storm.

    Each of these sub-classes can be initialized as a sub-class of ReconDataset as follows. Note that this may take some time, especially for storms with many recon missions.

    .. code-block:: python

        #Retrieve Hurricane Michael (2018) from TrackDataset
        basin = tracks.TrackDataset()
        storm = basin.get_storm(('michael',2018))

        #Retrieve all HDOBs for this storm
        storm.recon.get_hdobs()

        #Retrieve all dropsondes for this storm
        storm.recon.get_dropsondes()

        #Retrieve all VDMs for this storm
        storm.recon.get_vdms()

    Once this data has been read in, these subclasses and their associated methods and attributes can be accessed from within the `recon` object as follows, using HDOBs for example:

    .. code-block:: python

        #Retrieve Pandas DataFrame of all HDOB observations
        storm.recon.hdobs.data

        #Plot all HDOB points
        storm.recon.hdobs.plot_points()

        #Plot derived hovmoller from HDOB points
        storm.recon.hdobs.plot_hovmoller()

    Individual missions can also be retrieved as ``Mission`` objects. For example, this code retrieves a ``Mission`` object for the second mission of this storm:

    .. code-block:: python

        #This line prints all available mission IDs for this storm
        print(storm.recon.find_mission())

        #This line retrieves the 2nd mission from this storm
        mission = storm.recon.get_mission(2)
    """

    def __repr__(self):
        info = []
        for name in ['hdobs', 'dropsondes', 'vdms']:
            try:
                info.append(self.__dict__[name].__repr__())
            except:
                info.append('')
        return '\n'.join(info)

    def __init__(self, storm):
        self.source = 'https://www.nhc.noaa.gov/archive/recon/'
        self.storm = storm
    def get_hdobs(self, data=None):
        r"""
        Retrieve High Density Observations (HDOBs) for this storm.

        Parameters
        ----------
        data : str, optional
            String representing the path of a pickle file containing HDOBs data, saved via ``hdobs.to_pickle()``. If none, data is read from NHC.

        Notes
        -----
        This function has no return value, but stores the resulting HDOBs object within this ReconDataset instance. All of its methods can then be accessed as follows, for the following example storm:

        .. code-block:: python

            from tropycal import tracks

            #Read basin dataset
            basin = tracks.TrackDataset()

            #Read storm object
            storm = basin.get_storm(('michael',2018))

            #Read hdobs data
            storm.recon.get_hdobs()

            #Plot all HDOB points
            storm.recon.hdobs.plot_points()
        """

        self.hdobs = hdobs(self.storm, data)
    def get_dropsondes(self, data=None):
        r"""
        Retrieve dropsondes for this storm.

        Parameters
        ----------
        data : str, optional
            String representing the path of a pickle file containing dropsonde data, saved via ``dropsondes.to_pickle()``. If none, data is read from NHC.

        Notes
        -----
        This function has no return value, but stores the resulting dropsondes object within this ReconDataset instance. All of its methods can then be accessed as follows, for the following example storm:

        .. code-block:: python

            from tropycal import tracks

            #Read basin dataset
            basin = tracks.TrackDataset()

            #Read storm object
            storm = basin.get_storm(('michael',2018))

            #Read dropsondes data
            storm.recon.get_dropsondes()

            #Plot all dropsonde points
            storm.recon.dropsondes.plot_points()
        """

        self.dropsondes = dropsondes(self.storm, data)
    def get_vdms(self, data=None):
        r"""
        Retrieve Vortex Data Messages (VDMs) for this storm.

        Parameters
        ----------
        data : str, optional
            String representing the path of a pickle file containing VDM data, saved via ``vdms.to_pickle()``. If none, data is read from NHC.

        Notes
        -----
        This function has no return value, but stores the resulting VDMs object within this ReconDataset instance. All of its methods can then be accessed as follows, for the following example storm:

        .. code-block:: python

            from tropycal import tracks

            #Read basin dataset
            basin = tracks.TrackDataset()

            #Read storm object
            storm = basin.get_storm(('michael',2018))

            #Read VDM data
            storm.recon.get_vdms()

            #Plot all VDM points
            storm.recon.vdms.plot_points()
        """

        self.vdms = vdms(self.storm, data)
    def get_mission(self, number):
        r"""
        Retrieve a Mission object for a given mission number for this storm.

        Parameters
        ----------
        number : int or str
            Requested mission number. Can be an integer (1) or a string with two characters ("01").

        Returns
        -------
        Mission
            Instance of a Mission object for the requested mission.
        """

        def str2(number):
            if isinstance(number, str):
                return number
            if number < 10:
                return f"0{number}"
            return str(number)

        # Automatically retrieve data if not already available
        try:
            self.vdms
        except:
            self.get_vdms()
        try:
            self.hdobs
        except:
            self.get_hdobs()
        try:
            self.dropsondes
        except:
            self.get_dropsondes()

        # Search through all missions to find the full mission ID
        missions = []
        for mission in np.unique(self.hdobs.data['mission']):
            try:
                missions.append(int(mission))
            except:
                pass
        missions = list(np.sort(missions))
        if isinstance(number, str):
            missions = [str2(i) for i in missions]
        if number not in missions:
            raise ValueError("Requested mission ID is not available.")

        # Retrieve data for mission
        hdobs_mission = self.hdobs.data.loc[self.hdobs.data['mission'] == str2(number)]
        mission_id = hdobs_mission['mission_id'].values[0]
        vdms_mission = [i for i in self.vdms.data if i['mission_id'] == mission_id]
        dropsondes_mission = [i for i in self.dropsondes.data if i['mission_id'] == mission_id]
        mission_dict = {
            'hdobs': hdobs_mission,
            'vdms': vdms_mission,
            'dropsondes': dropsondes_mission,
            'aircraft': mission_id.split("-")[0],
            'storm_name': mission_id.split("-")[2]
        }

        # Get sources
        try:
            sources = list(np.unique([self.vdms.source, self.hdobs.source, self.dropsondes.source]))
            if len(sources) == 1:
                sources = sources[0]
        except:
            sources = 'National Hurricane Center (NHC)'
        mission_dict['source'] = sources

        return Mission(mission_dict, mission_id)
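    # Usage sketch (hypothetical mission number; assumes a Storm object ``storm``
    # whose recon data has been read in):
    #
    #     print(storm.recon.find_mission())        # list all available mission IDs
    #     mission = storm.recon.get_mission(2)     # integer form, zero-padded to "02"
    #     mission = storm.recon.get_mission("02")  # equivalent string form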
    def update(self):
        r"""
        Update with the latest data for an ongoing storm.

        Notes
        -----
        This function has no return value, but simply updates all existing sub-classes of ReconDataset.
        """

        for name in ['hdobs', 'dropsondes', 'vdms']:
            try:
                self.__dict__[name].update()
            except:
                print(f'No {name} object to update')
    def get_track(self, time=None):
        r"""
        Retrieve coordinates of recon track for one or more times.

        Parameters
        ----------
        time : datetime.datetime or list, optional
            Datetime object or list of datetime objects representing the requested time.

        Returns
        -------
        tuple
            (lon,lat) coordinates.

        Notes
        -----
        The track from which coordinate(s) are returned is generated by an optimal combination of Best Track and Recon (VDMs and/or HDOBs) tracks.
        """

        if time is None or 'trackfunc' not in self.__dict__.keys():
            btk = self.storm.to_dataframe()[['time', 'lon', 'lat']]
            try:
                if 'vdms' not in self.__dict__.keys():
                    print('Getting VDMs for track')
                    self.get_vdms()
                rec = pd.DataFrame([{k: d[k] for k in ('time', 'lon', 'lat')} for d in self.vdms.data])
            except:
                try:
                    rec = self.hdobs.sel(iscenter=1).data
                except:
                    rec = None

            if rec is None:
                track = copy.copy(btk)
            else:
                track = copy.copy(rec)
                # Fill gaps of more than 3 hours in the recon track with Best Track points
                for i, row in btk.iterrows():
                    if min(abs(row['time'] - rec['time'])) > timedelta(hours=3):
                        track.loc[len(track.index)] = row
                track = track.sort_values(by='time').reset_index(drop=True)

            # Interpolate center position to time of each ob
            datenum = [mdates.date2num(t) for t in track['time']]
            f1 = interp1d(datenum, track['lon'], fill_value='extrapolate', kind='quadratic')
            f2 = interp1d(datenum, track['lat'], fill_value='extrapolate', kind='quadratic')
            self.trackfunc = (f1, f2)

        if time is not None:
            datenum = []
            for t in time:
                try:
                    datenum.append(mdates.date2num(t))
                except:
                    datenum.append(np.nan)
            track = tuple([f(datenum) for f in self.trackfunc])
        return track
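    # Usage sketch (hypothetical times; assumes recon data has been read in for
    # ``storm``). The returned arrays are interpolated center positions:
    #
    #     import datetime as dt
    #     times = [dt.datetime(2018, 10, 10, 6), dt.datetime(2018, 10, 10, 12)]
    #     lons, lats = storm.recon.get_track(times)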
    def find_mission(self, time=None, distance=None):
        r"""
        Returns the name of a mission or list of recon missions for this storm.

        Parameters
        ----------
        time : datetime.datetime or list, optional
            Datetime object or list of datetime objects representing the time of the requested mission. If none, all missions will be returned.
        distance : int, optional
            Distance from storm center, in kilometers.

        Returns
        -------
        list
            The IDs of any/all missions that had in-storm observations during the specified time.

        Notes
        -----
        Especially in earlier years, missions are not always numbered sequentially (e.g., the first mission might not have an ID of "01"). To get a ``Mission`` object for one or more mission IDs, use the mission ID as an argument in ``ReconDataset.get_mission()``.

        For example, to retrieve a Mission object for every mission valid at a requested time, assuming that ``ReconDataset`` is linked to a Storm object:

        .. code-block:: python

            #Enter a requested time here
            import datetime as dt
            requested_time = dt.datetime(2020,8,12,12)

            #Get all active mission ID(s), if any, for this time
            mission_ids = storm.recon.find_mission(requested_time)

            #Get Mission object for each mission
            for mission_id in mission_ids:
                mission = storm.recon.get_mission(mission_id)
                print(mission)
        """

        # Return all missions if time is None
        if time is None:
            if distance is None:
                data = self.hdobs.data
            else:
                data = self.hdobs.sel(distance=distance).data
            missions = np.unique(data['mission'].values)
            return list(missions)

        # Filter temporally
        if isinstance(time, list):
            t1 = min(time)
            t2 = max(time)
        else:
            t1 = t2 = time

        # Filter spatially
        if distance is None:
            data = self.hdobs.data
        else:
            data = self.hdobs.sel(distance=distance).data

        # Find and return missions
        selected = []
        mission_groups = data.groupby('mission')
        for g in mission_groups:
            t_start, t_end = (min(g[1]['time']), max(g[1]['time']))
            if t_start <= t1 <= t_end or t_start <= t2 <= t_end or t1 < t_start < t2:
                selected.append(g[0])
        return selected
    def plot_summary(self, mission=None, save_path=None):
        r"""
        Plot summary map of all recon data.

        Parameters
        ----------
        mission : str, optional
            String with mission name. Will plot summary for the specified mission, otherwise plots for all missions (default).
        save_path : str
            Relative or full path of directory to save the image in. If none, image will not be saved.

        Returns
        -------
        ax
            Instance of axes containing the plot.

        Notes
        -----
        HDOB data needs to be read into the recon object to use this function. To do so, use the ``ReconDataset.get_hdobs()`` function.
        """

        # Error check
        if 'hdobs' not in self.__dict__.keys():
            raise RuntimeError("hdobs needs to be read into the 'recon' object first. Use the 'ReconDataset.get_hdobs()' method to read in HDOBs data.")

        prop = {
            'hdobs': {'ms': 5, 'marker': 'o'},
            'dropsondes': {'ms': 25, 'marker': 'v'},
            'vdms': {'ms': 100, 'marker': 's'}
        }

        hdobs = self.hdobs.sel(mission=mission)
        ax = hdobs.plot_points('pkwnd', prop={'cmap': {1: 'firebrick', 2: 'tomato', 4: 'gold', 6: 'lemonchiffon'},
                                              'levels': (0, 200), 'ms': 2})

        if 'dropsondes' in self.__dict__.keys():
            dropsondes = self.dropsondes.sel(mission=mission)
            drop_lons = []
            drop_lats = []
            for drop in dropsondes.data:
                if np.isnan(drop['TOPlon']):
                    drop_lons.append(drop['lon'])
                else:
                    drop_lons.append(drop['TOPlon'])
                if np.isnan(drop['TOPlat']):
                    drop_lats.append(drop['lat'])
                else:
                    drop_lats.append(drop['TOPlat'])
            ax.scatter(drop_lons, drop_lats, s=50, marker='v', edgecolor='w',
                       linewidth=0.5, color='darkblue', transform=ccrs.PlateCarree())

        if 'vdms' in self.__dict__.keys():
            vdms = self.vdms.sel(mission=mission)
            ax.scatter(*zip(*[(d['lon'], d['lat']) for d in vdms.data]), s=80, marker='H',
                       edgecolor='w', linewidth=1, color='k', transform=ccrs.PlateCarree())

        title_left = ax.get_title(loc='left').split('\n')
        newtitle = title_left[0] + '\nRecon summary' + ['', f' for mission {mission}'][mission is not None]
        ax.set_title(newtitle, fontsize=17, fontweight='bold', loc='left')

        if save_path is not None and isinstance(save_path, str):
            plt.savefig(save_path, bbox_inches='tight')
        return ax
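    # Usage sketch (hypothetical mission ID; assumes HDOBs have been read in):
    #
    #     storm.recon.plot_summary()                 # all missions
    #     storm.recon.plot_summary(mission='0705L')  # a single mission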
class hdobs:

    r"""
    Creates an instance of an HDOBs object containing all recon High Density Observations (HDOBs) for a single storm.

    Parameters
    ----------
    storm : tropycal.tracks.Storm
        Requested storm.
    data : str, optional
        Filepath of pickle file containing HDOBs data retrieved from ``hdobs.to_pickle()``. If provided, data will be retrieved from the local pickle file instead of the NHC server.
    update : bool
        If True, search for new data following the existing data in the HDOBs object, and concatenate. Default is False.

    Returns
    -------
    Dataset
        An instance of HDOBs, initialized with a dataframe of HDOB observations.

    Notes
    -----
    .. warning::

        Recon data is currently only available from 1989 onwards.

    There are two recommended ways of retrieving an hdobs object. Since the ``ReconDataset``, ``hdobs``, ``dropsondes`` and ``vdms`` classes are **storm-centric**, a Storm object is required for both methods.

    .. code-block:: python

        #Retrieve Hurricane Michael (2018) from TrackDataset
        basin = tracks.TrackDataset()
        storm = basin.get_storm(('michael',2018))

    The first method is to use the empty instance of ReconDataset already initialized in the Storm object, which has a ``get_hdobs()`` method, thus allowing all of the hdobs attributes and methods to be accessed from the Storm object. As a result, a Storm object does not need to be provided as an argument.

    .. code-block:: python

        #Retrieve all HDOBs for this storm
        storm.recon.get_hdobs()

        #Retrieve the raw HDOBs data
        storm.recon.hdobs.data

        #Use the plot_points() method of hdobs
        storm.recon.hdobs.plot_points()

    The second method is to use the hdobs class independently of the other recon classes:

    .. code-block:: python

        from tropycal.recon import hdobs

        #Retrieve all HDOBs for this storm, passing the Storm object as an argument
        hdobs_obj = hdobs(storm)

        #Retrieve the raw HDOBs data
        hdobs_obj.data

        #Use the plot_points() method of hdobs
        hdobs_obj.plot_points()
    """

    def __repr__(self):
        summary = ["<tropycal.recon.hdobs>"]

        # Find maximum wind and minimum pressure
        max_wspd = np.nanmax(self.data['wspd'])
        max_pkwnd = np.nanmax(self.data['pkwnd'])
        max_sfmr = np.nanmax(self.data['sfmr'])
        min_psfc = np.nanmin(self.data['p_sfc'])
        time_range = [pd.to_datetime(t) for t in (np.nanmin(self.data['time']), np.nanmax(self.data['time']))]

        # Add general summary
        emdash = '\u2014'
        summary_keys = {
            'Storm': f'{self.storm.name} {self.storm.year}',
            'Missions': len(set(self.data['mission'])),
            'Time range': f"{time_range[0]:%b-%d %H:%M} {emdash} {time_range[1]:%b-%d %H:%M}",
            'Max 30sec flight level wind': f"{max_wspd} knots",
            'Max 10sec flight level wind': f"{max_pkwnd} knots",
            'Max SFMR wind': f"{max_sfmr} knots",
            'Min surface pressure': f"{min_psfc} hPa",
            'Source': self.source
        }

        # Add dataset summary
        summary.append("Dataset Summary:")
        add_space = np.max([len(key) for key in summary_keys.keys()]) + 3
        for key in summary_keys.keys():
            key_name = key + ":"
            summary.append(f'{" "*4}{key_name:<{add_space}}{summary_keys[key]}')

        return "\n".join(summary)

    def __init__(self, storm, data=None, update=False):
        self.storm = storm
        self.data = None
        self.format = 1
        self.source = 'National Hurricane Center (NHC)'

        # Get URL based on storm year
        if storm.year >= 2012:
            self.format = 1
            if storm.basin == 'north_atlantic':
                archive_url = [f'https://www.nhc.noaa.gov/archive/recon/{self.storm.year}/AHONT1/']
            else:
                archive_url = [f'https://www.nhc.noaa.gov/archive/recon/{self.storm.year}/AHOPN1/']
        elif storm.year <= 2011 and storm.year >= 2008:
            self.format = 2
            if storm.basin == 'north_atlantic':
                archive_url = [f'https://www.nhc.noaa.gov/archive/recon/{self.storm.year}/HDOB/NOAA/URNT15/',
                               f'https://www.nhc.noaa.gov/archive/recon/{self.storm.year}/HDOB/USAF/URNT15/']
            else:
                archive_url = [f'https://www.nhc.noaa.gov/archive/recon/{self.storm.year}/HDOB/NOAA/URPN15/',
                               f'https://www.nhc.noaa.gov/archive/recon/{self.storm.year}/HDOB/USAF/URPN15/']
        elif storm.year == 2007:
            self.format = 3
            archive_url = [f'https://www.nhc.noaa.gov/archive/recon/{self.storm.year}/HDOB/NOAA/',
                           f'https://www.nhc.noaa.gov/archive/recon/{self.storm.year}/HDOB/USAF/']
        elif storm.year == 2006:
            self.format = 4
            archive_url = [f'https://www.nhc.noaa.gov/archive/recon/{self.storm.year}/HDOB/']
        elif storm.year >= 2002:
            self.format = 5
            archive_url = [f'https://www.nhc.noaa.gov/archive/recon/{self.storm.year}/{self.storm.name.upper()}/']
        elif storm.year <= 2001 and storm.year >= 1989:
            self.format = 6
            archive_url = [f'https://www.nhc.noaa.gov/archive/recon/{self.storm.year}/{self.storm.name.lower()}/']
        else:
            raise RuntimeError("Recon data is not available prior to 1989.")

        if isinstance(data, str):
            with open(data, 'rb') as f:
                self.data = pickle.load(f)
        elif data is not None:
            self.data = data

        if data is None or update:
            try:
                start_time = max(self.data['time'])
            except:
                start_time = min(self.storm.dict['time']) - timedelta(hours=12)
            end_time = max(self.storm.dict['time']) + timedelta(hours=12)
            timestr = [f'{start_time:%Y%m%d}'] + \
                [f'{t:%Y%m%d}' for t in self.storm.dict['time'] if t > start_time] + \
                [f'{end_time:%Y%m%d}']

            # Retrieve list of files in URL(s) and filter by storm dates
            if self.format in [1, 3, 4]:
                page = requests.get(archive_url[0]).text
                content = page.split("\n")
                files = []
                for line in content:
                    if ".txt" in line:
                        files.append(((line.split('txt">')[1]).split("</a>")[0]).split("."))
                del content
                files = sorted([i for i in files if i[1][:8] in timestr], key=lambda x: x[1])
                linksub = [archive_url[0] + '.'.join(l) for l in files]
            elif self.format == 2:
                linksub = []
                for url in archive_url:
                    files = []
                    page = requests.get(url).text
                    content = page.split("\n")
                    for line in content:
                        if ".txt" in line:
                            files.append(((line.split('txt">')[1]).split("</a>")[0]).split("."))
                    del content
                    linksub += sorted([url + '.'.join(i) for i in files if i[1][:8] in timestr], key=lambda x: x[1])
                linksub = sorted(linksub)
            elif self.format == 5:
                page = requests.get(archive_url[0]).text
                content = page.split("\n")
                files = []
                for line in content:
                    if ".txt" in line and 'HDOBS' in line:
                        files.append((line.split('txt">')[1]).split("</a>")[0])
                del content
                linksub = [archive_url[0] + l for l in files]
            elif self.format == 6:
                page = requests.get(archive_url[0]).text
                content = page.split("\n")
                files = []
                for line in content:
                    if ".txt" in line:
                        files.append((line.split('txt">')[1]).split("</a>")[0])
                del content
                linksub = [archive_url[0] + l for l in files if l[0] in ['H', 'h', 'M', 'm']]

            # Initiate urllib3
            urllib3.disable_warnings()
            http = urllib3.PoolManager()

            # Read through all files
            timer_start = dt.now()
            print(f'Searching through recon HDOB files between {timestr[0]} and {timestr[-1]} ...')
            filecount, unreadable = 0, 0
            found = False
            for link in linksub:

                # Read URL
                response = http.request('GET', link)
                content = response.data.decode('utf-8')

                # Find mission name line
                row = 3 if self.format <= 4 else 0
                while len(content.split("\n")[row]) < 3 or content.split("\n")[row][:3] in ["SXX", "URN", "URP", "YYX"]:
                    row += 1
                    if row >= 100:
                        break
                if row >= 100:
                    continue
                missionname = [i.split() for i in content.split('\n')][row][1]

                # Check for mission name to storm match by format
                if self.format != 6:
                    check = missionname[2:5] == self.storm.operational_id[2:4] + self.storm.operational_id[0]
                else:
                    check = True

                # Read HDOBs if this file matches the requested storm
                if check:
                    filecount += 1
                    try:
                        # Decode HDOBs by format
                        if self.format <= 3:
                            iter_hdob = decode_hdob(content, mission_row=row)
                        elif self.format == 4:
                            strdate = (link.split('.')[-2])[:8]
                            iter_hdob = decode_hdob_2006(content, strdate, mission_row=row)
                        elif self.format == 6:
                            # Check for date
                            day = int(content.split("\n")[row - 1].split()[2][:2])
                            found_date = False
                            for iter_date in storm.dict['time']:
                                if iter_date.day == day:
                                    date = dt(iter_date.year, iter_date.month, iter_date.day)
                                    strdate = date.strftime('%Y%m%d')
                                    found_date = True
                                    break
                            if not found_date:
                                continue
                            iter_hdob = decode_hdob_2006(content, strdate, mission_row=row)
                        elif self.format == 5:
                            # Split content by 10/20 minute blocks
                            strdate = (link.split('.')[-3]).split("_")[-1]
                            content_split = content.split("NNNN")
                            iter_hdob = None
                            for iter_content in content_split:
                                iter_split = iter_content.split("\n")
                                if len(iter_split) < 10:
                                    continue

                                # Search for starting line of data within sub-block
                                found = False
                                for line in iter_split:
                                    if missionname in line:
                                        found = True
                                temp_row = 0
                                while len(iter_split[temp_row]) < 3 or iter_split[temp_row][:3] in ["SXX", "URN", "URP"]:
                                    temp_row += 1
                                    if temp_row >= 100:
                                        break
                                if temp_row >= 100:
                                    break

                                # Parse data by format
                                if 'NOAA' in link:
                                    iter_hdob_loop = decode_hdob_2005_noaa(iter_content, strdate, temp_row)
                                else:
                                    iter_hdob_loop = decode_hdob_2006(iter_content, strdate, temp_row)

                                # Append HDOBs to full data
                                if iter_hdob is None:
                                    iter_hdob = copy.copy(iter_hdob_loop)
                                elif max(iter_hdob_loop['time']) > start_time:
                                    iter_hdob = pd.concat([iter_hdob, iter_hdob_loop])
                                else:
                                    pass

                        # Append HDOBs to full data
                        if self.data is None:
                            self.data = copy.copy(iter_hdob)
                        elif max(iter_hdob['time']) > start_time:
                            self.data = pd.concat([self.data, iter_hdob])
                        else:
                            pass
                    except:
                        unreadable += 1

            print(f'--> Completed reading in recon HDOB files ({(dt.now()-timer_start).total_seconds():.1f} seconds)' +
                  f'\nRead {filecount} files' +
                  f'\nUnable to decode {unreadable} files')

        # This code will crash if no HDOBs are available
        try:
            # Sort data by time
            self.data.sort_values(['time'], inplace=True)

            # Recenter
            self._recenter()
            self.keys = list(self.data.keys())
        except:
            self.keys = []
    def update(self):
        r"""
        Update with the latest data for an ongoing storm.

        Notes
        -----
        This function has no return value, but simply updates the internal HDOB data with new observations since the object was created.
        """

        # __init__ mutates this object in place; its return value is discarded
        self.__init__(storm=self.storm, data=self.data, update=True)
    def _find_centers(self, data=None):

        if data is None:
            data = self.data
        data = data.sort_values(['mission', 'time'])

        def fill_nan(A):
            # Interpolate to fill nan values
            A = np.array(A)
            inds = np.arange(len(A))
            good = np.where(np.isfinite(A))
            good_grad = np.interp(inds, good[0], np.gradient(good[0]))
            if len(good[0]) >= 3:
                f = interp1d(inds[good], A[good], bounds_error=False, kind='quadratic')
                B = np.where((np.isfinite(A)[good[0][0]:good[0][-1] + 1]) |
                             (good_grad[good[0][0]:good[0][-1] + 1] > 3),
                             A[good[0][0]:good[0][-1] + 1],
                             f(inds[good[0][0]:good[0][-1] + 1]))
                return [np.nan] * good[0][0] + list(B) + [np.nan] * (inds[-1] - good[0][-1])
            else:
                return [np.nan] * len(A)

        missiondata = data.groupby('mission')
        dfs = []
        for group in missiondata:
            mdata = group[1]

            # Check that sfc pressure spread is big enough to identify real minima
            if np.nanpercentile(mdata['p_sfc'], 95) - np.nanpercentile(mdata['p_sfc'], 5) > 8:

                # Interp p_sfc across missing data
                p_sfc_interp = fill_nan(mdata['p_sfc'])

                # Interp wspd across missing data
                wspd_interp = fill_nan(mdata['wspd'])

                # Smooth p_sfc and wspd
                p_sfc_smooth = [np.nan] * 1 + list(np.convolve(p_sfc_interp, [1 / 3] * 3, mode='valid')) + [np.nan] * 1
                wspd_smooth = [np.nan] * 1 + list(np.convolve(wspd_interp, [1 / 3] * 3, mode='valid')) + [np.nan] * 1

                # Add wspd to p_sfc to encourage finding p mins with wspd mins
                # and prevent finding p mins in intense thunderstorms
                pw_test = np.array(p_sfc_smooth) + np.array(wspd_smooth) * .1

                # Find mins in 20-minute windows
                imin = np.nonzero(pw_test == minimum_filter(pw_test, 40))[0]

                # Only use mins if below 10th %ile of mission p_sfc data and when plane p is 550-950mb
                # and not in takeoff and landing time windows
                plane_p = fill_nan(mdata['plane_p'])
                imin = [i for i in imin if 800 < p_sfc_interp[i] < np.nanpercentile(mdata['p_sfc'], 10) and
                        550 < plane_p[i] < 950 and i > 60 and i < len(mdata) - 60]
            else:
                imin = []
            mdata['iscenter'] = np.array([1 if i in imin else 0 for i in range(len(mdata))])
            dfs.append(mdata)

        data = pd.concat(dfs)
        numcenters = sum(data['iscenter'])
        print(f'Found {numcenters} center passes')
        return data

    def _recenter(self):
        data = copy.copy(self.data)

        # Interpolate center position to time of each ob
        interp_clon, interp_clat = self.storm.recon.get_track(data['time'])

        # Get x,y distance of each ob from coinciding interped center position
        data['xdist'] = [great_circle((interp_clat[i], interp_clon[i]),
                                      (interp_clat[i], data['lon'].values[i])).kilometers *
                         [1, -1][int(data['lon'].values[i] < interp_clon[i])] for i in range(len(data))]
        data['ydist'] = [great_circle((interp_clat[i], interp_clon[i]),
                                      (data['lat'].values[i], interp_clon[i])).kilometers *
                         [1, -1][int(data['lat'].values[i] < interp_clat[i])] for i in range(len(data))]
        data['distance'] = [(i**2 + j**2)**.5 for i, j in zip(data['xdist'], data['ydist'])]

        imin = np.nonzero(data['distance'].values == minimum_filter(data['distance'].values, 40))[0]
        data['iscenter'] = np.array([1 if i in imin and data['distance'].values[i] < 10 else 0
                                     for i in range(len(data))])

        self.data = data
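    # Minimal sketch of the minimum_filter trick used above (hypothetical data):
    # a point is a local minimum over an N-sample window exactly where the
    # rolling minimum equals the series itself.
    #
    #     import numpy as np
    #     from scipy.ndimage import minimum_filter
    #     x = np.array([5., 4., 3., 4., 5., 2., 3.])
    #     np.nonzero(x == minimum_filter(x, 3))[0]  # -> array([2, 5])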
    def sel(self, mission=None, time=None, domain=None, plane_p=None, plane_z=None,
            p_sfc=None, temp=None, dwpt=None, wdir=None, wspd=None, pkwnd=None,
            sfmr=None, noflag=None, iscenter=None, distance=None):
        r"""
        Select a subset of HDOBs by any of its parameters and return a new hdobs object.

        Parameters
        ----------
        mission : str
            Mission name (number + storm id), e.g. mission 7 for AL05 is '0705L'.
        time : list/tuple of datetimes
            list/tuple of start time and end time datetime objects. Default is None, which returns all points.
        domain : dict
            dictionary with keys 'n', 's', 'e', 'w' corresponding to boundaries of domain.
        plane_p : list/tuple of float/int
            list/tuple of plane_p bounds (min,max). None in either position of a tuple means it is boundless on that side.
        plane_z : list/tuple of float/int
            list/tuple of plane_z bounds (min,max). None in either position of a tuple means it is boundless on that side.
        p_sfc, temp, dwpt, wdir, wspd, pkwnd, sfmr : list/tuple of float/int
            list/tuple of bounds (min,max) for the respective dataframe variable. None in either position of a tuple means it is boundless on that side.
        iscenter : int
            If 1, return only center-pass points; if 0, return only non-center points.
        distance : int
            Return only points within this distance (km) of the storm center.

        Returns
        -------
        hdobs object
            A new hdobs object that satisfies the intersection of all subsetting.
        """

        NEW_DATA = copy.copy(self.data)

        # Apply mission filter
        if mission is not None:
            mission = str(mission)
            NEW_DATA = NEW_DATA.loc[NEW_DATA['mission'] == mission]

        # Apply time filter
        if time is not None:
            bounds = get_bounds(NEW_DATA['time'], time)
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['time'] > bounds[0]) & (NEW_DATA['time'] < bounds[1])]

        # Apply domain filter
        if domain is not None:
            tmp = {k[0].lower(): v for k, v in domain.items()}
            domain = {'n': 90, 's': -90, 'e': 359.99, 'w': 0}
            domain.update(tmp)
            bounds = get_bounds(NEW_DATA['lon'] % 360, (domain['w'] % 360, domain['e'] % 360))
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['lon'] % 360 >= bounds[0]) & (NEW_DATA['lon'] % 360 <= bounds[1])]
            bounds = get_bounds(NEW_DATA['lat'], (domain['s'], domain['n']))
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['lat'] >= bounds[0]) & (NEW_DATA['lat'] <= bounds[1])]

        # Apply flight pressure filter
        if plane_p is not None:
            bounds = get_bounds(NEW_DATA['plane_p'], plane_p)
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['plane_p'] > bounds[0]) & (NEW_DATA['plane_p'] < bounds[1])]

        # Apply flight height filter
        if plane_z is not None:
            bounds = get_bounds(NEW_DATA['plane_z'], plane_z)
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['plane_z'] > bounds[0]) & (NEW_DATA['plane_z'] < bounds[1])]

        # Apply surface pressure filter
        if p_sfc is not None:
            bounds = get_bounds(NEW_DATA['p_sfc'], p_sfc)
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['p_sfc'] > bounds[0]) & (NEW_DATA['p_sfc'] < bounds[1])]

        # Apply temperature filter
        if temp is not None:
            bounds = get_bounds(NEW_DATA['temp'], temp)
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['temp'] > bounds[0]) & (NEW_DATA['temp'] < bounds[1])]

        # Apply dew point filter
        if dwpt is not None:
            bounds = get_bounds(NEW_DATA['dwpt'], dwpt)
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['dwpt'] > bounds[0]) & (NEW_DATA['dwpt'] < bounds[1])]

        # Apply wind direction filter
        if wdir is not None:
            bounds = get_bounds(NEW_DATA['wdir'], wdir)
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['wdir'] > bounds[0]) & (NEW_DATA['wdir'] < bounds[1])]

        # Apply wind speed filter
        if wspd is not None:
            bounds = get_bounds(NEW_DATA['wspd'], wspd)
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['wspd'] > bounds[0]) & (NEW_DATA['wspd'] < bounds[1])]

        # Apply peak wind filter
        if pkwnd is not None:
            bounds = get_bounds(NEW_DATA['pkwnd'], pkwnd)
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['pkwnd'] > bounds[0]) & (NEW_DATA['pkwnd'] < bounds[1])]

        # Apply sfmr filter
        if sfmr is not None:
            bounds = get_bounds(NEW_DATA['sfmr'], sfmr)
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['sfmr'] > bounds[0]) & (NEW_DATA['sfmr'] < bounds[1])]

        # Apply iscenter filter
        if iscenter is not None:
            NEW_DATA = NEW_DATA.loc[NEW_DATA['iscenter'] == iscenter]

        # Apply distance filter
        if distance is not None:
            NEW_DATA = NEW_DATA.loc[NEW_DATA['distance'] < distance]

        NEW_OBJ = hdobs(storm=self.storm, data=NEW_DATA)
        return NEW_OBJ
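    # Usage sketch (hypothetical values; assumes HDOBs have been read in).
    # Filters intersect, so selections can be combined or chained:
    #
    #     subset = storm.recon.hdobs.sel(mission='0705L', plane_p=(600, 900))
    #     centers = storm.recon.hdobs.sel(iscenter=1, distance=50)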
    def to_pickle(self, filename):
        r"""
        Save HDOB data (Pandas dataframe) to a pickle file.

        Parameters
        ----------
        filename : str
            Name of the file to save the pickle file to.

        Notes
        -----
        This method saves the HDOBs data as a pickle within the current working directory, given a filename as an argument. For example, assume ``hdobs`` was retrieved from a Storm object (using the first method described in the ``hdobs`` class documentation). The HDOBs data would be saved to a pickle file as follows:

        >>> storm.recon.hdobs.to_pickle("mystorm_hdobs.pickle")

        Now the HDOBs data is saved locally. The next time recon data for this storm needs to be analyzed, this allows bypassing re-reading the HDOBs data from the NHC server by providing the pickle file as an argument:

        >>> storm.recon.get_hdobs("mystorm_hdobs.pickle")
        """

        with open(filename, 'wb') as f:
            pickle.dump(self.data, f)
    def plot_time_series(self, varname=('p_sfc', 'wspd'), mission=None, time=None, realtime=False, **kwargs):
        r"""
        Plots a time series of one or two variables on an axis.

        Parameters
        ----------
        varname : str or tuple
            If one variable to plot, varname is a string of the variable name. If two variables to plot, varname is a tuple of the left and right variable names, respectively. Available varnames are:

            * **p_sfc** - Mean Sea Level Pressure (hPa)
            * **temp** - Flight Level Temperature (C)
            * **dwpt** - Flight Level Dewpoint (C)
            * **wspd** - Flight Level Wind (kt)
            * **sfmr** - Surface Wind (kt)
            * **pkwnd** - Peak Wind Gust (kt)
            * **rain** - Rain Rate (mm/hr)
            * **plane_z** - Geopotential Height (m)
            * **plane_p** - Pressure (hPa)
        mission : int
            Mission number to plot. If None, all missions for this storm are plotted.
        time : tuple
            Tuple of start and end times (datetime.datetime) to plot. If None, all times available are plotted.
        realtime : bool
            If True, the most recent 2 hours of the mission will plot, overriding the time argument. Default is False.

        Other Parameters
        ----------------
        left_prop : dict
            Dictionary of properties for the left line. Scroll down for more information.
        right_prop : dict
            Dictionary of properties for the right line. Scroll down for more information.

        Returns
        -------
        ax
            Instance of axes containing the plot is returned.

        Notes
        -----
        The following properties are available for customizing the plot, via ``left_prop`` and ``right_prop``.

        .. list-table::
           :widths: 25 75
           :header-rows: 1

           * - Property
             - Description
           * - ms
             - Marker size. If zero, none will be plotted. Default is zero.
           * - color
             - Color of lines (and markers if used). Default varies per varname.
           * - linewidth
             - Line width. Default is 1.0.
        """

        # Pop kwargs
        left_prop = kwargs.pop('left_prop', {})
        right_prop = kwargs.pop('right_prop', {})

        # Retrieve variables
        twin_ax = False
        if isinstance(varname, tuple):
            varname_right = varname[1]
            varname = varname[0]
            twin_ax = True
            varname_right_info = time_series_plot(varname_right)
        varname_info = time_series_plot(varname)

        # Filter by mission
        def str2(number):
            if number < 10:
                return f'0{number}'
            return str(number)
        if mission is not None:
            df = self.data.loc[self.data['mission'] == str2(mission)]
            if len(df) == 0:
                raise ValueError("Mission number provided is invalid.")
        else:
            df = self.data

        # Filter by time or realtime flag
        if realtime:
            end_time = pd.to_datetime(df['time'].values[-1])
            df = df.loc[(df['time'] >= end_time - timedelta(hours=2)) & (df['time'] <= end_time)]
        elif time is not None:
            df = df.loc[(df['time'] >= time[0]) & (df['time'] <= time[1])]
        if len(df) == 0:
            raise ValueError("Time range provided is invalid.")

        # Fill in default plot properties
        left_prop_default = {
            'ms': 0,
            'color': varname_info['color'],
            'linewidth': 1
        }
        for key in left_prop.keys():
            left_prop_default[key] = left_prop[key]
        left_prop = left_prop_default
        if twin_ax:
            right_prop_default = {
                'ms': 0,
                'color': varname_right_info['color'],
                'linewidth': 1
            }
            for key in right_prop.keys():
                right_prop_default[key] = right_prop[key]
            right_prop = right_prop_default

        # ----------------------------------------------------------------------------------

        # Create figure
        fig, ax = plt.subplots(figsize=(9, 6), dpi=200)
        if twin_ax:
            ax.grid(axis='x')
        else:
            ax.grid()

        # Plot line
        line1 = ax.plot(df['time'], df[varname], color=left_prop['color'],
                        linewidth=left_prop['linewidth'], label=varname_info['name'])
        ax.set_ylabel(varname_info['full_name'])

        # Plot dots, skipping observations flagged for this variable
        if left_prop['ms'] >= 1:
            plot_times = df['time'].values
            plot_var = df[varname].values
            plot_times = [plot_times[i] for i in range(len(plot_times)) if varname not in df['flag'].values[i]]
            plot_var = [plot_var[i] for i in range(len(plot_var)) if varname not in df['flag'].values[i]]
            ax.plot(plot_times, plot_var, 'o', color=left_prop['color'], ms=left_prop['ms'])

        # Format x-axis dates
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%Mz\n%m/%d'))

        # Add twin axis
        if twin_ax:
            ax2 = ax.twinx()

            # Plot line
            line2 = ax2.plot(df['time'], df[varname_right], color=right_prop['color'],
                             linewidth=right_prop['linewidth'], label=varname_right_info['name'])
            ax2.set_ylabel(varname_right_info['full_name'])

            # Plot dots, skipping observations flagged for this variable
            if right_prop['ms'] >= 1:
                plot_times = df['time'].values
                plot_var = df[varname_right].values
                plot_times = [plot_times[i] for i in range(len(plot_times)) if varname_right not in df['flag'].values[i]]
                plot_var = [plot_var[i] for i in range(len(plot_var)) if varname_right not in df['flag'].values[i]]
                ax2.plot(plot_times, plot_var, 'o', color=right_prop['color'], ms=right_prop['ms'])

            # Add legend
            lines = line1 + line2
            labels = [l.get_label() for l in lines]
            ax.legend(lines, labels)

            # Align both y-axes if the two variables share the same units
            same_unit = False
            if varname in ['temp', 'dwpt'] and varname_right in ['temp', 'dwpt']:
                same_unit = True
            if varname in ['sfmr', 'wspd', 'pkwnd'] and varname_right in ['sfmr', 'wspd', 'pkwnd']:
                same_unit = True
            if same_unit:
                min_val = np.nanmin([np.nanmin(df[varname]), np.nanmin(df[varname_right])])
                max_val = np.nanmax([np.nanmax(df[varname]), np.nanmax(df[varname_right])]) * 1.05
                min_val = min_val * 1.05 if min_val < 0 else min_val * 0.95
                if np.isnan(min_val):
                    min_val = 0
                if np.isnan(max_val):
                    max_val = 0
                if min_val == max_val:
                    min_val = 0
                    max_val = 10
                ax.set_ylim(min_val, max_val)
                ax2.set_ylim(min_val, max_val)

        # Add titles
        storm_data = self.storm.dict
        type_array = np.array(storm_data['type'])
        idx = np.where((type_array == 'SD') | (type_array == 'SS') | (type_array == 'TD') |
                       (type_array == 'TS') | (type_array == 'HU') | (type_array == 'TY') | (type_array == 'ST'))
        if ('invest' in storm_data.keys() and not storm_data['invest']) or len(idx[0]) > 0:
            tropical_vmax = np.array(storm_data['vmax'])[idx]
            add_ptc_flag = False
            if len(tropical_vmax) == 0:
                add_ptc_flag = True
                idx = np.where((type_array == 'LO') | (type_array == 'DB'))
                tropical_vmax = np.array(storm_data['vmax'])[idx]
            subtrop = classify_subtropical(np.array(storm_data['type']))
            peak_idx = storm_data['vmax'].index(np.nanmax(tropical_vmax))
            peak_basin = storm_data['wmo_basin'][peak_idx]
            storm_type = get_storm_classification(np.nanmax(tropical_vmax), subtrop, peak_basin)
            if add_ptc_flag:
                storm_type = "Potential Tropical Cyclone"

        # Plot title
        title_string = f"{storm_type} {storm_data['name']}"
        if mission is None:
            title_string += f"\nRecon Aircraft HDOBs | All Missions"
        else:
            title_string += f"\nRecon Aircraft HDOBs | Mission #{mission}"
        ax.set_title(title_string, loc='left', fontweight='bold')
        ax.set_title("Plot generated using Tropycal", fontsize=8, loc='right')

        # Return plot
        return ax
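    # Usage sketch (hypothetical arguments; assumes HDOBs have been read in):
    #
    #     storm.recon.hdobs.plot_time_series(
    #         varname=('p_sfc', 'wspd'),   # left and right axes
    #         mission=2,
    #         left_prop={'ms': 2},         # show markers on the left line
    #     )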
    def plot_points(self, varname='wspd', domain="dynamic", radlim=None, barbs=False,
                    ax=None, cartopy_proj=None, **kwargs):
        r"""
        Creates a plot of recon data points.

        Parameters
        ----------
        varname : str
            Variable to plot. Can be one of the following keys in dataframe:

            * **"sfmr"** = SFMR surface wind
            * **"wspd"** = 30-second flight level wind (default)
            * **"pkwnd"** = 10-second flight level wind
            * **"p_sfc"** = extrapolated surface pressure
        domain : str
            Domain for the plot. Default is "dynamic". Please refer to :ref:`options-domain` for available domain options.
        radlim : int
            Radius (in km) away from storm center to include points. If none (default), all points are plotted.
        barbs : bool
            If True, plots wind barbs. If False (default), plots dots.
        ax : axes
            Instance of axes to plot on. If none, one will be generated. Default is none.
        cartopy_proj : ccrs
            Instance of a cartopy projection to use. If none, one will be generated. Default is none.

        Other Parameters
        ----------------
        prop : dict
            Customization properties of recon plot. Please refer to :ref:`options-prop-recon-plot` for available options.
        map_prop : dict
            Customization properties of Cartopy map. Please refer to :ref:`options-map-prop` for available options.

        Returns
        -------
        ax
            Instance of axes containing the plot is returned.

        Notes
        -----
        1. Plotting wind barbs only works for wind related variables. ``barbs`` will be automatically set to False for non-wind variables.

        2. The special colormap **category_recon** can be used in the prop dict (``prop={'cmap':'category_recon'}``). This uses the standard SSHWS colormap, but with a new color for wind between 50 and 64 knots.
        """

        # Wind barbs are only valid for wind variables
        if varname == 'p_sfc':
            barbs = False

        # Pop kwargs
        prop = kwargs.pop('prop', {})
        map_prop = kwargs.pop('map_prop', {})

        # Get plot data
        dfRecon = self.data

        # Create instance of plot object
        self.plot_obj = ReconPlot()

        # Create cartopy projection
        if cartopy_proj is None:
            self.plot_obj.create_cartopy(proj='PlateCarree', central_longitude=0.0)
            cartopy_proj = self.plot_obj.proj

        # Plot recon
        plot_ax = self.plot_obj.plot_points(self.storm, dfRecon, domain, varname=varname,
                                            radlim=radlim, barbs=barbs, ax=ax,
                                            prop=prop, map_prop=map_prop)

        # Return axis
        return plot_ax
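    # Usage sketch (assumes HDOBs have been read in):
    #
    #     storm.recon.hdobs.plot_points('sfmr', prop={'cmap': 'category_recon'})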
    def plot_hovmoller(self, varname='wspd', radlim=None, window=6, align='center', ax=None, **kwargs):
        r"""
        Creates a hovmoller plot of azimuthally-averaged recon data.

        Parameters
        ----------
        varname : str
            Variable to average and plot. Available variable names are:

            * **"sfmr"** = SFMR surface wind
            * **"wspd"** = 30-second flight level wind (default)
            * **"pkwnd"** = 10-second flight level wind
            * **"p_sfc"** = extrapolated surface pressure
        radlim : int, optional
            Radius from storm center, in kilometers, to plot in hovmoller. Default is 200 km.
        window : int, optional
            Window of hours to interpolate between observations. Default is 6 hours.
        align : str, optional
            Alignment of window. Default is 'center'.
        ax : axes, optional
            Instance of axes to plot on. If none, one will be generated. Default is none.

        Other Parameters
        ----------------
        prop : dict
            Customization properties for recon plot. Please refer to :ref:`options-prop-recon-hovmoller` for available options.
        track_dict : dict, optional
            Storm track dictionary. If None (default), internal storm center track is used.

        Returns
        -------
        ax
            Axes instance containing the plot.

        Notes
        -----
        The special colormap **category_recon** can be used in the prop dict (``prop={'cmap':'category_recon'}``). This uses the standard SSHWS colormap, but with a new color for wind between 50 and 64 knots.
        """

        # Pop kwargs
        track_dict = kwargs.pop('track_dict', None)
        prop = kwargs.pop('prop', {})
        default_prop = {
            'cmap': 'category',
            'levels': None,
            'smooth_contourf': False
        }
        for key in default_prop.keys():
            if key not in prop.keys():
                prop[key] = default_prop[key]

        # Get recon data
        dfRecon = self.data

        # Retrieve track dictionary if none is specified
        if track_dict is None:
            track_dict = self.storm.dict

        # Interpolate recon data to a hovmoller
        iRecon = interpRecon(dfRecon, varname, radlim, window=window, align=align)
        Hov_dict = iRecon.interpHovmoller(track_dict)

        # If no contour levels specified, generate levels based on data min and max
        if prop['levels'] is None:
            prop['levels'] = (np.nanmin(Hov_dict['hovmoller']), np.nanmax(Hov_dict['hovmoller']))

        # Retrieve updated contour levels and colormap based on input arguments and variable type
        cmap, clevs = get_cmap_levels(varname, prop['cmap'], prop['levels'])

        # Retrieve hovmoller times, radii and data
        time = Hov_dict['time']
        radius = Hov_dict['radius']
        vardata = Hov_dict['hovmoller']

        # Error check time
        time = [dt.strptime((i.strftime('%Y%m%d%H%M')), '%Y%m%d%H%M') for i in time]

        # ------------------------------------------------------------------------------

        # Create plot
        plt.figure(figsize=(9, 9), dpi=150)
        ax = plt.subplot()

        # Plot surface category colors individually, necessitating normalizing colormap
        if varname in ['vmax', 'sfmr', 'wspd', 'fl_to_sfc'] and prop['cmap'] in ['category', 'category_recon']:
            norm = mcolors.BoundaryNorm(clevs, cmap.N)
            cf = ax.contourf(radius, time, gfilt1d(vardata, sigma=3, axis=1),
                             levels=clevs, cmap=cmap, norm=norm)

        # Multiple clevels or without smooth contouring
        elif len(prop['levels']) > 2 or not prop['smooth_contourf']:
            cf = ax.contourf(radius, time, gfilt1d(vardata, sigma=3, axis=1),
                             levels=clevs, cmap=cmap)

        # Automatically generated levels with smooth contouring
        else:
            cf = ax.contourf(radius, time, gfilt1d(vardata, sigma=3, axis=1),
                             cmap=cmap, levels=np.linspace(min(prop['levels']), max(prop['levels']), 256))
        ax.axis([0, max(radius), min(time), max(time)])

        # Plot colorbar
        cbar = plt.colorbar(cf, orientation='horizontal', pad=0.1)

        # Format y-axis ticks and labels as dates
        ax.yaxis.set_major_formatter(mdates.DateFormatter('%m-%d %H'))
        ax.tick_params(axis='both', labelsize=14)

        # Set axes labels
        ax.set_ylabel('UTC Time (MM-DD HH)', fontsize=15)
        ax.set_xlabel('Radius (km)', fontsize=15)

        # --------------------------------------------------------------------------------------

        # Generate left and right title strings
        title_left, title_right = hovmoller_plot_title(self.storm, Hov_dict, varname)
        ax.set_title(title_left, loc='left', fontsize=16, fontweight='bold')
        ax.set_title(title_right, loc='right', fontsize=12)

        # Add plot credit
        ax.text(0.02, 0.02, 'Plot generated by Tropycal', ha='left', va='bottom', transform=ax.transAxes)

        # Return axis
        return ax
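    # Usage sketch (assumes HDOBs have been read in):
    #
    #     storm.recon.hdobs.plot_hovmoller('wspd', radlim=200, window=6)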
    def plot_maps(self, time=None, varname='wspd', recon_stats=None, domain="dynamic",
                  window=6, align='center', radlim=None, ax=None, cartopy_proj=None,
                  save_dir=None, **kwargs):
        r"""
        Creates maps of interpolated recon data.

        Parameters
        ----------
        time : datetime.datetime or list
            Single datetime object, or list/tuple of datetime objects containing the start and end times to plot between. If None (default), all times will be plotted.
        varname : str or tuple
            Variable to plot. Can be one of the following keys in dataframe:

            * **"sfmr"** = SFMR surface wind
            * **"wspd"** = 30-second flight level wind (default)
            * **"pkwnd"** = 10-second flight level wind
            * **"p_sfc"** = extrapolated surface pressure
        recon_stats : optional
            Recon statistics to pass through to the map plotter. Default is None.
        domain : str
            Domain for the plot. Default is "dynamic". Please refer to :ref:`options-domain` for available domain options.
        radlim : int, optional
            Radius from storm center, in kilometers, to plot. Default is 200 km.
        window : int, optional
            Window of hours to interpolate between observations. Default is 6 hours.
        align : str, optional
            Alignment of window. Default is 'center'.
        ax : axes, optional
            Instance of axes to plot on. If none, one will be generated. Default is none.
        cartopy_proj : ccrs, optional
            Instance of a cartopy projection to use. If none, one will be generated. Default is none.
        save_dir : str, optional
            Directory to save output images in. If None, images will not be saved. Default is None.

        Other Parameters
        ----------------
        prop : dict
            Customization properties of recon plot. Please refer to :ref:`options-prop-recon-swath` for available options.
        map_prop : dict
            Customization properties of Cartopy map. Please refer to :ref:`options-map-prop` for available options.
        track_dict : dict, optional
            Storm track dictionary. If None (default), internal storm center track is used.
        """

        # Pop kwargs
        prop = kwargs.pop('prop', {})
        map_prop = kwargs.pop('map_prop', {})
        track_dict = kwargs.pop('track_dict', None)

        # Get plot data
        ONE_MAP = False
        if time is None:
            dfRecon = self.data
        elif isinstance(time, (tuple, list)):
            dfRecon = self.sel(time=time).data
        elif isinstance(time, dt):
            dfRecon = self.sel(time=(time - timedelta(hours=6), time + timedelta(hours=6))).data
            ONE_MAP = True

        MULTIVAR = False
        if isinstance(varname, (tuple, list)):
            MULTIVAR = True

        if track_dict is None:
            track_dict = self.storm.dict

        # Error check for time dimension name (assumes older track dictionaries used a 'date' key)
        if 'time' not in track_dict.keys():
            track_dict['time'] = track_dict['date']

        if ONE_MAP:
            f = interp1d(mdates.date2num(track_dict['time']), track_dict['lon'], fill_value='extrapolate')
            clon = f(mdates.date2num(time))
            f = interp1d(mdates.date2num(track_dict['time']), track_dict['lat'], fill_value='extrapolate')
            clat = f(mdates.date2num(time))
            track_dict = {
                'time': time,
                'lon': clon,
                'lat': clat
            }

        if MULTIVAR:
            Maps = []
            for v in varname:
                iRecon = interpRecon(dfRecon, v, radlim, window=window, align=align)
                tmpMaps = iRecon.interpMaps(track_dict)
                Maps.append(tmpMaps)
        else:
            iRecon = interpRecon(dfRecon, varname, radlim, window=window, align=align)
            Maps = iRecon.interpMaps(track_dict)

        if 'levels' not in prop.keys() or ('levels' in prop.keys() and prop['levels'] is None):
            prop['levels'] = np.arange(np.floor(np.nanmin(Maps['maps']) / 10) * 10,
                                       np.ceil(np.nanmax(Maps['maps']) / 10) * 10 + 1, 10)

        if not ONE_MAP:

            if save_dir is True:
                save_dir = f'{self.storm.name}{self.storm.year}_maps'
            if save_dir is not None:
                os.makedirs(save_dir, exist_ok=True)

            if MULTIVAR:
                Maps2 = Maps[1]
                Maps = Maps[0]
                print(np.nanmax(Maps['maps']), np.nanmin(Maps2['maps']))

            figs = []
            for i, t in enumerate(Maps['time']):
                Maps_sub = {
                    'time': t,
                    'grid_x': Maps['grid_x'],
                    'grid_y': Maps['grid_y'],
                    'maps': Maps['maps'][i],
                    'center_lon': Maps['center_lon'][i],
                    'center_lat': Maps['center_lat'][i],
                    'stats': Maps['stats']
                }

                # Create instance of plot object
                self.plot_obj = ReconPlot()

                # Create cartopy projection
                self.plot_obj.create_cartopy(proj='PlateCarree', central_longitude=0.0)
                cartopy_proj = self.plot_obj.proj

                # Maintain the same lat / lon dimensions for all dynamic maps
                # Determined by the dynamic domain from the first map
                if i > 0 and domain == 'dynamic':
                    d1 = {
                        'n': Maps_sub['center_lat'] + dlat,
                        's': Maps_sub['center_lat'] - dlat,
                        'e': Maps_sub['center_lon'] + dlon,
                        'w': Maps_sub['center_lon'] - dlon
                    }
                else:
                    d1 = domain

                # Plot recon
                if MULTIVAR:
                    Maps_sub1 = dict(Maps_sub)
                    Maps_sub2 = dict(Maps_sub)
                    Maps_sub = [Maps_sub1, Maps_sub2]
                    Maps_sub[1]['maps'] = Maps2['maps'][i]
                    print(np.nanmax(Maps_sub[0]['maps']), np.nanmin(Maps_sub[1]['maps']))

                plot_ax, d0 = self.plot_obj.plot_maps(self.storm, Maps_sub, varname, recon_stats,
                                                      domain=d1, ax=ax, return_domain=True,
                                                      prop=prop, map_prop=map_prop)

                # Get domain dimensions from the first map
                if i == 0:
                    dlat = .5 * (d0['n'] - d0['s'])
                    dlon = .5 * (d0['e'] - d0['w'])

                figs.append(plot_ax)
                if save_dir is not None:
                    plt.savefig(f'{save_dir}/{t.strftime("%Y%m%d%H%M")}.png', bbox_inches='tight')
                plt.close()

            if save_dir is None:
                return figs

        else:
            # Create instance of plot object
            self.plot_obj = ReconPlot()

            # Create cartopy projection
            if cartopy_proj is None:
                self.plot_obj.create_cartopy(proj='PlateCarree', central_longitude=0.0)
                cartopy_proj = self.plot_obj.proj

            # Plot recon
            plot_ax = self.plot_obj.plot_maps(self.storm, Maps, varname, recon_stats,
                                              domain, ax, prop=prop, map_prop=map_prop)

            # Return axis
            return plot_ax
    def plot_swath(self, varname='wspd', domain="dynamic", ax=None, cartopy_proj=None, **kwargs):
        r"""
        Creates a map plot of a swath of interpolated recon data.

        Parameters
        ----------
        varname : str
            Variable to plot. Can be one of the following keys in dataframe:

            * **"sfmr"** = SFMR surface wind
            * **"wspd"** = 30-second flight level wind (default)
            * **"pkwnd"** = 10-second flight level wind
            * **"p_sfc"** = extrapolated surface pressure
        domain : str
            Domain for the plot. Default is "dynamic". Please refer to :ref:`options-domain` for available domain options.
        ax : axes
            Instance of axes to plot on. If none, one will be generated. Default is none.
        cartopy_proj : ccrs
            Instance of a cartopy projection to use. If none, one will be generated. Default is none.

        Other Parameters
        ----------------
        prop : dict
            Customization properties of recon plot. Please refer to :ref:`options-prop-recon-swath` for available options.
        map_prop : dict
            Customization properties of Cartopy map. Please refer to :ref:`options-map-prop` for available options.
        track_dict : dict, optional
            Storm track dictionary. If None (default), internal storm center track is used.
        swathfunc : function
            Function to operate on interpolated recon data (e.g., np.max, np.min, or percentile function). Default is np.min for pressure, otherwise np.max.
        """

        # Pop kwargs
        prop = kwargs.pop('prop', {})
        map_prop = kwargs.pop('map_prop', {})
        track_dict = kwargs.pop('track_dict', None)
        swathfunc = kwargs.pop('swathfunc', None)

        # Get plot data
        dfRecon = self.data

        if track_dict is None:
            track_dict = self.storm.dict

        if swathfunc is None:
            if varname == 'p_sfc':
                swathfunc = np.min
            else:
                swathfunc = np.max

        iRecon = interpRecon(dfRecon, varname)
        Maps = iRecon.interpMaps(track_dict, interval=.2)

        # Create instance of plot object
        self.plot_obj = ReconPlot()

        # Create cartopy projection
        if cartopy_proj is None:
            self.plot_obj.create_cartopy(proj='PlateCarree', central_longitude=0.0)
            cartopy_proj = self.plot_obj.proj

        # Plot recon
        plot_ax = self.plot_obj.plot_swath(self.storm, Maps, varname, swathfunc, track_dict,
                                           domain, ax, prop=prop, map_prop=map_prop)

        # Return axis
        return plot_ax
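    # Usage sketch (assumes HDOBs have been read in). A custom swathfunc
    # aggregates overlapping interpolated maps, e.g. a percentile instead of the max:
    #
    #     import numpy as np
    #     storm.recon.hdobs.plot_swath('wspd', swathfunc=lambda x: np.nanpercentile(x, 95))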
[docs]    def gridded_stats(self, request, thresh={}, binsize=1, domain="dynamic",
                      ax=None, return_array=False, cartopy_proj=None, prop={}, map_prop={}):
        r"""
        Creates a plot of gridded statistics.

        Parameters
        ----------
        request : str
            This string is a descriptor for what you want to plot.
            It will be used to define the variable (e.g. 'wind' --> 'vmax') and the function (e.g. 'maximum' --> np.max()).
            This string is also used as the plot title.

            Variable words to use in request:

            * **wind** - (kt). Flight level wind.
            * **30s wind** - (kt). 30-second flight level wind.
            * **10s wind** - (kt). 10-second flight level wind.
            * **sfmr** - (kt). SFMR wind.
            * **pressure** - (hPa). Minimum pressure.

            Units of all wind variables are knots and pressure variables are hPa. These are added into the title.

            Function words to use in request:

            * **maximum**
            * **minimum**

            Example usage: "maximum wind", "minimum pressure"
        thresh : dict, optional
            Dictionary of thresholds used to filter the data, with keywords drawn from ``self.keys``.
            Units of all wind variables are kt and pressure variables are hPa. These are added to the subtitle.
        binsize : float, optional
            Grid resolution in degrees. Default is 1 degree.
        domain : str, optional
            Domain for the plot. Default is "dynamic". Please refer to :ref:`options-domain` for available domain options.
        ax : axes, optional
            Instance of axes to plot on. If none, one will be generated. Default is none.
        return_array : bool, optional
            If True, returns the gridded 2D array used to generate the plot. Default is False.
        cartopy_proj : ccrs, optional
            Instance of a cartopy projection to use. If none, one will be generated. Default is none.

        Other Parameters
        ----------------
        prop : dict, optional
            Customization properties of plot. Please refer to :ref:`options-prop-gridded` for available options.
        map_prop : dict, optional
            Customization properties of Cartopy map. Please refer to :ref:`options-map-prop` for available options.

        Returns
        -------
        ax or dict
            By default, the plot axes are returned. If ``return_array`` is set to True, a dictionary is returned containing both the axes and the data array.
        """

        default_prop = {
            'smooth': None
        }
        for key in prop.keys():
            default_prop[key] = prop[key]
        prop = default_prop

        # Update thresh based on input
        default_thresh = {
            'sample_min': 1,
            'p_max': np.nan,
            'v_min': np.nan,
            'dv_min': np.nan,
            'dp_max': np.nan,
            'dv_max': np.nan,
            'dp_min': np.nan,
            'dt_window': 24,
            'dt_align': 'middle'
        }
        for key in thresh:
            default_thresh[key] = thresh[key]
        thresh = default_thresh

        # Retrieve the requested function, variable for computing stats, and plot title. These modify thresh if necessary.
        thresh, func = find_func(request, thresh)
        thresh, varname = find_var(request, thresh)

        # Format left title for plot
        endash = u"\u2013"
        dot = u"\u2022"
        title_L = request.lower()
        for name in ['wind', 'vmax']:
            title_L = title_L.replace(name, 'wind (kt)')
        for name in ['sfmr']:
            title_L = title_L.replace(name, 'sfmr (kt)')
        for name in ['pressure', 'mslp']:
            title_L = title_L.replace(name, 'pressure (hPa)')
        title_L = (title_L.title()).replace('Hpa', 'hPa')

        # ---------------------------------------------------------------------------------------------------

        points = self.data

        # Round lat/lon points down to nearest bin
        def to_bin(x):
            return np.floor(x / binsize) * binsize
        points["latbin"] = points.lat.map(to_bin)
        points["lonbin"] = points.lon.map(to_bin)

        # ---------------------------------------------------------------------------------------------------

        # Group by latbin,lonbin,stormid
        print("--> Grouping by lat/lon")
        groups = points.groupby(["latbin", "lonbin"])

        # Loop through groups, and apply stat func to obs
        # Construct a new dataframe containing the lat/lon bins and plotting variable
        new_df = {
            'latbin': [],
            'lonbin': [],
            varname: []
        }
        for g in groups:
            new_df[varname].append(func(g[1][varname].values))
            new_df['latbin'].append(g[0][0])
            new_df['lonbin'].append(g[0][1])
        new_df = pd.DataFrame.from_dict(new_df)

        # ---------------------------------------------------------------------------------------------------

        # Group again by latbin,lonbin
        # Construct two 1D lists: zi (grid values) and coords, that correspond to the 2D grid
        groups = new_df.groupby(["latbin", "lonbin"])
        zi = [func(g[1][varname]) if len(g[1]) >= thresh['sample_min']
              else np.nan for g in groups]

        # Construct a 1D array of coordinates
        coords = [g[0] for g in groups]

        # Construct a 2D longitude and latitude grid, using the specified binsize resolution
        if prop['smooth'] is not None:
            all_lats = [(round(l / binsize) * binsize) for l in self.data['lat']]
            all_lons = [(round(l / binsize) * binsize) % 360 for l in self.data['lon']]
            xi = np.arange(min(all_lons) - binsize, max(all_lons) + 2 * binsize, binsize)
            yi = np.arange(min(all_lats) - binsize, max(all_lats) + 2 * binsize, binsize)
        else:
            xi = np.arange(np.nanmin(points["lonbin"]) - binsize,
                           np.nanmax(points["lonbin"]) + 2 * binsize, binsize)
            yi = np.arange(np.nanmin(points["latbin"]) - binsize,
                           np.nanmax(points["latbin"]) + 2 * binsize, binsize)
        grid_x, grid_y = np.meshgrid(xi, yi)

        # Construct a 2D grid for the z value, depending on whether vector or scalar quantity
        grid_z = np.ones(grid_x.shape) * np.nan
        for c, z in zip(coords, zi):
            grid_z[np.where((grid_y == c[0]) & (grid_x == c[1]))] = z

        # ---------------------------------------------------------------------------------------------------

        # Create instance of plot object
        plot_obj = Plot()

        # Create cartopy projection using basin
        if cartopy_proj is None:
            if max(points['lon']) > 150 or min(points['lon']) < -150:
                plot_obj.create_cartopy(proj='PlateCarree', central_longitude=180.0)
            else:
                plot_obj.create_cartopy(proj='PlateCarree', central_longitude=0.0)

        prop['title_L'] = f'{self.storm.name} | {title_L}'
        prop['title_R'] = ''
        if domain == "dynamic":
            domain = {
                'W': min(self.data['lon']),
                'E': max(self.data['lon']),
                'S': min(self.data['lat']),
                'N': max(self.data['lat'])
            }

        # Plot gridded field
        plot_ax = plot_obj.plot_gridded(grid_x, grid_y, grid_z, varname,
                                        domain=domain, ax=ax, prop=prop, map_prop=map_prop)

        # Format grid into xarray if specified
        if return_array:
            try:
                # Import xarray and construct DataArray, replacing NaNs with zeros
                import xarray as xr
                arr = xr.DataArray(np.nan_to_num(grid_z),
                                   coords=[grid_y.T[0], grid_x[0]],
                                   dims=['lat', 'lon'])
            except ImportError as e:
                raise RuntimeError(
                    "Error: xarray is not available. Install xarray in order to use the 'return_array' flag.") from e

        # Return axes, along with the gridded array if requested
        if return_array:
            return {'ax': plot_ax, 'array': arr}
        else:
            return plot_ax
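    # Illustrative usage sketch (not part of the library source): assuming this
    # method is called from an HDOBs object attached to a Storm, e.g. after
    # storm.recon.get_hdobs(), a gridded composite could be built as follows.
    # The threshold and bin size values are hypothetical.
    #
    #     out = storm.recon.hdobs.gridded_stats('maximum sfmr',
    #                                           thresh={'sample_min': 5},
    #                                           binsize=0.5,
    #                                           return_array=True)
    #     ax, arr = out['ax'], out['array']  # plot axes plus the 2D data array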
[docs]class dropsondes:
    r"""
    Creates an instance of a Dropsondes object containing all dropsonde data for a single storm.

    Parameters
    ----------
    storm : tropycal.tracks.Storm
        Requested storm.
    data : str, optional
        Filepath of pickle file containing dropsondes data retrieved from ``dropsondes.to_pickle()``. If provided, data will be retrieved from the local pickle file instead of the NHC server.
    update : bool
        If True, search for new data beyond the existing data in the dropsondes object and concatenate it. Default is False.

    Returns
    -------
    dropsondes
        An instance of dropsondes.

    Notes
    -----
    .. warning::

        Recon data is currently only available from 1989 onwards.

    There are two recommended ways of retrieving a dropsondes object. Since the ``ReconDataset``, ``hdobs``, ``dropsondes`` and ``vdms`` classes are **storm-centric**, a Storm object is required for both methods.

    .. code-block:: python

        #Retrieve Hurricane Michael (2018) from TrackDataset
        basin = tracks.TrackDataset()
        storm = basin.get_storm(('michael',2018))

    The first method is to use the empty instance of ReconDataset already initialized in the Storm object, which has a ``get_dropsondes()`` method, thus allowing all of the dropsondes attributes and methods to be accessed from the Storm object. As a result, a Storm object does not need to be provided as an argument.

    .. code-block:: python

        #Retrieve all dropsondes for this storm
        storm.recon.get_dropsondes()

        #Retrieve the raw dropsondes data
        storm.recon.dropsondes.data

        #Use the plot_points() method of dropsondes
        storm.recon.dropsondes.plot_points()

    The second method is to use the dropsondes class independently of the other recon classes:

    .. code-block:: python

        from tropycal.recon import dropsondes

        #Retrieve all dropsondes for this storm, passing the Storm object as an argument
        dropsondes_obj = dropsondes(storm)

        #Retrieve the raw dropsondes data
        dropsondes_obj.data

        #Use the plot_points() method of dropsondes
        dropsondes_obj.plot_points()
    """

    def __repr__(self):
        summary = ["<tropycal.recon.dropsondes>"]

        def isNA(x, units):
            if np.isnan(x):
                return 'N/A'
            else:
                return f'{x} {units}'

        # Find maximum wind and minimum pressure
        max_MBLspd = isNA(np.nanmax([i['MBLspd'] for i in self.data]), 'knots')
        max_DLMspd = isNA(np.nanmax([i['DLMspd'] for i in self.data]), 'knots')
        max_WL150spd = isNA(np.nanmax([i['WL150spd'] for i in self.data]), 'knots')
        min_slp = isNA(np.nanmin([i['slp'] for i in self.data]), 'hPa')
        missions = set([i['mission'] for i in self.data])

        # Add general summary
        emdash = '\u2014'
        summary_keys = {
            'Storm': f'{self.storm.name} {self.storm.year}',
            'Missions': len(missions),
            'Dropsondes': len(self.data),
            'Max 500m-avg wind': max_MBLspd,
            'Max 150m-avg wind': max_WL150spd,
            'Min sea level pressure': min_slp,
            'Source': self.source
        }

        # Add dataset summary
        summary.append("Dataset Summary:")
        add_space = np.max([len(key) for key in summary_keys.keys()]) + 3
        for key in summary_keys.keys():
            key_name = key + ":"
            summary.append(f'{" "*4}{key_name:<{add_space}}{summary_keys[key]}')

        return "\n".join(summary)

    def __init__(self, storm, data=None, update=False):
        self.storm = storm
        self.source = 'National Hurricane Center (NHC)'

        if storm.year >= 2006:
            self.format = 1
            if storm.basin == 'north_atlantic':
                archive_url = f'https://www.nhc.noaa.gov/archive/recon/{self.storm.year}/REPNT3/'
            else:
                archive_url = f'https://www.nhc.noaa.gov/archive/recon/{self.storm.year}/REPPN3/'
        elif storm.year >= 2002 and storm.year <= 2005:
            self.format = 2
            archive_url = f'https://www.nhc.noaa.gov/archive/recon/{self.storm.year}/{self.storm.name.upper()}/'
        elif storm.year >= 1989 and storm.year <= 2001:
            self.format = 3
            archive_url = f'https://www.nhc.noaa.gov/archive/recon/{self.storm.year}/{self.storm.name.lower()}/'
        else:
            raise RuntimeError("Recon data is not available prior to 1989.")

        self.data = None
        if isinstance(data, str):
            with open(data, 'rb') as f:
                self.data = pickle.load(f)
        elif data is not None:
            self.data = data

        if data is None or update:
            try:
                start_time = max(self.data['time'])
            except:
                start_time = min(self.storm.dict['time']) - timedelta(days=1)
            end_time = max(self.storm.dict['time']) + timedelta(days=1)
            timeboundstrs = [f'{t:%Y%m%d%H%M}' for t in (start_time, end_time)]

            # Retrieve list of files in URL and filter by storm dates
            page = requests.get(archive_url).text
            content = page.split("\n")
            files = []
            if self.format == 1:
                for line in content:
                    if ".txt" in line:
                        files.append(((line.split('txt">')[1]).split("</a>")[0]).split("."))
                del content
                files = sorted([i for i in files if i[1] >= min(timeboundstrs)
                                and i[1] <= max(timeboundstrs)], key=lambda x: x[1])
                linksub = [archive_url + '.'.join(l) for l in files]
            elif self.format == 2:
                for line in content:
                    if ".txt" in line and 'DROPS' in line:
                        files.append((line.split('txt">')[1]).split("</a>")[0])
                del content
                linksub = [archive_url + l for l in files]
            elif self.format == 3:
                for line in content:
                    if ".txt" in line:
                        files.append((line.split('txt">')[1]).split("</a>")[0])
                del content
                linksub = [archive_url + l for l in files if l[0] in ['D', 'd']]

            urllib3.disable_warnings()
            http = urllib3.PoolManager()

            timer_start = dt.now()
            print(f'Searching through recon dropsonde files between {timeboundstrs[0]} and {timeboundstrs[-1]} ...')
            filecount = 0
            for link in linksub:
                response = http.request('GET', link)
                content = response.data.decode('utf-8')

                # Post-2006 format
                if self.format == 1:
                    datestamp = dt.strptime(link.split('.')[-2], '%Y%m%d%H%M')
                    try:
                        missionname, tmp = decode_dropsonde(content, date=datestamp)
                    except:
                        continue
                    testkeys = ('TOPtime', 'lat', 'lon')
                    if missionname[2:5] == self.storm.operational_id[2:4] + self.storm.operational_id[0]:
                        filecount += 1
                        if self.data is None:
                            self.data = [copy.copy(tmp)]
                        elif [tmp[k] for k in testkeys] not in [[d[k] for k in testkeys] for d in self.data]:
                            self.data.append(tmp)
                        else:
                            pass

                # Pre-2002 format
                elif self.format == 3:
                    # Check for date
                    try:
                        day = int(content.split("\n")[0].split()[2][:2])
                        found_date = False
                        for iter_date in storm.dict['time']:
                            if iter_date.day == day:
                                date = dt(iter_date.year, iter_date.month, iter_date.day)
                                found_date = True
                                break
                        if not found_date:
                            continue
                        missionname, tmp = decode_dropsonde(content.replace(";", ""), date=date)

                        # Add date to mission
                        hh = int(content.split("\n")[0].split()[2][2:4])
                        mm = int(content.split("\n")[0].split()[2][4:6])
                        tmp['TOPtime'] = dt(iter_date.year, iter_date.month, iter_date.day, hh, mm)
                        if np.isnan(tmp['TOPlat']):
                            tmp['TOPlat'] = tmp['lat']
                        if np.isnan(tmp['TOPlon']):
                            tmp['TOPlon'] = tmp['lon']

                        testkeys = ('TOPtime', 'lat', 'lon')
                        filecount += 1
                        if self.data is None:
                            self.data = [copy.copy(tmp)]
                        elif [tmp[k] for k in testkeys] not in [[d[k] for k in testkeys] for d in self.data]:
                            self.data.append(tmp)
                        else:
                            pass
                    except:
                        pass

                # Pre-2006 format
                elif self.format == 2:
                    strdate = (link.split('.')[-3]).split("_")[-1]
                    content_split = content.split("NNNN")
                    for iter_content in content_split:
                        iter_split = iter_content.split("\n")
                        if len(iter_split) < 6:
                            continue

                        # Format date
                        found_date = False
                        for line in iter_split:
                            if 'UZNT13' in line:
                                date_string = line.split()[2]
                                found_date = True
                                datestamp = dt.strptime(strdate, '%Y%m%d')
                                datestamp = datestamp.replace(day=int(date_string[:2]),
                                                              hour=int(date_string[2:4]),
                                                              minute=int(date_string[4:6]))

                        # Decode dropsondes
                        if not found_date:
                            continue
                        try:
                            missionname, tmp = decode_dropsonde(iter_content, date=datestamp)
                        except:
                            continue
                        testkeys = ('lat', 'lon')
                        filecount += 1
                        if self.data is None:
                            self.data = [copy.copy(tmp)]
                        elif [tmp[k] for k in testkeys] not in [[d[k] for k in testkeys] for d in self.data]:
                            self.data.append(tmp)
                        else:
                            pass

            print(f'--> Completed reading in recon dropsonde files ({(dt.now()-timer_start).total_seconds():.1f} seconds)'
                  + f'\nRead {filecount} files')

        try:
            self._recenter()
            self.keys = sorted(list(set([k for d in self.data for k in d.keys()])))
        except:
            self.keys = []
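    # Illustrative construction sketch: a dropsondes object can be built
    # directly from a Storm object (fetching from the NHC server), or from a
    # previously saved pickle file (hypothetical filename below).
    #
    #     from tropycal.recon import dropsondes
    #     drops = dropsondes(storm)                                    # fetch from NHC
    #     drops = dropsondes(storm, data="mystorm_dropsondes.pickle")  # local pickle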
[docs]    def update(self):
        r"""
        Update this object with the latest data for an ongoing storm.

        Returns
        -------
        dropsondes
            A new dropsondes object containing the original data plus any new observations found since the object was created.
        """

        newobj = dropsondes(storm=self.storm, data=self.data, update=True)
        return newobj
    def _recenter(self):
        data = copy.copy(self.data)

        for stage in ('TOP', 'BOTTOM'):
            # Interpolate center position to time of each ob
            interp_clon, interp_clat = self.storm.recon.get_track(
                [d[f'{stage}time'] for d in data])

            # Get x,y distance of each ob from coinciding interpolated center position
            for i, d in enumerate(data):
                d.update({f'{stage}xdist': great_circle((interp_clat[i], interp_clon[i]),
                                                        (interp_clat[i], d[f'{stage}lon'])).kilometers
                          * [1, -1][int(d[f'{stage}lon'] < interp_clon[i])]})
                d.update({f'{stage}ydist': great_circle((interp_clat[i], interp_clon[i]),
                                                        (d[f'{stage}lat'], interp_clon[i])).kilometers
                          * [1, -1][int(d[f'{stage}lat'] < interp_clat[i])]})
                d.update({f'{stage}distance': (d[f'{stage}xdist']**2 + d[f'{stage}ydist']**2)**.5})

        # print('Completed dropsonde center-relative coordinates')
        self.data = data
[docs]    def isel(self, index):
        r"""
        Select a single dropsonde by index of the list.

        Parameters
        ----------
        index : int
            Integer containing the index of the dropsonde.

        Returns
        -------
        dropsondes
            Instance of Dropsondes for the single requested dropsonde.
        """

        NEW_DATA = copy.copy(self.data)
        NEW_DATA = [NEW_DATA[index]]
        NEW_OBJ = dropsondes(storm=self.storm, data=NEW_DATA)

        return NEW_OBJ
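    # Illustrative: pull a single dropsonde out of the list by index.
    #
    #     first = storm.recon.dropsondes.isel(0)
    #     print(first.data[0]['TOPtime'])  # launch (top) time of that sonde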
[docs]    def sel(self, mission=None, time=None, domain=None, location=None,
            top=None, slp=None, MBLspd=None, WL150spd=None, DLMspd=None):
        r"""
        Select a subset of dropsondes by any of its parameters and return a new dropsondes object.

        Parameters
        ----------
        mission : str
            Mission name (number + storm id), e.g. mission 7 for AL05 is '0705L'.
        time : list/tuple of datetimes
            List/tuple of start time and end time datetime objects. Default is None, which returns all points.
        domain : dict
            Dictionary with keys 'n', 's', 'e', 'w' corresponding to boundaries of domain.
        location : str
            Location of dropsonde. Can be "eyewall" or "center".
        top : tuple
            Tuple containing range of pressures (in hPa) of the top of the dropsonde level.
        slp : tuple
            Tuple containing range of pressures (in hPa) of the bottom of the dropsonde near the surface.
        MBLspd : tuple, optional
            Tuple containing a range of mean boundary layer (500m-average) wind speeds, in knots.
        WL150spd : tuple, optional
            Tuple containing a range of lowest-150m average wind speeds, in knots.
        DLMspd : tuple, optional
            Tuple containing a range of deep-layer mean wind speeds, in knots.

        Returns
        -------
        dropsondes
            A new dropsondes object that satisfies the intersection of all subsetting.
        """

        NEW_DATA = copy.copy(pd.DataFrame(self.data))

        # Apply mission filter
        if mission is not None:
            mission = str(mission)
            NEW_DATA = NEW_DATA.loc[NEW_DATA['mission'] == mission]

        # Apply time filter
        if time is not None:
            try:
                if isinstance(time, (tuple, list)):
                    bounds = get_bounds(NEW_DATA['TOPtime'], time)
                    NEW_DATA = NEW_DATA.loc[(NEW_DATA['TOPtime'] >= bounds[0]) & (
                        NEW_DATA['TOPtime'] <= bounds[1])]
                else:
                    i = np.argmin(abs(time - NEW_DATA['TOPtime']))
                    return self.isel(i)
            except:
                if isinstance(time, (tuple, list)):
                    bounds = get_bounds(NEW_DATA['BOTTOMtime'], time)
                    NEW_DATA = NEW_DATA.loc[(NEW_DATA['BOTTOMtime'] >= bounds[0]) & (
                        NEW_DATA['BOTTOMtime'] <= bounds[1])]
                else:
                    i = np.argmin(abs(time - NEW_DATA['BOTTOMtime']))
                    return self.isel(i)

        # Apply domain filter
        if domain is not None:
            tmp = {k[0].lower(): v for k, v in domain.items()}
            domain = {'n': 90, 's': -90, 'e': 359.99, 'w': 0}
            domain.update(tmp)
            bounds = get_bounds(NEW_DATA['lon'] % 360, (domain['w'] % 360, domain['e'] % 360))
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['lon'] % 360 >= bounds[0]) & (
                NEW_DATA['lon'] % 360 <= bounds[1])]
            bounds = get_bounds(NEW_DATA['lat'], (domain['s'], domain['n']))
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['lat'] >= bounds[0]) & (
                NEW_DATA['lat'] <= bounds[1])]

        # Apply location filter
        if location is not None:
            NEW_DATA = NEW_DATA.loc[NEW_DATA['location'] == location.upper()]

        # Apply top standard level filter
        if top is not None:
            bounds = get_bounds(NEW_DATA['top'], top)
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['top'] >= bounds[0]) & (
                NEW_DATA['top'] <= bounds[1])]

        # Apply surface pressure filter
        if slp is not None:
            bounds = get_bounds(NEW_DATA['slp'], slp)
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['slp'] >= bounds[0]) & (
                NEW_DATA['slp'] <= bounds[1])]

        # Apply MBL wind speed filter
        if MBLspd is not None:
            bounds = get_bounds(NEW_DATA['MBLspd'], MBLspd)
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['MBLspd'] >= bounds[0]) & (
                NEW_DATA['MBLspd'] <= bounds[1])]

        # Apply DLM wind speed filter
        if DLMspd is not None:
            bounds = get_bounds(NEW_DATA['DLMspd'], DLMspd)
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['DLMspd'] >= bounds[0]) & (
                NEW_DATA['DLMspd'] <= bounds[1])]

        # Apply WL150 wind speed filter
        if WL150spd is not None:
            bounds = get_bounds(NEW_DATA['WL150spd'], WL150spd)
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['WL150spd'] >= bounds[0]) & (
                NEW_DATA['WL150spd'] <= bounds[1])]

        NEW_OBJ = dropsondes(storm=self.storm, data=list(NEW_DATA.T.to_dict().values()))

        return NEW_OBJ
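    # Illustrative subsetting sketch (hypothetical values): filters combine as
    # an intersection, so this returns only eyewall sondes launched within the
    # given window whose surface pressure fell between 900 and 980 hPa.
    #
    #     from datetime import datetime
    #     subset = storm.recon.dropsondes.sel(
    #         time=(datetime(2018, 10, 9, 0), datetime(2018, 10, 10, 0)),
    #         location='eyewall',
    #         slp=(900, 980))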
[docs]    def to_pickle(self, filename):
        r"""
        Save dropsonde data (list of dictionaries) to a pickle file.

        Parameters
        ----------
        filename : str
            Name of the file to save the pickle to.

        Notes
        -----
        This method saves the dropsondes data as a pickle within the current working directory, given a filename as an argument. For example, assume ``dropsondes`` was retrieved from a Storm object (using the first method described in the ``dropsondes`` class documentation). The dropsondes data would be saved to a pickle file as follows:

        >>> storm.recon.dropsondes.to_pickle("mystorm_dropsondes.pickle")

        Now the dropsondes data is saved locally. The next time recon data for this storm needs to be analyzed, re-reading the dropsondes data from the NHC server can be bypassed by providing the pickle file as an argument:

        >>> storm.recon.get_dropsondes(data="mystorm_dropsondes.pickle")
        """

        with open(filename, 'wb') as f:
            pickle.dump(self.data, f)
[docs]    def plot_points(self, varname='slp', level=None, domain="dynamic", ax=None, cartopy_proj=None, **kwargs):
        r"""
        Creates a plot of dropsonde data points.

        Parameters
        ----------
        varname : str
            Variable to plot. Can be one of the keys in the dropsonde dictionary (retrieved using ``recon.dropsondes.data``).
        level : int, optional
            Pressure level (in hPa) to plot varname for. Only valid if varname is in "pres", "hgt", "temp", "dwpt", "wdir", "wspd".
        domain : str/dict
            Domain for the plot. Default is "dynamic". Please refer to :ref:`options-domain` for available domain options.
        ax : axes
            Instance of axes to plot on. If none, one will be generated. Default is none.
        cartopy_proj : ccrs
            Instance of a cartopy projection to use. If none, one will be generated. Default is none.

        Other Parameters
        ----------------
        prop : dict
            Customization properties of recon plot. Please refer to :ref:`options-prop-recon-plot` for available options.
        map_prop : dict
            Customization properties of Cartopy map. Please refer to :ref:`options-map-prop` for available options.

        Returns
        -------
        ax
            Instance of axes containing the plot is returned.

        Notes
        -----
        To retrieve all possible varnames, check the ``keys`` attribute of this Dropsondes object. For example, if ReconDataset was retrieved through a Storm object as in the example below, the possible varnames would be retrieved as follows:

        .. code-block:: python

            from tropycal import tracks

            #Get dataset object
            basin = tracks.TrackDataset()

            #Get storm object
            storm = basin.get_storm(('michael',2018))

            #Get dropsondes for this storm
            storm.recon.get_dropsondes()

            #Retrieve list of all possible varnames
            print(storm.recon.dropsondes.keys)
        """

        # Pop kwargs
        prop = kwargs.pop('prop', {})
        map_prop = kwargs.pop('map_prop', {})

        # Get plot data
        if level is not None:
            plotdata = [m['levels'].loc[m['levels']['pres'] == level][varname].to_numpy()[0]
                        if 'levels' in m.keys() and level in m['levels']['pres'].to_numpy()
                        else np.nan for m in self.data]
        else:
            plotdata = [m[varname] if varname in m.keys() else np.nan for m in self.data]

        # Make sure data doesn't have NaNs
        check_data = [m['BOTTOMlat'] for m in self.data if not np.isnan(m['BOTTOMlat'])]
        if len(check_data) == 0:
            dfRecon = pd.DataFrame.from_dict({'time': [m['TOPtime'] for m in self.data],
                                              'lat': [m['TOPlat'] for m in self.data],
                                              'lon': [m['TOPlon'] for m in self.data],
                                              varname: plotdata})
        else:
            dfRecon = pd.DataFrame.from_dict({'time': [m['BOTTOMtime'] for m in self.data],
                                              'lat': [m['BOTTOMlat'] for m in self.data],
                                              'lon': [m['BOTTOMlon'] for m in self.data],
                                              varname: plotdata})

        # Create instance of plot object
        self.plot_obj = ReconPlot()

        # Create cartopy projection
        if cartopy_proj is None:
            self.plot_obj.create_cartopy(proj='PlateCarree', central_longitude=0.0)
            cartopy_proj = self.plot_obj.proj

        # Plot recon
        plot_ax = self.plot_obj.plot_points(self.storm, dfRecon, domain,
                                            varname=(varname, level), ax=ax,
                                            prop=prop, map_prop=map_prop)

        # Return axis
        return plot_ax
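    # Illustrative: plot surface pressure from each sonde, or a single
    # standard level of a per-level variable such as 700-hPa temperature.
    #
    #     storm.recon.dropsondes.plot_points('slp')
    #     storm.recon.dropsondes.plot_points('temp', level=700)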
[docs]    def plot_skewt(self, time=None):
        r"""
        Plot a Skew-T chart for selected dropsondes.

        Parameters
        ----------
        time : datetime.datetime, optional
            Time closest to requested Skew-T. If None, all dropsondes will be plotted.

        Returns
        -------
        list
            Returns a list of figures, or a single figure for a single plot.
        """

        # Retrieve dropsonde data for the requested time(s)
        if time is None:
            dict_list = self.data
        else:
            dict_list = self.sel(time=time).data

        # Format storm name
        storm_data = self.storm.dict
        type_array = np.array(storm_data['type'])
        idx = np.where((type_array == 'SD') | (type_array == 'SS') | (type_array == 'TD') | (
            type_array == 'TS') | (type_array == 'HU') | (type_array == 'TY') | (type_array == 'ST'))
        if ('invest' in storm_data.keys() and not storm_data['invest']) or len(idx[0]) > 0:
            tropical_vmax = np.array(storm_data['vmax'])[idx]

            add_ptc_flag = False
            if len(tropical_vmax) == 0:
                add_ptc_flag = True
                idx = np.where((type_array == 'LO') | (type_array == 'DB'))
                tropical_vmax = np.array(storm_data['vmax'])[idx]

            subtrop = classify_subtropical(np.array(storm_data['type']))
            peak_idx = storm_data['vmax'].index(np.nanmax(tropical_vmax))
            peak_basin = storm_data['wmo_basin'][peak_idx]
            storm_type = get_storm_classification(np.nanmax(tropical_vmax), subtrop, peak_basin)
            if add_ptc_flag:
                storm_type = "Potential Tropical Cyclone"
        title_string = f'{storm_type} {storm_data["name"]}\nDropsonde DDD, Mission MMM'

        # Plot Skew-T
        return plot_skewt(dict_list, title_string)
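    # Illustrative (hypothetical date): plot the Skew-T nearest a requested
    # time, or omit `time` to get a list of figures for every sonde.
    #
    #     from datetime import datetime
    #     fig = storm.recon.dropsondes.plot_skewt(time=datetime(2018, 10, 10, 12))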
[docs]class vdms:
    r"""
    Creates an instance of a VDMs object containing all Vortex Data Message (VDM) data for a single storm.

    Parameters
    ----------
    storm : tropycal.tracks.Storm
        Requested storm.
    data : str, optional
        Filepath of pickle file containing VDM data retrieved from ``vdms.to_pickle()``. If provided, data will be retrieved from the local pickle file instead of the NHC server.
    update : bool
        If True, search for new data beyond the existing data in the vdms object and concatenate it. Default is False.

    Returns
    -------
    vdms
        An instance of vdms.

    Notes
    -----
    .. warning::

        Recon data is currently only available from 1989 onwards.

    VDM data is currently retrieved from the National Hurricane Center from 2006 onwards, and UCAR from 1989 through 2005.

    There are two recommended ways of retrieving a vdms object. Since the ``ReconDataset``, ``hdobs``, ``dropsondes`` and ``vdms`` classes are **storm-centric**, a Storm object is required for both methods.

    .. code-block:: python

        #Retrieve Hurricane Michael (2018) from TrackDataset
        basin = tracks.TrackDataset()
        storm = basin.get_storm(('michael',2018))

    The first method is to use the empty instance of ReconDataset already initialized in the Storm object, which has a ``get_vdms()`` method, thus allowing all of the vdms attributes and methods to be accessed from the Storm object. As a result, a Storm object does not need to be provided as an argument.

    .. code-block:: python

        #Retrieve all VDMs for this storm
        storm.recon.get_vdms()

        #Retrieve the raw VDM data
        storm.recon.vdms.data

        #Use the plot_points() method of vdms
        storm.recon.vdms.plot_points()

    The second method is to use the vdms class independently of the other recon classes:

    .. code-block:: python

        from tropycal.recon import vdms

        #Retrieve all VDMs for this storm, passing the Storm object as an argument
        vdms_obj = vdms(storm)

        #Retrieve the raw VDM data
        vdms_obj.data

        #Use the plot_points() method of vdms
        vdms_obj.plot_points()
    """

    def __repr__(self):
        summary = ["<tropycal.recon.vdms>"]

        # Find maximum wind and minimum pressure
        time_range = (np.nanmin([i['time'] for i in self.data]),
                      np.nanmax([i['time'] for i in self.data]))
        time_range = list(set(time_range))
        min_slp = np.nanmin([i['Minimum Sea Level Pressure (hPa)'] for i in self.data])
        min_slp = 'N/A' if np.isnan(min_slp) else min_slp
        missions = set([i['mission'] for i in self.data])

        # Add general summary
        emdash = '\u2014'
        summary_keys = {
            'Storm': f'{self.storm.name} {self.storm.year}',
            'Missions': len(missions),
            'VDMs': len(self.data),
            'Min sea level pressure': f"{min_slp} hPa",
            'Source': self.source
        }

        # Add dataset summary
        summary.append("Dataset Summary:")
        add_space = np.max([len(key) for key in summary_keys.keys()]) + 3
        for key in summary_keys.keys():
            key_name = key + ":"
            summary.append(f'{" "*4}{key_name:<{add_space}}{summary_keys[key]}')

        return "\n".join(summary)

    def __init__(self, storm, data=None, update=False):
        self.storm = storm
        self.source = 'National Hurricane Center (NHC)'

        if storm.year >= 2006:
            self.format = 1
            if storm.basin == 'north_atlantic':
                archive_url = f'https://www.nhc.noaa.gov/archive/recon/{self.storm.year}/REPNT2/'
            else:
                archive_url = f'https://www.nhc.noaa.gov/archive/recon/{self.storm.year}/REPPN2/'
        elif storm.year >= 1989:
            self.format = 2
            self.source = "UCAR's Tropical Cyclone Guidance Project (TCGP)"
            archive_url = f'http://hurricanes.ral.ucar.edu/structure/vortex/vdm_data/{self.storm.year}/'
        else:
            raise RuntimeError("Recon data is not available prior to 1989.")

        timestr = [f'{t:%Y%m%d}' for t in self.storm.dict['time']]

        # Retrieve list of files in URL and filter by storm dates
        page = requests.get(archive_url).text
        content = page.split("\n")
        files = []
        for line in content:
            if ".txt" in line:
                files.append(((line.split('txt">')[1]).split("</a>")[0]).split("."))
        del content
        if self.format == 1:
            files = sorted([i for i in files if i[1][:8] in timestr], key=lambda x: x[1])
            linksub = [archive_url + '.'.join(l) for l in files]
        elif self.format == 2:
            files = [f[0] for f in files]
            linksub = [archive_url + l for l in files if storm.name.upper() in l]

        # Load any existing data first, so an update pass can append to it
        self.data = []
        if isinstance(data, str):
            with open(data, 'rb') as f:
                self.data = pickle.load(f)
        elif data is not None:
            self.data = data

        if data is None or update:
            urllib3.disable_warnings()
            http = urllib3.PoolManager()

            filecount = 0
            timer_start = dt.now()
            print(f'Searching through recon VDM files between {timestr[0]} and {timestr[-1]} ...')
            for link in linksub:
                response = http.request('GET', link)
                content = response.data.decode('utf-8')

                # Parse with NHC format
                if self.format == 1:
                    try:
                        date = link.split('.')[-2]
                        date = dt(int(date[:4]), int(date[4:6]), int(date[6:8]))
                        missionname, tmp = decode_vdm(content, date)
                    except:
                        continue
                    testkeys = ('time', 'lat', 'lon')
                    if missionname[2:5] == self.storm.operational_id[2:4] + self.storm.operational_id[0]:
                        # Append only if this VDM is not already in the dataset
                        if [tmp[k] for k in testkeys] not in [[d[k] for k in testkeys] for d in self.data]:
                            self.data.append(tmp)
                            filecount += 1

                # Parse with UCAR format
                elif self.format == 2:
                    content_split = content.split("URNT12")
                    content_split = ['URNT12' + i for i in content_split]
                    for iter_content in content_split:
                        try:
                            # Check for line length
                            iter_split = iter_content.split("\n")
                            if len(iter_split) < 10:
                                continue

                            # Check for date
                            found_date = False
                            for line in iter_split:
                                if line[:2] == 'A.':
                                    day = int((line[3:].split('/'))[0])
                                    for iter_date in storm.dict['time']:
                                        if iter_date.day == day:
                                            date = dt(iter_date.year, iter_date.month, iter_date.day)
                                            found_date = True
                                            break
                            if not found_date:
                                continue

                            # Decode VDMs
                            missionname, tmp = decode_vdm(iter_content, date)
                            testkeys = ('time', 'lat', 'lon')
                            # Append only if this VDM is not already in the dataset
                            if [tmp[k] for k in testkeys] not in [[d[k] for k in testkeys] for d in self.data]:
                                self.data.append(tmp)
                                filecount += 1
                        except:
                            continue

            print(f'--> Completed reading in recon VDM files ({(dt.now()-timer_start).total_seconds():.1f} seconds)'
                  + f'\nRead {filecount} files')

        self.keys = sorted(list(set([k for d in self.data for k in d.keys()])))
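    # Illustrative construction sketch: as with dropsondes, a vdms object can
    # be built straight from a Storm object or from a previously saved pickle
    # file (hypothetical filename below).
    #
    #     from tropycal.recon import vdms
    #     vdm_obj = vdms(storm)                              # fetch from NHC/UCAR
    #     vdm_obj = vdms(storm, data="mystorm_vdms.pickle")  # local pickle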
[docs]    def update(self):
        r"""
        Update this object with the latest data for an ongoing storm.

        Returns
        -------
        vdms
            A new vdms object containing the original data plus any new VDMs found since the object was created.
        """

        newobj = vdms(storm=self.storm, data=self.data, update=True)
        return newobj
[docs]    def isel(self, index):
        r"""
        Select a single VDM by index of the list.

        Parameters
        ----------
        index : int
            Integer containing the index of the VDM.

        Returns
        -------
        vdms
            Instance of VDMs for the single requested VDM.
        """

        NEW_DATA = copy.copy(self.data)
        NEW_DATA = [NEW_DATA[index]]
        NEW_OBJ = vdms(storm=self.storm, data=NEW_DATA)

        return NEW_OBJ
[docs]    def sel(self, mission=None, time=None, domain=None):
        r"""
        Select a subset of VDMs by any of its parameters and return a new vdms object.

        Parameters
        ----------
        mission : str
            Mission name (number + storm id), e.g. mission 7 for AL05 is '0705L'.
        time : list/tuple of datetimes
            List/tuple of start time and end time datetime objects. Default is None, which returns all points.
        domain : dict
            Dictionary with keys 'n', 's', 'e', 'w' corresponding to boundaries of domain.

        Returns
        -------
        vdms
            A new vdms object that satisfies the intersection of all subsetting.
        """

        NEW_DATA = copy.copy(pd.DataFrame(self.data))

        # Apply mission filter
        if mission is not None:
            mission = str(mission)
            NEW_DATA = NEW_DATA.loc[NEW_DATA['mission'] == mission]

        # Apply time filter
        if time is not None:
            bounds = get_bounds(NEW_DATA['time'], time)
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['time'] >= bounds[0]) & (
                NEW_DATA['time'] <= bounds[1])]

        # Apply domain filter
        if domain is not None:
            tmp = {k[0].lower(): v for k, v in domain.items()}
            domain = {'n': 90, 's': -90, 'e': 359.99, 'w': 0}
            domain.update(tmp)
            bounds = get_bounds(NEW_DATA['lon'] % 360, (domain['w'] % 360, domain['e'] % 360))
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['lon'] % 360 >= bounds[0]) & (
                NEW_DATA['lon'] % 360 <= bounds[1])]
            bounds = get_bounds(NEW_DATA['lat'], (domain['s'], domain['n']))
            NEW_DATA = NEW_DATA.loc[(NEW_DATA['lat'] >= bounds[0]) & (
                NEW_DATA['lat'] <= bounds[1])]

        NEW_OBJ = vdms(storm=self.storm, data=list(NEW_DATA.T.to_dict().values()))

        return NEW_OBJ
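    # Illustrative (hypothetical values): restrict VDMs to one mission and a
    # bounding box; domain keys are matched on their first letter.
    #
    #     subset = storm.recon.vdms.sel(
    #         mission='0714L',
    #         domain={'north': 32, 'south': 28, 'west': -90, 'east': -84})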
[docs]    def to_pickle(self, filename):
        r"""
        Save VDM data (list of dictionaries) to a pickle file.

        Parameters
        ----------
        filename : str
            Name of the file to save the pickle to.

        Notes
        -----
        This method saves the VDMs data as a pickle within the current working directory, given a filename as an argument. For example, assume ``vdms`` was retrieved from a Storm object (using the first method described in the ``vdms`` class documentation). The VDMs data would be saved to a pickle file as follows:

        >>> storm.recon.vdms.to_pickle("mystorm_vdms.pickle")

        Now the VDMs data is saved locally. The next time recon data for this storm needs to be analyzed, re-reading the VDMs data from the NHC server can be bypassed by providing the pickle file as an argument:

        >>> storm.recon.get_vdms("mystorm_vdms.pickle")
        """

        with open(filename, 'wb') as f:
            pickle.dump(self.data, f)
[docs]    def plot_time_series(self, time=None, best_track=False, dots=True):
        r"""
        Creates a time series of MSLP VDM data.

        Parameters
        ----------
        time : tuple, optional
            Tuple of start and end datetime.datetime objects for plot. If None, all times will be plotted.
        best_track : bool, optional
            If True, Best Track MSLP will be plotted alongside VDM MSLP. Default is False.
        dots : bool, optional
            If True, dots will be plotted for each VDM point. Default is True.

        Returns
        -------
        ax
            Instance of axes containing the plot is returned.
        """

        # Retrieve data and subset by time
        storm_data = self.storm.dict
        data = self.data
        if time is not None:
            times = [i['time'] for i in data if i['time'] >= time[0] and i['time'] <= time[1]]
            mslp = [i['Minimum Sea Level Pressure (hPa)'] for i in data
                    if i['time'] >= time[0] and i['time'] <= time[1]]
        else:
            times = [i['time'] for i in data]
            mslp = [i['Minimum Sea Level Pressure (hPa)'] for i in data]

        # Create figure
        fig, ax = plt.subplots(figsize=(9, 6), dpi=200)
        ax.grid()

        # Plot VDM MSLP
        ax.plot(times, mslp, color='b', alpha=0.5, label='VDM MSLP (hPa)')
        if dots:
            ax.plot(times, mslp, 'o', color='b')

        # Retrieve & plot Best Track data
        if best_track:
            if time is not None:
                times_btk = [i for i in storm_data['time'] if i >= time[0] and i <= time[1]]
                mslp_btk = [storm_data['mslp'][i] for i in range(len(storm_data['mslp']))
                            if storm_data['time'][i] >= time[0] and storm_data['time'][i] <= time[1]]
            else:
                times_btk = [i for i in storm_data['time']]
                mslp_btk = [i for i in storm_data['mslp']]
            ax.plot(times_btk, mslp_btk, color='r', alpha=0.25, label='Best Track MSLP (hPa)')
            if dots:
                ax.plot(times_btk, mslp_btk, 'o', color='r', alpha=0.5)

        # Add labels
        ax.set_ylabel("MSLP (hPa)")
        ax.set_xlabel("Vortex Data Message time (UTC)")

        # Add time labels
        times_use = []
        start_time = times[0].replace(hour=0)
        total_days = (times[-1] - start_time).total_seconds() / 86400
        increment_hour = 6
        if total_days > 3:
            increment_hour = 12
        if total_days > 6:
            increment_hour = 24
        while start_time <= (times[-1] + timedelta(hours=increment_hour)):
            times_use.append(start_time)
            start_time += timedelta(hours=increment_hour)
        ax.set_xticks(times_use)
        ax.set_xlim(times[0] - timedelta(hours=6), times[-1] + timedelta(hours=6))
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%H UTC\n%b %d'))

        # Add titles
        type_array = np.array(storm_data['type'])
        idx = np.where((type_array == 'SD') | (type_array == 'SS') | (type_array == 'TD') | (
            type_array == 'TS') | (type_array == 'HU') | (type_array == 'TY') | (type_array == 'ST'))
        if ('invest' in storm_data.keys() and not storm_data['invest']) or len(idx[0]) > 0:
            tropical_vmax = np.array(storm_data['vmax'])[idx]

            add_ptc_flag = False
            if len(tropical_vmax) == 0:
                add_ptc_flag = True
                idx = np.where((type_array == 'LO') | (type_array == 'DB'))
                tropical_vmax = np.array(storm_data['vmax'])[idx]

            subtrop = classify_subtropical(np.array(storm_data['type']))
            peak_idx = storm_data['vmax'].index(np.nanmax(tropical_vmax))
            peak_basin = storm_data['wmo_basin'][peak_idx]
            storm_type = get_storm_classification(np.nanmax(tropical_vmax), subtrop, peak_basin)
            if add_ptc_flag:
                storm_type = "Potential Tropical Cyclone"
        if best_track:
            ax.legend()
        ax.set_title(f'{storm_type} {storm_data["name"]}\nVDM Minimum Sea Level Pressure (hPa)',
                     loc='left', fontweight='bold')

        return ax
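    # Illustrative: time series of VDM central pressure with the Best Track
    # overlaid for comparison.
    #
    #     ax = storm.recon.vdms.plot_time_series(best_track=True, dots=True)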
[docs]    def plot_points(self, varname='Minimum Sea Level Pressure (hPa)', domain="dynamic", ax=None, cartopy_proj=None, **kwargs):
        r"""
        Creates a plot of recon data points.

        Parameters
        ----------
        varname : str
            Variable to plot. Currently the best option is "Minimum Sea Level Pressure (hPa)".
        domain : str
            Domain for the plot. Default is "dynamic". Please refer to :ref:`options-domain` for available domain options.
        ax : axes
            Instance of axes to plot on. If none, one will be generated. Default is none.
        cartopy_proj : ccrs
            Instance of a cartopy projection to use. If none, one will be generated. Default is none.

        Other Parameters
        ----------------
        prop : dict
            Customization properties of recon plot. Please refer to :ref:`options-prop-recon-plot` for available options.
        map_prop : dict
            Customization properties of Cartopy map. Please refer to :ref:`options-map-prop` for available options.

        Returns
        -------
        ax
            Instance of axes containing the plot is returned.
        """

        # Pop kwargs
        prop = kwargs.pop('prop', {})
        map_prop = kwargs.pop('map_prop', {})

        # Get plot data
        plotdata = [m[varname] if varname in m.keys() else np.nan for m in self.data]
        dfRecon = pd.DataFrame.from_dict({'time': [m['time'] for m in self.data],
                                          'lat': [m['lat'] for m in self.data],
                                          'lon': [m['lon'] for m in self.data],
                                          varname: plotdata})

        # Create instance of plot object
        self.plot_obj = ReconPlot()

        # Create cartopy projection
        if cartopy_proj is None:
            self.plot_obj.create_cartopy(proj='PlateCarree', central_longitude=0.0)
            cartopy_proj = self.plot_obj.proj

        # Plot recon
        plot_ax = self.plot_obj.plot_points(self.storm, dfRecon, domain,
                                            varname=varname, ax=ax,
                                            prop=prop, map_prop=map_prop)

        # Return axis
        return plot_ax
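    # Illustrative: map every VDM center fix, colored by minimum sea level
    # pressure.
    #
    #     ax = storm.recon.vdms.plot_points('Minimum Sea Level Pressure (hPa)')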