# Source code for pyDRESCALk.data_io

# @author: Manish Bhattarai
import glob
import os

import h5py
import pandas as pd
from scipy.io import loadmat

from .utils import *
import pickle

class data_read():
    r"""Class for reading data.

    Parameters:
        args (class): Class which comprises following attributes
        fpath (str): Directory path of file to be read
        pgrid (tuple): Cartesian grid configuration
        ftype (str): Type of data to read (mat/npy/csv/folder)
        fname (str): Name of the file to read
        comm (object): comm object for distributed read
    """

    @comm_timing()
    def __init__(self, args):
        self.args = args
        self.fpath = args.fpath
        self.pgrid = [args.p_r, args.p_c]
        self.ftype = args.ftype
        self.fname = args.fname
        self.comm = args.comm1
        self.rank = self.comm.rank
        self.data = 0
        # Optional variable name to select from a .mat file; defaults to None.
        self.key = var_init(self.args, 'key', default=None)
        if self.ftype == 'folder':
            # Data is pre-split on disk: each rank reads its own chunk file.
            self.file_path = self.fpath + self.fname + str(self.comm.rank) + '.npy'
        else:
            self.file_path = self.fpath + self.fname + '.' + self.ftype

    @comm_timing()
    def read(self):
        r"""Data read function. Returns the (partitioned) data for this rank."""
        return self.read_dat()

    @comm_timing()
    def read_file_npy(self):
        r"""Numpy data read function"""
        self.data = np.load(self.file_path)

    @comm_timing()
    def read_file_csv(self):
        r"""CSV data read function"""
        self.data = pd.read_csv(self.file_path, header=None).values

    def read_file_pickle(self):
        r"""Pickle data read function.

        NOTE(review): ``pickle.load`` executes arbitrary code from the file;
        only read trusted data.
        """
        # Use a context manager so the file handle is not leaked.
        with open(self.file_path, "rb") as fh:
            self.data = pickle.load(fh)

    @comm_timing()
    def read_file_mat(self):
        r"""mat file read function.

        Reads the variable named by ``self.key`` when provided, otherwise the
        default MATLAB variable name ``'X'``.
        """
        # BUGFIX: original fallback indexed with the undefined name ``X``
        # (NameError); the intended key is the string 'X'.
        key = self.key if self.key is not None else 'X'
        self.data = loadmat(self.file_path)[key]

    @comm_timing()
    def data_partition(self):
        r"""Divide the input matrix into chunks as specified by the grid configuration.

        Returns n arrays of shape (nrows_i, ncols_i) where i is the index of each
        chunk, with \Sum_i^n ( nrows_i * ncols_i ) = arr.size.  If arr is a 2D
        array, the returned array looks like n subblocks, each preserving the
        "physical" layout of arr.
        """
        try:
            dtr_blk_shp = determine_block_params(self.rank, self.pgrid, self.data.shape)
        except AttributeError:
            # self.data is a list of arrays (e.g. pickle input): use the first
            # entry's shape to compute the block parameters.
            dtr_blk_shp = determine_block_params(self.rank, self.pgrid, self.data[0].shape)
        blk_indices = dtr_blk_shp.determine_block_index_range_asymm()
        try:
            self.data = self.data[blk_indices[0][0]:blk_indices[1][0] + 1,
                                  blk_indices[0][1]:blk_indices[1][1] + 1]
        except (TypeError, IndexError):
            # List-of-arrays path: slice each array in the list the same way.
            self.data = [d[blk_indices[0][0]:blk_indices[1][0] + 1,
                           blk_indices[0][1]:blk_indices[1][1] + 1]
                         for d in self.data]

    @comm_timing()
    def save_data_to_file(self, fpath):
        r"""Save this rank's data chunk to a numpy file indexed by rank."""
        # BUGFIX: original concatenated str + int rank (TypeError).
        fname = fpath + 'A_' + str(self.comm.rank) + '.npy'
        np.save(fname, self.data)

    @comm_timing()
    def read_dat(self):
        r"""Read the data and split it into the chunk owned by this MPI rank."""
        if self.ftype == 'npy':
            self.read_file_npy()
            self.data_partition()
        elif self.ftype == 'csv' or self.ftype == 'txt':
            self.read_file_csv()
            self.data_partition()
        elif self.ftype == 'mat':
            self.read_file_mat()
            self.data_partition()
        elif self.ftype == 'folder':
            # Pre-split data: the per-rank file is already a chunk.
            self.read_file_npy()
        elif self.ftype == 'p' or self.ftype == 'pickle':
            self.read_file_pickle()
            self.data_partition()
        return self.data
class split_files_save():
    r"""Rank 0 based data read, split and save.

    Parameters:
        data (ndarray): Full 2D matrix to split
        pgrid (tuple): Cartesian grid configuration (p_r, p_c)
        fpath (str): Directory path where the chunk files are written
    """

    @comm_timing()
    def __init__(self, data, pgrid, fpath):
        self.data = data
        self.pgrid = pgrid
        self.p_r = pgrid[0]
        self.p_c = pgrid[1]
        self.fpath = fpath

    @comm_timing()
    def split_files(self):
        r"""Compute the index range for each block and partition the data per chunk.

        Populates ``self.split`` with one sub-array per rank.
        """
        # np.prod (not the NumPy-2.0-removed np.product) gives p_r * p_c.
        dtr_blk_idx = [determine_block_params(rank, self.pgrid,
                                              self.data.shape).determine_block_index_range_asymm()
                       for rank in range(np.prod(self.pgrid))]
        self.split = [self.data[i[0][0]:i[1][0] + 1, i[0][1]:i[1][1] + 1]
                      for i in dtr_blk_idx]

    @comm_timing()
    def save_data_to_file(self):
        r"""Save the chunks into per-rank numpy files ``A_<rank>.npy``.

        BUGFIX: the original assigned ``self.split = self.split_files()`` (which
        returns ``None``, clobbering the split list), indexed chunks with an
        off-by-one ``s - 1``, and wrote the FULL matrix (``self.data``) into
        every chunk file instead of the chunk itself.
        """
        self.split_files()  # populates self.split as a side effect
        for idx in range(self.p_r * self.p_c):
            fname = self.fpath + 'A_' + str(idx) + '.npy'
            np.save(fname, self.split[idx])
class data_write():
    r"""Class for writing data/results.

    Parameters:
        args (class): class which comprises following attributes
        results_path (str): Directory path of file to write
        pgrid (tuple): Cartesian grid configuration
        ftype (str): Type of data to read (mat/npy/csv/folder)
        comm (object): comm object for distributed read
    """

    @comm_timing()
    def __init__(self, args):
        self.p_r, self.p_c = args.p_r, args.p_c
        self.pgrid = [self.p_r, self.p_c]
        self.comm = args.comm1
        self.params = args
        self.fpath = self.params.results_paths
        self.rank = self.comm.rank

    @comm_timing()
    def create_folder_dir(self, fpath):
        r"""Create directory if not present.

        BUGFIX: the original ``try: os.mkdir ... except: pass`` silently
        swallowed every error (permissions, missing parents, bad path);
        ``makedirs(exist_ok=True)`` only tolerates "already exists".
        """
        os.makedirs(fpath, exist_ok=True)

    @comm_timing()
    def save_factors(self, factors, reg=False):
        r"""Save the A and R factors for each MPI process.

        Parameters:
            factors (list): ``factors[0]`` is this rank's A block,
                ``factors[1]`` is the R factor (written by rank 0 only)
            reg (bool): when True, write into the ``*_reg_factors`` folders
        """
        self.create_folder_dir(self.fpath)
        if reg:
            W_factors_pth = self.fpath + 'A_reg_factors/'
            H_factors_pth = self.fpath + 'R_reg_factors/'
        else:
            W_factors_pth = self.fpath + 'A_factors/'
            H_factors_pth = self.fpath + 'R_factors/'
        self.create_folder_dir(W_factors_pth)
        self.create_folder_dir(H_factors_pth)
        # Only the first rank of each processor row writes its A block,
        # indexed by the row position in the grid.
        if self.rank % self.p_r == 0:
            np.save(W_factors_pth + 'A_' + str(self.rank // self.p_r) + '.npy', factors[0])
        if self.rank == 0:
            np.save(H_factors_pth + 'R.npy', factors[1])

    @comm_timing()
    def save_cluster_results(self, params):
        r"""Save cluster results to a h5 file with rank 0.

        Parameters:
            params (dict): must contain keys ``clusterSilhouetteCoefficients``,
                ``avgSilhouetteCoefficients``, ``L_errDist``, ``avgErr`` and
                ``recon_err``.
        """
        if self.rank == 0:
            with h5py.File(self.fpath + 'results.h5', 'w') as hf:
                hf.create_dataset('clusterSilhouetteCoefficients',
                                  data=params['clusterSilhouetteCoefficients'])
                hf.create_dataset('avgSilhouetteCoefficients',
                                  data=params['avgSilhouetteCoefficients'])
                hf.create_dataset('L_errDist', data=params['L_errDist'])
                hf.create_dataset('avgErr', data=params['avgErr'])
                hf.create_dataset('ErrTol', data=params['recon_err'])
class read_factors():
    r"""Class for reading saved factors.

    Args:
        factors_path (str): Directory path of factors to read from
        pgrid (tuple): Cartesian grid configuration

    The loaded factors are kept on the instance as ``self.W`` and ``self.H``.
    (BUGFIX: the original ``__init__`` discarded the return value of
    ``load_factors``, so the work was done and thrown away.)
    """

    @comm_timing()
    def __init__(self, factors_path, pgrid):
        self.factors_path = factors_path
        self.W_path = self.factors_path + 'A_factors/'
        self.H_path = self.factors_path + 'R_factors/'
        self.p_grid = pgrid
        # Keep the results instead of dropping them on the floor.
        self.W, self.H = self.load_factors()

    @comm_timing()
    def custom_read_npy(self, fpath):
        r"""Read a single numpy file and return its array."""
        return np.load(fpath)

    @comm_timing()
    def read_factor(self, fpath):
        r"""Read factor chunks from a folder.

        Returns:
            tuple: (data, count) where ``data`` is a single array when the
            folder holds one file, otherwise a list of arrays in sorted
            filename order, and ``count`` is the number of files found.
        """
        files = glob.glob(fpath + '/*')
        if len(files) == 1:
            data = self.custom_read_npy(files[0])
        else:
            # Sort so the chunk order is deterministic across filesystems.
            data = [self.custom_read_npy(f) for f in sorted(files)]
        return data, len(files)

    @comm_timing()
    def load_factors(self):
        r"""Load the final stacked factors for visualization.

        Returns:
            tuple: (W_data, H_data); W chunks are vertically stacked when
            more than one chunk file exists.
        """
        W_data, ct_W = self.read_factor(self.W_path)
        H_data, ct_H = self.read_factor(self.H_path)
        if ct_W > 1:
            W_data = np.vstack(W_data)
        return W_data, H_data