# Source code for mud_server.services.policy.validation

"""Canonical policy validation and upsert workflows.

This module owns contract-level validation and validate-before-write behavior.
Storage repositories remain persistence-only; this layer enforces business
rules that define valid canonical policy objects.
"""

from __future__ import annotations

from typing import Any

from mud_server.db import policy_repo

from .constants import (
    _LAYER1_POLICY_TYPES,
    _LAYER2_POLICY_TYPES,
    _SPECIES_PILOT_NAMESPACE,
    _SPECIES_PILOT_POLICY_TYPE,
    _SUPPORTED_POLICY_TYPES,
    _SUPPORTED_STATUSES,
)
from .errors import PolicyServiceError
from .hashing import compute_content_hash
from .types import PolicyIdentity, PolicyValidationResult
from .utils import now_iso


def parse_policy_id(policy_id: str) -> PolicyIdentity:
    """Split a canonical ``policy_type:namespace:policy_key`` identity string.

    The string is split on the first two colons only, so any additional
    colons remain part of ``policy_key``. Every segment is whitespace-stripped
    before it is checked; the ``policy_id`` field of the returned identity
    carries the input string exactly as it was passed in.

    Args:
        policy_id: Identity string to parse.

    Returns:
        A ``PolicyIdentity`` holding the original id and its three segments.

    Raises:
        PolicyServiceError: With code ``POLICY_ID_INVALID`` (422) when the
            string does not yield three non-empty segments, or with code
            ``POLICY_TYPE_UNSUPPORTED`` (422) when the first segment is not
            in ``_SUPPORTED_POLICY_TYPES``.
    """
    segments = [segment.strip() for segment in policy_id.split(":", 2)]
    has_three_non_empty_segments = len(segments) == 3 and all(segments)
    if not has_three_non_empty_segments:
        raise PolicyServiceError(
            status_code=422,
            code="POLICY_ID_INVALID",
            detail="policy_id must be formatted as 'policy_type:namespace:policy_key'.",
        )

    policy_type, namespace, policy_key = segments
    if policy_type in _SUPPORTED_POLICY_TYPES:
        return PolicyIdentity(
            policy_id=policy_id,
            policy_type=policy_type,
            namespace=namespace,
            policy_key=policy_key,
        )

    raise PolicyServiceError(
        status_code=422,
        code="POLICY_TYPE_UNSUPPORTED",
        detail=f"Unsupported policy_type: {policy_type!r}",
    )
def list_policies(
    *,
    policy_type: str | None,
    namespace: str | None,
    status: str | None,
) -> list[dict[str, Any]]:
    """Return canonical policy variants, optionally narrowed by filters.

    All three filters are forwarded unchanged to
    ``policy_repo.list_policies``; this function adds no filtering of its own.

    Args:
        policy_type: Policy type filter, or ``None``.
        namespace: Namespace filter, or ``None``.
        status: Status filter, or ``None``.

    Returns:
        Whatever ``policy_repo.list_policies`` returns for these filters.
    """
    filters = {
        "policy_type": policy_type,
        "namespace": namespace,
        "status": status,
    }
    return policy_repo.list_policies(**filters)
def get_policy(*, policy_id: str, variant: str | None) -> dict[str, Any]:
    """Fetch one canonical policy row by id and optional variant selector.

    Args:
        policy_id: Canonical ``policy_type:namespace:policy_key`` identity.
        variant: Variant selector passed through to the repository, or ``None``.

    Returns:
        The row returned by ``policy_repo.get_policy``.

    Raises:
        PolicyServiceError: Propagated from ``parse_policy_id`` when the id is
            malformed or unsupported, or raised with code ``POLICY_NOT_FOUND``
            (404) when the repository returns ``None``.
    """
    # Called purely for its validation side effect: a bad id must fail with a
    # 422 before storage is consulted. The parsed identity itself is unused.
    parse_policy_id(policy_id)

    found = policy_repo.get_policy(policy_id=policy_id, variant=variant)
    if found is not None:
        return found

    raise PolicyServiceError(
        status_code=404,
        code="POLICY_NOT_FOUND",
        detail=f"Policy not found for id={policy_id!r} variant={variant!r}.",
    )
def validate_policy_variant(
    *,
    policy_id: str,
    variant: str,
    schema_version: str,
    policy_version: int,
    status: str,
    content: dict[str, Any],
    validated_by: str,
) -> PolicyValidationResult:
    """Validate a single policy payload and record the validation run.

    Shared-field checks and policy-type-specific content checks are both run
    and their messages combined. A validation-run record is inserted through
    ``policy_repo.insert_validation_run`` whether or not the payload is valid.

    Args:
        policy_id: Canonical ``policy_type:namespace:policy_key`` identity.
        variant: Variant name being validated.
        schema_version: Schema version string of the payload.
        policy_version: Policy version number of the payload.
        status: Lifecycle status of the payload.
        content: Policy content object to validate and hash.
        validated_by: Actor recorded on the validation run.

    Returns:
        A ``PolicyValidationResult`` carrying the verdict, the error messages,
        the content hash, the timestamp and the inserted run id.

    Raises:
        PolicyServiceError: Propagated from ``parse_policy_id`` when
            ``policy_id`` is malformed or of an unsupported type.
    """
    identity = parse_policy_id(policy_id)
    timestamp = now_iso()

    problems = _validate_common_fields(
        identity=identity,
        variant=variant,
        schema_version=schema_version,
        policy_version=policy_version,
        status=status,
    )
    problems.extend(_validate_policy_type_content(identity=identity, content=content))

    digest = compute_content_hash(policy_id=policy_id, variant=variant, content=content)
    passed = not problems

    # Fields that are written to the run history and echoed in the result.
    run_fields = {
        "policy_id": policy_id,
        "variant": variant,
        "is_valid": passed,
        "errors": problems,
        "validated_at": timestamp,
        "validated_by": validated_by,
    }
    run_id = policy_repo.insert_validation_run(**run_fields)

    return PolicyValidationResult(
        content_hash=digest,
        validation_run_id=run_id,
        **run_fields,
    )
def upsert_policy_variant(
    *,
    policy_id: str,
    variant: str,
    schema_version: str,
    policy_version: int,
    status: str,
    content: dict[str, Any],
    updated_by: str,
) -> dict[str, Any]:
    """Validate a policy variant and, only if valid, write it to storage.

    The payload is first passed through ``validate_policy_variant`` (which
    records a validation run with ``updated_by`` as the validator). Invalid
    payloads raise before any item or variant row is written.

    Args:
        policy_id: Canonical ``policy_type:namespace:policy_key`` identity.
        variant: Variant name to upsert.
        schema_version: Schema version string of the payload.
        policy_version: Policy version number of the payload.
        status: Lifecycle status of the payload.
        content: Policy content object.
        updated_by: Actor recorded on both the validation run and the write.

    Returns:
        The value returned by ``policy_repo.upsert_policy_variant``.

    Raises:
        PolicyServiceError: Propagated from id parsing, or raised with code
            ``POLICY_VALIDATION_ERROR`` (422) whose detail joins all
            validation messages with ``"; "``.
    """
    payload = {
        "policy_id": policy_id,
        "variant": variant,
        "schema_version": schema_version,
        "policy_version": policy_version,
        "status": status,
        "content": content,
    }

    outcome = validate_policy_variant(validated_by=updated_by, **payload)
    if not outcome.is_valid:
        raise PolicyServiceError(
            status_code=422,
            code="POLICY_VALIDATION_ERROR",
            detail="; ".join(outcome.errors),
        )

    # The parent item row is upserted before the variant row that refers to it.
    parsed = parse_policy_id(policy_id)
    policy_repo.upsert_policy_item(
        policy_id=parsed.policy_id,
        policy_type=parsed.policy_type,
        namespace=parsed.namespace,
        policy_key=parsed.policy_key,
    )

    return policy_repo.upsert_policy_variant(
        content_hash=outcome.content_hash,
        updated_at=now_iso(),
        updated_by=updated_by,
        **payload,
    )
def is_policy_variant_unchanged(
    *,
    existing_row: dict[str, Any] | None,
    schema_version: str | None = None,
    policy_version: int,
    status: str,
    content: dict[str, Any],
) -> bool:
    """Return whether the existing variant row already matches the payload.

    Args:
        existing_row: Stored variant row, or ``None`` when no row exists.
        schema_version: Expected schema version. When ``None`` the schema
            version is not compared at all.
        policy_version: Expected policy version number.
        status: Expected lifecycle status.
        content: Expected content object.

    Returns:
        ``True`` only when a row exists and its schema version (if compared),
        policy version, status and content all equal the provided values.
        A row whose ``policy_version`` is missing a usable integer value
        (``None`` or a non-numeric value) is reported as changed (``False``)
        rather than raising.
    """
    if existing_row is None:
        return False

    if schema_version is not None and str(existing_row.get("schema_version")) != schema_version:
        return False

    # A stored version that cannot be coerced to int can never equal the
    # requested one, so treat it as a mismatch instead of crashing the caller.
    try:
        stored_version = int(existing_row.get("policy_version", 0))
    except (TypeError, ValueError):
        return False
    if stored_version != policy_version:
        return False

    if str(existing_row.get("status", "")) != status:
        return False

    stored_content = existing_row.get("content")
    if not isinstance(stored_content, dict):
        return False
    return stored_content == content
def _validate_common_fields(
    *,
    identity: PolicyIdentity,
    variant: str,
    schema_version: str,
    policy_version: int,
    status: str,
) -> list[str]:
    """Validate policy fields shared across all policy families.

    Returns:
        A list of human-readable error messages; empty when every shared
        field passes. All checks run, so several messages may be returned.
    """
    errors: list[str] = []
    if not identity.namespace:
        errors.append("namespace must not be empty")
    if not identity.policy_key:
        errors.append("policy_key must not be empty")
    if not variant.strip():
        errors.append("variant must not be empty")
    if not schema_version.strip():
        errors.append("schema_version must not be empty")
    if policy_version < 1:
        errors.append("policy_version must be >= 1")
    if status not in _SUPPORTED_STATUSES:
        errors.append("status must be one of: draft, candidate, active, archived")
    return errors


def _validate_slot_kinds_field(content: dict[str, Any]) -> list[str]:
    """Validate the optional ``content.slot_kinds`` snippet metadata field.

    The field is optional. When present it must be a non-empty list whose
    entries are non-empty strings after stripping. This metadata is consumed
    by the image-generator prompt composer to scope which snippets a given
    slot kind is allowed to pick. Snippets without the field default to the
    snippet's own namespace at API serialization time.

    Returns:
        An empty list when the field is absent or valid; otherwise a
        single-element list describing the first problem found.
    """
    if "slot_kinds" not in content:
        return []

    slot_kinds = content["slot_kinds"]
    if not isinstance(slot_kinds, list):
        return ["content.slot_kinds must be a list of strings when present"]
    if not slot_kinds:
        return ["content.slot_kinds must not be empty when present"]

    # Stop at the first bad entry; only one message is reported for this field.
    for index, entry in enumerate(slot_kinds):
        if not _is_non_empty_string(entry):
            return [f"content.slot_kinds[{index}] must be a non-empty string"]
    return []


# Policy types whose only content requirement is a non-empty ``text`` string
# and whose error message is prefixed with the policy type itself.
_TEXT_CONTENT_POLICY_TYPES = ("clothing_block", "image_block", "location", "prompt")


def _is_non_empty_string(value: Any) -> bool:
    """Return whether ``value`` is a string with non-whitespace content."""
    return isinstance(value, str) and bool(value.strip())


def _validate_policy_type_content(
    *, identity: PolicyIdentity, content: dict[str, Any]
) -> list[str]:
    """Validate policy-type-specific content invariants.

    Canonical authoring currently supports Layer 1 and Layer 2 families used
    by runtime and publish flows. The optional ``slot_kinds`` field is checked
    first for every policy type, then exactly one type-specific branch runs.

    Returns:
        A list of error messages; empty when the content is valid for the
        identity's policy type. Types with no matching branch receive a
        message listing the supported types.
    """
    errors: list[str] = list(_validate_slot_kinds_field(content))
    policy_type = identity.policy_type

    if policy_type == _SPECIES_PILOT_POLICY_TYPE:
        if identity.namespace != _SPECIES_PILOT_NAMESPACE:
            errors.append("species_block namespace must be exactly 'image.blocks.species'.")
        if not _is_non_empty_string(content.get("text")):
            errors.append("species_block content.text must be a non-empty string")
        return errors

    if policy_type in _TEXT_CONTENT_POLICY_TYPES:
        if not _is_non_empty_string(content.get("text")):
            errors.append(f"{policy_type} content.text must be a non-empty string")
        return errors

    if policy_type == "tone_profile":
        if not _is_non_empty_string(content.get("prompt_block")):
            errors.append("tone_profile content.prompt_block must be a non-empty string")
        return errors

    if policy_type == "axis_bundle":
        for key in ("axes", "thresholds", "resolution"):
            section = content.get(key)
            if not isinstance(section, dict) or not section:
                errors.append(f"axis_bundle content.{key} must be a non-empty object")
        return errors

    if policy_type == "manifest_bundle":
        manifest_payload = content.get("manifest")
        if not isinstance(manifest_payload, dict) or not manifest_payload:
            errors.append("manifest_bundle content.manifest must be a non-empty object")
        return errors

    if policy_type in _LAYER2_POLICY_TYPES:
        errors.extend(_validate_layer2_references(content=content))
        # descriptor_layer additionally carries its own text payload.
        if policy_type == "descriptor_layer":
            if not _is_non_empty_string(content.get("text")):
                errors.append("descriptor_layer content.text must be a non-empty string")
        return errors

    errors.append(
        "Validation/writes currently support policy_type values: "
        "'image_block', 'species_block', 'clothing_block', 'location', "
        "'prompt', 'tone_profile', 'axis_bundle', 'manifest_bundle', "
        "'descriptor_layer', 'registry'."
    )
    return errors


def _validate_layer2_references(*, content: dict[str, Any]) -> list[str]:
    """Validate Layer 2 references as strict Layer 1 identity pointers.

    Each entry of ``content.references`` must be an object with non-empty
    string ``policy_id`` and ``variant`` values, the id must parse to a type
    in ``_LAYER1_POLICY_TYPES``, and ``policy_repo.get_policy`` must find the
    referenced variant. At most one message is produced per reference.

    Returns:
        A list of error messages; empty when every reference resolves.
    """
    references = content.get("references")
    if not isinstance(references, list) or not references:
        return ["Layer 2 content.references must be a non-empty list."]

    errors: list[str] = []
    for index, reference in enumerate(references):
        prefix = f"content.references[{index}]"
        if not isinstance(reference, dict):
            errors.append(f"{prefix} must be an object with policy_id and variant.")
            continue

        referenced_policy_id = reference.get("policy_id")
        referenced_variant = reference.get("variant")
        if not _is_non_empty_string(referenced_policy_id):
            errors.append(f"{prefix}.policy_id must be a non-empty string.")
            continue
        if not _is_non_empty_string(referenced_variant):
            errors.append(f"{prefix}.variant must be a non-empty string.")
            continue

        try:
            referenced_identity = parse_policy_id(referenced_policy_id.strip())
        except PolicyServiceError as error:
            errors.append(f"{prefix}.policy_id is invalid: {error.detail}")
            continue

        if referenced_identity.policy_type not in _LAYER1_POLICY_TYPES:
            errors.append(
                f"{prefix}.policy_id must reference a Layer 1 policy type "
                f"{sorted(_LAYER1_POLICY_TYPES)}; got {referenced_identity.policy_type!r}."
            )
            continue

        # Storage lookup: the referenced Layer 1 variant must actually exist.
        referenced_row = policy_repo.get_policy(
            policy_id=referenced_identity.policy_id,
            variant=referenced_variant.strip(),
        )
        if referenced_row is None:
            errors.append(
                f"{prefix} references missing Layer 1 variant: "
                f"{referenced_identity.policy_id}:{referenced_variant.strip()}"
            )
    return errors