# Streaming Responses

Display LLM responses progressively as they're generated, instead of waiting for the full completion.
## Quick Start

```go
import "io"

stream, err := pipe.ExecuteStream(ctx, "user", "Write a story")
if err != nil {
	log.Fatal(err)
}
defer stream.Close()

for {
	chunk, err := stream.Next()
	if err == io.EOF {
		break
	}
	if err != nil {
		log.Printf("Stream error: %v", err)
		break
	}
	fmt.Print(chunk.Content)
}
```

The snippets on this page assume a configured `pipe` and `ctx`; see the Complete Example below for full setup.
## Basic Streaming

### Execute Stream

```go
ctx := context.Background()
stream, err := pipe.ExecuteStream(ctx, "user", "Generate a long document")
if err != nil {
	log.Fatal(err)
}
defer stream.Close()

// Process chunks
for {
	chunk, err := stream.Next()
	if err == io.EOF {
		break // Stream complete
	}
	if err != nil {
		log.Fatal(err)
	}
	// Display the chunk
	fmt.Print(chunk.Content)
}
```
### With Session Context

```go
sessionID := "user-123"

stream, err := pipe.ExecuteStreamWithContext(ctx, sessionID, "user", "Continue our conversation")
if err != nil {
	log.Fatal(err)
}
defer stream.Close()

for {
	chunk, err := stream.Next()
	if err == io.EOF {
		break
	}
	if err != nil {
		log.Fatal(err)
	}
	fmt.Print(chunk.Content)
}
```
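Reusing the same `sessionID` across calls is what carries the conversation forward. A minimal multi-turn sketch (the prompts are illustrative):

```go
for _, prompt := range []string{"My name is Ada.", "What is my name?"} {
	stream, err := pipe.ExecuteStreamWithContext(ctx, sessionID, "user", prompt)
	if err != nil {
		log.Fatal(err)
	}

	// Drain this turn's stream before starting the next one
	for {
		chunk, err := stream.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Print(chunk.Content)
	}

	stream.Close()
	fmt.Println()
}
```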
## Chunk Processing

### Accumulate Content

```go
var accumulated string

stream, _ := pipe.ExecuteStream(ctx, "user", "Write code")
defer stream.Close()

for {
	chunk, err := stream.Next()
	if err == io.EOF {
		break
	}
	if err != nil {
		log.Fatal(err)
	}

	accumulated += chunk.Content
	fmt.Print(chunk.Content)
}
// Use the final content
log.Printf("Total: %d chars", len(accumulated))
```
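For long responses, `accumulated += chunk.Content` reallocates the string on every chunk. A `strings.Builder` (standard library) does the same accumulation with far fewer allocations; this is a drop-in variant of the loop above:

```go
import "strings"

var sb strings.Builder

stream, _ := pipe.ExecuteStream(ctx, "user", "Write code")
defer stream.Close()

for {
	chunk, err := stream.Next()
	if err == io.EOF {
		break
	}
	if err != nil {
		log.Fatal(err)
	}

	// Appends into a growing buffer instead of copying the whole string
	sb.WriteString(chunk.Content)
	fmt.Print(chunk.Content)
}

log.Printf("Total: %d chars", sb.Len())
```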
### Real-Time Display

```go
import (
	"strings"
	"time"
)

stream, _ := pipe.ExecuteStream(ctx, "user", "Explain quantum physics")
defer stream.Close()

var pending strings.Builder
lastUpdate := time.Now()
minDelay := 50 * time.Millisecond

for {
	chunk, err := stream.Next()
	if err == io.EOF {
		break
	}
	if err != nil {
		break
	}

	// Buffer every chunk so throttling never drops content
	pending.WriteString(chunk.Content)

	// Throttle updates to at most one per minDelay
	if time.Since(lastUpdate) >= minDelay {
		fmt.Print(pending.String())
		pending.Reset()
		lastUpdate = time.Now()
	}
}

// Flush anything that arrived after the last update
fmt.Print(pending.String())
```
### Parse Structured Output

```go
import "strings"

stream, _ := pipe.ExecuteStream(ctx, "user", "List 5 items")
defer stream.Close()

var buffer string
var items []string

for {
	chunk, err := stream.Next()
	if err == io.EOF {
		break
	}
	if err != nil {
		break
	}

	buffer += chunk.Content

	// Parse complete lines as they arrive
	if strings.Contains(buffer, "\n") {
		lines := strings.Split(buffer, "\n")
		for i := 0; i < len(lines)-1; i++ {
			line := strings.TrimSpace(lines[i])
			if line != "" {
				items = append(items, line)
				log.Println("Found item:", line)
			}
		}
		// Keep the trailing partial line for the next chunk
		buffer = lines[len(lines)-1]
	}
}
// Process the remaining partial line, skipping pure whitespace
if line := strings.TrimSpace(buffer); line != "" {
	items = append(items, line)
}
```
## Web Streaming

### HTTP Server-Sent Events

```go
import "net/http"

func handleStream(w http.ResponseWriter, r *http.Request) {
	// Set SSE headers
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")

	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
		return
	}

	prompt := r.URL.Query().Get("prompt")
	ctx := r.Context()

	stream, err := pipe.ExecuteStream(ctx, "user", prompt)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer stream.Close()

	for {
		chunk, err := stream.Next()
		if err == io.EOF {
			fmt.Fprintf(w, "data: [DONE]\n\n")
			flusher.Flush()
			break
		}
		if err != nil {
			fmt.Fprintf(w, "event: error\ndata: %s\n\n", err.Error())
			flusher.Flush()
			break
		}

		// Send the chunk. Note: content containing newlines needs one
		// "data:" prefix per line to remain valid SSE.
		fmt.Fprintf(w, "data: %s\n\n", chunk.Content)
		flusher.Flush()
	}
}
```
```go
http.HandleFunc("/stream", handleStream)
http.ListenAndServe(":8080", nil)
```
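SSE is usually consumed by a browser's `EventSource`, but for quick testing a minimal Go client can read the event lines directly. This is a sketch against the server above (assumes it is running on localhost:8080; the `[DONE]` sentinel comes from the handler):

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	resp, err := http.Get("http://localhost:8080/stream?prompt=Write+a+haiku")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// SSE is line-oriented: payload lines start with "data: "
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := scanner.Text()
		if !strings.HasPrefix(line, "data: ") {
			continue // skip blank separators and event: lines
		}
		data := strings.TrimPrefix(line, "data: ")
		if data == "[DONE]" {
			break
		}
		fmt.Print(data)
	}
}
```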
### WebSocket Streaming

```go
import "github.com/gorilla/websocket"

var upgrader = websocket.Upgrader{
	// Allow all origins for the example; restrict this in production
	CheckOrigin: func(r *http.Request) bool { return true },
}

func handleWebSocket(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println("Upgrade error:", err)
		return
	}
	defer conn.Close()

	// Read the prompt from the client
	_, message, err := conn.ReadMessage()
	if err != nil {
		return
	}

	ctx := r.Context()
	stream, err := pipe.ExecuteStream(ctx, "user", string(message))
	if err != nil {
		conn.WriteMessage(websocket.TextMessage, []byte(err.Error()))
		return
	}
	defer stream.Close()

	for {
		chunk, err := stream.Next()
		if err == io.EOF {
			conn.WriteMessage(websocket.TextMessage, []byte("[DONE]"))
			break
		}
		if err != nil {
			conn.WriteMessage(websocket.TextMessage, []byte(err.Error()))
			break
		}
		err = conn.WriteMessage(websocket.TextMessage, []byte(chunk.Content))
		if err != nil {
			break
		}
	}
}
```
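A matching Go client is a short sketch (it assumes `handleWebSocket` is registered at `/ws` on localhost:8080, which the snippet above does not show; the prompt is illustrative):

```go
conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:8080/ws", nil)
if err != nil {
	log.Fatal(err)
}
defer conn.Close()

// Send the prompt, then print chunks until the [DONE] sentinel
if err := conn.WriteMessage(websocket.TextMessage, []byte("Write a haiku")); err != nil {
	log.Fatal(err)
}
for {
	_, msg, err := conn.ReadMessage()
	if err != nil {
		log.Fatal(err)
	}
	if string(msg) == "[DONE]" {
		break
	}
	fmt.Print(string(msg))
}
```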
## Stream Control

### Cancel Stream

```go
import "context"

ctx, cancel := context.WithCancel(context.Background())

stream, _ := pipe.ExecuteStream(ctx, "user", "Long response")
defer stream.Close()

// Cancel after 10 chunks
count := 0
for {
	chunk, err := stream.Next()
	if err == io.EOF {
		break
	}
	if err != nil {
		break
	}

	fmt.Print(chunk.Content)
	count++
	if count >= 10 {
		cancel() // Stop the stream
		break
	}
}
```
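In practice the cancel signal usually comes from outside the read loop. A hedged sketch (`stop` is a hypothetical channel wired to a UI stop button or shutdown hook, and it assumes the stream honors its context):

```go
stop := make(chan struct{}) // hypothetical: closed when the user clicks "stop"

go func() {
	<-stop
	cancel() // subsequent stream.Next() calls should fail with a context error
}()
```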
### Timeout Stream

```go
import (
	"errors"
	"time"
)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

stream, err := pipe.ExecuteStream(ctx, "user", "Complex query")
if err != nil {
	log.Fatal(err)
}
defer stream.Close()

for {
	chunk, err := stream.Next()
	if err == io.EOF {
		break
	}
	if err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			log.Println("Stream timed out")
		}
		break
	}
	fmt.Print(chunk.Content)
}
```
## Error Handling

### Recover from Stream Errors

```go
stream, _ := pipe.ExecuteStream(ctx, "user", "Generate content")
defer stream.Close()

var accumulated string

for {
	chunk, err := stream.Next()
	if err == io.EOF {
		break
	}
	if err != nil {
		log.Printf("Stream error: %v", err)

		// Fall back to whatever arrived before the error
		if len(accumulated) > 0 {
			log.Println("Using partial response")
			fmt.Println(accumulated)
		}
		break
	}

	accumulated += chunk.Content
	fmt.Print(chunk.Content)
}
```
### Retry on Error

```go
func streamWithRetry(pipe *pipeline.Pipeline, ctx context.Context, role, content string, maxRetries int) error {
	for i := 0; i < maxRetries; i++ {
		stream, err := pipe.ExecuteStream(ctx, role, content)
		if err != nil {
			log.Printf("Stream start failed (attempt %d/%d): %v", i+1, maxRetries, err)
			time.Sleep(time.Second * time.Duration(i+1))
			continue
		}

		// Process the stream in a closure so Close runs once per attempt,
		// instead of deferring every stream until the function returns.
		// Note: a retry replays the response from the start.
		streamErr := func() error {
			defer stream.Close()
			for {
				chunk, err := stream.Next()
				if err == io.EOF {
					return nil // Success
				}
				if err != nil {
					return err
				}
				fmt.Print(chunk.Content)
			}
		}()
		if streamErr == nil {
			return nil
		}

		log.Printf("Stream error (attempt %d/%d): %v", i+1, maxRetries, streamErr)
		time.Sleep(time.Second * time.Duration(i+1))
	}

	return fmt.Errorf("stream failed after %d retries", maxRetries)
}
```
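Calling the helper is then a one-liner; here with three attempts:

```go
if err := streamWithRetry(pipe, ctx, "user", "Generate content", 3); err != nil {
	log.Fatal(err)
}
```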
## Complete Example

```go
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"net/http"

	"github.com/AltairaLabs/PromptKit/runtime/pipeline"
	"github.com/AltairaLabs/PromptKit/runtime/pipeline/middleware"
	"github.com/AltairaLabs/PromptKit/runtime/providers/openai"
)

func main() {
	// Create provider
	provider := openai.NewOpenAIProvider(
		"openai",
		"gpt-4o-mini",
		"",
		openai.DefaultProviderDefaults(),
		false,
	)
	defer provider.Close()

	// Build pipeline
	pipe := pipeline.NewPipeline(
		middleware.ProviderMiddleware(provider, nil, nil, &middleware.ProviderMiddlewareConfig{
			MaxTokens:   1500,
			Temperature: 0.7,
		}),
	)
	defer pipe.Shutdown(context.Background())

	// Set up HTTP endpoint
	http.HandleFunc("/stream", func(w http.ResponseWriter, r *http.Request) {
		handleStreamRequest(w, r, pipe)
	})

	log.Println("Server running on :8080")
	http.ListenAndServe(":8080", nil)
}
func handleStreamRequest(w http.ResponseWriter, r *http.Request, pipe *pipeline.Pipeline) {
	// SSE headers
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")

	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
		return
	}

	prompt := r.URL.Query().Get("prompt")
	if prompt == "" {
		http.Error(w, "Missing prompt parameter", http.StatusBadRequest)
		return
	}

	ctx := r.Context()
	stream, err := pipe.ExecuteStream(ctx, "user", prompt)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer stream.Close()

	for {
		chunk, err := stream.Next()
		if err == io.EOF {
			fmt.Fprintf(w, "data: [DONE]\n\n")
			flusher.Flush()
			break
		}
		if err != nil {
			fmt.Fprintf(w, "event: error\ndata: %s\n\n", err.Error())
			flusher.Flush()
			break
		}
		fmt.Fprintf(w, "data: %s\n\n", chunk.Content)
		flusher.Flush()
	}
}
```
## Troubleshooting

### Issue: Chunks Arrive Slowly

Problem: Long delays between chunks.

Solutions:

- Check network latency
- Use a faster model:

  ```go
  provider := openai.NewOpenAIProvider("openai", "gpt-4o-mini", ...)
  ```

- Reduce max tokens:

  ```go
  config.MaxTokens = 500
  ```
### Issue: Stream Hangs

Problem: `stream.Next()` blocks indefinitely.

Solutions:

- Add a timeout:

  ```go
  ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  defer cancel()
  ```

- Check the provider connection:

  ```go
  // Test non-streaming first
  result, err := pipe.Execute(ctx, "user", "test")
  ```

- Guard individual `Next()` calls with a watchdog, as sketched below.
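A context timeout bounds the whole request; to catch a single stalled `Next()` call you can race it against a timer. This is a hedged sketch, not part of the PromptKit API: the `Stream` and `Chunk` shapes below are assumptions matching how the examples on this page use them.

```go
// Assumed minimal shapes, mirroring how the examples above use them.
type Chunk struct{ Content string }

type Stream interface {
	Next() (*Chunk, error)
	Close() error
}

// nextWithTimeout is a hypothetical helper that races stream.Next()
// against a timer so one stalled chunk cannot block the caller forever.
func nextWithTimeout(stream Stream, d time.Duration) (*Chunk, error) {
	type result struct {
		chunk *Chunk
		err   error
	}
	// Buffered so the goroutine can deliver its result (and exit)
	// even after the caller has timed out and stopped waiting.
	ch := make(chan result, 1)
	go func() {
		c, err := stream.Next()
		ch <- result{c, err}
	}()

	select {
	case r := <-ch:
		return r.chunk, r.err
	case <-time.After(d):
		return nil, fmt.Errorf("no chunk received within %v", d)
	}
}
```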
### Issue: Incomplete Responses

Problem: Stream ends prematurely.

Solutions:

- Check for errors:

  ```go
  if err != io.EOF {
  	log.Printf("Stream error: %v", err)
  }
  ```

- Accumulate content:

  ```go
  if len(accumulated) > 0 {
  	// Use the partial response
  }
  ```
## Best Practices

- Always close streams:

  ```go
  defer stream.Close()
  ```

- Handle EOF correctly:

  ```go
  if err == io.EOF {
  	break // Normal completion
  }
  ```

- Set timeouts:

  ```go
  ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
  defer cancel()
  ```

- Flush HTTP streams:

  ```go
  flusher.Flush()
  ```

- Accumulate content for recovery:

  ```go
  var accumulated string
  accumulated += chunk.Content
  ```

- Throttle UI updates, buffering chunks so none are dropped:

  ```go
  pending.WriteString(chunk.Content)
  if time.Since(lastUpdate) >= minDelay {
  	fmt.Print(pending.String())
  	pending.Reset()
  	lastUpdate = time.Now()
  }
  ```

- Handle context cancellation:

  ```go
  select {
  case <-ctx.Done():
  	return
  default:
  	// Continue
  }
  ```
## Next Steps

- Handle Errors - Error strategies
- Monitor Costs - Track usage
- Configure Pipeline - Complete setup
## See Also

- Pipeline Reference - Stream API
- Providers Reference - Streaming support