Source code for pyrtid.regularization.base

"""
@author: acollet
"""

import logging
from abc import ABC, abstractmethod
from typing import List, Literal, Optional, Tuple

import numpy as np
from scipy.sparse import csc_array, lil_array

from pyrtid.forward.models import get_owner_neigh_indices
from pyrtid.utils import NDArrayFloat, NDArrayInt, RectilinearGrid
from pyrtid.utils.finite_differences import finite_gradient
from pyrtid.utils.preconditioner import NoTransform, Preconditioner


[docs]class RegWeightUpdateStrategy(ABC): """ Strategy to update the regularization parameter while optimizing. """ __slots__ = ["_reg_weight"]
[docs] def __init__(self, reg_weight: float = 1.0) -> None: self.reg_weight = reg_weight
@property def reg_weight(self) -> float: return self._reg_weight @reg_weight.setter def reg_weight(self, value: float) -> None: self._reg_weight = value
[docs] def update_reg_weight( self, loss_ls_history: List[float], loss_reg_history: List[float], reg_weight_history: List[float], loss_ls_grad: NDArrayFloat, loss_reg_grad: NDArrayFloat, n_obs: int, logger: Optional[logging.Logger] = None, ) -> bool: """ Update the regularization weight. Parameters ---------- loss_ls_history : List[float] List of past LS cost function. loss_reg_history : List[float] List of past regularization cost function. reg_weight_history : List[float] List of past regularization parameter (weight). loss_ls_grad : NDArrayFloat Current LS cost function gradient. loss_reg_grad : NDArrayFloat Current Reg cost function gradient. n_obs : int Number of observations used in the LS cost function. logger: Optional[logging.Logger] Optional :class:`logging.Logger` instance used for event logging. The default is None. Returns ------- bool Whether the regularization parameter (weight) has changed. """ return self._update_reg_weight( loss_ls_history, loss_reg_history, reg_weight_history, loss_ls_grad, loss_reg_grad, n_obs, logger, )
[docs] @classmethod def is_adaptive(cls) -> bool: """Return whether the method is adaptive.""" return False
def _update_reg_weight( self, loss_ls_history: List[float], loss_reg_history: List[float], reg_weight_history: List[float], loss_ls_grad: NDArrayFloat, loss_reg_grad: NDArrayFloat, n_obs: int, logger: Optional[logging.Logger] = None, ) -> bool: # by default no update return False
[docs]class ConstantRegWeight(RegWeightUpdateStrategy): """ Implement a constant regularization parameter. Attributes ---------- reg_weight: float Current regularization weight (parameter). """
[docs] def __init__(self, reg_weight: float = 1.0) -> None: super().__init__(reg_weight)
[docs]class Regularizator(ABC): """ Represent a regularizator. This is an abstract class. """ __slots__ = ["preconditioner"]
[docs] def __init__(self, preconditioner: Preconditioner = NoTransform()) -> None: """ Initialize the instance. Parameters ---------- preconditioner : Preconditioner Whether the regularization is applied to the preconditioned values or not, by default False. """ self.preconditioner: Preconditioner = preconditioner
[docs] def eval_loss(self, values: NDArrayFloat) -> float: """ Compute the regularization loss function. Parameters ---------- values : NDArrayFloat The parameter for which the regularization is computed. Returns ------- NDArrayFloat The regularization gradient. """ if not values.ndim == 1: raise ValueError("The 'eval_loss' method expects a 1D vector!") return self._eval_loss(self.preconditioner(values))
@abstractmethod def _eval_loss(self, values: NDArrayFloat) -> float: ... # pragma: no cover
[docs] def eval_loss_gradient_analytical(self, values: NDArrayFloat) -> NDArrayFloat: """ Compute the gradient of the regularization loss function analytically. Parameters ---------- values : NDArrayFloat The parameter for which the regularization is computed. Returns ------- NDArrayFloat The regularization gradient. """ if not values.ndim == 1: raise ValueError( "The 'eval_loss_gradient_analytical' method expects a 1D vector!" ) return self._eval_loss_gradient_analytical(values)
[docs] @abstractmethod def _eval_loss_gradient_analytical(self, values: NDArrayFloat) -> NDArrayFloat: """ Compute the gradient of the regularization loss function analytically. Parameters ---------- values : NDArrayFloat The parameter for which the regularization is computed. Returns ------- NDArrayFloat The regularization gradient. """
[docs] def eval_loss_gradient( self, values: NDArrayFloat, is_finite_differences: bool = False, max_workers: int = 1, ) -> NDArrayFloat: """ Compute the gradient of the regularization loss function. Parameters ---------- values : NDArrayFloat The parameter for which the regularization is computed. is_finite_differences: bool If true, a numerical approximation by 2nd order finite difference is returned. Cost twice the `values` dimensions in terms of loss function calls. The default is False. max_workers: int Number of workers used if the gradient is approximated by finite differences. If different from one, the calculation relies on multi-processing to decrease the computation time. The default is 1. Returns ------- NDArrayFloat The regularization gradient (not preconditioned). """ if not values.ndim == 1: raise ValueError("The 'eval_loss_gradient' method expects a 1D vector!") if is_finite_differences: return finite_gradient(values, self.eval_loss, max_workers=max_workers) else: return self.preconditioner.dtransform_vec( values, self.eval_loss_gradient_analytical(self.preconditioner(values)), )
def make_spatial_gradient_matrices( grid: RectilinearGrid, sub_selection: Optional[NDArrayInt] = None, which: Literal["forward", "backward", "both"] = "both", ) -> Tuple[csc_array, csc_array]: """ Make matrices to compute the spatial gradient along x and y axes of a field. The gradient is obtained by the dot product between the field and the matrix. Parameters ---------- grid : RectilinearGrid RectilinearGrid of the field sub_selection : Optional[NDArrayInt], optional Optional sub selection of the field. Non selected elements will be ignored in the gradient computation (as if non existing). If None, all elements are used. By default None. Returns ------- Tuple[csc_array, csc_array] Spatial gradient matrices for x and y axes. """ dim = grid.n_grid_cells # matrix for the spatial gradient along the x axis mat_grad_x = lil_array((dim, dim), dtype=np.float64) # matrix for the spatial gradient along the y axis mat_grad_y = lil_array((dim, dim), dtype=np.float64) if sub_selection is None: _sub_selection: NDArrayInt = np.arange(dim) else: _sub_selection = sub_selection # X contribution if grid.nx >= 2: tmp = grid.gamma_ij_x / grid.grid_cell_volume if which in ["forward", "both"]: # Forward scheme only: see PhD manuscript, chapter 7 for the explanaition. idc_owner, idc_neigh = get_owner_neigh_indices( grid, (slice(0, grid.nx - 1), slice(None)), (slice(1, grid.nx), slice(None)), owner_indices_to_keep=_sub_selection, neigh_indices_to_keep=_sub_selection, ) mat_grad_x[idc_owner, idc_neigh] -= tmp * np.ones(idc_owner.size) # type: ignore mat_grad_x[idc_owner, idc_owner] += tmp * np.ones(idc_owner.size) # type: ignore if which in ["backward", "both"]: # Forward scheme only: see PhD manuscript, chapter 7 for the explanaition. idc_owner, idc_neigh = get_owner_neigh_indices( grid, (slice(1, grid.nx), slice(None)), (slice(0, grid.nx - 1), slice(None)), owner_indices_to_keep=_sub_selection, neigh_indices_to_keep=_sub_selection, ) mat_grad_x[idc_owner, idc_neigh] -= tmp * np.ones(idc_owner.size) # type: ignore mat_grad_x[idc_owner, idc_owner] += tmp * np.ones(idc_owner.size) # type: ignore # Y contribution if grid.ny >= 2: tmp = grid.gamma_ij_y / grid.grid_cell_volume if which in ["forward", "both"]: # Forward scheme only: see PhD manuscript, chapter 7 for the explanaition. idc_owner, idc_neigh = get_owner_neigh_indices( grid, (slice(None), slice(0, grid.ny - 1)), (slice(None), slice(1, grid.ny)), owner_indices_to_keep=_sub_selection, neigh_indices_to_keep=_sub_selection, ) mat_grad_y[idc_owner, idc_neigh] -= tmp * np.ones(idc_owner.size) # type: ignore mat_grad_y[idc_owner, idc_owner] += tmp * np.ones(idc_owner.size) # type: ignore if which in ["backward", "both"]: # Forward scheme only: see PhD manuscript, chapter 7 for the explanaition. idc_owner, idc_neigh = get_owner_neigh_indices( grid, (slice(None), slice(1, grid.ny)), (slice(None), slice(0, grid.ny - 1)), owner_indices_to_keep=_sub_selection, neigh_indices_to_keep=_sub_selection, ) mat_grad_y[idc_owner, idc_neigh] -= tmp * np.ones(idc_owner.size) # type: ignore mat_grad_y[idc_owner, idc_owner] += tmp * np.ones(idc_owner.size) # type: ignore return mat_grad_x.tocsc(), mat_grad_y.tocsc() def make_spatial_permutation_matrices( grid: RectilinearGrid, sub_selection: Optional[NDArrayInt] = None ) -> Tuple[csc_array, csc_array]: """ Make matrices to compute the spatial permutations along x and y axes of a field. Parameters ---------- grid : RectilinearGrid RectilinearGrid of the field sub_selection : Optional[NDArrayInt], optional Optional sub selection of the field. Non selected elements will be ignored in the gradient computation (as if non existing). If None, all elements are used. By default None. Returns ------- Tuple[csc_array, csc_array] Spatial permutation matrices for x and y axes. """ dim = grid.n_grid_cells # matrix for the spatial permutation along the x axis mat_perm_x = lil_array((dim, dim), dtype=np.float64) # matrix for the spatial permutation along the y axis mat_perm_y = lil_array((dim, dim), dtype=np.float64) if sub_selection is None: _sub_selection: NDArrayInt = np.arange(dim) else: _sub_selection = sub_selection # X contribution if grid.nx >= 2: # Forward scheme: idc_owner, idc_neigh = get_owner_neigh_indices( grid, (slice(0, grid.nx - 1), slice(None)), (slice(1, grid.nx), slice(None)), owner_indices_to_keep=_sub_selection, neigh_indices_to_keep=_sub_selection, ) mat_perm_x[idc_neigh, idc_owner] = np.ones(idc_owner.size) # type: ignore # Y contribution if grid.ny >= 2: # Forward scheme: idc_owner, idc_neigh = get_owner_neigh_indices( grid, (slice(None), slice(0, grid.ny - 1)), (slice(None), slice(1, grid.ny)), owner_indices_to_keep=_sub_selection, neigh_indices_to_keep=_sub_selection, ) mat_perm_y[idc_neigh, idc_owner] = np.ones(idc_owner.size) # type: ignore return mat_perm_x.tocsc(), mat_perm_y.tocsc()