Reference Implementations
advanced
Workflows
resilient-pipeline
Pipeline with retry, fallback, and human escalation for error recovery.
APIs Used
ctx.files, ctx.escalate(), ctx.telemetry.emit
Capabilities Required
workflows/resilient-pipeline, workflows/error-recovery
What this demonstrates
1. Retry loop with a configurable maximum number of attempts and an exponential backoff delay
2. Human escalation via ctx.escalate() when retries and the fallback are exhausted, or when access is denied
3. ctx.files for primary/fallback document reads in the reference implementation
4. Typed SDKError handling with machine-readable error codes
5. Production error recovery: retry → fallback → escalate → abort
6. ctx.telemetry.emit via emitReferenceAuthorSignal on retry and fallback activation
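The recovery ladder in item 5 can be sketched as a small pure decision function. This is a simplified illustration, not the reference implementation's exact branching: the `Sketch*` error classes, their `code` values, `RecoveryState`, `nextAction`, and `backoffDelayMs` are all hypothetical names introduced here, standing in for the typed errors that the real agent imports from `@human/agent-sdk`.

```typescript
// Hypothetical stand-ins for the SDK's typed errors. The real classes are
// exported by '@human/agent-sdk'; their fields and error codes may differ.
class SketchSdkError extends Error {
  readonly code: string;
  constructor(code: string, message: string) {
    super(message);
    this.code = code;
    // Keep `instanceof` working even when compiled to an ES5 target.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}
class SketchTimeoutError extends SketchSdkError {
  constructor(message = 'operation timed out') {
    super('TIMEOUT', message);
  }
}
class SketchNotFoundError extends SketchSdkError {
  constructor(message = 'resource not found') {
    super('NOT_FOUND', message);
  }
}
class SketchAccessDeniedError extends SketchSdkError {
  constructor(message = 'access denied') {
    super('ACCESS_DENIED', message);
  }
}

type RecoveryAction = 'retry' | 'fallback' | 'escalate' | 'abort';

interface RecoveryState {
  retriesUsed: number;
  maxRetries: number;
  fallbackTried: boolean;
  escalationDeclined: boolean;
}

// Decide the next rung of the ladder: retry -> fallback -> escalate -> abort.
function nextAction(error: Error, state: RecoveryState): RecoveryAction {
  // Transient failure with budget left: try again.
  if (error instanceof SketchTimeoutError && state.retriesUsed < state.maxRetries) {
    return 'retry';
  }
  // Access problems skip the fallback and go straight to a human.
  if (error instanceof SketchAccessDeniedError) {
    return state.escalationDeclined ? 'abort' : 'escalate';
  }
  // Anything else gets one attempt at the fallback source.
  if (!state.fallbackTried) {
    return 'fallback';
  }
  // Both sources failed: ask a human once, then give up.
  return state.escalationDeclined ? 'abort' : 'escalate';
}

// Exponential backoff matching the 10ms, 20ms, 40ms schedule in the source.
function backoffDelayMs(attempt: number, baseMs = 10): number {
  return baseMs * Math.pow(2, attempt);
}
```

Keeping the decision separate from the I/O makes each rung easy to unit test without a mock context. Whether the production agent should retry only on timeouts, as this sketch does, is a design choice the page does not settle.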
Source
/**
 * Resilient Pipeline - Production Reference Agent
 *
 * Canon alignment: KB 105 (Agent SDK), KB 22 (HumanOS)
 * Demonstrates: Error recovery patterns using SDK error types
 * - Catching TimeoutError and retrying with backoff
 * - Catching NotFoundError and using fallback data sources
 * - Catching AccessDeniedError and escalating via ctx.escalate
 * - Logging all error events to provenance for auditability
 *
 * This agent exercises the MockErrorConfig system. In the reference suite,
 * it runs against a mock context configured with `failNext` to prove that
 * agents can gracefully recover from transient failures.
 *
 * Principle: P10 (Fourth Law) - AI must know when it doesn't know.
 * When errors occur, the agent degrades gracefully rather than failing hard.
 */
import {
  handler,
  withProvenanceContext,
  NotFoundError,
  TimeoutError,
  AccessDeniedError,
} from '@human/agent-sdk';
import type { ExecutionContext } from '@human/agent-sdk';
import { emitReferenceAuthorSignal } from '../../lib/reference-author-telemetry.js';

export const AGENT_ID = 'resilient-pipeline';
export const VERSION = '1.0.0';
export const CAPABILITIES = ['workflows/resilient-pipeline', 'workflows/error-recovery'];

export interface ResilientPipelineInput {
  /** Data source to process */
  source_id: string;
  /** Maximum retries for transient failures */
  max_retries?: number;
  /** Fallback data source if primary fails */
  fallback_source?: string;
}

export interface ResilientPipelineOutput {
  success: boolean;
  /** Data source that was actually used */
  source_used: string;
  /** Number of retries that occurred */
  retries: number;
  /** Whether fallback was activated */
  fallback_activated: boolean;
  /** Whether human escalation was triggered */
  human_escalated: boolean;
  /** Errors encountered during processing */
  errors_encountered: string[];
  provenance_id: string;
}

/**
 * Retry a function with exponential backoff.
 */
async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries: number,
  onRetry: (attempt: number, error: Error) => void
): Promise<T> {
  let lastError: Error | undefined;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err instanceof Error ? err : new Error(String(err));
      if (attempt < maxRetries) {
        onRetry(attempt + 1, lastError);
        // Exponential backoff: 10ms, 20ms, 40ms... (kept short for tests)
        await new Promise((resolve) => setTimeout(resolve, 10 * Math.pow(2, attempt)));
      }
    }
  }
  throw lastError;
}

const execute = async (
  ctx: ExecutionContext,
  input: ResilientPipelineInput
): Promise<ResilientPipelineOutput> => {
  ctx.log.info('Starting resilient pipeline', { source_id: input.source_id });

  const maxRetries = input.max_retries ?? 2;
  const fallbackSource = input.fallback_source ?? 'fallback-default';
  const errorsEncountered: string[] = [];
  let retries = 0;
  let fallbackActivated = false;
  let humanEscalated = false;
  let sourceUsed = input.source_id;

  // Step 1: Try to read primary data source with retry (handles TimeoutError)
  let data: string;
  try {
    data = await withRetry(
      () => ctx.files.readText(`data/${input.source_id}/input.json`),
      maxRetries,
      (attempt, error) => {
        retries++;
        errorsEncountered.push(`Retry ${attempt}: ${error.message}`);
        ctx.log.warn('Retrying after transient failure', {
          attempt,
          error: error.message,
          isTimeout: error instanceof TimeoutError,
        });
        void emitReferenceAuthorSignal(ctx, 'pipeline_transient_retry', {
          attempt,
          source_id: input.source_id,
          is_timeout: error instanceof TimeoutError,
        });
      }
    );
  } catch (primaryError) {
    errorsEncountered.push(
      `Primary source failed: ${primaryError instanceof Error ? primaryError.message : String(primaryError)}`
    );

    // Step 2: If primary fails, try fallback (handles NotFoundError)
    if (primaryError instanceof NotFoundError) {
      ctx.log.info('Primary source not found, activating fallback', {
        primary: input.source_id,
        fallback: fallbackSource,
      });
      await emitReferenceAuthorSignal(ctx, 'pipeline_fallback_activated', {
        primary_source: input.source_id,
        fallback_source: fallbackSource,
      });
      fallbackActivated = true;
      sourceUsed = fallbackSource;

      try {
        data = await ctx.files.readText(`data/${fallbackSource}/input.json`);
      } catch (fallbackError) {
        errorsEncountered.push(
          `Fallback source failed: ${fallbackError instanceof Error ? fallbackError.message : String(fallbackError)}`
        );

        // Step 3: If both sources fail, escalate to human (handles AccessDeniedError and others)
        ctx.log.warn('All data sources failed, escalating to human', {
          primary: input.source_id,
          fallback: fallbackSource,
        });

        humanEscalated = true;

        const decision = await ctx.escalate({
          reason: 'All data sources unavailable - manual intervention required',
          context: {
            primary_source: input.source_id,
            fallback_source: fallbackSource,
            errors: errorsEncountered,
          },
          requiredCapability: 'data/manual-recovery',
        });

        // Use human-provided data or fail gracefully
        data = decision.approved
          ? JSON.stringify({ recovered: true, source: 'human_intervention' })
          : JSON.stringify({ recovered: false, source: 'none' });
      }
    } else if (primaryError instanceof AccessDeniedError) {
      // Access denied: escalate immediately, no fallback
      ctx.log.warn('Access denied to data source, escalating', {
        source: input.source_id,
      });
      humanEscalated = true;

      const decision = await ctx.escalate({
        reason: `Access denied to data source: ${input.source_id}`,
        context: {
          source_id: input.source_id,
          error: primaryError.message,
        },
        requiredCapability: 'data/access-grant',
      });

      data = decision.approved
        ? JSON.stringify({ recovered: true, source: 'access_granted' })
        : JSON.stringify({ recovered: false, source: 'access_denied' });
    } else {
      // Unknown error: wrap and continue with empty data
      data = JSON.stringify({ recovered: false, source: 'error', error: String(primaryError) });
    }
  }

  // Step 4: Process the data (write output)
  const outputPath = `data/${sourceUsed}/output.json`;
  await ctx.files.writeText(outputPath, data);

  // Step 5: Log comprehensive provenance
  const provenanceId = await ctx.provenance.log(
    withProvenanceContext(ctx, {
      type: 'resilient_pipeline:completed',
      status: 'success',
      metadata: {
        input: { source_id: input.source_id },
        output: {
          source_used: sourceUsed,
          retries,
          fallback_activated: fallbackActivated,
          human_escalated: humanEscalated,
          errors_encountered: errorsEncountered,
        },
      },
    })
  );

  return {
    success: true,
    source_used: sourceUsed,
    retries,
    fallback_activated: fallbackActivated,
    human_escalated: humanEscalated,
    errors_encountered: errorsEncountered,
    provenance_id: provenanceId,
  };
};

export default handler({
  name: AGENT_ID,
  id: AGENT_ID,
  version: VERSION,
  capabilities: CAPABILITIES,
  description: 'Pipeline with retry, fallback, and human escalation for error recovery',
  manifest: {
    operations: [
      {
        name: 'run',
        description: 'Process data source with retry, fallback, and human escalation on failure',
        paramsSchema: {
          source_id: { type: 'string', required: true, description: 'Data source identifier' },
          max_retries: { type: 'number', description: 'Max retries for transient failures' },
          fallback_source: { type: 'string', description: 'Fallback source if primary fails' },
        },
        resultKind: 'agent.resilient-pipeline.result',
      },
    ],
  },
  execute,
});

Run the tests
From monorepo root
$ pnpm test:agents:reference
$ pnpm test:agents:reference:verbose
The reference suite runs all 23 agents with createMockExecutionContext(), verifying every ctx.* API call and output shape.
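To give a feel for what a `failNext`-style check might verify, the sketch below uses a hand-rolled stub instead of the SDK's mock context. `createFlakyReader`, `readWithRetry`, `RetryResult`, and `demo` are hypothetical names for this example only; the actual options accepted by `createMockExecutionContext()` and MockErrorConfig are not shown on this page and may differ.

```typescript
// Hypothetical stub: rejects the next `failNext` reads, then succeeds.
// This is NOT the SDK's mock; it only imitates the idea of a `failNext` setting.
function createFlakyReader(failNext: number, payload: string) {
  let remainingFailures = failNext;
  let calls = 0;
  return {
    get calls(): number {
      return calls;
    },
    async readText(path: string): Promise<string> {
      calls++;
      if (remainingFailures > 0) {
        remainingFailures--;
        throw new Error(`simulated timeout reading ${path}`);
      }
      return payload;
    },
  };
}

interface RetryResult {
  data: string;
  retries: number;
}

// Minimal retry loop. The backoff delay is omitted to keep the sketch fast.
async function readWithRetry(
  read: () => Promise<string>,
  maxRetries: number
): Promise<RetryResult> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      // `attempt` equals the number of failed tries before this success.
      return { data: await read(), retries: attempt };
    } catch (err) {
      lastError = err;
    }
  }
  throw lastError;
}

// Usage: two simulated failures are absorbed by a retry budget of two.
async function demo(): Promise<RetryResult> {
  const reader = createFlakyReader(2, '{"ok":true}');
  return readWithRetry(() => reader.readText('data/example/input.json'), 2);
}
```

A test along these lines would assert both the recovered payload and the retry count, which corresponds to the `retries` field in the agent's output shape.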
See Also
SDK Reference
Patterns