Source code for pydfnworks.dfnGraph.graph_transport

"""
.. module:: graph_transport.py
   :synopsis: simulate transport on a pipe network representaiton of a DFN 
.. moduleauthor:: Shriram Srinivasan <shrirams@lanl.gov>, Jeffrey Hyman <jhyman@lanl.gov>

"""

import timeit
import sys
import networkx as nx
import numpy as np
import multiprocessing as mp

# pydfnworks graph modules modules
import pydfnworks.dfnGraph.particle_io as io
from pydfnworks.dfnGraph.graph_tdrw import set_up_limited_matrix_diffusion
from pydfnworks.dfnGraph.particle_class import Particle


def track_particle(data, verbose=False):
    """ Tracks a single particle through the graph

        all input parameters are in the dictionary named data 

        Parameters
        ----------
            data : dict
                Dictionary of parameters the includes particle_number, initial_position, 
                tdrw_flag, matrix_porosity, matrix_diffusivity, cp_flag, control_planes,
                 direction, G, and nbrs_dict.  

        Returns
        -------
            particle : object
                Particle will full trajectory

    """
    if verbose:
        p = mp.current_process()
        _, cpu_id = p.name.split("-")
        cpu_id = int(cpu_id)
        print(
            f"--> Particle {data['particle_number']} is starting on worker {cpu_id}"
        )

    particle = Particle(data["particle_number"], data["initial_position"],
                        data["tdrw_flag"], data["matrix_porosity"],
                        data["matrix_diffusivity"], data["fracture_spacing"],
                        data["trans_prob"], data["transfer_time"],
                        data["cp_flag"], data["control_planes"],
                        data["direction"])

    # # get current process information
    global nbrs_dict
    global G_global
    particle.track(G_global, nbrs_dict)
    if verbose:
        print(
            f"--> Particle {data['particle_number']} is complete on worker {cpu_id}"
        )
    return particle


def get_initial_posititions(G, initial_positions, nparticles):
    """ Distributes initial particle positions 

        Parameters
        ----------
                
            G : NetworkX graph 
                obtained from graph_flow

        initial_positions : str
            distribution of initial conditions. options are uniform and flux (flux-weighted)

        nparticles : int
            requested number of particles

        Returns
        -------
            ip : numpy array
                array nparticles long. Each element is the initial position for each particle

        """

    inlet_nodes = [v for v in nx.nodes(G) if G.nodes[v]['inletflag']]
    cnt = len(inlet_nodes)
    print(f"--> There are {cnt} inlet nodes")
    if cnt == 0:
        error = "Error. There are no nodes in the inlet.\nExiting"
        sys.stderr.write(error)
        sys.exit(1)

    # Uniform Distribution for particles
    if initial_positions == "uniform":
        print("--> Using uniform initial positions.")
        ip = np.zeros(nparticles).astype(int)
        n = int(np.ceil(nparticles / cnt))
        print(f"--> {n} particles will be placed at every inflow node.\n")
        ## this could be cleaned up using clever indexing
        inflow_idx = 0
        inflow_cnt = 0
        for i in range(nparticles):
            ip[i] = inlet_nodes[inflow_idx]
            inflow_cnt += 1
            if inflow_cnt >= n:
                inflow_idx += 1
                inflow_cnt = 0

    ## flux weighted initial positions for particles
    elif initial_positions == "flux":
        print("--> Using flux-weighted initial positions.\n")
        flux = np.zeros(cnt)
        for i, u in enumerate(inlet_nodes):
            for v in G.successors(u):
                flux[i] += G.edges[u, v]['flux']
        flux /= flux.sum()
        flux_cnts = [np.ceil(nparticles * i) for i in flux]
        nparticles = int(sum(flux_cnts))
        ip = np.zeros(nparticles).astype(int)
        ## Populate ip with Flux Cnts
        ## this could be cleaned up using clever indexing
        inflow_idx = 0
        inflow_cnt = 0
        for i in range(nparticles):
            ip[i] = inlet_nodes[inflow_idx]
            inflow_cnt += 1
            if inflow_cnt >= flux_cnts[inflow_idx]:
                inflow_idx += 1
                inflow_cnt = 0

    # Throw error if unknown initial position is provided
    else:
        error = f"Error. Unknown initial_positions input {initial_positions}. Options are uniform or flux \n"
        sys.stderr.write(error)
        sys.exit(1)

    return ip, nparticles


def create_neighbor_list(G):
    """ Create a list of downstream neighbor vertices for every vertex on NetworkX graph obtained after running graph_flow

    Parameters
    ----------
        G: NetworkX graph 
            Directed Graph obtained from output of graph_flow

    Returns
    -------
        dict : nested dictionary.

    Notes
    -----
        dict[n]['child'] is a list of vertices downstream to vertex n
        dict[n]['prob'] is a list of probabilities for choosing a downstream node for vertex n
    """

    nbrs_dict = {}

    for u in nx.nodes(G):

        if G.nodes[u]['outletflag']:
            continue

        node_list = []
        prob_list = []
        nbrs_dict[u] = {}

        for v in G.successors(u):
            node_list.append(v)
            prob_list.append(G.edges[u, v]['vol_flow_rate'])

        if node_list:
            nbrs_dict[u]['child'] = node_list
            nbrs_dict[u]['prob'] = np.asarray(prob_list) / sum(prob_list)
        else:
            nbrs_dict[u]['child'] = None
            nbrs_dict[u]['prob'] = None

    return nbrs_dict


def check_tdrw_params(matrix_porosity, matrix_diffusivity, fracture_spacing):
    """ Check that the provided tdrw values are physiscal


    Parameters
    ----------
        G: NetworkX graph 
            Directed Graph obtained from output of graph_flow

    Returns
    -------
        dict : nested dictionary.

    Notes
    -----
        dict[n]['child'] is a list of vertices downstream to vertex n
        dict[n]['prob'] is a list of probabilities for choosing a downstream node for vertex n
    
    """

    if matrix_porosity is None:
        error = f"Error. Requested TDRW but no value for matrix_porosity was provided\n"
        sys.stderr.write(error)
        sys.exit(1)
    elif matrix_porosity < 0 or matrix_porosity > 1:
        error = f"Error. Requested TDRW but value for matrix_porosity provided is outside of [0,1]. Value provided {matrix_porosity}\n"
        sys.stderr.write(error)
        sys.exit(1)
    if matrix_diffusivity is None:
        error = f"Error. Requested TDRW but no value for matrix_diffusivity was provided\n"
        sys.stderr.write(error)
        sys.exit(1)

    if fracture_spacing is not None:
        if fracture_spacing <= 0:
            error = f"Error. Non-positive value for fracture_spacing was provided.\nValue {fracture_spacing}\nExiting program"
            sys.stderr.write(error)
            sys.exit(1)


def check_control_planes(control_planes, direction):
    control_plane_flag = False
    if not type(control_planes) is list:
        error = f"Error. provided controls planes are not a list\n"
        sys.stderr.write(error)
        sys.exit(1)
    else:
        # add None to indicate the end of the control plane list
        control_plane_flag = True

    if direction is None:
        error = f"Error. Primary direction not provided. Required for control planes\n"
        sys.stderr.write(error)
        sys.exit(1)
    elif direction not in ['x', 'y', 'z']:
        error = f"Error. Primary direction is not known. Acceptable values are x,y, and z\n"
        sys.stderr.write(error)
        sys.exit(1)

    print(f"--> Control Planes: {control_planes}")
    print(f"--> Direction: {direction}")
    return control_plane_flag


[docs] def run_graph_transport(self, G, nparticles, partime_file, frac_id_file=None, format='hdf5', initial_positions="uniform", dump_traj=False, tdrw_flag=False, matrix_porosity=None, matrix_diffusivity=None, fracture_spacing=None, control_planes=None, direction=None, cp_filename='control_planes'): """ Run particle tracking on the given NetworkX graph Parameters ---------- self : object DFN Class G : NetworkX graph obtained from graph_flow nparticles: int number of particles initial_positions : str distribution of initial conditions. options are uniform and flux (flux-weighted) partime_file : string name of file to which the total travel times and lengths will be written for each particle frac_id_file : string name of file to which detailed information of each particle's travel will be written dump_flag: bool on/off to write full trajectory information to file tdrw_flag : Bool if False, matrix_porosity and matrix_diffusivity are ignored matrix_porosity: float Matrix Porosity used in TDRW matrix_diffusivity: float Matrix Diffusivity used in TDRW (SI units m^2/s) fracture_spaceing : float finite block size for limited matrix diffusion control_planes : list of floats list of control plane locations to dump travel times. Only in primary direction of flow. primary direction : string (x,y,z) string indicating primary direction of flow Returns ------- particles : list list of particles objects Notes ----- Information on individual functions is found therein """ ## the flow graph needs to be a global variable so all processors can access it ## without making a copy of it. global G_global G_global = nx.Graph() G_global = G.copy() if not format in ['ascii', 'hdf5']: error = ( f"--> Error. Unknown file format provided in run_graph_transport.\n\n--> Provided value is {format}.\n--> Options: 'ascii' or 'hdf5'.\n\nExitting\n\n" ) sys.stderr.write(error) sys.exit(1) print("\n--> Running Graph Particle Tracking") # Check parameters for TDRW if tdrw_flag: check_tdrw_params(matrix_porosity, matrix_diffusivity, fracture_spacing) print( f"--> Running particle transport with TDRW.\n--> Matrix porosity {matrix_porosity}.\n--> Matrix Diffusivity {matrix_diffusivity} m^2/s" ) if control_planes is None: control_plane_flag = False else: control_plane_flag = check_control_planes( control_planes=control_planes, direction=direction) print(f"--> Control Plane Flag {control_plane_flag}") print("--> Creating downstream neighbor list") global nbrs_dict nbrs_dict = create_neighbor_list(G) print("--> Getting initial Conditions") ip, nparticles = get_initial_posititions(G, initial_positions, nparticles) print(f"--> Starting particle tracking for {nparticles} particles") if dump_traj: print(f"--> Writing trajectory information to file") if fracture_spacing is not None: print(f"--> Using limited matrix block size for TDRW") print(f"--> Fracture spacing {fracture_spacing:0.2e} [m]") trans_prob = set_up_limited_matrix_diffusion(G, fracture_spacing, matrix_porosity, matrix_diffusivity) # This doesn't change for the system. # Transfer time diffusing between fracture blocks transfer_time = fracture_spacing**2 / (2 * matrix_diffusivity) else: trans_prob = None transfer_time = None ## main loop if self.ncpu == 1: tic = timeit.default_timer() particles = [] for i in range(nparticles): if i % 1000 == 0: print(f"--> Starting particle {i} out of {nparticles}") particle = Particle(i, ip[i], tdrw_flag, matrix_porosity, matrix_diffusivity, fracture_spacing, trans_prob, transfer_time, control_plane_flag, control_planes, direction) particle.track(G, nbrs_dict) particles.append(particle) elapsed = timeit.default_timer() - tic print( f"--> Main Tracking Loop Complete. Time Required {elapsed:0.2e} seconds" ) stuck_particles = io.dump_particle_info(particles, partime_file, frac_id_file, format) if control_plane_flag: io.dump_control_planes(particles, control_planes, cp_filename, format) if dump_traj: io.dump_trajectories(particles, 1) if self.ncpu > 1: print(f"--> Using {self.ncpu} processors") ## Prepare input data inputs = [] tic = timeit.default_timer() pool = mp.Pool(min(self.ncpu, nparticles)) particles = [] def gather_output(output): particles.append(output) for i in range(nparticles): data = {} data["particle_number"] = i data["initial_position"] = ip[i] data["tdrw_flag"] = tdrw_flag data["matrix_porosity"] = matrix_porosity data["matrix_diffusivity"] = matrix_diffusivity data["fracture_spacing"] = fracture_spacing data["transfer_time"] = transfer_time data["trans_prob"] = trans_prob data["cp_flag"] = control_plane_flag data["control_planes"] = control_planes data["direction"] = direction pool.apply_async(track_particle, args=(data, ), callback=gather_output) pool.close() pool.join() pool.terminate() elapsed = timeit.default_timer() - tic print( f"--> Main Tracking Loop Complete. Time Required {elapsed:0.2e} seconds" ) stuck_particles = io.dump_particle_info(particles, partime_file, frac_id_file, format) if control_plane_flag: io.dump_control_planes(particles, control_planes, cp_filename, format) if dump_traj: io.dump_trajectories(particles, min(self.ncpu, nparticles)) if stuck_particles == 0: print("--> All particles exited the network") print("--> Graph Particle Tracking Completed Successfully.") else: print( f"--> Out of {nparticles} particles, {stuck_particles} particles did not exit" ) # Clean up and delete the global versions del G_global del nbrs_dict return particles