fix(security): redact secrets in kanban tool payloads before persistence

This commit is contained in:
Stephen Chin 2026-05-23 21:12:07 -07:00 committed by Teknium
parent d19aabbf2d
commit 3b56d3a29a
3 changed files with 206 additions and 0 deletions

View file

@ -1650,6 +1650,7 @@ AUTHOR_MAP = {
"philip.a.dsouza@gmail.com": "PhilipAD", # direct email match
"qs2816661685@gmail.com": "qingshan89", # PR #46895 co-author (desktop remote artifact download)
"yspdev@gmail.com": "AJ", # PR #44510 co-author (desktop named-profile boot loop)
"steveonjava@gmail.com": "steveonjava", # PR #29669 (redact secrets in kanban tool payloads)
}

View file

@ -0,0 +1,191 @@
"""Tests: redact_sensitive_text is applied in kanban tool handlers.
Verifies that secrets embedded in kanban_comment body, kanban_complete
summary/result/metadata, and kanban_block reason are masked before the
values reach the DB. Uses the same worker_env fixture pattern as
test_kanban_tools.py.
"""
from __future__ import annotations
import json
import pytest
# ---------------------------------------------------------------------------
# Shared fixture — mirrors test_kanban_tools.py
# ---------------------------------------------------------------------------
@pytest.fixture
def worker_env(monkeypatch, tmp_path):
"""Isolated HERMES_HOME with a running task; returns the task id."""
home = tmp_path / ".hermes"
home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(home))
monkeypatch.setenv("HERMES_PROFILE", "test-worker")
monkeypatch.delenv("HERMES_SESSION_ID", raising=False)
from pathlib import Path as _Path
monkeypatch.setattr(_Path, "home", lambda: tmp_path)
from hermes_cli import kanban_db as kb
kb._INITIALIZED_PATHS.clear()
kb.init_db()
conn = kb.connect()
try:
tid = kb.create_task(conn, title="worker-test", assignee="test-worker")
kb.claim_task(conn, tid)
finally:
conn.close()
monkeypatch.setenv("HERMES_KANBAN_TASK", tid)
return tid
# ---------------------------------------------------------------------------
# Positive tests — secrets are masked
# ---------------------------------------------------------------------------
def test_kanban_comment_body_scrubbed_github_pat(worker_env):
"""ghp_ PAT in comment body must be masked before DB write."""
from tools import kanban_tools as kt
from hermes_cli import kanban_db as kb
secret = "ghp_" + "A" * 40
kt._handle_comment({"task_id": worker_env, "body": f"token: {secret}"})
conn = kb.connect()
try:
comments = kb.list_comments(conn, worker_env)
finally:
conn.close()
assert comments, "expected at least one comment"
stored = comments[-1].body
assert secret not in stored
assert stored # something was stored
def test_kanban_comment_body_scrubbed_openai_key(worker_env):
"""sk- key in comment body must be masked before DB write."""
from tools import kanban_tools as kt
from hermes_cli import kanban_db as kb
secret = "sk-" + "A" * 48
kt._handle_comment({"task_id": worker_env, "body": f"key={secret}"})
conn = kb.connect()
try:
comments = kb.list_comments(conn, worker_env)
finally:
conn.close()
stored = comments[-1].body
assert secret not in stored
def test_kanban_complete_summary_scrubbed(worker_env):
"""sk-ant- key in summary must be masked before DB write."""
from tools import kanban_tools as kt
from hermes_cli import kanban_db as kb
secret = "sk-ant-" + "A" * 40
kt._handle_complete({"summary": f"done, key={secret}"})
conn = kb.connect()
try:
run = kb.latest_run(conn, worker_env)
finally:
conn.close()
assert run is not None
stored = run.summary or ""
assert secret not in stored
def test_kanban_complete_metadata_scrubbed(worker_env):
"""Token in metadata dict must be masked in JSON stored in DB."""
from tools import kanban_tools as kt
from hermes_cli import kanban_db as kb
secret = "ghp_" + "B" * 40
metadata = {"token": secret, "count": 5}
kt._handle_complete({"summary": "done", "metadata": metadata})
conn = kb.connect()
try:
run = kb.latest_run(conn, worker_env)
finally:
conn.close()
assert run is not None
# metadata is stored on the run; serialize to catch any nesting
meta_raw = json.dumps(run.metadata) if run.metadata else "{}"
assert secret not in meta_raw
def test_kanban_block_reason_scrubbed_jwt(worker_env):
"""JWT in block reason must be masked before DB write."""
from tools import kanban_tools as kt
from hermes_cli import kanban_db as kb
# Minimal valid-ish JWT (header.payload.sig)
jwt = (
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
".eyJzdWIiOiIxMjM0NTY3ODkwIn0"
".dozjgNryP4J3jVmNHl0w5N_5NjP1-iXkpHgcth826Iw"
)
kt._handle_block({"reason": f"Bearer {jwt}"})
conn = kb.connect()
try:
run = kb.latest_run(conn, worker_env)
finally:
conn.close()
# block_task stores reason as run.summary
assert run is not None
stored = run.summary or ""
assert jwt not in stored
# ---------------------------------------------------------------------------
# Negative test — plain text passes through unchanged
# ---------------------------------------------------------------------------
def test_kanban_comment_no_secret_passthrough(worker_env):
"""Plain text without credential patterns must pass through unchanged."""
from tools import kanban_tools as kt
from hermes_cli import kanban_db as kb
plain = "hello from the pipeline — no secrets here"
kt._handle_comment({"task_id": worker_env, "body": plain})
conn = kb.connect()
try:
comments = kb.list_comments(conn, worker_env)
finally:
conn.close()
stored = comments[-1].body
assert stored == plain
# ---------------------------------------------------------------------------
# Negative test — force=True bypasses HERMES_REDACT_SECRETS=false
# ---------------------------------------------------------------------------
def test_scrub_respects_force_flag_regardless_of_config(worker_env, monkeypatch):
"""force=True must fire even when HERMES_REDACT_SECRETS=false is set."""
monkeypatch.setenv("HERMES_REDACT_SECRETS", "false")
from tools import kanban_tools as kt
from hermes_cli import kanban_db as kb
secret = "ghp_" + "C" * 40
kt._handle_comment({"task_id": worker_env, "body": f"token: {secret}"})
conn = kb.connect()
try:
comments = kb.list_comments(conn, worker_env)
finally:
conn.close()
stored = comments[-1].body
assert secret not in stored
# ---------------------------------------------------------------------------
# Negative test — legacy result field is also scrubbed
# ---------------------------------------------------------------------------
def test_kanban_complete_result_field_scrubbed(worker_env):
"""Legacy result field must be scrubbed just like summary."""
from tools import kanban_tools as kt
from hermes_cli import kanban_db as kb
secret = "sk-" + "D" * 48
kt._handle_complete({"result": f"finished with key={secret}"})
conn = kb.connect()
try:
run = kb.latest_run(conn, worker_env)
finally:
conn.close()
assert run is not None
stored = run.summary or run.result if hasattr(run, "result") else run.summary or ""
assert secret not in (stored or "")

View file

@ -33,6 +33,7 @@ import logging
import os
from typing import Any, Optional
from agent.redact import redact_sensitive_text
from tools.registry import registry, tool_error
from hermes_cli.config import cfg_get, load_config
@ -487,6 +488,17 @@ def _handle_complete(args: dict, **kw) -> str:
summary = args.get("summary")
metadata = args.get("metadata")
result = args.get("result")
if summary:
summary = redact_sensitive_text(str(summary), force=True)
if result:
result = redact_sensitive_text(str(result), force=True)
if metadata is not None and isinstance(metadata, dict):
meta_json = json.dumps(metadata)
meta_json = redact_sensitive_text(meta_json, force=True)
try:
metadata = json.loads(meta_json)
except json.JSONDecodeError:
pass
created_cards = args.get("created_cards")
artifacts = args.get("artifacts")
if created_cards is not None:
@ -609,6 +621,7 @@ def _handle_block(args: dict, **kw) -> str:
reason = args.get("reason")
if not reason or not str(reason).strip():
return tool_error("reason is required — explain what input you need")
reason = redact_sensitive_text(str(reason), force=True)
board = args.get("board")
try:
kb, conn = _connect(board=board)
@ -696,6 +709,7 @@ def _handle_comment(args: dict, **kw) -> str:
body = args.get("body")
if not body or not str(body).strip():
return tool_error("body is required")
body = redact_sensitive_text(str(body), force=True)
# Author is intentionally derived from the worker's own runtime
# identity, NOT from caller-supplied args. Comments are injected
# into the next worker's system prompt by ``build_worker_context``