pub_sub 2.3.0


pub_sub #

Keep application instances in sync with a simple pub/sub API.

Installation #

Add pub_sub as a dependency in your pubspec.yaml file:

dependencies:
  pub_sub: ^2.3.0

Then, be sure to run pub get in your terminal.

Usage #

pub_sub is a typical pub/sub API, except that it authenticates every request. Because pub_sub may well run on servers, in the browser, and on platforms like Flutter, it includes provisions to limit access.

Be careful not to leak any pub_sub client IDs when operating over a network. If you do, malicious users can inject events into your application, which could ultimately spell disaster.

A pub_sub server can operate across multiple adapters, which take care of interfacing data over different media. For example, a single server can handle pub/sub between multiple Isolates and TCP Sockets, as well as WebSockets, simultaneously.

import 'package:pub_sub/pub_sub.dart' as pub_sub;

main() async {
  var server = new pub_sub.Server([
    new FooAdapter(...),
    new BarAdapter(...)
  ]);

  server.addAdapter(new BazAdapter(...));

  // Call `start` to activate adapters, and begin handling requests.
  server.start();
}

Trusted Clients #

You can use package:pub_sub without explicitly registering clients, if and only if those clients come from trusted sources.

Clients via Isolate are always trusted.

Clients via package:json_rpc_2 must be explicitly marked as trusted (e.g. using an IP whitelist mechanism):

new JsonRpc2Adapter(..., isTrusted: true);

// Pass `null` as Client ID when trusted...
new pub_sub.IsolateClient(null);

Access Control #

The IDs of all untrusted clients that will connect to the server must be known at start-up time. You may not register new clients after the server has started. This is mostly a security consideration: if it is impossible to register new clients, then malicious users cannot grant themselves additional privileges within the system.

import 'package:pub_sub/pub_sub.dart' as pub_sub;

main() async {
  // ...
  // Create a user who can both publish and subscribe (the default).
  // `ClientInfo` is reached through the `pub_sub` import prefix.
  server.registerClient(const pub_sub.ClientInfo('<client-id>'));

  // Create a user who can subscribe, but not publish.
  server.registerClient(
      const pub_sub.ClientInfo('<client-id>', canPublish: false));

  // Create a user who can publish, but not subscribe.
  server.registerClient(
      const pub_sub.ClientInfo('<client-id>', canSubscribe: false));

  // Create a user with no privileges whatsoever.
  server.registerClient(const pub_sub.ClientInfo('<client-id>',
      canPublish: false, canSubscribe: false));

  server.start();
}
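
The register-then-freeze rule described above can be illustrated with a small, self-contained model. This is only a sketch of the idea, not the package's implementation: `ClientPolicy`, `ClientRegistry`, and the `StateError` thrown on late registration are hypothetical names and behavior chosen for illustration.

```dart
/// Hypothetical stand-in for a registered client's privileges.
/// Both flags default to `true`, mirroring the examples above.
class ClientPolicy {
  final String id;
  final bool canPublish;
  final bool canSubscribe;

  const ClientPolicy(this.id,
      {this.canPublish = true, this.canSubscribe = true});
}

/// Hypothetical registry that refuses new clients once started.
class ClientRegistry {
  final Map<String, ClientPolicy> _clients = {};
  bool _started = false;

  void register(ClientPolicy client) {
    if (_started) {
      // Assumption: late registration is rejected outright.
      throw StateError('Cannot register clients after start-up.');
    }
    _clients[client.id] = client;
  }

  void start() {
    _started = true;
  }

  /// Unknown client IDs are denied, as are clients without the flag.
  bool mayPublish(String clientId) {
    final client = _clients[clientId];
    return client != null && client.canPublish;
  }

  bool maySubscribe(String clientId) {
    final client = _clients[clientId];
    return client != null && client.canSubscribe;
  }
}

void main() {
  final registry = ClientRegistry()
    ..register(const ClientPolicy('reader', canPublish: false))
    ..register(const ClientPolicy('writer', canSubscribe: false))
    ..start();

  print(registry.maySubscribe('reader')); // reader may subscribe
  print(registry.mayPublish('reader')); // but may not publish
  print(registry.mayPublish('unknown')); // unregistered IDs are denied

  try {
    registry.register(const ClientPolicy('late'));
  } on StateError catch (e) {
    print(e.message);
  }
}
```

Because the set of IDs is fixed before any request is handled, a client that guesses or invents an ID after start-up gains nothing in this model.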

Isolates #

If you are just running multiple instances of a server, use package:pub_sub/isolate.dart.

You'll need one isolate to be the master. Typically this is the first isolate you create.

import 'dart:io';
import 'dart:isolate';
import 'package:pub_sub/isolate.dart' as pub_sub;
import 'package:pub_sub/pub_sub.dart' as pub_sub;

main() async {
  // Easily bring up a server.
  var adapter = new pub_sub.IsolateAdapter();
  var server = new pub_sub.Server([adapter]);

  // You then need to create a client that will connect to the adapter.
  // Each isolate in your application should contain a client.
  for (int i = 0; i < Platform.numberOfProcessors - 1; i++) {
    server.registerClient(new pub_sub.ClientInfo('client$i'));
  }

  // Start the server.
  server.start();

  // Next, let's start isolates that interact with the server.
  //
  // Fortunately, we can send SendPorts over Isolates, so this is no hassle.
  for (int i = 0; i < Platform.numberOfProcessors - 1; i++) {
    Isolate.spawn(isolateMain, [i, adapter.receivePort.sendPort]);
  }

  // It's possible that you're running your application in the server isolate as well:
  isolateMain([0, adapter.receivePort.sendPort]);
}

void isolateMain(List args) {
  var client =
      new pub_sub.IsolateClient('client${args[0]}', args[1] as SendPort);

  // The client will connect automatically. In the meantime, we can start subscribing to events.
  client.subscribe('user::logged_in').then((sub) {
    // The `ClientSubscription` class extends `Stream`. Hooray for asynchrony!
    sub.listen((msg) {
      print('Logged in: $msg');
    });
  });
}

JSON RPC 2.0 #

If you are not running on isolates, you need to import package:pub_sub/json_rpc_2.dart. This library leverages package:json_rpc_2 and package:stream_channel to create clients and servers that can, in principle, run on any medium, e.g. WebSockets or TCP sockets.

Check out test/json_rpc_2_test.dart for an example of serving pub_sub over TCP sockets.

Protocol #

pub_sub is built upon a simple RPC, and this package includes an implementation that runs via SendPorts and ReceivePorts, as well as one that runs on any StreamChannel<String>.

Data sent over the wire looks like the following:

// Sent by a client to initiate an exchange.
interface Request {
  // An arbitrary string assigned by the client, which uses it to match
  // requests with their asynchronous responses.
  request_id: string,
  
  // The ID of the client to authenticate as.
  // 
  // As you can imagine, this should be kept secret, to prevent breaches.
  client_id: string,

  // Required for *every* request.
  params: {
    // A value to be `publish`ed.
    value?: any,

    // The name of an event to `publish`.
    event_name?: string,

    // The ID of a subscription to be cancelled.
    subscription_id?: string
  }
}

// Sent by the server in response to a request.
interface Response {
  // `true` for success, `false` for failures.
  status: boolean,
  
  // Only appears if `status` is `false`; explains why an operation failed.
  error_message?: string,

  // Matches the request_id sent by the client.
  request_id: string,

  result?: {
    // The number of other clients to whom an event was `publish`ed.
    listeners?: number,

    // The ID of a created subscription.
    subscription_id?: string
  }
}

When sending via JSON RPC 2.0, the params of a Request are simply folded into the object itself, for simplicity's sake. In this case, a response will be sent as a notification whose name is the request_id.

In the case of Isolate clients/servers, events will be simply sent as Lists:

['<event-name>', value]

Clients can send the following three methods:

  • subscribe (event_name:string): Subscribe to an event.
  • unsubscribe (subscription_id:string): Unsubscribe from an event you previously subscribed to.
  • publish (event_name:string, value:any): Publish an event to all other clients who are subscribed.
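
To make the wire format concrete, here is a minimal sketch that builds a `publish` request in the nested shape of the Request interface, folds its params into the top level (one reading of the JSON RPC 2.0 note above), and matches a response by request_id. The helper names `buildRequest` and `foldParams`, the ID `req-1`, and the sample payload are assumptions made for illustration; only `dart:convert` is used, not the package itself.

```dart
import 'dart:convert';

/// Builds a request in the nested shape of the `Request` interface.
Map<String, dynamic> buildRequest(
    String requestId, String clientId, Map<String, dynamic> params) {
  return {
    'request_id': requestId,
    'client_id': clientId,
    'params': params,
  };
}

/// Hypothetical helper: moves the entries of `params` to the top level.
Map<String, dynamic> foldParams(Map<String, dynamic> request) {
  final folded = Map<String, dynamic>.from(request)..remove('params');
  folded.addAll(Map<String, dynamic>.from(request['params'] as Map));
  return folded;
}

void main() {
  // A `publish` call carries an event name and a value.
  final publish = buildRequest('req-1', '<client-id>', {
    'event_name': 'user::logged_in',
    'value': {'name': 'alice'},
  });

  print(jsonEncode(publish));
  print(jsonEncode(foldParams(publish)));

  // A sample response, written by hand to follow the `Response` interface.
  final response = jsonDecode(
          '{"status":true,"request_id":"req-1","result":{"listeners":2}}')
      as Map<String, dynamic>;

  // The client pairs the response with its request via `request_id`.
  if (response['request_id'] == publish['request_id'] &&
      response['status'] == true) {
    final result = response['result'] as Map<String, dynamic>;
    print('Delivered to ${result['listeners']} listener(s).');
  }
}
```

A `subscribe` request would carry only `event_name` in its params, and an `unsubscribe` request only `subscription_id`.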

The client and server in package:pub_sub/isolate.dart must make extra provisions to keep track of client IDs. Since SendPorts and ReceivePorts do not have any sort of guaranteed-unique IDs, new clients must send their SendPort to the server before sending any requests. The server then responds with an ID that identifies the SendPort to which responses should be sent.
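
The handshake can be sketched with plain `dart:isolate` ports. This is a simplified illustration rather than the package's actual message format: `HandshakeServer`, `connect`, and the `connection-N` ID scheme are hypothetical, and everything runs in a single isolate for brevity.

```dart
import 'dart:async';
import 'dart:isolate';

/// Hypothetical server side: assigns an ID to each SendPort it receives.
class HandshakeServer {
  final ReceivePort receivePort = ReceivePort();
  final Map<String, SendPort> clients = {};
  int _next = 0;

  void start() {
    receivePort.listen((message) {
      if (message is SendPort) {
        // Remember where to send this client's responses, then tell
        // the client which ID to use in later requests.
        final id = 'connection-${_next++}';
        clients[id] = message;
        message.send(id);
      }
    });
  }

  void close() => receivePort.close();
}

/// Hypothetical client side: announces its SendPort and awaits an ID.
/// Taking `first` closes the reply port; a real client would keep it
/// open to receive later responses.
Future<String> connect(SendPort serverPort) async {
  final replies = ReceivePort();
  serverPort.send(replies.sendPort);
  return await replies.first as String;
}

Future<void> main() async {
  final server = HandshakeServer()..start();

  final first = await connect(server.receivePort.sendPort);
  final second = await connect(server.receivePort.sendPort);
  print('Assigned IDs: $first, $second');

  server.close();
}
```

The same exchange works across isolates, because a SendPort can be passed to `Isolate.spawn` or sent through another port.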

2.3.0 #

  • Allow 2.x versions of stream_channel.
  • Apply package:pedantic lints.

2.2.0 #

  • Upgrade uuid.

2.1.0 #

  • Allow for "trusted clients," which are implicitly-registered clients. This makes using package:pub_sub easier, as well as making it easier to scale.

2.0.0 #

  • Dart 2 updates.

example/main.dart

import 'dart:io';
import 'dart:isolate';
import 'package:pub_sub/isolate.dart' as pub_sub;
import 'package:pub_sub/pub_sub.dart' as pub_sub;

main() async {
  // Easily bring up a server.
  var adapter = new pub_sub.IsolateAdapter();
  var server = new pub_sub.Server([adapter]);

  // You then need to create a client that will connect to the adapter.
  // Every untrusted client in your application should be pre-registered.
  //
  // In the case of Isolates, however, those are always implicitly trusted.
  for (int i = 0; i < Platform.numberOfProcessors - 1; i++) {
    server.registerClient(new pub_sub.ClientInfo('client$i'));
  }

  // Start the server.
  server.start();

  // Next, let's start isolates that interact with the server.
  //
  // Fortunately, we can send SendPorts over Isolates, so this is no hassle.
  for (int i = 0; i < Platform.numberOfProcessors - 1; i++) {
    await Isolate.spawn(isolateMain, [i, adapter.receivePort.sendPort]);
  }

  // It's possible that you're running your application in the server isolate as well:
  isolateMain([0, adapter.receivePort.sendPort]);
}

void isolateMain(List args) {
  // Isolates are always trusted, so technically we don't need to pass a client ID.
  var client =
      new pub_sub.IsolateClient('client${args[0]}', args[1] as SendPort);

  // The client will connect automatically. In the meantime, we can start subscribing to events.
  client.subscribe('user::logged_in').then((sub) {
    // The `ClientSubscription` class extends `Stream`. Hooray for asynchrony!
    sub.listen((msg) {
      print('Logged in: $msg');
    });
  });
}

Use this package as a library

1. Depend on it

Add this to your package's pubspec.yaml file:


dependencies:
  pub_sub: ^2.3.0

2. Install it

You can install packages from the command line:

with pub:


$ pub get

with Flutter:


$ flutter pub get

Alternatively, your editor might support pub get or flutter pub get. Check the docs for your editor to learn more.

3. Import it

Now in your Dart code, you can use:


import 'package:pub_sub/pub_sub.dart';
  
  • Popularity: 30 (how popular the package is relative to other packages)
  • Health: 72 (code health derived from static analysis)
  • Maintenance: 100 (how tidy and up-to-date the package is)
  • Overall: 56 (weighted score of the above)

We analyzed this package on Dec 6, 2019, and provided a score, details, and suggestions below. The analysis completed using:

  • Dart: 2.6.1
  • pana: 0.12.21

Platforms

Detected platforms: Flutter, web, other

No platform restriction found in primary library package:pub_sub/pub_sub.dart.

Health suggestions

Fix lib/src/isolate/client.dart. (-8.63 points)

Analysis of lib/src/isolate/client.dart reported 18 hints, including:

line 9 col 47: Unnecessary new keyword.

line 12 col 22: Unnecessary new keyword.

line 27 col 35: Unnecessary new keyword.

line 39 col 17: Unnecessary new keyword.

line 41 col 29: Unnecessary new keyword.

Fix lib/src/json_rpc/client.dart. (-6.78 points)

Analysis of lib/src/json_rpc/client.dart reported 14 hints, including:

line 11 col 22: Unnecessary new keyword.

line 24 col 13: Unnecessary new keyword.

line 38 col 9: DO use curly braces for all flow control structures.

line 38 col 15: Unnecessary new keyword.

line 44 col 15: Unnecessary new keyword.

Fix lib/src/protocol/server/server.dart. (-6.78 points)

Analysis of lib/src/protocol/server/server.dart reported 14 hints, including:

line 15 col 16: Unnecessary new keyword.

line 28 col 7: DO use curly braces for all flow control structures.

line 28 col 13: Unnecessary new keyword.

line 38 col 7: DO use curly braces for all flow control structures.

line 38 col 13: Unnecessary new keyword.

Fix lib/src/isolate/server.dart. (-4.89 points)

Analysis of lib/src/isolate/server.dart reported 10 hints, including:

line 10 col 7: Unnecessary new keyword.

line 12 col 7: Unnecessary new keyword.

line 14 col 7: Unnecessary new keyword.

line 15 col 22: Unnecessary new keyword.

line 18 col 35: Unnecessary new keyword.

Fix lib/src/json_rpc/server.dart. (-4.89 points)

Analysis of lib/src/json_rpc/server.dart reported 10 hints, including:

line 10 col 7: Unnecessary new keyword.

line 12 col 7: Unnecessary new keyword.

line 14 col 7: Unnecessary new keyword.

line 17 col 22: Unnecessary new keyword.

line 47 col 12: Unnecessary new keyword.

Dependencies

Package          Constraint           Resolved   Available

Direct dependencies
Dart SDK         >=2.0.0-dev <3.0.0
json_rpc_2       ^2.0.0               2.1.0
stream_channel   >=1.0.0 <3.0.0       2.0.0
uuid             ^2.0.0-rc.1          2.0.4

Transitive dependencies
async                                 2.4.0
charcode                              1.1.2
collection                            1.14.12
convert                               2.1.1
crypto                                2.1.3
path                                  1.6.4
stack_trace                           1.9.3
typed_data                            1.1.6

Dev dependencies
pedantic         ^1.0.0
test             ^1.0.0