Problem: Duplicate Messages

Symptom

Same message processed multiple times. Order created twice, email sent twice, payment charged twice.

Impact: Data corruption, duplicate charges, angry users

Root Causes

1. Consumer Crash After Processing, Before Ack

The consumer processes the message successfully but crashes BEFORE sending the ack. RabbitMQ requeues the unacknowledged message and redelivers it, either to another consumer or to the same one after it restarts.

def callback(ch, method, properties, body):
    # Process message
    create_order(body)  # ✓ Order created in database

    # Crash here before ack (network failure, OOM, etc.)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # Never reached

# Result: RabbitMQ redelivers -> create_order() called again -> duplicate order

2. Publisher Retry After Network Failure

The broker accepts the message, but the connection fails before the publisher receives the confirmation. The publisher assumes the publish failed and retries.

try:
    channel.basic_publish(...)
    # Message actually delivered, but the connection dies before the broker's confirm arrives
except Exception:
    # Publisher thinks it failed, retries
    channel.basic_publish(...)  # Duplicate!

3. Slow Processing + Connection Timeout

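The consumer takes too long. The connection drops (for example, a blocked client misses heartbeats) or the broker's delivery acknowledgement timeout expires, so RabbitMQ treats the message as unacknowledged and redelivers it to another consumer. Both end up processing the same message.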

def callback(ch, method, properties, body):
    # Takes 5 minutes (longer than heartbeat timeout)
    slow_operation(body)

    # Connection already closed by broker
    ch.basic_ack(...)  # Fails, message already redelivered

# Result: Another consumer also processing same message

Solutions

✅ Solution 1: Idempotent Operations

Make processing safe to repeat. Same operation twice = same result.

❌ Not Idempotent

def process(order_id, data):
    # Creates duplicate on retry
    db.execute("""
        INSERT INTO orders
        VALUES (?, ?)
    """, (order_id, data))

def process(balance):
    # Charges twice
    balance -= 100
    return balance

✅ Idempotent

def process(order_id, data):
    # Safe to retry (ON CONFLICT DO NOTHING, or INSERT IGNORE in MySQL)
    db.execute("""
        INSERT INTO orders
        VALUES (?, ?)
        ON CONFLICT (order_id) DO NOTHING
    """, (order_id, data))

def process(account_id, transaction_id, amount):
    # Deduplication check, keyed on the transaction ID carried in the message
    if not already_charged(account_id, transaction_id):
        charge(account_id, amount)
        mark_charged(transaction_id)
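
The check-then-charge pattern above still leaves a small window between the check and the mark. One way to close it, sketched here under assumptions, is to let a primary key on the transaction ID decide who gets to charge. The table names, the `charge_once` helper, and the in-memory SQLite database are illustrative and not part of the original example.

```python
import sqlite3

# Illustrative schema: one row per account, one row per applied charge.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE accounts (account_id TEXT PRIMARY KEY, balance INTEGER)")
db.execute(
    "CREATE TABLE charges ("
    "transaction_id TEXT PRIMARY KEY, account_id TEXT, amount INTEGER)"
)
db.execute("INSERT INTO accounts VALUES ('acct-1', 500)")
db.commit()


def charge_once(account_id, transaction_id, amount):
    """Apply a charge at most once per transaction_id. Returns True if applied."""
    with db:  # commits on success, rolls back if an exception escapes
        cur = db.execute(
            "INSERT OR IGNORE INTO charges (transaction_id, account_id, amount) "
            "VALUES (?, ?, ?)",
            (transaction_id, account_id, amount),
        )
        if cur.rowcount == 0:
            return False  # this transaction_id was already charged
        db.execute(
            "UPDATE accounts SET balance = balance - ? WHERE account_id = ?",
            (amount, account_id),
        )
        return True


def balance_of(account_id):
    row = db.execute(
        "SELECT balance FROM accounts WHERE account_id = ?", (account_id,)
    ).fetchone()
    return row[0]


first = charge_once("acct-1", "txn-42", 100)   # original delivery
second = charge_once("acct-1", "txn-42", 100)  # redelivery of the same message
```

Because the charge record and the balance update share one transaction, a redelivered message finds the existing row and leaves the balance untouched.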

✅ Solution 2: Deduplication with Message ID

Track processed message IDs in database. Skip if already processed.

import time
import uuid

import pika

# Producer: Add unique message ID
msg_id = str(uuid.uuid4())
channel.basic_publish(
    ...,
    properties=pika.BasicProperties(message_id=msg_id),
    body=data
)

# Consumer: Check if already processed
def callback(ch, method, properties, body):
    msg_id = properties.message_id

    # Check dedup table
    if already_processed(msg_id):
        print(f"[!] Duplicate detected: {msg_id}")
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return

    # Process message
    process_message(body)

    # Mark as processed
    mark_processed(msg_id)

    ch.basic_ack(delivery_tag=method.delivery_tag)

def already_processed(msg_id):
    # Query parameters are passed as a tuple
    row = db.execute("SELECT 1 FROM processed_messages WHERE msg_id = ?", (msg_id,)).fetchone()
    return row is not None

def mark_processed(msg_id):
    db.execute("INSERT INTO processed_messages (msg_id, timestamp) VALUES (?, ?)", (msg_id, time.time()))
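
When two consumers receive the same message at nearly the same time, both can pass a separate "check" step before either one writes the "mark". A possible variant, sketched below, folds the check and the mark into a single insert guarded by a primary key. The `try_claim` and `on_message` names and the in-memory SQLite table are assumptions made for illustration.

```python
import sqlite3
import time

db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE processed_messages (msg_id TEXT PRIMARY KEY, timestamp REAL)"
)


def try_claim(msg_id):
    """Return True if this call recorded msg_id first, False if it already existed."""
    with db:
        cur = db.execute(
            "INSERT OR IGNORE INTO processed_messages (msg_id, timestamp) "
            "VALUES (?, ?)",
            (msg_id, time.time()),
        )
    return cur.rowcount == 1


handled = []


def on_message(msg_id, body):
    # Stand-in for the pika callback; acking is omitted to keep the sketch small.
    if not try_claim(msg_id):
        return "duplicate"
    handled.append(body)
    return "processed"


results = [
    on_message("m-1", "a"),
    on_message("m-1", "a"),  # redelivery
    on_message("m-2", "b"),
]
```

Claiming before processing has a cost: if the consumer crashes after the claim but before the work finishes, the redelivery is skipped and the message is effectively lost. Putting the claim and the work in the same database transaction avoids that.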

✅ Solution 3: Transactional Processing

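Wrap processing and the dedup record in one database transaction so both commit or both roll back. The ack is not part of that transaction, so send it only after the commit. If the consumer crashes between commit and ack, the redelivered message is caught by the dedup check.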

def callback(ch, method, properties, body):
    msg_id = properties.message_id

    try:
        with db.transaction():
            # Check dedup
            if already_processed(msg_id):
                ch.basic_ack(delivery_tag=method.delivery_tag)
                return

            # Process
            process_message(body)

            # Mark processed
            mark_processed(msg_id)

        # Only ack AFTER transaction commits
        ch.basic_ack(delivery_tag=method.delivery_tag)

    except Exception as e:
        # Transaction rolled back, message redelivered
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
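
To see why the rollback matters, here is a small self-contained sketch that replaces the broker with plain function calls. The `handle_delivery` function, its `fail` switch, and the SQLite tables are hypothetical. The return value stands for the ack or nack the consumer would send.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE processed_messages (msg_id TEXT PRIMARY KEY)")
db.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, amount REAL)")


def handle_delivery(msg_id, order_id, amount, fail=False):
    """Process one delivery and return 'ack' or 'nack'."""
    try:
        with db:  # one transaction for the work and the dedup record
            seen = db.execute(
                "SELECT 1 FROM processed_messages WHERE msg_id = ?", (msg_id,)
            ).fetchone()
            if seen:
                return "ack"  # duplicate: nothing to do, just acknowledge
            db.execute(
                "INSERT INTO orders (order_id, amount) VALUES (?, ?)",
                (order_id, amount),
            )
            if fail:
                raise RuntimeError("simulated failure mid-processing")
            db.execute(
                "INSERT INTO processed_messages (msg_id) VALUES (?)", (msg_id,)
            )
        return "ack"  # reached only after the commit
    except Exception:
        return "nack"  # transaction rolled back; broker would redeliver


def order_count():
    return db.execute("SELECT COUNT(*) FROM orders").fetchone()[0]


outcome_1 = handle_delivery("m-1", "o-1", 10.0, fail=True)  # fails, rolled back
count_after_failure = order_count()
outcome_2 = handle_delivery("m-1", "o-1", 10.0)  # redelivery succeeds
outcome_3 = handle_delivery("m-1", "o-1", 10.0)  # late duplicate is skipped
```

The failed attempt leaves neither an order row nor a dedup row behind, so the redelivery can run cleanly and any later duplicate is acknowledged without side effects.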

✅ Solution 4: Business-Level Deduplication

Use natural unique key from business logic (e.g., order_id, user_id + timestamp).

CREATE TABLE orders (
    order_id VARCHAR(255) PRIMARY KEY,  -- Unique constraint prevents duplicates
    user_id INT,
    amount DECIMAL,
    created_at TIMESTAMP
);

def process_order(order_data):
    try:
        db.execute("""
            INSERT INTO orders (order_id, user_id, amount, created_at)
            VALUES (?, ?, ?, ?)
        """, order_data['order_id'], ...)
    except IntegrityError:
        # Duplicate order_id, already processed
        print(f"[!] Order {order_data['order_id']} already exists")
        return  # Skip, don't fail
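
A runnable version of the same idea, assuming SQLite as the database and a reduced column set, could look like the following. With `sqlite3` the exception to catch is `sqlite3.IntegrityError`; other drivers expose their own equivalent.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE orders ("
    "order_id TEXT PRIMARY KEY, user_id INTEGER, amount REAL)"
)


def process_order(order_data):
    """Insert the order; return False if this order_id was already stored."""
    try:
        with db:
            db.execute(
                "INSERT INTO orders (order_id, user_id, amount) VALUES (?, ?, ?)",
                (order_data["order_id"], order_data["user_id"], order_data["amount"]),
            )
        return True
    except sqlite3.IntegrityError:
        # Primary key violation: the order already exists, so skip it.
        return False


order = {"order_id": "ord-1", "user_id": 123, "amount": 99.99}
created = process_order(order)
created_again = process_order(order)  # simulated redelivery
stored = db.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
```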

✅ Solution 5: Publisher Confirms (Prevent Publisher Retries)

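Get explicit confirmation from the broker before considering a message sent, so the publisher does not retry on a guess. Confirms reduce publisher-side duplicates but cannot eliminate them: if the connection drops before the confirm arrives, the publisher still has to retry. Reuse the same message_id on every retry so consumers can deduplicate.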

channel.confirm_delivery()

msg_id = str(uuid.uuid4())
try:
    channel.basic_publish(
        ...,
        properties=pika.BasicProperties(message_id=msg_id),
        body=data,
        mandatory=True
    )
    print(f"[✓] Message {msg_id} confirmed")
    # No need to retry
except pika.exceptions.UnroutableError:
    print(f"[✗] Message {msg_id} unroutable")
    # Handle error (don't blindly retry)
except pika.exceptions.NackError:
    print(f"[✗] Message {msg_id} nacked")
    # Handle error
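
When a retry is unavoidable, the important detail is that the message ID is generated once and reused. The sketch below shows that shape with a stand-in `publish` callable in place of a real channel. The `publish_with_retry` and `flaky_publish` names are hypothetical, and treating `ConnectionError` as the retryable failure is an assumption.

```python
import uuid


def publish_with_retry(publish, body, max_attempts=3):
    """Call publish(msg_id, body) until it succeeds, reusing one msg_id.

    `publish` is a hypothetical callable that would wrap basic_publish and
    raise ConnectionError when the confirm never arrives.
    """
    msg_id = str(uuid.uuid4())  # generated once, outside the retry loop
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            publish(msg_id, body)
            return msg_id, attempt
        except ConnectionError as exc:
            last_error = exc
    raise RuntimeError(f"gave up after {max_attempts} attempts") from last_error


# Fake broker: stores every message, but "loses" the first confirm.
delivered = []


def flaky_publish(msg_id, body):
    delivered.append((msg_id, body))
    if len(delivered) == 1:
        raise ConnectionError("confirm lost")


msg_id, attempts = publish_with_retry(flaky_publish, b"order")
```

The fake broker ends up holding two copies of the message, but both carry the same ID, so a consumer that deduplicates on message_id processes it once.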

Complete Deduplication Example

Producer (with unique ID)

import pika
import uuid
import json

connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.confirm_delivery()

order = {
    "order_id": str(uuid.uuid4()),
    "user_id": 123,
    "amount": 99.99
}

channel.basic_publish(
    exchange='',
    routing_key='orders',
    properties=pika.BasicProperties(
        message_id=order["order_id"],
        delivery_mode=2
    ),
    body=json.dumps(order).encode(),
    mandatory=True
)

print(f"[✓] Order {order['order_id']} sent")

Consumer (idempotent)

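import json
import sqlite3

import pika

connection = pika.BlockingConnection(...)  # same connection parameters as the producer
channel = connection.channel()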

db = sqlite3.connect('orders.db')
db.execute("""
    CREATE TABLE IF NOT EXISTS orders (
        order_id TEXT PRIMARY KEY,
        user_id INT,
        amount REAL,
        processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")

def callback(ch, method, properties, body):
    order = json.loads(body.decode())
    order_id = order['order_id']

    try:
        # Idempotent insert (ON CONFLICT requires SQLite 3.24+)
        cur = db.execute("""
            INSERT INTO orders (order_id, user_id, amount)
            VALUES (?, ?, ?)
            ON CONFLICT (order_id) DO NOTHING
        """, (order_id, order['user_id'], order['amount']))

        db.commit()

        # rowcount is 0 when ON CONFLICT skipped the insert
        if cur.rowcount == 0:
            print(f"[!] Duplicate: {order_id}")
        else:
            print(f"[✓] Processed: {order_id}")

        ch.basic_ack(delivery_tag=method.delivery_tag)

    except Exception as e:
        print(f"[✗] Error: {e}")
        db.rollback()
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_consume(queue='orders', on_message_callback=callback)
channel.start_consuming()

Testing Deduplication

  1. Test Consumer Crash:
    • Send message
    • Kill consumer after processing, before ack (Ctrl+C)
    • Start consumer again
    • Verify message processed only once (check database)
  2. Test Redelivery Flag:
    • Check method.redelivered flag in callback
    • Log when receiving redelivered messages
  3. Stress Test:
    • Send 1000 messages with random crashes
    • Count final database rows
    • Should equal exactly 1000 (no duplicates, no losses)
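
Before running the stress test against a real broker, the expected outcome can be checked with a small simulation. The sketch below models at-least-once delivery with an in-process queue: a "crash" means the message was processed but never acked, so it is requeued. The crash rate, the seed, and the `run_stress_test` helper are all assumptions made for illustration.

```python
import random
import sqlite3
from collections import deque


def run_stress_test(n_messages=1000, crash_rate=0.3, seed=7):
    """Simulate redelivery after random crashes with an idempotent consumer."""
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY)")
    rng = random.Random(seed)
    queue = deque(f"order-{i}" for i in range(n_messages))
    seen_before = set()  # plays the role of method.redelivered
    deliveries = 0
    redelivered = 0

    while queue:
        order_id = queue.popleft()
        deliveries += 1
        if order_id in seen_before:
            redelivered += 1
        with db:
            # Idempotent processing: a repeated order_id is ignored.
            db.execute(
                "INSERT OR IGNORE INTO orders (order_id) VALUES (?)", (order_id,)
            )
        if rng.random() < crash_rate:
            # Crash after processing, before ack: the broker requeues.
            seen_before.add(order_id)
            queue.append(order_id)

    rows = db.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
    db.close()
    return deliveries, redelivered, rows


deliveries, redelivered, rows = run_stress_test()
```

The number of deliveries exceeds the number of messages, yet the row count matches the message count exactly, which is the property the real stress test should confirm.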

Summary

Strategy                    When to Use
Idempotent operations       Always (best practice)
Message ID deduplication    When operations can't be idempotent
Transactional processing    Multi-step operations
Business-level dedup        Natural unique keys exist
Publisher confirms          Prevent publisher-side duplicates

Golden Rule: With manual acks, RabbitMQ gives you at-least-once delivery, so duplicates will happen. Exactly-once processing is YOUR responsibility via idempotence.