
Problem: Deadlock in RabbitMQ

Symptom

System hangs. No messages processed. All consumers blocked waiting. Nothing progresses.

Impact: Complete system freeze, requires manual intervention (restart)

Root Causes

1. RPC Call Cycle

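Service A calls Service B via RPC, and while handling that request Service B calls Service A via RPC. If each service handles requests on a single consumer thread, A can't pick up B's request while it waits for B, and B can't reply while it waits for A → deadlock.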

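# Service A (rpc_client and reply() are illustrative helpers)
def handle_request(ch, method, props, body):
    result = rpc_client.call_service_b(body)  # Blocks until B replies
    reply(ch, props, result)  # Publish to props.reply_to (never reached)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Service B
def handle_request(ch, method, props, body):
    result = rpc_client.call_service_a(body)  # Blocks until A replies
    reply(ch, props, result)  # Never reached
    ch.basic_ack(delivery_tag=method.delivery_tag)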

Timeline:
1. Service A receives a request and calls Service B (blocks waiting for B's reply)
2. Service B receives A's request and calls Service A (blocks waiting for A's reply)
3. Service A can't handle B's request: its only consumer thread is still blocked on B
4. Service B can't reply to A: it is still blocked on A
5. DEADLOCK ☠️
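The cycle can be reproduced without a broker. Below is a minimal in-process sketch, assuming each service behaves like a single-threaded consumer: one worker thread takes a request from its inbox (a `queue.Queue` standing in for the request queue), sends an RPC to its peer and waits for the reply. `make_service` and `simulate_rpc_cycle` are hypothetical names, and the timeout exists only so the demo terminates.

```python
import queue
import threading


def make_service(name, inbox, peer_inbox, results, wait_s):
    """One worker thread standing in for a single-threaded RPC consumer."""

    def worker():
        # Take the next request (the demo handles just one, which is enough to hang)
        _request, _reply_to = inbox.get()
        my_reply_box = queue.Queue()
        peer_inbox.put((f"call from {name}", my_reply_box))  # RPC to the peer
        try:
            # Block waiting for the peer's reply, like rpc_client.call()
            my_reply_box.get(timeout=wait_s)
            results[name] = "replied"
        except queue.Empty:
            # The peer never answered: its only thread is blocked waiting on us
            results[name] = "stuck"

    return threading.Thread(target=worker, daemon=True)


def simulate_rpc_cycle(wait_s=0.5):
    inbox_a, inbox_b = queue.Queue(), queue.Queue()
    results = {}
    service_a = make_service("A", inbox_a, inbox_b, results, wait_s)
    service_b = make_service("B", inbox_b, inbox_a, results, wait_s)
    service_a.start()
    service_b.start()
    inbox_a.put(("client request", queue.Queue()))  # An external request hits A
    service_a.join()
    service_b.join()
    return results
```

Running `simulate_rpc_cycle()` returns `{"A": "stuck", "B": "stuck"}`: B's request sits in A's inbox the whole time, because A's only thread is waiting for B.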

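2. Blocked Publisher Sharing a Connection with the Consumer

A queue reaching x-max-length does not block publishers: by default RabbitMQ drops the oldest message from the head of the queue, and with x-overflow set to reject-publish it rejects new messages instead. Publishers get blocked when the broker raises a memory or disk alarm (a large backlog of unconsumed messages is a common trigger), and the broker then stops reading from connections that publish. If the consumer shares that connection, its acks can't get through either: the backlog never drains, the alarm never clears, and nothing on the connection progresses.

# Queue max length = 100
channel.queue_declare(
    queue='limited_queue',
    arguments={'x-max-length': 100}
)

# Producer sends 101 messages on the same connection the consumer uses
for i in range(101):
    channel.basic_publish(...)  # Msg 101 is accepted; the default overflow drops the oldest message

# If a memory or disk alarm fires, the broker blocks this connection:
# the consumer's acks on it can't reach the broker, so the queue never drains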
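A toy model of why the shared connection is what turns a blocked publisher into a hang. It assumes, as RabbitMQ does during a memory or disk alarm, that the broker blocks connections that have published and stops reading from them, while consume-only connections keep flowing. `ToyConnection` and `acks_reaching_broker` are hypothetical; no real client or broker is involved.

```python
from collections import deque


class ToyConnection:
    """Frames wait in `outbound` until the broker reads them."""

    def __init__(self):
        self.outbound = deque()
        self.has_published = False
        self.blocked = False

    def send(self, frame):
        if frame == "publish":
            self.has_published = True
        self.outbound.append(frame)

    def broker_read(self):
        # A blocked connection is not read at all, so acks wait behind publishes
        if self.blocked:
            return []
        frames = list(self.outbound)
        self.outbound.clear()
        return frames


def acks_reaching_broker(shared_connection, n_acks=5):
    publisher = ToyConnection()
    consumer = publisher if shared_connection else ToyConnection()
    publisher.send("publish")
    # Resource alarm: block every connection that has published
    for conn in (publisher, consumer):
        conn.blocked = conn.has_published
    for tag in range(n_acks):
        consumer.send(f"ack {tag}")  # Consumer acks what it has processed
    frames = publisher.broker_read()
    if not shared_connection:
        frames += consumer.broker_read()
    return sum(1 for frame in frames if frame.startswith("ack"))
```

With one shared connection no ack reaches the broker (`acks_reaching_broker(True)` is 0); with separate connections all five do.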

3. Single-Threaded Consumer Making Blocking RPC

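Consumer callback makes a blocking RPC call on the SAME connection it consumes from. The callback runs inside that connection's I/O loop, and pika's BlockingConnection does not dispatch other consumer callbacks (such as the RPC client's reply handler) while one is already running, so the response is never handed to the RPC client.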

# ❌ BAD - Same connection for consume + RPC
import pika

connection = pika.BlockingConnection(...)
channel = connection.channel()
rpc_client = RpcClient(channel)  # RPC client bound to the SAME connection

def callback(ch, method, properties, body):
    # This blocks the thread that drives the connection's I/O loop
    result = rpc_client.call(body)  # Uses SAME connection
    # The RPC response is never dispatched while this callback is running
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()  # DEADLOCK on the first message
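The same trap in miniature: one loop dispatches every event for one connection, in order, and a callback waits for an event queued behind itself. `run_single_threaded_loop` is a hypothetical, broker-free sketch of that structure, not pika's actual implementation.

```python
from collections import deque


def run_single_threaded_loop(max_polls=10_000):
    """One thread dispatches every event for one connection, in order."""
    events = deque([("task", "job-1")])  # Deliveries waiting on this connection
    state = {"reply": None}

    def naive_rpc_call(payload):
        # The reply arrives on the SAME connection, behind the current event
        events.append(("reply", payload + " done"))
        for _ in range(max_polls):  # Busy-wait; the cap only keeps the demo finite
            if state["reply"] is not None:
                return state["reply"]
        return None  # Gave up: the reply was queued but never dispatched

    results = []
    while events:
        kind, payload = events.popleft()
        if kind == "task":
            results.append(naive_rpc_call(payload))  # Callback blocks the loop
        else:
            state["reply"] = payload  # Too late: the callback already gave up
    return results
```

It returns `[None]`: the reply event was already in the queue, but it sits behind the very callback that is waiting for it.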

4. Unacknowledged Messages Fill prefetch Buffer

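Consumer prefetches 100 messages. Processing the first one blocks on a slow external resource. The other 99 sit unacknowledged in this consumer's buffer: this consumer can't process them (its only thread is busy), and the broker won't hand them to other consumers because they are already delivered here. No new messages arrive either, because the prefetch window is full.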

channel.basic_qos(prefetch_count=100)

def callback(ch, method, properties, body):
    # First message blocks on slow external call
    external_api.slow_call()  # Takes 10 minutes
    # Meanwhile the other 99 prefetched messages sit unacked in this consumer's
    # buffer: this thread is busy, and the broker won't redeliver them elsewhere
    ch.basic_ack(delivery_tag=method.delivery_tag)

Solutions

✅ Solution 1: Break RPC Cycles (Async Pattern)

Don't block waiting for response. Use callback queues.

import uuid

import pika

# Service A
def handle_request(ch, method, props, body):
    # Send async request to B (don't wait)
    ch.basic_publish(
        exchange='',
        routing_key='service_b_queue',
        body=body,
        properties=pika.BasicProperties(
            reply_to='service_a_callback',
            correlation_id=str(uuid.uuid4()),  # Store it to match B's reply later
        ),
    )
    # Ack right away; B's reply arrives later on 'service_a_callback'
    ch.basic_ack(delivery_tag=method.delivery_tag)

# No blocking = no deadlock
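With the async pattern, Service A needs a way to route each reply arriving on 'service_a_callback' back to the work that was waiting for it. A minimal sketch, assuming B copies the request's correlation_id onto its reply (the usual RabbitMQ RPC convention); `PendingReplies` is a hypothetical helper, not part of pika.

```python
import uuid


class PendingReplies:
    """Maps correlation_id -> continuation to run when the matching reply arrives."""

    def __init__(self):
        self._pending = {}

    def register(self, on_reply):
        corr_id = str(uuid.uuid4())
        self._pending[corr_id] = on_reply
        return corr_id  # Send this as BasicProperties(correlation_id=...)

    def dispatch(self, corr_id, body):
        # Call from the consumer on the callback queue, with props.correlation_id
        on_reply = self._pending.pop(corr_id, None)
        if on_reply is None:
            return False  # Unknown, duplicate or late reply: ignore it
        on_reply(body)
        return True


def demo():
    pending = PendingReplies()
    received = []
    corr_id = pending.register(received.append)  # Before publishing to B
    first = pending.dispatch(corr_id, b"result from B")
    duplicate = pending.dispatch(corr_id, b"result from B")
    return first, duplicate, received
```

`register()` would be called in `handle_request` before publishing, and `dispatch()` from the `on_message_callback` of a consumer on 'service_a_callback'. Both run on the consumer's single thread, so the dict needs no lock; add one if replies are handled on other threads.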

✅ Solution 2: Separate Connections for Publish/Consume

Use a separate connection for the RPC client, so waiting for its reply doesn't depend on the consumer connection's I/O loop.

import pika

# Connection 1: For consuming
consume_conn = pika.BlockingConnection(...)
consume_channel = consume_conn.channel()

# Connection 2: For RPC client (separate!)
rpc_conn = pika.BlockingConnection(...)
rpc_channel = rpc_conn.channel()
rpc_client = RpcClient(rpc_channel)

def callback(ch, method, properties, body):
    # Still waits, but the reply arrives on rpc_conn, not the consumer's connection
    result = rpc_client.call(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

consume_channel.basic_consume(queue='task_queue', on_message_callback=callback)
consume_channel.start_consuming()

✅ Solution 3: Use Timeouts

Add timeout to RPC calls. Fail fast instead of blocking forever.

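import time
import uuid

class RpcClient:
    # __init__ (not shown) sets self.connection, self.channel and a reply consumer
    # whose on_response callback stores the matching reply in self.response
    def call(self, data, timeout=5.0):
        self.response = None
        self.corr_id = str(uuid.uuid4())

        self.channel.basic_publish(...)  # reply_to + correlation_id=self.corr_id

        # Wait with timeout (monotonic clock is immune to system clock changes)
        deadline = time.monotonic() + timeout
        while self.response is None:
            if time.monotonic() > deadline:
                raise TimeoutError("RPC call timed out")
            # Pump I/O so the reply consumer can run
            self.connection.process_data_events(time_limit=0.1)

        return self.response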
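The deadline logic can be pulled out of the client so it is testable without a broker. A minimal sketch; `wait_for_reply` is a hypothetical helper, and in the client above `poll_once` would be `lambda s: self.connection.process_data_events(time_limit=s)` and `get_reply` would be `lambda: self.response`.

```python
import time


def wait_for_reply(poll_once, get_reply, timeout, step=0.1):
    """Call poll_once(seconds) until get_reply() is not None or the deadline passes."""
    deadline = time.monotonic() + timeout
    while True:
        reply = get_reply()
        if reply is not None:
            return reply
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("RPC call timed out")
        # Never poll past the deadline, so the overshoot stays small
        poll_once(min(step, remaining))


def demo():
    polls = []
    # Fake I/O pump: the "reply" shows up after the third poll
    fast = wait_for_reply(polls.append, lambda: "ok" if len(polls) >= 3 else None, timeout=1.0)
    try:
        wait_for_reply(time.sleep, lambda: None, timeout=0.05, step=0.01)
        slow = "no timeout"
    except TimeoutError:
        slow = "timed out"
    return fast, len(polls), slow
```

Capping each poll at the remaining time means a call overshoots `timeout` by at most one short poll, instead of by up to a full `step`.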

✅ Solution 4: Lower prefetch_count

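A stuck consumer then holds at most prefetch_count unacknowledged messages; everything else stays in the queue for other consumers to take. The trade-off is lower throughput per consumer.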

# Instead of prefetch_count=100
channel.basic_qos(prefetch_count=1)  # Only 1 message at a time

# If processing blocks, other consumers can take remaining messages
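A toy model of the effect, assuming two consumers on one queue, strict round-robin delivery (RabbitMQ's real dispatch is only approximately round-robin), one consumer hung forever and the other acking instantly. `stranded_messages` is a hypothetical helper.

```python
def stranded_messages(n_messages, prefetch_count):
    """How many messages end up stuck with one hung consumer.

    Deliveries alternate between two consumers, skipping a consumer whose
    unacked count has reached prefetch_count. The "stuck" consumer never acks;
    the "healthy" one acks each message immediately.
    """
    stuck_unacked = 0
    stuck_turn = True  # Round-robin pointer: the stuck consumer goes first
    for _ in range(n_messages):
        if stuck_turn and stuck_unacked < prefetch_count:
            stuck_unacked += 1  # Delivered to the hung consumer and never acked
        # Otherwise the healthy consumer takes it (it always has prefetch room)
        stuck_turn = not stuck_turn
    return stuck_unacked
```

For 200 messages, `prefetch_count=100` strands 100 of them behind the hung consumer, while `prefetch_count=1` strands only one.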

✅ Solution 5: Use Threading for Blocking Operations

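Move the blocking call to a separate thread so the consumer thread returns to its I/O loop right away. Acking immediately is the simplest variant, but the message is lost if the worker then fails; for at-least-once delivery, ack after the work finishes by scheduling basic_ack back onto the connection's thread with connection.add_callback_threadsafe (pika channels are not thread-safe, so the worker must not call ch.basic_ack itself).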

import threading

def callback(ch, method, properties, body):
    # Ack immediately (at-most-once: the message is lost if the worker fails)
    ch.basic_ack(delivery_tag=method.delivery_tag)

    # Process in background thread
    thread = threading.Thread(
        target=process_blocking_task,
        args=(body,)
    )
    thread.start()

def process_blocking_task(body):
    # Blocking operation here (separate thread, won't block consumer)
    external_api.slow_call()
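A variant that bounds the number of threads and acks only after the work succeeds. A minimal sketch: `ThreadedConsumer` is hypothetical, and it assumes that with pika `schedule_on_io_thread` is `connection.add_callback_threadsafe` and `ack` is `ch.basic_ack`. Here a plain `queue.Queue` stands in for the I/O thread so the example runs without a broker.

```python
import functools
import queue
from concurrent.futures import ThreadPoolExecutor


class ThreadedConsumer:
    """Bounded worker pool; acks only after the work succeeds."""

    def __init__(self, schedule_on_io_thread, ack, work, max_workers=4):
        self._schedule = schedule_on_io_thread
        self._ack = ack
        self._work = work
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def on_message(self, delivery_tag, body):
        # Runs on the consumer (I/O) thread: hand off and return immediately
        self._pool.submit(self._run, delivery_tag, body)

    def _run(self, delivery_tag, body):
        # Blocking work, off the I/O thread. If it raises, no ack is scheduled
        # (the future keeps the exception); a real consumer would log and nack.
        self._work(body)
        # Ack from the I/O thread only, since channels aren't thread-safe
        self._schedule(functools.partial(self._ack, delivery_tag=delivery_tag))

    def close(self):
        self._pool.shutdown(wait=True)


def demo():
    io_thread_calls = queue.Queue()  # Stand-in for the connection's I/O loop
    acks, processed = [], []
    consumer = ThreadedConsumer(
        io_thread_calls.put,
        lambda delivery_tag: acks.append(delivery_tag),
        processed.append,
        max_workers=2,
    )
    for tag in range(1, 6):
        consumer.on_message(tag, f"msg{tag}")
    consumer.close()  # Wait for the workers to finish
    while not io_thread_calls.empty():
        io_thread_calls.get()()  # The I/O thread runs the scheduled acks
    return sorted(acks), sorted(processed)
```

With pika, the consumer callback would adapt the signature, e.g. `lambda ch, method, props, body: consumer.on_message(method.delivery_tag, body)`. Because acks now wait for the work, prefetch_count also caps how many messages are in flight in the pool.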

Detection & Debugging

Signs of Deadlock

  • Messages in queue but not processed
  • Consumers running but no activity
  • CPU usage near 0% (all threads blocked)
  • No error messages (silent hang)

Debug Tools

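# Check queue status (unacked messages that never go down = stuck consumers)
rabbitmqctl list_queues name messages_ready messages_unacknowledged consumers

# Check connections (state 'blocked' or 'blocking' = resource alarm in effect)
rabbitmqctl list_connections name state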

# Thread dump (Python): list threads, then print each one's stack to stderr
import faulthandler
import threading

print(threading.enumerate())
faulthandler.dump_traceback(all_threads=True)
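The "consumers running but no activity" sign can be turned into an alert. A minimal standard-library sketch: the consumer calls `beat()` after each ack, and if nothing happens for `stall_after` seconds the watchdog fires, for example to log every thread's stack. `StallWatchdog` and `thread_dump` are hypothetical helpers; `sys._current_frames()` is a documented CPython function intended for debugging use.

```python
import sys
import threading
import time
import traceback


def thread_dump():
    """Return every live thread's current stack as text."""
    frames = sys._current_frames()
    parts = []
    for thread in threading.enumerate():
        parts.append(f"--- {thread.name} (daemon={thread.daemon}) ---")
        frame = frames.get(thread.ident)
        if frame is not None:
            parts.append("".join(traceback.format_stack(frame)))
    return "\n".join(parts)


class StallWatchdog:
    """Calls on_stall once when beat() hasn't been called for stall_after seconds."""

    def __init__(self, stall_after, on_stall):
        self.stall_after = stall_after
        self.on_stall = on_stall
        self._last_beat = time.monotonic()
        self._fired = False

    def beat(self):
        # Call after each message is acked: proof that work is progressing
        self._last_beat = time.monotonic()
        self._fired = False

    def check(self, now=None):
        now = time.monotonic() if now is None else now
        if not self._fired and now - self._last_beat > self.stall_after:
            self._fired = True
            self.on_stall()
        return self._fired

    def start(self, interval=1.0):
        # Background checker; daemon so it never keeps the process alive
        def loop():
            while True:
                time.sleep(interval)
                self.check()

        threading.Thread(target=loop, daemon=True).start()


def demo():
    fired = []
    watchdog = StallWatchdog(stall_after=10, on_stall=lambda: fired.append(thread_dump()))
    early = watchdog.check()  # Just created: no stall yet
    late = watchdog.check(now=time.monotonic() + 11)  # 11 s without beat()
    return early, late, len(fired)
```

An empty queue also produces no beats, so in practice pair the watchdog with a queue-depth check before treating a stall as a deadlock.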

Prevention Checklist

  • ✓ Never make blocking RPC calls inside a consumer callback on the same connection
  • ✓ Use separate connections for publish vs consume
  • ✓ Add timeouts to all RPC calls
  • ✓ Keep prefetch_count low (1-10)
  • ✓ Avoid circular RPC dependencies between services
  • ✓ Use async patterns instead of blocking waits
  • ✓ Move slow operations to background threads

Summary

Cause                             | Solution
RPC cycle                         | Use async pattern, no blocking waits
Same connection for RPC + consume | Separate connections
Blocking callback                 | Move to thread, ack immediately
Prefetch buffer full              | Lower prefetch_count
Infinite wait                     | Add timeouts