Reference Implementationsintermediate
Workflows
job-lifecycle-manager
Manage background jobs with deduplication, cancellation, and cache invalidation.
APIs Used
ctx.queuectx.cacheCapabilities Required
workflows/job-lifecycleworkflows/dedupWhat this demonstrates
- 1ctx.queue.push() to enqueue background jobs with metadata
- 2ctx.cache.get() for deduplication: skip if job already queued
- 3ctx.queue.cancel() to cancel in-flight jobs by ID
- 4ctx.cache.del() for cache invalidation after cancellation
- 5Job lifecycle: enqueue → deduplicate → track → cancel → invalidate
Source
View on GitHubtypescript
/** * Job Lifecycle Manager - Production Reference Agent * * Canon alignment: KB 105 (Agent SDK), KB 22 (HumanOS) * Demonstrates: ctx.cache.delete, ctx.queue.cancel, ctx.cache, ctx.queue, ctx.provenance * * Real use case: Manage background jobs with deduplication. When a new job * request arrives for something already queued, cancel the stale job and * invalidate its cached result before enqueuing fresh work. This prevents * resource waste from duplicate processing and ensures users always get * fresh results. */
import { handler, withProvenanceContext } from '@human/agent-sdk';import type { ExecutionContext } from '@human/agent-sdk';
export const AGENT_ID = 'job-lifecycle-manager';export const VERSION = '1.0.0';export const CAPABILITIES = ['workflows/job-lifecycle', 'workflows/dedup'];
export interface JobLifecycleInput { /** Job type identifier */ job_type: string; /** Job payload */ payload: Record<string, unknown>; /** Cache key prefix for dedup */ cache_prefix?: string; /** Priority for new job (1-10) */ priority?: number;}
export interface JobLifecycleOutput { success: boolean; /** New job ID */ new_job_id: string; /** Whether a stale job was cancelled */ stale_cancelled: boolean; /** Whether stale cache was invalidated */ cache_invalidated: boolean; /** Previous job ID if one existed */ previous_job_id?: string; provenance_id: string;}
const execute = async ( ctx: ExecutionContext, input: JobLifecycleInput): Promise<JobLifecycleOutput> => { ctx.log.info('Managing job lifecycle', { job_type: input.job_type });
const cachePrefix = input.cache_prefix ?? 'job'; const jobCacheKey = `${cachePrefix}:${input.job_type}:current`; const resultCacheKey = `${cachePrefix}:${input.job_type}:result`;
let staleCancelled = false; let cacheInvalidated = false; let previousJobId: string | undefined;
// Step 1: Check cache for existing job ID (ctx.cache.get) const existingJobId = await ctx.cache.get(jobCacheKey);
if (existingJobId) { ctx.log.info('Found existing job', { job_id: existingJobId }); previousJobId = existingJobId;
// Step 2: Check if existing job is still pending try { const status = await ctx.queue.status(existingJobId);
if (status.state === 'pending') { // Step 3: Cancel stale job (ctx.queue.cancel) - key gap feature staleCancelled = await ctx.queue.cancel(existingJobId); ctx.log.info('Cancelled stale job', { job_id: existingJobId, cancelled: staleCancelled, }); } else { ctx.log.info('Existing job not cancellable', { job_id: existingJobId, state: status.state, }); } } catch (err) { // Job may no longer exist - log and continue ctx.log.warn('Could not check stale job status', { job_id: existingJobId, error: err instanceof Error ? err.message : String(err), }); }
// Step 4: Invalidate stale cached result (ctx.cache.delete) - key gap feature await ctx.cache.delete(resultCacheKey); cacheInvalidated = true; ctx.log.info('Invalidated cached result', { key: resultCacheKey }); }
// Step 5: Enqueue new job (ctx.queue.enqueue) const newJobId = await ctx.queue.enqueue(input.job_type, input.payload, { priority: input.priority ?? 5, dedupeKey: `${cachePrefix}:${input.job_type}`, });
// Step 6: Cache the new job ID for future dedup (ctx.cache.set) await ctx.cache.set(jobCacheKey, newJobId, { ttl: 3600 });
ctx.log.info('Enqueued new job', { job_id: newJobId });
// Step 7: Log provenance (ctx.provenance) const provenanceId = await ctx.provenance.log( withProvenanceContext(ctx, { type: 'job_lifecycle:managed', status: 'success', metadata: { input: { job_type: input.job_type }, output: { new_job_id: newJobId, stale_cancelled: staleCancelled, cache_invalidated: cacheInvalidated, previous_job_id: previousJobId, }, }, }) );
return { success: true, new_job_id: newJobId, stale_cancelled: staleCancelled, cache_invalidated: cacheInvalidated, previous_job_id: previousJobId, provenance_id: provenanceId, };};
export default handler({ name: AGENT_ID, id: AGENT_ID, version: VERSION, capabilities: CAPABILITIES, description: 'Manage background jobs with deduplication, cancellation, and cache invalidation', manifest: { operations: [ { name: 'manage', description: 'Enqueue job with dedup: cancel stale job and invalidate cache if needed', paramsSchema: { job_type: { type: 'string', required: true, description: 'Job type identifier' }, payload: { type: 'object', required: true, description: 'Job payload' }, cache_prefix: { type: 'string', description: 'Cache key prefix for dedup' }, priority: { type: 'number', description: 'Priority 1-10' }, }, resultKind: 'agent.job-lifecycle-manager.result', }, ], }, execute,});Run the tests
From monorepo root
$ pnpm test:agents:reference
$ pnpm test:agents:reference:verbose
The reference suite runs all 23 agents with createMockExecutionContext(), verifying every ctx.* API call and output shape.