Skip to content

Integrate MCP

Connect Model Context Protocol servers for external tools.

Set up MCP servers and integrate tools into your pipeline.

import "github.com/AltairaLabs/PromptKit/runtime/mcp"
registry := mcp.NewRegistry()
defer registry.Close()
err := registry.RegisterServer(mcp.ServerConfig{
Name: "filesystem",
Command: "npx",
Args: []string{"-y", "@modelcontextprotocol/server-filesystem", "/allowed"},
})
if err != nil {
log.Fatal(err)
}
ctx := context.Background()
serverTools, err := registry.ListAllTools(ctx)
if err != nil {
log.Fatal(err)
}
for serverName, tools := range serverTools {
log.Printf("Server %s has %d tools\n", serverName, len(tools))
}
// Create tool registry
toolRegistry := tools.NewRegistry()
// Register MCP executor
mcpExecutor := tools.NewMCPExecutor(registry)
toolRegistry.RegisterExecutor(mcpExecutor)
// Register MCP tools
for _, mcpTools := range serverTools {
for _, mcpTool := range mcpTools {
toolRegistry.Register(&tools.ToolDescriptor{
Name: mcpTool.Name,
Description: mcpTool.Description,
InputSchema: mcpTool.InputSchema,
Mode: "mcp",
})
}
}
// Use in pipeline
pipe := pipeline.NewPipeline(
middleware.ProviderMiddleware(provider, toolRegistry, &pipeline.ToolPolicy{
ToolChoice: "auto",
}, config),
)
registry.RegisterServer(mcp.ServerConfig{
Name: "filesystem",
Command: "npx",
Args: []string{"-y", "@modelcontextprotocol/server-filesystem", "/data"},
})

Available Tools:

  • read_file: Read file contents
  • write_file: Write file contents
  • list_directory: List directory contents
  • create_directory: Create directory
  • delete_file: Delete file
registry.RegisterServer(mcp.ServerConfig{
Name: "memory",
Command: "npx",
Args: []string{"-y", "@modelcontextprotocol/server-memory"},
})

Available Tools:

  • store_memory: Store key-value data
  • retrieve_memory: Retrieve stored data
registry.RegisterServer(mcp.ServerConfig{
Name: "database",
Command: "python",
Args: []string{"/path/to/mcp_server.py"},
Env: map[string]string{
"DB_CONNECTION": os.Getenv("DB_CONNECTION"),
},
})
package main
import (
"context"
"log"
"github.com/AltairaLabs/PromptKit/runtime/mcp"
"github.com/AltairaLabs/PromptKit/runtime/pipeline"
"github.com/AltairaLabs/PromptKit/runtime/pipeline/middleware"
"github.com/AltairaLabs/PromptKit/runtime/providers/openai"
"github.com/AltairaLabs/PromptKit/runtime/tools"
)
func main() {
// Create MCP registry
mcpRegistry := mcp.NewRegistry()
defer mcpRegistry.Close()
// Register filesystem server
mcpRegistry.RegisterServer(mcp.ServerConfig{
Name: "filesystem",
Command: "npx",
Args: []string{"-y", "@modelcontextprotocol/server-filesystem", "/data"},
})
// Create tool registry
toolRegistry := tools.NewRegistry()
toolRegistry.RegisterExecutor(tools.NewMCPExecutor(mcpRegistry))
// Discover and register tools
ctx := context.Background()
serverTools, _ := mcpRegistry.ListAllTools(ctx)
for _, mcpTools := range serverTools {
for _, mcpTool := range mcpTools {
toolRegistry.Register(&tools.ToolDescriptor{
Name: mcpTool.Name,
Description: mcpTool.Description,
InputSchema: mcpTool.InputSchema,
Mode: "mcp",
})
}
}
// Create provider
provider := openai.NewOpenAIProvider(
"openai",
"gpt-4o-mini",
"",
openai.DefaultProviderDefaults(),
false,
)
defer provider.Close()
// Build pipeline with tools
pipe := pipeline.NewPipeline(
middleware.ProviderMiddleware(provider, toolRegistry, &pipeline.ToolPolicy{
ToolChoice: "auto",
MaxRounds: 5,
}, &middleware.ProviderMiddlewareConfig{
MaxTokens: 1500,
Temperature: 0.7,
}),
)
defer pipe.Shutdown(context.Background())
// Execute with tool access
result, err := pipe.Execute(ctx, "user", "Read the contents of /data/example.txt")
if err != nil {
log.Fatal(err)
}
log.Printf("Response: %s\n", result.Response.Content)
}
options := mcp.ClientOptions{
RequestTimeout: 30 * time.Second,
MaxRetries: 3,
RetryBackoff: time.Second,
}
client := mcp.NewStdioClientWithOptions(config, options)
// Get client for specific server
client, err := registry.GetClient(ctx, "filesystem")
if err != nil {
log.Fatal(err)
}
// List available tools
tools, err := client.ListTools(ctx)
for _, tool := range tools {
log.Printf("Tool: %s - %s\n", tool.Name, tool.Description)
}
// Call tool directly
args := json.RawMessage(`{"path": "/data/file.txt"}`)
response, err := client.CallTool(ctx, "read_file", args)
if err != nil {
log.Fatal(err)
}
// Process response
for _, content := range response.Content {
if content.Type == "text" {
log.Println(content.Text)
}
}
// Discover all tools from all servers
serverTools, err := registry.ListAllTools(ctx)
if err != nil {
log.Fatal(err)
}
for serverName, tools := range serverTools {
log.Printf("Server: %s\n", serverName)
for _, tool := range tools {
log.Printf(" - %s: %s\n", tool.Name, tool.Description)
log.Printf(" Schema: %s\n", tool.InputSchema)
}
}
schema, err := registry.GetToolSchema(ctx, "read_file")
if err != nil {
log.Fatal(err)
}
log.Printf("Tool: %s\n", schema.Name)
log.Printf("Description: %s\n", schema.Description)
log.Printf("Schema: %s\n", schema.InputSchema)
// Register MCP tool
toolRegistry.Register(&tools.ToolDescriptor{
Name: "read_file",
Description: "Read file contents",
InputSchema: json.RawMessage(`{"type":"object","properties":{"path":{"type":"string"}}}`),
Mode: "mcp",
})
// Execute
args := json.RawMessage(`{"path": "/data/file.txt"}`)
result, err := toolRegistry.Execute(toolDescriptor, args)
if err != nil {
log.Fatal(err)
}
log.Printf("Result: %s\n", result)
client, _ := registry.GetClientForTool(ctx, "read_file")
response, err := client.CallTool(ctx, "read_file", args)
client, err := registry.GetClient(ctx, "filesystem")
if err != nil {
if strings.Contains(err.Error(), "not found") {
log.Println("Server not registered")
} else {
log.Printf("Connection error: %v", err)
}
return
}
response, err := client.CallTool(ctx, "read_file", args)
if err != nil {
log.Printf("Tool execution failed: %v", err)
return
}
// Check response for errors
for _, content := range response.Content {
if content.Type == "error" {
log.Printf("Tool error: %s", content.Text)
}
}

Problem: MCP server command fails.

Solutions:

  1. Check command is installed:

    Terminal window
    which npx
    npx -v
  2. Test command manually:

    Terminal window
    npx -y @modelcontextprotocol/server-filesystem /data
  3. Check server logs:

    // Enable debug logging
    config.Env = map[string]string{"DEBUG": "1"}

Problem: ListAllTools returns empty.

Solution: Ensure server initialized:

// Force initialization
client, err := registry.GetClient(ctx, "filesystem")
if err != nil {
log.Fatal(err)
}
// Now list tools
tools, err := client.ListTools(ctx)

Problem: Tool execution hangs.

Solution: Increase timeout:

ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
response, err := client.CallTool(ctx, "read_file", args)
  1. Always close MCP registry:

    defer registry.Close()
  2. Handle tool errors gracefully:

    if err != nil {
    log.Printf("Tool failed: %v", err)
    // Provide fallback behavior
    }
  3. Limit allowed paths for filesystem server:

    Args: []string{"-y", "@modelcontextprotocol/server-filesystem", "/allowed/path"},
  4. Use environment variables for sensitive config:

    Env: map[string]string{
    "API_KEY": os.Getenv("API_KEY"),
    },
  5. Set reasonable tool policies:

    policy := &pipeline.ToolPolicy{
    MaxRounds: 5,
    MaxToolCallsPerTurn: 10,
    }