flushIfNeeded method

Future<void> flushIfNeeded()

If necessary, pause the sending isolate (the producer) until the consumer has processed enough data. Call await x.flushIfNeeded() regularly so that the SendPort/ReceivePort buffer between the isolates does not grow too large.

sendValue calls this method itself, so there is no need to call it separately when sendValue is used.

If this object is used as a sink, this method should be called regularly. A reasonable way to do this in a subclass method would be as follows:

    final ObjectThatAcceptsSink sinkUser = ...;
    while (there is work to do) {
        await flushIfNeeded();
        sinkUser.sendSomeDataTo(this);
            // sendSomeDataTo() may call this.add() multiple times
    }

NOTE: In the special case of a buffer size of zero, the await flushIfNeeded() call should come after the data is sent (that is, after sinkUser.sendSomeDataTo(this) in the loop above, rather than before it). This gives rendezvous semantics: the producer is paused until the item has been consumed.
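The ordering rule can be tried out without spawning isolates. The following is a minimal sketch, not this library's implementation: BoundedChannel, its add(), drain() and maxUnacked members, and the simulated consumer are all hypothetical names invented for illustration. It assumes that "needed" means the count of sent-but-unprocessed items has exceeded what the buffer size allows, and it shows a flush before each add for a positive buffer size and a flush after each add for a buffer size of zero.

```dart
import 'dart:async';

/// Hypothetical model of a bounded producer-to-consumer channel.
/// It is NOT the library's class; it only imitates the documented contract.
class BoundedChannel<T> {
  BoundedChannel(this.bufferSize, this.consume);

  final int bufferSize;
  final Future<void> Function(T) consume; // simulated (slow) consumer
  int _unacked = 0; // items sent but not yet processed
  int maxUnacked = 0; // high-water mark, kept for inspection
  Completer<void>? _waiter;
  Future<void> _tail = Future<void>.value();

  /// Sends one item without waiting, like Sink.add().
  void add(T item) {
    _unacked++;
    if (_unacked > maxUnacked) maxUnacked = _unacked;
    // Items are processed one at a time, in order.
    _tail = _tail.then((_) => consume(item)).then<void>((_) {
      _unacked--;
      final waiter = _waiter;
      _waiter = null;
      waiter?.complete(); // wake the producer so it can re-check
    });
  }

  /// Pauses the caller until there is room for one more item, or until
  /// the channel is empty when bufferSize is 0 or 1 (an assumption).
  Future<void> flushIfNeeded() async {
    final limit = bufferSize > 1 ? bufferSize - 1 : 0;
    while (_unacked > limit) {
      await (_waiter ??= Completer<void>()).future;
    }
  }

  /// Completes once every item sent so far has been processed.
  Future<void> drain() => _tail;
}

Future<void> main() async {
  Future<void> slowConsumer(int item) =>
      Future<void>.delayed(const Duration(milliseconds: 5));

  // Positive buffer size: flush BEFORE adding.
  final buffered = BoundedChannel<int>(2, slowConsumer);
  for (var i = 0; i < 6; i++) {
    await buffered.flushIfNeeded();
    buffered.add(i);
  }
  await buffered.drain();
  print('bufferSize 2: high-water mark ${buffered.maxUnacked}');

  // Buffer size zero: flush AFTER adding, for rendezvous semantics.
  final rendezvous = BoundedChannel<int>(0, slowConsumer);
  for (var i = 0; i < 3; i++) {
    rendezvous.add(i);
    await rendezvous.flushIfNeeded();
  }
  print('bufferSize 0: high-water mark ${rendezvous.maxUnacked}');
}
```

In this model the number of unprocessed items never exceeds the buffer size in the first loop, and the second loop never has more than one item in flight, because each add is followed by a wait for that item to be consumed.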

Implementation

Future<void> flushIfNeeded() {
  _ensureInConsumer();
  final session = _session!;
  if (session.bufferSize > 1) {
    return session.waitForAcks(1);
  } else {
    // For a buffer of 1, it must be empty before adding an element.
    // For a buffer of 0, it must be empty before letting the add complete.
    return session.waitForAcks(0);
  }
}