# Source code for TELF.applications.Bunny.auto_bunny

import os
import re
import sys
import pathlib
import pandas as pd
from dataclasses import dataclass, field

from .bunny import Bunny, BunnyFilter
from ..Cheetah import Cheetah
from ...pre_processing.iPenguin.Scopus import Scopus
from ...pre_processing.iPenguin.SemanticScholar import SemanticScholar
from ...pre_processing.Vulture import Vulture
from ...helpers.file_system import check_path_var as check_path

@dataclass
class AutoBunnyStep:
    """
    Container for the arguments of a single AutoBunny hop.

    Attributes
    ----------
    modes : list
        Bunny expansion modes to use for this hop.
    max_papers : int
        Cap on the number of papers pulled in this hop (default 2000).
    hop_priority : str
        Strategy for selecting papers when over the cap (default 'random').
    cheetah_settings : dict
        Keyword arguments forwarded to Cheetah.search (default {'query': None}).
    vulture_settings : list
        Cleaning steps forwarded to Vulture (default empty).
    """
    modes: list
    max_papers: int = 2000
    hop_priority: str = 'random'
    cheetah_settings: dict = field(default_factory=lambda: {'query': None})
    vulture_settings: list = field(default_factory=list)
class AutoBunny:
    """
    Iteratively expands a core corpus of papers by alternating Bunny hops
    (citation/reference expansion) with Vulture cleaning and Cheetah filtering.
    """

    # Default Cheetah column mapping; values are the DataFrame columns Cheetah
    # indexes for each search facet (None disables the facet).
    CHEETAH_INDEX = {
        'title': None,
        'abstract': 'clean_title_abstract',
        'year': 'year',
        'author_ids': 'author_ids',
        'affiliations': 'affiliations',
        'country': 'affiliations',
    }

    def __init__(self, core, s2_key=None, scopus_keys=None, output_dir=None,
                 cache_dir=None, cheetah_index=None, verbose=False,
                 use_vulture_cheetah=True):
        """
        Parameters
        ----------
        core : pd.DataFrame
            Seed corpus; a 'type' column (hop number) is added if missing.
        s2_key : str, optional
            Semantic Scholar API key.
        scopus_keys : list | set, optional
            Scopus API keys.
        output_dir, cache_dir : str | pathlib.Path, optional
            Where checkpoints / downloaded papers are written.
        cheetah_index : dict, optional
            Overrides for CHEETAH_INDEX entries.
        verbose : bool
            Print progress information.
        use_vulture_cheetah : bool
            Toggle the Vulture-clean + Cheetah-filter stages of each hop.
        """
        # NOTE: these assignments go through the property setters below, which
        # validate types, API keys, and paths.
        self.core = core
        self.s2_key = s2_key
        self.scopus_keys = scopus_keys
        self.output_dir = output_dir
        self.cache_dir = cache_dir
        self.cheetah_index = cheetah_index
        self.verbose = verbose
        self.use_vulture_cheetah = use_vulture_cheetah

        if self.verbose:
            print("[AutoBunny.__init__] Initialized")
            print(f" s2_key={self.s2_key}")
            print(f" scopus_keys={self.scopus_keys}")
            print(f" output_dir={self.output_dir}")
            print(f" cache_dir={self.cache_dir}")
            print(f" cheetah_index={self.cheetah_index}")
            print(f" core_rows={len(self.core)}")
[docs] def run(self, steps, *, s2_key=None, scopus_keys=None, cheetah_index=None, max_papers=250000, checkpoint=True, filter_type:str=None, # must be a key from Bunny.FILTERS filter_value=None): # validate input if not isinstance(steps, (list, tuple)): steps = [steps] for i,x in enumerate(steps): if not isinstance(x, AutoBunnyStep): raise ValueError(f'Step at index {i} in `steps` is not valid') if self.verbose: print(f"[AutoBunny.run] Steps to run: {len(steps)}") for idx, s in enumerate(steps): print(f" Step {idx}: modes={s.modes}, max_papers={s.max_papers}, hop_priority={s.hop_priority}, " f"cheetah_settings={s.cheetah_settings}, vulture_settings={s.vulture_settings}") if s2_key is not None: if self.verbose: print(f"[AutoBunny.run] Overriding s2_key") self.s2_key = s2_key if scopus_keys is not None: if self.verbose: print(f"[AutoBunny.run] Overriding scopus_keys") self.scopus_keys = scopus_keys if cheetah_index is not None: if self.verbose: print(f"[AutoBunny.run] Overriding cheetah_index") self.cheetah_index = cheetah_index # init search df = self.core cheetah_table = None # run for specified steps if self.verbose: print(f"[AutoBunny.run] Starting run loop with df size={len(df)}; checkpoint={checkpoint}; max_papers={max_papers}") print(len(steps), "steps to run") print(steps) for i, s in enumerate(steps): if self.verbose: print(f"\n[AutoBunny.run] ------- STEP {i} START -------") modes = s.modes if self.use_vulture_cheetah: cheetah_settings = s.cheetah_settings vulture_settings = s.vulture_settings step_max_papers = s.max_papers hop_priority = s.hop_priority hop = int(df.type.max()) if self.verbose: print(f"[AutoBunny.run] Current hop={hop}") print(f"[AutoBunny.run] Modes={modes}") print(f"[AutoBunny.run] Step max_papers={step_max_papers}, hop_priority={hop_priority}") if self.use_vulture_cheetah: print(f"[AutoBunny.run] cheetah_settings={cheetah_settings}") print(f"[AutoBunny.run] vulture_settings={vulture_settings}") if checkpoint: path = 
os.path.join(self.output_dir, f'hop-{hop}.csv') if self.verbose: print(f"[AutoBunny.run] Checkpoint: writing {path}") df.to_csv(path, index=False) if self.use_vulture_cheetah: cheetah_settings['do_results_table'] = True if i == 0 and len(cheetah_settings) > 1: if self.verbose: print("[AutoBunny.run] Pre-step vulture + cheetah on initial df") tmp_df = self.__vulture_clean(df, vulture_settings) tmp_df, cheetah_table = self.__cheetah_filter(tmp_df, cheetah_settings) if cheetah_table is not None: path = os.path.join(self.output_dir, f'cheetah_table-{hop}.csv') cheetah_table.to_csv(path, index=False) if self.verbose: print(f"[AutoBunny.run] Checkpoint: writing {path}") hop_estimate = Bunny.estimate_hop(df, modes[0]) # TODO: fix estimate_hop to use all modes if self.verbose: print(f"[AutoBunny.run] hop_estimate (next hop)={hop_estimate}") if hop_estimate > max_papers: msg = f'Early termination after {i} hops due to max papers in next hop' if self.verbose: print(f"[AutoBunny.run] {msg} (max_papers={max_papers})", file=sys.stderr) print(msg, file=sys.stderr) return df if self.verbose: print("[AutoBunny.run] Executing bunny hop...") df = self.__bunny_hop(df, modes, step_max_papers, hop_priority) if filter_value and filter_type: if self.verbose: print(f"[AutoBunny.run] Applying BunnyFilter {filter_type}={filter_value}") bunny = Bunny() query = BunnyFilter(filter_type, filter_value) subset_df = bunny.apply_filter(df, query, filter_in_core=True, do_author_match=False).reset_index(drop=True) if len(subset_df) < 1: print("No papers for filter_value, using original df without filter.") if self.verbose: print("[AutoBunny.run] Filter produced empty set; keeping unfiltered df") else: if self.verbose: print(f"[AutoBunny.run] Filtered df size={len(subset_df)} (from {len(df)})") df = subset_df if self.use_vulture_cheetah and self.verbose: print("[AutoBunny.run] Running vulture clean...") if self.use_vulture_cheetah: df = self.__vulture_clean(df, vulture_settings) if 
self.use_vulture_cheetah and self.verbose: print("[AutoBunny.run] Running cheetah filter...") if self.use_vulture_cheetah: df, cheetah_table = self.__cheetah_filter(df, cheetah_settings) # format df # if 'clean_title_abstract' in df.columns: # if self.verbose: # print("[AutoBunny.run] Dropping temporary column 'clean_title_abstract'") # df.drop(columns=['clean_title_abstract'], inplace=True) df = df.reset_index(drop=True) if self.verbose: print(f"[AutoBunny.run] Step {i} complete; df size now {len(df)}") print(f"[AutoBunny.run] ------- STEP {i} END -------") # save final results if checkpointing if checkpoint: hop = int(df.type.max()) final_papers_path = os.path.join(self.output_dir, 'final_bunny_papers.csv') if self.verbose: print(f"[AutoBunny.run] Final checkpoint: writing {final_papers_path}") df.to_csv(final_papers_path, index=False) if self.use_vulture_cheetah and cheetah_table is not None: path = os.path.join(self.output_dir, f'cheetah_table-{hop}.csv') cheetah_table.to_csv(path, index=False) if self.verbose: print(f"[AutoBunny.run] Final checkpoint: writing {path}") if self.verbose: print("[AutoBunny.run] Building final cheetah table aggregation...") final_table = self.__final_cheetah_table() final_cheetah_path = os.path.join(self.output_dir, 'final_cheetah_table.csv') final_table.to_csv(final_cheetah_path, index=False) if self.verbose: print(f"[AutoBunny.run] Final checkpoint: writing {final_cheetah_path}") print(len(df), "papers after all hops") if self.verbose: print("[AutoBunny.run] Completed all steps") return df
### Helpers def __final_cheetah_table(self, stem='cheetah_table'): if self.verbose: print(f"[__final_cheetah_table] Aggregating by stem='{stem}' in {self.output_dir}") files = [x for x in os.listdir(self.output_dir) if x.endswith('.csv') and stem in x] if self.verbose: print(f"[__final_cheetah_table] Found {len(files)} candidate files") frames = {} for f in files: match = re.search(f"{stem}-(\d+).csv", f) if match: x = int(match.group(1)) path = os.path.join(self.output_dir, f) frames[x] = pd.read_csv(path) if self.verbose: print(f"[__final_cheetah_table] Loaded hop={x} file '{f}' with {len(frames[x])} rows") for hop, df in frames.items(): df = df[df.columns[:-2]].copy() num_papers_col = df.columns[-1] df.rename(columns={num_papers_col: f'hop{hop}-{num_papers_col}'}, inplace=True) frames[hop] = df if self.verbose: print(f"[__final_cheetah_table] Renamed count col for hop {hop} -> 'hop{hop}-{num_papers_col}'") frames = list(frames.values()) df = frames[0] for tmp_df in frames[1:]: df = df.merge(tmp_df, on=list(df.columns[:2]), how='outer') if self.verbose: print(f"[__final_cheetah_table] Merged frame; current shape={df.shape}") return df def __bunny_hop(self, df, modes, max_papers, hop_priority): if self.verbose: print(f"[__bunny_hop] modes={modes}, max_papers={max_papers}, hop_priority={hop_priority}") print(f"[__bunny_hop] Using scopus={'yes' if self.scopus_keys is not None else 'no'}; cache_dir={self.cache_dir}") bunny = Bunny(s2_key=self.s2_key, output_dir=self.cache_dir, verbose=self.verbose) use_scopus = self.scopus_keys is not None hop_df = bunny.hop(df, 1, modes, use_scopus=use_scopus, filters=None, max_papers=max_papers, hop_priority=hop_priority, scopus_keys=self.scopus_keys, s2_dir='s2', scopus_dir='scopus') if self.verbose: print(f"[__bunny_hop] Hop returned {len(hop_df)} rows") return hop_df def __cheetah_filter(self, df, cheetah_settings): if self.verbose: print(f"[__cheetah_filter] Settings: {cheetah_settings}") print(f"[__cheetah_filter] Input df 
size={len(df)}") # index settings cheetah_columns = { 'title': None, 'abstract': 'clean_title_abstract', 'year': 'year', 'author_ids': 'author_ids', 'affiliations': 'affiliations', 'country': 'affiliations', } # preserve the previously filtered papers max_type = df.type.max() df_prev = df.loc[df.type < max_type] df_curr = df.loc[df.type == max_type] if self.verbose: print(f"[__cheetah_filter] df_prev={len(df_prev)}, df_curr={len(df_curr)}, max_type={max_type}") # setup cheetah cheetah = Cheetah(verbose=self.verbose) index_file = os.path.join(self.output_dir, 'cheetah_index.p') if self.verbose: print(f"[__cheetah_filter] Indexing df_curr to {index_file} with columns={cheetah_columns}") cheetah.index(df_curr, columns=cheetah_columns, index_file=index_file, reindex=True) # filter with cheetah if self.verbose: print(f"[__cheetah_filter] Searching with cheetah_settings={cheetah_settings}") cheetah_df, cheetah_table = cheetah.search(**cheetah_settings) if self.verbose: print(f"[__cheetah_filter] cheetah_df size={len(cheetah_df)}") if cheetah_table is not None: print(f"[__cheetah_filter] cheetah_table size={len(cheetah_table)}") # fix the cheetah_table (if being computed) if cheetah_table is not None and not cheetah_table.empty: if self.verbose: print("[__cheetah_filter] Rewriting indices to s2ids in cheetah_table") cheetah_table['included_ids'] = cheetah_table.included_ids.fillna('').str.split(';')\ .apply(lambda x: [int(i) for i in x if i] if x else []) def include_s2ids(indices): if not indices: return None return ';'.join(map(str, df_curr.loc[indices].s2id.to_list())) def exclude_s2ids(indices): all_s2ids = {x for x in df_curr.s2id.to_list() if not pd.isna(x)} if not indices: return ';'.join(list(all_s2ids)) curr_s2ids = set(df_curr.loc[indices].s2id.to_list()) return ';'.join(list(all_s2ids - curr_s2ids)) or None cheetah_table['selected_s2ids'] = cheetah_table.included_ids.apply(include_s2ids) cheetah_table['excluded_s2ids'] = 
cheetah_table.included_ids.apply(exclude_s2ids) cheetah_table = cheetah_table.drop(columns='included_ids') if self.verbose: print("[__cheetah_filter] cheetah_table indices updated") # combine cheetah filter results with frozen results from previous hops cheetah_df = pd.concat([df_prev, cheetah_df], ignore_index=True) cheetah_df = cheetah_df.drop_duplicates(subset=['s2id'], keep='first') cheetah_df = cheetah_df.reset_index(drop=True) if self.verbose: print(f"[__cheetah_filter] Combined cheetah_df size={len(cheetah_df)} (after concat + dedupe)") return cheetah_df, cheetah_table def __vulture_clean(self, df, vulture_settings): if self.verbose: print(f"[__vulture_clean] Input size={len(df)}; settings={vulture_settings}") # setup vulture vulture = Vulture(n_jobs=-1, cache=self.output_dir, verbose=self.verbose) dataframe_clean_args = { "df": df, "columns": ['title', 'abstract'], "append_to_original_df": True, "concat_cleaned_cols": True, } if vulture_settings: dataframe_clean_args["steps"] = vulture_settings cleaned = vulture.clean_dataframe(**dataframe_clean_args) if self.verbose: print(f"[__vulture_clean] Output size={len(cleaned)}") return cleaned ### Getters / Setters @property def core(self): return self._core @property def s2_key(self): return self._s2_key @property def scopus_keys(self): return self._scopus_keys @property def cheetah_index(self): return self._cheetah_index @property def output_dir(self): return self._output_dir @property def cache_dir(self): return self._cache_dir @core.setter def core(self, core): if not isinstance(core, pd.DataFrame): raise ValueError('AutoBunny expects core to be a SLIC DataFrame!') if 'type' not in core: core['type'] = [0] * len(core) self._core = core @s2_key.setter def s2_key(self, key): if key is not None: self._s2_key = key elif isinstance(key, str): try: ip = SemanticScholar(key=key) self._s2_key = key except ValueError: raise ValueError(f'The key "{key}" was rejected by the Semantic Scholar API') else: raise 
TypeError(f'Unsupported type "{type(key)}" for Semantic Scholar key') @scopus_keys.setter def scopus_keys(self, scopus_keys): if scopus_keys is None: self._scopus_keys = scopus_keys elif isinstance(scopus_keys, (list, set)): for key in scopus_keys: try: ip = Scopus(keys=[key]) except ValueError: raise ValueError(f'The key "{key}" was rejected by the Scopus API') self._scopus_keys = list(scopus_keys) else: raise TypeError(f'Unsupported type "{type(key)}" for Scopus key') @cheetah_index.setter def cheetah_index(self, cheetah_index): if cheetah_index is None: self._cheetah_index = self.CHEETAH_INDEX elif isinstance(cheetah_index, dict): if not all(key in self.CHEETAH_INDEX for key in cheetah_index.keys()): raise ValueError(f'Invalid index key in `cheetah_index`. Valid keys are in ' f'{list(self.CHEETAH_INDEX.keys())}') # fill in any missing keys from cheetah_index with default self._cheetah_index = {**self.CHEETAH_INDEX, **cheetah_index} else: raise TypeError(f'Unsupported type "{type(cheetah_index)}" for `cheetah_index`') def __process_path(self, path, var_name): if path is None: return pathlib.Path('/tmp') elif isinstance(path, str): _path = pathlib.Path(path) elif isinstance(path, pathlib.Path): _path = path else: raise TypeError(f'Unsupported type "{type(path)}" for `{var_name}`') check_path(_path, var_name) return _path @output_dir.setter def output_dir(self, output_dir): self._output_dir = self.__process_path(output_dir, 'output_dir') @cache_dir.setter def cache_dir(self, cache_dir): self._cache_dir = self.__process_path(cache_dir, 'cache_dir')