#!/usr/bin/env python3
"""
Tests for structured-document extraction in the read_file tool.
Covers .ipynb / .docx / .xlsx extraction (ported from Kilo-Org/kilocode
#10733, #10737, #10740) and the read_file_tool integration: pagination,
line-numbering, graceful fallback on malformed input, and hidden-sheet
omission.
Run with: python -m pytest tests/tools/test_read_extract.py -v
"""
import json
import os
import tempfile
import unittest
import zipfile
from tools.read_extract import (
ExtractionError,
extract_document_text,
is_extractable_document,
)
from tools.file_tools import read_file_tool
# ---------------------------------------------------------------------------
# Fixture builders — construct minimal valid OOXML / notebook files.
# ---------------------------------------------------------------------------
def _write_notebook(path, cells, nbformat=4):
    """Serialize a minimal notebook JSON document to *path*.

    ``cells`` is stored verbatim under the ``"cells"`` key, ``metadata`` is
    an empty mapping, ``nbformat`` is taken from the argument and
    ``nbformat_minor`` is always 5. The file is written as UTF-8.
    """
    document = dict(
        cells=cells,
        metadata={},
        nbformat=nbformat,
        nbformat_minor=5,
    )
    with open(path, "w", encoding="utf-8") as out:
        json.dump(document, out)
def _write_docx(path, document_xml):
    """Create a bare-bones .docx archive at *path*.

    The archive contains an empty ``[Content_Types].xml`` member followed by
    ``word/document.xml`` holding *document_xml*.
    """
    members = (
        ("[Content_Types].xml", ""),
        ("word/document.xml", document_xml),
    )
    with zipfile.ZipFile(path, "w") as archive:
        for member_name, payload in members:
            archive.writestr(member_name, payload)
def _write_xlsx(path, *, workbook, rels, shared, sheets):
    """Create a minimal .xlsx archive at *path*.

    Members are written in this order: ``xl/workbook.xml``,
    ``xl/_rels/workbook.xml.rels``, ``xl/sharedStrings.xml`` (skipped when
    *shared* is ``None``), then every entry of *sheets*.

    sheets: dict of part-name -> xml string.
    """
    core_parts = [
        ("xl/workbook.xml", workbook),
        ("xl/_rels/workbook.xml.rels", rels),
    ]
    if shared is not None:
        core_parts.append(("xl/sharedStrings.xml", shared))

    with zipfile.ZipFile(path, "w") as archive:
        for member_name, payload in core_parts:
            archive.writestr(member_name, payload)
        for member_name, payload in sheets.items():
            archive.writestr(member_name, payload)
# WordprocessingML "main" XML namespace URI.
_NS_W = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
# SpreadsheetML "main" XML namespace URI.
_NS_S = "http://schemas.openxmlformats.org/spreadsheetml/2006/main"
# ---------------------------------------------------------------------------
# is_extractable_document
# ---------------------------------------------------------------------------
class TestIsExtractable(unittest.TestCase):
    """Checks which file names ``is_extractable_document`` accepts."""

    def test_recognized_extensions(self):
        # One name per supported type; the .docx case is upper-cased and
        # sits behind a directory component.
        for candidate in ("a.ipynb", "/x/B.DOCX", "report.xlsx"):
            self.assertTrue(is_extractable_document(candidate))

    def test_unrecognized_extensions(self):
        for candidate in ("a.py", "a.pdf", "a.txt"):
            self.assertFalse(is_extractable_document(candidate))
# ---------------------------------------------------------------------------
# Notebooks (.ipynb) — #10733
# ---------------------------------------------------------------------------
class TestNotebookExtraction(unittest.TestCase):
    """Extraction of cell sources from .ipynb files via extract_document_text."""

    def setUp(self):
        # Each test gets its own scratch directory for fixture files.
        self.tmp = tempfile.mkdtemp(prefix="rex_nb_")

    def tearDown(self):
        import shutil
        shutil.rmtree(self.tmp, ignore_errors=True)

    def test_markdown_and_code_in_order(self):
        p = os.path.join(self.tmp, "nb.ipynb")
        _write_notebook(p, [
            # List-of-lines source form.
            {"cell_type": "markdown", "source": ["# Title\n", "para"]},
            # Single-string source form, with an output attached.
            {"cell_type": "code", "source": "x = 1\nprint(x)",
             "outputs": [{"output_type": "stream", "text": ["1\n"]}],
             "execution_count": 1},
        ])
        text = extract_document_text(p)
        self.assertIn("# Title", text)
        self.assertIn("print(x)", text)
        # Output payloads must NOT leak into the extracted text.
        self.assertNotIn("output_type", text)
        self.assertNotIn("execution_count", text)
        # Order preserved: markdown before code.
        self.assertLess(text.index("Title"), text.index("print(x)"))

    def test_string_source_form(self):
        p = os.path.join(self.tmp, "nb2.ipynb")
        _write_notebook(
            p, [{"cell_type": "code", "source": "single string source"}])
        self.assertIn("single string source", extract_document_text(p))

    def test_legacy_worksheets_form(self):
        p = os.path.join(self.tmp, "nb3.ipynb")
        # nbformat 3 layout: cells live under "worksheets" rather than at
        # the top level, so the fixture is written by hand.
        nb = {
            "worksheets": [{"cells": [
                {"cell_type": "code", "input": "ignored",
                 "source": "legacy cell"},
            ]}],
            "nbformat": 3,
        }
        # Explicit encoding keeps this consistent with _write_notebook and
        # independent of the platform's locale default.
        with open(p, "w", encoding="utf-8") as fh:
            json.dump(nb, fh)
        self.assertIn("legacy cell", extract_document_text(p))

    def test_malformed_notebook_raises(self):
        p = os.path.join(self.tmp, "bad.ipynb")
        with open(p, "w", encoding="utf-8") as fh:
            fh.write("{ not valid json")
        with self.assertRaises(ExtractionError):
            extract_document_text(p)

    def test_empty_cells_raises(self):
        p = os.path.join(self.tmp, "empty.ipynb")
        _write_notebook(p, [])
        with self.assertRaises(ExtractionError):
            extract_document_text(p)
# ---------------------------------------------------------------------------
# Word documents (.docx) — #10737
# ---------------------------------------------------------------------------
class TestDocxExtraction(unittest.TestCase):
    """Extraction of paragraph text from .docx files.

    NOTE(review): the fixture markup uses the standard WordprocessingML
    element names (w:document, w:body, w:p, w:r, w:t, w:tab, w:br); confirm
    the extractor requires nothing beyond these.
    """

    def setUp(self):
        # Each test gets its own scratch directory for fixture files.
        self.tmp = tempfile.mkdtemp(prefix="rex_docx_")

    def tearDown(self):
        import shutil
        shutil.rmtree(self.tmp, ignore_errors=True)

    def _doc(self, body):
        """Wrap *body* (paragraph markup) in a minimal document/body shell."""
        return (f'<w:document xmlns:w="{_NS_W}"><w:body>'
                f'{body}</w:body></w:document>')

    def test_paragraphs_and_runs(self):
        p = os.path.join(self.tmp, "d.docx")
        # First paragraph is split across two runs; xml:space="preserve"
        # marks the trailing space in "Hello " as significant.
        _write_docx(p, self._doc(
            '<w:p><w:r><w:t xml:space="preserve">Hello </w:t></w:r>'
            '<w:r><w:t>World</w:t></w:r></w:p>'
            '<w:p><w:r><w:t>Second</w:t></w:r></w:p>'))
        text = extract_document_text(p)
        self.assertIn("Hello World", text)
        self.assertIn("Second", text)

    def test_tabs_and_breaks(self):
        p = os.path.join(self.tmp, "d2.docx")
        # A tab element between A and B, a line break between B and C.
        _write_docx(p, self._doc(
            '<w:p><w:r><w:t>A</w:t><w:tab/><w:t>B</w:t>'
            '<w:br/><w:t>C</w:t></w:r></w:p>'))
        text = extract_document_text(p)
        self.assertIn("A\tB", text)
        self.assertIn("C", text)

    def test_not_a_zip_raises(self):
        p = os.path.join(self.tmp, "bad.docx")
        with open(p, "wb") as fh:
            fh.write(b"plain bytes, not a zip")
        with self.assertRaises(ExtractionError):
            extract_document_text(p)

    def test_missing_document_xml_raises(self):
        p = os.path.join(self.tmp, "nodoc.docx")
        # Valid zip archive, but without the word/document.xml member.
        with zipfile.ZipFile(p, "w") as z:
            z.writestr("other.xml", "")
        with self.assertRaises(ExtractionError):
            extract_document_text(p)
# ---------------------------------------------------------------------------
# Excel workbooks (.xlsx) — #10740
# ---------------------------------------------------------------------------
class TestXlsxExtraction(unittest.TestCase):
    """Extraction of cell text from .xlsx workbooks, including sheet hiding.

    NOTE(review): the fixture markup uses standard SpreadsheetML parts
    (workbook, relationships, shared strings, worksheets); confirm the
    extractor requires nothing beyond these.
    """

    def setUp(self):
        # Each test gets its own scratch directory for fixture files.
        self.tmp = tempfile.mkdtemp(prefix="rex_xlsx_")

    def tearDown(self):
        import shutil
        shutil.rmtree(self.tmp, ignore_errors=True)

    def _build(self, path, *, include_hidden=True):
        """Write a two-sheet workbook fixture to *path*.

        Sheet "Data" (sheet1.xml) holds a shared-string header row
        (Name, Score) and one data row (Alice, 95). Sheet "Hidden"
        (sheet2.xml) holds a single inline string, SECRETDATA, and is only
        declared in the workbook when *include_hidden* is true. Both
        worksheet parts and both relationships are always written.
        """
        r = "http://schemas.openxmlformats.org/officeDocument/2006/relationships"
        pkg_rels = "http://schemas.openxmlformats.org/package/2006/relationships"
        hidden_sheet = (
            '<sheet name="Hidden" sheetId="2" state="hidden" r:id="rId2"/>'
        ) if include_hidden else ""
        workbook = (
            f'<workbook xmlns="{_NS_S}" xmlns:r="{r}"><sheets>'
            f'<sheet name="Data" sheetId="1" r:id="rId1"/>{hidden_sheet}'
            f'</sheets></workbook>')
        rels = (
            f'<Relationships xmlns="{pkg_rels}">'
            f'<Relationship Id="rId1" Type="{r}/worksheet" '
            f'Target="worksheets/sheet1.xml"/>'
            f'<Relationship Id="rId2" Type="{r}/worksheet" '
            f'Target="worksheets/sheet2.xml"/>'
            '</Relationships>')
        # Shared-string indices: 0 = Name, 1 = Score, 2 = Alice.
        shared = (f'<sst xmlns="{_NS_S}">'
                  '<si><t>Name</t></si><si><t>Score</t></si>'
                  '<si><t>Alice</t></si></sst>')
        sheet1 = (
            f'<worksheet xmlns="{_NS_S}"><sheetData>'
            '<row r="1"><c r="A1" t="s"><v>0</v></c>'
            '<c r="B1" t="s"><v>1</v></c></row>'
            # Second row mixes a shared-string cell with a plain number.
            '<row r="2"><c r="A2" t="s"><v>2</v></c>'
            '<c r="B2"><v>95</v></c></row>'
            '</sheetData></worksheet>')
        sheet2 = (
            f'<worksheet xmlns="{_NS_S}"><sheetData>'
            '<row r="1"><c r="A1" t="inlineStr">'
            '<is><t>SECRETDATA</t></is></c></row>'
            '</sheetData></worksheet>')
        _write_xlsx(path, workbook=workbook, rels=rels, shared=shared,
                    sheets={"xl/worksheets/sheet1.xml": sheet1,
                            "xl/worksheets/sheet2.xml": sheet2})

    def test_visible_sheet_content(self):
        p = os.path.join(self.tmp, "wb.xlsx")
        self._build(p)
        text = extract_document_text(p)
        self.assertIn("Data", text)          # sheet label
        self.assertIn("Name\tScore", text)   # shared-string header row
        self.assertIn("Alice\t95", text)     # string + numeric cells

    def test_hidden_sheet_omitted(self):
        p = os.path.join(self.tmp, "wb2.xlsx")
        self._build(p)
        text = extract_document_text(p)
        # Neither the hidden sheet's content nor its label may appear.
        self.assertNotIn("SECRETDATA", text)
        self.assertNotIn("Hidden", text)

    def test_not_a_zip_raises(self):
        p = os.path.join(self.tmp, "bad.xlsx")
        with open(p, "wb") as fh:
            fh.write(b"nope")
        with self.assertRaises(ExtractionError):
            extract_document_text(p)
# ---------------------------------------------------------------------------
# read_file_tool integration
# ---------------------------------------------------------------------------
class TestReadFileToolIntegration(unittest.TestCase):
    """End-to-end checks of read_file_tool on extractable documents.

    read_file_tool returns a JSON string; every test decodes it and
    inspects the resulting mapping.
    """

    def setUp(self):
        # Each test gets its own scratch directory for fixture files.
        self.tmp = tempfile.mkdtemp(prefix="rex_int_")

    def tearDown(self):
        import shutil
        shutil.rmtree(self.tmp, ignore_errors=True)

    def test_notebook_read_is_line_numbered(self):
        p = os.path.join(self.tmp, "nb.ipynb")
        _write_notebook(p, [
            {"cell_type": "markdown", "source": "# H"},
            {"cell_type": "code", "source": "print(1)"},
        ])
        res = json.loads(read_file_tool(p))
        self.assertTrue(res.get("extracted_document"))
        self.assertIn("1|", res["content"])  # line-number gutter
        self.assertIn("print(1)", res["content"])

    def test_pagination(self):
        p = os.path.join(self.tmp, "nb.ipynb")
        _write_notebook(p, [
            {"cell_type": "code", "source": "a\nb\nc\nd\ne\nf"},
        ])
        res = json.loads(read_file_tool(p, offset=1, limit=2))
        self.assertTrue(res.get("truncated"))
        # The hint must point at the line right after the returned window.
        self.assertIn("offset=3", res.get("hint", ""))
        # Only first 2 lines present.
        self.assertIn("1|# ── Code cell 1 ──", res["content"])

    def test_corrupt_docx_falls_through_to_binary_guard(self):
        p = os.path.join(self.tmp, "bad.docx")
        with open(p, "wb") as fh:
            fh.write(b"not a zip")
        res = json.loads(read_file_tool(p))
        # Should NOT crash; falls through to the binary-extension guard.
        self.assertIn("error", res)
        self.assertIn("binary", res["error"].lower())

    def test_docx_read_extracts(self):
        p = os.path.join(self.tmp, "d.docx")
        # Minimal WordprocessingML document: one paragraph, one run.
        # NOTE(review): standard element names; confirm against the extractor.
        _write_docx(p, (f'<w:document xmlns:w="{_NS_W}"><w:body>'
                        '<w:p><w:r><w:t>Report body</w:t></w:r></w:p>'
                        '</w:body></w:document>'))
        res = json.loads(read_file_tool(p))
        self.assertTrue(res.get("extracted_document"))
        self.assertIn("Report body", res["content"])
# Run the unittest runner when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()