# NonStandardParser.py
import re
import pandas as pd
import requests
# Import all models and utilities from the utils file
from .NonStandardParserUtils import (
Block, LineInfo, BlockType,
count_tokens, numeric_ratio, is_numeric, generate_row_pattern,
_safe_mean, _safe_var, _safe_cv, _most_common,
get_token_intervals_multi, merge_headers_by_overlap,
generate_df, assign_tokens_by_overlap,
refine_headers_by_correspondence
)
[docs]
class NonStandardParser:
"""
Parses non-standard, fixed-width, or misaligned text files (like
those from NOAA) into a structured list of Blocks, each potentially
containing a pandas DataFrame.
The parser uses statistical heuristics to classify contiguous blocks
of text and then applies different parsing strategies based on the
classification.
Attributes
----------
file_path : str
The local path or URL to the file being parsed.
use_skip : bool
Whether to skip to the "DATA:" descriptor in the file.
lines : list[str]
A list of all lines read from the file.
blocks : list[Block]
The final list of processed Block objects.
Notes
-----
The parsing workflow is as follows.
- A `NonStandardParser` instance is created with a `file_path`.
- The public `parse()` method is called.
- `_fetch_lines()` reads the file into `self.lines`.
- `_segregate_blocks()` splits `self.lines` into `Block` objects -a groups of non-empty lines- and saves them to `self.blocks`.
- `parse()` iterates through each `block` in `self.blocks`.
- `_process_block()` is called on each block, which a. Computes statistics for the block, b. Classifies it (e.g., TABULAR, DATA, NARRATIVE), c. Dispatches to a specific parsing method (e.g.,`_parse_tabular_block`).
- The specific parse methods (e.g., `_parse_data_block`) handle logic for header borrowing, DataFrame generation, and error handling, modifying the `block` object in place.
- `parse()` returns the fully processed `self.blocks` list.
"""
def __init__(self, file_path, use_skip=True, use_refinement=True):
"""
Initializes the parser.
Parameters
----------
file_path : str
The local file path or HTTP/HTTPS URL of the text file to parse.
use_skip : bool, optional
If True (default), the parser will skip all lines until it finds
a line starting with "DATA:". This is standard for NOAA files.
use_refinement : bool, optional
If True (default), attempts to refine column headers by analyzing the
vertical alignment of data columns (histogramming).
Notes
-----
Do's and Don'ts:
- Do set this to True for standard NOAA Paleo files.
- Do not set this to True if your file has no "DATA:" marker.
In that case, it will raise a ValueError. Set it to False
to parse the entire file from the beginning.
"""
self.file_path = file_path
self.use_skip = use_skip
self.lines = []
self.blocks = []
self.use_refinement = use_refinement
[docs]
def parse(self):
"""
Executes the full parsing workflow on the file.
Returns
-------
list[Block]
A list of processed Block objects. Each block may contain
a DataFrame (`block.df`) if parsing was successful, or an
error message (`block.error_message`) if it failed.
Raises
------
ValueError
If `use_skip` is True and no "DATA:" line is found.
requests.exceptions.RequestException
If the `file_path` is a URL and it fails to fetch.
"""
self._fetch_lines()
start_idx = self._find_data_descriptor() if self.use_skip else -1
if start_idx == -1 and self.use_skip:
raise ValueError("No Data Descriptor found in the file.")
self._segregate_blocks(start_idx + 1, len(self.lines))
for idx, block in enumerate(self.blocks):
self._process_block(block, idx)
return self.blocks
def _get_cv_for_delimiter(self, block, delimiter):
"""Returns the Coefficient of Variation for the given delimiter."""
if not delimiter: return 1.0
if "2," in delimiter: return block.stats.get("cv_multi", 1.0)
if "\\t" in delimiter: return block.stats.get("cv_tab", 1.0)
return block.stats.get("cv_single", 1.0)
def _fetch_lines(self):
"""Fetches file content from path/URL and sets self.lines."""
if self.file_path.lower().startswith('http'):
response = requests.get(self.file_path)
response.raise_for_status()
self.lines = response.text.splitlines()
else:
with open(self.file_path, 'r', encoding='utf-8') as f:
text = f.read()
self.lines = text.splitlines()
def _find_data_descriptor(self):
"""Finds the 'DATA:' line index in self.lines."""
return next((j for j, line in enumerate(self.lines)
if line.lower().startswith("data:")), -1)
def _segregate_blocks(self, start=0, end=None):
"""
Segregates self.lines into Blocks and pre-computes line stats.
Splits the file into blocks based on empty lines. For each line,
it also pre-computes token counts and numeric ratios for all
potential delimiters.
"""
blocks = []
block = None
block_idx = 0
end = len(self.lines) if end is None else end
for i in range(start, end):
line = self.lines[i]
if line.strip():
if block is None:
block = Block(idx=block_idx, start=i, end=i)
# Pre-compute stats for the line
li = LineInfo(idx=i, text=line.rstrip("\n"))
li.line_len = len(li.text)
li.count_single_tokens = count_tokens(li.text, r"\s+")
li.count_multispace_tokens = count_tokens(li.text, r"(\s{2,})")
li.count_tab_tokens = count_tokens(li.text, r"\t")
li.numeric_single_ratio = numeric_ratio(li.text, r"\s+")
li.numeric_multispace_ratio = numeric_ratio(li.text, r"(\s{2,})")
li.numeric_tab_ratio = numeric_ratio(li.text, r"\t")
block.lines.append(li)
block.end = i
else:
if block is not None:
blocks.append(block)
block_idx += 1
block = None
if block is not None:
blocks.append(block)
self.blocks = blocks
@staticmethod
def _compute_statistics(block):
"""Computes and returns aggregate statistics for a block."""
s, m, t, l, ns, nm, nt, lens = [], [], [], [], [], [], [], []
for ln in block.lines:
s.append(getattr(ln, "count_single_tokens", 0) or 0)
m.append(getattr(ln, "count_multispace_tokens", 0) or 0)
t.append(getattr(ln, "count_tab_tokens", 0) or 0)
l.append(getattr(ln, "line_len", 0) or 0)
ns.append(getattr(ln, "numeric_single_ratio", 0.0) or 0.0)
nm.append(getattr(ln, "numeric_multispace_ratio", 0.0) or 0.0)
nt.append(getattr(ln, "numeric_tab_ratio", 0.0) or 0.0)
lens.append(getattr(ln, "line_len",
len(getattr(ln, "text", "") or "")))
mean_single = _safe_mean(s)
mean_multi = _safe_mean(m)
mean_tab = _safe_mean(t)
mean_line_len = _safe_mean(l) # Changed from 'l'
# Bug fix: Corrected last key from 'cv_tab' to 'cv_line'
stats = {
"mean_single": mean_single,
"cv_single": _safe_cv(s, mean_single),
"mode_single": _most_common(s),
"mean_multi": mean_multi,
"cv_multi": _safe_cv(m, mean_multi),
"mode_multi": _most_common(m),
"max_multi": max(m) if m else 0,
"mean_tab": mean_tab,
"cv_tab": _safe_cv(t, mean_tab),
"mode_tab": _most_common(t),
"mean_numeric_single": _safe_mean(ns),
"mean_numeric_multi": _safe_mean(nm),
"mean_numeric_tab": _safe_mean(nt),
"n_lines": len(block.lines),
"n_nonempty": sum(1 for ln in block.lines if
(getattr(ln, "text", "") or "").strip()),
"mean_line_len": _safe_mean(lens), # Use 'lens' for mean
"cv_line": _safe_cv(lens, _safe_mean(lens)), # Use 'lens'
}
return stats
@staticmethod
def _choose_delimiter(block, strict=False):
"""
Chooses the best delimiter for a block based on its stats.
Parameters
----------
block : Block
The block to analyze (must have `block.stats` computed).
strict : bool, optional
If True, only returns a delimiter if its token count is
perfectly consistent (CV=0) and it has > 1 token.
If False, returns the "best guess" delimiter (tab or
multi-space) with the lowest CV, even if imperfect.
Returns
-------
str or None
The regex string of the best delimiter, or None if no
suitable delimiter is found.
"""
candidates = [("tab", r"\t"), ("multi", r"(\s{2,})"), ("single", r"\s+")]
if strict:
for k, pattern in candidates:
cv = block.stats.get(f"cv_{k}", 1)
mode = block.stats.get(f"mode_{k}", 0)
if cv == 0 and mode > 1:
return pattern
return None # No "perfect" delimiter found
# Non-strict mode: find best "imperfect" delimiter
best_pattern, best_cv = None, float('inf')
# Only check tab and multi-space for non-strict
for k, pattern in candidates[:2]:
cv = block.stats.get(f"cv_{k}", 1)
mode = block.stats.get(f"mode_{k}", 0)
if mode > 1 and cv < best_cv:
best_cv = cv
best_pattern = pattern
return best_pattern
@staticmethod
def _detect_header_extent(block, delimiter):
"""Detects the number of header lines (extent) in a block."""
patterns, title_line = [], None
for i, line in enumerate(block.lines):
tokens = [t for t in re.split(delimiter, line.text.strip()) if t.strip()]
pattern = generate_row_pattern(tokens)
patterns.append(pattern)
if i == 0 and pattern == "S": # Check for title line
title_line = i
start_i = title_line + 1 if title_line is not None else 0
extent = 0
for i, pattern in enumerate(patterns[start_i:]):
is_all_s = all(c == "S" for c in pattern)
is_all_n = all(c == "N" for c in pattern)
next_is_all_s = (start_i + i + 1 < len(patterns)) and all(
c == "S" for c in patterns[start_i + i + 1]
)
# A header line is all-string, or all-numeric *if*
# it's the first line and the next line is all-string.
if is_all_s or (i == 0 and is_all_n and next_is_all_s):
extent += 1
else:
break
return extent, title_line
@staticmethod
def _detect_header_indices(block, delimiter):
"""
Scans the block to identify the indices of the Title and Header lines.
Returns
-------
title_idx : int
The index of the title line, or -1 if not found.
header_start : int
The inclusive start index of the header rows.
header_end : int
The exclusive end index of the header rows.
"""
patterns = []
for line in block.lines:
tokens = [t for t in re.split(delimiter, line.text.strip()) if t.strip()]
patterns.append(generate_row_pattern(tokens))
# 1. Scan for the "Candidate Block" (consecutive String or Unit lines)
candidate_end = 0
for i, pattern in enumerate(patterns):
curr_idx = i
is_all_s = all(c == "S" for c in pattern)
# Check for Unit Line (current is string, next is mostly numeric)
is_unit_line = False
if curr_idx + 1 < len(patterns):
next_pattern = patterns[curr_idx + 1]
if len(next_pattern) > 0:
next_numeric_ratio = next_pattern.count("N") / len(next_pattern)
curr_numeric_ratio = pattern.count("N") / len(pattern) if pattern else 0
if next_numeric_ratio > 0.5 and curr_numeric_ratio < 0.5:
is_unit_line = True
# Special case: Numeric Year as Title (e.g. "2023")
is_first_numeric_title = (
i == 0 and all(c == "N" for c in pattern) and
(curr_idx + 1 < len(patterns) and all(c == "S" for c in patterns[curr_idx + 1]))
)
if is_all_s or is_unit_line or is_first_numeric_title:
candidate_end += 1
else:
break
# 2. Resolve Title vs. Headers based on Geometry
title_idx = -1
header_start = 0
header_end = candidate_end
if candidate_end == 0:
return -1, 0, 0
def get_token_count(idx):
if idx < 0 or idx >= len(block.lines): return 0
return len([t for t in re.split(delimiter, block.lines[idx].text.strip()) if t.strip()])
# Check A: Interposed Title (Title at the BOTTOM of header block)
if candidate_end > 1:
last_idx = candidate_end - 1
prev_idx = last_idx - 1
if get_token_count(last_idx) == 1 and get_token_count(prev_idx) > 1:
title_idx = last_idx
header_end = last_idx # Exclude last line from headers
return title_idx, header_start, header_end
# Check B: Top Title (Title at the TOP)
if candidate_end > 0:
if get_token_count(0) == 1:
if candidate_end > 1 and get_token_count(1) > 1:
title_idx = 0
header_start = 1
elif candidate_end == 1:
# Single line title, no headers
title_idx = 0
header_start = 1
header_end = 1
return title_idx, header_start, header_end
@staticmethod
def _extract_headers(block, delimiter):
"""Extracts header names and intervals using normalized indices."""
title_idx, h_start, h_end = NonStandardParser._detect_header_indices(block, delimiter)
# 1. Handle Title
if title_idx != -1:
block.title = block.lines[title_idx].text
# 2. Slice Headers
header_lines = block.lines[h_start : h_end]
# 3. Calculate Data Offset (header_extent)
# Data starts after the MAX of header_end or (Title + 1)
structure_end = h_end
if title_idx != -1:
structure_end = max(structure_end, title_idx + 1)
block.header_extent = structure_end
# 4. Process Headers
if not header_lines:
return [], block.header_extent
token_maps = [get_token_intervals_multi(line.text, delimiter)
for line in header_lines]
if len(header_lines) == 1:
token_objs = get_token_intervals_multi(header_lines[0].text, delimiter)
return [{"name": t["display"], "interval": t["interval"]}
for t in token_objs], block.header_extent
return merge_headers_by_overlap(token_maps), block.header_extent
@staticmethod
def _classify_block(block):
"""Classifies a block into a BlockType based on its stats."""
stats = block.stats
if stats["n_lines"] < 2:
return BlockType.NARRATIVE
if stats["mean_numeric_single"] < 0.25:
# Stricter Narrative Check 2: Max tokens is 1 (Pure Narrative)
if stats["max_multi"] == 1:
return BlockType.NARRATIVE
elif (stats["mode_multi"] > 1 or stats["mode_tab"] > 1) and stats["n_lines"] <= 6:
return BlockType.HEADER_ONLY
best_delimiter = NonStandardParser._choose_delimiter(block, strict=True)
if not best_delimiter:
# Fallback only if block is substantial, else Narrative
return BlockType.TABULAR if len(block.lines) > 2 else BlockType.NARRATIVE
headers, extent = NonStandardParser._extract_headers(block, best_delimiter)
if headers:
block.headers = headers
block.header_extent = extent
block.delimiter = best_delimiter
if extent < stats["n_lines"]:
return BlockType.COMPLETE_TABULAR
else:
return BlockType.HEADER_ONLY
else:
block.delimiter = best_delimiter
return BlockType.DATA
def _process_block(self, block, current_idx):
"""Main dispatcher for parsing logic based on block type."""
block.stats = self._compute_statistics(block)
block_type = self._classify_block(block)
block.block_type = block_type
# Dispatch to the correct parsing method
if block_type == BlockType.COMPLETE_TABULAR:
self._parse_complete_tabular_block(block)
elif block_type == BlockType.TABULAR:
self._parse_tabular_block(block, current_idx)
elif block_type == BlockType.HEADER_ONLY:
self._parse_header_block(block)
elif block_type == BlockType.DATA:
self._parse_data_block(block, current_idx)
# NARRATIVE blocks are left as-is
# ERROR blocks (if classified early) are also left as-is
def _find_previous_header_block(self, current_idx):
"""Finds the most recent valid header-providing block."""
for i in range(current_idx - 1, -1, -1):
prev_block = self.blocks[i]
# Only borrow from "clean" header or table blocks
if prev_block.block_type in (BlockType.HEADER_ONLY,
BlockType.COMPLETE_TABULAR, BlockType.TABULAR):
if prev_block.headers and prev_block.delimiter:
return prev_block
return None
def _parse_complete_tabular_block(self, block):
"""Parses a "perfect" (CV=0) tabular block."""
try:
df = generate_df(
block.lines,
block.delimiter,
block.headers,
block.header_extent
)
block.df = df
except Exception as e:
block.block_type = BlockType.ERROR
block.error_message = f"Failed to parse complete tabular block: {e}"
block.df = None
def _parse_tabular_block(self, block, current_idx):
"""Parses an imperfect tabular block, attempting refinement."""
try:
if not block.delimiter:
block.delimiter = self._choose_delimiter(block, strict=False)
if not block.delimiter:
raise ValueError("Could not determine a suitable delimiter.")
if not block.headers:
headers, extent = self._extract_headers(block, block.delimiter)
block.headers = headers
# header_extent is updated inside _extract_headers
# --- Refinement Step ---
if self.use_refinement and block.header_extent > 0:
# Re-detect indices to get clean header lines (excluding Title)
title_idx, h_start, h_end = self._detect_header_indices(block, block.delimiter)
if h_end > h_start:
header_lines_subset = block.lines[h_start:h_end]
data_lines = block.lines[block.header_extent:]
refined = refine_headers_by_correspondence(
header_lines_subset, data_lines, block.delimiter
)
if refined:
block.headers = refined
if not block.headers:
prev = self._find_previous_header_block(current_idx)
if not prev:
raise ValueError("No headers found in block or preceding blocks.")
borrowed = prev.headers
if self.use_refinement and prev.header_extent > 0:
p_title, p_start, p_end = self._detect_header_indices(prev, prev.delimiter)
if p_end > p_start:
header_lines_subset = prev.lines[p_start:p_end]
data_lines = block.lines # Use CURRENT data for refinement
refined = refine_headers_by_correspondence(
header_lines_subset, data_lines, prev.delimiter
)
if refined:
borrowed = refined
block.headers = borrowed
block.delimiter = prev.delimiter
# Ensure extent is set if it wasn't already (e.g. if we had a Title but no headers)
if block.header_extent is None:
block.header_extent = 0
prev.used_as_header_for.append(block.idx)
cv = self._get_cv_for_delimiter(block, block.delimiter)
try:
if cv > 0:
raise ValueError("Detected jagged columns (CV > 0).")
df = generate_df(block.lines, block.delimiter, block.headers, block.header_extent)
except ValueError:
df = assign_tokens_by_overlap(block.lines, block.delimiter, block.headers, block.header_extent)
block.df = df
except Exception as e:
block.block_type = BlockType.ERROR
block.error_message = f"Failed to parse tabular block: {e}"
block.df = None
def _parse_header_block(self, block):
"""Ensures headers are extracted for HEADER_ONLY blocks."""
try:
if not block.delimiter:
block.delimiter = self._choose_delimiter(block, strict=False)
if not block.headers and block.delimiter:
headers, extent = self._extract_headers(block, block.delimiter)
block.headers = headers
block.header_extent = extent
if not block.headers:
raise ValueError("Could not extract headers from header-only block.")
except Exception as e:
block.block_type = BlockType.ERROR
block.error_message = f"Failed to parse header block: {e}"
def _parse_data_block(self, block, current_idx):
"""Parses a DATA block by borrowing headers, respecting local Titles."""
try:
prev = self._find_previous_header_block(current_idx)
if not prev:
raise ValueError("No preceding headers found for this data block.")
borrowed = prev.headers
# --- Refinement Step ---
if self.use_refinement and prev.header_extent > 0:
p_title, p_start, p_end = self._detect_header_indices(prev, prev.delimiter)
if p_end > p_start:
header_lines_subset = prev.lines[p_start:p_end]
data_lines = block.lines
refined = refine_headers_by_correspondence(
header_lines_subset, data_lines, prev.delimiter
)
if refined:
borrowed = refined
block.headers = borrowed
block.delimiter = prev.delimiter
try:
df = generate_df(block.lines, block.delimiter, block.headers, block.header_extent)
except ValueError:
df = assign_tokens_by_overlap(block.lines, block.delimiter, block.headers, block.header_extent)
block.df = df
prev.used_as_header_for.append(block.idx)
except Exception as e:
block.block_type = BlockType.ERROR
block.error_message = f"Failed to parse data block: {e}"
block.df = None