stem library

Distributed task queue and worker framework for Dart.

Stem is a robust, production-ready background job system designed to orchestrate complex asynchronous workflows across multiple processes and machines.

Key Concepts

  • Stem: The main entry point for producers to enqueue tasks.
  • Broker: The transport layer that manages task delivery (e.g., Redis, SQS, In-Memory).
  • Worker: The runtime that consumes tasks and executes them in isolates or inline.
  • ResultBackend: Persists task status and results for later retrieval and coordination.
  • Beat: A distributed scheduler for recurring tasks using cron or interval specs.

Features

  • Typed Task Definitions: Guarantee type safety for arguments and results using TaskDefinition.
  • Observability: First-class support for OpenTelemetry tracing, metrics, and structured logging.
  • Reliability: Automatic retries with backoff, heartbeats, lease management, and idempotent claims.
  • Scalability: Managed isolate pools with automatic recycling and dynamic autoscaling.
  • Coordination: High-level workflows like Chords, Groups, and Chains, plus unique task enforcement.

Quick Start

import 'package:stem/stem.dart';

void main() async {
  // 1. Define a typed task
  final addDefinition = TaskDefinition<Map<String, int>, int>(
    name: 'add_task',
    encodeArgs: (args) => args,
  );

  // 2. Setup the registry and handler
  final registry = SimpleTaskRegistry()
    ..register(FunctionTaskHandler(
      name: 'add_task',
      entrypoint: (context, args) async {
        return (args['a'] as int) + (args['b'] as int);
      },
    ));

  // 3. Initialize Stem with a broker (e.g., In-Memory for testing)
  final stem = Stem(
    broker: InMemoryBroker(),
    registry: registry,
  );

  // 4. Enqueue work and wait for the result
  final taskId = await addDefinition.call({
    'a': 10,
    'b': 20,
  }).enqueueWith(stem);
  final result = await stem.waitForTask<int>(taskId);

  print('Sum is: ${result?.value}'); // Sum is: 30
}

Classes

AfterTaskPublishPayload
Payload emitted after a task has been published to the broker.
Beat
Scheduler loop that dispatches due ScheduleEntry records.
BeforeTaskPublishPayload
Payload emitted before a task is published to the broker.
BroadcastDefinition
Broadcast channel declaration.
Broker
Abstract broker interface implemented by queue adapters (Redis, SQS, etc). Since: 0.1.0
CalendarScheduleSpec
Schedule specification that matches specific calendar attributes.
Canvas
A high-level API for composing and dispatching tasks.
ChordMetadata
Metadata keys used for chord coordination.
ChordResult<T extends Object?>
Result returned by Canvas.chord.
ClockedScheduleSpec
Schedule specification for a fixed, absolute run time.
ConsoleMetricsExporter
Simple exporter that prints JSON metrics to either sink or stdout.
ControlCommandCompletedPayload
Payload emitted when a control command completes execution.
ControlCommandMessage
Control-plane command dispatched to worker control queues.
ControlCommandReceivedPayload
Payload emitted when a control command is received by a worker.
ControlEnvelopeTypes
Envelope name constants used by control messages.
ControlQueueNames
Helpers for building control queue names.
ControlReplyMessage
Control-plane reply emitted by a worker in response to a command.
CronScheduleSpec
Schedule specification for standard Cron expressions.
DartasticMetricsExporter
Metrics exporter that relays events through Dartastic's OTLP pipeline.
DeadLetterEntry
Dead letter queue entry containing the failed envelope and metadata.
DeadLetterPage
Page of dead letter results with optional continuation offset.
DeadLetterReplayResult
Result describing entries considered for replay.
DefaultQueueConfig
Default queue alias configuration tying alias to a canonical queue name.
Delivery
Runtime wrapper containing the envelope plus broker-specific receipt info.
DlqEntrySnapshot
Represents a single entry captured in the dead-letter queue.
EncodingResultBackend
Result backend decorator that applies TaskPayloadEncoder semantics.
EnqueueTaskSignal
Request to enqueue a task from an isolate.
Envelope
Task payload persisted inside a broker. Since: 0.1.0
EventBus
Distributes external events to workflow runs.
ExponentialJitterRetryStrategy
Exponential backoff with jitter, capped at a configurable duration.
ExtendLeaseSignal
Signals a request to extend a task lease.
FakeStem
Test helper that records enqueue calls without publishing to a broker.
FakeWorkflowClock
Controllable clock intended for tests.
FileRevokeStore
File-based RevokeStore using newline-delimited JSON entries.
Flow<T extends Object?>
Convenience wrapper that builds a WorkflowDefinition using the declarative FlowBuilder DSL.
FlowBuilder
Builder used by Flow to capture workflow steps in declaration order.
FlowContext
Context provided to each workflow step invocation.
FlowStep
Node in a workflow Flow.
FlowStepControl
Control directive emitted by a workflow step to suspend execution.
FunctionTaskHandler<R>
Convenience task handler that delegates execution to a top-level function suitable for isolate execution. Set runInIsolate to false or use FunctionTaskHandler.inline to keep execution in the worker isolate.
GroupDescriptor
Descriptor for group (e.g., chord) aggregation.
GroupDispatch<T extends Object?>
Handle returned by Canvas.group providing a stream of typed results.
GroupStatus
Aggregated status for a group/chord.
HeartbeatSignal
Signals a heartbeat from an executing task.
HeartbeatTransport
Transport abstraction for distributing worker heartbeat payloads.
InMemoryBroker
In-memory broker for testing and local development.
InMemoryEventBus
No-op EventBus suitable for single-process tests.
InMemoryLockStore
In-memory lock store used for tests and local scheduling.
InMemoryResultBackend
Simple in-memory result backend used for tests and local development.
InMemoryRevokeStore
Simple in-memory implementation of RevokeStore useful for tests.
InMemoryScheduleStore
Simple in-memory schedule store implementation used in tests.
InMemoryWorkflowRegistry
In-memory WorkflowRegistry implementation for local runs and tests.
InMemoryWorkflowStore
Simple in-memory WorkflowStore used for tests and examples.
IntervalScheduleSpec
Schedule specification for fixed time intervals.
JsonTaskPayloadEncoder
Default encoder that stores payloads verbatim as JSON-friendly values.
Lock
Handle to a lock acquired from a LockStore.
LockStore
Lock store used for unique jobs or scheduling coordination. Since: 0.1.0
LockStoreFactory
Factory for building LockStore instances.
MetricEvent
Immutable metric event emitted to exporters.
MetricsExporter
Consumers implement exporters to relay metric events to specific sinks.
Middleware
Middleware hook invoked for lifecycle events around enqueue/consume/execute. Since: 0.1.0
NoopHeartbeatTransport
Transport that intentionally drops all heartbeats.
NoopWorkflowIntrospectionSink
Default no-op sink for workflow step events.
ObservabilityConfig
Aggregates configuration controlling observability integrations.
ObservabilityReport
Aggregated observability information for queues, workers, and DLQ entries.
PayloadSigner
Handles signing and verification of envelope payloads.
ProgressSignal
Signals a task progress update.
PrometheusMetricsExporter
Exporter that accumulates metrics into Prometheus exposition format.
QueueBinding
Binding between a routing key (and optional headers) and a queue.
QueueDefinition
Detailed queue definition with optional routing metadata.
QueueHeartbeat
Aggregated in-flight counts for a specific queue.
QueuePriorityRange
Priority range constraint applied to a queue definition.
QueueSnapshot
Point-in-time metrics describing a specific queue.
RateLimitDecision
Result of attempting to acquire tokens from the rate limiter.
RateLimiter
Optional rate limiter interface shared across workers. Since: 0.1.0
RecordedEnqueue
Record describing an enqueued task captured by FakeStem.
ResultBackend
Result backend describes how task states are persisted and retrieved. Since: 0.1.0
RetryStrategy
Retry strategy used to compute the next backoff delay. Since: 0.1.0
RevokeEntry
Represents a persisted revoke entry for a task.
RevokeStore
Storage abstraction for task revocations.
RevokeStoreFactory
Factory for building RevokeStore instances.
RouteDecision
Describes the routing decision for an enqueue request.
RouteDefinition
Declarative routing rule mapping match criteria to targets.
RouteMatch
Routing match criteria with optional task glob, headers, or queue override.
RouteRequest
Immutable request used when resolving routing decisions.
RouteTarget
Route target describing queue or broadcast destination.
RoutingConfig
Canonical representation of routing configuration loaded from YAML or JSON.
RoutingInfo
Routing metadata that accompanies published envelopes.
RoutingRegistry
Registry that resolves routing configuration into queue/broadcast decisions.
RoutingSubscription
Subscription describing the queues and broadcast channels a worker should consume from.
RunState
Snapshot of a workflow run persisted by a WorkflowStore.
ScheduleCalculator
Computes the next run time for schedule entries.
ScheduleEntry
Schedule entry persisted by a Beat-like scheduler.
ScheduleEntryDispatchedPayload
Payload emitted when a schedule entry has been dispatched.
ScheduleEntryDuePayload
Payload emitted when a schedule entry becomes due for execution.
ScheduleEntryFailedPayload
Payload emitted when a schedule entry fails to execute.
ScheduleSpec
Base class for all scheduler specifications.
ScheduleSpecKind
Enumerates supported scheduler specification kinds.
ScheduleStore
Storage abstraction used by the scheduler to fetch due entries. Since: 0.1.0
ScheduleStoreFactory
Factory for building ScheduleStore instances.
ScheduleTimezoneResolver
Lazily resolves timezone identifiers.
Signal<T>
Dispatchable signal with typed payloads and listener management.
SignalContext
Context passed to every signal dispatch.
SignalDispatchConfig
Configuration applied to all signals dispatched through SignalHub.
SignalFilter<T>
Filters signal emissions based on payload and context.
SignalMiddleware
Middleware adapter that forwards enqueue and execution lifecycle events to the Stem signal dispatcher. Useful for incremental migrations where existing middleware chains should emit signals without rewriting core coordinator or worker logic.
SignalSubscription
Handle representing a registered signal listener.
SigningConfig
Configuration describing signing keys and behaviour.
SimpleTaskRegistry
Default in-memory registry implementation.
SolarScheduleSpec
Schedule specification based on celestial/solar events.
Stem
Facade used by producer applications to enqueue tasks.
StemApp
Convenience bootstrap for setting up a Stem runtime with sensible defaults.
StemBackendFactory
Factory for building ResultBackend instances.
StemBrokerFactory
Factory for building Broker instances.
StemClient
Shared entrypoint that owns broker/backend configuration for Stem runtimes.
StemConfig
Configuration source for Stem clients and worker processes.
StemMemoryAdapter
Built-in adapter for in-memory stores using the memory:// scheme.
StemMetrics
Central registry that aggregates metrics before exporting them.
StemResourceFactory<T>
Wrapper for constructing and disposing Stem resources lazily.
StemSignalConfiguration
Configuration for controlling Stem signals behavior.
StemSignalEmitter
Helper used by coordinators, workers, and middleware to emit strongly typed Stem signals without duplicating payload construction.
StemSignals
Central registry for all Stem framework signals.
StemStack
Resolved factories for a Stem deployment stack.
StemStoreAdapter
Adapter that can resolve Stem store factories from a URL.
StemStoreOverrides
Optional URL overrides for specific store kinds.
StemTracer
Utilities for Stem's tracing integration (Dartastic OpenTelemetry).
StemWorkerConfig
Worker configuration used by the bootstrap helpers.
StemWorkflowApp
Helper that bootstraps a workflow runtime on top of StemApp.
SystemWorkflowClock
Default clock that proxies to DateTime.now.
TaskCall<TArgs, TResult>
Represents a pending enqueue operation built from a TaskDefinition.
TaskChainResult<T extends Object?>
Result returned by Canvas.chain.
TaskContext
Context passed to handler implementations during execution.
TaskDefinition<TArgs, TResult>
Declarative task definition to build typed enqueue calls.
TaskDefn
Declares a function-backed task definition.
TaskEnqueueBuilder<TArgs, TResult>
Fluent builder used to construct rich enqueue requests.
TaskEnqueueOptions
Options that apply only to the enqueue operation.
TaskEnqueuer
Interface implemented by enqueuers like Stem and task contexts.
TaskEnqueueRequest
Enqueue request payload for isolate communication.
TaskEnqueueResponse
Response payload for isolate enqueue requests.
TaskEnqueueScope
Provides ambient metadata for task enqueue operations.
TaskError
Error metadata captured for failures.
TaskFailurePayload
Payload emitted when a task fails.
TaskHandler<R>
Runtime task handler. Since: 0.1.0
TaskInvocationContext
Context exposed to task entrypoints regardless of execution environment.
TaskInvocationSignal
Control messages emitted by task entrypoints running inside isolates.
TaskMetadata
Optional task metadata for documentation and tooling.
TaskOptions
Configuration options attached to task handlers.
TaskPayloadEncoder
Transforms task handler results to and from backend-friendly payloads.
TaskPayloadEncoderRegistry
Registry for managing and resolving TaskPayloadEncoder instances.
TaskPostrunPayload
Payload emitted after a task finishes execution.
TaskPrerunPayload
Payload emitted before a task begins execution.
TaskReceivedPayload
Payload emitted when a task is received by a worker.
TaskRegistrationEvent
Event emitted when a task handler registers with a registry.
TaskRegistry
Registry mapping task names to handler implementations.
TaskResult<T extends Object?>
Typed view over a TaskStatus returned by helpers such as Stem.waitForTask or canvas typed operations.
TaskRetryPayload
Payload emitted when a task is scheduled for retry.
TaskRetryPolicy
Retry policy configuration for tasks and publish attempts.
TaskRevokedPayload
Payload emitted when a task is revoked.
TaskSignature<T extends Object?>
Describes a task to schedule along with optional decoder metadata.
TaskStatus
Canonical task record stored in the result backend.
TaskStatusListRequest
Query parameters used to list task status records.
TaskStatusPage
Paginated page of task status records.
TaskStatusRecord
Immutable record representing a persisted task status with timestamps.
TaskSuccessPayload
Payload emitted when a task completes successfully.
TlsConfig
TLS configuration for brokers, result backends, and schedule stores.
TlsEnvKeys
Environment variable keys for TLS configuration.
UniqueTaskClaim
Result of a uniqueness acquisition attempt.
UniqueTaskCoordinator
Coordinates unique task claims using a LockStore.
UniqueTaskMetadata
Internal metadata keys stored on envelopes/result backend to track unique tasks.
Worker
Worker runtime that consumes tasks from a broker and executes handlers.
WorkerAutoscaleConfig
Autoscaling configuration for worker concurrency.
WorkerChildLifecyclePayload
Payload emitted for worker child isolate lifecycle events.
WorkerEvent
An event emitted during worker operation.
WorkerHeartbeat
Structured payload describing worker state for external monitoring systems.
WorkerHeartbeatPayload
Payload emitted when a worker sends a heartbeat.
WorkerInfo
Basic metadata representing a worker that emitted a signal.
WorkerLifecycleConfig
Lifecycle guard configuration for worker isolates and shutdown semantics.
WorkerLifecyclePayload
Payload emitted for worker lifecycle events (start, stop, etc.).
WorkerSnapshot
Captures details about a worker instance at the sample instant.
WorkflowAnnotations
Collection of workflow-related annotations.
WorkflowCancellationPolicy
Controls how the workflow runtime auto-cancels runs.
WorkflowClock
Abstraction over time sources used by the workflow runtime and stores.
WorkflowDefinition<T extends Object?>
Declarative workflow definition built via FlowBuilder or a higher-level script facade. The definition captures the ordered steps that the runtime will execute along with optional script metadata used by the facade runner.
WorkflowDefn
Declares a workflow definition class.
WorkflowEdge
Describes a directed edge between workflow steps.
WorkflowEventBusFactory
Factory for building workflow EventBus instances that depend on a store.
WorkflowRegistry
Registry for workflow definitions keyed by name.
WorkflowResult<T extends Object?>
Typed result returned by StemWorkflowApp.waitForCompletion.
WorkflowRun
Marks a workflow class method as the run entrypoint.
WorkflowRunPayload
Payload emitted for workflow run events.
WorkflowRuntime
Coordinates execution of workflow runs by dequeuing tasks, invoking steps, and persisting progress via a WorkflowStore.
WorkflowScript<T extends Object?>
High-level workflow facade that allows scripts to be authored as a single async function using step, sleep, and awaitEvent helpers.
WorkflowScriptContext
Runtime context exposed to workflow scripts. Implementations are provided by the workflow runtime so scripts can execute with durable semantics.
WorkflowScriptStepContext
Context provided to each script step invocation. Mirrors FlowContext but tailored for the facade helpers.
WorkflowStep
Marks a workflow class method as a durable step.
WorkflowStepEntry
Persisted step checkpoint metadata for a workflow run.
WorkflowStepEvent
Step-level execution event emitted by the workflow runtime.
WorkflowStore
Persistent storage for workflow runs, checkpoints, and suspensions.
WorkflowStoreFactory
Factory for building WorkflowStore instances.
WorkflowWatcher
Describes a workflow event watcher registered by the runtime.
WorkflowWatcherResolution
Represents a watcher that has been resolved (event delivered or timed out).

Enums

FlowControlType
Enumerates the suspension control types.
MetricType
Known metric aggregation types supported by the exporters.
RouteDecisionType
Distinguishes queue vs broadcast routing decisions.
RoutingTargetType
Target classification for routing operations.
SigningAlgorithm
Supported signing algorithms.
StemStoreKind
Store categories that can be resolved by adapters.
TaskState
Logical task status across enqueue, running, success, failure states.
UniqueTaskClaimStatus
Indicates whether uniqueness was acquired or a duplicate was detected.
WorkerEventType
Types of events a worker can emit.
WorkerShutdownMode
Shutdown modes for workers.
WorkflowDefinitionKind
Identifies whether a workflow is step-based or script-based.
WorkflowKind
Workflow definition type for annotated definitions.
WorkflowRunStatus
Status of a workflow run emitted via signals.
WorkflowStatus
Lifecycle state of a workflow run.
WorkflowStepEventType
Enumerates workflow step event types emitted by the runtime.
WorkflowStepKind
Kinds of workflow steps exposed for introspection.

Mixins

WorkflowIntrospectionSink
Sink for workflow step execution events.

Extensions

ControlCommandEnvelope on ControlCommandMessage
Extension helpers to convert a command into an Envelope.
ControlReplyEnvelope on ControlReplyMessage
Extension helpers to convert a reply into an Envelope.
TaskDefinitionCanvasX on TaskDefinition<TArgs, TResult>
Convenience helpers for turning TaskDefinitions into signatures.
TaskEnqueueBuilderExtension on TaskEnqueueBuilder<TArgs, TResult>
Convenience helpers for enqueuing TaskEnqueueBuilder instances.
TaskStateX on TaskState
Helpers for reasoning about task lifecycle states.

Constants

signatureHeader → const String
Header storing the base64 encoded payload signature.
signatureKeyHeader → const String
Header storing the signing key identifier used to calculate the signature.
stemArgsEncoderHeader → const String
Header key storing the args encoder identifier.
stemArgsEncoderMetaKey → const String
Metadata key storing the args encoder identifier.
stemResultEncoderMetaKey → const String
Metadata key storing the result encoder identifier.
workflow → const WorkflowAnnotations
Annotation namespace for workflow metadata.
workflowRunTaskName → const String
Task name used for workflow run execution tasks.

Properties

stemLogger → Logger
Shared logger configured with console output suitable for worker diagnostics.
final

Functions

configureStemLogging({Level level = Level.info}) → void
Sets the minimum log level for the shared stemLogger.
controlCommandFromEnvelope(Envelope envelope) ControlCommandMessage
Parses a ControlCommandMessage from a control command Envelope.
controlReplyFromEnvelope(Envelope envelope) ControlReplyMessage
Parses a ControlReplyMessage from a control reply Envelope.
ensureTaskPayloadEncoderRegistry(TaskPayloadEncoderRegistry? registry, {TaskPayloadEncoder resultEncoder = const JsonTaskPayloadEncoder(), TaskPayloadEncoder argsEncoder = const JsonTaskPayloadEncoder(), Iterable<TaskPayloadEncoder> additionalEncoders = const []}) TaskPayloadEncoderRegistry
Ensures a TaskPayloadEncoderRegistry is available, instantiating one when registry is null and registering any additionalEncoders with whichever registry is ultimately used.
generateEnvelopeId() String
Unique identifier generator used for task envelopes by default.
generateRevokeVersion() int
Generates a monotonically increasing revoke version based on UTC time.
logTlsHandshakeFailure({required String component, required String host, required int port, required TlsConfig? config, required Object error, required StackTrace stack}) → void
Logs diagnostic details for failed TLS handshakes.
task<T extends Object?>(String name, {Map<String, Object?> args = const {}, Map<String, String> headers = const {}, TaskOptions options = const TaskOptions(), Map<String, Object?> meta = const {}, DateTime? notBefore, T decode(Object? payload)?}) TaskSignature<T>
Returns a TaskSignature that creates an Envelope for a task.
withTaskPayloadEncoder(ResultBackend backend, TaskPayloadEncoderRegistry registry) ResultBackend
Wraps a ResultBackend to encode/decode payloads via registry.

Typedefs

Clock = DateTime Function()
Clock function used for deterministic testing.
SignalErrorReporter = void Function(String signalName, Object error, StackTrace stackTrace)
Function signature for reporting signal errors.
SignalHandler<T> = FutureOr<void> Function(T payload, SignalContext context)
Signature for signal handlers.
SignalPredicate<T> = bool Function(T payload, SignalContext context)
Predicate used to filter signal payloads.
TaskArgsEncoder<TArgs> = Map<String, Object?> Function(TArgs args)
Encodes strongly typed task arguments into a JSON-ready map.
TaskEntrypoint = FutureOr<Object?> Function(TaskInvocationContext context, Map<String, Object?> args)
Signature for task entrypoints that can run inside isolate executors.
TaskMetaBuilder<TArgs> = Map<String, Object?> Function(TArgs args)
Builds metadata for a task invocation using its arguments.
TaskResultDecoder<TResult> = TResult Function(Object? payload)
Decodes a persisted task result payload into a typed value.
WorkflowScriptBody<T extends Object?> = FutureOr<T> Function(WorkflowScriptContext context)
Declarative workflow definition built via FlowBuilder.

Exceptions / Errors

ScheduleConflictException
Thrown when a schedule mutation conflicts with a newer store version.
SignatureVerificationException
Thrown when an envelope signature is missing or invalid.
TaskRetryRequest
Signals an explicit retry request from within a task handler.
TaskRevokedException
Exception thrown when a task is revoked during execution.