feat(api_server): add POST /v1/runs/{run_id}/stop endpoint

Add ability to interrupt a running agent via the runs API. Previously
/v1/runs could start a run and subscribe to events, but there was no
way to cancel it. The new endpoint stores agent and task references
during execution, calls agent.interrupt() to stop LLM calls, then
cancels the asyncio task.

Includes 15 tests covering start, events, and stop scenarios.
This commit is contained in:
ekko 2026-04-25 21:45:40 +08:00 committed by Teknium
parent ce0513dd2e
commit 0a15dbdc43
2 changed files with 404 additions and 0 deletions

View file

@ -9,6 +9,7 @@ Exposes an HTTP server with endpoints:
- GET /v1/models lists hermes-agent as an available model
- POST /v1/runs start a run, returns run_id immediately (202)
- GET /v1/runs/{run_id}/events SSE stream of structured lifecycle events
- POST /v1/runs/{run_id}/stop interrupt a running agent
- GET /health health check
- GET /health/detailed rich status for cross-container dashboard probing
@ -586,6 +587,9 @@ class APIServerAdapter(BasePlatformAdapter):
self._run_streams: Dict[str, "asyncio.Queue[Optional[Dict]]"] = {}
# Creation timestamps for orphaned-run TTL sweep
self._run_streams_created: Dict[str, float] = {}
# Active run agent/task references for stop support
self._active_run_agents: Dict[str, Any] = {}
self._active_run_tasks: Dict[str, "asyncio.Task"] = {}
self._session_db: Optional[Any] = None # Lazy-init SessionDB for session continuity
@staticmethod
@ -2441,6 +2445,7 @@ class APIServerAdapter(BasePlatformAdapter):
stream_delta_callback=_text_cb,
tool_progress_callback=event_cb,
)
self._active_run_agents[run_id] = agent
def _run_sync():
r = agent.run_conversation(
user_message=user_message,
@ -2480,8 +2485,11 @@ class APIServerAdapter(BasePlatformAdapter):
q.put_nowait(None)
except Exception:
pass
self._active_run_agents.pop(run_id, None)
self._active_run_tasks.pop(run_id, None)
task = asyncio.create_task(_run_and_close())
self._active_run_tasks[run_id] = task
try:
self._background_tasks.add(task)
except TypeError:
@ -2540,6 +2548,34 @@ class APIServerAdapter(BasePlatformAdapter):
return response
async def _handle_stop_run(self, request: "web.Request") -> "web.Response":
"""POST /v1/runs/{run_id}/stop — interrupt a running agent."""
auth_err = self._check_auth(request)
if auth_err:
return auth_err
run_id = request.match_info["run_id"]
agent = self._active_run_agents.get(run_id)
task = self._active_run_tasks.get(run_id)
if agent is None and task is None:
return web.json_response(_openai_error(f"Run not found: {run_id}", code="run_not_found"), status=404)
if agent is not None:
try:
agent.interrupt("Stop requested via API")
except Exception:
pass
if task is not None and not task.done():
task.cancel()
try:
await task
except (asyncio.CancelledError, Exception):
pass
return web.json_response({"run_id": run_id, "status": "stopping"})
async def _sweep_orphaned_runs(self) -> None:
"""Periodically clean up run streams that were never consumed."""
while True:
@ -2554,6 +2590,8 @@ class APIServerAdapter(BasePlatformAdapter):
logger.debug("[api_server] sweeping orphaned run %s", run_id)
self._run_streams.pop(run_id, None)
self._run_streams_created.pop(run_id, None)
self._active_run_agents.pop(run_id, None)
self._active_run_tasks.pop(run_id, None)
# ------------------------------------------------------------------
# BasePlatformAdapter interface
@ -2589,6 +2627,7 @@ class APIServerAdapter(BasePlatformAdapter):
# Structured event streaming
self._app.router.add_post("/v1/runs", self._handle_runs)
self._app.router.add_get("/v1/runs/{run_id}/events", self._handle_run_events)
self._app.router.add_post("/v1/runs/{run_id}/stop", self._handle_stop_run)
# Start background sweep to clean up orphaned (unconsumed) run streams
sweep_task = asyncio.create_task(self._sweep_orphaned_runs())
try: