"""Axis resolution engine.
:class:`AxisEngine` is the orchestrator that ties together the resolution
grammar, the resolver functions, the JSONL ledger, and the SQLite materialized
view. One instance is created per :class:`~mud_server.core.world.World` at
startup and remains live for the server's lifetime.
Resolution sequence (``resolve_chat_interaction``):
1. Resolve character IDs from names (world-scoped, raises
:exc:`CharacterNotFoundError` on miss).
2. Acquire per-character threading locks in ascending ID order (deadlock
prevention).
3. Read current axis scores from the SQLite DB.
4. Compute ipc_hash via :func:`~pipeworks_ipc.compute_payload_hash` over the
pre-interaction snapshot.
5. Compute axis deltas for every axis in the chat grammar.
6. Write ``chat.mechanical_resolution`` to the JSONL ledger — the authoritative
act that makes the interaction permanent.
7. Clamp the resulting scores to ``[0.0, 1.0]`` and apply the post-clamp deltas
to the DB via :func:`~mud_server.db.facade.apply_axis_event`.
8. Release locks.
9. Return :class:`~mud_server.axis.types.AxisResolutionResult`.
Locking strategy:
Each character has a :class:`threading.Lock` stored in a dict keyed by
``character_id``. Locks are always acquired in ascending ID order to
prevent deadlocks when two interactions share a character. The dict
itself is protected by a separate ``_locks_mutex``.
Note on ipc_hash computation (deviation from plan):
:func:`~pipeworks_ipc.compute_ipc_id` requires ``system_prompt_hash: str``
— a concept that has no meaning in a purely mechanical resolution (no LLM
call). :func:`~pipeworks_ipc.compute_payload_hash` is used directly on
the resolution payload dict instead. When the translation service
subsequently uses this ``ipc_hash`` for deterministic Ollama seeding it
calls :func:`~pipeworks_ipc.compute_ipc_id` with this hash as
``input_hash``, which is the intended design.
"""
from __future__ import annotations
import logging
import threading
from collections.abc import Callable
from typing import Any
from pipeworks_ipc import compute_payload_hash
from mud_server.axis.grammar import ResolutionGrammar
from mud_server.axis.resolvers import dominance_shift, no_effect, shared_drain
from mud_server.axis.types import AxisDelta, AxisResolutionResult, EntityResolution
from mud_server.db import facade as database
from mud_server.db.constants import DEFAULT_AXIS_SCORE
from mud_server.ledger import append_event as _ledger_append
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Resolver registry
# ---------------------------------------------------------------------------
#: Maps resolver names, as written in the grammar YAML, to the resolver
#: callables imported from :mod:`mud_server.axis.resolvers`. New resolver
#: algorithms are registered here; the grammar YAML refers to them by name.
# NOTE(review): ``_call_resolver`` in this module dispatches with explicit
# name comparisons rather than through this mapping — presumably the registry
# is consumed elsewhere (e.g. grammar validation); confirm before removing.
_RESOLVER_REGISTRY: dict[str, Callable] = {
    "dominance_shift": dominance_shift,
    "shared_drain": shared_drain,
    "no_effect": no_effect,
}
# ---------------------------------------------------------------------------
# Exceptions
# ---------------------------------------------------------------------------
[docs]
class CharacterNotFoundError(Exception):
    """Signal that a character name has no match inside a given world.

    The exception message embeds the ``repr`` of both the name and the world
    ID; the raw values stay available as attributes for programmatic use.

    Attributes:
        character_name: The name that could not be resolved.
        world_id: The world in which the lookup was attempted.
    """

    def __init__(self, character_name: str, world_id: str) -> None:
        message = f"Character {character_name!r} not found in world {world_id!r}."
        super().__init__(message)
        # Keep the raw lookup inputs so handlers need not parse the message.
        self.world_id = world_id
        self.character_name = character_name
# ---------------------------------------------------------------------------
# Engine
# ---------------------------------------------------------------------------
[docs]
class AxisEngine:
    """General-purpose resolver registry for axis mutations.

    Instantiated once per :class:`~mud_server.core.world.World` (same
    lifecycle as the translation service). The chat resolver is the first
    concrete implementation; future stimulus types (environmental, physical,
    economic) will add new ``resolve_*`` methods following the same pattern.

    Args:
        world_id: The world this engine is scoped to.
        grammar: The parsed :class:`~mud_server.axis.grammar.ResolutionGrammar`
            for this world. The engine never reassigns it after construction.
    """

    def __init__(self, *, world_id: str, grammar: ResolutionGrammar) -> None:
        self._world_id = world_id
        self._grammar = grammar
        # Per-character lock pool. Protects the read-compute-write cycle in
        # resolve_chat_interaction against concurrent interactions that share
        # a character. The dict itself is guarded by _locks_mutex.
        self._locks: dict[int, threading.Lock] = {}
        self._locks_mutex = threading.Lock()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def resolve_chat_interaction(
        self,
        *,
        speaker_name: str,
        listener_name: str,
        channel: str,
        world_id: str,
    ) -> AxisResolutionResult:
        """Resolve a chat interaction between two characters.

        This is the primary entry point. Character IDs are resolved first;
        the remaining steps of the resolution sequence (see module docstring)
        then run while the per-character locks are held.

        Args:
            speaker_name: Display name of the character who sent the message.
            listener_name: Display name of the character who received it.
            channel: Chat channel — ``"say"``, ``"yell"``, or ``"whisper"``.
                Selects the channel multiplier applied to every axis delta;
                channels without a configured multiplier use ``1.0``.
            world_id: World in which the interaction occurs. Expected to match
                the engine's own world ID; this method does not validate that.

        Returns:
            :class:`~mud_server.axis.types.AxisResolutionResult` containing
            the ipc_hash, per-character deltas (only axes with non-zero actual
            change), and the pre-interaction axis snapshot.

        Raises:
            CharacterNotFoundError: If either *speaker_name* or *listener_name*
                is not registered in *world_id*.
        """
        # Resolve character IDs (raises CharacterNotFoundError on miss).
        speaker_id, listener_id = self._resolve_ids(speaker_name, listener_name, world_id)

        # Lock each *distinct* character once, in ascending ID order. The
        # de-duplication matters: threading.Lock is not reentrant, so taking
        # the same character's lock twice (speaker == listener) would block
        # forever.
        locks = [self._get_lock(cid) for cid in sorted({speaker_id, listener_id})]

        # Acquire inside the try block and track what was actually taken, so
        # an exception raised between acquisitions cannot leak a held lock.
        acquired: list[threading.Lock] = []
        try:
            for lock in locks:
                lock.acquire()
                acquired.append(lock)
            return self._run_resolution(
                speaker_id=speaker_id,
                speaker_name=speaker_name,
                listener_id=listener_id,
                listener_name=listener_name,
                channel=channel,
                world_id=world_id,
            )
        finally:
            # Release in reverse-acquisition order.
            for lock in reversed(acquired):
                lock.release()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _get_lock(self, character_id: int) -> threading.Lock:
        """Return the per-character lock, creating it on first use."""
        with self._locks_mutex:
            lock = self._locks.get(character_id)
            if lock is None:
                lock = threading.Lock()
                self._locks[character_id] = lock
            return lock

    def _resolve_ids(
        self,
        speaker_name: str,
        listener_name: str,
        world_id: str,
    ) -> tuple[int, int]:
        """Look up character IDs from names.

        Returns:
            ``(speaker_id, listener_id)`` as ints.

        Raises:
            CharacterNotFoundError: If either lookup returns ``None``. The
                speaker is checked first.
        """
        speaker_char = database.get_character_by_name_in_world(speaker_name, world_id)
        if speaker_char is None:
            raise CharacterNotFoundError(speaker_name, world_id)
        listener_char = database.get_character_by_name_in_world(listener_name, world_id)
        if listener_char is None:
            raise CharacterNotFoundError(listener_name, world_id)
        return int(speaker_char["id"]), int(listener_char["id"])

    def _read_scores(self, character_id: int) -> dict[str, float]:
        """Read current axis scores for one character from the DB.

        Returns:
            Mapping of ``axis_name → score`` containing only the axes the DB
            returned. The mapping is empty when the DB has no axis state for
            the character. No defaults are filled in here; callers substitute
            :data:`~mud_server.db.constants.DEFAULT_AXIS_SCORE` for axes that
            are missing from the mapping.
        """
        state = database.get_character_axis_state(character_id)
        if state is None:
            return {}
        return {a["axis_name"]: float(a["axis_score"]) for a in (state.get("axes") or [])}

    def _run_resolution(
        self,
        *,
        speaker_id: int,
        speaker_name: str,
        listener_id: int,
        listener_name: str,
        channel: str,
        world_id: str,
    ) -> AxisResolutionResult:
        """Run the resolution proper; the caller must hold the character locks.

        Separated from :meth:`resolve_chat_interaction` so that the locking
        boilerplate stays clean and this method can be tested independently.

        Returns:
            The :class:`~mud_server.axis.types.AxisResolutionResult` for the
            interaction. Only axes whose post-clamp change is non-zero appear
            in the per-character deltas.
        """
        # Read current axis scores.
        speaker_scores = self._read_scores(speaker_id)
        listener_scores = self._read_scores(listener_id)

        chat_grammar = self._grammar.chat
        # Channels without a configured multiplier are resolved at 1.0.
        multiplier = chat_grammar.channel_multipliers.get(channel, 1.0)

        # Only axes with a non-no_effect resolver are included in the
        # pre-interaction snapshot.
        active_axis_names = [
            name for name, rule in chat_grammar.axes.items() if rule.resolver != "no_effect"
        ]

        # Build axis_snapshot_before (scoped to active axes only). Keys are
        # stringified character IDs.
        axis_snapshot_before: dict[str, dict[str, float]] = {
            str(speaker_id): {
                name: speaker_scores.get(name, DEFAULT_AXIS_SCORE) for name in active_axis_names
            },
            str(listener_id): {
                name: listener_scores.get(name, DEFAULT_AXIS_SCORE) for name in active_axis_names
            },
        }

        # Compute ipc_hash from the pre-interaction state.
        ipc_hash = _compute_resolution_hash(
            world_id=world_id,
            speaker_id=speaker_id,
            listener_id=listener_id,
            channel=channel,
            axis_snapshot_before=axis_snapshot_before,
            grammar_version=self._grammar.version,
        )

        # Compute axis deltas for every axis in the grammar.
        speaker_axis_deltas: list[AxisDelta] = []
        listener_axis_deltas: list[AxisDelta] = []
        speaker_actual_deltas: dict[str, float] = {}
        listener_actual_deltas: dict[str, float] = {}

        for axis_name, rule in chat_grammar.axes.items():
            sp_old = speaker_scores.get(axis_name, DEFAULT_AXIS_SCORE)
            li_old = listener_scores.get(axis_name, DEFAULT_AXIS_SCORE)

            sp_raw, li_raw = _call_resolver(
                resolver_name=rule.resolver,
                speaker_score=sp_old,
                listener_score=li_old,
                base_magnitude=rule.base_magnitude,
                multiplier=multiplier,
                min_gap_threshold=chat_grammar.min_gap_threshold,
            )

            # Clamp new scores to [0.0, 1.0]; the actual delta is measured
            # after clamping, so it may be smaller than the raw delta.
            sp_new = max(0.0, min(1.0, sp_old + sp_raw))
            li_new = max(0.0, min(1.0, li_old + li_raw))
            sp_actual = sp_new - sp_old
            li_actual = li_new - li_old

            # Only record axes with a non-zero actual change (the tolerance
            # absorbs floating-point noise).
            if abs(sp_actual) > 1e-12:
                speaker_axis_deltas.append(
                    AxisDelta(
                        axis_name=axis_name,
                        old_score=sp_old,
                        new_score=sp_new,
                        delta=sp_actual,
                    )
                )
                speaker_actual_deltas[axis_name] = sp_actual

            if abs(li_actual) > 1e-12:
                listener_axis_deltas.append(
                    AxisDelta(
                        axis_name=axis_name,
                        old_score=li_old,
                        new_score=li_new,
                        delta=li_actual,
                    )
                )
                listener_actual_deltas[axis_name] = li_actual

        # Write chat.mechanical_resolution to the JSONL ledger before touching
        # the DB: the ledger event is the authoritative record and the DB
        # update is its materialization.
        _write_ledger_event(
            world_id=world_id,
            ipc_hash=ipc_hash,
            channel=channel,
            speaker_id=speaker_id,
            speaker_name=speaker_name,
            listener_id=listener_id,
            listener_name=listener_name,
            speaker_deltas=speaker_axis_deltas,
            listener_deltas=listener_axis_deltas,
            axis_snapshot_before=axis_snapshot_before,
            grammar_version=self._grammar.version,
        )

        # Apply deltas to the DB; characters with no actual change are skipped.
        if speaker_actual_deltas:
            _apply_to_db(
                world_id=world_id,
                character_id=speaker_id,
                actual_deltas=speaker_actual_deltas,
                ipc_hash=ipc_hash,
                channel=channel,
                peer_id=listener_id,
            )
        if listener_actual_deltas:
            _apply_to_db(
                world_id=world_id,
                character_id=listener_id,
                actual_deltas=listener_actual_deltas,
                ipc_hash=ipc_hash,
                channel=channel,
                peer_id=speaker_id,
            )

        # Return the result.
        return AxisResolutionResult(
            ipc_hash=ipc_hash,
            world_id=world_id,
            channel=channel,
            speaker=EntityResolution(
                character_id=speaker_id,
                character_name=speaker_name,
                deltas=tuple(speaker_axis_deltas),
            ),
            listener=EntityResolution(
                character_id=listener_id,
                character_name=listener_name,
                deltas=tuple(listener_axis_deltas),
            ),
            axis_snapshot_before=axis_snapshot_before,
        )
# ---------------------------------------------------------------------------
# Module-level helpers (pure functions, no instance state)
# ---------------------------------------------------------------------------
def _compute_resolution_hash(
    *,
    world_id: str,
    speaker_id: int,
    listener_id: int,
    channel: str,
    axis_snapshot_before: dict[str, dict[str, float]],
    grammar_version: str,
) -> str:
    """Fingerprint a mechanical resolution from its pre-interaction inputs.

    The six arguments are gathered into a single payload dict and handed to
    :func:`~pipeworks_ipc.compute_payload_hash`. That function is used in
    place of :func:`~pipeworks_ipc.compute_ipc_id` because the latter needs a
    ``system_prompt_hash``, which has no meaning when no LLM call is involved
    (see the note on ipc_hash computation in the module docstring).

    Args:
        world_id: World of the interaction.
        speaker_id: DB primary key of the speaker.
        listener_id: DB primary key of the listener.
        channel: Chat channel name.
        axis_snapshot_before: Pre-interaction axis scores (active axes only).
        grammar_version: Version string from the loaded grammar.

    Returns:
        The hash string produced by ``compute_payload_hash`` for the payload.
    """
    # Key names are part of the hashed payload — keep them stable.
    payload: dict[str, Any] = dict(
        world_id=world_id,
        speaker_id=speaker_id,
        listener_id=listener_id,
        channel=channel,
        axis_snapshot_before=axis_snapshot_before,
        grammar_version=grammar_version,
    )
    digest: str = compute_payload_hash(payload)
    return digest
def _call_resolver(
    *,
    resolver_name: str,
    speaker_score: float,
    listener_score: float,
    base_magnitude: float,
    multiplier: float,
    min_gap_threshold: float,
) -> tuple[float, float]:
    """Invoke the resolver identified by *resolver_name*.

    Each resolver receives only the arguments it uses: ``no_effect`` takes
    none, ``shared_drain`` takes the magnitude and multiplier, and
    ``dominance_shift`` additionally takes both scores and the gap threshold.

    A name matching none of the three is not an error: a WARNING is logged and
    zero deltas are returned. Grammar validation at load time should prevent
    unknown names from reaching this point.

    Returns:
        ``(speaker_raw_delta, listener_raw_delta)`` — pre-clamping floats.
    """
    if resolver_name == "shared_drain":
        raw_deltas = shared_drain(
            base_magnitude=base_magnitude,
            multiplier=multiplier,
        )
    elif resolver_name == "dominance_shift":
        raw_deltas = dominance_shift(
            speaker_score,
            listener_score,
            base_magnitude=base_magnitude,
            multiplier=multiplier,
            min_gap_threshold=min_gap_threshold,
        )
    elif resolver_name == "no_effect":
        raw_deltas = no_effect()
    else:
        # Should be unreachable when the grammar was validated correctly.
        logger.warning(
            "Unknown resolver %r encountered during resolution — treating as no_effect.",
            resolver_name,
        )
        raw_deltas = (0.0, 0.0)
    return raw_deltas
def _write_ledger_event(
    *,
    world_id: str,
    ipc_hash: str,
    channel: str,
    speaker_id: int,
    speaker_name: str,
    listener_id: int,
    listener_name: str,
    speaker_deltas: list[AxisDelta],
    listener_deltas: list[AxisDelta],
    axis_snapshot_before: dict[str, dict[str, float]],
    grammar_version: str,
) -> None:
    """Append a ``chat.mechanical_resolution`` event to the JSONL ledger.

    The ledger event is the authoritative record of the interaction; the DB
    mutation performed afterwards by the caller materializes it.

    Any exception raised by the ledger append is caught and logged at WARNING
    level with its traceback, and the function returns normally so the caller
    can still apply the DB mutation. This is an explicit PoC trade-off: the
    ledger record may be lost, but the player interaction completes.

    TODO(hardening): In production, a ledger failure should trigger an alert
    and possibly halt further ledger writes until the problem is resolved.
    """

    def _participant(
        character_id: int, character_name: str, deltas: list[AxisDelta]
    ) -> dict[str, Any]:
        # Flatten the AxisDelta records to {axis_name: delta} for the ledger.
        return {
            "character_id": character_id,
            "character_name": character_name,
            "axis_deltas": {entry.axis_name: entry.delta for entry in deltas},
        }

    payload: dict[str, Any] = {
        "channel": channel,
        "speaker": _participant(speaker_id, speaker_name, speaker_deltas),
        "listener": _participant(listener_id, listener_name, listener_deltas),
        "axis_snapshot_before": axis_snapshot_before,
        "grammar_version": grammar_version,
    }

    try:
        _ledger_append(
            world_id=world_id,
            event_type="chat.mechanical_resolution",
            data=payload,
            ipc_hash=ipc_hash,
        )
    except Exception:
        # Deliberate best-effort: record the failure and let the caller go on.
        logger.warning(
            "chat.mechanical_resolution ledger write failed for world %r — continuing.",
            world_id,
            exc_info=True,
        )
def _apply_to_db(
    *,
    world_id: str,
    character_id: int,
    actual_deltas: dict[str, float],
    ipc_hash: str,
    channel: str,
    peer_id: int,
) -> None:
    """Materialize one character's axis deltas in the DB.

    Forwards the deltas to ``database.apply_axis_event`` as a
    ``chat.mechanical_resolution`` event. A failure is caught and logged at
    ERROR level (with traceback) rather than raised: a failed DB write leaves
    the materialized view out of sync with the JSONL ledger, which is more
    serious than a missing ledger event, but swallowing it lets the caller
    still update the other character.

    Args:
        world_id: World to apply the event in.
        character_id: Character whose scores are being mutated.
        actual_deltas: ``{axis_name: actual_delta}`` — post-clamp, non-zero only.
        ipc_hash: The resolution's ipc_hash, stored as event metadata.
        channel: Chat channel, stored as event metadata.
        peer_id: The other character's ID, stored (stringified) as event
            metadata.
    """
    event_metadata = {
        "ipc_hash": ipc_hash,
        "channel": channel,
        "peer_id": str(peer_id),
    }
    try:
        database.apply_axis_event(
            world_id=world_id,
            character_id=character_id,
            event_type_name="chat.mechanical_resolution",
            event_type_description="Axis mutation produced by a chat interaction.",
            deltas=actual_deltas,
            metadata=event_metadata,
        )
    except Exception:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception(
            "DB axis mutation failed for character %d in world %r — "
            "materialized view may be out of sync with ledger.",
            character_id,
            world_id,
        )