import os
import glob
from importlib import import_module
import numpy as np
import json
from ase.geometry import complete_cell
from ase import Atoms
from alframework.tools import pyanitools as pyt
from alframework.tools.molecules_class import MoleculesObject
import inspect
[docs]
def annealing_schedule(t, tmax, amp, per, srt, end):
"""Defines the overall temperature profile in the molecular dynamics simulation.
Args:
t (float): Current simulation time.
tmax (float): Total simulation time.
amp (float): Amplitude of the temperature oscillation.
per (float): Period of the temperature oscillation.
srt (float): Initial temperature.
end (float): Final temperature.
Returns:
(float): Temperature at step 't' given the specified annealing schedule.
"""
linear = t / tmax
linear_T = (1 - linear) * srt + linear * end
return amp * np.sin(np.pi * t / per)**2 + linear_T
[docs]
def build_ANI_info(directory):
"""Scrapes directory and create a dictionary containing the information that can be passed to 'ensemblemolecule'.
Args:
directory (str): Directory to look for the information needed to fill the dict.
Returns:
(dict): Dictionary that can be passed into 'ensemblemolecule' in ase_interface module.
"""
# TODO: Check that directory is a valid directory
ani_dict = {}
cnst_files = glob.glob(directory + '/*.params')
assert len(cnst_files) == 1, "Too many or too few params files detected: " + str(cnst_files)
ani_dict['cnstfile'] = cnst_files[0]
ani_dict['saefile'] = directory + '/sae_linfit.dat'
ani_dict['nnfprefix'] = directory + '/train'
model_dirs = glob.glob(ani_dict['nnfprefix'] + '*')
ani_dict['Nnet'] = len(model_dirs)
return ani_dict
[docs]
def random_rotation_matrix(deflection=1.0, randnums=None):
"""Returns a random rotation matrix
Args:
deflection (float): Pole deflection.
randnums (ndarray): A (3,) numpy array containing the parameters theta, phi, and z.
Returns:
(ndarray): Random 3x3 rotation matrix.
"""
if randnums is None:
randnums = np.random.uniform(size=(3,))
theta, phi, z = randnums
theta = theta * 2.0 * deflection * np.pi # Rotation about the pole (Z).
phi = phi * 2.0 * np.pi # For direction of pole deflection.
z = z * 2.0 * deflection # For magnitude of pole deflection.
r = np.sqrt(z)
V = [np.sin(phi) * r, np.cos(phi) * r, np.sqrt(2.0 - z)]
st = np.sin(theta)
ct = np.cos(theta)
R_z = np.array([[ct, st, 0], [-st, ct, 0], [0, 0, 1]])
M = (np.outer(V, V) - np.eye(3)) @ R_z
return M
[docs]
def store_current_data(h5path, system_data, properties):
"""Stores the key results of the QM calculations in the database.
Args:
h5path (str): Path to store the .h5 files.
system_data (list): A list of MoleculesObjects objects.
properties (dict): Dictionary defined in master.json whose keys are the properties that we want to retrieve
from the QM calculations and store in the database.
Returns:
(None)
"""
# system data is a list of [mol-id(string), atoms, properties dictionary]
data_dict = {}
print("Saving h5 file: " + h5path)
total_number = len(system_data)
saved_number = 0
nan_number = 0
unconverged_number = 0
for system in system_data:
assert isinstance(system, MoleculesObject), 'system must be an instance of MoleculesObject'
cur_moliculeid = system.get_moleculeid()
cur_atoms = system.get_atoms()
cur_properties = system.get_results()
molkey = compute_empirical_formula(cur_atoms.get_chemical_symbols())
# Ensure system converged before saving
if system.check_convergence():
saved_number += 1
atom_index = np.argsort(cur_atoms.get_atomic_numbers())
# If there is already a molecule with the same formula, append
if molkey in data_dict:
data_dict[molkey]["_id"].append(cur_moliculeid)
data_dict[molkey]["coordinates"].append(cur_atoms.get_positions()[atom_index])
if any(cur_atoms.get_pbc()):
data_dict[molkey]["cell"].append(complete_cell(cur_atoms.get_cell()))
for prop in properties:
if properties[prop][1].lower() == "system":
data_dict[molkey][properties[prop][0]].append(cur_properties[prop] * properties[prop][2])
elif properties[prop][1].lower() == "atomic":
data_dict[molkey][properties[prop][0]].append(np.array(cur_properties[prop])[atom_index] * properties[prop][2])
else:
raise RuntimeError('Unknown property format')
# If there is not already a molecule with this empirical formula, make a new one
else:
data_dict[molkey] = {}
data_dict[molkey]["species"] = np.array(cur_atoms.get_chemical_symbols())[atom_index]
data_dict[molkey]["_id"] = [cur_moliculeid]
data_dict[molkey]["coordinates"] = [cur_atoms.get_positions()[atom_index]]
if any(cur_atoms.get_pbc()):
data_dict[molkey]["cell"] = [complete_cell(cur_atoms.get_cell())]
for prop in properties.keys():
if properties[prop][1].lower() == "system":
data_dict[molkey][properties[prop][0]] = [cur_properties[prop] * properties[prop][2]]
elif properties[prop][1].lower() == "atomic":
data_dict[molkey][properties[prop][0]] = [np.array(cur_properties[prop])[atom_index] * properties[prop][2]]
else:
raise RuntimeError('Unknown property format')
elif not isinstance(system, MoleculesObject): # code never enter in this line, but leaving for now to avoid problems
nan_number += 1
elif not system.check_convergence():
unconverged_number = unconverged_number + 1
else:
print("Warning: molecule not saved for unknown reason")
print("Total Systems: " + str(total_number))
print("Saved Systems: " + str(saved_number))
print("NAN Systems: " + str(nan_number))
print("Unconverged Systems: " + str(unconverged_number))
for isokey in data_dict:
# print('isokeys:',isokey)
for propkey in data_dict[isokey]:
if propkey.lower() in ['species','_id']:
data_dict[isokey][propkey] = [el.encode('utf-8') for el in list(data_dict[isokey][propkey])]
data_dict[isokey][propkey] = np.array(data_dict[isokey][propkey])
else:
data_dict[isokey][propkey] = np.array(data_dict[isokey][propkey])
# print("encoding species")
# if type(data_dict[isokey][propkey]) is 'numpy.ndarray':
# data_dict[isokey][propkey] = np.stack(data_dict[isokey][propkey])
# else:
# data_dict[isokey][propkey] = np.array(data_dict[isokey][propkey])
# print('propkey:', propkey,data_dict[isokey][propkey].shape)
#
dpack = pyt.datapacker(h5path)
for key in data_dict:
dpack.store_data(key, **data_dict[key])
dpack.cleanup()
# Recommend creation of parsl queue object
[docs]
class parsl_task_queue():
def __init__(self):
# Create a list
self.task_list = []
[docs]
def add_task(self,task):
"""Add a task to the task list
"""
self.task_list.append(task)
#self.task_list[-1].start()
[docs]
def get_completed_number(self):
"""Get the number of completed tasks.
"""
task_status = [task.done() for task in self.task_list]
return int(np.sum(task_status))
[docs]
def get_running_number(self):
"""Get the number of running tasks.
"""
task_status = [task.running() for task in self.task_list]
return int(np.sum(task_status))
[docs]
def get_number(self):
"""Get the the number of tasks in the task list.
"""
return len(self.task_list)
[docs]
def get_queued_number(self):
"""Get the number of queued tasks.
"""
return int(self.get_number() - self.get_running_number() - self.get_completed_number())
[docs]
def get_exec_done_number(self):
successful_number = 0
for taski,task in enumerate(self.task_list):
task_status = task.task_status()
if task_status == 'exec_done' and task.done:
successful_number=successful_number+1
return(successful_number)
[docs]
def get_failed_number(self):
failed_number = 0
for taski,task in enumerate(self.task_list):
task_status = task.task_status()
if task_status == 'failed':
failed_number=failed_number+1
return(failed_number)
[docs]
def get_task_results(self):
"""Get the reults of the tasks
Returns:
(tuple): A tuple containing, respectively, a list that stores the task results and an int that tells
the number of failed tasks.
"""
results_list = []
failed_number = 0
for taski,task in reversed(list(enumerate(self.task_list))):
task_status = task.task_status()
if task_status == 'exec_done' and task.done:
results_list.append(task.result())
del self.task_list[taski]
elif task_status == 'failed':
failed_number += failed_number
del self.task_list[taski]
return results_list, failed_number
[docs]
def get_task_status(self):
"""Get the status of the tasks.
Returns:
(list): List containing the status of the tasks.
"""
status_list = []
for task in self.task_list:
status_list.append(task.task_status())
return status_list
[docs]
def print_status(self):
"""Prints the status of the tasks.
"""
print('Total Tasks: {:d}'.format(self.get_number()))
# print('Queued Tasks: {:d}'.format(self.get_queued_number()))
# print('Running Tasks: {:d}'.format(self.get_running_number()))
print('Finished Tasks: {:d}'.format(self.get_completed_number()))
print('Finished Successfully Tasks: {:d}'.format(self.get_exec_done_number()))
print('Finished Failed Tasks: {:d}'.format(self.get_failed_number()))
# Used to find current version of directories to re-start with
[docs]
def find_empty_directory(pattern):
"""Find an empty directory to restart.
Args:
(str): Common prefix of the directories to check.
Returns:
(int): Number of the empty directory.
"""
curI = 0
while os.path.exists(pattern.format(curI)):
curI += 1
return curI
# Throughout this code individual systems are passed around as three element lists
# element 1: metadata: this is required to include moleculeid, but may also include sampling and other metadata
# element 2: an ASE atoms object.
# element 3: Evaluated QM properties
[docs]
def system_checker(system, kill_on_fail=True, print_error=True):
"""Checks if the system returned by the builder meets all requeriments.
Args:
system (list): A list containing three elements. The first is a dict containing metadata of the system,
and one of its keys must be 'moleculeid' whose value is a unique identifier of the system.
The second element is an ASE Atoms object. The third element is a dict that stores the
desired properties from the QM calculation (e.g. forces and energies).
kill_on_fail (bool): Kills the process if something goes wrong.
print_error (bool): If True prints the error message if something goes wrong.
Returns:
(bool): True if 'system' meets all requirements and False otherwise.
"""
try:
assert isinstance(system, list) or isinstance(system, tuple)
assert len(system) == 3
assert isinstance(system[0], dict)
assert isinstance(system[0]['moleculeid'], str)
assert isinstance(system[1], Atoms)
assert isinstance(system[2], dict)
no_nan = True
if np.sum(np.isnan(system[1].get_positions())) > 0:
no_nan = False
if any(system[1].get_pbc()):
if np.sum(np.isnan(system[1].get_cell())) > 0:
no_nan = False
for prop in system[2]:
if isinstance(system[2][prop], np.ndarray):
if np.sum(np.isnan(system[2][prop])) > 0:
no_nan = False
if not no_nan:
raise RuntimeError('NAN in system')
return True
except Exception as e:
if print_error:
print(e)
if kill_on_fail:
raise RuntimeError('Atomic system failed to meet requirements.')
return False
[docs]
def load_config_file(path, master_directory=None):
"""Extracts the main parameters to run ALF from the master json file.
Args:
path (str): Path of the master json file.
master_directory (str): Master directory to run ALF.
Returns:
(dict): Dictionary containing the main parameters needed to configure ALF.
"""
with open(path, 'r') as input_file:
config = json.load(input_file)
if master_directory is None and "master_directory" in config:
if config["master_directory"] == 'pwd':
master_directory = os.getcwd() + '/'
else:
master_directory = config["master_directory"] + '/'
config["master_directory"] = master_directory
if isinstance(config, dict):
dir_dict = {}
for entry in config:
if (entry[-3:].lower() == 'dir') and config[entry][0] != '/':
config[entry] = master_directory + config[entry]
elif entry[-4:].lower() == 'path':
if config[entry][0] != '/':
config[entry] = master_directory + config[entry]
# For every 'path' entry, make a corresponding 'dir' entry that holds files in the path
dir_dict[entry[:-4]+'dir'] = '/'.join(config[entry].split('/')[:-1]) + '/'
config.update(dir_dict)
return config
[docs]
def load_module_from_config(config, module_field):
"""Loads a module from a config json file.
Args:
config (dict): Dictionary representing the config json file.
module_field (str): Module to load.
Returns:
(type): Type of the class that was loaded from the config file.
"""
module_string = '.'.join(config[module_field].split('.')[:-1])
class_string = config[module_field].split('.')[-1]
return getattr(import_module(module_string), class_string)
[docs]
def load_module_from_string(module_field):
"""Loads a python module from a string.
Args:
module_field (str): Strings that contains the modules to be loaded.
Returns:
(type): Type of the class that was loaded from module_field.
"""
module_string = '.'.join(module_field.split('.')[:-1])
class_string = module_field.split('.')[-1]
return getattr(import_module(module_string), class_string)