invoke method

@override
InvokeResponse invoke(
  Map params,
  Responder responder,
  InvokeResponse response,
  Node parentNode, [
  int maxPermission = Permission.CONFIG,
])
override

Handles the invoke method from the internals of the responder. Use onInvoke to handle when a node is invoked.

Implementation

/// Handles an invocation of this node and writes the outcome to [response].
///
/// Calls `onInvoke(params)` and translates whatever it returns into updates
/// on [response]:
///
/// * `null` - replaced by a default chosen from the `$result` config
///   (`'values'` when the config is absent): an empty map for `'values'`,
///   an empty list for `'table'` and `'stream'`. Any other `$result` value
///   leaves the result `null`, and the response is simply closed.
/// * `Iterable` - sent as the rows, stream closed.
/// * `Map` - sent as a single row with one column per key (column type
///   `'dynamic'`), stream closed.
/// * `SimpleTableResult` / `Table` - rows and columns sent, stream closed.
/// * `AsyncTableResult` - written to the response; its `onClose` callback
///   is invoked when the response closes.
/// * `Stream` - when `$result` is `'stream'` every event is forwarded as it
///   arrives; otherwise events are buffered and sent once the stream is done.
/// * `Future` - the resolved value is handled as a `LiveTable`, `Stream`,
///   `Table`, or plain value (an `Iterable` is sent as rows, anything else
///   as a one-element list).
/// * `LiveTable` - handed the response through `sendTo`.
/// * anything else - the response is closed without data.
///
/// If `onInvoke` throws, or a stream/future produces an error, the response
/// is closed with a `DSError('invokeException')` carrying the error message
/// and, when available, the stack trace as detail.
///
/// [responder], [parentNode] and [maxPermission] are not used by this
/// implementation. Always returns [response].
@override
InvokeResponse invoke(
  Map params,
  Responder responder,
  InvokeResponse response,
  Node parentNode, [
  int maxPermission = Permission.CONFIG,
]) {
  Object? rslt;
  try {
    rslt = onInvoke(params);
  } catch (e, stack) {
    var error = DSError('invokeException', msg: e.toString());
    try {
      error.detail = stack.toString();
    } catch (e) {
      // Best effort: the stack detail is optional, the error is still sent.
    }
    response.close(error);
    return response;
  }

  // Result type configured on the node; 'values' when not configured.
  dynamic rtype = 'values';
  if (configs.containsKey(r'$result')) {
    rtype = configs[r'$result'];
  }

  if (rslt == null) {
    // Create a default result based on the result type
    if (rtype == 'values') {
      // Two type arguments are required here: `<dynamic>{}` is a Set literal
      // in Dart, which would be treated as an Iterable below instead of
      // taking the Map branch.
      rslt = <dynamic, dynamic>{};
    } else if (rtype == 'table') {
      rslt = <dynamic>[];
    } else if (rtype == 'stream') {
      rslt = <dynamic>[];
    }
  }

  if (rslt is Iterable) {
    response.updateStream(rslt.toList(), streamStatus: StreamStatus.closed);
  } else if (rslt is Map) {
    // A map becomes a single row: keys are the columns, values the cells.
    var columns = <dynamic>[];
    var out = <dynamic>[];
    for (var x in rslt.keys) {
      columns.add(<String, dynamic>{'name': x, 'type': 'dynamic'});
      out.add(rslt[x]);
    }

    response.updateStream(
      <dynamic>[out],
      columns: columns,
      streamStatus: StreamStatus.closed,
    );
  } else if (rslt is SimpleTableResult) {
    response.updateStream(
      rslt.rows!,
      columns: rslt.columns,
      streamStatus: StreamStatus.closed,
    );
  } else if (rslt is AsyncTableResult) {
    (rslt).write(response);
    // Forward the response's close notification to the result's own callback.
    response.onClose = (var response) {
      if ((rslt as AsyncTableResult).onClose != null) {
        (rslt).onClose!(response);
      }
    };
    return response;
  } else if (rslt is Table) {
    response.updateStream(
      rslt.rows,
      columns: rslt.columns,
      streamStatus: StreamStatus.closed,
    );
  } else if (rslt is Stream) {
    // Bridge the stream to the response through an AsyncTableResult.
    var r = AsyncTableResult();

    response.onClose = (var response) {
      if (r.onClose != null) {
        r.onClose!(response);
      }
    };

    var stream = rslt;

    if (rtype == 'stream') {
      // Streaming mode: push every event to the response as it arrives.
      StreamSubscription? sub;

      // Stop listening to the source stream once the response is closed.
      r.onClose = (_) {
        if (sub != null) {
          sub.cancel();
        }
      };

      sub = stream.listen(
        (dynamic v) {
          // Metadata and column events configure the result, they are not rows.
          if (v is TableMetadata) {
            r.meta = v.meta;
            return;
          } else if (v is TableColumns) {
            r.columns = v.columns.map((x) => x.getData()).toList();
            return;
          }

          if (v is Iterable) {
            r.update(v.toList(), StreamStatus.open);
          } else if (v is Map) {
            dynamic meta;
            if (v.containsKey('__META__')) {
              meta = v['__META__'];
            }
            r.update(<dynamic>[v], StreamStatus.open, meta);
          } else {
            // NOTE(review): an exception thrown from this data callback does
            // not appear to be routed to onError below - confirm intended.
            throw Exception('Unknown Value from Stream');
          }
        },
        onDone: () {
          r.close();
        },
        onError: (dynamic e, StackTrace stack) {
          var error = DSError('invokeException', msg: e.toString());
          try {
            error.detail = stack.toString();
          } catch (e) {
            // Best effort: the stack detail is optional.
          }
          response.close(error);
        },
        cancelOnError: true,
      );
      r.write(response);
      return response;
    } else {
      // Buffered mode: collect all events and send them when the stream ends.
      var list = <dynamic>[];
      StreamSubscription? sub;

      // Stop listening to the source stream once the response is closed.
      r.onClose = (_) {
        if (sub != null) {
          sub.cancel();
        }
      };

      sub = stream.listen(
        (dynamic v) {
          if (v is TableMetadata) {
            r.meta = v.meta;
            return;
          } else if (v is TableColumns) {
            r.columns = v.columns.map((x) => x.getData()).toList();
            return;
          }

          if (v is Iterable) {
            list.addAll(v);
          } else if (v is Map) {
            list.add(v);
          } else {
            // NOTE(review): an exception thrown from this data callback does
            // not appear to be routed to onError below - confirm intended.
            throw Exception('Unknown Value from Stream');
          }
        },
        onDone: () {
          r.update(list);
          r.close();
        },
        onError: (dynamic e, StackTrace stack) {
          var error = DSError('invokeException', msg: e.toString());
          try {
            error.detail = stack.toString();
          } catch (e) {
            // Best effort: the stack detail is optional.
          }
          response.close(error);
        },
        cancelOnError: true,
      );
    }
    r.write(response);
    return response;
  } else if (rslt is Future) {
    // Nullable so it can be dropped when the future resolves to a LiveTable,
    // which takes over the response itself.
    AsyncTableResult? r = AsyncTableResult();

    response.onClose = (var response) {
      if (r?.onClose != null) {
        r?.onClose!(response);
      }
    };

    rslt
        .then((dynamic value) {
          if (value is LiveTable) {
            r = null;
            value.sendTo(response);
          } else if (value is Stream) {
            var stream = value;
            StreamSubscription? sub;

            // Stop listening to the source stream once the response is closed.
            r?.onClose = (_) {
              if (sub != null) {
                sub.cancel();
              }
            };

            sub = stream.listen(
              (dynamic v) {
                if (v is TableMetadata) {
                  r?.meta = v.meta;
                  return;
                } else if (v is TableColumns) {
                  r?.columns = v.columns.map((x) => x.getData()).toList();
                  return;
                }

                if (v is Iterable) {
                  r?.update(v.toList());
                } else if (v is Map) {
                  Map? meta;
                  if (v.containsKey('__META__')) {
                    meta = v['__META__'];
                  }
                  r?.update(<dynamic>[v], StreamStatus.open, meta);
                } else {
                  throw Exception('Unknown Value from Stream');
                }
              },
              onDone: () {
                r?.close();
              },
              onError: (dynamic e, StackTrace stack) {
                var error = DSError('invokeException', msg: e.toString());
                try {
                  error.detail = stack.toString();
                } catch (e) {
                  // Best effort: the stack detail is optional.
                }
                response.close(error);
              },
              cancelOnError: true,
            );
          } else if (value is Table) {
            var table = value;
            r?.columns = table.columns.map((x) => x.getData()).toList();
            r?.update(table.rows, StreamStatus.closed, table.meta);
            r?.close();
          } else {
            // Plain value: an Iterable is sent as rows, anything else as a
            // one-element list.
            r?.update(value is Iterable ? value.toList() : <dynamic>[value]);
            r?.close();
          }
        })
        .catchError((dynamic e, StackTrace stack) {
          var error = DSError('invokeException', msg: e.toString());
          try {
            error.detail = stack.toString();
          } catch (e) {
            // Best effort: the stack detail is optional.
          }
          response.close(error);
        });
    r?.write(response);
    return response;
  } else if (rslt is LiveTable) {
    rslt.sendTo(response);
  } else {
    // Unsupported or still-null result: nothing to send.
    response.close();
  }

  return response;
}