Source code for pyrtid.regularization.covariances

"""Provide covariance matrix representation.

Note: add some notes about:
https://github.com/arvindks/kle/blob/master/covariance/covariance.py

And cite Saibaba's phd thesis about the uncertainty and all.
"""

from __future__ import annotations

import logging
from abc import abstractmethod
from time import time
from typing import Callable, List, Optional, Sequence, Tuple, Union

import numpy as np
import scipy as sp
from numpy.random import Generator, RandomState
from scipy.linalg import solve
from scipy.sparse import csc_array, csr_array
from scipy.sparse.linalg import LinearOperator, eigsh, gmres, lgmres
from scipy.spatial import cKDTree, distance_matrix
from scipy.spatial.distance import cdist
from sksparse.cholmod import Factor

from pyrtid.regularization.hmatrix import Hmatrix
from pyrtid.regularization.toeplitz import create_toepliz_first_row, toeplitz_product
from pyrtid.utils import (
    NDArrayFloat,
    NDArrayInt,
    check_random_state,
    get_pts_coords_regular_grid,
    sparse_cholesky,
)
from pyrtid.utils.spde import get_variance


class CallBack:
    """Represents a callback instance."""

    __slots__: List[str] = ["res"]

    def __init__(self) -> None:
        """Initialize the instance."""
        self.res: List[NDArrayFloat] = []

    def __call__(self, rk) -> None:
        self.res.append(rk)

    @property
    def itercount(self) -> int:
        """Return the number of times the callback as been called."""
        return len(self.res)

    def clear(self) -> None:
        """Delete all results."""
        self.res = []


[docs]class CovarianceMatrix(LinearOperator): """ Represents a covariance matrix. This is an abstract class. """ __slots__: List[str] = ["dtype", "count", "solvmatvecs"]
[docs] def __init__(self, shape) -> None: """ Initialize the instance. Parameters ---------- shape: Tuple[int, int] Shape of the matrix. """ # counters self.count: int = 0 self.solvmatvecs: int = 0 super().__init__(dtype="d", shape=shape)
@property def number_pts(self) -> int: """Number of points in the domain (n).""" return self.shape[0]
[docs] def reset_comptors(self) -> None: """Set the comptors to zero.""" self.count = 0 self.solvmatvecs = 0
[docs] def itercount(self) -> int: """Return the number of counts.""" return self.count
[docs] @abstractmethod def solve(self, b: NDArrayFloat) -> NDArrayFloat: """Solve Ax = b, with A, the current covariance matrix instance."""
[docs] def get_diagonal(self) -> NDArrayFloat: """ Return the diagonal entries of the matrix (variances). The matrix is never built explicitly. Instead the matvec interface is used to multiply all column of the identity matrix. """ approx_diag = np.zeros(self.number_pts) for i in range(self.number_pts): # construct the ith row of the identity matrix v = np.zeros(self.number_pts) v[i] = 1.0 approx_diag[i] = self.matvec(v)[i] return approx_diag
[docs] def get_trace(self) -> float: """Return the trace of the covariance matrix.""" return float(np.sum(self.get_diagonal()))
class KernelCovarianceMatrix(CovarianceMatrix): __slots__: List[str] = ["kernel", "pts", "nugget"] def __init__( self, pts: NDArrayFloat, kernel: Callable, nugget: float = 0.0 ) -> None: """ Initialize the instance. Parameters ---------- pts : NDArrayFloat _description_ kernel : Callable _description_ nugget : float, optional _description_, by default 0.0 """ super().__init__((pts.shape[0], pts.shape[0])) self.kernel: Callable = kernel self.pts: NDArrayFloat = pts self.nugget: float = nugget # counters self.count: int = 0 self.solvmatvecs: int = 0 def build_preconditioner( pts: NDArrayFloat, kernel: Callable, k: int = 100 ) -> csr_array: """ Implementation of the preconditioner based on changing basis. Parameters ---------- pts : NDArrayFloat The points (n, m) with n the number of data points and m the dimension of coordinates. k : int, optional Number of local centers in the preconditioner. Controls the sparity of the preconditioner. By default 100. Returns ------- csr_array _description_ Raises ------ ValueError _description_ Notes: ------ Implementation of the preconditioner based on local centers. The parameter k controls the sparsity and the effectiveness of the preconditioner. Larger k is more expensive but results in fewer iterations. For large ill-conditioned systems, it was best to use a nugget effect to make the problem better conditioned. To Do: implementation based on local centers and additional points. Will remove the hack of using nugget effect. """ nb_pts: int = pts.shape[0] if nb_pts <= 0: raise ValueError("The number of points cannot be null !") if nb_pts < k: raise ValueError("k must be superior to the number of points !") # Build the tree start: float = time() tree: cKDTree = cKDTree(pts, leafsize=32) end: float = time() logging.log(logging.INFO, f"Tree building time = {end - start}") # Find the nearest neighbors of all the points start = time() _dist, ind = tree.query(pts, k=k) end = time() logging.log(logging.INFO, f"Nearest neighbor computation time = {end - start}") Q = np.zeros((k, k), dtype="d") y = np.zeros((k, 1), dtype="d") row = np.tile(np.arange(nb_pts), (k, 1)).transpose() col = np.copy(ind) nu = np.zeros((nb_pts, k), dtype="d") y[0] = 1.0 start = time() # TODO: This is very inefficient and must be re-written for i in np.arange(nb_pts): Q = kernel(cdist(pts[ind[i, :], :], pts[ind[i, :], :])) nui = np.linalg.solve(Q, y) nu[i, :] = np.copy(nui.transpose()) end = time() logging.log(logging.INFO, "Elapsed time = %g" % (end - start)) ij = np.zeros((nb_pts * k, 2), dtype="i") ij[:, 0] = np.copy(np.reshape(row, nb_pts * k, order="F").transpose()) ij[:, 1] = np.copy(np.reshape(col, nb_pts * k, order="F").transpose()) data = np.copy(np.reshape(nu, nb_pts * k, order="F").transpose()) return csr_array((data, ij.transpose()), shape=(nb_pts, nb_pts), dtype="d")
[docs]class DenseCovarianceMatrix(CovarianceMatrix): """Represents a dense covariance matrix."""
[docs] def __init__( self, mat: NDArrayFloat, nugget: float = 0, ) -> None: self.mat = mat self.nugget = nugget super().__init__((mat.shape[0], mat.shape[0]))
[docs] def _matvec(self, x: NDArrayFloat) -> NDArrayFloat: """Return the covariance matrix times the vector x.""" return np.dot(self.mat, x) * (1 + self.nugget)
[docs] def _rmatvec(self, x: NDArrayFloat) -> NDArrayFloat: """Return the covariance matrix conjugate transpose times the vector x.""" return np.dot(self.mat.T, x)
[docs] def solve(self, b: NDArrayFloat) -> NDArrayFloat: """Solve Ax = b, with A, the current covariance matrix instance.""" return solve(self.mat, b, assume_a="sym")
[docs] def get_diagonal(self) -> NDArrayFloat: """Return the diagonal entries of the matrix (variances).""" return self.mat.diagonal()
[docs] def get_trace(self) -> NDArrayFloat: """Return the trace of the covariance matrix.""" return self.mat.trace()
[docs]def generate_dense_matrix( pts: NDArrayFloat, kernel: Callable, len_scale: NDArrayFloat, nugget: float = 0.0 ) -> DenseCovarianceMatrix: """ Generate a dense matrix. Compute O(dim^2) interactions. Parameters ---------- pts : NDArrayFloat DESCRIPTION. kernel : TYPE DESCRIPTION. len_scale: NDArrayFloat DESCRIPTION. Returns ------- NDArrayFloat The dense matrix. """ # Scale the points coordinates scaled_pts = np.array(pts, copy=True) for dim in range(scaled_pts.shape[1]): scaled_pts[:, dim] /= len_scale[dim] return DenseCovarianceMatrix( kernel(distance_matrix(scaled_pts, scaled_pts)), nugget=nugget )
[docs]class EnsembleCovarianceMatrix(CovarianceMatrix): r""" Represents a covariance matrix as an ensemble of realizations. For a given ensemble with shape (:math:`N_{s}`, :math:`N_{e}`), the number of points and the number of members in the ensemble respectively, the covariance matrix :math:`\mathbf{\Sigma_{ss}}` is approximated from the ensemble in the standard way of EnKF :cite:p:`evensenDataAssimilationEnsemble2007,aanonsenEnsembleKalmanFilter2009`: .. math:: \mathbf{\Sigma_{ss}} = \frac{1}{N_{e} - 1} \sum_{j=1}^{N_{e}}\left(s_{j} - \overline{s}\right)\left(s_{j} - \overline{s^{l}} \right)^{T} Or by defining a matrix of anomalies :math:`\mathbf{A} = \mathbf{S} - \overline{\mathbf{S}}` with shape (:math:`N_{s}`, :math:`N_{e}`): .. math:: \mathbf{\Sigma_{ss}} = \frac{1}{N_{e} - 1} \mathbf{A}^{T}\mathbf{A} Note ---- Practically, the dense covariance matrix is never built, only the anomalies matrix :math:`\mathbf{A}` is used. The product between the inverse of the covariance matrix and a vector :math:`\mathbf{x} = \mathbf{\Sigma_{ss}}^{-1}\mathbf{b}` is obtained solving the system :math:`\mathbf{A}^{T}\mathbf{Ax} = \mathbf{b}`, using gmres, where only anomalies matrix vector products are required. """
[docs] def __init__( self, ensemble: NDArrayFloat, ) -> None: """ Initiate the instance. Parameters ---------- ensemble : NDArrayFloat Ensemble of realization with shape (:math:`N_{s}`, :math:`N_{e}`). """ # on axis 1, the number of parameters super().__init__((ensemble.shape[1], ensemble.shape[1])) self.ensemble = ensemble
@property def anomalies(self) -> NDArrayFloat: """ Return the matrix of anomalies. """ return self.ensemble - np.mean(self.ensemble, axis=0, keepdims=True) @property def n_ens(self) -> int: """Return the number of members in the ensemble.""" return self.ensemble.shape[0]
[docs] def _matvec(self, x: NDArrayFloat) -> NDArrayFloat: """Return the covariance matrix times the vector x (dot product).""" return np.linalg.multi_dot([self.anomalies.T, self.anomalies, x]) / ( self.n_ens - 1 ) # type: ignore
[docs] def todense(self) -> NDArrayFloat: """ Return a dense representation of the matrix. """ return self.anomalies.T @ self.anomalies / (self.n_ens - 1)
[docs] def solve( self, b: NDArrayFloat, rtol: float = 1e-12, maxiter: int = 1000 ) -> NDArrayFloat: """ Solve A^{T}Ax = b, with A, the anomalies matrix instance. Note that the dense covariance matrix is never built. """ residual = CallBack() x, info = gmres( self, b, rtol=rtol, maxiter=maxiter, callback=residual, atol=0.0, callback_type="legacy", ) self.solvmatvecs += residual.itercount return x
[docs] def get_diagonal(self) -> NDArrayFloat: """Return the diagonal entries of the matrix (variances).""" return np.sum((self.anomalies**2), axis=0) / (self.n_ens - 1.0)
[docs]class FFTCovarianceMatrix(KernelCovarianceMatrix): """ Represents a fast fourier transform covariance matrix. FFT based operations if kernel is stationary or translation invariant and points are on a regular grid. """
[docs] def __init__( self, kernel, mesh_dim: Union[float, NDArrayFloat, Sequence[float]], domain_shape: Union[int, NDArrayInt, Sequence[int]], len_scale: NDArrayFloat, nugget: float = 0.0, k: int = 100, is_use_preconditioner: bool = False, ) -> None: """_summary_ Parameters ---------- kernel : _type_ _description_ mesh_dim : Union[NDArrayInt, Tuple[float, float]] _description_ domain_shape : Union[NDArrayInt, Tuple[int, int]] _description_ len_scale : NDArrayFloat _description_ nugget : float, optional _description_, by default 0.0 k : int, optional Number of local centers in the preconditioner. Controls the sparity of the preconditioner. It should be inferior to the number of points. By default 100. is_use_preconditioner: bool Whether to build the preconditioner at instance creation and use it to solve Ax = b systems. The default is False. """ self.param_shape: NDArrayInt = np.array(domain_shape, dtype=np.int8) # Coordinates of the points in the grid with shape (Npts, Ndim) pts = get_pts_coords_regular_grid(mesh_dim, self.param_shape) self.first_row = create_toepliz_first_row(pts, kernel, len_scale) super().__init__(pts, kernel, nugget) if is_use_preconditioner: self.preconditioner: Optional[csr_array] = build_preconditioner( pts, kernel, k=k ) else: self.preconditioner = None
[docs] def _matvec(self, x: NDArrayFloat) -> NDArrayFloat: """Return the covariance matrix times the vector x.""" return toeplitz_product(x, self.first_row, self.param_shape) * (1 + self.nugget)
[docs] def solve( self, b: NDArrayFloat, rtol: float = 1e-12, maxiter: int = 1000 ) -> NDArrayFloat: """Solve Ax = b, with A, the current covariance matrix instance.""" residual = CallBack() x, info = gmres( self, b, rtol=rtol, maxiter=maxiter, callback=residual, M=self.preconditioner, atol=0.0, callback_type="legacy", ) self.solvmatvecs += residual.itercount return x
[docs] def get_diagonal(self) -> NDArrayFloat: """Return the diagonal entries of the matrix (variances).""" return self.kernel(np.zeros(len(self.pts)))
[docs] def get_trace(self) -> float: """Return the trace (sum of the diagonal) of the covariance matrix.""" return float(np.sum(self.get_diagonal()))
[docs]class HCovarianceMatrix(KernelCovarianceMatrix): """ Represents a hierarchical covariance matrix. Works for arbitrary kernels on irregular grids """
[docs] def __init__( self, kernel: Callable, pts: NDArrayFloat, len_scale: NDArrayFloat, rkmax: int = 32, eps: float = 1.0e-9, nugget: float = 0.0, is_verbose: bool = False, k: int = 100, ) -> None: n: int = np.size(pts, 0) ind = np.arange(n) self.H = Hmatrix(pts, kernel, ind, is_verbose, rkmax, eps) super().__init__(pts, kernel, nugget) self.is_verbose = is_verbose self.preconditioner: csr_array = build_preconditioner(pts, kernel, k=k)
[docs] def _matvec(self, x: NDArrayFloat) -> NDArrayFloat: """Return the covariance matrix times the vector x.""" y = np.zeros_like(x, dtype="d") return self.H.mult(x, y, self.is_verbose) * (1 + self.nugget)
[docs] def _rmatvec(self, x: NDArrayFloat) -> NDArrayFloat: """Return the covariance matrix conjugate transpose times the vector x.""" y = np.zeros_like(x, dtype="d") return self.H.transpmult(x, y, self.is_verbose)
[docs] def solve( self, b: NDArrayFloat, rtol: float = 1e-12, maxiter: int = 1000 ) -> NDArrayFloat: """Solve Ax = b, with A, the current covariance matrix instance.""" residual = CallBack() x, info = lgmres( self, b, rtol=rtol, maxiter=maxiter, callback=residual, M=self.preconditioner, atol=0.0, ) self.solvmatvecs += residual.itercount return x
[docs] def get_diagonal(self) -> NDArrayFloat: """Return the diagonal entries of the matrix (variances).""" return self.kernel(np.zeros(len(self.pts)))
[docs] def get_trace(self) -> float: """Return the trace of the covariance matrix.""" return float(np.sum(self.get_diagonal()))
[docs]class EigenFactorizedCovarianceMatrix(CovarianceMatrix): """Compressed version of the covariance matrix."""
[docs] def __init__( self, eig_vals: NDArrayFloat, eig_vects: NDArrayFloat, ) -> None: """ Initialize the instance. Parameters ---------- eig_vals : NDArrayFloat 1D vector of eigen values with size `n_pc`. eig_vects : NDArrayFloat 2D arrays of eigen vectors (columns) with size `(Ns, n_pc)`. Ns being the number of elements in the original covariance matrix. """ super().__init__((eig_vects.shape[0], eig_vects.shape[0])) self.eig_vals: NDArrayFloat = eig_vals self.eig_vects: NDArrayFloat = eig_vects assert self.eig_vals.size == self.eig_vects.shape[1]
@property def n_pc(self) -> int: """ Return the number of eigen vectors/values, i.e. principal components. It is determined from the eigen values vector size. """ return self.eig_vals.size
[docs] def _matvec(self, x: NDArrayFloat) -> NDArrayFloat: """Return the covariance matrix times the vector x.""" return np.dot( self.eig_vects, np.multiply(self.eig_vals, np.dot(self.eig_vects.T, x.reshape(-1, 1))), )
[docs] def solve(self, x: NDArrayFloat) -> NDArrayFloat: r""" Return $Q^{-1} x = ZD^{-1}Z^{T}x$. Parameters ---------- x: NDArrayFloat Column vector with shape ($N_{\mathrm{s}}$, 1) or ensemble matrix with shape ($N_{\mathrm{s}}$, $N_e$). Returns ------- NDAarrayFloat Column vector with shape ($N_{\mathrm{s}}$, 1) or ensemble matrix with shape ($N_{\mathrm{s}}$, $N_e$). """ # np.dot(invZs.T, invZs) # Note: x must be a column vector of a matrix with size (Ns, Ne) ne = 1 # case of a column vector if x.ndim > 1: ne = x.shape[1] return np.dot( self.eig_vects, np.multiply( 1.0 / self.eig_vals, np.dot(self.eig_vects.T, x.reshape(-1, ne)) ), )
[docs] def get_trace(self) -> float: """Return the trace of the covariance matrix.""" return float(np.sum(self.eig_vals))
def todense(self) -> NDArrayFloat: return np.dot(self.eig_vects, np.multiply(self.eig_vals, self.eig_vects.T))
[docs] def get_sparse_LLT_factor(self) -> csc_array: """ Return the sparse factor L of the LL^T factorization of the eigen matrix. Return ------ L: csc_array L = U * V^{T/2}. """ # 1) Convert U sqrt(V) to a sparse format sp_mat = sp.sparse.lil_array(self.eig_vects * np.sqrt(self.eig_vals).T) # 2) Resize -> we now have a square matrix and indices are preserved sp_mat.resize(self.shape) # 3) Convert to column format return sp_mat.tocsc()
[docs]class SparseInvCovarianceMatrix(CovarianceMatrix): """ Represents a sparse inverse covariance matrix. Works for arbitrary kernels on irregular grids. """ __slots__ = ["inv_mat", "inv_mat_cho_factor", "preconditioner"]
[docs] def __init__( self, inv_mat: csc_array, inv_mat_cho_factor: Optional[Factor] = None, ) -> None: """ Initialize the instance. Parameters ---------- inv_mat : csc_array Sparse precision matrix (inverse of the covariance matrix). inv_mat_cho_factor: Optional[Factor] inv_mat CHOLMOD Factor. If not provided, the factorization is performed at the instance initialization. The default is None. """ self.inv_mat: csc_array = csc_array(inv_mat) if inv_mat_cho_factor is None: self.inv_mat_cho_factor: Factor = sparse_cholesky(self.inv_mat) else: self.inv_mat_cho_factor: Factor = inv_mat_cho_factor super().__init__(inv_mat.shape)
[docs] def _matvec(self, x: NDArrayFloat) -> NDArrayFloat: """Return the covariance matrix times the vector x.""" return self.inv_mat_cho_factor(x)
[docs] def solve(self, x: NDArrayFloat) -> NDArrayFloat: """Return $Q^{-1} x.""" return self.inv_mat.dot(x)
[docs] def get_diagonal(self) -> NDArrayFloat: """ Return the diagonal entries of the matrix (variances). The matrix is never built explicitly. Instead the matvec interface is used to multiply all column of the identity matrix. """ return get_variance(self.inv_mat, self.inv_mat_cho_factor)
[docs]def get_matrix_eigen_factorization( cov_mat: CovarianceMatrix, n_pc: int, random_state: Optional[Union[int, RandomState, Generator]] = None, ) -> Tuple[NDArrayFloat, NDArrayFloat]: """ Compute Eigenmodes of the covariance. Parameters ---------- cov_mat : CovarianceMatrix The covariance matrix instance to decompose. n_pc : int Number of principal component in the matrix. random_state: Optional[Union[int, np.random.Generator, np.random.RandomState]] Pseudorandom number generator state used to generate resamples. If `random_state` is ``None`` (or `np.random`), the `numpy.random.RandomState` singleton is used. If `random_state` is an int, a new ``RandomState`` instance is used, seeded with `random_state`. If `random_state` is already a ``Generator`` or ``RandomState`` instance then that instance is used. Raises ------ NotImplementedError If a method difference from arpack is used for decomposition. Returns ------- Tuple[NDArrayFloat, NDArrayFloat] Eigen values and eigen vectors. """ logging.info("Eigendecomposition of Prior Covariance") # twopass = False if not 'twopass' in self.params else self.params['twopass'] start = time() # Random state for v0 vector used by eigsh and svds if random_state is not None: random_state = check_random_state(random_state) v0 = random_state.uniform(size=(cov_mat.shape[0],)) else: v0 = None eig_vals, eig_vects = eigsh(cov_mat, k=n_pc, v0=v0) eig_vals = eig_vals[::-1] eig_vals = eig_vals.reshape(-1, 1) # make a column vector eig_vects = eig_vects[:, ::-1] logging.info( "- time for eigendecomposition with k = %d is %g sec" % (n_pc, round(time() - start)) ) if (eig_vals > 0).sum() < n_pc: n_pc = (eig_vals > 0).sum() eig_vals = eig_vals[:n_pc, :] eig_vects = eig_vects[:, :n_pc] logging.warning("Warning: n_pc changed to %d for positive eigenvalues" % (n_pc)) logging.info( f"- 1st eigv : {eig_vals[0]}, {n_pc}-th eigv : {eig_vals[-1]}, " f"ratio: {eig_vals[-1] / eig_vals[0]}" ) return eig_vals, eig_vects
[docs]def eigen_factorize_cov_mat( cov_mat: CovarianceMatrix, n_pc: int, random_state: Optional[Union[int, RandomState, Generator]] = None, ) -> EigenFactorizedCovarianceMatrix: """ Return an eigen factorized covariance matrix from the input covariance matrix. Parameters ---------- cov_mat : CovarianceMatrix The covariance matrix instance to decompose. n_pc : int Number of principal component in the matrix. random_state: Optional[Union[int, np.random.Generator, np.random.RandomState]] Pseudorandom number generator state used to generate resamples. If `random_state` is ``None`` (or `np.random`), the `numpy.random.RandomState` singleton is used. If `random_state` is an int, a new ``RandomState`` instance is used, seeded with `random_state`. If `random_state` is already a ``Generator`` or ``RandomState`` instance then that instance is used. Returns ------- EigenFactorizedCovarianceMatrix Decomposed matrix instance. """ if isinstance(cov_mat, EigenFactorizedCovarianceMatrix): return cov_mat eig_vals, eig_vects = get_matrix_eigen_factorization(cov_mat, n_pc, random_state) return EigenFactorizedCovarianceMatrix(eig_vals, eig_vects)
[docs]def get_explained_var( eigval: NDArrayFloat, cov_mat: Optional[CovarianceMatrix] = None, trace_cov_mat: Optional[float] = None, ) -> NDArrayFloat: """Return the variance explained by each eigen value.""" if trace_cov_mat is not None: return eigval / trace_cov_mat if cov_mat is not None: return eigval / cov_mat.get_trace() else: raise ValueError("You must provide a Covariance matrix instance or the trace !")
[docs]def sample_from_sparse_cov_factor( mean: NDArrayFloat, factor: csc_array, n_samples: int = 100, random_state: Optional[ Union[int, np.random.Generator, np.random.RandomState] ] = None, ) -> NDArrayFloat: r""" Sample from the given sparse factor of the covariance matrix and the given mean. Parameters ---------- mean: NDArrayFloat Mean of the field with shape $N_{\mathrm{s}}$. factor: NDArrayFloat Sparse factor of the covariance matrix from which to sample from. It has shape $(N_{\mathrm{s}} \times N_{\mathrm{s}})$. n_samples: int The number of samples required ($N_{\mathrm{e}}$). By default 100. random_state : Optional[Union[int, np.random.Generator, np.random.RandomState]] Pseudorandom number generator state used to generate resamples. If `random_state` is ``None`` (or `np.random`), the `numpy.random.RandomState` singleton is used. If `random_state` is an int, a new ``RandomState`` instance is used, seeded with `random_state`. If `random_state` is already a ``Generator`` or ``RandomState`` instance then that instance is used. The default is None Returns ------- NDArrayFloat The ensemble of realizations with shape $(N_{\mathrm{s}} \times N_{\mathrm{e}})$ """ if random_state is not None: _random_state = check_random_state(random_state) else: _random_state = np.random.default_rng() return factor @ _random_state.normal( scale=1.0, size=(factor.shape[0], n_samples) ) + mean.reshape(-1, 1)