Source code for TELF.factorization.utilities.co_occurance_matrix

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Nov 22 18:38:26 2021

@author: maksimekineren
"""

from collections import defaultdict
from tqdm import tqdm
import numpy as np
import sparse


[docs] def co_occurrence(documents, vocabulary, window_size=20, dense=True, verbose=True, sentences=False): """ Forms co-occurance matrix. Parameters ---------- documents: list List of documents. Each entry in the list contains text. if sentences=True, then documents is a list of lists, where each entry in documents is a list of sentences. window_size: int, optional Number of consecutive words to perform counting. If sentences based analysis used, window size indicate the number of sentences. vocabulary: list List of unqiue words present in the all documents as vocabulary. dense: bool, optional If True, dense Numpy array is build. If False, Sparse CSR matrix is build. verbose: bool, optional Print progress or not. sentences: bool, optional If True, documents are list of lists, where each entry in documents is a list of sentences from that document. In this case, window size is used as number of sentences, and the matrix is build based on the sentences. When False, window size is used, and documents is a list of documents. Returns ------- M: np.ndarray or sparse CSR matrix Co-occurance matrix. """ d = defaultdict(int) # turn the vocab into a hashmap for O(1) lookup V_map = dict() for ii, word in enumerate(vocabulary): V_map[word] = ii for document in tqdm(documents, total=len(documents), disable=not (verbose)): # document sentence based analysis if sentences: curr_sentences = document curr_sentences_combined = [] tmp = [] # combine sentences based on window size for sent_idx, sent in enumerate(curr_sentences): tmp.append(sent) if (len(tmp) == window_size) or (sent_idx + 1 == len(curr_sentences)): curr_sentences_combined.append(" ".join(tmp)) tmp = [] # iterate over sentences for sent in curr_sentences_combined: tokens = sent.split(" ") # iterate over words for ii in range(len(tokens)): token=tokens[ii] next_token = tokens[ii + 1 : len(tokens)] for t in next_token: key = tuple(sorted([t, token])) d[key] += 1 # document window size based analysis else: tokens = document.split(" ") # iterate over words for ii in range(len(tokens)): token = tokens[ii] next_token = tokens[ii + 1 : min(ii + 1 + window_size, len(tokens))] for t in next_token: key = tuple(sorted([t, token])) d[key] += 1 cols = list() rows = list() values = list() M = list() if dense: M = np.zeros((len(V_map), len(V_map)), dtype=np.int32) for key, value in tqdm(d.items(), disable=not (verbose)): try: r = V_map[key[0]] c = V_map[key[1]] except Exception: continue # if diogonal if c == r: value = value * 2 # sparse if not dense: rows.append(r) cols.append(c) values.append(value) # non diogonal if r != c: rows.append(c) cols.append(r) values.append(value) # dense elif dense: M[r, c] = value # non diogonal if r != c: M[c, r] = value # sparse matrix from coordinate and values if not dense: coords = np.vstack([np.array(rows, dtype="int"), np.array(cols, dtype="int")]) M = sparse.COO(coords, values, shape=(len(V_map), len(V_map))) M = M.tocsr() return M