import os
import gc
import sys
import petsc4py
import numpy as np
import pandas as pd
import numpy_indexed as npi
from mpi4py import MPI
from time import process_time
from gospl.tools.constants import GRAPH_OUTLIER_CAP, MISSING_DATA_SENTINEL
if "READTHEDOCS" not in os.environ:
from gospl._fortran import edge_tile
from gospl._fortran import fill_tile
from gospl._fortran import fill_edges
from gospl._fortran import fill_dir
from gospl._fortran import nghb_dir
from gospl._fortran import getpitvol
from gospl._fortran import pits_cons
from gospl._fortran import fill_depressions
from gospl._fortran import graph_nodes
from gospl._fortran import combine_edges
from gospl._fortran import label_pits
from gospl._fortran import sort_ids
MPIrank = petsc4py.PETSc.COMM_WORLD.Get_rank()
MPIsize = petsc4py.PETSc.COMM_WORLD.Get_size()
[docs]
class PITFill(object):
"""
Depression filling is an important preconditioning step to many landscape evolution models.
This class implements a linearly-scaling parallel priority-flood depression-filling algorithm based on `Barnes (2016) <https://arxiv.org/pdf/1606.06204.pdf>`_ algorithm.
.. note::
Unlike previous algorithms, `Barnes (2016) <https://arxiv.org/pdf/1606.06204.pdf>`_ approach guarantees a fixed number of memory access and communication events per processors.
The approach proposed in goSPL handles irregular meshes, allowing for complex distributed meshes to be used as long as a clear definition of inter-mesh connectivities is available. It also creates directions over flat regions allowing for downstream flows in cases where the entire volume of a depression is filled.
For inter-mesh connections and message passing, the approach relies on PETSc DMPlex functions.
**Parallel (MPI) structure — the three Barnes (2016) phases:**
1. *Per-tile flood (parallel).* Each rank priority-floods its own partition
(``fill_tile``) and emits a small spillover graph linking its local
depression labels by their minimum spill elevation (``graph_nodes`` /
``combine_edges``).
2. *Global graph solve (serial, rank 0).* The per-rank graphs are gathered,
labels describing the same basin across a partition border are unified
(:meth:`_transferIDs` → :meth:`_unifyLabels`), and a single priority-flood
on the combined graph (:meth:`_fillFromEdges` → ``fill_edges``) computes
each depression's final water level. The graph scales with the partition
*perimeter* (``O(sqrt(N))`` per tile), so — as in Barnes (2016) — this
master solve is intentionally serial and cheap even on large meshes.
3. *Apply (parallel).* Each rank raises its cells to the per-label water
levels (``fill_depressions``).
.. note::
Cross-rank label unification (:meth:`_unifyLabels`) is a single
deterministic **union-find** pass that collapses each connected component
of the equivalence graph to its minimum label — so the depression
identifiers (and the filled surface) are **independent of the MPI
partition**. The serial graph solve uses a compact (densely re-indexed)
label space and a binary-heap priority queue, so its cost tracks the
number of distinct spillover basins rather than the rank-dependent raw
(globally-offset) label range.
The main functions return the following parameters:
- the elevation of the filled surface,
- the information for each depression (e.g., a unique global ID, its spillover local points and related processor),
- the description of each depression (total volume and maximum filled depth).
"""
def __init__(self, *args, **kwargs):
"""
The initialisation of `PITFill` class consists in the declaration of PETSc vectors, matrices and each partition internals edge vertices.
"""
# Petsc vectors. Domain spill-over outlets are the DRAINING borders
# (`outletIDs` = open/fixed edges); true-wall edges are excluded so a
# depression touching a wall fills and is contained rather than spilling
# out of the domain. (`idLBounds` are the parallel partition-local
# boundaries — unrelated to the physical domain boundary — and always
# participate in the spill graph.)
edges = -np.ones((self.lpoints, 2), dtype=int)
edges[self.idLBounds, 0] = self.idLBounds
edges[self.outletIDs, 0] = self.outletIDs
edges[self.idLBounds, 1] = 0
edges[self.outletIDs, 1] = 1
self.borders = edges
self.outEdges = np.zeros(self.lpoints, dtype=int)
self.outEdges[self.ghostIDs] = 1
return
[docs]
def _buildPitDataframe(self, label1, label2):
"""
Definition of a Pandas Dataframe used to find unique pit ID across processors.
:arg label1: depression ID in a given processor
:arg label2: same depression ID in a neighbouring mesh
:return: df (sorted Dataframe of pit ID between processors)
"""
data = {
"p1": label1,
"p2": label2,
}
df = pd.DataFrame(data, columns=["p1", "p2"])
df = df.drop_duplicates().sort_values(["p2", "p1"], ascending=(False, False))
return df
[docs]
def _sortingPits(self, df):
"""
Sorts depressions number before combining them to ensure no depression index is changed in an unsorted way.
:arg df: Pandas Dataframe containing depression numbers which have to be combined.
:return: df sorted pandas dataframe containing depression numbers.
.. note::
Superseded by :meth:`_unifyLabels` (union-find). Kept for reference
/ the standalone sort_ids kernel; no longer on the hot path.
"""
df["p2"] = sort_ids(df["p1"].values.astype(int), df["p2"].values.astype(int))
df = df.drop_duplicates().sort_values(["p2", "p1"], ascending=(False, False))
return df
[docs]
def _unifyLabels(self, df, label):
"""
Collapse cross-processor depression-label equivalence pairs into one
canonical id per connected component using union-find (disjoint-set),
then apply the mapping to ``label``.
``df`` holds the global equivalence pairs ``(p1, p2)`` with ``p1 < p2``
(two labels that are the same depression across a tile border). Each
connected component is collapsed to its MINIMUM label by keeping the
smaller label as the set root — matching the previous iterative
``sort_ids`` fixpoint, but in a single deterministic O(N α(N)) pass.
Determinism (root = min, independent of pair order) is what makes the
result identical regardless of the domain partition.
:arg df: Dataframe of equivalence pairs, columns ``p1`` < ``p2``.
:arg label: local depression-label array (globally-offset ids).
:return: ``label`` with every label remapped to its component minimum.
"""
pairs = df[["p1", "p2"]].values.astype(np.int64)
# Union-find over the (sparse, globally-offset) labels that appear in a
# pair. Most mesh labels never cross a border, so the structure is small.
parent = {}
def find(x):
root = x
while parent[root] != root:
root = parent[root]
while parent[x] != root: # path compression
parent[x], x = root, parent[x]
return root
for a, b in pairs:
a, b = int(a), int(b)
if a not in parent:
parent[a] = a
if b not in parent:
parent[b] = b
ra, rb = find(a), find(b)
if ra != rb:
# smaller label becomes the root → component minimum is canonical
if ra < rb:
parent[rb] = ra
else:
parent[ra] = rb
if not parent:
return label
# Vectorised remap: only labels appearing in a pair change; all others
# (the vast majority, plus the -1 border sentinel) pass through.
keys = np.fromiter(sorted(parent), dtype=np.int64, count=len(parent))
vals = np.fromiter((find(int(k)) for k in keys), dtype=np.int64,
count=len(keys))
idx = np.searchsorted(keys, label)
idx_clip = np.clip(idx, 0, len(keys) - 1)
hit = keys[idx_clip] == label
out = label.copy()
out[hit] = vals[idx_clip[hit]]
return out
[docs]
def _offsetGlobal(self, lgth):
"""
Computes the offset between processors to ensure unique number for considered indices.
:arg lgth: local length of the data to distribute
:return: cumulative sum and sum of the labels to add to each processor
"""
label_offset = np.zeros(MPIsize + 1, dtype=int)
label_offset[MPIrank + 1] = lgth
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, label_offset, op=MPI.MAX)
return np.cumsum(label_offset), np.sum(label_offset)
[docs]
def _fillFromEdges(self, mgraph):
"""
Combine local meshes by joining their edges based on local spillover graphs.
:arg mgraph: Numpy Array containing local mesh edges information
:arg ggraph: Numpy Array containing filled elevation values based on other processors values
"""
# Get bidirectional edges connections (pandas drop_duplicates is
# hash-based and faster than any numpy sort-based dedup here — verified
# by micro-benchmark; this dedup is <~30 ms even at 200k edges and is
# NOT the pitgraph hot spot).
cgraph = pd.DataFrame(
mgraph, columns=["source", "target", "elev", "spill", "rank"]
)
cgraph = cgraph.sort_values("elev")
cgraph = cgraph.drop_duplicates(["source", "target"], keep="first")
cgraph = cgraph.values
# --- Pit-label compaction --------------------------------------------
# source/target are globally-offset pit labels (disjoint per rank), so
# the label space is SPARSE and its maximum grows ~linearly with the
# rank count even though the number of distinct spillover pits is far
# smaller. fill_edges sizes its O(nb) / O(nb*maxnghbs) work arrays from
# that maximum (nb = max(label)+2), so the serial solve grows with
# ranks. Relabel to a dense [0, nlab) space, solve there, then scatter
# the results back onto the global label index the downstream code
# expects. Label 0 (the global outlet, == node 1, the priority-flood
# seed) is the smallest label so it always maps to compact 0; union it
# in explicitly in case no edge references it directly. This is a pure
# relabeling: the fill result is identical to solving in global space.
nb_global = int(cgraph[:, 1].max()) + 2
src = cgraph[:, 0].astype(np.int64)
tgt = cgraph[:, 1].astype(np.int64)
uniq = np.unique(np.concatenate((src, tgt)))
if uniq[0] != 0:
uniq = np.concatenate((np.array([0], dtype=uniq.dtype), uniq))
nlab = len(uniq)
ccgraph = cgraph.copy()
ccgraph[:, 0] = np.searchsorted(uniq, src)
ccgraph[:, 1] = np.searchsorted(uniq, tgt)
c12 = ccgraph[:, :2].astype(int).ravel()
cmax = np.max(np.bincount(c12)) + 1
# Filling the bidirectional graph (serial priority-flood; timed
# separately so we can see whether the Fortran graph solve or the
# surrounding MPI Reduce/bcast dominates the serial pitgraph cost).
# nlab+1 always covers every compact label 0..nlab-1 (the +1 keeps the
# one-node slack the global formula had).
self.profiler.start("flow_pit_edges")
elev, rank, nodes, spillID = fill_edges(nlab + 1, ccgraph, cmax)
self.profiler.stop("flow_pit_edges")
# Scatter compact results back to the global label index. Labels with
# no spillover edge are absent from the compact graph; in global space
# fill_edges would have left them at its nelev sentinel (1e8), which the
# caller rewrites to MISSING_DATA_SENTINEL — reproduce that default so
# behaviour is identical. spillID is itself a (compact) label, so map it
# back to the global label too (nodes/rank are mesh ids, not labels).
ggraph = -np.ones((nb_global, 5))
ggraph[:, 0] = 1.0e8
ggraph[uniq, 0] = elev[:nlab]
ggraph[uniq, 1] = nodes[:nlab]
ggraph[uniq, 2] = rank[:nlab]
gspill = spillID[:nlab].astype(np.int64)
valid = (gspill >= 0) & (gspill < nlab)
gspill[valid] = uniq[gspill[valid]]
ggraph[uniq, 3] = gspill
if self.memclear:
del elev, nodes, rank, spillID
del src, tgt, uniq, c12, gspill, ccgraph
return ggraph
[docs]
def _transferIDs(self, pitIDs):
"""
This function transfers local depression IDs along local borders and combines them with a unique identifier.
:arg pitIDs: local depression index.
:return: number of depressions.
"""
# Define globally unique watershed index
t0 = process_time()
fillIDs = pitIDs >= 0
offset, _ = self._offsetGlobal(np.amax(pitIDs))
pitIDs[fillIDs] += offset[MPIrank]
if MPIrank == 0 and self.verbose:
print(
"Offset Global pit ids (%0.02f seconds)" % (process_time() - t0)
)
t0 = process_time()
# Transfer depression IDs along local borders
self.tmpL.setArray(pitIDs)
self.dm.localToGlobal(self.tmpL, self.tmp)
self.dm.globalToLocal(self.tmp, self.tmpL)
label = self.tmpL.getArray().copy().astype(int)
ids = label < pitIDs
df = self._buildPitDataframe(label[ids], pitIDs[ids])
ids = label > pitIDs
df2 = self._buildPitDataframe(pitIDs[ids], label[ids])
df = pd.concat([df, df2], ignore_index=True)
df = df.drop_duplicates().sort_values(["p2", "p1"], ascending=(False, False))
df = df[(df["p1"] >= 0) & (df["p2"] >= 0)]
if MPIrank == 0 and self.verbose:
print(
"Build pit dataframe (%0.02f seconds)" % (process_time() - t0)
)
t0 = process_time()
# Gather every rank's equivalence pairs onto all ranks. Allgatherv of
# the real pairs (Barnes-style) replaces a padded Allreduce(MAX) over a
# (total, 2) array — same content, no -1 padding to move. All ranks then
# run the identical union-find.
offset, total = self._offsetGlobal(len(df))
sendbuf = (
df.values.astype(np.int64).ravel()
if len(df) > 0
else np.empty(0, dtype=np.int64)
)
counts = (np.diff(offset) * 2).astype(int)
recvbuf = np.empty(int(total) * 2, dtype=np.int64)
MPI.COMM_WORLD.Allgatherv(sendbuf, [recvbuf, counts])
combIds = recvbuf.reshape(-1, 2)
df = self._buildPitDataframe(combIds[:, 0], combIds[:, 1])
df = df[(df["p1"] >= 0) & (df["p2"] >= 0)]
if MPIrank == 0 and self.verbose:
print(
"Combine pit dataframe (%0.02f seconds)" % (process_time() - t0)
)
t0 = process_time()
# Collapse the cross-processor label-equivalence pairs (p1 < p2) into a
# single canonical id per connected component with union-find. This is
# Barnes-style label unification: the canonical id is the MINIMUM label
# in each component, identical to the previous iterative sort_ids
# fixpoint but in a single deterministic pass — no convergence loop (the
# old code iterated up to 1000 times) and no partition-dependent
# ordering, which removes the instability that forced the serial path.
if len(df) > 0:
label = self._unifyLabels(df, label)
if MPIrank == 0 and self.verbose:
print(
"Sorting pits (%0.02f seconds)" % (process_time() - t0)
)
t0 = process_time()
# Transfer depression IDs along local borders
self.tmpL.setArray(label)
self.dm.localToGlobal(self.tmpL, self.tmp)
self.dm.globalToLocal(self.tmp, self.tmpL)
# At this point all pits have a unique IDs across processors
self.pitIDs = self.tmpL.getArray().astype(int)
# Lets make consecutive indices
pitnbs = np.zeros(1, dtype=int)
pitnbs[0] = np.max(self.pitIDs) + 1
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, pitnbs, op=MPI.MAX)
fillIDs = self.pitIDs >= 0
valpit = -np.ones(pitnbs[0], dtype=int)
unique, idx_groups = npi.group_by(
self.pitIDs[fillIDs], np.arange(len(self.pitIDs[fillIDs]))
)
valpit[unique] = 1
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, valpit, op=MPI.MAX)
pitNb = np.where(valpit > 0)[0]
if MPIrank == 0 and self.verbose:
print(
"Define consecutive pit ids (%0.02f seconds)" % (process_time() - t0)
)
t0 = process_time()
self.pitIDs = pits_cons(self.pitIDs, pitNb)
return pitNb
[docs]
def _dirFlats(self):
"""
This function finds routes to spillover points on filled depressions to ensure downstream distribution if they are overfilled.
"""
# Find local receivers to direct flow to spillover nodes
ids = self.pitIDs > -1
if ids.any():
pdir = fill_dir(self.lspillIDs, self.pitIDs, self.lFill)
else:
pdir = -np.ones(self.lpoints, dtype=np.int32)
# Propagate flat-region distances from the spill seeds across partition
# boundaries until the field CONVERGES. `nghb_dir` now RELAXES (lowers) a
# node's distance whenever a shorter path appears, so the only cross-
# partition coupling is the owned->ghost halo copy at the top of each
# pass: once a full pass changes no owned node anywhere (global change
# count == 0), the halo copies the same owned values, the next pass is
# identical, and the fixed point is reached. That fixed point is the true
# graph distance to the nearest spill -> PARTITION-INVARIANT.
#
# The OLD loop terminated on a `remaining` (undirected-count) stall, which
# stopped as soon as no NEW node was directed -- but the set-once kernel
# had by then frozen boundary nodes at whichever (often longer) distance
# arrived first per cut, so `ptdir` and the receivers derived from it
# depended on the partition (~0.17% of nodes on a 5.9M global mesh ->
# a partition-dependent (I - W^T) operator and a flow-KSP "un-drained
# region" that varied with rank count). Converging on relaxation removes
# that. Isolated pockets (no pit-path to any spill) stay at -1 unchanged
# and so do not block convergence; they become safe self-sinks below.
owned = self.inIDs == 1
changed = np.array([1.0])
stp = 0
while True:
pdir_prev = pdir
self.tmp.setArray(pdir[self.glIDs])
self.dm.globalToLocal(self.tmp, self.tmpL, 3)
pdir = self.tmpL.getArray().copy().astype(int)
pdir = nghb_dir(self.pitIDs, self.lFill, pdir)
# Global count of owned nodes whose distance changed this pass. The
# field is monotone (coverage only grows, distances only shrink), so
# a single zero-change pass means the fixed point is reached.
changed[0] = float(np.count_nonzero(pdir[owned] != pdir_prev[owned]))
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, changed, op=MPI.SUM)
stp += 1
if changed[0] == 0.0 or stp > 10000:
break
# Isolated filled pockets (single cells / tiny clusters that share a
# watershed's pitID+spill but have no pit-connected path to it at the
# fill level) stay undirected: they pond locally and become safe
# self-sinks below (mass conserved). Count them (owned only) so the
# magnitude stays visible; this set is now partition-invariant.
if self.verbose:
unreached = owned & (self.pitIDs > -1) & (pdir < 0)
ncnt = np.array([float(np.count_nonzero(unreached))])
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, ncnt, op=MPI.SUM)
if MPIrank == 0:
print(
"[pitfill] %d isolated flat node(s) had no pit-path to a "
"spill -> left as sinks (pond locally; mass conserved)"
% int(ncnt[0]),
flush=True,
)
# Final halo refresh so the receiver pick sees `ptdir` consistent across
# partition boundaries (the last nghb_dir advanced owned nodes only,
# leaving ghost directions one pass stale).
self.tmp.setArray(pdir[self.glIDs])
self.dm.globalToLocal(self.tmp, self.tmpL, 3)
pdir = self.tmpL.getArray().copy().astype(int)
# Define receiver nodes on each depression — PARTITION-INVARIANT.
#
# Each flat node routes to a neighbour strictly closer to the spill
# (smaller `pdir`, not higher: lFill[c] <= lFill[i]). `pdir` is itself
# partition-invariant, but in a flat region MANY neighbours tie at the
# minimum distance, and the fortran `fill_rcvs` broke that tie by
# "first in FVnID neighbour order" — which is partition-DEPENDENT (local
# numbering/halo), so the same node picked a different (equally-valid)
# receiver at different rank counts (~0.5% of nodes on a 370k earth
# mesh). That made the assembled (I - W^T) operator partition-dependent
# → the near-singular pinches / KSP failures that varied with the cut.
#
# We resolve the tie on `locIDs` (the input-mesh id), which IS
# partition-invariant (unlike `self.gid`, PETSc's per-partition
# numbering). Among the strictly-closer, not-higher neighbours we pick
# the smallest `pdir`, ties broken by smallest `locIDs` — identical
# across any decomposition. Replaces the fortran `fill_rcvs` tie-break;
# non-tied cases are unchanged.
ngb = self.FVmesh_ngbID # (n, K) local ids, -1 pad
n = self.lpoints
valid = ngb >= 0
cidx = np.where(valid, ngb, 0)
cpd = pdir[cidx]
ch = self.lFill[cidx]
clid = self.locIDs[cidx].astype(np.int64)
cand = (
valid
& (cpd > -1)
& (cpd < pdir[:, None])
& (ch <= self.lFill[:, None])
& (self.pitIDs[:, None] > -1)
)
BIG = np.iinfo(np.int64).max
nid = np.int64(self.locIDs.max()) + 1 # lexicographic (pdir, locID)
comp = np.where(cand, cpd.astype(np.int64) * nid + clid, BIG)
kbest = comp.argmin(axis=1)
rows = np.arange(n)
has = comp[rows, kbest] < BIG
self.flatDirs = np.full(n, -1, dtype=np.int32)
ispit = self.pitIDs > -1
# pit node with a candidate -> chosen receiver; pit node w/o -> self (sink)
self.flatDirs[ispit] = np.where(
has[ispit], ngb[rows, kbest][ispit], rows[ispit]
).astype(np.int32)
self.flatDirs[self.lspillIDs] = -1
return
[docs]
def _getPitParams(self, hl, nbpits):
"""
Define depression global parameters:
- volume of each depression
- maximum filled depth
:arg hl: Numpy Array of unfilled surface elevation
:arg nbpits: number of depression in the global mesh
"""
# Get pit parameters (volume and maximum filled elevation)
ids = self.inIDs == 1
grp = npi.group_by(self.pitIDs[ids])
uids = grp.unique
_, vol = grp.sum((self.lFill[ids] - hl[ids]) * self.larea[ids])
_, hh = grp.max(self.lFill[ids])
_, dh = grp.max(self.lFill[ids] - hl[ids])
totv = np.zeros(nbpits, dtype=np.float64)
hmax = np.full(nbpits, MISSING_DATA_SENTINEL, dtype=np.float64)
diffh = np.zeros(nbpits, dtype=np.float64)
ids = uids > -1
totv[uids[ids]] = vol[ids]
hmax[uids[ids]] = hh[ids]
diffh[uids[ids]] = dh[ids]
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, totv, op=MPI.SUM)
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, hmax, op=MPI.MAX)
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, diffh, op=MPI.MAX)
self.pitParams = np.empty((nbpits, 3), dtype=np.float64)
self.pitParams[:, 0] = totv
self.pitParams[:, 1] = hmax
self.pitParams[:, 2] = diffh
return
[docs]
def fillElevation(self, sed=False):
"""
This function is the main entry point to perform pit filling.
It relies on the following private functions:
- _performFilling
- _pitInformation
:arg sed: boolean specifying if the pits are filled with water or sediments.
"""
tfill = process_time()
hl = self.hLocal.getArray().copy()
minh = self.hGlobal.min()[1]
if not self.flatModel:
minh += 1.0e-3 # TODO-REFACTOR: value matches DEPOSIT_FLOOR but distinct role (minh epsilon nudge for fillElevation); do not replace
level = max(minh, self.sealevel + self.oFill)
self._performFilling(hl - level, level, sed)
self.lFill += level
self._pitInformation(hl, level, sed)
# Compute discrete depth-volume curves per pit. Used by water filling
# (sed=False) for routing residual flux, and by the bottom-up
# sediment fill in _updateSinks (sed=True) to find the lake-surface
# level that matches the deposited volume.
ids = self.pitParams[:, 0] > 0.0
dh = np.zeros((len(self.pitInfo), 6), dtype=np.float64)
dh[ids, 0] = self.pitParams[ids, 1] - self.pitParams[ids, 2]
dh[ids, 1:] = np.expand_dims(self.pitParams[ids, 2] / 5.0, axis=1)
self.filled_lvl = np.cumsum(dh, axis=1)[:, 1:]
self.filled_vol = np.zeros((len(self.pitInfo), 5), dtype=np.float64)
hl_clip = hl.copy()
if not sed:
hl_clip[hl_clip < self.sealevel] = self.sealevel
self.filled_vol[:, :-1] = getpitvol(
self.filled_lvl[:, :-1], hl_clip, self.pitIDs, self.inIDs
)
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, self.filled_vol, op=MPI.SUM)
self.filled_vol[:, -1] = self.pitParams[:, 0]
if MPIrank == 0 and self.verbose:
print(
"Handling depressions over the surface (%0.02f seconds)"
% (process_time() - tfill)
)
if self.memclear:
del hl, minh
gc.collect()
return
[docs]
def fillIceElevation(self, hl):
"""
This function is the main entry point to perform pit filling for glacier.
It relies on the following private functions:
- _performFilling
- _pitInformation
:arg sed: boolean specifying if the pits are filled with water or sediments.
"""
tfill = process_time()
minh = self.hGlobal.min()[1]
if not self.flatModel:
minh += 1.0e-3 # TODO-REFACTOR: value matches DEPOSIT_FLOOR but distinct role (minh epsilon nudge for fillIceElevation); do not replace
level = max(minh, self.sealevel + self.oFill)
self._performFilling(hl - level, level, False)
self.lFill += level
self._pitInformation(hl, level, False)
# Define specific filling levels for unfilled water depressions
# if not False:
ids = self.pitParams[:, 0] > 0.0
dh = np.zeros((len(self.pitInfo), 6), dtype=np.float64)
dh[ids, 0] = self.pitParams[ids, 1] - self.pitParams[ids, 2]
dh[ids, 1:] = np.expand_dims(self.pitParams[ids, 2] / 5.0, axis=1)
self.filled_lvl = np.cumsum(dh, axis=1)[:, 1:]
self.filled_vol = np.zeros((len(self.pitInfo), 5), dtype=np.float64)
hl[hl < self.sealevel] = self.sealevel
self.filled_vol[:, :-1] = getpitvol(
self.filled_lvl[:, :-1], hl, self.pitIDs, self.inIDs
)
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, self.filled_vol, op=MPI.SUM)
self.filled_vol[:, -1] = self.pitParams[:, 0]
if MPIrank == 0 and self.verbose:
print(
"Handling ice depressions over the smooth surface (%0.02f seconds)"
% (process_time() - tfill)
)
if self.memclear:
del minh
gc.collect()
return