Về trang blog
Series: Kafka từ A-Z · Bài 3/6
Kafka Producer Deep Dive
acks, batching, ordering không mất data

Producer flow: từ .produce() đến broker

Khi bạn gọi producer.produce(topic, key, value), Kafka client thực hiện các bước sau:

  1. Serialization — key và value được serialize thành byte[]
  2. Partition assignment — quyết định message vào partition nào (dựa trên key hash hoặc round-robin)
  3. Record accumulator — message được buffer vào batch trong memory, tổ chức theo (topic, partition)
  4. Sender thread — background thread gom các batch đủ điều kiện và gửi đến broker leader
  5. Broker acknowledgment — broker trả lời theo cấu hình acks
  6. Callback / delivery report — producer notify kết quả (success/error)
Async by default
produce() là non-blocking — nó chỉ đẩy message vào buffer. flush() hoặc poll() mới thực sự gửi. Nếu process crash trước khi flush, message trong buffer sẽ mất.

acks — durability guarantee quan trọng nhất

acks kiểm soát khi nào broker coi là "đã nhận" message. Đây là trade-off giữa tốc độ và độ an toàn:

acks=0
Fire & Forget
Producer không chờ bất kỳ ack nào. Gửi xong là quên. Nếu broker crash trước khi ghi — message mất hoàn toàn.
Dùng cho: metrics, logs không quan trọng
acks=1
Leader ACK
Leader broker ghi vào disk xong mới ack. Nếu leader crash trước khi replica kịp sync — message vẫn có thể mất.
Dùng cho: high-throughput, chấp nhận mất ít
acks=all
All ISR ACK
Tất cả In-Sync Replicas đều ghi xong mới ack. Mạnh nhất — message không mất ngay cả khi leader crash.
Dùng cho: financial, order, critical data
acks=all không đủ một mình
acks=all chỉ có ý nghĩa khi kết hợp với min.insync.replicas=2 ở topic/broker config. Nếu chỉ còn 1 ISR và min.insync.replicas=1, thực chất ngang với acks=1.

Batching & linger.ms

Kafka producer không gửi từng message một — nó gom lại thành batch để tối ưu network. Hai config kiểm soát batch:

ConfigDefaultÝ nghĩaTune khi nào
batch.size 16KB Kích thước tối đa của một batch (bytes). Batch đầy → gửi ngay. Tăng lên 64KB-1MB nếu throughput cao và có thể chấp nhận latency nhỏ
linger.ms 0ms Thời gian chờ thêm message vào batch dù chưa đầy. 0 = gửi ngay. Set 5-20ms để tăng batch size khi throughput không đều
buffer.memory 32MB Tổng memory cho record accumulator. Đầy → produce() block. Tăng nếu producer tạo message nhanh hơn tốc độ gửi
python — high-throughput config
conf = {
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',
    'retries': 3,
    'linger.ms': 10,          # chờ 10ms để gom batch
    'batch.size': 65536,       # 64KB batch size
    'compression.type': 'snappy', # compress batch trước khi gửi
    'enable.idempotence': 'true',  # không duplicate khi retry
}

Partitioner — message đi về partition nào?

  • Key không null — hash(key) % numPartitions. Cùng key → cùng partition → đảm bảo ordering
  • Key null, Kafka < 2.4 — round-robin giữa các partition
  • Key null, Kafka ≥ 2.4 — sticky partitioner: gom vào một partition cho đến khi batch đủ lớn, rồi đổi → throughput tốt hơn round-robin
Key quan trọng hơn bạn nghĩ
Nếu cần order, luôn dùng key. Nếu không cần order và muốn phân phối đều, để key là null. Tránh dùng key mà chỉ có vài giá trị (ví dụ: region) — sẽ gây hot partition (một partition nhận 90% traffic trong khi các partition khác rảnh).

Compression

Kafka hỗ trợ compress batch trước khi gửi. Producer compress, broker lưu compressed, consumer decompress. Broker không cần decode để route.

AlgorithmRatioCPUBest for
none1xKhôngLatency cực thấp, không quan tâm bandwidth
gzipTốt nhất (~5x)CaoCold storage, không nhạy cảm latency
snappyTốt (~3x)ThấpBalance — phù hợp cho hầu hết use case
lz4Trung bình (~2x)Rất thấpLatency nhạy cảm, throughput cao
zstdTốt nhất (~5x)Trung bìnhKafka 2.1+, modern default tốt nhất

Idempotent Producer — tránh duplicate khi retry

Khi broker crash sau khi ghi nhưng trước khi gửi ack, producer sẽ retry. Nếu broker thực ra đã ghi thành công, message sẽ bị ghi hai lần (duplicate).

Bật enable.idempotence=true: Kafka gán mỗi message một sequence number duy nhất. Broker phát hiện duplicate và discard, producer retry an toàn.

python
conf = {
    'bootstrap.servers': 'localhost:9092',
    'enable.idempotence': 'true',  # tự động set acks=all, retries=MAX
    # Khi enable.idempotence=true, các config sau được auto set:
    # acks=all, retries=INT_MAX, max.in.flight.requests.per.connection=5
}

Config cheat sheet theo use case

Use Caseackslinger.mscompressionidempotence
Metrics / Logs00lz4false
App events15snappyfalse
Order / Paymentall0-5snappytrue
CDC / Audit logall10zstdtrue
Bulk import150gzipfalse