"""Provide a reactive transport solver."""
from __future__ import annotations
from typing import Callable, Optional
import numpy as np
from scipy.optimize import OptimizeResult
from pyrtid.forward.geochem_utils import (
get_polish,
newton,
solve_with_svd,
standalone_linesearch,
)
from pyrtid.forward.models import (
ConstantConcentration,
GeochemicalParameters,
TimeParameters,
TransportModel,
)
from pyrtid.utils import NDArrayFloat, RectilinearGrid
from pyrtid.utils.preconditioner import NoTransform, Preconditioner
def solve_geochem(
grid: RectilinearGrid,
tr_model: TransportModel,
gch_params: GeochemicalParameters,
time_params: TimeParameters,
time_index: int,
) -> None:
r"""
Compute the mineral dissolution.
The equation reads:
.. math::
\overline{c}_{i}^{n+1} = \overline{c}_{i}^{n} + \Delta t^{n} k_{v} A_{s}
\overline{c}_{i}^{n} \left( 1 - \dfrac{c_{i}^{n+1}}{K_{s}}\right)
"""
if gch_params.use_explicit_formulation:
solve_geochem_explicit(tr_model, gch_params, time_params, time_index)
else:
solve_geochem_implicit(grid, tr_model, gch_params, time_params, time_index)
[docs]def solve_geochem_explicit(
tr_model: TransportModel,
gch_params: GeochemicalParameters,
time_params: TimeParameters,
time_index: int,
) -> None:
r"""
Compute the mineral dissolution.
The equation reads:
.. math::
\overline{c}_{i}^{n+1} = \overline{c}_{i}^{n} + \Delta t^{n} k_{v} A_{s}
\overline{c}_{i}^{n} \left( 1 - \dfrac{c_{i}^{n+1}}{K_{s}}\right)
"""
immob1 = tr_model.limmob[time_index - 1][0]
immob2 = tr_model.limmob[time_index - 1][1]
# The mobile concentration is from the transport
dM = get_dM(tr_model, gch_params, time_index, time_params.dt)
for condition in tr_model.boundary_conditions:
if isinstance(condition, ConstantConcentration):
dM[condition.span] = 0.0
# elif isinstance(condition, ZeroConcGradient):
assert np.count_nonzero(immob1 + dM < 0) == 0
# overwrite the grade for species 1
tr_model.limmob[time_index][0, :, :] = immob1 + dM
# And for species 2 -> species being consumed
tr_model.limmob[time_index][1, :, :] = immob2 - gch_params.stocoef * dM
def get_dM(
tr_model: TransportModel,
gch_params: GeochemicalParameters,
time_index: int,
dt: float,
) -> NDArrayFloat:
mob1 = tr_model.lmob[time_index][0]
mob2 = tr_model.lmob[time_index][1]
immob1 = tr_model.limmob[time_index - 1][0]
dM = -np.min(
np.array(
[
(
-dt
* gch_params.kv
* gch_params.As
* immob1
* (1 - mob1 / gch_params.Ks)
* mob2
),
immob1,
mob2 / gch_params.stocoef,
]
),
axis=0,
)
# Handle special cases: because there might be some negative values in the
# transport because of the semi-implicit time scheme for advection
mask = (1 - mob1 / gch_params.Ks) <= 0.0 # (1 - mob1 / Ks) positive: precipitation
dM[mask] = 0.0
mask = (1 - mob1 / gch_params.Ks) > 1.0 # (1 - mob1 / Ks) positive: precipitation
dM[mask] = 0.0
mask = mob2 <= 0.0
dM[mask] = 0.0
return dM
def get_dM_pos(
tr_model: TransportModel,
gch_params: GeochemicalParameters,
time_index: int,
dt: float,
) -> NDArrayFloat:
mob1 = tr_model.lmob[time_index][0]
mob2 = tr_model.lmob[time_index][1]
immob1 = tr_model.limmob[time_index - 1][0]
dM_pos = np.argmin(
np.array(
[
(
-dt
* gch_params.kv
* gch_params.As
* immob1
* (1 - mob1 / gch_params.Ks)
* mob2
),
immob1,
mob2 / gch_params.stocoef,
]
),
axis=0,
)
return dM_pos
[docs]def solve_geochem_implicit(
grid: RectilinearGrid,
tr_model: TransportModel,
gch_params: GeochemicalParameters,
time_params: TimeParameters,
time_index: int,
) -> None:
r"""
Compute the mineral dissolution.
The equation reads:
.. math::
\overline{c}_{i}^{n+1} = \overline{c}_{i}^{n} + \Delta t^{n} k_{v} A_{s}
\overline{c}_{i}^{n} \left( 1 - \dfrac{c_{i}^{n+1}}{K_{s}}\right)
"""
# Mineral grades
immob_next = tr_model.limmob[time_index]
immob_prev = tr_model.limmob[time_index - 1]
# Mobile concentrations
mob_next = tr_model.lmob[time_index]
mob_prev = tr_model.lmob[time_index - 1]
max_chem_iter = 0
# Loop over x, y, z = > this is not very efficient and we should find a way to
# parallelize this or vectorize it at least
for i in range(grid.nx):
for j in range(grid.ny):
for k in range(grid.nz):
print(i, j, k)
# Newton-Raphson to solve the geochemical system
opt_res = solve_geochem_system(
mob_next[:, i, j, k],
immob_next[:, i, j, k],
mob_prev[:, i, j, k],
immob_prev[:, i, j, k],
atol=1e-20,
gch_params=gch_params,
is_use_svd=True,
is_use_ln=True,
dt=time_params.dt,
)
tr_model.lmob[time_index][:, i, j, k] = opt_res.x[:2]
tr_model.limmob[time_index][:, i, j, k] = opt_res.x[2:]
max_chem_iter = max(max_chem_iter, opt_res.nfev)
print(max_chem_iter)
assert np.count_nonzero(mob_next < 0) == 0
assert np.count_nonzero(immob_next < 0) == 0
def get_phi(
mob_next: NDArrayFloat, immob_prev: NDArrayFloat, gch_params: GeochemicalParameters
) -> float:
return (
gch_params.kv
* gch_params.As
* immob_prev[0]
* mob_next[1]
* (1 - mob_next[0] / gch_params.Ks)
)
# Define the vector function P(C)
def F(
mob_next: NDArrayFloat,
immob_next: NDArrayFloat,
mob_prev: NDArrayFloat,
immob_prev: NDArrayFloat,
gch_params: GeochemicalParameters,
dt: float,
) -> NDArrayFloat:
# mass conservation for species 0 and species 1
P1, P2 = immob_next + mob_next - immob_prev - mob_prev
# mass conservation for species 1
# P2 = immob_next[2] + C_np1[3] - C_n[2] - C_n[3]
# kinetics
phi = get_phi(mob_next, immob_prev, gch_params)
# Change for species 0
P3 = immob_next[0] - immob_prev[0] - dt * phi
# change for species 1
P4 = immob_next[1] - immob_prev[1] + dt * gch_params.stocoef * phi
return np.array([P1, P2, P3, P4])
# Define the Jacobian matrix of P(C)
def Jacobian(
mob_next: NDArrayFloat,
immob_next: NDArrayFloat,
mob_prev: NDArrayFloat,
immob_prev: NDArrayFloat,
gch_params: GeochemicalParameters,
dt: float,
) -> NDArrayFloat:
# Initialise a matrix for the Jacobian
J = np.zeros((4, 4))
dtKvAs = dt * gch_params.kv * gch_params.As
# Partial derivatives of P1 with respect to (mob_next, immob_next)
J[0, 0] = 1.0
J[0, 1] = 0.0
J[0, 2] = 1.0
J[0, 3] = 0.0
# Partial derivatives of P2 with respect to (mob_next, immob_next)
J[1, 0] = 0.0
J[1, 1] = 1.0
J[1, 2] = 0.0
J[1, 3] = 1.0
# Partial derivatives of P3 with respect to (mob_next, immob_next)
J[2, 0] = dtKvAs * mob_next[1] * immob_prev[0] / gch_params.Ks
J[2, 1] = -dtKvAs * immob_prev[0] * (1 - mob_next[0] / gch_params.Ks)
J[2, 2] = 1.0
J[2, 3] = 0.0
# Partial derivatives of P4 with respect to (mob_next, immob_next)
J[3, 0] = -gch_params.stocoef * dtKvAs * mob_next[1] * immob_prev[0] / gch_params.Ks
J[3, 1] = (
+dtKvAs
* gch_params.stocoef
* (immob_prev[0] * (1 - mob_next[0] / gch_params.Ks))
)
J[3, 2] = 0.0
J[3, 3] = 1.0
return J
def solve_geochem_system(
mob_next: NDArrayFloat,
immob_next: NDArrayFloat,
mob_prev: NDArrayFloat,
immob_prev: NDArrayFloat,
gch_params: GeochemicalParameters,
dt: float,
atol: float = 1e-15,
is_use_svd: bool = False,
is_use_ln: bool = False,
is_use_polish: bool = False,
pcd: Preconditioner = NoTransform(),
) -> OptimizeResult:
# change the variable
c0 = np.hstack([mob_next, immob_next])
x0 = pcd(c0)
def F_wrapper(_C) -> NDArrayFloat:
return F(_C[0:2], _C[2:], mob_prev, immob_prev, gch_params, dt)
def jac_wrapper(_C) -> NDArrayFloat:
return Jacobian(_C[0:2], _C[2:], mob_prev, immob_prev, gch_params, dt)
class FunctionsWrapper:
"""Wrapper to keep track of the function calls."""
def __init__(self, x0: NDArrayFloat) -> None:
"""Initialize the instance"""
# comptors for the number of residuals evaluation and the number
# of jacobian evaluations
self.nfev = 0
self.njev = 0
self.nhev = 0
self.x = x0
self.res_updated = False
self.jac_updated = False
self.invjacres_updated = False
self.residuals = self.get_residuals(self.x)
self.jac = self.get_jac(self.x)
self.invjacres = self.get_invjacres(self.x)
def get_residuals(self, x: NDArrayFloat) -> NDArrayFloat:
"""Get the residuals vector."""
if not np.array_equal(x, self.x):
self.update_x(x)
if self.res_updated:
# no need to update the residuals since x has not changed
return self.residuals
self.nfev += 1
self.residuals = F_wrapper(pcd.backtransform(x))
# print(self.residuals)
self.res_updated = True
return self.residuals
def get_jac(self, x) -> NDArrayFloat:
"""
Get the jacobian matrix of the residuals (for the preconditioned variable).
"""
if not np.array_equal(x, self.x):
self.update_x(x)
if self.jac_updated:
# no need to update the residuals since x has not changed
return self.jac
self.jac = jac_wrapper(pcd.backtransform(x))
self.njev += 1
self.jac_updated = True
return self.jac
def get_invjacres(self, x: NDArrayFloat) -> NDArrayFloat:
"""
Get the inverse of the jacobian matrix of the residuals times the residuals.
This is for the non-preconditioned x.
"""
if not np.array_equal(x, self.x):
self.update_x(x)
if self.invjacres_updated:
# no need to update the residuals since x has not changed
return self.invjacres
# we do not check the update of x because it is done in self.get_jac(x)
# 1) Compute the Jacobian inverse time the input vector
if is_use_svd:
self.invjacres = solve_with_svd(self.get_jac(x), self.get_residuals(x))
else:
self.invjacres = np.linalg.inv(self.get_jac(x)).dot(
self.get_residuals(x)
)
self.nhev += 1
self.invjacres_updated = True
return self.invjacres
def get_invjacvres_pcd(self, x: NDArrayFloat) -> NDArrayFloat:
"""
Get the inverse of the jacobian matrix of the residuals times the residuals.
This is for the preconditioned x.
"""
# Apply the preconditioner
return pcd.dtransform_vec(pcd.backtransform(x), self.get_invjacres(x))
def update_x(self, x: NDArrayFloat) -> None:
"""Update the stored x
Note
----
x is preconditioned.
"""
# ensure that self.x is a copy of x. Don't store a reference
# otherwise the memoization doesn't work properly.
self.x = np.atleast_1d(x).astype(float)
self.res_updated = False
self.jac_updated = False
self.invjacres_updated = False
# create an instance of the function wrapper
# TODO: cache system for the residuals
fw = FunctionsWrapper(x0)
# define the linesearch or polishing.
def _ln_obj_func(x: NDArrayFloat) -> float:
"""SSE on residuals."""
return 0.5 * np.sum(fw.get_residuals(x) ** 2).item()
def _ln_obj_grad(x: NDArrayFloat) -> NDArrayFloat:
"""Gradient of the above loss function."""
return pcd.dbacktransform_vec(x, fw.get_jac(x).T @ fw.get_residuals(x))
def _linesearch_wrapper(
x: NDArrayFloat, dx: NDArrayFloat, n_iterations: int
) -> float:
# See 9.7.1 LineSearchesandBacktracking in W. H. Press and S. A. Teukolsky.
# Numerical Recipes 3rd Edition: The Art of Scientific Computing.
# Cambridge University Press, 2007. isbn: 978-0-521-880688.
# url: http://nrbook.com.
# It is the same algorithm used in B.2. Line search strategy from Numerical
# Recipes. The thesis manuscript of Maxime Jonval.
_alpha, nfev, ngev, fun, f0, grad = standalone_linesearch(
x0=x,
fun=_ln_obj_func,
grad=_ln_obj_grad,
d=dx,
opt_iter=n_iterations,
)
if _alpha is not None:
alpha = _alpha
else:
alpha = 1.0
print(f"alpha = {alpha}")
print(f"fun = {fun}")
print(f"f0 = {f0}")
return alpha
def _get_polish_wrapper(x: NDArrayFloat, *args) -> float:
alpha = get_polish(fw.get_invjacres(x), pcd.backtransform(x))
print(f"alpha = {alpha}")
return alpha
def _get_linesearch() -> Optional[Callable]:
if is_use_ln:
return _linesearch_wrapper
elif is_use_polish:
return _get_polish_wrapper
return None
# call newton
opt_res = newton(
x0, fw.get_residuals, fw.get_invjacvres_pcd, atol, linesearch=_get_linesearch()
)
# apply the preconditionning
opt_res.x = pcd.backtransform(opt_res.x)
# compute the residuals
opt_res.nfev = fw.nfev
opt_res.njev = fw.njev
opt_res.hjev = fw.nhev
return opt_res