Source code for mud_server.api.auth

"""
Session management and authentication.

This module handles session-based authentication for the MUD server. It provides:
1. Database-backed session storage (source of truth)
2. Session validation with expiration enforcement
3. Permission checks for admin endpoints

Session Lifecycle:
1. Login: New session ID created and stored in the database
2. Requests: Each API call validates session, enforces expiry, updates activity
3. Logout: Session removed from the database
4. Server Restart: Sessions survive until they expire or are revoked

Security Considerations:
- Sessions use opaque IDs (UUIDs by default)
- Sessions expire using a TTL and (optionally) sliding expiration
- Database is the source of truth (supports restart persistence)
- Session validation updates activity timestamp to track last action

Future Improvements:
- Implement session refresh tokens ("remember me")
- Add device/session management UI
- Add optional IP/User-Agent tracking for session audits
"""

from datetime import UTC, datetime

from fastapi import HTTPException

from mud_server.api.permissions import Permission, has_permission
from mud_server.db import facade as database
from mud_server.db.errors import DatabaseError

# ============================================================================
# SESSION LIFECYCLE MANAGEMENT
# ============================================================================


[docs] def clear_all_sessions() -> int: """ Clear all sessions from the database. This should be called: 1. When performing emergency session resets 2. In test fixtures to ensure clean state Returns: Number of sessions removed from the database. Side Effects: - Deletes all rows from sessions table (committed to database) """ return database.clear_all_sessions()
[docs] def remove_session(session_id: str) -> bool: """ Remove a specific session from the database. This function handles targeted session removal, typically used when: 1. A user explicitly logs out (via logout endpoint) 2. An admin force-disconnects a user 3. A session is detected as invalid and needs cleanup Args: session_id: The UUID session identifier to remove. Returns: True if the session was removed, False if it was not found. """ return database.remove_session_by_id(session_id)
[docs] def get_active_session_count() -> int: """ Get the count of active (non-expired) sessions. This is useful for: 1. Health check endpoints (reporting active_players) 2. Admin dashboards showing current load 3. Rate limiting decisions based on server load 4. Logging and monitoring """ return database.get_active_session_count()
# ============================================================================ # SESSION LOOKUP FUNCTIONS # ============================================================================
[docs] def get_username_from_session(session_id: str) -> str | None: """ Get username from session ID. Returns None if the session is not found or has expired. """ session = _get_valid_session(session_id) if not session: return None return database.get_username_by_id(int(session["user_id"]))
[docs] def get_username_and_role_from_session(session_id: str) -> tuple[str, str] | None: """ Get both username and role from session ID. Returns None if the session is not found or has expired. """ session = _get_valid_session(session_id) if not session: return None user_id = int(session["user_id"]) username = database.get_username_by_id(user_id) if not username: return None role = database.get_user_role(username) if not role: return None return username, role
# ============================================================================ # SESSION VALIDATION FUNCTIONS # ============================================================================
[docs] def validate_session(session_id: str) -> tuple[int, str, str]: """ Validate session and return user information. Returns: (user_id, username, role) Raises: HTTPException(401): If session_id is invalid or expired """ try: session = _get_valid_session(session_id) if not session: raise HTTPException(status_code=401, detail="Invalid or expired session") database.update_session_activity(session_id) user_id = int(session["user_id"]) username = database.get_username_by_id(user_id) if not username: raise HTTPException(status_code=401, detail="Invalid session user") role = database.get_user_role(username) if not role: raise HTTPException(status_code=401, detail="Invalid session user") return user_id, username, role except DatabaseError as exc: raise HTTPException(status_code=500, detail="Session store unavailable") from exc
[docs] def validate_session_for_game(session_id: str) -> tuple[int, str, str, int, str, str]: """ Validate session and ensure a character is selected for gameplay. Returns: (user_id, username, role, character_id, character_name, world_id) Raises: HTTPException(401): If session_id is invalid or expired HTTPException(409): If no character is selected for this session """ try: session = _get_valid_session(session_id) if not session: raise HTTPException(status_code=401, detail="Invalid or expired session") database.update_session_activity(session_id) user_id = int(session["user_id"]) username = database.get_username_by_id(user_id) if not username: raise HTTPException(status_code=401, detail="Invalid session user") role = database.get_user_role(username) if not role: raise HTTPException(status_code=401, detail="Invalid session user") # Account and character sessions are intentionally separate. # Gameplay access requires an explicit prior character selection. character_id = session.get("character_id") if not character_id: raise HTTPException( status_code=409, detail="No character selected for session. Select a character first.", ) character_name = database.get_character_name_by_id(int(character_id)) if not character_name: raise HTTPException(status_code=409, detail="Selected character not found") character = database.get_character_by_id(int(character_id)) if not character or not character.get("world_id"): raise HTTPException(status_code=409, detail="Character world not found") world_id = character["world_id"] if session.get("world_id") != world_id: database.set_session_character(session_id, int(character_id), world_id=world_id) return user_id, username, role, int(character_id), character_name, world_id except DatabaseError as exc: raise HTTPException(status_code=500, detail="Session store unavailable") from exc
[docs] def validate_session_with_permission( session_id: str, permission: Permission ) -> tuple[int, str, str]: """ Validate session and check if user has required permission. Raises: HTTPException(401): If session is invalid or expired HTTPException(403): If session valid but user lacks required permission """ # First validate the session (raises 401 if invalid) user_id, username, role = validate_session(session_id) # Then check if the user's role has the required permission if not has_permission(role, permission): raise HTTPException( status_code=403, detail=f"Insufficient permissions. Required: {permission.value}", ) return user_id, username, role
# ============================================================================ # INTERNAL HELPERS # ============================================================================ def _get_valid_session(session_id: str) -> dict | None: """ Return session record if valid and not expired. If the session is expired, it is removed from the database so it cannot be reused. """ session = database.get_session_by_id(session_id) if not session: return None expires_at = session.get("expires_at") if expires_at and _is_expired(expires_at): database.remove_session_by_id(session_id) return None return session def _is_expired(expires_at: str) -> bool: """ Check if a stored SQLite timestamp is expired relative to current UTC time. SQLite CURRENT_TIMESTAMP format is "YYYY-MM-DD HH:MM:SS". """ try: expires_dt = datetime.strptime(expires_at, "%Y-%m-%d %H:%M:%S") except ValueError: # If parsing fails, treat as expired for safety. return True now = datetime.now(UTC).replace(tzinfo=None) return expires_dt <= now