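The routing pattern uses a direct exchange to deliver each message to the queues whose binding key exactly matches the message's routing key. It is a selective form of pub/sub: a consumer receives only the keys its queue is bound to.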
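The exact-match rule can be tried out without a broker. The sketch below is a toy in-memory model, not a pika or RabbitMQ API: the class `DirectExchangeModel` and the queue names are hypothetical and exist only to illustrate how a direct exchange compares routing keys with binding keys.

```python
from collections import defaultdict


class DirectExchangeModel:
    """Toy in-memory model of a direct exchange (hypothetical, for illustration)."""

    def __init__(self):
        # binding key -> names of the queues bound with that key
        self._bindings = defaultdict(set)

    def bind(self, queue, binding_key):
        self._bindings[binding_key].add(queue)

    def route(self, routing_key):
        # Exact, case-sensitive comparison: no wildcards, no prefix matching.
        return sorted(self._bindings.get(routing_key, ()))


exchange = DirectExchangeModel()
exchange.bind("error_queue", "error")
exchange.bind("warning_queue", "warning")
exchange.bind("info_queue", "info")

print(exchange.route("error"))  # ['error_queue']
print(exchange.route("Error"))  # [] (case differs, so nothing matches)
print(exchange.route("debug"))  # [] (no queue is bound with this key)
```

In this model a key with no matching binding reaches no queue at all, which is why a typo in a severity name tends to show up as missing messages rather than as an error.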
Use Case:

    [Producer] --routing_key="error"--> [Direct Exchange]
                                                |
                                                |--(binding key="error")-> [Error Queue] -> [Error Handler]
                                                |
                                                |--(binding key="warning")-> [Warning Queue] -> [Warning Handler]
                                                |
                                                +--(binding key="info")-> [Info Queue] -> [Info Handler]

A message with routing key "error" goes ONLY to the Error Queue.

Producer:

    import pika

    connection = pika.BlockingConnection(...)
    channel = connection.channel()

    # Declare direct exchange
    channel.exchange_declare(exchange='logs_direct', exchange_type='direct')

    # Publish with routing key
    severity = 'error'  # or 'warning', 'info'
    message = 'Database connection failed'
    channel.basic_publish(
        exchange='logs_direct',
        routing_key=severity,  # Messages routed by this key
        body=f"{severity}: {message}".encode()
    )
    print(f"[✓] Sent [{severity}]: {message}")

    # Close the connection so buffered data is flushed to the broker
    connection.close()

Subscriber (subscriber.py):

    import pika
    import sys

    connection = pika.BlockingConnection(...)
    channel = connection.channel()
    channel.exchange_declare(exchange='logs_direct', exchange_type='direct')

    # Empty name: the broker generates one; exclusive: deleted when this connection closes
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    # Subscribe to specific severities
    severities = sys.argv[1:] if len(sys.argv) > 1 else ['info']
    for severity in severities:
        channel.queue_bind(
            exchange='logs_direct',
            queue=queue_name,
            routing_key=severity  # Only receive messages with this key
        )
    print(f"[*] Subscribed to: {severities}")

    def callback(ch, method, properties, body):
        print(f"[x] {method.routing_key}: {body.decode()}")

    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    channel.start_consuming()

    # Usage:
    # python subscriber.py error          # Only errors
    # python subscriber.py error warning  # Errors + warnings
    # python subscriber.py info           # Only info

One queue can bind to multiple routing keys.

    # All logs queue (receives everything)
    channel.queue_bind(exchange='logs_direct', queue='all_logs', routing_key='info')
    channel.queue_bind(exchange='logs_direct', queue='all_logs', routing_key='warning')
    channel.queue_bind(exchange='logs_direct', queue='all_logs', routing_key='error')

    # Critical logs queue (only errors)
    channel.queue_bind(exchange='logs_direct', queue='critical_logs', routing_key='error')

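
The bindings above assume that the `all_logs` and `critical_logs` queues already exist, since binding a queue that was never declared fails on the broker. One way to keep declaration and binding together is a small helper, sketched here under assumptions: `bind_severities` and `RecordingChannel` are hypothetical names, and the stand-in channel only records calls so the sketch runs without a broker. With a real pika channel, the same two methods (`queue_declare` and `queue_bind`) would be called instead.

```python
def bind_severities(channel, exchange, queue, severities):
    """Declare `queue`, then bind it to `exchange` once per severity key."""
    channel.queue_declare(queue=queue)
    for severity in severities:
        channel.queue_bind(exchange=exchange, queue=queue, routing_key=severity)


class RecordingChannel:
    """Stand-in for a pika channel that only records calls (dry run)."""

    def __init__(self):
        self.calls = []

    def queue_declare(self, queue):
        self.calls.append(("declare", queue))

    def queue_bind(self, exchange, queue, routing_key):
        self.calls.append(("bind", queue, routing_key))


channel = RecordingChannel()
bind_severities(channel, "logs_direct", "all_logs", ["info", "warning", "error"])
bind_severities(channel, "logs_direct", "critical_logs", ["error"])

for call in channel.calls:
    print(call)
```

Passing the channel in as a parameter keeps the helper independent of how the connection is created, so the elided connection settings stay wherever the application already defines them.
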
| Concept | Detail |
|---|---|
| Exchange Type | direct |
| Routing | Exact key match |
| Use Case | Selective broadcast |
| Multiple Bindings | Supported (queue can receive multiple keys) |