Source code for mud_server.api.routes.lab

"""Lab endpoints for the Axis Descriptor Lab research tool.

These endpoints expose the world's OOC→IC translation pipeline directly via
a JSON API, bypassing the game engine's character DB lookup.  They are
intended exclusively for use by the Axis Descriptor Lab — a single-user
research tool for testing IC prompt behaviour against deterministic axis
payloads.

Auth
----
Session-based — same mechanism as all other server endpoints.  Admin or
superuser role is required.  The lab logs in interactively via its UI; no
credentials are stored anywhere on disk.

Endpoints
---------
GET  /api/lab/worlds
    List all active worlds with a flag indicating whether the translation
    layer is enabled.  Used to populate the lab's world-selector dropdown.

GET  /api/lab/world-config/{world_id}
    Return the translation layer configuration for a specific world (model,
    active_axes, strict_mode, etc.).  Used by the lab UI to reflect what the
    server will actually apply to a translation request.

GET  /api/lab/world-image-policy-bundle/{world_id}
    Return the DB-resolved image policy bundle (composition order, runtime
    input requirements, and manifest-derived reference metadata).

POST /api/lab/compile-image-prompt
    Compile a deterministic image prompt from DB-resolved policy assets
    and runtime inputs (species, gender, axes, optional context signals).

POST /api/lab/translate
    Translate an OOC message to IC dialogue using the world's canonical
    pipeline.  Accepts raw axis values — no character DB lookup is
    performed.  Returns the IC text, outcome status, the server-formatted
    profile_summary, and the fully-rendered system prompt sent to Ollama.
"""

from __future__ import annotations

import logging
from typing import Any

from fastapi import APIRouter, HTTPException
from pipeworks_ipc import compute_payload_hash

from mud_server.api.models import (
    LabImageCompileRequest,
    LabImageCompileResponse,
    LabImagePolicyBundleResponse,
    LabTranslateRequest,
    LabTranslateResponse,
    LabWorldConfig,
    LabWorldsResponse,
    LabWorldSummary,
)
from mud_server.api.routes.lab_support import (
    build_lab_world_config,
    get_lab_world,
    require_lab_session,
    require_translation_world,
)
from mud_server.core.engine import GameEngine
from mud_server.services import policy_service

logger = logging.getLogger(__name__)


def _build_image_policy_bundle(world_id: str) -> LabImagePolicyBundleResponse:
    """Build one DB-first image policy bundle response for diagnostic clients."""
    scope = policy_service.ActivationScope(world_id=world_id, client_profile="")
    try:
        resolved = policy_service.resolve_effective_image_policy_bundle(scope=scope)
    except policy_service.PolicyServiceError as error:
        raise HTTPException(status_code=error.status_code, detail=error.detail) from error

    return LabImagePolicyBundleResponse(
        world_id=resolved.world_id,
        policy_schema=resolved.policy_schema,
        policy_bundle_id=resolved.policy_bundle_id,
        policy_bundle_version=resolved.policy_bundle_version,
        policy_hash=resolved.policy_hash,
        composition_order=resolved.composition_order,
        required_runtime_inputs=resolved.required_runtime_inputs,
        descriptor_layer_path=resolved.descriptor_layer_path,
        tone_profile_path=resolved.tone_profile_path,
        species_registry_path=resolved.species_registry_path,
        clothing_registry_path=resolved.clothing_registry_path,
        missing_components=resolved.missing_components,
    )


def _compile_image_prompt(
    req: LabImageCompileRequest,
) -> LabImageCompileResponse:
    """Compile one deterministic image prompt from canonical DB policy objects.

    The compiler is intentionally policy-driven and deterministic:

    1. Resolve effective manifest + Layer 2 policy rows from activation state.
    2. Select species/clothing block references using registry rules.
    3. Resolve Layer 1 block text from canonical DB variants.
    4. Assemble the prompt in manifest-defined composition order.
    5. Return selection metadata and deterministic provenance hashes.

    Raises:
        HTTPException: When required canonical policy rows are missing or
            selection cannot produce required blocks.
    """
    scope = policy_service.ActivationScope(world_id=req.world_id, client_profile="")
    manifest_payload = _resolve_effective_manifest_payload(scope=scope)
    composition_order, required_runtime_inputs = _extract_manifest_image_contract(manifest_payload)

    descriptor_preferred_policy_id = _manifest_policy_id_hint(
        manifest_payload=manifest_payload,
        image_node_key="descriptor_layer",
        policy_type="descriptor_layer",
        namespace="image.descriptors",
    )
    tone_preferred_policy_id = _manifest_policy_id_hint(
        manifest_payload=manifest_payload,
        image_node_key="tone_profile",
        policy_type="tone_profile",
        namespace="image.tone_profiles",
    )

    descriptor_row, descriptor_activation = _resolve_effective_policy_row(
        scope=scope,
        policy_type="descriptor_layer",
        preferred_policy_id=descriptor_preferred_policy_id,
    )
    tone_row, tone_activation = _resolve_effective_policy_row(
        scope=scope,
        policy_type="tone_profile",
        preferred_policy_id=tone_preferred_policy_id,
    )
    species_registry_row, _ = _resolve_effective_policy_row(
        scope=scope,
        policy_type="registry",
        preferred_policy_id="registry:image.registries:species_registry",
    )
    clothing_registry_row, _ = _resolve_effective_policy_row(
        scope=scope,
        policy_type="registry",
        preferred_policy_id="registry:image.registries:clothing_registry",
    )

    descriptor_layer_text = (descriptor_row.get("content") or {}).get("text")
    tone_profile_payload = tone_row.get("content")
    species_registry = species_registry_row.get("content")
    clothing_registry = clothing_registry_row.get("content")

    if not isinstance(descriptor_layer_text, str) or not descriptor_layer_text.strip():
        raise HTTPException(
            status_code=409,
            detail=(
                "Descriptor layer content missing canonical text payload: "
                f"{descriptor_activation['policy_id']}:{descriptor_activation['variant']}"
            ),
        )
    if not isinstance(tone_profile_payload, dict):
        raise HTTPException(
            status_code=409, detail="Tone profile payload missing in policy bundle."
        )
    if not isinstance(species_registry, dict):
        raise HTTPException(
            status_code=409, detail="Species registry payload missing in policy bundle."
        )
    if not isinstance(clothing_registry, dict):
        raise HTTPException(
            status_code=409, detail="Clothing registry payload missing in policy bundle."
        )

    axis_labels = {axis_name: axis_value.label for axis_name, axis_value in req.axes.items()}
    _validate_compile_runtime_inputs(
        required_runtime_inputs=required_runtime_inputs,
        species=req.species,
        gender=req.gender,
        axes=req.axes,
    )
    selected_species_entry = _select_species_entry(
        species_registry=species_registry,
        species=req.species,
        gender=req.gender,
    )
    if selected_species_entry is None:
        raise HTTPException(
            status_code=409,
            detail=(
                f"No active species block matched species={req.species!r} and "
                f"gender={req.gender!r}."
            ),
        )

    species_block_text = _resolve_registry_entry_policy_text(
        scope=scope,
        entry=selected_species_entry,
        allowed_policy_types={"species_block"},
    )

    selected_clothing_profile_id = _extract_clothing_profile_id(clothing_registry)
    selected_clothing_slots, clothing_blocks = _select_clothing_blocks(
        clothing_registry=clothing_registry,
        scope=scope,
        gender=req.gender,
        axis_labels=axis_labels,
        world_context=req.world_context,
        occupation_signals=req.occupation_signals,
    )

    tone_block_text = _render_tone_profile_block(tone_profile_payload)
    compiled_prompt = _assemble_compiled_prompt(
        composition_order=composition_order,
        species_block_text=species_block_text,
        descriptor_layer_text=descriptor_layer_text,
        clothing_block_text="\n".join(clothing_blocks).strip(),
        tone_block_text=tone_block_text,
    )

    # Hash policy/compiler inputs only (no runtime axis/gender in policy hash).
    policy_hash = compute_payload_hash(
        {
            "manifest": manifest_payload,
            "descriptor_layer_text": descriptor_layer_text,
            "tone_profile_payload": tone_profile_payload,
            "descriptor_policy_id": descriptor_activation["policy_id"],
            "descriptor_variant": descriptor_activation["variant"],
            "tone_policy_id": tone_activation["policy_id"],
            "tone_variant": tone_activation["variant"],
            "composition_order": composition_order,
            "selected_blocks": {
                "species": {
                    "id": selected_species_entry.get("id"),
                    "content": species_block_text,
                },
                "clothing_profile_id": selected_clothing_profile_id,
                "clothing_slots": selected_clothing_slots,
                "clothing_block_texts": clothing_blocks,
            },
        }
    )
    axis_hash = compute_payload_hash(
        {
            axis_name: {"label": axis_value.label, "score": axis_value.score}
            for axis_name, axis_value in sorted(req.axes.items(), key=lambda item: item[0])
        }
    )

    descriptor_id = _nested_get(manifest_payload, ["image", "descriptor_layer", "id"]) or str(
        descriptor_activation["policy_id"]
    )
    tone_id = _nested_get(manifest_payload, ["image", "tone_profile", "id"]) or str(
        tone_activation["policy_id"]
    )
    policy_schema = _nested_get(manifest_payload, ["policy_schema"])
    bundle_id = _nested_get(manifest_payload, ["policy_bundle", "id"])
    bundle_version = _nested_get(manifest_payload, ["policy_bundle", "version"])

    return LabImageCompileResponse(
        world_id=req.world_id,
        policy_schema=str(policy_schema) if policy_schema is not None else None,
        policy_bundle_id=str(bundle_id) if bundle_id is not None else None,
        policy_bundle_version=(str(bundle_version) if bundle_version is not None else None),
        policy_hash=policy_hash,
        axis_hash=axis_hash,
        required_runtime_inputs=required_runtime_inputs,
        selected_descriptor_layer_id=str(descriptor_id) if descriptor_id is not None else None,
        selected_tone_profile_id=str(tone_id) if tone_id is not None else None,
        selected_species_block_id=str(selected_species_entry.get("id") or ""),
        selected_clothing_profile_id=selected_clothing_profile_id,
        selected_clothing_slot_ids=selected_clothing_slots,
        compiled_prompt=compiled_prompt,
        generation_defaults={
            "model_id": req.model_id or "flux-2-klein-4b",
            "aspect_ratio": req.aspect_ratio or "1:1",
            "seed": req.seed,
        },
        missing_components=[],
    )


def _resolve_effective_manifest_payload(*, scope: policy_service.ActivationScope) -> dict[str, Any]:
    """Resolve manifest payload from canonical effective activation state."""
    manifest_policy_id = f"manifest_bundle:world.manifests:{scope.world_id}"
    try:
        manifest_row = policy_service.get_effective_policy_variant(
            scope=scope,
            policy_id=manifest_policy_id,
        )
    except policy_service.PolicyServiceError as error:
        raise HTTPException(status_code=error.status_code, detail=error.detail) from error

    if manifest_row is None:
        raise HTTPException(
            status_code=409,
            detail=(
                "No effective manifest bundle activation found for compile scope "
                f"(world_id={scope.world_id!r}, client_profile={scope.client_profile!r})."
            ),
        )

    manifest_payload = (manifest_row.get("content") or {}).get("manifest")
    if not isinstance(manifest_payload, dict):
        raise HTTPException(
            status_code=409,
            detail=(
                "Effective manifest bundle row is missing content.manifest object: "
                f"{manifest_policy_id}:{manifest_row.get('variant')}"
            ),
        )
    return manifest_payload


def _extract_manifest_image_contract(
    manifest_payload: dict[str, Any],
) -> tuple[list[str], list[str]]:
    """Extract image composition contract fields from manifest payload."""
    composition = _nested_get(manifest_payload, ["image", "composition"])
    if not isinstance(composition, dict):
        raise HTTPException(
            status_code=409,
            detail="Manifest content missing image.composition object for compile.",
        )
    composition_order = composition.get("order")
    if not isinstance(composition_order, list) or not composition_order:
        raise HTTPException(
            status_code=409,
            detail="Manifest image.composition.order must be a non-empty list.",
        )
    required_runtime_inputs = composition.get("required_runtime_inputs")
    if not isinstance(required_runtime_inputs, list):
        raise HTTPException(
            status_code=409,
            detail="Manifest image.composition.required_runtime_inputs must be a list.",
        )
    return [str(value) for value in composition_order], [
        str(value) for value in required_runtime_inputs
    ]


def _manifest_policy_id_hint(
    *,
    manifest_payload: dict[str, Any],
    image_node_key: str,
    policy_type: str,
    namespace: str,
) -> str | None:
    """Build preferred canonical policy_id from one manifest image node."""
    node = _nested_get(manifest_payload, ["image", image_node_key])
    if not isinstance(node, dict):
        return None
    raw_id = node.get("id")
    if not isinstance(raw_id, str) or not raw_id.strip():
        return None
    policy_key = raw_id.strip()
    version_value = node.get("version")
    if isinstance(version_value, int) and version_value >= 1:
        suffix = f"_v{version_value}"
        if policy_key.endswith(suffix):
            policy_key = policy_key[: -len(suffix)]
    return f"{policy_type}:{namespace}:{policy_key}"


def _resolve_effective_policy_row(
    *,
    scope: policy_service.ActivationScope,
    policy_type: str,
    preferred_policy_id: str | None = None,
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Resolve one effective activated policy row for a requested policy type."""
    try:
        effective_rows = policy_service.resolve_effective_policy_activations(scope=scope)
    except policy_service.PolicyServiceError as error:
        raise HTTPException(status_code=error.status_code, detail=error.detail) from error

    candidates = [
        row for row in effective_rows if str(row.get("policy_id", "")).startswith(f"{policy_type}:")
    ]
    if preferred_policy_id:
        for row in candidates:
            if str(row.get("policy_id")) == preferred_policy_id:
                selected_row = row
                break
        else:
            raise HTTPException(
                status_code=409,
                detail=(
                    f"Preferred active {policy_type} policy is not activated for scope "
                    f"(world_id={scope.world_id!r}, client_profile={scope.client_profile!r}): "
                    f"{preferred_policy_id}."
                ),
            )
    else:
        if len(candidates) != 1:
            raise HTTPException(
                status_code=409,
                detail=(
                    f"Expected exactly one active {policy_type} policy for scope "
                    f"(world_id={scope.world_id!r}, client_profile={scope.client_profile!r}); "
                    f"found {len(candidates)}."
                ),
            )
        selected_row = candidates[0]

    policy_id = str(selected_row.get("policy_id") or "")
    variant = str(selected_row.get("variant") or "")
    try:
        policy_row = policy_service.get_policy(policy_id=policy_id, variant=variant)
    except policy_service.PolicyServiceError as error:
        raise HTTPException(status_code=error.status_code, detail=error.detail) from error
    return policy_row, selected_row


def _validate_compile_runtime_inputs(
    *,
    required_runtime_inputs: list[str],
    species: str,
    gender: str,
    axes: dict[str, Any],
) -> None:
    """Validate required runtime inputs declared by manifest composition contract."""
    missing: list[str] = []
    required = set(required_runtime_inputs)

    if "entity.species" in required and not str(species).strip():
        missing.append("entity.species")
    if "entity.identity.gender" in required and not str(gender).strip():
        missing.append("entity.identity.gender")
    if "entity.axes" in required and not axes:
        missing.append("entity.axes")

    if missing:
        raise HTTPException(
            status_code=409,
            detail=("Missing required runtime inputs for compile: " + ", ".join(sorted(missing))),
        )


def _select_species_entry(
    *, species_registry: dict[str, Any], species: str, gender: str
) -> dict[str, Any] | None:
    """Select one active species registry entry deterministically."""
    entries = species_registry.get("entries") or []
    if not isinstance(entries, list):
        return None

    candidates: list[dict[str, Any]] = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        if str(entry.get("status", "")).lower() != "active":
            continue
        if str(entry.get("block_type", "")) != "species":
            continue
        if not _matches_species(entry, species):
            continue
        if not _matches_gender(entry, gender):
            continue
        if not _matches_species_rule(entry, species):
            continue
        candidates.append(entry)

    if not candidates:
        return None
    return sorted(
        candidates,
        key=lambda item: (
            -int(item.get("render_priority", 0)),
            str(item.get("id", "")),
        ),
    )[0]


def _select_clothing_blocks(
    *,
    clothing_registry: dict[str, Any],
    scope: policy_service.ActivationScope,
    gender: str,
    axis_labels: dict[str, str],
    world_context: list[str],
    occupation_signals: list[str],
) -> tuple[dict[str, str | None], list[str]]:
    """Select one clothing block per slot and return ids + block texts."""
    slots = clothing_registry.get("slots") or {}
    if not isinstance(slots, dict):
        return {}, []

    selected_slot_ids: dict[str, str | None] = {}
    selected_block_texts: list[str] = []

    for slot_name in clothing_registry.get("composition_contract", {}).get("slots", []):
        slot_entries = slots.get(slot_name) or []
        selected = _select_clothing_slot_entry(
            slot_entries=slot_entries,
            slot_name=str(slot_name),
            gender=gender,
            axis_labels=axis_labels,
            world_context=world_context,
            occupation_signals=occupation_signals,
        )
        if selected is None:
            selected_slot_ids[str(slot_name)] = None
            continue

        selected_slot_ids[str(slot_name)] = str(selected.get("id") or "")
        selected_block_texts.append(
            _resolve_registry_entry_policy_text(
                scope=scope,
                entry=selected,
                allowed_policy_types={"clothing_block"},
            )
        )

    return selected_slot_ids, selected_block_texts


def _select_clothing_slot_entry(
    *,
    slot_entries: Any,
    slot_name: str,
    gender: str,
    axis_labels: dict[str, str],
    world_context: list[str],
    occupation_signals: list[str],
) -> dict[str, Any] | None:
    """Select one clothing entry for a slot using deterministic match ordering."""
    if not isinstance(slot_entries, list):
        return None

    candidates: list[tuple[int, int, str, dict[str, Any]]] = []
    for entry in slot_entries:
        if not isinstance(entry, dict):
            continue
        if str(entry.get("status", "")).lower() != "active":
            continue
        if str(entry.get("block_type", "")) != "clothing_fragment":
            continue
        if not _matches_gender(entry, gender):
            continue

        matched, fallback = _matches_clothing_rules(
            entry=entry,
            slot_name=slot_name,
            axis_labels=axis_labels,
            world_context=world_context,
            occupation_signals=occupation_signals,
        )
        if not matched:
            continue

        match_score = 0 if fallback else 1
        candidates.append(
            (
                -match_score,
                -int(entry.get("render_priority", 0)),
                str(entry.get("id", "")),
                entry,
            )
        )

    if not candidates:
        return None
    candidates.sort()
    return candidates[0][3]


def _matches_clothing_rules(
    *,
    entry: dict[str, Any],
    slot_name: str,
    axis_labels: dict[str, str],
    world_context: list[str],
    occupation_signals: list[str],
) -> tuple[bool, bool]:
    """Evaluate clothing entry rules and return ``(matched, fallback_match)``."""
    rules = ((entry.get("selection_rules") or {}).get("when")) or {}
    if not isinstance(rules, dict):
        return True, False

    fallback = bool(rules.get("fallback", False))
    axis_rule_any = _nested_get(rules, ["axis_labels", "wealth_any"])
    if axis_rule_any is not None:
        axis_label = axis_labels.get("wealth")
        if axis_label not in set(axis_rule_any if isinstance(axis_rule_any, list) else []):
            return False, fallback

    world_any = rules.get("world_context_any")
    if world_any is not None:
        world_set = set(world_context)
        if not world_set.intersection(set(world_any if isinstance(world_any, list) else [])):
            return False, fallback

    occupation_any = rules.get("occupation_signal_any")
    if occupation_any is not None:
        occupation_set = set(occupation_signals)
        if not occupation_set.intersection(
            set(occupation_any if isinstance(occupation_any, list) else [])
        ):
            return False, fallback

    _ = slot_name  # Explicitly unused in v0; retained for future per-slot rule logic.
    return True, fallback


def _extract_clothing_profile_id(clothing_registry: dict[str, Any]) -> str | None:
    """Extract default clothing profile id from registry payload."""
    defaults = clothing_registry.get("defaults") or {}
    profile_id = defaults.get("profile_id")
    return str(profile_id) if isinstance(profile_id, str) else None


def _assemble_compiled_prompt(
    *,
    composition_order: list[str],
    species_block_text: str,
    descriptor_layer_text: str,
    clothing_block_text: str,
    tone_block_text: str,
) -> str:
    """Assemble final prompt in strict composition order."""
    block_map = {
        "species_canon_block": species_block_text.strip(),
        "descriptor_layer_output": descriptor_layer_text.strip(),
        "clothing_block": clothing_block_text.strip(),
        "tone_profile_block": tone_block_text.strip(),
    }
    blocks = [block_map.get(block_name, "") for block_name in composition_order]
    return "\n\n".join([block for block in blocks if block])


def _render_tone_profile_block(tone_profile_payload: dict[str, Any]) -> str:
    """Render a tone-profile block from JSON payload.

    The v0 implementation prefers ``prompt_block`` when present. Otherwise it
    composes one conservative sentence from a subset of known tone fields.
    """
    prompt_block = tone_profile_payload.get("prompt_block")
    if isinstance(prompt_block, str) and prompt_block.strip():
        return prompt_block

    linework = str(tone_profile_payload.get("linework_style") or "").strip()
    palette = str(tone_profile_payload.get("palette_descriptor") or "").strip()
    context = str(tone_profile_payload.get("presentation_context") or "").strip()
    phrases = [phrase for phrase in [linework, palette, context] if phrase]
    if not phrases:
        return ""
    return ". ".join(phrases).rstrip(".") + "."


def _resolve_registry_entry_policy_text(
    *,
    scope: policy_service.ActivationScope,
    entry: dict[str, Any],
    allowed_policy_types: set[str],
) -> str:
    """Resolve one registry-selected Layer 1 text block from canonical DB rows."""
    policy_reference = entry.get("policy_ref")
    if not isinstance(policy_reference, dict):
        raise HTTPException(
            status_code=409,
            detail=(
                "Registry entry missing canonical policy_ref mapping for scope "
                f"(world_id={scope.world_id!r}): {entry.get('id')!r}"
            ),
        )

    policy_id = str(policy_reference.get("policy_id") or "").strip()
    variant = str(policy_reference.get("variant") or "").strip()
    if not policy_id or not variant:
        raise HTTPException(
            status_code=409,
            detail=f"Registry entry has invalid policy_ref: {policy_reference!r}",
        )
    policy_type = policy_id.split(":", 1)[0]
    if policy_type not in allowed_policy_types:
        raise HTTPException(
            status_code=409,
            detail=(
                f"Registry entry policy_ref type must be one of {sorted(allowed_policy_types)}; "
                f"got {policy_type!r} ({policy_id}:{variant})."
            ),
        )

    try:
        row = policy_service.get_policy(policy_id=policy_id, variant=variant)
    except policy_service.PolicyServiceError as error:
        raise HTTPException(status_code=error.status_code, detail=error.detail) from error
    content = row.get("content")
    if not isinstance(content, dict):
        raise HTTPException(
            status_code=409,
            detail=f"Policy variant content must be an object: {policy_id}:{variant}",
        )
    text_value = content.get("text")
    if not isinstance(text_value, str) or not text_value.strip():
        raise HTTPException(
            status_code=409,
            detail=f"Policy variant content.text must be non-empty: {policy_id}:{variant}",
        )
    return text_value.strip()


def _matches_species(entry: dict[str, Any], species: str) -> bool:
    """Return whether species entry matches requested species."""
    values = entry.get("compatible_species") or []
    if isinstance(values, list) and values:
        return species in values
    return True


def _matches_gender(entry: dict[str, Any], gender: str) -> bool:
    """Return whether entry supports requested gender."""
    values = entry.get("compatible_genders") or []
    if isinstance(values, list) and values:
        return gender in values
    return True


def _matches_species_rule(entry: dict[str, Any], species: str) -> bool:
    """Return whether species selection rules allow the requested species."""
    when = ((entry.get("selection_rules") or {}).get("when")) or {}
    if not isinstance(when, dict):
        return True
    species_any = when.get("species_any")
    if species_any is None:
        return True
    if isinstance(species_any, list):
        return species in species_any
    return False


def _nested_get(payload: dict[str, Any], path_keys: list[str]) -> Any:
    """Get nested value from mapping path, returning ``None`` when missing."""
    current: Any = payload
    for key in path_keys:
        if not isinstance(current, dict):
            return None
        current = current.get(key)
    return current


[docs] def router(engine: GameEngine) -> APIRouter: """Build and return the lab API router. Args: engine: The live ``GameEngine`` instance, used to access the world registry and translation services. Returns: Configured ``APIRouter`` with all lab endpoints registered. """ api = APIRouter(prefix="/api/lab", tags=["lab"]) @api.get("/worlds", response_model=LabWorldsResponse) async def list_lab_worlds(session_id: str) -> LabWorldsResponse: """List all active worlds with translation-layer availability. Returns every active world known to the server, flagged with whether its translation layer is enabled. Used to populate the lab UI's world-selector dropdown. Requires admin or superuser role. """ require_lab_session(session_id) worlds_data = engine.world_registry.list_worlds() result: list[LabWorldSummary] = [] for row in worlds_data: wid = row.get("world_id") or row.get("id", "") translation_enabled = False try: world = get_lab_world(engine, wid) translation_enabled = world.translation_layer_enabled() except Exception: # World failed to load or is inactive — surface it with # translation_enabled=False so the lab can still show it. logger.debug("World %r: translation check skipped (load error)", wid) result.append( LabWorldSummary( world_id=wid, name=row.get("name", wid), translation_enabled=translation_enabled, ) ) return LabWorldsResponse(worlds=result) @api.get("/world-config/{world_id}", response_model=LabWorldConfig) async def get_world_config(world_id: str, session_id: str) -> LabWorldConfig: """Return the translation layer configuration for a world. Used by the lab UI to reflect the server's canonical active_axes, model, and validation settings without hardcoding them in the lab. Returns 404 if the world does not exist, is inactive, or has its translation layer disabled. Requires admin or superuser role. """ require_lab_session(session_id) world = get_lab_world(engine, world_id) service = require_translation_world(world, world_id) return build_lab_world_config(world_id, service) @api.get( "/world-image-policy-bundle/{world_id}", response_model=LabImagePolicyBundleResponse, ) async def get_world_image_policy_bundle( world_id: str, session_id: str ) -> LabImagePolicyBundleResponse: """Return one world's manifest-resolved image policy bundle. This endpoint is intended for integration clients that need the canonical image policy references and composition contract before calling prompt compilation/generation endpoints. """ require_lab_session(session_id) world = get_lab_world(engine, world_id) _ = world # Route still validates world availability via registry lookup. return _build_image_policy_bundle(world_id) @api.post("/compile-image-prompt", response_model=LabImageCompileResponse) async def compile_image_prompt(req: LabImageCompileRequest) -> LabImageCompileResponse: """Compile one deterministic image prompt from canonical policy assets.""" require_lab_session(req.session_id) world = get_lab_world(engine, req.world_id) _ = world # Route still validates world availability via registry lookup. return _compile_image_prompt(req) @api.post("/translate", response_model=LabTranslateResponse) async def lab_translate(req: LabTranslateRequest) -> LabTranslateResponse: """Translate an OOC message using the world's canonical pipeline. Accepts raw axis values from the lab — no character DB lookup is performed. The server filters the supplied axes to the world's ``active_axes``, builds the ``profile_summary`` in the server's canonical format, renders the system prompt, calls Ollama, and runs the validator. The response includes ``rendered_prompt`` so the lab can display exactly what was sent to Ollama, and ``world_config`` so the lab knows which axes and settings were applied. Returns 404 if the world is not found or inactive. Returns 503 if the world's translation layer is disabled. Requires admin or superuser role. """ require_lab_session(req.session_id) world = get_lab_world(engine, req.world_id) service = require_translation_world(world, req.world_id, status_code=503) axes_raw = {name: ax.model_dump() for name, ax in req.axes.items()} seed = req.seed if req.seed != -1 else None result = service.translate_with_axes( axes_raw, req.ooc_message, character_name=req.character_name, channel=req.channel, seed=seed, temperature=req.temperature, prompt_template_override=req.prompt_template_override, ) cfg = service.config world_config = build_lab_world_config(req.world_id, service) return LabTranslateResponse( ic_text=result.ic_text, status=result.status, profile_summary=result.profile_summary, rendered_prompt=result.rendered_prompt, prompt_template=result.prompt_template, model=cfg.model, world_config=world_config, ) return api