# -*- coding: utf-8 -*-
"""
Applications on detection and tracking files
"""
import argparse
import logging
from datetime import datetime
from glob import glob
from os import mkdir
from os.path import basename, dirname, exists
from os.path import join as join_path
from re import compile as re_compile
from netCDF4 import Dataset
from numpy import bincount, bytes_, empty, in1d, unique
from yaml import safe_load
from .. import EddyParser
from ..observations.observation import EddiesObservations, reverse_index
from ..observations.tracking import TrackEddiesObservations
from ..tracking import Correspondances
logger = logging.getLogger("pet")
[docs]def eddies_add_circle():
parser = EddyParser("Add or replace contour with radius parameter")
parser.add_argument("filename", help="all file to merge")
parser.add_argument("out", help="output file")
args = parser.parse_args()
obs = EddiesObservations.load_file(args.filename)
if obs.track_array_variables == 0:
obs.track_array_variables = 50
obs = obs.add_fields(
array_fields=(
"contour_lon_e",
"contour_lat_e",
"contour_lon_s",
"contour_lat_s",
)
)
obs.circle_contour()
obs.write_file(filename=args.out)
[docs]def merge_eddies():
parser = EddyParser("Merge eddies")
parser.add_argument("filename", nargs="+", help="all file to merge")
parser.add_argument("out", help="output file")
parser.add_argument(
"--add_rotation_variable", help="add rotation variables", action="store_true"
)
parser.add_argument(
"--include_var", nargs="+", type=str, help="use only listed variable"
)
parser.memory_arg()
args = parser.parse_args()
if args.include_var is None:
with Dataset(args.filename[0]) as h:
args.include_var = h.variables.keys()
obs = list()
for filename in args.filename:
e = TrackEddiesObservations.load_file(
filename, raw_data=True, include_vars=args.include_var
)
if args.add_rotation_variable:
e = e.add_rotation_type()
obs.append(e)
obs = TrackEddiesObservations.concatenate(obs)
obs.write_file(filename=args.out)
[docs]def get_frequency_grid():
parser = EddyParser("Compute eddy frequency")
parser.add_argument("observations", help="Input observations to compute frequency")
parser.add_argument("out", help="Grid output file")
parser.contour_intern_arg()
parser.add_argument(
"--xrange", nargs="+", type=float, help="Horizontal range : START,STOP,STEP"
)
parser.add_argument(
"--yrange", nargs="+", type=float, help="Vertical range : START,STOP,STEP"
)
args = parser.parse_args()
if (args.xrange is None or len(args.xrange) not in (3,)) or (
args.yrange is None or len(args.yrange) not in (3,)
):
raise Exception("Use START/STOP/STEP for --xrange and --yrange")
var_to_load = ["longitude"]
var_to_load.extend(EddiesObservations.intern(args.intern, public_label=True))
e = EddiesObservations.load_file(args.observations, include_vars=var_to_load)
bins = args.xrange, args.yrange
g = e.grid_count(bins, intern=args.intern)
g.write(args.out)
[docs]def display_infos():
parser = EddyParser("Display General inforamtion")
parser.add_argument(
"observations", nargs="+", help="Input observations to compute frequency"
)
parser.add_argument("--vars", nargs="+", help=argparse.SUPPRESS)
parser.add_argument(
"--area",
nargs=4,
type=float,
metavar=("llcrnrlon", "llcrnrlat", "urcrnrlon", "urcrnrlat"),
help="Bounding box",
)
args = parser.parse_args()
if args.vars:
vars = args.vars
else:
vars = [
"amplitude",
"speed_radius",
"speed_area",
"effective_radius",
"effective_area",
"time",
"latitude",
"longitude",
]
filenames = args.observations
filenames.sort()
for filename in filenames:
with Dataset(filename) as h:
track = "track" in h.variables
print(f"-- {filename} -- ")
if track:
vars_ = vars.copy()
vars_.extend(("track", "observation_number", "observation_flag"))
e = TrackEddiesObservations.load_file(filename, include_vars=vars_)
else:
e = EddiesObservations.load_file(filename, include_vars=vars)
if args.area is not None:
area = dict(
llcrnrlon=args.area[0],
llcrnrlat=args.area[1],
urcrnrlon=args.area[2],
urcrnrlat=args.area[3],
)
e = e.extract_with_area(area)
print(e)
[docs]def eddies_tracking():
parser = EddyParser("Tool to use identification step to compute tracking")
parser.add_argument("yaml_file", help="Yaml file to configure py-eddy-tracker")
parser.add_argument("--correspondance_in", help="Filename of saved correspondance")
parser.add_argument("--correspondance_out", help="Filename to save correspondance")
parser.add_argument(
"--save_correspondance_and_stop",
action="store_true",
help="Stop tracking after correspondance computation,"
" merging can be done with EddyFinalTracking",
)
parser.add_argument(
"--zarr", action="store_true", help="Output will be wrote in zarr"
)
parser.add_argument("--unraw", action="store_true", help="Load unraw data")
parser.add_argument(
"--blank_period",
type=int,
default=0,
help="Nb of detection which will not use at the end of the period",
)
parser.memory_arg()
args = parser.parse_args()
# Read yaml configuration file
with open(args.yaml_file, "r") as stream:
config = safe_load(stream)
if "CLASS" in config:
classname = config["CLASS"]["CLASS"]
obs_class = dict(
class_method=getattr(
__import__(config["CLASS"]["MODULE"], globals(), locals(), classname),
classname,
),
class_kw=config["CLASS"].get("OPTIONS", dict()),
)
else:
obs_class = dict()
c_in, c_out = args.correspondance_in, args.correspondance_out
if c_in is None:
c_in = config["PATHS"].get("CORRESPONDANCES_IN", None)
y_c_out = config["PATHS"].get(
"CORRESPONDANCES_OUT", "{path}/{sign_type}_correspondances.nc"
)
if c_out is None:
c_out = y_c_out
# Create ouput folder if necessary
save_dir = config["PATHS"].get("SAVE_DIR", None)
if save_dir is not None and not exists(save_dir):
mkdir(save_dir)
track(
pattern=config["PATHS"]["FILES_PATTERN"],
output_dir=save_dir,
c_out=c_out,
**obs_class,
virtual=int(config.get("VIRTUAL_LENGTH_MAX", 0)),
previous_correspondance=c_in,
memory=args.memory,
correspondances_only=args.save_correspondance_and_stop,
raw=not args.unraw,
zarr=args.zarr,
nb_obs_min=int(config.get("TRACK_DURATION_MIN", 10)),
blank_period=args.blank_period,
)
[docs]def browse_dataset_in(
data_dir,
files_model,
date_regexp,
date_model,
start_date=None,
end_date=None,
sub_sampling_step=1,
files=None,
):
pattern_regexp = re_compile(".*/" + date_regexp)
if files is not None:
filenames = bytes_(files)
else:
full_path = join_path(data_dir, files_model)
logger.info("Search files : %s", full_path)
filenames = bytes_(glob(full_path))
dataset_list = empty(
len(filenames),
dtype=[
("filename", "S500"),
("date", "datetime64[D]"),
],
)
dataset_list["filename"] = filenames
logger.info("%s grids available", dataset_list.shape[0])
mode_attrs = False
if "(" not in date_regexp:
logger.debug("Attrs date : %s", date_regexp)
mode_attrs = date_regexp.strip().split(":")
else:
logger.debug("Pattern date : %s", date_regexp)
for item in dataset_list:
str_date = None
if mode_attrs:
with Dataset(item["filename"].decode("utf-8")) as h:
if len(mode_attrs) == 1:
str_date = getattr(h, mode_attrs[0])
else:
str_date = getattr(h.variables[mode_attrs[0]], mode_attrs[1])
else:
result = pattern_regexp.match(str(item["filename"]))
if result:
str_date = result.groups()[0]
if str_date is not None:
item["date"] = datetime.strptime(str_date, date_model).date()
dataset_list.sort(order=["date", "filename"])
steps = unique(dataset_list["date"][1:] - dataset_list["date"][:-1])
if len(steps) > 1:
raise Exception("Several days steps in grid dataset %s" % steps)
if sub_sampling_step != 1:
logger.info("Grid subsampling %d", sub_sampling_step)
dataset_list = dataset_list[::sub_sampling_step]
if start_date is not None or end_date is not None:
logger.info(
"Available grid from %s to %s",
dataset_list[0]["date"],
dataset_list[-1]["date"],
)
logger.info("Filtering grid by time %s, %s", start_date, end_date)
mask = (dataset_list["date"] >= start_date) * (dataset_list["date"] <= end_date)
dataset_list = dataset_list[mask]
return dataset_list
[docs]def track(
pattern,
output_dir,
c_out,
nb_obs_min=10,
raw=True,
zarr=False,
blank_period=0,
correspondances_only=False,
**kw_c,
):
kw = dict(date_regexp=".*_([0-9]*?).[nz].*", date_model="%Y%m%d")
if isinstance(pattern, list):
kw.update(dict(data_dir=None, files_model=None, files=pattern))
else:
kw.update(dict(data_dir=dirname(pattern), files_model=basename(pattern)))
datasets = browse_dataset_in(**kw)
if blank_period > 0:
datasets = datasets[:-blank_period]
logger.info("Last %d files will be pop", blank_period)
if nb_obs_min > len(datasets):
raise Exception(
"Input file number (%s) is shorter than TRACK_DURATION_MIN (%s)."
% (len(datasets), nb_obs_min)
)
c = Correspondances(datasets=datasets["filename"], **kw_c)
c.track()
logger.info("Track finish")
t0, t1 = c.period
kw_save = dict(
date_start=t0,
date_stop=t1,
date_prod=datetime.now(),
path=output_dir,
sign_type=c.current_obs.sign_legend,
)
c.save(c_out, kw_save)
if correspondances_only:
return
logger.info("Start merging")
c.prepare_merging()
logger.info("Longer track saved have %d obs", c.nb_obs_by_tracks.max())
logger.info(
"The mean length is %d observations for all tracks", c.nb_obs_by_tracks.mean()
)
kw_write = dict(path=output_dir, zarr_flag=zarr)
c.get_unused_data(raw_data=raw).write_file(
filename="%(path)s/%(sign_type)s_untracked.nc", **kw_write
)
short_c = c._copy()
short_c.shorter_than(size_max=nb_obs_min)
c.longer_than(size_min=nb_obs_min)
long_track = c.merge(raw_data=raw)
short_track = short_c.merge(raw_data=raw)
# We flag obs
if c.virtual:
long_track["virtual"][:] = long_track["time"] == 0
long_track.normalize_longitude()
long_track.filled_by_interpolation(long_track["virtual"] == 1)
short_track["virtual"][:] = short_track["time"] == 0
short_track.normalize_longitude()
short_track.filled_by_interpolation(short_track["virtual"] == 1)
logger.info("Longer track saved have %d obs", c.nb_obs_by_tracks.max())
logger.info(
"The mean length is %d observations for long track",
c.nb_obs_by_tracks.mean(),
)
long_track.write_file(**kw_write)
short_track.write_file(
filename="%(path)s/%(sign_type)s_track_too_short.nc", **kw_write
)
[docs]def get_group(
dataset1,
dataset2,
index1,
index2,
score,
invalid=2,
low=10,
high=60,
):
group1, group2 = dict(), dict()
m_valid = (score * 100) >= invalid
i1, i2, score = index1[m_valid], index2[m_valid], score[m_valid] * 100
# Eddies with no association & scores < invalid
group1["nomatch"] = reverse_index(i1, len(dataset1))
group2["nomatch"] = reverse_index(i2, len(dataset2))
# Select all eddies involved in multiple associations
i1_, nb1 = unique(i1, return_counts=True)
i2_, nb2 = unique(i2, return_counts=True)
i1_multi = i1_[nb1 >= 2]
i2_multi = i2_[nb2 >= 2]
m_multi = in1d(i1, i1_multi) + in1d(i2, i2_multi)
group1["multi_match"] = unique(i1[m_multi])
group2["multi_match"] = unique(i2[m_multi])
# Low scores
m_low = score <= low
m_low *= ~m_multi
group1["low"] = i1[m_low]
group2["low"] = i2[m_low]
# Intermediate scores
m_i = (score > low) * (score <= high)
m_i *= ~m_multi
group1["intermediate"] = i1[m_i]
group2["intermediate"] = i2[m_i]
# High scores
m_high = score > high
m_high *= ~m_multi
group1["high"] = i1[m_high]
group2["high"] = i2[m_high]
def get_twin(j2, j1):
# True only if j1 is used only one
m = bincount(j1)[j1] == 1
# We keep only link of this mask j1 have exactly one parent
j2_ = j2[m]
# We count parent times
m_ = (bincount(j2_)[j2_] == 2) * (bincount(j2)[j2_] == 2)
# we fill first mask with second one
m[m] = m_
return m
m1 = get_twin(i1, i2)
m2 = get_twin(i2, i1)
group1["parent"] = unique(i1[m1])
group2["parent"] = unique(i2[m2])
group1["twin"] = i1[m2]
group2["twin"] = i2[m1]
m = ~m1 * ~m2 * m_multi
group1["complex"] = unique(i1[m])
group2["complex"] = unique(i2[m])
return group1, group2
[docs]def quick_compare():
parser = EddyParser(
"Tool to have a quick comparison between several identification"
)
parser.add_argument("ref", help="Identification file of reference")
parser.add_argument("others", nargs="+", help="Identifications files to compare")
parser.add_argument("--high", default=40, type=float)
parser.add_argument("--low", default=20, type=float)
parser.add_argument("--invalid", default=5, type=float)
parser.contour_intern_arg()
args = parser.parse_args()
kw = dict(
include_vars=[
"longitude",
*EddiesObservations.intern(args.intern, public_label=True),
]
)
ref = EddiesObservations.load_file(args.ref, **kw)
print(f"[ref] {args.ref} -> {len(ref)} obs")
groups_ref, groups_other = dict(), dict()
others = {other: EddiesObservations.load_file(other, **kw) for other in args.others}
for i, other_ in enumerate(args.others):
other = others[other_]
print(f"[{i}] {other_} -> {len(other)} obs")
gr1, gr2 = get_group(
ref,
other,
*ref.match(other, intern=args.intern),
invalid=args.invalid,
low=args.low,
high=args.high,
)
groups_ref[other_] = gr1
groups_other[other_] = gr2
def display(value, ref=None):
outs = list()
for v in value:
if ref:
outs.append(f"{v/ref * 100:.1f}% ({v})")
else:
outs.append(v)
return "".join([f"{v:^15}" for v in outs])
keys = list(gr1.keys())
print(" ", display(keys))
for i, v in enumerate(groups_ref.values()):
print(
f"[{i:2}] ",
display(
(v_.sum() if v_.dtype == "bool" else v_.shape[0] for v_ in v.values()),
ref=len(ref),
),
)
print(display(keys))
for i, (k, v) in enumerate(groups_other.items()):
print(
f"[{i:2}] ",
display(
(v_.sum() if v_.dtype == "bool" else v_.shape[0] for v_ in v.values()),
ref=len(others[k]),
),
)