Source code for py_eddy_tracker.eddy_feature

# -*- coding: utf-8 -*-
"""
Class to compute Amplitude and average speed profile
"""

import logging

from matplotlib.cm import get_cmap
from matplotlib.colors import Normalize
from matplotlib.figure import Figure
from numba import njit, types as numba_types
from numpy import (
    array,
    concatenate,
    digitize,
    empty,
    int_,
    ma,
    ones,
    round,
    unique,
    zeros,
)

from .poly import winding_number_poly

logger = logging.getLogger("pet")


[docs]class Amplitude(object): """ Class to calculate *amplitude* and counts of *local maxima/minima* within a closed region of a sea surface height field. """ EPSILON = 1e-8 __slots__ = ( "h_0", "grid_extract", "pixel_mask", "nb_pixel", "sla", "contour", "interval_min", "interval_min_secondary", "amplitude", "mle", ) def __init__( self, contour, contour_height, data, interval, mle=1, nb_step_min=2, nb_step_to_be_mle=2, ): """ Create amplitude object :param Contours contour: usefull class defined below :param float contour_height: field value of the contour :param array data: grid :param float interval: step between two contours :param int mle: maximum number of local extrema in contour :param int nb_step_min: minimum number of intervals to consider the contour as an eddy :param int nb_step_to_be_mle: number of intervals to be considered as another extrema """ # Height of the contour self.h_0 = contour_height # Step minimal to consider amplitude self.interval_min = interval * nb_step_min self.interval_min_secondary = interval * nb_step_to_be_mle # Indices of all pixels in contour self.contour = contour # Link on original grid (local view) or copy if it's on bound (x_start, x_stop), (y_start, y_stop) = contour.bbox_slice on_bounds = x_start > x_stop if on_bounds: self.grid_extract = ma.concatenate( (data[x_start:, y_start:y_stop], data[:x_stop, y_start:y_stop]) ) if self.grid_extract.mask.size == 1: self.grid_extract = ma.array( self.grid_extract, mask=ones(self.grid_extract.shape, dtype="bool") * self.grid_extract.mask, ) else: self.grid_extract = data[x_start:x_stop, y_start:y_stop] # => maybe replace pixel out of contour by nan? self.pixel_mask = zeros(self.grid_extract.shape, dtype="bool") i_x = contour.pixels_index[0] - x_start if on_bounds: i_x %= data.shape[0] self.pixel_mask[i_x, contour.pixels_index[1] - y_start] = True self.nb_pixel = i_x.shape[0] # Only pixel in contour # FIXME : change sla by ssh as the grid can be adt? self.sla = data[contour.pixels_index] # Amplitude which will be provided self.amplitude = 0 # Maximum local extrema accepted self.mle = mle
[docs] def within_amplitude_limits(self): """Need update""" return self.interval_min <= self.amplitude
[docs] def all_pixels_below_h0(self, level): """ Check CSS11 criterion 1: The SSH values of all of the pixels are below a given SSH threshold for cyclonic eddies. """ # In some cases pixel value may be very close to the contour bounds if self.sla.mask.any() or ((self.sla.data - self.h_0) > self.EPSILON).any(): return False else: # All local extrema index on th box lmi_i, lmi_j = detect_local_minima_( self.grid_extract.data, self.grid_extract.mask, self.pixel_mask, self.mle, 1, ) # After we use grid.data because index are in contour and we check before than no pixel are hide nb = len(lmi_i) if nb == 0: logger.warning( "No extrema found in contour of %d pixels in level %f", self.nb_pixel, level, ) return False elif nb == 1: i, j = lmi_i[0], lmi_j[0] else: # Verify if several extrema are seriously below contour nb_real_extrema = ( (level - self.grid_extract.data[lmi_i, lmi_j]) >= self.interval_min_secondary ).sum() if nb_real_extrema > self.mle: return False index = self.grid_extract.data[lmi_i, lmi_j].argmin() i, j = lmi_i[index], lmi_j[index] self.amplitude = abs(self.grid_extract.data[i, j] - self.h_0) (x_start, _), (y_start, _) = self.contour.bbox_slice i += x_start j += y_start return i, j
[docs] def all_pixels_above_h0(self, level): """ Check CSS11 criterion 1: The SSH values of all of the pixels are above a given SSH threshold for anticyclonic eddies. """ # In some case pixel value must be very near of contour bounds if self.sla.mask.any() or ((self.h_0 - self.sla.data) > self.EPSILON).any(): return False else: # All local extrema index on th box lmi_i, lmi_j = detect_local_minima_( self.grid_extract.data, self.grid_extract.mask, self.pixel_mask, self.mle, -1, ) # After we use grid.data because index are in contour and we check before than no pixel are hide nb = len(lmi_i) if nb == 0: logger.warning( "No extrema found in contour of %d pixels in level %f", self.nb_pixel, level, ) return False elif nb == 1: i, j = lmi_i[0], lmi_j[0] else: # Verify if several extrema are seriously above contour nb_real_extrema = ( (self.grid_extract.data[lmi_i, lmi_j] - level) >= self.interval_min_secondary ).sum() if nb_real_extrema > self.mle: return False index = self.grid_extract.data[lmi_i, lmi_j].argmax() i, j = lmi_i[index], lmi_j[index] self.amplitude = abs(self.grid_extract.data[i, j] - self.h_0) (x_start, _), (y_start, _) = self.contour.bbox_slice i += x_start j += y_start return i, j
[docs]@njit(cache=True) def detect_local_minima_(grid, general_mask, pixel_mask, maximum_local_extremum, sign): """ Take an array and detect the troughs using the local maximum filter. Returns a boolean mask of the troughs (i.e., 1 when the pixel's value is the neighborhood maximum, 0 otherwise) http://stackoverflow.com/questions/3684484/peak-detection-in-a-2d-array/3689710#3689710 """ nb_x, nb_y = grid.shape # init with fake value because numba need to type data in list xs, ys = [0], [0] xs.pop(0) ys.pop(0) g = empty((3, 3)) for i in range(1, nb_x - 1): for j in range(1, nb_y - 1): # Copy footprint for i_ in range(-1, 2): for j_ in range(-1, 2): if general_mask[i_ + i, j_ + j]: g[i_ + 1, j_ + 1] = 2e10 else: g[i_ + 1, j_ + 1] = grid[i_ + i, j_ + j] * sign # if center equal to min # TODO if center and neigboor have same value we have problem, i don't know how if g.min() == (grid[i, j] * sign) and pixel_mask[i, j]: xs.append(i) ys.append(j) nb_extrema = len(xs) # If several extrema we try to separate them if nb_extrema > maximum_local_extremum: # Group nb_group = 1 gr = zeros(nb_extrema, dtype=numba_types.int16) for i0 in range(nb_extrema - 1): for i1 in range(i0 + 1, nb_extrema): if (abs(xs[i0] - xs[i1]) + abs(ys[i0] - ys[i1])) == 1: if gr[i0] == 0 and gr[i1] == 0: # Nobody was link with a known group gr[i0] = nb_group gr[i1] = nb_group nb_group += 1 elif gr[i0] == 0 and gr[i1] != 0: # i1 is link not i0 gr[i0] = gr[i1] elif gr[i1] == 0 and gr[i0] != 0: # i0 is link not i1 gr[i1] = gr[i0] else: # there already linked in two different group # we replace group from i0 with group from i1 gr[gr == gr[i0]] = gr[i1] m = gr != 0 grs = unique(gr[m]) # all non grouped extremum # Numba work around xs_new, ys_new = [0], [0] xs_new.pop(0), ys_new.pop(0) for i in range(nb_extrema): if m[i]: continue xs_new.append(xs[i]) ys_new.append(ys[i]) for gr_ in grs: nb = 0 x_mean = 0 y_mean = 0 # Choose barycentre of group for i in range(nb_extrema): if gr_ == gr[i]: x_mean += xs[i] y_mean += ys[i] nb += 1 x_mean /= nb y_mean /= nb xs_new.append(numba_types.int32(round(x_mean))) ys_new.append(numba_types.int32(round(y_mean))) return xs_new, ys_new return xs, ys
[docs]class Contours(object): """ Class to calculate average geostrophic velocity along a contour, *uavg*, and return index to contour with maximum *uavg* within a series of closed contours. Attributes: contour: A matplotlib contour object of high-pass filtered SSH eddy: A tracklist object holding the SSH data grd: A grid object """ __slots__ = ( "contours", "x_value", "y_value", "contour_index", "level_index", "x_min_per_contour", "y_min_per_contour", "x_max_per_contour", "y_max_per_contour", "nb_pt_per_contour", "nb_contour_per_level", ) DELTA_PREC = 1e-10 DELTA_SUP = 1e-2
[docs] def get_next(self, origin, paths_left, paths_right): for i, path in enumerate(paths_right): if abs(origin.vertices[-1, 1] - path.vertices[0, 1]) < self.DELTA_PREC: if (path.vertices[0, 0] - origin.vertices[-1, 0]) > 1: path.vertices[:, 0] -= 360 origin.vertices = concatenate((origin.vertices, path.vertices)) paths_right.pop(i) if self.check_closing(origin): origin.vertices[-1] = origin.vertices[0] return True return self.get_next(origin, paths_right, paths_left) return False
[docs] def check_closing(self, path): return abs(path.vertices[0, 1] - path.vertices[-1, 1]) < self.DELTA_PREC
[docs] def find_wrapcut_path_and_join(self, x0, x1): poly_solve = 0 for collection in self.contours.collections: paths = collection.get_paths() paths_left = [] paths_right = [] paths_solve = [] paths_out = list() # All path near meridian bounds for path in paths: x_start, x_end = path.vertices[0, 0], path.vertices[-1, 0] if ( abs(x_start - x0) < self.DELTA_PREC and abs(x_end - x0) < self.DELTA_PREC ): paths_left.append(path) elif ( abs(x_start - x1) < self.DELTA_PREC and abs(x_end - x1) < self.DELTA_PREC ): paths_right.append(path) else: paths_out.append(path) if paths_left and paths_right: polys_to_pop_left = list() # Solve simple close (2 segment) for i_left, path_left in enumerate(paths_left): for i_right, path_right in enumerate(paths_right): if ( abs(path_left.vertices[0, 1] - path_right.vertices[-1, 1]) < self.DELTA_PREC and abs( path_left.vertices[-1, 1] - path_right.vertices[0, 1] ) < self.DELTA_PREC ): polys_to_pop_left.append(i_left) path_right.vertices[:, 0] -= 360 path_left.vertices = concatenate( (path_left.vertices, path_right.vertices[1:]) ) path_left.vertices[-1] = path_left.vertices[0] paths_solve.append(path_left) paths_right.pop(i_right) break for i in polys_to_pop_left[::-1]: paths_left.pop(i) # Solve multiple segment: if paths_left and paths_right: while len(paths_left): origin = paths_left.pop(0) if self.get_next(origin, paths_left, paths_right): paths_solve.append(origin) poly_solve += len(paths_solve) paths_out.extend(paths_solve) paths_out.extend(paths_left) paths_out.extend(paths_right) collection._paths = paths_out logger.info("%d contours close over the bounds", poly_solve)
def __init__(self, x, y, z, levels, wrap_x=False, keep_unclose=False): """ c_i : index to contours l_i : index to levels """ logger.info("Start computing iso lines") fig = Figure() ax = fig.add_subplot(111) if wrap_x: logger.debug("wrapping activated to compute contour") x = concatenate((x, x[:1] + 360)) z = ma.concatenate((z, z[:1])) logger.debug("X shape : %s", x.shape) logger.debug("Y shape : %s", y.shape) logger.debug("Z shape : %s", z.shape) logger.info( "Start computing iso lines with %d levels from %f to %f ...", len(levels), levels[0], levels[-1], ) self.contours = ax.contour( x, y, z.T if z.shape != x.shape else z, levels, cmap="rainbow" ) if wrap_x: self.find_wrapcut_path_and_join(x[0], x[-1]) logger.info("Finish computing iso lines") nb_level = 0 nb_contour = 0 nb_pt = 0 almost_closed_contours = 0 closed_contours = 0 # Count level and contour for i, collection in enumerate(self.contours.collections): collection.get_nearest_path_bbox_contain_pt = ( lambda x, y, i=i: self.get_index_nearest_path_bbox_contain_pt(i, x, y) ) nb_level += 1 keep_path = list() for contour in collection.get_paths(): # Contour with less vertices than 4 are popped if contour.vertices.shape[0] < 4: continue # Remove unclosed path d_closed = ( (contour.vertices[0, 0] - contour.vertices[-1, 0]) ** 2 + (contour.vertices[0, 1] - contour.vertices[-1, 1]) ** 2 ) ** 0.5 if d_closed > self.DELTA_SUP and not keep_unclose: continue elif d_closed != 0 and d_closed <= self.DELTA_SUP: # Repair almost closed contour if d_closed > self.DELTA_PREC: almost_closed_contours += 1 else: closed_contours += 1 contour.vertices[-1] = contour.vertices[0] x_min, y_min = contour.vertices.min(axis=0) x_max, y_max = contour.vertices.max(axis=0) ptp_min = self.DELTA_PREC * 100 if abs(x_min - x_max) < ptp_min or abs(y_min - y_max) < ptp_min: continue # Store to use latter contour.xmin = x_min contour.xmax = x_max contour.ymin = y_min contour.ymax = y_max keep_path.append(contour) collection._paths = keep_path for contour in collection.get_paths(): contour.used = False contour.reject = 0 nb_contour += 1 nb_pt += contour.vertices.shape[0] logger.info( "Repair %d closed contours and %d almost closed contours / %d contours", closed_contours, almost_closed_contours, nb_contour, ) # Type for coordinates coord_dtype = contour.vertices.dtype # Array declaration self.x_value = empty(nb_pt, dtype=coord_dtype) self.y_value = empty(nb_pt, dtype=coord_dtype) self.level_index = empty(nb_level, dtype="u4") self.nb_contour_per_level = empty(nb_level, dtype="u4") self.nb_pt_per_contour = empty(nb_contour, dtype="u4") self.x_min_per_contour = empty(nb_contour, dtype=coord_dtype) self.x_max_per_contour = empty(nb_contour, dtype=coord_dtype) self.y_min_per_contour = empty(nb_contour, dtype=coord_dtype) self.y_max_per_contour = empty(nb_contour, dtype=coord_dtype) # Filled array i_pt = 0 i_c = 0 i_l = 0 for collection in self.contours.collections: self.level_index[i_l] = i_c for contour in collection.get_paths(): nb_pt = contour.vertices.shape[0] # Copy pt self.x_value[i_pt : i_pt + nb_pt] = contour.vertices[:, 0] self.y_value[i_pt : i_pt + nb_pt] = contour.vertices[:, 1] # Set bbox self.x_min_per_contour[i_c], self.y_min_per_contour[i_c] = ( contour.xmin, contour.ymin, ) self.x_max_per_contour[i_c], self.y_max_per_contour[i_c] = ( contour.xmax, contour.ymax, ) # Count pt self.nb_pt_per_contour[i_c] = nb_pt i_pt += nb_pt i_c += 1 i_l += 1 self.contour_index = array( self.nb_pt_per_contour.cumsum() - self.nb_pt_per_contour, dtype="u4" ) self.level_index[0] = 0 self.nb_contour_per_level[:-1] = self.level_index[1:] - self.level_index[:-1] self.nb_contour_per_level[-1] = nb_contour - self.level_index[-1]
[docs] def iter(self, start=None, stop=None, step=None): return self.contours.collections[slice(start, stop, step)]
@property def cvalues(self): return self.contours.cvalues @property def levels(self): return self.contours.levels
[docs] def get_index_nearest_path_bbox_contain_pt(self, level, xpt, ypt): """Get index from the nearest path in the level, if the bbox of the path contain pt overhead of python is huge with numba, cython little bit best?? """ index = index_from_nearest_path_with_pt_in_bbox_( level, self.level_index, self.nb_contour_per_level, self.nb_pt_per_contour, self.contour_index, self.x_value, self.y_value, self.x_min_per_contour, self.y_min_per_contour, self.x_max_per_contour, self.y_max_per_contour, xpt, ypt, ) if index == -1: return None else: return self.contours.collections[level]._paths[index]
[docs] def display( self, ax, step=1, only_used=False, only_unused=False, only_contain_eddies=False, display_criterion=False, field=None, bins=None, cmap="Spectral_r", **kwargs ): """ Display contour :param matplotlib.axes.Axes ax: :param int step: display only contour every step :param bool only_used: display only contour used in an eddy :param bool only_unused: display only contour unused in an eddy :param bool only_contain_eddies: display only contour which enclosed an eddiy :param bool display_criterion: display only unused contour with criterion color 0. - Accepted (green) 1. - Reject for shape error (red) 2. - Masked value in contour (blue) 3. - Under or over pixel limit bound (black) 4. - Amplitude criterion (yellow) :param str field: Must be 'shape_error', 'x', 'y' or 'radius'. If defined display_criterion is not use. bins argument must be defined :param array bins: bins used to colorize contour :param str cmap: Name of cmap for field display :param dict kwargs: look at :py:meth:`matplotlib.collections.LineCollection` .. minigallery:: py_eddy_tracker.Contours.display """ from matplotlib.collections import LineCollection overide_color = display_criterion or field is not None if display_criterion: paths = {0: list(), 1: list(), 2: list(), 3: list(), 4: list()} elif field is not None: paths = dict() for i in range(len(bins)): paths[i] = list() paths[i + 1] = list() for j, collection in enumerate(self.contours.collections[::step]): if not overide_color: paths = list() for i in collection.get_paths(): if only_used and not i.used: continue elif only_unused and i.used: continue elif only_contain_eddies and not i.contain_eddies: continue if display_criterion: paths[i.reject].append(i.vertices) elif field is not None: x, y, radius, shape_error = i.fit_circle() if field == "shape_error": i_ = digitize(shape_error, bins) elif field == "radius": i_ = digitize(radius, bins) elif field == "x": i_ = digitize(x, bins) elif field == "y": i_ = digitize(y, bins) paths[i_].append(i.vertices) else: paths.append(i.vertices) local_kwargs = kwargs.copy() if "color" not in kwargs: local_kwargs["color"] = collection.get_edgecolor() local_kwargs.pop("label", None) elif j != 0: local_kwargs.pop("label", None) if not overide_color: ax.add_collection(LineCollection(paths, **local_kwargs)) if display_criterion: colors = { 0: "limegreen", 1: "red", 2: "mediumblue", 3: "black", 4: "gold", } for k, v in paths.items(): local_kwargs = kwargs.copy() local_kwargs.pop("label", None) local_kwargs["colors"] = colors[k] ax.add_collection(LineCollection(v, **local_kwargs)) elif field is not None: nb_bins = len(bins) - 1 cmap = get_cmap(cmap, lut=nb_bins) for k, v in paths.items(): local_kwargs = kwargs.copy() local_kwargs.pop("label", None) if k == 0: local_kwargs["colors"] = cmap(0.0) elif k > nb_bins: local_kwargs["colors"] = cmap(1.0) else: local_kwargs["colors"] = cmap((k - 1.0) / nb_bins) mappable = LineCollection(v, **local_kwargs) ax.add_collection(mappable) mappable.cmap = cmap mappable.norm = Normalize(vmin=bins[0], vmax=bins[-1]) # TODO : need to create an object with all collections return mappable else: if hasattr(self.contours, "_mins"): ax.update_datalim([self.contours._mins, self.contours._maxs]) ax.autoscale_view()
[docs] def label_contour_unused_which_contain_eddies(self, eddies): """Select contour containing several eddies""" if eddies.sign_type == 1: # anticyclonic sl = slice(None, -1) cor = 1 else: # cyclonic sl = slice(1, None) cor = -1 # On each level for j, collection in enumerate(self.contours.collections[sl]): # get next height contour_height = self.contours.cvalues[j + cor] # On each contour for i in collection.get_paths(): i.contain_eddies = False if i.used: continue nb = 0 # try with each eddy for eddy in eddies: if abs(eddy["height_external_contour"] - contour_height) > 1e-8: continue # If eddy center in contour wn = winding_number_poly( eddy["lon_max"], eddy["lat_max"], i.vertices ) if wn != 0: # Count nb += 1 if nb > 1: i.contain_eddies = True
[docs]@njit(cache=True, fastmath=True) def index_from_nearest_path_with_pt_in_bbox_( level_index, l_i, nb_c_per_l, nb_pt_per_c, indices_of_first_pts, x_value, y_value, x_min_per_c, y_min_per_c, x_max_per_c, y_max_per_c, xpt, ypt, ): """Get index from nearest path in edge bbox contain pt""" # Nb contour in level if nb_c_per_l[level_index] == 0: return -1 # First contour in level i_start_c = l_i[level_index] # First contour of the next level i_end_c = i_start_c + nb_c_per_l[level_index] # Flag to check if we iterate find_contour = 0 # We select the first pt of the first contour in the level # to initialize dist i_ref = i_start_c i_start_pt = indices_of_first_pts[i_start_c] dist_ref = (x_value[i_start_pt] - xpt) ** 2 + (y_value[i_start_pt] - ypt) ** 2 # We iterate over contour in the same level for i_elt_c in range(i_start_c, i_end_c): # if bbox of contour doesn't contain pt, we skip this contour if y_min_per_c[i_elt_c] > ypt: continue if y_max_per_c[i_elt_c] < ypt: continue x_min = x_min_per_c[i_elt_c] xpt_ = (xpt - x_min) % 360 + x_min if x_min > xpt_: continue if x_max_per_c[i_elt_c] < xpt_: continue # Indice of first pt of contour i_start_pt = indices_of_first_pts[i_elt_c] # Indice of first pt of the next contour i_end_pt = i_start_pt + nb_pt_per_c[i_elt_c] # We set flag to true, because we check contour find_contour = 1 # We do iteration on pt to check dist, if it's inferior we store # index of contour for i_elt_pt in range(i_start_pt, i_end_pt): d_x = x_value[i_elt_pt] - xpt_ if abs(d_x) > 180: d_x = (d_x + 180) % 360 - 180 dist = d_x**2 + (y_value[i_elt_pt] - ypt) ** 2 if dist < dist_ref: dist_ref = dist i_ref = i_elt_c # No iteration on contour, we return no index of contour if find_contour == 0: return int_(-1) # We return index of contour, for the specific level return int_(i_ref - i_start_c)