Publish/Subscribe delivers the same message to multiple consumers. It uses a fanout exchange to broadcast each message to every queue bound to it.
Use Case:
[Producer] --> [Fanout Exchange] --> [Queue A] --> [Consumer A]
                      |
                      |------------> [Queue B] --> [Consumer B]
                      |
                      +------------> [Queue C] --> [Consumer C]
All queues bound to the exchange receive every message (broadcast).

Publisher:
import pika
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='concurp1.isc.heia-fr.ch',
        port=5072,
        credentials=pika.PlainCredentials('guest', 'guest')
    )
)
channel = connection.channel()
# Declare fanout exchange
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# Publish message (no routing_key needed for fanout)
message = "System error: Database connection failed"
channel.basic_publish(
    exchange='logs',
    routing_key='',  # Ignored by fanout exchange
    body=message.encode()
)
print(f"[x] Broadcast: '{message}'")
connection.close()

Subscriber:
import pika
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='concurp1.isc.heia-fr.ch',
        port=5072,
        credentials=pika.PlainCredentials('guest', 'guest')
    )
)
channel = connection.channel()
# Declare same exchange (idempotent)
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# Create an exclusive temporary queue (broker-named, deleted when this connection closes)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Bind queue to exchange
channel.queue_bind(exchange='logs', queue=queue_name)
def callback(ch, method, properties, body):
    print(f"[x] Received: {body.decode()}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)
print('[*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()

Lab20 builds a print-job fanout system in which each job is sent to multiple printer queues.

Publisher:
# Assumes `channel` was opened as in the examples above
import json
channel.exchange_declare(exchange='print_jobs', exchange_type='fanout')
job = {
    "document": "report.pdf",
    "pages": 10,
    "color": True
}
channel.basic_publish(
    exchange='print_jobs',
    routing_key='',
    body=json.dumps(job).encode()
)
print(f"[x] Sent print job: {job['document']}")

Subscriber:
import json
import time
channel.exchange_declare(exchange='print_jobs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='print_jobs', queue=queue_name)
def print_job(ch, method, properties, body):
    job = json.loads(body.decode())
    print(f"[Printer] Processing: {job['document']}")
    # Simulate printing
    time.sleep(job['pages'] * 0.5)
    print(f"[Printer] Done: {job['document']}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=print_job,
    auto_ack=True
)
# Block and dispatch incoming jobs to print_job
channel.start_consuming()

| Concept | Detail |
|---|---|
| Exchange Type | fanout |
| Routing Key | Ignored (broadcast to all) |
| Queue Type | Usually exclusive (auto-deleted) |
| Message Delivery | Copy to EVERY bound queue |
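
The behavior summarized in the table can be checked without a broker. The following is a minimal in-memory sketch, not RabbitMQ or pika code: `ToyFanoutExchange` and its methods are hypothetical names made up for illustration, and the class models only the routing rules (no network, acknowledgements, or persistence).

```python
from collections import deque


class ToyFanoutExchange:
    """In-memory stand-in for a fanout exchange (illustration only, not pika)."""

    def __init__(self):
        self.queues = {}  # queue name -> deque of message bodies

    def bind(self, queue_name):
        # Binding is what makes a queue eligible to receive messages.
        self.queues.setdefault(queue_name, deque())

    def publish(self, body, routing_key=''):
        # routing_key is accepted but never consulted, as with a fanout exchange.
        for messages in self.queues.values():
            messages.append(body)  # each bound queue gets its own copy
        # Number of queues reached; 0 means the message was dropped.
        return len(self.queues)


exchange = ToyFanoutExchange()
dropped = exchange.publish(b'too early')  # no queue bound yet
exchange.bind('printer_a')
exchange.bind('printer_b')
delivered = exchange.publish(b'report.pdf', routing_key='anything')

print(dropped)                             # 0
print(delivered)                           # 2
print(list(exchange.queues['printer_a']))  # [b'report.pdf']
print(list(exchange.queues['printer_b']))  # [b'report.pdf']
```

Note that the message published before any queue was bound never reaches `printer_a` or `printer_b`: binding later does not replay earlier broadcasts.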
❌ Using direct exchange instead of fanout
A direct exchange delivers only to queues whose binding key exactly matches the routing_key. A fanout exchange broadcasts to every bound queue.
❌ Forgetting queue_bind
A queue must be bound to the exchange. Declaring the queue alone is not enough.
❌ Messages lost when no consumers
If no queues are bound to the exchange, messages are dropped. This is by design (ephemeral broadcast).
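
One way to make the forgotten-bind mistake harder to commit is to wrap the declare and bind steps in a single helper. This is only a sketch: `bind_temporary_queue` is a hypothetical name, and `channel` is assumed to be an open pika `BlockingChannel` like the one created in the subscriber code earlier. It uses only the calls already shown in this document.

```python
def bind_temporary_queue(channel, exchange):
    """Declare a fanout exchange plus an exclusive queue bound to it.

    Hypothetical helper: `channel` is assumed to behave like a pika
    BlockingChannel. Returns the broker-generated queue name.
    """
    # Declaring is idempotent as long as the type matches the existing exchange.
    channel.exchange_declare(exchange=exchange, exchange_type='fanout')
    # An empty name asks the broker to generate one; exclusive ties the
    # queue to this connection.
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    # Without this bind the queue exists but never receives anything.
    channel.queue_bind(exchange=exchange, queue=queue_name)
    return queue_name
```

A subscriber would call `queue_name = bind_temporary_queue(channel, 'logs')` and pass the result to `channel.basic_consume(...)`. The helper does not change the ephemeral nature of the broadcast: anything published before it runs is still lost.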