Source code for flow.flowplex

import os
import gc
import sys
import petsc4py
import numpy as np
import numpy_indexed as npi

from mpi4py import MPI
from time import process_time

if "READTHEDOCS" not in os.environ:
    from gospl._fortran import mfdreceivers

MPIrank = petsc4py.PETSc.COMM_WORLD.Get_rank()
MPIcomm = petsc4py.PETSc.COMM_WORLD


class FAMesh(object):
    """
    This class calculates **drainage area** in an implicit, iterative manner using PETSc solvers. It accounts for multiple flow direction paths (SFD to MFD) based on user input declaration.

    .. note::

        The class follows the parallel approach described in `Richardson et al., 2014 <https://agupubs.onlinelibrary.wiley.com/doi/full/10.1002/2013WR014326>`_.
    """

    def __init__(self, *args, **kwargs):
        """
        The initialisation of `FAMesh` class consists in the declaration of PETSc vectors and matrices.
        """

        # KSP solver parameters
        self.rtol = 1.0e-10

        # Cached KSP/PC objects: created lazily on first solve and reused
        # across timesteps so we avoid the create/destroy churn (~5-10ms per
        # solve). PETSc auto-detects matrix changes via setOperators and
        # rebuilds the preconditioner factor when necessary.
        self._ksp_main = None
        self._ksp_fallback = None

        # Identity matrix construction
        self.II = np.arange(0, self.lpoints + 1, dtype=petsc4py.PETSc.IntType)
        self.JJ = np.arange(0, self.lpoints, dtype=petsc4py.PETSc.IntType)
        self.iMat = self._matrix_build_diag(np.ones(self.lpoints))

        # Petsc vectors
        self.fillFAL = self.hLocal.duplicate()
        self.FAG = self.hGlobal.duplicate()
        self.FAL = self.hLocal.duplicate()

        return

[docs] def _matrix_build(self, nnz=(1, 1)): """ Creates a sparse PETSc matrix. .. note:: To achieve good performance during matrix assembly, the function preallocates the matrix storage. :arg nnz: array containing the number of nonzeros in the various rows :return: sparse PETSc matrix """ matrix = petsc4py.PETSc.Mat().create(comm=MPIcomm) matrix.setType("aij") matrix.setSizes(self.sizes) matrix.setLGMap(self.lgmap_row, self.lgmap_col) matrix.setFromOptions() matrix.setPreallocationNNZ(nnz) return matrix
[docs] def _matrix_build_diag(self, V, nnz=(1, 1)): """ Builds a PETSc diagonal matrix based on a given array `V` :arg V: diagonal data array :arg nnz: array containing the number of nonzero blocks :return: sparse PETSc matrix """ matrix = self._matrix_build() # Define diagonal matrix matrix.assemblyBegin() matrix.setValuesLocalCSR( self.II, self.JJ, V, ) matrix.assemblyEnd() return matrix
def _make_reasons(self, reasons): """ Provides reasons for PETSc error... """ return dict( [(getattr(reasons, r), r) for r in dir(reasons) if not r.startswith("_")] ) def _solve_KSP2(self, matrix, vector1, vector2): """ Solution of Krylov subspace iterative method (PETSc *scalable linear equations solvers* - **KSP**) implemented using the Flexible Generalized Minimal Residual method (`fgmres`) with Additive Schwarz preconditioning (`asm`). .. note:: This function is used if the KSP convergence failed using the PETSc Richardson solver with block Jacobian preconditioning. :arg matrix: PETSc sparse matrix used by the KSP solver composed of diagonal terms set to unity (identity matrix) and off-diagonal terms (weights between 0 and 1). The weights are calculated based on the number of downslope neighbours (*i.e.*, user-defined number of flow directions) and are proportional to the slope. :arg vector1: PETSc vector corresponding to the local volume of water available for runoff during a given time step (*e.g.* voronoi area times local precipitation rate). :arg vector2: PETSc vector corresponding to the unknown flow discharge values. :return: vector2 PETSc vector of the new flow discharge values """ if self._ksp_fallback is None: ksp = petsc4py.PETSc.KSP().create(petsc4py.PETSc.COMM_WORLD) ksp.setType("fgmres") ksp.getPC().setType("asm") ksp.setTolerances(rtol=1.0e-6, divtol=1.e20) ksp.setInitialGuessNonzero(True) self._ksp_fallback = ksp ksp = self._ksp_fallback ksp.setOperators(matrix, matrix) ksp.solve(vector1, vector2) r = ksp.getConvergedReason() if r < 0: KSPReasons = self._make_reasons(petsc4py.PETSc.KSP.ConvergedReason()) if MPIrank == 0: print( "LinearSolver failed to converge after iterations", ksp.getIterationNumber(), flush=True, ) print("with reason: ", KSPReasons[r], flush=True) # If the fgmres+asm fallback also diverges, return a zero discharge # for this step. Operators should monitor the warning above. vector2.set(0.0) petsc4py.PETSc.garbage_cleanup() return vector2
[docs] def _solve_KSP(self, guess, matrix, vector1, vector2): """ PETSc *scalable linear equations solvers* (**KSP**) component provides Krylov subspace iterative method and a preconditioner. Here, flow accumulation solution is obtained using PETSc Richardson solver (`richardson`) with block Jacobian preconditioning (`bjacobi`). .. note:: The solver choice was made based on the convergence results from `Richardson et al. (2014) <https://agupubs.onlinelibrary.wiley.com/doi/full/10.1002/2013WR014326>`_ but can be changed if better solver and preconditioner combinations are found. Using such iterative method allows for an initial guess to be provided. When this initial guess is close to the solution, the number of iterations required for convergence dramatically decreases. Here the flow discharge solution from previous time step can be passed as an initial `guess` to the solver as discharge often exhibits little change between successive time intervals. :arg guess: Boolean specifying if the iterative KSP solver initial guess is nonzero (when provided it corresponds to the previous flow discharge values). :arg matrix: PETSc sparse matrix used by the KSP solver composed of diagonal terms set to unity (identity matrix) and off-diagonal terms (weights between 0 and 1). The weights are calculated based on the number of downslope neighbours (based on the chosen number of flow direction directions) and are proportional to the slope. :arg vector1: PETSc vector corresponding to the local volume of water available for runoff during a given time step (*e.g.* voronoi area times local precipitation rate). :arg vector2: PETSc vector corresponding to the unknown flow discharge values. :return: vector2 PETSc vector of the new flow discharge values """ if self._ksp_main is None: ksp = petsc4py.PETSc.KSP().create(petsc4py.PETSc.COMM_WORLD) ksp.setType("richardson") ksp.getPC().setType("bjacobi") ksp.setTolerances(rtol=self.rtol) self._ksp_main = ksp ksp = self._ksp_main if guess: ksp.setInitialGuessNonzero(True) ksp.setOperators(matrix, matrix) ksp.solve(vector1, vector2) r = ksp.getConvergedReason() if r < 0: vector2 = self._solve_KSP2(matrix, vector1, vector2) petsc4py.PETSc.garbage_cleanup() return vector2
[docs] def matrixFlow(self, flowdir, dep=None): """ This function defines the flow direction matrices. .. note:: The matrix is built incrementally looping through the number of flow direction paths defined by the user. It proceeds by assembling a local Compressed Sparse Row (**CSR**) matrix to a global PETSc matrix. When setting up the flow matrix in PETSc, we preallocate the non-zero entries of the matrix before starting filling in the values. Using PETSc sparse matrix storage scheme has the advantage that matrix-vector multiplication is extremely fast. The matrix coefficients consist of weights (comprised between 0 and 1) and calculated based on the number of downslope neighbours and proportional to the slope. :arg flow: boolean to compute matrix for either downstream water or sediment transport :arg dep: deposition flux coefficient in case where the sediment transport/deposition term is considered. """ self.fMat = self.iMat.copy() indptr = np.arange(0, self.lpoints + 1, dtype=petsc4py.PETSc.IntType) nodes = indptr[:-1] if dep is None: wght = self.wghtVal else: wght = np.multiply(self.wghtVal, dep.reshape((len(dep), 1))) rcv = self.rcvID for k in range(0, flowdir): # Flow direction matrix for a specific direction tmpMat = self._matrix_build() data = -wght[:, k].copy() data[rcv[:, k].astype(petsc4py.PETSc.IntType) == nodes] = 0.0 tmpMat.assemblyBegin() tmpMat.setValuesLocalCSR( indptr, rcv[:, k].astype(petsc4py.PETSc.IntType), data, ) tmpMat.assemblyEnd() # Add the weights from each direction self.fMat.axpy(1.0, tmpMat) tmpMat.destroy() if self.memclear: del data, indptr, nodes gc.collect() # Store flow accumulation matrix self.fMat.transpose() petsc4py.PETSc.garbage_cleanup() return
[docs] def _buildFlowDirection(self, h, down=True): """ This function builds from neighbouring slopes the flow directions. It calls a fortran subroutine that locally computes for each vertice: - the indices of receivers (downstream) nodes depending on the desired number of flow directions (SFD to MFD). - the distances to the receivers based on mesh resolution. - the associated weights calculated based on the number of receivers and proportional to the slope. :arg h: elevation in the form of a Numpy Array :arg down: boolean to indicate whether the filled elevation needs to be considered or not. """ # Get open marine regions self.seaID = np.where(self.lFill <= self.sealevel)[0] # Define multiple flow directions for unfilled elevation self.donRcvs, self.distRcv, self.wghtVal = mfdreceivers( self.flowDir, self.flowExp, h, self.sealevel ) self.rcvID = self.donRcvs.copy() self.rcvID[self.ghostIDs, :] = -1 self.distRcv[self.ghostIDs, :] = 0 self.wghtVal[self.ghostIDs, :] = 0 if down: sum_weight = np.sum(self.wghtVal, axis=1) ids = ( (h == self.lFill) & (self.pitIDs > -1) & (self.flatDirs > -1) & (sum_weight == 0.0) ) ids = ids.nonzero()[0] self.rcvID[ids, :] = np.tile(ids, (self.flowDir, 1)).T self.rcvID[ids, 0] = self.flatDirs[ids] self.wghtVal[ids, :] = 0.0 self.wghtVal[ids, 0] = 1.0 # Set borders nodes if self.flatModel: self.rcvID[self.idBorders, :] = np.tile(self.idBorders, (self.flowDir, 1)).T self.distRcv[self.idBorders, :] = 0.0 self.wghtVal[self.idBorders, :] = 0.0 # Get local nodes with no receivers as boolean array sum_weight = np.sum(self.wghtVal, axis=1) lsink = sum_weight == 0.0 # We don't consider open sea nodes and borders as sinks lsink[self.idBorders] = False lsink[self.seaID] = False lsink = lsink.astype(int) * self.inIDs self.lsink = lsink == 1 self.matrixFlow(self.flowDir) return
def _potentialLakeEvap(self): """ Per-pit max evaporation volume (m^3) for one timestep, assuming the lake fills to the spillover (= max-fill surface). Surface = Σ larea over cells with pitIDs >= 0 (the lake's potential extent at spillover). The `inIDs == 1` mask prevents MPI halo double-counting; the per-pit total is then Allreduce-summed across ranks so every rank ends up with the same evap budget array. See DESIGN_EVAPORATION.md §1 D3 for the design rationale (we use max-fill rather than current-fill to avoid the circular dependency between fill level and evap rate). :return: numpy array of shape (nbpits,) with per-pit evap m^3 """ out = np.zeros(len(self.pitParams), dtype=np.float64) if self.evapVal is None: return out isPitCell = (self.pitIDs >= 0) & (self.inIDs == 1) if isPitCell.any(): cellEvap = self.evapVal * self.larea * self.dt grp = npi.group_by(self.pitIDs[isPitCell]) uIDs = grp.unique _, vol = grp.sum(cellEvap[isPitCell]) ids = uIDs > -1 out[uIDs[ids]] = vol[ids] MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, out, op=MPI.SUM) return out
[docs] def _distributeDownstream(self, pitVol, FA, hl, step, ice=False): """ In cases where rivers flow in depressions, they might fill the sink completely and overspill or remain within the depression, forming a lake. This function computes the excess of water (if any) able to flow dowstream. .. important:: The excess water is then added to the downstream flow accumulation (`FA`) and used to estimate rivers' erosion. :arg pitVol: volume of depressions :arg FA: excess flow accumulation array :arg hl: current elevation array :arg step: downstream distribution step :arg ice: boolean indicating where the ice flow is considered or not. :return: pitVol, excess, nFA (updated volume in each depression, boolean set to True is excess flow remains to be distributed and new flow accumulation values) """ excess = False # Remove points belonging to other processors FA = np.multiply(FA, self.inIDs) # Get volume incoming in each depression grp = npi.group_by(self.pitIDs[self.lsink]) uID = grp.unique _, vol = grp.sum(FA[self.lsink]) inV = np.zeros(len(self.pitParams), dtype=np.float64) ids = uID > -1 inV[uID[ids]] = vol[ids] # Combine incoming volume globally MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, inV, op=MPI.SUM) # Lake-surface evaporation (DESIGN_EVAPORATION.md §2.2). Subtract # the per-pit max-fill evap budget from inflow exactly once per # outer step (step==0). Cascading spillover iterations (step>0) # carry already-net-of-evap water and must NOT be debited again. # When the evap budget exceeds inflow, lakeLoss is clamped at inV # so the pit ends with zero net inflow — the partial-fill branch # below skips it via the inV>0 mask, leaving waterFilled at hl. if self.evapVal is not None and step == 0: evapBudget = self._potentialLakeEvap() lakeLoss = np.minimum(inV, evapBudget) inV = inV - lakeLoss self.evapLoss += lakeLoss.sum() # Get excess volume to distribute downstream eV = inV - pitVol if (eV > 0.0).any(): eIDs = eV > 0.0 pitVol[eIDs] = 0.0 spillIDs = self.pitInfo[eIDs, 0] localSpill = np.where(self.pitInfo[eIDs, 1] == MPIrank)[0] localPts = spillIDs[localSpill] nFA = np.zeros(self.lpoints, dtype=np.float64) nFA[localPts] = eV[eIDs][localSpill] ids = np.isin(self.pitIDs, np.where(eV > 0.0)[0]) self.waterFilled[ids] = self.lFill[ids] # Update unfilled depressions volumes and assign water level in depressions # NOTE: the (inV > 0.0) clause excludes pits where lake-evap fully # consumed the inflow (inV becomes 0 after the subtraction above). # Without it, eV = -pitVol drives pitVol → 0 here, incorrectly # marking a "dry" depression as filled-to-spillover. if (eV < 0.0).any(): eIDs = np.where((eV < 0.0) & (inV > 0.0))[0] pitVol[eIDs] += eV[eIDs] nid = np.absolute(self.filled_vol[eIDs] - pitVol[eIDs][:, None]).argmin( axis=1 ) fill_lvl = self.filled_lvl[eIDs, nid] # Vectorised replacement of the per-pit Python loop. Build a # pit-id → fill-level lookup and apply it in one pass over nodes. in_eIDs = np.isin(self.pitIDs, eIDs) pit_fill_lvl = np.zeros(len(self.pitParams)) pit_fill_lvl[eIDs] = fill_lvl node_lvl = pit_fill_lvl[self.pitIDs] mask = in_eIDs & (self.waterFilled <= node_lvl) self.waterFilled[mask] = node_lvl[mask] # In case there is still remaining water flux to distribute downstream if (eV > 1.0e-3).any(): # TODO-REFACTOR: value matches DEPOSIT_FLOOR but distinct role (water-routing convergence threshold); do not replace if step == 100: self.fMat.destroy() self._buildFlowDirection(self.lFill) else: self.fMat.destroy() self._buildFlowDirection(self.waterFilled) self.tmpL.setArray(nFA / self.dt) self.dm.localToGlobal(self.tmpL, self.tmp) # Skip the KSP solve for negligible residual fluxes: comparing # total residual flux (m^3/yr) against the largest cell area is a # cheap "less than 1 m of water over the biggest cell" guard. if self.tmp.sum() > self.maxarea[0]: excess = True self._solve_KSP(True, self.fMat, self.tmp, self.tmp1) self.dm.globalToLocal(self.tmp1, self.tmpL) nFA = self.tmpL.getArray().copy() * self.dt FA = nFA.copy() FA[hl < self.waterFilled] = 0.0 self.tmpL.setArray(FA / self.dt) if ice: self.iceFAL.axpy(1.0, self.tmpL) else: self.FAL.axpy(1.0, self.tmpL) else: nFA = None else: nFA = None return excess, pitVol, nFA
[docs] def flowAccumulation(self): """ This function is the **main entry point** for flow accumulation computation. .. note:: Flow accumulation (`FA`) calculations are a core component of landscape evolution models as they are often used as proxy to estimate flow discharge, sediment load, river width, bedrock erosion, and sediment deposition. Until recently, goSPL model computes the flow discharge from `FA` and the net precipitation rate using a **parallel implicit drainage area (IDA) method** proposed by `Richardson et al., 2014 <https://agupubs.onlinelibrary.wiley.com/doi/full/10.1002/2013WR014326>`_ but adapted to unstructured grids. It calls the following *private functions*: 1. _buildFlowDirection 2. _solve_KSP 3. _distributeDownstream """ t0 = process_time() # Compute depressions information self.fillElevation(sed=False) pitVol = self.pitParams[:, 0].copy() # Build flow direction and downstream matrix hl = self.hLocal.getArray().copy() self._buildFlowDirection(hl, False) self.wghtVali = self.wghtVal.copy() self.rcvIDi = self.rcvID.copy() self.distRcvi = self.distRcv.copy() self.fMati = self.fMat.copy() self.lsinki = self.lsink.copy() # Get amount of water or ice rainA = self.bL.getArray().copy() rainA[self.seaID] = 0. # Channel evaporation (DESIGN_EVAPORATION.md §2.1). evapVal is m/yr, # larea is m^2, rainA is m^3/yr — same units. channelLoss is clamped # at the available rain so an arid cell cannot produce negative # runoff; the unused evap capacity is silently dropped (the # groundwater path is out of scope). if self.evapVal is not None: channelLoss = np.minimum(rainA, self.evapVal * self.larea) rainA = rainA - channelLoss self.evapLoss += channelLoss.sum() * self.dt if self.iceOn: elaH = self.elaH(self.tNow) iceH = self.iceH(self.tNow) tmp = (hl - elaH) / (iceH - elaH) tmp[tmp > 1.] = 1.0 tmp[tmp < 0.] = 0.0 rainA = np.multiply(rainA, 1. - tmp) # Re-inject glacial meltwater captured during iceAccumulation: # sub-ELA cells with ice present release the local ablation # rate as liquid water into the river source. Without this, # melt computed in the ice solve is discarded by the negative # clamp on iceFAL and downstream basins under-predict # discharge. rainA = rainA + self.iceMeltL.getArray() rainA[self.seaID] = 0. # Solve flow/ice accumulation self.bL.setArray(rainA) self.dm.localToGlobal(self.bL, self.bG) self._solve_KSP(True, self.fMat, self.bG, self.FAG) self.dm.globalToLocal(self.FAG, self.FAL) # Volume of water flowing downstream self.waterFilled = hl.copy() if (pitVol > 0.0).any(): FA = self.FAL.getArray().copy() * self.dt excess = True step = 0 while excess: t1 = process_time() excess, pitVol, FA = self._distributeDownstream(pitVol, FA, hl, step) if MPIrank == 0 and self.verbose: print( "Downstream flow computation step %d (%0.02f seconds)" % (step, process_time() - t1), flush=True, ) step += 1 # Get overall water flowing donwstream accounting for filled depressions FA = self.FAL.getArray().copy() ids = (hl <= self.waterFilled) & (self.pitIDs > -1) FA[ids] = 0.0 self.FAL.setArray(FA) self.dm.localToGlobal(self.FAL, self.FAG) self.dm.globalToLocal(self.FAG, self.FAL) # Lake-erosion proxy: assign a background discharge (10% of the # global max) at filled-depression nodes so that SPL still produces # some erosion on the filled topography. FAL itself stays zero # there because no water actually flows. FA[ids] = self.FAG.max()[1] * 0.1 self.fillFAL.setArray(FA) else: self.FAL.copy(result=self.fillFAL) # Get water level self.waterFilled -= hl if MPIrank == 0 and self.verbose: print( "Compute Flow Accumulation (%0.02f seconds)" % (process_time() - t0), flush=True, ) return