Alpha

Durable sagas

A saga is the answer to the question every retry loop dodges: what happens between step three and step four when the process dies? In NetScript a saga is an explicit, message-driven state machine — you declare the state it carries, the messages it reacts to, and the effects each handler emits. It is authored with a single fluent builder, persisted to a durable store, and served by the sagas plugin on port :8092. The model is closer to Temporal than to a job queue, but it lives in plain TypeScript inside your workspace — no separate cluster to operate.

Figure: Saga state machine. An event advances the saga through typed states, a compensation branch unwinds a failed step, and every transition checkpoints to a kv or prisma store.
A saga is a state machine. Each inbound message runs a handler that mutates typed state and returns an effect ledger (for example send, complete, fail, or compensate); the runtime checkpoints state between messages to the durable store (kv or prisma) so an instance survives a crash.

This is the third capability in the continuous-app thread. The background jobs capability ended with a job, create-user-settings, that publishes a UserSettingsCreated message. Here that message stops being fire-and-forget: a saga consumes it, advances its own state, and emits a sagaComplete(...) effect that the runtime records as a first-class outcome — and persists durably so the instance survives a process restart.

What it is

A saga is a durable, message-driven state machine: a typed state object plus a set of handlers, one per message type, that advance that state and return an effect ledger. The runtime persists state between every message, so an instance that is mid-flight when the process restarts resumes exactly where it left off. Unlike a background job (a single idempotent unit), a saga correlates many messages into one long-running instance and gives compensation (the undo of an already-applied step) a first-class place in the model. Read the conceptual companion, The durability model, for why state must outlive the process and how effect-based outcomes differ from a retry loop.
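To make "the runtime persists state between every message" concrete, here is a minimal sketch of the load, apply, save cycle. It is a toy model under stated assumptions, not NetScript's runtime: `CheckpointStore`, `deliver`, `advance`, and the `Charged` / `Shipped` message names are all hypothetical, and an in-memory `Map` stands in for the kv or prisma store.

```typescript
// Toy model of checkpoint-between-messages durability (hypothetical names throughout).
type CounterState = Readonly<{ status: 'pending' | 'charged' | 'done'; seen: number }>;
type ToyMessage = Readonly<{ type: 'Charged' | 'Shipped' }>;

// Stand-in for a durable store. Storing JSON strings forces a full
// serialize/deserialize round trip on every checkpoint, as a real store would.
class CheckpointStore {
  private readonly rows = new Map<string, string>();

  load(instanceId: string): CounterState | undefined {
    const raw = this.rows.get(instanceId);
    return raw === undefined ? undefined : (JSON.parse(raw) as CounterState);
  }

  save(instanceId: string, state: CounterState): void {
    this.rows.set(instanceId, JSON.stringify(state));
  }
}

const initialState: CounterState = { status: 'pending', seen: 0 };

// Pure transition: current state + message -> next state.
function advance(state: CounterState, message: ToyMessage): CounterState {
  const status = message.type === 'Charged' ? 'charged' : 'done';
  return { status, seen: state.seen + 1 };
}

// One delivery = load the checkpoint, apply the handler, save the checkpoint.
function deliver(store: CheckpointStore, instanceId: string, message: ToyMessage): CounterState {
  const current = store.load(instanceId) ?? initialState;
  const next = advance(current, message);
  store.save(instanceId, next);
  return next;
}

const store = new CheckpointStore();
deliver(store, 'order-1', { type: 'Charged' });
// Simulated restart: nothing is carried over in process memory except the store.
const resumed = deliver(store, 'order-1', { type: 'Shipped' });
console.log(resumed);
```

Because every delivery starts from the stored checkpoint rather than from in-process variables, the second message sees the effect of the first even though no state object was passed between the two calls.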


Fluent builder

defineSaga(id).durability().state().on().compensate().build() — id, durability tier, typed state, message handlers, compensations, then build(). One chain, fully type-checked.

Durable store backend

Runtime state persists to kv or prisma, chosen by NETSCRIPT_SAGA_STORE / appsettings. createDurableSagaRuntime({ backend, prisma }) owns the resources.

Effect-based outcomes

Every handler returns an array of effects. sagaComplete, sagaFail, sagaCompensate, send, schedule, and spawn are named outcomes returned from handlers, never a fall-through.

Served on :8092

An oRPC API lists registered sagas, inspects running instances, publishes messages, and streams activity over SSE.

Cross-plugin choreography

The workers create-user-settings job publishes UserSettingsCreated; this saga consumes it — one message crossing the plugin boundary, type-checked on both sides.

Crash-survivable

State checkpoints between messages, so an instance picks up exactly where it left off after a restart. That survival is the entire point of a saga.

Minimal example — an order saga with compensation

The runnable shape you grow into: a real state type, multiple .on(...) handlers, and a .compensate(...) branch that unwinds an already-charged payment when fulfillment fails. The first handler records the order and emits a send(...) effect to charge payment; the failure path returns sagaCompensate(...) so the engine routes the matching compensation handler, which refunds and then fails the instance.

// plugins/sagas/order-saga.ts
// Only the helpers this file actually uses are imported.
import {
  defineSaga,
  sagaCompensate,
  sagaFail,
  send,
} from '@netscript/plugin-sagas-core';

// The state is the source of truth that survives across messages and crashes.
type OrderState = Readonly<{
  status: 'awaiting-payment' | 'awaiting-fulfillment' | 'completed' | 'refunded';
  orderId?: string;
  paymentId?: string;
}>;

type OrderPlaced = Readonly<{ orderId: string }>;
type PaymentCaptured = Readonly<{ orderId: string; paymentId: string }>;
type FulfillmentFailed = Readonly<{ orderId: string; reason: string }>;

export const orderSaga = defineSaga('order-saga')
  // Durability tier: where the runtime checkpoints state between messages.
  .durability('t1')
  // Typed initial state, seeded once at instance creation.
  .state<OrderState>({ status: 'awaiting-payment' })
  // Handler 1: an order was placed — record it and ask the payment service to charge.
  .on<'OrderPlaced', OrderPlaced>('OrderPlaced', (saga, event) => {
    saga.state = { ...saga.state, orderId: event.payload.orderId };
    // `send` is an effect: dispatch a command to another target, not a direct call.
    return [send({ kind: 'service', id: 'payments' }, { orderId: event.payload.orderId }, {})];
  })
  // Handler 2: payment captured — advance toward fulfillment.
  .on<'PaymentCaptured', PaymentCaptured>('PaymentCaptured', (saga, event) => {
    saga.state = {
      ...saga.state,
      status: 'awaiting-fulfillment',
      paymentId: event.payload.paymentId,
    };
    return [];
  })
  // Handler 3: fulfillment failed AFTER we charged — request the compensation branch.
  .on<'FulfillmentFailed', FulfillmentFailed>('FulfillmentFailed', (saga, event) => {
    return [sagaCompensate({ type: 'FulfillmentFailed', payload: event.payload }, event.payload.reason)];
  })
  // Compensation: undo the already-applied payment, then fail the instance.
  .compensate<'FulfillmentFailed', FulfillmentFailed>('FulfillmentFailed', (saga, event) => {
    saga.state = { ...saga.state, status: 'refunded' };
    return [
      send({ kind: 'service', id: 'payments' }, { refund: saga.state.paymentId }, {}),
      sagaFail(`order ${event.payload.orderId} unfulfilled: ${event.payload.reason}`),
    ];
  })
  .build();

export default orderSaga;
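The routing step in the example above (a forward handler returns a compensate effect, and the engine then runs the compensation handler registered for that message type) can be sketched as a small interpreter. This is a toy under stated assumptions, not the plugin's engine: `ToyEffect`, `forwardHandlers`, `compensations`, `apply`, and `dispatch` are hypothetical names, and state is mutated in place for brevity.

```typescript
// Toy interpreter for compensate -> compensation-handler routing (hypothetical names).
type ToyEffect =
  | { kind: 'send'; target: string; payload: unknown }
  | { kind: 'compensate'; messageType: string; reason: string }
  | { kind: 'fail'; reason: string };

type ToyInstance = { status: string; paymentId?: string; outcome: 'running' | 'failed' };
type ToyHandler = (instance: ToyInstance, payload: Record<string, string>) => ToyEffect[];

const forwardHandlers: Record<string, ToyHandler> = {
  PaymentCaptured: (instance, payload) => {
    instance.status = 'awaiting-fulfillment';
    instance.paymentId = payload.paymentId;
    return [];
  },
  // The forward handler undoes nothing itself; it only requests compensation.
  FulfillmentFailed: (_instance, payload) => [
    { kind: 'compensate', messageType: 'FulfillmentFailed', reason: payload.reason },
  ],
};

const compensations: Record<string, ToyHandler> = {
  FulfillmentFailed: (instance, payload) => {
    instance.status = 'refunded';
    return [
      { kind: 'send', target: 'payments', payload: { refund: instance.paymentId } },
      { kind: 'fail', reason: `unfulfilled: ${payload.reason}` },
    ];
  },
};

// Apply a ledger. A `compensate` effect re-enters through the compensation table;
// `fail` marks the instance; anything else is collected as outbound work.
function apply(
  instance: ToyInstance,
  effects: ToyEffect[],
  payload: Record<string, string>,
  sent: ToyEffect[],
): void {
  for (const effect of effects) {
    if (effect.kind === 'compensate') {
      const undo = compensations[effect.messageType];
      if (undo) apply(instance, undo(instance, payload), payload, sent);
    } else if (effect.kind === 'fail') {
      instance.outcome = 'failed';
    } else {
      sent.push(effect);
    }
  }
}

function dispatch(
  instance: ToyInstance,
  type: string,
  payload: Record<string, string>,
  sent: ToyEffect[],
): void {
  const handler = forwardHandlers[type];
  if (handler) apply(instance, handler(instance, payload), payload, sent);
}

const order: ToyInstance = { status: 'awaiting-payment', outcome: 'running' };
const outbox: ToyEffect[] = [];
dispatch(order, 'PaymentCaptured', { paymentId: 'pay_1' }, outbox);
dispatch(order, 'FulfillmentFailed', { reason: 'out of stock' }, outbox);
console.log(order, outbox);
```

The point of the indirection is that the failure handler stays declarative: it names the step to undo, and the table lookup decides which undo logic runs.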

Key types first

Before the options, the primary interfaces the DSL works in. SagaState is the base shape your typed state must satisfy; SagaContext is the read-only context passed to every handler; SagaDefinition is the frozen object build() returns and the runtime registers.

Primary saga types (@netscript/plugin-sagas-core)
| Name | Type | Description |
| --- | --- | --- |
| SagaState | Readonly<…> | Base state shape every saga's typed state must extend. Your State type is intersected with this. |
| SagaMessage | { type, payload, correlationKey?, idempotencyKey?, concurrencyKey?, occurredAt?, traceparent? } | The event or command delivered to a handler. type discriminates; payload is your typed body. |
| SagaContext | { sagaId, instanceId, correlationKey, state, message, attempt, now, traceparent? } | Read-only handler context. now is the injected clock; attempt is the retry count; trace fields carry W3C context. |
| SagaHandler | (saga, event, context) => readonly CascadedMessage[] | A synchronous handler: it mutates saga.state and RETURNS an effect ledger. No async, no direct I/O. |
| SagaDefinition | Readonly<{ id, durability, initialState, handlers, compensations, correlations, retry?, concurrency?, schedule? }> | The frozen definition build() produces. Registered into the runtime and into KV under ['saga','registry', id]. |
| SAGA_DURABILITY_TIERS | readonly ['t1','t2','t3'] | The durability tiers a definition may declare via .durability(tier). 't1' is the scaffolded default. |

The builder API

A saga is built with defineSaga(id) and a typestate chain: .state() must come before any handler, and .build() requires at least one handler. These are the methods on the returned SagaBuilder.

defineSaga(id) — SagaBuilder methods
| Name | Type | Description |
| --- | --- | --- |
| .durability(tier) | SagaDurabilityTier ('t1' \| 't2' \| 't3') | Set the persistence tier the runtime checkpoints state to. Defaults to t1. Distinct from the kv/prisma store backend (see below). |
| .state(initial) | S extends SagaState | Declare the typed initial state, seeded once at instance creation. MUST be called before any handler (typestate-enforced). |
| .on(type, handler) | SagaHandler | Register a forward handler for a message type. Receives (saga, event, context), mutates saga.state, returns an effect ledger. |
| .compensate(type, handler) | SagaHandler | Register a compensation (undo) handler keyed by message type. Routed when a handler returns sagaCompensate(...). |
| .correlate(rule) | SagaCorrelation | Extract a correlation key from an incoming message so it routes to the right running instance. |
| .concurrency(opts) | { limit: number; key?: (m) => string } | Bound how many messages run at once, optionally per derived key. Overlapping publishes for one key are rejected. |
| .schedule(cron) | string | Attach a cron expression that ticks the saga definition on a schedule. |
| .onSignal(signal, handler) | SignalDefinition, handler | Register a reserved signal handler (defineSignal). Runtime dispatch is deferred in the alpha. |
| .onQuery(query, handler) | QueryDefinition, handler | Register a reserved synchronous read-only query handler (defineQuery). Promises are rejected at type level. |
| .build() | => SagaDefinition | Freeze the chain into a SagaDefinition. Requires at least one .on(...) handler (typestate-enforced). |

// plugins/sagas/<your-saga>.ts
import { defineSaga, sagaComplete } from '@netscript/plugin-sagas-core';

type State = Readonly<{ status: string; processedAt?: string }>;

// Build a saga: id -> durability tier -> typed state -> message handlers -> build().
export const userSettingsSaga = defineSaga('user-settings-saga')
  .durability('t1')
  .state<State>({ status: 'pending' })
  .on<'UserSettingsCreated', { userId: string }>(
    'UserSettingsCreated',
    (saga, event, context) => {
      // Advance the saga's own state, then emit completion as a recorded outcome.
      saga.state = {
        ...saga.state,
        status: 'completed',
        processedAt: context.now.toISOString(),
      };
      return [sagaComplete({
        userId: event.payload.userId,
        processedAt: context.now.toISOString(),
      })];
    },
  )
  .build();

export default userSettingsSaga;

// netscript.config.ts (excerpt)
import { defineSagaConfig } from '@netscript/plugin-sagas-core/config';

// The config-time entry the scaffolder + CLI read. SEPARATE from the runtime
// definition that defineSaga(...).build() produces.
export const orderSagaEntry = defineSagaConfig('order-saga', './plugins/sagas/order-saga.ts')
  .name('Order saga')
  .description('Reserve, charge, fulfill — with a compensation branch.')
  .topic('orders')
  .tags(['orders', 'checkout'])
  .build();
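The typestate ordering this section describes (state before any handler, at least one handler before build) can be modeled with staged interfaces, where each stage exposes only the methods that are legal next. The following is a minimal sketch of that technique, not the real SagaBuilder: `defineToySaga`, `NeedsState`, `NeedsHandler`, and `Buildable` are hypothetical names, and only the ordering constraints are modeled.

```typescript
// Minimal typestate builder sketch (hypothetical; far smaller than the real builder).
type ToyHandler<S> = (state: S, payload: unknown) => S;
type ToyDefinition<S> = Readonly<{
  id: string;
  initialState: S;
  handlers: Readonly<Record<string, ToyHandler<S>>>;
}>;

// Stage 1: no state yet, so `.on` and `.build` do not exist on this type.
interface NeedsState {
  state<S>(initial: S): NeedsHandler<S>;
}

// Stage 2: state is declared, but `.build` stays absent until a handler is added.
interface NeedsHandler<S> {
  on(type: string, handler: ToyHandler<S>): Buildable<S>;
}

// Stage 3: at least one handler is registered; more handlers or `.build` are allowed.
interface Buildable<S> extends NeedsHandler<S> {
  build(): ToyDefinition<S>;
}

function defineToySaga(id: string): NeedsState {
  return {
    state<S>(initial: S): NeedsHandler<S> {
      const handlers: Record<string, ToyHandler<S>> = {};
      const stage: Buildable<S> = {
        on(type, handler) {
          handlers[type] = handler;
          return stage;
        },
        build() {
          // Freeze a copy so later `.on` calls cannot alter an already-built definition.
          return Object.freeze({
            id,
            initialState: initial,
            handlers: Object.freeze({ ...handlers }),
          });
        },
      };
      return stage;
    },
  };
}

const toy = defineToySaga('toy-saga')
  .state({ status: 'pending' })
  .on('Done', (state) => ({ ...state, status: 'completed' }))
  .build();

// defineToySaga('x').on('Done', (s) => s);   // compile error: `on` is not on NeedsState
// defineToySaga('x').state({}).build();      // compile error: `build` is not on NeedsHandler

console.log(toy.id, Object.keys(toy.handlers), Object.isFrozen(toy));
```

The design choice is that misuse is rejected by the compiler rather than at runtime: an out-of-order chain simply has no such method to call.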

Effect helpers

A handler's only side effect is the array of cascaded messages it returns. These helpers (@netscript/plugin-sagas-core) construct the named effects. Every kind in CASCADED_MESSAGE_KINDS (send | scheduled | spawn | complete | fail | compensate) has a constructor.

Effect helpers — saga handler outcomes
| Name | Type | Description |
| --- | --- | --- |
| sagaComplete(result?) | => CascadedMessage<'complete'> | Terminal success. Marks the instance finished and records the optional result payload. |
| sagaFail(reason) | string \| Error => CascadedMessage<'fail'> | Terminal failure. Records the reason; no further messages are applied to the instance. |
| sagaCompensate(message, reason?) | => CascadedMessage<'compensate'> | Route into the matching .compensate(type, ...) handler to undo an already-applied step. |
| send(target, payload, options) | => CascadedMessage<'send'> | Dispatch a command to a target (job, saga, or runtime adapter). options carry idempotencyKey / concurrencyKey / retry / queue. |
| schedule(message, delay) | delay: Date \| number \| '5m' => CascadedMessage<'scheduled'> | Deliver a wrapped message after a delay (a Date, ms, or a '30s'/'5m'/'2h'/'1d' string). |
| spawn(child, input, options) | => CascadedMessage<'spawn'> | Start a child saga from a definition or id, passing typed input. options take idempotencyKey / concurrencyKey. |
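Two details in this section lend themselves to a short illustration: the closed set of effect kinds, and the three delay forms accepted by schedule(...). The sketch below is a plausible reading under stated assumptions, not the package's implementation: `KINDS`, `endsInstance`, and `toDelayMs` are hypothetical names, and the exact parsing rules of the real helper are not shown on this page.

```typescript
// Hypothetical mirror of the six effect kinds listed above.
const KINDS = ['send', 'scheduled', 'spawn', 'complete', 'fail', 'compensate'] as const;
type Kind = (typeof KINDS)[number];

// Exhaustive switch: adding a kind to KINDS without a case here fails to compile.
// `complete` and `fail` are the two terminal outcomes described in the table.
function endsInstance(kind: Kind): boolean {
  switch (kind) {
    case 'complete':
    case 'fail':
      return true;
    case 'send':
    case 'scheduled':
    case 'spawn':
    case 'compensate':
      return false;
    default: {
      const unreachable: never = kind;
      return unreachable;
    }
  }
}

// Assumed normalizer for the delay forms: a Date, milliseconds, or '<n><s|m|h|d>'.
const UNIT_MS = { s: 1000, m: 60000, h: 3600000, d: 86400000 } as const;
type DelayUnit = keyof typeof UNIT_MS;
type Delay = Date | number | `${number}${DelayUnit}`;

function toDelayMs(delay: Delay, now: Date): number {
  // A Date is treated as an absolute deadline, clamped so past dates mean "now".
  if (delay instanceof Date) return Math.max(0, delay.getTime() - now.getTime());
  if (typeof delay === 'number') return delay;
  const match = /^(\d+)([smhd])$/.exec(delay);
  if (match === null) throw new Error(`unsupported delay: ${delay}`);
  const [, amount, unit] = match;
  return Number(amount) * UNIT_MS[unit as DelayUnit];
}

const clock = new Date('2024-01-01T00:00:00.000Z');
console.log(toDelayMs('5m', clock), toDelayMs(1500, clock), endsInstance('fail'));
```

Passing the clock in as a parameter follows the same idea as the injected `now` on the handler context: time-dependent logic stays deterministic and testable.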

createParallelQueue — fan-out and concurrent processing

Saga handlers stay synchronous and pure; the transport layer that carries cascaded messages and feeds fan-out work is where you tune concurrency. createParallelQueue (from @netscript/queue) is the primitive for that: it wraps a base queue so a single listener processes several messages at once. It is the right tool when a saga spawns many independent children or pushes I/O-bound side work (API calls, DB writes) that should run in parallel rather than one at a time. The concurrency option is the whole story: 1 is plain sequential, anything higher enables parallel processing.

createParallelQueue(name, options) — ParallelQueueOptions (extends QueueOptions)
| Name | Type | Description |
| --- | --- | --- |
| concurrency | number (default 1) | Number of concurrent processors. Must be >= 1; values > 1 wrap the queue for parallel listening. Use for I/O-bound work; for CPU-bound work prefer web-worker tasks. |
| provider | QueueProvider ('deno-kv' \| 'redis' \| 'rabbitmq' \| 'postgres') | Backing queue provider. Omit to auto-discover from the Aspire environment. |
| autoDiscover | boolean (default true) | Discover a queue service from Aspire. Priority RabbitMQ > Redis > Deno KV. |
| retryAttempts | number (default 3) | Max retry attempts for failed messages, when the backend lacks native retry. |
| retryDelay | number ms (default 1000) | Delay between retries, when the backend lacks native retry. |
| connection | QueueConnectionOptions | Provider-specific connection options (denoKv / redis / rabbitmq / postgres). |
| deadLetterStore | DeadLetterStorePort | Where terminal message failures land. Omit to use the provider's durable default. |
| disableAutoTracing | boolean (default false) | Skip the automatic TracedQueue wrapper when you trace manually. |

// plugins/sagas/fan-out.ts: process saga side effects concurrently
import { createParallelQueue, QueueProvider } from '@netscript/queue';

type NotifyMessage = Readonly<{ orderId: string; channel: 'email' | 'sms' }>;

// Four messages processed at a time on a single listener — for I/O-bound fan-out
// (notifications, webhook calls) a saga emits as it advances.
const notifications = createParallelQueue<NotifyMessage>('order-notifications', {
  concurrency: 4,
  provider: QueueProvider.Redis,
});

await notifications.listen(async (message) => {
  // These run up to 4 at a time, not serially.
  await deliverNotification(message.orderId, message.channel);
});
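What a concurrency limit of 4 means in practice can be pictured as a small worker pool: a fixed number of workers each pull the next unclaimed message until none remain. The sketch below illustrates that idea only. `processWithConcurrency` is a hypothetical stand-in, not how createParallelQueue is implemented, and the timer simulates I/O.

```typescript
// Bounded-concurrency sketch: at most `concurrency` handlers are in flight at once.
async function processWithConcurrency<T>(
  messages: readonly T[],
  concurrency: number,
  handler: (message: T) => Promise<void>,
): Promise<void> {
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new RangeError('concurrency must be an integer >= 1');
  }
  let next = 0;
  // Each worker claims the next index synchronously, so no message is handled twice.
  const worker = async (): Promise<void> => {
    while (next < messages.length) {
      const message = messages[next++] as T;
      await handler(message);
    }
  };
  const workers = Array.from(
    { length: Math.min(concurrency, messages.length) },
    () => worker(),
  );
  await Promise.all(workers);
}

// Demonstration: track how many handlers overlap while ten messages are processed.
let inFlight = 0;
let peak = 0;
const handled: number[] = [];

const demo = processWithConcurrency([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 4, async (n) => {
  inFlight += 1;
  peak = Math.max(peak, inFlight);
  await new Promise((resolve) => setTimeout(resolve, 5)); // simulated I/O wait
  handled.push(n);
  inFlight -= 1;
}).then(() => console.log({ peak, handled: handled.length }));
```

With a limit of 1 the same function degrades to plain sequential processing, which matches the "1 is sequential, higher is parallel" description of the option.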

Choosing a durable store backend

Authoring a saga decides what it does. The durable store backend decides where its runtime state lives between messages and across crashes. NetScript ships two backends, and the choice is explicit and mandatory: the plugin service refuses to start without one.

  • kv — durable saga state in Deno KV (the orchestration store stood up by Aspire). Zero extra schema; the natural default for a single-service app and for local development.
  • prisma — durable saga state in your scaffolded relational database via Prisma (Postgres by default; equally mysql / mssql / sqlite, since the store writes through your project's Prisma client — it is not Postgres-specific). The PrismaSagaStore writes the dedicated runtime tables saga_runtime_state, saga_runtime_transition, and saga_runtime_correlation. Reach for this when you want the saga's own write path in your relational database alongside the rest of your data, with SQL-level inspection of in-flight state and transition history.

You select the backend with the NETSCRIPT_SAGA_STORE environment variable (kv | prisma) or the appsettings key sagas.store.backend. The plugin service resolves this on startup via resolveSagaStoreBackend(...), which throws if neither is set — there is no silent default in the resolver, by design, so a deployment can never guess wrong about where durable state lands. (Calling createDurableSagaRuntime(...) directly without a backend falls back to the KV store; the mandatory-selection guarantee comes from the resolver the service runs at startup.)
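The selection rule in the paragraph above (read the environment variable or the appsettings key, and throw when neither is set) can be sketched as a small pure function. This is an illustration, not the source of resolveSagaStoreBackend: `pickSagaStoreBackend` and `BackendSources` are hypothetical, and giving the environment variable precedence over appsettings is an assumption this page does not confirm.

```typescript
// Sketch of "explicit selection, no silent default" (hypothetical names).
type SagaStoreBackend = 'kv' | 'prisma';

type BackendSources = Readonly<{
  env?: Readonly<Record<string, string | undefined>>;
  appsettings?: Readonly<{ sagas?: { store?: { backend?: string } } }>;
}>;

function isBackend(value: string): value is SagaStoreBackend {
  return value === 'kv' || value === 'prisma';
}

function pickSagaStoreBackend(sources: BackendSources): SagaStoreBackend {
  const fromEnv = sources.env?.['NETSCRIPT_SAGA_STORE'];
  const fromSettings = sources.appsettings?.sagas?.store?.backend;
  // ASSUMPTION: the environment variable wins when both are present.
  const chosen = fromEnv ?? fromSettings;
  if (chosen === undefined) {
    throw new Error('Set NETSCRIPT_SAGA_STORE or sagas.store.backend to kv or prisma.');
  }
  if (!isBackend(chosen)) {
    throw new Error(`Unknown saga store backend: ${chosen}`);
  }
  return chosen;
}

console.log(pickSagaStoreBackend({ env: { NETSCRIPT_SAGA_STORE: 'kv' } }));
console.log(pickSagaStoreBackend({ appsettings: { sagas: { store: { backend: 'prisma' } } } }));
```

Throwing on a missing value, rather than returning a fallback, is what turns a misconfigured deployment into a startup failure instead of state silently landing in the wrong store.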

Durable saga store backends — trait matrix
| Name | Type | Description |
| --- | --- | --- |
| kv | Deno KV | Default for local/single-service. No extra schema. Provisioned by Aspire (Redis/KV). Resolved via NETSCRIPT_SAGA_STORE=kv or sagas.store.backend=kv. |
| prisma | Relational / Prisma | Writes saga_runtime_state, saga_runtime_transition, saga_runtime_correlation. Requires a Prisma client passed to createDurableSagaRuntime, so it follows whatever engine you scaffolded (Postgres by default; mysql / mssql / sqlite all work). SQL-inspectable in-flight state. Resolved via NETSCRIPT_SAGA_STORE=prisma or sagas.store.backend=prisma. |
| selection | mandatory | No implicit default. resolveSagaStoreBackend(...) throws when neither NETSCRIPT_SAGA_STORE nor sagas.store.backend is set. |
| client requirement | prisma only | backend: 'prisma' (or passing prisma) without a Prisma client throws 'Prisma saga store backend requires a Prisma client.' |

The factory that owns these resources is createDurableSagaRuntime(...) from the @netscript/plugin-sagas/runtime subpath. It resolves a SagaStorePort, builds the native runtime over it, and hands you back a dispose() that closes the store (and the KV handle it opened).

import { createDurableSagaRuntime } from '@netscript/plugin-sagas/runtime';

// Deno KV durable store — the default for local/single-service apps.
// Selected at deploy time by NETSCRIPT_SAGA_STORE=kv (or appsettings sagas.store.backend=kv).
const { runtime, store, dispose } = await createDurableSagaRuntime({
  backend: 'kv',
  // kv is opened for you if you don't inject one (openSagaRuntimeKv()).
});

// ... register saga definitions on `runtime`, process messages ...

await dispose(); // closes the KV-backed store + handle

import { createDurableSagaRuntime } from '@netscript/plugin-sagas/runtime';
import { PrismaClient } from './generated/prisma/client.ts';

// Prisma durable store (Postgres by default): writes saga_runtime_* tables.
// Selected at deploy time by NETSCRIPT_SAGA_STORE=prisma.
const prisma = new PrismaClient();
const { runtime, store, dispose } = await createDurableSagaRuntime({
  backend: 'prisma',
  prisma, // REQUIRED for prisma — omitting it throws.
});

// ... register saga definitions, process messages — transitions land in
// saga_runtime_state / saga_runtime_transition / saga_runtime_correlation ...

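await dispose();

import {
  createDurableSagaRuntime,
  resolveSagaStoreBackend,
} from '@netscript/plugin-sagas/runtime';
import { PrismaClient } from './generated/prisma/client.ts';

// Read the backend from the environment (or appsettings); throws if unset.
const backend = resolveSagaStoreBackend({
  env: Deno.env.toObject(),
  // appsettings: loadedAppsettings, // sagas.store.backend
});

// The factory returns { runtime, store, dispose }, as in the two examples above.
// A Prisma client is only constructed when the prisma backend was selected.
const { runtime, store, dispose } = await createDurableSagaRuntime({
  backend,
  prisma: backend === 'prisma' ? new PrismaClient() : undefined,
});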

The HTTP publisher — createSagaPublisher

A message reaches a saga through a publisher. createSagaPublisher (from @netscript/plugin-sagas/runtime) returns a SagaPublisherPort whose publish(...) POSTs to the sagas API publish endpoint, discovering the service URL from the Aspire environment by default. The workers create-user-settings job uses exactly this to emit UserSettingsCreated across the plugin boundary.

createSagaPublisher(options) — HttpSagaPublisherOptions
| Name | Type | Description |
| --- | --- | --- |
| serviceName | string | Aspire service name to resolve a base URL from. Defaults to the sagas service discovery name. |
| baseUrl | string | Explicit base URL override; skips discovery when set. |
| publishPath | string (default '/api/v1/sagas/publish') | Path the publisher POSTs each message to. |
| headers | Record | Extra headers sent with every publish (auth, tenant routing). |
| retryableStatusCodes | readonly number[] (default 408,409,425,429,5xx) | HTTP statuses treated as retryable so the receipt is marked retryable. |
| id | string (default 'http-saga-publisher') | Stable publisher id surfaced in diagnostics. |
| fetcher / readEnv | boundary fns | Test/injection seams for the fetch implementation and the env reader used for discovery. |

The port returns a typed receipt rather than throwing: publish(...) resolves a SagaPublisherResult — either { published: true, ... } (a SagaPublisherReceipt) or { published: false, reason, retryable } (a SagaPublisherRejected). publishMany(...) takes a mode: 'sequential' | 'parallel' so a batch can fan out.
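Because publish(...) resolves a result instead of throwing, callers branch on the `published` flag and can use `retryable` to decide whether another attempt makes sense. The sketch below shows one way to consume that shape. It is an assumption-laden illustration: `PublishResult`, `isRetryableStatus`, and `publishWithRetry` are hypothetical, only the `published`, `reason`, and `retryable` fields come from this page, and a fake publisher stands in for the real port.

```typescript
// Hypothetical mirror of the two result shapes described above.
type PublishResult =
  | Readonly<{ published: true }>
  | Readonly<{ published: false; reason: string; retryable: boolean }>;

// Default retryable statuses as listed in the options table: 408, 409, 425, 429, and 5xx.
function isRetryableStatus(status: number): boolean {
  return [408, 409, 425, 429].includes(status) || (status >= 500 && status <= 599);
}

// Re-publish only while the rejection is marked retryable, up to `maxAttempts` in total.
async function publishWithRetry(
  publish: () => Promise<PublishResult>,
  maxAttempts: number,
): Promise<PublishResult> {
  let result = await publish();
  for (
    let attempt = 1;
    attempt < maxAttempts && !result.published && result.retryable;
    attempt += 1
  ) {
    result = await publish();
  }
  return result;
}

// Fake publisher: two retryable failures (503, 429), then success.
const statuses = [503, 429, 200];
let calls = 0;
const fakePublish = async (): Promise<PublishResult> => {
  const status = statuses[Math.min(calls, statuses.length - 1)] as number;
  calls += 1;
  return status === 200
    ? { published: true }
    : { published: false, reason: `HTTP ${status}`, retryable: isRetryableStatus(status) };
};

const outcome = publishWithRetry(fakePublish, 5).then((result) => {
  console.log(result.published, calls);
  return result;
});
```

A production caller would normally add a backoff delay between attempts; it is left out here to keep the control flow visible.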

// plugins/workers/jobs/create-user-settings.ts (core, verbatim from the scaffold)
import { createSagaPublisher } from '@netscript/plugin-sagas/runtime';
import { createSuccessResult, defineJobHandler } from '@netscript/plugin-workers-core';
import { z } from 'zod';

const PayloadSchema = z.object({ userId: z.string().min(1) });
const sagaPublisher = createSagaPublisher<UserRegistrationMessage>();

const handler = defineJobHandler(async (ctx) => {
  const { userId } = PayloadSchema.parse(ctx.payload ?? {});
  // This is the message the saga below consumes — a typed receipt comes back.
  await sagaPublisher.publish({ type: 'UserSettingsCreated', payload: { userId } });
  return createSuccessResult({ userId, settingsCreated: true });
});

export default Object.assign(handler, { id: 'create-user-settings' });

// plugins/sagas/user-settings-saga.ts
import { defineSaga, sagaComplete } from '@netscript/plugin-sagas-core';

type State = Readonly<{ status: string; processedAt?: string }>;

export const userSettingsSaga = defineSaga('user-settings-saga')
  .durability('t1')
  .state<State>({ status: 'pending' })
  .on<'UserSettingsCreated', { userId: string }>(
    'UserSettingsCreated',
    (saga, event, context) => {
      saga.state = { ...saga.state, status: 'completed', processedAt: context.now.toISOString() };
      return [sagaComplete({ userId: event.payload.userId, processedAt: context.now.toISOString() })];
    },
  )
  .build();

Extension points

The current scaffold uses the curated defaults, but the core package exposes the seams the plugin is composed from. Each is a subpath of @netscript/plugin-sagas-core; look it up under the sagas reference (e.g. reference/sagas/presets, reference/sagas/transports).

Saga extension seams (@netscript/plugin-sagas-core subpaths)
| Name | Type | Description |
| --- | --- | --- |
| /presets | startSagas(), startSagaHandlers() | Composition helpers that build a runtime from explicit definitions and return a { runtime, bus, sagaCount, shutdown } bundle. startSagaHandlers is the distributed-handler alias. |
| /middleware | createSagaMiddleware(), createSSEEventsMiddleware() | Hono middleware that injects saga helpers into request context, plus SSE event emission with optional durable history (SagaHistoryWriter). |
| /transports | createGarnetListTransport(), createNetScriptRedisTransport() | At-least-once delivery adapters: a Garnet/Redis LIST transport and a Redis Streams transport, both with immediate + delayed publish and ack/nack. |
| /agent | agent surface | Agent integration seam for the plugin's agent-facing surface (alpha). |
| /integration/publisher | SagaPublisherPort | The publisher boundary createSagaPublisher implements: publish() / publishMany() with typed receipts. |
| /integration/workers | workers bridge | The seam that lets a workers job emit and consume saga messages across the plugin boundary. |

Endpoints and ports

The sagas plugin runs an oRPC API service on :8092. It lists registered sagas and inspects running instances; the registry is backed by Deno KV. These are the routes the live scaffold serves; see the sagas reference for the full generated surface.

Sagas plugin — runtime endpoints (port :8092)
| Name | Type | Description |
| --- | --- | --- |
| GET /health/live | liveness | Liveness probe for the sagas API service. |
| GET /api/v1/sagas/sagas | registry | List the saga definitions registered into KV (id, name, topic, handled message types, enabled). |
| GET /api/v1/sagas/instances | instances | List running and completed saga instances. Inspect one with /instances/{sagaName}/{correlationId}. |
| POST /api/v1/sagas/publish | publish | Publish a message to the saga bus; this is the same path createSagaPublisher POSTs to and the workers create-user-settings job uses. |
| GET /api/v1/sagas/subscribe | stream (SSE) | Server-sent-events stream of saga activity (saga:started / state_changed / completed / failed / compensating), KV-watch backed. |

The continuous-app choreography

The thread that ties the capabilities together is real and it compiles. The workers plugin's create-user-settings job calls a saga publisher and emits UserSettingsCreated; this saga's .on('UserSettingsCreated', ...) handler consumes it and emits sagaComplete(...). One message crosses the plugin boundary, and both halves are type-checked against the same message type.

After both plugins are running under Aspire, trigger the workers job (POST :8091/api/v1/workers/jobs/create-user-settings/trigger) and watch the saga appear at GET :8092/api/v1/sagas/instances: the message crossed the boundary and a durable instance recorded its completion. Whether that durable instance lives in Deno KV or in your saga_runtime_* relational tables is exactly the NETSCRIPT_SAGA_STORE choice above.
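The trigger-then-inspect flow can be scripted as two HTTP calls. The sketch below takes the two routes from the paragraph above; everything else is an assumption made for illustration: the `localhost` host, sending a JSON body to the trigger route, the shape of that body, and treating the instance listing as JSON. `triggerThenListInstances`, `ToyFetcher`, and `ToyResponse` are hypothetical names, and the fetch function is injected so the flow can be exercised without a live stack.

```typescript
// Hypothetical minimal fetch-like types, so a fake can be injected in place of `fetch`.
type ToyResponse = { ok: boolean; status: number; json(): Promise<unknown> };
type ToyFetcher = (
  url: string,
  init?: { method: string; headers: Record<string, string>; body: string },
) => Promise<ToyResponse>;

async function triggerThenListInstances(
  fetcher: ToyFetcher,
  triggerBody: unknown, // ASSUMPTION: body shape depends on the workers API, not shown here
  host = 'localhost', // ASSUMPTION: both services reachable on localhost
): Promise<unknown> {
  const triggered = await fetcher(
    `http://${host}:8091/api/v1/workers/jobs/create-user-settings/trigger`,
    {
      method: 'POST',
      headers: { 'content-type': 'application/json' },
      body: JSON.stringify(triggerBody),
    },
  );
  if (!triggered.ok) throw new Error(`trigger failed: HTTP ${triggered.status}`);

  const listed = await fetcher(`http://${host}:8092/api/v1/sagas/instances`);
  if (!listed.ok) throw new Error(`instance list failed: HTTP ${listed.status}`);
  return listed.json();
}

// Offline demonstration with a recording fake instead of a running stack.
const requested: string[] = [];
const fakeFetcher: ToyFetcher = async (url) => {
  requested.push(url);
  return { ok: true, status: 200, json: async () => [] };
};

const listing = triggerThenListInstances(fakeFetcher, { userId: 'u-1' }).then((body) => {
  console.log(requested, body);
  return body;
});
```

Against a real stack the job and saga run asynchronously, so the instance may not be listed on the first read; polling the instances route, or watching the SSE stream, is the more reliable check.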
