kafka_dart 1.0.1-alpha

High-performance Kafka client for Dart using librdkafka with Domain-Driven Design architecture.

Kafka Dart #

Important

EXPERIMENTAL - This package is experimental and might introduce breaking changes. Use with caution in production environments.

A high-performance Kafka client for Dart using librdkafka with Domain-Driven Design architecture. Supports both mock implementations for testing and real Kafka connectivity via FFI bindings.

🚀 Features #

  • 🔒 Type-safe: Strong typing with value objects and domain entities
  • 🏗️ Clean Architecture: Domain-Driven Design with layered architecture
  • ⚠️ Error Handling: Comprehensive exception hierarchy with meaningful error messages
  • 🧪 Testable: Clean separation of concerns enables easy unit testing with mocks
  • ⚡ High Performance: Built on librdkafka FFI bindings for production use
  • 🎯 Simple API: Easy-to-use factory methods and service classes
  • 🐳 Docker Ready: Includes Docker Compose setup for local development
  • 📊 High Coverage: 98% test coverage (excluding generated FFI bindings and infrastructure adapters) with a comprehensive test suite
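
To make the "value objects" and "exception hierarchy" items above more concrete, here is a minimal sketch of the general pattern. The names `KafkaDomainException`, `InvalidTopicException`, and `TopicName` are hypothetical and are not taken from the kafka_dart API; the validation rules follow Kafka's own topic naming constraints (1 to 249 characters from `[a-zA-Z0-9._-]`, and not `.` or `..`).

```dart
/// Hypothetical base class for domain errors (not the package's actual type).
abstract class KafkaDomainException implements Exception {
  const KafkaDomainException(this.message);
  final String message;

  @override
  String toString() => '$runtimeType: $message';
}

/// Raised when a topic name violates Kafka's naming rules.
class InvalidTopicException extends KafkaDomainException {
  const InvalidTopicException(super.message);
}

/// Value object: once constructed, the wrapped name is known to be valid.
class TopicName {
  TopicName._(this.value);

  /// Validates [raw] and throws [InvalidTopicException] if it is not legal.
  factory TopicName(String raw) {
    if (raw.isEmpty || raw.length > 249) {
      throw InvalidTopicException('length must be 1..249, got ${raw.length}');
    }
    if (raw == '.' || raw == '..' || !_legal.hasMatch(raw)) {
      throw InvalidTopicException('illegal topic name: "$raw"');
    }
    return TopicName._(raw);
  }

  static final RegExp _legal = RegExp(r'^[a-zA-Z0-9._-]+$');

  final String value;

  // Value objects compare by content, not by identity.
  @override
  bool operator ==(Object other) => other is TopicName && other.value == value;

  @override
  int get hashCode => value.hashCode;
}
```

Because validation happens in the constructor, code that receives a `TopicName` never needs to re-check it, and callers can catch the narrow `InvalidTopicException` or the broader base type.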

📦 Installation #

Add this to your package's pubspec.yaml file:

dependencies:
  kafka_dart: ^1.0.1-alpha

Then run:

dart pub get

Prerequisites #

You need to install librdkafka on your system:

Ubuntu/Debian:

sudo apt-get install librdkafka-dev

Fedora/RHEL:

sudo dnf install librdkafka-devel

macOS:

brew install librdkafka

Then generate the FFI bindings with ffigen:

dart run ffigen

🚀 Quick Start #

Producer Example #

import 'package:kafka_dart/kafka_dart.dart';

Future<void> main() async {
  // For real Kafka connectivity
  final producer = await KafkaFactory.createAndInitializeProducer(
    bootstrapServers: 'localhost:9092',
  );

  try {
    await producer.sendMessage(
      topic: 'my-topic',
      payload: 'Hello, Kafka!',
      key: 'message-key',
    );
    await producer.flush();
    print('✅ Message sent to Kafka!');
  } finally {
    await producer.close();
  }
}

Consumer Example #

import 'package:kafka_dart/kafka_dart.dart';

Future<void> main() async {
  // For real Kafka connectivity  
  final consumer = await KafkaFactory.createAndInitializeConsumer(
    bootstrapServers: 'localhost:9092',
    groupId: 'my-consumer-group',
  );

  try {
    await consumer.subscribe(['my-topic']);
    
    for (int i = 0; i < 10; i++) {
      final message = await consumer.pollMessage();
      if (message != null) {
        print('📨 Received: ${message.payload.value}');
        print('🔑 Key: ${message.key.hasValue ? message.key.value : 'null'}');
        await consumer.commitAsync();
      }
      await Future.delayed(Duration(seconds: 1));
    }
  } finally {
    await consumer.close();
  }
}
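
The fixed ten-iteration loop above can be generalized into a small helper. The following is only a sketch: `pollUpTo` is a hypothetical function, not part of kafka_dart, and it takes the poll operation as a closure so it does not depend on the package's types. Unlike the loop above, it waits only after an empty poll and leaves committing to the caller.

```dart
/// Polls until [maxMessages] have been collected or [maxAttempts] polls
/// have been made, whichever comes first.
///
/// [poll] is expected to return null when no message is available.
Future<List<T>> pollUpTo<T extends Object>(
  Future<T?> Function() poll, {
  required int maxMessages,
  required int maxAttempts,
  Duration idleDelay = const Duration(seconds: 1),
}) async {
  final received = <T>[];
  for (var attempt = 0;
      attempt < maxAttempts && received.length < maxMessages;
      attempt++) {
    final message = await poll();
    if (message != null) {
      received.add(message);
    } else {
      // Back off only when the poll came back empty.
      await Future<void>.delayed(idleDelay);
    }
  }
  return received;
}
```

With the consumer from the example above, a call might look like `pollUpTo(() => consumer.pollMessage(), maxMessages: 10, maxAttempts: 30)`, followed by `consumer.commitAsync()` once the batch has been processed.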

Testing with Mocks #

import 'package:kafka_dart/kafka_dart.dart';

Future<void> main() async {
  // For testing - uses mock implementation
  final producer = await KafkaFactory.createAndInitializeProducer(
    bootstrapServers: 'localhost:9092',
    useMock: true, // Safe for testing
  );

  try {
    await producer.sendMessage(
      topic: 'test-topic',
      payload: 'Test message',
      key: 'test-key',
    );
    print('✅ Mock message sent!');
  } finally {
    await producer.close();
  }
}

๐Ÿ—๏ธ Architecture #

This library follows Domain-Driven Design principles with a clean, layered architecture:

┌─────────────────────────────────────┐
│         Presentation Layer          │
│      (Public API & Factories)       │
├─────────────────────────────────────┤
│         Application Layer           │
│     (Services & Use Cases)          │
├─────────────────────────────────────┤
│           Domain Layer              │
│   (Entities, Value Objects, etc.)   │
├─────────────────────────────────────┤
│        Infrastructure Layer         │
│   (FFI Bindings & Repositories)     │
└─────────────────────────────────────┘

Layers #

  • 🎨 Presentation Layer: Public API and factory classes
  • ⚙️ Application Layer: Use cases and application services
  • 🧠 Domain Layer: Core business logic with entities, value objects, and repository interfaces
  • 🔧 Infrastructure Layer: FFI bindings to librdkafka and repository implementations
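
As a rough illustration of how these layers can fit together, the sketch below defines a domain entity and repository interface, an in-memory infrastructure implementation, and an application service that depends only on the interface. All class names here (`OutgoingMessage`, `ProducerRepository`, `InMemoryProducerRepository`, `ProducerService`) are hypothetical and chosen for the example; they are not the package's actual types.

```dart
// Domain layer: an entity and a repository interface.
class OutgoingMessage {
  const OutgoingMessage({required this.topic, required this.payload, this.key});
  final String topic;
  final String payload;
  final String? key;
}

abstract class ProducerRepository {
  Future<void> send(OutgoingMessage message);
  Future<void> close();
}

// Infrastructure layer: an in-memory stand-in for an FFI-backed repository.
class InMemoryProducerRepository implements ProducerRepository {
  final List<OutgoingMessage> sent = [];
  bool closed = false;

  @override
  Future<void> send(OutgoingMessage message) async {
    if (closed) throw StateError('producer is closed');
    sent.add(message);
  }

  @override
  Future<void> close() async {
    closed = true;
  }
}

// Application layer: a use case that knows only the domain interface.
class ProducerService {
  ProducerService(this._repository);
  final ProducerRepository _repository;

  Future<void> sendMessage({
    required String topic,
    required String payload,
    String? key,
  }) {
    if (topic.isEmpty) {
      throw ArgumentError.value(topic, 'topic', 'must not be empty');
    }
    return _repository.send(
      OutgoingMessage(topic: topic, payload: payload, key: key),
    );
  }

  Future<void> close() => _repository.close();
}
```

Since `ProducerService` never references a concrete repository, a unit test can pass in the in-memory implementation and inspect `sent`, while production code would supply an implementation backed by librdkafka.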

๐Ÿ› ๏ธ Development & Testing #

This project includes a comprehensive Makefile for common development tasks:

Testing Commands #

make test              # Run all tests (uses mocks)
make test-coverage     # Run tests with coverage (excludes infrastructure)
make coverage-html     # Generate HTML coverage report and open in browser
make lint              # Run Dart analyzer

Development Commands #

make docs              # Generate API documentation
make generate          # Regenerate FFI bindings from librdkafka
make clean             # Clean coverage files

Kafka Development Environment #

make kafka-setup       # Start Kafka + create test topics + open UI
make kafka-up          # Start Kafka cluster only
make kafka-down        # Stop Kafka cluster

The Kafka setup includes:

  • Kafka broker on localhost:9092
  • Kafka UI on http://localhost:8080 for monitoring
  • Pre-created topics for testing

📊 Testing #

Automated Testing #

# Run all unit tests (213 tests, 98% coverage)
make test

# Generate coverage report  
make test-coverage
make coverage-html

All tests use mock implementations for safety and speed. The test suite covers:

  • Domain entities and value objects
  • Application services
  • Repository interfaces
  • Integration scenarios

Manual Testing with Real Kafka #

1. Start Kafka environment:

make kafka-setup

2. Test with real FFI bindings:

dart run example/real_kafka_test.dart

3. Monitor with Kafka UI: Visit http://localhost:8080

4. Use Kafka CLI tools:

# Send messages
docker exec -it kafka kafka-console-producer --topic test-topic --bootstrap-server localhost:9092

# Read messages  
docker exec -it kafka kafka-console-consumer --topic test-topic --from-beginning --bootstrap-server localhost:9092

📈 Coverage #

Current test coverage: 98%

Coverage excludes:

  • FFI bindings (auto-generated from librdkafka)
  • Infrastructure adapters (tested through integration)

Generate coverage reports:

make test-coverage    # Generate lcov.info
make coverage-html    # Generate HTML report and open

🔄 Implementation Modes #

Mock Mode (Default for Testing) #

final producer = await KafkaFactory.createAndInitializeProducer(
  bootstrapServers: 'localhost:9092',
  useMock: true, // Safe, no external dependencies
);

Real Kafka Mode (Production) #

final producer = await KafkaFactory.createAndInitializeProducer(
  bootstrapServers: 'localhost:9092',
  // Uses librdkafka FFI bindings by default
);

The factory selects the repository implementation from the useMock parameter: the mock implementation when useMock is true, and the librdkafka FFI implementation otherwise.
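
The selection itself can be pictured as a simple branch on the flag. The sketch below is an assumption about the general shape, not the package's actual factory code: `MessageSink`, `MockSink`, `FfiSink`, and `createSink` are hypothetical names, and `FfiSink` is only a placeholder that does not load librdkafka.

```dart
/// Hypothetical repository interface; the real package's types may differ.
abstract class MessageSink {
  String get description;
}

/// Stand-in for a mock repository that needs no broker.
class MockSink implements MessageSink {
  @override
  String get description => 'mock (in-memory, no broker needed)';
}

/// Placeholder for an FFI-backed repository; it does not load librdkafka.
class FfiSink implements MessageSink {
  FfiSink(this.bootstrapServers);
  final String bootstrapServers;

  @override
  String get description => 'librdkafka via FFI -> $bootstrapServers';
}

/// Chooses the implementation from a flag, defaulting to the real one.
MessageSink createSink({
  required String bootstrapServers,
  bool useMock = false,
}) {
  return useMock ? MockSink() : FfiSink(bootstrapServers);
}
```

Keeping the default at `false` mirrors the examples above, where omitting `useMock` yields the real implementation and tests opt in to the mock explicitly.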

๐Ÿ› ๏ธ Project Structure #

lib/
├── src/
│   ├── application/        # Application services
│   │   └── services/       # Kafka producer/consumer services
│   ├── domain/             # Domain layer
│   │   ├── entities/       # Domain entities
│   │   ├── value_objects/  # Value objects
│   │   ├── repositories/   # Repository interfaces
│   │   └── exceptions/     # Domain exceptions
│   ├── infrastructure/     # Infrastructure layer
│   │   ├── bindings/       # FFI bindings to librdkafka
│   │   ├── repositories/   # Repository implementations
│   │   └── factories/      # Service factories
│   └── presentation/       # Public API (future)
└── kafka_dart.dart         # Main export file

example/
├── real_kafka_test.dart    # Real Kafka testing
├── manual_kafka_test.dart  # Manual testing with mocks
└── ...

test/                       # Comprehensive test suite
├── integration_test.dart   # Integration tests (mocks)
├── domain/                 # Domain layer tests
├── application/            # Application layer tests
└── ...

docker-compose.yml          # Kafka development environment
Makefile                   # Development commands
scripts/setup-kafka.sh     # Kafka setup automation

🔧 FFI Implementation #

The library includes full FFI bindings to librdkafka:

Producer Features:

  • ✅ Message sending with keys and headers
  • ✅ Partition selection (specific or automatic)
  • ✅ Synchronous flushing
  • ✅ Proper resource cleanup

Consumer Features:

  • ✅ Topic subscription
  • ✅ Message polling with timeout
  • ✅ Offset management (sync/async commits)
  • ✅ Consumer group coordination

Configuration:

  • Uses librdkafka-compatible properties
  • Automatic platform detection (Linux/macOS/Windows)
  • Error handling with meaningful exceptions
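
For readers curious how platform detection for a native library is typically done in Dart, here is a minimal sketch using `dart:ffi` and `dart:io`. It is not the package's actual loader: the function names are hypothetical, and the library file names are assumptions based on common librdkafka packaging that may differ on your system.

```dart
import 'dart:ffi';
import 'dart:io' show Platform;

/// Returns the shared-library file names to try for [operatingSystem].
/// The names are assumptions, not values taken from kafka_dart.
List<String> librdkafkaCandidates(String operatingSystem) {
  switch (operatingSystem) {
    case 'linux':
      return const ['librdkafka.so.1', 'librdkafka.so'];
    case 'macos':
      return const ['librdkafka.dylib'];
    case 'windows':
      return const ['librdkafka.dll'];
    default:
      throw UnsupportedError('No librdkafka mapping for $operatingSystem');
  }
}

/// Tries each candidate in order and returns the first library that loads.
DynamicLibrary openLibrdkafka() {
  final candidates = librdkafkaCandidates(Platform.operatingSystem);
  for (final name in candidates) {
    try {
      return DynamicLibrary.open(name);
    } on ArgumentError {
      // DynamicLibrary.open throws ArgumentError when loading fails,
      // so fall through and try the next candidate.
    }
  }
  throw StateError('librdkafka not found; tried: ${candidates.join(", ")}');
}
```

Bare file names rely on the operating system's library search path. If the library is installed in a location outside that path, passing an absolute path to `DynamicLibrary.open` is the usual workaround.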

📚 Examples #

See the /example folder for complete working examples:

  • real_kafka_test.dart - Real FFI implementation test
  • manual_kafka_test.dart - Mock implementation test
  • producer_example.dart - Producer patterns
  • consumer_example.dart - Consumer patterns

๐Ÿค Contributing #

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Run tests (make test)
  4. Run linting (make lint)
  5. Test with real Kafka (make kafka-setup && dart run example/real_kafka_test.dart)
  6. Commit your changes (git commit -m 'Add amazing feature')
  7. Push to the branch (git push origin feature/amazing-feature)
  8. Open a Pull Request

Development Guidelines #

  • All new features must include tests
  • Use mock implementations in unit tests
  • Test real implementations manually with Docker Kafka
  • Follow Domain-Driven Design principles
  • Maintain 95%+ test coverage

📄 License #

This project is licensed under the Apache License 2.0; see the LICENSE file for details.

Copyright © 2025 Stefano Amorelli

๐Ÿ™ Acknowledgments #

  • Built on top of librdkafka, the high-performance C/C++ Kafka client
  • Inspired by Domain-Driven Design principles
  • Thanks to the Dart FFI contributors for excellent tooling
  • Thanks to the Kafka community for a robust ecosystem

Publisher: amorelli.tech (verified)

Dependencies

ffi, ffigen