"""Provide a representation of observables."""
from __future__ import annotations
import json
from typing import Optional, Sequence, Tuple, Union
import numpy as np
from pyrtid.forward import ForwardModel
from pyrtid.utils import (
Int,
NDArrayFloat,
NDArrayInt,
node_number_to_indices,
object_or_object_sequence_to_list,
)
from pyrtid.utils.enum import StrEnum
from pyrtid.utils.means import (
MeanType,
get_mean_values_for_last_axis,
get_mean_values_gradient_for_last_axis,
)
[docs]class StateVariable(StrEnum):
"""Type of observable existing."""
CONCENTRATION = "concentration"
DENSITY = "density"
DIFFUSION = "diffusion"
HEAD = "head"
GRADE = "grade"
PERMEABILITY = "permeability"
POROSITY = "porosity"
PRESSURE = "pressure"
STORAGE_COEFFICIENT = "storage_coefficient"
DISPERSIVITY = "dispersivity"
[docs]class Observable:
"""
Class representing observations data within time at a defined location.
Note
----
It is fine if the times are not sorted in increasing order.
Attributes
----------
state_variable: StateVariable
Name of the state variable or the parameter being observed.
node_indices: NDArrayInt
Node indices to locate of the observation in the grid.
times: NDArrayFloat
Times matching the values.
values: NDArrayFloat
Observed values.
uncertainties: NDArrayFloat
Absolute uncertainties associated with the observed values.
mean_type: MeanType
Type of mean used to averaged the simulated values when the observations
are performed over several grid cells.
perturbations: NDArrayFloat
Perturbations to add to the values when performing the inversion. This
is only useful for the ensemble method when the observed values
are perturbed with samples from N(0, R).
sp: Optional[int]
Index of the concentration being observed. Must be provided if a
concentration or a grade is observed.
"""
__slots__ = [
"state_variable",
"node_indices",
"times",
"_values",
"uncertainties",
"_mean_type",
"perturbations",
"sp",
]
[docs] def __init__(
self,
state_variable: StateVariable,
node_indices: Int,
times: NDArrayFloat,
values: NDArrayFloat,
uncertainties: Optional[Union[float, NDArrayFloat]] = None,
mean_type: Optional[MeanType] = None,
sp: Optional[int] = None,
) -> None:
"""
Initiate the instance.
Warning
-------
For the control parameters such as porosity or permeability which are constant
in time, we consider that all observations are done at time 0, no matter
what is set by the user.
Parameters
----------
state_variable: StateVariable
Name of the state variable or the parameter being observed.
node_indices: Int
Location of the observation in the grid (indices from 0 to nx * ny - 1).
times: NDArrayFloat
Timesteps matching the values.
values: NDArrayFloat
Observed values.
uncertainties: Optional[Union[float, NDArrayFloat]]
Absolute uncertainties associated with the observed values.
mean_type: Optional[MeanType] = None
Type of mean used to averaged the simulated values when the observations
are performed over several grid cells. If None, an harmonic mean is used
for diffusion and hydraulic conductivity coefficients, otherwise,
an arithmetic mean is used.
The default is None.
"""
self.state_variable = state_variable
self.node_indices = np.sort(np.array(node_indices).ravel())
self.times = times.ravel()
self.set_values(values.ravel())
# perturbations must be initialized before any ".values" attribute call
self.perturbations = np.zeros(values.shape)
_uncertainties = (
np.array(uncertainties) if uncertainties is not None else np.array([])
).ravel()
if _uncertainties.size == 0:
self.uncertainties = np.ones(self.values.shape)
elif _uncertainties.size == 1:
self.uncertainties = np.ones(self.values.shape) * _uncertainties.ravel()[0]
elif _uncertainties.size != self.values.size:
raise ValueError(
"``uncertainties`` parameter should be a float value or a numpy "
"array with the same dimension as the ``values`` parameter."
)
else:
self.uncertainties = _uncertainties
if self.times.size != self.values.size:
raise ValueError(
"``times`` parameter should be a float value or a numpy "
"array with the same dimension as the ``values`` parameter."
)
self.mean_type = mean_type
if (
self.state_variable in [StateVariable.CONCENTRATION, StateVariable.GRADE]
and sp is None
):
raise ValueError(
"sp must be provided when observing grades or concentrations!"
)
# To avoid having None values for typing
if sp is not None:
self.sp: int = sp
else:
self.sp = 0
@property
def values(self) -> NDArrayFloat:
"""
Return the values plus the optional perturbations.
This is read-only. Use the method set_values to update the observations.
"""
return self._values + self.perturbations
[docs] def set_values(self, values: NDArrayFloat) -> None:
"""Set the observed values."""
self._values = values
@property
def mean_type(self) -> MeanType:
"""Return the mean type used to interpolate values over several grid cells."""
return self._mean_type
@mean_type.setter
def mean_type(self, value: Optional[MeanType]) -> None:
"""Set the mean type used to interpolate values over several grid cells."""
if value is None:
if self.state_variable in [
StateVariable.DIFFUSION,
StateVariable.PERMEABILITY,
]:
self._mean_type = MeanType.HARMONIC
else:
self._mean_type = MeanType.ARITHMETIC
else:
self._mean_type = value
def __str__(self) -> str:
"""Return a string representation of the instance."""
return json.dumps(
{
"state_variable": self.state_variable,
"node_indices": self.node_indices,
"nb_values": self.values.size,
"min_value": np.min(self.values),
"max_value": np.max(self.values),
"min_time": np.min(self.times),
"max_time": np.max(self.times),
"min_std": np.max(self.uncertainties),
"mean_type": self.mean_type,
},
indent=4,
sort_keys=False,
default=str,
).replace("null", "None")
[docs] def set_perturbations(self, pvals: NDArrayFloat) -> None:
"""Set the values perturbations (see ensemble smoothers)."""
if self.values.size != pvals.size:
raise ValueError(
"perturbations size should match observation values size !"
)
self.perturbations = pvals
# new type
Observables = Union[Observable, Sequence[Observable]]
[docs]def update_perturbation_values(observables: Observables, pvals: NDArrayFloat) -> None:
"""Update the perturbation values for the given observables instances."""
first_index = 0
for obs in object_or_object_sequence_to_list(observables):
last_index = first_index + obs.values.size
# Update perturbations
obs.perturbations = pvals[first_index:last_index]
def _get_obs_ascending_time_sorting_permutations(
obs: Observable, max_time: Optional[float] = None
) -> NDArrayInt:
"""
Get the permutations required to sort the observation time in ascending order.
Parameters
----------
obs : Observable
The observble instance.
max_time : Optional[float], optional
Maximum time value to consider, by default None
Returns
-------
NDArrayInt
The permutations as an array of int giving the new absolute positions.
"""
valid_indices = np.arange(obs.times.size)
if max_time is not None:
valid_indices = valid_indices[obs.times <= max_time]
sorted_indices = obs.times.argsort()
return sorted_indices[np.isin(sorted_indices, valid_indices)]
[docs]def get_sorted_observable_times(
obs: Observable, max_time: Optional[float] = None
) -> NDArrayFloat:
"""
Get the observation times sorted in ascending order.
Parameters
----------
obs : Observable
The observable instance.
max_time : Optional[float], optional
Maximum time value to consider, by default None
Returns
-------
NDArrayFloat
Observation times sorted in ascending order.
"""
return obs.times[
_get_obs_ascending_time_sorting_permutations(obs, max_time)
].flatten()
[docs]def get_sorted_observable_values(
obs: Observable, max_time: Optional[float] = None
) -> NDArrayFloat:
"""
Get the observation values sorted by ascending corresponding times.
Parameters
----------
obs : Observable
The observable instance.
max_time : Optional[float], optional
Maximum time value to consider, by default None
Returns
-------
NDArrayFloat
Observation values sorted by ascending corresponding times.
"""
return obs.values[
_get_obs_ascending_time_sorting_permutations(obs, max_time)
].flatten()
[docs]def get_sorted_observable_uncertainties(
obs: Observable, max_time: Optional[float] = None
) -> NDArrayFloat:
"""
Get the observation uncertainties sorted by ascending corresponding times.
Parameters
----------
obs : Observable
The observable instance.
max_time : Optional[float], optional
Maximum time value to consider, by default None
Returns
-------
NDArrayFloat
Observation uncertainties sorted by ascending corresponding times.
"""
return obs.uncertainties[
_get_obs_ascending_time_sorting_permutations(obs, max_time)
].flatten()
def get_array_from_state_variable(
model: ForwardModel, state_variable: StateVariable, sp: Optional[int] = None
) -> NDArrayFloat:
if state_variable == StateVariable.CONCENTRATION:
if sp is None:
raise ValueError("sp cannot be None for concentrations")
return model.tr_model.mob[sp]
if state_variable == StateVariable.HEAD:
return model.fl_model.head
if state_variable == StateVariable.PRESSURE:
return model.fl_model.pressure
if state_variable == StateVariable.GRADE:
if sp is None:
raise ValueError("sp cannot be None for grades")
return model.tr_model.immob[sp]
if state_variable == StateVariable.PERMEABILITY:
return model.fl_model.permeability
if state_variable == StateVariable.POROSITY:
return model.tr_model.porosity
if state_variable == StateVariable.DIFFUSION:
return model.tr_model.diffusion
if state_variable == StateVariable.DISPERSIVITY:
return model.tr_model.dispersivity
if state_variable == StateVariable.DENSITY:
return model.tr_model.density
if state_variable == StateVariable.STORAGE_COEFFICIENT:
return model.fl_model.storage_coefficient
else:
raise ValueError(
f'"{state_variable}" is not a valid state variable or parameter type!'
)
[docs]def get_observables_values_as_1d_vector(
observables: Observables,
max_obs_time: Optional[float] = None,
) -> NDArrayFloat:
"""
Return the values of all given observables as a 1D vector.
Note
----
The observation values are sorted first by Observable instance (observation
location) and by ascending time at a second level.
Parameters
----------
observables
Sequence of observable instances.
max_obs_time : Optional[float], optional
Maximum time for which to consider an observation value, by default None.
"""
return np.hstack(
[
get_sorted_observable_values(obs, max_obs_time)
for obs in object_or_object_sequence_to_list(observables)
]
).ravel()
[docs]def get_observables_uncertainties_as_1d_vector(
observables: Observables,
max_obs_time: Optional[float] = None,
) -> NDArrayFloat:
"""Return the uncertainties of all observables as a 1D vector.
Note
----
The observation values are sorted first by Observable instance (observation
location) and by ascending time at a second level.
Parameters
----------
observables
Sequence of observable instances.
max_obs_time : Optional[float], optional
Maximum time for which to consider an observation value, by default None.
"""
return np.hstack(
[
get_sorted_observable_uncertainties(obs, max_obs_time)
for obs in object_or_object_sequence_to_list(observables)
]
).ravel()
def get_times_idx_before_after_obs(
obs_times: NDArrayFloat, simu_times: NDArrayFloat
) -> Tuple[NDArrayInt, NDArrayInt]:
"""
Get the calculated times before and after the observation times.
Parameters
----------
obs_times : NDArrayFloat
Times of observated values.
simu_times : NDArrayFloat
Times of simulated values.
Returns
-------
Tuple[NDArrayFloat, NDArrayFloat]
Indices of simulated values coming after and before the observed values.
"""
simu_times_before_idx = []
simu_times_after_idx = []
idx = 0
for obs_time in obs_times:
while obs_time > simu_times[idx]:
idx += 1
simu_times_before_idx.append(idx - 1)
simu_times_after_idx.append(idx)
return np.array(simu_times_before_idx), np.array(simu_times_after_idx)
def get_weights(
obs_times: NDArrayFloat,
simu_times: NDArrayFloat,
before_idx: NDArrayInt,
after_idx: NDArrayInt,
) -> Tuple[NDArrayFloat, NDArrayFloat]:
"""
Get the weights to apply on the calculated values after and before observations.
Parameters
----------
obs_times : NDArrayFloat
Times of observated values.
simu_times : NDArrayFloat
Times of simulated values.
before_idx : NDArrayInt
Indices of simulated values coming before the observed values.
after_idx : NDArrayInt
Indices of simulated values coming after the observed values.
Returns
-------
Tuple[NDArrayFloat, NDArrayFloat]
Weights to apply on simulated values before and after observations.
"""
den = simu_times[after_idx] - simu_times[before_idx]
_den = den.copy()
_den[den == 0] = 1.0
weights_before = 1 - (obs_times - simu_times[before_idx]) / _den
weights_before[before_idx < 0] = 0.0 # Handle obs at time zero.
weights_after = 1 - (simu_times[after_idx] - obs_times) / _den
weights_before[den == 0] = 1.0
weights_after[den == 0] = 0.0
return weights_before, weights_after
[docs]def get_values_matching_node_indices(
node_indices: NDArrayInt, input_values: NDArrayFloat
) -> NDArrayFloat:
r"""
Return the values for the given node_indices with shape.
The output shape is (:math:`\lvert \mathrm{node_indices} \rvert|, [nt]),
where :math:`\lvert . \rvert` means cardinality.
Parameters
----------
node_indices : NDArrayInt
Indices in the grid from 0 to nx * ny -1.
input_values : NDArrayFloat
Array of input values with shape (nx, ny, nt) or (nx, ny).
Returns
-------
NDArrayFloat
Simulated values at the observation location
"""
nx, ny, nz = input_values.shape[:3]
X, Y, Z = node_number_to_indices(node_indices, nx=nx, ny=ny)
if len(input_values.shape) == 4:
return input_values[X, Y, Z, :]
# state variable constant within time
return input_values[X, Y, Z]
def get_interp_simu_values_matching_obs_times(
obs_times: NDArrayFloat,
simu_times: NDArrayFloat,
simu_values: NDArrayFloat,
) -> NDArrayFloat:
"""
Get an array of interpolated values that match the observation times.
TODO: add ref to paper + a scheme.
A simple linear interpolation is performed with two values only (the one before
and the one after the observation time.)
Note
----
If measured on several nodes, the simulated values should already
have been averaged at this point.
Parameters
----------
obs_times : NDArrayFloat
1D array of observation times.
simu_times : NDArrayFloat
1D array of simulation times.
simu_values : NDArrayFloat
1D array of simulated values.
Returns
-------
NDArrayFloat
Simulated values interpolated at observation times.
"""
idx_before, idx_after = get_times_idx_before_after_obs(obs_times, simu_times)
weights_before, weights_after = get_weights(
obs_times, simu_times, idx_before, idx_after
)
return (
weights_before * simu_values[idx_before]
+ weights_after * simu_values[idx_after]
)
def get_simulated_values_matching_obs(
model: ForwardModel,
obs: Observable,
max_obs_time: Optional[float] = None,
) -> NDArrayFloat:
"""
Get the simulated values matching the given observable and the hm end time.
Parameters
----------
model : ForwardModel
The forward model.
obs : Observable
The observable instance.
ldt : List[float]
The list of timesteps used to solve the forward model. The simulation times
are derived from the list.
max_obs_time : Optional[float], optional
Maximum time for which to consider an observation value, by default None.
Returns
-------
NDArrayFloat
Simulated values matching the given observable and the hm end time.
"""
simu_times = np.cumsum([0] + model.time_params.ldt)
if max_obs_time is not None:
max_obs_time = min(np.max(simu_times), max_obs_time)
else:
max_obs_time = np.max(simu_times)
field = get_array_from_state_variable(model, obs.state_variable, obs.sp)
obs_times = get_sorted_observable_times(obs, max_obs_time)
if len(field.shape) == 3:
_field = field.reshape((*field.shape, 1))
else:
_field = field
_simu_values = get_mean_values_for_last_axis(
get_values_matching_node_indices(obs.node_indices, _field),
mean_type=obs.mean_type,
weights=None,
)
# control parameter -> not varying in time
if len(field.shape) == 3:
# The interpolated value is the same for all time
return np.repeat(_simu_values, obs_times.size)
# state variable varying in time
return get_interp_simu_values_matching_obs_times(
obs_times, simu_times, _simu_values
)
[docs]def get_adjoint_sources_for_obs(
model: ForwardModel,
obs: Observable,
n_obs: int,
max_obs_time: Optional[float] = None,
) -> NDArrayFloat:
r"""
Get the adjoint sources for a given observable instance.
The objective function with respect to the vector of observed
state variables and control parameters :math:`(\mathbf{d})`
is defined as:
.. math::
\mathcal{J}(\mathbf{d}_{\mathrm{calc}}) = \dfrac{1}{2N} \sum_{n=0}^{N}
\left(\dfrac{d_{\mathrm{obs}}^{t}
- d_{\mathrm{calc}}^{t}}{\sigma_{\mathrm{obs}}^{t}} \right)^{2}
= \dfrac{1}{2N} \sum_{n=0}^{N}
\left(\dfrac{d_{\mathrm{obs}}^{t}
- \left(\omega_{n}av(d_{\mathrm{calc}}^{n})
+ \omega_{n+1}av(d_{\mathrm{calc}}^{n+1})\right)}{
\sigma_{\mathrm{obs}}^{t}} \right)^{2}
with :math:`d_{\mathrm{obs}}^{t}` an observation at a time :math:`t`
comprised between simulation iterations :math:`n` and :math:`n+1`,
:math:`N = \lvert \mathbf{d}_{\mathrm{obs}} \rvert`
the number of observation points, and :math:`\omega` the weights for the linear
interpolation of the calculated value which read:
- :math:`\omega_{n} = 1 - \dfrac{t - t(n)}{t(n+1) - t(n)}`
- :math:`\omega_{n+1} = 1 - \dfrac{(t(n+1) - t)}{t(n+1) - t(n)}`
And :math:`av` a spatial averaging operator required if the observation is done on
several grid cells. Consequently, the objective function depends both on
:math:`d_{\mathrm{calc}}^{n}` and :math:`d_{\mathrm{calc}}^{n+1}`,
and the derivatives read:
.. math::
\begin{eqnarray}
\dfrac{\partial\mathcal{J}}{\partial d_{\mathrm{calc}}^{n}} & = &
\dfrac{\partial}{\partial d_{\mathrm{calc}}^{n}} \left(\dfrac{1}{2 \lvert
\mathbf{d}_{\mathrm{obs}} \rvert } \sum_{n=0}^{N}
\left(\dfrac{d_{\mathrm{obs}}^{t} - \left(\omega_{n}av(d_{\mathrm{calc}}^{n})
+ \omega_{n+1}av(d_{\mathrm{calc}}^{n+1})\right)}{
\sigma_{\mathrm{obs}}^{t}} \right)^{2}\right) \\
& = & - \dfrac{\omega_{n}}{N }
\dfrac{\partial av(d_{\mathrm{calc}}^{n})}{\partial d_{\mathrm{calc}}^{n}}
\dfrac{d_{\mathrm{obs}}^{t} - \left(\omega_{n}av(d_{\mathrm{calc}}^{n})
+ \omega_{n+1}av(d_{\mathrm{calc}}^{n+1})\right)}{
\left(\sigma_{\mathrm{obs}}^{t}\right)^{2}}
\end{eqnarray}
And
.. math::
\dfrac{\partial\mathcal{J}}{\partial d_{\mathrm{calc}}^{n+1}} =
- \dfrac{\omega_{n+1}}{N}
\dfrac{\partial av(d_{\mathrm{calc}}^{n+1})}{\partial d_{\mathrm{calc}}^{n}}
\dfrac{d_{\mathrm{obs}}^{t} - \left(\omega_{n}av(d_{\mathrm{calc}}^{n})
+ \omega_{n+1}av(d_{\mathrm{calc}}^{n+1})\right)}{
\left(\sigma_{\mathrm{obs}}^{t}\right)^{2}}
Parameters
----------
model : ForwardModel
_description_
obs : Observable
_description_
max_obs_time : Optional[float], optional
Maximum time for which to consider an observation value, by default None.
n_obs: float
The number of observation point to consider.
Returns
-------
NDArrayFloat
The adjoint sources for the given Observable instance.
"""
simu_times = np.cumsum([0] + model.time_params.ldt)
if max_obs_time is not None:
max_obs_time = min(np.max(simu_times), max_obs_time)
else:
max_obs_time = np.max(simu_times)
obs_times = get_sorted_observable_times(obs, max_obs_time)
obs_values = get_sorted_observable_values(obs, max_obs_time)
obs_std = get_sorted_observable_uncertainties(obs, max_obs_time)
simu_values = get_simulated_values_matching_obs(model, obs, max_obs_time)
# 1) Taking into account the derivative linked with the values averaging over the
# the grid (observations defined on more than one mesh)
field = get_array_from_state_variable(model, obs.state_variable, obs.sp)
_averaging_derivative = get_mean_values_gradient_for_last_axis(
get_values_matching_node_indices(obs.node_indices, field),
mean_type=obs.mean_type,
weights=None,
)
adj_src = np.zeros(field.shape)
# Location in the grid
X, Y, Z = node_number_to_indices(
obs.node_indices, nx=model.grid.nx, ny=model.grid.ny
)
# 2) Taking into account the derivative linked with the values time interpolation
# (observations defined between two times of the simulation)
# no time dimension, so normally, the adj_src are for the parameter at t=0
# (ex: permeability)
if len(adj_src.shape) == 3:
for n in range(len(obs_times)):
adj_src[X, Y, Z] += (
_averaging_derivative # in this case
* (simu_values - obs_values)[n]
/ (obs_std[n] ** 2)
)
else:
# Otherwise, we derive the weights of the linear interpolation
idx_before, idx_after = get_times_idx_before_after_obs(obs_times, simu_times)
weights_before, weights_after = get_weights(
obs_times, simu_times, idx_before, idx_after
)
for n in range(len(obs_times)):
# Note: ici, on a simu_values = w1 * av(c(n+1)) + w2 * av(c(n))
adj_src[X, Y, Z, idx_before[n]] += (
_averaging_derivative[:, idx_before[n]].ravel("F")
* weights_before[n]
* (simu_values - obs_values)[n]
/ (obs_std[n] ** 2)
)
adj_src[X, Y, Z, idx_after[n]] += (
_averaging_derivative[:, idx_after[n]].ravel("F")
* weights_after[n]
* (simu_values - obs_values)[n]
/ (obs_std[n] ** 2)
)
# Return the adjoint sources
return adj_src
[docs]def get_predictions_matching_observations(
model: ForwardModel,
observables: Observables,
max_obs_time: Optional[float] = None,
) -> NDArrayFloat:
"""
Return the 1D vector of predictions matching the observations.
Parameters
----------
model : ForwardModel
_description_
observables : Observables
_description_
max_obs_time : Optional[float], optional
Maximum time for which to consider an observation value, by default None.
Returns
-------
NDArrayFloat
_description_
"""
res = []
for obs in object_or_object_sequence_to_list(observables):
res.append(
get_simulated_values_matching_obs(model, obs, max_obs_time).flatten()
)
return np.hstack(res).ravel()