Back/Code Snippets

Ready-to-Copy Code Snippets

1. Configuration Setup

ConfigParser (YAML)

Load RabbitMQ credentials from config.yaml

# config.yaml
[rabbitmq]
host = concurp1.isc.heia-fr.ch
port = 5072
username = guest
password = guest

[dictionary]
hello = bonjour
world = monde
cat = chat
import pika
import configparser

# Load config
config = configparser.ConfigParser()
config.read('config.yaml')

# Get RabbitMQ settings
HOST = config.get('rabbitmq', 'host')
PORT = config.getint('rabbitmq', 'port')
USER = config.get('rabbitmq', 'username')
PASS = config.get('rabbitmq', 'password')

# Create connection
conn_params = pika.ConnectionParameters(
    host=HOST,
    port=PORT,
    credentials=pika.PlainCredentials(USER, PASS)
)

connection = pika.BlockingConnection(conn_params)
channel = connection.channel()

# Load dictionary
DICTIONARY = dict(config.items('dictionary'))
print(DICTIONARY)  # {'hello': 'bonjour', 'world': 'monde', ...}

Environment Variables

import os
import pika

HOST = os.getenv('RABBITMQ_HOST', 'concurp1.isc.heia-fr.ch')
PORT = int(os.getenv('RABBITMQ_PORT', '5072'))
USER = os.getenv('RABBITMQ_USER', 'guest')
PASS = os.getenv('RABBITMQ_PASS', 'guest')

conn_params = pika.ConnectionParameters(
    host=HOST,
    port=PORT,
    credentials=pika.PlainCredentials(USER, PASS),
    heartbeat=600,
    blocked_connection_timeout=300
)

2. Thread-Safe Patterns

Connection Per Thread

Pika connections NOT thread-safe. Each thread needs own connection.

import pika
import threading

def get_connection():
    """Create new connection (call once per thread)"""
    return pika.BlockingConnection(
        pika.ConnectionParameters(
            host='concurp1.isc.heia-fr.ch',
            port=5072,
            credentials=pika.PlainCredentials('guest', 'guest')
        )
    )

def worker_thread():
    # Each thread gets own connection
    connection = get_connection()
    channel = connection.channel()

    # Do work...
    channel.basic_publish(...)

    connection.close()

# Spawn threads
threads = []
for i in range(5):
    t = threading.Thread(target=worker_thread)
    t.start()
    threads.append(t)

for t in threads:
    t.join()

Thread Pool for Processing

from concurrent.futures import ThreadPoolExecutor
import pika

executor = ThreadPoolExecutor(max_workers=10)

def process_message(body):
    """Heavy processing in thread pool"""
    # Do slow work here
    result = expensive_operation(body)
    return result

def callback(ch, method, properties, body):
    """Main callback (runs in consumer thread)"""
    # Submit to thread pool
    future = executor.submit(process_message, body)

    # Register completion callback
    def done(f):
        try:
            result = f.result()
            print(f"[✓] Processed: {result}")
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            print(f"[✗] Error: {e}")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

    future.add_done_callback(done)

channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

3. Error Handling Patterns

Robust Consumer with Retry

import pika
import time
import json

MAX_RETRIES = 3

def callback(ch, method, properties, body):
    try:
        # Parse message
        data = json.loads(body.decode())

        # Check retry count
        retry_count = properties.headers.get('x-retry-count', 0) if properties.headers else 0

        # Process
        result = process_data(data)
        print(f"[✓] Processed: {result}")

        # Success - ack
        ch.basic_ack(delivery_tag=method.delivery_tag)

    except json.JSONDecodeError as e:
        # Invalid JSON - don't requeue (poison message)
        print(f"[✗] Invalid JSON: {e}")
        ch.basic_ack(delivery_tag=method.delivery_tag)  # Discard

    except Exception as e:
        print(f"[✗] Processing error: {e}")

        # Check retry limit
        if retry_count < MAX_RETRIES:
            # Requeue with incremented retry count
            print(f"[⟳] Retry {retry_count + 1}/{MAX_RETRIES}")

            headers = properties.headers or {}
            headers['x-retry-count'] = retry_count + 1

            ch.basic_publish(
                exchange='',
                routing_key=method.routing_key,
                body=body,
                properties=pika.BasicProperties(headers=headers)
            )

            # Ack original (we re-published it)
            ch.basic_ack(delivery_tag=method.delivery_tag)
        else:
            # Max retries exceeded - send to dead letter queue
            print(f"[☠] Max retries exceeded, sending to DLQ")
            ch.basic_publish(
                exchange='',
                routing_key='dead_letter_queue',
                body=body,
                properties=properties
            )
            ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)
channel.start_consuming()

Connection Error Handling

import pika
import time

def safe_publish(channel, routing_key, body, max_retries=3):
    """Publish with automatic retry on failure"""
    for attempt in range(max_retries):
        try:
            channel.basic_publish(
                exchange='',
                routing_key=routing_key,
                body=body.encode(),
                properties=pika.BasicProperties(delivery_mode=2)
            )
            return True
        except pika.exceptions.AMQPConnectionError as e:
            print(f"[✗] Connection error (attempt {attempt + 1}/{max_retries}): {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
                # Reconnect
                global connection, channel
                connection = create_connection()
                channel = connection.channel()
            else:
                print(f"[☠] Failed after {max_retries} attempts")
                return False
        except Exception as e:
            print(f"[✗] Unexpected error: {e}")
            return False

# Usage
if safe_publish(channel, 'task_queue', 'my message'):
    print("[✓] Published successfully")
else:
    print("[✗] Publish failed")

4. Testing Utilities

Message Counter (Test Throughput)

import time

class MessageCounter:
    def __init__(self):
        self.count = 0
        self.start_time = None

    def start(self):
        self.start_time = time.time()
        self.count = 0

    def increment(self):
        self.count += 1

    def report(self):
        duration = time.time() - self.start_time
        rate = self.count / duration if duration > 0 else 0
        print(f"[Stats] {self.count} messages in {duration:.2f}s = {rate:.0f} msg/s")

# Usage in consumer
counter = MessageCounter()
counter.start()

def callback(ch, method, properties, body):
    process_message(body)
    counter.increment()

    # Report every 1000 messages
    if counter.count % 1000 == 0:
        counter.report()

    ch.basic_ack(delivery_tag=method.delivery_tag)

Verify Queue Empty

def is_queue_empty(channel, queue_name):
    """Check if queue has messages"""
    method = channel.queue_declare(queue=queue_name, passive=True)
    return method.method.message_count == 0

# Usage in tests
assert is_queue_empty(channel, 'task_queue'), "Queue should be empty after processing"

# Or get message count
method = channel.queue_declare(queue='task_queue', passive=True)
print(f"Queue depth: {method.method.message_count} messages")

5. Common Patterns

Simple Producer

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='concurp1.isc.heia-fr.ch',
        port=5072,
        credentials=pika.PlainCredentials('guest', 'guest')
    )
)
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

for i in range(10):
    message = f"Task {i}"
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message.encode(),
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print(f"[✓] Sent: {message}")

connection.close()

Simple Consumer

import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='concurp1.isc.heia-fr.ch',
        port=5072,
        credentials=pika.PlainCredentials('guest', 'guest')
    )
)
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
channel.basic_qos(prefetch_count=1)

def callback(ch, method, properties, body):
    message = body.decode()
    print(f"[x] Received: {message}")

    # Simulate work
    time.sleep(message.count('.'))

    print(f"[✓] Done: {message}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(
    queue='task_queue',
    on_message_callback=callback,
    auto_ack=False
)

print('[*] Waiting for messages. Press CTRL+C to exit')
channel.start_consuming()

JSON Message Handler

import json

# Producer
data = {
    "user_id": 123,
    "action": "purchase",
    "amount": 99.99
}

channel.basic_publish(
    exchange='',
    routing_key='events',
    body=json.dumps(data).encode()
)

# Consumer
def callback(ch, method, properties, body):
    try:
        data = json.loads(body.decode())
        print(f"[x] User {data['user_id']} {data['action']} ${data['amount']}")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except json.JSONDecodeError as e:
        print(f"[✗] Invalid JSON: {e}")
        ch.basic_ack(delivery_tag=method.delivery_tag)  # Discard poison message

6. Exam Quick Reference

School Server Config

HOST = 'concurp1.isc.heia-fr.ch'
PORT = 5072
USER = 'guest'
PASS = 'guest'

Most Common Mistakes

  • Forgetting auto_ack=False
  • Not calling basic_ack()
  • Using same connection in multiple threads
  • Not closing connection at end
  • Wrong exchange type (direct vs fanout)
  • Forgetting queue_bind() for exchanges

Debugging Commands

# Check queue exists and has messages
rabbitmqctl list_queues

# Check bindings
rabbitmqctl list_bindings

# Check connections
rabbitmqctl list_connections

# Purge queue (clear all messages)
rabbitmqctl purge_queue task_queue