"""
Post-processing: publication-quality **stratigraphic sections, wells and
Wheeler diagrams** from a goSPL run (matplotlib, vector PDF/SVG).
Reassembles the global stratigraphic pile (``stratZ``/``stratH`` and, when
present, ``stratHf``/``phiS``/``phiF`` and provenance ``stratP``) plus the
current surface ``elev``, drops the bedrock-sentinel layer, and builds the
**current** layer-interface elevations (``surface = elev``; each interface =
``elev − Σ thickness above``; ``basement = elev − Σ thickness``), so eroded
layers pinch out correctly. On that it offers four products:
* :func:`cross_section` — a vertical section along the **x**- or **y**-direction
or an arbitrary **(x, y) path**: stratigraphy drawn as a coloured layer mesh
(``color_by`` = deposition elevation / thickness / lithology / porosity / age
/ provenance) bounded by **thick surface and basement lines**, sampled at the
mesh resolution.
* :func:`horizontal_slice` — a map at constant **elevation z**: which layer (and
its property) is preserved at that depth (subcrop / structure slice).
* :func:`synthetic_well` — the full stratigraphic column at a point, as a
borehole log coloured by any layer property (:func:`well_panel` draws several
wells side by side on one figure with a shared colour scale).
* :func:`wheeler` — a chronostratigraphic (Wheeler) chart along a transect:
distance vs deposition time, coloured by preserved thickness / composition,
with **hiatuses and erosion shown as gaps**.
Each returns the matplotlib ``Figure``/``Axes`` for paper tuning. CLI
``gospl-section`` (or ``python -m gospl.analyse.stratasection``) with
``--kind cross|slice|well|wheeler``.
"""
import os
import argparse
import numpy as np
# Per-layer colour fields. Each maps to an (n_nodes, n_layers) array.
_COLOR_FIELDS = ("deposition", "thickness", "lithology", "coarse",
"porosity", "age", "provenance", "facies")
_CMAP = {
"deposition": "terrain", "thickness": "viridis", "lithology": "YlGnBu",
"coarse": "YlOrBr", "porosity": "cividis", "age": "Spectral",
"provenance": "tab20",
}
_LABEL = {
"deposition": "deposition elevation (m)", "thickness": "layer thickness (m)",
"lithology": "fine fraction", "coarse": "coarse fraction",
"porosity": "bulk porosity", "age": "layer (oldest=0)",
"provenance": "dominant source class", "facies": "depositional facies",
}
_SENTINEL = 1.0e5
# Depositional-facies defaults (tunable). The facies of a layer is set by the
# water depth at the time of deposition, depth = sea_level - deposition_elev
# (stratZ): subaerial (< 0) = fluvial/deltaic plain, then the increasing-depth
# marine facies. `_FACIES_DEPTHS` are the bin edges (m below sea level), giving
# len+1 facies; `_FACIES_COLORS`/`_FACIES_LABELS` map one entry per facies.
_FACIES_DEPTHS = (0.0, 20.0, 50.0, 75.0)
_FACIES_COLORS = ("limegreen", "darkkhaki", "sandybrown", "khaki", "c", "teal")
_FACIES_LABELS = ("fluvial / deltaic plain", "shoreface", "distal offshore",
"upper slope", "lower slope")
def facies_field(data, sea_level=None, depths=_FACIES_DEPTHS):
"""
Per-(node, layer) **depositional-facies index** from the water depth at the
time of deposition, ``depth = sea_level - stratZ`` (``stratZ`` is the
recorded deposition elevation, i.e. the paleo-seabed). Binned by ``depths``
(m below sea level), facies 0 is the shallowest (subaerial). Eroded/absent
layers (``stratH <= 0``) are NaN.
:arg sea_level: scalar, or a per-layer array (length ``nlayers``) for a
time-varying paleo sea level; defaults to the step's sea level
(``data['sea_level']``), else 0.
:arg depths: increasing facies bin edges (m below sea level).
"""
if sea_level is None:
sea_level = data.get("sea_level")
if sea_level is None:
sea_level = 0.0
sl = np.asarray(sea_level, dtype=float)
if sl.ndim == 1: # per-layer paleo sea level
sl = sl[None, :]
depth = sl - data["stratZ"] # >0 below sea level
idx = np.digitize(depth, np.asarray(depths, dtype=float)).astype(float)
idx[data["stratH"] <= 0] = np.nan
return idx
def _facies_cmap_norm(depths, colors):
"""Discrete ListedColormap + BoundaryNorm for ``len(depths)+1`` facies."""
from matplotlib.colors import ListedColormap, BoundaryNorm
nfac = len(depths) + 1
fcolors = list(colors)[:nfac]
cmap = ListedColormap(fcolors)
norm = BoundaryNorm(np.arange(-0.5, nfac + 0.5, 1.0), cmap.N)
return cmap, norm, fcolors, nfac
def _facies_legend(ax, fcolors, labels, nfac, loc="lower left", extra=None):
"""Discrete facies legend (one patch per facies, plus any ``extra`` handles
such as the shoreline line) on ``ax``."""
from matplotlib.patches import Patch
flabels = list(labels)[:nfac]
handles = [Patch(facecolor=fcolors[i], label=flabels[i])
for i in range(min(nfac, len(flabels)))]
if extra:
handles += list(extra)
ax.legend(handles=handles, loc=loc, fontsize=8, framealpha=0.9,
title="facies")
def _effective_dt(data, tstart, dt):
"""
Resolve the per-layer time increment (yr). If ``dt`` is given, use it.
Otherwise derive it from the step's **stratal display time**
(``data['time']`` read from the .xmf) so the age / Wheeler time axis is the
real simulation time: the surface layer maps to ``data['time']`` and the
spacing is ``(time - tstart) / (lo + nlayers - 1)`` (the surface's global
layer index). Falls back to 1.0 (unitless layer count) if no time is known.
"""
if dt is not None:
return dt
T = data.get("time")
if T is None:
return 1.0
denom = max(data["lo"] + data["nlayers"] - 1, 1)
return (float(T) - tstart) / denom
def _km(ax, axis="x", geographic=False):
"""Display a distance axis in **km** (data stays in m). No-op for geographic
(lon/lat) meshes, where the axis is in degrees."""
if geographic:
return "deg"
from matplotlib.ticker import FuncFormatter
fmt = FuncFormatter(lambda v, _: "%g" % (v / 1000.0))
(ax.xaxis if axis == "x" else ax.yaxis).set_major_formatter(fmt)
return "km"
def _ky(ax, axis="y"):
"""Display a time axis in **ky** (data stays in yr)."""
from matplotlib.ticker import FuncFormatter
fmt = FuncFormatter(lambda v, _: "%g" % (v / 1000.0))
(ax.xaxis if axis == "x" else ax.yaxis).set_major_formatter(fmt)
return "ky"
[docs]
def load_strata(h5dir, mesh, step=None, vkey="v", ckey="c", file_base="gospl",
strat_base="stratal"):
"""
Reassemble the global stratigraphic pile + current surface for one step.
:return: a dict with the mesh (``coords``/``cells``/``x``/``y``,
``geographic``), ``elev`` (n,), per-layer ``stratZ``/``stratH`` and
optional ``stratHf``/``phiS``/``phiF``/``stratP`` (n, L) with the bedrock
sentinel dropped, ``interfaces`` (n, L+1) current interface elevations
(basement..surface), ``nlayers`` and ``lo`` (first kept layer).
"""
import glob
import h5py
from scipy.spatial import cKDTree
data = np.load(mesh)
coords = np.asarray(data[vkey], dtype=np.float64)
cells = np.asarray(data[ckey], dtype=np.int64)
npts = coords.shape[0]
r = np.linalg.norm(coords, axis=1)
geographic = bool(r.mean() > 1.0e5 and r.std() / max(r.mean(), 1.0) < 1.0e-3)
if geographic:
R = float(r.mean())
x = np.degrees(np.arctan2(coords[:, 1], coords[:, 0]))
y = np.degrees(np.arcsin(np.clip(coords[:, 2] / R, -1.0, 1.0)))
else:
x, y = coords[:, 0], coords[:, 1]
if step is None:
step = max(
int(os.path.basename(f).split(".")[1])
for f in glob.glob(os.path.join(h5dir, "%s.*.p*.h5" % strat_base))
)
tree = cKDTree(coords)
# First pass: discover layer count C from the stratal file.
sparts = sorted(glob.glob(os.path.join(h5dir, "%s.%d.p*.h5" % (strat_base, step))))
if not sparts:
raise FileNotFoundError("no %s.%d.p*.h5 in %s" % (strat_base, step, h5dir))
with h5py.File(sparts[0], "r") as f:
L = f["stratH"].shape[1]
has_litho = "stratHf" in f and "phiF" in f
has_prov = "stratP" in f
nprov = f["stratP"].shape[2] if has_prov else 0
fields = {k: np.full((npts, L), np.nan)
for k in ("stratZ", "stratH", "phiS")}
if has_litho:
fields["stratHf"] = np.full((npts, L), np.nan)
fields["phiF"] = np.full((npts, L), np.nan)
prov = np.full((npts, L, nprov), np.nan) if has_prov else None
elev = np.full(npts, np.nan)
for pth in sparts:
p = os.path.basename(pth).split(".p")[-1].split(".")[0]
with h5py.File(os.path.join(h5dir, "topology.p%s.h5" % p), "r") as tf:
lc = np.asarray(tf["coords"], dtype=np.float64)
_, idx = tree.query(lc)
with h5py.File(pth, "r") as f:
for k in fields:
if k in f:
fields[k][idx] = np.asarray(f[k])
if has_prov:
prov[idx] = np.asarray(f["stratP"])
mpath = os.path.join(h5dir, "%s.%d.p%s.h5" % (file_base, step, p))
if os.path.exists(mpath):
with h5py.File(mpath, "r") as f:
if "elev" in f:
elev[idx] = np.asarray(f["elev"])[:, 0]
# Drop the bedrock sentinel (and any leading huge layer).
lo = 0
while lo < L and np.nanmax(fields["stratH"][:, lo]) >= _SENTINEL:
lo += 1
sl = slice(lo, L)
stratH = np.nan_to_num(fields["stratH"][:, sl])
out = {
"coords": coords, "cells": cells, "x": x, "y": y,
"geographic": geographic, "elev": elev,
"stratZ": fields["stratZ"][:, sl], "stratH": stratH,
"phiS": fields["phiS"][:, sl],
"stratHf": fields["stratHf"][:, sl] if has_litho else None,
"phiF": fields["phiF"][:, sl] if has_litho else None,
"stratP": prov[:, sl, :] if has_prov else None,
"nlayers": L - lo, "lo": lo, "step": step,
}
# Sea level for this step, read from the simulation (the step's .xmf `sea`
# constant); None if the file is absent. Used as the default sea level for
# cross_section / wheeler so the shoreline datum matches the run without a
# manual --sea-level.
# Display time (years) of this step, also read from the .xmf, so the age /
# Wheeler time axis is the real simulation time without a manual --strat-dt.
try:
from gospl.analyse.gridexport import _read_sealevel, _read_time
out["sea_level"] = _read_sealevel(h5dir, file_base, step)
out["time"] = _read_time(h5dir, file_base, step)
except Exception:
out["sea_level"] = None
out["time"] = None
# Current interface elevations (basement .. surface).
above = np.cumsum(stratH[:, ::-1], axis=1)[:, ::-1] - stratH # above each layer
top = elev[:, None] - above # top of each layer
out["interfaces"] = np.column_stack([top[:, :1] - stratH[:, :1], top])
return out
def color_field(data, color_by, tstart=0.0, dt=1.0):
"""Per-(node, layer) array, colormap and label for a ``color_by`` choice."""
H = data["stratH"]
if color_by in ("lithology", "fine"):
f = data["stratHf"]
if f is None:
raise ValueError("color_by='lithology' needs dual-lithology output")
with np.errstate(divide="ignore", invalid="ignore"):
arr = np.where(H > 0, f / H, np.nan)
elif color_by == "coarse":
f = data["stratHf"]
if f is None:
raise ValueError("color_by='coarse' needs dual-lithology output")
with np.errstate(divide="ignore", invalid="ignore"):
arr = np.where(H > 0, 1.0 - f / H, np.nan)
elif color_by == "thickness":
arr = np.where(H > 0, H, np.nan)
elif color_by == "deposition":
arr = data["stratZ"]
elif color_by == "porosity":
if data["phiF"] is not None:
with np.errstate(divide="ignore", invalid="ignore"):
ff = np.where(H > 0, data["stratHf"] / H, 0.0)
arr = (1 - ff) * data["phiS"] + ff * data["phiF"]
else:
arr = data["phiS"]
elif color_by == "age":
arr = np.broadcast_to(
tstart + (data["lo"] + np.arange(data["nlayers"])) * dt,
H.shape,
).astype(float)
elif color_by == "provenance":
if data["stratP"] is None:
raise ValueError("color_by='provenance' needs provenance output")
arr = data["stratP"].argmax(axis=2).astype(float)
arr[H <= 0] = np.nan
else:
raise ValueError("unknown color_by %r (choose from %s)"
% (color_by, ", ".join(_COLOR_FIELDS)))
return arr, _CMAP.get(color_by, "viridis"), _LABEL.get(color_by, color_by)
def _transect(data, kind="x", at=None, path=None, npts=None):
"""
Sample coordinates along a transect: ``kind='x'`` (constant y, spans x),
``kind='y'`` (constant x, spans y), or an explicit ``path`` of (x, y)
waypoints. ``npts`` defaults to the mesh resolution. Returns (xs, ys, dist).
"""
x, y = data["x"], data["y"]
if npts is None:
e = np.vstack([data["cells"][:, [0, 1]], data["cells"][:, [1, 2]]])
res = float(np.median(np.linalg.norm(
data["coords"][e[:, 0], :2] - data["coords"][e[:, 1], :2], axis=1)))
if data["geographic"]:
res = float(np.median(np.abs(np.diff(np.sort(np.unique(x))))) or 1.0)
if path is not None:
pts = np.asarray(path, dtype=float)
seg = np.r_[0, np.cumsum(np.linalg.norm(np.diff(pts, axis=0), axis=1))]
n = npts or max(2, int(seg[-1] / res) + 1)
d = np.linspace(0, seg[-1], n)
xs = np.interp(d, seg, pts[:, 0])
ys = np.interp(d, seg, pts[:, 1])
return xs, ys, d
if kind == "x":
at = 0.5 * (y.min() + y.max()) if at is None else at
n = npts or max(2, int((x.max() - x.min()) / res) + 1)
xs = np.linspace(x.min(), x.max(), n)
ys = np.full(n, at)
elif kind == "y":
at = 0.5 * (x.min() + x.max()) if at is None else at
n = npts or max(2, int((y.max() - y.min()) / res) + 1)
ys = np.linspace(y.min(), y.max(), n)
xs = np.full(n, at)
else:
raise ValueError("kind must be 'x', 'y', or pass path=")
d = np.hypot(xs - xs[0], ys - ys[0])
return xs, ys, d
def _interp_layers(data, arr, xs, ys):
"""Interpolate a per-(node, layer) array onto transect points -> (npts, L)."""
import matplotlib.tri as mtri
tri = mtri.Triangulation(data["x"], data["y"], data["cells"])
out = np.full((xs.size, arr.shape[1]), np.nan)
for k in range(arr.shape[1]):
out[:, k] = mtri.LinearTriInterpolator(tri, arr[:, k])(xs, ys)
return out
[docs]
def cross_section(data, kind="x", at=None, path=None, color_by="lithology",
npts=None, vexag=1.0, sea_level=None, cmap=None, ax=None,
tstart=0.0, dt=None, figsize=None, layer_lines=0,
layer_line_kw=None, facies_depths=_FACIES_DEPTHS,
facies_colors=_FACIES_COLORS, facies_labels=_FACIES_LABELS,
legend_loc="lower left", title_fontsize=None, xlim=None,
ylim=None):
"""
Vertical stratigraphic section along x / y / a path. Layers are drawn as a
coloured ``pcolormesh`` (pinching where eroded) between the current
interfaces, bounded by thick **surface** and **basement** lines, with a
light-grey fill below the basement and a solid sea-level datum line drawn at
the back (behind the coloured section).
The y-axis shows the **true elevation** (m). Vertical exaggeration is applied
as a real data aspect ratio (``vexag``), so the tick labels stay true; with
``vexag == 1`` the box is auto-scaled and ``figsize`` controls the look.
:arg figsize: figure size (w, h) inches when a new figure is created.
:arg vexag: vertical exaggeration (data aspect); 1 = auto-scaled (labels are
always true elevation either way).
:arg layer_lines: if > 0, overlay a thin interface line every N layers (e.g.
2 = every other layer) on top of the surface/basement lines.
:arg layer_line_kw: dict of matplotlib line kwargs for those layer lines
(defaults: thin grey).
:arg color_by: any of ``deposition``/``thickness``/``lithology``/``coarse``/
``porosity``/``age``/``provenance``/``facies``. ``facies`` classifies
each layer by the water depth at deposition (see ``facies_field``) and
draws a discrete colour + legend instead of a colour bar.
:arg facies_depths / facies_colors / facies_labels: tunable facies bin edges
(m below sea level), colours and legend labels.
:arg sea_level: datum line + facies depth reference; defaults to the value
read from the simulation for this step (``data["sea_level"]``). Omitted
if the run did not record one.
:arg xlim / ylim: explicit axis ranges ``(min, max)`` in data units (m). When
either is given the view is clipped to exactly those bounds and the
aspect is left **auto** (so ``figsize`` sets the proportions); ``vexag``
only locks the data aspect when neither limit is pinned.
"""
import matplotlib.pyplot as plt
if sea_level is None:
sea_level = data.get("sea_level")
dt = _effective_dt(data, tstart, dt)
facies = color_by == "facies"
xs, ys, dist = _transect(data, kind, at, path, npts)
iface = _interp_layers(data, data["interfaces"], xs, ys) # (npts, L+1)
if facies:
cfld = facies_field(data, sea_level, facies_depths)
label = _LABEL["facies"]
else:
cfld, dcmap, label = color_field(data, color_by, tstart, dt)
cvals = _interp_layers(data, cfld, xs, ys) # (npts, L)
Y = iface.T # true elevation
X = np.broadcast_to(dist, Y.shape)
# flat shading: C is one smaller than the corner mesh in each axis.
C = np.ma.masked_invalid(cvals.T[:, :-1]) # (L, npts-1)
if ax is None:
_, ax = plt.subplots(figsize=figsize or (9, 4))
surf = iface[:, -1]
base = iface[:, 0]
# Sea-level datum: solid line at the BACK of the plot (low zorder) so the
# coloured section is drawn over it and it only shows through gaps / above
# the surface.
if sea_level is not None and np.ndim(sea_level) == 0:
ax.axhline(float(sea_level), color="steelblue", lw=1.0, zorder=0)
# Light-grey basement: fill everything below the basement line. Drawn at the
# back so the section sits on top of it; extend the y-limit down so it
# reaches the bottom of the axes with no white gap.
if np.isfinite(base).any():
ylo = float(np.nanmin(base))
rng = float(np.nanmax(surf) - ylo) if np.isfinite(surf).any() else 0.0
ylo -= 0.03 * rng if rng > 0 else 1.0
ax.fill_between(dist, ylo, base, color="0.85", zorder=0.5,
label="basement")
if facies:
fcmap, norm, fcolors, nfac = _facies_cmap_norm(facies_depths, facies_colors)
# interpolated facies indices are continuous; snap to the nearest class.
C = np.ma.masked_invalid(np.round(cvals.T[:, :-1]))
mesh = ax.pcolormesh(X, Y, C, cmap=fcmap, norm=norm, shading="flat",
zorder=2)
_facies_legend(ax, fcolors, facies_labels, nfac, loc=legend_loc)
else:
mesh = ax.pcolormesh(X, Y, C, cmap=cmap or dcmap, shading="flat",
zorder=2)
ax.figure.colorbar(mesh, ax=ax, label=label, shrink=0.8)
ax.plot(dist, surf, color="k", lw=2.2, zorder=5, label="surface")
ax.plot(dist, base, color="0.25", lw=2.2, zorder=5)
if layer_lines and layer_lines > 0:
lkw = {"color": "0.4", "lw": 0.5, "alpha": 0.7, "zorder": 4}
lkw.update(layer_line_kw or {})
# interface columns are basement(0) .. surface(L); draw interior ones
# every `layer_lines` layers (skip basement/surface, already drawn).
for j in range(layer_lines, iface.shape[1] - 1, layer_lines):
ax.plot(dist, iface[:, j], **lkw)
# Axis limits / aspect. If the user pins xlim and/or ylim we honour them
# EXACTLY (the view is clipped to those bounds) and leave the aspect auto so
# `figsize` sets the proportions — a locked data aspect (`set_aspect`) would
# otherwise re-adjust the limits at draw time and override them. With no
# pinned limits, `vexag` locks a true vertical exaggeration (adjustable=
# "datalim" keeps the box at `figsize`; the default "box" would resize it).
pinned = xlim is not None or ylim is not None
if vexag and vexag != 1 and not pinned:
ax.set_aspect(vexag, adjustable="datalim")
else:
ax.set_aspect("auto")
if xlim is not None:
ax.set_xlim(*xlim)
if ylim is not None:
ax.set_ylim(*ylim)
u = _km(ax, "x", data["geographic"])
ax.set_xlabel("distance along %s (%s)" % ("path" if path is not None else kind, u))
ax.set_ylabel("elevation (m)")
ax.set_title("Stratigraphic cross-section (%s)" % color_by,
fontsize=title_fontsize)
return ax
[docs]
def horizontal_slice(data, z, color_by="age", cmap=None, ax=None,
tstart=0.0, dt=None, figsize=None, title_fontsize=None):
"""
Map at constant elevation ``z``: colour each location by the property of the
layer preserved at that elevation (subcrop / structure slice). Cells where z
is above the surface or below the basement are blank.
"""
import matplotlib.pyplot as plt
import matplotlib.tri as mtri
dt = _effective_dt(data, tstart, dt)
iface = data["interfaces"] # (n, L+1)
cfld, dcmap, label = color_field(data, color_by, tstart, dt) # (n, L)
# Which layer interval contains z, per node.
below = iface <= z # (n, L+1)
k = below.sum(axis=1) - 1 # layer index
inside = (k >= 0) & (k < data["nlayers"])
val = np.full(data["x"].shape, np.nan)
rows = np.where(inside)[0]
val[rows] = cfld[rows, k[rows]]
if ax is None:
_, ax = plt.subplots(figsize=figsize or (7, 5))
tri = mtri.Triangulation(data["x"], data["y"], data["cells"])
tp = ax.tripcolor(tri, np.ma.masked_invalid(val), cmap=cmap or dcmap,
shading="gouraud")
ax.figure.colorbar(tp, ax=ax, label=label, shrink=0.8)
ax.set_aspect("equal")
ux = _km(ax, "x", data["geographic"])
_km(ax, "y", data["geographic"])
ax.set_xlabel("x (%s)" % ux)
ax.set_ylabel("y (%s)" % ux)
ax.set_title("Layer %s preserved at z = %g m" % (color_by, z),
fontsize=title_fontsize)
return ax
[docs]
def synthetic_well(data, x, y, color_by="lithology", cmap=None, ax=None,
tstart=0.0, dt=None, width=1.0, figsize=None,
title_fontsize=None, cbar_orientation="horizontal",
vmin=None, vmax=None, colorbar=True, ylim=None):
"""
Synthetic well at ``(x, y)``: the stratigraphic column drawn as stacked
intervals (true current elevations) coloured by a layer property.
To draw **several wells on one figure** with a shared colour scale, pass each
an ``ax=`` (your own subplots) plus common ``vmin``/``vmax`` and
``colorbar=False``, then add a single colour bar — or just use
:func:`well_panel`, which does this for a list of locations.
:arg cbar_orientation: ``'horizontal'`` (default, colour bar across the base)
or ``'vertical'``.
:arg vmin / vmax: colour-scale limits; default is this well's own data range
(pass shared values to make several wells comparable).
:arg colorbar: draw the per-well colour bar (set ``False`` when sharing one).
:arg ylim: elevation range ``(min, max)``; default is this well's own
basement-to-surface span.
:arg title_fontsize: font size for the title (matplotlib default if None).
"""
import matplotlib.pyplot as plt
from matplotlib import cm, colors
dt = _effective_dt(data, tstart, dt)
iface = _interp_layers(data, data["interfaces"],
np.array([x]), np.array([y]))[0] # (L+1,)
cfld, dcmap, label = color_field(data, color_by, tstart, dt)
cval = _interp_layers(data, cfld, np.array([x]), np.array([y]))[0] # (L,)
finite = cval[np.isfinite(cval)]
lo = vmin if vmin is not None else (finite.min() if finite.size else 0.0)
hi = vmax if vmax is not None else (finite.max() if finite.size else 1.0)
norm = colors.Normalize(lo, hi)
cmp = plt.get_cmap(cmap or dcmap)
if ax is None:
_, ax = plt.subplots(figsize=figsize or (2.6, 6))
for k in range(data["nlayers"]):
z0, z1 = iface[k], iface[k + 1]
if not np.isfinite(z0) or not np.isfinite(z1) or z1 - z0 <= 0:
continue
col = cmp(norm(cval[k])) if np.isfinite(cval[k]) else "white"
ax.add_patch(plt.Rectangle((0, z0), width, z1 - z0, facecolor=col,
edgecolor="0.5", lw=0.3))
ax.axhline(iface[-1], color="k", lw=2.0) # surface
ax.axhline(iface[0], color="0.25", lw=2.0) # basement
ax.set_xlim(0, width)
if ylim is not None:
ax.set_ylim(*ylim)
else:
ax.set_ylim(np.nanmin(iface), np.nanmax(iface))
ax.set_xticks([])
ax.set_ylabel("elevation (m)")
if colorbar:
sm = cm.ScalarMappable(norm=norm, cmap=cmp)
if cbar_orientation == "horizontal":
# A well is usually drawn tall & very narrow (e.g. figsize=(1, 8));
# a default horizontal colour bar there is squished with overlapping
# ticks. Use a thicker bar (low aspect), only a few ticks and a
# small font so it stays readable.
from matplotlib.ticker import MaxNLocator
cb = ax.figure.colorbar(sm, ax=ax, location="bottom", fraction=0.08,
pad=0.06, aspect=8)
cb.ax.xaxis.set_major_locator(MaxNLocator(nbins=3))
cb.ax.tick_params(labelsize=6)
cb.set_label(label, fontsize=7)
else:
ax.figure.colorbar(sm, ax=ax, label=label, shrink=0.8)
ax.set_title("Well (%.0f, %.0f)" % (x, y), fontsize=title_fontsize)
return ax
[docs]
def well_panel(data, locations, color_by="lithology", cmap=None, tstart=0.0,
dt=None, width=1.0, figsize=None, title_fontsize=None,
labels=None, sharey=True, vmin=None, vmax=None, ylim=None):
"""
Draw **several synthetic wells side by side on one figure**, with a shared
colour scale and a single colour bar at the base.
:arg locations: list of ``(x, y)`` well positions.
:arg labels: per-well titles; default ``"well 1"``, ``"well 2"``, … (pass a
list to name them, one entry per location).
:arg color_by: layer property coloured in every well (shared scale).
:arg vmin / vmax: colour-scale limits; default is the field's finite range
across all the wells.
:arg ylim: elevation range ``(min, max)`` for every well; default is the
**union of the wells' elevation bounds** (basement of the deepest to
surface of the highest), so the panel spans the full range between the
defined wells.
:arg sharey: share the elevation axis across wells (default True).
:return: ``(fig, axes)``.
"""
import matplotlib.pyplot as plt
from matplotlib import cm, colors
dt = _effective_dt(data, tstart, dt)
cfld, dcmap, label = color_field(data, color_by, tstart, dt)
cmp = plt.get_cmap(cmap or dcmap)
xs = np.array([p[0] for p in locations], dtype=float)
ys = np.array([p[1] for p in locations], dtype=float)
# Shared colour scale: the field's finite range across all wells, unless the
# caller pins vmin/vmax.
if vmin is None or vmax is None:
vals = _interp_layers(data, cfld, xs, ys) # (nwell, L)
fin = vals[np.isfinite(vals)]
gmin, gmax = (float(fin.min()), float(fin.max())) if fin.size else (0.0, 1.0)
vmin = gmin if vmin is None else vmin
vmax = gmax if vmax is None else vmax
# Default elevation range = union of all wells' interface elevations, so
# every column is shown over the full elevation span between the wells.
if ylim is None:
ifc = _interp_layers(data, data["interfaces"], xs, ys) # (nwell, L+1)
if np.isfinite(ifc).any():
ylim = (float(np.nanmin(ifc)), float(np.nanmax(ifc)))
n = len(locations)
fig, axes = plt.subplots(1, n, figsize=figsize or (1.4 * n + 0.6, 7),
sharey=sharey, squeeze=False)
axes = axes[0]
for i, (wx, wy) in enumerate(locations):
synthetic_well(data, wx, wy, color_by=color_by, cmap=cmap, ax=axes[i],
tstart=tstart, dt=dt, width=width, vmin=vmin, vmax=vmax,
colorbar=False, title_fontsize=title_fontsize)
# Default title "well N"; `labels` overrides (overwrites the per-well
# "Well (x, y)" set by synthetic_well).
ttl = labels[i] if labels is not None else "well %d" % (i + 1)
axes[i].set_title(ttl, fontsize=title_fontsize)
if ylim is not None:
axes[i].set_ylim(*ylim)
if i > 0 and sharey:
axes[i].set_ylabel("")
# one shared horizontal colour bar under all the wells
sm = cm.ScalarMappable(norm=colors.Normalize(vmin, vmax), cmap=cmp)
fig.colorbar(sm, ax=list(axes), location="bottom", fraction=0.05, pad=0.08,
aspect=40, label=label)
return fig, axes
[docs]
def wheeler(data, kind="x", at=None, path=None, color_by="thickness",
npts=None, tstart=0.0, dt=None, sea_level=None, cmap=None, ax=None,
figsize=None, facies_depths=_FACIES_DEPTHS,
facies_colors=_FACIES_COLORS, facies_labels=_FACIES_LABELS,
legend_loc="upper right", title_fontsize=None, xlim=None,
ylim=None):
"""
Wheeler (chronostratigraphic) diagram along a transect: distance (x) vs
deposition **time** / layer (y), coloured by ``color_by``; **hiatuses and
erosion are blank** (layers with zero preserved thickness along the
transect).
The **shoreline trajectory** is overlaid — the locus where the deposition
water depth (``sea_level − stratZ``) crosses 0 (the subaerial↔marine
boundary) — showing transgression/regression through time. ``sea_level``
defaults to the value read from the simulation for this step
(``data["sea_level"]``); pass a float, or a **per-layer array** (length
``nlayers``) for a time-varying paleo sea level, or it is omitted if the run
did not record one.
``color_by="facies"`` colours each layer by the depositional facies (water
depth at deposition; see :func:`facies_field`) with a discrete legend —
consistent with the shoreline overlay. ``figsize`` and the ``facies_*`` lists
are tunable.
"""
import matplotlib.pyplot as plt
if sea_level is None:
sea_level = data.get("sea_level")
dt = _effective_dt(data, tstart, dt)
facies = color_by == "facies"
xs, ys, dist = _transect(data, kind, at, path, npts)
thick = _interp_layers(data, data["stratH"], xs, ys) # (npts, L)
if facies:
cfld = facies_field(data, sea_level, facies_depths)
label = _LABEL["facies"]
else:
cfld, dcmap, label = color_field(data, color_by, tstart, dt)
cvals = _interp_layers(data, cfld, xs, ys) # (npts, L)
C = np.where(np.nan_to_num(thick) > 1.0e-6, cvals, np.nan) # gap = hiatus
times = tstart + (data["lo"] + np.arange(data["nlayers"] + 1)) * dt
X = np.broadcast_to(dist, (times.size, dist.size))
Y = np.broadcast_to(times[:, None], X.shape)
if ax is None:
_, ax = plt.subplots(figsize=figsize or (9, 4))
if facies:
fcmap, norm, fcolors, nfac = _facies_cmap_norm(facies_depths, facies_colors)
Cf = np.ma.masked_invalid(np.round(C.T[:, :-1]))
mesh = ax.pcolormesh(X, Y, Cf, cmap=fcmap, norm=norm, shading="flat")
else:
mesh = ax.pcolormesh(X, Y, np.ma.masked_invalid(C.T[:, :-1]),
cmap=cmap or dcmap, shading="flat")
ax.figure.colorbar(mesh, ax=ax, label=label, shrink=0.8)
shoreline = None
if sea_level is not None:
# Shoreline position at EVERY time step: the distance along the transect
# where the paleo-surface crosses sea level for each layer. Computed from
# the RAW deposition elevation `stratZ` (recorded for every node and
# layer, even where nothing was deposited), so it is drawn for every step
# — not only where facies/deposits are present. One position per layer is
# connected into a shoreline trajectory; it breaks only for steps whose
# shoreline falls outside the transect.
from matplotlib.lines import Line2D
sZ = _interp_layers(data, data["stratZ"], xs, ys) # (npts, L) raw
sl = np.asarray(sea_level, dtype=float)
depth = (sl[None, :] if sl.ndim == 1 else sl) - sZ # >0 below sea
tmid = 0.5 * (times[:-1] + times[1:]) # layer centres
shore = np.full(data["nlayers"], np.nan)
for k in range(data["nlayers"]):
dk = depth[:, k]
ok = np.isfinite(dk)
if ok.sum() < 2:
continue
di, xi = dk[ok], dist[ok]
sub = di < 0.0 # subaerial side
cr = np.where(sub[:-1] & ~sub[1:])[0] # land -> sea
if cr.size == 0:
cr = np.where(np.diff(np.sign(di)) != 0)[0] # any crossing
if cr.size:
i = cr[0]
d0, d1 = di[i], di[i + 1]
f = d0 / (d0 - d1) if d1 != d0 else 0.0
shore[k] = xi[i] + f * (xi[i + 1] - xi[i])
ax.plot(shore, tmid, color="black", lw=2.0, zorder=6)
shoreline = Line2D([], [], color="black", lw=2.0, label="shoreline")
# Legend: facies patches (+ shoreline) for facies mode, else just the
# shoreline. `legend_loc` is user-positionable.
if facies:
_facies_legend(ax, fcolors, facies_labels, nfac, loc=legend_loc,
extra=[shoreline] if shoreline is not None else None)
elif shoreline is not None:
ax.legend(handles=[shoreline], loc=legend_loc, fontsize=8)
if xlim is not None:
ax.set_xlim(*xlim)
if ylim is not None:
ax.set_ylim(*ylim)
ux = _km(ax, "x", data["geographic"])
ut = _ky(ax, "y")
ax.set_xlabel("distance along %s (%s)" % ("path" if path is not None else kind, ux))
ax.set_ylabel("deposition time (%s)" % ut)
ax.set_title("Wheeler diagram (%s; gaps = hiatus/erosion)" % color_by,
fontsize=title_fontsize)
return ax
def main(argv=None):
p = argparse.ArgumentParser(
description="Stratigraphic sections / wells / Wheeler diagrams from a "
"goSPL run (matplotlib, vector output).")
p.add_argument("--h5dir", required=True, help="goSPL output 'h5' directory")
p.add_argument("--mesh", required=True,
help="global mesh FILE.npz[:vkey[:ckey]] (default keys v,c)")
p.add_argument("--kind", default="cross",
choices=["cross", "slice", "well", "wheeler"])
p.add_argument("--out", default="section.pdf", help="output figure (vector)")
p.add_argument("--step", type=int, default=None)
p.add_argument("--color-by", default="lithology", choices=list(_COLOR_FIELDS))
p.add_argument("--along", default="x", help="cross/wheeler: 'x', 'y'")
p.add_argument("--at", type=float, default=None, help="transect position")
p.add_argument("--path", default=None,
help="x0,y0;x1,y1;... polyline (overrides --along)")
p.add_argument("--z", type=float, default=None, help="slice: elevation")
p.add_argument("--xy", default=None, help="well: 'x,y'")
p.add_argument("--vexag", type=float, default=1.0,
help="vertical exaggeration (true data aspect; labels stay "
"true elevation). 1 = auto-scaled")
p.add_argument("--figsize", default=None,
help="figure size 'W,H' in inches (default 9,4)")
p.add_argument("--layer-lines", type=int, default=0,
help="cross: overlay a thin interface line every N layers "
"(0 = off)")
p.add_argument("--facies-depths", default=None,
help="facies bin edges (m below sea level), comma list "
"(default 0,20,50,75)")
p.add_argument("--facies-colors", default=None,
help="facies colours, comma list (default "
"limegreen,darkkhaki,sandybrown,khaki,c,teal)")
p.add_argument("--sea-level", type=float, default=None,
help="sea-level datum for the shoreline/section line and the "
"facies depth reference; default reads it from the "
"simulation (the step's xmf)")
p.add_argument("--strat-dt", type=float, default=None,
help="layer interval (yr); default derives it from the "
"step's stratal display time (the .xmf Time)")
p.add_argument("--tstart", type=float, default=0.0)
p.add_argument("--legend-loc", default=None,
help="legend position (matplotlib loc, e.g. 'upper right', "
"'lower left')")
p.add_argument("--xlim", default=None,
help="cross/wheeler: horizontal distance range 'MIN,MAX' "
"(same units as the transect, m; default: full extent)")
p.add_argument("--ylim", default=None,
help="cross: elevation range 'MIN,MAX' (m); wheeler: time "
"range (yr). Pinning xlim/ylim leaves the aspect auto")
p.add_argument("--title-fontsize", type=float, default=None,
help="title font size")
p.add_argument("--tight", action="store_true",
help="crop the saved figure to its content (bbox_inches="
"'tight'); default keeps the requested --figsize")
p.add_argument("--file-base", default="gospl")
args = p.parse_args(argv)
figsize = None
if args.figsize:
figsize = tuple(float(v) for v in args.figsize.split(","))
fac_depths = _FACIES_DEPTHS
if args.facies_depths:
fac_depths = tuple(float(v) for v in args.facies_depths.split(","))
fac_colors = _FACIES_COLORS
if args.facies_colors:
fac_colors = tuple(args.facies_colors.split(","))
xlim = None
if args.xlim:
xlim = tuple(float(v) for v in args.xlim.split(","))
ylim = None
if args.ylim:
ylim = tuple(float(v) for v in args.ylim.split(","))
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
parts = args.mesh.split(":")
mesh, vkey, ckey = (parts + ["v", "c"])[:1] + \
[parts[1] if len(parts) > 1 else "v", parts[2] if len(parts) > 2 else "c"]
data = load_strata(args.h5dir, mesh, step=args.step, vkey=vkey, ckey=ckey,
file_base=args.file_base)
path = None
if args.path:
path = [[float(v) for v in pt.split(",")] for pt in args.path.split(";")]
loc = args.legend_loc
tf = args.title_fontsize
if args.kind == "cross":
ax = cross_section(data, kind=args.along, at=args.at, path=path,
color_by=args.color_by, vexag=args.vexag,
sea_level=args.sea_level, tstart=args.tstart,
dt=args.strat_dt, figsize=figsize,
layer_lines=args.layer_lines,
facies_depths=fac_depths, facies_colors=fac_colors,
legend_loc=loc or "lower left", title_fontsize=tf,
xlim=xlim, ylim=ylim)
elif args.kind == "slice":
if args.z is None:
raise SystemExit("--z is required for --kind slice")
ax = horizontal_slice(data, args.z, color_by=args.color_by,
tstart=args.tstart, dt=args.strat_dt,
figsize=figsize, title_fontsize=tf)
elif args.kind == "well":
if not args.xy:
raise SystemExit("--xy 'x,y' is required for --kind well")
wx, wy = (float(v) for v in args.xy.split(","))
ax = synthetic_well(data, wx, wy, color_by=args.color_by,
tstart=args.tstart, dt=args.strat_dt,
figsize=figsize, title_fontsize=tf)
else:
ax = wheeler(data, kind=args.along, at=args.at, path=path,
color_by=args.color_by, tstart=args.tstart, dt=args.strat_dt,
sea_level=args.sea_level, figsize=figsize,
facies_depths=fac_depths, facies_colors=fac_colors,
legend_loc=loc or "upper right", title_fontsize=tf,
xlim=xlim, ylim=ylim)
# Keep the requested --figsize by default; --tight crops to content.
ax.figure.savefig(args.out, dpi=200,
bbox_inches="tight" if args.tight else None)
print("wrote %s (%s; %d layers)" % (args.out, args.kind, data["nlayers"]))
return 0
if __name__ == "__main__":
raise SystemExit(main())