Source code for pyCP_APR.applications.tensor_anomaly_detection_v2

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
tensor_anomaly_detection_v2.py performs p-value scoring over the tensor decomposition, i.e.
the KRUSKAL tensor M. The calculated p-values are used to detect anomalies.\n
This method was introduced by Eren et al. in [1].\n
The second version performs faster calculation of the inner products of the
components to extract the lambdas.\n
This version also provides dimension fusion methods for lambda calculations.

References
========================================
[1] M. E. Eren, J. S. Moore and B. S. Alexandrov, "Multi-Dimensional Anomalous Entity Detection via Poisson Tensor Factorization," 2020 IEEE International Conference on Intelligence and Security Informatics (ISI), 2020, pp. 1-6, doi: 10.1109/ISI49825.2020.9280524.

@author: Juston S. Moore, Maksim Ekin Eren
"""
from . ktensor_utils import get_X_hat
from . ktensor_utils import get_X_size
import numpy as np
import numpy_indexed as npi
from collections import OrderedDict
from scipy.special import logsumexp
from scipy.stats import combine_pvalues
from scipy.stats import chi2
from sklearn.metrics import roc_auc_score, auc, precision_recall_curve
import pandas as pd


class PoissonTensorAnomaly_v2():
    """
    P-value scoring over the lambdas reconstructed from a KRUSKAL tensor.

    The lambdas are computed once at construction time. They can then be
    turned into per-link log p-values and fused along chosen tensor
    dimensions with :meth:`get_dimension_fusion_scores`.
    """

    def __init__(self, components, indicies, tensor_weights=None):
        """
        Initialize the anomaly detection class.

        Calculates the lambdas, and obtains tensor information.

        Parameters
        ----------
        components : sequence
            KRUSKAL tensor(s) M. ``components[ii]`` is handed to
            ``get_X_hat`` for every weight position ``ii``, and the whole
            object is handed to ``get_X_size``.
        indicies : array
            Non-zero coordinates.
        tensor_weights : list, optional
            Weight of each lambda for the tensors.
            Used only when ensemble of tensors used in lambda calculations.
            The default (``None``) is equivalent to ``[1]``.
        """
        if tensor_weights is None:
            tensor_weights = [1]

        # Weighted sum of the reconstructed values of every tensor in the ensemble.
        self.lambdas = sum(
            get_X_hat(components[ii], indicies) * weight
            for ii, weight in enumerate(tensor_weights)
        )
        self.indices = indicies
        self.tensor_shape = get_X_size(components)

        self.fusion_scores = None
        self.link_prediction_scores = dict()
        self.fusion_preds = dict()

    def get_lambdas(self):
        """
        Returns the lambda values that are calculated.

        Returns
        -------
        lambdas : array
            Array of lambda values for the indices.
        """
        return self.lambdas

    def get_dimension_fusion_scores(self, axis_map, y_true):
        """
        Calculates the prediction scores given fused lambdas and the true labels y.

        Fusion is performed for every entry of ``axis_map``. The fused
        predictions and the grouped labels are also stored in
        ``self.fusion_preds[method][fusion]``, and the returned DataFrame
        is stored in ``self.fusion_scores``.

        Parameters
        ----------
        axis_map : dict
            Maps a fusion name to the tensor dimensions (columns of the
            indices) to fuse to.
        y_true : list
            List of true labels for each entry.

        Returns
        -------
        df : Pandas DataFrame
            Fusion scores with the columns 'fusion', 'method', 'metric'
            and 'score' (ROC-AUC and PR-AUC per fusion and method).
        """
        # log(1 - exp(-lambda)) per link, evaluated as a signed log-sum-exp:
        # log(-1 * exp(-lambda) + 1 * exp(0)). The (2, 1) weights broadcast
        # over all links.
        log_pval = logsumexp(
            a=[-self.lambdas, np.zeros_like(self.lambdas)],
            b=np.array([[-1.0], [1.0]]),
            axis=0,
        )

        for method_name in ('harmonic_allLinks', 'harmonic_observedLinks',
                            'fisher_allLinks', 'fisher_observedLinks'):
            self.fusion_preds[method_name] = dict()

        performance = {}
        for fusion, axes in axis_map.items():
            # A fused group is labelled with the maximum label of its links.
            grouped_labels = npi.group_by(
                keys=self.indices[:, axes], values=y_true, reduction=np.max
            )
            labels = np.array([label for _, label in grouped_labels])

            # Only the harmonic mean over all links is currently enabled.
            # The other variants are 'harmonic_observedLinks' (set_N=False),
            # and 'fisher_allLinks' / 'fisher_observedLinks'
            # (fuser=self.__fisher_fusion with set_N=True / False).
            predictions = OrderedDict([
                ('harmonic_allLinks', self.__fuse_pval(
                    self.indices, log_pval,
                    tensor_shape=self.tensor_shape,
                    fuse_to_axes=axes,
                    fuser=self.__log_harmonic_mean,
                    set_N=True)),
            ])

            for method, y_pred in predictions.items():
                # Replace scores of -inf with a finite value below every
                # other score so that the metrics can be computed.
                mask = np.isneginf(y_pred)
                if mask.any():
                    rest = y_pred[~mask]
                    y_pred[mask] = (rest.min() - 10.) if rest.size else -10.

                # Low p-values are anomalous, hence the negated scores.
                performance[(fusion, method, 'ROC-AUC')] = roc_auc_score(labels, -y_pred)
                precision, recall, _ = precision_recall_curve(labels, -y_pred, pos_label=1)
                performance[(fusion, method, 'PR-AUC')] = auc(recall, precision)

                self.fusion_preds[str(method)][str(fusion)] = dict()
                self.fusion_preds[str(method)][str(fusion)]['y_pred'] = y_pred
                self.fusion_preds[str(method)][str(fusion)]['y_true'] = labels

        df = pd.DataFrame(
            data=[list(k) + [v] for k, v in performance.items()],
            columns=['fusion', 'method', 'metric', 'score']
        )

        self.fusion_scores = df
        return df

    def __preproc_dec(fn):
        """
        Wraps a fusion function so that its arguments are normalised.

        The wrapper converts the input to a float64 array, defaults
        ``axis`` to 0 (1-D input only) and ``N`` to the number of elements
        along ``axis``, and passes ``N_extra = N - log_arr.shape[axis]``
        to the wrapped function.

        Parameters
        ----------
        fn : function
            Function to be wrapped.

        Returns
        -------
        function
            Wrapped function to be used during fusion.
        """
        # Local import: keeps the class self-contained without touching the
        # module level imports.
        from functools import wraps

        @wraps(fn)
        def wrap(self, log_arr, axis=None, N=None, **kwargs):
            log_arr = np.asarray(log_arr, dtype=np.float64)

            if axis is None:
                assert log_arr.ndim == 1
                axis = 0

            if N is None:
                N = log_arr.shape[axis]

            N_extra = N - log_arr.shape[axis]
            assert N_extra >= 0

            return fn(self, log_arr, axis=axis, N=N, N_extra=N_extra, **kwargs)

        return wrap

    @__preproc_dec
    def __log_harmonic_mean(self, log_arr, axis, N, N_extra):
        """
        Fuses log p-values using the log of their harmonic mean.

        Computes ``log(N) - log(sum(1 / p))``, where the ``N_extra``
        elements that are not present in ``log_arr`` are treated as
        p-values of 1.

        Parameters
        ----------
        log_arr : array
            Log of the p-values to fuse.
        axis : int
            Axis of ``log_arr`` along which to fuse.
        N : int
            Total number of elements the mean is taken over.
        N_extra : int
            ``N`` minus the number of elements of ``log_arr`` along ``axis``.

        Returns
        -------
        array
            Harmonic mean fusion results.
        """
        if N_extra > 0:
            sh = list(log_arr.shape)
            sh[axis] = 1
            # The array is negated below, so a padding entry of
            # -log(N_extra) adds exp(log(N_extra)) = N_extra to the sum of
            # reciprocals: one unit per missing element with p-value 1.
            padding = np.full(sh, -np.log(N_extra))
            log_arr = np.concatenate((log_arr, padding), axis=axis)

        return np.log(N) - logsumexp(-log_arr, axis=axis)

    @__preproc_dec
    def __fisher_fusion(self, log_arr, axis, N, N_extra):
        """
        Fuses log p-values using Fisher's method.

        Parameters
        ----------
        log_arr : array
            Log of the p-values to fuse.
        axis : int
            Axis of ``log_arr`` along which to fuse.
        N : int
            Total number of elements; the chi-squared distribution is
            evaluated with ``2 * N`` degrees of freedom.
        N_extra : int
            ``N`` minus the number of elements of ``log_arr`` along
            ``axis``. Not used by this fuser.

        Returns
        -------
        array
            Fisher fusion results (log of the chi-squared survival function).
        """
        statistic = -2 * np.sum(log_arr, axis=axis)
        return chi2.logsf(statistic, 2 * N)

    def __fuse_pval(self, indices, log_pval, fuse_to_axes, fuser, set_N, tensor_shape):
        """
        Perform p-value fusion given the indices and the log of p-values.

        Parameters
        ----------
        indices : array
            Array of non-zero indices.
        log_pval : array
            Log of p-values for the given indices.
        fuse_to_axes : list
            Which modes to fuse to; the log p-values are grouped by these
            columns of ``indices``.
        fuser : function
            Fusion function that is used.
        set_N : bool
            If true, calculates fusion for all links.
            Otherwise, calculates only for the observed links.
        tensor_shape : array
            Shape of the tensor X, i.e. size of each mode.

        Returns
        -------
        array
            p-value fusion of the dimensions.
        """
        if set_N:
            raw_fuser = fuser
            shape = np.asarray(tensor_shape)
            # Number of links per fused group. The products are accumulated
            # in float64 so that large tensors cannot overflow int64.
            N = (np.prod(shape, dtype=np.float64)
                 / np.prod(shape[fuse_to_axes], dtype=np.float64))

            def fuser(log_arr):
                return raw_fuser(log_arr, N=N)

        result = npi.group_by(keys=indices[:, fuse_to_axes], values=log_pval, reduction=fuser)
        return np.array([fused for _, fused in result])