Source code for mud_server.db.worlds_repo

"""World catalog and world-access policy repository operations."""

from __future__ import annotations

import sqlite3
from typing import Any, NoReturn, cast

from mud_server.db.connection import connection_scope
from mud_server.db.errors import DatabaseError, DatabaseOperationContext, DatabaseReadError
from mud_server.db.types import WorldAccessDecision


def _raise_read_error(operation: str, exc: Exception, *, details: str | None = None) -> NoReturn:
    """Raise a typed repository read error while preserving chained cause."""
    if isinstance(exc, DatabaseError):
        raise exc
    raise DatabaseReadError(
        context=DatabaseOperationContext(operation=operation, details=details),
        cause=exc,
    ) from exc


[docs] def get_world_by_id(world_id: str) -> dict[str, Any] | None: """Return one world catalog row by id.""" try: with connection_scope() as conn: cursor = conn.cursor() cursor.execute( """ SELECT id, name, description, is_active, config_json, created_at FROM worlds WHERE id = ? """, (world_id,), ) row = cursor.fetchone() if not row: return None return { "id": row[0], "name": row[1], "description": row[2], "is_active": bool(row[3]), "config_json": row[4], "created_at": row[5], } except Exception as exc: _raise_read_error("worlds.get_world_by_id", exc, details=f"world_id={world_id!r}")
[docs] def list_worlds(*, include_inactive: bool = False) -> list[dict[str, Any]]: """Return world catalog rows, optionally including inactive worlds.""" try: with connection_scope() as conn: cursor = conn.cursor() if include_inactive: cursor.execute(""" SELECT id, name, description, is_active, config_json, created_at FROM worlds ORDER BY id """) else: cursor.execute(""" SELECT id, name, description, is_active, config_json, created_at FROM worlds WHERE is_active = 1 ORDER BY id """) rows = cursor.fetchall() return [ { "id": row[0], "name": row[1], "description": row[2], "is_active": bool(row[3]), "config_json": row[4], "created_at": row[5], } for row in rows ] except Exception as exc: _raise_read_error( "worlds.list_worlds", exc, details=f"include_inactive={include_inactive}", )
def _query_world_rows(cursor: sqlite3.Cursor, *, include_inactive: bool) -> list[tuple[Any, ...]]: """Query world catalog rows with optional inactive filtering.""" if include_inactive: cursor.execute(""" SELECT id, name, description, is_active, config_json, created_at FROM worlds ORDER BY id """) else: cursor.execute(""" SELECT id, name, description, is_active, config_json, created_at FROM worlds WHERE is_active = 1 ORDER BY id """) return cursor.fetchall() def _user_has_world_permission(cursor: sqlite3.Cursor, *, user_id: int, world_id: str) -> bool: """Return ``True`` when an explicit world access grant exists.""" cursor.execute( """ SELECT 1 FROM world_permissions WHERE user_id = ? AND world_id = ? AND can_access = 1 LIMIT 1 """, (user_id, world_id), ) return cursor.fetchone() is not None def _count_user_characters_in_world(cursor: sqlite3.Cursor, *, user_id: int, world_id: str) -> int: """Count characters owned by ``user_id`` in ``world_id``.""" cursor.execute( """ SELECT COUNT(*) FROM characters WHERE user_id = ? AND world_id = ? """, (user_id, world_id), ) row = cursor.fetchone() return int(row[0]) if row else 0 def _resolve_world_access_for_row( cursor: sqlite3.Cursor, *, user_id: int, role: str | None, world_row: tuple[Any, ...], ) -> WorldAccessDecision: """Resolve effective access/create capabilities for one world row.""" from mud_server.config import config world_id = str(world_row[0]) is_active = bool(world_row[3]) world_policy = config.resolve_world_character_policy(world_id) current_count = _count_user_characters_in_world(cursor, user_id=user_id, world_id=world_id) has_existing_character = current_count > 0 has_permission = _user_has_world_permission(cursor, user_id=user_id, world_id=world_id) # Elevated roles always have operational world access, but slot limits # still constrain character creation counts. if role in {"admin", "superuser"}: can_access = True else: can_access = ( has_permission or has_existing_character or world_policy.creation_mode == "open" ) if not is_active: return WorldAccessDecision( world_id=world_id, can_access=False, can_create=False, access_mode=world_policy.creation_mode, naming_mode=world_policy.naming_mode, slot_limit_per_account=int(world_policy.slot_limit_per_account), current_character_count=current_count, has_permission_grant=has_permission, has_existing_character=has_existing_character, reason="world_inactive", ) if not can_access: return WorldAccessDecision( world_id=world_id, can_access=False, can_create=False, access_mode=world_policy.creation_mode, naming_mode=world_policy.naming_mode, slot_limit_per_account=int(world_policy.slot_limit_per_account), current_character_count=current_count, has_permission_grant=has_permission, has_existing_character=has_existing_character, reason="invite_required", ) slot_limit = max(0, int(world_policy.slot_limit_per_account)) if current_count >= slot_limit: return WorldAccessDecision( world_id=world_id, can_access=True, can_create=False, access_mode=world_policy.creation_mode, naming_mode=world_policy.naming_mode, slot_limit_per_account=slot_limit, current_character_count=current_count, has_permission_grant=has_permission, has_existing_character=has_existing_character, reason="slot_limit_reached", ) return WorldAccessDecision( world_id=world_id, can_access=True, can_create=True, access_mode=world_policy.creation_mode, naming_mode=world_policy.naming_mode, slot_limit_per_account=slot_limit, current_character_count=current_count, has_permission_grant=has_permission, has_existing_character=has_existing_character, reason="ok", )
[docs] def get_world_access_decision( user_id: int, world_id: str, *, role: str | None = None, ) -> WorldAccessDecision: """Resolve world access/create decision for one account and world.""" from mud_server.config import config try: with connection_scope() as conn: cursor = conn.cursor() cursor.execute( """ SELECT id, name, description, is_active, config_json, created_at FROM worlds WHERE id = ? LIMIT 1 """, (world_id,), ) world_row = cursor.fetchone() if world_row is None: world_policy = config.resolve_world_character_policy(world_id) return WorldAccessDecision( world_id=world_id, can_access=False, can_create=False, access_mode=world_policy.creation_mode, naming_mode=world_policy.naming_mode, slot_limit_per_account=int(world_policy.slot_limit_per_account), current_character_count=0, has_permission_grant=False, has_existing_character=False, reason="world_not_found", ) return _resolve_world_access_for_row( cursor, user_id=user_id, role=role, world_row=world_row, ) except Exception as exc: _raise_read_error( "worlds.get_world_access_decision", exc, details=f"user_id={user_id}, world_id={world_id!r}, role={role!r}", )
[docs] def can_user_access_world(user_id: int, world_id: str, *, role: str | None = None) -> bool: """Return ``True`` when the account may access/select the world.""" return bool(get_world_access_decision(user_id, world_id, role=role).can_access)
[docs] def get_user_character_count_for_world(user_id: int, world_id: str) -> int: """Return the number of characters the account owns in a world.""" try: with connection_scope() as conn: cursor = conn.cursor() return _count_user_characters_in_world(cursor, user_id=user_id, world_id=world_id) except Exception as exc: _raise_read_error( "worlds.get_user_character_count_for_world", exc, details=f"user_id={user_id}, world_id={world_id!r}", )
[docs] def get_world_admin_rows() -> list[dict[str, Any]]: """Return operational world rows for admin/superuser world monitoring.""" try: with connection_scope() as conn: cursor = conn.cursor() cursor.execute(""" SELECT w.id, w.name, w.description, w.is_active, w.config_json, w.created_at, s.session_id, s.last_activity, s.client_type, c.id, c.name, u.username FROM worlds w LEFT JOIN sessions s ON s.world_id = w.id AND s.character_id IS NOT NULL AND (s.expires_at IS NULL OR datetime(s.expires_at) > datetime('now')) LEFT JOIN characters c ON c.id = s.character_id LEFT JOIN users u ON u.id = s.user_id ORDER BY w.id ASC, datetime(s.last_activity) DESC """) rows = cursor.fetchall() except Exception as exc: _raise_read_error("worlds.get_world_admin_rows", exc) worlds_by_id: dict[str, dict[str, Any]] = {} for row in rows: world_id = cast(str, row[0]) world = worlds_by_id.get(world_id) if world is None: world = { "world_id": world_id, "name": row[1], "description": row[2], "is_active": bool(row[3]), "config_json": row[4], "created_at": row[5], "active_session_count": 0, "active_character_count": 0, "is_online": False, "last_activity": None, "active_characters": [], "_session_ids": set(), "_character_ids": set(), } worlds_by_id[world_id] = world session_id = row[6] if session_id: session_ids = cast(set[str], world["_session_ids"]) if session_id not in session_ids: session_ids.add(session_id) world["active_session_count"] = int(world["active_session_count"]) + 1 if world["last_activity"] is None and row[7] is not None: world["last_activity"] = row[7] character_id = row[9] if character_id is None: continue character_ids = cast(set[int], world["_character_ids"]) if int(character_id) not in character_ids: character_ids.add(int(character_id)) world["active_character_count"] = int(world["active_character_count"]) + 1 world["is_online"] = True world["active_characters"].append( { "character_id": int(character_id), "character_name": row[10], "username": row[11], "session_id": session_id, "last_activity": row[7], "client_type": row[8] or "unknown", } ) result: list[dict[str, Any]] = [] for world_id in sorted(worlds_by_id): world = worlds_by_id[world_id] world.pop("_session_ids", None) world.pop("_character_ids", None) result.append(world) return result
[docs] def list_worlds_for_user( user_id: int, *, role: str | None = None, include_inactive: bool = False, include_invite_worlds: bool = False, ) -> list[dict[str, Any]]: """Return world rows decorated with access policy for one account.""" from mud_server.db.users_repo import get_user_role, get_username_by_id if role is None: username = get_username_by_id(user_id) if username: role = get_user_role(username) try: with connection_scope() as conn: cursor = conn.cursor() rows = _query_world_rows(cursor, include_inactive=include_inactive) worlds: list[dict[str, Any]] = [] is_elevated = role in {"admin", "superuser"} for row in rows: decision = _resolve_world_access_for_row( cursor, user_id=user_id, role=role, world_row=row, ) if not is_elevated and not include_invite_worlds and not decision.can_access: continue worlds.append( { "id": row[0], "name": row[1], "description": row[2], "is_active": bool(row[3]), "config_json": row[4], "created_at": row[5], "can_access": decision.can_access, "can_create": decision.can_create, "access_mode": decision.access_mode, "naming_mode": decision.naming_mode, "slot_limit_per_account": decision.slot_limit_per_account, "current_character_count": decision.current_character_count, "has_permission_grant": decision.has_permission_grant, "has_existing_character": decision.has_existing_character, "is_invite_only": decision.access_mode == "invite", "is_locked": not decision.can_access, "access_reason": decision.reason, } ) return worlds except Exception as exc: _raise_read_error( "worlds.list_worlds_for_user", exc, details=( f"user_id={user_id}, role={role!r}, include_inactive={include_inactive}, " f"include_invite_worlds={include_invite_worlds}" ), )