import os
import re
import sys
import pathlib
import pandas as pd
from dataclasses import dataclass, field
from .bunny import Bunny
from ..Cheetah import Cheetah
from ...pre_processing.iPenguin.Scopus import Scopus
from ...pre_processing.iPenguin.SemanticScholar import SemanticScholar
from ...pre_processing.Vulture import Vulture
@dataclass
class AutoBunnyStep:
"""Class for keeping track of AutoBunny args"""
modes: list
max_papers: int = 0
hop_priority: str = 'random'
    cheetah_settings: dict = field(default_factory=lambda: {'query': None})
    vulture_settings: list = field(default_factory=list)
class AutoBunny:
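    # Default Cheetah index settings: maps Cheetah search fields to the DataFrame
    # columns they are built from. Can be overridden via the `cheetah_index` argument.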
CHEETAH_INDEX = {
'title': None,
'abstract': 'clean_title_abstract',
'year': 'year',
'author_ids': 'author_ids',
'affiliations': 'affiliations',
'country': 'affiliations',
}
def __init__(self, core, s2_key=None, scopus_keys=None, output_dir=None, cache_dir=None, cheetah_index=None, verbose=False):
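        """
        Parameters:
        -----------
        core: pd.DataFrame
            Seed SLIC DataFrame of papers; a `type` column of zeros is added if missing.
        s2_key: str, optional
            Semantic Scholar API key, validated against the API when assigned.
        scopus_keys: list, set, optional
            Scopus API keys, each validated against the API when assigned.
        output_dir: str, pathlib.Path, optional
            Directory for checkpoints and result tables (defaults to /tmp).
        cache_dir: str, pathlib.Path, optional
            Directory handed to Bunny for caching downloaded papers (defaults to /tmp).
        cheetah_index: dict, optional
            Overrides for the default CHEETAH_INDEX column mapping.
        verbose: bool
            Enable verbose output in the underlying tools.
        """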
self.core = core
self.s2_key = s2_key
self.scopus_keys = scopus_keys
self.output_dir = output_dir
self.cache_dir = cache_dir
self.cheetah_index = cheetah_index
self.verbose = verbose
def run(self, steps, *, s2_key=None, scopus_keys=None, cheetah_index=None, max_papers=250000, checkpoint=True):
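        """
        Run the configured AutoBunny steps. For each step the current papers are expanded
        with a Bunny hop, cleaned with Vulture, and filtered with Cheetah; per-hop CSV
        checkpoints are written to `output_dir` when `checkpoint` is True.

        Parameters:
        -----------
        steps: AutoBunnyStep, list, tuple
            One or more AutoBunnyStep instances describing each hop.
        s2_key: str, optional
            Semantic Scholar API key; overrides the key given at construction.
        scopus_keys: list, set, optional
            Scopus API keys; override the keys given at construction.
        cheetah_index: dict, optional
            Cheetah index settings; override the mapping given at construction.
        max_papers: int
            Terminate early if the estimated size of the next hop exceeds this value.
        checkpoint: bool
            If True, save per-hop and final results as CSV files in `output_dir`.

        Returns:
        --------
        pd.DataFrame
            The papers collected and filtered across all hops.
        """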
# validate input
if not isinstance(steps, (list, tuple)):
steps = [steps]
        for i, x in enumerate(steps):
if not isinstance(x, AutoBunnyStep):
raise ValueError(f'Step at index {i} in `steps` is not valid')
if s2_key is not None:
self.s2_key = s2_key
if scopus_keys is not None:
self.scopus_keys = scopus_keys
if cheetah_index is not None:
self.cheetah_index = cheetah_index
# init search
df = self.core
cheetah_table = None
# run for specified steps
for i, s in enumerate(steps):
modes = s.modes
cheetah_settings = s.cheetah_settings
vulture_settings = s.vulture_settings
step_max_papers = s.max_papers
hop_priority = s.hop_priority
hop = int(df.type.max())
if checkpoint:
df.to_csv(os.path.join(self.output_dir, f'hop-{hop}.csv'), index=False)
cheetah_settings['do_results_table'] = True
if i == 0 and len(cheetah_settings) > 1:
tmp_df = self.__vulture_clean(df, vulture_settings)
tmp_df, cheetah_table = self.__cheetah_filter(tmp_df, cheetah_settings)
if cheetah_table is not None:
cheetah_table.to_csv(os.path.join(self.output_dir, f'cheetah_table-{hop}.csv'), index=False)
hop_estimate = Bunny.estimate_hop(df, modes[0]) # TODO: fix estimate_hop to use all modes
if hop_estimate > max_papers:
                print(f'Early termination after {i} hops: the estimated size of the next hop exceeds max_papers', file=sys.stderr)
return df
df = self.__bunny_hop(df, modes, step_max_papers, hop_priority)
df = self.__vulture_clean(df, vulture_settings)
df, cheetah_table = self.__cheetah_filter(df, cheetah_settings)
# format df
df.drop(columns=['clean_title_abstract'], inplace=True)
df = df.reset_index(drop=True)
# save final results if checkpointing
if checkpoint:
hop = int(df.type.max())
df.to_csv(os.path.join(self.output_dir, 'final_bunny_papers.csv'), index=False)
if cheetah_table is not None:
cheetah_table.to_csv(os.path.join(self.output_dir, f'cheetah_table-{hop}.csv'), index=False)
final_table = self.__final_cheetah_table()
final_table.to_csv(os.path.join(self.output_dir, 'final_cheetah_table.csv'), index=False)
return df
### Helpers
def __final_cheetah_table(self, stem='cheetah_table'):
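        """Collect the per-hop cheetah_table CSVs from `output_dir` and outer-merge them into
        a single table with one hop-labelled count column per hop."""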
files = [x for x in os.listdir(self.output_dir) if x.endswith('.csv') and stem in x]
frames = {}
for f in files:
            match = re.search(rf"{stem}-(\d+)\.csv", f)
if match:
x = int(match.group(1))
frames[x] = pd.read_csv(os.path.join(self.output_dir, f))
for hop, df in frames.items():
df = df[df.columns[:-2]].copy()
num_papers_col = df.columns[-1]
df.rename(columns={num_papers_col: f'hop{hop}-{num_papers_col}'}, inplace=True)
frames[hop] = df
        frames = [frames[hop] for hop in sorted(frames)]  # merge in hop order
df = frames[0]
for tmp_df in frames[1:]:
df = df.merge(tmp_df, on=list(df.columns[:2]), how='outer')
return df
def __bunny_hop(self, df, modes, max_papers, hop_priority):
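        """Perform a single Bunny hop over `df` using the given modes, caching downloads in
        `cache_dir` and querying Scopus only when Scopus keys are configured."""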
bunny = Bunny(s2_key=self.s2_key, output_dir=self.cache_dir, verbose=self.verbose)
use_scopus = self.scopus_keys is not None
hop_df = bunny.hop(df, 1, modes, use_scopus=use_scopus, filters=None, max_papers=max_papers, hop_priority=hop_priority,
scopus_keys=self.scopus_keys, s2_dir='s2', scopus_dir='scopus')
return hop_df
def __cheetah_filter(self, df, cheetah_settings):
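        """Index the newest hop with Cheetah, filter it with the given search settings, and
        recombine the result with the frozen papers from previous hops."""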
        # index settings (use the configured `cheetah_index` mapping, which falls back to
        # the CHEETAH_INDEX defaults)
        cheetah_columns = self.cheetah_index
# preserve the previously filtered papers
max_type = df.type.max()
df_prev = df.loc[df.type < max_type]
df_curr = df.loc[df.type == max_type]
# setup cheetah
cheetah = Cheetah(verbose=self.verbose)
index_file = os.path.join(self.output_dir, 'cheetah_index.p')
cheetah.index(df_curr,
columns=cheetah_columns,
index_file=index_file,
reindex=True)
# filter with cheetah
cheetah_df, cheetah_table = cheetah.search(**cheetah_settings)
# fix the cheetah_table (if being computed)
# the cheetah table uses indices set by df. These indices will be reset by the rest of
# this function. It is more robust to replace indices with s2ids.
if cheetah_table is not None and not cheetah_table.empty:
cheetah_table['included_ids'] = cheetah_table.included_ids.fillna('').str.split(';')\
.apply(lambda x: [int(i) for i in x if i] if x else [])
def include_s2ids(indices):
if not indices:
return None
return ';'.join(map(str, df_curr.loc[indices].s2id.to_list()))
def exclude_s2ids(indices):
all_s2ids = {x for x in df_curr.s2id.to_list() if not pd.isna(x)}
if not indices:
return ';'.join(list(all_s2ids))
curr_s2ids = set(df_curr.loc[indices].s2id.to_list())
return ';'.join(list(all_s2ids - curr_s2ids)) or None
cheetah_table['selected_s2ids'] = cheetah_table.included_ids.apply(include_s2ids)
cheetah_table['excluded_s2ids'] = cheetah_table.included_ids.apply(exclude_s2ids)
cheetah_table = cheetah_table.drop(columns='included_ids')
# combine cheetah filter results with frozen results from previous hops
cheetah_df = pd.concat([df_prev, cheetah_df], ignore_index=True)
cheetah_df = cheetah_df.drop_duplicates(subset=['s2id'], keep='first')
cheetah_df = cheetah_df.reset_index(drop=True)
return cheetah_df, cheetah_table
def __vulture_clean(self, df, vulture_settings):
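        """Clean the title and abstract columns with Vulture and append the cleaned text to
        the original DataFrame for use by the Cheetah index."""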
# setup vulture
vulture = Vulture(n_jobs=-1, cache=self.output_dir, verbose=self.verbose)
dataframe_clean_args = {
"df": df,
"columns": ['title', 'abstract'],
"append_to_original_df": True,
"concat_cleaned_cols": True,
}
if vulture_settings:
dataframe_clean_args["steps"] = vulture_settings
return vulture.clean_dataframe(**dataframe_clean_args)
### Getters / Setters
@property
def core(self):
return self._core
@property
def s2_key(self):
return self._s2_key
@property
def scopus_keys(self):
return self._scopus_keys
@property
def cheetah_index(self):
return self._cheetah_index
@property
def output_dir(self):
return self._output_dir
@property
def cache_dir(self):
return self._cache_dir
@core.setter
def core(self, core):
if not isinstance(core, pd.DataFrame):
raise ValueError('AutoBunny expects core to be a SLIC DataFrame!')
if 'type' not in core:
core['type'] = [0] * len(core)
self._core = core
@s2_key.setter
def s2_key(self, key):
        if key is None:
self._s2_key = key
elif isinstance(key, str):
try:
ip = SemanticScholar(key=key)
self._s2_key = key
except ValueError:
raise ValueError(f'The key "{key}" was rejected by the Semantic Scholar API')
else:
raise TypeError(f'Unsupported type "{type(key)}" for Semantic Scholar key')
@scopus_keys.setter
def scopus_keys(self, scopus_keys):
if scopus_keys is None:
self._scopus_keys = scopus_keys
elif isinstance(scopus_keys, (list, set)):
for key in scopus_keys:
try:
ip = Scopus(keys=[key])
except ValueError:
raise ValueError(f'The key "{k}" was rejected by the Scopus API')
self._scopus_keys = list(scopus_keys)
else:
            raise TypeError(f'Unsupported type "{type(scopus_keys)}" for Scopus keys')
@cheetah_index.setter
def cheetah_index(self, cheetah_index):
if cheetah_index is None:
self._cheetah_index = self.CHEETAH_INDEX
elif isinstance(cheetah_index, dict):
if not all(key in self.CHEETAH_INDEX for key in cheetah_index.keys()):
raise ValueError(f'Invalid index key in `cheetah_index`. Valid keys are in '
f'{list(self.CHEETAH_INDEX.keys())}')
# fill in any missing keys from cheetah_index with default
self._cheetah_index = {**self.CHEETAH_INDEX, **cheetah_index}
else:
raise TypeError(f'Unsupported type "{type(cheetah_index)}" for `cheetah_index`')
def __check_path(self, path, var_name):
"""
Checks and ensures the given path exists as a directory. If path does not exist, a new directory
will be created. If the path exists but is a file, a ValueError will be raised. A TypeError is
raised if the provided path is neither a string nor a `pathlib.Path` object.
        Parameters:
        -----------
        path: str, pathlib.Path
            The path to be checked and ensured as a directory.
        var_name: str
            Name of the variable being checked, used in error messages.
Raises:
-------
TypeError:
If the provided path is neither a string nor a `pathlib.Path` object.
ValueError:
If the path points to an existing file.
"""
if isinstance(path, str):
path = pathlib.Path(path)
if not isinstance(path, pathlib.Path):
raise TypeError(f'Unsupported type "{type(path)}" for `path`')
path = path.resolve()
if path.exists():
if path.is_file():
raise ValueError(f'`{var_name}` points to a file instead of a directory')
else:
path.mkdir(parents=True, exist_ok=True)
def __process_path(self, path, var_name):
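        """Normalize `path` to a pathlib.Path (defaulting to /tmp when None) and ensure it
        exists as a directory via __check_path."""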
if path is None:
return pathlib.Path('/tmp')
elif isinstance(path, str):
_path = pathlib.Path(path)
elif isinstance(path, pathlib.Path):
_path = path
else:
raise TypeError(f'Unsupported type "{type(path)}" for `{var_name}`')
self.__check_path(_path, var_name)
return _path
@output_dir.setter
def output_dir(self, output_dir):
self._output_dir = self.__process_path(output_dir, 'output_dir')
@cache_dir.setter
def cache_dir(self, cache_dir):
self._cache_dir = self.__process_path(cache_dir, 'cache_dir')