How to Integrate MCP

Connect Model Context Protocol (MCP) servers to make external tools available to your pipeline.

Goal

Set up MCP servers and integrate tools into your pipeline.

Quick Start

Step 1: Create MCP Registry

import "github.com/AltairaLabs/PromptKit/runtime/mcp"

// Create the registry that MCP servers are registered with.
registry := mcp.NewRegistry()
// Close the registry when the surrounding function returns.
defer registry.Close()

Step 2: Register MCP Server

err := registry.RegisterServer(mcp.ServerConfig{
    Name:    "filesystem",
    Command: "npx",
    Args:    []string{"-y", "@modelcontextprotocol/server-filesystem", "/allowed"},
})
if err != nil {
    log.Fatal(err)
}

Step 3: Discover Tools

// Ask every registered server for its tools; the result is keyed by server name.
ctx := context.Background()
serverTools, err := registry.ListAllTools(ctx)
if err != nil {
    log.Fatal(err)
}

for serverName, discovered := range serverTools {
    toolCount := len(discovered)
    log.Printf("Server %s has %d tools\n", serverName, toolCount)
}

Step 4: Integrate with Pipeline

// Create the tool registry and register the MCP executor with it.
toolRegistry := tools.NewRegistry()
toolRegistry.RegisterExecutor(tools.NewMCPExecutor(registry))

// Register a descriptor for every tool discovered on every server.
for _, discovered := range serverTools {
    for _, t := range discovered {
        descriptor := &tools.ToolDescriptor{
            Name:        t.Name,
            Description: t.Description,
            InputSchema: t.InputSchema,
            Mode:        "mcp",
        }
        toolRegistry.Register(descriptor)
    }
}

// Hand the tool registry to the provider middleware with an "auto"
// tool-choice policy.
policy := &pipeline.ToolPolicy{ToolChoice: "auto"}
pipe := pipeline.NewPipeline(
    middleware.ProviderMiddleware(provider, toolRegistry, policy, config),
)

Common MCP Servers

Filesystem Server

registry.RegisterServer(mcp.ServerConfig{
    Name:    "filesystem",
    Command: "npx",
    Args:    []string{"-y", "@modelcontextprotocol/server-filesystem", "/data"},
})

Available Tools: list them at runtime with registry.ListAllTools(ctx), as shown under Tool Discovery below.

Memory Server

registry.RegisterServer(mcp.ServerConfig{
    Name:    "memory",
    Command: "npx",
    Args:    []string{"-y", "@modelcontextprotocol/server-memory"},
})

Available Tools: list them at runtime with registry.ListAllTools(ctx), as shown under Tool Discovery below.

Custom Python Server

registry.RegisterServer(mcp.ServerConfig{
    Name:    "database",
    Command: "python",
    Args:    []string{"/path/to/mcp_server.py"},
    Env: map[string]string{
        "DB_CONNECTION": os.Getenv("DB_CONNECTION"),
    },
})

Complete Example

package main

import (
    "context"
    "log"
    
    "github.com/AltairaLabs/PromptKit/runtime/mcp"
    "github.com/AltairaLabs/PromptKit/runtime/pipeline"
    "github.com/AltairaLabs/PromptKit/runtime/pipeline/middleware"
    "github.com/AltairaLabs/PromptKit/runtime/providers/openai"
    "github.com/AltairaLabs/PromptKit/runtime/tools"
)

// main runs the example and exits with a non-zero status if any step fails.
func main() {
    // log.Fatal exits the process without running deferred calls, so every
    // deferred cleanup lives in run and has already executed by the time an
    // error reaches this point.
    if err := run(); err != nil {
        log.Fatal(err)
    }
}

// run registers the filesystem MCP server, exposes its tools to a pipeline,
// executes one prompt, and logs the response. It returns the first error
// encountered; the registry, provider, and pipeline are released on every
// return path.
func run() error {
    // Create MCP registry
    mcpRegistry := mcp.NewRegistry()
    defer mcpRegistry.Close()

    // Register filesystem server
    err := mcpRegistry.RegisterServer(mcp.ServerConfig{
        Name:    "filesystem",
        Command: "npx",
        Args:    []string{"-y", "@modelcontextprotocol/server-filesystem", "/data"},
    })
    if err != nil {
        return err
    }

    // Create tool registry
    toolRegistry := tools.NewRegistry()
    toolRegistry.RegisterExecutor(tools.NewMCPExecutor(mcpRegistry))

    // Discover and register tools
    ctx := context.Background()
    serverTools, err := mcpRegistry.ListAllTools(ctx)
    if err != nil {
        return err
    }
    for _, mcpTools := range serverTools {
        for _, mcpTool := range mcpTools {
            toolRegistry.Register(&tools.ToolDescriptor{
                Name:        mcpTool.Name,
                Description: mcpTool.Description,
                InputSchema: mcpTool.InputSchema,
                Mode:        "mcp",
            })
        }
    }

    // Create provider
    provider := openai.NewOpenAIProvider(
        "openai",
        "gpt-4o-mini",
        "",
        openai.DefaultProviderDefaults(),
        false,
    )
    defer provider.Close()

    // Build pipeline with tools
    pipe := pipeline.NewPipeline(
        middleware.ProviderMiddleware(provider, toolRegistry, &pipeline.ToolPolicy{
            ToolChoice: "auto",
            MaxRounds:  5,
        }, &middleware.ProviderMiddlewareConfig{
            MaxTokens:   1500,
            Temperature: 0.7,
        }),
    )
    defer pipe.Shutdown(context.Background())

    // Execute with tool access
    result, err := pipe.Execute(ctx, "user", "Read the contents of /data/example.txt")
    if err != nil {
        return err
    }

    log.Printf("Response: %s\n", result.Response.Content)
    return nil
}

MCP Client Configuration

Timeouts

options := mcp.ClientOptions{
    RequestTimeout: 30 * time.Second,
    MaxRetries:     3,
    RetryBackoff:   time.Second,
}

client := mcp.NewStdioClientWithOptions(config, options)

Manual Client Usage

// Get client for specific server
client, err := registry.GetClient(ctx, "filesystem")
if err != nil {
    log.Fatal(err)
}

// List available tools. The result is named toolList so it does not shadow
// the tools package used elsewhere in this guide, and the error is checked
// before the result is used.
toolList, err := client.ListTools(ctx)
if err != nil {
    log.Fatal(err)
}
for _, tool := range toolList {
    log.Printf("Tool: %s - %s\n", tool.Name, tool.Description)
}

// Call tool directly
args := json.RawMessage(`{"path": "/data/file.txt"}`)
response, err := client.CallTool(ctx, "read_file", args)
if err != nil {
    log.Fatal(err)
}

// Process response: print every text item it contains.
for _, content := range response.Content {
    if content.Type == "text" {
        log.Println(content.Text)
    }
}

Tool Discovery

Automatic Discovery

// Query every registered server for its tools in one call.
serverTools, err := registry.ListAllTools(ctx)
if err != nil {
    log.Fatal(err)
}

// Print each server followed by the name, description, and input schema of
// each of its tools.
for serverName, discovered := range serverTools {
    log.Printf("Server: %s\n", serverName)
    for _, t := range discovered {
        log.Printf("  - %s: %s\n", t.Name, t.Description)
        log.Printf("    Schema: %s\n", t.InputSchema)
    }
}

Get Tool Schema

schema, err := registry.GetToolSchema(ctx, "read_file")
if err != nil {
    log.Fatal(err)
}

log.Printf("Tool: %s\n", schema.Name)
log.Printf("Description: %s\n", schema.Description)
log.Printf("Schema: %s\n", schema.InputSchema)

Tool Execution

Through Tool Registry

// Register MCP tool
toolRegistry.Register(&tools.ToolDescriptor{
    Name:        "read_file",
    Description: "Read file contents",
    InputSchema: json.RawMessage(`{"type":"object","properties":{"path":{"type":"string"}}}`),
    Mode:        "mcp",
})

// Execute
args := json.RawMessage(`{"path": "/data/file.txt"}`)
result, err := toolRegistry.Execute(toolDescriptor, args)
if err != nil {
    log.Fatal(err)
}

log.Printf("Result: %s\n", result)

Direct MCP Call

// Resolve the client that serves the tool; stop on failure instead of
// calling a method on an unusable client.
client, err := registry.GetClientForTool(ctx, "read_file")
if err != nil {
    log.Fatal(err)
}

response, err := client.CallTool(ctx, "read_file", args)
if err != nil {
    log.Fatal(err)
}
// Process response.Content as shown under Manual Client Usage.

Error Handling

Server Connection Errors

client, err := registry.GetClient(ctx, "filesystem")
if err != nil {
    // Tell an unregistered server apart from any other failure by looking
    // for "not found" in the error text.
    switch {
    case strings.Contains(err.Error(), "not found"):
        log.Println("Server not registered")
    default:
        log.Printf("Connection error: %v", err)
    }
    return
}

Tool Execution Errors

response, err := client.CallTool(ctx, "read_file", args)
if err != nil {
    log.Printf("Tool execution failed: %v", err)
    return
}

// Report every error item carried in the response content.
for _, item := range response.Content {
    if item.Type != "error" {
        continue
    }
    log.Printf("Tool error: %s", item.Text)
}

Troubleshooting

Issue: Server Won’t Start

Problem: MCP server command fails.

Solutions:

  1. Check command is installed:

    which npx
    npx -v
  2. Test command manually:

    npx -y @modelcontextprotocol/server-filesystem /data
  3. Check server logs:

    // Enable debug logging by setting DEBUG=1 in the server config's Env map.
    // This assignment replaces any entries already present in config.Env.
    // NOTE(review): whether DEBUG is honored depends on the server being launched — confirm.
    config.Env = map[string]string{"DEBUG": "1"}

Issue: Tools Not Discovered

Problem: ListAllTools returns an empty result.

Solution: Ensure the server is initialized by requesting its client first:

// Force initialization
client, err := registry.GetClient(ctx, "filesystem")
if err != nil {
    log.Fatal(err)
}

// Now list tools. Check the error so a failed listing is not mistaken for a
// server that has no tools; toolList avoids shadowing the tools package used
// elsewhere in this guide.
toolList, err := client.ListTools(ctx)
if err != nil {
    log.Fatal(err)
}
log.Printf("Discovered %d tools", len(toolList))

Issue: Tool Call Timeout

Problem: Tool execution hangs.

Solution: Increase timeout:

// Give the call up to 60 seconds; cancel releases the timer once the call
// returns.
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

response, err := client.CallTool(ctx, "read_file", args)
if err != nil {
    // An expired deadline is reported through err, so it must be checked.
    log.Printf("Tool execution failed: %v", err)
    return
}

Best Practices

  1. Always close MCP registry:

    defer registry.Close()
  2. Handle tool errors gracefully:

    if err != nil {
        log.Printf("Tool failed: %v", err)
        // Provide fallback behavior
    }
  3. Limit allowed paths for filesystem server:

    Args: []string{"-y", "@modelcontextprotocol/server-filesystem", "/allowed/path"},
  4. Use environment variables for sensitive config:

    Env: map[string]string{
        "API_KEY": os.Getenv("API_KEY"),
    },
  5. Set reasonable tool policies:

    policy := &pipeline.ToolPolicy{
        MaxRounds:           5,
        MaxToolCallsPerTurn: 10,
    }

Next Steps

See Also