Reference Implementations

Data

streaming-analyzer

intermediate

Stream LLM analysis over datasets with real-time token accumulation.

APIs Used

ctx.llm.stream() · ctx.db · ctx.provenance

Capabilities Required

data/stream-analysis · data/real-time-analytics

What this demonstrates

  1. ctx.llm.stream() for real-time streaming analysis with async iteration
  2. Token accumulation: aggregate chunk.content across the stream
  3. ctx.db for dataset access and result storage
  4. ctx.provenance.log() for auditing streaming analysis sessions
  5. Canonical streaming LLM pattern: query data → stream analysis → accumulate → store
typescript
/**
* Streaming Analyzer - Production Reference Agent
*
* Canon alignment: KB 105 (Agent SDK), KB 22 (HumanOS)
* Demonstrates: ctx.llm.stream (AsyncIterableIterator<LLMChunk>), ctx.db, ctx.files, ctx.provenance
*
* Real use case: Analyze a dataset with streaming LLM output for real-time
* progress. Think: a dashboard showing analysis tokens appearing live as
* the LLM processes data. Useful for long-running analyses where users need
* feedback that work is in progress.
*/
import { handler, withProvenanceContext } from '@human/agent-sdk';
import type { ExecutionContext } from '@human/agent-sdk';
// Stable agent identifier, reused as the handler's name/id below.
export const AGENT_ID = 'streaming-analyzer';
// Semantic version of this reference agent.
export const VERSION = '1.0.0';
// Capability strings this agent advertises to the runtime.
export const CAPABILITIES = ['data/stream-analysis', 'data/real-time-analytics'];
/** Input contract for the streaming-analyzer's `analyze` operation. */
export interface StreamingAnalyzerInput {
/** Dataset identifier to analyze (matched against `datasets.dataset_id`) */
dataset_id: string;
/** Analysis type; defaults to 'summary' when omitted */
analysis_type?: 'summary' | 'trends' | 'anomalies';
/** Output file path for the analysis report; a default path under `reports/` is derived when omitted */
output_path?: string;
}
/** Result shape for `agent.streaming-analyzer.result`. */
export interface StreamingAnalyzerOutput {
/** True when the run completed (this agent only returns on success; failures throw) */
success: boolean;
/** Full analysis content accumulated from the stream */
analysis: string;
/** Total tokens consumed during streaming, as reported by chunk usage */
total_tokens: number;
/** Number of chunks received from the stream */
chunk_count: number;
/** Path of the markdown report written via ctx.files */
report_path: string;
/** Provenance record id returned by ctx.provenance.log */
provenance_id: string;
}
/**
 * Execute a streaming analysis run.
 *
 * Flow: load dataset rows via ctx.db → stream the LLM analysis with
 * ctx.llm.stream → accumulate chunk content → write a markdown report via
 * ctx.files → record the run with ctx.provenance.
 *
 * @param ctx   Execution context supplied by the agent runtime.
 * @param input Dataset id plus optional analysis type and report path.
 * @returns Accumulated analysis text, token/chunk counters, report path,
 *          and the provenance record id.
 */
const execute = async (
  ctx: ExecutionContext,
  input: StreamingAnalyzerInput
): Promise<StreamingAnalyzerOutput> => {
  const analysisType = input.analysis_type ?? 'summary';
  ctx.log.info('Starting streaming analysis', {
    dataset_id: input.dataset_id,
    analysis_type: analysisType,
  });

  // Fetch up to 500 rows for the dataset (ctx.db()). Parameterized query —
  // dataset_id is untrusted input.
  const db = await ctx.db();
  const rows = (await db.query(
    `SELECT * FROM datasets WHERE dataset_id = $1 LIMIT 500`,
    [input.dataset_id]
  )) as Array<Record<string, unknown>>;
  ctx.log.info('Dataset loaded', { row_count: rows.length });

  // Only the first 10 rows go into the prompt to bound its size.
  const dataPreview =
    rows.length > 0
      ? JSON.stringify(rows.slice(0, 10), null, 2)
      : 'No data rows found. Provide a general analysis framework.';

  // Stream the analysis — ctx.llm.stream yields AsyncIterableIterator<LLMChunk>.
  const stream = ctx.llm.stream({
    prompt: [
      {
        role: 'system',
        content: `You are a data analyst. Perform a ${analysisType} analysis. Be thorough and structured.`,
      },
      {
        role: 'user',
        content: `Analyze this dataset (${rows.length} rows, showing first 10):\n\n${dataPreview}`,
      },
    ],
    temperature: 0.4,
    maxTokens: 1000,
  });

  // Accumulate streaming chunks.
  let fullContent = '';
  let chunkCount = 0;
  let totalTokens = 0;
  for await (const chunk of stream) {
    // BUG FIX: append each chunk's content instead of overwriting. The
    // original plain assignment kept only the final chunk, but the documented
    // pattern for this agent is to aggregate chunk.content across the stream.
    fullContent += chunk.content;
    chunkCount++;
    // Log progress periodically (every 5 chunks) so callers see liveness.
    if (chunkCount % 5 === 0) {
      ctx.log.debug('Streaming progress', {
        chunks: chunkCount,
        chars: fullContent.length,
      });
    }
    // Capture usage from whichever chunk reports it — not only on
    // finishReason 'stop', so e.g. a 'length' finish still records tokens.
    if (chunk.usage) {
      totalTokens = chunk.usage.totalTokens;
    }
  }
  ctx.log.info('Streaming complete', {
    chunk_count: chunkCount,
    total_tokens: totalTokens,
    content_length: fullContent.length,
  });

  // Write the markdown report (ctx.files).
  const reportPath =
    input.output_path ??
    `reports/analysis-${input.dataset_id}-${analysisType}.md`;
  const reportContent = [
    `# ${analysisType.charAt(0).toUpperCase() + analysisType.slice(1)} Analysis`,
    ``,
    `**Dataset:** ${input.dataset_id}`,
    `**Rows analyzed:** ${rows.length}`,
    `**Generated:** ${new Date().toISOString()}`,
    `**Tokens used:** ${totalTokens}`,
    ``,
    `---`,
    ``,
    fullContent,
  ].join('\n');
  await ctx.files.writeText(reportPath, reportContent);

  // Record the completed run for auditing (ctx.provenance).
  const provenanceId = await ctx.provenance.log(
    withProvenanceContext(ctx, {
      action: 'streaming_analysis:completed',
      status: 'success',
      input: { dataset_id: input.dataset_id, analysis_type: analysisType },
      output: {
        report_path: reportPath,
        chunk_count: chunkCount,
        total_tokens: totalTokens,
      },
    })
  );

  return {
    success: true,
    analysis: fullContent,
    total_tokens: totalTokens,
    chunk_count: chunkCount,
    report_path: reportPath,
    provenance_id: provenanceId,
  };
};
// Register the agent with the SDK handler wrapper. The manifest exposes a
// single `analyze` operation whose paramsSchema mirrors StreamingAnalyzerInput.
export default handler({
name: AGENT_ID,
id: AGENT_ID,
version: VERSION,
capabilities: CAPABILITIES,
description: 'Stream LLM analysis over datasets with real-time token accumulation',
manifest: {
operations: [
{
name: 'analyze',
description: 'Stream LLM analysis over a dataset with real-time token accumulation',
paramsSchema: {
dataset_id: { type: 'string', required: true, description: 'Dataset identifier' },
analysis_type: { type: 'string', description: 'summary | trends | anomalies' },
output_path: { type: 'string', description: 'Output file path for report' },
},
resultKind: 'agent.streaming-analyzer.result',
},
],
},
// Entry point defined above; signature (ctx, input) => Promise<Output>.
execute,
});

Run the tests

From monorepo root

$ pnpm test:agents:reference

$ pnpm test:agents:reference:verbose

The reference suite runs all 23 agents with createMockExecutionContext(), verifying every ctx.* API call and output shape.

See Also