Source code for TELF.factorization.decompositions.nmf_mc_fro_mu

# %%
from .utilities.generic_utils import get_np, get_scipy, update_opts
from .utilities.math_utils import prune, unprune, fro_norm
import numpy as np


[docs] def H_update_ADMM(X, W, H, opts=None, dual=None, use_gpu=True): """ ADMM algorithm for the right factor in a nonnegative optimization with Frobenius norm loss function. ||X-WH||_F^2 Args: X (ndarray, sparse matrix): Nonnegative m by n matrix to decompose. W (ndarray): Nonnegative m by k left factor of X. H (ndarray): Nonnegative k by n initialization of right factor of X. opts (dict), optional: Dictionary or optional arguments. 'hist' (list): list to append the objective function to. 'niter' (int): number of iterations. 'rho' (double): convergence parameter. Returns: H (ndarray): Nonnegative k by n right factor of X. """ if dual is not None: dual = dual.T return W_update_ADMM(X.T, H.T, W.T, opts=opts, dual=dual, use_gpu=use_gpu).T
[docs] def W_update_ADMM(X, W, H, opts=None, dual=None, use_gpu=True): """ ADMM algorithm for the left factor in a nonnegative optimization with Frobenius norm loss function. ||X-WH||_F^2 Args: X (ndarray, sparse matrix): Nonnegative m by n matrix to decompose. W (ndarray): Nonnegative m by k initialization of left factor of X. H (ndarray): Nonnegative k by n right factor of X. opts (dict), optional: Dictionary or optional arguments. 'hist' (list): list to append the objective function to. 'niter' (int): number of iterations. 'rho' (double): convergence parameter. Returns: W (ndarray): Nonnegative m by k left factor of X. """ default_opts = {"niter": 1000, "hist": None, "rho": 1.0} opts = update_opts(default_opts, opts) np = get_np(X, use_gpu=use_gpu) scipy = get_scipy(X, use_gpu=use_gpu) dtype = X.dtype if np.issubdtype(dtype, np.integer): eps = np.finfo(float).eps elif np.issubdtype(dtype, np.floating): eps = np.finfo(dtype).eps else: raise Exception("Unknown data type!") W = W.astype(dtype) if dual is None: dual = np.zeros_like(W) H = H.astype(dtype) HHT_rhoI = scipy.linalg.lu_factor( H @ H.T + opts["rho"] * np.identity(H.shape[0], dtype=dtype), overwrite_a=True, check_finite=False, ) if scipy.sparse.issparse(X): # bug in setting has_canonical_format flag in cupy # https://github.com/cupy/cupy/issues/2365 # issue is closed, but still not fixed. X = X.tocsr() X._has_canonical_format = True XHT = np.array(X.dot(H.T)) else: XHT = X @ H.T for i in range(opts["niter"]): primal = scipy.linalg.lu_solve( HHT_rhoI, (XHT + opts["rho"] * (W - dual)).T, overwrite_b=True, check_finite=False, ).T W = np.maximum(primal + dual, 0) # ! change this line for W constraint W = np.minimum(primal + dual, 1) dual += primal - W if opts["hist"] is not None: opts["hist"].append(fro_norm(W)) return W
[docs] def H_update_MU(X, W, H, opts=None, use_gpu=True): """ Multiplicative update algorithm for the right factor in a nonnegative optimization with Frobenius norm loss function. ||X-WH||_F^2 Args: X (ndarray, sparse matrix): Nonnegative m by n matrix to decompose. W (ndarray): Nonnegative m by k left factor of X. H (ndarray): Nonnegative k by n initialization of right factor of X. opts (dict), optional: Dictionary or optional arguments. 'hist' (list): list to append the objective function to. 'niter' (int): number of iterations. Returns: H (ndarray): Nonnegative k by n right factor of X. """ return W_update_MU(X.T, H.T, W.T, opts=opts, use_gpu=use_gpu).T
[docs] def W_update_MU(X, W, H, opts=None, use_gpu=True): """ Multiplicative update algorithm for the left factor in a nonnegative optimization with Frobenius norm loss function. ||X-WH||_F^2 Args: X (ndarray, sparse matrix): Nonnegative m by n matrix to decompose. W (ndarray): Nonnegative m by k initialization of left factor of X. H (ndarray): Nonnegative k by n right factor of X. opts (dict), optional: Dictionary or optional arguments. 'hist' (list): list to append the objective function to. 'niter' (int): number of iterations. Returns: W (ndarray): Nonnegative m by k left factor of X. """ default_opts = {"niter": 1000, "hist": None, "alpha": 0} opts = update_opts(default_opts, opts) alpha = opts[ "alpha" ] #! this is the regularization parameter (lambda in derivation) np = get_np(X, use_gpu=use_gpu) scipy = get_scipy(X, use_gpu=use_gpu) dtype = X.dtype if np.issubdtype(dtype, np.integer): eps = np.finfo(float).eps elif np.issubdtype(dtype, np.floating): eps = np.finfo(dtype).eps else: raise Exception("Unknown data type!") W = np.maximum(W.astype(dtype), eps) H = H.astype(dtype) HHT = H @ H.T if scipy.sparse.issparse(X): # bug in setting has_canonical_format flag in cupy # https://github.com/cupy/cupy/issues/2365 # issue is closed, but still not fixed. X = X.tocsr() X._has_canonical_format = True XHT = X.dot(H.T) else: XHT = X @ H.T for i in range(opts["niter"]): # import pdb; pdb.set_trace() #! stop here for debugging W = ( W * (XHT + 3 * alpha * W ** 2) / (W @ HHT + 2 * alpha * (W ** 3) + alpha * W + 1e-9) ) if (i + 1) % 10 == 0: W = np.maximum(W, eps) if opts["hist"] is not None: opts["hist"].append(W) return W
[docs] def nmf_with_ADMM(X, W, H, lowerthres=1, upperthres=None, opts=None, use_gpu=True): """ Multiplicative update algorithm for NMF with Frobenius norm loss function. ||X-WH||_F^2 Args: X (ndarray, sparse matrix): Nonnegative m by n matrix to decompose. W (ndarray): Nonnegative m by k initialization of left factor of X. H (ndarray): Nonnegative k by n initialization of right factor of X. opts (dict), optional: Dictionary or optional arguments. 'hist' (list): list to append the objective function to. 'niter' (int): number of iterations. 'pruned' (bool): indicator if the input matrix needs to be pruned of zeros. 'W_opts' (dict): options dictionary for W update 'H_opts' (dict): options dictionary for H update Returns: W (ndarray): Nonnegative m by k left factor of X. H (ndarray): Nonnegative k by n right factor of X. """ # *update algorithm if opts["algorithm"] == "MU": W_update = W_update_MU H_update = H_update_MU default_opts = { "niter": 1000, "histX": None, "hist": None, "pruned": False, "algorithm": "MU", "tol": None, "W_opts": {"niter": 1, "hist": None, "alpha": 0}, "H_opts": {"niter": 1, "hist": None, "alpha": 0}, } elif opts["algorithm"] == "ADMM": W_update = W_update_ADMM H_update = H_update_ADMM default_opts = { "niter": 1000, "histX": None, "hist": None, "pruned": False, "algorithm": "ADMM", "tol": None, "W_opts": {"niter": 1, "hist": None, "rho": 1.0}, "H_opts": {"niter": 1, "hist": None, "rho": 1.0}, } # print('Using {}'.format(opts['algorithm'])) opts = update_opts(default_opts, opts) np = get_np(X, use_gpu=use_gpu) scipy = get_scipy(X, use_gpu=use_gpu) dtype = X.dtype if np.issubdtype(dtype, np.integer): eps = np.finfo(float).eps elif np.issubdtype(dtype, np.floating): eps = np.finfo(dtype).eps else: raise Exception("Unknown data type!") W = np.maximum(W.astype(dtype), eps) H = np.maximum(H.astype(dtype), eps) if upperthres is None: upperthres = W.shape[1] #! X is a boolean matrix dtype = float Mask = X == 1 if opts["pruned"]: X, rows, cols, _ = prune(X, use_gpu=use_gpu) W = W[rows, :] H = H[:, cols] if scipy.sparse.issparse(X): Xcsc = X.T.T.tocsc() Xcsr = X.T.T.tocsr() else: pass YErr = YErrold = YErrchange = np.nan for i in range(opts["niter"]): if scipy.sparse.issparse(X): H = H_update(Xcsc, W, H, opts["H_opts"]) W = W_update(Xcsr, W, H, opts["W_opts"]) else: H = H_update(X, W, H, opts["H_opts"]) W = W_update(X, W, H, opts["W_opts"]) #! update X step # * update rule: only update X at ij location if (WH)_{ij}>1 and X_{ij} = 1 # Xhat = W@H if upperthres is not None: X = np.where(Mask, np.clip(W @ H, lowerthres, upperthres), 0) else: X = np.where(Mask, np.maximum(lowerthres, W @ H), 0) YErrold = YErr YErr = fro_norm(X - W @ H) / fro_norm(X) YErrchange = np.abs(YErr - YErrold) if opts["hist"] is not None: opts["hist"].append(YErr) if opts["histX"] is not None: # opts['histX'].append(fro_norm(Mask.astype(float) - W@H)) opts["histX"].append(W @ H) if opts["tol"] is not None: if np.abs(YErr - YErrold) < opts["tol"]: print("converge in {} iterations -- {}".format(i + 1, YErrchange)) break # * Normalize W # Wsum = np.sum(W,0,keepdims=True) # H = H * Wsum.T # W = W / Wsum if opts["pruned"]: W = unprune(W, rows, 0) H = unprune(H, cols, 1) # print('id of X = {}'.format(id(X))) # print('id of Mask = {}'.format(id(Mask))) return (W, H, X)
[docs] def old_nmf(X, W, H, lowerthres=1, upperthres=None, opts=None): """ Multiplicative update algorithm for NMF with Frobenius norm loss function. ||X-WH||_F^2 Args: X (ndarray, sparse matrix): Nonnegative m by n matrix to decompose. W (ndarray): Nonnegative m by k initialization of left factor of X. H (ndarray): Nonnegative k by n initialization of right factor of X. opts (dict), optional: Dictionary or optional arguments. 'hist' (list): list to append the objective function to. 'niter' (int): number of iterations. 'pruned' (bool): indicator if the input matrix needs to be pruned of zeros. 'W_opts' (dict): options dictionary for W update 'H_opts' (dict): options dictionary for H update Returns: W (ndarray): Nonnegative m by k left factor of X. H (ndarray): Nonnegative k by n right factor of X. """ # *update algorithm default_opts = { "niter": 1000, "histX": None, "hist": None, "pruned": False, "tol": None, "W_opts": {"niter": 1, "hist": None, "alpha": 0}, "H_opts": {"niter": 1, "hist": None, "alpha": 0}, } opts = update_opts(default_opts, opts) np = get_np(X, use_gpu=use_gpu) scipy = get_scipy(X, use_gpu=use_gpu) dtype = X.dtype if np.issubdtype(dtype, np.integer): eps = np.finfo(float).eps elif np.issubdtype(dtype, np.floating): eps = np.finfo(dtype).eps else: raise Exception("Unknown data type!") W = np.maximum(W.astype(dtype), eps) H = np.maximum(H.astype(dtype), eps) k = W.shape[1] #! X is a boolean matrix dtype = float Mask = X == 1 if opts["pruned"]: X, rows, cols = prune(X, use_gpu=use_gpu) W = W[rows, :] H = H[:, cols] if scipy.sparse.issparse(X): Xcsc = X.T.T.tocsc() Xcsr = X.T.T.tocsr() else: pass YErr = YErrold = YErrchange = np.nan for i in range(opts["niter"]): #! set adaptive alpha here if scipy.sparse.issparse(X): H = H_update_MU(Xcsc, W, H, opts["H_opts"]) W = W_update_MU(Xcsr, W, H, opts["W_opts"]) else: H = H_update_MU(X, W, H, opts["H_opts"]) W = W_update_MU(X, W, H, opts["W_opts"]) #! update X step # * update rule: only update X at ij location if (WH)_{ij}>1 and X_{ij} = 1 # Xhat = W@H if upperthres is not None: X = np.where(Mask, np.clip(W @ H, lowerthres, upperthres), 0) else: X = np.where(Mask, np.maximum(lowerthres, W @ H), 0) if opts["hist"] is not None: opts["hist"].append(fro_norm(X - W @ H)) if opts["histX"] is not None: # opts['histX'].append(fro_norm(Mask.astype(float) - W@H)) opts["histX"].append(W @ H) if opts["tol"] is not None: YErrold = YErr YErr = fro_norm(X - W @ H) / fro_norm(X) YErrchange = np.abs(YErr - YErrold) if np.abs(YErr - YErrold) < opts["tol"]: print("converge in {} iterations -- {}".format(i + 1, YErrchange)) break if opts["pruned"]: W = unprune(W, rows, 0) H = unprune(H, cols, 1) return (W, H, X)
[docs] def nmf(X, W, H, lowerthres=1, upperthres=None, opts=None, Mask=None): """ penalized nmf with adaptive alpha """ default_opts = { "niter": 1000, "histX": None, "hist": None, "pruned": True, "tol": None, "constrainttol": None, "W_opts": {"niter": 1, "hist": None, "alpha": 0.0}, "H_opts": {"niter": 1, "hist": None, "alpha": 0.0}, } opts = update_opts(default_opts, opts) np = get_np(X, use_gpu=use_gpu) scipy = get_scipy(X, use_gpu=use_gpu) dtype = X.dtype if np.issubdtype(dtype, np.integer): eps = np.finfo(float).eps elif np.issubdtype(dtype, np.floating): eps = np.finfo(dtype).eps else: raise Exception("Unknown data type!") W = np.maximum(W.astype(dtype), eps) H = np.maximum(H.astype(dtype), eps) #! X is a boolean matrix dtype = float if Mask is None: Mask = X == 1 if opts["pruned"]: X, rows, cols = prune(X, use_gpu=use_gpu) W = W[rows, :] H = H[:, cols] if scipy.sparse.issparse(X): Xcsc = X.T.T.tocsc() Xcsr = X.T.T.tocsr() else: pass YErr = YErrold = YErrchange = np.nan for i in range(opts["niter"]): #! adaptive alpha if opts["constrainttol"] is not None: opts["H_opts"]["alpha"] = np.logspace(-4, 10, opts["niter"])[i] opts["W_opts"]["alpha"] = np.logspace(-4, 10, opts["niter"])[i] if scipy.sparse.issparse(X): H = H_update_MU(Xcsc, W, H, opts["H_opts"]) W = W_update_MU(Xcsr, W, H, opts["W_opts"]) else: H = H_update_MU(X, W, H, opts["H_opts"]) W = W_update_MU(X, W, H, opts["W_opts"]) #! update X step # * update rule: only update X at ij location if (WH)_{ij}>1 and X_{ij} = 1 # Xhat = W@H if upperthres is not None: X = np.where(Mask, np.clip(W @ H, lowerthres, upperthres), 0) else: X = np.where(Mask, np.maximum(lowerthres, W @ H), 0) #!check for constraint tolerance if opts["constrainttol"] is not None: if (fro_norm(H ** 2 - H) + fro_norm(W ** 2 - W)) < opts["constrainttol"]: print("factors converge in {} iterations".format(i + 1)) break if opts["hist"] is not None: opts["hist"].append(fro_norm(Mask.astype(float) - W @ H)) if opts["histX"] is not None: # opts['histX'].append(fro_norm(Mask.astype(float) - W@H)) opts["histX"].append(fro_norm(X - W @ H) / fro_norm(X)) if opts["tol"] is not None: YErrold = YErr YErr = fro_norm(X - W @ H) / fro_norm(X) YErrchange = np.abs(YErr - YErrold) if np.abs(YErr - YErrold) < opts["tol"]: print("converge in {} iterations -- {}".format(i + 1, YErrchange)) break # * Normalize W # Wsum = np.sum(W,0,keepdims=True) # H = H * Wsum.T # W = W / Wsum if opts["pruned"]: W = unprune(W, rows, 0) H = unprune(H, cols, 1) return (W, H, X)
[docs] def thres_norm(W, H): # try to normalize BOTH W,H by dividing to column minimum Wsum = np.max(W, axis=0) Hsum = np.max(H, axis=1) # Wsum = np.max(W) # Hsum = np.max(H) W = W / Wsum H = (H.T / Hsum).T D = np.diag(Wsum * Hsum) W = W @ np.sqrt(D) H = np.sqrt(D) @ H return W, H
[docs] def old_find_thres_WH(X, Wn, Hn, method="grid_search", output="best", npoint=100): # Wn, Hn are normalized using thres_norm function if method == "W@H": # npoint = 100 k = Wn.shape[1] y = X.reshape(1, -1) x = (Wn @ Hn).reshape(1, -1) a = np.max(x[y == 0]) b = np.min(x[y == 1]) L = np.linspace(a, b, npoint) err = np.zeros((1, len(L))) # *brute force way for i in range(npoint): lambdaWH = L[i] lambdaW = lambdaWH / np.sqrt(k) lambdaH = lambdaWH / np.sqrt(k) Xhat = np.dot(Wn > lambdaW, Hn > lambdaH) err[0, i] = np.sum(X.astype(bool) ^ Xhat) I = np.argmin(err) return L[I] / np.sqrt(k), err[0, I] # optimal threshold for W and H and error elif method == "grid_search": k = Wn.shape[1] Lw = np.linspace(np.min(Wn), np.max(Wn), npoint) Lh = np.linspace(np.min(Hn), np.max(Hn), npoint) # Lw = np.unique(np.concatenate(Wn.reshape(-1,1))) # Lh = np.unique(np.concatenate(Hn.reshape(-1,1))) Wpoint = len(Lw) Hpoint = len(Lh) err = np.zeros((len(Lw), len(Lh))) for i in range(Wpoint): lambdaW = Lw[i] for j in range(Hpoint): lambdaH = Lh[j] Xhat = np.dot(Wn >= lambdaW, Hn >= lambdaH) err[i, j] = np.sum(X.astype(bool) ^ Xhat) [iw, ih] = np.unravel_index(err.argmin(), err.shape) if output == "best": return np.min(err), [Lw[iw], Lh[ih]] elif output == "grid": return err, Lw, Lh elif method == "WH": Lthres = np.unique(np.concatenate((Wn.reshape(-1, 1), Hn.reshape(-1, 1)))) Lthres.sort() err = np.zeros((1, len(Lthres))) # *brute force way for i in range(len(Lthres)): t = Lthres[i] Xhat = np.dot(Wn > t, Hn > t) err[0, i] = np.sum(X.astype(bool) ^ Xhat) I = np.argmin(err) return err[0, I], Lthres[I]
[docs] def find_thres_WH(X, Wn, Hn, output="best", npoint=None): # Wn, Hn are normalized using thres_norm function np = get_np(X) if npoint is None: Lw = np.unique(np.concatenate(Wn.reshape(-1, 1))) Lh = np.unique(np.concatenate(Hn.reshape(-1, 1))) else: Lw = np.linspace(np.min(Wn), np.max(Wn), npoint) Lh = np.linspace(np.min(Hn), np.max(Hn), npoint) Wpoint = len(Lw) Hpoint = len(Lh) err = np.zeros((len(Lw), len(Lh))) for i in range(Wpoint): lambdaW = Lw[i] for j in range(Hpoint): lambdaH = Lh[j] Xhat = np.dot(Wn >= lambdaW, Hn >= lambdaH) err[i, j] = np.mean(X.astype(bool) ^ Xhat) if output == "best": [iw, ih] = np.unravel_index(err.argmin(), err.shape) return {"error":err[iw, ih], "Lw":Lw[iw], "Lh":Lh[ih]} elif output == "grid": return {"error":err, "Lw":Lw, "Lh":Lh}
[docs] def coord_desc_thresh(X, W, H, wt=None, ht=None, max_iter=100): np = get_np(X) k = W.shape[1] if wt == None: wt = np.array([0.5 for _ in range(k)]) if ht == None: ht = np.array([0.5 for _ in range(k)]) Wrange = [np.unique(W[:, i]) for i in range(k)] Hrange = [np.unique(H[i, :]) for i in range(k)] err = 1 for j in range(max_iter): old_err = err for i in range(k): np.random.shuffle(Wrange[i]) wrange = Wrange[i] Wt = W >= wt[None, :] Ht = H >= ht[:, None] cache = ( Wt[:, [j for j in range(k) if j != i]]
[docs] @ Ht[[j for j in range(k) if j != i], :] ) this_component = (W[None, :, i] >= wrange[:, None])[:, :, None] @ Ht[[i], :] Ys = np.logical_or(cache, this_component) errors = np.sum(np.logical_xor(X, Ys), axis=(1, 2)) / X.size wt[i] = wrange[np.argmin(errors)] # * update H np.random.shuffle(Hrange[i]) hrange = Hrange[i] Ht = H >= ht[:, None] Wt = W >= wt[None, :] cache = ( Wt[:, [j for j in range(k) if j != i]] @ Ht[[j for j in range(k) if j != i], :] ) this_component = Wt[:, [i]] @ (H[None, i, :] >= hrange[:, None])[:, None, :] Ys = np.logical_or(cache, this_component) errors = np.sum(np.logical_xor(X, Ys), axis=(1, 2)) / X.size ht[i] = hrange[np.argmin(errors)] # * error after one sweep err = np.min(errors) # * stopping criteria if err == old_err: # print('converged in num iter = {}, err = {}'.format(j, np.min(errors))) pass return err, wt, ht
def coord_desc_thresh_onefactor(X, W, H, max_iter=100): # * only threshold W given H. Use transpose to threshold H np = get_np(X) k = W.shape[1] wt = np.array([0.5 for _ in range(k)]) Wrange = [np.unique(W[:, i]) for i in range(k)] err = 1 for _ in range(max_iter): old_err = err for i in range(k): np.random.shuffle(Wrange[i]) wrange = Wrange[i] Wt = W >= wt[None, :] Ht = H.astype(bool) cache = ( Wt[:, [j for j in range(k) if j != i]]
[docs] @ Ht[[j for j in range(k) if j != i], :] ) this_component = (W[None, :, i] >= wrange[:, None])[:, :, None] @ Ht[[i], :] Ys = np.logical_or(cache, this_component) errors = np.sum(np.logical_xor(X, Ys), axis=(1, 2)) / X.size wt[i] = wrange[np.argmin(errors)] # * error after one sweep err = np.min(errors) return err, wt
def roc_W_H(X, W, H): Lthres = np.unique(np.concatenate((W.reshape(-1, 1), H.reshape(-1, 1)))) Lthres.sort() fpr = np.zeros((len(Lthres), 1)) tpr = np.zeros((len(Lthres), 1)) for j in range(len(Lthres)): t = Lthres[j] Xhat = np.dot(W > t, H > t) Xhat = Xhat.astype(float) # * compute TP, TN, FP, FN tp = np.sum((X + Xhat) == 2) tn = np.sum((X + Xhat) == 0) fp = np.sum((X - Xhat) == -1) fn = np.sum((X - Xhat) == 1) fpr[j] = fp / (fp + tn) tpr[j] = tp / (tp + fn) return fpr, tpr, Lthres