from abc import ABC, abstractmethod
import logging
from numba import njit, types as nb_types
from numpy import arange, full, int32, interp, isnan, median, where, zeros
from ..generic import window_index
from ..poly import create_meshed_particles, poly_indexs
from .observation import EddiesObservations
logger = logging.getLogger("pet")
[docs]@njit(cache=True)
def get_missing_indices(
array_time, array_track, dt=1, flag_untrack=True, indice_untrack=0
):
"""Return indexes where values are missing
:param np.array(int) array_time : array of strictly increasing int representing time
:param np.array(int) array_track: N° track where observations belong
:param int,float dt: theorical timedelta between 2 observations
:param bool flag_untrack: if True, ignore observations where n°track equal `indice_untrack`
:param int indice_untrack: n° representing where observations are untracked
ex : array_time = np.array([67, 68, 70, 71, 74, 75])
array_track= np.array([ 1, 1, 1, 1, 1, 1])
return : np.array([2, 4, 4])
"""
t0 = array_time[0]
t1 = t0
tr0 = array_track[0]
tr1 = tr0
nbr_step = zeros(array_time.shape, dtype=int32)
for i in range(array_time.size - 1):
t0 = t1
tr0 = tr1
t1 = array_time[i + 1]
tr1 = array_track[i + 1]
if flag_untrack & (tr1 == indice_untrack):
continue
if tr1 != tr0:
continue
diff = t1 - t0
if diff > dt:
nbr_step[i] = int(diff / dt) - 1
indices = zeros(nbr_step.sum(), dtype=int32)
j = 0
for i in range(array_time.size - 1):
nbr_missing = nbr_step[i]
if nbr_missing != 0:
for k in range(nbr_missing):
indices[j] = i + 1
j += 1
return indices
[docs]def advect(x, y, c, t0, n_days, u_name="u", v_name="v"):
"""
Advect particles from t0 to t0 + n_days, with data cube.
:param np.array(float) x: longitude of particles
:param np.array(float) y: latitude of particles
:param `~py_eddy_tracker.dataset.grid.GridCollection` c: GridCollection with speed for particles
:param int t0: julian day of advection start
:param int n_days: number of days to advect
:param str u_name: variable name for u component
:param str v_name: variable name for v component
"""
kw = dict(nb_step=6, time_step=86400 / 6)
if n_days < 0:
kw["backward"] = True
n_days = -n_days
p = c.advect(x, y, u_name=u_name, v_name=v_name, t_init=t0, **kw)
for _ in range(n_days):
t, x, y = p.__next__()
return t, x, y
[docs]def particle_candidate_step(
t_start, contours_start, contours_end, space_step, dt, c, day_fraction=6, **kwargs
):
"""Select particles within eddies, advect them, return target observation and associated percentages.
For one time step.
:param int t_start: julian day of the advection
:param (np.array(float),np.array(float)) contours_start: origin contour
:param (np.array(float),np.array(float)) contours_end: destination contour
:param float space_step: step between 2 particles
:param int dt: duration of advection
:param `~py_eddy_tracker.dataset.grid.GridCollection` c: GridCollection with speed for particles
:param int day_fraction: fraction of day
:params dict kwargs: dict of params given to advection
:return (np.array,np.array): return target index and percent associate
"""
# In case of zarr array
contours_start = [i[:] for i in contours_start]
contours_end = [i[:] for i in contours_end]
# Create particles in start contour
x, y, i_start = create_meshed_particles(*contours_start, space_step)
# Advect particles
kw = dict(nb_step=day_fraction, time_step=86400 / day_fraction)
p = c.advect(x, y, t_init=t_start, **kwargs, **kw)
for _ in range(abs(dt)):
_, x, y = p.__next__()
m = ~(isnan(x) + isnan(y))
i_end = full(x.shape, -1, dtype="i4")
if m.any():
# Id eddies for each alive particle in start contour
i_end[m] = poly_indexs(x[m], y[m], *contours_end)
shape = (contours_start[0].shape[0], 2)
# Get target for each contour
i_target, pct_target = full(shape, -1, dtype="i4"), zeros(shape, dtype="f8")
nb_end = contours_end[0].shape[0]
get_targets(i_start, i_end, i_target, pct_target, nb_end)
return i_target, pct_target.astype("i1")
[docs]def particle_candidate(
c,
eddies,
step_mesh,
t_start,
i_target,
pct,
contour_start="speed",
contour_end="effective",
**kwargs
):
"""Select particles within eddies, advect them, return target observation and associated percentages
:param `~py_eddy_tracker.dataset.grid.GridCollection` c: GridCollection with speed for particles
:param GroupEddiesObservations eddies: GroupEddiesObservations considered
:param int t_start: julian day of the advection
:param np.array(int) i_target: corresponding obs where particles are advected
:param np.array(int) pct: corresponding percentage of avected particles
:param str contour_start: contour where particles are injected
:param str contour_end: contour where particles are counted after advection
:params dict kwargs: dict of params given to `advect`
"""
# Obs from initial time
m_start = eddies.time == t_start
e = eddies.extract_with_mask(m_start)
# to be able to get global index
translate_start = where(m_start)[0]
# Create particles in specified contour
intern = False if contour_start == "effective" else True
x, y, i_start = e.create_particles(step_mesh, intern=intern)
# Advection
t_end, x, y = advect(x, y, c, t_start, **kwargs)
# eddies at last date
m_end = eddies.time == t_end / 86400
e_end = eddies.extract_with_mask(m_end)
# to be able to get global index
translate_end = where(m_end)[0]
# Id eddies for each alive particle in specified contour
intern = False if contour_end == "effective" else True
i_end = e_end.contains(x, y, intern=intern)
# compute matrix and fill target array
get_matrix(i_start, i_end, translate_start, translate_end, i_target, pct)
[docs]@njit(cache=True)
def get_targets(i_start, i_end, i_target, pct, nb_end):
"""Compute target observation and associated percentages
:param array(int) i_start: indices in time 0
:param array(int) i_end: indices in time N
:param array(int) i_target: corresponding obs where particles are advected
:param array(int) pct: corresponding percentage of avected particles
:param int nb_end: number of contour at time N
"""
nb_start = i_target.shape[0]
# Matrix which will store count for every couple
counts = zeros((nb_start, nb_end), dtype=nb_types.int32)
# Number of particles in each origin observation
ref = zeros(nb_start, dtype=nb_types.int32)
# For each particle
for i in range(i_start.size):
i_end_ = i_end[i]
i_start_ = i_start[i]
ref[i_start_] += 1
if i_end_ != -1:
counts[i_start_, i_end_] += 1
# From i to j
for i in range(nb_start):
for j in range(nb_end):
count = counts[i, j]
if count == 0:
continue
pct_ = count / ref[i] * 100
pct_0 = pct[i, 0]
# If percent is higher than previous stored in rank 0
if pct_ > pct_0:
pct[i, 1] = pct_0
pct[i, 0] = pct_
i_target[i, 1] = i_target[i, 0]
i_target[i, 0] = j
# If percent is higher than previous stored in rank 1
elif pct_ > pct[i, 1]:
pct[i, 1] = pct_
i_target[i, 1] = j
[docs]@njit(cache=True)
def get_matrix(i_start, i_end, translate_start, translate_end, i_target, pct):
"""Compute target observation and associated percentages
:param np.array(int) i_start: indices of associated contours at starting advection day
:param np.array(int) i_end: indices of associated contours after advection
:param np.array(int) translate_start: corresponding global indices at starting advection day
:param np.array(int) translate_end: corresponding global indices after advection
:param np.array(int) i_target: corresponding obs where particles are advected
:param np.array(int) pct: corresponding percentage of avected particles
"""
nb_start, nb_end = translate_start.size, translate_end.size
# Matrix which will store count for every couple
count = zeros((nb_start, nb_end), dtype=nb_types.int32)
# Number of particles in each origin observation
ref = zeros(nb_start, dtype=nb_types.int32)
# For each particle
for i in range(i_start.size):
i_end_ = i_end[i]
i_start_ = i_start[i]
if i_end_ != -1:
count[i_start_, i_end_] += 1
ref[i_start_] += 1
for i in range(nb_start):
for j in range(nb_end):
pct_ = count[i, j]
# If there are particles from i to j
if pct_ != 0:
# Get percent
pct_ = pct_ / ref[i] * 100.0
# Get indices in full dataset
i_, j_ = translate_start[i], translate_end[j]
pct_0 = pct[i_, 0]
if pct_ > pct_0:
pct[i_, 1] = pct_0
pct[i_, 0] = pct_
i_target[i_, 1] = i_target[i_, 0]
i_target[i_, 0] = j_
elif pct_ > pct[i_, 1]:
pct[i_, 1] = pct_
i_target[i_, 1] = j_
return i_target, pct
[docs]class GroupEddiesObservations(EddiesObservations, ABC):
[docs] @abstractmethod
def fix_next_previous_obs(self):
pass
[docs] @abstractmethod
def get_missing_indices(self, dt):
"Find indexes where observations are missing"
pass
[docs] def filled_by_interpolation(self, mask):
"""Fill selected values by interpolation
:param array(bool) mask: True if must be filled by interpolation
.. minigallery:: py_eddy_tracker.TrackEddiesObservations.filled_by_interpolation
"""
if self.track.size == 0:
return
nb_filled = mask.sum()
logger.info("%d obs will be filled (unobserved)", nb_filled)
nb_obs = len(self)
index = arange(nb_obs)
for field in self.fields:
if (
field in ["n", "virtual", "track", "cost_association"]
or field in self.array_variables
):
continue
self.obs[field][mask] = interp(
index[mask], index[~mask], self.obs[field][~mask]
)
[docs] def insert_virtual(self):
"""Insert virtual observations on segments where observations are missing"""
dt_theorical = median(self.time[1:] - self.time[:-1])
indices = self.get_missing_indices(dt_theorical)
logger.info("%d virtual observation will be added", indices.size)
# new observations size
size_obs_corrected = self.time.size + indices.size
# correction of indexes for new size
indices_corrected = indices + arange(indices.size)
# creating mask with indexes
mask = zeros(size_obs_corrected, dtype=bool)
mask[indices_corrected] = 1
new_TEO = self.new_like(self, size_obs_corrected)
new_TEO.obs[~mask] = self.obs
new_TEO.filled_by_interpolation(mask)
new_TEO.virtual[:] = mask
new_TEO.fix_next_previous_obs()
return new_TEO
[docs] def keep_tracks_by_date(self, date, nb_days):
"""
Find tracks that exist at date `date` and lasted at least `nb_days` after.
:param int,float date: date where the tracks must exist
:param int,float nb_days: number of times the tracks must exist. Can be negative
If nb_days is negative, it searches a track that exists at the date,
but existed at least `nb_days` before the date
"""
time = self.time
mask = zeros(time.shape, dtype=bool)
for i, b0, b1 in self.iter_on(self.tracks):
_time = time[i]
if date in _time and (date + nb_days) in _time:
mask[i] = True
return self.extract_with_mask(mask)
[docs] def particle_candidate_atlas(
self, cube, space_step, dt, start_intern=False, end_intern=False, callback_coherence=None, finalize_coherence=None, **kwargs
):
"""Select particles within eddies, advect them, return target observation and associated percentages
:param `~py_eddy_tracker.dataset.grid.GridCollection` cube: GridCollection with speed for particles
:param float space_step: step between 2 particles
:param int dt: duration of advection
:param bool start_intern: Use intern or extern contour at injection, defaults to False
:param bool end_intern: Use intern or extern contour at end of advection, defaults to False
:param dict kwargs: dict of params given to advection
:param func callback_coherence: if None we will use cls.fill_coherence
:param func finalize_coherence: to apply on results of callback_coherence
:return (np.array,np.array): return target index and percent associate
"""
t_start, t_end = int(self.period[0]), int(self.period[1])
# Pre-compute to get time index
i_sort, i_start, i_end = window_index(
self.time, arange(t_start, t_end + 1), 0.5
)
# Out shape
shape = (len(self), 2)
i_target, pct = full(shape, -1, dtype="i4"), zeros(shape, dtype="i1")
# Backward or forward
times = arange(t_start, t_end - dt) if dt > 0 else arange(t_start - dt, t_end)
if callback_coherence is None:
callback_coherence = self.fill_coherence
indexs = dict()
results = list()
kw_coherence = dict(space_step=space_step, dt=dt, c=cube)
kw_coherence.update(kwargs)
for t in times:
logger.info("Coherence for time step : %s in [%s:%s]", t, times[0], times[-1])
# Get index for origin
i = t - t_start
indexs0 = i_sort[i_start[i] : i_end[i]]
# Get index for end
i = t + dt - t_start
indexs1 = i_sort[i_start[i] : i_end[i]]
if indexs0.size == 0 or indexs1.size == 0:
continue
results.append(callback_coherence(self, i_target, pct, indexs0, indexs1, start_intern, end_intern, t_start=t, **kw_coherence))
indexs[results[-1]] = indexs0, indexs1
if finalize_coherence is not None:
finalize_coherence(results, indexs, i_target, pct)
return i_target, pct
[docs] @classmethod
def fill_coherence(cls, network, i_targets, percents, i_origin, i_end, start_intern, end_intern, **kwargs):
"""_summary_
:param array i_targets: global target
:param array percents:
:param array i_origin: indices of origins
:param array i_end: indices of ends
:param bool start_intern: Use intern or extern contour at injection
:param bool end_intern: Use intern or extern contour at end of advection
"""
# Get contour data
contours_start = [network[label][i_origin] for label in cls.intern(start_intern)]
contours_end = [network[label][i_end] for label in cls.intern(end_intern)]
# Compute local coherence
i_local_targets, local_percents = particle_candidate_step(contours_start=contours_start, contours_end=contours_end,**kwargs)
# Store
cls.merge_particle_result(i_targets, percents, i_local_targets, local_percents, i_origin, i_end)
[docs] @staticmethod
def merge_particle_result(i_targets, percents, i_local_targets, local_percents, i_origin, i_end):
"""Copy local result in merged result with global indexation
:param array i_targets: global target
:param array percents:
:param array i_local_targets: local index target
:param array local_percents:
:param array i_origin: indices of origins
:param array i_end: indices of ends
"""
m = i_local_targets != -1
i_local_targets[m] = i_end[i_local_targets[m]]
i_targets[i_origin] = i_local_targets
percents[i_origin] = local_percents