
🐍 kafkac: A Kafka Consumer framework for Python

kafkac is a minimal, opinionated framework for building reliable Kafka consumers in Python using the confluent-kafka client. It abstracts away the boilerplate of manual offset handling, shutdown coordination, and message deserialization, giving you a clean async interface for consuming messages safely and predictably.
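Manual offset handling is where message loss usually creeps in. Kafka stores the offset of the *next* message to read, so committing past a message that has not finished processing can skip it after a restart. The sketch below shows one common, generic way to compute a safe commit point when a batch completes out of order. It is an illustration under that assumption, not kafkac's internal implementation, and `committable_offset` is a hypothetical helper.

```python
def committable_offset(last_committed: int, processed: set[int]) -> int:
    """Return the next offset that is safe to commit for one partition.

    ``last_committed`` is the currently committed offset, i.e. the next
    offset to be read. ``processed`` holds offsets that have finished
    processing, possibly out of order. Only a contiguous run starting at
    ``last_committed`` may be committed; a gap blocks progress so the
    unfinished message is redelivered rather than lost.
    """
    next_offset = last_committed
    while next_offset in processed:
        next_offset += 1
    return next_offset


# Offsets 10, 11 and 13 are done but 12 is still in flight, so committing
# past 12 would lose it: the safe commit point is 12.
print(committable_offset(10, {10, 11, 13}))  # 12
print(committable_offset(10, {10, 11, 12, 13}))  # 14
```

This assumes offsets within a partition are contiguous. On compacted topics, or topics written by transactional producers, offsets can have gaps, so a production implementation would track the offsets it actually fetched instead of assuming `+1`.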

[!IMPORTANT] kafkac prioritises correctness and speed, in that order, avoiding message loss at all costs.

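[!CAUTION] Always write your consumer to be idempotent. Guaranteeing you will never see a duplicate message is not trivial: a message can be redelivered, for example when a consumer stops after processing a batch but before its offsets are committed.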
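One common way to make processing idempotent is to key each side effect on something stable about the message and skip keys that were already applied. The stdlib-only sketch below uses the `(topic, partition, offset)` triple as that key. `StubMessage` and `IdempotentProcessor` are hypothetical: the stub only mirrors the `topic()`, `partition()`, `offset()` and `value()` accessors of `confluent_kafka.Message`, and the in-memory set stands in for what would need to be a durable store in practice.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StubMessage:
    """Hypothetical stand-in mirroring confluent_kafka.Message accessors."""

    _topic: str
    _partition: int
    _offset: int
    _value: bytes

    def topic(self) -> str:
        return self._topic

    def partition(self) -> int:
        return self._partition

    def offset(self) -> int:
        return self._offset

    def value(self) -> bytes:
        return self._value


class IdempotentProcessor:
    """Applies each message's side effect at most once per delivery key."""

    def __init__(self) -> None:
        # In production this must be durable (e.g. a database table), or the
        # dedup state is lost on restart.
        self._seen: set[tuple[str, int, int]] = set()
        self.applied: list[bytes] = []

    def process(self, messages: list[StubMessage]) -> None:
        for msg in messages:
            key = (msg.topic(), msg.partition(), msg.offset())
            if key in self._seen:
                continue  # redelivered message: side effect already applied
            self.applied.append(msg.value())  # the "side effect"
            self._seen.add(key)


batch = [StubMessage("orders", 0, 41, b"a"), StubMessage("orders", 0, 42, b"b")]
processor = IdempotentProcessor()
processor.process(batch)
processor.process(batch)  # simulate redelivery after a crash before commit
print(processor.applied)  # [b'a', b'b']
```

Keying on the offset only catches redelivery of the same record. If a producer retry writes a duplicate, it gets a new offset, so a business key taken from the payload or headers is the more robust choice.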


⚙️ Core Features

  • ⚡️ Fully asynchronous message consumption
  • 🧬 Version-aware model deserialization (Pydantic)
  • 🛡 Handles common Kafka edge cases and failure scenarios
  • 📦 Batch consumption to reduce RTT and executor overhead
  • 🧾 Header-level message filtering, with out-of-the-box filters
  • 📊 Built-in metrics & OpenTelemetry integration
  • 🧩 Pluggable middleware for pre/post-processing
  • 🪦 Automatic dead-letter queueing for poison-pill messages
  • 🔁 Smart retries with exponential backoff
  • 🧘 Automatic rebalance management
  • ✨ And more...
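kafkac's exact retry policy is not described here, but exponential backoff generally means the wait doubles after each failed attempt up to a cap, often with random jitter so that many consumers do not retry in lockstep. A minimal sketch of that idea; `backoff_delay`, `base` and `cap` are hypothetical names, not kafkac parameters.

```python
import random


def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0,
                  jitter: bool = True) -> float:
    """Seconds to wait before retry number ``attempt`` (0-based).

    The un-jittered delay is base * 2**attempt, clamped to ``cap``.
    With jitter ("full jitter"), a uniform random value in [0, delay] is
    returned instead, which spreads retries out across consumers.
    """
    if attempt < 0:
        raise ValueError("attempt must be >= 0")
    delay = min(cap, base * (2 ** attempt))
    return random.uniform(0.0, delay) if jitter else delay


print([backoff_delay(n, jitter=False) for n in range(8)])
# [0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

The cap keeps a persistently failing message from stalling a partition indefinitely; in a design like the one listed above, a message that keeps failing would eventually be handed to the dead-letter queue instead of being retried forever.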

Benchmarks

The benchmarks below preload a topic with varying numbers of messages, run a kafkac consumer that processes them by writing each message to a second topic, and then confirm that every message is accounted for.
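The final "accounted for" step amounts to comparing what was produced with what arrived on the output topic. A sketch of that comparison, assuming each message carries a unique ID (the IDs and the `account_for` helper are hypothetical): it reports IDs that never arrived, which would indicate loss, separately from IDs that arrived more than once, which at-least-once delivery permits.

```python
from collections import Counter


def account_for(produced: list[str], consumed: list[str]) -> tuple[set[str], set[str]]:
    """Compare produced vs. consumed message IDs.

    Returns (missing, duplicated): IDs produced but never consumed, and IDs
    consumed more than once. Missing IDs mean message loss; duplicates are
    tolerated under at-least-once delivery but still worth reporting.
    """
    counts = Counter(consumed)
    missing = set(produced) - set(counts)
    duplicated = {msg_id for msg_id, n in counts.items() if n > 1}
    return missing, duplicated


missing, duplicated = account_for(["a", "b", "c"], ["a", "c", "c"])
print(sorted(missing), sorted(duplicated))  # ['b'] ['c']
```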

// TODO


🧠 Quick Start

import asyncio

from kafkac import AsyncKafkaConsumer
from kafkac import PartitionResult
from confluent_kafka import Message


async def handler(messages: list[Message]) -> PartitionResult:
    return PartitionResult(succeeded=messages)


async def main():
    # confluent-kafka consumer configuration
    config = {
        "group.id": "foo",
        "bootstrap.servers": "localhost:9092",
    }
    async with AsyncKafkaConsumer(
            handler_func=handler,
            config=config,
            topic_regexes=["^topic$"],
            batch_size=1000,
    ) as consumer:
        await asyncio.sleep(60)
        await consumer.stop()
        # context manager will exit cleanly once the consumer has finalised.
        # last messages will be processed and handled before graceful exit.


if __name__ == "__main__":
    # asyncio.run creates an event loop and runs main() to completion.
    asyncio.run(main())
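The Quick Start handler simply marks every message in the batch as succeeded. A real handler usually does some I/O per message. The sketch below runs without kafkac or a broker: it processes a batch of payloads concurrently with `asyncio.gather` and splits them into succeeded and failed. `process_one` and `split_batch` are hypothetical; only `PartitionResult(succeeded=...)` appears in the example above, so how failed messages are reported back to kafkac is not covered here.

```python
import asyncio


async def process_one(message: bytes) -> None:
    """Hypothetical per-message work; raises on a bad payload."""
    await asyncio.sleep(0)  # stand-in for real I/O (HTTP call, DB write, ...)
    if not message:
        raise ValueError("empty payload")


async def split_batch(messages: list[bytes]) -> tuple[list[bytes], list[bytes]]:
    """Run process_one concurrently; return (succeeded, failed) in input order."""
    # return_exceptions=True returns exceptions as results instead of raising
    # the first one, so every message's outcome is known.
    results = await asyncio.gather(
        *(process_one(m) for m in messages), return_exceptions=True
    )
    succeeded = [m for m, r in zip(messages, results) if not isinstance(r, BaseException)]
    failed = [m for m, r in zip(messages, results) if isinstance(r, BaseException)]
    return succeeded, failed


ok, bad = asyncio.run(split_batch([b"a", b"", b"c"]))
print(ok, bad)  # [b'a', b'c'] [b'']
```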

Contributing

The integration tests use testcontainers to run a real Kafka container, so the project is tested against something that at least resembles the real world. For this to work, make sure the Docker service is running.
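Before running the integration tests it can help to check that the Docker daemon is actually reachable. A small stdlib-only sketch; the `docker_available` helper is hypothetical and not part of the project.

```python
import shutil
import subprocess


def docker_available(timeout: float = 10.0) -> bool:
    """Return True if the docker CLI is installed and its daemon responds."""
    docker = shutil.which("docker")
    if docker is None:
        return False
    try:
        # `docker info` exits non-zero when the daemon cannot be reached.
        result = subprocess.run(
            [docker, "info"], capture_output=True, timeout=timeout, check=False
        )
    except (OSError, subprocess.TimeoutExpired):
        return False
    return result.returncode == 0


print(docker_available())
```

In a test module this could gate the integration suite so it is skipped rather than failing when Docker is down, e.g. `pytestmark = pytest.mark.skipif(not docker_available(), reason="Docker is not running")`.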