Streaming Package Reference
Complete reference for the runtime/streaming package, which provides generic utilities for bidirectional streaming communication with LLM providers.
Overview
Section titled “Overview”The streaming package extracts common patterns used in duplex (bidirectional) streaming conversations:
- Response Processing - State machine for handling provider responses
- Audio Streaming - Utilities for sending audio chunks to providers
- Tool Execution - Interface for streaming tool calls
- Response Collection - Patterns for managing streaming responses
Import
Section titled “Import”import "github.com/AltairaLabs/PromptKit/runtime/streaming"Response Processing
Section titled “Response Processing”ResponseAction
Section titled “ResponseAction”Indicates what action to take after processing a response element.
type ResponseAction int
const ( ResponseActionContinue ResponseAction = iota // Keep waiting ResponseActionComplete // Turn completed ResponseActionError // Error occurred ResponseActionToolCalls // Execute tool calls)| Action | Description |
|---|---|
ResponseActionContinue | Informational element (e.g., interruption signal), continue waiting |
ResponseActionComplete | Response complete, turn finished |
ResponseActionError | Error occurred or empty response |
ResponseActionToolCalls | Tool calls received, need execution |
ProcessResponseElement
Section titled “ProcessResponseElement”func ProcessResponseElement(elem *stage.StreamElement, logPrefix string) (ResponseAction, error)Core state machine for duplex streaming response handling. Analyzes a stream element and determines the appropriate action.
Parameters:
elem- Stream element from the pipelinelogPrefix- Prefix for log messages
Returns:
ResponseAction- Action to takeerror- Only set when action isResponseActionError
Example:
for elem := range outputChan { action, err := streaming.ProcessResponseElement(&elem, "MySession") switch action { case streaming.ResponseActionContinue: continue case streaming.ResponseActionComplete: return nil case streaming.ResponseActionError: return err case streaming.ResponseActionToolCalls: // Execute tools and send results }}ErrEmptyResponse
Section titled “ErrEmptyResponse”var ErrEmptyResponse = errors.New("empty response, likely interrupted")Returned when a response element has no content. This typically indicates an interrupted response that wasn’t properly handled.
Audio Streaming
Section titled “Audio Streaming”AudioStreamer
Section titled “AudioStreamer”Provides utilities for streaming audio data through a pipeline.
type AudioStreamer struct { ChunkSize int // Bytes per chunk (default: 640) ChunkIntervalMs int // Interval between chunks (default: 20ms)}NewAudioStreamer
Section titled “NewAudioStreamer”func NewAudioStreamer() *AudioStreamerCreates a new audio streamer with default settings:
- ChunkSize: 640 bytes (20ms at 16kHz 16-bit mono)
- ChunkIntervalMs: 20ms
StreamBurst
Section titled “StreamBurst”func (a *AudioStreamer) StreamBurst( ctx context.Context, audioData []byte, sampleRate int, inputChan chan<- stage.StreamElement,) errorSends all audio data as fast as possible without pacing. Preferred for pre-recorded audio to avoid false turn detections from natural speech pauses.
Parameters:
ctx- Context for cancellationaudioData- Raw PCM audio bytessampleRate- Sample rate in Hz (typically 16000)inputChan- Pipeline input channel
Example:
streamer := streaming.NewAudioStreamer()err := streamer.StreamBurst(ctx, audioData, 16000, inputChan)StreamRealtime
Section titled “StreamRealtime”func (a *AudioStreamer) StreamRealtime( ctx context.Context, audioData []byte, sampleRate int, inputChan chan<- stage.StreamElement,) errorSends audio data paced to match real-time playback. Each chunk is sent with a delay matching its duration.
Note: This mode can cause issues with providers that detect speech pauses mid-utterance. Use StreamBurst for pre-recorded audio.
SendChunk
Section titled “SendChunk”func (a *AudioStreamer) SendChunk( ctx context.Context, chunk []byte, sampleRate int, inputChan chan<- stage.StreamElement,) errorSends a single audio chunk through the pipeline.
SendEndOfStream
Section titled “SendEndOfStream”func SendEndOfStream( ctx context.Context, inputChan chan<- stage.StreamElement,) errorSignals that audio input is complete for the current turn. This triggers the provider to generate a response.
Example:
// Stream audiostreamer.StreamBurst(ctx, audioData, 16000, inputChan)
// Signal end of inputstreaming.SendEndOfStream(ctx, inputChan)Audio Constants
Section titled “Audio Constants”const ( DefaultChunkSize = 640 // 20ms at 16kHz 16-bit mono DefaultSampleRate = 16000 // Required by Gemini Live API DefaultChunkIntervalMs = 20 // Interval for real-time mode)Tool Execution
Section titled “Tool Execution”ToolExecutor
Section titled “ToolExecutor”Interface for executing tool calls during streaming sessions.
type ToolExecutor interface { Execute(ctx context.Context, toolCalls []types.MessageToolCall) (*ToolExecutionResult, error)}Implementations are responsible for:
- Looking up tools in a registry
- Executing the tool functions
- Formatting results for the provider
- Handling execution errors
ToolExecutionResult
Section titled “ToolExecutionResult”type ToolExecutionResult struct { // For sending back to the streaming provider ProviderResponses []providers.ToolResponse
// For state store capture ResultMessages []types.Message}SendToolResults
Section titled “SendToolResults”func SendToolResults( ctx context.Context, result *ToolExecutionResult, inputChan chan<- stage.StreamElement,) errorSends tool execution results back through the pipeline to the provider, and includes tool result messages for state store capture.
BuildToolResponseElement
Section titled “BuildToolResponseElement”func BuildToolResponseElement(result *ToolExecutionResult) stage.StreamElementCreates a stream element containing tool results. The element includes:
tool_responsesmetadata for the providertool_result_messagesfor state store capture
ExecuteAndSend
Section titled “ExecuteAndSend”func ExecuteAndSend( ctx context.Context, executor ToolExecutor, toolCalls []types.MessageToolCall, inputChan chan<- stage.StreamElement,) errorConvenience function that executes tool calls and sends results through the pipeline in one operation.
Response Collection
Section titled “Response Collection”ResponseCollectorConfig
Section titled “ResponseCollectorConfig”type ResponseCollectorConfig struct { ToolExecutor ToolExecutor // Called when tool calls are received LogPrefix string // Prepended to log messages}ResponseCollector
Section titled “ResponseCollector”Manages response collection from a streaming session, processing elements, handling tool calls, and signaling completion.
type ResponseCollector struct { // ...}NewResponseCollector
Section titled “NewResponseCollector”func NewResponseCollector(config ResponseCollectorConfig) *ResponseCollectorCreates a new response collector with the given configuration.
func (c *ResponseCollector) Start( ctx context.Context, outputChan <-chan stage.StreamElement, inputChan chan<- stage.StreamElement,) <-chan errorBegins collecting responses in a goroutine. Returns a channel that receives nil on success or an error on failure.
The collector will:
- Process incoming stream elements
- Execute tool calls via the ToolExecutor (if configured)
- Send tool results back through inputChan
- Signal completion or error through the returned channel
Example:
config := streaming.ResponseCollectorConfig{ ToolExecutor: myExecutor, LogPrefix: "Session-123",}collector := streaming.NewResponseCollector(config)doneChan := collector.Start(ctx, outputChan, inputChan)
// Wait for completionif err := <-doneChan; err != nil { log.Printf("Response collection failed: %v", err)}DrainStaleMessages
Section titled “DrainStaleMessages”func DrainStaleMessages(outputChan <-chan stage.StreamElement) (int, error)Removes any buffered messages from the output channel. Useful for clearing state between turns.
Returns:
- Number of messages drained
ErrSessionEndedif the session ended during drain
WaitForResponse
Section titled “WaitForResponse”func WaitForResponse(ctx context.Context, responseDone <-chan error) errorConvenience function for blocking until a response is received.
Error Types
Section titled “Error Types”var ( ErrEmptyResponse = errors.New("empty response, likely interrupted") ErrSessionEnded = errors.New("session ended"))Complete Example
Section titled “Complete Example”package main
import ( "context" "github.com/AltairaLabs/PromptKit/runtime/streaming" "github.com/AltairaLabs/PromptKit/runtime/pipeline/stage")
// MyToolExecutor implements streaming.ToolExecutortype MyToolExecutor struct { registry *tools.Registry}
func (e *MyToolExecutor) Execute( ctx context.Context, toolCalls []types.MessageToolCall,) (*streaming.ToolExecutionResult, error) { var responses []providers.ToolResponse var messages []types.Message
for _, call := range toolCalls { result, err := e.registry.Execute(ctx, call.Function.Name, call.Function.Arguments) if err != nil { responses = append(responses, providers.ToolResponse{ ToolCallID: call.ID, Result: err.Error(), IsError: true, }) } else { responses = append(responses, providers.ToolResponse{ ToolCallID: call.ID, Result: string(result), }) } }
return &streaming.ToolExecutionResult{ ProviderResponses: responses, ResultMessages: messages, }, nil}
func streamAudioTurn(ctx context.Context, audioData []byte, inputChan chan<- stage.StreamElement, outputChan <-chan stage.StreamElement) error { // Stream audio in burst mode streamer := streaming.NewAudioStreamer() if err := streamer.StreamBurst(ctx, audioData, 16000, inputChan); err != nil { return err }
// Signal end of input if err := streaming.SendEndOfStream(ctx, inputChan); err != nil { return err }
// Collect response with tool handling executor := &MyToolExecutor{registry: myRegistry} collector := streaming.NewResponseCollector(streaming.ResponseCollectorConfig{ ToolExecutor: executor, LogPrefix: "AudioTurn", })
doneChan := collector.Start(ctx, outputChan, inputChan) return streaming.WaitForResponse(ctx, doneChan)}See Also
Section titled “See Also”- Audio API Reference - VAD mode, ASM mode, turn detection
- TTS API Reference - Text-to-speech services
- Duplex Configuration - Arena duplex configuration
- Duplex Architecture - How duplex streaming works