"""Canonical policy-object API routes for the 3-layer architecture pilot.
These handlers intentionally stay thin:
- authenticate/authorize request session
- translate HTTP payloads to service-layer calls
- map structured service errors to stable API responses
"""
from __future__ import annotations
from typing import Any
from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import JSONResponse
from mud_server.api.auth import validate_session
from mud_server.api.models_policy import (
PolicyActivationEntryResponse,
PolicyActivationListResponse,
PolicyActivationRequest,
PolicyCapabilitiesResponse,
PolicyImportEntryResponse,
PolicyImportRequest,
PolicyImportResponse,
PolicyListResponse,
PolicyObjectResponse,
PolicyPublishRequest,
PolicyPublishResponse,
PolicyPublishRunResponse,
PolicyUpsertRequest,
PolicyValidateRequest,
PolicyValidateResponse,
)
from mud_server.core.engine import GameEngine
from mud_server.services.policy_service import (
PolicyServiceError,
parse_scope,
)
from mud_server.services.policy_service import (
get_policy as service_get_policy,
)
from mud_server.services.policy_service import (
get_policy_capabilities as service_get_policy_capabilities,
)
from mud_server.services.policy_service import (
get_publish_run as service_get_publish_run,
)
from mud_server.services.policy_service import (
import_published_artifact as service_import_published_artifact,
)
from mud_server.services.policy_service import (
list_policies as service_list_policies,
)
from mud_server.services.policy_service import (
list_policy_activations as service_list_policy_activations,
)
from mud_server.services.policy_service import (
publish_scope as service_publish_scope,
)
from mud_server.services.policy_service import (
resolve_effective_policy_activations as service_resolve_effective_policy_activations,
)
from mud_server.services.policy_service import (
set_policy_activation as service_set_policy_activation,
)
from mud_server.services.policy_service import (
upsert_policy_variant as service_upsert_policy_variant,
)
from mud_server.services.policy_service import (
validate_policy_variant as service_validate_policy_variant,
)
def _error_response(error: PolicyServiceError) -> JSONResponse:
    """Render a service-layer policy error as a JSON API response.

    Args:
        error: The structured error raised by the policy service.

    Returns:
        A ``JSONResponse`` whose status is ``error.status_code`` and whose
        body is ``error.to_response_payload()``.
    """
    status = error.status_code
    body = error.to_response_payload()
    return JSONResponse(status_code=status, content=body)
def _derive_slot_kinds(row: dict[str, Any]) -> list[str]:
"""Derive the slot_kinds list surfaced by ``PolicyObjectResponse``.
Authors may set ``content.slot_kinds`` on snippet policies to control which
prompt-composer slot kinds the snippet is eligible for. When omitted, the
snippet defaults to its own ``namespace`` so existing policies remain
pickable without policy-author migration.
"""
content = row.get("content") if isinstance(row, dict) else None
if isinstance(content, dict):
raw = content.get("slot_kinds")
if isinstance(raw, list):
seen: set[str] = set()
cleaned: list[str] = []
for item in raw:
if not isinstance(item, str):
continue
stripped = item.strip()
if not stripped or stripped in seen:
continue
seen.add(stripped)
cleaned.append(stripped)
if cleaned:
return cleaned
namespace = str(row.get("namespace") or "").strip()
return [namespace] if namespace else []
def _to_policy_object_response(row: dict[str, Any]) -> PolicyObjectResponse:
    """Convert a policy row into a ``PolicyObjectResponse``.

    The row is copied (the caller's dict is not mutated) and its
    ``slot_kinds`` entry is set to the value derived by
    ``_derive_slot_kinds`` before model validation.
    """
    payload = dict(row)
    payload["slot_kinds"] = _derive_slot_kinds(row)
    return PolicyObjectResponse.model_validate(payload)
def _normalize_activation_response(row: dict[str, Any]) -> PolicyActivationEntryResponse:
    """Shape one service-layer activation row into the API response model.

    Every field except ``audit_event_id`` is a required key of ``row`` (a
    missing one raises ``KeyError``); ``audit_event_id`` is optional and
    defaults to ``None``. A falsy ``client_profile`` is reported as ``None``.
    """
    required_keys = (
        "world_id",
        "client_profile",
        "policy_id",
        "variant",
        "activated_at",
        "activated_by",
        "rollback_of_activation_id",
    )
    # Looked up in declaration order so the first missing key is the one reported.
    fields = {key: row[key] for key in required_keys}
    fields["client_profile"] = fields["client_profile"] or None
    fields["audit_event_id"] = row.get("audit_event_id")
    return PolicyActivationEntryResponse(**fields)
def _validate_policy_session_admin_or_superuser(session_id: str) -> tuple[int, str, str]:
    """Resolve a session and allow only admin/superuser callers through.

    Args:
        session_id: Session identifier passed to ``validate_session``.

    Returns:
        The ``(user_id, username, role)`` triple for the session.

    Raises:
        HTTPException: With status 403 when the session's role is neither
            ``"admin"`` nor ``"superuser"``. Anything ``validate_session``
            raises propagates unchanged.
    """
    session = validate_session(session_id)
    user_id, username, role = session
    if role in {"admin", "superuser"}:
        return user_id, username, role
    raise HTTPException(
        status_code=403,
        detail="Policy API requires admin or superuser role.",
    )
[docs]
def _scope_text(world_id: str, client_profile: str | None) -> str:
    """Render request scope fields as the text form handed to ``parse_scope``.

    Returns ``world_id`` alone when ``client_profile`` is ``None``; otherwise
    ``"<world_id>:<client_profile>"``.
    """
    if client_profile is None:
        return world_id
    return f"{world_id}:{client_profile}"


def router(_engine: GameEngine) -> APIRouter:
    """Build API router for canonical policy-object endpoints.

    Every handler first validates the session and requires an admin or
    superuser role, then delegates to the policy service. A
    ``PolicyServiceError`` raised by the service is returned through
    ``_error_response`` rather than propagated.

    Args:
        _engine: Reserved for future runtime-aware policy endpoints. The
            current pilot endpoints do not require direct engine access.

    Returns:
        An ``APIRouter`` tagged ``"policy"`` with all policy routes registered.
    """
    api = APIRouter(tags=["policy"])

    @api.get("/api/policy-capabilities", response_model=PolicyCapabilitiesResponse)
    async def get_policy_capabilities(session_id: str) -> Any:
        """Return canonical policy API capabilities for current authorized session."""
        _user_id, _username, role = _validate_policy_session_admin_or_superuser(session_id)
        # Map service errors like every other handler so this endpoint also
        # returns the canonical error payload.
        try:
            capabilities = service_get_policy_capabilities(role=role)
            return PolicyCapabilitiesResponse.model_validate(capabilities)
        except PolicyServiceError as error:
            return _error_response(error)

    @api.get("/api/policies", response_model=PolicyListResponse)
    async def list_policies(
        session_id: str,
        policy_type: str | None = Query(default=None),
        namespace: str | None = Query(default=None),
        status: str | None = Query(default=None),
    ) -> Any:
        """List policy variants filtered by optional policy-type predicates."""
        _validate_policy_session_admin_or_superuser(session_id)
        try:
            rows = service_list_policies(
                policy_type=policy_type, namespace=namespace, status=status
            )
            return PolicyListResponse(items=[_to_policy_object_response(row) for row in rows])
        except PolicyServiceError as error:
            return _error_response(error)

    @api.get("/api/policies/{policy_id}", response_model=PolicyObjectResponse)
    async def get_policy(
        policy_id: str,
        session_id: str,
        variant: str | None = Query(default=None),
    ) -> Any:
        """Get one policy variant by id with optional variant selector."""
        _validate_policy_session_admin_or_superuser(session_id)
        try:
            row = service_get_policy(policy_id=policy_id, variant=variant)
            return _to_policy_object_response(row)
        except PolicyServiceError as error:
            return _error_response(error)

    @api.post("/api/policies/{policy_id}/validate", response_model=PolicyValidateResponse)
    async def validate_policy_variant(
        policy_id: str,
        payload: PolicyValidateRequest,
        session_id: str,
        variant: str = Query(min_length=1),
    ) -> Any:
        """Validate one policy variant payload and persist validation history."""
        _user_id, username, _role = _validate_policy_session_admin_or_superuser(session_id)
        # An explicit actor in the payload wins; otherwise attribute to the session user.
        validated_by = payload.validated_by or username
        try:
            result = service_validate_policy_variant(
                policy_id=policy_id,
                variant=variant,
                schema_version=payload.schema_version,
                policy_version=payload.policy_version,
                status=payload.status,
                content=payload.content,
                validated_by=validated_by,
            )
            return PolicyValidateResponse(
                policy_id=result.policy_id,
                variant=result.variant,
                is_valid=result.is_valid,
                errors=result.errors,
                content_hash=result.content_hash,
                validated_at=result.validated_at,
                validated_by=result.validated_by,
                validation_run_id=result.validation_run_id,
            )
        except PolicyServiceError as error:
            return _error_response(error)

    @api.put("/api/policies/{policy_id}/variants/{variant}", response_model=PolicyObjectResponse)
    async def upsert_policy_variant(
        policy_id: str,
        variant: str,
        payload: PolicyUpsertRequest,
        session_id: str,
    ) -> Any:
        """Validate and write one canonical policy variant.

        The service enforces validate-before-write so this endpoint is safe as
        the primary Phase 2 save path for API-only authoring.
        """
        _user_id, username, _role = _validate_policy_session_admin_or_superuser(session_id)
        # An explicit actor in the payload wins; otherwise attribute to the session user.
        updated_by = payload.updated_by or username
        try:
            row = service_upsert_policy_variant(
                policy_id=policy_id,
                variant=variant,
                schema_version=payload.schema_version,
                policy_version=payload.policy_version,
                status=payload.status,
                content=payload.content,
                updated_by=updated_by,
            )
            return _to_policy_object_response(row)
        except PolicyServiceError as error:
            return _error_response(error)

    @api.post("/api/policy-activations", response_model=PolicyActivationEntryResponse)
    async def set_policy_activation(payload: PolicyActivationRequest, session_id: str) -> Any:
        """Create or switch one activation pointer for a scope."""
        _user_id, username, _role = _validate_policy_session_admin_or_superuser(session_id)
        activated_by = payload.activated_by or username
        scope_text = _scope_text(payload.world_id, payload.client_profile)
        try:
            scope = parse_scope(scope_text)
            row = service_set_policy_activation(
                scope=scope,
                policy_id=payload.policy_id,
                variant=payload.variant,
                activated_by=activated_by,
                rollback_of_activation_id=payload.rollback_of_activation_id,
            )
            return _normalize_activation_response(row)
        except PolicyServiceError as error:
            return _error_response(error)

    @api.get("/api/policy-activations", response_model=PolicyActivationListResponse)
    async def list_policy_activations(
        scope: str = Query(min_length=1),
        session_id: str = Query(),
        effective: bool = Query(default=True),
    ) -> Any:
        """List active pointers for one scope, optionally with world-default overlay.

        ``effective=true`` applies Layer 3 scope resolution:
        - world scope returns world pointers
        - world+client scope overlays client pointers on world defaults
        """
        _validate_policy_session_admin_or_superuser(session_id)
        try:
            parsed_scope = parse_scope(scope)
            rows = (
                service_resolve_effective_policy_activations(scope=parsed_scope)
                if effective
                else service_list_policy_activations(scope=parsed_scope)
            )
            return PolicyActivationListResponse(
                world_id=parsed_scope.world_id,
                # A falsy client profile is reported as None.
                client_profile=parsed_scope.client_profile or None,
                items=[_normalize_activation_response(row) for row in rows],
            )
        except PolicyServiceError as error:
            return _error_response(error)

    @api.post("/api/policy-publish", response_model=PolicyPublishResponse)
    async def publish_policies(payload: PolicyPublishRequest, session_id: str) -> Any:
        """Generate and persist one deterministic policy publish manifest."""
        _user_id, username, _role = _validate_policy_session_admin_or_superuser(session_id)
        actor = payload.actor or username
        scope_text = _scope_text(payload.world_id, payload.client_profile)
        try:
            scope = parse_scope(scope_text)
            result = service_publish_scope(scope=scope, actor=actor)
            return PolicyPublishResponse.model_validate(result)
        except PolicyServiceError as error:
            return _error_response(error)

    @api.get("/api/policy-publish/{publish_run_id}", response_model=PolicyPublishRunResponse)
    async def get_publish_run(publish_run_id: int, session_id: str) -> Any:
        """Fetch one publish run with deterministic export artifact metadata."""
        _validate_policy_session_admin_or_superuser(session_id)
        try:
            result = service_get_publish_run(publish_run_id=publish_run_id)
            return PolicyPublishRunResponse.model_validate(result)
        except PolicyServiceError as error:
            return _error_response(error)

    @api.post("/api/policy-import", response_model=PolicyImportResponse)
    async def import_policy_artifact(payload: PolicyImportRequest, session_id: str) -> Any:
        """Import one deterministic publish artifact into canonical DB state."""
        _user_id, username, _role = _validate_policy_session_admin_or_superuser(session_id)
        actor = payload.actor or username
        try:
            summary = service_import_published_artifact(
                artifact=payload.artifact,
                actor=actor,
                activate=payload.activate,
            )
            return PolicyImportResponse(
                world_id=summary.world_id,
                # A falsy client profile is reported as None.
                client_profile=summary.client_profile or None,
                activate=summary.activate,
                item_count=summary.item_count,
                imported_count=summary.imported_count,
                updated_count=summary.updated_count,
                skipped_count=summary.skipped_count,
                error_count=summary.error_count,
                activated_count=summary.activated_count,
                activation_skipped_count=summary.activation_skipped_count,
                manifest_hash=summary.manifest_hash,
                items_hash=summary.items_hash,
                artifact_hash=summary.artifact_hash,
                variants_hash=summary.variants_hash,
                entries=[
                    PolicyImportEntryResponse(
                        policy_id=entry.policy_id,
                        variant=entry.variant,
                        action=entry.action,
                        detail=entry.detail,
                    )
                    for entry in summary.entries
                ],
            )
        except PolicyServiceError as error:
            return _error_response(error)

    return api