mud_server.core.bus

PipeWorks MUD Event Bus

The central nervous system of the MUD server. All significant events flow through this bus, creating an auditable, replayable record of what happened.

ARCHITECTURAL PRINCIPLES (see _working/plugin_development.md)

The bus is boring by design. Boring is how we keep it honest.

  1. THE BUS RECORDS FACTS - Events represent things that HAPPENED (past tense) - “player:move” means the player moved, not “please move the player” - The bus does not decide outcomes, it records them

  2. EVENTS ARE IMMUTABLE - Once emitted, an event cannot be changed - This enables replay, debugging, and audit - Handlers receive events, they cannot modify them

  3. EMIT IS SYNCHRONOUS - Event creation and log commit happen synchronously - This guarantees deterministic ordering - Sequence numbers enforce global order

  4. ASYNC IS AN EXECUTION DETAIL - Handlers may be sync or async - Async handlers are SCHEDULED after the event is committed - Async execution does not affect event order

  5. PLUGINS REACT, THEY DO NOT INTERVENE - There are no “before” events that can block - Plugins subscribe and react to facts - If a plugin needs to “respond”, it emits a NEW event

GOBLIN LAWS

Law #7 “No Fat Orcs” - The bus does ONE thing: record events Law #8 “Boundary Guards” - Everything flows through the bus first Law #13 “Guest List” - Single source of truth, one bus, one log Law #37 “No Meddling” - Components communicate via bus, never directly

USAGE

from mud_server.core.bus import bus

# Emit an event (synchronous - returns immediately after commit) event = bus.emit(“player:move”, {

“username”: “Gribnak”, “from_room”: “tavern”, “to_room”: “street”

})

# Subscribe to events def on_player_move(event):

print(f”{event.detail[‘username’]} moved!”)

unsubscribe = bus.on(“player:move”, on_player_move)

# Later: stop listening unsubscribe()

# Async handlers work too async def on_player_move_async(event):

await notify_room(event.detail[‘to_room’])

bus.on(“player:move”, on_player_move_async)


Attributes

logger

SyncHandler

AsyncHandler

EventHandler

Unsubscribe

bus

Classes

EventMetadata

Metadata attached to every event.

MudEvent

A single event on the bus.

MudBus

The Central Event Bus - Singleton Pattern.

Module Contents

mud_server.core.bus.logger
mud_server.core.bus.SyncHandler
mud_server.core.bus.AsyncHandler
mud_server.core.bus.EventHandler
mud_server.core.bus.Unsubscribe
class mud_server.core.bus.EventMetadata[source]

Metadata attached to every event.

This metadata provides context for debugging, replay, and audit: - timestamp: When did this happen? (wall clock time) - source: Which component emitted it? (for debugging) - sequence: What’s the global order? (for determinism)

The class is frozen (immutable) because events cannot change after creation.

timestamp

Unix epoch milliseconds (UTC). Wall clock time of emission. Used for debugging and display, NOT for ordering.

source

Name of the component that emitted this event. Examples: “engine”, “WeatherPlugin”, “auth”

sequence

Monotonically increasing integer. The ONLY reliable way to determine event order. Two events with seq 5 and seq 6 are guaranteed to have been emitted in that order, regardless of timestamp.

timestamp: int
source: str
sequence: int
static create(source, sequence)[source]

Factory method to create metadata with current timestamp.

Parameters:
  • source (str) – The component emitting the event

  • sequence (int) – The sequence number (from bus)

Returns:

New EventMetadata instance

Return type:

EventMetadata

class mud_server.core.bus.MudEvent[source]

A single event on the bus.

Events are the atoms of the bus. They represent facts about what happened. Once created, they are immutable - this is enforced by frozen=True.

The event lifecycle: 1. Engine/plugin calls bus.emit(“type”, {detail}) 2. Bus creates MudEvent with metadata (sequence number assigned) 3. Event is appended to the log (COMMITTED - point of no return) 4. Handlers are notified (sync notification, async execution allowed) 5. Event is returned to caller

After step 3, the event is part of history. It cannot be changed.

type

The event type string. Convention is “domain:action” format. Examples: “player:move”, “item:pickup”, “chat:message”, “tick”

detail

The event payload. A dictionary of relevant data. Should be treated as immutable even though Python doesn’t enforce this on dict contents.

_meta

Event metadata (timestamp, source, sequence). Named with underscore to indicate it’s infrastructure, not business data.

Example

MudEvent(

type=”player:move”, detail={“username”: “Gribnak”, “from”: “tavern”, “to”: “street”}, _meta=EventMetadata(timestamp=1706745600000, source=”engine”, sequence=42)

)

type: str
detail: dict
property meta: EventMetadata | None

Public accessor for event metadata.

Returns the event’s metadata (timestamp, source, sequence number). The underscore-prefixed _meta is the actual attribute; this property provides a cleaner public interface.

Returns:

EventMetadata instance, or None if event has no metadata

Return type:

EventMetadata | None

class mud_server.core.bus.MudBus[source]

The Central Event Bus - Singleton Pattern.

There is exactly ONE bus in the system. This is enforced by the singleton pattern. All events flow through this single point, creating a unified log of everything that happened.

Why singleton? - Single source of truth (Goblin Law #13) - All components share the same event history - No possibility of events “leaking” to a different bus - Simplifies testing (reset_for_testing method)

Thread Safety: - The current implementation is NOT thread-safe - This is intentional - the MUD server is single-threaded with async - If threading is added later, this class will need locks

Key Methods: - emit(): Record an event (synchronous, returns committed event) - on(): Subscribe to an event type (returns unsubscribe function) - once(): Subscribe for a single event only - wait_for(): Async wait for an event (for coordination) - get_event_log(): Retrieve event history (for debugging/replay)

Example

from mud_server.core.bus import bus # The singleton instance

# Emit bus.emit(“player:login”, {“username”: “Gribnak”})

# Subscribe bus.on(“player:login”, lambda e: print(f”Welcome {e.detail[‘username’]}”))

Initialize the bus (only runs once due to _initialized flag).

Sets up: - Handler registry (who listens to what) - Event log (bounded history) - Sequence counter (for ordering) - Wait promises (for async coordination)

debug: bool = False
emit(event_type, detail=None, source='engine')[source]

Emit an event to the bus.

THIS IS THE MOST IMPORTANT METHOD IN THE BUS.

The method is SYNCHRONOUS by design. When it returns: 1. The event has been created with a sequence number 2. The event has been committed to the log 3. All sync handlers have been called 4. All async handlers have been scheduled

The event is now part of history. It cannot be changed, suppressed, or reordered.

Parameters:
  • event_type (str) – The type of event (e.g., “player:move”, “tick”) Convention: “domain:action” format

  • detail (dict[str, Any] | None) – The event payload. Optional, defaults to empty dict. Should contain all relevant data for handlers.

  • source (str) – Which component is emitting. Defaults to “engine”. Used for debugging and filtering.

Returns:

The committed MudEvent. This is immutable and now part of the event log.

Return type:

MudEvent

Example

# Simple event bus.emit(“tick”, {“delta”: 1.0})

# Event with details event = bus.emit(“player:move”, {

“username”: “Gribnak”, “from_room”: “tavern”, “to_room”: “street”, “direction”: “north”

}, source=”engine”)

print(event._meta.sequence) # e.g., 42

on(event_type, handler)[source]

Subscribe to an event type.

When an event of this type is emitted, your handler will be called. Handlers are called in registration order (FIFO).

Remember: You are subscribing to FACTS. The event has already happened by the time your handler is called. You are reacting, not intervening.

Parameters:
  • event_type (str) – The event type to listen for (e.g., “player:move”)

  • handler (EventHandler) – Function to call. Can be sync or async. Receives the MudEvent as its only argument.

Returns:

An unsubscribe function. Call it to stop receiving events.

Return type:

Unsubscribe

Example

# Subscribe def on_move(event):

print(f”{event.detail[‘username’]} moved!”)

unsub = bus.on(“player:move”, on_move)

# Later, when done listening: unsub()

once(event_type, handler)[source]

Subscribe to an event type for a single event only.

Like on(), but automatically unsubscribes after the first event. Useful for one-time initialization or waiting for a specific event.

Parameters:
  • event_type (str) – The event type to listen for

  • handler (EventHandler) – Function to call (once)

Returns:

An unsubscribe function (in case you want to cancel early)

Return type:

Unsubscribe

Example

# Wait for first player to login, then do something def on_first_login(event):

print(f”First player: {event.detail[‘username’]}”) # Handler automatically unsubscribes after this

bus.once(“player:login”, on_first_login)

async wait_for(event_type, timeout_ms=None)[source]

Wait for a specific event type (async).

This is for async coordination - waiting until something happens. It’s an EXECUTION concern, not a LOGICAL concern. The event order is not affected by who is waiting.

Use cases: - Wait for server to be ready before sending commands - Wait for a resource to be loaded - Coordinate between async components

Parameters:
  • event_type (str) – The event type to wait for

  • timeout_ms (int | None) – Maximum time to wait in milliseconds. None means wait forever.

Returns:

The MudEvent when it occurs

Raises:

asyncio.TimeoutError – If timeout is reached before event

Return type:

MudEvent

Example

# Wait for server to be ready await bus.wait_for(“server:ready”, timeout_ms=5000)

# Now safe to proceed bus.emit(“player:login”, {“username”: “Gribnak”})

get_event_log(limit=None)[source]

Get events from the log.

The event log is the source of truth for what happened. Events are returned in order (oldest first, newest last).

Use cases: - Debugging: “What events led to this state?” - Replay: “Reconstruct state from events” - Audit: “Who did what when?”

Parameters:

limit (int | None) – Maximum number of events to return (from the end). None means return all events in the log.

Returns:

List of MudEvents in chronological order.

Return type:

list[MudEvent]

Example

# Get last 10 events recent = bus.get_event_log(limit=10) for event in recent:

print(f”[{event._meta.sequence}] {event.type}”)

# Get all events all_events = bus.get_event_log()

get_sequence()[source]

Get the current sequence number.

Useful for debugging and testing. The sequence number is the total count of events ever emitted by this bus instance.

Returns:

Current sequence number (last assigned)

Return type:

int

get_handler_count(event_type)[source]

Get the number of handlers for an event type.

Useful for debugging and testing.

Parameters:

event_type (str) – The event type to check

Returns:

Number of handlers subscribed to this event type

Return type:

int

classmethod reset_for_testing()[source]

Reset the singleton for testing.

* NOT FOR PRODUCTION USE *

This clears all state and allows a fresh bus to be created. Only use this in test setup/teardown.

clear_event_log()[source]

Clear the event log.

* USE WITH CAUTION *

This erases event history. Only use for testing or when intentionally resetting state.

mud_server.core.bus.bus