Overview
Event-driven architecture decouples components by communicating through events rather than direct function calls. Combined with Python's asyncio, this creates highly concurrent, maintainable systems. This guide covers patterns proven in production trading systems processing thousands of events per second.
What You'll Build
┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│  Data Feed   │────►│  Event Bus   │────►│   Strategy   │
│  (Producer)  │     │   (Router)   │     │  (Consumer)  │
└──────────────┘     └──────┬───────┘     └──────────────┘
                            │
                      ┌─────┴──────┐
                      │  Risk Mgr  │
                      │  (Guard)   │
                      └────────────┘
Requirements
| Component | Version |
|---|---|
| Python | 3.10+ |
| asyncio | stdlib |
| aiohttp | 3.9+ (optional, for HTTP) |
Process
Step 1: Event Bus Implementation
The event bus is the backbone — it routes events to registered handlers:
# event_bus.py
import asyncio
from collections import defaultdict
from typing import Any, Callable, Coroutine
EventHandler = Callable[..., Coroutine[Any, Any, None]]
class EventBus:
def __init__(self):
self._handlers: dict[str, list[EventHandler]] = defaultdict(list)
self._queue: asyncio.Queue = asyncio.Queue()
self._running = False
def subscribe(self, event_type: str, handler: EventHandler) -> None:
"""Register a handler for an event type."""
self._handlers[event_type].append(handler)
def unsubscribe(self, event_type: str, handler: EventHandler) -> None:
"""Remove a handler."""
self._handlers[event_type].remove(handler)
async def publish(self, event_type: str, data: Any = None) -> None:
"""Queue an event for processing."""
await self._queue.put((event_type, data))
async def start(self) -> None:
"""Start processing events from the queue."""
self._running = True
while self._running:
try:
event_type, data = await asyncio.wait_for(
self._queue.get(), timeout=1.0
)
handlers = self._handlers.get(event_type, [])
# Run all handlers concurrently
await asyncio.gather(
*(h(data) for h in handlers),
return_exceptions=True
)
except asyncio.TimeoutError:
continue # Check if still running
async def stop(self) -> None:
"""Gracefully stop the event bus."""
self._running = False
# Drain remaining events
while not self._queue.empty():
event_type, data = self._queue.get_nowait()
handlers = self._handlers.get(event_type, [])
await asyncio.gather(
*(h(data) for h in handlers),
return_exceptions=True
)Step 2: Component Registry
Register and manage components with lifecycle hooks:
# registry.py
import asyncio
from typing import Protocol
import logging

logger = logging.getLogger(__name__)


class Component(Protocol):
    """Interface that all system components must implement."""

    async def start(self) -> None:
        """Begin the component's work."""

    async def stop(self) -> None:
        """Release the component's resources and stop its work."""

    @property
    def name(self) -> str:
        """Unique key under which the component is registered."""


class ComponentRegistry:
    """Holds named components and drives their start/stop lifecycle.

    Args:
        stop_timeout: Seconds to wait for each component's ``stop()`` before
            logging a warning and moving on to the next component.
    """

    def __init__(self, stop_timeout: float = 5.0) -> None:
        # dicts preserve insertion order, so iteration follows registration order.
        self._components: dict[str, Component] = {}
        self._stop_timeout = stop_timeout

    def register(self, component: Component) -> None:
        """Add ``component`` under its ``name``.

        Registering another component with the same name replaces the earlier one.
        """
        self._components[component.name] = component

    async def start_all(self) -> None:
        """Start all components concurrently.

        If any ``start()`` raises, the components that did start are stopped
        again (in reverse registration order) and the first failure is
        re-raised. A failed startup therefore does not leave part of the
        system running.
        """
        components = list(self._components.values())
        results = await asyncio.gather(
            *(c.start() for c in components), return_exceptions=True
        )
        failures = [r for r in results if isinstance(r, BaseException)]
        if not failures:
            return
        for component, result in reversed(list(zip(components, results))):
            if not isinstance(result, BaseException):
                await self._stop_one(component)
        raise failures[0]

    async def stop_all(self) -> None:
        """Stop all components in reverse registration order.

        Every component gets a stop attempt, even if an earlier one times out
        or raises. Such problems are logged rather than propagated.
        """
        for component in reversed(list(self._components.values())):
            await self._stop_one(component)

    def get(self, name: str) -> Component:
        """Return the component registered as ``name`` (``KeyError`` if absent)."""
        return self._components[name]

    async def _stop_one(self, component: Component) -> None:
        """Stop a single component, logging (not raising) timeouts and errors."""
        try:
            await asyncio.wait_for(component.stop(), timeout=self._stop_timeout)
        except asyncio.TimeoutError:
            logger.warning(
                "%s did not stop within %ss", component.name, self._stop_timeout
            )
        except Exception:
            logger.exception("Error while stopping %s", component.name)

# Step 3: Graceful Shutdown with Signal Handling
# shutdown.py
import asyncio
import signal
class GracefulShutdown:
def __init__(self):
self._shutdown_event = asyncio.Event()
def setup(self, loop: asyncio.AbstractEventLoop) -> None:
"""Register signal handlers for graceful shutdown."""
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, self._handle_signal)
def _handle_signal(self) -> None:
print("\nShutdown signal received...")
self._shutdown_event.set()
async def wait(self) -> None:
"""Block until shutdown signal received."""
await self._shutdown_event.wait()
@property
def is_shutting_down(self) -> bool:
return self._shutdown_event.is_set()Step 4: Kill Switch Pattern
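For safety-critical systems, implement a kill switch: a latching emergency stop that broadcasts a `KILL_SWITCH` event and exposes a flag that every component checks before acting. It stays tripped until it is explicitly reset with authorization: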
# kill_switch.py
import asyncio
from event_bus import EventBus
import secrets


class KillSwitch:
    """Emergency stop for all system operations.

    Triggering sets a flag (``is_active``) and publishes a ``KILL_SWITCH``
    event on the bus. Components are expected to subscribe to that event
    and/or check ``is_active`` before acting. The switch stays tripped until
    ``reset`` is called with the configured authorization token.

    Args:
        event_bus: Bus on which ``KILL_SWITCH`` / ``KILL_SWITCH_RESET`` are published.
        reset_token: Secret that ``reset`` must receive. Defaults to
            ``"AUTHORIZED_RESET"``; supply a real secret in deployments.
    """

    def __init__(self, event_bus: EventBus, reset_token: str = "AUTHORIZED_RESET"):
        self._event_bus = event_bus
        self._reset_token = reset_token
        self._triggered = False
        self._reason: str | None = None

    async def trigger(self, reason: str) -> None:
        """Activate the kill switch. Further triggers are ignored until reset."""
        if self._triggered:
            return
        # Flip the state before awaiting so callers that run during the
        # publish already see the switch as active.
        self._triggered = True
        self._reason = reason
        await self._event_bus.publish("KILL_SWITCH", {
            "reason": reason,
            # Event-loop clock (monotonic seconds), not wall-clock time.
            "timestamp": asyncio.get_running_loop().time(),
        })

    @property
    def is_active(self) -> bool:
        """True while the switch is tripped."""
        return self._triggered

    @property
    def reason(self) -> str | None:
        """Reason given to the trigger that is currently active, if any."""
        return self._reason

    async def reset(self, authorization: str) -> bool:
        """Reset the switch if ``authorization`` matches the configured token.

        Returns:
            True if the switch was reset and ``KILL_SWITCH_RESET`` published.
            False if the authorization was rejected; nothing changes then.
        """
        # Constant-time comparison so the token cannot be probed via timing.
        if not secrets.compare_digest(
            authorization.encode(), self._reset_token.encode()
        ):
            return False
        self._triggered = False
        self._reason = None
        await self._event_bus.publish("KILL_SWITCH_RESET", {})
        return True

# Step 5: Putting It All Together
# main.py
import asyncio
from event_bus import EventBus
from registry import ComponentRegistry
from shutdown import GracefulShutdown
from kill_switch import KillSwitch
class DataFeed:
    """Example producer component: publishes a synthetic ``market_data`` tick every 0.1 s."""

    name = "data-feed"

    def __init__(self, event_bus: EventBus):
        self._bus = event_bus
        self._running = False
        # Keep a strong reference to the producer task. The event loop holds
        # only weak references, so an unreferenced task may be collected mid-run.
        self._task: asyncio.Task | None = None

    async def start(self):
        """Spawn the background producer task (no-op if one is already running)."""
        if self._task is not None and not self._task.done():
            return
        self._running = True
        self._task = asyncio.create_task(self._produce())

    async def stop(self):
        """Stop producing and wait for the producer task to finish."""
        self._running = False
        task, self._task = self._task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def _produce(self):
        """Publish an increasing synthetic price until stopped."""
        counter = 0
        while self._running:
            counter += 1
            await self._bus.publish("market_data", {
                "symbol": "BTC-USD",
                "price": 45000 + counter,
                "volume": 1000,
            })
            await asyncio.sleep(0.1)
class Strategy:
    """Example consumer component: handles market data unless the kill switch is active."""

    name = "strategy"

    def __init__(self, event_bus: EventBus, kill_switch: KillSwitch):
        self._bus = event_bus
        self._kill = kill_switch
        self._subscribed = False

    async def start(self):
        """Subscribe to market data and kill-switch events (idempotent)."""
        if self._subscribed:
            # Subscribing twice would deliver every event to us twice.
            return
        self._bus.subscribe("market_data", self._on_market_data)
        self._bus.subscribe("KILL_SWITCH", self._on_kill)
        self._subscribed = True

    async def stop(self):
        """Unsubscribe so the bus no longer delivers events (including still-queued ones) here."""
        if not self._subscribed:
            return
        self._bus.unsubscribe("market_data", self._on_market_data)
        self._bus.unsubscribe("KILL_SWITCH", self._on_kill)
        self._subscribed = False

    async def _on_market_data(self, data):
        """Process one market-data tick; ignored while the kill switch is active."""
        if self._kill.is_active:
            return
        # Process market data...
        print(f"Processing {data['symbol']} @ {data['price']}")

    async def _on_kill(self, data):
        """Report a kill-switch activation."""
        print(f"Kill switch activated: {data['reason']}")
async def main():
    """Wire up the example system, run until SIGINT/SIGTERM, then tear it down."""
    event_bus = EventBus()
    kill_switch = KillSwitch(event_bus)
    registry = ComponentRegistry()
    shutdown = GracefulShutdown()

    # Register components (the registry stops them in reverse of this order)
    registry.register(DataFeed(event_bus))
    registry.register(Strategy(event_bus, kill_switch))

    # Install signal handlers on the loop that is running this coroutine.
    shutdown.setup(asyncio.get_running_loop())
    bus_task = asyncio.create_task(event_bus.start())
    try:
        await registry.start_all()
        # Wait for shutdown signal
        await shutdown.wait()
    finally:
        # Graceful teardown, also on error: stop components, drain the bus,
        # then cancel its dispatch loop and wait for it to actually finish.
        await registry.stop_all()
        await event_bus.stop()
        bus_task.cancel()
        try:
            await bus_task
        except asyncio.CancelledError:
            pass


if __name__ == "__main__":
    asyncio.run(main())

# Error Handling Patterns
Circuit Breaker
import inspect
import time
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


class CircuitOpenError(RuntimeError):
    """Raised by ``CircuitBreaker.call`` while the circuit is open."""


class CircuitBreaker:
    """Stop calling a failing dependency until it has had time to recover.

    States:
        ``"closed"``: calls pass through; consecutive failures are counted.
        ``"open"``: calls are rejected with ``CircuitOpenError`` until more
        than ``reset_timeout`` seconds have passed since the last failure.
        ``"half-open"``: the next call is let through as a probe. Success
        closes the circuit; failure re-opens it.

    Args:
        failure_threshold: Consecutive failures that trip the breaker open.
        reset_timeout: Seconds to stay open before allowing a probe call.
        clock: Monotonic time source in seconds (injectable for tests).
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        reset_timeout: float = 60,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._failures = 0
        self._threshold = failure_threshold
        self._reset_timeout = reset_timeout
        self._clock = clock
        self._state = "closed"  # closed, open, half-open
        self._last_failure: float = 0

    @property
    def state(self) -> str:
        """Current state: ``"closed"``, ``"open"`` or ``"half-open"``."""
        return self._state

    async def call(self, coro: Awaitable[T]) -> T:
        """Await ``coro`` through the breaker and return its result.

        Raises:
            CircuitOpenError: if the circuit is open. A rejected coroutine is
                closed without being run.
            Exception: whatever ``coro`` raises, after the failure is recorded.
        """
        if self._state == "open":
            if self._clock() - self._last_failure > self._reset_timeout:
                self._state = "half-open"
            else:
                # Close the rejected coroutine so Python does not emit a
                # "coroutine ... was never awaited" RuntimeWarning.
                if inspect.iscoroutine(coro):
                    coro.close()
                raise CircuitOpenError("Circuit breaker is open")
        try:
            result = await coro
        except Exception:
            self._failures += 1
            self._last_failure = self._clock()
            # A failed half-open probe re-opens at once. Otherwise trip only
            # when the consecutive-failure threshold is reached.
            if self._state == "half-open" or self._failures >= self._threshold:
                self._state = "open"
            raise
        # Any success ends the failure streak and closes a half-open circuit.
        self._failures = 0
        self._state = "closed"
        return result

# Key Takeaways
- Use a bounded `asyncio.Queue` (`maxsize > 0`) for backpressure control between producers and consumers — an unbounded queue never makes a producer wait
- Always implement graceful shutdown — drain queues before exiting
- Kill switches are essential for safety-critical systems
- Circuit breakers prevent cascading failures across components
- Run handlers with `return_exceptions=True` so one failure doesn't block others, but log the returned exceptions instead of discarding them
- Keep event handlers fast — offload heavy work to separate tasks