mud_server.core.bus
PipeWorks MUD Event Bus
The central nervous system of the MUD server. All significant events flow through this bus, creating an auditable, replayable record of what happened.
ARCHITECTURAL PRINCIPLES (see _working/plugin_development.md)
The bus is boring by design. Boring is how we keep it honest.
THE BUS RECORDS FACTS - Events represent things that HAPPENED (past tense) - “player:move” means the player moved, not “please move the player” - The bus does not decide outcomes, it records them
EVENTS ARE IMMUTABLE - Once emitted, an event cannot be changed - This enables replay, debugging, and audit - Handlers receive events, they cannot modify them
EMIT IS SYNCHRONOUS - Event creation and log commit happen synchronously - This guarantees deterministic ordering - Sequence numbers enforce global order
ASYNC IS AN EXECUTION DETAIL - Handlers may be sync or async - Async handlers are SCHEDULED after the event is committed - Async execution does not affect event order
PLUGINS REACT, THEY DO NOT INTERVENE - There are no “before” events that can block - Plugins subscribe and react to facts - If a plugin needs to “respond”, it emits a NEW event
GOBLIN LAWS
Law #7 “No Fat Orcs” - The bus does ONE thing: record events Law #8 “Boundary Guards” - Everything flows through the bus first Law #13 “Guest List” - Single source of truth, one bus, one log Law #37 “No Meddling” - Components communicate via bus, never directly
USAGE
from mud_server.core.bus import bus
# Emit an event (synchronous - returns immediately after commit) event = bus.emit(“player:move”, {
“username”: “Gribnak”, “from_room”: “tavern”, “to_room”: “street”
})
# Subscribe to events def on_player_move(event):
print(f”{event.detail[‘username’]} moved!”)
unsubscribe = bus.on(“player:move”, on_player_move)
# Later: stop listening unsubscribe()
# Async handlers work too async def on_player_move_async(event):
await notify_room(event.detail[‘to_room’])
bus.on(“player:move”, on_player_move_async)
Attributes
Classes
Metadata attached to every event. |
|
A single event on the bus. |
|
The Central Event Bus - Singleton Pattern. |
Module Contents
- mud_server.core.bus.logger
- mud_server.core.bus.SyncHandler
- mud_server.core.bus.AsyncHandler
- mud_server.core.bus.EventHandler
- mud_server.core.bus.Unsubscribe
- class mud_server.core.bus.EventMetadata[source]
Metadata attached to every event.
This metadata provides context for debugging, replay, and audit: - timestamp: When did this happen? (wall clock time) - source: Which component emitted it? (for debugging) - sequence: What’s the global order? (for determinism)
The class is frozen (immutable) because events cannot change after creation.
- timestamp
Unix epoch milliseconds (UTC). Wall clock time of emission. Used for debugging and display, NOT for ordering.
- source
Name of the component that emitted this event. Examples: “engine”, “WeatherPlugin”, “auth”
- sequence
Monotonically increasing integer. The ONLY reliable way to determine event order. Two events with seq 5 and seq 6 are guaranteed to have been emitted in that order, regardless of timestamp.
- class mud_server.core.bus.MudEvent[source]
A single event on the bus.
Events are the atoms of the bus. They represent facts about what happened. Once created, they are immutable - this is enforced by frozen=True.
The event lifecycle: 1. Engine/plugin calls bus.emit(“type”, {detail}) 2. Bus creates MudEvent with metadata (sequence number assigned) 3. Event is appended to the log (COMMITTED - point of no return) 4. Handlers are notified (sync notification, async execution allowed) 5. Event is returned to caller
After step 3, the event is part of history. It cannot be changed.
- type
The event type string. Convention is “domain:action” format. Examples: “player:move”, “item:pickup”, “chat:message”, “tick”
- detail
The event payload. A dictionary of relevant data. Should be treated as immutable even though Python doesn’t enforce this on dict contents.
- _meta
Event metadata (timestamp, source, sequence). Named with underscore to indicate it’s infrastructure, not business data.
Example
- MudEvent(
type=”player:move”, detail={“username”: “Gribnak”, “from”: “tavern”, “to”: “street”}, _meta=EventMetadata(timestamp=1706745600000, source=”engine”, sequence=42)
)
- property meta: EventMetadata | None
Public accessor for event metadata.
Returns the event’s metadata (timestamp, source, sequence number). The underscore-prefixed _meta is the actual attribute; this property provides a cleaner public interface.
- Returns:
EventMetadata instance, or None if event has no metadata
- Return type:
EventMetadata | None
- class mud_server.core.bus.MudBus[source]
The Central Event Bus - Singleton Pattern.
There is exactly ONE bus in the system. This is enforced by the singleton pattern. All events flow through this single point, creating a unified log of everything that happened.
Why singleton? - Single source of truth (Goblin Law #13) - All components share the same event history - No possibility of events “leaking” to a different bus - Simplifies testing (reset_for_testing method)
Thread Safety: - The current implementation is NOT thread-safe - This is intentional - the MUD server is single-threaded with async - If threading is added later, this class will need locks
Key Methods: - emit(): Record an event (synchronous, returns committed event) - on(): Subscribe to an event type (returns unsubscribe function) - once(): Subscribe for a single event only - wait_for(): Async wait for an event (for coordination) - get_event_log(): Retrieve event history (for debugging/replay)
Example
from mud_server.core.bus import bus # The singleton instance
# Emit bus.emit(“player:login”, {“username”: “Gribnak”})
# Subscribe bus.on(“player:login”, lambda e: print(f”Welcome {e.detail[‘username’]}”))
Initialize the bus (only runs once due to _initialized flag).
Sets up: - Handler registry (who listens to what) - Event log (bounded history) - Sequence counter (for ordering) - Wait promises (for async coordination)
- emit(event_type, detail=None, source='engine')[source]
Emit an event to the bus.
THIS IS THE MOST IMPORTANT METHOD IN THE BUS.
The method is SYNCHRONOUS by design. When it returns: 1. The event has been created with a sequence number 2. The event has been committed to the log 3. All sync handlers have been called 4. All async handlers have been scheduled
The event is now part of history. It cannot be changed, suppressed, or reordered.
- Parameters:
event_type (str) – The type of event (e.g., “player:move”, “tick”) Convention: “domain:action” format
detail (dict[str, Any] | None) – The event payload. Optional, defaults to empty dict. Should contain all relevant data for handlers.
source (str) – Which component is emitting. Defaults to “engine”. Used for debugging and filtering.
- Returns:
The committed MudEvent. This is immutable and now part of the event log.
- Return type:
Example
# Simple event bus.emit(“tick”, {“delta”: 1.0})
# Event with details event = bus.emit(“player:move”, {
“username”: “Gribnak”, “from_room”: “tavern”, “to_room”: “street”, “direction”: “north”
}, source=”engine”)
print(event._meta.sequence) # e.g., 42
- on(event_type, handler)[source]
Subscribe to an event type.
When an event of this type is emitted, your handler will be called. Handlers are called in registration order (FIFO).
Remember: You are subscribing to FACTS. The event has already happened by the time your handler is called. You are reacting, not intervening.
- Parameters:
event_type (str) – The event type to listen for (e.g., “player:move”)
handler (EventHandler) – Function to call. Can be sync or async. Receives the MudEvent as its only argument.
- Returns:
An unsubscribe function. Call it to stop receiving events.
- Return type:
Unsubscribe
Example
# Subscribe def on_move(event):
print(f”{event.detail[‘username’]} moved!”)
unsub = bus.on(“player:move”, on_move)
# Later, when done listening: unsub()
- once(event_type, handler)[source]
Subscribe to an event type for a single event only.
Like on(), but automatically unsubscribes after the first event. Useful for one-time initialization or waiting for a specific event.
- Parameters:
event_type (str) – The event type to listen for
handler (EventHandler) – Function to call (once)
- Returns:
An unsubscribe function (in case you want to cancel early)
- Return type:
Unsubscribe
Example
# Wait for first player to login, then do something def on_first_login(event):
print(f”First player: {event.detail[‘username’]}”) # Handler automatically unsubscribes after this
bus.once(“player:login”, on_first_login)
- async wait_for(event_type, timeout_ms=None)[source]
Wait for a specific event type (async).
This is for async coordination - waiting until something happens. It’s an EXECUTION concern, not a LOGICAL concern. The event order is not affected by who is waiting.
Use cases: - Wait for server to be ready before sending commands - Wait for a resource to be loaded - Coordinate between async components
- Parameters:
- Returns:
The MudEvent when it occurs
- Raises:
asyncio.TimeoutError – If timeout is reached before event
- Return type:
Example
# Wait for server to be ready await bus.wait_for(“server:ready”, timeout_ms=5000)
# Now safe to proceed bus.emit(“player:login”, {“username”: “Gribnak”})
- get_event_log(limit=None)[source]
Get events from the log.
The event log is the source of truth for what happened. Events are returned in order (oldest first, newest last).
Use cases: - Debugging: “What events led to this state?” - Replay: “Reconstruct state from events” - Audit: “Who did what when?”
- Parameters:
limit (int | None) – Maximum number of events to return (from the end). None means return all events in the log.
- Returns:
List of MudEvents in chronological order.
- Return type:
Example
# Get last 10 events recent = bus.get_event_log(limit=10) for event in recent:
print(f”[{event._meta.sequence}] {event.type}”)
# Get all events all_events = bus.get_event_log()
- get_sequence()[source]
Get the current sequence number.
Useful for debugging and testing. The sequence number is the total count of events ever emitted by this bus instance.
- Returns:
Current sequence number (last assigned)
- Return type:
- get_handler_count(event_type)[source]
Get the number of handlers for an event type.
Useful for debugging and testing.
- mud_server.core.bus.bus