__all__ = ['NOAADataset', 'UnsupportedFileTypeError']
import logging, warnings
import requests
import pandas as pd
from .BaseDataset import BaseDataset
from ..utils.NOAAStudy import NOAAStudy
from ..utils.helpers import assert_list
from ..utils.Parser.StandardParser import DataFetcher, StandardParser
from ..utils.Parser.NonStandardParser import NonStandardParser
from ..utils.api.constants import BASE_URL
from ..utils.api.query_builder import build_noaa_payload
from ..utils.api.http import get
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='[%(asctime)s][%(levelname)s] - %(message)s')
class UnsupportedFileTypeError(Exception):
"""Raised when a file type is not supported by the parser."""
pass
[docs]
class NOAADataset(BaseDataset):
"""
A wrapper class for interacting with the NOAA Studies API.
Manages the retrieval, parsing, and aggregation of NOAA study data,
and provides methods to access summaries, publications, sites, and external data files.
Attributes
----------
BASE_URL : str
The NOAA API endpoint URL.
studies : dict
A mapping from NOAAStudyId to NOAAStudy instances.
data_table_index : dict
A mapping from dataTableID to associated study, site, and paleo data.
"""
_PROPRIETARY_TYPES = {'crn', 'rwl', 'fhx', 'lpd'}
def __init__(self):
"""
Initialize the Dataset instance.
Attributes are set to their default empty values.
"""
self.studies = {} # NOAAStudyId -> NOAAStudy instance
self.data_table_index = {} # dataTableID -> dict with study, site, paleo_data
self.file_url_to_datatable = {} # file_url -> dataTableID
# self.last_timing = {}
self.logger = logging.getLogger("pyleotups.NOAADataset")
def _reindex(self):
"""Rebuild secondary indexes from `self.studies`."""
self.data_table_index.clear()
self.file_url_to_datatable.clear()
for study in self.studies.values():
for site in study.sites:
for paleo in site.paleo_data:
# map datatable -> study/site/paleo
self.data_table_index[paleo.datatable_id] = {
"study_id": study.study_id,
"site_id": site.site_id,
"paleo_data": paleo,
}
# map file_url -> datatable
for f in paleo.files:
url = f.get("fileUrl")
if url:
self.file_url_to_datatable[url] = paleo.datatable_id
def __add__(self, other):
if not isinstance(other, NOAADataset):
return NotImplemented
merged = NOAADataset()
# Start with a shallow copy of left's studies
merged.studies = dict(self.studies)
# Union by StudyID. If duplicate ID appears, keep left's version
# but sanity-check equality and warn if they differ.
for sid, study in other.studies.items():
if sid in merged.studies:
try:
check_same_study_content = (merged.studies[sid].to_dict() == study.to_dict())
except Exception:
check_same_study_content = False
if not check_same_study_content:
log.warning(
f"NOAADataset union: duplicate StudyID {sid} with differing content. "
"Keeping left-hand version. i.e. if C = A + B is perfomed, contents of A will be kept."
)
# else identical content -> do nothing
else:
merged.studies[sid] = study
# Rebuild indexes so they match the merged studies
merged._reindex()
return merged
def __iadd__(self, other):
if not isinstance(other, NOAADataset):
return NotImplemented
for sid, study in other.studies.items():
if sid in self.studies:
try:
check_same_study_content = (self.studies[sid].to_dict() == study.to_dict())
except Exception:
check_same_study_content = False
if not check_same_study_content:
log.warning(
f"Dataset in-place union: duplicate StudyID {sid} with differing content. "
"Keeping existing version. i.e. IF A = A + B is perfomed, contents of A will be kept"
)
else:
self.studies[sid] = study
self._reindex()
return self
[docs]
def search_studies(
self,
xml_id: int | str | None = None,
noaa_id: int | str | None = None,
search_text: str | None = None,
data_type_id: str | None = None,
investigators: str | list[str] | None = None,
investigators_and_or: str = "or",
locations: str | list[str] | None = None,
locations_and_or: str = "or",
keywords: str | list[str] | None = None,
keywords_and_or: str = "or",
species: str | list[str] | None = None,
species_and_or: str = "or",
variable_name: str | list[str] | None = None,
variable_name_and_or: str = "or",
cv_materials: str | list[str] | None = None,
cv_materials_and_or: str = "or",
cv_seasonalities: str | list[str] | None = None,
cv_seasonalities_and_or: str = "or",
min_lat: int | None = None,
max_lat: int | None = None,
min_lon: int | None = None,
max_lon: int | None = None,
min_elevation: int | None = None,
max_elevation: int | None = None,
earliest_year: int | None = None,
latest_year: int | None = None,
time_format: str | None = None,
time_method: str | None = None,
reconstruction: bool | None = None,
recent: bool = False,
limit: int = 100,
skip: int | None = None,
data_publisher: str = "NOAA",
):
r"""
Search for NOAA studies using the specified parameters.
At least one parameter must be provided to perform a search. This method interfaces with
the NOAA NCEI Paleo Study Search API. Use it to filter studies based on location,
investigators, time range, keywords, and more.
Parameters
----------
xml_id : str, optional
Specify the internal XML document ID. Must be an exact match (e.g., '1840').
noaa_id : str, optional
Provide the unique NOAA Study ID as a number (e.g., '13156').
search_text : str, optional
General text search across study content. Supports wildcards (%) and logical operators (AND, OR).
Examples: 'younger dryas', 'loess AND stratigraphy'
data_publisher : str, default "NOAA"
Choose from: 'NOAA', 'NEOTOMA', or 'PANGAEA'.
Example: 'NOAA'
data_type_id : str, optional
Filter by data type. Use one or more type IDs separated by '|'.
Available IDs.
1. BOREHOLE, 2. CLIMATE FORCING, 3. CLIMATE RECONSTRUCTIONS, 4. CORALS AND SCLEROSPONGES,
6. HISTORICAL, 7. ICE CORES, 8. INSECT, 9. LAKE LEVELS, 10. LOESS,
11. PALEOCLIMATIC MODELING, 12. FIRE HISTORY, 13. PALEOLIMNOLOGY, 14. PALEOCEANOGRAPHY,
15. PLANT MACROFOSSILS, 16. POLLEN, 17. SPELEOTHEMS, 18. TREE RING,
19. OTHER COLLECTIONS, 20. INSTRUMENTAL, 59. SOFTWARE, 60. REPOSITORY
Example: '4', '4|18'
investigators : str or list[str], optional
Investigator(s) in the form ``"LastName, Initials"``. Lists are joined with ``|``.
investigators_and_or : {"and","or"}, default "or"
Logical combiner when multiple investigators are supplied. Only sent when 2+ items.
locations : str or list[str], optional
Location(s) as hierarchical strings using ``>`` (e.g., ``"Continent>Africa>Kenya"``). Lists joined with ``|``.
locations_and_or : {"and","or"}, default "or"
Logical combiner for multiple locations. Only sent when 2+ items.
keywords : str or list[str], optional
Controlled keyword(s); hierarchies with ``>``. Lists joined with ``|``.
keywords_and_or : {"and","or"}, default "or"
Logical combiner for multiple keywords. Only sent when 2+ items.
species : str or list[str], optional
Four-letter tree species codes (uppercase enforced). Lists joined with ``|``.
species_and_or : {"and","or"}, default "or"
Logical combiner for multiple species. Only sent when 2+ items.
variable_name : str or list[str], optional
Refers to PaST "cvWhats" terms (hierarchies with ``>``). Lists joined with ``|``.
variable_name_and_or : {"and","or"}, default "or"
Logical combiner for multiple cvWhats/variable_name. Only sent when 2+ items.
cv_materials : str or list[str], optional
PaST “Material” terms (hierarchies with ``>``). Lists joined with ``|``.
cv_materials_and_or : {"and","or"}, default "or"
Logical combiner for multiple cv_materials. Only sent when 2+ items.
cv_seasonalities : str or list[str], optional
PaST “Seasonality” terms (e.g., ``"annual"`` or ``"3-month>Aug-Oct"``). Lists joined with ``|``.
cv_seasonalities_and_or : {"and","or"}, default "or"
Logical combiner for multiple cv_seasonalities. Only sent when 2+ items.
min_lat, max_lat : int, optional
Latitude bounds in whole degrees (–90..90).
min_lon, max_lon : int, optional
Longitude bounds in whole degrees (–180..180).
min_elevation, max_elevation : int, optional
Elevation bounds in meters (integers; negative allowed).
earliest_year, latest_year : int, optional
Year bounds (integers; negative allowed). If provided without time settings, ``time_format`` defaults to ``'CE'``.
time_format : {"CE","BP"}, optional
Interpretation of years. If omitted with a time window, defaults to ``'CE'``.
time_method : {"overAny","entireOver","overEntire"}, optional, default = None
How to apply the time window (overlap, envelop, or within). NOAA internally defaults to "overAny" if a time window is provided without a method.
reconstruction : bool or str, optional
Accepts True/False or strings (case-insensitive) like ``"true"|"yes"|"y"|"1"`` → ``'Y'`` and
``"false"|"no"|"n"|"0"`` → ``'N'``. ``None`` omits the filter.
recent : bool, default False
If True, restrict to studies from the last ~2 years (newest first).
limit : int, default 100
Number of studies to return (PyleoTUPS default).
skip : int, optional
Number of studies to skip (for pagination). Use with ``limit`` to page through results.
Example: ``limit=10, skip=10`` returns items 11–20.
Returns
-------
pandas.DataFrame
Response DataFrame. Fills the internal `studies` attribute with structured NOAA study data.
Raises
------
ValueError
If no inputs are passed.
requests.HTTPError
If the HTTP request returned an unsuccessful status code.
Notes
-----
User Guide.
Multi-value fields. For ``investigators``, ``locations``, ``keywords``, ``species``, ``variable_name`` (cvWhats),
``cv_materials``, ``cv_seasonalities``:
- Accept a string (already ``|``-separated) or a Python list of strings.
- Lists are joined with ``|``. The corresponding ``*_and_or`` flag is included only when 2+ items.
- Species are validated to four uppercase letters.
Identifiers short-circuit. If ``xml_id`` or ``noaa_id`` is set, the request includes only that id (plus
publisher), ignoring other filters.
Time window defaults. If either ``earliest_year`` or ``latest_year`` is provided and neither ``time_format``
nor ``time_method`` is supplied, ``time_format`` defaults to ``'CE'`` (a note is recorded).
Unsupported parameters. ``headersOnly`` is not supported by PyleoTUPS and ignored if passed.
Boolean normalization. Parameters expected as ``'Y'/'N'`` accept: True/False, or strings like
``"true"|"yes"|"y"|"1"`` → ``'Y'`` and ``"false"|"no"|"n"|"0"`` → ``'N'``.
Examples
--------
Quick start (identifiers)
^^^^^^^^^^^^^^^^^^^^^^^^^
.. jupyter-execute::
import pyleotups as pt
ds = pt.NOAADataset()
df_noaa = ds.search_studies(noaa_id=13156)
df_xml = ds.search_studies(xml_id=1840)
df_noaa.head()
df_xml.head()
Full-text search (Oracle syntax)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. jupyter-execute::
### Single phrase
df_singlephrase = ds.search_studies(search_text="younger dryas", limit=20)
df_singlephrase.head()
.. jupyter-execute::
### Logical operator (AND)
df_logop = ds.search_studies(search_text="loess AND stratigraphy", limit=20)
df_logop.head()
.. jupyter-execute::
### Wildcards: '_' (single char), '%' (multi-char)
df_wc_1 = ds.search_studies(search_text="f_re", limit=20)
df_wc_2 = ds.search_studies(search_text="pol%", limit=20)
df_wc_1.head(), df_wc_2.head()
.. jupyter-execute::
### Escaping special characters (use backslashes)
df_specchar = ds.search_studies(search_text=r"noaa\-tree\-19260", limit=20)
df_specchar.head()
Investigators, keywords, locations
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. jupyter-execute::
### Multiple investigators (OR by default)
df_multinv_default = ds.search_studies(investigators=["Wahl, E.R.", "Vose, R.S."])
df_multinv_default.head()
.. jupyter-execute::
### Multiple investigators (AND by default)
df_multinv_and = ds.search_studies(investigators=["Wahl, E.R.", "Vose, R.S."], investigators_and_or="and")
df_multinv_and.head()
.. jupyter-execute::
### Keywords: hierarchy with '>' and multiple via '|'
df_keywords = ds.search_studies(keywords="earth science>paleoclimate>paleocean>biomarkers")
df_keywords.head()
.. jupyter-execute::
### Location hierarchy
df_loc = ds.search_studies(locations="Continent>Africa>Eastern Africa>Zambia")
df_loc.head()
Species and types
^^^^^^^^^^^^^^^^^
.. jupyter-execute::
### Species: four-letter codes (uppercase enforced)
df_species = ds.search_studies(species=["ABAL", "PIPO"])
df_species.head()
.. jupyter-execute::
### Data types: one or more IDs separated by '|'
df_muldatatypes = ds.search_studies(data_type_id="4|18")
df_muldatatypes.head()
Geography and elevation
^^^^^^^^^^^^^^^^^^^^^^^
.. jupyter-execute::
df_latlong = ds.search_studies(min_lat=68, max_lat=69, min_lon=30, max_lon=40)
df_latlong.head()
.. jupyter-execute::
df_elv = ds.search_studies(min_elevation=100, max_elevation=110)
df_elv.head()
Time window (defaults to CE if no time settings)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. jupyter-execute::
### Explicit BP with method
df_timew = ds.search_studies(earliest_year=12000, time_format="BP", time_method="overAny")
df_timew.head()
.. jupyter-execute::
### No time_format/time_method → defaults to CE
df_time_defualt = ds.search_studies(earliest_year=1500, latest_year=0)
df_time_defualt.head()
Reconstructions and recency
^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. jupyter-execute::
df_recon = ds.search_studies(reconstruction=True)
df_recon.head()
.. jupyter-execute::
df_recent = ds.search_studies(recent=True, limit=25)
df_recent.head()
Limit and Skipping (for pagination)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. jupyter-execute::
### Limit up to first 10 results
df_limit = ds.search_studies(earliest_year=12000, time_format="BP", time_method="overAny", limit=10)
df_limit.head()
### Skip the first 10 results (i.e., get results 11-20)
df_skip = ds.search_studies(earliest_year=12000, time_format="BP", time_method="overAny", limit=10, skip=10)
df_skip.head()
"""
kwargs = locals().copy()
kwargs.pop("self")
if "headers_only" in kwargs:
log.warning("Keyword Argument Pair : 'headers_only' is not supported and will be ignored while making requests.")
kwargs.pop("headers_only", None)
if not any([
kwargs.get("xml_id"), kwargs.get("noaa_id"),
kwargs.get("data_type_id"), kwargs.get("keywords"),
kwargs.get("investigators"),
kwargs.get("max_lat"), kwargs.get("min_lat"),
kwargs.get("max_lon"), kwargs.get("min_lon"),
kwargs.get("location") or kwargs.get("locations"),
kwargs.get("publication"), kwargs.get("search_text"),
kwargs.get("earliest_year"), kwargs.get("latest_year"),
kwargs.get("variable_name"), kwargs.get("min_elevation"),
kwargs.get("max_elevation"), kwargs.get("time_format"),
kwargs.get("time_method"), kwargs.get("reconstruction"),
kwargs.get("species"), kwargs.get("recent"), kwargs.get("skip")
]):
raise ValueError(
"At least one search parameter must be specified to initiate a query. "
"To view available parameters and usage examples, run: help(NOAADataset.search_studies)"
)
if kwargs.get("data_publisher") and kwargs["data_publisher"] != "NOAA":
raise NotImplementedError(
"PyleoTUPS currently supports data_publisher='NOAA' only. "
"Please retry with data_publisher='NOAA'."
)
if kwargs.get("cv_whats") and not kwargs.get("variable_name"):
kwargs["variable_name"] = kwargs.pop("cv_whats")
# Build payload using our utils (handles ids short-circuit, list→'|', Y/N coercion, time default)
payload, notes = build_noaa_payload(**kwargs)
for n in notes:
log.info("search_studies: %s", n)
self.last_search_notes = notes
# --- Make the request with explicit 204 handling ---
try:
resp = get(BASE_URL, payload)
resp.raise_for_status()
status = resp.status_code
except Exception as e:
raise requests.HTTPError(f"HTTP Request Error from NOAA: {e}")
# 204 No Content → no studies for given filters
if status == 204:
inv = payload.get("investigators")
if inv:
log.warning(
"No studies found for investigator(s): "
f"{inv}. NOAA expects 'LastName, Initials'. Try variations like:\n"
" - 'LastName, Initials'\n - 'LastName'\n - 'Initials'"
)
# Nothing to parse; return display summary (empty) or None
log.info(f"Retrieved {len(self.studies)} studies.")
return self.get_summary()
# if ("display" in kwargs and kwargs.get("display")) else log.info(f"Retrieved {len(self.studies)} studies.")
# Non-204: ensure success and parse JSON
try:
response_json = resp.json()
except Exception as e:
raise RuntimeError(f"Failed to parse NOAA response as JSON: {e}")
# Parse into internal structures (you already have this)
self._parse_response(response_json, kwargs.get("limit"))
log.info(f"Retrieved {len(self.studies)} studies.")
return self.get_summary()
# if ("display" in kwargs and kwargs.get("display")) else log.info(f"Retrieved {len(self.studies)} studies.")
def _parse_response(self, data, limit):
"""
Parse the JSON response and populate studies and reverse mapping indexes.
"""
from tqdm import tqdm
self.studies.clear()
self.data_table_index.clear()
self.file_url_to_datatable.clear()
for study_data in tqdm(data.get('study', []), desc="Parsing NOAA studies"):
study_obj = NOAAStudy(study_data)
self.studies[study_obj.study_id] = study_obj
for site in study_obj.sites:
for paleo in site.paleo_data:
self.data_table_index[paleo.datatable_id] = {
'study_id': study_obj.study_id,
'site_id': site.site_id,
'paleo_data': paleo
}
for file_obj in paleo.files:
file_url = file_obj.get('fileUrl')
if file_url:
self.file_url_to_datatable[file_url] = paleo.datatable_id
if isinstance(limit, int) and len(data.get('study', [])) >= limit:
log.warning(
f"Retrieved {limit} studies, which is the specified limit. "
"Consider increasing the limit parameter to fetch more studies."
)
[docs]
def get_summary(self):
"""
Get a DataFrame summarizing all loaded studies.
Returns
-------
pandas.DataFrame
A DataFrame with a summary of study metadata and components.
Examples
--------
Examples
--------
.. jupyter-execute::
from pyleotups import NOAADataset
ds=NOAADataset()
df = ds.search_studies(noaa_id=33213)
df.head()
"""
data = [study.to_dict() for study in self.studies.values()]
return pd.DataFrame(data)
[docs]
def get_publications(self, save=False, path=None, verbose=False):
"""
Get all publications in both BibTeX and DataFrame formats.
Parameters
----------
save : bool, default=False
If True, save the BibTeX to a .bib file.
path : str or None, optional
Path to save the .bib file. If None and save=True,
saves to 'bibtex_<timestamp>.bib'.
verbose : bool, default=False
If True, print the BibTeX content to console.
Returns
-------
tuple (pybtex.database.BibliographyData, pandas.DataFrame)
BibTeX object and DataFrame of publication details.
Examples
--------
.. jupyter-execute::
from pyleotups import NOAADataset
ds=NOAADataset()
dsf = ds.search_studies(noaa_id=33213)
bib, df = ds.get_publications()
df.head()
"""
from pybtex.database import BibliographyData
publications_data = []
bib_entries = {}
# Collect publication metadata and BibTeX entries
for study in self.studies.values():
for pub in study.publications:
pub_dict = pub.to_dict()
pub_dict['StudyID'] = study.study_id
pub_dict['StudyName'] = study.metadata.get("studyName") or "Unknown Study"
publications_data.append(pub_dict)
try:
citation_key = pub.get_citation_key()
bib_entries[citation_key] = pub.to_bibtex_entry()
except Exception as e:
raise ValueError(
f"Failed to convert a publication in study {study.study_id} to BibTeX. "
f"Original error: {e}"
)
df = pd.DataFrame(publications_data)
bibs = BibliographyData(entries=bib_entries)
# Save to file if requested
if save:
import datetime
from pybtex.database.output.bibtex import Writer
from pathlib import Path
if not path:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M")
path = f"bibtex_{timestamp}.bib"
log.warning(f"No path specified. Saving BibTeX to: {path}")
try:
writer = Writer()
with open(Path(path), "w", encoding="utf-8") as f:
writer.write_stream(bibs, f)
except Exception as e:
raise IOError(f"Failed to write BibTeX file to '{path}': {e}")
# Print if verbose
if verbose:
from pybtex.database.output.bibtex import Writer
from io import StringIO
buffer = StringIO()
Writer().write_stream(bibs, buffer)
print(buffer.getvalue())
return bibs, df
[docs]
def get_tables(self):
"""
Get a DataFrame of all sites expanded to paleo data files.
Returns
-------
pandas.DataFrame
A DataFrame with one row per (Site × PaleoData × File).
Examples
--------
.. jupyter-execute::
from pyleotups import NOAADataset
ds=NOAADataset()
dsf = ds.search_studies(noaa_id=33213)
df = ds.get_tables()
df.head()
"""
records = []
for study in self.studies.values():
study_id = study.study_id
study_name = study.metadata.get("studyName")
for site in study.sites:
for paleo in site.paleo_data:
for file_obj in paleo.files:
row = paleo.to_dict(file_obj)
# ❌ REMOVE paleo-level field
row.pop("TotalFilesAvailable", None)
row.update({
"SiteID": site.site_id,
"SiteName": site.site_name,
"LocationName": site.location_name,
"GeoType": site.geo_type,
"GeometryType": site.geometry_type,
"MinLatitude": site.south_lat,
"MaxLatitude": site.north_lat,
"MinLongitude": site.west_lon,
"MaxLongitude": site.east_lon,
"MinElevation": site.min_elevation,
"MaxElevation": site.max_elevation,
"StudyID": study_id,
"StudyName": study_name,
})
records.append(row)
return pd.DataFrame(records)
[docs]
def get_sites(self):
"""
Get a DataFrame of all sites expanded to paleo data files.
Returns
-------
pandas.DataFrame
A DataFrame with one row per (Site × PaleoData × File).
"""
records = []
for study in self.studies.values():
study_id = study.study_id
study_name = study.metadata.get("studyName")
for site in study.sites:
file_urls = []
variables_list = []
for paleo in site.paleo_data:
for file_obj in paleo.files:
file_url = file_obj.get("fileUrl")
file_urls.append(file_url)
variables = list(
paleo.file_variable_map.get(file_url, {}).keys()
)
variables_list.append(variables)
row = {
# ❌ Removed DataTable-level fields
"FileURL": file_urls,
"Variables": variables_list,
"TotalFilesAvailable": len(file_urls),
"SiteID": site.site_id,
"SiteName": site.site_name,
"LocationName": site.location_name,
"GeoType": site.geo_type,
"GeometryType": site.geometry_type,
"MinLatitude": site.south_lat,
"MaxLatitude": site.north_lat,
"MinLongitude": site.west_lon,
"MaxLongitude": site.east_lon,
"MinElevation": site.min_elevation,
"MaxElevation": site.max_elevation,
"StudyID": study_id,
"StudyName": study_name,
}
records.append(row)
df = pd.DataFrame(records)
# Reorder as requested
priority = ["SiteID", "SiteName", "StudyID"]
cols = priority + [c for c in df.columns if c not in priority]
return df[cols]
[docs]
def get_geo(self):
"""
Get a DataFrame of site-level geospatial metadata and associated data types
from all studies loaded into the Dataset.
Returns
-------
pandas.DataFrame
A DataFrame with one row per site and columns:
['StudyID', 'SiteID', 'SiteName', 'LocationName',
'Latitude', 'Longitude', 'MinElevation', 'MaxElevation', 'DataType']
Examples
--------
.. jupyter-execute::
from pyleotups import NOAADataset
ds=NOAADataset()
dsf = ds.search_studies(noaa_id=33213)
df = ds.get_geo()
df.head()
"""
site_records = []
for study in self.studies.values():
study_id = study.study_id
data_type = study.metadata.get("dataType", "Unknown")
for site in study.sites:
site_dict = {
"StudyID": study_id,
"DataType": data_type,
**{
k: v for k, v in site.to_dict()[0].items() # site.to_dict() returns list of dicts (1 per file)
if k in ["SiteID", "SiteName", "LocationName", "GeoType","GeometryType", "MinLatitude", "MaxLatitude", "MinLongitude", "MaxLongitude", "MinElevation", "MaxElevation"]
}
}
site_records.append(site_dict)
return pd.DataFrame(site_records)
[docs]
def get_funding(self):
"""
Get a DataFrame of all funding records across loaded studies.
Returns
-------
pandas.DataFrame
A DataFrame with columns ['StudyID', 'StudyName', 'FundingAgency', 'FundingGrant'].
Returns an empty DataFrame if no funding is available.
Examples
--------
.. jupyter-execute::
from pyleotups import NOAADataset
ds=NOAADataset()
dsf = ds.search_studies(noaa_id=33213)
df = ds.get_funding()
df.head()
"""
records = []
for study in self.studies.values():
study_id = study.study_id
study_name = study.metadata.get("studyName")
for fund in study.funding:
if isinstance(fund, dict):
records.append({
"StudyID": study_id,
"StudyName": study_name,
"FundingAgency": fund.get("fundingAgency", None),
"FundingGrant": fund.get("fundingGrant", None)
})
return pd.DataFrame(records, columns=["StudyID", "StudyName", "FundingAgency", "FundingGrant"])
[docs]
def get_variables(self, dataTableIDs):
"""
Retrieve variable metadata for specified dataTableIDs.
Parameters
----------
dataTableIDs : list or str
One or more NOAA dataTableIDs.
Returns
-------
pandas.DataFrame
A DataFrame indexed by DataTableID with one row per (file × variable).
Includes full variable metadata such as cvShortName, cvUnit, etc.
Examples
--------
.. jupyter-execute::
from pyleotups import NOAADataset
ds=NOAADataset()
dsf = ds.search_studies(noaa_id=33213)
df_var = ds.get_variables(dataTableIDs="45859")
df_var.head()
"""
dataTableIDs = assert_list(dataTableIDs)
records = []
for dt_id in dataTableIDs:
mapping = self.data_table_index.get(dt_id)
if not mapping:
raise ValueError(f"DataTableID '{dt_id}' not found. Please run `search_studies` first.")
paleo = mapping["paleo_data"]
study_id = paleo.study_id
site_id = paleo.site_id
for file in paleo.files:
file_url = file.get("fileUrl")
if not file_url:
continue
var_map = paleo.file_variable_map.get(file_url, {})
for var_name, var_meta in var_map.items():
records.append({
"DataTableID": dt_id,
"StudyID": study_id,
"SiteID": site_id,
"FileURL": file_url,
"VariableName": var_name,
**var_meta # includes all cv* fields
})
df = pd.DataFrame(records)
if df.empty:
return pd.DataFrame(columns=["StudyID", "SiteID", "FileURL", "VariableName"]) # fallback for no data
return df.set_index("DataTableID")
def _process_file(self, file_url, mapping=None):
"""
Process a single file URL: detect parser, parse the file, and attach metadata.
"""
if not file_url:
raise ValueError("File URL is missing.")
file_type = file_url.split('.')[-1].lower()
if file_type in self._PROPRIETARY_TYPES:
raise UnsupportedFileTypeError(
f"pyleotups works with .txt files only. File type '{file_type}' is proprietary."
)
if file_type != 'txt':
raise UnsupportedFileTypeError(
f"Invalid file type '{file_type}'. Only .txt files are supported."
)
# Step 1: Detect parser type by reading initial lines
def detect_parser_type(lines):
# 1. Clean lines: strip whitespace and remove empty lines
lines = [line.strip() for line in lines if line.strip()]
if not lines:
return "unparsable" # Handle empty file
# 2. Check for "standard" parser (all comment lines at start)
# This checks the first 5 non-empty lines
if all(line.startswith("#") or line.startswith("#") for line in lines[:10]):
return "standard"
# 3. Check for "nonstandard" parser (NOAA/WDC header block)
# Find the first line that looks like a separator
start_sep_idx = -1
for i, line in enumerate(lines):
# A line with many dashes is a good separator candidate
if line.count('-') > 10:
start_sep_idx = i
break
# Find the second separator line (must be after the first)
end_sep_idx = -1
if start_sep_idx != -1:
# Look for the *next* separator line
for j in range(start_sep_idx + 1, min(start_sep_idx + 10, len(lines))): # Limit search to 10 lines
if lines[j].count('-') > 10:
end_sep_idx = j
break
# If we found a valid block (start and end separators)...
if start_sep_idx != -1 and end_sep_idx != -1 and end_sep_idx > start_sep_idx + 1:
# ...join all lines *between* the separators into one string
header_block = " ".join(lines[start_sep_idx + 1 : end_sep_idx]).lower()
# Check for the key phrases.
has_noaa = "noaa" in header_block
has_wdc = "world data center" in header_block # More general than the original
if has_noaa or has_wdc:
return "nonstandard"
# 4. If neither format is detected, return "unparsable"
return "unparsable"
try:
response = requests.get(file_url)
response.raise_for_status()
lines = response.text.splitlines()
parser_type = detect_parser_type(lines)
except Exception as e:
raise RuntimeError(f"Failed to read file from '{file_url}': {e}")
# Step 2: Use the appropriate parser
if parser_type == "standard":
parser = StandardParser(file_url)
elif parser_type == "nonstandard":
parser = NonStandardParser(file_url)
else:
raise ValueError(
f"Unable to determine parser for file: {file_url}. "
)
try:
parsed_data = parser.parse()
except Exception as e:
raise RuntimeError(f"Error while parsing file {file_url}: {e}")
# Step 3: Attach metadata
def attach_metadata(df, mapping):
df.attrs['NOAAStudyId'] = mapping.get('study_id')
study_obj = self.studies.get(mapping.get('study_id'), {})
df.attrs['StudyName'] = study_obj.metadata.get("studyName") if hasattr(study_obj, 'metadata') else None
return df
results = []
to_process = []
if isinstance(parsed_data, list):
# Handles NonStandardParser (list of objects) AND
# StandardParser (if it returns a list of DataFrames)
for item in parsed_data:
if isinstance(item, pd.DataFrame):
# Item is already a DataFrame (e.g., from StandardParser)
to_process.append(item)
elif hasattr(item, 'df') and isinstance(getattr(item, 'df'), pd.DataFrame):
# Item is an object with a .df attribute (from NonStandardParser)
to_process.append(item.df)
# else: item is some other object we don't care about (e.g., metadata block)
elif isinstance(parsed_data, pd.DataFrame):
# Handles StandardParser if it returns a single DataFrame
to_process.append(parsed_data)
# else: parsed_data is some other type we can't handle, to_process remains empty.
# Now `to_process` contains only the DataFrames we want
for df in to_process:
if mapping:
df = attach_metadata(df, mapping)
results.append(df)
return results
[docs]
def get_data(self, dataTableIDs=None, file_urls=None):
"""
Fetch external data for given dataTableIDs or file URLs, perform validations,
and attach study and site metadata.
Parameters
----------
dataTableIDs : list or str, optional
One or more NOAA data table IDs.
file_urls : list or str, optional
One or more file URLs.
Returns
-------
list of pandas.DataFrame
A list of DataFrames corresponding to the fetched data.
Raises
------
ValueError
For missing parent study mapping, missing file URL, or proprietary/unsupported file types.
Exception
Propagates any exceptions raised by the parser.
Examples
--------
.. jupyter-execute::
from pyleotups import NOAADataset
ds=NOAADataset()
df = ds.search_studies(noaa_id=33213)
dfs = ds.get_data(dataTableIDs="45859")
dfs[0].head()
"""
dfs = []
# Process based on dataTableIDs.
if dataTableIDs:
dataTableIDs = assert_list(dataTableIDs)
for dt_id in dataTableIDs:
mapping = self.data_table_index.get(dt_id)
if not mapping:
raise ValueError(f"No parent study mapping found for Data Table ID '{dt_id}'. "
"Please perform a search using this DataTableID or provide a specific file URL.")
file_url = mapping['paleo_data'].file_url
if not file_url:
raise ValueError(f"File URL for Data Table ID '{dt_id}' is missing. Cannot fetch data.")
dfs.extend(self._process_file(file_url, mapping))
return dfs
# Process based on file_urls provided directly.
if file_urls:
file_urls = assert_list(file_urls)
for url in file_urls:
mapping = self.file_url_to_datatable.get(url)
if not mapping:
log.warning(f"Attached '{url}' is not linked to any parent study; can not add metadata.")
dfs.extend(self._process_file(url))
else:
mapping_details = self.data_table_index.get(mapping)
if not mapping_details:
log.warning(
f"Mapping details for file URL '{url}' (Data Table ID '{mapping}') not found; can not add metadata.")
dfs.extend(self._process_file(url))
else:
dfs.extend(self._process_file(url, mapping_details))
return dfs
raise ValueError("No dataTableID or file URL provided. Cannot fetch data.")