Features

Turns concurrent business operations into serial execution, so that the operations run one at a time instead of overlapping.

Getting started

Install the package by adding `serial_queue` to the dependencies in your `pubspec.yaml`.

Usage

Example 1: adding three tasks to one queue and awaiting each task's future.

import 'package:serial_queue/serial_queue.dart';
import 'package:serial_queue/sleep.dart';

/// Counter shared by all [increment] calls; each call reads it, sleeps, then
/// writes back the value it read plus one.
int i = 0;

/// Starts a queue, then schedules three callbacks that each add one task to
/// the queue and await that task's future.
void main() async {
  final queue = SerialQueue()..startQueue();

  // All three callbacks are scheduled with the same delay.
  const delay = Duration(milliseconds: 20);

  // Task handled by `add`, with a list of integers as its params.
  Future.delayed(delay, () async {
    final Task sumTask = Task<int, List<int>>.create(
      taskHandler: add,
      params: [1, 2, 3, 4, 5, 6, 7, 8, 9],
    );
    queue.addTask(sumTask);
    final sum = await sumTask.future;
  });

  // Task handled by `increment`.
  Future.delayed(delay, () async {
    final Task counterTask = Task<void, String>.create(
      taskHandler: increment,
      params: 'D',
    );
    queue.addTask(counterTask);
    await counterTask.future;
  });

  // Task handled by `show`.
  Future.delayed(delay, () async {
    final Task printTask = Task<void, String?>.create(
      taskHandler: show,
      params: 'text content',
    );
    queue.addTask(printTask);
    await printTask.future;
  });

  await sleep(1000);
}

/// Returns the sum of the integers in [params].
///
/// Returns `0` when [params] is omitted, `null`, or empty, rather than
/// throwing.
int add({List<int>? params}) {
  // A seeded `fold` is defined for an empty list, whereas `reduce` throws a
  // `StateError` when there is no element to start from.
  return (params ?? const <int>[]).fold<int>(0, (total, value) => total + value);
}

/// Prints [params] on its own line.
///
/// Synchronous on purpose: the body awaits nothing, and a `void` function
/// marked `async` returns a future that its static type hides from callers.
void show({String? params}) {
  print('$params');
}

/// Adds one to the shared counter [i] and returns the new value.
///
/// The counter is read before the pause and written after it, so the write
/// uses the value seen before sleeping.
Future<int> increment({String? params}) async {
  final before = i;
  await sleep(3);
  i = before + 1;
  print('正在处理 increment 任务----$params--$i');
  return i;
}

Example 2: 100 people deposit money at 5 ATMs, with the queue switched on or off through `useQueue`.

import 'package:flutter_test/flutter_test.dart';
import 'package:serial_queue/serial_queue.dart';
import 'package:serial_queue/sleep.dart';


/// 100 people deposit money at 5 ATMs.
///
/// The people are split into five teams of twenty; each team uses one ATM and
/// deposits 100 per person, pausing after every deposit.
void main() async {
  const useQueue = false;
  final bank = Bank(name: 'ABC', useQueue: useQueue);

  // Value passed to `sleep` after each deposit, per team: the first team
  // pauses for 30, the other four for 2.
  const pauses = [30, 2, 2, 2, 2];

  // Consecutive slices of twenty people: 0-19, 20-39, 40-59, 60-79, 80-99.
  final teams = [
    for (var start = 0; start < 100; start += 20)
      bank.persons.sublist(start, start + 20),
  ];

  // Team `index` uses ATM `index`; each team runs in its own callback.
  for (var index = 0; index < teams.length; index++) {
    Future.delayed(const Duration(), () async {
      for (var customer in teams[index]) {
        await bank.atms[index]?.deposit(customer!.accountNo, 100);
        await sleep(pauses[index]);
      }
    });
  }
  // Future.delayed(const Duration(milliseconds: 200), () async {
  //   bank.atms[0]!.hasError = true;
  //   await sleep(200);
  //   bank.atms[0]!.hasError = false;
  // });
  await sleep(3000);
  await bank.close();
}

/// A bank customer identified by an account number.
class Person {
  /// Account number used to look this person up.
  String accountNo;

  /// Amount deposited so far; starts at zero.
  double money = 0;

  Person({required this.accountNo});

  /// Currently a no-op.
  void look() {}
}

/// A teller machine that forwards every deposit to its owning [Bank].
class ATM {
  ATM({required this.bank, required this.id});

  /// Bank whose `atmDeposit` and `atmTimeout` methods this machine calls.
  Bank bank;

  /// Identifier sent to the bank with each deposit.
  String id;

  /// Amount recorded for this machine; printed by [look].
  double money = 0;

  /// When `true`, [deposit] reports a timeout instead of depositing.
  bool hasError = false;

  /// Deposits [money] into [accountNo] through the bank.
  ///
  /// Returns the result of `bank.atmTimeout()` while [hasError] is set, and of
  /// `bank.atmDeposit(...)` otherwise.
  Future<bool> deposit(String accountNo, double money) async {
    final outcome =
        hasError ? bank.atmTimeout() : bank.atmDeposit(id, accountNo, money);
    return await outcome;
  }

  /// Prints this machine's id and recorded amount.
  void look() => print('ATM id:$id   money=$money');
}

/// A bank that owns five ATMs and one hundred customers and records deposits,
/// either directly or through a [SerialQueue].
class Bank {
  /// Total of all deposits written back so far.
  double total = 0;

  /// Customers, created in the constructor with account numbers `N@0`..`N@99`.
  List<Person?> persons = [];

  /// Machines, created in the constructor with ids `id-0`..`id-4`.
  List<ATM?> atms = [];

  String name;

  /// Whether [atmDeposit] goes through [queue] or calls the handler directly.
  bool useQueue = false;

  late SerialQueue queue;

  /// Sequence number printed with each handled deposit.
  int orderId = 0;

  /// Creates the bank, starts its queue, and populates [atms] and [persons].
  Bank({required this.name, this.useQueue = false}) {
    queue = SerialQueue(log: true)..startQueue();
    atms.addAll([
      for (int i = 0; i < 5; i++) ATM(bank: this, id: 'id-$i'),
    ]);
    persons.addAll([
      for (int i = 0; i < 100; i++) Person(accountNo: 'N@$i'),
    ]);
  }

  /// Queues a task that prints an ATM-fault message and yields `false`.
  ///
  /// If the task's future fails, the error is printed and `false` is returned.
  Future<bool> atmTimeout() {
    final faultTask = Task<bool, String>.create(
      taskHandler: ({String? params}) {
        print(params);
        return false;
      },
      params: '-----------> ATM故障!!!',
    );
    queue.addTask(faultTask);
    return faultTask.future.onError((error, stackTrace) {
      print('$error');
      return false;
    });
  }

  /// Records a deposit of [money] into [accountNo] made at ATM [fromAtmId].
  ///
  /// With [useQueue] set, the deposit is submitted to [queue] and a failed
  /// task is reported as `false`; otherwise the handler is called directly.
  Future<bool> atmDeposit(String fromAtmId, String accountNo, double money) async {
    final info = DepositInfo(
      fromAtmId: fromAtmId,
      accountNo: accountNo,
      money: money,
    );
    if (!useQueue) {
      return _atmDeposit(params: info);
    }

    final depositTask = Task<bool, DepositInfo>.create(
      taskHandler: _atmDeposit,
      params: info,
    );
    queue.addTask(depositTask);
    return await depositTask.future.onError((error, stackTrace) {
      print('$error');
      return false;
    });
  }

  /// Applies one deposit; returns `false` if the ATM or the account is unknown.
  ///
  /// The bank and ATM amounts are read before a pause and written back after
  /// another one, so overlapping calls can overwrite each other's updates.
  Future<bool> _atmDeposit({DepositInfo? params}) async {
    print('orderId:${orderId++}     fromAtmId:${params!.fromAtmId}  ${params.accountNo}  ${params.money}');
    await sleep(2);

    final machine = atms.firstWhere(
      (candidate) => candidate?.id == params.fromAtmId,
      orElse: () => null,
    );
    if (machine == null) return false;

    final customer = persons.firstWhere(
      (candidate) => candidate?.accountNo == params.accountNo,
      orElse: () => null,
    );
    if (customer == null) return false;

    // Snapshot both amounts before pausing.
    final bankBefore = total;
    final machineBefore = machine.money;
    await sleep(2);

    customer.money += params.money;
    final machineAfter = machineBefore + params.money;
    final bankAfter = bankBefore + params.money;
    await sleep(1);

    // Write back the values computed from the earlier snapshot.
    total = bankAfter;
    machine.money = machineAfter;
    return true;
  }

  /// Prints the bank summary, then calls `look()` on every ATM and person.
  void look() {
    print('Bank name:$name   total=$total');
    for (var machine in atms) {
      machine!.look();
    }
    for (var customer in persons) {
      customer!.look();
    }
  }

  /// Disposes [queue], passing a callback that prints the settlement report.
  Future<void> close() async {
    await queue.dispose(() {
      print('---------------------银行结算信息---------------------');
      look();
    });
  }
}

/// Parameters of a single deposit: which ATM, which account, how much.
class DepositInfo {
  DepositInfo({
    required this.fromAtmId,
    required this.accountNo,
    required this.money,
  });

  /// Id of the ATM the deposit was made at.
  String fromAtmId;

  /// Account number receiving the deposit.
  String accountNo;

  /// Amount being deposited.
  double money;
}

Libraries

serial_queue
sleep