Source code for py_eddy_tracker.dataset.grid

# -*- coding: utf-8 -*-
"""
Class to load and manipulate RegularGrid and UnRegularGrid
"""
from datetime import datetime
import logging

from cv2 import filter2D
from matplotlib.path import Path as BasePath
from netCDF4 import Dataset
from numba import njit, prange, types as numba_types
from numpy import (
    arange,
    array,
    ceil,
    concatenate,
    cos,
    deg2rad,
    empty,
    errstate,
    exp,
    float_,
    floor,
    histogram2d,
    int_,
    interp,
    isnan,
    linspace,
    ma,
    mean as np_mean,
    meshgrid,
    nan,
    nanmean,
    ones,
    percentile,
    pi,
    radians,
    round_,
    sin,
    sinc,
    where,
    zeros,
)
from pint import UnitRegistry
from scipy.interpolate import RectBivariateSpline, interp1d
from scipy.ndimage import gaussian_filter
from scipy.signal import welch
from scipy.spatial import cKDTree
from scipy.special import j1

from .. import VAR_DESCR
from ..eddy_feature import Amplitude, Contours
from ..generic import (
    bbox_indice_regular,
    coordinates_to_local,
    distance,
    interp2d_geo,
    local_to_coordinates,
    nearest_grd_indice,
    uniform_resample,
)
from ..observations.observation import EddiesObservations
from ..poly import (
    create_vertice,
    fit_circle,
    get_pixel_in_regular,
    poly_area,
    poly_contain_poly,
    visvalingam,
    winding_number_poly,
)

logger = logging.getLogger("pet")


[docs]def raw_resample(datas, fixed_size): nb_value = datas.shape[0] if nb_value == 1: raise Exception() return interp( arange(fixed_size), arange(nb_value) * (fixed_size - 1) / (nb_value - 1), datas )
@property def mean_coordinates(self): # last coordinates == first return self.vertices[1:].mean(axis=0) @property def lon(self): return self.vertices[:, 0] @property def lat(self): return self.vertices[:, 1] BasePath.mean_coordinates = mean_coordinates BasePath.lon = lon BasePath.lat = lat
[docs]@njit(cache=True) def uniform_resample_stack(vertices, num_fac=2, fixed_size=None): x_val, y_val = vertices[:, 0], vertices[:, 1] x_new, y_new = uniform_resample(x_val, y_val, num_fac, fixed_size) data = empty((x_new.shape[0], 2), dtype=vertices.dtype) data[:, 0] = x_new data[:, 1] = y_new return data
[docs]@njit(cache=True) def value_on_regular_contour(x_g, y_g, z_g, m_g, vertices, num_fac=2, fixed_size=None): x_val, y_val = vertices[:, 0], vertices[:, 1] x_new, y_new = uniform_resample(x_val, y_val, num_fac, fixed_size) return interp2d_geo(x_g, y_g, z_g, m_g, x_new[1:], y_new[1:])
[docs]@njit(cache=True) def mean_on_regular_contour( x_g, y_g, z_g, m_g, vertices, num_fac=2, fixed_size=None, nan_remove=False ): x_val, y_val = vertices[:, 0], vertices[:, 1] x_new, y_new = uniform_resample(x_val, y_val, num_fac, fixed_size) values = interp2d_geo(x_g, y_g, z_g, m_g, x_new[1:], y_new[1:]) if nan_remove: return nanmean(values) else: return values.mean()
[docs]def fit_circle_path(self, method="fit"): if not hasattr(self, "_circle_params"): self._circle_params = dict() if method not in self._circle_params.keys(): if method == "fit": self._circle_params["fit"] = _fit_circle_path(self.vertices) if method == "equal_area": self._circle_params["equal_area"] = _circle_from_equal_area(self.vertices) return self._circle_params[method]
@njit(cache=True, fastmath=True) def _circle_from_equal_area(vertice): lons, lats = vertice[:, 0], vertice[:, 1] # last coordinates == first lon0, lat0 = lons[1:].mean(), lats[1:].mean() c_x, c_y = coordinates_to_local(lons, lats, lon0, lat0) # Sometimes, edge is only a dot of few coordinates d_lon = lons.max() - lons.min() d_lat = lats.max() - lats.min() if d_lon < 1e-7 and d_lat < 1e-7: # logger.warning('An edge is only define in one position') # logger.debug('%d coordinates %s,%s', len(lons),lons, # lats) return 0, -90, nan, nan return lon0, lat0, (poly_area(c_x, c_y) / pi) ** 0.5, nan @njit(cache=True, fastmath=True) def _fit_circle_path(vertice): lons, lats = vertice[:, 0], vertice[:, 1] # last coordinates == first lon0, lat0 = lons[1:].mean(), lats[1:].mean() c_x, c_y = coordinates_to_local(lons, lats, lon0, lat0) # Some time, edge is only a dot of few coordinates d_lon = lons.max() - lons.min() d_lat = lats.max() - lats.min() if d_lon < 1e-7 and d_lat < 1e-7: # logger.warning('An edge is only define in one position') # logger.debug('%d coordinates %s,%s', len(lons),lons, # lats) return 0, -90, nan, nan centlon, centlat, eddy_radius, err = fit_circle(c_x, c_y) centlon, centlat = local_to_coordinates(centlon, centlat, lon0, lat0) centlon = (centlon - lon0 + 180) % 360 + lon0 - 180 return centlon, centlat, eddy_radius, err @njit(cache=True, fastmath=True) def _get_pixel_in_unregular(vertices, x_c, y_c, x_start, x_stop, y_start, y_stop): nb_x, nb_y = x_stop - x_start, y_stop - y_start wn = empty((nb_x, nb_y), dtype=numba_types.bool_) for i in range(nb_x): for j in range(nb_y): x_pt = x_c[i + x_start, j + y_start] y_pt = y_c[i + x_start, j + y_start] wn[i, j] = winding_number_poly(x_pt, y_pt, vertices) i_x, i_y = where(wn) i_x += x_start i_y += y_start return i_x, i_y BasePath.fit_circle = fit_circle_path
[docs]def pixels_in(self, grid): if not hasattr(self, "_slice"): self._slice = grid.bbox_indice(self.vertices) if not hasattr(self, "_pixels_in"): self._pixels_in = grid.get_pixels_in(self) return self._pixels_in
@property def bbox_slice(self): if not hasattr(self, "_slice"): raise Exception("No pixels_in call before!") return self._slice @property def pixels_index(self): if not hasattr(self, "_slice"): raise Exception("No pixels_in call before!") return self._pixels_in @property def nb_pixel(self): if not hasattr(self, "_pixels_in"): raise Exception("No pixels_in call before!") return self._pixels_in[0].shape[0] BasePath.pixels_in = pixels_in BasePath.pixels_index = pixels_index BasePath.bbox_slice = bbox_slice BasePath.nb_pixel = nb_pixel
[docs]class GridDataset(object): """ Class for basic tools on NetCDF Grid """ __slots__ = ( "x_c", "y_c", "x_bounds", "y_bounds", "centered", "x_dim", "y_dim", "coordinates", "filename", "dimensions", "indexs", "variables_description", "global_attrs", "vars", "contours", "nan_mask", ) GRAVITY = 9.807 EARTH_RADIUS = 6370997.0 # EARTH_RADIUS = 6378136.3 # indice margin (if put to 0, raise warning that i don't understand) N = 1 def __init__( self, filename, x_name, y_name, centered=None, indexs=None, unset=False, nan_masking=False, ): """ :param str filename: Filename to load :param str x_name: Name of longitude coordinates :param str y_name: Name of latitude coordinates :param bool,None centered: Allow to know how coordinates could be used with pixel :param dict indexs: A dictionary that sets indexes to use for non-coordinate dimensions :param bool unset: Set to True to create an empty grid object without file :param bool nan_masking: Set to True to replace data.mask with isnan method result """ self.dimensions = None self.variables_description = None self.global_attrs = None self.x_c = None self.y_c = None self.x_bounds = None self.y_bounds = None self.x_dim = None self.y_dim = None self.nan_mask = nan_masking self.centered = centered self.contours = None self.filename = filename self.coordinates = x_name, y_name self.vars = dict() self.indexs = dict() if indexs is None else indexs if centered is None: logger.warning( "We assume pixel position of grid is centered for %s", filename ) if not unset: self.populate()
[docs] def populate(self): if self.dimensions is None: self.load_general_features() self.load()
[docs] def clean(self): self.dimensions = None self.variables_description = None self.global_attrs = None self.x_c = None self.y_c = None self.x_bounds = None self.y_bounds = None self.x_dim = None self.y_dim = None self.contours = None self.vars = dict()
@property def is_centered(self): """Give True if pixel is described with its center's position or a corner :return: True if centered :rtype: bool """ if self.centered is None: return True else: return self.centered
[docs] def load_general_features(self): """Load attrs to be stored in object""" logger.debug( "Load general feature from %(filename)s", dict(filename=self.filename) ) with Dataset(self.filename) as h: # Load generals self.dimensions = {i: len(v) for i, v in h.dimensions.items()} self.variables_description = dict() for i, v in h.variables.items(): args = (i, v.datatype) kwargs = dict(dimensions=v.dimensions, zlib=True) if hasattr(v, "_FillValue"): kwargs["fill_value"] = (v._FillValue,) attrs = dict() for attr in v.ncattrs(): if attr in kwargs.keys(): continue if attr == "_FillValue": continue attrs[attr] = getattr(v, attr) self.variables_description[i] = dict( args=args, kwargs=kwargs, attrs=attrs, infos=dict() ) self.global_attrs = {attr: getattr(h, attr) for attr in h.ncattrs()}
[docs] def write(self, filename): """Write dataset output with same format as input :param str filename: filename used to save the grid """ with Dataset(filename, "w") as h_out: for dimension, size in self.dimensions.items(): test = False for varname, variable in self.variables_description.items(): if ( varname not in self.coordinates and varname not in self.vars.keys() ): continue if dimension in variable["kwargs"]["dimensions"]: test = True break if test: h_out.createDimension(dimension, size) for varname, variable in self.variables_description.items(): if varname not in self.coordinates and varname not in self.vars.keys(): continue var = h_out.createVariable(*variable["args"], **variable["kwargs"]) for key, value in variable["attrs"].items(): setattr(var, key, value) infos = self.variables_description[varname]["infos"] if infos.get("transpose", False): var[:] = self.vars[varname].T else: var[:] = self.vars[varname] for attr, value in self.global_attrs.items(): setattr(h_out, attr, value)
[docs] def load(self): """ Load variable (data). Get coordinates and setup coordinates function """ x_name, y_name = self.coordinates with Dataset(self.filename) as h: self.x_dim = h.variables[x_name].dimensions self.y_dim = h.variables[y_name].dimensions sl_x = [self.indexs.get(dim, slice(None)) for dim in self.x_dim] sl_y = [self.indexs.get(dim, slice(None)) for dim in self.y_dim] self.vars[x_name] = h.variables[x_name][sl_x] self.vars[y_name] = h.variables[y_name][sl_y] self.setup_coordinates()
[docs] @staticmethod def get_mask(a): if len(a.mask.shape): m = a.mask else: m = ones(a.shape, dtype="bool") if a.mask else zeros(a.shape, dtype="bool") return m
[docs] @staticmethod def c_to_bounds(c): """ Centered coordinates to bounds coordinates :param array c: centered coordinates to translate :return: bounds coordinates """ bounds = concatenate((c, (2 * c[-1] - c[-2],))) d = bounds[1:] - bounds[:-1] bounds[:-1] -= d / 2 bounds[-1] -= d[-1] / 2 return bounds
[docs] def setup_coordinates(self): x_name, y_name = self.coordinates if self.is_centered: # logger.info("Grid center") self.x_c = self.vars[x_name].astype("float64") self.y_c = self.vars[y_name].astype("float64") self.x_bounds = concatenate((self.x_c, (2 * self.x_c[-1] - self.x_c[-2],))) self.y_bounds = concatenate((self.y_c, (2 * self.y_c[-1] - self.y_c[-2],))) d_x = self.x_bounds[1:] - self.x_bounds[:-1] d_y = self.y_bounds[1:] - self.y_bounds[:-1] self.x_bounds[:-1] -= d_x / 2 self.x_bounds[-1] -= d_x[-1] / 2 self.y_bounds[:-1] -= d_y / 2 self.y_bounds[-1] -= d_y[-1] / 2 else: self.x_bounds = self.vars[x_name].astype("float64") self.y_bounds = self.vars[y_name].astype("float64") if len(self.x_dim) == 1: self.x_c = self.x_bounds.copy() dx2 = (self.x_bounds[1:] - self.x_bounds[:-1]) / 2 self.x_c[:-1] += dx2 self.x_c[-1] += dx2[-1] self.y_c = self.y_bounds.copy() dy2 = (self.y_bounds[1:] - self.y_bounds[:-1]) / 2 self.y_c[:-1] += dy2 self.y_c[-1] += dy2[-1] else: raise Exception("not write")
[docs] def is_circular(self): """Check grid circularity""" return False
[docs] def units(self, varname): """Get unit from variable""" stored_units = self.variables_description[varname]["attrs"].get("units", None) if stored_units is not None: return stored_units with Dataset(self.filename) as h: var = h.variables[varname] if hasattr(var, "units"): return var.units
@property def variables(self): return self.variables_description.keys()
[docs] def copy(self, grid_in, grid_out): """ Duplicate the variable from grid_in in grid_out :param grid_in: :param grid_out: """ h_dict = self.variables_description[grid_in] self.variables_description[grid_out] = dict( infos=h_dict["infos"].copy(), attrs=h_dict["attrs"].copy(), args=tuple((grid_out, *h_dict["args"][1:])), kwargs=h_dict["kwargs"].copy(), ) self.vars[grid_out] = self.grid(grid_in).copy()
[docs] def add_grid(self, varname, grid): """ Add a grid in handler :param str varname: name of the future grid :param array grid: grid array """ self.vars[varname] = grid
[docs] def grid(self, varname, indexs=None): """Give the grid required :param str varname: Variable to get :param dict,None indexs: If defined dict must have dimensions name as key :return: array asked, reduced by the indexes :rtype: array .. minigallery:: py_eddy_tracker.GridDataset.grid """ if indexs is None: indexs = dict() if varname not in self.vars: coordinates_dims = list(self.x_dim) coordinates_dims.extend(list(self.y_dim)) logger.debug( "Load %(varname)s from %(filename)s", dict(varname=varname, filename=self.filename), ) with Dataset(self.filename) as h: dims = h.variables[varname].dimensions sl = [ indexs.get( dim, self.indexs.get( dim, slice(None) if dim in coordinates_dims else 0 ), ) for dim in dims ] self.vars[varname] = h.variables[varname][sl] if len(self.x_dim) == 1: i_x = where(array(dims) == self.x_dim)[0][0] i_y = where(array(dims) == self.y_dim)[0][0] if i_x > i_y: self.variables_description[varname]["infos"]["transpose"] = True self.vars[varname] = self.vars[varname].T if self.nan_mask: self.vars[varname] = ma.array( self.vars[varname], mask=isnan(self.vars[varname]), ) if not hasattr(self.vars[varname], "mask"): self.vars[varname] = ma.array( self.vars[varname], mask=zeros(self.vars[varname].shape, dtype="bool"), ) return self.vars[varname]
[docs] def grid_tiles(self, varname, slice_x, slice_y): """Give the grid tiles required, without buffer system""" coordinates_dims = list(self.x_dim) coordinates_dims.extend(list(self.y_dim)) logger.debug( "Extract %(varname)s from %(filename)s with slice(x:%(slice_x)s,y:%(slice_y)s)", dict( varname=varname, filename=self.filename, slice_y=slice_y, slice_x=slice_x, ), ) with Dataset(self.filename) as h: dims = h.variables[varname].dimensions sl = [ (slice_x if dim in list(self.x_dim) else slice_y) if dim in coordinates_dims else 0 for dim in dims ] data = h.variables[varname][sl] if len(self.x_dim) == 1: i_x = where(array(dims) == self.x_dim)[0][0] i_y = where(array(dims) == self.y_dim)[0][0] if i_x > i_y: data = data.T if not hasattr(data, "mask"): data = ma.array(data, mask=zeros(data.shape, dtype="bool")) return data
[docs] def high_filter(self, grid_name, w_cut, **kwargs): """Return the high-pass filtered grid, by substracting to the initial grid the low-pass filtered grid (default: order=1) :param grid_name: the name of the grid :param int, w_cut: the half-power wavelength cutoff (km) """ result = self._low_filter(grid_name, w_cut, **kwargs) self.vars[grid_name] -= result
[docs] def low_filter(self, grid_name, w_cut, **kwargs): """Return the low-pass filtered grid (default: order=1) :param grid_name: the name of the grid :param int, w_cut: the half-power wavelength cutoff (km) """ result = self._low_filter(grid_name, w_cut, **kwargs) self.vars[grid_name] -= self.vars[grid_name] - result
@property def bounds(self): """Give bounds""" return ( self.x_bounds.min(), self.x_bounds.max(), self.y_bounds.min(), self.y_bounds.max(), )
[docs] def eddy_identification( self, grid_height, uname, vname, date, step=0.005, shape_error=55, presampling_multiplier=10, sampling=50, sampling_method="visvalingam", pixel_limit=None, precision=None, force_height_unit=None, force_speed_unit=None, **kwargs, ): """ Compute eddy identification on the specified grid :param str grid_height: Grid name of Sea Surface Height :param str uname: Grid name of u speed component :param str vname: Grid name of v speed component :param datetime.datetime date: Date to be stored in object to date data :param float,int step: Height between two layers in m :param float,int shape_error: Maximal error allowed for outermost contour in % :param int presampling_multiplier: Evenly oversample the initial number of points in the contour by nb_pts x presampling_multiplier to fit circles :param int sampling: Number of points to store contours and speed profile :param str sampling_method: Method to resample the stored contours, 'uniform' or 'visvalingam' :param (int,int),None pixel_limit: Min and max number of pixels inside the inner and the outermost contour to be considered as an eddy :param float,None precision: Truncate values at the defined precision in m :param str force_height_unit: Unit used for height unit :param str force_speed_unit: Unit used for speed unit :param dict kwargs: Arguments given to amplitude (mle, nb_step_min, nb_step_to_be_mle). Look at :py:meth:`py_eddy_tracker.eddy_feature.Amplitude` The amplitude threshold is given by `step*nb_step_min` :return: Return a list of 2 elements: Anticyclones and Cyclones :rtype: py_eddy_tracker.observations.observation.EddiesObservations .. minigallery:: py_eddy_tracker.GridDataset.eddy_identification """ if not isinstance(date, datetime): raise Exception("Date argument must be a datetime object") # The inf limit must be in pixel and sup limit in surface if pixel_limit is None: pixel_limit = (4, 1000) # Compute an interpolator for eke self.init_speed_coef(uname, vname) # Get unit of h grid h_units = ( self.units(grid_height) if force_height_unit is None else force_height_unit ) units = UnitRegistry() in_h_unit = units.parse_expression(h_units) if in_h_unit is not None: factor, _ = in_h_unit.to("m").to_tuple() logger.info( "We will apply on step a factor to be coherent with grid : %f", 1 / factor, ) step /= factor if precision is not None: precision /= factor # Get ssh grid data = self.grid(grid_height).astype("f8") # In case of a reduced mask if len(data.mask.shape) == 0 and not data.mask: data.mask = zeros(data.shape, dtype="bool") # we remove noisy data if precision is not None: data = (data / precision).round() * precision # Compute levels for ssh z_min, z_max = data.min(), data.max() d_z = z_max - z_min data_tmp = data[~data.mask] epsilon = 0.001 # in % z_min_p, z_max_p = ( percentile(data_tmp, epsilon), percentile(data_tmp, 100 - epsilon), ) d_zp = z_max_p - z_min_p if d_z / d_zp > 2: logger.warning( "Maybe some extrema are present zmin %f (m) and zmax %f (m) will be replace by %f and %f", z_min, z_max, z_min_p, z_max_p, ) z_min, z_max = z_min_p, z_max_p logger.debug("Levels from %f to %f", z_min, z_max) levels = arange(z_min - z_min % step, z_max - z_max % step + 2 * step, step) # Get x and y values x, y = self.x_c, self.y_c # Compute ssh contour self.contours = Contours(x, y, data, levels, wrap_x=self.is_circular()) out_sampling = dict(fixed_size=sampling) resample = visvalingam if sampling_method == "visvalingam" else uniform_resample track_extra_variables = [ "height_max_speed_contour", "height_external_contour", "height_inner_contour", "lon_max", "lat_max", ] array_variables = [ "contour_lon_e", "contour_lat_e", "contour_lon_s", "contour_lat_s", "uavg_profile", ] # Complete cyclonic and anticylonic research: a_and_c = list() for anticyclonic_search in [True, False]: eddies = list() iterator = 1 if anticyclonic_search else -1 # Loop over each collection for coll_ind, coll in enumerate(self.contours.iter(step=iterator)): corrected_coll_index = coll_ind if iterator == -1: corrected_coll_index = -coll_ind - 1 contour_paths = coll.get_paths() nb_paths = len(contour_paths) if nb_paths == 0: continue cvalues = self.contours.cvalues[corrected_coll_index] logger.debug( "doing collection %s, contour value %.4f, %d paths", corrected_coll_index, cvalues, nb_paths, ) # Loop over individual c_s contours (i.e., every eddy in field) for contour in contour_paths: if contour.used: continue # FIXME : center could be outside the contour due to the fit # FIXME : warning : the fit is made on raw sampling _, _, _, aerr = contour.fit_circle() # Filter for shape if aerr < 0 or aerr > shape_error or isnan(aerr): contour.reject = 1 continue # Find all pixels in the contour i_x_in, i_y_in = contour.pixels_in(self) # Check if pixels in contour are masked if has_masked_value(data.mask, i_x_in, i_y_in): if contour.reject == 0: contour.reject = 2 continue # Test of the rotating sense: cyclone or anticyclone if has_value( data, i_x_in, i_y_in, cvalues, below=anticyclonic_search ): continue # Test the number of pixels within the outermost contour # FIXME : Maybe limit max must be replaced with a maximum of surface if ( contour.nb_pixel < pixel_limit[0] or contour.nb_pixel > pixel_limit[1] ): contour.reject = 3 continue # Here the considered contour passed shape_error test, masked_pixels test, # values strictly above (AEs) or below (CEs) the contour, number_pixels test) # Compute amplitude reset_centroid, amp = self.get_amplitude( contour, cvalues, data, anticyclonic_search=anticyclonic_search, level=self.contours.levels[corrected_coll_index], interval=step, **kwargs, ) # If we have a valid amplitude if (not amp.within_amplitude_limits()) or (amp.amplitude == 0): contour.reject = 4 continue if reset_centroid: if self.is_circular(): centi = self.normalize_x_indice(reset_centroid[0]) else: centi = reset_centroid[0] centj = reset_centroid[1] # FIXME : To move in regular and unregular grid if len(x.shape) == 1: centlon_e = x[centi] centlat_e = y[centj] else: centlon_e = x[centi, centj] centlat_e = y[centi, centj] # centlat_e and centlon_e must be indexes of maximum, we will loose some inner contour if it's not ( max_average_speed, speed_contour, inner_contour, speed_array, i_max_speed, i_inner, ) = self.get_uavg( self.contours, centlon_e, centlat_e, contour, anticyclonic_search, corrected_coll_index, pixel_min=pixel_limit[0], ) # FIXME : Instantiate new EddyObservation object (high cost, need to be reviewed) obs = EddiesObservations( size=1, track_extra_variables=track_extra_variables, track_array_variables=sampling, array_variables=array_variables, ) obs.height_max_speed_contour[:] = self.contours.cvalues[i_max_speed] obs.height_external_contour[:] = cvalues obs.height_inner_contour[:] = self.contours.cvalues[i_inner] array_size = speed_array.shape[0] obs.nb_contour_selected[:] = array_size if speed_array.shape[0] == 1: obs.uavg_profile[:] = speed_array[0] else: obs.uavg_profile[:] = raw_resample(speed_array, sampling) obs.amplitude[:] = amp.amplitude obs.speed_average[:] = max_average_speed obs.num_point_e[:] = contour.lon.shape[0] obs.num_point_s[:] = speed_contour.lon.shape[0] # Evenly resample contours with nb_pts = nb_pts_original x presampling_multiplier xy_i = uniform_resample( inner_contour.lon, inner_contour.lat, num_fac=presampling_multiplier, ) xy_e = uniform_resample( contour.lon, contour.lat, num_fac=presampling_multiplier, ) xy_s = uniform_resample( speed_contour.lon, speed_contour.lat, num_fac=presampling_multiplier, ) # First, get position of max SSH based on best fit circle with resampled innermost contour centlon_i, centlat_i, _, _ = _fit_circle_path(create_vertice(*xy_i)) obs.lon_max[:] = centlon_i obs.lat_max[:] = centlat_i # Second, get speed-based radius, shape error, eddy center, area based on resampled contour of max uavg centlon_s, centlat_s, eddy_radius_s, aerr_s = _fit_circle_path( create_vertice(*xy_s) ) obs.radius_s[:] = eddy_radius_s obs.shape_error_s[:] = aerr_s obs.speed_area[:] = poly_area( *coordinates_to_local(*xy_s, lon0=centlon_s, lat0=centlat_s) ) obs.lon[:] = centlon_s obs.lat[:] = centlat_s # Third, compute effective radius, shape error, area from resampled effective contour _, _, eddy_radius_e, aerr_e = _fit_circle_path( create_vertice(*xy_e) ) obs.radius_e[:] = eddy_radius_e obs.shape_error_e[:] = aerr_e obs.effective_area[:] = poly_area( *coordinates_to_local(*xy_e, lon0=centlon_s, lat0=centlat_s) ) # Finally, resample contours with output parameters xy_e_f = resample(*xy_e, **out_sampling) xy_s_f = resample(*xy_s, **out_sampling) obs.contour_lon_s[:], obs.contour_lat_s[:] = xy_s_f obs.contour_lon_e[:], obs.contour_lat_e[:] = xy_e_f if aerr > 99.9 or aerr_s > 99.9: logger.warning( "Strange shape at this step! shape_error : %f, %f", aerr, aerr_s, ) eddies.append(obs) # To reserve definitively the area data.mask[i_x_in, i_y_in] = True if len(eddies) == 0: eddies = EddiesObservations( track_extra_variables=track_extra_variables, track_array_variables=sampling, array_variables=array_variables, ) else: eddies = EddiesObservations.concatenate(eddies) eddies.sign_type = 1 if anticyclonic_search else -1 eddies.time[:] = (date - datetime(1950, 1, 1)).total_seconds() / 86400.0 # normalization longitude between 0 - 360, because storage have an offset on 180 eddies.lon_max[:] %= 360 eddies.lon[:] %= 360 ref = eddies.lon - 180 eddies.contour_lon_e[:] = ((eddies.contour_lon_e.T - ref) % 360 + ref).T eddies.contour_lon_s[:] = ((eddies.contour_lon_s.T - ref) % 360 + ref).T a_and_c.append(eddies) if in_h_unit is not None: for name in [ "amplitude", "height_max_speed_contour", "height_external_contour", "height_inner_contour", ]: out_unit = units.parse_expression(VAR_DESCR[name]["nc_attr"]["units"]) factor, _ = in_h_unit.to(out_unit).to_tuple() a_and_c[0].obs[name] *= factor a_and_c[1].obs[name] *= factor u_units = self.units(uname) if force_speed_unit is None else force_speed_unit in_u_units = units.parse_expression(u_units) if in_u_units is not None: for name in ["speed_average", "uavg_profile"]: out_unit = units.parse_expression(VAR_DESCR[name]["nc_attr"]["units"]) factor, _ = in_u_units.to(out_unit).to_tuple() a_and_c[0].obs[name] *= factor a_and_c[1].obs[name] *= factor return a_and_c
[docs] def get_uavg( self, all_contours, centlon_e, centlat_e, original_contour, anticyclonic_search, level_start, pixel_min=3, ): """ Compute geostrophic speed around successive contours Returns the average """ # Init max speed to search maximum max_average_speed = self.speed_coef_mean(original_contour) speed_array = [max_average_speed] eddy_contours = [original_contour] inner_contour = selected_contour = original_contour # Must start only on upper or lower contour, no need to test the two part step = 1 if anticyclonic_search else -1 i_inner = i_max_speed = -1 for i, coll in enumerate( all_contours.iter(start=level_start + step, step=step) ): level_contour = coll.get_nearest_path_bbox_contain_pt(centlon_e, centlat_e) # Leave loop if no contours at level if level_contour is None: break # Ensure polygon_i is within polygon_e if not poly_contain_poly(original_contour.vertices, level_contour.vertices): break # 3. Respect size range (for max speed) # nb_pixel properties need to call pixels_in before with a grid of pixel level_contour.pixels_in(self) # Interpolate uspd to seglon, seglat, then get mean level_average_speed = self.speed_coef_mean(level_contour) speed_array.append(level_average_speed) if ( pixel_min < level_contour.nb_pixel and level_average_speed >= max_average_speed ): max_average_speed = level_average_speed i_max_speed = i selected_contour = level_contour inner_contour = level_contour eddy_contours.append(level_contour) i_inner = i for contour in eddy_contours: contour.used = True i_max_speed = level_start + step + step * i_max_speed i_inner = level_start + step + step * i_inner return ( max_average_speed, selected_contour, inner_contour, array(speed_array), i_max_speed, i_inner, )
@staticmethod def _gaussian_filter(data, sigma, mode="reflect"): """Standard gaussian filter""" local_data = data.copy() local_data[data.mask] = 0 v = gaussian_filter(local_data, sigma=sigma, mode=mode) w = gaussian_filter(float_(~data.mask), sigma=sigma, mode=mode) with errstate(invalid="ignore"): return ma.array(v / w, mask=w == 0)
[docs] @staticmethod def get_amplitude( contour, contour_height, data, anticyclonic_search=True, level=None, **kwargs ): # Instantiate Amplitude object amp = Amplitude( # Indices of all pixels in contour contour=contour, # Height of level contour_height=contour_height, # All grid data=data, **kwargs, ) if anticyclonic_search: reset_centroid = amp.all_pixels_above_h0(level) else: reset_centroid = amp.all_pixels_below_h0(level) return reset_centroid, amp
[docs]class UnRegularGridDataset(GridDataset): """Class managing unregular grid""" __slots__ = ( "index_interp", "_speed_norm", )
[docs] def load(self): """Load variable (data)""" x_name, y_name = self.coordinates with Dataset(self.filename) as h: self.x_dim = h.variables[x_name].dimensions self.y_dim = h.variables[y_name].dimensions sl_x = [self.indexs.get(dim, slice(None)) for dim in self.x_dim] sl_y = [self.indexs.get(dim, slice(None)) for dim in self.y_dim] self.vars[x_name] = h.variables[x_name][sl_x] self.vars[y_name] = h.variables[y_name][sl_y] self.x_c = self.vars[x_name] self.y_c = self.vars[y_name] self.init_pos_interpolator()
@property def bounds(self): """Give bounds""" return self.x_c.min(), self.x_c.max(), self.y_c.min(), self.y_c.max()
[docs] def bbox_indice(self, vertices): dist, idx = self.index_interp.query(vertices, k=1) i_y = idx % self.x_c.shape[1] i_x = int_((idx - i_y) / self.x_c.shape[1]) return ( (max(i_x.min() - self.N, 0), i_x.max() + self.N + 1), (max(i_y.min() - self.N, 0), i_y.max() + self.N + 1), )
[docs] def get_pixels_in(self, contour): (x_start, x_stop), (y_start, y_stop) = contour.bbox_slice return _get_pixel_in_unregular( contour.vertices, self.x_c, self.y_c, x_start, x_stop, y_start, y_stop )
[docs] def normalize_x_indice(self, indices): """Not do""" return indices
[docs] def nearest_grd_indice(self, x, y): dist, idx = self.index_interp.query((x, y), k=1) i_y = idx % self.x_c.shape[1] i_x = int_((idx - i_y) / self.x_c.shape[1]) return i_x, i_y
[docs] def compute_pixel_path(self, x0, y0, x1, y1): pass
[docs] def init_pos_interpolator(self): logger.debug("Create a KdTree, could be long ...") self.index_interp = cKDTree( create_vertice(self.x_c.reshape(-1), self.y_c.reshape(-1)) ) logger.debug("... OK")
def _low_filter(self, grid_name, w_cut, factor=8.0): data = self.grid(grid_name) x = self.grid(self.coordinates[0]) y = self.grid(self.coordinates[1]) regrid_step = w_cut / 111.0 / factor x_min, x_max, y_min, y_max = self.bounds x_array = arange(x_min, x_max + regrid_step, regrid_step) y_array = arange(y_min, min(y_max + regrid_step, 89), regrid_step) bins = (x_array, y_array) x_flat, y_flat, z_flat = x.reshape((-1,)), y.reshape((-1,)), data.reshape((-1,)) m = ~self.get_mask(z_flat) x_flat, y_flat, z_flat = x_flat[m], y_flat[m], z_flat[m] nb_value, _, _ = histogram2d(x_flat, y_flat, bins=bins) sum_value, _, _ = histogram2d(x_flat, y_flat, bins=bins, weights=z_flat) with errstate(invalid="ignore"): z_grid = ma.array(sum_value / nb_value, mask=nb_value == 0) regular_grid = RegularGridDataset.with_array( coordinates=self.coordinates, datas={ grid_name: z_grid, self.coordinates[0]: x_array[:-1], self.coordinates[1]: y_array[:-1], }, centered=False, ) regular_grid.bessel_low_filter(grid_name, w_cut, order=1) z_filtered = regular_grid.grid(grid_name) x_center = (x_array[:-1] + x_array[1:]) / 2 y_center = (y_array[:-1] + y_array[1:]) / 2 opts_interpolation = dict(kx=1, ky=1, s=0) m_interp = RectBivariateSpline( x_center, y_center, z_filtered.mask, **opts_interpolation ) z_filtered.data[z_filtered.mask] = 0 z_interp = RectBivariateSpline( x_center, y_center, z_filtered.data, **opts_interpolation ).ev(x, y) return ma.array(z_interp, mask=m_interp.ev(x, y) > 0.00001)
[docs] def speed_coef_mean(self, contour): dist, idx = self.index_interp.query( uniform_resample_stack(contour.vertices)[1:], k=4 ) i_y = idx % self.x_c.shape[1] i_x = int_((idx - i_y) / self.x_c.shape[1]) # A simplified solution to be change by a weight mean return self._speed_norm[i_x, i_y].mean(axis=1).mean()
[docs] def init_speed_coef(self, uname="u", vname="v"): self._speed_norm = (self.grid(uname) ** 2 + self.grid(vname) ** 2) ** 0.5
[docs]class RegularGridDataset(GridDataset): """Class only for regular grid""" __slots__ = ( "_speed_ev", "_is_circular", "x_size", "_x_step", "_y_step", ) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._is_circular = None
[docs] def setup_coordinates(self): super().setup_coordinates() self.x_size = self.x_c.shape[0] if len(self.x_c.shape) != 1: raise Exception( "Coordinates in RegularGridDataset must be 1D array, or think to use UnRegularGridDataset" ) dx = self.x_bounds[1:] - self.x_bounds[:-1] dy = self.y_bounds[1:] - self.y_bounds[:-1] if (dx < 0).any() or (dy < 0).any(): raise Exception( "Coordinates in RegularGridDataset must be strictly increasing" ) self._x_step = (self.x_c[1:] - self.x_c[:-1]).mean() self._y_step = (self.y_c[1:] - self.y_c[:-1]).mean()
[docs] @classmethod def with_array(cls, coordinates, datas, variables_description=None, **kwargs): """ Geo matrix data must be ordered like this (X,Y) and masked with numpy.ma.array """ vd = dict() if variables_description is None else variables_description x_name, y_name = coordinates[0], coordinates[1] obj = cls("array", x_name, y_name, unset=True, **kwargs) obj.x_dim = (x_name,) obj.y_dim = (y_name,) obj.variables_description = dict() obj.dimensions = {i: v.shape[0] for i, v in datas.items() if i in coordinates} for k, v in datas.items(): obj.vars[k] = v obj.variables_description[k] = dict( attrs=vd.get(k, dict()), args=(k, v.dtype), kwargs=dict( dimensions=coordinates if k not in coordinates else (k,), complevel=1, zlib=True, ), infos=dict(), ) obj.global_attrs = dict(history="Grid setup with an array") obj.setup_coordinates() return obj
[docs] def bbox_indice(self, vertices): return bbox_indice_regular( vertices, self.x_bounds, self.y_bounds, self.xstep, self.ystep, self.N, self.is_circular(), self.x_size, )
[docs] def get_pixels_in(self, contour): """ Get indexes of pixels in contour. :param vertice,Path contour: Contour that encloses some pixels :return: Indexes of grid in contour :rtype: array[int],array[int] """ if isinstance(contour, BasePath): (x_start, x_stop), (y_start, y_stop) = contour.bbox_slice return get_pixel_in_regular( contour.vertices, self.x_c, self.y_c, x_start, x_stop, y_start, y_stop ) else: (x_start, x_stop), (y_start, y_stop) = self.bbox_indice(contour) return get_pixel_in_regular( contour, self.x_c, self.y_c, x_start, x_stop, y_start, y_stop )
[docs] def normalize_x_indice(self, indices): return indices % self.x_size
[docs] def nearest_grd_indice(self, x, y): return nearest_grd_indice( x, y, self.x_bounds, self.y_bounds, self.xstep, self.ystep )
@property def xstep(self): """Only for regular grid with no step variation""" return self._x_step @property def ystep(self): """Only for regular grid with no step variation""" return self._y_step
[docs] def compute_pixel_path(self, x0, y0, x1, y1): """Give a series of indexes describing the path between two positions""" return compute_pixel_path( x0, y0, x1, y1, self.x_bounds[0], self.y_bounds[0], self.xstep, self.ystep, self.x_size, )
[docs] def clean_land(self): """Function to remove all land pixel""" pass
[docs] def is_circular(self): """Check if the grid is circular""" if self._is_circular is None: self._is_circular = ( abs((self.x_bounds[0] % 360) - (self.x_bounds[-1] % 360)) < 0.0001 ) return self._is_circular
[docs] @staticmethod def check_order(order): if order < 1: logger.warning("order must be superior to 0") return ceil(order).astype(int)
[docs] def get_step_in_km(self, lat, wave_length): step_y_km = self.ystep * distance(0, 0, 0, 1) / 1000 step_x_km = self.xstep * distance(0, lat, 1, lat) / 1000 min_wave_length = max(step_x_km, step_y_km) * 2 if wave_length < min_wave_length: logger.error( "wave_length too short for resolution, must be > %d km", ceil(min_wave_length), ) raise Exception() return step_x_km, step_y_km
[docs] def estimate_kernel_shape(self, lat, wave_length, order): step_x_km, step_y_km = self.get_step_in_km(lat, wave_length) # half size will be multiply with by order half_x_pt, half_y_pt = ( ceil(wave_length / step_x_km).astype(int), ceil(wave_length / step_y_km).astype(int), ) # x size is not good over 60 degrees y = arange( lat - self.ystep * half_y_pt * order, lat + self.ystep * half_y_pt * order + 0.01 * self.ystep, self.ystep, ) # We compute half + 1 and the other part will be compute by symetry x = arange(0, self.xstep * half_x_pt * order + 0.01 * self.xstep, self.xstep) y, x = meshgrid(y, x) dist_norm = distance(0, lat, x, y) / 1000.0 / wave_length return half_x_pt, half_y_pt, dist_norm
[docs] def finalize_kernel(self, kernel, order, half_x_pt, half_y_pt): # Symetry kernel_ = empty((half_x_pt * 2 * order + 1, half_y_pt * 2 * order + 1)) kernel_[half_x_pt * order :] = kernel kernel_[: half_x_pt * order] = kernel[:0:-1] # remove unused row/column k_valid = kernel_ != 0 x_valid = where(k_valid.sum(axis=1))[0] x_slice = slice(x_valid[0], x_valid[-1] + 1) y_valid = where(k_valid.sum(axis=0))[0] y_slice = slice(y_valid[0], y_valid[-1] + 1) return kernel_[x_slice, y_slice]
[docs] def kernel_lanczos(self, lat, wave_length, order=1): """Not really operational wave_length in km order must be int """ order = self.check_order(order) half_x_pt, half_y_pt, dist_norm = self.estimate_kernel_shape( lat, wave_length, order ) kernel = sinc(dist_norm / order) * sinc(dist_norm) kernel[dist_norm > order] = 0 return self.finalize_kernel(kernel, order, half_x_pt, half_y_pt)
[docs] def kernel_bessel(self, lat, wave_length, order=1): """wave_length in km order must be int """ order = self.check_order(order) half_x_pt, half_y_pt, dist_norm = self.estimate_kernel_shape( lat, wave_length, order ) with errstate(invalid="ignore"): kernel = sinc(dist_norm / order) * j1(2 * pi * dist_norm) / dist_norm kernel[0, half_y_pt * order] = pi kernel[dist_norm > order] = 0 return self.finalize_kernel(kernel, order, half_x_pt, half_y_pt)
def _low_filter(self, grid_name, w_cut, **kwargs): """low filtering""" return self.convolve_filter_with_dynamic_kernel( grid_name, self.kernel_bessel, wave_length=w_cut, **kwargs )
[docs] def convolve_filter_with_dynamic_kernel( self, grid, kernel_func, lat_max=85, extend=False, **kwargs_func ): """ :param str grid: grid name :param func kernel_func: function of kernel to use :param float lat_max: absolute latitude above no filtering apply :param bool extend: if False, only non masked value will return a filtered value :param dict kwargs_func: look at kernel_func :return: filtered value :rtype: array """ if (abs(self.y_c) > lat_max).any(): logger.warning("No filtering above %f degrees of latitude", lat_max) if isinstance(grid, str): data = self.grid(grid).copy() else: data = grid.copy() # Matrix for result data_out = ma.empty(data.shape) data_out.mask = ones(data_out.shape, dtype=bool) nb_lines = self.y_c.shape[0] dt = list() debug_active = logger.getEffectiveLevel() == logging.DEBUG for i, lat in enumerate(self.y_c): if abs(lat) > lat_max or data[:, i].mask.all(): data_out.mask[:, i] = True continue # Get kernel kernel = kernel_func(lat, **kwargs_func) # Kernel shape k_shape = kernel.shape t0 = datetime.now() if debug_active and len(dt) > 0: dt_mean = np_mean(dt) * (nb_lines - i) print( "Remain ", dt_mean, "ETA ", t0 + dt_mean, "current kernel size :", k_shape, "Step : %d/%d " % (i, nb_lines), end="\r", ) # Half size, k_shape must be always impair d_lat = int((k_shape[1] - 1) / 2) d_lon = int((k_shape[0] - 1) / 2) # Temporary matrix to have exact shape at outuput tmp_matrix = ma.zeros((2 * d_lon + data.shape[0], k_shape[1])) tmp_matrix.mask = ones(tmp_matrix.shape, dtype=bool) # Slice to apply on input data # +1 for upper bound, to take in acount this column sl_lat_data = slice(max(0, i - d_lat), min(i + d_lat + 1, data.shape[1])) # slice to apply on temporary matrix to store input data sl_lat_in = slice( d_lat - (i - sl_lat_data.start), d_lat + (sl_lat_data.stop - i) ) # If global => manual wrapping if self.is_circular(): tmp_matrix[:d_lon, sl_lat_in] = data[-d_lon:, sl_lat_data] tmp_matrix[-d_lon:, sl_lat_in] = data[:d_lon, sl_lat_data] # Copy data tmp_matrix[d_lon:-d_lon, sl_lat_in] = data[:, sl_lat_data] # Convolution m = ~tmp_matrix.mask tmp_matrix[~m] = 0 demi_x, demi_y = k_shape[0] // 2, k_shape[1] // 2 values_sum = filter2D(tmp_matrix.data, -1, kernel)[demi_x:-demi_x, demi_y] kernel_sum = filter2D(m.astype(float), -1, kernel)[demi_x:-demi_x, demi_y] with errstate(invalid="ignore", divide="ignore"): if extend: data_out[:, i] = ma.array( values_sum / kernel_sum, mask=kernel_sum < (extend * kernel.sum()), ) else: data_out[:, i] = values_sum / kernel_sum dt.append(datetime.now() - t0) if len(dt) == 100: dt.pop(0) if extend: out = ma.array(data_out, mask=data_out.mask) else: out = ma.array(data_out, mask=data.mask + data_out.mask) if debug_active: print() if out.dtype != data.dtype: return out.astype(data.dtype) return out
[docs] def lanczos_high_filter( self, grid_name, wave_length, order=1, lat_max=85, **kwargs ): logger.warning("It could be not safe to use lanczos filter") data_out = self.convolve_filter_with_dynamic_kernel( grid_name, self.kernel_lanczos, lat_max=lat_max, wave_length=wave_length, order=order, **kwargs, ) self.vars[grid_name] -= data_out
[docs] def lanczos_low_filter(self, grid_name, wave_length, order=1, lat_max=85, **kwargs): logger.warning("It could be not safe to use lanczos filter") data_out = self.convolve_filter_with_dynamic_kernel( grid_name, self.kernel_lanczos, lat_max=lat_max, wave_length=wave_length, order=order, **kwargs, ) self.vars[grid_name] = data_out
[docs] def bessel_band_filter(self, grid_name, wave_length_inf, wave_length_sup, **kwargs): data_out = self.convolve_filter_with_dynamic_kernel( grid_name, self.kernel_bessel, wave_length=wave_length_inf, **kwargs ) self.vars[grid_name] = data_out data_out = self.convolve_filter_with_dynamic_kernel( grid_name, self.kernel_bessel, wave_length=wave_length_sup, **kwargs ) self.vars[grid_name] -= data_out
[docs] def bessel_high_filter(self, grid_name, wave_length, order=1, lat_max=85, **kwargs): """ :param str grid_name: grid to filter, data will replace original one :param float wave_length: in km :param int order: order to use, if > 1 negative values of the cardinal sinus are present in kernel :param float lat_max: absolute latitude, no filtering above :param dict kwargs: look at :py:meth:`RegularGridDataset.convolve_filter_with_dynamic_kernel` .. minigallery:: py_eddy_tracker.RegularGridDataset.bessel_high_filter """ logger.debug( "Run filtering with wavelength of %(wave_length)s km and order of %(order)s ...", dict(wave_length=wave_length, order=order), ) data_out = self.convolve_filter_with_dynamic_kernel( grid_name, self.kernel_bessel, lat_max=lat_max, wave_length=wave_length, order=order, **kwargs, ) logger.debug("Filtering done") self.vars[grid_name] -= data_out
[docs] def bessel_low_filter(self, grid_name, wave_length, order=1, lat_max=85, **kwargs): data_out = self.convolve_filter_with_dynamic_kernel( grid_name, self.kernel_bessel, lat_max=lat_max, wave_length=wave_length, order=order, **kwargs, ) self.vars[grid_name] = data_out
[docs] def spectrum_lonlat(self, grid_name, area=None, ref=None, **kwargs): if area is None: area = dict(llcrnrlon=190, urcrnrlon=280, llcrnrlat=-62, urcrnrlat=8) scaling = kwargs.pop("scaling", "density") ref_grid_name = kwargs.pop("ref_grid_name", None) x0, y0 = self.nearest_grd_indice(area["llcrnrlon"], area["llcrnrlat"]) x1, y1 = self.nearest_grd_indice(area["urcrnrlon"], area["urcrnrlat"]) data = self.grid(grid_name)[x0:x1, y0:y1] # Lat spectrum pws = list() step_y_km = self.ystep * distance(0, 0, 0, 1) / 1000 nb_invalid = 0 for i, _ in enumerate(self.x_c[x0:x1]): f, pw = welch(data[i, :], 1 / step_y_km, scaling=scaling, **kwargs) if isnan(pw).any(): nb_invalid += 1 continue pws.append(pw) if nb_invalid: logger.warning("%d/%d columns invalid", nb_invalid, i + 1) with errstate(divide="ignore"): lat_content = 1 / f, array(pws).mean(axis=0) # Lon spectrum fs, pws = list(), list() f_min, f_max = None, None nb_invalid = 0 for i, lat in enumerate(self.y_c[y0:y1]): step_x_km = self.xstep * distance(0, lat, 1, lat) / 1000 f, pw = welch(data[:, i], 1 / step_x_km, scaling=scaling, **kwargs) if isnan(pw).any(): nb_invalid += 1 continue if f_min is None: f_min = f.min() f_max = f.max() else: f_min = max(f_min, f.min()) f_max = min(f_max, f.max()) fs.append(f) pws.append(pw) if nb_invalid: logger.warning("%d/%d lines invalid", nb_invalid, i + 1) f_interp = linspace(f_min, f_max, f.shape[0]) pw_m = array( [ interp1d(f, pw, fill_value=0.0, bounds_error=False)(f_interp) for f, pw in zip(fs, pws) ] ).mean(axis=0) with errstate(divide="ignore"): lon_content = 1 / f_interp, pw_m if ref is None: return lon_content, lat_content else: if ref_grid_name is not None: grid_name = ref_grid_name ref_lon_content, ref_lat_content = ref.spectrum_lonlat( grid_name, area, **kwargs ) return ( (lon_content[0], lon_content[1] / ref_lon_content[1]), (lat_content[0], lat_content[1] / ref_lat_content[1]), )
[docs] def compute_finite_difference(self, data, schema=1, mode="reflect", vertical=False): if not isinstance(schema, int) and schema < 1: raise Exception("schema must be a positive int") data2 = data.copy() data1 = data.copy() if vertical: data1[:, :-schema] = data[:, schema:] data2[:, schema:] = data[:, :-schema] # put nan data1[:, -schema:] = nan data2[:, :schema] = nan else: data1[:-schema] = data[schema:] data2[schema:] = data[:-schema] if mode == "wrap": data1[-schema:] = data[:schema] data2[:schema] = data[-schema:] else: # put nan data1[-schema:] = nan data2[:schema] = nan d = self.EARTH_RADIUS * 2 * pi / 360 * 2 * schema if vertical: d *= self.ystep else: d *= self.xstep * cos(deg2rad(self.y_c)) return (data1 - data2) / d
[docs] def compute_stencil( self, data, stencil_halfwidth=4, mode="reflect", vertical=False ): r""" Apply stencil ponderation on field. :param array data: array where apply stencil :param int stencil_halfwidth: from 1 t0 4, maximal stencil used :param str mode: convolution mode :param bool vertical: if True, method apply a vertical convolution :return: gradient array from stencil application :rtype: array Short story, how to get stencil coefficient for stencil (3 points, 5 points and 7 points) Taylor's theorem: .. math:: f(x \pm h) = f(x) \pm f'(x)h + \frac{f''(x)h^2}{2!} \pm \frac{f^{(3)}(x)h^3}{3!} + \frac{f^{(4)}(x)h^4}{4!} \pm \frac{f^{(5)}(x)h^5}{5!} + O(h^6) If we stop at `O(h^2)`, we get classic differenciation (stencil 3 points): .. math:: f(x+h) - f(x-h) = f(x) - f(x) + 2 f'(x)h + O(h^2) .. math:: f'(x) = \frac{f(x+h) - f(x-h)}{2h} + O(h^2) If we stop at `O(h^4)`, we will get stencil 5 points: .. math:: f(x+h) - f(x-h) = 2 f'(x)h + 2 \frac{f^{(3)}(x)h^3}{3!} + O(h^4) :label: E1 .. math:: f(x+2h) - f(x-2h) = 4 f'(x)h + 16 \frac{f^{(3)}(x)h^3}{3!} + O(h^4) :label: E2 If we multiply equation :eq:`E1` by 8 and substract equation :eq:`E2`, we get: .. math:: 8(f(x+h) - f(x-h)) - (f(x+2h) - f(x-2h)) = 16 f'(x)h - 4 f'(x)h + O(h^4) .. math:: f'(x) = \frac{f(x-2h) - 8f(x-h) + 8f(x+h) - f(x+2h)}{12h} + O(h^4) If we stop at `O(h^6)`, we will get stencil 7 points: .. math:: f(x+h) - f(x-h) = 2 f'(x)h + 2 \frac{f^{(3)}(x)h^3}{3!} + 2 \frac{f^{(5)}(x)h^5}{5!} + O(h^6) :label: E3 .. math:: f(x+2h) - f(x-2h) = 4 f'(x)h + 16 \frac{f^{(3)}(x)h^3}{3!} + 64 \frac{f^{(5)}(x)h^5}{5!} + O(h^6) :label: E4 .. math:: f(x+3h) - f(x-3h) = 6 f'(x)h + 54 \frac{f^{(3)}(x)h^3}{3!} + 486 \frac{f^{(5)}(x)h^5}{5!} + O(h^6) :label: E5 If we multiply equation :eq:`E3` by 45 and substract equation :eq:`E4` multiply by 9 and add equation :eq:`E5`, we get: .. math:: 45(f(x+h) - f(x-h)) - 9(f(x+2h) - f(x-2h)) + (f(x+3h) - f(x-3h)) = 90 f'(x)h - 36 f'(x)h + 6 f'(x)h + O(h^6) .. math:: f'(x) = \frac{-f(x-3h) + 9f(x-2h) - 45f(x-h) + 45f(x+h) - 9f(x+2h) +f(x+3h)}{60h} + O(h^6) ... """ stencil_halfwidth = max(min(int(stencil_halfwidth), 4), 1) logger.debug("Stencil half width apply : %d", stencil_halfwidth) g, m = compute_stencil( self.x_c, self.y_c, data.data, self.get_mask(data), self.EARTH_RADIUS, vertical=vertical, stencil_halfwidth=stencil_halfwidth, ) return ma.array(g, mask=m)
[docs] def add_uv_lagerloef(self, grid_height, uname="u", vname="v", schema=15): self.add_uv(grid_height, uname, vname) latmax = 5 _, (i_start, i_end) = self.nearest_grd_indice((0, 0), (-latmax, latmax)) sl = slice(i_start, i_end) # Divide by sideral day lat = self.y_c[sl] gob = ( cos(deg2rad(lat)) * ones((self.x_c.shape[0], 1)) * 4.0 * pi / (23 * 3600 + 56 * 60 + 4.1) / self.EARTH_RADIUS ) with errstate(divide="ignore"): gob = self.GRAVITY / (gob * ones((self.x_c.shape[0], 1))) mode = "wrap" if self.is_circular() else "reflect" # fill data to compute a finite difference on all point data = self.convolve_filter_with_dynamic_kernel( grid_height, self.kernel_bessel, lat_max=10, wave_length=500, order=1, extend=0.1, ) data = self.convolve_filter_with_dynamic_kernel( data, self.kernel_bessel, lat_max=10, wave_length=500, order=1, extend=0.1 ) data = self.convolve_filter_with_dynamic_kernel( data, self.kernel_bessel, lat_max=10, wave_length=500, order=1, extend=0.1 ) v_lagerloef = ( self.compute_finite_difference( self.compute_finite_difference(data, mode=mode, schema=schema), mode=mode, schema=schema, )[:, sl] * gob ) u_lagerloef = ( -self.compute_finite_difference( self.compute_finite_difference(data, vertical=True, schema=schema), vertical=True, schema=schema, )[:, sl] * gob ) w = 1 - exp(-((lat / 2.2) ** 2)) self.vars[vname][:, sl] = self.vars[vname][:, sl] * w + v_lagerloef * (1 - w) self.vars[uname][:, sl] = self.vars[uname][:, sl] * w + u_lagerloef * (1 - w)
[docs] def add_uv(self, grid_height, uname="u", vname="v", stencil_halfwidth=4): r"""Compute a u and v grid :param str grid_height: grid name where the funtion will apply stencil method :param str uname: future name of u :param str vname: future name of v :param int stencil_halfwidth: largest stencil could be apply (max: 4) .. math:: u = \frac{g}{f} \frac{dh}{dy} v = -\frac{g}{f} \frac{dh}{dx} where .. math:: g = gravity f = 2 \Omega sin(\phi) .. minigallery:: py_eddy_tracker.RegularGridDataset.add_uv """ logger.info("Add u/v variable with stencil method") data = self.grid(grid_height) h_dict = self.variables_description[grid_height] for variable in (uname, vname): self.variables_description[variable] = dict( infos=h_dict["infos"].copy(), attrs=h_dict["attrs"].copy(), args=tuple((variable, *h_dict["args"][1:])), kwargs=h_dict["kwargs"].copy(), ) if "units" in self.variables_description[variable]["attrs"]: self.variables_description[variable]["attrs"]["units"] += "/s" if "long_name" in self.variables_description[variable]["attrs"]: self.variables_description[variable]["attrs"][ "long_name" ] += " gradient" # Divide by sideral day gof = ( sin(deg2rad(self.y_c)) * ones((self.x_c.shape[0], 1)) * 4.0 * pi / (23 * 3600 + 56 * 60 + 4.1) ) with errstate(divide="ignore"): gof = self.GRAVITY / (gof * ones((self.x_c.shape[0], 1))) # Compute v mode = "wrap" if self.is_circular() else "reflect" self.vars[vname] = ( self.compute_stencil(data, mode=mode, stencil_halfwidth=stencil_halfwidth) * gof ) # Compute u self.vars[uname] = ( -self.compute_stencil( data, vertical=True, stencil_halfwidth=stencil_halfwidth ) * gof )
[docs] def speed_coef_mean(self, contour): """Some nan can be computed over contour if we are near borders, something to explore """ return mean_on_regular_contour( self.x_c, self.y_c, self._speed_ev, self._speed_ev.mask, contour.vertices, nan_remove=True, )
[docs] def init_speed_coef(self, uname="u", vname="v"): """Draft""" self._speed_ev = (self.grid(uname) ** 2 + self.grid(vname) ** 2) ** 0.5
[docs] def display(self, ax, name, factor=1, ref=None, **kwargs): """ :param matplotlib.axes.Axes ax: matplotlib axes used to draw :param str,array name: variable to display, could be an array :param float factor: multiply grid by :param float,None ref: if defined, all coordinates are wrapped with ref as western boundary :param dict kwargs: look at :py:meth:`matplotlib.axes.Axes.pcolormesh` .. minigallery:: py_eddy_tracker.RegularGridDataset.display """ if "cmap" not in kwargs: kwargs["cmap"] = "coolwarm" data = self.grid(name) if isinstance(name, str) else name if ref is None: x = self.x_bounds else: x = (self.x_c - ref) % 360 + ref i = x.argsort() x = self.c_to_bounds(x[i]) data = data[i] return ax.pcolormesh(x, self.y_bounds, data.T * factor, **kwargs)
[docs] def contour(self, ax, name, factor=1, ref=None, **kwargs): """ :param matplotlib.axes.Axes ax: matplotlib axes used to draw :param str,array name: variable to display, could be an array :param float factor: multiply grid by :param float,None ref: if defined, all coordinates are wrapped with ref as western boundary :param dict kwargs: look at :py:meth:`matplotlib.axes.Axes.contour` .. minigallery:: py_eddy_tracker.RegularGridDataset.contour """ data = self.grid(name) if isinstance(name, str) else name if ref is None: x = self.x_c else: x = (self.x_c - ref) % 360 + ref i = x.argsort() x = x[i] data = data[i] return ax.contour(x, self.y_c, data.T * factor, **kwargs)
[docs] def regrid(self, other, grid_name, new_name=None): """ Interpolate another grid at the current grid position :param RegularGridDataset other: :param str grid_name: variable name to interpolate :param str new_name: name used to store, if None method will use current ont .. minigallery:: py_eddy_tracker.RegularGridDataset.regrid """ if new_name is None: new_name = grid_name x, y = meshgrid(self.x_c, self.y_c) # interp and reshape v_interp = ( other.interp(grid_name, x.reshape(-1), y.reshape(-1)).reshape(x.shape).T ) v_interp = ma.array(v_interp, mask=isnan(v_interp)) # and add it to self self.add_grid(new_name, v_interp) self.variables_description[new_name] = other.variables_description[grid_name]
# self.variables_description[new_name]['infos'] = False # self.variables_description[new_name]['kwargs']['dimensions'] = ...
[docs] def interp(self, grid_name, lons, lats, method="bilinear"): """ Compute z over lons, lats :param str grid_name: Grid to be interpolated :param lons: new x :param lats: new y :param str method: Could be 'bilinear' or 'nearest' :return: new z """ g = self.grid(grid_name) m = self.get_mask(g) return interp2d_geo( self.x_c, self.y_c, g, m, lons, lats, nearest=method == "nearest" )
[docs] def uv_for_advection(self, u_name=None, v_name=None, time_step=600, h_name=None, backward=False, factor=1): """ Get U,V to be used in degrees with precomputed time step :param None,str,array u_name: U field to advect obs, if h_name is None :param None,str,array v_name: V field to advect obs, if h_name is None :param None,str,array h_name: H field to compute UV to advect obs, if u_name and v_name are None :param int time_step: Number of second for each advection """ if h_name is not None: u_name, v_name = 'u', 'v' if u_name not in self.vars: self.add_uv(h_name) self.vars.pop(h_name, None) u = (self.grid(u_name) if isinstance(u_name, str) else u_name).copy() v = (self.grid(v_name) if isinstance(v_name, str) else v_name).copy() # N seconds / 1 degrees in m coef = time_step * 180 / pi / self.EARTH_RADIUS * factor u *= coef / cos(radians(self.y_c)) v *= coef if backward: u = -u v = -v m = u.mask + v.mask return u, v, m
[docs] def advect(self, x, y, u_name, v_name, nb_step=10, rk4=True, **kw): """ At each call it will update position in place with u & v field It's a dummy advection using only one layer of current :param array x: Longitude of obs to move :param array y: Latitude of obs to move :param str,array u_name: U field to advect obs :param str,array v_name: V field to advect obs :param int nb_step: Number of iterations before releasing data .. minigallery:: py_eddy_tracker.GridDataset.advect """ u, v, m = self.uv_for_advection(u_name, v_name, **kw) m_p = isnan(x) + isnan(y) advect_ = advect_rk4 if rk4 else advect while True: advect_(self.x_c, self.y_c, u, v, m, x, y, m_p, nb_step) yield x, y
[docs] def filament( self, x, y, u_name, v_name, nb_step=10, filament_size=6, rk4=True, **kw ): """ Produce filament with concatenation of advection It's a dummy advection using only one layer of current :param array x: Longitude of obs to move :param array y: Latitude of obs to move :param str,array u_name: U field to advect obs :param str,array v_name: V field to advect obs :param int nb_step: Number of iteration before releasing data :param int filament_size: Number of point by filament :return: x,y for a line .. minigallery:: py_eddy_tracker.GridDataset.filament """ u, v, m = self.uv_for_advection(u_name, v_name, **kw) x, y = x.copy(), y.copy() nb = x.shape[0] filament_size_ = filament_size + 1 f_x = empty(nb * filament_size_, dtype="f4") f_y = empty(nb * filament_size_, dtype="f4") f_x[:] = nan f_y[:] = nan f_x[::filament_size_] = x f_y[::filament_size_] = y mp = isnan(x) + isnan(y) advect_ = advect_rk4 if rk4 else advect while True: # Shift position f_x[1:] = f_x[:-1] f_y[1:] = f_y[:-1] # Remove last position f_x[filament_size::filament_size_] = nan f_y[filament_size::filament_size_] = nan advect_(self.x_c, self.y_c, u, v, m, x, y, mp, nb_step) f_x[::filament_size_] = x f_y[::filament_size_] = y yield f_x, f_y
[docs]@njit(cache=True) def advect_rk4(x_g, y_g, u_g, v_g, m_g, x, y, m, nb_step): # Grid coordinates x_ref, y_ref = x_g[0], y_g[0] x_step, y_step = x_g[1] - x_ref, y_g[1] - y_ref is_circular = abs(x_g[-1] % 360 - (x_g[0] - x_step) % 360) < 1e-5 nb_x_ = x_g.size nb_x = nb_x_ if is_circular else 0 # cache i_cache, j_cache = -1000000, -1000000 masked = False u00, u01, u10, u11 = 0.0, 0.0, 0.0, 0.0 v00, v01, v10, v11 = 0.0, 0.0, 0.0, 0.0 # On each particle for i in prange(x.size): # If particle is not valid => continue if m[i]: continue x_, y_ = x[i], y[i] # Iterate on whole steps for _ in range(nb_step): # k1, slope at origin ii_, jj_, xd, yd = get_grid_indices( x_ref, y_ref, x_step, y_step, x_, y_, nb_x ) if ii_ != i_cache or jj_ != j_cache: i_cache, j_cache = ii_, jj_ if not is_circular and (ii_ < 0 or ii_ > nb_x_): masked = True else: masked, u00, u01, u10, u11, v00, v01, v10, v11 = get_uv_quad( ii_, jj_, u_g, v_g, m_g, nb_x ) # The 3 following could be in cache operation but this one must be tested in any case if masked: x_, y_ = nan, nan m[i] = True break u1, v1 = interp_uv(xd, yd, u00, u01, u10, u11, v00, v01, v10, v11) # k2, slope at middle with first guess position x1, y1 = x_ + u1 * 0.5, y_ + v1 * 0.5 ii_, jj_, xd, yd = get_grid_indices( x_ref, y_ref, x_step, y_step, x1, y1, nb_x ) if ii_ != i_cache or jj_ != j_cache: i_cache, j_cache = ii_, jj_ if not is_circular and (ii_ < 0 or ii_ > nb_x_): masked = True else: masked, u00, u01, u10, u11, v00, v01, v10, v11 = get_uv_quad( ii_, jj_, u_g, v_g, m_g, nb_x ) if masked: x_, y_ = nan, nan m[i] = True break u2, v2 = interp_uv(xd, yd, u00, u01, u10, u11, v00, v01, v10, v11) # k3, slope at middle with updated guess position x2, y2 = x_ + u2 * 0.5, y_ + v2 * 0.5 ii_, jj_, xd, yd = get_grid_indices( x_ref, y_ref, x_step, y_step, x2, y2, nb_x ) if ii_ != i_cache or jj_ != j_cache: i_cache, j_cache = ii_, jj_ if not is_circular and (ii_ < 0 or ii_ > nb_x_): masked = True else: masked, u00, u01, u10, u11, v00, v01, v10, v11 = get_uv_quad( ii_, jj_, u_g, v_g, m_g, nb_x ) if masked: x_, y_ = nan, nan m[i] = True break u3, v3 = interp_uv(xd, yd, u00, u01, u10, u11, v00, v01, v10, v11) # k4, slope at end with updated guess position x3, y3 = x_ + u3, y_ + v3 ii_, jj_, xd, yd = get_grid_indices( x_ref, y_ref, x_step, y_step, x3, y3, nb_x ) if ii_ != i_cache or jj_ != j_cache: i_cache, j_cache = ii_, jj_ if not is_circular and (ii_ < 0 or ii_ > nb_x_): masked = True else: masked, u00, u01, u10, u11, v00, v01, v10, v11 = get_uv_quad( ii_, jj_, u_g, v_g, m_g, nb_x ) if masked: x_, y_ = nan, nan m[i] = True break u4, v4 = interp_uv(xd, yd, u00, u01, u10, u11, v00, v01, v10, v11) # RK4 compute dx = (u1 + 2 * u2 + 2 * u3 + u4) / 6 dy = (v1 + 2 * v2 + 2 * v3 + v4) / 6 # Compute new x,y x_ += dx y_ += dy x[i] = x_ y[i] = y_
[docs]@njit(cache=True) def advect(x_g, y_g, u_g, v_g, m_g, x, y, m, nb_step): # Grid coordinates x_ref, y_ref = x_g[0], y_g[0] x_step, y_step = x_g[1] - x_ref, y_g[1] - y_ref is_circular = abs(x_g[-1] % 360 - (x_g[0] - x_step) % 360) < 1e-5 nb_x_ = x_g.size nb_x = nb_x_ if is_circular else 0 # Indexes which should be never exist i0_old, j0_old = -100000, -100000 masked = False u00, u01, u10, u11 = 0.0, 0.0, 0.0, 0.0 v00, v01, v10, v11 = 0.0, 0.0, 0.0, 0.0 # On each particule for i in prange(x.size): # If particule is not valid => continue if m[i]: continue # Iterate on whole steps for _ in range(nb_step): i0, j0, xd, yd = get_grid_indices( x_ref, y_ref, x_step, y_step, x[i], y[i], nb_x ) # corners are the same, need only a new xd and yd if i0 != i0_old or j0 != j0_old: # Need to be stored only on change i0_old, j0_old = i0, j0 if not is_circular and (i0 < 0 or i0 > nb_x_): masked = True else: masked, u00, u01, u10, u11, v00, v01, v10, v11 = get_uv_quad( i0, j0, u_g, v_g, m_g, nb_x ) if masked: x[i], y[i] = nan, nan m[i] = True break u, v = interp_uv(xd, yd, u00, u01, u10, u11, v00, v01, v10, v11) # Compute new x,y x[i] += u y[i] += v
[docs]@njit(cache=True, fastmath=True) def compute_pixel_path(x0, y0, x1, y1, x_ori, y_ori, x_step, y_step, nb_x): """Give a serie of indexes describing the path between two positions""" # index nx = x0.shape[0] i_x0 = empty(nx, dtype=numba_types.int_) i_x1 = empty(nx, dtype=numba_types.int_) i_y0 = empty(nx, dtype=numba_types.int_) i_y1 = empty(nx, dtype=numba_types.int_) # Because round_ is not accepted with array in numba for i in range(nx): i_x0[i] = round_(((x0[i] - x_ori) % 360) / x_step) i_x1[i] = round_(((x1[i] - x_ori) % 360) / x_step) i_y0[i] = round_((y0[i] - y_ori) / y_step) i_y1[i] = round_((y1[i] - y_ori) / y_step) # Delta index of x d_x = i_x1 - i_x0 d_x = (d_x + nb_x // 2) % nb_x - (nb_x // 2) i_x1 = i_x0 + d_x # Delta index of y d_y = i_y1 - i_y0 # max and abs sum do not work on array? d_max = empty(nx, dtype=numba_types.int32) nb_value = 0 for i in range(nx): d_max[i] = max(abs(d_x[i]), abs(d_y[i])) # Compute number of pixel we go trought nb_value += d_max[i] + 1 # Create an empty array to store value of pixel across the path i_g = empty(nb_value, dtype=numba_types.int32) j_g = empty(nb_value, dtype=numba_types.int32) # Index to determine the position in the global array ii = 0 # Iteration on each path for i, delta in enumerate(d_max): # If the path doesn't cross multiple pixels if delta == 0: i_g[ii : ii + delta + 1] = i_x0[i] j_g[ii : ii + delta + 1] = i_y0[i] # Vertical move elif d_x[i] == 0: sup = -1 if d_y[i] < 0 else 1 i_g[ii : ii + delta + 1] = i_x0[i] j_g[ii : ii + delta + 1] = arange(i_y0[i], i_y1[i] + sup, sup) # Horizontal move elif d_y[i] == 0: sup = -1 if d_x[i] < 0 else 1 i_g[ii : ii + delta + 1] = arange(i_x0[i], i_x1[i] + sup, sup) j_g[ii : ii + delta + 1] = i_y0[i] # In case of multiple directions else: a = (i_x1[i] - i_x0[i]) / float(i_y1[i] - i_y0[i]) if abs(d_x[i]) >= abs(d_y[i]): sup = -1 if d_x[i] < 0 else 1 value = arange(i_x0[i], i_x1[i] + sup, sup) i_g[ii : ii + delta + 1] = value j_g[ii : ii + delta + 1] = (value - i_x0[i]) / a + i_y0[i] else: sup = -1 if d_y[i] < 0 else 1 value = arange(i_y0[i], i_y1[i] + sup, sup) j_g[ii : ii + delta + 1] = value i_g[ii : ii + delta + 1] = (value - i_y0[i]) * a + i_x0[i] ii += delta + 1 i_g %= nb_x return i_g, j_g, d_max
[docs]@njit(cache=True) def has_masked_value(grid, i_x, i_y): for i, j in zip(i_x, i_y): if grid[i, j]: return True return False
[docs]@njit(cache=True) def has_value(grid, i_x, i_y, value, below=False): for i, j in zip(i_x, i_y): if below: if grid[i, j] < value: return True else: if grid[i, j] > value: return True return False
[docs]class GridCollection: def __init__(self): self.datasets = list()
[docs] @classmethod def from_netcdf_cube(cls, filename, x_name, y_name, t_name, heigth=None, **kwargs): new = cls() with Dataset(filename) as h: for i, t in enumerate(h.variables[t_name][:]): d = RegularGridDataset( filename, x_name, y_name, indexs={t_name: i}, **kwargs ) if heigth is not None: d.add_uv(heigth) new.datasets.append((t, d)) return new
[docs] @classmethod def from_netcdf_list( cls, filenames, t, x_name, y_name, indexs=None, heigth=None, **kwargs ): new = cls() for i, _t in enumerate(t): filename = filenames[i] logger.debug(f"load file {i:02d}/{len(t)} t={_t} : {filename}") d = RegularGridDataset(filename, x_name, y_name, indexs=indexs, **kwargs) if heigth is not None: d.add_uv(heigth) new.datasets.append((_t, d)) return new
@property def are_loaded(self): return ~array([d.dimensions is None for _, d in self.datasets]) def __repr__(self): nb_dataset = len(self.datasets) return f"{self.are_loaded.sum()}/{nb_dataset} datasets loaded"
[docs] def shift_files(self, t, filename, heigth=None, **rgd_kwargs): """Add next file to the list and remove the oldest""" self.datasets = self.datasets[1:] d = RegularGridDataset(filename, **rgd_kwargs) if heigth is not None: d.add_uv(heigth) self.datasets.append((t, d)) logger.debug(f"shift and adding i={len(self.datasets)} t={t} : {filename}")
[docs] def interp(self, grid_name, t, lons, lats, method="bilinear"): """ Compute z over lons, lats :param str grid_name: Grid to be interpolated :param float, t: time for interpolation :param lons: new x :param lats: new y :param str method: Could be 'bilinear' or 'nearest' :return: new z """ # FIXME: we do assumption on time step t0 = int(t) t1 = t0 + 1 h0, h1 = self[t0], self[t1] g0, g1 = h0.grid(grid_name), h1.grid(grid_name) m0, m1 = h0.get_mask(g0), h0.get_mask(g1) kw = dict(x=lons, y=lats, nearest=method == "nearest") v0 = interp2d_geo(h0.x_c, h0.y_c, g0, m0, **kw) v1 = interp2d_geo(h1.x_c, h1.y_c, g1, m1, **kw) w = (t - t0) / (t1 - t0) return v1 * w + v0 * (1 - w)
def __iter__(self): for _, d in self.datasets: yield d def __getitem__(self, item): for t, d in self.datasets: if t == item: d.populate() return d raise KeyError(item)
[docs] def filament( self, x, y, u_name, v_name, t_init, nb_step=10, time_step=600, filament_size=6, rk4=True, **kw, ): """ Produce filament with concatenation of advection :param array x: Longitude of obs to move :param array y: Latitude of obs to move :param str,array u_name: U field to advect obs :param str,array v_name: V field to advect obs :param int nb_step: Number of iteration before to release data :param int time_step: Number of second for each advection :param int filament_size: Number of point by filament :return: x,y for a line .. minigallery:: py_eddy_tracker.GridCollection.filament """ x, y = x.copy(), y.copy() nb = x.shape[0] filament_size_ = filament_size + 1 f_x = empty(nb * filament_size_, dtype="f4") f_y = empty(nb * filament_size_, dtype="f4") f_x[:] = nan f_y[:] = nan f_x[::filament_size_] = x f_y[::filament_size_] = y backward = kw.get("backward", False) if backward: generator = self.get_previous_time_step(t_init) dt = -nb_step * time_step t_step = -time_step else: generator = self.get_next_time_step(t_init) dt = nb_step * time_step t_step = time_step t0, d0 = generator.__next__() u0, v0, m0 = d0.uv_for_advection(u_name, v_name, time_step, **kw) t1, d1 = generator.__next__() u1, v1, m1 = d1.uv_for_advection(u_name, v_name, time_step, **kw) t0 = t0 * 86400 t1 = t1 * 86400 t = t_init * 86400 mp = isnan(x) + isnan(y) advect_ = advect_t_rk4 if rk4 else advect_t while True: # Shift position f_x[1:] = f_x[:-1] f_y[1:] = f_y[:-1] # Remove last position f_x[filament_size::filament_size_] = nan f_y[filament_size::filament_size_] = nan if (backward and t <= t1) or (not backward and t >= t1): t0, u0, v0, m0 = t1, u1, v1, m1 t1, d1 = generator.__next__() t1 = t1 * 86400 u1, v1, m1 = d1.uv_for_advection(u_name, v_name, time_step, **kw) w = 1 - (arange(t, t + dt, t_step) - t0) / (t1 - t0) half_w = t_step / 2.0 / (t1 - t0) advect_(d0.x_c, d0.y_c, u0, v0, m0, u1, v1, m1, x, y, mp, w, half_w=half_w) f_x[::filament_size_] = x f_y[::filament_size_] = y t += dt yield t, f_x, f_y
[docs] def reset_grids(self, N=None): if N is not None: m = self.are_loaded if m.sum() > N: for i in where(m)[0]: self.datasets[i][1].clean()
[docs] def advect( self, x, y, t_init, mask_particule=None, nb_step=10, time_step=600, rk4=True, reset_grid=None, **kw, ): """ At each call it will update position in place with u & v field :param array x: Longitude of obs to move :param array y: Latitude of obs to move :param float t_init: time to start advection :param array,None mask_particule: advect only i mask is True :param int nb_step: Number of iteration before to release data :param int time_step: Number of second for each advection :param bool rk4: Use rk4 algorithm instead of finite difference :param int reset_grid: Delete all loaded data in cube if there are more than N grid loaded :return: t,x,y position .. minigallery:: py_eddy_tracker.GridCollection.advect """ self.reset_grids(reset_grid) backward = kw.get("backward", False) if backward: generator = self.get_previous_time_step(t_init) dt = -nb_step * time_step t_step = -time_step else: generator = self.get_next_time_step(t_init) dt = nb_step * time_step t_step = time_step t0, d0 = generator.__next__() u0, v0, m0 = d0.uv_for_advection(time_step=time_step, **kw) t1, d1 = generator.__next__() u1, v1, m1 = d1.uv_for_advection(time_step=time_step, **kw) t0 = t0 * 86400 t1 = t1 * 86400 t = t_init * 86400 advect_ = advect_t_rk4 if rk4 else advect_t if mask_particule is None: mask_particule = isnan(x) + isnan(y) else: mask_particule += isnan(x) + isnan(y) while True: logger.debug(f"advect : t={t/86400}") if (backward and t <= t1) or (not backward and t >= t1): t0, u0, v0, m0 = t1, u1, v1, m1 t1, d1 = generator.__next__() t1 = t1 * 86400 u1, v1, m1 = d1.uv_for_advection(time_step=time_step, **kw) w = 1 - (arange(t, t + dt, t_step) - t0) / (t1 - t0) half_w = t_step / 2.0 / (t1 - t0) advect_( d0.x_c, d0.y_c, u0, v0, m0, u1, v1, m1, x, y, mask_particule, w, half_w=half_w, ) t += dt yield t, x, y
[docs] def get_next_time_step(self, t_init): for i, (t, dataset) in enumerate(self.datasets): if t < t_init: continue dataset.populate() logger.debug(f"i={i}, t={t}, dataset={dataset}") yield t, dataset
[docs] def get_previous_time_step(self, t_init): i = len(self.datasets) for t, dataset in reversed(self.datasets): i -= 1 if t > t_init: continue dataset.populate() logger.debug(f"i={i}, t={t}, dataset={dataset}") yield t, dataset
[docs] def path(self, x0, y0, *args, nb_time=2, **kwargs): """ At each call it will update position in place with u & v field :param array x0: Longitude of obs to move :param array y0: Latitude of obs to move :param int nb_time: Number of iteration for particle :param dict kwargs: look at :py:meth:`GridCollection.advect` :return: t,x,y .. minigallery:: py_eddy_tracker.GridCollection.path """ particles = self.advect(x0.copy(), y0.copy(), *args, **kwargs) t = empty(nb_time + 1, dtype="f8") x = empty((nb_time + 1, x0.size), dtype=x0.dtype) y = empty(x.shape, dtype=y0.dtype) t[0], x[0], y[0] = kwargs.get("t_init"), x0, y0 for i in range(nb_time): t[i + 1], x[i + 1], y[i + 1] = particles.__next__() return t, x, y
[docs]@njit(cache=True) def advect_t(x_g, y_g, u_g0, v_g0, m_g0, u_g1, v_g1, m_g1, x, y, m, weigths, half_w=0): # Grid coordinates x_ref, y_ref = x_g[0], y_g[0] x_step, y_step = x_g[1] - x_ref, y_g[1] - y_ref is_circular = abs(x_g[-1] % 360 - (x_g[0] - x_step) % 360) < 1e-5 nb_x_ = x_g.size nb_x = nb_x_ if is_circular else 0 # Indexes that should never exist i0_old, j0_old = -100000, -100000 m0, m1 = False, False u000, u001, u010, u011 = 0.0, 0.0, 0.0, 0.0 v000, v001, v010, v011 = 0.0, 0.0, 0.0, 0.0 u100, u101, u110, u111 = 0.0, 0.0, 0.0, 0.0 v100, v101, v110, v111 = 0.0, 0.0, 0.0, 0.0 # On each particle for i in prange(x.size): # If particle is not valid => continue if m[i]: continue # Iterate on whole steps for w in weigths: i0, j0, xd, yd = get_grid_indices( x_ref, y_ref, x_step, y_step, x[i], y[i], nb_x ) if i0 != i0_old or j0 != j0_old: # Need to be stored only on change i0_old, j0_old = i0, j0 if not is_circular and (i0 < 0 or i0 > nb_x_): m0, m1 = True, True else: (m0, u000, u001, u010, u011, v000, v001, v010, v011) = get_uv_quad( i0, j0, u_g0, v_g0, m_g0, nb_x ) (m1, u100, u101, u110, u111, v100, v101, v110, v111) = get_uv_quad( i0, j0, u_g1, v_g1, m_g1, nb_x ) if m0 or m1: x[i], y[i] = nan, nan m[i] = True break # Compute distance xd_i, yd_i = 1 - xd, 1 - yd # Compute new x,y dx0 = (u000 * xd_i + u010 * xd) * yd_i + (u001 * xd_i + u011 * xd) * yd dx1 = (u100 * xd_i + u110 * xd) * yd_i + (u101 * xd_i + u111 * xd) * yd dy0 = (v000 * xd_i + v010 * xd) * yd_i + (v001 * xd_i + v011 * xd) * yd dy1 = (v100 * xd_i + v110 * xd) * yd_i + (v101 * xd_i + v111 * xd) * yd x[i] += dx0 * w + dx1 * (1 - w) y[i] += dy0 * w + dy1 * (1 - w)
[docs]@njit(cache=True, fastmath=True) def get_uv_quad(i0, j0, u, v, m, nb_x=0): """ Return u/v for (i0, j0), (i1, j0), (i0, j1), (i1, j1) :param int i0: indexes of longitude :param int j0: indexes of latitude :param array[float] u: current along x axis :param array[float] v: current along y axis :param array[bool] m: flag to know if position is valid :param int nb_x: If different of 0 we check if wrapping is needed :return: if cell is valid 4 u, 4 v :rtype: bool,float,float,float,float,float,float,float,float """ i1, j1 = i0 + 1, j0 + 1 if nb_x != 0: i1 %= nb_x i_max, j_max = m.shape if i1 >= i_max or j1 >= j_max: return True, nan, nan, nan, nan, nan, nan, nan, nan if m[i0, j0] or m[i0, j1] or m[i1, j0] or m[i1, j1]: return True, nan, nan, nan, nan, nan, nan, nan, nan # Extract value for u and v u00, u01, u10, u11 = u[i0, j0], u[i0, j1], u[i1, j0], u[i1, j1] v00, v01, v10, v11 = v[i0, j0], v[i0, j1], v[i1, j0], v[i1, j1] return False, u00, u01, u10, u11, v00, v01, v10, v11
[docs]@njit(cache=True, fastmath=True) def get_grid_indices(x0, y0, x_step, y_step, x, y, nb_x=0): """ Return grid indexes and weight :param float x0: first longitude :param float y0: first latitude :param float x_step: longitude grid step :param float y_step: latitude grid step :param float x: longitude to interpolate :param float y: latitude to interpolate :param int nb_x: If different of 0 we check if wrapping is needed :return: indexes and weight :rtype: int,int,float,float """ i, j = (x - x0) / x_step, (y - y0) / y_step i0, j0 = int(floor(i)), int(floor(j)) xd, yd = i - i0, j - j0 if nb_x != 0: i0 %= nb_x return i0, j0, xd, yd
[docs]@njit(cache=True, fastmath=True) def interp_uv(xd, yd, u00, u01, u10, u11, v00, v01, v10, v11): """ Return u/v interpolated in cell :param float xd: x weight :param float yd: y weight :param float u00: u lower left :param float u01: u upper left :param float u10: u lower right :param float u11: u upper right :param float v00: v lower left :param float v01: v upper left :param float v10: v lower right :param float v11: v upper right """ xd_i, yd_i = 1 - xd, 1 - yd u = (u00 * xd_i + u10 * xd) * yd_i + (u01 * xd_i + u11 * xd) * yd v = (v00 * xd_i + v10 * xd) * yd_i + (v01 * xd_i + v11 * xd) * yd return u, v
[docs]@njit(cache=True, fastmath=True) def advect_t_rk4( x_g, y_g, u_g0, v_g0, m_g0, u_g1, v_g1, m_g1, x, y, m, weigths, half_w ): # Grid coordinates x_ref, y_ref = x_g[0], y_g[0] x_step, y_step = x_g[1] - x_ref, y_g[1] - y_ref is_circular = abs(x_g[-1] % 360 - (x_g[0] - x_step) % 360) < 1e-5 nb_x_ = x_g.size nb_x = nb_x_ if is_circular else 0 # cache i_cache, j_cache = -1000000, -1000000 m0, m1 = False, False u000, u001, u010, u011 = 0.0, 0.0, 0.0, 0.0 v000, v001, v010, v011 = 0.0, 0.0, 0.0, 0.0 u100, u101, u110, u111 = 0.0, 0.0, 0.0, 0.0 v100, v101, v110, v111 = 0.0, 0.0, 0.0, 0.0 # On each particle for i in prange(x.size): # If particle is not valid => continue if m[i]: continue x_, y_ = x[i], y[i] # Iterate on whole steps for w in weigths: # k1, slope at origin ii_, jj_, xd, yd = get_grid_indices( x_ref, y_ref, x_step, y_step, x_, y_, nb_x ) if ii_ != i_cache or jj_ != j_cache: i_cache, j_cache = ii_, jj_ if not is_circular and (ii_ < 0 or ii_ > nb_x_): m0, m1 = True, True else: (m0, u000, u001, u010, u011, v000, v001, v010, v011) = get_uv_quad( ii_, jj_, u_g0, v_g0, m_g0, nb_x ) (m1, u100, u101, u110, u111, v100, v101, v110, v111) = get_uv_quad( ii_, jj_, u_g1, v_g1, m_g1, nb_x ) # The 3 following could be in cache operation but this one must be tested in any case if m0 or m1: x_, y_ = nan, nan m[i] = True break u0_, v0_ = interp_uv(xd, yd, u000, u001, u010, u011, v000, v001, v010, v011) u1_, v1_ = interp_uv(xd, yd, u100, u101, u110, u111, v100, v101, v110, v111) u1, v1 = u0_ * w + u1_ * (1 - w), v0_ * w + v1_ * (1 - w) # k2, slope at middle with first guess position x1, y1 = x_ + u1 * 0.5, y_ + v1 * 0.5 ii_, jj_, xd, yd = get_grid_indices( x_ref, y_ref, x_step, y_step, x1, y1, nb_x ) if ii_ != i_cache or jj_ != j_cache: i_cache, j_cache = ii_, jj_ if not is_circular and (ii_ < 0 or ii_ > nb_x_): m0, m1 = True, True else: (m0, u000, u001, u010, u011, v000, v001, v010, v011) = get_uv_quad( ii_, jj_, u_g0, v_g0, m_g0, nb_x ) (m1, u100, u101, u110, u111, v100, v101, v110, v111) = get_uv_quad( ii_, jj_, u_g1, v_g1, m_g1, nb_x ) if m0 or m1: x_, y_ = nan, nan m[i] = True break u0_, v0_ = interp_uv(xd, yd, u000, u001, u010, u011, v000, v001, v010, v011) u1_, v1_ = interp_uv(xd, yd, u100, u101, u110, u111, v100, v101, v110, v111) w_ = w - half_w u2, v2 = u0_ * w_ + u1_ * (1 - w_), v0_ * w_ + v1_ * (1 - w_) # k3, slope at middle with updated guess position x2, y2 = x_ + u2 * 0.5, y_ + v2 * 0.5 ii_, jj_, xd, yd = get_grid_indices( x_ref, y_ref, x_step, y_step, x2, y2, nb_x ) if ii_ != i_cache or jj_ != j_cache: i_cache, j_cache = ii_, jj_ if not is_circular and (ii_ < 0 or ii_ > nb_x_): m0, m1 = True, True else: (m0, u000, u001, u010, u011, v000, v001, v010, v011) = get_uv_quad( ii_, jj_, u_g0, v_g0, m_g0, nb_x ) (m1, u100, u101, u110, u111, v100, v101, v110, v111) = get_uv_quad( ii_, jj_, u_g1, v_g1, m_g1, nb_x ) if m0 or m1: x_, y_ = nan, nan m[i] = True break u0_, v0_ = interp_uv(xd, yd, u000, u001, u010, u011, v000, v001, v010, v011) u1_, v1_ = interp_uv(xd, yd, u100, u101, u110, u111, v100, v101, v110, v111) u3, v3 = u0_ * w_ + u1_ * (1 - w_), v0_ * w_ + v1_ * (1 - w_) # k4, slope at end with updated guess position x3, y3 = x_ + u3, y_ + v3 ii_, jj_, xd, yd = get_grid_indices( x_ref, y_ref, x_step, y_step, x3, y3, nb_x ) if ii_ != i_cache or jj_ != j_cache: i_cache, j_cache = ii_, jj_ if not is_circular and (ii_ < 0 or ii_ > nb_x_): m0, m1 = True, True else: (m0, u000, u001, u010, u011, v000, v001, v010, v011) = get_uv_quad( ii_, jj_, u_g0, v_g0, m_g0, nb_x ) (m1, u100, u101, u110, u111, v100, v101, v110, v111) = get_uv_quad( ii_, jj_, u_g1, v_g1, m_g1, nb_x ) if m0 or m1: x_, y_ = nan, nan m[i] = True break u0_, v0_ = interp_uv(xd, yd, u000, u001, u010, u011, v000, v001, v010, v011) u1_, v1_ = interp_uv(xd, yd, u100, u101, u110, u111, v100, v101, v110, v111) w_ -= half_w u4, v4 = u0_ * w_ + u1_ * (1 - w_), v0_ * w_ + v1_ * (1 - w_) # RK4 compute dx = (u1 + 2 * u2 + 2 * u3 + u4) / 6 dy = (v1 + 2 * v2 + 2 * v3 + v4) / 6 x_ += dx y_ += dy x[i], y[i] = x_, y_
[docs]@njit( [ "Tuple((f8[:,:],b1[:,:]))(f8[:],f8[:],f8[:,:],b1[:,:],f8,b1,i1)", "Tuple((f4[:,:],b1[:,:]))(f8[:],f8[:],f4[:,:],b1[:,:],f8,b1,i1)", ], cache=True, fastmath=True, ) def compute_stencil(x, y, h, m, earth_radius, vertical=False, stencil_halfwidth=4): """ Compute stencil on RegularGrid :param array x: longitude coordinates :param array y: latitude coordinates :param array h: 2D array to derivate :param array m: mask associated to h to know where are invalid data :param float earth_radius: Earth radius in m :param bool vertical: if True stencil will be vertical (along y) :param int stencil_halfwidth: from 1 to 4 to specify maximal kernel usable stencil_halfwidth: - (1) : - (-1, 1, 0) - (0, -1, 1) - (-1, 0, 1) / 2 - (2) : (1, -8, 0, 8, 1) / 12 - (3) : (-1, 9, -45, 0, 45, -9, 1) / 60 - (4) : (3, -32, 168, -672, 0, 672, -168, 32, 3) / 840 """ if vertical: # If vertical we transpose matrix and inverse coordinates h = h.T m = m.T x, y = y, x shape = h.shape nb_x, nb_y = shape # Out array m_out = empty(shape, dtype=numba_types.bool_) grad = empty(shape, dtype=h.dtype) # Distance step in degrees d_step = x[1] - x[0] if vertical: is_circular = False else: # Test if matrix is circular is_circular = abs(x[-1] % 360 - (x[0] - d_step) % 360) < 1e-5 # Compute caracteristic distance, constant when vertical d_ = 360 / (d_step * pi * 2 * earth_radius) for j in range(nb_y): # Buffer of maximal size of stencil (9) if is_circular: h_3, h_2, h_1, h0 = h[-4, j], h[-3, j], h[-2, j], h[-1, j] m_3, m_2, m_1, m0 = m[-4, j], m[-3, j], m[-2, j], m[-1, j] else: m_3, m_2, m_1, m0 = False, False, False, False h1, h2, h3, h4 = h[0, j], h[1, j], h[2, j], h[3, j] m1, m2, m3, m4 = m[0, j], m[1, j], m[2, j], m[3, j] for i in range(nb_x): # Roll value and only last h_4, h_3, h_2, h_1, h0, h1, h2, h3 = h_3, h_2, h_1, h0, h1, h2, h3, h4 m_4, m_3, m_2, m_1, m0, m1, m2, m3 = m_3, m_2, m_1, m0, m1, m2, m3, m4 i_ = i + 4 if i_ >= nb_x: if is_circular: i_ = i_ % nb_x m4 = m[i_, j] h4 = h[i_, j] else: # When we are out m4 = False else: m4 = m[i_, j] h4 = h[i_, j] # Current value not defined if m0: m_out[i, j] = True continue if not vertical: # For each row we compute distance d_ = 360 / (d_step * cos(deg2rad(y[j])) * pi * 2 * earth_radius) if m1 ^ m_1: # unbalanced kernel if m_1: grad[i, j] = (h1 - h0) * d_ m_out[i, j] = False continue if m1: grad[i, j] = (h0 - h_1) * d_ m_out[i, j] = False continue continue if m2 or m_2 or stencil_halfwidth == 1: grad[i, j] = (h1 - h_1) / 2 * d_ m_out[i, j] = False continue if m3 or m_3 or stencil_halfwidth == 2: grad[i, j] = (h_2 - h2 + 8 * (h1 - h_1)) / 12 * d_ m_out[i, j] = False continue if m4 or m_4 or stencil_halfwidth == 3: grad[i, j] = (h3 - h_3 + 9 * (h_2 - h2) + 45 * (h1 - h_1)) / 60 * d_ m_out[i, j] = False continue # If all values of buffer are available grad[i, j] = ( (3 * (h_4 - h4) + 32 * (h3 - h_3) + 168 * (h_2 - h2) + 672 * (h1 - h_1)) / 840 * d_ ) m_out[i, j] = False if vertical: return grad.T, m_out.T else: return grad, m_out