MessageSocket constructor
Wraps sock, immediately starts reading messages into buffer.
Automatically pings at interval autoping (not pinging if autoping is null), flagging connection as
broken if the ping fails (which takes like 30 seconds). See ping.
Implementation
MessageSocket(this.sock, {Duration? autoping = const Duration(seconds: 5)}): this.mSendLock = Lock(reentrant: true), this.mRecvLock = Lock(reentrant: true) {
var _rxChannel = Channel<Uint8List?>();
this._rxIn = _rxChannel.getIn();
var rxOut = _rxChannel.getOut();
var _recvCountChannel = Channel<int>();
var _recvCountIn = _recvCountChannel.getIn();
this._recvCountOut = _recvCountChannel.getOut();
//NEXT autoping
unawaited(Future(() async {
var sw = Stopwatch();
var sw_big1 = Stopwatch();
var sw_big2 = Stopwatch();
//CHECK I don't think this handles socket closure
List<List<int>> pending = [];
int accumulated = 0;
int? requested = null;
// I'm tempted to put a lock around the pending and accumulated code, but I don't THINK it's necessary.
sw.start();
sw_big1.start();
sw_big2.start();
bool broken = false;
if (autoping != null) {
unawaited(Future(() async {
try {
while (!broken) {
await sleep(autoping.inMilliseconds);
await ping();
}
} catch (e) {
zlog(INFO, "MS autoping failed, probably disconnected: $e");
broken = true; //CHECK Should call .close()?
}
}));
}
unawaited(Future(() async {
while (true) {
sw_big2.reset();
if (requested == null) {
requested = await _recvCountIn.read();
if (broken) { //DITTO //CHECK Should call .close()?
await rxOut.write(null);
requested = null;
continue; //THINK Maybe just return? That'd leave the request channel blocked.... //LEAK Leaves this future here, otherwise
}
zlog(DEBUG, "MS2 ${sw.lap()} rx request");
}
if (requested == -1) {
pending.clear();
accumulated = 0;
requested = null;
} else if (requested == 0) {
await rxOut.write(Uint8List(0));
requested = null;
} else if (accumulated >= requested!) {
var bb = BytesBuilder(copy: false);
while (requested! > 0 && pending[0].length <= requested!) {
var temp = pending.removeAt(0);
bb.add(temp);
requested = requested! - temp.length;
accumulated -= temp.length;
}
if (requested! > 0) {
bb.add(pending[0].sublist(0,requested!));
pending[0] = pending[0].sublist(requested!, pending[0].length);
accumulated -= requested!;
}
requested = null;
zlog(DEBUG, "MS2 ${sw.lap()} collected response");
var response = Uint8List.fromList(bb.takeBytes());
zlog(DEBUG, "MS2 ${sw.lap()} built response");
await rxOut.write(response); //MISC This seems like a lot of conversions
zlog(DEBUG, "MS2 ${sw.lap()} tx data");
} else {
await sleep(10);
if (broken) { //DITTO //CHECK Should call .close()?
await rxOut.write(null);
requested = null;
continue; //DITTO //THINK Maybe just return?
}
}
//zlog(DEBUG, "MS2 ${sw_big2.lap()} send total");
}
}));
try {
await for (var data in sock) { //THINK I'm pretty sure data will still accumulate in the Socket; I wish I could backpressure it
sw_big1.reset();
zlog(DEBUG, "MS1 ${sw.lap()} rx data");
pending.add(data);
accumulated += data.length;
zlog(DEBUG, "MS1 ${sw.lap()} added data - acc $accumulated");
zlog(DEBUG, "MS1 ${sw_big1.lap()} read total");
}
zlog(INFO, "MS1 done; connection presumably closed");
broken = true;
} catch (e) {
zlog(INFO, "MS1 error; connection presumably closed: $e");
broken = true;
await close();
}
}));
}