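# Load RabbitMQ credentials from config.yaml
# (The file uses INI syntax despite its .yaml extension: it is parsed with configparser.)
#
# config.yaml:
#   [rabbitmq]
#   host = concurp1.isc.heia-fr.ch
#   port = 5072
#   username = guest
#   password = guest
#
#   [dictionary]
#   hello = bonjour
#   world = monde
#   cat = chat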
import pika
import configparser
# Load config.
# NOTE: despite its ".yaml" name the file must use INI syntax, because it is
# parsed with configparser.
CONFIG_PATH = 'config.yaml'
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed; fail here instead of with a confusing
# NoSectionError further down.
if not config.read(CONFIG_PATH):
    raise FileNotFoundError(f"Config file not found or unreadable: {CONFIG_PATH}")

# Get RabbitMQ settings
HOST = config.get('rabbitmq', 'host')
PORT = config.getint('rabbitmq', 'port')
USER = config.get('rabbitmq', 'username')
PASS = config.get('rabbitmq', 'password')

# Create connection
conn_params = pika.ConnectionParameters(
    host=HOST,
    port=PORT,
    credentials=pika.PlainCredentials(USER, PASS),
)
connection = pika.BlockingConnection(conn_params)
channel = connection.channel()

# Load dictionary: every key/value pair of the [dictionary] section.
DICTIONARY = dict(config.items('dictionary'))
print(DICTIONARY)  # {'hello': 'bonjour', 'world': 'monde', ...}
import os
import pika
# Connection settings come from the environment; the second argument of each
# os.getenv() call is the fallback used when the variable is not set.
HOST = os.getenv('RABBITMQ_HOST', 'concurp1.isc.heia-fr.ch')
PORT = int(os.getenv('RABBITMQ_PORT', '5072'))  # env values are strings
USER = os.getenv('RABBITMQ_USER', 'guest')
PASS = os.getenv('RABBITMQ_PASS', 'guest')
conn_params = pika.ConnectionParameters(
    host=HOST,
    port=PORT,
    credentials=pika.PlainCredentials(USER, PASS),
    heartbeat=600,
    blocked_connection_timeout=300,
)

# Pika connections are NOT thread-safe: each thread needs its own connection.
import pika
import threading
def get_connection(host='concurp1.isc.heia-fr.ch', port=5072,
                   username='guest', password='guest'):
    """Create a new blocking connection (call once per thread).

    Called without arguments it connects to the same broker and account as
    before; the parameters only make those values overridable.

    Args:
        host: Broker host name.
        port: Broker port.
        username: Account used for PLAIN authentication.
        password: Password for ``username``.

    Returns:
        A new ``pika.BlockingConnection``. The caller owns it and is
        responsible for closing it.
    """
    credentials = pika.PlainCredentials(username, password)
    parameters = pika.ConnectionParameters(
        host=host,
        port=port,
        credentials=credentials,
    )
    return pika.BlockingConnection(parameters)
def worker_thread():
    """Thread target: do the work on a connection owned by this thread only."""
    # Each thread gets its own connection
    connection = get_connection()
    try:
        channel = connection.channel()
        # Do work...
        channel.basic_publish(...)  # placeholder: supply exchange, routing_key and body
    finally:
        # Close even when the work above raises, so the connection is not leaked.
        connection.close()
# Spawn threads: five workers, each running worker_thread().
threads = [threading.Thread(target=worker_thread) for _ in range(5)]
for t in threads:
    t.start()
# Wait for every worker to finish.
for t in threads:
    t.join()
from concurrent.futures import ThreadPoolExecutor
import pika
# Shared pool of at most 10 worker threads; callback() submits process_message() jobs to it.
executor = ThreadPoolExecutor(max_workers=10)
def process_message(body):
    """Heavy processing, meant to run in the thread pool.

    Args:
        body: Message payload, passed through unchanged to
            ``expensive_operation``.

    Returns:
        Whatever ``expensive_operation(body)`` returns.
    """
    # Do slow work here
    return expensive_operation(body)
def callback(ch, method, properties, body):
    """Main callback (runs in the consumer thread).

    Submits the message body to the thread pool and returns immediately.
    When the job finishes, the message is acked on success or nacked with
    ``requeue=True`` on failure.

    Args:
        ch: Channel the message was delivered on.
        method: Delivery metadata; only ``delivery_tag`` is used.
        properties: Message properties (unused).
        body: Raw message payload handed to ``process_message``.
    """
    connection = ch.connection
    delivery_tag = method.delivery_tag

    def ack():
        # The channel may have closed while the job was running.
        if ch.is_open:
            ch.basic_ack(delivery_tag=delivery_tag)

    def nack():
        if ch.is_open:
            ch.basic_nack(delivery_tag=delivery_tag, requeue=True)

    def done(f):
        # Done-callbacks run in the thread that completed the future, normally
        # a pool worker. Pika channels are not thread-safe, so the ack/nack is
        # handed back to the connection's own thread instead of being called
        # directly from here.
        try:
            result = f.result()
        except Exception as e:
            print(f"[✗] Error: {e}")
            connection.add_callback_threadsafe(nack)
        else:
            print(f"[✓] Processed: {result}")
            connection.add_callback_threadsafe(ack)

    # Submit to thread pool, then register the completion callback
    future = executor.submit(process_message, body)
    future.add_done_callback(done)
# Register the consumer and block in the consume loop.
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
import pika
import time
import json
# Number of times callback() re-publishes a failing message before it sends
# the message to 'dead_letter_queue' instead.
MAX_RETRIES = 3
def callback(ch, method, properties, body):
    """Process one JSON message with bounded retries and a dead-letter queue.

    Outcomes (the delivery is acked in every one of them):
      * processed successfully;
      * payload is not valid UTF-8 / JSON -> discarded as a poison message;
      * processing raised and fewer than ``MAX_RETRIES`` retries were made ->
        re-published to the same routing key with ``x-retry-count`` + 1;
      * processing raised and the retry limit is reached -> published to
        ``dead_letter_queue``.

    Args:
        ch: Channel the message was delivered on.
        method: Delivery metadata (``delivery_tag``, ``routing_key``).
        properties: Message properties; ``headers`` carries the retry count.
        body: Raw message payload (bytes).
    """
    # Read the retry counter before anything can fail, so it is always bound
    # inside the error handlers. Work on a copy of the headers so the incoming
    # properties are only modified when a retry is actually published.
    headers = dict(properties.headers or {})
    retry_count = headers.get('x-retry-count', 0)

    try:
        # Parse message
        data = json.loads(body.decode())
        # Process
        result = process_data(data)
        print(f"[✓] Processed: {result}")
        # Success - ack
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # Undecodable payload - don't requeue (poison message)
        print(f"[✗] Invalid JSON: {e}")
        ch.basic_ack(delivery_tag=method.delivery_tag)  # Discard
    except Exception as e:
        print(f"[✗] Processing error: {e}")
        # Check retry limit
        if retry_count < MAX_RETRIES:
            # Requeue with incremented retry count
            print(f"[⟳] Retry {retry_count + 1}/{MAX_RETRIES}")
            headers['x-retry-count'] = retry_count + 1
            # Re-use the incoming properties so delivery_mode, content_type,
            # etc. survive the retry; only the headers are replaced.
            properties.headers = headers
            ch.basic_publish(
                exchange='',
                routing_key=method.routing_key,
                body=body,
                properties=properties,
            )
            # Ack original (we re-published it)
            ch.basic_ack(delivery_tag=method.delivery_tag)
        else:
            # Max retries exceeded - send to dead letter queue
            print(f"[☠] Max retries exceeded, sending to DLQ")
            ch.basic_publish(
                exchange='',
                routing_key='dead_letter_queue',
                body=body,
                properties=properties,
            )
            ch.basic_ack(delivery_tag=method.delivery_tag)
# Manual acknowledgements: callback() acks every delivery itself.
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)
channel.start_consuming()
import pika
import time
def safe_publish(channel, routing_key, body, max_retries=3):
    """Publish a persistent message, reconnecting and retrying on connection loss.

    Args:
        channel: Channel to publish on first. After a reconnect a fresh
            channel is used for the remaining attempts.
        routing_key: Routing key (queue name on the default exchange).
        body: Message payload; ``str`` is encoded with the default codec,
            ``bytes`` is sent unchanged.
        max_retries: Maximum number of publish attempts.

    Returns:
        True when the message was published, False otherwise.

    Side effects:
        After a reconnect the module-level names ``connection`` and
        ``channel`` are rebound to the new objects, so later callers that
        read those globals use the live connection.
    """
    payload = body.encode() if isinstance(body, str) else body
    for attempt in range(max_retries):
        try:
            channel.basic_publish(
                exchange='',
                routing_key=routing_key,
                body=payload,
                properties=pika.BasicProperties(delivery_mode=2),
            )
            return True
        except pika.exceptions.AMQPConnectionError as e:
            print(f"[✗] Connection error (attempt {attempt + 1}/{max_retries}): {e}")
            if attempt >= max_retries - 1:
                print(f"[☠] Failed after {max_retries} attempts")
                return False
            time.sleep(2 ** attempt)  # Exponential backoff
            # Reconnect.
            # NOTE(review): create_connection() is not defined in the visible
            # code (get_connection() looks equivalent) - confirm it exists.
            new_connection = create_connection()
            channel = new_connection.channel()
            # `global channel` is illegal here because `channel` is also a
            # parameter, so the module-level handles are rebound explicitly.
            globals().update(connection=new_connection, channel=channel)
        except Exception as e:
            print(f"[✗] Unexpected error: {e}")
            return False
    # Only reached when max_retries <= 0: nothing was attempted.
    return False
# Usage: safe_publish() reports the outcome through its boolean return value.
if safe_publish(channel, 'task_queue', 'my message'):
    print("[✓] Published successfully")
else:
    print("[✗] Publish failed")
import time
class MessageCounter:
    """Count messages and report throughput since the last ``start()``."""

    def __init__(self):
        self.count = 0
        # Wall-clock time of the last start(); None until start() is called.
        self.start_time = None

    def start(self):
        """Reset the counter and begin timing."""
        self.start_time = time.time()
        self.count = 0

    def increment(self):
        """Record one more message."""
        self.count += 1

    def report(self):
        """Print the message count, elapsed seconds and messages per second."""
        # Without this guard, report() before start() raised TypeError
        # (float - None); treat "never started" as zero elapsed time.
        if self.start_time is None:
            duration = 0.0
        else:
            duration = time.time() - self.start_time
        rate = self.count / duration if duration > 0 else 0
        print(f"[Stats] {self.count} messages in {duration:.2f}s = {rate:.0f} msg/s")
# Usage in consumer
# Module-level counter used by callback(); timing starts here.
counter = MessageCounter()
counter.start()
def callback(ch, method, properties, body):
    """Process one message, update the throughput counter, then ack it."""
    process_message(body)
    counter.increment()
    # Report every 1000 messages
    if counter.count % 1000 == 0:
        counter.report()
    ch.basic_ack(delivery_tag=method.delivery_tag)


def is_queue_empty(channel, queue_name):
    """Return True when the queue reports zero messages.

    Uses a passive declare, which inspects the existing queue without
    creating or changing it.

    Args:
        channel: Open channel used for the declare.
        queue_name: Name of the queue to inspect.
    """
    declare_ok = channel.queue_declare(queue=queue_name, passive=True)
    return declare_ok.method.message_count == 0
# Usage in tests
assert is_queue_empty(channel, 'task_queue'), "Queue should be empty after processing"
# Or get message count
method = channel.queue_declare(queue='task_queue', passive=True)
print(f"Queue depth: {method.method.message_count} messages")
import pika
# Producer: publish ten persistent messages to the durable 'task_queue'.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='concurp1.isc.heia-fr.ch',
        port=5072,
        credentials=pika.PlainCredentials('guest', 'guest'),
    )
)
try:
    channel = connection.channel()
    channel.queue_declare(queue='task_queue', durable=True)
    for i in range(10):
        message = f"Task {i}"
        channel.basic_publish(
            exchange='',
            routing_key='task_queue',
            body=message.encode(),
            properties=pika.BasicProperties(delivery_mode=2),
        )
        print(f"[✓] Sent: {message}")
finally:
    # Close the connection even if declaring or publishing fails.
    connection.close()
import pika
import time
# Consumer setup: connect, declare the durable queue, set the prefetch limit.
consumer_credentials = pika.PlainCredentials('guest', 'guest')
consumer_params = pika.ConnectionParameters(
    host='concurp1.isc.heia-fr.ch',
    port=5072,
    credentials=consumer_credentials,
)
connection = pika.BlockingConnection(consumer_params)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
    """Simulate work on one message, then acknowledge it.

    The simulated work sleeps one second for every '.' in the message text.

    Args:
        ch: Channel the message was delivered on.
        method: Delivery metadata; only ``delivery_tag`` is used.
        properties: Message properties (unused).
        body: Raw message payload (bytes), decoded with the default codec.
    """
    message = body.decode()
    print(f"[x] Received: {message}")
    # Simulate work
    time.sleep(message.count('.'))
    print(f"[✓] Done: {message}")
    ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(
    queue='task_queue',
    on_message_callback=callback,
    auto_ack=False,
)
print('[*] Waiting for messages. Press CTRL+C to exit')
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # The banner promises a CTRL+C exit: stop the consume loop and close the
    # connection instead of terminating with a traceback.
    channel.stop_consuming()
    connection.close()
import json
# Producer: serialise the event as JSON and publish the UTF-8 bytes.
data = {"user_id": 123, "action": "purchase", "amount": 99.99}
channel.basic_publish(exchange='', routing_key='events', body=json.dumps(data).encode())
# Consumer
def callback(ch, method, properties, body):
    """Print one JSON purchase event and acknowledge it.

    The delivery is acked in every case. A payload that is not valid
    UTF-8/JSON, or that lacks the expected keys, is reported and discarded so
    it neither stays unacked nor gets redelivered forever.

    Args:
        ch: Channel the message was delivered on.
        method: Delivery metadata; only ``delivery_tag`` is used.
        properties: Message properties (unused).
        body: Raw message payload (bytes).
    """
    try:
        data = json.loads(body.decode())
        print(f"[x] User {data['user_id']} {data['action']} ${data['amount']}")
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        print(f"[✗] Invalid JSON: {e}")
    except (KeyError, TypeError) as e:
        # Valid JSON, but not an object with the expected fields.
        print(f"[✗] Malformed event: {e!r}")
    # Ack on success, or discard the poison message
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Connection settings used by the examples in this file:
# HOST = 'concurp1.isc.heia-fr.ch'
# PORT = 5072
# USER = 'guest'
# PASS = 'guest'
# auto_ack=False
# basic_ack()
# queue_bind() for exchanges
#
# rabbitmqctl commands (run in a shell):
#   rabbitmqctl list_queues              # Check queue exists and has messages
#   rabbitmqctl list_bindings            # Check bindings
#   rabbitmqctl list_connections         # Check connections
#   rabbitmqctl purge_queue task_queue   # Purge queue (clear all messages)