Source code for sed.sedplex

import os
import gc
import sys
import petsc4py
import numpy as np
import numpy_indexed as npi

from mpi4py import MPI
from time import process_time

if "READTHEDOCS" not in os.environ:
    from gospl._fortran import setmaxnb

MPIrank = petsc4py.PETSc.COMM_WORLD.Get_rank()


[docs] class SEDMesh(object): """ This class encapsulates all the functions related to sediment transport and deposition in the continental domain. """ def __init__(self, *args, **kwargs): """ The initialisation of `SEDMesh` class consists in the declaration of several PETSc vectors. """ # Petsc vectors self.tmp = self.hGlobal.duplicate() self.tmpL = self.hLocal.duplicate() self.tmp1 = self.hGlobal.duplicate() self.Qs = self.hGlobal.duplicate() self.QsL = self.hLocal.duplicate() self.nQs = self.hLocal.duplicate() self.vSed = self.hGlobal.duplicate() self.vSedLocal = self.hLocal.duplicate() # Get the maximum number of neighbours on the mesh maxnb = np.zeros(1, dtype=np.int64) maxnb[0] = setmaxnb(self.lpoints) MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, maxnb, op=MPI.MAX) self.maxnb = maxnb[0] return
[docs] def _getSedFlux(self): """ This function computes sediment flux in cubic metres per year from incoming rivers. Like for the computation of the flow discharge and erosion rates, the sediment flux is solved by an implicit time integration method, the matrix system is the one obtained from the receiver distributions over the unfilled elevation mesh for the flow discharge (`fMat`). The PETSc *scalable linear equations solvers* (**KSP**) is used here again with an iterative method obtained from PETSc Richardson solver (`richardson`) with block Jacobian preconditioning (`bjacobi`). """ t0 = process_time() # Stratigraphic layers exist if self.stratNb > 0: # thCoarse is already an erosion-positive rate (m/yr) from # stratplex.erodeStrat; use as-is. self.tmpL.setArray(self.thCoarse) self.dm.localToGlobal(self.tmpL, self.tmp) else: # Eb is in the thickness-rate convention (positive deposition, # negative incision); the upstream-integration solve below # wants an erosion-positive source so that vSed (m^3/yr) # accumulates as positive sediment flux downstream. Negate. self.Eb.copy(result=self.tmp) self.tmp.scale(-1.0) # Get the volume of sediment transported in m3/yr self.tmp.pointwiseMult(self.tmp, self.areaGlobal) self._solve_KSP(False, self.fMati, self.tmp, self.vSed) self.fMati.destroy() # Update local vector self.dm.globalToLocal(self.vSed, self.vSedLocal) if MPIrank == 0 and self.verbose: print( "Update Sediment Load (%0.02f seconds)" % (process_time() - t0), flush=True, ) petsc4py.PETSc.garbage_cleanup() return
[docs] def _moveDownstream(self, vSed, step): """ In cases where river sediment fluxes drain into depressions, they might fill the sink completely and overspill or be deposited in it. This function computes the excess of sediment (if any) able to flow dowstream. .. important:: The excess sediment volume is then added to the downstream sediment flux (`vSed`). :arg vSed: excess sediment volume array :arg step: downstream distribution step :return: (excess, sedFilled_changed) where excess is True when sediment still needs to be redistributed, and sedFilled_changed is True when at least one pit saturated this iteration (signalling that the flow direction matrix should be rebuilt next call). """ excess = False sedFilled_changed = False # Remove points belonging to other processors vSed = np.multiply(vSed, self.inIDs) # Get volume incoming in each depression grp = npi.group_by(self.pitIDs[self.lsink]) uID = grp.unique _, vol = grp.sum(vSed[self.lsink]) inV = np.zeros(len(self.pitParams), dtype=np.float64) ids = uID > -1 inV[uID[ids]] = vol[ids] # Combine incoming volume globally MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, inV, op=MPI.SUM) # Get excess volume to distribute downstream eV = inV - self.pitVol if (eV > 0.0).any(): eIDs = eV > 0.0 self.pitVol[eIDs] = 0.0 spillIDs = self.pitInfo[eIDs, 0] localSpill = np.where(self.pitInfo[eIDs, 1] == MPIrank)[0] localPts = spillIDs[localSpill] nvSed = np.zeros(self.lpoints, dtype=np.float64) nvSed[localPts] = eV[eIDs][localSpill] ids = np.isin(self.pitIDs, np.where(eV > 0.0)[0]) self.sedFilled[ids] = self.lFill[ids] sedFilled_changed = True # Update unfilled depressions volumes eIDs = eV < 0 self.pitVol[eIDs] -= inV[eIDs] self.pitVol[self.pitVol < 0] = 0.0 # In case there is still remaining sediment flux to distribute downstream if (eV > 1.0e-3).any(): # TODO-REFACTOR: value matches DEPOSIT_FLOOR but distinct role (sediment-routing convergence threshold); do not replace # Only rebuild the flow direction matrix when the topography has # actually changed (a pit saturated this iteration), or on the # very first iteration when no matrix exists yet, or on the # special step==100 fallback that switches to lFill. Otherwise # reuse the matrix built on the previous iteration. need_rebuild = ( sedFilled_changed or step == 0 or step == 100 ) if need_rebuild: if step > 0 and self.fMat is not None: self.fMat.destroy() if step == 100: self._buildFlowDirection(self.lFill) else: self._buildFlowDirection(self.sedFilled) self.tmpL.setArray(nvSed) self.dm.localToGlobal(self.tmpL, self.tmp) if self.tmp.sum() > 0.5 * self.maxarea[0]: excess = True self._solve_KSP(True, self.fMat, self.tmp, self.tmp1) self.dm.globalToLocal(self.tmp1, self.tmpL) # Note: matrix lifecycle moved to _distributeSediment so we don't # destroy/rebuild on every iteration when the topography is stable. return excess, sedFilled_changed
[docs] def _distributeSediment(self, hl): """ This function finds the continental sediment volumes to distribute downstream (down to the ocean or sinks) for a given iteration. :arg hl: local elevation prior deposition """ t0 = process_time() # Define the volume to distribute self.vSedLocal.copy(result=self.QsL) self.dm.localToGlobal(self.QsL, self.Qs) # Get the volumetric sediment rate (m3/yr) to distribute during the time step and convert it in volume (m3) vSed = self.QsL.getArray().copy() * self.dt # Drop any matrix from the prior context; _moveDownstream will rebuild # on iteration 0 and only rebuild thereafter when sedFilled changes. self.fMat.destroy() self.fMat = None step = 0 excess = True max_iters = 5000 while excess: if step >= max_iters: if MPIrank == 0: print( "Continental sediment routing did not converge after " "%d iterations; continuing." % max_iters, flush=True, ) break t1 = process_time() excess, _ = self._moveDownstream(vSed, step) if excess: vSed = self.tmpL.getArray().copy() nvSed = vSed.copy() nvSed[hl < self.sedFilled] = 0.0 self.tmpL.setArray(nvSed / self.dt) vSed[self.idBorders] = 0.0 self.vSedLocal.axpy(1.0, self.tmpL) if MPIrank == 0 and self.verbose: print( "Downstream sediment flux computation step %d (%0.02f seconds)" % (step, process_time() - t1), flush=True, ) step += 1 # Final cleanup of the cached flow matrix if self.fMat is not None: self.fMat.destroy() self.fMat = None self.dm.localToGlobal(self.vSedLocal, self.vSed) if MPIrank == 0 and self.verbose: print( "Distribute Continental Sediments (%0.02f seconds)" % (process_time() - t0), flush=True, ) return
def _bottomUpDelta(self, hl, depo, pit_select): """ Compute per-node deposit thickness for the pits flagged in `pit_select` using the bottom-up rule: lake surface stays horizontal at the level matching the deposited volume. :arg hl: local elevation prior deposition :arg depo: per-pit deposited volume (m^3) :arg pit_select: boolean array (num_pits) selecting which pits to fill :return: per-node delta thickness (m); zero outside selected pits """ num_pits = len(self.pitParams) target_lvl = np.full(num_pits, -np.inf, dtype=np.float64) active = np.where(pit_select & (depo > 0))[0] for p in active: pit_min = self.pitParams[p, 1] - self.pitParams[p, 2] vols = np.concatenate(([0.0], self.filled_vol[p])) lvls = np.concatenate(([pit_min], self.filled_lvl[p])) target_lvl[p] = np.interp(depo[p], vols, lvls) node_target_lvl = np.full(self.lpoints, -np.inf, dtype=np.float64) in_pit = self.pitIDs >= 0 node_target_lvl[in_pit] = target_lvl[self.pitIDs[in_pit]] surface = np.minimum(node_target_lvl, self.lFill) delta = np.maximum(0.0, surface - hl) # Mask out non-selected pits if active.size > 0: sel_node = np.isin(self.pitIDs, active) else: sel_node = np.zeros(self.lpoints, dtype=bool) delta[~sel_node] = 0.0 return delta def _spillCoords(self): """ Globally-known XYZ coordinates of each pit's spill point. Each rank knows its own spill points (those it owns); we publish the full table by zero-filling the others and Allreducing with SUM. :return: (num_pits, 3) numpy array of spill coordinates """ num_pits = len(self.pitParams) spill_xyz = np.zeros((num_pits, 3), dtype=np.float64) spill_idx = self.pitInfo[:, 0].astype(int) spill_owner = self.pitInfo[:, 1].astype(int) my_pits = np.where(spill_owner == MPIrank)[0] if my_pits.size > 0: valid = spill_idx[my_pits] >= 0 mp = my_pits[valid] spill_xyz[mp] = self.lcoords[spill_idx[mp]] MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, spill_xyz, op=MPI.SUM) return spill_xyz def _addPitMicroTilt(self, delta, pit_select): """ Tilt each lake's deposited surface very slightly toward its spill point so the next flow-routing step doesn't have to deal with large flat areas. The tilt is mass-conservative per pit (mean removed) and small enough (~1e-6 m/m) to be physically negligible. :arg delta: per-node deposit thickness from bottom-up fill :arg pit_select: boolean array (num_pits) selecting tilted pits :return: tilted delta (same shape, mass preserved per pit) """ num_pits = len(self.pitParams) # pit_select is globally consistent (built from pitParams / pitVol # which are MPI-reduced), so this early-exit is collective-safe. # We must NOT add an early-exit on sel_node.any() below: that mask is # local-only, so ranks with zero locally-selected cells would skip # the Allreduce inside _spillCoords and the per-pit reduce while # other ranks wait, causing a hang. if not pit_select.any(): return delta active_pit = np.where(pit_select)[0] sel_node = np.isin(self.pitIDs, active_pit) & (delta > 0) spill_xyz = self._spillCoords() # Distance from each selected node to its pit's spill in_pit = self.pitIDs >= 0 pit_safe = np.where(in_pit, self.pitIDs, 0) diff_xyz = self.lcoords - spill_xyz[pit_safe] dist = np.linalg.norm(diff_xyz, axis=1) # Per-pit mean distance over selected nodes (MPI reduced) # Build local per-pit sums and counts then reduce. local_sum = np.zeros(num_pits, dtype=np.float64) local_cnt = np.zeros(num_pits, dtype=np.float64) sel_local_owned = sel_node & (self.inIDs == 1) if sel_local_owned.any(): ids = self.pitIDs[sel_local_owned] grp = npi.group_by(ids) up, sums = grp.sum(dist[sel_local_owned]) _, cnts = grp.sum(np.ones_like(dist[sel_local_owned])) local_sum[up] = sums local_cnt[up] = cnts MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, local_sum, op=MPI.SUM) MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, local_cnt, op=MPI.SUM) with np.errstate(divide="ignore", invalid="ignore"): mean_dist = np.where( local_cnt > 0, local_sum / np.maximum(local_cnt, 1.0), 0.0 ) # Slope coefficient: ~1e-6 m/m. Lake surface is higher far from the # spill, lower near it -- gentle gradient pointing toward outlet. slope_coeff = 1.0e-6 tilt = slope_coeff * (dist - mean_dist[pit_safe]) tilt[~sel_node] = 0.0 # Final delta with tilt; clip negatives in case tilt would push the # surface below bedrock (shouldn't happen with such a small slope). tilted = delta + tilt tilted[tilted < 0] = 0.0 return tilted def _identifyPitInlets(self): """ Identify cells that receive flow from a donor outside their own pit. These are the natural inlets where upstream sediment enters a depression, used as the initial-pile location for the non-linear diffusion path in large/deep partially-filled pits. :return: boolean array (lpoints) marking inlet cells (owned cells only) """ if not hasattr(self, "rcvIDi"): return np.zeros(self.lpoints, dtype=bool) flowDir = self.rcvIDi.shape[1] src = np.repeat(np.arange(self.lpoints), flowDir) dst = self.rcvIDi.flatten().astype(int) # Drop self-loops (cells with no defined receiver) good = src != dst src = src[good] dst = dst[good] src_pit = self.pitIDs[src] dst_pit = self.pitIDs[dst] dst_in_pit = dst_pit >= 0 cross_pit = src_pit != dst_pit owned_dst = self.inIDs[dst] == 1 inlet_edges = dst_in_pit & cross_pit & owned_dst inlet_mask = np.zeros(self.lpoints, dtype=bool) if inlet_edges.any(): inlet_mask[dst[inlet_edges]] = True return inlet_mask def _diffuseLargePit(self, hl, depo, pit_select): """ Deposit sediment in large/deep partially-filled pits. The initial deposit shape is a bathymetric bottom-up fill (most of the volume distributed proportional to water depth below the target lake level) combined with a small inlet bias that seeds delta progradation. Marine-style non-linear diffusion then refines the surface, and a per-pit rescale at the end enforces exact volume conservation. :arg hl: local elevation prior deposition :arg depo: per-pit deposited volume :arg pit_select: boolean array (num_pits) selecting diffused pits :return: per-node delta thickness (m) """ num_pits = len(self.pitParams) active = np.where(pit_select & (depo > 0))[0] if active.size == 0: return np.zeros(self.lpoints, dtype=np.float64) # Build node-level mask for selected pits sel_node = np.isin(self.pitIDs, active) # Identify inlets among selected pits inlet_mask = self._identifyPitInlets() & sel_node # Per-pit inlet count (globally) inlet_count = np.zeros(num_pits, dtype=np.float64) if inlet_mask.any(): ids = self.pitIDs[inlet_mask] grp = npi.group_by(ids) up, cnts = grp.sum(np.ones_like(ids, dtype=np.float64)) inlet_count[up] = cnts MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, inlet_count, op=MPI.SUM) # Pits in the active set that have no detected inlet on any rank # (rare; can happen for orphan basins with all-flat boundaries) fall # back to the highest in-pit cell as a one-cell pile. no_inlet = active[inlet_count[active] == 0] if no_inlet.size > 0: # Batched two-step reduction so the cost is independent of # len(no_inlet). Step 1 finds the global max elevation per pit; # step 2 finds which rank held that max (lowest rank wins ties). # The previous element-wise MAX over [elev, rank, idx] was # incorrect because the rank field maxes independently of elev, # so only the highest MPIrank ever wrote, regardless of which # rank actually owned the highest cell. mpisize = MPI.COMM_WORLD.Get_size() local_top_elev = np.full(num_pits, -np.inf, dtype=np.float64) local_top_idx = np.full(num_pits, -1, dtype=np.int64) for p in no_inlet: pit_local = np.where( (self.pitIDs == p) & (self.inIDs == 1) )[0] if pit_local.size > 0: k = np.argmax(hl[pit_local]) local_top_elev[p] = hl[pit_local[k]] local_top_idx[p] = pit_local[k] # Step 1: max elevation per pit, globally global_top_elev = local_top_elev.copy() MPI.COMM_WORLD.Allreduce( MPI.IN_PLACE, global_top_elev, op=MPI.MAX ) # Step 2: each rank publishes its rank if it owns a cell whose # elevation matches the global max for that pit; sentinel mpisize # otherwise. MIN reduction picks the lowest-ranked match (stable # tie-break). winner = np.full(num_pits, mpisize, dtype=np.int64) for p in no_inlet: if ( local_top_idx[p] >= 0 and abs(local_top_elev[p] - global_top_elev[p]) < 1.0e-12 ): winner[p] = MPIrank MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, winner, op=MPI.MIN) # Only the winner rank writes the inlet mask + bumps inlet_count for p in no_inlet: if ( int(winner[p]) == MPIrank and local_top_idx[p] >= 0 ): inlet_mask[int(local_top_idx[p])] = True inlet_count[p] = 1.0 # Step 3: sync inlet_count so all ranks see the bumps MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, inlet_count, op=MPI.MAX) # Initial deposit shape: (1 - bias_frac) of each pit's volume is # distributed proportional to local water depth below the target lake # level (a bottom-up bathymetric fill), and bias_frac is concentrated # at the inlets to seed delta progradation. Starting the diffusion # solver from a near-final lake-floor geometry avoids the rim-only # blocky artefact produced by a pure inlet-point pile. The fraction # is configurable via the `nlPitInletBias` YAML key (clamped [0, 1]). bias_frac = self.nl_pit_inlet_bias # Target lake water level per active pit (same interp the bottom-up # path uses; volume -> level mapping built when the pit was labelled). target_lvl = np.full(num_pits, -np.inf, dtype=np.float64) for p in active: pit_min = self.pitParams[p, 1] - self.pitParams[p, 2] vols = np.concatenate(([0.0], self.filled_vol[p])) lvls = np.concatenate(([pit_min], self.filled_lvl[p])) target_lvl[p] = np.interp(depo[p], vols, lvls) in_pit = self.pitIDs >= 0 pit_safe = np.where(in_pit, self.pitIDs, 0) node_lvl = np.where(in_pit, target_lvl[pit_safe], -np.inf) water_depth = np.maximum(0.0, node_lvl - hl) water_depth[~sel_node] = 0.0 # Per-pit sum of water_depth * area over owned cells, MPI-reduced. denom = np.zeros(num_pits, dtype=np.float64) owned = (self.inIDs == 1) & sel_node if owned.any(): ids = self.pitIDs[owned] grp = npi.group_by(ids) up, sums = grp.sum((water_depth * self.larea)[owned]) denom[up] = sums MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, denom, op=MPI.SUM) # Bathymetric baseline: thick where the lake is deep, thin near rim. delta_init = np.zeros(self.lpoints, dtype=np.float64) denom_node = denom[pit_safe] ok = (denom_node > 0) & sel_node delta_init[ok] = ( (1.0 - bias_frac) * depo[pit_safe[ok]] * water_depth[ok] / np.maximum(denom_node[ok], 1e-30) ) # Inlet bias: bias_frac * depo[p] / (n_inlets * area). For pits where # the bathymetric baseline is degenerate (denom == 0) fall back to # the original inlet-only behaviour so all the volume is still placed. bias_at_inlet = np.where(denom > 0, bias_frac, 1.0) if inlet_mask.any(): iIDs = np.where(inlet_mask)[0] for c in iIDs: p = self.pitIDs[c] if inlet_count[p] > 0: delta_init[c] += ( bias_at_inlet[p] * depo[p] / (inlet_count[p] * self.larea[c]) ) # Run non-linear diffusion over the selected pit cells. Diffusion is # zero outside this mask so deposit cannot spread out of the basin. delta_smooth = self._diffuseImplicit( delta_init, sel_node, self.nl_pit_K, label="lake" ) # Per-pit mass rescale: enforce sum(delta_smooth * larea) == depo[p] # Compute global sum per pit actual_vol = np.zeros(num_pits, dtype=np.float64) owned = (self.inIDs == 1) & sel_node if owned.any(): ids = self.pitIDs[owned] vol_per_node = (delta_smooth * self.larea)[owned] grp = npi.group_by(ids) up, sums = grp.sum(vol_per_node) actual_vol[up] = sums MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, actual_vol, op=MPI.SUM) with np.errstate(divide="ignore", invalid="ignore"): scale = np.where( actual_vol > 0, depo / np.maximum(actual_vol, 1e-30), 1.0 ) in_pit = self.pitIDs >= 0 pit_safe = np.where(in_pit, self.pitIDs, 0) node_scale = scale[pit_safe] node_scale[~sel_node] = 0.0 delta_smooth = delta_smooth * node_scale return delta_smooth
[docs] def _updateSinks(self, hl): """ Update depression elevations based on incoming sediment volumes. Three-path algorithm depending on the pit's fill state and size: 1. Pit fully filled (depo >= pitVolume) -- bottom-up to lFill (rim). 2. Partial fill, pit small/shallow (volume below `nl_pit_volume` OR depth below `nl_pit_depth`) -- bottom-up + per-pit micro-tilt toward the spill point so the lake surface is not perfectly flat (avoids large flats in the next flow-routing step). 3. Partial fill, pit large AND deep -- inlet-pile initial condition + marine-style non-linear diffusion. Produces emergent clinoform-like deposit progradation from the inlets. :arg hl: local elevation prior deposition """ depo = self.pitParams[:, 0] - self.pitVol depo[depo < 0] = 0.0 pit_volume = self.pitParams[:, 0] pit_depth = self.pitParams[:, 2] # Path classification active = depo > 0 full_fill = active & (depo >= pit_volume - 1.0e-6) partial = active & ~full_fill is_large = (pit_volume >= self.nl_pit_volume) & ( pit_depth >= self.nl_pit_depth ) diffuse_path = partial & is_large bottomup_path = (full_fill | (partial & ~is_large)) # Path 1+2: bottom-up delta = self._bottomUpDelta(hl, depo, bottomup_path) if bottomup_path.any(): delta = self._addPitMicroTilt(delta, bottomup_path) # Path 3: large/deep partial fill via diffusion if diffuse_path.any(): delta_diff = self._diffuseLargePit(hl, depo, diffuse_path) delta = delta + delta_diff # Apply deposit self.tmpL.setArray(delta) self.dm.localToGlobal(self.tmpL, self.tmp) self.cumED.axpy(1.0, self.tmp) self.hGlobal.axpy(1.0, self.tmp) self.dm.globalToLocal(self.cumED, self.cumEDLocal) self.dm.globalToLocal(self.hGlobal, self.hLocal) # Update soil thickness if self.cptSoil: self.updateSoilThickness() # Update stratigraphic layer parameters if self.stratNb > 0: self.deposeStrat() self.elevStrat() # In case there is other sediment type self.pitParams[:, 0] = self.pitVol.copy() return
[docs] def sedChange(self): """ This function is the main entry point to perform continental river-induced deposition. It calls the private functions: - _getSedFlux - _distributeSediment - _updateSinks """ # Find Continental Sediment Fluxes self._getSedFlux() # Compute depressions information as elevations changed due to erosion self.fillElevation(sed=True) hl = self.hLocal.getArray().copy() self.lsink = self.lsinki.copy() self.pitVol = self.pitParams[:, 0].copy() # Distribute inland sediments self.sedFilled = hl.copy() self._distributeSediment(hl) self._updateSinks(hl) return