mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-17 09:41:58 +00:00
feat(gateway): inject stable human-readable message timestamps
Consolidates these related Amy fork patches: - 429830f39 feat(gateway): inject message timestamps into user messages for LLM context - 3c3d6fac0 fix: handle both ISO string and epoch float timestamps in history replay - 2874f7725 feat: human-friendly timestamp format with weekday and timezone name - 3735f4c8b fix: render gateway message timestamps once
This commit is contained in:
parent
b7f0c9cd52
commit
bd7fc8fdcd
13 changed files with 448 additions and 28 deletions
|
|
@ -599,6 +599,7 @@ def init_agent(
|
|||
# (e.g. CLI voice mode adds a temporary prefix for the live call only).
|
||||
agent._persist_user_message_idx = None
|
||||
agent._persist_user_message_override = None
|
||||
agent._persist_user_message_timestamp = None
|
||||
|
||||
# Cache anthropic image-to-text fallbacks per image payload/URL so a
|
||||
# single tool loop does not repeatedly re-run auxiliary vision on the
|
||||
|
|
|
|||
|
|
@ -474,6 +474,7 @@ def run_conversation(
|
|||
task_id: str = None,
|
||||
stream_callback: Optional[callable] = None,
|
||||
persist_user_message: Optional[str] = None,
|
||||
persist_user_timestamp: Optional[float] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Run a complete conversation with tool calling until completion.
|
||||
|
|
@ -489,6 +490,8 @@ def run_conversation(
|
|||
persist_user_message: Optional clean user message to store in
|
||||
transcripts/history when user_message contains API-only
|
||||
synthetic prefixes.
|
||||
persist_user_timestamp: Optional platform event timestamp to store
|
||||
as metadata on that persisted user message.
|
||||
or queuing follow-up prefetch work.
|
||||
|
||||
Returns:
|
||||
|
|
@ -510,6 +513,7 @@ def run_conversation(
|
|||
task_id,
|
||||
stream_callback,
|
||||
persist_user_message,
|
||||
persist_user_timestamp,
|
||||
restore_or_build_system_prompt=_restore_or_build_system_prompt,
|
||||
install_safe_stdio=_install_safe_stdio,
|
||||
sanitize_surrogates=_sanitize_surrogates,
|
||||
|
|
|
|||
|
|
@ -69,6 +69,7 @@ def build_turn_context(
|
|||
task_id: Optional[str],
|
||||
stream_callback,
|
||||
persist_user_message: Optional[str],
|
||||
persist_user_timestamp: Optional[float] = None,
|
||||
*,
|
||||
restore_or_build_system_prompt,
|
||||
install_safe_stdio,
|
||||
|
|
@ -121,6 +122,7 @@ def build_turn_context(
|
|||
agent._stream_callback = stream_callback
|
||||
agent._persist_user_message_idx = None
|
||||
agent._persist_user_message_override = persist_user_message
|
||||
agent._persist_user_message_timestamp = persist_user_timestamp
|
||||
# Generate unique task_id if not provided to isolate VMs between tasks.
|
||||
effective_task_id = task_id or str(uuid.uuid4())
|
||||
agent._current_task_id = effective_task_id
|
||||
|
|
|
|||
166
gateway/message_timestamps.py
Normal file
166
gateway/message_timestamps.py
Normal file
|
|
@ -0,0 +1,166 @@
|
|||
"""Helpers for rendering gateway message timestamps exactly once.
|
||||
|
||||
Gateway messages need timestamps in the LLM context for temporal awareness, but
|
||||
persisted message content should stay clean so replay does not accumulate
|
||||
``[timestamp] [timestamp] ...`` prefixes across turns.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from datetime import datetime
|
||||
from typing import Any, Optional, Tuple
|
||||
|
||||
|
||||
# Current gateway format: [Tue 2026-04-28 13:40:53 CEST]
|
||||
_HUMAN_TIMESTAMP_RE = re.compile(
|
||||
r"^\[(?P<dow>[A-Z][a-z]{2}) "
|
||||
r"(?P<date>\d{4}-\d{2}-\d{2}) "
|
||||
r"(?P<time>\d{2}:\d{2}:\d{2})"
|
||||
r"(?: (?P<tz>[A-Za-z0-9_+\-/:]+))?\]\s*"
|
||||
)
|
||||
|
||||
# Older gateway format: [2026-04-13T17:02:06+0200] or [+02:00]
|
||||
_ISO_TIMESTAMP_RE = re.compile(
|
||||
r"^\[(?P<iso>\d{4}-\d{2}-\d{2}T[^\]]+)\]\s*"
|
||||
)
|
||||
|
||||
|
||||
def coerce_message_timestamp(ts_value: Any, tz=None) -> Optional[float]:
|
||||
"""Coerce a timestamp-like value to Unix epoch seconds.
|
||||
|
||||
Accepts Unix epoch numbers, datetime objects, ISO strings, and the gateway's
|
||||
bracketed human-readable timestamp format. Returns ``None`` when the value
|
||||
cannot be interpreted.
|
||||
"""
|
||||
if ts_value is None:
|
||||
return None
|
||||
|
||||
if isinstance(ts_value, (int, float)):
|
||||
return float(ts_value)
|
||||
|
||||
if hasattr(ts_value, "timestamp"):
|
||||
try:
|
||||
return float(ts_value.timestamp())
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
if isinstance(ts_value, str):
|
||||
text = ts_value.strip()
|
||||
if not text:
|
||||
return None
|
||||
parsed = _parse_timestamp_prefix(text, tz=tz)
|
||||
if parsed is not None:
|
||||
return parsed
|
||||
try:
|
||||
return float(text)
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
try:
|
||||
dt = datetime.fromisoformat(text)
|
||||
except (TypeError, ValueError):
|
||||
try:
|
||||
dt = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S%z")
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
if dt.tzinfo is None:
|
||||
if tz is not None:
|
||||
dt = dt.replace(tzinfo=tz)
|
||||
else:
|
||||
dt = dt.astimezone()
|
||||
return float(dt.timestamp())
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def format_message_timestamp(ts_value: Any, tz=None) -> str:
|
||||
"""Format a timestamp value as ``[Tue 2026-04-28 13:40:53 CEST]``."""
|
||||
epoch = coerce_message_timestamp(ts_value, tz=tz)
|
||||
if epoch is None:
|
||||
return ""
|
||||
if tz is not None:
|
||||
dt = datetime.fromtimestamp(epoch, tz=tz)
|
||||
else:
|
||||
dt = datetime.fromtimestamp(epoch).astimezone()
|
||||
return "[" + dt.strftime("%a %Y-%m-%d %H:%M:%S %Z") + "]"
|
||||
|
||||
|
||||
def strip_leading_message_timestamps(content: str, tz=None) -> Tuple[str, Optional[float]]:
|
||||
"""Strip one or more leading gateway timestamp prefixes from ``content``.
|
||||
|
||||
Returns ``(clean_content, embedded_epoch)``. If multiple timestamp prefixes
|
||||
are present, the timestamp closest to the actual message text wins. That
|
||||
preserves the original platform-send time for legacy contaminated rows like
|
||||
``[processing time] [platform time] [sender] message``.
|
||||
"""
|
||||
if not isinstance(content, str) or not content:
|
||||
return content, None
|
||||
|
||||
text = content
|
||||
embedded_epoch: Optional[float] = None
|
||||
|
||||
while True:
|
||||
match = _HUMAN_TIMESTAMP_RE.match(text) or _ISO_TIMESTAMP_RE.match(text)
|
||||
if not match:
|
||||
break
|
||||
parsed = _parse_timestamp_match(match, tz=tz)
|
||||
if parsed is not None:
|
||||
embedded_epoch = parsed
|
||||
text = text[match.end():]
|
||||
|
||||
return text, embedded_epoch
|
||||
|
||||
|
||||
def render_user_content_with_timestamp(content: str, ts_value: Any = None, tz=None) -> str:
|
||||
"""Render a user message for LLM context with exactly one timestamp prefix.
|
||||
|
||||
Existing leading timestamp prefixes are removed first. If such a prefix was
|
||||
present, its parsed time wins over ``ts_value``; otherwise ``ts_value`` is
|
||||
formatted and prepended. If no timestamp is available, the cleaned content is
|
||||
returned unchanged.
|
||||
"""
|
||||
clean_content, embedded_epoch = strip_leading_message_timestamps(content, tz=tz)
|
||||
effective_ts = embedded_epoch if embedded_epoch is not None else ts_value
|
||||
prefix = format_message_timestamp(effective_ts, tz=tz)
|
||||
if not prefix:
|
||||
return clean_content
|
||||
if clean_content:
|
||||
return f"{prefix} {clean_content}"
|
||||
return prefix
|
||||
|
||||
|
||||
def _parse_timestamp_prefix(text: str, tz=None) -> Optional[float]:
|
||||
match = _HUMAN_TIMESTAMP_RE.match(text) or _ISO_TIMESTAMP_RE.match(text)
|
||||
if not match:
|
||||
return None
|
||||
return _parse_timestamp_match(match, tz=tz)
|
||||
|
||||
|
||||
def _parse_timestamp_match(match: re.Match, tz=None) -> Optional[float]:
|
||||
if "iso" in match.groupdict() and match.group("iso"):
|
||||
iso_text = match.group("iso")
|
||||
try:
|
||||
dt = datetime.fromisoformat(iso_text)
|
||||
except ValueError:
|
||||
try:
|
||||
dt = datetime.strptime(iso_text, "%Y-%m-%dT%H:%M:%S%z")
|
||||
except ValueError:
|
||||
return None
|
||||
if dt.tzinfo is None:
|
||||
if tz is not None:
|
||||
dt = dt.replace(tzinfo=tz)
|
||||
else:
|
||||
dt = dt.astimezone()
|
||||
return float(dt.timestamp())
|
||||
|
||||
date_part = match.group("date")
|
||||
time_part = match.group("time")
|
||||
try:
|
||||
dt = datetime.strptime(f"{date_part} {time_part}", "%Y-%m-%d %H:%M:%S")
|
||||
except ValueError:
|
||||
return None
|
||||
if tz is not None:
|
||||
dt = dt.replace(tzinfo=tz)
|
||||
else:
|
||||
dt = dt.astimezone()
|
||||
return float(dt.timestamp())
|
||||
|
|
@ -706,6 +706,12 @@ def _build_gateway_agent_history(
|
|||
the current message behind ``history_offset`` during persistence.
|
||||
"""
|
||||
|
||||
from hermes_time import get_timezone as _get_msg_tz
|
||||
from gateway.message_timestamps import (
|
||||
render_user_content_with_timestamp as _render_msg_ts,
|
||||
)
|
||||
|
||||
_msg_tz = _get_msg_tz()
|
||||
agent_history: List[Dict[str, Any]] = []
|
||||
observed_group_context: List[str] = []
|
||||
separate_observed_context = _uses_telegram_observed_group_context(channel_prompt)
|
||||
|
|
@ -725,6 +731,8 @@ def _build_gateway_agent_history(
|
|||
continue
|
||||
|
||||
content = msg.get("content")
|
||||
if role == "user" and isinstance(content, str):
|
||||
content = _render_msg_ts(content, msg.get("timestamp"), tz=_msg_tz)
|
||||
if separate_observed_context and msg.get("observed") and role == "user" and content:
|
||||
observed_group_context.append(str(content).strip())
|
||||
continue
|
||||
|
|
@ -8378,6 +8386,8 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
|
||||
# Read privacy.redact_pii from config (re-read per message)
|
||||
_redact_pii = False
|
||||
persist_user_message = None
|
||||
persist_user_timestamp = None
|
||||
try:
|
||||
_pcfg = _load_gateway_config()
|
||||
_redact_pii = bool((_pcfg.get("privacy") or {}).get("redact_pii", False))
|
||||
|
|
@ -8902,6 +8912,34 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
if message_text is None:
|
||||
return
|
||||
|
||||
# Inject message timestamp so the LLM sees when this message was sent.
|
||||
# Keep the persisted transcript clean: timestamps are stored as
|
||||
# metadata and rendered into context exactly once on replay.
|
||||
try:
|
||||
from hermes_time import get_timezone as _get_evt_tz
|
||||
from gateway.message_timestamps import (
|
||||
coerce_message_timestamp as _coerce_msg_ts,
|
||||
render_user_content_with_timestamp as _render_msg_ts,
|
||||
strip_leading_message_timestamps as _strip_msg_ts,
|
||||
)
|
||||
_evt_tz = _get_evt_tz()
|
||||
_evt_ts = getattr(event, "timestamp", None)
|
||||
if message_text and isinstance(message_text, str):
|
||||
_clean_message_text, _embedded_ts = _strip_msg_ts(
|
||||
message_text, tz=_evt_tz)
|
||||
persist_user_message = _clean_message_text
|
||||
_event_epoch = _coerce_msg_ts(_evt_ts, tz=_evt_tz)
|
||||
persist_user_timestamp = (
|
||||
_event_epoch if _event_epoch is not None else _embedded_ts
|
||||
)
|
||||
message_text = _render_msg_ts(
|
||||
_clean_message_text,
|
||||
persist_user_timestamp,
|
||||
tz=_evt_tz,
|
||||
)
|
||||
except Exception as _ts_err:
|
||||
logger.debug("Message timestamp injection failed (non-fatal): %s", _ts_err)
|
||||
|
||||
# Bind this gateway run generation to the adapter's active-session
|
||||
# event so deferred post-delivery callbacks can be released by the
|
||||
# same run that registered them.
|
||||
|
|
@ -8935,6 +8973,8 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
run_generation=run_generation,
|
||||
event_message_id=self._reply_anchor_for_event(event),
|
||||
channel_prompt=event.channel_prompt,
|
||||
persist_user_message=persist_user_message,
|
||||
persist_user_timestamp=persist_user_timestamp,
|
||||
)
|
||||
|
||||
# Stop persistent typing indicator now that the agent is done
|
||||
|
|
@ -9226,7 +9266,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
"Your next message will start a fresh session."
|
||||
)
|
||||
|
||||
ts = datetime.now().isoformat()
|
||||
ts = time.time() # Unix epoch float — consistent with DB storage
|
||||
|
||||
# If this is a fresh session (no history), write the full tool
|
||||
# definitions as the first entry so the transcript is self-describing
|
||||
|
|
@ -9262,7 +9302,19 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
# message so the next message can load a transcript that
|
||||
# reflects what was said. Skip the assistant error text since
|
||||
# it's a gateway-generated hint, not model output. (#7100)
|
||||
_user_entry = {"role": "user", "content": message_text, "timestamp": ts}
|
||||
_user_entry = {
|
||||
"role": "user",
|
||||
"content": (
|
||||
persist_user_message
|
||||
if persist_user_message is not None
|
||||
else message_text
|
||||
),
|
||||
"timestamp": (
|
||||
persist_user_timestamp
|
||||
if persist_user_timestamp is not None
|
||||
else ts
|
||||
),
|
||||
}
|
||||
if event.message_id:
|
||||
_user_entry["message_id"] = str(event.message_id)
|
||||
self.session_store.append_to_transcript(
|
||||
|
|
@ -9276,7 +9328,19 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
|
||||
# If no new messages found (edge case), fall back to simple user/assistant
|
||||
if not new_messages:
|
||||
_user_entry = {"role": "user", "content": message_text, "timestamp": ts}
|
||||
_user_entry = {
|
||||
"role": "user",
|
||||
"content": (
|
||||
persist_user_message
|
||||
if persist_user_message is not None
|
||||
else message_text
|
||||
),
|
||||
"timestamp": (
|
||||
persist_user_timestamp
|
||||
if persist_user_timestamp is not None
|
||||
else ts
|
||||
),
|
||||
}
|
||||
if event.message_id:
|
||||
_user_entry["message_id"] = str(event.message_id)
|
||||
self.session_store.append_to_transcript(
|
||||
|
|
@ -9401,13 +9465,26 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
_recent_transcript = []
|
||||
for _msg in reversed(_recent_transcript[-10:]):
|
||||
if _msg.get("role") == "user":
|
||||
_already_persisted = (_msg.get("content") == message_text)
|
||||
_expected_user_content = (
|
||||
persist_user_message
|
||||
if persist_user_message is not None
|
||||
else message_text
|
||||
)
|
||||
_already_persisted = (_msg.get("content") == _expected_user_content)
|
||||
break
|
||||
if not _already_persisted:
|
||||
_user_entry = {
|
||||
"role": "user",
|
||||
"content": message_text,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"content": (
|
||||
persist_user_message
|
||||
if persist_user_message is not None
|
||||
else message_text
|
||||
),
|
||||
"timestamp": (
|
||||
persist_user_timestamp
|
||||
if persist_user_timestamp is not None
|
||||
else time.time()
|
||||
),
|
||||
}
|
||||
if getattr(event, "message_id", None):
|
||||
_user_entry["message_id"] = str(event.message_id)
|
||||
|
|
@ -13602,6 +13679,8 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
_interrupt_depth: int = 0,
|
||||
event_message_id: Optional[str] = None,
|
||||
channel_prompt: Optional[str] = None,
|
||||
persist_user_message: Optional[str] = None,
|
||||
persist_user_timestamp: Optional[float] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Run the agent with the given message and context.
|
||||
|
|
@ -15028,7 +15107,8 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
# Keep real user text separate from API-only recovery guidance. If
|
||||
# an auto-continue note is prepended below, persist the original
|
||||
# message so stale guidance never replays as user-authored text.
|
||||
_persist_user_message_override: Optional[Any] = None
|
||||
_persist_user_message_override: Optional[Any] = persist_user_message
|
||||
_persist_user_timestamp_override: Optional[float] = persist_user_timestamp
|
||||
|
||||
# Prepend pending model switch note so the model knows about the switch
|
||||
_pending_notes = getattr(self, '_pending_model_notes', {})
|
||||
|
|
@ -15168,6 +15248,8 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
_conversation_kwargs["persist_user_message"] = _persist_user_message_override
|
||||
elif observed_group_context:
|
||||
_conversation_kwargs["persist_user_message"] = message
|
||||
if _persist_user_timestamp_override is not None:
|
||||
_conversation_kwargs["persist_user_timestamp"] = _persist_user_timestamp_override
|
||||
result = agent.run_conversation(_api_run_message, **_conversation_kwargs)
|
||||
finally:
|
||||
unregister_gateway_notify(_approval_session_key)
|
||||
|
|
|
|||
|
|
@ -1322,6 +1322,7 @@ class SessionStore:
|
|||
message.get("platform_message_id") or message.get("message_id")
|
||||
),
|
||||
observed=bool(message.get("observed")),
|
||||
timestamp=message.get("timestamp"),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug("Session DB operation failed: %s", e)
|
||||
|
|
|
|||
|
|
@ -2379,6 +2379,7 @@ class SessionDB:
|
|||
codex_message_items: Any = None,
|
||||
platform_message_id: str = None,
|
||||
observed: bool = False,
|
||||
timestamp: Any = None,
|
||||
) -> int:
|
||||
"""
|
||||
Append a message to a session. Returns the message row ID.
|
||||
|
|
@ -2410,6 +2411,16 @@ class SessionDB:
|
|||
# cannot bind list/dict parameters directly.
|
||||
stored_content = self._encode_content(content)
|
||||
|
||||
message_timestamp = time.time()
|
||||
if timestamp is not None:
|
||||
try:
|
||||
if hasattr(timestamp, "timestamp"):
|
||||
message_timestamp = float(timestamp.timestamp())
|
||||
else:
|
||||
message_timestamp = float(timestamp)
|
||||
except (TypeError, ValueError):
|
||||
logger.debug("Ignoring invalid explicit message timestamp: %r", timestamp)
|
||||
|
||||
# Pre-compute tool call count
|
||||
num_tool_calls = 0
|
||||
if tool_calls is not None:
|
||||
|
|
@ -2429,7 +2440,7 @@ class SessionDB:
|
|||
tool_call_id,
|
||||
tool_calls_json,
|
||||
tool_name,
|
||||
time.time(),
|
||||
message_timestamp,
|
||||
token_count,
|
||||
finish_reason,
|
||||
reasoning,
|
||||
|
|
@ -2482,6 +2493,16 @@ class SessionDB:
|
|||
for msg in messages:
|
||||
role = msg.get("role", "unknown")
|
||||
tool_calls = msg.get("tool_calls")
|
||||
message_timestamp = now_ts
|
||||
if msg.get("timestamp") is not None:
|
||||
try:
|
||||
ts_value = msg.get("timestamp")
|
||||
if hasattr(ts_value, "timestamp"):
|
||||
message_timestamp = float(ts_value.timestamp())
|
||||
else:
|
||||
message_timestamp = float(ts_value)
|
||||
except (TypeError, ValueError):
|
||||
logger.debug("Ignoring invalid explicit message timestamp: %r", msg.get("timestamp"))
|
||||
reasoning_details = msg.get("reasoning_details") if role == "assistant" else None
|
||||
codex_reasoning_items = (
|
||||
msg.get("codex_reasoning_items") if role == "assistant" else None
|
||||
|
|
@ -2519,7 +2540,7 @@ class SessionDB:
|
|||
msg.get("tool_call_id"),
|
||||
tool_calls_json,
|
||||
msg.get("tool_name"),
|
||||
now_ts,
|
||||
message_timestamp,
|
||||
msg.get("token_count"),
|
||||
msg.get("finish_reason"),
|
||||
msg.get("reasoning") if role == "assistant" else None,
|
||||
|
|
@ -2536,7 +2557,7 @@ class SessionDB:
|
|||
total_tool_calls += (
|
||||
len(tool_calls) if isinstance(tool_calls, list) else 1
|
||||
)
|
||||
now_ts += 1e-6
|
||||
now_ts = max(now_ts + 1e-6, message_timestamp + 1e-6)
|
||||
|
||||
conn.execute(
|
||||
"UPDATE sessions SET message_count = ?, tool_call_count = ? WHERE id = ?",
|
||||
|
|
@ -2867,9 +2888,9 @@ class SessionDB:
|
|||
rows = self._conn.execute(
|
||||
"SELECT role, content, tool_call_id, tool_calls, tool_name, "
|
||||
"finish_reason, reasoning, reasoning_content, reasoning_details, "
|
||||
"codex_reasoning_items, codex_message_items, platform_message_id, observed "
|
||||
"codex_reasoning_items, codex_message_items, platform_message_id, observed, timestamp "
|
||||
f"FROM messages WHERE session_id IN ({placeholders})"
|
||||
f"{active_clause} ORDER BY id",
|
||||
f"{active_clause} ORDER BY timestamp, id",
|
||||
tuple(session_ids),
|
||||
).fetchall()
|
||||
|
||||
|
|
@ -2879,6 +2900,8 @@ class SessionDB:
|
|||
if row["role"] in {"user", "assistant"} and isinstance(content, str):
|
||||
content = sanitize_context(content).strip()
|
||||
msg = {"role": row["role"], "content": content}
|
||||
if row["timestamp"]:
|
||||
msg["timestamp"] = row["timestamp"]
|
||||
if row["tool_call_id"]:
|
||||
msg["tool_call_id"] = row["tool_call_id"]
|
||||
if row["tool_name"]:
|
||||
|
|
|
|||
24
run_agent.py
24
run_agent.py
|
|
@ -1472,16 +1472,21 @@ class AIAgent:
|
|||
that synthetic text leak into persisted transcripts or resumed session
|
||||
history. When an override is configured for the active turn, mutate the
|
||||
in-memory messages list in place so both persistence and returned
|
||||
history stay clean.
|
||||
history stay clean. A paired timestamp override preserves the platform
|
||||
event time as message metadata, rather than embedding it in content.
|
||||
"""
|
||||
idx = getattr(self, "_persist_user_message_idx", None)
|
||||
override = getattr(self, "_persist_user_message_override", None)
|
||||
if override is None or idx is None:
|
||||
timestamp = getattr(self, "_persist_user_message_timestamp", None)
|
||||
if idx is None or (override is None and timestamp is None):
|
||||
return
|
||||
if 0 <= idx < len(messages):
|
||||
msg = messages[idx]
|
||||
if isinstance(msg, dict) and msg.get("role") == "user":
|
||||
msg["content"] = override
|
||||
if override is not None:
|
||||
msg["content"] = override
|
||||
if timestamp is not None:
|
||||
msg["timestamp"] = timestamp
|
||||
|
||||
def _persist_session(self, messages: List[Dict], conversation_history: List[Dict] = None):
|
||||
"""Save session state to both JSON log and SQLite on any exit path.
|
||||
|
|
@ -1639,6 +1644,7 @@ class AIAgent:
|
|||
reasoning_details=msg.get("reasoning_details") if role == "assistant" else None,
|
||||
codex_reasoning_items=msg.get("codex_reasoning_items") if role == "assistant" else None,
|
||||
codex_message_items=msg.get("codex_message_items") if role == "assistant" else None,
|
||||
timestamp=msg.get("timestamp"),
|
||||
)
|
||||
flushed_ids.add(msg_id)
|
||||
self._last_flushed_db_idx = len(messages)
|
||||
|
|
@ -5218,10 +5224,20 @@ class AIAgent:
|
|||
task_id: str = None,
|
||||
stream_callback: Optional[callable] = None,
|
||||
persist_user_message: Optional[str] = None,
|
||||
persist_user_timestamp: Optional[float] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Forwarder — see ``agent.conversation_loop.run_conversation``."""
|
||||
from agent.conversation_loop import run_conversation
|
||||
return run_conversation(self, user_message, system_message, conversation_history, task_id, stream_callback, persist_user_message)
|
||||
return run_conversation(
|
||||
self,
|
||||
user_message,
|
||||
system_message,
|
||||
conversation_history,
|
||||
task_id,
|
||||
stream_callback,
|
||||
persist_user_message,
|
||||
persist_user_timestamp,
|
||||
)
|
||||
|
||||
def chat(self, message: str, stream_callback: Optional[callable] = None) -> str:
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -211,7 +211,10 @@ class TestListAndCleanup:
|
|||
|
||||
db = manager._get_db()
|
||||
messages = db.get_messages_as_conversation(state.session_id)
|
||||
assert messages == [{"role": "user", "content": "original"}]
|
||||
assert len(messages) == 1
|
||||
assert messages[0]["role"] == "user"
|
||||
assert messages[0]["content"] == "original"
|
||||
assert isinstance(messages[0].get("timestamp"), (int, float))
|
||||
|
||||
def test_cleanup_clears_all(self, manager):
|
||||
s1 = manager.create_session()
|
||||
|
|
@ -501,6 +504,8 @@ class TestPersistence:
|
|||
|
||||
restored = manager.get_session(state.session_id)
|
||||
assert restored is not None
|
||||
msg = restored.history[0]
|
||||
assert isinstance(msg.pop("timestamp", None), (int, float))
|
||||
assert restored.history == [{
|
||||
"role": "assistant",
|
||||
"content": "hello",
|
||||
|
|
|
|||
|
|
@ -23,12 +23,20 @@ class _CapturingAgent:
|
|||
type(self).last_init = dict(kwargs)
|
||||
self.tools = []
|
||||
|
||||
def run_conversation(self, user_message, conversation_history=None, task_id=None, persist_user_message=None):
|
||||
def run_conversation(
|
||||
self,
|
||||
user_message,
|
||||
conversation_history=None,
|
||||
task_id=None,
|
||||
persist_user_message=None,
|
||||
persist_user_timestamp=None,
|
||||
):
|
||||
type(self).last_run = {
|
||||
"user_message": user_message,
|
||||
"conversation_history": conversation_history,
|
||||
"task_id": task_id,
|
||||
"persist_user_message": persist_user_message,
|
||||
"persist_user_timestamp": persist_user_timestamp,
|
||||
}
|
||||
return {
|
||||
"final_response": "ok",
|
||||
|
|
|
|||
91
tests/gateway/test_message_timestamps.py
Normal file
91
tests/gateway/test_message_timestamps.py
Normal file
|
|
@ -0,0 +1,91 @@
|
|||
from datetime import datetime
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from gateway.message_timestamps import (
|
||||
coerce_message_timestamp,
|
||||
render_user_content_with_timestamp,
|
||||
strip_leading_message_timestamps,
|
||||
)
|
||||
from run_agent import AIAgent
|
||||
|
||||
|
||||
BERLIN = ZoneInfo("Europe/Berlin")
|
||||
|
||||
|
||||
def _epoch(year, month, day, hour, minute, second):
|
||||
return datetime(year, month, day, hour, minute, second, tzinfo=BERLIN).timestamp()
|
||||
|
||||
|
||||
def test_render_user_content_adds_single_context_timestamp():
|
||||
ts = _epoch(2026, 4, 28, 13, 40, 53)
|
||||
|
||||
rendered = render_user_content_with_timestamp(
|
||||
"[Example User] Timestamp should be in context",
|
||||
ts,
|
||||
tz=BERLIN,
|
||||
)
|
||||
|
||||
assert rendered == (
|
||||
"[Tue 2026-04-28 13:40:53 CEST] "
|
||||
"[Example User] Timestamp should be in context"
|
||||
)
|
||||
|
||||
|
||||
def test_render_user_content_deduplicates_existing_timestamp_and_preserves_embedded_time():
|
||||
db_processing_ts = _epoch(2026, 4, 27, 15, 55, 36)
|
||||
stored_content = (
|
||||
"[Mon 2026-04-27 15:54:44 CEST] "
|
||||
"[Example User] This should go on our todo list"
|
||||
)
|
||||
|
||||
rendered = render_user_content_with_timestamp(
|
||||
stored_content,
|
||||
db_processing_ts,
|
||||
tz=BERLIN,
|
||||
)
|
||||
|
||||
assert rendered == stored_content
|
||||
assert rendered.count("2026-04-27") == 1
|
||||
|
||||
|
||||
def test_strip_leading_message_timestamps_removes_multiple_prefixes_and_prefers_inner_time():
|
||||
content = (
|
||||
"[Mon 2026-04-27 15:55:36 CEST] "
|
||||
"[Mon 2026-04-27 15:54:44 CEST] "
|
||||
"[Example User] This should go on our todo list"
|
||||
)
|
||||
|
||||
stripped, embedded_ts = strip_leading_message_timestamps(content, tz=BERLIN)
|
||||
|
||||
assert stripped == "[Example User] This should go on our todo list"
|
||||
assert embedded_ts == _epoch(2026, 4, 27, 15, 54, 44)
|
||||
|
||||
|
||||
def test_coerce_message_timestamp_accepts_datetime_and_epoch():
|
||||
dt = datetime(2026, 4, 28, 13, 40, 53, tzinfo=BERLIN)
|
||||
|
||||
assert coerce_message_timestamp(dt, tz=BERLIN) == dt.timestamp()
|
||||
assert coerce_message_timestamp(dt.timestamp(), tz=BERLIN) == dt.timestamp()
|
||||
|
||||
|
||||
def test_persist_user_message_override_keeps_clean_content_and_timestamp_metadata():
|
||||
agent = AIAgent.__new__(AIAgent)
|
||||
agent._persist_user_message_idx = 0
|
||||
agent._persist_user_message_override = "[Example User] Clean content"
|
||||
agent._persist_user_message_timestamp = _epoch(2026, 4, 28, 13, 40, 53)
|
||||
messages = [
|
||||
{
|
||||
"role": "user",
|
||||
"content": "[Tue 2026-04-28 13:40:53 CEST] [Example User] Clean content",
|
||||
}
|
||||
]
|
||||
|
||||
agent._apply_persist_user_message_override(messages)
|
||||
|
||||
assert messages == [
|
||||
{
|
||||
"role": "user",
|
||||
"content": "[Example User] Clean content",
|
||||
"timestamp": _epoch(2026, 4, 28, 13, 40, 53),
|
||||
}
|
||||
]
|
||||
|
|
@ -241,7 +241,11 @@ async def test_session_chat_loads_history_and_preserves_session_headers(auth_ada
|
|||
assert kwargs["session_id"] == session_id
|
||||
assert kwargs["gateway_session_key"] == "client-42"
|
||||
assert kwargs["ephemeral_system_prompt"] == "stay focused"
|
||||
assert kwargs["conversation_history"] == [
|
||||
history = kwargs["conversation_history"]
|
||||
assert len(history) == 2
|
||||
assert isinstance(history[0].pop("timestamp"), (int, float))
|
||||
assert isinstance(history[1].pop("timestamp"), (int, float))
|
||||
assert history == [
|
||||
{"role": "user", "content": "earlier"},
|
||||
{"role": "assistant", "content": "prior answer"},
|
||||
]
|
||||
|
|
|
|||
|
|
@ -347,6 +347,15 @@ class TestMessageStorage:
|
|||
assert messages[0]["content"] == "Hello"
|
||||
assert messages[1]["role"] == "assistant"
|
||||
|
||||
def test_append_message_accepts_explicit_timestamp(self, db):
|
||||
db.create_session(session_id="s1", source="telegram")
|
||||
event_ts = 1777383653.0
|
||||
|
||||
db.append_message("s1", role="user", content="Hello", timestamp=event_ts)
|
||||
|
||||
messages = db.get_messages_as_conversation("s1")
|
||||
assert messages[0]["timestamp"] == event_ts
|
||||
|
||||
def test_message_increments_session_count(self, db):
|
||||
db.create_session(session_id="s1", source="cli")
|
||||
db.append_message("s1", role="user", content="Hello")
|
||||
|
|
@ -370,11 +379,10 @@ class TestMessageStorage:
|
|||
assert messages[1]["observed"] == 0
|
||||
|
||||
conversation = db.get_messages_as_conversation("s1")
|
||||
assert conversation[0] == {
|
||||
"role": "user",
|
||||
"content": "[Alice|111]\nside chatter",
|
||||
"observed": True,
|
||||
}
|
||||
assert conversation[0]["role"] == "user"
|
||||
assert conversation[0]["content"] == "[Alice|111]\nside chatter"
|
||||
assert conversation[0]["observed"] is True
|
||||
assert isinstance(conversation[0].get("timestamp"), float)
|
||||
assert "observed" not in conversation[1]
|
||||
|
||||
def test_tool_response_does_not_increment_tool_count(self, db):
|
||||
|
|
@ -458,7 +466,9 @@ class TestMessageStorage:
|
|||
# get_messages_as_conversation decodes back to the original list
|
||||
conv = db.get_messages_as_conversation("s1")
|
||||
assert len(conv) == 1
|
||||
assert conv[0] == {"role": "user", "content": content}
|
||||
assert conv[0]["role"] == "user"
|
||||
assert conv[0]["content"] == content
|
||||
assert isinstance(conv[0].get("timestamp"), float)
|
||||
|
||||
def test_dict_content_round_trip(self, db):
|
||||
"""Dict-shaped content (e.g. provider wrappers) also round-trips."""
|
||||
|
|
@ -529,8 +539,12 @@ class TestMessageStorage:
|
|||
|
||||
conv = db.get_messages_as_conversation("s1")
|
||||
assert len(conv) == 2
|
||||
assert conv[0] == {"role": "user", "content": "Hello"}
|
||||
assert conv[1] == {"role": "assistant", "content": "Hi!"}
|
||||
assert conv[0]["role"] == "user"
|
||||
assert conv[0]["content"] == "Hello"
|
||||
assert isinstance(conv[0]["timestamp"], float)
|
||||
assert conv[1]["role"] == "assistant"
|
||||
assert conv[1]["content"] == "Hi!"
|
||||
assert isinstance(conv[1]["timestamp"], float)
|
||||
|
||||
def test_platform_message_id_round_trips(self, db):
|
||||
"""Platform-side message ids (yuanbao msg_id, telegram update_id, …)
|
||||
|
|
@ -620,7 +634,10 @@ class TestMessageStorage:
|
|||
)
|
||||
|
||||
conv = db.get_messages_as_conversation("s1")
|
||||
assert conv == [{"role": "assistant", "content": "Visible answer"}]
|
||||
assert len(conv) == 1
|
||||
assert conv[0]["role"] == "assistant"
|
||||
assert conv[0]["content"] == "Visible answer"
|
||||
assert isinstance(conv[0].get("timestamp"), float)
|
||||
|
||||
def test_reasoning_persisted_and_restored(self, db):
|
||||
"""Reasoning text is stored for assistant messages and restored by
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue