Skip to main content
COSMICBYTEZLABS
NewsSecurityHOWTOsToolsStudyTraining
ProjectsChecklistsAI RankingsNewsletterStatusTagsAbout
Subscribe

Press Enter to search or Esc to close

News
Security
HOWTOs
Tools
Study
Training
Projects
Checklists
AI Rankings
Newsletter
Status
Tags
About
RSS Feed
Reading List
Subscribe

Stay in the Loop

Get the latest security alerts, tutorials, and tech insights delivered to your inbox.

Subscribe NowFree forever. No spam.
COSMICBYTEZLABS

Your trusted source for IT intelligence, cybersecurity insights, and hands-on technical guides.

429+ Articles
114+ Guides

CONTENT

  • Latest News
  • Security Alerts
  • HOWTOs
  • Projects
  • Exam Prep

RESOURCES

  • Search
  • Browse Tags
  • Newsletter Archive
  • Reading List
  • RSS Feed

COMPANY

  • About Us
  • Contact
  • Privacy Policy
  • Terms of Service

© 2026 CosmicBytez Labs. All rights reserved.

System Status: Operational
  1. Home
  2. HOWTOs
  3. Building Event-Driven Systems with Python asyncio
Building Event-Driven Systems with Python asyncio
HOWTOAdvanced

Building Event-Driven Systems with Python asyncio

Design and implement event-driven architectures using Python asyncio. Covers event buses, async task orchestration, graceful shutdown, and real-world...

Dylan H.

Software Engineering

February 9, 2026
5 min read

Prerequisites

  • Python 3.10+
  • Basic async/await understanding
  • Understanding of design patterns

Overview

Event-driven architecture decouples components by communicating through events rather than direct function calls. Combined with Python's asyncio, this creates highly concurrent, maintainable systems. This guide covers patterns proven in production trading systems processing thousands of events per second.

What You'll Build

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│  Data Feed   │────►│  Event Bus   │────►│  Strategy    │
│  (Producer)  │     │  (Router)    │     │  (Consumer)  │
└──────────────┘     └──────┬───────┘     └──────────────┘
                           │
                     ┌─────┴──────┐
                     │  Risk Mgr  │
                     │  (Guard)   │
                     └────────────┘

Requirements

ComponentVersion
Python3.10+
asynciostdlib
aiohttp3.9+ (optional, for HTTP)

Process

Step 1: Event Bus Implementation

The event bus is the backbone — it routes events to registered handlers:

# event_bus.py
import asyncio
from collections import defaultdict
from typing import Any, Callable, Coroutine
 
EventHandler = Callable[..., Coroutine[Any, Any, None]]
 
class EventBus:
    def __init__(self):
        self._handlers: dict[str, list[EventHandler]] = defaultdict(list)
        self._queue: asyncio.Queue = asyncio.Queue()
        self._running = False
 
    def subscribe(self, event_type: str, handler: EventHandler) -> None:
        """Register a handler for an event type."""
        self._handlers[event_type].append(handler)
 
    def unsubscribe(self, event_type: str, handler: EventHandler) -> None:
        """Remove a handler."""
        self._handlers[event_type].remove(handler)
 
    async def publish(self, event_type: str, data: Any = None) -> None:
        """Queue an event for processing."""
        await self._queue.put((event_type, data))
 
    async def start(self) -> None:
        """Start processing events from the queue."""
        self._running = True
        while self._running:
            try:
                event_type, data = await asyncio.wait_for(
                    self._queue.get(), timeout=1.0
                )
                handlers = self._handlers.get(event_type, [])
                # Run all handlers concurrently
                await asyncio.gather(
                    *(h(data) for h in handlers),
                    return_exceptions=True
                )
            except asyncio.TimeoutError:
                continue  # Check if still running
 
    async def stop(self) -> None:
        """Gracefully stop the event bus."""
        self._running = False
        # Drain remaining events
        while not self._queue.empty():
            event_type, data = self._queue.get_nowait()
            handlers = self._handlers.get(event_type, [])
            await asyncio.gather(
                *(h(data) for h in handlers),
                return_exceptions=True
            )

Step 2: Component Registry

Register and manage components with lifecycle hooks:

# registry.py
import asyncio
from typing import Protocol
 
class Component(Protocol):
    """Interface that all system components must implement."""
    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    @property
    def name(self) -> str: ...
 
class ComponentRegistry:
    def __init__(self):
        self._components: dict[str, Component] = {}
 
    def register(self, component: Component) -> None:
        self._components[component.name] = component
 
    async def start_all(self) -> None:
        """Start all components concurrently."""
        await asyncio.gather(
            *(c.start() for c in self._components.values())
        )
 
    async def stop_all(self) -> None:
        """Stop all components in reverse registration order."""
        for component in reversed(list(self._components.values())):
            try:
                await asyncio.wait_for(component.stop(), timeout=5.0)
            except asyncio.TimeoutError:
                print(f"Warning: {component.name} did not stop within 5s")
 
    def get(self, name: str) -> Component:
        return self._components[name]

Step 3: Graceful Shutdown with Signal Handling

# shutdown.py
import asyncio
import signal
 
class GracefulShutdown:
    def __init__(self):
        self._shutdown_event = asyncio.Event()
 
    def setup(self, loop: asyncio.AbstractEventLoop) -> None:
        """Register signal handlers for graceful shutdown."""
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, self._handle_signal)
 
    def _handle_signal(self) -> None:
        print("\nShutdown signal received...")
        self._shutdown_event.set()
 
    async def wait(self) -> None:
        """Block until shutdown signal received."""
        await self._shutdown_event.wait()
 
    @property
    def is_shutting_down(self) -> bool:
        return self._shutdown_event.is_set()

Step 4: Kill Switch Pattern

For safety-critical systems, implement a kill switch that immediately halts all operations:

# kill_switch.py
import asyncio
from event_bus import EventBus
 
class KillSwitch:
    """Emergency stop for all system operations."""
 
    def __init__(self, event_bus: EventBus):
        self._event_bus = event_bus
        self._triggered = False
        self._reason: str | None = None
 
    async def trigger(self, reason: str) -> None:
        """Activate the kill switch."""
        if self._triggered:
            return
        self._triggered = True
        self._reason = reason
        await self._event_bus.publish("KILL_SWITCH", {
            "reason": reason,
            "timestamp": asyncio.get_event_loop().time()
        })
 
    @property
    def is_active(self) -> bool:
        return self._triggered
 
    async def reset(self, authorization: str) -> None:
        """Reset requires explicit authorization."""
        if authorization == "AUTHORIZED_RESET":
            self._triggered = False
            self._reason = None
            await self._event_bus.publish("KILL_SWITCH_RESET", {})

Step 5: Putting It All Together

# main.py
import asyncio
from event_bus import EventBus
from registry import ComponentRegistry
from shutdown import GracefulShutdown
from kill_switch import KillSwitch
 
class DataFeed:
    """Example producer component."""
    name = "data-feed"
 
    def __init__(self, event_bus: EventBus):
        self._bus = event_bus
        self._running = False
 
    async def start(self):
        self._running = True
        asyncio.create_task(self._produce())
 
    async def stop(self):
        self._running = False
 
    async def _produce(self):
        counter = 0
        while self._running:
            counter += 1
            await self._bus.publish("market_data", {
                "symbol": "BTC-USD",
                "price": 45000 + counter,
                "volume": 1000
            })
            await asyncio.sleep(0.1)
 
class Strategy:
    """Example consumer component."""
    name = "strategy"
 
    def __init__(self, event_bus: EventBus, kill_switch: KillSwitch):
        self._bus = event_bus
        self._kill = kill_switch
 
    async def start(self):
        self._bus.subscribe("market_data", self._on_market_data)
        self._bus.subscribe("KILL_SWITCH", self._on_kill)
 
    async def stop(self):
        pass
 
    async def _on_market_data(self, data):
        if self._kill.is_active:
            return
        # Process market data...
        print(f"Processing {data['symbol']} @ {data['price']}")
 
    async def _on_kill(self, data):
        print(f"Kill switch activated: {data['reason']}")
 
async def main():
    event_bus = EventBus()
    kill_switch = KillSwitch(event_bus)
    registry = ComponentRegistry()
    shutdown = GracefulShutdown()
 
    # Register components
    registry.register(DataFeed(event_bus))
    registry.register(Strategy(event_bus, kill_switch))
 
    # Start everything
    loop = asyncio.get_event_loop()
    shutdown.setup(loop)
 
    bus_task = asyncio.create_task(event_bus.start())
    await registry.start_all()
 
    # Wait for shutdown signal
    await shutdown.wait()
 
    # Graceful teardown
    await registry.stop_all()
    await event_bus.stop()
    bus_task.cancel()
 
if __name__ == "__main__":
    asyncio.run(main())

Error Handling Patterns

Circuit Breaker

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60):
        self._failures = 0
        self._threshold = failure_threshold
        self._reset_timeout = reset_timeout
        self._state = "closed"  # closed, open, half-open
        self._last_failure: float = 0
 
    async def call(self, coro):
        if self._state == "open":
            if asyncio.get_event_loop().time() - self._last_failure > self._reset_timeout:
                self._state = "half-open"
            else:
                raise RuntimeError("Circuit breaker is open")
 
        try:
            result = await coro
            if self._state == "half-open":
                self._state = "closed"
                self._failures = 0
            return result
        except Exception:
            self._failures += 1
            self._last_failure = asyncio.get_event_loop().time()
            if self._failures >= self._threshold:
                self._state = "open"
            raise

Key Takeaways

  • Use asyncio.Queue for backpressure control between producers and consumers
  • Always implement graceful shutdown — drain queues before exiting
  • Kill switches are essential for safety-critical systems
  • Circuit breakers prevent cascading failures across components
  • Run handlers with return_exceptions=True so one failure doesn't block others
  • Keep event handlers fast — offload heavy work to separate tasks
#Python#asyncio#Event-Driven#Architecture#Trading

Related Articles

Automating Report Generation with Python and Jinja2

Build an automated report generation system using Python, Jinja2 templates, and data extraction from multiple sources. Covers multi-tenant data...

5 min read

Python for Security Automation: Essential Scripting

Learn Python security scripting fundamentals including network scanning, log parsing, hash analysis, API integration, and automated threat detection for...

8 min read

Container Security Scanning with Trivy: Images, IaC, and CI/CD

Learn how to use Trivy to scan container images, Dockerfiles, Kubernetes manifests, and Terraform for vulnerabilities and misconfigurations — then integrate it into your GitHub Actions pipeline.

7 min read
Back to all HOWTOs