Source code for pyDNMFk.dist_svd

# @Author: Gopinath Chennupati, Manish Bhattarai, Erik Skau
from datetime import datetime
from math import sqrt
from random import normalvariate

from .utils import *


class DistSVD():
    r"""Distributed computation of the SVD along a 1D distribution of the data.
    Only U or V is distributed, based on the data size.

    Parameters
    ----------
    A : ndarray
        Distributed data
    args : class
        Class comprising the following attributes
    args.m : int
        Global row dimension of A
    args.n : int
        Global column dimension of A
    args.k : int (optional)
        Rank for the decomposition
    args.p_r : int
        Cartesian grid row count
    args.p_c : int
        Cartesian grid column count
    args.seed : int (optional)
        Set the random seed
    args.comm : object
        Comm object for distributed read
    args.eps : float
        Epsilon value
    """

    @comm_timing()
    def __init__(self, args, A):
        super(DistSVD, self).__init__()
        self.args = args
        self.globalm = self.args.m
        self.globaln = self.args.n
        # SVD inits: default to the full rank when k is not given
        self.k = self.args.k if self.args.k else min(self.globalm, self.globaln)
        self.svdSoFar = []
        # MPI inits
        self.comm = args.comm
        self.rank = self.comm.rank
        self.p = self.comm.size
        self.grid_comm = self.comm.cartesian2d
        self.coords = self.comm.coord2d
        self.proc_rows = self.args.p_r
        self.proc_cols = self.args.p_c
        # The longer axis of the data must be the more heavily partitioned one
        if self.globalm > self.globaln:
            assert self.proc_rows > self.proc_cols, "m>n, ensure p_r>p_c"
        elif self.globalm < self.globaln:
            assert self.proc_rows < self.proc_cols, "m<n, ensure p_r<p_c"
        self.eps = self.args.eps
        self.A = A
        self.mat_for_1D = self.A.copy()
        # Seed: fall back to the current timestamp when none is supplied
        try:
            self.seed = self.args.seed
        except AttributeError:
            self.seed = datetime.now().timestamp()

    @comm_timing()
    def normalize_by_W(self, Wall, Hall, comm1):
        """Normalize the factors W and H so that the columns of W sum to one"""
        # Column sums of W are partial on each row block; reduce across row ranks
        Wall_norm = Wall.sum(axis=0, keepdims=True)
        if self.proc_rows != 1:
            Wall_norm = comm1.allreduce(Wall_norm, op=MPI.SUM)
        Wall_norm += self.eps
        # Rescale H by the same factors so the product W @ H is unchanged
        Wall /= Wall_norm
        Hall *= Wall_norm.T
        return Wall, Hall

    @comm_timing()
    def randomUnitVector(self, d):
        """Construct a random unit vector of length d"""
        unnormalized = [normalvariate(0, 1) for _ in range(d)]
        theNorm = sqrt(sum(x * x for x in unnormalized))
        return np.asarray([x / theNorm for x in unnormalized], dtype='float64')

    @comm_timing()
    def globalGram(self, X, Y):
        """Compute the global Gram matrix between X and Y"""
        # Each rank's product is a partial sum of the full product along the
        # distributed axis, so an allreduce completes the Gram matrix
        B = X @ Y
        B = self.grid_comm.allreduce(B)
        return B

    @comm_timing()
    def svd1D(self):
        """One-dimensional SVD: power iteration on the Gram matrix for the
        dominant singular vector"""
        buf = self.randomUnitVector(min(self.globalm, self.globaln))
        self.lastV = None
        if self.rank == 0:
            buf += np.zeros(buf.shape)
        # All ranks start from the same random unit vector
        self.grid_comm.Bcast(buf, 0)
        self.currV = buf
        self.At = self.mat_for_1D.T
        # Form the Gram matrix on the smaller dimension: A^T A when m >= n,
        # A A^T otherwise
        if self.globalm >= self.globaln:
            self.B = self.globalGram(self.At, self.mat_for_1D)
        else:
            self.B = self.globalGram(self.mat_for_1D, self.A.T)
        if self.rank != 0:
            del self.B
        b = np.zeros(self.currV.shape)
        if self.rank == 0:
            # B is replicated after the allreduce, so rank 0 iterates alone and
            # broadcasts the result afterwards
            iteration = 0
            while True:
                lastV = self.currV
                self.currV = self.B @ self.currV
                self.currV = self.currV / np.linalg.norm(self.currV)
                r = np.dot(self.currV, lastV)
                iteration += 1
                # Converged when successive iterates align to within eps
                if abs(r.item()) > 1. - self.eps:
                    b += self.currV
                    self.currV = b
                    break
        self.grid_comm.Bcast(self.currV, 0)
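
    # On a single rank the loop above reduces to plain power iteration on the
    # Gram matrix. A minimal serial sketch for reference; this helper is
    # hypothetical, not part of the original module, and is never called here.
    @staticmethod
    def _power_iteration_reference(B, v0, eps):
        """Hypothetical serial reference for the iteration in svd1D."""
        v = v0 / np.linalg.norm(v0)
        while True:
            v_new = B @ v
            v_new = v_new / np.linalg.norm(v_new)  # stay on the unit sphere
            if abs(np.dot(v_new, v)) > 1.0 - eps:  # successive iterates align
                return v_new
            v = v_new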

    @comm_timing()
    def calc_norm(self, vec):
        """Compute the global norm of a distributed vector"""
        partial_sq_sum = sum(vec * vec)
        global_sq_sum = self.grid_comm.allreduce(partial_sq_sum)
        norm = np.sqrt(global_sq_sum)
        return norm

    @comm_timing()
    def svd(self):
        """Compute the rank-k SVD of the given matrix by repeated deflation.

        Returns
        -------
        singularValues : list
            List of singular values of length k
        us : ndarray
            Factor Us of shape (m/p_r, k)
        vs : ndarray
            Factor Vs of shape (k, n/p_c)
        """
        for i in range(self.k):
            # Deflate the triplets found so far (see the worked form below)
            self.mat_for_1D = self.A.copy()
            for sigma, u, v in self.svdSoFar[:i]:
                outer = np.outer(u, v)
                self.mat_for_1D -= sigma * outer  # Need to fix this
            if self.globalm > self.globaln:
                self.svd1D()
                v = self.currV
                u_unnorm = self.A @ v
                sig = self.calc_norm(u_unnorm)  # next singular value
                u = u_unnorm / sig
            else:
                self.svd1D()
                u = self.currV
                v_unnorm = self.A.T @ u
                sig = self.calc_norm(v_unnorm)  # next singular value
                v = v_unnorm / sig
            self.svdSoFar.append([sig, u, v])
        singularValues, us, vs = [np.asarray(x) for x in zip(*self.svdSoFar)]
        return singularValues, us.T, vs
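
    # Worked form of the deflation in svd() above: after i triplets have been
    # extracted, the power iteration runs on the residual
    #
    #   A_i = A - sum_{j < i} sigma_j * (u_j v_j^T)
    #
    # whose dominant singular triplet is the (i+1)-th triplet of A.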

    @comm_timing()
    def rel_error(self, U, S, V):
        """Compute the relative error between the data reconstructed from the
        factors and the original data"""
        X_recon = U @ S @ V
        err_num = np.sum((self.A - X_recon) ** 2)
        norm_deno = np.sum(self.A ** 2)
        # Both sums are partial per rank; reduce before taking the ratio
        err_num = self.grid_comm.allreduce(err_num)
        norm_deno = self.grid_comm.allreduce(norm_deno)
        err = np.sqrt(err_num) / np.sqrt(norm_deno)
        return err

    @comm_timing()
    def nnsvd(self, flag=1, verbose=1):
        r"""Compute the distributed non-negative SVD (NNSVD) components from the
        computed SVD factors.

        Parameters
        ----------
        flag : bool, optional
            Computes NNSVD factors with different configurations
        verbose : bool, optional
            If true, the SVD and NNSVD reconstruction errors are also returned

        Returns
        -------
        W : ndarray
            Non-negative factor W of shape (m/p_r, k)
        H : ndarray
            Non-negative factor H of shape (k, n/p_c)
        errors : dictionary (optional)
            Dictionary of reconstruction errors for SVD and NNSVD
        """
        singularValues, U, V = self.svd()
        if verbose == 1:
            recon_err_svd = self.rel_error(U, np.diag(singularValues), V)
            if self.rank == 0:
                print('Reconstruction error for SVD is :', recon_err_svd)
        if flag == 0:
            # Plain rectification: clip the negative entries of U and S @ V
            S = np.diag(singularValues)
            W = U
            H = S @ V
            W[W < 0] = 0
            H[H < 0] = 0
        elif flag == 1:
            # NNDSVD-style split: separate each singular factor into positive
            # and negative parts and keep whichever pair dominates
            S = singularValues.copy()
            U, S, V = U[:, :self.k], S[:self.k], V[:self.k, :]
            V = V.T
            UP = np.where(U > 0, U, 0)
            UN = np.where(U < 0, -U, 0)
            VP = np.where(V > 0, V, 0)
            VN = np.where(V < 0, -V, 0)
            # U is row-distributed, so its column norms need an allreduce
            UP_norm = np.sum(np.square(UP), 0)
            UP_norm = np.sqrt(self.grid_comm.allreduce(UP_norm))
            UN_norm = np.sum(np.square(UN), 0)
            UN_norm = np.sqrt(self.grid_comm.allreduce(UN_norm))
            VP_norm = np.sum(np.square(VP), 0)
            VP_norm = np.sqrt(VP_norm)
            VN_norm = np.sum(np.square(VN), 0)
            VN_norm = np.sqrt(VN_norm)
            if self.globalm > self.globaln:
                UP_norm, UN_norm = UP_norm / self.p, UN_norm / self.p
            mp = np.sqrt(UP_norm * VP_norm * S)
            mn = np.sqrt(UN_norm * VN_norm * S)
            W = np.where(mp > mn, mp * UP / (UP_norm + self.eps),
                         mn * UN / (UN_norm + self.eps))
            H = np.where(mp > mn, mp * VP / (VP_norm + self.eps),
                         mn * VN / (VN_norm + self.eps)).T
        if verbose == 1:
            recon_err_nnsvd = self.rel_error(W, np.eye(self.k), H)
            if self.rank == 0:
                print('Reconstruction error for nnSVD is :', recon_err_nnsvd)
        if verbose == 1:
            return self.normalize_by_W(W, H, self.grid_comm), \
                   {'recon_err_svd': recon_err_svd, 'recon_err_nnsvd': recon_err_nnsvd}
        else:
            return self.normalize_by_W(W, H, self.grid_comm)
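

# A minimal usage sketch, commented out since it needs an MPI launch. The
# attribute names on `args` follow the class docstring above; `comm` stands in
# for whatever pyDNMFk wrapper exposes .rank, .size, .cartesian2d and .coord2d,
# and `A_local` is this rank's block of the 1D-distributed data, both of which
# are assumptions for illustration, not part of this module:
#
#   from argparse import Namespace
#   args = Namespace(m=1000, n=8, k=4, p_r=comm.size, p_c=1,
#                    seed=42, eps=np.finfo(float).eps, comm=comm)
#   dsvd = DistSVD(args, A_local)
#   sigma, U, V = dsvd.svd()                      # rank-k SVD factors
#   (W, H), errs = dsvd.nnsvd(flag=1, verbose=1)  # non-negative factors + errors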