"""Runtime-facing policy resolution helpers.
This module resolves effective policy payloads from canonical Layer 3
activation pointers only. It intentionally does not map legacy file paths.
"""
from __future__ import annotations
from typing import Any
from pipeworks_ipc import compute_payload_hash
from mud_server.db import policy_repo
from .activation import get_effective_policy_variant, resolve_effective_policy_activations
from .errors import PolicyServiceError
from .types import ActivationScope, EffectiveAxisBundle, EffectiveImagePolicyBundle
from .utils import ensure_world_exists, resolve_positive_int_version
from .validation import parse_policy_id
[docs]
def resolve_effective_prompt_template(
*,
scope: ActivationScope,
preferred_policy_id: str | None = None,
) -> dict[str, str]:
"""Resolve effective canonical ``prompt`` policy for one scope."""
ensure_world_exists(scope.world_id)
selected_policy_id_hint: str | None = None
if preferred_policy_id:
identity = parse_policy_id(preferred_policy_id)
if identity.policy_type != "prompt":
raise PolicyServiceError(
status_code=422,
code="POLICY_PROMPT_SELECTOR_INVALID",
detail=(
"Configured prompt policy selector must reference a prompt policy_id; "
f"got {preferred_policy_id!r}."
),
)
selected_policy_id_hint = preferred_policy_id
effective_rows = resolve_effective_policy_activations(scope=scope)
prompt_rows = [
row for row in effective_rows if str(row.get("policy_id", "")).startswith("prompt:")
]
if not prompt_rows:
raise PolicyServiceError(
status_code=404,
code="POLICY_EFFECTIVE_PROMPT_NOT_FOUND",
detail=(
"No effective prompt activation found for scope "
f"(world_id={scope.world_id!r}, client_profile={scope.client_profile!r})."
),
)
selected_activation: dict[str, Any] | None = None
if selected_policy_id_hint is not None:
for row in prompt_rows:
if str(row["policy_id"]) == selected_policy_id_hint:
selected_activation = row
break
if selected_activation is None:
raise PolicyServiceError(
status_code=404,
code="POLICY_EFFECTIVE_PROMPT_NOT_FOUND",
detail=(
"Configured prompt selector does not resolve to an effective prompt "
f"activation (policy_id={preferred_policy_id!r})."
),
)
elif len(prompt_rows) == 1:
selected_activation = prompt_rows[0]
else:
raise PolicyServiceError(
status_code=409,
code="POLICY_EFFECTIVE_PROMPT_AMBIGUOUS",
detail=(
"Multiple effective prompt activations are present. Configure "
"prompt_policy_id explicitly."
),
)
selected_policy_id = str(selected_activation["policy_id"])
selected_variant = str(selected_activation["variant"])
selected_policy = policy_repo.get_policy(policy_id=selected_policy_id, variant=selected_variant)
if selected_policy is None:
raise PolicyServiceError(
status_code=409,
code="POLICY_EFFECTIVE_REFERENCE_MISSING",
detail=(
"Effective prompt activation points to missing policy variant: "
f"{selected_policy_id}:{selected_variant}"
),
)
content = selected_policy.get("content")
if not isinstance(content, dict):
raise PolicyServiceError(
status_code=422,
code="POLICY_EFFECTIVE_PROMPT_INVALID",
detail="Effective prompt content must be an object.",
)
content_text = content.get("text")
if not isinstance(content_text, str) or not content_text.strip():
raise PolicyServiceError(
status_code=422,
code="POLICY_EFFECTIVE_PROMPT_INVALID",
detail="Effective prompt content.text must be a non-empty string.",
)
return {
"policy_id": selected_policy_id,
"variant": selected_variant,
"namespace": str(selected_policy.get("namespace", "")),
"policy_key": str(selected_policy.get("policy_key", "")),
"content_text": content_text,
"content_hash": str(selected_policy.get("content_hash", "")),
}
def _get_effective_manifest_payload(
*, scope: ActivationScope
) -> tuple[dict[str, Any], dict[str, Any], str]:
"""Resolve and validate effective manifest payload for a scope.
Returns:
``(manifest_payload, manifest_row, manifest_policy_id)``.
Raises:
PolicyServiceError: If activation is missing or manifest payload shape
is invalid.
"""
manifest_policy_id = f"manifest_bundle:world.manifests:{scope.world_id}"
manifest_row = get_effective_policy_variant(scope=scope, policy_id=manifest_policy_id)
if manifest_row is None:
raise PolicyServiceError(
status_code=404,
code="POLICY_EFFECTIVE_MANIFEST_NOT_FOUND",
detail=(
"No effective manifest bundle activation found for scope "
f"(world_id={scope.world_id!r}, client_profile={scope.client_profile!r})."
),
)
manifest_content = manifest_row.get("content")
if not isinstance(manifest_content, dict):
raise PolicyServiceError(
status_code=409,
code="POLICY_EFFECTIVE_MANIFEST_INVALID",
detail="Effective manifest content must be an object.",
)
manifest_payload = manifest_content.get("manifest")
if not isinstance(manifest_payload, dict):
raise PolicyServiceError(
status_code=409,
code="POLICY_EFFECTIVE_MANIFEST_INVALID",
detail="Effective manifest content missing object field: content.manifest.",
)
return manifest_payload, manifest_row, manifest_policy_id
[docs]
def resolve_effective_axis_bundle(*, scope: ActivationScope) -> EffectiveAxisBundle:
"""Resolve canonical manifest+axis bundle payloads for runtime callers."""
ensure_world_exists(scope.world_id)
manifest_payload, manifest_row, manifest_policy_id = _get_effective_manifest_payload(
scope=scope
)
axis_active_bundle = (manifest_payload.get("axis") or {}).get("active_bundle")
if not isinstance(axis_active_bundle, dict):
raise PolicyServiceError(
status_code=409,
code="POLICY_EFFECTIVE_MANIFEST_INVALID",
detail="Manifest field axis.active_bundle must be an object.",
)
bundle_id = str(axis_active_bundle.get("id", "")).strip()
if not bundle_id:
raise PolicyServiceError(
status_code=409,
code="POLICY_EFFECTIVE_MANIFEST_INVALID",
detail="Manifest field axis.active_bundle.id must be a non-empty string.",
)
bundle_version_int = resolve_positive_int_version(
value=axis_active_bundle.get("version"),
default=1,
context="manifest axis.active_bundle.version",
)
bundle_version = str(bundle_version_int)
expected_axis_variant = f"v{bundle_version_int}"
axis_policy_id = f"axis_bundle:axis.bundles:{bundle_id}"
axis_row = get_effective_policy_variant(scope=scope, policy_id=axis_policy_id)
if axis_row is None:
raise PolicyServiceError(
status_code=404,
code="POLICY_EFFECTIVE_AXIS_BUNDLE_NOT_FOUND",
detail=(
"No effective axis bundle activation found for manifest-selected bundle "
f"{bundle_id!r} in scope (world_id={scope.world_id!r}, "
f"client_profile={scope.client_profile!r})."
),
)
axis_variant = str(axis_row.get("variant", ""))
if axis_variant != expected_axis_variant:
raise PolicyServiceError(
status_code=409,
code="POLICY_EFFECTIVE_AXIS_BUNDLE_VERSION_MISMATCH",
detail=(
"Manifest selected axis bundle variant "
f"{expected_axis_variant!r}, but activation points to {axis_variant!r} "
f"for policy_id={axis_policy_id!r}."
),
)
axis_content = axis_row.get("content")
if not isinstance(axis_content, dict):
raise PolicyServiceError(
status_code=409,
code="POLICY_EFFECTIVE_AXIS_BUNDLE_INVALID",
detail="Effective axis bundle content must be an object.",
)
axes_payload = axis_content.get("axes")
thresholds_payload = axis_content.get("thresholds")
resolution_payload = axis_content.get("resolution")
if not isinstance(axes_payload, dict):
raise PolicyServiceError(
status_code=409,
code="POLICY_EFFECTIVE_AXIS_BUNDLE_INVALID",
detail="Effective axis bundle field content.axes must be an object.",
)
if not isinstance(thresholds_payload, dict):
raise PolicyServiceError(
status_code=409,
code="POLICY_EFFECTIVE_AXIS_BUNDLE_INVALID",
detail="Effective axis bundle field content.thresholds must be an object.",
)
if not isinstance(resolution_payload, dict):
raise PolicyServiceError(
status_code=409,
code="POLICY_EFFECTIVE_AXIS_BUNDLE_INVALID",
detail="Effective axis bundle field content.resolution must be an object.",
)
required_runtime_inputs_raw = (
(manifest_payload.get("image") or {}).get("composition") or {}
).get("required_runtime_inputs") or []
required_runtime_inputs = (
{
str(item).strip()
for item in required_runtime_inputs_raw
if isinstance(item, str) and str(item).strip()
}
if isinstance(required_runtime_inputs_raw, list)
else set()
)
policy_hash = str(
compute_payload_hash(
{
"manifest": manifest_payload,
"axis_bundle": {
"axes": axes_payload,
"thresholds": thresholds_payload,
"resolution": resolution_payload,
},
}
)
)
return EffectiveAxisBundle(
manifest_policy_id=manifest_policy_id,
manifest_variant=str(manifest_row["variant"]),
axis_policy_id=axis_policy_id,
axis_variant=axis_variant,
bundle_id=bundle_id,
bundle_version=bundle_version,
manifest_payload=manifest_payload,
axes_payload=axes_payload,
thresholds_payload=thresholds_payload,
resolution_payload=resolution_payload,
required_runtime_inputs=required_runtime_inputs,
policy_hash=policy_hash,
)
def _nested_get(payload: dict[str, Any], path_keys: list[str]) -> Any:
"""Return nested value from mapping path, or ``None`` if any key is missing."""
current: Any = payload
for key in path_keys:
if not isinstance(current, dict):
return None
current = current.get(key)
return current
def _manifest_policy_id_hint(
*,
manifest_payload: dict[str, Any],
image_node_key: str,
policy_type: str,
namespace: str,
) -> str | None:
"""Build preferred policy id from one manifest image node ``id``/``version``."""
node = _nested_get(manifest_payload, ["image", image_node_key])
if not isinstance(node, dict):
return None
raw_id = node.get("id")
if not isinstance(raw_id, str) or not raw_id.strip():
return None
policy_key = raw_id.strip()
version_value = node.get("version")
if isinstance(version_value, int) and version_value >= 1:
suffix = f"_v{version_value}"
if policy_key.endswith(suffix):
policy_key = policy_key[: -len(suffix)]
return f"{policy_type}:{namespace}:{policy_key}"
def _select_effective_row(
*,
effective_rows: list[dict[str, Any]],
scope: ActivationScope,
policy_type: str,
preferred_policy_id: str | None,
) -> tuple[dict[str, Any] | None, str | None]:
"""Select one effective activation row for a policy type.
Returns:
``(selected_row, diagnostic_error)`` where ``diagnostic_error`` is a
human-readable issue string for diagnostic surfaces.
"""
candidates = [
row for row in effective_rows if str(row.get("policy_id", "")).startswith(f"{policy_type}:")
]
if preferred_policy_id:
for row in candidates:
if str(row.get("policy_id")) == preferred_policy_id:
return row, None
return (
None,
(
f"missing effective {policy_type} activation for preferred policy_id "
f"{preferred_policy_id!r} in scope "
f"(world_id={scope.world_id!r}, client_profile={scope.client_profile!r})"
),
)
if not candidates:
return (
None,
(
f"missing effective {policy_type} activation in scope "
f"(world_id={scope.world_id!r}, client_profile={scope.client_profile!r})"
),
)
if len(candidates) > 1:
return (
None,
(
f"ambiguous effective {policy_type} activation set in scope "
f"(world_id={scope.world_id!r}, client_profile={scope.client_profile!r}); "
f"found {len(candidates)}"
),
)
return candidates[0], None
def _resolve_effective_policy_payload(
*,
effective_rows: list[dict[str, Any]],
scope: ActivationScope,
policy_type: str,
preferred_policy_id: str | None,
) -> tuple[dict[str, Any] | None, str | None]:
"""Resolve one effective policy payload for diagnostic image-bundle reads."""
selected_row, error = _select_effective_row(
effective_rows=effective_rows,
scope=scope,
policy_type=policy_type,
preferred_policy_id=preferred_policy_id,
)
if selected_row is None:
return None, error
policy_id = str(selected_row.get("policy_id") or "")
variant = str(selected_row.get("variant") or "")
row = policy_repo.get_policy(policy_id=policy_id, variant=variant)
if row is None:
return (
None,
("effective activation points to missing policy variant: " f"{policy_id}:{variant}"),
)
content = row.get("content")
if not isinstance(content, dict):
return None, f"policy content is not an object: {policy_id}:{variant}"
return content, None
[docs]
def resolve_effective_image_policy_bundle(*, scope: ActivationScope) -> EffectiveImagePolicyBundle:
"""Resolve image-policy diagnostic bundle from canonical DB activations.
This keeps the existing route response shape while changing the source of
truth from filesystem manifest parsing to canonical DB policy rows.
"""
ensure_world_exists(scope.world_id)
manifest_payload, _manifest_row, _manifest_policy_id = _get_effective_manifest_payload(
scope=scope
)
composition_order = _nested_get(manifest_payload, ["image", "composition", "order"])
required_inputs = _nested_get(
manifest_payload, ["image", "composition", "required_runtime_inputs"]
)
missing_components: list[str] = []
if not isinstance(composition_order, list):
missing_components.append("manifest field missing/invalid: image.composition.order")
composition_order = []
if not isinstance(required_inputs, list):
missing_components.append(
"manifest field missing/invalid: image.composition.required_runtime_inputs"
)
required_inputs = []
effective_rows = resolve_effective_policy_activations(scope=scope)
descriptor_policy_hint = _manifest_policy_id_hint(
manifest_payload=manifest_payload,
image_node_key="descriptor_layer",
policy_type="descriptor_layer",
namespace="image.descriptors",
)
tone_policy_hint = _manifest_policy_id_hint(
manifest_payload=manifest_payload,
image_node_key="tone_profile",
policy_type="tone_profile",
namespace="image.tone_profiles",
)
descriptor_payload, descriptor_error = _resolve_effective_policy_payload(
effective_rows=effective_rows,
scope=scope,
policy_type="descriptor_layer",
preferred_policy_id=descriptor_policy_hint,
)
if descriptor_error:
missing_components.append(f"descriptor_layer: {descriptor_error}")
tone_payload, tone_error = _resolve_effective_policy_payload(
effective_rows=effective_rows,
scope=scope,
policy_type="tone_profile",
preferred_policy_id=tone_policy_hint,
)
if tone_error:
missing_components.append(f"tone_profile: {tone_error}")
species_registry_payload, species_error = _resolve_effective_policy_payload(
effective_rows=effective_rows,
scope=scope,
policy_type="registry",
preferred_policy_id="registry:image.registries:species_registry",
)
if species_error:
missing_components.append(f"species_registry: {species_error}")
clothing_registry_payload, clothing_error = _resolve_effective_policy_payload(
effective_rows=effective_rows,
scope=scope,
policy_type="registry",
preferred_policy_id="registry:image.registries:clothing_registry",
)
if clothing_error:
missing_components.append(f"clothing_registry: {clothing_error}")
axis_payload: dict[str, Any] = {}
try:
resolved_axis = resolve_effective_axis_bundle(scope=scope)
axis_payload = {
"axes": resolved_axis.axes_payload,
"thresholds": resolved_axis.thresholds_payload,
"resolution": resolved_axis.resolution_payload,
}
except PolicyServiceError as error:
missing_components.append(f"axis_bundle: {error.detail}")
policy_hash = str(
compute_payload_hash(
{
"manifest": manifest_payload,
"axis_bundle": axis_payload,
"descriptor_layer_payload": descriptor_payload,
"tone_profile_payload": tone_payload,
"species_registry_payload": species_registry_payload,
"clothing_registry_payload": clothing_registry_payload,
"composition_order": [str(value) for value in composition_order],
"required_runtime_inputs": [str(value) for value in required_inputs],
"missing_components": list(missing_components),
}
)
)
return EffectiveImagePolicyBundle(
world_id=scope.world_id,
policy_schema=(
str(manifest_payload.get("policy_schema"))
if manifest_payload.get("policy_schema") is not None
else None
),
policy_bundle_id=(
str(_nested_get(manifest_payload, ["policy_bundle", "id"]))
if _nested_get(manifest_payload, ["policy_bundle", "id"]) is not None
else None
),
policy_bundle_version=_nested_get(manifest_payload, ["policy_bundle", "version"]),
policy_hash=policy_hash,
composition_order=[str(value) for value in composition_order],
required_runtime_inputs=[str(value) for value in required_inputs],
descriptor_layer_path=(
str(_nested_get(manifest_payload, ["image", "descriptor_layer", "path"]))
if _nested_get(manifest_payload, ["image", "descriptor_layer", "path"]) is not None
else None
),
tone_profile_path=(
str(_nested_get(manifest_payload, ["image", "tone_profile", "path"]))
if _nested_get(manifest_payload, ["image", "tone_profile", "path"]) is not None
else None
),
species_registry_path=(
str(_nested_get(manifest_payload, ["image", "registries", "species"]))
if _nested_get(manifest_payload, ["image", "registries", "species"]) is not None
else None
),
clothing_registry_path=(
str(_nested_get(manifest_payload, ["image", "registries", "clothing"]))
if _nested_get(manifest_payload, ["image", "registries", "clothing"]) is not None
else None
),
missing_components=missing_components,
)