from dataclasses import dataclass
from typing import Optional, Tuple, Union
import numpy as np
from numpy.typing import ArrayLike, NDArray
from scipy.stats._stats_py import _validate_distribution
from pyrtid.regularization.base import Regularizator
from pyrtid.utils import NDArrayFloat, NDArrayInt
from pyrtid.utils.preconditioner import NoTransform, Preconditioner
def ffill(arr: NDArray) -> NDArray:
"""
Forward fill (the NAN) in the given array.
A new array is returned.
"""
prev = np.arange(np.size(arr))
prev[np.isnan(arr)] = 0
prev = np.maximum.accumulate(prev)
return arr[prev]
def make_dist_values_unique(
dist: ArrayLike, weights: Optional[ArrayLike] = None
) -> Tuple[ArrayLike, Union[NDArrayFloat, NDArrayInt], NDArrayInt, NDArrayInt]:
"""
Make the values in the given distribution unique.
Weights are summed.
Parameters
----------
dist : ArrayLike
Values of the (empirical) distribution.
weights : Optional[ArrayLike], optional
_description_, by default None
Returns
-------
Tuple[ArrayLike, Union[NDArrayFloat, NDArrayInt], NDArrayInt, NDArrayInt]
Arrays containing:
- The unique values by increasing order.
- The associated (aggregated) weights. If no weights were provided, it is
simply the count for each unique value.
- The indices of first occurrence in the original array.
- The sorter (indices) by increasing order in the original array (`dist`).
"""
sorter = np.argsort(dist)
vals, _indices, _counts = np.unique(
np.asarray(dist)[sorter], return_counts=True, return_index=True
)
if weights is None:
_weights = _counts
else:
idx = np.zeros_like(dist)
idx[:] = np.nan
idx[_indices] = np.arange(np.size(vals))
idx = np.asarray(ffill(idx)[np.argsort(sorter)], dtype=np.int64)
_weights = np.bincount(idx, weights)
return vals, _weights, _indices, sorter
def get_cdfs(
u_values: ArrayLike,
v_values: ArrayLike,
u_weights: Optional[ArrayLike] = None,
v_weights: Optional[ArrayLike] = None,
) -> Tuple[NDArrayFloat, NDArrayFloat, NDArrayFloat, NDArrayInt]:
"""Return the merged cumulative density functions and associated weights."""
u_values, u_weights = _validate_distribution(u_values, u_weights)
v_values, v_weights = _validate_distribution(v_values, v_weights)
u_sorter = np.argsort(u_values)
v_sorter = np.argsort(v_values)
all_values = np.concatenate((u_values, v_values))
all_sorter = np.argsort(all_values, kind="mergesort")
all_values = all_values[all_sorter]
# Compute the differences between pairs of successive values of u and v.
deltas = np.diff(all_values)
# Get the respective positions of the values of u and v among the values of
# both distributions.
u_cdf_indices = u_values[u_sorter].searchsorted(all_values[:-1], "right")
v_cdf_indices = v_values[v_sorter].searchsorted(all_values[:-1], "right")
# Calculate the CDFs of u and v using their weights, if specified.
if u_weights is None:
u_cdf = u_cdf_indices / u_values.size
else:
u_sorted_cumweights = np.concatenate(([0], np.cumsum(u_weights[u_sorter])))
u_cdf = u_sorted_cumweights[u_cdf_indices] / u_sorted_cumweights[-1]
if v_weights is None:
v_cdf = v_cdf_indices / v_values.size
else:
v_sorted_cumweights = np.concatenate(([0], np.cumsum(v_weights[v_sorter])))
v_cdf = v_sorted_cumweights[v_cdf_indices] / v_sorted_cumweights[-1]
return u_cdf, v_cdf, deltas, all_sorter
def cdf_distance(
p: float,
u_values: ArrayLike,
v_values: ArrayLike,
u_weights: Optional[ArrayLike] = None,
v_weights: Optional[ArrayLike] = None,
) -> float:
r"""
Compute, between two one-dimensional distributions :math:`u` and
:math:`v`, whose respective CDFs are :math:`U` and :math:`V`, the
statistical distance that is defined as:
.. math::
l_p(u, v) = \left( \int_{-\infty}^{+\infty} |U-V|^p \right)^{1/p}
p is a positive parameter; p = 1 gives the Wasserstein distance, p = 2
gives the energy distance.
Parameters
----------
u_values, v_values : array_like
Values observed in the (empirical) distribution.
u_weights, v_weights : array_like, optional
Weight for each value. If unspecified, each value is assigned the same
weight.
`u_weights` (resp. `v_weights`) must have the same length as
`u_values` (resp. `v_values`). If the weight sum differs from 1, it
must still be positive and finite so that the weights can be normalized
to sum to 1.
Returns
-------
distance : float
The computed distance between the distributions.
Notes
-----
The input distributions can be empirical, therefore coming from samples
whose values are effectively inputs of the function, or they can be seen as
generalized functions, in which case they are weighted sums of Dirac delta
functions located at the specified values.
References
----------
.. [1] Bellemare, Danihelka, Dabney, Mohamed, Lakshminarayanan, Hoyer,
Munos "The Cramer Distance as a Solution to Biased Wasserstein
Gradients" (2017). :arXiv:`1705.10743`.
"""
# First make values in the distribution unique to reduce the complexity
# of the algorithm
new_u, new_u_weights = make_dist_values_unique(u_values, u_weights)[:2]
new_v, new_v_weights = make_dist_values_unique(v_values, v_weights)[:2]
u_cdf, v_cdf, deltas, _ = get_cdfs(new_u, new_v, new_u_weights, new_v_weights)
# Compute the value of the integral based on the CDFs.
# If p = 1 or p = 2, we avoid using np.power, which introduces an overhead
# of about 15%.
if p == 1:
return np.sum(np.multiply(np.abs(u_cdf - v_cdf), deltas))
if p == 2:
return np.sqrt(np.sum(np.multiply(np.square(u_cdf - v_cdf), deltas)))
return np.power(
np.sum(np.multiply(np.power(np.abs(u_cdf - v_cdf), p), deltas)), 1 / p
)
def cdf_distance_gradient(
p: float,
u_values: ArrayLike,
v_values: ArrayLike,
u_weights: Optional[ArrayLike] = None,
v_weights: Optional[ArrayLike] = None,
) -> NDArrayFloat:
r"""
Compute, between two one-dimensional distributions :math:`u` and
:math:`v`, whose respective CDFs are :math:`U` and :math:`V`, the
gradient with respect to :math:`u` of the statistical distance that is defined as:
.. math::
l_p(u, v) = \left( \int_{-\infty}^{+\infty} |U-V|^p \right)^{1/p}
p is a positive parameter; p = 1 gives the Wasserstein distance, p = 2
gives the energy distance.
Parameters
----------
u_values, v_values : array_like
Values observed in the (empirical) distribution.
u_weights, v_weights : array_like, optional
Weight for each value. If unspecified, each value is assigned the same
weight.
`u_weights` (resp. `v_weights`) must have the same length as
`u_values` (resp. `v_values`). If the weight sum differs from 1, it
must still be positive and finite so that the weights can be normalized
to sum to 1.
Returns
-------
distance gradient : NDArrayFloat
The computed distance between the distributions.
Notes
-----
The input distributions can be empirical, therefore coming from samples
whose values are effectively inputs of the function, or they can be seen as
generalized functions, in which case they are weighted sums of Dirac delta
functions located at the specified values.
References
----------
.. [1] Bellemare, Danihelka, Dabney, Mohamed, Lakshminarayanan, Hoyer,
Munos "The Cramer Distance as a Solution to Biased Wasserstein
Gradients" (2017). :arXiv:`1705.10743`.
"""
# First make values in the distribution unique to reduce the complexity
# And allow the derivation with respect to repeated values (in u)
new_u, new_u_weights, old_u_indices, u_sorter = make_dist_values_unique(
u_values, u_weights
)
new_v_values, new_v_weights, _, _ = make_dist_values_unique(v_values, v_weights)
u_cdf, v_cdf, deltas, all_sorter = get_cdfs(
new_u, new_v_values, new_u_weights, new_v_weights
)
# Note about the derivation => the derivative of u_cdf with respect to u_values
# is always null (the order is preserved)
# So the only term that matters in the derivation is the one with resepct to deltas
if p == 1:
_temp: NDArrayFloat = np.abs(u_cdf - v_cdf)
elif p == 2:
_temp = (
0.5
/ np.sqrt(np.sum(np.multiply(np.square(u_cdf - v_cdf), deltas)))
* np.square(u_cdf - v_cdf)
)
else:
_temp = (
1
/ p
* np.power(
np.sum(np.multiply(np.power(np.abs(u_cdf - v_cdf), p), deltas)),
1 / p - 1,
)
* np.power(np.abs(u_cdf - v_cdf), p)
)
# First approach = more expensive because one needs to build J,
# the derivatives of deltas with respect to the sorted u_values
# it is a sparse matrix filled by 1 and -1
# n = u_cdf_indices.size
# J = sp.sparse.diags_array([-np.ones(n + 1), np.ones(n)], offsets=[0, 1]).tocsc()[
# :-1, np.argsort(all_sorter)
# ][:, :np.size(u_values)]
# conversion to csc for faster multiplication
# return J.tocsc().T @ _temp
# Second approach strictly equivalent but faster
grad = (
-np.concatenate([_temp, [0.0]], dtype=np.float64)[np.argsort(all_sorter)][
: np.size(new_u)
]
+ np.concatenate([[0.0], _temp], dtype=np.float64)[np.argsort(all_sorter)][
: np.size(new_u)
]
)
# This is to hanbdle duplictaed values
out = np.zeros_like(u_values)
out[:] = np.nan
out[old_u_indices] = grad / new_u_weights
out = ffill(out)[np.argsort(u_sorter)]
if u_weights is not None:
return out * np.array(u_weights, dtype=np.float64)
else:
return out
def _get_subsel(sub_selection: Optional[NDArrayInt]) -> Union[NDArrayInt, slice]:
if sub_selection is not None:
return sub_selection
return slice(None)
[docs]@dataclass
class ProbDistFitting(Regularizator):
r"""
Apply an (empirical) probability distribution fitting regularization.
The fit is based on the Wasserstein distance.
Attributes
----------
target_values: ArrayLike
Values observed in the (empirical) distribution.
target_weights: Optional[ArrayLike]
Weight for each target value. If unspecified, each value is assigned the same
weight.
`target_weights` must have the same length as `target_values`.
If the weight sum differs from 1, it must still be positive and finite so that
the weights can be normalized to sum to 1. The default is None.
sub_selection : Optional[NDArrayInt], optional
Optional sub selection of the field. Non selected elements will be
ignored in the gradient computation (as if non existing). If None, all
elements are used. By default None.
preconditioner: Preconditioner
Parameter pre-transformation operator (variable change for the solver).
The default is the identity function: f(x) = x, which means no change
is made.
order: float
Positive parameter; p = 1 gives the Wasserstein distance, p = 2
gives the energy distance. The default is one.
"""
target_values: ArrayLike
target_weights: Optional[ArrayLike] = None
sub_selection: Optional[NDArrayInt] = None
preconditioner: Preconditioner = NoTransform()
order: int = 1
[docs] def _eval_loss(self, values: NDArrayFloat) -> float:
r"""
Compute the gradient of the regularization loss function analytically.
.. math::
\mathcal{R}_{TN}(u) = \frac{1}{2} \sum_{j=1}^{M} \sum_{i=1}^{N}
\left( \dfrac{u_{i+1, j} - u_{i,j}}{dx} \right)^{2}
Parameters
----------
values : NDArrayFloat
The parameter for which the regularization is computed.
Returns
-------
float
"""
return cdf_distance(
p=self.order,
u_values=values[_get_subsel(self.sub_selection)].ravel("F"),
v_values=self.target_values,
u_weights=None,
v_weights=self.target_weights,
)
[docs] def _eval_loss_gradient_analytical(self, values: NDArrayFloat) -> NDArrayFloat:
r"""
Compute the gradient of the regularization loss function analytically.
Parameters
----------
values : NDArrayFloat
The parameter for which the regularization is computed.
Returns
-------
NDArrayFloat
The regularization gradient.
"""
out = np.zeros_like(values)
out[_get_subsel(self.sub_selection)] = cdf_distance_gradient(
p=self.order,
u_values=values[_get_subsel(self.sub_selection)].ravel("F"),
v_values=self.target_values,
u_weights=None,
v_weights=self.target_weights,
)
return out.reshape(values.shape, order="F")