Skip to main content
Alpha

Queue / KV / cron

Scope: three task recipes for the runtime primitives every NetScript app reaches for sooner or later — enqueueing and consuming a queue message, reading and writing KV, and scheduling a cron job. Each primitive is a small, provider-agnostic package (@netscript/queue, @netscript/kv, @netscript/cron) that resolves its backend from the environment, so the same code runs against an in-memory adapter on your laptop and a real broker or store under Aspire. This page shows the minimal, copy-able shape for each, calls out the one place where you must pick a backend by hand (the PostgreSQL queue provider), then points you at the full generated API.

These primitives are lower-level than the background-jobs plugin: a worker job is a managed handler with persistence, retries, and an HTTP trigger, whereas a raw queue is just a typed pipe. Use the plugin when you want lifecycle management; use these primitives when you want a thin abstraction you control. For the concept-level tour, read Capabilities — KV, queues & cron.

Prerequisites

Before you start
NameTypeDescription
A NetScript workspace netscript init Created per the Quickstart. Run these recipes from the workspace root or any workspace member.
aspire startning (for real backends) cd aspire && aspire start Aspire is step 2 of local dev — it provisions Postgres, Redis (cache), and any broker BEFORE your service connects. In-memory and Deno KV adapters need nothing extra; Redis, RabbitMQ, and the Postgres queue all expect Aspire up first.
Deno KV unstable flag --unstable-kv KV (and the KV-backed queue/cron fallbacks) use Deno KV. The scaffold's deno.json sets unstable: ['raw-imports', 'kv']; add --unstable-kv to any ad-hoc deno run/check that touches these packages.

Recipe 1 — KV: read and write durable state

@netscript/kv is a reactive key-value store with one API across Deno KV, Redis, and an in-memory backend. The shared lifecycle resolves a provider once and hands back a WatchableKv — a KvStore with get / set / delete / has / list / atomic, plus reactive watch / watchPrefix.

Step 1 — Get the shared store

// some-module.ts
import { getKv } from "@netscript/kv";

// Resolves (and initializes on first call) the shared WatchableKv singleton.
// Provider is auto-detected from the environment unless you pass a config.
const kv = await getKv();

getKv() takes an optional SharedKvConfig if you want to pin a provider or a Deno KV path. The provider set is 'redis' | 'deno-kv' | 'nitro' | 'auto'; leaving it unset means 'auto'.

Step 2 — Write, read, and expire

// Keys are KvKey = readonly Deno.KvKeyPart[] — a portable tuple key.
await kv.set(["users", 42, "profile"], { name: "Ada", status: "active" });

const entry = await kv.get<{ name: string; status: string }>(["users", 42, "profile"]);
console.log(entry?.value?.name); // "Ada"

// KvSetOptions.expireIn sets a TTL in milliseconds.
await kv.set(["session", "tok_123"], { userId: 42 }, { expireIn: 60_000 });

await kv.delete(["session", "tok_123"]);

Step 3 — List a prefix and watch for changes

// list() takes a KvListOptions selector — here, every key under ["users"].
for await (const item of kv.list({ prefix: ["users"] })) {
  console.log(item.key, item.value);
}

// watchPrefix() returns an AsyncIterable; iterate it with for-await, cancel via AbortSignal.
const controller = new AbortController();
for await (const event of kv.watchPrefix(["users"], { signal: controller.signal })) {
  console.log("changed:", event.key);
}
// …to stop:
controller.abort();
KV adapters (@netscript/kv)
NameTypeDescription
DenoKvAdapter @netscript/kv (provider 'deno-kv') Deno-native, backed by Deno.Kv, with native watch support. The default for local dev. Needs --unstable-kv.
MemoryKvAdapter @netscript/kv (provider 'auto' in tests) Volatile in-process store. Nothing to provision; data is lost on restart. Ideal for tests and isolated runs.
Redis adapter @netscript/kv/redis (provider 'redis') Importing the sub-path self-registers the 'redis' provider. Backed by the Redis/Garnet resource Aspire provisions. Reads its connection via getRedisConnectionFromEnv().

See the full surface — getRawKv, getActiveProvider, atomic compare-and-swap, the WatchableKv contract — in @netscript/kv.

Recipe 2 — Queue: enqueue and consume a message

@netscript/queue wraps battle-tested adapters behind one MessageQueue interface with optional Zod validation. A queue has exactly two operations: enqueue (producer) and listen (consumer). There are four backends — Deno KV, Redis, RabbitMQ (AMQP), and PostgreSQL. Backend auto-discovery probes RabbitMQ (AMQP) first, then Redis, then Deno KV; PostgreSQL is opt-in only (Step 4).

Step 1 — Create a typed queue

Prefer createTypedQueue so messages are validated with Zod at both enqueue and dequeue time. (createQueue gives you the untyped pipe if you don't want validation.)

// queue.ts
import { createTypedQueue } from "@netscript/queue";
import { z } from "zod";

const EmailJobSchema = z.object({
  to: z.string().email(),
  subject: z.string().min(1),
  body: z.string(),
});

// Name is the logical queue; the backend is auto-discovered (RabbitMQ → Redis → Deno KV).
export const emailQueue = createTypedQueue("welcome-emails", EmailJobSchema);

Step 2 — Enqueue from a producer

import { emailQueue } from "./queue.ts";

await emailQueue.enqueue({
  to: "ada@example.com",
  subject: "Welcome",
  body: "Thanks for signing up.",
});

// EnqueueOptions supports a delay (e.g. retry-after, scheduled send).
await emailQueue.enqueue(
  { to: "grace@example.com", subject: "Reminder", body: "Don't forget." },
  { delay: 30_000 },
);

Step 3 — Consume with a listener

import { emailQueue } from "./queue.ts";

// listen() runs the handler per message; MessageContext carries metadata + ack controls.
await emailQueue.listen(async (message, context) => {
  // `message` is already validated against EmailJobSchema.
  await sendEmail(message.to, message.subject, message.body);
  // Throwing here surfaces a QueueHandlerError; the adapter handles retry/ack semantics.
});

Step 4 — Pin the PostgreSQL backend explicitly

The PostgreSQL provider gives you a SQL-durable queue — useful when you already run Postgres (under Aspire) and want one fewer moving part than a dedicated broker, or when you want the queue and your application data to share a transactional store. Because auto-discovery is RabbitMQ → Redis → Deno KV only, Postgres never wins the probe; you must name it.

import { createQueue, QueueProvider } from "@netscript/queue";

// provider: 'postgres' (or QueueProvider.Postgres) is the only way to select it.
const jobs = createQueue("jobs", {
  provider: QueueProvider.Postgres,
  connection: {
    postgres: {
      // url is optional — when omitted it falls back to Aspire's getPostgresUri().
      url: Deno.env.get("DATABASE_URL"),
      // tableName defaults to 'message_queue'.
      tableName: "message_queue",
    },
  },
});

await jobs.enqueue({ id: "job-1", kind: "reindex" });
await jobs.listen(async (message, context) => {
  await handle(message);
});

The adapter uses row-claim semantics (FOR UPDATE SKIP LOCKED) with a visibility timeout plus ack/nack and a dead-letter store, so concurrent consumers don't double-claim a message. createTypedQueue accepts the same provider/connection options if you want Zod validation on the Postgres-backed queue too.

Queue backends (@netscript/queue) — four selectable providers
NameTypeDescription
Deno KV provider 'deno-kv' — auto-discovery fallback Zero-dependency local default when no broker is present. Durable to the Deno KV store. Last in the auto-discovery probe. Needs --unstable-kv.
Redis provider 'redis' — second probe Backed by the Redis/Garnet resource Aspire provisions. Selected when a Redis connection is discoverable and no AMQP broker is.
RabbitMQ (AMQP) provider 'rabbitmq' — first probe Full broker semantics via Fedify's AMQP adapter (amqplib). Preferred when an AMQP connection is available. Use createParallelQueue for concurrent processing.
PostgreSQL provider 'postgres' — EXPLICIT only SQL-durable queue over npm:pg with FOR UPDATE SKIP LOCKED row-claim, visibility timeout, ack/nack, and a dead-letter store. connection.postgres.{url,tableName} (url optional → Aspire getPostgresUri(); tableName defaults to 'message_queue'). Never chosen by auto-discovery — you must set provider:'postgres'.

For createParallelQueue, the typed/parallel options, the full QueueProvider enum, the connection.postgres shape, the QueueError hierarchy, and the standalone safeValidate / validateOrThrow helpers, see @netscript/queue.

Recipe 3 — Cron: schedule a recurring job

@netscript/cron is a runtime-agnostic scheduler over native Deno.cron and an in-memory scheduler for tests, with timezone support and job-lifecycle events. You get a scheduler from the factory and register jobs with a cron expression.

Step 1 — Get a scheduler

// scheduler.ts
import { getScheduler } from "@netscript/cron";

// Shared singleton, created on first call, auto-detecting the runtime backend.
const scheduler = getScheduler();

createScheduler(options?) builds a fresh scheduler if you'd rather not use the shared instance; stopScheduler() tears the default one down (handy between tests).

Step 2 — Schedule with a cron expression or a preset

import { getScheduler, CronPresets, isValidCronExpression } from "@netscript/cron";

const scheduler = getScheduler();

// Standard 5-field cron expression. Validate untrusted input first.
const expr = "*/15 * * * *"; // every 15 minutes
if (!isValidCronExpression(expr)) throw new Error(`bad cron: ${expr}`);

const cleanup = async () => {
  await purgeExpiredSessions();
};

await scheduler.schedule("session-cleanup", expr, cleanup);

// Or use a named preset instead of a raw expression.
await scheduler.schedule("nightly-report", CronPresets.EVERY_DAY, async () => {
  await emitDailyReport();
});

The scheduler contract, parseCronExpression, the full CronPresets set (EVERY_MINUTE, EVERY_5_MINUTES, EVERY_HOUR, EVERY_DAY, WEEKDAYS_9AM, …), and the adapter classes are documented in @netscript/cron.

See also