Source code for pyrtid.regularization.geostatistical
"""
Provide classes and functions for geostatistical regularization.
TODO: add the formulas.
"""
import numpy as np
from pyrtid.regularization.base import Regularizator
from pyrtid.regularization.covariances import CovarianceMatrix
from pyrtid.regularization.priors import NullPriorTerm, PriorTerm
from pyrtid.utils import NDArrayFloat
from pyrtid.utils.finite_differences import finite_gradient
from pyrtid.utils.preconditioner import NoTransform, Preconditioner
def identify_function(x: NDArrayFloat) -> NDArrayFloat:
"""Return x untransformed (f(x) = x)."""
return x
def one(x: NDArrayFloat) -> NDArrayFloat:
"""Return 1.0, whatever the input."""
return np.ones(x.shape)
[docs]class GeostatisticalRegularizator(Regularizator):
"""
Implement a regularization based on the parameter covariance matrix.
Attributes
----------
preconditioner: Preconditioner
Parameter pre-transformation operator (variable change for the solver).
The default is the identity function: f(x) = x, which means no change
is made.
"""
__slots__ = ["cov_m", "prior"]
[docs] def __init__(
self,
cov_m: CovarianceMatrix,
prior: PriorTerm = NullPriorTerm(),
preconditioner: Preconditioner = NoTransform(),
) -> None:
"""
Initialize the instance.
Parameters
----------
cov_m : CovarianceMatrix
_description_
prior : Optional[PriorTerm], optional
A prior term for `(x - prior_term)`, by default NullPriorTerm.
transform: Callable, optional
Parameter pre-transformation (variable change for the solver). The default
is the identity function: f(x) = x.
transform_1st_derivative: Callable, optional
Parameter pre-transformation first order derivative.
The default is 1.0 (the first derivative of the identity function).
preconditioner: Preconditioner
Parameter pre-transformation operator (variable change for the solver).
The default is the identity function: f(x) = x, which means no change
is made.
"""
super().__init__(preconditioner)
self.cov_m: CovarianceMatrix = cov_m
self.prior: PriorTerm = prior
[docs] def _eval_loss(self, values: NDArrayFloat) -> float:
r"""
Compute the gradient of the regularization loss function analytically.
.. math::
\mathcal{R}_{Q}(u) = \frac{1}{2} \left(s-Xb\right)^TQ^{-1}\left(s-Xb\right)
Parameters
----------
values : NDArrayFloat
Values of the parameter for which the regularization is computed.
Should be 2D array / 1d vector.
Returns
-------
NDArrayFloat
The regularization gradient.
"""
_values = values
residuals: NDArrayFloat = _values - self.prior.get_values(_values)
return float(
0.5
* np.dot(
residuals.T,
self.cov_m.solve(residuals),
).item()
)
[docs] def _eval_loss_gradient_analytical(self, values: NDArrayFloat) -> NDArrayFloat:
"""
Compute the gradient of the regularization loss function analytically.
Parameters
----------
values : NDArrayFloat
Values of the parameter for which the regularization is computed (2d).
Returns
-------
NDArrayFloat
The regularization gradient (2d).
"""
_values = values
residuals: NDArrayFloat = _values - self.prior.get_values(_values)
# right part $Q^{-1} * (m - m_{prior})$
_right_part = self.cov_m.solve(residuals).ravel()
# left part gradient -> special method to get more efficient
# $ [I - dm_{prior}/dm]^{T} Q^{-1} (m - m_{prior})$
return _right_part - self.prior.get_gradient_dot_product(_right_part)
[docs]class EnsembleRegularizator(GeostatisticalRegularizator):
"""
Implement a regularization based on an ensemble.
Compared to a classic regularization, an ensemble is passed to the functions.
TODO: here add the objective function of the regularization terM.
Attributes
----------
preconditioner: Preconditioner
Parameter pre-transformation operator (variable change for the solver).
The default is the identity function: f(x) = x, which means no change
is made.
"""
[docs] def eval_loss(self, ens: NDArrayFloat) -> float:
r"""
TODO: update this.
Compute the gradient of the regularization loss function analytically.
.. math::
\mathcal{R}_{Q}(u) = \frac{1}{2} \left(s-Xb\right)^TQ^{-1}\left(s-Xb\right)
Parameters
----------
values : NDArrayFloat
Values of the parameter for which the regularization is computed.
Should be 2D array / 1d vector.
Returns
-------
NDArrayFloat
The regularization gradient.
"""
if not ens.ndim == 2:
raise ValueError(
"The 'EnsembleRegularizator.eval_loss' method expects a 2D vector!"
)
_values = ens
residuals: NDArrayFloat = _values - self.prior.get_values(_values)
# residuals = - self.prior.get_values(_values)
# And this is strictly equivalent (element wise multiplication)
return (
0.5 * float(np.sum(residuals * self.cov_m.solve(residuals))) / ens.shape[1] # type: ignore
)
[docs] def eval_loss_gradient_analytical(self, ens: NDArrayFloat) -> NDArrayFloat:
"""
Compute the gradient of the regularization loss function analytically.
Parameters
----------
ens : NDArrayFloat
Ensemble of shape (N_s, Ne). N_s being the number of optimized values,
Ne, the number of members in the ensemble.
Returns
-------
NDArrayFloat
The regularization gradient (2d).
"""
if not ens.ndim == 2:
raise ValueError(
"The 'EnsembleRegularizator.eval_loss_gradient_analytical' "
"method expects a 2D vector!"
)
_values = ens
residuals: NDArrayFloat = _values - self.prior.get_values(_values)
# residuals = _values * 0.0 - self.prior.get_values(_values)
# right part $Q^{-1} * (m - m_{prior})$
_right_part = self.cov_m.solve(residuals)
# We should have the same shape
assert _right_part.shape == ens.shape
# TODO: here we considered than the derivative of the covariance matrix w.r.t.
# the parameters is null, but that is not necessary the case all the time.
# left part gradient -> special method to get more efficient
# $ [I - dm_{prior}/dm]^{T} Q^{-1} (m - m_{prior})$
# Note: dot product must be distributed
# The mean operator comes from the fact that a member j is involved
# in all derivations of the mean operator.
return (
_right_part
- np.mean(
self.prior.get_gradient_dot_product(_right_part),
axis=1,
keepdims=True,
)
) / ens.shape[1]
[docs] def eval_loss_gradient(
self,
ens: NDArrayFloat,
is_finite_differences: bool = False,
max_workers: int = 1,
) -> NDArrayFloat:
"""
Compute the gradient of the regularization loss function.
Parameters
----------
values : NDArrayFloat
The parameter for which the regularization is computed.
is_finite_differences: bool
If true, a numerical approximation by 2nd order finite difference is
returned. Cost twice the `values` dimensions in terms of loss function
calls. The default is False.
max_workers: int
Number of workers used if the gradient is approximated by finite
differences. If different from one, the calculation relies on
multi-processing to decrease the computation time. The default is 1.
Returns
-------
NDArrayFloat
The regularization gradient (not preconditioned).
"""
if not ens.ndim == 2:
raise ValueError(
"The 'EnsembleRegularizator.eval_loss_gradient_analytical' "
"method expects a 2D vector!"
)
if is_finite_differences:
return finite_gradient(ens, self.eval_loss, max_workers=max_workers)
else:
return self.preconditioner.dtransform_vec(
ens,
self.eval_loss_gradient_analytical(self.preconditioner(ens)),
)
# def compute_best_beta(
# values: NDArrayFloat, cov_m: CovarianceMatrix, drift_matrix: DriftMatrix
# ) -> NDArrayFloat:
# """
# Compute the optimal beta (minimal objective function).
# TODO: Add the maths here.
# Parameters
# ----------
# values : NDArrayFloat
# Values of the parameter for which the regularization is computed.
# Should be 2D array / 1d vector.
# cov_m : CovarianceMatrix
# The covariance matrix.
# drift_matrix : DriftMatrix
# The drift matrix instance for which to compute beta.
# Returns
# -------
# NDArrayFloat
# The best beta.
# """
# # This is valid for the linear one only.
# invQs = cov_m.solve(values)
# invQX = cov_m.solve(drift_matrix.mat)
# XTinvQs = np.dot(drift_matrix.mat.T, invQs)
# XTinvQX = np.dot(drift_matrix.mat.T, invQX)
# # inexpensive solve p by p where p <= 3, usually p = 1 (scalar division)
# return np.linalg.solve(np.atleast_2d(XTinvQX), XTinvQs).ravel()