readFarms method
StreamSubscription<FarmMessage>
readFarms({
- required Communicator communicator,
- String baseTopic = "farm",
- OnFarmCallback? onFarm,
Implementation
StreamSubscription<FarmMessage> readFarms({
required Communicator communicator,
String baseTopic = "farm",
OnFarmCallback? onFarm,
}) {
if (!baseTopic.endsWith("/")) baseTopic = "$baseTopic/";
// process farm messages
return communicator.messages.listen((message) {
// validate topic
String topic = message.topic;
if (!topic.startsWith(baseTopic)) {
_log.info(
"ignoring message not on baseTopic '$baseTopic' (${message.topic})");
return;
}
topic = topic.substring(baseTopic.length);
final subtopics = topic.split("/");
// get farm
if (subtopics.length < 4) {
_log.warning("received message with invalid topic (${message.topic})");
return;
}
final farmId = int.tryParse(subtopics.removeAt(0));
if (farmId == null) {
_log.warning(
"received message with invalid farm id (${message.topic})");
return;
}
if (!farms.containsKey(farmId)) {
farms[farmId] = Farm(name: "Farm $farmId", id: farmId);
_events.add(farms[farmId]!.events);
}
// process message
message.topic = subtopics.join("/");
farms[farmId]!.processMessage(message);
});
}