Source code for pyrtid.forward.transport_solver

"""Provide a reactive transport solver."""

from __future__ import annotations

import warnings
from typing import Tuple

import numpy as np
from scipy.sparse import lil_array
from scipy.sparse.linalg import gmres

from pyrtid.forward.models import (
    FlowModel,
    TimeParameters,
    TransportModel,
    get_owner_neigh_indices,
)
from pyrtid.utils import NDArrayFloat, RectilinearGrid, harmonic_mean
from pyrtid.utils.operators import get_super_ilu_preconditioner


def fill_trmat_for_axis(
    grid: RectilinearGrid,
    fl_model: FlowModel,
    tr_model: TransportModel,
    q_next: lil_array,
    q_prev: lil_array,
    disp: NDArrayFloat,
    time_index: int,
    axis: int,
) -> None:
    if axis == 0:
        u_darcy = fl_model.u_darcy_x
    elif axis == 1:
        u_darcy = fl_model.u_darcy_y
    elif axis == 2:
        u_darcy = fl_model.u_darcy_z
    else:
        raise ValueError()

    crank_adv: float = tr_model.crank_nicolson_advection
    crank_diff: float = tr_model.crank_nicolson_diffusion
    fwd_slicer = grid.get_slicer_forward(axis)
    bwd_slicer = grid.get_slicer_backward(axis)

    dmean: NDArrayFloat = np.zeros(grid.shape, dtype=np.float64)
    dmean[fwd_slicer] = harmonic_mean(disp[fwd_slicer], disp[bwd_slicer])
    dmean = dmean.flatten(order="F")

    tmp_diff: float = grid.gamma_ij(axis) / grid.pipj(axis) / grid.grid_cell_volume

    tmp_un = np.zeros(grid.shape)
    tmp_un[fwd_slicer] = u_darcy[*bwd_slicer, time_index]
    tmp_un_old = np.zeros(grid.shape)
    tmp_un_old[fwd_slicer] = u_darcy[*bwd_slicer, time_index - 1]

    un = tmp_un.flatten(order="F")
    un_old = tmp_un_old.flatten(order="F")

    tmp_adv = grid.gamma_ij(axis) / grid.grid_cell_volume

    # Forward scheme:
    normal = 1.0
    idc_owner, idc_neigh = get_owner_neigh_indices(
        grid,
        fwd_slicer,
        bwd_slicer,
        owner_indices_to_keep=tr_model.free_conc_nn,
    )

    q_next[idc_owner, idc_owner] += crank_diff * dmean[idc_owner] * tmp_diff + (
        crank_adv * np.where(normal * un > 0.0, normal * un, 0.0)[idc_owner] * tmp_adv
    )  # type: ignore
    q_next[idc_owner, idc_neigh] += -(crank_diff * dmean[idc_owner] * tmp_diff) + (
        crank_adv * np.where(normal * un <= 0.0, normal * un, 0.0)[idc_owner] * tmp_adv
    )  # type: ignore

    q_prev[idc_owner, idc_owner] -= (1.0 - crank_diff) * dmean[idc_owner] * tmp_diff + (
        (1 - crank_adv)
        * np.where(normal * un_old > 0.0, normal * un_old, 0.0)[idc_owner]
        * tmp_adv
    )  # type: ignore
    q_prev[idc_owner, idc_neigh] -= -(
        (1.0 - crank_diff) * dmean[idc_owner] * tmp_diff
    ) + (
        (1 - crank_adv)
        * np.where(normal * un_old <= 0.0, normal * un_old, 0.0)[idc_owner]
        * tmp_adv
    )  # type: ignore

    # Backward scheme
    normal = -1.0
    idc_owner, idc_neigh = get_owner_neigh_indices(
        grid,
        bwd_slicer,
        fwd_slicer,
        owner_indices_to_keep=tr_model.free_conc_nn,
    )

    q_next[idc_owner, idc_owner] += crank_diff * dmean[idc_neigh] * tmp_diff + (
        crank_adv * np.where(normal * un > 0.0, normal * un, 0.0)[idc_neigh] * tmp_adv
    )  # type: ignore
    q_next[idc_owner, idc_neigh] += -(crank_diff * dmean[idc_neigh] * tmp_diff) + (
        crank_adv * np.where(normal * un <= 0.0, normal * un, 0.0)[idc_neigh] * tmp_adv
    )  # type: ignore
    q_prev[idc_owner, idc_owner] -= (1.0 - crank_diff) * dmean[idc_neigh] * tmp_diff + (
        (1.0 - crank_adv)
        * np.where(normal * un_old > 0.0, normal * un_old, 0.0)[idc_neigh]
        * tmp_adv
    )  # type: ignore

    q_prev[idc_owner, idc_neigh] -= -(1.0 - crank_diff) * dmean[
        idc_neigh
    ] * tmp_diff + (
        (1.0 - crank_adv)
        * np.where(normal * un_old <= 0.0, normal * un_old, 0.0)[idc_neigh]
        * tmp_adv
    )  # type: ignore


def make_transport_matrices(
    grid: RectilinearGrid,
    tr_model: TransportModel,
    fl_model: FlowModel,
    time_index: int,
) -> Tuple[lil_array, lil_array]:
    """
    Make matrices for the transport.


    Parameters
    ----------
    time_index: int
        The iteration, or timestep id.
    """

    dim = grid.n_grid_cells
    q_prev = lil_array((dim, dim), dtype=np.float64)
    q_next = lil_array((dim, dim), dtype=np.float64)

    # diffusion + dispersivity
    disp = (
        tr_model.effective_diffusion
        + tr_model.dispersivity * fl_model.get_u_darcy_norm_sample(time_index)
    )

    for n, axis in zip(grid.shape, (0, 1, 2)):
        if n >= 2:
            fill_trmat_for_axis(
                grid, fl_model, tr_model, q_next, q_prev, disp, time_index, axis
            )

    _apply_transport_sink_term(fl_model, tr_model, q_next, q_prev, time_index)

    _apply_divergence_effect(fl_model, tr_model, q_next, q_prev, time_index)

    # Handle boundary conditions
    _add_transport_boundary_conditions(
        grid, fl_model, tr_model, q_next, q_prev, time_index
    )

    return q_next, q_prev


def _apply_transport_sink_term(
    fl_model: FlowModel,
    tr_model: TransportModel,
    q_next: lil_array,
    q_prev: lil_array,
    time_index: int,
) -> None:
    flw = fl_model.lunitflow[time_index].flatten(order="F")
    _flw = np.where(flw < 0, flw, 0.0)  # keep only negative flowrates
    flw_old = fl_model.lunitflow[time_index - 1].flatten(order="F")
    _flw_old = np.where(flw_old < 0, flw_old, 0.0)  # keep only negative flowrates
    q_next.setdiag(q_next.diagonal() - tr_model.crank_nicolson_advection * _flw)
    q_prev.setdiag(
        q_prev.diagonal() + (1 - tr_model.crank_nicolson_advection) * _flw_old
    )


def _apply_divergence_effect(
    fl_model: FlowModel,
    tr_model: TransportModel,
    q_next: lil_array,
    q_prev: lil_array,
    time_index: int,
) -> None:
    """
    Take into account the divergence: dcdt+U.grad(c)=L(u)."""

    div = (fl_model.lu_darcy_div[time_index] - fl_model.lunitflow[time_index]).flatten(
        order="F"
    )
    div_old = (
        fl_model.lu_darcy_div[time_index - 1] - fl_model.lunitflow[time_index - 1]
    ).flatten(order="F")

    q_next.setdiag(q_next.diagonal() - tr_model.crank_nicolson_advection * div)
    q_prev.setdiag(
        q_prev.diagonal() + (1 - tr_model.crank_nicolson_advection) * div_old
    )


def _add_transport_boundary_conditions_for_axis(
    grid: RectilinearGrid,
    fl_model: FlowModel,
    tr_model: TransportModel,
    q_next: lil_array,
    q_prev: lil_array,
    time_index: int,
    axis: int,
) -> None:
    if axis == 0:
        u_darcy = fl_model.u_darcy_x
        bd1_slicer = (slice(0, 1), slice(None), slice(None))
        bd2_slicer = (slice(grid.nx - 1, grid.nx), slice(None), slice(None))
    elif axis == 1:
        u_darcy = fl_model.u_darcy_y
        bd1_slicer = (slice(None), slice(0, 1), slice(None))
        bd2_slicer = (slice(None), slice(grid.ny - 1, grid.ny), slice(None))
    elif axis == 2:
        u_darcy = fl_model.u_darcy_z
        bd1_slicer = (slice(None), slice(None), slice(0, 1))
        bd2_slicer = (slice(None), slice(None), slice(grid.nz - 1, grid.nz))
    else:
        raise ValueError()

    fwd_slicer = grid.get_slicer_forward(axis, shift=1)
    bwd_slicer = grid.get_slicer_backward(axis, shift=1)

    idc_left_border, idc_right_border = get_owner_neigh_indices(
        grid,
        bd1_slicer,
        bd2_slicer,
    )
    tmp = grid.gamma_ij(axis) / grid.grid_cell_volume

    # left border
    _un = u_darcy[*fwd_slicer, time_index].ravel("F")[idc_left_border]
    _un_old = u_darcy[*fwd_slicer, time_index - 1].ravel("F")[idc_left_border]
    normal = -1.0
    q_next[idc_left_border, idc_left_border] += (
        tr_model.crank_nicolson_advection * _un * tmp * normal
    )  # type: ignore
    q_prev[idc_left_border, idc_left_border] -= (
        (1 - tr_model.crank_nicolson_advection) * _un_old * tmp * normal
    )  # type: ignore

    # right border
    _un = u_darcy[*bwd_slicer, time_index].ravel("F")[idc_right_border]
    _un_old = u_darcy[*bwd_slicer, time_index - 1].ravel("F")[idc_right_border]
    normal = 1.0
    q_next[idc_right_border, idc_right_border] += (
        tr_model.crank_nicolson_advection * _un * tmp * normal
    )  # type: ignore
    q_prev[idc_right_border, idc_right_border] -= (
        (1 - tr_model.crank_nicolson_advection) * _un_old * tmp * normal
    )  # type: ignore


def _add_transport_boundary_conditions(
    grid: RectilinearGrid,
    fl_model: FlowModel,
    tr_model: TransportModel,
    q_next: lil_array,
    q_prev: lil_array,
    time_index: int,
) -> None:
    """Add the boundary conditions to the matrix."""
    # We get the indices of the four borders and we apply a zero gradient.

    for n, axis in zip(grid.shape, (0, 1, 2)):
        if n >= 2:
            _add_transport_boundary_conditions_for_axis(
                grid, fl_model, tr_model, q_next, q_prev, time_index, axis
            )


[docs]def solve_transport_semi_implicit( grid: RectilinearGrid, fl_model: FlowModel, tr_model: TransportModel, conc_sources: NDArrayFloat, conc_sources_old: NDArrayFloat, time_params: TimeParameters, time_index: int, nfpi: int, ) -> int: """ Compute the transport of the mobile concentrations. Parameters ---------- grid: RectilinearGrid RectilinearGrid of the system. fl_model: FlowModel The flow model. tr_model: TransportModel The transport model. time_params: TimeParameters Time parameters of the system. time_index: int The iteration, or timestep id. nfpi: Number of fixed point iterations. """ # The matrix with respect to the diffusion never changes. # The matrix with respect to the advection only needs to be updated if the head # have changed. if nfpi == 1: q_next, q_prev = make_transport_matrices(grid, tr_model, fl_model, time_index) # Add 1/dt for the left term contribution q_next.setdiag( q_next.diagonal() + tr_model.porosity.flatten("F") / time_params.dt ) q_prev.setdiag( q_prev.diagonal() + tr_model.porosity.flatten("F") / time_params.dt ) tr_model.q_next = q_next tr_model.q_prev = q_prev if tr_model.is_save_spmats: tr_model.l_q_next.append(q_next) tr_model.l_q_prev.append(q_prev) # Build the LU preconditioning -> to do only once. super_ilu, preconditioner = get_super_ilu_preconditioner( q_next.tocsc(), drop_tol=1e-10, fill_factor=100 ) tr_model.super_ilu = super_ilu tr_model.preconditioner = preconditioner if super_ilu is None: warnings.warn( f"SuperILU: q_next is singular in transport at it={time_index}!" ) else: q_next = tr_model.q_next q_prev = tr_model.q_prev super_ilu = tr_model.super_ilu preconditioner = tr_model.preconditioner # Multiply prev matrix by prev vector tmp = tr_model.q_prev.dot( tr_model.lmob[time_index - 1].reshape(tr_model.n_sp, -1, order="F").T ).T # Chemical source term if tr_model.is_num_acc_for_timestep and nfpi == 1 and time_index != 1: dmdt = tr_model.limmob[time_index - 1] - tr_model.limmob[time_index - 2] # avoid negative values if np.any(tr_model.lmob[time_index - 1] - dmdt < 0): dmdt = tr_model.limmob[time_index] - tr_model.limmob[time_index - 1] else: dmdt = tr_model.limmob[time_index] - tr_model.limmob[time_index - 1] # The volume is included in the diffusion term tmp -= ( dmdt.reshape(tr_model.n_sp, -1, order="F") * tr_model.porosity.ravel("F") / time_params.dt ) # Add the source terms -> depends on the advection (positive flowrates = injection) tmp[:, :] += tr_model.crank_nicolson_advection * conc_sources.reshape( 2, -1, order="F" ) + (1.0 - tr_model.crank_nicolson_advection) * conc_sources_old.reshape( 2, -1, order="F" ) # Solve Ax = b with A sparse using LU preconditioner for sp in range(tmp.shape[0]): tmp[sp, :], exit_code = gmres( q_next.tocsc(), tmp[sp, :], x0=super_ilu.solve(tmp[sp, :]) if super_ilu is not None else None, M=preconditioner, rtol=tr_model.rtol, ) tr_model.lmob[time_index] = tmp.reshape(tr_model.n_sp, *grid.shape, order="F") return exit_code