How to Integrate MCP
Connect Model Context Protocol servers for external tools.
Goal
Set up MCP servers and integrate tools into your pipeline.
Quick Start
Step 1: Create MCP Registry
import "github.com/AltairaLabs/PromptKit/runtime/mcp"
registry := mcp.NewRegistry()
defer registry.Close()
Step 2: Register MCP Server
err := registry.RegisterServer(mcp.ServerConfig{
Name: "filesystem",
Command: "npx",
Args: []string{"-y", "@modelcontextprotocol/server-filesystem", "/allowed"},
})
if err != nil {
log.Fatal(err)
}
Step 3: Discover Tools
ctx := context.Background()
serverTools, err := registry.ListAllTools(ctx)
if err != nil {
log.Fatal(err)
}
for serverName, tools := range serverTools {
log.Printf("Server %s has %d tools\n", serverName, len(tools))
}
Step 4: Integrate with Pipeline
// Create tool registry
toolRegistry := tools.NewRegistry()
// Register MCP executor
mcpExecutor := tools.NewMCPExecutor(registry)
toolRegistry.RegisterExecutor(mcpExecutor)
// Register MCP tools
for _, mcpTools := range serverTools {
for _, mcpTool := range mcpTools {
toolRegistry.Register(&tools.ToolDescriptor{
Name: mcpTool.Name,
Description: mcpTool.Description,
InputSchema: mcpTool.InputSchema,
Mode: "mcp",
})
}
}
// Use in pipeline
pipe := pipeline.NewPipeline(
middleware.ProviderMiddleware(provider, toolRegistry, &pipeline.ToolPolicy{
ToolChoice: "auto",
}, config),
)
Common MCP Servers
Filesystem Server
registry.RegisterServer(mcp.ServerConfig{
Name: "filesystem",
Command: "npx",
Args: []string{"-y", "@modelcontextprotocol/server-filesystem", "/data"},
})
Available Tools:
- read_file: Read file contents
- write_file: Write file contents
- list_directory: List directory contents
- create_directory: Create directory
- delete_file: Delete file
Memory Server
registry.RegisterServer(mcp.ServerConfig{
Name: "memory",
Command: "npx",
Args: []string{"-y", "@modelcontextprotocol/server-memory"},
})
Available Tools:
- store_memory: Store key-value data
- retrieve_memory: Retrieve stored data
Custom Python Server
registry.RegisterServer(mcp.ServerConfig{
Name: "database",
Command: "python",
Args: []string{"/path/to/mcp_server.py"},
Env: map[string]string{
"DB_CONNECTION": os.Getenv("DB_CONNECTION"),
},
})
Complete Example
package main
import (
"context"
"log"
"github.com/AltairaLabs/PromptKit/runtime/mcp"
"github.com/AltairaLabs/PromptKit/runtime/pipeline"
"github.com/AltairaLabs/PromptKit/runtime/pipeline/middleware"
"github.com/AltairaLabs/PromptKit/runtime/providers/openai"
"github.com/AltairaLabs/PromptKit/runtime/tools"
)
// main runs the example and exits with a non-zero status on failure.
//
// All work happens in run so that its deferred cleanup calls execute before
// log.Fatal terminates the process; log.Fatal exits via os.Exit, which does
// not run deferred functions.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run registers a filesystem MCP server, exposes its tools through a tool
// registry, builds a pipeline around an OpenAI provider, and executes a single
// prompt. It returns the first error encountered.
func run() error {
	// Create MCP registry
	mcpRegistry := mcp.NewRegistry()
	defer mcpRegistry.Close()

	// Register filesystem server
	if err := mcpRegistry.RegisterServer(mcp.ServerConfig{
		Name:    "filesystem",
		Command: "npx",
		Args:    []string{"-y", "@modelcontextprotocol/server-filesystem", "/data"},
	}); err != nil {
		return err
	}

	// Create tool registry
	toolRegistry := tools.NewRegistry()
	toolRegistry.RegisterExecutor(tools.NewMCPExecutor(mcpRegistry))

	// Discover and register tools
	ctx := context.Background()
	serverTools, err := mcpRegistry.ListAllTools(ctx)
	if err != nil {
		return err
	}
	for _, mcpTools := range serverTools {
		for _, mcpTool := range mcpTools {
			toolRegistry.Register(&tools.ToolDescriptor{
				Name:        mcpTool.Name,
				Description: mcpTool.Description,
				InputSchema: mcpTool.InputSchema,
				Mode:        "mcp",
			})
		}
	}

	// Create provider
	provider := openai.NewOpenAIProvider(
		"openai",
		"gpt-4o-mini",
		"",
		openai.DefaultProviderDefaults(),
		false,
	)
	defer provider.Close()

	// Build pipeline with tools
	pipe := pipeline.NewPipeline(
		middleware.ProviderMiddleware(provider, toolRegistry, &pipeline.ToolPolicy{
			ToolChoice: "auto",
			MaxRounds:  5,
		}, &middleware.ProviderMiddlewareConfig{
			MaxTokens:   1500,
			Temperature: 0.7,
		}),
	)
	defer pipe.Shutdown(context.Background())

	// Execute with tool access
	result, err := pipe.Execute(ctx, "user", "Read the contents of /data/example.txt")
	if err != nil {
		return err
	}
	log.Printf("Response: %s\n", result.Response.Content)
	return nil
}
MCP Client Configuration
Timeouts
options := mcp.ClientOptions{
RequestTimeout: 30 * time.Second,
MaxRetries: 3,
RetryBackoff: time.Second,
}
client := mcp.NewStdioClientWithOptions(config, options)
Manual Client Usage
// Get client for specific server
client, err := registry.GetClient(ctx, "filesystem")
if err != nil {
	log.Fatal(err)
}

// List available tools. The result is named availableTools so it does not
// shadow the imported tools package.
availableTools, err := client.ListTools(ctx)
if err != nil {
	log.Fatal(err)
}
for _, tool := range availableTools {
	log.Printf("Tool: %s - %s\n", tool.Name, tool.Description)
}

// Call tool directly
args := json.RawMessage(`{"path": "/data/file.txt"}`)
response, err := client.CallTool(ctx, "read_file", args)
if err != nil {
	log.Fatal(err)
}

// Process response: only text content items are printed
for _, content := range response.Content {
	if content.Type == "text" {
		log.Println(content.Text)
	}
}
Tool Discovery
Automatic Discovery
// Discover all tools from all servers
serverTools, err := registry.ListAllTools(ctx)
if err != nil {
log.Fatal(err)
}
for serverName, tools := range serverTools {
log.Printf("Server: %s\n", serverName)
for _, tool := range tools {
log.Printf(" - %s: %s\n", tool.Name, tool.Description)
log.Printf(" Schema: %s\n", tool.InputSchema)
}
}
Get Tool Schema
schema, err := registry.GetToolSchema(ctx, "read_file")
if err != nil {
log.Fatal(err)
}
log.Printf("Tool: %s\n", schema.Name)
log.Printf("Description: %s\n", schema.Description)
log.Printf("Schema: %s\n", schema.InputSchema)
Tool Execution
Through Tool Registry
// Register MCP tool
toolRegistry.Register(&tools.ToolDescriptor{
Name: "read_file",
Description: "Read file contents",
InputSchema: json.RawMessage(`{"type":"object","properties":{"path":{"type":"string"}}}`),
Mode: "mcp",
})
// Execute
args := json.RawMessage(`{"path": "/data/file.txt"}`)
result, err := toolRegistry.Execute(toolDescriptor, args)
if err != nil {
log.Fatal(err)
}
log.Printf("Result: %s\n", result)
Direct MCP Call
// Look up the client for the server that provides the tool.
client, err := registry.GetClientForTool(ctx, "read_file")
if err != nil {
	log.Fatal(err)
}

// Call the tool directly on that client.
response, err := client.CallTool(ctx, "read_file", args)
if err != nil {
	log.Fatal(err)
}

// Print any text content returned by the tool.
for _, content := range response.Content {
	if content.Type == "text" {
		log.Println(content.Text)
	}
}
Error Handling
Server Connection Errors
client, err := registry.GetClient(ctx, "filesystem")
if err != nil {
if strings.Contains(err.Error(), "not found") {
log.Println("Server not registered")
} else {
log.Printf("Connection error: %v", err)
}
return
}
Tool Execution Errors
response, err := client.CallTool(ctx, "read_file", args)
if err != nil {
log.Printf("Tool execution failed: %v", err)
return
}
// Check response for errors
for _, content := range response.Content {
if content.Type == "error" {
log.Printf("Tool error: %s", content.Text)
}
}
Troubleshooting
Issue: Server Won’t Start
Problem: MCP server command fails.
Solutions:
- Check command is installed:
which npx
npx -v
- Test command manually:
npx -y @modelcontextprotocol/server-filesystem /data
- Check server logs:
// Enable debug logging
config.Env = map[string]string{"DEBUG": "1"}
Issue: Tools Not Discovered
Problem: ListAllTools returns empty.
Solution: Ensure server initialized:
// Force initialization by requesting the client for the server
client, err := registry.GetClient(ctx, "filesystem")
if err != nil {
	log.Fatal(err)
}

// Now list tools. The result is named availableTools so it does not shadow
// the imported tools package.
availableTools, err := client.ListTools(ctx)
if err != nil {
	log.Fatal(err)
}
log.Printf("Discovered %d tools\n", len(availableTools))
Issue: Tool Call Timeout
Problem: Tool execution hangs.
Solution: Increase timeout:
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
response, err := client.CallTool(ctx, "read_file", args)
Best Practices
- Always close MCP registry:
defer registry.Close()
- Handle tool errors gracefully:
if err != nil {
	log.Printf("Tool failed: %v", err)
	// Provide fallback behavior
}
- Limit allowed paths for filesystem server:
Args: []string{"-y", "@modelcontextprotocol/server-filesystem", "/allowed/path"},
- Use environment variables for sensitive config:
Env: map[string]string{
	"API_KEY": os.Getenv("API_KEY"),
},
- Set reasonable tool policies:
policy := &pipeline.ToolPolicy{
	MaxRounds: 5,
	MaxToolCallsPerTurn: 10,
}
Next Steps
- Implement Tools - Create custom tools
- Validate Tools - Tool validation
- Configure Pipeline - Complete setup
See Also
- MCP Reference - Complete API
- MCP Tutorial - Step-by-step guide
Was this page helpful?