Same message processed multiple times. Order created twice, email sent twice, payment charged twice.
Impact: Data corruption, duplicate charges, angry users
Consumer processes message successfully, but crashes BEFORE sending ack. RabbitMQ redelivers message to another consumer.
def callback(ch, method, properties, body):
# Process message
create_order(body) # ✓ Order created in database
# Crash here before ack (network failure, OOM, etc.)
ch.basic_ack(delivery_tag=method.delivery_tag) # Never reached
# Result: RabbitMQ redelivers -> create_order() called again -> duplicate orderMessage sent successfully, but network fails before confirmation received. Publisher retries, thinking it failed.
try:
channel.basic_publish(...)
# Message actually delivered, but network dies before TCP ack
except Exception:
# Publisher thinks it failed, retries
channel.basic_publish(...) # Duplicate!Consumer takes too long. RabbitMQ assumes consumer dead, redelivers to another consumer. Both process same message.
def callback(ch, method, properties, body):
# Takes 5 minutes (longer than heartbeat timeout)
slow_operation(body)
# Connection already closed by broker
ch.basic_ack(...) # Fails, message already redelivered
# Result: Another consumer also processing same messageMake processing safe to repeat. Same operation twice = same result.
❌ Not Idempotent
def process(order_id):
# Creates duplicate on retry
db.execute("""
INSERT INTO orders
VALUES (?, ?)
""", order_id, data)
def process(balance):
# Charges twice
balance -= 100
return balance✅ Idempotent
def process(order_id):
# Safe to retry (INSERT IGNORE or ON CONFLICT DO NOTHING)
db.execute("""
INSERT INTO orders
VALUES (?, ?)
ON CONFLICT (order_id) DO NOTHING
""", order_id, data)
def process(account_id, amount):
# Deduplication check
if not already_charged(account_id, transaction_id):
charge(account_id, amount)
mark_charged(transaction_id)Track processed message IDs in database. Skip if already processed.
import uuid
# Producer: Add unique message ID
msg_id = str(uuid.uuid4())
channel.basic_publish(
...,
properties=pika.BasicProperties(message_id=msg_id),
body=data
)
# Consumer: Check if already processed
def callback(ch, method, properties, body):
msg_id = properties.message_id
# Check dedup table
if already_processed(msg_id):
print(f"[!] Duplicate detected: {msg_id}")
ch.basic_ack(delivery_tag=method.delivery_tag)
return
# Process message
process_message(body)
# Mark as processed
mark_processed(msg_id)
ch.basic_ack(delivery_tag=method.delivery_tag)
def already_processed(msg_id):
return db.execute("SELECT 1 FROM processed_messages WHERE msg_id = ?", msg_id).fetchone()
def mark_processed(msg_id):
db.execute("INSERT INTO processed_messages (msg_id, timestamp) VALUES (?, ?)", msg_id, time.time())Wrap processing + ack in database transaction. Both succeed or both fail.
def callback(ch, method, properties, body):
msg_id = properties.message_id
try:
with db.transaction():
# Check dedup
if already_processed(msg_id):
ch.basic_ack(delivery_tag=method.delivery_tag)
return
# Process
process_message(body)
# Mark processed
mark_processed(msg_id)
# Only ack AFTER transaction commits
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
# Transaction rolled back, message redelivered
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)Use natural unique key from business logic (e.g., order_id, user_id + timestamp).
CREATE TABLE orders (
order_id VARCHAR(255) PRIMARY KEY, -- Unique constraint prevents duplicates
user_id INT,
amount DECIMAL,
created_at TIMESTAMP
);
def process_order(order_data):
try:
db.execute("""
INSERT INTO orders (order_id, user_id, amount, created_at)
VALUES (?, ?, ?, ?)
""", order_data['order_id'], ...)
except IntegrityError:
# Duplicate order_id, already processed
print(f"[!] Order {order_data['order_id']} already exists")
return # Skip, don't failGet explicit confirmation from broker before considering message sent.
channel.confirm_delivery()
msg_id = str(uuid.uuid4())
try:
channel.basic_publish(
...,
properties=pika.BasicProperties(message_id=msg_id),
body=data,
mandatory=True
)
print(f"[✓] Message {msg_id} confirmed")
# No need to retry
except pika.exceptions.UnroutableError:
print(f"[✗] Message {msg_id} unroutable")
# Handle error (don't blindly retry)
except pika.exceptions.NackError:
print(f"[✗] Message {msg_id} nacked")
# Handle errorimport pika
import uuid
import json
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.confirm_delivery()
order = {
"order_id": str(uuid.uuid4()),
"user_id": 123,
"amount": 99.99
}
channel.basic_publish(
exchange='',
routing_key='orders',
properties=pika.BasicProperties(
message_id=order["order_id"],
delivery_mode=2
),
body=json.dumps(order).encode(),
mandatory=True
)
print(f"[✓] Order {order['order_id']} sent")import sqlite3
import json
db = sqlite3.connect('orders.db')
db.execute("""
CREATE TABLE IF NOT EXISTS orders (
order_id TEXT PRIMARY KEY,
user_id INT,
amount REAL,
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
def callback(ch, method, properties, body):
order = json.loads(body.decode())
order_id = order['order_id']
try:
# Idempotent insert
db.execute("""
INSERT INTO orders (order_id, user_id, amount)
VALUES (?, ?, ?)
ON CONFLICT (order_id) DO NOTHING
""", (order_id, order['user_id'], order['amount']))
db.commit()
if db.total_changes == 0:
print(f"[!] Duplicate: {order_id}")
else:
print(f"[✓] Processed: {order_id}")
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"[✗] Error: {e}")
db.rollback()
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
channel.basic_consume(queue='orders', on_message_callback=callback)
channel.start_consuming()method.redelivered flag in callback| Strategy | When to Use |
|---|---|
| Idempotent operations | Always (best practice) |
| Message ID deduplication | When operations can't be idempotent |
| Transactional processing | Multi-step operations |
| Business-level dedup | Natural unique keys exist |
| Publisher confirms | Prevent publisher-side duplicates |
Golden Rule: At-least-once delivery is guaranteed. Exactly-once is YOUR responsibility via idempotence.