"""
Database initialization and management for the MUD server.
This module provides all database operations for the MUD server using SQLite.
It handles:
- Database schema initialization
- User account management (create, authentication, roles)
- Character management (creation, locations, inventory)
- Session tracking (login/logout, active users)
- Chat message storage and retrieval
Database Design:
Tables:
- users: Account identities (login, role, status)
- characters: World-facing personas owned by users
- character_locations: Per-character room state
- sessions: Active login sessions with activity tracking
- chat_messages: All chat messages with room and recipient info
Security Considerations:
- Passwords hashed with bcrypt (never plain text)
- Email stored as hashed value only (privacy-first)
- SQL injection prevented using parameterized queries
- Session IDs are UUIDs (hard to guess)
Performance Notes:
- SQLite handles basic concurrency (~50-100 players)
- No connection pooling (single file database)
- Suitable for small-medium deployments
"""
from __future__ import annotations
import json
import sqlite3
from pathlib import Path
from typing import Any, cast
# ==========================================================================
# CONFIGURATION
# ==========================================================================
DEFAULT_WORLD_ID = "pipeworks_web"
# Default world identifier used for legacy code paths that do not yet provide
# an explicit world_id. This keeps the server functional during migration and
# will be replaced by config-driven defaults in a later phase.
def _get_db_path() -> Path:
"""
Get the database path from configuration.
Returns:
Absolute path to the SQLite database file.
"""
from mud_server.config import config
return config.database.absolute_path
# ==========================================================================
# DATABASE INITIALIZATION
# ==========================================================================
[docs]
def init_database(*, skip_superuser: bool = False) -> None:
"""
Initialize the SQLite database with required tables.
Creates all necessary tables if they don't exist. If MUD_ADMIN_USER and
MUD_ADMIN_PASSWORD environment variables are set and no users exist,
creates a superuser with those credentials (unless skip_superuser=True).
Args:
skip_superuser: If True, skip superuser creation from env vars.
Side Effects:
- Creates data/mud.db file if it doesn't exist
- Creates tables if they don't exist
- Creates superuser if env vars set and no users exist
"""
import os
from mud_server.api.password import hash_password
from mud_server.config import config
db_path = _get_db_path()
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
email_hash TEXT UNIQUE,
role TEXT NOT NULL DEFAULT 'player',
is_active INTEGER NOT NULL DEFAULT 1,
is_guest INTEGER NOT NULL DEFAULT 0,
guest_expires_at TIMESTAMP,
account_origin TEXT NOT NULL DEFAULT 'legacy',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_login TIMESTAMP,
tombstoned_at TIMESTAMP
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS characters (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
name TEXT UNIQUE NOT NULL,
world_id TEXT NOT NULL,
inventory TEXT NOT NULL DEFAULT '[]',
is_guest_created INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE SET NULL
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS character_locations (
character_id INTEGER PRIMARY KEY,
world_id TEXT NOT NULL,
room_id TEXT NOT NULL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(character_id) REFERENCES characters(id) ON DELETE CASCADE
)
""")
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_character_locations_room_id "
"ON character_locations(room_id)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_character_locations_world_room "
"ON character_locations(world_id, room_id)"
)
cursor.execute("""
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
character_id INTEGER,
world_id TEXT,
session_id TEXT UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_activity TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP,
client_type TEXT DEFAULT 'unknown',
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE,
FOREIGN KEY(character_id) REFERENCES characters(id) ON DELETE SET NULL
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS chat_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
character_id INTEGER,
user_id INTEGER,
message TEXT NOT NULL,
world_id TEXT NOT NULL,
room TEXT NOT NULL,
recipient_character_id INTEGER,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(character_id) REFERENCES characters(id) ON DELETE SET NULL,
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE SET NULL,
FOREIGN KEY(recipient_character_id) REFERENCES characters(id) ON DELETE SET NULL
)
""")
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_chat_messages_world_room "
"ON chat_messages(world_id, room)"
)
cursor.execute("""
CREATE TABLE IF NOT EXISTS worlds (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
is_active INTEGER NOT NULL DEFAULT 1,
config_json TEXT NOT NULL DEFAULT '{}',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS world_permissions (
user_id INTEGER NOT NULL,
world_id TEXT NOT NULL,
can_access INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (user_id, world_id),
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE,
FOREIGN KEY(world_id) REFERENCES worlds(id) ON DELETE CASCADE
)
""")
# Ensure the default world exists in the catalog.
cursor.execute(
"""
INSERT OR IGNORE INTO worlds (id, name, description, is_active, config_json)
VALUES (?, ?, '', 1, '{}')
""",
(DEFAULT_WORLD_ID, DEFAULT_WORLD_ID),
)
_create_character_limit_triggers(conn, max_slots=config.characters.max_slots)
conn.commit()
if skip_superuser:
conn.close()
return
cursor.execute("SELECT COUNT(*) FROM users")
user_count = int(cursor.fetchone()[0])
if user_count == 0:
admin_user = os.environ.get("MUD_ADMIN_USER")
admin_password = os.environ.get("MUD_ADMIN_PASSWORD")
if admin_user and admin_password:
if len(admin_password) < 8:
print("Warning: MUD_ADMIN_PASSWORD must be at least 8 characters. Skipping.")
else:
password_hash = hash_password(admin_password)
cursor.execute(
"""
INSERT INTO users (username, password_hash, role, account_origin)
VALUES (?, ?, ?, ?)
""",
(admin_user, password_hash, "superuser", "system"),
)
user_id = cursor.lastrowid
if user_id is None:
raise ValueError("Failed to create superuser.")
character_id = _create_default_character(
cursor, int(user_id), admin_user, world_id=DEFAULT_WORLD_ID
)
_seed_character_location(cursor, character_id, world_id=DEFAULT_WORLD_ID)
conn.commit()
print("\n" + "=" * 60)
print("SUPERUSER CREATED FROM ENVIRONMENT VARIABLES")
print("=" * 60)
print(f"Username: {admin_user}")
print("=" * 60 + "\n")
else:
print("\n" + "=" * 60)
print("DATABASE INITIALIZED (no superuser created)")
print("=" * 60)
print("To create a superuser, either:")
print(" 1. Set MUD_ADMIN_USER and MUD_ADMIN_PASSWORD environment variables")
print(" and run: mud-server init-db")
print(" 2. Run interactively: mud-server create-superuser")
print("=" * 60 + "\n")
conn.close()
def _create_character_limit_triggers(conn: sqlite3.Connection, *, max_slots: int) -> None:
"""
Create triggers that enforce the per-user character slot limit.
Note:
SQLite cannot read config at runtime inside a trigger. We bake the
configured limit into the trigger at init time.
"""
cursor = conn.cursor()
cursor.execute("DROP TRIGGER IF EXISTS enforce_character_limit_insert")
cursor.execute("DROP TRIGGER IF EXISTS enforce_character_limit_update")
cursor.execute(f"""
CREATE TRIGGER enforce_character_limit_insert
BEFORE INSERT ON characters
WHEN NEW.user_id IS NOT NULL
BEGIN
SELECT
CASE
WHEN (SELECT COUNT(*) FROM characters WHERE user_id = NEW.user_id) >= {int(max_slots)}
THEN RAISE(ABORT, 'character limit exceeded')
END;
END;
""") # nosec B608 - limit is validated and interpolated into DDL
cursor.execute(f"""
CREATE TRIGGER enforce_character_limit_update
BEFORE UPDATE OF user_id ON characters
WHEN NEW.user_id IS NOT NULL
BEGIN
SELECT
CASE
WHEN (SELECT COUNT(*) FROM characters WHERE user_id = NEW.user_id) >= {int(max_slots)}
THEN RAISE(ABORT, 'character limit exceeded')
END;
END;
""") # nosec B608 - limit is validated and interpolated into DDL
def _generate_default_character_name(cursor: Any, username: str) -> str:
"""
Generate a unique default character name for the given username.
The name intentionally differs from the account username to reduce
confusion in admin views (characters vs. users).
"""
base = f"{username}_char"
candidate = base
counter = 1
while True:
cursor.execute("SELECT 1 FROM characters WHERE name = ? LIMIT 1", (candidate,))
if cursor.fetchone() is None:
return candidate
counter += 1
candidate = f"{base}_{counter}"
def _create_default_character(
cursor: Any, user_id: int, username: str, *, world_id: str = DEFAULT_WORLD_ID
) -> int:
"""
Create a default character for a user during bootstrap flows.
Returns:
The newly created character id.
"""
character_name = _generate_default_character_name(cursor, username)
cursor.execute(
"""
INSERT INTO characters (user_id, name, world_id, is_guest_created)
VALUES (?, ?, ?, 0)
""",
(user_id, character_name, world_id),
)
character_id = cursor.lastrowid
if character_id is None:
raise ValueError("Failed to create default character.")
return int(character_id)
def _seed_character_location(
cursor: Any, character_id: int, *, world_id: str = DEFAULT_WORLD_ID
) -> None:
"""Seed a new character's location to the spawn room for the given world."""
cursor.execute(
"""
INSERT INTO character_locations (character_id, world_id, room_id)
VALUES (?, ?, ?)
""",
(character_id, world_id, "spawn"),
)
def _resolve_character_name(cursor: Any, name: str, *, world_id: str | None = None) -> str | None:
"""
Resolve a character name from either a character name or a username.
This preserves compatibility with legacy callers that pass usernames
into character-facing functions by mapping them to the user's first
character (oldest by created_at).
"""
if world_id is None:
world_id = DEFAULT_WORLD_ID
cursor.execute(
"SELECT name FROM characters WHERE name = ? AND world_id = ? LIMIT 1",
(name, world_id),
)
row = cursor.fetchone()
if row:
return cast(str, row[0])
cursor.execute("SELECT id FROM users WHERE username = ? LIMIT 1", (name,))
user_row = cursor.fetchone()
if not user_row:
return None
user_id = int(user_row[0])
cursor.execute(
"SELECT name FROM characters WHERE user_id = ? AND world_id = ? "
"ORDER BY created_at ASC LIMIT 1",
(user_id, world_id),
)
char_row = cursor.fetchone()
return cast(str, char_row[0]) if char_row else None
[docs]
def resolve_character_name(name: str, *, world_id: str | None = None) -> str | None:
"""
Public wrapper for resolving character names from usernames or character names.
This preserves legacy call sites that still supply usernames while the
character model is being adopted across the codebase.
"""
conn = get_connection()
cursor = conn.cursor()
resolved = _resolve_character_name(cursor, name, world_id=world_id)
conn.close()
return resolved
# ==========================================================================
# CONNECTION MANAGEMENT
# ==========================================================================
[docs]
def get_connection() -> sqlite3.Connection:
"""
Create a new SQLite connection to the database file.
Returns:
sqlite3.Connection object
"""
return sqlite3.connect(str(_get_db_path()))
# ==========================================================================
# USER ACCOUNT MANAGEMENT
# ==========================================================================
[docs]
def create_user_with_password(
username: str,
password: str,
*,
role: str = "player",
account_origin: str = "legacy",
email_hash: str | None = None,
is_guest: bool = False,
guest_expires_at: str | None = None,
create_default_character: bool = True,
world_id: str = DEFAULT_WORLD_ID,
) -> bool:
"""
Create a new user account and (optionally) a default character.
Args:
username: Unique account username.
password: Plain text password (hashed with bcrypt).
role: Role string.
account_origin: Provenance marker for cleanup/auditing.
email_hash: Hashed email value (nullable during development).
is_guest: Whether this is a guest account.
guest_expires_at: Expiration timestamp for guest accounts.
create_default_character: If True, create a character with the same name.
world_id: World to bind the default character to.
Returns:
True if created successfully, False if username already exists.
"""
from mud_server.api.password import hash_password
try:
conn = get_connection()
cursor = conn.cursor()
password_hash = hash_password(password)
cursor.execute(
"""
INSERT INTO users (
username,
password_hash,
email_hash,
role,
is_guest,
guest_expires_at,
account_origin
)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
username,
password_hash,
email_hash,
role,
int(is_guest),
guest_expires_at,
account_origin,
),
)
user_id = cursor.lastrowid
if user_id is None:
raise ValueError("Failed to create user.")
user_id = int(user_id)
if create_default_character:
character_id = _create_default_character(cursor, user_id, username, world_id=world_id)
_seed_character_location(cursor, character_id, world_id=world_id)
conn.commit()
conn.close()
return True
except sqlite3.IntegrityError:
return False
[docs]
def create_character_for_user(
user_id: int,
name: str,
*,
is_guest_created: bool = False,
room_id: str = "spawn",
world_id: str = DEFAULT_WORLD_ID,
) -> bool:
"""
Create a character for an existing user.
Args:
user_id: Owning user id.
name: Character name (globally unique for now).
is_guest_created: Marks characters created from guest flow.
room_id: Initial room id.
world_id: World the character belongs to.
Returns:
True if character created, False on constraint violation.
"""
try:
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"""
INSERT INTO characters (user_id, name, world_id, is_guest_created)
VALUES (?, ?, ?, ?)
""",
(user_id, name, world_id, int(is_guest_created)),
)
character_id = cursor.lastrowid
if character_id is None:
raise ValueError("Failed to create character.")
character_id = int(character_id)
_seed_character_location(cursor, character_id, world_id=world_id)
if room_id != "spawn":
cursor.execute(
"""
UPDATE character_locations
SET room_id = ?, updated_at = CURRENT_TIMESTAMP
WHERE character_id = ?
""",
(room_id, character_id),
)
conn.commit()
conn.close()
return True
except sqlite3.IntegrityError:
return False
[docs]
def user_exists(username: str) -> bool:
"""Return True if a user account exists."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT id FROM users WHERE username = ?", (username,))
result = cursor.fetchone()
conn.close()
return result is not None
[docs]
def get_user_id(username: str) -> int | None:
"""Return user id for the given username, or None if not found."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT id FROM users WHERE username = ?", (username,))
row = cursor.fetchone()
conn.close()
return int(row[0]) if row else None
[docs]
def get_username_by_id(user_id: int) -> str | None:
"""Return username for a user id, or None if not found."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT username FROM users WHERE id = ?", (user_id,))
row = cursor.fetchone()
conn.close()
return row[0] if row else None
[docs]
def get_user_role(username: str) -> str | None:
"""Return the role for a username, or None if not found."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT role FROM users WHERE username = ?", (username,))
row = cursor.fetchone()
conn.close()
return row[0] if row else None
[docs]
def get_user_account_origin(username: str) -> str | None:
"""Return account_origin for the given username."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT account_origin FROM users WHERE username = ?", (username,))
row = cursor.fetchone()
conn.close()
return row[0] if row else None
[docs]
def set_user_role(username: str, role: str) -> bool:
"""Update a user's role."""
try:
conn = get_connection()
cursor = conn.cursor()
cursor.execute("UPDATE users SET role = ? WHERE username = ?", (role, username))
conn.commit()
conn.close()
return True
except Exception:
return False
[docs]
def verify_password_for_user(username: str, password: str) -> bool:
"""
Verify a password against stored bcrypt hash.
Uses a dummy hash for timing safety when user doesn't exist.
"""
from mud_server.api.password import verify_password
DUMMY_HASH = "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.G5j1L3tDPZ3q4q" # nosec B105
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT password_hash FROM users WHERE username = ?", (username,))
row = cursor.fetchone()
conn.close()
if not row:
verify_password(password, DUMMY_HASH)
return False
return verify_password(password, row[0])
[docs]
def is_user_active(username: str) -> bool:
"""Return True if the user is active (not banned)."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT is_active FROM users WHERE username = ?", (username,))
row = cursor.fetchone()
conn.close()
return bool(row[0]) if row else False
[docs]
def deactivate_user(username: str) -> bool:
"""Deactivate (ban) a user account."""
try:
conn = get_connection()
cursor = conn.cursor()
cursor.execute("UPDATE users SET is_active = 0 WHERE username = ?", (username,))
conn.commit()
conn.close()
return True
except Exception:
return False
[docs]
def activate_user(username: str) -> bool:
"""Activate (unban) a user account."""
try:
conn = get_connection()
cursor = conn.cursor()
cursor.execute("UPDATE users SET is_active = 1 WHERE username = ?", (username,))
conn.commit()
conn.close()
return True
except Exception:
return False
[docs]
def change_password_for_user(username: str, new_password: str) -> bool:
"""Change a user's password (hashes with bcrypt)."""
from mud_server.api.password import hash_password
try:
conn = get_connection()
cursor = conn.cursor()
password_hash = hash_password(new_password)
cursor.execute(
"UPDATE users SET password_hash = ? WHERE username = ?",
(password_hash, username),
)
conn.commit()
conn.close()
return True
except Exception:
return False
[docs]
def tombstone_user(user_id: int) -> None:
"""Tombstone a user account without deleting rows."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"""
UPDATE users
SET is_active = 0,
tombstoned_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(user_id,),
)
conn.commit()
conn.close()
[docs]
def delete_user(username: str) -> bool:
"""
Delete a user account while preserving character data.
This performs:
- Unlink characters from the user (user_id -> NULL)
- Remove all sessions
- Tombstone the user row (soft delete)
"""
user_id = get_user_id(username)
if not user_id:
return False
try:
conn = get_connection()
cursor = conn.cursor()
cursor.execute("UPDATE characters SET user_id = NULL WHERE user_id = ?", (user_id,))
cursor.execute("DELETE FROM sessions WHERE user_id = ?", (user_id,))
cursor.execute(
"UPDATE users SET is_active = 0, tombstoned_at = CURRENT_TIMESTAMP WHERE id = ?",
(user_id,),
)
conn.commit()
conn.close()
return True
except Exception:
return False
# ==========================================================================
# CHARACTER MANAGEMENT
# ==========================================================================
[docs]
def character_exists(name: str) -> bool:
"""Return True if a character with this name exists."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT id FROM characters WHERE name = ?", (name,))
row = cursor.fetchone()
conn.close()
return row is not None
[docs]
def get_character_by_name(name: str) -> dict[str, Any] | None:
"""Return character row by name."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"""
SELECT id, user_id, name, world_id, inventory, is_guest_created, created_at, updated_at
FROM characters
WHERE name = ?
""",
(name,),
)
row = cursor.fetchone()
conn.close()
if not row:
return None
return {
"id": int(row[0]),
"user_id": row[1],
"name": row[2],
"world_id": row[3],
"inventory": row[4],
"is_guest_created": bool(row[5]),
"created_at": row[6],
"updated_at": row[7],
}
[docs]
def get_character_by_id(character_id: int) -> dict[str, Any] | None:
"""Return character row by id."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"""
SELECT id, user_id, name, world_id, inventory, is_guest_created, created_at, updated_at
FROM characters
WHERE id = ?
""",
(character_id,),
)
row = cursor.fetchone()
conn.close()
if not row:
return None
return {
"id": int(row[0]),
"user_id": row[1],
"name": row[2],
"world_id": row[3],
"inventory": row[4],
"is_guest_created": bool(row[5]),
"created_at": row[6],
"updated_at": row[7],
}
[docs]
def get_character_name_by_id(character_id: int) -> str | None:
"""Return character name for the given id, or None if not found."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT name FROM characters WHERE id = ?", (character_id,))
row = cursor.fetchone()
conn.close()
return row[0] if row else None
[docs]
def get_user_characters(user_id: int, *, world_id: str | None = None) -> list[dict[str, Any]]:
"""
Return all characters owned by the given user for a world.
When world_id is omitted, the default world is used to keep legacy code
paths functional during the migration.
"""
if world_id is None:
world_id = DEFAULT_WORLD_ID
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"""
SELECT id, name, world_id, is_guest_created, created_at, updated_at
FROM characters
WHERE user_id = ? AND world_id = ?
ORDER BY created_at ASC
""",
(user_id, world_id),
)
rows = cursor.fetchall()
conn.close()
return [
{
"id": int(row[0]),
"name": row[1],
"world_id": row[2],
"is_guest_created": bool(row[3]),
"created_at": row[4],
"updated_at": row[5],
}
for row in rows
]
[docs]
def get_user_character_world_ids(user_id: int) -> set[str]:
"""
Return the set of world ids in which the user has characters.
This is used to enforce allow_multi_world_characters when creating
new characters.
"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"""
SELECT DISTINCT world_id
FROM characters
WHERE user_id = ?
""",
(user_id,),
)
rows = cursor.fetchall()
conn.close()
return {row[0] for row in rows}
[docs]
def unlink_characters_for_user(user_id: int) -> None:
"""Detach characters from a user (used when tombstoning guest accounts)."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("UPDATE characters SET user_id = NULL WHERE user_id = ?", (user_id,))
conn.commit()
conn.close()
# ==========================================================================
# CHARACTER STATE AND LOCATION
# ==========================================================================
[docs]
def get_character_room(name: str, *, world_id: str | None = None) -> str | None:
"""Return the current room for a character by name within a world."""
if world_id is None:
world_id = DEFAULT_WORLD_ID
conn = get_connection()
cursor = conn.cursor()
resolved_name = _resolve_character_name(cursor, name, world_id=world_id)
if not resolved_name:
conn.close()
return None
cursor.execute("SELECT id FROM characters WHERE name = ?", (resolved_name,))
row = cursor.fetchone()
if not row:
conn.close()
return None
character_id = int(row[0])
cursor.execute(
"SELECT room_id FROM character_locations WHERE character_id = ? AND world_id = ?",
(character_id, world_id),
)
row = cursor.fetchone()
conn.close()
return row[0] if row else None
[docs]
def set_character_room(name: str, room: str, *, world_id: str | None = None) -> bool:
"""Set the current room for a character by name within a world."""
if world_id is None:
world_id = DEFAULT_WORLD_ID
try:
conn = get_connection()
cursor = conn.cursor()
resolved_name = _resolve_character_name(cursor, name, world_id=world_id)
if not resolved_name:
conn.close()
return False
cursor.execute("SELECT id FROM characters WHERE name = ?", (resolved_name,))
row = cursor.fetchone()
if not row:
conn.close()
return False
character_id = int(row[0])
cursor.execute(
"""
INSERT INTO character_locations (character_id, world_id, room_id, updated_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(character_id) DO UPDATE
SET world_id = excluded.world_id,
room_id = excluded.room_id,
updated_at = CURRENT_TIMESTAMP
""",
(character_id, world_id, room),
)
conn.commit()
conn.close()
return True
except Exception:
return False
[docs]
def get_characters_in_room(room: str, *, world_id: str | None = None) -> list[str]:
"""Return character names in a room with active sessions for a world."""
if world_id is None:
world_id = DEFAULT_WORLD_ID
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"""
SELECT DISTINCT c.name
FROM characters c
JOIN character_locations l ON c.id = l.character_id
JOIN sessions s ON s.character_id = c.id
WHERE l.world_id = ?
AND l.room_id = ?
AND (s.world_id IS NULL OR s.world_id = ?)
AND (s.expires_at IS NULL OR datetime(s.expires_at) > datetime('now'))
""",
(world_id, room, world_id),
)
rows = cursor.fetchall()
conn.close()
return [row[0] for row in rows]
# ==========================================================================
# INVENTORY MANAGEMENT
# ==========================================================================
[docs]
def get_character_inventory(name: str) -> list[str]:
"""Return the character inventory as a list of item ids."""
conn = get_connection()
cursor = conn.cursor()
resolved_name = _resolve_character_name(cursor, name)
if not resolved_name:
conn.close()
return []
cursor.execute("SELECT inventory FROM characters WHERE name = ?", (resolved_name,))
row = cursor.fetchone()
conn.close()
if not row:
return []
inventory: list[str] = json.loads(row[0])
return inventory
[docs]
def set_character_inventory(name: str, inventory: list[str]) -> bool:
"""Set the character inventory."""
try:
conn = get_connection()
cursor = conn.cursor()
resolved_name = _resolve_character_name(cursor, name)
if not resolved_name:
conn.close()
return False
cursor.execute(
"UPDATE characters SET inventory = ?, updated_at = CURRENT_TIMESTAMP WHERE name = ?",
(json.dumps(inventory), resolved_name),
)
conn.commit()
conn.close()
return True
except Exception:
return False
# ==========================================================================
# CHAT MESSAGES
# ==========================================================================
[docs]
def add_chat_message(
character_name: str,
message: str,
room: str,
recipient_character_name: str | None = None,
recipient: str | None = None,
*,
world_id: str | None = None,
) -> bool:
"""
Add a chat message for a character.
Supports optional whisper recipient and uses world scoping. If world_id
is omitted, the default world is used during migration.
"""
if world_id is None:
world_id = DEFAULT_WORLD_ID
try:
conn = get_connection()
cursor = conn.cursor()
resolved_sender = _resolve_character_name(cursor, character_name, world_id=world_id)
if not resolved_sender:
conn.close()
return False
cursor.execute(
"SELECT id, user_id FROM characters WHERE name = ? AND world_id = ?",
(resolved_sender, world_id),
)
sender_row = cursor.fetchone()
if not sender_row:
conn.close()
return False
sender_id = int(sender_row[0])
user_id = sender_row[1]
recipient_id: int | None = None
if recipient_character_name is None and recipient is not None:
recipient_character_name = recipient
if recipient_character_name:
resolved_recipient = _resolve_character_name(
cursor, recipient_character_name, world_id=world_id
)
if resolved_recipient:
recipient_character_name = resolved_recipient
cursor.execute(
"SELECT id FROM characters WHERE name = ? AND world_id = ?",
(recipient_character_name, world_id),
)
recipient_row = cursor.fetchone()
if recipient_row:
recipient_id = int(recipient_row[0])
cursor.execute(
"""
INSERT INTO chat_messages (
character_id,
user_id,
message,
world_id,
room,
recipient_character_id
)
VALUES (?, ?, ?, ?, ?, ?)
""",
(sender_id, user_id, message, world_id, room, recipient_id),
)
conn.commit()
conn.close()
return True
except Exception:
return False
[docs]
def get_room_messages(
room: str,
*,
limit: int = 50,
character_name: str | None = None,
username: str | None = None,
world_id: str | None = None,
) -> list[dict[str, Any]]:
"""
Get recent messages from a room. Filters whispers based on character.
Messages are scoped to the provided world_id; default world is used when
omitted to preserve legacy code paths during migration.
"""
if world_id is None:
world_id = DEFAULT_WORLD_ID
conn = get_connection()
cursor = conn.cursor()
if character_name is None and username is not None:
character_name = username
if character_name:
resolved_name = _resolve_character_name(cursor, character_name, world_id=world_id)
if resolved_name is None and username is not None:
resolved_name = _resolve_character_name(cursor, username, world_id=world_id)
if not resolved_name:
conn.close()
return []
cursor.execute(
"SELECT id FROM characters WHERE name = ? AND world_id = ?",
(resolved_name, world_id),
)
row = cursor.fetchone()
if not row:
conn.close()
return []
character_id = int(row[0])
cursor.execute(
"""
SELECT c.name, m.message, m.timestamp
FROM chat_messages m
JOIN characters c ON c.id = m.character_id
WHERE m.world_id = ? AND m.room = ? AND (
m.recipient_character_id IS NULL OR
m.recipient_character_id = ? OR
m.character_id = ?
)
ORDER BY m.timestamp DESC, m.id DESC
LIMIT ?
""",
(world_id, room, character_id, character_id, limit),
)
else:
cursor.execute(
"""
SELECT c.name, m.message, m.timestamp
FROM chat_messages m
JOIN characters c ON c.id = m.character_id
WHERE m.world_id = ? AND m.room = ?
ORDER BY m.timestamp DESC, m.id DESC
LIMIT ?
""",
(world_id, room, limit),
)
rows = cursor.fetchall()
conn.close()
messages = []
for name, message, timestamp in reversed(rows):
messages.append({"username": name, "message": message, "timestamp": timestamp})
return messages
# ==========================================================================
# SESSION MANAGEMENT
# ==========================================================================
[docs]
def create_session(
user_id: int | str,
session_id: str,
*,
client_type: str = "unknown",
character_id: int | None = None,
world_id: str | None = None,
) -> bool:
"""
Create a new session record for a user.
Behavior depends on configuration:
- allow_multiple_sessions = False: remove existing sessions for the user
- allow_multiple_sessions = True: keep existing sessions
If world_id is omitted, the default world is used. This is temporary
compatibility behavior until the API layer is updated to provide it.
"""
from mud_server.config import config
try:
if world_id is None:
world_id = DEFAULT_WORLD_ID
if isinstance(user_id, str):
resolved = get_user_id(user_id)
if not resolved:
return False
user_id = resolved
if character_id is None:
conn = get_connection()
cursor = conn.cursor()
if world_id:
cursor.execute(
"SELECT id FROM characters WHERE user_id = ? AND world_id = ? "
"ORDER BY created_at ASC",
(user_id, world_id),
)
else:
cursor.execute(
"SELECT id FROM characters WHERE user_id = ? ORDER BY created_at ASC",
(user_id,),
)
rows = cursor.fetchall()
conn.close()
if len(rows) == 1:
character_id = int(rows[0][0])
conn = get_connection()
cursor = conn.cursor()
if not config.session.allow_multiple_sessions:
cursor.execute("DELETE FROM sessions WHERE user_id = ?", (user_id,))
client_type = client_type.strip().lower() if client_type else "unknown"
if config.session.ttl_minutes > 0:
cursor.execute(
"""
INSERT INTO sessions (
user_id,
character_id,
world_id,
session_id,
created_at,
last_activity,
expires_at,
client_type
)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, datetime('now', ?), ?)
""",
(
user_id,
character_id,
world_id,
session_id,
f"+{config.session.ttl_minutes} minutes",
client_type,
),
)
else:
cursor.execute(
"""
INSERT INTO sessions (
user_id,
character_id,
world_id,
session_id,
created_at,
last_activity,
expires_at,
client_type
)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, NULL, ?)
""",
(user_id, character_id, world_id, session_id, client_type),
)
cursor.execute(
"UPDATE users SET last_login = CURRENT_TIMESTAMP WHERE id = ?",
(user_id,),
)
conn.commit()
conn.close()
return True
except Exception:
return False
[docs]
def set_session_character(
session_id: str, character_id: int, *, world_id: str | None = None
) -> bool:
"""Attach a character (and optionally world) to an existing session."""
if world_id is None:
world_id = DEFAULT_WORLD_ID
try:
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"UPDATE sessions SET character_id = ?, world_id = ? WHERE session_id = ?",
(character_id, world_id, session_id),
)
conn.commit()
conn.close()
return True
except Exception:
return False
[docs]
def remove_session_by_id(session_id: str) -> bool:
"""Remove a specific session by its session_id."""
try:
conn = get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
removed = int(cursor.rowcount or 0)
conn.commit()
conn.close()
return removed > 0
except Exception:
return False
[docs]
def remove_sessions_for_user(user_id: int) -> bool:
"""Remove all sessions for a user (used for forced logout/ban)."""
try:
conn = get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM sessions WHERE user_id = ?", (user_id,))
removed = int(cursor.rowcount or 0)
conn.commit()
conn.close()
return removed > 0
except Exception:
return False
[docs]
def update_session_activity(session_id: str) -> bool:
"""
Update last_activity for a session and extend expiry when sliding is enabled.
"""
from mud_server.config import config
try:
conn = get_connection()
cursor = conn.cursor()
if config.session.sliding_expiration and config.session.ttl_minutes > 0:
cursor.execute(
"""
UPDATE sessions
SET last_activity = CURRENT_TIMESTAMP,
expires_at = datetime('now', ?)
WHERE session_id = ?
""",
(f"+{config.session.ttl_minutes} minutes", session_id),
)
else:
cursor.execute(
"UPDATE sessions SET last_activity = CURRENT_TIMESTAMP WHERE session_id = ?",
(session_id,),
)
conn.commit()
conn.close()
return True
except Exception:
return False
[docs]
def get_session_by_id(session_id: str) -> dict[str, Any] | None:
"""Return session record by session_id (or None if not found)."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"""
SELECT user_id, character_id, world_id, session_id, created_at, last_activity, expires_at,
client_type
FROM sessions WHERE session_id = ?
""",
(session_id,),
)
row = cursor.fetchone()
conn.close()
if not row:
return None
return {
"user_id": int(row[0]),
"character_id": row[1],
"world_id": row[2],
"session_id": row[3],
"created_at": row[4],
"last_activity": row[5],
"expires_at": row[6],
"client_type": row[7],
}
[docs]
def get_active_session_count() -> int:
"""Count active sessions within the configured activity window."""
from mud_server.config import config
conn = get_connection()
cursor = conn.cursor()
where_clauses = ["(expires_at IS NULL OR datetime(expires_at) > datetime('now'))"]
params: list[str] = []
if config.session.active_window_minutes > 0:
where_clauses.append("datetime(last_activity) >= datetime('now', ?)")
params.append(f"-{config.session.active_window_minutes} minutes")
sql = f"""
SELECT COUNT(*) FROM sessions
WHERE {" AND ".join(where_clauses)}
""" # nosec B608
cursor.execute(sql, params)
row = cursor.fetchone()
count = int(row[0]) if row else 0
conn.close()
return count
[docs]
def cleanup_expired_sessions() -> int:
"""Remove expired sessions based on expires_at timestamp."""
try:
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
DELETE FROM sessions
WHERE expires_at IS NOT NULL AND datetime(expires_at) <= datetime('now')
""")
removed_count: int = cursor.rowcount
conn.commit()
conn.close()
return removed_count
except Exception:
return 0
[docs]
def clear_all_sessions() -> int:
"""Remove all sessions from the database."""
try:
conn = get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM sessions")
removed_count: int = cursor.rowcount
conn.commit()
conn.close()
return removed_count
except Exception:
return 0
[docs]
def get_active_characters(*, world_id: str | None = None) -> list[str]:
"""Return character names with active sessions for a world."""
if world_id is None:
world_id = DEFAULT_WORLD_ID
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"""
SELECT DISTINCT c.name
FROM sessions s
JOIN characters c ON c.id = s.character_id
WHERE s.character_id IS NOT NULL
AND (s.world_id IS NULL OR s.world_id = ?)
AND (s.expires_at IS NULL OR datetime(s.expires_at) > datetime('now'))
""",
(world_id,),
)
rows = cursor.fetchall()
conn.close()
return [row[0] for row in rows]
# ==========================================================================
# GUEST ACCOUNT CLEANUP
# ==========================================================================
[docs]
def cleanup_expired_guest_accounts() -> int:
"""
Delete expired guest accounts and unlink their characters.
Returns:
Number of guest users deleted.
"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT id
FROM users
WHERE tombstoned_at IS NULL
AND (
(is_guest = 1 AND guest_expires_at IS NOT NULL
AND datetime(guest_expires_at) <= datetime('now'))
OR
(account_origin = 'visitor'
AND guest_expires_at IS NULL
AND datetime(created_at) <= datetime('now', '-24 hours'))
)
""")
rows = cursor.fetchall()
if not rows:
conn.close()
return 0
user_ids = [int(row[0]) for row in rows]
placeholders = ",".join(["?"] * len(user_ids))
cursor.execute(
f"UPDATE characters SET user_id = NULL WHERE user_id IN ({placeholders})", # nosec B608
user_ids,
)
cursor.execute(
f"DELETE FROM sessions WHERE user_id IN ({placeholders})", # nosec B608
user_ids,
)
cursor.execute(
f"DELETE FROM users WHERE id IN ({placeholders})", # nosec B608
user_ids,
)
conn.commit()
conn.close()
return len(user_ids)
# ==========================================================================
# ADMIN QUERIES
# ==========================================================================
[docs]
def get_world_by_id(world_id: str) -> dict[str, Any] | None:
"""
Return a world catalog entry by id.
Args:
world_id: World identifier (primary key).
Returns:
Dict with world fields or None if not found.
"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"""
SELECT id, name, description, is_active, config_json, created_at
FROM worlds
WHERE id = ?
""",
(world_id,),
)
row = cursor.fetchone()
conn.close()
if not row:
return None
return {
"id": row[0],
"name": row[1],
"description": row[2],
"is_active": bool(row[3]),
"config_json": row[4],
"created_at": row[5],
}
[docs]
def list_worlds(*, include_inactive: bool = False) -> list[dict[str, Any]]:
"""
Return all worlds in the catalog.
Args:
include_inactive: When False, only active worlds are returned.
"""
conn = get_connection()
cursor = conn.cursor()
if include_inactive:
cursor.execute("""
SELECT id, name, description, is_active, config_json, created_at
FROM worlds
ORDER BY id
""")
else:
cursor.execute("""
SELECT id, name, description, is_active, config_json, created_at
FROM worlds
WHERE is_active = 1
ORDER BY id
""")
rows = cursor.fetchall()
conn.close()
return [
{
"id": row[0],
"name": row[1],
"description": row[2],
"is_active": bool(row[3]),
"config_json": row[4],
"created_at": row[5],
}
for row in rows
]
[docs]
def list_worlds_for_user(
user_id: int,
*,
role: str | None = None,
include_inactive: bool = False,
) -> list[dict[str, Any]]:
"""
Return worlds accessible to the given user.
Admins and superusers have implicit access to all worlds. Other roles
must have a world_permissions row with can_access=1.
"""
if role is None:
username = get_username_by_id(user_id)
if username:
role = get_user_role(username)
if role in {"admin", "superuser"}:
return list_worlds(include_inactive=include_inactive)
conn = get_connection()
cursor = conn.cursor()
if include_inactive:
cursor.execute(
"""
SELECT w.id, w.name, w.description, w.is_active, w.config_json, w.created_at
FROM worlds w
JOIN world_permissions p ON p.world_id = w.id
WHERE p.user_id = ? AND p.can_access = 1
ORDER BY w.id
""",
(user_id,),
)
else:
cursor.execute(
"""
SELECT w.id, w.name, w.description, w.is_active, w.config_json, w.created_at
FROM worlds w
JOIN world_permissions p ON p.world_id = w.id
WHERE p.user_id = ? AND p.can_access = 1 AND w.is_active = 1
ORDER BY w.id
""",
(user_id,),
)
rows = cursor.fetchall()
if rows:
conn.close()
return [
{
"id": row[0],
"name": row[1],
"description": row[2],
"is_active": bool(row[3]),
"config_json": row[4],
"created_at": row[5],
}
for row in rows
]
# Fallback: allow worlds where the user already has characters.
cursor.execute(
"""
SELECT DISTINCT w.id, w.name, w.description, w.is_active, w.config_json, w.created_at
FROM worlds w
JOIN characters c ON c.world_id = w.id
WHERE c.user_id = ?
ORDER BY w.id
""",
(user_id,),
)
rows = cursor.fetchall()
conn.close()
return [
{
"id": row[0],
"name": row[1],
"description": row[2],
"is_active": bool(row[3]),
"config_json": row[4],
"created_at": row[5],
}
for row in rows
]
def _quote_identifier(identifier: str) -> str:
"""Safely quote an SQLite identifier (table/column name)."""
escaped = identifier.replace('"', '""')
return f'"{escaped}"'
[docs]
def get_table_names() -> list[str]:
"""Return a sorted list of user-defined table names (excludes sqlite_*)."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT name
FROM sqlite_master
WHERE type='table' AND name NOT LIKE 'sqlite_%'
ORDER BY name
""")
rows = cursor.fetchall()
conn.close()
return [row[0] for row in rows]
[docs]
def list_tables() -> list[dict[str, Any]]:
"""Return table metadata for admin database browsing."""
conn = get_connection()
cursor = conn.cursor()
tables: list[dict[str, Any]] = []
for table_name in get_table_names():
quoted_table = _quote_identifier(table_name)
cursor.execute(f"PRAGMA table_info({quoted_table})")
columns = [row[1] for row in cursor.fetchall()]
cursor.execute(f"SELECT COUNT(*) FROM {quoted_table}") # nosec B608
row_count = int(cursor.fetchone()[0])
tables.append({"name": table_name, "columns": columns, "row_count": row_count})
conn.close()
return tables
[docs]
def get_table_rows(table_name: str, limit: int = 100) -> tuple[list[str], list[list[Any]]]:
"""Return column names and rows for a given table."""
table_names = set(get_table_names())
if table_name not in table_names:
raise ValueError(f"Table '{table_name}' does not exist")
conn = get_connection()
cursor = conn.cursor()
quoted_table = _quote_identifier(table_name)
cursor.execute(f"PRAGMA table_info({quoted_table})")
columns = [row[1] for row in cursor.fetchall()]
cursor.execute(f"SELECT * FROM {quoted_table} LIMIT ?", (limit,)) # nosec B608
rows = [list(row) for row in cursor.fetchall()]
conn.close()
return columns, rows
[docs]
def get_all_users_detailed() -> list[dict[str, Any]]:
"""Return detailed user list for admin database viewer."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT u.id,
u.username,
u.password_hash,
u.role,
u.account_origin,
u.is_guest,
u.guest_expires_at,
u.created_at,
u.last_login,
u.is_active,
u.tombstoned_at,
COUNT(c.id) AS character_count
FROM users u
LEFT JOIN characters c ON c.user_id = u.id
GROUP BY u.id
ORDER BY u.created_at DESC
""")
rows = cursor.fetchall()
conn.close()
users = []
for row in rows:
users.append(
{
"id": row[0],
"username": row[1],
"password_hash": row[2][:20] + "..." if len(row[2]) > 20 else row[2],
"role": row[3],
"account_origin": row[4],
"is_guest": bool(row[5]),
"guest_expires_at": row[6],
"created_at": row[7],
"last_login": row[8],
"is_active": bool(row[9]),
"tombstoned_at": row[10],
"character_count": row[11],
}
)
return users
[docs]
def get_all_users() -> list[dict[str, Any]]:
"""Return basic user list for admin summaries."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT username, role, created_at, last_login, is_active
FROM users
ORDER BY created_at DESC
""")
rows = cursor.fetchall()
conn.close()
return [
{
"username": row[0],
"role": row[1],
"created_at": row[2],
"last_login": row[3],
"is_active": bool(row[4]),
}
for row in rows
]
[docs]
def get_character_locations(*, world_id: str | None = None) -> list[dict[str, Any]]:
"""Return character location rows with names for admin display."""
conn = get_connection()
cursor = conn.cursor()
if world_id is None:
cursor.execute("""
SELECT c.id,
c.name,
l.world_id,
l.room_id,
l.updated_at
FROM character_locations l
JOIN characters c ON c.id = l.character_id
ORDER BY c.id
""")
else:
cursor.execute(
"""
SELECT c.id,
c.name,
l.world_id,
l.room_id,
l.updated_at
FROM character_locations l
JOIN characters c ON c.id = l.character_id
WHERE l.world_id = ?
ORDER BY c.id
""",
(world_id,),
)
rows = cursor.fetchall()
conn.close()
locations: list[dict[str, Any]] = []
for row in rows:
locations.append(
{
"character_id": row[0],
"character_name": row[1],
"world_id": row[2],
"room_id": row[3],
"updated_at": row[4],
}
)
return locations
[docs]
def get_all_sessions(*, world_id: str | None = None) -> list[dict[str, Any]]:
"""Return all active (non-expired) sessions."""
conn = get_connection()
cursor = conn.cursor()
if world_id is None:
cursor.execute("""
SELECT s.id,
u.username,
c.name,
s.world_id,
s.session_id,
s.created_at,
s.last_activity,
s.expires_at,
s.client_type
FROM sessions s
JOIN users u ON u.id = s.user_id
LEFT JOIN characters c ON c.id = s.character_id
WHERE s.expires_at IS NULL OR datetime(s.expires_at) > datetime('now')
ORDER BY s.created_at DESC
""")
else:
cursor.execute(
"""
SELECT s.id,
u.username,
c.name,
s.world_id,
s.session_id,
s.created_at,
s.last_activity,
s.expires_at,
s.client_type
FROM sessions s
JOIN users u ON u.id = s.user_id
LEFT JOIN characters c ON c.id = s.character_id
WHERE (s.expires_at IS NULL OR datetime(s.expires_at) > datetime('now'))
AND (s.world_id IS NULL OR s.world_id = ?)
ORDER BY s.created_at DESC
""",
(world_id,),
)
rows = cursor.fetchall()
conn.close()
sessions = []
for row in rows:
sessions.append(
{
"id": row[0],
"username": row[1],
"character_name": row[2],
"world_id": row[3],
"session_id": row[4],
"created_at": row[5],
"last_activity": row[6],
"expires_at": row[7],
"client_type": row[8],
}
)
return sessions
[docs]
def get_active_connections(*, world_id: str | None = None) -> list[dict[str, Any]]:
"""Return active sessions with activity age in seconds."""
from mud_server.config import config
conn = get_connection()
cursor = conn.cursor()
where_clauses = ["(s.expires_at IS NULL OR datetime(s.expires_at) > datetime('now'))"]
params: list[str] = []
if config.session.active_window_minutes > 0:
where_clauses.append("datetime(s.last_activity) >= datetime('now', ?)")
params.append(f"-{config.session.active_window_minutes} minutes")
sql = f"""
SELECT s.id,
u.username,
c.name,
s.world_id,
s.session_id,
s.created_at,
s.last_activity,
s.expires_at,
s.client_type,
CAST(strftime('%s','now') - strftime('%s', s.last_activity) AS INTEGER) AS age_seconds
FROM sessions s
JOIN users u ON u.id = s.user_id
LEFT JOIN characters c ON c.id = s.character_id
WHERE {" AND ".join(where_clauses)} {"" if world_id is None else "AND (s.world_id IS NULL OR s.world_id = ?)"}
ORDER BY s.last_activity DESC
""" # nosec B608
if world_id is None:
cursor.execute(sql, params)
else:
cursor.execute(sql, [*params, world_id])
rows = cursor.fetchall()
conn.close()
sessions: list[dict[str, Any]] = []
for row in rows:
sessions.append(
{
"id": row[0],
"username": row[1],
"character_name": row[2],
"world_id": row[3],
"session_id": row[4],
"created_at": row[5],
"last_activity": row[6],
"expires_at": row[7],
"client_type": row[8],
"age_seconds": row[9],
}
)
return sessions
[docs]
def get_all_chat_messages(limit: int = 100, *, world_id: str | None = None) -> list[dict[str, Any]]:
"""Return recent chat messages across all rooms."""
conn = get_connection()
cursor = conn.cursor()
if world_id is None:
cursor.execute(
"""
SELECT m.id,
c.name,
m.message,
m.world_id,
m.room,
m.timestamp
FROM chat_messages m
JOIN characters c ON c.id = m.character_id
ORDER BY m.timestamp DESC
LIMIT ?
""",
(limit,),
)
else:
cursor.execute(
"""
SELECT m.id,
c.name,
m.message,
m.world_id,
m.room,
m.timestamp
FROM chat_messages m
JOIN characters c ON c.id = m.character_id
WHERE m.world_id = ?
ORDER BY m.timestamp DESC
LIMIT ?
""",
(world_id, limit),
)
rows = cursor.fetchall()
conn.close()
messages = []
for row in rows:
messages.append(
{
"id": row[0],
"username": row[1],
"message": row[2],
"world_id": row[3],
"room": row[4],
"timestamp": row[5],
}
)
return messages
# ==========================================================================
# LEGACY COMPATIBILITY SHIMS (BREAKING CHANGE TRANSITION)
# ==========================================================================
[docs]
def player_exists(username: str) -> bool:
"""Backward-compatible alias for user_exists()."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"SELECT id FROM users WHERE username = ? AND tombstoned_at IS NULL",
(username,),
)
row = cursor.fetchone()
conn.close()
return row is not None
[docs]
def get_player_role(username: str) -> str | None:
"""Backward-compatible alias for get_user_role()."""
return get_user_role(username)
[docs]
def get_player_account_origin(username: str) -> str | None:
"""Backward-compatible alias for get_user_account_origin()."""
return get_user_account_origin(username)
[docs]
def set_player_role(username: str, role: str) -> bool:
"""Backward-compatible alias for set_user_role()."""
return set_user_role(username, role)
[docs]
def is_player_active(username: str) -> bool:
"""Backward-compatible alias for is_user_active()."""
return is_user_active(username)
[docs]
def deactivate_player(username: str) -> bool:
"""Backward-compatible alias for deactivate_user()."""
return deactivate_user(username)
[docs]
def activate_player(username: str) -> bool:
"""Backward-compatible alias for activate_user()."""
return activate_user(username)
[docs]
def create_player_with_password(
username: str,
password: str,
role: str = "player",
account_origin: str = "legacy",
) -> bool:
"""Backward-compatible alias for create_user_with_password()."""
return create_user_with_password(
username,
password,
role=role,
account_origin=account_origin,
)
[docs]
def get_player_room(username: str) -> str | None:
"""Backward-compatible alias for get_character_room()."""
return get_character_room(username)
[docs]
def set_player_room(username: str, room: str) -> bool:
"""Backward-compatible alias for set_character_room()."""
return set_character_room(username, room)
[docs]
def get_player_inventory(username: str) -> list[str]:
"""Backward-compatible alias for get_character_inventory()."""
return get_character_inventory(username)
[docs]
def set_player_inventory(username: str, inventory: list[str]) -> bool:
"""Backward-compatible alias for set_character_inventory()."""
return set_character_inventory(username, inventory)
[docs]
def get_active_players() -> list[str]:
"""Backward-compatible alias for get_active_characters()."""
return get_active_characters()
[docs]
def get_players_in_room(room: str) -> list[str]:
"""Backward-compatible alias for get_characters_in_room()."""
return get_characters_in_room(room)
[docs]
def get_all_players_detailed() -> list[dict[str, Any]]:
"""Backward-compatible alias for get_all_users_detailed()."""
return get_all_users_detailed()
[docs]
def get_all_players() -> list[dict[str, Any]]:
"""Backward-compatible alias for get_all_users()."""
return get_all_users()
[docs]
def get_player_locations() -> list[dict[str, Any]]:
"""Backward-compatible alias for get_character_locations()."""
return get_character_locations()
[docs]
def delete_player(username: str) -> bool:
"""Backward-compatible alias for delete_user()."""
return delete_user(username)
[docs]
def cleanup_temporary_accounts(max_age_hours: int = 24, origin: str = "visitor") -> int:
"""
Backward-compatible alias for cleanup_expired_guest_accounts().
Args are ignored because guest expiry is timestamp-driven.
"""
return cleanup_expired_guest_accounts()
[docs]
def remove_session(username: str) -> bool:
"""Backward-compatible alias for removing sessions by username."""
user_id = get_user_id(username)
if not user_id:
return False
return remove_sessions_for_user(user_id)
if __name__ == "__main__":
init_database()
print(f"Database initialized at {_get_db_path()}")