Source code for tropycal.recon.realtime

import urllib3
import requests
import pandas as pd
from datetime import datetime as dt, timedelta
import matplotlib.pyplot as plt

from .plot import *
from .tools import *
from ..utils import *

[docs]class RealtimeRecon(): r""" Creates an instance of a RealtimeRecon object. Parameters ---------- hours : int Number of hours to search back for recon missions. Default is 6 hours. Max allowed is 48 hours. Returns ------- RealtimeRecon Instance of a RealtimeRecon object. Notes ----- .. warning:: RealtimeRecon data is retrieved directly through NHC's archive, though this source can lag by up to 40 minutes compared to the very latest recon data available directly on their website. The ``RealtimeRecon`` Class and accompanying ``Mission`` class make up the realtime part of the recon module. Unlike the ``ReconDataset`` Class and its accompanying ``hdobs``, ``dropsondes`` and ``vdms`` classes which are **storm-centric**, the realtime recon functionality is **mission-centric**. This mission-centric functionality means realtime recon missions lack the storm-centering functionality that the more comprehensive full recon functionality has, but is also much faster at reading recon data for realtime purposes and includes non-tropical cyclone recon missions. The following example shows how to use realtime recon functionality. First, create an instance of ``RealtimeRecon``, which retrieves all active missions within the specified hour window (default is the most recent 6 hours): .. code-block:: python from tropycal import recon realtime_obj = recon.RealtimeRecon() We can now quickly retrieve a list of all active mission IDs: >>> realtime_obj.get_mission_ids() ['AF306-0801A-ALEX', 'AF303-0901A-ALEX'] Now use the ``get_mission()`` method to retrieve a Mission object containing all available data (HDOBs, VDMs & dropsondes) for this mission. The resulting object can access all of the methods and attributes of the ``Mission`` class. .. code-block:: python mission = realtime_obj.get_mission('AF303-0901A-ALEX') """ def __repr__(self): summary = ["<tropycal.recon.RealtimeRecon>"] # Add dataset summary summary.append("Dataset Summary:") summary.append( f'{" "*4}Numbers of active missions: {len(self.missions)}') return "\n".join(summary) def __init__(self, hours=6): # Error check self.hours = hours if hours > 48 or hours <= 0: raise ValueError("Maximum allowed search is 48 hours back.") # Start timing print("--> Searching for active missions") timer_start = # Set URLs for reading data self.urls = { 'hdobs': f'{dt.utcnow().year}/AHONT1/', 'dropsondes': f'{dt.utcnow().year}/REPNT3/', 'vdms': f'{dt.utcnow().year}/REPNT2/' } # Start time set by hour window start_time_request = dt.utcnow() - timedelta(hours=hours) start_time = dt.utcnow() - timedelta(hours=hours + 12) self.start_time_request = start_time_request # Retrieve list of files in URL and filter by storm dates files = { 'hdobs': [], 'dropsondes': [], 'vdms': [] } for key in files.keys(): page = requests.get(self.urls[key]).text content = page.split("\n") file_list = [] for line in content: if ".txt" in line: file_list.append( ((line.split('txt">')[1]).split("</a>")[0]).split(".")) del content file_list = sorted([i for i in file_list if dt.strptime( i[1][:10], '%Y%m%d%H') >= start_time], key=lambda x: x[1]) files[key] = [self.urls[key] + '.'.join(l) for l in file_list] self.files = files # Retrieve all active missions & read HDOBs urllib3.disable_warnings() http = urllib3.PoolManager() self.missions = {} for file in files['hdobs']: # Retrieve content response = http.request('GET', file) content ='utf-8') content_split = content.split("\n") # Construct mission ID try: mission_id = '-'.join( (content_split[3].replace(" ", " ")).split(" ")[:3]) if mission_id not in self.missions: self.missions[mission_id] = { 'hdobs': decode_hdob(content), 'vdms': [], 'dropsondes': [], 'aircraft': mission_id.split("-")[0], 'storm_name': mission_id.split("-")[2] } else: self.missions[mission_id]['hdobs'] = pd.concat( [self.missions[mission_id]['hdobs'], decode_hdob(content)]) except: pass # Retrieve VDMs for file in files['vdms']: # Retrieve content response = http.request('GET', file) content ='utf-8') content_split = content.split("\n") # Construct mission ID mission_id = ['-'.join(i.split("U. ")[1].replace(" ", " ").split(" ")[:3]) for i in content_split if i[:2] == "U."][0] time = dt.strptime((file.split('.')[-2])[:8], '%Y%m%d') try: blank, data = decode_vdm(content, time) except: continue if mission_id in self.missions.keys(): self.missions[mission_id]['vdms'].append(data) # Retrieve dropsondes for file in files['dropsondes']: # Retrieve content response = http.request('GET', file) content ='utf-8') content_split = content.split("\n") # Construct mission ID mission_id = ['-'.join(i.split("61616 ")[1].replace(" ", " ").split(" ")[:3]) for i in content_split if i[:5] == "61616"][0] time = dt.strptime((file.split('.')[-2])[:8], '%Y%m%d') try: blank, data = decode_dropsonde(content, time) except: continue if mission_id in self.missions.keys(): self.missions[mission_id]['dropsondes'].append(data) # Temporally filter missions keys = [k for k in self.missions.keys()] for key in keys: end_time = pd.to_datetime( self.missions[key]['hdobs']['time'].values[-1]) if end_time < start_time_request: del self.missions[key] # Sort each mission by time for key in self.missions.keys(): self.missions[key]['hdobs'].sort_values(['time'], inplace=True) # Save current time self.time = # Add attributes self.attrs = { 'hours': self.hours, 'time': self.time, } print( f"--> Completed retrieving active missions ({(} seconds)")
[docs] def update(self): r""" Updates RealtimeRecon with the latest available data. Notes ----- This function has no return value, but simply updates RealtimeRecon with the latest available recon data. """ # Start timing timer_start = # Start time set by hour window start_time = dt.utcnow() - timedelta(hours=24) if start_time < self.start_time_request: start_time = self.start_time_request # Retrieve list of files in URL and filter by storm dates files = { 'hdobs': [], 'dropsondes': [], 'vdms': [] } for key in files.keys(): page = requests.get(self.urls[key]).text content = page.split("\n") file_list = [] for line in content: if ".txt" in line: file_list.append( ((line.split('txt">')[1]).split("</a>")[0]).split(".")) del content file_list = sorted([i for i in file_list if dt.strptime( i[1][:10], '%Y%m%d%H') >= start_time], key=lambda x: x[1]) files[key] = [self.urls[key] + '.'.join( l) for l in file_list if self.urls[key] + '.'.join(l) not in self.files[key]] self.files[key] += files[key] # Retrieve all active missions & read HDOBs urllib3.disable_warnings() http = urllib3.PoolManager() for file in files['hdobs']: # Retrieve content response = http.request('GET', file) content ='utf-8') content_split = content.split("\n") # Construct mission ID mission_id = '-'.join( (content_split[3].replace(" ", " ")).split(" ")[:3]) if mission_id not in self.missions: self.missions[mission_id] = { 'hdobs': decode_hdob(content), 'vdms': [], 'dropsondes': [], 'aircraft': mission_id.split("-")[0], 'storm_name': mission_id.split("-")[2] } else: self.missions[mission_id]['hdobs'] = pd.concat( [self.missions[mission_id]['hdobs'], decode_hdob(content)]) # Retrieve VDMs for file in files['vdms']: # Retrieve content response = http.request('GET', file) content ='utf-8') content_split = content.split("\n") # Construct mission ID mission_id = ['-'.join(i.split("U. ")[1].replace(" ", " ").split(" ")[:3]) for i in content_split if i[:2] == "U."][0] time = dt.strptime((file.split('.')[-2])[:8], '%Y%m%d') blank, data = decode_vdm(content, time) if mission_id in self.missions.keys(): self.missions[mission_id]['vdms'].append(data) # Retrieve dropsondes for file in files['dropsondes']: # Retrieve content response = http.request('GET', file) content ='utf-8') content_split = content.split("\n") # Construct mission ID mission_id = ['-'.join(i.split("61616 ")[1].replace(" ", " ").split(" ")[:3]) for i in content_split if i[:5] == "61616"][0] time = dt.strptime((file.split('.')[-2])[:8], '%Y%m%d') blank, data = decode_dropsonde(content, time) if mission_id in self.missions.keys(): self.missions[mission_id]['dropsondes'].append(data) # Temporally filter missions keys = [k for k in self.missions.keys()] for key in keys: end_time = pd.to_datetime( self.missions[key]['hdobs']['time'].values[-1]) start_time_request = dt.utcnow() - timedelta(hours=self.hours) if end_time < start_time_request: del self.missions[key] # Sort each mission by time for key in self.missions.keys(): self.missions[key]['hdobs'].sort_values(['time'], inplace=True) print( f"--> Completed updating mission data ({(} seconds)")
[docs] def get_mission(self, mission_id): r""" Retrieve a Mission object given the mission ID. Parameters ---------- mission_id : str String denoting requested mission ID. All active mission IDs can be retrieved using ``get_mission_ids()``. Returns ------- tropycal.recon.Mission An instance of a Mission object for this mission. """ return Mission(self.missions[mission_id], mission_id)
[docs] def get_mission_ids(self, storm_name=None): r""" Retrieve a list of all active mission IDs. Parameters ---------- storm_name : str, optional Storm name (case-insensitive) to filter the search by. If None, all active missions are searched. Returns ------- list List containing all active mission IDs. """ mission_ids = [key for key in self.missions.keys()] if storm_name is not None: mission_ids = [i for i in mission_ids if i.split( "-")[2].lower() == storm_name.lower()] return mission_ids
[docs] def get_hdobs_summary(self): r""" Retrieve the latest 10-minute HDOBs for all missions. Only valid for missions that were active within the last hour. Returns ------- dict Dictionary with the parsed HDOB data highlights. Notes ----- .. note:: The decoded HDOBs summary automatically filters out all flagged observations for each variable. If all observations are flagged or missing for a variable, a value of NaN is returned. """ data = {} for mission_id in self.missions.keys(): sub_df = self.missions[mission_id]['hdobs'].tail(20) if pd.to_datetime(sub_df['time'].values[-1]) < dt.utcnow() - timedelta(hours=1): continue # Parse data array = [val for i, val in enumerate( sub_df['sfmr']) if 'sfmr' not in sub_df['flag'].values[i]] max_sfmr = np.nanmax(array) if not all_nan(array) else np.nan array = [val for i, val in enumerate( sub_df['p_sfc']) if 'p_sfc' not in sub_df['flag'].values[i]] min_p_sfc = np.nanmin(array) if not all_nan(array) else np.nan array = [val for i, val in enumerate( sub_df['wspd']) if 'wspd' not in sub_df['flag'].values[i]] max_wspd = np.nanmax(array) if not all_nan(array) else np.nan array = [val for i, val in enumerate( sub_df['temp']) if 'temp' not in sub_df['flag'].values[i]] max_temp = np.nanmax(array) if not all_nan(array) else np.nan array = [val for i, val in enumerate( sub_df['dwpt']) if 'dwpt' not in sub_df['flag'].values[i]] max_dwpt = np.nanmax(array) if not all_nan(array) else np.nan data[mission_id] = { 'min_mslp': min_p_sfc, 'max_sfmr': max_sfmr, 'max_wspd': max_wspd, 'max_temp': max_temp, 'max_dwpt': max_dwpt, 'start_time': pd.to_datetime(sub_df['time'].values[0]), 'end_time': pd.to_datetime(sub_df['time'].values[-1]), } return data
[docs] def get_hdobs_realtime(self, basin, aircraft, decoded=True): r""" Retrieve the latest 10-minute HDOBs directly from NHC. Parameters ---------- basin : str Basin for which to retrieve the latest HDOBs for. Available options are ``"north_atlantic"`` or ``"east_pacific"``. aircraft : str Type of aircraft for recon mission. Available options are ``"noaa"`` or ``"usaf"`` (US Air Force). decoded : bool, optional If False, returns a Pandas DataFrame of all HDOBs. If True (default), returns a dictionary with decoded HDOBs highlights. Returns ------- pandas.DataFrame or dict Depending on the value of ``decoded``, returns either a Pandas DataFrame or a dictionary with the parsed HDOB data highlights. Notes ----- .. note:: If using ``decoded=True``, the decoded HDOBs summary automatically filters out all flagged observations for each variable. If all observations are flagged or missing for a variable, a value of NaN is returned. The National Hurricane Center (NHC) website has 4 links for the latest HDOBs available: 1. NOAA aircraft in the North Atlantic basin 2. US Air Force (USAF) aircraft in the North Atlantic basin 3. NOAA aircraft in the East Pacific basin 4. US Air Force (USAF) aircraft in the East Pacific basin Each of these links store the most recent 10-minute HDOB observations from the most recent mission for each aircraft type and basin, regardless of how long ago that mission was. Note that this function is different from ``get_hdobs_summary()``, as the archived NHC recon observations can lag by 10 to as much as 40 minutes. This function fetches the recon observations directly from NHC's realtime recon webpage with the very latest data as soon as it comes in. """ # Determine URL if basin == 'north_atlantic': if aircraft == 'noaa': url = '' elif aircraft == 'usaf': url = '' elif basin == 'east_pacific': if aircraft == 'noaa': url = '' elif aircraft == 'usaf': url = '' # Read URL content and get DataFrame content = read_url(url, subsplit=False) hdob_content = [] found = False for line in content: if line == '<pre>': found = True continue if found: if line == '': continue hdob_content.append(line) if line == '</pre>': break df = decode_hdob('\n'.join(hdob_content), mission_row=2) if not decoded: return df # Parse data array = [val for i, val in enumerate( df['sfmr']) if 'sfmr' not in df['flag'].values[i]] max_sfmr = np.nanmax(array) if not all_nan(array) else np.nan array = [val for i, val in enumerate( df['p_sfc']) if 'p_sfc' not in df['flag'].values[i]] min_p_sfc = np.nanmin(array) if not all_nan(array) else np.nan array = [val for i, val in enumerate( df['wspd']) if 'wspd' not in df['flag'].values[i]] max_wspd = np.nanmax(array) if not all_nan(array) else np.nan array = [val for i, val in enumerate( df['temp']) if 'temp' not in df['flag'].values[i]] max_temp = np.nanmax(array) if not all_nan(array) else np.nan array = [val for i, val in enumerate( df['dwpt']) if 'dwpt' not in df['flag'].values[i]] max_dwpt = np.nanmax(array) if not all_nan(array) else np.nan data = { 'mission_id': df['mission_id'].values[0], 'min_mslp': min_p_sfc, 'max_sfmr': max_sfmr, 'max_wspd': max_wspd, 'max_temp': max_temp, 'max_dwpt': max_dwpt, 'start_time': pd.to_datetime(df['time'].values[0]), 'end_time': pd.to_datetime(df['time'].values[-1]), } return data
class PseudoStorm(): r""" Creates a dummy Storm object for functions that require a Storm object. """ def __init__(self): self.dict = { 'type': ['TS'], 'vmax': [50], 'wmo_basin': 'north_atlantic', 'name': 'Test' }
[docs]class Mission(): r""" Creates an instance of a Mission object. Parameters ---------- data : dict Dictionary containing mission data. This is passed automatically from ``RealtimeRecon.get_mission()`` or ``ReconDataset.get_mission()``. mission_id : str String representing the full Mission ID. This is passed automatically from ``RealtimeRecon.get_mission()`` or ``ReconDataset.get_mission()``. Returns ------- tropycal.recon.Mission An instance of a Mission object. Notes ----- Mission objects can be retrieved via two methods. The first method is for realtime Recon missions, retrieved directly from ``RealtimeRecon`` objects provided a mission ID string. For this example below, we'll retrieve the latest mission available: .. code-block:: python #Read in all recent mission data from tropycal import recon realtime_obj = recon.RealtimeRecon() #Get latest mission ID string (or set this to any of the available mission IDs) latest_mission_id = realtime_obj.get_mission_ids()[-1] #Retrieve an instance of Mission mission = realtime_obj.get_mission(latest_mission_id) The second method to retrieve a Mission object is for archived recon missions, via ReconDataset: .. code-block:: python #Read in HURDATv2 dataset from tropycal import tracks basin = tracks.TrackDataset() storm = basin.get_storm(('michael',2018)) #Get mission #11 mission = storm.recon.get_mission(11) This instance of Mission can now access all of the attributes and methods of the Mission class. Note that since the Mission object is mission-centric, not storm-centric, much of the functionality available in the storm-centric recon classes is not available here, though some of the plotting functionality (plotting HDOB points, time series, or dropsonde Skew-T) is available. """ def __repr__(self): summary = ["<tropycal.recon.Mission>"] # Find maximum wind and minimum pressure array = [val for i, val in enumerate( self.hdobs['sfmr']) if 'sfmr' not in self.hdobs['flag'].values[i]] max_sfmr = np.nanmax(array) if not all_nan(array) else np.nan array = [val for i, val in enumerate( self.hdobs['p_sfc']) if 'p_sfc' not in self.hdobs['flag'].values[i]] min_psfc = np.nanmin(array) if not all_nan(array) else np.nan array = [val for i, val in enumerate( self.hdobs['wspd']) if 'wspd' not in self.hdobs['flag'].values[i]] max_wspd = np.nanmax(array) if not all_nan(array) else np.nan array = [val for i, val in enumerate( self.hdobs['pkwnd']) if 'pkwnd' not in self.hdobs['flag'].values[i]] max_pkwnd = np.nanmax(array) if not all_nan(array) else np.nan time_range = [pd.to_datetime(t) for t in ( np.nanmin(self.hdobs['time']), np.nanmax(self.hdobs['time']))] # Add summary text emdash = '\u2014' summary_keys = { 'Dropsondes': len(self.dropsondes), 'VDMs': len(self.vdms), 'Max 30sec flight level wind': f"{max_wspd} knots", 'Max 10sec flight level wind': f"{max_pkwnd} knots", 'Max SFMR wind': f"{max_sfmr} knots", 'Min surface pressure': f"{min_psfc} hPa" } # Add text to output summary.append("Mission Summary:") add_space = np.max([len(key) for key in summary_keys.keys()]) + 3 for key in summary_keys.keys(): key_name = key + ":" summary.append( f'{" "*4}{key_name:<{add_space}}{summary_keys[key]}') # Add attributes summary.append("\nAttributes:") add_space = np.max([len(key) for key in self.attrs.keys()]) + 3 for key in self.attrs.keys(): key_name = key + ":" summary.append(f'{" "*4}{key_name:<{add_space}}{self.attrs[key]}') return "\n".join(summary) def __init__(self, data, mission_id): # Retrieve variables self.vdms = data['vdms'] self.hdobs = data['hdobs'] self.dropsondes = data['dropsondes'] # Retrieve attributes self.mission_id = mission_id self.aircraft = data['aircraft'] self.storm_name = data['storm_name'] self.mission_id = mission_id self.start_time = pd.to_datetime(np.nanmin(self.hdobs['time'])) self.end_time = pd.to_datetime(np.nanmax(self.hdobs['time'])) # Get sources if 'source' in data.keys(): self.source = data['source'] else: if self.start_time.year <= 2005 and self.start_time.year >= 1989: self.source = [ 'National Hurricane Center (NHC)', "UCAR's Tropical Cyclone Guidance Project (TCGP)"] else: self.source = 'National Hurricane Center (NHC)' # Retrieve attributes self.attrs = { 'aircraft': data['aircraft'], 'storm_name': data['storm_name'], 'mission_id': mission_id, 'start_time': pd.to_datetime(np.nanmin(self.hdobs['time'])), 'end_time': pd.to_datetime(np.nanmax(self.hdobs['time'])), 'source': self.source, } # Add status for mission if self.aircraft.upper() == 'NOAA9': values = ['Upper Air' for i in self.hdobs['plane_p'].values] else: if self.attrs['start_time'].year >= 2007: values = get_status(self.hdobs['plane_p'].values) else: values = get_status(self.hdobs['plane_z'].values, use_z=True) try: del self.hdobs['status'] except: pass self.hdobs.insert(loc=len(self.hdobs.columns), column='status', value=values)
[docs] def get_hdobs(self): r""" Returns High Density Observations (HDOBs) for this mission. Returns ------- Pandas.DataFrame DataFrame containing HDOBs entries for this mission. """ return self.hdobs
[docs] def get_dropsondes(self): r""" Returns dropsondes for this mission. Returns ------- dict Dictionary containing dropsonde data. """ return self.dropsondes
[docs] def get_vdms(self): r""" Returns Vortex Data Messages (VDMs) for this mission. Returns ------- dict Dictionary containing VDM data. """ return self.vdms
[docs] def status(self): r""" Returns the current mission status. Returns ------- str String corresponding to the current mission status. Notes ----- This function returns four possible values: .. list-table:: :widths: 25 75 :header-rows: 1 * - Value - Description * - "En Route" - Mission has not flown higher than 650 hPa or initial peak altitude yet * - "In Progress" - Mission has descended below 650 hPa, or is constant below 650 hPa * - "Finished" - Mission has re-ascended above 650 hPa for a sufficient duration * - "Upper Air" - Mission is a NOAA9 aircraft, suggesting it is an upper-level mission. """ return self.hdobs['status'].values[-1]
[docs] def plot_points(self, varname='wspd', in_storm=False, barbs=False, plot_vdms=False, domain="dynamic", ax=None, cartopy_proj=None, **kwargs): r""" Creates a plot of High Density Observations (HDOBs) data points. Parameters ---------- varname : str Variable to plot. Can be one of the following keys in dataframe: * **"sfmr"** = SFMR surface wind * **"wspd"** = 30-second flight level wind (default) * **"pkwnd"** = 10-second flight level wind * **"p_sfc"** = extrapolated surface pressure in_storm : bool, optional If True, only plots points considered within the storm, or all points if en route. Default is False (plot all points). barbs : bool, optional If True, plots wind barbs. If False (default), plots dots. plot_vdms : bool, optional If True, plots a dot with the minimum MSLP (hPa) from all mission VDMs. Default is False. domain : str, optional Domain for the plot. Default is "dynamic". Please refer to :ref:`options-domain` for available domain options. ax : axes, optional Instance of axes to plot on. If none, one will be generated. Default is none. cartopy_proj : ccrs, optional Instance of a cartopy projection to use. If none, one will be generated. Default is none. Other Parameters ---------------- prop : dict Customization properties of recon plot. Please refer to :ref:`options-prop-recon-plot` for available options. map_prop : dict Customization properties of Cartopy map. Please refer to :ref:`options-map-prop` for available options. Returns ------- ax Instance of axes containing the plot is returned. Notes ----- 1. Plotting wind barbs only works for wind related variables. ``barbs`` will be automatically set to False for non-wind variables. 2. The special colormap **category_recon** can be used in the prop dict (``prop={'cmap':'category_recon'}``). This uses the standard SSHWS colormap, but with a new color for wind between 50 and 64 knots. .. warning:: The ``in_storm`` flag uses an internal algorithm to determine if the recon mission is near the storm, without knowledge of the actual coordinates of the storm. This algorithm works well for most standard recon missions, but can return unexpected results for the occasional outlier missions or incomplete/aborted missions. """ # Change barbs if varname == 'p_sfc': barbs = False # Pop kwargs prop = kwargs.pop('prop', {}) map_prop = kwargs.pop('map_prop', {}) # Get plot data if in_storm and self.aircraft.upper() != 'NOAA9': try: dfRecon = self.hdobs.loc[self.hdobs['status'] == 'In Storm'] if len(dfRecon) == 0: dfRecon = self.hdobs except: dfRecon = self.hdobs else: dfRecon = self.hdobs # Create instance of plot object self.plot_obj = ReconPlot() # Create cartopy projection if cartopy_proj is None: self.plot_obj.create_cartopy( proj='PlateCarree', central_longitude=0.0) cartopy_proj = self.plot_obj.proj # Get VDMs if plot_vdms: vdms = self.vdms else: vdms = [] # Plot recon plot_ax = self.plot_obj.plot_points(PseudoStorm(), dfRecon, domain, barbs=barbs, varname=varname, radlim=None, ax=ax, prop=prop, map_prop=map_prop, mission=True, vdms=vdms, mission_id=self.mission_id) # Return axis return plot_ax
[docs] def plot_skewt(self, number=None): r""" Plot a single dropsonde Skew-T for a given dropsonde number. Parameters ---------- number : int Number of dropsonde during mission. If None (default), plots the latest available dropsonde. Returns ------- fig Figure instance containing the plot. """ # Retrieve data for dropsonde number try: if number == -1: number = len(self.dropsondes) if number is None: number = len(self.dropsondes) data = [self.dropsondes[number - 1]] except: raise ValueError("Requested dropsonde number not available.") # Format title string title_string = f'Mission ID: {self.mission_id}\nDropsonde #{number}' return plot_skewt(data, title_string)
[docs] def plot_time_series(self, varname=('p_sfc', 'wspd'), time=None, realtime=False, **kwargs): r""" Plots a time series of one or two variables on an axis. Parameters ---------- varname : str or tuple If one variable to plot, varname is a string of the variable name. If two variables to plot, varname is a tuple of the left and right variable names, respectively. Available varnames are: * **p_sfc** - Mean Sea Level Pressure (hPa) * **temp** - Flight Level Temperature (C) * **dwpt** - Flight Level Dewpoint (C) * **wspd** - Flight Level Wind (kt) * **sfmr** - Surface Wind (kt) * **pkwnd** - Peak Wind Gust (kt) * **rain** - Rain Rate (mm/hr) * **plane_z** - Geopotential Height (m) * **plane_p** - Pressure (hPa) time : tuple Tuple of start and end times (datetime.datetime) to plot. If None, all times available are plotted. realtime : bool If True, the most recent 2 hours of the mission will plot, overriding the time argument. Default is False. Other Parameters ---------------- left_prop : dict Dictionary of properties for the left line. Scroll down for more information. right_prop : dict Dictionary of properties for the right line. Scroll down for more information. Returns ------- ax Instance of axes containing the plot is returned. Notes ----- The following properties are available for customizing the plot, via ``left_prop`` and ``right_prop``. .. list-table:: :widths: 25 75 :header-rows: 1 * - Property - Description * - ms - Marker size. If zero, none will be plotted. Default is zero. * - color - Color of lines (and markers if used). Default varies per varname. * - linewidth - Line width. Default is 1.0. """ # Pop kwargs left_prop = kwargs.pop('left_prop', {}) right_prop = kwargs.pop('right_prop', {}) # Retrieve variables twin_ax = False if isinstance(varname, tuple): varname_right = varname[1] varname = varname[0] twin_ax = True varname_right_info = time_series_plot(varname_right) varname_info = time_series_plot(varname) # Get data df = self.hdobs # Filter by time or realtime flag if realtime: end_time = pd.to_datetime(df['time'].values[-1]) df = df.loc[(df['time'] >= end_time - timedelta(hours=2)) & (df['time'] <= end_time)] elif time is not None: df = df.loc[(df['time'] >= time[0]) & (df['time'] <= time[1])] if len(df) == 0: raise ValueError("Time range provided is invalid.") # Filter by default kwargs left_prop_default = { 'ms': 0, 'color': varname_info['color'], 'linewidth': 1 } for key in left_prop.keys(): left_prop_default[key] = left_prop[key] left_prop = left_prop_default if twin_ax: right_prop_default = { 'ms': 0, 'color': varname_right_info['color'], 'linewidth': 1 } for key in right_prop.keys(): right_prop_default[key] = right_prop[key] right_prop = right_prop_default # ---------------------------------------------------------------------------------- # Create figure fig, ax = plt.subplots(figsize=(9, 6), dpi=200) if twin_ax: ax.grid(axis='x') else: ax.grid() # Plot line line1 = ax.plot(df['time'], df[varname], color=left_prop['color'], linewidth=left_prop['linewidth'], label=varname_info['name']) ax.set_ylabel(varname_info['full_name']) # Plot dots if left_prop['ms'] >= 1: plot_times = df['time'].values plot_var = df[varname].values plot_times = [plot_times[i] for i in range( len(plot_times)) if varname not in df['flag'].values[i]] plot_var = [plot_var[i] for i in range( len(plot_var)) if varname not in df['flag'].values[i]] ax.plot(plot_times, plot_var, 'o', color=left_prop['color'], ms=left_prop['ms']) # Format x-axis dates ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%Mz\n%m/%d')) # Add twin axis if twin_ax: ax2 = ax.twinx() # Plot line line2 = ax2.plot(df['time'], df[varname_right], color=right_prop['color'], linewidth=right_prop['linewidth'], label=varname_right_info['name']) ax2.set_ylabel(varname_right_info['full_name']) # Plot dots if right_prop['ms'] >= 1: plot_times = df['time'].values plot_var = df[varname_right].values plot_times = [plot_times[i] for i in range( len(plot_times)) if varname_right not in df['flag'].values[i]] plot_var = [plot_var[i] for i in range( len(plot_var)) if varname_right not in df['flag'].values[i]] ax2.plot(plot_times, plot_var, 'o', color=right_prop['color'], ms=right_prop['ms']) # Add legend lines = line1 + line2 labels = [l.get_label() for l in lines] ax.legend(lines, labels) # Special handling if both are in units of Celsius same_unit = False if varname in ['temp', 'dwpt'] and varname_right in ['temp', 'dwpt']: same_unit = True if varname in ['sfmr', 'wspd', 'pkwnd'] and varname_right in ['sfmr', 'wspd', 'pkwnd']: same_unit = True if same_unit: min_val = np.nanmin( [np.nanmin(df[varname]), np.nanmin(df[varname_right])]) max_val = np.nanmax( [np.nanmax(df[varname]), np.nanmax(df[varname_right])]) * 1.05 min_val = min_val * 1.05 if min_val < 0 else min_val * 0.95 if np.isnan(min_val): min_val = 0 if np.isnan(max_val): max_val = 0 if min_val == max_val: min_val = 0 max_val = 10 ax.set_ylim(min_val, max_val) ax2.set_ylim(min_val, max_val) # Add titles title_string = f"\nRecon Aircraft HDOBs\nMission ID: {self.mission_id}" ax.set_title(title_string, loc='left', fontweight='bold') ax.set_title("Plot generated using Tropycal", fontsize=8, loc='right') # Return plot return ax