Scaibu Open-Source Initiative

This is more than just an open-source project—it's a call to action. At Scaibu, we're on a mission to create powerful, innovative tools that empower developers worldwide—but we can't do it alone. We need you. Right now.

Your contributions—whether fixing a small bug, writing new code, or enhancing documentation—are vital. Each line of code you write brings us closer to transforming the developer ecosystem and building a future where technology is open, accessible, and driven by community collaboration.

At Scaibu, we see you, and we value your work. Every effort you make is rewarded, and your contributions will never go unnoticed. Join us and become part of a community that is building something bigger than all of us.

🌟 Why Contribute?

By contributing to Scaibu, you’re not just helping us—you’re investing in a movement. Here’s what’s in it for you:

  • Earn Reward Points: For every meaningful contribution, you earn points—with 1,000 points = $1 USD.
  • Recognition & Visibility: Your name will be proudly featured on our official platform as a valued contributor.
  • Exclusive Access: As we develop new advanced developer tools, active contributors will receive premium access—before anyone else.
  • Be Part of Something Bigger: Your work directly impacts the open-source community and helps shape the future of development.

🤝 Help Us Grow

We believe in the power of community. Your support is essential in helping us grow and deliver cutting-edge tools. Whether you contribute code, identify bugs, or improve documentation—every action matters.

If you’re passionate about open-source development and want to make a real difference, now is the time to step up. We can't do this without you. Together, we will build something extraordinary.

💼 How It Works

  1. Contribute: Help us grow by fixing bugs, writing new features, or improving documentation.
  2. Earn Points: Each verified contribution earns you reward points.
  3. Redeem Rewards: Use your points to unlock premium features on the Scaibu official site (currently in development).

🚀 Get Started

  1. Fork the Repository: Click the Fork button to create a copy of this repository.

  2. Clone Your Fork:

    git clone https://github.com/your-username/any-scaibu-package
    cd any-scaibu-package
    
  3. Create a Branch:

    git checkout -b feature/your-feature
    
  4. Make Changes: Implement your code or fix issues.

  5. Commit and Push:

    git add .
    git commit -m "feat: add new feature"
    git push origin feature/your-feature
    
  6. Open a Pull Request (PR): Submit a PR to the main branch for review.

📞 Contact Us

If you have questions or want to collaborate directly, feel free to reach out. I'm personally invested in this journey and here to support you. We need your voice, your skills, and your passion. Let's build something incredible together.


Jaydeep Wagh
Founder, Scaibu

Core Concepts

What is a Mutex?

A mutex (mutual exclusion) is a synchronization primitive that grants exclusive access to a shared resource or critical section of code. When a task acquires a mutex, all other tasks attempting to acquire the same mutex are blocked until the first task releases it. This ensures that shared resources are accessed by only one task at a time, preventing potential race conditions.

In Dart's asynchronous environment, mutexes are particularly valuable because they allow you to create "islands of synchronization" within otherwise concurrent code, enabling safe manipulation of shared state.
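To make the hazard concrete, here is a minimal sketch (assuming the SimpleMutex class and protect method documented later in this section) of a read-modify-write that spans an await. Without the mutex, two concurrent deposits can read the same balance and one update is lost; with the mutex, the sequence becomes one of those islands of synchronization:

import 'package:mutex_library/mutex.dart';

int _balance = 0;
final SimpleMutex _balanceMutex = SimpleMutex('balance');

// Unsafe: the read and the write are separated by an await, so two concurrent
// calls can both read the same value and one deposit is silently lost.
Future<void> depositUnsafe(int amount) async {
   final current = _balance;
   await Future.delayed(Duration(milliseconds: 10)); // simulate I/O
   _balance = current + amount;
}

// Safe: the whole read-modify-write runs as one critical section.
Future<void> depositSafe(int amount) async {
   await _balanceMutex.protect(() async {
      final current = _balance;
      await Future.delayed(Duration(milliseconds: 10));
      _balance = current + amount;
   });
}

Future<void> main() async {
   await Future.wait([depositUnsafe(10), depositUnsafe(10)]);
   print('Without mutex: $_balance'); // often 10 instead of 20

   _balance = 0;
   await Future.wait([depositSafe(10), depositSafe(10)]);
   print('With mutex: $_balance'); // always 20
}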

When to Use Mutexes

Mutexes are essential in several scenarios:

  1. Shared Resource Access: When multiple asynchronous operations need to access or modify the same resource (database connection, file, shared memory).

  2. Critical Sections: When certain operations must be executed atomically without interruption.

  3. State Management: When maintaining consistent application state across asynchronous operations.

  4. Rate Limiting: When controlling the rate of operations like API calls to prevent overwhelming external services (a short sketch follows this list).

  5. Ordered Execution: When operations must be executed in a specific order despite being launched concurrently.
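The rate-limiting case above can be sketched with the protect method introduced later in this document. The ThrottledApiClient class and its minimum-interval policy below are illustrative, not part of the library:

import 'package:mutex_library/mutex_service.dart';

class ThrottledApiClient {
   final _mutex = MutexService().getMutex('api-throttle');
   final Duration minInterval;
   DateTime _lastCall = DateTime.fromMillisecondsSinceEpoch(0);

   ThrottledApiClient({this.minInterval = const Duration(milliseconds: 500)});

   /// Runs [request] so that at most one call is in flight at a time and
   /// consecutive calls are spaced at least [minInterval] apart.
   Future<T> send<T>(Future<T> Function() request) {
      return _mutex.protect(() async {
         final elapsed = DateTime.now().difference(_lastCall);
         if (elapsed < minInterval) {
            await Future.delayed(minInterval - elapsed);
         }
         _lastCall = DateTime.now();
         return request();
      });
   }
}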

Core Features

Simple Mutex Implementation

The basic SimpleMutex provides straightforward exclusive access control to resources. This implementation forms the foundation of the library's concurrency control mechanisms.

import 'package:mutex_library/mutex.dart';

// Create a mutex to protect a specific resource
final SimpleMutex mutex = SimpleMutex('userProfileResource');

Future<void> updateUserProfile(String userId) async {
   // Check mutex status
   print('Mutex Name: ${mutex.name}');
   print('Is Available: ${mutex.isUnused}');
   print('Current Queue Length: ${mutex.queueLength}');

   // Try to acquire without blocking
   if (mutex.tryLock()) {
      try {
         // Critical section - exclusive access guaranteed
         UserProfile profile = await fetchUserProfile(userId);
         profile.lastLoginDate = DateTime.now();
         await saveUserProfile(profile);
         print('Profile updated successfully');
      } finally {
         // Always release the lock when done to prevent deadlocks
         mutex.unlock();
         print('Mutex released');
      }
   } else {
      // Handle case when lock is unavailable
      print('Resource busy - profile update queued for later');
      scheduleRetry(() => updateUserProfile(userId));
   }
}

This example demonstrates how to create and use a basic mutex. The SimpleMutex class provides fundamental operations:

  • Creating a named mutex to represent a specific resource
  • Checking if the mutex is currently available
  • Attempting to acquire the lock without blocking
  • Properly releasing the lock using a try-finally pattern

Mutex Service for Global Access

The MutexService provides a centralized way to access mutexes by name, ensuring that the same mutex instance is used across different parts of your application:

import 'package:mutex_library/mutex_service.dart';

// Get a mutex from the service (creates it if it doesn't exist)
final IMutex mutex = MutexService().getMutex('shared-resource');

// You can now use this mutex as needed
await mutex.lock();
try {
   // Critical section
} finally {
   mutex.unlock();
}

// Later in another part of your application
final IMutex sameMutex = MutexService().getMutex('shared-resource');
// This will return the same mutex instance, ensuring proper synchronization

The MutexService is particularly useful in large applications where multiple components need to access the same resources, as it ensures consistent mutex usage across the entire application.

Locking Patterns

The library supports multiple locking approaches to suit different scenarios:

1. Manual Lock/Unlock

For fine-grained control over exactly when the lock is acquired and released:

Future<void> updateConfiguration(Config config) async {
   final configMutex = MutexService().getMutex('app-configuration');

   print('Waiting to acquire configuration lock...');
   await configMutex.lock(); // This will wait until the lock is available
   print('Lock acquired, updating configuration');

   try {
      // Critical section operations - only one thread can be here at a time
      final currentConfig = await loadConfigFromDisk();
      currentConfig.merge(config);
      await validateConfiguration(currentConfig); // Might throw if invalid
      await writeConfigToDisk(currentConfig);
      await notifyConfigListeners();
      print('Configuration updated successfully');
   } catch (e) {
      print('Configuration update failed: $e');
      // Handle error - perhaps reverting to previous configuration
      await restorePreviousConfig();
   } finally {
      // Ensure lock is always released even if exceptions occur
      configMutex.unlock();
      print('Configuration lock released');
   }
}

This pattern gives you complete control over when the lock is acquired and released, which is useful when you need to perform setup or cleanup operations outside the locked section. The try-finally pattern ensures the lock is always released, even if an exception occurs.

2. Automatic Protection

For cleaner code with automatic lock management, the protect method handles locking and unlocking automatically:

Future<PaymentResult> processPayment(Payment payment) async {
   final paymentMutex = MutexService().getMutex('payment-processor');

   // The protect method automatically acquires and releases the lock
   final result = await paymentMutex.protect(() async {
      print('Payment processing started for order ${payment.orderId}');

      // These operations will be protected by the mutex
      final account = await getAccount(payment.accountId);

      // Verify sufficient funds
      if (account.balance < payment.amount) {
         throw InsufficientFundsException('Not enough funds to process payment');
      }

      // Deduct payment amount
      account.balance -= payment.amount;
      await saveAccount(account);

      // Record transaction
      final transaction = Transaction(
              id: generateTransactionId(),
              accountId: payment.accountId,
              amount: payment.amount,
              timestamp: DateTime.now(),
              type: TransactionType.payment
      );
      await recordTransaction(transaction);

      print('Payment processed successfully');
      return PaymentResult(
              success: true,
              transactionId: transaction.id,
              timestamp: transaction.timestamp
      );
   });

   return result;
}

The protect method is more concise and less error-prone since it guarantees the lock will be released, even if an exception occurs during the protected operation. This is the recommended approach for most use cases.

3. Non-blocking Lock Acquisition

For scenarios where waiting is undesirable, such as UI operations that shouldn't freeze the interface:

Future<void> refreshUserInterface() async {
   final uiMutex = MutexService().getMutex('ui-refresh');

   // Try to acquire the lock without blocking
   if (uiMutex.tryLock()) {
      try {
         print('Starting UI refresh');
         // Update UI components
         await loadDashboardData();
         await updateWidgets();
         await refreshVisuals();
         print('UI refresh complete');
      } finally {
         uiMutex.unlock();
         print('UI refresh lock released');
      }
   } else {
      // UI refresh is already in progress, skip this request
      print('UI refresh already in progress, skipping duplicate request');

      // Optionally, we could show a "refresh in progress" indicator to the user
      showRefreshInProgressIndicator();
   }
}

The tryLock method is useful when you want to attempt an operation only if the resource is immediately available. If the lock is already held, you can take alternative actions rather than waiting.

4. Timeout-based Locking

To prevent indefinite waiting, which could lead to poor user experience or blocked resources:

Future<void> synchronizeWithServer() async {
   final syncMutex = MutexService().getMutex('server-sync');

   print('Attempting to start server synchronization...');

   // Try to acquire the lock with a timeout
   final acquired = await syncMutex.lockWithTimeout(Duration(seconds: 5));

   if (acquired) {
      try {
         print('Starting server synchronization');
         // Perform synchronization operations
         await uploadPendingChanges();
         await downloadServerUpdates();
         await reconcileConflicts();
         print('Synchronization complete');
      } catch (e) {
         print('Synchronization error: $e');
      } finally {
         syncMutex.unlock();
         print('Synchronization lock released');
      }
   } else {
      // Lock acquisition timed out
      print('Could not acquire sync lock within timeout period');
      print('Another synchronization is likely in progress');

      // Notify user and possibly schedule retry
      notifyUser('Synchronization delayed - will try again automatically');
      scheduleRetry(() => synchronizeWithServer(), delay: Duration(minutes: 2));
   }
}

The lockWithTimeout method allows you to set a maximum waiting time, after which the attempt to acquire the lock will fail rather than waiting indefinitely. This is essential for responsive applications where operations should not block for too long.

Advanced Concurrency Features

Read-Write Lock

A read-write lock permits multiple simultaneous read operations while ensuring exclusive write access. This pattern is particularly useful for resources that are read frequently but modified infrequently, such as configuration data or cached values.

import 'package:mutex_library/read_write_lock.dart';

class CacheManager {
   final Map<String, dynamic> _cache = {};
   final ReadWriteLock _lock = ReadWriteLock('cache-manager');

   // Multiple readers can access the cache simultaneously
   Future<T?> get<T>(String key) async {
      return await _lock.read(() async {
         print('Reading from cache: $key');
         await Future.delayed(Duration(milliseconds: 20)); // Simulate read time
         final value = _cache[key];
         print('Cache read complete for $key: ${value != null ? "Hit" : "Miss"}');
         return value as T?;
      });
   }

   // Writers get exclusive access - blocks all readers while writing
   Future<void> set<T>(String key, T value) async {
      await _lock.write(() async {
         print('Writing to cache: $key');
         await Future.delayed(Duration(milliseconds: 50)); // Simulate write time
         _cache[key] = value;
         print('Cache write complete for $key');
      });
   }

   // Example of a complex operation that updates multiple entries
   Future<void> batchUpdate(Map<String, dynamic> entries) async {
      await _lock.write(() async {
         print('Starting batch update of ${entries.length} entries');

         for (final entry in entries.entries) {
            _cache[entry.key] = entry.value;
            // Simulate processing time for each entry
            await Future.delayed(Duration(milliseconds: 10));
         }

         print('Batch update complete');
      });
   }
}

// Usage example
Future<void> demonstrateReadWriteLock() async {
   final cache = CacheManager();

   // Populate cache with initial values
   await cache.set('user_1', {'name': 'Alice', 'age': 30});
   await cache.set('user_2', {'name': 'Bob', 'age': 25});

   // Execute concurrent operations
   await Future.wait([
      // Multiple reads can happen in parallel
      cache.get<Map>('user_1'),
      cache.get<Map>('user_2'),
      cache.get<Map>('user_3'), // Cache miss

      // Write operations get exclusive access
      cache.set('user_3', {'name': 'Charlie', 'age': 35}),

      // More read operations
      cache.get<Map>('user_1'),

      // Batch update (exclusive access)
      cache.batchUpdate({
         'user_4': {'name': 'David', 'age': 40},
         'user_5': {'name': 'Eva', 'age': 22}
      })
   ]);

   print('All cache operations completed');
}

The read-write lock pattern significantly improves performance by allowing concurrent read access while still providing exclusive write access when needed. In the example above, multiple cache reads can execute simultaneously, but writes will temporarily block all reads to ensure consistency.

Hierarchical Locking

Hierarchical locking ensures structured access to nested resources, preventing deadlocks and maintaining consistency. This is particularly useful when working with complex data structures such as nested documents or tree-like data. The example below sketches a document-publishing workflow in which the document-level lock is always acquired before its section-level locks.

import 'package:mutex_library/mutex_service.dart';

class DocumentPublisher {
   final MutexService _mutexService = MutexService();

   // Document lock first, then section locks: a consistent order that prevents deadlocks.
   Future<void> publishDocument(String docId) async {
      final docMutex = _mutexService.getMutex('document-$docId');

      await docMutex.protect(() async {
         await recalculateDocumentStatistics(docId);
         await updateDocumentVersion(docId);

         final sections = await getDocumentSections(docId);
         for (final sectionId in sections) {
            // Section locks are only ever taken while the document lock is held
            final sectionMutex = _mutexService.getMutex('document-$docId:$sectionId');
            await sectionMutex.protect(() async {
               await validateSectionContent(docId, sectionId);
               await markSectionAsPublished(docId, sectionId);
            });
         }

         await markDocumentAsPublished(docId);
         await notifySubscribers(docId, 'published');
      });
   }

   // Simulated document operations
   Future<void> recalculateDocumentStatistics(String docId) async {
      await Future.delayed(Duration(milliseconds: 200));
   }

   Future<void> updateDocumentVersion(String docId) async {
      await Future.delayed(Duration(milliseconds: 50));
   }

   Future<List<String>> getDocumentSections(String docId) async {
      await Future.delayed(Duration(milliseconds: 100));
      return ['section-1', 'section-2', 'section-3'];
   }

   Future<void> validateSectionContent(String docId, String sectionId) async {
      await Future.delayed(Duration(milliseconds: 150));
   }

   Future<void> markSectionAsPublished(String docId, String sectionId) async {
      await Future.delayed(Duration(milliseconds: 100));
   }

   Future<void> markDocumentAsPublished(String docId) async {
      await Future.delayed(Duration(milliseconds: 150));
   }

   Future<void> notifySubscribers(String docId, String event) async {
      await Future.delayed(Duration(milliseconds: 200));
   }
}

Hierarchical locking ensures that multiple operations accessing nested resources don't conflict with each other. In the example, document-level locks are acquired before section-level locks, establishing a consistent locking order that prevents deadlocks while maintaining data integrity.
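For example, several documents can be published concurrently with the sketch above; each publish takes its own document-level lock before touching any of its sections (the document IDs are illustrative):

Future<void> main() async {
   final publisher = DocumentPublisher();

   await Future.wait([
      publisher.publishDocument('doc-42'),
      publisher.publishDocument('doc-99'),
   ]);
}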

Distributed Locking

The library extends beyond local process locking to distributed locking across multiple application instances. This is essential for applications running in clustered environments, microservices architectures, or distributed systems.

import 'package:mutex_library/distributed_mutex.dart';

class DistributedJobProcessor {
   final DistributedMutex _mutex;

   DistributedJobProcessor({required String redisUrl}) :
              _mutex = DistributedMutex(
                      key: 'batch-job-processor',
                      lockTimeoutSeconds: 300,  // 5 minutes
                      redisConnectionString: redisUrl
              );

   Future<void> processBatchJob(String batchId) async {
      print('Attempting to process batch $batchId');

      final lockAcquired = await _mutex.tryLock();

      if (!lockAcquired) {
         print('Could not acquire distributed lock - batch $batchId is likely being processed by another instance');
         return;
      }

      try {
         print('Lock acquired, processing batch $batchId');

         // Fetch batch data
         final batchItems = await fetchBatchItems(batchId);
         print('Processing ${batchItems.length} items in batch $batchId');

         // Process each item
         final results = <String, String>{};
         for (final item in batchItems) {
            try {
               await processItem(item);
               results[item.id] = 'success';
            } catch (e) {
               results[item.id] = 'error: $e';
               print('Error processing item ${item.id}: $e');
            }

            // Keep the lock alive during long-running operations
            await _mutex.extendLock();
         }

         // Record batch completion
         await markBatchComplete(batchId, results);
         print('Batch $batchId processing complete');

      } catch (e) {
         print('Error during batch processing: $e');
         await markBatchFailed(batchId, e.toString());
      } finally {
         // Release the distributed lock
         await _mutex.unlock();
         print('Distributed lock released');
      }
   }

   // Simulated batch processing methods
   Future<List<BatchItem>> fetchBatchItems(String batchId) async {
      await Future.delayed(Duration(milliseconds: 300));
      return List.generate(10, (i) => BatchItem(id: 'item-$i', data: 'Sample data $i'));
   }

   Future<void> processItem(BatchItem item) async {
      await Future.delayed(Duration(milliseconds: 200));
      print('Processed item ${item.id}');
   }

   Future<void> markBatchComplete(String batchId, Map<String, String> results) async {
      await Future.delayed(Duration(milliseconds: 250));
   }

   Future<void> markBatchFailed(String batchId, String error) async {
      await Future.delayed(Duration(milliseconds: 150));
   }
}

class BatchItem {
   final String id;
   final String data;

   BatchItem({required this.id, required this.data});
}

Distributed locking is particularly useful in cloud-based applications where multiple instances might need to coordinate resource access. The implementation uses Redis as a centralized lock manager, ensuring that only one application instance can process a specific batch job at a time.
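A possible way to wire the processor into an application (the Redis connection string and batch identifier below are placeholders):

Future<void> main() async {
   final processor = DistributedJobProcessor(redisUrl: 'redis://localhost:6379');

   // Safe to run on every instance: only the instance that wins the
   // distributed lock actually processes the batch; the others return early.
   await processor.processBatchJob('batch-2024-06-01');
}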

Timed Locking & Deadlock Detection

The library includes sophisticated deadlock detection and prevention mechanisms to ensure your application doesn't freeze due to unresolved lock contention.

import 'package:mutex_library/mutex_service.dart';
import 'package:mutex_library/deadlock_detector.dart';

class BankingSystem {
   final MutexService _mutexService = MutexService();
   final DeadlockDetector _deadlockDetector = DeadlockDetector();

   Future<void> transferFunds(String fromAccount, String toAccount, double amount) async {
      print('Preparing to transfer $amount from $fromAccount to $toAccount');

      // Register this operation with the deadlock detector
      final operationId = _deadlockDetector.registerOperation('transfer-$fromAccount-$toAccount');

      try {
         // Acquire lock for source account with timeout and deadlock detection
         final sourceMutex = _mutexService.getMutex('account-$fromAccount');
         final lockAcquired = await sourceMutex.lockWithTimeout(
                 Duration(seconds: 5),
                 onWaiting: () => _deadlockDetector.registerWaiting(operationId, 'account-$fromAccount')
         );

         if (!lockAcquired) {
            throw TimeoutException('Could not acquire lock for account $fromAccount within timeout');
         }

         _deadlockDetector.registerAcquired(operationId, 'account-$fromAccount');

         try {
            print('Lock acquired for source account $fromAccount');

            // Verify sufficient funds
            final sourceBalance = await getAccountBalance(fromAccount);
            if (sourceBalance < amount) {
               throw InsufficientFundsException('Insufficient funds in account $fromAccount');
            }

            // Acquire lock for destination account
            final destMutex = _mutexService.getMutex('account-$toAccount');
            final destLockAcquired = await destMutex.lockWithTimeout(
                    Duration(seconds: 5),
                    onWaiting: () => _deadlockDetector.registerWaiting(operationId, 'account-$toAccount')
            );

            if (!destLockAcquired) {
               throw TimeoutException('Could not acquire lock for account $toAccount within timeout');
            }

            _deadlockDetector.registerAcquired(operationId, 'account-$toAccount');

            try {
               print('Lock acquired for destination account $toAccount');

               // Perform the transfer
               await updateAccountBalance(fromAccount, -amount);
               await updateAccountBalance(toAccount, amount);

               // Record the transaction
               await recordTransaction(fromAccount, toAccount, amount);

               print('Transfer complete: $amount from $fromAccount to $toAccount');
            } finally {
               // Release destination account lock
               destMutex.unlock();
               _deadlockDetector.registerReleased(operationId, 'account-$toAccount');
               print('Destination account lock released');
            }
         } finally {
            // Release source account lock
            sourceMutex.unlock();
            _deadlockDetector.registerReleased(operationId, 'account-$fromAccount');
            print('Source account lock released');
         }
      } finally {
         // Operation complete
         _deadlockDetector.unregisterOperation(operationId);
      }
   }

   // Smart transfer that automatically determines lock acquisition order
   // to prevent deadlocks
   Future<void> smartTransferFunds(String account1, String account2, double amount) async {
      // Always acquire locks in a consistent order to prevent deadlocks
      final orderedAccounts = [account1, account2]..sort();

      final firstAccount = orderedAccounts[0];
      final secondAccount = orderedAccounts[1];

      print('Smart transfer using ordered locking: $firstAccount, then $secondAccount');

      // Acquire first account lock
      final firstMutex = _mutexService.getMutex('account-$firstAccount');
      await firstMutex.lock();

      try {
         print('Lock acquired for first account $firstAccount');

         // Acquire second account lock
         final secondMutex = _mutexService.getMutex('account-$secondAccount');
         await secondMutex.lock();

         try {
            print('Lock acquired for second account $secondAccount');

            // Determine transfer direction
            if (firstAccount == account1) {
               // Transfer from account1 to account2
               await doTransfer(account1, account2, amount);
            } else {
               // Transfer from account2 to account1
               await doTransfer(account2, account1, amount);
            }
         } finally {
            // Release second account lock
            secondMutex.unlock();
            print('Second account lock released');
         }
      } finally {
         // Release first account lock
         firstMutex.unlock();
         print('First account lock released');
      }
   }

   // Helper method to perform the actual transfer
   Future<void> doTransfer(String from, String to, double amount) async {
      final sourceBalance = await getAccountBalance(from);
      if (sourceBalance < amount) {
         throw InsufficientFundsException('Insufficient funds in account $from');
      }

      await updateAccountBalance(from, -amount);
      await updateAccountBalance(to, amount);
      await recordTransaction(from, to, amount);

      print('Transfer complete: $amount from $from to $to');
   }

   // Simulated banking operations
   Future<double> getAccountBalance(String accountId) async {
      await Future.delayed(Duration(milliseconds: 100));
      return 1000.0; // Simulated balance
   }

   Future<void> updateAccountBalance(String accountId, double change) async {
      await Future.delayed(Duration(milliseconds: 150));
   }

   Future<void> recordTransaction(String fromAccount, String toAccount, double amount) async {
      await Future.delayed(Duration(milliseconds: 200));
   }
}

class InsufficientFundsException implements Exception {
   final String message;
   InsufficientFundsException(this.message);
   @override
   String toString() => 'InsufficientFundsException: $message';
}

class TimeoutException implements Exception {
   final String message;
   TimeoutException(this.message);
   @override
   String toString() => 'TimeoutException: $message';
}

The deadlock detection system monitors lock acquisition patterns and can detect potential deadlocks before they occur. The smartTransferFunds method demonstrates a common technique to prevent deadlocks: always acquiring locks in a consistent order (in this case, alphabetical order of account IDs).

Performance Considerations

Benchmarking Different Mutex Implementations

To help you choose the right mutex implementation for your specific use case, the library includes a benchmarking tool that compares different implementations under various workloads:

import 'package:mutex_library/benchmark.dart';

void main() async {
   final benchmark = MutexBenchmark();

   print('Running mutex performance benchmarks...');

   // Run benchmarks with different configurations
   await benchmark.runBenchmark(
           name: 'Light Contention',
           mutexCount: 10,
           operationCount: 1000,
           concurrentOperations: 5,
           operationDurationMs: 5
   );

   await benchmark.runBenchmark(
           name: 'Heavy Contention',
           mutexCount: 3,
           operationCount: 1000,
           concurrentOperations: 20,
           operationDurationMs: 10
   );

   await benchmark.runBenchmark(
           name: 'Long Operations',
           mutexCount: 5,
           operationCount: 100,
           concurrentOperations: 10,
           operationDurationMs: 50
   );

   print('Benchmark results:');
   benchmark.printResults();
}

This benchmarking tool helps you understand the performance characteristics of different mutex implementations under various conditions, allowing you to make informed decisions about which implementation to use in your application.

Memory Usage and Cleanup

The library includes mechanisms to manage memory usage and clean up unused mutexes:

import 'package:mutex_library/mutex_service.dart';

Future<void> monitorMutexUsage() async {
   final service = MutexService();

   // Get statistics on mutex usage
   final stats = service.getStatistics();
   print('Active mutexes: ${stats.activeMutexCount}');
   print('Total lock acquisitions: ${stats.totalLockAcquisitions}');
   print('Total lock timeouts: ${stats.totalLockTimeouts}');
   print('Average lock wait time: ${stats.averageLockWaitTimeMs}ms');

   // Clean up unused mutexes
   final removed = service.cleanupUnusedMutexes(Duration(minutes: 30));
   print('Removed $removed unused mutexes');

   // Monitor current contention
   final contendedMutexes = service.getHighContentionMutexes();
   for (final mutex in contendedMutexes) {
      print('High contention mutex: ${mutex.name}, queue length: ${mutex.queueLength}');
   }
}

Memory management is crucial, especially in long-running applications. The cleanup mechanism automatically removes mutexes that haven't been used for a specified period, preventing memory leaks.

Integration with Flutter

State Management

The mutex library integrates seamlessly with Flutter state management solutions like Provider and Riverpod:

import 'package:flutter/material.dart';
import 'package:provider/provider.dart';
import 'package:mutex_library/mutex.dart';

class UserRepository extends ChangeNotifier {
   final SimpleMutex _mutex = SimpleMutex('user-repository');
   User? _currentUser;

   User? get currentUser => _currentUser;

   Future<void> fetchUser(String userId) async {
      await _mutex.protect(() async {
         // Fetch user data from API
         print('Fetching user $userId');
         final userData = await _fetchUserFromApi(userId);

         // Update local state
         _currentUser = userData;
         notifyListeners();

         // Cache user data
         await _cacheUserData(userData);
      });
   }

   Future<void> updateUserProfile(Map<String, dynamic> updates) async {
      await _mutex.protect(() async {
         if (_currentUser == null) {
            throw StateError('No user is currently logged in');
         }

         // Update user profile
         print('Updating user profile for ${_currentUser!.id}');
         final updatedUser = await _updateUserProfileApi(_currentUser!.id, updates);

         // Update local state
         _currentUser = updatedUser;
         notifyListeners();

         // Update cache
         await _cacheUserData(updatedUser);
      });
   }

   Future<User> _fetchUserFromApi(String userId) async {
      await Future.delayed(Duration(milliseconds: 500));
      return User(id: userId, name: 'Test User', email: 'test@example.com');
   }

   Future<User> _updateUserProfileApi(String userId, Map<String, dynamic> updates) async {
      await Future.delayed(Duration(milliseconds: 700));
      return User(
              id: userId,
              name: updates['name'] ?? 'Test User',
              email: updates['email'] ?? 'test@example.com'
      );
   }

   Future<void> _cacheUserData(User user) async {
      await Future.delayed(Duration(milliseconds: 200));
      print('User data cached: ${user.id}');
   }
}

class User {
   final String id;
   final String name;
   final String email;

   User({required this.id, required this.name, required this.email});
}

// Usage in a Flutter widget
class UserProfileWidget extends StatelessWidget {
   final String userId;

   const UserProfileWidget({Key? key, required this.userId}) : super(key: key);

   @override
   Widget build(BuildContext context) {
      return Consumer<UserRepository>(
              builder: (context, userRepo, child) {
                 return FutureBuilder(
                         future: userRepo.fetchUser(userId),
                         builder: (context, snapshot) {
                            if (snapshot.connectionState == ConnectionState.waiting) {
                               return CircularProgressIndicator();
                            }

                            if (snapshot.hasError) {
                               return Text('Error: ${snapshot.error}');
                            }

                            final user = userRepo.currentUser;
                            if (user == null) {
                               return Text('User not found');
                            }

                            return Column(
                               children: [
                                  Text('User Profile: ${user.name}'),
                                  Text('Email: ${user.email}'),
                                  ElevatedButton(
                                          onPressed: () async {
                                             await userRepo.updateUserProfile({
                                                'name': 'Updated Name'
                                             });
                                          },
                                          child: Text('Update Name')
                                  )
                               ],
                            );
                         }
                 );
              }
      );
   }
}

Background Processing

The mutex library is particularly useful for managing background processes in Flutter applications:

import 'package:flutter/material.dart';
import 'package:mutex_library/mutex.dart';
import 'package:workmanager/workmanager.dart';

class BackgroundSyncManager {
   static const String SYNC_TASK = 'background-sync-task';
   final SimpleMutex _mutex = SimpleMutex('background-sync');

   Future<void> initialize() async {
      await Workmanager().initialize(
              callbackDispatcher,
              isInDebugMode: true
      );

      // Register periodic background sync task
      await Workmanager().registerPeriodicTask(
              'unique-sync-task-id',
              SYNC_TASK,
              frequency: Duration(hours: 1),
              constraints: Constraints(
                      networkType: NetworkType.connected,
                      requiresBatteryNotLow: true
              )
      );
   }

   Future<void> runSync() async {
      print('Background sync requested');

      // Try to acquire the mutex with a timeout
      final acquired = await _mutex.lockWithTimeout(Duration(minutes: 5));

      if (!acquired) {
         print('Another sync is already in progress, skipping this one');
         return;
      }

      try {
         print('Starting background sync');

         // Perform sync operations
         await _syncUserData();
         await _syncAppSettings();
         await _syncOfflineChanges();

         print('Background sync completed successfully');
      } catch (e) {
         print('Background sync failed: $e');
      } finally {
         _mutex.unlock();
         print('Background sync mutex released');
      }
   }

   // Mock sync operations
   Future<void> _syncUserData() async {
      await Future.delayed(Duration(seconds: 2));
      print('User data synchronized');
   }

   Future<void> _syncAppSettings() async {
      await Future.delayed(Duration(seconds: 1));
      print('App settings synchronized');
   }

   Future<void> _syncOfflineChanges() async {
      await Future.delayed(Duration(seconds: 3));
      print('Offline changes synchronized');
   }
}

// Setup background callback
void callbackDispatcher() {
   Workmanager().executeTask((task, inputData) async {
      print('Background task started: $task');

      if (task == BackgroundSyncManager.SYNC_TASK) {
         final syncManager = BackgroundSyncManager();
         await syncManager.runSync();
      }

      return true;
   });
}

This integration ensures that multiple background tasks don't interfere with each other, preventing race conditions and maintaining data consistency even when the app is running in the background.

Advanced Usage Patterns

Adaptive Mutex with Exponential Backoff

For systems under high load, an adaptive mutex with exponential backoff can help manage contention:

import 'package:mutex_library/advanced_mutex.dart';

class ApiRateLimiter {
   final AdaptiveMutex _mutex = AdaptiveMutex(
           name: 'api-rate-limiter',
           initialBackoffMs: 50,
           maxBackoffMs: 5000,
           backoffFactor: 2.0
   );

   Future<T> makeApiCall<T>(Future<T> Function() apiCall) async {
      int attempts = 0;
      const maxAttempts = 5;

      while (attempts < maxAttempts) {
         attempts++;

         try {
            // Try to acquire the mutex with adaptive backoff
            final acquired = await _mutex.acquire();

            if (!acquired) {
               print('API rate limit exceeded, backing off (attempt $attempts)');
               continue;
            }

            try {
               // Execute the API call
               return await apiCall();
            } finally {
               // Release the mutex, indicating success
               _mutex.release(success: true);
            }
         } catch (e) {
            // Release the mutex, indicating failure
            _mutex.release(success: false);

            if (attempts >= maxAttempts) {
               print('Maximum retry attempts reached, giving up');
               rethrow;
            }

            print('API call failed, retrying: $e');
         }
      }

      throw Exception('API call failed after multiple attempts');
   }
}

// Usage example
Future<void> demonstrateAdaptiveMutex() async {
   final rateLimiter = ApiRateLimiter();

   // Make multiple API calls concurrently
   await Future.wait([
      rateLimiter.makeApiCall(() => fetchUserData('user-1')),
      rateLimiter.makeApiCall(() => fetchUserData('user-2')),
      rateLimiter.makeApiCall(() => fetchUserData('user-3')),
      rateLimiter.makeApiCall(() => fetchUserData('user-4')),
      rateLimiter.makeApiCall(() => fetchUserData('user-5')),
   ]);

   print('All API calls completed');
}

Future<Map<String, dynamic>> fetchUserData(String userId) async {
   await Future.delayed(Duration(milliseconds: 300));
   return {'id': userId, 'name': 'User $userId'};
}

The adaptive mutex automatically adjusts its backoff strategy based on the success or failure of operations, helping to manage API rate limits effectively.

Mutex with Priority Queuing

For scenarios where certain operations should take precedence over others:

import 'package:mutex_library/priority_mutex.dart';

enum OperationPriority {
   low,
   medium,
   high,
   critical
}

class EmergencySystem {
   final PriorityMutex _mutex = PriorityMutex('emergency-system');

   Future<void> handleEmergency(String emergencyId, OperationPriority priority) async {
      print('Emergency $emergencyId received with priority: $priority');

      // Acquire the mutex with the specified priority
      await _mutex.lock(priority: priority);

      try {
         print('Handling emergency $emergencyId');

         // Perform emergency response operations
         await _dispatchFirstResponders(emergencyId);
         await _notifyAuthorities(emergencyId);
         await _logEmergencyEvent(emergencyId);

         print('Emergency $emergencyId handled successfully');
      } finally {
         _mutex.unlock();
         print('Emergency system mutex released');
      }
   }

   // Mock emergency operations
   Future<void> _dispatchFirstResponders(String emergencyId) async {
      await Future.delayed(Duration(milliseconds: 500));
      print('First responders dispatched for emergency $emergencyId');
   }

   Future<void> _notifyAuthorities(String emergencyId) async {
      await Future.delayed(Duration(milliseconds: 300));
      print('Authorities notified about emergency $emergencyId');
   }

   Future<void> _logEmergencyEvent(String emergencyId) async {
      await Future.delayed(Duration(milliseconds: 200));
      print('Emergency event $emergencyId logged');
   }
}

// Usage example
Future<void> demonstratePriorityMutex() async {
   final emergencySystem = EmergencySystem();

   // Create a mix of emergency events with different priorities
   await Future.wait([
      emergencySystem.handleEmergency('minor-flood-1', OperationPriority.low),
      emergencySystem.handleEmergency('traffic-accident-1', OperationPriority.medium),
      emergencySystem.handleEmergency('building-fire-1', OperationPriority.high),
      emergencySystem.handleEmergency('earthquake-1', OperationPriority.critical),
      emergencySystem.handleEmergency('minor-flood-2', OperationPriority.low),
      emergencySystem.handleEmergency('traffic-accident-2', OperationPriority.medium),
   ]);

   print('All emergencies handled');
}

The priority mutex ensures that critical operations are processed before less important ones, regardless of their order in the queue.

Best Practices

Mutex Naming Conventions

Consistent naming conventions help maintain clarity and prevent errors:

// Good practice: Use hierarchical naming with clear resource identification
final userProfileMutex = SimpleMutex('resource:user:profile:${userId}');
final documentMutex = SimpleMutex('resource:document:${docId}');
final systemConfigMutex = SimpleMutex('resource:config:system');

// Good practice: Use namespaces to group related mutexes
final dbConnectionMutex = SimpleMutex('db:connection:primary');
final apiRateLimitMutex = SimpleMutex('api:rate-limit:users');
final fileSystemMutex = SimpleMutex('fs:documents:${directoryPath}');

Error Handling and Recovery

Proper error handling ensures that mutexes are always released, even in failure scenarios:

Future<void> processUserData(String userId) async {
   final mutex = SimpleMutex('user-data-${userId}');

   try {
      await mutex.lock();

      try {
         // Process user data
         await fetchAndProcessUserData(userId);
      } catch (e) {
         // Handle processing errors
         print('Error processing user data: $e');
         await logError('user-data-processing', e, userId);

         // Attempt recovery
         if (e is NetworkTimeoutException) {
            await retryWithBackoff(() => fetchAndProcessUserData(userId));
         } else if (e is ValidationException) {
            await notifyUserOfValidationIssue(userId, e);
         } else {
            // Unrecoverable error
            await markUserForManualReview(userId);
         }
      } finally {
         // Always release the mutex
         mutex.unlock();
      }
   } catch (e) {
      // Handle lock acquisition errors
      print('Could not acquire user data lock: $e');
      await scheduleRetry(() => processUserData(userId));
   }
}

Monitoring and Logging

Comprehensive monitoring helps identify bottlenecks and performance issues:

import 'package:mutex_library/mutex_monitor.dart';

void configureMutexMonitoring() {
   final monitor = MutexMonitor();

   // Configure monitoring
   monitor.configure(
           logLockAcquisitions: true,
           logLockReleases: true,
           logTimeouts: true,
           logHighContentionEvents: true,
           contentionThreshold: 5,
           samplingRatePercent: 10,
           detailedLogging: true
   );

   // Register callbacks
   monitor.onHighContention((mutex, queueLength, waitTime) {
      print('HIGH CONTENTION: ${mutex.name}, queue: $queueLength, wait: ${waitTime}ms');
      logAnalyticsEvent('mutex_high_contention', {
         'mutex_name': mutex.name,
         'queue_length': queueLength,
         'wait_time_ms': waitTime
      });
   });

   monitor.onDeadlockDetected((mutexes) {
      print('DEADLOCK DETECTED: ${mutexes.map((m) => m.name).join(', ')}');
      logAnalyticsEvent('mutex_deadlock_detected', {
         'mutex_names': mutexes.map((m) => m.name).join(', ')
      });
      sendAlertToDevTeam('Deadlock detected in production');
   });

   // Start monitoring
   monitor.start();
}

// Mock analytics methods
void logAnalyticsEvent(String eventName, Map<String, dynamic> properties) {
   print('Analytics event: $eventName, properties: $properties');
}

void sendAlertToDevTeam(String message) {
   print('ALERT: $message');
}

Conclusion

The Mutex Library for Dart/Flutter provides a comprehensive solution for managing concurrency in your applications. By implementing proper synchronization mechanisms, you can prevent race conditions, ensure data consistency, and build more reliable applications.

Key takeaways:

  1. Mutexes are essential for protecting shared resources in concurrent environments.
  2. The library offers a variety of mutex implementations to suit different use cases.
  3. Advanced features like read-write locks, hierarchical locking, and distributed locking provide solutions for complex concurrency scenarios.
  4. Integration with Flutter state management and background processing ensures seamless use in mobile applications.
  5. Best practices like consistent naming conventions, proper error handling, and comprehensive monitoring help maintain application reliability.

By following the patterns and examples provided in this documentation, you can effectively manage concurrency in your Dart and Flutter applications, leading to more robust and reliable software.

StorageEngineBloc Documentation


🔹 Load All Items

To fetch all stored items under a specific tag.

storageEngineBloc.add(LoadItems<T>(
  tag: 'storage_key',
  fromJson: (json) => T.fromJson(json),
));
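
In these snippets, T.fromJson(json) is a placeholder: Dart cannot call a factory on a type parameter, so you supply your own model's factory. A concrete version might look like this (the Todo model and the 'todos' tag are illustrative):

class Todo {
  final String id;
  final String title;
  final bool done;

  Todo({required this.id, required this.title, this.done = false});

  factory Todo.fromJson(Map<String, dynamic> json) => Todo(
        id: json['id'] as String,
        title: json['title'] as String,
        done: json['done'] as bool? ?? false,
      );

  Map<String, dynamic> toJson() => {'id': id, 'title': title, 'done': done};
}

// Load every Todo stored under the 'todos' tag.
storageEngineBloc.add(LoadItems<Todo>(
  tag: 'todos',
  fromJson: (json) => Todo.fromJson(json),
));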

🔹 Load a Single Item

To fetch a specific item using its ID.

storageEngineBloc.add(LoadItem<T>(
  tag: 'storage_key',
  id: 'item_id',
  fromJson: (json) => T.fromJson(json),
));

🔹 Save an Item

To store or update an item in storage.

storageEngineBloc.add(SaveItem<T>(
  tag: 'storage_key',
  item: myItem,
));

🔹 Save Multiple Items

To store or update a list of items at once.

storageEngineBloc.add(SaveItems<T>(
  tag: 'storage_key',
  items: myItemsList,
));

🔹 Delete a Single Item

To remove an item using its ID.

storageEngineBloc.add(DeleteItem(
  tag: 'storage_key',
  id: 'item_id',
));

🔹 Delete Multiple Items

To remove multiple items by their IDs.

storageEngineBloc.add(DeleteItems(
  tag: 'storage_key',
  ids: ['id1', 'id2'],
));

🔹 Delete Items Based on Condition

To remove items that match a specific condition.

storageEngineBloc.add(DeleteWhere<T>(
  tag: 'storage_key',
  fromJson: (json) => T.fromJson(json),
  condition: (item) => item.property == value,
));

🔹 Watch Items

To stream real-time updates for a list of items.

storageEngineBloc.add(WatchItems<T>(
  tag: 'storage_key',
  fromJson: (json) => T.fromJson(json),
));

🔹 Watch a Single Item

To stream real-time updates for a specific item.

storageEngineBloc.add(WatchItem<T>(
  tag: 'storage_key',
  id: 'item_id',
  fromJson: (json) => T.fromJson(json),
));

🔹 Query Items

To fetch items that match a specific filter condition.

storageEngineBloc.add(QueryItems<T>(
  tag: 'storage_key',
  fromJson: (json) => T.fromJson(json),
  filter: (item) => item.property == value,
));

🔹 Clear All Data

To remove all stored data under a specific tag.

storageEngineBloc.add(ClearStorage(
  tag: 'storage_key',
));

🔹 Store Raw Data

To save raw (unstructured) data.

storageEngineBloc.add(SetRawData(
  tag: 'storage_key',
  data: myRawData,
));

🔹 Get Raw Data

To retrieve raw (unstructured) data.

storageEngineBloc.add(GetRawData(
  tag: 'storage_key',
));

The sections above provide a structured guide to the events supported by StorageEngineBloc<T>.

API Reference

The library includes the following key classes and interfaces:

  • IMutex: The base interface for all mutex implementations
  • SimpleMutex: A basic mutex implementation
  • ReadWriteLock: A mutex that allows multiple readers but exclusive writers
  • PriorityMutex: A mutex that processes lock requests based on priority
  • AdaptiveMutex: A mutex with exponential backoff for high-contention scenarios
  • DistributedMutex: A mutex that works across multiple application instances
  • MutexService: A service for managing and accessing mutexes by name
  • DeadlockDetector: A utility for detecting and preventing deadlocks
  • MutexMonitor: A monitoring system for tracking mutex performance and issues

Each class provides specific methods and properties suited to its purpose, as demonstrated in the examples throughout this documentation.
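
Based on the operations used throughout this documentation, the IMutex contract can be pictured roughly as follows. This is an orientation sketch, not the literal source; concrete implementations add their own parameters (for example the onWaiting callback used in the deadlock-detection example, or the priority argument on PriorityMutex.lock):

abstract class IMutex {
   /// Human-readable identifier for the protected resource.
   String get name;

   /// True when no task currently holds the lock.
   bool get isUnused;

   /// Number of tasks currently waiting to acquire the lock.
   int get queueLength;

   /// Waits until the lock has been acquired.
   Future<void> lock();

   /// Acquires the lock only if it is immediately available.
   bool tryLock();

   /// Waits up to [timeout] for the lock; returns false if it could not be acquired.
   Future<bool> lockWithTimeout(Duration timeout);

   /// Releases the lock so the next waiting task can proceed.
   void unlock();

   /// Runs [action] while holding the lock and releases it afterwards.
   Future<T> protect<T>(Future<T> Function() action);
}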