我会为你编写一个基于 Model Context Protocol (MCP) 的完整示例,实现一个具备基础 AI 对话能力的服务端和客户端。MCP 是一种轻量级的 AI 能力交互协议,这个示例会包含核心的协议处理、AI 响应生成和简单的通信逻辑。
实现思路
- 搭建 MCP 基础服务端:监听指定端口,处理客户端连接和 MCP 协议消息
- 集成 AI 能力:使用开源的
transformers库加载轻量级 LLM 模型(如 DistilGPT2)提供对话能力 - 实现 MCP 客户端:发送请求并接收 AI 响应
- 处理 MCP 协议的核心消息类型(请求、响应、错误)
完整代码实现
1. 安装依赖
bash
运行
pip install transformers torch socketio python-socketio[client]
2. MCP AI 服务端代码 (mcp_ai_server.py)
python
运行
import json
import socket
import threading
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
# --------------------------
# 1. 初始化AI模型(轻量级LLM)
# --------------------------
# 加载DistilGPT2模型(轻量、适合演示)
tokenizer = AutoTokenizer.from_pretrained("distilgpt2")
model = AutoModelForCausalLM.from_pretrained("distilgpt2")
# 设置padding token(GPT2默认无padding token)
tokenizer.pad_token = tokenizer.eos_token
model.config.pad_token_id = model.config.eos_token_id
# 创建文本生成pipeline
ai_pipeline = pipeline(
"text-generation",
model=model,
tokenizer=tokenizer,
max_new_tokens=100, # 生成文本的最大长度
temperature=0.7, # 生成随机性
top_p=0.9,
repetition_penalty=1.1
)
# --------------------------
# 2. MCP协议核心处理逻辑
# --------------------------
class MCPAIHandler:
"""MCP协议AI请求处理器"""
def __init__(self):
self.mcp_version = "1.0" # MCP协议版本
def process_mcp_request(self, request_data):
"""处理MCP请求并生成AI响应"""
try:
# 解析MCP请求
mcp_request = json.loads(request_data)
# 验证MCP协议版本
if mcp_request.get("version") != self.mcp_version:
return self._create_error_response("Unsupported MCP version")
# 验证请求类型和内容
if mcp_request.get("type") != "ai_request" or "prompt" not in mcp_request:
return self._create_error_response("Invalid request: missing prompt or wrong type")
# 生成AI响应
prompt = mcp_request["prompt"]
ai_response = self._generate_ai_response(prompt)
# 构建MCP响应
return self._create_success_response(ai_response)
except json.JSONDecodeError:
return self._create_error_response("Invalid JSON format")
except Exception as e:
return self._create_error_response(f"Server error: {str(e)}")
def _generate_ai_response(self, prompt):
"""生成AI回复"""
# 构建对话提示
full_prompt = f"User: {prompt}\nAI:"
# 调用模型生成回复
result = ai_pipeline(
full_prompt,
pad_token_id=tokenizer.eos_token_id,
eos_token_id=tokenizer.eos_token_id
)
# 提取并清理AI回复
response_text = result[0]["generated_text"].split("AI:")[-1].strip()
# 移除多余的换行和空格
response_text = response_text.replace("\n", " ").strip()
return response_text
def _create_success_response(self, data):
"""创建成功的MCP响应"""
return json.dumps({
"version": self.mcp_version,
"type": "ai_response",
"status": "success",
"data": data
})
def _create_error_response(self, message):
"""创建错误的MCP响应"""
return json.dumps({
"version": self.mcp_version,
"type": "ai_response",
"status": "error",
"message": message
})
# --------------------------
# 3. TCP服务端实现
# --------------------------
class MCPAI_Server:
def __init__(self, host="localhost", port=8888):
self.host = host
self.port = port
self.handler = MCPAIHandler()
self.server_socket = None
self.is_running = False
def start(self):
"""启动MCP AI服务端"""
# 创建TCP socket
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 允许端口复用
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定地址和端口
self.server_socket.bind((self.host, self.port))
# 开始监听
self.server_socket.listen(5)
self.is_running = True
print(f"MCP AI Server started on {self.host}:{self.port}")
print("Waiting for client connections...")
# 循环接受客户端连接
while self.is_running:
try:
client_socket, client_addr = self.server_socket.accept()
print(f"New connection from {client_addr}")
# 为每个客户端创建独立线程处理
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket, client_addr)
)
client_thread.daemon = True
client_thread.start()
except KeyboardInterrupt:
print("\nServer is shutting down...")
self.stop()
except Exception as e:
print(f"Server error: {e}")
def handle_client(self, client_socket, client_addr):
"""处理单个客户端连接"""
try:
# 设置超时时间
client_socket.settimeout(30)
while True:
# 接收客户端数据(最大1024字节)
data = client_socket.recv(1024).decode("utf-8")
if not data:
break
print(f"Received from {client_addr}: {data}")
# 处理MCP请求
response = self.handler.process_mcp_request(data)
# 发送响应
client_socket.sendall(response.encode("utf-8"))
print(f"Sent to {client_addr}: {response}")
except socket.timeout:
print(f"Client {client_addr} timeout")
except Exception as e:
print(f"Error handling client {client_addr}: {e}")
finally:
# 关闭客户端连接
client_socket.close()
print(f"Connection with {client_addr} closed")
def stop(self):
"""停止服务端"""
self.is_running = False
if self.server_socket:
self.server_socket.close()
print("MCP AI Server stopped")
# --------------------------
# 启动服务端
# --------------------------
if __name__ == "__main__":
server = MCPAI_Server(host="localhost", port=8888)
try:
server.start()
except KeyboardInterrupt:
server.stop()
3. MCP AI 客户端代码 (mcp_ai_client.py)
python
运行
import json
import socket
class MCPAIClient:
"""MCP AI客户端"""
def __init__(self, host="localhost", port=8888):
self.host = host
self.port = port
self.client_socket = None
def connect(self):
"""连接到MCP AI服务端"""
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.client_socket.connect((self.host, self.port))
print(f"Connected to MCP AI Server at {self.host}:{self.port}")
return True
except Exception as e:
print(f"Connection failed: {e}")
return False
def send_ai_request(self, prompt):
"""发送AI请求并获取响应"""
if not self.client_socket:
print("Not connected to server")
return None
# 构建MCP请求消息
mcp_request = {
"version": "1.0",
"type": "ai_request",
"prompt": prompt
}
try:
# 发送请求
request_data = json.dumps(mcp_request)
self.client_socket.sendall(request_data.encode("utf-8"))
# 接收响应
response_data = self.client_socket.recv(1024).decode("utf-8")
if not response_data:
return None
# 解析响应
response = json.loads(response_data)
return response
except json.JSONDecodeError:
print("Invalid response format from server")
return None
except Exception as e:
print(f"Request failed: {e}")
return None
def close(self):
"""关闭连接"""
if self.client_socket:
self.client_socket.close()
print("Connection closed")
# --------------------------
# 客户端使用示例
# --------------------------
if __name__ == "__main__":
# 创建客户端
client = MCPAIClient(host="localhost", port=8888)
# 连接服务端
if client.connect():
# 对话循环
while True:
user_input = input("\nYou: ")
if user_input.lower() in ["exit", "quit", "bye"]:
break
# 发送请求并获取响应
response = client.send_ai_request(user_input)
# 处理响应
if response:
if response["status"] == "success":
print(f"AI: {response['data']}")
else:
print(f"AI Error: {response['message']}")
else:
print("No response from AI server")
# 关闭连接
client.close()
代码使用说明
- 启动服务端:bash运行
python mcp_ai_server.py- 首次运行会自动下载 DistilGPT2 模型(约 350MB),请确保网络通畅
- 服务端启动后会监听
localhost:8888
- 启动客户端:bash运行
python mcp_ai_client.py- 客户端连接成功后,输入任意文本即可与 AI 对话
- 输入
exit/quit/bye退出对话
- 示例交互效果:plaintext
Connected to MCP AI Server at localhost:8888 You: Hello, how are you? AI: I'm doing well, thank you for asking! It's great to connect with you today. You: What is Python? AI: Python is a high-level, interpreted programming language known for its readability and simplicity. It's widely used in web development, data science, and artificial intelligence. You: exit Connection closed
核心代码解释
- AI 模型部分:
- 使用 DistilGPT2(GPT2 的轻量化版本),适合本地演示
_generate_ai_response方法处理提示词并生成回复,清理多余格式
- MCP 协议处理:
- 定义了 MCP 1.0 版本的请求 / 响应格式
process_mcp_request方法验证请求合法性并调用 AI 生成回复- 区分成功 / 错误响应,保证协议的规范性
- 网络通信:
- 服务端使用多线程处理多个客户端连接
- 客户端实现简单的请求发送和响应解析逻辑
- 包含基本的异常处理和超时机制
总结
- 本示例实现了基于 MCP 协议的 AI 对话服务,包含完整的服务端(提供 AI 能力)和客户端(发起请求)
- 核心逻辑:MCP 协议消息解析 → AI 模型生成回复 → MCP 协议响应返回
- 可扩展方向:支持更多 MCP 消息类型、集成更大的 LLM 模型、添加身份验证、实现 WebSocket 通信等
这个示例保持了 MCP 协议的轻量特性,同时提供了可运行的 AI 能力交互流程,适合作为 MCP 协议集成 AI 能力的入门参考。
正文完
可以使用微信扫码关注公众号(ID:xzluomor)