import re
from typing import Optional, Dict, Any, List, Tuple
import pandas as pd
from pangaeapy import PanDataSet
from pybtex.database import Entry, Person
import bibtexparser
from doi2bib import crossref
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# DOI-like token: "10.", a 4-9 digit registrant code, "/", then every
# character up to the next whitespace. Trailing punctuation is therefore part
# of the raw match and has to be stripped by the caller.
_DOI_RE = re.compile(r"(10\.\d{4,9}/\S+)", re.IGNORECASE)
# Standalone four-digit year starting with 19 or 20 (i.e. 1900-2099).
_YEAR_RE = re.compile(r"\b(19|20)\d{2}\b")
def _extract_dois(s: Optional[str]) -> List[str]:
if not s:
return []
return [m.rstrip(".,);") for m in _DOI_RE.findall(s)]
def _extract_year(s: Optional[str]) -> Optional[str]:
if not s:
return None
m = _YEAR_RE.search(s)
return m.group(0) if m else None
def _make_citation_key(base: str, idx: int) -> str:
safe = base.replace("/", "_").replace(".", "_").replace(":", "_").replace(" ", "_")
return f"{safe}_{idx}"
def _split_authors(author_str: Optional[str]) -> List[Person]:
    """
    Split a free-form author string into pybtex ``Person`` objects.

    Parameters
    ----------
    author_str : str or None
        Author names separated by ``;``, ``,``, `` and `` or ``, and ``.

    Returns
    -------
    list of Person
        One ``Person`` per non-blank fragment, in input order. Empty when
        ``author_str`` is ``None`` or empty.
    """
    if not author_str:
        return []
    people = []
    for fragment in re.split(r";| and |, and |,", author_str):
        name = fragment.strip()
        # Consecutive separators produce blank fragments; skip them.
        if name:
            people.append(Person(name))
    return people
# (stray "[docs]" Sphinx source-link marker; not part of the module code)
class PangaeaStudy:
    """
    Utility class representing a single PANGAEA study.

    This class wraps a `pangaeapy.PanDataSet` instance that is created once
    in the constructor and provides:

    - Access to the data table with provenance stored in ``df.attrs``
    - NOAA-style summary normalization
    - Geographic extraction
    - Publication parsing (dataset citation, supplement and relations)
    - Funding and variable (parameter) tables

    Parameters
    ----------
    study_id : str
        DOI, URI, or identifier of the PANGAEA dataset.
    cache_dir : str or None, optional
        Directory for pangaeapy cache.
    auth_token : str or None, optional
        PANGAEA authentication token for restricted datasets.
    """

    # Parameter names / short names that denote an age column.
    _AGE_PATTERN = re.compile(r"\bage\b", re.IGNORECASE)
    # Age-like parameters that carry uncertainties or annotations, not ages.
    _AGE_EXCLUDE_PATTERN = re.compile(
        r"(error|std|deviation|uncertainty|dated|comment|\be\b)",
        re.IGNORECASE,
    )
    # Unit markers (matched against the lower-cased unit string).
    _CE_UNIT_PATTERN = re.compile(r"\b(ad|ce)\b")
    _BP_UNIT_PATTERN = re.compile(r"\bbp\b")
    # Calendar year used to convert between CE and BP.
    _BP_REFERENCE_YEAR = 1950

    _GEO_COLUMNS = [
        "StudyID",
        "SiteID",
        "SiteName",
        "LocationName",
        "MinLatitude",
        "MaxLatitude",
        "MinLongitude",
        "MaxLongitude",
        "Elevation",
    ]
    _FUNDING_COLUMNS = ["StudyID", "StudyName", "FundingAgency", "FundingGrant"]
    _VARIABLE_COLUMNS = [
        "StudyID",
        "VariableName",
        "ShortName",
        "Unit",
        "OntologyTerms",
    ]

    def __init__(
        self,
        study_id: str,
        cache_dir: Optional[str] = None,
        auth_token: Optional[str] = None,
    ):
        self.study_id = study_id
        self.cache_dir = cache_dir
        self.auth_token = auth_token
        self._panobj = PanDataSet(
            id=study_id,
            cachedir=cache_dir,
            auth_token=auth_token,
        )

    # ------------------------------------------------------------------
    # Data Handling
    # ------------------------------------------------------------------
    @staticmethod
    def _normalize_id(study_id: str) -> int:
        """
        Extract numeric PANGAEA ID from DOI or URI string.

        Examples
        --------
        'doi.pangaea.de/10.1594/PANGAEA.830587'
        → 830587

        Raises
        ------
        ValueError
            If the input contains no ``PANGAEA.<digits>`` part and is not
            itself convertible to ``int``.
        """
        match = re.search(r"PANGAEA\.(\d+)", str(study_id))
        if match:
            return int(match.group(1))
        # Fallback: assume already numeric
        return int(study_id)

    def get_data(self) -> pd.DataFrame:
        """
        Retrieve the dataset as a pandas DataFrame.

        Returns
        -------
        pandas.DataFrame
            Copy of the dataset table with metadata stored in ``df.attrs``
            (keys ``source``, ``StudyID``, ``DOI`` and ``Citation``).
        """
        df = self._panobj.data.copy()
        df.attrs["source"] = "PANGAEA"
        df.attrs["StudyID"] = self.study_id
        df.attrs["DOI"] = self._panobj.doi
        df.attrs["Citation"] = self._panobj.citation
        return df

    # ------------------------------------------------------------------
    # Summary Metadata
    # ------------------------------------------------------------------
    def _extract_temporal_extent(
        self,
    ) -> Tuple[Optional[int], Optional[int], Optional[int], Optional[int]]:
        """
        Extract temporal coverage from Age columns (CE and/or BP).

        1. Select parameters whose name or short name contains the word
           "age", skipping error/uncertainty/comment-style parameters and
           parameters that have no column in the data table.
        2. Pool the numeric values of columns with AD/CE units and of
           columns with BP units ("ka" BP values are multiplied by 1000).
        3. Compute CE bounds from the CE pool and BP bounds from the BP pool.
        4. If only one side is available, derive the other one using
           ``BP = 1950 - CE``.

        This is best-effort: any unexpected error is logged and whatever was
        determined so far (possibly all ``None``) is returned.

        Returns
        -------
        tuple
            (EarliestYearBP, MostRecentYearBP,
            EarliestYearCE, MostRecentYearCE)
            "Earliest" is the oldest point in time, i.e. the largest BP
            value and the smallest CE value.
        """
        earliest_bp = latest_bp = None
        earliest_ce = latest_ce = None
        ref_year = self._BP_REFERENCE_YEAR
        try:
            df = self._panobj.data
            params = self._panobj.params
            ce_values: List[float] = []
            bp_values: List[float] = []
            for col_name, param in params.items():
                name = param.name or ""
                short = param.shortName or ""
                unit = (param.unit or "").lower()
                is_age = self._AGE_PATTERN.search(name) or self._AGE_PATTERN.search(short)
                if not is_age:
                    continue
                if self._AGE_EXCLUDE_PATTERN.search(name) or self._AGE_EXCLUDE_PATTERN.search(short):
                    continue
                # A parameter without a matching column must not abort the
                # whole extraction; just skip it.
                if col_name not in df.columns:
                    continue
                series = pd.to_numeric(df[col_name], errors="coerce").dropna()
                if series.empty:
                    continue
                if self._CE_UNIT_PATTERN.search(unit):
                    ce_values.extend(series.tolist())
                if self._BP_UNIT_PATTERN.search(unit):
                    if "ka" in unit:
                        bp_values.extend((series * 1000).tolist())
                    else:
                        bp_values.extend(series.tolist())

            if ce_values:
                earliest_ce = int(min(ce_values))
                latest_ce = int(max(ce_values))
            if bp_values:
                # In BP: larger = older
                earliest_bp = int(max(bp_values))
                latest_bp = int(min(bp_values))

            # Derive the missing side. The oldest CE year maps to the
            # largest BP value, so "earliest" pairs with "earliest".
            if earliest_ce is not None and earliest_bp is None:
                earliest_bp = ref_year - earliest_ce
                latest_bp = ref_year - latest_ce
            if earliest_bp is not None and earliest_ce is None:
                earliest_ce = ref_year - earliest_bp
                latest_ce = ref_year - latest_bp
        except Exception:
            # Best-effort metadata: keep the summary usable, but leave a trace.
            logger.warning(
                "Could not extract temporal extent for study %s",
                getattr(self, "study_id", None),
                exc_info=True,
            )
        return earliest_bp, latest_bp, earliest_ce, latest_ce

    @staticmethod
    def _event_bounds(ev) -> Tuple[Optional[float], Optional[float], Optional[float], Optional[float]]:
        """
        Return (min_lat, max_lat, min_lon, max_lon) for a single event.

        ``latitude2``/``longitude2`` are optional; when they are missing or
        ``None`` the primary coordinate serves as both bounds. A bound is
        ``None`` when the event has no value for that axis at all.
        """
        lats = [v for v in (ev.latitude, getattr(ev, "latitude2", None)) if v is not None]
        lons = [v for v in (ev.longitude, getattr(ev, "longitude2", None)) if v is not None]
        return (
            min(lats) if lats else None,
            max(lats) if lats else None,
            min(lons) if lons else None,
            max(lons) if lons else None,
        )

    def _compute_coverage(self) -> Optional[Tuple[float, float, float, float]]:
        """
        Compute consolidated geographic coverage for the study events.

        The coverage is based on all event latitude/longitude pairs.
        If latitude2/longitude2 are not provided for an event, the single
        coordinate is reused for both bounds.

        Returns
        -------
        tuple or None
            (MinLatitude, MaxLatitude, MinLongitude, MaxLongitude)
            or None when no valid coordinates exist.
        """
        latitudes = []
        longitudes = []
        for ev in self._panobj.events:
            min_lat, max_lat, min_lon, max_lon = self._event_bounds(ev)
            if min_lat is not None:
                latitudes.extend((min_lat, max_lat))
            if min_lon is not None:
                longitudes.extend((min_lon, max_lon))
        if not latitudes or not longitudes:
            return None
        return (
            min(latitudes),
            max(latitudes),
            min(longitudes),
            max(longitudes),
        )

    def to_summary_dict(self) -> Dict[str, Any]:
        """
        Convert study metadata to NOAA-style summary dictionary.

        As a side effect the computed temporal bounds and coverage are stored
        on the instance (``earliest_bp``, ``latest_bp``, ``earliest_ce``,
        ``latest_ce``, ``coverage``).

        Returns
        -------
        dict
            Dictionary with standardized summary fields.
        """
        ds = self._panobj
        self.earliest_bp, self.latest_bp, self.earliest_ce, self.latest_ce = (
            self._extract_temporal_extent()
        )
        self.coverage = self._compute_coverage()

        if ds.isCollection and ds.collection_members:
            members = [self._normalize_id(m) for m in ds.collection_members]
        else:
            members = None

        return {
            "StudyID": self.study_id,
            "StudyName": ds.title,
            "EarliestYearBP": self.earliest_bp,
            "MostRecentYearBP": self.latest_bp,
            "EarliestYearCE": self.earliest_ce,
            "MostRecentYearCE": self.latest_ce,
            "Coverage [S, N, W, E]": self.coverage,
            "StudyNotes": ds.abstract,
            "ScienceKeywords": getattr(ds, "keywords", None),
            "Investigators": ", ".join(a.fullname for a in ds.authors),
            "Publications": ds.citation,
            "Sites": [e.label for e in ds.events],
            "Funding": [
                {"name": p.name, "url": p.URL, "award": p.awardURI}
                for p in ds.projects
            ],
            "CollectionMembers": members,
        }

    # ------------------------------------------------------------------
    # Geographic Information
    # ------------------------------------------------------------------
    def get_geo(self) -> pd.DataFrame:
        """
        Retrieve geographic metadata for study events.

        One row is produced per event. When the dataset has no events, a
        single row is built from the dataset-level ``geometryextent``
        (if present).

        Returns
        -------
        pandas.DataFrame
            DataFrame containing event-level geographic information with
            columns ``StudyID``, ``SiteID``, ``SiteName``, ``LocationName``,
            ``MinLatitude``, ``MaxLatitude``, ``MinLongitude``,
            ``MaxLongitude`` and ``Elevation``. The columns are present
            even when no geographic information is available.
        """
        rows = []
        for ev in self._panobj.events:
            min_lat, max_lat, min_lon, max_lon = self._event_bounds(ev)
            rows.append(
                {
                    "StudyID": self.study_id,
                    "SiteID": ev.id,
                    "SiteName": ev.label,
                    "LocationName": ev.location,
                    "MinLatitude": min_lat,
                    "MaxLatitude": max_lat,
                    "MinLongitude": min_lon,
                    "MaxLongitude": max_lon,
                    "Elevation": ev.elevation,
                }
            )

        # --------------------------------------------------
        # Fallback: dataset-level geometryextent
        # --------------------------------------------------
        if not rows:
            geo = getattr(self._panobj, "geometryextent", None)
            if geo:

                def _bound(key: str) -> Optional[float]:
                    value = geo.get(key)
                    return float(value) if value is not None else None

                rows.append(
                    {
                        "StudyID": self.study_id,
                        "SiteID": None,
                        "SiteName": None,
                        "LocationName": None,
                        "MinLatitude": _bound("southBoundLatitude"),
                        "MaxLatitude": _bound("northBoundLatitude"),
                        "MinLongitude": _bound("westBoundLongitude"),
                        "MaxLongitude": _bound("eastBoundLongitude"),
                        "Elevation": None,
                    }
                )
        return pd.DataFrame(rows, columns=self._GEO_COLUMNS)

    # ------------------------------------------------------------------
    # Publications
    # ------------------------------------------------------------------
    def _fetch_publication_from_doi(self, doi: str, type=None):  # noqa: A002
        """
        Fetch publication metadata from a DOI using Crossref.

        Parameters
        ----------
        doi : str
            DOI string; a ``doi:`` prefix and surrounding whitespace are
            removed before the lookup.
        type : object, optional
            Currently unused; accepted for interface compatibility. The
            ``"Type"`` field of the returned row is always ``None`` and is
            meant to be filled in by the caller.

        Returns
        -------
        tuple
            (row_dict, pybtex_entry)
            Returns (None, None) if retrieval or parsing fails.
        """
        if not doi or not isinstance(doi, str):
            return None, None

        doi_clean = doi.replace("doi:", "").strip()

        # Network access and third-party parsing can fail in many ways; this
        # method promises (None, None) instead of propagating those errors.
        try:
            status, citation = crossref.get_bib(doi=doi_clean)
            if not status or not citation:
                logger.warning(
                    "Failed to fetch bibtex for DOI %s, status: %s, citation: %s",
                    doi_clean,
                    status,
                    citation,
                )
                return None, None

            # Wrap a bare month value (e.g. ``month = jan``) in braces so
            # that it parses as a plain string value.
            citation = re.sub(
                r'month\s*=\s*(\w+)',
                r'month={\1}',
                citation
            )

            parser = bibtexparser.bparser.BibTexParser(common_strings=True)
            library = bibtexparser.loads(citation, parser=parser)
            if not library.entries:
                logger.warning("No entries parsed from bibtex for DOI %s", doi_clean)
                return None, None
            entry = library.entries[0]

            author_str = entry.get("author", "")
            persons = [
                Person(a.strip()) for a in author_str.split(" and ") if a.strip()
            ]
        except Exception:
            logger.exception("Error fetching publication for DOI %s", doi_clean)
            return None, None

        row = {
            "Author": author_str,
            "Title": entry.get("title", "").strip("{}"),
            "Journal": entry.get("journal", "").strip("{}"),
            "Year": entry.get("year", ""),
            "Volume": entry.get("volume", ""),
            "Number": entry.get("number", ""),
            "Pages": entry.get("pages", ""),
            "Type": None,  # assigned by the caller
            "DOI": doi_clean,
            "URL": entry.get("url", ""),
        }

        # pybtex keys persons by role; the role for authors is "author".
        non_field_keys = ("ENTRYTYPE", "ID", "author")
        bib_entry = Entry(
            entry.get("ENTRYTYPE", "article"),
            persons={"author": persons},
            fields={k: v for k, v in entry.items() if k not in non_field_keys},
        )
        if entry.get("ID"):
            bib_entry.key = entry.get("ID")
        return row, bib_entry

    def _extract_publications(self):
        """
        Extract structured publication information from PANGAEA dataset.

        Sources:
        1. Dataset citation (always included when the dataset has one)
        2. supplement_to (if DOI present)
        3. relations (if DOI present)

        A DOI that was already resolved successfully is not fetched again;
        the comparison ignores letter case.

        Returns
        -------
        tuple
            (list_of_rows, dict_of_bibtex_entries)
        """
        ds = self._panobj
        rows: List[Dict[str, Any]] = []
        bib_entries: Dict[str, Any] = {}
        seen_dois = set()
        idx = 0

        def _add_publication(doi, pub_type):
            """Resolve ``doi`` and record its row and BibTeX entry once."""
            nonlocal idx
            if not doi:
                return
            dedupe_key = doi.lower()
            if dedupe_key in seen_dois:
                return
            row, entry = self._fetch_publication_from_doi(doi, pub_type)
            if row:
                row["Type"] = pub_type
                rows.append(row)
            if entry is not None:
                # Fall back to a generated key when BibTeX carried none.
                key = entry.key or _make_citation_key(doi, idx)
                bib_entries[key] = entry
                seen_dois.add(dedupe_key)
                idx += 1

        # -------------------------------------------------------
        # 1. Dataset citation (ALWAYS INCLUDED)
        # -------------------------------------------------------
        citation = ds.citation
        if citation:
            citation_dois = _extract_dois(citation)
            rows.append(
                {
                    "Author": None,
                    "Title": citation,
                    "Journal": "Pangaea",
                    "Year": _extract_year(citation),
                    "Volume": None,
                    "Number": None,
                    "Pages": None,
                    "Type": "dataset_citation",
                    "DOI": citation_dois[0] if citation_dois else None,
                    "URL": None,
                }
            )

        # -------------------------------------------------------
        # 2. supplement_to
        # -------------------------------------------------------
        supp = getattr(ds, "supplement_to", None)
        if supp and supp.get("uri"):
            doi_list = _extract_dois(supp.get("uri"))
            if doi_list:
                _add_publication(doi_list[0], "supplement")

        # -------------------------------------------------------
        # 3. relations
        # -------------------------------------------------------
        for relation in getattr(ds, "relations", None) or []:
            uri = relation.get("uri")
            if not uri:
                continue
            rel_type = relation.get("type", "")
            # Only "related"-style relations are labelled; others keep None.
            pub_type = (
                "reference" if rel_type and "related" in rel_type.lower() else None
            )
            doi_list = _extract_dois(uri)
            if doi_list:
                _add_publication(doi_list[0], pub_type)

        return rows, bib_entries

    def get_funding(self) -> pd.DataFrame:
        """
        Retrieve funding information for this study.

        Returns
        -------
        pandas.DataFrame
            DataFrame with columns:
            ['StudyID', 'StudyName', 'FundingAgency', 'FundingGrant'].
            ``FundingAgency`` is the project's ``URL`` (or ``url``)
            attribute; ``FundingGrant`` joins the project's label and id
            with " / ", using whichever of the two is present.
            If no funding metadata is available,
            returns an empty DataFrame with columns preserved.
        """
        ds = self._panobj
        projects = getattr(ds, "projects", None)
        if projects and not isinstance(projects, (list, tuple)):
            # A single project object is treated like a one-element list.
            projects = [projects]

        rows = []
        for p in projects or []:
            grant_parts = [
                str(part)
                for part in (getattr(p, "label", None), getattr(p, "id", None))
                if part
            ]
            rows.append(
                {
                    "StudyID": self.study_id,
                    "StudyName": ds.title,
                    "FundingAgency": getattr(p, "URL", None)
                    or getattr(p, "url", None),
                    "FundingGrant": " / ".join(grant_parts) or None,
                }
            )
        return pd.DataFrame(rows, columns=self._FUNDING_COLUMNS)

    def get_variables(self) -> pd.DataFrame:
        """
        Retrieve variable (parameter) metadata for this study.

        Returns
        -------
        pandas.DataFrame
            One row per parameter with the following columns:
            - StudyID
            - VariableName
            - ShortName
            - Unit
            - OntologyTerms

        Notes
        -----
        For collection datasets, this returns an empty DataFrame
        (columns preserved).
        """
        ds = self._panobj
        rows = []
        # Collections do not contain parameters
        if not ds.isCollection:
            rows = [
                {
                    "StudyID": self.study_id,
                    "VariableName": param.name,
                    "ShortName": param.shortName,
                    "Unit": param.unit,
                    "OntologyTerms": param.terms,
                }
                for param in ds.params.values()
            ]
        return pd.DataFrame(rows, columns=self._VARIABLE_COLUMNS)