kafkac: A Kafka Consumer framework for Python
kafkac is a minimal, opinionated framework for building reliable Kafka consumers in Python on top of the confluent-kafka client. It abstracts away the boilerplate of manual offset handling, shutdown coordination, and message deserialization, giving you a clean async interface for consuming messages safely and predictably.
[!IMPORTANT] kafkac prioritises correctness and speed, in that order, avoiding message loss at all costs.
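[!CAUTION] Always write your consumer to be idempotent. Guaranteeing that you will never see a duplicate message is not trivial: a message can be redelivered, for example when the consumer crashes or a rebalance happens after the message was processed but before its offset was committed.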
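One common way to make a handler idempotent is to record which message IDs have already been applied and skip redeliveries. The sketch below assumes each message carries a unique ID (for example in its key or a header); `IdempotentSink` is a hypothetical helper, not part of kafkac, and it keeps the processed IDs in memory purely for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class IdempotentSink:
    """Hypothetical sink that applies each message's side effect at most once.

    Processed IDs live in memory here; a real consumer would persist them
    together with the side effect (for example in the same database transaction)
    so that a crash cannot separate the two.
    """

    processed_ids: set[str] = field(default_factory=set)
    applied: list[str] = field(default_factory=list)

    def apply(self, message_id: str, payload: str) -> bool:
        """Return True if the payload was applied, False if it was a duplicate."""
        if message_id in self.processed_ids:
            return False  # redelivered message: skip the side effect
        self.applied.append(payload)  # stand-in for the real side effect
        self.processed_ids.add(message_id)
        return True


sink = IdempotentSink()
sink.apply("order-1", "charge $10")  # True: first delivery is applied
sink.apply("order-1", "charge $10")  # False: the redelivery is ignored
```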
Core Features
- Fully asynchronous message consumption
- Version-aware model deserialization (Pydantic)
- Handles common Kafka edge cases and failure scenarios
- Batch consumption to reduce RTT and executor overhead
- Header-level message filtering, with out-of-the-box filters
- Built-in metrics & OpenTelemetry integration
- Pluggable middleware for pre/post-processing
- Automatic dead-letter queueing for poison-pill messages
- Smart retries with exponential backoff
- Automatic rebalance management
- And more...
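The retry and dead-letter features combine into a familiar pattern: retry a failing message with exponentially growing delays, and if it still fails after a fixed number of attempts, treat it as a poison pill and park it instead of blocking the partition. The following is a generic sketch of that pattern, not kafkac's implementation; `backoff_delay`, `process_with_retries`, and the in-memory `dead_letters` list are hypothetical (a real dead-letter queue would be a Kafka topic).

```python
import asyncio
from collections.abc import Awaitable, Callable


def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Exponential backoff: base * 2**attempt seconds, capped at `cap`."""
    return min(cap, base * (2 ** attempt))


async def process_with_retries(
    payload: bytes,
    handle: Callable[[bytes], Awaitable[None]],
    dead_letters: list[bytes],
    max_attempts: int = 3,
    base: float = 0.5,
) -> bool:
    """Retry `handle`, then dead-letter the payload if every attempt fails.

    Returns True if the payload was processed, False if it was dead-lettered.
    """
    for attempt in range(max_attempts):
        try:
            await handle(payload)
            return True
        except Exception:
            # Exception (not BaseException): asyncio.CancelledError still propagates.
            if attempt + 1 < max_attempts:
                await asyncio.sleep(backoff_delay(attempt, base))
    # Every attempt failed: treat the message as a poison pill.
    dead_letters.append(payload)
    return False
```

Catching `Exception` rather than `BaseException` lets task cancellation propagate, which matters for a clean shutdown. Capping the delay keeps a long run of failures from stalling the consumer indefinitely.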
Benchmarks
The benchmarks below preload a topic with varying numbers of messages, run a kafkac consumer that processes each message by writing it to another topic, and then confirm that every message is accounted for.
// TODO
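The final accounting step amounts to comparing the IDs preloaded onto the source topic with the IDs read back from the output topic. A minimal sketch, assuming every message carries a unique ID; `account_for_messages` is a hypothetical helper, not part of kafkac, and reading the IDs from the topics is left out.

```python
from collections import Counter


def account_for_messages(sent: list[str], received: list[str]) -> dict[str, list[str]]:
    """Compare produced IDs with consumed IDs.

    "missing" lists IDs that never arrived (message loss);
    "duplicated" lists IDs that arrived more than once (redelivery).
    """
    received_counts = Counter(received)
    missing = sorted(set(sent) - set(received_counts))
    duplicated = sorted(mid for mid, n in received_counts.items() if n > 1)
    return {"missing": missing, "duplicated": duplicated}
```

A non-empty `missing` list is the failure kafkac is designed to prevent; entries in `duplicated` can legitimately occur, which is why the caution above asks for idempotent consumers.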
Quick Start
```python
import asyncio

from confluent_kafka import Message
from kafkac import AsyncKafkaConsumer, PartitionResult


async def handler(messages: list[Message]) -> PartitionResult:
    # Called with a batch of messages; here every message is reported as succeeded.
    return PartitionResult(succeeded=messages)


async def main() -> None:
    config = {
        "group.id": "foo",
        "bootstrap.servers": "localhost:9092",
    }
    async with AsyncKafkaConsumer(
        handler_func=handler,
        config=config,
        topic_regexes=["^topic$"],
        batch_size=1000,
    ) as consumer:
        await asyncio.sleep(60)  # consume for 60 seconds
        await consumer.stop()
        # The context manager will exit cleanly once the consumer has finalised;
        # the last messages are processed and handled before the graceful exit.


if __name__ == "__main__":
    asyncio.run(main())
```
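The Quick Start stops after a fixed `asyncio.sleep(60)`. A long-running service more typically waits for SIGINT or SIGTERM before calling `consumer.stop()`. Below is a minimal, Unix-only sketch using the standard library's `loop.add_signal_handler`; `install_shutdown_handlers` is a hypothetical helper, not part of kafkac.

```python
import asyncio
import signal


def install_shutdown_handlers(stop: asyncio.Event) -> None:
    """Set `stop` when the process receives SIGINT or SIGTERM.

    Must be called from inside a running event loop on the main thread;
    loop.add_signal_handler is only available on Unix event loops.
    """
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        # The callback runs on the event loop, so setting the Event is safe.
        loop.add_signal_handler(sig, stop.set)
```

Inside `main()`, you would create `stop = asyncio.Event()`, call `install_shutdown_handlers(stop)`, and replace `await asyncio.sleep(60)` with `await stop.wait()` before `await consumer.stop()`.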
Contributing
The integration tests use testcontainers to run a real Kafka broker in a container, so the library is tested against something that at least resembles a production deployment. For this to work, make sure the Docker service is running.