Durable streams
A NetScript stream is a typed, durable change log: producers write entity state into a durable-stream server, and any number of HTTP/SSE consumers materialize the latest value per key. Status: alpha.
NetScript's streams capability is the typed, change-data backbone the other
plugins lean on — workers, sagas, and the auth service all publish their live
state through it. The producer half is real and shipping today: you define a
typed stream schema with defineStreamSchema, open a
producer with createDurableStream, and upsert/delete/flush entity state
over a durable-stream server that runs as an Aspire resource on port :4437.
The scope on this page is narrower than it used to be. The producer
runtime in @netscript/plugin-streams-core is genuine — it writes through
@durable-streams/client with idempotent delivery. What is not live is the
topic-centric manifest sugar in @netscript/plugin-streams
(defineStreamProducer / defineStreamConsumer): a producer's publish()
returns a rejected promise and a consumer's subscribe() throws
synchronously — both with StreamUnsupportedOperationError, pointing you at the
core package. There is also no in-process consumer subscribe() yet —
consumption is over the durable-stream server's HTTP/SSE protocol, which Fresh
clients read. This page draws those lines precisely so you build against the
producer surface that exists, not against the manifest helpers that don't.
What it is
A NetScript stream is an entity-oriented change log. You describe a set of
collections — each a named entity type with a primary key — and the producer
publishes upsert and delete operations keyed by that primary key. Downstream
readers materialize the latest value per key and observe a live, replayable view
of your domain state. This is the same contracts-first instinct as oRPC services,
but applied to state replication instead of request/response: the schema is
the type contract that both producer and any HTTP/SSE consumer are locked to.
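To make "materialize the latest value per key" concrete, here is a minimal sketch of the reduction a reader performs over the log. The `Operation` shape and the `materialize` helper are hypothetical names chosen for illustration; the real State-Protocol event type may differ.

```typescript
// Hypothetical operation shape for illustration only.
type Operation<T> =
  | { kind: 'upsert'; key: string; value: T }
  | { kind: 'delete'; key: string };

// Replay a change log in order and keep only the latest value per key.
function materialize<T>(changes: readonly Operation<T>[]): Map<string, T> {
  const latest = new Map<string, T>();
  for (const op of changes) {
    if (op.kind === 'upsert') latest.set(op.key, op.value);
    else latest.delete(op.key);
  }
  return latest;
}

interface ExecutionRow {
  id: string;
  status: 'queued' | 'running' | 'succeeded' | 'failed';
}

const changeLog: Operation<ExecutionRow>[] = [
  { kind: 'upsert', key: 'exec-0', value: { id: 'exec-0', status: 'queued' } },
  { kind: 'upsert', key: 'exec-1', value: { id: 'exec-1', status: 'running' } },
  { kind: 'delete', key: 'exec-0' },
  { kind: 'upsert', key: 'exec-1', value: { id: 'exec-1', status: 'succeeded' } },
];

// Only exec-1 survives, with its most recent status.
const liveView = materialize(changeLog);
```

Because the log is replayable, a reader that joins late can run the same reduction from the start and arrive at the same view.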
Streams sit alongside the other long-running capabilities rather than replacing
them. Reach for a durable saga when you need message-driven orchestration with
compensation; reach for a trigger when inbound HTTP or a file-watch should kick
off work. A stream is the read-model fan-out: each of those plugins runs a thin
streams/producer.ts that mirrors its execution state through
createDurableStream, so a Fresh dashboard can watch saga, worker, and trigger
progress live without polling a request/response API.
Minimal example — produce, then consume
The producer side is two setup calls followed by writes: freeze a typed schema,
open a stream, then write entity state. The consumer side is an HTTP/SSE read of
the same :4437 stream path. There is no in-process subscribe() handle, so a
Fresh island (or any SSE client) reads the durable log directly and materializes
the latest value per key.
```typescript
// streams/executions-schema.ts
// Author against the core package; this is the live surface.
import { defineStreamSchema } from '@netscript/plugin-streams-core';
import { z } from 'zod';

// Each collection is an entity type with a primary key. The schema is the
// type contract producers and HTTP/SSE consumers are locked to.
export const executionsSchema = defineStreamSchema({
  execution: {
    schema: z.object({
      id: z.string().min(1),
      status: z.enum(['queued', 'running', 'succeeded', 'failed']),
      updatedAt: z.string().datetime(),
    }),
    type: 'execution',
    primaryKey: 'id',
  },
});
```
```typescript
// streams/producer.ts
import { createDurableStream } from '@netscript/plugin-streams-core';
import { executionsSchema } from './executions-schema.ts';

// createDurableStream returns a singleton producer per streamPath and begins
// connecting to the :4437 durable-stream server immediately.
const producer = createDurableStream({
  streamPath: '/workers/executions',
  schema: executionsSchema,
  producerId: 'workers-service',
});

// upsert/delete are synchronous enqueues keyed by the collection primary key.
producer.upsert('execution', {
  id: 'exec-1',
  status: 'running',
  updatedAt: new Date().toISOString(),
});
producer.delete('execution', 'exec-0');

// flush before graceful shutdown; it rethrows the connect error if the
// producer never connected (see known limitations).
await producer.flush();
```
```typescript
// islands/ExecutionsView.tsx: read the durable log directly
import { getStreamsUrl } from '@netscript/plugin-streams-core';

// There is no in-process subscribe(); consumption is an HTTP/SSE read of the
// same stream path the producer writes to. getStreamsUrl resolves the :4437
// base from Aspire discovery / VITE env (see runtime resolvers below).
const base = getStreamsUrl();
const source = new EventSource(`${base}/workers/executions`);

const latest = new Map<string, unknown>(); // materialize latest value per key
source.onmessage = (ev) => {
  const change = JSON.parse(ev.data) as { key: string; value?: unknown };
  if (change.value === undefined) latest.delete(change.key);
  else latest.set(change.key, change.value);
};
```
```typescript
// @netscript/plugin-streams (the manifest root) re-exports topic-centric
// helpers that are NOT implemented. They fail loud, by design.
import {
  defineStreamProducer,
  defineStreamConsumer,
} from '@netscript/plugin-streams';

// The producer handle is returned, but publish() rejects.
const producer = defineStreamProducer(/* topic */);
// await producer.publish(event) // -> rejects with StreamUnsupportedOperationError

// The consumer's subscribe() throws synchronously the moment you call it.
const consumer = defineStreamConsumer(/* topic */);
// consumer.subscribe(handler) // -> throws StreamUnsupportedOperationError

// Correct path: import createDurableStream / defineStreamSchema from
// '@netscript/plugin-streams-core' instead (see the examples above).
```
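The two failure modes need different handling: a synchronous throw is caught by a plain try/catch, while a rejected promise is only observed when it is awaited or given a rejection handler. The sketch below uses local stand-ins (`UnsupportedOperationSketch`, `stubProducer`, `stubConsumer`) so it runs on its own; these are hypothetical and only imitate the documented behavior of the manifest helpers.

```typescript
// Hypothetical stand-in for StreamUnsupportedOperationError.
class UnsupportedOperationSketch extends Error {
  constructor(operation: string) {
    super(`${operation} is not implemented`);
    this.name = 'UnsupportedOperationSketch';
  }
}

// Imitates a producer whose publish() returns a rejected promise.
const stubProducer = {
  publish(_event: unknown): Promise<void> {
    return Promise.reject(new UnsupportedOperationSketch('publish'));
  },
};

// Imitates a consumer whose subscribe() throws synchronously.
const stubConsumer = {
  subscribe(_handler: (event: unknown) => void): void {
    throw new UnsupportedOperationSketch('subscribe');
  },
};

// The synchronous throw lands in an ordinary try/catch.
let subscribeError: unknown;
try {
  stubConsumer.subscribe(() => {});
} catch (err) {
  subscribeError = err;
}

// The rejection bypasses try/catch unless awaited, so attach a handler.
let publishError: unknown;
const publishSettled = stubProducer
  .publish({ id: 'exec-1' })
  .catch((err: unknown) => {
    publishError = err;
  });
```

In application code, a forgotten `await` on publish() would surface as an unhandled rejection rather than a caught error, which is the practical reason to treat the two calls differently.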
Key types first — the stream definition API
A stream schema is a map of collections. Each collection is a
CollectionDefinition — a Standard-Schema validator, a State-Protocol type
discriminator, and the primaryKey property the producer keys writes by.
defineStreamSchema(collections) freezes that map into a StateSchema that both
the producer and any HTTP/SSE consumer are locked to.
| Name | Type | Description |
|---|---|---|
| schema | unknown (Standard Schema validator) | Standard-Schema-compatible validator (e.g. a zod object) used by durable-streams to validate the collection payload. |
| type | string (required) | State Protocol type discriminator emitted for every event in this collection (e.g. 'execution'). |
| primaryKey | string (required) | Property name on the value used as the entity primary key; upsert/delete are keyed by this property. |
defineStreamSchema returns a frozen StateSchema<TDef> — the durable-streams
runtime attaches per-collection event helpers (insert/update/upsert/delete,
the CollectionEventHelpers) so the schema can both validate and emit
State-Protocol ChangeEvents. The supported Operation set is
insert | update | delete | upsert.
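As a rough illustration of how the type discriminator and primaryKey combine into an event, the sketch below freezes a collection map and derives a keyed change event from a value. `CollectionSketch`, `ChangeEventSketch`, `freezeCollections`, and `toChangeEvent` are hypothetical names, and the event shape is an assumption; the real StateSchema and ChangeEvent types come from the durable-streams runtime.

```typescript
// Hypothetical shapes for illustration only.
interface CollectionSketch {
  type: string;       // discriminator emitted on every event
  primaryKey: string; // property on the value that holds the entity key
}

type OperationName = 'insert' | 'update' | 'delete' | 'upsert';

interface ChangeEventSketch {
  type: string;
  key: string;
  operation: OperationName;
  value?: Record<string, unknown>;
}

// Freeze the map and each definition so the contract cannot drift at runtime.
function freezeCollections<T extends Record<string, CollectionSketch>>(
  collections: T,
): Readonly<T> {
  for (const def of Object.values<CollectionSketch>(collections)) {
    Object.freeze(def);
  }
  return Object.freeze(collections);
}

// Derive the event key from the value; return undefined when it is unusable.
function toChangeEvent(
  def: CollectionSketch,
  operation: OperationName,
  value: Record<string, unknown>,
): ChangeEventSketch | undefined {
  const key = value[def.primaryKey];
  if (typeof key !== 'string' || key.length === 0) return undefined;
  return operation === 'delete'
    ? { type: def.type, key, operation }
    : { type: def.type, key, operation, value };
}

const collectionsSketch = freezeCollections({
  execution: { type: 'execution', primaryKey: 'id' },
});

const changeEvent = toChangeEvent(collectionsSketch.execution, 'upsert', {
  id: 'exec-1',
  status: 'running',
});
```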
Producer options — createDurableStream
createDurableStream(options) takes a DurableStreamProducerOptions and returns
a DurableStreamProducer. It is a singleton factory keyed by streamPath:
calling it twice with the same path returns the same live producer (a closed one
is replaced). Writes are idempotent via @durable-streams/client's
IdempotentProducer (stable producerId + auto-claim), so duplicate enqueues do
not double-apply downstream.
| Name | Type | Description |
|---|---|---|
| streamPath | string (required) | Stream path relative to the base URL, e.g. '/workers/executions'. This is the singleton key and the path consumers read over HTTP/SSE. |
| schema | StateSchema | The frozen schema returned by defineStreamSchema; binds the producer to its collection map. |
| producerId | string (required) | Stable producer identity used for idempotent delivery (IdempotentProducer auto-claim). Keep it stable across restarts for duplicate-safe writes. |
| signal | AbortSignal? | Optional abort signal consulted while opening the stream connection; aborting cancels the connect (an AbortError is treated as expected, not a failure). |
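The singleton-per-streamPath behavior can be pictured as a small registry. This is a minimal sketch under the assumption that a plain map keyed by path is enough; `ProducerSketch`, `producersByPath`, and `getOrCreateProducer` are hypothetical names, not the package's internals.

```typescript
// Hypothetical stand-in for a producer; only the fields the registry needs.
interface ProducerSketch {
  readonly streamPath: string;
  isClosed: boolean;
}

const producersByPath = new Map<string, ProducerSketch>();

// Return the live producer for a path, creating a new one if none exists or
// if the cached one has already been closed.
function getOrCreateProducer(streamPath: string): ProducerSketch {
  const cached = producersByPath.get(streamPath);
  if (cached && !cached.isClosed) return cached;
  const fresh: ProducerSketch = { streamPath, isClosed: false };
  producersByPath.set(streamPath, fresh);
  return fresh;
}

const firstProducer = getOrCreateProducer('/workers/executions');
const secondProducer = getOrCreateProducer('/workers/executions');
const reusedWhileLive = firstProducer === secondProducer; // same instance

firstProducer.isClosed = true;
const replacement = getOrCreateProducer('/workers/executions'); // new instance
```

Keying on the path means two modules that open the same stream share one connection instead of competing for it.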
The DurableStreamProducer it returns exposes a small, synchronous-write surface
with an async flush/close for shutdown. (StreamProducerPort is the
implemented-by interface — the same four members, with entityType widened to
string.)
| Member | Shape | Behavior |
|---|---|---|
| upsert(entityType, value) | (K, Record) => void | Enqueue an upsert keyed by the collection primaryKey; skipped (warns) if the key is missing/empty or the collection is unknown. |
| delete(entityType, key) | (K, string) => void | Enqueue a delete by primary key; skipped (warns) on an empty key. |
| flush() | () => Promise<void> | Await pending writes before shutdown; rethrows the connect error if the producer never connected. |
| close() | () => Promise<void> | Flush, close the underlying handle, and release the singleton for this streamPath. |
| streamPath | string (readonly) | The stream path this producer owns. |
| closed | boolean (get) | Whether shutdown has begun; further upsert/delete calls are ignored. |
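For unit tests, the skip rules in the table can be mirrored by an in-memory double. The sketch below is a hypothetical `InMemoryProducer`, not the real DurableStreamProducer: it records writes and warnings in arrays instead of talking to a server, and its close() does no flushing.

```typescript
type EntityValue = Record<string, unknown>;

interface RecordedWrite {
  op: 'upsert' | 'delete';
  entityType: string;
  key: string;
  value?: EntityValue;
}

// Hypothetical test double that imitates the documented skip rules.
class InMemoryProducer {
  readonly streamPath: string;
  readonly writes: RecordedWrite[] = [];
  readonly warnings: string[] = [];
  private readonly primaryKeys: Map<string, string>;
  private shuttingDown = false;

  constructor(streamPath: string, primaryKeys: Map<string, string>) {
    this.streamPath = streamPath;
    this.primaryKeys = primaryKeys;
  }

  get closed(): boolean {
    return this.shuttingDown;
  }

  upsert(entityType: string, value: EntityValue): void {
    if (this.shuttingDown) return; // ignored once shutdown has begun
    const keyProp = this.primaryKeys.get(entityType);
    if (keyProp === undefined) {
      this.warnings.push(`unknown collection: ${entityType}`);
      return;
    }
    const key = value[keyProp];
    if (typeof key !== 'string' || key === '') {
      this.warnings.push(`missing key "${keyProp}" on ${entityType}`);
      return;
    }
    this.writes.push({ op: 'upsert', entityType, key, value });
  }

  delete(entityType: string, key: string): void {
    if (this.shuttingDown) return;
    if (key === '') {
      this.warnings.push(`empty key on delete: ${entityType}`);
      return;
    }
    this.writes.push({ op: 'delete', entityType, key });
  }

  close(): Promise<void> {
    this.shuttingDown = true; // the flag flips synchronously
    return Promise.resolve();
  }
}

const testProducer = new InMemoryProducer(
  '/workers/executions',
  new Map([['execution', 'id']]),
);
testProducer.upsert('execution', { id: 'exec-1', status: 'running' }); // recorded
testProducer.upsert('execution', { status: 'running' }); // skipped: no id
testProducer.upsert('invoice', { id: 'inv-1' }); // skipped: unknown collection
testProducer.delete('execution', ''); // skipped: empty key
testProducer.delete('execution', 'exec-0'); // recorded
void testProducer.close();
testProducer.upsert('execution', { id: 'exec-2', status: 'queued' }); // ignored
```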
Runtime & transport — URL and auth resolution
The producer never hardcodes a host. createDurableStream resolves the
durable-stream base URL through getStreamsUrl() and the auth header through
getStreamsAuth(), both of which read the environment so the same code works
under Aspire, in a browser build, or against an explicit override. buildStreamUrl
joins a stream path onto that base, and inspectStreamTopic returns a JSON-stable
diagnostic report for a schema (handy in tests and CLI doctors).
| Name | Type | Description |
|---|---|---|
| getStreamsUrl() | () => string | Resolves the durable-stream base URL. Server: DURABLE_STREAMS_URL override, else Aspire's services__streams__http__0 discovery var. Browser: VITE_services__streams__http__0 (or the VITE_STREAMS_URL shorthand). Throws a descriptive error if none resolve. |
| getStreamsAuth() | () => Record | Builds the auth header from STREAMS_SECRET (or DURABLE_STREAMS_SECRET) as { Authorization: 'Bearer |
| buildStreamUrl(path, baseUrl?) | (string, string?) => string | Joins a stream path onto the resolved base (or an explicit baseUrl), trimming a trailing slash on the base. |
| inspectStreamTopic(input) | (input) => StreamTopicInspectionReport | Diagnostic: returns a JSON-stable report (package, target, summary, details with collections/streamPath/producerId) for a schema + optional producer metadata. |
| Variable | Scope | Description |
|---|---|---|
| DURABLE_STREAMS_URL | server override | Explicit base URL; takes precedence over Aspire discovery (e.g. http://localhost:4437). |
| services__streams__http__0 | Aspire (server) | Injected by the Aspire resource graph; the default server-side discovery path. |
| VITE_services__streams__http__0 | browser | Vite-injected reference for browser/Fresh consumers; VITE_STREAMS_URL is the convenience shorthand. |
| STREAMS_SECRET / DURABLE_STREAMS_SECRET | auth | Bearer secret for getStreamsAuth(); when set, every connect sends Authorization: Bearer |
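The server-side lookup order and the path join can be sketched as two pure functions. `resolveServerStreamsUrl` and `joinStreamUrl` are hypothetical names that imitate the documented behavior of getStreamsUrl and buildStreamUrl; the environment is passed in as a plain object, empty values are treated as unset, and the `http://streams:4437` host is made up for the example. Both of those choices are assumptions.

```typescript
type EnvSketch = Record<string, string | undefined>;

// Explicit override first, then the Aspire discovery variable.
function resolveServerStreamsUrl(env: EnvSketch): string {
  const url = env['DURABLE_STREAMS_URL'] || env['services__streams__http__0'];
  if (!url) {
    throw new Error(
      'No durable-stream URL: set DURABLE_STREAMS_URL or services__streams__http__0',
    );
  }
  return url;
}

// Join a stream path onto the base, trimming one trailing slash on the base.
// Assumes the path starts with '/'.
function joinStreamUrl(path: string, baseUrl: string): string {
  const base = baseUrl.endsWith('/') ? baseUrl.slice(0, -1) : baseUrl;
  return `${base}${path}`;
}

const resolvedBase = resolveServerStreamsUrl({
  DURABLE_STREAMS_URL: 'http://localhost:4437/',
  services__streams__http__0: 'http://streams:4437', // hypothetical value
});

// -> 'http://localhost:4437/workers/executions'
const executionsUrl = joinStreamUrl('/workers/executions', resolvedBase);
```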
Known limitations
Be deliberate about what the alpha producer does and does not guarantee.
Endpoints & manifest
The streams plugin is registered as a utility/infra plugin — note it requires
neither a database nor KV (requiresDb=false, requiresKv=false), unlike
workers, sagas, and triggers. Its durable-stream service listens on :4437 and
is wired into the Aspire resource graph so workers, sagas, and auth can publish
through it. The port is overridable via STREAMS_PORT or PORT.
| Property | Value |
|---|---|
| Plugin location | plugins/streams/ |
| Real producer package | @netscript/plugin-streams-core (createDurableStream, defineStreamSchema) |
| Manifest import | @netscript/plugin-streams — topic helpers throw StreamUnsupportedOperationError |
| Transport client | @durable-streams/client (IdempotentProducer) |
| Dev service port | :4437 (durable-stream Aspire service; override with STREAMS_PORT/PORT) |
| provider.kind | stream · category plugin · pluginType utility |
| Requires DB / KV | false / false |
| First-party producers | workers, sagas, triggers, auth (each streams/producer.ts → createDurableStream) |
| Consumer surface | HTTP/SSE from the :4437 server (Fresh clients) — no in-process subscribe() |
The plugin is referenced from netscript.config.ts as
./plugins/streams/mod.ts. Because workers, sagas, and triggers each list
streams in their dependencies, it is installed first in the dependency graph
and its :4437 service comes up so dependent producers have somewhere to write.
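One common way to derive such an install order is a depth-first topological sort over the declared dependencies. The sketch below is illustrative only: the `pluginDependencies` map is hand-written and `installOrder` is a hypothetical helper, so the NetScript installer may compute its order differently.

```typescript
// Hypothetical dependency map; streams is deliberately listed last.
const pluginDependencies: Record<string, string[]> = {
  workers: ['streams'],
  sagas: ['streams'],
  triggers: ['streams'],
  streams: [],
};

// Depth-first topological sort: a plugin is emitted only after everything it
// depends on has been emitted.
function installOrder(graph: Record<string, string[]>): string[] {
  const order: string[] = [];
  const marks = new Map<string, 'visiting' | 'done'>();

  const visit = (plugin: string): void => {
    const mark = marks.get(plugin);
    if (mark === 'done') return;
    if (mark === 'visiting') throw new Error(`dependency cycle at ${plugin}`);
    marks.set(plugin, 'visiting');
    for (const dep of graph[plugin] ?? []) visit(dep);
    marks.set(plugin, 'done');
    order.push(plugin);
  };

  for (const plugin of Object.keys(graph)) visit(plugin);
  return order;
}

// -> ['streams', 'workers', 'sagas', 'triggers']
const pluginOrder = installOrder(pluginDependencies);
```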
Production notes
Reference
This hub is intentionally thin — the full generated API lives in the reference.