Source code for pydna_epbd.simulation.simulation_steps

from pydna_epbd.simulation.dna import DNA
from pydna_epbd.simulation.mc_simulation import Simulation
from pydna_epbd.monitors.all_monitors import Monitors


[docs]def run_single_iteration( n_preheating_steps, n_steps_after_preheating, seq_id, seq, temp, iter_no ): """This runs a single MCMC simulation iteration. Args: n_preheating_steps (int): Number of preheating steps (from input config). n_steps_after_preheating (int): Number of post preheating steps (from input config). seq_id (str): Seq-id attached to each input sequence. seq (str): DNA sequence. temp (float): Temperature in Kelvin (from input config). iter_no (int): Iteration index. Returns: Monitors: A Monitors object. """ # every iteration is independent # start_time = time.time() total_steps = n_preheating_steps + n_steps_after_preheating dna = DNA(seq) monitors = Monitors(dna, n_preheating_steps, n_steps_after_preheating) monitors.update_state(seq_id, temp, iter_no) simulation = Simulation(dna) simulation.init_temp(temp) simulation.execute(monitors, total_steps, n_preheating_steps) monitors.collect_at_iter() # print(f"finished -> seq_id:{seq_id} | temp:{temp} | iter:{iter_no} -> {(time.time()-start_time)} seconds to execute") # per iteration time log return monitors
import os, time from pydna_epbd.simulation.aggregate_outputs_and_write import ( aggregate_outputs_for_single_temp, ) from joblib import delayed, Parallel
[docs]def run_sequences(sequences, input_configs): """Main function to run MCMC simulations for all DNA sequences. This initializes 100 or the number of available cpu cores-1 cpus to parallaly run n_iterations. Args: sequences (list): List of tuples. Format: [("seq_output_dir", "seq_id", "seq")] input_configs (InputConfigs): A InputConfigs object contaning all configurations. """ if input_configs.save_runtime: runtime_filepath = "runtimes/" + sequences[0][0].split("/")[-2] + ".txt" # print(runtime_filepath) runtime_write_mode = "a" if os.path.exists(runtime_filepath) else "w" runtime_out_handle = open(runtime_filepath, runtime_write_mode) with Parallel(n_jobs=min(100, os.cpu_count() - 1), verbose=1) as parallel: for i in range(0, len(sequences)): seq_output_dir, seq_id, seq = sequences[i] simulation_out_filepath = f"{seq_output_dir}{seq_id}.pkl" if os.path.exists(simulation_out_filepath): print("Already computed:", simulation_out_filepath) continue else: k = 0 # for k in range(10): # for 10 runs to do runtime analysis print(f"Running simulation: seq_idx:{i} | seq_id:{seq_id}") start_time = time.time() list_of_monitors = parallel( delayed(run_single_iteration)( input_configs.n_preheating_steps, input_configs.n_steps_after_preheating, seq_id, seq, input_configs.temperature, iter_no, ) for iter_no in range(input_configs.n_iterations) ) aggregate_outputs_for_single_temp( list_of_monitors, input_configs, simulation_out_filepath ) runtime = time.time() - start_time print( f"finished -> {simulation_out_filepath} -> {runtime} seconds to execute" ) if input_configs.save_runtime: runtime_out_handle.write( f"{k}:{simulation_out_filepath}:{runtime}\n" ) # break # to run 1st seq, comment-out this line if input_configs.save_runtime: runtime_out_handle.close()
# temp_idx = 0 # for seq_idx in range(0, len(sequences)): # seq_name, seq = sequences[seq_idx] # if os.path.exists(f"{input_configs.outputs_dir}{seq_name}.pkl"): # print("Already computed:", f"{input_configs.outputs_dir}{seq_name}.pkl") # else: # print(f"Running simulation for seq_idx:{seq_idx} | seq_name:{seq_name}") # # run_single_sequence(seq_name, seq) # list_of_monitors = run_single_temp(seq_name, seq, temp_idx) # aggregate_outputs_for_single_temp(seq_name, seq, list_of_monitors, input_configs) # if seq_idx==1: break # to run 1st seq, comment-out this line # def collect_at_temp(monitors:Monitors): # monitors.collect_at_temp() # return monitors # def run_single_temp(seq_name, seq, temp_idx): # list_of_monitors = Parallel(n_jobs=47, verbose=1)(delayed(run_single_iter)(seq_name, seq, temp_idx, iter_no) for iter_no in range(input_configs.n_iterations)) # # list_of_monitors = Parallel(n_jobs=47, verbose=1)(delayed(collect_at_temp)(monitors) for monitors in list_of_monitors) # return list_of_monitors # corresponding to n-iterations of the same temp # def run_single_sequence(seq_name, seq): # # corrsponding to n-temps n-iters of the same seq # list_of_list_of_monitors = Parallel(n_jobs=input_configs.n_temperatures, verbose=1)(delayed(run_single_temp)(seq_name, seq, temp_idx) for temp_idx in range(input_configs.n_temperatures)) # aggregate_outputs_for_many_temp(seq_name, seq, list_of_list_of_monitors, input_configs) # aggregating all iterations for a 'seq' at 'temp'