Source code for TELF.applications.Penguin.penguin

import re
import sys
import pathlib
import warnings
import pandas as pd
from tqdm import tqdm
from rbloom import Bloom
from joblib import Parallel, delayed

from pymongo import MongoClient, ASCENDING
from pymongo.errors import ConnectionFailure

from .utils import verify_n_jobs, verify_attributes, create_text_query, transform_dict
from .crocodile import process_scopus_json, process_scopus_xml, process_s2_json 
from .crocodile import form_scopus_df, form_s2_df, form_df

class Penguin:
    """
    A handler class for managing connections and operations with a MongoDB database of documents.
    The purpose of this class is to create a software layer to easily store and query Scopus and
    S2 documents. Penguin provides functionality to connect to a MongoDB database, perform read
    and write operations, and handle authentication for secure database interactions. It supports
    operations such as adding data, retrieving data, and verifying database integrity.
    """

    # define the default collection names
    S2_COL = 'S2'
    SCOPUS_COL = 'Scopus'

    ## Define the default search settings
    #  (attributes for S2 and Scopus papers)
    DEFAULT_ATTRIBUTES = {
        'S2': {
            'id': 'paperId',
            'doi': 'externalIds.DOI',
            'text': ['title', 'abstract'],
            'author_ids': ('authors', 'authorId'),
            'citations': 'citations',
            'references': 'references',
        },
        'Scopus': {
            'id': 'eid',
            'doi': 'doi',
            'text': ['title', 'abstract'],
            'author_ids': ('bibrecord.head.author-group.author', '@auid'),
            'citations': None,
            'references': None,
        },
    }

    def __init__(self, uri, db_name, username=None, password=None, n_jobs=-1, verbose=False):
        """
        Initializes the Penguin instance with the specified URI, database name, and optional settings.

        This constructor method sets up the Penguin instance by initializing the MongoDB URI, database
        name, verbosity of output, and number of jobs for parallel processing (if applicable). It then
        attempts to establish a connection to the MongoDB database by calling Penguin._connect().

        Parameters:
        -----------
        uri: str
            The MongoDB URI used to establish a connection.
        db_name: str
            The name of the database to which the connection will be established.
        username: str, (optional)
            The username for the Mongo database. If None, will try to use DB without authentication.
            Default is None.
        password: str, (optional)
            The password for the Mongo database. If None, will try to use DB without authentication.
            Default is None.
        n_jobs: int, (optional)
            The number of jobs for parallel processing. Note that this setting is only used for adding
            new data to the database and converting documents to SLIC DataFrame format for output.
            Default is -1 (use all available cores).
        verbose: bool, int, (optional)
            If set to True, the class will print additional output for debugging or information
            purposes. Can also be an int where verbose >= 1 means True, with a higher integer
            controlling the level of verbosity. Default is False.

        Raises:
        -------
        ValueError:
            Attribute was given an invalid value
        TypeError:
            Attribute has an invalid type
        ConnectionFailure:
            If the connection to the MongoDB instance fails during initialization.
        """
        self.uri = uri
        self.db_name = db_name
        self.username = username
        self.password = password
        self.n_jobs = n_jobs
        self.verbose = verbose
        self.client = None
        self.db = None
        self._s2_attributes = self.DEFAULT_ATTRIBUTES[self.S2_COL]
        self._scopus_attributes = self.DEFAULT_ATTRIBUTES[self.SCOPUS_COL]
        self._connect()

    # 1. Connecting to the Penguin Database

    def _connect(self):
        """
        Establishes a connection to the MongoDB database and validates its structure.

        This method attempts to connect to a MongoDB instance using the MongoDB URI and database name
        provided during the class initialization. It first tries to establish a client connection to
        MongoDB and then selects the specified database. The method then checks if the MongoDB server
        is reachable by issuing the 'ismaster' command. If the command fails, it raises a
        ConnectionFailure exception.

        After establishing a successful connection, the method validates the database structure by
        ensuring the presence of the Scopus and S2 collections. The database must contain these
        collections if using the database in read-only mode.

        Raises:
        -------
        ConnectionFailure
            If the connection to MongoDB fails or the 'ismaster' command cannot be executed,
            indicating a connection issue.
        ValueError
            If the database does not contain the required Scopus/S2 collections
        """
        try:
            # create the uri with authentication if provided
            uri = (f"mongodb://{self.username}:{self.password}@{self.uri}"
                   if self.username and self.password else f"mongodb://{self.uri}")

            # connect to DB
            self.client = MongoClient(uri)
            self.db = self.client[self.db_name]

            # check if connection is successful
            try:
                self.client.admin.command('ismaster')
            except Exception as e:
                raise ConnectionFailure(f'Failed to connect to database {self.db_name}: {e}')

            # make sure database has expected collections for Scopus and S2 documents
            if not self._validate_database([self.S2_COL, self.SCOPUS_COL]):
                raise ValueError('MongoDB connection successful, but the database is not valid.')
            elif self.verbose:
                print('[Penguin]: MongoDB connection successful and database is valid.', file=sys.stderr)

        except ConnectionFailure as e:
            raise e

    def _validate_database(self, required_collections):
        """
        Validates the presence of required collections in the connected MongoDB database.

        This method checks if all specified collections in 'required_collections' exist in the
        currently connected MongoDB database. If the database connection has not been established
        ('self.db' is None), the validation fails immediately. If any of the collections do not
        exist, they are created and an index is set on either the Scopus or S2 id. If all required
        collections are present, it returns True.

        Parameters:
        -----------
        required_collections: [str]
            A list of collection names that are required to be present in the database.

        Returns:
        --------
        bool
            True if all required collections are present in the database, False otherwise.
        """
        if self.db is None:
            return False

        existing_collections = self.db.list_collection_names()
        for col_name in required_collections:
            if col_name not in existing_collections:  # collection does not yet exist
                collection = self.db[col_name]

                # create an index on either the scopus or s2 id
                if col_name == 'S2':
                    index_field = self.s2_attributes['id']
                elif col_name == 'Scopus':
                    index_field = self.scopus_attributes['id']
                else:
                    warnings.warn(f'Unknown collection name: {col_name!r}')
                    return False
                collection.create_index([(index_field, ASCENDING)], unique=True)  # validated collection, added index

        return True

    # 2. Adding documents to the Penguin Database
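
    # Hedged usage sketch (not part of the class API): connecting to a MongoDB instance. The host,
    # port, database name, and credentials below are hypothetical placeholders.
    #
    #   >>> penguin = Penguin(uri='localhost:27017', db_name='penguin_db',
    #   ...                   username='reader', password='secret', n_jobs=4, verbose=True)
    #
    # On success, the constructor connects, creates the 'Scopus' and 'S2' collections (with unique
    # indices on their id attributes) if they do not exist yet, and prints a confirmation to stderr.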

    def add_many_documents(self, directory, source, overwrite=True, n_jobs=None):
        """
        Processes a directory of document files that need to be added to the database. The function
        can add both Scopus and S2 documents depending on the `source` argument. Documents are added
        in parallel.

        Parameters:
        -----------
        directory: str, pathlib.Path
            The directory containing files to be added
        source: str
            The data source (either 'Scopus' or 'S2')
        overwrite: bool, (optional)
            If True and paper id already exists in the collection then the associated data will be
            updated/overwritten by the new data. Otherwise, this paper id is skipped. If paper id
            does not already exist in collection, this flag has no effect. Default is True.
        n_jobs: int, (optional)
            The number of jobs to run in parallel. If None, the class default for n_jobs will be
            used. Default is None.
        """
        if n_jobs is not None:
            self.n_jobs = n_jobs

        if self.n_jobs == 1:
            for file_path in tqdm(self._file_path_generator(directory, ['.json', '.xml']),
                                  disable=not self.verbose):
                self.add_single_document(file_path, source, overwrite=overwrite)
        else:
            Parallel(n_jobs=self.n_jobs, backend='threading', verbose=self.verbose)(
                delayed(self.add_single_document)(file_path, source, overwrite=overwrite)
                for file_path in self._file_path_generator(directory, ['.json', '.xml']))
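
    # Hedged sketch of bulk ingestion (the directory path is a hypothetical placeholder). Every
    # '.json'/'.xml' file found directly in the directory is routed to add_single_document, in
    # parallel when n_jobs != 1.
    #
    #   >>> penguin.add_many_documents('/data/scopus_dump', source='Scopus', overwrite=False, n_jobs=8)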

    def add_single_document(self, file_path, source, overwrite=True):
        """
        Processes a single data file and adds / updates its content in the database.

        This function handles the addition of a Scopus or S2 data file. If the file is from Scopus
        it will be added to the Scopus collection. If S2, it will be added to the S2 collection. In
        the case of Scopus, the function can handle JSON input (as expected from the Scopus API or
        iPenguin.Scopus) or XML input (the format used by the purchased data).

        Parameters:
        -----------
        file_path: str
            The path to the data file to be processed.
        source: str
            The data source (either 'Scopus' or 'S2')
        overwrite: bool, (optional)
            If True and paper id already exists in the collection then the associated data will be
            updated/overwritten by the new data. Otherwise, this paper id is skipped. If paper id
            does not already exist in collection, this flag has no effect. Default is True.

        Returns:
        --------
        None
            Document is added or updated in the database

        Raises:
        -------
        ValueError
            If `source` does not match an expected value
        """
        if self.S2_COL.lower() == source.lower():
            self._add_single_s2_file(file_path, overwrite=overwrite)
        elif self.SCOPUS_COL.lower() == source.lower():
            self._add_single_scopus_file(file_path, overwrite=overwrite)
        else:
            raise ValueError(f'Unknown `source` {source!r}')
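
    # Hedged sketch of single-file ingestion (hypothetical file paths). The file suffix decides the
    # parser: '.json' for S2 and Scopus API downloads, '.xml' for purchased Scopus records.
    #
    #   >>> penguin.add_single_document('/data/s2/paper_0001.json', source='S2')
    #   >>> penguin.add_single_document('/data/scopus/2-s2.0-85000000000.xml', source='Scopus')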

    def _file_path_generator(self, directory, ext):
        """
        Generator function that yields paths to files with certain extensions in the given directory.

        Parameters:
        -----------
        directory: str, pathlib.Path
            The directory to search for files
        ext: str, list
            The extension(s) for which to search in the directory. If `ext` is a string then the
            function assumes that a single extension is being sought. If `ext` is a list then it
            will check for multiple extensions. The file extension(s) should be provided in the
            format of '.EXT' (ex: '.json')

        Yields:
        -------
        pathlib.Path
            Path object representing a file.
        """
        dir_path = pathlib.Path(directory)
        if isinstance(ext, str):
            ext = [ext]
        ext_set = set(ext)

        for file_path in dir_path.iterdir():
            if file_path.is_file() and file_path.suffix in ext_set:
                yield file_path

    def _add_single_s2_file(self, file_path, overwrite=True):
        """
        Processes a single S2 data file for addition

        Parameters:
        -----------
        file_path: str
            The path to the data file to be processed.
        overwrite: bool, (optional)
            If True and paper id already exists in the collection then the associated data will be
            updated/overwritten by the new data. Otherwise, this paper id is skipped. Default is True.

        Returns:
        --------
        None
            Document is added or updated in the database

        Raises:
        -------
        ValueError
            If the file has an unsupported extension
        """
        if not isinstance(file_path, pathlib.Path):
            file_path = pathlib.Path(file_path)

        data = None
        if file_path.suffix == '.json':
            data = process_s2_json(file_path, output_dir=None)
        else:
            raise ValueError(f'S2 `file_path` has an unsupported file extension: {file_path.suffix!r}')

        collection = self.db[self.S2_COL]
        id_attr = self.s2_attributes['id']
        if data:
            existing_entry = collection.find_one({id_attr: data[id_attr]})
            if not existing_entry:  # if not exists in collection, add to collection
                collection.insert_one(data)
            elif overwrite:  # if existing_entry and overwrite, update data for id
                collection.update_one({id_attr: data[id_attr]}, {"$set": data})
            else:  # do nothing if existing_entry and no overwrite
                pass

    def _add_single_scopus_file(self, file_path, overwrite=True):
        """
        Processes a single Scopus data file for addition

        Parameters:
        -----------
        file_path: str
            The path to the data file to be processed.
        overwrite: bool, (optional)
            If True and paper id already exists in the collection then the associated data will be
            updated/overwritten by the new data. Otherwise, this paper id is skipped. Default is True.

        Returns:
        --------
        None
            Document is added or updated in the database

        Raises:
        -------
        ValueError
            If the file has an unsupported extension
        """
        if not isinstance(file_path, pathlib.Path):
            file_path = pathlib.Path(file_path)

        data = None
        if file_path.suffix == '.xml':
            data = process_scopus_xml(file_path, output_dir=None)
        elif file_path.suffix == '.json':
            data = process_scopus_json(file_path, output_dir=None)
        else:
            raise ValueError(f'Scopus `file_path` has an unsupported file extension: {file_path.suffix!r}')

        collection = self.db[self.SCOPUS_COL]
        id_attr = self.scopus_attributes['id']
        if data:
            existing_entry = collection.find_one({id_attr: data[id_attr]})
            if not existing_entry:  # if not exists in collection, add to collection
                collection.insert_one(data)
            elif overwrite:  # if existing_entry and overwrite, update data for id
                collection.update_one({id_attr: data[id_attr]}, {"$set": data})
            else:  # do nothing if existing_entry and no overwrite
                pass

    # 3. Search the Penguin Database

    def count_documents(self):
        """
        Return the number of documents in the Scopus and S2 collections.

        Returns:
        --------
        dict:
            A dictionary with keys 'scopus' and 's2' containing the number of documents in each
            respective collection.

        Raises:
        -------
        Exception
            If there is an error accessing the database collections.
        """
        try:
            scopus_count = self.db[self.SCOPUS_COL].count_documents({})
            s2_count = self.db[self.S2_COL].count_documents({})
            return {
                'scopus': scopus_count,
                's2': s2_count,
            }
        except Exception as e:
            raise Exception(f"Error accessing the database collections: {e}")
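
    # Hedged sketch (the counts shown are illustrative only):
    #
    #   >>> penguin.count_documents()
    #   {'scopus': 120000, 's2': 95000}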

    def query_by_author(self, collection_name, doc_id, id_attribute='paperId',
                        author_attribute_list='authors', author_attribute='authorId'):
        """
        Query a MongoDB collection for a document by a direct match and then locate all of the
        other papers from the authors of the target paper that are present in the DB.

        Parameters:
        -----------
        collection_name: str
            The name of the MongoDB collection to query.
        doc_id: str
            The document ID to look up.
        id_attribute: str
            The name of the attribute where the document ID is stored. Defaults to 'paperId' for
            S2 papers.
        author_attribute_list: str
            The name of the attribute in the documents that contains the list of authors
        author_attribute: str
            The name of the attribute in each author entry that contains the author id

        Returns:
        --------
        pymongo.cursor.Cursor or list:
            A cursor over the papers in the collection written by any of the target paper's
            authors, or an empty list if the target document is not found.
        """
        # query to find a document with given id
        collection = self.db[collection_name]
        target_document = collection.find_one({id_attribute: doc_id})
        if target_document is None:
            return []

        # extract the author ids of the target paper
        author_ids = [item[author_attribute] for item in target_document.get(author_attribute_list, [])
                      if author_attribute in item]

        # find all papers in the collection written by any of these authors
        return collection.find({f"{author_attribute_list}.{author_attribute}": {"$in": author_ids}})
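
    # Hedged sketch (hypothetical S2 paper id): find every paper in the 'S2' collection written by
    # any author of the target paper. The return value is a pymongo cursor, so it can be iterated
    # lazily or materialized with list().
    #
    #   >>> cursor = penguin.query_by_author('S2', '649def34f8be52c8b66281af98ae884c09aef38b')
    #   >>> papers_by_same_authors = list(cursor)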

    # 4. Tagging

    def resolve_document_id(self, pid):
        """
        Resolves the collection and unique identifier (uid) for a given document id. The document
        id can either be a Scopus id or a SemanticScholar id.

        Parameters:
        -----------
        pid: str
            The identifier for the document that needs to be resolved. This should be a document
            id (either S2 or Scopus). The id should be prepended by either 'eid:' to signify a
            Scopus document or 's2id:' to signify an S2 document.

        Returns:
        --------
        tuple
            A tuple containing the unique identifier (uid) and the associated collection.
        """
        prefix, _, actual_id = pid.partition(':')
        if not actual_id:
            raise ValueError(f'Encountered unknown `id`: {pid}. Prepend id with eid: or s2id: '
                             f'to specify id type')

        document = None
        collection = None
        if prefix == 'eid':
            collection = self.db[self.SCOPUS_COL]
            secondaryId = self.scopus_attributes['id']
            document = collection.find_one({secondaryId: actual_id})
        elif prefix == 's2id':
            collection = self.db[self.S2_COL]
            secondaryId = self.s2_attributes['id']
            document = collection.find_one({secondaryId: actual_id})
        else:
            raise ValueError(f'Encountered unknown document id prefix: {prefix}. '
                             f'Valid options are in ["eid", "s2id"]')

        if document:
            uid = document['_id']
            return uid, collection
        else:
            return None, None
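
    # Hedged sketch (hypothetical ids): the prefix selects the collection, and the returned uid is
    # the MongoDB '_id' of the matching document (or (None, None) if it is not in the database).
    #
    #   >>> uid, collection = penguin.resolve_document_id('eid:2-s2.0-85000000000')
    #   >>> uid, collection = penguin.resolve_document_id('s2id:649def34f8be52c8b66281af98ae884c09aef38b')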

    def add_tag(self, document_id, tag):
        """
        Adds a tag to the specified document.

        Parameters:
        -----------
        document_id: str
            The identifier for the document to which the tag will be added. This should be a
            document id (either S2 or Scopus). The id should be prepended by either 'eid:' to
            signify a Scopus document or 's2id:' to signify an S2 document.
        tag: str
            The tag to be added to the document.

        Returns:
        --------
        None

        Example:
        --------
        >>> penguin.add_tag('eid:[eid here]', 'tensor')
        >>> penguin.add_tag('s2id:[s2id here]', 'tensor')
        """
        uid, collection = self.resolve_document_id(document_id)
        if uid and collection is not None:
            collection.update_one(
                {"_id": uid},
                {"$addToSet": {"tags": tag}}
            )
        else:
            raise KeyError(f'Document with id {document_id!r} not found in DB.')

    def remove_tag(self, document_id, tag):
        """
        Removes a tag from the specified document.

        Parameters:
        -----------
        document_id: str
            The identifier for the document from which the tag will be removed. This should be a
            document id (either S2 or Scopus). The id should be prepended by either 'eid:' to
            signify a Scopus document or 's2id:' to signify an S2 document.
        tag: str
            The tag to be removed from the document.

        Returns:
        --------
        None

        Example:
        --------
        >>> penguin.remove_tag('eid:[eid here]', 'tensor')
        >>> penguin.remove_tag('s2id:[s2id here]', 'tensor')
        """
        uid, collection = self.resolve_document_id(document_id)
        if uid and collection is not None:
            collection.update_one(
                {"_id": uid},
                {"$pull": {"tags": tag}}
            )
        else:
            raise KeyError(f'Document with id {document_id!r} not found in DB.')

    def update_tags(self, document_id, new_tags):
        """
        Updates the tags for the specified document with a new set of tags. Used for updating the
        entire list of tags at once.

        Parameters:
        -----------
        document_id: str
            The identifier for the document for which the tags will be modified. This should be a
            document id (either S2 or Scopus). The id should be prepended by either 'eid:' to
            signify a Scopus document or 's2id:' to signify an S2 document.
        new_tags: list
            The new set of tags to be assigned to the document.

        Returns:
        --------
        None

        Example:
        --------
        >>> penguin.update_tags('eid:[eid here]', ['tensor', 'PDE'])
        >>> penguin.update_tags('s2id:[s2id here]', ['tensor', 'PDE'])
        """
        uid, collection = self.resolve_document_id(document_id)
        if uid and collection is not None:
            collection.update_one(
                {"_id": uid},
                {"$set": {"tags": new_tags}}
            )
        else:
            raise KeyError(f'Document with id {document_id!r} not found in DB.')

    def find_by_tag(self, tag, as_pandas=True):
        """
        Finds and returns documents that have the specified tag.

        Parameters:
        -----------
        tag: str
            The tag to filter documents by.
        as_pandas: bool, (optional)
            If True, returns the search results as a SLIC DataFrame. If False, returns a dictionary.

        Returns:
        --------
        pandas.DataFrame or dict
            If 'as_pandas' is True, returns a Pandas DataFrame containing the combined search
            results for the tag from the Scopus and S2 collections. If False, returns a dictionary
            with keys 'scopus' and 's2' containing the cursors to the search results from the
            respective collections.
        """
        if as_pandas:
            query = {"tags": tag}
            return self._query_to_pandas(query, query)
        else:
            out = {}
            out['scopus'] = self.db[self.SCOPUS_COL].find({"tags": tag})
            out['s2'] = self.db[self.S2_COL].find({"tags": tag})
            return out
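
    # Hedged sketch: retrieve all documents tagged 'tensor', either as a combined SLIC DataFrame or
    # as raw pymongo cursors per collection.
    #
    #   >>> df = penguin.find_by_tag('tensor')                        # SLIC DataFrame
    #   >>> cursors = penguin.find_by_tag('tensor', as_pandas=False)  # raw cursors
    #   >>> scopus_hits = list(cursors['scopus'])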

    # 5. iPenguin Hook

    def get_id_bloom(self, source, max_items=1.25, false_positive_rate=0.001):
        """
        Initializes a Bloom filter with IDs from a specified source collection.

        This method selects a collection based on the 'source' parameter, which should match one of
        the predefined S2 or Scopus collection names (S2_COL or SCOPUS_COL). It then retrieves IDs
        from the selected collection and adds them to a Bloom filter. The Bloom filter is configured
        based on the estimated number of items ('max_items') and the desired false positive rate.

        Parameters:
        -----------
        source: str
            The name of the source collection from which to retrieve IDs. It should correspond to
            either SCOPUS_COL or S2_COL.
        max_items: float, int, (optional)
            The maximum number of items expected to be stored in the Bloom filter. This can be a
            fixed integer or a float representing a multiplier of the current document count in the
            collection. Default is 1.25.
        false_positive_rate: float, (optional)
            The desired false positive probability for the Bloom filter. Default is 0.001.

        Returns:
        --------
        rbloom.Bloom
            An instance of a Bloom filter populated with IDs from the specified collection.

        Raises:
        -------
        ValueError
            If 'source' does not match any of the predefined collection names, or if 'max_items'
            is not a float or an int.

        Notes:
        ------
        - The Bloom filter is a probabilistic data structure that is used to test whether an element
          is a member of a set. False positive matches are possible, but false negatives are not.
        - The 'max_items' parameter impacts the size and the false positive rate of the Bloom filter.
          Adjusting this parameter can optimize the performance and accuracy based on the expected
          dataset size.
        - The function retrieves only the ID attribute from the documents in the collection,
          excluding the MongoDB '_id' field, to populate the Bloom filter.
        """
        id_attr = None
        collection = None
        if self.S2_COL.lower() == source.lower():
            collection = self.db[self.S2_COL]
            id_attr = self.s2_attributes['id']
        elif self.SCOPUS_COL.lower() == source.lower():
            collection = self.db[self.SCOPUS_COL]
            id_attr = self.scopus_attributes['id']
        else:
            raise ValueError(f'Unknown `source` {source!r}')

        num_documents = collection.count_documents({})
        if isinstance(max_items, int):
            num_documents = max_items
        elif isinstance(max_items, float):
            num_documents *= max_items
            num_documents = int(num_documents)
        else:
            raise ValueError('`max_items` is expected to be a float or an int')

        bf = Bloom(num_documents, false_positive_rate)  # initialize a bloom filter
        cursor = collection.find({}, {id_attr: 1, '_id': 0})  # retrieve only the eid/s2id attribute, exclude '_id'
        for doc in cursor:
            if id_attr in doc:
                if not isinstance(doc[id_attr], str):
                    continue
                if id_attr == 'eid':
                    bf.add(doc[id_attr][7:])  # remove the '2-s2.0-' prefix from Scopus ids
                else:
                    bf.add(doc[id_attr])
        return bf
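
    # Hedged sketch: build a Bloom filter of known Scopus ids so that a subsequent iPenguin download
    # pass can cheaply skip documents that are (probably) already stored. Membership tests may rarely
    # return false positives but never false negatives.
    #
    #   >>> bloom = penguin.get_id_bloom('Scopus', max_items=1.5, false_positive_rate=0.001)
    #   >>> '85000000000' in bloom   # hypothetical id; Scopus ids are stored without the '2-s2.0-' prefix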

    def _query_to_pandas(self, scopus_query, s2_query):
        """
        Retrieves data based on two separate queries to the Scopus and S2 collections and transforms
        them into a single Pandas DataFrame in SLIC format.

        This method executes two different queries using the `_query_to_pandas_helper` method for
        each. The first query (`scopus_query`) is executed against a collection specified by
        `self.SCOPUS_COL`, and the second query (`s2_query`) is executed against another collection
        specified by `self.S2_COL`. Each query's results are independently transformed into a
        DataFrame using their own handler functions. The two DataFrames are then combined into a
        single DataFrame by the `form_df` function, which is in the standard SLIC format.

        Parameters:
        -----------
        scopus_query: dict
            The MongoDB query to be executed against the Scopus documents
        s2_query: dict
            The MongoDB query to be executed against the S2 documents

        Returns:
        --------
        pd.DataFrame
            The joint DataFrame for both Scopus and S2. Formatted to be used with the rest of the
            Zoo libraries.
        """
        scopus_df = self._query_to_pandas_helper(scopus_query, self.SCOPUS_COL, form_scopus_df)
        s2_df = self._query_to_pandas_helper(s2_query, self.S2_COL, form_s2_df)
        return form_df(scopus_df, s2_df)

    def _query_to_pandas_helper(self, query, col, processor):
        """
        Retrieves data from a MongoDB collection based on a query and turns it into a Pandas DataFrame.

        This method first divides the result set of the query into multiple cursors for parallel
        processing, if applicable. Each cursor represents a subset of the query results. The method
        then processes these subsets in parallel, converts them into Pandas DataFrame slices, and
        concatenates these slices into a single DataFrame. If the number of jobs (cursors) is one,
        it processes the query result without parallelization. If the query is None, it returns an
        empty DataFrame with the appropriate S2 or Scopus headers.

        Parameters:
        -----------
        query: dict or None
            The MongoDB query to execute. If None, an empty DataFrame is returned.
        col: str
            The name of the MongoDB collection to query.
        processor: callable
            A function that takes a cursor, an empty list, or any other iterable and returns a
            Pandas DataFrame. This function is responsible for converting the raw data from MongoDB
            into a DataFrame format. The currently available options are `form_scopus_df()` and
            `form_s2_df()` for the Scopus and S2 collections, respectively.

        Returns:
        --------
        pd.DataFrame
            A DataFrame containing the data retrieved from MongoDB based on the provided query,
            processed by the 'processor' function. If the query is None, returns an empty DataFrame
            with just the header.
        """
        if query is not None:
            cursors = self._parallel_cursors(self.db[col], query)
            n_jobs = len(cursors)
            if n_jobs > 1:
                # generate dataframe slices in parallel
                jobs = Parallel(n_jobs=n_jobs, backend='threading', verbose=self.verbose)(
                    delayed(processor)(c) for c in cursors)
                df = pd.concat(jobs, sort=False)
                return df.reset_index(drop=True)
            else:
                return processor(next(iter(cursors)))  # single core processing
        else:
            return processor([])  # create empty df (just the header, basically)

    def _parallel_cursors(self, collection, query):
        """
        Generates a list of cursors for a MongoDB collection, each pointing to a subset of documents
        based on the specified query. The total number of documents matched by the query is divided
        as evenly as possible among the specified number of jobs (`self.n_jobs`).

        Parameters:
        -----------
        collection : pymongo.collection.Collection
            The MongoDB collection to query.
        query : dict
            The query to execute on the collection.

        Returns:
        --------
        list
            A list of pymongo.cursor.Cursor objects, each configured with skip and limit to iterate
            over a subset of the query results. The division of documents is as even as possible,
            with any remainder from the division process distributed among the first cursors in the
            list.

        Note:
        -----
        This function is particularly useful for parallel processing of MongoDB query results. Care
        should be taken when using a large number of cursors, as each cursor consumes server resources.
        """
        total_docs = collection.count_documents(query)
        n_jobs = min(self.n_jobs, total_docs) if total_docs > 0 else 1
        chunk_size, extras = divmod(total_docs, n_jobs)

        cursors = []
        skip_amount = 0
        for i in range(n_jobs):
            limit_amount = chunk_size + (1 if i < extras else 0)
            cursor = collection.find(query).skip(skip_amount).limit(limit_amount)
            cursors.append(cursor)
            skip_amount += limit_amount
        return cursors

    # Setters / Getters

    @property
    def uri(self):
        return self._uri

    @uri.setter
    def uri(self, value):
        if not isinstance(value, str):
            raise TypeError('`uri` must be a string!')
        if not re.match(r'^[a-zA-Z0-9.-]+:[0-9]+$', value):
            raise ValueError("`uri` must be in the format 'hostname:port'")
        self._uri = value

    @property
    def db_name(self):
        return self._db_name

    @db_name.setter
    def db_name(self, value):
        if not isinstance(value, str):
            raise ValueError('`db_name` must be a string!')
        self._db_name = value

    @property
    def username(self):
        return self._username

    @username.setter
    def username(self, value):
        if value is not None and (not isinstance(value, str) or not value.strip()):
            raise ValueError("`username` must be a non-empty string or None")
        self._username = value

    @property
    def password(self):
        return self._password

    @password.setter
    def password(self, value):
        if value is not None and (not isinstance(value, str) or not value.strip()):
            raise ValueError("`password` must be a non-empty string or None")
        self._password = value

    @property
    def n_jobs(self):
        return self._n_jobs

    @n_jobs.setter
    def n_jobs(self, n_jobs):
        self._n_jobs = verify_n_jobs(n_jobs)

    @property
    def scopus_attributes(self):
        return self._scopus_attributes

    @scopus_attributes.setter
    def scopus_attributes(self, scopus_attributes):
        self._scopus_attributes = verify_attributes(scopus_attributes, self.scopus_attributes, 'scopus_attributes')

    @property
    def s2_attributes(self):
        return self._s2_attributes

    @s2_attributes.setter
    def s2_attributes(self, s2_attributes):
        self._s2_attributes = verify_attributes(s2_attributes, self.s2_attributes, 's2_attributes')

    @property
    def verbose(self):
        return self._verbose

    @verbose.setter
    def verbose(self, verbose):
        if isinstance(verbose, bool):
            self._verbose = int(verbose)  # convert False to 0, True to 1
        elif isinstance(verbose, int):
            if verbose < 0:
                raise ValueError('Integer values for `verbose` must be non-negative!')
            self._verbose = verbose
        else:
            raise TypeError('`verbose` should be of type bool or int!')
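
# Hedged sketch of how _parallel_cursors splits a result set: with 10 matching documents and
# n_jobs=4, divmod(10, 4) gives chunk_size=2 and extras=2, so the per-cursor limits are
# [3, 3, 2, 2] with skips [0, 3, 6, 8]. The arithmetic below reproduces that split standalone.
#
#   >>> total_docs, n_jobs = 10, 4
#   >>> chunk_size, extras = divmod(total_docs, n_jobs)
#   >>> [chunk_size + (1 if i < extras else 0) for i in range(n_jobs)]
#   [3, 3, 2, 2]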