Reference Implementations

Workflows

job-lifecycle-manager

intermediate

Manage background jobs with deduplication, cancellation, and cache invalidation.

APIs Used

ctx.queue, ctx.cache

Capabilities Required

workflows/job-lifecycle, workflows/dedup

What this demonstrates

  • 1. ctx.queue.enqueue() to enqueue background jobs with metadata (priority, dedupe key)
  • 2. ctx.cache.get() for deduplication: look up whether a job of this type is already tracked
  • 3. ctx.queue.cancel() to cancel a still-pending job by ID
  • 4. ctx.cache.delete() for cache invalidation of the stale cached result
  • 5. Job lifecycle: deduplicate → cancel → invalidate → enqueue → track
typescript
/**
* Job Lifecycle Manager - Production Reference Agent
*
* Canon alignment: KB 105 (Agent SDK), KB 22 (HumanOS)
* Demonstrates: ctx.cache.delete, ctx.queue.cancel, ctx.cache, ctx.queue, ctx.provenance
*
* Real use case: Manage background jobs with deduplication. When a new job
* request arrives for something already queued, cancel the stale job and
* invalidate its cached result before enqueuing fresh work. This prevents
* resource waste from duplicate processing and ensures users always get
* fresh results.
*/
import { handler, withProvenanceContext } from '@human/agent-sdk';
import type { ExecutionContext } from '@human/agent-sdk';
/** Identifier of this agent; reused as both `name` and `id` in the handler definition. */
export const AGENT_ID = 'job-lifecycle-manager';

/** Version string reported in the handler definition. */
export const VERSION = '1.0.0';

/** Capability names declared by this agent in the handler definition. */
export const CAPABILITIES = [
  'workflows/job-lifecycle',
  'workflows/dedup',
];
/**
 * Request accepted by the job lifecycle manager.
 */
export interface JobLifecycleInput {
  /**
   * Identifier of the kind of job to run. It is also embedded in the cache
   * keys, so requests sharing a `job_type` (and prefix) share tracking state.
   */
  job_type: string;

  /** Arbitrary data handed to the queue together with the job. */
  payload: Record<string, unknown>;

  /** Optional namespace prepended to the cache keys used for dedup. */
  cache_prefix?: string;

  /** Optional priority for the newly enqueued job, in the range 1-10. */
  priority?: number;
}
/**
 * Summary returned by the job lifecycle manager after handling one request.
 */
export interface JobLifecycleOutput {
  /** Outcome flag for the run. */
  success: boolean;

  /** ID of the job that was enqueued by this run. */
  new_job_id: string;

  /** `true` when a previously tracked job was cancelled during this run. */
  stale_cancelled: boolean;

  /** `true` when the previously cached result entry was deleted. */
  cache_invalidated: boolean;

  /** ID of the job that was tracked before this run, when there was one. */
  previous_job_id?: string;

  /** Identifier of the provenance record written for this run. */
  provenance_id: string;
}
/** Lowest priority value accepted for a job (documented input range is 1-10). */
const MIN_PRIORITY = 1;
/** Highest priority value accepted for a job (documented input range is 1-10). */
const MAX_PRIORITY = 10;
/** Priority used when the caller does not supply a usable one. */
const DEFAULT_PRIORITY = 5;

/**
 * Coerce a caller-supplied priority into the documented 1-10 range.
 *
 * Missing, non-numeric, or NaN values fall back to the default; anything else
 * is clamped so an out-of-range value never reaches the queue.
 */
const normalizePriority = (priority: number | undefined): number => {
  if (typeof priority !== 'number' || Number.isNaN(priority)) {
    return DEFAULT_PRIORITY;
  }
  return Math.min(MAX_PRIORITY, Math.max(MIN_PRIORITY, priority));
};

/**
 * Replace any previously tracked job of the same type with a fresh one.
 *
 * Flow:
 *  1. Look up the currently tracked job ID in the cache.
 *  2. If one exists and the queue reports it as `pending`, cancel it.
 *  3. If one exists, delete the cached result entry (this happens whether or
 *     not the cancellation took place).
 *  4. Enqueue the new job with a normalized priority and a dedupe key.
 *  5. Store the new job ID in the cache with a TTL of 3600.
 *  6. Write a provenance record describing what happened.
 *
 * @param ctx - Execution context supplying `log`, `cache`, `queue` and `provenance`.
 * @param input - Job type, payload, and optional cache prefix / priority.
 * @returns The new job ID, what was cancelled or invalidated, and the provenance ID.
 * @throws Errors from the cache delete/set, enqueue, and provenance calls
 *   propagate to the caller; only errors from the status/cancel step are
 *   caught and logged.
 */
const execute = async (
  ctx: ExecutionContext,
  input: JobLifecycleInput
): Promise<JobLifecycleOutput> => {
  ctx.log.info('Managing job lifecycle', { job_type: input.job_type });

  const cachePrefix = input.cache_prefix ?? 'job';
  const jobCacheKey = `${cachePrefix}:${input.job_type}:current`;
  const resultCacheKey = `${cachePrefix}:${input.job_type}:result`;

  let staleCancelled = false;
  let cacheInvalidated = false;
  let previousJobId: string | undefined;

  // Step 1: Check cache for existing job ID (ctx.cache.get)
  const existingJobId = await ctx.cache.get(jobCacheKey);
  if (existingJobId) {
    ctx.log.info('Found existing job', { job_id: existingJobId });
    previousJobId = existingJobId;

    // Step 2: Check if existing job is still pending
    try {
      const status = await ctx.queue.status(existingJobId);
      if (status.state === 'pending') {
        // Step 3: Cancel stale job (ctx.queue.cancel) - key gap feature
        staleCancelled = await ctx.queue.cancel(existingJobId);
        ctx.log.info('Cancelled stale job', {
          job_id: existingJobId,
          cancelled: staleCancelled,
        });
      } else {
        ctx.log.info('Existing job not cancellable', {
          job_id: existingJobId,
          state: status.state,
        });
      }
    } catch (err) {
      // Best effort: the job may no longer exist, so log and carry on.
      ctx.log.warn('Could not check stale job status', {
        job_id: existingJobId,
        error: err instanceof Error ? err.message : String(err),
      });
    }

    // Step 4: Invalidate stale cached result (ctx.cache.delete) - key gap feature
    await ctx.cache.delete(resultCacheKey);
    cacheInvalidated = true;
    ctx.log.info('Invalidated cached result', { key: resultCacheKey });
  }

  // Step 5: Enqueue new job (ctx.queue.enqueue). The priority is normalized
  // so that values outside the documented 1-10 range are never forwarded.
  const newJobId = await ctx.queue.enqueue(input.job_type, input.payload, {
    priority: normalizePriority(input.priority),
    dedupeKey: `${cachePrefix}:${input.job_type}`,
  });

  // Step 6: Cache the new job ID for future dedup (ctx.cache.set)
  await ctx.cache.set(jobCacheKey, newJobId, { ttl: 3600 });
  ctx.log.info('Enqueued new job', { job_id: newJobId });

  // Step 7: Log provenance (ctx.provenance)
  const provenanceId = await ctx.provenance.log(
    withProvenanceContext(ctx, {
      type: 'job_lifecycle:managed',
      status: 'success',
      metadata: {
        input: { job_type: input.job_type },
        output: {
          new_job_id: newJobId,
          stale_cancelled: staleCancelled,
          cache_invalidated: cacheInvalidated,
          previous_job_id: previousJobId,
        },
      },
    })
  );

  return {
    success: true,
    new_job_id: newJobId,
    stale_cancelled: staleCancelled,
    cache_invalidated: cacheInvalidated,
    previous_job_id: previousJobId,
    provenance_id: provenanceId,
  };
};
/**
 * Default export: the value produced by passing this agent's definition to the
 * SDK's `handler` function. The definition bundles the agent's identity
 * (`AGENT_ID` is used for both `name` and `id`), version, capabilities, a
 * human-readable description, a manifest with a single `manage` operation,
 * and the `execute` implementation.
 */
export default handler({
name: AGENT_ID,
id: AGENT_ID,
version: VERSION,
capabilities: CAPABILITIES,
description: 'Manage background jobs with deduplication, cancellation, and cache invalidation',
manifest: {
operations: [
{
name: 'manage',
description: 'Enqueue job with dedup: cancel stale job and invalidate cache if needed',
// Parameter schema mirroring the fields of JobLifecycleInput;
// only job_type and payload are marked required.
paramsSchema: {
job_type: { type: 'string', required: true, description: 'Job type identifier' },
payload: { type: 'object', required: true, description: 'Job payload' },
cache_prefix: { type: 'string', description: 'Cache key prefix for dedup' },
priority: { type: 'number', description: 'Priority 1-10' },
},
// NOTE(review): presumably a type tag for this operation's result — confirm against the SDK.
resultKind: 'agent.job-lifecycle-manager.result',
},
],
},
execute,
});

Run the tests

From monorepo root

$ pnpm test:agents:reference

$ pnpm test:agents:reference:verbose

The reference suite runs all 23 agents with createMockExecutionContext(), verifying every ctx.* API call and output shape.