Source code for pyrtid.forward.solver

"""Provide a reactive transport solver."""

from __future__ import annotations

import logging

import numpy as np
import scipy as sp

from pyrtid.utils import NDArrayFloat

from .flow_solver import (
    compute_u_darcy_div,
    solve_flow_stationary,
    solve_flow_transient_semi_implicit,
)
from .geochem_solver import solve_geochem
from .models import (
    H_PLUS_CONC,
    TDS_LINEAR_COEFFICIENT,
    VERY_SMALL_NUMBER,
    WATER_DENSITY,
    WATER_MW,
    FlowRegime,
    ForwardModel,
    TransportModel,
)
from .transport_solver import solve_transport_semi_implicit


[docs]def get_max_coupling_error(current_arr, prev_arr) -> float: num = np.where( np.abs(current_arr) <= VERY_SMALL_NUMBER, VERY_SMALL_NUMBER, current_arr ) den = np.where(np.abs(prev_arr) <= VERY_SMALL_NUMBER, VERY_SMALL_NUMBER, prev_arr) return float( np.nan_to_num( np.max(np.abs(1 - num / den)), nan=0.0, ) )
def get_max_coupling_error_forward(tr_model: TransportModel, time_index: int) -> float: r""" Return the maximum transport-chemistry coupling error. The fixed point iteration convergence criteria reads: .. math:: \text{max} \left\lVert 1 - \dfrac{\overline{c}^{n+1, k+1}} {\overline{c}^{n+1, k}} \right\rVert < \epsilon with $k$ the number of fixed point iterations. This error is evaluated from the immobile concentrations (mineral grades). """ return get_max_coupling_error(tr_model.limmob[time_index], tr_model.immob_prev)
[docs]class ForwardSolver: """Class solving the reactive transport forward systems."""
[docs] def __init__(self, model: ForwardModel) -> None: # The model needs to be copied self.model: ForwardModel = model
[docs] def initialize(self) -> None: """ Initialize the system (t=0). It includes the stationary flow resolution. """ # Reinit all self.model.reinit() # If stationary -> equilibrate the initial heads with sources # and boundary conditions # Get the flow and concentration sources unitflw_sources, conc_sources = self.model.get_sources( self.model.time_params.time_elapsed, self.model.grid ) self.model.fl_model.lunitflow.append(unitflw_sources) self.model.tr_model.lsources.append(conc_sources) # Update the initial density self.model.tr_model.ldensity.append( get_density( self.model.tr_model.lmob[0], self.model.gch_params.Ms, self.model.gch_params.Ms2, ) ) if self.model.fl_model.regime == FlowRegime.STATIONARY: solve_flow_stationary( self.model.grid, self.model.fl_model, self.model.tr_model, unitflw_sources, 0, ) else: # To reproduce HYTEC's behavior -> the initial darcy velocity is null self.model.fl_model.lu_darcy_x = [ np.zeros( (self.model.grid.nx + 1, self.model.grid.ny, self.model.grid.nz) ) ] self.model.fl_model.lu_darcy_y = [ np.zeros( (self.model.grid.nx, self.model.grid.ny + 1, self.model.grid.nz) ) ] self.model.fl_model.lu_darcy_z = [ np.zeros( (self.model.grid.nx, self.model.grid.ny, self.model.grid.nz + 1) ) ] compute_u_darcy_div(self.model.fl_model, self.model.grid, 0) # Add the stiffness matrices A and B so that indices match between # the forward and the adjoint. This is for development purposes. ngc = self.model.grid.n_grid_cells # only useful for devs or to check the adjoint state correctness if self.model.fl_model.is_save_spmats: self.model.fl_model.l_q_next.append(sp.sparse.identity(ngc)) self.model.fl_model.l_q_prev.append(sp.sparse.lil_array((ngc, ngc)))
[docs] def solve(self, is_verbose: bool = False) -> None: """Solve the forward problem. Parameters ---------- is_verbose: bool Whether to display info. The default is False. """ self.initialize() time_index = 0 # iteration on time # Sequential iterative approach with operator splitting while self.model.time_params.time_elapsed < self.model.time_params.duration: time_index += 1 # Update the number of time iterations # Reset numerical acceleration if it was temporarily disabled self.model.tr_model.is_num_acc_for_timestep = ( self.model.tr_model.is_numerical_acceleration ) self._solve_system_for_timestep(time_index, is_verbose)
def _solve_system_for_timestep( self, time_index: int, is_verbose: bool = False ) -> None: # Do not update the timestep for the first iteration # update the timestep based on the convergence speed. if time_index != 1: # The CFL criterion is evaluated based on the previous timestep dt_max_cfl = self.model.time_params.get_dt_max_cfl( self.model, time_index - 1 ) self.model.time_params.update_dt( self.model.time_params.nfpi, dt_max_cfl, self.model.tr_model.max_fpi ) # Important: need to save the timestep after the update, otherwise, the # wrong timestep is used in the adjoint # Save the timesteps to the list of timesteps self.model.time_params.save_dt() # Get the sources unitflw_sources_old = self.model.fl_model.lunitflow[-1] conc_sources_old = self.model.tr_model.lsources[-1] # Careful, we need to consider the time at the beginning of the timestep unitflw_sources, conc_sources = self.model.get_sources( np.sum(self.model.time_params.ldt[:-1]), self.model.grid ) self.model.fl_model.lunitflow.append(unitflw_sources) self.model.tr_model.lsources.append(conc_sources) # Solve the flow -> no iterations with transport/chemistry since we don't have # variable permeability nor porosity/diffusion. solve_flow_transient_semi_implicit( self.model.grid, self.model.fl_model, self.model.tr_model, unitflw_sources, unitflw_sources_old, self.model.time_params, time_index, ) # Now the reactive-transport iterations begin... # Reset the number of coupling (Fixed Point) iterations for the current time self.model.time_params.nfpi = 0 # Convergence flag -> set to True to skip the chemistry part has_converged = self.model.tr_model.is_skip_rt # Copy the grades (To place in another function afterwards) self.model.tr_model.limmob.append( self.model.tr_model.limmob[time_index - 1].copy() ) self.model.tr_model.lmob.append(self.model.tr_model.lmob[time_index - 1].copy()) # Iterate the chemistry transport system while the convergence is no meet while not has_converged: if self.model.time_params.nfpi > self.model.tr_model.max_fpi: if self.model.tr_model.is_num_acc_for_timestep: # temporary disabling of numerical acceleration self.model.tr_model.is_num_acc_for_timestep = False # restart the timestep self._solve_system_for_timestep(time_index, is_verbose) # Save the grade for the fix point iterations self.model.tr_model.immob_prev = self.model.tr_model.limmob[ time_index ].copy() # One more coupling iteration has been performed # Update the number of FPI self.model.time_params.nfpi += 1 # Solve the transport solve_transport_semi_implicit( self.model.grid, self.model.fl_model, self.model.tr_model, conc_sources, conc_sources_old, self.model.time_params, time_index, self.model.time_params.nfpi, ) # Solve the chemistry solve_geochem( self.model.grid, self.model.tr_model, self.model.gch_params, self.model.time_params, time_index, ) if is_verbose: logging.info( f"max-coupling error at it = {time_index}" f"-{self.model.time_params.nfpi}:" f"{get_max_coupling_error_forward(self.model.tr_model, time_index)}" ) has_converged = ( get_max_coupling_error_forward(self.model.tr_model, time_index) < self.model.tr_model.fpi_eps ) if is_verbose: logging.info(f"has-converged ?: {has_converged}") # Save the number of fixed point iterations required self.model.time_params.save_nfpi() # Update the density for the current timestep self.model.tr_model.ldensity.append( get_density( self.model.tr_model.lmob[-1], self.model.gch_params.Ms, self.model.gch_params.Ms2, ) )
def get_density(conc: NDArrayFloat, mw1: float, mw2: float) -> NDArrayFloat: """ Parameters ---------- conc : NDArrayFloat Array of concentration (3D) with shape (2, Nx, Ny) in mol/kg. mv1: float Molar weight of the first species (g/mol). mv2: float Molar weight of the second species (g/mol). Returns ------- NDArrayFloat 2D array of densities with shape (Nx, Ny). """ # total dissolved fraction # Add H[+] and OH[-] contributions which concentrations are not # At pH=7, there are always equal numbers of H + and OH -, so we take # the molar weight of water (sum of the two previous). # tds += H_PLUS_CONC * WATER_MW; tds = np.ones(conc[0].shape) * H_PLUS_CONC * WATER_MW # Add species 1 tds += conc[0] * mw1 / 1000 # we divide per 1000 because we need kg/mol # Add species 2 tds += conc[1] * mw2 / 1000 # we divide per 1000 because we need kg/mol # 2) compute the density (this is implemented only for water solvent) # in kg/l # tds[:, :] = ( # 0.1 * time_index # * np.random.default_rng(time_index).random() # ) # * np.random.default_rng(time_index).random(size=tds.shape) return WATER_DENSITY * (TDS_LINEAR_COEFFICIENT * tds + 1.0)