__all__ = ['Dataset', 'UnsupportedFileTypeError']
import requests
import pandas as pd
import warnings
from ..utils.NOAADataset import NOAADataset
from ..utils.helpers import assert_list
from ..utils.Parser.StandardParser import DataFetcher, StandardParser
from ..utils.Parser.NonStandardParser import NonStandardParser
class UnsupportedFileTypeError(Exception):
"""Raised when a file type is not supported by the parser."""
pass
[docs]
class Dataset:
"""
A wrapper class for interacting with the NOAA Studies API.
Manages the retrieval, parsing, and aggregation of NOAA study data,
and provides methods to access summaries, publications, sites, and external data files.
Attributes
----------
BASE_URL : str
The NOAA API endpoint URL.
studies : dict
A mapping from NOAAStudyId to NOAADataset instances.
data_table_index : dict
A mapping from dataTableID to associated study, site, and paleo data.
"""
BASE_URL = "https://www.ncei.noaa.gov/access/paleo-search/study/search.json"
_PROPRIETARY_TYPES = {'crn', 'rwl', 'fhx', 'lpd'}
def __init__(self):
"""
Initialize the Dataset instance.
Attributes are set to their default empty values.
"""
self.studies = {} # NOAAStudyId -> NOAADataset instance
self.data_table_index = {} # dataTableID -> dict with study, site, paleo_data
self.file_url_to_datatable = {} # file_url -> dataTableID
[docs]
def search_studies(self, xml_id=None, noaa_id=None, data_publisher="NOAA", data_type_id=None,
keywords=None, investigators=None, max_lat=None, min_lat=None, max_lon=None,
min_lon=None, location=None, publication=None, search_text=None, earliest_year=None,
latest_year=None, cv_whats=None, recent=False, limit = 100):
"""
Search for NOAA studies using the specified parameters.
At least one parameter must be provided to perform a search. This method interfaces with
the NOAA NCEI Paleo Study Search API. Use it to filter studies based on location,
investigators, time range, keywords, and more.
Parameters
----------
xml_id : str, optional
Specify the internal XML document ID. Must be an exact match (e.g., '1840').
noaa_id : str, optional
Provide the unique NOAA Study ID as a number (e.g., '13156').
search_text : str, optional
General text search across study content. Supports wildcards (%) and logical operators (AND, OR).
Examples: 'younger dryas', 'loess AND stratigraphy'
data_publisher : by default 'NOAA'
Choose from: 'NOAA', 'NEOTOMA', or 'PANGAEA'.
Example: 'NOAA'
data_type_id : str, optional
Filter by data type. Use one or more type IDs separated by '|'.
Available IDs:
1: BOREHOLE, 2: CLIMATE FORCING, 3: CLIMATE RECONSTRUCTIONS, 4: CORALS AND SCLEROSPONGES,
6: HISTORICAL, 7: ICE CORES, 8: INSECT, 9: LAKE LEVELS, 10: LOESS,
11: PALEOCLIMATIC MODELING, 12: FIRE HISTORY, 13: PALEOLIMNOLOGY, 14: PALEOCEANOGRAPHY,
15: PLANT MACROFOSSILS, 16: POLLEN, 17: SPELEOTHEMS, 18: TREE RING,
19: OTHER COLLECTIONS, 20: INSTRUMENTAL, 59: SOFTWARE, 60: REPOSITORY
Example: '4|18'
keywords : str, optional
Use hierarchical terms separated by '>'. Separate multiple values using '|'.
Example: 'earth science>paleoclimate>paleocean>biomarkers'
investigators : str, optional
Specify one or more investigator names. Use '|' to separate multiple names.
Example: 'Wahl, E.R.|Vose, R.S.'
max_lat : float, optional
Upper bound for latitude. Must be between -90 and 90.
Example: 90
min_lat : float, optional
Lower bound for latitude. Must be between -90 and 90.
Example: -90
max_lon : float, optional
Upper bound for longitude. Must be between -180 and 180.
Example: 180
min_lon : float, optional
Lower bound for longitude. Must be between -180 and 180.
Example: -180
location : str, optional
Use region hierarchy separated by '>'.
Example: 'Continent>Africa>Eastern Africa>Zambia'
publication : str, optional
Match against publication metadata such as title, author, or citation.
Example: 'Khider'
earliest_year : int, optional
Starting year (can be negative for BCE). Used with `timeFormat` and `timeMethod`.
Example: -500
latest_year : int, optional
Ending year. Used with `timeFormat` and `timeMethod`.
Example: 2020
cv_whats : str, optional
Search using controlled vocabulary terms for measured variables.
Format: Hierarchical string using '>'
Example: 'chemical composition>compound>inorganic compound>carbon dioxide'
recent : bool, optional
Set to True to only return studies from the last two years. Results are sorted by newest.
limit : int, optional
Set to 100 by default. Limits the number of studies retrieved.
Returns
-------
pandas.DataFrame
Response DataFrame. Fills the internal `studies` attribute with structured NOAA study data.
Raises
------
ValueError
If no inputs are passed.
requests.HTTPError
If the HTTP request returned an unsuccessful status code.
Notes
-----
At least one parameter must be specified, otherwise the API call will fail.
Examples
--------
.. jupyter-execute::
from pyleotups import Dataset
ds=Dataset()
ds.search_studies(noaa_id=33213)
"""
# Validate input
if not any([xml_id, noaa_id, data_type_id, keywords, investigators, max_lat, min_lat, max_lon,
min_lon, location, publication, search_text, earliest_year, latest_year, cv_whats, recent]):
raise ValueError(
"At least one search parameter must be specified to initiate a query. "
"To view available parameters and usage examples, run: help(Dataset.search_studies)"
)
if data_publisher != "NOAA":
raise NotImplementedError(
f"PyleoTUPS does not support '{data_publisher}' as the data publisher in the current version."
"Please retry a search with data_publisher = NOAA "
"Please check future versions for support of other publishers."
)
if noaa_id:
params = {'NOAAStudyId': noaa_id}
elif xml_id:
params = {'xmlId': xml_id}
else:
params = {
'dataPublisher': data_publisher,
'dataTypeId': data_type_id,
'keywords': keywords,
'investigators': investigators,
'minLat': min_lat,
'maxLat': max_lat,
'minLon': min_lon,
'maxLon': max_lon,
'locations': location,
'searchText': search_text,
'cvWhats': cv_whats,
'earliestYear': earliest_year,
'latestYear': latest_year,
'recent': recent,
'limit': limit
}
params = {k: v for k, v in params.items() if v is not None}
try:
response = requests.get(self.BASE_URL, params=params)
response.raise_for_status()
response_json = response.json()
except requests.HTTPError as e:
raise RuntimeError(f"HTTP error from NOAA API: {e}")
except Exception as e:
raise RuntimeError(f"Failed to fetch or parse response: {e}")
self.studies.clear()
self.file_url_to_datatable.clear()
self._parse_response(response_json)
return self.get_summary()
def _fetch_api(self, params):
"""
Fetch data from the NOAA API using the given parameters.
Parameters
----------
params : dict
A dictionary of query parameters.
Returns
-------
dict
The JSON response from the NOAA API.
Raises
------
Exception
If the API response status is not 200.
"""
response = requests.get(self.BASE_URL, params=params)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Error fetching studies: {response.status_code}")
def _parse_response(self, data):
"""
Parse the JSON response and populate studies and reverse mapping indexes.
"""
from tqdm import tqdm
self.studies.clear()
self.data_table_index.clear()
self.file_url_to_datatable.clear()
for study_data in tqdm(data.get('study', []), desc="Parsing NOAA studies"):
study_obj = NOAADataset(study_data)
self.studies[study_obj.study_id] = study_obj
for site in study_obj.sites:
for paleo in site.paleo_data:
self.data_table_index[paleo.datatable_id] = {
'study_id': study_obj.study_id,
'site_id': site.site_id,
'paleo_data': paleo
}
for file_obj in paleo.files:
file_url = file_obj.get('fileUrl')
if file_url:
self.file_url_to_datatable[file_url] = paleo.datatable_id
[docs]
def get_summary(self):
"""
Get a DataFrame summarizing all loaded studies.
Returns
-------
pandas.DataFrame
A DataFrame with a summary of study metadata and components.
Examples
--------
Examples
--------
.. jupyter-execute::
from pyleotups import Dataset
ds=Dataset()
df = ds.search_studies(noaa_id=33213)
df.head()
"""
data = [study.to_dict() for study in self.studies.values()]
return pd.DataFrame(data)
[docs]
def get_publications(self, save=False, path=None, verbose=False):
"""
Get all publications in both BibTeX and DataFrame formats.
Parameters
----------
save : bool, default=False
If True, save the BibTeX to a .bib file.
path : str or None, optional
Path to save the .bib file. If None and save=True,
saves to 'bibtex_<timestamp>.bib'.
verbose : bool, default=False
If True, print the BibTeX content to console.
Returns
-------
tuple (pybtex.database.BibliographyData, pandas.DataFrame)
BibTeX object and DataFrame of publication details.
Examples
--------
.. jupyter-execute::
from pyleotups import Dataset
ds=Dataset()
dsf = ds.search_studies(noaa_id=33213)
bib, df = ds.get_publications()
df.head()
"""
from pybtex.database import BibliographyData
publications_data = []
bib_entries = {}
# Collect publication metadata and BibTeX entries
for study in self.studies.values():
for pub in study.publications:
pub_dict = pub.to_dict()
pub_dict['StudyID'] = study.study_id
pub_dict['StudyName'] = study.metadata.get("studyName") or "Unknown Study"
publications_data.append(pub_dict)
try:
citation_key = pub.get_citation_key()
bib_entries[citation_key] = pub.to_bibtex_entry()
except Exception as e:
raise ValueError(
f"Failed to convert a publication in study {study.study_id} to BibTeX. "
f"Original error: {e}"
)
df = pd.DataFrame(publications_data)
bibs = BibliographyData(entries=bib_entries)
# Save to file if requested
if save:
import datetime
from pybtex.database.output.bibtex import Writer
from pathlib import Path
if not path:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M")
path = f"bibtex_{timestamp}.bib"
warnings.warn(f"No path specified. Saving BibTeX to: {path}")
try:
writer = Writer()
with open(Path(path), "w", encoding="utf-8") as f:
writer.write_stream(bibs, f)
except Exception as e:
raise IOError(f"Failed to write BibTeX file to '{path}': {e}")
# Print if verbose
if verbose:
from pybtex.database.output.bibtex import Writer
from io import StringIO
buffer = StringIO()
Writer().write_stream(bibs, buffer)
print(buffer.getvalue())
return bibs, df
[docs]
def get_tables(self):
"""
Get a DataFrame of all sites expanded to paleo data files.
Returns
-------
pandas.DataFrame
A DataFrame with one row per (Site × PaleoData × File).
Examples
--------
.. jupyter-execute::
from pyleotups import Dataset
ds=Dataset()
dsf = ds.search_studies(noaa_id=33213)
df = ds.get_tables()
df.head()
"""
records = []
for study in self.studies.values():
study_id = study.study_id
study_name = study.metadata.get("studyName")
for site in study.sites:
paleo_data_records = site.to_dict() # Already flattened per file
for paleo_record in paleo_data_records:
paleo_record.update({
"StudyID": study_id,
"StudyName": study_name
})
records.append(paleo_record)
return pd.DataFrame(records)
[docs]
def get_sites(self):
"""
Get a DataFrame of all sites expanded to paleo data files.
Returns
-------
pandas.DataFrame
A DataFrame with one row per (Site × PaleoData × File).
"""
records = []
for study in self.studies.values():
study_id = study.study_id
study_name = study.metadata.get("studyName")
for site in study.sites:
paleo_data_records = site.to_dict() # Already flattened per file
for paleo_record in paleo_data_records:
paleo_record.update({
"StudyID": study_id,
"StudyName": study_name
})
records.append(paleo_record)
return pd.DataFrame(records)
[docs]
def get_geo(self):
"""
Get a DataFrame of site-level geospatial metadata and associated data types
from all studies loaded into the Dataset.
Returns
-------
pandas.DataFrame
A DataFrame with one row per site and columns:
['StudyID', 'SiteID', 'SiteName', 'LocationName',
'Latitude', 'Longitude', 'MinElevation', 'MaxElevation', 'DataType']
Examples
--------
.. jupyter-execute::
from pyleotups import Dataset
ds=Dataset()
dsf = ds.search_studies(noaa_id=33213)
df = ds.get_geo()
df.head()
"""
site_records = []
for study in self.studies.values():
study_id = study.study_id
data_type = study.metadata.get("dataType", "Unknown")
for site in study.sites:
site_dict = {
"StudyID": study_id,
"DataType": data_type,
**{
k: v for k, v in site.to_dict()[0].items() # site.to_dict() returns list of dicts (1 per file)
if k in ["SiteID", "SiteName", "LocationName", "Latitude", "Longitude", "MinElevation", "MaxElevation"]
}
}
site_records.append(site_dict)
return pd.DataFrame(site_records)
[docs]
def get_funding(self):
"""
Get a DataFrame of all funding records across loaded studies.
Returns
-------
pandas.DataFrame
A DataFrame with columns ['StudyID', 'StudyName', 'FundingAgency', 'FundingGrant'].
Returns an empty DataFrame if no funding is available.
Examples
--------
.. jupyter-execute::
from pyleotups import Dataset
ds=Dataset()
dsf = ds.search_studies(noaa_id=33213)
df = ds.get_funding()
df.head()
"""
records = []
for study in self.studies.values():
study_id = study.study_id
study_name = study.metadata.get("studyName")
for fund in study.funding:
if isinstance(fund, dict):
records.append({
"StudyID": study_id,
"StudyName": study_name,
"FundingAgency": fund.get("fundingAgency", None),
"FundingGrant": fund.get("fundingGrant", None)
})
return pd.DataFrame(records, columns=["StudyID", "StudyName", "FundingAgency", "FundingGrant"])
[docs]
def get_variables(self, dataTableIDs):
"""
Retrieve variable metadata for specified dataTableIDs.
Parameters
----------
dataTableIDs : list or str
One or more NOAA dataTableIDs.
Returns
-------
pandas.DataFrame
A DataFrame indexed by DataTableID with one row per (file × variable).
Includes full variable metadata such as cvShortName, cvUnit, etc.
Examples
--------
.. jupyter-execute::
from pyleotups import Dataset
ds=Dataset()
dsf = ds.search_studies(noaa_id=33213)
df_var = ds.get_variables(dataTableIDs="45859")
df_var.head()
"""
dataTableIDs = assert_list(dataTableIDs)
records = []
for dt_id in dataTableIDs:
mapping = self.data_table_index.get(dt_id)
if not mapping:
raise ValueError(f"DataTableID '{dt_id}' not found. Please run `search_studies` first.")
paleo = mapping["paleo_data"]
study_id = paleo.study_id
site_id = paleo.site_id
for file in paleo.files:
file_url = file.get("fileUrl")
if not file_url:
continue
var_map = paleo.file_variable_map.get(file_url, {})
for var_name, var_meta in var_map.items():
records.append({
"DataTableID": dt_id,
"StudyID": study_id,
"SiteID": site_id,
"FileURL": file_url,
"VariableName": var_name,
**var_meta # includes all cv* fields
})
df = pd.DataFrame(records)
if df.empty:
return pd.DataFrame(columns=["StudyID", "SiteID", "FileURL", "VariableName"]) # fallback for no data
return df.set_index("DataTableID")
@DeprecationWarning
def get_data_deprecated(self, dataTableIDs=None, file_urls=None):
"""
Fetch external data for given dataTableIDs or file URLs and attach study/site metadata.
Parameters
----------
dataTableIDs : list or str, optional
One or more NOAA data table IDs.
file_urls : list or str, optional
One or more file URLs.
Returns
-------
list of pandas.DataFrame
A list of DataFrames, each corresponding to fetched data.
"""
if dataTableIDs:
dataTableIDs = assert_list(dataTableIDs)
dfs = []
for dt_id in dataTableIDs:
mapping = self.data_table_index.get(dt_id)
if not mapping:
print(f"Data Table ID {dt_id} not found or no associated file URL.")
continue
file_url = mapping['paleo_data'].file_url
if not file_url:
print(f"No file URL for Data Table ID {dt_id}.")
continue
fetched_data = DataFetcher.fetch_data(file_url)
if isinstance(fetched_data, list):
for df in fetched_data:
df.attrs['NOAAStudyId'] = mapping['study_id']
df.attrs['SiteID'] = mapping['site_id']
study_obj = self.studies.get(mapping['study_id'], {})
df.attrs['StudyName'] = study_obj.metadata.get("studyName") if hasattr(study_obj, 'metadata') else None
publications = study_obj.publications if hasattr(study_obj, 'publications') else None
print(len(publications))
for pub in publications:
if hasattr(pub, "doi"):
doi = pub.doi if pub.doi else None
df.attrs['PublicationDOI'].append(doi)
dfs.append(df)
else:
fetched_data.attrs['NOAAStudyId'] = mapping['study_id']
fetched_data.attrs['SiteID'] = mapping['site_id']
study_obj = self.studies.get(mapping['study_id'], {})
fetched_data.attrs['StudyName'] = study_obj.metadata.get("studyName") if hasattr(study_obj, 'metadata') else None
dfs.append(fetched_data)
return dfs
if file_urls:
file_urls = assert_list(file_urls)
dfs = [DataFetcher.fetch_data(url) for url in file_urls]
return dfs
print("No dataTableID or file URL provided.")
return pd.DataFrame()
def _process_file(self, file_url, mapping=None):
"""
Process a single file URL: detect parser, parse the file, and attach metadata.
"""
if not file_url:
raise ValueError("File URL is missing.")
file_type = file_url.split('.')[-1].lower()
if file_type in self._PROPRIETARY_TYPES:
raise UnsupportedFileTypeError(
f"pyleotups works with .txt files only. File type '{file_type}' is proprietary."
)
if file_type != 'txt':
raise UnsupportedFileTypeError(
f"Invalid file type '{file_type}'. Only .txt files are supported."
)
# Step 1: Detect parser type by reading initial lines
import requests
def detect_parser_type(lines):
lines = [line.strip() for line in lines if line.strip()]
if all(line.startswith("#") for line in lines[:5]):
return "standard"
for i in range(len(lines) - 4):
line_normalized = lines[i+1].lower()
if (
(("world data center for paleoclimatology" in line_normalized
and "noaa" in lines[i+3].lower()) or
("noaa" in line_normalized
and "world data center for paleoclimatology" in lines[i+3].lower()))
and "-" in lines[i] and "-" in lines[i+4]
):
return "nonstandard"
return "unparsable"
try:
response = requests.get(file_url)
response.raise_for_status()
lines = response.text.splitlines()
parser_type = detect_parser_type(lines)
except Exception as e:
raise RuntimeError(f"Failed to read file from '{file_url}': {e}")
# Step 2: Use the appropriate parser
if parser_type == "standard":
parser = StandardParser(file_url)
elif parser_type == "nonstandard":
parser = NonStandardParser(file_url)
else:
raise ValueError(
f"Unable to determine parser for file: {file_url}. "
)
try:
parsed_data = parser.parse()
except Exception as e:
raise RuntimeError(f"Error while parsing file {file_url}: {e}")
# Step 3: Attach metadata
def attach_metadata(df, mapping):
df.attrs['NOAAStudyId'] = mapping.get('study_id')
study_obj = self.studies.get(mapping.get('study_id'), {})
df.attrs['StudyName'] = study_obj.metadata.get("studyName") if hasattr(study_obj, 'metadata') else None
return df
results = []
if isinstance(parsed_data, list):
for df in parsed_data:
if mapping:
df = attach_metadata(df, mapping)
results.append(df)
else:
if mapping:
parsed_data = attach_metadata(parsed_data, mapping)
results.append(parsed_data)
return results
[docs]
def get_data(self, dataTableIDs=None, file_urls=None):
"""
Fetch external data for given dataTableIDs or file URLs, perform validations,
and attach study and site metadata.
Parameters
----------
dataTableIDs : list or str, optional
One or more NOAA data table IDs.
file_urls : list or str, optional
One or more file URLs.
Returns
-------
list of pandas.DataFrame
A list of DataFrames corresponding to the fetched data.
Raises
------
ValueError
For missing parent study mapping, missing file URL, or proprietary/unsupported file types.
Exception
Propagates any exceptions raised by the parser.
Examples
--------
.. jupyter-execute::
from pyleotups import Dataset
ds=Dataset()
df = ds.search_studies(noaa_id=33213)
dfs = ds.get_data(dataTableIDs="45859")
dfs[0].head()
"""
dfs = []
# Process based on dataTableIDs.
if dataTableIDs:
dataTableIDs = assert_list(dataTableIDs)
for dt_id in dataTableIDs:
# print(self.data_table_index, type(self.data_table_index.values()))
# for id, value in self.data_table_index.items():
# print(type(id))
# print(value, type(value))
mapping = self.data_table_index.get(dt_id)
if not mapping:
raise ValueError(f"No parent study mapping found for Data Table ID '{dt_id}'. "
"Please perform a search using this DataTableID or provide a specific file URL.")
file_url = mapping['paleo_data'].file_url
if not file_url:
raise ValueError(f"File URL for Data Table ID '{dt_id}' is missing. Cannot fetch data.")
dfs.extend(self._process_file(file_url, mapping))
return dfs
# Process based on file_urls provided directly.
if file_urls:
file_urls = assert_list(file_urls)
for url in file_urls:
mapping = self.file_url_to_datatable.get(url)
if not mapping:
warnings.warn(
f"Attached '{url}' is not linked to any parent study; can not add metadata.",
UserWarning
)
dfs.extend(self._process_file(url))
else:
mapping_details = self.data_table_index.get(mapping)
if not mapping_details:
warnings.warn(
f"Mapping details for file URL '{url}' (Data Table ID '{mapping}') not found; can not add metadata.",
UserWarning
)
dfs.extend(self._process_file(url))
else:
dfs.extend(self._process_file(url, mapping_details))
return dfs
raise ValueError("No dataTableID or file URL provided. Cannot fetch data.")