# Source code for pyleotups.core.NOAADataset

# Names exported by a star-import of this module.
__all__ = ['NOAADataset', 'UnsupportedFileTypeError']

import logging, warnings
import requests
import pandas as pd

from .BaseDataset import BaseDataset

from ..utils.NOAAStudy import NOAAStudy
from ..utils.helpers import assert_list
from ..utils.Parser.StandardParser import DataFetcher, StandardParser
from ..utils.Parser.NonStandardParser import NonStandardParser
from ..utils.api.constants import BASE_URL
from ..utils.api.query_builder import build_noaa_payload
from ..utils.api.http import get


# Module-level logger used by the functions below for warnings and progress messages.
log = logging.getLogger(__name__)
# NOTE(review): basicConfig runs at import time and configures the root logger
# (when it has no handlers yet) for any application importing this module —
# confirm that this module-level side effect is intended for a library.
logging.basicConfig(level=logging.INFO, format='[%(asctime)s][%(levelname)s] - %(message)s')

class UnsupportedFileTypeError(Exception):
    """Error raised for a data file whose type the parsers do not support."""


class NOAADataset(BaseDataset):
    """
    A wrapper class for interacting with the NOAA Studies API.

    Manages the retrieval, parsing, and aggregation of NOAA study data, and
    provides methods to access summaries, publications, sites, and external
    data files.

    Attributes
    ----------
    studies : dict
        A mapping from NOAAStudyId to NOAAStudy instances.
    data_table_index : dict
        A mapping from dataTableID to a dict with the keys ``"study_id"``,
        ``"site_id"`` and ``"paleo_data"``.
    file_url_to_datatable : dict
        A mapping from file URL to the dataTableID the file belongs to.
    logger : logging.Logger
        Logger named ``"pyleotups.NOAADataset"``.
    """

    # File extensions that are recognised but cannot be parsed as text.
    _PROPRIETARY_TYPES = {'crn', 'rwl', 'fhx', 'lpd'}

    def __init__(self):
        """
        Initialize the Dataset instance.

        All containers start empty; they are filled when studies are parsed
        or when two datasets are merged.
        """
        self.studies = {}  # NOAAStudyId -> NOAAStudy instance
        self.data_table_index = {}  # dataTableID -> dict with study, site, paleo_data
        self.file_url_to_datatable = {}  # file_url -> dataTableID
        self.logger = logging.getLogger("pyleotups.NOAADataset")

    def _reindex(self):
        """Rebuild secondary indexes from `self.studies`."""
        self.data_table_index.clear()
        self.file_url_to_datatable.clear()

        for study in self.studies.values():
            for site in study.sites:
                for paleo in site.paleo_data:
                    # map datatable -> study/site/paleo
                    self.data_table_index[paleo.datatable_id] = {
                        "study_id": study.study_id,
                        "site_id": site.site_id,
                        "paleo_data": paleo,
                    }
                    # map file_url -> datatable (entries without a URL are skipped)
                    for f in paleo.files:
                        url = f.get("fileUrl")
                        if url:
                            self.file_url_to_datatable[url] = paleo.datatable_id

    @staticmethod
    def _union_studies(target, incoming):
        """
        Add the studies of `incoming` to `target`, keeping existing entries.

        Parameters
        ----------
        target : dict
            StudyID -> study mapping that is updated in place.
        incoming : dict
            StudyID -> study mapping whose new IDs are copied into `target`.

        Returns
        -------
        list
            StudyIDs present in both mappings whose ``to_dict()`` content
            differs (or could not be compared). `target` keeps its own
            version for those IDs.
        """
        conflicts = []
        for sid, study in incoming.items():
            if sid not in target:
                target[sid] = study
                continue
            try:
                identical = target[sid].to_dict() == study.to_dict()
            except Exception:
                # A study that cannot be serialised is treated as differing.
                identical = False
            if not identical:
                conflicts.append(sid)
        return conflicts

    def __add__(self, other):
        """
        Return a new dataset holding the union (by StudyID) of both operands.

        When a StudyID exists on both sides, the left-hand study is kept and a
        warning is logged if the two versions differ. Returns `NotImplemented`
        when `other` is not a `NOAADataset`.
        """
        if not isinstance(other, NOAADataset):
            return NotImplemented

        merged = NOAADataset()
        # Shallow copy: the merged dataset shares study objects with `self`.
        merged.studies = dict(self.studies)

        for sid in self._union_studies(merged.studies, other.studies):
            log.warning(
                f"NOAADataset union: duplicate StudyID {sid} with differing content. "
                "Keeping left-hand version. i.e. if C = A + B is perfomed, contents of A will be kept."
            )

        # Rebuild indexes so they match the merged studies
        merged._reindex()
        return merged

    def __iadd__(self, other):
        """
        Merge the studies of `other` into this dataset in place.

        Existing studies win on duplicate StudyIDs; a warning is logged when
        the duplicate differs. Returns `NotImplemented` when `other` is not a
        `NOAADataset`.
        """
        if not isinstance(other, NOAADataset):
            return NotImplemented

        for sid in self._union_studies(self.studies, other.studies):
            log.warning(
                f"Dataset in-place union: duplicate StudyID {sid} with differing content. "
                "Keeping existing version. i.e. IF A = A + B is perfomed, contents of A will be kept"
            )

        self._reindex()
        return self
[docs] def search_studies( self, xml_id: int | str | None = None, noaa_id: int | str | None = None, search_text: str | None = None, data_type_id: str | None = None, investigators: str | list[str] | None = None, investigators_and_or: str = "or", locations: str | list[str] | None = None, locations_and_or: str = "or", keywords: str | list[str] | None = None, keywords_and_or: str = "or", species: str | list[str] | None = None, species_and_or: str = "or", variable_name: str | list[str] | None = None, variable_name_and_or: str = "or", cv_materials: str | list[str] | None = None, cv_materials_and_or: str = "or", cv_seasonalities: str | list[str] | None = None, cv_seasonalities_and_or: str = "or", min_lat: int | None = None, max_lat: int | None = None, min_lon: int | None = None, max_lon: int | None = None, min_elevation: int | None = None, max_elevation: int | None = None, earliest_year: int | None = None, latest_year: int | None = None, time_format: str | None = None, time_method: str | None = None, reconstruction: bool | None = None, recent: bool = False, limit: int = 100, skip: int | None = None, data_publisher: str = "NOAA", ): r""" Search for NOAA studies using the specified parameters. At least one parameter must be provided to perform a search. This method interfaces with the NOAA NCEI Paleo Study Search API. Use it to filter studies based on location, investigators, time range, keywords, and more. Parameters ---------- xml_id : str, optional Specify the internal XML document ID. Must be an exact match (e.g., '1840'). noaa_id : str, optional Provide the unique NOAA Study ID as a number (e.g., '13156'). search_text : str, optional General text search across study content. Supports wildcards (%) and logical operators (AND, OR). Examples: 'younger dryas', 'loess AND stratigraphy' data_publisher : str, default "NOAA" Choose from: 'NOAA', 'NEOTOMA', or 'PANGAEA'. Example: 'NOAA' data_type_id : str, optional Filter by data type. Use one or more type IDs separated by '|'. 
Available IDs. 1. BOREHOLE, 2. CLIMATE FORCING, 3. CLIMATE RECONSTRUCTIONS, 4. CORALS AND SCLEROSPONGES, 6. HISTORICAL, 7. ICE CORES, 8. INSECT, 9. LAKE LEVELS, 10. LOESS, 11. PALEOCLIMATIC MODELING, 12. FIRE HISTORY, 13. PALEOLIMNOLOGY, 14. PALEOCEANOGRAPHY, 15. PLANT MACROFOSSILS, 16. POLLEN, 17. SPELEOTHEMS, 18. TREE RING, 19. OTHER COLLECTIONS, 20. INSTRUMENTAL, 59. SOFTWARE, 60. REPOSITORY Example: '4', '4|18' investigators : str or list[str], optional Investigator(s) in the form ``"LastName, Initials"``. Lists are joined with ``|``. investigators_and_or : {"and","or"}, default "or" Logical combiner when multiple investigators are supplied. Only sent when 2+ items. locations : str or list[str], optional Location(s) as hierarchical strings using ``>`` (e.g., ``"Continent>Africa>Kenya"``). Lists joined with ``|``. locations_and_or : {"and","or"}, default "or" Logical combiner for multiple locations. Only sent when 2+ items. keywords : str or list[str], optional Controlled keyword(s); hierarchies with ``>``. Lists joined with ``|``. keywords_and_or : {"and","or"}, default "or" Logical combiner for multiple keywords. Only sent when 2+ items. species : str or list[str], optional Four-letter tree species codes (uppercase enforced). Lists joined with ``|``. species_and_or : {"and","or"}, default "or" Logical combiner for multiple species. Only sent when 2+ items. variable_name : str or list[str], optional Refers to PaST "cvWhats" terms (hierarchies with ``>``). Lists joined with ``|``. variable_name_and_or : {"and","or"}, default "or" Logical combiner for multiple cvWhats/variable_name. Only sent when 2+ items. cv_materials : str or list[str], optional PaST “Material” terms (hierarchies with ``>``). Lists joined with ``|``. cv_materials_and_or : {"and","or"}, default "or" Logical combiner for multiple cv_materials. Only sent when 2+ items. cv_seasonalities : str or list[str], optional PaST “Seasonality” terms (e.g., ``"annual"`` or ``"3-month>Aug-Oct"``). 
Lists joined with ``|``. cv_seasonalities_and_or : {"and","or"}, default "or" Logical combiner for multiple cv_seasonalities. Only sent when 2+ items. min_lat, max_lat : int, optional Latitude bounds in whole degrees (–90..90). min_lon, max_lon : int, optional Longitude bounds in whole degrees (–180..180). min_elevation, max_elevation : int, optional Elevation bounds in meters (integers; negative allowed). earliest_year, latest_year : int, optional Year bounds (integers; negative allowed). If provided without time settings, ``time_format`` defaults to ``'CE'``. time_format : {"CE","BP"}, optional Interpretation of years. If omitted with a time window, defaults to ``'CE'``. time_method : {"overAny","entireOver","overEntire"}, optional, default = None How to apply the time window (overlap, envelop, or within). NOAA internally defaults to "overAny" if a time window is provided without a method. reconstruction : bool or str, optional Accepts True/False or strings (case-insensitive) like ``"true"|"yes"|"y"|"1"`` → ``'Y'`` and ``"false"|"no"|"n"|"0"`` → ``'N'``. ``None`` omits the filter. recent : bool, default False If True, restrict to studies from the last ~2 years (newest first). limit : int, default 100 Number of studies to return (PyleoTUPS default). skip : int, optional Number of studies to skip (for pagination). Use with ``limit`` to page through results. Example: ``limit=10, skip=10`` returns items 11–20. Returns ------- pandas.DataFrame Response DataFrame. Fills the internal `studies` attribute with structured NOAA study data. Raises ------ ValueError If no inputs are passed. requests.HTTPError If the HTTP request returned an unsuccessful status code. Notes ----- User Guide. Multi-value fields. For ``investigators``, ``locations``, ``keywords``, ``species``, ``variable_name`` (cvWhats), ``cv_materials``, ``cv_seasonalities``: - Accept a string (already ``|``-separated) or a Python list of strings. - Lists are joined with ``|``. 
The corresponding ``*_and_or`` flag is included only when 2+ items. - Species are validated to four uppercase letters. Identifiers short-circuit. If ``xml_id`` or ``noaa_id`` is set, the request includes only that id (plus publisher), ignoring other filters. Time window defaults. If either ``earliest_year`` or ``latest_year`` is provided and neither ``time_format`` nor ``time_method`` is supplied, ``time_format`` defaults to ``'CE'`` (a note is recorded). Unsupported parameters. ``headersOnly`` is not supported by PyleoTUPS and ignored if passed. Boolean normalization. Parameters expected as ``'Y'/'N'`` accept: True/False, or strings like ``"true"|"yes"|"y"|"1"`` → ``'Y'`` and ``"false"|"no"|"n"|"0"`` → ``'N'``. Examples -------- Quick start (identifiers) ^^^^^^^^^^^^^^^^^^^^^^^^^ .. jupyter-execute:: import pyleotups as pt ds = pt.NOAADataset() df_noaa = ds.search_studies(noaa_id=13156) df_xml = ds.search_studies(xml_id=1840) df_noaa.head() df_xml.head() Full-text search (Oracle syntax) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. jupyter-execute:: ### Single phrase df_singlephrase = ds.search_studies(search_text="younger dryas", limit=20) df_singlephrase.head() .. jupyter-execute:: ### Logical operator (AND) df_logop = ds.search_studies(search_text="loess AND stratigraphy", limit=20) df_logop.head() .. jupyter-execute:: ### Wildcards: '_' (single char), '%' (multi-char) df_wc_1 = ds.search_studies(search_text="f_re", limit=20) df_wc_2 = ds.search_studies(search_text="pol%", limit=20) df_wc_1.head(), df_wc_2.head() .. jupyter-execute:: ### Escaping special characters (use backslashes) df_specchar = ds.search_studies(search_text=r"noaa\-tree\-19260", limit=20) df_specchar.head() Investigators, keywords, locations ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. jupyter-execute:: ### Multiple investigators (OR by default) df_multinv_default = ds.search_studies(investigators=["Wahl, E.R.", "Vose, R.S."]) df_multinv_default.head() .. 
jupyter-execute:: ### Multiple investigators (AND by default) df_multinv_and = ds.search_studies(investigators=["Wahl, E.R.", "Vose, R.S."], investigators_and_or="and") df_multinv_and.head() .. jupyter-execute:: ### Keywords: hierarchy with '>' and multiple via '|' df_keywords = ds.search_studies(keywords="earth science>paleoclimate>paleocean>biomarkers") df_keywords.head() .. jupyter-execute:: ### Location hierarchy df_loc = ds.search_studies(locations="Continent>Africa>Eastern Africa>Zambia") df_loc.head() Species and types ^^^^^^^^^^^^^^^^^ .. jupyter-execute:: ### Species: four-letter codes (uppercase enforced) df_species = ds.search_studies(species=["ABAL", "PIPO"]) df_species.head() .. jupyter-execute:: ### Data types: one or more IDs separated by '|' df_muldatatypes = ds.search_studies(data_type_id="4|18") df_muldatatypes.head() Geography and elevation ^^^^^^^^^^^^^^^^^^^^^^^ .. jupyter-execute:: df_latlong = ds.search_studies(min_lat=68, max_lat=69, min_lon=30, max_lon=40) df_latlong.head() .. jupyter-execute:: df_elv = ds.search_studies(min_elevation=100, max_elevation=110) df_elv.head() Time window (defaults to CE if no time settings) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. jupyter-execute:: ### Explicit BP with method df_timew = ds.search_studies(earliest_year=12000, time_format="BP", time_method="overAny") df_timew.head() .. jupyter-execute:: ### No time_format/time_method → defaults to CE df_time_defualt = ds.search_studies(earliest_year=1500, latest_year=0) df_time_defualt.head() Reconstructions and recency ^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. jupyter-execute:: df_recon = ds.search_studies(reconstruction=True) df_recon.head() .. jupyter-execute:: df_recent = ds.search_studies(recent=True, limit=25) df_recent.head() Limit and Skipping (for pagination) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. 
jupyter-execute:: ### Limit up to first 10 results df_limit = ds.search_studies(earliest_year=12000, time_format="BP", time_method="overAny", limit=10) df_limit.head() ### Skip the first 10 results (i.e., get results 11-20) df_skip = ds.search_studies(earliest_year=12000, time_format="BP", time_method="overAny", limit=10, skip=10) df_skip.head() """ kwargs = locals().copy() kwargs.pop("self") if "headers_only" in kwargs: log.warning("Keyword Argument Pair : 'headers_only' is not supported and will be ignored while making requests.") kwargs.pop("headers_only", None) if not any([ kwargs.get("xml_id"), kwargs.get("noaa_id"), kwargs.get("data_type_id"), kwargs.get("keywords"), kwargs.get("investigators"), kwargs.get("max_lat"), kwargs.get("min_lat"), kwargs.get("max_lon"), kwargs.get("min_lon"), kwargs.get("location") or kwargs.get("locations"), kwargs.get("publication"), kwargs.get("search_text"), kwargs.get("earliest_year"), kwargs.get("latest_year"), kwargs.get("variable_name"), kwargs.get("min_elevation"), kwargs.get("max_elevation"), kwargs.get("time_format"), kwargs.get("time_method"), kwargs.get("reconstruction"), kwargs.get("species"), kwargs.get("recent"), kwargs.get("skip") ]): raise ValueError( "At least one search parameter must be specified to initiate a query. " "To view available parameters and usage examples, run: help(NOAADataset.search_studies)" ) if kwargs.get("data_publisher") and kwargs["data_publisher"] != "NOAA": raise NotImplementedError( "PyleoTUPS currently supports data_publisher='NOAA' only. " "Please retry with data_publisher='NOAA'." 
) if kwargs.get("cv_whats") and not kwargs.get("variable_name"): kwargs["variable_name"] = kwargs.pop("cv_whats") # Build payload using our utils (handles ids short-circuit, list→'|', Y/N coercion, time default) payload, notes = build_noaa_payload(**kwargs) for n in notes: log.info("search_studies: %s", n) self.last_search_notes = notes # --- Make the request with explicit 204 handling --- try: resp = get(BASE_URL, payload) resp.raise_for_status() status = resp.status_code except Exception as e: raise requests.HTTPError(f"HTTP Request Error from NOAA: {e}") # 204 No Content → no studies for given filters if status == 204: inv = payload.get("investigators") if inv: log.warning( "No studies found for investigator(s): " f"{inv}. NOAA expects 'LastName, Initials'. Try variations like:\n" " - 'LastName, Initials'\n - 'LastName'\n - 'Initials'" ) # Nothing to parse; return display summary (empty) or None log.info(f"Retrieved {len(self.studies)} studies.") return self.get_summary() # if ("display" in kwargs and kwargs.get("display")) else log.info(f"Retrieved {len(self.studies)} studies.") # Non-204: ensure success and parse JSON try: response_json = resp.json() except Exception as e: raise RuntimeError(f"Failed to parse NOAA response as JSON: {e}") # Parse into internal structures (you already have this) self._parse_response(response_json, kwargs.get("limit")) log.info(f"Retrieved {len(self.studies)} studies.") return self.get_summary()
def _parse_response(self, data, limit):
    """
    Parse the JSON response and populate studies and reverse mapping indexes.

    Parameters
    ----------
    data : dict
        Decoded NOAA response; the studies are read from its ``'study'`` key.
        A missing or ``None`` entry is treated as "no studies".
    limit : int or None
        The limit used for the request. When the number of returned studies
        reaches it, a warning suggests raising the limit.

    Notes
    -----
    Previously loaded studies are discarded. The secondary indexes are rebuilt
    through ``_reindex`` (also when parsing a study fails part-way) so they
    always agree with ``self.studies``.
    """
    from tqdm import tqdm

    study_records = data.get('study') or []

    self.studies.clear()
    try:
        for study_data in tqdm(study_records, desc="Parsing NOAA studies"):
            study_obj = NOAAStudy(study_data)
            self.studies[study_obj.study_id] = study_obj
    finally:
        # Single source of truth for the datatable / file-URL indexes.
        self._reindex()

    if isinstance(limit, int) and len(study_records) >= limit:
        log.warning(
            f"Retrieved {limit} studies, which is the specified limit. "
            "Consider increasing the limit parameter to fetch more studies."
        )
def get_summary(self):
    """
    Build a summary table of every study currently loaded.

    Each loaded study contributes one row, taken from its ``to_dict()`` output.

    Returns
    -------
    pandas.DataFrame
        A DataFrame with a summary of study metadata and components.
        Empty when no study is loaded.

    Examples
    --------

    .. jupyter-execute::

        from pyleotups import NOAADataset
        ds = NOAADataset()
        df = ds.search_studies(noaa_id=33213)
        df.head()
    """
    summary_rows = []
    for loaded_study in self.studies.values():
        summary_rows.append(loaded_study.to_dict())
    return pd.DataFrame(summary_rows)
def get_publications(self, save=False, path=None, verbose=False):
    """
    Get all publications in both BibTeX and DataFrame formats.

    Parameters
    ----------
    save : bool, default=False
        If True, save the BibTeX to a .bib file.
    path : str or None, optional
        Path to save the .bib file. If None and save=True, saves to
        'bibtex_<timestamp>.bib' in the current working directory.
    verbose : bool, default=False
        If True, print the BibTeX content to console.

    Returns
    -------
    tuple
        (pybtex.database.BibliographyData, pandas.DataFrame)
        BibTeX object and DataFrame of publication details.

    Raises
    ------
    ValueError
        If a publication cannot be converted to a BibTeX entry.
    IOError
        If the BibTeX file cannot be written.

    Examples
    --------

    .. jupyter-execute::

        from pyleotups import NOAADataset
        ds = NOAADataset()
        dsf = ds.search_studies(noaa_id=33213)
        bib, df = ds.get_publications()
        df.head()
    """
    from pybtex.database import BibliographyData

    publications_data = []
    bib_entries = {}

    # Collect publication metadata and BibTeX entries
    for study in self.studies.values():
        for pub in study.publications:
            pub_dict = pub.to_dict()
            pub_dict['StudyID'] = study.study_id
            pub_dict['StudyName'] = study.metadata.get("studyName") or "Unknown Study"
            publications_data.append(pub_dict)

            try:
                citation_key = pub.get_citation_key()
                # NOTE: publications sharing a citation key overwrite each
                # other here, so the BibTeX output may have fewer entries
                # than the DataFrame has rows.
                bib_entries[citation_key] = pub.to_bibtex_entry()
            except Exception as e:
                raise ValueError(
                    f"Failed to convert a publication in study {study.study_id} to BibTeX. "
                    f"Original error: {e}"
                ) from e

    df = pd.DataFrame(publications_data)
    bibs = BibliographyData(entries=bib_entries)

    if save or verbose:
        from pybtex.database.output.bibtex import Writer

    # Save to file if requested
    if save:
        import datetime
        from pathlib import Path

        if not path:
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M")
            path = f"bibtex_{timestamp}.bib"
            log.warning(f"No path specified. Saving BibTeX to: {path}")

        try:
            with open(Path(path), "w", encoding="utf-8") as f:
                Writer().write_stream(bibs, f)
        except Exception as e:
            raise IOError(f"Failed to write BibTeX file to '{path}': {e}") from e

    # Print if verbose
    if verbose:
        from io import StringIO

        buffer = StringIO()
        Writer().write_stream(bibs, buffer)
        print(buffer.getvalue())

    return bibs, df
def get_tables(self):
    """
    List every data file of every loaded study, with its site and study context.

    Returns
    -------
    pandas.DataFrame
        A DataFrame with one row per (Site × PaleoData × File). Each row holds
        the fields produced by the paleo-data object's ``to_dict(file)`` (minus
        ``TotalFilesAvailable``) followed by the site and study columns.

    Examples
    --------

    .. jupyter-execute::

        from pyleotups import NOAADataset
        ds = NOAADataset()
        dsf = ds.search_studies(noaa_id=33213)
        df = ds.get_tables()
        df.head()
    """
    table_rows = []

    for loaded_study in self.studies.values():
        sid = loaded_study.study_id
        sname = loaded_study.metadata.get("studyName")

        for location in loaded_study.sites:
            # Flatten the paleo-data / file nesting into (table, file) pairs.
            table_file_pairs = (
                (table, entry)
                for table in location.paleo_data
                for entry in table.files
            )
            for table, entry in table_file_pairs:
                record = table.to_dict(entry)
                # The file count describes the whole table, not this file row.
                record.pop("TotalFilesAvailable", None)
                record.update(
                    SiteID=location.site_id,
                    SiteName=location.site_name,
                    LocationName=location.location_name,
                    GeoType=location.geo_type,
                    GeometryType=location.geometry_type,
                    MinLatitude=location.south_lat,
                    MaxLatitude=location.north_lat,
                    MinLongitude=location.west_lon,
                    MaxLongitude=location.east_lon,
                    MinElevation=location.min_elevation,
                    MaxElevation=location.max_elevation,
                    StudyID=sid,
                    StudyName=sname,
                )
                table_rows.append(record)

    return pd.DataFrame(table_rows)
def get_sites(self):
    """
    Get a DataFrame with one row per site of the loaded studies.

    The files of all paleo-data tables of a site are aggregated into list
    columns instead of being expanded into separate rows.

    Returns
    -------
    pandas.DataFrame
        One row per site. ``SiteID``, ``SiteName`` and ``StudyID`` come first,
        followed by ``FileURL`` (list of the site's file URLs), ``Variables``
        (one list of variable names per file, in the same order),
        ``TotalFilesAvailable`` and the site's location, bounding-box and
        elevation columns, and ``StudyName``. When no study is loaded an empty
        DataFrame with the same columns is returned.
    """
    # Fixed output layout; also guarantees the columns exist when there are
    # no sites at all (selecting them from a column-less frame would fail).
    columns = [
        "SiteID", "SiteName", "StudyID",
        "FileURL", "Variables", "TotalFilesAvailable",
        "LocationName", "GeoType", "GeometryType",
        "MinLatitude", "MaxLatitude", "MinLongitude", "MaxLongitude",
        "MinElevation", "MaxElevation", "StudyName",
    ]
    records = []

    for study in self.studies.values():
        study_id = study.study_id
        study_name = study.metadata.get("studyName")

        for site in study.sites:
            file_urls = []
            variables_list = []

            for paleo in site.paleo_data:
                for file_obj in paleo.files:
                    file_url = file_obj.get("fileUrl")
                    file_urls.append(file_url)
                    # Variable names known for this file; empty when unmapped.
                    variables_list.append(
                        list(paleo.file_variable_map.get(file_url, {}))
                    )

            records.append({
                "SiteID": site.site_id,
                "SiteName": site.site_name,
                "StudyID": study_id,
                "FileURL": file_urls,
                "Variables": variables_list,
                "TotalFilesAvailable": len(file_urls),
                "LocationName": site.location_name,
                "GeoType": site.geo_type,
                "GeometryType": site.geometry_type,
                "MinLatitude": site.south_lat,
                "MaxLatitude": site.north_lat,
                "MinLongitude": site.west_lon,
                "MaxLongitude": site.east_lon,
                "MinElevation": site.min_elevation,
                "MaxElevation": site.max_elevation,
                "StudyName": study_name,
            })

    return pd.DataFrame(records, columns=columns)
def get_geo(self):
    """
    Get a DataFrame of site-level geospatial metadata and associated data types
    from all studies loaded into the Dataset.

    Returns
    -------
    pandas.DataFrame
        A DataFrame with one row per site and the columns ``StudyID`` and
        ``DataType`` followed by whichever of ``SiteID``, ``SiteName``,
        ``LocationName``, ``GeoType``, ``GeometryType``, ``MinLatitude``,
        ``MaxLatitude``, ``MinLongitude``, ``MaxLongitude``, ``MinElevation``
        and ``MaxElevation`` the site provides.

    Examples
    --------

    .. jupyter-execute::

        from pyleotups import NOAADataset
        ds = NOAADataset()
        dsf = ds.search_studies(noaa_id=33213)
        df = ds.get_geo()
        df.head()
    """
    # Output column -> site attribute, used when a site has no file rows.
    # Mirrors the attribute mapping the table/site listings of this class use.
    site_attributes = {
        "SiteID": "site_id",
        "SiteName": "site_name",
        "LocationName": "location_name",
        "GeoType": "geo_type",
        "GeometryType": "geometry_type",
        "MinLatitude": "south_lat",
        "MaxLatitude": "north_lat",
        "MinLongitude": "west_lon",
        "MaxLongitude": "east_lon",
        "MinElevation": "min_elevation",
        "MaxElevation": "max_elevation",
    }

    site_records = []
    for study in self.studies.values():
        study_id = study.study_id
        data_type = study.metadata.get("dataType", "Unknown")

        for site in study.sites:
            # site.to_dict() returns a list of dicts (1 per file); the site
            # fields are taken from the first one.
            file_rows = site.to_dict()
            if file_rows:
                site_fields = {
                    k: v for k, v in file_rows[0].items() if k in site_attributes
                }
            else:
                # A site without any file row would otherwise raise IndexError;
                # read the same fields straight from the site object instead.
                site_fields = {
                    column: getattr(site, attribute, None)
                    for column, attribute in site_attributes.items()
                }

            site_records.append({
                "StudyID": study_id,
                "DataType": data_type,
                **site_fields,
            })

    return pd.DataFrame(site_records)
def get_funding(self):
    """
    Collect the funding records of every loaded study into one table.

    Funding entries that are not dictionaries are skipped.

    Returns
    -------
    pandas.DataFrame
        A DataFrame with columns
        ['StudyID', 'StudyName', 'FundingAgency', 'FundingGrant'].
        Returns an empty DataFrame (with those columns) if no funding is available.

    Examples
    --------

    .. jupyter-execute::

        from pyleotups import NOAADataset
        ds = NOAADataset()
        dsf = ds.search_studies(noaa_id=33213)
        df = ds.get_funding()
        df.head()
    """
    output_columns = ["StudyID", "StudyName", "FundingAgency", "FundingGrant"]
    funding_rows = []

    for loaded_study in self.studies.values():
        sid = loaded_study.study_id
        sname = loaded_study.metadata.get("studyName")
        funding_rows.extend(
            {
                "StudyID": sid,
                "StudyName": sname,
                "FundingAgency": grant.get("fundingAgency", None),
                "FundingGrant": grant.get("fundingGrant", None),
            }
            for grant in loaded_study.funding
            if isinstance(grant, dict)
        )

    return pd.DataFrame(funding_rows, columns=output_columns)
def get_variables(self, dataTableIDs):
    """
    Look up the variable metadata recorded for the given data tables.

    Parameters
    ----------
    dataTableIDs : list or str
        One or more NOAA dataTableIDs. They must be present in the index
        built by a previous search.

    Returns
    -------
    pandas.DataFrame
        A DataFrame indexed by DataTableID with one row per (file × variable),
        holding StudyID, SiteID, FileURL, VariableName and every metadata
        field stored for the variable (such as cvShortName, cvUnit, etc.).
        When nothing is found, an empty DataFrame with the columns
        ['StudyID', 'SiteID', 'FileURL', 'VariableName'] is returned.

    Raises
    ------
    ValueError
        If a dataTableID is not known to this dataset.

    Examples
    --------

    .. jupyter-execute::

        from pyleotups import NOAADataset
        ds = NOAADataset()
        dsf = ds.search_studies(noaa_id=33213)
        df_var = ds.get_variables(dataTableIDs="45859")
        df_var.head()
    """
    variable_rows = []

    for table_id in assert_list(dataTableIDs):
        entry = self.data_table_index.get(table_id)
        if not entry:
            raise ValueError(f"DataTableID '{table_id}' not found. Please run `search_studies` first.")

        table = entry["paleo_data"]
        owner_study = table.study_id
        owner_site = table.site_id

        for file_entry in table.files:
            url = file_entry.get("fileUrl")
            if url:
                # One row per variable recorded for this file.
                per_file_variables = table.file_variable_map.get(url, {})
                variable_rows.extend(
                    {
                        "DataTableID": table_id,
                        "StudyID": owner_study,
                        "SiteID": owner_site,
                        "FileURL": url,
                        "VariableName": name,
                        **meta,  # includes all cv* fields
                    }
                    for name, meta in per_file_variables.items()
                )

    frame = pd.DataFrame(variable_rows)
    if not frame.empty:
        return frame.set_index("DataTableID")
    # fallback for no data
    return pd.DataFrame(columns=["StudyID", "SiteID", "FileURL", "VariableName"])
def _process_file(self, file_url, mapping=None, timeout=60):
    """
    Process a single file URL: detect parser, parse the file, and attach metadata.

    Parameters
    ----------
    file_url : str
        URL of the data file. Only ``.txt`` files are accepted.
    mapping : dict, optional
        Index entry of the file's data table. Its ``'study_id'`` is used to
        tag every returned DataFrame (``attrs['NOAAStudyId']`` and
        ``attrs['StudyName']``). Without it no metadata is attached.
    timeout : float or tuple, default 60
        Timeout in seconds handed to ``requests.get`` for the download used to
        detect the file format, so an unresponsive server cannot block forever.

    Returns
    -------
    list of pandas.DataFrame
        The DataFrames produced by the parser; empty if it produced none.

    Raises
    ------
    ValueError
        If `file_url` is empty or no parser matches the file's layout.
    UnsupportedFileTypeError
        If the file extension is proprietary or not ``txt``.
    RuntimeError
        If the file cannot be downloaded or parsing fails.
    """
    if not file_url:
        raise ValueError("File URL is missing.")

    file_type = file_url.split('.')[-1].lower()
    if file_type in self._PROPRIETARY_TYPES:
        raise UnsupportedFileTypeError(
            f"pyleotups works with .txt files only. File type '{file_type}' is proprietary."
        )
    if file_type != 'txt':
        raise UnsupportedFileTypeError(
            f"Invalid file type '{file_type}'. Only .txt files are supported."
        )

    # Step 1: Detect parser type by reading initial lines
    def detect_parser_type(lines):
        """Classify the text as 'standard', 'nonstandard' or 'unparsable'."""
        # 1. Clean lines: strip whitespace and remove empty lines
        cleaned = [line.strip() for line in lines if line.strip()]
        if not cleaned:
            return "unparsable"  # Handle empty file

        # 2. "standard": the first 10 non-empty lines are all '#' comments
        if all(line.startswith("#") for line in cleaned[:10]):
            return "standard"

        # 3. "nonstandard": a NOAA/WDC header block enclosed by two separator
        #    lines. A line with many dashes is taken as a separator.
        def is_separator(line):
            return line.count('-') > 10

        start_sep_idx = next(
            (i for i, line in enumerate(cleaned) if is_separator(line)), -1
        )
        if start_sep_idx == -1:
            return "unparsable"

        # The closing separator must follow within the next few lines.
        search_stop = min(start_sep_idx + 10, len(cleaned))
        end_sep_idx = next(
            (j for j in range(start_sep_idx + 1, search_stop) if is_separator(cleaned[j])),
            -1,
        )

        # Require at least one line between the two separators.
        if end_sep_idx > start_sep_idx + 1:
            header_block = " ".join(cleaned[start_sep_idx + 1:end_sep_idx]).lower()
            if "noaa" in header_block or "world data center" in header_block:
                return "nonstandard"

        # 4. Neither format detected
        return "unparsable"

    try:
        response = requests.get(file_url, timeout=timeout)
        response.raise_for_status()
        parser_type = detect_parser_type(response.text.splitlines())
    except Exception as e:
        raise RuntimeError(f"Failed to read file from '{file_url}': {e}") from e

    # Step 2: Use the appropriate parser
    # NOTE(review): the parsers receive the URL, not the text downloaded
    # above — presumably they fetch the file again; confirm.
    if parser_type == "standard":
        parser = StandardParser(file_url)
    elif parser_type == "nonstandard":
        parser = NonStandardParser(file_url)
    else:
        raise ValueError(
            f"Unable to determine parser for file: {file_url}. "
        )

    try:
        parsed_data = parser.parse()
    except Exception as e:
        raise RuntimeError(f"Error while parsing file {file_url}: {e}") from e

    # Step 3: Attach metadata
    def attach_metadata(df, mapping):
        """Tag `df` with the study id and study name of `mapping`."""
        study_id = mapping.get('study_id')
        df.attrs['NOAAStudyId'] = study_id
        metadata = getattr(self.studies.get(study_id), 'metadata', None)
        df.attrs['StudyName'] = metadata.get("studyName") if metadata is not None else None
        return df

    # Normalise the parser output to a flat list of DataFrames. A list may
    # hold DataFrames directly or objects exposing one as `.df`; anything
    # else (e.g. metadata blocks) is ignored.
    to_process = []
    if isinstance(parsed_data, pd.DataFrame):
        to_process.append(parsed_data)
    elif isinstance(parsed_data, list):
        for item in parsed_data:
            if isinstance(item, pd.DataFrame):
                to_process.append(item)
            elif isinstance(getattr(item, 'df', None), pd.DataFrame):
                to_process.append(item.df)

    results = []
    for df in to_process:
        if mapping:
            df = attach_metadata(df, mapping)
        results.append(df)

    return results
def get_data(self, dataTableIDs=None, file_urls=None):
    """
    Download and parse external data files, tagging results with study metadata.

    The files are selected either by data table ID or by URL. When
    `dataTableIDs` is given it takes precedence and `file_urls` is ignored.

    Parameters
    ----------
    dataTableIDs : list or str, optional
        One or more NOAA data table IDs.
    file_urls : list or str, optional
        One or more file URLs. URLs unknown to this dataset are still
        processed, but without study metadata (a warning is logged).

    Returns
    -------
    list of pandas.DataFrame
        A list of DataFrames corresponding to the fetched data.

    Raises
    ------
    ValueError
        If neither argument is provided, if a data table ID has no parent
        study mapping, or if its file URL is missing.
    Exception
        Propagates any exception raised while processing a file (for example
        ``UnsupportedFileTypeError`` for proprietary/unsupported file types).

    Examples
    --------

    .. jupyter-execute::

        from pyleotups import NOAADataset
        ds = NOAADataset()
        df = ds.search_studies(noaa_id=33213)
        dfs = ds.get_data(dataTableIDs="45859")
        dfs[0].head()
    """
    frames = []

    # Selection by data table ID.
    if dataTableIDs:
        for table_id in assert_list(dataTableIDs):
            entry = self.data_table_index.get(table_id)
            if not entry:
                raise ValueError(f"No parent study mapping found for Data Table ID '{table_id}'. "
                                 "Please perform a search using this DataTableID or provide a specific file URL.")
            table_url = entry['paleo_data'].file_url
            if not table_url:
                raise ValueError(f"File URL for Data Table ID '{table_id}' is missing. Cannot fetch data.")
            frames.extend(self._process_file(table_url, entry))
        return frames

    if not file_urls:
        raise ValueError("No dataTableID or file URL provided. Cannot fetch data.")

    # Selection by explicit file URL.
    for url in assert_list(file_urls):
        table_id = self.file_url_to_datatable.get(url)
        if not table_id:
            log.warning(f"Attached '{url}' is not linked to any parent study; can not add metadata.")
            frames.extend(self._process_file(url))
            continue

        details = self.data_table_index.get(table_id)
        if not details:
            log.warning(
                f"Mapping details for file URL '{url}' (Data Table ID '{table_id}') not found; can not add metadata.")
            frames.extend(self._process_file(url))
            continue

        frames.extend(self._process_file(url, details))

    return frames