mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-15 09:21:36 +00:00
Add stdlib-only extraction for `.ipynb`, `.docx`, and `.xlsx` in read_file with lazy integration and malformed-document fallback.
295 lines
11 KiB
Python
295 lines
11 KiB
Python
#!/usr/bin/env python3
"""
Tests for structured-document extraction in the read_file tool.

Covers .ipynb / .docx / .xlsx extraction (ported from Kilo-Org/kilocode
#10733, #10737, #10740) and the read_file_tool integration: pagination,
line-numbering, graceful fallback on malformed input, and hidden-sheet
omission.

Run with: python -m pytest tests/tools/test_read_extract.py -v
"""

import json
import os
import shutil
import tempfile
import unittest
import zipfile

from tools.read_extract import (
    ExtractionError,
    extract_document_text,
    is_extractable_document,
)
from tools.file_tools import read_file_tool


# ---------------------------------------------------------------------------
# Fixture builders — construct minimal valid OOXML / notebook files.
# ---------------------------------------------------------------------------

def _write_notebook(path, cells, nbformat=4):
    """Write a minimal notebook JSON document to ``path``.

    Args:
        path: Destination file; created or overwritten.
        cells: List of cell dicts stored verbatim under the ``"cells"`` key.
        nbformat: Value stored under the ``"nbformat"`` key.
    """
    nb = {"cells": cells, "metadata": {}, "nbformat": nbformat, "nbformat_minor": 5}
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(nb, fh)

def _write_docx(path, document_xml):
    """Write a two-member zip to ``path`` that stands in for a .docx file.

    Args:
        path: Destination file; created or overwritten.
        document_xml: XML text stored as the ``word/document.xml`` member.
    """
    with zipfile.ZipFile(path, "w") as z:
        # Placeholder content-types part; only its presence is provided here.
        z.writestr("[Content_Types].xml", "<Types/>")
        z.writestr("word/document.xml", document_xml)

def _write_xlsx(path, *, workbook, rels, shared, sheets):
    """Write a zip to ``path`` laid out like an .xlsx package.

    Args:
        path: Destination file; created or overwritten.
        workbook: XML text stored as ``xl/workbook.xml``.
        rels: XML text stored as ``xl/_rels/workbook.xml.rels``.
        shared: XML text stored as ``xl/sharedStrings.xml``; pass ``None``
            to leave that member out of the archive.
        sheets: dict of part-name -> xml string.
    """
    with zipfile.ZipFile(path, "w") as z:
        z.writestr("xl/workbook.xml", workbook)
        z.writestr("xl/_rels/workbook.xml.rels", rels)
        if shared is not None:
            z.writestr("xl/sharedStrings.xml", shared)
        for part, xml in sheets.items():
            z.writestr(part, xml)

# XML namespace URIs interpolated into the hand-built fixture documents below.
_NS_W = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
_NS_S = "http://schemas.openxmlformats.org/spreadsheetml/2006/main"


# ---------------------------------------------------------------------------
# is_extractable_document
# ---------------------------------------------------------------------------

class TestIsExtractable(unittest.TestCase):
    """Checks which file names ``is_extractable_document`` accepts."""

    def test_recognized_extensions(self):
        self.assertTrue(is_extractable_document("a.ipynb"))
        # Upper-case extension behind a directory prefix must also match.
        self.assertTrue(is_extractable_document("/x/B.DOCX"))
        self.assertTrue(is_extractable_document("report.xlsx"))

    def test_unrecognized_extensions(self):
        self.assertFalse(is_extractable_document("a.py"))
        self.assertFalse(is_extractable_document("a.pdf"))
        self.assertFalse(is_extractable_document("a.txt"))


# ---------------------------------------------------------------------------
# Notebooks (.ipynb) — #10733
# ---------------------------------------------------------------------------

class TestNotebookExtraction(unittest.TestCase):
    """Calls ``extract_document_text`` directly on .ipynb fixtures."""

    def setUp(self):
        # Fresh scratch directory per test. Registering the removal as a
        # cleanup means it runs whether the test passes, fails, or errors.
        self.tmp = tempfile.mkdtemp(prefix="rex_nb_")
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)

    def test_markdown_and_code_in_order(self):
        p = os.path.join(self.tmp, "nb.ipynb")
        _write_notebook(p, [
            {"cell_type": "markdown", "source": ["# Title\n", "para"]},
            {"cell_type": "code", "source": "x = 1\nprint(x)",
             "outputs": [{"output_type": "stream", "text": ["1\n"]}],
             "execution_count": 1},
        ])
        text = extract_document_text(p)
        self.assertIn("# Title", text)
        self.assertIn("print(x)", text)
        # Output payloads must NOT leak into the extracted text.
        self.assertNotIn("output_type", text)
        self.assertNotIn("execution_count", text)
        # Order preserved: markdown before code.
        self.assertLess(text.index("Title"), text.index("print(x)"))

    def test_string_source_form(self):
        # "source" given as one string rather than a list of lines.
        p = os.path.join(self.tmp, "nb2.ipynb")
        _write_notebook(p, [{"cell_type": "code", "source": "single string source"}])
        self.assertIn("single string source", extract_document_text(p))

    def test_legacy_worksheets_form(self):
        # Cells nested under "worksheets" instead of a top-level "cells" key.
        p = os.path.join(self.tmp, "nb3.ipynb")
        nb = {"worksheets": [{"cells": [
            {"cell_type": "code", "input": "ignored", "source": "legacy cell"}]}],
            "nbformat": 3}
        with open(p, "w", encoding="utf-8") as fh:
            json.dump(nb, fh)
        self.assertIn("legacy cell", extract_document_text(p))

    def test_malformed_notebook_raises(self):
        p = os.path.join(self.tmp, "bad.ipynb")
        with open(p, "w", encoding="utf-8") as fh:
            fh.write("{ not valid json")
        with self.assertRaises(ExtractionError):
            extract_document_text(p)

    def test_empty_cells_raises(self):
        # A well-formed notebook with no cells is still an extraction error.
        p = os.path.join(self.tmp, "empty.ipynb")
        _write_notebook(p, [])
        with self.assertRaises(ExtractionError):
            extract_document_text(p)


# ---------------------------------------------------------------------------
# Word documents (.docx) — #10737
# ---------------------------------------------------------------------------

class TestDocxExtraction(unittest.TestCase):
    """Calls ``extract_document_text`` directly on .docx fixtures."""

    def setUp(self):
        # Fresh scratch directory per test. Registering the removal as a
        # cleanup means it runs whether the test passes, fails, or errors.
        self.tmp = tempfile.mkdtemp(prefix="rex_docx_")
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)

    def _doc(self, body):
        """Wrap ``body`` in a ``w:document`` / ``w:body`` envelope and return the XML text."""
        return (f'<?xml version="1.0"?><w:document xmlns:w="{_NS_W}">'
                f'<w:body>{body}</w:body></w:document>')

    def test_paragraphs_and_runs(self):
        p = os.path.join(self.tmp, "d.docx")
        _write_docx(p, self._doc(
            '<w:p><w:r><w:t>Hello </w:t></w:r><w:r><w:t>World</w:t></w:r></w:p>'
            '<w:p><w:r><w:t>Second</w:t></w:r></w:p>'))
        text = extract_document_text(p)
        # Adjacent runs in one paragraph are joined with nothing in between.
        self.assertIn("Hello World", text)
        self.assertIn("Second", text)

    def test_tabs_and_breaks(self):
        p = os.path.join(self.tmp, "d2.docx")
        _write_docx(p, self._doc(
            '<w:p><w:r><w:t>A</w:t><w:tab/><w:t>B</w:t><w:br/><w:t>C</w:t></w:r></w:p>'))
        text = extract_document_text(p)
        self.assertIn("A\tB", text)
        self.assertIn("C", text)

    def test_not_a_zip_raises(self):
        p = os.path.join(self.tmp, "bad.docx")
        with open(p, "wb") as fh:
            fh.write(b"plain bytes, not a zip")
        with self.assertRaises(ExtractionError):
            extract_document_text(p)

    def test_missing_document_xml_raises(self):
        # A valid zip that has no word/document.xml member.
        p = os.path.join(self.tmp, "nodoc.docx")
        with zipfile.ZipFile(p, "w") as z:
            z.writestr("other.xml", "<x/>")
        with self.assertRaises(ExtractionError):
            extract_document_text(p)


# ---------------------------------------------------------------------------
# Excel workbooks (.xlsx) — #10740
# ---------------------------------------------------------------------------

class TestXlsxExtraction(unittest.TestCase):
    """Calls ``extract_document_text`` directly on .xlsx fixtures."""

    def setUp(self):
        # Fresh scratch directory per test. Registering the removal as a
        # cleanup means it runs whether the test passes, fails, or errors.
        self.tmp = tempfile.mkdtemp(prefix="rex_xlsx_")
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)

    def _build(self, path, *, include_hidden=True):
        """Write a small workbook fixture to ``path``.

        The workbook lists a visible sheet named "Data" and, when
        ``include_hidden`` is true, a second sheet named "Hidden" marked
        ``state="hidden"``. The sheet2 part and its relationship entry are
        written to the archive either way; only the ``<sheet>`` listing in
        the workbook part depends on ``include_hidden``.
        """
        r = "http://schemas.openxmlformats.org/officeDocument/2006/relationships"
        hidden_sheet = (f'<sheet name="Hidden" sheetId="2" state="hidden" '
                        f'xmlns:r="{r}" r:id="rId2"/>') if include_hidden else ""
        workbook = (
            f'<workbook xmlns="{_NS_S}" xmlns:r="{r}"><sheets>'
            f'<sheet name="Data" sheetId="1" r:id="rId1"/>{hidden_sheet}'
            f'</sheets></workbook>')
        rels = (
            '<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">'
            '<Relationship Id="rId1" Target="worksheets/sheet1.xml" Type="x"/>'
            '<Relationship Id="rId2" Target="worksheets/sheet2.xml" Type="x"/>'
            '</Relationships>')
        # Shared-string table: index 0 = Name, 1 = Score, 2 = Alice.
        shared = (f'<sst xmlns="{_NS_S}"><si><t>Name</t></si><si><t>Score</t></si>'
                  f'<si><t>Alice</t></si></sst>')
        # Cells typed t="s" hold an index into the shared-string table; B2 is
        # untyped and carries the literal 95.
        sheet1 = (
            f'<worksheet xmlns="{_NS_S}"><sheetData>'
            '<row r="1"><c r="A1" t="s"><v>0</v></c><c r="B1" t="s"><v>1</v></c></row>'
            '<row r="2"><c r="A2" t="s"><v>2</v></c><c r="B2"><v>95</v></c></row>'
            '</sheetData></worksheet>')
        sheet2 = (f'<worksheet xmlns="{_NS_S}"><sheetData>'
                  '<row r="1"><c r="A1" t="str"><v>SECRETDATA</v></c></row>'
                  '</sheetData></worksheet>')
        _write_xlsx(path, workbook=workbook, rels=rels, shared=shared,
                    sheets={"xl/worksheets/sheet1.xml": sheet1,
                            "xl/worksheets/sheet2.xml": sheet2})

    def test_visible_sheet_content(self):
        p = os.path.join(self.tmp, "wb.xlsx")
        self._build(p)
        text = extract_document_text(p)
        self.assertIn("Data", text)            # sheet label
        self.assertIn("Name\tScore", text)     # shared-string header row
        self.assertIn("Alice\t95", text)       # string + numeric cells

    def test_hidden_sheet_omitted(self):
        p = os.path.join(self.tmp, "wb2.xlsx")
        self._build(p)
        text = extract_document_text(p)
        # Neither the hidden sheet's cell data nor its name may appear.
        self.assertNotIn("SECRETDATA", text)
        self.assertNotIn("Hidden", text)

    def test_not_a_zip_raises(self):
        p = os.path.join(self.tmp, "bad.xlsx")
        with open(p, "wb") as fh:
            fh.write(b"nope")
        with self.assertRaises(ExtractionError):
            extract_document_text(p)


# ---------------------------------------------------------------------------
# read_file_tool integration
# ---------------------------------------------------------------------------

class TestReadFileToolIntegration(unittest.TestCase):
    """Runs fixtures through ``read_file_tool`` and inspects its decoded JSON result."""

    def setUp(self):
        # Fresh scratch directory per test. Registering the removal as a
        # cleanup means it runs whether the test passes, fails, or errors.
        self.tmp = tempfile.mkdtemp(prefix="rex_int_")
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)

    def test_notebook_read_is_line_numbered(self):
        p = os.path.join(self.tmp, "nb.ipynb")
        _write_notebook(p, [
            {"cell_type": "markdown", "source": "# H"},
            {"cell_type": "code", "source": "print(1)"},
        ])
        res = json.loads(read_file_tool(p))
        self.assertTrue(res.get("extracted_document"))
        self.assertIn("1|", res["content"])  # line-number gutter
        self.assertIn("print(1)", res["content"])

    def test_pagination(self):
        p = os.path.join(self.tmp, "nb.ipynb")
        _write_notebook(p, [
            {"cell_type": "code", "source": "a\nb\nc\nd\ne\nf"},
        ])
        res = json.loads(read_file_tool(p, offset=1, limit=2))
        self.assertTrue(res.get("truncated"))
        # The hint should point at the first line after this two-line page.
        self.assertIn("offset=3", res.get("hint", ""))
        # The page opens at line 1 with the code-cell header.
        self.assertIn("1|# ── Code cell 1 ──", res["content"])

    def test_corrupt_docx_falls_through_to_binary_guard(self):
        p = os.path.join(self.tmp, "bad.docx")
        with open(p, "wb") as fh:
            fh.write(b"not a zip")
        res = json.loads(read_file_tool(p))
        # Should NOT crash; falls through to the binary-extension guard.
        self.assertIn("error", res)
        self.assertIn("binary", res["error"].lower())

    def test_docx_read_extracts(self):
        p = os.path.join(self.tmp, "d.docx")
        _write_docx(p, (f'<?xml version="1.0"?><w:document xmlns:w="{_NS_W}">'
                        '<w:body><w:p><w:r><w:t>Report body</w:t></w:r></w:p>'
                        '</w:body></w:document>'))
        res = json.loads(read_file_tool(p))
        self.assertTrue(res.get("extracted_document"))
        self.assertIn("Report body", res["content"])

# Allow running this file directly as a script, in addition to via pytest.
if __name__ == "__main__":
    unittest.main()