Source code for tropycal.ships.ships

r"""Functionality for storing and analyzing SHIPS data."""

import numpy as np
import xarray as xr
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.colors as col
from matplotlib import gridspec
import cartopy.crs as ccrs
import cartopy.feature as cfeature
from ..utils import ships_parser, get_colors_sshws, wind_to_category, dynamic_map_extent

[docs]class Ships(): r""" Initializes an instance of Ships. Parameters ---------- content : str SHIPS file content. If initialized via a ``tropycal.tracks.Storm`` object, this does not need to be provided. Other Parameters ---------------- storm_name : str, optional If provided, overrides storm name from SHIPS text data with the provided storm name. Returns ------- tropycal.ships.Ships Instance of a Ships object. Notes ----- A Ships object is best retrieved from a Storm object's ``get_ships()`` method. For example, if the dataset read in is the default North Atlantic and the desired storm is Hurricane Michael (2018), SHIPS data would be retrieved as follows: .. code-block:: python from tropycal import tracks import datetime as dt # Retrieve storm object for Hurricane Michael basin = tracks.TrackDataset() storm = basin.get_storm(('michael',2018)) # Retrieve instance of a Ships object ships = storm.get_ships(dt.datetime(2018, 10, 8, 0)) """ def __setitem__(self, key, value): self.__dict__[key] = value def __getitem__(self, key): return self.__dict__[key] def __repr__(self): # Label object summary = ["<tropycal.ships.Ships>"] # Format keys for coordinates variable_keys = {} for key in self.dict.keys(): dtype = type(self.dict[key][0]).__name__ dtype = dtype.replace("_", "") variable_keys[key] = f"({dtype}) [{self.dict[key][0]} .... {self.dict[key][-1]}]" # Add data summary.append('\nVariables:') add_space = np.max([len(key) for key in variable_keys.keys()]) + 3 for key in variable_keys.keys(): key_name = key summary.append( f'{" "*4}{key_name:<{add_space}}{variable_keys[key]}') # Add additional information summary.append('\nRI Probabilities:') add_space = np.max([len(key) for key in self.dict_ri.keys()]) + 3 for key in self.dict_ri.keys(): key_name = key + ":" entry = f'{self.dict_ri[key]["probability"]}% ({self.dict_ri[key]["prob / climo"]}x climo mean)' summary.append(f'{" "*4}{key_name:<{add_space}}{entry}') # Add additional information summary.append('\nAttributes:') add_space = np.max([len(key) for key in self.attrs.keys()]) + 3 for key in self.attrs.keys(): key_name = key + ":" summary.append(f'{" "*4}{key_name:<{add_space}}{self.attrs[key]}') return "\n".join(summary) def __init__(self, content, storm_name=None, forecast_init=None): ships = ships_parser(content) self.dict = ships['data'] self.dict_ri = ships['data_ri'] self.attrs = ships['data_attrs'] # Override storm name if requested if storm_name is not None: self.attrs['storm_name'] = storm_name # Set data variables as attributes of this object for key in self.dict: self[key] = np.array(self.dict[key])
[docs] def get_variables(self): r""" Return list of available forecast variables. Returns ------- list List containing available forecast variables. Notes ----- These variables can be accessed from two methods: >>> # 1. Access directly from object >>> ships.fhr array([ 0, 6, 12, 18, 24, 36, 48, 60, 72, 84, 96, 108, 120]) >>> # 2. Access from internal data dictionary >>> ships.dict['fhr'] array([ 0, 6, 12, 18, 24, 36, 48, 60, 72, 84, 96, 108, 120]) """ return list(self.dict.keys())
[docs] def get_snapshot(self, hour): r""" Return all variables valid at a single forecast hour. Parameters ---------- hour : int Requested forecast hour. Returns ------- dict Dictionary containing all variables valid at this forecast hour. """ # Error check if hour not in self.dict['fhr']: raise ValueError('Requested forecast hour is not available.') idx = self.dict['fhr'].index(hour) # Format dict data = {} for key in self.dict: data[key] = self.dict[key][idx] return data
[docs] def get_ri_prob(self): r""" Return rapid intensification probabilities. Returns ------- dict Dictionary containing rapid intensification probabilities. """ return self.dict_ri
[docs] def to_xarray(self): r""" Convert data to an xarray Dataset. Returns ------- xarray.Dataset SHIPS data converted to an xarray Dataset. """ # Set up empty dict for dataset time = self.dict['fhr'] ds = {} # Add every key containing a list into the dict, otherwise add as an attribute keys = [k for k in self.dict.keys() if k != 'fhr'] for key in keys: ds[key] = xr.DataArray(self.dict[key], coords = [time], dims = ['fhr']) # Convert entire dict to a Dataset ds = xr.Dataset(ds, attrs=self.attrs) # Return dataset return ds
[docs] def to_dataframe(self, attrs_as_columns=False): r""" Convert data to a pandas DataFrame object. Parameters ---------- attrs_as_columns : bool If True, adds Ships object attributes as columns in the DataFrame returned. Default is False. Returns ------- pandas.DataFrame A pandas DataFrame object containing SHIPS data. """ # Set up empty dict for dataframe df = {} # Add every key containing a list into the dict for key in self.dict: df[key] = self.dict[key] if attrs_as_columns: for key in self.attrs: df[key] = self.attrs[key] # Convert entire dict to a DataFrame df = pd.DataFrame(df) # Return dataset return df
[docs] def plot_summary(self): r""" Generates a plot summarizing the SHIPS forecast. Returns ------- matplotlib.figure Matplotlib figure containing multiple axes. """ # Create the figure and gridspec fig = plt.figure(figsize=(9,6),dpi=200) gs = gridspec.GridSpec(6, 32) # Left column & format Cartopy projection plot_lon = np.copy(self.lon) if np.nanmax(self.lon) > 150 or np.nanmin(self.lon) < -150: proj = ccrs.PlateCarree(central_longitude = 180.0) plot_lon[plot_lon < 0] += 360.0 else: proj = ccrs.PlateCarree() ax1 = plt.subplot(gs[:3, :14]) ax2 = plt.subplot(gs[3:, :14], projection=proj) # Right column ax3 = plt.subplot(gs[:1, 16:]) ax4 = plt.subplot(gs[1:2, 16:]) ax5 = plt.subplot(gs[2:3, 16:]) ax6 = plt.subplot(gs[3:4, 16:]) ax7 = plt.subplot(gs[4:5, 16:]) ax8 = plt.subplot(gs[5:6, 16:]) # ================================================================================================ # Hide axes boundaries for spine in ['top', 'bottom', 'left', 'right']: ax1.spines[spine].set_visible(False) ax1.set_xticks([]) ax1.set_yticks([]) ax1.tick_params(axis='both', which='both', length=0) # Plot title ax1.text(0.5, 0.95, f"SHIPS Forecast for {self.attrs['storm_name']}", fontweight='bold', ha='center', va='center', fontsize=14, transform=ax1.transAxes) ax1.text(0.5, 0.86, f"Initialized: {self.attrs['forecast_init'].strftime('%H%M UTC %d %b %Y')}", ha='center', va='center', fontsize=9, transform=ax1.transAxes) # Current storm information ax1.text(0.5, 0.70, 'Current Storm Information:', color='#222286', fontweight='bold', ha='center', va='center', fontsize=9, transform=ax1.transAxes) dot = u"\u2022" deg = u'\N{DEGREE SIGN}' current_info = f'Lat: {self.lat[0]}{deg} {dot} Lon: {self.lon[0]}{deg} {dot} ' current_info += f'Wind: {int(self.vmax_land_kt[0])} kt' ax1.text(0.5, 0.62, current_info, color='#222286', alpha=0.6, ha='center', va='center', fontsize=9, transform=ax1.transAxes) # Colormap for RI probabilities cmap = plt.cm.Reds norm = col.BoundaryNorm(np.arange(0,100,1),cmap.N) # Prepare table for RI probabilities def format_label(value, entry_label): if np.isnan(value): return 'N/A' return f'{value}{entry_label}' data = [] colors = [] data.append([i.replace('/','/\n') for i in self.dict_ri.keys()]) colors.append(['#8CBCE1' for i in self.dict_ri.keys()]) entries = ['probability', 'prob / climo'] entries_label = ['%', 'x\nclimo'] for entry, entry_label in zip(entries, entries_label): data.append([format_label(self.dict_ri[i][entry], entry_label) for i in self.dict_ri.keys()]) if entry == 'prob / climo': colors.append(['#fff' for i in self.dict_ri.keys()]) else: colors.append([cmap(norm(self.dict_ri[i][entry]))[:-1] + (0.5,) for i in self.dict_ri.keys()]) # Set table to occupy bottom half of axes table_bbox = [0, 0, 1, 0.40] # Insert table into axes ax1.text(0.5, 0.46, 'Rapid Intensification Probabilities:', fontweight='bold', ha='center', va='center', fontsize=9, transform=ax1.transAxes) table = ax1.table(cellText=data, cellLoc='center', loc='center', cellColours=colors, bbox=table_bbox, edges='closed') # Customize table display table.set_fontsize(8) table.scale(1, 1.5) table.auto_set_column_width(range(len(data[0]))) # ================================================================================================ # Plot forecast ax2.plot(plot_lon, self.lat, color='k', linewidth=1.5, transform=ccrs.PlateCarree()) for i_lon, i_lat, i_vmax, i_type in zip(plot_lon, self.lat, self.vmax_land_kt, self.storm_type): if True in [np.isnan(i) for i in [i_lon, i_lat, i_vmax]]: continue marker_type = 'o' if i_type == 'TROP' else '^' marker_size = 9 if i_type == 'TROP' else 8 ax2.plot(i_lon, i_lat, marker_type, mfc=get_colors_sshws(i_vmax), mec='k', mew=0.5, ms=marker_size, transform=ccrs.PlateCarree()) if i_type != 'TROP': continue category = str(wind_to_category(i_vmax)) if category == "0": category = 'S' if category == "-1": category = 'D' ax2.text(i_lon, i_lat, category, ha='center', va='center', fontsize=7, transform=ccrs.PlateCarree()) # Plot coastlines and political boundaries ax2.add_feature(cfeature.STATES.with_scale('50m'), linewidths=0.25, linestyle='solid', edgecolor='#444') ax2.add_feature(cfeature.BORDERS.with_scale('50m'), linewidths=0.5, linestyle='solid', edgecolor='#444') ax2.add_feature(cfeature.COASTLINE.with_scale('50m'), linewidths=0.5, linestyle='solid', edgecolor='#444') ax2.add_feature(cfeature.LAND.with_scale('50m'), facecolor='#EEEEEE', edgecolor='face') plot_domain = dynamic_map_extent(np.nanmin(plot_lon), np.nanmax(plot_lon), np.nanmin(self.lat), np.nanmax(self.lat)) ax2.set_extent(plot_domain) # Add lat/lon labels gl = ax2.gridlines(draw_labels=True, linewidth=0.5, color='gray', alpha=0.5, linestyle='dashed') gl.top_labels = False gl.right_labels = False gl.xlabel_style = {'size': 8} gl.ylabel_style = {'size': 8} # Add plot credit ax2.text(0.02, 0.98, "Plot generated by Tropycal\nInspired by Deelan Jariwala", fontsize=8, alpha=0.6, ha='left', va='top', transform=ax2.transAxes) # ================================================================================================ # Set grid ax3.grid(linestyle='dotted') ax3.set_xticks(range(0,max(self.fhr)+1,24)) ax3.set_xticklabels([]) ax3.tick_params(axis='y', labelsize=8) ax3.set_xlim(0,max(self.fhr)) # Plot vmax ax3.plot(self.fhr, self.vmax_land_kt, color='k', label='Max Wind (kt)') ax3.plot(self.fhr, self.vmax_noland_kt, color='k', alpha=0.3, label='Max Wind (kt, no land)') ax3.legend(fontsize=7) # ================================================================================================ # Set grid ax4.grid(linestyle='dotted') ax4.set_xticks(range(0,max(self.fhr)+1,24)) ax4.set_xticklabels([]) ax4.tick_params(axis='y', labelsize=8) ax4.set_xlim(0,max(self.fhr)) # Plot max potential intensity ax4.plot(self.fhr, self.vmax_pot_kt, color='r', label='Max Potential Intensity (kt)') ax4.legend(fontsize=7) # ================================================================================================ # Set grid ax5.grid(linestyle='dotted') ax5.set_xticks(range(0,max(self.fhr)+1,24)) ax5.set_xticklabels([]) ax5.tick_params(axis='y', labelsize=8) ax5.set_xlim(0,max(self.fhr)) # Plot shear ax5.plot(self.fhr, self.shear_kt, color='b', label='Wind Shear (kt)') ax5.legend(fontsize=7) # ================================================================================================ # Set grid ax6.grid(linestyle='dotted') ax6.set_xticks(range(0,max(self.fhr)+1,24)) ax6.set_xticklabels([]) ax6.tick_params(axis='y', labelsize=8) ax6.set_xlim(0,max(self.fhr)) # Plot SSTs ax6.plot(self.fhr, self.sst_c, color='orange', label=f'SST ({deg}C)') ax6.legend(fontsize=7) # ================================================================================================ # Set grid ax7.grid(linestyle='dotted') ax7.set_xticks(range(0,max(self.fhr)+1,24)) ax7.set_xticklabels([]) ax7.tick_params(axis='y', labelsize=8) ax7.set_xlim(0,max(self.fhr)) # Plot 700-500mb RH ax7.plot(self.fhr, self.dict['700_500_rh'], color='g', label='700-500mb RH (%)') ax7.legend(fontsize=7) # ================================================================================================ # Set grid ax8.grid(linestyle='dotted') ax8.set_xticks(range(0,max(self.fhr)+1,24)) ax8.tick_params(axis='both', labelsize=8) ax8.set_xlim(0,max(self.fhr)) # Plot heat potential ax8.plot(self.fhr, self.heat_content, color='purple', label=r'Heat Content (KJ/cm$^2$)') ax8.legend(fontsize=7) return fig