Source code for pyDRESCALk.pyDRESCALk

# @author: Namita Kharat, Manish Bhattarai
from scipy.stats import wilcoxon
from . import config
from .dist_clustering import *
from .pyDRESCAL import *
from .plot_results import *

class sample():
    """
    Generates a perturbed version of data based on a sampling distribution.

    Args:
        data (ndarray, sparse matrix): Array of which to find a perturbation.
        noise_var (float): The perturbation amount.
        method (str): Method for sampling (uniform/poisson).
        params (class): Parameter object; only ``params.np`` (the array module,
            e.g. numpy or cupy) is read here.
        seed (float), optional: Set seed for random data generation.
    """

    @comm_timing()
    def __init__(self, data, noise_var, method, params, seed=None):
        self.np = params.np
        self.X = data
        self.noise_var = noise_var
        self.seed = seed
        if self.seed is not None:  # idiomatic identity check (was `!= None`)
            self.np.random.seed(self.seed)
        self.method = method
        # Holds the perturbed data after fit(); stays 0 for an unknown method.
        self.X_per = 0

    @comm_timing()
    def randM(self):
        """
        Multiplies each element of X by a uniform random number in
        (1 - noise_var, 1 + noise_var).
        """
        # BUG FIX: the original used `+ self.noise_var`, which shifts the
        # multiplier into (1 + noise_var, 1 + 3*noise_var) — an asymmetric,
        # always-amplifying perturbation that contradicts the documented
        # symmetric range. Subtracting noise_var centers the factor at 1.
        M = 2 * self.noise_var * self.np.random.random_sample(self.X.shape).astype(self.X.dtype) - self.noise_var
        M = M + 1
        self.X_per = self.np.multiply(self.X, M)

    @comm_timing()
    def poisson(self):
        """Resamples each element of a matrix from a Poisson distribution with
        the mean set by that element. Y_{i,j} = Poisson(X_{i,j})"""
        self.X_per = self.np.random.poisson(self.X).astype(self.X.dtype)

    @comm_timing()
    def fit(self):
        r"""
        Calls the sub routines to perform resampling on data

        Returns
        -------
        X_per : ndarray
            Perturbed version of data
        """
        if self.method == 'uniform':
            self.randM()
        elif self.method == 'poisson':
            self.poisson()
        return self.X_per
class pyDRESCALk():
    r"""
    Performs the distributed RESCAL decomposition with custom clustering for
    estimating hidden factors k.

    Parameters:
        X_ijk (list of ndarray): Distributed data (list of matrix slices).
        factors (tuple), optional: Distributed factors A and R.
        params (class): Class which comprises the following attributes:
            params.init (str): RESCAL initialization (rand/nnsvd)
            params.comm1 (object): Global communicator
            params.comm (object): Modified communicator object
            params.k (int): Rank for decomposition
            params.m (int): Global dimension m
            params.n (int): Global dimension n
            params.p_r (int): Cartesian grid row count
            params.p_c (int): Cartesian grid column count
            params.row_comm (object): Sub communicator along rows
            params.col_comm (object): Sub communicator along columns
            params.A_update (bool): Flag to set A update True/False
            params.norm (str): RESCAL norm to be minimized
            params.method (str): RESCAL optimization method
            params.eps (float): Epsilon value
            params.verbose (bool): Flag to enable/disable display of results
            params.save_factors (bool): Flag to enable/disable saving computed factors
            params.perturbations (int): Number of perturbations for clustering
            params.noise_var (float): Noise variance for perturbing the data
            params.sill_thr (float): Silhouette threshold for estimating K with p-test
            params.start_k (int): Starting range for feature search K
            params.end_k (int): Ending range for feature search K
    """

    @comm_timing()
    def __init__(self, X_ijk, factors=None, params=None):
        self.X_ijk = X_ijk
        # Local slice dimensions: number of frontal slices and the per-slice shape.
        # NOTE(review): the target `self.local_n` appears twice, so shape[0] is
        # assigned and then immediately overwritten by shape[1]. Likely two
        # distinct names were intended — confirm against consumers of local_n.
        self.local_m, self.local_n, self.local_n = len(self.X_ijk),self.X_ijk[0].shape[0],self.X_ijk[0].shape[1]
        self.params = params
        # Array module (numpy or a drop-in replacement) supplied via params.
        self.np = self.params.np
        self.comm1 = self.params.comm1
        self.rank = self.comm1.rank
        self.p_r, self.p_c = self.params.p_r, self.params.p_c
        self.fpath = self.params.fpath
        self.fname = self.params.fname
        self.p = self.p_r * self.p_c
        # A 2d processor topology requires both grid dimensions > 1; otherwise 1d.
        if self.p_r != 1 and self.p_c != 1:
            self.topo = '2d'
        else:
            self.topo = '1d'
        # Optional parameters with defaults pulled via var_init.
        self.sampling = var_init(self.params,'sampling',default='uniform')
        self.perturbations = var_init(self.params,'perturbations',default=10)
        self.noise_var = var_init(self.params,'noise_var',default=.03)
        # Accumulators populated later by pyrescalk_per_k(); initialized to 0.
        self.Rall = 0
        self.Aall = 0
        self.recon_err = 0
        self.AvgR = 0
        self.AvgG = 0
        self.col_err = 0
        self.clusterSilhouetteCoefficients, self.avgSilhouetteCoefficients = 0, 0
        self.L_errDist = 0
        self.avgErr = 0
        self.start_k = self.params.start_k
        self.end_k = self.params.end_k
        self.step_k = var_init(self.params,'step_k',default=1)
        self.verbose = var_init(params,'verbose',default=True)

    @comm_timing()
    def fit(self):
        r"""
        Calls the sub routines to perform distributed RESCAL decomposition and
        then custom clustering to estimate k.

        NOTE(review): the original docstring documents a return value
        (``nopt : int``), but no ``return`` statement exists — the method only
        accumulates statistics and plots on rank 0. Confirm intent.
        """
        # Per-k statistics accumulated across the k sweep.
        SILL_MIN = []
        SILL_AVG = []
        errRegres = []
        errRegresTol = []
        RECON = []
        RECON1 = []
        self.params.results_paths = self.params.results_path +self.params.fname + '/'
        # Only rank 0 creates the results directory; errors (e.g. directory
        # already exists) are deliberately ignored as best-effort.
        if self.rank == 0:
            try:
                os.makedirs(self.params.results_paths)
            except:
                pass
        # Sweep candidate ranks k and collect silhouette/error statistics.
        for self.k in range(self.start_k, self.end_k + 1,self.step_k):
            self.params.k = self.k
            self.pyrescalk_per_k()
            SILL_MIN.append(self.np.around(self.np.min(self.clusterSilhouetteCoefficients), 2))
            SILL_AVG.append(self.np.around(self.np.mean(self.clusterSilhouetteCoefficients), 2))
            errRegres.append([self.col_err])
            errRegresTol.append([self.recon_err])
            RECON.append(self.L_errDist)
            RECON1.append(self.avgErr)
        # Plot summary results from rank 0 only.
        if self.rank==0:
            plot_results_paper(self.start_k, self.end_k,self.step_k, RECON, SILL_AVG, SILL_MIN, self.params.results_path, self.fname)

    @comm_timing()
    def pyrescalk_per_k(self):
        """Performs RESCAL decomposition and clustering for each k to estimate
        silhouette statistics."""
        self.params.results_paths = self.params.results_path+ str(self.k) + '/'
        # Best-effort directory creation on rank 0 (existing dir is not an error).
        if self.rank == 0:
            try:
                os.makedirs(self.params.results_paths)
            except:
                pass
        results = []
        if self.rank == 0:
            print('*************Computing for k=', self.k, '************')
        # Run one RESCAL decomposition per perturbed copy of the data.
        for i in range(self.perturbations):
            if self.rank == 0:
                print('Current perturbation =', i)
            self.params.perturbation = i
            # Seed varies with both rank and perturbation index so every
            # (rank, perturbation) pair draws independent noise.
            data = sample(data=self.X_ijk, noise_var=self.noise_var, method=self.sampling,params=self.params, seed=self.rank*1000+i*100).fit()
            self.params.A_update = True
            results.append(pyDRESCAL(data, factors=None, params=self.params).fit())
        # Stack per-perturbation factors along a trailing axis:
        # Aall collects results[i][0], Rall collects results[i][2].
        self.Aall = self.np.stack([results[i][0] for i in range(self.perturbations)],axis=-1)
        self.Rall = self.np.stack([results[i][2] for i in range(self.perturbations)],axis=-1)
        self.recon_err = [results[i][3] for i in range(self.perturbations)]
        # Cluster the perturbation ensemble to obtain silhouette statistics.
        [processAvg, processSTD, self.Rall, self.clusterSilhouetteCoefficients, self.avgSilhouetteCoefficients, idx] = custom_clustering(self.Aall, self.Rall, self.params).fit()
        self.AvgR = self.np.median(self.Rall, axis=-1)
        self.AvgA = processAvg
        # Regression pass: hold A fixed (A_update=False) and refit on the
        # unperturbed data with the cluster-averaged factors.
        self.params.A_update = False
        regressH = pyDRESCAL(self.X_ijk, factors=[self.AvgA, self.AvgR], params=self.params)
        self.AvgA, self.AvgA_j, self.AvgR, self.L_errDist = regressH.fit()
        # NOTE(review): bare `np.mean` here, unlike `self.np` used elsewhere —
        # relies on a module-level np from the star imports; confirm this is
        # intentional for non-numpy backends.
        self.avgErr = np.mean(self.recon_err)
        cluster_stats = {'clusterSilhouetteCoefficients': self.clusterSilhouetteCoefficients, 'avgSilhouetteCoefficients': self.avgSilhouetteCoefficients, \
                         'avgErr': self.avgErr, 'recon_err': self.recon_err,'L_errDist':self.L_errDist}
        data_writer = data_write(self.params)
        data_writer.save_factors([self.AvgA, self.AvgR], reg=True)
        data_writer.save_cluster_results(cluster_stats)