MCP Server 开发实战:为 Agent 构建自定义工具链

从零构建生产级 MCP Server,涵盖工具定义、鉴权设计、流式响应和测试策略,帮你把任何 API 变成 Agent 可用的工具。

AgentList Team · 2026年4月21日
MCPAgentTool Server开发实战API

MCP Server 开发实战:为 Agent 构建自定义工具链

MCP (Model Context Protocol) 让 Agent 能够通过统一协议调用外部工具。但官方示例通常停在"Hello World"阶段——一个返回固定字符串的工具。生产级 MCP Server 需要处理鉴权、错误恢复、流式响应和版本兼容。本文从零构建一个生产级的 MCP Server,覆盖这些真实场景。

MCP Server 的三种能力

MCP 定义了三种 Server 能力,理解它们的区别是设计 Server 的第一步:

能力 用途 典型场景
Tools Agent 可调用的函数 搜索文档、执行查询、发送通知
Resources Agent 可读取的数据 文件内容、数据库记录、配置信息
Prompts 预定义的 Prompt 模板 常见任务的标准流程、格式化指令

选择原则:如果 Agent 需要"做某件事"(有副作用),用 Tool。如果只需要"读取信息"(无副作用),用 Resource。如果需要"引导行为模式",用 Prompt。

从零搭建:一个实用的 MCP Server

我们将构建一个 "文档助手" MCP Server,提供文档搜索、摘要生成和书签管理功能。

项目结构

doc-assistant-mcp/
├── server.py           # MCP Server 主入口
├── tools/
│   ├── search.py       # 文档搜索工具
│   ├── summarize.py    # 摘要生成工具
│   └── bookmarks.py    # 书签管理工具
├── auth.py             # 鉴权模块
└── pyproject.toml

核心实现

# server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
    Tool,
    TextContent,
    CallToolResult,
)
import json

# 创建 Server 实例
app = Server("doc-assistant")

@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="search_docs",
            description="搜索文档库中的相关内容。返回与查询最匹配的文档片段。",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "搜索查询,使用自然语言描述你想找的内容",
                    },
                    "limit": {
                        "type": "integer",
                        "description": "返回结果数量,默认5",
                        "default": 5,
                    },
                    "doc_type": {
                        "type": "string",
                        "enum": ["api", "guide", "tutorial", "all"],
                        "description": "文档类型过滤",
                        "default": "all",
                    },
                },
                "required": ["query"],
            },
        ),
        Tool(
            name="bookmark",
            description="将文档片段保存为书签,供后续快速引用。每个书签需要标题和内容。",
            inputSchema={
                "type": "object",
                "properties": {
                    "title": {
                        "type": "string",
                        "description": "书签标题",
                    },
                    "content": {
                        "type": "string",
                        "description": "书签内容(文档片段)",
                    },
                    "tags": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "标签列表,用于分类",
                    },
                },
                "required": ["title", "content"],
            },
        ),
        Tool(
            name="get_bookmarks",
            description="获取已保存的书签列表,可按标签过滤。",
            inputSchema={
                "type": "object",
                "properties": {
                    "tag": {
                        "type": "string",
                        "description": "按标签过滤,不传则返回全部",
                    },
                },
            },
        ),
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "search_docs":
        return await handle_search(arguments)
    elif name == "bookmark":
        return await handle_bookmark(arguments)
    elif name == "get_bookmarks":
        return await handle_get_bookmarks(arguments)
    else:
        return [TextContent(type="text", text=f"未知工具: {name}")]

async def handle_search(args: dict) -> list[TextContent]:
    query = args["query"]
    limit = args.get("limit", 5)
    doc_type = args.get("doc_type", "all")

    try:
        results = await search_documents(query, limit=limit, doc_type=doc_type)
        if not results:
            return [TextContent(
                type="text",
                text=f"未找到与 '{query}' 相关的文档。建议尝试不同的关键词或扩大搜索范围。",
            )]
        formatted = format_search_results(results)
        return [TextContent(type="text", text=formatted)]
    except SearchError as e:
        return [TextContent(type="text", text=f"搜索失败: {e}")]

async def handle_bookmark(args: dict) -> list[TextContent]:
    title = args["title"]
    content = args["content"]
    tags = args.get("tags", [])

    bookmark_id = await save_bookmark(title, content, tags)
    return [TextContent(
        type="text",
        text=f"已保存书签 '{title}' (ID: {bookmark_id})",
    )]

async def handle_get_bookmarks(args: dict) -> list[TextContent]:
    tag = args.get("tag")
    bookmarks = await load_bookmarks(tag=tag)
    if not bookmarks:
        return [TextContent(type="text", text="暂无书签。")]
    formatted = "\n\n".join(
        f"**{b['title']}** (标签: {', '.join(b['tags'])})\n{b['content'][:200]}"
        for b in bookmarks
    )
    return [TextContent(type="text", text=formatted)]

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await app.run(read_stream, write_stream, app.create_initialization_options())

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

鉴权设计:API Key 与作用域

生产级 MCP Server 需要鉴权,防止未授权访问。MCP 协议本身不定义鉴权机制,鉴权在传输层实现。

# auth.py
from dataclasses import dataclass
from typing import Any
import hashlib
import hmac

@dataclass
class AuthScope:
    tools: list[str]       # 允许使用的工具列表
    max_calls: int         # 每小时最大调用次数
    rate_limit_window: int # 限流窗口(秒)

# 定义不同权限级别
SCOPES = {
    "read_only": AuthScope(
        tools=["search_docs", "get_bookmarks"],
        max_calls=100,
        rate_limit_window=3600,
    ),
    "read_write": AuthScope(
        tools=["search_docs", "bookmark", "get_bookmarks"],
        max_calls=500,
        rate_limit_window=3600,
    ),
    "admin": AuthScope(
        tools=["search_docs", "bookmark", "get_bookmarks", "admin_tools"],
        max_calls=2000,
        rate_limit_window=3600,
    ),
}

class MCPAuth:
    def __init__(self):
        self.api_keys: dict[str, str] = {}  # key_id -> hashed_secret
        self.key_scopes: dict[str, AuthScope] = {}
        self.call_counts: dict[str, list[float]] = {}  # key_id -> timestamps

    def register_key(self, key_id: str, secret: str, scope_name: str = "read_only"):
        hashed = hashlib.sha256(secret.encode()).hexdigest()
        self.api_keys[key_id] = hashed
        self.key_scopes[key_id] = SCOPES.get(scope_name, SCOPES["read_only"])

    def authenticate(self, key_id: str, secret: str) -> bool:
        if key_id not in self.api_keys:
            return False
        hashed = hashlib.sha256(secret.encode()).hexdigest()
        return hmac.compare_digest(self.api_keys[key_id], hashed)

    def authorize(self, key_id: str, tool_name: str) -> tuple[bool, str]:
        scope = self.key_scopes.get(key_id)
        if not scope:
            return False, "无效的 API Key"

        if tool_name not in scope.tools:
            return False, f"工具 '{tool_name}' 不在权限范围内"

        # 限流检查
        import time
        now = time.time()
        calls = self.call_counts.get(key_id, [])
        calls = [t for t in calls if now - t < scope.rate_limit_window]

        if len(calls) >= scope.max_calls:
            return False, f"已达到调用频率限制 ({scope.max_calls}/小时)"

        calls.append(now)
        self.call_counts[key_id] = calls
        return True, "ok"

错误处理:优雅降级

Agent 系统中,工具调用失败不应导致整个任务崩溃。每个工具调用都应该有明确的错误类型和恢复建议。

from enum import Enum
from dataclasses import dataclass

class ErrorType(Enum):
    VALIDATION = "validation"       # 输入参数错误
    NOT_FOUND = "not_found"         # 资源不存在
    RATE_LIMITED = "rate_limited"   # 被限流
    UPSTREAM_ERROR = "upstream"     # 上游服务异常
    TIMEOUT = "timeout"             # 超时

@dataclass
class ToolError:
    error_type: ErrorType
    message: str
    suggestion: str  # 给 Agent 的恢复建议
    retryable: bool

def handle_tool_error(error: ToolError) -> TextContent:
    parts = [f"[{error.error_type.value}] {error.message}"]
    if error.retryable:
        parts.append(f"建议: {error.suggestion}(可重试)")
    else:
        parts.append(f"建议: {error.suggestion}")
    return TextContent(type="text", text="\n".join(parts))

# 在工具处理函数中使用
async def handle_search(args: dict) -> list[TextContent]:
    query = args.get("query", "").strip()
    if not query:
        return [handle_tool_error(ToolError(
            error_type=ErrorType.VALIDATION,
            message="搜索查询不能为空",
            suggestion="提供一个非空的搜索关键词",
            retryable=True,
        ))]

    limit = args.get("limit", 5)
    if limit > 20:
        return [handle_tool_error(ToolError(
            error_type=ErrorType.VALIDATION,
            message=f"limit 参数值 {limit} 超过最大值 20",
            suggestion="使用 1-20 之间的值",
            retryable=True,
        ))]

    try:
        results = await search_documents(query, limit=limit)
    except TimeoutError:
        return [handle_tool_error(ToolError(
            error_type=ErrorType.TIMEOUT,
            message="搜索请求超时",
            suggestion="缩小搜索范围或减少结果数量后重试",
            retryable=True,
        ))]
    except Exception as e:
        return [handle_tool_error(ToolError(
            error_type=ErrorType.UPSTREAM_ERROR,
            message=f"搜索服务异常: {e}",
            suggestion="稍后重试,或使用其他搜索方式",
            retryable=True,
        ))]

    return [TextContent(type="text", text=format_search_results(results))]

测试策略

MCP Server 的测试分三层:

# test_tools.py
import pytest

# 第一层:工具逻辑测试(不依赖 MCP 协议)
@pytest.mark.asyncio
async def test_search_returns_results():
    results = await search_documents("RAG pipeline", limit=3)
    assert len(results) <= 3
    for r in results:
        assert "title" in r
        assert "content" in r

@pytest.mark.asyncio
async def test_search_validates_limit():
    with pytest.raises(ValueError):
        await search_documents("test", limit=100)

# 第二层:Schema 验证测试
def test_tool_schemas_are_valid():
    tools = asyncio.run(list_tools())
    for tool in tools:
        assert tool.inputSchema.get("type") == "object"
        required = tool.inputSchema.get("required", [])
        properties = tool.inputSchema.get("properties", {})
        for field_name in required:
            assert field_name in properties

# 第三层:集成测试(模拟 MCP 调用)
@pytest.mark.asyncio
async def test_search_tool_integration():
    result = await call_tool("search_docs", {"query": "test query"})
    assert len(result) > 0
    assert result[0].type == "text"

@pytest.mark.asyncio
async def test_bookmark_roundtrip():
    # 保存
    save_result = await call_tool("bookmark", {
        "title": "Test Bookmark",
        "content": "Test content",
        "tags": ["test"],
    })
    assert "已保存" in save_result[0].text

    # 读取
    get_result = await call_tool("get_bookmarks", {"tag": "test"})
    assert "Test Bookmark" in get_result[0].text

常见误区

误区一:"工具描述不重要,Agent 会自己猜" 工具描述是 Agent 决定何时调用工具的唯一依据。模糊的描述导致 Agent 在错误场景调用工具。每个工具的 description 应该明确说明:这个工具做什么、什么时候该用它、什么时候不该用它。

误区二:"MCP Server 不需要限流" 如果你的 MCP Server 暴露了付费 API(如搜索、翻译、LLM 调用),没有限流意味着一次 prompt injection 就可能导致大量 API 调用和费用。限流是安全措施,不是性能优化。

误区三:"错误信息应该暴露内部实现" 不要在错误信息中暴露数据库连接字符串、API endpoint 或内部服务名称。错误信息应该给 Agent 足够的上下文来恢复,但不应该给攻击者侦察系统的信息。

总结

  • MCP Server 提供三种能力(Tools/Resources/Prompts),根据副作用类型选择合适的能力
  • 工具描述是 Agent 调用决策的关键——写得清晰、具体、包含使用场景
  • 鉴权在传输层实现:API Key + 作用域 + 限流,三者缺一不可
  • 错误处理要区分类型并给出恢复建议:让 Agent 知道"出了什么问题"和"下一步怎么办"
  • 三层测试策略:工具逻辑 → Schema 验证 → 集成测试,确保每层都可靠

本文由 AgentList 团队整理,更多 MCP 相关项目请浏览本站项目列表。