Source code for py_eddy_tracker.observations.observation

# -*- coding: utf-8 -*-
"""
Base class to manage eddy observations
"""
from datetime import datetime
from io import BufferedReader, BytesIO
import logging
from tarfile import ExFileObject
from tokenize import TokenError

from Polygon import Polygon
from matplotlib.cm import get_cmap
from matplotlib.collections import LineCollection, PolyCollection
from matplotlib.colors import Normalize
from netCDF4 import Dataset
from numba import njit, types as numba_types
from numpy import (
    absolute,
    arange,
    array,
    array_equal,
    ceil,
    concatenate,
    cos,
    datetime64,
    digitize,
    empty,
    errstate,
    floor,
    histogram,
    histogram2d,
    in1d,
    isnan,
    linspace,
    ma,
    nan,
    ndarray,
    ones,
    percentile,
    radians,
    sin,
    unique,
    where,
    zeros,
)
import packaging.version
from pint import UnitRegistry
from pint.errors import UndefinedUnitError
import zarr

from .. import VAR_DESCR, VAR_DESCR_inv, __version__
from ..generic import (
    bbox_indice_regular,
    build_index,
    distance,
    distance_grid,
    flatten_line_matrix,
    hist_numba,
    local_to_coordinates,
    reverse_index,
    window_index,
    wrap_longitude,
)
from ..poly import (
    bbox_intersection,
    close_center,
    convexs,
    create_meshed_particles,
    create_vertice,
    get_pixel_in_regular,
    insidepoly,
    poly_indexs,
    reduce_size,
    vertice_overlap,
)

logger = logging.getLogger("pet")

# keep only major and minor version number
_software_version_reduced = packaging.version.Version(
    "{v.major}.{v.minor}".format(v=packaging.version.parse(__version__))
)
_display_check_warning = True


def _check_versions(version):
    """Check if version of py_eddy_tracker used to create the file is compatible with software version

    if not, warn user with both versions

    :param version: string version of software used to create the file. If None, version was not provided
    :type version: str, None
    """
    if not _display_check_warning:
        return
    file_version = packaging.version.parse(version) if version is not None else None
    if file_version is None or file_version < _software_version_reduced:
        logger.warning(
            "File was created with py-eddy-tracker version '%s' but software version is '%s'",
            file_version,
            _software_version_reduced,
        )


@njit(cache=True, fastmath=True)
def shifted_ellipsoid_degrees_mask2(lon0, lat0, lon1, lat1, minor=1.5, major=1.5):
    """
    Only valid when `major` is an array, but ~6x faster.
    """
    # c = (major ** 2 - minor ** 2) ** .5 + major
    c = major
    major = minor + 0.5 * (major - minor)
    # r=.5*(c-c0)
    # a=c0+r
    # Focal
    f_right = lon0
    f_left = f_right - (c - minor)
    # Ellipse center
    x_c = (f_left + f_right) * 0.5
    nb_0, nb_1 = lat0.shape[0], lat1.shape[0]
    m = empty((nb_0, nb_1), dtype=numba_types.bool_)
    for j in range(nb_0):
        for i in range(nb_1):
            dy = absolute(lat1[i] - lat0[j])
            if dy > minor:
                m[j, i] = False
                continue
            dx = absolute(lon1[i] - x_c[j])
            if dx > 180:
                dx = absolute((dx + 180) % 360 - 180)
            if dx > major[j]:
                m[j, i] = False
                continue
            d_normalize = dx**2 / major[j] ** 2 + dy**2 / minor**2
            m[j, i] = d_normalize < 1.0
    return m

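# Hypothetical usage sketch (not part of the original module): per the
# docstring, `major` must be an array with one value per point of the first
# set, while `minor` may stay scalar.
#
#     lon0, lat0 = array([10.0, 20.0]), array([0.0, 5.0])
#     lon1, lat1 = array([10.5, 50.0, 21.0]), array([0.2, 5.0, 5.5])
#     m = shifted_ellipsoid_degrees_mask2(
#         lon0, lat0, lon1, lat1, minor=1.5, major=ones(2) * 2.5
#     )  # boolean matrix of shape (2, 3)
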
class Table(object):
    def __init__(self, values):
        self.values = values

    def _repr_html_(self):
        rows = list()
        if isinstance(self.values, ndarray):
            row = "\n".join([f"<td>{v}</td>" for v in self.values.dtype.names])
            rows.append(f"<tr>{row}</tr>")
        for row in self.values:
            row = "\n".join([f"<td>{v}</td>" for v in row])
            rows.append(f"<tr>{row}</tr>")
        rows = "\n".join(rows)
        return (
            f'<font size="2">'
            f'<table class="docutils align-default">'
            f"{rows}"
            f"</table>"
            f"</font>"
        )

class EddiesObservations(object):
    """
    Class to store eddy observations.
    """

    __slots__ = (
        "track_extra_variables",
        "track_array_variables",
        "array_variables",
        "only_variables",
        "observations",
        "sign_type",
        "raw_data",
        "period_",
    )

    ELEMENTS = [
        "lon",
        "lat",
        "radius_s",
        "radius_e",
        "amplitude",
        "speed_average",
        "time",
        "shape_error_e",
        "shape_error_s",
        "speed_area",
        "effective_area",
        "nb_contour_selected",
        "num_point_e",
        "num_point_s",
        "height_max_speed_contour",
        "height_external_contour",
        "height_inner_contour",
    ]

    COLORS = [
        "sienna",
        "red",
        "darkorange",
        "gold",
        "palegreen",
        "limegreen",
        "forestgreen",
        "mediumblue",
        "dodgerblue",
        "lightskyblue",
        "violet",
        "blueviolet",
        "darkmagenta",
        "darkgrey",
        "dimgrey",
        "steelblue",
    ]

    NB_COLORS = len(COLORS)

    def __init__(
        self,
        size=0,
        track_extra_variables=None,
        track_array_variables=0,
        array_variables=None,
        only_variables=None,
        raw_data=False,
    ):
        self.period_ = None
        self.only_variables = only_variables
        self.raw_data = raw_data
        self.track_extra_variables = (
            track_extra_variables if track_extra_variables is not None else []
        )
        self.track_array_variables = track_array_variables
        self.array_variables = array_variables if array_variables is not None else []
        for elt in self.elements:
            if elt not in VAR_DESCR:
                raise Exception("Unknown element : %s" % elt)
        self.observations = zeros(size, dtype=self.dtype)
        self.sign_type = None

    @property
    def tracks(self):
        return self.track

    def __eq__(self, other):
        if self.sign_type != other.sign_type:
            return False
        if self.dtype != other.dtype:
            return False
        return array_equal(self.obs, other.obs)

    def get_color(self, i):
        """Return colors as a cyclic list"""
        return self.COLORS[i % self.NB_COLORS]

    @property
    def sign_legend(self):
        return "Cyclonic" if self.sign_type != 1 else "Anticyclonic"

    @property
    def shape(self):
        return self.observations.shape

    def get_infos(self):
        infos = dict(
            bins_lat=(-90, -60, -15, 15, 60, 90),
            bins_amplitude=array((0, 1, 2, 3, 4, 5, 10, 500)),
            bins_radius=array((0, 15, 30, 45, 60, 75, 100, 200, 2000)),
            nb_obs=len(self),
        )
        t0, t1 = self.period
        infos["t0"], infos["t1"] = t0, t1
        infos["period"] = t1 - t0 + 1
        return infos

    def _repr_html_(self):
        infos = self.get_infos()
        return f"""<b>{infos['nb_obs']} observations from {infos['t0']} to {infos['t1']}</b>"""

    def parse_varname(self, name):
        return self[name] if isinstance(name, str) else name

    def hist(self, varname, x, bins, percent=False, mean=False, nb=False):
        """Build histograms.

        :param str,array varname: variable to use to compute stat
        :param str,array x: variable to use to know in which bins
        :param array bins: bounds of each bin
        :param bool percent: normalized by sum of all bins
        :param bool mean: compute mean by bins
        :param bool nb: only count by bins
        :return: value by bins
        :rtype: array
        """
        x = self.parse_varname(x)
        if nb:
            v = hist_numba(x, bins=bins)[0]
        else:
            v = histogram(x, bins=bins, weights=self.parse_varname(varname))[0]
        if percent:
            v = v.astype("f4") / v.sum() * 100
        elif mean:
            v /= hist_numba(x, bins=bins)[0]
        return v

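    # Hypothetical usage sketch (not part of the original module): mean speed
    # radius (km) per 10-degree latitude band, assuming `eddies` is a loaded
    # EddiesObservations instance.
    #
    #     bins_lat = arange(-90, 91, 10)
    #     mean_radius_km = eddies.hist("radius_s", "lat", bins_lat, mean=True) / 1000.0
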
    @staticmethod
    def box_display(value):
        """Return values evenly spaced with few numbers"""
        return "".join([f"{v_:10.2f}" for v_ in value])

    @property
    def fields(self):
        return list(self.obs.dtype.names)

    def field_table(self):
        """
        Produce a description table of the fields available in this object
        """
        rows = [("Name (Unit)", "Long name", "Scale factor", "Offset")]
        names = self.fields
        names.sort()
        for field in names:
            infos = VAR_DESCR[field]
            rows.append(
                (
                    f"{infos.get('nc_name', field)} ({infos['nc_attr'].get('units', '')})",
                    infos["nc_attr"].get("long_name", "").capitalize(),
                    infos.get("scale_factor", ""),
                    infos.get("add_offset", ""),
                )
            )
        return Table(rows)

    def __repr__(self):
        """
        Return general information on the dataset as a string.

        :return: information on the dataset
        :rtype: str
        """
        t0, t1 = self.period
        period = t1 - t0 + 1
        bins_lat = (-90, -60, -15, 15, 60, 90)
        bins_amplitude = array((0, 1, 2, 3, 4, 5, 10, 500))
        bins_radius = array((0, 15, 30, 45, 60, 75, 100, 200, 2000))
        nb_obs = len(self)
        return f"""    | {nb_obs} observations from {t0} to {t1} ({period} days, ~{nb_obs / period:.0f} obs/day)
    | Speed area : {self.speed_area.sum() / period / 1e12:.2f} Mkm²/day
    | Effective area : {self.effective_area.sum() / period / 1e12:.2f} Mkm²/day
    ----Distribution in Amplitude:
    | Amplitude bounds (cm) {self.box_display(bins_amplitude)}
    | Percent of eddies     : {
        self.box_display(self.hist('time', 'amplitude', bins_amplitude / 100., percent=True, nb=True))}
    ----Distribution in Radius:
    | Speed radius (km)     {self.box_display(bins_radius)}
    | Percent of eddies     : {
        self.box_display(self.hist('time', 'radius_s', bins_radius * 1000., percent=True, nb=True))}
    | Effective radius (km) {self.box_display(bins_radius)}
    | Percent of eddies     : {
        self.box_display(self.hist('time', 'radius_e', bins_radius * 1000., percent=True, nb=True))}
    ----Distribution in Latitude
        Latitude bounds            {self.box_display(bins_lat)}
        Percent of eddies          : {self.box_display(self.hist('time', 'lat', bins_lat, percent=True, nb=True))}
        Percent of speed area      : {self.box_display(self.hist('speed_area', 'lat', bins_lat, percent=True))}
        Percent of effective area  : {self.box_display(self.hist('effective_area', 'lat', bins_lat, percent=True))}
        Mean speed radius (km)     : {self.box_display(self.hist('radius_s', 'lat', bins_lat, mean=True) / 1000.)}
        Mean effective radius (km) : {self.box_display(self.hist('radius_e', 'lat', bins_lat, mean=True) / 1000.)}
        Mean amplitude (cm)        : {self.box_display(self.hist('amplitude', 'lat', bins_lat, mean=True) * 100.)}"""

    def __dir__(self):
        """Provide method name lookup and completion."""
        base = set(dir(type(self)))
        intern_name = set(self.elements)
        extern_name = set([VAR_DESCR[k]["nc_name"] for k in intern_name])
        # Must be checked in init, not here
        if base & intern_name:
            logger.warning(
                "Some variable names collide with class attrs: %s",
                base & intern_name,
            )
        if base & extern_name:
            logger.warning(
                "Some variable names collide with class attrs: %s",
                base & extern_name,
            )
        return sorted(base.union(intern_name).union(extern_name))

    def __getitem__(self, attr: str):
        if attr in self.elements:
            return self.obs[attr]
        elif attr in VAR_DESCR_inv:
            return self.obs[VAR_DESCR_inv[attr]]
        elif attr in ("lifetime", "age"):
            return getattr(self, attr)
        raise KeyError("%s unknown" % attr)

    def __getattr__(self, attr):
        if attr in self.elements:
            return self.obs[attr]
        elif attr in VAR_DESCR_inv:
            return self.obs[VAR_DESCR_inv[attr]]
        raise AttributeError(
            "{!r} object has no attribute {!r}".format(type(self).__name__, attr)
        )

    @classmethod
    def needed_variable(cls):
        return None

    @classmethod
    def obs_dimension(cls, handler):
        for candidate in ("obs", "Nobs", "observation", "i"):
            if candidate in handler.dimensions.keys():
                return candidate

    def remove_fields(self, *fields):
        """
        Return a copy with the listed fields removed.
        """
        nb_obs = len(self)
        fields = set(fields)
        only_variables = set(self.fields) - fields
        track_extra_variables = set(self.track_extra_variables) - fields
        array_variables = set(self.array_variables) - fields
        new = self.__class__(
            size=nb_obs,
            track_extra_variables=track_extra_variables,
            track_array_variables=self.track_array_variables,
            array_variables=array_variables,
            only_variables=only_variables,
            raw_data=self.raw_data,
        )
        new.sign_type = self.sign_type
        for name in new.fields:
            logger.debug("Copy of field %s ...", name)
            new.obs[name] = self.obs[name]
        return new

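    # Hypothetical usage sketch (not part of the original module): drop heavy
    # contour fields before further processing.
    #
    #     light = eddies.remove_fields("contour_lon_e", "contour_lat_e")
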
    def add_fields(self, fields=list(), array_fields=list()):
        """
        Add new fields.
        """
        nb_obs = len(self)
        new = self.__class__(
            size=nb_obs,
            track_extra_variables=list(
                concatenate((self.track_extra_variables, fields))
            ),
            track_array_variables=self.track_array_variables,
            array_variables=list(concatenate((self.array_variables, array_fields))),
            only_variables=list(concatenate((self.fields, fields, array_fields))),
            raw_data=self.raw_data,
        )
        new.sign_type = self.sign_type
        for name in self.fields:
            logger.debug("Copy of field %s ...", name)
            new.obs[name] = self.obs[name]
        return new

    def add_rotation_type(self):
        new = self.add_fields(("type_cyc",))
        new.type_cyc[:] = self.sign_type
        return new

    def circle_contour(self, only_virtual=False, factor=1):
        """
        Set contours as circles from radius and center data.

        .. minigallery:: py_eddy_tracker.EddiesObservations.circle_contour
        """
        angle = radians(linspace(0, 360, self.track_array_variables))
        x_norm, y_norm = cos(angle), sin(angle)
        radius_s = "contour_lon_s" in self.fields
        radius_e = "contour_lon_e" in self.fields
        for i, obs in enumerate(self):
            if only_virtual and not obs["virtual"]:
                continue
            x, y = obs["lon"], obs["lat"]
            if radius_s:
                r_s = obs["radius_s"] * factor
                obs["contour_lon_s"], obs["contour_lat_s"] = local_to_coordinates(
                    x_norm * r_s, y_norm * r_s, x, y
                )
            if radius_e:
                r_e = obs["radius_e"] * factor
                obs["contour_lon_e"], obs["contour_lat_e"] = local_to_coordinates(
                    x_norm * r_e, y_norm * r_e, x, y
                )

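    # Hypothetical usage sketch (not part of the original module): replace the
    # stored speed/effective contours with circles 20 % larger than each radius.
    #
    #     eddies.circle_contour(factor=1.2)
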
    @property
    def dtype(self):
        """Return dtype to build numpy array."""
        dtype = list()
        for elt in self.elements:
            data_type = (
                VAR_DESCR[elt].get("compute_type", VAR_DESCR[elt].get("nc_type"))
                if not self.raw_data
                else VAR_DESCR[elt]["output_type"]
            )
            if elt in self.array_variables:
                dtype.append((elt, data_type, (self.track_array_variables,)))
            else:
                dtype.append((elt, data_type))
        return dtype

    @property
    def elements(self):
        """Return all the names of the variables."""
        elements = [i for i in self.ELEMENTS]
        if self.track_array_variables > 0:
            elements += self.array_variables
        if len(self.track_extra_variables):
            elements += self.track_extra_variables
        if self.only_variables is not None:
            elements = [i for i in elements if i in self.only_variables]
        return list(set(elements))

    def coherence(self, other):
        """Check coherence between two datasets."""
        test = self.track_extra_variables == other.track_extra_variables
        test *= self.track_array_variables == other.track_array_variables
        test *= self.array_variables == other.array_variables
        return test

    @classmethod
    def concatenate(cls, observations):
        nb_obs = 0
        ref_obs = observations[0]
        for obs in observations:
            if not ref_obs.coherence(obs):
                raise Exception("Merge of different type of observations")
            nb_obs += len(obs)
        eddies = cls.new_like(ref_obs, nb_obs)
        i = 0
        for obs in observations:
            nb_obs = len(obs)
            eddies.obs[i : i + nb_obs] = obs.obs
            i += nb_obs
        eddies.sign_type = ref_obs.sign_type
        return eddies

    def merge(self, other):
        """Merge two datasets."""
        nb_obs_self = len(self)
        nb_obs = nb_obs_self + len(other)
        eddies = self.new_like(self, nb_obs)
        other_keys = other.fields
        self_keys = self.fields
        for key in eddies.fields:
            eddies.obs[key][:nb_obs_self] = self.obs[key][:]
            if key in other_keys:
                eddies.obs[key][nb_obs_self:] = other.obs[key][:]
        if "track" in other_keys and "track" in self_keys:
            last_track = eddies.track[nb_obs_self - 1] + 1
            eddies.track[nb_obs_self:] += last_track
        eddies.sign_type = self.sign_type
        return eddies

    def reset(self):
        self.observations = zeros(0, dtype=self.dtype)

    @property
    def obs(self):
        """Return observations."""
        return self.observations

    def __len__(self):
        return len(self.observations)

    def __iter__(self):
        for obs in self.obs:
            yield obs

    def iter_on(self, xname, window=None, bins=None):
        """
        Yield observation group for each bin.

        :param str,array xname: variable to use for binning
        :param float,None window: if defined, use a moving window; the value is the half-window width
        :param array bins: bounds of each bin
        :yield array,float,float: index in self, lower bound, upper bound

        .. minigallery:: py_eddy_tracker.EddiesObservations.iter_on
        """
        x = self.parse_varname(xname)
        if window is not None:
            x0 = arange(x.min(), x.max()) if bins is None else array(bins)
            i_ordered, first_index, last_index = window_index(x, x0, window)
            for x_, i0, i1 in zip(x0, first_index, last_index):
                yield i_ordered[i0:i1], x_ - window, x_ + window
        else:
            d = x[1:] - x[:-1]
            if bins is None:
                bins = arange(x.min(), x.max() + 2)
            elif not isinstance(bins, ndarray):
                bins = array(bins)
            nb_bins = len(bins) - 1
            # Not monotonous
            if (d < 0).any():
                # If bins cover only a small part of the values
                test, translate, x = iter_mode_reduce(x, bins)
                # convert values to bin numbers
                i = numba_digitize(x, bins) - 1
                # Order by bins
                i_sort = i.argsort()
                # If in reduced mode, translate i_sort into full array indices
                i_sort_ = translate[i_sort] if test else i_sort
                # Bounds for each bin in the sorted view
                i0, i1, _ = build_index(i[i_sort])
                m = ~(i0 == i1)
                i0, i1 = i0[m], i1[m]
                for i0_, i1_ in zip(i0, i1):
                    i_bins = i[i_sort[i0_]]
                    if i_bins == -1 or i_bins == nb_bins:
                        continue
                    yield i_sort_[i0_:i1_], bins[i_bins], bins[i_bins + 1]
            else:
                i = numba_digitize(x, bins) - 1
                i0, i1, _ = build_index(i)
                m = ~(i0 == i1)
                i0, i1 = i0[m], i1[m]
                for i0_, i1_ in zip(i0, i1):
                    i_bins = i[i0_]
                    yield slice(i0_, i1_), bins[i_bins], bins[i_bins + 1]

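    # Hypothetical usage sketch (not part of the original module): count
    # observations per 10-degree latitude band; `i` may be a slice or an index
    # array, both usable to subscript observation variables.
    #
    #     for i, b0, b1 in eddies.iter_on("lat", bins=arange(-90, 91, 10)):
    #         print(f"[{b0}, {b1}): {eddies.lat[i].shape[0]} obs")
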
    def align_on(self, other, var_name="time", all_ref=False, **kwargs):
        """
        Align the variable indices of two datasets.

        :param other: other dataset to compare with self
        :param str,tuple var_name: variable name to align on, or two arrays, defaults to "time"
        :param bool all_ref: yield all values of ref, if False only common values, defaults to False
        :yield array,array,float,float: index in self, index in other, lower bound, upper bound

        .. minigallery:: py_eddy_tracker.EddiesObservations.align_on
        """
        if isinstance(var_name, str):
            iter_self = self.iter_on(var_name, **kwargs)
            iter_other = other.iter_on(var_name, **kwargs)
        else:
            iter_self = self.iter_on(var_name[0], **kwargs)
            iter_other = other.iter_on(var_name[1], **kwargs)
        indexs_other, b0_other, b1_other = iter_other.__next__()
        for indexs_self, b0_self, b1_self in iter_self:
            if b0_self > b0_other:
                try:
                    while b0_other < b0_self:
                        indexs_other, b0_other, b1_other = iter_other.__next__()
                except StopIteration:
                    break
            if b0_self < b0_other:
                if all_ref:
                    yield indexs_self, empty(
                        0, dtype=indexs_self.dtype
                    ), b0_self, b1_self
                continue
            yield indexs_self, indexs_other, b0_self, b1_self

    def insert_observations(self, other, index):
        """Insert other obs in self at the given index."""
        if not self.coherence(other):
            raise Exception("Observations with no coherence")
        insert_size = len(other)
        self_size = len(self)
        new_size = self_size + insert_size
        if self_size == 0:
            self.observations = other.obs
            return self
        elif insert_size == 0:
            return self
        if index < 0:
            index = self_size + index + 1
        eddies = self.new_like(self, new_size)
        eddies.obs[:index] = self.obs[:index]
        eddies.obs[index : index + insert_size] = other.obs
        eddies.obs[index + insert_size :] = self.obs[index:]
        self.observations = eddies.obs
        return self

    def append(self, other):
        """Merge."""
        return self + other

    def __add__(self, other):
        return self.insert_observations(other, -1)

    def distance(self, other):
        """Use haversine distance for distance matrix between every self and other eddies."""
        return distance_grid(self.lon, self.lat, other.lon, other.lat)

    def __copy__(self):
        eddies = self.new_like(self, len(self))
        for k in self.fields:
            eddies[k][:] = self[k][:]
        eddies.sign_type = self.sign_type
        return eddies

    def copy(self):
        return self.__copy__()

    @classmethod
    def new_like(cls, eddies, new_size: int):
        return cls(
            new_size,
            track_extra_variables=eddies.track_extra_variables,
            track_array_variables=eddies.track_array_variables,
            array_variables=eddies.array_variables,
            only_variables=eddies.only_variables,
            raw_data=eddies.raw_data,
        )

    def index(self, index, reverse=False):
        """Return obs from self at the given index."""
        if reverse:
            index = reverse_index(index, len(self))
        size = 1
        if hasattr(index, "__iter__"):
            size = len(index)
        elif isinstance(index, slice):
            size = index.stop - index.start
        eddies = self.new_like(self, size)
        eddies.obs[:] = self.obs[index]
        eddies.sign_type = self.sign_type
        return eddies

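    # Hypothetical usage sketch (not part of the original module): `index`
    # accepts an iterable of positions, a slice, or (with reverse=True) the
    # complement of those positions.
    #
    #     first_ten = eddies.index(slice(0, 10))
    #     all_but_first_ten = eddies.index(arange(10), reverse=True)
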
    @staticmethod
    def zarr_dimension(filename):
        if isinstance(filename, zarr.storage.MutableMapping):
            h = filename
        else:
            h = zarr.open(filename)
        dims = list()
        for varname in h:
            shape = getattr(h, varname).shape
            if len(shape) > len(dims):
                dims = shape
        return dims

    @classmethod
    def load_file(cls, filename, **kwargs):
        """
        Load the netcdf or the zarr file.

        Load only latitude and longitude on the first 300 obs :

        .. code-block:: python

            kwargs_latlon_300 = dict(
                include_vars=[
                    "longitude",
                    "latitude",
                ],
                indexs=dict(obs=slice(0, 300)),
            )
            small_dataset = TrackEddiesObservations.load_file(
                filename, **kwargs_latlon_300
            )

        For `**kwargs` look at :py:meth:`load_from_zarr` or :py:meth:`load_from_netcdf`
        """
        filename_ = (
            filename.filename if isinstance(filename, ExFileObject) else filename
        )
        if isinstance(filename, zarr.storage.MutableMapping):
            return cls.load_from_zarr(filename, **kwargs)
        if isinstance(filename, (bytes, str)):
            end = b".zarr" if isinstance(filename_, bytes) else ".zarr"
            zarr_file = filename_.endswith(end)
        else:
            zarr_file = False
        logger.info(f"loading file '{filename_}'")
        if zarr_file:
            return cls.load_from_zarr(filename, **kwargs)
        else:
            return cls.load_from_netcdf(filename, **kwargs)

    @classmethod
    def load_from_zarr(
        cls,
        filename,
        raw_data=False,
        remove_vars=None,
        include_vars=None,
        indexs=None,
        buffer_size=5000000,
        **class_kwargs,
    ):
        """Load data from zarr.

        :param str,store filename: path or store to load data
        :param bool raw_data: If true, load data without scale_factor and add_offset
        :param None,list(str) remove_vars: list of variable names that will not be loaded
        :param None,list(str) include_vars: if defined, only these variables will be loaded
        :param None,dict indexs: indexes to load only a slice of data
        :param int buffer_size: size of the buffer used to load zarr data
        :param class_kwargs: argument to set up observations class
        :return: Observations selected
        :return type: class
        """
        # FIXME
        if isinstance(filename, zarr.storage.MutableMapping):
            h_zarr = filename
        else:
            if not isinstance(filename, str):
                filename = filename.astype(str)
            h_zarr = zarr.open(filename)
        _check_versions(h_zarr.attrs.get("framework_version", None))
        var_list = cls.build_var_list(list(h_zarr.keys()), remove_vars, include_vars)
        nb_obs = getattr(h_zarr, var_list[0]).shape[0]
        track_array_variables = h_zarr.attrs["track_array_variables"]
        if indexs is not None and "obs" in indexs:
            sl = indexs["obs"]
            sl = slice(sl.start, min(sl.stop, nb_obs))
            if sl.stop is not None:
                nb_obs = sl.stop
            if sl.start is not None:
                nb_obs -= sl.start
            if sl.step is not None:
                indexs["obs"] = slice(sl.start, sl.stop)
                logger.warning("step of slice won't be used")
        logger.debug("%d observations will be loaded", nb_obs)
        kwargs = dict()
        kwargs["track_array_variables"] = h_zarr.attrs.get(
            "track_array_variables", track_array_variables
        )
        array_variables = list()
        for variable in var_list:
            if len(h_zarr[variable].shape) > 1:
                var_inv = VAR_DESCR_inv[variable]
                array_variables.append(var_inv)
        kwargs["array_variables"] = array_variables
        track_extra_variables = []
        for variable in var_list:
            var_inv = VAR_DESCR_inv[variable]
            if var_inv not in cls.ELEMENTS and var_inv not in array_variables:
                track_extra_variables.append(var_inv)
        kwargs["track_extra_variables"] = track_extra_variables
        kwargs["raw_data"] = raw_data
        kwargs["only_variables"] = (
            None if include_vars is None else [VAR_DESCR_inv[i] for i in include_vars]
        )
        kwargs.update(class_kwargs)
        eddies = cls(size=nb_obs, **kwargs)
        for i_var, variable in enumerate(var_list):
            var_inv = VAR_DESCR_inv[variable]
            logger.debug("%s will be loaded (%d/%d)", variable, i_var, len(var_list))
            # find unit factor
            input_unit = h_zarr[variable].attrs.get("unit", None)
            if input_unit is None:
                input_unit = h_zarr[variable].attrs.get("units", None)
            output_unit = VAR_DESCR[var_inv]["nc_attr"].get("units", None)
            factor = cls.compare_units(input_unit, output_unit, variable)
            sl_obs = slice(None) if indexs is None else indexs.get("obs", slice(None))
            scale_factor = VAR_DESCR[var_inv].get("scale_factor", None)
            add_offset = VAR_DESCR[var_inv].get("add_offset", None)
            cls.copy_data_to_zarr(
                h_zarr[variable],
                eddies.obs[var_inv],
                sl_obs,
                buffer_size,
                factor,
                raw_data,
                scale_factor,
                add_offset,
            )
        eddies.sign_type = int(h_zarr.attrs.get("rotation_type", 0))
        if eddies.sign_type == 0:
            logger.debug("File comes from another identification algorithm")
            eddies.sign_type = -1
        return eddies

    @staticmethod
    def copy_data_to_zarr(
        handler_zarr,
        handler_eddies,
        sl_obs,
        buffer_size,
        factor,
        raw_data,
        scale_factor,
        add_offset,
    ):
        """
        Copy with buffer for zarr.

        Zarr needs to load real values, and the size can be huge, so we use a
        buffer to manage memory.

        :param zarr_dataset handler_zarr:
        :param array handler_eddies:
        :param slice sl_obs:
        :param int buffer_size:
        :param float factor:
        :param bool raw_data:
        :param None,float scale_factor:
        :param None,float add_offset:
        """
        i_start, i_stop = sl_obs.start, sl_obs.stop
        if i_start is None:
            i_start = 0
        if i_stop is None:
            i_stop = handler_zarr.shape[0]
        for i in range(i_start, i_stop, buffer_size):
            sl_in = slice(i, min(i + buffer_size, i_stop))
            data = handler_zarr[sl_in]
            if factor != 1:
                data *= factor
            if raw_data:
                if add_offset is not None:
                    data -= add_offset
                if scale_factor is not None:
                    data /= scale_factor
            sl_out = slice(i - i_start, i - i_start + buffer_size)
            handler_eddies[sl_out] = data

    @classmethod
    def load_from_netcdf(
        cls,
        filename,
        raw_data=False,
        remove_vars=None,
        include_vars=None,
        indexs=None,
        **class_kwargs,
    ):
        """Load data from netcdf.

        :param str,ExFileObject filename: path or handler to load data
        :param bool raw_data: If true, load data without applying scale_factor and add_offset
        :param None,list(str) remove_vars: list of variable names that will not be loaded
        :param None,list(str) include_vars: if defined, only these variables will be loaded
        :param None,dict indexs: indexes to load only a slice of data
        :param class_kwargs: argument to set up observations class
        :return: Observations selected
        :return type: class
        """
        array_dim = "NbSample"
        if isinstance(filename, bytes):
            filename = filename.astype(str)
        if isinstance(filename, (ExFileObject, BufferedReader, BytesIO)):
            filename.seek(0)
            args, kwargs = ("in-mem-file",), dict(memory=filename.read())
        else:
            args, kwargs = (filename,), dict()
        with Dataset(*args, **kwargs) as h_nc:
            _check_versions(getattr(h_nc, "framework_version", None))
            var_list = cls.build_var_list(
                list(h_nc.variables.keys()), remove_vars, include_vars
            )
            obs_dim = cls.obs_dimension(h_nc)
            nb_obs = len(h_nc.dimensions[obs_dim])
            if indexs is not None and obs_dim in indexs:
                sl = indexs[obs_dim]
                sl = slice(sl.start, min(sl.stop, nb_obs))
                if sl.stop is not None:
                    nb_obs = sl.stop
                if sl.start is not None:
                    nb_obs -= sl.start
                if sl.step is not None:
                    indexs[obs_dim] = slice(sl.start, sl.stop)
                    logger.warning("step of slice won't be used")
            logger.debug("%d observations will be loaded", nb_obs)
            kwargs = dict()
            if array_dim in h_nc.dimensions:
                kwargs["track_array_variables"] = len(h_nc.dimensions[array_dim])
                kwargs["array_variables"] = list()
                for variable in var_list:
                    if array_dim in h_nc.variables[variable].dimensions:
                        var_inv = VAR_DESCR_inv[variable]
                        kwargs["array_variables"].append(var_inv)
            array_variables = kwargs.get("array_variables", list())
            kwargs["track_extra_variables"] = []
            for variable in var_list:
                var_inv = VAR_DESCR_inv[variable]
                if var_inv not in cls.ELEMENTS and var_inv not in array_variables:
                    kwargs["track_extra_variables"].append(var_inv)
            kwargs["raw_data"] = raw_data
            kwargs["only_variables"] = (
                None
                if include_vars is None
                else [VAR_DESCR_inv[i] for i in include_vars]
            )
            kwargs.update(class_kwargs)
            eddies = cls(size=nb_obs, **kwargs)
            for variable in var_list:
                var_inv = VAR_DESCR_inv[variable]
                # Patch
                h_nc.variables[variable].set_auto_maskandscale(not raw_data)
                logger.debug(
                    "Load %s variable%s",
                    variable,
                    ", with raw mode" if raw_data else "",
                )
                # find unit factor
                factor = 1
                if not raw_data:
                    input_unit = getattr(h_nc.variables[variable], "unit", None)
                    if input_unit is None:
                        input_unit = getattr(h_nc.variables[variable], "units", None)
                    output_unit = VAR_DESCR[var_inv]["nc_attr"].get("units", None)
                    factor = cls.compare_units(input_unit, output_unit, variable)
                if indexs is None:
                    indexs = dict()
                var_sl = [
                    indexs.get(dim, slice(None))
                    for dim in h_nc.variables[variable].dimensions
                ]
                if factor != 1:
                    eddies.obs[var_inv] = h_nc.variables[variable][var_sl] * factor
                else:
                    eddies.obs[var_inv] = h_nc.variables[variable][var_sl]
            for variable in var_list:
                var_inv = VAR_DESCR_inv[variable]
                if var_inv == "type_cyc":
                    eddies.sign_type = h_nc.variables[variable][0]
            if eddies.sign_type is None:
                title = getattr(h_nc, "title", None)
                if title is None:
                    eddies.sign_type = getattr(h_nc, "rotation_type", 0)
                else:
                    eddies.sign_type = -1 if title == "Cyclonic" else 1
            if eddies.sign_type == 0:
                logger.debug("File comes from another identification algorithm")
                eddies.sign_type = -1
        return eddies

    @staticmethod
    def build_var_list(var_list, remove_vars, include_vars):
        if include_vars is not None:
            var_list = [i for i in var_list if i in include_vars]
        elif remove_vars is not None:
            var_list = [i for i in var_list if i not in remove_vars]
        return var_list

    @staticmethod
    def compare_units(input_unit, output_unit, name):
        if output_unit is None or input_unit is None or output_unit == input_unit:
            return 1
        units = UnitRegistry()
        try:
            input_unit = units.parse_expression(input_unit, case_sensitive=False)
            output_unit = units.parse_expression(output_unit, case_sensitive=False)
        except UndefinedUnitError:
            input_unit = None
        except TokenError:
            input_unit = None
        if input_unit is not None:
            factor = input_unit.to(output_unit).to_tuple()[0]
            # If we are able to find a conversion
            if factor != 1:
                logger.info(
                    "%s will be multiplied by %f to take care of units (%s -> %s)",
                    name,
                    factor,
                    input_unit,
                    output_unit,
                )
                return factor
        return 1

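    # Hypothetical usage sketch (not part of the original module): pint resolves
    # the conversion factor between the file units and the expected units.
    #
    #     EddiesObservations.compare_units("cm", "m", "amplitude")  # -> 0.01
    #     EddiesObservations.compare_units("m", "m", "radius_s")    # -> 1
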
    @classmethod
    def from_array(cls, arrays, **kwargs):
        nb = arrays["time"].size
        eddies = cls(size=nb, **kwargs)
        for k, v in arrays.items():
            eddies.obs[k] = v
        return eddies

    @classmethod
    def from_zarr(cls, handler):
        nb_obs = len(handler.dimensions[cls.obs_dimension(handler)])
        kwargs = dict()
        if hasattr(handler, "track_array_variables"):
            kwargs["track_array_variables"] = handler.track_array_variables
            kwargs["array_variables"] = handler.array_variables.split(",")
        if len(handler.track_extra_variables) > 1:
            kwargs["track_extra_variables"] = handler.track_extra_variables.split(",")
        eddies = cls(size=nb_obs, **kwargs)
        for variable in handler.variables:
            # Patch
            if variable == "time":
                eddies.obs[variable] = handler.variables[variable][:]
            else:
                eddies.obs[VAR_DESCR_inv[variable]] = handler.variables[variable][:]
        eddies.sign_type = handler.rotation_type
        return eddies

    @classmethod
    def from_netcdf(cls, handler):
        nb_obs = len(handler.dimensions[cls.obs_dimension(handler)])
        kwargs = dict()
        if hasattr(handler, "track_array_variables"):
            kwargs["track_array_variables"] = handler.track_array_variables
            kwargs["array_variables"] = handler.array_variables.split(",")
        if len(handler.track_extra_variables) > 1:
            kwargs["track_extra_variables"] = handler.track_extra_variables.split(",")
        eddies = cls(size=nb_obs, **kwargs)
        for variable in handler.variables:
            # Patch
            if variable == "time":
                eddies.obs[variable] = handler.variables[variable][:]
            else:
                eddies.obs[VAR_DESCR_inv[variable]] = handler.variables[variable][:]
        eddies.sign_type = handler.rotation_type
        return eddies

    def propagate(
        self, previous_obs, current_obs, obs_to_extend, dead_track, nb_next, model
    ):
        """
        Fill virtual obs (C).

        :param previous_obs: previous obs from current (A)
        :param current_obs: previous obs from virtual (B)
        :param obs_to_extend:
        :param dead_track:
        :param nb_next:
        :param model:

        :return: New position C = B + AB
        """
        next_obs = VirtualEddiesObservations(
            size=nb_next,
            track_extra_variables=model.track_extra_variables,
            track_array_variables=model.track_array_variables,
            array_variables=model.array_variables,
        )
        next_obs.sign_type = self.sign_type
        nb_dead = len(previous_obs)
        nb_virtual_extend = nb_next - nb_dead
        for key in model.elements:
            if key in ["lon", "lat", "time"] or "contour_" in key:
                continue
            next_obs[key][:nb_dead] = current_obs[key]
        next_obs["dlon"][:nb_dead] = current_obs["lon"] - previous_obs["lon"]
        next_obs["dlat"][:nb_dead] = current_obs["lat"] - previous_obs["lat"]
        next_obs["lon"][:nb_dead] = current_obs["lon"] + next_obs["dlon"][:nb_dead]
        next_obs["lat"][:nb_dead] = current_obs["lat"] + next_obs["dlat"][:nb_dead]
        # Id which are extended
        next_obs["track"][:nb_dead] = dead_track
        # Add previous virtual
        if nb_virtual_extend > 0:
            for key in next_obs.elements:
                if (
                    key in ["lon", "lat", "time", "track", "segment_size"]
                    or "contour_" in key
                ):
                    continue
                next_obs[key][nb_dead:] = obs_to_extend[key]
            next_obs["lon"][nb_dead:] = obs_to_extend["lon"] + obs_to_extend["dlon"]
            next_obs["lat"][nb_dead:] = obs_to_extend["lat"] + obs_to_extend["dlat"]
            next_obs["track"][nb_dead:] = obs_to_extend["track"]
            next_obs["segment_size"][nb_dead:] = obs_to_extend["segment_size"]
        # Count
        next_obs["segment_size"][:] += 1
        return next_obs

    @staticmethod
    def intern(flag, public_label=False):
        if flag:
            labels = "contour_lon_s", "contour_lat_s"
        else:
            labels = "contour_lon_e", "contour_lat_e"
        if public_label:
            labels = [VAR_DESCR[label]["nc_name"] for label in labels]
        return labels

    def match(
        self,
        other,
        i_self=None,
        i_other=None,
        method="overlap",
        intern=False,
        cmin=0,
        **kwargs,
    ):
        """Return index and score computed on the effective contour.

        :param EddiesObservations other: Observations to compare
        :param array[bool,int],None i_self:
            Index or mask to subset observations, can avoid building a specific dataset.
        :param array[bool,int],None i_other:
            Index or mask to subset observations, can avoid building a specific dataset.
        :param str method:
            - "overlap": the score is computed with contours;
            - "circle": circles are computed and used for score (TODO)
        :param bool intern: if True, speed contour is used (default = effective contour)
        :param float cmin: 0 < cmin < 1, return only couples with score >= cmin
        :param dict kwargs: look at :py:meth:`vertice_overlap`
        :return: the indices of the eddies in self coupled with eddies in
            other and their associated score
        :rtype: (array(int), array(int), array(float))

        .. minigallery:: py_eddy_tracker.EddiesObservations.match
        """
        x_name, y_name = self.intern(intern)
        if i_self is None:
            i_self = slice(None)
        if i_other is None:
            i_other = slice(None)
        if method == "overlap":
            x0, y0 = self[x_name][i_self], self[y_name][i_self]
            x1, y1 = other[x_name][i_other], other[y_name][i_other]
            i, j = bbox_intersection(x0, y0, x1, y1)
            c = vertice_overlap(x0[i], y0[i], x1[j], y1[j], **kwargs)
        elif method == "close_center":
            x0, y0 = self.longitude[i_self], self.latitude[i_self]
            x1, y1 = other.longitude[i_other], other.latitude[i_other]
            i, j, c = close_center(x0, y0, x1, y1, **kwargs)
        m = c >= cmin  # use >= to keep cmin in the selection
        return i[m], j[m], c[m]

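    # Hypothetical usage sketch (not part of the original module): couple two
    # snapshots by effective-contour overlap, keeping scores of at least 0.2,
    # assuming `e0` and `e1` are EddiesObservations for consecutive days.
    #
    #     i0, i1, score = e0.match(e1, method="overlap", cmin=0.2)
    #     # e0 observation i0[k] overlaps e1 observation i1[k] with score[k]
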
    @staticmethod
    def re_reference_index(index, ref):
        """
        Shift index with ref.

        :param array,int index: local index to re-reference
        :param slice,array ref: reference; if it is a slice we just add its start
            to index, if it is an array of indices we translate through it
        """
        if isinstance(ref, slice):
            return index + ref.start
        else:
            return ref[index]

    @classmethod
    def cost_function_common_area(cls, xy_in, xy_out, distance, intern=False):
        """How does it work on x bound ?

        :param xy_in:
        :param xy_out:
        :param distance:
        :param bool intern:
        """
        x_name, y_name = cls.intern(intern)
        nb_records = xy_in.shape[0]
        x_in, y_in = xy_in[x_name], xy_in[y_name]
        x_out, y_out = xy_out[x_name], xy_out[y_name]
        x_in_min, y_in_min = x_in.min(axis=1), y_in.min(axis=1)
        x_in_max, y_in_max = x_in.max(axis=1), y_in.max(axis=1)
        x_out_min, y_out_min = x_out.min(axis=1), y_out.min(axis=1)
        x_out_max, y_out_max = x_out.max(axis=1), y_out.max(axis=1)
        costs = ma.empty(nb_records, dtype="f4")
        for i in range(nb_records):
            if x_in_max[i] < x_out_min[i] or x_in_min[i] > x_out_max[i]:
                costs[i] = 1
                continue
            if y_in_max[i] < y_out_min[i] or y_in_min[i] > y_out_max[i]:
                costs[i] = 1
                continue
            x_in_, x_out_ = x_in[i], x_out[i]
            p_in = Polygon(create_vertice(x_in_, y_in[i]))
            if abs(x_in_[0] - x_out_[0]) > 180:
                x_out_ = (x_out[i] - (x_in_[0] - 180)) % 360 + x_in_[0] - 180
            p_out = Polygon(create_vertice(x_out_, y_out[i]))
            costs[i] = 1 - (p_in & p_out).area() / min(p_in.area(), p_out.area())
        costs.mask = costs == 1
        return costs

    def mask_function(self, other, distance):
        return distance < 125

    @staticmethod
    def cost_function(records_in, records_out, distance):
        r"""Return the cost function between two obs.

        .. math::

            cost = \sqrt{
                \left({Amp_{in} - Amp_{out} \over Amp_{in}}\right)^2 +
                \left({Rspeed_{in} - Rspeed_{out} \over Rspeed_{in}}\right)^2 +
                \left({distance \over 125}\right)^2
            }

        :param records_in: starting observations
        :param records_out: observations to associate
        :param distance: computed between in and out
        """
        cost = (
            (records_in["amplitude"] - records_out["amplitude"])
            / records_in["amplitude"]
        ) ** 2
        cost += (
            (records_in["radius_s"] - records_out["radius_s"])
            / records_in["radius_s"]
        ) ** 2
        cost += (distance / 125) ** 2
        cost **= 0.5
        # Mask values with more than 60 % variation
        # return ma.array(cost, mask=m)
        return cost

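    # Worked example with hypothetical values (not part of the original module):
    # a 20 % amplitude change, a 20 % speed-radius change and a 62.5 km
    # separation give cost = sqrt(0.2**2 + 0.2**2 + (62.5 / 125)**2)
    # = sqrt(0.33) ~= 0.574.
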
    def shifted_ellipsoid_degrees_mask(self, other, minor=1.5, major=1.5):
        return shifted_ellipsoid_degrees_mask2(
            self.lon, self.lat, other.lon, other.lat, minor, major
        )

    def fixed_ellipsoid_mask(
        self, other, minor=50, major=100, only_east=False, shifted_ellipse=False
    ):
        dist = self.distance(other).T
        accepted = dist < minor
        rejected = dist > major
        rejected += isnan(dist)
        # All obs that are neither rejected nor accepted lie between the two circles
        needs_investigation = ~(rejected + accepted)
        index_other, index_self = where(needs_investigation)
        nb_case = index_self.shape[0]
        if nb_case != 0:
            if isinstance(major, ndarray):
                major = major[index_self]
            if isinstance(minor, ndarray):
                minor = minor[index_self]
            # focal distance
            f_degree = ((major**2 - minor**2) ** 0.5) / (
                111.2 * cos(radians(self.lat[index_self]))
            )
            lon_self = self.lon[index_self]
            if shifted_ellipse:
                x_center_ellipse = lon_self - (major - minor) / 2
            else:
                x_center_ellipse = lon_self
            lon_left_f = x_center_ellipse - f_degree
            lon_right_f = x_center_ellipse + f_degree
            dist_left_f = distance(
                lon_left_f,
                self.lat[index_self],
                other.lon[index_other],
                other.lat[index_other],
            )
            dist_right_f = distance(
                lon_right_f,
                self.lat[index_self],
                other.lon[index_other],
                other.lat[index_other],
            )
            dist_2a = (dist_left_f + dist_right_f) / 1000
            accepted[index_other, index_self] = dist_2a < (2 * major)
            if only_east:
                d_lon = (other.lon[index_other] - lon_self + 180) % 360 - 180
                mask = d_lon < 0
                accepted[index_other[mask], index_self[mask]] = False
        return accepted.T

    @staticmethod
    def basic_formula_ellipse_major_axis(
        lats, cmin=1.5, cmax=10.0, c0=1.5, lat1=13.5, lat2=5.0, degrees=False
    ):
        """Give major axis in km with a given latitude"""
        # Straight line between lat1 and lat2:
        # y = a * x + b
        a = (cmin - cmax) / (lat1 - lat2)
        b = a * -lat1 + cmin
        abs_lats = abs(lats)
        major_axis = ones(lats.shape, dtype="f8") * cmin
        major_axis[abs_lats < lat2] = cmax
        m = abs_lats > lat1
        m += abs_lats < lat2
        major_axis[~m] = a * abs_lats[~m] + b
        if not degrees:
            major_axis *= 111.2
        return major_axis

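    # Worked example (not part of the original module): with the defaults
    # (cmin=1.5, cmax=10, lat1=13.5, lat2=5), latitudes [0, 10, 30] give major
    # axes of [10, 5, 1.5] degrees; the ramp between lat2 and lat1 is linear.
    #
    #     EddiesObservations.basic_formula_ellipse_major_axis(
    #         array([0.0, 10.0, 30.0]), degrees=True
    #     )
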
    @staticmethod
    def solve_conflict(cost):
        pass

    @staticmethod
    def solve_simultaneous(cost):
        """Deduce link from cost matrix.

        :param array(float) cost: Cost for each available link
        :return: a boolean mask array, True for each valid couple
        :rtype: array(bool)
        """
        mask = ~cost.mask
        if mask.size == 0:
            return mask
        # Count number of links by self obs and other obs
        self_links, other_links = sum_row_column(mask)
        max_links = max(self_links.max(), other_links.max())
        if max_links > 5:
            logger.warning("One observation has %d links", max_links)
        # If some obs have multiple links, we keep only one link per eddy
        eddies_separation = 1 < self_links
        eddies_merge = 1 < other_links
        test = eddies_separation.any() or eddies_merge.any()
        if test:
            # We extract the matrix that contains conflicts
            obs_linking_to_self = mask[eddies_separation].any(axis=0)
            obs_linking_to_other = mask[:, eddies_merge].any(axis=1)
            i_self_keep = where(obs_linking_to_other + eddies_separation)[0]
            i_other_keep = where(obs_linking_to_self + eddies_merge)[0]
            # Cost to resolve conflict
            cost_reduce = cost[i_self_keep][:, i_other_keep]
            shape = cost_reduce.shape
            nb_conflict = (~cost_reduce.mask).sum()
            logger.debug("Shape conflict matrix : %s, %d conflicts", shape, nb_conflict)
            if nb_conflict >= (shape[0] + shape[1]):
                logger.warning(
                    "High number of conflicts : %d", shape[0] + shape[1]
                )
            links_resolve = 0
            # Arbitrary value
            max_iteration = max(cost_reduce.shape)
            security_increment = 0
            while False in cost_reduce.mask:
                if security_increment > max_iteration:
                    # Maybe check if the size decreases, if not raise an exception
                    # x_i, y_i = where(-cost_reduce.mask)
                    raise Exception("Too many iterations: %d" % security_increment)
                security_increment += 1
                i_min_value = cost_reduce.argmin()
                i, j = floor(i_min_value / shape[1]).astype(int), i_min_value % shape[1]
                # Set to False all links
                mask[i_self_keep[i]] = False
                mask[:, i_other_keep[j]] = False
                cost_reduce.mask[i] = True
                cost_reduce.mask[:, j] = True
                # We activate only this link
                mask[i_self_keep[i], i_other_keep[j]] = True
                links_resolve += 1
            logger.debug("%d links resolved", links_resolve)
        return mask

    @staticmethod
    def solve_first(cost, multiple_link=False):
        mask = ~cost.mask
        # Count number of links by self obs and other obs
        self_links = mask.sum(axis=1)
        other_links = mask.sum(axis=0)
        max_links = max(self_links.max(), other_links.max())
        if max_links > 5:
            logger.warning("One observation has %d links", max_links)
        # If some obs have multiple links, we keep only one link per eddy
        eddies_separation = 1 < self_links
        eddies_merge = 1 < other_links
        test = eddies_separation.any() or eddies_merge.any()
        if test:
            # We extract the matrix that contains conflicts
            obs_linking_to_self = mask[eddies_separation].any(axis=0)
            obs_linking_to_other = mask[:, eddies_merge].any(axis=1)
            i_self_keep = where(obs_linking_to_other + eddies_separation)[0]
            i_other_keep = where(obs_linking_to_self + eddies_merge)[0]
            # Cost to resolve conflict
            cost_reduce = cost[i_self_keep][:, i_other_keep]
            shape = cost_reduce.shape
            nb_conflict = (~cost_reduce.mask).sum()
            logger.debug("Shape conflict matrix : %s, %d conflicts", shape, nb_conflict)
            if nb_conflict >= (shape[0] + shape[1]):
                logger.warning("High number of conflicts : %d", shape[0] + shape[1])
            links_resolve = 0
            for i in range(shape[0]):
                j = cost_reduce[i].argmin()
                if hasattr(cost_reduce[i, j], "mask"):
                    continue
                links_resolve += 1
                # Set all links to False
                mask[i_self_keep[i]] = False
                cost_reduce.mask[i] = True
                if not multiple_link:
                    mask[:, i_other_keep[j]] = False
                    cost_reduce.mask[:, j] = True
                # We activate this link only
                mask[i_self_keep[i], i_other_keep[j]] = True
            logger.debug("%d links resolved", links_resolve)
        return mask

    def solve_function(self, cost_matrix):
        return numba_where(self.solve_simultaneous(cost_matrix))

    def tracking(self, other):
        """Track obs between self and other"""
        dist = self.distance(other)
        mask_accept_dist = self.mask_function(other, dist)
        indexs_closest = where(mask_accept_dist)
        cost_values = self.cost_function(
            self.obs[indexs_closest[0]],
            other.obs[indexs_closest[1]],
            dist[mask_accept_dist],
        )
        cost_mat = ma.empty(mask_accept_dist.shape, dtype="f4")
        cost_mat.mask = ~mask_accept_dist
        cost_mat[mask_accept_dist] = cost_values
        i_self, i_other = self.solve_function(cost_mat)
        i_self, i_other = self.post_process_link(other, i_self, i_other)
        logger.debug("%d matched with previous", i_self.shape[0])
        return i_self, i_other, cost_mat[i_self, i_other]

    def to_zarr(self, handler, **kwargs):
        handler.attrs["track_extra_variables"] = ",".join(self.track_extra_variables)
        if self.track_array_variables != 0:
            handler.attrs["track_array_variables"] = self.track_array_variables
            handler.attrs["array_variables"] = ",".join(self.array_variables)
        # Iter on variables to create:
        for ori_name in self.fields:
            # Patch for a transition
            name = ori_name
            # logger.debug("Create Variable %s", VAR_DESCR[name]["nc_name"])
            self.create_variable_zarr(
                handler,
                dict(
                    name=VAR_DESCR[name]["nc_name"],
                    store_dtype=VAR_DESCR[name]["output_type"],
                    dtype=VAR_DESCR[name]["nc_type"],
                    dimensions=VAR_DESCR[name]["nc_dims"],
                ),
                VAR_DESCR[name]["nc_attr"],
                self.obs[ori_name],
                scale_factor=VAR_DESCR[name].get("scale_factor", None),
                add_offset=VAR_DESCR[name].get("add_offset", None),
                filters=VAR_DESCR[name].get("filters", None),
                **kwargs,
            )
        self.set_global_attr_zarr(handler)

    @staticmethod
    def netcdf_create_dimensions(handler, dim, nb):
        if dim not in handler.dimensions:
            handler.createDimension(dim, nb)
        else:
            old_nb = len(handler.dimensions[dim])
            if nb != old_nb:
                raise Exception(
                    f"{dim} dimension previously set to a different size {old_nb} (current value : {nb})"
                )

    def to_netcdf(self, handler, **kwargs):
        eddy_size = len(self)
        logger.debug('Create Dimensions "obs" : %d', eddy_size)
        self.netcdf_create_dimensions(handler, "obs", eddy_size)
        handler.track_extra_variables = ",".join(self.track_extra_variables)
        if self.track_array_variables != 0:
            self.netcdf_create_dimensions(
                handler, "NbSample", self.track_array_variables
            )
            handler.track_array_variables = self.track_array_variables
            handler.array_variables = ",".join(self.array_variables)
        # Iter on variables to create:
        fields_ = array([VAR_DESCR[field]["nc_name"] for field in self.fields])
        i = fields_.argsort()
        for ori_name in array(self.fields)[i]:
            # Patch for a transition
            name = ori_name
            # logger.debug("Create Variable %s", VAR_DESCR[name]["nc_name"])
            self.create_variable(
                handler,
                dict(
                    varname=VAR_DESCR[name]["nc_name"],
                    datatype=VAR_DESCR[name]["output_type"],
                    dimensions=VAR_DESCR[name]["nc_dims"],
                ),
                VAR_DESCR[name]["nc_attr"],
                self.obs[ori_name],
                scale_factor=VAR_DESCR[name].get("scale_factor", None),
                add_offset=VAR_DESCR[name].get("add_offset", None),
                **kwargs,
            )
        self.set_global_attr_netcdf(handler)

    def create_variable(
        self,
        handler_nc,
        kwargs_variable,
        attr_variable,
        data,
        scale_factor=None,
        add_offset=None,
        **kwargs,
    ):
        dims = kwargs_variable.get("dimensions", None)
        # Manage chunk in 2d case
        if dims is not None and len(dims) > 1:
            chunk = [1]
            cum = 1
            for dim in dims[1:]:
                nb = len(handler_nc.dimensions[dim])
                chunk.append(nb)
                cum *= nb
            chunk[0] = min(int(400000 / cum), len(handler_nc.dimensions[dims[0]]))
            kwargs_variable["chunksizes"] = chunk
        kwargs_variable["zlib"] = True
        kwargs_variable["complevel"] = 1
        kwargs_variable.update(kwargs)
        var = handler_nc.createVariable(**kwargs_variable)
        attrs = list(attr_variable.keys())
        attrs.sort()
        for attr in attrs:
            attr_value = attr_variable[attr]
            var.setncattr(attr, attr_value)
        if self.raw_data:
            var[:] = data
        if scale_factor is not None:
            var.scale_factor = scale_factor
            if add_offset is not None:
                var.add_offset = add_offset
            else:
                var.add_offset = 0
        if not self.raw_data:
            var[:] = data
        try:
            if len(var.dimensions) == 1 or var.size < 1e7:
                var.setncattr("min", var[:].min())
                var.setncattr("max", var[:].max())
        except ValueError:
            logger.warning("Data is empty")

    @staticmethod
    def get_filters_zarr(name):
        """Get filters to store in zarr for known variable.

        :param str name: private variable name
        :return list: filters list
        """
        content = VAR_DESCR.get(name)
        filters = list()
        store_dtype = content["output_type"]
        scale_factor, add_offset = content.get("scale_factor", None), content.get(
            "add_offset", None
        )
        if scale_factor is not None or add_offset is not None:
            if add_offset is None:
                add_offset = 0
            filters.append(
                zarr.FixedScaleOffset(
                    offset=add_offset,
                    scale=1 / scale_factor,
                    dtype=content["nc_type"],
                    astype=store_dtype,
                )
            )
        filters.extend(content.get("filters", []))
        return filters

    def create_variable_zarr(
        self,
        handler_zarr,
        kwargs_variable,
        attr_variable,
        data,
        scale_factor=None,
        add_offset=None,
        filters=None,
        compressor=None,
        chunck_size=2500000,
    ):
        kwargs_variable["shape"] = data.shape
        kwargs_variable["compressor"] = (
            zarr.Blosc(cname="zstd", clevel=2) if compressor is None else compressor
        )
        kwargs_variable["filters"] = list()
        store_dtype = kwargs_variable.pop("store_dtype", None)
        if scale_factor is not None or add_offset is not None:
            if add_offset is None:
                add_offset = 0
            kwargs_variable["filters"].append(
                zarr.FixedScaleOffset(
                    offset=add_offset,
                    scale=1 / scale_factor,
                    dtype=kwargs_variable["dtype"],
                    astype=store_dtype,
                )
            )
        if filters is not None:
            kwargs_variable["filters"].extend(filters)
        dims = kwargs_variable.get("dimensions", None)
        # Manage chunk in 2d case
        if len(dims) == 1:
            kwargs_variable["chunks"] = (chunck_size,)
        if len(dims) == 2:
            second_dim = data.shape[1]
            kwargs_variable["chunks"] = (chunck_size // second_dim, second_dim)
        kwargs_variable.pop("dimensions")
        v = handler_zarr.create_dataset(**kwargs_variable)
        attrs = list(attr_variable.keys())
        attrs.sort()
        for attr in attrs:
            attr_value = attr_variable[attr]
            v.attrs[attr] = str(attr_value)
        if self.raw_data:
            if scale_factor is not None:
                s_bloc = kwargs_variable["chunks"][0]
                nb_bloc = int(ceil(data.shape[0] / s_bloc))
                for i in range(nb_bloc):
                    sl = slice(i * s_bloc, (i + 1) * s_bloc)
                    v[sl] = data[sl] * scale_factor + add_offset
            else:
                v[:] = data
        if not self.raw_data:
            v[:] = data
        try:
            if v.size < 1e8:
                v.attrs["min"] = str(v[:].min())
                v.attrs["max"] = str(v[:].max())
        except ValueError:
            logger.warning("Data is empty")

    def write_file(
        self, path="./", filename="%(path)s/%(sign_type)s.nc", zarr_flag=False, **kwargs
    ):
        """Write a netcdf or zarr file with eddy obs.
        Zarr is useful for large datasets (> 10M observations).

        :param str path: set path variable
        :param str filename: model to store file
        :param bool zarr_flag: If True, method will use zarr format instead of netcdf
        :param dict kwargs: look at :py:meth:`to_zarr` or :py:meth:`to_netcdf`
        """
        filename = filename % dict(
            path=path,
            sign_type=self.sign_legend,
            prod_time=datetime.now().strftime("%Y%m%d"),
        )
        if zarr_flag:
            filename = filename.replace(".nc", ".zarr")
        if filename.endswith(".zarr"):
            zarr_flag = True
        logger.info("Store in %s (%d observations)", filename, len(self))
        if zarr_flag:
            handler = zarr.open(filename, "w")
            self.to_zarr(handler, **kwargs)
        else:
            nc_format = kwargs.pop("format", "NETCDF4")
            with Dataset(filename, "w", format=nc_format) as handler:
                self.to_netcdf(handler, **kwargs)

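    # Hypothetical usage sketch (not part of the original module): `%(path)s`,
    # `%(sign_type)s` and `%(prod_time)s` are the keys substituted in `filename`.
    #
    #     eddies.write_file(path="/tmp", filename="%(path)s/%(sign_type)s_%(prod_time)s.nc")
    #     eddies.write_file(path="/tmp", zarr_flag=True)  # same name, .zarr extension
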
    @property
    def global_attr(self):
        return dict(
            Metadata_Conventions="Unidata Dataset Discovery v1.0",
            comment="Surface product; mesoscale eddies",
            framework_used="https://github.com/AntSimi/py-eddy-tracker",
            framework_version=__version__,
            standard_name_vocabulary="NetCDF Climate and Forecast (CF) Metadata Convention Standard Name Table",
            rotation_type=self.sign_type,
        )

    def set_global_attr_zarr(self, h_zarr):
        for key, item in self.global_attr.items():
            h_zarr.attrs[key] = str(item)

    def set_global_attr_netcdf(self, h_nc):
        for key, item in self.global_attr.items():
            h_nc.setncattr(key, item)

    def mask_from_polygons(self, polygons):
        """
        Return mask for all observations in one of the polygons in the list.

        :param list((array,array)) polygons: list of x/y arrays used to identify observations
        """
        x, y = polygons[0]
        m = insidepoly(
            self.longitude, self.latitude, x.reshape((1, -1)), y.reshape((1, -1))
        )
        for x, y in polygons[1:]:
            m_ = ~m
            m[m_] = insidepoly(
                self.longitude[m_],
                self.latitude[m_],
                x.reshape((1, -1)),
                y.reshape((1, -1)),
            )
        return m

    def extract_with_area(self, area, **kwargs):
        """
        Extract geographically with a bounding box.

        :param dict area: 4 coordinates in a dictionary to specify bounding box (lower left corner and upper right corner)
        :param dict kwargs: look at :py:meth:`extract_with_mask`
        :return: Return all eddy trajectories in bounds
        :rtype: EddiesObservations

        .. code-block:: python

            area = dict(llcrnrlon=x0, llcrnrlat=y0, urcrnrlon=x1, urcrnrlat=y1)

        .. minigallery:: py_eddy_tracker.EddiesObservations.extract_with_area
        """
        lat0 = area.get("llcrnrlat", -90)
        lat1 = area.get("urcrnrlat", 90)
        mask = (self.latitude > lat0) * (self.latitude < lat1)
        lon0 = area["llcrnrlon"]
        lon = (self.longitude - lon0) % 360 + lon0
        mask *= (lon > lon0) * (lon < area["urcrnrlon"])
        return self.extract_with_mask(mask, **kwargs)

    @property
    def time_datetime64(self):
        dt = (datetime64("1970-01-01") - datetime64("1950-01-01")).astype("i8")
        return (self.time - dt).astype("datetime64[D]")

    def time_sub_sample(self, t0, time_step):
        """
        Time sub-sampling.

        :param int,float t0: reference time that will be kept
        :param int,float time_step: keep every observation spaced by time_step
        """
        mask = (self.time - t0) % time_step == 0
        return self.extract_with_mask(mask)

    def extract_with_mask(self, mask):
        """
        Extract a subset of observations.

        :param array(bool) mask: mask to select observations
        :return: same object with selected observations
        :rtype: self
        """
        nb_obs = mask.sum()
        new = self.__class__.new_like(self, nb_obs)
        new.sign_type = self.sign_type
        if nb_obs == 0:
            logger.warning("Empty dataset will be created")
        else:
            for field in self.fields:
                logger.debug("Copy of field %s ...", field)
                new.obs[field] = self.obs[field][mask]
        return new

    def scatter(self, ax, name=None, ref=None, factor=1, **kwargs):
        """
        Scatter data.

        :param matplotlib.axes.Axes ax: matplotlib axis used to draw
        :param str,array,None name: variable used to fill the contour, if None all elements have the same color
        :param float,None ref: if defined, all coordinates are wrapped with ref as western boundary
        :param float factor: multiply value by
        :param dict kwargs: look at :py:meth:`matplotlib.axes.Axes.scatter`
        :return: scatter mappable

        .. minigallery:: py_eddy_tracker.EddiesObservations.scatter
        """
        x = self.longitude
        if ref is not None:
            x = (x - ref) % 360 + ref
        kwargs = kwargs.copy()
        if name is not None and "c" not in kwargs:
            v = self.parse_varname(name)
            kwargs["c"] = v * factor
        return ax.scatter(x, self.latitude, **kwargs)

    def filled(
        self,
        ax,
        varname=None,
        ref=None,
        intern=False,
        cmap="magma_r",
        lut=10,
        vmin=None,
        vmax=None,
        factor=1,
        **kwargs,
    ):
        """
        :param matplotlib.axes.Axes ax: matplotlib axis used to draw
        :param str,array,None varname: variable used to fill the contours, or an array of the same size as obs
        :param float,None ref: if defined, all coordinates are wrapped with ref as western boundary
        :param bool intern: if True draw speed contours instead of effective contours
        :param str cmap: matplotlib colormap name
        :param int,None lut: number of colors in the colormap
        :param float,None vmin: min value of the colorbar
        :param float,None vmax: max value of the colorbar
        :param float factor: multiply value by
        :return: Collection drawn
        :rtype: matplotlib.collections.PolyCollection

        .. minigallery:: py_eddy_tracker.EddiesObservations.filled
        """
        x_name, y_name = self.intern(intern)
        x, y = self[x_name], self[y_name]
        if ref is not None:
            # TODO : maybe buggy with global display
            shape_out = x.shape
            x, y = wrap_longitude(x.reshape(-1), y.reshape(-1), ref)
            x, y = x.reshape(shape_out), y.reshape(shape_out)
        verts = list()
        for x_, y_ in zip(x, y):
            verts.append(create_vertice(x_, y_))
        if "facecolors" not in kwargs:
            kwargs = kwargs.copy()
            cmap = get_cmap(cmap, lut)
            v = self.parse_varname(varname) * factor
            if vmin is None:
                vmin = v.min()
            if vmax is None:
                vmax = v.max()
            v = (v - vmin) / (vmax - vmin)
            colors = [cmap(v_) for v_ in v]
            kwargs["facecolors"] = colors
        if "label" in kwargs:
            kwargs["label"] = self.format_label(kwargs["label"])
        c = PolyCollection(verts, **kwargs)
        ax.add_collection(c)
        c.cmap = cmap
        c.norm = Normalize(vmin=vmin, vmax=vmax)
        return c

    def __merge_filters__(self, *filters):
        """
        Compute an intersection between all filters after evaluating each of them.

        :param list(slice,array[int],array[bool]) filters:
        :return: an object applicable to numpy.array
        :rtype: slice, index, mask
        """
        filter1 = filters[0]
        if len(filters) > 2:
            filter2 = self.__merge_filters__(*filters[1:])
        elif len(filters) == 2:
            filter2 = filters[1]
        # Merge indexes and filter
        if isinstance(filter1, slice):
            reject = ones(len(self), dtype="bool")
            reject[filter1] = False
            if isinstance(filter2, slice):
                reject[filter2] = False
                return ~reject
            # Mask case
            elif filter2.dtype == bool:
                return ~reject * filter2
            # Index case
            else:
                return filter2[~reject[filter2]]
        # Mask case
        elif filter1.dtype == bool:
            if isinstance(filter2, slice):
                select = zeros(len(self), dtype="bool")
                select[filter2] = True
                return select * filter1
            # Mask case
            elif filter2.dtype == bool:
                return filter2 * filter1
            # Index case
            else:
                return filter2[filter1[filter2]]
        # Index case
        else:
            if isinstance(filter2, slice):
                select = zeros(len(self), dtype="bool")
                select[filter2] = True
                return filter1[select[filter1]]
            # Mask case
            elif filter2.dtype == bool:
                return filter1[filter2[filter1]]
            # Index case
            else:
                return filter1[in1d(filter1, filter2)]

    def merge_filters(self, *filters):
        """
        Compute an intersection between all filters after evaluating each of them.

        :param list(callable,None,slice,array[int],array[bool]) filters:
        :return: an object applicable to numpy.array
        :rtype: slice, index, mask
        """
        if len(filters) == 1 and isinstance(filters[0], list):
            filters = filters[0]
        filters_ = list()
        # Remove all filters which select all obs
        for filter in filters:
            if callable(filter):
                filter = filter(self)
            if filter is None:
                continue
            if isinstance(filter, slice):
                if filter == slice(None):
                    continue
            elif filter.dtype == "bool":
                if filter.all():
                    continue
                if not filter.any():
                    return empty(0, dtype=int)
            filters_.append(filter)
        if len(filters_) == 1:
            return filters_[0]
        elif len(filters_) == 0:
            return slice(None)
        else:
            return self.__merge_filters__(*filters_)

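    # Hypothetical usage sketch (not part of the original module): combine a
    # slice with a boolean mask; the result can index a numpy array directly.
    #
    #     m = eddies.merge_filters(slice(0, 1000), eddies.lat > 0)
    #     northern_lons = eddies.lon[m]
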
    def bins_stat(self, xname, bins=None, yname=None, method=None, mask=None):
        """
        :param str,array xname: variable to compute stats on
        :param array,None bins: bins to perform statistics; if None, bins = arange(variable.min(), variable.max() + 2)
        :param None,str,array yname: variable used to apply method
        :param None,str method: If None, count the number of observations in each bin; "mean" (the only method currently implemented) averages yname in each bin
        :param None,array(bool) mask: If defined, use only True positions
        :return: x array and y array
        :rtype: array,array

        .. minigallery:: py_eddy_tracker.EddiesObservations.bins_stat
        """
        v = self.parse_varname(xname)
        mask = self.merge_filters(mask)
        v = v[mask]
        if bins is None:
            bins = arange(v.min(), v.max() + 2)
        y, x = hist_numba(v, bins=bins)
        x = (x[1:] + x[:-1]) / 2
        if method == "mean":
            y_v = self.parse_varname(yname)
            y_v = y_v[mask]
            y_, _ = histogram(v, bins=bins, weights=y_v)
            with errstate(divide="ignore", invalid="ignore"):
                y = y_ / y
        return x, y

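    # Sketch of `bins_stat` (illustration only; the variable names are assumed
    # to exist in the dataset): first a histogram of observations by latitude,
    # then the mean amplitude in the same bins.
    #
    #     x, nb = eddies.bins_stat("latitude")
    #     x, mean_amp = eddies.bins_stat("latitude", yname="amplitude", method="mean")
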
    def format_label(self, label):
        t0, t1 = self.period
        return label.format(
            t0=t0,
            t1=t1,
            nb_obs=len(self),
        )

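    # Sketch of `format_label` (illustration only): the placeholders {t0},
    # {t1} and {nb_obs} are replaced by the collection period and size.
    #
    #     eddies.format_label("{nb_obs} obs between day {t0} and day {t1}")
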
    def display_color(self, ax, field, ref=None, intern=False, **kwargs):
        """Plot colored contours of eddies

        :param matplotlib.axes.Axes ax: matplotlib axes used to draw
        :param str,array field: color field
        :param float,None ref: if defined, all coordinates are wrapped with ref as western boundary
        :param bool intern: if True, draw the speed contour
        :param dict kwargs: look at :py:class:`matplotlib.collections.LineCollection`

        .. minigallery:: py_eddy_tracker.EddiesObservations.display_color
        """
        xname, yname = self.intern(intern)
        x, y = self[xname], self[yname]
        if ref is not None:
            # TODO : maybe buggy with global display
            shape_out = x.shape
            x, y = wrap_longitude(x.reshape(-1), y.reshape(-1), ref)
            x, y = x.reshape(shape_out), y.reshape(shape_out)
        c = self.parse_varname(field)
        cmap = get_cmap(kwargs.pop("cmap", "Spectral_r"))
        cmin, cmax = kwargs.pop("vmin", c.min()), kwargs.pop("vmax", c.max())
        colors = cmap((c - cmin) / (cmax - cmin))
        lines = LineCollection(
            [create_vertice(i, j) for i, j in zip(x, y)], colors=colors, **kwargs
        )
        ax.add_collection(lines)
        lines.cmap = cmap
        lines.norm = Normalize(vmin=cmin, vmax=cmax)
        return lines

    def display(self, ax, ref=None, extern_only=False, intern_only=False, **kwargs):
        """Plot the speed and effective (dash-dotted) contours of the eddies

        :param matplotlib.axes.Axes ax: matplotlib axes used to draw
        :param float,None ref: if defined, all coordinates are wrapped with ref as western boundary
        :param bool extern_only: if True, draw only the effective contour
        :param bool intern_only: if True, draw only the speed contour
        :param dict kwargs: look at :py:meth:`matplotlib.axes.Axes.plot`

        .. minigallery:: py_eddy_tracker.EddiesObservations.display
        """
        if not extern_only:
            lon_s = flatten_line_matrix(self.contour_lon_s)
            lat_s = flatten_line_matrix(self.contour_lat_s)
        if not intern_only:
            lon_e = flatten_line_matrix(self.contour_lon_e)
            lat_e = flatten_line_matrix(self.contour_lat_e)
        if "label" in kwargs:
            kwargs["label"] = self.format_label(kwargs["label"])
        kwargs_e = kwargs.copy()
        if "ls" not in kwargs_e and "linestyle" not in kwargs_e:
            kwargs_e["linestyle"] = "-."
        if not extern_only:
            kwargs_e.pop("label", None)

        mappables = list()
        if not extern_only:
            if ref is not None:
                lon_s, lat_s = wrap_longitude(lon_s, lat_s, ref, cut=True)
            mappables.append(ax.plot(lon_s, lat_s, **kwargs)[0])
        if not intern_only:
            if ref is not None:
                lon_e, lat_e = wrap_longitude(lon_e, lat_e, ref, cut=True)
            mappables.append(ax.plot(lon_e, lat_e, **kwargs_e)[0])
        return mappables

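    # Sketch of `display` (illustration only): speed contours solid, effective
    # contours dash-dotted; the label placeholder goes through `format_label`.
    #
    #     import matplotlib.pyplot as plt
    #     fig, ax = plt.subplots()
    #     eddies.display(ax, label="eddies ({nb_obs} obs)", color="r", lw=0.5)
    #     ax.legend()
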
    def first_obs(self):
        """
        Get the first obs of each trajectory.

        :rtype: __class__

        .. minigallery:: py_eddy_tracker.EddiesObservations.first_obs
        """
        return self.extract_with_mask(self.n == 0)

    def last_obs(self):
        """
        Get the last obs of each trajectory.

        :rtype: __class__

        .. minigallery:: py_eddy_tracker.EddiesObservations.last_obs
        """
        m = zeros(len(self), dtype="bool")
        m[-1] = True
        m[:-1][self.n[1:] == 0] = True
        return self.extract_with_mask(m)

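    # Sketch of `first_obs`/`last_obs` (illustration only; assumes a tracked
    # dataset providing `n`, the observation number within each trajectory):
    #
    #     starts = eddies.first_obs()  # one obs per trajectory (n == 0)
    #     ends = eddies.last_obs()     # the obs preceding each start, plus the final one
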
    def is_convex(self, intern=False):
        """
        Get flag of the eddy's convexity

        :param bool intern: If True use speed contour instead of effective contour
        :return: True if the contour is convex
        :rtype: array[bool]
        """
        xname, yname = self.intern(intern)
        return convexs(self[xname], self[yname])

    def contains(self, x, y, intern=False):
        """
        Return the index of the contour containing (x, y)

        :param array x: longitude
        :param array y: latitude
        :param bool intern: If true use speed contour instead of effective contour
        :return: indices, -1 when no contour contains the point
        :rtype: array[int32]
        """
        xname, yname = self.intern(intern)
        m = ~(isnan(x) + isnan(y))
        i = -ones(x.shape, dtype="i4")
        if x.size != 0 and m.any():
            i[m] = poly_indexs(x[m], y[m], self[xname], self[yname])
        return i

    def inside(self, x, y, intern=False):
        """
        True for each point inside the effective contour of an eddy

        :param array x: longitude
        :param array y: latitude
        :param bool intern: If true use speed contour instead of effective contour
        :return: flag
        :rtype: array[bool]
        """
        xname, yname = self.intern(intern)
        return insidepoly(x, y, self[xname], self[yname])

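    # Sketch of `contains`/`inside` on two test positions (illustration only;
    # the coordinates are arbitrary assumptions):
    #
    #     lon, lat = array([3.0, 10.0]), array([42.0, 38.0])
    #     idx = eddies.contains(lon, lat)              # contour index, -1 if none
    #     flag = eddies.inside(lon, lat, intern=True)  # True inside a speed contour
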
    def grid_count(self, bins, intern=False, center=False, filter=slice(None)):
        """
        Count the eddies in each bin (use all pixels in each contour)

        :param (numpy.array,numpy.array) bins: bins (grid) to count
        :param bool intern: if True use speed contour only
        :param bool center: if True count only the centers
        :param array,mask,slice filter: keep only the data selected by the filter
        :return: the grid of counts
        :rtype: py_eddy_tracker.dataset.grid.RegularGridDataset

        .. minigallery:: py_eddy_tracker.EddiesObservations.grid_count
        """
        filter = self.merge_filters(filter)
        x_name, y_name = self.intern(intern)
        x_bins, y_bins = arange(*bins[0]), arange(*bins[1])
        x0 = bins[0][0]
        grid = ma.zeros((x_bins.shape[0] - 1, y_bins.shape[0] - 1), dtype="u4")
        from ..dataset.grid import RegularGridDataset

        regular_grid = RegularGridDataset.with_array(
            coordinates=("lon", "lat"),
            datas=dict(
                count=grid,
                lon=(x_bins[1:] + x_bins[:-1]) / 2,
                lat=(y_bins[1:] + y_bins[:-1]) / 2,
            ),
            variables_description=dict(
                count=dict(long_name="Number of times the pixel is within an eddy")
            ),
            centered=True,
        )
        if center:
            x, y = (self.longitude[filter] - x0) % 360 + x0, self.latitude[filter]
            grid[:] = histogram2d(x, y, (x_bins, y_bins))[0]
            grid.mask = grid.data == 0
        else:
            x_ref = ((self.longitude[filter] - x0) % 360 + x0 - 180).reshape(-1, 1)
            x_contour, y_contour = self[x_name][filter], self[y_name][filter]
            grid_count_pixel_in(
                grid.data,
                x_contour,
                y_contour,
                x_ref,
                regular_grid.x_bounds,
                regular_grid.y_bounds,
                regular_grid.xstep,
                regular_grid.ystep,
                regular_grid.N,
                regular_grid.is_circular(),
                regular_grid.x_size,
                regular_grid.x_c,
                regular_grid.y_c,
            )
            grid.mask = grid == 0
        return regular_grid

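    # Sketch of `grid_count` (illustration only): each bin is a
    # (start, stop, step) tuple consumed by arange; the bounds are assumptions.
    #
    #     g = eddies.grid_count(((-10, 37, 0.5), (30, 46, 0.5)))
    #     count = g.grid("count")  # masked where no eddy covers the pixel
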
    def grid_box_stat(self, bins, varname, method=50, data=None, filter=slice(None)):
        """
        Get a percentile of the eddies in each bin

        :param (numpy.array,numpy.array) bins: bins (grid) to count
        :param str varname: variable to apply the method to if data is None, also used as output name
        :param str,float method: method to apply. If float, the percentile to compute (e.g. 50 for the median)
        :param array data: Array used to compute stat if defined
        :param array,mask,slice filter: keep only the data selected by the filter
        :return: grid of the method's result
        :rtype: py_eddy_tracker.dataset.grid.RegularGridDataset

        .. minigallery:: py_eddy_tracker.EddiesObservations.grid_box_stat
        """
        x_bins, y_bins = arange(*bins[0]), arange(*bins[1])
        x0 = bins[0][0]
        x, y = (self.longitude - x0) % 360 + x0, self.latitude
        data = self[varname] if data is None else data
        if hasattr(data, "mask"):
            filter = self.merge_filters(~data.mask, self.merge_filters(filter))
        else:
            filter = self.merge_filters(filter)
        x, y, data = x[filter], y[filter], data[filter]
        from ..dataset.grid import RegularGridDataset

        shape = (x_bins.shape[0] - 1, y_bins.shape[0] - 1)
        grid = ma.empty(shape, dtype=data.dtype)
        grid.mask = ones(shape, dtype="bool")
        regular_grid = RegularGridDataset.with_array(
            coordinates=("x", "y"),
            datas={varname: grid, "x": x_bins[:-1], "y": y_bins[:-1]},
            centered=False,
        )
        grid_box_stat(
            regular_grid.x_c,
            regular_grid.y_c,
            grid.data,
            grid.mask,
            x,
            y,
            data,
            regular_grid.is_circular(),
            method,
        )
        return regular_grid

    def grid_stat(self, bins, varname, data=None):
        """
        Return the mean of the eddies' variable in each bin

        :param (numpy.array,numpy.array) bins: bins (grid) to compute the mean on
        :param str varname: name of the variable to compute the mean on, also used as output grid name
        :param array data: Array used to compute stat if defined
        :return: the gridded mean variable
        :rtype: py_eddy_tracker.dataset.grid.RegularGridDataset

        .. minigallery:: py_eddy_tracker.EddiesObservations.grid_stat
        """
        x_bins, y_bins = arange(*bins[0]), arange(*bins[1])
        x0 = bins[0][0]
        x, y = (self.longitude - x0) % 360 + x0, self.latitude
        data = self[varname] if data is None else data
        if hasattr(data, "mask"):
            m = ~data.mask
            sum_obs = histogram2d(x[m], y[m], (x_bins, y_bins), weights=data[m])[0]
            nb_obs = histogram2d(x[m], y[m], (x_bins, y_bins))[0]
        else:
            sum_obs = histogram2d(x, y, (x_bins, y_bins), weights=data)[0]
            nb_obs = histogram2d(x, y, (x_bins, y_bins))[0]
        from ..dataset.grid import RegularGridDataset

        with errstate(divide="ignore", invalid="ignore"):
            regular_grid = RegularGridDataset.with_array(
                coordinates=("x", "y"),
                datas={
                    varname: ma.array(sum_obs / nb_obs, mask=nb_obs == 0),
                    "x": x_bins[:-1],
                    "y": y_bins[:-1],
                },
                centered=False,
            )
        return regular_grid

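    # Sketch of `grid_stat` (illustration only; "radius_s" is an assumed
    # dataset variable): gridded mean of the speed radius on a 2-degree grid.
    #
    #     g = eddies.grid_stat(((0, 360, 2), (-75, 75, 2)), "radius_s")
    #     mean_radius = g.grid("radius_s")
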
    def interp_grid(
        self, grid_object, varname, i=None, method="center", dtype=None, intern=None
    ):
        """
        Interpolate a grid on a center or contour with the mean, min or max method

        :param grid_object: Handler of the grid to interpolate
        :type grid_object: py_eddy_tracker.dataset.grid.RegularGridDataset
        :param str varname: Name of the variable to use
        :param array[bool,int],None i: Index or mask to subset observations, which can avoid building a specific dataset.
        :param str method: 'center', 'mean', 'max', 'min', 'count' or 'nearest'
        :param str dtype: if None, use the variable dtype
        :param bool intern: Use extern or intern contour

        .. minigallery:: py_eddy_tracker.EddiesObservations.interp_grid
        """
        if method in ("center", "nearest"):
            x, y = self.longitude, self.latitude
            if i is not None:
                x, y = x[i], y[i]
            return grid_object.interp(varname, x, y, method)
        elif method in ("min", "max", "mean", "count"):
            x0 = grid_object.x_bounds[0]
            x_name, y_name = self.intern(False if intern is None else intern)
            x_ref = ((self.longitude - x0) % 360 + x0 - 180).reshape(-1, 1)
            x, y = (self[x_name] - x_ref) % 360 + x_ref, self[y_name]
            if i is not None:
                x, y = x[i], y[i]
            grid = grid_object.grid(varname)
            result = empty(x.shape[0], dtype=grid.dtype if dtype is None else dtype)
            min_method = method == "min"
            grid_stat(
                grid_object.x_c,
                grid_object.y_c,
                -grid.data if min_method else grid.data,
                grid.mask,
                x,
                y,
                result,
                grid_object.is_circular(),
                method="max" if min_method else method,
            )
            return -result if min_method else result
        else:
            raise Exception(f'method "{method}" unknown')

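    # Sketch of `interp_grid` (illustration only; `grid` is assumed to be a
    # RegularGridDataset exposing an "adt" variable):
    #
    #     at_center = eddies.interp_grid(grid, "adt", method="center")
    #     contour_mean = eddies.interp_grid(grid, "adt", method="mean", intern=True)
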
    @property
    def period(self):
        """
        Give the time coverage. If the collection is empty, return (nan, nan)

        :return: first and last date
        :rtype: (int,int)
        """
        if self.period_ is None:
            if self.time.size < 1:
                self.period_ = nan, nan
            else:
                self.period_ = self.time.min(), self.time.max()
        return self.period_

    @property
    def nb_days(self):
        """Return the period in days covered by the dataset

        :return: Number of days
        :rtype: int
        """
        return self.period[1] - self.period[0] + 1

    def create_particles(self, step, intern=True):
        """Create particles inside contours (default: speed contours).
        Avoids creating an overly large numpy array only to mask most of it

        :param step: step for particles
        :type step: float
        :param bool intern: If true use speed contour instead of effective contour
        :return: lon, lat and indices of particles
        :rtype: tuple(np.array)
        """
        xname, yname = self.intern(intern)
        return create_meshed_particles(self[xname], self[yname], step)

    def empty_dataset(self):
        return self.new_like(self, 0)

@njit(cache=True)
def grid_count_(grid, i, j):
    """
    Add 1 to each index
    """
    for i_, j_ in zip(i, j):
        grid[i_, j_] += 1

@njit(cache=True)
def grid_count_pixel_in(
    grid,
    x,
    y,
    x_ref,
    x_bounds,
    y_bounds,
    xstep,
    ystep,
    N,
    is_circular,
    x_size,
    x_c,
    y_c,
):
    """
    Count how many times a pixel is used.

    :param array grid:
    :param array x: x for all contours
    :param array y: y for all contours
    :param array x_ref: x reference for wrapping
    :param array x_bounds: grid longitude
    :param array y_bounds: grid latitude
    :param float xstep: step between two longitudes
    :param float ystep: step between two latitudes
    :param int N: index shift to enlarge the window
    :param bool is_circular: True if the grid is wrappable
    :param int x_size: number of longitudes
    :param array x_c: longitude coordinates of the grid
    :param array y_c: latitude coordinates of the grid
    """
    nb = x_ref.shape[0]
    for i_ in range(nb):
        x_, y_, x_ref_ = x[i_], y[i_], x_ref[i_]
        x_ = (x_ - x_ref_) % 360 + x_ref_
        x_, y_ = reduce_size(x_, y_)
        v = create_vertice(x_, y_)
        (x_start, x_stop), (y_start, y_stop) = bbox_indice_regular(
            v, x_bounds, y_bounds, xstep, ystep, N, is_circular, x_size
        )
        i, j = get_pixel_in_regular(v, x_c, y_c, x_start, x_stop, y_start, y_stop)
        grid_count_(grid, i, j)

@njit(cache=True)
def grid_box_stat(x_c, y_c, grid, mask, x, y, value, circular=False, method=50):
    """
    Compute method on each set (one set by box)

    :param array_like x_c: grid longitude coordinates
    :param array_like y_c: grid latitude coordinates
    :param array_like grid: grid to store the result
    :param array[bool] mask: grid to store unused boxes
    :param array_like x: longitude of observations
    :param array_like y: latitude of observations
    :param array_like value: values to group before applying the method
    :param bool circular: True if the grid is wrappable
    :param float method: percentile to compute
    """
    xstep, ystep = x_c[1] - x_c[0], y_c[1] - y_c[0]
    x0, y0 = x_c[0] - xstep / 2.0, y_c[0] - ystep / 2.0
    nb_x = x_c.shape[0]
    nb_y = y_c.shape[0]
    i, j = (
        ((x - x0) // xstep).astype(numba_types.int32),
        ((y - y0) // ystep).astype(numba_types.int32),
    )
    if circular:
        i %= nb_x
    else:
        if (i < 0).any():
            raise Exception("x indices underflow")
        if (i >= nb_x).any():
            raise Exception("x indices overflow")
        if (j < 0).any():
            raise Exception("y indices underflow")
        if (j >= nb_y).any():
            raise Exception("y indices overflow")
    abs_i = j * nb_x + i
    k_sort = abs_i.argsort()
    i0, j0 = i[k_sort[0]], j[k_sort[0]]
    values = list()
    for k in k_sort:
        i_, j_ = i[k], j[k]
        # group change
        if i_ != i0 or j_ != j0:
            # apply method on the finished group and store at its own box
            grid[i0, j0] = percentile(values, method)
            mask[i0, j0] = False
            # start new group
            i0, j0 = i_, j_
            # reset list
            values.clear()
        values.append(value[k])
    # flush the last group, which the loop never closes
    grid[i0, j0] = percentile(values, method)
    mask[i0, j0] = False

@njit(cache=True)
def grid_stat(x_c, y_c, grid, mask, x, y, result, circular=False, method="mean"):
    """
    Compute the mean, max or count of the grid pixels within each contour

    :param array_like x_c: the grid longitude coordinates
    :param array_like y_c: the grid latitude coordinates
    :param array_like grid: grid value
    :param array[bool] mask: mask for invalid values
    :param array_like x: longitude of contours
    :param array_like y: latitude of contours
    :param array_like result: return values
    :param bool circular: True if the grid is wrappable
    :param str method: 'mean', 'max' or 'count'
    """
    # FIXME : how does it work on grid bound
    nb = result.shape[0]
    xstep, ystep = x_c[1] - x_c[0], y_c[1] - y_c[0]
    x0, y0 = x_c - xstep / 2.0, y_c - ystep / 2.0
    nb_x = x_c.shape[0]
    max_method = "max" == method
    mean_method = "mean" == method
    count_method = "count" == method
    for elt in range(nb):
        v = create_vertice(x[elt], y[elt])
        (x_start, x_stop), (y_start, y_stop) = bbox_indice_regular(
            v, x0, y0, xstep, ystep, 1, circular, nb_x
        )
        i, j = get_pixel_in_regular(v, x_c, y_c, x_start, x_stop, y_start, y_stop)
        if count_method:
            result[elt] = i.shape[0]
        elif mean_method:
            v_sum = 0
            nb_ = 0
            for i_, j_ in zip(i, j):
                if mask[i_, j_]:
                    continue
                v_sum += grid[i_, j_]
                nb_ += 1
            # FIXME : how does it work on grid bound
            if nb_ == 0:
                result[elt] = nan
            else:
                result[elt] = v_sum / nb_
        elif max_method:
            v_max = -1e40
            for i_, j_ in zip(i, j):
                # FIXME : must use mask
                v_max = max(v_max, grid[i_, j_])
            result[elt] = v_max

class VirtualEddiesObservations(EddiesObservations):
    """Class to work with virtual obs"""

    __slots__ = ()

    @property
    def elements(self):
        elements = super().elements
        elements.extend(["track", "segment_size", "dlon", "dlat"])
        return list(set(elements))

@njit(cache=True)
def numba_where(mask):
    """Useful when the mask is nearly empty"""
    return where(mask)

@njit(cache=True)
def sum_row_column(mask):
    """
    Compute the row and column sums in a single pass
    """
    nb_x, nb_y = mask.shape
    row_sum = zeros(nb_x, dtype=numba_types.int32)
    column_sum = zeros(nb_y, dtype=numba_types.int32)
    for i in range(nb_x):
        for j in range(nb_y):
            if mask[i, j]:
                row_sum[i] += 1
                column_sum[j] += 1
    return row_sum, column_sum

@njit(cache=True)
def numba_digitize(values, bins):
    """numpy.digitize reimplementation with a fast path for regularly spaced bins"""
    # Check if bins are regular
    nb_bins = bins.shape[0]
    step = bins[1] - bins[0]
    bin_previous = bins[1]
    for i in range(2, nb_bins):
        bin_current = bins[i]
        if step != (bin_current - bin_previous):
            # Bins are not regular, fall back on the generic implementation
            return digitize(values, bins)
        bin_previous = bin_current
    nb_values = values.shape[0]
    out = empty(nb_values, dtype=numba_types.int64)
    low, up = bins[0], bins[-1]
    for i in range(nb_values):
        v_ = values[i]
        if v_ >= up:
            out[i] = nb_bins
            continue
        if v_ < low:
            out[i] = 0
            continue
        # truncated toward zero when stored in the int64 array
        out[i] = (v_ - low) / step + 1
    return out

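# Quick check (illustration only): on regular bins `numba_digitize` takes the
# arithmetic fast path and agrees with numpy.digitize.
def _sketch_numba_digitize():
    values = array([-1.0, 0.4, 2.5, 7.0])
    bins = linspace(0.0, 5.0, 6)  # constant step -> fast path
    assert (numba_digitize(values, bins) == digitize(values, bins)).all()
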
@njit(cache=True)
def iter_mode_reduce(x, bins):
    """
    Test if we could use a reduce mode

    :param array x: array to divide in groups
    :param array bins: array which defines the bounds between each group
    :return: reduce-mode flag, translator indices, and reduced x
    """
    nb = x.shape[0]
    # Reduce only if less than half of the values are kept
    limit = nb // 2
    # low and up bounds
    x0, x1 = bins[0], bins[-1]
    m = empty(nb, dtype=numba_types.bool_)
    # Count the number of values covered by the bins
    c = 0
    for i in range(nb):
        x_ = x[i]
        test = (x_ >= x0) * (x_ <= x1)
        m[i] = test
        if test:
            c += 1
    # If the number of values exceeds the limit, do not reduce
    if c > limit:
        return False, empty(0, dtype=numba_types.int_), x
    # Indices to translate back into the full index array
    indices = empty(c, dtype=numba_types.int_)
    x_ = empty(c, dtype=x.dtype)
    j = 0
    for i in range(nb):
        if m[i]:
            indices[j] = i
            x_[j] = x[i]
            j += 1
    return True, indices, x_

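# Quick check (illustration only): when the bins cover a minority of values,
# `iter_mode_reduce` returns the reduced array plus indices mapping back to
# the full array.
def _sketch_iter_mode_reduce():
    x = arange(100.0)
    bins = arange(10.0, 20.0)  # covers 10 of 100 values -> reduce mode
    reduced, translator, x_sub = iter_mode_reduce(x, bins)
    assert reduced and (x[translator] == x_sub).all()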