stem library
Distributed task queue and worker framework for Dart.
Stem is a robust, production-ready background job system designed to orchestrate complex asynchronous workflows across multiple processes and machines.
Key Concepts
- Stem: The main entry point for producers to enqueue tasks.
- Broker: The transport layer that manages task delivery (e.g., Redis, SQS, In-Memory).
- Worker: The runtime that consumes tasks and executes them in isolates or inline.
- ResultBackend: Persists task status and results for later retrieval and coordination.
- Beat: A distributed scheduler for recurring tasks using cron or interval specs.
Features
- Typed Task Definitions: Guarantee type safety for arguments and results using TaskDefinition.
- Observability: First-class support for OpenTelemetry tracing, metrics, and structured logging.
- Reliability: Automatic retries with backoff, heartbeats, lease management, and idempotent claims.
- Scalability: Managed isolate pools with automatic recycling and dynamic autoscaling.
- Coordination: High-level workflows like Chords, Groups, and Chains, plus unique task enforcement.
Quick Start
import 'package:stem/stem.dart';
void main() async {
// 1. Define a typed task
final addDefinition = TaskDefinition<Map<String, int>, int>(
name: 'add_task',
encodeArgs: (args) => args,
);
// 2. Setup the registry and handler
final registry = SimpleTaskRegistry()
..register(FunctionTaskHandler(
name: 'add_task',
entrypoint: (context, args) async {
return (args['a'] as int) + (args['b'] as int);
},
));
// 3. Initialize Stem with a broker (e.g., In-Memory for testing)
final stem = Stem(
broker: InMemoryBroker(),
registry: registry,
);
// 4. Enqueue work and wait for the result
final taskId = await addDefinition.call({
'a': 10,
'b': 20,
}).enqueueWith(stem);
final result = await stem.waitForTask<int>(taskId);
print('Sum is: ${result?.value}'); // Sum is: 30
}
Classes
- AfterTaskPublishPayload
- Payload emitted after a task has been published to the broker.
- Beat
- Scheduler loop that dispatches due ScheduleEntry records.
- BeforeTaskPublishPayload
- Payload emitted before a task is published to the broker.
- BroadcastDefinition
- Broadcast channel declaration.
- Broker
- Abstract broker interface implemented by queue adapters (Redis, SQS, etc). Since: 0.1.0
- CalendarScheduleSpec
- Schedule specification that matches specific calendar attributes.
- Canvas
- A high-level API for composing and dispatching tasks.
- ChordMetadata
- Metadata keys used for chord coordination.
-
ChordResult<
T extends Object?> - Result returned by Canvas.chord.
- ClockedScheduleSpec
- Schedule specification for a fixed, absolute run time.
- ConsoleMetricsExporter
- Simple exporter that prints JSON metrics to either sink or stdout.
- ControlCommandCompletedPayload
- Payload emitted when a control command completes execution.
- ControlCommandMessage
- Control-plane command dispatched to worker control queues.
- ControlCommandReceivedPayload
- Payload emitted when a control command is received by a worker.
- ControlEnvelopeTypes
- Envelope name constants used by control messages.
- ControlQueueNames
- Helpers for building control queue names.
- ControlReplyMessage
- Control-plane reply emitted by a worker in response to a command.
- CronScheduleSpec
- Schedule specification for standard Cron expressions.
- DartasticMetricsExporter
- Metrics exporter that relays events through Dartastic's OTLP pipeline.
- DeadLetterEntry
- Dead letter queue entry containing the failed envelope and metadata.
- DeadLetterPage
- Page of dead letter results with optional continuation offset.
- DeadLetterReplayResult
- Result describing entries considered for replay.
- DefaultQueueConfig
-
Default queue alias configuration tying
aliasto a canonical queue name. - Delivery
- Runtime wrapper containing the envelope plus broker-specific receipt info.
- DlqEntrySnapshot
- Represents a single entry captured in the dead-letter queue.
- EncodingResultBackend
- Result backend decorator that applies TaskPayloadEncoder semantics.
- EnqueueTaskSignal
- Request to enqueue a task from an isolate.
- Envelope
- Task payload persisted inside a broker. Since: 0.1.0
- EventBus
- Distributes external events to workflow runs.
- ExponentialJitterRetryStrategy
- Exponential backoff with jitter, capped at a configurable duration.
- ExtendLeaseSignal
- Signals a request to extend a task lease.
- FakeStem
- Test helper that records enqueue calls without publishing to a broker.
- FakeWorkflowClock
- Controllable clock intended for tests.
- FileRevokeStore
- File-based RevokeStore using newline-delimited JSON entries.
-
Flow<
T extends Object?> - Convenience wrapper that builds a WorkflowDefinition using the declarative FlowBuilder DSL.
- FlowBuilder
- Builder used by Flow to capture workflow steps in declaration order.
- FlowContext
- Context provided to each workflow step invocation.
- FlowStep
- Node in a workflow Flow.
- FlowStepControl
- Control directive emitted by a workflow step to suspend execution.
-
FunctionTaskHandler<
R> -
Convenience task handler that delegates execution to a top-level function
suitable for isolate execution. Set runInIsolate to
falseor use FunctionTaskHandler.inline to keep execution in the worker isolate. - GroupDescriptor
- Descriptor for group (e.g., chord) aggregation.
-
GroupDispatch<
T extends Object?> - Handle returned by Canvas.group providing a stream of typed results.
- GroupStatus
- Aggregated status for a group/chord.
- HeartbeatSignal
- Signals a heartbeat from an executing task.
- HeartbeatTransport
- Transport abstraction for distributing worker heartbeat payloads.
- InMemoryBroker
- In-memory broker for testing and local development.
- InMemoryEventBus
- No-op EventBus suitable for single-process tests.
- InMemoryLockStore
- In-memory lock store used for tests and local scheduling.
- InMemoryResultBackend
- Simple in-memory result backend used for tests and local development.
- InMemoryRevokeStore
- Simple in-memory implementation of RevokeStore useful for tests.
- InMemoryScheduleStore
- Simple in-memory schedule store implementation used in tests.
- InMemoryWorkflowRegistry
- In-memory WorkflowRegistry implementation for local runs and tests.
- InMemoryWorkflowStore
- Simple in-memory WorkflowStore used for tests and examples.
- IntervalScheduleSpec
- Schedule specification for fixed time intervals.
- JsonTaskPayloadEncoder
- Default encoder that stores payloads verbatim as JSON-friendly values.
- Lock
- Handle to a lock acquired from a LockStore.
- LockStore
- Lock store used for unique jobs or scheduling coordination. Since: 0.1.0
- LockStoreFactory
- Factory for building LockStore instances.
- MetricEvent
- Immutable metric event emitted to exporters.
- MetricsExporter
- Consumers implement exporters to relay metric events to specific sinks.
- Middleware
- Middleware hook invoked for lifecycle events around enqueue/consume/execute. Since: 0.1.0
- NoopHeartbeatTransport
- Transport that intentionally drops all heartbeats.
- NoopWorkflowIntrospectionSink
- Default no-op sink for workflow step events.
- ObservabilityConfig
- Aggregates configuration controlling observability integrations.
- ObservabilityReport
- Aggregated observability information for queues, workers, and DLQ entries.
- PayloadSigner
- Handles signing and verification of envelope payloads.
- ProgressSignal
- Signals a task progress update.
- PrometheusMetricsExporter
- Exporter that accumulates metrics into Prometheus exposition format.
- QueueBinding
- Binding between a routing key (and optional headers) and a queue.
- QueueDefinition
- Detailed queue definition with optional routing metadata.
- QueueHeartbeat
- Aggregated in-flight counts for a specific queue.
- QueuePriorityRange
- Priority range constraint applied to a queue definition.
- QueueSnapshot
- Point-in-time metrics describing a specific queue.
- RateLimitDecision
- Result of attempting to acquire tokens from the rate limiter.
- RateLimiter
- Optional rate limiter interface shared across workers. Since: 0.1.0
- RecordedEnqueue
- Record describing an enqueued task captured by FakeStem.
- ResultBackend
- Result backend describes how task states are persisted and retrieved. Since: 0.1.0
- RetryStrategy
- Retry strategy used to compute the next backoff delay. Since: 0.1.0
- RevokeEntry
- Represents a persisted revoke entry for a task.
- RevokeStore
- Storage abstraction for task revocations.
- RevokeStoreFactory
- Factory for building RevokeStore instances.
- RouteDecision
- Describes the routing decision for an enqueue request.
- RouteDefinition
- Declarative routing rule mapping match criteria to targets.
- RouteMatch
- Routing match criteria with optional task glob, headers, or queue override.
- RouteRequest
- Immutable request used when resolving routing decisions.
- RouteTarget
- Route target describing queue or broadcast destination.
- RoutingConfig
- Canonical representation of routing configuration loaded from YAML or JSON.
- RoutingInfo
- Routing metadata that accompanies published envelopes.
- RoutingRegistry
- Registry that resolves routing configuration into queue/broadcast decisions.
- RoutingSubscription
- Subscription describing the queues and broadcast channels a worker should consume from.
- RunState
- Snapshot of a workflow run persisted by a WorkflowStore.
- ScheduleCalculator
- Computes the next run time for schedule entries.
- ScheduleEntry
- Schedule entry persisted by a Beat-like scheduler.
- ScheduleEntryDispatchedPayload
- Payload emitted when a schedule entry has been dispatched.
- ScheduleEntryDuePayload
- Payload emitted when a schedule entry becomes due for execution.
- ScheduleEntryFailedPayload
- Payload emitted when a schedule entry fails to execute.
- ScheduleSpec
- Base class for all scheduler specifications.
- ScheduleSpecKind
- Enumerates supported scheduler specification kinds.
- ScheduleStore
- Storage abstraction used by the scheduler to fetch due entries. Since: 0.1.0
- ScheduleStoreFactory
- Factory for building ScheduleStore instances.
- ScheduleTimezoneResolver
- Lazily resolves timezone identifiers.
-
Signal<
T> - Dispatchable signal with typed payloads and listener management.
- SignalContext
- Context passed to every signal dispatch.
- SignalDispatchConfig
-
Configuration applied to all signals dispatched through
SignalHub. -
SignalFilter<
T> - Filters signal emissions based on payload and context.
- SignalMiddleware
- Middleware adapter that forwards enqueue and execution lifecycle events to the Stem signal dispatcher. Useful for incremental migrations where existing middleware chains should emit signals without rewriting core coordinator or worker logic.
- SignalSubscription
- Handle representing a registered signal listener.
- SigningConfig
- Configuration describing signing keys and behaviour.
- SimpleTaskRegistry
- Default in-memory registry implementation.
- SolarScheduleSpec
- Schedule specification based on celestial/solar events.
- Stem
- Facade used by producer applications to enqueue tasks.
- StemApp
- Convenience bootstrap for setting up a Stem runtime with sensible defaults.
- StemBackendFactory
- Factory for building ResultBackend instances.
- StemBrokerFactory
- Factory for building Broker instances.
- StemClient
- Shared entrypoint that owns broker/backend configuration for Stem runtimes.
- StemConfig
- Configuration source for Stem clients and worker processes.
- StemMemoryAdapter
-
Built-in adapter for in-memory stores using the
memory://scheme. - StemMetrics
- Central registry that aggregates metrics before exporting them.
-
StemResourceFactory<
T> - Wrapper for constructing and disposing Stem resources lazily.
- StemSignalConfiguration
- Configuration for controlling Stem signals behavior.
- StemSignalEmitter
- Helper used by coordinators, workers, and middleware to emit strongly typed Stem signals without duplicating payload construction.
- StemSignals
- Central registry for all Stem framework signals.
- StemStack
- Resolved factories for a Stem deployment stack.
- StemStoreAdapter
- Adapter that can resolve Stem store factories from a URL.
- StemStoreOverrides
- Optional URL overrides for specific store kinds.
- StemTracer
- Utilities for Stem's tracing integration (Dartastic OpenTelemetry).
- StemWorkerConfig
- Worker configuration used by the bootstrap helpers.
- StemWorkflowApp
- Helper that bootstraps a workflow runtime on top of StemApp.
- SystemWorkflowClock
- Default clock that proxies to DateTime.now.
-
TaskCall<
TArgs, TResult> - Represents a pending enqueue operation built from a TaskDefinition.
-
TaskChainResult<
T extends Object?> - Result returned by Canvas.chain.
- TaskContext
- Context passed to handler implementations during execution.
-
TaskDefinition<
TArgs, TResult> - Declarative task definition to build typed enqueue calls.
- TaskDefn
- Declares a function-backed task definition.
-
TaskEnqueueBuilder<
TArgs, TResult> - Fluent builder used to construct rich enqueue requests.
- TaskEnqueueOptions
- Options that apply only to the enqueue operation.
- TaskEnqueuer
- Interface implemented by enqueuers like Stem and task contexts.
- TaskEnqueueRequest
- Enqueue request payload for isolate communication.
- TaskEnqueueResponse
- Response payload for isolate enqueue requests.
- TaskEnqueueScope
- Provides ambient metadata for task enqueue operations.
- TaskError
- Error metadata captured for failures.
- TaskFailurePayload
- Payload emitted when a task fails.
-
TaskHandler<
R> - Runtime task handler. Since: 0.1.0
- TaskInvocationContext
- Context exposed to task entrypoints regardless of execution environment.
- TaskInvocationSignal
- Control messages emitted by task entrypoints running inside isolates.
- TaskMetadata
- Optional task metadata for documentation and tooling.
- TaskOptions
- Configuration options attached to task handlers.
- TaskPayloadEncoder
- Transforms task handler results to and from backend-friendly payloads.
- TaskPayloadEncoderRegistry
- Registry for managing and resolving TaskPayloadEncoder instances.
- TaskPostrunPayload
- Payload emitted after a task finishes execution.
- TaskPrerunPayload
- Payload emitted before a task begins execution.
- TaskReceivedPayload
- Payload emitted when a task is received by a worker.
- TaskRegistrationEvent
- Event emitted when a task handler registers with a registry.
- TaskRegistry
- Registry mapping task names to handler implementations.
-
TaskResult<
T extends Object?> - Typed view over a TaskStatus returned by helpers such as Stem.waitForTask or canvas typed operations.
- TaskRetryPayload
- Payload emitted when a task is scheduled for retry.
- TaskRetryPolicy
- Retry policy configuration for tasks and publish attempts.
- TaskRevokedPayload
- Payload emitted when a task is revoked.
-
TaskSignature<
T extends Object?> - Describes a task to schedule along with optional decoder metadata.
- TaskStatus
- Canonical task record stored in the result backend.
- TaskStatusListRequest
- Query parameters used to list task status records.
- TaskStatusPage
- Paginated page of task status records.
- TaskStatusRecord
- Immutable record representing a persisted task status with timestamps.
- TaskSuccessPayload
- Payload emitted when a task completes successfully.
- TlsConfig
- TLS configuration for brokers, result backends, and schedule stores.
- TlsEnvKeys
- Environment variable keys for TLS configuration.
- UniqueTaskClaim
- Result of a uniqueness acquisition attempt.
- UniqueTaskCoordinator
- Coordinates unique task claims using a LockStore.
- UniqueTaskMetadata
- Internal metadata keys stored on envelopes/result backend to track unique tasks.
- Worker
- Worker runtime that consumes tasks from a broker and executes handlers.
- WorkerAutoscaleConfig
- Autoscaling configuration for worker concurrency.
- WorkerChildLifecyclePayload
- Payload emitted for worker child isolate lifecycle events.
- WorkerEvent
- An event emitted during worker operation.
- WorkerHeartbeat
- Structured payload describing worker state for external monitoring systems.
- WorkerHeartbeatPayload
- Payload emitted when a worker sends a heartbeat.
- WorkerInfo
- Basic metadata representing a worker that emitted a signal.
- WorkerLifecycleConfig
- Lifecycle guard configuration for worker isolates and shutdown semantics.
- WorkerLifecyclePayload
- Payload emitted for worker lifecycle events (start, stop, etc.).
- WorkerSnapshot
- Captures details about a worker instance at the sample instant.
- WorkflowAnnotations
- Collection of workflow-related annotations.
- WorkflowCancellationPolicy
- Controls how the workflow runtime auto-cancels runs.
- WorkflowClock
- Abstraction over time sources used by the workflow runtime and stores.
-
WorkflowDefinition<
T extends Object?> - Declarative workflow definition built via FlowBuilder or a higher-level script facade. The definition captures the ordered steps that the runtime will execute along with optional script metadata used by the facade runner.
- WorkflowDefn
- Declares a workflow definition class.
- WorkflowEdge
- Describes a directed edge between workflow steps.
- WorkflowEventBusFactory
- Factory for building workflow EventBus instances that depend on a store.
- WorkflowRegistry
- Registry for workflow definitions keyed by name.
-
WorkflowResult<
T extends Object?> - Typed result returned by StemWorkflowApp.waitForCompletion.
- WorkflowRun
- Marks a workflow class method as the run entrypoint.
- WorkflowRunPayload
- Payload emitted for workflow run events.
- WorkflowRuntime
- Coordinates execution of workflow runs by dequeuing tasks, invoking steps, and persisting progress via a WorkflowStore.
-
WorkflowScript<
T extends Object?> -
High-level workflow facade that allows scripts to be authored as a single
async function using
step,sleep, andawaitEventhelpers. - WorkflowScriptContext
- Runtime context exposed to workflow scripts. Implementations are provided by the workflow runtime so scripts can execute with durable semantics.
- WorkflowScriptStepContext
- Context provided to each script step invocation. Mirrors FlowContext but tailored for the facade helpers.
- WorkflowStep
- Marks a workflow class method as a durable step.
- WorkflowStepEntry
- Persisted step checkpoint metadata for a workflow run.
- WorkflowStepEvent
- Step-level execution event emitted by the workflow runtime.
- WorkflowStore
- Persistent storage for workflow runs, checkpoints, and suspensions.
- WorkflowStoreFactory
- Factory for building WorkflowStore instances.
- WorkflowWatcher
- Describes a workflow event watcher registered by the runtime.
- WorkflowWatcherResolution
- Represents a watcher that has been resolved (event delivered or timed out).
Enums
- FlowControlType
- Enumerates the suspension control types.
- MetricType
- Known metric aggregation types supported by the exporters.
- RouteDecisionType
- Distinguishes queue vs broadcast routing decisions.
- RoutingTargetType
- Target classification for routing operations.
- SigningAlgorithm
- Supported signing algorithms.
- StemStoreKind
- Store categories that can be resolved by adapters.
- TaskState
- Logical task status across enqueue, running, success, failure states.
- UniqueTaskClaimStatus
- Indicates whether uniqueness was acquired or a duplicate was detected.
- WorkerEventType
- Types of events a worker can emit.
- WorkerShutdownMode
- Shutdown modes for workers.
- WorkflowDefinitionKind
- Identifies whether a workflow is step-based or script-based.
- WorkflowKind
- Workflow definition type for annotated definitions.
- WorkflowRunStatus
- Status of a workflow run emitted via signals.
- WorkflowStatus
- Lifecycle state of a workflow run.
- WorkflowStepEventType
- Enumerates workflow step event types emitted by the runtime.
- WorkflowStepKind
- Kinds of workflow steps exposed for introspection.
Mixins
- WorkflowIntrospectionSink
- Sink for workflow step execution events.
Extensions
- ControlCommandEnvelope on ControlCommandMessage
- Extension helpers to convert a command into an Envelope.
- ControlReplyEnvelope on ControlReplyMessage
- Extension helpers to convert a reply into an Envelope.
-
TaskDefinitionCanvasX
on TaskDefinition<
TArgs, TResult> - Convenience helpers for turning TaskDefinitions into signatures.
-
TaskEnqueueBuilderExtension
on TaskEnqueueBuilder<
TArgs, TResult> - Convenience helpers for enqueuing TaskEnqueueBuilder instances.
- TaskStateX on TaskState
- Helpers for reasoning about task lifecycle states.
Constants
- signatureHeader → const String
- Header storing the base64 encoded payload signature.
- signatureKeyHeader → const String
- Header storing the signing key identifier used to calculate the signature.
- stemArgsEncoderHeader → const String
- Header key storing the args encoder identifier.
- stemArgsEncoderMetaKey → const String
- Metadata key storing the args encoder identifier.
- stemResultEncoderMetaKey → const String
- Metadata key storing the result encoder identifier.
- workflow → const WorkflowAnnotations
- Annotation namespace for workflow metadata.
- workflowRunTaskName → const String
- Task name used for workflow run execution tasks.
Properties
- stemLogger → Logger
-
Shared logger configured with console output suitable for worker
diagnostics.
final
Functions
-
configureStemLogging(
{Level level = Level.info}) → void -
Sets the minimum log
levelfor the shared stemLogger. -
controlCommandFromEnvelope(
Envelope envelope) → ControlCommandMessage - Parses a ControlCommandMessage from a control command Envelope.
-
controlReplyFromEnvelope(
Envelope envelope) → ControlReplyMessage - Parses a ControlReplyMessage from a control reply Envelope.
-
ensureTaskPayloadEncoderRegistry(
TaskPayloadEncoderRegistry? registry, {TaskPayloadEncoder resultEncoder = const JsonTaskPayloadEncoder(), TaskPayloadEncoder argsEncoder = const JsonTaskPayloadEncoder(), Iterable< TaskPayloadEncoder> additionalEncoders = const []}) → TaskPayloadEncoderRegistry -
Ensures a TaskPayloadEncoderRegistry is available, instantiating one when
registryis null and registering anyadditionalEncoderswith whichever registry is ultimately used. -
generateEnvelopeId(
) → String - Unique identifier generator used for task envelopes by default.
-
generateRevokeVersion(
) → int - Generates a monotonically increasing revoke version based on UTC time.
-
logTlsHandshakeFailure(
{required String component, required String host, required int port, required TlsConfig? config, required Object error, required StackTrace stack}) → void - Logs diagnostic details for failed TLS handshakes.
-
task<
T extends Object?> (String name, {Map< String, Object?> args = const {}, Map<String, String> headers = const {}, TaskOptions options = const TaskOptions(), Map<String, Object?> meta = const {}, DateTime? notBefore, T decode(Object? payload)?}) → TaskSignature<T> - Returns a TaskSignature that creates an Envelope for a task.
-
withTaskPayloadEncoder(
ResultBackend backend, TaskPayloadEncoderRegistry registry) → ResultBackend -
Wraps a ResultBackend to encode/decode payloads via
registry.
Typedefs
- Clock = DateTime Function()
- Clock function used for deterministic testing.
- SignalErrorReporter = void Function(String signalName, Object error, StackTrace stackTrace)
- Function signature for reporting signal errors.
-
SignalHandler<
T> = FutureOr< void> Function(T payload, SignalContext context) - Signature for signal handlers.
-
SignalPredicate<
T> = bool Function(T payload, SignalContext context) - Predicate used to filter signal payloads.
-
TaskArgsEncoder<
TArgs> = Map< String, Object?> Function(TArgs args) - Encodes strongly typed task arguments into a JSON-ready map.
-
TaskEntrypoint
= FutureOr<
Object?> Function(TaskInvocationContext context, Map<String, Object?> args) - Signature for task entrypoints that can run inside isolate executors.
-
TaskMetaBuilder<
TArgs> = Map< String, Object?> Function(TArgs args) - Builds metadata for a task invocation using its arguments.
-
TaskResultDecoder<
TResult> = TResult Function(Object? payload) - Decodes a persisted task result payload into a typed value.
-
WorkflowScriptBody<
T extends Object?> = FutureOr< T> Function(WorkflowScriptContext context) - Declarative workflow definition built via FlowBuilder.
Exceptions / Errors
- ScheduleConflictException
- Thrown when a schedule mutation conflicts with a newer store version.
- SignatureVerificationException
- Thrown when an envelope signature is missing or invalid.
- TaskRetryRequest
- Signals an explicit retry request from within a task handler.
- TaskRevokedException
- Exception thrown when a task is revoked during execution.