hermes-agent/tests/honcho_plugin/test_async_memory.py
Eri Barrett ba9e3a491b
feat(memory): Honcho OAuth connect — desktop and CLI flows + token refresh (#44335)
* feat(memory): OAuth token storage and refresh for the Honcho provider

* feat(memory): refresh the Honcho OAuth token in the client and session

* feat(memory): zero-CLI loopback OAuth authorization flow

* feat(memory): generic memory-provider OAuth connect endpoints

* feat(desktop): memory-provider OAuth connect link

* feat(memory): CLI OAuth sign-in with source-tagged authorize links

* fix(memory): IP-literal loopback redirect and consent config_path on the authorize link

* fix(memory): profile-scope the memory-provider OAuth endpoints

* refactor(desktop): generic memory-provider OAuth client functions

* docs(memory): trim OAuth module docstrings to the invariants

* docs(memory): document OAuth connect as an optional auth method

* fix(memory): send home-relative display path to consent, not the absolute path

* perf(memory): cache OAuth token expiry in memory to skip the hot-path disk read

* fix(memory): log OAuth refresh failures at warning, not debug

* feat(memory): fall back to an OS-assigned loopback port when 8765 is taken

* test(memory): cover the desktop Connect launcher, status, and provider dispatch

* fix(desktop): keep the memory-provider dropdown one size regardless of connect state

* fix(desktop): move the memory connect link to the description line, leaving the dropdown untouched

* refactor(memory): move OAuth connect routes out of web_server into a memory-layer router

* refactor(desktop): import MemoryConnect directly, drop the single-export barrel

* fix(memory): launch CLI OAuth sign-in right after the auth choice, not after the wizard

* fix(desktop): auto-clear the OAuth error state instead of leaving it sticky

* test(honcho): isolate auth-method prompt from deployment-shape wizard tests

main's wizard suite scripts the cloud prompts without the OAuth auth-method step; auto-answer it in the shared helper so the answer lists stay shape-only.

* docs(honcho): document query-adaptive reasoning level (reasoningHeuristic)

README never mentioned reasoningHeuristic and listed reasoningLevelCap as an orphaned cap with the wrong default (— vs "high"). Add the query-adaptive scaling note + the reasoningHeuristic/reasoningLevelCap rows (grouped under Dialectic & Reasoning), matching the wording already on the hosted honcho.md page, and add a pointer from the memory-providers overview.

* fix(honcho): default the CLI peer prompt to the OAuth consent name

The CLI runs the grant with apply_config=False, so the peerName the user just entered at consent was dropped and the wizard's 'Your name' prompt fell back to $USER. Surface it as a transient OAuthCredential.consent_peer_name (set even when config isn't merged) and seed the prompt default from it.

* feat(honcho): split OAuth client_id by surface (cli=hermes-agent, desktop=hermes-desktop)

resolve_endpoints now picks the client_id from the initiating surface and
threads it through authorize -> token exchange -> persisted grant -> refresh,
so the CLI and desktop register as distinct OAuth clients. Surface-specific
env overrides (HONCHO_OAUTH_CLIENT_ID_CLI/_DESKTOP) win over the generic
HONCHO_OAUTH_CLIENT_ID, which still overrides every surface.

* feat(honcho): show OAuth vs API key in status; detect existing OAuth in setup

status now prints 'Auth: OAuth (clientId, token valid Xm/expired)' instead of
masking the OAuth access token as a generic API key; setup notes an existing
OAuth grant when re-run.

* docs(honcho): drop 'shared pool' wording from unified observation mode help

* fix(honcho): cross-process lock around OAuth refresh to prevent grant revocation

The in-process threading lock can't stop a sibling process (another profile or
the desktop app sharing honcho.json) from replaying the single-use refresh
token and tripping reuse-detection, which revokes the whole grant. Guard the
read-refresh-persist section with an OS file lock on <config>.lock so only one
process rotates at a time; the others re-read the freshly-persisted token.
Best-effort: platforms without flock degrade to in-process serialization.

* refactor(honcho): one OAuth client (hermes-agent) for all surfaces

Collapse the per-surface client_id split. CLI and desktop now use a single
client_id (hermes-agent); consent branding/UI still adapt via the source query
param. One grant identity means no clientId-vs-refresh-token desync that could
get the grant revoked. HONCHO_OAUTH_CLIENT_ID still overrides for self-hosting.

* fix(honcho): per-session resolves to session_id, never remapped by title

Reorder resolve_session_name so stable identifiers win over labels: gateway
per-chat key first, then the per-session session_id, then the cwd map / title.
A (possibly auto-generated) title can no longer remap a live per-session
conversation onto a second Honcho session mid-stream — fixes the desktop, which
is per-conversation via session_id. Consequence: a gateway's per-chat key now
also wins over a title (titles never remap a stable id).
2026-06-22 19:16:47 -05:00

476 lines
18 KiB
Python

"""Tests for the async-memory Honcho improvements.
Covers:
- write_frequency parsing (async / turn / session / int)
- resolve_session_name with session_title
- HonchoSessionManager.save() routing per write_frequency
- async writer thread lifecycle and retry
- flush_all() drains pending messages
- shutdown() joins the thread
"""
import json
import time
from unittest.mock import MagicMock, patch
from plugins.memory.honcho.client import HonchoClientConfig
from plugins.memory.honcho.session import (
HonchoSession,
HonchoSessionManager,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_session(**kwargs) -> HonchoSession:
return HonchoSession(
key=kwargs.get("key", "cli:test"),
user_peer_id=kwargs.get("user_peer_id", "eri"),
assistant_peer_id=kwargs.get("assistant_peer_id", "hermes"),
honcho_session_id=kwargs.get("honcho_session_id", "cli-test"),
messages=kwargs.get("messages", []),
)
def _make_manager(write_frequency="turn") -> HonchoSessionManager:
cfg = HonchoClientConfig(
write_frequency=write_frequency,
api_key="test-key",
enabled=True,
)
mgr = HonchoSessionManager(config=cfg)
mgr._honcho = MagicMock()
return mgr
# ---------------------------------------------------------------------------
# write_frequency parsing from config file
# ---------------------------------------------------------------------------
class TestWriteFrequencyParsing:
def test_string_async(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({"apiKey": "k", "writeFrequency": "async"}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.write_frequency == "async"
def test_string_turn(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({"apiKey": "k", "writeFrequency": "turn"}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.write_frequency == "turn"
def test_string_session(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({"apiKey": "k", "writeFrequency": "session"}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.write_frequency == "session"
def test_integer_frequency(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({"apiKey": "k", "writeFrequency": 5}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.write_frequency == 5
def test_integer_string_coerced(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({"apiKey": "k", "writeFrequency": "3"}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.write_frequency == 3
def test_host_block_overrides_root(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({
"apiKey": "k",
"writeFrequency": "turn",
"hosts": {"hermes": {"writeFrequency": "session"}},
}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.write_frequency == "session"
def test_defaults_to_async(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({"apiKey": "k"}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.write_frequency == "async"
# ---------------------------------------------------------------------------
# resolve_session_name with session_title
# ---------------------------------------------------------------------------
class TestResolveSessionNameTitle:
def test_manual_override_beats_title(self):
cfg = HonchoClientConfig(sessions={"/my/project": "manual-name"})
result = cfg.resolve_session_name("/my/project", session_title="the-title")
assert result == "manual-name"
def test_title_beats_dirname(self):
cfg = HonchoClientConfig()
result = cfg.resolve_session_name("/some/dir", session_title="my-project")
assert result == "my-project"
def test_title_with_peer_prefix(self):
cfg = HonchoClientConfig(peer_name="eri", session_peer_prefix=True)
result = cfg.resolve_session_name("/some/dir", session_title="aeris")
assert result == "eri-aeris"
def test_title_sanitized(self):
cfg = HonchoClientConfig()
result = cfg.resolve_session_name("/some/dir", session_title="my project/name!")
# trailing dashes stripped by .strip('-')
assert result == "my-project-name"
def test_title_all_invalid_chars_falls_back_to_dirname(self):
cfg = HonchoClientConfig()
result = cfg.resolve_session_name("/some/dir", session_title="!!! ###")
# sanitized to empty → falls back to dirname
assert result == "dir"
def test_none_title_falls_back_to_dirname(self):
cfg = HonchoClientConfig()
result = cfg.resolve_session_name("/some/dir", session_title=None)
assert result == "dir"
def test_empty_title_falls_back_to_dirname(self):
cfg = HonchoClientConfig()
result = cfg.resolve_session_name("/some/dir", session_title="")
assert result == "dir"
def test_per_session_uses_session_id(self):
cfg = HonchoClientConfig(session_strategy="per-session")
result = cfg.resolve_session_name("/some/dir", session_id="20260309_175514_9797dd")
assert result == "20260309_175514_9797dd"
def test_per_session_with_peer_prefix(self):
cfg = HonchoClientConfig(session_strategy="per-session", peer_name="eri", session_peer_prefix=True)
result = cfg.resolve_session_name("/some/dir", session_id="20260309_175514_9797dd")
assert result == "eri-20260309_175514_9797dd"
def test_per_session_no_id_falls_back_to_dirname(self):
cfg = HonchoClientConfig(session_strategy="per-session")
result = cfg.resolve_session_name("/some/dir", session_id=None)
assert result == "dir"
def test_per_session_id_beats_title(self):
# per-session: the run's session_id is authoritative; an (auto-)generated
# title must NOT remap a live conversation onto a second Honcho session.
cfg = HonchoClientConfig(session_strategy="per-session")
result = cfg.resolve_session_name("/some/dir", session_title="my-title", session_id="20260309_175514_9797dd")
assert result == "20260309_175514_9797dd"
def test_per_session_id_beats_manual_map(self):
# per-session: session_id also wins over a stale cwd map entry (e.g. the
# desktop launching from a mapped home dir).
cfg = HonchoClientConfig(session_strategy="per-session", sessions={"/some/dir": "pinned"})
result = cfg.resolve_session_name("/some/dir", session_id="20260309_175514_9797dd")
assert result == "20260309_175514_9797dd"
def test_title_still_applies_for_non_per_session(self):
# Outside per-session, /title still names the Honcho session.
cfg = HonchoClientConfig(session_strategy="per-directory")
result = cfg.resolve_session_name("/some/dir", session_title="my-title", session_id="20260309_175514_9797dd")
assert result == "my-title"
def test_gateway_key_beats_per_session_id(self):
# Gateways keep per-chat isolation even in per-session.
cfg = HonchoClientConfig(session_strategy="per-session")
result = cfg.resolve_session_name("/some/dir", gateway_session_key="agent:main:telegram:dm:42", session_id="20260309_175514_9797dd")
assert result == "agent-main-telegram-dm-42"
def test_global_strategy_returns_workspace(self):
cfg = HonchoClientConfig(session_strategy="global", workspace_id="my-workspace")
result = cfg.resolve_session_name("/some/dir")
assert result == "my-workspace"
# ---------------------------------------------------------------------------
# save() routing per write_frequency
# ---------------------------------------------------------------------------
class TestSaveRouting:
def _make_session_with_message(self, mgr=None):
sess = _make_session()
sess.add_message("user", "hello")
sess.add_message("assistant", "hi")
if mgr:
mgr._cache[sess.key] = sess
return sess
def test_turn_flushes_immediately(self):
mgr = _make_manager(write_frequency="turn")
sess = self._make_session_with_message(mgr)
with patch.object(mgr, "_flush_session") as mock_flush:
mgr.save(sess)
mock_flush.assert_called_once_with(sess)
def test_session_mode_does_not_flush(self):
mgr = _make_manager(write_frequency="session")
sess = self._make_session_with_message(mgr)
with patch.object(mgr, "_flush_session") as mock_flush:
mgr.save(sess)
mock_flush.assert_not_called()
def test_async_mode_enqueues(self):
mgr = _make_manager(write_frequency="async")
sess = self._make_session_with_message(mgr)
with patch.object(mgr, "_flush_session") as mock_flush:
mgr.save(sess)
# flush_session should NOT be called synchronously
mock_flush.assert_not_called()
assert not mgr._async_queue.empty()
def test_int_frequency_flushes_on_nth_turn(self):
mgr = _make_manager(write_frequency=3)
sess = self._make_session_with_message(mgr)
with patch.object(mgr, "_flush_session") as mock_flush:
mgr.save(sess) # turn 1
mgr.save(sess) # turn 2
assert mock_flush.call_count == 0
mgr.save(sess) # turn 3
assert mock_flush.call_count == 1
def test_int_frequency_skips_other_turns(self):
mgr = _make_manager(write_frequency=5)
sess = self._make_session_with_message(mgr)
with patch.object(mgr, "_flush_session") as mock_flush:
for _ in range(4):
mgr.save(sess)
assert mock_flush.call_count == 0
mgr.save(sess) # turn 5
assert mock_flush.call_count == 1
# ---------------------------------------------------------------------------
# flush_all()
# ---------------------------------------------------------------------------
class TestFlushAll:
def test_flushes_all_cached_sessions(self):
mgr = _make_manager(write_frequency="session")
s1 = _make_session(key="s1", honcho_session_id="s1")
s2 = _make_session(key="s2", honcho_session_id="s2")
s1.add_message("user", "a")
s2.add_message("user", "b")
mgr._cache = {"s1": s1, "s2": s2}
with patch.object(mgr, "_flush_session") as mock_flush:
mgr.flush_all()
assert mock_flush.call_count == 2
def test_flush_all_drains_async_queue(self):
mgr = _make_manager(write_frequency="async")
sess = _make_session()
sess.add_message("user", "pending")
with patch.object(mgr, "_flush_session") as mock_flush:
# Put the item AFTER the mock is installed so the background
# writer thread (if it dequeues before flush_all) still hits
# the mock rather than the real _flush_session.
mgr._async_queue.put(sess)
mgr.flush_all()
# Called at least once for the queued item
assert mock_flush.call_count >= 1
def test_flush_all_tolerates_errors(self):
mgr = _make_manager(write_frequency="session")
sess = _make_session()
mgr._cache = {"key": sess}
with patch.object(mgr, "_flush_session", side_effect=RuntimeError("oops")):
# Should not raise
mgr.flush_all()
# ---------------------------------------------------------------------------
# async writer thread lifecycle
# ---------------------------------------------------------------------------
class TestAsyncWriterThread:
def test_thread_started_on_async_mode(self):
mgr = _make_manager(write_frequency="async")
assert mgr._async_thread is not None
assert mgr._async_thread.is_alive()
mgr.shutdown()
def test_no_thread_for_turn_mode(self):
mgr = _make_manager(write_frequency="turn")
assert mgr._async_thread is None
assert mgr._async_queue is None
def test_shutdown_joins_thread(self):
mgr = _make_manager(write_frequency="async")
assert mgr._async_thread.is_alive()
mgr.shutdown()
assert not mgr._async_thread.is_alive()
def test_async_writer_calls_flush(self):
mgr = _make_manager(write_frequency="async")
sess = _make_session()
sess.add_message("user", "async msg")
flushed = []
def capture(s):
flushed.append(s)
return True
mgr._flush_session = capture
mgr._async_queue.put(sess)
# Give the daemon thread time to process
deadline = time.time() + 2.0
while not flushed and time.time() < deadline:
time.sleep(0.05)
mgr.shutdown()
assert len(flushed) == 1
assert flushed[0] is sess
def test_shutdown_sentinel_stops_loop(self):
mgr = _make_manager(write_frequency="async")
thread = mgr._async_thread
mgr.shutdown()
thread.join(timeout=3)
assert not thread.is_alive()
# ---------------------------------------------------------------------------
# async retry on failure
# ---------------------------------------------------------------------------
class TestAsyncWriterRetry:
def test_retries_once_on_failure(self):
mgr = _make_manager(write_frequency="async")
sess = _make_session()
sess.add_message("user", "msg")
call_count = [0]
def flaky_flush(s):
call_count[0] += 1
if call_count[0] == 1:
raise ConnectionError("network blip")
# second call succeeds silently
mgr._flush_session = flaky_flush
with patch("time.sleep"): # skip the 2s sleep in retry
mgr._async_queue.put(sess)
deadline = time.time() + 3.0
while call_count[0] < 2 and time.time() < deadline:
time.sleep(0.05)
mgr.shutdown()
assert call_count[0] == 2
def test_drops_after_two_failures(self):
mgr = _make_manager(write_frequency="async")
sess = _make_session()
sess.add_message("user", "msg")
call_count = [0]
def always_fail(s):
call_count[0] += 1
raise RuntimeError("always broken")
mgr._flush_session = always_fail
with patch("time.sleep"):
mgr._async_queue.put(sess)
deadline = time.time() + 3.0
while call_count[0] < 2 and time.time() < deadline:
time.sleep(0.05)
mgr.shutdown()
# Should have tried exactly twice (initial + one retry) and not crashed
assert call_count[0] == 2
assert not mgr._async_thread.is_alive()
def test_retries_when_flush_reports_failure(self):
mgr = _make_manager(write_frequency="async")
sess = _make_session()
sess.add_message("user", "msg")
call_count = [0]
def fail_then_succeed(_session):
call_count[0] += 1
return call_count[0] > 1
mgr._flush_session = fail_then_succeed
with patch("time.sleep"):
mgr._async_queue.put(sess)
deadline = time.time() + 3.0
while call_count[0] < 2 and time.time() < deadline:
time.sleep(0.05)
mgr.shutdown()
assert call_count[0] == 2
class TestMemoryFileMigrationTargets:
def test_soul_upload_targets_ai_peer(self, tmp_path):
mgr = _make_manager(write_frequency="turn")
session = _make_session(
key="cli:test",
user_peer_id="custom-user",
assistant_peer_id="custom-ai",
honcho_session_id="cli-test",
)
mgr._cache[session.key] = session
user_peer = MagicMock(name="user-peer")
ai_peer = MagicMock(name="ai-peer")
mgr._peers_cache[session.user_peer_id] = user_peer
mgr._peers_cache[session.assistant_peer_id] = ai_peer
honcho_session = MagicMock()
mgr._sessions_cache[session.honcho_session_id] = honcho_session
(tmp_path / "MEMORY.md").write_text("memory facts", encoding="utf-8")
(tmp_path / "USER.md").write_text("user profile", encoding="utf-8")
(tmp_path / "SOUL.md").write_text("ai identity", encoding="utf-8")
uploaded = mgr.migrate_memory_files(session.key, str(tmp_path))
assert uploaded is True
assert honcho_session.upload_file.call_count == 3
peer_by_upload_name = {}
for call_args in honcho_session.upload_file.call_args_list:
payload = call_args.kwargs["file"]
peer_by_upload_name[payload[0]] = call_args.kwargs["peer"]
assert peer_by_upload_name["consolidated_memory.md"] is user_peer
assert peer_by_upload_name["user_profile.md"] is user_peer
assert peer_by_upload_name["agent_soul.md"] is ai_peer
# ---------------------------------------------------------------------------
# HonchoClientConfig dataclass defaults for new fields
# ---------------------------------------------------------------------------
class TestNewConfigFieldDefaults:
def test_write_frequency_default(self):
cfg = HonchoClientConfig()
assert cfg.write_frequency == "async"
def test_write_frequency_set(self):
cfg = HonchoClientConfig(write_frequency="turn")
assert cfg.write_frequency == "turn"
class TestPrefetchCacheAccessors:
def test_set_and_pop_context_result(self):
mgr = _make_manager(write_frequency="turn")
payload = {"representation": "Known user", "card": "prefers concise replies"}
mgr.set_context_result("cli:test", payload)
assert mgr.pop_context_result("cli:test") == payload
assert mgr.pop_context_result("cli:test") == {}