from .BaseDataset import BaseDataset
from typing import Any, Dict, List, Optional, Tuple, Union
import logging
import io
import os
import pandas as pd
import requests
import warnings
import datetime
from pybtex.database import BibliographyData, Entry, Person
from pybtex.database.output.bibtex import Writer
from pangaeapy.pandataset import PanDataSet, PanEvent
from ..utils.PangaeaStudy import PangaeaStudy
from ..utils.api.query_builder import build_pangaea_query
# Raise the threshold of the third-party "pangaeapy" logger to ERROR and
# create this module's own logger.
logging.getLogger("pangaeapy").setLevel(logging.ERROR)
logger = logging.getLogger(__name__)

# Optional-dependency guard. On failure the public names are bound to None
# and the exception is kept so PangaeaDataset.__init__ can report it.
# _PANGAEA_IMPORT_ERROR is bound unconditionally so the name always exists,
# including when the import succeeds.
_PANGAEA_IMPORT_ERROR = None
try:
    from pangaeapy import PanQuery, PanDataSet
except Exception as exc:
    PanQuery = None
    PanDataSet = None
    _PANGAEA_IMPORT_ERROR = exc

import re

# "10." + 4-9 digits + "/" + a run of non-whitespace characters.
_DOI_RE = re.compile(r'(10\.\d{4,9}/\S+)', re.IGNORECASE)
# A standalone four-digit number beginning with 19 or 20.
_YEAR_RE = re.compile(r'\b(19|20)\d{2}\b')
[docs]
class PangaeaDataset(BaseDataset):
"""
PangaeaDataset: lightweight provider that mirrors pyleotups.core.Dataset responses.
Notes:
- search_studies(...) clears self.studies, registers the matching studies
in self.studies (StudyID -> PangaeaStudy) and returns get_summary().
- get_summary() returns a pandas.DataFrame with one row per registered study, built from PangaeaStudy.to_summary_dict(); it is intended to match the NOAA Dataset summary column names.
- get_geo() and get_funding() return DataFrames concatenated across the registered studies; get_publications() returns a (BibliographyData, DataFrame) tuple.
Missing data produces empty DataFrames / NaNs.
- get_data(study_id) returns a list holding the result of PangaeaStudy.get_data() for each requested non-collection study.
This file intentionally mirrors the structure used in NOAADataset.py and NOAAStudy.py
"""
def __init__(self, cache_dir: Optional[str] = None, auth_token: Optional[str] = None):
    """
    Create an empty dataset after checking that pangaeapy was imported.

    Parameters
    ----------
    cache_dir : str, optional
        Stored on the instance and forwarded to every ``PangaeaStudy``
        this dataset creates.
    auth_token : str, optional
        Stored on the instance and forwarded to every ``PangaeaStudy``
        this dataset creates.

    Raises
    ------
    ImportError
        If the module-level import of ``PanQuery`` / ``PanDataSet`` failed.
    """
    pangaeapy_ready = PanQuery is not None and PanDataSet is not None
    if not pangaeapy_ready:
        message = (
            "pangaeapy is required. Install via `pip install pangaeapy`.\n"
            f"Import error: {_PANGAEA_IMPORT_ERROR}"
        )
        raise ImportError(message)

    # Registry of loaded studies: StudyID -> PangaeaStudy.
    # NOTE(review): the IDs stored by this class come from _normalize_id(),
    # which returns int, so the ``str`` key annotation looks stale - confirm.
    self.studies: Dict[str, PangaeaStudy] = {}
    self.auth_token = auth_token
    self.cache_dir = cache_dir
def __add__(self, other):
    """
    Return a new dataset holding the union of both study registries.

    The result is created with the left operand's ``cache_dir`` and
    ``auth_token``. When a StudyID exists on both sides the left-hand
    study is kept; if the two summaries differ (or comparing them raises),
    a warning is logged. Returns ``NotImplemented`` for operands that are
    not ``PangaeaDataset`` instances.
    """
    if not isinstance(other, PangaeaDataset):
        return NotImplemented

    union = PangaeaDataset(cache_dir=self.cache_dir, auth_token=self.auth_token)
    # Shallow copy: the PangaeaStudy objects themselves are shared.
    union.studies = dict(self.studies)

    for sid, incoming in other.studies.items():
        if sid not in union.studies:
            union.studies[sid] = incoming
            continue

        # Duplicate ID: left wins, but report when the content disagrees.
        try:
            identical = (union.studies[sid].to_summary_dict() == incoming.to_summary_dict())
        except Exception:
            identical = False
        if identical:
            continue
        logger.warning(
            f"PangaeaDataset union: duplicate StudyID {sid} with differing content. "
            "Keeping left-hand version."
        )

    return union
def __iadd__(self, other):
    """
    Merge ``other``'s studies into this dataset in place and return ``self``.

    Studies whose ID is already registered are left untouched; a warning
    is logged when the existing and incoming summaries differ (or when
    comparing them raises). Returns ``NotImplemented`` for operands that
    are not ``PangaeaDataset`` instances.
    """
    if not isinstance(other, PangaeaDataset):
        return NotImplemented

    for sid, incoming in other.studies.items():
        if sid not in self.studies:
            self.studies[sid] = incoming
            continue

        # Duplicate ID: keep ours, but report when the content disagrees.
        try:
            identical = (self.studies[sid].to_summary_dict() == incoming.to_summary_dict())
        except Exception:
            identical = False
        if identical:
            continue
        logger.warning(
            f"PangaeaDataset in-place union: duplicate StudyID {sid} with differing content. "
            "Keeping existing version."
        )

    return self
@staticmethod
def _normalize_id(study_id: str) -> int:
    """
    Reduce a PANGAEA identifier to its numeric ID.

    The digits following ``PANGAEA.`` are used when that pattern occurs
    anywhere in ``str(study_id)``; otherwise ``study_id`` itself is passed
    to ``int()`` and any exception from that conversion propagates.

    Examples
    --------
    'doi.pangaea.de/10.1594/PANGAEA.830587'
    → 830587
    """
    found = re.search(r"PANGAEA\.(\d+)", str(study_id))
    if found is None:
        # No DOI/URI pattern: treat the input as an already numeric ID.
        return int(study_id)
    return int(found.group(1))
def _resolve_and_register_ids(self, study_ids):
    """
    Normalize the given IDs and make sure each one is registered.

    An ID that is missing from ``self.studies`` is loaded as a new
    ``PangaeaStudy``. Whether it was found among the collection members
    of an already registered study only changes the log message.

    Parameters
    ----------
    study_ids : int, str, or list
        One or more StudyIDs (numeric or DOI string). Anything that is not
        a list or tuple is treated as a single ID.

    Returns
    -------
    list
        Normalized numeric StudyIDs, in input order.
    """
    if not isinstance(study_ids, (list, tuple)):
        study_ids = [study_ids]
    normalized_ids = [self._normalize_id(raw) for raw in study_ids]

    for sid in normalized_ids:
        if sid in self.studies:
            continue

        # Is this ID listed as a child of a collection we already hold?
        via_collection = False
        for parent in self.studies.values():
            members = parent._panobj.collection_members
            if not members:
                continue
            member_ids = [self._normalize_id(m) for m in members]
            if sid in member_ids:
                via_collection = True
                break

        if via_collection:
            logger.info(
                f"Study {sid} found as collection member. "
                f"Registering child dataset."
            )
        else:
            logger.info(
                f"Registering Study {sid} via direct lookup."
            )
        self.studies[sid] = PangaeaStudy(
            study_id=sid,
            cache_dir=self.cache_dir,
            auth_token=self.auth_token,
        )

    return normalized_ids
# -------------------------
# search_studies: study IDs or text/investigator/variable/bounding-box filters -> registers studies and returns the summary DataFrame (same style as Dataset.search_studies)
# -------------------------
[docs]
def search_studies(
    self,
    study_ids: int | str | list[int | str] | None = None,
    topic: str | list[str] | None = None,
    topic_and_or: str = "or",
    search_text: str | None = None,
    investigators: str | list[str] | None = None,
    investigators_and_or: str = "or",
    variable_name: str | list[str] | None = None,
    variable_name_and_or: str = "or",
    min_lat: float | None = None,
    max_lat: float | None = None,
    min_lon: float | None = None,
    max_lon: float | None = None,
    limit: int = 100,
    skip: int = 0,
) -> Optional[pd.DataFrame]:
    """
    Search PANGAEA, register the results and return their summary.

    Every call first clears ``self.studies`` (including calls that go on
    to raise ``ValueError``), then registers one ``PangaeaStudy`` per
    matched dataset and returns ``self.get_summary()``.

    The unified PyleoTUPS query parameters are translated into a
    PANGAEA-compatible query by ``build_pangaea_query`` and executed with
    ``pangaeapy.PanQuery``.

    Parameters
    ----------
    study_ids : int, str, or list, optional
        One or more PANGAEA dataset identifiers (numeric ID or DOI string).
        If provided, performs direct lookup and ignores other filters.
    topic : str or list[str], optional
        Filter datasets by PANGAEA topic classification.
        Must be one of the predefined topics:

        - "all" (default)
        - "Agriculture"
        - "Atmosphere"
        - "Biological Classification"
        - "Biosphere"
        - "Chemistry"
        - "Cryosphere"
        - "Ecology"
        - "Fisheries"
        - "Geophysics"
        - "Human Dimensions"
        - "Lakes & Rivers"
        - "Land Surface"
        - "Lithosphere"
        - "Oceans"
        - "Paleontology"

        If set to "all" or omitted, no topic filtering is applied.
        If multiple values are provided, they are combined using a logical
        operator controlled by ``topic_and_or``.
        ``topic`` on its own does not count as a search parameter: at
        least one of the text or bounding-box filters below is required.
    search_text : str, optional
        Free-text search query applied across dataset metadata. Maps to
        PANGAEA full-text search parameter 'q'.
        Example: 'stable carbon and oxygen isotopes'.
    investigators : str or list[str], optional
        Author names. Mapped internally to PANGAEA query syntax:
        ``author:<name>``
    variable_name : str or list[str], optional
        Name of parameters/variables (columns) present in dataset tables.
        Internally mapped to PANGAEA query term:
        ``parameter:<variable_name>``
    min_lat, max_lat : float, optional
        Latitude bounds (–90..90). A value of ``0`` is a valid bound.
    min_lon, max_lon : float, optional
        Longitude bounds (–180..180). A value of ``0`` is a valid bound.
    limit : int, default 100
        Maximum number of results returned. The original documentation
        states a maximum of 500; no cap is enforced in this method.
    skip : int, default 0
        Number of results to skip (pagination). Maps to PANGAEA 'offset'.
    topic_and_or : {"and", "or"}, optional, default="or"
        Logical operator used to combine multiple ``topic`` values.

        - ``"or"`` (default): Matches datasets containing any of the provided topics.
        - ``"and"``: Matches datasets containing all provided topics.
    investigators_and_or : {"and", "or"}, optional, default="or"
        Logical operator used to combine multiple investigator names.

        - ``"or"`` (default): Matches datasets authored by any of the investigators.
        - ``"and"``: Matches datasets authored by all listed investigators.
    variable_name_and_or : {"and", "or"}, optional, default="or"
        Logical operator used to combine multiple variable names.

    Returns
    -------
    pandas.DataFrame
        DataFrame summarizing matched datasets. Also populates internal registry.

    Raises
    ------
    ValueError
        If ``study_ids`` is not given and none of ``search_text``,
        ``investigators``, ``variable_name`` or a bounding-box value is
        provided.
    Exception
        Any error raised while building or running the ``PanQuery`` is
        logged and re-raised unchanged.

    Notes
    -----
    PANGAEA search is text-based and less structured than NOAA filters.
    Results may vary depending on metadata completeness.

    Unified query interface.
        PyleoTUPS uses consistent parameter names across datasets:

        - ``variable_name`` → mapped to ``parameter:`` in PANGAEA
        - ``investigators`` → mapped to ``author:``

    Query construction.
        The query string is assembled by ``build_pangaea_query`` from the
        arguments of this call (search_text, investigators, variable_name,
        topic).

    Geospatial filtering.
        Bounding box requires all four parameters:
        ``min_lat, max_lat, min_lon, max_lon``.
        Partial inputs are ignored.
        Elevation parameters are not currently exposed through the PangaeaPy
        object or the PANGAEA advanced search endpoint. Hence, geographical
        queries are presently limited to 2D horizontal coordinates (lat/lon).

    Identifier priority.
        If ``study_ids`` is provided, all other filters are ignored and a
        warning is logged when any were supplied.

    Multi-value parameters.
        When multiple values are provided (e.g., for ``variable_name``,
        ``investigators``, or ``topic``), they are combined using a
        configurable logical operator (``*_and_or``).
        By default, PyleoTUPS uses OR semantics (union of matches), even
        though PANGAEA interprets space-separated terms as AND.

    Examples
    --------
    Quick Start - Identifier Based search
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

    .. jupyter-execute::

        import pyleotups as pt
        ds = pt.PangaeaDataset()
        ### Can use either DOI strings or numeric IDs (extracted from DOIs)
        df = ds.search_studies(
            study_ids=["10.1594/PANGAEA.830587", "10.1594/PANGAEA.830588"]
        )
        df.head()
        df = ds.search_studies(
            study_ids=[830587, 830588]
        )
        df.head()

    Basic search
    ^^^^^^^^^^^^

    .. jupyter-execute::

        df = ds.search_studies(search_text="Stable oxygen and carbon isotopes", limit = 5)
        df.head()

    Variable-based search
    ^^^^^^^^^^^^^^^^^^^^^

    .. jupyter-execute::

        df = ds.search_studies(variable_name=["Pulleniatina obliquiloculata δ13C", "Pulleniatina obliquiloculata δ18O"], limit = 5)
        df.head()

    Investigator/Author-based search
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

    .. jupyter-execute::

        df = ds.search_studies(investigators=["Khider, D"], limit = 5)
        df.head()

    Combined filters
    ^^^^^^^^^^^^^^^^

    .. jupyter-execute::

        df = ds.search_studies(
            search_text="Stable oxygen and carbon isotopes",
            variable_name=["Pulleniatina obliquiloculata δ13C", "Pulleniatina obliquiloculata δ18O"],
            investigators="Khider, D",
            limit = 5
        )
        df.head()

    Geographic filtering
    ^^^^^^^^^^^^^^^^^^^^

    .. jupyter-execute::

        df = ds.search_studies(
            min_lat=-10, max_lat=10,
            min_lon=120, max_lon=160
        )
        df.head()
    """
    # Snapshot the call arguments before any other local is bound, so the
    # mapping forwarded to build_pangaea_query holds exactly the public
    # parameters of this method.
    kwargs = locals().copy()
    kwargs.pop("self")

    self.studies.clear()

    # Text-like filters are "given" when truthy; coordinates are "given"
    # when not None, because 0 is a legitimate latitude/longitude.
    has_filters = (
        any((search_text, investigators, variable_name))
        or any(bound is not None for bound in (min_lat, max_lat, min_lon, max_lon))
    )

    # -------------------------------------------
    # MODE 1: STUDY IDS (HIGHEST PRIORITY)
    # -------------------------------------------
    if study_ids is not None:
        if has_filters:
            logger.warning(
                "Using identifier-only fetch (Pangaea DOI). Other parameters will be ignored.."
            )
        self._resolve_and_register_ids(study_ids)
        logger.info(f"Retrived {len(self.studies)} studies")
        return self.get_summary()

    # -------------------------------------------
    # MODE 2: FILTER-BASED SEARCH
    # -------------------------------------------
    if not has_filters:
        raise ValueError(
            "At least one search parameter must be specified to initiate a query. "
            "To view available parameters and usage examples, run: help(PangaeaDataset.search_studies)"
        )

    params = build_pangaea_query(**kwargs)
    # Library code should not write to stdout; keep the diagnostic at DEBUG.
    logger.debug("PANGAEA query parameters: %s", params)

    try:
        pq = PanQuery(
            query=params["q"],
            bbox=params["bbox"],
            limit=params["limit"],
            offset=params["offset"],
        )
    except Exception as exc:
        logger.exception(f"PanQuery failed due to {exc}")
        raise

    # Register each hit; the summary DataFrame is built by get_summary().
    for res in pq.result:
        raw_id = res.get("URI") or res.get("uri") or res.get("id") or res.get("doi") or None
        if raw_id is None:
            # No identifier at all: fall back to the title or a placeholder.
            # This must be decided before normalizing, since
            # _normalize_id(None) cannot succeed.
            sid = res.get("title") or f"pangaea_unidentified_{len(self.studies) + 1}"
        else:
            sid = self._normalize_id(raw_id)

        if sid not in self.studies:
            self.studies[sid] = PangaeaStudy(
                study_id=sid,
                cache_dir=self.cache_dir,
                auth_token=self.auth_token,
            )

    logger.info(f"Retrived {len(self.studies)} studies")
    return self.get_summary()
# -------------------------
# get_summary(): returns DataFrame of ALL registered studies (same shape as Dataset.get_summary())
# -------------------------
[docs]
def get_summary(self) -> pd.DataFrame:
    """
    Retrieve summary metadata for all registered studies.

    One row is produced per study from ``PangaeaStudy.to_summary_dict()``.
    If any study is flagged as a collection, a single warning listing the
    affected StudyIDs is logged.

    Returns
    -------
    pandas.DataFrame
        One row per registered study; empty when nothing is registered.
        The columns are whatever ``to_summary_dict()`` emits. The original
        documentation of this method lists:
        ["StudyID","StudyName","EarliestYearBP","MostRecentYearBP",
        "EarliestYearCE","MostRecentYearCE","Coverage [S, N, W, E]",
        "StudyNotes","ScienceKeywords","Investigators",
        "Publications","Sites","Funding"]
    """
    rows = []
    collection_found = []
    for study in self.studies.values():
        if study._panobj.isCollection:
            collection_found.append(study.study_id)
        rows.append(study.to_summary_dict())

    if collection_found:
        # Adjacent literals are concatenated verbatim, so each fragment
        # must carry its own trailing space.
        logger.warning(
            f"The search contains dataset(s) [{', '.join(map(str, collection_found))}] marked as collection. "
            "Refer to the 'CollectionMembers' column to "
            "identify respective child datasets."
        )
    return pd.DataFrame(rows)
# -------------------------
# get_geo(): per-site DataFrame like Dataset.get_geo()
# -------------------------
[docs]
def get_geo(self) -> pd.DataFrame:
    """
    Combine the geographic metadata of every registered study.

    Returns
    -------
    pandas.DataFrame
        The per-study ``get_geo()`` frames concatenated with a fresh
        0..n-1 index, or an empty DataFrame when no study is registered.
        The original documentation of this method lists the columns
        ['StudyID','SiteID','SiteName','LocationName','Latitude','Longitude','MinElevation','MaxElevation','DataType'].
    """
    per_study = []
    for study in self.studies.values():
        per_study.append(study.get_geo())

    if not per_study:
        return pd.DataFrame()
    return pd.concat(per_study, ignore_index=True)
# -------------------------
# get_publications(): aggregated DataFrame per study
# -------------------------
[docs]
def get_publications(
    self,
    save: bool = False,
    path: Optional[str] = None,
    verbose: bool = False,
):
    """
    Retrieve publication information for all registered PANGAEA studies.

    Each study's ``PangaeaStudy._extract_publications()`` supplies a list
    of row dictionaries and a mapping of BibTeX entries. The rows are
    gathered into one DataFrame and the entries into a single
    ``pybtex.database.BibliographyData``. Every entry key receives a
    running ``_<n>`` suffix, so keys coming from different studies cannot
    collide.

    Parameters
    ----------
    save : bool, optional, default=False
        If True, saves the BibTeX output to disk.
    path : str or None, optional
        Output file path or directory. With no path, a timestamped
        ``bibtex_<YYYYmmdd_HHMM>.bib`` is written to the working directory.
        If an existing directory is given, a timestamped
        ``pangaea_publications_<YYYYmmdd_HHMM>.bib`` is created inside it.
    verbose : bool, optional, default=False
        If True, prints the BibTeX content to stdout.

    Returns
    -------
    tuple
        ``(BibliographyData, pandas.DataFrame)``:

        - ``BibliographyData`` : all publication entries, suitable for
          BibTeX export.
        - ``pandas.DataFrame`` : tabular representation of the same
          publications. The original documentation of this method lists
          the columns ``Author``, ``Title``, ``Journal``, ``Year``,
          ``Volume``, ``Number``, ``Pages``, ``Type`` (one of
          {"citation", "supplement to", relation type}), ``DOI`` and
          ``URL``; fields other than ``Title`` and ``Type`` may be None.

    Notes
    -----
    The extraction itself happens at study level. As described in the
    original documentation of this method, publications come from three
    metadata sources of each ``PanDataSet``:

    1. **Dataset citation (primary source)** - taken from
       ``PanDataSet.citation`` without any external API call. The
       citation string becomes ``Title``, the ``Year`` is extracted by
       regex, other structured fields may be unavailable, and
       ``Type = "citation"``. It is always included.
    2. **Supplementary publications** - taken from
       ``PanDataSet.supplement_to["uri"]``; if the URI contains a valid
       DOI, metadata is fetched via Crossref. ``Type = "supplement to"``.
    3. **Related publications** - taken from ``PanDataSet.relations``
       (each expected to carry ``{"id", "title", "uri", "type"}``). Only
       relations with DOI-containing URIs are processed, via Crossref.
       ``Type = relation["type"]``.

    Crossref lookups (``doi2bib.crossref.get_bib()`` parsed with
    ``bibtexparser`` inside ``_fetch_publication_from_doi()``) are only
    performed for supplement and relation DOIs; dataset citations are
    intentionally excluded to avoid inconsistencies and API failures.
    Duplicate DOIs across citation, supplement and relations are removed
    within each study.

    See Also
    --------
    PangaeaStudy._extract_publications : Study-level publication extraction logic
    _fetch_publication_from_doi : DOI-based metadata retrieval helper
    """
    collected_rows = []
    collected_entries = {}
    counter = 0
    for study in self.studies.values():
        rows, entries = study._extract_publications()
        collected_rows.extend(rows)
        for k, entry in entries.items():
            # Running suffix keeps keys unique across studies.
            counter += 1
            collected_entries[f"{k}_{counter}"] = entry

    df = pd.DataFrame(collected_rows)
    bibs = BibliographyData(entries=collected_entries)

    if save:
        if not path:
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M")
            path = f"bibtex_{timestamp}.bib"
        if os.path.isdir(path):
            filename = f"pangaea_publications_{datetime.datetime.now().strftime('%Y%m%d_%H%M')}.bib"
            path = os.path.join(path, filename)
        with open(path, "w", encoding="utf-8") as fh:
            Writer().write_stream(bibs, fh)

    if verbose:
        buffer = io.StringIO()
        Writer().write_stream(bibs, buffer)
        print(buffer.getvalue())

    return bibs, df
# -------------------------
# get_funding(): aggregated funding across studies
# -------------------------
[docs]
def get_funding(self) -> pd.DataFrame:
    """
    Combine the funding information of every registered study.

    Returns
    -------
    pandas.DataFrame
        The non-empty per-study ``get_funding()`` frames concatenated with
        a fresh 0..n-1 index. When no study yields funding rows, an empty
        DataFrame with the columns ``StudyID``, ``StudyName``,
        ``FundingAgency`` and ``FundingGrant`` is returned.
    """
    per_study = [study.get_funding() for study in self.studies.values()]
    with_rows = [frame for frame in per_study if not frame.empty]

    if with_rows:
        return pd.concat(with_rows, ignore_index=True)

    standard_columns = ["StudyID", "StudyName", "FundingAgency", "FundingGrant"]
    return pd.DataFrame(columns=standard_columns)
# -------------------------
# get_data(study_id): numeric ID(s) or DOI string(s) -> list of parsed tables, one per non-collection study
# -------------------------
[docs]
def get_data(self, study_id: int) -> pd.DataFrame:
"""
Retrieve dataset for a specific study.
If the study is a collection, a warning is logged
suggesting access to its collection members.
If the study is not registered but exists as a
collection member of a registered study, it will
be automatically loaded and registered.
Parameters
----------
study_id : int, str, or list
One or more StudyIDs.
Returns
-------
pandas.DataFrame or dict
If single ID → DataFrame.
If multiple IDs → dict of {StudyID: DataFrame}.
"""
normalized_ids = self._resolve_and_register_ids(study_id)
results = []
for sid in normalized_ids:
study = self.studies[sid]
if study._panobj.isCollection:
logger.warning(
f"Study {sid} is a collection dataset. Skipping."
)
continue
results.append(study.get_data())
return results
# -------------------------
# translator stub
# -------------------------
[docs]
def convert_tups_to_pangaea(self, tups_query: Dict[str, Any]) -> Dict[str, Any]:
    """
    Placeholder for the TUPS -> PANGAEA query translation.

    Not implemented yet: ``tups_query`` is ignored and a new empty dict is
    returned on every call.
    """
    translated: Dict[str, Any] = dict()
    return translated
[docs]
def get_variables(self, study_ids=None) -> pd.DataFrame:
    """
    Retrieve variable metadata for specified studies.

    A requested study that is not registered but is listed among the
    collection members of a registered study is loaded and registered
    automatically.

    Parameters
    ----------
    study_ids : int, str, list, or None
        One or more StudyIDs. Can be numeric or DOI string.
        If None, variables for all registered studies are returned.

    Returns
    -------
    pandas.DataFrame
        The per-study ``get_variables()`` frames concatenated with a fresh
        0..n-1 index (documented as one row per study × variable), or an
        empty DataFrame when no study is selected.

    Raises
    ------
    KeyError
        If a requested StudyID is not registered and not found
        among collection members.
    """
    if study_ids is None:
        selected = list(self.studies.values())
    else:
        if not isinstance(study_ids, (list, tuple)):
            study_ids = [study_ids]

        selected = []
        for sid in study_ids:
            normalized_id = self._normalize_id(sid)

            if normalized_id not in self.studies:
                # Not registered: accept it only if a registered
                # collection lists it as a member. Members are normalized
                # with this class's own helper, the same one used for
                # ``sid`` and by _resolve_and_register_ids.
                is_member = False
                for parent in self.studies.values():
                    members = parent._panobj.collection_members
                    if not members:
                        continue
                    member_ids = [self._normalize_id(m) for m in members]
                    if normalized_id in member_ids:
                        is_member = True
                        break

                if not is_member:
                    raise KeyError(
                        f"Study '{sid}' not found. "
                        f"Run search_studies() first."
                    )

                # Auto-load and register the child dataset.
                self.studies[normalized_id] = PangaeaStudy(
                    study_id=normalized_id,
                    cache_dir=self.cache_dir,
                    auth_token=self.auth_token,
                )

            selected.append(self.studies[normalized_id])

    frames = [study.get_variables() for study in selected]
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)