KV, queues & cron
The integration trio. Most backends end up hand-assembling three boring-but-load-bearing
seams: a key-value store for cache and session state, a message queue for
fire-and-forget work, and a cron scheduler for time-driven jobs. NetScript ships all
three as provider-agnostic packages — @netscript/kv, @netscript/queue, and
@netscript/cron — each exposing one typed contract across several backends and
auto-detecting the best available adapter from the Aspire environment, with a zero-config
local fallback so the same code runs on a laptop and in production.
What it is
Each package is built on the same NetScript opinion: a single typed contract, several adapters
behind it, and provider selection that resolves automatically from the environment so you never
branch on "is this local or Aspire?" in product code. KV unifies Deno KV and Redis-compatible caches (redis by default, or garnet via --cache-backend), plus
in-memory; the queue unifies Deno KV, Redis, RabbitMQ (AMQP), and PostgreSQL (plus a
KV-polling fallback for remote KV Connect endpoints); cron unifies native Deno.cron() and an
in-memory scheduler. None of the three requires you to operate Redis, RabbitMQ, or PostgreSQL
during development — the local adapters carry the same code until Aspire
provisions the real backends. When telemetry is on, queues are traced
for you with no manual instrumentation.
Learn → / Do →
One API, many backends
Each package keeps a single typed contract with several adapters behind it, and provider selection resolves automatically from the environment. The matrix below is the full provider surface — read across a capability to see which backends it supports and how each is chosen. Note one deliberate exception: the PostgreSQL queue backend is explicit-provider only — it is real and durable, but auto-detection never selects it (see the callout below).
| Name | Type | Description |
|---|---|---|
Deno KV |
kv · queue · cron |
Default zero-config fallback. KV stores locally; the queue uses native Deno KV queue ops; cron shares the same local-first philosophy via Deno.cron(). No external service required. |
memory |
kv · cron |
Process-local. MemoryKvAdapter for KV (must be constructed explicitly) and the in-memory cron adapter (provider: 'memory') — deterministic for tests. The KV default for local development without Redis is Deno KV (local file), not the in-memory adapter. |
Redis / Garnet |
kv · queue |
Production cache and queue backend. Selected when CACHE_PROVIDER=redis|garnet, REDIS_URI/GARNET_URI, or Aspire services__redis__*/services__garnet__* are present. Redis is the default cache Aspire provisions; Garnet is the Redis-compatible alternative via --cache-backend garnet. |
RabbitMQ (AMQP) |
queue |
Durable broker for high-throughput, multi-consumer queues. Chosen first by auto-detection when Aspire reports a rabbitmq service. Imported via @netscript/queue/adapters/amqp for direct access. |
PostgreSQL |
queue |
Durable SQL-backed queue (FOR UPDATE SKIP LOCKED row-claim, visibility timeout, ack/nack, DLQ). EXPLICIT-PROVIDER ONLY — set provider: QueueProvider.Postgres; never auto-detected. Configure connection.postgres.{url,tableName}. |
KV-polling |
queue |
KvPollingAdapter — used automatically when the Deno KV path is a remote HTTP/HTTPS endpoint (KV Connect), where native queue ops are unavailable. Tunable via connection.denoKv.{pollInterval,visibilityTimeout,maxRetries}. |
Deno.cron() |
cron |
Native runtime scheduler. Used by createScheduler() whenever the runtime exposes Deno.cron(); falls back to the in-memory adapter otherwise. |
KV — key types first
getKv(config?) returns a WatchableKv bound to the auto-detected backend. Keys are typed
tuples, values are fully typed, and KvSetOptions.expireIn (milliseconds) gives per-key TTL on
every adapter. Pass a SharedKvConfig to override provider selection on first access.
| Name | Type | Description |
|---|---|---|
provider |
KvProvider? |
Force a specific provider. The default 'auto' inspects the environment and chooses the best available backend. |
path |
string? |
Deno KV path or URL. |
redisUrl |
string? |
Explicit Redis connection URL. |
redisNamespace |
string? |
Prefix used for Redis-backed keys. |
skipServiceDiscovery |
boolean? |
Skip environment-based provider detection. |
| Name | Type | Description |
|---|---|---|
expireIn |
number? |
Time-to-live in milliseconds; the key is removed after the window elapses. Honoured by every adapter. |
getKv() is the headline entrypoint; the same module also exposes closeKv(), resetKv()
(for tests), getRawKv() (the underlying Deno.Kv when the provider is Deno KV),
getActiveProvider(), isKvInitialized(), and the isWatchable() type guard.
// main.ts
import { getKv } from '@netscript/kv';
const kv = await getKv(); // picks Redis/Garnet under Aspire, else local Deno KV
await kv.set(['users', 'alice'], { name: 'Alice', role: 'admin' });
const entry = await kv.get<{ name: string; role: string }>(['users', 'alice']);
console.log(entry?.value.name); // "Alice"
// Per-key TTL works on every adapter — expire a session after one hour.
await kv.set(['sessions', 'tok_abc'], { userId: 'u1' }, { expireIn: 3_600_000 });
// watch-jobs.ts
import { getKv } from '@netscript/kv';
const kv = await getKv();
// Stream every change under a prefix, including newly created keys.
for await (const event of kv.watchPrefix(['jobs', 'order-processor'])) {
console.log(`${event.key.join('/')} -> ${event.type}`, event.value);
}
// counter.ts
import { getKv } from '@netscript/kv';
const kv = await getKv();
const entry = await kv.get<number>(['counters', 'visits']);
const result = await kv.atomic(
[{ key: ['counters', 'visits'], versionstamp: entry?.versionstamp ?? null }],
[{ type: 'set', key: ['counters', 'visits'], value: (entry?.value ?? 0) + 1 }],
);
if (!result.ok) {
// A concurrent write landed since the read — retry the cycle.
}
// kv.test.ts — deterministic, process-local store for tests.
import { MemoryKvAdapter } from '@netscript/kv';
const kv = new MemoryKvAdapter();
await kv.set(['test', 'key'], 'value');
// Or pin Redis explicitly via the subpath import (keeps ioredis out of other graphs).
import { RedisKvAdapter } from '@netscript/kv/redis';
const redisKv = new RedisKvAdapter({ url: 'redis://localhost:6379', namespace: 'myapp' });
await redisKv.set(['cache', 'featured'], items, { expireIn: 60_000 });
Queues — key types first
createQueue<T>(name, options?) returns a MessageQueue<T> over the auto-detected backend. The
factory stays synchronous; the heavy Redis, RabbitMQ (AMQP), and PostgreSQL adapters resolve
lazily on first use and never enter your module graph until then. Producers call
enqueue(message, options?); consumers call listen(handler, options?); stop() drains and
releases the connection. The QueueProvider enum names the four backends —
DenoKv, Redis, RabbitMQ, Postgres.
| Name | Type | Description |
|---|---|---|
provider |
QueueProvider? |
Pin a backend. Omit to auto-discover (RabbitMQ → Redis → Deno KV). Set QueueProvider.Postgres to opt into the explicit-only Postgres adapter. |
autoDiscover |
boolean? = true |
Enable Aspire-environment discovery. Priority: RabbitMQ > Redis > Deno KV. |
retryAttempts |
number? = 3 |
Max retries for failed messages — only applies when the backend has no native retry. |
retryDelay |
number? = 1000 |
Delay between retry attempts in milliseconds — only when the backend has no native retry. |
connection |
QueueConnectionOptions? |
Provider-specific connection: connection.{denoKv,redis,rabbitmq,postgres}. KV Connect tuning lives under connection.denoKv (pollInterval, visibilityTimeout, maxRetries). |
deadLetterStore |
DeadLetterStorePort? |
Custom terminal-failure store. When omitted, adapters use their provider-specific durable default. |
disableAutoTracing |
boolean? = false |
Skip the automatic TracedQueue wrapper when you prefer to wire spans by hand. |
createTypedQueue(name, schema, options?) wraps a Zod schema and validates on enqueue/dequeue;
its TypedQueueOptions adds validateOnEnqueue (default true), validateOnDequeue (default
true), and onValidationError ('discard' default, 'dlq', or 'throw'). createParallelQueue
takes the same options plus a concurrency count.
// enqueue-email.ts
import { createQueue } from '@netscript/queue';
// Auto-detects: RabbitMQ (AMQP) under Aspire, else Redis, else Deno KV.
const emails = createQueue<{ to: string; body: string }>('emails');
await emails.enqueue({
to: 'user@example.com',
body: 'Welcome to NetScript.',
});
// Delay availability (e.g. a 5-minute reminder) via EnqueueOptions.
await emails.enqueue({ to: 'user@example.com', body: 'Still there?' }, { delay: 5 * 60 * 1000 });
// consume-email.ts
import { createQueue } from '@netscript/queue';
const emails = createQueue<{ to: string; body: string }>('emails');
// The handler receives the message and a MessageContext (messageId, deliveryCount, ack/nack).
await emails.listen(async (message, context) => {
await sendEmail(message.to, message.body);
// Some backends ack/retry natively (queue.nativeRetrial === true).
});
// On shutdown, drain in-flight work and release the backend connection.
await emails.stop();
// parallel-jobs.ts
import { createParallelQueue } from '@netscript/queue';
// Process up to 4 I/O-bound messages concurrently on a single listener.
const jobs = createParallelQueue<{ orderId: string }>('order-jobs', { concurrency: 4 });
await jobs.listen(async (message) => {
await fulfilOrder(message.orderId); // these run in parallel, up to 4 at a time
});
// concurrency <= 1 behaves exactly like createQueue. For CPU-bound work prefer Web Workers.
// typed-notifications.ts
import { z } from 'zod';
import { createTypedQueue } from '@netscript/queue';
const NotificationSchema = z.object({
type: z.enum(['email', 'sms']),
to: z.string(),
body: z.string(),
});
// Invalid payloads route to a dead-letter queue instead of being discarded.
const notifications = createTypedQueue('notifications', NotificationSchema, {
onValidationError: 'dlq',
});
// pg-queue.ts
import { createQueue, QueueProvider } from '@netscript/queue';
// PostgreSQL is never auto-detected — opt in explicitly.
const jobs = createQueue<{ orderId: string }>('order-jobs', {
provider: QueueProvider.Postgres, // or provider: 'postgres'
connection: {
postgres: {
// url is optional — falls back to the Aspire-provisioned Postgres URI.
url: 'postgres://app:secret@localhost:5432/app',
tableName: 'message_queue', // default table name
},
},
});
await jobs.enqueue({ orderId: 'ord_123' });
// pin-provider.ts
import { createQueue, QueueProvider } from '@netscript/queue';
// Force Redis regardless of what auto-detection would pick.
const jobs = createQueue('jobs', { provider: QueueProvider.Redis });
// Or tune the Deno KV / KV-polling adapter for a remote KV Connect endpoint.
const remote = createQueue('jobs', {
connection: {
denoKv: { path: 'https://kv.example.com', pollInterval: 500, visibilityTimeout: 60_000 },
},
});
Cron — key types first
createScheduler(options?) returns a CronScheduler that uses native Deno.cron() when the
runtime exposes it and an in-memory adapter otherwise.
schedule(id, expression, handler, options?) registers a job from a standard cron expression or
a CronPresets constant and resolves to a ScheduledJob; trigger(id) fires it manually (handy
in tests); unschedule(id) removes one; stop() tears the scheduler down. For simple
single-scheduler apps, getScheduler() returns a shared singleton (stopScheduler() resets it).
The handler may be a bare JobHandler or a ContextualJobHandler that receives a JobContext
(jobId, scheduledTime, actualTime, attempt, signal).
| Name | Type | Description |
|---|---|---|
provider |
CronProvider? |
Pin a provider. Omit to auto-detect by runtime — Deno.cron() when available, else the in-memory adapter. |
tickInterval |
number? = 60000 |
Poll interval in milliseconds for the memory adapter; only used when provider is 'memory'. |
| Name | Type | Description |
|---|---|---|
timezone |
string? = 'UTC' |
IANA timezone the cron expression is evaluated against, e.g. 'America/New_York'. |
runOnInit |
boolean? |
Run the handler once immediately on registration, in addition to the schedule. |
enabled |
boolean? = true |
Whether the job starts enabled. Register a job disabled and turn it on later. |
backoff |
object? |
Retry backoff: { type: 'fixed' | 'exponential' | 'linear', initialDelay, maxDelay?, multiplier? }. Delays are milliseconds. |
maxRetries |
number? |
Maximum retries on handler failure before the run is recorded as failed. |
metadata |
Record |
Arbitrary metadata stored with the job and surfaced on its ScheduledJob record. |
// daily-report.ts
import { createScheduler, CronPresets } from '@netscript/cron';
const scheduler = createScheduler(); // native Deno.cron() when available
const report = async () => {
await generateDailyReport();
};
await scheduler.schedule('daily-report', CronPresets.WEEKDAYS_9AM, report, {
timezone: 'America/New_York',
});
// Fire once on demand — useful for verifying wiring.
await scheduler.trigger('daily-report');
await scheduler.stop();
// hourly-cleanup.ts
import { createScheduler } from '@netscript/cron';
const scheduler = createScheduler({ provider: 'memory', tickInterval: 100 });
// Hourly cleanup; the contextual handler receives scheduling metadata.
const cleanup = async (context) => {
console.log(context.jobId, context.attempt, context.scheduledTime.toISOString());
};
await scheduler.schedule('cleanup', '0 * * * *', cleanup);
// observe-jobs.ts
import { createScheduler } from '@netscript/cron';
const scheduler = createScheduler({ provider: 'memory' });
// Observe jobRun / jobError / jobScheduled / jobUnscheduled.
scheduler.on('jobRun', (event) => {
if (!event.result.success) {
console.error(`Job ${event.jobId} failed`, event.result.error);
}
});
// validate-cron.ts
import { isValidCronExpression, parseCronExpression } from '@netscript/cron';
if (isValidCronExpression('0 9 * * 1-5')) {
const parsed = parseCronExpression('0 9 * * 1-5');
console.log(parsed?.hour, parsed?.dayOfWeek);
}
How they compose
The trio is strongest together. A common pattern: a cron job wakes on a schedule and enqueues a batch of work; queue consumers process each message in parallel; and a KV store holds the cursor, dedupe keys, or rate-limit counters that keep the whole thing idempotent across restarts. None of these requires you to operate Redis, RabbitMQ, or PostgreSQL during development — the local Deno KV / in-memory adapters carry the same code until Aspire provisions the real backends (or until you opt in to the PostgreSQL queue explicitly).
| Name | Type | Description |
|---|---|---|
Key-value state |
@netscript/kv |
Synchronous-feeling read/write state: caches, sessions, flags, counters, cursors. Use TTL for ephemerality and watches for reactivity. |
Fire-and-forget work |
@netscript/queue |
Decouple slow work from the request path. Fan out to multiple consumers; let the backend handle retries where nativeRetrial is true. Four backends: Deno KV, Redis, AMQP, PostgreSQL. |
Time-driven work |
@netscript/cron |
Run handlers on a schedule. Pair with a queue to fan a scheduled tick out into many parallel jobs. |
Stateful orchestration |
durable saga |
When work spans steps with correlation and compensation, a queue is not enough — model it as a durable saga instead. |