Source code for TELF.pre_processing.Vulture.vulture

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
© 2022. Triad National Security, LLC. All rights reserved.
This program was produced under U.S. Government contract 89233218CNA000001 for Los Alamos
National Laboratory (LANL), which is operated by Triad National Security, LLC for the U.S.
Department of Energy/National Nuclear Security Administration. All rights in the program are
reserved by Triad National Security, LLC, and the U.S. Department of Energy/National Nuclear
Security Administration. The Government is granted for itself and others acting on its behalf a
nonexclusive, paid-up, irrevocable worldwide license in this material to reproduce, prepare
derivative works, distribute copies to the public, perform publicly and display publicly, and to permit
others to do so.
"""
import os
import sys
import uuid
import pickle
import pathlib
import warnings
import numpy as np
import pandas as pd
from tqdm import tqdm
import multiprocessing
from joblib import Parallel, delayed, parallel_backend

from TELF.pre_processing.Vulture.modules import SimpleCleaner
from TELF.pre_processing.Vulture.modules import LemmatizeCleaner
from TELF.pre_processing.Vulture.modules import SubstitutionCleaner
from TELF.pre_processing.Vulture.modules import RemoveNonEnglishCleaner
from TELF.pre_processing.Vulture.modules import NEDetector
from TELF.pre_processing.Vulture.default_stop_words import STOP_WORDS
from TELF.pre_processing.Vulture.default_stop_phrases import STOP_PHRASES

# mpi4py is an optional dependency: it is only needed when more than one node
# is requested. Any ordinary failure while importing it disables MPI support,
# but KeyboardInterrupt / SystemExit are no longer swallowed.
try:
    from mpi4py import MPI
except Exception:
    MPI = None


def chunk_tuple_list(l, n_chunks):
    """
    Lazily partition a list of (key, value) tuples into consecutive sub-lists.

    The first ``len(l) % n_chunks`` sub-lists receive one extra element so
    that sizes differ by at most one. When there are fewer elements than
    chunks, the trailing sub-lists are empty (exactly ``n_chunks`` sub-lists
    are always produced).

    Parameters
    ----------
    l: list of tuple
        List of (key, value) tuples to split.
    n_chunks: int
        How many sub-lists to create. Nothing is yielded if this is not positive.

    Yields
    ------
    list
        Sub-list containing (key, value) tuples.
    """
    if n_chunks > 0:
        # base size of every chunk and how many chunks get one extra element
        base, extra = divmod(len(l), n_chunks)
        lo = 0
        for idx in range(n_chunks):
            hi = lo + base + (1 if idx < extra else 0)
            yield l[lo:hi]
            lo = hi
class Vulture:
    """
    Vulture is a parallel, multi-node parallel, and distributed parallel document pre-processing tool.
    It is designed to be simple and fast.

    Vultures are natures' cleaners!

    Parameters
    ----------
    n_jobs: int, optional
        Number of parallel jobs. Negative values count back from the number
        of CPUs (``-1`` means all CPUs). ``1`` selects the serial code path.
    n_nodes: int, optional
        Number of nodes. Values greater than 1 enable the MPI code path and
        require ``mpi4py``.
    parallel_backend: str, optional
        joblib backend; one of ``PARALLEL_BACKEND_OPTIONS``.
    cache: str or pathlib.Path, optional
        Directory where the per-rank pickle chunks are written in MPI mode.
        It is created if it does not exist.
    verbose: bool or int, optional
        Verbosity level. ``True``/``False`` are converted to ``1``/``0``.
    """

    PARALLEL_BACKEND_OPTIONS = {'loky', 'multiprocessing', 'threading'}

    DEFAULT_OPERATOR_PIPELINE = [
        NEDetector(library="en_core_web_trf")
    ]

    DEFAULT_PIPELINE = [
        SimpleCleaner(
            stop_words = STOP_WORDS,
            stop_phrases = STOP_PHRASES,
            order = [
                'standardize_hyphens',
                'isolate_frozen',
                'remove_copyright_statement',
                'remove_stop_phrases',
                'make_lower_case',
                'remove_formulas',
                'normalize',
                'remove_next_line',
                'remove_email',
                'remove_()',
                'remove_[]',
                'remove_special_characters',
                'remove_nonASCII_boundary',
                'remove_nonASCII',
                'remove_tags',
                'remove_stop_words',
                'remove_standalone_numbers',
                'remove_extra_whitespace',
                'min_characters',
            ]
        ),
    ]

    def __init__(self, *, n_jobs = -1, n_nodes = 1, parallel_backend = "multiprocessing", cache = '/tmp', verbose = False):
        # every assignment below goes through the validating property setters
        self.n_jobs = n_jobs
        self.n_nodes = n_nodes
        self.parallel_backend = parallel_backend
        self.cache = cache
        self.save_path = None
        self.verbose = verbose

        # init multi-node (MPI) attributes
        self.comm = None
        self.rank = 0
        self.size = 1
        if self.use_mpi():
            self.mpi = MPI
            self.comm = self.mpi.COMM_WORLD
            self.rank = self.comm.Get_rank()
            self.size = self.comm.Get_size()

        # generate unique ID only on the root process
        if self.rank == 0:
            self.unique_id = str(uuid.uuid4())
        else:
            self.unique_id = None

        # broadcast unique_id from root process to all other processes so that
        # every rank derives the same chunk file names
        if self.comm is not None:
            self.unique_id = self.comm.bcast(self.unique_id, root=0)

    def operate(self, documents, steps=None, save_path=None, file_name=""):
        """
        Run OPERATOR type modules over the documents, one module at a time.

        Parameters
        ----------
        documents: dict
            Mapping of document id to document text.
        steps: list, optional
            Operator modules to run. Defaults to a copy of
            ``DEFAULT_OPERATOR_PIPELINE``.
        save_path: str, optional
            If given, the result of every step is pickled under this path
            instead of being returned.
        file_name: str, optional
            Extra path component inserted between ``save_path`` and the
            per-step file name.

        Returns
        -------
        list of tuple or None
            When ``save_path`` is None, a list of ``(module class name, dict)``
            pairs, one per step. Otherwise None.
        """
        if steps is None:
            steps = self.DEFAULT_OPERATOR_PIPELINE.copy()
        for step in steps:
            if "module_type" in vars(step):
                assert step.module_type == "OPERATOR", "This method can only be used with a OPERATOR type module."

        # transform documents into list of tuples
        operate_documents = list(documents.items())
        if self.verbose and self.rank == 0:
            print(f'[Vulture]: Cleaning {len(operate_documents)} documents', file=sys.stderr)

        # prepare for MPI by chunking data and saving chunks (assuming DFS)
        if self.use_mpi():
            self._mpi_init(operate_documents)
            self.comm.Barrier()
            operate_documents = self._mpi_load_chunk_from_disk(self.rank, is_clean=False)

        # perform operation
        all_results = []
        for step in steps:
            step_name = step.__class__.__name__
            if save_path is not None:
                # NOTE(review): file_name is joined as a path *component*, so a
                # non-empty file_name yields <save_path>/<file_name>/_<Step>.p and
                # that directory must already exist (the save_path setter checks
                # the parent). Confirm this is intended rather than a
                # "<file_name>_<Step>.p" file name.
                self.save_path = os.path.join(save_path, file_name, f'_{step_name}.p')
            else:
                self.save_path = save_path

            # every step is applied to the original input, not to the output of
            # the previous step
            curr_operated_documents = self._clean_helper(operate_documents, [step])
            if self.use_mpi():
                self._mpi_save_chunk_to_disk(curr_operated_documents, self.rank, is_clean=True, custom_fn=f'{step_name}')
                self.comm.Barrier()
                curr_operated_documents = self._mpi_combine(custom_fn=f'{step_name}')

            if self.save_path is not None:
                self._save_documents(dict(curr_operated_documents))
            else:
                all_results.append((f'{step_name}', dict(curr_operated_documents)))

        # decide on the argument rather than on self.save_path, which would be
        # stale from an earlier call if `steps` is empty
        if save_path is None:
            return all_results

    def clean(self, documents, steps=None, substitutions=None, save_path=None):
        """
        Run CLEANER type modules, in order, over the documents.

        Parameters
        ----------
        documents: dict
            Mapping of document id to document text.
        steps: list, optional
            Cleaner modules to run. Defaults to a copy of ``DEFAULT_PIPELINE``.
        substitutions: dict, optional
            If given, a ``SubstitutionCleaner`` is placed before and after
            ``steps``.
        save_path: str or pathlib.Path, optional
            If given, the cleaned documents are pickled to this file instead
            of being returned.

        Returns
        -------
        dict or None
            The cleaned documents when ``save_path`` is None, otherwise None.
        """
        self.save_path = save_path
        if steps is None:
            steps = self.DEFAULT_PIPELINE.copy()
        for step in steps:
            if "module_type" in vars(step):
                assert step.module_type == "CLEANER", "This method can only be used with a CLEANER type module."

        if substitutions is not None:
            assert isinstance(substitutions, dict), '`substitutions` must be a dict!'
            initial_sub = SubstitutionCleaner(substitutions, permute=True, lower=True, lemmatize=True)
            final_sub = SubstitutionCleaner(substitutions, permute=False, lower=False, lemmatize=True)
            steps = [initial_sub] + steps + [final_sub]

        # transform documents into list of tuples
        clean_documents = list(documents.items())
        if self.verbose and self.rank == 0:
            print(f'[Vulture]: Cleaning {len(clean_documents)} documents', file=sys.stderr)

        # prepare for MPI by chunking data and saving chunks (assuming DFS)
        if self.use_mpi():
            self._mpi_init(clean_documents)
            self.comm.Barrier()
            clean_documents = self._mpi_load_chunk_from_disk(self.rank, is_clean=False)

        # perform cleaning
        clean_documents = self._clean_helper(clean_documents, steps)
        if self.use_mpi():
            self._mpi_save_chunk_to_disk(clean_documents, self.rank, is_clean=True)
            self.comm.Barrier()
            clean_documents = self._mpi_combine()

        # save the clean results or return them
        if self.save_path is not None:
            self._save_documents(dict(clean_documents))
        else:
            return dict(clean_documents)

    def clean_dataframe(self, df, columns, steps=None, substitutions=None, append_to_original_df=False, concat_cleaned_cols=False):
        """
        Clean one or more text columns of a DataFrame.

        Parameters
        ----------
        df: pd.DataFrame
            Input DataFrame; its index must be unique. It is not modified.
        columns: list of str
            Columns of ``df`` to clean. Missing values are treated as ''.
        steps: list, optional
            Cleaner modules to run. Defaults to a copy of ``DEFAULT_PIPELINE``.
        substitutions: dict, optional
            Forwarded to ``clean``.
        append_to_original_df: bool, optional
            If True the result keeps every column of ``df``; otherwise only
            ``columns`` (plus the new clean columns) are kept.
        concat_cleaned_cols: bool, optional
            If True the non-empty values of ``columns`` are joined with '. '
            per row and cleaned as a single ``clean_<col1>_<col2>...`` column;
            otherwise one ``clean_<col>`` column is produced per input column.

        Returns
        -------
        pd.DataFrame
            Copy of the (selected) data with the clean column(s) added.
            Documents that are empty after cleaning are stored as NaN.

        Raises
        ------
        ValueError
            If the index is not unique, a column is missing, or a column
            holds non-string values.
        """
        if not df.index.is_unique:  # DataFrame must have unique indices for mapping
            raise ValueError("DataFrame does not contain unique indices!")
        if not all(col in df.columns for col in columns):  # make sure columns exist
            raise ValueError("One or more columns are invalid!")
        if steps is None:
            steps = self.DEFAULT_PIPELINE.copy()
        for step in steps:
            if "module_type" in vars(step):
                assert step.module_type == "CLEANER", "This method can only be used with a CLEANER type module."

        # make a copy of the DataFrame to prevent changing original and fill nans with empty strings
        if append_to_original_df:
            df = df.copy()
        else:
            df = df[columns].copy()
        df[columns] = df[columns].fillna('')

        # validate that all columns contain strings
        if not all(df[col].apply(lambda x: isinstance(x, str)).all() for col in columns):
            raise ValueError("One or more columns do not contain strings!")

        # prepare the document dictionaries
        documents = None
        if concat_cleaned_cols:
            col_name = f'clean_{"_".join(columns)}'
            documents = {col_name: df.apply(lambda x: '. '.join(filter(None, x[columns])), axis=1).to_dict()}
        else:
            documents = {f'clean_{col}': df[col].to_dict() for col in columns}

        # clean
        clean_documents = {}
        for col, docs in documents.items():
            clean_docs = self.clean(docs, steps=steps, substitutions=substitutions)
            clean_documents[col] = clean_docs

        # add the clean data back to the DataFrame
        for col, clean_docs in clean_documents.items():
            df[col] = df.index.map(clean_docs)
            # set empty entries to NaN; assign the result explicitly because an
            # in-place replace on `df[col]` is a chained operation that does not
            # update `df` under pandas Copy-on-Write
            df[col] = df[col].replace('', np.nan)
        return df

    def _mpi_init(self, documents):
        """Split the documents into ``n_nodes`` chunks and pickle them (root rank only)."""
        if self.rank == 0:
            for idx, chunk in enumerate(chunk_tuple_list(documents, self.n_nodes)):
                self._mpi_save_chunk_to_disk(chunk, idx, is_clean=False)

    def _mpi_get_name(self, rank, is_clean, custom_fn=None):
        """Build the chunk file name for ``rank``; ``custom_fn`` overrides the clean/raw suffix."""
        if not custom_fn:
            if is_clean:
                return f'vulture_{self.unique_id}_{rank}_clean.p'
            else:
                return f'vulture_{self.unique_id}_{rank}.p'
        else:
            return f'vulture_{self.unique_id}_{rank}_{custom_fn}.p'

    def _mpi_save_chunk_to_disk(self, data, rank, *, is_clean, custom_fn=None):
        """Pickle ``data`` into the cache directory under the chunk name for ``rank``."""
        fn = self._mpi_get_name(rank, is_clean, custom_fn)
        with open(os.path.join(self.cache, fn), 'wb') as fh:
            pickle.dump(data, fh)

    def _mpi_load_chunk_from_disk(self, rank, *, is_clean, custom_fn=None):
        """Load the pickled chunk for ``rank`` from the cache directory."""
        fn = self._mpi_get_name(rank, is_clean, custom_fn=custom_fn)
        # NOTE(review): pickle.load executes arbitrary code from the file; the
        # cache directory must only be writable by trusted users.
        with open(os.path.join(self.cache, fn), 'rb') as fh:
            return pickle.load(fh)

    def _mpi_combine(self, custom_fn=None):
        """Concatenate the processed chunks of all ``n_nodes`` ranks, in rank order."""
        clean_documents = []
        for rank in range(self.n_nodes):
            clean_documents += self._mpi_load_chunk_from_disk(rank, is_clean=True, custom_fn=custom_fn)
        return clean_documents

    def _save_documents(self, documents):
        """Pickle ``documents`` to ``self.save_path``."""
        with open(self.save_path, 'wb') as fh:
            pickle.dump(documents, fh)

    def _clean_helper(self, clean_documents, steps):
        """
        Apply ``steps`` to the documents, serially when ``n_jobs == 1`` and
        through ``_parallel_helper`` otherwise.

        Parameters
        ----------
        clean_documents: list of tuple
            ``(document id, text)`` pairs.
        steps: list
            Modules to call on every pair, in order.

        Returns
        -------
        list
            The values returned by the last module, one per input pair.
        """
        frozen = set()
        if self.n_jobs == 1:
            for cleaner in tqdm(steps, total=len(steps), disable=not self.verbose):
                if self.verbose and self.rank == 0:
                    print(f'[Vulture]: Running {cleaner.__class__.__name__} module', file=sys.stderr)

                # accumulate the frozen set across modules; every module ends up
                # sharing the same set object
                frozen |= cleaner.frozen
                cleaner.frozen = frozen
                clean_documents = [cleaner(doc) for doc in tqdm(clean_documents, total=len(clean_documents),
                                                                disable = self.verbose < 10)]
        else:
            clean_documents = self._parallel_helper(clean_documents, steps)
        return clean_documents

    def _parallel_helper(self, documents, cleaners):
        """
        Helper function to run processing of given documents in parallel

        Parameters
        ----------
        documents : list of tuple
            ``(document id, text)`` pairs to process.
        cleaners : list
            Modules to call on every pair, in order. Each module is run over
            all documents before the next one starts.

        Returns
        -------
        list
            The processed documents, in the same order as ``documents``.
        """
        total_length = len(documents)
        if total_length == 0:
            # nothing to process; also avoids dividing by n_jobs == 0 below
            return list(documents)

        with multiprocessing.Manager() as manager:

            # split the documents into chunks
            n_jobs = min(total_length, self.n_jobs)
            shared_documents = manager.list(documents)
            chunk_size = total_length // n_jobs  # >= 1 because n_jobs <= total_length

            # run processing in parallel with persitent processes
            with parallel_backend(n_jobs=n_jobs, verbose=self.verbose, backend=self.parallel_backend):
                frozen = set()
                for cleaner in tqdm(cleaners, disable=not self.verbose):
                    if self.verbose and self.rank == 0:
                        print(f'[Vulture]: Running {cleaner.__class__.__name__} module', file=sys.stderr)

                    frozen |= cleaner.frozen
                    cleaner.frozen = frozen
                    Parallel()(
                        delayed(self._parallel_worker)(shared_documents, i, min(i + chunk_size, total_length), cleaner)
                        for i in range(0, total_length, chunk_size)
                    )
            return list(shared_documents)

    @staticmethod
    def _parallel_worker(documents, chunk_start, chunk_end, cleaner):
        """Apply ``cleaner`` in place to ``documents[chunk_start:chunk_end]``."""
        for i in range(chunk_start, chunk_end):
            documents[i] = cleaner(documents[i])

    def use_mpi(self):
        """Return True when more than one node was requested."""
        return 1 < self.n_nodes

    ### GETTERS / SETTERS

    @property
    def n_jobs(self):
        return self._n_jobs

    @n_jobs.setter
    def n_jobs(self, n_jobs):
        if not isinstance(n_jobs, int):
            raise TypeError('n_jobs must be an int!')

        cpu_count = multiprocessing.cpu_count()
        limit = cpu_count + n_jobs
        if (n_jobs == 0) or (limit < 0) or (2 * cpu_count < limit):
            raise ValueError(f'n_jobs must take a value on [-{cpu_count}, -1] or [1, {cpu_count}]!')

        if n_jobs < 0:
            # -1 -> all CPUs, -2 -> all but one, ...
            self._n_jobs = cpu_count - abs(n_jobs) + 1
        else:
            self._n_jobs = n_jobs

    @property
    def n_nodes(self):
        return self._n_nodes

    @n_nodes.setter
    def n_nodes(self, n_nodes):
        if not isinstance(n_nodes, int):
            raise TypeError('n_nodes must be an int!')
        if 1 > n_nodes:
            raise ValueError('n_nodes must be greater than 0!')
        if 1 < n_nodes and MPI is None:
            raise ImportError('Multiple nodes requested but MPI is not available')
        self._n_nodes = n_nodes

    @property
    def parallel_backend(self):
        return self._parallel_backend

    @parallel_backend.setter
    def parallel_backend(self, parallel_backend):
        if not isinstance(parallel_backend, str):
            raise TypeError('parallel_backend must be an str!')
        if parallel_backend not in self.PARALLEL_BACKEND_OPTIONS:
            raise ValueError(f'{parallel_backend} is not a valid parallel_backend option!')
        self._parallel_backend = parallel_backend

    @property
    def cache(self):
        return self._cache

    @cache.setter
    def cache(self, cache):
        if not isinstance(cache, (str, pathlib.Path)):
            raise TypeError('`cache` must either be a str or a pathlib.Path object!')
        if isinstance(cache, str):
            cache = pathlib.Path(cache)
        if not cache.exists():
            try:
                cache.mkdir(parents=True, exist_ok=True)
            except Exception as e:
                # keep the original error as the cause for easier debugging
                raise ValueError(f'Failed to create cache directory "{cache}": {str(e)}') from e
        self._cache = cache

    @property
    def save_path(self):
        return self._save_path

    @save_path.setter
    def save_path(self, save_path):
        if save_path is None:
            self._save_path = None
            return

        if not isinstance(save_path, (str, pathlib.Path)):
            raise TypeError('`save_path` must either be a str or a pathlib.Path object!')
        if isinstance(save_path, str):
            save_path = pathlib.Path(save_path)
        if not save_path.parent.exists():
            raise ValueError(f'The `save_path` directory "{save_path.parent}" does not exist!')
        if save_path.is_dir():
            raise ValueError(f'The `save_path` "{save_path}" is a directory!')
        if save_path.exists():
            warnings.warn(f'The file "{save_path}" already exists and will be overwritten!')
        self._save_path = save_path

    @property
    def verbose(self):
        return self._verbose

    @verbose.setter
    def verbose(self, verbose):
        if isinstance(verbose, bool):
            self._verbose = int(verbose)  # convert False to 0, True to 1
        elif isinstance(verbose, int):
            if verbose < 0:
                raise ValueError("Integer values for verbose must be non-negative!")
            self._verbose = verbose
        else:
            raise TypeError("verbose should be of type bool or int!")