Reference Implementations — intermediate
Data
data-quality-checker
Validate data integrity, flag anomalies, escalate critical quality issues.
APIs Used
ctx.db · ctx.provenance · ctx.escalate() · ctx.telemetry.emit
Capabilities Required
data/quality/check
What this demonstrates
1. ctx.db.query() for data access and anomaly detection
2. ctx.provenance.log() to create an immutable audit record of quality checks
3. ctx.escalate() to escalate critical quality failures for human review
4. Data quality gate: check → log findings → escalate critical → pass/fail
5. ctx.telemetry.emit via emitReferenceAuthorSignal when issues are detected (tenant-safe author bus)
Source
View on GitHub — typescript
/** * Data Quality Checker - Production Reference Agent * * Canon alignment: KB 105, P5 (Human-in-Loop) * Demonstrates: ctx.db, ctx.provenance, ctx.escalate(), ctx.files.list() * * Real use case: Validate data integrity, flag anomalies, * escalate to human when critical issues found. */
import { handler, withProvenanceContext } from '@human/agent-sdk';import type { ExecutionContext } from '@human/agent-sdk';import { emitReferenceAuthorSignal } from '../../lib/reference-author-telemetry.js';
// Capabilities the runtime must grant before this agent may execute.
export const CAPABILITIES = ['data/quality/check'];

// Semantic version of this reference implementation.
export const VERSION = '1.0.0';

// Stable, unique identifier used for registration, telemetry, and result kinds.
export const AGENT_ID = 'data-quality-checker';
/**
 * Input parameters for a data-quality check run.
 */
export interface DataQualityInput {
  /** Identifier of the dataset to validate; also used as the $1 parameter in the DB count query. */
  dataset_id: string;
  /** Optional directory to scan for files; defaults to `data/<dataset_id>` when omitted. */
  directory?: string;
}
/**
 * Result of a data-quality check run.
 */
export interface DataQualityOutput {
  /** False only when high-severity issues were found AND the human escalation did not approve. */
  success: boolean;
  /** Number of checks that passed. */
  checks_passed: number;
  /** Number of checks that failed. */
  checks_failed: number;
  /** All issues detected during the run, with a coarse severity per issue. */
  issues: Array<{ type: string; message: string; severity: 'low' | 'medium' | 'high' }>;
  /** True when a human escalation occurred and was NOT approved (further action needed). */
  escalation_required: boolean;
  /** Identifier of the immutable provenance record written for this run. */
  provenance_id: string;
}
/**
 * Run the data-quality gate for one dataset.
 *
 * Flow: list files → count DB records → emit telemetry if issues →
 * escalate to a human when any high-severity issue exists → write a
 * provenance record → return pass/fail.
 *
 * @param ctx   Execution context supplying db/files/escalate/provenance/log APIs.
 * @param input Dataset identifier and optional file directory.
 * @returns Aggregated check results, issues, escalation state, and provenance id.
 */
const execute = async (
  ctx: ExecutionContext,
  input: DataQualityInput
): Promise<DataQualityOutput> => {
  ctx.log.info('Running data quality checks', { dataset_id: input.dataset_id });

  // Default file location is derived from the dataset id.
  const directory = input.directory ?? `data/${input.dataset_id}`;
  const issues: DataQualityOutput['issues'] = [];
  let checksPassed = 0;
  let checksFailed = 0;

  // Check 1 — file presence (ctx.files.list). A listing failure is treated
  // the same as an empty directory (deliberate best-effort: the catch
  // collapses errors into []).
  const files = await ctx.files.list(directory).catch(() => []);

  if (files.length === 0) {
    issues.push({
      type: 'empty_dataset',
      message: 'No files found in directory',
      severity: 'high',
    });
    checksFailed++;
  } else {
    checksPassed++;
  }

  // Check 2 — record count via parameterized query (never interpolate user
  // input into SQL). A query failure falls back to a zero count, which then
  // fails the check below.
  // NOTE(review): the `as Array<{ cnt: number }>` cast is unchecked — assumes
  // db.query resolves to an array of row objects; confirm against the SDK.
  const db = await ctx.db();
  const countRows = (await db.query(
    `SELECT COUNT(*) as cnt FROM datasets WHERE dataset_id = $1`,
    [input.dataset_id]
  ).catch(() => [{ cnt: 0 }])) as Array<{ cnt: number }>;
  const recordCount = Number(countRows[0]?.cnt ?? 0);

  if (recordCount === 0) {
    issues.push({
      type: 'zero_records',
      message: 'Dataset has zero records',
      severity: 'high',
    });
    checksFailed++;
  } else {
    checksPassed++;
  }

  // Any issue (regardless of severity) is reported on the tenant-safe author
  // telemetry bus BEFORE any escalation happens.
  const hasHighSeverity = issues.some((i) => i.severity === 'high');
  if (issues.length > 0) {
    await emitReferenceAuthorSignal(ctx, 'data_quality_issues', {
      dataset_id: input.dataset_id,
      issue_types: issues.map((i) => i.type),
      has_high_severity: hasHighSeverity,
    });
  }
  let escalationRequired = false;
  let provenanceId: string;

  // High-severity issues trigger a blocking human-in-the-loop escalation;
  // the human's decision is captured in the provenance record.
  if (hasHighSeverity) {
    const decision = await ctx.escalate({
      reason: 'Critical data quality issues detected',
      context: {
        dataset_id: input.dataset_id,
        issues,
        checks_passed: checksPassed,
        checks_failed: checksFailed,
      },
      requiredCapability: 'data/quality/approve',
    });

    // "Required" here means the human did NOT approve — further action is
    // still needed downstream.
    escalationRequired = !decision.approved;
    provenanceId = await ctx.provenance.log(
      withProvenanceContext(ctx, {
        action: 'data_quality:escalation',
        status: 'success',
        input: { dataset_id: input.dataset_id },
        output: {
          escalation_required: escalationRequired,
          human_decision: decision.reason,
        },
      })
    );
  } else {
    // No critical issues: record the check outcome only.
    provenanceId = await ctx.provenance.log(
      withProvenanceContext(ctx, {
        action: 'data_quality:checked',
        status: 'success',
        input: { dataset_id: input.dataset_id },
        output: { checks_passed: checksPassed, checks_failed: checksFailed },
      })
    );
  }

  // Success unless there were high-severity issues AND the escalation was
  // not approved. Low/medium issues alone never fail the gate.
  return {
    success: !hasHighSeverity || !escalationRequired,
    checks_passed: checksPassed,
    checks_failed: checksFailed,
    issues,
    escalation_required: escalationRequired,
    provenance_id: provenanceId,
  };
};
// Register the agent with the SDK. The manifest advertises a single
// 'check' operation; `execute` above is the implementation.
// NOTE(review): both `name` and `id` are set to AGENT_ID — presumably the
// SDK uses them for display vs. routing; confirm against the handler docs.
export default handler({
  name: AGENT_ID,
  id: AGENT_ID,
  version: VERSION,
  capabilities: CAPABILITIES,
  manifest: {
    operations: [
      {
        name: 'check',
        description: 'Validate data integrity, flag anomalies, escalate to human when critical',
        paramsSchema: {
          dataset_id: { type: 'string', required: true, description: 'Dataset identifier' },
          directory: { type: 'string', description: 'Optional directory path for file checks' },
        },
        resultKind: 'agent.data-quality-checker.result',
      },
    ],
  },
  execute,
});
Run the tests
From monorepo root
$ pnpm test:agents:reference
$ pnpm test:agents:reference:verbose
The reference suite runs all 23 agents with createMockExecutionContext(), verifying every ctx.* API call and output shape.
See Also
SDK Reference