import os
import gc
import sys
import petsc4py
import numpy as np
import pandas as pd
import numpy_indexed as npi
from mpi4py import MPI
from time import process_time
from gospl.tools.constants import GRAPH_OUTLIER_CAP, MISSING_DATA_SENTINEL
if "READTHEDOCS" not in os.environ:
from gospl._fortran import edge_tile
from gospl._fortran import fill_tile
from gospl._fortran import fill_edges
from gospl._fortran import fill_dir
from gospl._fortran import nghb_dir
from gospl._fortran import fill_rcvs
from gospl._fortran import getpitvol
from gospl._fortran import pits_cons
from gospl._fortran import fill_depressions
from gospl._fortran import graph_nodes
from gospl._fortran import combine_edges
from gospl._fortran import label_pits
from gospl._fortran import spill_pts
from gospl._fortran import sort_ids
MPIrank = petsc4py.PETSc.COMM_WORLD.Get_rank()
MPIsize = petsc4py.PETSc.COMM_WORLD.Get_size()
[docs]
class PITFill(object):
"""
Depression filling is an important preconditioning step to many landscape evolution models.
This class implements a linearly-scaling parallel priority-flood depression-filling algorithm based on `Barnes (2016) <https://arxiv.org/pdf/1606.06204.pdf>`_ algorithm.
.. note::
Unlike previous algorithms, `Barnes (2016) <https://arxiv.org/pdf/1606.06204.pdf>`_ approach guarantees a fixed number of memory access and communication events per processors.
The approach proposed in goSPL handles irregular meshes, allowing for complex distributed meshes to be used as long as a clear definition of inter-mesh connectivities is available. It also creates directions over flat regions allowing for downstream flows in cases where the entire volume of a depression is filled.
For inter-mesh connections and message passing, the approach relies on PETSc DMPlex functions.
The main functions return the following parameters:
- the elevation of the filled surface,
- the information for each depression (e.g., a unique global ID, its spillover local points and related processor),
- the description of each depression (total volume and maximum filled depth).
"""
def __init__(self, *args, **kwargs):
"""
The initialisation of `PITFill` class consists in the declaration of PETSc vectors, matrices and each partition internals edge vertices.
"""
# Petsc vectors
edges = -np.ones((self.lpoints, 2), dtype=int)
edges[self.idLBounds, 0] = self.idLBounds
edges[self.idBorders, 0] = self.idBorders
edges[self.idLBounds, 1] = 0
edges[self.idBorders, 1] = 1
self.borders = edges
self.outEdges = np.zeros(self.lpoints, dtype=int)
self.outEdges[self.ghostIDs] = 1
return
[docs]
def _buildPitDataframe(self, label1, label2):
"""
Definition of a Pandas Dataframe used to find unique pit ID across processors.
:arg label1: depression ID in a given processor
:arg label2: same depression ID in a neighbouring mesh
:return: df (sorted Dataframe of pit ID between processors)
"""
data = {
"p1": label1,
"p2": label2,
}
df = pd.DataFrame(data, columns=["p1", "p2"])
df = df.drop_duplicates().sort_values(["p2", "p1"], ascending=(False, False))
return df
[docs]
def _sortingPits(self, df):
"""
Sorts depressions number before combining them to ensure no depression index is changed in an unsorted way.
:arg df: Pandas Dataframe containing depression numbers which have to be combined.
:return: df sorted pandas dataframe containing depression numbers.
"""
df["p2"] = sort_ids(df["p1"].values.astype(int), df["p2"].values.astype(int))
df = df.drop_duplicates().sort_values(["p2", "p1"], ascending=(False, False))
return df
[docs]
def _offsetGlobal(self, lgth):
"""
Computes the offset between processors to ensure unique number for considered indices.
:arg lgth: local length of the data to distribute
:return: cumulative sum and sum of the labels to add to each processor
"""
label_offset = np.zeros(MPIsize + 1, dtype=int)
label_offset[MPIrank + 1] = lgth
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, label_offset, op=MPI.MAX)
return np.cumsum(label_offset), np.sum(label_offset)
[docs]
def _fillFromEdges(self, mgraph):
"""
Combine local meshes by joining their edges based on local spillover graphs.
:arg mgraph: Numpy Array containing local mesh edges information
:arg ggraph: Numpy Array containing filled elevation values based on other processors values
"""
# Get bidirectional edges connections
cgraph = pd.DataFrame(
mgraph, columns=["source", "target", "elev", "spill", "rank"]
)
cgraph = cgraph.sort_values("elev")
cgraph = cgraph.drop_duplicates(["source", "target"], keep="first")
c12 = np.concatenate((cgraph["source"].values, cgraph["target"].values))
cmax = np.max(np.bincount(c12.astype(int))) + 1
# Filling the bidirectional graph
cgraph = cgraph.values
elev, rank, nodes, spillID = fill_edges(
int(max(cgraph[:, 1]) + 2), cgraph, cmax
)
ggraph = -np.ones((len(elev), 5))
ggraph[:, 0] = elev
ggraph[:, 1] = nodes
ggraph[:, 2] = rank
ggraph[:, 3] = spillID
if self.memclear:
del elev, nodes, rank
del spillID, c12
return ggraph
[docs]
def _transferIDs(self, pitIDs):
"""
This function transfers local depression IDs along local borders and combines them with a unique identifier.
:arg pitIDs: local depression index.
:return: number of depressions.
"""
# Define globally unique watershed index
t0 = process_time()
fillIDs = pitIDs >= 0
offset, _ = self._offsetGlobal(np.amax(pitIDs))
pitIDs[fillIDs] += offset[MPIrank]
if MPIrank == 0 and self.verbose:
print(
"Offset Global pit ids (%0.02f seconds)" % (process_time() - t0)
)
t0 = process_time()
# Transfer depression IDs along local borders
self.tmpL.setArray(pitIDs)
self.dm.localToGlobal(self.tmpL, self.tmp)
self.dm.globalToLocal(self.tmp, self.tmpL)
label = self.tmpL.getArray().copy().astype(int)
ids = label < pitIDs
df = self._buildPitDataframe(label[ids], pitIDs[ids])
ids = label > pitIDs
df2 = self._buildPitDataframe(pitIDs[ids], label[ids])
df = pd.concat([df, df2], ignore_index=True)
df = df.drop_duplicates().sort_values(["p2", "p1"], ascending=(False, False))
df = df[(df["p1"] >= 0) & (df["p2"] >= 0)]
if MPIrank == 0 and self.verbose:
print(
"Build pit dataframe (%0.02f seconds)" % (process_time() - t0)
)
t0 = process_time()
# Send depression IDs globally
offset, _ = self._offsetGlobal(len(df))
combIds = -np.ones((np.amax(offset), 2), dtype=int)
if len(df) > 0:
combIds[offset[MPIrank] : offset[MPIrank + 1], :] = df.values
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, combIds, op=MPI.MAX)
df = self._buildPitDataframe(combIds[:, 0], combIds[:, 1])
df = df[(df["p1"] >= 0) & (df["p2"] >= 0)]
if MPIrank == 0 and self.verbose:
print(
"Combine pit dataframe (%0.02f seconds)" % (process_time() - t0)
)
t0 = process_time()
# Sorting label transfer between processors
if len(df) > 1:
sorting = True
stp = 0
while sorting:
df2 = self._sortingPits(df)
sorting = not df.equals(df2)
df = df2.copy()
stp += 1
if stp > 1000:
if MPIrank == 0:
print(
"Pit sorting did not converge after 1000 iterations; continuing.",
flush=True,
)
break
for k in range(len(df)):
label[label == df["p2"].iloc[k]] = df["p1"].iloc[k]
if MPIrank == 0 and self.verbose:
print(
"Sorting pits (%0.02f seconds)" % (process_time() - t0)
)
t0 = process_time()
# Transfer depression IDs along local borders
self.tmpL.setArray(label)
self.dm.localToGlobal(self.tmpL, self.tmp)
self.dm.globalToLocal(self.tmp, self.tmpL)
# At this point all pits have a unique IDs across processors
self.pitIDs = self.tmpL.getArray().astype(int)
# Lets make consecutive indices
pitnbs = np.zeros(1, dtype=int)
pitnbs[0] = np.max(self.pitIDs) + 1
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, pitnbs, op=MPI.MAX)
fillIDs = self.pitIDs >= 0
valpit = -np.ones(pitnbs[0], dtype=int)
unique, idx_groups = npi.group_by(
self.pitIDs[fillIDs], np.arange(len(self.pitIDs[fillIDs]))
)
valpit[unique] = 1
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, valpit, op=MPI.MAX)
pitNb = np.where(valpit > 0)[0]
if MPIrank == 0 and self.verbose:
print(
"Define consecutive pit ids (%0.02f seconds)" % (process_time() - t0)
)
t0 = process_time()
self.pitIDs = pits_cons(self.pitIDs, pitNb)
return pitNb
[docs]
def _dirFlats(self):
"""
This function finds routes to spillover points on filled depressions to ensure downstream distribution if they are overfilled.
"""
# Find local receivers to direct flow to spillover nodes
ids = self.pitIDs > -1
if ids.any():
pdir = fill_dir(self.lspillIDs, self.pitIDs, self.lFill)
else:
pdir = -np.ones(self.lpoints, dtype=np.int32)
# Transfer filled values along the local borders
keep = -np.ones(1)
stp = 0
while keep[0] < 0:
self.tmp.setArray(pdir[self.glIDs])
self.dm.globalToLocal(self.tmp, self.tmpL, 3)
pdir = self.tmpL.getArray().copy().astype(int)
pdir = nghb_dir(self.pitIDs, self.lFill, pdir)
# Check if there are pit nodes without directions
if ids.any():
keep[0] = pdir[ids].min()
else:
keep[0] = 1.0
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, keep, op=MPI.MIN)
stp += 1
if stp > 100:
break
# Define receiver nodes on each depression
self.flatDirs = fill_rcvs(self.pitIDs, self.lFill, pdir)
self.flatDirs[self.lspillIDs] = -1
return
[docs]
def _getPitParams(self, hl, nbpits):
"""
Define depression global parameters:
- volume of each depression
- maximum filled depth
:arg hl: Numpy Array of unfilled surface elevation
:arg nbpits: number of depression in the global mesh
"""
# Get pit parameters (volume and maximum filled elevation)
ids = self.inIDs == 1
grp = npi.group_by(self.pitIDs[ids])
uids = grp.unique
_, vol = grp.sum((self.lFill[ids] - hl[ids]) * self.larea[ids])
_, hh = grp.max(self.lFill[ids])
_, dh = grp.max(self.lFill[ids] - hl[ids])
totv = np.zeros(nbpits, dtype=np.float64)
hmax = np.full(nbpits, MISSING_DATA_SENTINEL, dtype=np.float64)
diffh = np.zeros(nbpits, dtype=np.float64)
ids = uids > -1
totv[uids[ids]] = vol[ids]
hmax[uids[ids]] = hh[ids]
diffh[uids[ids]] = dh[ids]
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, totv, op=MPI.SUM)
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, hmax, op=MPI.MAX)
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, diffh, op=MPI.MAX)
self.pitParams = np.empty((nbpits, 3), dtype=np.float64)
self.pitParams[:, 0] = totv
self.pitParams[:, 1] = hmax
self.pitParams[:, 2] = diffh
return
[docs]
def fillElevation(self, sed=False):
"""
This function is the main entry point to perform pit filling.
It relies on the following private functions:
- _performFilling
- _pitInformation
:arg sed: boolean specifying if the pits are filled with water or sediments.
"""
tfill = process_time()
hl = self.hLocal.getArray().copy()
minh = self.hGlobal.min()[1]
if not self.flatModel:
minh += 1.0e-3 # TODO-REFACTOR: value matches DEPOSIT_FLOOR but distinct role (minh epsilon nudge for fillElevation); do not replace
level = max(minh, self.sealevel + self.oFill)
self._performFilling(hl - level, level, sed)
self.lFill += level
self._pitInformation(hl, level, sed)
# Compute discrete depth-volume curves per pit. Used by water filling
# (sed=False) for routing residual flux, and by the bottom-up
# sediment fill in _updateSinks (sed=True) to find the lake-surface
# level that matches the deposited volume.
ids = self.pitParams[:, 0] > 0.0
dh = np.zeros((len(self.pitInfo), 6), dtype=np.float64)
dh[ids, 0] = self.pitParams[ids, 1] - self.pitParams[ids, 2]
dh[ids, 1:] = np.expand_dims(self.pitParams[ids, 2] / 5.0, axis=1)
self.filled_lvl = np.cumsum(dh, axis=1)[:, 1:]
self.filled_vol = np.zeros((len(self.pitInfo), 5), dtype=np.float64)
hl_clip = hl.copy()
if not sed:
hl_clip[hl_clip < self.sealevel] = self.sealevel
self.filled_vol[:, :-1] = getpitvol(
self.filled_lvl[:, :-1], hl_clip, self.pitIDs, self.inIDs
)
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, self.filled_vol, op=MPI.SUM)
self.filled_vol[:, -1] = self.pitParams[:, 0]
if MPIrank == 0 and self.verbose:
print(
"Handling depressions over the surface (%0.02f seconds)"
% (process_time() - tfill)
)
if self.memclear:
del hl, minh
gc.collect()
return
[docs]
def fillIceElevation(self, hl):
"""
This function is the main entry point to perform pit filling for glacier.
It relies on the following private functions:
- _performFilling
- _pitInformation
:arg sed: boolean specifying if the pits are filled with water or sediments.
"""
tfill = process_time()
minh = self.hGlobal.min()[1]
if not self.flatModel:
minh += 1.0e-3 # TODO-REFACTOR: value matches DEPOSIT_FLOOR but distinct role (minh epsilon nudge for fillIceElevation); do not replace
level = max(minh, self.sealevel + self.oFill)
self._performFilling(hl - level, level, False)
self.lFill += level
self._pitInformation(hl, level, False)
# Define specific filling levels for unfilled water depressions
# if not False:
ids = self.pitParams[:, 0] > 0.0
dh = np.zeros((len(self.pitInfo), 6), dtype=np.float64)
dh[ids, 0] = self.pitParams[ids, 1] - self.pitParams[ids, 2]
dh[ids, 1:] = np.expand_dims(self.pitParams[ids, 2] / 5.0, axis=1)
self.filled_lvl = np.cumsum(dh, axis=1)[:, 1:]
self.filled_vol = np.zeros((len(self.pitInfo), 5), dtype=np.float64)
hl[hl < self.sealevel] = self.sealevel
self.filled_vol[:, :-1] = getpitvol(
self.filled_lvl[:, :-1], hl, self.pitIDs, self.inIDs
)
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE, self.filled_vol, op=MPI.SUM)
self.filled_vol[:, -1] = self.pitParams[:, 0]
if MPIrank == 0 and self.verbose:
print(
"Handling ice depressions over the smooth surface (%0.02f seconds)"
% (process_time() - tfill)
)
if self.memclear:
del minh
gc.collect()
return