RabbitMQ
Reliable message broker with advanced routing, exchanges, and queues.
5 min read
RabbitMQ Overview#
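RabbitMQ is a robust message broker that supports multiple messaging patterns. Producers publish to an exchange, which uses the routing key to route each message to the queues that consumers read from: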
Producer → Exchange → Queue → Consumer
↓
Routing Key
Setup#
bash
# Docker: start a detached container named "rabbitmq" from the rabbitmq:management image,
# publishing ports 5672 and 15672 on the host
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
# Management UI at http://localhost:15672 (log in with guest/guest)
# Install the amqplib client used by the JavaScript examples below
npm install amqplib
Basic Producer/Consumer#
javascript
// src/lib/rabbitmq.js
import amqp from 'amqplib';
/** Open AMQP connection, or null while disconnected. */
let connection = null;
/** Channel created on `connection`, or null while disconnected. */
let channel = null;
/** Promise of the current connect attempt, shared so concurrent callers open only one connection. */
let pendingConnect = null;

/** Forgets the cached connection, channel and connect attempt. */
function resetState() {
  connection = null;
  channel = null;
  pendingConnect = null;
}

/** Opens a connection plus one channel and caches both on success. */
async function openChannel() {
  const conn = await amqp.connect(process.env.RABBITMQ_URL || 'amqp://localhost');
  // Without a listener, an 'error' event on the connection would be thrown as an uncaught exception.
  conn.on('error', (err) => {
    console.error('RabbitMQ connection error:', err);
  });
  conn.on('close', () => {
    // Ignore close events from a connection that has already been replaced or closed by close().
    if (connection === conn) {
      resetState();
    }
  });
  try {
    const ch = await conn.createChannel();
    connection = conn;
    channel = ch;
    return ch;
  } catch (err) {
    // Best-effort cleanup so a failed createChannel() does not leak the connection;
    // the original error is what the caller needs to see.
    await conn.close().catch(() => {});
    throw err;
  }
}

/**
 * Connects to the broker at RABBITMQ_URL (default `amqp://localhost`) and opens a channel.
 * Calls made while a connection is open or being opened share it instead of opening another.
 * @returns {Promise<object>} The shared channel.
 * @throws Whatever amqplib rejects with when the connection or channel cannot be opened.
 */
export async function connect() {
  if (!pendingConnect) {
    const attempt = openChannel();
    pendingConnect = attempt;
    // Forget a failed attempt so the next call retries instead of reusing the rejection.
    attempt.catch(() => {
      if (pendingConnect === attempt) {
        pendingConnect = null;
      }
    });
  }
  return pendingConnect;
}

/**
 * Returns the shared channel, connecting first when there is none.
 * @returns {Promise<object>} The shared channel.
 */
export async function getChannel() {
  return channel ?? connect();
}

/**
 * Closes the shared channel and connection (if any) and clears the cache,
 * so a later getChannel() reconnects instead of returning a closed channel.
 * @returns {Promise<void>}
 */
export async function close() {
  if (pendingConnect) {
    try {
      // Let an in-flight connect settle so that what it opened gets closed below.
      await pendingConnect;
    } catch {
      // The attempt failed, so there is nothing to close.
    }
  }
  const openChannelRef = channel;
  const openConnection = connection;
  resetState();
  try {
    await openChannelRef?.close();
  } finally {
    // Close the connection even when closing the channel failed.
    await openConnection?.close();
  }
}
Producer#
javascript
// src/producers/orderProducer.js
import { getChannel } from '../lib/rabbitmq.js';
const QUEUE = 'orders';

/**
 * Publishes an order to the `orders` queue as a persistent JSON message.
 * @param {object} order - JSON-serializable order; its `id` is used in the log line.
 * @returns {Promise<void>} Resolves once the message has been handed to the channel
 *   and, if the channel's write buffer was full, after it has drained.
 * @throws {TypeError} When `order` is null or undefined.
 */
export async function sendOrder(order) {
  if (order == null) {
    // Fail before touching the broker instead of publishing a meaningless "null" body.
    throw new TypeError('sendOrder requires an order object');
  }
  const channel = await getChannel();
  // Ensure the queue exists; durable so the queue definition survives a broker restart.
  await channel.assertQueue(QUEUE, { durable: true });
  const accepted = channel.sendToQueue(QUEUE, Buffer.from(JSON.stringify(order)), {
    persistent: true, // message itself survives a broker restart
    contentType: 'application/json',
  });
  if (!accepted) {
    // sendToQueue returns false when the write buffer is full; wait for 'drain'
    // so that a fast producer does not grow the buffer without bound.
    await new Promise((resolve) => {
      channel.once('drain', resolve);
    });
  }
  console.log(`Sent order: ${order.id}`);
}
// Usage
{
  // Example payload: one order with a single line item.
  const lineItem = { productId: 'prod_1', quantity: 2 };
  const sampleOrder = {
    id: 'order_123',
    userId: 'user_456',
    items: [lineItem],
    total: 59.99,
  };
  await sendOrder(sampleOrder);
}
Consumer#
javascript
// src/consumers/orderConsumer.js
import { getChannel } from '../lib/rabbitmq.js';
const QUEUE = 'orders';

/**
 * Starts consuming the durable `orders` queue with a prefetch of one message.
 * Each message is parsed as JSON and passed to processOrder(); it is acked on
 * success and rejected without requeue on any failure.
 * @returns {Promise<object>} Resolves with the reply of channel.consume() once the
 *   consumer is registered (amqplib documents this as `{ consumerTag }`).
 */
export async function startOrderConsumer() {
  const channel = await getChannel();
  await channel.assertQueue(QUEUE, { durable: true });
  // Process one message at a time
  await channel.prefetch(1);
  console.log(`Waiting for orders in ${QUEUE}...`);
  // Returned (not left floating) so a failed registration rejects this function.
  return channel.consume(QUEUE, async (msg) => {
    // amqplib invokes the callback with null when the broker cancels the consumer.
    if (!msg) return;
    try {
      const order = JSON.parse(msg.content.toString());
      console.log(`Processing order: ${order.id}`);
      await processOrder(order);
      // Acknowledge success
      channel.ack(msg);
      console.log(`Order ${order.id} processed`);
    } catch (error) {
      console.error('Error processing order:', error);
      // Reject WITHOUT requeue: the broker drops the message, or dead-letters it
      // when the queue has a dead-letter exchange configured.
      channel.nack(msg, false, false);
    }
  });
}
/**
 * Placeholder for real order handling: waits one second, then resolves.
 * @param {object} order - The order to process (not read by this placeholder).
 * @returns {Promise<void>}
 */
async function processOrder(order) {
  const simulatedWorkMs = 1000;
  // Process the order...
  await new Promise((done) => {
    setTimeout(done, simulatedWorkMs);
  });
}
Exchange Types#
Direct Exchange#
Route by exact routing key match:
javascript
// Producer
const EXCHANGE = 'notifications';

/**
 * Publishes a notification to the durable direct exchange `notifications`.
 * @param {string} type - Routing key, e.g. 'email', 'sms' or 'push'.
 * @param {object} notification - JSON-serializable payload.
 * @returns {Promise<void>}
 */
async function sendNotification(type, notification) {
  const ch = await getChannel();
  await ch.assertExchange(EXCHANGE, 'direct', { durable: true });

  const body = Buffer.from(JSON.stringify(notification));
  const publishOptions = { persistent: true };
  // The routing key selects which bound queues receive the message.
  ch.publish(EXCHANGE, type, body, publishOptions);
}
// Consumer
/**
 * Binds the durable queue `email_notifications` to EXCHANGE with routing key
 * 'email' and consumes it, passing each parsed notification to sendEmail().
 * Messages are acked on success and rejected without requeue on failure.
 * @returns {Promise<object>} Resolves with the reply of channel.consume() once the
 *   consumer is registered (amqplib documents this as `{ consumerTag }`).
 */
async function startEmailConsumer() {
  const channel = await getChannel();
  await channel.assertExchange(EXCHANGE, 'direct', { durable: true });
  const q = await channel.assertQueue('email_notifications', { durable: true });
  // Bind to 'email' routing key
  await channel.bindQueue(q.queue, EXCHANGE, 'email');
  // Returned (not left floating) so a failed registration rejects this function.
  return channel.consume(q.queue, async (msg) => {
    // amqplib invokes the callback with null when the broker cancels the consumer.
    if (!msg) return;
    try {
      const notification = JSON.parse(msg.content.toString());
      await sendEmail(notification);
      channel.ack(msg);
    } catch (error) {
      // Without this catch a bad payload or failed send would surface as an
      // unhandled rejection and leave the message unacknowledged.
      console.error('Error sending email notification:', error);
      // Reject without requeue so a poison message is not redelivered forever;
      // configure a dead-letter exchange on the queue to keep rejected messages.
      channel.nack(msg, false, false);
    }
  });
}
Fanout Exchange#
Broadcast to all bound queues:
javascript
// Producer - broadcast event to all services
const EXCHANGE = 'events';

/**
 * Broadcasts an event through the durable fanout exchange `events`.
 * The message is published as persistent so that copies sitting in durable
 * bound queues survive a broker restart.
 * @param {object} event - JSON-serializable event payload.
 * @returns {Promise<void>}
 */
async function broadcastEvent(event) {
  const channel = await getChannel();
  await channel.assertExchange(EXCHANGE, 'fanout', { durable: true });
  channel.publish(
    EXCHANGE,
    '', // Routing key ignored for fanout
    Buffer.from(JSON.stringify(event)),
    { persistent: true },
  );
}
// Consumer 1 - Audit service
/**
 * Binds the durable queue `audit_events` to the fanout EXCHANGE and consumes it,
 * passing each parsed event to saveToAuditLog(). Messages are acked on success
 * and rejected without requeue on failure.
 * @returns {Promise<object>} Resolves with the reply of channel.consume() once the
 *   consumer is registered (amqplib documents this as `{ consumerTag }`).
 */
async function startAuditConsumer() {
  const channel = await getChannel();
  await channel.assertExchange(EXCHANGE, 'fanout', { durable: true });
  const q = await channel.assertQueue('audit_events', { durable: true });
  await channel.bindQueue(q.queue, EXCHANGE, '');
  // Returned (not left floating) so a failed registration rejects this function.
  return channel.consume(q.queue, async (msg) => {
    // amqplib invokes the callback with null when the broker cancels the consumer.
    if (!msg) return;
    try {
      await saveToAuditLog(JSON.parse(msg.content.toString()));
      channel.ack(msg);
    } catch (error) {
      // Without this catch a failure would surface as an unhandled rejection
      // and leave the message unacknowledged.
      console.error('Error saving audit event:', error);
      // Reject without requeue; configure a dead-letter exchange on the queue
      // if rejected audit events must be kept.
      channel.nack(msg, false, false);
    }
  });
}
// Consumer 2 - Analytics service
/**
 * Binds the durable queue `analytics_events` to the fanout EXCHANGE and consumes
 * it, passing each parsed event to trackAnalytics(). Messages are acked on
 * success and rejected without requeue on failure.
 * @returns {Promise<object>} Resolves with the reply of channel.consume() once the
 *   consumer is registered (amqplib documents this as `{ consumerTag }`).
 */
async function startAnalyticsConsumer() {
  const channel = await getChannel();
  await channel.assertExchange(EXCHANGE, 'fanout', { durable: true });
  const q = await channel.assertQueue('analytics_events', { durable: true });
  await channel.bindQueue(q.queue, EXCHANGE, '');
  // Returned (not left floating) so a failed registration rejects this function.
  return channel.consume(q.queue, async (msg) => {
    // amqplib invokes the callback with null when the broker cancels the consumer.
    if (!msg) return;
    try {
      await trackAnalytics(JSON.parse(msg.content.toString()));
      channel.ack(msg);
    } catch (error) {
      // Without this catch a failure would surface as an unhandled rejection
      // and leave the message unacknowledged.
      console.error('Error tracking analytics event:', error);
      // Reject without requeue so a poison message is not redelivered forever.
      channel.nack(msg, false, false);
    }
  });
}
Topic Exchange#
Route by pattern matching:
javascript
// Producer
const EXCHANGE = 'logs';

/**
 * Publishes a log entry to the durable topic exchange `logs`.
 * The routing key is `<level>.<service>`, e.g. 'error.payment'.
 * @param {string} level - Log level; first word of the routing key.
 * @param {string} service - Originating service; second word of the routing key.
 * @param {string} message - Log text.
 * @returns {Promise<void>}
 */
async function sendLog(level, service, message) {
  const ch = await getChannel();
  await ch.assertExchange(EXCHANGE, 'topic', { durable: true });

  const entry = { level, service, message, timestamp: Date.now() };
  const payload = Buffer.from(JSON.stringify(entry));
  const topic = [level, service].map(String).join('.');
  ch.publish(EXCHANGE, topic, payload);
}
// Consumer - all errors
/**
 * Binds the durable queue `all_errors` to the topic EXCHANGE with the pattern
 * 'error.*' and logs every message it receives before acking it.
 * @returns {Promise<object>} Resolves with the reply of channel.consume() once the
 *   consumer is registered (amqplib documents this as `{ consumerTag }`).
 */
async function startErrorConsumer() {
  const channel = await getChannel();
  await channel.assertExchange(EXCHANGE, 'topic', { durable: true });
  const q = await channel.assertQueue('all_errors', { durable: true });
  await channel.bindQueue(q.queue, EXCHANGE, 'error.*'); // Match error.anything
  // Returned (not left floating) so a failed registration rejects this function.
  return channel.consume(q.queue, (msg) => {
    // amqplib invokes the callback with null when the broker cancels the consumer;
    // reading msg.content would throw in that case.
    if (!msg) return;
    console.log('ERROR:', msg.content.toString());
    channel.ack(msg);
  });
}
// Consumer - all payment logs
/**
 * Binds the durable queue `payment_logs` to the topic EXCHANGE with the pattern
 * '*.payment' and logs every message it receives before acking it.
 * @returns {Promise<object>} Resolves with the reply of channel.consume() once the
 *   consumer is registered (amqplib documents this as `{ consumerTag }`).
 */
async function startPaymentLogsConsumer() {
  const channel = await getChannel();
  await channel.assertExchange(EXCHANGE, 'topic', { durable: true });
  const q = await channel.assertQueue('payment_logs', { durable: true });
  await channel.bindQueue(q.queue, EXCHANGE, '*.payment'); // Match anything.payment
  // Returned (not left floating) so a failed registration rejects this function.
  return channel.consume(q.queue, (msg) => {
    // amqplib invokes the callback with null when the broker cancels the consumer;
    // reading msg.content would throw in that case.
    if (!msg) return;
    console.log('PAYMENT LOG:', msg.content.toString());
    channel.ack(msg);
  });
}
Dead Letter Queue#
Handle failed messages:
javascript
/**
 * Declares the dead-letter exchange `dlx` with its queue `dead_letters`, then the
 * main `orders` queue configured to dead-letter into `dlx` with routing key
 * 'orders' and a 24-hour message TTL.
 * @returns {Promise<void>}
 */
async function setupQueues() {
  const deadLetterExchange = 'dlx';
  const deadLetterQueue = 'dead_letters';
  const mainQueue = 'orders';
  const oneDayMs = 86400000; // 24 hours

  const ch = await getChannel();

  // Dead-letter side: exchange, queue, and the binding between them.
  await ch.assertExchange(deadLetterExchange, 'direct', { durable: true });
  await ch.assertQueue(deadLetterQueue, { durable: true });
  await ch.bindQueue(deadLetterQueue, deadLetterExchange, mainQueue);

  // Main queue pointing at the dead-letter exchange.
  const mainQueueOptions = {
    durable: true,
    deadLetterExchange,
    deadLetterRoutingKey: mainQueue,
    messageTtl: oneDayMs,
  };
  await ch.assertQueue(mainQueue, mainQueueOptions);
}
// Consumer with retry logic
// Consumes `orders`; a failed message is republished with an incremented
// 'x-retry-count' header after an exponential delay (1s, 2s, 4s) and, once it
// has been retried 3 times, rejected without requeue so the queue's dead-letter
// settings take over. `channel` and `processOrder` come from the surrounding scope.
channel.consume('orders', async (msg) => {
  // amqplib invokes the callback with null when the broker cancels the consumer.
  if (!msg) return;
  const maxRetries = 3;
  const headers = msg.properties.headers ?? {};
  const retryCount = headers['x-retry-count'] || 0;
  try {
    await processOrder(JSON.parse(msg.content.toString()));
    channel.ack(msg);
  } catch (error) {
    console.error(`Order processing failed (retry ${retryCount} of ${maxRetries}):`, error);
    if (retryCount >= maxRetries) {
      // Send to DLQ
      channel.nack(msg, false, false);
      return;
    }
    // Retry with delay; the original stays unacknowledged until the copy is published.
    setTimeout(() => {
      try {
        channel.sendToQueue('orders', msg.content, {
          // Keep the copy as durable and as well-described as the original.
          persistent: true,
          contentType: msg.properties.contentType,
          // Preserve existing headers instead of replacing them with the counter alone.
          headers: { ...headers, 'x-retry-count': retryCount + 1 },
        });
        channel.ack(msg);
      } catch (republishError) {
        // A throw inside a timer callback would otherwise be an uncaught exception.
        // The original was not acked, so it is still owned by the broker.
        console.error('Failed to republish order for retry:', republishError);
      }
    }, 2 ** retryCount * 1000);
  }
});
RPC Pattern#
javascript
// RPC Server
/**
 * Serves Fibonacci requests from the non-durable queue `rpc_queue`.
 * Each request body is parsed as a base-10 integer; the result of fibonacci(n)
 * is sent to the request's `replyTo` queue with the same `correlationId`.
 * Requests that cannot be served are logged and rejected without requeue.
 * @returns {Promise<object>} Resolves with the reply of channel.consume() once the
 *   consumer is registered (amqplib documents this as `{ consumerTag }`).
 */
async function startRPCServer() {
  const channel = await getChannel();
  const queue = 'rpc_queue';
  await channel.assertQueue(queue, { durable: false });
  await channel.prefetch(1);
  // Returned (not left floating) so a failed registration rejects this function.
  return channel.consume(queue, (msg) => {
    // amqplib invokes the callback with null when the broker cancels the consumer.
    if (!msg) return;
    const { replyTo, correlationId } = msg.properties;
    try {
      const text = msg.content.toString();
      const n = Number.parseInt(text, 10);
      if (Number.isNaN(n)) {
        throw new TypeError(`RPC argument is not an integer: ${text}`);
      }
      if (!replyTo) {
        throw new Error('RPC request has no replyTo queue');
      }
      const result = fibonacci(n);
      channel.sendToQueue(replyTo, Buffer.from(result.toString()), { correlationId });
      channel.ack(msg);
    } catch (error) {
      console.error('RPC request failed:', error);
      // With prefetch(1), a request left unacknowledged would stop all further
      // deliveries; settle it so the server keeps serving.
      channel.nack(msg, false, false);
    }
  });
}
// RPC Client
/**
 * Sends `n` to `rpc_queue` and resolves with the integer carried by the reply
 * whose correlationId matches this call.
 *
 * A private reply queue and consumer are created per call; the consumer is
 * cancelled once the call settles and the queue is declared auto-delete, so
 * repeated calls do not accumulate queues and consumers on the shared channel.
 * There is no timeout: the promise stays pending if no matching reply arrives.
 *
 * @param {number} n - Argument sent to the RPC server as a decimal string.
 * @returns {Promise<number>} The reply body parsed as a base-10 integer.
 * @throws {Error} When the reply consumer is cancelled before a reply arrives.
 */
async function callFibonacci(n) {
  const channel = await getChannel();
  const correlationId = generateUuid();
  const q = await channel.assertQueue('', { exclusive: true, autoDelete: true });

  let onReply;
  let onFailure;
  const reply = new Promise((resolve, reject) => {
    onReply = resolve;
    onFailure = reject;
  });

  // Register the reply consumer before publishing the request.
  const { consumerTag } = await channel.consume(
    q.queue,
    (msg) => {
      // amqplib invokes the callback with null when the broker cancels the consumer.
      if (!msg) {
        onFailure(new Error('RPC reply consumer was cancelled before a reply arrived'));
        return;
      }
      // Ignore replies that belong to a different request.
      if (msg.properties.correlationId === correlationId) {
        onReply(Number.parseInt(msg.content.toString(), 10));
      }
    },
    { noAck: true },
  );

  try {
    channel.sendToQueue('rpc_queue', Buffer.from(String(n)), {
      correlationId,
      replyTo: q.queue,
    });
    return await reply;
  } finally {
    try {
      // Cancelling the only consumer lets the auto-delete reply queue go away.
      await channel.cancel(consumerTag);
    } catch (error) {
      // Cleanup failure must not mask the RPC result or the original error.
      console.error('Failed to cancel RPC reply consumer:', error);
    }
  }
}
Key Takeaways#
- Use exchanges - Route messages with direct, fanout, or topic
- Acknowledge messages - Confirm processing before removal
- Handle failures - Implement dead letter queues
- Set durability - Use durable queues and persistent messages to survive broker restarts
- Use prefetch - Control consumer throughput
Continue Learning
Ready to level up your skills?
Explore more guides and tutorials to deepen your understanding and become a better developer.