Performance Issues

Problem: Slow Performance

Symptoms

  • Messages pile up in queue faster than consumed
  • High latency between publish and consume
  • Low throughput (messages/second)
  • Consumer CPU idle while queue grows

Root Causes & Solutions

1. prefetch_count Too Low

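With prefetch_count=1 the broker sends the next message only after the consumer acks the current one, so the consumer sits idle for a full network round-trip between messages. prefetch_count limits unacknowledged deliveries, so it only applies with manual acks (auto_ack=False).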

❌ Bad (prefetch=1)

channel.basic_qos(prefetch_count=1)

# Consumer idle during network RTT
# [Process msg] [Wait] [Process] [Wait]
# Throughput (illustrative): ~100 msg/s

✅ Good (prefetch=10-50)

channel.basic_qos(prefetch_count=20)

# Consumer has buffer of messages
# [Process][Process][Process]...
# Throughput (illustrative): ~2000 msg/s
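To see why a larger window helps, here is a deliberately simplified model (an assumption, not a measurement): each message holds one prefetch slot for its processing time plus roughly one RTT (ack to the broker, next delivery back), and one consumer can never go faster than one message per processing time. The function names are hypothetical.

```python
import math

def estimated_throughput(prefetch, proc_ms, rtt_ms):
    """Simplified model of one consumer's msg/s with `prefetch` unacked messages."""
    # Window limit: `prefetch` messages complete every (processing + RTT) interval
    window_limit = prefetch * 1000 / (proc_ms + rtt_ms)
    # CPU limit: the consumer cannot process faster than 1 message per proc_ms
    cpu_limit = 1000 / proc_ms
    return min(window_limit, cpu_limit)

def min_prefetch_to_saturate(proc_ms, rtt_ms):
    """Smallest prefetch at which the RTT no longer limits throughput in this model."""
    return math.ceil((proc_ms + rtt_ms) / proc_ms)

# Example: 1 ms of work per message, 9 ms RTT
for p in (1, 5, 10, 20):
    print(p, estimated_throughput(p, proc_ms=1, rtt_ms=9))
print("min prefetch:", min_prefetch_to_saturate(proc_ms=1, rtt_ms=9))
```

With these made-up inputs the model gives 100, 500, 1000 and 1000 msg/s, and a minimum prefetch of 10. Past that point a larger prefetch only buffers more messages on one consumer, which can starve other consumers of work.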

2. New Connection Per Message

Connection setup is expensive (TCP handshake, AMQP handshake with authentication, channel opening). Open one connection and reuse it.

❌ Bad

for msg in messages:
    conn = pika.BlockingConnection(...)
    channel = conn.channel()
    channel.basic_publish(...)
    conn.close()
# 1000 messages = 1000 connections

✅ Good

conn = pika.BlockingConnection(...)
channel = conn.channel()
for msg in messages:
    channel.basic_publish(...)
conn.close()
# 1000 messages = 1 connection
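One way to make reuse the default is to hide the connection behind a small object that opens it once and reopens it only if it has closed. The class below is a hypothetical sketch, not part of pika; it relies only on BlockingConnection's channel(), is_open/is_closed and close(), and takes the connection factory as a parameter so it can be exercised without a broker.

```python
class ReusablePublisher:
    """Hypothetical helper: one long-lived connection/channel reused for every publish."""

    def __init__(self, connect):
        # `connect` is any zero-argument factory, e.g.
        # lambda: pika.BlockingConnection(pika.ConnectionParameters(...))
        self._connect = connect
        self._conn = None
        self._channel = None

    def publish(self, routing_key, body):
        # Open the connection on first use; reopen it only if it was closed
        if self._conn is None or self._conn.is_closed:
            self._conn = self._connect()
            self._channel = self._conn.channel()
        self._channel.basic_publish(exchange='', routing_key=routing_key, body=body)

    def close(self):
        if self._conn is not None and self._conn.is_open:
            self._conn.close()
```

Like a BlockingConnection itself, one instance should only be used from a single thread.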

3. Single Consumer, Heavy Processing

One consumer can't keep up. Add more consumer instances (horizontal scaling).

# Terminal 1
python consumer.py

# Terminal 2
python consumer.py  # Add second instance

# Terminal 3
python consumer.py  # Add third instance

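# RabbitMQ dispatches messages round-robin among consumers of the same queue
# Throughput can approach 3x, if consumer-side processing (not the broker or a
# shared downstream service) is the bottleneck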
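The 3x figure assumes messages cost about the same to process. A toy simulation (made-up task times, RTT and broker overhead ignored) shows what happens when they don't: strict round-robin, which is roughly how messages get spread when every consumer has prefetch room, versus fair dispatch, where the next message goes to whichever consumer frees up first (approximately what prefetch_count=1 gives). The function names are hypothetical.

```python
import heapq
from itertools import cycle

def round_robin_makespan(task_ms, n_consumers):
    """Finish time when message i goes to consumer i % n (strict round-robin)."""
    busy = [0] * n_consumers
    for consumer, cost in zip(cycle(range(n_consumers)), task_ms):
        busy[consumer] += cost
    return max(busy)

def fair_dispatch_makespan(task_ms, n_consumers):
    """Finish time when each message goes to the consumer that becomes free first."""
    free_at = [(0, c) for c in range(n_consumers)]  # (time consumer is free, id)
    heapq.heapify(free_at)
    for cost in task_ms:
        t, c = heapq.heappop(free_at)
        heapq.heappush(free_at, (t + cost, c))
    return max(t for t, _ in free_at)

tasks = [5, 1, 1, 5, 1, 1]  # uneven processing times in ms (made-up example)
print(sum(tasks))                        # one consumer
print(round_robin_makespan(tasks, 3))    # three consumers, round-robin
print(fair_dispatch_makespan(tasks, 3))  # three consumers, fair dispatch
```

Here one consumer needs 14 ms, three round-robin consumers 10 ms (only 1.4x, because both slow tasks land on the same consumer), and three fair-dispatch consumers 6 ms.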

4. Synchronous Processing in Callback

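The callback blocks on slow I/O, and while it runs a BlockingConnection cannot handle other deliveries or send heartbeats. Offload the work to threads. pika channels are not thread-safe, though, so worker threads must not call ch.basic_ack() directly: schedule the ack on the connection's thread with add_callback_threadsafe().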

❌ Slow (synchronous)

def callback(ch, method, props, body):
    # Blocks entire consumer
    result = slow_api_call(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Processes 1 msg at a time

✅ Fast (threaded)

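import functools
import threading

def callback(ch, method, props, body):
    # Hand the slow work to a thread so pika's I/O loop keeps running
    threading.Thread(
        target=process_async,
        args=(ch, method.delivery_tag, body)
    ).start()

def process_async(ch, delivery_tag, body):
    result = slow_api_call(body)
    # Channels are not thread-safe: schedule the ack on the connection's thread
    ch.connection.add_callback_threadsafe(
        functools.partial(ch.basic_ack, delivery_tag=delivery_tag)
    )

# Processes up to prefetch_count messages in parallel (one thread each)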
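Starting one thread per message works, but a fixed-size pool caps the number of concurrent calls and avoids thread start-up cost. A minimal sketch under assumptions: slow_api_call is simulated with time.sleep, the pool size of 20 is arbitrary, and requeue-on-error is a policy choice, not a requirement. Acks and nacks are still handed back to pika's I/O thread via add_callback_threadsafe(), since channels must not be used from worker threads.

```python
import functools
import time
from concurrent.futures import ThreadPoolExecutor

# Fixed-size pool: at most 20 slow calls run at once (size is an assumption;
# matching prefetch_count means no delivered message waits for a thread)
executor = ThreadPoolExecutor(max_workers=20)

def slow_api_call(body):
    """Hypothetical stand-in for a blocking I/O call (simulated with sleep)."""
    time.sleep(0.05)
    return body

def on_message(ch, method, props, body):
    # Runs on pika's I/O thread: submit the work and return immediately
    executor.submit(handle, ch, method.delivery_tag, body)

def handle(ch, delivery_tag, body):
    # Runs on a pool thread
    try:
        slow_api_call(body)
        done = functools.partial(ch.basic_ack, delivery_tag=delivery_tag)
    except Exception:
        # Requeue on failure (a policy choice; adjust to your retry strategy)
        done = functools.partial(ch.basic_nack, delivery_tag=delivery_tag, requeue=True)
    # Channels are not thread-safe: let the connection's thread send the ack/nack
    ch.connection.add_callback_threadsafe(done)
```

Register it as usual with channel.basic_consume(queue=..., on_message_callback=on_message) and run channel.start_consuming(), which also executes the scheduled ack callbacks.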

5. No Message Batching

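Publishing with confirms one message at a time costs one network round-trip per message. With pika's BlockingConnection, confirm_delivery() makes every basic_publish() wait until the broker acks that message, so collecting messages in a local list does not batch anything. Without confirm_delivery(), basic_publish() writes to the socket and returns without waiting for the broker, so consecutive publishes on one connection are effectively pipelined (fast, but with no delivery guarantee). Batching confirms needs an asynchronous client, such as pika's SelectConnection, that keeps publishing while acks arrive.

# Fast path: no confirm_delivery(), publishes go out back-to-back
for msg in messages:
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=msg.encode()
    )

# With channel.confirm_delivery() on a BlockingChannel, the same loop waits
# for one broker ack per message (one round-trip each), whatever the batch size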
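With an asynchronous client the broker confirms publishes as it handles them, and a single ack with multiple=True can confirm every message up to a given delivery tag, which is what makes batched confirms cheap. The bookkeeping can be sketched independently of any network code. The ConfirmTracker class is hypothetical; it would be driven from the client's ack/nack callback (for example the one passed to confirm_delivery() on a SelectConnection channel), using the frame's delivery_tag and multiple fields.

```python
class ConfirmTracker:
    """Hypothetical helper: tracks unconfirmed publishes on one confirm-mode channel."""

    def __init__(self):
        self._next_tag = 1   # the broker numbers publishes 1, 2, 3, ... per channel
        self._pending = {}   # delivery tag -> message, in publish order
        self.nacked = []     # messages the broker rejected (candidates to republish)

    def on_publish(self, message):
        """Call right after basic_publish(); returns the message's delivery tag."""
        tag = self._next_tag
        self._pending[tag] = message
        self._next_tag += 1
        return tag

    def _settle(self, tag, multiple):
        # multiple=True settles every outstanding tag up to and including `tag`
        if multiple:
            tags = [t for t in self._pending if t <= tag]
        else:
            tags = [tag] if tag in self._pending else []
        return [self._pending.pop(t) for t in tags]

    def on_ack(self, tag, multiple=False):
        return self._settle(tag, multiple)

    def on_nack(self, tag, multiple=False):
        settled = self._settle(tag, multiple)
        self.nacked.extend(settled)
        return settled

    @property
    def pending(self):
        return len(self._pending)

tracker = ConfirmTracker()
for i in range(5):
    tracker.on_publish(f"msg-{i}")
print(tracker.on_ack(3, multiple=True))  # one ack frame confirms tags 1 to 3
print(tracker.pending)                   # tags 4 and 5 still unconfirmed
```

A publisher would typically keep publishing while pending stays below some window and pause when it is reached, instead of waiting after every message.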

6. Persistent Messages on Every Message

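Persistence adds disk I/O overhead. Persistent messages (delivery_mode=2) survive a broker restart only if they are also in a durable queue; transient messages (delivery_mode=1) are lost on restart. Use transient messages for non-critical data.

Persistent (slower)

channel.basic_publish(
    ...,
    properties=pika.BasicProperties(
        delivery_mode=2  # Written to disk
    )
)
# ~1000 msg/s (illustrative)

Transient (faster)

channel.basic_publish(
    ...,
    properties=pika.BasicProperties(
        delivery_mode=1  # Kept in memory (may still be paged to disk under memory pressure)
    )
)
# ~10000 msg/s (illustrative)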

7. Network Latency

A high RTT between client and broker slows every round-trip: acks, publisher confirms and connection setup. Run consumers closer to the broker.

# Check latency
ping concurp1.isc.heia-fr.ch

# If >50ms, consider:
# - Running consumers in same datacenter as broker
# - Increasing prefetch_count to hide latency
# - Using connection pooling
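ICMP ping is often blocked and does not go through the AMQP port. As an alternative, the time to open a TCP connection to the broker's port is roughly one RTT (SYN, then SYN-ACK). A small sketch, assuming the default AMQP port 5672; the function name is hypothetical.

```python
import socket
import statistics
import time

def tcp_connect_ms(host, port=5672, samples=5, timeout=2.0):
    """Median TCP connect time in milliseconds, a rough proxy for network RTT."""
    times = []
    for _ in range(samples):
        start = time.perf_counter()
        # create_connection returns once the TCP handshake has completed
        with socket.create_connection((host, port), timeout=timeout):
            pass
        times.append((time.perf_counter() - start) * 1000)
    return statistics.median(times)

# Example (needs network access to the broker):
# print(f"{tcp_connect_ms('concurp1.isc.heia-fr.ch'):.1f} ms")
```

The median is used so that one slow sample (DNS lookup, a retransmitted SYN) does not dominate; compare the result against the 50 ms threshold above.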

Performance Tuning Guide

Optimal Settings by Use Case

Use case              prefetch_count   Persistence   Workers
High throughput       50-100           Transient     Many (10+)
Low latency           1-5              Transient     Few (1-3)
Reliable processing   1-10             Persistent    Medium (3-5)
Heavy processing      5-20             Persistent    Many + threading

Benchmarking

Simple Throughput Test

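import time

N = 10000

# Producer: without confirms this measures how fast the client writes to the
# socket, not how fast the broker accepts messages
start = time.perf_counter()
for i in range(N):
    channel.basic_publish(...)
duration = time.perf_counter() - start
print(f"Publish throughput: {N/duration:.0f} msg/s")

# Consumer
count = 0
start = None

def callback(ch, method, props, body):
    global count, start
    if start is None:
        start = time.perf_counter()  # time from the first delivery, not script start
    count += 1
    ch.basic_ack(delivery_tag=method.delivery_tag)
    if count == N:
        duration = time.perf_counter() - start
        print(f"Consume throughput: {N/duration:.0f} msg/s")
        ch.stop_consuming()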

Monitoring

Key Metrics

  • Queue depth: Messages waiting to be consumed
  • Publish rate: Messages/sec incoming
  • Consume rate: Messages/sec processed
  • Consumer utilization: % time processing vs idle
  • Message age: Time from publish to consume
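
# Check queue depth (ready vs. delivered but unacknowledged)
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged

# Check rates: rabbitmqctl list_queues does not report message rates; use the
# management plugin's HTTP API (host, vhost, queue and credentials are assumptions)
curl -s -u guest:guest http://localhost:15672/api/queues/%2F/task_queue
# Then read message_stats.publish_details.rate and message_stats.deliver_get_details.rate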
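The broker does not report the age of individual messages, so one option is to have the producer stamp each message and the consumer compute the difference. A sketch under assumptions: the JSON body format and the sent_at field are hypothetical, and the result is only meaningful if producer and consumer clocks are synchronized (e.g. via NTP). A float in the body is used because the AMQP timestamp property only has one-second resolution.

```python
import json
import time

def make_body(payload):
    """Producer side: wrap the payload with the publish time (hypothetical format)."""
    return json.dumps({"sent_at": time.time(), "payload": payload}).encode()

def message_age_s(body, now=None):
    """Consumer side: seconds between publish and now (assumes synced clocks)."""
    sent_at = json.loads(body)["sent_at"]
    if now is None:
        now = time.time()
    return now - sent_at

body = make_body({"job": 42})
# ... publish `body`, then in the consumer callback:
print(f"age: {message_age_s(body) * 1000:.1f} ms")
```

Logging these ages (or their percentiles) alongside queue depth shows whether a growing backlog is actually delaying work.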

Quick Wins Summary

  1. Increase prefetch_count (10-50 for most workloads)
  2. Reuse connections (don't create new per message)
  3. Add more consumer instances (horizontal scaling)
  4. Use threading for I/O-bound processing
  5. Use transient messages for non-critical data
  6. Avoid a round-trip per published message (skip confirms where acceptable, or batch confirms with an async client)
  7. Run consumers close to broker (reduce network latency)