prepareNextRequestToProcess method

Future<Request?> prepareNextRequestToProcess()
inherited

Discovers the most recent unprocessed job in the database and converts it back to an HTTP request. This method also locks the row to make it idempotent to subsequent processing.

Implementation

/// Fetches the next queued request that is ready to be processed, or `null`
/// when there is none.
///
/// All database work happens inside a single transaction:
///
/// 1. The latest locked row is looked up. If its [updateAtColumn] timestamp
///    is more than two minutes old, it is passed to
///    [RequestSqliteCache.unlockRequest]. When [serialProcessing] is enabled,
///    nothing is returned while a locked row was found (including one that
///    was unlocked during this call).
/// 2. Otherwise the latest unlocked row is looked up, passed to
///    [RequestSqliteCache.lockRequest], and returned.
///
/// The first returned row is converted with [sqliteToRequest].
Future<RequestMethod?> prepareNextRequestToProcess() async {
  final database = await getDb();

  final rows = await database.transaction<List<Map<String, dynamic>>>((txn) async {
    final lockedRows = await _latestRequest(txn, whereLocked: true);

    if (lockedRows.isNotEmpty) {
      final newestLocked = lockedRows.first;

      // A lock older than two minutes is treated as abandoned and released.
      final lockedSince = DateTime.fromMillisecondsSinceEpoch(newestLocked[updateAtColumn]);
      final staleCutoff = DateTime.now().subtract(const Duration(minutes: 2));
      if (staleCutoff.isAfter(lockedSince)) {
        await RequestSqliteCache.unlockRequest(
          data: newestLocked,
          db: txn,
          lockedColumn: lockedColumn,
          primaryKeyColumn: primaryKeyColumn,
          tableName: tableName,
        );
      }

      // In serial mode a locked row (even one just released above) means
      // nothing is handed out during this call.
      if (serialProcessing) {
        return [];
      }
    }

    final candidates = await _latestRequest(txn, whereLocked: false);
    if (candidates.isEmpty) {
      return [];
    }

    // Lock the chosen row before handing it back to the caller.
    await RequestSqliteCache.lockRequest(
      data: candidates.first,
      db: txn,
      lockedColumn: lockedColumn,
      primaryKeyColumn: primaryKeyColumn,
      tableName: tableName,
    );

    return candidates;
  });

  // Conversion is lazy, so only the first row is actually converted.
  final requests = rows.map(sqliteToRequest).cast<RequestMethod>();
  return requests.isEmpty ? null : requests.first;
}