mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-15 09:21:36 +00:00
feat(read): extract notebook and office documents (#37082)
Add stdlib-only extraction for `.ipynb`, `.docx`, and `.xlsx` in read_file with lazy integration and malformed-document fallback.
This commit is contained in:
parent
2b67e96aec
commit
817f392311
3 changed files with 590 additions and 1 deletions
295
tests/tools/test_read_extract.py
Normal file
295
tests/tools/test_read_extract.py
Normal file
|
|
@ -0,0 +1,295 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for structured-document extraction in the read_file tool.
|
||||
|
||||
Covers .ipynb / .docx / .xlsx extraction (ported from Kilo-Org/kilocode
|
||||
#10733, #10737, #10740) and the read_file_tool integration: pagination,
|
||||
line-numbering, graceful fallback on malformed input, and hidden-sheet
|
||||
omission.
|
||||
|
||||
Run with: python -m pytest tests/tools/test_read_extract.py -v
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
import zipfile
|
||||
|
||||
from tools.read_extract import (
|
||||
ExtractionError,
|
||||
extract_document_text,
|
||||
is_extractable_document,
|
||||
)
|
||||
from tools.file_tools import read_file_tool
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixture builders — construct minimal valid OOXML / notebook files.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _write_notebook(path, cells, nbformat=4):
|
||||
nb = {"cells": cells, "metadata": {}, "nbformat": nbformat, "nbformat_minor": 5}
|
||||
with open(path, "w", encoding="utf-8") as fh:
|
||||
json.dump(nb, fh)
|
||||
|
||||
|
||||
def _write_docx(path, document_xml):
|
||||
with zipfile.ZipFile(path, "w") as z:
|
||||
z.writestr("[Content_Types].xml", "<Types/>")
|
||||
z.writestr("word/document.xml", document_xml)
|
||||
|
||||
|
||||
def _write_xlsx(path, *, workbook, rels, shared, sheets):
|
||||
"""sheets: dict of part-name -> xml string."""
|
||||
with zipfile.ZipFile(path, "w") as z:
|
||||
z.writestr("xl/workbook.xml", workbook)
|
||||
z.writestr("xl/_rels/workbook.xml.rels", rels)
|
||||
if shared is not None:
|
||||
z.writestr("xl/sharedStrings.xml", shared)
|
||||
for part, xml in sheets.items():
|
||||
z.writestr(part, xml)
|
||||
|
||||
|
||||
_NS_W = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
|
||||
_NS_S = "http://schemas.openxmlformats.org/spreadsheetml/2006/main"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# is_extractable_document
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestIsExtractable(unittest.TestCase):
    """Extension-based detection of documents eligible for extraction."""

    def test_recognized_extensions(self):
        for candidate in ("a.ipynb", "/x/B.DOCX", "report.xlsx"):
            with self.subTest(path=candidate):
                self.assertTrue(is_extractable_document(candidate))

    def test_unrecognized_extensions(self):
        for candidate in ("a.py", "a.pdf", "a.txt"):
            with self.subTest(path=candidate):
                self.assertFalse(is_extractable_document(candidate))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Notebooks (.ipynb) — #10733
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestNotebookExtraction(unittest.TestCase):
    """Notebook cell sources are flattened to text; unusable notebooks raise."""

    def setUp(self):
        self.tmp = tempfile.mkdtemp(prefix="rex_nb_")

    def tearDown(self):
        from shutil import rmtree
        rmtree(self.tmp, ignore_errors=True)

    def test_markdown_and_code_in_order(self):
        nb_path = os.path.join(self.tmp, "nb.ipynb")
        markdown_cell = {"cell_type": "markdown", "source": ["# Title\n", "para"]}
        code_cell = {
            "cell_type": "code",
            "source": "x = 1\nprint(x)",
            "outputs": [{"output_type": "stream", "text": ["1\n"]}],
            "execution_count": 1,
        }
        _write_notebook(nb_path, [markdown_cell, code_cell])
        extracted = extract_document_text(nb_path)
        for expected in ("# Title", "print(x)"):
            self.assertIn(expected, extracted)
        # Output payloads must NOT leak into the extracted text.
        for leaked in ("output_type", "execution_count"):
            self.assertNotIn(leaked, extracted)
        # Order preserved: markdown before code.
        self.assertLess(extracted.index("Title"), extracted.index("print(x)"))

    def test_string_source_form(self):
        nb_path = os.path.join(self.tmp, "nb2.ipynb")
        only_cell = {"cell_type": "code", "source": "single string source"}
        _write_notebook(nb_path, [only_cell])
        extracted = extract_document_text(nb_path)
        self.assertIn("single string source", extracted)

    def test_legacy_worksheets_form(self):
        nb_path = os.path.join(self.tmp, "nb3.ipynb")
        legacy_cell = {"cell_type": "code", "input": "ignored", "source": "legacy cell"}
        notebook = {"worksheets": [{"cells": [legacy_cell]}], "nbformat": 3}
        with open(nb_path, "w") as handle:
            handle.write(json.dumps(notebook))
        extracted = extract_document_text(nb_path)
        self.assertIn("legacy cell", extracted)

    def test_malformed_notebook_raises(self):
        nb_path = os.path.join(self.tmp, "bad.ipynb")
        with open(nb_path, "w") as handle:
            handle.write("{ not valid json")
        self.assertRaises(ExtractionError, extract_document_text, nb_path)

    def test_empty_cells_raises(self):
        nb_path = os.path.join(self.tmp, "empty.ipynb")
        _write_notebook(nb_path, [])
        self.assertRaises(ExtractionError, extract_document_text, nb_path)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Word documents (.docx) — #10737
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDocxExtraction(unittest.TestCase):
    """DOCX paragraphs, tabs and breaks become text; broken packages raise."""

    def setUp(self):
        self.tmp = tempfile.mkdtemp(prefix="rex_docx_")

    def tearDown(self):
        from shutil import rmtree
        rmtree(self.tmp, ignore_errors=True)

    def _doc(self, body):
        # Wrap *body* in the smallest WordprocessingML document envelope.
        prolog = f'<?xml version="1.0"?><w:document xmlns:w="{_NS_W}">'
        return f'{prolog}<w:body>{body}</w:body></w:document>'

    def test_paragraphs_and_runs(self):
        docx_path = os.path.join(self.tmp, "d.docx")
        first = '<w:p><w:r><w:t>Hello </w:t></w:r><w:r><w:t>World</w:t></w:r></w:p>'
        second = '<w:p><w:r><w:t>Second</w:t></w:r></w:p>'
        _write_docx(docx_path, self._doc(first + second))
        extracted = extract_document_text(docx_path)
        for expected in ("Hello World", "Second"):
            self.assertIn(expected, extracted)

    def test_tabs_and_breaks(self):
        docx_path = os.path.join(self.tmp, "d2.docx")
        body = '<w:p><w:r><w:t>A</w:t><w:tab/><w:t>B</w:t><w:br/><w:t>C</w:t></w:r></w:p>'
        _write_docx(docx_path, self._doc(body))
        extracted = extract_document_text(docx_path)
        for expected in ("A\tB", "C"):
            self.assertIn(expected, extracted)

    def test_not_a_zip_raises(self):
        docx_path = os.path.join(self.tmp, "bad.docx")
        with open(docx_path, "wb") as handle:
            handle.write(b"plain bytes, not a zip")
        self.assertRaises(ExtractionError, extract_document_text, docx_path)

    def test_missing_document_xml_raises(self):
        docx_path = os.path.join(self.tmp, "nodoc.docx")
        with zipfile.ZipFile(docx_path, "w") as archive:
            archive.writestr("other.xml", "<x/>")
        self.assertRaises(ExtractionError, extract_document_text, docx_path)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Excel workbooks (.xlsx) — #10740
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestXlsxExtraction(unittest.TestCase):
    """Visible XLSX sheets render as tab-separated rows; hidden sheets are omitted."""

    def setUp(self):
        self.tmp = tempfile.mkdtemp(prefix="rex_xlsx_")

    def tearDown(self):
        from shutil import rmtree
        rmtree(self.tmp, ignore_errors=True)

    def _build(self, path, *, include_hidden=True):
        # Two-sheet workbook: a visible "Data" sheet and (optionally declared)
        # a hidden sheet carrying a marker string that must never be extracted.
        rel_ns = "http://schemas.openxmlformats.org/officeDocument/2006/relationships"
        if include_hidden:
            hidden_decl = (f'<sheet name="Hidden" sheetId="2" state="hidden" '
                           f'xmlns:r="{rel_ns}" r:id="rId2"/>')
        else:
            hidden_decl = ""
        workbook_xml = "".join([
            f'<workbook xmlns="{_NS_S}" xmlns:r="{rel_ns}"><sheets>',
            f'<sheet name="Data" sheetId="1" r:id="rId1"/>{hidden_decl}',
            '</sheets></workbook>',
        ])
        rels_xml = "".join([
            '<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">',
            '<Relationship Id="rId1" Target="worksheets/sheet1.xml" Type="x"/>',
            '<Relationship Id="rId2" Target="worksheets/sheet2.xml" Type="x"/>',
            '</Relationships>',
        ])
        shared_xml = "".join([
            f'<sst xmlns="{_NS_S}"><si><t>Name</t></si><si><t>Score</t></si>',
            '<si><t>Alice</t></si></sst>',
        ])
        visible_xml = "".join([
            f'<worksheet xmlns="{_NS_S}"><sheetData>',
            '<row r="1"><c r="A1" t="s"><v>0</v></c><c r="B1" t="s"><v>1</v></c></row>',
            '<row r="2"><c r="A2" t="s"><v>2</v></c><c r="B2"><v>95</v></c></row>',
            '</sheetData></worksheet>',
        ])
        hidden_xml = "".join([
            f'<worksheet xmlns="{_NS_S}"><sheetData>',
            '<row r="1"><c r="A1" t="str"><v>SECRETDATA</v></c></row>',
            '</sheetData></worksheet>',
        ])
        sheet_parts = {
            "xl/worksheets/sheet1.xml": visible_xml,
            "xl/worksheets/sheet2.xml": hidden_xml,
        }
        _write_xlsx(path, workbook=workbook_xml, rels=rels_xml, shared=shared_xml,
                    sheets=sheet_parts)

    def test_visible_sheet_content(self):
        xlsx_path = os.path.join(self.tmp, "wb.xlsx")
        self._build(xlsx_path)
        extracted = extract_document_text(xlsx_path)
        self.assertIn("Data", extracted)          # sheet label
        self.assertIn("Name\tScore", extracted)   # shared-string header row
        self.assertIn("Alice\t95", extracted)     # string + numeric cells

    def test_hidden_sheet_omitted(self):
        xlsx_path = os.path.join(self.tmp, "wb2.xlsx")
        self._build(xlsx_path)
        extracted = extract_document_text(xlsx_path)
        for forbidden in ("SECRETDATA", "Hidden"):
            self.assertNotIn(forbidden, extracted)

    def test_not_a_zip_raises(self):
        xlsx_path = os.path.join(self.tmp, "bad.xlsx")
        with open(xlsx_path, "wb") as handle:
            handle.write(b"nope")
        self.assertRaises(ExtractionError, extract_document_text, xlsx_path)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# read_file_tool integration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestReadFileToolIntegration(unittest.TestCase):
    """End-to-end checks of document extraction through ``read_file_tool``."""

    def setUp(self):
        self.tmp = tempfile.mkdtemp(prefix="rex_int_")

    def tearDown(self):
        import shutil
        shutil.rmtree(self.tmp, ignore_errors=True)

    def test_notebook_read_is_line_numbered(self):
        p = os.path.join(self.tmp, "nb.ipynb")
        _write_notebook(p, [
            {"cell_type": "markdown", "source": "# H"},
            {"cell_type": "code", "source": "print(1)"},
        ])
        res = json.loads(read_file_tool(p))
        self.assertTrue(res.get("extracted_document"))
        self.assertIn("1|", res["content"])  # line-number gutter
        self.assertIn("print(1)", res["content"])

    def test_pagination(self):
        p = os.path.join(self.tmp, "nb.ipynb")
        _write_notebook(p, [
            {"cell_type": "code", "source": "a\nb\nc\nd\ne\nf"},
        ])
        res = json.loads(read_file_tool(p, offset=1, limit=2))
        self.assertTrue(res.get("truncated"))
        self.assertIn("offset=3", res.get("hint", ""))
        # The cell header plus six source lines make seven extracted lines.
        self.assertEqual(res["total_lines"], 7)
        # Only first 2 lines present: the cell header and source line "a".
        self.assertIn("1|# ── Code cell 1 ──", res["content"])
        self.assertIn("|a", res["content"])
        # Lines beyond the requested window must not be returned.
        for later in ("|b", "|c", "|f"):
            self.assertNotIn(later, res["content"])

    def test_corrupt_docx_falls_through_to_binary_guard(self):
        p = os.path.join(self.tmp, "bad.docx")
        with open(p, "wb") as fh:
            fh.write(b"not a zip")
        res = json.loads(read_file_tool(p))
        # Should NOT crash; falls through to the binary-extension guard.
        self.assertIn("error", res)
        self.assertIn("binary", res["error"].lower())

    def test_docx_read_extracts(self):
        p = os.path.join(self.tmp, "d.docx")
        _write_docx(p, (f'<?xml version="1.0"?><w:document xmlns:w="{_NS_W}">'
                        '<w:body><w:p><w:r><w:t>Report body</w:t></w:r></w:p>'
                        '</w:body></w:document>'))
        res = json.loads(read_file_tool(p))
        self.assertTrue(res.get("extracted_document"))
        self.assertIn("Report body", res["content"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
@ -760,6 +760,52 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
|
|||
|
||||
_resolved = _resolve_path_for_task(path, task_id)
|
||||
|
||||
# ── Structured-document extraction ────────────────────────────
|
||||
# Try before the binary-extension guard so .docx/.xlsx can render as text.
|
||||
# Malformed documents fall through to the normal path/binary guard.
|
||||
from tools.read_extract import ExtractionError, extract_document_text, is_extractable_document
|
||||
|
||||
if is_extractable_document(str(_resolved)):
|
||||
try:
|
||||
extracted_text = extract_document_text(str(_resolved))
|
||||
except ExtractionError:
|
||||
logger.debug("document extraction failed for %s", path, exc_info=True)
|
||||
else:
|
||||
file_ops = _get_file_ops(task_id)
|
||||
lines = extracted_text.splitlines()
|
||||
total_lines = len(lines)
|
||||
end_line = offset + limit - 1
|
||||
page_text = "\n".join(lines[offset - 1:end_line])
|
||||
result_dict = {
|
||||
"content": file_ops._add_line_numbers(page_text, offset) if page_text else "",
|
||||
"total_lines": total_lines,
|
||||
"file_size": os.path.getsize(_resolved),
|
||||
"truncated": total_lines > end_line,
|
||||
"extracted_document": True,
|
||||
}
|
||||
if result_dict["truncated"]:
|
||||
result_dict["hint"] = (
|
||||
f"Use offset={end_line + 1} to continue reading "
|
||||
f"(showing {offset}-{min(end_line, total_lines)} of {total_lines} lines)"
|
||||
)
|
||||
content_len = len(result_dict["content"])
|
||||
max_chars = _get_max_read_chars()
|
||||
if content_len > max_chars:
|
||||
return json.dumps({
|
||||
"error": (
|
||||
f"Read produced {content_len:,} characters which exceeds "
|
||||
f"the safety limit ({max_chars:,} chars). "
|
||||
"Use offset and limit to read a smaller range. "
|
||||
f"The document has {total_lines} lines of extracted text."
|
||||
),
|
||||
"path": path,
|
||||
"total_lines": total_lines,
|
||||
"file_size": result_dict["file_size"],
|
||||
}, ensure_ascii=False)
|
||||
if result_dict["content"]:
|
||||
result_dict["content"] = redact_sensitive_text(result_dict["content"], code_file=True)
|
||||
return json.dumps(result_dict, ensure_ascii=False)
|
||||
|
||||
# ── Binary file guard ─────────────────────────────────────────
|
||||
# Block binary files by extension (no I/O).
|
||||
if has_binary_extension(str(_resolved)):
|
||||
|
|
@ -1427,7 +1473,7 @@ def _check_file_reqs():
|
|||
|
||||
READ_FILE_SCHEMA = {
|
||||
"name": "read_file",
|
||||
"description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. NOTE: Cannot read images or binary files — use vision_analyze for images.",
|
||||
"description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. Jupyter notebooks (.ipynb), Word documents (.docx), and Excel workbooks (.xlsx) are auto-extracted to readable text. NOTE: Cannot read images or other binary files — use vision_analyze for images.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
|
|
|||
248
tools/read_extract.py
Normal file
248
tools/read_extract.py
Normal file
|
|
@ -0,0 +1,248 @@
|
|||
"""Stdlib document-to-text extraction for ``read_file``.
|
||||
|
||||
Supports Jupyter notebooks, DOCX, and XLSX without adding hard dependencies.
|
||||
Malformed documents raise :class:`ExtractionError`; callers can then fall back to
|
||||
normal text/binary handling.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import posixpath
|
||||
import zipfile
|
||||
from pathlib import Path
|
||||
from xml.etree import ElementTree as ET
|
||||
|
||||
__all__ = ["EXTRACTABLE_EXTENSIONS", "ExtractionError", "extract_document_text", "is_extractable_document"]
|
||||
|
||||
EXTRACTABLE_EXTENSIONS = frozenset({".ipynb", ".docx", ".xlsx"})
|
||||
MAX_XLSX_BYTES = 50 * 1024 * 1024
|
||||
_MAX_XLSX_ROWS_PER_SHEET = 5000
|
||||
_MAX_XLSX_COLS = 256
|
||||
|
||||
_NS_W = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
|
||||
_NS_S = "http://schemas.openxmlformats.org/spreadsheetml/2006/main"
|
||||
_NS_REL = "http://schemas.openxmlformats.org/officeDocument/2006/relationships"
|
||||
_NS_PKG_REL = "http://schemas.openxmlformats.org/package/2006/relationships"
|
||||
|
||||
|
||||
class ExtractionError(Exception):
    """Signal that a file with a supported extension could not be turned into text."""
|
||||
|
||||
|
||||
def _extension(path: str) -> str:
    """Return the lower-cased suffix of *path* when it is extractable, else ``""``."""
    suffix = Path(path).suffix.lower()
    if suffix in EXTRACTABLE_EXTENSIONS:
        return suffix
    return ""
|
||||
|
||||
|
||||
def is_extractable_document(path: str) -> bool:
    """Report whether *path* carries one of the extractable document extensions."""
    return _extension(path) != ""
|
||||
|
||||
|
||||
def extract_document_text(path: str) -> str:
    """Extract plain text from the notebook, DOCX or XLSX file at *path*.

    The extractor is chosen from the file extension alone.

    Raises:
        ExtractionError: if the extension is unsupported, or the chosen
            extractor cannot render the document.
    """
    extractors = {
        ".ipynb": _extract_notebook,
        ".docx": _extract_docx,
        ".xlsx": _extract_xlsx,
    }
    extractor = extractors.get(_extension(path))
    if extractor is None:
        raise ExtractionError(f"Unsupported document type: {path!r}")
    return extractor(path)
|
||||
|
||||
|
||||
def _source_text(source) -> str:
|
||||
if isinstance(source, str):
|
||||
return source
|
||||
if isinstance(source, list):
|
||||
return "".join(item for item in source if isinstance(item, str))
|
||||
return ""
|
||||
|
||||
|
||||
def _extract_notebook(path: str) -> str:
    """Render a Jupyter notebook's cell sources as labelled plain text.

    Markdown, code and raw cells are emitted in document order, each preceded
    by a ``# ── <Kind> cell N ──`` header line (raw cells carry no number).
    Cell outputs and metadata are not included. Cells are taken from the
    top-level ``cells`` list, or from the legacy ``worksheets[*].cells`` lists
    when ``cells`` is absent.

    Raises:
        ExtractionError: if the file cannot be read or parsed as JSON, the root
            is not an object, or no readable cells are found.
    """
    try:
        with open(path, encoding="utf-8", errors="replace") as fh:
            nb = json.load(fh)
    except (OSError, ValueError, RecursionError) as exc:
        # json.JSONDecodeError is a ValueError; RecursionError covers
        # pathologically nested JSON.
        raise ExtractionError(f"Not a valid notebook: {exc}") from exc
    if not isinstance(nb, dict):
        raise ExtractionError("Notebook root is not an object")

    cells = nb.get("cells")
    if not isinstance(cells, list):
        # Legacy layout. Validate every level so malformed input (for example
        # "worksheets": null) ends in ExtractionError rather than TypeError.
        cells = []
        worksheets = nb.get("worksheets")
        if isinstance(worksheets, list):
            for ws in worksheets:
                ws_cells = ws.get("cells") if isinstance(ws, dict) else None
                if isinstance(ws_cells, list):
                    cells.extend(ws_cells)
    if not cells:
        raise ExtractionError("Notebook contains no cells")

    counts = {"markdown": 0, "code": 0, "raw": 0}
    labels = {"markdown": "Markdown", "code": "Code", "raw": "Raw"}
    out: list[str] = []
    for cell in cells:
        if not isinstance(cell, dict):
            continue
        typ = cell.get("cell_type")
        # The isinstance check keeps an unhashable cell_type (list/dict) from
        # raising TypeError in the membership test.
        if not isinstance(typ, str) or typ not in labels:
            continue
        counts[typ] += 1
        suffix = f" {counts[typ]}" if typ != "raw" else ""
        # nbformat 3 code cells keep their code under "input"; use it only
        # when "source" is absent so modern notebooks are unaffected.
        source = cell["source"] if "source" in cell else cell.get("input", "")
        out.extend((f"# ── {labels[typ]} cell{suffix} ──", _source_text(source).rstrip("\n"), ""))
    if not out:
        raise ExtractionError("Notebook contains no readable cells")
    return "\n".join(out).rstrip("\n") + "\n"
|
||||
|
||||
|
||||
def _zip_xml(zf: zipfile.ZipFile, name: str) -> ET.Element:
|
||||
try:
|
||||
return ET.fromstring(zf.read(name))
|
||||
except KeyError as exc:
|
||||
raise ExtractionError(f"Missing {name}") from exc
|
||||
except ET.ParseError as exc:
|
||||
raise ExtractionError(f"Malformed XML in {name}: {exc}") from exc
|
||||
|
||||
|
||||
def _docx_paragraph_text(para: ET.Element, w: str) -> str:
    """Collect the text of one ``w:p`` element, skipping nested paragraphs."""
    p_tag, t_tag, tab_tag = f"{w}p", f"{w}t", f"{w}tab"
    break_tags = {f"{w}br", f"{w}cr"}
    buf: list[str] = []
    # Explicit pre-order walk (children pushed in reverse) so a nested w:p
    # subtree can be pruned; Element.iter() cannot skip a subtree.
    stack = list(reversed(list(para)))
    while stack:
        node = stack.pop()
        tag = node.tag
        if tag == p_tag:
            # Nested paragraph (e.g. inside a text box): the caller's
            # document-wide iteration emits it on its own lines.
            continue
        if tag == t_tag:
            buf.append(node.text or "")
        elif tag == tab_tag:
            buf.append("\t")
        elif tag in break_tags:
            buf.append("\n")
        stack.extend(reversed(list(node)))
    return "".join(buf)


def _extract_docx(path: str) -> str:
    """Extract the paragraph text of a DOCX file as newline-separated lines.

    Every ``w:p`` element of ``word/document.xml`` contributes its ``w:t``
    text, with ``w:tab`` rendered as a tab and ``w:br``/``w:cr`` as a line
    break. Paragraphs nested inside another paragraph are emitted once, as
    their own lines, instead of also being repeated inside the outer one.

    Raises:
        ExtractionError: if the file is not a readable ZIP, lacks a parseable
            ``word/document.xml``, or contains no non-blank text.
    """
    try:
        with zipfile.ZipFile(path) as zf:
            root = _zip_xml(zf, "word/document.xml")
    except zipfile.BadZipFile as exc:
        raise ExtractionError(f"Not a valid DOCX: {exc}") from exc
    except OSError as exc:
        raise ExtractionError(str(exc)) from exc

    w = f"{{{_NS_W}}}"
    lines: list[str] = []
    for para in root.iter(f"{w}p"):
        # A paragraph with explicit breaks expands to several output lines.
        lines.extend(_docx_paragraph_text(para, w).split("\n"))
    if not any(line.strip() for line in lines):
        raise ExtractionError("DOCX contains no extractable text")
    return "\n".join(lines).rstrip("\n") + "\n"
|
||||
|
||||
|
||||
def _extract_xlsx(path: str) -> str:
    """Extract the visible sheets of an XLSX workbook as tab-separated text.

    Each visible sheet is rendered as a ``# ── Sheet: <name> ──`` header
    followed by one tab-joined line per row, or ``(empty)`` when it has no
    rows. Sheets whose state is ``hidden`` or ``veryHidden`` are skipped, as
    are sheets whose part is missing or is not well-formed XML.

    Raises:
        ExtractionError: if the file is not a readable ZIP, a part that would
            be parsed declares more than ``MAX_XLSX_BYTES`` of uncompressed
            data, or no visible sheet could be rendered.
    """
    try:
        with zipfile.ZipFile(path) as zf:
            # Declared uncompressed size per member; checked before reading so
            # a tiny archive cannot force a huge decompress-and-parse.
            sizes = {info.filename: info.file_size for info in zf.infolist()}
            names = set(sizes)
            for fixed in ("xl/workbook.xml", "xl/_rels/workbook.xml.rels", "xl/sharedStrings.xml"):
                if sizes.get(fixed, 0) > MAX_XLSX_BYTES:
                    raise ExtractionError(f"XLSX part too large to extract: {fixed}")
            shared = _shared_strings(zf, names)
            sheets = _workbook_sheets(zf)
            rels = _workbook_rels(zf, names)
            out: list[str] = []
            for name, state, rid in sheets:
                if state in {"hidden", "veryHidden"}:
                    continue
                part = _sheet_part(rels.get(rid, ""))
                if part not in names:
                    continue
                if sizes[part] > MAX_XLSX_BYTES:
                    raise ExtractionError(f"XLSX part too large to extract: {part}")
                try:
                    rows = _sheet_rows(zf.read(part), shared)
                except ET.ParseError:
                    # Best effort: one unparseable sheet should not hide the rest.
                    continue
                out.append(f"# ── Sheet: {name} ──")
                out.extend("\t".join(row) for row in rows)
                if not rows:
                    out.append("(empty)")
                out.append("")
    except zipfile.BadZipFile as exc:
        raise ExtractionError(f"Not a valid XLSX: {exc}") from exc
    except OSError as exc:
        raise ExtractionError(str(exc)) from exc

    if not out:
        raise ExtractionError("XLSX has no visible sheets with content")
    return "\n".join(out).rstrip("\n") + "\n"
|
||||
|
||||
|
||||
def _shared_strings(zf: zipfile.ZipFile, names: set[str]) -> list[str]:
    """Load the workbook's shared-string table as a list indexed by string id.

    Returns an empty list when ``xl/sharedStrings.xml`` is absent or is not
    well-formed XML. For each ``<si>`` item the text is its direct ``<t>`` plus
    the ``<t>`` of each rich-text run ``<r>``. Phonetic runs (``<rPh>``) are
    reading aids, not cell content, and are deliberately left out.
    """
    if "xl/sharedStrings.xml" not in names:
        return []
    try:
        root = ET.fromstring(zf.read("xl/sharedStrings.xml"))
    except ET.ParseError:
        return []
    s = f"{{{_NS_S}}}"
    t_tag, r_tag = f"{s}t", f"{s}r"
    strings: list[str] = []
    for item in root.iter(f"{s}si"):
        parts: list[str] = []
        for child in item:
            if child.tag == t_tag:
                parts.append(child.text or "")
            elif child.tag == r_tag:
                parts.extend(t.text or "" for t in child.findall(t_tag))
        strings.append("".join(parts))
    return strings
|
||||
|
||||
|
||||
def _workbook_sheets(zf: zipfile.ZipFile) -> list[tuple[str, str, str]]:
    """List ``(name, state, relationship id)`` for each sheet in ``xl/workbook.xml``.

    Missing attributes default to ``"Sheet"``, ``"visible"`` and ``""``.
    """
    workbook = _zip_xml(zf, "xl/workbook.xml")
    sheet_tag = f"{{{_NS_S}}}sheet"
    rid_attr = f"{{{_NS_REL}}}id"
    declared = []
    for node in workbook.iter(sheet_tag):
        sheet_name = node.get("name", "Sheet")
        visibility = node.get("state", "visible")
        rel_id = node.get(rid_attr, "")
        declared.append((sheet_name, visibility, rel_id))
    return declared
|
||||
|
||||
|
||||
def _workbook_rels(zf: zipfile.ZipFile, names: set[str]) -> dict[str, str]:
    """Map workbook relationship ids to their targets.

    Returns an empty mapping when the relationships part is absent or is not
    well-formed XML. Relationships without an ``Id`` are ignored.
    """
    rels_path = "xl/_rels/workbook.xml.rels"
    if rels_path not in names:
        return {}
    try:
        root = ET.fromstring(zf.read(rels_path))
    except ET.ParseError:
        return {}
    targets = {}
    for rel in root.iter(f"{{{_NS_PKG_REL}}}Relationship"):
        rel_id = rel.get("Id")
        if rel_id:
            targets[rel_id] = rel.get("Target", "")
    return targets
|
||||
|
||||
|
||||
def _sheet_part(target: str) -> str:
|
||||
target = target.lstrip("/")
|
||||
return posixpath.normpath(target if target.startswith("xl/") else f"xl/{target}")
|
||||
|
||||
|
||||
def _col_index(ref: str) -> int:
|
||||
idx = 0
|
||||
for ch in ref:
|
||||
if not ch.isalpha():
|
||||
break
|
||||
idx = idx * 26 + ord(ch.upper()) - ord("A") + 1
|
||||
return max(idx - 1, 0)
|
||||
|
||||
|
||||
def _sheet_rows(xml_bytes: bytes, shared: list[str]) -> list[list[str]]:
    """Parse a worksheet part into a list of rows of cell strings.

    Cells are placed by the column letters of their ``r`` attribute, or
    directly after the right-most cell seen so far when ``r`` is missing.
    Gaps are filled with ``""``. At most ``_MAX_XLSX_ROWS_PER_SHEET`` rows are
    read and cells at or beyond column ``_MAX_XLSX_COLS`` are dropped.
    Trailing rows that are entirely blank are removed.

    Raises:
        xml.etree.ElementTree.ParseError: if *xml_bytes* is not well-formed XML.
    """
    ns = f"{{{_NS_S}}}"
    row_tag = f"{ns}row"
    cell_tag = f"{ns}c"
    table: list[list[str]] = []
    for row_el in ET.fromstring(xml_bytes).iter(row_tag):
        if len(table) >= _MAX_XLSX_ROWS_PER_SHEET:
            break
        by_col: dict[int, str] = {}
        last = -1
        for cell_el in row_el.iter(cell_tag):
            ref = cell_el.get("r")
            col = _col_index(ref) if ref else last + 1
            if col >= _MAX_XLSX_COLS:
                continue
            by_col[col] = _cell_value(cell_el, shared, ns)
            if col > last:
                last = col
        # range(0) is empty, so a row without usable cells becomes [].
        table.append([by_col.get(i, "") for i in range(last + 1)])
    while table and all(not value.strip() for value in table[-1]):
        table.pop()
    return table
|
||||
|
||||
|
||||
def _cell_value(cell: ET.Element, shared: list[str], s: str) -> str:
|
||||
value = cell.findtext(f"{s}v") or ""
|
||||
typ = cell.get("t", "")
|
||||
if typ == "s":
|
||||
try:
|
||||
return shared[int(value)]
|
||||
except (ValueError, IndexError):
|
||||
return ""
|
||||
if typ == "inlineStr":
|
||||
inline = cell.find(f"{s}is")
|
||||
return "" if inline is None else "".join(t.text or "" for t in inline.iter(f"{s}t"))
|
||||
if typ == "b":
|
||||
return "TRUE" if value.strip() in {"1", "true", "TRUE"} else "FALSE"
|
||||
if typ == "e":
|
||||
return value or "#ERROR"
|
||||
return value
|
||||
Loading…
Add table
Add a link
Reference in a new issue