# -*- coding: utf-8 -*-
"""
Base class to manage eddy observation
"""
from datetime import datetime
from io import BufferedReader, BytesIO
import logging
from tarfile import ExFileObject
from tokenize import TokenError
from Polygon import Polygon
from matplotlib.cm import get_cmap
from matplotlib.collections import LineCollection, PolyCollection
from matplotlib.colors import Normalize
from netCDF4 import Dataset
from numba import njit, types as numba_types
from numpy import (
absolute,
arange,
array,
array_equal,
ceil,
concatenate,
cos,
datetime64,
digitize,
empty,
errstate,
floor,
histogram,
histogram2d,
in1d,
isnan,
linspace,
ma,
nan,
ndarray,
ones,
percentile,
radians,
sin,
unique,
where,
zeros,
)
import packaging.version
from pint import UnitRegistry
from pint.errors import UndefinedUnitError
import zarr
from .. import VAR_DESCR, VAR_DESCR_inv, __version__
from ..generic import (
bbox_indice_regular,
build_index,
distance,
distance_grid,
flatten_line_matrix,
hist_numba,
local_to_coordinates,
reverse_index,
window_index,
wrap_longitude,
)
from ..poly import (
bbox_intersection,
close_center,
convexs,
create_meshed_particles,
create_vertice,
get_pixel_in_regular,
insidepoly,
poly_indexs,
reduce_size,
vertice_overlap,
)
logger = logging.getLogger("pet")

# Software version reduced to "major.minor": used to compare against the
# version stored inside files, ignoring patch-level differences.
_software_version_reduced = packaging.version.Version(
    "{v.major}.{v.minor}".format(v=packaging.version.parse(__version__))
)
# Module-level switch allowing callers to silence the version-mismatch warning.
_display_check_warning = True
def _check_versions(version):
    """Warn when the file was produced by an older py-eddy-tracker release.

    :param version: string version of software used to create the file.
        If None, version was not provided
    :type version: str, None
    """
    if not _display_check_warning:
        return
    # None means the file predates version tagging; treat it as "older".
    file_version = None if version is None else packaging.version.parse(version)
    if file_version is not None and file_version >= _software_version_reduced:
        return
    logger.warning(
        "File was created with py-eddy-tracker version '%s' but software version is '%s'",
        file_version,
        _software_version_reduced,
    )
@njit(cache=True, fastmath=True)
def shifted_ellipsoid_degrees_mask2(lon0, lat0, lon1, lat1, minor=1.5, major=1.5):
    """
    Work only if major is an array but faster * 6

    Build a boolean matrix (nb_0, nb_1): for each reference point of
    (lon0, lat0), flag which candidate points of (lon1, lat1) fall inside an
    eastward-shifted ellipse centered near the reference point.

    :param lon0, lat0: reference positions (degrees)
    :param lon1, lat1: candidate positions (degrees)
    :param float minor: semi-minor axis (degrees)
    :param array major: semi-major axis (degrees), one value per reference point
    """
    # c = (major ** 2 - minor ** 2) ** .5 + major
    c = major
    major = minor + 0.5 * (major - minor)
    # r=.5*(c-c0)
    # a=c0+r
    # Focal
    f_right = lon0
    f_left = f_right - (c - minor)
    # Ellipse center
    x_c = (f_left + f_right) * 0.5
    nb_0, nb_1 = lat0.shape[0], lat1.shape[0]
    m = empty((nb_0, nb_1), dtype=numba_types.bool_)
    for j in range(nb_0):
        for i in range(nb_1):
            dy = absolute(lat1[i] - lat0[j])
            # Quick reject on latitude separation
            if dy > minor:
                m[j, i] = False
                continue
            dx = absolute(lon1[i] - x_c[j])
            # Wrap the longitude difference into [0, 180]
            if dx > 180:
                dx = absolute((dx + 180) % 360 - 180)
            if dx > major[j]:
                m[j, i] = False
                continue
            # Normalized ellipse equation: strictly inside when < 1
            d_normalize = dx**2 / major[j] ** 2 + dy**2 / minor**2
            m[j, i] = d_normalize < 1.0
    return m
class Table(object):
    """Minimal helper rendering a 2D set of values as an HTML table."""

    def __init__(self, values):
        self.values = values

    def _repr_html_(self):
        """Build the HTML representation used by notebooks."""
        html_rows = list()
        if isinstance(self.values, ndarray):
            # Structured array: first row displays the field names
            cells = "\n".join(f"<td>{name}</td >" for name in self.values.dtype.names)
            html_rows.append(f"<tr>{cells}</tr>")
        for line in self.values:
            cells = "\n".join(f"<td>{item}</td >" for item in line)
            html_rows.append(f"<tr>{cells}</tr>")
        body = "\n".join(html_rows)
        return (
            f'<font size="2">'
            f'<table class="docutils align-default">'
            f"{body}"
            f"</table>"
            f"</font>"
        )
class EddiesObservations(object):
    """
    Class to store eddy observations.
    """

    __slots__ = (
        "track_extra_variables",
        "track_array_variables",
        "array_variables",
        "only_variables",
        "observations",
        "sign_type",
        "raw_data",
        "period_",
    )

    # Variables stored for every eddy observation (all are keys of VAR_DESCR)
    ELEMENTS = [
        "lon",
        "lat",
        "radius_s",
        "radius_e",
        "amplitude",
        "speed_average",
        "time",
        "shape_error_e",
        "shape_error_s",
        "speed_area",
        "effective_area",
        "nb_contour_selected",
        "num_point_e",
        "num_point_s",
        "height_max_speed_contour",
        "height_external_contour",
        "height_inner_contour",
    ]

    # Cyclic color palette used when drawing eddies/tracks
    COLORS = [
        "sienna",
        "red",
        "darkorange",
        "gold",
        "palegreen",
        "limegreen",
        "forestgreen",
        "mediumblue",
        "dodgerblue",
        "lightskyblue",
        "violet",
        "blueviolet",
        "darkmagenta",
        "darkgrey",
        "dimgrey",
        "steelblue",
    ]
    # Number of colors available in the palette
    NB_COLORS = len(COLORS)
def __init__(
self,
size=0,
track_extra_variables=None,
track_array_variables=0,
array_variables=None,
only_variables=None,
raw_data=False,
):
self.period_ = None
self.only_variables = only_variables
self.raw_data = raw_data
self.track_extra_variables = (
track_extra_variables if track_extra_variables is not None else []
)
self.track_array_variables = track_array_variables
self.array_variables = array_variables if array_variables is not None else []
for elt in self.elements:
if elt not in VAR_DESCR:
raise Exception("Unknown element : %s" % elt)
self.observations = zeros(size, dtype=self.dtype)
self.sign_type = None
    @property
    def tracks(self):
        """Alias for the ``track`` variable."""
        return self.track
def __eq__(self, other):
if self.sign_type != other.sign_type:
return False
if self.dtype != other.dtype:
return False
return array_equal(self.obs, other.obs)
[docs] def get_color(self, i):
"""Return colors as a cyclic list"""
return self.COLORS[i % self.NB_COLORS]
@property
def sign_legend(self):
return "Cyclonic" if self.sign_type != 1 else "Anticyclonic"
    @property
    def shape(self):
        """Shape of the underlying observation array."""
        return self.observations.shape
[docs] def get_infos(self):
infos = dict(
bins_lat=(-90, -60, -15, 15, 60, 90),
bins_amplitude=array((0, 1, 2, 3, 4, 5, 10, 500)),
bins_radius=array((0, 15, 30, 45, 60, 75, 100, 200, 2000)),
nb_obs=len(self),
)
t0, t1 = self.period
infos["t0"], infos["t1"] = t0, t1
infos["period"] = t1 - t0 + 1
return infos
    def _repr_html_(self):
        """Short HTML summary displayed by notebooks."""
        infos = self.get_infos()
        return f"""<b>{infos['nb_obs']} observations from {infos['t0']} to {infos['t1']} </b>"""
[docs] def parse_varname(self, name):
return self[name] if isinstance(name, str) else name
    def hist(self, varname, x, bins, percent=False, mean=False, nb=False):
        """Build histograms.

        :param str,array varname: variable to use to compute stat
        :param str,array x: variable to use to know in which bins
        :param array bins:
        :param bool percent: normalized by sum of all bins
        :param bool mean: compute mean by bins
        :param bool nb: only count by bins
        :return: value by bins
        :rtype: array
        """
        x = self.parse_varname(x)
        if nb:
            # Count of observations per bin
            v = hist_numba(x, bins=bins)[0]
        else:
            # Sum of varname per bin
            v = histogram(x, bins=bins, weights=self.parse_varname(varname))[0]
        if percent:
            v = v.astype("f4") / v.sum() * 100
        elif mean:
            # NOTE(review): in-place division keeps v's dtype — presumably
            # weights are floats here; confirm for integer variables
            v /= hist_numba(x, bins=bins)[0]
        return v
[docs] @staticmethod
def box_display(value):
"""Return values evenly spaced with few numbers"""
return "".join([f"{v_:10.2f}" for v_ in value])
    @property
    def fields(self):
        """List of field names actually stored in the observation array."""
        return list(self.obs.dtype.names)
[docs] def field_table(self):
"""
Produce description table of the fields available in this object
"""
rows = [("Name (Unit)", "Long name", "Scale factor", "Offset")]
names = self.fields
names.sort()
for field in names:
infos = VAR_DESCR[field]
rows.append(
(
f"{infos.get('nc_name', field)} ({infos['nc_attr'].get('units', '')})",
infos["nc_attr"].get("long_name", "").capitalize(),
infos.get("scale_factor", ""),
infos.get("add_offset", ""),
)
)
return Table(rows)
def __repr__(self):
"""
Return general informations on dataset as strings.
:return: informations on datasets
:rtype: str
"""
t0, t1 = self.period
period = t1 - t0 + 1
bins_lat = (-90, -60, -15, 15, 60, 90)
bins_amplitude = array((0, 1, 2, 3, 4, 5, 10, 500))
bins_radius = array((0, 15, 30, 45, 60, 75, 100, 200, 2000))
nb_obs = len(self)
return f""" | {nb_obs} observations from {t0} to {t1} ({period} days, ~{nb_obs / period:.0f} obs/day)
| Speed area : {self.speed_area.sum() / period / 1e12:.2f} MkmĀ²/day
| Effective area : {self.effective_area.sum() / period / 1e12:.2f} MkmĀ²/day
----Distribution in Amplitude:
| Amplitude bounds (cm) {self.box_display(bins_amplitude)}
| Percent of eddies : {
self.box_display(self.hist('time', 'amplitude', bins_amplitude / 100., percent=True, nb=True))}
----Distribution in Radius:
| Speed radius (km) {self.box_display(bins_radius)}
| Percent of eddies : {
self.box_display(self.hist('time', 'radius_s', bins_radius * 1000., percent=True, nb=True))}
| Effective radius (km) {self.box_display(bins_radius)}
| Percent of eddies : {
self.box_display(self.hist('time', 'radius_e', bins_radius * 1000., percent=True, nb=True))}
----Distribution in Latitude
Latitude bounds {self.box_display(bins_lat)}
Percent of eddies : {self.box_display(self.hist('time', 'lat', bins_lat, percent=True, nb=True))}
Percent of speed area : {self.box_display(self.hist('speed_area', 'lat', bins_lat, percent=True))}
Percent of effective area : {self.box_display(self.hist('effective_area', 'lat', bins_lat, percent=True))}
Mean speed radius (km) : {self.box_display(self.hist('radius_s', 'lat', bins_lat, mean=True) / 1000.)}
Mean effective radius (km): {self.box_display(self.hist('radius_e', 'lat', bins_lat, mean=True) / 1000.)}
Mean amplitude (cm) : {self.box_display(self.hist('amplitude', 'lat', bins_lat, mean=True) * 100.)}"""
def __dir__(self):
"""Provide method name lookup and completion."""
base = set(dir(type(self)))
intern_name = set(self.elements)
extern_name = set([VAR_DESCR[k]["nc_name"] for k in intern_name])
# Must be check in init not here
if base & intern_name:
logger.warning(
"Some variable name have a common name with class attrs: %s",
base & intern_name,
)
if base & extern_name:
logger.warning(
"Some variable name have a common name with class attrs: %s",
base & extern_name,
)
return sorted(base.union(intern_name).union(extern_name))
    def __getitem__(self, attr: str):
        """Give access to a stored variable by internal or netCDF name.

        :param str attr: variable name
        :raises KeyError: when the name is unknown
        """
        if attr in self.elements:
            return self.obs[attr]
        elif attr in VAR_DESCR_inv:
            # netCDF name -> internal name
            return self.obs[VAR_DESCR_inv[attr]]
        elif attr in ("lifetime", "age"):
            # Computed properties, not stored fields
            return getattr(self, attr)
        raise KeyError("%s unknown" % attr)
    def __getattr__(self, attr):
        """Fallback attribute access resolving stored variable names.

        Only called when normal attribute lookup fails, so ordinary
        attributes and methods are unaffected.
        """
        if attr in self.elements:
            return self.obs[attr]
        elif attr in VAR_DESCR_inv:
            return self.obs[VAR_DESCR_inv[attr]]
        raise AttributeError(
            "{!r} object has no attribute {!r}".format(type(self).__name__, attr)
        )
    @classmethod
    def needed_variable(cls):
        """Return variables needed to load a dataset (None means all)."""
        return None
[docs] @classmethod
def obs_dimension(cls, handler):
for candidate in ("obs", "Nobs", "observation", "i"):
if candidate in handler.dimensions.keys():
return candidate
[docs] def remove_fields(self, *fields):
"""
Copy with fields listed remove
"""
nb_obs = len(self)
fields = set(fields)
only_variables = set(self.fields) - fields
track_extra_variables = set(self.track_extra_variables) - fields
array_variables = set(self.array_variables) - fields
new = self.__class__(
size=nb_obs,
track_extra_variables=track_extra_variables,
track_array_variables=self.track_array_variables,
array_variables=array_variables,
only_variables=only_variables,
raw_data=self.raw_data,
)
new.sign_type = self.sign_type
for name in new.fields:
logger.debug("Copy of field %s ...", name)
new.obs[name] = self.obs[name]
return new
[docs] def add_fields(self, fields=list(), array_fields=list()):
"""
Add a new field.
"""
nb_obs = len(self)
new = self.__class__(
size=nb_obs,
track_extra_variables=list(
concatenate((self.track_extra_variables, fields))
),
track_array_variables=self.track_array_variables,
array_variables=list(concatenate((self.array_variables, array_fields))),
only_variables=list(concatenate((self.fields, fields, array_fields))),
raw_data=self.raw_data,
)
new.sign_type = self.sign_type
for name in self.fields:
logger.debug("Copy of field %s ...", name)
new.obs[name] = self.obs[name]
return new
    def add_rotation_type(self):
        """Return a copy with a ``type_cyc`` field filled with the rotation sign."""
        new = self.add_fields(("type_cyc",))
        new.type_cyc[:] = self.sign_type
        return new
    def circle_contour(self, only_virtual=False, factor=1):
        """
        Set contours as circles from radius and center data.

        :param bool only_virtual: if True, only virtual observations are updated
        :param float factor: scale factor applied to the radii

        .. minigallery:: py_eddy_tracker.EddiesObservations.circle_contour
        """
        # One angle sample per contour point
        angle = radians(linspace(0, 360, self.track_array_variables))
        x_norm, y_norm = cos(angle), sin(angle)
        radius_s = "contour_lon_s" in self.fields
        radius_e = "contour_lon_e" in self.fields
        for i, obs in enumerate(self):
            if only_virtual and not obs["virtual"]:
                continue
            x, y = obs["lon"], obs["lat"]
            if radius_s:
                # Speed contour: circle of radius radius_s around the center
                r_s = obs["radius_s"] * factor
                obs["contour_lon_s"], obs["contour_lat_s"] = local_to_coordinates(
                    x_norm * r_s, y_norm * r_s, x, y
                )
            if radius_e:
                # Effective contour: circle of radius radius_e around the center
                r_e = obs["radius_e"] * factor
                obs["contour_lon_e"], obs["contour_lat_e"] = local_to_coordinates(
                    x_norm * r_e, y_norm * r_e, x, y
                )
@property
def dtype(self):
"""Return dtype to build numpy array."""
dtype = list()
for elt in self.elements:
data_type = (
VAR_DESCR[elt].get("compute_type", VAR_DESCR[elt].get("nc_type"))
if not self.raw_data
else VAR_DESCR[elt]["output_type"]
)
if elt in self.array_variables:
dtype.append((elt, data_type, (self.track_array_variables,)))
else:
dtype.append((elt, data_type))
return dtype
@property
def elements(self):
"""Return all the names of the variables."""
elements = [i for i in self.ELEMENTS]
if self.track_array_variables > 0:
elements += self.array_variables
if len(self.track_extra_variables):
elements += self.track_extra_variables
if self.only_variables is not None:
elements = [i for i in elements if i in self.only_variables]
return list(set(elements))
[docs] def coherence(self, other):
"""Check coherence between two datasets."""
test = self.track_extra_variables == other.track_extra_variables
test *= self.track_array_variables == other.track_array_variables
test *= self.array_variables == other.array_variables
return test
[docs] @classmethod
def concatenate(cls, observations):
nb_obs = 0
ref_obs = observations[0]
for obs in observations:
if not ref_obs.coherence(obs):
raise Exception("Merge of different type of observations")
nb_obs += len(obs)
eddies = cls.new_like(ref_obs, nb_obs)
i = 0
for obs in observations:
nb_obs = len(obs)
eddies.obs[i : i + nb_obs] = obs.obs
i += nb_obs
eddies.sign_type = ref_obs.sign_type
return eddies
    def merge(self, other):
        """Merge two datasets."""
        nb_obs_self = len(self)
        nb_obs = nb_obs_self + len(other)
        eddies = self.new_like(self, nb_obs)
        other_keys = other.fields
        self_keys = self.fields
        for key in eddies.fields:
            eddies.obs[key][:nb_obs_self] = self.obs[key][:]
            # Fields missing from other stay zero-filled
            if key in other_keys:
                eddies.obs[key][nb_obs_self:] = other.obs[key][:]
        if "track" in other_keys and "track" in self_keys:
            # Shift other's track ids so they continue after self's last id.
            # NOTE(review): assumes the last obs of self carries the highest
            # track id (i.e. data ordered by track) — confirm with callers.
            last_track = eddies.track[nb_obs_self - 1] + 1
            eddies.track[nb_obs_self:] += last_track
        eddies.sign_type = self.sign_type
        return eddies
    def reset(self):
        """Replace the observation array with an empty one."""
        self.observations = zeros(0, dtype=self.dtype)

    @property
    def obs(self):
        """Return observations."""
        return self.observations

    def __len__(self):
        # Number of observations stored
        return len(self.observations)

    def __iter__(self):
        # Iterate over the raw observation records
        for obs in self.obs:
            yield obs
    def iter_on(self, xname, window=None, bins=None):
        """
        Yield observation group for each bin.

        :param str,array xname: variable name (or values) used for binning
        :param float,None window: if defined we use a moving window with value like half window
        :param array bins: bounds of each bin
        :yield array,float,float: index in self, lower bound, upper bound

        .. minigallery:: py_eddy_tracker.EddiesObservations.iter_on
        """
        x = self.parse_varname(xname)
        if window is not None:
            # Moving window centered on each x0 value
            x0 = arange(x.min(), x.max()) if bins is None else array(bins)
            i_ordered, first_index, last_index = window_index(x, x0, window)
            for x_, i0, i1 in zip(x0, first_index, last_index):
                yield i_ordered[i0:i1], x_ - window, x_ + window
        else:
            d = x[1:] - x[:-1]
            if bins is None:
                bins = arange(x.min(), x.max() + 2)
            elif not isinstance(bins, ndarray):
                bins = array(bins)
            nb_bins = len(bins) - 1
            # Not monotonous
            if (d < 0).any():
                # If bins cover a small part of value
                test, translate, x = iter_mode_reduce(x, bins)
                # convert value in bins number
                i = numba_digitize(x, bins) - 1
                # Order by bins
                i_sort = i.argsort()
                # If in reduced mode we will translate i_sort in full array index
                i_sort_ = translate[i_sort] if test else i_sort
                # Bound for each bins in sorting view
                i0, i1, _ = build_index(i[i_sort])
                m = ~(i0 == i1)
                i0, i1 = i0[m], i1[m]
                for i0_, i1_ in zip(i0, i1):
                    i_bins = i[i_sort[i0_]]
                    # Skip values falling outside the requested bins
                    if i_bins == -1 or i_bins == nb_bins:
                        continue
                    yield i_sort_[i0_:i1_], bins[i_bins], bins[i_bins + 1]
            else:
                # x already sorted: each bin maps to a contiguous slice
                i = numba_digitize(x, bins) - 1
                i0, i1, _ = build_index(i)
                m = ~(i0 == i1)
                i0, i1 = i0[m], i1[m]
                for i0_, i1_ in zip(i0, i1):
                    i_bins = i[i0_]
                    yield slice(i0_, i1_), bins[i_bins], bins[i_bins + 1]
    def align_on(self, other, var_name="time", all_ref=False, **kwargs):
        """
        Align the variable indices of two datasets.

        :param other: other compare with self
        :param str,tuple var_name: variable name to align or two array, defaults to "time"
        :param bool all_ref: yield all value of ref, if false only common value, defaults to False
        :yield array,array,float,float: index in self, index in other, lower bound, upper bound

        .. minigallery:: py_eddy_tracker.EddiesObservations.align_on
        """
        if isinstance(var_name, str):
            iter_self = self.iter_on(var_name, **kwargs)
            iter_other = other.iter_on(var_name, **kwargs)
        else:
            # Two explicit arrays: one per dataset
            iter_self = self.iter_on(var_name[0], **kwargs)
            iter_other = other.iter_on(var_name[1], **kwargs)
        indexs_other, b0_other, b1_other = iter_other.__next__()
        for indexs_self, b0_self, b1_self in iter_self:
            if b0_self > b0_other:
                # Advance other until it catches up with self
                try:
                    while b0_other < b0_self:
                        indexs_other, b0_other, b1_other = iter_other.__next__()
                except StopIteration:
                    break
            if b0_self < b0_other:
                # Bin present in self but missing in other
                if all_ref:
                    yield indexs_self, empty(
                        0, dtype=indexs_self.dtype
                    ), b0_self, b1_self
                continue
            yield indexs_self, indexs_other, b0_self, b1_self
    def insert_observations(self, other, index):
        """Insert other obs in self at the given index.

        :param other: observations to insert (must be coherent with self)
        :param int index: insertion position; negative values count from the
            end (-1 appends)
        :raises Exception: when the two datasets have incompatible layouts
        """
        if not self.coherence(other):
            raise Exception("Observations with no coherence")
        insert_size = len(other)
        self_size = len(self)
        new_size = self_size + insert_size
        if self_size == 0:
            # NOTE(review): shares other's array rather than copying it
            self.observations = other.obs
            return self
        elif insert_size == 0:
            return self
        if index < 0:
            # -1 means append at the end
            index = self_size + index + 1
        eddies = self.new_like(self, new_size)
        eddies.obs[:index] = self.obs[:index]
        eddies.obs[index : index + insert_size] = other.obs
        eddies.obs[index + insert_size :] = self.obs[index:]
        self.observations = eddies.obs
        return self
    def append(self, other):
        """Merge."""
        return self + other

    def __add__(self, other):
        # Addition appends other's observations at the end of self (in place)
        return self.insert_observations(other, -1)
    def distance(self, other):
        """Use haversine distance for distance matrix between every self and
        other eddies.

        :param other: dataset whose positions are compared with self
        """
        return distance_grid(self.lon, self.lat, other.lon, other.lat)
def __copy__(self):
eddies = self.new_like(self, len(self))
for k in self.fields:
eddies[k][:] = self[k][:]
eddies.sign_type = self.sign_type
return eddies
[docs] def copy(self):
return self.__copy__()
    @classmethod
    def new_like(cls, eddies, new_size: int):
        """Allocate an empty dataset with the same variable layout as *eddies*."""
        return cls(
            new_size,
            track_extra_variables=eddies.track_extra_variables,
            track_array_variables=eddies.track_array_variables,
            array_variables=eddies.array_variables,
            only_variables=eddies.only_variables,
            raw_data=eddies.raw_data,
        )
[docs] def index(self, index, reverse=False):
"""Return obs from self at the index."""
if reverse:
index = reverse_index(index, len(self))
size = 1
if hasattr(index, "__iter__"):
size = len(index)
elif isinstance(index, slice):
size = index.stop - index.start
eddies = self.new_like(self, size)
eddies.obs[:] = self.obs[index]
eddies.sign_type = self.sign_type
return eddies
[docs] @staticmethod
def zarr_dimension(filename):
if isinstance(filename, zarr.storage.MutableMapping):
h = filename
else:
h = zarr.open(filename)
dims = list()
for varname in h:
shape = getattr(h, varname).shape
if len(shape) > len(dims):
dims = shape
return dims
    @classmethod
    def load_file(cls, filename, **kwargs):
        """
        Load the netcdf or the zarr file.

        Load only latitude and longitude on the first 300 obs :

        .. code-block:: python

            kwargs_latlon_300 = dict(
                include_vars=[
                    "longitude",
                    "latitude",
                ],
                indexs=dict(obs=slice(0, 300)),
            )
            small_dataset = TrackEddiesObservations.load_file(
                filename, **kwargs_latlon_300
            )

        For `**kwargs` look at :py:meth:`load_from_zarr` or :py:meth:`load_from_netcdf`
        """
        filename_ = (
            filename.filename if isinstance(filename, ExFileObject) else filename
        )
        # A zarr store object is handed to the zarr loader directly
        if isinstance(filename, zarr.storage.MutableMapping):
            return cls.load_from_zarr(filename, **kwargs)
        # Otherwise guess the format from the ".zarr" extension
        if isinstance(filename, (bytes, str)):
            end = b".zarr" if isinstance(filename_, bytes) else ".zarr"
            zarr_file = filename_.endswith(end)
        else:
            zarr_file = False
        logger.info(f"loading file '{filename_}'")
        if zarr_file:
            return cls.load_from_zarr(filename, **kwargs)
        else:
            return cls.load_from_netcdf(filename, **kwargs)
[docs] @classmethod
def load_from_zarr(
cls,
filename,
raw_data=False,
remove_vars=None,
include_vars=None,
indexs=None,
buffer_size=5000000,
**class_kwargs,
):
"""Load data from zarr.
:param str,store filename: path or store to load data
:param bool raw_data: If true load data without scale_factor and add_offset
:param None,list(str) remove_vars: List of variable name that will be not loaded
:param None,list(str) include_vars: If defined only this variable will be loaded
:param None,dict indexs: Indexes to load only a slice of data
:param int buffer_size: Size of buffer used to load zarr data
:param class_kwargs: argument to set up observations class
:return: Obsevations selected
:return type: class
"""
# FIXME
if isinstance(filename, zarr.storage.MutableMapping):
h_zarr = filename
else:
if not isinstance(filename, str):
filename = filename.astype(str)
h_zarr = zarr.open(filename)
_check_versions(h_zarr.attrs.get("framework_version", None))
var_list = cls.build_var_list(list(h_zarr.keys()), remove_vars, include_vars)
nb_obs = getattr(h_zarr, var_list[0]).shape[0]
track_array_variables = h_zarr.attrs["track_array_variables"]
if indexs is not None and "obs" in indexs:
sl = indexs["obs"]
sl = slice(sl.start, min(sl.stop, nb_obs))
if sl.stop is not None:
nb_obs = sl.stop
if sl.start is not None:
nb_obs -= sl.start
if sl.step is not None:
indexs["obs"] = slice(sl.start, sl.stop)
logger.warning("step of slice won't be use")
logger.debug("%d observations will be load", nb_obs)
kwargs = dict()
kwargs["track_array_variables"] = h_zarr.attrs.get(
"track_array_variables", track_array_variables
)
array_variables = list()
for variable in var_list:
if len(h_zarr[variable].shape) > 1:
var_inv = VAR_DESCR_inv[variable]
array_variables.append(var_inv)
kwargs["array_variables"] = array_variables
track_extra_variables = []
for variable in var_list:
var_inv = VAR_DESCR_inv[variable]
if var_inv not in cls.ELEMENTS and var_inv not in array_variables:
track_extra_variables.append(var_inv)
kwargs["track_extra_variables"] = track_extra_variables
kwargs["raw_data"] = raw_data
kwargs["only_variables"] = (
None if include_vars is None else [VAR_DESCR_inv[i] for i in include_vars]
)
kwargs.update(class_kwargs)
eddies = cls(size=nb_obs, **kwargs)
for i_var, variable in enumerate(var_list):
var_inv = VAR_DESCR_inv[variable]
logger.debug("%s will be loaded (%d/%d)", variable, i_var, len(var_list))
# find unit factor
input_unit = h_zarr[variable].attrs.get("unit", None)
if input_unit is None:
input_unit = h_zarr[variable].attrs.get("units", None)
output_unit = VAR_DESCR[var_inv]["nc_attr"].get("units", None)
factor = cls.compare_units(input_unit, output_unit, variable)
sl_obs = slice(None) if indexs is None else indexs.get("obs", slice(None))
scale_factor = VAR_DESCR[var_inv].get("scale_factor", None)
add_offset = VAR_DESCR[var_inv].get("add_offset", None)
cls.copy_data_to_zarr(
h_zarr[variable],
eddies.obs[var_inv],
sl_obs,
buffer_size,
factor,
raw_data,
scale_factor,
add_offset,
)
eddies.sign_type = int(h_zarr.attrs.get("rotation_type", 0))
if eddies.sign_type == 0:
logger.debug("File come from another algorithm of identification")
eddies.sign_type = -1
return eddies
    @staticmethod
    def copy_data_to_zarr(
        handler_zarr,
        handler_eddies,
        sl_obs,
        buffer_size,
        factor,
        raw_data,
        scale_factor,
        add_offset,
    ):
        """
        Copy with buffer for zarr.

        Zarr need to get real value, and size could be huge, so we use a buffer to manage memory

        :param zarr_dataset handler_zarr: source zarr variable
        :param array handler_eddies: destination numpy field
        :param slice zarr_dataset sl_obs: observation slice to copy (step unsupported)
        :param int buffer_size: number of observations copied per iteration
        :param float factor: unit-conversion factor applied to the data
        :param bool raw_data: if True, undo scale/offset to store packed values
        :param None,float scale_factor:
        :param None,float add_offset:
        """
        i_start, i_stop = sl_obs.start, sl_obs.stop
        if i_start is None:
            i_start = 0
        if i_stop is None:
            i_stop = handler_zarr.shape[0]
        for i in range(i_start, i_stop, buffer_size):
            sl_in = slice(i, min(i + buffer_size, i_stop))
            data = handler_zarr[sl_in]
            if factor != 1:
                data *= factor
            if raw_data:
                # Invert netcdf packing: raw = (value - offset) / scale
                if add_offset is not None:
                    data -= add_offset
                if scale_factor is not None:
                    data /= scale_factor
            # On the last chunk the slice end exceeds the destination length;
            # numpy clips the slice so it matches the data length.
            sl_out = slice(i - i_start, i - i_start + buffer_size)
            handler_eddies[sl_out] = data
[docs] @classmethod
def load_from_netcdf(
cls,
filename,
raw_data=False,
remove_vars=None,
include_vars=None,
indexs=None,
**class_kwargs,
):
"""Load data from netcdf.
:param str,ExFileObject filename: path or handler to load data
:param bool raw_data: If true load data without apply scale_factor and add_offset
:param None,list(str) remove_vars: List of variable name which will be not loaded
:param None,list(str) include_vars: If defined only this variable will be loaded
:param None,dict indexs: Indexes to load only a slice of data
:param class_kwargs: argument to set up observations class
:return: Obsevations selected
:return type: class
"""
array_dim = "NbSample"
if isinstance(filename, bytes):
filename = filename.astype(str)
if isinstance(filename, (ExFileObject, BufferedReader, BytesIO)):
filename.seek(0)
args, kwargs = ("in-mem-file",), dict(memory=filename.read())
else:
args, kwargs = (filename,), dict()
with Dataset(*args, **kwargs) as h_nc:
_check_versions(getattr(h_nc, "framework_version", None))
var_list = cls.build_var_list(
list(h_nc.variables.keys()), remove_vars, include_vars
)
obs_dim = cls.obs_dimension(h_nc)
nb_obs = len(h_nc.dimensions[obs_dim])
if indexs is not None and obs_dim in indexs:
sl = indexs[obs_dim]
sl = slice(sl.start, min(sl.stop, nb_obs))
if sl.stop is not None:
nb_obs = sl.stop
if sl.start is not None:
nb_obs -= sl.start
if sl.step is not None:
indexs[obs_dim] = slice(sl.start, sl.stop)
logger.warning("step of slice won't be use")
logger.debug("%d observations will be load", nb_obs)
kwargs = dict()
if array_dim in h_nc.dimensions:
kwargs["track_array_variables"] = len(h_nc.dimensions[array_dim])
kwargs["array_variables"] = list()
for variable in var_list:
if array_dim in h_nc.variables[variable].dimensions:
var_inv = VAR_DESCR_inv[variable]
kwargs["array_variables"].append(var_inv)
array_variables = kwargs.get("array_variables", list())
kwargs["track_extra_variables"] = []
for variable in var_list:
var_inv = VAR_DESCR_inv[variable]
if var_inv not in cls.ELEMENTS and var_inv not in array_variables:
kwargs["track_extra_variables"].append(var_inv)
kwargs["raw_data"] = raw_data
kwargs["only_variables"] = (
None
if include_vars is None
else [VAR_DESCR_inv[i] for i in include_vars]
)
kwargs.update(class_kwargs)
eddies = cls(size=nb_obs, **kwargs)
for variable in var_list:
var_inv = VAR_DESCR_inv[variable]
# Patch
h_nc.variables[variable].set_auto_maskandscale(not raw_data)
logger.debug(
"Up load %s variable%s",
variable,
", with raw mode" if raw_data else "",
)
# find unit factor
factor = 1
if not raw_data:
input_unit = getattr(h_nc.variables[variable], "unit", None)
if input_unit is None:
input_unit = getattr(h_nc.variables[variable], "units", None)
output_unit = VAR_DESCR[var_inv]["nc_attr"].get("units", None)
factor = cls.compare_units(input_unit, output_unit, variable)
if indexs is None:
indexs = dict()
var_sl = [
indexs.get(dim, slice(None))
for dim in h_nc.variables[variable].dimensions
]
if factor != 1:
eddies.obs[var_inv] = h_nc.variables[variable][var_sl] * factor
else:
eddies.obs[var_inv] = h_nc.variables[variable][var_sl]
for variable in var_list:
var_inv = VAR_DESCR_inv[variable]
if var_inv == "type_cyc":
eddies.sign_type = h_nc.variables[variable][0]
if eddies.sign_type is None:
title = getattr(h_nc, "title", None)
if title is None:
eddies.sign_type = getattr(h_nc, "rotation_type", 0)
else:
eddies.sign_type = -1 if title == "Cyclonic" else 1
if eddies.sign_type == 0:
logger.debug("File come from another algorithm of identification")
eddies.sign_type = -1
return eddies
[docs] @staticmethod
def build_var_list(var_list, remove_vars, include_vars):
if include_vars is not None:
var_list = [i for i in var_list if i in include_vars]
elif remove_vars is not None:
var_list = [i for i in var_list if i not in remove_vars]
return var_list
[docs] @staticmethod
def compare_units(input_unit, output_unit, name):
if output_unit is None or input_unit is None or output_unit == input_unit:
return 1
units = UnitRegistry()
try:
input_unit = units.parse_expression(input_unit, case_sensitive=False)
output_unit = units.parse_expression(output_unit, case_sensitive=False)
except UndefinedUnitError:
input_unit = None
except TokenError:
input_unit = None
if input_unit is not None:
factor = input_unit.to(output_unit).to_tuple()[0]
# If we are able to find a conversion
if factor != 1:
logger.info(
"%s will be multiply by %f to take care of units(%s->%s)",
name,
factor,
input_unit,
output_unit,
)
return factor
else:
return 1
[docs] @classmethod
def from_array(cls, arrays, **kwargs):
nb = arrays["time"].size
eddies = cls(size=nb, **kwargs)
for k, v in arrays.items():
eddies.obs[k] = v
return eddies
    @classmethod
    def from_zarr(cls, handler):
        """Build an observation set from an already-opened handler.

        NOTE(review): body is identical to :py:meth:`from_netcdf` and reads
        ``dimensions``/``variables``, so the handler is expected to expose a
        netcdf-like interface — confirm intended handler type.
        """
        nb_obs = len(handler.dimensions[cls.obs_dimension(handler)])
        kwargs = dict()
        if hasattr(handler, "track_array_variables"):
            kwargs["track_array_variables"] = handler.track_array_variables
            kwargs["array_variables"] = handler.array_variables.split(",")
        if len(handler.track_extra_variables) > 1:
            kwargs["track_extra_variables"] = handler.track_extra_variables.split(",")
        eddies = cls(size=nb_obs, **kwargs)
        for variable in handler.variables:
            # Patch : "time" is stored under its own name, not a VAR_DESCR key
            if variable == "time":
                eddies.obs[variable] = handler.variables[variable][:]
            else:
                eddies.obs[VAR_DESCR_inv[variable]] = handler.variables[variable][:]
        eddies.sign_type = handler.rotation_type
        return eddies
    @classmethod
    def from_netcdf(cls, handler):
        """Build an observation set from an opened netcdf handler.

        NOTE(review): body is identical to :py:meth:`from_zarr`.
        """
        nb_obs = len(handler.dimensions[cls.obs_dimension(handler)])
        kwargs = dict()
        if hasattr(handler, "track_array_variables"):
            kwargs["track_array_variables"] = handler.track_array_variables
            kwargs["array_variables"] = handler.array_variables.split(",")
        if len(handler.track_extra_variables) > 1:
            kwargs["track_extra_variables"] = handler.track_extra_variables.split(",")
        eddies = cls(size=nb_obs, **kwargs)
        for variable in handler.variables:
            # Patch : "time" is stored under its own name, not a VAR_DESCR key
            if variable == "time":
                eddies.obs[variable] = handler.variables[variable][:]
            else:
                eddies.obs[VAR_DESCR_inv[variable]] = handler.variables[variable][:]
        eddies.sign_type = handler.rotation_type
        return eddies
    def propagate(
        self, previous_obs, current_obs, obs_to_extend, dead_track, nb_next, model
    ):
        """
        Fill virtual obs (C).

        :param previous_obs: previous obs from current (A)
        :param current_obs: previous obs from virtual (B)
        :param obs_to_extend: older virtual observations to keep extending
        :param dead_track: track ids of the freshly dead tracks
        :param nb_next: total number of virtual obs to create
        :param model: observation set used as variable-layout reference
        :return: New position C = B + AB
        """
        next_obs = VirtualEddiesObservations(
            size=nb_next,
            track_extra_variables=model.track_extra_variables,
            track_array_variables=model.track_array_variables,
            array_variables=model.array_variables,
        )
        next_obs.sign_type = self.sign_type
        nb_dead = len(previous_obs)
        nb_virtual_extend = nb_next - nb_dead
        # First part: virtuals spawned from freshly dead tracks
        for key in model.elements:
            if key in ["lon", "lat", "time"] or "contour_" in key:
                continue
            next_obs[key][:nb_dead] = current_obs[key]
        # Displacement AB applied to B gives the virtual position C
        next_obs["dlon"][:nb_dead] = current_obs["lon"] - previous_obs["lon"]
        next_obs["dlat"][:nb_dead] = current_obs["lat"] - previous_obs["lat"]
        next_obs["lon"][:nb_dead] = current_obs["lon"] + next_obs["dlon"][:nb_dead]
        next_obs["lat"][:nb_dead] = current_obs["lat"] + next_obs["dlat"][:nb_dead]
        # Id which are extended
        next_obs["track"][:nb_dead] = dead_track
        # Add previous virtual
        if nb_virtual_extend > 0:
            for key in next_obs.elements:
                if (
                    key in ["lon", "lat", "time", "track", "segment_size"]
                    or "contour_" in key
                ):
                    continue
                next_obs[key][nb_dead:] = obs_to_extend[key]
            # Keep moving along the last known displacement
            next_obs["lon"][nb_dead:] = obs_to_extend["lon"] + obs_to_extend["dlon"]
            next_obs["lat"][nb_dead:] = obs_to_extend["lat"] + obs_to_extend["dlat"]
            next_obs["track"][nb_dead:] = obs_to_extend["track"]
            next_obs["segment_size"][nb_dead:] = obs_to_extend["segment_size"]
        # Count
        next_obs["segment_size"][:] += 1
        return next_obs
[docs] @staticmethod
def intern(flag, public_label=False):
if flag:
labels = "contour_lon_s", "contour_lat_s"
else:
labels = "contour_lon_e", "contour_lat_e"
if public_label:
labels = [VAR_DESCR[label]["nc_name"] for label in labels]
return labels
def match(
    self,
    other,
    i_self=None,
    i_other=None,
    method="overlap",
    intern=False,
    cmin=0,
    **kwargs,
):
    """Return index and score computed on the effective contour.

    :param EddiesObservations other: Observations to compare
    :param array[bool,int],None i_self:
        Index or mask to subset observations, it could avoid to build a specific dataset.
    :param array[bool,int],None i_other:
        Index or mask to subset observations, it could avoid to build a specific dataset.
    :param str method:
        - "overlap": the score is computed with contours;
        - "close_center": score is computed from center proximity
    :param bool intern: if True, speed contour is used (default = effective contour)
    :param float cmin: 0 < cmin < 1, return only couples with score >= cmin
    :param dict kwargs: look at :py:meth:`vertice_overlap`
    :return: return the indices of the eddies in self coupled with eddies in
        other and their associated score
    :rtype: (array(int), array(int), array(float))
    :raises ValueError: if ``method`` is not a known method name

    .. minigallery:: py_eddy_tracker.EddiesObservations.match
    """
    x_name, y_name = self.intern(intern)
    if i_self is None:
        i_self = slice(None)
    if i_other is None:
        i_other = slice(None)
    if method == "overlap":
        x0, y0 = self[x_name][i_self], self[y_name][i_self]
        x1, y1 = other[x_name][i_other], other[y_name][i_other]
        i, j = bbox_intersection(x0, y0, x1, y1)
        c = vertice_overlap(x0[i], y0[i], x1[j], y1[j], **kwargs)
    elif method == "close_center":
        x0, y0 = self.longitude[i_self], self.latitude[i_self]
        x1, y1 = other.longitude[i_other], other.latitude[i_other]
        i, j, c = close_center(x0, y0, x1, y1, **kwargs)
    else:
        # Previously an unknown method fell through to a NameError on i/j/c
        raise ValueError(f'method "{method}" unknown')
    # >= keeps couples scoring exactly cmin in the selection
    m = c >= cmin
    return i[m], j[m], c[m]
[docs] @staticmethod
def re_reference_index(index, ref):
"""
Shift index with ref
:param array,int index: local index to re ref
:param slice,array ref:
reference could be a slice in this case we juste add start to index
or could be indices and in this case we need to translate
"""
if isinstance(ref, slice):
return index + ref.start
else:
return ref[index]
@classmethod
def cost_function_common_area(cls, xy_in, xy_out, distance, intern=False):
    """Cost based on the area shared by two contours.

    Cost is ``1 - common_area / min(area_in, area_out)``; couples whose
    bounding boxes do not overlap get cost 1 and are masked.

    How does it work on x bound ?

    :param xy_in: record array holding contour fields, start of the link
    :param xy_out: record array holding contour fields, end of the link
    :param distance: not used in this cost, kept for cost-function interface compatibility
    :param bool intern: if True use speed contour instead of effective contour
    """
    x_name, y_name = cls.intern(intern)
    nb_records = xy_in.shape[0]
    x_in, y_in = xy_in[x_name], xy_in[y_name]
    x_out, y_out = xy_out[x_name], xy_out[y_name]
    # Bounding box of every contour, used as a cheap overlap pre-test
    x_in_min, y_in_min = x_in.min(axis=1), y_in.min(axis=1)
    x_in_max, y_in_max = x_in.max(axis=1), y_in.max(axis=1)
    x_out_min, y_out_min = x_out.min(axis=1), y_out.min(axis=1)
    x_out_max, y_out_max = x_out.max(axis=1), y_out.max(axis=1)
    costs = ma.empty(nb_records, dtype="f4")
    for i in range(nb_records):
        # Disjoint bounding boxes: no common area possible
        if x_in_max[i] < x_out_min[i] or x_in_min[i] > x_out_max[i]:
            costs[i] = 1
            continue
        if y_in_max[i] < y_out_min[i] or y_in_min[i] > y_out_max[i]:
            costs[i] = 1
            continue
        x_in_, x_out_ = x_in[i], x_out[i]
        p_in = Polygon(create_vertice(x_in_, y_in[i]))
        # Contours on opposite sides of the date line: shift the out-contour
        # into the longitude frame of the in-contour before intersecting
        if abs(x_in_[0] - x_out_[0]) > 180:
            x_out_ = (x_out[i] - (x_in_[0] - 180)) % 360 + x_in_[0] - 180
        p_out = Polygon(create_vertice(x_out_, y_out[i]))
        costs[i] = 1 - (p_in & p_out).area() / min(p_in.area(), p_out.area())
    # Cost of exactly 1 means no overlap: mask those couples
    costs.mask = costs == 1
    return costs
def mask_function(self, other, distance):
    """Select candidate couples closer than 125 (same scale as the
    distance term in :py:meth:`cost_function` — presumably km, confirm
    against :py:meth:`distance`).

    :param other: observations to compare with
    :param array distance: distances between self and other obs
    :return: True for couples close enough to be considered
    :rtype: array(bool)
    """
    return distance < 125
[docs] @staticmethod
def cost_function(records_in, records_out, distance):
r"""Return the cost function between two obs.
.. math::
cost = \sqrt{({Amp_{_{in}} - Amp_{_{out}} \over Amp_{_{in}}}) ^2 +
({Rspeed_{_{in}} - Rspeed_{_{out}} \over Rspeed_{_{in}}}) ^2 +
({distance \over 125}) ^2
}
:param records_in: starting observations
:param records_out: observations to associate
:param distance: computed between in and out
"""
cost = (
(records_in["amplitude"] - records_out["amplitude"])
/ records_in["amplitude"]
) ** 2
cost += (
(records_in["radius_s"] - records_out["radius_s"]) / records_in["radius_s"]
) ** 2
cost += (distance / 125) ** 2
cost **= 0.5
# Mask value superior at 60 % of variation
# return ma.array(cost, mask=m)
return cost
def shifted_ellipsoid_degrees_mask(self, other, minor=1.5, major=1.5):
    """Select other obs within an ellipse (axes in degrees) around each self obs.

    Delegates to the module-level helper ``shifted_ellipsoid_degrees_mask2``.

    :param other: observations to test
    :param float minor: minor axis of the ellipse, in degrees
    :param float major: major axis of the ellipse, in degrees
    :return: mask of couples kept
    """
    return shifted_ellipsoid_degrees_mask2(
        self.lon, self.lat, other.lon, other.lat, minor, major
    )
def fixed_ellipsoid_mask(
    self, other, minor=50, major=100, only_east=False, shifted_ellipse=False
):
    """Select other obs located in an ellipse around each self obs.

    Obs closer than ``minor`` are accepted directly and farther than
    ``major`` rejected directly; the remaining obs are tested against the
    true ellipse (sum of distances to the two foci below ``2 * major``).

    :param other: observations to test
    :param float,array minor: radius of direct acceptance (km — confirm
        unit returned by :py:meth:`distance`)
    :param float,array major: radius of direct rejection, and ellipse semi-major axis
    :param bool only_east: if True reject obs located west of self obs
    :param bool shifted_ellipse: if True shift the ellipse center westward
    :return: mask of accepted couples
    :rtype: array(bool)
    """
    dist = self.distance(other).T
    accepted = dist < minor
    rejected = dist > major
    rejected += isnan(dist)
    # All obs we are not in rejected and accepted, there are between
    # two circle
    needs_investigation = -(rejected + accepted)
    index_other, index_self = where(needs_investigation)
    nb_case = index_self.shape[0]
    if nb_case != 0:
        if isinstance(major, ndarray):
            major = major[index_self]
        if isinstance(minor, ndarray):
            minor = minor[index_self]
        # focal distance, converted to degrees of longitude at the obs latitude
        # (111.2 km per degree at the equator)
        f_degree = ((major**2 - minor**2) ** 0.5) / (
            111.2 * cos(radians(self.lat[index_self]))
        )
        lon_self = self.lon[index_self]
        if shifted_ellipse:
            # NOTE(review): (major - minor) / 2 uses km-scaled axes as a
            # degree offset — confirm intended unit
            x_center_ellipse = lon_self - (major - minor) / 2
        else:
            x_center_ellipse = lon_self
        lon_left_f = x_center_ellipse - f_degree
        lon_right_f = x_center_ellipse + f_degree
        dist_left_f = distance(
            lon_left_f,
            self.lat[index_self],
            other.lon[index_other],
            other.lat[index_other],
        )
        dist_right_f = distance(
            lon_right_f,
            self.lat[index_self],
            other.lon[index_other],
            other.lat[index_other],
        )
        # distance() values are divided by 1000 (m -> km); a point is inside
        # the ellipse iff the sum of its distances to both foci < 2 * major
        dist_2a = (dist_left_f + dist_right_f) / 1000
        accepted[index_other, index_self] = dist_2a < (2 * major)
        if only_east:
            # Signed longitude difference wrapped in [-180, 180)
            d_lon = (other.lon[index_other] - lon_self + 180) % 360 - 180
            mask = d_lon < 0
            accepted[index_other[mask], index_self[mask]] = False
    return accepted.T
@staticmethod
def solve_conflict(cost):
    """Placeholder for an alternative conflict-resolution strategy; not implemented.

    :param cost: cost matrix
    """
    pass
@staticmethod
def solve_simultaneous(cost):
    """Deduce link from cost matrix.

    Conflicts (an obs with several candidate links) are resolved by
    repeatedly taking the globally-cheapest remaining link and discarding
    every other link of the two obs involved.

    :param array(float) cost: Cost for each available link
    :return: return a boolean mask array, True for each valid couple
    :rtype: array(bool)
    """
    mask = ~cost.mask
    if mask.size == 0:
        return mask
    # Count number of links by self obs and other obs
    self_links, other_links = sum_row_column(mask)
    max_links = max(self_links.max(), other_links.max())
    if max_links > 5:
        logger.warning("One observation have %d links", max_links)
    # If some obs have multiple links, we keep only one link by eddy
    eddies_separation = 1 < self_links
    eddies_merge = 1 < other_links
    test = eddies_separation.any() or eddies_merge.any()
    if test:
        # We extract matrix that contains conflict
        obs_linking_to_self = mask[eddies_separation].any(axis=0)
        obs_linking_to_other = mask[:, eddies_merge].any(axis=1)
        i_self_keep = where(obs_linking_to_other + eddies_separation)[0]
        i_other_keep = where(obs_linking_to_self + eddies_merge)[0]
        # Cost to resolve conflict
        cost_reduce = cost[i_self_keep][:, i_other_keep]
        shape = cost_reduce.shape
        nb_conflict = (~cost_reduce.mask).sum()
        logger.debug("Shape conflict matrix : %s, %d conflicts", shape, nb_conflict)
        if nb_conflict >= (shape[0] + shape[1]):
            # Fixed: previously this logged the threshold (shape[0] + shape[1])
            # instead of the actual conflict count
            logger.warning("High number of conflict : %d", nb_conflict)
        links_resolve = 0
        # Arbitrary value
        max_iteration = max(cost_reduce.shape)
        security_increment = 0
        # Greedy: take the cheapest remaining link until nothing is left
        while False in cost_reduce.mask:
            if security_increment > max_iteration:
                # Maybe check if the size decreases if not rise an exception
                # x_i, y_i = where(-cost_reduce.mask)
                raise Exception("Too many iterations: %d" % security_increment)
            security_increment += 1
            i_min_value = cost_reduce.argmin()
            # Flat argmin converted to 2-D (row, column) indices
            i, j = floor(i_min_value / shape[1]).astype(int), i_min_value % shape[1]
            # Set to False all links
            mask[i_self_keep[i]] = False
            mask[:, i_other_keep[j]] = False
            cost_reduce.mask[i] = True
            cost_reduce.mask[:, j] = True
            # we active only this link
            mask[i_self_keep[i], i_other_keep[j]] = True
            links_resolve += 1
        logger.debug("%d links resolve", links_resolve)
    return mask
[docs] @staticmethod
def solve_first(cost, multiple_link=False):
mask = ~cost.mask
# Count number of links by self obs and other obs
self_links = mask.sum(axis=1)
other_links = mask.sum(axis=0)
max_links = max(self_links.max(), other_links.max())
if max_links > 5:
logger.warning("One observation have %d links", max_links)
# If some obs have multiple links, we keep only one link by eddy
eddies_separation = 1 < self_links
eddies_merge = 1 < other_links
test = eddies_separation.any() or eddies_merge.any()
if test:
# We extract matrix that contains conflict
obs_linking_to_self = mask[eddies_separation].any(axis=0)
obs_linking_to_other = mask[:, eddies_merge].any(axis=1)
i_self_keep = where(obs_linking_to_other + eddies_separation)[0]
i_other_keep = where(obs_linking_to_self + eddies_merge)[0]
# Cost to resolve conflict
cost_reduce = cost[i_self_keep][:, i_other_keep]
shape = cost_reduce.shape
nb_conflict = (~cost_reduce.mask).sum()
logger.debug("Shape conflict matrix : %s, %d conflicts", shape, nb_conflict)
if nb_conflict >= (shape[0] + shape[1]):
logger.warning(
"High number of conflict : %d (nb_conflict)", shape[0] + shape[1]
)
links_resolve = 0
for i in range(shape[0]):
j = cost_reduce[i].argmin()
if hasattr(cost_reduce[i, j], "mask"):
continue
links_resolve += 1
# Set all links to False
mask[i_self_keep[i]] = False
cost_reduce.mask[i] = True
if not multiple_link:
mask[:, i_other_keep[j]] = False
cost_reduce.mask[:, j] = True
# We activate this link only
mask[i_self_keep[i], i_other_keep[j]] = True
logger.debug("%d links resolve", links_resolve)
return mask
def solve_function(self, cost_matrix):
    """Resolve the cost matrix into links and return their indices.

    :param cost_matrix: masked cost matrix (self obs x other obs)
    :return: (i_self, i_other) indices of the selected links
    """
    return numba_where(self.solve_simultaneous(cost_matrix))
def post_process_link(self, other, i_self, i_other):
    """Check link consistency: an obs of other may be used at most once.

    :param other: matched observations (not used here, kept for interface)
    :param array i_self: indices in self
    :param array i_other: indices in other
    :return: (i_self, i_other) unchanged
    :raises Exception: if an index of other appears several times
    """
    nb_unique = unique(i_other).shape[0]
    if nb_unique != i_other.shape[0]:
        raise Exception()
    return i_self, i_other
def tracking(self, other):
    """Track obs between self and other.

    :param other: observations of the following time step
    :return: index in self, index in other, and cost of each selected link
    :rtype: (array(int), array(int), array(float))
    """
    dist = self.distance(other)
    mask_accept_dist = self.mask_function(other, dist)
    indexs_closest = where(mask_accept_dist)
    cost_values = self.cost_function(
        self.obs[indexs_closest[0]],
        other.obs[indexs_closest[1]],
        dist[mask_accept_dist],
    )
    # Cost matrix masked everywhere except where a candidate couple exists
    cost_mat = ma.empty(mask_accept_dist.shape, dtype="f4")
    cost_mat.mask = ~mask_accept_dist
    cost_mat[mask_accept_dist] = cost_values
    i_self, i_other = self.solve_function(cost_mat)
    i_self, i_other = self.post_process_link(other, i_self, i_other)
    logger.debug("%d matched with previous", i_self.shape[0])
    return i_self, i_other, cost_mat[i_self, i_other]
def to_zarr(self, handler, **kwargs):
    """Write observations into an opened zarr group.

    :param handler: zarr group opened for writing
    :param dict kwargs: look at :py:meth:`create_variable_zarr`
    """
    handler.attrs["track_extra_variables"] = ",".join(self.track_extra_variables)
    if self.track_array_variables != 0:
        handler.attrs["track_array_variables"] = self.track_array_variables
        handler.attrs["array_variables"] = ",".join(self.array_variables)
    # Iter on variables to create:
    for ori_name in self.fields:
        # Patch for a transition
        name = ori_name
        #
        logger.debug("Create Variable %s", VAR_DESCR[name]["nc_name"])
        self.create_variable_zarr(
            handler,
            dict(
                name=VAR_DESCR[name]["nc_name"],
                store_dtype=VAR_DESCR[name]["output_type"],
                dtype=VAR_DESCR[name]["nc_type"],
                dimensions=VAR_DESCR[name]["nc_dims"],
            ),
            VAR_DESCR[name]["nc_attr"],
            self.obs[ori_name],
            scale_factor=VAR_DESCR[name].get("scale_factor", None),
            add_offset=VAR_DESCR[name].get("add_offset", None),
            filters=VAR_DESCR[name].get("filters", None),
            **kwargs,
        )
    self.set_global_attr_zarr(handler)
[docs] @staticmethod
def netcdf_create_dimensions(handler, dim, nb):
if dim not in handler.dimensions:
handler.createDimension(dim, nb)
else:
old_nb = len(handler.dimensions[dim])
if nb != old_nb:
raise Exception(
f"{dim} dimensions previously set to a different size {old_nb} (current value : {nb})"
)
def to_netcdf(self, handler, **kwargs):
    """Write observations into an opened netCDF dataset.

    :param handler: netCDF4 dataset opened for writing
    :param dict kwargs: look at :py:meth:`create_variable`
    """
    eddy_size = len(self)
    logger.debug('Create Dimensions "obs" : %d', eddy_size)
    self.netcdf_create_dimensions(handler, "obs", eddy_size)
    handler.track_extra_variables = ",".join(self.track_extra_variables)
    if self.track_array_variables != 0:
        self.netcdf_create_dimensions(
            handler, "NbSample", self.track_array_variables
        )
        handler.track_array_variables = self.track_array_variables
        handler.array_variables = ",".join(self.array_variables)
    # Iter on variables to create, in alphabetical order of their netCDF names
    fields_ = array([VAR_DESCR[field]["nc_name"] for field in self.fields])
    i = fields_.argsort()
    for ori_name in array(self.fields)[i]:
        # Patch for a transition
        name = ori_name
        #
        logger.debug("Create Variable %s", VAR_DESCR[name]["nc_name"])
        self.create_variable(
            handler,
            dict(
                varname=VAR_DESCR[name]["nc_name"],
                datatype=VAR_DESCR[name]["output_type"],
                dimensions=VAR_DESCR[name]["nc_dims"],
            ),
            VAR_DESCR[name]["nc_attr"],
            self.obs[ori_name],
            scale_factor=VAR_DESCR[name].get("scale_factor", None),
            add_offset=VAR_DESCR[name].get("add_offset", None),
            **kwargs,
        )
    self.set_global_attr_netcdf(handler)
def create_variable(
    self,
    handler_nc,
    kwargs_variable,
    attr_variable,
    data,
    scale_factor=None,
    add_offset=None,
    **kwargs,
):
    """Create a netCDF variable and store data in it.

    :param handler_nc: netCDF4 dataset opened for writing
    :param dict kwargs_variable: arguments for ``createVariable`` (varname, datatype, dimensions)
    :param dict attr_variable: attributes to set on the variable
    :param array data: values to store
    :param float,None scale_factor: scale factor used to pack values
    :param float,None add_offset: offset used to pack values
    :param dict kwargs: extra arguments forwarded to ``createVariable``
    """
    dims = kwargs_variable.get("dimensions", None)
    # Manage chunk in 2d case
    if dims is not None and len(dims) > 1:
        chunk = [1]
        cum = 1
        for dim in dims[1:]:
            nb = len(handler_nc.dimensions[dim])
            chunk.append(nb)
            cum *= nb
        # Size the first-axis chunk so one chunk holds ~400000 values
        chunk[0] = min(int(400000 / cum), len(handler_nc.dimensions[dims[0]]))
        kwargs_variable["chunksizes"] = chunk
        kwargs_variable["zlib"] = True
        kwargs_variable["complevel"] = 1
    kwargs_variable.update(kwargs)
    var = handler_nc.createVariable(**kwargs_variable)
    attrs = list(attr_variable.keys())
    attrs.sort()
    for attr in attrs:
        attr_value = attr_variable[attr]
        var.setncattr(attr, attr_value)
    if self.raw_data:
        # Data is already packed: write it BEFORE declaring scale/offset so
        # the netCDF4 layer does not pack it a second time
        var[:] = data
    if scale_factor is not None:
        var.scale_factor = scale_factor
        if add_offset is not None:
            var.add_offset = add_offset
        else:
            var.add_offset = 0
    if not self.raw_data:
        # Unpacked data: netCDF4 applies scale_factor/add_offset at write time
        var[:] = data
    try:
        if len(var.dimensions) == 1 or var.size < 1e7:
            var.setncattr("min", var[:].min())
            var.setncattr("max", var[:].max())
    except ValueError:
        logger.warning("Data is empty")
@staticmethod
def get_filters_zarr(name):
    """Get filters to store in zarr for known variable

    :param str name: private variable name
    :return list: filters list
    """
    content = VAR_DESCR.get(name)
    filters = list()
    store_dtype = content["output_type"]
    scale_factor, add_offset = content.get("scale_factor", None), content.get(
        "add_offset", None
    )
    if scale_factor is not None or add_offset is not None:
        if add_offset is None:
            add_offset = 0
        # NOTE(review): if add_offset is set while scale_factor is None,
        # 1 / scale_factor raises TypeError — confirm VAR_DESCR always pairs
        # add_offset with a scale_factor
        filters.append(
            zarr.FixedScaleOffset(
                offset=add_offset,
                scale=1 / scale_factor,
                dtype=content["nc_type"],
                astype=store_dtype,
            )
        )
    filters.extend(content.get("filters", []))
    return filters
def create_variable_zarr(
    self,
    handler_zarr,
    kwargs_variable,
    attr_variable,
    data,
    scale_factor=None,
    add_offset=None,
    filters=None,
    compressor=None,
    chunck_size=2500000,
):
    """Create a zarr dataset and store data in it.

    :param handler_zarr: zarr group opened for writing
    :param dict kwargs_variable: dataset creation arguments (name, dtype, store_dtype, dimensions)
    :param dict attr_variable: attributes to set on the dataset
    :param array data: values to store
    :param float,None scale_factor: scale factor used to pack values
    :param float,None add_offset: offset used to pack values
    :param list,None filters: extra zarr filters to append
    :param compressor: zarr compressor (default: Blosc zstd level 2)
    :param int chunck_size: target number of values per chunk
    """
    kwargs_variable["shape"] = data.shape
    kwargs_variable["compressor"] = (
        zarr.Blosc(cname="zstd", clevel=2) if compressor is None else compressor
    )
    kwargs_variable["filters"] = list()
    store_dtype = kwargs_variable.pop("store_dtype", None)
    if scale_factor is not None or add_offset is not None:
        if add_offset is None:
            add_offset = 0
        # NOTE(review): 1 / scale_factor raises TypeError when only
        # add_offset is provided — confirm callers always pair them
        kwargs_variable["filters"].append(
            zarr.FixedScaleOffset(
                offset=add_offset,
                scale=1 / scale_factor,
                dtype=kwargs_variable["dtype"],
                astype=store_dtype,
            )
        )
    if filters is not None:
        kwargs_variable["filters"].extend(filters)
    dims = kwargs_variable.get("dimensions", None)
    # Manage chunk in 2d case: keep full rows per chunk
    if len(dims) == 1:
        kwargs_variable["chunks"] = (chunck_size,)
    if len(dims) == 2:
        second_dim = data.shape[1]
        kwargs_variable["chunks"] = (chunck_size // second_dim, second_dim)
    # "dimensions" is not a zarr argument, only used to compute chunks
    kwargs_variable.pop("dimensions")
    v = handler_zarr.create_dataset(**kwargs_variable)
    attrs = list(attr_variable.keys())
    attrs.sort()
    for attr in attrs:
        attr_value = attr_variable[attr]
        v.attrs[attr] = str(attr_value)
    if self.raw_data:
        if scale_factor is not None:
            # Data is already packed: unpack block by block so the
            # FixedScaleOffset filter re-packs it consistently
            s_bloc = kwargs_variable["chunks"][0]
            nb_bloc = int(ceil(data.shape[0] / s_bloc))
            for i in range(nb_bloc):
                sl = slice(i * s_bloc, (i + 1) * s_bloc)
                v[sl] = data[sl] * scale_factor + add_offset
        else:
            v[:] = data
    if not self.raw_data:
        v[:] = data
    try:
        if v.size < 1e8:
            v.attrs["min"] = str(v[:].min())
            v.attrs["max"] = str(v[:].max())
    except ValueError:
        logger.warning("Data is empty")
def write_file(
    self, path="./", filename="%(path)s/%(sign_type)s.nc", zarr_flag=False, **kwargs
):
    """Write a netcdf or zarr with eddy obs.
    Zarr is useful for large dataset > 10M observations

    :param str path: set path variable
    :param str filename: model to store file
    :param bool zarr_flag: If True, method will use zarr format instead of netcdf
    :param dict kwargs: look at :py:meth:`to_zarr` or :py:meth:`to_netcdf`
    """
    filename = filename % dict(
        path=path,
        sign_type=self.sign_legend,
        prod_time=datetime.now().strftime("%Y%m%d"),
    )
    # Keep extension and flag consistent, whichever one was provided
    if zarr_flag:
        filename = filename.replace(".nc", ".zarr")
    if filename.endswith(".zarr"):
        zarr_flag = True
    logger.info("Store in %s (%d observations)", filename, len(self))
    if zarr_flag:
        handler = zarr.open(filename, "w")
        self.to_zarr(handler, **kwargs)
    else:
        nc_format = kwargs.pop("format", "NETCDF4")
        with Dataset(filename, "w", format=nc_format) as handler:
            self.to_netcdf(handler, **kwargs)
@property
def global_attr(self):
    """Global attributes written in every output file.

    :rtype: dict
    """
    return dict(
        Metadata_Conventions="Unidata Dataset Discovery v1.0",
        comment="Surface product; mesoscale eddies",
        framework_used="https://github.com/AntSimi/py-eddy-tracker",
        framework_version=__version__,
        standard_name_vocabulary="NetCDF Climate and Forecast (CF) Metadata Convention Standard Name Table",
        rotation_type=self.sign_type,
    )
def set_global_attr_zarr(self, h_zarr):
    """Write global attributes in a zarr group (values stored as str).

    :param h_zarr: zarr group opened for writing
    """
    for key, item in self.global_attr.items():
        h_zarr.attrs[key] = str(item)
def set_global_attr_netcdf(self, h_nc):
    """Write global attributes in a netCDF dataset.

    :param h_nc: netCDF4 dataset opened for writing
    """
    for key, item in self.global_attr.items():
        h_nc.setncattr(key, item)
def mask_from_polygons(self, polygons):
    """
    Return mask for all observations in one of polygons list

    :param list((array,array)) polygons: list of x/y array which be used to identify observations
    :return: True where the observation center is inside at least one polygon
    :rtype: array(bool)
    """
    # First polygon initializes the mask (assumes polygons is non-empty)
    x, y = polygons[0]
    m = insidepoly(
        self.longitude, self.latitude, x.reshape((1, -1)), y.reshape((1, -1))
    )
    # Following polygons are only tested on obs not yet selected
    for x, y in polygons[1:]:
        m_ = ~m
        m[m_] = insidepoly(
            self.longitude[m_],
            self.latitude[m_],
            x.reshape((1, -1)),
            y.reshape((1, -1)),
        )
    return m
@property
def time_datetime64(self):
    """Time of each obs as numpy ``datetime64[D]``.

    ``self.time`` is assumed to count days since 1950-01-01 (see offset
    below); numpy's epoch is 1970-01-01, so the 1950->1970 day offset is
    removed before conversion.
    """
    dt = (datetime64("1970-01-01") - datetime64("1950-01-01")).astype("i8")
    return (self.time - dt).astype("datetime64[D]")
def time_sub_sample(self, t0, time_step):
    """
    Time sub sampling

    :param int,float t0: reference time that will be kept
    :param int,float time_step: keep every observation spaced by time_step
    :return: subset of observations
    :rtype: __class__
    """
    mask = (self.time - t0) % time_step == 0
    return self.extract_with_mask(mask)
def scatter(self, ax, name=None, ref=None, factor=1, **kwargs):
    """
    Scatter data.

    :param matplotlib.axes.Axes ax: matplotlib axe used to draw
    :param str,array,None name:
        variable used to fill the contour, if None all elements have the same color
    :param float,None ref: if defined, all coordinates are wrapped with ref as western boundary
    :param float factor: multiply value by
    :param dict kwargs: look at :py:meth:`matplotlib.axes.Axes.scatter`
    :return: scatter mappable

    .. minigallery:: py_eddy_tracker.EddiesObservations.scatter
    """
    lon = self.longitude
    if ref is not None:
        # Wrap longitudes into [ref, ref + 360)
        lon = (lon - ref) % 360 + ref
    kwargs = kwargs.copy()
    if name is not None and "c" not in kwargs:
        kwargs["c"] = self.parse_varname(name) * factor
    return ax.scatter(lon, self.latitude, **kwargs)
def filled(
    self,
    ax,
    varname=None,
    ref=None,
    intern=False,
    cmap="magma_r",
    lut=10,
    vmin=None,
    vmax=None,
    factor=1,
    **kwargs,
):
    """
    Draw filled eddy contours, colored by a variable.

    :param matplotlib.axes.Axes ax: matplotlib axe used to draw
    :param str,array,None varname: variable used to fill the contours, or an array of same size than obs
    :param float,None ref: if defined, all coordinates are wrapped with ref as western boundary
    :param bool intern: if True draw speed contours instead of effective contours
    :param str cmap: matplotlib colormap name
    :param int,None lut: Number of colors in the colormap
    :param float,None vmin: Min value of the colorbar
    :param float,None vmax: Max value of the colorbar
    :param float factor: multiply value by
    :return: Collection drawed
    :rtype: matplotlib.collections.PolyCollection

    .. minigallery:: py_eddy_tracker.EddiesObservations.filled
    """
    x_name, y_name = self.intern(intern)
    x, y = self[x_name], self[y_name]
    if ref is not None:
        # TODO : maybe buggy with global display
        shape_out = x.shape
        x, y = wrap_longitude(x.reshape(-1), y.reshape(-1), ref)
        x, y = x.reshape(shape_out), y.reshape(shape_out)
    # One closed vertex list per eddy
    verts = list()
    for x_, y_ in zip(x, y):
        verts.append(create_vertice(x_, y_))
    if "facecolors" not in kwargs:
        # Build one facecolor per eddy from the normalized variable
        kwargs = kwargs.copy()
        cmap = get_cmap(cmap, lut)
        v = self.parse_varname(varname) * factor
        if vmin is None:
            vmin = v.min()
        if vmax is None:
            vmax = v.max()
        v = (v - vmin) / (vmax - vmin)
        colors = [cmap(v_) for v_ in v]
        kwargs["facecolors"] = colors
    if "label" in kwargs:
        kwargs["label"] = self.format_label(kwargs["label"])
    c = PolyCollection(verts, **kwargs)
    ax.add_collection(c)
    # Expose cmap/norm so a colorbar can be attached to the collection
    c.cmap = cmap
    c.norm = Normalize(vmin=vmin, vmax=vmax)
    return c
def __merge_filters__(self, *filters):
    """
    Compute an intersection between all filters after to evaluate each of them

    :param list(slice,array[int],array[bool]) filters:
    :return: Return applicable object to numpy.array
    :rtype: slice, index, mask
    """
    filter1 = filters[0]
    # Reduce right-to-left: merge the tail first, then combine with filter1
    if len(filters) > 2:
        filter2 = self.__merge_filters__(*filters[1:])
    elif len(filters) == 2:
        filter2 = filters[1]
    # Merge indexs and filter
    if isinstance(filter1, slice):
        reject = ones(len(self), dtype="bool")
        reject[filter1] = False
        if isinstance(filter2, slice):
            # NOTE(review): clearing reject for BOTH slices yields the union
            # of the two slices, while the docstring announces an
            # intersection — confirm intended behaviour
            reject[filter2] = False
            return ~reject
        # Mask case
        elif filter2.dtype == bool:
            return ~reject * filter2
        # index case
        else:
            return filter2[~reject[filter2]]
    # mask case
    elif filter1.dtype == bool:
        if isinstance(filter2, slice):
            select = zeros(len(self), dtype="bool")
            select[filter2] = True
            return select * filter1
        # Mask case
        elif filter2.dtype == bool:
            return filter2 * filter1
        # index case
        else:
            return filter2[filter1[filter2]]
    # index case
    else:
        if isinstance(filter2, slice):
            select = zeros(len(self), dtype="bool")
            select[filter2] = True
            return filter1[select[filter1]]
        # Mask case
        elif filter2.dtype == bool:
            return filter1[filter2[filter1]]
        # index case
        else:
            return filter1[in1d(filter1, filter2)]
def merge_filters(self, *filters):
    """
    Compute an intersection between all filters after to evaluate each of them

    :param list(callable,None,slice,array[int],array[bool]) filters:
    :return: Return applicable object to numpy.array
    :rtype: slice, index, mask
    """
    # Accept a single list of filters as well as several arguments
    if len(filters) == 1 and isinstance(filters[0], list):
        filters = filters[0]
    kept = list()
    # Drop every filter which selects all obs
    for item in filters:
        if callable(item):
            item = item(self)
        if item is None:
            continue
        if isinstance(item, slice):
            if item == slice(None):
                continue
        elif item.dtype == "bool":
            if item.all():
                continue
            # A mask which selects nothing short-circuits everything
            if not item.any():
                return empty(0, dtype=int)
        kept.append(item)
    if len(kept) == 0:
        return slice(None)
    if len(kept) == 1:
        return kept[0]
    return self.__merge_filters__(*kept)
def bins_stat(self, xname, bins=None, yname=None, method=None, mask=None):
    """
    :param str,array xname: variable to compute stats on
    :param array, None bins: bins to perform statistics, if None bins = arange(variable.min(), variable.max() + 2)
    :param None,str,array yname: variable used to apply method
    :param None,str method: If None, count the number of observations in each bin;
        "mean" averages ``yname`` per bin (any other value falls back to counts)
    :param None,array(bool) mask: If defined use only True position
    :return: x array and y array
    :rtype: array,array

    .. minigallery:: py_eddy_tracker.EddiesObservations.bins_stat
    """
    v = self.parse_varname(xname)
    mask = self.merge_filters(mask)
    v = v[mask]
    if bins is None:
        bins = arange(v.min(), v.max() + 2)
    y, x = hist_numba(v, bins=bins)
    # Bin centers
    x = (x[1:] + x[:-1]) / 2
    if method == "mean":
        y_v = self.parse_varname(yname)
        y_v = y_v[mask]
        y_, _ = histogram(v, bins=bins, weights=y_v)
        # Empty bins yield nan instead of raising
        with errstate(divide="ignore", invalid="ignore"):
            y = y_ / y
    return x, y
def display_color(self, ax, field, ref=None, intern=False, **kwargs):
    """Plot colored contour of eddies

    :param matplotlib.axes.Axes ax: matplotlib axe used to draw
    :param str,array field: color field
    :param float,None ref: if defined, all coordinates are wrapped with ref as western boundary
    :param bool intern: if True, draw the speed contour
    :param dict kwargs: look at :py:meth:`matplotlib.collections.LineCollection`
    :return: drawn line collection
    :rtype: matplotlib.collections.LineCollection

    .. minigallery:: py_eddy_tracker.EddiesObservations.display_color
    """
    xname, yname = self.intern(intern)
    x, y = self[xname], self[yname]
    if ref is not None:
        # TODO : maybe buggy with global display
        shape_out = x.shape
        x, y = wrap_longitude(x.reshape(-1), y.reshape(-1), ref)
        x, y = x.reshape(shape_out), y.reshape(shape_out)
    c = self.parse_varname(field)
    cmap = get_cmap(kwargs.pop("cmap", "Spectral_r"))
    cmin, cmax = kwargs.pop("vmin", c.min()), kwargs.pop("vmax", c.max())
    # One color per contour, from the normalized field
    colors = cmap((c - cmin) / (cmax - cmin))
    lines = LineCollection(
        [create_vertice(i, j) for i, j in zip(x, y)], colors=colors, **kwargs
    )
    ax.add_collection(lines)
    # Expose cmap/norm so a colorbar can be attached to the collection
    lines.cmap = cmap
    lines.norm = Normalize(vmin=cmin, vmax=cmax)
    return lines
def display(self, ax, ref=None, extern_only=False, intern_only=False, **kwargs):
    """Plot the speed and effective (dashed) contour of the eddies

    :param matplotlib.axes.Axes ax: matplotlib axe used to draw
    :param float,None ref: if defined, all coordinates are wrapped with ref as western boundary
    :param bool extern_only: if True, draw only the effective contour
    :param bool intern_only: if True, draw only the speed contour
    :param dict kwargs: look at :py:meth:`matplotlib.axes.Axes.plot`
    :return: plotted lines
    :rtype: list

    .. minigallery:: py_eddy_tracker.EddiesObservations.display
    """
    # Contours flattened so all eddies are drawn as one line artist
    if not extern_only:
        lon_s = flatten_line_matrix(self.contour_lon_s)
        lat_s = flatten_line_matrix(self.contour_lat_s)
    if not intern_only:
        lon_e = flatten_line_matrix(self.contour_lon_e)
        lat_e = flatten_line_matrix(self.contour_lat_e)
    if "label" in kwargs:
        kwargs["label"] = self.format_label(kwargs["label"])
    # Effective contour gets a dash-dot style unless caller chose one
    kwargs_e = kwargs.copy()
    if "ls" not in kwargs_e and "linestyle" not in kwargs_e:
        kwargs_e["linestyle"] = "-."
    if not extern_only:
        # Avoid a duplicated legend entry when both contours are drawn
        kwargs_e.pop("label", None)
    mappables = list()
    if not extern_only:
        if ref is not None:
            lon_s, lat_s = wrap_longitude(lon_s, lat_s, ref, cut=True)
        mappables.append(ax.plot(lon_s, lat_s, **kwargs)[0])
    if not intern_only:
        if ref is not None:
            lon_e, lat_e = wrap_longitude(lon_e, lat_e, ref, cut=True)
        mappables.append(ax.plot(lon_e, lat_e, **kwargs_e)[0])
    return mappables
def first_obs(self):
    """
    Get first obs of each trajectory.

    An obs is first when its in-track index ``n`` is 0.

    :rtype: __class__

    .. minigallery:: py_eddy_tracker.EddiesObservations.first_obs
    """
    return self.extract_with_mask(self.n == 0)
def last_obs(self):
    """
    Get Last obs of each trajectory.

    An obs is last if it closes the dataset or if the next obs starts a
    new track (its ``n`` is 0). Assumes at least one observation
    (``m[-1]`` would raise on an empty dataset — TODO confirm callers).

    :rtype: __class__

    .. minigallery:: py_eddy_tracker.EddiesObservations.last_obs
    """
    m = zeros(len(self), dtype="bool")
    # Last obs of the dataset closes the last track
    m[-1] = True
    # An obs followed by a track start is a track end
    m[:-1][self.n[1:] == 0] = True
    return self.extract_with_mask(m)
def is_convex(self, intern=False):
    """
    Get flag of the eddy's convexity

    :param bool intern: If True use speed contour instead of effective contour
    :return: True if the contour is convex
    :rtype: array[bool]
    """
    xname, yname = self.intern(intern)
    return convexs(self[xname], self[yname])
def contains(self, x, y, intern=False):
    """
    Return index of contour containing (x,y)

    :param array x: longitude
    :param array y: latitude
    :param bool intern: If true use speed contour instead of effective contour
    :return: indexs, -1 if no index
    :rtype: array[int32]
    """
    xname, yname = self.intern(intern)
    # Points with nan coordinates can't be located: they keep index -1
    m = ~(isnan(x) + isnan(y))
    i = -ones(x.shape, dtype="i4")
    if x.size != 0 and m.any():
        i[m] = poly_indexs(x[m], y[m], self[xname], self[yname])
    return i
def inside(self, x, y, intern=False):
    """
    True for each point inside the effective contour of an eddy

    :param array x: longitude
    :param array y: latitude
    :param bool intern: If true use speed contour instead of effective contour
    :return: flag
    :rtype: array[bool]
    """
    xname, yname = self.intern(intern)
    return insidepoly(x, y, self[xname], self[yname])
def grid_count(self, bins, intern=False, center=False, filter=slice(None)):
    """
    Count the eddies in each bin (use all pixels in each contour)

    :param (numpy.array,numpy.array) bins: bins (grid) to count
    :param bool intern: if True use speed contour only
    :param bool center: if True use of center to count
    :param array,mask,slice filter: keep the data selected with the filter
    :return: return the grid of counts
    :rtype: py_eddy_tracker.dataset.grid.RegularGridDataset

    .. minigallery:: py_eddy_tracker.EddiesObservations.grid_count
    """
    filter = self.merge_filters(filter)
    x_name, y_name = self.intern(intern)
    x_bins, y_bins = arange(*bins[0]), arange(*bins[1])
    x0 = bins[0][0]
    grid = ma.zeros((x_bins.shape[0] - 1, y_bins.shape[0] - 1), dtype="u4")
    # Local import to avoid a circular dependency with dataset.grid
    from ..dataset.grid import RegularGridDataset

    regular_grid = RegularGridDataset.with_array(
        coordinates=("lon", "lat"),
        datas=dict(
            count=grid,
            lon=(x_bins[1:] + x_bins[:-1]) / 2,
            lat=(y_bins[1:] + y_bins[:-1]) / 2,
        ),
        variables_description=dict(
            count=dict(long_name="Number of times the pixel is within an eddy")
        ),
        centered=True,
    )
    if center:
        # Each eddy counts once, in the bin holding its center
        x, y = (self.longitude[filter] - x0) % 360 + x0, self.latitude[filter]
        grid[:] = histogram2d(x, y, (x_bins, y_bins))[0]
        grid.mask = grid.data == 0
    else:
        # Count every grid pixel lying inside each contour
        x_ref = ((self.longitude[filter] - x0) % 360 + x0 - 180).reshape(-1, 1)
        x_contour, y_contour = self[x_name][filter], self[y_name][filter]
        grid_count_pixel_in(
            grid.data,
            x_contour,
            y_contour,
            x_ref,
            regular_grid.x_bounds,
            regular_grid.y_bounds,
            regular_grid.xstep,
            regular_grid.ystep,
            regular_grid.N,
            regular_grid.is_circular(),
            regular_grid.x_size,
            regular_grid.x_c,
            regular_grid.y_c,
        )
        grid.mask = grid == 0
    return regular_grid
def grid_box_stat(self, bins, varname, method=50, data=None, filter=slice(None)):
    """
    Get percentile of eddies in each bin

    :param (numpy.array,numpy.array) bins: bins (grid) to count
    :param str varname: variable to apply the method if data is None and will be output name
    :param str,float method: method to apply, forwarded to the grid_box_stat
        kernel (if float, presumably a percentile — TODO confirm)
    :param array data: Array used to compute stat if defined
    :param array,mask,slice filter: keep the data selected with the filter
    :return: return grid of method
    :rtype: py_eddy_tracker.dataset.grid.RegularGridDataset

    .. minigallery:: py_eddy_tracker.EddiesObservations.grid_box_stat
    """
    x_bins, y_bins = arange(*bins[0]), arange(*bins[1])
    x0 = bins[0][0]
    # Wrap longitudes into [x0, x0 + 360)
    x, y = (self.longitude - x0) % 360 + x0, self.latitude
    data = self[varname] if data is None else data
    if hasattr(data, "mask"):
        # Masked values are removed in addition to the user filter
        filter = self.merge_filters(~data.mask, self.merge_filters(filter))
    else:
        filter = self.merge_filters(filter)
    x, y, data = x[filter], y[filter], data[filter]
    # Local import to avoid a circular dependency with dataset.grid
    from ..dataset.grid import RegularGridDataset

    shape = (x_bins.shape[0] - 1, y_bins.shape[0] - 1)
    grid = ma.empty(shape, dtype=data.dtype)
    grid.mask = ones(shape, dtype="bool")
    regular_grid = RegularGridDataset.with_array(
        coordinates=("x", "y"),
        datas={varname: grid, "x": x_bins[:-1], "y": y_bins[:-1]},
        centered=False,
    )
    grid_box_stat(
        regular_grid.x_c,
        regular_grid.y_c,
        grid.data,
        grid.mask,
        x,
        y,
        data,
        regular_grid.is_circular(),
        method,
    )
    return regular_grid
def grid_stat(self, bins, varname, data=None):
    """
    Return the mean of the eddies' variable in each bin

    :param (numpy.array,numpy.array) bins: bins (grid) to compute the mean on
    :param str varname: name of variable to compute the mean on and output grid_name
    :param array data: Array used to compute stat if defined
    :return: return the gridde mean variable
    :rtype: py_eddy_tracker.dataset.grid.RegularGridDataset

    .. minigallery:: py_eddy_tracker.EddiesObservations.grid_stat
    """
    x_bins, y_bins = arange(*bins[0]), arange(*bins[1])
    x0 = bins[0][0]
    # Wrap longitudes into [x0, x0 + 360)
    x, y = (self.longitude - x0) % 360 + x0, self.latitude
    data = self[varname] if data is None else data
    # Mean = weighted histogram / count histogram, skipping masked values
    if hasattr(data, "mask"):
        m = ~data.mask
        sum_obs = histogram2d(x[m], y[m], (x_bins, y_bins), weights=data[m])[0]
        nb_obs = histogram2d(x[m], y[m], (x_bins, y_bins))[0]
    else:
        sum_obs = histogram2d(x, y, (x_bins, y_bins), weights=data)[0]
        nb_obs = histogram2d(x, y, (x_bins, y_bins))[0]
    # Local import to avoid a circular dependency with dataset.grid
    from ..dataset.grid import RegularGridDataset

    # Empty bins divide by zero: they are masked below instead of raising
    with errstate(divide="ignore", invalid="ignore"):
        regular_grid = RegularGridDataset.with_array(
            coordinates=("x", "y"),
            datas={
                varname: ma.array(sum_obs / nb_obs, mask=nb_obs == 0),
                "x": x_bins[:-1],
                "y": y_bins[:-1],
            },
            centered=False,
        )
    return regular_grid
def interp_grid(
    self, grid_object, varname, i=None, method="center", dtype=None, intern=None
):
    """
    Interpolate a grid on a center or contour with mean, min or max method

    :param grid_object: Handler of grid to interp
    :type grid_object: py_eddy_tracker.dataset.grid.RegularGridDataset
    :param str varname: Name of variable to use
    :param array[bool,int],None i:
        Index or mask to subset observations, it could avoid to build a specific dataset.
    :param str method: 'center', 'mean', 'max', 'min', 'nearest'
    :param str dtype: if None we use var dtype
    :param bool intern: Use extern or intern contour

    .. minigallery:: py_eddy_tracker.EddiesObservations.interp_grid
    """
    # Point-wise methods: interpolate directly at eddy centers
    if method in ("center", "nearest"):
        lon, lat = self.longitude, self.latitude
        if i is not None:
            lon, lat = lon[i], lat[i]
        return grid_object.interp(varname, lon, lat, method)
    if method not in ("min", "max", "mean", "count"):
        raise Exception(f'method "{method}" unknown')
    # Contour-wise methods: aggregate grid pixels inside each contour
    x0 = grid_object.x_bounds[0]
    x_name, y_name = self.intern(False if intern is None else intern)
    # Per-eddy reference longitude so contour points wrap consistently with the grid
    lon_ref = ((self.longitude - x0) % 360 + x0 - 180).reshape(-1, 1)
    lon, lat = (self[x_name] - lon_ref) % 360 + lon_ref, self[y_name]
    if i is not None:
        lon, lat = lon[i], lat[i]
    grid = grid_object.grid(varname)
    out = empty(lon.shape[0], dtype=grid.dtype if dtype is None else dtype)
    # 'min' is implemented as the max of the negated field
    negate = method == "min"
    grid_stat(
        grid_object.x_c,
        grid_object.y_c,
        -grid.data if negate else grid.data,
        grid.mask,
        lon,
        lat,
        out,
        grid_object.is_circular(),
        method="max" if negate else method,
    )
    return -out if negate else out
@property
def period(self):
    """
    Give the time coverage. If collection is empty, return nan,nan

    :return: first and last date
    :rtype: (int,int)
    """
    # Computed lazily on first access, then cached in self.period_
    if self.period_ is None:
        t = self.time
        self.period_ = (nan, nan) if t.size < 1 else (t.min(), t.max())
    return self.period_
@property
def nb_days(self):
    """Return period in days covered by the dataset

    :return: Number of days
    :rtype: int
    """
    first, last = self.period
    # +1 because both boundary dates are included in the coverage
    return last - first + 1
def create_particles(self, step, intern=True):
    """Create particles inside contour (Default : speed contour). Avoid creating too large numpy arrays, only to be masked

    :param step: step for particles
    :type step: float
    :param bool intern: If true use speed contour instead of effective contour
    :return: lon, lat and indices of particles
    :rtype: tuple(np.array)
    """
    # Choose which contour (speed or effective) provides the meshed area
    lon_name, lat_name = self.intern(intern)
    return create_meshed_particles(self[lon_name], self[lat_name], step)
def empty_dataset(self):
    """Return a dataset of the same kind with zero observation."""
    return self.new_like(self, 0)
@njit(cache=True)
def grid_count_(grid, i, j):
    """
    Increment by one each (i, j) cell of the grid.
    """
    for row, col in zip(i, j):
        grid[row, col] += 1
@njit(cache=True)
def grid_count_pixel_in(
    grid,
    x,
    y,
    x_ref,
    x_bounds,
    y_bounds,
    xstep,
    ystep,
    N,
    is_circular,
    x_size,
    x_c,
    y_c,
):
    """
    Count how many times a pixel is used.

    :param array grid:
    :param array x: x for all contour
    :param array y: y for all contour
    :param array x_ref: x reference for wrapping
    :param array x_bounds: grid longitude
    :param array y_bounds: grid latitude
    :param float xstep: step between two longitude
    :param float ystep: step between two latitude
    :param int N: shift of index to enlarge window
    :param bool is_circular: To know if grid is wrappable
    :param int x_size: Number of longitude
    :param array x_c: longitude coordinate of grid
    :param array y_c: latitude coordinate of grid
    """
    nb_contour = x_ref.shape[0]
    for n in range(nb_contour):
        # Wrap contour longitudes around their reference so they stay contiguous
        x_contour = (x[n] - x_ref[n]) % 360 + x_ref[n]
        x_contour, y_contour = reduce_size(x_contour, y[n])
        vertices = create_vertice(x_contour, y_contour)
        # Bounding box of the contour expressed in grid indices
        (x_start, x_stop), (y_start, y_stop) = bbox_indice_regular(
            vertices,
            x_bounds,
            y_bounds,
            xstep,
            ystep,
            N,
            is_circular,
            x_size,
        )
        i, j = get_pixel_in_regular(
            vertices, x_c, y_c, x_start, x_stop, y_start, y_stop
        )
        grid_count_(grid, i, j)
@njit(cache=True)
def grid_box_stat(x_c, y_c, grid, mask, x, y, value, circular=False, method=50):
    """
    Compute method on each set (one set by box)

    :param array_like x_c: grid longitude coordinates
    :param array_like y_c: grid latitude coordinates
    :param array_like grid: grid to store the result
    :param array[bool] mask: grid to store unused boxes
    :param array_like x: longitude of observations
    :param array_like y: latitude of observations
    :param array_like value: value to group to apply method
    :param bool circular: True if grid is wrappable
    :param float method: percentile
    """
    xstep, ystep = x_c[1] - x_c[0], y_c[1] - y_c[0]
    # Grid cell edges start half a step before the first center
    x0, y0 = x_c[0] - xstep / 2.0, y_c[0] - ystep / 2.0
    nb_x = x_c.shape[0]
    nb_y = y_c.shape[0]
    # Box index of each observation
    i, j = (
        ((x - x0) // xstep).astype(numba_types.int32),
        ((y - y0) // ystep).astype(numba_types.int32),
    )
    if circular:
        i %= nb_x
    else:
        if (i < 0).any():
            raise Exception("x indices underflow")
        if (i >= nb_x).any():
            raise Exception("x indices overflow")
        if (j < 0).any():
            raise Exception("y indices underflow")
        if (j >= nb_y).any():
            raise Exception("y indices overflow")
    # Guard against empty input (previously raised IndexError below)
    if i.shape[0] == 0:
        return
    # Sort by flat box index so observations of a same box are contiguous
    abs_i = j * nb_x + i
    k_sort = abs_i.argsort()
    i0, j0 = i[k_sort[0]], j[k_sort[0]]
    values = list()
    for k in k_sort:
        i_, j_ = i[k], j[k]
        # group change
        if i_ != i0 or j_ != j0:
            # BUGFIX: store the completed group's statistic in its OWN box
            # (previously written to the new group's box, shifting every result
            # by one group)
            grid[i0, j0] = percentile(values, method)
            mask[i0, j0] = False
            # start new group
            i0, j0 = i_, j_
            # reset list
            values.clear()
        values.append(value[k])
    # BUGFIX: flush the last group, which was previously dropped
    grid[i0, j0] = percentile(values, method)
    mask[i0, j0] = False
@njit(cache=True)
def grid_stat(x_c, y_c, grid, mask, x, y, result, circular=False, method="mean"):
    """
    Compute the mean, max or pixel count of the grid for each contour

    :param array_like x_c: the grid longitude coordinates
    :param array_like y_c: the grid latitude coordinates
    :param array_like grid: grid value
    :param array[bool] mask: mask for invalid value
    :param array_like x: longitude of contours
    :param array_like y: latitude of contours
    :param array_like result: return values
    :param bool circular: True if grid is wrappable
    :param str method: 'mean', 'max', 'count'
    """
    # FIXME : how does it work on grid bound
    nb = result.shape[0]
    xstep, ystep = x_c[1] - x_c[0], y_c[1] - y_c[0]
    x0, y0 = x_c - xstep / 2.0, y_c - ystep / 2.0
    nb_x = x_c.shape[0]
    max_method = "max" == method
    mean_method = "mean" == method
    count_method = "count" == method
    for elt in range(nb):
        v = create_vertice(x[elt], y[elt])
        (x_start, x_stop), (y_start, y_stop) = bbox_indice_regular(
            v, x0, y0, xstep, ystep, 1, circular, nb_x
        )
        i, j = get_pixel_in_regular(v, x_c, y_c, x_start, x_stop, y_start, y_stop)
        if count_method:
            result[elt] = i.shape[0]
        elif mean_method:
            v_sum = 0
            nb_ = 0
            for i_, j_ in zip(i, j):
                if mask[i_, j_]:
                    continue
                v_sum += grid[i_, j_]
                nb_ += 1
            # FIXME : how does it work on grid bound,
            if nb_ == 0:
                result[elt] = nan
            else:
                result[elt] = v_sum / nb_
        elif max_method:
            v_max = -1e40
            nb_ = 0
            for i_, j_ in zip(i, j):
                # BUGFIX (was "FIXME must use mask"): masked pixels are now
                # skipped, so invalid grid values no longer pollute the max
                if mask[i_, j_]:
                    continue
                v_max = max(v_max, grid[i_, j_])
                nb_ += 1
            # Consistent with the mean branch: no valid pixel -> nan
            if nb_ == 0:
                result[elt] = nan
            else:
                result[elt] = v_max
class VirtualEddiesObservations(EddiesObservations):
    """Class to work with virtual obs"""

    __slots__ = ()

    @property
    def elements(self):
        # Parent elements plus the virtual-specific fields, without duplicates
        extra = ["track", "segment_size", "dlon", "dlat"]
        return list(set(super().elements + extra))
@njit(cache=True)
def numba_where(mask):
    """Jitted wrapper around ``where``, useful when the mask is nearly empty."""
    return where(mask)
@njit(cache=True)
def sum_row_column(mask):
    """
    Compute the sums along rows and columns in a single pass over the mask.
    """
    nb_x, nb_y = mask.shape
    row_sum = zeros(nb_x, dtype=numba_types.int32)
    column_sum = zeros(nb_y, dtype=numba_types.int32)
    for i in range(nb_x):
        row = mask[i]
        for j in range(nb_y):
            if row[j]:
                row_sum[i] += 1
                column_sum[j] += 1
    return row_sum, column_sum
@njit(cache=True)
def numba_digitize(values, bins):
    """
    Fast bin indexing for evenly spaced bins.

    Falls back on ``digitize`` when the bin spacing is irregular; indices match
    ``digitize`` conventions (0 below the first edge, nb_bins at/after the last).
    """
    nb_bins = bins.shape[0]
    step = bins[1] - bins[0]
    # Detect irregular spacing: delegate to the generic implementation
    previous_edge = bins[1]
    for idx in range(2, nb_bins):
        current_edge = bins[idx]
        if step != (current_edge - previous_edge):
            return digitize(values, bins)
        previous_edge = current_edge
    nb_values = values.shape[0]
    out = empty(nb_values, dtype=numba_types.int64)
    first_edge, last_edge = bins[0], bins[-1]
    for idx in range(nb_values):
        v = values[idx]
        if v >= last_edge:
            out[idx] = nb_bins
        elif v < first_edge:
            out[idx] = 0
        else:
            # Regular spacing: direct index computation instead of a search
            out[idx] = (v - first_edge) / step + 1
    return out
@njit(cache=True)
def iter_mode_reduce(x, bins):
    """
    Test if we could use a reduce mode

    :param array x: array to divide in group
    :param array bins: array which defined bounds between each group
    :return: If reduce mode, translator, and reduce x
    """
    nb = x.shape[0]
    # low and up bounds covered by the bins
    x0, x1 = bins[0], bins[-1]
    m = empty(nb, dtype=numba_types.bool_)
    # Count values covered by the bins
    nb_in = 0
    for k in range(nb):
        inside = (x[k] >= x0) * (x[k] <= x1)
        m[k] = inside
        if inside:
            nb_in += 1
    # Not worth reducing when more than half the values are kept
    if nb_in > nb // 2:
        return False, empty(0, dtype=numba_types.int_), x
    # Indices translate reduced positions back into the full array
    indices = empty(nb_in, dtype=numba_types.int_)
    x_reduced = empty(nb_in, dtype=x.dtype)
    out_k = 0
    for k in range(nb):
        if m[k]:
            indices[out_k] = k
            x_reduced[out_k] = x[k]
            out_k += 1
    return True, indices, x_reduced