"""Layer 3 activation service logic.
This module owns activation pointer mutation, replay consistency checks, and
scope overlay resolution used by runtime callers.
"""
from __future__ import annotations
from typing import Any
from mud_server.db import policy_repo
from .errors import PolicyServiceError
from .types import ActivationScope
from .utils import activation_map_from_rows, ensure_world_exists, now_iso
from .validation import parse_policy_id
[docs]
def set_policy_activation(
    *,
    scope: ActivationScope,
    policy_id: str,
    variant: str,
    activated_by: str,
    rollback_of_activation_id: int | None = None,
) -> dict[str, Any]:
    """Move the activation pointer for one policy in ``scope`` and return the stored row.

    ``scope.world_id`` and ``policy_id`` are first passed through
    ``ensure_world_exists`` and ``parse_policy_id`` (presumably both raise on
    invalid input -- confirm against their definitions). The write itself is
    delegated to ``policy_repo.set_policy_activation``.

    Args:
        scope: Activation scope; ``world_id`` and ``client_profile`` are read.
        policy_id: Policy whose pointer is being set.
        variant: Variant to activate. Ignored when ``rollback_of_activation_id``
            is given; the variant recorded on that event is used instead.
        activated_by: Actor identifier forwarded to the repository.
        rollback_of_activation_id: Optional id of an earlier activation event
            to roll back to.

    Returns:
        The row returned by ``policy_repo.set_policy_activation``.

    Raises:
        PolicyServiceError: 404 when the rollback event does not exist, 409
            when it belongs to another policy or scope, 422 when the repository
            write raises. ``assert_activation_replay_consistency`` runs after
            the write returns and may raise its own errors.
    """
    ensure_world_exists(scope.world_id)
    parse_policy_id(policy_id)

    if rollback_of_activation_id is None:
        resolved_variant = variant
    else:
        # A rollback re-activates whatever variant the referenced event recorded.
        resolved_variant = _variant_from_rollback_event(
            scope=scope,
            policy_id=policy_id,
            rollback_of_activation_id=rollback_of_activation_id,
        )

    try:
        stored_row = policy_repo.set_policy_activation(
            world_id=scope.world_id,
            client_profile=scope.client_profile,
            policy_id=policy_id,
            variant=resolved_variant,
            activated_by=activated_by,
            activated_at=now_iso(),
            rollback_of_activation_id=rollback_of_activation_id,
        )
    except Exception as exc:
        # Any failure of the repository write is reported as a 422 service error.
        raise PolicyServiceError(
            status_code=422,
            code="POLICY_ACTIVATION_ERROR",
            detail=str(exc),
        ) from exc

    # Verify the pointer table still matches the event history after the write.
    assert_activation_replay_consistency(scope=scope)
    return stored_row


def _variant_from_rollback_event(
    *,
    scope: ActivationScope,
    policy_id: str,
    rollback_of_activation_id: int,
) -> str:
    """Load a rollback event, validate it against the request, and return its variant."""
    event = policy_repo.get_activation_event(rollback_of_activation_id)
    if event is None:
        raise PolicyServiceError(
            status_code=404,
            code="POLICY_ROLLBACK_EVENT_NOT_FOUND",
            detail=f"Rollback activation event not found: {rollback_of_activation_id}",
        )

    # The policy check runs before the scope check, so a mismatch on both
    # reports the policy mismatch.
    if event["policy_id"] != policy_id:
        raise PolicyServiceError(
            status_code=409,
            code="POLICY_ROLLBACK_POLICY_MISMATCH",
            detail="Rollback activation event does not match requested policy_id.",
        )

    if (
        event["world_id"] != scope.world_id
        or event["client_profile"] != scope.client_profile
    ):
        raise PolicyServiceError(
            status_code=409,
            code="POLICY_ROLLBACK_SCOPE_MISMATCH",
            detail="Rollback activation event belongs to a different activation scope.",
        )

    return str(event["variant"])
[docs]
def list_policy_activations(*, scope: ActivationScope) -> list[dict[str, Any]]:
    """Return the activation pointer rows stored for exactly the given scope.

    The rows come straight from ``policy_repo.list_policy_activations`` for
    ``scope.world_id`` and ``scope.client_profile``; no world/client overlay
    is applied here. ``ensure_world_exists`` is called first.
    """
    world_id = scope.world_id
    ensure_world_exists(world_id)

    pointer_rows = policy_repo.list_policy_activations(
        world_id=world_id,
        client_profile=scope.client_profile,
    )
    return pointer_rows
[docs]
def resolve_effective_policy_activations(*, scope: ActivationScope) -> list[dict[str, Any]]:
    """Compute the activations in effect for ``scope``, letting client rows override world rows.

    World-level rows are the ones stored under the empty client profile
    (``client_profile=""``). When ``scope.client_profile`` is falsy those rows
    are returned exactly as the repository produced them. Otherwise the
    client-profile rows are layered on top, keyed by ``policy_id``, and the
    result is ordered by policy id.
    """
    world_id = scope.world_id
    ensure_world_exists(world_id)

    base_rows = policy_repo.list_policy_activations(world_id=world_id, client_profile="")
    profile = scope.client_profile
    if not profile:
        return base_rows

    overlay_rows = policy_repo.list_policy_activations(
        world_id=world_id,
        client_profile=profile,
    )

    # World rows are visited first, so a client row with the same policy id
    # replaces the world row in the mapping.
    effective: dict[str, dict[str, Any]] = {
        str(entry["policy_id"]): entry for entry in (*base_rows, *overlay_rows)
    }
    ordered = sorted(effective.items(), key=lambda pair: pair[0])
    return [entry for _, entry in ordered]
[docs]
def get_effective_policy_variant(
    *, scope: ActivationScope, policy_id: str
) -> dict[str, Any] | None:
    """Look up the policy row that is currently in effect for ``policy_id`` in ``scope``.

    Returns ``None`` when the effective activations for the scope contain no
    entry for ``policy_id``. When an activation exists, the matching row is
    fetched with ``policy_repo.get_policy``.

    Raises:
        PolicyServiceError: 409 ``POLICY_EFFECTIVE_REFERENCE_MISSING`` when the
            activation names a policy variant the repository cannot find.
    """
    parse_policy_id(policy_id)

    # Scan every effective row and keep the last match.
    activation = None
    for candidate in resolve_effective_policy_activations(scope=scope):
        if str(candidate["policy_id"]) == policy_id:
            activation = candidate

    if activation is None:
        return None

    variant = str(activation["variant"])
    policy_row = policy_repo.get_policy(policy_id=policy_id, variant=variant)
    if policy_row is not None:
        return policy_row

    # The pointer references a variant that no longer resolves to a policy row.
    raise PolicyServiceError(
        status_code=409,
        code="POLICY_EFFECTIVE_REFERENCE_MISSING",
        detail=f"Effective activation points to missing policy variant: {policy_id}:{variant}",
    )
[docs]
def assert_activation_replay_consistency(*, scope: ActivationScope) -> None:
    """Check that the pointer table for ``scope`` agrees with its activation-event log.

    The events are folded into a ``policy_id -> variant`` mapping in the order
    the repository returns them, so a later event for a policy overrides an
    earlier one (assumes that order is chronological -- confirm against
    ``policy_repo.list_activation_events``). The mapping is then compared
    with the one ``activation_map_from_rows`` derives from the pointer rows.

    Raises:
        PolicyServiceError: 500 ``POLICY_AUDIT_REPLAY_READ_ERROR`` when either
            repository read raises, 500 ``POLICY_ACTIVATION_REPLAY_MISMATCH``
            when the two states differ.
    """
    try:
        pointer_rows = policy_repo.list_policy_activations(
            world_id=scope.world_id,
            client_profile=scope.client_profile,
        )
        event_log = policy_repo.list_activation_events(
            world_id=scope.world_id,
            client_profile=scope.client_profile,
        )
    except Exception as exc:
        raise PolicyServiceError(
            status_code=500,
            code="POLICY_AUDIT_REPLAY_READ_ERROR",
            detail=str(exc),
        ) from exc

    pointer_state = activation_map_from_rows(pointer_rows)

    # Replaying the log: the last event seen for each policy id wins.
    replay_state: dict[str, str] = {
        str(entry["policy_id"]): str(entry["variant"]) for entry in event_log
    }

    if replay_state != pointer_state:
        mismatch_detail = (
            "Activation pointer state does not match replayed activation events "
            f"for scope {scope.world_id}:{scope.client_profile}."
        )
        raise PolicyServiceError(
            status_code=500,
            code="POLICY_ACTIVATION_REPLAY_MISMATCH",
            detail=mismatch_detail,
        )