Source code for pyleotups.utils.PangaeaStudy

import re
from typing import Optional, Dict, Any, List, Tuple

import pandas as pd
from pangaeapy import PanDataSet
from pybtex.database import Entry, Person
import bibtexparser
from doi2bib import crossref

import logging
logger = logging.getLogger(__name__)

# DOI-like token: "10.", then 4-9 digits, "/", then a greedy run of
# non-whitespace characters. The whole token is capture group 1. Because the
# tail is `\S+`, trailing punctuation can be captured and must be stripped by
# the caller.
_DOI_RE = re.compile(r"(10\.\d{4,9}/\S+)", re.IGNORECASE)
# Standalone four-digit number starting with 19 or 20 (word-boundary delimited).
# Group 1 only holds the two leading digits, so use group(0) for the full year.
_YEAR_RE = re.compile(r"\b(19|20)\d{2}\b")


def _extract_dois(s: Optional[str]) -> List[str]:
    """
    Collect every DOI-like token found in a string.

    Parameters
    ----------
    s : str or None
        Free text (citation, URI, ...) that may contain DOIs.

    Returns
    -------
    list of str
        Matches of ``_DOI_RE`` in order of appearance, each with trailing
        ``.``, ``,``, ``)`` and ``;`` characters removed. Empty when ``s`` is
        falsy or nothing matches.
    """
    found: List[str] = []
    if s:
        for match in _DOI_RE.finditer(s):
            # Group 1 is the whole DOI token; the greedy tail may have
            # swallowed sentence punctuation.
            found.append(match.group(1).rstrip(".,);"))
    return found


def _extract_year(s: Optional[str]) -> Optional[str]:
    """
    Return the first four-digit year (19xx or 20xx) found in a string.

    Parameters
    ----------
    s : str or None
        Text to scan.

    Returns
    -------
    str or None
        The first match of ``_YEAR_RE`` as a string, or None when ``s`` is
        falsy or contains no such year.
    """
    match = _YEAR_RE.search(s) if s else None
    if match is None:
        return None
    # group(0) is the full year; group(1) would only be the "19"/"20" prefix.
    return match.group(0)


def _make_citation_key(base: str, idx: int) -> str:
    safe = base.replace("/", "_").replace(".", "_").replace(":", "_").replace(" ", "_")
    return f"{safe}_{idx}"


def _split_authors(author_str: Optional[str]) -> List[Person]:
    """
    Split a free-form author string into ``pybtex`` ``Person`` objects.

    The string is cut at ``;``, ``" and "``, ``", and "`` and ``,``. Each
    fragment is stripped of surrounding whitespace and empty fragments are
    dropped.

    Parameters
    ----------
    author_str : str or None
        Author list as a single string.

    Returns
    -------
    list of Person
        One ``Person`` per non-empty fragment; empty list for falsy input.
    """
    people: List[Person] = []
    if author_str:
        for fragment in re.split(r";| and |, and |,", author_str):
            name = fragment.strip()
            if name:
                people.append(Person(name))
    return people


class PangaeaStudy:
    """
    Utility class representing a single PANGAEA study.

    This class wraps a persistent `pangaeapy.PanDataSet` instance and provides:

    - Lazy data loading
    - NOAA-style summary normalization
    - Geographic extraction
    - Deep publication parsing (including supplement handling)

    Parameters
    ----------
    study_id : str
        DOI, URI, or identifier of the PANGAEA dataset.
    cache_dir : str or None, optional
        Directory for pangaeapy cache.
    auth_token : str or None, optional
        PANGAEA authentication token for restricted datasets.
    """

    # Calendar year used to convert between CE and BP ages (BP = 1950 - CE).
    _BP_REFERENCE_YEAR = 1950

    # Parameter names containing the standalone word "age" are age columns ...
    _AGE_PATTERN = re.compile(r"\bage\b", re.IGNORECASE)
    # ... unless they look like an uncertainty / annotation column.
    _AGE_EXCLUDE_PATTERN = re.compile(
        r"(error|std|deviation|uncertainty|dated|comment|\be\b)", re.IGNORECASE
    )
    # Unit matchers; applied to the lower-cased unit string.
    _CE_UNIT_PATTERN = re.compile(r"\b(ad|ce)\b")
    _BP_UNIT_PATTERN = re.compile(r"\bbp\b")

    # Column layouts shared by the empty and the populated result frames.
    _FUNDING_COLUMNS = ["StudyID", "StudyName", "FundingAgency", "FundingGrant"]
    _VARIABLE_COLUMNS = [
        "StudyID",
        "VariableName",
        "ShortName",
        "Unit",
        "OntologyTerms",
    ]

    def __init__(
        self,
        study_id: str,
        cache_dir: Optional[str] = None,
        auth_token: Optional[str] = None,
    ):
        self.study_id = study_id
        self.cache_dir = cache_dir
        self.auth_token = auth_token

        self._panobj = PanDataSet(
            id=study_id,
            cachedir=cache_dir,
            auth_token=auth_token,
        )

    # ------------------------------------------------------------------
    # Data Handling
    # ------------------------------------------------------------------

    @staticmethod
    def _normalize_id(study_id: str) -> int:
        """
        Extract numeric PANGAEA ID from DOI or URI string.

        Examples
        --------
        'doi.pangaea.de/10.1594/PANGAEA.830587' → 830587

        Raises
        ------
        ValueError
            If no ``PANGAEA.<digits>`` token is present and the input is not
            itself convertible to ``int``.
        """
        match = re.search(r"PANGAEA\.(\d+)", str(study_id))
        if match:
            return int(match.group(1))

        # Fallback: assume already numeric
        return int(study_id)

    def get_data(self) -> pd.DataFrame:
        """
        Retrieve the dataset as a pandas DataFrame.

        Returns
        -------
        pandas.DataFrame
            Copy of the dataset table with metadata stored in ``df.attrs``
            (keys ``source``, ``StudyID``, ``DOI`` and ``Citation``).
        """
        ds = self._panobj
        df = ds.data.copy()
        df.attrs["source"] = "PANGAEA"
        df.attrs["StudyID"] = self.study_id
        df.attrs["DOI"] = ds.doi
        df.attrs["Citation"] = ds.citation
        return df

    # ------------------------------------------------------------------
    # Summary Metadata
    # ------------------------------------------------------------------

    def _collect_age_values(self) -> Tuple[List[float], List[float]]:
        """Gather numeric values of all age columns, split by CE and BP units."""
        df = self._panobj.data
        params = self._panobj.params

        ce_values: List[float] = []
        bp_values: List[float] = []

        for col_name, param in params.items():
            name = param.name or ""
            short = param.shortName or ""
            unit = (param.unit or "").lower()

            # Only age columns (word-boundary safe) ...
            if not (self._AGE_PATTERN.search(name) or self._AGE_PATTERN.search(short)):
                continue

            # ... that are not uncertainty / error columns.
            if self._AGE_EXCLUDE_PATTERN.search(name) or self._AGE_EXCLUDE_PATTERN.search(short):
                continue

            # A parameter without a matching data column must not abort the
            # whole extraction; just ignore it.
            if col_name not in df.columns:
                continue

            series = pd.to_numeric(df[col_name], errors="coerce").dropna()
            if series.empty:
                continue

            if self._CE_UNIT_PATTERN.search(unit):
                ce_values.extend(series.tolist())

            if self._BP_UNIT_PATTERN.search(unit):
                # "ka BP" values are thousands of years; normalise to years.
                if "ka" in unit:
                    bp_values.extend((series * 1000).tolist())
                else:
                    bp_values.extend(series.tolist())

        return ce_values, bp_values

    def _extract_temporal_extent(
        self,
    ) -> Tuple[Optional[int], Optional[int], Optional[int], Optional[int]]:
        """
        Extract temporal coverage from Age columns (CE and/or BP).

        1. Collect all CE-type columns
        2. Collect all BP-type columns
        3. Compute CE from CE columns
        4. Compute BP from BP columns
        5. If one side is missing, derive it from the other (BP = 1950 - CE)

        A fallback to a ``Date/Time`` column is intentionally not performed.

        This is best-effort: any error while reading the data is logged and
        the values determined so far (possibly all None) are returned.

        Returns
        -------
        tuple
            (EarliestYearBP, MostRecentYearBP, EarliestYearCE, MostRecentYearCE)
        """
        earliest_bp = latest_bp = None
        earliest_ce = latest_ce = None
        reference = self._BP_REFERENCE_YEAR

        try:
            ce_values, bp_values = self._collect_age_values()

            if ce_values:
                earliest_ce = int(min(ce_values))
                latest_ce = int(max(ce_values))

            if bp_values:
                # In BP: larger = older
                earliest_bp = int(max(bp_values))
                latest_bp = int(min(bp_values))

            # Derive the missing side. Earliest maps to earliest and latest to
            # latest, so the earliest (oldest) BP is always the larger number.
            if earliest_ce is not None and earliest_bp is None:
                earliest_bp = reference - earliest_ce
                latest_bp = reference - latest_ce
            elif earliest_bp is not None and earliest_ce is None:
                earliest_ce = reference - earliest_bp
                latest_ce = reference - latest_bp

        except Exception:
            # Summary generation must not fail because of the temporal extent,
            # but the reason should be visible in the log.
            logger.warning(
                "Could not extract temporal extent for study %s",
                self.study_id,
                exc_info=True,
            )

        return earliest_bp, latest_bp, earliest_ce, latest_ce

    @staticmethod
    def _event_corners(ev):
        """Return (lat1, lat2, lon1, lon2); a missing second value reuses the first."""
        lat1 = ev.latitude
        lon1 = ev.longitude
        lat2 = getattr(ev, "latitude2", None)
        lon2 = getattr(ev, "longitude2", None)
        return (
            lat1,
            lat1 if lat2 is None else lat2,
            lon1,
            lon1 if lon2 is None else lon2,
        )

    @staticmethod
    def _span(first, second):
        """Return (min, max) of the non-None inputs, or (None, None) if both are None."""
        present = [v for v in (first, second) if v is not None]
        if not present:
            return None, None
        return min(present), max(present)

    def _compute_coverage(self) -> Optional[Tuple[float, float, float, float]]:
        """
        Compute consolidated geographic coverage for the study events.

        The coverage is based on all event latitude/longitude pairs. If
        latitude2/longitude2 are not provided for an event, the single
        coordinate is reused for both bounds.

        Returns
        -------
        tuple or None
            (MinLatitude, MaxLatitude, MinLongitude, MaxLongitude) or None
            when no valid coordinates exist.
        """
        latitudes = []
        longitudes = []

        for ev in self._panobj.events:
            lat1, lat2, lon1, lon2 = self._event_corners(ev)
            latitudes.extend(v for v in (lat1, lat2) if v is not None)
            longitudes.extend(v for v in (lon1, lon2) if v is not None)

        if not latitudes or not longitudes:
            return None

        return (
            min(latitudes),
            max(latitudes),
            min(longitudes),
            max(longitudes),
        )

    def to_summary_dict(self) -> Dict[str, Any]:
        """
        Convert study metadata to NOAA-style summary dictionary.

        As a side effect the computed temporal extent and coverage are stored
        on the instance (``earliest_bp``, ``latest_bp``, ``earliest_ce``,
        ``latest_ce``, ``coverage``).

        Returns
        -------
        dict
            Dictionary with standardized summary fields.
        """
        ds = self._panobj

        self.earliest_bp, self.latest_bp, self.earliest_ce, self.latest_ce = (
            self._extract_temporal_extent()
        )
        self.coverage = self._compute_coverage()

        return {
            "StudyID": self.study_id,
            "StudyName": ds.title,
            "EarliestYearBP": self.earliest_bp,
            "MostRecentYearBP": self.latest_bp,
            "EarliestYearCE": self.earliest_ce,
            "MostRecentYearCE": self.latest_ce,
            "Coverage [S, N, W, E]": self.coverage,
            "StudyNotes": ds.abstract,
            "ScienceKeywords": getattr(ds, "keywords", None),
            "Investigators": ", ".join(a.fullname for a in ds.authors),
            "Publications": ds.citation,
            "Sites": [e.label for e in ds.events],
            "Funding": [
                {"name": p.name, "url": p.URL, "award": p.awardURI}
                for p in ds.projects
            ],
            "CollectionMembers": (
                [self._normalize_id(m) for m in ds.collection_members]
                if ds.isCollection and ds.collection_members
                else None
            ),
        }

    # ------------------------------------------------------------------
    # Geographic Information
    # ------------------------------------------------------------------

    def get_geo(self) -> pd.DataFrame:
        """
        Retrieve geographic metadata for study events.

        One row is produced per event. When the dataset has no events, a
        single row is built from the dataset-level ``geometryextent`` (if
        available); otherwise the frame is empty.

        Returns
        -------
        pandas.DataFrame
            DataFrame containing event-level geographic information.
        """
        rows = []

        for ev in self._panobj.events:
            lat1, lat2, lon1, lon2 = self._event_corners(ev)
            min_lat, max_lat = self._span(lat1, lat2)
            min_lon, max_lon = self._span(lon1, lon2)

            rows.append(
                {
                    "StudyID": self.study_id,
                    "SiteID": ev.id,
                    "SiteName": ev.label,
                    "LocationName": ev.location,
                    "MinLatitude": min_lat,
                    "MaxLatitude": max_lat,
                    "MinLongitude": min_lon,
                    "MaxLongitude": max_lon,
                    "Elevation": ev.elevation,
                }
            )

        # --------------------------------------------------
        # Fallback: dataset-level geometryextent
        # --------------------------------------------------
        if not rows:
            geo = getattr(self._panobj, "geometryextent", None)
            if geo:

                def _bound(key):
                    value = geo.get(key)
                    return float(value) if value is not None else None

                rows.append(
                    {
                        "StudyID": self.study_id,
                        "SiteID": None,
                        "SiteName": None,
                        "LocationName": None,
                        "MinLatitude": _bound("southBoundLatitude"),
                        "MaxLatitude": _bound("northBoundLatitude"),
                        "MinLongitude": _bound("westBoundLongitude"),
                        "MaxLongitude": _bound("eastBoundLongitude"),
                        "Elevation": None,
                    }
                )

        return pd.DataFrame(rows)

    # ------------------------------------------------------------------
    # Publications
    # ------------------------------------------------------------------

    def _fetch_publication_from_doi(self, doi: str, type=None):
        """
        Fetch publication metadata from a DOI using Crossref.

        Parameters
        ----------
        doi : str
            DOI string (may carry a ``doi:`` prefix and surrounding whitespace).
        type : optional
            Unused here; the caller assigns the publication type to the
            returned row. Kept for call compatibility.

        Returns
        -------
        tuple
            (row_dict, pybtex_entry)

            Returns (None, None) if retrieval or parsing fails.
        """
        if not doi or not isinstance(doi, str):
            return None, None

        doi_clean = doi.replace("doi:", "").strip()

        try:
            status, citation = crossref.get_bib(doi=doi_clean)

            if not status or not citation:
                logger.warning(
                    "Failed to fetch bibtex for DOI %s, status: %s, citation: %s",
                    doi_clean,
                    status,
                    citation,
                )
                return None, None

            # Brace a bare month value (e.g. ``month=jan``) before parsing.
            citation = re.sub(r'month\s*=\s*(\w+)', r'month={\1}', citation)

            parser = bibtexparser.bparser.BibTexParser(common_strings=True)
            library = bibtexparser.loads(citation, parser=parser)

            if not library.entries:
                logger.warning("No entries parsed from bibtex for DOI %s", doi_clean)
                return None, None

            entry = library.entries[0]

            author_str = entry.get("author", "")
            persons = []
            if author_str:
                persons = [
                    Person(a.strip()) for a in author_str.split(" and ") if a.strip()
                ]

            row = {
                "Author": author_str,
                "Title": entry.get("title", "").strip("{}"),
                "Journal": entry.get("journal", "").strip("{}"),
                "Year": entry.get("year", ""),
                "Volume": entry.get("volume", ""),
                "Number": entry.get("number", ""),
                "Pages": entry.get("pages", ""),
                "Type": None,  # assigned by the caller
                "DOI": doi_clean,
                "URL": entry.get("url", ""),
            }

            # NOTE(review): the persons role key is "authors"; pybtex's usual
            # role name appears to be "author". Left unchanged because
            # consumers may read this key. Confirm.
            bib_entry = Entry(
                entry.get("ENTRYTYPE", "article"),
                persons={"authors": persons},
                fields={
                    k: v
                    for k, v in entry.items()
                    if k not in ["ENTRYTYPE", "ID", "author"]
                },
            )

            if entry.get("ID"):
                bib_entry.key = entry.get("ID")

            return row, bib_entry

        except Exception as e:
            # One unreachable or malformed DOI must not abort the extraction
            # of the remaining publications.
            logger.error("Error fetching publication for DOI %s: %s", doi, str(e))
            return None, None

    def _extract_publications(self):
        """
        Extract structured publication information from PANGAEA dataset.

        Sources:
            1. Dataset citation (always included)
            2. supplement_to (if DOI present)
            3. relations (if DOI present)

        Each DOI from sources 2 and 3 is looked up at most once.

        Returns
        -------
        tuple
            (list_of_rows, dict_of_bibtex_entries)
        """
        ds = self._panobj

        rows = []
        bib_entries = {}
        idx = 0
        seen_dois = set()

        def _add_publication(doi, pub_type):
            """Look up one DOI and record its row and bibtex entry."""
            nonlocal idx

            if not doi or doi in seen_dois:
                return

            row, entry = self._fetch_publication_from_doi(doi, pub_type)

            if row:
                row["Type"] = pub_type
                rows.append(row)

            if entry:
                key = entry.key if entry.key else _make_citation_key(doi, idx)
                bib_entries[key] = entry

            seen_dois.add(doi)
            idx += 1

        # -------------------------------------------------------
        # 1. Dataset citation (ALWAYS INCLUDED)
        # -------------------------------------------------------
        citation = ds.citation
        if citation:
            citation_dois = _extract_dois(citation)
            rows.append(
                {
                    "Author": None,
                    "Title": citation,
                    "Journal": "Pangaea",
                    "Year": _extract_year(citation),
                    "Volume": None,
                    "Number": None,
                    "Pages": None,
                    "Type": "dataset_citation",
                    "DOI": citation_dois[0] if citation_dois else None,
                    "URL": None,
                }
            )

        # -------------------------------------------------------
        # 2. supplement_to
        # -------------------------------------------------------
        supp = getattr(ds, "supplement_to", None)
        if supp and supp.get("uri"):
            doi_list = _extract_dois(supp.get("uri"))
            if doi_list:
                _add_publication(doi_list[0], "supplement")

        # -------------------------------------------------------
        # 3. relations
        # -------------------------------------------------------
        for relation in getattr(ds, "relations", None) or []:
            uri = relation.get("uri")
            if not uri:
                continue

            doi_list = _extract_dois(uri)
            if not doi_list:
                continue

            rel_type = relation.get("type", "")
            pub_type = (
                "reference" if rel_type and "related" in rel_type.lower() else None
            )
            _add_publication(doi_list[0], pub_type)

        return rows, bib_entries

    def get_funding(self) -> pd.DataFrame:
        """
        Retrieve funding information for this study.

        Returns
        -------
        pandas.DataFrame
            DataFrame with columns:
            ['StudyID', 'StudyName', 'FundingAgency', 'FundingGrant'].

            ``FundingGrant`` is the project label and id joined by ``" / "``
            (whichever of the two are present), or None when neither is.

            If no funding metadata is available, returns an empty DataFrame
            with columns preserved.
        """
        ds = self._panobj
        rows = []

        projects = getattr(ds, "projects", None)
        if projects:
            if not isinstance(projects, (list, tuple)):
                projects = [projects]

            for p in projects:
                grant_parts = [
                    str(part)
                    for part in (getattr(p, "label", None), getattr(p, "id", None))
                    if part
                ]
                rows.append(
                    {
                        "StudyID": self.study_id,
                        "StudyName": ds.title,
                        "FundingAgency": getattr(p, "URL", None)
                        or getattr(p, "url", None),
                        # Joining avoids a dangling " / " when the label is missing.
                        "FundingGrant": " / ".join(grant_parts) or None,
                    }
                )

        return pd.DataFrame(rows, columns=self._FUNDING_COLUMNS)

    def get_variables(self) -> pd.DataFrame:
        """
        Retrieve variable (parameter) metadata for this study.

        Returns
        -------
        pandas.DataFrame
            One row per parameter with the following columns:

            - StudyID
            - VariableName
            - ShortName
            - Unit
            - OntologyTerms

        Notes
        -----
        For collection datasets, this returns an empty DataFrame. The column
        layout is the same whether or not any rows are present.
        """
        ds = self._panobj

        # Collections do not contain parameters
        if ds.isCollection:
            return pd.DataFrame(columns=self._VARIABLE_COLUMNS)

        rows = [
            {
                "StudyID": self.study_id,
                "VariableName": param.name,
                "ShortName": param.shortName,
                "Unit": param.unit,
                "OntologyTerms": param.terms,
            }
            for param in ds.params.values()
        ]

        return pd.DataFrame(rows, columns=self._VARIABLE_COLUMNS)