"""
job-schedule - 200
job-complete - 300
signal-exit - 400
"""
from .NMFk import NMFk
from pathlib import Path
import numpy as np
import uuid
import os
import time
import sys
import pickle
import warnings
from .utilities.hpc_comm_helpers import signal_workers_exit, worker_check_exit_status, get_next_job_at_worker, collect_results_from_workers, send_job_to_worker_nodes
try:
    from mpi4py import MPI
except ImportError:
    MPI = None
class OnlineNode:
    """Lazily loads a pickled ``Node`` object from disk, optionally caching it in memory."""

def __init__(self,
node_path:str,
parent_node:object,
child_nodes:list,
) -> None:
self.node_path = node_path
self.parent_node = parent_node
self.child_nodes = child_nodes
self.node_data = None
def __call__(self, persistent=False):
if persistent:
if self.node_data is None:
self.node_data = pickle.load(open(self.node_path, "rb"))
return self.node_data
else:
return pickle.load(open(self.node_path, "rb"))
class Node:
    """Container for the data stored at a single node of the HNMFk hierarchy."""

def __init__(self,
node_name: str,
depth: int,
parent_topic: int,
W: np.ndarray, H: np.ndarray, k: int,
parent_node_name: str,
child_node_names: list,
original_indices: np.ndarray,
num_samples:int,
leaf:bool,
user_node_data:dict
):
self.node_name = node_name
self.depth = depth
self.W = W
self.H = H
self.k = k
self.parent_topic = parent_topic
self.parent_node_name = parent_node_name
self.child_node_names = child_node_names
self.original_indices = original_indices
self.num_samples = num_samples
self.leaf = leaf
self.user_node_data = user_node_data
class HNMFk:
def __init__(self,
nmfk_params=[{}],
cluster_on="H",
depth=1,
sample_thresh=-1,
Ks_deep_min=1,
Ks_deep_max=None,
Ks_deep_step=1,
K2=False,
experiment_name="HNMFk_Output",
generate_X_callback=None,
n_nodes=1,
verbose=True,
comm_buff_size=10000000,
):
"""
HNMFk is a Hierarchical Non-negative Matrix Factorization module with the capability to do automatic model determination.
Parameters
----------
nmfk_params : list of dicts, optional
            We can specify NMFk parameters for each depth, or use the same parameters for all depths.\n
            If there is a single item in ``nmfk_params``, HNMFk will use the same NMFk parameters for all depths.\n
            When specifying parameters for each depth, append to the list. For example, [nmfk_params0, nmfk_params1, nmfk_params2] for a depth of 2.
            The default is ``[{}]``, which uses the NMFk defaults together with the required settings ``params["collect_output"] = False``, ``params["save_output"] = True``, and ``params["predict_k"] = True`` when ``K2=False``.
cluster_on : str, optional
            Where to perform clustering, can be W or H. If W, rows of X should be samples, and if H, columns of X should be samples.
The default is "H".
depth : int, optional
            How deep to go in each topic after the root node. If -1, it goes until the samples cannot be separated further. The default is 1.
sample_thresh : int, optional
            Stopping criterion based on the number of samples in the cluster.
When -1, this criteria is not used.
The default is -1.
Ks_deep_min : int, optional
            After the first NMFk, when selecting the Ks search range, the minimum k to start from. The default is 1.
        Ks_deep_max : int, optional
            After the first NMFk, when selecting the Ks search range, the maximum k to try.\n
            When None, the maximum k will be the same as the k selected for the parent node.\n
            The default is None.
        Ks_deep_step : int, optional
            After the first NMFk, when selecting the Ks search range, the k step size. The default is 1.
K2 : bool, optional
If K2=True, decomposition is done only for k=2 instead of finding and predicting the number of stable latent features. The default is False.
experiment_name : str, optional
            Where to save the results. The default is "HNMFk_Output".
generate_X_callback : object, optional
            This can be used to re-generate the data matrix X before each NMFk operation. When not used, a slice of the original X is taken, which is equal to serial decomposition.\n
            The ``generate_X_callback`` object should be a class with ``def __call__(original_indices)`` defined so that ``new_X, save_at_node = generate_X_callback(original_indices)`` can be done. A minimal sketch is given in the Examples section below.\n
            ``original_indices`` is the indices of the samples (columns of the original X when clustering on H).\n
            Here ``save_at_node`` is a dictionary that can be used to save additional information in each node's ``user_node_data`` variable.
The default is None.
        n_nodes : int, optional
            Number of HPC nodes. The default is 1.
        verbose : bool, optional
            If True, it prints progress. The default is True.
        comm_buff_size : int, optional
            Buffer size used when receiving job data and results over MPI. Only used when ``n_nodes > 1``. The default is 10000000.
Returns
-------
None.
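        Examples
        --------
        A minimal usage sketch. The import path, data matrix, and settings below are
        illustrative placeholders rather than values prescribed by the library:

        .. code-block:: python

            import numpy as np
            from TELF.factorization import HNMFk  # adjust the import to your installation

            X = np.random.rand(100, 2000)  # columns are samples when cluster_on="H"
            model = HNMFk(nmfk_params=[{}], cluster_on="H", depth=2,
                          experiment_name="HNMFk_Output")
            model.fit(X, Ks=range(1, 10, 1))

        A ``generate_X_callback`` can be any object whose ``__call__(original_indices)``
        returns ``(new_X, save_at_node)``; a hypothetical sketch:

        .. code-block:: python

            class RegenerateX:
                def __init__(self, full_data):
                    self.full_data = full_data  # original data source to slice from

                def __call__(self, original_indices):
                    new_X = self.full_data[:, original_indices]  # rebuild X for the selected samples
                    save_at_node = {"n_selected": len(original_indices)}  # stored in user_node_data
                    return new_X, save_at_node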
"""
# user defined settings
self.sample_thresh = sample_thresh
self.depth = depth
self.cluster_on = cluster_on
self.Ks_deep_min = Ks_deep_min
self.Ks_deep_max = Ks_deep_max
self.Ks_deep_step = Ks_deep_step
self.K2 = K2
self.experiment_name = experiment_name
self.generate_X_callback = generate_X_callback
self.n_nodes = n_nodes
self.verbose = verbose
self.comm_buff_size = comm_buff_size
organized_nmfk_params = []
for params in nmfk_params:
organized_nmfk_params.append(self._organize_nmfk_params(params))
self.nmfk_params = organized_nmfk_params
# object variables
self.X = None
self.total_exec_seconds = 0
self.num_features = 0
self._all_nodes = []
self.iterator = None
self.root = None
self.root_name = ""
self.node_save_paths = {}
# path to save
self.experiment_save_path = os.path.join(self.experiment_name)
try:
if not Path(self.experiment_save_path).is_dir():
Path(self.experiment_save_path).mkdir(parents=True)
except Exception as e:
print(e)
# HPC job management variables
self.target_jobs = {}
self.node_status = {}
        assert self.cluster_on in ["W", "H"], "Unknown clustering method!"
        assert (self.n_nodes > 1 and MPI is not None) or (self.n_nodes == 1), \
            "n_nodes was greater than 1 but MPI is not installed!"
def fit(self, X, Ks, from_checkpoint=False, save_checkpoint=False):
"""
        Factorize the input matrix ``X`` for each given K value in ``Ks``.
Parameters
----------
X : ``np.ndarray`` or ``scipy.sparse._csr.csr_matrix`` matrix
Input matrix to be factorized.
Ks : list
List of K values to factorize the input matrix.\n
**Example:** ``Ks=range(1, 10, 1)``.
from_checkpoint : bool, optional
If True, it continues the process from the checkpoint. The default is False.
save_checkpoint : bool, optional
If True, it saves checkpoints. The default is False.
        Returns
        -------
        result : dict
            Dictionary containing the total execution time, ``{"time": total_exec_seconds}``.
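        Examples
        --------
        A checkpoint/resume sketch (``X`` is assumed to be a prepared data matrix):

        .. code-block:: python

            model = HNMFk(experiment_name="HNMFk_Output", depth=2)
            model.fit(X, Ks=range(1, 10, 1), save_checkpoint=True)

            # After an interruption, a new instance with the same experiment_name can
            # resume the remaining jobs from the saved checkpoint.p file:
            model = HNMFk(experiment_name="HNMFk_Output", depth=2)
            model.fit(X, Ks=range(1, 10, 1), from_checkpoint=True)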
"""
#
# Job scheduling setup
#
# multi-node processing
if self.n_nodes > 1:
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
# single node processing
else:
comm = None
rank = 0
#
# Checkpointing and job setup
#
if from_checkpoint:
checkpoint_file = self._load_checkpoint_file()
else:
checkpoint_file = {}
if from_checkpoint and len(checkpoint_file) > 0:
if self.verbose:
print("Continuing from checkpoint...")
if rank == 0:
self._load_checkpoint(checkpoint_file)
# setting up for a new job
elif not from_checkpoint or len(checkpoint_file) == 0:
if rank == 0:
if self.cluster_on == "W":
original_indices = np.arange(0, X.shape[0], 1)
elif self.cluster_on == "H":
original_indices = np.arange(0, X.shape[1], 1)
self.root_name = str(uuid.uuid1())
self.target_jobs[self.root_name] = {
"parent_node_name":"None",
"node_name":self.root_name,
"Ks":Ks,
"original_indices":original_indices,
"depth":0,
"parent_topic":None
}
# organize node status
if self.n_nodes > 1:
self.node_status = {}
for ii in range(1, self.n_nodes, 1):
self.node_status[ii] = {"free":True, "job":None}
else:
self.node_status = {}
self.node_status[0] = {"free": True, "job":None}
# save data matrix
self.X = X
if self.cluster_on == "W":
self.num_features = X.shape[1]
elif self.cluster_on == "H":
self.num_features = X.shape[0]
else:
raise Exception("Unknown clustering method!")
# wait for everyone
start_time = time.time()
if self.n_nodes > 1:
comm.Barrier()
#
# Run HNMFk
#
while True:
# check exit status
if len(self.target_jobs) == 0 and rank == 0 and all([info["free"] for _, info in self.node_status.items()]):
if self.n_nodes > 1:
signal_workers_exit(comm, self.n_nodes)
break
#
# worker nodes check exit status
#
if self.n_nodes > 1:
worker_check_exit_status(rank, comm)
#
# send job to worker nodes
#
if rank == 0 and self.n_nodes > 1:
send_job_to_worker_nodes(
comm, self.target_jobs, self.node_status
)
#
            # receive jobs from rank 0 at worker nodes
#
elif rank != 0 and self.n_nodes > 1:
job_data, job_flag = get_next_job_at_worker(
rank, comm, self.comm_buff_size
)
#
# single node job schedule
#
else:
available_jobs = list(self.target_jobs.keys())
next_job = available_jobs.pop(0)
job_data = self.target_jobs[next_job]
job_flag = True
# do the job
if (rank != 0 and job_flag) or (self.n_nodes == 1 and rank == 0):
# process the current node
node_results = self._process_node(**job_data)
# send worker node results to root
if self.n_nodes > 1 and rank != 0:
req = comm.isend(node_results, dest=0, tag=int(f'300{rank}'))
req.wait()
# collect results at root
elif rank == 0 and self.n_nodes > 1:
all_node_results = collect_results_from_workers(
rank, comm, self.n_nodes,
self.node_status, self.comm_buff_size
)
if len(all_node_results) == 0:
continue
# process results for job scheduling
if rank == 0:
if self.n_nodes > 1:
for node_results in all_node_results:
self._process_results(node_results, save_checkpoint=save_checkpoint)
else:
self._process_results(node_results, save_checkpoint=save_checkpoint)
# total execution time
total_exec_seconds = time.time() - start_time
self.total_exec_seconds = total_exec_seconds
# prepare online iterator
self.root = OnlineNode(
node_path=self.node_save_paths[self.root_name],
parent_node=None,
child_nodes=[]
)
self.iterator = self.root
self._prepare_iterator(self.root)
if self.verbose:
print("Done")
return {"time":total_exec_seconds}
def _prepare_iterator(self, node):
node_object = pickle.load(open(node.node_path, "rb"))
child_node_names = node_object.child_node_names
for child_name in child_node_names:
child_node = OnlineNode(
node_path=self.node_save_paths[child_name],
parent_node=node,
child_nodes=[])
node.child_nodes.append(child_node)
self._prepare_iterator(child_node)
return
def _process_node(self, Ks, depth, original_indices, node_name, parent_node_name, parent_topic):
# where to save current node
try:
node_save_path = os.path.join(self.experiment_name, "depth_"+str(depth), node_name)
try:
if not Path(node_save_path).is_dir():
Path(node_save_path).mkdir(parents=True)
except Exception as e:
print(e)
except Exception as e:
print(e)
#
# Create a node object
#
target_jobs = []
current_node = Node(
node_name=node_name,
depth=depth,
parent_topic=parent_topic,
parent_node_name=parent_node_name,
original_indices=original_indices,
num_samples=len(original_indices),
child_node_names=[],
user_node_data={},
leaf=False,
W=None, H=None,
k=None,
)
#
# check if leaf node status based on number of samples
#
if (current_node.num_samples == 1):
current_node.leaf = True
pickle_path = f'{node_save_path}/node_{current_node.node_name}.p'
pickle.dump(current_node, open(pickle_path, "wb"))
return {"name":node_name, "target_jobs":[], "node_save_path":pickle_path}
#
# obtain the current X
#
if self.generate_X_callback is None or current_node.depth == 0:
save_at_node = {}
if self.cluster_on == "W":
curr_X = self.X[current_node.original_indices]
elif self.cluster_on == "H":
curr_X = self.X[:, current_node.original_indices]
else:
curr_X, save_at_node = self.generate_X_callback(current_node.original_indices)
current_node.user_node_data = save_at_node.copy()
#
        # Based on the number of features or samples, no separation is possible
#
if min(curr_X.shape) <= 1:
current_node.leaf = True
pickle_path = f'{node_save_path}/node_{current_node.node_name}.p'
pickle.dump(current_node, open(pickle_path, "wb"))
return {"name":node_name, "target_jobs":[], "node_save_path":pickle_path}
#
# prepare the current nmfk parameters
#
if current_node.depth >= len(self.nmfk_params):
select_params = -1
else:
select_params = current_node.depth
curr_nmfk_params = self.nmfk_params[select_params % len(self.nmfk_params)]
curr_nmfk_params["save_path"] = node_save_path
#
# check for K range
#
Ks = self._adjust_curr_Ks(curr_X.shape, Ks)
if len(Ks) == 0 or (len(Ks) == 1 and Ks[0] < 2):
current_node.leaf = True
pickle_path = f'{node_save_path}/node_{current_node.node_name}.p'
pickle.dump(current_node, open(pickle_path, "wb"))
return {"name":node_name, "target_jobs":[], "node_save_path":pickle_path}
#
# apply nmfk
#
model = NMFk(**curr_nmfk_params)
results = model.fit(curr_X, Ks, name=f'NMFk-{node_name}')
#
# Check if decomposition was not possible
#
if results is None:
current_node.leaf = True
pickle_path = f'{node_save_path}/node_{current_node.node_name}.p'
pickle.dump(current_node, open(pickle_path, "wb"))
return {"name":node_name, "target_jobs":[], "node_save_path":pickle_path}
#
# latent factors
#
if self.K2:
factors_data = np.load(f'{model.save_path_full}/WH_k=2.npz')
current_node.W = factors_data["W"]
current_node.H = factors_data["H"]
current_node.k = 2
else:
predict_k = results["k_predict"]
factors_data = np.load(f'{model.save_path_full}/WH_k={predict_k}.npz')
current_node.W = factors_data["W"]
current_node.H = factors_data["H"]
current_node.k = predict_k
# sample threshold check for leaf node determination
if self.sample_thresh > 0 and (current_node.num_samples <= self.sample_thresh):
current_node.leaf = True
pickle_path = f'{node_save_path}/node_{current_node.node_name}.p'
pickle.dump(current_node, open(pickle_path, "wb"))
return {"name":node_name, "target_jobs":[], "node_save_path":pickle_path}
#
# apply clustering
#
if self.cluster_on == "W":
cluster_labels = np.argmax(current_node.W, axis=1)
clusters = np.arange(0, current_node.W.shape[1], 1)
elif self.cluster_on == "H":
cluster_labels = np.argmax(current_node.H, axis=0)
clusters = np.arange(0, current_node.H.shape[0], 1)
        # obtain the number of unique clusters that the samples fall into
n_clusters = len(set(cluster_labels))
# leaf node based on depth limit or single cluster or all samples in same cluster
if ((current_node.depth >= self.depth) and self.depth > 0) or current_node.k == 1 or n_clusters == 1:
current_node.leaf = True
pickle_path = f'{node_save_path}/node_{current_node.node_name}.p'
pickle.dump(current_node, open(pickle_path, "wb"))
return {"name":node_name, "target_jobs":[], "node_save_path":pickle_path}
#
# go through each topic/cluster
#
for c in clusters:
# current cluster samples
cluster_c_indices = np.argwhere(cluster_labels == c).flatten()
# empty cluster
if len(cluster_c_indices) == 0:
continue
            extracted_indices = [current_node.original_indices[i] for i in cluster_c_indices]
# save current results
next_name = str(uuid.uuid1())
current_node.child_node_names.append(next_name)
# prepare next job
next_job = {
"parent_node_name":node_name,
"node_name":next_name,
"Ks":self._get_curr_Ks(node_k=current_node.k, num_samples=len(extracted_indicies)),
"original_indices":extracted_indicies.copy(),
"depth":current_node.depth+1,
"parent_topic":c,
}
target_jobs.append(next_job)
# save the node
pickle_path = f'{node_save_path}/node_{current_node.node_name}.p'
pickle.dump(current_node, open(pickle_path, "wb"))
return {"name":node_name, "target_jobs":target_jobs, "node_save_path":pickle_path}
def _adjust_curr_Ks(self, X_shape, Ks):
if max(Ks) >= min(X_shape):
try:
Ks = range(1, min(X_shape), self.Ks_deep_step)
except Exception as e:
print(e)
return []
return Ks
def _get_curr_Ks(self, node_k, num_samples):
if not self.K2:
if self.Ks_deep_max is None:
k_max = node_k + 1
else:
k_max = self.Ks_deep_max + 1
new_max_K = min(k_max, min(num_samples, self.num_features))
new_Ks = range(self.Ks_deep_min, new_max_K, self.Ks_deep_step)
else:
new_Ks = [2]
return new_Ks
def traverse_nodes(self):
"""
Graph iterator. Returns all nodes in list format.\n
This operation will load each node persistently into the memory.
Returns
-------
data : list
List of all nodes where each node is a dictionary.
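        Examples
        --------
        A sketch of inspecting the fitted hierarchy (``model`` is a fitted ``HNMFk`` instance):

        .. code-block:: python

            for node in model.traverse_nodes():
                # each entry mirrors the attributes of a Node object
                print(node["node_name"], node["depth"], node["k"], node["leaf"])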
"""
self._all_nodes = []
self._get_traversal(self.root)
return_data = self._all_nodes.copy()
self._all_nodes = []
return return_data
def _get_traversal(self, node):
for nn in node.child_nodes:
self._get_traversal(nn)
data = vars(node(persistent=True)).copy()
self._all_nodes.append(data)
def go_to_root(self):
"""
Graph iterator. Goes to root node.\n
This operation is online, only one node is kept in the memory at a time.
Returns
-------
data : dict
Dictionary format of node.
"""
self.iterator = self.root
data = vars(self.iterator()).copy()
return data
def get_node(self):
"""
Graph iterator. Returns the current node.\n
This operation is online, only one node is kept in the memory at a time.
Returns
-------
data : dict
Dictionary format of node.
"""
data = vars(self.iterator()).copy()
return data
def go_to_parent(self):
"""
        Graph iterator. Goes to the parent of the current node.\n
This operation is online, only one node is kept in the memory at a time.
Returns
-------
data : dict
Dictionary format of node.
"""
        if self.iterator.parent_node is not None:
            self.iterator = self.iterator.parent_node
            data = vars(self.iterator()).copy()
            return data
        else:
            print("At the root! There are no parents.")
def go_to_children(self, idx: int):
"""
Graph iterator. Goes to the child node specified by index.\n
This operation is online, only one node is kept in the memory at a time.
Parameters
----------
idx : int
Child index.
Returns
-------
data : dict
Dictionary format of node.
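        Examples
        --------
        A sketch of online navigation (``model`` is a fitted ``HNMFk`` instance):

        .. code-block:: python

            root = model.go_to_root()        # move the iterator to the root node
            child = model.go_to_children(0)  # descend into the first child, if it exists
            current = model.get_node()       # dictionary view of the current node
            back = model.go_to_parent()      # move back up to the root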
"""
        try:
            self.iterator = self.iterator.child_nodes[idx]
            data = vars(self.iterator()).copy()
            return data
        except IndexError:
            print("Child at index "+str(idx)+" from the current node does not exist!")
def _organize_nmfk_params(self, params):
#
# Required
#
params["collect_output"] = False
params["save_output"] = True
params["n_nodes"] = 1
if not self.K2:
params["predict_k"] = True
return params
def _process_results(self, node_results, save_checkpoint):
# remove the job
del self.target_jobs[node_results["name"]]
# save node save paths
self.node_save_paths[node_results["name"]] = node_results["node_save_path"]
# save
for next_job in node_results["target_jobs"]:
self.target_jobs[next_job["node_name"]] = next_job
# checkpointing
if save_checkpoint:
self._save_checkpoint()
def _load_checkpoint_file(self):
try:
saved_class_params = pickle.load(
open(os.path.join(self.experiment_save_path, "checkpoint.p"), "rb")
)
return saved_class_params
except Exception:
warnings.warn("No checkpoint file found!")
return {}
def _load_checkpoint(self, saved_class_params):
if self.verbose:
print("Loading saved object state from checkpoint...")
self._set_params(saved_class_params)
def _set_params(self, class_parameters):
"""Sets class variables from the loaded checkpoint"""
for parameter, value in class_parameters.items():
setattr(self, parameter, value)
def _save_checkpoint(self):
class_params = vars(self).copy()
del class_params["X"]
if self.generate_X_callback is not None:
del class_params["generate_X_callback"]
pickle.dump(class_params, open(os.path.join(
self.experiment_save_path, "checkpoint.p"), "wb"))