# @Author: Manish Bhattarai, Erik Skau
import argparse
import os
from .dist_comm import *
import numpy as np
from mpi4py import MPI
def parser(argv=None):
    r"""
    Read the data-generator parameters from the command line.

    Parameters
    ----------
    argv : list of str, optional
        Argument list to parse. When ``None`` (the default) ``argparse`` falls back to
        ``sys.argv[1:]``, exactly as before; passing a list allows programmatic use.

    Returns
    -------
    argparse.Namespace
        Attributes ``p_r``, ``p_c``, ``m``, ``n`` and ``k`` (each ``None`` when omitted) and
        ``fpath`` (``'../data/tmp/'`` when omitted).
    """
    # Named arg_parser so the local does not shadow this function's own name.
    arg_parser = argparse.ArgumentParser(description='Data generator arguments')
    arg_parser.add_argument('--p_r', type=int, help='Number of row processors')
    arg_parser.add_argument('--p_c', type=int, help='Number of column processors')
    arg_parser.add_argument('--m', type=int, help='Global m')
    arg_parser.add_argument('--n', type=int, help='Global n')
    arg_parser.add_argument('--k', type=int, help='factors')
    arg_parser.add_argument('--fpath', default='../data/tmp/', type=str, help='data path to store(eg: tmp/)')
    return arg_parser.parse_args(argv)
class data_generator():
    r"""
    Generates synthetic data in a distributed manner, where each MPI process generates
    its own chunk of the data in parallel.

    Globally the data is a stack of ``m`` square ``n x n`` slices ``X[i] = A @ R[i] @ A.T``.
    ``A`` is an ``n x k`` matrix whose columns are Gaussian kernels. ``R`` is an
    ``m x k x k`` array of uniform random numbers, generated with the same seed on
    every rank. Each rank builds the following pieces:

    * the row block of ``A`` it owns;
    * the column block of ``A.T`` it owns;
    * the full ``R``;
    * its ``(row block, column block)`` piece of every slice of ``X``.

    Parameters
    ----------
    args : object
        Object exposing the following attributes:

        fpath : str
            Directory in which the generated files are stored.
        p_r : int
            Count of row processors in the cartesian grid.
        p_c : int
            Count of column processors in the cartesian grid.
        m : int
            Number of slices (first dimension of the generated ``X``).
        n : int
            Size of each square slice (``n x n``).
        k : int
            Feature count (number of columns of ``A``).
        rank : int
            Rank of this process; mapped onto the ``(p_r, p_c)`` grid.
    """

    def __init__(self, args):
        self.args = args
        self.rank = args.rank
        self.pgrid = [args.p_r, args.p_c]
        self.shape = [args.m, args.n]
        self.p_r = args.p_r
        self.p_c = args.p_c
        self.m = args.m
        self.n = args.n
        self.fpath = args.fpath
        self.k = args.k

    def gauss_matrix_generator(self, n, k, axis=0):
        r"""
        Return an element-wise generator for an ``n x k`` matrix with Gaussian-kernel columns.

        Entry ``(i, j)`` equals
        ``exp(-k * (i - (j*n/k + n/(2k) - 0.5))**2 / (0.01 * n**2))``.
        Column ``j`` therefore peaks at row ``j*n/k + n/(2k) - 0.5``.

        Parameters
        ----------
        n : int
            The ambient space dimension (number of rows).
        k : int
            The latent space dimension (number of columns).
        axis : int
            ``0`` returns ``f(i, j)``, which gives entry ``(i, j)`` of the ``n x k`` matrix.
            ``1`` returns ``f(j, i)`` with the same value, which generates the ``k x n``
            transpose when used with ``np.fromfunction``.

        Returns
        -------
        callable
            Vectorised function of two index arrays, suitable for ``np.fromfunction``.

        Raises
        ------
        ValueError
            If ``axis`` is neither 0 nor 1.
        """
        offset = n / k / 2 - 0.5
        noverk = n / k
        coeff = -k / (.01 * n ** 2)

        def kernel(i, j):
            return np.exp(coeff * (i - (j * noverk + offset)) ** 2)

        if axis == 0:
            return kernel
        if axis == 1:
            # Swap the argument order so fromfunction over a (k, n) shape yields the transpose.
            return lambda j, i: kernel(i, j)
        raise ValueError('axis must be 0 or 1, got {!r}'.format(axis))

    def determine_block_index_range_asymm(self):
        '''Return (start_inds, end_inds), the inclusive index range of this rank's block.

        ``self.shape`` (``[m, n]``) is split over ``self.pgrid`` (``[p_r, p_c]``). The split is
        balanced: the first ``dim % p`` blocks get one extra element, matching the start
        offsets used by ``dist_fromfunction``.
        '''
        # NOTE(review): generate_factors_data splits the n x n slices, not [m, n]; this helper
        # is not called in this file -- confirm which split its callers expect.
        chunk_ind = np.unravel_index(self.rank, self.pgrid)
        start_inds = [i * (n // k) + min(i, n % k) for n, k, i in zip(self.shape, self.pgrid, chunk_ind)]
        end_inds = [(i + 1) * (n // k) + min((i + 1), n % k) - 1 for n, k, i in zip(self.shape, self.pgrid, chunk_ind)]
        return start_inds, end_inds

    def determine_block_shape_asymm(self):
        '''Return the shape of this rank's block, derived from determine_block_index_range_asymm.'''
        start_inds, end_inds = self.determine_block_index_range_asymm()
        return [(j - i + 1) for (i, j) in zip(start_inds, end_inds)]

    def random_matrix_generator(self, k, n, seed):
        '''Return an ``n x k x k`` array of uniform [0, 1) random numbers generated from ``seed``.

        The values are identical to ``np.random.seed(seed); np.random.rand(n, k, k)``. A
        private RandomState is used so that numpy's global RNG is not reseeded as a side effect.
        '''
        return np.random.RandomState(seed).rand(n, k, k)

    def dist_fromfunction(self, func, shape, pgrid, *args, unravel_index=np.unravel_index, **kwargs):
        """
        Produce this rank's block of X, where X_{i,j} = func(i, j) in global coordinates.

        Each processor gets the section of X that ``np.array_split`` would give it along
        ``pgrid``: the first ``dim % p`` blocks have one extra element.

        Parameters
        ----------
        func : callable
            Vectorised function of global index arrays.
        shape : sequence of int
            Global shape of X.
        pgrid : sequence of int
            Processor grid the global shape is split over.
        *args, **kwargs
            Forwarded to ``np.fromfunction``.
        unravel_index : callable, optional
            Zero-argument callable returning this rank's grid index. When left at its
            default, the grid index is ``np.unravel_index(self.rank, pgrid)``. This is only
            valid when the number of ranks equals ``prod(pgrid)``.
        """
        if unravel_index is np.unravel_index:
            # The default cannot be called without arguments; map this rank onto pgrid.
            grid_index = np.unravel_index(self.rank, pgrid)
        else:
            grid_index = unravel_index()
        block_shape = [(n // k) + (i < (n % k)) * 1 for n, k, i in zip(shape, pgrid, grid_index)]
        start_index = [i * (n // k) + min(i, n % k) for n, k, i in zip(shape, pgrid, grid_index)]
        # Shift local indices by the block's global start so func sees global coordinates.
        return np.fromfunction(lambda *x: func(*[a + b for a, b in zip(x, start_index)]), block_shape, *args, **kwargs)

    def unravel_column(self):
        '''Return a callable giving this rank's index in a (p_r, 1) grid.

        The index is ``(row, col // p_c)``. The second component is 0 for any valid rank,
        because ``col < p_c``.
        '''
        def wrapper(*args, **kwargs):
            row, col = np.unravel_index(self.rank, self.pgrid)
            return (row, col // self.pgrid[1])
        return wrapper

    def unravel_row(self):
        '''Return this rank's index in a (1, p_c) grid: ``(row // p_r, col)``.

        The first component is 0 for any valid rank, because ``row < p_r``.
        '''
        row, col = np.unravel_index(self.rank, self.pgrid)
        return (row // self.pgrid[0], col)

    def create_folder_dir(self, fpath):
        '''Create ``fpath`` and any missing parent directories.

        An existing directory is fine: several MPI ranks may race to create the same
        path. Other OS errors, such as permission problems, now propagate instead of
        being swallowed. An empty path means the current directory and is skipped.
        '''
        if fpath:
            os.makedirs(fpath, exist_ok=True)

    def generate_factors_data(self):
        """Generate this rank's chunk of the factors A (row block and column block of A.T), R and data X.

        Returns
        -------
        list, ndarray, ndarray
            ``[A_row_block, At_col_block]``, ``R`` of shape ``(m, k, k)``, and ``X`` of shape
            ``(m, rows_in_block, cols_in_block)``.
        """
        A_gen1 = self.dist_fromfunction(self.gauss_matrix_generator(self.n, self.k, axis=0), (self.n, self.k),
                                        (self.p_r, 1), unravel_index=self.unravel_column())
        A_gen2 = self.dist_fromfunction(self.gauss_matrix_generator(self.n, self.k, axis=1), (self.k, self.n),
                                        (1, self.p_c), unravel_index=self.unravel_row)
        # Same seed on every rank, so all ranks hold the identical R.
        R_gen = self.random_matrix_generator(self.k, self.m, 1)
        # Batched matmul: (r, k) @ (m, k, k) @ (k, c) -> (m, r, c). This also works for m == 0,
        # where np.stack of an empty list would raise.
        X_gen = A_gen1 @ R_gen @ A_gen2
        print('For rank=', self.rank, ' dimensions of A,R and X are ', A_gen1.shape, R_gen.shape, X_gen.shape)
        return [A_gen1, A_gen2], R_gen, X_gen

    @staticmethod
    def _pack_arrays(arrays):
        '''Pack a list of arrays for np.save.

        When all shapes agree, the arrays are stacked into a regular array. Otherwise they
        go into a 1-D object array, which must be loaded with ``allow_pickle=True``.
        '''
        if arrays and all(a.shape == arrays[0].shape for a in arrays):
            return np.stack(arrays)
        # Fill element-wise: np.array(..., dtype=object) may still try to broadcast ragged inputs.
        packed = np.empty(len(arrays), dtype=object)
        for idx, arr in enumerate(arrays):
            packed[idx] = arr
        return packed

    def fit(self):
        '''Generate this rank's factors and data, save them, and return them.

        Files are written under ``fpath`` as ``A_factors/A_<rank>.npy``,
        ``R_factors/R_<rank>.npy`` and ``X/X_<rank>.npy``. Paths are joined with os.path,
        so ``fpath`` does not need a trailing slash.
        '''
        A_gen, R_gen, X_gen = self.generate_factors_data()
        a_dir = os.path.join(self.fpath, 'A_factors')
        r_dir = os.path.join(self.fpath, 'R_factors')
        x_dir = os.path.join(self.fpath, 'X')
        self.create_folder_dir(self.fpath)
        self.create_folder_dir(a_dir)
        self.create_folder_dir(r_dir)
        self.create_folder_dir(x_dir)
        # The two A blocks usually differ in shape; np.save on the raw list fails on NumPy >= 1.24.
        np.save(os.path.join(a_dir, 'A_' + str(self.rank)), self._pack_arrays(A_gen))
        np.save(os.path.join(r_dir, 'R_' + str(self.rank)), R_gen)
        np.save(os.path.join(x_dir, 'X_' + str(self.rank)), X_gen)
        print('File successfully created and saved')
        return A_gen, R_gen, X_gen
if __name__ == '__main__':
    # Parse CLI options, attach the MPI communicators to them, then generate and save this rank's data.
    args = parser()
    world = MPI.COMM_WORLD
    rank = world.rank
    # Communicator creation stays in the original order on every rank.
    grid_comm = MPI_comm(world, args.p_r, args.p_c)
    args.comm1 = grid_comm.comm
    args.comm = grid_comm
    args.col_comm = grid_comm.cart_1d_column()
    args.row_comm = grid_comm.cart_1d_row()
    args.rank = rank
    generator = data_generator(args)
    generator.fit()