mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
When the terminal shrinks, already-printed box-drawing rules (response, reasoning, streaming TTS, background-task Panels) reflow into multiple narrower rows — visible as duplicated horizontal separators / ghost lines in scrollback. Similarly, prompt_toolkit redraws a fresh status bar on SIGWINCH on top of one the terminal just reflowed, producing double-bar artifacts on column shrink. Two surgical changes: 1. Decorative scrollback boxes now use a new `HermesCLI._scrollback_box_width()` helper that clamps to `max(32, min(width, 56))`. The live TUI footer is unaffected and still uses the full width. Covers: streaming response box (open + close), reasoning box (open + close, both streaming and post-stream paths), streaming-TTS box close, final-response Rich Panel, and the background-task Rich Panel. 2. `_recover_after_resize()` now also sets a new `_status_bar_suppressed_after_resize` flag so the dynamic status bar and both input separator rules stay hidden until the next user input. The flag is cleared in the process loop the moment the user submits their next prompt, restoring chrome cleanly. Tests: - New `test_input_rules_hide_after_resize_until_next_input` covers the flag's effect on rule heights. - New `test_scrollback_box_width_caps_to_resize_safe_value` covers the helper at floor / cap / mid-range / overflow. - Existing resize-recovery test extended to assert the flag flips. Refs: #18449 #19280 #22976 Salvage of #24403. Co-authored-by: Szymonclawd <szymonclawd@mac.home>
649 lines
23 KiB
Python
649 lines
23 KiB
Python
import time
|
||
from datetime import datetime, timedelta
|
||
from types import SimpleNamespace
|
||
from unittest.mock import MagicMock, patch
|
||
|
||
from cli import HermesCLI
|
||
|
||
|
||
def _make_cli(model: str = "anthropic/claude-sonnet-4-20250514"):
    """Return a bare ``HermesCLI`` carrying only the state these tests read.

    The instance is allocated with ``__new__`` so ``HermesCLI.__init__`` never
    runs; the attributes below are assigned by hand instead.

    Args:
        model: Model identifier stored on the instance.

    Returns:
        The partially initialised ``HermesCLI`` with ``agent`` set to ``None``.
    """
    # Session "started" 14m32s ago relative to the moment of the call.
    started_at = datetime.now() - timedelta(minutes=14, seconds=32)

    instance = HermesCLI.__new__(HermesCLI)
    instance.model = model
    instance.session_start = started_at
    instance.conversation_history = [{"role": "user", "content": "hi"}]
    instance.agent = None
    return instance
|
||
|
||
|
||
def _attach_agent(
|
||
cli_obj,
|
||
*,
|
||
input_tokens: int | None = None,
|
||
output_tokens: int | None = None,
|
||
cache_read_tokens: int = 0,
|
||
cache_write_tokens: int = 0,
|
||
prompt_tokens: int,
|
||
completion_tokens: int,
|
||
total_tokens: int,
|
||
api_calls: int,
|
||
context_tokens: int,
|
||
context_length: int,
|
||
compressions: int = 0,
|
||
):
|
||
cli_obj.agent = SimpleNamespace(
|
||
model=cli_obj.model,
|
||
provider="anthropic" if cli_obj.model.startswith("anthropic/") else None,
|
||
base_url="",
|
||
session_input_tokens=input_tokens if input_tokens is not None else prompt_tokens,
|
||
session_output_tokens=output_tokens if output_tokens is not None else completion_tokens,
|
||
session_cache_read_tokens=cache_read_tokens,
|
||
session_cache_write_tokens=cache_write_tokens,
|
||
session_prompt_tokens=prompt_tokens,
|
||
session_completion_tokens=completion_tokens,
|
||
session_total_tokens=total_tokens,
|
||
session_api_calls=api_calls,
|
||
get_rate_limit_state=lambda: None,
|
||
context_compressor=SimpleNamespace(
|
||
last_prompt_tokens=context_tokens,
|
||
context_length=context_length,
|
||
compression_count=compressions,
|
||
),
|
||
)
|
||
return cli_obj
|
||
|
||
|
||
class TestCLIStatusBar:
    """Rendering tests for the status bar, input rules, spinner and voice bar.

    Each test builds a bare ``HermesCLI`` with ``_make_cli`` (optionally given
    a fake agent by ``_attach_agent``) and calls the private rendering helpers
    directly with an explicit ``width`` where one is accepted.
    """

    @staticmethod
    def _wide_char_input_area():
        """Return a stand-in input area whose buffer holds ten CJK characters.

        Only the ``buffer.document.lines`` attribute chain is provided, which
        is all ``_input_height`` reads.
        """
        document = SimpleNamespace(lines=["你" * 10])
        return SimpleNamespace(buffer=SimpleNamespace(document=document))

    @staticmethod
    def _input_height(cli_obj, input_area):
        """Return the number of visual rows ``input_area`` needs (1 to 8).

        NOTE(review): presumably a copy of the input-height callback in
        ``cli.py`` -- confirm it stays in sync with the production code.

        Args:
            cli_obj: Object providing ``_get_tui_prompt_text()``.
            input_area: Object exposing ``buffer.document.lines``.

        Returns:
            The wrapped row count clamped to ``[1, 8]``, or ``1`` if anything
            in the computation raises.
        """
        try:
            # Imported at call time so that a surrounding
            # ``patch("prompt_toolkit.application.get_app")`` is picked up.
            from prompt_toolkit.application import get_app
            from prompt_toolkit.utils import get_cwidth

            doc = input_area.buffer.document
            prompt_width = max(2, get_cwidth(cli_obj._get_tui_prompt_text()))
            try:
                available_width = get_app().output.get_size().columns - prompt_width
            except Exception:
                import shutil

                available_width = shutil.get_terminal_size((80, 24)).columns - prompt_width
            if available_width < 10:
                available_width = 40
            visual_lines = 0
            for line in doc.lines:
                line_width = get_cwidth(line)
                if line_width <= 0:
                    visual_lines += 1
                else:
                    # Ceiling division: rows needed to fit the cell width.
                    visual_lines += max(1, -(-line_width // available_width))
            return min(max(visual_lines, 1), 8)
        except Exception:
            return 1

    def test_context_style_thresholds(self):
        """Context percentage maps onto the dim/good/warn/bad/critical styles."""
        cli_obj = _make_cli()

        assert cli_obj._status_bar_context_style(None) == "class:status-bar-dim"
        assert cli_obj._status_bar_context_style(10) == "class:status-bar-good"
        assert cli_obj._status_bar_context_style(50) == "class:status-bar-warn"
        assert cli_obj._status_bar_context_style(81) == "class:status-bar-bad"
        assert cli_obj._status_bar_context_style(95) == "class:status-bar-critical"

    def test_build_status_bar_text_for_wide_terminal(self):
        """At width 120 the text shows model, context usage and duration."""
        cli_obj = _attach_agent(
            _make_cli(),
            prompt_tokens=10_230,
            completion_tokens=2_220,
            total_tokens=12_450,
            api_calls=7,
            context_tokens=12_450,
            context_length=200_000,
        )

        text = cli_obj._build_status_bar_text(width=120)

        assert "claude-sonnet-4-20250514" in text
        assert "12.4K/200K" in text
        assert "6%" in text
        assert "$0.06" not in text  # cost hidden by default
        assert "15m" in text

    def test_input_height_counts_wide_characters_using_cell_width(self):
        """Ten double-width characters in 12 usable columns wrap to two rows."""
        cli_obj = _make_cli()
        input_area = self._wide_char_input_area()

        mock_app = MagicMock()
        mock_app.output.get_size.return_value = MagicMock(columns=14)
        with (
            patch.object(HermesCLI, "_get_tui_prompt_text", return_value="❯ "),
            patch("prompt_toolkit.application.get_app", return_value=mock_app),
        ):
            assert self._input_height(cli_obj, input_area) == 2

    def test_input_height_uses_prompt_toolkit_width_over_shutil(self):
        """When prompt_toolkit reports a size, ``shutil`` is never consulted."""
        cli_obj = _make_cli()
        input_area = self._wide_char_input_area()

        mock_app = MagicMock()
        mock_app.output.get_size.return_value = MagicMock(columns=14)
        with (
            patch.object(HermesCLI, "_get_tui_prompt_text", return_value="❯ "),
            patch("prompt_toolkit.application.get_app", return_value=mock_app),
            patch("shutil.get_terminal_size") as mock_shutil,
        ):
            assert self._input_height(cli_obj, input_area) == 2
            mock_shutil.assert_not_called()

    def test_build_status_bar_text_no_cost_in_status_bar(self):
        """No dollar sign appears in the status bar text."""
        cli_obj = _attach_agent(
            _make_cli(),
            prompt_tokens=10000,
            completion_tokens=5000,
            total_tokens=15000,
            api_calls=7,
            context_tokens=50000,
            context_length=200_000,
        )

        text = cli_obj._build_status_bar_text(width=120)
        assert "$" not in text  # cost is never shown in status bar

    def test_build_status_bar_text_collapses_for_narrow_terminal(self):
        """At width 60 the context length ("200K") is dropped from the text."""
        cli_obj = _attach_agent(
            _make_cli(),
            prompt_tokens=10000,
            completion_tokens=2400,
            total_tokens=12400,
            api_calls=7,
            context_tokens=12400,
            context_length=200_000,
        )

        text = cli_obj._build_status_bar_text(width=60)

        assert "⚕" in text
        assert "$0.06" not in text  # cost hidden by default
        assert "15m" in text
        assert "200K" not in text

    def test_build_status_bar_text_handles_missing_agent(self):
        """With ``agent`` set to ``None`` the bar still shows icon and model."""
        cli_obj = _make_cli()

        text = cli_obj._build_status_bar_text(width=100)

        assert "⚕" in text
        assert "claude-sonnet-4-20250514" in text

    def test_compression_count_shown_in_wide_status_bar(self):
        """A non-zero compression count is rendered at width 120."""
        cli_obj = _attach_agent(
            _make_cli(),
            prompt_tokens=10_230,
            completion_tokens=2_220,
            total_tokens=12_450,
            api_calls=7,
            context_tokens=12_450,
            context_length=200_000,
            compressions=3,
        )

        text = cli_obj._build_status_bar_text(width=120)

        assert "🗜️ 3" in text

    def test_compression_count_hidden_when_zero(self):
        """A zero compression count leaves no compression marker in the text."""
        cli_obj = _attach_agent(
            _make_cli(),
            prompt_tokens=10_230,
            completion_tokens=2_220,
            total_tokens=12_450,
            api_calls=7,
            context_tokens=12_450,
            context_length=200_000,
            compressions=0,
        )

        text = cli_obj._build_status_bar_text(width=120)

        assert "🗜️" not in text

    def test_compression_count_shown_in_medium_status_bar(self):
        """The compression count is still rendered at width 60."""
        cli_obj = _attach_agent(
            _make_cli(),
            prompt_tokens=10_000,
            completion_tokens=2_400,
            total_tokens=12_400,
            api_calls=7,
            context_tokens=12_400,
            context_length=200_000,
            compressions=2,
        )

        text = cli_obj._build_status_bar_text(width=60)

        assert "🗜️ 2" in text

    def test_compression_count_hidden_in_narrow_status_bar(self):
        """At width 50 the compression marker is omitted."""
        cli_obj = _attach_agent(
            _make_cli(),
            prompt_tokens=10_000,
            completion_tokens=2_400,
            total_tokens=12_400,
            api_calls=7,
            context_tokens=12_400,
            context_length=200_000,
            compressions=5,
        )

        text = cli_obj._build_status_bar_text(width=50)

        assert "🗜️" not in text

    def test_compression_count_style_thresholds(self):
        """Counts 1-4 are dim, 5-9 warn, 10 and above bad."""
        cli_obj = _make_cli()

        assert cli_obj._compression_count_style(1) == "class:status-bar-dim"
        assert cli_obj._compression_count_style(4) == "class:status-bar-dim"
        assert cli_obj._compression_count_style(5) == "class:status-bar-warn"
        assert cli_obj._compression_count_style(9) == "class:status-bar-warn"
        assert cli_obj._compression_count_style(10) == "class:status-bar-bad"
        assert cli_obj._compression_count_style(25) == "class:status-bar-bad"

    def test_compression_count_in_wide_fragments(self):
        """The compression fragment is present and carries the warn style."""
        cli_obj = _attach_agent(
            _make_cli(),
            prompt_tokens=10_230,
            completion_tokens=2_220,
            total_tokens=12_450,
            api_calls=7,
            context_tokens=12_450,
            context_length=200_000,
            compressions=7,
        )
        cli_obj._status_bar_visible = True

        frags = cli_obj._get_status_bar_fragments()
        frag_texts = [text for _, text in frags]

        assert "🗜️ 7" in frag_texts
        frag_styles = {text: style for style, text in frags}
        assert frag_styles["🗜️ 7"] == "class:status-bar-warn"

    def test_compression_count_absent_from_fragments_when_zero(self):
        """No fragment mentions compression when the count is zero."""
        cli_obj = _attach_agent(
            _make_cli(),
            prompt_tokens=10_230,
            completion_tokens=2_220,
            total_tokens=12_450,
            api_calls=7,
            context_tokens=12_450,
            context_length=200_000,
            compressions=0,
        )
        cli_obj._status_bar_visible = True

        frags = cli_obj._get_status_bar_fragments()
        frag_texts = [text for _, text in frags]

        assert not any("🗜️" in t for t in frag_texts)

    def test_minimal_tui_chrome_threshold(self):
        """Minimal chrome is used at width 63 but not at width 64."""
        cli_obj = _make_cli()

        assert cli_obj._use_minimal_tui_chrome(width=63) is True
        assert cli_obj._use_minimal_tui_chrome(width=64) is False

    def test_bottom_input_rule_hides_on_narrow_terminals(self):
        """At width 50 only the top rule keeps a height of 1."""
        cli_obj = _make_cli()

        assert cli_obj._tui_input_rule_height("top", width=50) == 1
        assert cli_obj._tui_input_rule_height("bottom", width=50) == 0
        assert cli_obj._tui_input_rule_height("bottom", width=90) == 1

    def test_input_rules_hide_after_resize_until_next_input(self):
        """When _status_bar_suppressed_after_resize is set, both rules hide.

        See _recover_after_resize — column shrink reflows already-rendered
        bars into scrollback, so we hide the separators until the user
        submits the next input, at which point the flag is cleared.
        """
        cli_obj = _make_cli()
        cli_obj._status_bar_suppressed_after_resize = True

        assert cli_obj._tui_input_rule_height("top", width=90) == 0
        assert cli_obj._tui_input_rule_height("bottom", width=90) == 0

        cli_obj._status_bar_suppressed_after_resize = False
        assert cli_obj._tui_input_rule_height("top", width=90) == 1
        assert cli_obj._tui_input_rule_height("bottom", width=90) == 1

    def test_scrollback_box_width_caps_to_resize_safe_value(self):
        """Decorative scrollback boxes clamp to a width small enough that
        moderate terminal shrinks don't cause reflow into scrollback."""
        # Floor at 32 — narrow terminals still get something usable.
        assert HermesCLI._scrollback_box_width(20) == 32
        assert HermesCLI._scrollback_box_width(32) == 32
        # Cap at 56 — wide terminals don't get full-width boxes.
        assert HermesCLI._scrollback_box_width(80) == 56
        assert HermesCLI._scrollback_box_width(120) == 56
        assert HermesCLI._scrollback_box_width(200) == 56
        # Mid-range passes through up to the cap.
        assert HermesCLI._scrollback_box_width(48) == 48

    def test_agent_spacer_reclaimed_on_narrow_terminals(self):
        """The spacer row exists only while the agent runs on a wide terminal."""
        cli_obj = _make_cli()
        cli_obj._agent_running = True

        assert cli_obj._agent_spacer_height(width=50) == 0
        assert cli_obj._agent_spacer_height(width=90) == 1
        cli_obj._agent_running = False
        assert cli_obj._agent_spacer_height(width=90) == 0

    def test_spinner_line_hidden_on_narrow_terminals(self):
        """The spinner row needs both non-empty text and a wide terminal."""
        cli_obj = _make_cli()
        cli_obj._spinner_text = "thinking"

        assert cli_obj._spinner_widget_height(width=50) == 0
        assert cli_obj._spinner_widget_height(width=90) == 1
        cli_obj._spinner_text = ""
        assert cli_obj._spinner_widget_height(width=90) == 0

    def test_spinner_height_uses_display_width_for_wide_characters(self):
        """Forty double-width characters occupy two rows at width 64."""
        cli_obj = _make_cli()
        cli_obj._spinner_text = "你" * 40
        cli_obj._tool_start_time = 0

        assert cli_obj._spinner_widget_height(width=64) == 2

    def test_spinner_elapsed_format_is_fixed_width_to_reduce_wrap_jitter(self):
        """Elapsed text has the same length below and above sixty seconds."""
        cli_obj = _make_cli()
        cli_obj._spinner_text = "running tool"

        # <60s path
        cli_obj._tool_start_time = time.monotonic() - 9.2
        short_text = cli_obj._render_spinner_text()

        # >=60s path
        cli_obj._tool_start_time = time.monotonic() - 65.2
        long_text = cli_obj._render_spinner_text()

        short_elapsed = short_text.split("(", 1)[1].rstrip(")")
        long_elapsed = long_text.split("(", 1)[1].rstrip(")")

        assert len(short_elapsed) == len(long_elapsed)
        assert "m" in long_elapsed and "s" in long_elapsed

    def test_voice_status_bar_compacts_on_narrow_terminals(self):
        """At width 50 the idle voice bar collapses to icon plus shortcut."""
        cli_obj = _make_cli()
        cli_obj._voice_mode = True
        cli_obj._voice_recording = False
        cli_obj._voice_processing = False
        cli_obj._voice_tts = True
        cli_obj._voice_continuous = True

        fragments = cli_obj._get_voice_status_fragments(width=50)

        assert fragments == [("class:voice-status", " 🎤 Ctrl+B ")]

    def test_voice_recording_status_bar_compacts_on_narrow_terminals(self):
        """At width 50 the recording bar collapses to the REC marker."""
        cli_obj = _make_cli()
        cli_obj._voice_mode = True
        cli_obj._voice_recording = True
        cli_obj._voice_processing = False

        fragments = cli_obj._get_voice_status_fragments(width=50)

        assert fragments == [("class:voice-status-recording", " ● REC ")]

    # Round-13 Copilot review regressions on #19835. The label in voice
    # status bar / recording hint / placeholder must render the
    # configured ``voice.record_key`` — not hardcoded Ctrl+B. Pinning
    # the cache (``set_voice_record_key_cache``) keeps display in sync
    # with the prompt_toolkit binding without re-reading config on
    # every render.
    def test_voice_status_bar_renders_configured_ctrl_letter(self):
        """A cached ``ctrl+o`` key is shown as ``Ctrl+O`` in both layouts."""
        cli_obj = _make_cli()
        cli_obj._voice_mode = True
        cli_obj._voice_recording = False
        cli_obj._voice_processing = False
        cli_obj._voice_tts = False
        cli_obj._voice_continuous = False
        cli_obj.set_voice_record_key_cache("ctrl+o")

        wide = cli_obj._get_voice_status_fragments(width=120)
        assert any("Ctrl+O to record" in text for _cls, text in wide)

        compact = cli_obj._get_voice_status_fragments(width=50)
        assert compact == [("class:voice-status", " 🎤 Ctrl+O ")]

    def test_voice_recording_status_bar_renders_configured_named_key(self):
        """A cached ``ctrl+space`` key is shown as ``Ctrl+Space`` while recording."""
        cli_obj = _make_cli()
        cli_obj._voice_mode = True
        cli_obj._voice_recording = True
        cli_obj._voice_processing = False
        cli_obj.set_voice_record_key_cache("ctrl+space")

        fragments = cli_obj._get_voice_status_fragments(width=120)

        assert fragments == [("class:voice-status-recording", " ● REC Ctrl+Space to stop ")]

    def test_voice_status_bar_falls_back_to_ctrl_b_without_cache(self):
        """Without a cached record key the bar shows the ``Ctrl+B`` default."""
        cli_obj = _make_cli()
        cli_obj._voice_mode = True
        cli_obj._voice_recording = False
        cli_obj._voice_processing = False
        cli_obj._voice_tts = False
        cli_obj._voice_continuous = False
        # No cache set — mirrors pre-startup state; fall back to
        # documented Ctrl+B default (Copilot round-13 review).

        compact = cli_obj._get_voice_status_fragments(width=50)

        assert compact == [("class:voice-status", " 🎤 Ctrl+B ")]

    def test_voice_status_bar_renders_malformed_config_as_default(self):
        """A non-string cached key (``True``) renders as the ``Ctrl+B`` default."""
        cli_obj = _make_cli()
        cli_obj._voice_mode = True
        cli_obj._voice_recording = False
        cli_obj._voice_processing = False
        cli_obj._voice_tts = False
        cli_obj._voice_continuous = False
        # Non-string / typoed configs fall through the formatter to the
        # documented default so the status bar never advertises an
        # invalid shortcut.
        cli_obj.set_voice_record_key_cache(True)

        compact = cli_obj._get_voice_status_fragments(width=50)

        assert compact == [("class:voice-status", " 🎤 Ctrl+B ")]
|
||
|
||
|
||
class TestCLIUsageReport:
    """Checks on the text that ``HermesCLI._show_usage`` prints to stdout."""

    @staticmethod
    def _usage_output(cli_obj, capsys):
        """Run ``_show_usage`` with ``verbose`` off and return captured stdout."""
        cli_obj.verbose = False
        cli_obj._show_usage()
        return capsys.readouterr().out

    def test_show_usage_includes_estimated_cost(self, capsys):
        """A priced model yields a report with cost lines and a dollar amount."""
        session = _attach_agent(
            _make_cli(),
            prompt_tokens=10_230,
            completion_tokens=2_220,
            total_tokens=12_450,
            api_calls=7,
            context_tokens=12_450,
            context_length=200_000,
            compressions=1,
        )

        report = self._usage_output(session, capsys)

        expected_snippets = (
            "Model:",
            "Cost status:",
            "Cost source:",
            "Total cost:",
            "$",
            "0.064",
            "Session duration:",
            "Compressions:",
        )
        for snippet in expected_snippets:
            assert snippet in report

    def test_show_usage_marks_unknown_pricing(self, capsys):
        """A model without pricing data is reported as n/a with an explanation."""
        session = _attach_agent(
            _make_cli(model="local/my-custom-model"),
            prompt_tokens=1_000,
            completion_tokens=500,
            total_tokens=1_500,
            api_calls=1,
            context_tokens=1_000,
            context_length=32_000,
        )

        report = self._usage_output(session, capsys)

        for snippet in (
            "Total cost:",
            "n/a",
            "Pricing unknown for local/my-custom-model",
        ):
            assert snippet in report

    def test_zero_priced_provider_models_stay_unknown(self, capsys):
        """``glm-5`` is reported with unknown pricing rather than a cost figure."""
        session = _attach_agent(
            _make_cli(model="glm-5"),
            prompt_tokens=1_000,
            completion_tokens=500,
            total_tokens=1_500,
            api_calls=1,
            context_tokens=1_000,
            context_length=32_000,
        )

        report = self._usage_output(session, capsys)

        for snippet in ("Total cost:", "n/a", "Pricing unknown for glm-5"):
            assert snippet in report
|
||
|
||
|
||
class TestStatusBarWidthSource:
    """Ensure status bar fragments don't overflow the terminal width."""

    def _make_wide_cli(self):
        """Return a CLI with a fake agent at 50% context and a visible bar."""
        cli_obj = _attach_agent(
            _make_cli(),
            prompt_tokens=100_000,
            completion_tokens=5_000,
            total_tokens=105_000,
            api_calls=20,
            context_tokens=100_000,
            context_length=200_000,
        )
        cli_obj._status_bar_visible = True
        return cli_obj

    def test_fragments_fit_within_announced_width(self):
        """Total fragment text length must not exceed the width used to build them."""
        cli_obj = self._make_wide_cli()

        for width in (40, 52, 76, 80, 120, 200):
            mock_app = MagicMock()
            mock_app.output.get_size.return_value = MagicMock(columns=width)

            with patch("prompt_toolkit.application.get_app", return_value=mock_app):
                frags = cli_obj._get_status_bar_fragments()

            total_text = "".join(text for _, text in frags)
            display_width = cli_obj._status_bar_display_width(total_text)
            assert display_width <= width + 4, (  # +4 for minor padding chars
                f"At width={width}, fragment total {display_width} cells overflows "
                f"({total_text!r})"
            )

    def test_fragments_use_pt_width_over_shutil(self):
        """When prompt_toolkit reports a width, shutil.get_terminal_size must not be used."""
        cli_obj = self._make_wide_cli()

        mock_app = MagicMock()
        mock_app.output.get_size.return_value = MagicMock(columns=120)

        with (
            patch("prompt_toolkit.application.get_app", return_value=mock_app),
            patch("shutil.get_terminal_size") as mock_shutil,
        ):
            cli_obj._get_status_bar_fragments()

        mock_shutil.assert_not_called()

    def test_fragments_fall_back_to_shutil_when_no_app(self):
        """Outside a TUI context (no running app), shutil must be used as fallback."""
        cli_obj = self._make_wide_cli()

        with (
            patch("prompt_toolkit.application.get_app", side_effect=Exception("no app")),
            patch("shutil.get_terminal_size", return_value=MagicMock(columns=100)) as mock_shutil,
        ):
            frags = cli_obj._get_status_bar_fragments()

        mock_shutil.assert_called()
        assert len(frags) > 0

    def test_build_status_bar_text_uses_pt_width(self):
        """_build_status_bar_text() must also prefer prompt_toolkit width."""
        cli_obj = self._make_wide_cli()

        mock_app = MagicMock()
        mock_app.output.get_size.return_value = MagicMock(columns=80)

        with (
            patch("prompt_toolkit.application.get_app", return_value=mock_app),
            patch("shutil.get_terminal_size") as mock_shutil,
        ):
            text = cli_obj._build_status_bar_text()  # no explicit width

        mock_shutil.assert_not_called()
        assert isinstance(text, str)
        assert len(text) > 0

    def test_explicit_width_skips_pt_lookup(self):
        """An explicit width= argument must bypass both PT and shutil lookups."""
        cli_obj = self._make_wide_cli()

        with (
            patch("prompt_toolkit.application.get_app") as mock_get_app,
            patch("shutil.get_terminal_size") as mock_shutil,
        ):
            text = cli_obj._build_status_bar_text(width=100)

        mock_get_app.assert_not_called()
        mock_shutil.assert_not_called()
        assert len(text) > 0
|