Skip to main content
Alpha

KV, queues & cron

The integration trio. Most backends end up hand-assembling three boring-but-load-bearing seams: a key-value store for cache and session state, a message queue for fire-and-forget work, and a cron scheduler for time-driven jobs. NetScript ships all three as provider-agnostic packages — @netscript/kv, @netscript/queue, and @netscript/cron — each exposing one typed contract across several backends and auto-detecting the best available adapter from the Aspire environment, with a zero-config local fallback so the same code runs on a laptop and in production.

A cron schedule fires a tick that enqueues work onto a provider-agnostic queue; one or more worker listeners drain the queue in parallel while a KV store holds cursors, dedupe keys, and counters that keep the loop idempotent.
The trio in one loop: cron tick → enqueue → parallel queue listeners → KV state holds the cursor, dedupe keys, and counters.

What it is

Each package is built on the same NetScript opinion: a single typed contract, several adapters behind it, and provider selection that resolves automatically from the environment so you never branch on "is this local or Aspire?" in product code. KV unifies Deno KV and Redis-compatible caches (redis by default, or garnet via --cache-backend), plus in-memory; the queue unifies Deno KV, Redis, RabbitMQ (AMQP), and PostgreSQL (plus a KV-polling fallback for remote KV Connect endpoints); cron unifies native Deno.cron() and an in-memory scheduler. None of the three requires you to operate Redis, RabbitMQ, or PostgreSQL during development — the local adapters carry the same code until Aspire provisions the real backends. When telemetry is on, queues are traced for you with no manual instrumentation.

Learn → / Do →

One API, many backends

Each package keeps a single typed contract with several adapters behind it, and provider selection resolves automatically from the environment. The matrix below is the full provider surface — read across a capability to see which backends it supports and how each is chosen. Note one deliberate exception: the PostgreSQL queue backend is explicit-provider only — it is real and durable, but auto-detection never selects it (see the callout below).

Adapter × capability — supported backends and how they are selected
NameTypeDescription
Deno KV kv · queue · cron Default zero-config fallback. KV stores locally; the queue uses native Deno KV queue ops; cron shares the same local-first philosophy via Deno.cron(). No external service required.
memory kv · cron Process-local. MemoryKvAdapter for KV (must be constructed explicitly) and the in-memory cron adapter (provider: 'memory') — deterministic for tests. The KV default for local development without Redis is Deno KV (local file), not the in-memory adapter.
Redis / Garnet kv · queue Production cache and queue backend. Selected when CACHE_PROVIDER=redis|garnet, REDIS_URI/GARNET_URI, or Aspire services__redis__*/services__garnet__* are present. Redis is the default cache Aspire provisions; Garnet is the Redis-compatible alternative via --cache-backend garnet.
RabbitMQ (AMQP) queue Durable broker for high-throughput, multi-consumer queues. Chosen first by auto-detection when Aspire reports a rabbitmq service. Imported via @netscript/queue/adapters/amqp for direct access.
PostgreSQL queue Durable SQL-backed queue (FOR UPDATE SKIP LOCKED row-claim, visibility timeout, ack/nack, DLQ). EXPLICIT-PROVIDER ONLY — set provider: QueueProvider.Postgres; never auto-detected. Configure connection.postgres.{url,tableName}.
KV-polling queue KvPollingAdapter — used automatically when the Deno KV path is a remote HTTP/HTTPS endpoint (KV Connect), where native queue ops are unavailable. Tunable via connection.denoKv.{pollInterval,visibilityTimeout,maxRetries}.
Deno.cron() cron Native runtime scheduler. Used by createScheduler() whenever the runtime exposes Deno.cron(); falls back to the in-memory adapter otherwise.

KV — key types first

getKv(config?) returns a WatchableKv bound to the auto-detected backend. Keys are typed tuples, values are fully typed, and KvSetOptions.expireIn (milliseconds) gives per-key TTL on every adapter. Pass a SharedKvConfig to override provider selection on first access.

SharedKvConfig — first-access overrides for getKv()
NameTypeDescription
provider KvProvider? Force a specific provider. The default 'auto' inspects the environment and chooses the best available backend.
path string? Deno KV path or URL.
redisUrl string? Explicit Redis connection URL.
redisNamespace string? Prefix used for Redis-backed keys.
skipServiceDiscovery boolean? Skip environment-based provider detection.
KvSetOptions — per-key write options
NameTypeDescription
expireIn number? Time-to-live in milliseconds; the key is removed after the window elapses. Honoured by every adapter.

getKv() is the headline entrypoint; the same module also exposes closeKv(), resetKv() (for tests), getRawKv() (the underlying Deno.Kv when the provider is Deno KV), getActiveProvider(), isKvInitialized(), and the isWatchable() type guard.

// main.ts
import { getKv } from '@netscript/kv';

const kv = await getKv(); // picks Redis/Garnet under Aspire, else local Deno KV

await kv.set(['users', 'alice'], { name: 'Alice', role: 'admin' });

const entry = await kv.get<{ name: string; role: string }>(['users', 'alice']);
console.log(entry?.value.name); // "Alice"

// Per-key TTL works on every adapter — expire a session after one hour.
await kv.set(['sessions', 'tok_abc'], { userId: 'u1' }, { expireIn: 3_600_000 });
// watch-jobs.ts
import { getKv } from '@netscript/kv';

const kv = await getKv();

// Stream every change under a prefix, including newly created keys.
for await (const event of kv.watchPrefix(['jobs', 'order-processor'])) {
  console.log(`${event.key.join('/')} -> ${event.type}`, event.value);
}
// counter.ts
import { getKv } from '@netscript/kv';

const kv = await getKv();

const entry = await kv.get<number>(['counters', 'visits']);
const result = await kv.atomic(
  [{ key: ['counters', 'visits'], versionstamp: entry?.versionstamp ?? null }],
  [{ type: 'set', key: ['counters', 'visits'], value: (entry?.value ?? 0) + 1 }],
);

if (!result.ok) {
  // A concurrent write landed since the read — retry the cycle.
}
// kv.test.ts — deterministic, process-local store for tests.
import { MemoryKvAdapter } from '@netscript/kv';

const kv = new MemoryKvAdapter();
await kv.set(['test', 'key'], 'value');

// Or pin Redis explicitly via the subpath import (keeps ioredis out of other graphs).
import { RedisKvAdapter } from '@netscript/kv/redis';

const redisKv = new RedisKvAdapter({ url: 'redis://localhost:6379', namespace: 'myapp' });
await redisKv.set(['cache', 'featured'], items, { expireIn: 60_000 });

Queues — key types first

createQueue<T>(name, options?) returns a MessageQueue<T> over the auto-detected backend. The factory stays synchronous; the heavy Redis, RabbitMQ (AMQP), and PostgreSQL adapters resolve lazily on first use and never enter your module graph until then. Producers call enqueue(message, options?); consumers call listen(handler, options?); stop() drains and releases the connection. The QueueProvider enum names the four backends — DenoKv, Redis, RabbitMQ, Postgres.

QueueOptions — passed to createQueue / createTypedQueue / createParallelQueue
NameTypeDescription
provider QueueProvider? Pin a backend. Omit to auto-discover (RabbitMQ → Redis → Deno KV). Set QueueProvider.Postgres to opt into the explicit-only Postgres adapter.
autoDiscover boolean? = true Enable Aspire-environment discovery. Priority: RabbitMQ > Redis > Deno KV.
retryAttempts number? = 3 Max retries for failed messages — only applies when the backend has no native retry.
retryDelay number? = 1000 Delay between retry attempts in milliseconds — only when the backend has no native retry.
connection QueueConnectionOptions? Provider-specific connection: connection.{denoKv,redis,rabbitmq,postgres}. KV Connect tuning lives under connection.denoKv (pollInterval, visibilityTimeout, maxRetries).
deadLetterStore DeadLetterStorePort? Custom terminal-failure store. When omitted, adapters use their provider-specific durable default.
disableAutoTracing boolean? = false Skip the automatic TracedQueue wrapper when you prefer to wire spans by hand.

createTypedQueue(name, schema, options?) wraps a Zod schema and validates on enqueue/dequeue; its TypedQueueOptions adds validateOnEnqueue (default true), validateOnDequeue (default true), and onValidationError ('discard' default, 'dlq', or 'throw'). createParallelQueue takes the same options plus a concurrency count.

// enqueue-email.ts
import { createQueue } from '@netscript/queue';

// Auto-detects: RabbitMQ (AMQP) under Aspire, else Redis, else Deno KV.
const emails = createQueue<{ to: string; body: string }>('emails');

await emails.enqueue({
  to: 'user@example.com',
  body: 'Welcome to NetScript.',
});

// Delay availability (e.g. a 5-minute reminder) via EnqueueOptions.
await emails.enqueue({ to: 'user@example.com', body: 'Still there?' }, { delay: 5 * 60 * 1000 });
// consume-email.ts
import { createQueue } from '@netscript/queue';

const emails = createQueue<{ to: string; body: string }>('emails');

// The handler receives the message and a MessageContext (messageId, deliveryCount, ack/nack).
await emails.listen(async (message, context) => {
  await sendEmail(message.to, message.body);
  // Some backends ack/retry natively (queue.nativeRetrial === true).
});

// On shutdown, drain in-flight work and release the backend connection.
await emails.stop();
// parallel-jobs.ts
import { createParallelQueue } from '@netscript/queue';

// Process up to 4 I/O-bound messages concurrently on a single listener.
const jobs = createParallelQueue<{ orderId: string }>('order-jobs', { concurrency: 4 });

await jobs.listen(async (message) => {
  await fulfilOrder(message.orderId); // these run in parallel, up to 4 at a time
});

// concurrency <= 1 behaves exactly like createQueue. For CPU-bound work prefer Web Workers.
// typed-notifications.ts
import { z } from 'zod';
import { createTypedQueue } from '@netscript/queue';

const NotificationSchema = z.object({
  type: z.enum(['email', 'sms']),
  to: z.string(),
  body: z.string(),
});

// Invalid payloads route to a dead-letter queue instead of being discarded.
const notifications = createTypedQueue('notifications', NotificationSchema, {
  onValidationError: 'dlq',
});
// pg-queue.ts
import { createQueue, QueueProvider } from '@netscript/queue';

// PostgreSQL is never auto-detected — opt in explicitly.
const jobs = createQueue<{ orderId: string }>('order-jobs', {
  provider: QueueProvider.Postgres,        // or provider: 'postgres'
  connection: {
    postgres: {
      // url is optional — falls back to the Aspire-provisioned Postgres URI.
      url: 'postgres://app:secret@localhost:5432/app',
      tableName: 'message_queue',           // default table name
    },
  },
});

await jobs.enqueue({ orderId: 'ord_123' });
// pin-provider.ts
import { createQueue, QueueProvider } from '@netscript/queue';

// Force Redis regardless of what auto-detection would pick.
const jobs = createQueue('jobs', { provider: QueueProvider.Redis });

// Or tune the Deno KV / KV-polling adapter for a remote KV Connect endpoint.
const remote = createQueue('jobs', {
  connection: {
    denoKv: { path: 'https://kv.example.com', pollInterval: 500, visibilityTimeout: 60_000 },
  },
});

Cron — key types first

createScheduler(options?) returns a CronScheduler that uses native Deno.cron() when the runtime exposes it and an in-memory adapter otherwise. schedule(id, expression, handler, options?) registers a job from a standard cron expression or a CronPresets constant and resolves to a ScheduledJob; trigger(id) fires it manually (handy in tests); unschedule(id) removes one; stop() tears the scheduler down. For simple single-scheduler apps, getScheduler() returns a shared singleton (stopScheduler() resets it). The handler may be a bare JobHandler or a ContextualJobHandler that receives a JobContext (jobId, scheduledTime, actualTime, attempt, signal).

CreateSchedulerOptions — passed to createScheduler / getScheduler
NameTypeDescription
provider CronProvider? Pin a provider. Omit to auto-detect by runtime — Deno.cron() when available, else the in-memory adapter.
tickInterval number? = 60000 Poll interval in milliseconds for the memory adapter; only used when provider is 'memory'.
ScheduleOptions — per-job scheduling options
NameTypeDescription
timezone string? = 'UTC' IANA timezone the cron expression is evaluated against, e.g. 'America/New_York'.
runOnInit boolean? Run the handler once immediately on registration, in addition to the schedule.
enabled boolean? = true Whether the job starts enabled. Register a job disabled and turn it on later.
backoff object? Retry backoff: { type: 'fixed' | 'exponential' | 'linear', initialDelay, maxDelay?, multiplier? }. Delays are milliseconds.
maxRetries number? Maximum retries on handler failure before the run is recorded as failed.
metadata Record? Arbitrary metadata stored with the job and surfaced on its ScheduledJob record.
// daily-report.ts
import { createScheduler, CronPresets } from '@netscript/cron';

const scheduler = createScheduler(); // native Deno.cron() when available

const report = async () => {
  await generateDailyReport();
};

await scheduler.schedule('daily-report', CronPresets.WEEKDAYS_9AM, report, {
  timezone: 'America/New_York',
});

// Fire once on demand — useful for verifying wiring.
await scheduler.trigger('daily-report');
await scheduler.stop();
// hourly-cleanup.ts
import { createScheduler } from '@netscript/cron';

const scheduler = createScheduler({ provider: 'memory', tickInterval: 100 });

// Hourly cleanup; the contextual handler receives scheduling metadata.
const cleanup = async (context) => {
  console.log(context.jobId, context.attempt, context.scheduledTime.toISOString());
};

await scheduler.schedule('cleanup', '0 * * * *', cleanup);
// observe-jobs.ts
import { createScheduler } from '@netscript/cron';

const scheduler = createScheduler({ provider: 'memory' });

// Observe jobRun / jobError / jobScheduled / jobUnscheduled.
scheduler.on('jobRun', (event) => {
  if (!event.result.success) {
    console.error(`Job ${event.jobId} failed`, event.result.error);
  }
});
// validate-cron.ts
import { isValidCronExpression, parseCronExpression } from '@netscript/cron';

if (isValidCronExpression('0 9 * * 1-5')) {
  const parsed = parseCronExpression('0 9 * * 1-5');
  console.log(parsed?.hour, parsed?.dayOfWeek);
}

How they compose

The trio is strongest together. A common pattern: a cron job wakes on a schedule and enqueues a batch of work; queue consumers process each message in parallel; and a KV store holds the cursor, dedupe keys, or rate-limit counters that keep the whole thing idempotent across restarts. None of these requires you to operate Redis, RabbitMQ, or PostgreSQL during development — the local Deno KV / in-memory adapters carry the same code until Aspire provisions the real backends (or until you opt in to the PostgreSQL queue explicitly).

Picking the right primitive
NameTypeDescription
Key-value state @netscript/kv Synchronous-feeling read/write state: caches, sessions, flags, counters, cursors. Use TTL for ephemerality and watches for reactivity.
Fire-and-forget work @netscript/queue Decouple slow work from the request path. Fan out to multiple consumers; let the backend handle retries where nativeRetrial is true. Four backends: Deno KV, Redis, AMQP, PostgreSQL.
Time-driven work @netscript/cron Run handlers on a schedule. Pair with a queue to fan a scheduled tick out into many parallel jobs.
Stateful orchestration durable saga When work spans steps with correlation and compensation, a queue is not enough — model it as a durable saga instead.

Reference →

kv · queue ·

cron