readFarms method

StreamSubscription<FarmMessage> readFarms({
  1. required Communicator communicator,
  2. String baseTopic = "farm",
  3. OnFarmCallback? onFarm,
})

Implementation

StreamSubscription<FarmMessage> readFarms({
  required Communicator communicator,
  String baseTopic = "farm",
  OnFarmCallback? onFarm,
}) {
  if (!baseTopic.endsWith("/")) baseTopic = "$baseTopic/";

  // process farm messages
  return communicator.messages.listen((message) {
    // validate topic
    String topic = message.topic;

    if (!topic.startsWith(baseTopic)) {
      _log.info(
          "ignoring message not on baseTopic '$baseTopic' (${message.topic})");
      return;
    }

    topic = topic.substring(baseTopic.length);
    final subtopics = topic.split("/");

    // get farm
    if (subtopics.length < 4) {
      _log.warning("received message with invalid topic (${message.topic})");
      return;
    }

    final farmId = int.tryParse(subtopics.removeAt(0));
    if (farmId == null) {
      _log.warning(
          "received message with invalid farm id (${message.topic})");
      return;
    }

    if (!farms.containsKey(farmId)) {
      farms[farmId] = Farm(name: "Farm $farmId", id: farmId);
      _events.add(farms[farmId]!.events);
    }

    // process message
    message.topic = subtopics.join("/");
    farms[farmId]!.processMessage(message);
  });
}