Alpha

Durable streams

A NetScript stream is a typed, durable change-log: producers write entity state into a durable-stream server, and any number of HTTP/SSE consumers materialize the latest value per key.

Figure: Streams pipeline: producer (defineStreamSchema + createDurableStream) → durable log on :4437 → HTTP/SSE → Fresh consumers (latest value per key).

NetScript's streams capability is the typed, change-data backbone the other plugins lean on — workers, sagas, and the auth service all publish their live state through it. The producer half is real and shipping today: you define a typed stream schema with defineStreamSchema, open a producer with createDurableStream, and upsert/delete/flush entity state over a durable-stream server that runs as an Aspire resource on port :4437.

The scope on this page is narrower than it used to be. The producer runtime in @netscript/plugin-streams-core is genuine — it writes through @durable-streams/client with idempotent delivery. What is not live is the topic-centric manifest sugar in @netscript/plugin-streams (defineStreamProducer / defineStreamConsumer): a producer's publish() returns a rejected promise and a consumer's subscribe() throws synchronously — both with StreamUnsupportedOperationError, pointing you at the core package. There is also no in-process consumer subscribe() yet — consumption is over the durable-stream server's HTTP/SSE protocol, which Fresh clients read. This page draws those lines precisely so you build against the producer surface that exists, not against the manifest helpers that don't.

What it is

A NetScript stream is an entity-oriented change log. You describe a set of collections — each a named entity type with a primary key — and the producer publishes upsert and delete operations keyed by that primary key. Downstream readers materialize the latest value per key and observe a live, replayable view of your domain state. This is the same contracts-first instinct as oRPC services, but applied to state replication instead of request/response: the schema is the type contract that both producer and any HTTP/SSE consumer are locked to.
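To make "materialize the latest value per key" concrete, here is a minimal sketch of folding an ordered change log into a keyed view. The `Change` shape and the `materialize` helper are assumptions for illustration; the real State-Protocol event fields are not specified on this page.

```typescript
// Hypothetical change shape for illustration only; not the real
// State-Protocol ChangeEvent.
type Change<T> =
  | { operation: 'upsert'; key: string; value: T }
  | { operation: 'delete'; key: string };

// Fold an ordered change log into the latest value per primary key.
function materialize<T>(log: Iterable<Change<T>>): Map<string, T> {
  const latest = new Map<string, T>();
  for (const change of log) {
    if (change.operation === 'delete') latest.delete(change.key);
    else latest.set(change.key, change.value);
  }
  return latest;
}

type Execution = { id: string; status: string };

const log: Change<Execution>[] = [
  { operation: 'upsert', key: 'exec-0', value: { id: 'exec-0', status: 'queued' } },
  { operation: 'upsert', key: 'exec-1', value: { id: 'exec-1', status: 'queued' } },
  { operation: 'upsert', key: 'exec-1', value: { id: 'exec-1', status: 'running' } },
  { operation: 'delete', key: 'exec-0' },
];

// Replaying the log leaves one entry: exec-1 in its most recent state.
const view = materialize(log);
```

Because the view is a pure fold over the log, a reader that replays from the start should arrive at the same state as one that has been connected all along.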

Streams sit alongside the other long-running capabilities rather than replacing them. Reach for a durable saga when you need message-driven orchestration with compensation; reach for a trigger when inbound HTTP or a file-watch should kick off work. A stream is the read-model fan-out: each of those plugins runs a thin streams/producer.ts that mirrors its execution state through createDurableStream, so a Fresh dashboard can watch saga, worker, and trigger progress live without polling a request/response API.


Minimal example — produce, then consume

The producer side is two calls: freeze a typed schema, open a stream, write entity state. The consumer side is an HTTP/SSE read of the same :4437 stream path — there is no in-process subscribe() handle, so a Fresh island (or any SSE client) reads the durable log directly and materializes the latest value per key.

// streams/executions-schema.ts
// Author against the core package — this is the live surface.
import { defineStreamSchema } from '@netscript/plugin-streams-core';
import { z } from 'zod';

// Each collection is an entity type with a primary key. The schema is the
// type contract producers and HTTP/SSE consumers are locked to.
export const executionsSchema = defineStreamSchema({
  execution: {
    schema: z.object({
      id: z.string().min(1),
      status: z.enum(['queued', 'running', 'succeeded', 'failed']),
      updatedAt: z.string().datetime(),
    }),
    type: 'execution',
    primaryKey: 'id',
  },
});

// streams/producer.ts
import { createDurableStream } from '@netscript/plugin-streams-core';
import { executionsSchema } from './executions-schema.ts';

// createDurableStream returns a singleton producer per streamPath and begins
// connecting to the :4437 durable-stream server immediately.
const producer = createDurableStream({
  streamPath: '/workers/executions',
  schema: executionsSchema,
  producerId: 'workers-service',
});

// upsert/delete are synchronous enqueues keyed by the collection primary key.
producer.upsert('execution', {
  id: 'exec-1',
  status: 'running',
  updatedAt: new Date().toISOString(),
});
producer.delete('execution', 'exec-0');

// flush before graceful shutdown; it rethrows the connect error if the
// producer never connected (see known limitations).
await producer.flush();

// islands/ExecutionsView.tsx (read the durable log directly)
import { getStreamsUrl } from '@netscript/plugin-streams-core';

// There is no in-process subscribe(); consumption is an HTTP/SSE read of the
// same stream path the producer writes to. getStreamsUrl resolves the :4437
// base from Aspire discovery / VITE env (see runtime resolvers below).
const base = getStreamsUrl();
const source = new EventSource(`${base}/workers/executions`);

const latest = new Map<string, unknown>(); // materialize latest value per key
source.onmessage = (ev) => {
  const change = JSON.parse(ev.data) as { key: string; value?: unknown };
  if (change.value === undefined) latest.delete(change.key);
  else latest.set(change.key, change.value);
};

// @netscript/plugin-streams (the manifest root) re-exports topic-centric
// helpers that are NOT implemented. They fail loud, by design.
import {
  defineStreamProducer,
  defineStreamConsumer,
} from '@netscript/plugin-streams';

// The producer handle is returned, but publish() rejects.
const producer = defineStreamProducer(/* topic */);
// await producer.publish(event) // -> rejects with StreamUnsupportedOperationError

// The consumer's subscribe() throws synchronously the moment you call it.
const consumer = defineStreamConsumer(/* topic */);
// consumer.subscribe(handler) // -> throws StreamUnsupportedOperationError

// Correct path: import createDurableStream / defineStreamSchema from
// '@netscript/plugin-streams-core' instead (see the producer example above).
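
The two failure modes above need different handling: a rejected promise only surfaces when awaited (or chained with `.catch`), while a synchronous throw is caught by a plain `try`/`catch`. The sketch below uses stand-in objects (`stubProducer`, `stubConsumer`) and a local error class that only mimic the documented behaviour; they are assumptions for illustration, not the package's real implementation.

```typescript
// Stand-in error class; the real one is exported by the NetScript packages.
class StreamUnsupportedOperationError extends Error {}

// Hypothetical stubs that mimic the documented behaviour.
const stubProducer = {
  publish(_event: unknown): Promise<void> {
    return Promise.reject(new StreamUnsupportedOperationError('publish is not implemented'));
  },
};
const stubConsumer = {
  subscribe(_handler: (event: unknown) => void): void {
    throw new StreamUnsupportedOperationError('subscribe is not implemented');
  },
};

async function probe(): Promise<string[]> {
  const seen: string[] = [];
  // The rejection is only observable because the call is awaited.
  try {
    await stubProducer.publish({ id: 'exec-1' });
  } catch (err) {
    if (err instanceof StreamUnsupportedOperationError) seen.push('publish rejected');
  }
  // The synchronous throw is caught without any await.
  try {
    stubConsumer.subscribe(() => {});
  } catch (err) {
    if (err instanceof StreamUnsupportedOperationError) seen.push('subscribe threw');
  }
  return seen;
}
```

A fire-and-forget `publish()` call without `await` or `.catch` would leave the rejection unhandled, so it is worth wrapping both calls during migration to the core package.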

Key types first — the stream definition API

A stream schema is a map of collections. Each collection is a CollectionDefinition — a Standard-Schema validator, a State-Protocol type discriminator, and the primaryKey property the producer keys writes by. defineStreamSchema(collections) freezes that map into a StateSchema that both the producer and any HTTP/SSE consumer are locked to.

CollectionDefinition — one entry per collection in defineStreamSchema
| Name | Type | Description |
| --- | --- | --- |
| schema | unknown (Standard Schema validator) | Standard-Schema-compatible validator (e.g. a zod object) used by durable-streams to validate the collection payload. |
| type | string (required) | State Protocol type discriminator emitted for every event in this collection (e.g. 'execution'). |
| primaryKey | string (required) | Property name on the value used as the entity primary key; upsert/delete are keyed by this property. |

defineStreamSchema returns a frozen StateSchema<TDef> — the durable-streams runtime attaches per-collection event helpers (insert/update/upsert/delete, the CollectionEventHelpers) so the schema can both validate and emit State-Protocol ChangeEvents. The supported Operation set is insert | update | delete | upsert.

Producer options — createDurableStream

createDurableStream(options) takes a DurableStreamProducerOptions and returns a DurableStreamProducer. It is a singleton factory keyed by streamPath: calling it twice with the same path returns the same live producer (a closed one is replaced). Writes are idempotent via @durable-streams/client's IdempotentProducer (stable producerId + auto-claim), so duplicate enqueues do not double-apply downstream.
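
The singleton behaviour described above can be sketched as a small registry keyed by path. This is a minimal illustration under stated assumptions: `SketchProducer` and `getOrCreateProducer` are hypothetical names, and the real factory also opens a connection, which is omitted here.

```typescript
// Hypothetical stand-in for a producer; only the fields needed to show the
// registry behaviour are modelled.
interface SketchProducer {
  readonly streamPath: string;
  closed: boolean;
}

const registry = new Map<string, SketchProducer>();

// Return the live producer for a path, or create one when none exists or
// the registered one has already been closed.
function getOrCreateProducer(streamPath: string): SketchProducer {
  const existing = registry.get(streamPath);
  if (existing && !existing.closed) return existing;
  const fresh: SketchProducer = { streamPath, closed: false };
  registry.set(streamPath, fresh);
  return fresh;
}

const first = getOrCreateProducer('/workers/executions');
const second = getOrCreateProducer('/workers/executions'); // same instance as first
first.closed = true; // simulate close()
const third = getOrCreateProducer('/workers/executions'); // replaces the closed one
```

Keying on the path means two modules that open the same stream share one connection and one write queue rather than competing for the same producerId.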

DurableStreamProducerOptions (createDurableStream argument)
| Name | Type | Description |
| --- | --- | --- |
| streamPath | string (required) | Stream path relative to the base URL, e.g. '/workers/executions'. This is the singleton key and the path consumers read over HTTP/SSE. |
| schema | StateSchema (required) | The frozen schema returned by defineStreamSchema; binds the producer to its collection map. |
| producerId | string (required) | Stable producer identity used for idempotent delivery (IdempotentProducer auto-claim). Keep it stable across restarts for duplicate-safe writes. |
| signal | AbortSignal? | Optional abort signal consulted while opening the stream connection; aborting cancels the connect (an AbortError is treated as expected, not a failure). |

The DurableStreamProducer it returns exposes a small, synchronous-write surface with an async flush/close for shutdown. (StreamProducerPort is the implemented-by interface — the same four members, with entityType widened to string.)

DurableStreamProducer — methods
| Member | Shape | Behavior |
| --- | --- | --- |
| upsert(entityType, value) | (K, Record) => void | Enqueue an upsert keyed by the collection primaryKey; skipped (warns) if the key is missing/empty or the collection is unknown. |
| delete(entityType, key) | (K, string) => void | Enqueue a delete by primary key; skipped (warns) on an empty key. |
| flush() | () => Promise<void> | Await pending writes before shutdown; rethrows the connect error if the producer never connected. |
| close() | () => Promise<void> | Flush, close the underlying handle, and release the singleton for this streamPath. |
| streamPath | string (readonly) | The stream path this producer owns. |
| closed | boolean (get) | Whether shutdown has begun; further upsert/delete calls are ignored. |
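
The skip-and-warn rules in the table can be modelled with an in-memory queue. The sketch below is an assumption-laden illustration, not the real producer: `SketchQueue`, its `warnings` array, and the `Op` shape are hypothetical, and no network writes happen.

```typescript
type Op =
  | { operation: 'upsert'; type: string; key: string; value: Record<string, unknown> }
  | { operation: 'delete'; type: string; key: string };

// Hypothetical in-memory model of the documented write rules.
class SketchQueue {
  readonly pending: Op[] = [];
  readonly warnings: string[] = [];
  closed = false;
  // Maps an entity type to the name of its primaryKey property.
  private readonly primaryKeys: Map<string, string>;

  constructor(collections: Record<string, string>) {
    this.primaryKeys = new Map(Object.entries(collections));
  }

  upsert(entityType: string, value: Record<string, unknown>): void {
    if (this.closed) return; // ignored once shutdown has begun
    const primaryKey = this.primaryKeys.get(entityType);
    if (primaryKey === undefined) {
      this.warnings.push(`unknown collection: ${entityType}`);
      return;
    }
    const key = value[primaryKey];
    if (typeof key !== 'string' || key.length === 0) {
      this.warnings.push(`missing primary key for ${entityType}`);
      return;
    }
    this.pending.push({ operation: 'upsert', type: entityType, key, value });
  }

  delete(entityType: string, key: string): void {
    if (this.closed) return; // ignored once shutdown has begun
    if (key.length === 0) {
      this.warnings.push(`empty key for ${entityType}`);
      return;
    }
    this.pending.push({ operation: 'delete', type: entityType, key });
  }
}

const queue = new SketchQueue({ execution: 'id' });
queue.upsert('execution', { id: 'exec-1', status: 'running' }); // enqueued
queue.upsert('execution', { status: 'running' }); // skipped: no primary key
queue.upsert('job', { id: 'job-1' }); // skipped: unknown collection
queue.delete('execution', ''); // skipped: empty key
queue.delete('execution', 'exec-0'); // enqueued
queue.closed = true;
queue.upsert('execution', { id: 'exec-2' }); // ignored after close
```

Skipping instead of throwing keeps the write path synchronous and non-fatal, so invalid input shows up as a warning rather than as an exception in the caller.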

Runtime & transport — URL and auth resolution

The producer never hardcodes a host. createDurableStream resolves the durable-stream base URL through getStreamsUrl() and the auth header through getStreamsAuth(), both of which read the environment so the same code works under Aspire, in a browser build, or against an explicit override. buildStreamUrl joins a stream path onto that base, and inspectStreamTopic returns a JSON-stable diagnostic report for a schema (handy in tests and CLI doctors).

Runtime resolvers (@netscript/plugin-streams-core)
| Name | Type | Description |
| --- | --- | --- |
| getStreamsUrl() | () => string | Resolves the durable-stream base URL. Server: DURABLE_STREAMS_URL override, else Aspire's services__streams__http__0 discovery var. Browser: VITE_services__streams__http__0 (or the VITE_STREAMS_URL shorthand). Throws a descriptive error if none resolve. |
| getStreamsAuth() | () => Record | Builds the auth header from STREAMS_SECRET (or DURABLE_STREAMS_SECRET) as { Authorization: 'Bearer <secret>' }; returns {} when no secret is set. |
| buildStreamUrl(path, baseUrl?) | (string, string?) => string | Joins a stream path onto the resolved base (or an explicit baseUrl), trimming a trailing slash on the base. |
| inspectStreamTopic(input) | (input) => StreamTopicInspectionReport | Diagnostic: returns a JSON-stable report (package, target, summary, details with collections/streamPath/producerId) for a schema + optional producer metadata. |

Environment variables read by the resolvers

| Name | Type | Description |
| --- | --- | --- |
| DURABLE_STREAMS_URL | server override | Explicit base URL; takes precedence over Aspire discovery (e.g. http://localhost:4437). |
| services__streams__http__0 | Aspire (server) | Injected by the Aspire resource graph; the default server-side discovery path. |
| VITE_services__streams__http__0 | browser | Vite-injected reference for browser/Fresh consumers; VITE_STREAMS_URL is the convenience shorthand. |
| STREAMS_SECRET / DURABLE_STREAMS_SECRET | auth | Bearer secret for getStreamsAuth(); when set, every connect sends Authorization: Bearer <secret>. |
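
The server-side resolution order can be sketched as pure functions over an environment map, which is also a convenient shape for unit tests. The function names below (`resolveServerUrl`, `resolveAuth`, `joinStreamUrl`) are hypothetical, and two details are assumptions not stated on this page: that STREAMS_SECRET wins over DURABLE_STREAMS_SECRET when both are set, and that empty values count as unset.

```typescript
type Env = Record<string, string | undefined>;

// Explicit override first, then the Aspire discovery variable.
// Assumption: an empty string is treated the same as an unset variable.
function resolveServerUrl(env: Env): string {
  const url = env['DURABLE_STREAMS_URL'] || env['services__streams__http__0'];
  if (!url) {
    throw new Error('No durable-stream URL: set DURABLE_STREAMS_URL or run under Aspire');
  }
  return url;
}

// Bearer header when a secret is present, otherwise an empty header map.
// Assumption: STREAMS_SECRET takes precedence over DURABLE_STREAMS_SECRET.
function resolveAuth(env: Env): Record<string, string> {
  const secret = env['STREAMS_SECRET'] || env['DURABLE_STREAMS_SECRET'];
  return secret ? { Authorization: `Bearer ${secret}` } : {};
}

// Join a stream path onto a base, trimming one trailing slash on the base.
function joinStreamUrl(path: string, baseUrl: string): string {
  return `${baseUrl.replace(/\/$/, '')}${path}`;
}

const env: Env = {
  DURABLE_STREAMS_URL: 'http://localhost:4437/',
  services__streams__http__0: 'http://streams.internal:4437',
  STREAMS_SECRET: 's3cret',
};

const streamUrl = joinStreamUrl('/workers/executions', resolveServerUrl(env));
const headers = resolveAuth(env);
```

Passing the environment in as an argument, instead of reading a global, keeps the resolution rules testable without mutating process state.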

Known limitations

Be deliberate about what the alpha producer does and does not guarantee.

Endpoints & manifest

The streams plugin is registered as a utility/infra plugin — note it requires neither a database nor KV (requiresDb=false, requiresKv=false), unlike workers, sagas, and triggers. Its durable-stream service listens on :4437 and is wired into the Aspire resource graph so workers, sagas, and auth can publish through it. The port is overridable via STREAMS_PORT or PORT.
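
As a rough illustration of the port override, the sketch below resolves a listening port from an environment map. The `resolvePort` name is hypothetical, and the precedence shown (STREAMS_PORT before PORT) is an assumption; the page only states that either variable can override the :4437 default.

```typescript
const DEFAULT_STREAMS_PORT = 4437;

// Assumption: STREAMS_PORT is consulted before the generic PORT variable.
function resolvePort(env: Record<string, string | undefined>): number {
  const raw = env['STREAMS_PORT'] || env['PORT'];
  if (!raw) return DEFAULT_STREAMS_PORT;
  const port = Number(raw);
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid streams port: ${raw}`);
  }
  return port;
}

const defaultPort = resolvePort({});
const overridden = resolvePort({ STREAMS_PORT: '5000', PORT: '8080' });
```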

Streams plugin — runtime facts
| Property | Value |
| --- | --- |
| Plugin location | plugins/streams/ |
| Real producer package | @netscript/plugin-streams-core (createDurableStream, defineStreamSchema) |
| Manifest import | @netscript/plugin-streams (topic helpers fail with StreamUnsupportedOperationError) |
| Transport client | @durable-streams/client (IdempotentProducer) |
| Dev service port | :4437 (durable-stream Aspire service; override with STREAMS_PORT/PORT) |
| provider.kind | stream · category plugin · pluginType utility |
| Requires DB / KV | false / false |
| First-party producers | workers, sagas, triggers, auth (each via streams/producer.ts → createDurableStream) |
| Consumer surface | HTTP/SSE from the :4437 server (Fresh clients); no in-process subscribe() |

The plugin is referenced from netscript.config.ts as ./plugins/streams/mod.ts. Because workers, sagas, and triggers each list streams in their dependencies, it is installed first in the dependency graph and its :4437 service comes up so dependent producers have somewhere to write.

Production notes

Reference

This hub is intentionally thin — the full generated API lives in the reference.

streams