🎯 Cette page contient 25+ exercices couvrant TOUS les patterns et problèmes possibles. Code complet fourni pour chaque exercice.
Scénario: Application génère logs. Broadcast vers console, fichier, email (si ERROR).
#!/usr/bin/env python3
import pika
import json
from datetime import datetime
import sys
def main():
    """Publish one sample log record per severity level to the 'logs' fanout exchange.

    Each record is a JSON object with 'level', 'message', 'timestamp' and
    'module' keys. The connection is closed once all records are sent.
    """
    credentials = pika.PlainCredentials('guest', 'guest')
    parameters = pika.ConnectionParameters(
        host='concurp1.isc.heia-fr.ch',
        port=5072,
        credentials=credentials,
    )
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    # One simulated log message per severity level.
    for index, level in enumerate(('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')):
        record = {
            'level': level,
            'message': f'Application event {index}',
            'timestamp': datetime.now().isoformat(),
            'module': 'main',
        }
        payload = json.dumps(record).encode()
        # The routing key is ignored by a fanout exchange, so it stays empty.
        channel.basic_publish(exchange='logs', routing_key='', body=payload)
        print(f"[✓] Sent {level}: {record['message']}")

    connection.close()
if __name__ == '__main__':
    main()

#!/usr/bin/env python3
import pika
import json
def main():
    """Print every record received from the 'logs' fanout exchange, coloured by level.

    Blocks in the consume loop until interrupted.
    """
    connection = pika.BlockingConnection(...)
    channel = connection.channel()
    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    # Server-named exclusive queue: private to this consumer.
    declared = channel.queue_declare(queue='', exclusive=True)
    queue_name = declared.method.queue
    channel.queue_bind(exchange='logs', queue=queue_name)

    def callback(ch, method, properties, body):
        """Decode one JSON log record and print it with an ANSI colour prefix."""
        record = json.loads(body.decode())
        ansi_by_level = {
            'DEBUG': '\033[90m',     # Gray
            'INFO': '\033[94m',      # Blue
            'WARNING': '\033[93m',   # Yellow
            'ERROR': '\033[91m',     # Red
            'CRITICAL': '\033[95m',  # Magenta
        }
        ansi_reset = '\033[0m'
        # Unknown levels are printed without a colour prefix.
        prefix = ansi_by_level.get(record['level'], '')
        print(f"{prefix}[{record['timestamp']}] {record['level']:8} {record['message']}{ansi_reset}")

    channel.basic_consume(
        queue=queue_name,
        on_message_callback=callback,
        auto_ack=True,
    )
    print('[*] Console handler waiting. Press CTRL+C to exit')
    channel.start_consuming()
if __name__ == '__main__':
    main()

#!/usr/bin/env python3
import pika
import json
def main():
    """Append every record received from the 'logs' fanout exchange to application.log.

    Blocks in the consume loop until interrupted.
    """
    connection = pika.BlockingConnection(...)
    channel = connection.channel()
    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    # Server-named exclusive queue: private to this consumer.
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange='logs', queue=queue_name)

    def callback(ch, method, properties, body):
        """Decode one JSON log record and append it as a line to the log file."""
        log = json.loads(body.decode())
        # Explicit UTF-8: without it the file encoding follows the platform
        # locale and non-ASCII log text can fail to encode.
        with open('application.log', 'a', encoding='utf-8') as f:
            f.write(f"[{log['timestamp']}] {log['level']}: {log['message']}\n")
        print(f"[FILE] Logged: {log['level']}")

    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    print('[*] File handler writing to application.log')
    channel.start_consuming()
if __name__ == '__main__':
    main()

Scénario: Serveur envoie metrics (CPU, RAM, disk). Multiple dashboards reçoivent en temps réel.
#!/usr/bin/env python3
# server_monitor.py (Producer)
import pika
import json
import psutil
import time
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.exchange_declare(exchange='server_metrics', exchange_type='fanout')
while True:
metrics = {
'cpu_percent': psutil.cpu_percent(interval=1),
'memory_percent': psutil.virtual_memory().percent,
'disk_percent': psutil.disk_usage('/').percent,
'timestamp': time.time()
}
channel.basic_publish(
exchange='server_metrics',
routing_key='',
body=json.dumps(metrics).encode()
)
print(f"[✓] CPU: {metrics['cpu_percent']}% | RAM: {metrics['memory_percent']}%")
    time.sleep(5) # Every 5 seconds

#!/usr/bin/env python3
# dashboard.py (Consumer)
import pika
import json
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.exchange_declare(exchange='server_metrics', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='server_metrics', queue=queue_name)
def callback(ch, method, properties, body):
    """Print one metrics sample, preceded by alerts for high CPU or memory usage.

    The body is a JSON object with 'cpu_percent' and 'memory_percent' keys.
    """
    sample = json.loads(body.decode())

    # Alert thresholds: CPU above 80 %, memory above 90 %.
    cpu = sample['cpu_percent']
    if cpu > 80:
        print(f"⚠️ HIGH CPU: {cpu}%")

    memory = sample['memory_percent']
    if memory > 90:
        print(f"⚠️ HIGH MEMORY: {memory}%")

    print(f"[Dashboard] CPU: {cpu}% | MEM: {memory}%")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print('[*] Dashboard monitoring...')
channel.start_consuming()

Scénario: Trading system broadcast prix actions à multiple clients.
#!/usr/bin/env python3
# price_feeder.py
import pika
import json
import random
import time
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.exchange_declare(exchange='stock_prices', exchange_type='fanout')
stocks = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN']
while True:
for stock in stocks:
price = {
'symbol': stock,
'price': round(random.uniform(100, 500), 2),
'volume': random.randint(1000, 100000),
'timestamp': time.time()
}
channel.basic_publish(
exchange='stock_prices',
routing_key='',
body=json.dumps(price).encode()
)
    time.sleep(2) # Update every 2 seconds

Scénario: Tasks routées vers queues HIGH/MEDIUM/LOW. Workers dédiés par priorité.
#!/usr/bin/env python3
# task_producer.py
import pika
import json
import sys
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.exchange_declare(exchange='tasks', exchange_type='direct')
# Declare all queues
for priority in ['high', 'medium', 'low']:
channel.queue_declare(queue=f'{priority}_priority', durable=True)
channel.queue_bind(
exchange='tasks',
queue=f'{priority}_priority',
routing_key=priority
)
tasks = [
('high', 'Fix critical security vulnerability'),
('high', 'Resolve production outage'),
('medium', 'Implement new feature'),
('medium', 'Code review PR #123'),
('low', 'Update documentation'),
('low', 'Refactor tests'),
]
for priority, description in tasks:
task = {
'priority': priority,
'description': description,
'status': 'pending'
}
channel.basic_publish(
exchange='tasks',
routing_key=priority,
body=json.dumps(task).encode(),
properties=pika.BasicProperties(delivery_mode=2)
)
print(f"[✓] [{priority.upper()}] {description}")
connection.close()

#!/usr/bin/env python3
# worker.py <priority>
import pika
import json
import time
import sys
if len(sys.argv) < 2:
print("Usage: python worker.py <high|medium|low>")
sys.exit(1)
priority = sys.argv[1]
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.exchange_declare(exchange='tasks', exchange_type='direct')
queue_name = f'{priority}_priority'
channel.queue_declare(queue=queue_name, durable=True)
channel.queue_bind(exchange='tasks', queue=queue_name, routing_key=priority)
channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
    """Process one task from this worker's priority queue, then acknowledge it.

    Reads the module-level `priority` to label the output and to choose the
    simulated processing time.
    """
    task = json.loads(body.decode())
    label = f"[{priority.upper()} WORKER]"
    print(f"{label} Processing: {task['description']}")

    # Simulated work: higher priority means a shorter delay.
    seconds_by_priority = {'high': 1, 'medium': 3, 'low': 5}
    time.sleep(seconds_by_priority[priority])

    print(f"{label} ✓ Done")
    # Acknowledge only after the work has completed.
    ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue=queue_name, on_message_callback=callback)
print(f'[*] {priority.upper()} priority worker ready')
channel.start_consuming()

Scénario: Documents routés vers processeurs spécialisés (PDF, Word, Image).
#!/usr/bin/env python3
# document_router.py
import pika
import json
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.exchange_declare(exchange='documents', exchange_type='direct')
documents = [
{'filename': 'report.pdf', 'type': 'pdf', 'size_kb': 1024},
{'filename': 'presentation.pptx', 'type': 'office', 'size_kb': 2048},
{'filename': 'photo.jpg', 'type': 'image', 'size_kb': 512},
{'filename': 'contract.pdf', 'type': 'pdf', 'size_kb': 256},
{'filename': 'diagram.png', 'type': 'image', 'size_kb': 128},
]
for doc in documents:
channel.basic_publish(
exchange='documents',
routing_key=doc['type'], # Route by file type
body=json.dumps(doc).encode(),
properties=pika.BasicProperties(delivery_mode=2)
)
print(f"[✓] Routed {doc['filename']} to {doc['type']} processor")
connection.close()

#!/usr/bin/env python3
# pdf_processor.py
import pika
import json
import time
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.exchange_declare(exchange='documents', exchange_type='direct')
channel.queue_declare(queue='pdf_processing', durable=True)
channel.queue_bind(exchange='documents', queue='pdf_processing', routing_key='pdf')
channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
    """Simulate processing of one PDF document, then acknowledge the message.

    The body is a JSON object with 'filename' and 'size_kb' keys.
    """
    document = json.loads(body.decode())
    print(f"[PDF Processor] Processing {document['filename']}...")

    # Simulated extract/OCR/index step: one second per 1000 KB.
    delay_seconds = document['size_kb'] / 1000
    time.sleep(delay_seconds)

    print(f"[PDF Processor] ✓ Indexed {document['filename']}")
    ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='pdf_processing', on_message_callback=callback)
print('[*] PDF processor ready')
channel.start_consuming()

Scénario: Notifications routées vers email, SMS, ou push selon préférence user.
#!/usr/bin/env python3
# notification_sender.py
import pika
import json
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.exchange_declare(exchange='notifications', exchange_type='direct')
users = [
{'user_id': 1, 'preferred_channel': 'email', 'message': 'Welcome!'},
{'user_id': 2, 'preferred_channel': 'sms', 'message': 'Order shipped'},
{'user_id': 3, 'preferred_channel': 'push', 'message': 'New message'},
{'user_id': 4, 'preferred_channel': 'email', 'message': 'Password reset'},
]
for user in users:
notif = {
'user_id': user['user_id'],
'message': user['message'],
'channel': user['preferred_channel']
}
channel.basic_publish(
exchange='notifications',
routing_key=user['preferred_channel'],
body=json.dumps(notif).encode()
)
print(f"[✓] Sent via {user['preferred_channel']}: {user['message']}")
connection.close()

Scénario: Events avec pattern "region.service.severity". Subscribers filtrent avec wildcards.
#!/usr/bin/env python3
# event_producer.py
import pika
import json
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.exchange_declare(exchange='events', exchange_type='topic')
events = [
('us.api.error', 'API timeout in us-east-1'),
('eu.database.warning', 'High query latency'),
('us.payment.error', 'Payment gateway down'),
('asia.cache.info', 'Cache hit rate 95%'),
('eu.api.critical', 'API returning 500 errors'),
('us.database.error', 'Connection pool exhausted'),
]
for routing_key, message in events:
event = {
'routing_key': routing_key,
'message': message,
'region': routing_key.split('.')[0],
'service': routing_key.split('.')[1],
'severity': routing_key.split('.')[2]
}
channel.basic_publish(
exchange='events',
routing_key=routing_key,
body=json.dumps(event).encode()
)
print(f"[✓] [{routing_key}] {message}")
connection.close()

#!/usr/bin/env python3
# subscriber.py <pattern>
# Examples:
# python subscriber.py "us.#" # All US events
# python subscriber.py "*.api.*" # All API events any region
# python subscriber.py "*.*.error" # All errors any region/service
# python subscriber.py "eu.database.#" # EU database (any severity)
import pika
import json
import sys
if len(sys.argv) < 2:
print("Usage: python subscriber.py <pattern>")
sys.exit(1)
pattern = sys.argv[1]
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.exchange_declare(exchange='events', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='events', queue=queue_name, routing_key=pattern)
def callback(ch, method, properties, body):
    """Print one event, coloured by its 'severity' field.

    The body is a JSON object with 'severity', 'routing_key' and 'message' keys.
    """
    event = json.loads(body.decode())
    ansi_by_severity = {
        'info': '\033[94m',
        'warning': '\033[93m',
        'error': '\033[91m',
        'critical': '\033[95m',
    }
    # Unknown severities are printed without a colour prefix.
    prefix = ansi_by_severity.get(event['severity'], '')
    ansi_reset = '\033[0m'
    print(f"{prefix}[{event['routing_key']}] {event['message']}{ansi_reset}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(f'[*] Listening for pattern: {pattern}')
channel.start_consuming()

Scénario: Track user actions. Analytics subscribe "user.*.purchase", fraud à "user.*.suspicious".
#!/usr/bin/env python3
# user_tracker.py
import pika
import json
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.exchange_declare(exchange='user_events', exchange_type='topic')
user_actions = [
(1, 'login', 'User logged in from Chrome'),
(1, 'view', 'Viewed product: Laptop'),
(1, 'cart', 'Added laptop to cart'),
(1, 'purchase', 'Purchased laptop for $999'),
(2, 'login', 'User logged in from Firefox'),
(2, 'suspicious', 'Multiple failed payment attempts'),
(3, 'signup', 'New user registered'),
(3, 'purchase', 'Purchased phone for $599'),
]
for user_id, action, description in user_actions:
routing_key = f'user.{user_id}.{action}'
event = {
'user_id': user_id,
'action': action,
'description': description,
'routing_key': routing_key
}
channel.basic_publish(
exchange='user_events',
routing_key=routing_key,
body=json.dumps(event).encode()
)
print(f"[✓] [{routing_key}] {description}")
connection.close()

#!/usr/bin/env python3
# analytics.py (track purchases only)
import pika
import json
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.exchange_declare(exchange='user_events', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Subscribe to ALL purchase events
channel.queue_bind(exchange='user_events', queue=queue_name, routing_key='user.*.purchase')
def callback(ch, method, properties, body):
    """Print the user id and description of one purchase event."""
    purchase = json.loads(body.decode())
    # This is the point where the event could be written to an analytics DB.
    print(f"[Analytics] Purchase by User {purchase['user_id']}: {purchase['description']}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print('[*] Analytics tracking purchases...')
channel.start_consuming()

#!/usr/bin/env python3
# fraud_detection.py
import pika
import json
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.exchange_declare(exchange='user_events', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Subscribe to suspicious activity
channel.queue_bind(exchange='user_events', queue=queue_name, routing_key='user.*.suspicious')
def callback(ch, method, properties, body):
    """Print a fraud alert with the user id and description of one event."""
    suspicious = json.loads(body.decode())
    # This is the point where the account could be locked or an alert sent.
    print(f"🚨 [FRAUD ALERT] User {suspicious['user_id']}: {suspicious['description']}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print('[*] Fraud detection active...')
channel.start_consuming()

Scénario: Client envoie opérations math, serveur calcule et retourne résultat.
#!/usr/bin/env python3
# math_client.py
import pika
import uuid
import json
class MathRpcClient:
    """Blocking RPC client that sends arithmetic requests to the 'rpc_math' queue.

    Every request carries a fresh correlation id and the name of a private
    reply queue. The reply whose correlation id matches is JSON-decoded and
    returned by `call`.
    """

    def __init__(self):
        """Open the connection and start consuming from a private reply queue."""
        self.connection = pika.BlockingConnection(...)
        self.channel = self.connection.channel()

        # Reply state is initialised before the consumer is registered so
        # on_response never runs against missing attributes.
        self.response = None
        self.corr_id = None

        # Server-named exclusive queue that receives the replies.
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )

    def on_response(self, ch, method, props, body):
        """Store the reply body if its correlation id matches the pending request."""
        if self.corr_id == props.correlation_id:
            self.response = body.decode()

    def call(self, operation, a, b):
        """Send one request and block until the matching reply arrives.

        Args:
            operation: Name of the operation, sent as the 'operation' field.
            a: First operand.
            b: Second operand.

        Returns:
            The JSON-decoded reply.
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())
        request = {
            'operation': operation,
            'a': a,
            'b': b
        }
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_math',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=json.dumps(request).encode()
        )
        print(f"[→] Calling {operation}({a}, {b})...")
        while self.response is None:
            # time_limit=None blocks until I/O produces events. The default
            # (0) returns immediately and turns this loop into a busy-wait.
            self.connection.process_data_events(time_limit=None)
        return json.loads(self.response)

    def close(self):
        """Close the underlying connection."""
        self.connection.close()
# Usage
if __name__ == '__main__':
client = MathRpcClient()
# Test operations
operations = [
('add', 5, 3),
('subtract', 10, 4),
('multiply', 6, 7),
('divide', 20, 4),
('power', 2, 8),
('divide', 10, 0), # Error case
]
for op, a, b in operations:
result = client.call(op, a, b)
if 'error' in result:
print(f"[✗] {op}({a}, {b}) = ERROR: {result['error']}")
else:
print(f"[✓] {op}({a}, {b}) = {result['result']}")
    client.close()

#!/usr/bin/env python3
# math_server.py
import pika
import json
def calculate(operation, a, b):
    """Apply a named arithmetic operation to two operands.

    Args:
        operation: One of 'add', 'subtract', 'multiply', 'divide', 'power',
            'modulo'.
        a: Left operand.
        b: Right operand.

    Returns:
        {'result': value} on success, otherwise {'error': message}. The
        returned dict is always JSON-serialisable for numeric operands.
    """
    ops = {
        'add': lambda x, y: x + y,
        'subtract': lambda x, y: x - y,
        'multiply': lambda x, y: x * y,
        'divide': lambda x, y: x / y if y != 0 else None,
        'power': lambda x, y: x ** y,
        'modulo': lambda x, y: x % y if y != 0 else None,
    }
    # The isinstance check keeps an unhashable operation (e.g. a JSON list)
    # from raising TypeError in the dict membership test.
    if not isinstance(operation, str) or operation not in ops:
        return {'error': f'Unknown operation: {operation}'}

    try:
        result = ops[operation](a, b)
    except Exception as e:
        # Deliberately broad: any failure is turned into an error reply
        # instead of propagating to the caller.
        return {'error': str(e)}

    # The divide/modulo lambdas signal a zero divisor with None.
    if result is None:
        return {'error': 'Division by zero'}
    # A negative base with a fractional exponent yields a complex number,
    # which json.dumps cannot serialise.
    if isinstance(result, complex):
        return {'error': 'Result is not a real number'}
    return {'result': result}
def on_request(ch, method, props, body):
    """Handle one RPC request: compute, reply to the caller, acknowledge.

    The body is expected to be a JSON object with 'operation', 'a' and 'b'
    keys. The reply is published to `props.reply_to` with the request's
    correlation id.
    """
    try:
        request = json.loads(body.decode())
        operation, a, b = request['operation'], request['a'], request['b']
    except (ValueError, KeyError, TypeError) as exc:
        # A malformed request must not crash the consumer: the message would
        # stay unacknowledged and be delivered again. Reply with an error.
        print(f"[!] Malformed request: {exc!r}")
        response = {'error': f'Invalid request: {exc}'}
    else:
        print(f"[.] Computing {operation}({a}, {b})")
        response = calculate(operation, a, b)

    # Without a reply queue there is nowhere to send the result.
    if props.reply_to:
        ch.basic_publish(
            exchange='',
            routing_key=props.reply_to,
            properties=pika.BasicProperties(
                correlation_id=props.correlation_id
            ),
            body=json.dumps(response).encode()
        )
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(f"[x] Sent result: {response}")
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.queue_declare(queue='rpc_math')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_math', on_message_callback=on_request)
print('[*] Math RPC server ready. Waiting for requests...')
channel.start_consuming()

Scénario EXAMEN: cat input.txt | python translator.py | less
⚠️ PATTERN LE PLUS PROBABLE À L'EXAMEN
[translations]
hello = bonjour
world = monde
cat = chat
dog = chien
house = maison
computer = ordinateur
book = livre
yes = oui
no = non
thank = merci
please = s'il vous plaît
#!/usr/bin/env python3
import sys
import re
import pika
import uuid
class TranslatorRPC:
    """Blocking RPC client that sends single words to the 'translate_rpc' queue.

    Every request carries a fresh correlation id and the name of a private
    reply queue. The reply whose correlation id matches is returned as text.
    """

    def __init__(self):
        """Open the connection and start consuming from a private reply queue."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host='concurp1.isc.heia-fr.ch',
                port=5072,
                credentials=pika.PlainCredentials('guest', 'guest')
            )
        )
        self.channel = self.connection.channel()

        # Reply state is initialised before the consumer is registered so
        # on_response never runs against missing attributes.
        self.response = None
        self.corr_id = None

        # Server-named exclusive queue that receives the replies.
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )

    def on_response(self, ch, method, props, body):
        """Store the reply body if its correlation id matches the pending request."""
        if self.corr_id == props.correlation_id:
            self.response = body.decode()

    def translate(self, word):
        """Send one word and block until the matching reply arrives.

        Args:
            word: The word to translate.

        Returns:
            The decoded reply text.
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='translate_rpc',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=word.encode()
        )
        while self.response is None:
            # time_limit=None blocks until I/O produces events. The default
            # (0) returns immediately and turns this loop into a busy-wait.
            self.connection.process_data_events(time_limit=None)
        return self.response

    def close(self):
        """Close the underlying connection."""
        self.connection.close()
def translate_line(line, translate_word):
    """Translate every alphabetic word of *line*, leaving everything else untouched.

    Words are replaced in place, so the original spacing and punctuation are
    preserved.

    Args:
        line: Text to translate (without its trailing newline).
        translate_word: Callable taking a lower-cased word and returning its
            translation.

    Returns:
        The line with each alphabetic word replaced by its translation, with
        the original capitalisation style re-applied.
    """
    def replace(match):
        token = match.group(0)
        if not token.isalpha():
            # Numbers and mixed tokens such as "cat2" are kept as-is.
            return token
        translated = translate_word(token.lower())
        # A single capital letter ("I", "A") is a capitalised word, not an
        # all-caps one, so it falls through to the capitalize() branch.
        if len(token) > 1 and token.isupper():
            return translated.upper()
        if token[0].isupper():
            return translated.capitalize()
        return translated

    return re.sub(r'\b\w+\b', replace, line)


def main():
    """Translate stdin line by line through the RPC server and write to stdout."""
    translator = TranslatorRPC()
    try:
        for line in sys.stdin:
            # Strip only the newline; print() adds it back.
            print(translate_line(line.rstrip('\n'), translator.translate))
    finally:
        translator.close()
if __name__ == '__main__':
    main()

#!/usr/bin/env python3
import pika
import configparser
import sys
# Load dictionary from config file
config = configparser.ConfigParser()
config.read('dictionary.ini')
DICTIONARY = dict(config.items('translations'))
def translate(word):
    """Return the DICTIONARY entry for the lower-cased word, or the word itself if absent."""
    key = word.lower()
    if key in DICTIONARY:
        return DICTIONARY[key]
    return word
def on_request(ch, method, props, body):
    """Translate the requested word, reply to the caller, then acknowledge."""
    requested_word = body.decode()
    reply_text = translate(requested_word)

    # Echo the correlation id so the client can match the reply to its request.
    reply_properties = pika.BasicProperties(
        correlation_id=props.correlation_id
    )
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=reply_properties,
        body=reply_text.encode(),
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='concurp1.isc.heia-fr.ch',
port=5072,
credentials=pika.PlainCredentials('guest', 'guest')
)
)
channel = connection.channel()
channel.queue_declare(queue='translate_rpc')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='translate_rpc', on_message_callback=on_request)
print('[*] Translation server ready')
channel.start_consuming()

# Terminal 1: Start server
python translation_server.py

# Terminal 2: Test translator
echo "Hello world" | python translator.py
# Output: Bonjour monde

# Or with file
cat input.txt | python translator.py > output.txt

# Example input.txt:
# Hello cat and dog.
# Thank you for the book!

# Expected output:
# Bonjour chat and chien.
# Merci you for the livre!
Scénario: Client envoie data pour validation (email, phone, etc). Serveur valide et retourne résultat.
#!/usr/bin/env python3
# validator_server.py
import pika
import json
import re
def validate_email(email):
    """Return True when *email* has the form ``local@domain.tld``.

    Non-string values are rejected instead of raising.
    """
    if not isinstance(email, str):
        return False
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    # fullmatch rather than match with '$': '$' also matches just before a
    # trailing newline, so "a@b.co\n" used to be accepted.
    return re.fullmatch(pattern, email) is not None


def validate_phone(phone):
    """Return True when *phone* is an optional '+' followed by 10 to 15 digits.

    Spaces and dashes are ignored. Non-string values are rejected instead of
    raising.
    """
    if not isinstance(phone, str):
        return False
    compact = phone.replace(' ', '').replace('-', '')
    # fullmatch for the same trailing-newline reason as validate_email.
    return re.fullmatch(r'\+?[0-9]{10,15}', compact) is not None


def validate(data_type, value):
    """Validate *value* against the validator named by *data_type*.

    Args:
        data_type: One of 'email', 'phone', 'url', 'positive_int'.
        value: The string to validate.

    Returns:
        {'valid': bool}, plus an 'error' message when the type is unknown or
        the value is not a string.
    """
    validators = {
        'email': validate_email,
        'phone': validate_phone,
        'url': lambda v: v.startswith(('http://', 'https://')),
        # isdecimal rather than isdigit: isdigit accepts characters such as
        # '²' that int() cannot parse, which raised ValueError.
        'positive_int': lambda v: v.isdecimal() and int(v) > 0,
    }
    # The isinstance check keeps an unhashable data_type (e.g. a JSON list)
    # from raising TypeError in the dict membership test.
    if not isinstance(data_type, str) or data_type not in validators:
        return {'valid': False, 'error': f'Unknown type: {data_type}'}
    # Values come from JSON and may be numbers, lists or null; the validators
    # rely on string methods.
    if not isinstance(value, str):
        return {'valid': False, 'error': 'Value must be a string'}
    is_valid = validators[data_type](value)
    return {'valid': is_valid}
def on_request(ch, method, props, body):
    """Validate the requested value, reply to the caller, then acknowledge.

    The body is a JSON object with 'type' and 'value' keys.
    """
    request = json.loads(body.decode())
    verdict = validate(request['type'], request['value'])

    # Echo the correlation id so the client can match the reply to its request.
    reply_properties = pika.BasicProperties(correlation_id=props.correlation_id)
    reply_body = json.dumps(verdict).encode()
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=reply_properties,
        body=reply_body,
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.queue_declare(queue='validator_rpc')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='validator_rpc', on_message_callback=on_request)
print('[*] Validator RPC server ready')
channel.start_consuming()

Scénario: Upload images → resize, compress, thumbnail. Multiple workers en parallèle.
#!/usr/bin/env python3
# image_uploader.py
import pika
import json
import os
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.queue_declare(queue='image_processing', durable=True)
# Simulate uploaded images
images = [
{'filename': 'photo1.jpg', 'path': '/uploads/photo1.jpg', 'size_mb': 5},
{'filename': 'photo2.jpg', 'path': '/uploads/photo2.jpg', 'size_mb': 3},
{'filename': 'photo3.jpg', 'path': '/uploads/photo3.jpg', 'size_mb': 8},
{'filename': 'photo4.jpg', 'path': '/uploads/photo4.jpg', 'size_mb': 2},
{'filename': 'photo5.jpg', 'path': '/uploads/photo5.jpg', 'size_mb': 6},
]
for img in images:
channel.basic_publish(
exchange='',
routing_key='image_processing',
body=json.dumps(img).encode(),
properties=pika.BasicProperties(
delivery_mode=2, # Persistent
)
)
print(f"[✓] Queued: {img['filename']} ({img['size_mb']} MB)")
print(f"\n[✓] {len(images)} images queued for processing")
connection.close()

#!/usr/bin/env python3
# image_worker.py
import pika
import json
import time
import sys
worker_id = sys.argv[1] if len(sys.argv) > 1 else '1'
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.queue_declare(queue='image_processing', durable=True)
# Fair dispatch: one task at a time
channel.basic_qos(prefetch_count=1)
def process_image(ch, method, properties, body):
    """Simulate processing of one queued image, then acknowledge the message.

    Reads the module-level `worker_id` to label the output. The body is a
    JSON object with 'filename' and 'size_mb' keys.
    """
    image = json.loads(body.decode())
    print(f"[Worker {worker_id}] Processing {image['filename']}...")

    # Simulated processing: one second per megabyte.
    time.sleep(image['size_mb'])

    print(f"[Worker {worker_id}] ✓ Done: {image['filename']}")
    ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue='image_processing',
on_message_callback=process_image,
auto_ack=False
)
print(f'[*] Worker {worker_id} ready. Waiting for images...')
channel.start_consuming()

# Terminal 1
python image_worker.py Worker1

# Terminal 2
python image_worker.py Worker2

# Terminal 3
python image_worker.py Worker3

# Terminal 4 - Send images
python image_uploader.py

# Observer: images distribuées équitablement entre les workers (fair dispatch grâce à prefetch_count=1, pas un round-robin strict)
Scénario EXAMEN: Print jobs broadcast à multiple imprimantes.
⚠️ FANOUT + WORK QUEUE pattern commun à l'examen
#!/usr/bin/env python3
# print_job_sender.py
import pika
import json
connection = pika.BlockingConnection(...)
channel = connection.channel()
# Fanout exchange - broadcast to all printers
channel.exchange_declare(exchange='print_jobs', exchange_type='fanout')
jobs = [
{'document': 'report_Q1.pdf', 'pages': 25, 'color': True, 'copies': 1},
{'document': 'invoice_1234.pdf', 'pages': 2, 'color': False, 'copies': 3},
{'document': 'presentation.pptx', 'pages': 50, 'color': True, 'copies': 1},
{'document': 'memo.docx', 'pages': 1, 'color': False, 'copies': 5},
]
for job in jobs:
channel.basic_publish(
exchange='print_jobs',
routing_key='',
body=json.dumps(job).encode()
)
color_str = 'Color' if job['color'] else 'B&W'
print(f"[✓] Print job: {job['document']} ({job['pages']} pages, {color_str}, {job['copies']} copies)")
connection.close()

#!/usr/bin/env python3
# printer.py <printer_name>
import pika
import json
import time
import sys
if len(sys.argv) < 2:
print("Usage: python printer.py <printer_name>")
sys.exit(1)
printer_name = sys.argv[1]
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.exchange_declare(exchange='print_jobs', exchange_type='fanout')
# Each printer gets its own exclusive queue
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='print_jobs', queue=queue_name)
def print_job(ch, method, properties, body):
    """Simulate printing every copy of one broadcast print job.

    Reads the module-level `printer_name` to label the output. The body is a
    JSON object with 'document', 'copies' and 'pages' keys.
    """
    job = json.loads(body.decode())
    print(f"[{printer_name}] Starting: {job['document']}")

    copies = job['copies']
    for copy_number in range(1, copies + 1):
        # Simulated printing time: 0.5 s per page.
        time.sleep(job['pages'] * 0.5)
        # Per-copy progress is only reported for multi-copy jobs.
        if copies > 1:
            print(f"[{printer_name}] Copy {copy_number}/{copies} done")

    print(f"[{printer_name}] ✓ Completed: {job['document']}")
channel.basic_consume(queue=queue_name, on_message_callback=print_job, auto_ack=True)
print(f'[*] {printer_name} ready for print jobs')
channel.start_consuming()

# Terminal 1
python printer.py "HP-LaserJet-101"

# Terminal 2
python printer.py "Canon-Pixma-202"

# Terminal 3
python printer.py "Epson-WorkForce-303"

# Terminal 4 - Send jobs
python print_job_sender.py

# Résultat: TOUTES les imprimantes reçoivent TOUS les jobs (fanout)
Scénario: Emails en queue, workers les envoient via SMTP.
#!/usr/bin/env python3
# email_sender.py
import pika
import json
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.queue_declare(queue='email_queue', durable=True)
emails = [
{'to': 'user1@example.com', 'subject': 'Welcome!', 'body': 'Thanks for signing up'},
{'to': 'user2@example.com', 'subject': 'Order Confirmation', 'body': 'Your order #1234'},
{'to': 'user3@example.com', 'subject': 'Password Reset', 'body': 'Reset link: ...'},
{'to': 'user4@example.com', 'subject': 'Monthly Newsletter', 'body': 'Check out...'},
]
for email in emails:
channel.basic_publish(
exchange='',
routing_key='email_queue',
body=json.dumps(email).encode(),
properties=pika.BasicProperties(delivery_mode=2)
)
print(f"[✓] Queued email to {email['to']}")
connection.close()

#!/usr/bin/env python3
# email_worker.py
import pika
import json
import time
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.queue_declare(queue='email_queue', durable=True)
channel.basic_qos(prefetch_count=1)
def send_email(ch, method, properties, body):
    """Simulate sending one queued email, then acknowledge the message.

    The body is a JSON object with 'to' and 'subject' keys.
    """
    message = json.loads(body.decode())
    print(f"[📧] Sending to {message['to']}: {message['subject']}")

    # Stand-in for the SMTP round trip.
    time.sleep(2)

    print(f"[✓] Sent to {message['to']}")
    ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='email_queue', on_message_callback=send_email, auto_ack=False)
print('[*] Email worker ready')
channel.start_consuming()

1. Code à Compléter (70% chance)
Fichier avec TODO comments. Ajouter exchange_type, routing_key, callbacks.
2. Debugger Code Cassé (60% chance)
Messages perdus (manque ack), wrong exchange type, oubli queue_bind.
3. Adapter Pattern (50% chance)
Transformer fanout → direct, ajouter persistence, ConfigParser.
4. Unix Pipe (Lab21 style) (40% chance)
cat input | python script.py | less avec RPC translation.
5. Printer System (Lab20 style) (30% chance)
Fanout broadcast print jobs à multiple printers.
- host='concurp1.isc.heia-fr.ch', port=5072
- auto_ack=False + manual basic_ack()
- durable=True + delivery_mode=2
- basic_qos(prefetch_count=1)
- connection.close() à la fin
- python producer.py && python consumer.py

- Events → filter → aggregate → store : Topic Exchange
- Videos → 720p, 1080p, 4K workers : Work Queue
- Orders → validate → payment → ship : Direct Exchange
- Docs → parse → index → search : Work Queue
- Forms → validate → store → confirm : RPC
- Scores → aggregate → rankings : Topic Exchange
- Files → compress → encrypt → store : Work Queue
- Data → compute → format → send : RPC
- Threshold breached → notify all : Fanout