Về trang blog
Series: Kafka từ A-Z · Bài 4/6
Kafka Consumer & Consumer Groups
Offset, Rebalancing & Delivery Semantics

Consumer Group — phân chia partition

Khi nhiều consumer join cùng group.id, Kafka tự động phân chia các partition của topic cho từng consumer. Quy tắc: mỗi partition chỉ được assign cho đúng một consumer trong group.

Topic: order-events (3 partitions) — Consumer Group: order-processor
3 consumer, 3 partition — phân chia đều 1:1
Consumer-1
Partition 0
Consumer-2
Partition 1
Consumer-3
Partition 2
4 consumer, 3 partition — Consumer-4 idle (không có partition để assign)
Consumer-1
Partition 0
Consumer-2 → P1 · Consumer-3 → P2
Consumer-4
idle — không có partition

Multiple consumer groups: Hai group khác nhau (order-processoranalytics) đọc cùng một topic hoàn toàn độc lập — mỗi group có offset riêng, không ảnh hưởng nhau. Đây là điểm mạnh của Kafka so với traditional message queue.

Offset management

Offset là vị trí consumer đã đọc đến trong mỗi partition. Kafka lưu offset trong internal topic __consumer_offsets. Consumer group commit offset để báo "đã xử lý đến đây".

Auto commit (enable.auto.commit=true)

Consumer tự động commit offset định kỳ (mặc định 5 giây). Đơn giản nhưng có thể gây mất message: commit xảy ra trước khi xử lý xong → crash → message coi như đã xử lý.

Manual commit

Bạn tự gọi consumer.commit() sau khi xử lý thành công. An toàn hơn nhưng phức tạp hơn. Có hai dạng: synchronous (commit()) và asynchronous (commit_async()).

Rebalancing

Rebalancing xảy ra khi:

  • Consumer mới join group
  • Consumer rời group (crash hoặc shutdown graceful)
  • Topic thêm partition mới
  • Consumer không gửi heartbeat kịp (session.timeout.ms hết)

Trong quá trình rebalance, toàn bộ consumer trong group dừng đọc (stop-the-world). Kafka 2.4+ có incremental cooperative rebalancing — chỉ revoke partition thực sự cần di chuyển, giảm downtime đáng kể.

Rebalance storm
Nếu consumer xử lý chậm (max.poll.interval.ms quá ngắn), Kafka coi là dead và trigger rebalance. Sau rebalance, consumer join lại, trigger rebalance tiếp. Vòng lặp này gọi là rebalance storm. Fix: tăng max.poll.interval.ms hoặc giảm lượng xử lý trong mỗi poll.

Delivery Semantics

At-most-once
Message có thể mất, không bao giờ duplicate. Commit offset trước khi xử lý.
auto.commit=true với commit trước process
At-least-once
Message không mất, nhưng có thể duplicate nếu crash sau xử lý và trước commit.
Manual commit sau khi xử lý thành công
Exactly-once
Không mất, không duplicate. Cần idempotent consumer hoặc transactional producer.
Kafka Transactions + idempotent processing
Thực tế
Hầu hết hệ thống dùng at-least-once + idempotent consumer — xử lý có thể chạy nhiều lần nhưng kết quả vẫn đúng (ví dụ: upsert thay vì insert, check exists trước khi tạo). Exactly-once phức tạp hơn nhiều và thường chỉ cần thiết cho financial transactions.

Configs quan trọng

ConfigDefaultÝ nghĩa
group.idConsumer group identifier. Bắt buộc phải set.
auto.offset.resetlatestearliest = đọc từ đầu topic khi chưa có offset. latest = chỉ đọc message mới.
enable.auto.committrueAuto commit mỗi auto.commit.interval.ms (5000ms). Set false để manual commit.
max.poll.records500Số record tối đa mỗi lần poll. Giảm nếu xử lý mỗi record chậm.
max.poll.interval.ms300000 (5 phút)Thời gian tối đa giữa hai lần poll. Vượt quá → consumer bị coi là dead, trigger rebalance.
session.timeout.ms45000msThời gian chờ heartbeat. Hết hạn → consumer bị kick khỏi group.
heartbeat.interval.ms3000msTần suất gửi heartbeat. Nên = session.timeout.ms / 3.

Manual commit pattern thực chiến

python — at-least-once với manual commit
from confluent_kafka import Consumer, KafkaError
import json

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': 'false',  # manual commit
}
consumer = Consumer(conf)
consumer.subscribe(['order-events'])

def process_order(order):
    # business logic — idempotent nếu có thể
    print(f"Processing {order['order_id']}: {order['status']}")

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None or msg.error():
            continue

        try:
            order = json.loads(msg.value())
            process_order(order)
            consumer.commit(msg)  # commit sau khi xử lý thành công
        except Exception as e:
            print(f"Error processing: {e}")
            # không commit → message sẽ được đọc lại sau rebalance/restart
finally:
    consumer.close()

Dead Letter Queue pattern

Nếu message không thể xử lý được (malformed, business rule fail), đừng để consumer loop mãi. Gửi message đó sang một topic khác (order-events-dlq) để xử lý riêng:

python — DLQ pattern
from confluent_kafka import Consumer, Producer, KafkaError
import json

MAX_RETRIES = 3

def send_to_dlq(producer, original_msg, error_reason):
    dlq_value = {
        'original_topic': original_msg.topic(),
        'original_partition': original_msg.partition(),
        'original_offset': original_msg.offset(),
        'error': str(error_reason),
        'payload': original_msg.value().decode('utf-8'),
    }
    producer.produce(
        topic='order-events-dlq',
        key=original_msg.key(),
        value=json.dumps(dlq_value)
    )
    producer.flush()