Source code for mud_server.db.events_repo

"""Event ledger repository operations for axis score mutations."""

from __future__ import annotations

import sqlite3
from typing import Any, NoReturn

from mud_server.db.connection import connection_scope
from mud_server.db.constants import DEFAULT_AXIS_SCORE
from mud_server.db.errors import (
    DatabaseError,
    DatabaseOperationContext,
    DatabaseReadError,
    DatabaseWriteError,
)


def _raise_read_error(operation: str, exc: Exception, *, details: str | None = None) -> NoReturn:
    """Raise a typed repository read error while preserving chained cause."""
    if isinstance(exc, DatabaseError):
        raise exc
    raise DatabaseReadError(
        context=DatabaseOperationContext(operation=operation, details=details),
        cause=exc,
    ) from exc


def _raise_write_error(operation: str, exc: Exception, *, details: str | None = None) -> NoReturn:
    """Raise a typed repository write error while preserving chained cause."""
    if isinstance(exc, DatabaseError):
        raise exc
    raise DatabaseWriteError(
        context=DatabaseOperationContext(operation=operation, details=details),
        cause=exc,
    ) from exc


def _get_or_create_event_type_id(
    cursor: sqlite3.Cursor,
    *,
    world_id: str,
    event_type_name: str,
    description: str | None = None,
) -> int:
    """Return event_type id for a world, creating it when absent."""
    cursor.execute(
        "SELECT id FROM event_type WHERE world_id = ? AND name = ? LIMIT 1",
        (world_id, event_type_name),
    )
    row = cursor.fetchone()
    if row:
        return int(row[0])

    cursor.execute(
        """
        INSERT INTO event_type (world_id, name, description)
        VALUES (?, ?, ?)
        """,
        (world_id, event_type_name, description),
    )
    event_type_id = cursor.lastrowid
    if event_type_id is None:
        raise ValueError("Failed to create event_type.")
    return int(event_type_id)


def _resolve_axis_id(cursor: sqlite3.Cursor, *, world_id: str, axis_name: str) -> int | None:
    """Resolve axis id from world id + axis name."""
    cursor.execute(
        "SELECT id FROM axis WHERE world_id = ? AND name = ? LIMIT 1",
        (world_id, axis_name),
    )
    row = cursor.fetchone()
    return int(row[0]) if row else None


[docs] def apply_axis_event( *, world_id: str, character_id: int, event_type_name: str, deltas: dict[str, float], metadata: dict[str, str] | None = None, event_type_description: str | None = None, ) -> int: """Apply axis deltas and persist a full event-ledger mutation atomically.""" from mud_server.db.axis_repo import _refresh_character_current_snapshot if not deltas: raise ValueError("Event deltas must not be empty.") try: with connection_scope(write=True) as conn: cursor = conn.cursor() event_type_id = _get_or_create_event_type_id( cursor, world_id=world_id, event_type_name=event_type_name, description=event_type_description, ) cursor.execute( """ INSERT INTO event (world_id, event_type_id) VALUES (?, ?) """, (world_id, event_type_id), ) event_id = cursor.lastrowid if event_id is None: raise ValueError("Failed to create event.") event_id = int(event_id) for axis_name, delta in deltas.items(): axis_id = _resolve_axis_id(cursor, world_id=world_id, axis_name=axis_name) if axis_id is None: raise ValueError(f"Unknown axis '{axis_name}' for world '{world_id}'.") cursor.execute( """ SELECT axis_score FROM character_axis_score WHERE character_id = ? AND axis_id = ? """, (character_id, axis_id), ) row = cursor.fetchone() if row is None: old_score = DEFAULT_AXIS_SCORE cursor.execute( """ INSERT INTO character_axis_score (character_id, world_id, axis_id, axis_score) VALUES (?, ?, ?, ?) """, (character_id, world_id, axis_id, old_score), ) else: old_score = float(row[0]) new_score = old_score + float(delta) cursor.execute( """ UPDATE character_axis_score SET axis_score = ?, updated_at = CURRENT_TIMESTAMP WHERE character_id = ? AND axis_id = ? """, (new_score, character_id, axis_id), ) cursor.execute( """ INSERT INTO event_entity_axis_delta (event_id, character_id, axis_id, old_score, new_score, delta) VALUES (?, ?, ?, ?, ?, ?) """, (event_id, character_id, axis_id, old_score, new_score, float(delta)), ) if metadata: for key, value in metadata.items(): cursor.execute( """ INSERT INTO event_metadata (event_id, key, value) VALUES (?, ?, ?) """, (event_id, key, value), ) _refresh_character_current_snapshot( cursor, character_id=character_id, world_id=world_id, ) return event_id except ValueError: # Unknown axis names and empty deltas are intentional domain validation failures. raise except Exception as exc: _raise_write_error( "events.apply_axis_event", exc, details=( f"world_id={world_id!r}, character_id={character_id}, " f"event_type_name={event_type_name!r}" ), )
[docs] def get_character_axis_events(character_id: int, *, limit: int = 50) -> list[dict[str, Any]]: """Return recent axis events with deltas and metadata for one character.""" try: with connection_scope() as conn: cursor = conn.cursor() cursor.execute( """ SELECT DISTINCT e.id FROM event_entity_axis_delta d JOIN event e ON e.id = d.event_id WHERE d.character_id = ? ORDER BY e.id DESC LIMIT ? """, (character_id, limit), ) event_ids = [row[0] for row in cursor.fetchall()] if not event_ids: return [] placeholders = ",".join(["?"] * len(event_ids)) events_query = f""" SELECT e.id, e.world_id, e.timestamp, et.name, et.description, a.name, d.old_score, d.new_score, d.delta FROM event_entity_axis_delta d JOIN event e ON e.id = d.event_id JOIN event_type et ON et.id = e.event_type_id JOIN axis a ON a.id = d.axis_id WHERE d.character_id = ? AND e.id IN ({placeholders}) ORDER BY e.id DESC, a.name ASC """ # nosec B608 cursor.execute(events_query, [character_id, *event_ids]) rows = cursor.fetchall() metadata_query = f""" SELECT event_id, key, value FROM event_metadata WHERE event_id IN ({placeholders}) """ # nosec B608 cursor.execute(metadata_query, event_ids) metadata_rows = cursor.fetchall() metadata_map: dict[int, dict[str, str]] = {} for event_id, key, value in metadata_rows: event_id_int = int(event_id) metadata_map.setdefault(event_id_int, {})[key] = value events: dict[int, dict[str, Any]] = {} for ( event_id, world_id, timestamp, event_type_name, event_type_description, axis_name, old_score, new_score, delta, ) in rows: event_id_int = int(event_id) event = events.get(event_id_int) if event is None: event = { "event_id": event_id_int, "world_id": world_id, "event_type": event_type_name, "event_type_description": event_type_description, "timestamp": timestamp, "metadata": metadata_map.get(event_id_int, {}), "deltas": [], } events[event_id_int] = event event["deltas"].append( { "axis_name": axis_name, "old_score": float(old_score), "new_score": float(new_score), "delta": float(delta), } ) return [events[int(event_id)] for event_id in event_ids if int(event_id) in events] except Exception as exc: _raise_read_error( "events.get_character_axis_events", exc, details=f"character_id={character_id}, limit={limit}", )