Skip to main content
Alpha

Publish a durable stream

Use createDurableStream(options) when a server-side runtime owns entity state that browser clients or other consumers should subscribe to through a durable stream service.

Prerequisites

  • A stream schema from defineStreamSchema(...).
  • A stable streamPath.
  • A stable producerId for idempotent delivery.
  • The streams runtime service reachable for the stream path.

Define the schema

defineStreamSchema(...) accepts collection definitions keyed by collection name. Each collection declares a schema, type, and primaryKey.

import { defineStreamSchema } from '@netscript/plugin-streams-core';

const standardShipmentSchema = {
  '~standard': {
    version: 1,
    vendor: 'example',
    validate: (value: unknown) => ({ value }),
  },
} as const;

export const shipmentStreamSchema = defineStreamSchema({
  shipments: {
    schema: standardShipmentSchema,
    type: 'shipment',
    primaryKey: 'id',
  },
});

Create the producer

createDurableStream({ streamPath, schema, producerId, signal }) returns a singleton producer for the stream path while it remains open.

import { createDurableStream } from '@netscript/plugin-streams-core';
import { shipmentStreamSchema } from './shipment-stream-schema.ts';

const shutdown = new AbortController();

const producer = createDurableStream({
  streamPath: '/shipments',
  schema: shipmentStreamSchema,
  producerId: 'shipping-service',
  signal: shutdown.signal,
});

Upsert and delete state

The producer validates the collection key against the schema and reads the entity primary key from the configured primaryKey field.

producer.upsert('shipments', {
  id: 'ship_123',
  orderId: 'ord_123',
  status: 'in_transit',
  updatedAt: new Date().toISOString(),
});

producer.delete('shipments', 'ship_123');

Flush on shutdown

Call flush() before a process exits so pending writes reach the stream runtime. close() flushes and removes the singleton producer for the stream path.

addEventListener('unload', async () => {
  shutdown.abort();
  await producer.flush();
  await producer.close();
});

Failure modes

  • Missing primary key: the producer skips the upsert for that entity.
  • Unknown collection key: TypeScript catches it when the schema is typed; unchecked dynamic values should be validated before calling upsert.
  • Runtime unavailable: flush() can throw the connection error. Treat this as a deploy/runtime failure, not proof of subprocess behavior.
  • Duplicate stream path: createDurableStream reuses the open producer for the same streamPath.

Next steps