Durable sagas
A saga is the answer to the question every retry loop dodges: what happens between
step three and step four when the process dies? In NetScript a saga is an explicit,
message-driven state machine — you declare the state it carries, the messages it
reacts to, and the effects each handler emits. It is authored with a single fluent
builder, persisted to a durable store, and served by the sagas plugin on port
8092. The model is closer to Temporal than to a job queue, but it lives in plain
TypeScript inside your workspace — no separate cluster to operate.
This is the third capability in the continuous-app thread. The background jobs capability
ended with a job, create-user-settings, that publishes a UserSettingsCreated message. Here
that message stops being fire-and-forget: a saga consumes it, advances its own state, and
emits a sagaComplete(...) effect that the runtime records as a first-class outcome and
persists durably, so the instance survives a process restart.
What it is
A saga is a durable, message-driven state machine: a typed state object plus a set of handlers, one per message type, that advance that state and return an effect ledger. The runtime persists state between every message, so an instance that is mid-flight when the process restarts resumes exactly where it left off. Unlike a background job, which is a single idempotent unit, a saga correlates many messages into one long-running instance and gives compensation (the undo of an already-applied step) a first-class place in the model. Read the conceptual companion, The durability model, for why state must outlive the process and how effect-based outcomes differ from a retry loop.
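The checkpoint-between-messages idea can be shown without any NetScript code. The sketch below is a conceptual model under stated assumptions, not the runtime: a Map stands in for the durable store, progress is tracked as a count of applied messages, and runProcess, Checkpoint, and DurableStore are hypothetical names.

```typescript
// Conceptual model only: a Map stands in for the durable store (Deno KV or the
// saga_runtime_* tables in NetScript). All names here are hypothetical.
type Checkpoint = Readonly<{ state: Readonly<{ step: number }>; applied: number }>;
type DurableStore = Map<string, Checkpoint>;

// One process lifetime: load the last checkpoint, apply the remaining messages
// one at a time, and checkpoint after each. `crashAfter` simulates the process
// dying after that many messages in this run.
function runProcess(
  store: DurableStore,
  instanceId: string,
  messages: readonly string[],
  crashAfter = Number.POSITIVE_INFINITY,
): number {
  const saved: Checkpoint = store.get(instanceId) ?? { state: { step: 0 }, applied: 0 };
  let state = saved.state;
  let applied = saved.applied;
  let handledThisRun = 0;
  // Progress is a count of applied messages (a simplification of real correlation).
  while (applied < messages.length && handledThisRun < crashAfter) {
    state = { step: state.step + 1 }; // "handle" messages[applied]
    applied += 1;
    handledThisRun += 1;
    store.set(instanceId, { state, applied }); // checkpoint between messages
  }
  return state.step;
}
```

On one shared store and a three-message inbox, runProcess(store, 'order-1', inbox, 2) followed by runProcess(store, 'order-1', inbox) returns 2 and then 3: the second run resumes from the checkpoint instead of replaying the first two messages.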
defineSaga(id).durability().state().on().compensate().build() — id, durability tier, typed state, message handlers, compensations, then build(). One chain, fully type-checked.
Runtime state persists to kv or prisma, chosen by NETSCRIPT_SAGA_STORE / appsettings. createDurableSagaRuntime({ backend, prisma }) owns the resources.
Every handler returns an array of effects: sagaComplete, sagaFail, sagaCompensate, send, schedule, and spawn are named outcomes, never a fall-through.
An oRPC API lists registered sagas, inspects running instances, publishes messages, and streams activity over SSE.
The workers create-user-settings job publishes UserSettingsCreated; this saga consumes it — one message crossing the plugin boundary, type-checked on both sides.
State checkpoints between messages, so an instance picks up exactly where it left off after a restart. That survival is the entire point of a saga.
Minimal example — an order saga with compensation
The runnable shape you grow into: a real state type, multiple .on(...) handlers, and a
.compensate(...) branch that unwinds an already-charged payment when fulfillment fails.
The first handler records the order and emits a send(...) effect asking the payments
service to charge it. The FulfillmentFailed handler returns sagaCompensate(...), so the
engine routes to the matching compensation handler, which refunds the payment and then
fails the instance.
```typescript
// plugins/sagas/order-saga.ts
import {
  defineSaga,
  sagaCompensate,
  sagaFail,
  send,
} from '@netscript/plugin-sagas-core';

// The state is the source of truth that survives across messages and crashes.
type OrderState = Readonly<{
  status: 'awaiting-payment' | 'awaiting-fulfillment' | 'completed' | 'refunded';
  orderId?: string;
  paymentId?: string;
}>;

type OrderPlaced = Readonly<{ orderId: string }>;
type PaymentCaptured = Readonly<{ orderId: string; paymentId: string }>;
type FulfillmentFailed = Readonly<{ orderId: string; reason: string }>;

export const orderSaga = defineSaga('order-saga')
  // Durability tier: where the runtime checkpoints state between messages.
  .durability('t1')
  // Typed initial state, seeded once at instance creation.
  .state<OrderState>({ status: 'awaiting-payment' })
  // Handler 1: an order was placed. Record it and ask the payment service to charge.
  .on<'OrderPlaced', OrderPlaced>('OrderPlaced', (saga, event) => {
    saga.state = { ...saga.state, orderId: event.payload.orderId };
    // `send` is an effect: dispatch a command to another target, not a direct call.
    return [send({ kind: 'service', id: 'payments' }, { orderId: event.payload.orderId }, {})];
  })
  // Handler 2: payment captured. Advance toward fulfillment.
  .on<'PaymentCaptured', PaymentCaptured>('PaymentCaptured', (saga, event) => {
    saga.state = {
      ...saga.state,
      status: 'awaiting-fulfillment',
      paymentId: event.payload.paymentId,
    };
    return [];
  })
  // Handler 3: fulfillment failed AFTER we charged. Request the compensation branch.
  .on<'FulfillmentFailed', FulfillmentFailed>('FulfillmentFailed', (saga, event) => {
    return [sagaCompensate({ type: 'FulfillmentFailed', payload: event.payload }, event.payload.reason)];
  })
  // Compensation: undo the already-applied payment, then fail the instance.
  .compensate<'FulfillmentFailed', FulfillmentFailed>('FulfillmentFailed', (saga, event) => {
    saga.state = { ...saga.state, status: 'refunded' };
    return [
      send({ kind: 'service', id: 'payments' }, { refund: saga.state.paymentId }, {}),
      sagaFail(`order ${event.payload.orderId} unfulfilled: ${event.payload.reason}`),
    ];
  })
  .build();

export default orderSaga;
```
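The dispatch rule in the example above, where a forward handler returns sagaCompensate(...) and the engine runs the .compensate(...) handler registered for the same message type, can be modelled in a few lines. This is a hypothetical sketch with local types (Msg, Effect, Definition, and dispatch are invented here), not the NetScript engine; in particular it applies the compensation inline, which may not match how the real runtime schedules it.

```typescript
// Hypothetical model of compensation routing; not the NetScript engine and not
// its real CascadedMessage shape.
type Msg = Readonly<{ type: string; payload: Readonly<Record<string, unknown>> }>;
type Effect =
  | Readonly<{ kind: 'send'; target: string; payload: unknown }>
  | Readonly<{ kind: 'compensate'; message: Msg; reason?: string }>
  | Readonly<{ kind: 'fail'; reason: string }>;
type Step<S> = Readonly<{ state: S; effects: readonly Effect[] }>;
type Handler<S> = (state: S, message: Msg) => Step<S>;
type Definition<S> = Readonly<{
  handlers: Readonly<Record<string, Handler<S>>>; // forward, keyed by message type
  compensations: Readonly<Record<string, Handler<S>>>; // undo, keyed by message type
}>;

// Run the forward handler for one message. Every `compensate` effect it returns
// is replaced by the effects of the compensation registered for the wrapped
// message's type, applied to the state as it stands after the forward handler.
function dispatch<S>(def: Definition<S>, state: S, message: Msg): Step<S> {
  const forward = def.handlers[message.type];
  if (!forward) throw new Error(`no handler for ${message.type}`);
  const first = forward(state, message);
  let current = first.state;
  const effects: Effect[] = [];
  for (const effect of first.effects) {
    if (effect.kind !== 'compensate') {
      effects.push(effect);
      continue;
    }
    const undo = def.compensations[effect.message.type];
    if (!undo) throw new Error(`no compensation for ${effect.message.type}`);
    const undone = undo(current, effect.message);
    current = undone.state;
    effects.push(...undone.effects);
  }
  return { state: current, effects };
}
```

Feeding it a FulfillmentFailed message for a definition shaped like orderSaga yields a state of 'refunded' plus a send effect followed by a fail effect, which is the outcome the order saga's compensation branch describes.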
Key types first
Before the options, here are the primary interfaces the DSL works with. SagaState is the base
shape your typed state must satisfy; SagaContext is the read-only context passed to
every handler; SagaDefinition is the frozen object build() returns and the runtime
registers.
| Name | Type | Description |
|---|---|---|
| SagaState | Readonly | Base state shape every saga's typed state must extend. Your State type is intersected with this. |
| SagaMessage | { type, payload, correlationKey?, idempotencyKey?, concurrencyKey?, occurredAt?, traceparent? } | The event or command delivered to a handler. type discriminates; payload is your typed body. |
| SagaContext | { sagaId, instanceId, correlationKey, state, message, attempt, now, traceparent? } | Read-only handler context. now is the injected clock; attempt is the retry count; trace fields carry W3C context. |
| SagaHandler | (saga, event, context) => readonly CascadedMessage[] | A synchronous handler: it mutates saga.state and RETURNS an effect ledger. No async, no direct I/O. |
| SagaDefinition | Readonly<{ id, durability, initialState, handlers, compensations, correlations, retry?, concurrency?, schedule? }> | The frozen definition build() produces. Registered into the runtime and into KV under ['saga','registry', id]. |
| SAGA_DURABILITY_TIERS | readonly ['t1','t2','t3'] | The durability tiers a definition may declare via .durability(tier). 't1' is the scaffolded default. |
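Because a SagaHandler is synchronous and reads time from context.now instead of the system clock, a handler's logic can be exercised as a plain function with a fixed date. The sketch below assumes local stand-in types shaped like the SagaMessage and SagaContext rows (only the fields it needs) rather than the package's exports, and onUserSettingsCreated is a hypothetical extraction of the user-settings handler shown on this page.

```typescript
// Local stand-ins shaped like the SagaMessage / SagaContext rows above (only the
// fields this example needs); not the exported @netscript/plugin-sagas-core types.
type StubMessage<P> = Readonly<{ type: string; payload: P }>;
type StubContext = Readonly<{ sagaId: string; instanceId: string; attempt: number; now: Date }>;
type SettingsState = Readonly<{ status: string; processedAt?: string }>;
type CompleteEffect = Readonly<{ kind: 'complete'; result: unknown }>;

// Same logic as the user-settings handler on this page: read time only from the
// injected clock, update saga.state, and return the completion effect.
function onUserSettingsCreated(
  saga: { state: SettingsState },
  event: StubMessage<{ userId: string }>,
  context: StubContext,
): readonly CompleteEffect[] {
  const processedAt = context.now.toISOString();
  saga.state = { ...saga.state, status: 'completed', processedAt };
  return [{ kind: 'complete', result: { userId: event.payload.userId, processedAt } }];
}
```

With the clock passed in, an assertion such as processedAt === '2024-01-01T00:00:00.000Z' is exact rather than approximate.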
The builder API
A saga is built with defineSaga(id) and a typestate chain: .state() must come before
any handler, and .build() requires at least one handler. These are the methods on the
returned SagaBuilder.
| Name | Type | Description |
|---|---|---|
| .durability(tier) | SagaDurabilityTier ('t1' \| 't2' \| 't3') | Set the persistence tier the runtime checkpoints state to. Defaults to t1. Distinct from the kv/prisma store backend (see below). |
| .state | S extends SagaState | Declare the typed initial state, seeded once at instance creation. MUST be called before any handler (typestate-enforced). |
| .on | SagaHandler | Register a forward handler for a message type. Receives (saga, event, context), mutates saga.state, returns an effect ledger. |
| .compensate | SagaHandler | Register a compensation (undo) handler keyed by message type. Routed when a handler returns sagaCompensate(...). |
| .correlate(rule) | SagaCorrelation | Extract a correlation key from an incoming message so it routes to the right running instance. |
| .concurrency(opts) | { limit: number; key?: (m) => string } | Bound how many messages run at once, optionally per derived key. Overlapping publishes for one key are rejected. |
| .schedule(cron) | string | Attach a cron expression that ticks the saga definition on a schedule. |
| .onSignal(signal, handler) | SignalDefinition, handler | Register a reserved signal handler (defineSignal). Runtime dispatch is deferred in the alpha. |
| .onQuery(query, handler) | QueryDefinition, handler | Register a reserved synchronous read-only query handler (defineQuery). Promises are rejected at type level. |
| .build() | => SagaDefinition | Freeze the chain into a SagaDefinition. Requires at least one .on(...) handler (typestate-enforced). |
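The typestate rule in the table (.state() before any handler, at least one handler before .build()) is a standard TypeScript technique: each builder step returns a narrower interface, so methods that are not yet legal simply do not exist on the type. The sketch below is hypothetical and only illustrates that technique; it is not SagaBuilder's source, and defineSketch, NeedsState, NeedsHandler, and Buildable are invented names.

```typescript
// Hypothetical typestate builder illustrating the technique; not SagaBuilder.
type StepHandler<S> = (state: S, payload: unknown) => S;
type Built<S> = Readonly<{
  id: string;
  initialState: S;
  handlers: Readonly<Record<string, StepHandler<S>>>;
}>;

// Stage 1: only .state() exists, so a handler cannot be registered yet.
interface NeedsState {
  state<S>(initial: S): NeedsHandler<S>;
}
// Stage 2: state is known, but there is no .build() until a handler exists.
interface NeedsHandler<S> {
  on(type: string, handler: StepHandler<S>): Buildable<S>;
}
// Stage 3: at least one handler is registered; more may follow, and .build() is allowed.
interface Buildable<S> extends NeedsHandler<S> {
  build(): Built<S>;
}

function defineSketch(id: string): NeedsState {
  return {
    state<S>(initial: S): NeedsHandler<S> {
      const handlers: Record<string, StepHandler<S>> = {};
      const buildable: Buildable<S> = {
        on(type, handler) {
          handlers[type] = handler;
          return buildable;
        },
        build: () => Object.freeze({ id, initialState: initial, handlers: { ...handlers } }),
      };
      // Returned as NeedsHandler<S>, so callers cannot reach .build() yet.
      return buildable;
    },
  };
}
```

With these interfaces, defineSketch('x').build() and defineSketch('x').state({ n: 0 }).build() are compile errors, because neither NeedsState nor NeedsHandler declares build.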
```typescript
// plugins/sagas/<your-saga>.ts
import { defineSaga, sagaComplete } from '@netscript/plugin-sagas-core';

type State = Readonly<{ status: string; processedAt?: string }>;

// Build a saga: id -> durability tier -> typed state -> message handlers -> build().
export const userSettingsSaga = defineSaga('user-settings-saga')
  .durability('t1')
  .state<State>({ status: 'pending' })
  .on<'UserSettingsCreated', { userId: string }>(
    'UserSettingsCreated',
    (saga, event, context) => {
      // Advance the saga's own state, then emit completion as a recorded outcome.
      saga.state = {
        ...saga.state,
        status: 'completed',
        processedAt: context.now.toISOString(),
      };
      return [sagaComplete({
        userId: event.payload.userId,
        processedAt: context.now.toISOString(),
      })];
    },
  )
  .build();

export default userSettingsSaga;
```
```typescript
// netscript.config.ts (excerpt)
import { defineSagaConfig } from '@netscript/plugin-sagas-core/config';

// The config-time entry the scaffolder + CLI read. SEPARATE from the runtime
// definition that defineSaga(...).build() produces.
export const orderSagaEntry = defineSagaConfig('order-saga', './plugins/sagas/order-saga.ts')
  .name('Order saga')
  .description('Reserve, charge, fulfill — with a compensation branch.')
  .topic('orders')
  .tags(['orders', 'checkout'])
  .build();
```
Effect helpers
Apart from updating saga.state, a handler's only output is the array of cascaded messages
it returns. These helpers, exported from @netscript/plugin-sagas-core, construct the named
effects. Every kind in CASCADED_MESSAGE_KINDS (send | scheduled | spawn | complete | fail |
compensate) has a constructor.
| Name | Type | Description |
|---|---|---|
| sagaComplete(result?) | => CascadedMessage<'complete'> | Terminal success. Marks the instance finished and records the optional result payload. |
| sagaFail(reason) | string \| Error => CascadedMessage<'fail'> | Terminal failure. Records the reason; no further messages are applied to the instance. |
| sagaCompensate(message, reason?) | => CascadedMessage<'compensate'> | Route into the matching .compensate(type, ...) handler to undo an already-applied step. |
| send(target, payload, options) | => CascadedMessage<'send'> | Dispatch a command to a target (job, saga, or runtime adapter). options carry idempotencyKey / concurrencyKey / retry / queue. |
| schedule(message, delay) | delay: Date \| number \| '5m' => CascadedMessage<'scheduled'> | Deliver a wrapped message after a delay (a Date, ms, or a '30s'/'5m'/'2h'/'1d' string). |
| spawn(child, input, options) | => CascadedMessage<'spawn'> | Start a child saga from a definition or id, passing typed input. options take idempotencyKey / concurrencyKey. |
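The schedule(...) row accepts three delay forms. A hypothetical helper that normalizes them to milliseconds is sketched below; it assumes the string form is an integer followed by one of the documented units s, m, h, or d, and that a Date in the past means "deliver now". delayToMs is not part of @netscript/plugin-sagas-core, and the package's own parser may accept more formats.

```typescript
// Hypothetical delay normalizer for the documented forms: a Date, a number of
// milliseconds, or a string like '30s', '5m', '2h', '1d'. Not the package's parser.
type Delay = Date | number | `${number}${'s' | 'm' | 'h' | 'd'}`;

const UNIT_MS = { s: 1_000, m: 60_000, h: 3_600_000, d: 86_400_000 } as const;

function delayToMs(delay: Delay, now: Date = new Date()): number {
  // A Date is an absolute time; clamp past dates to an immediate delivery.
  if (delay instanceof Date) return Math.max(0, delay.getTime() - now.getTime());
  if (typeof delay === 'number') {
    if (!Number.isFinite(delay) || delay < 0) throw new RangeError(`invalid delay: ${delay}`);
    return delay;
  }
  // Assumption: integer amount followed by a single unit letter.
  const match = /^(\d+)([smhd])$/.exec(delay);
  if (!match) throw new RangeError(`invalid delay string: ${delay}`);
  const unit = match[2] as keyof typeof UNIT_MS;
  return Number(match[1]) * UNIT_MS[unit];
}
```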
createParallelQueue — fan-out and concurrent processing
Saga handlers stay synchronous and pure; concurrency is tuned in the transport layer that
carries cascaded messages and feeds fan-out work. createParallelQueue (from
@netscript/queue) is the primitive for that: it wraps a base queue so a single listener
processes several messages at once. It is the right tool when a saga spawns many
independent children or pushes I/O-bound side work (API calls, DB writes) that should run
in parallel rather than one at a time. The concurrency option is the whole story: 1 is
plain sequential, and anything higher enables parallel processing.
| Name | Type | Description |
|---|---|---|
| concurrency | number (default 1) | Number of concurrent processors. Must be >= 1; values > 1 wrap the queue for parallel listening. Use for I/O-bound work; for CPU-bound work prefer web-worker tasks. |
| provider | QueueProvider ('deno-kv' \| 'redis' \| 'rabbitmq' \| 'postgres') | Backing queue provider. Omit to auto-discover from the Aspire environment. |
| autoDiscover | boolean (default true) | Discover a queue service from Aspire. Priority RabbitMQ > Redis > Deno KV. |
| retryAttempts | number (default 3) | Max retry attempts for failed messages, when the backend lacks native retry. |
| retryDelay | number, ms (default 1000) | Delay between retries, when the backend lacks native retry. |
| connection | QueueConnectionOptions | Provider-specific connection options (denoKv / redis / rabbitmq / postgres). |
| deadLetterStore | DeadLetterStorePort | Where terminal message failures land. Omit to use the provider's durable default. |
| disableAutoTracing | boolean (default false) | Skip the automatic TracedQueue wrapper when you trace manually. |
```typescript
// plugins/sagas/fan-out.ts: process saga side effects concurrently
import { createParallelQueue, QueueProvider } from '@netscript/queue';

type NotifyMessage = Readonly<{ orderId: string; channel: 'email' | 'sms' }>;

// Your own delivery function (hypothetical name; not part of @netscript/queue).
declare function deliverNotification(
  orderId: string,
  channel: NotifyMessage['channel'],
): Promise<void>;

// Four messages processed at a time on a single listener, for I/O-bound fan-out
// (notifications, webhook calls) a saga emits as it advances.
const notifications = createParallelQueue<NotifyMessage>('order-notifications', {
  concurrency: 4,
  provider: QueueProvider.Redis,
});

await notifications.listen(async (message) => {
  // These run up to 4 at a time, not serially.
  await deliverNotification(message.orderId, message.channel);
});
```
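What concurrency: 4 buys can be shown without a broker: at most four handlers are in flight at once, and each slot picks up the next message as soon as its current one settles. The sketch below models that bounded parallelism in plain TypeScript; processWithLimit is a hypothetical name, and this is not createParallelQueue's implementation, which also handles the retry and dead-letter options listed above.

```typescript
// Hypothetical model of a listener with `concurrency` slots; not @netscript/queue.
// Returns the highest number of handlers observed running at the same time.
async function processWithLimit<T>(
  items: readonly T[],
  concurrency: number,
  handle: (item: T) => Promise<void>,
): Promise<number> {
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new RangeError('concurrency must be an integer >= 1');
  }
  let next = 0;
  let inFlight = 0;
  let peak = 0;
  // Each worker pulls the next item as soon as its previous one settles.
  const worker = async (): Promise<void> => {
    while (next < items.length) {
      const item = items[next++];
      inFlight++;
      peak = Math.max(peak, inFlight);
      try {
        await handle(item);
      } finally {
        inFlight--;
      }
    }
  };
  await Promise.all(Array.from({ length: Math.min(concurrency, items.length) }, worker));
  return peak;
}
```

For ten items that each wait a few milliseconds, processWithLimit(items, 4, handle) reports a peak of 4: four workers start immediately and none of them ever pushes the count past the limit.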
Choosing a durable store backend
Authoring a saga decides what it does. The durable store backend decides where its runtime state lives between messages and across crashes. NetScript ships two backends, and the choice is explicit and mandatory: the plugin service refuses to start without one.
kv: durable saga state in Deno KV (the orchestration store stood up by Aspire). Zero extra schema; the natural choice for a single-service app and for local development.
prisma: durable saga state in your scaffolded relational database via Prisma (Postgres by default; equally mysql, mssql, or sqlite, since the store writes through your project's Prisma client and is not Postgres-specific). The PrismaSagaStore writes the dedicated runtime tables saga_runtime_state, saga_runtime_transition, and saga_runtime_correlation. Reach for this when you want the saga's own write path in your relational database alongside the rest of your data, with SQL-level inspection of in-flight state and transition history.
You select the backend with the NETSCRIPT_SAGA_STORE environment variable (kv | prisma) or the appsettings key sagas.store.backend. The plugin service resolves this on startup via resolveSagaStoreBackend(...), which throws if neither is set — there is no silent default in the resolver, by design, so a deployment can never guess wrong about where durable state lands. (Calling createDurableSagaRuntime(...) directly without a backend falls back to the KV store; the mandatory-selection guarantee comes from the resolver the service runs at startup.)
| Name | Type | Description |
|---|---|---|
| kv | Deno KV | Natural choice for local/single-service. No extra schema. Provisioned by Aspire (Redis/KV). Resolved via NETSCRIPT_SAGA_STORE=kv or sagas.store.backend=kv. |
| prisma | Relational / Prisma | Writes saga_runtime_state, saga_runtime_transition, saga_runtime_correlation. Requires a Prisma client passed to createDurableSagaRuntime, so it follows whatever engine you scaffolded (Postgres by default; mysql / mssql / sqlite all work). SQL-inspectable in-flight state. Resolved via NETSCRIPT_SAGA_STORE=prisma or sagas.store.backend=prisma. |
| selection | mandatory | No implicit default. resolveSagaStoreBackend(...) throws when neither NETSCRIPT_SAGA_STORE nor sagas.store.backend is set. |
| client requirement | prisma only | backend: 'prisma' (or passing prisma) without a Prisma client throws 'Prisma saga store backend requires a Prisma client.' |
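The selection rule in the table can be written as a small function. This is a hypothetical sketch of the documented behavior, not the source of resolveSagaStoreBackend(...): it assumes the environment variable takes precedence over appsettings (the page does not state the precedence), that appsettings arrive as a nested object, and that any value other than kv or prisma is an error.

```typescript
// Hypothetical sketch of mandatory backend selection; not resolveSagaStoreBackend.
type SagaStoreBackend = 'kv' | 'prisma';
type AppSettings = Readonly<{ sagas?: Readonly<{ store?: Readonly<{ backend?: string }> }> }>;

function resolveBackendSketch(
  env: Readonly<Record<string, string | undefined>>,
  appsettings: AppSettings = {},
): SagaStoreBackend {
  // Assumption: the environment variable takes precedence over appsettings.
  const raw = env['NETSCRIPT_SAGA_STORE'] ?? appsettings.sagas?.store?.backend;
  if (raw === 'kv' || raw === 'prisma') return raw;
  if (raw === undefined || raw === '') {
    // No silent default: refuse to guess where durable state lands.
    throw new Error('Set NETSCRIPT_SAGA_STORE or sagas.store.backend to "kv" or "prisma".');
  }
  throw new Error(`Unknown saga store backend: ${raw}`);
}
```

Under Deno, resolveBackendSketch(Deno.env.toObject()) fails fast at startup when the variable is missing, which is the property the table's "mandatory" row describes.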
The factory that owns these resources is createDurableSagaRuntime(...) from the
@netscript/plugin-sagas/runtime subpath. It resolves a SagaStorePort, builds the
native runtime over it, and hands you back a dispose() that closes the store (and the
KV handle it opened).
```typescript
import { createDurableSagaRuntime } from '@netscript/plugin-sagas/runtime';

// Deno KV durable store: the natural choice for local/single-service apps.
// Selected at deploy time by NETSCRIPT_SAGA_STORE=kv (or appsettings sagas.store.backend=kv).
const { runtime, store, dispose } = await createDurableSagaRuntime({
  backend: 'kv',
  // kv is opened for you if you don't inject one (openSagaRuntimeKv()).
});

// ... register saga definitions on `runtime`, process messages ...

await dispose(); // closes the KV-backed store + handle
```
```typescript
import { createDurableSagaRuntime } from '@netscript/plugin-sagas/runtime';
import { PrismaClient } from './generated/prisma/client.ts';

// Prisma durable store (Postgres by default; any engine your Prisma client targets).
// Writes the saga_runtime_* tables. Selected at deploy time by NETSCRIPT_SAGA_STORE=prisma.
const prisma = new PrismaClient();
const { runtime, store, dispose } = await createDurableSagaRuntime({
  backend: 'prisma',
  prisma, // REQUIRED for prisma: omitting it throws.
});

// ... register saga definitions, process messages; transitions land in
// saga_runtime_state / saga_runtime_transition / saga_runtime_correlation ...

await dispose();
```
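```typescript
import {
  createDurableSagaRuntime,
  resolveSagaStoreBackend,
} from '@netscript/plugin-sagas/runtime';
import { PrismaClient } from './generated/prisma/client.ts';

// Read the backend from the environment (or appsettings); throws if unset.
const backend = resolveSagaStoreBackend({
  env: Deno.env.toObject(),
  // appsettings: loadedAppsettings, // sagas.store.backend
});

// The factory resolves { runtime, store, dispose }, not the runtime itself.
// Only construct a Prisma client when the prisma backend is selected.
const { runtime, dispose } = await createDurableSagaRuntime({
  backend,
  prisma: backend === 'prisma' ? new PrismaClient() : undefined,
});
```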
The HTTP publisher — createSagaPublisher
A message reaches a saga through a publisher. createSagaPublisher (from
@netscript/plugin-sagas/runtime) returns a SagaPublisherPort whose publish(...) POSTs
to the sagas API publish endpoint, discovering the service URL from the Aspire environment
by default. The workers create-user-settings job uses exactly this to emit
UserSettingsCreated across the plugin boundary.
| Name | Type | Description |
|---|---|---|
| serviceName | string | Aspire service name to resolve a base URL from. Defaults to the sagas service discovery name. |
| baseUrl | string | Explicit base URL override; skips discovery when set. |
| publishPath | string (default '/api/v1/sagas/publish') | Path the publisher POSTs each message to. |
| headers | Record | Extra headers sent with every publish (auth, tenant routing). |
| retryableStatusCodes | readonly number[] (default 408, 409, 425, 429, 5xx) | HTTP statuses treated as retryable, so the receipt is marked retryable. |
| id | string (default 'http-saga-publisher') | Stable publisher id surfaced in diagnostics. |
| fetcher / readEnv | boundary fns | Test/injection seams for the fetch implementation and the env reader used for discovery. |
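The retryableStatusCodes default can be read as a predicate that decides the retryable flag on a rejected receipt. The sketch below is hypothetical: isRetryableStatus and rejectedFromStatus are invented names, 5xx is assumed to mean every status from 500 through 599, and the reason text format is made up for illustration.

```typescript
// Hypothetical sketch; mirrors the documented defaults, not the publisher's source.
type RejectedReceipt = Readonly<{ published: false; reason: string; retryable: boolean }>;

// Documented default: 408, 409, 425, 429, and 5xx.
const RETRYABLE_STATUS_DEFAULTS: readonly number[] = [408, 409, 425, 429];

function isRetryableStatus(statusCode: number): boolean {
  // Assumption: "5xx" covers every status from 500 through 599.
  return RETRYABLE_STATUS_DEFAULTS.includes(statusCode) || (statusCode >= 500 && statusCode <= 599);
}

// Build the rejected shape for a failed HTTP response (reason format is illustrative).
function rejectedFromStatus(statusCode: number): RejectedReceipt {
  return { published: false, reason: `HTTP ${statusCode}`, retryable: isRetryableStatus(statusCode) };
}
```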
The port returns a typed receipt rather than throwing: publish(...) resolves a
SagaPublisherResult — either { published: true, ... } (a SagaPublisherReceipt) or
{ published: false, reason, retryable } (a SagaPublisherRejected). publishMany(...)
takes a mode: 'sequential' | 'parallel' so a batch can fan out.
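Because a rejected publish resolves instead of throwing, a caller that never inspects the receipt can report success for a message that was not delivered. A minimal sketch of narrowing the result follows, using local types that copy only the fields this page documents (published, reason, retryable). assertPublished and PublishRejectedError are hypothetical, and turning a rejection into an exception is one possible policy, not the library's behavior.

```typescript
// Local copies of the documented result shape. The receipt's remaining fields
// are not listed on this page, so only `published` is modelled on success.
type PublishReceipt = Readonly<{ published: true }>;
type PublishRejected = Readonly<{ published: false; reason: string; retryable: boolean }>;
type PublishResult = PublishReceipt | PublishRejected;

class PublishRejectedError extends Error {
  readonly retryable: boolean;
  constructor(reason: string, retryable: boolean) {
    super(`saga publish rejected: ${reason}`);
    this.name = 'PublishRejectedError';
    this.retryable = retryable; // carried along for a retry policy to read
  }
}

// Narrow on the `published` discriminant; a rejection becomes an exception so
// the caller cannot silently ignore it.
function assertPublished(result: PublishResult): PublishReceipt {
  if (!result.published) throw new PublishRejectedError(result.reason, result.retryable);
  return result;
}
```

Wrapping the call in a job, as in assertPublished(await sagaPublisher.publish(message)), would make a rejected publish fail the run instead of returning success; how the workers runtime reacts to that failure is not covered on this page.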
```typescript
// plugins/workers/jobs/create-user-settings.ts (core excerpt from the scaffold)
import { createSagaPublisher } from '@netscript/plugin-sagas/runtime';
import { createSuccessResult, defineJobHandler } from '@netscript/plugin-workers-core';
import { z } from 'zod';

const PayloadSchema = z.object({ userId: z.string().min(1) });

// UserRegistrationMessage is the scaffold's shared message type; its import is
// not shown in this excerpt.
const sagaPublisher = createSagaPublisher<UserRegistrationMessage>();

const handler = defineJobHandler(async (ctx) => {
  const { userId } = PayloadSchema.parse(ctx.payload ?? {});
  // This is the message the saga below consumes. publish() resolves a typed
  // receipt rather than throwing; this excerpt does not inspect it.
  await sagaPublisher.publish({ type: 'UserSettingsCreated', payload: { userId } });
  return createSuccessResult({ userId, settingsCreated: true });
});

export default Object.assign(handler, { id: 'create-user-settings' });
```
```typescript
// plugins/sagas/user-settings-saga.ts
import { defineSaga, sagaComplete } from '@netscript/plugin-sagas-core';

type State = Readonly<{ status: string; processedAt?: string }>;

export const userSettingsSaga = defineSaga('user-settings-saga')
  .durability('t1')
  .state<State>({ status: 'pending' })
  .on<'UserSettingsCreated', { userId: string }>(
    'UserSettingsCreated',
    (saga, event, context) => {
      saga.state = { ...saga.state, status: 'completed', processedAt: context.now.toISOString() };
      return [sagaComplete({ userId: event.payload.userId, processedAt: context.now.toISOString() })];
    },
  )
  .build();
```
Extension points
The current scaffold uses the curated defaults, but the core package exposes the seams the
plugin is composed from. Each is a subpath of @netscript/plugin-sagas-core; look it up in the
sagas reference (for example reference/sagas/presets and reference/sagas/transports).
| Name | Type | Description |
|---|---|---|
| /presets | startSagas(), startSagaHandlers() | Composition helpers that build a runtime from explicit definitions and return a { runtime, bus, sagaCount, shutdown } bundle. startSagaHandlers is the distributed-handler alias. |
| /middleware | createSagaMiddleware(), createSSEEventsMiddleware() | Hono middleware that injects saga helpers into request context, plus SSE event emission with optional durable history (SagaHistoryWriter). |
| /transports | createGarnetListTransport(), createNetScriptRedisTransport() | At-least-once delivery adapters: a Garnet/Redis LIST transport and a Redis Streams transport, both with immediate + delayed publish and ack/nack. |
| /agent | agent surface | Agent integration seam for the plugin's agent-facing surface (alpha). |
| /integration/publisher | SagaPublisherPort | The publisher boundary createSagaPublisher implements: publish() / publishMany() with typed receipts. |
| /integration/workers | workers bridge | The seam that lets a workers job emit and consume saga messages across the plugin boundary. |
Endpoints and ports
The sagas plugin runs an oRPC API service on port 8092. It lists registered sagas and
inspects running instances; the registry is backed by Deno KV. These are the routes
the live scaffold serves; see the sagas reference for the full generated surface.
| Name | Type | Description |
|---|---|---|
| GET /health/live | liveness | Liveness probe for the sagas API service. |
| GET /api/v1/sagas/sagas | registry | List the saga definitions registered into KV (id, name, topic, handled message types, enabled). |
| GET /api/v1/sagas/instances | instances | List running and completed saga instances. Inspect one with /instances/{sagaName}/{correlationId}. |
| POST /api/v1/sagas/publish | publish | Publish a message to the saga bus: the same path createSagaPublisher POSTs to and the workers create-user-settings job uses. |
| GET /api/v1/sagas/subscribe | stream (SSE) | Server-sent-events stream of saga activity (saga:started / state_changed / completed / failed / compensating), KV-watch backed. |
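The subscribe route speaks server-sent events. The table names the activity events but not the wire payload, so the parser below assumes each frame carries an event: line such as saga:completed and a JSON data: line. It follows the SSE framing rules (a blank line dispatches a frame, lines starting with a colon are comments, one leading space after the field's colon is dropped) and is a minimal sketch, not a NetScript client; in a browser, EventSource does this framing for you.

```typescript
// Minimal SSE frame parser. Assumes `event:` names like 'saga:completed' and a
// JSON `data:` body; the payload shape is not documented on this page.
type SagaActivity = Readonly<{ event: string; data: unknown }>;

function parseSseFrames(text: string): SagaActivity[] {
  const frames: SagaActivity[] = [];
  let eventName = 'message'; // SSE default when a frame has no `event:` line
  let dataLines: string[] = [];
  for (const rawLine of text.split(/\r?\n/)) {
    if (rawLine === '') {
      // A blank line dispatches the accumulated frame (if it carried data).
      if (dataLines.length > 0) {
        frames.push({ event: eventName, data: JSON.parse(dataLines.join('\n')) });
      }
      eventName = 'message';
      dataLines = [];
      continue;
    }
    if (rawLine.startsWith(':')) continue; // comment / keep-alive line
    const colon = rawLine.indexOf(':');
    const field = colon === -1 ? rawLine : rawLine.slice(0, colon);
    let value = colon === -1 ? '' : rawLine.slice(colon + 1);
    if (value.startsWith(' ')) value = value.slice(1);
    if (field === 'event') eventName = value;
    else if (field === 'data') dataLines.push(value);
    // Other fields (id, retry) are ignored in this sketch.
  }
  return frames; // an unterminated trailing frame is discarded, as the spec requires
}
```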
The continuous-app choreography
The thread that ties the capabilities together is real and it compiles. The workers
plugin's create-user-settings job calls a saga publisher and emits
UserSettingsCreated; this saga's .on('UserSettingsCreated', ...) handler consumes
it and emits sagaComplete(...). One message crosses the plugin boundary, and both
halves are type-checked against the same message type.
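One way to get both halves type-checked against the same message type is a single discriminated union that the publishing side and the consuming side both import, as createSagaPublisher<UserRegistrationMessage>() does in the job above. The sketch below uses the order saga's three messages from earlier; OrderMessage, encodeForPublish, and describeOrderMessage are hypothetical names, and the shared module is assumed rather than shown.

```typescript
// Hypothetical shared contract for the order saga's messages. In a workspace this
// would live in one module imported by both the publishing job and the saga.
type OrderMessage =
  | Readonly<{ type: 'OrderPlaced'; payload: Readonly<{ orderId: string }> }>
  | Readonly<{ type: 'PaymentCaptured'; payload: Readonly<{ orderId: string; paymentId: string }> }>
  | Readonly<{ type: 'FulfillmentFailed'; payload: Readonly<{ orderId: string; reason: string }> }>;

// Publisher side: only well-formed { type, payload } pairs type-check here.
function encodeForPublish(message: OrderMessage): string {
  return JSON.stringify(message);
}

// Consumer side: switching on `type` narrows the payload; the `never` check makes
// a newly added message type a compile error until it is handled here.
function describeOrderMessage(message: OrderMessage): string {
  switch (message.type) {
    case 'OrderPlaced':
      return `order ${message.payload.orderId} placed`;
    case 'PaymentCaptured':
      return `order ${message.payload.orderId} paid with ${message.payload.paymentId}`;
    case 'FulfillmentFailed':
      return `order ${message.payload.orderId} failed fulfillment: ${message.payload.reason}`;
    default: {
      const unhandled: never = message;
      return unhandled;
    }
  }
}
```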
After both plugins are running under Aspire, trigger the workers job
(POST :8091/api/v1/workers/jobs/create-user-settings/trigger) and watch the saga
appear at GET :8092/api/v1/sagas/instances: the message crossed the boundary and a
durable instance recorded its completion. Whether that durable instance lives in Deno
KV or in the saga_runtime_* tables of your relational database is exactly the
NETSCRIPT_SAGA_STORE choice above.