Reference Implementations (advanced)
Data
etl-pipeline
Extract from APIs, transform, and load to a warehouse, with deduplication and atomic transactions.
APIs Used
- `ctx.queue`
- `ctx.cache`
- `ctx.db.transaction()`

Capabilities Required
`data/etl/execute`

What this demonstrates
1. `ctx.queue` for batch job orchestration across multiple data sources
2. `ctx.cache` for deduplication tracking, skipping already-processed records (see the sketch after this list)
3. `ctx.db.transaction()` for atomic multi-table loads
4. Extract → deduplicate → transform → atomic load pipeline
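Points 1 and 2 are the interesting pair in practice. Below is a minimal sketch of per-source fan-out with per-record deduplication, using only the `ctx.queue.enqueue` and `ctx.cache` get/set calls that appear in the listing further down; the `etl:<source>:seen:<record>` key scheme and the `enqueueUnseen` helper are illustrative assumptions, not SDK API.

```typescript
import type { ExecutionContext } from '@human/agent-sdk';

// Hypothetical sketch: fan out one extract job per unseen record,
// per source. Key scheme and helper name are assumptions.
async function enqueueUnseen(
  ctx: ExecutionContext,
  sourceIds: string[],
  recordIds: string[]
): Promise<string[]> {
  const jobIds: string[] = [];
  for (const sourceId of sourceIds) {
    for (const recordId of recordIds) {
      const seenKey = `etl:${sourceId}:seen:${recordId}`; // assumed key scheme
      const seen = await ctx.cache.get(seenKey);
      if (seen) continue; // dedup: already processed in an earlier batch

      const jobId = await ctx.queue.enqueue('extract', {
        source_id: sourceId,
        record_id: recordId,
      });
      jobIds.push(jobId);

      // Mark as seen so re-runs stay idempotent (1h TTL, as in the reference)
      await ctx.cache.set(seenKey, '1', { ttl: 3600 });
    }
  }
  return jobIds;
}
```

Keying the cache per record trades more cache traffic for finer-grained idempotency; the reference implementation below keeps a single per-source `last_batch` key instead.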
Source
```typescript
/**
 * ETL Pipeline Agent - Production Reference Agent
 *
 * Canon alignment: KB 105
 * Demonstrates: ctx.queue, ctx.cache, ctx.db, ctx.db.transaction()
 *
 * Real use case: Extract from APIs, transform, load to warehouse.
 * Uses queue for jobs, cache for dedup, transactions for atomicity.
 */
import { handler, withProvenanceContext } from '@human/agent-sdk';
import type { ExecutionContext } from '@human/agent-sdk';

export const AGENT_ID = 'etl-pipeline';
export const VERSION = '1.0.0';
export const CAPABILITIES = ['data/etl/execute'];

export interface ETLPipelineInput {
  source_id: string;
  batch_size?: number;
}

export interface ETLPipelineOutput {
  success: boolean;
  rows_loaded: number;
  job_ids: string[];
  cache_hits: number;
  provenance_id: string;
}

const execute = async (
  ctx: ExecutionContext,
  input: ETLPipelineInput
): Promise<ETLPipelineOutput> => {
  ctx.log.info('Running ETL pipeline', { source_id: input.source_id });

  const batchSize = input.batch_size ?? 100;
  const cacheKey = `etl:${input.source_id}:last_batch`;

  // Check cache for idempotency (ctx.cache)
  const lastBatch = await ctx.cache.get(cacheKey);
  const cacheHits = lastBatch ? 1 : 0;

  // Enqueue extract job (ctx.queue)
  const extractJobId = await ctx.queue.enqueue('extract', {
    source_id: input.source_id,
    batch_size: batchSize,
  });

  const transformJobId = await ctx.queue.enqueue('transform', {
    source_id: input.source_id,
  });

  // Load in transaction (ctx.db().transaction)
  const db = await ctx.db();
  let rowsLoaded = 0;
  await db.transaction(async (tx) => {
    const mockRows = Array.from({ length: batchSize }, (_, i) => ({
      source_id: input.source_id,
      row_num: i + 1,
      data: JSON.stringify({ id: i, value: `item-${i}` }),
    }));

    for (const row of mockRows) {
      await tx.insert('warehouse_data', row);
      rowsLoaded++;
    }
  });

  // Update cache
  await ctx.cache.set(
    cacheKey,
    JSON.stringify({ batch_size: batchSize, at: Date.now() }),
    { ttl: 3600 }
  );

  const provenanceId = await ctx.provenance.log(
    withProvenanceContext(ctx, {
      action: 'etl:pipeline_complete',
      status: 'success',
      input: { source_id: input.source_id },
      output: { rows_loaded: rowsLoaded, job_ids: [extractJobId, transformJobId] },
    })
  );

  return {
    success: true,
    rows_loaded: rowsLoaded,
    job_ids: [extractJobId, transformJobId],
    cache_hits: cacheHits,
    provenance_id: provenanceId,
  };
};

export default handler({
  name: AGENT_ID,
  id: AGENT_ID,
  version: VERSION,
  capabilities: CAPABILITIES,
  manifest: {
    operations: [
      {
        name: 'run',
        description: 'Run ETL pipeline: extract, transform, load from source to warehouse',
        paramsSchema: {
          source_id: { type: 'string', required: true, description: 'Source identifier' },
          batch_size: { type: 'number', description: 'Batch size for extraction' },
        },
        resultKind: 'agent.etl-pipeline.result',
      },
    ],
  },
  execute,
});
```
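A note on point 3: the listing inserts into a single `warehouse_data` table. Below is a rough sketch of a genuinely multi-table load inside one transaction, assuming `tx.insert` accepts any table name; the `warehouse_batches` table, its columns, and the `loadBatchAtomically` helper are invented for illustration.

```typescript
import type { ExecutionContext } from '@human/agent-sdk';

// Hypothetical multi-table variant of the load step: the batch header
// and all detail rows commit together, or nothing does.
async function loadBatchAtomically(
  ctx: ExecutionContext,
  sourceId: string,
  rows: Array<{ source_id: string; row_num: number; data: string }>
): Promise<number> {
  const db = await ctx.db();
  let rowsLoaded = 0;
  await db.transaction(async (tx) => {
    // Assumed table: one batch header row per load.
    await tx.insert('warehouse_batches', {
      source_id: sourceId,
      row_count: rows.length,
      loaded_at: Date.now(),
    });

    // Detail rows, as in the reference implementation.
    for (const row of rows) {
      await tx.insert('warehouse_data', row);
      rowsLoaded++;
    }
    // If any insert throws, the transaction rolls back both tables.
  });
  return rowsLoaded;
}
```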
Run the tests

From the monorepo root:
```bash
$ pnpm test:agents:reference
$ pnpm test:agents:reference:verbose
```
The reference suite runs all 23 agents with `createMockExecutionContext()`, verifying every `ctx.*` API call and output shape.
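For orientation, one of those tests might look roughly like the sketch below. Only `createMockExecutionContext()` and the output shape come from this page; the import paths, the Vitest runner, and the `.execute` access on the handler are assumptions.

```typescript
import { describe, it, expect } from 'vitest';
// Assumed import path for the mock helper named above.
import { createMockExecutionContext } from '@human/agent-sdk/testing';
import etlPipeline from './etl-pipeline';

describe('etl-pipeline', () => {
  it('loads a batch and reports job ids', async () => {
    const ctx = createMockExecutionContext();
    // Assumes the handler object exposes its execute function directly.
    const result = await etlPipeline.execute(ctx, {
      source_id: 'demo-source',
      batch_size: 5,
    });

    expect(result.success).toBe(true);
    expect(result.rows_loaded).toBe(5);   // one row per item in the batch
    expect(result.job_ids).toHaveLength(2); // extract + transform jobs
  });
});
```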