prepareNextRequestToProcess method
Discovers the most recent unprocessed job in the database and converts it back to an HTTP request. This method also locks the row to make it idempotent to subsequent processing.
Implementation
/// Picks the next queued row, locks it, and rebuilds it as a request.
///
/// Everything below runs inside a single database transaction:
///
/// 1. The rows returned by `_latestRequest` with `whereLocked: true` are
///    inspected. If the first of them was last updated more than two minutes
///    ago, it is handed to `RequestSqliteCache.unlockRequest`.
/// 2. If any locked row was found and `serialProcessing` is enabled, the
///    transaction yields no rows (even when a stale lock was just released).
/// 3. Otherwise the first row returned by `_latestRequest` with
///    `whereLocked: false` is handed to `RequestSqliteCache.lockRequest` and
///    those rows are yielded.
///
/// Returns the first yielded row converted through `sqliteToRequest`, or
/// `null` when the transaction yielded nothing.
Future<RequestMethod?> prepareNextRequestToProcess() async {
  final db = await getDb();

  final rows = await db.transaction<List<Map<String, dynamic>>>((txn) async {
    final lockedRows = await _latestRequest(txn, whereLocked: true);

    if (lockedRows.isNotEmpty) {
      // A lock older than two minutes is treated as abandoned and released.
      final lockedRow = lockedRows.first;
      final lockedAt =
          DateTime.fromMillisecondsSinceEpoch(lockedRow[updateAtColumn]);
      final staleThreshold =
          DateTime.now().subtract(const Duration(minutes: 2));

      if (staleThreshold.isAfter(lockedAt)) {
        await RequestSqliteCache.unlockRequest(
          data: lockedRow,
          db: txn,
          lockedColumn: lockedColumn,
          primaryKeyColumn: primaryKeyColumn,
          tableName: tableName,
        );
      }

      // In serial mode an existing lock means nothing new is handed out
      // during this call.
      if (serialProcessing) return <Map<String, dynamic>>[];
    }

    final unlockedRows = await _latestRequest(txn, whereLocked: false);

    // Lock the row about to be handed out so a subsequent call skips it.
    if (unlockedRows.isNotEmpty) {
      await RequestSqliteCache.lockRequest(
        data: unlockedRows.first,
        db: txn,
        lockedColumn: lockedColumn,
        primaryKeyColumn: primaryKeyColumn,
        tableName: tableName,
      );
    }

    // Either empty, or the rows whose first entry was just locked.
    return unlockedRows;
  });

  final requests = rows.map(sqliteToRequest).cast<RequestMethod>();
  return requests.isEmpty ? null : requests.first;
}