Source code for pydfnworks.dfnFlow.fehm

import os
import subprocess
import sys
import glob
import shutil
from time import time
import numpy as np
"""
Functions for using FEHM in dfnWorks
"""


def parse_stor_file(filepath):
    """
    Parse a LaGriT-formatted ASCII STOR file into its structured data blocks.

    This function reads a `.stor` file, extracts header metadata and all subsequent
    numerical data blocks (volumes, connectivity, pointers, and coefficients), and
    returns them as a list of NumPy arrays.

    The expected structure of the file is:
        - 3 header lines (text)
        - Line 3 contains: [nedges, nnodes, ptr_size, num_area_coef, ...]
        - 7 numerical blocks follow:
            1. Voronoi volumes (nnodes)
            2. Row counts (nnodes + 1)
            3. Connectivity list (nedges)
            4. Pointers into coefficient list (nedges * num_area_coef)
            5. Diagonal pointer padding (nnodes + 1)
            6. Diagonal matrix indices (nnodes)
            7. Area coefficients (nedges * num_area_coef)

    Parameters
    ----------
    filepath : str
        Path to the STOR file (ASCII format) to parse.

    Returns
    -------
    List[np.ndarray]
        A list of 7 NumPy arrays corresponding to:
            [volumes, row_counts, conn_list, pointer_block,
             diag_pointers, row_diagonals, area_coefficients]

    Raises
    ------
    ValueError
        If the number of parsed float entries is insufficient based on header metadata.

    Notes
    -----
    This function assumes:
        - The file uses whitespace-separated float values for all numerical blocks.
        - Area coefficients may be scalar (`num_area_coef = 1`) or have multiple
          components per edge (`num_area_coef > 1`); block sizes scale accordingly.
        - The entire file is read into memory; no file handle or offset is kept
          for later parsing.

    Examples
    --------
    >>> blocks = parse_stor_file("mesh.stor")
    >>> volumes = blocks[0]
    >>> conn_list = blocks[2]
    """

    with open(filepath, 'r') as file:
        lines = file.readlines()

    # Extract dimension info from line 3
    header_vals = list(map(int, lines[2].split()))
    nedges = header_vals[0]
    nnodes = header_vals[1]
    num_area_coef = header_vals[3]  # assumed position

    # Calculate block lengths
    block_sizes = [
        nnodes,
        nnodes + 1,
        nedges,
        nedges * num_area_coef,
        nnodes + 1,
        nnodes,
        nedges * num_area_coef
    ]

    # Read all remaining float values
    data = []
    for line in lines[3:]:  # Skip header
        data.extend(map(float, line.strip().split()))

    # Sanity check
    expected_total = sum(block_sizes)
    if len(data) < expected_total:
        raise ValueError(f"Not enough data: expected {expected_total}, got {len(data)}")

    # Slice data into blocks
    blocks = []
    start = 0
    for size in block_sizes:
        blocks.append(np.array(data[start:start + size]))
        start += size

    return blocks  # list of 7 arrays
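

# A minimal usage sketch for parse_stor_file. The file name "mesh.stor", the
# helper name, and the printed labels are illustrative assumptions only; the
# labels follow the block order documented in the docstring above.
def _example_inspect_stor_blocks(filepath="mesh.stor"):
    """Parse a STOR file and print how many entries each block contains."""
    labels = ["volumes", "row_counts", "conn_list", "pointer_block",
              "diag_pointers", "row_diagonals", "area_coefficients"]
    blocks = parse_stor_file(filepath)
    for label, block in zip(labels, blocks):
        print(f"{label}: {len(block)} entries")
    return blocks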


def write_stor_file(filepath_out, header_lines, blocks):
    """
    Write numerical data blocks to a LaGriT-formatted ASCII STOR file.

    This function outputs a `.stor` file that conforms to the LaGriT STOR format,
    using fixed-width formatting and 5 values per line, matching Fortran-style
    output conventions.

    The function assumes that `blocks` is a list of 7 NumPy arrays corresponding to
    parsed data from a STOR file. Each block is written in the correct order and format:
        1. Voronoi volumes (floats)
        2. Row counts (integers)
        3. Connectivity list (integers)
        4. Pointer indices into coefficient list (integers)
        5. Diagonal pointer padding (integers)
        6. Diagonal indices (integers)
        7. Area coefficients (floats)

    Parameters
    ----------
    filepath_out : str
        The path to the output `.stor` file to be written.

    header_lines : List[str]
        The first three lines of the original STOR file header, including:
            - Line 1: Title line
            - Line 2: Date or model info line
            - Line 3: Matrix dimension parameters

    blocks : List[np.ndarray]
        A list of 7 NumPy arrays, each containing one of the STOR file's
        numerical blocks, in the following order:
            [volumes, row_counts, conn_list, pointer_block,
             diag_pointers, row_diagonals, area_coefficients]

    Notes
    -----
    - Floats are written with 12-digit precision and scientific notation (width 20).
    - Integers are written right-aligned with 10-character width.
    - All values are formatted 5 per line for strict format compliance.
    - The function overwrites `filepath_out` if it already exists.

    Examples
    --------
    >>> header_lines = ["Title\n", "Date Line\n", "3603 537 4141 1 13\n"]
    >>> write_stor_file("corrected.stor", header_lines, blocks)
    """    
    def format_floats(arr):
        lines = []
        for i, val in enumerate(arr):
            lines.append(f"{val:20.12E}")
            if (i + 1) % 5 == 0 or i == len(arr) - 1:
                lines.append('\n')
            else:
                lines.append(' ')
        return lines

    def format_integers(arr):
        lines = []
        for i, val in enumerate(arr):
            lines.append(f"{int(val):10d}")
            if (i + 1) % 5 == 0 or i == len(arr) - 1:
                lines.append('\n')
            else:
                lines.append(' ')
        return lines

    with open(filepath_out, 'w') as fout:
        # Header (first 3 lines)
        for line in header_lines:
            fout.write(line)

        # Write each block in the correct format
        fout.writelines(format_floats(blocks[0]))  # volumes
        fout.writelines(format_integers(blocks[1]))  # row_counts
        fout.writelines(format_integers(blocks[2]))  # conn_list
        fout.writelines(format_integers(blocks[3]))  # pointer block
        fout.writelines(format_integers(blocks[4]))  # diag pointers
        fout.writelines(format_integers(blocks[5]))  # row diags
        fout.writelines(format_floats(blocks[6]))    # area coefficients

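
# A minimal round-trip sketch, assuming an ASCII STOR file named "mesh.stor"
# exists in the working directory. The helper name and output suffix are
# illustrative only; the rewritten file holds the same data, reformatted to
# 5 values per line as described above.
def _example_stor_roundtrip(filepath="mesh.stor"):
    """Read an ASCII STOR file and write its blocks back out unchanged."""
    with open(filepath, 'r') as f:
        header_lines = [next(f) for _ in range(3)]
    blocks = parse_stor_file(filepath)
    write_stor_file(filepath.replace(".stor", "_copy.stor"), header_lines, blocks)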


def correct_stor_file(self):
    """Corrects the Voronoi volumes and area coefficients in a STOR file by
    applying aperture adjustments.

    This method reads an ASCII-formatted STOR file used in FEHMN simulations,
    adjusts the volume and area values based on the aperture data, and writes
    a new corrected version of the file.

    Assumptions:
        - Only ASCII STOR format is supported.
        - Area coefficients are in scalar format (NUM_AREA_COEF = 1).
        - Instance attributes include:
            self.stor_file : Path to the input STOR file.
            self.material_ids : List of material IDs for each cell.
            self.aperture : List of aperture values (by cell or material).
            self.cell_based_aperture : Boolean flag.
            self.print_log : Logging function.
    """
    self.stor_file = "full_mesh.stor"
    if not os.path.isfile(self.stor_file):
        self.print_log("Error. Cannot find STOR file.\nExiting\n", 'error')

    blocks = parse_stor_file(self.stor_file)

    # Scale each Voronoi volume by the aperture of the node's material
    volumes = blocks[0]
    for i in range(self.num_nodes):
        volumes[i] *= self.aperture[self.material_ids[i] - 1]
    blocks[0] = volumes

    # Scale each area coefficient by the aperture of the node it connects to,
    # looked up through the connectivity list (1-based node indices)
    conn_list = blocks[2].astype(int)
    areas = blocks[6]
    for i in range(len(areas)):
        index = conn_list[i]
        areas[i] *= self.aperture[self.material_ids[index - 1] - 1]
    blocks[6] = areas

    # Also grab the first 3 header lines
    with open(self.stor_file, 'r') as f:
        header_lines = [next(f) for _ in range(3)]

    stor_out_file = self.stor_file.replace('.stor', '_vol_area.stor')
    write_stor_file(stor_out_file, header_lines, blocks)
    self.print_log("correcting stor file complete")
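

# A toy sketch of the aperture correction performed by correct_stor_file:
# volumes are scaled by the aperture of their own material, and each area
# coefficient is scaled by the aperture of the node it connects to (via the
# 1-based connectivity list). All values below are made up for illustration.
def _example_aperture_scaling():
    volumes = np.array([1.0, 2.0, 3.0])
    material_ids = [1, 2, 1]                # material ID per node (1-based)
    aperture = [1.0e-4, 5.0e-5]             # aperture per material
    conn_list = np.array([1, 2, 2, 3])      # 1-based node index per connection
    areas = np.array([10.0, 20.0, 30.0, 40.0])

    scaled_volumes = np.array(
        [v * aperture[material_ids[i] - 1] for i, v in enumerate(volumes)])
    scaled_areas = np.array(
        [a * aperture[material_ids[conn_list[i] - 1] - 1]
         for i, a in enumerate(areas)])
    return scaled_volumes, scaled_areas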


def correct_perm_for_fehm():
    """FEHM requires an empty line at the end of the perm file.
    This function adds that line if it is missing.

    Parameters
    ----------
        None

    Returns
    -------
        None

    Notes
    -----
        Only adds a new line if the last line is not empty

    """
    # self.print_log("Modifying perm.dat for FEHM")
    fp = open("perm.dat")
    lines = fp.readlines()
    fp.close()

    # Check if the last line of the file is just a new line.
    # If it is not, then add a new line at the end of the file.
    if len(lines[-1].split()) != 0:
        print("--> Adding line to perm.dat")
        fp = open("perm.dat", "a")
        fp.write("\n")
        fp.close()


def fehm(self):
    """Run FEHM

    Parameters
    ----------
        self : object
            DFN Class

    Returns
    -------
        None

    Notes
    -----
    See https://fehm.lanl.gov/ for details about FEHM

    """
    self.print_log("--> Running FEHM")
    if self.flow_solver != "FEHM":
        error = "Error. Incorrect flow solver requested\n"
        self.print_log(error, 'error')
        sys.exit(1)

    try:
        shutil.copy(self.dfnFlow_file, self.jobname)
    except:
        error = f"--> Error copying FEHM run file: {self.dfnFlow_file}"
        self.print_log(error, 'error')
        sys.exit(1)

    path = self.dfnFlow_file.strip(self.local_dfnFlow_file)
    with open(self.local_dfnFlow_file) as fp:
        line = fp.readline()
        fehm_input = line.split()[-1]
    try:
        shutil.copy(path + fehm_input, os.getcwd())
    except:
        error = f"--> Error copying FEHM input file: {fehm_input}"
        self.print_log(error, 'error')
        sys.exit(1)

    self.correct_stor_file()
    self.dump_hydraulic_values(format="FEHM")
    correct_perm_for_fehm()

    tic = time()
    cmd = os.environ["FEHM_EXE"] + " " + self.local_dfnFlow_file
    # self.call_executable(cmd)
    subprocess.call(cmd, shell=True)
    self.print_log('=' * 80)
    self.print_log("FEHM Complete")
    elapsed = time() - tic
    self.print_log(f"Time Required {elapsed} Seconds")
    self.print_log('=' * 80)

    correct_volume_file = os.path.join(self.jobname, "correct_volumes_logfile.log")
    if os.path.exists(correct_volume_file):
        self.print_log("--> Printing correct volumes output file:")
        self.print_log(f"filename: {correct_volume_file}")
        try:
            with open(correct_volume_file, 'r') as file:
                for line in file:
                    self.print_log(line.strip())
        except FileNotFoundError:
            self.print_log(f"File not found: {correct_volume_file}")
        except Exception as e:
            self.print_log(f"An error occurred: {e}")
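

# A small environment check related to fehm() above, which reads the FEHM
# executable path from the FEHM_EXE environment variable. The helper name is
# illustrative only.
def _example_check_fehm_environment():
    """Report whether FEHM_EXE points to an existing file."""
    exe = os.environ.get("FEHM_EXE")
    if exe and os.path.isfile(exe):
        print(f"FEHM executable found: {exe}")
    else:
        print("FEHM_EXE is not set or does not point to an existing file")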