Source code for alframework.samplers.reactive_sampler

import os
import numpy as np
import ase
from ase.md.langevin import Langevin
from ase import units
from ase.io.trajectory import Trajectory
import time
from ase import Atoms
import pickle as pkl
from parsl import python_app, bash_app

from alframework.tools.tools import annealing_schedule
from alframework.tools.tools import system_checker
from alframework.tools.tools import load_module_from_config
from alframework.samplers.ASE_ensemble_constructor import MLMD_calculator
from alframework.tools.tools import build_input_dict
from ase.mep import DimerControl, MinModeAtoms, MinModeTranslate

import sys

from ase import io
from ase.mep import NEB
from ase.optimize import BFGS

@python_app(executors=['alf_sampler_executor'])
def reactive_sampling(molecule_object, sampler_config,model_path,current_model_id,gpus_per_node, ase_calculator, N_neb, max_iter, neb_steps, reactive_sampling_method, Escut, Fscut, Fmmult=3.0) :
    # Performs reactive sampling with NEB/dimer methods based on input based in the config file
    
    failed=False

    # NEB runs first and attempts to find structure above uncertainity
    if 'neb' in reactive_sampling_method.lower(): # NEB sampling
        neb_images = neb_sampler(molecule_object, sampler_config, model_path, current_model_id, N_neb, neb_steps)
    
        for i in range(0,max_iter):

            # Check if NEB image already above threshold for uncertainty
            for neb_im in neb_images[1:]:
                neb_im.calc.calculate(neb_im,properties=['energy_stdev','forces_stdev_mean','forces_stdev_max'])
                Es = neb_im.calc.results['energy_stdev']
                Fs = neb_im.calc.results['forces_stdev_mean']
                Fsmax = neb_im.calc.results['forces_stdev_max']

                Ecrit = Es > Escut
                Fcrit = Fs > Fscut
                Fmcrit = Fsmax > Fmmult*Fscut

                if Ecrit or Fcrit or Fmcrit: # If above threshold then breaks loop
                    print('Above critical')
                    last_bad = Atoms(neb_im.get_chemical_symbols(),positions=neb_im.get_positions(wrap=True),cell=neb_im.get_cell(),pbc=neb_im.get_pbc())
                    failed = True
                    break

            if failed:
                break
        
            # Run neb again
            neb_images = neb_sampler(molecule_object, sampler_config, model_path, current_model_id, N_neb, neb_steps)

    # Dimer sampling only occurs if NEB structure not identified or only dimer method passed in config file
    if failed == False and 'dimer' in reactive_sampling_method.lower():
        
        dimer_struct = dimer_sampler(molecule_object, sampler_config, model_path, current_model_id, N_neb, neb_steps)

        for i in range(0,max_iter):
            dimer_struct.calc.calculate(dimer_struct,properties=['energy_stdev','forces_stdev_mean','forces_stdev_max'])
            Es = dimer_struct.calc.results['energy_stdev']
            Fs = dimer_struct.calc.results['forces_stdev_mean']
            Fsmax = dimer_struct.calc.results['forces_stdev_max']
            
            Ecrit = Es > Escut
            Fcrit = Fs > Fscut
            Fmcrit = Fsmax > Fmmult*Fscut

            if Ecrit or Fcrit or Fmcrit: 
                last_bad = Atoms(dimer_struct.get_chemical_symbols(),positions=dimer_struct.get_positions(wrap=True),cell=dimer_struct.get_cell(),pbc=dimer_struct.get_pbc())
                failed = True
                break

            dimer_struct = dimer_sampler(molecule_object, sampler_config, model_path, current_model_id, N_neb, neb_steps)


    if failed:
        ase_atoms = last_bad
        
        meta_dict = { "chemical_symbols" : ase_atoms.get_chemical_symbols(),
                      "positions" : ase_atoms.get_positions(wrap=True),
                      "cell" : ase_atoms.get_cell(),
                      "reactant":molecule_object.metadata['reactant'],
                      "product": molecule_object.metadata['product'],
                      "ts":molecule_object.metadata['ts']
                     }

        meta_dict.update(molecule_object.get_metadata())
    
        ase_atoms.calc = None  # replace calculator for return

        molecule_object.update_metadata(meta_dict)

        molecule_object.update_atoms(ase_atoms)
        return molecule_object
    else:
        # Else no update
        molecule_object.update_atoms(None)
        return molecule_object
    


[docs] def neb_sampler(molecule_object, sampler_config, model_path, current_model_id, N_neb, neb_steps): # ASE's neb sampler calc_class = load_module_from_config(sampler_config, 'ase_calculator') gpu = '0' #os.environ.get('PARSL_WORKER_RANK') now using cuda visible devices calculator_list = calc_class(model_path.format(current_model_id) + '/',device='cuda:'+gpu) # Read initial and final states: initial = molecule_object.metadata['reactant'] final = molecule_object.metadata['product'] print(initial.positions) print(final.positions) # Make a band consisting of N_neb images: images = [initial] images += [initial.copy() for i in range(N_neb)] images += [final] neb = NEB(images) # Interpolate linearly the potisions of the three middle images: neb.interpolate() # Set calculators: for image in neb.images: image.calc = MLMD_calculator(calculator_list) # Optimize: optimizer = BFGS(neb) optimizer.run(fmax=0.04, steps=neb_steps) print('Finished NEB sampler') return neb.images
[docs] def dimer_sampler(molecule_object, sampler_config, model_path, current_model_id, N_neb, neb_steps): # ASE's dimer sampler calc_class = load_module_from_config(sampler_config, 'ase_calculator') gpu = '0' #os.environ.get('PARSL_WORKER_RANK') now using cuda visible devices calculator_list = calc_class(model_path.format(current_model_id) + '/',device='cuda:'+gpu) ts = molecule_object.metadata['ts'] ts.calc = MLMD_calculator(calculator_list) # Set up the dimer with DimerControl( initial_eigenmode_method="gauss", displacement_method="vector", logfile=None, ) as d_control: d_atoms = MinModeAtoms(ts, d_control) # Converge to a saddle point with MinModeTranslate( d_atoms, trajectory="dimer_method.traj", logfile='dimer.log' ) as dim_rlx: dim_rlx.run(fmax=0.01, steps=50) ts.get_forces() return ts