Yêu cầu và lựa chọn setup
Cần có: Docker Desktop (hoặc Docker Engine + Compose plugin) phiên bản 24+. Không cần Java, không cần cài gì thêm.
KRaft mode (không cần ZooKeeper) là lựa chọn đúng đắn cho project mới. Setup gọn hơn,
ít service hơn, ít bug hơn. Image apache/kafka hỗ trợ KRaft từ Kafka 3.7+.
| Service | Image | Port | Mục đích |
|---|---|---|---|
| kafka | apache/kafka:3.9.0 | 9092 | Kafka broker (KRaft mode, no ZooKeeper) |
| kafka-ui | provectuslabs/kafka-ui:latest | 8080 | Web dashboard — topics, consumers, messages |
docker-compose.yml
docker-compose.yml
version: '3.8'
services:
kafka:
image: apache/kafka:3.9.0
container_name: kafka
ports:
- "9092:9092"
environment:
# KRaft mode — không cần ZooKeeper
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
# Topic defaults
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
healthcheck:
test: ["CMD", "/opt/kafka/bin/kafka-topics.sh", "--bootstrap-server", "localhost:9092", "--list"]
interval: 10s
timeout: 5s
retries: 5
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
depends_on:
kafka:
condition: service_healthy
Chỉ dùng cho local dev
Config này replication_factor=1, không có authentication, không có persistence volume.
Dữ liệu mất khi container restart. Bài 6 sẽ cover production-ready setup.
Khởi động và verify
1
Khởi động cluster
docker compose up -d
Lần đầu sẽ pull image (~500MB). Kafka mất khoảng 15-20 giây để ready.
2
Kiểm tra status
docker compose ps
# kafka running (healthy)
# kafka-ui running
Kafka UI có thể hiển thị "Connection refused" vài giây đầu — chờ healthcheck pass.
3
Mở Kafka UI
Truy cập
http://localhost:8080 — bạn sẽ thấy cluster "local" với 0 topic.CLI operations: topic, produce, consume
Kafka ship kèm các script CLI trong /opt/kafka/bin/. Exec vào container để chạy:
Tạo topic
bash
# Tạo topic với 3 partitions
docker exec kafka /opt/kafka/bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--topic order-events \
--partitions 3 \
--replication-factor 1
# List tất cả topic
docker exec kafka /opt/kafka/bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
# Chi tiết một topic
docker exec kafka /opt/kafka/bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--describe \
--topic order-events
Produce message từ CLI
bash — console producer
docker exec -it kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic order-events \
--property "parse.key=true" \
--property "key.separator=:"
# Nhập message theo format key:value
> order-1:{"id":1,"status":"placed","total":150000}
> order-2:{"id":2,"status":"placed","total":280000}
> order-1:{"id":1,"status":"paid","total":150000}
# Ctrl+C để thoát
Consume message từ CLI
bash — console consumer
# Đọc từ đầu (--from-beginning)
docker exec kafka /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic order-events \
--from-beginning \
--property "print.key=true" \
--property "key.separator= | "
# Output:
# order-1 | {"id":1,"status":"placed","total":150000}
# order-2 | {"id":2,"status":"placed","total":280000}
# order-1 | {"id":1,"status":"paid","total":150000}
Quan sát ordering
Chạy --describe sẽ thấy order-1 và order-2 nằm ở các partition khác nhau (do khác key hash).
Nhưng cả hai event của order-1 đều ở cùng partition, theo đúng thứ tự.
Python quickstart với confluent-kafka
bash — cài thư viện
pip install confluent-kafka
Producer
producer.py
from confluent_kafka import Producer
import json
conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(conf)
def delivery_report(err, msg):
if err:
print(f'Delivery failed: {err}')
else:
print(f'Delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}')
orders = [
{'order_id': 'ord-001', 'status': 'placed', 'amount': 150000},
{'order_id': 'ord-002', 'status': 'placed', 'amount': 280000},
{'order_id': 'ord-001', 'status': 'paid', 'amount': 150000},
]
for order in orders:
producer.produce(
topic='order-events',
key=order['order_id'], # key → routing đến cùng partition
value=json.dumps(order),
callback=delivery_report
)
producer.flush() # đảm bảo gửi hết trước khi thoát
Consumer
consumer.py
from confluent_kafka import Consumer, KafkaError
import json
conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processor', # consumer group ID
'auto.offset.reset': 'earliest', # đọc từ đầu nếu chưa có offset
'enable.auto.commit': 'true', # tự commit offset (đơn giản nhất)
}
consumer = Consumer(conf)
consumer.subscribe(['order-events'])
try:
while True:
msg = consumer.poll(timeout=1.0) # poll 1 giây
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
raise Exception(msg.error())
order = json.loads(msg.value().decode('utf-8'))
print(f"[P{msg.partition()}|O{msg.offset()}] {order['order_id']}: {order['status']}")
except KeyboardInterrupt:
pass
finally:
consumer.close()
Kafka UI — explore qua browser
Kafka UI tại http://localhost:8080 cho phép bạn:
- Topics — xem list topic, số partition, offset, retention
- Messages — browse message trong partition, search theo key/value/offset
- Consumers — xem consumer groups, lag (số message chưa đọc)
- Brokers — thông tin cluster, disk usage, config
- Schema Registry — nếu bạn thêm Schema Registry vào compose
Consumer Lag
Metric quan trọng nhất khi monitor Kafka production là consumer lag —
số message trong topic mà consumer group chưa xử lý. Lag tăng liên tục = consumer không đủ nhanh.
Kafka UI hiển thị lag real-time cho từng partition.
Lỗi thường gặp và cách fix
| Lỗi | Nguyên nhân | Fix |
|---|---|---|
Connection refused 9092 |
Kafka chưa ready hoặc healthcheck fail | Chờ 20-30 giây, check docker compose logs kafka |
LEADER_NOT_AVAILABLE |
Topic mới tạo, leader election chưa xong | Chờ vài giây rồi retry |
| Kafka UI không kết nối được | Kafka UI dùng service name kafka trong Docker network, nhưng Kafka chưa ready |
Restart kafka-ui: docker compose restart kafka-ui |
| Consumer không nhận message | auto.offset.reset=latest — consumer chỉ đọc message mới sau khi subscribe |
Đổi thành earliest hoặc produce message mới |
| Port 8080 bị chiếm | Process khác đang dùng 8080 (thường là Tomcat, Spring Boot) | Đổi port trong compose: "8090:8080" |
Checklist verify setup hoàn chỉnh
docker compose ps— cả hai service đều running (healthy)- Tạo topic
order-eventsthành công không có lỗi --listhiển thịorder-events- Produce 3 message từ CLI hoặc Python
- Consume hiển thị đúng 3 message theo thứ tự trong từng partition
- Kafka UI tại
localhost:8080thấy topic và consumer group
Series Kafka từ A-Z 6 bài
- Bài 1: Kafka là gì? Core concepts & architecture
- Bài 2: Cài đặt với Docker — Kafka UI trong 15 phút
- Bài 3: Producer deep dive — acks, batching, ordering
- Bài 4: Consumer & Consumer Groups — offset, rebalancing
- Bài 5: Partitions & Replication — durability đảm bảo thế nào
- Bài 6: Kafka trong Production — monitoring, tuning, common issues