Kafka Dart
Status: Work in progress!
License: MIT
A from-scratch, pure Dart implementation of the Apache Kafka protocol. This project aims to provide a lightweight, easy-to-use Kafka client for Dart applications, with support for producing, consuming, and administering Kafka topics.
Main Classes
- KafkaProducer: Produce messages to Kafka topics.
- KafkaConsumer: Consume messages from Kafka topics.
- KafkaAdmin: Administer Kafka topics, partitions, and configurations.
- KafkaClient: Core client that handles shared concerns such as request queues, connecting, and closing.
- KafkaCluster: Responsible for handling interaction with Kafka Brokers.
Installation
Add the package to your pubspec.yaml:
dependencies:
  dart_kafka:
    git: https://github.com/EduHC/dart-kafka.git
Then, run dart pub get to install the package.
Usage
1. KafkaProducer
Produce messages to a Kafka topic:
import 'package:dart_kafka/dart_kafka.dart';

Future<void> main() async {
  List<Broker> brokers = [
    Broker(host: '192.168.200.131', port: 29092),
    Broker(host: '192.168.200.131', port: 29093),
    Broker(host: '192.168.200.131', port: 29094),
  ];
  final KafkaClient kafka = KafkaClient(brokers: brokers);
  await kafka.connect();

  // Listen to the streaming events received from async requests.
  Future.microtask(
    () => kafka.eventStream.listen(
      (event) => print("[ASYNC response]: $event"),
    ),
  );

  // Refresh the topic metadata before producing.
  KafkaAdmin admin = KafkaAdmin(kafka: kafka);
  await admin.updateTopicsMetadata(
      topics: [
        'test-topic',
        'notifications',
      ],
      async: false,
      apiVersion: 9,
      clientId: 'dart-kafka',
      allowAutoTopicCreation: false,
      includeClusterAuthorizedOperations: false,
      includeTopicAuthorizedOperations: false,
      correlationId: null);

  KafkaProducer producer = KafkaProducer(kafka: kafka);

  // Async request: not awaited, the response is delivered on kafka.eventStream.
  producer.produce(
      acks: -1, // -1: wait for all in-sync replicas to acknowledge
      timeoutMs: 1500,
      topics: [
        Topic(topicName: 'notifications', partitions: [
          Partition(
              id: 0,
              batch: RecordBatch(records: [
                Record(
                    attributes: 0,
                    timestampDelta: 0,
                    offsetDelta: 0,
                    timestamp: DateTime.now().millisecondsSinceEpoch,
                    value: '{"id": 1}')
              ]))
        ]),
      ],
      async: true,
      apiVersion: 11,
      clientId: 'dart-kafka',
      producerId: -1,
      attributes: 0);

  // Sync request: awaited, the response is returned directly.
  dynamic res = await producer.produce(
      acks: -1,
      timeoutMs: 1500,
      topics: [
        Topic(topicName: 'test-topic', partitions: [
          Partition(
              id: 0,
              batch: RecordBatch(records: [
                Record(
                    attributes: 0,
                    timestampDelta: 0,
                    offsetDelta: 0,
                    timestamp: DateTime.now().millisecondsSinceEpoch,
                    value: '{"test": "This is a test!"}')
              ]))
        ]),
      ],
      async: false,
      apiVersion: 11,
      clientId: 'dart-kafka',
      producerId: -1,
      attributes: 0);

  print("*********************************************");
  print("[SYNC Response]: $res");

  await kafka.close();
}
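The example above mixes two calling styles: requests sent with async: true surface on the event stream, while requests sent with async: false return their response from the awaited call. The following is a minimal sketch of that pattern using only dart:async. FakeClient and its send method are hypothetical stand-ins written for illustration, not part of this package, and the assumption that an async request returns null from the call itself is mine.

```dart
import 'dart:async';

/// Hypothetical stand-in for a client that exposes a broadcast event stream.
class FakeClient {
  final _events = StreamController<String>.broadcast();

  Stream<String> get eventStream => _events.stream;

  /// With [async] true the response is pushed to [eventStream] and the
  /// returned future completes with null (assumed behavior); otherwise the
  /// response is returned directly to the caller.
  Future<String?> send(String request, {required bool async}) async {
    final response = 'response to $request';
    if (async) {
      _events.add(response);
      return null;
    }
    return response;
  }

  Future<void> close() => _events.close();
}

Future<void> main() async {
  final client = FakeClient();
  final received = <String>[];

  // Subscribe before sending so no async response is missed.
  final sub = client.eventStream.listen(
    received.add,
    onError: (Object e) => print('stream error: $e'),
  );

  await client.send('produce #1', async: true);
  final direct = await client.send('produce #2', async: false);

  // Give the stream a turn of the event loop to deliver pending events.
  await Future<void>.delayed(Duration.zero);

  print('[ASYNC response]: $received');
  print('[SYNC response]: $direct');

  await sub.cancel();
  await client.close();
}
```

Keeping the StreamSubscription and cancelling it before closing the client avoids leaving a listener attached after the connection is gone.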
2. KafkaConsumer
Consume messages from a Kafka topic:
import 'package:dart_kafka/dart_kafka.dart';

Future<void> main() async {
  List<Broker> brokers = [
    Broker(host: '192.168.200.131', port: 29092),
    Broker(host: '192.168.200.131', port: 29093),
    Broker(host: '192.168.200.131', port: 29094),
  ];
  final KafkaClient kafka = KafkaClient(brokers: brokers);
  await kafka.connect();

  // Listen to the streaming events received from async requests.
  Future.microtask(
    () => kafka.eventStream.listen(
      (event) => print("[ASYNC response]: $event"),
    ),
  );

  KafkaConsumer consumer = KafkaConsumer(kafka: kafka);

  // Async fetch: not awaited, the response is delivered on kafka.eventStream.
  consumer.sendFetchRequest(
      clientId: 'dart-kafka',
      apiVersion: 8,
      async: true,
      topics: [
        Topic(topicName: 'test-topic', partitions: [
          Partition(id: 0, fetchOffset: 0),
        ])
      ]);

  KafkaAdmin admin = kafka.admin;
  await admin.updateTopicsMetadata(
      topics: [
        'test-topic',
        'notifications',
      ],
      async: false,
      apiVersion: 9,
      clientId: 'dart-kafka',
      allowAutoTopicCreation: false,
      includeClusterAuthorizedOperations: false,
      includeTopicAuthorizedOperations: false,
      correlationId: null);

  // Sync fetch: awaited, the response is returned directly.
  dynamic res = await consumer.sendFetchRequest(
      clientId: 'dart-kafka',
      apiVersion: 8,
      async: false,
      topics: [
        Topic(topicName: 'notifications', partitions: [
          Partition(id: 0, fetchOffset: 0),
        ])
      ]);

  print("*********************************************");
  print("[SYNC Response]: $res");

  await kafka.close();
}
3. KafkaAdmin
Administer Kafka topics:
import 'package:dart_kafka/dart_kafka.dart';

Future<void> main() async {
  // Same constructor as in the producer and consumer examples.
  final KafkaClient kafka = KafkaClient(
    brokers: [Broker(host: '192.168.3.55', port: 29092)],
  );
  await kafka.connect();

  // Listen to the streaming events received from async requests.
  Future.microtask(
    () => kafka.eventStream.listen(
      (event) => print("[ASYNC response]: $event"),
    ),
  );

  final admin = KafkaAdmin(kafka: kafka);

  // Ask the broker which API versions it supports.
  await admin.sendApiVersionRequest(apiVersion: 0, clientId: 'test');

  // Fetch metadata for a topic.
  await admin.sendMetadataRequest(
    topics: ['test-topic'],
    allowAutoTopicCreation: true,
    includeClusterAuthorizedOperations: true,
    includeTopicAuthorizedOperations: true,
    clientId: 'test',
    apiVersion: 12,
  );

  await kafka.close();
}
Documentation
For more detailed information about the Apache Kafka protocol, see https://kafka.apache.org/protocol
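To give a feel for what the protocol looks like on the wire, here is a minimal sketch of how a request can be framed with the older, non-flexible request header (version 1): a 4-byte size prefix, then the API key, API version, correlation ID, and a nullable client ID, all big-endian. The function name frameRequest is hypothetical and this is not taken from this package's internals; newer "flexible" request versions add tagged fields that the sketch does not cover.

```dart
import 'dart:convert';
import 'dart:typed_data';

/// Encodes a Kafka request header (version 1) followed by [body], and
/// prefixes the result with the 4-byte big-endian message size.
/// Illustrative helper only; the name and shape are assumptions.
Uint8List frameRequest({
  required int apiKey,
  required int apiVersion,
  required int correlationId,
  String? clientId,
  List<int> body = const [],
}) {
  final builder = BytesBuilder();

  // Fixed-width header fields: INT16, INT16, INT32 (ByteData defaults to big-endian).
  final fixed = ByteData(8)
    ..setInt16(0, apiKey)
    ..setInt16(2, apiVersion)
    ..setInt32(4, correlationId);
  builder.add(fixed.buffer.asUint8List());

  // client_id is a NULLABLE_STRING: INT16 length (-1 for null) + UTF-8 bytes.
  final idBytes = clientId == null ? null : utf8.encode(clientId);
  final length = ByteData(2)..setInt16(0, idBytes?.length ?? -1);
  builder.add(length.buffer.asUint8List());
  if (idBytes != null) builder.add(idBytes);

  builder.add(body);
  final payload = builder.toBytes();

  // Every request is preceded by an INT32 holding the payload size.
  final size = ByteData(4)..setInt32(0, payload.length);
  return Uint8List.fromList([...size.buffer.asUint8List(), ...payload]);
}

void main() {
  // ApiVersions has API key 18; version 0 has an empty request body.
  final bytes = frameRequest(
    apiKey: 18,
    apiVersion: 0,
    correlationId: 1,
    clientId: 'test',
  );
  print(bytes);
}
```

Running it prints [0, 0, 0, 14, 0, 18, 0, 0, 0, 0, 0, 1, 0, 4, 116, 101, 115, 116]: a size of 14, API key 18, version 0, correlation ID 1, and the 4-byte client ID "test".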
Contributing
Contributions are welcome! If you'd like to contribute, please follow these steps:
- Fork the repository.
- Create a new branch for your feature or bugfix.
- Commit your changes and push to your fork.
- Submit a pull request with a detailed description of your changes.
Development Setup
Clone the repository:
git clone https://github.com/EduHC/dart-kafka
cd dart-kafka
Install dependencies:
dart pub get
License
This project is licensed under the MIT License. See the LICENSE file for details.
Acknowledgments
Inspired by the Apache Kafka Protocol.
Built with ❤️ and Dart.
Support
If you encounter any issues or have questions, please open an issue.