pulsar_iot_client 1.0.3 copy "pulsar_iot_client: ^1.0.3" to clipboard
pulsar_iot_client: ^1.0.3 copied to clipboard

Apache Pulsar IoT client

pulsar_iot_client #

A lightweight Apache Pulsar client primarily focused on the IoT telemetry domain. This project aims to improve the resulting performance of a Pulsar cluster / reduce operating costs by removing the AMQP / MQTT proxy and utilizing the message batching.

Core features:

  • fully asynchronous API (Producer, Consumer)
  • transparent topic rebalancing handling (producer/consumer recreated on a new broker)
  • supports message batching. In fact, each batch message is stored as a single ledger entry on the Pulsar side, making this a preferable way of relatively small periodic data updates handling
  • OAuth2 / token-based authentication with access token refreshing support
  • TLS-forced mode to impose app security

TODO:

  • partitioned topics support
  • payload schema support

Quick start #

Producer

// PulsarClient is an entry point
var client = PulsarClient(
  settings: PulsarClientSettings(
    serviceUrl: 'pulsar://localhost:6650',
  ),
);

// global errors (not associated with individual requests completion)
client.clientErrorStream.listen((Object error) {
});

// create Producer on the topic
var producer = await client.newProducer(
  ProducerCreateParams(topic: 'persistent://tenant/namespace/topic')
);

// send a single message. Result contains the messageId
var resultSingle = await producer.sendMessage(
  GenericMessage(
    propertyMap: {'key': 'value'},
    payload: Uint8List.fromList('Binary payload'.codeUnits),
  )
);

// close individual producer
producer.close();
// close client (with all associated producers, consumers and broker connections)
client.close();

Consumer

// create Consumer on the topic with defined subscription name and type
var consumer = await client.newConsumer(
  ConsumerCreateParams(
      topic: 'persistent://tenant/namespace/topic',
      subscription: 'subscription-name',
      subType: SubscriptionType.exclusive,
  )
);

// listen for incoming messages
consumer.listen((GenericInputMessage message) {
  // ack message (unless it's intermediate in a batch) 
  if (message.storageType != MessageStorageType.batchIntermediate) {
    consumer.ackMessage(message.messageId, false);
  }
});

// give permission to the broker to push next 100 messages (flow control)
await consumer.getFlow(100);

OAuth2 (using oauth2 package)

Future<String> _getAuthToken(String? data) {
  var completer = Completer<String>();

  // alternatively, call refreshCredentials() on existing oauth2 client if supported by the particular grant type 
  oauth2.clientCredentialsGrant(
    oauth2Endpoint,
    oauth2Identifier,
    oauth2Secret,
  )
  .then((oauth2client) {
    completer.complete(oauth2client.credentials.accessToken);
  })
  .catchError((error, trace) { completer.completeError(error, trace); });

  return completer.future;
}

// The authTokenProvider function will be invoked to get/refresh an access token
// The periodic token refresh will be triggered by broker, there's no need to track expiration manually
var client = PulsarClient(
  settings: PulsarClientSettings(
    forceTLS: true,
    serviceUrl: 'pulsar+ssl://broker.my-domain.com:6651',
    authTokenProvider: _getAuthToken,
    authTokenRefreshSupported: true,
  ),
);
4
likes
120
points
30
downloads

Publisher

verified publisheractiveobd.com

Weekly Downloads

Apache Pulsar IoT client

Repository (GitHub)

Documentation

API reference

License

MIT (license)

Dependencies

convert, crclib, fixnum, logging, protobuf, tuple

More

Packages that depend on pulsar_iot_client