Source code for py_eddy_tracker.appli.eddies

# -*- coding: utf-8 -*-
"""
Applications on detection and tracking files
"""
import argparse
from datetime import datetime
from glob import glob
import logging
from os import mkdir
from os.path import basename, dirname, exists, join as join_path
from re import compile as re_compile

from netCDF4 import Dataset
from numpy import bincount, bytes_, empty, in1d, unique
from yaml import safe_load

from .. import EddyParser, identify_time
from ..observations.observation import EddiesObservations, reverse_index
from ..observations.tracking import TrackEddiesObservations
from ..tracking import Correspondances

logger = logging.getLogger("pet")


[docs]def eddies_add_circle(): parser = EddyParser("Add or replace contour with radius parameter") parser.add_argument("filename", help="all file to merge") parser.add_argument("out", help="output file") args = parser.parse_args() obs = EddiesObservations.load_file(args.filename) if obs.track_array_variables == 0: obs.track_array_variables = 50 obs = obs.add_fields( array_fields=( "contour_lon_e", "contour_lat_e", "contour_lon_s", "contour_lat_s", ) ) obs.circle_contour() obs.write_file(filename=args.out)
[docs]def merge_eddies(): parser = EddyParser("Merge eddies") parser.add_argument("filename", nargs="+", help="all file to merge") parser.add_argument("out", help="output file") parser.add_argument( "--add_rotation_variable", help="add rotation variables", action="store_true" ) parser.add_argument( "--include_var", nargs="+", type=str, help="use only listed variable" ) parser.memory_arg() args = parser.parse_args() if args.include_var is None: with Dataset(args.filename[0]) as h: args.include_var = h.variables.keys() obs = list() for filename in args.filename: e = TrackEddiesObservations.load_file( filename, raw_data=True, include_vars=args.include_var ) if args.add_rotation_variable: e = e.add_rotation_type() obs.append(e) obs = TrackEddiesObservations.concatenate(obs) obs.write_file(filename=args.out)
[docs]def get_frequency_grid(): parser = EddyParser("Compute eddy frequency") parser.add_argument("observations", help="Input observations to compute frequency") parser.add_argument("out", help="Grid output file") parser.contour_intern_arg() parser.add_argument( "--xrange", nargs="+", type=float, help="Horizontal range : START,STOP,STEP" ) parser.add_argument( "--yrange", nargs="+", type=float, help="Vertical range : START,STOP,STEP" ) args = parser.parse_args() if (args.xrange is None or len(args.xrange) not in (3,)) or ( args.yrange is None or len(args.yrange) not in (3,) ): raise Exception("Use START/STOP/STEP for --xrange and --yrange") var_to_load = ["longitude"] var_to_load.extend(EddiesObservations.intern(args.intern, public_label=True)) e = EddiesObservations.load_file(args.observations, include_vars=var_to_load) bins = args.xrange, args.yrange g = e.grid_count(bins, intern=args.intern) g.write(args.out)
[docs]def display_infos(): parser = EddyParser("Display General inforamtion") parser.add_argument( "observations", nargs="+", help="Input observations to compute frequency" ) parser.add_argument("--vars", nargs="+", help=argparse.SUPPRESS) parser.add_argument( "--area", nargs=4, type=float, metavar=("llcrnrlon", "llcrnrlat", "urcrnrlon", "urcrnrlat"), help="Bounding box", ) args = parser.parse_args() if args.vars: vars = args.vars else: vars = [ "amplitude", "speed_radius", "speed_area", "effective_radius", "effective_area", "time", "latitude", "longitude", ] filenames = args.observations filenames.sort() for filename in filenames: with Dataset(filename) as h: track = "track" in h.variables print(f"-- {filename} -- ") if track: vars_ = vars.copy() vars_.extend(("track", "observation_number", "observation_flag")) e = TrackEddiesObservations.load_file(filename, include_vars=vars_) else: e = EddiesObservations.load_file(filename, include_vars=vars) if args.area is not None: area = dict( llcrnrlon=args.area[0], llcrnrlat=args.area[1], urcrnrlon=args.area[2], urcrnrlat=args.area[3], ) e = e.extract_with_area(area) print(e)
[docs]def eddies_tracking(): parser = EddyParser("Tool to use identification step to compute tracking") parser.add_argument("yaml_file", help="Yaml file to configure py-eddy-tracker") parser.add_argument("--correspondance_in", help="Filename of saved correspondance") parser.add_argument("--correspondance_out", help="Filename to save correspondance") parser.add_argument( "--save_correspondance_and_stop", action="store_true", help="Stop tracking after correspondance computation," " merging can be done with EddyFinalTracking", ) parser.add_argument( "--zarr", action="store_true", help="Output will be wrote in zarr" ) parser.add_argument( "--unraw", action="store_true", help="Load unraw data, use only for netcdf." "If unraw is active, netcdf is loaded without apply scalefactor and add_offset.", ) parser.add_argument( "--blank_period", type=int, default=0, help="Nb of detection which will not use at the end of the period", ) parser.memory_arg() args = parser.parse_args() # Read yaml configuration file with open(args.yaml_file, "r") as stream: config = safe_load(stream) if "CLASS" in config: classname = config["CLASS"]["CLASS"] obs_class = dict( class_method=getattr( __import__(config["CLASS"]["MODULE"], globals(), locals(), classname), classname, ), class_kw=config["CLASS"].get("OPTIONS", dict()), ) else: obs_class = dict() c_in, c_out = args.correspondance_in, args.correspondance_out if c_in is None: c_in = config["PATHS"].get("CORRESPONDANCES_IN", None) y_c_out = config["PATHS"].get( "CORRESPONDANCES_OUT", "{path}/{sign_type}_correspondances.nc" ) if c_out is None: c_out = y_c_out # Create ouput folder if necessary save_dir = config["PATHS"].get("SAVE_DIR", None) if save_dir is not None and not exists(save_dir): mkdir(save_dir) track( pattern=config["PATHS"]["FILES_PATTERN"], output_dir=save_dir, c_out=c_out, **obs_class, virtual=int(config.get("VIRTUAL_LENGTH_MAX", 0)), previous_correspondance=c_in, memory=args.memory, correspondances_only=args.save_correspondance_and_stop, raw=not args.unraw, zarr=args.zarr, nb_obs_min=int(config.get("TRACK_DURATION_MIN", 10)), blank_period=args.blank_period, )
[docs]def browse_dataset_in( data_dir, files_model, date_regexp, date_model=None, start_date=None, end_date=None, sub_sampling_step=1, files=None, ): pattern_regexp = re_compile(".*/" + date_regexp) if files is not None: filenames = bytes_(files) else: full_path = join_path(data_dir, files_model) logger.info("Search files : %s", full_path) filenames = bytes_(glob(full_path)) dataset_list = empty( len(filenames), dtype=[("filename", "S500"), ("date", "datetime64[s]")], ) dataset_list["filename"] = filenames logger.info("%s grids available", dataset_list.shape[0]) mode_attrs = False if "(" not in date_regexp: logger.debug("Attrs date : %s", date_regexp) mode_attrs = date_regexp.strip().split(":") else: logger.debug("Pattern date : %s", date_regexp) for item in dataset_list: str_date = None if mode_attrs: with Dataset(item["filename"].decode("utf-8")) as h: if len(mode_attrs) == 1: str_date = getattr(h, mode_attrs[0]) else: str_date = getattr(h.variables[mode_attrs[0]], mode_attrs[1]) else: result = pattern_regexp.match(str(item["filename"])) if result: str_date = result.groups()[0] if str_date is not None: if date_model is None: item["date"] = identify_time(str_date) else: item["date"] = datetime.strptime(str_date, date_model) dataset_list.sort(order=["date", "filename"]) steps = unique(dataset_list["date"][1:] - dataset_list["date"][:-1]) if len(steps) > 1: raise Exception("Several timesteps in grid dataset %s" % steps) if sub_sampling_step != 1: logger.info("Grid subsampling %d", sub_sampling_step) dataset_list = dataset_list[::sub_sampling_step] if start_date is not None or end_date is not None: logger.info( "Available grid from %s to %s", dataset_list[0]["date"], dataset_list[-1]["date"], ) logger.info("Filtering grid by time %s, %s", start_date, end_date) mask = (dataset_list["date"] >= start_date) * (dataset_list["date"] <= end_date) dataset_list = dataset_list[mask] return dataset_list
[docs]def track( pattern, output_dir, c_out, nb_obs_min=10, raw=True, zarr=False, blank_period=0, correspondances_only=False, **kw_c, ): kw = dict(date_regexp=".*_([0-9]*?).[nz].*") if isinstance(pattern, list): kw.update(dict(data_dir=None, files_model=None, files=pattern)) else: kw.update(dict(data_dir=dirname(pattern), files_model=basename(pattern))) datasets = browse_dataset_in(**kw) if blank_period > 0: datasets = datasets[:-blank_period] logger.info("Last %d files will be pop", blank_period) if nb_obs_min > len(datasets): raise Exception( "Input file number (%s) is shorter than TRACK_DURATION_MIN (%s)." % (len(datasets), nb_obs_min) ) c = Correspondances(datasets=datasets["filename"], **kw_c) c.track() logger.info("Track finish") kw_save = dict( date_start=datasets["date"][0], date_stop=datasets["date"][-1], date_prod=datetime.now(), path=output_dir, sign_type=c.current_obs.sign_legend, ) c.save(c_out, kw_save) if correspondances_only: return logger.info("Start merging") c.prepare_merging() logger.info("Longer track saved have %d obs", c.nb_obs_by_tracks.max()) logger.info( "The mean length is %d observations for all tracks", c.nb_obs_by_tracks.mean() ) kw_write = dict(path=output_dir, zarr_flag=zarr) c.get_unused_data(raw_data=raw).write_file( filename="%(path)s/%(sign_type)s_untracked.nc", **kw_write ) short_c = c._copy() short_c.shorter_than(size_max=nb_obs_min) short_track = short_c.merge(raw_data=raw) if c.longer_than(size_min=nb_obs_min) is False: long_track = short_track.empty_dataset() else: long_track = c.merge(raw_data=raw) # We flag obs if c.virtual: long_track["virtual"][:] = long_track["time"] == 0 long_track.normalize_longitude() long_track.filled_by_interpolation(long_track["virtual"] == 1) short_track["virtual"][:] = short_track["time"] == 0 short_track.normalize_longitude() short_track.filled_by_interpolation(short_track["virtual"] == 1) logger.info("Longer track saved have %d obs", c.nb_obs_by_tracks.max()) logger.info( "The mean length is %d observations for long track", c.nb_obs_by_tracks.mean(), ) long_track.write_file(**kw_write) short_track.write_file( filename="%(path)s/%(sign_type)s_track_too_short.nc", **kw_write )
[docs]def get_group( dataset1, dataset2, index1, index2, score, invalid=2, low=10, high=60, ): group1, group2 = dict(), dict() m_valid = (score * 100) >= invalid i1, i2, score = index1[m_valid], index2[m_valid], score[m_valid] * 100 # Eddies with no association & scores < invalid group1["nomatch"] = reverse_index(i1, len(dataset1)) group2["nomatch"] = reverse_index(i2, len(dataset2)) # Select all eddies involved in multiple associations i1_, nb1 = unique(i1, return_counts=True) i2_, nb2 = unique(i2, return_counts=True) i1_multi = i1_[nb1 >= 2] i2_multi = i2_[nb2 >= 2] m_multi = in1d(i1, i1_multi) + in1d(i2, i2_multi) # Low scores m_low = score < low m_low *= ~m_multi group1["low"] = i1[m_low] group2["low"] = i2[m_low] # Intermediate scores m_i = (score >= low) * (score < high) m_i *= ~m_multi group1["intermediate"] = i1[m_i] group2["intermediate"] = i2[m_i] # High scores m_high = score >= high m_high *= ~m_multi group1["high"] = i1[m_high] group2["high"] = i2[m_high] # Here for a nice display order group1["multi_match"] = unique(i1[m_multi]) group2["multi_match"] = unique(i2[m_multi]) def get_twin(j2, j1): # True only if j1 is used only one m = bincount(j1)[j1] == 1 # We keep only link of this mask j1 have exactly one parent j2_ = j2[m] # We count parent times m_ = (bincount(j2_)[j2_] == 2) * (bincount(j2)[j2_] == 2) # we fill first mask with second one m[m] = m_ return m m1 = get_twin(i1, i2) m2 = get_twin(i2, i1) group1["parent"] = unique(i1[m1]) group2["parent"] = unique(i2[m2]) group1["twin"] = i1[m2] group2["twin"] = i2[m1] m = ~m1 * ~m2 * m_multi group1["complex"] = unique(i1[m]) group2["complex"] = unique(i2[m]) return group1, group2
[docs]def run_compare(ref, others, invalid=1, low=20, high=80, intern=False, **kwargs): groups_ref, groups_other = dict(), dict() for i, (k, other) in enumerate(others.items()): print(f"[{i}] {k} -> {len(other)} obs") gr1, gr2 = get_group( ref, other, *ref.match(other, intern=intern, **kwargs), invalid=invalid, low=low, high=high, ) groups_ref[k] = gr1 groups_other[k] = gr2 return groups_ref, groups_other
[docs]def display_compare( ref, others, invalid=1, low=20, high=80, area=False, intern=False, **kwargs ): gr_ref, gr_others = run_compare( ref, others, invalid=invalid, low=low, high=high, intern=intern, **kwargs ) def display(value, ref=None): outs = list() for v in value: if ref: if area: outs.append(f"{v / ref * 100:.1f}% ({v:.1f}Mkm²)") else: outs.append(f"{v/ref * 100:.1f}% ({v})") else: outs.append(v) if area: return "".join([f"{v:^16}" for v in outs]) else: return "".join([f"{v:^15}" for v in outs]) def get_values(v, dataset): if area: area_ = dataset["speed_area" if intern else "effective_area"] return [area_[v_].sum() / 1e12 for v_ in v.values()] else: return [ v_.sum() if v_.dtype == "bool" else v_.shape[0] for v_ in v.values() ] labels = dict( high=f"{high:0.0f} <= high", low=f"{invalid:0.0f} <= low < {low:0.0f}", ) keys = [labels.get(key, key) for key in list(gr_ref.values())[0].keys()] print(" ", display(keys)) if area: ref_ = ref["speed_area" if intern else "effective_area"].sum() / 1e12 else: ref_ = len(ref) for i, v in enumerate(gr_ref.values()): print(f"[{i:2}] ", display(get_values(v, ref), ref=ref_)) print(" Point of view of study dataset") print(" ", display(keys)) for i, (k, v) in enumerate(gr_others.items()): other = others[k] if area: ref_ = other["speed_area" if intern else "effective_area"].sum() / 1e12 else: ref_ = len(other) print(f"[{i:2}] ", display(get_values(v, other), ref=ref_))
[docs]def quick_compare(): parser = EddyParser( "Tool to have a quick comparison between several identification" ) parser.add_argument("ref", help="Identification file of reference") parser.add_argument("others", nargs="+", help="Identifications files to compare") help = "Display in percent of area instead percent of observation" parser.add_argument("--area", action="store_true", help=help) help = "Use minimal cost function" parser.add_argument("--minimal_area", action="store_true", help=help) parser.add_argument("--high", default=40, type=float) parser.add_argument("--low", default=20, type=float) parser.add_argument("--invalid", default=5, type=float) parser.add_argument( "--path_out", default=None, help="Save each group in separate file" ) parser.contour_intern_arg() args = parser.parse_args() kw = dict( include_vars=[ "longitude", *EddiesObservations.intern(args.intern, public_label=True), ] ) if args.area: kw["include_vars"].append("speed_area" if args.intern else "effective_area") if args.path_out is not None: kw = dict() ref = EddiesObservations.load_file(args.ref, **kw) print(f"[ref] {args.ref} -> {len(ref)} obs") others = {other: EddiesObservations.load_file(other, **kw) for other in args.others} kwargs = dict( invalid=args.invalid, low=args.low, high=args.high, intern=args.intern, minimal_area=args.minimal_area, ) if args.path_out is not None: groups_ref, groups_other = run_compare(ref, others, **kwargs) if not exists(args.path_out): mkdir(args.path_out) for i, other_ in enumerate(args.others): dirname_ = f"{args.path_out}/{other_.replace('/', '_')}/" if not exists(dirname_): mkdir(dirname_) for k, v in groups_other[other_].items(): basename_ = f"other_{k}.nc" others[other_].index(v).write_file(filename=f"{dirname_}/{basename_}") for k, v in groups_ref[other_].items(): basename_ = f"ref_{k}.nc" ref.index(v).write_file(filename=f"{dirname_}/{basename_}") return display_compare(ref, others, **kwargs, area=args.area)