Source code for pyrtid.forward.models

"""Provide a model representing the reactive transport system.

Note: this part is handle with numba so it can be used in functions later on.

Note :
- The timestep is fixed.
- The grid is composed of regular grid cells
"""

from __future__ import annotations

import copy
import types
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union

import numpy as np
from scipy.sparse import lil_array
from scipy.sparse.linalg import LinearOperator, SuperLU

from pyrtid.utils import (
    NDArrayBool,
    NDArrayFloat,
    NDArrayInt,
    RectilinearGrid,
    StrEnum,
    get_a_not_in_b_1d,
    node_number_to_indices,
    object_or_object_sequence_to_list,
    span_to_node_numbers_3d,
)

GRAVITY = 9.81
WATER_DENSITY = 997
WATER_MW = 0.01801528  # kg/mol
TDS_LINEAR_COEFFICIENT = 1.0  # -> for the densities calculation
H_PLUS_CONC = 1.00603e-07  # mol/l -> for the densities calculation
SMALL_NUMBER = 1e-10
VERY_SMALL_NUMBER = 1e-30


[docs]class TimeParameters: """ Class defining the time parameters used in the simulation. It also handles the variable timestep. Attributes ---------- duration : float Desired duration of the simulation.Duration dt : float Current timestep in seconds. dt_init : float Initial timestep in seconds. dt_min : Optional[float] Minimum timestep in seconds. dt_max : Optional[float] Maximum timestep in seconds. courant_factor: float The timestep is generally limited to some maximum value by the flow and transport models, to assure numerical stability. The Courant-Friedlichs- Lewy-Factor is a relaxation parameter for the maximum timestep. Reactive systems often allow to relax the (very restrictive) maximum timestep, imposed by the transport model. For values greater than 1, the timestep re- striction will be relaxed. On the contrary, the restriction will be tightened for values inferior to 1. Reactive systems often allow to relax the maximum timestep by a factor 5, 10 or even 20. Using this option, however, may be dangerous and possibly lead to failure of the model. Always test the results obtained against a case without this parameter set. The default is 1.0. ldt: List[float] List of successive timesteps (in seconds) used in the forward modelling. nts: int Number of timesteps in the simulation. nt: int Number of times in the simulation (nts + 1). nfpi: int Number of fixed point iterations used in the last time iteration. lnfpi: List of the number of fixed point iterations used for each time iteration. This list should have the same length as `ldt`. times: NDArrayFloat Array of times in second from 0 to t_max." """ __slots__ = [ "duration", "dt", "dt_init", "dt_min", "dt_max", "ldt", "nfpi", "lnfpi", "courant_factor", ]
[docs] def __init__( self, duration: float, dt_init: float, dt_min: Optional[float] = None, dt_max: Optional[float] = None, courant_factor: float = 1.0, ) -> None: """Initialize the instance.""" self.duration = duration # First pass on the min/max if dt_min is not None: _dt_min: float = dt_min else: _dt_min = dt_init if dt_max is not None: _dt_max: float = dt_max else: _dt_max = dt_init _dt_init = max(min(_dt_max, dt_init), _dt_min, dt_init) # Second pass on the min/max if dt_min is not None: self.dt_min: float = dt_min else: self.dt_min = _dt_init if dt_max is not None: self.dt_max: float = dt_max else: self.dt_max = _dt_init # Check dt_min and dt_max consistency if self.dt_min > self.dt_max: raise ValueError(f"dt_min ({self.dt_min}) is above dt_max ({self.dt_max})!") self.courant_factor: float = courant_factor # Apply bounds self.dt_init: float = _dt_init self.dt = _dt_init self.nfpi: int = 0 self.ldt: List[float] = [] self.lnfpi: List[int] = []
@property def time_elapsed(self) -> float: """Time elapsed in the simulation.""" return np.sum(self.ldt) @property def nts(self) -> int: """ Number of timesteps (dt). It is the number of times (`nt`) - 1. """ return len(self.ldt) @property def nt(self) -> int: """ Number of times (including t0). It is the number of timesteps (`nts`) +1. """ return self.nts + 1 @property def times(self) -> NDArrayFloat: """Return all the times in second from 0 to t_max.""" return np.cumsum([0] + self.ldt)
[docs] def reset_to_init(self) -> None: """Empty the list of timesteps and set dt to its initial value.""" self.dt = self.dt_init self.ldt = [] self.lnfpi = []
[docs] def save_dt(self) -> None: "Save the current timestep to the list of timesteps." self.ldt.append(self.dt)
[docs] def save_nfpi(self) -> None: "Save the current number of fixed point iterations." self.lnfpi.append(self.nfpi)
[docs] def update_dt(self, n_iter: float, dt_max_cfl: float, max_fpi: int) -> None: """ Update the timestep. Parameters ---------- n_iter: int Number of iterations required to solve the last timestep. dt_max_cfl: float Maximum timestep according to the CFL. max_fpi: int Maximum number of fixed point iterations per timestep. """ if n_iter < max_fpi: # increase dt by 2% self.dt *= 1.02 else: # decrease dt by 30% self.dt *= 0.7 # Ensure CFL respect if self.dt > dt_max_cfl: self.dt = dt_max_cfl # Ensure timebounds if self.dt < self.dt_min: self.dt = self.dt_min if self.dt > self.dt_max: self.dt = self.dt_max
[docs] def get_dt_max_cfl(self, model: ForwardModel, time_index: int) -> float: """Get the maximum timestep to respect the CFL condition.""" dt_cfl = np.min( self.courant_factor * model.tr_model.porosity * model.get_ij_over_u(time_index) ) return float(np.min(dt_cfl))
[docs]class FlowRegime(StrEnum): STATIONARY = "stationary" TRANSIENT = "transient"
[docs]class VerticalAxis(StrEnum): X = "x" Y = "y" Z = "z"
[docs]class FlowParameters: """ Class defining the time parameters used in the simulation. Attributes ---------- k0: float, optional Default permeability in the grid (m/s). The default is 1.e-4 m/s. storage_coefficient: float, optional The default storage coefficient in the grid ($m^{-1}$). The default is 1e-3 $m^{-1}$. crank_nicolson: float The Crank-Nicolson parameter allows to set the temporal resolution scheme to explicit, fully implicit or somewhere in between these two extremes. The value must be comprised between 0.0 and 1.0, 0.0 being a full explicit scheme and 1.0 fully implicit. The default is 1.0. is_gravity: bool, optional Whether the gravity is taken into account, i.e. density driven flow. vertical_axis: VerticalAxis Define which axis is the vertical one. It only affects if the gravity is enabled. The default is the z axis. tolerance: float, optional The tolerance on the flow. The default is 1e-8. """
[docs] def __init__( self, permeability: float = 1e-4, storage_coefficient: float = 1.0, crank_nicolson: float = 1.0, regime: FlowRegime = FlowRegime.STATIONARY, is_gravity: bool = False, vertical_axis: VerticalAxis = VerticalAxis.Z, rtol: float = 1e-8, ) -> None: """Initialize the instance.""" self.permeability: float = permeability self.storage_coefficient: float = storage_coefficient self.crank_nicolson: float = crank_nicolson self.regime: FlowRegime = regime self.is_gravity: bool = is_gravity self.vertical_axis: VerticalAxis = vertical_axis self.rtol: float = rtol
[docs]class TransportParameters: """ Class defining the transport parameters used in the simulation. Attributes ---------- diffusion: float, optional Default diffusion coefficient in the grid in [m2/s]. The default is 1e-4 m2/s. dispercivity: float, optional The dispersivity (kinematic and numeric) in meters. The default is 0.1 m. porosity: float, optional Default porosity in the grid Should be a number between 0 and 1. The default is 1.0. crank_nicolson: float The Crank-Nicholson parameter allows to set the temporal resolution scheme to explicit, fully implicit or somewhere in between these two extremes. The value must be comprised between 0.0 and 1.0, 0.0 being a full explicit scheme and 1.0 fully implicit. The default is 0.5. tolerance: float, optional The tolerance on the transport. The default is 1e-8. is_numerical_acceleration: bool, optional Whether to use the chemical source term from the previous iteration (at t=n-1) as a first guess in the transport equation (only apply to the first coupling fixed point iteration). In practise it might save one iteration (transport- chemistry) or more if the system is in a quasi steady-state and it might also reduce the overall coupling error. However if the timestep is large or the system unstable (stiff), it might lead to non-convergence as well. For more information, refer to :cite:`lagneauOperatorsplittingbasedReactiveTransport2010`. The default is False. is_skip_rt: bool Whether to skip the reactive-transport step, considering only the flow problem. fpi_eps: float Tolerance on the transport-chemistry coupling error. The default value is 1e-5. max_fpi: int Maximum number of fixed point iterations per timestep. If this number is reached then the numerical acceleration is temporarily disabled, otherwise, the timestep is reduced. """
[docs] def __init__( self, diffusion: float = 1e-4, dispersivity: float = 0.1, porosity: float = 1.0, crank_nicolson_advection: float = 0.5, crank_nicolson_diffusion: float = 1.0, rtol: float = 1e-8, is_numerical_acceleration: bool = False, is_skip_rt: bool = False, fpi_eps: float = 1e-5, max_fpi: int = 20, ) -> None: """Initialize the instance.""" self.diffusion: float = diffusion self.dispersivity: float = dispersivity self.porosity: float = porosity self.crank_nicolson_advection: float = crank_nicolson_advection self.crank_nicolson_diffusion: float = crank_nicolson_diffusion self.rtol: float = rtol self.is_numerical_acceleration: bool = is_numerical_acceleration self.is_skip_rt: bool = is_skip_rt self.fpi_eps: float = fpi_eps self.max_fpi: int = max_fpi
[docs]class GeochemicalParameters: """ Class defining the geocgemical parameters used in the simulation. Attributes ---------- conc: float, optional Initial tracer concentration in the grid in molal. The default is 0.0. conc2: float, optional Initial reagent concentration in the grid in molal. The default is 0.0. grade: float, optional Default mineral grade in the grid in mol/kg (kg of water). The default is 0.0. kv: float, optional The kinetic rate of the mineral in [mol/m2/s]. The default is -6.9e-9. As: float, optional Specific area in [m2/mol]. The default is 13.5. Ks: float, optional Solubility constant (no unit). The default is 6.3e-4. Ms: float, optional Molar mass in g/mol. stocoef: float Number of mole of species 2 consumed when dissolving the mineral. """
[docs] def __init__( self, conc: float = SMALL_NUMBER, conc2: float = SMALL_NUMBER, grade: float = SMALL_NUMBER, grade2: float = SMALL_NUMBER, kv: float = -6.9e-9, As: float = 13.5, Ks: float = 6.3e-4, Ms: float = 270, Ms2: float = 270, stocoef: float = 1.0, use_explicit_formulation: bool = True, ) -> None: """Initialize the instance.""" self.conc: float = conc self.conc2: float = conc2 self.grade: float = grade self.grade2: float = grade2 self.kv: float = kv self.As: float = As self.Ks: float = Ks self.Ms: float = Ms self.Ms2: float = Ms2 self.stocoef: float = stocoef self.use_explicit_formulation = use_explicit_formulation
[docs]class SourceTerm: """ Define a source term object. A well object can pump or inject. Attributes ---------- name: str Name of the instance. x_coord: float x coordinate of the well. y_coord: float y_coordinate of the well. flowrates: NDArrayFloat Sequence of flowrates of the well. Positive = injection, negative = pumping. concentrations: NDArrayFloat Concentrations of the first species, used only if flowrates is positive. """ __slots__ = [ "name", "node_ids", "times", "flowrates", "concentrations", ]
[docs] def __init__( self, name: str, node_ids: NDArrayInt, times: NDArrayFloat, flowrates: NDArrayFloat, concentrations: NDArrayFloat, ) -> None: """ Initialize the instance. Parameters ---------- name: str Name of the instance. x_coord: float x coordinate of the well. y_coord: float y_coordinate of the well. flowrates: NDArrayFloat Sequence of flowrates of the well (m3/s). Positive = injection, negative = pumping. concentrations: NDArrayFloat Concentration, used only if flowrates is positive (mol/l). With dimension (nt, n_sp). """ self.name = name self.node_ids = np.array(node_ids).reshape(-1) self.times = np.array(times).reshape(-1) self.flowrates = np.array(flowrates).reshape(-1) _conc = np.array(concentrations) self.concentrations = _conc.reshape(_conc.shape[0], -1) if ( self.concentrations.shape[0] != self.times.size or self.flowrates.size != self.times.size ): raise ValueError( "Times, flowrates and concentrations must have the same dimension !" )
[docs] def get_node_indices(self, grid: RectilinearGrid) -> NDArrayInt: """Return the node indices.""" return np.array( node_number_to_indices(self.node_ids, nx=grid.nx, ny=grid.ny) ).reshape(3, -1)
@property def n_nodes(self) -> int: """Return the number of nodes.""" return np.size(self.node_ids)
[docs] def get_values(self, time: float) -> Tuple[float, float]: """Return the concentrations and the flowrates for a given time.""" if time < self.times[0]: return 0.0, 0.0 time_index = 0 # index in times # This is matching the "modify" process behavior of HYTEC for time_index, _time in enumerate(self.times): if _time > time: if time_index > 0: time_index -= 1 break return self.flowrates[time_index], self.concentrations[time_index]
@dataclass class BoundaryCondition(ABC): """ Represent a boundary condition. Parameters ---------- span: slice The span over which the condition applies. """ span: Union[NDArrayInt, Tuple[slice, slice, slice]]
[docs]@dataclass class ConstantHead(BoundaryCondition): """ Represent a constant head condition (Dirichlet). Parameters ---------- span: slice The span over which the condition applies. values: Union[float, NDArrayFloat] The values to set. """ span: Union[NDArrayInt, Tuple[slice, slice, slice], slice] values: Union[float, NDArrayFloat]
[docs]@dataclass class ConstantConcentration(BoundaryCondition): """ Represent a constant conentration boundary condition (Dirichlet). Parameters ---------- span: slice The span over which the condition applies. values: Union[float, NDArrayFloat] The values to set. """ span: Union[NDArrayInt, Tuple[slice, slice, slice], slice] values: Union[float, NDArrayFloat]
[docs]@dataclass class ZeroConcGradient(BoundaryCondition): """ Represent a zero conentration gradient boundary condition (Neumann). Parameters ---------- span: slice The span over which the condition applies. """ span: Union[NDArrayInt, Tuple[slice, slice, slice], slice]
class FlowModel(ABC): """Represent a flow model.""" __slots__ = [ "vertical_axis", "vertical_mesh_size", "crank_nicolson", "storage_coefficient", "permeability", "lhead", "lpressure", "lu_darcy_x", "lu_darcy_y", "lu_darcy_z", "lu_darcy_div", "lunitflow", "boundary_conditions", "cst_head_nn", "regime", "q_prev_no_dt", "q_next_no_dt", "q_prev", "q_next", "rtol", "west_boundary_idx", "east_boundary_idx", "north_boundary_idx", "south_boundary_idx", "top_boundary_idx", "bottom_boundary_idx", "is_save_spmats", "l_q_next", "l_q_prev", "is_save_spilu", "super_ilu", "preconditioner", ] def __init__( self, grid: RectilinearGrid, time_params: TimeParameters, fl_params: FlowParameters, ) -> None: """Initialize the instance.""" self.crank_nicolson: float = fl_params.crank_nicolson self.storage_coefficient: NDArrayFloat = ( np.ones(grid.shape, dtype=np.float64) * fl_params.storage_coefficient ) self.regime: FlowRegime = fl_params.regime self.permeability: NDArrayFloat = ( np.ones(grid.shape, dtype=np.float64) * fl_params.permeability ) self.lu_darcy_x: List[NDArrayFloat] = [] self.lu_darcy_y: List[NDArrayFloat] = [] self.lu_darcy_z: List[NDArrayFloat] = [] self.lu_darcy_div: List[NDArrayFloat] = [] self.lunitflow: List[NDArrayFloat] = [] self.boundary_conditions: List[BoundaryCondition] = [] self.q_prev_no_dt = lil_array((grid.n_grid_cells, grid.n_grid_cells)) self.q_next_no_dt = lil_array((grid.n_grid_cells, grid.n_grid_cells)) self.q_prev = lil_array((grid.n_grid_cells, grid.n_grid_cells)) self.q_next = lil_array((grid.n_grid_cells, grid.n_grid_cells)) self.cst_head_nn: NDArrayInt = np.array([], dtype=np.int64) self.rtol = fl_params.rtol self.vertical_axis = fl_params.vertical_axis self.vertical_mesh_size = { VerticalAxis.X: grid.dx, VerticalAxis.Y: grid.dy, VerticalAxis.Z: grid.dz, }[fl_params.vertical_axis] # Empty arrays self.west_boundary_idx: NDArrayInt = np.zeros([], dtype=np.int64) self.east_boundary_idx: NDArrayInt = np.zeros([], dtype=np.int64) self.south_boundary_idx: NDArrayInt = np.zeros([], dtype=np.int64) self.north_boundary_idx: NDArrayInt = np.zeros([], dtype=np.int64) self.bottom_boundary_idx: NDArrayInt = np.zeros([], dtype=np.int64) self.top_boundary_idx: NDArrayInt = np.zeros([], dtype=np.int64) # These are list of ndarrays self.lhead: List[NDArrayFloat] = [np.zeros(grid.shape, dtype=np.float64)] # TODO: provide the initial density self.lpressure: List[NDArrayFloat] = [ ( np.zeros(grid.shape, dtype=np.float64) - self._get_mesh_center_vertical_pos() ) * GRAVITY * WATER_DENSITY ] # List to store the successive stiffness matrices # This is mostly for development purposes. # only activated with the adjoint state or specific devs. self.is_save_spmats: bool = False self.l_q_next: List[lil_array] = [] self.l_q_prev: List[lil_array] = [] # preconditioner (LU) for q_next, only useful to store with the forward # sensivitiy approach. self.is_save_spilu: bool = False self.super_ilu: Optional[SuperLU] = None self.preconditioner: Optional[LinearOperator] = None @property def head(self) -> NDArrayFloat: """ Return head [m] as array with dimension (nx, ny, nz, nt). This is read-only. """ return np.transpose(np.array(self.lhead), axes=(1, 2, 3, 0)) @property def pressure(self) -> NDArrayFloat: """ Return pressure [Pa] as array with dimension (nx, ny, nz, nt). This is read-only. """ return np.transpose(np.array(self.lpressure), axes=(1, 2, 3, 0)) @property def u_darcy_x(self) -> NDArrayFloat: """ Return x-darcy velocities as array with dimension (nx, ny, nz, nt + 1). This is read-only. """ return np.transpose(np.array(self.lu_darcy_x), axes=(1, 2, 3, 0)) @property def u_darcy_y(self) -> NDArrayFloat: """ Return y-darcy velocities as array with dimension (nx, ny, nz, nt + 1). This is read-only. """ return np.transpose(np.array(self.lu_darcy_y), axes=(1, 2, 3, 0)) @property def u_darcy_z(self) -> NDArrayFloat: """ Return z-darcy velocities as array with dimension (nx, ny, nz, nt + 1). This is read-only. """ return np.transpose(np.array(self.lu_darcy_z), axes=(1, 2, 3, 0)) @property def u_darcy_div(self) -> NDArrayFloat: """ Return darcy divergence as array with dimension (nx, ny, nz, nt + 1). This is read-only. """ return np.transpose(np.array(self.lu_darcy_div), axes=(1, 2, 3, 0)) @property def unitflow(self) -> NDArrayFloat: """ Return flow sources sources as array with dimension (nx, ny, nz, nt + 1). This is read-only. """ return np.transpose(np.array(self.lunitflow), axes=(1, 2, 3, 0)) def add_boundary_conditions(self, condition: BoundaryCondition) -> None: """Add a boundary condition to the flow model.""" if not isinstance(condition, ConstantHead): raise ValueError( f"{condition} is not a valid boundary condition for the flow model !" ) self.boundary_conditions.append(condition) if isinstance(condition, ConstantHead): # 0) Set the values self.lhead[0][condition.span] = condition.values # 1) Get the new constant head node numbers def set_constant_head_indices(self) -> None: """Set the indices of nodes with constant head.""" node_numbers = np.array([], dtype=np.int32) nx, ny, nz = self.lhead[0].shape # type: ignore _west_bidx: Set[Tuple[int, int]] = set() _east_bidx: Set[Tuple[int, int]] = set() _south_bidx: Set[Tuple[int, int]] = set() _north_bidx: Set[Tuple[int, int]] = set() _bottom_bidx: Set[Tuple[int, int]] = set() _top_bidx: Set[Tuple[int, int]] = set() for condition in self.boundary_conditions: if isinstance(condition, ConstantHead): # 1) Get the new constant head node numbers new_nn: NDArrayInt = span_to_node_numbers_3d(condition.span, nx, ny, nz) # 2) add the new nn to the global list of nn node_numbers = np.hstack([node_numbers, new_nn]) # 3) determine if the segment is along one of the 5 borders of the # domain. First we start by getting the indices in the grid _ix, _iy, _iz = node_number_to_indices(new_nn, nx, ny) # The span must be continuous (rectangular group of grid cells), # so we can estimate the direction of constant head segment: # must be more than 2 values on one of the borders # X non_zero_west: int = np.count_nonzero(_ix == 0) if non_zero_west > 1 or (non_zero_west == 1 and ny == 1 and nz == 1): for iy, iz in zip(_iy, _iz): _west_bidx.add((iy, iz)) non_zero_east: int = np.count_nonzero(_ix == nx - 1) if non_zero_east > 1 or (non_zero_east == 1 and ny == 1 and nz == 1): for iy, iz in zip(_iy, _iz): _east_bidx.add((iy, iz)) # Y non_zero_south: int = np.count_nonzero(_iy == 0) if non_zero_south > 1 or (non_zero_south == 1 and nx == 1 and nz == 1): for ix, iz in zip(_ix, _iz): _south_bidx.add((ix, iz)) non_zero_north: int = np.count_nonzero(_iy == ny - 1) if non_zero_north > 1 or (non_zero_north == 1 and nx == 1 and nz == 1): for ix, iz in zip(_ix, _iz): _north_bidx.add((ix, iz)) # Z non_zero_bottom: int = np.count_nonzero(_iz == 0) if non_zero_bottom > 1 or ( non_zero_bottom == 1 and nx == 1 and ny == 1 ): for ix, iy in zip(_ix, _iy): _bottom_bidx.add((ix, iy)) non_zero_top: int = np.count_nonzero(_iz == nz - 1) if non_zero_top > 1 or (non_zero_top == 1 and nx == 1 and ny == 1): for ix, iy in zip(_ix, _iy): _top_bidx.add((ix, iy)) # domain boundary indices to numpy => easier indexing self.west_boundary_idx = np.atleast_2d( np.array(list(_west_bidx), dtype=np.int64).T ) self.east_boundary_idx = np.atleast_2d( np.array(list(_east_bidx), dtype=np.int64).T ) self.south_boundary_idx = np.atleast_2d( np.array(list(_south_bidx), dtype=np.int64).T ) self.north_boundary_idx = np.atleast_2d( np.array(list(_north_bidx), dtype=np.int64).T ) self.bottom_boundary_idx = np.atleast_2d( np.array(list(_bottom_bidx), dtype=np.int64).T ) self.top_boundary_idx = np.atleast_2d( np.array(list(_top_bidx), dtype=np.int64).T ) # remove duplicates from the global list self.cst_head_nn: NDArrayInt = np.unique(node_numbers.flatten()) @property def cst_head_indices(self) -> NDArrayInt: """Return the indices (array) of the constant head grid cells.""" # [:2] to ignore the z axis return np.array( node_number_to_indices( self.cst_head_nn, nx=self.head.shape[0], ny=self.head.shape[1] ) ) @property def free_head_nn(self) -> NDArrayInt: """Return the free head node numbers.""" return get_a_not_in_b_1d( np.arange(np.prod(self.lhead[0].shape), dtype=np.int32), # type: ignore self.cst_head_nn, ) @property def free_head_indices(self) -> NDArrayInt: """Return the indices (array) of the free head grid cells.""" # [:2] to ignore the z axis return np.array( node_number_to_indices( self.free_head_nn, nx=self.head.shape[0], ny=self.head.shape[1] ) ) def reinit(self) -> None: """Set all arrays to zero except for the initial conditions(first time).""" self.lhead = self.lhead[:1] self.lpressure = self.lpressure[:1] self.lu_darcy_x = [] self.lu_darcy_y = [] self.lu_darcy_z = [] self.lu_darcy_div = [] self.lunitflow = [] self.set_constant_head_indices() self.l_q_next = [] self.l_q_prev = [] self.super_ilu = None self.preconditioner = None @property def u_darcy_x_center(self) -> NDArrayFloat: """The darcy x-velocities estimated at the mesh centers.""" # Compute the average velocity tmp = np.zeros((self.head.shape)) tmp += self.u_darcy_x[:-1, :, :, :] tmp += self.u_darcy_x[1:, :, :, :] # All nodes have 2 boundaries along the y axis, except for the # borders grid cells tmp[1:-1, :, :, :] /= 2 # for the borders we need to check if a boundary (flow) exist or not # this is a consequence of constant head and imposed flux if self.west_boundary_idx.size != 0: tmp[0, self.west_boundary_idx[0], self.west_boundary_idx[1], :] /= 2 if self.east_boundary_idx.size != 0: tmp[-1, self.east_boundary_idx[0], self.east_boundary_idx[1], :] /= 2 return tmp @property def u_darcy_y_center(self) -> NDArrayFloat: """The darcy y-velocities estimated at the mesh centers.""" # Compute the average velocity tmp = np.zeros((self.head.shape)) tmp += self.u_darcy_y[:, :-1, :, :] tmp += self.u_darcy_y[:, 1:, :, :] tmp[:, 1:-1, :, :] /= 2 # All nodes have 2 boundaries along the x axis, except for the # borders grid cells # for the borders we need to check if a boundary (flow) exist or not # this is a consequence of constant head and imposed flux if self.south_boundary_idx.size != 0: tmp[self.south_boundary_idx[0], 0, self.south_boundary_idx[1], :] /= 2 if self.north_boundary_idx.size != 0: tmp[self.north_boundary_idx[0], -1, self.north_boundary_idx[1], :] /= 2 return tmp @property def u_darcy_z_center(self) -> NDArrayFloat: """The darcy Z-velocities estimated at the mesh centers.""" # Compute the average velocity tmp = np.zeros((self.head.shape)) tmp += self.u_darcy_z[:, :, :-1, :] tmp += self.u_darcy_z[:, :, 1:, :] tmp[:, :, 1:-1, :] /= 2 # All nodes have 2 boundaries along the z axis, except for the # borders grid cells # for the borders we need to check if a boundary (flow) exist or not # this is a consequence of constant head and imposed flux if self.bottom_boundary_idx.size != 0: tmp[self.bottom_boundary_idx[0], self.bottom_boundary_idx[1], 0, :] /= 2 if self.top_boundary_idx.size != 0: tmp[self.top_boundary_idx[0], self.top_boundary_idx[1], -1, :] /= 2 return tmp @property def u_darcy_norm(self) -> NDArrayFloat: """The norm of the darcy velocity estimated at the center of the mesh.""" return np.sqrt(self.u_darcy_x_center**2 + self.u_darcy_y_center**2) def get_u_darcy_norm_sample(self, time_index: int) -> NDArrayFloat: """The norm of the darcy velocity estimated at the center of the grid cell.""" # for x tmp_x = np.zeros_like(self.lhead[time_index]) tmp_x += ( self.lu_darcy_x[time_index][:-1, :, :] + self.lu_darcy_x[time_index][1:, :, :] ) # All nodes have 2 boundaries along the y axis, except for the # borders grid cells tmp_x[1:-1, :, :] /= 2 # for the borders we need to check if a boundary (flow) exist or not # this is a consequence of constant head and imposed flux if self.west_boundary_idx.size != 0: tmp_x[0, self.west_boundary_idx[0], self.west_boundary_idx[1]] /= 2 if self.east_boundary_idx.size != 0: tmp_x[-1, self.east_boundary_idx[0], self.east_boundary_idx[1]] /= 2 # for y tmp_y = np.zeros((self.lhead[time_index].shape)) tmp_y += ( self.lu_darcy_y[time_index][:, :-1, :] + self.lu_darcy_y[time_index][:, 1:, :] ) # All nodes have 2 boundaries along the y axis, except for the # borders grid cells tmp_y[:, 1:-1, :] /= 2 # for the borders we need to check if a boundary (flow) exist or not # this is a consequence of constant head and imposed flux if self.south_boundary_idx.size != 0: tmp_y[self.south_boundary_idx[0], 0, self.south_boundary_idx[1]] /= 2 if self.north_boundary_idx.size != 0: tmp_y[self.north_boundary_idx[0], -1, self.north_boundary_idx[1]] /= 2 # for z tmp_z = np.zeros((self.lhead[time_index].shape)) tmp_z += ( self.lu_darcy_z[time_index][:, :, :-1] + self.lu_darcy_z[time_index][:, :, 1:] ) # All nodes have 2 boundaries along the y axis, except for the # borders grid cells tmp_z[:, :, 1:-1] /= 2 # for the borders we need to check if a boundary (flow) exist or not # this is a consequence of constant head and imposed flux if self.bottom_boundary_idx.size != 0: tmp_z[self.bottom_boundary_idx[0], self.bottom_boundary_idx[1], 0] /= 2 if self.top_boundary_idx.size != 0: tmp_z[self.top_boundary_idx[0], self.top_boundary_idx[1], -1] /= 2 # norm return np.sqrt(tmp_x**2 + tmp_y**2 + tmp_z**2) def get_du_darcy_norm_sample( self, time_index: int ) -> Tuple[NDArrayFloat, NDArrayFloat, NDArrayFloat]: """The norm of the darcy velocity estimated at the center of the grid cell.""" # for x tmp_x = np.zeros_like(self.lhead[time_index]) tmp_x += ( self.lu_darcy_x[time_index][:-1, :, :] + self.lu_darcy_x[time_index][1:, :, :] ) # All nodes have 2 boundaries along the y axis, except for the # borders grid cells divx = np.ones_like(tmp_x) divx[1:-1, :, :] /= 2 # for the borders we need to check if a boundary (flow) exist or not # this is a consequence of constant head and imposed flux if self.west_boundary_idx.size != 0: divx[0, self.west_boundary_idx[0], self.west_boundary_idx[1]] /= 2 if self.east_boundary_idx.size != 0: divx[-1, self.east_boundary_idx[0], self.east_boundary_idx[1]] /= 2 tmp_x *= divx # for y tmp_y = np.zeros((self.lhead[time_index].shape)) tmp_y += ( self.lu_darcy_y[time_index][:, :-1, :] + self.lu_darcy_y[time_index][:, 1:, :] ) # All nodes have 2 boundaries along the y axis, except for the # borders grid cells divy = np.ones_like(tmp_y) divy[:, 1:-1, :] /= 2 # for the borders we need to check if a boundary (flow) exist or not # this is a consequence of constant head and imposed flux if self.south_boundary_idx.size != 0: divy[self.south_boundary_idx[0], 0, self.south_boundary_idx[1]] /= 2 if self.north_boundary_idx.size != 0: divy[self.north_boundary_idx[0], -1, self.north_boundary_idx[1]] /= 2 tmp_y *= divy # for z tmp_z = np.zeros((self.lhead[time_index].shape)) tmp_z += ( self.lu_darcy_z[time_index][:, :, :-1] + self.lu_darcy_z[time_index][:, :, 1:] ) # All nodes have 2 boundaries along the y axis, except for the # borders grid cells divz = np.ones_like(tmp_z) divz[:, :, 1:-1] /= 2 # for the borders we need to check if a boundary (flow) exist or not # this is a consequence of constant head and imposed flux if self.bottom_boundary_idx.size != 0: divz[self.bottom_boundary_idx[0], self.bottom_boundary_idx[1], 0] /= 2 if self.top_boundary_idx.size != 0: divz[self.top_boundary_idx[0], self.top_boundary_idx[1], -1] /= 2 tmp_z *= divz # norm norm = np.sqrt(tmp_x**2 + tmp_y**2, tmp_z**2) # inverse of the norm -> avoid division by zero inv_norm = np.zeros_like(norm) mask = norm > 0.0 inv_norm[mask] = 1.0 / norm[mask] # return (d|U|/dUx , d|U|/dUy, d|U|/dUz) return inv_norm * tmp_x * divx, inv_norm * tmp_y * divy, inv_norm * tmp_z * divz def get_u_darcy_norm(self) -> NDArrayFloat: """The norm of the darcy velocity estimated at the center of the grid cell.""" # for x tmp_x = np.zeros_like(self.head) tmp_x += self.u_darcy_x[:-1, :, :] + self.u_darcy_x[1:, :, :] # All nodes have 2 boundaries along the y axis, except for the # borders grid cells tmp_x[1:-1, :, :] /= 2 # for the borders we need to check if a boundary (flow) exist or not # this is a consequence of constant head and imposed flux if self.west_boundary_idx.size != 0: tmp_x[0, self.west_boundary_idx[0], self.west_boundary_idx[1]] /= 2 if self.east_boundary_idx.size != 0: tmp_x[-1, self.east_boundary_idx[0], self.east_boundary_idx[1]] /= 2 # for y tmp_y = np.zeros((self.head.shape)) tmp_y += self.u_darcy_y[:, :-1, :] + self.u_darcy_y[:, 1:, :] # All nodes have 2 boundaries along the y axis, except for the # borders grid cells tmp_y[:, 1:-1, :] /= 2 # for the borders we need to check if a boundary (flow) exist or not # this is a consequence of constant head and imposed flux if self.south_boundary_idx.size != 0: tmp_y[self.south_boundary_idx[0], 0, self.south_boundary_idx[1]] /= 2 if self.north_boundary_idx.size != 0: tmp_y[self.north_boundary_idx[0], -1, self.north_boundary_idx[1]] /= 2 # for z tmp_z = np.zeros((self.head.shape)) tmp_z += self.u_darcy_z[:, :, :-1] + self.u_darcy_z[:, :, 1:] # All nodes have 2 boundaries along the y axis, except for the # borders grid cells tmp_z[:, :, 1:-1] /= 2 # for the borders we need to check if a boundary (flow) exist or not # this is a consequence of constant head and imposed flux if self.bottom_boundary_idx.size != 0: tmp_z[self.bottom_boundary_idx[0], self.bottom_boundary_idx[1], 0] /= 2 if self.top_boundary_idx.size != 0: tmp_z[self.top_boundary_idx[0], self.top_boundary_idx[1], -1] /= 2 # norm return np.sqrt(tmp_x**2 + tmp_y**2 + tmp_z**2) def get_vertical_dim(self) -> int: """Return the number of voxel along the vertical_axis axis.""" if self.vertical_axis == VerticalAxis.X: return self.lhead[0].shape[0] elif self.vertical_axis == VerticalAxis.Y: return self.lhead[0].shape[1] else: return self.lhead[0].shape[2] # TODO: cache def _get_mesh_center_vertical_pos(self) -> NDArrayFloat: """Return the vertical position of the grid cells centers.""" xv, yv, zv = np.meshgrid( range(self.lhead[0].shape[0]), range(self.lhead[0].shape[1]), range(self.lhead[0].shape[2]), indexing="ij", ) if self.vertical_axis == VerticalAxis.X: return (xv + 0.5) * self.vertical_mesh_size elif self.vertical_axis == VerticalAxis.Y: return (yv + 0.5) * self.vertical_mesh_size else: return (zv + 0.5) * self.vertical_mesh_size def get_pressure_pa(self) -> NDArrayFloat: """Return the pressure in Pa.""" return self.pressure def get_pressure_bar(self) -> NDArrayFloat: """Return the pressure in bar.""" return self.get_pressure_pa() / 1e5 def pressure_to_head( self, pressure: NDArrayFloat, ) -> NDArrayFloat: """Convert pressure [Pa] to head [m].""" return pressure / GRAVITY / WATER_DENSITY + self._get_mesh_center_vertical_pos() def head_to_pressure( self, head: NDArrayFloat, ) -> NDArrayFloat: """Convert head [m] to pressure [Pa].""" return (head - self._get_mesh_center_vertical_pos()) * GRAVITY * WATER_DENSITY def set_initial_head( self, values: Union[float, int, NDArrayInt, NDArrayFloat], span: Union[NDArrayInt, Tuple[slice, slice, slice], NDArrayBool] = ( slice(None), slice(None), slice(None), ), ) -> None: """Set the initial head field.""" self.lhead[0][span] = values self.lpressure[0][span] = self.head_to_pressure(self.lhead[0])[span] def set_initial_pressure( self, values: Union[float, int, NDArrayInt, NDArrayFloat], span: Union[NDArrayInt, Tuple[slice, slice, slice], NDArrayBool] = ( slice(None), slice(None), slice(None), ), ) -> None: """Set the initial pressure field in Pa.""" self.lpressure[0][span] = values self.lhead[0][span] = self.pressure_to_head(self.lpressure[0])[span] @property @abstractmethod def is_gravity(self) -> bool: """Return False because the gravity effect is ignored with saturated flow.""" ... # TODO: make the link with the initial density for the pressure class SaturatedFlowModel(FlowModel): __slots__ = [ "_head", ] def __init__( self, grid: RectilinearGrid, time_params: TimeParameters, fl_params: FlowParameters, ) -> None: """Initialize the instance.""" super().__init__(grid, time_params, fl_params) @property def is_gravity(self) -> bool: """Return False because the gravity effect is ignored with saturated flow.""" return False class DensityFlowModel(FlowModel): __slots__ = ["_pressure", "density"] def __init__( self, grid: RectilinearGrid, time_params: TimeParameters, fl_params: FlowParameters, ) -> None: """Initialize the instance.""" super().__init__(grid, time_params, fl_params) @property def is_gravity(self) -> bool: """Return True because the gravity effect is considered with density flow.""" return True class TransportModel: """Represent a flow model.""" __slots__ = [ "crank_nicolson_diffusion", "crank_nicolson_advection", "diffusion", "dispersivity", "porosity", "lmob", "limmob", "lsources", # this is needed for the adjoint state "ldensity", "immob_prev", "boundary_conditions", "cst_conc_nn", "q_prev", "q_next", "rtol", "is_numerical_acceleration", "is_num_acc_for_timestep", "fpi_eps", "max_fpi", "molar_mass", "is_skip_rt", "is_save_spmats", "l_q_next", "l_q_prev", "is_save_spilu", "super_ilu", "preconditioner", ] def __init__( self, grid: RectilinearGrid, time_params: TimeParameters, tr_params: TransportParameters, gch_params: GeochemicalParameters, ) -> None: """Initialize the instance.""" self.crank_nicolson_diffusion: float = tr_params.crank_nicolson_diffusion self.crank_nicolson_advection: float = tr_params.crank_nicolson_advection self.diffusion = np.ones(grid.shape, dtype=np.float64) * tr_params.diffusion self.dispersivity = ( np.ones(grid.shape, dtype=np.float64) * tr_params.dispersivity ) self.porosity = np.ones(grid.shape, dtype=np.float64) * tr_params.porosity self.lmob: List[NDArrayFloat] = [ np.zeros((self.n_sp, grid.nx, grid.ny, grid.nz), dtype=np.float64) ] self.lmob[0][0, :, :, :] = gch_params.conc self.lmob[0][1, :, :, :] = gch_params.conc2 self.limmob: List[NDArrayFloat] = [ np.zeros((self.n_sp, grid.nx, grid.ny, grid.nz), dtype=np.float64) ] # For now, only on mineral self.limmob[0][0, :, :, :] = gch_params.grade self.limmob[0][1, :, :, :] = gch_params.grade2 self.ldensity: List[NDArrayFloat] = [] self.lsources: List[NDArrayFloat] = [] self.immob_prev = np.zeros( (self.n_sp, grid.nx, grid.ny, grid.nz), dtype=np.float64 ) self.boundary_conditions: List[BoundaryCondition] = [] self.q_prev: lil_array = lil_array((grid.n_grid_cells, grid.n_grid_cells)) self.q_next: lil_array = lil_array((grid.n_grid_cells, grid.n_grid_cells)) self.cst_conc_nn: NDArrayInt = np.array([], dtype=np.int64) self.rtol: float = tr_params.rtol self.is_numerical_acceleration: bool = tr_params.is_numerical_acceleration # The numerical acceleration can be temporarily disabled self.is_num_acc_for_timestep: bool = self.is_numerical_acceleration self.fpi_eps: float = tr_params.fpi_eps self.max_fpi: int = tr_params.max_fpi self.molar_mass: float = gch_params.Ms self.is_skip_rt: bool = tr_params.is_skip_rt # List to store the successive stiffness matrices # This is mostly for development purposes. # only activated with the adjoint state or specific devs. self.is_save_spmats: bool = False self.l_q_next: List[lil_array] = [] self.l_q_prev: List[lil_array] = [] # preconditioner (LU) for q_next, only useful to store with the forward # sensivitiy approach. self.is_save_spilu: bool = False self.super_ilu: Optional[SuperLU] = None self.preconditioner: Optional[LinearOperator] = None @property def mob(self) -> NDArrayFloat: """ Return mobile concentrations as array with dimension (nsp, nx, ny, nz, nt+1). This is read-only. """ return np.transpose(np.array(self.lmob), axes=(1, 2, 3, 4, 0)) @property def immob(self) -> NDArrayFloat: """ Return immobile concentrations as array with dimension (nsp, nx, ny, nz, nt+1). This is read-only. """ return np.transpose(np.array(self.limmob), axes=(1, 2, 3, 4, 0)) @property def conc(self) -> NDArrayFloat: """ Return mobile concentrations as array with dimension (2, nx, ny, nz, nt + 1). This is read-only. Alias for mob. """ return self.mob[0] @property def conc2(self) -> NDArrayFloat: """ Return mobile concentrations as array with dimension (2, nx, ny, nz, nt + 1). This is read-only. Alias for mob. """ return self.mob[1] @property def grade(self) -> NDArrayFloat: """ Return immobile concentrations as array with dimension (nx, ny, nz, nt + 1). This is read-only. Alias for immob[0]. """ return self.immob[0] @property def grade2(self) -> NDArrayFloat: """ Return immobile concentrations as array with dimension (nx, ny, nz, nt + 1). This is read-only. Alias for immob[1]. """ return self.immob[1] @property def density(self) -> NDArrayFloat: """ Return densities in g/l as array with dimension (nx, ny, nz, nt + 1). This is read-only. """ if len(self.ldensity) == 0: return np.array([]) return np.transpose(np.array(self.ldensity), axes=(1, 2, 3, 0)) @property def sources(self) -> NDArrayFloat: """ Return concentration sources as array with dimension (2, nx, ny, nz, nt + 1). This is read-only. """ return np.transpose(np.array(self.lsources), axes=(1, 2, 3, 4, 0)) @property def effective_diffusion(self) -> NDArrayFloat: """Return the effective diffusion (diffusion * porosity).""" return self.diffusion * self.porosity @property def n_sp(self) -> int: """ Return the number of mobile species in the system. This is hard-coded for now. """ return 2 def set_initial_grade( self, values: Union[float, int, NDArrayInt, NDArrayFloat], sp: Optional[int] = 0, span: Union[NDArrayInt, Tuple[slice, slice, slice], NDArrayBool] = ( slice(None), slice(None), slice(None), ), ) -> None: """Set the initial grades.""" self.limmob[0][sp][span] = values def set_initial_conc( self, values: Union[float, int, NDArrayInt, NDArrayFloat], sp: int = 0, span: Union[NDArrayInt, Tuple[slice, slice, slice], NDArrayBool] = ( slice(None), slice(None), slice(None), ), ) -> None: """Set the initial concentrations.""" self.lmob[0][sp][span] = values def add_boundary_conditions(self, condition: BoundaryCondition) -> None: """Add a boundary condition to the transport model.""" if not isinstance(condition, ConstantConcentration) and not isinstance( condition, ZeroConcGradient ): raise ValueError( f"{condition} is not a valid boundary condition for the " "transport model !" ) self.boundary_conditions.append(condition) if isinstance(condition, ConstantConcentration): # 0) Set the values # TODO pass def set_constant_conc_indices(self) -> None: """Set the indices of nodes with constant head.""" node_numbers = np.array([], dtype=np.int32) for condition in self.boundary_conditions: if isinstance(condition, ConstantConcentration): node_numbers = np.hstack( [ node_numbers, span_to_node_numbers_3d( condition.span, self.mob.shape[0], self.mob.shape[1], self.mob.shape[2], ), ] ) self.cst_conc_nn: NDArrayInt = np.unique(node_numbers.flatten()) @property def cst_conc_indices(self) -> NDArrayInt: """Return the indices (array) of the constant conc grid cells.""" # [:2] to ignore the z axis return np.array( node_number_to_indices( self.cst_conc_nn, nx=self.mob.shape[0], ny=self.mob.shape[1] ) ) @property def free_conc_nn(self) -> NDArrayInt: """Return the free conc node numbers.""" return get_a_not_in_b_1d( np.arange(np.prod(self.lmob[0].shape), dtype=np.int32), # type: ignore self.cst_conc_nn, ) @property def free_conc_indices(self) -> NDArrayInt: """Return the indices (array) of the free conc grid cells.""" # [:2] to ignore the z axis return np.array( node_number_to_indices( self.free_conc_nn, nx=self.mob.shape[0], ny=self.mob.shape[1] ) ) def reinit(self) -> None: """Set all arrays to zero except for the initial conditions(first time).""" self.lmob = self.lmob[:1] self.limmob = self.limmob[:1] self.immob_prev = self.limmob[0] # There is no initial condition for the density. It is all computed. self.ldensity.clear() self.lsources.clear() self.set_constant_conc_indices() self.l_q_next = [] self.l_q_prev = [] self.super_ilu = None self.preconditioner = None
[docs]class ForwardModel: """ Class representing the reactive transport model. wadv: float Advection weight (for testing between 0.0 and 1.0). The default is 1.0. """ slots = [ "grid", "time_params", "fl_params", "tr_params", "gch_params", "source_terms", "boundary_conditions", ]
[docs] def __init__( self, grid: RectilinearGrid, time_params: TimeParameters, fl_params: FlowParameters = FlowParameters(), tr_params: TransportParameters = TransportParameters(), gch_params: GeochemicalParameters = GeochemicalParameters(), source_terms: Optional[Union[SourceTerm, Sequence[SourceTerm]]] = None, boundary_conditions: Optional[ Union[BoundaryCondition, Sequence[BoundaryCondition]] ] = None, ) -> None: """ Initialize the instance. Parameters ---------- grid : RectilinearGrid _description_ time_params : TimeParameters _description_ fl_params : FlowParameters _description_ tr_params : TransportParameters _description_ gch_params : GeochemicalParameters _description_ wells : Sequence[Well], optional _description_, by default default_field([]) """ self.grid: RectilinearGrid = grid self.time_params: TimeParameters = time_params self.gch_params: GeochemicalParameters = gch_params # Two possible flowmodels if fl_params.is_gravity: self.fl_model: FlowModel = DensityFlowModel(grid, time_params, fl_params) else: self.fl_model: FlowModel = SaturatedFlowModel(grid, time_params, fl_params) self.tr_model: TransportModel = TransportModel( grid, time_params, tr_params, gch_params ) if source_terms is not None: self.source_terms: Dict[str, SourceTerm] = { v.name: v for v in object_or_object_sequence_to_list(source_terms) } else: self.source_terms: Dict[str, SourceTerm] = {} if boundary_conditions is None: return for condition in object_or_object_sequence_to_list(boundary_conditions): self.add_boundary_conditions(condition) self.fl_model.set_constant_head_indices()
[docs] def get_sources( self, time: float, grid: RectilinearGrid ) -> Tuple[NDArrayFloat, NDArrayFloat]: """Get the flow sources and sink terms.""" _unitflw_src = np.zeros(grid.shape) _conc_src = np.zeros((self.tr_model.n_sp, grid.nx, grid.ny, grid.nz)) # iterate the source terms for source in self.source_terms.values(): # identify the source term applying _flw, _conc = source.get_values(time) nids = source.get_node_indices(grid) # Add the flowrates contribution _unitflw_src[nids[0], nids[1], nids[2]] += _flw / source.n_nodes # Keep only non negative flowrates (remove sink terms) if _flw > 0: for sp in range(self.tr_model.n_sp): try: _conc_src[sp, nids[0], nids[1], nids[2]] += ( _flw * _conc[sp] / source.n_nodes ) except IndexError: pass for condition in self.fl_model.boundary_conditions: if isinstance(condition, ConstantHead): # Set zero where there constant head _unitflw_src[condition.span] = 0.0 for condition in self.tr_model.boundary_conditions: if isinstance(condition, ConstantConcentration): # Set zero where there constant concentration for sp in range(self.tr_model.n_sp): _conc_src[sp][condition.span] = 0.0 return ( _unitflw_src / self.grid.grid_cell_volume, # /s _conc_src / self.grid.grid_cell_volume, # mol/L )
[docs] def add_src_term(self, source_term: SourceTerm) -> None: """Add a source term.""" if self.source_terms.get(source_term.name) is not None: warnings.warn( f"{source_term.name} is already among the source terms" " and has been overwritten!" ) self.source_terms[source_term.name] = source_term
[docs] def add_boundary_conditions(self, condition: BoundaryCondition) -> None: """Add a boundary condition to the flow or the transport model.""" # TODO: add a check to see if the given condition is on a border of the grid or # not. if isinstance(condition, ConstantHead): self.fl_model.add_boundary_conditions(condition) return if isinstance(condition, ConstantConcentration) or isinstance( condition, ZeroConcGradient ): self.tr_model.add_boundary_conditions(condition) return raise ValueError(f"{condition} is not a valid boundary condition !")
[docs] def reinit(self) -> None: """Set all arrays to zero except for the initial conditions(first time).""" self.fl_model.reinit() self.tr_model.reinit() self.time_params.reset_to_init()
[docs] def get_ij_over_u(self, time_index: int) -> NDArrayFloat: """Get the ij/Unorm for the CFL condition.""" num = 1e300 if self.grid.nx > 1: num = min(self.grid.dx, num) if self.grid.ny > 1: num = min(self.grid.dy, num) if self.grid.nz > 1: num = min(self.grid.dz, num) den = np.sqrt( self.fl_model.u_darcy_x_center[:, :, :, time_index] ** 2 + self.fl_model.u_darcy_y_center[:, :, :, time_index] ** 2 + self.fl_model.u_darcy_z_center[:, :, :, time_index] ** 2 ) # VERY_SMALL_NUMBER to avoid division by zero. den = np.where(den < VERY_SMALL_NUMBER, VERY_SMALL_NUMBER, den) return num / den
def __deepcopy__(self, memo): deepcopy_method = self.__deepcopy__ self.__deepcopy__ = None # Handle non pickebeable objects tmp_fl_spilu = self.fl_model.super_ilu self.fl_model.super_ilu = None tmp_fl_pcd = self.fl_model.preconditioner self.fl_model.preconditioner = None tmp_tr_spilu = self.tr_model.super_ilu self.tr_model.super_ilu = None tmp_tr_pcd = self.tr_model.preconditioner self.tr_model.preconditioner = None cp = copy.deepcopy(self, memo) self.__deepcopy__ = deepcopy_method # Bind to cp by types.MethodType cp.__deepcopy__ = types.MethodType(deepcopy_method.__func__, cp) # custom treatments # restore the attributes cp.fl_model.super_ilu = tmp_fl_spilu cp.fl_model.preconditioner = tmp_fl_pcd cp.tr_model.super_ilu = tmp_tr_spilu cp.tr_model.preconditioner = tmp_tr_pcd return cp
def remove_cst_bound_indices( indices_owner: NDArrayInt, indices_neigh: NDArrayInt, indices_to_remove: NDArrayInt ) -> Tuple[NDArrayInt, NDArrayInt]: """ Remove the indices because of the boundary condition. Parameters ---------- indices_owner : _type_ Indices of owner grid cells. indices_neigh : _type_ Indices of neighbor grid cells. indices_to_remove : DArrayInt Indices to remove. Returns ------- Tuple[NDArrayInt, NDArrayInt] _description_ """ is_kept = ~np.isin(indices_owner, indices_to_remove) return indices_owner[is_kept], indices_neigh[is_kept] def keep_a_b_if_c_in_a( a: NDArrayInt, b: NDArrayInt, c: NDArrayInt ) -> Tuple[NDArrayInt, NDArrayInt]: """Keep values in a and b if c in a.""" is_kept = np.isin(a, c) return a[is_kept], b[is_kept] # TODO cache
[docs]def get_owner_neigh_indices( grid: RectilinearGrid, span_owner: Tuple[slice, slice, slice], span_neigh: Tuple[slice, slice, slice], owner_indices_to_keep: Optional[NDArrayInt] = None, neigh_indices_to_keep: Optional[NDArrayInt] = None, ) -> Tuple[NDArrayInt, NDArrayInt]: """_summary_ Parameters ---------- grid : RectilinearGrid _description_ span_owner : Tuple[slice, slice, slice] _description_ span_neigh : Tuple[slice, slice, slice] _description_ indices_to_remove : NDArrayInt _description_ Returns ------- Tuple[NDArrayInt, NDArrayInt] _description_ """ # Get indices indices_owner: NDArrayInt = span_to_node_numbers_3d( span_owner, nx=grid.nx, ny=grid.ny, nz=grid.nz ) indices_neigh: NDArrayInt = span_to_node_numbers_3d( span_neigh, nx=grid.nx, ny=grid.ny, nz=grid.nz ) if owner_indices_to_keep is not None: indices_owner, indices_neigh = keep_a_b_if_c_in_a( indices_owner, indices_neigh, owner_indices_to_keep ) if neigh_indices_to_keep is not None: indices_neigh, indices_owner = keep_a_b_if_c_in_a( indices_neigh, indices_owner, neigh_indices_to_keep ) return indices_owner, indices_neigh