Pub/Sub Pattern (Fanout)

Overview

Publish/Subscribe delivers the same message to multiple consumers. It uses a fanout exchange, which copies each message to every queue bound to it.

Use Cases:

  • Log aggregation (send logs to console, file, Elasticsearch)
  • Notifications (email, SMS, push notification all triggered by one event)
  • Real-time dashboard updates (multiple clients subscribe to same data feed)

Architecture

[Producer] --> [Fanout Exchange] --> [Queue A] --> [Consumer A]
                       |
                       |------------> [Queue B] --> [Consumer B]
                       |
                       +------------> [Queue C] --> [Consumer C]

Every queue bound to the exchange receives its own copy of EVERY message (broadcast).
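The broadcast behaviour in the diagram can be modelled without a broker. What follows is a toy in-memory sketch, not RabbitMQ code: the class `FanoutExchange`, its methods `bind` and `publish`, and the queue names are hypothetical and exist only to illustrate the delivery rule.

```python
from collections import deque


class FanoutExchange:
    """Toy in-memory model of a fanout exchange (illustration only)."""

    def __init__(self):
        self.queues = {}  # queue name -> deque of pending messages

    def bind(self, queue_name):
        # Only queues registered here will ever receive messages.
        self.queues.setdefault(queue_name, deque())

    def publish(self, body, routing_key=""):
        # The routing key is accepted but never inspected.
        for pending in self.queues.values():
            pending.append(body)
        return len(self.queues)  # number of copies delivered


exchange = FanoutExchange()
for name in ("queue_a", "queue_b", "queue_c"):
    exchange.bind(name)

copies = exchange.publish(b"System error: Database connection failed",
                          routing_key="ignored")
print(f"copies delivered: {copies}")
for name, pending in exchange.queues.items():
    print(name, list(pending))
```

Each of the three queues ends up holding its own copy of the message, regardless of the routing key passed to `publish`.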

Publisher Implementation

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='concurp1.isc.heia-fr.ch',
        port=5072,
        credentials=pika.PlainCredentials('guest', 'guest')
    )
)
channel = connection.channel()

# Declare fanout exchange
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# Publish message (no routing_key needed for fanout)
message = "System error: Database connection failed"
channel.basic_publish(
    exchange='logs',
    routing_key='',  # Ignored by fanout exchange
    body=message.encode()
)

print(f"[x] Broadcast: '{message}'")
connection.close()

Subscriber Implementation

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='concurp1.isc.heia-fr.ch',
        port=5072,
        credentials=pika.PlainCredentials('guest', 'guest')
    )
)
channel = connection.channel()

# Declare same exchange (idempotent)
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# Create an exclusive, server-named temporary queue (deleted when this connection closes)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Bind queue to exchange
channel.queue_bind(exchange='logs', queue=queue_name)

def callback(ch, method, properties, body):
    print(f"[x] Received: {body.decode()}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

print('[*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()
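The subscriber tells the user to press CTRL+C but does not handle the resulting `KeyboardInterrupt`. A minimal sketch of a clean shutdown follows. The helper name `consume_until_interrupted` is hypothetical; it assumes `channel` and `connection` are the pika objects created above.

```python
def consume_until_interrupted(channel, connection):
    """Block in start_consuming() and shut down cleanly on CTRL+C."""
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        # Stop message delivery before tearing down the connection.
        channel.stop_consuming()
    finally:
        # Closing the connection also removes the exclusive queue.
        connection.close()
```

In the subscriber above, this would stand in for the final `channel.start_consuming()` line: `consume_until_interrupted(channel, connection)`.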

Lab20: Printer System

Lab20 builds a print-job fanout system in which every job is sent to multiple printer queues.

Print Job Publisher

import json

# Assumes `channel` was opened as in the publisher above
channel.exchange_declare(exchange='print_jobs', exchange_type='fanout')

job = {
    "document": "report.pdf",
    "pages": 10,
    "color": True
}

channel.basic_publish(
    exchange='print_jobs',
    routing_key='',
    body=json.dumps(job).encode()
)

print(f"[x] Sent print job: {job['document']}")

Printer Consumer

import json
import time

# Assumes `channel` was opened as in the subscriber above

channel.exchange_declare(exchange='print_jobs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='print_jobs', queue=queue_name)

def print_job(ch, method, properties, body):
    job = json.loads(body.decode())
    print(f"[Printer] Processing: {job['document']}")
    # Simulate printing
    time.sleep(job['pages'] * 0.5)
    print(f"[Printer] Done: {job['document']}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=print_job,
    auto_ack=True
)

channel.start_consuming()
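With `auto_ack=True` the broker treats a job as handled the moment it is delivered, so a printer that crashes mid-job loses it. One possible variant acknowledges only after printing finishes. This is a sketch, not the lab's required solution: `make_print_handler`, `seconds_per_page`, and the injectable `sleep` parameter are hypothetical names introduced here.

```python
import json
import time


def make_print_handler(seconds_per_page=0.5, sleep=time.sleep):
    """Build a callback that acknowledges a job only after it is printed."""

    def print_job(ch, method, properties, body):
        job = json.loads(body.decode())
        print(f"[Printer] Processing: {job['document']}")
        sleep(job["pages"] * seconds_per_page)  # simulate printing
        print(f"[Printer] Done: {job['document']}")
        # Acknowledge last, so an unfinished job stays unacknowledged.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    return print_job
```

It would be registered with `channel.basic_consume(queue=queue_name, on_message_callback=make_print_handler(), auto_ack=False)`. Note that redelivery of an unacknowledged job only helps if the queue outlives the consumer. An exclusive queue is deleted along with its connection, so this variant is mainly useful with a named, non-exclusive queue.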

Key Points

Concept            Detail
Exchange Type      fanout
Routing Key        Ignored (broadcast to all)
Queue Type         Usually exclusive (auto-deleted)
Message Delivery   Copy to EVERY bound queue

Common Pitfalls

❌ Using direct exchange instead of fanout

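A direct exchange delivers a message only to queues whose binding key exactly matches the message's routing_key. A fanout exchange ignores the key and broadcasts to every bound queue.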
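The difference can be made concrete with a simplified routing model. This is a sketch in plain Python, not broker code: the function `route`, the queue names, and the binding keys are hypothetical, and only the `direct` and `fanout` types are covered.

```python
def route(exchange_type, bindings, routing_key):
    """Return the names of the queues that would receive a message.

    bindings is a list of (queue_name, binding_key) pairs.
    """
    if exchange_type == "fanout":
        # Binding keys and routing key are both ignored.
        return [queue for queue, _ in bindings]
    if exchange_type == "direct":
        # Only exact key matches receive the message.
        return [queue for queue, key in bindings if key == routing_key]
    raise ValueError(f"unsupported exchange type: {exchange_type}")


bindings = [("console", "info"), ("file", "error"), ("elastic", "error")]

print(route("fanout", bindings, routing_key="error"))
print(route("direct", bindings, routing_key="error"))
print(route("direct", bindings, routing_key=""))
```

The last call shows the pitfall: publishing with an empty routing key, as the fanout examples do, reaches no queue in this model once the exchange is direct.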

❌ Forgetting queue_bind

A queue must be bound to the exchange before it receives anything. Declaring the queue alone is not enough.

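❌ Messages lost when no queues are bound

If no queues are bound to the exchange, messages are silently dropped. With exclusive temporary queues, this means anything published while no subscriber is connected is lost. This is by design (ephemeral broadcast).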
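When a subscriber must not miss messages while it is offline, one option is to give it a named, durable, non-exclusive queue instead of an exclusive temporary one. A minimal sketch, assuming a pika channel like those above; the helper name `bind_durable_subscriber` and the queue name in the usage line are hypothetical.

```python
def bind_durable_subscriber(channel, exchange, queue_name):
    """Declare a named queue that outlives its consumer and bind it to a fanout exchange."""
    channel.exchange_declare(exchange=exchange, exchange_type="fanout")
    # durable=True keeps the queue definition across broker restarts;
    # leaving exclusive at its default keeps the queue after this connection closes.
    channel.queue_declare(queue=queue_name, durable=True)
    channel.queue_bind(exchange=exchange, queue=queue_name)
    return queue_name
```

A call such as `bind_durable_subscriber(channel, 'logs', 'logs.file_writer')` has to run once before publishing starts. From then on, messages accumulate in the queue while the subscriber is disconnected. Surviving a broker restart would additionally require persistent messages, which this sketch does not configure.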

Related Patterns