AMQPConnectionError: Connection refusedStreamLostError: Stream connection lostConnectionClosed: (320) CONNECTION_FORCEDWiFi drops, VPN disconnects, firewall kills idle connection
# Error pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(54, 'Connection reset by peer')
Broker doesn't receive heartbeat from client. Assumes client dead, closes connection.
# Default heartbeat: 60s # If client doesn't send heartbeat in 120s (2x interval), broker kills connection # Caused by: # - Blocking operation in callback (no heartbeat sent) # - CPU overload (no time to send heartbeat) # - Network congestion
# Error pika.exceptions.AMQPConnectionError: (403, "ACCESS_REFUSED - Login was refused") # Or pika.exceptions.ChannelClosedByBroker: (403, 'ACCESS_REFUSED')
RabbitMQ server restarts for maintenance or crashes. All connections dropped.
import pika
import time
def create_connection():
while True:
try:
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='concurp1.isc.heia-fr.ch',
port=5072,
credentials=pika.PlainCredentials('guest', 'guest'),
heartbeat=600, # 10 min
blocked_connection_timeout=300
)
)
print("[✓] Connected to RabbitMQ")
return connection
except pika.exceptions.AMQPConnectionError as e:
print(f"[✗] Connection failed: {e}")
print("[⟳] Retrying in 5s...")
time.sleep(5)
# Usage
connection = create_connection()
channel = connection.channel()
# In consumer loop
try:
channel.start_consuming()
except (pika.exceptions.StreamLostError, pika.exceptions.AMQPConnectionError):
print("[✗] Connection lost, reconnecting...")
connection = create_connection()
channel = connection.channel()
# Re-setup consumer
channel.basic_consume(...)
channel.start_consuming()For long-running tasks, increase heartbeat to avoid timeout during processing
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='concurp1.isc.heia-fr.ch',
port=5072,
credentials=pika.PlainCredentials('guest', 'guest'),
heartbeat=600 # 10 minutes (default is 60s)
)
)Reuse connections across multiple operations. Don't create new connection per message.
class ConnectionPool:
def __init__(self):
self.connection = None
self.channel = None
def get_channel(self):
if self.connection is None or self.connection.is_closed:
self.connection = create_connection()
self.channel = self.connection.channel()
return self.channel
pool = ConnectionPool()
# Reuse channel
for msg in messages:
channel = pool.get_channel()
channel.basic_publish(...)def is_connection_open(connection):
return connection and connection.is_open
def ensure_connection():
global connection, channel
if not is_connection_open(connection):
print("[⟳] Reconnecting...")
connection = create_connection()
channel = connection.channel()
setup_consumer(channel)
# Check periodically
import threading
def health_check():
while True:
time.sleep(30) # Check every 30s
ensure_connection()
threading.Thread(target=health_check, daemon=True).start()import signal
import sys
def signal_handler(sig, frame):
print("[!] Shutting down gracefully...")
try:
channel.stop_consuming()
connection.close()
except Exception as e:
print(f"[✗] Error during shutdown: {e}")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Now Ctrl+C closes connection cleanly| Parameter | Default | Recommended | Purpose |
|---|---|---|---|
heartbeat | 60s | 600s | Detect dead connections |
blocked_connection_timeout | None | 300s | Timeout when broker blocks |
connection_attempts | 1 | 3-5 | Retry on initial connect |
retry_delay | 2s | 5s | Wait between retries |
socket_timeout | 10s | 10s | TCP socket timeout |
import pika
import time
import signal
import sys
class RobustConsumer:
def __init__(self):
self.connection = None
self.channel = None
self.should_stop = False
def connect(self):
while not self.should_stop:
try:
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='concurp1.isc.heia-fr.ch',
port=5072,
credentials=pika.PlainCredentials('guest', 'guest'),
heartbeat=600,
blocked_connection_timeout=300,
connection_attempts=3,
retry_delay=5
)
)
self.channel = self.connection.channel()
self.channel.queue_declare(queue='task_queue', durable=True)
self.channel.basic_qos(prefetch_count=1)
print("[✓] Connected")
return True
except Exception as e:
print(f"[✗] Connection failed: {e}")
print("[⟳] Retrying in 5s...")
time.sleep(5)
return False
def callback(self, ch, method, properties, body):
try:
print(f"[x] Processing {body.decode()}")
time.sleep(1)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"[✗] Error: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
def start(self):
while not self.should_stop:
if not self.connect():
break
self.channel.basic_consume(
queue='task_queue',
on_message_callback=self.callback
)
try:
print("[*] Consuming...")
self.channel.start_consuming()
except (pika.exceptions.StreamLostError, pika.exceptions.AMQPConnectionError) as e:
print(f"[✗] Connection lost: {e}")
print("[⟳] Reconnecting...")
continue
except KeyboardInterrupt:
print("[!] Interrupted")
self.stop()
def stop(self):
self.should_stop = True
if self.channel:
self.channel.stop_consuming()
if self.connection:
self.connection.close()
print("[✓] Stopped")
# Run
consumer = RobustConsumer()
consumer.start()time.sleep(30) in callback