Back/Tous les Exercices Possibles

Banque d'Exercices RabbitMQ - Préparation Examen

🎯 Cette page contient 25+ exercices couvrant TOUS les patterns et problèmes possibles. Code complet fourni pour chaque exercice.

📡 FANOUT EXCHANGE (Pub/Sub)

Ex 1.1: Système de Logs Multi-Handler

Scénario: Application génère logs. Broadcast vers console, fichier, email (si ERROR).

Producer (emit_log.py)

#!/usr/bin/env python3
import pika
import json
from datetime import datetime
import sys

def main():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host='concurp1.isc.heia-fr.ch',
            port=5072,
            credentials=pika.PlainCredentials('guest', 'guest')
        )
    )
    channel = connection.channel()

    # Fanout exchange
    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    # Simulate log messages
    levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
    for i, level in enumerate(levels):
        log = {
            'level': level,
            'message': f'Application event {i}',
            'timestamp': datetime.now().isoformat(),
            'module': 'main'
        }

        channel.basic_publish(
            exchange='logs',
            routing_key='',  # Ignored by fanout
            body=json.dumps(log).encode()
        )
        print(f"[✓] Sent {level}: {log['message']}")

    connection.close()

if __name__ == '__main__':
    main()

Consumer - Console Handler (log_console.py)

#!/usr/bin/env python3
import pika
import json

def main():
    connection = pika.BlockingConnection(...)
    channel = connection.channel()

    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    # Exclusive queue (auto-deleted)
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    channel.queue_bind(exchange='logs', queue=queue_name)

    def callback(ch, method, properties, body):
        log = json.loads(body.decode())
        color_map = {
            'DEBUG': '\033[90m',    # Gray
            'INFO': '\033[94m',     # Blue
            'WARNING': '\033[93m',  # Yellow
            'ERROR': '\033[91m',    # Red
            'CRITICAL': '\033[95m'  # Magenta
        }
        reset = '\033[0m'

        color = color_map.get(log['level'], '')
        print(f"{color}[{log['timestamp']}] {log['level']:8} {log['message']}{reset}")

    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    print('[*] Console handler waiting. Press CTRL+C to exit')
    channel.start_consuming()

if __name__ == '__main__':
    main()

Consumer - File Handler (log_file.py)

#!/usr/bin/env python3
import pika
import json

def main():
    connection = pika.BlockingConnection(...)
    channel = connection.channel()

    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    channel.queue_bind(exchange='logs', queue=queue_name)

    def callback(ch, method, properties, body):
        log = json.loads(body.decode())
        with open('application.log', 'a') as f:
            f.write(f"[{log['timestamp']}] {log['level']}: {log['message']}\n")
        print(f"[FILE] Logged: {log['level']}")

    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    print('[*] File handler writing to application.log')
    channel.start_consuming()

if __name__ == '__main__':
    main()

Ex 1.2: Système de Monitoring Serveur

Scénario: Serveur envoie metrics (CPU, RAM, disk). Multiple dashboards reçoivent en temps réel.

#!/usr/bin/env python3
# server_monitor.py (Producer)
import pika
import json
import psutil
import time

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.exchange_declare(exchange='server_metrics', exchange_type='fanout')

while True:
    metrics = {
        'cpu_percent': psutil.cpu_percent(interval=1),
        'memory_percent': psutil.virtual_memory().percent,
        'disk_percent': psutil.disk_usage('/').percent,
        'timestamp': time.time()
    }

    channel.basic_publish(
        exchange='server_metrics',
        routing_key='',
        body=json.dumps(metrics).encode()
    )
    print(f"[✓] CPU: {metrics['cpu_percent']}% | RAM: {metrics['memory_percent']}%")
    time.sleep(5)  # Every 5 seconds
#!/usr/bin/env python3
# dashboard.py (Consumer)
import pika
import json

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.exchange_declare(exchange='server_metrics', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='server_metrics', queue=queue_name)

def callback(ch, method, properties, body):
    metrics = json.loads(body.decode())

    # Alert if high usage
    if metrics['cpu_percent'] > 80:
        print(f"⚠️  HIGH CPU: {metrics['cpu_percent']}%")
    if metrics['memory_percent'] > 90:
        print(f"⚠️  HIGH MEMORY: {metrics['memory_percent']}%")

    print(f"[Dashboard] CPU: {metrics['cpu_percent']}% | MEM: {metrics['memory_percent']}%")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print('[*] Dashboard monitoring...')
channel.start_consuming()

Ex 1.3: Broadcast de Prix en Temps Réel

Scénario: Trading system broadcast prix actions à multiple clients.

#!/usr/bin/env python3
# price_feeder.py
import pika
import json
import random
import time

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.exchange_declare(exchange='stock_prices', exchange_type='fanout')

stocks = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN']

while True:
    for stock in stocks:
        price = {
            'symbol': stock,
            'price': round(random.uniform(100, 500), 2),
            'volume': random.randint(1000, 100000),
            'timestamp': time.time()
        }

        channel.basic_publish(
            exchange='stock_prices',
            routing_key='',
            body=json.dumps(price).encode()
        )

    time.sleep(2)  # Update every 2 seconds

🎯 DIRECT EXCHANGE (Routing)

Ex 2.1: Queue de Tâches avec Priorités

Scénario: Tasks routées vers queues HIGH/MEDIUM/LOW. Workers dédiés par priorité.

#!/usr/bin/env python3
# task_producer.py
import pika
import json
import sys

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.exchange_declare(exchange='tasks', exchange_type='direct')

# Declare all queues
for priority in ['high', 'medium', 'low']:
    channel.queue_declare(queue=f'{priority}_priority', durable=True)
    channel.queue_bind(
        exchange='tasks',
        queue=f'{priority}_priority',
        routing_key=priority
    )

tasks = [
    ('high', 'Fix critical security vulnerability'),
    ('high', 'Resolve production outage'),
    ('medium', 'Implement new feature'),
    ('medium', 'Code review PR #123'),
    ('low', 'Update documentation'),
    ('low', 'Refactor tests'),
]

for priority, description in tasks:
    task = {
        'priority': priority,
        'description': description,
        'status': 'pending'
    }

    channel.basic_publish(
        exchange='tasks',
        routing_key=priority,
        body=json.dumps(task).encode(),
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print(f"[✓] [{priority.upper()}] {description}")

connection.close()
#!/usr/bin/env python3
# worker.py <priority>
import pika
import json
import time
import sys

if len(sys.argv) < 2:
    print("Usage: python worker.py <high|medium|low>")
    sys.exit(1)

priority = sys.argv[1]

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.exchange_declare(exchange='tasks', exchange_type='direct')

queue_name = f'{priority}_priority'
channel.queue_declare(queue=queue_name, durable=True)
channel.queue_bind(exchange='tasks', queue=queue_name, routing_key=priority)

channel.basic_qos(prefetch_count=1)

def callback(ch, method, properties, body):
    task = json.loads(body.decode())
    print(f"[{priority.upper()} WORKER] Processing: {task['description']}")

    # Simulate work (high priority = less time)
    work_time = {'high': 1, 'medium': 3, 'low': 5}
    time.sleep(work_time[priority])

    print(f"[{priority.upper()} WORKER] ✓ Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue=queue_name, on_message_callback=callback)

print(f'[*] {priority.upper()} priority worker ready')
channel.start_consuming()

Ex 2.2: Routage par Type de Document

Scénario: Documents routés vers processeurs spécialisés (PDF, Word, Image).

#!/usr/bin/env python3
# document_router.py
import pika
import json

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.exchange_declare(exchange='documents', exchange_type='direct')

documents = [
    {'filename': 'report.pdf', 'type': 'pdf', 'size_kb': 1024},
    {'filename': 'presentation.pptx', 'type': 'office', 'size_kb': 2048},
    {'filename': 'photo.jpg', 'type': 'image', 'size_kb': 512},
    {'filename': 'contract.pdf', 'type': 'pdf', 'size_kb': 256},
    {'filename': 'diagram.png', 'type': 'image', 'size_kb': 128},
]

for doc in documents:
    channel.basic_publish(
        exchange='documents',
        routing_key=doc['type'],  # Route by file type
        body=json.dumps(doc).encode(),
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print(f"[✓] Routed {doc['filename']} to {doc['type']} processor")

connection.close()
#!/usr/bin/env python3
# pdf_processor.py
import pika
import json
import time

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.exchange_declare(exchange='documents', exchange_type='direct')

channel.queue_declare(queue='pdf_processing', durable=True)
channel.queue_bind(exchange='documents', queue='pdf_processing', routing_key='pdf')

channel.basic_qos(prefetch_count=1)

def callback(ch, method, properties, body):
    doc = json.loads(body.decode())
    print(f"[PDF Processor] Processing {doc['filename']}...")

    # Simulate: extract text, OCR, index
    time.sleep(doc['size_kb'] / 1000)  # Slower for larger files

    print(f"[PDF Processor] ✓ Indexed {doc['filename']}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='pdf_processing', on_message_callback=callback)
print('[*] PDF processor ready')
channel.start_consuming()

Ex 2.3: Notification par Canal

Scénario: Notifications routées vers email, SMS, ou push selon préférence user.

#!/usr/bin/env python3
# notification_sender.py
import pika
import json

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.exchange_declare(exchange='notifications', exchange_type='direct')

users = [
    {'user_id': 1, 'preferred_channel': 'email', 'message': 'Welcome!'},
    {'user_id': 2, 'preferred_channel': 'sms', 'message': 'Order shipped'},
    {'user_id': 3, 'preferred_channel': 'push', 'message': 'New message'},
    {'user_id': 4, 'preferred_channel': 'email', 'message': 'Password reset'},
]

for user in users:
    notif = {
        'user_id': user['user_id'],
        'message': user['message'],
        'channel': user['preferred_channel']
    }

    channel.basic_publish(
        exchange='notifications',
        routing_key=user['preferred_channel'],
        body=json.dumps(notif).encode()
    )
    print(f"[✓] Sent via {user['preferred_channel']}: {user['message']}")

connection.close()

🌐 TOPIC EXCHANGE (Pattern Matching)

Ex 3.1: Events Multi-Dimensionnels (Region.Service.Level)

Scénario: Events avec pattern "region.service.severity". Subscribers filtrent avec wildcards.

#!/usr/bin/env python3
# event_producer.py
import pika
import json

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.exchange_declare(exchange='events', exchange_type='topic')

events = [
    ('us.api.error', 'API timeout in us-east-1'),
    ('eu.database.warning', 'High query latency'),
    ('us.payment.error', 'Payment gateway down'),
    ('asia.cache.info', 'Cache hit rate 95%'),
    ('eu.api.critical', 'API returning 500 errors'),
    ('us.database.error', 'Connection pool exhausted'),
]

for routing_key, message in events:
    event = {
        'routing_key': routing_key,
        'message': message,
        'region': routing_key.split('.')[0],
        'service': routing_key.split('.')[1],
        'severity': routing_key.split('.')[2]
    }

    channel.basic_publish(
        exchange='events',
        routing_key=routing_key,
        body=json.dumps(event).encode()
    )
    print(f"[✓] [{routing_key}] {message}")

connection.close()
#!/usr/bin/env python3
# subscriber.py <pattern>
# Examples:
# python subscriber.py "us.#"           # All US events
# python subscriber.py "*.api.*"        # All API events any region
# python subscriber.py "*.*.error"      # All errors any region/service
# python subscriber.py "eu.database.#"  # EU database (any severity)

import pika
import json
import sys

if len(sys.argv) < 2:
    print("Usage: python subscriber.py <pattern>")
    sys.exit(1)

pattern = sys.argv[1]

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.exchange_declare(exchange='events', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='events', queue=queue_name, routing_key=pattern)

def callback(ch, method, properties, body):
    event = json.loads(body.decode())

    severity_colors = {
        'info': '\033[94m',
        'warning': '\033[93m',
        'error': '\033[91m',
        'critical': '\033[95m'
    }

    color = severity_colors.get(event['severity'], '')
    reset = '\033[0m'

    print(f"{color}[{event['routing_key']}] {event['message']}{reset}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print(f'[*] Listening for pattern: {pattern}')
channel.start_consuming()

Ex 3.2: E-Commerce Events (user.{userId}.{action})

Scénario: Track user actions. Analytics subscribe "user.*.purchase", fraud à "user.*.suspicious".

#!/usr/bin/env python3
# user_tracker.py
import pika
import json

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.exchange_declare(exchange='user_events', exchange_type='topic')

user_actions = [
    (1, 'login', 'User logged in from Chrome'),
    (1, 'view', 'Viewed product: Laptop'),
    (1, 'cart', 'Added laptop to cart'),
    (1, 'purchase', 'Purchased laptop for $999'),
    (2, 'login', 'User logged in from Firefox'),
    (2, 'suspicious', 'Multiple failed payment attempts'),
    (3, 'signup', 'New user registered'),
    (3, 'purchase', 'Purchased phone for $599'),
]

for user_id, action, description in user_actions:
    routing_key = f'user.{user_id}.{action}'

    event = {
        'user_id': user_id,
        'action': action,
        'description': description,
        'routing_key': routing_key
    }

    channel.basic_publish(
        exchange='user_events',
        routing_key=routing_key,
        body=json.dumps(event).encode()
    )
    print(f"[✓] [{routing_key}] {description}")

connection.close()
#!/usr/bin/env python3
# analytics.py (track purchases only)
import pika
import json

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.exchange_declare(exchange='user_events', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Subscribe to ALL purchase events
channel.queue_bind(exchange='user_events', queue=queue_name, routing_key='user.*.purchase')

def callback(ch, method, properties, body):
    event = json.loads(body.decode())
    print(f"[Analytics] Purchase by User {event['user_id']}: {event['description']}")
    # Could write to analytics DB here

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print('[*] Analytics tracking purchases...')
channel.start_consuming()
#!/usr/bin/env python3
# fraud_detection.py
import pika
import json

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.exchange_declare(exchange='user_events', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Subscribe to suspicious activity
channel.queue_bind(exchange='user_events', queue=queue_name, routing_key='user.*.suspicious')

def callback(ch, method, properties, body):
    event = json.loads(body.decode())
    print(f"🚨 [FRAUD ALERT] User {event['user_id']}: {event['description']}")
    # Could lock account, send alert

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print('[*] Fraud detection active...')
channel.start_consuming()

🔄 RPC PATTERN (Request-Reply)

Ex 4.1: Service de Calcul Mathématique

Scénario: Client envoie opérations math, serveur calcule et retourne résultat.

#!/usr/bin/env python3
# math_client.py
import pika
import uuid
import json

class MathRpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(...)
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )

        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body.decode()

    def call(self, operation, a, b):
        self.response = None
        self.corr_id = str(uuid.uuid4())

        request = {
            'operation': operation,
            'a': a,
            'b': b
        }

        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_math',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=json.dumps(request).encode()
        )

        print(f"[→] Calling {operation}({a}, {b})...")

        while self.response is None:
            self.connection.process_data_events()

        return json.loads(self.response)

    def close(self):
        self.connection.close()

# Usage
if __name__ == '__main__':
    client = MathRpcClient()

    # Test operations
    operations = [
        ('add', 5, 3),
        ('subtract', 10, 4),
        ('multiply', 6, 7),
        ('divide', 20, 4),
        ('power', 2, 8),
        ('divide', 10, 0),  # Error case
    ]

    for op, a, b in operations:
        result = client.call(op, a, b)
        if 'error' in result:
            print(f"[✗] {op}({a}, {b}) = ERROR: {result['error']}")
        else:
            print(f"[✓] {op}({a}, {b}) = {result['result']}")

    client.close()
#!/usr/bin/env python3
# math_server.py
import pika
import json

def calculate(operation, a, b):
    ops = {
        'add': lambda x, y: x + y,
        'subtract': lambda x, y: x - y,
        'multiply': lambda x, y: x * y,
        'divide': lambda x, y: x / y if y != 0 else None,
        'power': lambda x, y: x ** y,
        'modulo': lambda x, y: x % y if y != 0 else None,
    }

    if operation not in ops:
        return {'error': f'Unknown operation: {operation}'}

    try:
        result = ops[operation](a, b)
        if result is None:
            return {'error': 'Division by zero'}
        return {'result': result}
    except Exception as e:
        return {'error': str(e)}

def on_request(ch, method, props, body):
    request = json.loads(body.decode())

    print(f"[.] Computing {request['operation']}({request['a']}, {request['b']})")

    response = calculate(request['operation'], request['a'], request['b'])

    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id
        ),
        body=json.dumps(response).encode()
    )

    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(f"[x] Sent result: {response}")

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.queue_declare(queue='rpc_math')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_math', on_message_callback=on_request)

print('[*] Math RPC server ready. Waiting for requests...')
channel.start_consuming()

Ex 4.2: Traducteur Unix Pipe (Lab21 Style) ⭐

Scénario EXAMEN: cat input.txt | python translator.py | less

⚠️ PATTERN LE PLUS PROBABLE À L'EXAMEN

dictionary.ini

[translations]
hello = bonjour
world = monde
cat = chat
dog = chien
house = maison
computer = ordinateur
book = livre
yes = oui
no = non
thank = merci
please = s'il vous plaît

translator.py (Client - lit stdin/écrit stdout)

#!/usr/bin/env python3
import sys
import re
import pika
import uuid

class TranslatorRPC:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host='concurp1.isc.heia-fr.ch',
                port=5072,
                credentials=pika.PlainCredentials('guest', 'guest')
            )
        )
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )

        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body.decode()

    def translate(self, word):
        self.response = None
        self.corr_id = str(uuid.uuid4())

        self.channel.basic_publish(
            exchange='',
            routing_key='translate_rpc',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=word.encode()
        )

        while self.response is None:
            self.connection.process_data_events()

        return self.response

    def close(self):
        self.connection.close()

def main():
    translator = TranslatorRPC()

    try:
        # Read from stdin line by line
        for line in sys.stdin:
            # Split into words and punctuation
            tokens = re.findall(r'\b\w+\b|[^\w\s]', line)

            translated_tokens = []
            for token in tokens:
                if token.isalpha():
                    # Translate word
                    translated = translator.translate(token.lower())

                    # Preserve original case
                    if token.isupper():
                        translated = translated.upper()
                    elif token[0].isupper():
                        translated = translated.capitalize()

                    translated_tokens.append(translated)
                else:
                    # Keep punctuation/numbers as-is
                    translated_tokens.append(token)

            # Write to stdout
            print(' '.join(translated_tokens))

    finally:
        translator.close()

if __name__ == '__main__':
    main()

translation_server.py (Server avec ConfigParser)

#!/usr/bin/env python3
import pika
import configparser
import sys

# Load dictionary from config file
config = configparser.ConfigParser()
config.read('dictionary.ini')

DICTIONARY = dict(config.items('translations'))

def translate(word):
    """Translate word using dictionary, return original if not found"""
    return DICTIONARY.get(word.lower(), word)

def on_request(ch, method, props, body):
    word = body.decode()
    translation = translate(word)

    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id
        ),
        body=translation.encode()
    )

    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='concurp1.isc.heia-fr.ch',
        port=5072,
        credentials=pika.PlainCredentials('guest', 'guest')
    )
)
channel = connection.channel()

channel.queue_declare(queue='translate_rpc')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='translate_rpc', on_message_callback=on_request)

print('[*] Translation server ready')
channel.start_consuming()

Usage:

# Terminal 1: Start server
python translation_server.py

# Terminal 2: Test translator
echo "Hello world" | python translator.py
# Output: bonjour monde

# Or with file
cat input.txt | python translator.py > output.txt

# Example input.txt:
# Hello cat and dog.
# Thank you for the book!

# Expected output:
# bonjour chat and chien.
# merci you for the livre!

Ex 4.3: Service de Validation de Données

Scénario: Client envoie data pour validation (email, phone, etc). Serveur valide et retourne résultat.

#!/usr/bin/env python3
# validator_server.py
import pika
import json
import re

def validate_email(email):
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return bool(re.match(pattern, email))

def validate_phone(phone):
    pattern = r'^\+?[0-9]{10,15}$'
    return bool(re.match(pattern, phone.replace(' ', '').replace('-', '')))

def validate(data_type, value):
    validators = {
        'email': validate_email,
        'phone': validate_phone,
        'url': lambda v: v.startswith('http://') or v.startswith('https://'),
        'positive_int': lambda v: v.isdigit() and int(v) > 0,
    }

    if data_type not in validators:
        return {'valid': False, 'error': f'Unknown type: {data_type}'}

    is_valid = validators[data_type](value)
    return {'valid': is_valid}

def on_request(ch, method, props, body):
    request = json.loads(body.decode())
    response = validate(request['type'], request['value'])

    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=json.dumps(response).encode()
    )

    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.queue_declare(queue='validator_rpc')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='validator_rpc', on_message_callback=on_request)

print('[*] Validator RPC server ready')
channel.start_consuming()

⚙️ WORK QUEUE (Fair Dispatch)

Ex 5.1: Traitement d'Images (Pipeline)

Scénario: Upload images → resize, compress, thumbnail. Multiple workers en parallèle.

#!/usr/bin/env python3
# image_uploader.py
import pika
import json
import os

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.queue_declare(queue='image_processing', durable=True)

# Simulate uploaded images
images = [
    {'filename': 'photo1.jpg', 'path': '/uploads/photo1.jpg', 'size_mb': 5},
    {'filename': 'photo2.jpg', 'path': '/uploads/photo2.jpg', 'size_mb': 3},
    {'filename': 'photo3.jpg', 'path': '/uploads/photo3.jpg', 'size_mb': 8},
    {'filename': 'photo4.jpg', 'path': '/uploads/photo4.jpg', 'size_mb': 2},
    {'filename': 'photo5.jpg', 'path': '/uploads/photo5.jpg', 'size_mb': 6},
]

for img in images:
    channel.basic_publish(
        exchange='',
        routing_key='image_processing',
        body=json.dumps(img).encode(),
        properties=pika.BasicProperties(
            delivery_mode=2,  # Persistent
        )
    )
    print(f"[✓] Queued: {img['filename']} ({img['size_mb']} MB)")

print(f"\n[✓] {len(images)} images queued for processing")
connection.close()
#!/usr/bin/env python3
# image_worker.py
import pika
import json
import time
import sys

worker_id = sys.argv[1] if len(sys.argv) > 1 else '1'

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.queue_declare(queue='image_processing', durable=True)

# Fair dispatch: one task at a time
channel.basic_qos(prefetch_count=1)

def process_image(ch, method, properties, body):
    img = json.loads(body.decode())

    print(f"[Worker {worker_id}] Processing {img['filename']}...")

    # Simulate processing time based on file size
    process_time = img['size_mb']
    time.sleep(process_time)

    print(f"[Worker {worker_id}] ✓ Done: {img['filename']}")

    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(
    queue='image_processing',
    on_message_callback=process_image,
    auto_ack=False
)

print(f'[*] Worker {worker_id} ready. Waiting for images...')
channel.start_consuming()

Test (lancez plusieurs workers):

# Terminal 1
python image_worker.py Worker1

# Terminal 2
python image_worker.py Worker2

# Terminal 3
python image_worker.py Worker3

# Terminal 4 - Send images
python image_uploader.py

# Observer: images distribuées équitablement (round-robin)

Ex 5.2: Système d'Impression (Lab20 Style) ⭐

Scénario EXAMEN: Print jobs broadcast à multiple imprimantes.

⚠️ FANOUT + WORK QUEUE pattern commun à l'examen

#!/usr/bin/env python3
# print_job_sender.py
import pika
import json

connection = pika.BlockingConnection(...)
channel = connection.channel()

# Fanout exchange - broadcast to all printers
channel.exchange_declare(exchange='print_jobs', exchange_type='fanout')

jobs = [
    {'document': 'report_Q1.pdf', 'pages': 25, 'color': True, 'copies': 1},
    {'document': 'invoice_1234.pdf', 'pages': 2, 'color': False, 'copies': 3},
    {'document': 'presentation.pptx', 'pages': 50, 'color': True, 'copies': 1},
    {'document': 'memo.docx', 'pages': 1, 'color': False, 'copies': 5},
]

for job in jobs:
    channel.basic_publish(
        exchange='print_jobs',
        routing_key='',
        body=json.dumps(job).encode()
    )

    color_str = 'Color' if job['color'] else 'B&W'
    print(f"[✓] Print job: {job['document']} ({job['pages']} pages, {color_str}, {job['copies']} copies)")

connection.close()
#!/usr/bin/env python3
# printer.py <printer_name>
import pika
import json
import time
import sys

if len(sys.argv) < 2:
    print("Usage: python printer.py <printer_name>")
    sys.exit(1)

printer_name = sys.argv[1]

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.exchange_declare(exchange='print_jobs', exchange_type='fanout')

# Each printer gets its own exclusive queue
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='print_jobs', queue=queue_name)

def print_job(ch, method, properties, body):
    job = json.loads(body.decode())

    print(f"[{printer_name}] Starting: {job['document']}")

    for copy in range(job['copies']):
        # Simulate printing time (0.5s per page)
        time.sleep(job['pages'] * 0.5)

        if job['copies'] > 1:
            print(f"[{printer_name}]   Copy {copy + 1}/{job['copies']} done")

    print(f"[{printer_name}] ✓ Completed: {job['document']}")

channel.basic_consume(queue=queue_name, on_message_callback=print_job, auto_ack=True)

print(f'[*] {printer_name} ready for print jobs')
channel.start_consuming()

Test (démarrez plusieurs imprimantes):

# Terminal 1
python printer.py "HP-LaserJet-101"

# Terminal 2
python printer.py "Canon-Pixma-202"

# Terminal 3
python printer.py "Epson-WorkForce-303"

# Terminal 4 - Send jobs
python print_job_sender.py

# Résultat: TOUTES les imprimantes reçoivent TOUS les jobs (fanout)

Ex 5.3: Queue d'Emails

Scénario: Emails en queue, workers les envoient via SMTP.

#!/usr/bin/env python3
# email_sender.py
import pika
import json

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.queue_declare(queue='email_queue', durable=True)

emails = [
    {'to': 'user1@example.com', 'subject': 'Welcome!', 'body': 'Thanks for signing up'},
    {'to': 'user2@example.com', 'subject': 'Order Confirmation', 'body': 'Your order #1234'},
    {'to': 'user3@example.com', 'subject': 'Password Reset', 'body': 'Reset link: ...'},
    {'to': 'user4@example.com', 'subject': 'Monthly Newsletter', 'body': 'Check out...'},
]

for email in emails:
    channel.basic_publish(
        exchange='',
        routing_key='email_queue',
        body=json.dumps(email).encode(),
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print(f"[✓] Queued email to {email['to']}")

connection.close()
#!/usr/bin/env python3
# email_worker.py
import pika
import json
import time

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.queue_declare(queue='email_queue', durable=True)
channel.basic_qos(prefetch_count=1)

def send_email(ch, method, properties, body):
    email = json.loads(body.decode())

    print(f"[📧] Sending to {email['to']}: {email['subject']}")

    # Simulate SMTP send
    time.sleep(2)

    print(f"[✓] Sent to {email['to']}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='email_queue', on_message_callback=send_email, auto_ack=False)

print('[*] Email worker ready')
channel.start_consuming()

🎯 SCÉNARIOS TYPE EXAMEN

Ce Qui Tombera PROBABLEMENT

1. Code à Compléter (70% chance)

Fichier avec TODO comments. Ajouter exchange_type, routing_key, callbacks.

2. Debugger Code Cassé (60% chance)

Messages perdus (manque ack), wrong exchange type, oubli queue_bind.

3. Adapter Pattern (50% chance)

Transformer fanout → direct, ajouter persistence, ConfigParser.

4. Unix Pipe (Lab21 style) (40% chance)

cat input | python script.py | less avec RPC translation.

5. Printer System (Lab20 style) (30% chance)

Fanout broadcast print jobs à multiple printers.

💡 Checklist Examen

  • host='concurp1.isc.heia-fr.ch', port=5072
  • Toujours auto_ack=False + manual basic_ack()
  • Persistence: durable=True + delivery_mode=2
  • Fair dispatch: basic_qos(prefetch_count=1)
  • Fanout = broadcast (routing_key ignoré)
  • Direct = routing exact
  • Topic = wildcards (* = 1 mot, # = 0+ mots)
  • RPC: correlation_id + reply_to + exclusive callback queue
  • Toujours connection.close() à la fin
  • Test: python producer.py && python consumer.py

🎲 Autres Scénarios Possibles

📊 Analytics Pipeline

Events → filter → aggregate → store

Topic Exchange

🎬 Video Encoding

Videos → 720p, 1080p, 4K workers

Work Queue

🛒 Order Processing

Orders → validate → payment → ship

Direct Exchange

🔍 Search Indexing

Docs → parse → index → search

Work Queue

📝 Form Validation

Forms → validate → store → confirm

RPC

🎮 Game Leaderboard

Scores → aggregate → rankings

Topic Exchange

💾 Backup System

Files → compress → encrypt → store

Work Queue

📊 Reporting Service

Data → compute → format → send

RPC

🔔 Alert System

Threshold breached → notify all

Fanout