Runtime Reference

Complete API reference for the PromptKit Runtime components.

Overview

The PromptKit Runtime provides the core execution engine for LLM interactions. It handles pipeline execution, LLM provider calls, tool and function calling, MCP integration, conversation state, content validation, and structured logging.

Quick Reference

Core Components

Component | Description | Reference
--- | --- | ---
Pipeline | Middleware-based execution engine | pipeline.md
Providers | LLM provider implementations | providers.md
Tools | Function calling and MCP integration | tools.md
MCP | Model Context Protocol support | mcp.md
State Store | Conversation persistence | statestore.md
Validators | Content validation | validators.md
Types | Core data structures | types.md
Logging | Structured logging with context | logging.md

Import Paths

import (
    "github.com/AltairaLabs/PromptKit/runtime/pipeline"
    "github.com/AltairaLabs/PromptKit/runtime/providers"
    "github.com/AltairaLabs/PromptKit/runtime/tools"
    "github.com/AltairaLabs/PromptKit/runtime/mcp"
    "github.com/AltairaLabs/PromptKit/runtime/statestore"
    "github.com/AltairaLabs/PromptKit/runtime/validators"
    "github.com/AltairaLabs/PromptKit/runtime/types"
    "github.com/AltairaLabs/PromptKit/runtime/logger"
)

Basic Usage

Simple Pipeline

import (
    "context"
    "fmt"
    "log"

    "github.com/AltairaLabs/PromptKit/runtime/pipeline"
    "github.com/AltairaLabs/PromptKit/runtime/pipeline/middleware"
    "github.com/AltairaLabs/PromptKit/runtime/providers/openai"
)

// Create provider
provider := openai.NewOpenAIProvider(
    "openai",
    "gpt-4o-mini",
    "", // default baseURL
    openai.DefaultProviderDefaults(),
    false, // includeRawOutput
)

// Build pipeline with middleware
pipe := pipeline.NewPipeline(
    middleware.ProviderMiddleware(provider, nil, nil, &middleware.ProviderMiddlewareConfig{
        MaxTokens:   1500,
        Temperature: 0.7,
    }),
)

// Execute
ctx := context.Background()
result, err := pipe.Execute(ctx, "user", "Hello!")
if err != nil {
    log.Fatal(err)
}

fmt.Println(result.Response.Content)

With Tools

import (
    "github.com/AltairaLabs/PromptKit/runtime/tools"
    "github.com/AltairaLabs/PromptKit/runtime/mcp"
)

// Create tool registry
toolRegistry := tools.NewRegistry()

// Register MCP server
mcpRegistry := mcp.NewRegistry()
mcpRegistry.RegisterServer(mcp.ServerConfig{
    Name:    "filesystem",
    Command: "npx",
    Args:    []string{"-y", "@modelcontextprotocol/server-filesystem", "/allowed"},
})

// Discover and register MCP tools
mcpExecutor := tools.NewMCPExecutor(mcpRegistry)
toolRegistry.RegisterExecutor(mcpExecutor)

// Use in pipeline (provider and config come from the Simple Pipeline example above)
pipe := pipeline.NewPipeline(
    middleware.ProviderMiddleware(provider, toolRegistry, &pipeline.ToolPolicy{
        ToolChoice: "auto",
        MaxRounds:  5,
    }, config),
)

Streaming Execution

// Execute with streaming
streamChan, err := pipe.ExecuteStream(ctx, "user", "Write a story")
if err != nil {
    log.Fatal(err)
}

// Process chunks
for chunk := range streamChan {
    if chunk.Error != nil {
        log.Printf("Error: %v\n", chunk.Error)
        break
    }
    
    if chunk.Delta != "" {
        fmt.Print(chunk.Delta)
    }
    
    if chunk.FinalResult != nil {
        fmt.Printf("\n\nInput tokens: %d\n", chunk.FinalResult.CostInfo.InputTokens)
    }
}

Configuration

Pipeline Configuration

config := &pipeline.PipelineRuntimeConfig{
    MaxConcurrentExecutions: 100,        // Concurrent pipeline executions
    StreamBufferSize:        100,        // Stream chunk buffer size
    ExecutionTimeout:        30 * time.Second,  // Per-execution timeout
    GracefulShutdownTimeout: 10 * time.Second,  // Shutdown grace period
}

pipe := pipeline.NewPipelineWithConfig(config, middleware...)

Provider Configuration

defaults := providers.ProviderDefaults{
    Temperature: 0.7,
    TopP:        0.95,
    MaxTokens:   2000,
    Pricing: providers.Pricing{
        InputCostPer1K:  0.00015,  // $0.15 per 1M tokens
        OutputCostPer1K: 0.0006,   // $0.60 per 1M tokens
    },
}

provider := openai.NewOpenAIProvider("openai", "gpt-4o-mini", "", defaults, false)

Tool Policy

policy := &pipeline.ToolPolicy{
    ToolChoice:          "auto",     // "auto", "required", "none", or specific tool
    MaxRounds:           5,          // Max tool execution rounds
    MaxToolCallsPerTurn: 10,         // Max tools per LLM response
    Blocklist:           []string{"dangerous_tool"},  // Blocked tools
}

Error Handling

Pipeline Errors

result, err := pipe.Execute(ctx, "user", "Hello")
if err != nil {
    switch {
    case errors.Is(err, pipeline.ErrPipelineShuttingDown):
        log.Println("Pipeline is shutting down")
    case errors.Is(err, context.DeadlineExceeded):
        log.Println("Execution timeout")
    default:
        log.Printf("Execution failed: %v", err)
    }
}

Provider Errors

result, err := provider.Predict(ctx, req)
if err != nil {
    // Check for rate limiting, API errors, network errors
    log.Printf("Provider error: %v", err)
}
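
Transient failures such as rate limits are often worth retrying. Below is a minimal retry sketch: it is a hypothetical helper built only on the standard library, not part of the PromptKit API, and it treats every non-context error as retryable, which you may want to narrow for your provider.

import (
    "context"
    "errors"
    "time"
)

// predictWithRetry is a hypothetical helper (not part of the PromptKit API). It
// retries a Predict-style call with exponential backoff and gives up immediately
// when the context is cancelled or its deadline is exceeded.
func predictWithRetry(ctx context.Context, attempts int, call func() error) error {
    backoff := time.Second
    var err error
    for i := 0; i < attempts; i++ {
        if err = call(); err == nil {
            return nil
        }
        if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
            return err // the caller asked to stop; don't retry
        }
        if i == attempts-1 {
            break // out of attempts
        }
        select {
        case <-time.After(backoff):
            backoff *= 2 // wait longer before each subsequent attempt
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return err
}

// Usage sketch: declare the result variable with the type returned by Predict,
// then perform the call inside the closure:
//
//   err := predictWithRetry(ctx, 3, func() error {
//       var callErr error
//       result, callErr = provider.Predict(ctx, req)
//       return callErr
//   })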

Tool Errors

result, err := toolRegistry.Execute(ctx, "tool_name", argsJSON)
if err != nil {
    log.Printf("Tool execution failed: %v", err)
}

Best Practices

Resource Management

// Always close resources
defer pipe.Shutdown(context.Background())
defer provider.Close()
defer mcpRegistry.Close()

Context Cancellation

// Use context for cancellation
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

result, err := pipe.Execute(ctx, "user", "Hello")

Streaming Cleanup

// Always drain streaming channels
streamChan, err := pipe.ExecuteStream(ctx, "user", "Hello")
if err != nil {
    return err
}

for chunk := range streamChan {
    // Process chunks
    if chunk.Error != nil {
        break
    }
}
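
To stop a stream early, one pattern (sketched here, assuming ExecuteStream honors context cancellation the way Execute does) is to cancel the context and keep ranging until the channel closes, so the pipeline can release its resources:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

streamChan, err := pipe.ExecuteStream(ctx, "user", "Hello")
if err != nil {
    return err
}

for chunk := range streamChan {
    if chunk.Error != nil {
        break
    }
    // haveEnough is a hypothetical application-level stop condition.
    if haveEnough(chunk.Delta) {
        cancel() // ask the pipeline to stop producing
        // keep ranging: draining until the channel closes lets the pipeline clean up
    }
}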

Error Recovery

// Handle partial results on error
result, err := pipe.Execute(ctx, "user", "Hello")
if err != nil {
    // Check if we got partial execution data
    if result != nil && len(result.Messages) > 0 {
        log.Printf("Partial execution: %d messages", len(result.Messages))
    }
}

Performance Considerations

Concurrency Control
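
The knob for this is MaxConcurrentExecutions in PipelineRuntimeConfig (see Pipeline Configuration above). A minimal sketch for a deployment that favors predictable memory use over throughput; the specific values are illustrative, not recommendations:

config := &pipeline.PipelineRuntimeConfig{
    MaxConcurrentExecutions: 10,               // fewer in-flight executions, lower peak memory
    StreamBufferSize:        32,               // smaller per-stream chunk buffer
    ExecutionTimeout:        60 * time.Second, // allow longer individual runs
}

pipe := pipeline.NewPipelineWithConfig(config, middleware...)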

Streaming vs Non-Streaming

Tool Execution

Memory Management

See Also

Next Steps