Source code for mud_server.services.policy.publish

"""Deterministic publish and mirror artifact services.

This module materializes non-authoritative exchange artifacts from canonical
DB state selected by Layer 3 activation pointers.
"""

from __future__ import annotations

import json
import re
from pathlib import Path
from typing import Any

from pipeworks_ipc import compute_payload_hash

from mud_server.db import policy_repo

from .activation import resolve_effective_policy_activations
from .constants import (
    _POLICY_EXPORT_LATEST_FILENAME,
    _POLICY_EXPORT_SCHEMA_VERSION,
    _POLICY_EXPORT_WORLD_DIRNAME,
    _POLICY_SCHEMA_VERSION_V1,
)
from .errors import PolicyServiceError
from .hashing import compute_artifact_hash
from .paths import resolve_policy_export_root
from .types import ActivationScope
from .utils import ensure_world_exists, now_iso


[docs] def publish_scope(*, scope: ActivationScope, actor: str) -> dict[str, Any]: """Build and persist one deterministic publish manifest for a scope.""" ensure_world_exists(scope.world_id) activations = resolve_effective_policy_activations(scope=scope) manifest_items: list[dict[str, Any]] = [] for activation in activations: policy = policy_repo.get_policy( policy_id=str(activation["policy_id"]), variant=str(activation["variant"]), ) if policy is None: raise PolicyServiceError( status_code=409, code="POLICY_PUBLISH_REFERENCE_MISSING", detail=( "Activation references a missing policy variant: " f"{activation['policy_id']}:{activation['variant']}" ), ) manifest_items.append( { "policy_id": policy["policy_id"], "policy_type": policy["policy_type"], "namespace": policy["namespace"], "policy_key": policy["policy_key"], "variant": policy["variant"], "schema_version": policy["schema_version"], "policy_version": policy["policy_version"], "status": policy["status"], "content_hash": policy["content_hash"], "updated_at": policy["updated_at"], } ) manifest_items.sort( key=lambda item: (item["policy_type"], item["namespace"], item["policy_key"]) ) items_hash = str(compute_payload_hash({"items": manifest_items})) manifest_hash = str( compute_payload_hash( { "world_id": scope.world_id, "client_profile": scope.client_profile or None, "items_hash": items_hash, "item_count": len(manifest_items), } ) ) generated_at = now_iso() manifest = { "world_id": scope.world_id, "client_profile": scope.client_profile or None, "generated_at": generated_at, "item_count": len(manifest_items), "items_hash": items_hash, "manifest_hash": manifest_hash, "items": manifest_items, } publish_run_id = policy_repo.insert_publish_run( world_id=scope.world_id, client_profile=scope.client_profile, actor=actor, manifest=manifest, created_at=generated_at, ) artifact = _materialize_publish_artifact( world_id=scope.world_id, client_profile=scope.client_profile, manifest=manifest, ) return { "publish_run_id": publish_run_id, "manifest": manifest, "artifact": artifact, }
[docs] def get_publish_run(*, publish_run_id: int) -> dict[str, Any]: """Get one publish run plus deterministic artifact metadata.""" run_row = policy_repo.get_publish_run(publish_run_id=publish_run_id) if run_row is None: raise PolicyServiceError( status_code=404, code="POLICY_PUBLISH_RUN_NOT_FOUND", detail=f"Publish run not found: {publish_run_id}", ) world_id = str(run_row["world_id"]) client_profile = str(run_row["client_profile"] or "") manifest = _normalize_manifest_for_export( world_id=world_id, client_profile=client_profile, manifest=run_row["manifest"], ) artifact = _materialize_publish_artifact( world_id=world_id, client_profile=client_profile, manifest=manifest, ) return { "publish_run_id": int(run_row["publish_run_id"]), "world_id": world_id, "client_profile": client_profile or None, "actor": str(run_row["actor"]), "created_at": str(run_row["created_at"]), "manifest": manifest, "artifact": artifact, }
def _normalize_manifest_for_export( *, world_id: str, client_profile: str, manifest: dict[str, Any], ) -> dict[str, Any]: """Return normalized manifest with deterministic hash fields populated.""" normalized = dict(manifest) items_raw = normalized.get("items") if not isinstance(items_raw, list): items_raw = [] items: list[dict[str, Any]] = [dict(item) for item in items_raw if isinstance(item, dict)] items.sort( key=lambda item: ( str(item.get("policy_type", "")), str(item.get("namespace", "")), str(item.get("policy_key", "")), ) ) normalized["items"] = items normalized["item_count"] = int(normalized.get("item_count", len(items))) normalized["world_id"] = world_id normalized["client_profile"] = client_profile or None items_hash = str(normalized.get("items_hash") or compute_payload_hash({"items": items})) normalized["items_hash"] = items_hash manifest_hash = str( normalized.get("manifest_hash") or compute_payload_hash( { "world_id": world_id, "client_profile": client_profile or None, "items_hash": items_hash, "item_count": normalized["item_count"], } ) ) normalized["manifest_hash"] = manifest_hash return normalized def _materialize_publish_artifact( *, world_id: str, client_profile: str, manifest: dict[str, Any], ) -> dict[str, Any]: """Write deterministic exchange artifact and return artifact metadata.""" normalized_manifest = _normalize_manifest_for_export( world_id=world_id, client_profile=client_profile, manifest=manifest, ) variants = _build_export_variants(items=normalized_manifest["items"]) variants_hash = str(compute_payload_hash({"variants": variants})) artifact_payload: dict[str, Any] = { "export_schema_version": _POLICY_EXPORT_SCHEMA_VERSION, "policy_authority": "mud_server", "mirror_mode": "non_authoritative", "world_id": world_id, "client_profile": client_profile or None, "manifest_hash": normalized_manifest["manifest_hash"], "items_hash": normalized_manifest["items_hash"], "item_count": normalized_manifest["item_count"], "items": normalized_manifest["items"], "variants_hash": variants_hash, "variants": variants, } artifact_hash = compute_artifact_hash(artifact=artifact_payload) artifact_payload["artifact_hash"] = artifact_hash artifact_path = _publish_artifact_path( world_id=world_id, client_profile=client_profile, manifest_hash=str(normalized_manifest["manifest_hash"]), ) export_root = resolve_policy_export_root() latest_path = artifact_path.parent / _POLICY_EXPORT_LATEST_FILENAME artifact_path_for_latest = str(artifact_path) try: artifact_path_for_latest = str(artifact_path.relative_to(export_root)) except ValueError: # Keep absolute fallback if artifact path is not under export root. artifact_path_for_latest = str(artifact_path) try: artifact_path.parent.mkdir(parents=True, exist_ok=True) artifact_path.write_text( json.dumps(artifact_payload, ensure_ascii=False, sort_keys=True, indent=2) + "\n", encoding="utf-8", ) latest_payload = { "policy_authority": "mud_server", "mirror_mode": "non_authoritative", "world_id": world_id, "scope": _scope_segment(client_profile), "client_profile": client_profile or None, "manifest_hash": normalized_manifest["manifest_hash"], "items_hash": normalized_manifest["items_hash"], "variants_hash": variants_hash, "item_count": normalized_manifest["item_count"], "artifact_hash": artifact_hash, "artifact_file": artifact_path.name, "artifact_path": artifact_path_for_latest, } latest_path.write_text( json.dumps(latest_payload, ensure_ascii=False, sort_keys=True, indent=2) + "\n", encoding="utf-8", ) except Exception as exc: raise PolicyServiceError( status_code=500, code="POLICY_PUBLISH_ARTIFACT_WRITE_ERROR", detail=str(exc), ) from exc return { "artifact_hash": artifact_hash, "artifact_path": str(artifact_path), "latest_path": str(latest_path), } def _publish_artifact_path(*, world_id: str, client_profile: str, manifest_hash: str) -> Path: """Return deterministic artifact path under exchange-repo layout.""" export_root = _resolve_policy_export_root() scope_segment = _scope_segment(client_profile) filename = f"publish_{manifest_hash}.json" return export_root / _POLICY_EXPORT_WORLD_DIRNAME / world_id / scope_segment / filename def _build_export_variants(*, items: list[dict[str, Any]]) -> list[dict[str, Any]]: """Build canonical variant payloads used by import/export round-trips.""" variants: list[dict[str, Any]] = [] for item in items: policy_id = str(item.get("policy_id", "")) variant = str(item.get("variant", "")) policy_row = policy_repo.get_policy(policy_id=policy_id, variant=variant) if policy_row is None: # Historical publish runs may reference variants deleted later. # We preserve inspectability by emitting a non-materialized row. variants.append( { "policy_id": policy_id, "policy_type": str(item.get("policy_type", "")), "namespace": str(item.get("namespace", "")), "policy_key": str(item.get("policy_key", "")), "variant": variant, "schema_version": str(item.get("schema_version", _POLICY_SCHEMA_VERSION_V1)), "policy_version": int(item.get("policy_version", 1) or 1), "status": str(item.get("status", "candidate")), "content": {}, "content_hash": str(item.get("content_hash", "")), "updated_at": str(item.get("updated_at", "")), "updated_by": "unknown", "materialized": False, } ) continue variants.append( { "policy_id": str(policy_row["policy_id"]), "policy_type": str(policy_row["policy_type"]), "namespace": str(policy_row["namespace"]), "policy_key": str(policy_row["policy_key"]), "variant": str(policy_row["variant"]), "schema_version": str(policy_row["schema_version"]), "policy_version": int(policy_row["policy_version"]), "status": str(policy_row["status"]), "content": dict(policy_row["content"]), "content_hash": str(policy_row["content_hash"]), "updated_at": str(policy_row["updated_at"]), "updated_by": str(policy_row["updated_by"]), "materialized": True, } ) variants.sort( key=lambda row: ( str(row.get("policy_type", "")), str(row.get("namespace", "")), str(row.get("policy_key", "")), str(row.get("variant", "")), ) ) return variants def _resolve_policy_export_root() -> Path: """Resolve root directory for publish mirror artifacts. Resolution order: 1. ``MUD_POLICY_EXPORTS_ROOT`` environment variable 2. sibling repo near active working repo 3. sibling repo near ``PROJECT_ROOT`` default """ return resolve_policy_export_root() def _scope_segment(client_profile: str) -> str: """Return stable filesystem path segment for a client-profile scope.""" if not client_profile: return "world" sanitized = re.sub(r"[^A-Za-z0-9_.-]+", "_", client_profile).strip("._") if not sanitized: sanitized = "client" return f"client_{sanitized}"