Reference Implementations

Workflows

resilient-pipeline

advanced

Pipeline with retry, fallback, and human escalation for error recovery.

APIs Used

ctx.files · ctx.escalate() · ctx.telemetry.emit

Capabilities Required

workflows/resilient-pipeline · workflows/error-recovery

What this demonstrates

  1. Retry loop with configurable max attempts and delay
  2. ctx.escalate() escalation when retries are exhausted
  3. ctx.files for primary/fallback document reads in the reference implementation
  4. Typed SDKError handling with machine-readable error codes
  5. Production error recovery: retry → fallback → escalate → abort
  6. ctx.telemetry.emit via emitReferenceAuthorSignal on retry and fallback activation
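The retry → fallback → escalate → abort ladder can be sketched independently of the SDK. The following is a minimal, self-contained sketch; `readWithRecovery`, `Reader`, and `askHuman` are illustrative stand-ins, not part of the actual SDK surface:

```typescript
// Illustrative error types standing in for the SDK's typed errors.
class NotFoundError extends Error {}
class TimeoutError extends Error {}

type Reader = () => Promise<string>;

// Rung 1 helper: retry with short exponential backoff (10ms, 20ms, 40ms, ...).
async function retry<T>(fn: () => Promise<T>, maxRetries: number): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt < maxRetries) {
        await new Promise((r) => setTimeout(r, 10 * 2 ** attempt));
      }
    }
  }
  throw lastError;
}

async function readWithRecovery(
  primary: Reader,
  fallback: Reader,
  askHuman: () => Promise<boolean>,
  maxRetries = 2
): Promise<{ data: string; path: 'primary' | 'fallback' | 'human' | 'abort' }> {
  try {
    // Rung 1: retry transient failures on the primary source.
    return { data: await retry(primary, maxRetries), path: 'primary' };
  } catch {
    try {
      // Rung 2: fall back to the secondary source.
      return { data: await fallback(), path: 'fallback' };
    } catch {
      // Rung 3: escalate to a human; rung 4: abort gracefully if declined.
      const approved = await askHuman();
      return approved
        ? { data: '{"recovered":true}', path: 'human' }
        : { data: '{"recovered":false}', path: 'abort' };
    }
  }
}
```

Each rung only engages when the previous one is exhausted, so the common case (a healthy primary source) pays no overhead.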
```typescript
/**
 * Resilient Pipeline - Production Reference Agent
 *
 * Canon alignment: KB 105 (Agent SDK), KB 22 (HumanOS)
 * Demonstrates: Error recovery patterns using SDK error types
 * - Catching TimeoutError and retrying with backoff
 * - Catching NotFoundError and using fallback data sources
 * - Catching AccessDeniedError and escalating via ctx.escalate
 * - Logging all error events to provenance for auditability
 *
 * This agent exercises the MockErrorConfig system. In the reference suite,
 * it runs against a mock context configured with `failNext` to prove that
 * agents can gracefully recover from transient failures.
 *
 * Principle: P10 (Fourth Law) - AI must know when it doesn't know.
 * When errors occur, the agent degrades gracefully rather than failing hard.
 */
import {
  handler,
  withProvenanceContext,
  NotFoundError,
  TimeoutError,
  AccessDeniedError,
} from '@human/agent-sdk';
import type { ExecutionContext } from '@human/agent-sdk';
import { emitReferenceAuthorSignal } from '../../lib/reference-author-telemetry.js';

export const AGENT_ID = 'resilient-pipeline';
export const VERSION = '1.0.0';
export const CAPABILITIES = ['workflows/resilient-pipeline', 'workflows/error-recovery'];

export interface ResilientPipelineInput {
  /** Data source to process */
  source_id: string;
  /** Maximum retries for transient failures */
  max_retries?: number;
  /** Fallback data source if primary fails */
  fallback_source?: string;
}

export interface ResilientPipelineOutput {
  success: boolean;
  /** Data source that was actually used */
  source_used: string;
  /** Number of retries that occurred */
  retries: number;
  /** Whether fallback was activated */
  fallback_activated: boolean;
  /** Whether human escalation was triggered */
  human_escalated: boolean;
  /** Errors encountered during processing */
  errors_encountered: string[];
  provenance_id: string;
}

/**
 * Retry a function with exponential backoff.
 */
async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries: number,
  onRetry: (attempt: number, error: Error) => void
): Promise<T> {
  let lastError: Error | undefined;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err instanceof Error ? err : new Error(String(err));
      if (attempt < maxRetries) {
        onRetry(attempt + 1, lastError);
        // Exponential backoff: 10ms, 20ms, 40ms... (kept short for tests)
        await new Promise((resolve) => setTimeout(resolve, 10 * Math.pow(2, attempt)));
      }
    }
  }
  throw lastError;
}

const execute = async (
  ctx: ExecutionContext,
  input: ResilientPipelineInput
): Promise<ResilientPipelineOutput> => {
  ctx.log.info('Starting resilient pipeline', { source_id: input.source_id });

  const maxRetries = input.max_retries ?? 2;
  const fallbackSource = input.fallback_source ?? 'fallback-default';
  const errorsEncountered: string[] = [];
  let retries = 0;
  let fallbackActivated = false;
  let humanEscalated = false;
  let sourceUsed = input.source_id;

  // Step 1: Try to read primary data source with retry (handles TimeoutError)
  let data: string;
  try {
    data = await withRetry(
      () => ctx.files.readText(`data/${input.source_id}/input.json`),
      maxRetries,
      (attempt, error) => {
        retries++;
        errorsEncountered.push(`Retry ${attempt}: ${error.message}`);
        ctx.log.warn('Retrying after transient failure', {
          attempt,
          error: error.message,
          isTimeout: error instanceof TimeoutError,
        });
        void emitReferenceAuthorSignal(ctx, 'pipeline_transient_retry', {
          attempt,
          source_id: input.source_id,
          is_timeout: error instanceof TimeoutError,
        });
      }
    );
  } catch (primaryError) {
    errorsEncountered.push(
      `Primary source failed: ${primaryError instanceof Error ? primaryError.message : String(primaryError)}`
    );

    // Step 2: If primary fails, try fallback (handles NotFoundError)
    if (primaryError instanceof NotFoundError) {
      ctx.log.info('Primary source not found, activating fallback', {
        primary: input.source_id,
        fallback: fallbackSource,
      });
      await emitReferenceAuthorSignal(ctx, 'pipeline_fallback_activated', {
        primary_source: input.source_id,
        fallback_source: fallbackSource,
      });
      fallbackActivated = true;
      sourceUsed = fallbackSource;
      try {
        data = await ctx.files.readText(`data/${fallbackSource}/input.json`);
      } catch (fallbackError) {
        errorsEncountered.push(
          `Fallback source failed: ${fallbackError instanceof Error ? fallbackError.message : String(fallbackError)}`
        );

        // Step 3: If both sources fail, escalate to human (handles AccessDeniedError and others)
        ctx.log.warn('All data sources failed, escalating to human', {
          primary: input.source_id,
          fallback: fallbackSource,
        });
        humanEscalated = true;
        const decision = await ctx.escalate({
          reason: 'All data sources unavailable - manual intervention required',
          context: {
            primary_source: input.source_id,
            fallback_source: fallbackSource,
            errors: errorsEncountered,
          },
          requiredCapability: 'data/manual-recovery',
        });
        // Use human-provided data or fail gracefully
        data = decision.approved
          ? JSON.stringify({ recovered: true, source: 'human_intervention' })
          : JSON.stringify({ recovered: false, source: 'none' });
      }
    } else if (primaryError instanceof AccessDeniedError) {
      // Access denied: escalate immediately, no fallback
      ctx.log.warn('Access denied to data source, escalating', {
        source: input.source_id,
      });
      humanEscalated = true;
      const decision = await ctx.escalate({
        reason: `Access denied to data source: ${input.source_id}`,
        context: {
          source_id: input.source_id,
          error: primaryError.message,
        },
        requiredCapability: 'data/access-grant',
      });
      data = decision.approved
        ? JSON.stringify({ recovered: true, source: 'access_granted' })
        : JSON.stringify({ recovered: false, source: 'access_denied' });
    } else {
      // Unknown error: wrap and continue with empty data
      data = JSON.stringify({ recovered: false, source: 'error', error: String(primaryError) });
    }
  }

  // Step 4: Process the data (write output)
  const outputPath = `data/${sourceUsed}/output.json`;
  await ctx.files.writeText(outputPath, data);

  // Step 5: Log comprehensive provenance
  const provenanceId = await ctx.provenance.log(
    withProvenanceContext(ctx, {
      type: 'resilient_pipeline:completed',
      status: 'success',
      metadata: {
        input: { source_id: input.source_id },
        output: {
          source_used: sourceUsed,
          retries,
          fallback_activated: fallbackActivated,
          human_escalated: humanEscalated,
          errors_encountered: errorsEncountered,
        },
      },
    })
  );

  return {
    success: true,
    source_used: sourceUsed,
    retries,
    fallback_activated: fallbackActivated,
    human_escalated: humanEscalated,
    errors_encountered: errorsEncountered,
    provenance_id: provenanceId,
  };
};

export default handler({
  name: AGENT_ID,
  id: AGENT_ID,
  version: VERSION,
  capabilities: CAPABILITIES,
  description: 'Pipeline with retry, fallback, and human escalation for error recovery',
  manifest: {
    operations: [
      {
        name: 'run',
        description: 'Process data source with retry, fallback, and human escalation on failure',
        paramsSchema: {
          source_id: { type: 'string', required: true, description: 'Data source identifier' },
          max_retries: { type: 'number', description: 'Max retries for transient failures' },
          fallback_source: { type: 'string', description: 'Fallback source if primary fails' },
        },
        resultKind: 'agent.resilient-pipeline.result',
      },
    ],
  },
  execute,
});
```

Run the tests

From the monorepo root:

$ pnpm test:agents:reference

$ pnpm test:agents:reference:verbose

The reference suite runs all 23 agents with createMockExecutionContext(), verifying every ctx.* API call and output shape.
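The `failNext` idea can be illustrated with a self-contained mock: a reader that fails its next N calls, which the agent's retry loop then rides out. The real `createMockExecutionContext()` / MockErrorConfig API may differ; `makeFlakyReader` below is a hypothetical stand-in, and `withRetry` mirrors the shape of the agent's helper:

```typescript
class TimeoutError extends Error {}

// Hypothetical stand-in for MockErrorConfig's `failNext`: the returned reader
// throws a simulated transient error for its first `failNext` calls, then
// returns the payload.
function makeFlakyReader(failNext: number, payload: string) {
  let remaining = failNext;
  return async (_path: string): Promise<string> => {
    if (remaining > 0) {
      remaining--;
      throw new TimeoutError('simulated transient timeout');
    }
    return payload;
  };
}

// Same retry shape as the agent's withRetry helper (backoff omitted for brevity).
async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries: number,
  onRetry: (attempt: number) => void
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt < maxRetries) onRetry(attempt + 1);
    }
  }
  throw lastError;
}
```

With `failNext` set to 2 and `maxRetries` at least 2, the read fails twice, retries twice, and succeeds on the third attempt - the recovery behavior the reference suite asserts on.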

See Also