A Complete Example Based on the Model Context Protocol (MCP)


This post walks through a complete example based on the Model Context Protocol (MCP): a server and a client with basic AI chat capability. Note that the example uses a simplified, MCP-inspired JSON message format for exchanging AI capabilities, not the full official MCP specification; it covers the core protocol handling, AI response generation, and simple socket communication.

Implementation outline

  1. Build a basic MCP server: listen on a port, handle client connections and MCP protocol messages
  2. Integrate AI capability: load a lightweight LLM (DistilGPT2) with the open-source transformers library to provide chat responses
  3. Implement an MCP client: send requests and receive AI responses
  4. Handle the core MCP message types (request, response, error)
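Before looking at the code, it helps to see the wire format. The example below sketches the simplified MCP-style JSON envelope used throughout this post (this is the demo's own format, not the official MCP JSON-RPC messages); each message travels as UTF-8 encoded JSON over a TCP socket:

```python
import json

# Client -> server request
request = {"version": "1.0", "type": "ai_request", "prompt": "Hello"}

# Server -> client success response
ok = {"version": "1.0", "type": "ai_response", "status": "success",
      "data": "Hi there!"}

# Server -> client error response
err = {"version": "1.0", "type": "ai_response", "status": "error",
       "message": "Unsupported MCP version"}

# On the wire: UTF-8 encoded JSON over TCP
wire = json.dumps(request).encode("utf-8")
print(json.loads(wire.decode("utf-8"))["prompt"])  # → Hello
```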

Full code

1. Install dependencies

bash

pip install transformers torch

2. MCP AI server (mcp_ai_server.py)

python

import json
import socket
import threading
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

# --------------------------
# 1. Initialize the AI model (a lightweight LLM)
# --------------------------
# Load DistilGPT2 (small and fast, well suited to a demo)
tokenizer = AutoTokenizer.from_pretrained("distilgpt2")
model = AutoModelForCausalLM.from_pretrained("distilgpt2")

# Set the padding token (GPT-2 has none by default)
tokenizer.pad_token = tokenizer.eos_token
model.config.pad_token_id = model.config.eos_token_id

# Create the text-generation pipeline
ai_pipeline = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    max_new_tokens=100,   # maximum number of tokens to generate
    do_sample=True,       # enable sampling so temperature/top_p take effect
    temperature=0.7,      # sampling randomness
    top_p=0.9,
    repetition_penalty=1.1
)

# --------------------------
# 2. Core MCP protocol handling
# --------------------------
class MCPAIHandler:
    """Handles AI requests in the MCP message format"""
    def __init__(self):
        self.mcp_version = "1.0"  # MCP protocol version

    def process_mcp_request(self, request_data):
        """Handle an MCP request and produce an AI response"""
        try:
            # Parse the MCP request
            mcp_request = json.loads(request_data)

            # Check the MCP protocol version
            if mcp_request.get("version") != self.mcp_version:
                return self._create_error_response("Unsupported MCP version")

            # Validate the request type and payload
            if mcp_request.get("type") != "ai_request" or "prompt" not in mcp_request:
                return self._create_error_response("Invalid request: missing prompt or wrong type")

            # Generate the AI response
            prompt = mcp_request["prompt"]
            ai_response = self._generate_ai_response(prompt)

            # Build the MCP response
            return self._create_success_response(ai_response)

        except json.JSONDecodeError:
            return self._create_error_response("Invalid JSON format")
        except Exception as e:
            return self._create_error_response(f"Server error: {str(e)}")

    def _generate_ai_response(self, prompt):
        """Generate an AI reply for the given prompt"""
        # Build a conversational prompt
        full_prompt = f"User: {prompt}\nAI:"

        # Run the model
        result = ai_pipeline(
            full_prompt,
            pad_token_id=tokenizer.eos_token_id,
            eos_token_id=tokenizer.eos_token_id
        )

        # Extract the reply after the "AI:" marker and clean it up
        response_text = result[0]["generated_text"].split("AI:")[-1].strip()
        # Collapse newlines and extra whitespace
        response_text = response_text.replace("\n", " ").strip()

        return response_text

    def _create_success_response(self, data):
        """Build a successful MCP response"""
        return json.dumps({
            "version": self.mcp_version,
            "type": "ai_response",
            "status": "success",
            "data": data
        })

    def _create_error_response(self, message):
        """Build an MCP error response"""
        return json.dumps({
            "version": self.mcp_version,
            "type": "ai_response",
            "status": "error",
            "message": message
        })

# --------------------------
# 3. TCP server
# --------------------------
class MCPAI_Server:
    def __init__(self, host="localhost", port=8888):
        self.host = host
        self.port = port
        self.handler = MCPAIHandler()
        self.server_socket = None
        self.is_running = False

    def start(self):
        """Start the MCP AI server"""
        # Create a TCP socket
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Allow quick restarts on the same port
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Bind to the configured address and port
        self.server_socket.bind((self.host, self.port))
        # Start listening (backlog of up to 5 pending connections)
        self.server_socket.listen(5)
        self.is_running = True

        print(f"MCP AI Server started on {self.host}:{self.port}")
        print("Waiting for client connections...")

        # Accept client connections in a loop
        while self.is_running:
            try:
                client_socket, client_addr = self.server_socket.accept()
                print(f"New connection from {client_addr}")
                # Handle each client on its own thread
                client_thread = threading.Thread(
                    target=self.handle_client,
                    args=(client_socket, client_addr)
                )
                client_thread.daemon = True
                client_thread.start()
            except KeyboardInterrupt:
                print("\nServer is shutting down...")
                self.stop()
            except Exception as e:
                print(f"Server error: {e}")

    def handle_client(self, client_socket, client_addr):
        """Serve a single client connection"""
        try:
            # Drop idle connections after 30 seconds
            client_socket.settimeout(30)

            while True:
                # Receive client data (up to 1024 bytes per message)
                data = client_socket.recv(1024).decode("utf-8")
                if not data:
                    break

                print(f"Received from {client_addr}: {data}")

                # Process the MCP request
                response = self.handler.process_mcp_request(data)

                # Send the response back
                client_socket.sendall(response.encode("utf-8"))
                print(f"Sent to {client_addr}: {response}")

        except socket.timeout:
            print(f"Client {client_addr} timed out")
        except Exception as e:
            print(f"Error handling client {client_addr}: {e}")
        finally:
            # Close the client connection
            client_socket.close()
            print(f"Connection with {client_addr} closed")

    def stop(self):
        """Stop the server"""
        self.is_running = False
        if self.server_socket:
            self.server_socket.close()
        print("MCP AI Server stopped")

# --------------------------
# Entry point
# --------------------------
if __name__ == "__main__":
    server = MCPAI_Server(host="localhost", port=8888)
    try:
        server.start()
    except KeyboardInterrupt:
        server.stop()
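The protocol layer can be exercised independently of the model. The sketch below reimplements just the validation logic from MCPAIHandler as a standalone function, with the model call stubbed out by a hypothetical `generate` callback (an assumption for testing, not part of the server code), and shows the error paths for malformed input:

```python
import json

MCP_VERSION = "1.0"

def process(request_data, generate=lambda p: f"echo: {p}"):
    # Mirror of MCPAIHandler.process_mcp_request with the AI call stubbed out
    try:
        req = json.loads(request_data)
    except json.JSONDecodeError:
        return {"version": MCP_VERSION, "type": "ai_response",
                "status": "error", "message": "Invalid JSON format"}
    if req.get("version") != MCP_VERSION:
        return {"version": MCP_VERSION, "type": "ai_response",
                "status": "error", "message": "Unsupported MCP version"}
    if req.get("type") != "ai_request" or "prompt" not in req:
        return {"version": MCP_VERSION, "type": "ai_response",
                "status": "error", "message": "Invalid request: missing prompt or wrong type"}
    return {"version": MCP_VERSION, "type": "ai_response",
            "status": "success", "data": generate(req["prompt"])}

print(process("not json")["message"])                     # → Invalid JSON format
print(process(json.dumps({"version": "2.0"}))["message"]) # → Unsupported MCP version
print(process(json.dumps({"version": "1.0",
                          "type": "ai_request",
                          "prompt": "hi"}))["data"])      # → echo: hi
```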

3. MCP AI client (mcp_ai_client.py)

python

import json
import socket

class MCPAIClient:
    """MCP AI client"""
    def __init__(self, host="localhost", port=8888):
        self.host = host
        self.port = port
        self.client_socket = None

    def connect(self):
        """Connect to the MCP AI server"""
        self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.client_socket.connect((self.host, self.port))
            print(f"Connected to MCP AI Server at {self.host}:{self.port}")
            return True
        except Exception as e:
            print(f"Connection failed: {e}")
            return False

    def send_ai_request(self, prompt):
        """Send an AI request and return the parsed response"""
        if not self.client_socket:
            print("Not connected to server")
            return None

        # Build the MCP request message
        mcp_request = {
            "version": "1.0",
            "type": "ai_request",
            "prompt": prompt
        }

        try:
            # Send the request
            request_data = json.dumps(mcp_request)
            self.client_socket.sendall(request_data.encode("utf-8"))

            # Receive the response (up to 1024 bytes)
            response_data = self.client_socket.recv(1024).decode("utf-8")
            if not response_data:
                return None

            # Parse the response
            response = json.loads(response_data)
            return response
        
        except json.JSONDecodeError:
            print("Invalid response format from server")
            return None
        except Exception as e:
            print(f"Request failed: {e}")
            return None

    def close(self):
        """Close the connection"""
        if self.client_socket:
            self.client_socket.close()
            print("Connection closed")

# --------------------------
# Example client usage
# --------------------------
if __name__ == "__main__":
    # Create the client
    client = MCPAIClient(host="localhost", port=8888)

    # Connect to the server
    if client.connect():
        # Conversation loop
        while True:
            user_input = input("\nYou: ")
            if user_input.lower() in ["exit", "quit", "bye"]:
                break
            
            # Send the request and get the response
            response = client.send_ai_request(user_input)

            # Handle the response
            if response:
                if response["status"] == "success":
                    print(f"AI: {response['data']}")
                else:
                    print(f"AI Error: {response['message']}")
            else:
                print("No response from AI server")

        # Close the connection
        client.close()

Usage

  1. Start the server: python mcp_ai_server.py
    • The first run downloads the DistilGPT2 model (about 350 MB), so make sure you have a working network connection
    • Once started, the server listens on localhost:8888
  2. Start the client: python mcp_ai_client.py
    • After the client connects, type any text to chat with the AI
    • Type exit, quit, or bye to end the session
  3. Example interaction (illustrative; actual DistilGPT2 output will vary):

     Connected to MCP AI Server at localhost:8888

     You: Hello, how are you?
     AI: I'm doing well, thank you for asking! It's great to connect with you today.

     You: What is Python?
     AI: Python is a high-level, interpreted programming language known for its readability and simplicity. It's widely used in web development, data science, and artificial intelligence.

     You: exit
     Connection closed

Code walkthrough

  1. AI model
    • DistilGPT2 (a distilled version of GPT-2) keeps the demo small enough to run locally
    • The _generate_ai_response method builds the prompt, generates a reply, and strips extra formatting
  2. MCP protocol handling
    • Defines the MCP 1.0 request/response message format
    • process_mcp_request validates each request and invokes the AI model
    • Success and error responses are kept distinct so the protocol stays well defined
  3. Network communication
    • The server handles multiple clients with one thread per connection
    • The client implements simple request sending and response parsing
    • Both sides include basic exception handling and a timeout
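One concrete limitation of the communication layer above: both sides read with a single recv(1024), which silently truncates any JSON message longer than 1 KB and can split messages on a slow link, since TCP is a byte stream with no message boundaries. A common fix, sketched here as an addition rather than part of the original code, is length-prefixed framing:

```python
import socket
import struct

def send_msg(sock, payload: bytes):
    # Prefix each message with its length as a 4-byte big-endian integer
    sock.sendall(struct.pack(">I", len(payload)) + payload)

def recv_exact(sock, n: int) -> bytes:
    # Loop until exactly n bytes arrive (recv may return partial data)
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("socket closed mid-message")
        buf += chunk
    return buf

def recv_msg(sock) -> bytes:
    # Read the 4-byte length header, then the payload it announces
    (length,) = struct.unpack(">I", recv_exact(sock, 4))
    return recv_exact(sock, length)

# Demo over a local socket pair
a, b = socket.socketpair()
send_msg(a, b'{"version": "1.0", "type": "ai_request", "prompt": "hi"}')
print(recv_msg(b).decode("utf-8"))
a.close(); b.close()
```

With these helpers, server and client would call send_msg/recv_msg instead of sendall/recv, and message size is no longer capped at the receive buffer.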

Summary

  1. This example implements an MCP-style AI chat service with a complete server (providing the AI capability) and client (issuing requests)
  2. Core flow: parse the MCP request → generate a reply with the AI model → return an MCP response
  3. Possible extensions: more MCP message types, larger LLMs, authentication, WebSocket transport, and so on

The example keeps the protocol lightweight while providing a runnable AI interaction flow, making it a reasonable starting point for putting AI capabilities behind an MCP-style interface.
