# Source code for mud_server.db.admin_repo

"""Admin inspector repository operations for SQLite backend.

This module groups schema/table inspection and admin dashboard aggregate queries
that were previously implemented directly in ``mud_server.db.database``.
"""

from __future__ import annotations

from typing import Any, NoReturn

from mud_server.db.connection import connection_scope
from mud_server.db.errors import DatabaseError, DatabaseOperationContext, DatabaseReadError


def _raise_read_error(operation: str, exc: Exception, *, details: str | None = None) -> NoReturn:
    """Convert ``exc`` into a typed repository read failure and raise it.

    Exceptions that are already ``DatabaseError`` instances are re-raised
    untouched. Anything else is wrapped in ``DatabaseReadError`` carrying
    ``operation`` and the optional ``details``, chained to the original
    exception with ``from``.
    """
    if not isinstance(exc, DatabaseError):
        failure_context = DatabaseOperationContext(operation=operation, details=details)
        raise DatabaseReadError(context=failure_context, cause=exc) from exc
    # Already a typed database error: propagate it unchanged.
    raise exc


def _quote_identifier(identifier: str) -> str:
    """Safely quote an SQLite identifier.

    This escaping strategy is intentionally strict and only supports regular
    table/column identifiers used by admin inspection queries.
    """
    escaped = identifier.replace('"', '""')
    return f'"{escaped}"'


def get_table_names() -> list[str]:
    """Return sorted user-defined table names (excluding ``sqlite_*`` internals).

    Returns:
        Names of all ``type='table'`` entries in ``sqlite_master`` whose name
        does not start with the reserved ``sqlite_`` prefix, in ascending
        name order.

    Raises:
        DatabaseReadError: Raised through ``_raise_read_error`` when the query
            fails with anything that is not already a ``DatabaseError``.
    """
    try:
        with connection_scope() as conn:
            cursor = conn.cursor()
            # In LIKE patterns ``_`` matches any single character, so the
            # underscore is escaped. Without the ESCAPE clause, user tables
            # such as ``sqlitecache`` would be filtered out together with the
            # real ``sqlite_*`` internal tables.
            cursor.execute("""
                SELECT name
                FROM sqlite_master
                WHERE type='table' AND name NOT LIKE 'sqlite!_%' ESCAPE '!'
                ORDER BY name
                """)
            return [row[0] for row in cursor.fetchall()]
    except Exception as exc:
        _raise_read_error("admin.get_table_names", exc)
def list_tables() -> list[dict[str, Any]]:
    """Describe every user table for admin database browsing UIs.

    Returns:
        One dict per table from ``get_table_names()``, with the table
        ``name``, its ``columns`` (column names reported by
        ``PRAGMA table_info``) and its ``row_count``.
    """
    try:
        with connection_scope() as conn:
            cur = conn.cursor()
            summaries: list[dict[str, Any]] = []
            for name in get_table_names():
                ident = _quote_identifier(name)

                # Index 1 of each table_info row is the column name.
                cur.execute(f"PRAGMA table_info({ident})")
                column_names = [info[1] for info in cur.fetchall()]

                cur.execute(f"SELECT COUNT(*) FROM {ident}")  # nosec B608
                count_row = cur.fetchone()

                summaries.append(
                    dict(
                        name=name,
                        columns=column_names,
                        row_count=int(count_row[0]),
                    )
                )
            return summaries
    except Exception as exc:
        _raise_read_error("admin.list_tables", exc)
def get_schema_map() -> list[dict[str, Any]]:
    """Collect column names and foreign-key links for every user table.

    Returns:
        One dict per table from ``get_table_names()`` with ``name``,
        ``columns`` and ``foreign_keys``. Each foreign key is a dict with
        ``from_column``, ``ref_table``, ``ref_column``, ``on_update`` and
        ``on_delete``.
    """
    try:
        with connection_scope() as conn:
            cur = conn.cursor()
            tables: list[dict[str, Any]] = []
            for name in get_table_names():
                ident = _quote_identifier(name)

                cur.execute(f"PRAGMA table_info({ident})")
                column_names = [info[1] for info in cur.fetchall()]

                cur.execute(f"PRAGMA foreign_key_list({ident})")
                links: list[dict[str, Any]] = []
                for fk in cur.fetchall():
                    # Positions used from each foreign_key_list row:
                    # 2 = referenced table, 3 = local column,
                    # 4 = referenced column, 5 = on_update, 6 = on_delete.
                    links.append(
                        dict(
                            from_column=fk[3],
                            ref_table=fk[2],
                            ref_column=fk[4],
                            on_update=fk[5],
                            on_delete=fk[6],
                        )
                    )

                tables.append(dict(name=name, columns=column_names, foreign_keys=links))
            return tables
    except Exception as exc:
        _raise_read_error("admin.get_schema_map", exc)
def get_table_rows(
    table_name: str,
    limit: int = 100,
    offset: int = 0,
) -> tuple[list[str], list[list[Any]]]:
    """Fetch one page of raw rows from a user table.

    Args:
        table_name: Table to read; must be one of ``get_table_names()``.
        limit: Value bound to the query's ``LIMIT`` placeholder.
        offset: Value bound to the query's ``OFFSET`` placeholder.

    Returns:
        ``(columns, rows)``: column names from ``PRAGMA table_info`` and each
        fetched row converted to a plain list.

    Raises:
        ValueError: If ``table_name`` does not exist in the user-visible schema.
    """
    try:
        # Only names that actually exist are ever interpolated into SQL.
        known_tables = frozenset(get_table_names())
        if table_name not in known_tables:
            raise ValueError(f"Table '{table_name}' does not exist")

        with connection_scope() as conn:
            cur = conn.cursor()
            ident = _quote_identifier(table_name)

            cur.execute(f"PRAGMA table_info({ident})")
            column_names = [info[1] for info in cur.fetchall()]

            page_sql = f"SELECT * FROM {ident} LIMIT ? OFFSET ?"  # nosec B608
            cur.execute(page_sql, (limit, offset))
            page = list(map(list, cur.fetchall()))
            return column_names, page
    except ValueError:
        # ValueError passes through untyped so callers can catch it directly.
        raise
    except Exception as exc:
        _raise_read_error(
            "admin.get_table_rows",
            exc,
            details=f"table_name={table_name!r}, limit={limit}, offset={offset}",
        )
def get_all_users_detailed() -> list[dict[str, Any]]:
    """Build the detailed, non-tombstoned account list for the Active Users card.

    Returns:
        One dict per user with ``tombstoned_at IS NULL``, newest account
        first. Alongside the plain account columns each dict carries
        ``character_count``, the ``is_online_account`` / ``is_online_in_world``
        flags derived from unexpired sessions, and ``online_world_ids`` (the
        distinct world ids of unexpired in-world sessions). ``password_hash``
        is cut to its first 20 characters plus ``"..."`` when longer than 20.
    """
    try:
        with connection_scope() as conn:
            cur = conn.cursor()
            cur.execute("""
                SELECT u.id,
                       u.username,
                       u.password_hash,
                       u.role,
                       u.account_origin,
                       u.is_guest,
                       u.guest_expires_at,
                       u.created_at,
                       u.last_login,
                       u.is_active,
                       u.tombstoned_at,
                       COUNT(c.id) AS character_count,
                       EXISTS(
                           SELECT 1
                           FROM sessions s
                           WHERE s.user_id = u.id
                             AND (s.expires_at IS NULL OR datetime(s.expires_at) > datetime('now'))
                       ) AS is_online_account,
                       EXISTS(
                           SELECT 1
                           FROM sessions s
                           WHERE s.user_id = u.id
                             AND s.character_id IS NOT NULL
                             AND (s.expires_at IS NULL OR datetime(s.expires_at) > datetime('now'))
                       ) AS is_online_in_world,
                       (
                           SELECT GROUP_CONCAT(world_id)
                           FROM (
                               SELECT DISTINCT s.world_id AS world_id
                               FROM sessions s
                               WHERE s.user_id = u.id
                                 AND s.character_id IS NOT NULL
                                 AND s.world_id IS NOT NULL
                                 AND (
                                     s.expires_at IS NULL
                                     OR datetime(s.expires_at) > datetime('now')
                                 )
                               ORDER BY s.world_id
                           )
                       ) AS online_world_ids_csv
                FROM users u
                LEFT JOIN characters c ON c.user_id = u.id
                WHERE u.tombstoned_at IS NULL
                GROUP BY u.id
                ORDER BY u.created_at DESC
                """)

            accounts: list[dict[str, Any]] = []
            for record in cur.fetchall():
                (
                    user_id,
                    username,
                    password_hash,
                    role,
                    account_origin,
                    is_guest,
                    guest_expires_at,
                    created_at,
                    last_login,
                    is_active,
                    tombstoned_at,
                    character_count,
                    online_account,
                    online_in_world,
                    worlds_csv,
                ) = record

                # GROUP_CONCAT yields NULL when there are no matching sessions.
                if worlds_csv:
                    world_ids = [part for part in str(worlds_csv).split(",") if part]
                else:
                    world_ids = []

                # Only a short prefix of the hash is handed to the admin UI.
                if len(password_hash) > 20:
                    shown_hash = password_hash[:20] + "..."
                else:
                    shown_hash = password_hash

                accounts.append(
                    dict(
                        id=user_id,
                        username=username,
                        password_hash=shown_hash,
                        role=role,
                        account_origin=account_origin,
                        is_guest=bool(is_guest),
                        guest_expires_at=guest_expires_at,
                        created_at=created_at,
                        last_login=last_login,
                        is_active=bool(is_active),
                        tombstoned_at=tombstoned_at,
                        character_count=character_count,
                        is_online_account=bool(online_account),
                        is_online_in_world=bool(online_in_world),
                        online_world_ids=world_ids,
                    )
                )
            return accounts
    except Exception as exc:
        _raise_read_error("admin.get_all_users_detailed", exc)
def get_all_users() -> list[dict[str, Any]]:
    """List basic account fields for admin summaries, newest account first.

    Returns:
        One dict per row of ``users`` with ``username``, ``role``,
        ``created_at``, ``last_login`` and ``is_active`` (coerced to ``bool``).
    """
    try:
        with connection_scope() as conn:
            cur = conn.cursor()
            cur.execute("""
                SELECT username, role, created_at, last_login, is_active
                FROM users
                ORDER BY created_at DESC
                """)

            summaries: list[dict[str, Any]] = []
            for username, role, created_at, last_login, is_active in cur.fetchall():
                summaries.append(
                    dict(
                        username=username,
                        role=role,
                        created_at=created_at,
                        last_login=last_login,
                        is_active=bool(is_active),
                    )
                )
            return summaries
    except Exception as exc:
        _raise_read_error("admin.get_all_users", exc)
def get_character_locations(*, world_id: str | None = None) -> list[dict[str, Any]]:
    """List where each character currently is, for admin displays.

    Args:
        world_id: When given, only locations in that world are returned;
            ``None`` returns locations across all worlds.

    Returns:
        Dicts with ``character_id``, ``character_name``, ``world_id``,
        ``room_id`` and ``updated_at``, ordered by character id.
    """
    try:
        with connection_scope() as conn:
            cur = conn.cursor()
            if world_id is not None:
                cur.execute(
                    """
                    SELECT c.id, c.name, l.world_id, l.room_id, l.updated_at
                    FROM character_locations l
                    JOIN characters c ON c.id = l.character_id
                    WHERE l.world_id = ?
                    ORDER BY c.id
                    """,
                    (world_id,),
                )
            else:
                cur.execute("""
                    SELECT c.id, c.name, l.world_id, l.room_id, l.updated_at
                    FROM character_locations l
                    JOIN characters c ON c.id = l.character_id
                    ORDER BY c.id
                    """)

            return [
                {
                    "character_id": record[0],
                    "character_name": record[1],
                    "world_id": record[2],
                    "room_id": record[3],
                    "updated_at": record[4],
                }
                for record in cur.fetchall()
            ]
    except Exception as exc:
        _raise_read_error(
            "admin.get_character_locations",
            exc,
            details=f"world_id={world_id!r}",
        )
def get_all_sessions(*, world_id: str | None = None) -> list[dict[str, Any]]:
    """List every non-expired session, optionally limited to one world.

    A session counts as non-expired when ``expires_at`` is NULL or lies in
    the future.

    Args:
        world_id: When given, only sessions bound to that world are returned.

    Returns:
        Session dicts ordered by ``created_at`` descending.
        ``character_name`` comes from a LEFT JOIN on ``characters``.
    """
    # Output keys, in the same order as the SELECT column list below.
    field_names = (
        "id",
        "username",
        "character_name",
        "world_id",
        "session_id",
        "created_at",
        "last_activity",
        "expires_at",
        "client_type",
    )
    try:
        with connection_scope() as conn:
            cur = conn.cursor()
            if world_id is not None:
                cur.execute(
                    """
                    SELECT s.id,
                           u.username,
                           c.name,
                           s.world_id,
                           s.session_id,
                           s.created_at,
                           s.last_activity,
                           s.expires_at,
                           s.client_type
                    FROM sessions s
                    JOIN users u ON u.id = s.user_id
                    LEFT JOIN characters c ON c.id = s.character_id
                    WHERE (s.expires_at IS NULL OR datetime(s.expires_at) > datetime('now'))
                      AND s.world_id = ?
                    ORDER BY s.created_at DESC
                    """,
                    (world_id,),
                )
            else:
                cur.execute("""
                    SELECT s.id,
                           u.username,
                           c.name,
                           s.world_id,
                           s.session_id,
                           s.created_at,
                           s.last_activity,
                           s.expires_at,
                           s.client_type
                    FROM sessions s
                    JOIN users u ON u.id = s.user_id
                    LEFT JOIN characters c ON c.id = s.character_id
                    WHERE s.expires_at IS NULL OR datetime(s.expires_at) > datetime('now')
                    ORDER BY s.created_at DESC
                    """)

            return [dict(zip(field_names, record)) for record in cur.fetchall()]
    except Exception as exc:
        _raise_read_error(
            "admin.get_all_sessions",
            exc,
            details=f"world_id={world_id!r}",
        )
def get_active_connections(*, world_id: str | None = None) -> list[dict[str, Any]]:
    """List non-expired sessions together with their idle time in seconds.

    When ``config.session.active_window_minutes`` is greater than zero, only
    sessions whose ``last_activity`` falls inside that many minutes before
    now are included.

    Args:
        world_id: When given, only sessions bound to that world are returned.

    Returns:
        Session dicts ordered by ``last_activity`` descending; ``age_seconds``
        is the difference between now and ``last_activity`` as computed by
        SQLite.
    """
    from mud_server.config import config

    try:
        with connection_scope() as conn:
            cur = conn.cursor()

            conditions = ["(s.expires_at IS NULL OR datetime(s.expires_at) > datetime('now'))"]
            bind_values: list[str] = []

            window_minutes = config.session.active_window_minutes
            if window_minutes > 0:
                conditions.append("datetime(s.last_activity) >= datetime('now', ?)")
                bind_values.append(f"-{window_minutes} minutes")

            where_sql = " AND ".join(conditions)
            world_filter = "" if world_id is None else "AND s.world_id = ?"
            if world_id is not None:
                # The world placeholder is the last one in the statement.
                bind_values = [*bind_values, world_id]

            sql = f"""
                SELECT s.id,
                       u.username,
                       c.name,
                       s.world_id,
                       s.session_id,
                       s.created_at,
                       s.last_activity,
                       s.expires_at,
                       s.client_type,
                       CAST(strftime('%s','now') - strftime('%s', s.last_activity) AS INTEGER)
                           AS age_seconds
                FROM sessions s
                JOIN users u ON u.id = s.user_id
                LEFT JOIN characters c ON c.id = s.character_id
                WHERE {where_sql}
                  {world_filter}
                ORDER BY s.last_activity DESC
            """  # nosec B608
            cur.execute(sql, bind_values)

            return [
                {
                    "id": record[0],
                    "username": record[1],
                    "character_name": record[2],
                    "world_id": record[3],
                    "session_id": record[4],
                    "created_at": record[5],
                    "last_activity": record[6],
                    "expires_at": record[7],
                    "client_type": record[8],
                    "age_seconds": record[9],
                }
                for record in cur.fetchall()
            ]
    except Exception as exc:
        _raise_read_error(
            "admin.get_active_connections",
            exc,
            details=f"world_id={world_id!r}",
        )
def get_all_chat_messages(limit: int = 100, *, world_id: str | None = None) -> list[dict[str, Any]]:
    """Fetch the most recent chat messages across all rooms.

    Args:
        limit: Value bound to the query's ``LIMIT`` placeholder.
        world_id: When given, only messages from that world are returned.

    Returns:
        Message dicts ordered by ``timestamp`` descending. The ``username``
        key holds the joined character's name and ``room_id`` holds the
        message's ``room`` column.
    """
    try:
        with connection_scope() as conn:
            cur = conn.cursor()
            if world_id is not None:
                cur.execute(
                    """
                    SELECT m.id, c.name, m.message, m.world_id, m.room, m.timestamp
                    FROM chat_messages m
                    JOIN characters c ON c.id = m.character_id
                    WHERE m.world_id = ?
                    ORDER BY m.timestamp DESC
                    LIMIT ?
                    """,
                    (world_id, limit),
                )
            else:
                cur.execute(
                    """
                    SELECT m.id, c.name, m.message, m.world_id, m.room, m.timestamp
                    FROM chat_messages m
                    JOIN characters c ON c.id = m.character_id
                    ORDER BY m.timestamp DESC
                    LIMIT ?
                    """,
                    (limit,),
                )

            return [
                {
                    "id": record[0],
                    "username": record[1],
                    "message": record[2],
                    "world_id": record[3],
                    "room_id": record[4],
                    "timestamp": record[5],
                }
                for record in cur.fetchall()
            ]
    except Exception as exc:
        _raise_read_error(
            "admin.get_all_chat_messages",
            exc,
            details=f"world_id={world_id!r}, limit={limit}",
        )