fix(profiles): normalize profile IDs for Kanban assignees and lookups

- Add normalize_profile_name() for lowercase canonical IDs and Default alias
- Use canonical names in create/delete/rename/export/import/set_active paths
- Canonicalize Kanban assignee on create/assign, list filter, and worker spawn
- Tests for mixed-case assignees and profile resolution (fixes #18498)
This commit is contained in:
changchun989 2026-05-02 03:03:30 +08:00 committed by Teknium
parent 60c4bc96fd
commit a31477dabb
4 changed files with 160 additions and 72 deletions

View file

@ -179,29 +179,50 @@ def _get_wrapper_dir() -> Path:
# Validation
# ---------------------------------------------------------------------------
def normalize_profile_name(name: str) -> str:
"""Return the canonical profile id used on disk and in CLI ``-p`` argv.
Named profiles are stored lowercase under ``profiles/<id>/``. The special
alias ``default`` is matched case-insensitively (``Default`` ``default``).
Dashboards and tools may pass title-cased display labels; normalize before
validation, assignment, and subprocess spawn (see issue #18498).
"""
if not isinstance(name, str):
name = str(name)
stripped = name.strip()
if not stripped:
raise ValueError("profile name cannot be empty")
if stripped.casefold() == "default":
return "default"
return stripped.lower()
def validate_profile_name(name: str) -> None:
"""Raise ``ValueError`` if *name* is not a valid profile identifier."""
if name == "default":
canon = normalize_profile_name(name)
if canon == "default":
return # special alias for ~/.hermes
if not _PROFILE_ID_RE.match(name):
if not _PROFILE_ID_RE.match(canon):
raise ValueError(
f"Invalid profile name {name!r}. Must match "
f"Invalid profile name {canon!r}. Must match "
f"[a-z0-9][a-z0-9_-]{{0,63}}"
)
def get_profile_dir(name: str) -> Path:
"""Resolve a profile name to its HERMES_HOME directory."""
if name == "default":
canon = normalize_profile_name(name)
if canon == "default":
return _get_default_hermes_home()
return _get_profiles_root() / name
return _get_profiles_root() / canon
def profile_exists(name: str) -> bool:
"""Check whether a profile directory exists."""
if name == "default":
canon = normalize_profile_name(name)
if canon == "default":
return True
return get_profile_dir(name).is_dir()
return get_profile_dir(canon).is_dir()
# ---------------------------------------------------------------------------
@ -213,28 +234,29 @@ def check_alias_collision(name: str) -> Optional[str]:
Checks: reserved names, hermes subcommands, existing binaries in PATH.
"""
if name in _RESERVED_NAMES:
return f"'{name}' is a reserved name"
if name in _HERMES_SUBCOMMANDS:
return f"'{name}' conflicts with a hermes subcommand"
canon = normalize_profile_name(name)
if canon in _RESERVED_NAMES:
return f"'{canon}' is a reserved name"
if canon in _HERMES_SUBCOMMANDS:
return f"'{canon}' conflicts with a hermes subcommand"
# Check existing commands in PATH
wrapper_dir = _get_wrapper_dir()
try:
result = subprocess.run(
["which", name], capture_output=True, text=True, timeout=5,
["which", canon], capture_output=True, text=True, timeout=5,
)
if result.returncode == 0:
existing_path = result.stdout.strip()
# Allow overwriting our own wrappers
if existing_path == str(wrapper_dir / name):
if existing_path == str(wrapper_dir / canon):
try:
content = (wrapper_dir / name).read_text()
content = (wrapper_dir / canon).read_text()
if "hermes -p" in content:
return None # it's our wrapper, safe to overwrite
except Exception:
pass
return f"'{name}' conflicts with an existing command ({existing_path})"
return f"'{canon}' conflicts with an existing command ({existing_path})"
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
@ -252,6 +274,7 @@ def create_wrapper_script(name: str) -> Optional[Path]:
Returns the path to the created wrapper, or None if creation failed.
"""
canon = normalize_profile_name(name)
wrapper_dir = _get_wrapper_dir()
try:
wrapper_dir.mkdir(parents=True, exist_ok=True)
@ -259,9 +282,9 @@ def create_wrapper_script(name: str) -> Optional[Path]:
print(f"⚠ Could not create {wrapper_dir}: {e}")
return None
wrapper_path = wrapper_dir / name
wrapper_path = wrapper_dir / canon
try:
wrapper_path.write_text(f'#!/bin/sh\nexec hermes -p {name} "$@"\n')
wrapper_path.write_text(f'#!/bin/sh\nexec hermes -p {canon} "$@"\n')
wrapper_path.chmod(wrapper_path.stat().st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)
return wrapper_path
except OSError as e:
@ -271,7 +294,7 @@ def create_wrapper_script(name: str) -> Optional[Path]:
def remove_wrapper_script(name: str) -> bool:
"""Remove the wrapper script for a profile. Returns True if removed."""
wrapper_path = _get_wrapper_dir() / name
wrapper_path = _get_wrapper_dir() / normalize_profile_name(name)
if wrapper_path.exists():
try:
# Verify it's our wrapper before removing
@ -422,15 +445,16 @@ def create_profile(
The newly created profile directory.
"""
validate_profile_name(name)
canon = normalize_profile_name(name)
if name == "default":
if canon == "default":
raise ValueError(
"Cannot create a profile named 'default' — it is the built-in profile (~/.hermes)."
)
profile_dir = get_profile_dir(name)
profile_dir = get_profile_dir(canon)
if profile_dir.exists():
raise FileExistsError(f"Profile '{name}' already exists at {profile_dir}")
raise FileExistsError(f"Profile '{canon}' already exists at {profile_dir}")
# Resolve clone source
source_dir = None
@ -541,23 +565,24 @@ def delete_profile(name: str, yes: bool = False) -> Path:
Returns the path that was removed.
"""
validate_profile_name(name)
canon = normalize_profile_name(name)
if name == "default":
if canon == "default":
raise ValueError(
"Cannot delete the default profile (~/.hermes).\n"
"To remove everything, use: hermes uninstall"
)
profile_dir = get_profile_dir(name)
profile_dir = get_profile_dir(canon)
if not profile_dir.is_dir():
raise FileNotFoundError(f"Profile '{name}' does not exist.")
raise FileNotFoundError(f"Profile '{canon}' does not exist.")
# Show what will be deleted
model, provider = _read_config_model(profile_dir)
gw_running = _check_gateway_running(profile_dir)
skill_count = _count_skills(profile_dir)
print(f"\nProfile: {name}")
print(f"\nProfile: {canon}")
print(f"Path: {profile_dir}")
if model:
print(f"Model: {model}" + (f" ({provider})" if provider else ""))
@ -569,7 +594,7 @@ def delete_profile(name: str, yes: bool = False) -> Path:
]
# Check for service
wrapper_path = _get_wrapper_dir() / name
wrapper_path = _get_wrapper_dir() / canon
has_wrapper = wrapper_path.exists()
if has_wrapper:
items.append(f"Command alias ({wrapper_path})")
@ -584,16 +609,16 @@ def delete_profile(name: str, yes: bool = False) -> Path:
if not yes:
print()
try:
confirm = input(f"Type '{name}' to confirm: ").strip()
confirm = input(f"Type '{canon}' to confirm: ").strip()
except (KeyboardInterrupt, EOFError):
print("\nCancelled.")
return profile_dir
if confirm != name:
if confirm != canon:
print("Cancelled.")
return profile_dir
# 1. Disable service (prevents auto-restart)
_cleanup_gateway_service(name, profile_dir)
_cleanup_gateway_service(canon, profile_dir)
# 2. Stop running gateway
if gw_running:
@ -601,7 +626,7 @@ def delete_profile(name: str, yes: bool = False) -> Path:
# 3. Remove wrapper script
if has_wrapper:
if remove_wrapper_script(name):
if remove_wrapper_script(canon):
print(f"✓ Removed {wrapper_path}")
# 4. Remove profile directory
@ -614,13 +639,13 @@ def delete_profile(name: str, yes: bool = False) -> Path:
# 5. Clear active_profile if it pointed to this profile
try:
active = get_active_profile()
if active == name:
if active == canon:
set_active_profile("default")
print("✓ Active profile reset to default")
except Exception:
pass
print(f"\nProfile '{name}' deleted.")
print(f"\nProfile '{canon}' deleted.")
return profile_dir
@ -731,21 +756,22 @@ def set_active_profile(name: str) -> None:
Writes to ``~/.hermes/active_profile``. Use ``"default"`` to clear.
"""
validate_profile_name(name)
if name != "default" and not profile_exists(name):
canon = normalize_profile_name(name)
if canon != "default" and not profile_exists(canon):
raise FileNotFoundError(
f"Profile '{name}' does not exist. "
f"Create it with: hermes profile create {name}"
f"Profile '{canon}' does not exist. "
f"Create it with: hermes profile create {canon}"
)
path = _get_active_profile_path()
path.parent.mkdir(parents=True, exist_ok=True)
if name == "default":
if canon == "default":
# Remove the file to indicate default
path.unlink(missing_ok=True)
else:
# Atomic write
tmp = path.with_suffix(".tmp")
tmp.write_text(name + "\n")
tmp.write_text(canon + "\n")
tmp.replace(path)
@ -812,15 +838,16 @@ def export_profile(name: str, output_path: str) -> Path:
import tempfile
validate_profile_name(name)
profile_dir = get_profile_dir(name)
canon = normalize_profile_name(name)
profile_dir = get_profile_dir(canon)
if not profile_dir.is_dir():
raise FileNotFoundError(f"Profile '{name}' does not exist.")
raise FileNotFoundError(f"Profile '{canon}' does not exist.")
output = Path(output_path)
# shutil.make_archive wants the base name without extension
base = str(output).removesuffix(".tar.gz").removesuffix(".tgz")
if name == "default":
if canon == "default":
# The default profile IS ~/.hermes itself — its parent is ~/ and its
# directory name is ".hermes", not "default". We stage a clean copy
# under a temp dir so the archive contains ``default/...``.
@ -836,14 +863,14 @@ def export_profile(name: str, output_path: str) -> Path:
# Named profiles — stage a filtered copy to exclude credentials
with tempfile.TemporaryDirectory() as tmpdir:
staged = Path(tmpdir) / name
staged = Path(tmpdir) / canon
_CREDENTIAL_FILES = {"auth.json", ".env"}
shutil.copytree(
profile_dir,
staged,
ignore=lambda d, contents: _CREDENTIAL_FILES & set(contents),
)
result = shutil.make_archive(base, "gztar", tmpdir, name)
result = shutil.make_archive(base, "gztar", tmpdir, canon)
return Path(result)
@ -952,16 +979,17 @@ def import_profile(archive_path: str, name: Optional[str] = None) -> Path:
# Archives exported from the default profile have "default/" as top-level
# dir. Importing as "default" would target ~/.hermes itself — disallow
# that and guide the user toward a named profile.
if inferred_name == "default":
validate_profile_name(inferred_name)
canon = normalize_profile_name(inferred_name)
if canon == "default":
raise ValueError(
"Cannot import as 'default' — that is the built-in root profile (~/.hermes). "
"Specify a different name: hermes profile import <archive> --name <name>"
)
validate_profile_name(inferred_name)
profile_dir = get_profile_dir(inferred_name)
profile_dir = get_profile_dir(canon)
if profile_dir.exists():
raise FileExistsError(f"Profile '{inferred_name}' already exists at {profile_dir}")
raise FileExistsError(f"Profile '{canon}' already exists at {profile_dir}")
profiles_root = _get_profiles_root()
profiles_root.mkdir(parents=True, exist_ok=True)
@ -977,8 +1005,8 @@ def import_profile(archive_path: str, name: Optional[str] = None) -> Path:
)
final_source = extracted
if archive_root != inferred_name:
final_source = staging_root / inferred_name
if archive_root != canon:
final_source = staging_root / canon
extracted.rename(final_source)
shutil.move(str(final_source), str(profile_dir))
@ -1050,23 +1078,25 @@ def rename_profile(old_name: str, new_name: str) -> Path:
"""
validate_profile_name(old_name)
validate_profile_name(new_name)
old_canon = normalize_profile_name(old_name)
new_canon = normalize_profile_name(new_name)
if old_name == "default":
if old_canon == "default":
raise ValueError("Cannot rename the default profile.")
if new_name == "default":
if new_canon == "default":
raise ValueError("Cannot rename to 'default' — it is reserved.")
old_dir = get_profile_dir(old_name)
new_dir = get_profile_dir(new_name)
old_dir = get_profile_dir(old_canon)
new_dir = get_profile_dir(new_canon)
if not old_dir.is_dir():
raise FileNotFoundError(f"Profile '{old_name}' does not exist.")
raise FileNotFoundError(f"Profile '{old_canon}' does not exist.")
if new_dir.exists():
raise FileExistsError(f"Profile '{new_name}' already exists.")
raise FileExistsError(f"Profile '{new_canon}' already exists.")
# 1. Stop gateway if running
if _check_gateway_running(old_dir):
_cleanup_gateway_service(old_name, old_dir)
_cleanup_gateway_service(old_canon, old_dir)
_stop_gateway_process(old_dir)
# 2. Rename directory
@ -1074,22 +1104,22 @@ def rename_profile(old_name: str, new_name: str) -> Path:
print(f"✓ Renamed {old_dir.name}{new_dir.name}")
# 3. Update profile-scoped Honcho host blocks, preserving aiPeer identity
_migrate_honcho_profile_host(old_name, new_name, new_dir)
_migrate_honcho_profile_host(old_canon, new_canon, new_dir)
# 4. Update wrapper script
remove_wrapper_script(old_name)
collision = check_alias_collision(new_name)
remove_wrapper_script(old_canon)
collision = check_alias_collision(new_canon)
if not collision:
create_wrapper_script(new_name)
print(f"✓ Alias updated: {new_name}")
create_wrapper_script(new_canon)
print(f"✓ Alias updated: {new_canon}")
else:
print(f"⚠ Cannot create alias '{new_name}'{collision}")
print(f"⚠ Cannot create alias '{new_canon}'{collision}")
# 5. Update active_profile if it pointed to old name
try:
if get_active_profile() == old_name:
set_active_profile(new_name)
print(f"✓ Active profile updated: {new_name}")
if get_active_profile() == old_canon:
set_active_profile(new_canon)
print(f"✓ Active profile updated: {new_canon}")
except Exception:
pass
@ -1192,12 +1222,13 @@ def resolve_profile_env(profile_name: str) -> str:
are imported, to set the HERMES_HOME environment variable.
"""
validate_profile_name(profile_name)
profile_dir = get_profile_dir(profile_name)
canon = normalize_profile_name(profile_name)
profile_dir = get_profile_dir(canon)
if profile_name != "default" and not profile_dir.is_dir():
if canon != "default" and not profile_dir.is_dir():
raise FileNotFoundError(
f"Profile '{profile_name}' does not exist. "
f"Create it with: hermes profile create {profile_name}"
f"Profile '{canon}' does not exist. "
f"Create it with: hermes profile create {canon}"
)
return str(profile_dir)