Source code for model

import os
import sys

from mpi4py import MPI
from time import process_time

if "READTHEDOCS" not in os.environ:
    from .flow import FAMesh as _FAMesh
    from .flow import IceMesh as _IceMesh
    from .flow import PITFill as _PITFill
    from .eroder import SPL as _SPL
    from .eroder import nlSPL as _nlSPL
    from .eroder import soilSPL as _soilSPL
    from .sed import SEDMesh as _SEDMesh
    from .sed import hillSLP as _hillSLP
    from .sed import SEAMesh as _SEAMesh
    from .sed import STRAMesh as _STRAMesh
    from .tools import ReadYaml as _ReadYaml
    from .mesher import UnstMesh as _UnstMesh
    from .mesher import VoroBuild as _VoroBuild
    from .tools import GridProcess as _GridProcess
    from .mesher import Tectonics as _Tectonics
    from .tools import WriteMesh as _WriteMesh
    from .tools import Profiler as _Profiler

else:

    class _ReadYaml(object):
        def __init__(self, filename):
            print("Fake print statement for readthedocs", filename)

    class _VoroBuild(object):
        def __init__(self):
            pass

    class _UnstMesh(object):
        def __init__(self):
            pass

    class _Tectonics(object):
        def __init__(self):
            pass

    class _WriteMesh(object):
        def __init__(self):
            pass

    class _FAMesh(object):
        def __init__(self):
            pass

    class _IceMesh(object):
        def __init__(self):
            pass

    class _SPL(object):
        def __init__(self):
            pass

    class _nlSPL(object):
        def __init__(self):
            pass

    class _soilSPL(object):
        def __init__(self):
            pass

    class _PITFill(object):
        def __init__(self):
            pass

    class _SEDMesh(object):
        def __init__(self):
            pass

    class _hillSLP(object):
        def __init__(self):
            pass

    class _SEAMesh(object):
        def __init__(self):
            pass

    class _STRAMesh(object):
        def __init__(self):
            pass

    class _GridProcess(object):
        def __init__(self):
            pass

    class _Profiler(object):
        def __init__(self, *args, **kwargs):
            self.enabled = False

        def phase(self, name):
            from contextlib import nullcontext

            return nullcontext()

        def report(self, *args, **kwargs):
            return None

MPIrank = MPI.COMM_WORLD.Get_rank()
MPIsize = MPI.COMM_WORLD.Get_size()

_mpi_excepthook_installed = False


class _IndentWriter(object):
    """
    Thin stdout proxy that prepends a fixed prefix to every physical line it
    forwards. Used by ``Model._phase`` so the existing per-substep verbose
    ``print(...)`` calls scattered across the sub-modules are visually nested
    under their phase header without editing any of them. Restored to the real
    stream when the phase context exits.
    """

    def __init__(self, stream, prefix):
        self._s = stream
        self._p = prefix
        self._bol = True            # at the beginning of a line?

    def write(self, s):
        if not s:
            return
        i, n = 0, len(s)
        while i < n:
            if self._bol:
                self._s.write(self._p)
                self._bol = False
            j = s.find("\n", i)
            if j == -1:
                self._s.write(s[i:])
                break
            self._s.write(s[i : j + 1])
            self._bol = True
            i = j + 1

    def flush(self):
        self._s.flush()

    def isatty(self):
        return getattr(self._s, "isatty", lambda: False)()


def _install_mpi_abort_excepthook():
    """
    Install a ``sys.excepthook`` that ``MPI_Abort``s **every** rank on any
    uncaught exception (parallel runs only; a no-op at ``MPIsize == 1``).

    Why: without it, an exception on ONE rank — especially one raised inside a
    collective (a failed KSP solve, a PETSc collective error such as the
    PETSc-3.21 ``garbage_cleanup`` ``MPI_ERR_BUFFER`` bug, …) — leaves the OTHER
    ranks blocked in MPI forever, so the whole job **hangs** and must be killed
    by hand. ``MPI.COMM_WORLD.Abort(1)`` turns that deadlock into an immediate,
    clean termination of all ranks that still surfaces the traceback. Idempotent
    and chains to the previous hook so the traceback prints as usual first.
    """
    global _mpi_excepthook_installed
    if _mpi_excepthook_installed or MPIsize == 1:
        return
    _prev_hook = sys.excepthook

    def _hook(exc_type, exc, tb):
        try:
            _prev_hook(exc_type, exc, tb)
            sys.stderr.flush()
            sys.stdout.flush()
        finally:
            # A lone rank's exception otherwise deadlocks the others waiting in
            # a collective — terminate the entire job.
            MPI.COMM_WORLD.Abort(1)

    sys.excepthook = _hook
    _mpi_excepthook_installed = True


[docs] class Model( _ReadYaml, _WriteMesh, _UnstMesh, _VoroBuild, _GridProcess, _Tectonics, _FAMesh, _IceMesh, _SPL, _nlSPL, _soilSPL, _PITFill, _SEDMesh, _hillSLP, _SEAMesh, _STRAMesh, ): """ Instantiates model's objects and initialise classes. This object contains methods for the following operations: - initialisation of goSPL mesh based on input file options - computation of surface processes over time - cleaning/destruction of PETSC objects :arg filename: YAML input file :arg verbose: output flag for model main functions :arg showlog: output flag for PETSC logging file """ def __init__( self, filename, verbose=True, showlog=False, profile=False, *args, **kwargs ): self.showlog = showlog # Make any uncaught exception abort ALL ranks instead of deadlocking the # job (parallel runs only). Installed first so even an init-time failure # terminates cleanly. See _install_mpi_abort_excepthook. _install_mpi_abort_excepthook() self.modelRunTime = process_time() self.verbose = verbose # Announce the goSPL version first thing in a verbose run (rank 0). if self.verbose and MPIrank == 0: from gospl import __version__ print("goSPL version %s" % __version__, flush=True) # Read input dataset _ReadYaml.__init__(self, filename) # Wall-clock phase profiler. Enabled by the `profile` kwarg OR the # YAML `output: profile: true` flag (parsed in _readOut). When off it # is a zero-overhead no-op. See gospl/tools/profiler.py. prof_on = profile or bool(getattr(self, "profileFlag", False)) self.profiler = _Profiler(enabled=prof_on) # Stratigraphy initialisation _STRAMesh.__init__(self) # Define voronoi mesh _VoroBuild.__init__(self) # Define unstructured mesh _UnstMesh.__init__(self) # Initialise output mesh _WriteMesh.__init__(self) # River flow initialisation _FAMesh.__init__(self, *args, **kwargs) # Ice flow initialisation _IceMesh.__init__(self, *args, **kwargs) # SPL initialisation _SPL.__init__(self, *args, **kwargs) # Non-linear SPL initialisation _nlSPL.__init__(self, *args, **kwargs) # Non-linear SPL with soil generation initialisation _soilSPL.__init__(self, *args, **kwargs) # Pit filling initialisation _PITFill.__init__(self, *args, **kwargs) # Continental sediment transfer initialisation _SEDMesh.__init__(self, *args, **kwargs) # Hillslope processes (linear and non-linear) initialisation _hillSLP.__init__(self, *args, **kwargs) # Marine sediment transport and deposition initialisation _SEAMesh.__init__(self, *args, **kwargs) # Define additional grid processes (flexure, orographic rain) _GridProcess.__init__(self) # Get external forces driving landscape dynamics over time (tectonic, rainfall...) _UnstMesh.applyForces(self) # Initialise horizontal tectonics forcings (multiple advection techniques) _Tectonics.__init__(self) # Check if simulation just restarted if self.rStep > 0: _WriteMesh.readData(self) if not self.fast: # Compute flow accumulation _FAMesh.flowAccumulation(self) if MPIrank == 0: print( "--- Initialisation Phase (%0.02f seconds)" % (process_time() - self.modelRunTime), flush=True, ) return def _phase(self, tag, detail=""): """ Verbose grouping context manager: print a ``[tag]`` header (rank 0, ``-v`` only) and indent every ``print(...)`` emitted inside it, so the per-substep timings from the sub-modules read as a nested block under their phase. A zero-cost no-op when not verbose or off rank 0. Restores ``sys.stdout`` on exit (even on exception). """ from contextlib import contextmanager @contextmanager def _ctx(): if MPIrank != 0 or not getattr(self, "verbose", False): yield return print(" [%s]%s" % (tag, (" " + detail) if detail else ""), flush=True) old = sys.stdout sys.stdout = _IndentWriter(old, " ") try: yield finally: sys.stdout = old return _ctx()
[docs] def runProcesses(self): """ Runs simulation over time. This function contains methods for the following operations: - computes flow accumulation based on imposed precipitation field - performs land surface evolution from river erosion, transport and deposition - executes creep processes and marine depostion (linear and non-linear diffusion) - records stratigraphic layers evolution and associated porosity variations - applies user-defined tectonics forcing (horizontal and vertical displacements) """ runStart = MPI.Wtime() while self.tNow <= self.tEnd: tstep = process_time() # Flexure interval counter (see GridProcess + flex_interval): the # load reference is snapshotted at interval starts (in the eroder) # and flexure applied at interval ends below. if self.flexOn: self.flexCount += 1 # Verbose step banner (rank 0, -v): the per-phase output below is # grouped under tagged headers and indented (see `_phase`). if MPIrank == 0 and self.verbose: print( "=== %.0f yr ==================================" % self.tNow, flush=True, ) # Output time step with self.profiler.phase("tectonics"): _Tectonics.updatePaleoZ(self) with self.profiler.phase("output"): _WriteMesh.visModel(self) if self.tNow == self.tEnd: break # Create new stratal layer newLayer = self.tNow >= self.saveStrat if newLayer: self.stratStep += 1 self.saveStrat += self.strat # Perform advection and tectonics with self.profiler.phase("tectonics"), self._phase("tectonics"): _Tectonics.getTectonics(self) if not self.fast: if self.iceOn: # Compute ice accumulation with self.profiler.phase("ice"), self._phase("ice"): _IceMesh.iceAccumulation(self) # Compute flow accumulation with self.profiler.phase("flow"), self._phase("flow"): _FAMesh.flowAccumulation(self) # Perform River Incision/Deposition based on Stream Power Law (different flavors) with self.profiler.phase("erosion"), self._phase("erosion"): if self.cptSoil: # Non-linear coupled to soil production _soilSPL.erodepSPLsoil(self) elif self.spl_n == 1.0: # Linear slope dependencies _SPL.erodepSPL(self) else: # Non-linear slope dependencies _nlSPL.erodepSPLnl(self) # Glacial till: abrasion-produced till transported by ice and # deposited (melt-out) as moraine in the ablation zone (glacial # till only; no-op otherwise). Done before fluvial deposition # so the moraine is reworked by meltwater/rivers this step. if self.iceOn: with self.profiler.phase("till"), self._phase("glacial-till"): _IceMesh.glacialTill(self) if not self.nodep: # Downstream sediment deposition over the continents with self.profiler.phase("flow"), self._phase("flow"): _FAMesh.flowAccumulation(self) with self.profiler.phase("sed"), self._phase("sediment"): _SEDMesh.sedChange(self) if self.seaDepo: # Downstream sediment deposition in marine environments with self.profiler.phase("sea"), self._phase("marine"): _SEAMesh.seaChange(self) # Hillslope diffusion (linear and non-linear) with self.profiler.phase("hillslope"), self._phase("hillslope"): _hillSLP.getHillslope(self) # Per-step dual-lithology + provenance status lines ([dual] / # [prov]; verbose only, no-op when each is off). Keeps the # sand/mud balance and source-class mix visible throughout the # run, not just in the one-off init summaries. _STRAMesh.logDualState(self) _STRAMesh.logProvState(self) if newLayer: # Stratigraphic layer porosity and thicknesses under compaction with self.profiler.phase("strat"), self._phase("compaction"): _STRAMesh.getCompaction(self) # Apply flexural isostasy (local and global). With flex_interval > 1 # the load accumulates and flexure is solved only at interval ends # (interval = 1 → every step, the default/unchanged behaviour). if self.flexOn and ( self.flexCount % self.flex_interval == self.flex_interval - 1 ): with self.profiler.phase("flexure"), self._phase("flexure"): _GridProcess.applyFlexure(self) # Update tectonic, sea-level & climatic conditions if self.tNow < self.tEnd: with self.profiler.phase("forcing"), self._phase("forcing"): _UnstMesh.applyForces(self) # Advance time self.tNow += self.dt if MPIrank == 0: print( "--- step done in %0.02f s (now %d yr)" % (process_time() - tstep, self.tNow), flush=True, ) # Cross-rank wall-clock profile + machine-readable profile.json # (collective; a no-op when profiling is disabled). self.profiler.report(self.outputDir, total_wall=MPI.Wtime() - runStart) return
[docs] def destroy(self): """ Destroy PETSc DMPlex objects and associated Petsc local/global Vectors and Matrices. Safely quit model. """ # When `showlog` is set, dump the PETSc Log summary (KSP/SNES/TS solver # timings, flop counts, MPI reductions) so solver-level performance is # captured alongside the wall-clock phase profile. Best-effort: a # missing viewer/log API must not break a clean shutdown. if getattr(self, "showlog", False) and getattr(self, "log", None) is not None: try: import petsc4py viewer = petsc4py.PETSc.Viewer().createASCII( os.path.join(self.outputDir, "petsc_log.txt"), comm=petsc4py.PETSc.COMM_WORLD, ) petsc4py.PETSc.Log.view(viewer) viewer.destroy() except Exception: pass _UnstMesh.destroy_DMPlex(self) return