# Source code for TELF.factorization.decompositions.tri_nmf_fro_mu


# %%
from .utilities.generic_utils import get_np, get_scipy, update_opts
from .utilities.math_utils import fro_norm
from tqdm import tqdm
from .utilities.concensus_matrix import compute_connectivity_mat


def H_update(X, W, H, opts=None, use_gpu=True, mask=None, alpha_h=1.0):
    r"""
    Multiplicative update algorithm for the right factor, :math:`H`, in a
    nonnegative optimization with Frobenius norm loss function.

    .. math::
        \underset{H}{\operatorname{minimize}} &=
            \frac{1}{2} \Vert X - W H \Vert_F^2 \\
        \text{subject to} & \quad H \geq 0

    Args:
        X (ndarray, sparse matrix): Nonnegative m by n matrix to decompose.
        W (ndarray): Nonnegative m by k left factor of X.
        H (ndarray): Nonnegative k by n initialization of right factor of X.
        opts (dict), optional: Dictionary of optional arguments.
            'hist' (list): list to append the objective function to.
            'niter' (int): number of iterations.
        use_gpu (bool): flag forwarded to the backend selection in W_update.
        mask (ndarray), optional: boolean mask of missing (NaN) entries in X.
        alpha_h (float): regularization weight forwarded as alpha_w.

    Returns:
        H (ndarray): Nonnegative k by n right factor of X.
    """
    # The H-update is exactly the W-update of the transposed problem,
    # X^T ~= H^T W^T, so delegate and transpose the result back.
    transposed_mask = None if mask is None else mask.T
    return W_update(
        X.T, H.T, W.T,
        opts=opts,
        use_gpu=use_gpu,
        mask=transposed_mask,
        alpha_w=alpha_h,
    ).T
def W_update(X, W, H, opts=None, use_gpu=True, mask=None, alpha_w=1.0):
    r"""
    Multiplicative update algorithm for the left factor, :math:`W`, in a
    tri-nonnegative optimization with Frobenius norm loss function.

    .. math::
        \underset{W}{\operatorname{minimize}} &=
            \frac{1}{2} \Vert X - W H \Vert_F^2 \\
        \text{subject to} & \quad W \geq 0

    Args:
        X (ndarray, sparse matrix): Nonnegative m by n matrix to decompose.
        W (ndarray): Nonnegative m by k initialization of left factor of X.
        H (ndarray): Nonnegative k by n right factor of X.
        opts (dict), optional: Dictionary of optional arguments.
            'hist' (list): list to append the objective function to.
            'niter' (int): number of iterations.
        use_gpu (bool): select numpy/scipy vs. their GPU counterparts.
        mask (ndarray), optional: boolean mask of missing (NaN) entries in X;
            masked entries are imputed with the current reconstruction.
        alpha_w (float): weight of the orthogonality regularization term.

    Returns:
        W (ndarray): Nonnegative m by k left factor of X.
    """
    default_opts = {"niter": 1000, "hist": None}
    opts = update_opts(default_opts, opts)
    np = get_np(X, use_gpu=use_gpu)
    scipy = get_scipy(X, use_gpu=use_gpu)
    dtype = X.dtype

    # Machine epsilon guards against division by zero and keeps W positive.
    if np.issubdtype(dtype, np.integer):
        eps = np.finfo(float).eps
    elif np.issubdtype(dtype, np.floating):
        eps = np.finfo(dtype).eps
    else:
        raise Exception("Unknown data type!")

    # Deal with NaNs in data: zero them first, impute inside the loop.
    if mask is not None:
        X[mask] = 0

    W = np.maximum(W.astype(dtype), eps)
    H = H.astype(dtype)
    HHT = H @ H.T

    def _compute_XHT(X):
        # Compute X @ H.T. For sparse X, force csr and set the
        # canonical-format flag: bug in cupy's flag handling,
        # https://github.com/cupy/cupy/issues/2365 (closed but not fixed).
        if scipy.sparse.issparse(X):
            X = X.tocsr()
            X._has_canonical_format = True
            return X.dot(H.T)
        return X @ H.T

    def _mu_step(W, XHT):
        # Standard multiplicative update with orthogonality regularization:
        #   W <- W * (X H^T + 2a W) / (W H H^T + 2a W W^T W)
        # BUG FIX: the previous in-place `W /= den; W *= num` evaluated the
        # 2*alpha_w*W numerator term at the already-divided W; numerator and
        # denominator must both use the current iterate.
        num = XHT + 2.0 * alpha_w * W
        den = W @ HHT + 2.0 * alpha_w * (W @ W.T @ W) + eps
        return W * num / den

    if mask is not None:
        for i in range(opts["niter"]):
            # X changes every iteration (imputation), so recompute X H^T.
            XHT = _compute_XHT(X)
            W = _mu_step(W, XHT)
            if (i + 1) % 10 == 0:
                W = np.maximum(W, eps)  # periodically clip away from zero
            Xhat = W @ H
            X[mask] = Xhat[mask]  # update NaN spots with current estimate
            if opts["hist"] is not None:
                opts["hist"].append(fro_norm(X - W @ H))
    else:
        # X is constant here: compute X H^T once, outside the loop.
        XHT = _compute_XHT(X)
        for i in range(opts["niter"]):
            W = _mu_step(W, XHT)
            if (i + 1) % 10 == 0:
                W = np.maximum(W, eps)
            if opts["hist"] is not None:
                opts["hist"].append(fro_norm(X - W @ H))

    return W
def S_update(X, W, S, H, opts=None, use_gpu=True):
    r"""
    Multiplicative update algorithm for the middle factor, :math:`S`, in a
    nonnegative tri-factorization with Frobenius norm loss function.

    .. math::
        \underset{S}{\operatorname{minimize}} &=
            \frac{1}{2} \Vert X - W S H \Vert_F^2 \\
        \text{subject to} & \quad S \geq 0

    (The previous docstring documented an unrelated function with
    parameters ``A``/``R`` and a symmetric model; it is corrected here
    to match the actual signature and model.)

    Args:
        X (ndarray, sparse matrix): Nonnegative m by n matrix to decompose.
        W (ndarray): Nonnegative m by kw left factor of X.
        S (ndarray): Nonnegative kw by kh initialization of the middle factor.
        H (ndarray): Nonnegative kh by n right factor of X.
        opts (dict), optional: Dictionary of optional arguments.
            'hist' (list): list to append the objective function to.
            'niter' (int): number of iterations.
        use_gpu (bool): select numpy/scipy vs. their GPU counterparts.

    Returns:
        S (ndarray): Nonnegative kw by kh middle factor of X.
    """
    np = get_np(X, use_gpu=use_gpu)
    scipy = get_scipy(X, use_gpu=use_gpu)
    dtype = X.dtype

    # Machine epsilon guards against division by zero and keeps S positive.
    if np.issubdtype(dtype, np.integer):
        eps = np.finfo(float).eps
    elif np.issubdtype(dtype, np.floating):
        eps = np.finfo(dtype).eps
    else:
        raise Exception("Unknown data type!")

    default_opts = {"niter": 1000, "hist": None}
    opts = update_opts(default_opts, opts)

    W = W.astype(dtype)
    S = S.astype(dtype)
    H = H.astype(dtype)
    if scipy.sparse.issparse(X):
        # Workaround for cupy canonical-format flag bug (see W_update).
        X = X.astype(dtype).tocsr()
        X._has_canonical_format = True
    else:
        X = X.astype(dtype)

    # All three Gram-style products are loop-invariant: hoist them.
    WTXHT = W.T.dot(X.dot(H.T))
    WTW = W.T.dot(W)
    HHT = H.dot(H.T)

    for i in range(opts["niter"]):
        # Multiplicative update: S <- S * (W^T X H^T) / (W^T W S H H^T).
        S /= np.maximum(WTW @ S @ HHT, eps)
        S *= WTXHT
        if (i + 1) % 10 == 0:
            S = np.maximum(S, eps)  # periodically clip away from zero
        if opts["hist"] is not None:
            opts["hist"].append(fro_norm(X - W @ S @ H))

    return S
def trinmf(X, W, S, H,
           niter=1000, hist=None,
           W_opts=None,
           H_opts=None,
           use_gpu=True,
           nmf_verbose=True,
           mask=None,
           use_consensus_stopping=0,
           alpha=(1.0, 1.0)):
    r"""
    Multiplicative update algorithm for a nonnegative tri-factorization
    with Frobenius norm loss function and orthogonality constraints,

    .. math::
        X \approx W S H.

    Args:
        X (ndarray, sparse matrix): Nonnegative m by n matrix to decompose.
        W (ndarray): Nonnegative m by kw initialization of left factor of X.
        S (ndarray): Nonnegative kw by kh initialization of the middle factor of X.
        H (ndarray): Nonnegative kh by n initialization of right factor of X.
        niter (int): number of outer iterations.
        hist (list), optional: list to append the objective function to.
        W_opts (dict), optional: options dictionary for :meth:`W_update`;
            defaults to ``{"niter": 1, "hist": None}``.
        H_opts (dict), optional: options dictionary for :meth:`H_update`;
            defaults to ``{"niter": 1, "hist": None}``.
        use_gpu (bool): select numpy/scipy vs. their GPU counterparts.
        nmf_verbose (bool): show a tqdm progress bar when True.
        mask (ndarray), optional: boolean mask of missing (NaN) entries in X.
        use_consensus_stopping (int): if > 0, stop after this many consecutive
            iterations with an unchanged connectivity matrix of H.
        alpha (sequence of 2 floats): regularization weights (alpha_w, alpha_h).

    Returns:
        W (ndarray): Nonnegative m by kw left factor of X.
        S (ndarray): Nonnegative kw by kh middle factor of X.
        H (ndarray): Nonnegative kh by n right factor of X.
    """
    # BUG FIX: the defaults were mutable dicts shared across calls
    # (a classic Python pitfall); build them fresh per call instead.
    if W_opts is None:
        W_opts = {"niter": 1, "hist": None}
    if H_opts is None:
        H_opts = {"niter": 1, "hist": None}

    np = get_np(X, use_gpu=use_gpu)
    scipy = get_scipy(X, use_gpu=use_gpu)
    dtype = X.dtype

    # NaNs currently only work with numpy: zero them first, impute below.
    if mask is not None:
        X[mask] = 0

    # Machine epsilon keeps all factors strictly positive.
    if np.issubdtype(dtype, np.integer):
        eps = np.finfo(float).eps
    elif np.issubdtype(dtype, np.floating):
        eps = np.finfo(dtype).eps
    else:
        raise Exception("Unknown data type!")

    W = np.maximum(W.astype(dtype), eps)
    S = np.maximum(S.astype(dtype), eps)
    H = np.maximum(H.astype(dtype), eps)

    if scipy.sparse.issparse(X):
        # Pre-convert once: csc favors the H-update (X.T access pattern),
        # csr favors the W- and S-updates. `.T.T` forces fresh objects.
        Xcsc = X.T.T.tocsc()
        Xcsr = X.T.T.tocsr()

    if use_consensus_stopping > 0:
        conmatold = 0
        conmat = 0
        inc = 0

    for i in tqdm(range(niter), disable=not nmf_verbose):
        if scipy.sparse.issparse(X):
            H = H_update(Xcsc, W @ S, H, H_opts, alpha_h=alpha[1], use_gpu=use_gpu)
            W = W_update(Xcsr, W, S @ H, W_opts, alpha_w=alpha[0], use_gpu=use_gpu)
            S = S_update(Xcsr, W, S, H, use_gpu=use_gpu)
        else:
            H = H_update(X, W @ S, H, H_opts, alpha_h=alpha[1], use_gpu=use_gpu)
            W = W_update(X, W, S @ H, W_opts, alpha_w=alpha[0], use_gpu=use_gpu)
            S = S_update(X, W, S, H, use_gpu=use_gpu)

        if i % 10 == 0:
            # Periodically clip all factors away from zero.
            H = np.maximum(H.astype(dtype), eps)
            W = np.maximum(W.astype(dtype), eps)
            S = np.maximum(S.astype(dtype), eps)

        if hist is not None:
            hist.append(fro_norm(X - W @ S @ H))

        if mask is not None:
            # BUG FIX: the reconstruction of the tri-factorization is
            # W @ S @ H; the previous `W @ H` dropped the middle factor
            # (a shape mismatch whenever kw != kh).
            Xhat = W @ S @ H
            X[mask] = Xhat[mask]  # update NaN spots

        # Consensus stopping: count consecutive iterations with an
        # unchanged connectivity matrix of H.
        if use_consensus_stopping > 0:
            conmat = compute_connectivity_mat(H)
            if np.sum(conmat != conmatold) == 0:
                inc += 1
                if inc >= use_consensus_stopping:
                    break
            else:
                conmatold = conmat

    # Normalize W (columns) and H (rows) to sum to one along the mode axis;
    # the removed scale factors are absorbed into S.
    Wsum = np.sum(W, 0, keepdims=True)
    Hsum = np.sum(H, 1, keepdims=True)
    H = H / Hsum
    W = W / Wsum
    S = Wsum.T * S * Hsum.T

    return (W, S, H)