Source code for TELF.post_processing.Fox.fox

# fox/core.py

import os
import numpy as np
import pandas as pd
from pathlib import Path
from .clustering_analyzer import ClusteringAnalyzer
from .visualizer import VisualizationManager
from .utils import check_path
from .openAI_summaries import label_clusters_openAI


class Fox:
    def __init__(self, summary_model=None, api_key=None, verbose=False, debug=False):
        self.summary_model = summary_model
        self.api_key = api_key
        self.verbose = verbose
        self.debug = debug
        self.cluster_analyzer = ClusteringAnalyzer(verbose=verbose)
        self.summarizer = ClusterSummarizer(api_key=api_key, summary_model=summary_model)
        self.visualizer = VisualizationManager(verbose=verbose)
        self.dir_manager = DirectoryManager()

    def post_process(self,
                     npz_path: str,
                     vocabulary_path: str,
                     src_decomp_data_path: str,
                     output_dir: str = None,
                     top_words_per_cluster: int = 50,
                     clean_cols_name: str = "clean_title_abstract",
                     terms: list = None) -> str:
        if not os.path.exists(npz_path):
            raise FileNotFoundError(f'File "{npz_path}" not found.')
        if not os.path.exists(vocabulary_path):
            raise FileNotFoundError('Vocabulary file not found!')
        if not os.path.exists(src_decomp_data_path):
            raise FileNotFoundError('Source data file for decomposition not found!')

        df = pd.read_csv(src_decomp_data_path)
        if clean_cols_name not in df:
            raise ValueError(f'{clean_cols_name} column is missing in the source data.')

        # Parse k from a file name such as "...k=5.npz"
        k = npz_path.split(".npz")[0].split("=")[-1]
        output_dir = output_dir or os.path.join(os.path.dirname(npz_path), f"k={k}")
        output_dir = check_path(output_dir)
        archive_dir = os.path.join(output_dir, 'archive')
        os.makedirs(archive_dir, exist_ok=True)

        data = np.load(npz_path, allow_pickle=True)
        W, H = data['W'], data['H']
        with open(vocabulary_path) as f:
            vocab = np.array(f.read().splitlines())

        post_processed_df_path = self.cluster_analyzer.analyze_clusters(
            H, W, vocab, df, output_dir, archive_dir,
            top_words_per_cluster, clean_cols_name, terms)
        return post_processed_df_path

    def post_process_stats(self, processing_path: str, clean_cols_name: str = "clean_title_abstract"):
        self.visualizer.generate_all_statistics(processing_path, clean_cols_name)

    def makeSummariesAndLabelsOpenAi(self, processing_path: str):
        self.summarizer.generate_labels_and_summaries(processing_path)

    def rename_cluster_dirs_from_stats(self, processing_path: str):
        self.dir_manager.rename_from_stats(processing_path)

    def getApiKey(self):
        return self.api_key

    def setApiKey(self, api_key):
        self.api_key = api_key

    def getSummaryModel(self):
        return self.summary_model

    def setSummaryModel(self, summary_model):
        self.summary_model = summary_model
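

# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal example of driving Fox.post_process, assuming an NMF result saved
# with W and H arrays in an .npz whose file name encodes k (e.g. "WH_k=5.npz"),
# a vocabulary file with one token per line, and a source CSV containing the
# "clean_title_abstract" column. All paths and the model name are hypothetical.
#
#     fox = Fox(summary_model="gpt-4o", api_key="sk-...", verbose=True)
#     table_path = fox.post_process(
#         npz_path="results/WH_k=5.npz",
#         vocabulary_path="results/vocabulary.txt",
#         src_decomp_data_path="results/papers.csv",
#     )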


class ClusterSummarizer:
    def __init__(self, api_key=None, summary_model=None):
        self.api_key = api_key
        self.summary_model = summary_model

    def generate_labels_and_summaries(self, processing_path: str):
        df = pd.read_csv(processing_path)
        base_dir = Path(processing_path).parent
        cluster_summaries = {c: '' for c in sorted(df.cluster.dropna().unique())}

        # Get labels via OpenAI
        top_words_df = pd.read_csv(base_dir / 'top_words.csv')
        cluster_labels = label_clusters_openAI(
            top_words_df,
            api_key=self.api_key,
            open_ai_model=self.summary_model,
            embedding_model='SCINCL',
            df=df
        )

        # Combine into dataframe
        summary_df = pd.DataFrame({
            'cluster': list(cluster_summaries.keys()),
            'label': [cluster_labels.get(k, '') for k in cluster_summaries.keys()],
            'summary': [cluster_summaries[k] for k in cluster_summaries.keys()]
        })
        summary_df.to_csv(base_dir / 'cluster_summaries.csv', index=False)
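
# Note on the output (per the code above; the example labels are made up):
# generate_labels_and_summaries expects top_words.csv next to the post-processed
# CSV and writes cluster_summaries.csv into the same directory, one row per
# cluster id, e.g.:
#
#     cluster,label,summary
#     0,quantum_networks,
#     1,graph_algorithms,
#
# The "summary" column is initialized empty and never filled here; only the
# labels returned by label_clusters_openAI are written.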


class DirectoryManager:
    def __init__(self):
        pass

    def rename_from_stats(self, post_processed_df_path: str):
        """
        Renames cluster directories using label and document count from stats.csv.

        Format: <cluster>-<label>_<num_papers>-documents
        OR if no label: <cluster>-unlabeled_<num_papers>-documents
        """
        base_dir = Path(post_processed_df_path).parent
        stats_path = base_dir / 'stats.csv'
        if not stats_path.exists():
            raise FileNotFoundError(f"Could not find stats.csv at {stats_path}")

        stats_df = pd.read_csv(stats_path)

        # Ensure clusters are numeric and not NaN
        stats_df = stats_df[pd.to_numeric(stats_df['cluster'], errors='coerce').notna()]
        stats_df['cluster'] = stats_df['cluster'].astype(float)

        for _, row in stats_df.iterrows():
            cluster_float = row['cluster']
            cluster_name = str(int(cluster_float))  # "3.0" → "3"
            cluster_dir = base_dir / cluster_name

            if not cluster_dir.exists():
                print(f"[WARN] Cluster directory not found: {cluster_dir}")
                continue

            # Label logic
            label = str(row.get('label', '')).strip()
            if not label or label.lower() == 'nan':
                label = 'unlabeled'
            else:
                label = label[:30].replace(' ', '_')

            # Document count logic
            num_papers = row.get('num_papers', 0)
            if pd.isna(num_papers):
                num_papers = 0

            new_name = f"{cluster_name}-{label}_{int(num_papers)}-documents"
            new_dir = base_dir / new_name

            try:
                cluster_dir.rename(new_dir)
                print(f"[OK] Renamed {cluster_dir.name} → {new_name}")
            except Exception as e:
                print(f"[ERROR] Failed to rename {cluster_dir} → {new_dir}: {e}")
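
# --- End-to-end sketch (illustrative; the paths, model name, and renamed
# directory shown are assumptions) ---
# The methods above are meant to be chained: post_process builds the clustered
# table, post_process_stats derives the statistics, makeSummariesAndLabelsOpenAi
# writes cluster_summaries.csv, and rename_cluster_dirs_from_stats renames the
# per-cluster directories using stats.csv.
#
#     fox = Fox(summary_model="gpt-4o", api_key="sk-...")
#     table_path = fox.post_process("results/WH_k=5.npz",
#                                   "results/vocabulary.txt",
#                                   "results/papers.csv")
#     fox.post_process_stats(table_path)
#     fox.makeSummariesAndLabelsOpenAi(table_path)
#     fox.rename_cluster_dirs_from_stats(table_path)
#     # e.g. directory "3" becomes "3-quantum_networks_57-documents"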