# Apache Kafka
High-throughput distributed event streaming platform for real-time data pipelines.
## Kafka Overview

Kafka is designed for high-throughput, real-time event streaming:

```text
Producers → [ Topic: P0 | P1 | P2 ] → Consumer Groups
```
## Key Concepts
| Concept | Description |
|---|---|
| Topic | Category/feed of messages |
| Partition | Ordered, immutable sequence within topic |
| Offset | Position of message in partition |
| Consumer Group | Group of consumers sharing work |
| Broker | Kafka server in the cluster |
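You can see partitions and offsets concretely with the kafkajs admin client. A minimal sketch, assuming a local broker and an existing `orders` topic (both hypothetical here):

```javascript
// inspect-offsets.js — run as an ES module (top-level await)
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'inspector', brokers: ['localhost:9092'] });
const admin = kafka.admin();

await admin.connect();
// Each partition reports its own offset range (low/high watermarks)
const offsets = await admin.fetchTopicOffsets('orders');
console.log(offsets);
// e.g. [{ partition: 0, offset: '42', high: '42', low: '0' }, ...]
await admin.disconnect();
```

Each partition keeps an independent offset sequence, which is why ordering is only guaranteed within a partition, not across a topic.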
## Setup

Run a single-broker cluster locally with Docker Compose:

```yaml
# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```
Then install the Node.js client:

```bash
npm install kafkajs
```
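To verify the client can reach the broker, a quick connectivity check (assumes the Compose stack above is running on localhost:9092):

```javascript
// smoke-test.js — run as an ES module
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'smoke-test', brokers: ['localhost:9092'] });
const admin = kafka.admin();

await admin.connect();
console.log('Connected. Topics:', await admin.listTopics());
await admin.disconnect();
```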
## Producer

```javascript
// src/lib/kafka.js
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
});

export const producer = kafka.producer();
export const consumer = kafka.consumer({ groupId: 'my-group' });

export async function connectProducer() {
  await producer.connect();
  console.log('Kafka producer connected');
}

export async function disconnectProducer() {
  await producer.disconnect();
}
```
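Long-running services should also disconnect cleanly on shutdown, so the consumer leaves its group immediately instead of waiting out the session timeout. One possible wiring, building on the module above:

```javascript
// src/lib/shutdown.js — one possible shutdown hook, assuming the
// producer/consumer exports from src/lib/kafka.js above
import { producer, consumer } from './kafka.js';

async function shutdown(signal) {
  console.log(`${signal} received, closing Kafka connections...`);
  // disconnect() flushes in-flight requests and leaves the consumer
  // group cleanly, triggering an immediate rebalance
  await Promise.all([producer.disconnect(), consumer.disconnect()]);
  process.exit(0);
}

process.on('SIGINT', () => shutdown('SIGINT'));
process.on('SIGTERM', () => shutdown('SIGTERM'));
```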
```javascript
// src/producers/eventProducer.js
import { producer, connectProducer } from '../lib/kafka.js';

// Top-level await: requires an ES module (connects on import)
await connectProducer();

export async function sendEvent(topic, event) {
  await producer.send({
    topic,
    messages: [
      {
        key: event.id, // Ensures same key goes to same partition
        value: JSON.stringify(event),
        headers: {
          'event-type': event.type,
          'timestamp': Date.now().toString(),
        },
      },
    ],
  });
}

// Batch send for efficiency
export async function sendEvents(topic, events) {
  await producer.send({
    topic,
    messages: events.map(event => ({
      key: event.id,
      value: JSON.stringify(event),
    })),
  });
}

// Usage
await sendEvent('orders', {
  id: 'order_123',
  type: 'order.created',
  userId: 'user_456',
  total: 99.99,
  createdAt: new Date().toISOString(),
});
```
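Durability and throughput are tunable per send. The sketch below shows the main knobs kafkajs exposes on `producer.send`; the values are illustrative, not recommendations:

```javascript
import { CompressionTypes } from 'kafkajs';
import { producer } from '../lib/kafka.js';

await producer.send({
  topic: 'orders',
  acks: -1,        // -1 = all in-sync replicas, 1 = leader only, 0 = fire-and-forget
  timeout: 30000,  // ms to wait for the acks above
  compression: CompressionTypes.GZIP, // trade CPU for smaller network/disk footprint
  messages: [{ key: 'order_123', value: JSON.stringify({ type: 'order.created' }) }],
});
```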
## Consumer

```javascript
// src/consumers/orderConsumer.js
import { consumer } from '../lib/kafka.js';

export async function startOrderConsumer() {
  await consumer.connect();
  await consumer.subscribe({
    topics: ['orders'],    // kafkajs v2 takes an array of topics
    fromBeginning: false,  // Start from latest
  });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value.toString());
      console.log({
        topic,
        partition,
        offset: message.offset,
        key: message.key?.toString(),
        event,
      });
      await processOrder(event);
    },
  });
}

// handleOrderCreated/Paid/Shipped are assumed to be defined elsewhere
async function processOrder(order) {
  switch (order.type) {
    case 'order.created':
      await handleOrderCreated(order);
      break;
    case 'order.paid':
      await handleOrderPaid(order);
      break;
    case 'order.shipped':
      await handleOrderShipped(order);
      break;
  }
}
```
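In production you usually don't want one bad message to crash the consumer or stall its partition. A common pattern is a dead-letter topic; here is a sketch of a dead-letter variant of the handler above, where the `orders.dlq` topic name is an assumed convention:

```javascript
// Drop-in replacement for eachMessage in startOrderConsumer above;
// 'orders.dlq' is a hypothetical topic name
import { producer } from '../lib/kafka.js';

async function eachMessageWithDlq({ topic, partition, message }) {
  try {
    await processOrder(JSON.parse(message.value.toString()));
  } catch (error) {
    // Forward the original bytes plus failure context, then move on,
    // so a single poison message cannot stall the whole partition
    await producer.send({
      topic: 'orders.dlq',
      messages: [{
        key: message.key,
        value: message.value,
        headers: {
          'original-topic': topic,
          'original-partition': String(partition),
          'error': String(error.message),
        },
      }],
    });
  }
}

// await consumer.run({ eachMessage: eachMessageWithDlq });
```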
## Consumer Groups

Multiple consumers in a group share partitions:

```javascript
// Consumer 1
const consumer1 = kafka.consumer({ groupId: 'order-processors' });

// Consumer 2
const consumer2 = kafka.consumer({ groupId: 'order-processors' });

// If the topic has 4 partitions:
// Consumer 1 gets: P0, P1
// Consumer 2 gets: P2, P3
```
```javascript
// Scale horizontally
// src/workers/orderWorker.js
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: `order-worker-${process.env.WORKER_ID}`,
  brokers: ['localhost:9092'],
});

const consumer = kafka.consumer({
  groupId: 'order-processors',
  sessionTimeout: 30000,    // ms without heartbeats before eviction from the group
  heartbeatInterval: 3000,  // ms between heartbeats
});

async function start() {
  await consumer.connect();
  await consumer.subscribe({ topics: ['orders'] });

  await consumer.run({
    partitionsConsumedConcurrently: 3, // Process 3 partitions concurrently
    eachMessage: async ({ message }) => {
      await processOrder(JSON.parse(message.value.toString()));
    },
  });
}

start().catch(console.error);
```
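Workers like this are typically supervised by a process manager, and kafkajs emits instrumentation events you can hook for visibility. A sketch; the logging and exit policy here are illustrative choices:

```javascript
// Optional observability via kafkajs instrumentation events
const { CRASH, GROUP_JOIN } = consumer.events;

consumer.on(GROUP_JOIN, ({ payload }) => {
  console.log('Joined group; assignment:', payload.memberAssignment);
});

consumer.on(CRASH, ({ payload }) => {
  console.error('Consumer crashed:', payload.error);
  // kafkajs restarts retriable crashes itself; exit otherwise so the
  // process manager (Docker, systemd, Kubernetes) can restart the worker
  if (!payload.restart) process.exit(1);
});
```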
## Batch Processing

Process messages in batches for efficiency:

```javascript
await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    for (const message of batch.messages) {
      if (!isRunning() || isStale()) break;
      await processMessage(message);
      resolveOffset(message.offset); // Mark this offset as processed
      await heartbeat();             // Keep the group session alive
    }
  },
});
```
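Note that by default kafkajs auto-resolves the entire batch when `eachBatch` returns without error, even if the loop broke out early. If the explicit `resolveOffset` calls should be the source of truth, for example to resume exactly where processing stopped, disable auto-resolution:

```javascript
await consumer.run({
  // With auto-resolve off, only offsets passed to resolveOffset() count
  // as processed; an unprocessed tail of the batch is redelivered
  eachBatchAutoResolve: false,
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    for (const message of batch.messages) {
      if (!isRunning() || isStale()) break;
      await processMessage(message);
      resolveOffset(message.offset);
      await heartbeat();
    }
  },
});
```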
## Exactly-Once Semantics

```javascript
// Transactional producer
const producer = kafka.producer({
  transactionalId: 'my-transactional-producer',
  maxInFlightRequests: 1,
  idempotent: true,
});

await producer.connect();

// Send with transaction: both writes commit or neither does
const transaction = await producer.transaction();
try {
  await transaction.send({
    topic: 'orders',
    messages: [{ value: JSON.stringify(order) }],
  });
  await transaction.send({
    topic: 'inventory',
    messages: [{ value: JSON.stringify(inventoryUpdate) }],
  });
  await transaction.commit();
} catch (error) {
  await transaction.abort();
  throw error;
}
```
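For a full consume-transform-produce pipeline, the consumer's progress can be committed inside the same transaction via `sendOffsets`, so the read and the write stand or fall together. A sketch, where `orders-enriched`, `enrichedOrder`, and the offset values are illustrative stand-ins:

```javascript
const tx = await producer.transaction();
try {
  await tx.send({
    topic: 'orders-enriched',
    messages: [{ value: JSON.stringify(enrichedOrder) }],
  });
  // The offset to commit is the *next* offset to read (message.offset + 1);
  // consumerGroupId must match the group doing the consuming
  await tx.sendOffsets({
    consumerGroupId: 'order-processors',
    topics: [{ topic: 'orders', partitions: [{ partition: 0, offset: '43' }] }],
  });
  await tx.commit();
} catch (error) {
  await tx.abort();
  throw error;
}
```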
## Topic Administration

```javascript
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const admin = kafka.admin();

async function setupTopics() {
  await admin.connect();

  await admin.createTopics({
    topics: [
      {
        topic: 'orders',
        numPartitions: 6,
        replicationFactor: 1,
        configEntries: [
          { name: 'retention.ms', value: '604800000' }, // 7 days
          { name: 'cleanup.policy', value: 'delete' },
        ],
      },
      {
        topic: 'user-events',
        numPartitions: 3,
        replicationFactor: 1,
      },
    ],
  });

  // List topics
  const topics = await admin.listTopics();
  console.log('Topics:', topics);

  await admin.disconnect();
}
```
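The admin client can also answer "how far behind is my consumer group?" Lag per partition is the high watermark minus the group's committed offset. A sketch, assuming a connected `admin` as above and the kafkajs v2 `fetchOffsets` shape; the group and topic names are examples:

```javascript
// Hypothetical lag report: lag = high watermark - committed offset
async function printLag(admin, groupId, topic) {
  const latest = await admin.fetchTopicOffsets(topic); // [{ partition, offset, high, low }]
  const [group] = await admin.fetchOffsets({ groupId, topics: [topic] });

  for (const { partition, offset } of group.partitions) {
    const high = latest.find(p => p.partition === partition)?.high ?? '0';
    const committed = offset === '-1' ? 0 : Number(offset); // '-1' = no commit yet
    console.log(`partition ${partition}: lag ${Number(high) - committed}`);
  }
}

// await printLag(admin, 'order-processors', 'orders');
```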
## Event-Driven Architecture

```javascript
// src/events/eventBus.js
import { producer, consumer } from '../lib/kafka.js';

const handlers = new Map();

export function on(eventType, handler) {
  if (!handlers.has(eventType)) {
    handlers.set(eventType, []);
  }
  handlers.get(eventType).push(handler);
}

export async function emit(eventType, payload) {
  await producer.send({
    topic: 'events',
    messages: [{
      key: payload.aggregateId,
      value: JSON.stringify({
        type: eventType,
        payload,
        timestamp: Date.now(),
      }),
    }],
  });
}

export async function startEventListener() {
  await consumer.connect(); // connect before subscribing
  await consumer.subscribe({ topics: ['events'] });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const event = JSON.parse(message.value.toString());
      const eventHandlers = handlers.get(event.type) || [];
      for (const handler of eventHandlers) {
        try {
          await handler(event.payload);
        } catch (error) {
          console.error(`Handler error for ${event.type}:`, error);
        }
      }
    },
  });
}

// Usage
on('order.created', async (order) => {
  await sendOrderConfirmationEmail(order);
});

on('order.created', async (order) => {
  await updateInventory(order);
});

await emit('order.created', {
  aggregateId: 'order_123',
  userId: 'user_456',
  items: [...],
});
```
## Kafka Streams Alternative (Node.js)

For stream processing, use a consumer with windowing:

```javascript
// Simple windowed aggregation
const windows = new Map();
const WINDOW_SIZE = 60000; // 1 minute

await consumer.run({
  eachMessage: async ({ message }) => {
    const event = JSON.parse(message.value.toString());
    const windowKey = Math.floor(Date.now() / WINDOW_SIZE);

    if (!windows.has(windowKey)) {
      windows.set(windowKey, { count: 0, total: 0 });
    }
    const window = windows.get(windowKey);
    window.count++;
    window.total += event.amount;

    // Clean old windows (keep the most recent 5 intervals)
    for (const [key] of windows) {
      if (key < windowKey - 5) windows.delete(key);
    }
  },
});
```
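To publish the aggregates somewhere, one option is to flush windows older than the current one on a timer. A sketch continuing the block above (it assumes the shared `producer` from `src/lib/kafka.js`; the `order-totals-1m` topic name is hypothetical):

```javascript
setInterval(async () => {
  const currentKey = Math.floor(Date.now() / WINDOW_SIZE);
  for (const [key, window] of windows) {
    if (key >= currentKey) continue; // window still open
    await producer.send({
      topic: 'order-totals-1m',
      messages: [{
        key: String(key),
        value: JSON.stringify({ windowStart: key * WINDOW_SIZE, ...window }),
      }],
    });
    windows.delete(key); // flushed, no need to keep it
  }
}, WINDOW_SIZE);
```

The trade-off: this state is in-memory, so it vanishes on restart, and late events land in the wrong window. Kafka Streams handles both durably, at the cost of running on the JVM.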
## Key Takeaways

- **Use partitions**: scale consumers horizontally
- **Consumer groups**: share work across instances
- **Key messages**: same key = same partition = ordering
- **Batch for throughput**: process messages in batches
- **Handle offsets**: control exactly-once semantics