Source code for analyse.provenance

"""
Sediment provenance / source-to-sink attribution from goSPL output.

Standalone post-processor (Approach A in ``docs/DESIGN_PROVENANCE.md``): given a
per-vertex **source class** (from user source-rock polygons) and a goSPL run, it
tracks the eroded material down the per-step flow network and accumulates, in
each sink, the deposited volume attributable to each source rock type — per
basin and per pixel (mesh node) — plus the mean transport distance and an
optional copper-fertility layer.

The core (:class:`ProvenanceTracker`) is pure NumPy and operates on arrays, so it
runs without GIS / HDF5 dependencies; the optional I/O helpers
(:func:`classes_from_shapefile`, :func:`tracker_from_gospl`) import ``geopandas``
/ ``h5py`` lazily.

Method (deterministic continuum of particle tracking, mass-conserving per step):
for each output step the net ``erodep`` change gives the eroded volume (sources)
and deposited volume (sinks); the eroded material — recycled from the local
deposited pile, plus fresh bedrock of the node's source class — is routed
downstream and laid down where the model deposits, carrying its provenance and
travelled distance. Routing is **multiple-flow-direction** by default (flux
splits across all lower neighbours by slope, so a source can feed several
basins), with single steepest descent available (``routing='sfd'``).

Caveats (see the design doc): it sees the **net** erodep per *output* step (not
the compute dt), re-derives the routing from the saved elevation, and is a
reconstruction — exact volumes need the in-model tracer (Approach B). Provenance
is not prospectivity: the Cu layer is a fertility-weighted detritus proxy,
upstream of the ore-forming process.

Inputs (all per-vertex arrays are in the **global input-mesh** ordering — the
``npdata`` mesh ``v``; build per-vertex labels with
:func:`classes_from_shapefile` and reassemble model fields with
:class:`GosplOutput`):

==================  ========  ========  ===================================================
input               shape     required  meaning / how to obtain
==================  ========  ========  ===================================================
``coords``          (N, 2|3)  yes       global mesh vertex coordinates (mesh ``v``)
``cells``           (M, 3)    yes*      triangular cells (mesh ``c``); *or* pass ``neighbours``
``source_class``    (N,)      yes       integer source-rock class per vertex, in
                                        ``[0, n_classes)`` — point-in-polygon of the source
                                        shapefiles (assign an "other"/background class to
                                        vertices outside every polygon; do **not** use -1)
``n_classes``       scalar    yes       number of source classes
``erodep``/``elev`` (N,)      yes       per step, fed to :meth:`step` (from ``GosplOutput``)
``area``            (N,)      no        Voronoi cell area (m²); defaults to 1. Provide it for
                                        correct volumes on irregular meshes (ratios/%, which
                                        are area-independent, are unaffected)
``basin_id``        (N,)      no        sink-basin label per vertex, ``-1`` = not in a sink.
                                        **Optional** — needed *only* for the per-basin %
                                        roll-up (:meth:`basin_percentages`); per-pixel
                                        provenance is produced everywhere regardless
``cu_weight``       (C,)      no        copper fertility per source class; needed only for
                                        :meth:`cu_fraction`
==================  ========  ========  ===================================================
"""

import numpy as np


[docs] def build_neighbours(cells, npoints): """ Build the vertex adjacency (CSR ``indptr``/``indices``) from triangular mesh cells — the undirected edge graph used for steepest-descent routing. :arg cells: (ntri, 3) integer vertex indices. :arg npoints: number of vertices. :return: (indptr, indices) CSR adjacency arrays. """ cells = np.asarray(cells) e = np.vstack( [cells[:, [0, 1]], cells[:, [1, 2]], cells[:, [2, 0]]] ) e = np.vstack([e, e[:, ::-1]]) # undirected e = np.unique(e, axis=0) counts = np.bincount(e[:, 0], minlength=npoints) indptr = np.zeros(npoints + 1, dtype=np.int64) indptr[1:] = np.cumsum(counts) indices = e[:, 1].astype(np.int64) # e sorted by col 0 via unique return indptr, indices
[docs] def steepest_receivers(elev, indptr, indices): """ Single-flow-direction (SFD) receivers: each node points to its lowest neighbour strictly below it, or to itself (a sink / outlet) if none is lower. :return: integer receiver array (length npoints). """ n = len(elev) rcv = np.arange(n, dtype=np.int64) for i in range(n): lo = elev[i] best = i for k in range(indptr[i], indptr[i + 1]): j = indices[k] if elev[j] < lo: lo = elev[j] best = j rcv[i] = best return rcv
[docs] def downhill_edges(elev, coords, indptr, indices, routing="mfd", exponent=1.0): """ Build the weighted downhill-flow graph as CSR edges ``(e_indptr, e_idx, e_weight, e_seg)``: for each node, the receivers it routes to, the fraction of its flux each takes, and the edge length. - ``routing='sfd'``: a single edge to the steepest-descent neighbour (weight 1); empty for sinks/outlets. - ``routing='mfd'``: an edge to **every** lower neighbour, weight :math:`\\propto \\text{slope}^{exponent}` and normalised to sum to one (multiple-flow-direction, as goSPL routes water/sediment). ``exponent`` is the flow-partition exponent (1 ≈ uniform-to-slope spreading). """ n = len(elev) elev = np.asarray(elev, dtype=np.float64) coords = np.asarray(coords, dtype=np.float64) # All adjacency edges as (src -> dst), vectorised (indices is CSR-grouped by # source, so `src` is the run-length expansion of the node ids). src = np.repeat(np.arange(n), np.diff(indptr)) dst = indices dz = elev[src] - elev[dst] down = dz > 0.0 # keep downhill edges only src, dst, dz = src[down], dst[down], dz[down] seg = np.linalg.norm(coords[dst] - coords[src], axis=1) slope = np.divide(dz, seg, out=dz.copy(), where=seg > 0.0) w = slope ** exponent if routing == "sfd": # One edge per source: the steepest. Edges are grouped by source; pick # the max-slope edge in each group. order = np.lexsort((-slope, src)) s_o = src[order] first = np.empty(len(s_o), dtype=bool) first[0] = True if len(s_o) else False first[1:] = s_o[1:] != s_o[:-1] keep = order[first] e_src, e_idx, e_seg = src[keep], dst[keep], seg[keep] e_w = np.ones(len(keep)) else: # MFD: normalise weights per source to sum to one. wsum = np.bincount(src, weights=w, minlength=n) e_src, e_idx, e_seg = src, dst, seg e_w = w / wsum[src] # CSR indptr from per-source edge counts (edges stay grouped by source). counts = np.bincount(e_src, minlength=n) e_indptr = np.zeros(n + 1, dtype=np.int64) e_indptr[1:] = np.cumsum(counts) return ( e_indptr, np.asarray(e_idx, dtype=np.int64), np.asarray(e_w, dtype=np.float64), np.asarray(e_seg, dtype=np.float64), )
def _sweep_impl(order, e_indptr, e_idx, e_w, e_seg, eroded, D): """ Topological routing/deposition sweep (high → low): route the per-class eroded flux downstream, depositing the model's volume ``D`` at each node with the passing composition, and accumulate travel distance. Pure-arithmetic and Numba-``njit``-compatible (no Python objects), so the same source serves as both the reference and the compiled fast path. Returns the per-step increments ``(dep, dep_dist, dep_vol, exported)``. """ n, nc = eroded.shape flux = eroded.copy() flux_dist = np.zeros(n) dep = np.zeros((n, nc)) dep_dist = np.zeros(n) dep_vol = np.zeros(n) exported = np.zeros(nc) for idx in range(n): i = order[idx] tot = 0.0 for c in range(nc): tot += flux[i, c] if tot <= 0.0: continue d = D[i] if D[i] < tot else tot # deposit min(D, flux) if d > 0.0: mean_here = flux_dist[i] / tot for c in range(nc): inc = flux[i, c] / tot * d dep[i, c] += inc flux[i, c] -= inc dep_dist[i] += mean_here * d dep_vol[i] += d flux_dist[i] -= mean_here * d tot -= d a = e_indptr[i] b = e_indptr[i + 1] if a == b: # outlet / sink: export for c in range(nc): exported[c] += flux[i, c] continue fd_i = flux_dist[i] for k in range(a, b): j = e_idx[k] w = e_w[k] for c in range(nc): flux[j, c] += w * flux[i, c] flux_dist[j] += w * (fd_i + tot * e_seg[k]) for c in range(nc): flux[i, c] = 0.0 flux_dist[i] = 0.0 return dep, dep_dist, dep_vol, exported def _get_sweep(method): """Resolve the sweep kernel: Numba-compiled when available/requested, else the pure-Python reference.""" if method == "python": return _sweep_impl try: import numba except ImportError: if method == "numba": raise ImportError("method='numba' needs numba (`pip install numba`)") return _sweep_impl # 'auto' fallback if not hasattr(_get_sweep, "_njit"): _get_sweep._njit = numba.njit(cache=True)(_sweep_impl) return _get_sweep._njit
[docs] class ProvenanceTracker: """ Accumulate source-to-sink sediment provenance over a goSPL run. Feed cumulative ``erodep`` snapshots in time order with :meth:`step`; read the attribution with :meth:`basin_percentages`, :meth:`pixel_fractions`, :meth:`mean_distance` and :meth:`cu_fraction`. """
[docs] def __init__( self, coords, source_class, n_classes, area=None, cells=None, neighbours=None, basin_id=None, cu_weight=None, routing="mfd", flow_exp=1.0, method="auto", ): """ **Required** :arg coords: (npoints, 2|3) global-mesh vertex coordinates (mesh ``v``); used for travel distance and to define the vertex ordering all the other per-vertex arrays must follow. :arg source_class: (npoints,) integer source-rock class per vertex, each value in ``[0, n_classes)``. Obtain by point-in-polygon of the source shapefiles (:func:`classes_from_shapefile`); give vertices outside every polygon a dedicated "other"/background class — **not** -1. :arg n_classes: number of source classes. **One of** ``cells`` / ``neighbours`` (the routing connectivity) :arg cells: (ntri, 3) triangular mesh cells (mesh ``c``) — adjacency is built from these. :arg neighbours: pre-built ``(indptr, indices)`` CSR adjacency (instead of ``cells``). **Optional** :arg area: (npoints,) Voronoi cell area (m²); defaults to 1 (volume == thickness). Provide it for correct volumes on irregular meshes; percentages/ratios are area-independent and unaffected. :arg basin_id: (npoints,) sink-basin label per vertex, ``-1`` = not in a sink basin. **Optional** — needed *only* for :meth:`basin_percentages` (the per-basin roll-up). Per-pixel provenance (:meth:`pixel_fractions`) is produced at every depositing node regardless. :arg cu_weight: (n_classes,) copper fertility per source class; needed only for :meth:`cu_fraction`. :arg routing: ``'mfd'`` (default, multiple-flow-direction — flux splits across all lower neighbours by slope, so a source can feed several basins) or ``'sfd'`` (single steepest descent). :arg flow_exp: MFD flow-partition exponent (slope power for the weights). :arg method: routing-sweep backend — ``'auto'`` (default: the Numba-compiled sweep when ``numba`` is installed, else pure Python), ``'numba'`` (require Numba), or ``'python'`` (reference). All give **identical** results (same topological sweep, including the ``min(D, flux)`` deposition cap). ``numba`` removes the Python per-node loop — the dominant cost on large meshes — for a several-fold (and growing with N) per-step speedup; the vectorised edge build / erosion is shared, so the overall gain is modest on small meshes. (A sparse transport-with-loss solve was evaluated and rejected — slower at realistic sizes due to LU fill-in, and only exact when no node is sediment-undersupplied; see ``docs/DESIGN_PROVENANCE.md``.) """ self.coords = np.asarray(coords, dtype=np.float64) self.npoints = self.coords.shape[0] self.source_class = np.asarray(source_class, dtype=np.int64) self.n_classes = int(n_classes) if self.source_class.shape != (self.npoints,): raise ValueError( "source_class must have one value per vertex (%d), got %s" % (self.npoints, self.source_class.shape) ) if self.source_class.min() < 0 or self.source_class.max() >= self.n_classes: raise ValueError( "source_class values must lie in [0, n_classes); assign a " "background/'other' class to vertices outside every source " "polygon (do not use -1)." ) self.area = ( np.ones(self.npoints) if area is None else np.asarray(area, dtype=np.float64) ) if neighbours is not None: self.indptr, self.indices = neighbours elif cells is not None: self.indptr, self.indices = build_neighbours(cells, self.npoints) else: raise ValueError("provide either `cells` or `neighbours`") self.basin_id = None if basin_id is None else np.asarray(basin_id, dtype=np.int64) self.cu_weight = None if cu_weight is None else np.asarray(cu_weight, dtype=np.float64) if routing not in ("mfd", "sfd"): raise ValueError("routing must be 'mfd' or 'sfd'") if method not in ("auto", "numba", "python"): raise ValueError("method must be 'auto', 'numba' or 'python'") self.routing = routing self.flow_exp = float(flow_exp) self.method = method self._sweep = _get_sweep(method) # State. self.pile = np.zeros((self.npoints, self.n_classes)) # stored deposit comp. self.dep = np.zeros((self.npoints, self.n_classes)) # cumulative deposited self._dep_dist = np.zeros(self.npoints) # Σ dist·vol deposited self._dep_vol = np.zeros(self.npoints) # Σ vol deposited self.exported = np.zeros(self.n_classes) # left via outlets self._erodep_prev = None
[docs] def step(self, elev, erodep): """ Process one output step. :arg elev: (npoints,) surface elevation at this step (filled elevation preferred — ``fillFA``'s surface — so routing has no spurious pits). :arg erodep: (npoints,) cumulative erosion-deposition at this step. """ erodep = np.asarray(erodep, dtype=np.float64) if self._erodep_prev is None: self._erodep_prev = erodep.copy() return vol = (erodep - self._erodep_prev) * self.area # +dep / -ero (m^3) self._erodep_prev = erodep.copy() # 1. Erosion sources: recycle the local pile first, then fresh bedrock. E = np.maximum(-vol, 0.0) pile_tot = self.pile.sum(axis=1) take = np.minimum(E, pile_tot) rfrac = np.divide(take, pile_tot, out=np.zeros_like(take), where=pile_tot > 0) recycled = self.pile * rfrac[:, None] self.pile -= recycled eroded = recycled bed = E - take np.add.at(eroded, (np.arange(self.npoints), self.source_class), bed) # 2. Route downstream and 3. deposit where the model does. elev = np.asarray(elev, dtype=np.float64) e_indptr, e_idx, e_w, e_seg = downhill_edges( elev, self.coords, self.indptr, self.indices, self.routing, self.flow_exp ) D = np.maximum(vol, 0.0) # deposition (m^3) order = np.argsort(elev)[::-1].astype(np.int64) # high -> low dep_inc, dist_inc, vol_inc, exp_inc = self._sweep( order, e_indptr, e_idx, e_w, e_seg, eroded, D ) self.dep += dep_inc self.pile += dep_inc # deposited -> local pile self._dep_dist += dist_inc self._dep_vol += vol_inc self.exported += exp_inc
# ----- results -----
[docs] def pixel_fractions(self): """Per-node deposited provenance fractions (npoints, n_classes); rows that received no deposit are all-zero.""" tot = self.dep.sum(axis=1, keepdims=True) return np.divide(self.dep, tot, out=np.zeros_like(self.dep), where=tot > 0)
[docs] def mean_distance(self): """Per-node flux-weighted mean source->deposition transport distance.""" return np.divide( self._dep_dist, self._dep_vol, out=np.full(self.npoints, np.nan), where=self._dep_vol > 0, )
[docs] def basin_percentages(self): """ Dict ``{basin_id: array(n_classes)}`` of the % contribution of each source class to each sink basin. Requires ``basin_id``. """ if self.basin_id is None: raise ValueError("basin_id was not provided") out = {} for b in np.unique(self.basin_id): if b < 0: continue v = self.dep[self.basin_id == b].sum(axis=0) s = v.sum() out[int(b)] = (100.0 * v / s) if s > 0 else np.zeros(self.n_classes) return out
[docs] def cu_fraction(self): """Per-node Cu-sourced fraction of the deposit (needs ``cu_weight``).""" if self.cu_weight is None: raise ValueError("cu_weight was not provided") tot = self.dep.sum(axis=1) cu = self.dep @ self.cu_weight return np.divide(cu, tot, out=np.zeros_like(cu), where=tot > 0)
# --------------------------------------------------------------------------- # # Optional I/O (lazy GIS / HDF5 imports) # --------------------------------------------------------------------------- #
[docs] def classes_from_shapefile( coords, shapefile, attribute, background="other", crs=None ): """ Assign each vertex an integer class by point-in-polygon against a source-rock (or sink-basin) polygon shapefile. The result is directly usable as the tracker's ``source_class``: every vertex gets a class in ``[0, n_classes)``, with vertices outside every polygon assigned a **background class** (so there are no ``-1`` sentinels). Lazily imports ``geopandas``/``shapely``. :arg coords: (npoints, 2|3) vertex coordinates in the shapefile's CRS (only the first two columns are used). :arg shapefile: path to a polygon shapefile. :arg attribute: polygon attribute holding the class label. :arg background: label for vertices outside every polygon (appended as its own class). Set to ``None`` to instead return ``-1`` for those vertices (e.g. when building a ``basin_id``, where -1 means "not a sink"). :arg crs: optional CRS to reproject the polygons to before testing. :return: ``(class_array, {label: int} mapping)``. For ``source_class`` use the array as-is; for ``basin_id`` pass ``background=None``. """ try: import geopandas as gpd from shapely.geometry import Point except ImportError as exc: # pragma: no cover - optional dependency raise ImportError( "classes_from_shapefile needs geopandas + shapely " "(`pip install geopandas`)" ) from exc gdf = gpd.read_file(shapefile) if crs is not None: gdf = gdf.to_crs(crs) pts = gpd.GeoDataFrame( geometry=[Point(xy) for xy in coords[:, :2]], crs=gdf.crs ) joined = gpd.sjoin(pts, gdf[[attribute, "geometry"]], how="left", predicate="within") joined = joined[~joined.index.duplicated(keep="first")] labels = joined[attribute].to_numpy() uniq = [u for u in np.unique(labels[~_isnull(labels)])] mapping = {u: i for i, u in enumerate(uniq)} out = np.full(len(coords), -1, dtype=np.int64) for i, lab in enumerate(labels): if not _isnull(lab): out[i] = mapping[lab] if background is not None and (out < 0).any(): bg = len(mapping) mapping[background] = bg out[out < 0] = bg return out, mapping
def _isnull(v): try: return bool(np.isnan(v)) except (TypeError, ValueError): return v is None
[docs] class GosplOutput: """ Reader that reassembles **global** per-vertex fields from goSPL's *partitioned* HDF5 output. goSPL writes one file per MPI partition per step (``<base>.<step>.p<rank>.h5``) holding only that rank's local nodes, with the partition meshes in ``topology.p<rank>.h5``. Provenance routing is global, so this reader maps each partition's local nodes onto the **global input mesh** ordering with a KDTree — the same local↔global map goSPL builds at load time (``tree.query(lcoords)``) — and scatters each field into a global array. Pass the global mesh coordinates (the ``npdata`` mesh ``v``) so the assembled arrays line up with your per-vertex ``source_class`` / ``basin_id``. """
[docs] def __init__(self, h5dir, global_coords, file_base="gospl"): import glob import os import h5py from scipy.spatial import cKDTree self.h5dir = h5dir self.file_base = file_base self.npoints = len(global_coords) tfiles = sorted(glob.glob(os.path.join(h5dir, "topology.p*.h5"))) if not tfiles: raise FileNotFoundError("no topology.p*.h5 in %s" % h5dir) tree = cKDTree(np.asarray(global_coords)) self.ranks, self.maps = [], [] for tf in tfiles: rank = int(os.path.basename(tf).split(".p")[1].split(".h5")[0]) with h5py.File(tf, "r") as f: lc = np.asarray(f["coords"]) _, gid = tree.query(lc) # local node -> global vertex id self.ranks.append(rank) self.maps.append(gid.astype(np.int64))
[docs] def field(self, step, name): """Assemble a global per-vertex array for ``name`` at output ``step``.""" import os import h5py out = np.zeros(self.npoints) for rank, m in zip(self.ranks, self.maps): df = os.path.join( self.h5dir, "%s.%d.p%d.h5" % (self.file_base, step, rank) ) with h5py.File(df, "r") as f: v = np.asarray(f[name]) out[m] = v[:, 0] if v.ndim > 1 else v # ghost overlaps agree return out
def _spec(s): """Split a ``file.npz:key`` argument into (path, key).""" path, _, key = s.rpartition(":") if not path: raise ValueError("expected file.npz:key, got %r" % s) return path, key def main(argv=None): """ CLI: attribute deposited sediment in sink basins to source rock types over a goSPL run, and write a per-basin CSV + a per-pixel ``.npz``. The global input mesh (``--mesh-npz``, the ``npdata`` file) defines the vertex ordering; goSPL's *partitioned* output (``--h5dir`` with ``<base>.<step>.p<rank>.h5`` + ``topology.p*.h5``) is reassembled onto it by :class:`GosplOutput`. Source classes and basins are per-vertex ``.npz`` arrays in that same global ordering (rasterise your shapefiles with :func:`classes_from_shapefile`). Routing defaults to MFD. """ import argparse p = argparse.ArgumentParser(description="goSPL sediment provenance attribution") p.add_argument("--mesh-npz", required=True, type=_spec, help="global input mesh coords, as file.npz:key (e.g. mesh.npz:v)") p.add_argument("--cells", type=_spec, default=None, help="global mesh cells, as file.npz:key (e.g. mesh.npz:c)") p.add_argument("--h5dir", required=True, help="goSPL output h5/ directory (partitioned files)") p.add_argument("--file-base", default="gospl", help="output file base name") p.add_argument("--steps", required=True, help="output steps, inclusive range 'a:b' or comma list") p.add_argument("--source", required=True, type=_spec, help="per-vertex source-class array, file.npz:key (int)") p.add_argument("--n-classes", type=int, default=None) p.add_argument("--basins", type=_spec, default=None, help="per-vertex sink-basin id array file.npz:key (-1 = none)") p.add_argument("--cu-weights", default=None, help="comma-separated Cu fertility per class") p.add_argument("--routing", choices=["mfd", "sfd"], default="mfd") p.add_argument("--flow-exp", type=float, default=1.0) p.add_argument("--method", choices=["auto", "numba", "python"], default="auto", help="routing-sweep backend (numba is ~50-100x faster on large meshes)") p.add_argument("--elev-field", default="elev", help="HDF5 field used as the routing surface (e.g. waterFill)") p.add_argument("--out-prefix", required=True) args = p.parse_args(argv) mp, mk = args.mesh_npz coords = np.load(mp)[mk] cells = None if args.cells is not None: cp, ck = args.cells cells = np.load(cp)[ck] if ":" in args.steps and args.steps.count(":") == 1 and "," not in args.steps: a, b = (int(x) for x in args.steps.split(":")) steps = range(a, b + 1) else: steps = [int(x) for x in args.steps.split(",")] sp, sk = args.source source_class = np.load(sp)[sk].astype(np.int64) n_classes = args.n_classes or int(source_class.max() + 1) basin_id = None if args.basins is not None: bp, bk = args.basins basin_id = np.load(bp)[bk].astype(np.int64) cu_weight = None if args.cu_weights is not None: cu_weight = np.array([float(x) for x in args.cu_weights.split(",")]) import h5py reader = GosplOutput(args.h5dir, coords, file_base=args.file_base) t = ProvenanceTracker( coords, source_class, n_classes, cells=cells, basin_id=basin_id, cu_weight=cu_weight, routing=args.routing, flow_exp=args.flow_exp, method=args.method, ) # Per-step output: one HDF5 holding the cumulative provenance state after # each step (so the evolution is saved and can be viewed in ParaView), and a # per-basin time series CSV. steps = list(steps) basin_rows = [] with h5py.File(args.out_prefix + ".h5", "w") as h: g = h.create_group("mesh") g["coords"] = coords if cells is not None: g["cells"] = cells h["steps"] = np.asarray(steps) for s in steps: t.step(reader.field(s, args.elev_field), reader.field(s, "erodep")) grp = h.create_group("step_%d" % s) frac = t.pixel_fractions().astype("float32") grp.create_dataset("fractions", data=frac, compression="gzip") grp.create_dataset( "dominant", data=np.where(frac.sum(1) > 0, frac.argmax(1), -1).astype("int32"), compression="gzip", ) grp.create_dataset( "distance", data=t.mean_distance().astype("float32"), compression="gzip" ) if cu_weight is not None: grp.create_dataset( "cu_fraction", data=t.cu_fraction().astype("float32"), compression="gzip", ) if basin_id is not None: for b, pct in t.basin_percentages().items(): basin_rows.append((s, b, pct)) print("wrote %s.h5 (%d steps)" % (args.out_prefix, len(steps))) if cells is not None: write_xdmf( args.out_prefix, len(coords), len(cells), coords.shape[1], n_classes, steps, cu_weight is not None, ) print("wrote %s.xdmf (open in ParaView)" % args.out_prefix) if basin_id is not None: with open(args.out_prefix + "_basins.csv", "w") as fh: fh.write("step,basin," + ",".join("class%d" % c for c in range(n_classes)) + "\n") for s, b, pct in basin_rows: fh.write("%d,%d,%s\n" % (s, b, ",".join("%.3f" % v for v in pct))) print("wrote %s_basins.csv (per-step basin %%)" % args.out_prefix) return 0
[docs] def write_xdmf(prefix, npoints, ncells, geom_dim, n_classes, steps, has_cu): """ Write an XDMF temporal-collection wrapper for the provenance HDF5 so the per-step results open as a time series in ParaView (it references the ``<prefix>.h5`` datasets — keep the two files side by side). Exposes, per step: ``dominant`` source (scalar), ``distance``, optional ``cu_fraction``, and each source's contribution ``frac_class<c>`` (a HyperSlab column of the ``fractions`` array). """ import os base = os.path.basename(prefix) + ".h5" geom = "XYZ" if geom_dim == 3 else "XY" def _scalar(name, path, dtype="Float", prec=4): return ( ' <Attribute Name="%s" AttributeType="Scalar" Center="Node">\n' ' <DataItem Format="HDF" NumberType="%s" Precision="%d" ' 'Dimensions="%d 1">%s:%s</DataItem>\n' ' </Attribute>\n' % (name, dtype, prec, npoints, base, path) ) def _frac_col(c, sp): # HyperSlab selecting column c of the (npoints, n_classes) fractions. return ( ' <Attribute Name="frac_class%d" AttributeType="Scalar" Center="Node">\n' ' <DataItem ItemType="HyperSlab" Dimensions="%d 1">\n' ' <DataItem Dimensions="3 2" Format="XML">0 %d 1 1 %d 1</DataItem>\n' ' <DataItem Format="HDF" NumberType="Float" Precision="4" ' 'Dimensions="%d %d">%s:/step_%d/fractions</DataItem>\n' ' </DataItem>\n' ' </Attribute>\n' % (c, npoints, c, npoints, npoints, n_classes, base, sp) ) lines = [ '<?xml version="1.0" ?>\n<!DOCTYPE Xdmf SYSTEM "Xdmf.dtd" []>\n', '<Xdmf Version="2.0">\n <Domain>\n', ' <Grid Name="Provenance" GridType="Collection" CollectionType="Temporal">\n', ] for s in steps: lines.append(' <Grid Name="step_%d" GridType="Uniform">\n' % s) lines.append(' <Time Value="%d"/>\n' % s) lines.append( ' <Topology TopologyType="Triangle" NumberOfElements="%d">\n' ' <DataItem Format="HDF" DataType="Int" Dimensions="%d 3">%s:/mesh/cells</DataItem>\n' ' </Topology>\n' % (ncells, ncells, base) ) lines.append( ' <Geometry GeometryType="%s">\n' ' <DataItem Format="HDF" NumberType="Float" Precision="8" ' 'Dimensions="%d %d">%s:/mesh/coords</DataItem>\n' ' </Geometry>\n' % (geom, npoints, geom_dim, base) ) lines.append(_scalar("dominant", "/step_%d/dominant" % s, dtype="Int")) lines.append(_scalar("distance", "/step_%d/distance" % s)) if has_cu: lines.append(_scalar("cu_fraction", "/step_%d/cu_fraction" % s)) for c in range(n_classes): lines.append(_frac_col(c, s)) lines.append(' </Grid>\n') lines.append(' </Grid>\n </Domain>\n</Xdmf>\n') with open(prefix + ".xdmf", "w") as f: f.writelines(lines)
if __name__ == "__main__": raise SystemExit(main())