Reference Implementations

Data

data-quality-checker

intermediate

Validate data integrity, flag anomalies, escalate critical quality issues.

APIs Used

ctx.db, ctx.files.list, ctx.provenance, ctx.escalate(), ctx.telemetry.emit

Capabilities Required

data/quality/check

What this demonstrates

  • ctx.db.query() for data access and anomaly detection
  • ctx.provenance.log() to create an immutable audit record of quality checks
  • ctx.escalate() to escalate critical quality failures for human review
  • Data quality gate: check → log findings → escalate critical → pass/fail
  • ctx.telemetry.emit via emitReferenceAuthorSignal when issues are detected (tenant-safe author bus)
```typescript
/**
 * Data Quality Checker - Production Reference Agent
 *
 * Canon alignment: KB 105, P5 (Human-in-Loop)
 * Demonstrates: ctx.db, ctx.provenance, ctx.escalate(), ctx.files.list()
 *
 * Real use case: Validate data integrity, flag anomalies,
 * escalate to human when critical issues found.
 */
import { handler, withProvenanceContext } from '@human/agent-sdk';
import type { ExecutionContext } from '@human/agent-sdk';
import { emitReferenceAuthorSignal } from '../../lib/reference-author-telemetry.js';

export const AGENT_ID = 'data-quality-checker';
export const VERSION = '1.0.0';
export const CAPABILITIES = ['data/quality/check'];

export interface DataQualityInput {
  dataset_id: string;
  directory?: string;
}

export interface DataQualityOutput {
  success: boolean;
  checks_passed: number;
  checks_failed: number;
  issues: Array<{ type: string; message: string; severity: 'low' | 'medium' | 'high' }>;
  escalation_required: boolean;
  provenance_id: string;
}

const execute = async (
  ctx: ExecutionContext,
  input: DataQualityInput
): Promise<DataQualityOutput> => {
  ctx.log.info('Running data quality checks', { dataset_id: input.dataset_id });

  const directory = input.directory ?? `data/${input.dataset_id}`;
  const issues: DataQualityOutput['issues'] = [];
  let checksPassed = 0;
  let checksFailed = 0;

  // List files (ctx.files.list)
  const files = await ctx.files.list(directory).catch(() => []);
  if (files.length === 0) {
    issues.push({
      type: 'empty_dataset',
      message: 'No files found in directory',
      severity: 'high',
    });
    checksFailed++;
  } else {
    checksPassed++;
  }

  // Query DB for record counts using a parameterized query (never interpolate user input into SQL)
  const db = await ctx.db();
  const countRows = (await db
    .query(`SELECT COUNT(*) as cnt FROM datasets WHERE dataset_id = $1`, [input.dataset_id])
    .catch(() => [{ cnt: 0 }])) as Array<{ cnt: number }>;
  const recordCount = Number(countRows[0]?.cnt ?? 0);
  if (recordCount === 0) {
    issues.push({
      type: 'zero_records',
      message: 'Dataset has zero records',
      severity: 'high',
    });
    checksFailed++;
  } else {
    checksPassed++;
  }

  const hasHighSeverity = issues.some((i) => i.severity === 'high');
  if (issues.length > 0) {
    await emitReferenceAuthorSignal(ctx, 'data_quality_issues', {
      dataset_id: input.dataset_id,
      issue_types: issues.map((i) => i.type),
      has_high_severity: hasHighSeverity,
    });
  }

  let escalationRequired = false;
  let provenanceId: string;
  if (hasHighSeverity) {
    const decision = await ctx.escalate({
      reason: 'Critical data quality issues detected',
      context: {
        dataset_id: input.dataset_id,
        issues,
        checks_passed: checksPassed,
        checks_failed: checksFailed,
      },
      requiredCapability: 'data/quality/approve',
    });
    escalationRequired = !decision.approved;
    provenanceId = await ctx.provenance.log(
      withProvenanceContext(ctx, {
        action: 'data_quality:escalation',
        status: 'success',
        input: { dataset_id: input.dataset_id },
        output: {
          escalation_required: escalationRequired,
          human_decision: decision.reason,
        },
      })
    );
  } else {
    provenanceId = await ctx.provenance.log(
      withProvenanceContext(ctx, {
        action: 'data_quality:checked',
        status: 'success',
        input: { dataset_id: input.dataset_id },
        output: { checks_passed: checksPassed, checks_failed: checksFailed },
      })
    );
  }

  return {
    success: !hasHighSeverity || !escalationRequired,
    checks_passed: checksPassed,
    checks_failed: checksFailed,
    issues,
    escalation_required: escalationRequired,
    provenance_id: provenanceId,
  };
};

export default handler({
  name: AGENT_ID,
  id: AGENT_ID,
  version: VERSION,
  capabilities: CAPABILITIES,
  manifest: {
    operations: [
      {
        name: 'check',
        description: 'Validate data integrity, flag anomalies, escalate to human when critical',
        paramsSchema: {
          dataset_id: { type: 'string', required: true, description: 'Dataset identifier' },
          directory: { type: 'string', description: 'Optional directory path for file checks' },
        },
        resultKind: 'agent.data-quality-checker.result',
      },
    ],
  },
  execute,
});
```
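For illustration, a failing run over an empty dataset would produce an output like the following. All values here are hypothetical (the `provenance_id` is invented; real ids come from `ctx.provenance.log()`):

```typescript
// Mirrors the DataQualityOutput interface from the listing above.
interface DataQualityOutput {
  success: boolean;
  checks_passed: number;
  checks_failed: number;
  issues: Array<{ type: string; message: string; severity: 'low' | 'medium' | 'high' }>;
  escalation_required: boolean;
  provenance_id: string;
}

// Hypothetical failing run: both checks failed, escalation not yet approved.
const failingRun: DataQualityOutput = {
  success: false, // high-severity issues present and the human did not approve
  checks_passed: 0,
  checks_failed: 2,
  issues: [
    { type: 'empty_dataset', message: 'No files found in directory', severity: 'high' },
    { type: 'zero_records', message: 'Dataset has zero records', severity: 'high' },
  ],
  escalation_required: true, // ctx.escalate() returned approved: false
  provenance_id: 'prov-0001', // invented id for illustration
};

console.log(failingRun.checks_failed); // prints: 2
```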

Run the tests

From the monorepo root:

$ pnpm test:agents:reference

$ pnpm test:agents:reference:verbose

The reference suite runs all 23 agents with createMockExecutionContext(), verifying every ctx.* API call and output shape.
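The mock-context pattern can be sketched as follows. This is a hand-rolled recorder for illustration only, not the real `createMockExecutionContext()`; `makeRecordingCtx` and `runGate` are hypothetical names, and `runGate` is a deliberately simplified re-statement of the agent's gate (empty directory + zero rows → two high-severity issues → escalate → provenance record):

```typescript
type Severity = 'low' | 'medium' | 'high';
interface Issue { type: string; message: string; severity: Severity }

// Recording mock: every ctx.* call is appended to `calls` so a test can
// assert that the gate touched the APIs it claims to demonstrate.
function makeRecordingCtx(rows: Array<{ cnt: number }>) {
  const calls: string[] = [];
  return {
    calls,
    files: { list: async (_dir: string) => { calls.push('files.list'); return [] as string[]; } },
    db: async () => ({
      query: async (_sql: string, _params: unknown[]) => { calls.push('db.query'); return rows; },
    }),
    escalate: async (_req: unknown) => { calls.push('escalate'); return { approved: false, reason: 'pending' }; },
    provenance: { log: async (_entry: unknown) => { calls.push('provenance.log'); return 'prov-1'; } },
  };
}

// Simplified gate: check → escalate critical → log provenance → pass/fail.
async function runGate(ctx: ReturnType<typeof makeRecordingCtx>) {
  const issues: Issue[] = [];
  if ((await ctx.files.list('data/example')).length === 0) {
    issues.push({ type: 'empty_dataset', message: 'No files found', severity: 'high' });
  }
  const db = await ctx.db();
  const [{ cnt }] = (await db.query(
    'SELECT COUNT(*) as cnt FROM datasets WHERE dataset_id = $1',
    ['example']
  )) as Array<{ cnt: number }>;
  if (Number(cnt) === 0) {
    issues.push({ type: 'zero_records', message: 'Zero records', severity: 'high' });
  }
  let escalationRequired = false;
  if (issues.some((i) => i.severity === 'high')) {
    const decision = await ctx.escalate({ reason: 'Critical data quality issues', issues });
    escalationRequired = !decision.approved;
  }
  const provenanceId = await ctx.provenance.log({ action: 'data_quality:checked', issues });
  return { success: !escalationRequired, issues, escalation_required: escalationRequired, provenance_id: provenanceId };
}

const demoCtx = makeRecordingCtx([{ cnt: 0 }]);
runGate(demoCtx).then((out) => console.log(out.escalation_required)); // prints: true
```

Asserting on `calls` after the run is what lets a test verify "every ctx.* API call" without any real database, filesystem, or human reviewer.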

See Also