
Runtime Reference

Complete API reference for the PromptKit Runtime components.

The PromptKit Runtime provides the core execution engine for LLM interactions. It handles:

  • Pipeline Execution: Middleware-based processing with streaming support
  • Provider Integration: Multi-LLM support (OpenAI, Anthropic, Google Gemini)
  • Tool Execution: Function calling with MCP integration
  • State Management: Conversation persistence and caching
  • Validation: Content and response validation
  • Configuration: Flexible runtime configuration
| Component | Description | Reference |
| --- | --- | --- |
| Pipeline | Middleware-based execution engine | pipeline.md |
| Providers | LLM provider implementations | providers.md |
| Tools | Function calling and MCP integration | tools.md |
| MCP | Model Context Protocol support | mcp.md |
| State Store | Conversation persistence | statestore.md |
| Validators | Content validation | validators.md |
| Types | Core data structures | types.md |
| Logging | Structured logging with context | logging.md |
// Runtime package import paths
import (
    "github.com/AltairaLabs/PromptKit/runtime/pipeline"
    "github.com/AltairaLabs/PromptKit/runtime/providers"
    "github.com/AltairaLabs/PromptKit/runtime/tools"
    "github.com/AltairaLabs/PromptKit/runtime/mcp"
    "github.com/AltairaLabs/PromptKit/runtime/statestore"
    "github.com/AltairaLabs/PromptKit/runtime/validators"
    "github.com/AltairaLabs/PromptKit/runtime/types"
    "github.com/AltairaLabs/PromptKit/runtime/logger"
)
// Quick example: provider + pipeline + single execution
import (
    "context"
    "fmt"
    "log"

    "github.com/AltairaLabs/PromptKit/runtime/pipeline"
    "github.com/AltairaLabs/PromptKit/runtime/pipeline/middleware"
    "github.com/AltairaLabs/PromptKit/runtime/providers/openai"
)

// Create provider
provider := openai.NewOpenAIProvider(
    "openai",
    "gpt-4o-mini",
    "", // default baseURL
    openai.DefaultProviderDefaults(),
    false, // includeRawOutput
)

// Build pipeline with middleware
pipe := pipeline.NewPipeline(
    middleware.ProviderMiddleware(provider, nil, nil, &middleware.ProviderMiddlewareConfig{
        MaxTokens:   1500,
        Temperature: 0.7,
    }),
)

// Execute
ctx := context.Background()
result, err := pipe.Execute(ctx, "user", "Hello!")
if err != nil {
    log.Fatal(err)
}
fmt.Println(result.Response.Content)
// Tool execution with MCP integration
import (
    "github.com/AltairaLabs/PromptKit/runtime/pipeline"
    "github.com/AltairaLabs/PromptKit/runtime/pipeline/middleware"
    "github.com/AltairaLabs/PromptKit/runtime/tools"
    "github.com/AltairaLabs/PromptKit/runtime/mcp"
)

// Create tool registry
toolRegistry := tools.NewRegistry()

// Register MCP server
mcpRegistry := mcp.NewRegistry()
mcpRegistry.RegisterServer(mcp.ServerConfig{
    Name:    "filesystem",
    Command: "npx",
    Args:    []string{"-y", "@modelcontextprotocol/server-filesystem", "/allowed"},
})

// Discover and register MCP tools
mcpExecutor := tools.NewMCPExecutor(mcpRegistry)
toolRegistry.RegisterExecutor(mcpExecutor)

// Use in pipeline; config is a *middleware.ProviderMiddlewareConfig (see the quick example above)
pipe := pipeline.NewPipeline(
    middleware.ProviderMiddleware(provider, toolRegistry, &pipeline.ToolPolicy{
        ToolChoice: "auto",
        MaxRounds:  5,
    }, config),
)
// Execute with streaming
streamChan, err := pipe.ExecuteStream(ctx, "user", "Write a story")
if err != nil {
    log.Fatal(err)
}

// Process chunks
for chunk := range streamChan {
    if chunk.Error != nil {
        log.Printf("Error: %v\n", chunk.Error)
        break
    }
    if chunk.Delta != "" {
        fmt.Print(chunk.Delta)
    }
    if chunk.FinalResult != nil {
        fmt.Printf("\n\nInput tokens: %d\n", chunk.FinalResult.CostInfo.InputTokens)
    }
}
config := &pipeline.PipelineRuntimeConfig{
    MaxConcurrentExecutions: 100,              // Concurrent pipeline executions
    StreamBufferSize:        100,              // Stream chunk buffer size
    ExecutionTimeout:        30 * time.Second, // Per-execution timeout
    GracefulShutdownTimeout: 10 * time.Second, // Shutdown grace period
}
pipe := pipeline.NewPipelineWithConfig(config, middleware...)
defaults := providers.ProviderDefaults{
    Temperature: 0.7,
    TopP:        0.95,
    MaxTokens:   2000,
    Pricing: providers.Pricing{
        InputCostPer1K:  0.00015, // $0.15 per 1M tokens
        OutputCostPer1K: 0.0006,  // $0.60 per 1M tokens
    },
}
provider := openai.NewOpenAIProvider("openai", "gpt-4o-mini", "", defaults, false)
policy := &pipeline.ToolPolicy{
    ToolChoice:          "auto",                     // "auto", "required", "none", or a specific tool name
    MaxRounds:           5,                          // Max tool execution rounds
    MaxToolCallsPerTurn: 10,                         // Max tool calls per LLM response
    Blocklist:           []string{"dangerous_tool"}, // Blocked tools
}
result, err := pipe.Execute(ctx, "user", "Hello")
if err != nil {
    switch {
    case errors.Is(err, pipeline.ErrPipelineShuttingDown):
        log.Println("Pipeline is shutting down")
    case errors.Is(err, context.DeadlineExceeded):
        log.Println("Execution timeout")
    default:
        log.Printf("Execution failed: %v", err)
    }
}
result, err := provider.Predict(ctx, req)
if err != nil {
    // Check for rate limiting, API errors, network errors
    log.Printf("Provider error: %v", err)
}
result, err := toolRegistry.Execute(ctx, "tool_name", argsJSON)
if err != nil {
    log.Printf("Tool execution failed: %v", err)
}
// Always close resources
defer pipe.Shutdown(context.Background())
defer provider.Close()
defer mcpRegistry.Close()
// Use context for cancellation
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
result, err := pipe.Execute(ctx, "user", "Hello")
// Always drain streaming channels
streamChan, err := pipe.ExecuteStream(ctx, "user", "Hello")
if err != nil {
    return err
}
for chunk := range streamChan {
    // Process chunks
    if chunk.Error != nil {
        break
    }
}
// Handle partial results on error
result, err := pipe.Execute(ctx, "user", "Hello")
if err != nil {
    // Check if we got partial execution data
    if result != nil && len(result.Messages) > 0 {
        log.Printf("Partial execution: %d messages", len(result.Messages))
    }
}
  • Configure MaxConcurrentExecutions based on provider rate limits
  • Use semaphores to prevent overwhelming providers (see the sketch below)
  • Consider graceful degradation under load
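A minimal sketch of capping in-flight work, reusing the PipelineRuntimeConfig and NewPipelineWithConfig shown above; the limit of 50 and the buffered-channel semaphore are illustrative choices, not PromptKit APIs.

// Sketch: keep in-flight executions below the provider's rate limit.
// The limit of 50 is illustrative; tune it to your provider quota.
cfg := &pipeline.PipelineRuntimeConfig{
    MaxConcurrentExecutions: 50,
    ExecutionTimeout:        30 * time.Second,
}
pipe := pipeline.NewPipelineWithConfig(cfg, middleware...)

// A buffered channel acts as a simple semaphore in front of the pipeline.
sem := make(chan struct{}, 50)

select {
case sem <- struct{}{}: // acquire a slot
    result, err := pipe.Execute(ctx, "user", "Hello!")
    <-sem // release the slot
    if err != nil {
        log.Printf("execution failed: %v", err)
    } else {
        fmt.Println(result.Response.Content)
    }
case <-ctx.Done():
    // Degrade gracefully: reject rather than queue indefinitely.
    log.Println("dropping request: at capacity")
}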
  • Use streaming for interactive applications (chatbots, UIs)
  • Use non-streaming for batch processing, testing, analytics
  • Streaming adds roughly 10% overhead but gives a better interactive experience; a sketch covering both paths follows below
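A small sketch of serving both modes from one pipeline, reusing Execute and ExecuteStream exactly as shown earlier; the respond helper and its interactive flag are our own names, and strings.Builder comes from the standard library.

// Hypothetical helper: stream for interactive callers, block for batch jobs.
respond := func(ctx context.Context, prompt string, interactive bool) (string, error) {
    if !interactive {
        // Batch path: one blocking call is simplest to test and aggregate.
        result, err := pipe.Execute(ctx, "user", prompt)
        if err != nil {
            return "", err
        }
        return result.Response.Content, nil
    }
    // Interactive path: surface deltas as they arrive, then return the full text.
    streamChan, err := pipe.ExecuteStream(ctx, "user", prompt)
    if err != nil {
        return "", err
    }
    var full strings.Builder
    for chunk := range streamChan {
        if chunk.Error != nil {
            return full.String(), chunk.Error
        }
        if chunk.Delta != "" {
            fmt.Print(chunk.Delta)
            full.WriteString(chunk.Delta)
        }
    }
    return full.String(), nil
}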
  • MCP tools run in separate processes (stdio overhead)
  • Consider tool execution timeouts (see the sketch below)
  • Use repository executors for fast in-memory tools
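MCP servers are child processes, so a stuck tool can stall an entire turn; one option is to bound each registry call with context.WithTimeout, as in this sketch (the 10-second budget is illustrative, and the Execute call is the one from the error-handling example above).

// Bound each MCP-backed tool call so a hung server process cannot stall the turn.
toolCtx, cancel := context.WithTimeout(ctx, 10*time.Second) // illustrative budget
defer cancel()

result, err := toolRegistry.Execute(toolCtx, "tool_name", argsJSON)
if err != nil {
    if errors.Is(err, context.DeadlineExceeded) {
        log.Printf("Tool timed out: %v", err)
    } else {
        log.Printf("Tool execution failed: %v", err)
    }
}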
  • Pipeline creates fresh ExecutionContext per call (prevents contamination)
  • Large conversation histories can increase memory usage
  • Consider state store cleanup strategies (a hypothetical sketch follows below)
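The state store API is documented in statestore.md rather than on this page, so the following is only an illustration of one cleanup strategy; the ConversationPruner interface and its DeleteOlderThan method are hypothetical placeholders, not PromptKit APIs.

// Hypothetical interface: adapt to the real state store API (see statestore.md).
type ConversationPruner interface {
    DeleteOlderThan(ctx context.Context, cutoff time.Time) error
}

// Periodically drop conversations that fall outside the retention window.
func pruneLoop(ctx context.Context, store ConversationPruner, retention time.Duration) {
    ticker := time.NewTicker(time.Hour) // illustrative interval
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            if err := store.DeleteOlderThan(ctx, time.Now().Add(-retention)); err != nil {
                log.Printf("State cleanup failed: %v", err)
            }
        case <-ctx.Done():
            return
        }
    }
}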