# -*- coding: utf-8 -*-
"""
Class to load and manipulate RegularGrid and UnRegularGrid
"""
import logging
from datetime import datetime
from cv2 import filter2D
from matplotlib.path import Path as BasePath
from netCDF4 import Dataset
from numba import njit
from numba import types as numba_types
from numpy import (
arange,
array,
ceil,
concatenate,
cos,
deg2rad,
empty,
errstate,
exp,
float_,
histogram2d,
int8,
int_,
interp,
isnan,
linspace,
ma,
)
from numpy import mean as np_mean
from numpy import (
meshgrid,
nan,
nanmean,
ones,
percentile,
pi,
round_,
sin,
sinc,
where,
zeros,
)
from pint import UnitRegistry
from scipy.interpolate import RectBivariateSpline, interp1d
from scipy.ndimage import convolve, gaussian_filter
from scipy.signal import welch
from scipy.spatial import cKDTree
from scipy.special import j1
from .. import VAR_DESCR
from ..eddy_feature import Amplitude, Contours
from ..generic import (
bbox_indice_regular,
coordinates_to_local,
distance,
interp2d_geo,
local_to_coordinates,
nearest_grd_indice,
uniform_resample,
)
from ..observations.observation import EddiesObservations
from ..poly import (
create_vertice,
fit_circle,
get_pixel_in_regular,
poly_area,
poly_contain_poly,
visvalingam,
winding_number_poly,
)
logger = logging.getLogger("pet")
[docs]def raw_resample(datas, fixed_size):
nb_value = datas.shape[0]
if nb_value == 1:
raise Exception()
return interp(
arange(fixed_size), arange(nb_value) * (fixed_size - 1) / (nb_value - 1), datas
)
@property
def mean_coordinates(self):
# last coordinates == first
return self.vertices[1:].mean(axis=0)
@property
def lon(self):
return self.vertices[:, 0]
@property
def lat(self):
return self.vertices[:, 1]
BasePath.mean_coordinates = mean_coordinates
BasePath.lon = lon
BasePath.lat = lat
[docs]@njit(cache=True)
def value_on_regular_contour(x_g, y_g, z_g, m_g, vertices, num_fac=2, fixed_size=None):
x_val, y_val = vertices[:, 0], vertices[:, 1]
x_new, y_new = uniform_resample(x_val, y_val, num_fac, fixed_size)
return interp2d_geo(x_g, y_g, z_g, m_g, x_new[1:], y_new[1:])
[docs]@njit(cache=True)
def mean_on_regular_contour(
x_g, y_g, z_g, m_g, vertices, num_fac=2, fixed_size=None, nan_remove=False
):
x_val, y_val = vertices[:, 0], vertices[:, 1]
x_new, y_new = uniform_resample(x_val, y_val, num_fac, fixed_size)
values = interp2d_geo(x_g, y_g, z_g, m_g, x_new[1:], y_new[1:])
if nan_remove:
return nanmean(values)
else:
return values.mean()
[docs]def fit_circle_path(self, method="fit"):
if not hasattr(self, "_circle_params"):
self._circle_params = dict()
if method not in self._circle_params.keys():
if method == "fit":
self._circle_params["fit"] = _fit_circle_path(self.vertices)
if method == "equal_area":
self._circle_params["equal_area"] = _circle_from_equal_area(self.vertices)
return self._circle_params[method]
@njit(cache=True, fastmath=True)
def _circle_from_equal_area(vertice):
lons, lats = vertice[:, 0], vertice[:, 1]
# last coordinates == first
lon0, lat0 = lons[1:].mean(), lats[1:].mean()
c_x, c_y = coordinates_to_local(lons, lats, lon0, lat0)
# Some time, edge is only a dot of few coordinates
d_lon = lons.max() - lons.min()
d_lat = lats.max() - lats.min()
if d_lon < 1e-7 and d_lat < 1e-7:
# logger.warning('An edge is only define in one position')
# logger.debug('%d coordinates %s,%s', len(lons),lons,
# lats)
return 0, -90, nan, nan
return lon0, lat0, (poly_area(c_x, c_y) / pi) ** 0.5, nan
@njit(cache=True, fastmath=True)
def _fit_circle_path(vertice):
lons, lats = vertice[:, 0], vertice[:, 1]
# last coordinates == first
lon0, lat0 = lons[1:].mean(), lats[1:].mean()
c_x, c_y = coordinates_to_local(lons, lats, lon0, lat0)
# Some time, edge is only a dot of few coordinates
d_lon = lons.max() - lons.min()
d_lat = lats.max() - lats.min()
if d_lon < 1e-7 and d_lat < 1e-7:
# logger.warning('An edge is only define in one position')
# logger.debug('%d coordinates %s,%s', len(lons),lons,
# lats)
return 0, -90, nan, nan
centlon, centlat, eddy_radius, err = fit_circle(c_x, c_y)
centlon, centlat = local_to_coordinates(centlon, centlat, lon0, lat0)
centlon = (centlon - lon0 + 180) % 360 + lon0 - 180
return centlon, centlat, eddy_radius, err
@njit(cache=True, fastmath=True)
def _get_pixel_in_unregular(vertices, x_c, y_c, x_start, x_stop, y_start, y_stop):
nb_x, nb_y = x_stop - x_start, y_stop - y_start
wn = empty((nb_x, nb_y), dtype=numba_types.bool_)
for i in range(nb_x):
for j in range(nb_y):
x_pt = x_c[i + x_start, j + y_start]
y_pt = y_c[i + x_start, j + y_start]
wn[i, j] = winding_number_poly(x_pt, y_pt, vertices)
i_x, i_y = where(wn)
i_x += x_start
i_y += y_start
return i_x, i_y
BasePath.fit_circle = fit_circle_path
[docs]def pixels_in(self, grid):
if not hasattr(self, "_slice"):
self._slice = grid.bbox_indice(self.vertices)
if not hasattr(self, "_pixels_in"):
self._pixels_in = grid.get_pixels_in(self)
return self._pixels_in
@property
def bbox_slice(self):
if not hasattr(self, "_slice"):
raise Exception("No pixels_in call before!")
return self._slice
@property
def pixels_index(self):
if not hasattr(self, "_slice"):
raise Exception("No pixels_in call before!")
return self._pixels_in
@property
def nb_pixel(self):
if not hasattr(self, "_pixels_in"):
raise Exception("No pixels_in call before!")
return self._pixels_in[0].shape[0]
BasePath.pixels_in = pixels_in
BasePath.pixels_index = pixels_index
BasePath.bbox_slice = bbox_slice
BasePath.nb_pixel = nb_pixel
[docs]class GridDataset(object):
"""
Class to have basic tool on NetCDF Grid
"""
__slots__ = (
"_x_var",
"_y_var",
"x_c",
"y_c",
"x_bounds",
"y_bounds",
"centered",
"x_dim",
"y_dim",
"coordinates",
"filename",
"dimensions",
"indexs",
"variables_description",
"global_attrs",
"vars",
"interpolators",
"speed_coef",
"contours",
)
GRAVITY = 9.807
EARTH_RADIUS = 6370997.0
# EARTH_RADIUS = 6378136.3
# indice margin (if put to 0, raise warning that i don't understand)
N = 1
def __init__(
self, filename, x_name, y_name, centered=None, indexs=None, unset=False
):
"""
:param str filename: Filename to load
:param str x_name: Name of longitude coordinates
:param str y_name: Name of latitude coordinates
:param bool,None centered: Allow to know how coordinates could be used with pixel
:param dict indexs: A dictionary which set indexs to use for non-coordinate dimensions
:param bool unset: Set to True to create an empty grid object without file
"""
self.dimensions = None
self.variables_description = None
self.global_attrs = None
self.x_c = None
self.y_c = None
self.x_bounds = None
self.y_bounds = None
self.x_dim = None
self.y_dim = None
self.centered = centered
self.contours = None
self.filename = filename
self.coordinates = x_name, y_name
self.vars = dict()
self.indexs = dict() if indexs is None else indexs
self.interpolators = dict()
if centered is None:
logger.warning(
"We assume pixel position of grid is center for %s", filename
)
if not unset:
self.load_general_features()
self.load()
@property
def is_centered(self):
"""Give True if pixel is described with its center's position or
a corner
:return: True if centered
:rtype: bool
"""
if self.centered is None:
return True
else:
return self.centered
[docs] def load_general_features(self):
"""Load attrs to be stored in object"""
logger.debug(
"Load general feature from %(filename)s", dict(filename=self.filename)
)
with Dataset(self.filename) as h:
# Load generals
self.dimensions = {i: len(v) for i, v in h.dimensions.items()}
self.variables_description = dict()
for i, v in h.variables.items():
args = (i, v.datatype)
kwargs = dict(dimensions=v.dimensions, zlib=True)
if hasattr(v, "_FillValue"):
kwargs["fill_value"] = (v._FillValue,)
attrs = dict()
for attr in v.ncattrs():
if attr in kwargs.keys():
continue
if attr == "_FillValue":
continue
attrs[attr] = getattr(v, attr)
self.variables_description[i] = dict(
args=args, kwargs=kwargs, attrs=attrs, infos=dict()
)
self.global_attrs = {attr: getattr(h, attr) for attr in h.ncattrs()}
[docs] def write(self, filename):
"""Write dataset output with same format as input
:param str filename: filename used to save the grid
"""
with Dataset(filename, "w") as h_out:
for dimension, size in self.dimensions.items():
test = False
for varname, variable in self.variables_description.items():
if (
varname not in self.coordinates
and varname not in self.vars.keys()
):
continue
if dimension in variable["kwargs"]["dimensions"]:
test = True
break
if test:
h_out.createDimension(dimension, size)
for varname, variable in self.variables_description.items():
if varname not in self.coordinates and varname not in self.vars.keys():
continue
var = h_out.createVariable(*variable["args"], **variable["kwargs"])
for key, value in variable["attrs"].items():
setattr(var, key, value)
infos = self.variables_description[varname]["infos"]
if infos.get("transpose", False):
var[:] = self.vars[varname].T
else:
var[:] = self.vars[varname]
for attr, value in self.global_attrs.items():
setattr(h_out, attr, value)
[docs] def load(self):
"""
Load variable (data).
Get coordinates and setup coordinates function
"""
x_name, y_name = self.coordinates
with Dataset(self.filename) as h:
self.x_dim = h.variables[x_name].dimensions
self.y_dim = h.variables[y_name].dimensions
sl_x = [self.indexs.get(dim, slice(None)) for dim in self.x_dim]
sl_y = [self.indexs.get(dim, slice(None)) for dim in self.y_dim]
self.vars[x_name] = h.variables[x_name][sl_x]
self.vars[y_name] = h.variables[y_name][sl_y]
self.setup_coordinates()
[docs] @staticmethod
def c_to_bounds(c):
"""
Centred coordinates to bounds coordinates
:param array c: centred coordinates to translate
:return: bounds coordinates
"""
bounds = concatenate((c, (2 * c[-1] - c[-2],)))
d = bounds[1:] - bounds[:-1]
bounds[:-1] -= d / 2
bounds[-1] -= d[-1] / 2
return bounds
[docs] def setup_coordinates(self):
x_name, y_name = self.coordinates
if self.is_centered:
logger.info("Grid center")
self.x_c = self.vars[x_name].astype("float64")
self.y_c = self.vars[y_name].astype("float64")
self.x_bounds = concatenate((self.x_c, (2 * self.x_c[-1] - self.x_c[-2],)))
self.y_bounds = concatenate((self.y_c, (2 * self.y_c[-1] - self.y_c[-2],)))
d_x = self.x_bounds[1:] - self.x_bounds[:-1]
d_y = self.y_bounds[1:] - self.y_bounds[:-1]
self.x_bounds[:-1] -= d_x / 2
self.x_bounds[-1] -= d_x[-1] / 2
self.y_bounds[:-1] -= d_y / 2
self.y_bounds[-1] -= d_y[-1] / 2
else:
self.x_bounds = self.vars[x_name].astype("float64")
self.y_bounds = self.vars[y_name].astype("float64")
if len(self.x_dim) == 1:
self.x_c = self.x_bounds.copy()
dx2 = (self.x_bounds[1:] - self.x_bounds[:-1]) / 2
self.x_c[:-1] += dx2
self.x_c[-1] += dx2[-1]
self.y_c = self.y_bounds.copy()
dy2 = (self.y_bounds[1:] - self.y_bounds[:-1]) / 2
self.y_c[:-1] += dy2
self.y_c[-1] += dy2[-1]
else:
raise Exception("not write")
[docs] def is_circular(self):
"""Check grid circularity"""
return False
[docs] def units(self, varname):
"""Get unit from variable"""
stored_units = self.variables_description[varname]["attrs"].get("units", None)
if stored_units is not None:
return stored_units
with Dataset(self.filename) as h:
var = h.variables[varname]
if hasattr(var, "units"):
return var.units
@property
def variables(self):
return self.variables_description.keys()
[docs] def copy(self, grid_in, grid_out):
"""
Duplicate the variable from grid_in in grid_out
:param grid_in:
:param grid_out:
"""
h_dict = self.variables_description[grid_in]
self.variables_description[grid_out] = dict(
infos=h_dict["infos"].copy(),
attrs=h_dict["attrs"].copy(),
args=tuple((grid_out, *h_dict["args"][1:])),
kwargs=h_dict["kwargs"].copy(),
)
self.vars[grid_out] = self.grid(grid_in).copy()
[docs] def add_grid(self, varname, grid):
"""
Add a grid in handler
:param str varname: name of the future grid
:param array grid: grid array
"""
self.vars[varname] = grid
[docs] def grid(self, varname, indexs=None):
"""Give the grid required
:param str varname: Variable to get
:param dict,None indexs: If defined dict must have dimensions name as key
:return: array asked, reduced by the indexes
:rtype: array
.. minigallery:: py_eddy_tracker.GridDataset.grid
"""
if indexs is None:
indexs = dict()
if varname not in self.vars:
coordinates_dims = list(self.x_dim)
coordinates_dims.extend(list(self.y_dim))
logger.debug(
"Load %(varname)s from %(filename)s",
dict(varname=varname, filename=self.filename),
)
with Dataset(self.filename) as h:
dims = h.variables[varname].dimensions
sl = [
indexs.get(
dim,
self.indexs.get(
dim, slice(None) if dim in coordinates_dims else 0
),
)
for dim in dims
]
self.vars[varname] = h.variables[varname][sl]
if len(self.x_dim) == 1:
i_x = where(array(dims) == self.x_dim)[0][0]
i_y = where(array(dims) == self.y_dim)[0][0]
if i_x > i_y:
self.variables_description[varname]["infos"]["transpose"] = True
self.vars[varname] = self.vars[varname].T
if not hasattr(self.vars[varname], "mask"):
self.vars[varname] = ma.array(
self.vars[varname],
mask=zeros(self.vars[varname].shape, dtype="bool"),
)
return self.vars[varname]
[docs] def grid_tiles(self, varname, slice_x, slice_y):
"""Give the grid tiles required, without buffer system"""
coordinates_dims = list(self.x_dim)
coordinates_dims.extend(list(self.y_dim))
logger.debug(
"Extract %(varname)s from %(filename)s with slice(x:%(slice_x)s,y:%(slice_y)s)",
dict(
varname=varname,
filename=self.filename,
slice_y=slice_y,
slice_x=slice_x,
),
)
with Dataset(self.filename) as h:
dims = h.variables[varname].dimensions
sl = [
(slice_x if dim in list(self.x_dim) else slice_y)
if dim in coordinates_dims
else 0
for dim in dims
]
data = h.variables[varname][sl]
if len(self.x_dim) == 1:
i_x = where(array(dims) == self.x_dim)[0][0]
i_y = where(array(dims) == self.y_dim)[0][0]
if i_x > i_y:
data = data.T
if not hasattr(data, "mask"):
data = ma.array(data, mask=zeros(data.shape, dtype="bool"))
return data
[docs] def high_filter(self, grid_name, w_cut, **kwargs):
"""Return the grid high-pass filtered, by substracting to the grid the low-pass filter (default: order=1)
:param grid_name: the name of the grid
:param int, w_cut: the half-power wavelength cutoff (km)
"""
result = self._low_filter(grid_name, w_cut, **kwargs)
self.vars[grid_name] -= result
[docs] def low_filter(self, grid_name, w_cut, **kwargs):
"""Return the grid low-pass filtered (default: order=1)
:param grid_name: the name of the grid
:param int, w_cut: the half-power wavelength cutoff (km)
"""
result = self._low_filter(grid_name, w_cut, **kwargs)
self.vars[grid_name] -= self.vars[grid_name] - result
@property
def bounds(self):
"""Give bounds"""
return (
self.x_bounds.min(),
self.x_bounds.max(),
self.y_bounds.min(),
self.y_bounds.max(),
)
[docs] def eddy_identification(
self,
grid_height,
uname,
vname,
date,
step=0.005,
shape_error=55,
sampling=50,
sampling_method="visvalingam",
pixel_limit=None,
precision=None,
force_height_unit=None,
force_speed_unit=None,
**kwargs,
):
"""
Compute eddy identification on the pecified grid
:param str grid_height: Grid name of Sea Surface Height
:param str uname: Grid name of u speed component
:param str vname: Grid name of v speed component
:param datetime.datetime date: Date which will be stored in object to date data
:param float,int step: Height between two layers in m
:param float,int shape_error: Maximal error allowed for outter contour in %
:param int sampling: Number of points to store contours and speed profile
:param str sampling_method: Method to resample 'uniform' or 'visvalingam'
:param (int,int),None pixel_limit:
Min and max number of pixels inside the inner and the outer contour to be considered as an eddy
:param float,None precision: Truncate values at the defined precision in m
:param str force_height_unit: Unit used for height unit
:param str force_speed_unit: Unit used for speed unit
:param dict kwargs: Argument given to amplitude
:return: Return a list of 2 elements: Anticyclone and Cyclone
:rtype: py_eddy_tracker.observations.observation.EddiesObservations
.. minigallery:: py_eddy_tracker.GridDataset.eddy_identification
"""
if not isinstance(date, datetime):
raise Exception("Date argument be a datetime object")
# The inf limit must be in pixel and sup limit in surface
if pixel_limit is None:
pixel_limit = (4, 1000)
# Compute an interpolator for eke
self.init_speed_coef(uname, vname)
# Get unit of h grid
h_units = (
self.units(grid_height) if force_height_unit is None else force_height_unit
)
units = UnitRegistry()
in_h_unit = units.parse_expression(h_units)
if in_h_unit is not None:
factor, _ = in_h_unit.to("m").to_tuple()
logger.info(
"We will apply on step a factor to be coherent with grid : %f",
1 / factor,
)
step /= factor
if precision is not None:
precision /= factor
# Get ssh grid
data = self.grid(grid_height).astype("f8")
# In case of a reduce mask
if len(data.mask.shape) == 0 and not data.mask:
data.mask = zeros(data.shape, dtype="bool")
# we remove noisy information
if precision is not None:
data = (data / precision).round() * precision
# Compute levels for ssh
z_min, z_max = data.min(), data.max()
d_z = z_max - z_min
data_tmp = data[~data.mask]
epsilon = 0.001 # in %
z_min_p, z_max_p = (
percentile(data_tmp, epsilon),
percentile(data_tmp, 100 - epsilon),
)
d_zp = z_max_p - z_min_p
if d_z / d_zp > 2:
logger.warning(
"Maybe some extrema are present zmin %f (m) and zmax %f (m) will be replace by %f and %f",
z_min,
z_max,
z_min_p,
z_max_p,
)
z_min, z_max = z_min_p, z_max_p
levels = arange(z_min - z_min % step, z_max - z_max % step + 2 * step, step)
# Get x and y values
x, y = self.x_c, self.y_c
# Compute ssh contour
self.contours = Contours(x, y, data, levels, wrap_x=self.is_circular())
out_sampling = dict(fixed_size=sampling)
resample = visvalingam if sampling_method == "visvalingam" else uniform_resample
track_extra_variables = [
"height_max_speed_contour",
"height_external_contour",
"height_inner_contour",
"lon_max",
"lat_max",
]
array_variables = [
"contour_lon_e",
"contour_lat_e",
"contour_lon_s",
"contour_lat_s",
"uavg_profile",
]
# Complete cyclonic and anticylonic research:
a_and_c = list()
for anticyclonic_search in [True, False]:
eddies = list()
iterator = 1 if anticyclonic_search else -1
# Loop over each collection
for coll_ind, coll in enumerate(self.contours.iter(step=iterator)):
corrected_coll_index = coll_ind
if iterator == -1:
corrected_coll_index = -coll_ind - 1
contour_paths = coll.get_paths()
nb_paths = len(contour_paths)
if nb_paths == 0:
continue
cvalues = self.contours.cvalues[corrected_coll_index]
logger.debug(
"doing collection %s, contour value %.4f, %d paths",
corrected_coll_index,
cvalues,
nb_paths,
)
# Loop over individual c_s contours (i.e., every eddy in field)
for contour in contour_paths:
if contour.used:
continue
# FIXME : center could be not in contour and fit on raw sampling
_, _, _, aerr = contour.fit_circle()
# Filter for shape
if aerr < 0 or aerr > shape_error or isnan(aerr):
contour.reject = 1
continue
# Find all pixels in the contour
i_x_in, i_y_in = contour.pixels_in(self)
# Check if pixels in contour are masked
if has_masked_value(data.mask, i_x_in, i_y_in):
if contour.reject == 0:
contour.reject = 2
continue
# Test of the rotating sense: cyclone or anticyclone
if has_value(
data, i_x_in, i_y_in, cvalues, below=anticyclonic_search
):
continue
# FIXME : Maybe limit max must be replace with a maximum of surface
if (
contour.nb_pixel < pixel_limit[0]
or contour.nb_pixel > pixel_limit[1]
):
contour.reject = 3
continue
# Compute amplitude
reset_centroid, amp = self.get_amplitude(
contour,
cvalues,
data,
anticyclonic_search=anticyclonic_search,
level=self.contours.levels[corrected_coll_index],
interval=step,
**kwargs,
)
# If we have a valid amplitude
if (not amp.within_amplitude_limits()) or (amp.amplitude == 0):
contour.reject = 4
continue
if reset_centroid:
if self.is_circular():
centi = self.normalize_x_indice(reset_centroid[0])
else:
centi = reset_centroid[0]
centj = reset_centroid[1]
# To move in regular and unregular grid
if len(x.shape) == 1:
centlon_e = x[centi]
centlat_e = y[centj]
else:
centlon_e = x[centi, centj]
centlat_e = y[centi, centj]
# centlat_e and centlon_e must be index of maximum, we will loose some inner contour if it's not
(
max_average_speed,
speed_contour,
inner_contour,
speed_array,
i_max_speed,
i_inner,
) = self.get_uavg(
self.contours,
centlon_e,
centlat_e,
contour,
anticyclonic_search,
corrected_coll_index,
pixel_min=pixel_limit[0],
)
# FIXME : Instantiate new EddyObservation object (high cost need to be reviewed)
obs = EddiesObservations(
size=1,
track_extra_variables=track_extra_variables,
track_array_variables=sampling,
array_variables=array_variables,
)
obs.height_max_speed_contour[:] = self.contours.cvalues[i_max_speed]
obs.height_external_contour[:] = cvalues
obs.height_inner_contour[:] = self.contours.cvalues[i_inner]
array_size = speed_array.shape[0]
obs.nb_contour_selected[:] = array_size
if speed_array.shape[0] == 1:
obs.uavg_profile[:] = speed_array[0]
else:
obs.uavg_profile[:] = raw_resample(speed_array, sampling)
obs.amplitude[:] = amp.amplitude
obs.speed_average[:] = max_average_speed
obs.num_point_e[:] = contour.lon.shape[0]
xy_e = resample(contour.lon, contour.lat, **out_sampling)
obs.contour_lon_e[:], obs.contour_lat_e[:] = xy_e
obs.num_point_s[:] = speed_contour.lon.shape[0]
xy_s = resample(
speed_contour.lon, speed_contour.lat, **out_sampling
)
obs.contour_lon_s[:], obs.contour_lat_s[:] = xy_s
# FIXME : we use a contour without resampling
# First, get position based on innermost contour
centlon_i, centlat_i, _, _ = _fit_circle_path(
create_vertice(inner_contour.lon, inner_contour.lat)
)
# Second, get speed-based radius based on contour of max uavg
centlon_s, centlat_s, eddy_radius_s, aerr_s = _fit_circle_path(
create_vertice(*xy_s)
)
# Compute again to use resampled contour
_, _, eddy_radius_e, aerr_e = _fit_circle_path(
create_vertice(*xy_e)
)
obs.radius_s[:] = eddy_radius_s
obs.radius_e[:] = eddy_radius_e
obs.shape_error_e[:] = aerr_e
obs.shape_error_s[:] = aerr_s
obs.speed_area[:] = poly_area(
*coordinates_to_local(*xy_s, lon0=centlon_s, lat0=centlat_s)
)
obs.effective_area[:] = poly_area(
*coordinates_to_local(*xy_e, lon0=centlon_s, lat0=centlat_s)
)
obs.lon[:] = centlon_s
obs.lat[:] = centlat_s
obs.lon_max[:] = centlon_i
obs.lat_max[:] = centlat_i
if aerr > 99.9 or aerr_s > 99.9:
logger.warning(
"Strange shape at this step! shape_error : %f, %f",
aerr,
aerr_s,
)
eddies.append(obs)
# To reserve definitively the area
data.mask[i_x_in, i_y_in] = True
if len(eddies) == 0:
eddies = EddiesObservations(
track_extra_variables=track_extra_variables,
track_array_variables=sampling,
array_variables=array_variables,
)
else:
eddies = EddiesObservations.concatenate(eddies)
eddies.sign_type = 1 if anticyclonic_search else -1
eddies.time[:] = (date - datetime(1950, 1, 1)).total_seconds() / 86400.0
# normalization longitude between 0 - 360, because storage have an offset on 180
eddies.lon_max[:] %= 360
eddies.lon[:] %= 360
ref = eddies.lon - 180
eddies.contour_lon_e[:] = ((eddies.contour_lon_e.T - ref) % 360 + ref).T
eddies.contour_lon_s[:] = ((eddies.contour_lon_s.T - ref) % 360 + ref).T
a_and_c.append(eddies)
if in_h_unit is not None:
for name in [
"amplitude",
"height_max_speed_contour",
"height_external_contour",
"height_inner_contour",
]:
out_unit = units.parse_expression(VAR_DESCR[name]["nc_attr"]["units"])
factor, _ = in_h_unit.to(out_unit).to_tuple()
a_and_c[0].obs[name] *= factor
a_and_c[1].obs[name] *= factor
u_units = self.units(uname) if force_speed_unit is None else force_speed_unit
in_u_units = units.parse_expression(u_units)
if in_u_units is not None:
for name in ["speed_average", "uavg_profile"]:
out_unit = units.parse_expression(VAR_DESCR[name]["nc_attr"]["units"])
factor, _ = in_u_units.to(out_unit).to_tuple()
a_and_c[0].obs[name] *= factor
a_and_c[1].obs[name] *= factor
return a_and_c
[docs] def get_uavg(
self,
all_contours,
centlon_e,
centlat_e,
original_contour,
anticyclonic_search,
level_start,
pixel_min=3,
):
"""
Calculate geostrophic speed around successive contours
Returns the average
"""
# Init max speed to search maximum
max_average_speed = self.speed_coef_mean(original_contour)
speed_array = [max_average_speed]
eddy_contours = [original_contour]
inner_contour = selected_contour = original_contour
# Must start only on upper or lower contour, no need to test the two part
step = 1 if anticyclonic_search else -1
i_inner = i_max_speed = -1
for i, coll in enumerate(
all_contours.iter(start=level_start + step, step=step)
):
level_contour = coll.get_nearest_path_bbox_contain_pt(centlon_e, centlat_e)
# Leave loop if no contours at level
if level_contour is None:
break
# Ensure polygon_i is within polygon_e
if not poly_contain_poly(original_contour.vertices, level_contour.vertices):
break
# 3. Respect size range (for max speed)
# nb_pixel properties need to call pixels_in before with a grid of pixel
level_contour.pixels_in(self)
# Interpolate uspd to seglon, seglat, then get mean
level_average_speed = self.speed_coef_mean(level_contour)
speed_array.append(level_average_speed)
if (
pixel_min < level_contour.nb_pixel
and level_average_speed >= max_average_speed
):
max_average_speed = level_average_speed
i_max_speed = i
selected_contour = level_contour
inner_contour = level_contour
eddy_contours.append(level_contour)
i_inner = i
for contour in eddy_contours:
contour.used = True
i_max_speed = level_start + step + step * i_max_speed
i_inner = level_start + step + step * i_inner
return (
max_average_speed,
selected_contour,
inner_contour,
array(speed_array),
i_max_speed,
i_inner,
)
@staticmethod
def _gaussian_filter(data, sigma, mode="reflect"):
"""Standard gaussian filter"""
local_data = data.copy()
local_data[data.mask] = 0
v = gaussian_filter(local_data, sigma=sigma, mode=mode)
w = gaussian_filter(float_(~data.mask), sigma=sigma, mode=mode)
with errstate(invalid="ignore"):
return ma.array(v / w, mask=w == 0)
[docs] @staticmethod
def get_amplitude(
contour, contour_height, data, anticyclonic_search=True, level=None, **kwargs
):
# Instantiate Amplitude object
amp = Amplitude(
# Indices of all pixels in contour
contour=contour,
# Height of level
contour_height=contour_height,
# All grid
data=data,
**kwargs,
)
if anticyclonic_search:
reset_centroid = amp.all_pixels_above_h0(level)
else:
reset_centroid = amp.all_pixels_below_h0(level)
return reset_centroid, amp
[docs]class UnRegularGridDataset(GridDataset):
"""Class managing unregular grid"""
__slots__ = (
"index_interp",
"_speed_norm",
)
[docs] def load(self):
"""Load variable (data)"""
x_name, y_name = self.coordinates
with Dataset(self.filename) as h:
self.x_dim = h.variables[x_name].dimensions
self.y_dim = h.variables[y_name].dimensions
sl_x = [self.indexs.get(dim, slice(None)) for dim in self.x_dim]
sl_y = [self.indexs.get(dim, slice(None)) for dim in self.y_dim]
self.vars[x_name] = h.variables[x_name][sl_x]
self.vars[y_name] = h.variables[y_name][sl_y]
self.x_c = self.vars[x_name]
self.y_c = self.vars[y_name]
self.init_pos_interpolator()
@property
def bounds(self):
"""Give bound"""
return self.x_c.min(), self.x_c.max(), self.y_c.min(), self.y_c.max()
[docs] def bbox_indice(self, vertices):
dist, idx = self.index_interp.query(vertices, k=1)
i_y = idx % self.x_c.shape[1]
i_x = int_((idx - i_y) / self.x_c.shape[1])
return (
(max(i_x.min() - self.N, 0), i_x.max() + self.N + 1),
(max(i_y.min() - self.N, 0), i_y.max() + self.N + 1),
)
[docs] def get_pixels_in(self, contour):
(x_start, x_stop), (y_start, y_stop) = contour.bbox_slice
return _get_pixel_in_unregular(
contour.vertices, self.x_c, self.y_c, x_start, x_stop, y_start, y_stop
)
[docs] def normalize_x_indice(self, indices):
"""Not do"""
return indices
[docs] def nearest_grd_indice(self, x, y):
dist, idx = self.index_interp.query((x, y), k=1)
i_y = idx % self.x_c.shape[1]
i_x = int_((idx - i_y) / self.x_c.shape[1])
return i_x, i_y
[docs] def compute_pixel_path(self, x0, y0, x1, y1):
pass
[docs] def init_pos_interpolator(self):
logger.debug("Create a KdTree could be long ...")
self.index_interp = cKDTree(
create_vertice(self.x_c.reshape(-1), self.y_c.reshape(-1))
)
logger.debug("... OK")
def _low_filter(self, grid_name, w_cut, factor=8.0):
data = self.grid(grid_name)
x = self.grid(self.coordinates[0])
y = self.grid(self.coordinates[1])
regrid_step = w_cut / 111.0 / factor
x_min, x_max, y_min, y_max = self.bounds
x_array = arange(x_min, x_max + regrid_step, regrid_step)
y_array = arange(y_min, min(y_max + regrid_step, 89), regrid_step)
bins = (x_array, y_array)
x_flat, y_flat, z_flat = x.reshape((-1,)), y.reshape((-1,)), data.reshape((-1,))
m = ~z_flat.mask
x_flat, y_flat, z_flat = x_flat[m], y_flat[m], z_flat[m]
nb_value, _, _ = histogram2d(x_flat, y_flat, bins=bins)
sum_value, _, _ = histogram2d(x_flat, y_flat, bins=bins, weights=z_flat)
with errstate(invalid="ignore"):
z_grid = ma.array(sum_value / nb_value, mask=nb_value == 0)
regular_grid = RegularGridDataset.with_array(
coordinates=self.coordinates,
datas={
grid_name: z_grid,
self.coordinates[0]: x_array[:-1],
self.coordinates[1]: y_array[:-1],
},
centered=False,
)
regular_grid.bessel_low_filter(grid_name, w_cut, order=1)
z_filtered = regular_grid.grid(grid_name)
x_center = (x_array[:-1] + x_array[1:]) / 2
y_center = (y_array[:-1] + y_array[1:]) / 2
opts_interpolation = dict(kx=1, ky=1, s=0)
m_interp = RectBivariateSpline(
x_center, y_center, z_filtered.mask, **opts_interpolation
)
z_filtered.data[z_filtered.mask] = 0
z_interp = RectBivariateSpline(
x_center, y_center, z_filtered.data, **opts_interpolation
).ev(x, y)
return ma.array(z_interp, mask=m_interp.ev(x, y) > 0.00001)
[docs] def speed_coef_mean(self, contour):
dist, idx = self.index_interp.query(
uniform_resample_stack(contour.vertices)[1:], k=4
)
i_y = idx % self.x_c.shape[1]
i_x = int_((idx - i_y) / self.x_c.shape[1])
# A simplified solution to be change by a weight mean
return self._speed_norm[i_x, i_y].mean(axis=1).mean()
[docs] def init_speed_coef(self, uname="u", vname="v"):
self._speed_norm = (self.grid(uname) ** 2 + self.grid(vname) ** 2) ** 0.5
[docs]class RegularGridDataset(GridDataset):
"""Class only for regular grid"""
__slots__ = (
"_speed_ev",
"_is_circular",
"x_size",
"_x_step",
"_y_step",
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._is_circular = None
[docs] def setup_coordinates(self):
super().setup_coordinates()
self.x_size = self.x_c.shape[0]
self._x_step = (self.x_c[1:] - self.x_c[:-1]).mean()
self._y_step = (self.y_c[1:] - self.y_c[:-1]).mean()
[docs] @classmethod
def with_array(cls, coordinates, datas, variables_description=None, **kwargs):
"""
Geo matrix data must be ordered like this (X,Y) and masked with numpy.ma.array
"""
vd = dict() if variables_description is None else variables_description
x_name, y_name = coordinates[0], coordinates[1]
obj = cls("array", x_name, y_name, unset=True, **kwargs)
obj.x_dim = (x_name,)
obj.y_dim = (y_name,)
obj.variables_description = dict()
obj.dimensions = {i: v.shape[0] for i, v in datas.items() if i in coordinates}
for k, v in datas.items():
obj.vars[k] = v
obj.variables_description[k] = dict(
attrs=vd.get(k, dict()),
args=(k, v.dtype),
kwargs=dict(
dimensions=coordinates if k not in coordinates else (k,),
complevel=1,
zlib=True,
),
infos=dict(),
)
obj.global_attrs = dict(history="Grid setup with an array")
obj.setup_coordinates()
return obj
[docs] def bbox_indice(self, vertices):
return bbox_indice_regular(
vertices,
self.x_bounds,
self.y_bounds,
self.xstep,
self.ystep,
self.N,
self.is_circular(),
self.x_size,
)
[docs] def get_pixels_in(self, contour):
"""
Get indices of pixels in contour.
:param vertice,Path contour: Contour which enclosed some pixels
:return: Indices of grid in contour
:rtype: array[int],array[int]
"""
if isinstance(contour, BasePath):
(x_start, x_stop), (y_start, y_stop) = contour.bbox_slice
return get_pixel_in_regular(
contour.vertices, self.x_c, self.y_c, x_start, x_stop, y_start, y_stop
)
else:
(x_start, x_stop), (y_start, y_stop) = self.bbox_indice(contour)
return get_pixel_in_regular(
contour, self.x_c, self.y_c, x_start, x_stop, y_start, y_stop
)
[docs] def normalize_x_indice(self, indices):
return indices % self.x_size
[docs] def nearest_grd_indice(self, x, y):
return nearest_grd_indice(
x, y, self.x_bounds, self.y_bounds, self.xstep, self.ystep
)
@property
def xstep(self):
"""Only for regular grid with no step variation"""
return self._x_step
@property
def ystep(self):
"""Only for regular grid with no step variation"""
return self._y_step
[docs] def compute_pixel_path(self, x0, y0, x1, y1):
"""Give a series of indexes which describe the path between to position"""
return compute_pixel_path(
x0,
y0,
x1,
y1,
self.x_bounds[0],
self.y_bounds[0],
self.xstep,
self.ystep,
self.x_size,
)
[docs] def clean_land(self):
"""Function to remove all land pixel"""
pass
[docs] def is_circular(self):
"""Check if the grid is circular"""
if self._is_circular is None:
self._is_circular = (
abs((self.x_bounds[0] % 360) - (self.x_bounds[-1] % 360)) < 0.0001
)
return self._is_circular
[docs] @staticmethod
def check_order(order):
if order < 1:
logger.warning("order must be superior to 0")
return ceil(order).astype(int)
[docs] def get_step_in_km(self, lat, wave_length):
step_y_km = self.ystep * distance(0, 0, 0, 1) / 1000
step_x_km = self.xstep * distance(0, lat, 1, lat) / 1000
min_wave_length = max(step_x_km, step_y_km) * 2
if wave_length < min_wave_length:
logger.error(
"Wave_length too short for resolution, must be > %d km",
ceil(min_wave_length),
)
raise Exception()
return step_x_km, step_y_km
[docs] def estimate_kernel_shape(self, lat, wave_length, order):
step_x_km, step_y_km = self.get_step_in_km(lat, wave_length)
# half size will be multiply with by order
half_x_pt, half_y_pt = (
ceil(wave_length / step_x_km).astype(int),
ceil(wave_length / step_y_km).astype(int),
)
# x size is not good over 60 degrees
y = arange(
lat - self.ystep * half_y_pt * order,
lat + self.ystep * half_y_pt * order + 0.01 * self.ystep,
self.ystep,
)
# We compute half + 1 and the other part will be compute by symetry
x = arange(0, self.xstep * half_x_pt * order + 0.01 * self.xstep, self.xstep)
y, x = meshgrid(y, x)
dist_norm = distance(0, lat, x, y) / 1000.0 / wave_length
return half_x_pt, half_y_pt, dist_norm
[docs] def finalize_kernel(self, kernel, order, half_x_pt, half_y_pt):
# Symetry
kernel_ = empty((half_x_pt * 2 * order + 1, half_y_pt * 2 * order + 1))
kernel_[half_x_pt * order :] = kernel
kernel_[: half_x_pt * order] = kernel[:0:-1]
# remove unused row/column
k_valid = kernel_ != 0
x_valid = where(k_valid.sum(axis=1))[0]
x_slice = slice(x_valid[0], x_valid[-1] + 1)
y_valid = where(k_valid.sum(axis=0))[0]
y_slice = slice(y_valid[0], y_valid[-1] + 1)
return kernel_[x_slice, y_slice]
[docs] def kernel_lanczos(self, lat, wave_length, order=1):
"""Not really operational
wave_length in km
order must be int
"""
order = self.check_order(order)
half_x_pt, half_y_pt, dist_norm = self.estimate_kernel_shape(
lat, wave_length, order
)
kernel = sinc(dist_norm / order) * sinc(dist_norm)
kernel[dist_norm > order] = 0
return self.finalize_kernel(kernel, order, half_x_pt, half_y_pt)
[docs] def kernel_bessel(self, lat, wave_length, order=1):
"""wave_length in km
order must be int
"""
order = self.check_order(order)
half_x_pt, half_y_pt, dist_norm = self.estimate_kernel_shape(
lat, wave_length, order
)
with errstate(invalid="ignore"):
kernel = sinc(dist_norm / order) * j1(2 * pi * dist_norm) / dist_norm
kernel[0, half_y_pt * order] = pi
kernel[dist_norm > order] = 0
return self.finalize_kernel(kernel, order, half_x_pt, half_y_pt)
def _low_filter(self, grid_name, w_cut, **kwargs):
"""low filtering"""
return self.convolve_filter_with_dynamic_kernel(
grid_name, self.kernel_bessel, wave_length=w_cut, **kwargs
)
[docs] def convolve_filter_with_dynamic_kernel(
self, grid, kernel_func, lat_max=85, extend=False, **kwargs_func
):
"""
:param str grid: grid name
:param func kernel_func: function of kernel to use
:param float lat_max: absolute latitude above no filtering apply
:param bool extend: if False, only non masked value will return a filtered value
:param dict kwargs_func: look at kernel_func
:return: filtered value
:rtype: array
"""
if (abs(self.y_c) > lat_max).any():
logger.warning("No filtering above %f degrees of latitude", lat_max)
if isinstance(grid, str):
data = self.grid(grid).copy()
else:
data = grid.copy()
# Matrix for result
data_out = ma.empty(data.shape)
data_out.mask = ones(data_out.shape, dtype=bool)
nb_lines = self.y_c.shape[0]
dt = list()
debug_active = logger.getEffectiveLevel() == logging.DEBUG
for i, lat in enumerate(self.y_c):
if abs(lat) > lat_max or data[:, i].mask.all():
data_out.mask[:, i] = True
continue
# Get kernel
kernel = kernel_func(lat, **kwargs_func)
# Kernel shape
k_shape = kernel.shape
t0 = datetime.now()
if debug_active and len(dt) > 0:
dt_mean = np_mean(dt) * (nb_lines - i)
print(
"Remain ",
dt_mean,
"ETA ",
t0 + dt_mean,
"current kernel size :",
k_shape,
"Step : %d/%d " % (i, nb_lines),
end="\r",
)
# Half size, k_shape must be always impair
d_lat = int((k_shape[1] - 1) / 2)
d_lon = int((k_shape[0] - 1) / 2)
# Temporary matrix to have exact shape at outuput
tmp_matrix = ma.zeros((2 * d_lon + data.shape[0], k_shape[1]))
tmp_matrix.mask = ones(tmp_matrix.shape, dtype=bool)
# Slice to apply on input data
sl_lat_data = slice(max(0, i - d_lat), min(i + d_lat, data.shape[1]))
# slice to apply on temporary matrix to store input data
sl_lat_in = slice(
d_lat - (i - sl_lat_data.start), d_lat + (sl_lat_data.stop - i)
)
# If global => manual wrapping
if self.is_circular():
tmp_matrix[:d_lon, sl_lat_in] = data[-d_lon:, sl_lat_data]
tmp_matrix[-d_lon:, sl_lat_in] = data[:d_lon, sl_lat_data]
# Copy data
tmp_matrix[d_lon:-d_lon, sl_lat_in] = data[:, sl_lat_data]
# Convolution
m = ~tmp_matrix.mask
tmp_matrix[~m] = 0
demi_x, demi_y = k_shape[0] // 2, k_shape[1] // 2
values_sum = filter2D(tmp_matrix.data, -1, kernel)[demi_x:-demi_x, demi_y]
kernel_sum = filter2D(m.astype(float), -1, kernel)[demi_x:-demi_x, demi_y]
with errstate(invalid="ignore", divide="ignore"):
if extend:
data_out[:, i] = ma.array(
values_sum / kernel_sum,
mask=kernel_sum < (extend * kernel.sum()),
)
else:
data_out[:, i] = values_sum / kernel_sum
dt.append(datetime.now() - t0)
if len(dt) == 100:
dt.pop(0)
if extend:
out = ma.array(data_out, mask=data_out.mask)
else:
out = ma.array(data_out, mask=data.mask + data_out.mask)
if debug_active:
print()
if out.dtype != data.dtype:
return out.astype(data.dtype)
return out
[docs] def lanczos_high_filter(
self, grid_name, wave_length, order=1, lat_max=85, **kwargs
):
logger.warning("It could be not safe to use lanczos filter")
data_out = self.convolve_filter_with_dynamic_kernel(
grid_name,
self.kernel_lanczos,
lat_max=lat_max,
wave_length=wave_length,
order=order,
**kwargs,
)
self.vars[grid_name] -= data_out
[docs] def lanczos_low_filter(self, grid_name, wave_length, order=1, lat_max=85, **kwargs):
logger.warning("It could be not safe to use lanczos filter")
data_out = self.convolve_filter_with_dynamic_kernel(
grid_name,
self.kernel_lanczos,
lat_max=lat_max,
wave_length=wave_length,
order=order,
**kwargs,
)
self.vars[grid_name] = data_out
[docs] def bessel_band_filter(self, grid_name, wave_length_inf, wave_length_sup, **kwargs):
data_out = self.convolve_filter_with_dynamic_kernel(
grid_name, self.kernel_bessel, wave_length=wave_length_inf, **kwargs
)
self.vars[grid_name] = data_out
data_out = self.convolve_filter_with_dynamic_kernel(
grid_name, self.kernel_bessel, wave_length=wave_length_sup, **kwargs
)
self.vars[grid_name] -= data_out
[docs] def bessel_high_filter(self, grid_name, wave_length, order=1, lat_max=85, **kwargs):
"""
:param str grid_name: grid to filter, data will replace original one
:param float wave_length: in km
:param int order: order to use, if > 1 negative values of the cardinal sinus are present in kernel
:param float lat_max: absolute latitude above no filtering apply
:param dict kwargs: look at :py:meth:`RegularGridDataset.convolve_filter_with_dynamic_kernel`
.. minigallery:: py_eddy_tracker.RegularGridDataset.bessel_high_filter
"""
logger.debug(
"Run filtering with wave of %(wave_length)s km and order of %(order)s ...",
dict(wave_length=wave_length, order=order),
)
data_out = self.convolve_filter_with_dynamic_kernel(
grid_name,
self.kernel_bessel,
lat_max=lat_max,
wave_length=wave_length,
order=order,
**kwargs,
)
logger.debug("Filtering done")
self.vars[grid_name] -= data_out
[docs] def bessel_low_filter(self, grid_name, wave_length, order=1, lat_max=85, **kwargs):
data_out = self.convolve_filter_with_dynamic_kernel(
grid_name,
self.kernel_bessel,
lat_max=lat_max,
wave_length=wave_length,
order=order,
**kwargs,
)
self.vars[grid_name] = data_out
[docs] def spectrum_lonlat(self, grid_name, area=None, ref=None, **kwargs):
if area is None:
area = dict(llcrnrlon=190, urcrnrlon=280, llcrnrlat=-62, urcrnrlat=8)
scaling = kwargs.pop("scaling", "density")
ref_grid_name = kwargs.pop("ref_grid_name", None)
x0, y0 = self.nearest_grd_indice(area["llcrnrlon"], area["llcrnrlat"])
x1, y1 = self.nearest_grd_indice(area["urcrnrlon"], area["urcrnrlat"])
data = self.grid(grid_name)[x0:x1, y0:y1]
# Lat spectrum
pws = list()
step_y_km = self.ystep * distance(0, 0, 0, 1) / 1000
nb_invalid = 0
for i, _ in enumerate(self.x_c[x0:x1]):
f, pw = welch(data[i, :], 1 / step_y_km, scaling=scaling, **kwargs)
if isnan(pw).any():
nb_invalid += 1
continue
pws.append(pw)
if nb_invalid:
logger.warning("%d/%d columns invalid", nb_invalid, i + 1)
with errstate(divide="ignore"):
lat_content = 1 / f, array(pws).mean(axis=0)
# Lon spectrum
fs, pws = list(), list()
f_min, f_max = None, None
nb_invalid = 0
for i, lat in enumerate(self.y_c[y0:y1]):
step_x_km = self.xstep * distance(0, lat, 1, lat) / 1000
f, pw = welch(data[:, i], 1 / step_x_km, scaling=scaling, **kwargs)
if isnan(pw).any():
nb_invalid += 1
continue
if f_min is None:
f_min = f.min()
f_max = f.max()
else:
f_min = max(f_min, f.min())
f_max = min(f_max, f.max())
fs.append(f)
pws.append(pw)
if nb_invalid:
logger.warning("%d/%d lines invalid", nb_invalid, i + 1)
f_interp = linspace(f_min, f_max, f.shape[0])
pw_m = array(
[
interp1d(f, pw, fill_value=0.0, bounds_error=False)(f_interp)
for f, pw in zip(fs, pws)
]
).mean(axis=0)
with errstate(divide="ignore"):
lon_content = 1 / f_interp, pw_m
if ref is None:
return lon_content, lat_content
else:
if ref_grid_name is not None:
grid_name = ref_grid_name
ref_lon_content, ref_lat_content = ref.spectrum_lonlat(
grid_name, area, **kwargs
)
return (
(lon_content[0], lon_content[1] / ref_lon_content[1]),
(lat_content[0], lat_content[1] / ref_lat_content[1]),
)
[docs] def compute_finite_difference(self, data, schema=1, mode="reflect", vertical=False):
if not isinstance(schema, int) and schema < 1:
raise Exception("schema must be a positive int")
data2 = data.copy()
data1 = data.copy()
if vertical:
data1[:, :-schema] = data[:, schema:]
data2[:, schema:] = data[:, :-schema]
# put nan
data1[:, -schema:] = nan
data2[:, :schema] = nan
else:
data1[:-schema] = data[schema:]
data2[schema:] = data[:-schema]
if mode == "wrap":
data1[-schema:] = data[:schema]
data2[:schema] = data[-schema:]
else:
# put nan
data1[-schema:] = nan
data2[:schema] = nan
d = self.EARTH_RADIUS * 2 * pi / 360 * 2 * schema
if vertical:
d *= self.ystep
else:
d *= self.xstep * cos(deg2rad(self.y_c))
return (data1 - data2) / d
[docs] def compute_stencil(
self, data, stencil_halfwidth=4, mode="reflect", vertical=False
):
r"""
Apply stencil ponderation on field.
:param array data: array where apply stencil
:param int stencil_halfwidth: from 1 t0 4, maximal stencil used
:param str mode: convolution mode
:param bool vertical: if True, method apply a vertical convolution
:return: gradient array from stencil application
:rtype: array
Short story, how to get stencil coefficient for stencil (3 points, 5 points and 7 points)
Taylor's theorem:
.. math::
f(x \pm h) = f(x) \pm f'(x)h
+ \frac{f''(x)h^2}{2!} \pm \frac{f^{(3)}(x)h^3}{3!}
+ \frac{f^{(4)}(x)h^4}{4!} \pm \frac{f^{(5)}(x)h^5}{5!}
+ O(h^6)
If we stop at `O(h^2)`, we get classic differenciation (stencil 3 points):
.. math:: f(x+h) - f(x-h) = f(x) - f(x) + 2 f'(x)h + O(h^2)
.. math:: f'(x) = \frac{f(x+h) - f(x-h)}{2h} + O(h^2)
If we stop at `O(h^4)`, we will get stencil 5 points:
.. math::
f(x+h) - f(x-h) = 2 f'(x)h + 2 \frac{f^{(3)}(x)h^3}{3!} + O(h^4)
:label: E1
.. math::
f(x+2h) - f(x-2h) = 4 f'(x)h + 16 \frac{f^{(3)}(x)h^3}{3!} + O(h^4)
:label: E2
If we multiply equation :eq:`E1` by 8 and substract equation :eq:`E2`, we get:
.. math:: 8(f(x+h) - f(x-h)) - (f(x+2h) - f(x-2h)) = 16 f'(x)h - 4 f'(x)h + O(h^4)
.. math:: f'(x) = \frac{f(x-2h) - 8f(x-h) + 8f(x+h) - f(x+2h)}{12h} + O(h^4)
If we stop at `O(h^6)`, we will get stencil 7 points:
.. math::
f(x+h) - f(x-h) = 2 f'(x)h + 2 \frac{f^{(3)}(x)h^3}{3!} + 2 \frac{f^{(5)}(x)h^5}{5!} + O(h^6)
:label: E3
.. math::
f(x+2h) - f(x-2h) = 4 f'(x)h + 16 \frac{f^{(3)}(x)h^3}{3!} + 64 \frac{f^{(5)}(x)h^5}{5!} + O(h^6)
:label: E4
.. math::
f(x+3h) - f(x-3h) = 6 f'(x)h + 54 \frac{f^{(3)}(x)h^3}{3!} + 486 \frac{f^{(5)}(x)h^5}{5!} + O(h^6)
:label: E5
If we multiply equation :eq:`E3` by 45 and substract equation :eq:`E4` multiply by 9
and add equation :eq:`E5`, we get:
.. math::
45(f(x+h) - f(x-h)) - 9(f(x+2h) - f(x-2h)) + (f(x+3h) - f(x-3h)) =
90 f'(x)h - 36 f'(x)h + 6 f'(x)h + O(h^6)
.. math::
f'(x) = \frac{-f(x-3h) + 9f(x-2h) - 45f(x-h) + 45f(x+h) - 9f(x+2h) +f(x+3h)}{60h} + O(h^6)
...
"""
stencil_halfwidth = max(min(int(stencil_halfwidth), 4), 1)
logger.debug("Stencil half width apply : %d", stencil_halfwidth)
# output
grad = None
weights = [
array((3, -32, 168, -672, 0, 672, -168, 32, -3)) / 840.0,
array((-1, 9, -45, 0, 45, -9, 1)) / 60.0,
array((1, -8, 0, 8, -1)) / 12.0,
array((-1, 0, 1)) / 2.0,
# uncentered kernel
# like array((0, -1, 1)) but left value could be default value
array((-1, 1)),
# like array((-1, 1, 0)) but right value could be default value
(1, array((-1, 1))),
]
# reduce to stencil selected
weights = weights[4 - stencil_halfwidth :]
if vertical:
data = data.T
# Iteration from larger stencil to smaller (to fill matrix)
for weight in weights:
if isinstance(weight, tuple):
# In the case of unbalanced diff
shift, weight = weight
data_ = data.copy()
data_[shift:] = data[:-shift]
if not vertical:
data_[:shift] = data[-shift:]
else:
data_ = data
# Delta h
d_h = convolve(data_, weights=weight.reshape((-1, 1)), mode=mode)
mask = convolve(
int8(data_.mask), weights=ones(weight.shape).reshape((-1, 1)), mode=mode
)
d_h = ma.array(d_h, mask=mask != 0)
# Delta d
if vertical:
d_h = d_h.T
d = self.EARTH_RADIUS * 2 * pi / 360 * convolve(self.y_c, weight)
else:
if mode == "wrap":
# Along x axis, we need to close
# we will compute in two part
x = self.x_c % 360
d_degrees = convolve(x, weight, mode=mode)
d_degrees_180 = convolve((x + 180) % 360 - 180, weight, mode=mode)
# Arbitrary, to be sure to be far far away of bound
m = (x < 90) + (x > 270)
d_degrees[m] = d_degrees_180[m]
else:
d_degrees = convolve(self.x_c, weight, mode=mode)
d = (
self.EARTH_RADIUS
* 2
* pi
/ 360
* d_degrees.reshape((-1, 1))
* cos(deg2rad(self.y_c))
)
if grad is None:
# First Gradient
grad = d_h / d
else:
# Fill hole
grad[grad.mask] = (d_h / d)[grad.mask]
return grad
[docs] def add_uv_lagerloef(self, grid_height, uname="u", vname="v", schema=15):
self.add_uv(grid_height, uname, vname)
latmax = 5
_, (i_start, i_end) = self.nearest_grd_indice((0, 0), (-latmax, latmax))
sl = slice(i_start, i_end)
# Divide by sideral day
lat = self.y_c[sl]
gob = (
cos(deg2rad(lat))
* ones((self.x_c.shape[0], 1))
* 4.0
* pi
/ (23 * 3600 + 56 * 60 + 4.1)
/ self.EARTH_RADIUS
)
with errstate(divide="ignore"):
gob = self.GRAVITY / (gob * ones((self.x_c.shape[0], 1)))
mode = "wrap" if self.is_circular() else "reflect"
# fill data to compute a finite difference on all point
data = self.convolve_filter_with_dynamic_kernel(
grid_height,
self.kernel_bessel,
lat_max=10,
wave_length=500,
order=1,
extend=0.1,
)
data = self.convolve_filter_with_dynamic_kernel(
data, self.kernel_bessel, lat_max=10, wave_length=500, order=1, extend=0.1
)
data = self.convolve_filter_with_dynamic_kernel(
data, self.kernel_bessel, lat_max=10, wave_length=500, order=1, extend=0.1
)
v_lagerloef = (
self.compute_finite_difference(
self.compute_finite_difference(data, mode=mode, schema=schema),
mode=mode,
schema=schema,
)[:, sl]
* gob
)
u_lagerloef = (
-self.compute_finite_difference(
self.compute_finite_difference(data, vertical=True, schema=schema),
vertical=True,
schema=schema,
)[:, sl]
* gob
)
w = 1 - exp(-((lat / 2.2) ** 2))
self.vars[vname][:, sl] = self.vars[vname][:, sl] * w + v_lagerloef * (1 - w)
self.vars[uname][:, sl] = self.vars[uname][:, sl] * w + u_lagerloef * (1 - w)
[docs] def add_uv(self, grid_height, uname="u", vname="v", stencil_halfwidth=4):
"""Compute a u and v grid
:param str grid_height: grid name where the funtion will apply stencil method
:param str uname: future name of u
:param str vname: future name of v
:param int stencil_halfwidth: largest stencil could be apply (max: 4)
.. minigallery:: py_eddy_tracker.RegularGridDataset.add_uv
"""
logger.info("Add u/v variable with stencil method")
data = self.grid(grid_height)
h_dict = self.variables_description[grid_height]
for variable in (uname, vname):
self.variables_description[variable] = dict(
infos=h_dict["infos"].copy(),
attrs=h_dict["attrs"].copy(),
args=tuple((variable, *h_dict["args"][1:])),
kwargs=h_dict["kwargs"].copy(),
)
if "units" in self.variables_description[variable]["attrs"]:
self.variables_description[variable]["attrs"]["units"] += "/s"
if "long_name" in self.variables_description[variable]["attrs"]:
self.variables_description[variable]["attrs"][
"long_name"
] += " gradient"
# Divide by sideral day
gof = (
sin(deg2rad(self.y_c))
* ones((self.x_c.shape[0], 1))
* 4.0
* pi
/ (23 * 3600 + 56 * 60 + 4.1)
)
with errstate(divide="ignore"):
gof = self.GRAVITY / (gof * ones((self.x_c.shape[0], 1)))
# Compute v
mode = "wrap" if self.is_circular() else "reflect"
self.vars[vname] = (
self.compute_stencil(data, mode=mode, stencil_halfwidth=stencil_halfwidth)
* gof
)
# Compute u
self.vars[uname] = (
-self.compute_stencil(
data, vertical=True, stencil_halfwidth=stencil_halfwidth
)
* gof
)
[docs] def speed_coef_mean(self, contour):
"""Some nan can be computed over contour if we are near border,
something to explore
"""
return mean_on_regular_contour(
self.x_c,
self.y_c,
self._speed_ev,
self._speed_ev.mask,
contour.vertices,
nan_remove=True,
)
[docs] def init_speed_coef(self, uname="u", vname="v"):
"""Draft"""
self._speed_ev = (self.grid(uname) ** 2 + self.grid(vname) ** 2) ** 0.5
[docs] def display(self, ax, name, factor=1, ref=None, **kwargs):
"""
:param matplotlib.axes.Axes ax: matplotlib axes use to draw
:param str,array name: variable to display, could be an array
:param float factor: multiply grid by
:param float,None ref: if define use like west bound
:param dict kwargs: look at :py:meth:`matplotlib.axes.Axes.pcolormesh`
.. minigallery:: py_eddy_tracker.RegularGridDataset.display
"""
if "cmap" not in kwargs:
kwargs["cmap"] = "coolwarm"
data = self.grid(name) if isinstance(name, str) else name
if ref is None:
x = self.x_bounds
else:
x = (self.x_c - ref) % 360 + ref
i = x.argsort()
x = self.c_to_bounds(x[i])
data = data[i]
return ax.pcolormesh(x, self.y_bounds, data.T * factor, **kwargs)
[docs] def contour(self, ax, name, factor=1, ref=None, **kwargs):
"""
:param matplotlib.axes.Axes ax: matplotlib axes use to draw
:param str,array name: variable to display, could be an array
:param float factor: multiply grid by
:param float,None ref: if define use like west bound
:param dict kwargs: look at :py:meth:`matplotlib.axes.Axes.contour`
.. minigallery:: py_eddy_tracker.RegularGridDataset.contour
"""
data = self.grid(name) if isinstance(name, str) else name
if ref is None:
x = self.x_c
else:
x = (self.x_c - ref) % 360 + ref
i = x.argsort()
x = x[i]
data = data[i]
return ax.contour(x, self.y_c, data.T * factor, **kwargs)
[docs] def regrid(self, other, grid_name, new_name=None):
"""
Interpolate another grid at the current grid position
:param RegularGridDataset other:
:param str grid_name: variable name to interpolate
:param str new_name: name used to store, if None method will use current ont
.. minigallery:: py_eddy_tracker.RegularGridDataset.regrid
"""
if new_name is None:
new_name = grid_name
x, y = meshgrid(self.x_c, self.y_c)
# interp and reshape
v_interp = (
other.interp(grid_name, x.reshape(-1), y.reshape(-1)).reshape(x.shape).T
)
v_interp = ma.array(v_interp, mask=isnan(v_interp))
# and add it to self
self.add_grid(new_name, v_interp)
self.variables_description[new_name] = other.variables_description[grid_name]
# self.variables_description[new_name]['infos'] = False
# self.variables_description[new_name]['kwargs']['dimensions'] = ...
[docs] def interp(self, grid_name, lons, lats, method="bilinear"):
"""
Compute z over lons, lats
:param str grid_name: Grid to be interpolated
:param lons: new x
:param lats: new y
:param str method: Could be 'bilinear' or 'nearest'
:return: new z
"""
g = self.grid(grid_name)
if len(g.mask.shape):
m = g.mask
else:
m = ones(g.shape) if g.mask else zeros(g.shape)
return interp2d_geo(
self.x_c, self.y_c, g, m, lons, lats, nearest=method == "nearest"
)
[docs]@njit(cache=True, fastmath=True)
def compute_pixel_path(x0, y0, x1, y1, x_ori, y_ori, x_step, y_step, nb_x):
"""Give a serie of indexes describing the path between two position"""
# index
nx = x0.shape[0]
i_x0 = empty(nx, dtype=numba_types.int_)
i_x1 = empty(nx, dtype=numba_types.int_)
i_y0 = empty(nx, dtype=numba_types.int_)
i_y1 = empty(nx, dtype=numba_types.int_)
# Because round_ is not accepted with array in numba
for i in range(nx):
i_x0[i] = round_(((x0[i] - x_ori) % 360) / x_step)
i_x1[i] = round_(((x1[i] - x_ori) % 360) / x_step)
i_y0[i] = round_((y0[i] - y_ori) / y_step)
i_y1[i] = round_((y1[i] - y_ori) / y_step)
# Delta index of x
d_x = i_x1 - i_x0
d_x = (d_x + nb_x // 2) % nb_x - (nb_x // 2)
i_x1 = i_x0 + d_x
# Delta index of y
d_y = i_y1 - i_y0
# max and abs sum doesn't work on array?
d_max = empty(nx, dtype=numba_types.int32)
nb_value = 0
for i in range(nx):
d_max[i] = max(abs(d_x[i]), abs(d_y[i]))
# Compute number of pixel which we go trought
nb_value += d_max[i] + 1
# Create an empty array to store value of pixel across the travel
i_g = empty(nb_value, dtype=numba_types.int32)
j_g = empty(nb_value, dtype=numba_types.int32)
# Index to determine the position in the global array
ii = 0
# Iteration on each travel
for i, delta in enumerate(d_max):
# If the travel don't cross multiple pixel
if delta == 0:
i_g[ii : ii + delta + 1] = i_x0[i]
j_g[ii : ii + delta + 1] = i_y0[i]
# Vertical move
elif d_x[i] == 0:
sup = -1 if d_y[i] < 0 else 1
i_g[ii : ii + delta + 1] = i_x0[i]
j_g[ii : ii + delta + 1] = arange(i_y0[i], i_y1[i] + sup, sup)
# Horizontal move
elif d_y[i] == 0:
sup = -1 if d_x[i] < 0 else 1
i_g[ii : ii + delta + 1] = arange(i_x0[i], i_x1[i] + sup, sup)
j_g[ii : ii + delta + 1] = i_y0[i]
# In case of multiple direction
else:
a = (i_x1[i] - i_x0[i]) / float(i_y1[i] - i_y0[i])
if abs(d_x[i]) >= abs(d_y[i]):
sup = -1 if d_x[i] < 0 else 1
value = arange(i_x0[i], i_x1[i] + sup, sup)
i_g[ii : ii + delta + 1] = value
j_g[ii : ii + delta + 1] = (value - i_x0[i]) / a + i_y0[i]
else:
sup = -1 if d_y[i] < 0 else 1
value = arange(i_y0[i], i_y1[i] + sup, sup)
j_g[ii : ii + delta + 1] = value
i_g[ii : ii + delta + 1] = (value - i_y0[i]) * a + i_x0[i]
ii += delta + 1
i_g %= nb_x
return i_g, j_g, d_max
[docs]@njit(cache=True)
def has_masked_value(grid, i_x, i_y):
for i, j in zip(i_x, i_y):
if grid[i, j]:
return True
return False
[docs]@njit(cache=True)
def has_value(grid, i_x, i_y, value, below=False):
for i, j in zip(i_x, i_y):
if below:
if grid[i, j] < value:
return True
else:
if grid[i, j] > value:
return True
return False