Source code for analyse.catchment

"""
Post-processing: extract, **per drainage basin**, the cell of maximum water
discharge and the cell of maximum sediment load from a gridded goSPL output —
i.e. each basin's **outflow point** (river mouth) and its flux.

This is the natural downstream step after :mod:`gospl.analyse.gridexport`: that
tool rasterises a goSPL surface to a CF-NetCDF grid carrying ``FA`` (flow
accumulation / water discharge), ``sedLoad`` (sediment load) and ``basin`` (the
drainage-basin id of every cell). Here, for every basin, we pick the single cell
where ``FA`` peaks (the water outlet) and the single cell where ``sedLoad`` peaks
(the sediment outlet), and write their ``lon``/``lat`` + value. Looped over a
time series, this gives the migrating river-mouth fluxes used in flux maps.

The water discharge defaults to ``FA``. Pass ``flow_var="fillFA"``
(``--flow-var fillFA``) to use the **depression-filled** accumulation instead,
which routes the trunk river *through* lakes / pits so a basin's outlet carries
its full upstream discharge even when the channel crosses a depression (raw
``FA`` can drop to zero inside a lake).

Consistent variable names
-------------------------
The defaults match the names :func:`gospl.analyse.gridexport.to_netcdf` writes,
so this tool reads a ``surface*.nc`` produced by ``gospl-grid`` **directly** — no
renaming step:

================  ====================  ===========================
quantity          gridexport name       legacy fallback (mapOutputs)
================  ====================  ===========================
water discharge   ``FA``                ``flowDischarge``
sediment load     ``sedLoad``           ``sedimentLoad``
basin id          ``basin``             ``basinID``
longitude         ``lon``               ``longitude``
latitude          ``lat``               ``latitude``
================  ====================  ===========================

Each variable falls back to the legacy name automatically if the gridexport one
is absent, so old ``fsdata*.nc`` files still work; explicit ``*_var`` arguments
override both.

Speed
-----
The per-basin maximum is a **grouped arg-max**, done once with a single
``lexsort`` over the subaerial cells (sort by ``(basin, value)``; the last cell
of each basin group is its maximum). This is :math:`O(N \\log N)` for the whole
grid, replacing the previous :math:`O(N_\\mathrm{basins} \\times N_\\mathrm{cells})`
scan (a full ``where(basin == k)`` per basin), which dominated runtime on a
global 0.1° grid (millions of cells × thousands of basins). It is fast enough to
run **serially** — the old MPI fan-out over basins is no longer needed.

Runnable as ``gospl-catchment`` (installed) or
``python -m gospl.analyse.catchment``::

    gospl-catchment -i inputSedFlow.csv -o flowsed

where the index CSV has two columns ``time,netcdf``::

    time,netcdf
    1,results/surface1.nc
    5,results/surface5.nc
    10,results/surface10.nc

writes ``flowsed/flow{time}.csv`` and ``flowsed/sed{time}.csv`` (columns
``basin,lon,lat,val``). In a notebook::

    from gospl.analyse.catchment import catchment_flux, basin_outflow
    out = catchment_flux("inputSedFlow.csv", "flowsed")   # batch -> CSVs
    o = basin_outflow("results/surface10.nc")             # one file
    flowdf, seddf = o["flow"], o["sed"]
"""

import os
import argparse

import numpy as np


# Variable-name aliases: the gridexport name first, the legacy mapOutputs name
# last. The first one present in the file is used (unless overridden). Water
# discharge defaults to the raw `FA`; `fillFA` (depression-filled accumulation)
# is kept as a fallback and can be selected explicitly with `--flow-var fillFA`.
_FLOW_ALIASES = ("FA", "fillFA", "flowDischarge")
_SED_ALIASES = ("sedLoad", "sedimentLoad")
_BASIN_ALIASES = ("basin", "basinID")
_LON_ALIASES = ("lon", "longitude")
_LAT_ALIASES = ("lat", "latitude")


def _resolve(present, requested, aliases, kind):
    """Pick the variable name to read: explicit request, else first alias present."""
    if requested is not None:
        if requested not in present:
            raise KeyError(
                "variable %r (%s) not in NetCDF; present: %s"
                % (requested, kind, sorted(present))
            )
        return requested
    for a in aliases:
        if a in present:
            return a
    raise KeyError(
        "no %s variable found (looked for %s) in NetCDF; present: %s"
        % (kind, list(aliases), sorted(present))
    )


def _read_grid(ncfile, flow_var, sed_var, basin_var, lon_var, lat_var):
    """
    Read the flow / sediment / basin fields and the lon-lat axes from a gridded
    NetCDF. ``ncfile`` may be a path, an open ``netCDF4.Dataset`` or an
    ``xarray.Dataset``. Returns ``(flow2d, sed2d, basin2d, lon1d, lat1d)``.
    """
    if isinstance(ncfile, (str, os.PathLike)):
        import netCDF4

        with netCDF4.Dataset(os.fspath(ncfile)) as ds:
            return _extract(ds, list(ds.variables), flow_var, sed_var,
                            basin_var, lon_var, lat_var)
    # already-open dataset: both netCDF4.Dataset and xarray.Dataset expose a
    # `.variables` mapping (xarray's includes coords) and index by name.
    return _extract(ncfile, list(ncfile.variables), flow_var, sed_var,
                    basin_var, lon_var, lat_var)


def _extract(ds, present, flow_var, sed_var, basin_var, lon_var, lat_var):
    def arr(name):
        v = ds[name]
        return np.asarray(v.values if hasattr(v, "values") else v[:])

    fv = _resolve(present, flow_var, _FLOW_ALIASES, "water discharge")
    sv = _resolve(present, sed_var, _SED_ALIASES, "sediment load")
    bv = _resolve(present, basin_var, _BASIN_ALIASES, "basin id")
    lov = _resolve(present, lon_var, _LON_ALIASES, "longitude")
    lav = _resolve(present, lat_var, _LAT_ALIASES, "latitude")
    return (arr(fv).astype(np.float64), arr(sv).astype(np.float64),
            arr(bv), arr(lov).astype(np.float64), arr(lav).astype(np.float64))


def _basin_argmax(basin, value, lon, lat, nb, min_cells):
    """
    For every basin id (``0..nb-1``), find the cell of MAXIMUM ``value`` and
    return its ``lon``/``lat``/``value``. Vectorised grouped arg-max: one stable
    ``lexsort`` by ``(basin, value)`` puts each basin's largest value last in its
    group, so the group boundaries give the per-basin maxima in one pass.

    Basins with at most ``min_cells`` cells are dropped (too small to be a
    meaningful catchment — matches the legacy ``len(ids) > 10`` filter). Returns
    a DataFrame ``basin,lon,lat,val`` (one row per kept basin).
    """
    import pandas as pd

    # Non-finite discharge can never be a maximum (NaN sorts last in lexsort, so
    # guard it explicitly); also lets us reject basins whose every cell is NaN.
    val = np.where(np.isfinite(value), value, -np.inf)
    order = np.lexsort((val, basin))            # basin asc, then value asc
    bs = basin[order]
    last = np.empty(bs.size, dtype=bool)
    last[-1] = True
    last[:-1] = bs[1:] != bs[:-1]               # last cell of each basin = its max
    sel = order[last]
    sb = basin[sel]

    counts = np.bincount(basin, minlength=nb)
    keep = (counts[sb] > min_cells) & np.isfinite(val[sel])
    sel, sb = sel[keep], sb[keep]

    df = pd.DataFrame({
        "basin": sb.astype(np.int64),
        "lon": lon[sel],
        "lat": lat[sel],
        "val": value[sel],
    })
    return df.sort_values("basin").reset_index(drop=True)


[docs] def basin_outflow(ncfile, min_cells=10, flow_var=None, sed_var=None, basin_var=None, lon_var=None, lat_var=None): """ Per-basin outflow points for one gridded NetCDF. :arg ncfile: path (or open ``netCDF4``/``xarray`` dataset) of a gridded surface — typically a ``gospl-grid`` ``surface*.nc``. :arg min_cells: basins with at most this many cells are ignored (default 10). :arg flow_var, sed_var, basin_var, lon_var, lat_var: override the variable names (defaults auto-detect the gridexport names, then the legacy ones). :return: ``{"flow": DataFrame, "sed": DataFrame}`` — each ``basin,lon,lat,val`` (``val`` in m³/yr), one row per basin, the cell where that basin's water discharge (``flow``) / sediment load (``sed``) is largest. """ flow, sed, basin, lon, lat = _read_grid( ncfile, flow_var, sed_var, basin_var, lon_var, lat_var) mlon, mlat = np.meshgrid(lon, lat) # (nlat, nlon), matches the fields basin = basin.ravel() flow = flow.ravel() sed = sed.ravel() lonf = mlon.ravel() latf = mlat.ravel() # Subaerial cells carrying a basin id (marine / outside = -1 or NaN fill). bint = np.where(np.isfinite(basin.astype(np.float64)), basin, -1).astype(np.int64) sub = bint >= 0 b = bint[sub] if b.size == 0: import pandas as pd empty = pd.DataFrame(columns=["basin", "lon", "lat", "val"]) return {"flow": empty, "sed": empty.copy()} nb = int(b.max()) + 1 flowdf = _basin_argmax(b, flow[sub], lonf[sub], latf[sub], nb, min_cells) seddf = _basin_argmax(b, sed[sub], lonf[sub], latf[sub], nb, min_cells) return {"flow": flowdf, "sed": seddf}
[docs] def catchment_flux(index, outdir=None, min_cells=10, verbose=True, flow_var=None, sed_var=None, basin_var=None, lon_var=None, lat_var=None): """ Batch :func:`basin_outflow` over a time series of gridded NetCDFs. :arg index: a CSV path with columns ``time,netcdf`` (paths relative to the CSV's directory are resolved), a ``pandas.DataFrame`` with those columns, or an iterable of ``(time, ncfile)`` pairs. :arg outdir: if given, write ``outdir/flow{time}.csv`` and ``outdir/sed{time}.csv`` (columns ``basin,lon,lat,val``). :return: ``{time: {"flow": DataFrame, "sed": DataFrame}}``. """ import pandas as pd from time import process_time base = "" if isinstance(index, (str, os.PathLike)): base = os.path.dirname(os.path.abspath(os.fspath(index))) rows = list(pd.read_csv(index)[["time", "netcdf"]].itertuples( index=False, name=None)) elif isinstance(index, pd.DataFrame): rows = list(index[["time", "netcdf"]].itertuples(index=False, name=None)) else: rows = list(index) if outdir is not None: os.makedirs(outdir, exist_ok=True) results = {} for time, ncfile in rows: t0 = process_time() path = os.fspath(ncfile) if base and not os.path.isabs(path) and not os.path.exists(path): cand = os.path.join(base, path) if os.path.exists(cand): path = cand if verbose: print("\nOpen output", path, flush=True) out = basin_outflow(path, min_cells=min_cells, flow_var=flow_var, sed_var=sed_var, basin_var=basin_var, lon_var=lon_var, lat_var=lat_var) results[time] = out if outdir is not None: step = int(time) if float(time).is_integer() else time out["flow"].to_csv(os.path.join(outdir, "flow%s.csv" % step), index=False) out["sed"].to_csv(os.path.join(outdir, "sed%s.csv" % step), index=False) if verbose: print(" + %d flow / %d sediment outlets (%.2f s)" % (len(out["flow"]), len(out["sed"]), process_time() - t0), flush=True) return results
# --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def main(argv=None): p = argparse.ArgumentParser( description="Extract per-basin outflow (max water-discharge and " "max sediment-load cell) from gridded goSPL NetCDF output.", add_help=True, ) p.add_argument("-i", "--input", required=True, help="index CSV with columns time,netcdf") p.add_argument("-o", "--output", required=True, help="output folder for flow{t}.csv / sed{t}.csv") p.add_argument("--min-cells", type=int, default=10, help="ignore basins with <= this many cells (default 10)") p.add_argument("--flow-var", default=None, help="water-discharge variable (default: FA; e.g. fillFA for " "through-lake discharge; else flowDischarge)") p.add_argument("--sed-var", default=None, help="sediment-load variable (default: sedLoad, else sedimentLoad)") p.add_argument("--basin-var", default=None, help="basin-id variable (default: basin, else basinID)") p.add_argument("-q", "--quiet", action="store_true", help="no per-file log") args = p.parse_args(argv) catchment_flux(args.input, args.output, min_cells=args.min_cells, verbose=not args.quiet, flow_var=args.flow_var, sed_var=args.sed_var, basin_var=args.basin_var) return 0 if __name__ == "__main__": raise SystemExit(main())