From e76e7b50730067dee8f36bcac0c267f4fde886ed Mon Sep 17 00:00:00 2001 From: Wolfram Ravenwolf Date: Tue, 14 Apr 2026 00:37:39 +0200 Subject: [PATCH] feat(hooks): session:compress event_callback for MemPalace sync --- agent/agent_init.py | 4 +++- agent/conversation_compression.py | 14 ++++++++++++++ gateway/run.py | 17 ++++++++++++++--- run_agent.py | 5 ++++- 4 files changed, 35 insertions(+), 5 deletions(-) diff --git a/agent/agent_init.py b/agent/agent_init.py index 2c2ded871e5..3687fe55970 100644 --- a/agent/agent_init.py +++ b/agent/agent_init.py @@ -27,7 +27,7 @@ import threading import time import uuid from datetime import datetime -from typing import Any, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional from urllib.parse import urlparse, parse_qs, urlunparse from agent.context_compressor import ContextCompressor @@ -195,6 +195,7 @@ def init_agent( status_callback: callable = None, notice_callback: callable = None, notice_clear_callback: callable = None, + event_callback: Optional[Callable[[str, dict], None]] = None, max_tokens: int = None, reasoning_config: Dict[str, Any] = None, service_tier: str = None, @@ -426,6 +427,7 @@ def init_agent( agent.status_callback = status_callback agent.notice_callback = notice_callback agent.notice_clear_callback = notice_clear_callback + agent.event_callback = event_callback agent.tool_gen_callback = tool_gen_callback diff --git a/agent/conversation_compression.py b/agent/conversation_compression.py index d5469a1b344..318e67d0faf 100644 --- a/agent/conversation_compression.py +++ b/agent/conversation_compression.py @@ -603,6 +603,20 @@ def compress_context( force=True, ) + # Emit session:compress event so hooks (e.g. MemPalace sync) can ingest + # the completed old session before its details are lost. + _old_sid_for_event = locals().get("old_session_id") + if getattr(agent, "event_callback", None): + try: + agent.event_callback("session:compress", { + "platform": agent.platform or "", + "session_id": agent.session_id, + "old_session_id": _old_sid_for_event or "", + "compression_count": agent.context_compressor.compression_count, + }) + except Exception as e: + logger.debug("event_callback error on session:compress: %s", e) + # Keep the post-compression rough estimate for diagnostics, but do not # treat it as provider-reported prompt usage. Schema-heavy rough estimates # can remain above threshold even after the next real API request fits. diff --git a/gateway/run.py b/gateway/run.py index b688f3a3613..0f25f215a42 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -14368,6 +14368,17 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew log_message="agent:step hook scheduling error", ) + # Bridge sync event_callback → async hooks.emit for lifecycle events + # (e.g. session:compress fires after context compression splits a session) + def _event_callback_sync(event_type: str, context: dict) -> None: + try: + asyncio.run_coroutine_threadsafe( + _hooks_ref.emit(event_type, context), + _loop_for_step, + ) + except Exception as _e: + logger.debug("event_callback hook error: %s", _e) + # Bridge sync status_callback → async adapter.send for context pressure _status_adapter = self.adapters.get(source.platform) _status_chat_id = source.chat_id @@ -14702,15 +14713,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew agent.stream_delta_callback = _stream_delta_cb agent.interim_assistant_callback = _interim_assistant_cb if _want_interim_messages else None agent.status_callback = _status_callback_sync - # Credits / out-of-band notices (usage bands, depletion, restored). # Messaging has no persistent status bar, so each notice is a # standalone push: render to a single plaintext line and deliver via # the shared _deliver_platform_notice rail (honors private/public + # thread metadata). Fires from the agent's sync worker thread, so we - # hop onto the gateway loop with safe_schedule_threadsafe — same + # hop onto the gateway loop with safe_schedule_threadsafe - same # pattern as _status_callback_sync. The fired-once latch lives on the - # cached agent and persists across turns, so a band crosses → one + # cached agent and persists across turns, so a band crosses -> one # push (no per-turn re-nag). Recovery ("✓ Credit access restored") # rides the same show path (it's emitted as a success notice, not a # clear). The clear callback is a no-op: a sent platform message @@ -14734,6 +14744,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew agent.notice_callback = _notice_callback_sync agent.notice_clear_callback = None + agent.event_callback = _event_callback_sync agent.reasoning_config = reasoning_config agent.service_tier = self._service_tier agent.request_overrides = turn_route.get("request_overrides") or {} diff --git a/run_agent.py b/run_agent.py index 94d3be3e674..7c38b3ef0a9 100644 --- a/run_agent.py +++ b/run_agent.py @@ -45,7 +45,8 @@ import tempfile import time import threading import uuid -from typing import List, Dict, Any, Optional +from typing import List, Dict, Any, Optional, Callable +from urllib.parse import urlparse, parse_qs, urlunparse # NOTE: `from openai import OpenAI` is deliberately NOT at module top — the # SDK pulls ~240 ms of imports. We expose `OpenAI` as a thin proxy object # that imports the SDK on first call/isinstance check. This preserves: @@ -384,6 +385,7 @@ class AIAgent: status_callback: callable = None, notice_callback: callable = None, notice_clear_callback: callable = None, + event_callback: Optional[Callable[[str, dict], None]] = None, max_tokens: int = None, reasoning_config: Dict[str, Any] = None, service_tier: str = None, @@ -458,6 +460,7 @@ class AIAgent: status_callback=status_callback, notice_callback=notice_callback, notice_clear_callback=notice_clear_callback, + event_callback=event_callback, max_tokens=max_tokens, reasoning_config=reasoning_config, service_tier=service_tier,