Source code for tropycal.tracks.season

r"""Functionality for storing and analyzing a year/season of cyclones."""

import numpy as np

# Import internal scripts
from .plot import TrackPlot
from .storm import Storm

# Import tools
from .tools import *
from ..utils import *
from .. import constants


[docs]class Season: r""" Initializes an instance of Season, retrieved via ``TrackDataset.get_season()``. Parameters ---------- season : dict Dict entry containing all storms within the requested season. info : dict Dict entry containing general information about the season. Returns ------- Season Instance of a Season object. """ def __setitem__(self, key, value): self.__dict__[key] = value def __getitem__(self, key): return self.__dict__[key] def __add__(self, new): # Add seasons # Ensure data sources and basins are the same if self.source_basin != new.source_basin: msg = 'Seasons can only be added for the same basin.' raise ValueError(msg) if self.source != new.source: msg = 'Seasons can only be added from the same source.' raise ValueError(msg) # Retrieve old & new dict entries dict_original = self.dict.copy() dict_new = new.dict.copy() # Retrieve copy of coordinates new_attrs = self.attrs.copy() # Add year to list of years if isinstance(self.attrs['year'], int): new_attrs['year'] = [self.year, new.year] else: new_attrs['year'].append(new.year) # Sort list of years new_attrs['year'] = (np.sort(new_attrs['year'])).tolist() # Update dict dict_original.update(dict_new) # Iterate over every year to create a new dict new_dict = {} for year in new_attrs['year']: for key in dict_original.keys(): if dict_original[key]['season'] == year: new_dict[key] = dict_original[key] # Return new Season object return Season(new_dict, new_attrs) def __repr__(self): # Label object summary = ["<tropycal.tracks.Season>"] # Format keys for summary season_summary = self.summary() summary_keys = { 'Total Storms': season_summary['season_storms'], 'Named Storms': season_summary['season_named'], 'Hurricanes': season_summary['season_hurricane'], 'Major Hurricanes': season_summary['season_major'], 'Season ACE': season_summary['season_ace'], } # Add season summary summary.append("Season Summary:") add_space = np.max([len(key) for key in summary_keys.keys()]) + 3 for key in summary_keys.keys(): key_name = key + ":" summary.append( f'{" "*4}{key_name:<{add_space}}{summary_keys[key]}') # Add additional information summary.append("\nMore Information:") add_space = np.max([len(key) for key in self.attrs.keys()]) + 3 for key in self.attrs.keys(): key_name = key + ":" summary.append(f'{" "*4}{key_name:<{add_space}}{self.attrs[key]}') return "\n".join(summary) def __init__(self, season, info): # Save the dict entry of the season self.dict = season # Add other attributes about the storm keys = info.keys() self.attrs = {} for key in keys: if not isinstance(info[key], list) and not isinstance(info[key], dict): self[key] = info[key] self.attrs[key] = info[key] if isinstance(info[key], list) and key == 'year': self[key] = info[key] self.attrs[key] = info[key]
[docs] def to_dataframe(self): r""" Converts the season dict into a pandas DataFrame object. Returns ------- `pandas.DataFrame` A pandas DataFrame object containing information about the season. """ # Try importing pandas try: import pandas as pd except ImportError as e: raise RuntimeError( "Error: pandas is not available. Install pandas in order to use this function.") from e # Get season info season_info = self.summary() season_info_keys = season_info['id'] # Set up empty dict for dataframe ds = { 'id': [], 'name': [], 'vmax': [], 'mslp': [], 'category': [], 'ace': [], 'start_time': [], 'end_time': [], 'start_lat': [], 'start_lon': [], } # Add every key containing a list into the dict keys = [k for k in self.dict.keys()] for key in keys: # Get tropical duration temp_type = np.array(self.dict[key]['type']) tropical_idx = np.where((temp_type == 'SS') | (temp_type == 'SD') | (temp_type == 'TD') | ( temp_type == 'TS') | (temp_type == 'HU') | (temp_type == 'TY') | (temp_type == 'ST')) if key in season_info_keys: sidx = season_info_keys.index(key) ds['id'].append(key) ds['name'].append(self.dict[key]['name']) ds['vmax'].append(season_info['max_wspd'][sidx]) ds['mslp'].append(season_info['min_mslp'][sidx]) ds['category'].append(season_info['category'][sidx]) ds['start_time'].append( np.array(self.dict[key]['time'])[tropical_idx][0]) ds['end_time'].append( np.array(self.dict[key]['time'])[tropical_idx][-1]) ds['start_lat'].append( np.array(self.dict[key]['lat'])[tropical_idx][0]) ds['start_lon'].append( np.array(self.dict[key]['lon'])[tropical_idx][0]) ds['ace'].append(np.round(season_info['ace'][sidx], 1)) # Convert entire dict to a DataFrame ds = pd.DataFrame(ds) # Return dataset return ds
[docs] def get_storm_id(self, storm): r""" Returns the storm ID (e.g., "AL012019") given the storm name and year. Parameters ---------- storm : tuple Tuple containing the storm name and year (e.g., ("Matthew",2016)). Returns ------- str or list If a single storm was found, returns a string containing its ID. Otherwise returns a list of matching IDs. """ # Error check if not isinstance(storm, tuple): raise TypeError("storm must be of type tuple.") if len(storm) != 2: raise ValueError( "storm must contain 2 elements, name (str) and year (int)") name, year = storm # Search for corresponding entry in keys keys_use = [] for key in self.dict.keys(): temp_year = self.dict[key]['year'] if temp_year == year: temp_name = self.dict[key]['name'] if temp_name == name.upper(): keys_use.append(key) # return key, or list of keys if len(keys_use) == 1: keys_use = keys_use[0] if len(keys_use) == 0: raise RuntimeError("Storm not found") return keys_use
[docs] def get_storm(self, storm): r""" Retrieves a Storm object for the requested storm. Parameters ---------- storm : str or tuple Requested storm. Can be either string of storm ID (e.g., "AL052019"), or tuple with storm name and year (e.g., ("Matthew",2016)). Returns ------- tropycal.tracks.Storm Object containing information about the requested storm, and methods for analyzing and plotting the storm. """ # Check if storm is str or tuple if isinstance(storm, str): key = storm elif isinstance(storm, tuple): key = self.get_storm_id((storm[0], storm[1])) else: raise RuntimeError( "Storm must be a string (e.g., 'AL052019') or tuple (e.g., ('Matthew',2016)).") # Retrieve key of given storm if isinstance(key, str): return Storm(self.dict[key]) else: error_message = ''.join([f"\n{i}" for i in key]) error_message = f"Multiple IDs were identified for the requested storm. Choose one of the following storm IDs and provide it as the 'storm' argument instead of a tuple:{error_message}" raise RuntimeError(error_message)
[docs] def plot(self, domain=None, ax=None, cartopy_proj=None, save_path=None, **kwargs): r""" Creates a plot of this season. Parameters ---------- domain : str Domain for the plot. Default is basin-wide. Please refer to :ref:`options-domain` for available domain options. ax : axes Instance of axes to plot on. If none, one will be generated. Default is none. cartopy_proj : ccrs Instance of a cartopy projection to use. If none, one will be generated. Default is none. save_path : str Relative or full path of directory to save the image in. If none, image will not be saved. Other Parameters ---------------- prop : dict Customization properties of storm track lines. Please refer to :ref:`options-prop` for available options. map_prop : dict Customization properties of Cartopy map. Please refer to :ref:`options-map-prop` for available options. Returns ------- ax Instance of axes containing the plot is returned. """ # Retrieve kwargs prop = kwargs.pop('prop', {}) map_prop = kwargs.pop('map_prop', {}) # Create instance of plot object self.plot_obj = TrackPlot() if self.basin in ['east_pacific', 'west_pacific', 'south_pacific', 'australia', 'all']: self.plot_obj.create_cartopy( proj='PlateCarree', central_longitude=180.0) else: self.plot_obj.create_cartopy( proj='PlateCarree', central_longitude=0.0) # Plot storm plot_ax = self.plot_obj.plot_season( self, domain, ax=ax, save_path=save_path, prop=prop, map_prop=map_prop) # Return axis return plot_ax
[docs] def summary(self): r""" Generates a summary for this season with various cumulative statistics. Returns ------- dict Dictionary containing various statistics about this season. """ # Determine if season object has a single or multiple seasons multi_season = isinstance(self.year, list) # Initialize dict with info about all of year's storms if not multi_season: summary_dict = { 'id': [], 'operational_id': [], 'name': [], 'max_wspd': [], 'min_mslp': [], 'category': [], 'ace': [], } else: summary_dict = { 'id': [[] for i in range(len(self.year))], 'operational_id': [[] for i in range(len(self.year))], 'name': [[] for i in range(len(self.year))], 'max_wspd': [[] for i in range(len(self.year))], 'min_mslp': [[] for i in range(len(self.year))], 'category': [[] for i in range(len(self.year))], 'ace': [[] for i in range(len(self.year))], 'seasons': self.year + [], 'season_start': [0 for i in range(len(self.year))], 'season_end': [0 for i in range(len(self.year))], 'season_storms': [0 for i in range(len(self.year))], 'season_named': [0 for i in range(len(self.year))], 'season_hurricane': [0 for i in range(len(self.year))], 'season_major': [0 for i in range(len(self.year))], 'season_ace': [0 for i in range(len(self.year))], 'season_subtrop_pure': [0 for i in range(len(self.year))], 'season_subtrop_partial': [0 for i in range(len(self.year))], } # Iterate over season(s) list_seasons = [self.year] if not multi_season else self.year + [] for season_idx, iter_season in enumerate(list_seasons): # Search for corresponding entry in keys count_ss_pure = 0 count_ss_partial = 0 iterate_id = 1 for key in self.dict.keys(): # Skip if using multi-season object and storm is outside of this season if multi_season and self.dict[key]['season'] != iter_season: continue # Retrieve info about storm, only in this basin temp_name = self.dict[key]['name'] temp_vmax = np.array(self.dict[key]['vmax']) temp_mslp = np.array(self.dict[key]['mslp']) temp_type = np.array(self.dict[key]['type']) temp_time = np.array(self.dict[key]['time']) temp_basin = np.array(self.dict[key]['wmo_basin']) temp_year = np.array([i.year for i in self.dict[key]['time']]) # Calculate ACE within basin temp_ace = 0.0 for ace_i, (i_time, i_vmax, i_basin, i_type) in enumerate(zip(temp_time, temp_vmax, temp_basin, temp_type)): if self.basin not in ['all', 'both'] and i_basin != self.basin: continue if i_time.strftime('%H%M') not in constants.STANDARD_HOURS: continue if i_type not in constants.NAMED_TROPICAL_STORM_TYPES: continue if self.basin == 'all' and i_time.year != self.year: continue if np.isnan(i_vmax): continue temp_ace += accumulated_cyclone_energy(i_vmax) temp_ace = np.round(temp_ace, 1) # Get indices of all tropical/subtropical time steps if self.basin == 'all': idx = np.where(((temp_type == 'SS') | (temp_type == 'SD') | (temp_type == 'TD') | (temp_type == 'TS') | ( temp_type == 'HU') | (temp_type == 'TY') | (temp_type == 'ST')) & (temp_year == self.year)) elif self.basin == 'both': idx = np.where(((temp_type == 'SS') | (temp_type == 'SD') | (temp_type == 'TD') | ( temp_type == 'TS') | (temp_type == 'HU') | (temp_type == 'TY') | (temp_type == 'ST'))) else: idx = np.where(((temp_type == 'SS') | (temp_type == 'SD') | (temp_type == 'TD') | (temp_type == 'TS') | ( temp_type == 'HU') | (temp_type == 'TY') | (temp_type == 'ST')) & (temp_basin == self.basin)) # Get times during existence of trop/subtrop storms if len(idx[0]) == 0: continue trop_time = temp_time[idx] if not multi_season: if 'season_start' not in summary_dict.keys(): summary_dict['season_start'] = trop_time[0] else: if trop_time[0] < summary_dict['season_start']: summary_dict['season_start'] = trop_time[0] if 'season_end' not in summary_dict.keys(): summary_dict['season_end'] = trop_time[-1] else: if trop_time[-1] > summary_dict['season_end']: summary_dict['season_end'] = trop_time[-1] else: if summary_dict['season_start'][season_idx] == 0: summary_dict['season_start'][season_idx] = trop_time[0] else: if trop_time[0] < summary_dict['season_start'][season_idx]: summary_dict['season_start'][season_idx] = trop_time[0] if summary_dict['season_end'][season_idx] == 0: summary_dict['season_end'][season_idx] = trop_time[-1] else: if trop_time[-1] > summary_dict['season_end'][season_idx]: summary_dict['season_end'][season_idx] = trop_time[-1] # Get max/min values and check for nan's np_wnd = np.array(temp_vmax[idx]) np_slp = np.array(temp_mslp[idx]) if len(np_wnd[~np.isnan(np_wnd)]) == 0: max_wnd = np.nan max_cat = -1 else: max_wnd = int(np.nanmax(temp_vmax[idx])) max_cat = wind_to_category(np.nanmax(temp_vmax[idx])) if len(np_slp[~np.isnan(np_slp)]) == 0: min_slp = np.nan else: min_slp = int(np.nanmin(temp_mslp[idx])) # Append to dict if not multi_season: summary_dict['id'].append(key) summary_dict['name'].append(temp_name) summary_dict['max_wspd'].append(max_wnd) summary_dict['min_mslp'].append(min_slp) summary_dict['category'].append(max_cat) summary_dict['ace'].append(temp_ace) summary_dict['operational_id'].append( self.dict[key]['operational_id']) else: summary_dict['id'][season_idx].append(key) summary_dict['name'][season_idx].append(temp_name) summary_dict['max_wspd'][season_idx].append(max_wnd) summary_dict['min_mslp'][season_idx].append(min_slp) summary_dict['category'][season_idx].append(max_cat) summary_dict['ace'][season_idx].append(temp_ace) summary_dict['operational_id'][season_idx].append( self.dict[key]['operational_id']) # Handle operational vs. non-operational storms # Check for purely subtropical storms if 'SS' in temp_type and True not in np.isin(temp_type, list(constants.TROPICAL_ONLY_STORM_TYPES)): count_ss_pure += 1 # Check for partially subtropical storms if 'SS' in temp_type: count_ss_partial += 1 # Add generic season info if not multi_season: narray = np.array(summary_dict['max_wspd']) narray = narray[~np.isnan(narray)] summary_dict['season_storms'] = len(narray[narray >= 0]) summary_dict['season_named'] = len(narray[narray >= 34]) summary_dict['season_hurricane'] = len(narray[narray >= 65]) summary_dict['season_major'] = len(narray[narray >= 100]) summary_dict['season_ace'] = np.round( np.sum(summary_dict['ace']), 1) summary_dict['season_subtrop_pure'] = count_ss_pure summary_dict['season_subtrop_partial'] = count_ss_partial else: narray = np.array(summary_dict['max_wspd'][season_idx]) narray = narray[~np.isnan(narray)] summary_dict['season_storms'][season_idx] = len( narray[narray >= 0]) summary_dict['season_named'][season_idx] = len( narray[narray >= 34]) summary_dict['season_hurricane'][season_idx] = len( narray[narray >= 65]) summary_dict['season_major'][season_idx] = len( narray[narray >= 100]) summary_dict['season_ace'][season_idx] = np.round( np.sum(summary_dict['ace'][season_idx]), 1) summary_dict['season_subtrop_pure'][season_idx] = count_ss_pure summary_dict['season_subtrop_partial'][season_idx] = count_ss_partial # Return object return summary_dict