A CSV file-watch import job
In Chapter 1 you stood up my-erp/ with the workers and
triggers plugins running. Now you wire the first real piece of the pipeline: a file-watch trigger
that fires the moment a supplier drops a CSV into a watched folder, and a durable background job
that parses it. This is the ingest core of an ERP sync — an inbound file becomes durable background
work.
What you will build
By the end of this chapter, dropping a file named products_*.csv into
my-erp/.data/incoming/products will automatically enqueue and run a worker job that reads the file,
parses its rows, logs a summary, and returns a structured success result — all without any HTTP call.
You author two files: a trigger (defineFileWatch) and a job (defineJobHandler), then
register the job so the trigger can address it by id.
Before you begin
You need the workspace from Chapter 1 with both the workers
and triggers plugins installed, and aspire start healthy. Confirm the prior state from the project
root:
netscript plugin list
Expected: workers and triggers both appear. Also confirm Aspire is up — the file-watch processor
and the workers runtime both depend on it:
curl http://localhost:8091/health # workers API, healthy JSON
curl http://localhost:8093/health # triggers API, healthy JSON
If either is missing, return to Chapter 1 — do not start over.
Step 1 — Create the watched folder
The trigger watches a directory; create it (and the staging path suppliers will write to) so the watcher has something to attach to:
mkdir -p .data/incoming/products
This is just a local folder in your workspace. In production it would be a mounted share or sync target the supplier writes to; the trigger does not care where the bytes come from, only that a matching file appears.
Step 2 — Author the import job
A NetScript job is a function wrapped by defineJobHandler, given a stable id, and exported as the
module default. Inside the handler you receive a ctx carrying the payload, do the work, and return
a result built with createSuccessResult / createFailureResult. Create the job under the workers
plugin:
// plugins/workers/jobs/import-products.ts
import {
createFailureResult,
createSuccessResult,
defineJobHandler,
} from '@netscript/plugin-workers-core';
import { z } from 'zod';
// The file-watch trigger hands the job the path of the file that landed.
const ImportProductsPayloadSchema = z.object({
filePath: z.string().min(1),
fileName: z.string().min(1),
});
const handler = defineJobHandler(async (ctx) => {
const { filePath, fileName } = ImportProductsPayloadSchema.parse(ctx.payload ?? {});
// 1. Read the staged file.
let rawContent: string;
try {
rawContent = await Deno.readTextFile(filePath);
} catch (error: unknown) {
const message = error instanceof Error ? error.message : String(error);
return createFailureResult(`Failed to read ${fileName}: ${message}`);
}
// 2. Parse a simple CSV (header row + data rows). No external lib needed.
const lines = rawContent.trim().split('\n').filter((line) => line.trim().length > 0);
if (lines.length < 2) {
return createFailureResult('CSV is empty or has no data rows');
}
const headers = lines[0].split(',').map((h) => h.trim().toLowerCase());
const rows = lines.slice(1).map((line) => {
const values = line.split(',').map((v) => v.trim());
const row: Record<string, string> = {};
headers.forEach((h, i) => row[h] = values[i] ?? '');
return row;
});
// 3. Return a structured result. The runtime records it on the execution.
return createSuccessResult({ fileName, rowCount: rows.length, headers });
});
export default Object.assign(handler, { id: 'import-products' });
The two things to read off this: the handler is an async/arrow function (never a bare function),
and the stable id is attached with Object.assign so the runtime registry can address the job by a
predictable string rather than its filename.
Step 3 — Author the file-watch trigger
Now the trigger. defineFileWatch(handler, spec) comes from
@netscript/plugin-triggers-core/builders. The handler returns an array of effects; the one you
want is enqueueJob(jobRef, { payload }), which hands a worker job the inbound event. The job is
referenced by a small typed object — its id, name, topic, and entrypoint.
// plugins/triggers/product-import.ts
import { defineFileWatch, enqueueJob } from '@netscript/plugin-triggers-core/builders';
import type { JobDefinition } from '@netscript/plugin-workers-core';
// A reference to the worker job authored in Step 2.
const importProductsJob = {
id: 'import-products' as JobDefinition<'import-products'>['id'],
name: 'Import Products',
topic: 'default',
entrypoint: './workers/jobs/import-products.ts',
} satisfies JobDefinition<'import-products'>;
export default defineFileWatch(
// event.payload carries filePath / fileName for the matched file.
(event) => Promise.resolve([enqueueJob(importProductsJob, { payload: event.payload })]),
{
id: 'product-import-trigger',
paths: ['.data/incoming/products'],
patterns: ['products_*.csv'],
on: ['create'],
stabilityThreshold: { checkIntervalMs: 1000, stableChecks: 2 },
description: 'Watches for product CSV files and starts the import job.',
tags: ['file-watch', 'product', 'import'],
},
);
Read the spec carefully — it is the whole behavior of the watcher:
| Name | Type | Description |
|---|---|---|
id |
string |
Stable identifier for this trigger, used in logs and the events feed. |
paths |
string[] |
Directories to watch. Here, the .data/incoming/products folder you created in Step 1. |
patterns |
string[] |
Glob patterns a file must match to fire the handler. products_*.csv ignores everything else. |
on |
('create' | 'modify' | 'remove')[] |
Which filesystem events fire the trigger. 'create' = a new file landing. |
stabilityThreshold |
{ checkIntervalMs, stableChecks } |
Debounce: wait until the file size is unchanged across stableChecks polls before firing, so a half-written upload is never parsed. |
Step 4 — Register the job
The trigger addresses the job by id, which means the workers runtime needs a generated registry
that maps each id to its handler. Generate the plugin registries:
netscript generate plugins
This scans plugins/workers/jobs (and the triggers plugin) and writes a registry the running
services load. After this, import-products is addressable, and product-import-trigger is loaded
by the file-watch processor.
Verify your progress
With Aspire up, drop a matching CSV into the watched folder and watch the pipeline run end to end. First create a sample file:
cat > .data/incoming/products_2024.csv <<'CSV'
name,sku,price
Widget,WID-1,9.99
Gadget,GAD-2,19.99
CSV
mv .data/incoming/products_2024.csv .data/incoming/products/products_2024.csv
Moving the finished file into the watched folder fires a single create event (and avoids the
watcher seeing a half-written file). Now confirm both sides of the hand-off:
# 1. The trigger recorded the inbound file event (:8093).
curl 'http://localhost:8093/api/v1/events?limit=10'
# 2. The job it enqueued executed (:8091).
curl 'http://localhost:8091/api/v1/workers/executions?limit=10'
Expected: the events feed lists a product-import-trigger event, and the executions feed shows a
completed import-products run whose result is { "fileName": "products_2024.csv", "rowCount": 2, "headers": ["name","sku","price"] }. Open the workers resource logs in the
Aspire dashboard to read the job's structured log lines.
- [ ]
.data/incoming/products/exists and you dropped aproducts_*.csvinto it. - [ ] The triggers events feed shows a
product-import-triggerevent. - [ ] The workers executions feed shows a completed
import-productsrun withrowCount: 2. - [ ]
deno task checkis clean.
What you built
A file-watch trigger (defineFileWatch) that fires on every products_*.csv landing in a watched
folder, and a durable background job (defineJobHandler) that parses it — wired together by
enqueueJob and made addressable with netscript generate plugins. An inbound file now becomes
durable background work with no HTTP in the loop. Next, you will see how a non-TypeScript step could
join this pipeline.