Source code for py_eddy_tracker.generic

# -*- coding: utf-8 -*-
"""
Tool method which use mostly numba
"""

from numba import njit, prange, types as numba_types
from numpy import (
    absolute,
    arcsin,
    arctan2,
    bool_,
    cos,
    empty,
    floor,
    histogram,
    interp,
    isnan,
    linspace,
    nan,
    ones,
    pi,
    radians,
    sin,
    where,
    zeros,
)


[docs]@njit(cache=True) def count_consecutive(mask): """ Count consecutive events every False flag count restart :param array[bool] mask: event to count :return: count when consecutive event :rtype: array """ count = 0 output = zeros(mask.shape, dtype=numba_types.int_) for i in range(mask.shape[0]): if not mask[i]: count = 0 continue count += 1 output[i] = count return output
[docs]@njit(cache=True) def reverse_index(index, nb): """ Compute a list of indices, which are not in index. :param array index: index of group which will be set to False :param array nb: Count for each group :return: mask of value selected :rtype: array """ m = ones(nb, dtype=numba_types.bool_) for i in index: m[i] = False return where(m)[0]
[docs]@njit(cache=True) def build_index(groups): """We expect that variable is monotonous, and return index for each step change. :param array groups: array that contains groups to be separated :return: (first_index of each group, last_index of each group, value to shift groups) :rtype: (array, array, int) :Example: >>> build_index(array((1, 1, 3, 4, 4))) (array([0, 2, 2, 3]), array([2, 2, 3, 5]), 1) """ i0, i1 = groups.min(), groups.max() amplitude = i1 - i0 + 1 # Index of first observation for each group first_index = zeros(amplitude, dtype=numba_types.int_) for i, group in enumerate(groups[:-1]): # Get next value to compare next_group = groups[i + 1] # if different we need to set index for all groups between the 2 values if group != next_group: first_index[group - i0 + 1 : next_group - i0 + 1] = i + 1 last_index = zeros(amplitude, dtype=numba_types.int_) last_index[:-1] = first_index[1:] last_index[-1] = len(groups) return first_index, last_index, i0
[docs]@njit(cache=True) def hist_numba(x, bins): """Call numba histogram to speed up.""" return histogram(x, bins)
[docs]@njit(cache=True, fastmath=True, parallel=False) def distance_grid(lon0, lat0, lon1, lat1): """ Get distance for every couple of points. :param array lon0: :param array lat0: :param array lon1: :param array lat1: :return: nan value for far away points, and km for other :rtype: array """ nb_0 = lon0.shape[0] nb_1 = lon1.shape[0] dist = empty((nb_0, nb_1)) D2R = pi / 180.0 for i in prange(nb_0): for j in prange(nb_1): dlat = absolute(lat1[j] - lat0[i]) if dlat > 15: dist[i, j] = nan continue dlon = absolute(lon1[j] - lon0[i]) if dlon > 180: dlon = absolute((dlon + 180) % 360 - 180) if dlon > 20: dist[i, j] = nan continue sin_dlat = sin((dlat) * 0.5 * D2R) sin_dlon = sin((dlon) * 0.5 * D2R) cos_lat1 = cos(lat0[i] * D2R) cos_lat2 = cos(lat1[j] * D2R) a_val = sin_dlon**2 * cos_lat1 * cos_lat2 + sin_dlat**2 dist[i, j] = 6370.997 * 2 * arctan2(a_val**0.5, (1 - a_val) ** 0.5) return dist
[docs]@njit(cache=True, fastmath=True) def distance(lon0, lat0, lon1, lat1): """ Compute distance between points from each line. :param float lon0: :param float lat0: :param float lon1: :param float lat1: :return: distance (in m) :rtype: array """ D2R = pi / 180.0 sin_dlat = sin((lat1 - lat0) * 0.5 * D2R) sin_dlon = sin((lon1 - lon0) * 0.5 * D2R) cos_lat1 = cos(lat0 * D2R) cos_lat2 = cos(lat1 * D2R) a_val = sin_dlon**2 * cos_lat1 * cos_lat2 + sin_dlat**2 return 6370997.0 * 2 * arctan2(a_val**0.5, (1 - a_val) ** 0.5)
[docs]@njit(cache=True) def cumsum_by_track(field, track): """ Cumsum by track. :param array field: data to sum :pram array(int) track: id of trajectories to separate data :return: cumsum with a reset at each start of track :rtype: array """ tr_previous = 0 d_cum = 0 cumsum_array = empty(track.shape, dtype=field.dtype) for i in range(field.shape[0]): tr = track[i] if tr != tr_previous: d_cum = 0 d_cum += field[i] cumsum_array[i] = d_cum tr_previous = tr return cumsum_array
[docs]@njit(cache=True, fastmath=True) def interp2d_geo(x_g, y_g, z_g, m_g, x, y, nearest=False): """ For geographic grid, test of cicularity. :param array x_g: coordinates of grid :param array y_g: coordinates of grid :param array z_g: Grid value :param array m_g: Boolean grid, True if value is masked :param array x: coordinate where interpolate z :param array y: coordinate where interpolate z :param bool nearest: if True we will take nearest pixel :return: z interpolated :rtype: array """ if nearest: return interp2d_nearest(x_g, y_g, z_g, x, y) else: return interp2d_bilinear(x_g, y_g, z_g, m_g, x, y)
[docs]@njit(cache=True, fastmath=True) def interp2d_nearest(x_g, y_g, z_g, x, y): """ Nearest interpolation with wrapping if circular :param array x_g: coordinates of grid :param array y_g: coordinates of grid :param array z_g: Grid value :param array x: coordinate where interpolate z :param array y: coordinate where interpolate z :return: z interpolated :rtype: array """ x_ref = x_g[0] y_ref = y_g[0] x_step = x_g[1] - x_ref y_step = y_g[1] - y_ref nb_x = x_g.shape[0] nb_y = y_g.shape[0] is_circular = abs(x_g[-1] % 360 - (x_g[0] - x_step) % 360) < 1e-5 z = empty(x.shape, dtype=z_g.dtype) for i in prange(x.size): i0 = int(round((x[i] - x_ref) / x_step)) j0 = int(round((y[i] - y_ref) / y_step)) if is_circular: i0 %= nb_x if i0 >= nb_x or i0 < 0 or j0 < 0 or j0 >= nb_y: z[i] = nan continue z[i] = z_g[i0, j0] return z
[docs]@njit(cache=True, fastmath=True) def interp2d_bilinear(x_g, y_g, z_g, m_g, x, y): """ Bilinear interpolation with wrapping if circular :param array x_g: coordinates of grid :param array y_g: coordinates of grid :param array z_g: Grid value :param array m_g: Boolean grid, True if value is masked :param array x: coordinate where interpolate z :param array y: coordinate where interpolate z :return: z interpolated :rtype: array """ x_ref = x_g[0] y_ref = y_g[0] x_step = x_g[1] - x_ref y_step = y_g[1] - y_ref nb_x = x_g.shape[0] nb_y = y_g.shape[0] is_circular = abs(x_g[-1] % 360 - (x_g[0] - x_step) % 360) < 1e-5 # Indexes that should never exist i0_old, j0_old, masked = -100000000, -10000000, False z = empty(x.shape, dtype=z_g.dtype) for i in prange(x.size): x_ = (x[i] - x_ref) / x_step y_ = (y[i] - y_ref) / y_step i0 = int(floor(x_)) # To keep original values if wrapping applied to compute xd i0_ = i0 j0 = int(floor(y_)) # corners are the same need only a new xd and yd if i0 != i0_old or j0 != j0_old: i1 = i0 + 1 j1 = j0 + 1 if is_circular: i0 %= nb_x i1 %= nb_x if i1 >= nb_x or i0 < 0 or j0 < 0 or j1 >= nb_y: masked = True else: masked = False if not masked: if m_g[i0, j0] or m_g[i0, j1] or m_g[i1, j0] or m_g[i1, j1]: masked = True else: z00, z01, z10, z11 = ( z_g[i0, j0], z_g[i0, j1], z_g[i1, j0], z_g[i1, j1], ) masked = False # Need to be stored only on change i0_old, j0_old = i0, j0 if masked: z[i] = nan else: xd = x_ - i0_ yd = y_ - j0 z[i] = (z00 * (1 - xd) + (z10 * xd)) * (1 - yd) + ( z01 * (1 - xd) + z11 * xd ) * yd return z
[docs]@njit(cache=True, fastmath=True) def uniform_resample(x_val, y_val, num_fac=2, fixed_size=None): """ Resample contours to have (nearly) equal spacing. :param array_like x_val: input x contour coordinates :param array_like y_val: input y contour coordinates :param int num_fac: factor to increase lengths of output coordinates :param int,None fixed_size: if defined, will be used to set sampling """ nb = x_val.shape[0] # Get distances dist = empty(nb) dist[0] = 0 dist[1:] = distance(x_val[:-1], y_val[:-1], x_val[1:], y_val[1:]) # To be still monotonous (dist is store in m) dist[1:][dist[1:] < 1e-3] = 1e-3 dist = dist.cumsum() # Get uniform distances if fixed_size is None: fixed_size = dist.size * num_fac d_uniform = linspace(0, dist[-1], fixed_size) x_new = interp(d_uniform, dist, x_val) y_new = interp(d_uniform, dist, y_val) return x_new, y_new
[docs]@njit(cache=True) def flatten_line_matrix(l_matrix): """ Flat matrix and add on between each line. :param l_matrix: matrix of position :return: array with nan between line """ nb_line, sampling = l_matrix.shape final_size = (nb_line - 1) + nb_line * sampling empty_dataset = False if final_size < 1: empty_dataset = True final_size = 1 out = empty(final_size, dtype=l_matrix.dtype) if empty_dataset: out[:] = nan return out inc = 0 for i in range(nb_line): for j in range(sampling): out[inc] = l_matrix[i, j] inc += 1 out[inc] = nan inc += 1 return out
[docs]@njit(cache=True) def simplify(x, y, precision=0.1): """ Will remove all middle/end points closer than precision. :param array x: :param array y: :param float precision: if two points have distance inferior to precision we remove next point :return: (x,y) :rtype: (array,array) """ precision2 = precision**2 nb = x.shape[0] # will be True for kept values mask = ones(nb, dtype=bool_) for j in range(0, nb): x_previous, y_previous = x[j], y[j] if isnan(x_previous) or isnan(y_previous): mask[j] = False continue break # Only nan if j == (nb - 1): return zeros(0, dtype=x.dtype), zeros(0, dtype=x.dtype) last_nan = False for i in range(j + 1, nb): x_, y_ = x[i], y[i] if isnan(x_) or isnan(y_): if last_nan: mask[i] = False else: last_nan = True continue last_nan = False d_x = x_ - x_previous if d_x > precision: x_previous, y_previous = x_, y_ continue d_y = y_ - y_previous if d_y > precision: x_previous, y_previous = x_, y_ continue d2 = d_x**2 + d_y**2 if d2 > precision2: x_previous, y_previous = x_, y_ continue mask[i] = False new_nb = mask.sum() new_x, new_y = empty(new_nb, dtype=x.dtype), empty(new_nb, dtype=y.dtype) j = 0 for i in range(nb): if mask[i]: new_x[j], new_y[j] = x[i], y[i] j += 1 return new_x, new_y
[docs]@njit(cache=True) def split_line(x, y, i): """ Split x and y at each i change. :param x: array :param y: array :param i: array of int at each i change, we cut x, y :return: x and y separated by nan at each i jump """ nb_jump = len(where(i[1:] - i[:-1] != 0)[0]) nb_value = x.shape[0] final_size = (nb_jump - 1) + nb_value new_x = empty(final_size, dtype=x.dtype) new_y = empty(final_size, dtype=y.dtype) new_j = 0 for j in range(nb_value): new_x[new_j] = x[j] new_y[new_j] = y[j] new_j += 1 if j < (nb_value - 1) and i[j] != i[j + 1]: new_x[new_j] = nan new_y[new_j] = nan new_j += 1 return new_x, new_y
[docs]@njit(cache=True) def wrap_longitude(x, y, ref, cut=False): """ Will wrap contiguous longitude with reference as western boundary. :param array x: :param array y: :param float ref: longitude of reference, all the new values will be between ref and ref + 360 :param bool cut: if True line will be cut at the bounds :return: lon,lat :rtype: (array,array) """ if cut: indexs = list() nb = x.shape[0] new_x_previous = (x[0] - ref) % 360 + ref x_previous = x[0] for i in range(1, nb): x_ = x[i] new_x = (x_ - ref) % 360 + ref if not isnan(x_) and not isnan(x_previous): d_new = new_x - new_x_previous d = x_ - x_previous if abs(d - d_new) > 1e-5: indexs.append(i) x_previous, new_x_previous = x_, new_x nb_indexs = len(indexs) new_size = nb + nb_indexs * 3 out_x = empty(new_size, dtype=x.dtype) out_y = empty(new_size, dtype=y.dtype) i_ = 0 j = 0 for i in range(nb): if j < nb_indexs and i == indexs[j]: j += 1 # FIXME need check cor = 360 if x[i - 1] > x[i] else -360 out_x[i + i_] = (x[i] - ref) % 360 + ref - cor out_y[i + i_] = y[i] out_x[i + i_ + 1] = nan out_y[i + i_ + 1] = nan out_x[i + i_ + 2] = (x[i - 1] - ref) % 360 + ref + cor out_y[i + i_ + 2] = y[i - 1] i_ += 3 out_x[i + i_] = (x[i] - ref) % 360 + ref out_y[i + i_] = y[i] return out_x, out_y else: nb = x.shape[0] out = empty(nb, dtype=x.dtype) for i in range(nb): out[i] = (x[i] - ref) % 360 + ref return out, y
[docs]@njit(cache=True, fastmath=True) def coordinates_to_local(lon, lat, lon0, lat0): """ Take latlong coordinates to transform in local coordinates (in m). :param array x: coordinates to transform :param array y: coordinates to transform :param float lon0: longitude of local reference :param float lat0: latitude of local reference :return: x,y :retype: (array, array) """ D2R = pi / 180.0 R = 6370997 dlon = (lon - lon0) * D2R sin_dlat = sin((lat - lat0) * 0.5 * D2R) sin_dlon = sin(dlon * 0.5) cos_lat0 = cos(lat0 * D2R) cos_lat = cos(lat * D2R) a_val = sin_dlon**2 * cos_lat0 * cos_lat + sin_dlat**2 module = R * 2 * arctan2(a_val**0.5, (1 - a_val) ** 0.5) azimuth = pi / 2 - arctan2( cos_lat * sin(dlon), cos_lat0 * sin(lat * D2R) - sin(lat0 * D2R) * cos_lat * cos(dlon), ) return module * cos(azimuth), module * sin(azimuth)
[docs]@njit(cache=True, fastmath=True) def local_to_coordinates(x, y, lon0, lat0): """ Take local coordinates (in m) to transform to latlong. :param array x: coordinates to transform :param array y: coordinates to transform :param float lon0: longitude of local reference :param float lat0: latitude of local reference :return: lon,lat :retype: (array, array) """ D2R = pi / 180.0 R = 6370997 d = (x**2 + y**2) ** 0.5 / R a = -(arctan2(y, x) - pi / 2) lat = arcsin(sin(lat0 * D2R) * cos(d) + cos(lat0 * D2R) * sin(d) * cos(a)) lon = ( lon0 + arctan2( sin(a) * sin(d) * cos(lat0 * D2R), cos(d) - sin(lat0 * D2R) * sin(lat) ) / D2R ) return lon, lat / D2R
[docs]@njit(cache=True, fastmath=True) def nearest_grd_indice(x, y, x0, y0, xstep, ystep): """ Get nearest grid index from a position. :param x: longitude :param y: latitude :param float x0: first grid longitude :param float y0: first grid latitude :param float xstep: step between two longitude :param float ystep: step between two latitude """ return ( numba_types.int32(round(((x - x0[0]) % 360.0) / xstep)), numba_types.int32(round((y - y0[0]) / ystep)), )
[docs]@njit(cache=True) def bbox_indice_regular(vertices, x0, y0, xstep, ystep, N, circular, x_size): """ Get bbox index of a contour in a regular grid. :param vertices: vertice of contour :param float x0: first grid longitude :param float y0: first grid latitude :param float xstep: step between two longitude :param float ystep: step between two latitude :param int N: shift of index to enlarge window :param bool circular: To know if grid is wrappable :param int x_size: Number of longitude """ lon, lat = vertices[:, 0], vertices[:, 1] lon_min, lon_max = lon.min(), lon.max() lat_min, lat_max = lat.min(), lat.max() i_x0, i_y0 = nearest_grd_indice(lon_min, lat_min, x0, y0, xstep, ystep) i_x1, i_y1 = nearest_grd_indice(lon_max, lat_max, x0, y0, xstep, ystep) if circular: slice_x = (i_x0 - N) % x_size, (i_x1 + N + 1) % x_size else: slice_x = max(i_x0 - N, 0), i_x1 + N + 1 slice_y = i_y0 - N, i_y1 + N + 1 return slice_x, slice_y
[docs]def build_circle(x0, y0, r): """ Build circle from center coordinates. :param float x0: center coordinate :param float y0: center coordinate :param float r: radius i meter :return: x,y :rtype: (array,array) """ angle = radians(linspace(0, 360, 50)) x_norm, y_norm = cos(angle), sin(angle) return x_norm * r + x0, y_norm * r + y0
[docs]@njit(cache=True) def window_index(x, x0, half_window=1): """ Give for a fixed half_window each start and end index for each x0, in an unsorted array. :param array x: array of value :param array x0: array of window center :param float half_window: half window """ # Sort array, bounds will be sort also i_ordered = x.argsort() nb_x, nb_pt = x.size, x0.size first_index = empty(nb_pt, dtype=i_ordered.dtype) last_index = empty(nb_pt, dtype=i_ordered.dtype) # First bound to find j_min, j_max = 0, 0 x_min = x0[j_min] - half_window x_max = x0[j_max] + half_window # We iterate on ordered x for i, i_x in enumerate(i_ordered): x_ = x[i_x] # if x bigger than x_min , we found bound and search next one while x_ > x_min and j_min < nb_pt: first_index[j_min] = i j_min += 1 x_min = x0[j_min] - half_window # if x bigger than x_max , we found bound and search next one while x_ > x_max and j_max < nb_pt: last_index[j_max] = i j_max += 1 x_max = x0[j_max] + half_window if j_max == nb_pt: break for i in range(j_min, nb_pt): first_index[i] = nb_x for i in range(j_max, nb_pt): last_index[i] = nb_x return i_ordered, first_index, last_index