#@author: Manish Bhattarai
import os

import numpy as np
from scipy.stats import wilcoxon

from . import config
from .dist_clustering import *
from .pyDNMF import *
from .plot_results import *
class sample():
    """
    Generates a perturbed version of the data based on a sampling distribution.

    Parameters
    ----------
    data : ndarray
        Array of which to find a perturbation.
    noise_var : float
        The perturbation amount.
    method : str
        Method for sampling (uniform/poisson)
    seed : float, optional
        Set seed for random data generation
    """
    @comm_timing()
    def __init__(self, data, noise_var, method, seed=None):
        self.X = data
        self.noise_var = noise_var
        self.seed = seed
        if self.seed is not None:
            np.random.seed(self.seed)
        self.method = method
        self.X_per = 0
    @comm_timing()
    def randM(self):
        """
        Multiplies each element of X by a uniform random number in (1-epsilon, 1+epsilon).
        """
        # Draw M uniformly from (-epsilon, epsilon), then shift by 1 so the
        # multiplier lies in (1-epsilon, 1+epsilon) as documented.
        M = 2 * self.noise_var * np.random.random_sample(self.X.shape).astype(self.X.dtype) - self.noise_var
        M = M + 1
        self.X_per = np.multiply(self.X, M)
    @comm_timing()
    def poisson(self):
        """Resamples each element of a matrix from a Poisson distribution with the mean set by that element, i.e. Y_{i,j} = Poisson(X_{i,j})."""
        self.X_per = np.random.poisson(self.X).astype(self.X.dtype)
    @comm_timing()
    def fit(self):
        r"""
        Calls the subroutines to perform resampling on the data.

        Returns
        -------
        X_per : ndarray
            Perturbed version of the data
        """
        if self.method == 'uniform':
            self.randM()
        elif self.method == 'poisson':
            self.poisson()
        return self.X_per
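

# A minimal usage sketch of the `sample` class. The data matrix and parameter
# values below are illustrative assumptions, not part of the library:
def _example_sample_usage():
    X = np.abs(np.random.rand(8, 6))  # hypothetical nonnegative data matrix
    # Multiplicative uniform noise in (1 - 0.03, 1 + 0.03) around each entry
    X_uniform = sample(data=X, noise_var=0.03, method='uniform', seed=42).fit()
    # Poisson resampling, with each entry of X as the per-entry mean
    X_poisson = sample(data=X, noise_var=0.03, method='poisson', seed=42).fit()
    return X_uniform, X_poisson
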
class PyNMFk():
    r"""
    Performs the distributed NMF decomposition with custom clustering for estimating hidden factors k.

    Parameters
    ----------
    A_ij : ndarray
        Distributed Data
    factors : tuple, optional
        Distributed factors W and H
    params : class
        Class which comprises the following attributes
    params.init : str
        NMF initialization (rand/nnsvd)
    params.comm1 : object
        Global Communicator
    params.comm : object
        Modified communicator object
    params.k : int
        Rank for decomposition
    params.m : int
        Global dimension m
    params.n : int
        Global dimension n
    params.p_r : int
        Cartesian grid row count
    params.p_c : int
        Cartesian grid column count
    params.row_comm : object
        Sub communicator along rows
    params.col_comm : object
        Sub communicator along columns
    params.W_update : bool
        Flag to set W update True/False
    params.norm : str
        NMF norm to be minimized
    params.method : str
        NMF optimization method
    params.eps : float
        Epsilon value
    params.verbose : bool
        Flag to enable/disable display of results
    params.save_factors : bool
        Flag to enable/disable saving computed factors
    params.perturbations : int
        Number of perturbations for clustering
    params.noise_var : float
        Set noise variance for perturbing the data
    params.sill_thr : float
        Set the silhouette threshold for estimating K with the p-value test
    params.start_k : int
        Starting range for feature search K
    params.end_k : int
        Ending range for feature search K
    """
    @comm_timing()
    def __init__(self, A_ij, factors=None, params=None):
        self.A_ij = A_ij
        self.local_m, self.local_n = self.A_ij.shape
        self.params = params
        self.comm1 = self.params.comm1
        self.rank = self.comm1.rank
        self.p_r, self.p_c = self.params.p_r, self.params.p_c
        self.fpath = self.params.fpath
        self.fname = self.params.fname
        self.p = self.p_r * self.p_c
        if self.p_r != 1 and self.p_c != 1:
            self.topo = '2d'
        else:
            self.topo = '1d'
        self.sampling = var_init(self.params, 'sampling', default='uniform')
        self.perturbations = var_init(self.params, 'perturbations', default=20)
        self.noise_var = var_init(self.params, 'noise_var', default=.03)
        self.Hall = 0
        self.Wall = 0
        self.recon_err = 0
        self.AvgH = 0
        self.AvgW = 0
        self.col_err = 0
        self.clusterSilhouetteCoefficients, self.avgSilhouetteCoefficients = 0, 0
        self.L_errDist = 0
        self.avgErr = 0
        self.start_k = self.params.start_k
        self.end_k = self.params.end_k
        self.sill_thr = var_init(params, 'sill_thr', default=0.9)
        self.verbose = var_init(params, 'verbose', default=False)
    @comm_timing()
    def fit(self):
        r"""
        Calls the subroutines to perform distributed NMF decomposition and then custom clustering to estimate k.

        Returns
        -------
        nopt : int
            Estimated value of latent features
        """
        SILL_MIN = []
        errRegres = []
        errRegresTol = []
        RECON = []
        RECON1 = []
        self.params.results_path = self.params.results_path + self.params.fname + '/'
        if self.rank == 0:
            os.makedirs(self.params.results_path, exist_ok=True)
        for self.k in range(self.start_k, self.end_k + 1):
            self.params.k = self.k
            self.pynmfk_per_k()
            SILL_MIN.append(round(np.min(self.clusterSilhouetteCoefficients), 2))
            errRegres.append([self.col_err])
            errRegresTol.append([self.recon_err])
            RECON.append(self.L_errDist)
            RECON1.append(self.avgErr)
        if self.rank == 0:
            nopt1, pvalue1 = self.pvalueAnalysis(errRegres, SILL_MIN)
            print('Rank estimated by NMFk = ', nopt1)
            plot_results(self.start_k, self.end_k, RECON, RECON1, SILL_MIN, self.params.results_path, self.fname)
        else:
            nopt1 = None
        nopt1 = self.comm1.bcast(nopt1, root=0)
        self.comm1.barrier()
        return nopt1
    @comm_timing()
    def pynmfk_per_k(self):
        """Performs NMF decomposition and clustering for each k to estimate silhouette statistics."""
        self.params.results_paths = self.params.results_path + str(self.k) + '/'
        if self.rank == 0:
            os.makedirs(self.params.results_paths, exist_ok=True)
        results = []
        if self.rank == 0:
            print('*************Computing for k=', self.k, '************')
        for i in range(self.perturbations):
            if self.rank == 0:
                print('Current perturbation =', i)
            # Perturb the data with a fixed per-perturbation seed for reproducibility
            data = sample(data=self.A_ij, noise_var=self.noise_var, method=self.sampling, seed=i * 1000).fit()
            self.params.W_update = True
            results.append(PyNMF(data, factors=None, params=self.params).fit())
        # Stack the factors as (m, k, perturbations) for W and (k, n, perturbations) for H
        self.Wall = np.hstack(([results[i][0] for i in range(self.perturbations)]))
        self.Wall = self.Wall.reshape(self.Wall.shape[0], self.k, self.perturbations, order='F')
        self.Hall = np.vstack(([results[i][1] for i in range(self.perturbations)]))
        self.Hall = self.Hall.reshape(self.k, self.Hall.shape[1], self.perturbations)
        self.recon_err = [results[i][2] for i in range(self.perturbations)]
        [processAvg, processSTD, self.Hall, self.clusterSilhouetteCoefficients, self.avgSilhouetteCoefficients,
         idx] = custom_clustering(self.Wall, self.Hall, self.params).fit()
        self.AvgH = np.median(self.Hall, axis=-1)
        self.AvgW = processAvg
        # Regress H against the unperturbed data, holding the clustered W fixed
        self.params.W_update = False
        regressH = PyNMF(self.A_ij, factors=[self.AvgW, self.AvgH], params=self.params)
        self.AvgW, self.AvgH, self.L_errDist = regressH.fit()
        self.col_err = regressH.column_err()
        self.avgErr = np.mean(self.recon_err)
        cluster_stats = {'clusterSilhouetteCoefficients': self.clusterSilhouetteCoefficients,
                         'avgSilhouetteCoefficients': self.avgSilhouetteCoefficients, 'L_errDist': self.L_errDist,
                         'L_err': self.col_err, 'avgErr': self.avgErr, 'recon_err': self.recon_err}
        data_writer = data_write(self.params)
        data_writer.save_factors([self.AvgW, self.AvgH], reg=True)
        data_writer.save_cluster_results(cluster_stats)
    @comm_timing()
    def pvalueAnalysis(self, errRegres, SILL_MIN):
        """
        Calculates nopt by analyzing the error distributions.

        Parameters
        ----------
        errRegres : array
            Array for storing the distributions of errors
        SILL_MIN : float
            Minimum of silhouette score

        Returns
        -------
        nopt : int
            Estimated value of latent features
        pvalue : ndarray
            Array of Wilcoxon p-values per k
        """
        pvalue = np.ones(self.end_k - self.start_k + 1)
        oneDistrErr = errRegres[0][0]
        i = 1
        nopt = 1
        while i < (self.end_k - self.start_k + 1):
            if SILL_MIN[i - 1] > self.sill_thr:
                # Wilcoxon signed-rank test: is the error distribution at this k
                # significantly different from the last accepted distribution?
                pvalue[i] = wilcoxon(oneDistrErr, errRegres[i][0])[1]
                if pvalue[i] < 0.05:
                    nopt = i
                    oneDistrErr = np.copy(errRegres[i][0])
            i = i + 1
        return nopt + self.start_k - 1, pvalue
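

# A minimal end-to-end sketch of driving PyNMFk under MPI. The helper names
# (`parse`, `MPI_comm`) and all parameter values below are assumptions based on
# the package's utils and dist_comm modules, not a verbatim recipe; adjust to
# the actual API of your installation. Run with e.g. `mpirun -n 4 python ...`.
def _example_pynmfk_usage():
    from mpi4py import MPI
    from .utils import parse          # hypothetical: empty params container class
    from .dist_comm import MPI_comm   # hypothetical: cartesian communicator helper
    config.init(0)                    # assumed needed by the timing decorators
    comm = MPI.COMM_WORLD
    args = parse()
    args.fpath, args.fname = 'data/', 'demo'     # illustrative paths
    args.results_path = 'results/'
    args.p_r, args.p_c = comm.size, 1            # 1d row decomposition
    args.init, args.norm, args.method = 'nnsvd', 'fro', 'mu'
    args.itr, args.verbose, args.save_factors = 1000, True, False
    args.eps = np.finfo(np.float32).eps
    args.start_k, args.end_k = 2, 5              # rank search range
    args.sill_thr, args.perturbations, args.noise_var = 0.9, 20, 0.03
    # Exact wiring of the communicator attributes may differ; see the package examples.
    comms = MPI_comm(comm, args.p_r, args.p_c)
    args.comm1 = comm        # global communicator (used for bcast/barrier)
    args.comm = comms.comm   # modified communicator object
    args.row_comm, args.col_comm = comms.cart_1d_row(), comms.cart_1d_column()
    A_ij = np.abs(np.random.rand(100 // comm.size, 40))  # hypothetical local block
    nopt = PyNMFk(A_ij, factors=None, params=args).fit()
    return nopt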