Reference Implementations

Data

etl-pipeline

advanced

Extract from APIs, transform, load to warehouse with dedup and atomic transactions.

APIs Used

ctx.queue, ctx.cache, ctx.db.transaction()

Capabilities Required

data/etl/execute

What this demonstrates

  • ctx.queue for batch job orchestration across multiple data sources
  • ctx.cache for deduplication tracking (skip already-processed records)
  • ctx.db.transaction() for atomic multi-table loads
  • Extract → deduplicate → transform → atomic load pipeline
```typescript
/**
 * ETL Pipeline Agent - Production Reference Agent
 *
 * Canon alignment: KB 105
 * Demonstrates: ctx.queue, ctx.cache, ctx.db, ctx.db.transaction()
 *
 * Real use case: Extract from APIs, transform, load to warehouse.
 * Uses queue for jobs, cache for dedup, transactions for atomicity.
 */
import { handler, withProvenanceContext } from '@human/agent-sdk';
import type { ExecutionContext } from '@human/agent-sdk';

export const AGENT_ID = 'etl-pipeline';
export const VERSION = '1.0.0';
export const CAPABILITIES = ['data/etl/execute'];

export interface ETLPipelineInput {
  source_id: string;
  batch_size?: number;
}

export interface ETLPipelineOutput {
  success: boolean;
  rows_loaded: number;
  job_ids: string[];
  cache_hits: number;
  provenance_id: string;
}

const execute = async (
  ctx: ExecutionContext,
  input: ETLPipelineInput
): Promise<ETLPipelineOutput> => {
  ctx.log.info('Running ETL pipeline', { source_id: input.source_id });
  const batchSize = input.batch_size ?? 100;
  const cacheKey = `etl:${input.source_id}:last_batch`;

  // Check cache for a previously processed batch (ctx.cache)
  const lastBatch = await ctx.cache.get(cacheKey);
  const cacheHits = lastBatch ? 1 : 0;

  // Enqueue extract and transform jobs (ctx.queue)
  const extractJobId = await ctx.queue.enqueue('extract', {
    source_id: input.source_id,
    batch_size: batchSize,
  });
  const transformJobId = await ctx.queue.enqueue('transform', {
    source_id: input.source_id,
  });

  // Load inside a transaction so the batch commits atomically (ctx.db().transaction)
  const db = await ctx.db();
  let rowsLoaded = 0;
  await db.transaction(async (tx) => {
    const mockRows = Array.from({ length: batchSize }, (_, i) => ({
      source_id: input.source_id,
      row_num: i + 1,
      data: JSON.stringify({ id: i, value: `item-${i}` }),
    }));
    for (const row of mockRows) {
      await tx.insert('warehouse_data', row);
      rowsLoaded++;
    }
  });

  // Record the completed batch so later runs can detect it (1h TTL)
  await ctx.cache.set(cacheKey, JSON.stringify({ batch_size: batchSize, at: Date.now() }), {
    ttl: 3600,
  });

  const provenanceId = await ctx.provenance.log(
    withProvenanceContext(ctx, {
      action: 'etl:pipeline_complete',
      status: 'success',
      input: { source_id: input.source_id },
      output: { rows_loaded: rowsLoaded, job_ids: [extractJobId, transformJobId] },
    })
  );

  return {
    success: true,
    rows_loaded: rowsLoaded,
    job_ids: [extractJobId, transformJobId],
    cache_hits: cacheHits,
    provenance_id: provenanceId,
  };
};

export default handler({
  name: AGENT_ID,
  id: AGENT_ID,
  version: VERSION,
  capabilities: CAPABILITIES,
  manifest: {
    operations: [
      {
        name: 'run',
        description: 'Run ETL pipeline: extract, transform, load from source to warehouse',
        paramsSchema: {
          source_id: { type: 'string', required: true, description: 'Source identifier' },
          batch_size: { type: 'number', description: 'Batch size for extraction' },
        },
        resultKind: 'agent.etl-pipeline.result',
      },
    ],
  },
  execute,
});
```
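Note that this reference agent counts a cache hit but still reloads the batch. A real pipeline would usually short-circuit on a hit. The fragment below is a minimal sketch of that guard, intended as a drop-in replacement for the cache check inside execute above; the `etl:pipeline_skipped` action name is an assumption, and the early return mirrors ETLPipelineOutput.

```typescript
// Sketch only: short-circuit when this batch appears already processed.
// Assumes ctx.cache.get() returns the JSON string written by ctx.cache.set()
// above; 'etl:pipeline_skipped' is a hypothetical action name.
const lastBatch = await ctx.cache.get(cacheKey);
if (lastBatch) {
  const { batch_size } = JSON.parse(lastBatch);
  if (batch_size === batchSize) {
    ctx.log.info('Batch already processed, skipping', { source_id: input.source_id });
    return {
      success: true,
      rows_loaded: 0,
      job_ids: [],
      cache_hits: 1,
      provenance_id: await ctx.provenance.log(
        withProvenanceContext(ctx, {
          action: 'etl:pipeline_skipped',
          status: 'success',
          input: { source_id: input.source_id },
          output: { rows_loaded: 0, job_ids: [] },
        })
      ),
    };
  }
}
```

Whether to skip on any hit or only on a matching batch size is a policy choice; the sketch requires a match so a changed batch_size still triggers a fresh load.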

Run the tests

From the monorepo root

$ pnpm test:agents:reference

$ pnpm test:agents:reference:verbose

The reference suite runs all 23 agents with createMockExecutionContext(), verifying every ctx.* API call and output shape.
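To exercise this one agent outside the suite, a standalone test might look like the sketch below. The import path for createMockExecutionContext(), the Vitest-style runner, and the assumption that the handler object exposes the wrapped execute function are all unverified; adjust to your harness.

```typescript
// Sketch of a standalone test; import paths and harness API are assumptions.
import { test, expect } from 'vitest';
import { createMockExecutionContext } from '@human/agent-sdk/testing';
import agent from './etl-pipeline';

test('etl-pipeline loads a batch atomically', async () => {
  const ctx = createMockExecutionContext();
  // Assumes handler() exposes the wrapped execute function directly.
  const result = await agent.execute(ctx, { source_id: 'demo-source', batch_size: 10 });

  expect(result.success).toBe(true);
  expect(result.rows_loaded).toBe(10);
  expect(result.job_ids).toHaveLength(2); // extract + transform
  expect(result.provenance_id).toBeTruthy();
});
```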