withContext<C extends ProcessContext, K> method

Future<K> withContext<C extends ProcessContext, K>(
  1. ProcessSpec<C> spec,
  2. ContextFn<C, K> fn, {
  3. int priority = 0,
  4. bool preemptable = false,
})

Execute a function with the specified context loaded

If the context is already loaded and accepting requests, executes immediately via the current client. Otherwise, waits for pending requests to complete, switches to the new context, and then executes the function.

If the task has higher priority than all other pending tasks and requires a different context, triggers a context switch immediately.

If preemptable is true and this task is preempted by a higher priority task requiring a different context, the task will be re-scheduled rather than failing.

Implementation

Future<K> withContext<C extends ProcessContext, K>(
  ProcessSpec<C> spec,
  ContextFn<C, K> fn, {
  int priority = 0,
  bool preemptable = false,
}) async {
  final task = _PendingTask(
    spec: spec,
    contextFn: (c) => fn(c as C),
    priority: priority,
    preemptable: preemptable,
  );
  _pendingTasks.add(task);

  // Wait if currently switching contexts
  while (_switchingFuture != null) {
    await _switchingFuture;
  }

  // If current context matches and accepting requests, use it
  final c = _current;
  if (c != null && c.acceptingRequests && _accept(c.spec, spec)) {
    _cancelIdleTimer();
    _current!.scheduleExecution();
    return await task.completer.future;
  }

  // Check if this task should trigger a context switch
  // (different context needed AND highest priority among all pending tasks)
  final shouldSwitch = _shouldSwitchContext(task);

  if (shouldSwitch) {
    // Need to switch contexts - create switching future for coordination
    final completer = Completer<void>();
    final switchFuture = completer.future;
    _switchingFuture = switchFuture;

    void markSwitchCompleted() {
      if (!completer.isCompleted) {
        completer.complete();
      }
      if (_switchingFuture == switchFuture) {
        _switchingFuture = null;
      }
    }

    try {
      // Stop accepting new requests on current context
      if (_current != null) {
        _current!.stopAcceptingRequests();

        if (_current?.isPreemptable ?? false) {
          // Preempt: kill process immediately and re-schedule pending tasks
          _current!.markPreempting();
          await _stopProcess(); // Kill immediately without waiting
        } else {
          // Wait for pending requests to complete gracefully
          await _current!.waitForPending();
          await _stopProcess();
        }
      }

      await _switchToContext(spec);
      markSwitchCompleted();

      // Execute the request
      _current!.scheduleExecution();
      return await task.completer.future;
    } finally {
      markSwitchCompleted();
    }
  } else {
    // Wait for the task to be picked up eventually
    _current?.scheduleExecution();
    return await task.completer.future;
  }
}