# TELF/pre_processing/Orca/orca.py
from __future__ import annotations
import os
import ast
import copy
import pickle
import warnings
import pandas as pd
import networkx as nx
from tqdm import tqdm
# Minimal no-op AuthorMatcher fallback
# Path 1 (temporary to match your current import resolution):
# TELF/pipeline/blocks/AuthorMatcher.py
# Path 2 (correct long-term location):
# TELF/pre_processing/Orca/AuthorMatcher.py
from typing import Dict, Iterable, List, Union
class AuthorMatcher:
"""
Fallback stub used when the real AuthorMatcher isn't available.
Produces an empty matches DataFrame (or builds rows from known_matches if you provide them).
This is enough for Orca to proceed via the Scopus-only/S2-only residual logic.
"""
def __init__(self, df: pd.DataFrame, n_jobs: int = -1, verbose: bool = False):
self.df = df
self.n_jobs = n_jobs
self.verbose = verbose
def match(self, known_matches: Dict[str, Union[str, Iterable[str]]] | None = None) -> pd.DataFrame:
cols = ["S2_Author_ID", "S2_Author_Name", "SCOPUS_Author_ID"]
if not known_matches:
# No matches known → return empty, Orca will handle residual mapping.
return pd.DataFrame(columns=cols)
# Build a quick s2_id -> name map if available
s2_name_map: Dict[str, str] = {}
try:
if {"s2_author_ids", "s2_authors"}.issubset(self.df.columns):
tmp = self.df[["s2_author_ids", "s2_authors"]].dropna(how="any")
for ids, names in zip(tmp["s2_author_ids"], tmp["s2_authors"]):
ids = str(ids).split(";")
names = str(names).split(";")
for i, n in zip(ids, names):
s2_name_map.setdefault(i, n)
except Exception:
pass
rows: List[Dict[str, str]] = []
for k, vs in known_matches.items():
if not isinstance(vs, (list, tuple, set)):
vs = [vs]
for v in vs:
k_str, v_str = str(k), str(v)
# Heuristics to assign which side is S2 vs Scopus (best effort)
k_in_s2 = k_str in s2_name_map
v_in_s2 = v_str in s2_name_map
if k_in_s2 and not v_in_s2:
s2_id, scopus_id = k_str, v_str
elif v_in_s2 and not k_in_s2:
s2_id, scopus_id = v_str, k_str
else:
# Ambiguous → skip quietly
continue
rows.append(
{
"S2_Author_ID": s2_id,
"S2_Author_Name": s2_name_map.get(s2_id, "Unknown"),
"SCOPUS_Author_ID": scopus_id,
}
)
return pd.DataFrame(rows, columns=cols)
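# A minimal usage sketch for the fallback matcher (hypothetical frame
# contents). With no `known_matches` it returns an empty frame; otherwise it
# resolves which side of each pair is S2 by checking df's s2 columns.
def _example_fallback_matcher():  # pragma: no cover - illustrative only
    df = pd.DataFrame(
        {
            "s2_author_ids": ["11;22"],
            "s2_authors": ["Ada Lovelace;Alan Turing"],
        }
    )
    am = AuthorMatcher(df)
    assert am.match().empty  # no known matches -> empty frame
    matched = am.match(known_matches={"11": "7004212"})
    assert matched.iloc[0]["SCOPUS_Author_ID"] == "7004212"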
class Orca:
"""
Construct SLIC author ids and apply them to a SLIC-style paper dataframe.
"""
def __init__(self, duplicates=None, s2_duplicates=None, verbose=False):
self.slic_df = None
self.duplicates = duplicates
self.s2_duplicates = s2_duplicates
self.verbose = verbose
# ─────────────────────────────────────────────────────────────────────────
# Public API
# ─────────────────────────────────────────────────────────────────────────
def run(self, df, scopus_duplicates=None, s2_duplicates=None, known_matches=None, n_jobs=-1):
"""
Form the SLIC map from Scopus-only, S2-only, or hybrid dataframes.
"""
if scopus_duplicates is not None:
self.duplicates = scopus_duplicates
if s2_duplicates is not None:
self.s2_duplicates = s2_duplicates
has_scopus = {"author_ids", "authors"}.issubset(df.columns)
has_s2 = {"s2_author_ids", "s2_authors"}.issubset(df.columns)
if has_scopus and not has_s2:
self.slic_df = self._run_scopus(df)
return self.slic_df
if has_s2 and not has_scopus:
self.slic_df = self._run_s2_only(df)
return self.slic_df
if not (has_scopus or has_s2):
raise ValueError(
"Orca.run(): DataFrame must contain Scopus ('author_ids','authors') "
"or S2 ('s2_author_ids','s2_authors') columns."
)
# Hybrid path
affiliations_map = self.__generate_affiliations_map(df)
s2_author_map = self.__generate_author_map(df, "s2_author_ids", "s2_authors")
scopus_author_map = self.__generate_author_map(df, "author_ids", "authors")
known_matches = known_matches or {}
am = AuthorMatcher(df, n_jobs=n_jobs, verbose=self.verbose)
am_df = am.match(known_matches=known_matches)
# Enrich with known Scopus duplicates
am_enriched = self.__add_scopus_duplicates(am_df, self.duplicates)
am_df = pd.concat([am_df, am_enriched], axis=0, ignore_index=True)
# Build components across S2/Scopus ids
slic_count = 0
seen_s2, seen_scopus = set(), set()
matches = self.__uncouple_author_matches(am_df, self.s2_duplicates)
slic_df = {
"slic_id": [],
"slic_name": [],
"scopus_ids": [],
"scopus_names": [],
"scopus_affiliations": [],
"s2_ids": [],
"s2_names": [],
}
# 1) Matched S2<->Scopus groups
for entry in matches:
s2_id_set = entry["s2"]
s2_name_set = {s2_author_map.get(x, "Unknown") for x in s2_id_set if x in s2_author_map}
scopus_id_set = entry["scopus"]
scopus_name_set = {scopus_author_map[x] for x in scopus_id_set if x in scopus_author_map}
scopus_affiliations = self.__merge_scopus_affiliations(scopus_id_set, affiliations_map)
slic_name = entry["name"]
seen_s2 |= s2_id_set
seen_scopus |= scopus_id_set
slic_df["slic_id"].append(f"S{slic_count}")
slic_df["slic_name"].append(slic_name)
slic_df["scopus_ids"].append(";".join(sorted(scopus_id_set)))
slic_df["scopus_names"].append(";".join(sorted(scopus_name_set)))
slic_df["scopus_affiliations"].append(scopus_affiliations)
slic_df["s2_ids"].append(";".join(sorted(s2_id_set)))
slic_df["s2_names"].append(";".join(sorted(s2_name_set)))
slic_count += 1
# 2) Scopus-only residuals
if "author_ids" in df.columns:
df_scopus_authors = {x for y in df.author_ids.to_list() if not pd.isna(y) for x in y.split(";")}
df_scopus_authors -= seen_scopus
for scopus_id in sorted(df_scopus_authors):
scopus_name = scopus_author_map.get(scopus_id, None)
slic_df["slic_id"].append(f"S{slic_count}")
slic_df["slic_name"].append(scopus_name)
slic_df["scopus_ids"].append(scopus_id)
slic_df["scopus_names"].append(scopus_name)
slic_df["scopus_affiliations"].append(affiliations_map.get(scopus_id, None))
slic_df["s2_ids"].append(None)
slic_df["s2_names"].append(None)
slic_count += 1
# 3) S2-only residuals
if "s2_author_ids" in df.columns:
df_s2_authors = {x for y in df.s2_author_ids.to_list() if not pd.isna(y) for x in y.split(";")}
df_s2_authors -= seen_s2
for s2_id in sorted(df_s2_authors):
slic_df["slic_id"].append(f"S{slic_count}")
slic_df["scopus_ids"].append(None)
slic_df["scopus_names"].append(None)
slic_df["scopus_affiliations"].append(None)
s2_dup_ids = self.s2_duplicates.get(s2_id)
if s2_dup_ids is not None:
s2_ids_all = {s2_id} | set(s2_dup_ids)
s2_author_map_local = {x: s2_author_map.get(x, "Unknown") for x in s2_ids_all}
s2_name_set = {v for v in s2_author_map_local.values() if v != "Unknown"}
slic_name = max(s2_name_set, key=len) if s2_name_set else None
slic_df["slic_name"].append(slic_name)
slic_df["s2_ids"].append(";".join(sorted(s2_ids_all)) if s2_ids_all else None)
slic_df["s2_names"].append(";".join(sorted(s2_name_set)) if s2_name_set else None)
seen_s2 |= s2_ids_all
else:
s2_name = s2_author_map.get(s2_id, None)
slic_df["slic_name"].append(s2_name)
slic_df["s2_ids"].append(s2_id)
slic_df["s2_names"].append(s2_name)
seen_s2.add(s2_id)
slic_count += 1
slic_df = pd.DataFrame.from_dict(slic_df)
slic_df = slic_df.loc[slic_df.slic_name != "Unknown"].copy().reset_index(drop=True)
self.slic_df = slic_df
return slic_df
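# Hedged usage sketch for run() (hypothetical dataframe): with both Scopus
# ('author_ids', 'authors') and S2 ('s2_author_ids', 's2_authors') columns
# present, the hybrid path above is taken.
#
#   orca = Orca(verbose=True)
#   slic_map = orca.run(papers_df, known_matches={"11": "7004212"})
#   slic_map[["slic_id", "slic_name", "scopus_ids", "s2_ids"]].head()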
def apply(self, df, slic_df=None):
"""
Apply the SLIC id mapping to a SLIC papers dataframe.
Keeps papers even when SLIC author ids are missing (warns only).
"""
if slic_df is None and self.slic_df is None:
raise ValueError("No SLIC ID map found. First compute the map with Orca.run().")
if slic_df is not None and self.slic_df is not None:
warnings.warn(
    "[Orca]: slic_df was passed as an argument, but this Orca object already has a "
    "stored slic_df. Overwriting the stored slic_df with the given argument. If this "
    "message is unexpected, call Orca.apply() without specifying `slic_df`.",
    RuntimeWarning,
)
if slic_df is not None:
self.slic_df = slic_df
# Uniqueness guards
if "eid" in df.columns and df.eid.nunique() != len(df.loc[~df.eid.isnull()]):
df = df[~df["eid"].duplicated(keep="first") | df["eid"].isna()].copy()
warnings.warn("[Orca]: Encountered duplicate Scopus IDs (`eid`) in df. Dropping duplicate papers.")
if "s2id" in df.columns and df.s2id.nunique() != len(df.loc[~df.s2id.isnull()]):
df = df[~df["s2id"].duplicated(keep="first") | df["s2id"].isna()].copy()
warnings.warn("[Orca]: Encountered duplicate S2 IDs (`s2id`) in df. Dropping duplicate papers.")
# Compute per-source SLIC ids
if "s2id" in df.columns and "eid" in df.columns:
scopus_df = self.__compute_slic_scopus(df)
s2_df = self.__compute_slic_s2(df)
df2 = pd.merge(df, scopus_df, on="eid", how="outer")
df3 = pd.merge(df2, s2_df, on="s2id", how="outer")
orca_df = df3.copy()
# unify slic_author_ids
orca_df["slic_author_ids"] = orca_df["slic_author_ids_x"].combine_first(orca_df["slic_author_ids_y"])
orca_df = orca_df.drop(columns=["slic_author_ids_x", "slic_author_ids_y"])
# unify slic_affiliations if present as suffixes
if "slic_affiliations_x" in orca_df.columns or "slic_affiliations_y" in orca_df.columns:
left = orca_df.get("slic_affiliations_x")
right = orca_df.get("slic_affiliations_y")
if left is not None and right is not None:
orca_df["slic_affiliations"] = left.combine_first(right)
orca_df.drop(
columns=[c for c in ["slic_affiliations_x", "slic_affiliations_y"] if c in orca_df.columns],
inplace=True,
)
elif left is not None:
orca_df.rename(columns={"slic_affiliations_x": "slic_affiliations"}, inplace=True)
else:
orca_df.rename(columns={"slic_affiliations_y": "slic_affiliations"}, inplace=True)
elif "s2id" in df.columns:
s2_df = self.__compute_slic_s2(df)
orca_df = pd.merge(df, s2_df, on="s2id", how="outer")
if "slic_affiliations" not in orca_df.columns:
orca_df["slic_affiliations"] = None
else:
scopus_df = self.__compute_slic_scopus(df)
orca_df = pd.merge(df, scopus_df, on="eid", how="outer")
if "slic_affiliations_x" in orca_df.columns:
orca_df.rename(columns={"slic_affiliations_x": "slic_affiliations"}, inplace=True)
if "slic_affiliations_y" in orca_df.columns:
orca_df.drop(columns=["slic_affiliations_y"], inplace=True)
# Keep papers with missing SLIC ids (warn only)
if "slic_author_ids" in orca_df.columns:
missing = int(orca_df["slic_author_ids"].isna().sum())
if missing:
warnings.warn(f"[Orca]: {missing} papers have no SLIC author IDs (S2-only or unmatched). Keeping them.")
if "slic_affiliations" not in orca_df.columns:
orca_df["slic_affiliations"] = None
# Add SLIC author names
slic_authors = dict(zip(self.slic_df.slic_id, self.slic_df.slic_name))
def map_ids_to_names(ids):
if pd.isna(ids):
return None
names = [slic_authors.get(str(i), "") for i in ids.split(";")]
names = [name for name in names if name]
return ";".join(names)
orca_df["slic_authors"] = orca_df["slic_author_ids"].apply(map_ids_to_names)
return orca_df.reset_index(drop=True)
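# Hedged usage sketch for apply() (assumes run() was called first):
#
#   papers_with_slic = orca.apply(papers_df)
#   papers_with_slic[["slic_author_ids", "slic_authors"]].head()
#
# Papers whose authors never matched a SLIC id are kept with
# slic_author_ids == None; a warning reports how many.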
# ─────────────────────────────────────────────────────────────────────────
# Internals
# ─────────────────────────────────────────────────────────────────────────
def _run_scopus(self, df):
df = df.copy()
for col in ("author_ids", "authors"):
if col not in df.columns:
df[col] = pd.NA
affiliations_map = self.__generate_affiliations_map(df)
scopus_author_map = self.__generate_author_map(df, "author_ids", "authors")
duplicates = {}
for entry in self.duplicates:
if not (set(scopus_author_map.keys()) & set(entry)):
continue
entry = entry.copy()
best_id = max(entry, key=lambda x: len(scopus_author_map.get(x, "")))
merged_affiliations = self.__merge_scopus_affiliations(entry, affiliations_map)
affiliations_map[best_id] = merged_affiliations
entry.remove(best_id)
for x in entry:
if x in scopus_author_map:
del scopus_author_map[x]
if x in affiliations_map:
del affiliations_map[x]
duplicates[best_id] = list(entry)
slic_df = {
"slic_id": [],
"slic_name": [],
"scopus_ids": [],
"scopus_names": [],
"scopus_affiliations": [],
"s2_ids": [],
"s2_names": [],
}
for i, scopus_id in enumerate(scopus_author_map):
scopus_name = scopus_author_map.get(scopus_id)
scopus_affiliations = affiliations_map.get(scopus_id)
if scopus_id in duplicates:
scopus_id = ";".join([scopus_id] + duplicates[scopus_id])
slic_df["slic_id"].append(f"S{i}")
slic_df["slic_name"].append(scopus_name)
slic_df["scopus_ids"].append(scopus_id)
slic_df["scopus_names"].append(scopus_name)
slic_df["scopus_affiliations"].append(scopus_affiliations)
slic_df["s2_ids"].append(None)
slic_df["s2_names"].append(None)
return pd.DataFrame.from_dict(slic_df)
def _run_s2_only(self, df):
df = df.copy()
for col in ("s2_author_ids", "s2_authors"):
if col not in df.columns:
df[col] = pd.NA
s2_author_map = self.__generate_author_map(df, "s2_author_ids", "s2_authors")
visited = set()
groups = []
for s2_id in s2_author_map.keys():
if s2_id in visited:
continue
group = {s2_id}
if s2_id in self.s2_duplicates:
group |= set(self.s2_duplicates[s2_id])
visited |= group
groups.append(group)
for root, dups in self.s2_duplicates.items():
if root not in visited:
group = {root} | set(dups)
visited |= group
groups.append(group)
rows = {
"slic_id": [],
"slic_name": [],
"scopus_ids": [],
"scopus_names": [],
"scopus_affiliations": [],
"s2_ids": [],
"s2_names": [],
}
idx = 0
seen = set()
for g in groups:
name_set = {s2_author_map.get(x, "Unknown") for x in g if x in s2_author_map}
if name_set == {"Unknown"}:
name_set = set()
slic_name = max(name_set, key=len) if name_set else None
rows["slic_id"].append(f"S{idx}")
rows["slic_name"].append(slic_name)
rows["scopus_ids"].append(None)
rows["scopus_names"].append(None)
rows["scopus_affiliations"].append(None)
rows["s2_ids"].append(";".join(sorted(g)))
rows["s2_names"].append(";".join(sorted(name_set)) if name_set else None)
seen |= g
idx += 1
for s2_id, s2_name in s2_author_map.items():
if s2_id in seen:
continue
rows["slic_id"].append(f"S{idx}")
rows["slic_name"].append(s2_name if s2_name != "Unknown" else None)
rows["scopus_ids"].append(None)
rows["scopus_names"].append(None)
rows["scopus_affiliations"].append(None)
rows["s2_ids"].append(s2_id)
rows["s2_names"].append(s2_name)
idx += 1
slic_df = pd.DataFrame.from_dict(rows)
slic_df = slic_df.loc[slic_df.slic_name.notna()].reset_index(drop=True)
return slic_df
# ─────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────
def __add_scopus_duplicates(self, auth_df, duplicates):
out_df = pd.DataFrame(columns=auth_df.columns)
all_df_ids = set(auth_df.SCOPUS_Author_ID.to_list())
if self.verbose:
print("[Orca]: Scanning for Scopus duplicates in dataset. . .")
for scopus_ids in tqdm(duplicates, total=len(duplicates), disable=not self.verbose):
if not scopus_ids & all_df_ids:
continue
tmp_df = auth_df.loc[auth_df.SCOPUS_Author_ID.isin(scopus_ids)]
if len(tmp_df.SCOPUS_Author_ID.unique()) > 1:
row = tmp_df.iloc[0].copy()
for scopus_id in tmp_df.SCOPUS_Author_ID.unique():
if row.SCOPUS_Author_ID == scopus_id:
continue
new_row = row.copy()
new_row.SCOPUS_Author_ID = scopus_id
out_df = pd.concat([out_df, new_row.to_frame().T], ignore_index=True)
return out_df
def __add_s2_duplicates(self, duplicates):
out = {}
seen = set()
for s in duplicates:
if any(elem in seen for elem in s):
raise ValueError(
"Detected multiple entries for s2 duplicates across sets. "
"Make sure that all known duplicates are constrained to a single set"
)
seen.update(s)
if len(s) == 1:
continue
for element in s:
out[element] = [x for x in s if x != element]
return out
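# Shape of the __add_s2_duplicates transformation above, on made-up input:
#   [{"a", "b"}, {"c"}]  ->  {"a": ["b"], "b": ["a"]}
# Singleton sets contribute nothing; an id appearing in two different sets
# raises ValueError.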
def __propagate_duplicates(self, a_map, a_duplicates):
a_map_update = {}
for a_id, b_ids in a_map.items():
if a_id in a_duplicates:
for dup_a_id in a_duplicates[a_id]:
a_map_update.setdefault(dup_a_id, set()).update(set(b_ids))
for dup_a_id in a_map_update:
a_map_update[dup_a_id] = list(a_map_update[dup_a_id])
a_map.update(a_map_update)
def __uncouple_author_matches(self, auth_df, s2_duplicates):
s2_map = auth_df.groupby("S2_Author_ID")["SCOPUS_Author_ID"].agg(set).to_dict()
scopus_map = auth_df.groupby("SCOPUS_Author_ID")["S2_Author_ID"].agg(set).to_dict()
self.__propagate_duplicates(s2_map, s2_duplicates)
s2_map = {f"B_{k}": {f"A_{x}"} if isinstance(v, str) else {f"A_{x}" for x in v} for k, v in s2_map.items()}
scopus_map = {f"A_{k}": {f"B_{x}"} if isinstance(v, str) else {f"B_{x}" for x in v} for k, v in scopus_map.items()}
s2_name_map = auth_df.groupby("S2_Author_ID")["S2_Author_Name"].agg(lambda x: max(x, key=len)).to_dict()
G = nx.DiGraph()
for k, v_set in scopus_map.items():
for v in v_set:
G.add_edge(k, v)
for k, v_set in s2_map.items():
for v in v_set:
G.add_edge(k, v)
components = list(nx.weakly_connected_components(G))
matches = []
for component_set in components:
mdict = {"scopus": set(), "s2": set()}
for c in component_set:
(mdict["scopus"] if c.startswith("A_") else mdict["s2"]).add(c[2:])
str_gen = ((pid, s2_name_map[pid]) for pid in mdict["s2"] if pid in s2_name_map)
_, name = max(str_gen, key=lambda x: len(x[1]), default=(None, "Unknown"))
mdict["name"] = name
matches.append(mdict)
return matches
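# Micro example for __uncouple_author_matches above (hypothetical ids): if
# auth_df links S2 "11" -> Scopus "X1" and S2 "22" -> Scopus "X1", both land
# in one weakly connected component, yielding
#   {"s2": {"11", "22"}, "scopus": {"X1"}, "name": <longest S2 name>}
# The "A_"/"B_" prefixes only keep Scopus and S2 ids from colliding in the
# graph's node namespace.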
def __generate_author_map(self, df, id_col, name_col):
if self.verbose:
print(f"[Orca]: Generating {id_col}-{name_col} map. . .")
if id_col not in df.columns or name_col not in df.columns:
return {}
auth_map = {}
tmp = df[[id_col, name_col]].dropna(how="any")
if tmp.empty:
return {}
for id_list, auth_list in tqdm(
zip(tmp[id_col].to_list(), tmp[name_col].to_list()), total=len(tmp), disable=not self.verbose
):
if not isinstance(id_list, str) or not isinstance(auth_list, str):
continue
for auth_id, name in zip(id_list.split(";"), auth_list.split(";")):
if auth_id and auth_id not in auth_map:
auth_map[auth_id] = name
return auth_map
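# Example for __generate_author_map above (made-up row): id_col "1;2" with
# name_col "Ada;Alan" yields {"1": "Ada", "2": "Alan"}; the first name seen
# for an id wins.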
def __compute_slic_scopus(self, df):
tmp_df = df.loc[~df["eid"].isnull()]
scopus_authors, scopus_affiliations = {}, {}
for eid, author_ids, affiliations in zip(
tmp_df["eid"].to_list(), tmp_df["author_ids"].to_list(), tmp_df["affiliations"].to_list()
):
if not pd.isna(author_ids):
scopus_authors[eid] = author_ids
if not pd.isna(affiliations):
if isinstance(affiliations, str):
affiliations = ast.literal_eval(affiliations)
scopus_affiliations[eid] = affiliations
scopus_df = {"eid": [], "slic_author_ids": [], "slic_affiliations": []}
scopus_to_slic = {
x: k
for k, v in zip(self.slic_df.slic_id.to_list(), self.slic_df.scopus_ids.to_list())
if not pd.isna(v)
for x in v.split(";")
}
missing_authors = set()
for eid in tmp_df.eid.to_list():
slic_author_ids = []
author_ids = scopus_authors.get(eid)
if author_ids is not None:
for scopus_id in author_ids.split(";"):
scopus_id = str(scopus_id)
slic_id = scopus_to_slic.get(scopus_id)
if slic_id is None:
missing_authors.add(scopus_id)
else:
slic_author_ids.append(str(slic_id))
aff_dict, del_dict = {}, []
affiliations = scopus_affiliations.get(eid)
if affiliations is not None:
for aff_id, aff_info_shallow in affiliations.items():
if isinstance(aff_info_shallow, list):
continue
del_list = []
aff_info = copy.deepcopy(aff_info_shallow)
for i in range(len(aff_info["authors"])):
scopus_id = str(aff_info["authors"][i])
if scopus_id not in scopus_to_slic:
del_list.append(scopus_id)
missing_authors.add(scopus_id)
else:
aff_info["authors"][i] = scopus_to_slic[scopus_id]
# Drop unmapped authors; match on str() since entries may not be strings
del_set = set(del_list)
aff_info["authors"] = [a for a in aff_info["authors"] if str(a) not in del_set]
if not aff_info["authors"]:
del_dict.append(aff_id)
aff_dict[aff_id] = aff_info
for d in del_dict:
del aff_dict[d]
scopus_df["eid"].append(eid)
scopus_df["slic_author_ids"].append(";".join(slic_author_ids) if slic_author_ids else None)
scopus_df["slic_affiliations"].append(aff_dict if aff_dict else None)
if len(missing_authors) > 0:
warnings.warn(
f"[Orca]: {len(missing_authors)} Scopus IDs did not have corresponding SLIC ID and were removed"
)
return pd.DataFrame.from_dict(scopus_df)
def __compute_slic_s2(self, df):
tmp_df = df.loc[~df["s2id"].isnull()]
s2_to_slic = {
x: k
for k, v in zip(self.slic_df.slic_id.to_list(), self.slic_df.s2_ids.to_list())
if not pd.isna(v)
for x in v.split(";")
}
s2_df = {"s2id": [], "slic_author_ids": []}
missing_authors = set()
for s2id, s2_author_ids in zip(tmp_df["s2id"].to_list(), tmp_df["s2_author_ids"].to_list()):
slic_author_ids = []
if not pd.isna(s2_author_ids):
for s2_auth_id in s2_author_ids.split(";"):
if s2_auth_id not in s2_to_slic:
missing_authors.add(s2_auth_id)
else:
slic_author_ids.append(s2_to_slic[s2_auth_id])
s2_df["s2id"].append(s2id)
s2_df["slic_author_ids"].append(";".join(slic_author_ids) if slic_author_ids else None)
if len(missing_authors) > 0:
warnings.warn(
f"[Orca]: {len(missing_authors)} S2 IDs did not have corresponding SLIC ID and were removed"
)
return pd.DataFrame.from_dict(s2_df)
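# Both __compute_slic_* helpers above return one row per paper with its
# ";"-joined SLIC author ids, e.g. (hypothetical):
#   eid/s2id      slic_author_ids
#   2-s2.0-123    "S0;S7"
# The Scopus variant additionally rewrites each affiliation's author list
# into SLIC ids under slic_affiliations.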
def __load_pickle(self, fn):
    current_dir = os.path.dirname(os.path.abspath(__file__))
    with open(os.path.join(current_dir, "data", fn), "rb") as fh:
        return pickle.load(fh)
def __generate_affiliations_map(self, df):
"""
Build a map SCOPUS_AUTHOR_ID -> affiliation info with paper provenance.
Paper id priority: eid > s2id > doi > synthetic row id.
"""
if "affiliations" not in df.columns or "year" not in df.columns:
return {}
id_priority = ("eid", "s2id", "doi")
pid_col = next((c for c in id_priority if c in df.columns), None)
if pid_col is None:
df = df.copy()
df["_orca_row_id"] = [f"row_{i}" for i in range(len(df))]
pid_col = "_orca_row_id"
affiliations_map = {}
pid_series = df[pid_col].astype(object).where(pd.notna(df[pid_col]), None)
year_series = df["year"]
aff_series = df["affiliations"]
for pid, year, affiliations in zip(pid_series.tolist(), year_series.tolist(), aff_series.tolist()):
if pd.isna(affiliations):
continue
if isinstance(affiliations, str):
affiliations = ast.literal_eval(affiliations)
year = 0 if pd.isna(year) else int(year)
paper_id = None if pid is None else str(pid)
for aff_id, info in affiliations.items():
if isinstance(info, list):
continue
aff_name = info.get("name", "Unknown")
aff_country = info.get("country", "Unknown")
for auth_id in info.get("authors", []):
if auth_id not in affiliations_map:
affiliations_map[auth_id] = {}
if aff_id not in affiliations_map[auth_id]:
affiliations_map[auth_id][aff_id] = {
"name": aff_name,
"country": aff_country,
"first_seen": year,
"last_seen": year,
"papers": set(),
}
entry = affiliations_map[auth_id][aff_id]
if paper_id is not None:
entry["papers"].add(paper_id)
if entry["first_seen"] in (0, None) or (year and year < entry["first_seen"]):
entry["first_seen"] = year
if entry["last_seen"] in (0, None) or (year and year > entry["last_seen"]):
entry["last_seen"] = year
for auth_id in affiliations_map:
for aff_id, aff_info in affiliations_map[auth_id].items():
aff_info["papers"] = list(aff_info["papers"])
if not aff_info["first_seen"]:
aff_info["first_seen"] = None
if not aff_info["last_seen"]:
aff_info["last_seen"] = None
return affiliations_map
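# Resulting map shape (hypothetical ids):
#   {"7004212": {"60000001": {"name": "LANL", "country": "USA",
#                             "first_seen": 2019, "last_seen": 2021,
#                             "papers": ["2-s2.0-123"]}}}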
def __merge_scopus_affiliations(self, scopus_ids, data):
merged = {}
if not data or not scopus_ids:
return None
all_keys = set()
for sid in scopus_ids:
if sid in data:
all_keys.update(data[sid].keys())
for key in all_keys:
items = [data[sid][key] for sid in scopus_ids if sid in data and key in data[sid]]
if not items:
continue
names = [it.get("name", "Unknown") for it in items if it.get("name") not in (None, "Unknown")]
countries = [it.get("country", "Unknown") for it in items if it.get("country") not in (None, "Unknown")]
paper_sets = []
for it in items:
p = it.get("papers", [])
if isinstance(p, set):
paper_sets.append(p)
elif p is None:
continue
else:
paper_sets.append(set(p))
merged_papers = sorted(set().union(*paper_sets)) if paper_sets else []
first_vals = [it.get("first_seen") for it in items if isinstance(it.get("first_seen"), int)]
last_vals = [it.get("last_seen") for it in items if isinstance(it.get("last_seen"), int)]
merged[key] = {
"name": names[0] if names else "Unknown",
"country": countries[0] if countries else "Unknown",
"first_seen": min(first_vals) if first_vals else "Unknown",
"last_seen": max(last_vals) if last_vals else "Unknown",
"papers": merged_papers,
}
return merged or None
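# __merge_scopus_affiliations above keeps, per affiliation key, the union of
# papers, the earliest first_seen, the latest last_seen, and the first
# concrete name/country found; an empty merge returns None so callers can
# treat it as missing.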
# ─────────────────────────────────────────────────────────────────────────
# Getters / Setters
# ─────────────────────────────────────────────────────────────────────────
@property
def duplicates(self):
return self._duplicates
@duplicates.setter
def duplicates(self, duplicates):
if duplicates is None:
self._duplicates = []
elif isinstance(duplicates, list):
self._duplicates = duplicates
else:
raise TypeError(f"{type(duplicates)} is an invalid type for `duplicates`")
@property
def s2_duplicates(self):
return self._s2_duplicates
@s2_duplicates.setter
def s2_duplicates(self, s2_duplicates):
if s2_duplicates is None:
self._s2_duplicates = {}
elif isinstance(s2_duplicates, list):
self._s2_duplicates = self.__add_s2_duplicates(s2_duplicates)
else:
raise TypeError(f"{type(s2_duplicates)} is an invalid type for `s2_duplicates`")
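if __name__ == "__main__":
    # Hedged end-to-end sketch on a tiny synthetic Scopus-only frame
    # (hypothetical ids); exercises the Scopus-only path of run() and apply().
    demo = pd.DataFrame(
        {
            "eid": ["2-s2.0-1"],
            "author_ids": ["7004212"],
            "authors": ["Ada Lovelace"],
            "affiliations": [None],
            "year": [2020],
        }
    )
    orca = Orca(verbose=False)
    print(orca.run(demo))
    print(orca.apply(demo))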