Skip to content

Streaming Package Reference

Complete reference for the runtime/streaming package, which provides generic utilities for bidirectional streaming communication with LLM providers.

The streaming package extracts common patterns used in duplex (bidirectional) streaming conversations:

  • Response Processing - State machine for handling provider responses
  • Audio Streaming - Utilities for sending audio chunks to providers
  • Tool Execution - Interface for streaming tool calls
  • Response Collection - Patterns for managing streaming responses
import "github.com/AltairaLabs/PromptKit/runtime/streaming"

Indicates what action to take after processing a response element.

type ResponseAction int
const (
ResponseActionContinue ResponseAction = iota // Keep waiting
ResponseActionComplete // Turn completed
ResponseActionError // Error occurred
ResponseActionToolCalls // Execute tool calls
)
ActionDescription
ResponseActionContinueInformational element (e.g., interruption signal), continue waiting
ResponseActionCompleteResponse complete, turn finished
ResponseActionErrorError occurred or empty response
ResponseActionToolCallsTool calls received, need execution
func ProcessResponseElement(elem *stage.StreamElement, logPrefix string) (ResponseAction, error)

Core state machine for duplex streaming response handling. Analyzes a stream element and determines the appropriate action.

Parameters:

  • elem - Stream element from the pipeline
  • logPrefix - Prefix for log messages

Returns:

  • ResponseAction - Action to take
  • error - Only set when action is ResponseActionError

Example:

for elem := range outputChan {
action, err := streaming.ProcessResponseElement(&elem, "MySession")
switch action {
case streaming.ResponseActionContinue:
continue
case streaming.ResponseActionComplete:
return nil
case streaming.ResponseActionError:
return err
case streaming.ResponseActionToolCalls:
// Execute tools and send results
}
}
var ErrEmptyResponse = errors.New("empty response, likely interrupted")

Returned when a response element has no content. This typically indicates an interrupted response that wasn’t properly handled.


Provides utilities for streaming audio data through a pipeline.

type AudioStreamer struct {
ChunkSize int // Bytes per chunk (default: 640)
ChunkIntervalMs int // Interval between chunks (default: 20ms)
}
func NewAudioStreamer() *AudioStreamer

Creates a new audio streamer with default settings:

  • ChunkSize: 640 bytes (20ms at 16kHz 16-bit mono)
  • ChunkIntervalMs: 20ms
func (a *AudioStreamer) StreamBurst(
ctx context.Context,
audioData []byte,
sampleRate int,
inputChan chan<- stage.StreamElement,
) error

Sends all audio data as fast as possible without pacing. Preferred for pre-recorded audio to avoid false turn detections from natural speech pauses.

Parameters:

  • ctx - Context for cancellation
  • audioData - Raw PCM audio bytes
  • sampleRate - Sample rate in Hz (typically 16000)
  • inputChan - Pipeline input channel

Example:

streamer := streaming.NewAudioStreamer()
err := streamer.StreamBurst(ctx, audioData, 16000, inputChan)
func (a *AudioStreamer) StreamRealtime(
ctx context.Context,
audioData []byte,
sampleRate int,
inputChan chan<- stage.StreamElement,
) error

Sends audio data paced to match real-time playback. Each chunk is sent with a delay matching its duration.

Note: This mode can cause issues with providers that detect speech pauses mid-utterance. Use StreamBurst for pre-recorded audio.

func (a *AudioStreamer) SendChunk(
ctx context.Context,
chunk []byte,
sampleRate int,
inputChan chan<- stage.StreamElement,
) error

Sends a single audio chunk through the pipeline.

func SendEndOfStream(
ctx context.Context,
inputChan chan<- stage.StreamElement,
) error

Signals that audio input is complete for the current turn. This triggers the provider to generate a response.

Example:

// Stream audio
streamer.StreamBurst(ctx, audioData, 16000, inputChan)
// Signal end of input
streaming.SendEndOfStream(ctx, inputChan)
const (
DefaultChunkSize = 640 // 20ms at 16kHz 16-bit mono
DefaultSampleRate = 16000 // Required by Gemini Live API
DefaultChunkIntervalMs = 20 // Interval for real-time mode
)

Interface for executing tool calls during streaming sessions.

type ToolExecutor interface {
Execute(ctx context.Context, toolCalls []types.MessageToolCall) (*ToolExecutionResult, error)
}

Implementations are responsible for:

  • Looking up tools in a registry
  • Executing the tool functions
  • Formatting results for the provider
  • Handling execution errors
type ToolExecutionResult struct {
// For sending back to the streaming provider
ProviderResponses []providers.ToolResponse
// For state store capture
ResultMessages []types.Message
}
func SendToolResults(
ctx context.Context,
result *ToolExecutionResult,
inputChan chan<- stage.StreamElement,
) error

Sends tool execution results back through the pipeline to the provider, and includes tool result messages for state store capture.

func BuildToolResponseElement(result *ToolExecutionResult) stage.StreamElement

Creates a stream element containing tool results. The element includes:

  • tool_responses metadata for the provider
  • tool_result_messages for state store capture
func ExecuteAndSend(
ctx context.Context,
executor ToolExecutor,
toolCalls []types.MessageToolCall,
inputChan chan<- stage.StreamElement,
) error

Convenience function that executes tool calls and sends results through the pipeline in one operation.


type ResponseCollectorConfig struct {
ToolExecutor ToolExecutor // Called when tool calls are received
LogPrefix string // Prepended to log messages
}

Manages response collection from a streaming session, processing elements, handling tool calls, and signaling completion.

type ResponseCollector struct {
// ...
}
func NewResponseCollector(config ResponseCollectorConfig) *ResponseCollector

Creates a new response collector with the given configuration.

func (c *ResponseCollector) Start(
ctx context.Context,
outputChan <-chan stage.StreamElement,
inputChan chan<- stage.StreamElement,
) <-chan error

Begins collecting responses in a goroutine. Returns a channel that receives nil on success or an error on failure.

The collector will:

  1. Process incoming stream elements
  2. Execute tool calls via the ToolExecutor (if configured)
  3. Send tool results back through inputChan
  4. Signal completion or error through the returned channel

Example:

config := streaming.ResponseCollectorConfig{
ToolExecutor: myExecutor,
LogPrefix: "Session-123",
}
collector := streaming.NewResponseCollector(config)
doneChan := collector.Start(ctx, outputChan, inputChan)
// Wait for completion
if err := <-doneChan; err != nil {
log.Printf("Response collection failed: %v", err)
}
func DrainStaleMessages(outputChan <-chan stage.StreamElement) (int, error)

Removes any buffered messages from the output channel. Useful for clearing state between turns.

Returns:

  • Number of messages drained
  • ErrSessionEnded if the session ended during drain
func WaitForResponse(ctx context.Context, responseDone <-chan error) error

Convenience function for blocking until a response is received.


var (
ErrEmptyResponse = errors.New("empty response, likely interrupted")
ErrSessionEnded = errors.New("session ended")
)

package main
import (
"context"
"github.com/AltairaLabs/PromptKit/runtime/streaming"
"github.com/AltairaLabs/PromptKit/runtime/pipeline/stage"
)
// MyToolExecutor implements streaming.ToolExecutor
type MyToolExecutor struct {
registry *tools.Registry
}
func (e *MyToolExecutor) Execute(
ctx context.Context,
toolCalls []types.MessageToolCall,
) (*streaming.ToolExecutionResult, error) {
var responses []providers.ToolResponse
var messages []types.Message
for _, call := range toolCalls {
result, err := e.registry.Execute(ctx, call.Function.Name, call.Function.Arguments)
if err != nil {
responses = append(responses, providers.ToolResponse{
ToolCallID: call.ID,
Result: err.Error(),
IsError: true,
})
} else {
responses = append(responses, providers.ToolResponse{
ToolCallID: call.ID,
Result: string(result),
})
}
}
return &streaming.ToolExecutionResult{
ProviderResponses: responses,
ResultMessages: messages,
}, nil
}
func streamAudioTurn(ctx context.Context, audioData []byte, inputChan chan<- stage.StreamElement, outputChan <-chan stage.StreamElement) error {
// Stream audio in burst mode
streamer := streaming.NewAudioStreamer()
if err := streamer.StreamBurst(ctx, audioData, 16000, inputChan); err != nil {
return err
}
// Signal end of input
if err := streaming.SendEndOfStream(ctx, inputChan); err != nil {
return err
}
// Collect response with tool handling
executor := &MyToolExecutor{registry: myRegistry}
collector := streaming.NewResponseCollector(streaming.ResponseCollectorConfig{
ToolExecutor: executor,
LogPrefix: "AudioTurn",
})
doneChan := collector.Start(ctx, outputChan, inputChan)
return streaming.WaitForResponse(ctx, doneChan)
}