Work Queue (aka Task Queue) distributes time-consuming tasks among multiple workers. Main idea: avoid doing resource-intensive task immediately and having to wait for it to complete. Instead, schedule task to be done later.
Use Case:
[Producer] --messages--> [Queue] --> [Worker 1]
|
|--> [Worker 2]
|
|--> [Worker N]prefetch_count=1 to prevent overloading one workerimport pika
# Connection setup
conn_param = pika.ConnectionParameters(
host='concurp1.isc.heia-fr.ch',
port=5072,
credentials=pika.PlainCredentials('guest', 'guest')
)
connection = pika.BlockingConnection(conn_param)
channel = connection.channel()
# Declare durable queue (survives broker restart)
channel.queue_declare(queue='task_queue', durable=True)
# Publish persistent message
message = "Task data: process this image"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message.encode(),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
print(f"[x] Sent '{message}'")
connection.close()Important: Set durable=True AND delivery_mode=2 to ensure queue and messages survive broker restart.
import pika
import time
conn_param = pika.ConnectionParameters(
host='concurp1.isc.heia-fr.ch',
port=5072,
credentials=pika.PlainCredentials('guest', 'guest')
)
connection = pika.BlockingConnection(conn_param)
channel = connection.channel()
# Same queue declaration (idempotent)
channel.queue_declare(queue='task_queue', durable=True)
def callback(ch, method, properties, body):
message = body.decode()
print(f"[x] Received '{message}'")
# Simulate work
time.sleep(message.count('.'))
print(f"[x] Done processing '{message}'")
# Manual acknowledgment (CRITICAL)
ch.basic_ack(delivery_tag=method.delivery_tag)
# Fair dispatch: don't give more than 1 message to worker at a time
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue='task_queue',
on_message_callback=callback,
auto_ack=False # manual ack required
)
print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()| Setting | Purpose | Default |
|---|---|---|
durable=True | Queue survives broker restart | False |
delivery_mode=2 | Message persists to disk | 1 (transient) |
auto_ack=False | Manual acknowledgment (prevents loss) | False |
prefetch_count=1 | Fair dispatch (one task at a time) | 0 (unlimited) |
❌ Forgetting manual ack
If worker crashes before ack, message is lost forever with auto_ack=True
❌ Not setting prefetch_count
One fast worker gets all messages, slow workers stay idle. Use prefetch_count=1
❌ Durable queue but transient messages
Queue survives restart, but messages don't. Need both durable=True AND delivery_mode=2