# Source code for pyleotups.utils.Parser.ExcelParser

import io
import os
import math
import requests
import pandas as pd
from dataclasses import dataclass, field
from typing import Any, List, Optional, Tuple, Iterable, Dict
from enum import Enum
from .NonStandardParserUtils import auto_cast_df


# A row keeps extending the header region while the share of numeric cells
# among its non-empty cells stays strictly below this value
# (see ExcelParser._detect_header_extent).
NUMERIC_THRESHOLD_HEADER = 0.25
# NOTE: despite the "MAX" in its name this is used as a *minimum*: a block
# with fewer populated columns than this is classified as NARRATIVE and is
# not processed any further (see ExcelParser._process_block).
MAX_TABLE_COLUMN_LEN = 2
# Minimum number of non-empty cells a connected region must contain to be
# kept as a Block (see ExcelParser._segregate_blocks).
MIN_REGION_AREA = 1


class BlockType(str, Enum):
    """
    Enumeration for the different types a Block can be classified as.

    Members mix in ``str``, so each one compares equal to its plain string
    value (e.g. ``BlockType.NARRATIVE == "narrative"``).
    """
    # Fewer populated columns than MAX_TABLE_COLUMN_LEN, or neither headers
    # nor data rows were found. Also the default type of a fresh Block.
    NARRATIVE = "narrative"
    # Header rows were found but no non-empty row follows them.
    HEADER_ONLY = "header-only"
    # Non-empty rows were found but no header rows; such a block may borrow
    # headers from an earlier, horizontally aligned block on the same sheet.
    DATA_ONLY = "data-only"
    # Both headers and at least one data row are present.
    COMPLETE_TABULAR = "complete-tabular"
    # NOTE(review): METADATA and EMPTY are never assigned by the parser code
    # in this module - presumably reserved for other parsers; confirm.
    METADATA = "metadata"
    EMPTY = "empty"


@dataclass
class SheetGrid:
    """
    Thin, 0-based, bounds-checked view over the cell values of one sheet.

    Attributes
    ----------
    name : str
        Name of the Excel sheet.
    sheet_idx : int
        Index of the sheet in the workbook.
    nrows : int
        Number of used rows.
    ncols : int
        Number of used columns.
    _values : List[List[Any]]
        The raw cell values, stored row-major.
    merged_spans : List[Tuple[int, int, int, int]], optional
        Merged cell ranges as inclusive, 0-based ``(r0, r1, c0, c1)`` tuples.
        Defaults to a fresh empty list per instance.
    """
    name: str
    sheet_idx: int
    nrows: int
    ncols: int
    _values: List[List[Any]]
    merged_spans: List[Tuple[int, int, int, int]] = field(default_factory=list)

    def get_value(self, r: int, c: int) -> Any:
        """Return the value at ``(r, c)``, or None when outside the used area."""
        row_in_range = 0 <= r < self.nrows
        col_in_range = 0 <= c < self.ncols
        if not (row_in_range and col_in_range):
            return None
        return self._values[r][c]

    def is_empty(self, r: int, c: int) -> bool:
        """
        Tell whether a cell is visually empty.

        A cell counts as empty when it holds None (which includes every
        out-of-range coordinate) or a string made only of whitespace.
        """
        cell = self.get_value(r, c)
        if isinstance(cell, str):
            return not cell.strip()
        return cell is None


@dataclass
class Block:
    """
    Represents a contiguous non-empty region in an Excel sheet.

    All row/column bounds are 0-based and inclusive.

    Attributes
    ----------
    idx : int
        Unique identifier for the block in the parsing session
        (-1 until an identifier has been assigned).
    sheet_name : str
        Name of the sheet containing this block.
    sheet_idx : int
        Index of the sheet.
    top : int
        Top row index (inclusive).
    bottom : int
        Bottom row index (inclusive).
    left : int
        Left column index (inclusive).
    right : int
        Right column index (inclusive).
    area : int
        Number of cells in the block.
    block_type : BlockType
        Classified type of the block (e.g., COMPLETE_TABULAR, NARRATIVE).
        Defaults to NARRATIVE.
    headers : Optional[List[str]]
        List of column header strings, or None when no headers were found.
    header_extent : int
        Number of rows detected as headers.
    title : Optional[str]
        Detected title text for the block, or None when there is no title.
    stats : dict
        Computed statistics (numeric ratios, dimensions, etc.).
    df : Optional[pd.DataFrame]
        The parsed DataFrame (if successful), otherwise None.
    delimiter : str
        Internal marker for parser logic; always "excel" by default.
        NOTE(review): not read anywhere in this module - presumably kept for
        parity with Block-like objects of other parsers; confirm.
    """
    idx: int = -1
    sheet_name: str = ""
    sheet_idx: int = 0
    top: int = 0
    bottom: int = 0
    left: int = 0
    right: int = 0
    area: int = 0

    block_type: BlockType = BlockType.NARRATIVE
    headers: Optional[List[str]] = None
    header_extent: int = 0
    title: Optional[str] = None
    stats: dict = field(default_factory=dict)
    df: Optional[pd.DataFrame] = None

    # Internal usage for parser logic
    delimiter: str = "excel"


# ---------------------------------------------------------------------
# ExcelParser Class
# ---------------------------------------------------------------------

class ExcelParser:
    """
    Parses Excel files by detecting contiguous blocks of non-empty cells
    and converting them into structured DataFrames.

    It handles:

    - Spatial segmentation (flood fill over 8-connected cells) to find tables.
    - Merged cell propagation.
    - Statistical header detection.
    - Multi-row header merging.
    - Header borrowing for data-only blocks.

    Attributes
    ----------
    file_path : str
        The local path or URL to the Excel file.
    sheets : List[SheetGrid]
        The loaded sheets content.
    blocks : List[Block]
        The segregated and processed blocks.
    _header_registry : Dict[str, List[Block]]
        Internal registry (sheet name -> header-bearing blocks) used by the
        header borrowing logic.
    """

    # Timeout in seconds handed to ``requests.get``; without one a stalled
    # server would block the parser forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, file_path: str):
        """
        Initializes the ExcelParser.

        Parameters
        ----------
        file_path : str
            The local file path or HTTP/HTTPS URL of the .xls/.xlsx file.
        """
        self.file_path = file_path
        self.sheets: List[SheetGrid] = []
        self.blocks: List[Block] = []
        self._header_registry: Dict[str, List[Block]] = {}

    def parse(self) -> List[Block]:
        """
        Executes the full parsing workflow.

        Returns
        -------
        List[Block]
            A list of processed Block objects, potentially containing
            DataFrames.
        """
        self._fetch_workbook()
        self._segregate_blocks()
        # Sheets and blocks are rebuilt on every call, so the registry must
        # be rebuilt too; otherwise a second parse() could borrow headers
        # from blocks of the previous run.
        self._header_registry = {}
        for idx, block in enumerate(self.blocks):
            self._process_block(block, idx)
        return self.blocks

    # -----------------------------------------------------------------
    # Loading
    # -----------------------------------------------------------------
    def _fetch_workbook(self):
        """
        Fetches file content and populates self.sheets using openpyxl or xlrd.

        Raises
        ------
        FileNotFoundError
            If ``file_path`` is not a URL and does not exist locally.
        RuntimeError
            If neither openpyxl nor xlrd could open the workbook.
        requests.RequestException
            If downloading a URL fails, times out, or returns an error status.
        """
        # 1. Determine source bytes or path
        is_path = False
        if self.file_path.lower().startswith(("http://", "https://")):
            response = requests.get(self.file_path, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            content = io.BytesIO(response.content)
        elif os.path.exists(self.file_path):
            content = self.file_path
            is_path = True
        else:
            raise FileNotFoundError(f"File not found: {self.file_path}")

        # 2. Open Workbook
        wb_xlsx, wb_xls = None, None
        try:
            # Try openpyxl (.xlsx) first
            from openpyxl import load_workbook
            # data_only=True gets values, not formulas
            wb_xlsx = load_workbook(filename=content, data_only=True, read_only=False)
        except Exception:
            # Deliberately broad: a missing openpyxl install and a legacy
            # .xls payload both end up here, and both warrant the xlrd
            # fallback below.
            if not is_path and hasattr(content, 'seek'):
                # openpyxl may have consumed part of the stream
                content.seek(0)
            try:
                import xlrd
                if is_path:
                    wb_xls = xlrd.open_workbook(content, formatting_info=True)
                else:
                    wb_xls = xlrd.open_workbook(file_contents=content.read(), formatting_info=True)
            except Exception as e:
                raise RuntimeError(
                    f"Failed to open workbook. Ensure 'openpyxl' (xlsx) or 'xlrd' (xls) is installed. Error: {e}"
                ) from e

        # 3. Convert to SheetGrid objects
        self.sheets = []
        if wb_xlsx is not None:
            self._parse_openpyxl(wb_xlsx)
        elif wb_xls is not None:
            self._parse_xlrd(wb_xls)

    def _parse_openpyxl(self, wb):
        """
        Internal helper to convert an openpyxl workbook to SheetGrids.

        Sheets whose title contains "readme" (case-insensitive) are skipped.
        """
        for idx, ws in enumerate(wb.worksheets):
            title = (ws.title or "").lower()
            if "readme" in title:
                continue

            max_row = ws.max_row or 0
            max_col = ws.max_column or 0
            values = [
                [cell.value for cell in row]
                for row in ws.iter_rows(min_row=1, max_row=max_row, min_col=1, max_col=max_col)
            ]

            # openpyxl ranges are 1-based inclusive; spans are stored 0-based.
            merged_spans = []
            for rng in ws.merged_cells.ranges:
                span = (rng.min_row - 1, rng.max_row - 1, rng.min_col - 1, rng.max_col - 1)
                merged_spans.append(span)
                self._fill_merged_span(values, span)

            nrows_used, ncols_used = self._compute_used_bounds(values)
            trimmed = [row[:ncols_used] for row in values[:nrows_used]]
            self.sheets.append(SheetGrid(ws.title, idx, nrows_used, ncols_used, trimmed, merged_spans))

    def _parse_xlrd(self, wb):
        """
        Internal helper to convert an xlrd workbook to SheetGrids.

        Sheets whose name contains "readme" (case-insensitive) are skipped.
        """
        for idx in range(wb.nsheets):
            sh = wb.sheet_by_index(idx)
            title = (sh.name or "").lower()
            if "readme" in title:
                continue

            nrows, ncols = sh.nrows, sh.ncols
            values = [[sh.cell_value(r, c) for c in range(ncols)] for r in range(nrows)]

            # The upper bounds reported by xlrd are exclusive, hence the -1
            # when converting to inclusive spans.
            merged_spans = []
            for rlo, rhi, clo, chi in getattr(sh, "merged_cells", ()):
                span = (rlo, rhi - 1, clo, chi - 1)
                merged_spans.append(span)
                self._fill_merged_span(values, span)

            nrows_used, ncols_used = self._compute_used_bounds(values)
            trimmed = [row[:ncols_used] for row in values[:nrows_used]]
            self.sheets.append(SheetGrid(sh.name, idx, nrows_used, ncols_used, trimmed, merged_spans))

    @staticmethod
    def _fill_merged_span(values: List[List[Any]], span: Tuple[int, int, int, int]) -> None:
        """
        Copies the top-left value of a merged range into its empty cells.

        ``values`` is modified in place. Nothing happens when the top-left
        cell lies outside ``values`` or is itself empty. The part of the
        range that reaches beyond the loaded values is ignored instead of
        raising IndexError.

        Parameters
        ----------
        values : List[List[Any]]
            Row-major cell values of the sheet.
        span : Tuple[int, int, int, int]
            Inclusive, 0-based ``(r0, r1, c0, c1)`` bounds of the range.
        """
        r0, r1, c0, c1 = span
        if not (0 <= r0 < len(values) and 0 <= c0 < len(values[r0])):
            return
        anchor = values[r0][c0]
        if anchor is None or (isinstance(anchor, str) and not anchor.strip()):
            return

        for rr in range(r0, min(r1, len(values) - 1) + 1):
            row = values[rr]
            for cc in range(c0, min(c1, len(row) - 1) + 1):
                current = row[cc]
                if current is None or (isinstance(current, str) and not current.strip()):
                    row[cc] = anchor

    # -----------------------------------------------------------------
    # Segmentation
    # -----------------------------------------------------------------
    def _segregate_blocks(self):
        """
        Identify contiguous blocks of non-empty cells in all sheets.

        Every 8-connected island of non-empty cells becomes one Block whose
        bounds are the island's bounding box and whose ``area`` is the number
        of non-empty cells in it. Blocks fully enclosed by another block of
        the same sheet are dropped, the rest is sorted and numbered, and the
        result is stored in ``self.blocks``.
        """
        neighbors = self._neighbors8()
        raw_blocks = []

        for grid in self.sheets:
            visited = [[False] * max(1, grid.ncols) for _ in range(max(1, grid.nrows))]
            for r in range(grid.nrows):
                for c in range(grid.ncols):
                    if visited[r][c] or grid.is_empty(r, c):
                        visited[r][c] = True
                        continue

                    # Flood fill the island that starts at (r, c). Only
                    # non-empty cells are ever pushed, so every popped cell
                    # counts towards the area.
                    pending = [(r, c)]
                    visited[r][c] = True
                    top = bottom = r
                    left = right = c
                    area = 0
                    while pending:
                        rr, cc = pending.pop()
                        area += 1
                        top = min(top, rr)
                        bottom = max(bottom, rr)
                        left = min(left, cc)
                        right = max(right, cc)
                        for dr, dc in neighbors:
                            nr, nc = rr + dr, cc + dc
                            if not (0 <= nr < grid.nrows and 0 <= nc < grid.ncols):
                                continue
                            if visited[nr][nc]:
                                continue
                            visited[nr][nc] = True
                            if not grid.is_empty(nr, nc):
                                pending.append((nr, nc))

                    if area >= MIN_REGION_AREA:
                        raw_blocks.append(Block(
                            idx=-1,
                            sheet_name=grid.name,
                            sheet_idx=grid.sheet_idx,
                            top=top,
                            bottom=bottom,
                            left=left,
                            right=right,
                            area=area,
                        ))

        # Consolidate Enclosures & Sort
        consolidated = self._consolidate_enclosures_minimal(raw_blocks)
        # Order: sheet first, then left edge, then top edge (column-major).
        # Header borrowing relies on this order: a donor block that shares a
        # block's columns and sits above it is always processed first.
        consolidated.sort(key=lambda b: (b.sheet_idx, b.left, b.top, b.right, -b.area))

        # Assign IDs
        for i, blk in enumerate(consolidated):
            blk.idx = i
        self.blocks = consolidated

    # -----------------------------------------------------------------
    # Per-block processing
    # -----------------------------------------------------------------
    def _process_block(self, block: Block, idx: int):
        """
        Main processing logic for a single block.

        Computes statistics, detects headers, classifies the block, and
        generates the DataFrame if applicable. The block is updated in place.

        Parameters
        ----------
        block : Block
            The block to process.
        idx : int
            Position of the block in ``self.blocks`` (currently unused).
        """
        grid = self._get_sheet_by_name(block.sheet_name)
        if grid is None:
            return

        # 0. Gate: Minimum distinct columns check
        col_len = self._block_column_len(block, grid)
        if col_len < MAX_TABLE_COLUMN_LEN:
            block.block_type = BlockType.NARRATIVE
            return

        # 1. Compute Profiles (Stats)
        block.stats = self._compute_statistics(block, grid)

        # 2. Detect Header Extent & Title
        hdr_info = self._detect_header_extent(block, grid, block.stats)
        block.header_extent = hdr_info.get("header_extent", 0)
        block.title = self._extract_title_text(block, grid, hdr_info)

        # 3. Extract & Merge Headers
        merged_headers = self._extract_and_merge_headers(block, grid, hdr_info)
        block.headers = merged_headers

        # 4. Classify
        block.block_type = self._classify_block(block, block.stats, merged_headers)

        # 5. Register Header-Bearing Blocks (for borrowing)
        if block.block_type in (BlockType.COMPLETE_TABULAR, BlockType.HEADER_ONLY) and merged_headers:
            self._header_registry.setdefault(block.sheet_name, []).append(block)

        # 6. Parse Data / Handle Borrowing
        if block.block_type == BlockType.DATA_ONLY:
            # Attempt to borrow headers from a previous block on the same sheet
            candidates = self._header_registry.get(block.sheet_name, [])
            donor = self._find_borrow_candidate(block, candidates)
            if donor and donor.headers:
                borrowed = self._borrow_headers(block, donor)
                block.headers = borrowed
                block.block_type = BlockType.COMPLETE_TABULAR
                # Adjust header info for generation (data starts immediately after title)
                hdr_info_borrow = dict(hdr_info)
                hdr_info_borrow["header_extent"] = 0
                df = self._generate_df(block, grid, borrowed, hdr_info_borrow)
                # Same post-processing as natively-headed tables, so both
                # kinds of block go through auto_cast_df.
                block.df = auto_cast_df(df)
            else:
                block.df = None
        elif block.block_type == BlockType.COMPLETE_TABULAR and merged_headers:
            df = self._generate_df(block, grid, merged_headers, hdr_info)
            block.df = auto_cast_df(df)
        else:
            block.df = None

    # -----------------------------------------------------------------
    # Core Logic Methods
    # -----------------------------------------------------------------
    def _compute_statistics(self, block: Block, grid: SheetGrid) -> dict:
        """
        Computes numeric density and non-empty counts for the block's rows.

        Returns
        -------
        dict
            ``row_nonempty_counts`` (non-empty cells per row),
            ``row_numeric_ratio`` (numeric share of the non-empty cells per
            row, 0.0 for an empty row), and the means of both lists as
            ``mean_row_nonempty`` and ``mean_row_numeric_ratio``.
        """
        row_nonempty, row_numeric = [], []
        for r in range(block.top, block.bottom + 1):
            filled = [
                v
                for v in (grid.get_value(r, c) for c in range(block.left, block.right + 1))
                if not self._is_val_empty(v)
            ]
            row_nonempty.append(len(filled))
            if filled:
                numeric = sum(1 for v in filled if self._is_numeric_cell(v))
                row_numeric.append(numeric / len(filled))
            else:
                row_numeric.append(0.0)

        # Basic aggregate stats
        mean_nonempty = sum(row_nonempty) / len(row_nonempty) if row_nonempty else 0
        mean_numeric = sum(row_numeric) / len(row_numeric) if row_numeric else 0

        return {
            "row_nonempty_counts": row_nonempty,
            "row_numeric_ratio": row_numeric,
            "mean_row_nonempty": mean_nonempty,
            "mean_row_numeric_ratio": mean_numeric
        }

    def _detect_header_extent(self, block: Block, grid: SheetGrid, stats: dict) -> dict:
        """
        Determines the title row and how many lines constitute the header.

        Title Rule: the first row of the block has exactly 1 non-empty cell
        and that cell sits in the block's top-left corner.
        Header Rule: starting after the title (if any), extend while the
        row's numeric ratio is below NUMERIC_THRESHOLD_HEADER.

        Returns
        -------
        dict
            ``title_row`` (absolute row index or None) and ``header_extent``
            (number of header rows).
        """
        row_nonempty = stats["row_nonempty_counts"]
        row_numeric = stats["row_numeric_ratio"]

        title_row_idx = None
        # Check if first row looks like a title (single cell at top-left of block)
        if row_nonempty and row_nonempty[0] == 1 and not grid.is_empty(block.top, block.left):
            title_row_idx = block.top

        start_offset = 1 if title_row_idx is not None else 0
        header_extent = 0
        for ratio in row_numeric[start_offset:]:
            if ratio >= NUMERIC_THRESHOLD_HEADER:
                break
            header_extent += 1

        return {"title_row": title_row_idx, "header_extent": header_extent}

    def _extract_title_text(self, block: Block, grid: SheetGrid, hdr_info: dict) -> Optional[str]:
        """Returns the stripped text of the title row's first non-empty cell, or None."""
        tr = hdr_info.get("title_row")
        if tr is None:
            return None
        # Find the single non-empty cell
        for c in range(block.left, block.right + 1):
            val = grid.get_value(tr, c)
            if not self._is_val_empty(val):
                return str(val).strip()
        return None

    def _extract_and_merge_headers(self, block: Block, grid: SheetGrid, hdr_info: dict) -> Optional[List[str]]:
        """
        Extracts multi-row headers, respecting merged cells, and merges them
        vertically.

        Returns
        -------
        Optional[List[str]]
            One header per block column (made unique), or None when the
            block has no header rows.
        """
        H = hdr_info.get("header_extent", 0)
        if H <= 0:
            return None

        # 1. Build lookup for merged cells
        merged_idx = {}
        for (r0, r1, c0, c1) in grid.merged_spans:
            for r in range(r0, r1 + 1):
                for c in range(c0, c1 + 1):
                    merged_idx[(r, c)] = (r0, r1, c0, c1)

        # 2. Extract spans for each header row
        title_row = hdr_info.get("title_row")
        start_row = block.top + (1 if title_row is not None else 0)
        header_rows_spans = []
        for r in range(start_row, start_row + H):
            spans = []
            c = block.left
            while c <= block.right:
                txt = self._norm_text(grid.get_value(r, c))
                if not txt:
                    c += 1
                    continue

                # Determine horizontal span of this text
                if (r, c) in merged_idx:
                    # Use merged cell info
                    _, _, c0, c1 = merged_idx[(r, c)]
                    # Clip to block bounds
                    c0, c1 = max(c0, block.left), min(c1, block.right)
                    if c == c0:
                        # Add only once
                        spans.append({"c0": c0, "c1": c1, "text": txt})
                    c = c1 + 1
                else:
                    # Collapse repeats if not strictly merged
                    c0, c1 = c, c
                    while c1 + 1 <= block.right and self._norm_text(grid.get_value(r, c1 + 1)) == txt:
                        c1 += 1
                    spans.append({"c0": c0, "c1": c1, "text": txt})
                    c = c1 + 1
            header_rows_spans.append(spans)

        # 3. Merge vertically down columns
        width = block.right - block.left + 1
        col_tokens = [[] for _ in range(width)]
        for row_spans in header_rows_spans:
            for j in range(block.left, block.right + 1):
                # Text of the first span covering column j, if any
                token = next((sp["text"] for sp in row_spans if sp["c0"] <= j <= sp["c1"]), "")
                if token:
                    col_tokens[j - block.left].append(token)

        # 4. Join and Dedupe (drop a token that merely repeats the one above)
        headers = []
        for tokens in col_tokens:
            cleaned = []
            prev = None
            for t in tokens:
                t = t.strip()
                if t and t != prev:
                    cleaned.append(t)
                prev = t
            headers.append(" ".join(cleaned))

        # 5. Ensure uniqueness
        return self._ensure_unique(headers)

    def _classify_block(self, block: Block, stats: dict, headers: Optional[List[str]]) -> BlockType:
        """
        Determines BlockType based on header existence and data presence.

        Data rows are the rows after the title (if any) and the header rows
        that contain at least one non-empty cell.
        """
        # Offset of the first data row, relative to the stats arrays
        data_start = (1 if block.title else 0) + block.header_extent
        counts = stats["row_nonempty_counts"]
        data_rows = sum(1 for k in counts[data_start:] if k > 0)

        if headers:
            return BlockType.COMPLETE_TABULAR if data_rows > 0 else BlockType.HEADER_ONLY
        if data_rows > 0:
            return BlockType.DATA_ONLY
        return BlockType.NARRATIVE

    def _generate_df(self, block: Block, grid: SheetGrid, headers: List[str], hdr_info: dict) -> pd.DataFrame:
        """
        Creates a DataFrame from the block's data region.

        The data region starts below the title row (if any) and the header
        rows. Completely empty rows are skipped, and every row is padded
        with None or truncated to ``len(headers)`` values.
        """
        H = hdr_info.get("header_extent", 0)
        tr = hdr_info.get("title_row")
        data_start = block.top + (1 if tr is not None else 0) + H

        rows = []
        ncols = len(headers)
        for r in range(data_start, block.bottom + 1):
            raw_vals = [grid.get_value(r, c) for c in range(block.left, block.right + 1)]
            # Skip completely empty rows
            if all(self._is_val_empty(v) for v in raw_vals):
                continue
            # Normalize length
            if len(raw_vals) < ncols:
                raw_vals += [None] * (ncols - len(raw_vals))
            elif len(raw_vals) > ncols:
                raw_vals = raw_vals[:ncols]
            rows.append(raw_vals)

        return pd.DataFrame(rows, columns=headers)

    # -----------------------------------------------------------------
    # Helpers: Borrowing & Utils
    # -----------------------------------------------------------------
    def _find_borrow_candidate(self, block: Block, candidates: List[Block]) -> Optional[Block]:
        """
        Finds the most recently registered header block whose left and right
        columns match the given block exactly; None if there is none.
        """
        for cand in reversed(candidates):
            if cand.left == block.left and cand.right == block.right:
                return cand
        return None

    def _borrow_headers(self, block: Block, donor: Block) -> List[str]:
        """
        Copies headers from donor, adjusting length if necessary.

        Missing headers are filled with ``unnamed_<position>``; surplus
        headers are cut off.
        """
        width = block.right - block.left + 1
        hdrs = list(donor.headers)
        if len(hdrs) < width:
            hdrs += [f"unnamed_{i}" for i in range(len(hdrs), width)]
        elif len(hdrs) > width:
            hdrs = hdrs[:width]
        return hdrs

    def _get_sheet_by_name(self, name: str) -> Optional[SheetGrid]:
        """Returns the first loaded sheet with the given name, or None."""
        for s in self.sheets:
            if s.name == name:
                return s
        return None

    def _block_column_len(self, block: Block, grid: SheetGrid) -> int:
        """Counts columns that have at least one non-empty cell."""
        return sum(
            1
            for c in range(block.left, block.right + 1)
            if any(not grid.is_empty(r, c) for r in range(block.top, block.bottom + 1))
        )

    # -----------------------------------------------------------------
    # Static Utils
    # -----------------------------------------------------------------
    @staticmethod
    def _neighbors8() -> Iterable[Tuple[int, int]]:
        """Returns the (row, column) offsets of the 8 cells around a cell."""
        return [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)]

    @staticmethod
    def _compute_used_bounds(values: List[List[Any]]) -> Tuple[int, int]:
        """
        Trims trailing empty rows and columns.

        Short rows of ``values`` are padded in place with None to the width
        of the longest row.

        Returns
        -------
        Tuple[int, int]
            ``(nrows, ncols)`` of the smallest top-left rectangle that holds
            every non-empty cell; ``(0, 0)`` when there is none.
        """
        if not values:
            return 0, 0

        nrows = len(values)
        ncols = max((len(row) for row in values), default=0)
        # Pad rows
        for row in values:
            if len(row) < ncols:
                row.extend([None] * (ncols - len(row)))

        def is_empty(v):
            return v is None or (isinstance(v, str) and not v.strip())

        last_row = -1
        for r in range(nrows - 1, -1, -1):
            if any(not is_empty(values[r][c]) for c in range(ncols)):
                last_row = r
                break
        if last_row < 0:
            return 0, 0

        last_col = -1
        for c in range(ncols - 1, -1, -1):
            if any(not is_empty(values[r][c]) for r in range(last_row + 1)):
                last_col = c
                break

        return last_row + 1, last_col + 1

    @staticmethod
    def _consolidate_enclosures_minimal(blocks: List[Block]) -> List[Block]:
        """
        Removes blocks fully enclosed within larger blocks on the same sheet.

        Within each sheet the blocks are visited by descending ``area``; a
        block is dropped when its bounding box lies inside the bounding box
        of a surviving block.
        """
        by_sheet = {}
        for b in blocks:
            by_sheet.setdefault(b.sheet_idx, []).append(b)

        survivors = []
        for sheet_blocks in by_sheet.values():
            # Sort by area descending
            ordered = sorted(sheet_blocks, key=lambda b: b.area, reverse=True)
            dropped = set()
            for i, outer in enumerate(ordered):
                if i in dropped:
                    continue
                for j, inner in enumerate(ordered):
                    if i == j or j in dropped:
                        continue
                    # Check if inner lies inside outer
                    if (outer.left <= inner.left and inner.right <= outer.right
                            and outer.top <= inner.top and inner.bottom <= outer.bottom):
                        dropped.add(j)
            survivors.extend(b for k, b in enumerate(ordered) if k not in dropped)
        return survivors

    @staticmethod
    def _is_numeric_cell(val: Any) -> bool:
        """
        Tells whether a cell value is, or reads as, a number.

        ints and non-NaN floats are numeric. Strings are numeric when, after
        stripping whitespace and surrounding brackets, they parse as a float
        either without a trailing "%" or without thousands separators.
        """
        if val is None:
            return False
        if isinstance(val, (int, float)):
            return not (isinstance(val, float) and math.isnan(val))
        if isinstance(val, str):
            s = val.strip().lstrip("([{").rstrip(")]}")
            if not s:
                return False
            if s.endswith("%"):
                try:
                    float(s[:-1])
                    return True
                except ValueError:
                    pass
            try:
                float(s.replace(",", ""))
                return True
            except ValueError:
                return False
        return False

    @staticmethod
    def _is_val_empty(v: Any) -> bool:
        """Tells whether a value is None or a whitespace-only string."""
        return v is None or (isinstance(v, str) and not v.strip())

    @staticmethod
    def _norm_text(v: Any) -> str:
        """Returns the stripped string form of a value ("" for None)."""
        if v is None:
            return ""
        return str(v).strip()

    @staticmethod
    def _ensure_unique(names: List[str]) -> List[str]:
        """
        Makes a list of column names unique while keeping their order.

        Empty names become "unnamed". The first occurrence of a name is kept
        as is; repeats get a ``_2``, ``_3``, ... suffix. A suffix is skipped
        when the resulting name is already taken, so the output never
        contains duplicates (e.g. ``["a", "a", "a_2"]`` becomes
        ``["a", "a_2", "a_2_2"]``).
        """
        used = set()
        counters = {}
        out = []
        for n in names:
            base = n if n else "unnamed"
            candidate = base
            if candidate in used:
                k = counters.get(base, 1)
                while candidate in used:
                    k += 1
                    candidate = f"{base}_{k}"
                counters[base] = k
            used.add(candidate)
            out.append(candidate)
        return out
if __name__ == "__main__":
    # Manual smoke test: download a sample NOAA workbook, parse it and print
    # a one-line summary per block plus a preview of every DataFrame found.
    parser = ExcelParser(
        "https://www.ncei.noaa.gov/pub/data/paleo/contributions_by_author/frank1999/frank1999.xls"
    )
    blocks = parser.parse()

    for block in blocks:
        print(f"Block ID: {block.idx}, Type: {block.block_type}, Sheet: {block.sheet_name}")
        if block.df is None:
            continue
        print(block.df.head())
        print(block.df.shape)