Reference Implementations
intermediate · Data

streaming-analyzer
Stream LLM analysis over datasets with real-time token accumulation.
APIs Used

- ctx.llm.stream()
- ctx.db
- ctx.provenance

Capabilities Required

- data/stream-analysis
- data/real-time-analytics

What this demonstrates
1. ctx.llm.stream() for real-time streaming analysis with async iteration
2. Token accumulation: aggregate chunk.content across the stream
3. ctx.db for dataset access and result storage
4. ctx.provenance.log() for auditing streaming analysis sessions
5. The canonical streaming LLM pattern: query data → stream analysis → accumulate → store (a minimal sketch follows this list)
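The accumulation step is the core of the pattern, so here it is sketched in isolation before the full source. The chunk shape (content, finishReason, usage) mirrors what the listing below consumes, but the SDK defines the real LLMChunk type, so treat the inline interface here as an approximation rather than the SDK's actual type.

```typescript
// Minimal sketch of the stream-accumulate step. The chunk shape is an
// approximation of the SDK's LLMChunk, inferred from the full source below.
interface SketchChunk {
  content: string;
  finishReason?: string;
  usage?: { totalTokens: number };
}

async function accumulate(
  stream: AsyncIterable<SketchChunk>
): Promise<{ text: string; totalTokens: number; chunks: number }> {
  let text = '';
  let totalTokens = 0;
  let chunks = 0;
  for await (const chunk of stream) {
    text += chunk.content; // append each incremental delta
    chunks++;
    if (chunk.finishReason === 'stop' && chunk.usage) {
      totalTokens = chunk.usage.totalTokens; // usage is reported on the final chunk
    }
  }
  return { text, totalTokens, chunks };
}
```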
Source
View on GitHub

```typescript
/**
 * Streaming Analyzer - Production Reference Agent
 *
 * Canon alignment: KB 105 (Agent SDK), KB 22 (HumanOS)
 * Demonstrates: ctx.llm.stream (AsyncIterableIterator<LLMChunk>), ctx.db,
 * ctx.files, ctx.provenance
 *
 * Real use case: Analyze a dataset with streaming LLM output for real-time
 * progress. Think: a dashboard showing analysis tokens appearing live as
 * the LLM processes data. Useful for long-running analyses where users need
 * feedback that work is in progress.
 */
import { handler, withProvenanceContext } from '@human/agent-sdk';
import type { ExecutionContext } from '@human/agent-sdk';

export const AGENT_ID = 'streaming-analyzer';
export const VERSION = '1.0.0';
export const CAPABILITIES = ['data/stream-analysis', 'data/real-time-analytics'];

export interface StreamingAnalyzerInput {
  /** Dataset identifier to analyze */
  dataset_id: string;
  /** Analysis type */
  analysis_type?: 'summary' | 'trends' | 'anomalies';
  /** Output file path for the analysis report */
  output_path?: string;
}

export interface StreamingAnalyzerOutput {
  success: boolean;
  /** Full analysis content */
  analysis: string;
  /** Total tokens consumed during streaming */
  total_tokens: number;
  /** Number of chunks received */
  chunk_count: number;
  /** Output report path */
  report_path: string;
  provenance_id: string;
}

const execute = async (
  ctx: ExecutionContext,
  input: StreamingAnalyzerInput
): Promise<StreamingAnalyzerOutput> => {
  ctx.log.info('Starting streaming analysis', {
    dataset_id: input.dataset_id,
    analysis_type: input.analysis_type ?? 'summary',
  });

  const analysisType = input.analysis_type ?? 'summary';

  // Fetch data from DB (ctx.db())
  const db = await ctx.db();
  const rows = (await db.query(
    `SELECT * FROM datasets WHERE dataset_id = $1 LIMIT 500`,
    [input.dataset_id]
  )) as Array<Record<string, unknown>>;

  ctx.log.info('Dataset loaded', { row_count: rows.length });

  // Build prompt for streaming analysis
  const dataPreview =
    rows.length > 0
      ? JSON.stringify(rows.slice(0, 10), null, 2)
      : 'No data rows found. Provide a general analysis framework.';

  // Stream the analysis using ctx.llm.stream - the core API this agent demonstrates
  const stream = ctx.llm.stream({
    prompt: [
      {
        role: 'system',
        content: `You are a data analyst. Perform a ${analysisType} analysis. Be thorough and structured.`,
      },
      {
        role: 'user',
        content: `Analyze this dataset (${rows.length} rows, showing first 10):\n\n${dataPreview}`,
      },
    ],
    temperature: 0.4,
    maxTokens: 1000,
  });

  // Accumulate streaming chunks
  let fullContent = '';
  let chunkCount = 0;
  let totalTokens = 0;

  for await (const chunk of stream) {
    fullContent += chunk.content; // aggregate each chunk's content into the full analysis
    chunkCount++;

    // Log progress periodically (every 5 chunks)
    if (chunkCount % 5 === 0) {
      ctx.log.debug('Streaming progress', {
        chunks: chunkCount,
        chars: fullContent.length,
      });
    }

    // Track token usage from the final chunk
    if (chunk.finishReason === 'stop' && chunk.usage) {
      totalTokens = chunk.usage.totalTokens;
    }
  }

  ctx.log.info('Streaming complete', {
    chunk_count: chunkCount,
    total_tokens: totalTokens,
    content_length: fullContent.length,
  });

  // Write analysis report to file (ctx.files)
  const reportPath =
    input.output_path ??
    `reports/analysis-${input.dataset_id}-${analysisType}.md`;

  const reportContent = [
    `# ${analysisType.charAt(0).toUpperCase() + analysisType.slice(1)} Analysis`,
    ``,
    `**Dataset:** ${input.dataset_id}`,
    `**Rows analyzed:** ${rows.length}`,
    `**Generated:** ${new Date().toISOString()}`,
    `**Tokens used:** ${totalTokens}`,
    ``,
    `---`,
    ``,
    fullContent,
  ].join('\n');

  await ctx.files.writeText(reportPath, reportContent);

  // Log provenance (ctx.provenance)
  const provenanceId = await ctx.provenance.log(
    withProvenanceContext(ctx, {
      action: 'streaming_analysis:completed',
      status: 'success',
      input: { dataset_id: input.dataset_id, analysis_type: analysisType },
      output: {
        report_path: reportPath,
        chunk_count: chunkCount,
        total_tokens: totalTokens,
      },
    })
  );

  return {
    success: true,
    analysis: fullContent,
    total_tokens: totalTokens,
    chunk_count: chunkCount,
    report_path: reportPath,
    provenance_id: provenanceId,
  };
};

export default handler({
  name: AGENT_ID,
  id: AGENT_ID,
  version: VERSION,
  capabilities: CAPABILITIES,
  description: 'Stream LLM analysis over datasets with real-time token accumulation',
  manifest: {
    operations: [
      {
        name: 'analyze',
        description: 'Stream LLM analysis over a dataset with real-time token accumulation',
        paramsSchema: {
          dataset_id: { type: 'string', required: true, description: 'Dataset identifier' },
          analysis_type: { type: 'string', description: 'summary | trends | anomalies' },
          output_path: { type: 'string', description: 'Output file path for report' },
        },
        resultKind: 'agent.streaming-analyzer.result',
      },
    ],
  },
  execute,
});
```

Run the tests
From the monorepo root:
$ pnpm test:agents:reference
$ pnpm test:agents:reference:verbose
The reference suite runs all 23 agents with createMockExecutionContext(), verifying every ctx.* API call and output shape.
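For a single agent, a test might look roughly like the sketch below. This is an assumption-heavy illustration: the import path for createMockExecutionContext, the behavior of the mock, and the handler exposing execute on its returned object are all inferred here, not confirmed by the SDK reference.

```typescript
// Hypothetical per-agent test; the import path and handler shape are assumptions.
import { createMockExecutionContext } from '@human/agent-sdk/testing'; // assumed path
import agent from './streaming-analyzer';

test('streaming-analyzer accumulates chunks and writes a report', async () => {
  // Assumed to stub ctx.db, ctx.llm.stream, ctx.files, and ctx.provenance.
  const ctx = createMockExecutionContext();

  // Assumes handler() exposes the execute function on the returned object.
  const result = await agent.execute(ctx, { dataset_id: 'demo-dataset' });

  expect(result.success).toBe(true);
  expect(result.chunk_count).toBeGreaterThan(0);
  expect(result.report_path).toContain('demo-dataset');
});
```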
See Also
SDK Reference