Source code for tropycal.tracks.dataset

r"""Functionality for storing and analyzing an entire cyclone dataset."""

import re
import calendar
import numpy as np
import xarray as xr
import pandas as pd
import scipy.stats as stats
import urllib
import requests
import os
import warnings
from datetime import datetime as dt, timedelta
from scipy.ndimage import gaussian_filter as gfilt
from matplotlib import path

# Import internal scripts
from ..plot import Plot
from .plot import TrackPlot
from .storm import Storm
from .season import Season
from ..tornado import *

# Import tools
from .tools import *
from ..utils import *
from .. import constants

# Import matplotlib
try:
    import matplotlib.lines as mlines
    import matplotlib.pyplot as plt
    import matplotlib.dates as mdates
except ImportError:
    warnings.warn(
        "Warning: Matplotlib is not installed in your python environment. Plotting functions will not work.")


class TrackDataset:

    r"""
    Creates an instance of a TrackDataset object containing various cyclone data.

    Parameters
    ----------
    basin : str
        Ocean basin(s) to load data for. Can be any of the following:

        .. list-table::
           :widths: 25 75
           :header-rows: 1

           * - Name
             - Source(s)
           * - "north_atlantic"
             - HURDAT2, IBTrACS
           * - "east_pacific"
             - HURDAT2, IBTrACS
           * - "both"
             - HURDAT2 ("north_atlantic" & "east_pacific" combined)
           * - "west_pacific"
             - IBTrACS
           * - "north_indian"
             - IBTrACS
           * - "south_indian"
             - IBTrACS
           * - "australia"
             - IBTrACS* (special case)
           * - "south_pacific"
             - IBTrACS
           * - "south_atlantic"
             - IBTrACS
           * - "all"
             - IBTrACS
    source : str
        Data source to read in. Default is HURDAT2.

        * **hurdat** - HURDAT2 data source for the North Atlantic and East/Central Pacific basins
        * **ibtracs** - IBTrACS data source for regional or global data
    include_btk : bool, optional
        If True, the best track data from NHC for the most recent years where it doesn't exist in HURDAT2 will be added into the dataset. Valid for "north_atlantic" and "east_pacific" basins. Default is False.
    interpolate_data : bool, optional
        If True, interpolates all storm data to hourly. Default is False.

    Other Parameters
    ----------------
    atlantic_url : str, optional
        URL containing the Atlantic HURDAT2 dataset. Can be changed to a local txt reference file. Default is retrieval from online URL.
    pacific_url : str, optional
        URL containing the Pacific HURDAT2 dataset. Can be changed to a local txt reference file. Default is retrieval from online URL.
    ibtracs_url : str, optional
        URL containing the IBTrACS dataset. Can be changed to a local txt reference file. Can be a regional or all IBTrACS file. If regional, the basin should match the argument basin provided earlier. Default is retrieval from online URL.
    catarina : bool, optional
        Modify the dataset to include cyclone track data for Cyclone Catarina (2004) from McTaggart-Cowan et al. (2006). Default is False.
    ibtracs_hurdat : bool, optional
        Replace IBTrACS data for the North Atlantic and East/Central Pacific basins with HURDAT2 data. Default is False.
    ibtracs_mode : str, optional
        Mode of reading IBTrACS in. Default is "jtwc".

        * **wmo** = official World Meteorological Organization data. Caveat is sustained wind methodology is inconsistent between basins.
        * **jtwc** = default. Unofficial data from the Joint Typhoon Warning Center. Caveat is some storms are missing and some storm data is inaccurate.
        * **jtwc_neumann** = JTWC data modified with the Neumann reanalysis for the Southern Hemisphere. Improves upon some storms (e.g., Cyclone Tracy 1974) while degrading others.

    Returns
    -------
    TrackDataset
        An instance of TrackDataset.

    Notes
    -----
    This object contains information about all storms in a basin, as well as methods to analyze the dataset and to retrieve individual storms from the dataset.

    The following block of code creates an instance of a TrackDataset() object and stores it in a variable called "basin":

    .. code-block:: python

        from tropycal import tracks
        basin = tracks.TrackDataset()

    With an instance of TrackDataset created, any of the methods listed below can be accessed via the "basin" variable:

    .. code-block:: python

        storm = basin.get_storm(("katrina",2005))

    For IBTrACS datasets, please refer to :ref:`ibtracs-caveats` for pros and cons of each mode of IBTrACS data available.

    .. note::

        1. If using ``basin="both"``, this combines the North Atlantic and East/Central Pacific HURDATv2 data into a single TrackDataset object. As of Tropycal v0.5, this now merges cross-basin storms (i.e., North Atlantic to East Pacific) which were reclassified with a new East Pacific ID into single Storm objects.

        2. If using ``basin="australia", source="ibtracs"``, since IBTrACS doesn't provide an Australia-only basin file by default, this will fetch the full global IBTrACS data and filter storms to only those that existed in the Australia basin.
    """

    def __repr__(self):

        summary = ["<tropycal.tracks.Dataset>"]

        # Find maximum wind and minimum pressure
        max_wind = int(
            np.nanmax([x for stormid in self.keys for x in self.data[stormid]['vmax']]))
        max_wind_name = ""
        min_mslp = int(
            np.nanmin([x for stormid in self.keys for x in self.data[stormid]['mslp']]))
        min_mslp_name = ""
        for key in self.keys[::-1]:
            array_vmax = np.array(self.data[key]['vmax'])
            array_mslp = np.array(self.data[key]['mslp'])
            if len(array_vmax[~np.isnan(array_vmax)]) > 0 and np.nanmax(array_vmax) == max_wind:
                max_wind_name = f"{self.data[key]['name'].title()} {self.data[key]['year']}"
            if len(array_mslp[~np.isnan(array_mslp)]) > 0 and np.nanmin(array_mslp) == min_mslp:
                min_mslp_name = f"{self.data[key]['name'].title()} {self.data[key]['year']}"

        # Add general summary
        emdash = '\u2014'
        summary_keys = {
            'Basin': self.basin,
            'Source': self.source + [', ' + self.ibtracs_mode, ''][self.source == 'hurdat'],
            'Number of storms': len(self.keys),
            'Maximum wind': f"{max_wind} knots ({max_wind_name})",
            'Minimum pressure': f"{min_mslp} hPa ({min_mslp_name})",
            'Year range': f"{self.data[self.keys[0]]['year']} {emdash} {self.data[self.keys[-1]]['year']}",
        }

        # Add dataset summary
        summary.append("Dataset Summary:")
        add_space = np.max([len(key) for key in summary_keys.keys()]) + 3
        for key in summary_keys.keys():
            key_name = key + ":"
            summary.append(
                f'{" "*4}{key_name:<{add_space}}{summary_keys[key]}')

        return "\n".join(summary)

    def __init__(self, basin='north_atlantic', source='hurdat', include_btk=False, interpolate_data=False, **kwargs):

        # kwargs
        atlantic_url = kwargs.pop('atlantic_url', 'fetch')
        pacific_url = kwargs.pop('pacific_url', 'fetch')
        ibtracs_url = kwargs.pop(
            'ibtracs_url', 'https://www.ncei.noaa.gov/data/international-best-track-archive-for-climate-stewardship-ibtracs/v04r00/access/csv/ibtracs.(basin).list.v04r00.csv')
        ibtracs_mode = kwargs.pop('ibtracs_mode', 'jtwc')
        catarina = kwargs.pop('catarina', False)
        ibtracs_hurdat = kwargs.pop('ibtracs_hurdat', False)

        # Error check
        if ibtracs_mode not in ['wmo', 'jtwc', 'jtwc_neumann']:
            raise ValueError(
                "ibtracs_mode must be either 'wmo', 'jtwc', or 'jtwc_neumann'")

        # Get latest HURDATv2 files if requested
        fetch_condition_1 = basin in constants.NHC_BASINS and source == 'hurdat'
        if fetch_condition_1 or ibtracs_hurdat:
            condition_1 = basin == 'north_atlantic' and atlantic_url == 'fetch'
            condition_2 = basin == 'east_pacific' and pacific_url == 'fetch'
            condition_3 = basin in ['both', 'all'] and (
                atlantic_url == 'fetch' or pacific_url == 'fetch')
            if condition_1 or condition_2 or condition_3:
                atlantic_url, pacific_url = find_latest_hurdat_files()

        # Store input arguments
        self.proj = None  # for plotting
        self.basin = basin.lower()
        self.atlantic_url = str(atlantic_url)
        self.pacific_url = str(pacific_url)
        self.ibtracs_url = str(ibtracs_url)
        self.source = source

        # Modification flags
        self.catarina = catarina
        self.ibtracs_mode = ibtracs_mode
        if ibtracs_mode == 'jtwc_neumann':
            self.neumann = True
        else:
            self.neumann = False

        # initialize empty dict
        self.data = {}

        # Read in from specified data source
        if source == 'hurdat':
            self.__read_hurdat()
        elif source == 'ibtracs':
            self.__read_ibtracs()
        else:
            raise RuntimeError(
                "Accepted values for 'source' are 'hurdat' or 'ibtracs'")

        # Replace ibtracs with hurdat for atl/pac basins
        if source == 'ibtracs' and ibtracs_hurdat:
            if self.basin in ['north_atlantic', 'east_pacific']:
                self.__read_hurdat()
            elif self.basin == 'all':
                self.basin = 'both'
                self.__read_hurdat(override_basin=True)
                self.basin = 'all'

        # Read in best track data
        if include_btk and basin in ['north_atlantic', 'east_pacific', 'both']:
            self.__read_btk()

        # Delete duplicate entries
        check = []
        check_ids = []
        keys = [k for k in self.data.keys()]
        for key in keys:
            if self.data[key]['name'].lower() == 'unnamed':
                continue
            check_id = f"{self.data[key]['name']},{self.data[key]['year']},{self.data[key]['time'][0].month}"
            if check_id not in check:
                check.append(check_id)
                check_ids.append(key)
            else:
                existing_id = check_ids[check.index(check_id)]
                if len(self.data[key]['vmax']) > len(self.data[existing_id]['vmax']):
                    del self.data[existing_id]
                    check_ids.pop(check_ids.index(existing_id))
                    check_ids.append(key)
                else:
                    del self.data[key]

        # Join storms for atlantic-pacific crossovers
        if self.basin == 'both':
            join_keys = [['AL081993', 'EP141993'], ['AL181971', 'EP151971'], ['AL141974', 'EP151974'],
                         ['AL161978', 'EP151978'], ['AL111988', 'EP131988'], ['AL031996', 'EP071996']]
            for key in join_keys:

                # Append East Pacific data to Atlantic data
                for idx, i_time in enumerate(self.data[key[1]]['time']):
                    if i_time in self.data[key[0]]['time']:
                        continue
                    for var in [i for i in self.data[key[1]].keys() if isinstance(self.data[key[1]][i], list)]:
                        self.data[key[0]][var].append(
                            self.data[key[1]][var][idx])
                    if i_time.strftime('%H%M') in constants.STANDARD_HOURS and self.data[key[1]]['type'][idx] in constants.NAMED_TROPICAL_STORM_TYPES:
                        self.data[key[0]]['ace'] += accumulated_cyclone_energy(
                            self.data[key[1]]['vmax'][idx])

                # Rename storm if needed
                if self.data[key[1]]['name'].lower() == 'unnamed' or np.nanmax(self.data[key[1]]['vmax']) < 35:
                    pass
                else:
                    self.data[key[0]]['name'] = f"{self.data[key[0]]['name']}-{self.data[key[1]]['name']}"

                # Remove Pacific storm from data
                del self.data[key[1]]

        # Add keys of all storms to object
        keys = self.data.keys()
        self.keys = [k for k in keys]

        # Create array of zero-ones for existence of tornado data for a given storm
        self.keys_tors = [0 for key in self.keys]

        # Add dict to store all storm-specific tornado data in
        self.data_tors = {}

        # If interpolate_data, interpolate each storm and save to dictionary.
        self.data_interp = {}
        if interpolate_data:
            self.__interpolate_storms(self.keys)

        # Round ACE for all entries
        for key in self.keys:
            self.data[key]['ace'] = round(self.data[key]['ace'], 4)

        # ---------------------------------------------------------------

        # Find maximum wind and minimum pressure
        max_wind = int(
            np.nanmax([x for stormid in self.keys for x in self.data[stormid]['vmax']]))
        max_wind_name = ""
        min_mslp = int(
            np.nanmin([x for stormid in self.keys for x in self.data[stormid]['mslp']]))
        min_mslp_name = ""
        for key in self.keys[::-1]:
            array_vmax = np.array(self.data[key]['vmax'])
            array_mslp = np.array(self.data[key]['mslp'])
            if len(array_vmax[~np.isnan(array_vmax)]) > 0 and np.nanmax(array_vmax) == max_wind:
                max_wind_tuple = (
                    self.data[key]['name'], self.data[key]['year'])
            if len(array_mslp[~np.isnan(array_mslp)]) > 0 and np.nanmin(array_mslp) == min_mslp:
                min_mslp_tuple = (
                    self.data[key]['name'], self.data[key]['year'])

        # Add attributes
        self.attrs = {
            'basin': self.basin,
            'source': self.source,
            'ibtracs_mode': self.ibtracs_mode if self.source == 'ibtracs' else '',
            'start_year': self.data[self.keys[0]]['year'],
            'end_year': self.data[self.keys[-1]]['year'],
            'max_wind': max_wind_tuple,
            'min_mslp': min_mslp_tuple,
        }

    def __read_hurdat(self, override_basin=False):
        r"""
        Reads in HURDATv2 data into the Dataset object.
        """

        # Time duration to read in HURDAT
        start_time = dt.now()
        print("--> Starting to read in HURDAT2 data")

        # Quick error check
        atl_online = True
        pac_online = True
        fcheck = "https://www.nhc.noaa.gov/data/hurdat/"
        fcheck2 = "https://www.aoml.noaa.gov/hrd/hurdat/"
        if fcheck not in self.atlantic_url and fcheck2 not in self.atlantic_url:
            if "http" in self.atlantic_url:
                raise RuntimeError("URL provided is not via NHC or HRD")
            else:
                atl_online = False
        if fcheck not in self.pacific_url and fcheck2 not in self.pacific_url:
            if "http" in self.pacific_url:
                raise RuntimeError("URL provided is not via NHC or HRD")
            else:
                pac_online = False

        # Check if basin is valid
        if self.basin.lower() not in ['north_atlantic', 'east_pacific', 'both']:
            raise RuntimeError(
                "Only valid basins are 'north_atlantic', 'east_pacific' or 'both'")

        def read_hurdat(path, flag):
            if flag:
                content = read_url(path)
            else:
                f = open(path, "r")
                content = f.readlines()
                content = [(i.replace(" ", "")).split(",") for i in content]
                f.close()
            return content

        # read in HURDAT2 file from URL
        if self.basin == 'north_atlantic':
            content = read_hurdat(self.atlantic_url, atl_online)
        elif self.basin == 'east_pacific':
            content = read_hurdat(self.pacific_url, pac_online)
        elif self.basin == 'both':
            content = read_hurdat(self.atlantic_url, atl_online)
            content += read_hurdat(self.pacific_url, pac_online)

        # keep current storm ID for iteration
        current_id = ""

        # iterate through every line
        for line in content:

            # Skip if line is empty
            if len(line) < 2:
                continue
            if line[0][0] == "<":
                continue

            # identify if this is a header for a storm or content of storm
            if line[0][0] in ['A', 'C', 'E']:

                # Determine basin
                add_basin = 'north_atlantic'
                if line[0][0] == 'C':
                    add_basin = 'east_pacific'
                elif line[0][0] == 'E':
                    add_basin = 'east_pacific'
                if override_basin:
                    add_basin = 'all'

                # add empty entry into dict
                self.data[line[0]] = {'id': line[0], 'operational_id': '', 'name': line[1],
                                      'year': int(line[0][4:]), 'season': int(line[0][4:]),
                                      'basin': add_basin, 'source_info': 'NHC Hurricane Database'}
                self.data[line[0]]['source'] = self.source
                current_id = line[0]

                # add empty lists
                for val in ['time', 'extra_obs', 'special', 'type', 'lat', 'lon', 'vmax', 'mslp', 'wmo_basin']:
                    self.data[line[0]][val] = []
                self.data[line[0]]['ace'] = 0.0

            # if not a header, enter storm info into its dict entry
            else:

                # Retrieve important info about storm
                yyyymmdd, hhmm, special, storm_type, lat, lon, vmax, mslp = line[0:8]

                # Check time doesn't already exist in dict
                time = dt.strptime(yyyymmdd + hhmm, '%Y%m%d%H%M')
                if time in self.data[current_id]['time']:
                    # Hard-code fix
                    if current_id == "AL151966" and yyyymmdd == "19661004":
                        time = dt.strptime("19661006" + hhmm, '%Y%m%d%H%M')
                    else:
                        continue

                # Parse into format to be entered into dict
                if "N" in lat:
                    lat = round(float(lat.split("N")[0]), 1)
                elif "S" in lat:
                    lat = round(float(lat.split("S")[0]), 1) * -1.0
                if "W" in lon:
                    lon = round(float(lon.split("W")[0]), 1) * -1.0
                elif "E" in lon:
                    lon = round(float(lon.split("E")[0]), 1)
                vmax = int(vmax)
                mslp = int(mslp)

                # Fix longitude for Atlantic storms east of the prime meridian
                if add_basin == 'north_atlantic' and lon < -180:
                    lon += 360.0

                # Handle missing data
                if vmax < 0:
                    vmax = np.nan
                if mslp < 800:
                    mslp = np.nan

                # Handle off-hour obs
                if hhmm in constants.STANDARD_HOURS:
                    self.data[current_id]['extra_obs'].append(0)
                else:
                    self.data[current_id]['extra_obs'].append(1)

                # Append into dict
                self.data[current_id]['time'].append(time)
                self.data[current_id]['special'].append(special)
                self.data[current_id]['type'].append(storm_type)
                self.data[current_id]['lat'].append(lat)
                self.data[current_id]['lon'].append(lon)
                self.data[current_id]['vmax'].append(vmax)
                self.data[current_id]['mslp'].append(mslp)

                # Add basin
                previous_basin = add_basin + ''
                if len(self.data[current_id]['wmo_basin']) > 0:
                    previous_basin = self.data[current_id]['wmo_basin'][-1]
                    if previous_basin not in constants.NHC_BASINS:
                        previous_basin = 'east_pacific'
                self.data[current_id]['wmo_basin'].append(
                    get_basin(lat, lon, previous_basin))

                # Calculate ACE & append to storm total
                if not np.isnan(vmax):
                    ace = accumulated_cyclone_energy(vmax)
                    if hhmm in constants.STANDARD_HOURS and storm_type in constants.NAMED_TROPICAL_STORM_TYPES:
                        self.data[current_id]['ace'] += np.round(ace, 4)

        # Account for operationally unnamed storms
        current_year = 0
        current_year_id = 1
        for key in self.data.keys():

            storm_data = self.data[key]
            storm_name = storm_data['name']
            storm_year = storm_data['year']
            storm_vmax = storm_data['vmax']
            storm_id = storm_data['id']

            # Get max wind for storm
            np_wnd = np.array(storm_vmax)
            if len(np_wnd[~np.isnan(np_wnd)]) == 0:
                max_wnd = np.nan
            else:
                max_wnd = int(np.nanmax(storm_vmax))

            # Fix current year
            if current_year == 0:
                current_year = storm_year
            else:
                if storm_year != current_year:
                    current_year = storm_year
                    current_year_id = 1

                    # special fix for 1992 in Atlantic
                    if current_year == 1992 and self.data[current_id]['basin'] == 'north_atlantic':
                        current_year_id = 2

            # Estimate operational storm ID (which sometimes differs from HURDAT2 ID)
            blocked_list = ['EP072020']
            increment_but_pass = []

            if storm_name == 'UNNAMED' and not np.isnan(max_wnd) and max_wnd >= 34 and storm_id not in blocked_list:
                if storm_id in increment_but_pass:
                    current_year_id += 1
                pass
            elif storm_id[0:2] == 'CP':
                self.data[key]['operational_id'] = storm_id + ''
            else:
                # Skip potential TCs
                if f"{storm_id[0:2]}{num_to_str2(current_year_id)}{storm_year}" != storm_id and storm_year >= 2017:
                    current_year_id += 1
                self.data[key]['operational_id'] = f"{storm_id[0:2]}{num_to_str2(current_year_id)}{storm_year}"
                current_year_id += 1

            # Swap operational storm IDs, if necessary
            swap_list = ['EP101994', 'EP111994']
            swap_pair = ['EP111994', 'EP101994']
            if self.data[key]['operational_id'] in swap_list:
                swap_idx = swap_list.index(self.data[key]['operational_id'])
                self.data[key]['operational_id'] = swap_pair[swap_idx]

        # Determine time elapsed
        time_elapsed = dt.now() - start_time
        tsec = str(round(time_elapsed.total_seconds(), 2))
        print(f"--> Completed reading in HURDAT2 data ({tsec} seconds)")

    def __read_btk(self):
        r"""
        Reads in best track data into the Dataset object.
        """

        # Time duration to read in best track
        start_time = dt.now()
        print("--> Starting to read in best track data")

        # Get range of years missing
        start_year = self.data[([k for k in self.data.keys()])[-1]]['year'] + 1
        end_year = (dt.now()).year

        # Get list of files in online directory
        use_ftp = False
        try:
            urlpath = urllib.request.urlopen(
                'https://ftp.nhc.noaa.gov/atcf/btk/')
            string = urlpath.read().decode('utf-8')
        except:
            use_ftp = True
            urlpath = urllib.request.urlopen(
                'ftp://ftp.nhc.noaa.gov/atcf/btk/')
            string = urlpath.read().decode('utf-8')

        # Get relevant filenames from directory
        files = []
        files_years = []
        for iyear in range(start_year, end_year + 1):
            if self.basin == 'north_atlantic':
                search_pattern = f'bal[01234][0123456789]{iyear}.dat'
            elif self.basin == 'east_pacific':
                search_pattern = f'b[ec]p[01234][0123456789]{iyear}.dat'
            elif self.basin == 'both':
                search_pattern = f'b[aec][lp][01234][0123456789]{iyear}.dat'

            pattern = re.compile(search_pattern)
            filelist = pattern.findall(string)
            for filename in filelist:
                if filename not in files:
                    files.append(filename)
                if iyear not in files_years:
                    files_years.append(iyear)

        # If no files are available, go into archive directory
        archive_years = []
        for iyear in range(start_year, end_year):
            if iyear not in files_years:
                archive_years.append(iyear)

                # retrieve list of storms for that year from the archive
                path_season = urllib.request.urlopen(
                    f'http://hurricanes.ral.ucar.edu/repository/data/bdecks_open/{iyear}/')
                string = path_season.read().decode('utf-8')
                nums = "[0123456789]"
                search_pattern = f'bal[0123]{nums}{iyear}.dat'
                pattern = re.compile(search_pattern)
                filelist = pattern.findall(string)
                for file in filelist:
                    if file not in files:
                        files.append(file)

        # For each file, read in file content and add to hurdat dict
        for file in files:

            # Get file ID
            stormid = ((file.split(".dat")[0])[1:]).upper()

            # Determine basin
            add_basin = 'north_atlantic'
            if stormid[0] == 'C':
                add_basin = 'east_pacific'
            elif stormid[0] == 'E':
                add_basin = 'east_pacific'

            # add empty entry into dict
            self.data[stormid] = {'id': stormid, 'operational_id': stormid, 'name': '',
                                  'year': int(stormid[4:8]), 'season': int(stormid[4:8]),
                                  'basin': add_basin, 'source_info': 'NHC Hurricane Database',
                                  'source_method': "NHC's Automated Tropical Cyclone Forecasting System (ATCF)",
                                  'source_url': "https://ftp.nhc.noaa.gov/atcf/btk/"}
            self.data[stormid]['source'] = self.source

            # add empty lists
            for val in ['time', 'extra_obs', 'special', 'type', 'lat', 'lon', 'vmax', 'mslp', 'wmo_basin']:
                self.data[stormid][val] = []
            self.data[stormid]['ace'] = 0.0

            # Read in file
            if use_ftp:
                url = f"ftp://ftp.nhc.noaa.gov/atcf/btk/{file}"
            else:
                url = f"https://ftp.nhc.noaa.gov/atcf/btk/{file}"
            if int(stormid[4:8]) in archive_years:
                url = f"http://hurricanes.ral.ucar.edu/repository/data/bdecks_open/{int(stormid[4:8])}/b{stormid.lower()}.dat"
            content = read_url(url)

            # iterate through file lines
            for line in content:

                if len(line) < 28:
                    continue

                # Get time of obs
                time = dt.strptime(line[2], '%Y%m%d%H')
                date_hhmm = time.strftime('%H%M')
                if date_hhmm not in constants.STANDARD_HOURS:
                    continue

                # Ensure obs aren't being repeated
                if time in self.data[stormid]['time']:
                    continue

                # Get latitude into number
                btk_lat_temp = line[6].split("N")[0]
                btk_lat = float(btk_lat_temp) * 0.1

                # Get longitude into number
                if "W" in line[7]:
                    btk_lon_temp = line[7].split("W")[0]
                    btk_lon = float(btk_lon_temp) * -0.1
                elif "E" in line[7]:
                    btk_lon_temp = line[7].split("E")[0]
                    btk_lon = float(btk_lon_temp) * 0.1

                # Get other relevant variables
                btk_wind = int(line[8])
                btk_mslp = int(line[9])
                btk_type = line[10]
                name = line[27]

                # Replace with NaNs
                if btk_wind > 250 or btk_wind < 10:
                    btk_wind = np.nan
                if btk_mslp > 1040 or btk_mslp < 800:
                    btk_mslp = np.nan

                # Add extra obs
                self.data[stormid]['extra_obs'].append(0)

                # Append into dict
                self.data[stormid]['time'].append(time)
                self.data[stormid]['special'].append('')
                self.data[stormid]['type'].append(btk_type)
                self.data[stormid]['lat'].append(round(btk_lat, 1))
                self.data[stormid]['lon'].append(round(btk_lon, 1))
                self.data[stormid]['vmax'].append(btk_wind)
                self.data[stormid]['mslp'].append(btk_mslp)

                # Add basin
                if self.basin == 'both':
                    origin_basin = 'north_atlantic' if stormid[0:2] == 'AL' else 'east_pacific'
                else:
                    origin_basin = self.basin + ''
                    if self.basin == 'east_pacific':
                        check_basin = get_basin(
                            self.data[stormid]['lat'][0], self.data[stormid]['lon'][0], self.basin)
                        if check_basin != self.basin:
                            origin_basin = 'north_atlantic'
                self.data[stormid]['wmo_basin'].append(
                    get_basin(btk_lat, btk_lon, origin_basin))

                # Calculate ACE & append to storm total
                if not np.isnan(btk_wind):
                    ace = accumulated_cyclone_energy(btk_wind)
                    if btk_type in constants.NAMED_TROPICAL_STORM_TYPES:
                        self.data[stormid]['ace'] += np.round(ace, 4)

            # Fix name for unnamed storms
            if name.upper() == 'INVEST' and True in np.isin(self.data[stormid]['type'], list(constants.TROPICAL_STORM_TYPES)):
                name = 'UNNAMED'

            # Add storm name
            self.data[stormid]['name'] = name

        # Determine time elapsed
        time_elapsed = dt.now() - start_time
        tsec = str(round(time_elapsed.total_seconds(), 2))
        print(f"--> Completed reading in best track data ({tsec} seconds)")

    def __read_ibtracs(self):
        r"""
        Reads in ibtracs data into the Dataset object.
        """

        # Time duration to read in ibtracs
        start_time = dt.now()
        print("--> Starting to read in ibtracs data")

        # Quick error check
        ibtracs_online = True
        fcheck = "https://www.ncei.noaa.gov/data/international-best-track-archive-for-climate-stewardship-ibtracs/"
        if fcheck not in self.ibtracs_url:
            if "http" in self.ibtracs_url:
                raise RuntimeError("URL provided is not via NCEI")
            else:
                ibtracs_online = False

        # convert to ibtracs basin
        basin_convert = {'all': 'ALL',
                         'east_pacific': 'EP',
                         'north_atlantic': 'NA',
                         'north_indian': 'NI',
                         'south_atlantic': 'SA',
                         'south_indian': 'SI',
                         'south_pacific': 'SP',
                         'west_pacific': 'WP'}
        ibtracs_basin = basin_convert.get(self.basin, '')

        # read in ibtracs file
        if ibtracs_online:
            enter_basin = 'ALL' if self.basin == 'australia' else ibtracs_basin + ''
            path = self.ibtracs_url.replace("(basin)", enter_basin)
            content = read_url(path)
        else:
            f = open(self.ibtracs_url, "r")
            content = f.readlines()
            content = [(i.replace(" ", "")).split(",") for i in content]
            f.close()

        # Initialize empty dict for neumann data
        neumann = {}

        # ibtracs ID to jtwc ID mapping
        map_duplicate_id = {}
        map_all_id = {}
        map_id = {}

        # Blacklist of storm IDs to NOT merge
        do_not_merge = ['WP371996', 'WP391996']

        for line in content[2:]:

            if len(line) < 150:
                continue

            ibtracs_id, year, adv_number, basin, subbasin, name, time, wmo_type, wmo_lat, wmo_lon, wmo_vmax, wmo_mslp, agency, track_type, dist_land, dist_landfall, iflag, usa_agency, storm_id, lat, lon, special, storm_type, vmax, mslp = line[:25]
            time = dt.strptime(time, '%Y-%m-%d%H:%M:00')

            # Fix name to be consistent with HURDAT
            if name == 'NOT_NAMED':
                name = 'UNNAMED'
            if name[-1] == '-':
                name = name[:-1]

            # Hard code fix for faulty IBTrACS data
            if storm_id == 'EP121989' and name == 'HENRIETTE':
                storm_id = 'EP111989'
            if storm_id == 'WP391996':
                name = 'UNNAMED'

            # Add storm to list of keys
            if self.ibtracs_mode == 'wmo' and ibtracs_id not in self.data.keys():

                # add empty entry into dict
                self.data[ibtracs_id] = {'id': storm_id, 'operational_id': '', 'name': name,
                                         'year': time.year, 'season': int(year), 'basin': self.basin}
                self.data[ibtracs_id]['source'] = self.source
                self.data[ibtracs_id]['source_info'] = 'World Meteorological Organization (official)'
                self.data[ibtracs_id]['notes'] = "'vmax' = wind converted to 1-minute using the 0.88 conversion factor. 'vmax_orig' = original vmax as assessed by its respective WMO agency."

                # add empty lists
                for val in ['time', 'extra_obs', 'special', 'type', 'lat', 'lon', 'vmax', 'vmax_orig', 'mslp', 'wmo_basin']:
                    self.data[ibtracs_id][val] = []
                self.data[ibtracs_id]['ace'] = 0.0

            elif storm_id != '':

                # ID entry method to use
                use_id = storm_id

                # Hard code problematic early Atlantic IDs
                check_ids = ['AL041885', 'AL031870']
                if use_id in check_ids and use_id in self.data.keys() and ibtracs_id in map_all_id.keys() and map_all_id[ibtracs_id] != use_id:
                    map_all_id[ibtracs_id] = use_id

                if ibtracs_id not in map_all_id.keys():

                    # Check for East Pacific case (overwrite prev entry if data is from 1982 backwards)
                    east_pacific_case = False
                    if use_id in self.data.keys() and use_id[0:2] in ['EP', 'CP']:
                        if self.data[use_id]['year'] <= 1982:
                            east_pacific_case = True

                    # Check if this is an extension of an existing storm
                    if use_id in self.data.keys() and not east_pacific_case:

                        # Add to list of duplicate keys
                        if use_id not in map_duplicate_id:
                            flipped_dict = dict([(v, k) for k, v in map_all_id.items()])
                            map_duplicate_id[use_id] = [ibtracs_id]
                        elif ibtracs_id not in map_duplicate_id[use_id]:
                            map_duplicate_id[use_id].append(ibtracs_id)

                        # Add name if previous one is unnamed
                        if self.data[use_id]['name'] == 'UNNAMED' and name != 'UNNAMED':
                            self.data[use_id]['name'] = name

                    # Otherwise, add storm to entry
                    else:
                        map_all_id[ibtracs_id] = use_id

                        # Clear any previous entry
                        if use_id in self.data.keys():
                            del self.data[use_id]

                        # Add empty entry into dict
                        self.data[use_id] = {'id': storm_id, 'operational_id': '', 'name': name,
                                             'year': time.year, 'season': int(year), 'basin': self.basin}
                        self.data[use_id]['source'] = self.source
                        self.data[use_id]['source_info'] = 'Joint Typhoon Warning Center (unofficial)'
                        if self.neumann:
                            self.data[use_id]['source_info'] += ' & Charles Neumann reanalysis for South Hemisphere storms'

                        # add empty lists
                        for val in ['time', 'extra_obs', 'special', 'type', 'lat', 'lon', 'vmax', 'mslp',
                                    'wmo_type', 'wmo_lat', 'wmo_lon', 'wmo_vmax', 'wmo_mslp', 'wmo_basin']:
                            self.data[use_id][val] = []
                        self.data[use_id]['ace'] = 0.0

                # Case if IBTrACS ID already exists but a separate JTWC ID exists for it
                elif use_id not in self.data.keys():

                    # Check for potential conflict match
                    old_id = map_all_id[ibtracs_id]
                    check_id_new = use_id[0:2] + use_id[4:]
                    check_id_old = old_id[0:2] + old_id[4:]
                    if check_id_new == check_id_old and use_id != old_id:

                        # Hard code fix for certain storms, and for early Atlantic entries
                        check_keys = ['IO022018']
                        if use_id in check_keys or (use_id[0:2] == old_id[0:2] and use_id[0:2] == 'AL'):

                            # Clear any previous entry
                            if use_id in self.data.keys():
                                del self.data[use_id]

                            # Add empty entry into dict
                            map_all_id[ibtracs_id] = use_id
                            self.data[use_id] = {'id': storm_id, 'operational_id': '', 'name': name,
                                                 'year': time.year, 'season': int(year), 'basin': self.basin}
                            self.data[use_id]['source'] = self.source
                            self.data[use_id]['source_info'] = 'Joint Typhoon Warning Center (unofficial)'
                            if self.neumann:
                                self.data[use_id]['source_info'] += ' & Charles Neumann reanalysis for South Hemisphere storms'

                            # add empty lists
                            for val in ['time', 'extra_obs', 'special', 'type', 'lat', 'lon', 'vmax', 'mslp',
                                        'wmo_type', 'wmo_lat', 'wmo_lon', 'wmo_vmax', 'wmo_mslp', 'wmo_basin']:
                                self.data[use_id][val] = []
                            self.data[use_id]['ace'] = 0.0

                            # Special name fix for certain storms
                            if use_id == 'IO022018':
                                self.data['IO012018']['name'] = 'SAGAR'
                                self.data['IO022018']['name'] = 'MEKUNU'

            # Get neumann data for storms containing it
            if self.neumann:
                neumann_lat, neumann_lon, neumann_type, neumann_vmax, neumann_mslp = line[141:146]
                if neumann_lat != "" and neumann_lon != "":

                    # Add storm to list of keys
                    if ibtracs_id not in neumann.keys():
                        neumann[ibtracs_id] = {'id': storm_id, 'operational_id': '', 'name': name,
                                               'year': time.year, 'season': int(year), 'basin': self.basin}
                        neumann[ibtracs_id]['source'] = self.source
                        neumann[ibtracs_id]['source_info'] = 'Joint Typhoon Warning Center (unofficial) & Charles Neumann reanalysis for South Hemisphere storms'
                        for val in ['time', 'extra_obs', 'special', 'type', 'lat', 'lon', 'vmax', 'mslp', 'wmo_basin']:
                            neumann[ibtracs_id][val] = []
                        neumann[ibtracs_id]['ace'] = 0.0

                    # Retrieve data
                    neumann_time = time + timedelta(hours=0)
                    neumann_lat = float(wmo_lat)
                    neumann_lon = float(wmo_lon)
                    neumann_vmax = np.nan if neumann_vmax == "" else int(neumann_vmax)
                    neumann_mslp = np.nan if neumann_mslp == "" else int(neumann_mslp)
                    if not np.isnan(neumann_vmax):
                        if str(neumann_vmax)[-1] in ['4', '9']:
                            neumann_vmax += 1
                        if str(neumann_vmax)[-1] in ['1', '6']:
                            neumann_vmax = neumann_vmax - 1

                    # Edit basin
                    basin_reverse = {v: k for k, v in basin_convert.items()}
                    wmo_basin = basin_reverse.get(basin, '')
                    if subbasin in ['WA', 'EA']:
                        wmo_basin = 'australia'
                    neumann[ibtracs_id]['wmo_basin'].append(wmo_basin)

                    if neumann_type == 'TC':
                        if neumann_vmax < 34:
                            neumann_type = 'TD'
                        elif neumann_vmax < 64:
                            neumann_type = 'TS'
                        elif wmo_basin in constants.NHC_BASINS:
                            neumann_type = 'HU'
                        elif neumann_vmax < 130:
                            neumann_type = 'TY'
                        else:
                            neumann_type = 'ST'
                    elif neumann_type == 'MM' or neumann_type == '':
                        neumann_type = 'LO'

                    neumann[ibtracs_id]['time'].append(neumann_time)
                    neumann[ibtracs_id]['special'].append(special)
                    neumann[ibtracs_id]['type'].append(neumann_type)
                    neumann[ibtracs_id]['lat'].append(neumann_lat)
                    neumann[ibtracs_id]['lon'].append(neumann_lon)
                    neumann[ibtracs_id]['vmax'].append(neumann_vmax)
                    neumann[ibtracs_id]['mslp'].append(neumann_mslp)
                    hhmm = neumann_time.strftime('%H%M')
                    if hhmm in constants.STANDARD_HOURS:
                        neumann[ibtracs_id]['extra_obs'].append(0)
                    else:
                        neumann[ibtracs_id]['extra_obs'].append(1)

                    # Calculate ACE & append to storm total
                    if not np.isnan(neumann_vmax):
                        ace = accumulated_cyclone_energy(neumann_vmax)
                        if hhmm in constants.STANDARD_HOURS and neumann_type in constants.NAMED_TROPICAL_STORM_TYPES and not np.isnan(ace):
                            neumann[ibtracs_id]['ace'] += np.round(ace, 4)

            # Skip missing entries
            if self.ibtracs_mode == 'wmo':
                if wmo_lat == "" or wmo_lon == "":
                    continue
                if agency == "":
                    continue
            else:
                if lat == "" or lon == "":
                    continue
                if usa_agency == "" and track_type != "PROVISIONAL":
                    continue

            # map JTWC to ibtracs ID (for neumann replacement)
            if self.neumann:
                if ibtracs_id not in map_id.keys():
                    map_id[ibtracs_id] = storm_id

            # Handle WMO mode
            if self.ibtracs_mode == 'wmo':

                # Retrieve data
                dist_land = int(dist_land)

                # Properly format WMO variables
                wmo_lat = float(wmo_lat)
                wmo_lon = float(wmo_lon)
                wmo_vmax = np.nan if wmo_vmax == "" else int(wmo_vmax)
                wmo_mslp = np.nan if wmo_mslp == "" else int(wmo_mslp)

                # Edit basin
                basin_reverse = {v: k for k, v in basin_convert.items()}
                wmo_basin = basin_reverse.get(basin, '')
                if subbasin in ['WA', 'EA']:
                    wmo_basin = 'australia'
                self.data[ibtracs_id]['wmo_basin'].append(wmo_basin)

                # Account for wind discrepancy
                if wmo_basin not in ['north_atlantic', 'east_pacific'] and not np.isnan(wmo_vmax):
                    jtwc_vmax = int(wmo_vmax / 0.88)
                else:
                    if not np.isnan(wmo_vmax):
                        jtwc_vmax = int(wmo_vmax + 0.0)
                    else:
                        jtwc_vmax = np.nan
                if not np.isnan(jtwc_vmax):
                    if str(jtwc_vmax)[-1] in ['4', '9']:
                        jtwc_vmax += 1
                    if str(jtwc_vmax)[-1] in ['1', '6']:
                        jtwc_vmax = jtwc_vmax - 1

                # Convert storm type from ibtracs to hurdat style
                """
                DS - Disturbance
                TS - Tropical
                ET - Extratropical
                SS - Subtropical
                NR - Not reported
                MX - Mixture (contradicting nature reports from different agencies)
                """
                if wmo_type == "DS":
                    storm_type = "LO"
                elif wmo_type == "TS":
                    if np.isnan(jtwc_vmax):
                        storm_type = 'LO'
                    elif jtwc_vmax < 34:
                        storm_type = 'TD'
                    elif jtwc_vmax < 64:
                        storm_type = 'TS'
                    elif wmo_basin in constants.NHC_BASINS:
                        storm_type = 'HU'
                    elif jtwc_vmax < 130:
                        storm_type = 'TY'
                    else:
                        storm_type = 'ST'
                elif wmo_type == 'SS':
                    if np.isnan(jtwc_vmax):
                        storm_type = 'LO'
                    elif jtwc_vmax < 34:
                        storm_type = 'SD'
                    else:
                        storm_type = 'SS'
                elif wmo_type in ['ET', 'MX']:
                    storm_type = 'EX'
                else:
                    storm_type = 'LO'

                # Handle missing data
                if wmo_vmax < 0:
                    wmo_vmax = np.nan
                if wmo_mslp < 800:
                    wmo_mslp = np.nan

                self.data[ibtracs_id]['time'].append(time)
                self.data[ibtracs_id]['special'].append(special)
                self.data[ibtracs_id]['type'].append(storm_type)
                self.data[ibtracs_id]['lat'].append(wmo_lat)
                self.data[ibtracs_id]['lon'].append(wmo_lon)
                self.data[ibtracs_id]['vmax'].append(jtwc_vmax)
                self.data[ibtracs_id]['vmax_orig'].append(wmo_vmax)
                self.data[ibtracs_id]['mslp'].append(wmo_mslp)

                hhmm = time.strftime('%H%M')
                if hhmm in constants.STANDARD_HOURS:
                    self.data[ibtracs_id]['extra_obs'].append(0)
                else:
                    self.data[ibtracs_id]['extra_obs'].append(1)

                # Calculate ACE & append to storm total
                if not np.isnan(jtwc_vmax):
                    ace = accumulated_cyclone_energy(jtwc_vmax)
                    if hhmm in constants.STANDARD_HOURS and storm_type in constants.NAMED_TROPICAL_STORM_TYPES and not np.isnan(ace):
                        self.data[ibtracs_id]['ace'] += np.round(ace, 4)

            # Handle non-WMO mode
            else:

                if storm_id not in map_duplicate_id.keys() and storm_id not in do_not_merge:
                    orig_storm_id = storm_id + ''
                    storm_id = map_all_id.get(ibtracs_id, None)
                if storm_id == '' or storm_id is None:
                    continue

                # Retrieve data
                dist_land = int(dist_land)

                # Properly format WMO variables
                wmo_lat = float(wmo_lat)
                wmo_lon = float(wmo_lon)
                wmo_vmax = np.nan if wmo_vmax == "" else int(wmo_vmax)
                wmo_mslp = np.nan if wmo_mslp == "" else int(wmo_mslp)

                # Properly format hurdat-style variables
                lat = float(lat)
                lon = float(lon)
                vmax = np.nan if vmax == "" else int(vmax)
                mslp = np.nan if mslp == "" else int(mslp)
                if not np.isnan(vmax):
                    if str(vmax)[-1] in ['4', '9']:
                        vmax += 1
                    if str(vmax)[-1] in ['1', '6']:
                        vmax = vmax - 1

                # Avoid duplicate entries
                if time in self.data[storm_id]['time']:
                    continue

                # Edit basin
                basin_reverse = {v: k for k, v in basin_convert.items()}
                wmo_basin = basin_reverse.get(basin, '')
                if subbasin in ['WA', 'EA']:
                    wmo_basin = 'australia'
                if storm_id == 'AL041932':
                    wmo_basin = 'north_atlantic'
                self.data[storm_id]['wmo_basin'].append(wmo_basin)

                # Convert storm type from ibtracs to hurdat style
                if storm_type == "":
                    if wmo_type == 'TS':
                        if vmax < 34:
                            storm_type = 'TD'
                        elif vmax < 64:
                            storm_type = 'TS'
                        elif wmo_basin in constants.NHC_BASINS:
                            storm_type = 'HU'
                        elif vmax < 130:
                            storm_type = 'TY'
                        else:
                            storm_type = 'ST'
                    elif wmo_type == 'SS':
                        if vmax < 34:
                            storm_type = 'SD'
                        else:
                            storm_type = 'SS'
                    elif wmo_type in ['ET', 'MX']:
                        storm_type = 'EX'
                elif storm_type == 'DS':
                    storm_type = 'LO'
                else:
                    if np.isnan(vmax):
                        storm_type = 'LO'
                    elif vmax < 34:
                        storm_type = 'TD'
                    elif vmax < 64:
                        storm_type = 'TS'
                    elif wmo_basin in constants.NHC_BASINS:
                        storm_type = 'HU'
                    elif vmax < 130:
                        storm_type = 'TY'
                    else:
                        storm_type = 'ST'

                # Handle missing data
                if vmax < 0:
                    vmax = np.nan
                if mslp < 800:
                    mslp = np.nan

                self.data[storm_id]['time'].append(time)
                self.data[storm_id]['special'].append(special)
                self.data[storm_id]['wmo_type'].append(wmo_type)
                self.data[storm_id]['wmo_lat'].append(wmo_lat)
                self.data[storm_id]['wmo_lon'].append(wmo_lon)
                self.data[storm_id]['wmo_vmax'].append(wmo_vmax)
                self.data[storm_id]['wmo_mslp'].append(wmo_mslp)
                self.data[storm_id]['type'].append(storm_type)
                self.data[storm_id]['lat'].append(lat)
                self.data[storm_id]['lon'].append(lon)
                self.data[storm_id]['vmax'].append(vmax)
                self.data[storm_id]['mslp'].append(mslp)

                hhmm = time.strftime('%H%M')
                if hhmm in constants.STANDARD_HOURS:
                    self.data[storm_id]['extra_obs'].append(0)
                else:
                    self.data[storm_id]['extra_obs'].append(1)

                # Calculate ACE & append to storm total
                if not np.isnan(vmax):
                    ace = accumulated_cyclone_energy(vmax)
                    if hhmm in constants.STANDARD_HOURS and storm_type in constants.NAMED_TROPICAL_STORM_TYPES and not np.isnan(ace):
                        self.data[storm_id]['ace'] += np.round(ace, 4)

        # Remove empty entries
        all_keys = [k for k in self.data.keys()]
        for key in all_keys:
            if len(self.data[key]['lat']) == 0:
                del (self.data[key])

        # Replace neumann entries
        if self.neumann:

            # iterate through every neumann entry
            for key in neumann.keys():

                # get corresponding JTWC ID
                jtwc_id = map_id.get(key, '')
                if jtwc_id == '':
                    continue

                # plug dict entry
                old_entry = self.data[jtwc_id]
                self.data[jtwc_id] = neumann[key]

                # replace id
                self.data[jtwc_id]['id'] = jtwc_id

        # Remove entries where requested basin doesn't appear
        if self.basin not in ['all', 'both']:
            all_keys = [k for k in self.data.keys()]
            for key in all_keys:
                if self.basin not in self.data[key]['wmo_basin']:
                    del (self.data[key])

        # Fix cyclone Catarina, if specified & requested
        all_keys = [k for k in self.data.keys()]
        if '2004086S29318' in all_keys and self.catarina:
            self.data['2004086S29318'] = cyclone_catarina()
        elif 'AL502004' in all_keys and self.catarina:
            self.data['AL502004'] = cyclone_catarina()

        # Sort data temporally
        for key in self.data.keys():
            storm_times = self.data[key]['time']
            for key2 in self.data[key].keys():
                if isinstance(self.data[key][key2], list):
                    self.data[key][key2] = [x for _, x in sorted(
                        zip(storm_times, self.data[key][key2]))]

        # Determine time elapsed
        time_elapsed = dt.now() - start_time
        tsec = str(round(time_elapsed.total_seconds(), 2))
        print(f"--> Completed reading in ibtracs data ({tsec} seconds)")

    def __interpolate_storms(self, keys):
        r"""
        Interpolates storm data temporally to hourly. This is done for every provided key, and is stored in a separate internal dict.

        Parameters
        ----------
        keys : list
            List of keys to be interpolated to hourly data.
        """

        # Check if operation needs to be performed
        count = 0
        for key in keys:
            if key not in self.data_interp.keys():
                count += 1
        if count == 0:
            return

        start_time = dt.now()
        print("--> Starting to interpolate storms")

        for key in keys:
            if key not in self.data_interp.keys():
                self.data_interp[key] = interp_storm(
                    self.data[key].copy(), hours=1, dt_window=24, dt_align='middle')

        # Determine time elapsed
        time_elapsed = dt.now() - start_time
        tsec = str(round(time_elapsed.total_seconds(), 2))
        print(f"--> Completed interpolating storms ({tsec} seconds)")
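
    # Illustrative usage sketch (not part of the library source): constructing
    # a TrackDataset drives the private readers above. The basin and mode
    # values shown are examples drawn from the class docstring.
    #
    #     from tropycal import tracks
    #     basin = tracks.TrackDataset(basin='north_atlantic',
    #                                 source='hurdat', include_btk=True)
    #     wpac = tracks.TrackDataset(basin='west_pacific',
    #                                source='ibtracs', ibtracs_mode='jtwc')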

    def get_storm_id(self, storm):
        r"""
        Returns the storm ID (e.g., "AL012019") given the storm name and year.

        Parameters
        ----------
        storm : tuple
            Tuple containing the storm name and year (e.g., ("Matthew",2016)).

        Returns
        -------
        str or list
            If a single storm was found, returns a string containing its ID. Otherwise returns a list of matching IDs.
        """

        # Error check
        if not isinstance(storm, tuple):
            raise TypeError("storm must be of type tuple.")
        if len(storm) != 2:
            raise ValueError(
                "storm must contain 2 elements, name (str) and year (int)")
        name, year = storm

        # Search for corresponding entry in keys
        keys_use = []
        for key in self.keys:
            temp_year = self.data[key]['year']
            if temp_year == year:
                temp_name = self.data[key]['name']
                if temp_name == name.upper():
                    keys_use.append(key)

        # return key, or list of keys
        if len(keys_use) == 1:
            keys_use = keys_use[0]
        if len(keys_use) == 0:
            raise RuntimeError("Storm not found")
        return keys_use
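
    # Usage sketch for get_storm_id(), assuming a dataset "basin" built as in
    # the class docstring; the name is matched case-insensitively. The ID
    # shown is the HURDAT2 ID expected for Michael (2018).
    #
    #     storm_id = basin.get_storm_id(('michael', 2018))  # -> 'AL142018'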

    def get_storm_tuple(self, storm):
        r"""
        Returns the storm tuple (e.g., ("Dorian",2019)) given the storm ID.

        Parameters
        ----------
        storm : str
            String containing the storm ID (e.g., "AL052019").

        Returns
        -------
        tuple
            Tuple containing the storm name and year.
        """

        # Error check
        if not isinstance(storm, str):
            raise TypeError("storm must be of type string.")

        try:
            name = self.data[storm]['name']
            year = self.data[storm]['year']
        except KeyError:
            raise RuntimeError("Storm not found")
        return (name, year)
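
    # Usage sketch for get_storm_tuple(), the inverse of get_storm_id().
    # Names are stored uppercase, as in HURDAT2.
    #
    #     basin.get_storm_tuple('AL052019')  # -> ('DORIAN', 2019)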

    def get_storm(self, storm):
        r"""
        Retrieves a Storm object for the requested storm.

        Parameters
        ----------
        storm : str or tuple
            Requested storm. Can be either string of storm ID (e.g., "AL052019"), or tuple with storm name and year (e.g., ("Matthew",2016)).

        Returns
        -------
        tropycal.tracks.Storm
            Object containing information about the requested storm, and methods for analyzing and plotting the storm.
        """

        # Check if storm is str or tuple
        if isinstance(storm, str):
            key = storm
        elif isinstance(storm, tuple):
            key = self.get_storm_id((storm[0], storm[1]))
        else:
            raise RuntimeError(
                "Storm must be a string (e.g., 'AL052019') or tuple (e.g., ('Matthew',2016)).")

        # Retrieve key of given storm
        if isinstance(key, str):

            # Check to see if tornado data exists for this storm
            if np.max(self.keys_tors) == 1:
                if key in self.data_tors.keys():
                    return Storm(self.data[key],
                                 {'data': self.data_tors[key], 'dist_thresh': self.tornado_dist_thresh})
                else:
                    return Storm(self.data[key])
            else:
                return Storm(self.data[key])
        else:
            error_message = ''.join([f"\n{i}" for i in key])
            error_message = f"Multiple IDs were identified for the requested storm. Choose one of the following storm IDs and provide it as the 'storm' argument instead of a tuple:{error_message}"
            raise RuntimeError(error_message)
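
    # Usage sketch for get_storm(), mirroring the class docstring example.
    # Both accepted forms return the same Storm object, whose attributes
    # mirror the underlying data dict keys.
    #
    #     storm = basin.get_storm('AL052019')
    #     storm = basin.get_storm(('matthew', 2016))
    #     print(storm.dict['vmax'])  # sustained winds (knots) per observation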

    def plot_storm(self, storm, domain="dynamic", plot_all_dots=False, ax=None, cartopy_proj=None, save_path=None, **kwargs):
        r"""
        Creates a plot of a single storm.

        Parameters
        ----------
        storm : str, tuple or dict
            Requested storm. Can be either string of storm ID (e.g., "AL052019"), tuple with storm name and year (e.g., ("Matthew",2016)), or a dict entry.
        domain : str
            Domain for the plot. Default is "dynamic". "dynamic_tropical" is also available. Please refer to :ref:`options-domain` for available domain options.
        plot_all_dots : bool
            Whether to plot dots for all observations along the track. If False, dots will be plotted every 6 hours. Default is False.
        ax : axes
            Instance of axes to plot on. If none, one will be generated. Default is none.
        cartopy_proj : ccrs
            Instance of a cartopy projection to use. If none, one will be generated. Default is none.
        save_path : str
            Relative or full path of directory to save the image in. If none, image will not be saved.

        Other Parameters
        ----------------
        prop : dict
            Customization properties of storm track lines. Please refer to :ref:`options-prop` for available options.
        map_prop : dict
            Customization properties of Cartopy map. Please refer to :ref:`options-map-prop` for available options.

        Returns
        -------
        ax
            Instance of axes containing the plot is returned.
        """

        # Retrieve kwargs
        prop = kwargs.pop('prop', {})
        map_prop = kwargs.pop('map_prop', {})

        # Retrieve requested storm
        if not isinstance(storm, dict):
            storm_dict = self.get_storm(storm).dict
        else:
            storm_dict = storm

        # Create instance of plot object
        try:
            self.plot_obj
        except AttributeError:
            self.plot_obj = TrackPlot()

        # Create cartopy projection
        if cartopy_proj is not None:
            self.plot_obj.proj = cartopy_proj
        elif max(storm_dict['lon']) > 150 or min(storm_dict['lon']) < -150:
            self.plot_obj.create_cartopy(
                proj='PlateCarree', central_longitude=180.0)
        else:
            self.plot_obj.create_cartopy(
                proj='PlateCarree', central_longitude=0.0)

        # Plot storm
        plot_ax = self.plot_obj.plot_storms(
            [storm_dict], domain, plot_all_dots=plot_all_dots, ax=ax, save_path=save_path, prop=prop, map_prop=map_prop)

        # Return axis
        return plot_ax
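
    # Usage sketch for plot_storm(). The 'dots' prop key shown here is an
    # assumption; consult :ref:`options-prop` for the supported keys.
    #
    #     ax = basin.plot_storm(('katrina', 2005), domain='dynamic',
    #                           prop={'dots': False})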

    def plot_storms(self, storms, domain="dynamic", title="TC Track Composite", plot_all_dots=False, ax=None, cartopy_proj=None, save_path=None, **kwargs):
        r"""
        Creates a plot of multiple storms.

        Parameters
        ----------
        storms : list
            List of requested storms. List can contain either strings of storm ID (e.g., "AL052019"), tuples with storm name and year (e.g., ("Matthew",2016)), or dict entries.
        domain : str
            Domain for the plot. Default is "dynamic". Please refer to :ref:`options-domain` for available domain options.
        title : str
            Title string to display on the plot. Default is "TC Track Composite".
        plot_all_dots : bool
            Whether to plot dots for all observations along the track. If False, dots will be plotted every 6 hours. Default is False.
        ax : axes
            Instance of axes to plot on. If none, one will be generated. Default is none.
        cartopy_proj : ccrs
            Instance of a cartopy projection to use. If none, one will be generated. Default is none.
        save_path : str
            Relative or full path of directory to save the image in. If none, image will not be saved.

        Other Parameters
        ----------------
        prop : dict
            Customization properties of storm track lines. Please refer to :ref:`options-prop` for available options.
        map_prop : dict
            Customization properties of Cartopy map. Please refer to :ref:`options-map-prop` for available options.

        Returns
        -------
        ax
            Instance of axes containing the plot is returned.
        """

        # Retrieve kwargs
        prop = kwargs.pop('prop', {})
        map_prop = kwargs.pop('map_prop', {})

        # Create instance of plot object
        try:
            self.plot_obj
        except AttributeError:
            self.plot_obj = TrackPlot()

        # Identify plot domain for all requested storms
        max_lon = -9999
        min_lon = 9999
        storm_dicts = []
        for storm in storms:

            # Retrieve requested storm
            if not isinstance(storm, dict):
                storm_dict = self.get_storm(storm).dict
            else:
                storm_dict = storm
            storm_dicts.append(storm_dict)

            # Add to array of max/min lat/lons
            if max(storm_dict['lon']) > max_lon:
                max_lon = max(storm_dict['lon'])
            if min(storm_dict['lon']) < min_lon:
                min_lon = min(storm_dict['lon'])

        # Create cartopy projection
        if cartopy_proj is not None:
            self.plot_obj.proj = cartopy_proj
        elif max_lon > 150 or min_lon < -150:
            self.plot_obj.create_cartopy(
                proj='PlateCarree', central_longitude=180.0)
        else:
            self.plot_obj.create_cartopy(
                proj='PlateCarree', central_longitude=0.0)

        # Plot storms
        plot_ax = self.plot_obj.plot_storms(
            storm_dicts, domain, title, plot_all_dots, ax=ax, save_path=save_path, prop=prop, map_prop=map_prop)

        # Return axis
        return plot_ax
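
    # Usage sketch for plot_storms(), mixing the accepted storm formats:
    #
    #     ax = basin.plot_storms([('katrina', 2005), 'AL092017'],
    #                            title='Selected Storms')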

    def plot_season(self, year, domain=None, ax=None, cartopy_proj=None, save_path=None, **kwargs):
        r"""
        Creates a plot of a single season.

        Parameters
        ----------
        year : int
            Year to retrieve season data. If in southern hemisphere, year is the 2nd year of the season (e.g., 1975 for 1974-1975).
        domain : str
            Domain for the plot. Default is basin-wide. Please refer to :ref:`options-domain` for available domain options.
        ax : axes
            Instance of axes to plot on. If none, one will be generated. Default is none.
        cartopy_proj : ccrs
            Instance of a cartopy projection to use. If none, one will be generated. Default is none.
        save_path : str
            Relative or full path of directory to save the image in. If none, image will not be saved.

        Other Parameters
        ----------------
        prop : dict
            Customization properties of storm track lines. Please refer to :ref:`options-prop` for available options.
        map_prop : dict
            Customization properties of Cartopy map. Please refer to :ref:`options-map-prop` for available options.

        Returns
        -------
        ax
            Instance of axes containing the plot is returned.
        """

        # Retrieve kwargs
        prop = kwargs.pop('prop', {})
        map_prop = kwargs.pop('map_prop', {})

        # Retrieve season object
        season = self.get_season(year)

        # Create instance of plot object
        try:
            self.plot_obj
        except AttributeError:
            self.plot_obj = TrackPlot()

        # Create cartopy projection
        if cartopy_proj is not None:
            self.plot_obj.proj = cartopy_proj
        elif season.basin in ['east_pacific', 'west_pacific', 'south_pacific', 'australia', 'all']:
            self.plot_obj.create_cartopy(
                proj='PlateCarree', central_longitude=180.0)
        else:
            self.plot_obj.create_cartopy(
                proj='PlateCarree', central_longitude=0.0)

        # Plot season
        plot_ax = self.plot_obj.plot_season(
            season, domain, ax=ax, save_path=save_path, prop=prop, map_prop=map_prop)

        # Return axis
        return plot_ax
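
    # Usage sketch for plot_season():
    #
    #     ax = basin.plot_season(2017)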

    def search_name(self, name):
        r"""
        Searches for hurricane seasons containing a storm of the requested name.

        Parameters
        ----------
        name : str
            Name to search through the dataset for.

        Returns
        -------
        list
            List containing the hurricane seasons where a storm of the requested name was found.
        """

        # get years for all storms matching the requested name
        years = [self.data[key]['year']
                 for key in self.keys if self.data[key]['name'] == name.upper()]
        return years
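
    # Usage sketch for search_name(); names repeat across seasons until
    # retired, so multiple years may be returned.
    #
    #     seasons = basin.search_name('Danny')  # list of matching seasons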

    def download_tcr(self, storm, save_path=""):
        r"""
        Downloads the NHC official Tropical Cyclone Report (TCR) for the requested storm to the requested directory. Available only for storms with advisories issued by the National Hurricane Center.

        Parameters
        ----------
        storm : str, tuple or dict
            Requested storm. Can be either string of storm ID (e.g., "AL052019"), tuple with storm name and year (e.g., ("Matthew",2016)), or a dict entry.
        save_path : str
            Path of directory to download the TCR into. Default is current working directory.
        """

        # Retrieve requested storm
        if not isinstance(storm, dict):
            storm_dict = self.get_storm(storm)
        else:
            storm_dict = self.get_storm(storm['id'])

        # Error check
        if self.source != "hurdat":
            msg = "NHC data can only be accessed when HURDAT is used as the data source."
            raise RuntimeError(msg)
        if storm_dict.year < 1995:
            msg = "Tropical Cyclone Reports are unavailable prior to 1995."
            raise RuntimeError(msg)
        if not isinstance(save_path, str):
            msg = "'save_path' must be of type str."
            raise TypeError(msg)

        # Format URL
        storm_id = storm_dict.dict['id'].upper()
        storm_name = storm_dict.dict['name'].title()
        url = f"https://www.nhc.noaa.gov/data/tcr/{storm_id}_{storm_name}.pdf"

        # Check to make sure PDF is available
        request = requests.get(url)
        if request.status_code != 200:
            msg = "This tropical cyclone does not have a Tropical Cyclone Report (TCR) available."
            raise RuntimeError(msg)

        # Retrieve PDF
        full_path = os.path.join(save_path, f"TCR_{storm_id}_{storm_name}.pdf")
        with open(full_path, 'wb') as f:
            f.write(request.content)
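
    # Usage sketch for download_tcr(); requires source='hurdat' and a
    # 1995-or-later storm with a published report. The save path is an
    # example only.
    #
    #     basin.download_tcr(('michael', 2018), save_path='/tmp')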

    def __retrieve_season(self, year, basin):

        # Initialize dict to be populated
        season_dict = {}

        # Search for corresponding entry in keys
        basin_list = []
        for key in self.keys:

            # Get year for 'all' (global data), otherwise get season
            if self.basin == 'all' and basin == 'all':
                temp_year = int(year) if int(year) in [
                    i.year for i in self.data[key]['time']] else 0
            else:
                temp_year = self.data[key]['season']

            # Proceed if year/season is a match
            if temp_year == int(year):
                temp_basin = self.data[key]['basin']
                temp_wmo_basin = self.data[key]['wmo_basin']
                if temp_basin == 'all':
                    if basin == 'all':
                        season_dict[key] = self.data[key]
                        basin_list.append('all')
                    elif basin in temp_wmo_basin:
                        season_dict[key] = self.data[key]
                        basin_list.append(self.data[key]['wmo_basin'][0])
                else:
                    season_dict[key] = self.data[key]
                    basin_list.append(self.data[key]['wmo_basin'][0])

        # Error check
        if len(season_dict) == 0:
            raise RuntimeError(
                "No storms were identified for the given year in the given basin.")

        # Add attributes
        first_key = [k for k in season_dict.keys()][0]
        season_info = {}
        season_info['year'] = year
        season_info['basin'] = max(set(basin_list), key=basin_list.count)
        season_info['source_basin'] = season_dict[first_key]['basin']
        season_info['source'] = season_dict[first_key]['source']
        season_info['source_info'] = season_dict[first_key]['source_info']

        # Fix basin
        if self.basin == 'all' and basin == 'all':
            season_info['basin'] = 'all'
        if self.basin == 'both':
            season_info['basin'] = 'both'

        # Return object
        return Season(season_dict, season_info)

    def get_season(self, year, basin='all'):
        r"""
        Retrieves a Season object for the requested season or seasons.

        Parameters
        ----------
        year : int or list
            Year(s) to retrieve season data. If in southern hemisphere, year is the 2nd year of the season (e.g., 1975 for 1974-1975). Use of multiple years is only permissible for hurdat sources.
        basin : str, optional
            If using a global ibtracs dataset, this specifies which basin to load in. Otherwise this argument is ignored.

        Returns
        -------
        tropycal.tracks.Season
            Object containing every storm entry for the given season, and methods for analyzing and plotting the season.
        """

        # Error checks
        if not is_number(year) and not isinstance(year, list):
            msg = "'year' must be of type int or list."
            raise TypeError(msg)
        if isinstance(year, list):
            for i in year:
                if not is_number(i):
                    msg = "Elements of list 'year' must be of type int."
                    raise TypeError(msg)

        # Retrieve season object(s)
        if is_number(year):
            return self.__retrieve_season(year, basin)
        else:
            return_season = self.__retrieve_season(year[0], basin)
            for i_year in year[1:]:
                return_season = return_season + self.__retrieve_season(i_year, basin)
            return return_season
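
    # Usage sketch for get_season(); passing a list of years concatenates the
    # resulting Season objects (per the docstring, hurdat sources only).
    #
    #     season = basin.get_season(2017)
    #     multi = basin.get_season([2015, 2016, 2017])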
[docs]    def ace_climo(self, plot_year=None, compare_years=None, climo_bounds=None, month_range=None, rolling_sum=0, return_dict=False, save_path=None):
        r"""
        Creates and plots a climatology of accumulated cyclone energy (ACE).

        Parameters
        ----------
        plot_year : int
            Year to highlight. If the current year, the plot will be drawn through today. If None, no year will be highlighted.
        compare_years : int or list
            Seasons to compare against. Can be either a single season (int), or a range or list of seasons (list).
        climo_bounds : tuple
            Start and end years to compute the climatology over. Default is from 1950 through last year.
        month_range : tuple
            Start and end months to plot (e.g., ``(5,10)``). Default is peak hurricane season by basin.
        rolling_sum : int
            Number of days to calculate a rolling sum over. Default is 0 (i.e., an annual running sum).
        return_dict : bool
            If False (default), the plot axes will be returned. If True, a dictionary containing the raw data is returned.
        save_path : str
            File path to save the image to. If blank or None, the image will be directly shown.

        Returns
        -------
        axes or dict
            By default, the plot axes are returned. If return_dict is True, a dictionary containing the raw ACE climatology data is returned.

        Notes
        -----
        If in the southern hemisphere, the year is the 2nd year of the season (e.g., 1975 for the 1974-1975 season).
        """

        # Retrieve current year
        cur_year = dt.now().year
        if climo_bounds is None:
            climo_bounds = (1950, dt.now().year - 1)

        # Create empty dict
        ace = {}

        # Iterate over every year of HURDAT available
        end_year = self.data[self.keys[-1]]['year']
        years = [yr for yr in range(1851, cur_year + 1) if (min(climo_bounds) <= yr <= max(climo_bounds)) or yr == plot_year]
        for year in years:

            # Get info for this year
            if self.basin == 'all':
                storm_ids = [key for key in self.data.keys() if year in [i.year for i in self.data[key]['time']]]
            else:
                try:
                    season = self.get_season(year)
                    storm_ids = season.summary()['id']
                except:
                    continue

            # Generate list of dates for this year
            if self.basin in constants.SOUTH_HEMISPHERE_BASINS:
                year_dates = np.array([dt.strptime(((pd.to_datetime(i)).strftime('%Y%m%d%H')), '%Y%m%d%H')
                                       for i in np.arange(dt(year - 1, 7, 1), dt(year, 7, 1), timedelta(hours=6))])
            else:
                year_dates = np.array([dt.strptime(((pd.to_datetime(i)).strftime('%Y%m%d%H')), '%Y%m%d%H')
                                       for i in np.arange(dt(year, 1, 1), dt(year + 1, 1, 1), timedelta(hours=6))])

            # Remove 2/29 from dates
            if calendar.isleap(year):
                year_dates = year_dates[year_dates != dt(year, 2, 29, 0)]
                year_dates = year_dates[year_dates != dt(year, 2, 29, 6)]
                year_dates = year_dates[year_dates != dt(year, 2, 29, 12)]
                year_dates = year_dates[year_dates != dt(year, 2, 29, 18)]

            # Additional empty arrays
            year_timestep_ace = np.zeros((year_dates.shape))
            year_genesis = []

            # Get list of storms for this year
            for storm in storm_ids:

                # Get HURDAT data for this storm
                storm_data = self.data[storm]
                storm_date_y = np.array([int(i.strftime('%Y')) for i in storm_data['time']])
                storm_date_h = np.array([i.strftime('%H%M') for i in storm_data['time']])
                storm_date_m = [i.strftime('%m%d') for i in storm_data['time']]
                storm_date = np.array(storm_data['time'])
                storm_type = np.array(storm_data['type'])
                storm_vmax = np.array(storm_data['vmax'])
                storm_basin = np.array(storm_data['wmo_basin'])

                # Subset to remove obs not useful for ACE
                idx1 = ((storm_type == 'SS') | (storm_type == 'TS') | (storm_type == 'HU') | (storm_type == 'TY') | (storm_type == 'ST'))
                idx2 = ~np.isnan(storm_vmax)
                idx3 = ((storm_date_h == '0000') | (storm_date_h == '0600') | (storm_date_h == '1200') | (storm_date_h == '1800'))
                idx4 = storm_date_y == year
                if self.basin in constants.SOUTH_HEMISPHERE_BASINS:
                    idx4[idx4 == False] = True
                if self.basin not in ['all', 'both']:
                    idx4 = (idx4) & (storm_basin == self.basin)
                storm_date = storm_date[(idx1) & (idx2) & (idx3) & (idx4)]
                storm_type = storm_type[(idx1) & (idx2) & (idx3) & (idx4)]
                storm_vmax = storm_vmax[(idx1) & (idx2) & (idx3) & (idx4)]
                if len(storm_vmax) == 0:
                    continue  # Continue if doesn't apply to this storm
                storm_ace = accumulated_cyclone_energy(storm_vmax)

                # Account for storms on February 29th by pushing them forward 1 day
                if '0229' in storm_date_m:
                    storm_date_temp = []
                    for idate in storm_date:
                        dt_date = pd.to_datetime(idate)
                        if dt_date.strftime('%m%d') == '0229' or dt_date.strftime('%m') == '03':
                            dt_date += timedelta(hours=24)
                        storm_date_temp.append(dt_date)
                    storm_date = storm_date_temp

                # Append ACE to cumulative sum
                idx = np.nonzero(np.in1d(year_dates, storm_date))
                year_timestep_ace[idx] += storm_ace
                year_genesis.append(np.where(year_dates == storm_date[0])[0][0])

            # Calculate cumulative sum of year
            if rolling_sum == 0:
                year_cumulative_ace = np.cumsum(year_timestep_ace)
                year_genesis = np.array(year_genesis)

                # Attach to dict
                ace[str(year)] = {}
                ace[str(year)]['time'] = year_dates
                ace[str(year)]['ace'] = year_cumulative_ace
                ace[str(year)]['genesis_index'] = year_genesis
            else:
                year_cumulative_ace = np.sum(rolling_window(year_timestep_ace, rolling_sum * 4), axis=1)
                year_genesis = np.array(year_genesis) - ((rolling_sum * 4) - 1)

                # Attach to dict
                ace[str(year)] = {}
                ace[str(year)]['time'] = year_dates[((rolling_sum * 4) - 1):]
                ace[str(year)]['ace'] = year_cumulative_ace
                ace[str(year)]['genesis_index'] = year_genesis

        # ------------------------------------------------------------------------------------------

        # Construct non-leap year julian day array
        julian_x = np.arange(365 * 4.0) / 4.0
        julian = np.arange(365 * 4.0) / 4.0
        if self.basin in constants.SOUTH_HEMISPHERE_BASINS:
            julian = list(julian[182 * 4:]) + list(julian[:182 * 4])
        if rolling_sum != 0:
            julian = julian[((rolling_sum * 4) - 1):]
            julian_x = julian_x[((rolling_sum * 4) - 1):]

        # Get julian days for a non-leap year
        months_julian = months_in_julian(2019)
        julian_start = months_julian['start']
        julian_midpoint = months_julian['midpoint']
        julian_name = months_julian['name']
        julian_months = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
        if self.basin in constants.SOUTH_HEMISPHERE_BASINS:
            julian_start = list(np.array(julian_start[6:]) - 181) + list(np.array(julian_start[:6]) + 183)
            julian_midpoint = list(np.array(julian_midpoint[6:]) - 181) + list(np.array(julian_midpoint[:6]) + 183)
            julian_name = julian_name[6:] + julian_name[:6]
            julian_months = [7, 8, 9, 10, 11, 12, 1, 2, 3, 4, 5, 6]

        # Construct percentile arrays
        all_ace = np.ones((len(years), len(julian))) * np.nan
        for year in range(min(climo_bounds), max(climo_bounds) + 1):
            all_ace[years.index(year)] = ace[str(year)]['ace']
        pmin, p10, p25, p40, p60, p75, p90, pmax = np.nanpercentile(all_ace, [0, 10, 25, 40, 60, 75, 90, 100], axis=0)

        # Return if not plotting
        if return_dict:
            return ace

        # ------------------------------------------------------------------------------------------

        # Create figure
        fig, ax = plt.subplots(figsize=(9, 7), dpi=200)

        # Set up x-axis
        ax.grid(axis='y', linewidth=0.5, color='k', alpha=0.2, zorder=1, linestyle='--')
        ax.set_xticks(julian_midpoint)
        ax.set_xticklabels(julian_name)
        for i, (istart, iend) in enumerate(zip(julian_start[:-1][::2], julian_start[1:][::2])):
            ax.axvspan(istart, iend, color='#e4e4e4', alpha=0.5, zorder=0)

        # Set x-axis bounds
        if month_range is None:
            ax.set_xlim(julian_start[4], julian_x[-1])
        else:
            start_month = julian_months.index(int(month_range[0]))
            end_month = julian_months.index(int(month_range[1]))
            start_julian = julian_x[0] if start_month == 0 else julian_start[start_month]
            end_julian = julian_x[-1] if end_month == len(julian_months) - 1 else julian_start[end_month + 1]
            ax.set_xlim(start_julian, end_julian)

        # Add plot title
        basin_title = self.basin.title().replace('_', ' ') if self.basin != 'all' else 'Global'
        plot_year_title = ''
        if plot_year is None:
            title_string = f"{basin_title} Accumulated Cyclone Energy Climatology"
        else:
            plot_year_title = str(plot_year) if self.basin not in constants.SOUTH_HEMISPHERE_BASINS else f'{plot_year-1}-{plot_year}'
            cur_year = (dt.now()).year
            if plot_year == cur_year:
                add_current = f"(through {dt.now().strftime('%m/%d')})"
            else:
                add_current = ""
            title_string = f"{plot_year_title} {basin_title} Accumulated Cyclone Energy {add_current}"
        if rolling_sum != 0:
            title_add = f"\n{rolling_sum}-Day Running Sum"
        else:
            title_add = ""
        ax.set_title(f"{title_string}{title_add}", fontsize=12, fontweight='bold', loc='left')

        # Plot requested year
        if plot_year is not None:

            year_julian_x = np.copy(julian_x)
            year_ace = ace[str(plot_year)]['ace']
            year_genesis = ace[str(plot_year)]['genesis_index']

            # Check to see if this is the current year
            cur_year = (dt.now()).year
            if plot_year == cur_year:
                start_time = dt(2019, 1, 1)
                if self.basin in constants.SOUTH_HEMISPHERE_BASINS:
                    start_time = dt(2018, 7, 1)
                end_time = (dt.now()).replace(year=2019, minute=0, second=0)
                temp_julian = ((end_time - start_time).days + (end_time - start_time).seconds / 86400.0) + 1
                cur_julian = int(temp_julian) * 4 - int(rolling_sum * 4)
                year_julian_x = year_julian_x[:cur_julian + 1]
                year_ace = year_ace[:cur_julian + 1]
                year_genesis = year_genesis[year_genesis <= cur_julian]
                ax.plot(year_julian_x[-1], year_ace[-1], 'o', color='k', ms=8, mec='w', mew=0.8, zorder=8)

            ax.plot(year_julian_x, year_ace, '-', color='w', linewidth=2.8, zorder=6)
            ax.plot(year_julian_x, year_ace, '-', color='k', linewidth=2.0, zorder=6,
                    label=f'{plot_year_title} ACE ({np.max(year_ace):.1f})')
            ax.plot(year_julian_x[year_genesis], year_ace[year_genesis], 'D', color='k', ms=5, mec='w', mew=0.5,
                    zorder=7, label='TC Genesis')

        # Plot comparison years
        if compare_years is not None:

            if isinstance(compare_years, int):
                compare_years = [compare_years]

            for year in compare_years:

                year_julian_x = np.copy(julian_x)
                year_ace = ace[str(year)]['ace']
                year_genesis = ace[str(year)]['genesis_index']

                # Check to see if this is the current year
                cur_year = (dt.now()).year
                if year == cur_year:
                    cur_julian = int(convert_to_julian((dt.now()).replace(year=2019, minute=0, second=0))) * 4 - int(rolling_sum * 4)
                    year_julian_x = year_julian_x[:cur_julian + 1]
                    year_ace = year_ace[:cur_julian + 1]
                    year_genesis = year_genesis[:cur_julian + 1]
                    ax.plot(year_julian_x[-1], year_ace[-1], 'o', color='#333333', alpha=0.3, ms=6, zorder=5)

                if len(compare_years) <= 5:
                    label_year = str(year) if self.basin not in constants.SOUTH_HEMISPHERE_BASINS else f'{year-1}-{year}'
                    ax.plot(year_julian_x, year_ace, '-', color='k', linewidth=1.0, alpha=0.5, zorder=3,
                            label=f'{label_year} ACE ({np.max(year_ace):.1f})')
                    ax.plot(year_julian_x[year_genesis], year_ace[year_genesis], 'D', color='#333333', ms=3, alpha=0.3, zorder=4)
                    ax.text(year_julian_x[-2], year_ace[-2] + 2, str(year), fontsize=7,
                            fontweight='bold', alpha=0.7, ha='right', va='bottom')
                else:
                    ax.plot(year_julian_x, year_ace, '-', color='k', linewidth=1.0, alpha=0.15, zorder=3)

        # Plot all climatological values
        pmin_masked = np.array(pmin)
        pmin_masked = np.ma.masked_where(pmin_masked == 0, pmin_masked)
        ax.plot(julian_x, pmax, '--', color='r', zorder=2, label=f'Max ({np.max(pmax):.1f})')
        ax.plot(julian_x, pmin_masked, '--', color='b', zorder=2, label=f'Min ({np.max(pmin):.1f})')
        ax.fill_between(julian_x, p10, p90, color='#60CE56', alpha=0.3, zorder=2, label='Climo 10-90%')
        ax.fill_between(julian_x, p25, p75, color='#16A147', alpha=0.3, zorder=2, label='Climo 25-75%')
        ax.fill_between(julian_x, p40, p60, color='#00782A', alpha=0.3, zorder=2, label='Climo 40-60%')

        # Add legend & plot credit
        ax.legend(loc=2)
        endash = u"\u2013"
        credit_text = plot_credit()
        add_credit(ax, credit_text)
        ax.text(0.99, 0.99, f'Climatology from {climo_bounds[0]}{endash}{climo_bounds[-1]}', fontsize=9, color='k',
                alpha=0.7, transform=ax.transAxes, ha='right', va='top', zorder=10)

        # Show/save plot and close
        if save_path is not None and isinstance(save_path, str):
            plt.savefig(save_path, bbox_inches='tight', facecolor='w')
        return ax
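    # Example usage (illustrative sketch, not part of the library source): how
    # ace_climo might be called, assuming a TrackDataset instance named "basin"
    # exists as in the class docstring. Keyword values below are arbitrary.
    #
    #     basin = tracks.TrackDataset(basin='north_atlantic')
    #     ax = basin.ace_climo(plot_year=2020, compare_years=[2005, 2017],
    #                          climo_bounds=(1991, 2020), rolling_sum=30)
    #     data = basin.ace_climo(return_dict=True)  # raw climatology instead of a plot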
[docs]    def hurricane_days_climo(self, plot_year=None, compare_years=None, climo_bounds=None, month_range=None, rolling_sum=0, category=None, return_dict=False, save_path=None):
        r"""
        Creates a climatology of tropical storm/hurricane/major hurricane days.

        Parameters
        ----------
        plot_year : int
            Year to highlight. If the current year, the plot will be drawn through today. If None, no year will be highlighted.
        compare_years : int or list
            Seasons to compare against. Can be either a single season (int), or a range or list of seasons (list).
        climo_bounds : tuple
            Start and end years to compute the climatology over. Default is from 1950 through last year.
        month_range : tuple
            Start and end months to plot (e.g., ``(5,10)``). Default is peak hurricane season by basin.
        rolling_sum : int
            Number of days to calculate a rolling sum over. Default is 0 (i.e., an annual running sum).
        category : int
            SSHWS category to generate the data and plot for. Use 0 for tropical storm. If None (default), a plot will be generated for all categories.
        return_dict : bool
            If False (default), the plot axes will be returned. If True, a dictionary containing the raw data is returned.
        save_path : str
            File path to save the image to. If blank or None, the image will be directly shown.

        Returns
        -------
        axes or dict
            By default, the plot axes are returned. If return_dict is True, a dictionary containing the raw data is returned.

        Notes
        -----
        If in the southern hemisphere, the year is the 2nd year of the season (e.g., 1975 for the 1974-1975 season).
        """

        # Retrieve current year
        cur_year = dt.now().year
        if climo_bounds is None:
            climo_bounds = (1950, dt.now().year - 1)

        # Create empty dict
        tc_days = {}

        # Function for counting TC days above a wind threshold
        def duration_thres(arr, thres):
            arr2 = np.zeros((arr.shape))
            arr2[arr >= thres] = (6.0 / 24.0)
            return arr2

        # Iterate over every year of HURDAT available
        end_year = self.data[self.keys[-1]]['year']
        years = [yr for yr in range(1851, cur_year + 1) if (min(climo_bounds) <= yr <= max(climo_bounds)) or yr == plot_year]
        for year in years:

            # Get info for this year
            if self.basin == 'all':
                storm_ids = [key for key in self.data.keys() if year in [i.year for i in self.data[key]['time']]]
            else:
                try:
                    season = self.get_season(year)
                    storm_ids = season.summary()['id']
                except:
                    continue

            # Generate list of dates for this year
            if self.basin in constants.SOUTH_HEMISPHERE_BASINS:
                year_dates = np.array([dt.strptime(((pd.to_datetime(i)).strftime('%Y%m%d%H')), '%Y%m%d%H')
                                       for i in np.arange(dt(year - 1, 7, 1), dt(year, 7, 1), timedelta(hours=6))])
            else:
                year_dates = np.array([dt.strptime(((pd.to_datetime(i)).strftime('%Y%m%d%H')), '%Y%m%d%H')
                                       for i in np.arange(dt(year, 1, 1), dt(year + 1, 1, 1), timedelta(hours=6))])

            # Remove 2/29 from dates
            if calendar.isleap(year):
                year_dates = year_dates[year_dates != dt(year, 2, 29, 0)]
                year_dates = year_dates[year_dates != dt(year, 2, 29, 6)]
                year_dates = year_dates[year_dates != dt(year, 2, 29, 12)]
                year_dates = year_dates[year_dates != dt(year, 2, 29, 18)]

            # Additional empty arrays
            temp_arr = np.zeros((year_dates.shape))
            cumulative = {}
            all_thres = ['ts', 'c1', 'c2', 'c3', 'c4', 'c5']
            for thres in all_thres:
                cumulative[thres] = np.copy(temp_arr)
            year_genesis = []

            # Get list of storms for this year
            for storm in storm_ids:

                # Get HURDAT data for this storm
                storm_data = self.data[storm]
                storm_date_y = np.array([int(i.strftime('%Y')) for i in storm_data['time']])
                storm_date_h = np.array([i.strftime('%H%M') for i in storm_data['time']])
                storm_date_m = [i.strftime('%m%d') for i in storm_data['time']]
                storm_date = np.array(storm_data['time'])
                storm_type = np.array(storm_data['type'])
                storm_vmax = np.array(storm_data['vmax'])

                # Subset to remove obs not useful for calculation
                idx1 = ((storm_type == 'SS') | (storm_type == 'TS') | (storm_type == 'HU') | (storm_type == 'TY') | (storm_type == 'ST'))
                idx2 = ~np.isnan(storm_vmax)
                idx3 = ((storm_date_h == '0000') | (storm_date_h == '0600') | (storm_date_h == '1200') | (storm_date_h == '1800'))
                idx4 = storm_date_y == year
                if self.basin in constants.SOUTH_HEMISPHERE_BASINS:
                    idx4[idx4 == False] = True
                storm_date = storm_date[(idx1) & (idx2) & (idx3) & (idx4)]
                storm_type = storm_type[(idx1) & (idx2) & (idx3) & (idx4)]
                storm_vmax = storm_vmax[(idx1) & (idx2) & (idx3) & (idx4)]
                if len(storm_vmax) == 0:
                    continue  # Continue if doesn't apply to this storm

                # Account for storms on February 29th by pushing them forward 1 day
                if '0229' in storm_date_m:
                    storm_date_temp = []
                    for idate in storm_date:
                        dt_date = pd.to_datetime(idate)
                        if dt_date.strftime('%m%d') == '0229' or dt_date.strftime('%m') == '03':
                            dt_date += timedelta(hours=24)
                        storm_date_temp.append(dt_date)
                    storm_date = storm_date_temp

                # Append storm days to cumulative sum
                idx = np.nonzero(np.in1d(year_dates, storm_date))
                cumulative['ts'][idx] += duration_thres(storm_vmax, 34.0)
                cumulative['c1'][idx] += duration_thres(storm_vmax, 64.0)
                cumulative['c2'][idx] += duration_thres(storm_vmax, 83.0)
                cumulative['c3'][idx] += duration_thres(storm_vmax, 96.0)
                cumulative['c4'][idx] += duration_thres(storm_vmax, 113.0)
                cumulative['c5'][idx] += duration_thres(storm_vmax, 137.0)
                year_genesis.append(np.where(year_dates == storm_date[0])[0][0])

            # Calculate cumulative sum of year
            if rolling_sum == 0:
                year_genesis = np.array(year_genesis)

                # Attach to dict
                tc_days[str(year)] = {}
                tc_days[str(year)]['time'] = year_dates
                tc_days[str(year)]['genesis_index'] = year_genesis

                # Loop through all thresholds
                for thres in all_thres:
                    tc_days[str(year)][thres] = np.cumsum(cumulative[thres])
            else:
                year_genesis = np.array(year_genesis) - ((rolling_sum * 4) - 1)

                # Attach to dict
                tc_days[str(year)] = {}
                tc_days[str(year)]['time'] = year_dates[((rolling_sum * 4) - 1):]
                tc_days[str(year)]['genesis_index'] = year_genesis

                # Loop through all thresholds
                for thres in all_thres:
                    tc_days[str(year)][thres] = np.sum(rolling_window(cumulative[thres], rolling_sum * 4), axis=1)

        # ------------------------------------------------------------------------------------------

        # Construct non-leap year julian day array
        julian_x = np.arange(365 * 4.0) / 4.0
        julian = np.arange(365 * 4.0) / 4.0
        if self.basin in constants.SOUTH_HEMISPHERE_BASINS:
            julian = list(julian[182 * 4:]) + list(julian[:182 * 4])
        if rolling_sum != 0:
            julian = julian[((rolling_sum * 4) - 1):]
            julian_x = julian_x[((rolling_sum * 4) - 1):]

        # Get julian days for a non-leap year
        months_julian = months_in_julian(2019)
        julian_start = months_julian['start']
        julian_midpoint = months_julian['midpoint']
        julian_name = months_julian['name']
        julian_months = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
        if self.basin in constants.SOUTH_HEMISPHERE_BASINS:
            julian_start = list(np.array(julian_start[6:]) - 181) + list(np.array(julian_start[:6]) + 183)
            julian_midpoint = list(np.array(julian_midpoint[6:]) - 181) + list(np.array(julian_midpoint[:6]) + 183)
            julian_name = julian_name[6:] + julian_name[:6]
            julian_months = [7, 8, 9, 10, 11, 12, 1, 2, 3, 4, 5, 6]

        # Determine type of plot to make
        category_match = {0: 'ts', 1: 'c1', 2: 'c2', 3: 'c3', 4: 'c4', 5: 'c5'}
        if category is None:
            cat = 0
        else:
            cat = category_match.get(category, 'c1')

        # Construct climatology arrays
        if cat == 0:
            p50 = {}
            for thres in all_thres:
                all_tc_days = np.zeros((len(years), len(julian)))
                for year in years:
                    all_tc_days[years.index(year)] = tc_days[str(year)][thres]
                # Use the mean for the climatology (legend labels below report "Avg")
                p50[thres] = np.average(all_tc_days, axis=0)
        else:
            all_tc_days = np.zeros((len(years), len(julian)))
            for year in years:
                all_tc_days[years.index(year)] = tc_days[str(year)][cat]
            pmin, p10, p25, p40, p60, p75, p90, pmax = np.percentile(all_tc_days, [0, 10, 25, 40, 60, 75, 90, 100], axis=0)

        # Return if not plotting
        if return_dict:
            return tc_days

        # ------------------------------------------------------------------------------------------

        # Create figure
        fig, ax = plt.subplots(figsize=(9, 7), dpi=200)

        # Set up x-axis
        ax.grid(axis='y', linewidth=0.5, color='k', alpha=0.2, zorder=1, linestyle='--')
        ax.set_xticks(julian_midpoint)
        ax.set_xticklabels(julian_name)
        for i, (istart, iend) in enumerate(zip(julian_start[:-1][::2], julian_start[1:][::2])):
            ax.axvspan(istart, iend, color='#e4e4e4', alpha=0.5, zorder=0)

        # Set x-axis bounds
        if month_range is None:
            ax.set_xlim(julian_start[4], julian_x[-1])
        else:
            start_month = julian_months.index(int(month_range[0]))
            end_month = julian_months.index(int(month_range[1]))
            start_julian = julian_x[0] if start_month == 0 else julian_start[start_month]
            end_julian = julian_x[-1] if end_month == len(julian_months) - 1 else julian_start[end_month + 1]
            ax.set_xlim(start_julian, end_julian)

        # Format plot title by category
        category_names = {'ts': 'Tropical Storm', 'c1': 'Category 1', 'c2': 'Category 2',
                          'c3': 'Category 3', 'c4': 'Category 4', 'c5': 'Category 5'}
        if cat == 0:
            add_str = "Tropical Cyclone"
        else:
            add_str = category_names.get(cat)

        # Add plot title
        basin_title = self.basin.title().replace('_', ' ') if self.basin != 'all' else 'Global'
        plot_year_title = ''
        if plot_year is None:
            title_string = f"{basin_title} Accumulated {add_str} Days"
        else:
            plot_year_title = str(plot_year) if self.basin not in constants.SOUTH_HEMISPHERE_BASINS else f'{plot_year-1}-{plot_year}'
            cur_year = (dt.now()).year
            if plot_year == cur_year:
                add_current = f" (through {(dt.now()).strftime('%b %d')})"
            else:
                add_current = ""
            title_string = f"{plot_year_title} {basin_title} Accumulated {add_str} Days{add_current}"
        if rolling_sum != 0:
            title_add = f"\n{rolling_sum}-Day Running Sum"
        else:
            title_add = ""
        ax.set_title(f"{title_string}{title_add}", fontsize=12, fontweight='bold', loc='left')

        # Plot requested year
        if plot_year is not None:

            if cat == 0:
                year_labels = []
                for icat in all_thres[::-1]:
                    year_julian_x = np.copy(julian_x)
                    year_tc_days = tc_days[str(plot_year)][icat]

                    # Check to see if this is the current year
                    cur_year = (dt.now()).year
                    if plot_year == cur_year:
                        start_time = dt(2019, 1, 1)
                        if self.basin in constants.SOUTH_HEMISPHERE_BASINS:
                            start_time = dt(2018, 7, 1)
                        end_time = (dt.now()).replace(year=2019, minute=0, second=0)
                        temp_julian = ((end_time - start_time).days + (end_time - start_time).seconds / 86400.0) + 1
                        cur_julian = int(temp_julian) * 4 - int(rolling_sum * 4)
                        year_julian_x = year_julian_x[:cur_julian + 1]
                        year_tc_days = year_tc_days[:cur_julian + 1]
                        ax.plot(year_julian_x[-1], year_tc_days[-1], 'o', color=get_colors_sshws(icat),
                                ms=8, mec='k', mew=0.8, zorder=8)

                    year_tc_days_masked = np.array(year_tc_days)
                    year_tc_days_masked = np.ma.masked_where(year_tc_days_masked == 0, year_tc_days_masked)
                    ax.plot(year_julian_x, year_tc_days_masked, '-', color='k', linewidth=2.8, zorder=6)
                    ax.plot(year_julian_x, year_tc_days_masked, '-', color=get_colors_sshws(icat), linewidth=2.0, zorder=6)
                    year_labels.append(f"{np.max(year_tc_days):.1f}")
            else:
                year_julian_x = np.copy(julian_x)
                year_tc_days = tc_days[str(plot_year)][cat]
                year_genesis = tc_days[str(plot_year)]['genesis_index']

                # Check to see if this is the current year
                cur_year = (dt.now()).year
                if plot_year == cur_year:
                    start_time = dt(2019, 1, 1)
                    if self.basin in constants.SOUTH_HEMISPHERE_BASINS:
                        start_time = dt(2018, 7, 1)
                    end_time = (dt.now()).replace(year=2019, minute=0, second=0)
                    temp_julian = ((end_time - start_time).days + (end_time - start_time).seconds / 86400.0) + 1
                    cur_julian = int(temp_julian) * 4 - int(rolling_sum * 4)
                    year_julian_x = year_julian_x[:cur_julian + 1]
                    year_tc_days = year_tc_days[:cur_julian + 1]
                    year_genesis = year_genesis[year_genesis <= cur_julian]
                    ax.plot(year_julian_x[-1], year_tc_days[-1], 'o', color='#FF7CFF', ms=8, mec='#750775', mew=0.8, zorder=8)

                ax.plot(year_julian_x, year_tc_days, '-', color='#750775', linewidth=2.8, zorder=6)
                ax.plot(year_julian_x, year_tc_days, '-', color='#FF7CFF', linewidth=2.0, zorder=6,
                        label=f'{plot_year} ({np.max(year_tc_days):.1f} days)')
                ax.plot(year_julian_x[year_genesis], year_tc_days[year_genesis], 'D', color='#FF7CFF', ms=5,
                        mec='#750775', mew=0.5, zorder=7, label='TC Genesis')

        # Plot comparison years
        if compare_years is not None and cat != 0:

            if isinstance(compare_years, int):
                compare_years = [compare_years]

            for year in compare_years:

                year_julian_x = np.copy(julian_x)
                year_tc_days = tc_days[str(year)][cat]
                year_genesis = tc_days[str(year)]['genesis_index']

                # Check to see if this is the current year
                cur_year = (dt.now()).year
                if year == cur_year:
                    cur_julian = int(convert_to_julian((dt.now()).replace(year=2019, minute=0, second=0))) * 4 - int(rolling_sum * 4)
                    year_julian_x = year_julian_x[:cur_julian + 1]
                    year_tc_days = year_tc_days[:cur_julian + 1]
                    year_genesis = year_genesis[:cur_julian + 1]
                    ax.plot(year_julian_x[-1], year_tc_days[-1], 'o', color='#333333', alpha=0.3, ms=6, zorder=5)

                if len(compare_years) <= 5:
                    ax.plot(year_julian_x, year_tc_days, '-', color='k', linewidth=1.0, alpha=0.5, zorder=3,
                            label=f'{year} ({np.max(year_tc_days):.1f} days)')
                    ax.plot(year_julian_x[year_genesis], year_tc_days[year_genesis], 'D', color='#333333', ms=3, alpha=0.3, zorder=4)
                    ax.text(year_julian_x[-2], year_tc_days[-2] + 2, str(year), fontsize=7,
                            fontweight='bold', alpha=0.7, ha='right', va='bottom')
                else:
                    ax.plot(year_julian_x, year_tc_days, '-', color='k', linewidth=1.0, alpha=0.15, zorder=3)

        # Plot all climatological values
        if cat == 0:
            if plot_year is None:
                add_str = ["" for i in all_thres]
            else:
                add_str = [f" | {plot_year}: {i}" for i in year_labels[::-1]]
            xnums = np.zeros((p50['ts'].shape))
            ax.fill_between(julian_x, p50['c1'], p50['ts'], color=get_colors_sshws(34), alpha=0.3, zorder=2,
                            label=f'TS (Avg: {np.max(p50["ts"]):.1f}{add_str[0]})')
            ax.fill_between(julian_x, p50['c2'], p50['c1'], color=get_colors_sshws(64), alpha=0.3, zorder=2,
                            label=f'C1 (Avg: {np.max(p50["c1"]):.1f}{add_str[1]})')
            ax.fill_between(julian_x, p50['c3'], p50['c2'], color=get_colors_sshws(83), alpha=0.3, zorder=2,
                            label=f'C2 (Avg: {np.max(p50["c2"]):.1f}{add_str[2]})')
            ax.fill_between(julian_x, p50['c4'], p50['c3'], color=get_colors_sshws(96), alpha=0.3, zorder=2,
                            label=f'C3 (Avg: {np.max(p50["c3"]):.1f}{add_str[3]})')
            ax.fill_between(julian_x, p50['c5'], p50['c4'], color=get_colors_sshws(113), alpha=0.3, zorder=2,
                            label=f'C4 (Avg: {np.max(p50["c4"]):.1f}{add_str[4]})')
            ax.fill_between(julian_x, xnums, p50['c5'], color=get_colors_sshws(137), alpha=0.3, zorder=2,
                            label=f'C5 (Avg: {np.max(p50["c5"]):.1f}{add_str[5]})')
        else:
            pmin_masked = np.array(pmin)
            pmin_masked = np.ma.masked_where(pmin_masked == 0, pmin_masked)
            ax.plot(julian_x, pmax, '--', color='r', zorder=2, label=f'Max ({np.max(pmax):.1f} days)')
            ax.plot(julian_x, pmin_masked, '--', color='b', zorder=2, label=f'Min ({np.max(pmin):.1f} days)')
            ax.fill_between(julian_x, p10, p90, color='#60CE56', alpha=0.3, zorder=2, label='Climo 10-90%')
            ax.fill_between(julian_x, p25, p75, color='#16A147', alpha=0.3, zorder=2, label='Climo 25-75%')
            ax.fill_between(julian_x, p40, p60, color='#00782A', alpha=0.3, zorder=2, label='Climo 40-60%')

        # Add legend & plot credit
        ax.legend(loc=2)
        endash = u"\u2013"
        ax.text(0.99, 0.01, plot_credit(), fontsize=6, color='k', alpha=0.7,
                transform=ax.transAxes, ha='right', va='bottom', zorder=10)
        ax.text(0.99, 0.99, f'Climatology from {climo_bounds[0]}{endash}{climo_bounds[-1]}', fontsize=8, color='k',
                alpha=0.7, transform=ax.transAxes, ha='right', va='top', zorder=10)

        # Show/save plot and close
        if save_path is not None and isinstance(save_path, str):
            plt.savefig(save_path, bbox_inches='tight')
        return ax
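    # Example usage (illustrative sketch, not library source): a single-category
    # hurricane-days climatology, assuming "basin" is an existing TrackDataset.
    #
    #     ax = basin.hurricane_days_climo(plot_year=2017, category=4)
    #     days = basin.hurricane_days_climo(return_dict=True)
    #     # days['2017']['c4'] would hold the cumulative Category 4 days time series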
[docs]    def wind_pres_relationship(self, storm=None, climo_bounds=None, return_dict=False, save_path=None):
        r"""
        Creates a climatology of maximum sustained wind speed vs. minimum MSLP relationships.

        Parameters
        ----------
        storm : str or tuple
            Storm to plot. Can be either a string of the storm ID (e.g., "AL052019"), or a tuple with the storm name and year (e.g., ("Matthew",2016)).
        climo_bounds : list or tuple
            List or tuple representing the start and end years (e.g., ``(1950,2018)``). Default is the start and end of the dataset.
        return_dict : bool
            If False (default), the plot axes will be returned. If True, a dictionary containing the raw data is returned.
        save_path : str
            File path to save the image to. If blank or None, the image will be directly shown.

        Returns
        -------
        ax or dict
            By default, the plot axes are returned. If return_dict is True, a dictionary containing data about the wind vs. MSLP relationship climatology is returned.

        Notes
        -----
        Climatology is only valid for time steps during which cyclones were tropical or subtropical.
        """

        # Define empty dictionary
        relationship = {}

        # Determine year range of dataset
        if climo_bounds is None:
            start_year = self.data[self.keys[0]]['year']
            end_year = self.data[self.keys[-1]]['year']
        elif isinstance(climo_bounds, (list, tuple)):
            if len(climo_bounds) != 2:
                raise ValueError("climo_bounds must be a tuple or list with 2 elements: (start_year, end_year)")
            start_year = int(climo_bounds[0])
            if start_year < self.data[self.keys[0]]['year']:
                start_year = self.data[self.keys[0]]['year']
            end_year = int(climo_bounds[1])
            if end_year > self.data[self.keys[-1]]['year']:
                end_year = self.data[self.keys[-1]]['year']
        else:
            raise TypeError("climo_bounds must be of type tuple or list")

        # Get velocity & pressure pairs for all storms in dataset
        vp = filter_storms_vp(self, year_min=start_year, year_max=end_year)
        relationship['vp'] = vp

        # Create 2D histogram of v+p relationship
        counts, yedges, xedges = np.histogram2d(*zip(*vp), [np.arange(800, 1050, 5) - 2.5, np.arange(0, 250, 5) - 2.5])
        relationship['counts'] = counts
        relationship['yedges'] = yedges
        relationship['xedges'] = xedges

        if return_dict:
            return relationship

        # Create figure
        fig, ax = plt.subplots(figsize=(12, 9.5), dpi=200)

        # Plot climatology
        CS = plt.pcolor(xedges, yedges, counts**0.3, vmin=0, vmax=np.amax(counts)**.3, cmap='gnuplot2_r')
        plt.plot(xedges, [testfit(vp, x, 2) for x in xedges], 'k--', linewidth=2)

        # Plot storm, if specified
        if storm is not None:

            # Check if storm is str or tuple
            if isinstance(storm, str):
                pass
            elif isinstance(storm, tuple):
                storm = self.get_storm_id((storm[0], storm[1]))
            else:
                raise RuntimeError("Storm must be a string (e.g., 'AL052019') or tuple (e.g., ('Matthew',2016)).")

            # Plot storm
            storm_data = self.data[storm]
            V = np.array(storm_data['vmax'])
            P = np.array(storm_data['mslp'])
            T = np.array(storm_data['type'])

            def get_color(itype):
                if itype in constants.TROPICAL_STORM_TYPES:
                    return ['#00EE00', 'palegreen']  # lime
                else:
                    return ['#00A600', '#3BD73B']

            def getMarker(itype):
                mtype = '^'
                if itype in constants.SUBTROPICAL_ONLY_STORM_TYPES:
                    mtype = 's'
                elif itype in constants.TROPICAL_ONLY_STORM_TYPES:
                    mtype = 'o'
                return mtype

            xt_label = False
            tr_label = False

            for i, (iv, ip, it) in enumerate(zip(V[:-1], P[:-1], T[:-1])):
                check = False
                if it in constants.TROPICAL_STORM_TYPES and tr_label:
                    check = True
                if not it in constants.TROPICAL_STORM_TYPES and xt_label:
                    check = True
                if check:
                    plt.scatter(iv, ip, marker='o', s=80, color=get_color(it)[0], edgecolor='k', zorder=9)
                else:
                    if it in constants.TROPICAL_STORM_TYPES and not tr_label:
                        tr_label = True
                        label_content = f"{storm_data['name'].title()} {storm_data['year']} (Tropical)"
                    if it not in constants.TROPICAL_STORM_TYPES and not xt_label:
                        xt_label = True
                        label_content = f"{storm_data['name'].title()} {storm_data['year']} (Non-Tropical)"
                    plt.scatter(iv, ip, marker='o', s=80, color=get_color(it)[0], edgecolor='k', label=label_content, zorder=9)

            plt.scatter(V[-1], P[-1], marker='D', s=80, color=get_color(it)[0], edgecolor='k', linewidth=2, zorder=9)

            for i, (iv, ip, it, mv, mp, mt) in enumerate(zip(V[1:], P[1:], T[1:], V[:-1], P[:-1], T[:-1])):
                plt.quiver(mv, mp, iv - mv, ip - mp, scale_units='xy', angles='xy', scale=1,
                           width=0.005, color=get_color(it)[1], zorder=8)

            # Add legend
            plt.legend(loc='upper right', scatterpoints=1, prop={'weight': 'bold', 'size': 14})

        # Additional plot settings
        plt.xlabel('Maximum sustained winds (kt)', fontsize=14)
        plt.ylabel('Minimum central pressure (hPa)', fontsize=14)
        plt.title(f"TC Pressure vs. Wind \n {self.basin.title().replace('_',' ')} | " +
                  f"{start_year}-{end_year}", fontsize=18, fontweight='bold')
        plt.xticks(np.arange(20, 200, 20))
        plt.yticks(np.arange(880, 1040, 20))
        plt.tick_params(labelsize=14)
        plt.grid()
        plt.axis([0, 200, 860, 1040])
        cbar = fig.colorbar(CS)
        cbar.ax.set_ylabel('Historical Frequency', fontsize=14)
        cbar.ax.tick_params(labelsize=14)
        cbar.set_ticks(np.array([i for i in [0, 5, 50, 200, 500, 1000, 2000] if i < np.amax(counts)])**0.3)
        cbar.set_ticklabels([i for i in [0, 5, 50, 200, 500, 1000, 2000] if i < np.amax(counts)])

        # Add credit
        credit_text = Plot().plot_credit()
        plt.text(0.99, 0.01, credit_text, fontsize=9, color='k', alpha=0.7, backgroundcolor='w',
                 transform=plt.gca().transAxes, ha='right', va='bottom', zorder=10)

        # Show/save plot and close
        if save_path is not None and isinstance(save_path, str):
            plt.savefig(save_path, bbox_inches='tight')
        return ax
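    # Example usage (hedged sketch): overlaying a single storm on the wind-pressure
    # climatology, assuming "basin" is a TrackDataset whose period covers the storm.
    #
    #     ax = basin.wind_pres_relationship(storm=('michael', 2018), climo_bounds=(1950, 2020))
    #     rel = basin.wind_pres_relationship(return_dict=True)
    #     # rel['counts'] holds the 2D histogram; rel['vp'] the raw wind/pressure pairs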
[docs]    def rank_storm(self, metric, return_df=True, ascending=False, domain=None, year_range=None, date_range=None, subtropical=True):
        r"""
        Ranks storms by a specified metric.

        Parameters
        ----------
        metric : str
            Metric to rank storms by. Can be any of the following:

            * **ace** - rank storms by ACE
            * **start_lat** - starting latitude of cyclone
            * **start_lon** - starting longitude of cyclone
            * **end_lat** - ending latitude of cyclone
            * **end_lon** - ending longitude of cyclone
            * **start_date** - formation date of cyclone
            * **start_date_indomain** - first time step a cyclone entered the domain
            * **max_wind** - first instance of the maximum sustained wind of cyclone
            * **min_mslp** - first instance of the minimum MSLP of cyclone
            * **wind_ge_XX** - first instance of wind greater than or equal to a certain threshold (knots)
        return_df : bool
            Whether to return a pandas.DataFrame (True) or dict (False). Default is True.
        ascending : bool
            Whether to return the rank in ascending order (True) or descending order (False). Default is False.
        domain : str
            Geographic domain. Default is the entire basin. Please refer to :ref:`options-domain` for available domain options.
        year_range : list or tuple
            List or tuple representing the start and end years (e.g., ``(1950,2018)``). Default is the start and end years of the dataset.
        date_range : list or tuple
            List or tuple representing the start and end dates in 'month/day' format (e.g., ``('6/1','8/15')``). Default is the entire year.
        subtropical : bool
            Whether to include subtropical storms in the ranking. Default is True.

        Returns
        -------
        pandas.DataFrame
            A pandas DataFrame containing the ranked storms.
        """

        if self.source == 'ibtracs':
            warnings.warn("This function is not currently configured to work for the ibtracs dataset.")

        # Revise metric if threshold included
        if 'wind_ge' in metric:
            thresh = int(metric.split("_")[2])
            metric = 'wind_ge'

        # Error check for metric
        metric = metric.lower()
        metric_bank = {
            'ace': {'output': ['ace'], 'subset_type': 'domain'},
            'start_lat': {'output': ['lat', 'lon', 'type'], 'subset_type': 'start'},
            'start_lon': {'output': ['lon', 'lat', 'type'], 'subset_type': 'start'},
            'end_lat': {'output': ['lat', 'lon', 'type'], 'subset_type': 'end'},
            'end_lon': {'output': ['lon', 'lat', 'type'], 'subset_type': 'end'},
            'start_date': {'output': ['time', 'lat', 'lon', 'type'], 'subset_type': 'start'},
            'start_date_indomain': {'output': ['time', 'lat', 'lon', 'type'], 'subset_type': 'domain'},
            'max_wind': {'output': ['vmax', 'mslp', 'lat', 'lon'], 'subset_type': 'domain'},
            'min_mslp': {'output': ['mslp', 'vmax', 'lat', 'lon'], 'subset_type': 'domain'},
            'wind_ge': {'output': ['lat', 'lon', 'mslp', 'vmax', 'time'], 'subset_type': 'start'},
        }
        if metric not in metric_bank.keys():
            raise ValueError("Metric requested for sorting is not available. Please reference the documentation for acceptable entries for 'metric'.")

        # Determine year range of dataset
        if year_range is None:
            start_year = self.data[self.keys[0]]['year']
            end_year = self.data[self.keys[-1]]['year']
        elif isinstance(year_range, (list, tuple)):
            if len(year_range) != 2:
                raise ValueError("year_range must be a tuple or list with 2 elements: (start_year, end_year)")
            start_year = int(year_range[0])
            end_year = int(year_range[1])
        else:
            raise TypeError("year_range must be of type tuple or list")

        # Initialize empty dict
        analyze_list = metric_bank[metric]['output']
        analyze_list.insert(1, 'id')
        analyze_list.insert(2, 'name')
        analyze_list.insert(3, 'year')
        analyze_dict = {key: [] for key in analyze_list}

        # Iterate over every storm in dataset
        for storm in self.keys:

            # Get entry for this storm
            storm_data = self.data[storm]

            # Filter by year
            if storm_data['year'] < start_year or storm_data['year'] > end_year:
                continue

            # Filter for purely tropical/subtropical storm locations
            type_array = np.array(storm_data['type'])
            if subtropical:
                idx = np.where((type_array == 'SD') | (type_array == 'SS') | (type_array == 'TD') | (
                    type_array == 'TS') | (type_array == 'HU') | (type_array == 'TY') | (type_array == 'ST'))
            else:
                idx = np.where((type_array == 'TD') | (type_array == 'TS') | (
                    type_array == 'HU') | (type_array == 'TY') | (type_array == 'ST'))
            if len(idx[0]) == 0:
                continue

            lat_tropical = np.array(storm_data['lat'])[idx]
            lon_tropical = np.array(storm_data['lon'])[idx]
            time_tropical = np.array(storm_data['time'])[idx]
            type_tropical = np.array(storm_data['type'])[idx]
            vmax_tropical = np.array(storm_data['vmax'])[idx]
            mslp_tropical = np.array(storm_data['mslp'])[idx]
            basin_tropical = np.array(storm_data['wmo_basin'])[idx]

            # Filter geographically
            if domain is not None:
                if isinstance(domain, dict):
                    keys = domain.keys()
                    check = [False, False, False, False]
                    for key in keys:
                        if key[0].lower() == 'n':
                            check[0] = True
                            bound_n = domain[key]
                        if key[0].lower() == 's':
                            check[1] = True
                            bound_s = domain[key]
                        if key[0].lower() == 'e':
                            check[2] = True
                            bound_e = domain[key]
                        if key[0].lower() == 'w':
                            check[3] = True
                            bound_w = domain[key]
                    if False in check:
                        msg = "Custom domains must be of type dict with arguments for 'n', 's', 'e' and 'w'."
                        raise ValueError(msg)
                    idx = np.where((lat_tropical >= bound_s) & (lat_tropical <= bound_n) & (
                        lon_tropical >= bound_w) & (lon_tropical <= bound_e))
                elif isinstance(domain, str):
                    idx = np.where(basin_tropical == domain)
                else:
                    msg = "domain must be of type str or dict."
                    raise TypeError(msg)
                if len(idx[0]) == 0:
                    continue

                # Check for subset type
                subset_type = metric_bank[metric]['subset_type']
                if subset_type == 'domain':
                    lat_tropical = lat_tropical[idx]
                    lon_tropical = lon_tropical[idx]
                    time_tropical = time_tropical[idx]
                    type_tropical = type_tropical[idx]
                    vmax_tropical = vmax_tropical[idx]
                    mslp_tropical = mslp_tropical[idx]
                    basin_tropical = basin_tropical[idx]

            # Filter by time
            if date_range is not None:
                start_time = dt.strptime(f"{storm_data['year']}/{date_range[0]}", '%Y/%m/%d')
                end_time = dt.strptime(f"{storm_data['year']}/{date_range[1]}", '%Y/%m/%d') + timedelta(hours=23)
                idx = np.array([i for i in range(len(lat_tropical))
                                if time_tropical[i] >= start_time and time_tropical[i] <= end_time])
                if len(idx) == 0:
                    continue

                # Check for subset type
                subset_type = metric_bank[metric]['subset_type']
                if subset_type == 'domain':
                    lat_tropical = lat_tropical[idx]
                    lon_tropical = lon_tropical[idx]
                    time_tropical = time_tropical[idx]
                    type_tropical = type_tropical[idx]
                    vmax_tropical = vmax_tropical[idx]
                    mslp_tropical = mslp_tropical[idx]
                    basin_tropical = basin_tropical[idx]

            # Filter by requested metric
            if metric == 'ace':

                if storm_data['ace'] == 0:
                    continue
                storm_ace = 0.0
                for i, (i_time, i_vmax) in enumerate(zip(time_tropical, vmax_tropical)):
                    if i_time.strftime('%H%M') not in constants.STANDARD_HOURS:
                        continue
                    storm_ace += accumulated_cyclone_energy(i_vmax)
                analyze_dict['ace'].append(round(storm_ace, 4))

            elif metric in ['start_lat', 'end_lat', 'start_lon', 'end_lon']:

                use_idx = 0 if 'start' in metric else -1
                analyze_dict['lat'].append(lat_tropical[use_idx])
                analyze_dict['lon'].append(lon_tropical[use_idx])
                analyze_dict['type'].append(type_tropical[use_idx])

            elif metric in ['start_date']:

                analyze_dict['lat'].append(lat_tropical[0])
                analyze_dict['lon'].append(lon_tropical[0])
                analyze_dict['type'].append(type_tropical[0])
                analyze_dict['time'].append(time_tropical[0].replace(year=2016))

            elif metric in ['max_wind', 'min_mslp']:

                # Find max wind or min MSLP
                if metric == 'max_wind' and all_nan(vmax_tropical):
                    continue
                if metric == 'min_mslp' and all_nan(mslp_tropical):
                    continue
                use_idx = np.where(vmax_tropical == np.nanmax(vmax_tropical))[0][0]
                if metric == 'min_mslp':
                    use_idx = np.where(mslp_tropical == np.nanmin(mslp_tropical))[0][0]
                analyze_dict['lat'].append(lat_tropical[use_idx])
                analyze_dict['lon'].append(lon_tropical[use_idx])
                analyze_dict['mslp'].append(mslp_tropical[use_idx])
                analyze_dict['vmax'].append(vmax_tropical[use_idx])

            elif metric in ['wind_ge']:

                # Find first instance of wind at or above the requested threshold
                if metric == 'wind_ge' and all_nan(vmax_tropical):
                    continue
                if metric == 'wind_ge' and np.nanmax(vmax_tropical) < thresh:
                    continue
                use_idx = np.where(vmax_tropical >= thresh)[0][0]
                analyze_dict['lat'].append(lat_tropical[use_idx])
                analyze_dict['lon'].append(lon_tropical[use_idx])
                analyze_dict['time'].append(time_tropical[use_idx])
                analyze_dict['mslp'].append(mslp_tropical[use_idx])
                analyze_dict['vmax'].append(vmax_tropical[use_idx])

            # Append generic storm attributes
            analyze_dict['id'].append(storm)
            analyze_dict['name'].append(storm_data['name'])
            analyze_dict['year'].append(int(storm_data['year']))

        # Error check
        if len(analyze_dict[analyze_list[0]]) == 0:
            raise RuntimeError("No storms were found given the requested criteria.")

        # Sort in requested order
        arg_idx = np.argsort(analyze_dict[analyze_list[0]])
        if not ascending:
            arg_idx = arg_idx[::-1]

        # Sort all variables in requested order
        for key in analyze_dict.keys():
            analyze_dict[key] = (np.array(analyze_dict[key])[arg_idx])

        # Enter into new ranked dict
        ranked_dict = {}
        for i in range(len(analyze_dict['id'])):
            ranked_dict[i + 1] = {key: analyze_dict[key][i] for key in analyze_list}
            if 'time' in ranked_dict[i + 1].keys():
                ranked_dict[i + 1]['time'] = ranked_dict[i + 1]['time'].replace(year=ranked_dict[i + 1]['year'])

        # Return ranked dictionary
        return (pd.DataFrame(ranked_dict).transpose())[analyze_list]
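    # Example usage (illustrative, assuming "basin" is an existing TrackDataset):
    # ranking storms by a documented metric over a subset of seasons.
    #
    #     df = basin.rank_storm('max_wind', year_range=(1990, 2020))
    #     df_ge100 = basin.rank_storm('wind_ge_100')  # first time vmax reached >= 100 kt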
[docs]    def storm_ace_vs_season(self, storm, year_range=None):
        r"""
        Retrieves a list of entire hurricane seasons with lower ACE than the storm provided.

        Parameters
        ----------
        storm : str or tuple
            Storm to rank seasons against. Can be either a string of the storm ID (e.g., "AL052019"), or a tuple with the storm name and year (e.g., ("Matthew",2016)).
        year_range : list or tuple
            List or tuple representing the start and end years (e.g., ``(1950,2018)``). Default is 1950 through the last year in the dataset.

        Returns
        -------
        dict
            Dictionary containing the seasons with less ACE than the requested storm.
        """

        # Warning for ibtracs
        if self.source == 'ibtracs':
            warning_str = "This function is not currently configured to optimally work for the ibtracs dataset."
            warnings.warn(warning_str)

        # Determine year range of dataset
        if year_range is None:
            start_year = self.data[self.keys[0]]['year']
            if start_year < 1950:
                start_year = 1950
            end_year = self.data[self.keys[-1]]['year']
        elif isinstance(year_range, (list, tuple)):
            if len(year_range) != 2:
                raise ValueError("year_range must be a tuple or list with 2 elements: (start_year, end_year)")
            start_year = int(year_range[0])
            if start_year < self.data[self.keys[0]]['year']:
                start_year = self.data[self.keys[0]]['year']
            end_year = int(year_range[1])
            if end_year > self.data[self.keys[-1]]['year']:
                end_year = self.data[self.keys[-1]]['year']
        else:
            raise TypeError("year_range must be of type tuple or list")

        # Check if storm is str or tuple
        if isinstance(storm, str):
            pass
        elif isinstance(storm, tuple):
            storm = self.get_storm_id((storm[0], storm[1]))
        else:
            raise RuntimeError("Storm must be a string (e.g., 'AL052019') or tuple (e.g., ('Matthew',2016)).")

        # Retrieve ACE for this storm
        storm_data = self.data[storm]
        storm_name = storm_data['name']
        storm_year = storm_data['year']
        storm_ace = np.round(storm_data['ace'], 4)

        # Initialize empty dict
        ace_rank = {'year': [], 'ace': []}

        # Iterate over every season
        for year in range(start_year, end_year + 1):
            season = self.get_season(year)
            year_data = season.summary()
            year_ace = year_data['season_ace']

            # Compare year ACE against storm ACE
            if year_ace < storm_ace:
                ace_rank['year'].append(year)
                ace_rank['ace'].append(year_ace)

        return ace_rank
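    # Example usage (sketch): comparing one storm's ACE against whole seasons,
    # assuming "basin" is a TrackDataset containing the storm.
    #
    #     rank = basin.storm_ace_vs_season(('matthew', 2016), year_range=(1950, 2020))
    #     # rank['year'] lists the seasons whose total ACE was below Matthew's ACE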
[docs]    def filter_storms(self, storm=None, year_range=None, date_range=None, thresh={}, domain=None, interpolate_data=False, return_keys=True):
        r"""
        Filters all storms by various thresholds.

        Parameters
        ----------
        storm : list or str
            Single storm ID or list of storm IDs (e.g., ``'AL012022'``, ``['AL012022','AL022022']``) to search through. If None, defaults to searching through the entire dataset.
        year_range : list or tuple
            List or tuple representing the start and end years (e.g., ``(1950,2018)``). Default is the start and end years of the dataset.
        date_range : list or tuple
            List or tuple representing the start and end dates as a string in 'month/day' format (e.g., ``('6/1','8/15')``). Default is ``('1/1','12/31')``, i.e., the full year.
        thresh : dict
            Keywords include:

            * **sample_min** - minimum number of storms in a grid box for "request" to be applied. For the functions 'percentile' and 'average', 'sample_min' defaults to 5 and will override any value less than 5.
            * **v_min** - minimum wind for a given point to be included in "request".
            * **p_max** - maximum pressure for a given point to be included in "request".
            * **dv_min** - minimum change in wind over dt_window for a given point to be included in "request".
            * **dp_max** - maximum change in pressure over dt_window for a given point to be included in "request".
            * **dt_window** - time window over which change variables are calculated (hours). Default is 24.
            * **dt_align** - alignment of dt_window for change variables -- 'start', 'middle' or 'end' -- e.g., 'end' for dt_window=24 associates a TC point with the change over the past 24 hours. Default is 'middle'.

            Units of all wind variables are kt, and of all pressure variables are hPa. These are added to the subtitle.
        domain : str
            Geographic domain. Default is the entire basin. Please refer to :ref:`options-domain` for available domain options.
        interpolate_data : bool
            Whether to interpolate track data to hourly. Default is False.
        return_keys : bool
            If True, returns a list of storm IDs that match the specified criteria. Otherwise returns a pandas.DataFrame object with all matching data points. Default is True.

        Returns
        -------
        list or pandas.DataFrame
            Check return_keys for more information.
        """

        # Add default year and date ranges
        if year_range is None:
            year_range = (0, 9999)
        if date_range is None:
            date_range = ('1/1', '12/31')

        # Add interpolation automatically if a requested threshold necessitates it
        check_keys = [True if i in thresh else False for i in ['dv_min', 'dv_max', 'dp_min', 'dp_max', 'speed_min', 'speed_max']]
        if True in check_keys:
            interpolate_data = True

        # Update thresh based on input
        default_thresh = {'sample_min': 1, 'p_max': 9999, 'p_min': 0, 'v_min': 0, 'v_max': 9999,
                          'dv_min': -9999, 'dp_max': 9999, 'dv_max': 9999, 'dp_min': -9999,
                          'speed_max': 9999, 'speed_min': -9999, 'dt_window': 24, 'dt_align': 'middle'}
        for key in thresh:
            default_thresh[key] = thresh[key]
        thresh = default_thresh

        # Determine domain over which to filter data
        if domain is None:
            lon_min = 0
            lon_max = 360
            lat_min = -90
            lat_max = 90
        else:
            keys = domain.keys()
            check = [False, False, False, False]
            for key in keys:
                if key[0].lower() == 'n':
                    check[0] = True
                    lat_max = domain[key]
                if key[0].lower() == 's':
                    check[1] = True
                    lat_min = domain[key]
                if key[0].lower() == 'e':
                    check[2] = True
                    lon_max = domain[key]
                if key[0].lower() == 'w':
                    check[3] = True
                    lon_min = domain[key]
            if False in check:
                msg = "Custom domains must be of type dict with arguments for 'n', 's', 'e' and 'w'."
                raise ValueError(msg)
            if lon_max < 0:
                lon_max += 360.0
            if lon_min < 0:
                lon_min += 360.0

        # Determine year and date range
        year_min, year_max = year_range
        date_min, date_max = [dt.strptime(i, '%m/%d') for i in date_range]
        date_max += timedelta(days=1, seconds=-1)

        # Determine if a date falls within the date range
        def date_range_test(t, t_min, t_max):
            if date_min < date_max:
                test1 = (t >= t_min.replace(year=t.year))
                test2 = (t <= t_max.replace(year=t.year))
                return test1 & test2
            else:
                test1 = (t_min.replace(year=t.year) <= t < dt(t.year + 1, 1, 1))
                test2 = (dt(t.year, 1, 1) <= t <= t_max.replace(year=t.year))
                return test1 | test2

        # Create empty dictionary to store output in
        points = {}
        for name in ['vmax', 'mslp', 'type', 'lat', 'lon', 'time', 'season', 'stormid', 'ace'] + \
                ['dmslp_dt', 'dvmax_dt', 'acie', 'dx_dt', 'dy_dt', 'speed'] * int(interpolate_data):
            points[name] = []

        # Iterate over every storm in TrackDataset
        if storm is not None:
            if isinstance(storm, list):
                if isinstance(storm[0], tuple):
                    stormkeys = [self.get_storm_id(s) for s in storm]
                else:
                    stormkeys = storm
            elif isinstance(storm, tuple):
                stormkeys = [self.get_storm_id(storm)]
            else:
                stormkeys = [storm]
        else:
            stormkeys = self.keys

        for key in stormkeys:

            # Only interpolate storms within the provided temporal range
            if self.data[key]['year'] <= (year_range[0] - 1) or self.data[key]['year'] >= (year_range[-1] + 1):
                continue
            subset_dates = np.array(self.data[key]['time'])[np.array(
                [i in constants.TROPICAL_STORM_TYPES for i in self.data[key]['type']])]
            if len(subset_dates) == 0:
                continue
            verify_dates = [date_range_test(i, date_min, date_max) for i in subset_dates]
            if True not in verify_dates:
                continue

            # Interpolate temporally if requested
            if interpolate_data:
                istorm = interp_storm(self.data[key].copy(), hours=1,
                                      dt_window=thresh['dt_window'], dt_align=thresh['dt_align'])
                self.data_interp[key] = istorm.copy()
                timeres = 1
            else:
                istorm = self.data[key]
                timeres = 6

            # Iterate over every timestep of the storm
            for i in range(len(istorm['time'])):

                # Filter to only tropical cyclones, and filter by dates & coordinates
                if istorm['type'][i] in constants.TROPICAL_STORM_TYPES \
                        and lat_min <= istorm['lat'][i] <= lat_max and lon_min <= istorm['lon'][i] % 360 <= lon_max \
                        and year_min <= istorm['time'][i].year <= year_max \
                        and date_range_test(istorm['time'][i], date_min, date_max):

                    # Append data points
                    points['vmax'].append(istorm['vmax'][i])
                    points['mslp'].append(istorm['mslp'][i])
                    points['type'].append(istorm['type'][i])
                    points['lat'].append(istorm['lat'][i])
                    points['lon'].append(istorm['lon'][i])
                    points['time'].append(istorm['time'][i])
                    points['season'].append(istorm['season'])
                    points['stormid'].append(key)
                    if istorm['vmax'][i] > 34:
                        points['ace'].append(istorm['vmax'][i]**2 * 1e-4 * timeres / 6)
                    else:
                        points['ace'].append(0)

                    # Append separately for interpolated data
                    if interpolate_data:
                        points['dvmax_dt'].append(istorm['dvmax_dt'][i])
                        points['acie'].append([0, istorm['dvmax_dt'][i]**2 * 1e-4 * timeres / 6][istorm['dvmax_dt'][i] > 0])
                        points['dmslp_dt'].append(istorm['dmslp_dt'][i])
                        points['dx_dt'].append(istorm['dx_dt'][i])
                        points['dy_dt'].append(istorm['dy_dt'][i])
                        points['speed'].append(istorm['speed'][i])

        # Create a DataFrame from the dictionary
        p = pd.DataFrame.from_dict(points)

        # Filter by thresholds
        if thresh['v_min'] > 0:
            p = p.loc[(p['vmax'] >= thresh['v_min'])]
        if thresh['v_max'] < 9999:
            p = p.loc[(p['vmax'] <= thresh['v_max'])]
        if thresh['p_max'] < 9999:
            p = p.loc[(p['mslp'] <= thresh['p_max'])]
        if thresh['p_min'] > 0:
            p = p.loc[(p['mslp'] >= thresh['p_min'])]
        if interpolate_data:
            if thresh['dv_min'] > -9999:
                p = p.loc[(p['dvmax_dt'] >= thresh['dv_min'])]
            if thresh['dp_max'] < 9999:
                p = p.loc[(p['dmslp_dt'] <= thresh['dp_max'])]
            if thresh['dv_max'] < 9999:
                p = p.loc[(p['dvmax_dt'] <= thresh['dv_max'])]
            if thresh['dp_min'] > -9999:
                p = p.loc[(p['dmslp_dt'] >= thresh['dp_min'])]
            if thresh['speed_max'] < 9999:
                p = p.loc[(p['speed'] <= thresh['speed_max'])]  # upper bound on forward speed
            if thresh['speed_min'] > -9999:
                p = p.loc[(p['speed'] >= thresh['speed_min'])]

        # Determine how to return data
        if return_keys:
            return [g[0] for g in p.groupby("stormid")]
        else:
            return p
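    # Example usage (illustrative thresholds, assuming "basin" is a TrackDataset):
    # finding storms that intensified rapidly inside a custom lat/lon box.
    #
    #     keys = basin.filter_storms(year_range=(1970, 2020),
    #                                thresh={'dv_min': 30, 'dt_window': 24},
    #                                domain={'n': 31, 's': 18, 'e': -80, 'w': -100})
    #     df = basin.filter_storms(return_keys=False)  # all matching points as a DataFrame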
[docs] def gridded_stats(self, request, thresh={}, storm=None, year_range=None, year_range_subtract=None, year_average=False, date_range=('1/1', '12/31'), binsize=1, domain=None, ax=None, return_array=False, cartopy_proj=None, **kwargs): r""" Creates a plot of gridded statistics. Parameters ---------- request : str This string is a descriptor for what you want to plot. It will be used to define the variable (e.g. 'wind' --> 'vmax') and the function (e.g. 'maximum' --> np.max()). This string is also used as the plot title. Variable words to use in request: * **wind** - (kt). Sustained wind. * **pressure** - (hPa). Minimum pressure. * **wind change** - (kt/time). Must be followed by an integer value denoting the length of the time window '__ hours' (e.g., "wind change in 24 hours"). * **pressure change** - (hPa/time). Must be followed by an integer value denoting the length of the time window '__ hours' (e.g., "pressure change in 24 hours"). * **storm motion** - (km/hour). Can be followed a length of time window. Otherwise defaults to 24 hours. Units of all wind variables are knots and pressure variables are hPa. These are added into the title. Function words to use in request: * **maximum** * **minimum** * **average** * **percentile** - Percentile must be preceded by an integer [0,100]. * **number** - Number of storms in grid box satisfying filter thresholds. Example usage: "maximum wind change in 24 hours", "50th percentile wind", "number of storms" thresh : dict, optional Keywords include: * **sample_min** - minimum number of storms in a grid box for the request to be applied. For the functions 'percentile' and 'average', 'sample_min' defaults to 5 and will override any value less than 5. * **v_min** - minimum wind for a given point to be included in the request. * **p_max** - maximum pressure for a given point to be included in the request. * **dv_min** - minimum change in wind over dt_window for a given point to be included in the request. * **dp_max** - maximum change in pressure over dt_window for a given point to be included in the request. * **dt_window** - time window over which change variables are calculated (hours). Default is 24. * **dt_align** - alignment of dt_window for change variables -- 'start','middle','end' -- e.g. 'end' for dt_window=24 associates a TC point with change over the past 24 hours. Default is middle. Units of all wind variables = kt, and pressure variables = hPa. These are added to the subtitle. year_range : list or tuple, optional List or tuple representing the start and end years (e.g., (1950,2018)). Default is start and end years of dataset. year_range_subtract : list or tuple, optional A year range to subtract from the previously specified "year_range". If specified, will create a difference plot. year_average : bool, optional If True, both year ranges will be computed and plotted as an annual average. date_range : list or tuple, optional List or tuple representing the start and end dates as a string in 'month/day' format (e.g., ``('6/1','8/15')``). Default is ``('1/1','12/31')`` i.e., the full year. binsize : float, optional Grid resolution in degrees. Default is 1 degree. domain : str, optional Domain for the plot. Default is "dynamic". Please refer to :ref:`options-domain` for available domain options. ax : axes, optional Instance of axes to plot on. If none, one will be generated. Default is none. return_array : bool, optional If True, returns the gridded 2D array used to generate the plot. Default is False. 
cartopy_proj : ccrs, optional Instance of a cartopy projection to use. If none, one will be generated. Default is none. Other Parameters ---------------- prop : dict, optional Customization properties of plot. Please refer to :ref:`options-prop-gridded` for available options. map_prop : dict, optional Customization properties of Cartopy map. Please refer to :ref:`options-map-prop` for available options. Returns ------- By default, the plot axes is returned. If "return_array" are set to True, a dictionary is returned containing both the axes and data array. Notes ----- The following properties are available for customizing the plot, via ``prop``: .. list-table:: :widths: 25 75 :header-rows: 1 * - Property - Description * - plot_values - Boolean for whether to plot label values for each gridpoint. Default is False. * - smooth - Number (in units of sigma) to smooth the data using scipy's gaussian filter. Default is 0 (no smoothing). * - cmap - Colormap to use for the plot. If string 'category' is passed (default), uses a pre-defined color scale corresponding to the Saffir-Simpson Hurricane Wind Scale. * - clevs - Contour levels for the plot. Default is minimum and maximum values in the grid. * - left_title - Title string for the left side of the plot. Default is the string passed via the 'request' keyword argument. * - right_title - Title string for the right side of the plot. Default is 'All storms'. """ # Retrieve kwargs prop = kwargs.pop('prop', {}) map_prop = kwargs.pop('map_prop', {}) default_prop = {'smooth': None} for key in prop.keys(): default_prop[key] = prop[key] prop = default_prop # Update thresh based on input default_thresh = {'sample_min': np.nan, 'p_max': np.nan, 'v_min': np.nan, 'dv_min': np.nan, 'dp_max': np.nan, 'dv_max': np.nan, 'dp_min': np.nan, 'dt_window': 24, 'dt_align': 'middle'} for key in thresh: default_thresh[key] = thresh[key] thresh = default_thresh # Retrieve the requested function, variable for computing stats, and plot title. These modify thresh if necessary. thresh, func = find_func(request, thresh) thresh, varname = find_var(request, thresh) thresh, plot_subtitle = construct_title(thresh) if storm is not None: thresh['sample_min'] = 1 plot_subtitle = '' # Determine whether request includes a vector (i.e., TC motion vector) VEC_FLAG = isinstance(varname, tuple) # Determine year range of plot def get_year_range(y_r): start_year = self.data[self.keys[0]]['year'] end_year = self.data[self.keys[-1]]['year'] if y_r is None: new_y_r = (start_year, end_year) else: if not isinstance(y_r, (list, tuple)): msg = "\"year_range\" and \"year_range_subtract\" must be of type list or tuple." raise ValueError(msg) if year_range_subtract is not None and len(year_range_subtract) != 2: msg = "\"year_range\" and \"year_range_subtract\" must contain 2 elements." 
                raise ValueError(msg)
            new_y_r = (max((start_year, min(y_r))), min((end_year, max(y_r))))
            return new_y_r

        year_range = get_year_range(year_range)

        # Start date in numpy datetime64
        startdate = np.datetime64(
            '2000-' + '-'.join([f'{int(d):02}' for d in date_range[0].split('/')]))

        # Determine year range to subtract, if making a difference plot
        if year_range_subtract is not None:
            year_range_subtract = get_year_range(year_range_subtract)

        # ---------------------------------------------------------------------------------------------------

        # Perform analysis either once or twice depending on year_range_subtract
        if year_range_subtract is None:
            years_analysis = [year_range]
        else:
            years_analysis = [year_range, year_range_subtract]
        grid_x_years = []
        grid_y_years = []
        grid_z_years = []

        for year_range_temp in years_analysis:

            # Obtain all data points for the requested threshold and year/date ranges. Interpolate data to hourly.
            print("--> Getting filtered storm tracks")
            points = self.filter_storms(
                storm, year_range_temp, date_range, thresh=thresh, interpolate_data=True, return_keys=False)

            # Round lat/lon points down to nearest bin
            def to_bin(x):
                return np.floor(x / binsize) * binsize
            points["latbin"] = points.lat.map(to_bin)
            points["lonbin"] = points.lon.map(to_bin)

            # ---------------------------------------------------------------------------------------------------

            # Group by latbin, lonbin, stormid and season
            print("--> Grouping by lat/lon/storm")
            groups = points.groupby(["latbin", "lonbin", "stormid", "season"])

            # Loop through groups and apply the stat func to each storm.
            # Construct a new DataFrame containing the lat/lon bins, storm ID and the plotting variable.
            new_df = {'latbin': [], 'lonbin': [],
                      'stormid': [], 'season': [], varname: []}
            for g in groups:
                # Apply function to all time steps in which a storm tracks within a gridbox
                if VEC_FLAG:
                    new_df[varname].append(
                        [func(g[1][v].values) for v in varname])
                elif varname == 'date':
                    new_df[varname].append(func([date_diff(dt(2000, t.month, t.day), startdate)
                                                 for t in pd.DatetimeIndex(g[1][varname].values)]))
                else:
                    new_df[varname].append(func(g[1][varname].values))
                new_df['latbin'].append(g[0][0])
                new_df['lonbin'].append(g[0][1])
                new_df['stormid'].append(g[0][2])
                new_df['season'].append(g[0][3])
            new_df = pd.DataFrame.from_dict(new_df)

            # ---------------------------------------------------------------------------------------------------

            # Group again by latbin, lonbin.
            # Construct two 1D lists, zi (grid values) and coords, that correspond to the 2D grid.
            groups = new_df.groupby(["latbin", "lonbin"])

            # Apply the function to all storms that pass through a gridpoint
            if VEC_FLAG:
                zi = [[func(v) for v in zip(*g[1][varname])] if len(g[1]) >= thresh['sample_min']
                      else [np.nan] * 2 for g in groups]
            elif varname == 'date':
                zi = [func(g[1][varname]) if len(g[1]) >= thresh['sample_min']
                      else np.nan for g in groups]
                zi = [mdates.date2num(startdate + z) for z in zi]
            else:
                zi = [func(g[1][varname]) if len(g[1]) >= thresh['sample_min']
                      else np.nan for g in groups]

            # Construct a 1D array of coordinates
            coords = [g[0] for g in groups]

            # Construct a 2D longitude and latitude grid, using the specified binsize resolution
            if prop['smooth'] is not None:
                all_lats = [(round(l / binsize) * binsize)
                            for key in self.data.keys() for l in self.data[key]['lat']]
                all_lons = [(round(l / binsize) * binsize) % 360
                            for key in self.data.keys() for l in self.data[key]['lon']]
                xi = np.arange(min(all_lons) - binsize,
                               max(all_lons) + 2 * binsize, binsize)
                yi = np.arange(min(all_lats) - binsize,
                               max(all_lats) + 2 * binsize, binsize)
                if self.basin == 'all':
                    xi = np.arange(0, 360 + binsize, binsize)
                    yi = np.arange(-90, 90 + binsize, binsize)
            else:
                xi = np.arange(np.nanmin(points["lonbin"]) - binsize,
                               np.nanmax(points["lonbin"]) + 2 * binsize, binsize)
                yi = np.arange(np.nanmin(points["latbin"]) - binsize,
                               np.nanmax(points["latbin"]) + 2 * binsize, binsize)
            grid_x, grid_y = np.meshgrid(xi, yi)
            grid_x_years.append(grid_x)
            grid_y_years.append(grid_y)

            # Construct a 2D grid for the z value, depending on whether vector or scalar quantity
            if VEC_FLAG:
                grid_z_u = np.ones(grid_x.shape) * np.nan
                grid_z_v = np.ones(grid_x.shape) * np.nan
                for c, z in zip(coords, zi):
                    grid_z_u[np.where((grid_y == c[0]) & (grid_x == c[1]))] = z[0]
                    grid_z_v[np.where((grid_y == c[0]) & (grid_x == c[1]))] = z[1]
                grid_z = [grid_z_u, grid_z_v]
            else:
                grid_z = np.ones(grid_x.shape) * np.nan
                for c, z in zip(coords, zi):
                    grid_z[np.where((grid_y == c[0]) & (grid_x == c[1]))] = z

            # Set zero values to NaNs if necessary
            if varname == 'type':
                grid_z[np.where(grid_z == 0)] = np.nan

            # Add to list of grid_z's
            grid_z_years.append(grid_z)

        # ---------------------------------------------------------------------------------------------------

        # Calculate difference between plots, if specified
        if len(grid_z_years) == 2:

            # Determine whether to use averages
            if year_average:
                years_listed = len(range(year_range[0], year_range[1] + 1))
                grid_z_years[0] = grid_z_years[0] / years_listed
                years_listed = len(
                    range(year_range_subtract[0], year_range_subtract[1] + 1))
                grid_z_years[1] = grid_z_years[1] / years_listed

            # Construct DataArrays
            grid_z_1 = xr.DataArray(np.nan_to_num(grid_z_years[0]),
                                    coords=[grid_y_years[0].T[0], grid_x_years[0][0]],
                                    dims=['lat', 'lon'])
            grid_z_2 = xr.DataArray(np.nan_to_num(grid_z_years[1]),
                                    coords=[grid_y_years[1].T[0], grid_x_years[1][0]],
                                    dims=['lat', 'lon'])

            # Compute difference grid
            grid_z = grid_z_1 - grid_z_2

            # Reconstruct lat & lon grids
            xi = grid_z.lon.values
            yi = grid_z.lat.values
            grid_z = grid_z.values
            grid_x, grid_y = np.meshgrid(xi, yi)

            # Determine where either grid was NaN; mark each with a distinct sentinel
            # so their difference (-1000) flags points missing from both grids
            grid_z_years[0][np.isnan(grid_z_years[0])] = -9999
            grid_z_years[1][np.isnan(grid_z_years[1])] = -8999
            grid_z_years[0][grid_z_years[0] != -9999] = 0
            grid_z_years[1][grid_z_years[1] != -8999] = 0
            grid_z_1 = xr.DataArray(np.nan_to_num(grid_z_years[0]),
                                    coords=[grid_y_years[0].T[0], grid_x_years[0][0]],
                                    dims=['lat', 'lon'])
            grid_z_2 = xr.DataArray(np.nan_to_num(grid_z_years[1]),
                                    coords=[grid_y_years[1].T[0], grid_x_years[1][0]],
                                    dims=['lat', 'lon'])
            grid_z_check = (grid_z_1 - grid_z_2).values
            grid_z[grid_z_check == -1000] = np.nan

        else:
            # Determine whether to use averages
            if year_average:
                years_listed = len(range(year_range[0], year_range[1] + 1))
                grid_z = grid_z / years_listed

        # Create instance of plot object
        try:
            self.plot_obj
        except AttributeError:
            self.plot_obj = TrackPlot()

        # Create cartopy projection using basin
        if domain is None:
            domain = self.basin
        if cartopy_proj is None:
            if max(points['lon']) > 150 or min(points['lon']) < -150:
                self.plot_obj.create_cartopy(
                    proj='PlateCarree', central_longitude=180.0)
            else:
                self.plot_obj.create_cartopy(
                    proj='PlateCarree', central_longitude=0.0)

        # Format left title for plot
        endash = u"\u2013"
        dot = u"\u2022"
        title_L = request.lower()
        for name in ['wind', 'vmax']:
            title_L = title_L.replace(name, 'wind (kt)')
        for name in ['pressure', 'mslp']:
            title_L = title_L.replace(name, 'pressure (hPa)')
        for name in ['heading', 'motion']:
            title_L = title_L.replace(
                name, f'heading (kt) over {thresh["dt_window"]} hours')
        for name in ['speed', 'movement']:
            title_L = title_L.replace(
                name, f'forward speed (kt) over {thresh["dt_window"]} hours')
        if request.find('change') >= 0:
            title_L = title_L + f", {thresh['dt_align']}"
        title_L = title_L[0].upper() + title_L[1:] + plot_subtitle

        # Format right title for plot
        if storm is not None:
            if isinstance(storm, list):
                title_R = 'Storm Composite'
            else:
                if isinstance(storm, str):
                    storm = self.get_storm_tuple(storm)
                title_R = f'{storm[0]} {storm[1]}'
        else:
            date_range = [dt.strptime(d, '%m/%d').strftime('%b/%d')
                          for d in date_range]
            if np.subtract(*year_range) == 0:
                y_r_title = f'{year_range[0]}'
            else:
                y_r_title = f'{year_range[0]} {endash} {year_range[1]}'
            add_avg = ' year-avg' if year_average else ''
            if year_range_subtract is None:
                title_R = f'{date_range[0].replace("/"," ")} {endash} {date_range[1].replace("/"," ")} {dot} {y_r_title}{add_avg}'
            else:
                if np.subtract(*year_range_subtract) == 0:
                    y_r_s_title = f'{year_range_subtract[0]}'
                else:
                    y_r_s_title = f'{year_range_subtract[0]} {endash} {year_range_subtract[1]}'
                title_R = f'{date_range[0].replace("/"," ")} {endash} {date_range[1].replace("/"," ")}\n{y_r_title}{add_avg} minus {y_r_s_title}{add_avg}'
        prop['title_L'], prop['title_R'] = title_L, title_R

        # Change the masking for variables that go out to zero near the edge of the data
        if prop['smooth'] is not None:

            # Replace NaNs with zeros to apply Gaussian filter
            grid_z_zeros = grid_z.copy()
            grid_z_zeros[np.isnan(grid_z)] = 0
            initial_mask = grid_z.copy()  # Save initial mask
            initial_mask[np.isnan(grid_z)] = -9999
            grid_z_zeros = gfilt(grid_z_zeros, sigma=prop['smooth'])

            if len(grid_z_years) == 2:
                # Difference plots are left unmasked here
                pass
            elif varname not in [('dx_dt', 'dy_dt'), 'speed', 'mslp']:
                # Apply cutoff at 2% of maximum
                test_zeros = (grid_z_zeros < .02 * np.amax(grid_z_zeros))
                grid_z_zeros[test_zeros] = -9999
                initial_mask = grid_z_zeros.copy()

            grid_z_zeros[initial_mask == -9999] = np.nan
            grid_z = grid_z_zeros.copy()

        # Plot gridded field
        plot_ax = self.plot_obj.plot_gridded_wrapper(
            grid_x, grid_y, grid_z, varname, VEC_FLAG, domain, ax=ax, prop=prop, map_prop=map_prop)

        # Return axis, along with the gridded field as an xarray DataArray if requested
        # (the original returned the array alone before the dict, making the dict unreachable)
        if return_array:
            arr = xr.DataArray(np.nan_to_num(grid_z),
                               coords=[grid_y.T[0], grid_x[0]],
                               dims=['lat', 'lon'])
            return {'ax': plot_ax, 'array': arr}
        return plot_ax
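    # Example usage (illustrative sketch, not part of the library source): the
    # method above appears to be the tail of TrackDataset.gridded_stats(), which
    # composites per-gridbox statistics of storm variables. Assuming "basin" is
    # an existing TrackDataset and the request string is valid, a difference plot
    # of per-year average wind between two eras could look like:
    #
    #     basin.gridded_stats(request='average wind',
    #                         year_range=(1991, 2020),
    #                         year_range_subtract=(1961, 1990),
    #                         year_average=True)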
    def assign_storm_tornadoes(self, dist_thresh=1000, tornado_path='spc'):
        r"""
        Assigns tornadoes to all North Atlantic tropical cyclones from TornadoDataset.

        Parameters
        ----------
        dist_thresh : int
            Distance threshold (in kilometers) from the tropical cyclone track over which to attribute tornadoes to the TC. Default is 1000 km.
        tornado_path : str
            Source to read tornado data from. Default is "spc", which reads from the online Storm Prediction Center (SPC) 1950-present tornado database. Can change this to a local file.

        Notes
        -----
        If you intend on analyzing tornadoes for multiple tropical cyclones using a Storm object, it is recommended to run this function first to avoid the need to re-read the entire tornado database for each Storm object.
        """

        # Check to ensure data source is over North Atlantic
        if self.basin != "north_atlantic":
            raise RuntimeError(
                "Tropical cyclone tornado data is only available for the North Atlantic basin.")

        # Read the tornado database into this instance
        self.TorDataset = TornadoDataset(tornado_path=tornado_path)
        self.tornado_dist_thresh = dist_thresh

        # Iterate through all storms in dataset and assign them tornadoes, if they exist
        timer_start = dt.now()
        print('--> Starting to assign tornadoes to storms')
        for i, key in enumerate(self.keys):

            # Skip years prior to 1950
            if self.data[key]['year'] < 1950:
                continue

            # Get tornado data for storm
            storm_obj = self.get_storm(key)
            tor_data = self.TorDataset.get_storm_tornadoes(
                storm_obj, dist_thresh=dist_thresh)
            tor_data = self.TorDataset.rotateToHeading(storm_obj, tor_data)
            self.data_tors[key] = tor_data

            # Check if storm contains tornadoes
            if len(tor_data) > 0:
                self.keys_tors[i] = 1

        # Update user on status
        print('--> Completed assigning tornadoes to storms (%.2f seconds)'
              % (dt.now() - timer_start).total_seconds())
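    # Example usage (sketch; assumes "basin" is an existing North Atlantic
    # TrackDataset and the SPC database is reachable):
    #
    #     basin.assign_storm_tornadoes(dist_thresh=750)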
    def plot_TCtors_rotated(self, storms, mag_thresh=0, return_df=False, save_path=None):
        r"""
        Plot tracks of tornadoes relative to the storm motion vector of the tropical cyclone.

        Parameters
        ----------
        storms : list or str
            Storm(s) to plot motion-relative tornado data for. Can be either a list of storm IDs/tuples to composite, or the string "all" for all storms containing tornado data.
        mag_thresh : int
            Minimum threshold for tornado rating.
        return_df : bool
            Whether to return the pandas DataFrame containing the composite tornado data. Default is False.
        save_path : str
            Relative or full path of directory to save the image in. If None, the image will not be saved.

        Returns
        -------
        ax or dict
            By default, the plot axes are returned. If "return_df" is set to True, returns a dict containing both the data and the plot axes.

        Notes
        -----
        The motion vector is oriented upwards (in the +y direction).
        """

        # Error check
        try:
            self.TorDataset
        except AttributeError:
            raise RuntimeError(
                "No tornado data has been attributed to this dataset. Please run \"TrackDataset.assign_storm_tornadoes()\" first.")

        # Error check
        if not isinstance(mag_thresh, int):
            raise TypeError("mag_thresh must be of type int.")
        elif mag_thresh not in [0, 1, 2, 3, 4, 5]:
            raise ValueError("mag_thresh must be between 0 and 5.")

        # Get IDs of all storms to composite
        if storms == 'all':
            storms = [self.keys[i] for i in range(len(self.keys))
                      if self.keys_tors[i] == 1]
        else:
            if len(storms) == 2 and isinstance(storms[-1], int):
                use_storms = [self.get_storm_id(storms)]
            else:
                use_storms = [i if isinstance(i, str) else self.get_storm_id(i)
                              for i in storms]
            storms = [i for i in use_storms
                      if i in self.keys and self.keys_tors[self.keys.index(i)] == 1]
        if len(storms) == 0:
            raise RuntimeError(
                "None of the requested storms produced any tornadoes.")

        # Get stormTors formatted with requested storm(s)
        stormTors = (self.data_tors[storms[0]]).copy()
        stormTors['storm_id'] = [storms[0]] * len(stormTors)
        if len(storms) > 1:
            for storm in storms[1:]:
                storm_df = self.data_tors[storm].copy()
                storm_df['storm_id'] = [storm] * len(storm_df)
                # DataFrame.append() was removed in pandas 2.0; use pd.concat instead
                stormTors = pd.concat([stormTors, storm_df])

        # Create figure for plotting
        plt.figure(figsize=(9, 9), dpi=150)
        ax = plt.subplot()

        # Default EF color scale
        EFcolors = get_colors_ef('default')

        # Number of storms exceeding mag_thresh
        num_storms = len(np.unique(
            stormTors.loc[stormTors['mag'] >= mag_thresh]['storm_id'].values))

        # Filter for mag >= mag_thresh, and sort by mag so the highest ratings plot on top
        stormTors = stormTors.loc[stormTors['mag'] >= mag_thresh].sort_values('mag')

        # Plot all tornado tracks in motion-relative coords
        for _, row in stormTors.iterrows():
            plt.plot([row['rot_xdist_s'], row['rot_xdist_e'] + .01],
                     [row['rot_ydist_s'], row['rot_ydist_e'] + .01],
                     lw=2, c=EFcolors[row['mag']])

        # Plot dist_thresh radius
        dist_thresh = self.tornado_dist_thresh
        ax.set_facecolor('#F6F6F6')
        circle = plt.Circle((0, 0), dist_thresh, color='w')
        ax.add_artist(circle)
        an = np.linspace(0, 2 * np.pi, 100)
        ax.plot(dist_thresh * np.cos(an), dist_thresh * np.sin(an), 'k')
        ax.plot([-dist_thresh, dist_thresh], [0, 0], 'k--', lw=.5)
        ax.plot([0, 0], [-dist_thresh, dist_thresh], 'k--', lw=.5)

        # Plot motion vector
        plt.arrow(0, -dist_thresh * .1, 0, dist_thresh * .2, length_includes_head=True,
                  head_width=45, head_length=45, fc='k', lw=2, zorder=100)

        # Labels
        ax.set_aspect('equal', 'box')
        ax.set_xlabel('Left/Right of Storm Heading (km)', fontsize=13)
        ax.set_ylabel('Behind/Ahead of Storm Heading (km)', fontsize=13)
        ax.set_title(f'Composite motion-relative tornadoes\nMin threshold: EF-{mag_thresh} | n={num_storms} storms',
                     fontsize=14, fontweight='bold')
        ax.tick_params(axis='both', which='major', labelsize=11.5)

        # Add legend
        handles = []
        for ef, color in enumerate(EFcolors):
            if ef >= mag_thresh:
                count = len(stormTors[stormTors['mag'] == ef])
                handles.append(mlines.Line2D([], [], linestyle='-', color=color,
                                             label=f'EF-{ef} ({count})'))
        ax.legend(handles=handles, loc='lower left', fontsize=11.5)

        # Add attribution
        ax.text(0.99, 0.01, plot_credit(), fontsize=8, color='k', alpha=0.7,
                transform=ax.transAxes, ha='right', va='bottom', zorder=10)

        # Save image if specified
        if save_path is not None and isinstance(save_path, str):
            plt.savefig(save_path, bbox_inches='tight')

        # Return data
        if return_df:
            return {'ax': ax, 'df': stormTors}
        else:
            return ax
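    # Example usage (sketch; assumes assign_storm_tornadoes() has already been run
    # on "basin", and that ('ivan', 2004) resolves to a storm with tornado data):
    #
    #     out = basin.plot_TCtors_rotated([('ivan', 2004)], mag_thresh=1, return_df=True)
    #     tor_df = out['df']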
    def to_dataframe(self):
        r"""
        Retrieve a Pandas DataFrame for all seasons within TrackDataset.

        Returns
        -------
        pandas.DataFrame
            Returns a Pandas DataFrame containing requested data.
        """

        # Get start and end seasons in this TrackDataset object
        start_season = self.data[self.keys[0]]['year']
        end_season = self.data[self.keys[-1]]['year']

        # Create empty dict to be used for making a pandas DataFrame object
        ds = {'season': [], 'all_storms': [], 'named_storms': [], 'hurricanes': [],
              'major_hurricanes': [], 'ace': [], 'start_time': [], 'end_time': []}

        # Iterate over all seasons in the TrackDataset object
        for season in range(start_season, end_season + 1):

            # Get season summary, if season can be retrieved
            try:
                season_summary = self.get_season(season).summary()
            except:
                continue
            if len(season_summary['id']) == 0:
                continue

            # Add information to dict
            ds['season'].append(season)
            ds['all_storms'].append(season_summary['season_storms'])
            ds['named_storms'].append(season_summary['season_named'])
            ds['hurricanes'].append(season_summary['season_hurricane'])
            ds['major_hurricanes'].append(season_summary['season_major'])
            ds['ace'].append(season_summary['season_ace'])
            ds['start_time'].append(season_summary['season_start'])
            ds['end_time'].append(season_summary['season_end'])

        # Convert entire dict to a DataFrame, indexed by season
        ds = pd.DataFrame(ds)
        return ds.set_index('season')
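    # Example usage (sketch; assumes "basin" is an existing TrackDataset):
    #
    #     df = basin.to_dataframe()
    #     print(df.loc[2005, 'ace'])  # seasonal ACE for 2005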
    def climatology(self, climo_bounds=(1991, 2020)):
        r"""
        Create a climatology for this dataset given start and end seasons. If none passed, defaults to 1991-2020.

        Parameters
        ----------
        climo_bounds : list or tuple, optional
            Start and end year for the climatology range. Default is (1991, 2020).

        Returns
        -------
        dict
            Dictionary containing the climatology for this dataset.

        Notes
        -----
        If in the Southern Hemisphere, the year is the 2nd year of the season (e.g., 1975 for 1974-1975).
        """

        # Error check (type checks first, so the comparisons below are safe)
        if not isinstance(climo_bounds, (list, tuple)):
            raise TypeError("climo_bounds must be of type list or tuple.")
        if len(climo_bounds) != 2:
            raise TypeError(
                "climo_bounds must have two elements, start and end year.")
        start_season, end_season = climo_bounds
        if not is_number(start_season) or not is_number(end_season):
            raise TypeError("start_season and end_season must be of type int.")
        if start_season >= end_season:
            raise ValueError("start_season must be less than end_season.")
        if (end_season - start_season) < 5:
            raise ValueError(
                "A minimum of 5 seasons is required for constructing a climatology.")

        # Retrieve data for all seasons in this dataset
        # (.loc label slicing is inclusive of both endpoints, so no "+ 1" is needed)
        full_climo = self.to_dataframe()
        subset_climo = full_climo.loc[start_season:end_season]

        # Convert dates to Julian days
        julian_start = [convert_to_julian(pd.to_datetime(i))
                        for i in subset_climo['start_time'].values]
        julian_end = [convert_to_julian(pd.to_datetime(i))
                      for i in subset_climo['end_time'].values]
        if self.basin in constants.SOUTH_HEMISPHERE_BASINS:
            julian_start = [i + 365 if i < 100 else i for i in julian_start]
            julian_end = [i - 365 if i > 300 else i for i in julian_end]
        elif self.basin != 'all':
            julian_end = [i + 365 if i < 100 else i for i in julian_end]
        subset_climo = subset_climo.drop(columns=['start_time', 'end_time'])
        subset_climo['start_time'] = julian_start
        subset_climo['end_time'] = julian_end
        subset_climo_means = (subset_climo.mean(axis=0)).round(1)

        # Assemble the climatology dict
        climatology = {}
        for key in ['all_storms', 'named_storms', 'hurricanes', 'major_hurricanes', 'ace']:
            climatology[key] = subset_climo_means[key]
        for key in ['start_time', 'end_time']:
            climatology[key] = dt(dt.now().year - 1, 12, 31) + \
                timedelta(hours=24 * subset_climo_means[key])

        return climatology
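    # Example usage (sketch; assumes "basin" is an existing TrackDataset spanning
    # the requested years):
    #
    #     climo = basin.climatology(climo_bounds=(1991, 2020))
    #     print(climo['named_storms'], climo['ace'])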
    def season_composite(self, seasons, climo_bounds=None):
        r"""
        Create composite statistics for a list of seasons.

        Parameters
        ----------
        seasons : list
            List of seasons to create a composite of.
        climo_bounds : list or tuple
            List or tuple of start and end years of climatology bounds. If None, defaults to (1991, 2020).

        Returns
        -------
        dict
            Dictionary containing the composite of the requested seasons.

        Notes
        -----
        If in the Southern Hemisphere, the year is the 2nd year of the season (e.g., 1975 for 1974-1975).
        """

        # Error check
        if not isinstance(seasons, list):
            raise TypeError("'seasons' must be of type list.")
        if climo_bounds is None:
            climo_bounds = (1991, 2020)

        # Retrieve the composite summary, climatology, and full climatology DataFrame
        summary = self.get_season(seasons).summary()
        climatology = self.climatology(climo_bounds)
        full_climo = self.to_dataframe()
        # .loc label slicing is inclusive of both endpoints, so no "+ 1" is needed
        subset_climo = full_climo.loc[climo_bounds[0]:climo_bounds[1]]

        # Create composite dictionary
        map_keys = {
            'all_storms': 'season_storms',
            'named_storms': 'season_named',
            'hurricanes': 'season_hurricane',
            'major_hurricanes': 'season_major',
            'ace': 'season_ace',
        }

        composite = {}
        for key in map_keys.keys():

            # Get list from seasons
            season_list = summary[map_keys.get(key)]
            season_climo = climatology[key]
            season_fullclimo = subset_climo[key].values

            # Create dictionary of relevant calculations for this entry
            composite[key] = {
                'list': season_list,
                'average': np.round(np.average(season_list), 1),
                'composite_anomaly': np.round(np.average(season_list) - season_climo, 1),
                'percentile_ranks': [np.round(stats.percentileofscore(season_fullclimo, i), 1)
                                     for i in season_list],
            }

        return composite
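    # Example usage (sketch; assumes "basin" is an existing TrackDataset):
    #
    #     composite = basin.season_composite([2004, 2005, 2017])
    #     print(composite['hurricanes']['composite_anomaly'])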
    def analogs_from_point(self, point, radius, units='km', thresh={}, non_tropical=False,
                           year_range=None, date_range=None):
        r"""
        Retrieve historical TC tracks surrounding a point.

        Parameters
        ----------
        point : tuple
            Tuple ordered by (latitude, longitude).
        radius : int or float
            Radius surrounding the point to search for storms, in the specified units.
        units : str, optional
            Units of distance for radius. Can be "miles" or "km". Default is "km".
        thresh : dict
            Dict for threshold(s) that storms within the requested radius must meet. The following options are available:

            * **v_min** - Search for sustained wind (kt) above this threshold
            * **v_max** - Search for sustained wind (kt) below this threshold
            * **p_min** - Search for MSLP (hPa) above this threshold
            * **p_max** - Search for MSLP (hPa) below this threshold
        non_tropical : bool
            If True, non-tropical (e.g., tropical disturbance, extra-tropical cyclone) points are included in the search. Default is False.
        year_range : tuple
            Year range over which to search. If None, defaults to entire dataset.
        date_range : tuple
            Start and end dates, formatted as a "month/day" string. If None, defaults to year round.

        Returns
        -------
        dict
            Dict of tropical cyclones that meet the criteria, with storm ID as the key and its closest distance to the point as the value.

        Notes
        -----
        This function automatically interpolates all storm data within this TrackDataset instance to hourly, if this hasn't already been done previously.
        """

        # Determine year range of dataset
        if year_range is None:
            start_year = self.data[self.keys[0]]['year']
            end_year = self.data[self.keys[-1]]['year']
        elif isinstance(year_range, (list, tuple)):
            if len(year_range) != 2:
                raise ValueError(
                    "year_range must be a tuple or list with 2 elements: (start_year, end_year)")
            start_year = int(year_range[0])
            if start_year < self.data[self.keys[0]]['year']:
                start_year = self.data[self.keys[0]]['year']
            end_year = int(year_range[1])
            if end_year > self.data[self.keys[-1]]['year']:
                end_year = self.data[self.keys[-1]]['year']
        else:
            raise TypeError("year_range must be of type tuple or list")

        # Determine date range
        if date_range is None:
            date_range = ('1/1', '12/31')

        # Units error check
        if units not in ['km', 'miles']:
            raise ValueError("units must be 'km' or 'miles'.")
        unit_factor = 1.0 if units == 'km' else 0.621371

        # Interpolate all storm data, if this hasn't been done already
        self.__interpolate_storms(self.keys)

        data = {}
        for key in self.keys:
            if self.data[key]['year'] > end_year or self.data[key]['year'] < start_year:
                continue
            storm_data = [[great_circle(point, (self.data_interp[key]['lat'][i], self.data_interp[key]['lon'][i])).kilometers,
                           self.data_interp[key]['vmax'][i],
                           self.data_interp[key]['mslp'][i],
                           self.data_interp[key]['time'][i]]
                          for i in range(len(self.data_interp[key]['lat']))
                          if self.data_interp[key]['type'][i] in constants.TROPICAL_STORM_TYPES or non_tropical]

            # Convert the computed distances (km) into the requested units before
            # comparing against radius (the original applied the factor to radius,
            # which inverted the conversion when units were miles)
            storm_data = [i for i in storm_data if i[0] * unit_factor <= radius]
            storm_data = [i for i in storm_data
                          if i[3] >= dt.strptime(date_range[0], '%m/%d').replace(year=i[3].year)
                          and i[3] <= (dt.strptime(date_range[1], '%m/%d') + timedelta(hours=23)).replace(year=i[3].year)]
            if len(storm_data) == 0:
                continue
            if 'v_min' in thresh.keys():
                storm_data = [i for i in storm_data if i[1] >= thresh['v_min']]
            if 'v_max' in thresh.keys():
                storm_data = [i for i in storm_data if i[1] <= thresh['v_max']]
            if 'p_min' in thresh.keys():
                storm_data = [i for i in storm_data if i[2] >= thresh['p_min']]
            if 'p_max' in thresh.keys():
                storm_data = [i for i in storm_data if i[2] <= thresh['p_max']]
            if len(storm_data) == 0:
                continue
            data[key] = np.round(
                np.nanmin([i[0] * unit_factor for i in storm_data]), 1)

        return data
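    # Example usage (sketch; assumes "basin" is an existing TrackDataset; the
    # point and thresholds are hypothetical values):
    #
    #     analogs = basin.analogs_from_point((29.9, -90.1), radius=200,
    #                                        thresh={'v_min': 64})
    #     for storm_id, dist_km in analogs.items():
    #         print(storm_id, dist_km)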
    def analogs_from_shape(self, points, thresh={}, non_tropical=False,
                           year_range=None, date_range=None):
        r"""
        Retrieve historical TC tracks within a bounded region.

        Parameters
        ----------
        points : list
            List of tuples ordered by (latitude, longitude) corresponding to the bounded region.
        thresh : dict
            Dict for threshold(s) that storms within the bounded region must meet. The following options are available:

            * **v_min** - Search for sustained wind (kt) above this threshold
            * **v_max** - Search for sustained wind (kt) below this threshold
            * **p_min** - Search for MSLP (hPa) above this threshold
            * **p_max** - Search for MSLP (hPa) below this threshold
        non_tropical : bool
            If True, non-tropical (e.g., tropical disturbance, extra-tropical cyclone) points are included in the search. Default is False.
        year_range : tuple
            Year range over which to search. If None, defaults to entire dataset.
        date_range : tuple
            Start and end dates, formatted as a "month/day" string. If None, defaults to year round.

        Returns
        -------
        list
            List of tropical cyclones that meet the criteria.

        Notes
        -----
        This function automatically interpolates all storm data within this TrackDataset instance to hourly, if this hasn't already been done previously.
        """

        # Determine year range of dataset
        if year_range is None:
            start_year = self.data[self.keys[0]]['year']
            end_year = self.data[self.keys[-1]]['year']
        elif isinstance(year_range, (list, tuple)):
            if len(year_range) != 2:
                raise ValueError(
                    "year_range must be a tuple or list with 2 elements: (start_year, end_year)")
            start_year = int(year_range[0])
            if start_year < self.data[self.keys[0]]['year']:
                start_year = self.data[self.keys[0]]['year']
            end_year = int(year_range[1])
            if end_year > self.data[self.keys[-1]]['year']:
                end_year = self.data[self.keys[-1]]['year']
        else:
            raise TypeError("year_range must be of type tuple or list")

        # Determine date range
        if date_range is None:
            date_range = ('1/1', '12/31')

        # Interpolate all storm data, if this hasn't been done already
        self.__interpolate_storms(self.keys)

        # Coerce point longitudes to -180 to 180 before constructing the path,
        # so the path and the shifted storm longitudes below agree. Tuples are
        # immutable, so a new list is built rather than editing points in place.
        points = [(lat, lon - 360.0) if lon > 180.0 else (lat, lon)
                  for lat, lon in points]

        # Close the polygon if the last point doesn't match the first
        if points[-1] != points[0]:
            points.append(points[0])
        p = path.Path(points)

        data = []
        for key in self.keys:
            lon_shift = self.data_interp[key]['lon'] + 0.0
            lon_shift[lon_shift > 180.0] = lon_shift[lon_shift > 180.0] - 360.0
            if self.data[key]['year'] > end_year or self.data[key]['year'] < start_year:
                continue
            storm_data = [[p.contains_point((self.data_interp[key]['lat'][i], lon_shift[i])),
                           self.data_interp[key]['vmax'][i],
                           self.data_interp[key]['mslp'][i],
                           self.data_interp[key]['time'][i]]
                          for i in range(len(self.data_interp[key]['lat']))
                          if self.data_interp[key]['type'][i] in constants.TROPICAL_STORM_TYPES or non_tropical]
            storm_data = [i for i in storm_data if i[0]]
            storm_data = [i for i in storm_data
                          if i[3] >= dt.strptime(date_range[0], '%m/%d').replace(year=i[3].year)
                          and i[3] <= (dt.strptime(date_range[1], '%m/%d') + timedelta(hours=23)).replace(year=i[3].year)]
            if len(storm_data) == 0:
                continue
            if 'v_min' in thresh.keys():
                storm_data = [i for i in storm_data if i[1] >= thresh['v_min']]
            if 'v_max' in thresh.keys():
                storm_data = [i for i in storm_data if i[1] <= thresh['v_max']]
            if 'p_min' in thresh.keys():
                storm_data = [i for i in storm_data if i[2] >= thresh['p_min']]
            if 'p_max' in thresh.keys():
                storm_data = [i for i in storm_data if i[2] <= thresh['p_max']]
            if len(storm_data) == 0:
                continue
            data.append(key)

        return data
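    # Example usage (sketch; assumes "basin" is an existing TrackDataset; the
    # region vertices are hypothetical values):
    #
    #     region = [(29.0, -95.0), (29.0, -89.0), (31.0, -89.0), (31.0, -95.0)]
    #     storm_ids = basin.analogs_from_shape(region, date_range=('8/1', '10/31'))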
    def plot_analogs_from_point(self, point, radius, units='km', thresh={}, non_tropical=False,
                                year_range=None, date_range=None, **kwargs):
        r"""
        Plot historical TC tracks surrounding a point.

        Parameters
        ----------
        point : tuple
            Tuple ordered by (latitude, longitude).
        radius : int or float
            Radius surrounding the point to search for storms, in the specified units.
        units : str, optional
            Units of distance for radius. Can be "miles" or "km". Default is "km".
        thresh : dict
            Dict for threshold(s) that storms within the requested radius must meet. The following options are available:

            * **v_min** - Search for sustained wind (kt) above this threshold
            * **v_max** - Search for sustained wind (kt) below this threshold
            * **p_min** - Search for MSLP (hPa) above this threshold
            * **p_max** - Search for MSLP (hPa) below this threshold
        non_tropical : bool
            If True, non-tropical (e.g., tropical disturbance, extra-tropical cyclone) points are included in the search. Default is False.
        year_range : tuple
            Year range over which to search. If None, defaults to entire dataset.
        date_range : tuple
            Start and end dates, formatted as a "month/day" string. If None, defaults to year round.

        Other Parameters
        ----------------
        **kwargs
            Refer to ``tropycal.tracks.TrackDataset.plot_storms`` for plotting keyword arguments.

        Returns
        -------
        ax
            Axes instance of the plot.

        Notes
        -----
        This function automatically interpolates all storm data within this TrackDataset instance to hourly, if this hasn't already been done previously.
        """

        # Determine year range of dataset
        if year_range is None:
            start_year = self.data[self.keys[0]]['year']
            end_year = self.data[self.keys[-1]]['year']
        elif isinstance(year_range, (list, tuple)):
            if len(year_range) != 2:
                raise ValueError(
                    "year_range must be a tuple or list with 2 elements: (start_year, end_year)")
            start_year = int(year_range[0])
            if start_year < self.data[self.keys[0]]['year']:
                start_year = self.data[self.keys[0]]['year']
            end_year = int(year_range[1])
            if end_year > self.data[self.keys[-1]]['year']:
                end_year = self.data[self.keys[-1]]['year']
        else:
            raise TypeError("year_range must be of type tuple or list")

        # Determine date range
        if date_range is None:
            date_range = ('1/1', '12/31')

        # Reconfigure domain to be centered around the search circle.
        # The geodesic circle takes a radius in meters, so convert miles to km
        # first by dividing by 0.621371 (the original multiplied, which shrank
        # the circle when units were miles).
        import cartopy.geodesic as geodesic
        radius_km = radius if units == 'km' else radius / 0.621371
        circle_points = geodesic.Geodesic().circle(
            lon=point[1], lat=point[0], radius=radius_km * 1000.0,
            n_samples=360, endpoint=False)
        domain = kwargs.pop('domain', None)
        if domain is None:
            lons = [i[0] for i in circle_points]
            lats = [i[1] for i in circle_points]
            bounds = dynamic_map_extent(np.nanmin(lons), np.nanmax(lons),
                                        np.nanmin(lats), np.nanmax(lats))
            kwargs['domain'] = {'w': bounds[0], 'e': bounds[1],
                                's': bounds[2], 'n': bounds[3]}
        else:
            kwargs['domain'] = domain

        # Retrieve storms and plot on axes
        storms = list(self.analogs_from_point(
            point, radius, units, thresh, non_tropical, year_range, date_range).keys())
        ax = self.plot_storms(storms, **kwargs)

        # Plot circle and dot
        import cartopy.crs as ccrs
        import shapely.geometry
        ms = 12
        linewidth = 2.5
        color = 'k'
        ax.plot(point[1], point[0], 'o', mfc=color, mec=color, ms=ms, zorder=30)
        geom = shapely.geometry.Polygon(circle_points)
        ax.add_geometries((geom,), crs=ccrs.PlateCarree(), facecolor='none',
                          edgecolor=color, linewidth=linewidth, zorder=30)

        # Change title
        title = kwargs.pop('title', '')
        if title == '':
            endash = u"\u2013"
            dot = u"\u2022"
            degree_sign = u'\N{DEGREE SIGN}'
            lat_formatter = f"{point[0]:.1f}{degree_sign}N"
            if point[0] < 0:
                lat_formatter = f"{abs(point[0]):.1f}{degree_sign}S"
            lon_formatter = f"{point[1]:.1f}{degree_sign}E"
            if point[1] < 0:
                lon_formatter = f"{abs(point[1]):.1f}{degree_sign}W"
            if point[1] > 180:
                lon_formatter = f"{abs(point[1]-360.0):.1f}{degree_sign}W"
            start_day = dt.strptime(date_range[0], '%m/%d').strftime('%b %d')
            end_day = dt.strptime(date_range[1], '%m/%d').strftime('%b %d')
            ax.set_title(f"TCs Within {radius} {units} of {lat_formatter}, {lon_formatter}",
                         loc='left', fontsize=17, fontweight='bold')
            ax.set_title(f"Number of storms: {len(storms)}\n{start_day} {endash} {end_day} {dot} {start_year} {endash} {end_year}",
                         loc='right', fontsize=13)

        return ax
    def plot_analogs_from_shape(self, points, thresh={}, non_tropical=False,
                                year_range=None, date_range=None, **kwargs):
        r"""
        Plot historical TC tracks within a bounded region.

        Parameters
        ----------
        points : list
            List of tuples ordered by (latitude, longitude) corresponding to the bounded region.
        thresh : dict
            Dict for threshold(s) that storms within the bounded region must meet. The following options are available:

            * **v_min** - Search for sustained wind (kt) above this threshold
            * **v_max** - Search for sustained wind (kt) below this threshold
            * **p_min** - Search for MSLP (hPa) above this threshold
            * **p_max** - Search for MSLP (hPa) below this threshold
        non_tropical : bool
            If True, non-tropical (e.g., tropical disturbance, extra-tropical cyclone) points are included in the search. Default is False.
        year_range : tuple
            Year range over which to search. If None, defaults to entire dataset.
        date_range : tuple
            Start and end dates, formatted as a "month/day" string. If None, defaults to year round.

        Other Parameters
        ----------------
        linewidth : int or float
            Width of bounded shape line. Defaults to 2.0.
        color : str
            Color of bounded shape line. Defaults to black.
        **kwargs
            Refer to ``tropycal.tracks.TrackDataset.plot_storms`` for plotting keyword arguments.

        Returns
        -------
        ax
            Axes instance of the plot.

        Notes
        -----
        This function automatically interpolates all storm data within this TrackDataset instance to hourly, if this hasn't already been done previously.
        """

        # Determine year range of dataset
        if year_range is None:
            start_year = self.data[self.keys[0]]['year']
            end_year = self.data[self.keys[-1]]['year']
        elif isinstance(year_range, (list, tuple)):
            if len(year_range) != 2:
                raise ValueError(
                    "year_range must be a tuple or list with 2 elements: (start_year, end_year)")
            start_year = int(year_range[0])
            if start_year < self.data[self.keys[0]]['year']:
                start_year = self.data[self.keys[0]]['year']
            end_year = int(year_range[1])
            if end_year > self.data[self.keys[-1]]['year']:
                end_year = self.data[self.keys[-1]]['year']
        else:
            raise TypeError("year_range must be of type tuple or list")

        # Determine date range
        if date_range is None:
            date_range = ('1/1', '12/31')

        # Retrieve shape plotting properties before passing kwargs to plot_storms
        # (the documented defaults are linewidth 2.0 and black; the original
        # hardcoded these and never read them from kwargs)
        linewidth = kwargs.pop('linewidth', 2.0)
        color = kwargs.pop('color', 'k')

        # Reconfigure domain to be centered around the bounded region
        domain = kwargs.pop('domain', None)
        if domain is None:
            lons = [i[1] for i in points]
            lats = [i[0] for i in points]
            bounds = dynamic_map_extent(np.nanmin(lons), np.nanmax(lons),
                                        np.nanmin(lats), np.nanmax(lats))
            kwargs['domain'] = {'w': bounds[0], 'e': bounds[1],
                                's': bounds[2], 'n': bounds[3]}
        else:
            kwargs['domain'] = domain

        # Retrieve storms and plot on axes
        storms = self.analogs_from_shape(
            points, thresh, non_tropical, year_range, date_range)
        ax = self.plot_storms(storms, **kwargs)

        # Plot bounded region
        import cartopy.crs as ccrs
        if points[-1] != points[0]:
            points.append(points[0])
        ax.plot([i[1] for i in points], [i[0] for i in points], color=color,
                linewidth=linewidth, zorder=30, transform=ccrs.PlateCarree())

        # Change title
        title = kwargs.pop('title', '')
        if title == '':
            endash = u"\u2013"
            dot = u"\u2022"
            start_day = dt.strptime(date_range[0], '%m/%d').strftime('%b %d')
            end_day = dt.strptime(date_range[1], '%m/%d').strftime('%b %d')
            ax.set_title("TC Tracks Within Bounded Region",
                         loc='left', fontsize=17, fontweight='bold')
            ax.set_title(f"Number of storms: {len(storms)}\n{start_day} {endash} {end_day} {dot} {start_year} {endash} {end_year}",
                         loc='right', fontsize=13)

        return ax
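    # Example usage (sketch; assumes "basin" is an existing TrackDataset and
    # "region" is a list of (lat, lon) vertices as in the example above):
    #
    #     ax = basin.plot_analogs_from_shape(region, year_range=(1950, 2022))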
    def plot_summary(self, time, domain=None, ax=None, cartopy_proj=None, save_path=None, **kwargs):
        r"""
        Plot a summary map of past tropical cyclone and NHC potential development activity. Only valid for areas in NHC's area of responsibility at this time.

        Parameters
        ----------
        time : datetime
            Valid time for the summary plot.
        domain : str
            Domain for the plot. Default is the current basin. Please refer to :ref:`options-domain` for available domain options.
        ax : axes, optional
            Instance of axes to plot on. If None, one will be generated. Default is None.
        cartopy_proj : ccrs, optional
            Instance of a cartopy projection to use. If None, one will be generated. Default is None.
        save_path : str, optional
            Relative or full path of directory to save the image in. If None, the image will not be saved.

        Other Parameters
        ----------------
        two_prop : dict
            Customization properties of the NHC Tropical Weather Outlook (TWO). Please refer to :ref:`options-summary` for available options.
        storm_prop : dict
            Customization properties of active storms. Please refer to :ref:`options-summary` for available options.
        cone_prop : dict
            Customization properties of the cone of uncertainty. Please refer to :ref:`options-summary` for available options.
        map_prop : dict
            Customization properties of the Cartopy map. Please refer to :ref:`options-map-prop` for available options.

        Returns
        -------
        ax
            Instance of axes containing the plot is returned.

        Notes
        -----
        The following properties are available for plotting the NHC Tropical Weather Outlook (TWO), via ``two_prop``.

        .. list-table::
           :widths: 25 75
           :header-rows: 1

           * - Property
             - Description
           * - plot
             - Boolean to determine whether to plot the NHC TWO. Default is True.
           * - days
             - Number of days for the TWO. Can be either 2 or 5. Default is 5.
           * - fontsize
             - Font size for text label. Default is 12.
           * - ms
             - Marker size for area location, if applicable. Default is 15.

        The following properties are available for plotting storms, via ``storm_prop``.

        .. list-table::
           :widths: 25 75
           :header-rows: 1

           * - Property
             - Description
           * - plot
             - Boolean to determine whether to plot active storms. Default is True.
           * - linewidth
             - Line width for past track. Default is 0.8. Set to zero to not plot the line.
           * - linecolor
             - Line color for past track. Default is black.
           * - linestyle
             - Line style for past track. Default is dotted.
           * - fontsize
             - Font size for storm name label. Default is 12.
           * - fillcolor
             - Fill color for storm location marker. Default is color by SSHWS category ("category").
           * - label_category
             - Boolean for whether to plot the SSHWS category on top of the storm location marker. Default is True.
           * - ms
             - Marker size for storm location. Default is 14.

        The following properties are available for plotting the realtime cone of uncertainty, via ``cone_prop``.

        .. list-table::
           :widths: 25 75
           :header-rows: 1

           * - Property
             - Description
           * - plot
             - Boolean to determine whether to plot the cone of uncertainty & forecast track for active storms. Default is True.
           * - linewidth
             - Line width for forecast track. Default is 1.5. Set to zero to not plot the line.
           * - alpha
             - Opacity for the cone of uncertainty. Default is 0.6.
           * - days
             - Number of days for the cone of uncertainty, from 2 through 5. Default is 5.
           * - fillcolor
             - Fill color for forecast dots. Default is color by SSHWS category ("category").
           * - label_category
             - Boolean for whether to plot the SSHWS category on top of forecast dots. Default is True.
           * - ms
             - Marker size for forecast dots. Default is 12.
        """

        # Set basin
        if domain is None:
            domain = self.basin

        # Find closest NHC shapefile
        shapefiles = get_two_archive(time)
        if shapefiles['areas'] is None:
            two_prop = {'plot': False}
        else:
            two_prop = kwargs.pop('two_prop', {})

        # Search all valid storms at the time
        print("--> Reading storm data")
        storms = []
        forecasts = []
        for key in self.keys:

            # Temporal filter
            if time < self.data[key]['time'][0]:
                continue
            if self.data[key]['time'][-1] < time:
                continue

            # Filter for ibtracs sources (use best track instead of operational track)
            if self.source != 'hurdat':
                # Hours between the requested time and each fix (positive = fix in the past)
                diff = [(time - i).total_seconds() / 3600
                        for i in self.data[key]['time']]
                diff_maxes = [i for i in diff if i >= 0]
                if len(diff_maxes) == 0:
                    continue
                idx = diff.index(np.nanmin(diff_maxes))
                if np.nanmin(diff) > 0:
                    continue
                # Keep only if the closest fix is within 9 hours of the requested
                # time (the original compared hours against 9/24, mixing units)
                if diff[idx] < -9.0:
                    continue
                if diff[idx] > 9.0:
                    continue
                storm = self.get_storm(key).sel(
                    time=(self.data[key]['time'][0], time))
                if True in np.isin(list(storm.type), list(constants.TROPICAL_STORM_TYPES)):
                    storms.append(storm)
                continue

            # Get forecast
            storm = self.get_storm(key)
            storm.get_operational_forecasts()

            # Get all NHC forecast entries
            nhc_forecasts = storm.forecast_dict['OFCL']
            carq_forecasts = storm.forecast_dict['CARQ']

            # Get list of all NHC forecast initializations
            nhc_forecast_init = [k for k in nhc_forecasts.keys()]
            carq_forecast_init = [k for k in carq_forecasts.keys()]

            # Find the initialization closest to the provided time
            nhc_forecast_init_dt = [dt.strptime(k, '%Y%m%d%H') if 3 not in nhc_forecasts[k]['fhr']
                                    else dt.strptime(k, '%Y%m%d%H') + timedelta(hours=3)
                                    for k in nhc_forecast_init]
            time_diff = [(i - time).days + (i - time).seconds / 86400
                         for i in nhc_forecast_init_dt]
            time_diff = np.array([i for i in time_diff if i <= 0.0])
            if len(time_diff) == 0:
                continue
            closest_idx = np.abs(time_diff).argmin()
            forecast_dict = nhc_forecasts[nhc_forecast_init[closest_idx]]
            advisory_num = closest_idx + 1

            # Second filter (time_diff is in days, so 9/24 days = 9 hours)
            if np.nanmin(time_diff) > 0:
                continue
            if time_diff[closest_idx] < (-9.0 / 24.0):
                continue
            if time_diff[closest_idx] > (9.0 / 24.0):
                continue

            # Get observed track as per NHC analyses
            track_dict = {'lat': [], 'lon': [], 'vmax': [], 'type': [], 'mslp': [],
                          'time': [], 'extra_obs': [], 'special': [], 'ace': 0.0}
            use_carq = True
            for k in nhc_forecast_init:
                if k not in carq_forecasts.keys():
                    continue
                if carq_forecasts[k]['init'] > time:
                    continue
                hrs = nhc_forecasts[k]['fhr']
                hrs_carq = carq_forecasts[k]['fhr'] if k in carq_forecast_init else []

                # Account for old years when hour 0 wasn't included directly
                if storm.dict['year'] < 2000 and k in carq_forecast_init and 0 in hrs_carq:
                    use_carq = True
                    hr_idx = hrs_carq.index(0)
                    track_dict['lat'].append(carq_forecasts[k]['lat'][hr_idx])
                    track_dict['lon'].append(carq_forecasts[k]['lon'][hr_idx])
                    track_dict['vmax'].append(carq_forecasts[k]['vmax'][hr_idx])
                    track_dict['mslp'].append(np.nan)
                    track_dict['time'].append(carq_forecasts[k]['init'])

                    itype = carq_forecasts[k]['type'][hr_idx]
                    if itype == "":
                        itype = get_storm_type(carq_forecasts[k]['vmax'][0], False)
                    track_dict['type'].append(itype)

                    hr = carq_forecasts[k]['init'].strftime("%H%M")
                    track_dict['extra_obs'].append(
                        0 if hr in ['0300', '0900', '1500', '2100'] else 1)
                    track_dict['special'].append("")
                else:
                    use_carq = False
                    if 3 in hrs:
                        hr_idx = hrs.index(3)
                        hr_add = 3
                    else:
                        hr_idx = 0
                        hr_add = 0
                    if nhc_forecasts[k]['init'] + timedelta(hours=hr_add) > time:
                        continue
                    track_dict['lat'].append(nhc_forecasts[k]['lat'][hr_idx])
                    track_dict['lon'].append(nhc_forecasts[k]['lon'][hr_idx])
                    track_dict['vmax'].append(nhc_forecasts[k]['vmax'][hr_idx])
                    track_dict['mslp'].append(np.nan)
                    track_dict['time'].append(
                        nhc_forecasts[k]['init'] + timedelta(hours=hr_add))

                    itype = nhc_forecasts[k]['type'][hr_idx]
                    if itype == "":
                        itype = get_storm_type(nhc_forecasts[k]['vmax'][0], False)
                    track_dict['type'].append(itype)

                    hr = nhc_forecasts[k]['init'].strftime("%H%M")
                    track_dict['extra_obs'].append(
                        0 if hr in ['0300', '0900', '1500', '2100'] else 1)
                    track_dict['special'].append("")

            # Add main elements from storm dict ("attr" avoids clobbering the
            # outer loop variable "key")
            for attr in ['id', 'operational_id', 'name', 'year']:
                track_dict[attr] = storm.dict[attr]

            # Add CARQ to forecast dict as hour 0, if available
            if use_carq and forecast_dict['init'] in track_dict['time']:
                insert_idx = track_dict['time'].index(forecast_dict['init'])
                if 0 in forecast_dict['fhr']:
                    forecast_dict['lat'][0] = track_dict['lat'][insert_idx]
                    forecast_dict['lon'][0] = track_dict['lon'][insert_idx]
                    forecast_dict['vmax'][0] = track_dict['vmax'][insert_idx]
                    forecast_dict['mslp'][0] = track_dict['mslp'][insert_idx]
                    forecast_dict['type'][0] = track_dict['type'][insert_idx]
                else:
                    forecast_dict['fhr'].insert(0, 0)
                    forecast_dict['lat'].insert(0, track_dict['lat'][insert_idx])
                    forecast_dict['lon'].insert(0, track_dict['lon'][insert_idx])
                    forecast_dict['vmax'].insert(0, track_dict['vmax'][insert_idx])
                    forecast_dict['mslp'].insert(0, track_dict['mslp'][insert_idx])
                    forecast_dict['type'].insert(0, track_dict['type'][insert_idx])

            # Fix forecast dict if hour 3 is available
            if not use_carq and 3 in forecast_dict['fhr']:
                idx_3 = forecast_dict['fhr'].index(3)
                for iter_key in ['fhr', 'lat', 'lon', 'vmax', 'mslp', 'type']:
                    forecast_dict[iter_key] = forecast_dict[iter_key][idx_3:]
                forecast_dict['fhr'][0] = 0

            # Add other info to forecast dict
            forecast_dict['advisory_num'] = advisory_num
            forecast_dict['basin'] = storm.basin

            # Append to storms
            track_dict['prob_2day'] = 'N/A'
            track_dict['risk_2day'] = 'N/A'
            track_dict['prob_7day'] = 'N/A'
            track_dict['risk_7day'] = 'N/A'

            # Fill empty observed dicts (assuming storm just formed)
            if len(track_dict['lat']) == 0:
                for iter_key in ['lat', 'lon', 'vmax', 'mslp', 'type']:
                    track_dict[iter_key] = [forecast_dict[iter_key][0]]

            # Fix name if applicable
            if np.nanmax(track_dict['vmax']) < 34:
                if track_dict['operational_id'] != '':
                    track_dict['name'] = num_to_text(
                        int(track_dict['operational_id'][2:4])).upper()
                else:
                    track_dict['name'] = num_to_text(
                        int(track_dict['id'][2:4])).upper()
            track_dict['basin'] = storm.basin

            storms.append(Storm(track_dict))
            forecasts.append(forecast_dict)

        # Retrieve kwargs
        invest_prop = {'plot': False}
        storm_prop = kwargs.pop('storm_prop', {})
        cone_prop = kwargs.pop('cone_prop', {})
        map_prop = kwargs.pop('map_prop', {})

        # Create instance of plot object
        try:
            self.plot_obj
        except AttributeError:
            self.plot_obj = TrackPlot()

        # Create cartopy projection
        if cartopy_proj is not None:
            self.plot_obj.proj = cartopy_proj
        else:
            self.plot_obj.create_cartopy(
                proj='PlateCarree', central_longitude=180.0)

        # Plot
        print("--> Generating plot")
        ax = self.plot_obj.plot_summary(storms, forecasts, shapefiles, time, domain,
                                        ax, save_path, two_prop, invest_prop,
                                        storm_prop, cone_prop, map_prop)

        return ax
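    # Example usage (illustrative sketch; assumes "basin" is a TrackDataset whose
    # data covers the requested time and the archived NHC TWO shapefiles are
    # reachable):
    #
    #     from datetime import datetime
    #     ax = basin.plot_summary(datetime(2020, 9, 14, 12), domain='north_atlantic')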
    def search_time(self, time, extratropical=False, vmin=None, pmax=None):
        r"""
        Searches for all storms active during the requested time.

        Parameters
        ----------
        time : datetime.datetime
            Datetime object denoting the requested time.
        extratropical : bool, optional
            If False, the search excludes extratropical cyclones at the requested time. Default is False.
        vmin : int, optional
            Minimum threshold for sustained wind (knots) to filter.
        pmax : int, optional
            Maximum threshold for cyclone minimum MSLP (hPa) to filter.

        Returns
        -------
        pandas.DataFrame
            DataFrame containing the storm IDs found at the requested time and additional relevant storm attributes.
        """

        # Iterate over all keys
        return_data = {'id': [], 'name': [], 'lat': [], 'lon': [],
                       'vmax': [], 'mslp': [], 'type': [], 'wmo_basin': []}
        for key in self.keys:

            # First filter: the requested time must fall within the storm's
            # lifetime, and the closest fix at or before it must be within 6 hours
            if time < self.data[key]['time'][0]:
                continue
            if self.data[key]['time'][-1] < time:
                continue
            diff = [(time - i).total_seconds() / 3600
                    for i in self.data[key]['time']]
            diff_maxes = [i for i in diff if i >= 0]
            if len(diff_maxes) == 0:
                continue
            idx = diff.index(np.nanmin(diff_maxes))
            if np.nanmin(diff) > 0:
                continue
            if diff[idx] >= 6.0:
                continue

            # Second filter: storm type
            if not extratropical and self.data[key]['type'][idx] not in constants.TROPICAL_STORM_TYPES:
                continue

            # Third filter: intensity thresholds
            if vmin is not None:
                if np.isnan(self.data[key]['vmax'][idx]) or self.data[key]['vmax'][idx] < vmin:
                    continue
            if pmax is not None:
                if np.isnan(self.data[key]['mslp'][idx]) or self.data[key]['mslp'][idx] > pmax:
                    continue

            # Append data for this storm
            return_data['id'].append(key)
            return_data['name'].append(self.data[key]['name'])
            for iter_key in list(return_data.keys())[2:]:
                return_data[iter_key].append(self.data[key][iter_key][idx])

        return pd.DataFrame(return_data)
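    # Example usage (sketch; assumes "basin" is an existing TrackDataset):
    #
    #     from datetime import datetime
    #     df = basin.search_time(datetime(2005, 8, 29, 12), vmin=100)
    #     print(df[['id', 'name', 'vmax', 'mslp']])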