# -*- coding: utf-8 -*-
"""
Tool method which use mostly numba
"""
from numba import njit, prange, types as numba_types
from numpy import (
absolute,
arcsin,
arctan2,
bool_,
cos,
empty,
floor,
histogram,
interp,
isnan,
linspace,
nan,
ones,
pi,
radians,
sin,
where,
zeros,
)
[docs]@njit(cache=True)
def count_consecutive(mask):
"""
Count consecutive events every False flag count restart
:param array[bool] mask: event to count
:return: count when consecutive event
:rtype: array
"""
count = 0
output = zeros(mask.shape, dtype=numba_types.int_)
for i in range(mask.shape[0]):
if not mask[i]:
count = 0
continue
count += 1
output[i] = count
return output
[docs]@njit(cache=True)
def reverse_index(index, nb):
"""
Compute a list of indices, which are not in index.
:param array index: index of group which will be set to False
:param array nb: Count for each group
:return: mask of value selected
:rtype: array
"""
m = ones(nb, dtype=numba_types.bool_)
for i in index:
m[i] = False
return where(m)[0]
[docs]@njit(cache=True)
def build_index(groups):
"""We expect that variable is monotonous, and return index for each step change.
:param array groups: array that contains groups to be separated
:return: (first_index of each group, last_index of each group, value to shift groups)
:rtype: (array, array, int)
:Example:
>>> build_index(array((1, 1, 3, 4, 4)))
(array([0, 2, 2, 3]), array([2, 2, 3, 5]), 1)
"""
i0, i1 = groups.min(), groups.max()
amplitude = i1 - i0 + 1
# Index of first observation for each group
first_index = zeros(amplitude, dtype=numba_types.int_)
for i, group in enumerate(groups[:-1]):
# Get next value to compare
next_group = groups[i + 1]
# if different we need to set index for all groups between the 2 values
if group != next_group:
first_index[group - i0 + 1 : next_group - i0 + 1] = i + 1
last_index = zeros(amplitude, dtype=numba_types.int_)
last_index[:-1] = first_index[1:]
last_index[-1] = len(groups)
return first_index, last_index, i0
[docs]@njit(cache=True)
def hist_numba(x, bins):
"""Call numba histogram to speed up."""
return histogram(x, bins)
[docs]@njit(cache=True, fastmath=True, parallel=False)
def distance_grid(lon0, lat0, lon1, lat1):
"""
Get distance for every couple of points.
:param array lon0:
:param array lat0:
:param array lon1:
:param array lat1:
:return: nan value for far away points, and km for other
:rtype: array
"""
nb_0 = lon0.shape[0]
nb_1 = lon1.shape[0]
dist = empty((nb_0, nb_1))
D2R = pi / 180.0
for i in prange(nb_0):
for j in prange(nb_1):
dlat = absolute(lat1[j] - lat0[i])
if dlat > 15:
dist[i, j] = nan
continue
dlon = absolute(lon1[j] - lon0[i])
if dlon > 180:
dlon = absolute((dlon + 180) % 360 - 180)
if dlon > 20:
dist[i, j] = nan
continue
sin_dlat = sin((dlat) * 0.5 * D2R)
sin_dlon = sin((dlon) * 0.5 * D2R)
cos_lat1 = cos(lat0[i] * D2R)
cos_lat2 = cos(lat1[j] * D2R)
a_val = sin_dlon**2 * cos_lat1 * cos_lat2 + sin_dlat**2
dist[i, j] = 6370.997 * 2 * arctan2(a_val**0.5, (1 - a_val) ** 0.5)
return dist
[docs]@njit(cache=True, fastmath=True)
def distance(lon0, lat0, lon1, lat1):
"""
Compute distance between points from each line.
:param float lon0:
:param float lat0:
:param float lon1:
:param float lat1:
:return: distance (in m)
:rtype: array
"""
D2R = pi / 180.0
sin_dlat = sin((lat1 - lat0) * 0.5 * D2R)
sin_dlon = sin((lon1 - lon0) * 0.5 * D2R)
cos_lat1 = cos(lat0 * D2R)
cos_lat2 = cos(lat1 * D2R)
a_val = sin_dlon**2 * cos_lat1 * cos_lat2 + sin_dlat**2
return 6370997.0 * 2 * arctan2(a_val**0.5, (1 - a_val) ** 0.5)
[docs]@njit(cache=True)
def cumsum_by_track(field, track):
"""
Cumsum by track.
:param array field: data to sum
:pram array(int) track: id of trajectories to separate data
:return: cumsum with a reset at each start of track
:rtype: array
"""
tr_previous = 0
d_cum = 0
cumsum_array = empty(track.shape, dtype=field.dtype)
for i in range(field.shape[0]):
tr = track[i]
if tr != tr_previous:
d_cum = 0
d_cum += field[i]
cumsum_array[i] = d_cum
tr_previous = tr
return cumsum_array
[docs]@njit(cache=True, fastmath=True)
def interp2d_geo(x_g, y_g, z_g, m_g, x, y, nearest=False):
"""
For geographic grid, test of cicularity.
:param array x_g: coordinates of grid
:param array y_g: coordinates of grid
:param array z_g: Grid value
:param array m_g: Boolean grid, True if value is masked
:param array x: coordinate where interpolate z
:param array y: coordinate where interpolate z
:param bool nearest: if True we will take nearest pixel
:return: z interpolated
:rtype: array
"""
if nearest:
return interp2d_nearest(x_g, y_g, z_g, x, y)
else:
return interp2d_bilinear(x_g, y_g, z_g, m_g, x, y)
[docs]@njit(cache=True, fastmath=True)
def interp2d_nearest(x_g, y_g, z_g, x, y):
"""
Nearest interpolation with wrapping if circular
:param array x_g: coordinates of grid
:param array y_g: coordinates of grid
:param array z_g: Grid value
:param array x: coordinate where interpolate z
:param array y: coordinate where interpolate z
:return: z interpolated
:rtype: array
"""
x_ref = x_g[0]
y_ref = y_g[0]
x_step = x_g[1] - x_ref
y_step = y_g[1] - y_ref
nb_x = x_g.shape[0]
nb_y = y_g.shape[0]
is_circular = abs(x_g[-1] % 360 - (x_g[0] - x_step) % 360) < 1e-5
z = empty(x.shape, dtype=z_g.dtype)
for i in prange(x.size):
i0 = int(round((x[i] - x_ref) / x_step))
j0 = int(round((y[i] - y_ref) / y_step))
if is_circular:
i0 %= nb_x
if i0 >= nb_x or i0 < 0 or j0 < 0 or j0 >= nb_y:
z[i] = nan
continue
z[i] = z_g[i0, j0]
return z
[docs]@njit(cache=True, fastmath=True)
def interp2d_bilinear(x_g, y_g, z_g, m_g, x, y):
"""
Bilinear interpolation with wrapping if circular
:param array x_g: coordinates of grid
:param array y_g: coordinates of grid
:param array z_g: Grid value
:param array m_g: Boolean grid, True if value is masked
:param array x: coordinate where interpolate z
:param array y: coordinate where interpolate z
:return: z interpolated
:rtype: array
"""
x_ref = x_g[0]
y_ref = y_g[0]
x_step = x_g[1] - x_ref
y_step = y_g[1] - y_ref
nb_x = x_g.shape[0]
nb_y = y_g.shape[0]
is_circular = abs(x_g[-1] % 360 - (x_g[0] - x_step) % 360) < 1e-5
# Indexes that should never exist
i0_old, j0_old, masked = -100000000, -10000000, False
z = empty(x.shape, dtype=z_g.dtype)
for i in prange(x.size):
x_ = (x[i] - x_ref) / x_step
y_ = (y[i] - y_ref) / y_step
i0 = int(floor(x_))
# To keep original values if wrapping applied to compute xd
i0_ = i0
j0 = int(floor(y_))
# corners are the same need only a new xd and yd
if i0 != i0_old or j0 != j0_old:
i1 = i0 + 1
j1 = j0 + 1
if is_circular:
i0 %= nb_x
i1 %= nb_x
if i1 >= nb_x or i0 < 0 or j0 < 0 or j1 >= nb_y:
masked = True
else:
masked = False
if not masked:
if m_g[i0, j0] or m_g[i0, j1] or m_g[i1, j0] or m_g[i1, j1]:
masked = True
else:
z00, z01, z10, z11 = (
z_g[i0, j0],
z_g[i0, j1],
z_g[i1, j0],
z_g[i1, j1],
)
masked = False
# Need to be stored only on change
i0_old, j0_old = i0, j0
if masked:
z[i] = nan
else:
xd = x_ - i0_
yd = y_ - j0
z[i] = (z00 * (1 - xd) + (z10 * xd)) * (1 - yd) + (
z01 * (1 - xd) + z11 * xd
) * yd
return z
[docs]@njit(cache=True)
def flatten_line_matrix(l_matrix):
"""
Flat matrix and add on between each line.
:param l_matrix: matrix of position
:return: array with nan between line
"""
nb_line, sampling = l_matrix.shape
final_size = (nb_line - 1) + nb_line * sampling
empty_dataset = False
if final_size < 1:
empty_dataset = True
final_size = 1
out = empty(final_size, dtype=l_matrix.dtype)
if empty_dataset:
out[:] = nan
return out
inc = 0
for i in range(nb_line):
for j in range(sampling):
out[inc] = l_matrix[i, j]
inc += 1
out[inc] = nan
inc += 1
return out
[docs]@njit(cache=True)
def simplify(x, y, precision=0.1):
"""
Will remove all middle/end points closer than precision.
:param array x:
:param array y:
:param float precision: if two points have distance inferior to precision we remove next point
:return: (x,y)
:rtype: (array,array)
"""
precision2 = precision**2
nb = x.shape[0]
# will be True for kept values
mask = ones(nb, dtype=bool_)
for j in range(0, nb):
x_previous, y_previous = x[j], y[j]
if isnan(x_previous) or isnan(y_previous):
mask[j] = False
continue
break
# Only nan
if j == (nb - 1):
return zeros(0, dtype=x.dtype), zeros(0, dtype=x.dtype)
last_nan = False
for i in range(j + 1, nb):
x_, y_ = x[i], y[i]
if isnan(x_) or isnan(y_):
if last_nan:
mask[i] = False
else:
last_nan = True
continue
last_nan = False
d_x = x_ - x_previous
if d_x > precision:
x_previous, y_previous = x_, y_
continue
d_y = y_ - y_previous
if d_y > precision:
x_previous, y_previous = x_, y_
continue
d2 = d_x**2 + d_y**2
if d2 > precision2:
x_previous, y_previous = x_, y_
continue
mask[i] = False
new_nb = mask.sum()
new_x, new_y = empty(new_nb, dtype=x.dtype), empty(new_nb, dtype=y.dtype)
j = 0
for i in range(nb):
if mask[i]:
new_x[j], new_y[j] = x[i], y[i]
j += 1
return new_x, new_y
[docs]@njit(cache=True)
def split_line(x, y, i):
"""
Split x and y at each i change.
:param x: array
:param y: array
:param i: array of int at each i change, we cut x, y
:return: x and y separated by nan at each i jump
"""
nb_jump = len(where(i[1:] - i[:-1] != 0)[0])
nb_value = x.shape[0]
final_size = (nb_jump - 1) + nb_value
new_x = empty(final_size, dtype=x.dtype)
new_y = empty(final_size, dtype=y.dtype)
new_j = 0
for j in range(nb_value):
new_x[new_j] = x[j]
new_y[new_j] = y[j]
new_j += 1
if j < (nb_value - 1) and i[j] != i[j + 1]:
new_x[new_j] = nan
new_y[new_j] = nan
new_j += 1
return new_x, new_y
[docs]@njit(cache=True)
def wrap_longitude(x, y, ref, cut=False):
"""
Will wrap contiguous longitude with reference as western boundary.
:param array x:
:param array y:
:param float ref: longitude of reference, all the new values will be between ref and ref + 360
:param bool cut: if True line will be cut at the bounds
:return: lon,lat
:rtype: (array,array)
"""
if cut:
indexs = list()
nb = x.shape[0]
new_x_previous = (x[0] - ref) % 360 + ref
x_previous = x[0]
for i in range(1, nb):
x_ = x[i]
new_x = (x_ - ref) % 360 + ref
if not isnan(x_) and not isnan(x_previous):
d_new = new_x - new_x_previous
d = x_ - x_previous
if abs(d - d_new) > 1e-5:
indexs.append(i)
x_previous, new_x_previous = x_, new_x
nb_indexs = len(indexs)
new_size = nb + nb_indexs * 3
out_x = empty(new_size, dtype=x.dtype)
out_y = empty(new_size, dtype=y.dtype)
i_ = 0
j = 0
for i in range(nb):
if j < nb_indexs and i == indexs[j]:
j += 1
# FIXME need check
cor = 360 if x[i - 1] > x[i] else -360
out_x[i + i_] = (x[i] - ref) % 360 + ref - cor
out_y[i + i_] = y[i]
out_x[i + i_ + 1] = nan
out_y[i + i_ + 1] = nan
out_x[i + i_ + 2] = (x[i - 1] - ref) % 360 + ref + cor
out_y[i + i_ + 2] = y[i - 1]
i_ += 3
out_x[i + i_] = (x[i] - ref) % 360 + ref
out_y[i + i_] = y[i]
return out_x, out_y
else:
nb = x.shape[0]
out = empty(nb, dtype=x.dtype)
for i in range(nb):
out[i] = (x[i] - ref) % 360 + ref
return out, y
[docs]@njit(cache=True, fastmath=True)
def coordinates_to_local(lon, lat, lon0, lat0):
"""
Take latlong coordinates to transform in local coordinates (in m).
:param array x: coordinates to transform
:param array y: coordinates to transform
:param float lon0: longitude of local reference
:param float lat0: latitude of local reference
:return: x,y
:retype: (array, array)
"""
D2R = pi / 180.0
R = 6370997
dlon = (lon - lon0) * D2R
sin_dlat = sin((lat - lat0) * 0.5 * D2R)
sin_dlon = sin(dlon * 0.5)
cos_lat0 = cos(lat0 * D2R)
cos_lat = cos(lat * D2R)
a_val = sin_dlon**2 * cos_lat0 * cos_lat + sin_dlat**2
module = R * 2 * arctan2(a_val**0.5, (1 - a_val) ** 0.5)
azimuth = pi / 2 - arctan2(
cos_lat * sin(dlon),
cos_lat0 * sin(lat * D2R) - sin(lat0 * D2R) * cos_lat * cos(dlon),
)
return module * cos(azimuth), module * sin(azimuth)
[docs]@njit(cache=True, fastmath=True)
def local_to_coordinates(x, y, lon0, lat0):
"""
Take local coordinates (in m) to transform to latlong.
:param array x: coordinates to transform
:param array y: coordinates to transform
:param float lon0: longitude of local reference
:param float lat0: latitude of local reference
:return: lon,lat
:retype: (array, array)
"""
D2R = pi / 180.0
R = 6370997
d = (x**2 + y**2) ** 0.5 / R
a = -(arctan2(y, x) - pi / 2)
lat = arcsin(sin(lat0 * D2R) * cos(d) + cos(lat0 * D2R) * sin(d) * cos(a))
lon = (
lon0
+ arctan2(
sin(a) * sin(d) * cos(lat0 * D2R), cos(d) - sin(lat0 * D2R) * sin(lat)
)
/ D2R
)
return lon, lat / D2R
[docs]@njit(cache=True, fastmath=True)
def nearest_grd_indice(x, y, x0, y0, xstep, ystep):
"""
Get nearest grid index from a position.
:param x: longitude
:param y: latitude
:param float x0: first grid longitude
:param float y0: first grid latitude
:param float xstep: step between two longitude
:param float ystep: step between two latitude
"""
return (
numba_types.int32(round(((x - x0[0]) % 360.0) / xstep)),
numba_types.int32(round((y - y0[0]) / ystep)),
)
[docs]@njit(cache=True)
def bbox_indice_regular(vertices, x0, y0, xstep, ystep, N, circular, x_size):
"""
Get bbox index of a contour in a regular grid.
:param vertices: vertice of contour
:param float x0: first grid longitude
:param float y0: first grid latitude
:param float xstep: step between two longitude
:param float ystep: step between two latitude
:param int N: shift of index to enlarge window
:param bool circular: To know if grid is wrappable
:param int x_size: Number of longitude
"""
lon, lat = vertices[:, 0], vertices[:, 1]
lon_min, lon_max = lon.min(), lon.max()
lat_min, lat_max = lat.min(), lat.max()
i_x0, i_y0 = nearest_grd_indice(lon_min, lat_min, x0, y0, xstep, ystep)
i_x1, i_y1 = nearest_grd_indice(lon_max, lat_max, x0, y0, xstep, ystep)
if circular:
slice_x = (i_x0 - N) % x_size, (i_x1 + N + 1) % x_size
else:
slice_x = max(i_x0 - N, 0), i_x1 + N + 1
slice_y = i_y0 - N, i_y1 + N + 1
return slice_x, slice_y
[docs]def build_circle(x0, y0, r):
"""
Build circle from center coordinates.
:param float x0: center coordinate
:param float y0: center coordinate
:param float r: radius i meter
:return: x,y
:rtype: (array,array)
"""
angle = radians(linspace(0, 360, 50))
x_norm, y_norm = cos(angle), sin(angle)
return x_norm * r + x0, y_norm * r + y0
[docs]@njit(cache=True)
def window_index(x, x0, half_window=1):
"""
Give for a fixed half_window each start and end index for each x0, in
an unsorted array.
:param array x: array of value
:param array x0: array of window center
:param float half_window: half window
"""
# Sort array, bounds will be sort also
i_ordered = x.argsort()
nb_x, nb_pt = x.size, x0.size
first_index = empty(nb_pt, dtype=i_ordered.dtype)
last_index = empty(nb_pt, dtype=i_ordered.dtype)
# First bound to find
j_min, j_max = 0, 0
x_min = x0[j_min] - half_window
x_max = x0[j_max] + half_window
# We iterate on ordered x
for i, i_x in enumerate(i_ordered):
x_ = x[i_x]
# if x bigger than x_min , we found bound and search next one
while x_ > x_min and j_min < nb_pt:
first_index[j_min] = i
j_min += 1
x_min = x0[j_min] - half_window
# if x bigger than x_max , we found bound and search next one
while x_ > x_max and j_max < nb_pt:
last_index[j_max] = i
j_max += 1
x_max = x0[j_max] + half_window
if j_max == nb_pt:
break
for i in range(j_min, nb_pt):
first_index[i] = nb_x
for i in range(j_max, nb_pt):
last_index[i] = nb_x
return i_ordered, first_index, last_index