kafka_dart 1.0.1-alpha
kafka_dart: ^1.0.1-alpha copied to clipboard
High-performance Kafka client for Dart using librdkafka with Domain-Driven Design architecture.
Kafka Dart #
Important
EXPERIMENTAL - This package is experimental and might introduce breaking changes. Use with caution in production environments.
A high-performance Kafka client for Dart using librdkafka with Domain-Driven Design architecture. Supports both mock implementations for testing and real Kafka connectivity via FFI bindings.
๐ Features #
- ๐ Type-safe: Strong typing with value objects and domain entities
- ๐๏ธ Clean Architecture: Domain-Driven Design with layered architecture
- โ ๏ธ Error Handling: Comprehensive exception hierarchy with meaningful error messages
- ๐งช Testable: Clean separation of concerns enables easy unit testing with mocks
- โก High Performance: Built on librdkafka FFI bindings for production use
- ๐ฏ Simple API: Easy-to-use factory methods and service classes
- ๐ณ Docker Ready: Includes Docker Compose setup for local development
- ๐ Full Coverage: 98 % test coverage with comprehensive test suite
๐ฆ Installation #
Add this to your package's pubspec.yaml file:
dependencies:
kafka_dart: ^1.0.0
Then run:
dart pub get
Prerequisites #
You need to install librdkafka on your system:
Ubuntu/Debian:
sudo apt-get install librdkafka-dev
Fedora/RHEL:
sudo dnf install librdkafka-devel
macOS:
brew install librdkafka
And then generate the ffi bindings with:
dart run ffigen
๐ Quick Start #
Producer Example #
import 'package:kafka_dart/kafka_dart.dart';
Future<void> main() async {
// For real Kafka connectivity
final producer = await KafkaFactory.createAndInitializeProducer(
bootstrapServers: 'localhost:9092',
);
try {
await producer.sendMessage(
topic: 'my-topic',
payload: 'Hello, Kafka!',
key: 'message-key',
);
await producer.flush();
print('โ
Message sent to Kafka!');
} finally {
await producer.close();
}
}
Consumer Example #
import 'package:kafka_dart/kafka_dart.dart';
Future<void> main() async {
// For real Kafka connectivity
final consumer = await KafkaFactory.createAndInitializeConsumer(
bootstrapServers: 'localhost:9092',
groupId: 'my-consumer-group',
);
try {
await consumer.subscribe(['my-topic']);
for (int i = 0; i < 10; i++) {
final message = await consumer.pollMessage();
if (message != null) {
print('๐จ Received: ${message.payload.value}');
print('๐ Key: ${message.key.hasValue ? message.key.value : 'null'}');
await consumer.commitAsync();
}
await Future.delayed(Duration(seconds: 1));
}
} finally {
await consumer.close();
}
}
Testing with Mocks #
import 'package:kafka_dart/kafka_dart.dart';
Future<void> main() async {
// For testing - uses mock implementation
final producer = await KafkaFactory.createAndInitializeProducer(
bootstrapServers: 'localhost:9092',
useMock: true, // Safe for testing
);
try {
await producer.sendMessage(
topic: 'test-topic',
payload: 'Test message',
key: 'test-key',
);
print('โ
Mock message sent!');
} finally {
await producer.close();
}
}
๐๏ธ Architecture #
This library follows Domain-Driven Design principles with a clean, layered architecture:
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Presentation Layer โ
โ (Public API & Factories) โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ Application Layer โ
โ (Services & Use Cases) โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ Domain Layer โ
โ (Entities, Value Objects, etc.) โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ Infrastructure Layer โ
โ (FFI Bindings & Repositories) โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Layers #
- ๐จ Presentation Layer: Public API and factory classes
- โ๏ธ Application Layer: Use cases and application services
- ๐ง Domain Layer: Core business logic with entities, value objects, and repository interfaces
- ๐ง Infrastructure Layer: FFI bindings to librdkafka and repository implementations
๐ ๏ธ Development & Testing #
This project includes a comprehensive Makefile for common development tasks:
Testing Commands #
make test # Run all tests (uses mocks)
make test-coverage # Run tests with coverage (excludes infrastructure)
make coverage-html # Generate HTML coverage report and open in browser
make lint # Run Dart analyzer
Development Commands #
make docs # Generate API documentation
make generate # Regenerate FFI bindings from librdkafka
make clean # Clean coverage files
Kafka Development Environment #
make kafka-setup # Start Kafka + create test topics + open UI
make kafka-up # Start Kafka cluster only
make kafka-down # Stop Kafka cluster
The Kafka setup includes:
- Kafka broker on
localhost:9092 - Kafka UI on
http://localhost:8080for monitoring - Pre-created topics for testing
๐ Testing #
Automated Testing #
# Run all unit tests (213 tests, 98% coverage)
make test
# Generate coverage report
make test-coverage
make coverage-html
All tests use mock implementations for safety and speed. The test suite covers:
- Domain entities and value objects
- Application services
- Repository interfaces
- Integration scenarios
Manual Testing with Real Kafka #
1. Start Kafka environment:
make kafka-setup
2. Test with real FFI bindings:
dart run example/real_kafka_test.dart
3. Monitor with Kafka UI: Visit http://localhost:8080
4. Use Kafka CLI tools:
# Send messages
docker exec -it kafka kafka-console-producer --topic test-topic --bootstrap-server localhost:9092
# Read messages
docker exec -it kafka kafka-console-consumer --topic test-topic --from-beginning --bootstrap-server localhost:9092
๐ Coverage #
Current test coverage: 98%
Coverage excludes:
- FFI bindings (auto-generated from librdkafka)
- Infrastructure adapters (tested through integration)
Generate coverage reports:
make test-coverage # Generate lcov.info
make coverage-html # Generate HTML report and open
๐ Implementation Modes #
Mock Mode (Default for Testing) #
final producer = await KafkaFactory.createAndInitializeProducer(
bootstrapServers: 'localhost:9092',
useMock: true, // Safe, no external dependencies
);
Real Kafka Mode (Production) #
final producer = await KafkaFactory.createAndInitializeProducer(
bootstrapServers: 'localhost:9092',
// Uses librdkafka FFI bindings by default
);
The factory automatically chooses the appropriate repository implementation based on the useMock parameter.
๐ ๏ธ Project Structure #
lib/
โโโ src/
โ โโโ application/ # Application services
โ โ โโโ services/ # Kafka producer/consumer services
โ โโโ domain/ # Domain layer
โ โ โโโ entities/ # Domain entities
โ โ โโโ value_objects/ # Value objects
โ โ โโโ repositories/ # Repository interfaces
โ โ โโโ exceptions/ # Domain exceptions
โ โโโ infrastructure/ # Infrastructure layer
โ โ โโโ bindings/ # FFI bindings to librdkafka
โ โ โโโ repositories/ # Repository implementations
โ โ โโโ factories/ # Service factories
โ โโโ presentation/ # Public API (future)
โโโ kafka_dart.dart # Main export file
example/
โโโ real_kafka_test.dart # Real Kafka testing
โโโ manual_kafka_test.dart # Manual testing with mocks
โโโ ...
test/ # Comprehensive test suite
โโโ integration_test.dart # Integration tests (mocks)
โโโ domain/ # Domain layer tests
โโโ application/ # Application layer tests
โโโ ...
docker-compose.yml # Kafka development environment
Makefile # Development commands
scripts/setup-kafka.sh # Kafka setup automation
๐ง FFI Implementation #
The library includes full FFI bindings to librdkafka:
Producer Features:
- โ Message sending with keys and headers
- โ Partition selection (specific or automatic)
- โ Synchronous flushing
- โ Proper resource cleanup
Consumer Features:
- โ Topic subscription
- โ Message polling with timeout
- โ Offset management (sync/async commits)
- โ Consumer group coordination
Configuration:
- Uses librdkafka-compatible properties
- Automatic platform detection (Linux/macOS/Windows)
- Error handling with meaningful exceptions
๐ Examples #
See the /example folder for complete working examples:
real_kafka_test.dart- Real FFI implementation testmanual_kafka_test.dart- Mock implementation testproducer_example.dart- Producer patternsconsumer_example.dart- Consumer patterns
๐ค Contributing #
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature) - Run tests (
make test) - Run linting (
make lint) - Test with real Kafka (
make kafka-setup && dart run example/real_kafka_test.dart) - Commit your changes (
git commit -m 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
Development Guidelines #
- All new features must include tests
- Use mock implementations in unit tests
- Test real implementations manually with Docker Kafka
- Follow Domain-Driven Design principles
- Maintain 95%+ test coverage
๐ License #
This project is licensed under the Apache 2.0 License - see the LICENSE file for details.
Copyright 2025 ยฉ Stefano Amorelli
๐ Acknowledgments #
- Built on top of librdkafka - the high-performance C/C++ Kafka client;
- Inspired by Domain-Driven Design principles;
- Thanks to the Dart FFI contributors for excellent tooling;
- Thanks to the Kafka community for robust ecosystem.
๐ Links #
- Package: https://pub.dev/packages/kafka_dart
- Repository: https://github.com/stefanoamorelli/kafka_dart
- Issues: https://github.com/stefanoamorelli/kafka_dart/issues
- librdkafka: https://github.com/confluentinc/librdkafka
- Kafka Documentation: https://kafka.apache.org/documentation/