Python 服务器案例
本页面展示了使用Python实现MCP的实际应用案例,帮助您理解如何利用Python构建强大的AI功能扩展。
为什么选择Python实现MCP
Python凭借其简洁的语法和丰富的生态系统,是实现MCP服务器的理想选择:
- 简单易用:Python的简洁语法让开发者能快速实现MCP功能
- 异步支持:通过
asyncio提供高效的异步处理能力 - 丰富的库:从数据处理到机器学习,Python生态系统提供全方位支持
- 跨平台兼容:可在各种操作系统上运行
1. 智能文档分析系统
场景描述
构建一个能够接收PDF文档、提取内容、分析关键信息并回答用户问题的MCP服务器。
MCP服务器实现
import os
import fitz # PyMuPDF
from mcp import Server, Tool, Resource, BadRequestError, ToolError
import spacy
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import json
# 创建MCP服务器
server = Server(
name="document-analyzer",
description="智能文档分析和问答服务",
version="1.0.0"
)
# 加载NLP模型
try:
nlp = spacy.load("zh_core_web_sm")
except OSError:
print("下载中文NLP模型...")
spacy.cli.download("zh_core_web_sm")
nlp = spacy.load("zh_core_web_sm")
# 文档存储
documents = {}
os.makedirs("data/docs", exist_ok=True)
@server.tool
async def extract_text_from_pdf(file_path: str) -> dict:
"""
从PDF文件中提取文本
Args:
file_path: PDF文件路径
Returns:
包含提取文本的字典
"""
if not os.path.exists(file_path):
raise BadRequestError(f"文件不存在: {file_path}")
if not file_path.lower().endswith('.pdf'):
raise BadRequestError("文件必须是PDF格式")
try:
doc = fitz.open(file_path)
text = ""
for page_num in range(len(doc)):
page = doc.load_page(page_num)
text += page.get_text()
# 将文档存储在内存中
doc_id = os.path.basename(file_path)
documents[doc_id] = {
"text": text,
"file_path": file_path,
"processed": False
}
# 保存文本到文件
text_file = os.path.join("data/docs", f"{doc_id}.txt")
with open(text_file, "w", encoding="utf-8") as f:
f.write(text)
return {
"doc_id": doc_id,
"page_count": len(doc),
"text_length": len(text),
"text_preview": text[:200] + "..." if len(text) > 200 else text,
"saved_to": text_file,
"status": "success"
}
except Exception as e:
raise ToolError(f"处理PDF时出错: {str(e)}")
@server.tool
async def analyze_document(doc_id: str) -> dict:
"""
分析文档,提取关键信息
Args:
doc_id: 文档ID
Returns:
包含分析结果的字典
"""
if doc_id not in documents:
raise BadRequestError(f"文档不存在: {doc_id}")
doc = documents[doc_id]
text = doc["text"]
# 使用spaCy进行NLP分析
nlp_doc = nlp(text[:100000]) # 限制处理长度以避免内存问题
# 提取实体
entities = []
for ent in nlp_doc.ents:
entities.append({
"text": ent.text,
"label": ent.label_,
"start": ent.start_char,
"end": ent.end_char
})
# 提取关键句子
sentences = [sent.text for sent in nlp_doc.sents]
# 使用TF-IDF找出最重要的句子
if len(sentences) > 1:
vectorizer = TfidfVectorizer(max_features=5000)
try:
tfidf_matrix = vectorizer.fit_transform(sentences)
sentence_scores = tfidf_matrix.sum(axis=1).A1
# 获取前5个最重要的句子
top_sentence_indices = sentence_scores.argsort()[-5:][::-1]
key_sentences = [sentences[i] for i in top_sentence_indices]
except:
# 如果向量化失败,选择前5个句子
key_sentences = sentences[:5]
else:
key_sentences = sentences
# 更新文档处理状态
doc["processed"] = True
doc["entities"] = entities
doc["key_sentences"] = key_sentences
# 保存分析结果
analysis_file = os.path.join("data/docs", f"{doc_id}_analysis.json")
with open(analysis_file, "w", encoding="utf-8") as f:
json.dump({
"key_sentences": key_sentences,
"entity_count": len(entities),
"top_entities": entities[:20] # 只保存前20个实体
}, f, ensure_ascii=False, indent=2)
return {
"doc_id": doc_id,
"entity_count": len(entities),
"entity_preview": entities[:5],
"key_sentences": key_sentences,
"analysis_saved_to": analysis_file,
"status": "success"
}
@server.tool
async def answer_question(doc_id: str, question: str) -> dict:
"""
回答关于文档的问题
Args:
doc_id: 文档ID
question: 用户问题
Returns:
包含回答的字典
"""
if doc_id not in documents:
raise BadRequestError(f"文档不存在: {doc_id}")
doc = documents[doc_id]
if not doc["processed"]:
# 如果文档尚未分析,先进行分析
await analyze_document(doc_id)
text = doc["text"]
# 将文档分成段落
paragraphs = text.split("\n\n")
paragraphs = [p for p in paragraphs if len(p.strip()) > 0]
if len(paragraphs) == 0:
return {
"answer": "文档没有有效内容,无法回答问题。",
"relevance_score": 0.0
}
# 使用TF-IDF计算问题和段落的相似度
all_texts = [question] + paragraphs
vectorizer = TfidfVectorizer()
try:
tfidf_matrix = vectorizer.fit_transform(all_texts)
# 问题的向量
question_vector = tfidf_matrix[0:1]
# 段落向量
paragraph_vectors = tfidf_matrix[1:]
# 计算相似度
similarities = cosine_similarity(question_vector, paragraph_vectors).flatten()
# 获取最相关的3个段落
top_paragraph_indices = similarities.argsort()[-3:][::-1]
relevant_paragraphs = [paragraphs[i] for i in top_paragraph_indices]
relevance_scores = [float(similarities[i]) for i in top_paragraph_indices]
# 构建回答
context = "\n\n".join(relevant_paragraphs)
return {
"answer": f"根据文档内容,以下信息与您的问题最相关:\n\n{context}",
"relevance_score": relevance_scores[0],
"context_paragraphs": relevant_paragraphs,
"status": "success"
}
except:
# 如果向量化失败,返回文档前3个段落
return {
"answer": f"无法计算相关性,这里是文档的开头内容:\n\n{''.join(paragraphs[:3])}",
"relevance_score": 0.0,
"status": "partial_success"
}
@server.resource(path="/documents")
async def documents_resource():
"""可用文档列表"""
doc_list = []
for doc_id, doc_info in documents.items():
doc_list.append({
"id": doc_id,
"path": doc_info["file_path"],
"processed": doc_info["processed"],
"text_length": len(doc_info["text"])
})
return doc_list
@server.prompt
def document_question():
"""提示用户询问文档问题"""
return """
我想询问关于一份文档的问题:
文档ID:[文档的ID,通常是文件名]
我的问题是:[在此输入您的问题]
请基于文档内容回答我的问题,并引用相关的原文作为依据。
"""
if __name__ == "__main__":
server.run()
使用体验
用户可以上传PDF文档,然后提问如:
- "这份合同的主要条款是什么?"
- "这篇论文的研究方法有哪些?"
- "这份报告中的关键发现是什么?"
MCP服务器会分析文档,找出与问题最相关的内容,并给出回答。
与Claude Desktop集成
{
"mcpServers": {
"docAnalyzer": {
"command": "python",
"args": ["path/to/document_analyzer.py"],
"env": {}
}
}
}
2. 实时数据可视化助手
场景描述
创建一个能够加载数据、生成数据可视化并实时返回给用户的MCP服务器。
MCP服务器实现
import os
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import io
import base64
from mcp import Server, Tool, Resource, BadRequestError
import json
from typing import Optional, List
# 创建MCP服务器
server = Server(
name="data-visualizer",
description="数据可视化和分析服务",
version="1.0.0"
)
# 临时数据存储
datasets = {}
os.makedirs("data/visualizations", exist_ok=True)
@server.tool
async def load_csv_data(file_path: str, dataset_name: str) -> dict:
"""
加载CSV数据
Args:
file_path: CSV文件路径
dataset_name: 数据集名称
Returns:
包含数据集信息的字典
"""
if not os.path.exists(file_path):
raise BadRequestError(f"文件不存在: {file_path}")
if not file_path.lower().endswith('.csv'):
raise BadRequestError("文件必须是CSV格式")
try:
df = pd.read_csv(file_path)
# 存储数据集
datasets[dataset_name] = df
# 数据集描述
summary = {
"dataset_name": dataset_name,
"row_count": len(df),
"column_count": len(df.columns),
"columns": df.columns.tolist(),
"column_types": {col: str(df[col].dtype) for col in df.columns},
"status": "success"
}
# 保存数据集信息
info_file = os.path.join("data/visualizations", f"{dataset_name}_info.json")
with open(info_file, "w", encoding="utf-8") as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
return summary
except Exception as e:
raise BadRequestError(f"加载CSV数据时出错: {str(e)}")
@server.tool
async def get_data_summary(dataset_name: str) -> dict:
"""
获取数据摘要
Args:
dataset_name: 数据集名称
Returns:
包含数据摘要的字典
"""
if dataset_name not in datasets:
raise BadRequestError(f"数据集不存在: {dataset_name}")
df = datasets[dataset_name]
# 计算数值列的统计摘要
numeric_cols = df.select_dtypes(include=['number']).columns.tolist()
if numeric_cols:
numeric_summary = df[numeric_cols].describe().to_dict()
else:
numeric_summary = {}
# 计算分类列的唯一值计数
categorical_cols = df.select_dtypes(exclude=['number']).columns.tolist()
categorical_summary = {}
for col in categorical_cols[:5]: # 只处理前5个分类列以避免过大
value_counts = df[col].value_counts().head(10).to_dict() # 只包含前10个值
categorical_summary[col] = value_counts
summary = {
"dataset_name": dataset_name,
"numeric_summary": numeric_summary,
"categorical_summary": categorical_summary,
"missing_values": df.isnull().sum().to_dict(),
"status": "success"
}
# 保存摘要
summary_file = os.path.join("data/visualizations", f"{dataset_name}_summary.json")
with open(summary_file, "w", encoding="utf-8") as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
return summary
@server.tool
async def create_visualization(
dataset_name: str,
chart_type: str,
x_column: str,
y_column: Optional[str] = None,
hue_column: Optional[str] = None,
title: Optional[str] = None,
file_name: Optional[str] = None
) -> dict:
"""
创建数据可视化
Args:
dataset_name: 数据集名称
chart_type: 图表类型 (bar, line, scatter, histogram, boxplot, heatmap, pie)
x_column: X轴列名
y_column: Y轴列名 (某些图表类型可选)
hue_column: 用于分组的列名 (可选)
title: 图表标题 (可选)
file_name: 输出文件名 (可选)
Returns:
包含可视化图像的字典
"""
if dataset_name not in datasets:
raise BadRequestError(f"数据集不存在: {dataset_name}")
df = datasets[dataset_name]
if x_column not in df.columns:
raise BadRequestError(f"列 '{x_column}' 不存在")
if y_column and y_column not in df.columns:
raise BadRequestError(f"列 '{y_column}' 不存在")
if hue_column and hue_column not in df.columns:
raise BadRequestError(f"列 '{hue_column}' 不存在")
# 设置中文字体支持
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
plt.figure(figsize=(10, 6))
try:
if chart_type == "bar":
if y_column:
ax = sns.barplot(x=x_column, y=y_column, hue=hue_column, data=df)
else:
# 如果没有提供y_column,使用计数
ax = sns.countplot(x=x_column, hue=hue_column, data=df)
elif chart_type == "line":
if not y_column:
raise BadRequestError("line图表需要y_column")
ax = sns.lineplot(x=x_column, y=y_column, hue=hue_column, data=df)
elif chart_type == "scatter":
if not y_column:
raise BadRequestError("scatter图表需要y_column")
ax = sns.scatterplot(x=x_column, y=y_column, hue=hue_column, data=df)
elif chart_type == "histogram":
ax = sns.histplot(data=df, x=x_column, hue=hue_column)
elif chart_type == "boxplot":
if y_column:
ax = sns.boxplot(x=x_column, y=y_column, hue=hue_column, data=df)
else:
ax = sns.boxplot(x=x_column, data=df)
elif chart_type == "heatmap":
if not y_column:
raise BadRequestError("heatmap图表需要y_column")
# 将数据透视为矩阵
pivot_data = df.pivot_table(index=x_column, columns=y_column, aggfunc='size', fill_value=0)
ax = sns.heatmap(pivot_data, annot=True, cmap="YlGnBu")
elif chart_type == "pie":
if not y_column:
# 使用计数作为值
counts = df[x_column].value_counts()
plt.pie(counts.values, labels=counts.index, autopct='%1.1f%%')
else:
# 根据y_column进行聚合
grouped = df.groupby(x_column)[y_column].sum()
plt.pie(grouped.values, labels=grouped.index, autopct='%1.1f%%')
ax = plt.gca()
else:
raise BadRequestError(f"不支持的图表类型: {chart_type}")
# 设置标题
if title:
plt.title(title)
else:
plt.title(f"{chart_type.capitalize()} of {x_column}" + (f" vs {y_column}" if y_column else ""))
# 处理长标签
if hasattr(ax, 'set_xticklabels'):
ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha='right')
# 调整布局
plt.tight_layout()
# 保存图表
if not file_name:
file_name = f"{dataset_name}_{chart_type}_{x_column}" + (f"_{y_column}" if y_column else "") + ".png"
file_path = os.path.join("data/visualizations", file_name)
plt.savefig(file_path, dpi=300)
# 将图表转换为base64编码的图像
buffer = io.BytesIO()
plt.savefig(buffer, format='png')
buffer.seek(0)
image_base64 = base64.b64encode(buffer.read()).decode('utf-8')
plt.close()
return {
"image": f"data:image/png;base64,{image_base64}",
"chart_type": chart_type,
"dataset": dataset_name,
"title": title,
"file_path": file_path,
"status": "success"
}
except Exception as e:
plt.close()
raise BadRequestError(f"创建可视化时出错: {str(e)}")
@server.resource(path="/datasets")
async def datasets_resource():
"""可用数据集列表"""
dataset_list = []
for name, df in datasets.items():
dataset_list.append({
"name": name,
"rows": len(df),
"columns": df.columns.tolist()
})
return dataset_list
@server.prompt
def data_visualization_request():
"""提示用户请求数据可视化"""
return """
我想基于以下数据集创建可视化:
数据集名称:[已加载的数据集名称]
我需要的图表类型是:[bar/line/scatter/histogram/boxplot/heatmap/pie]
请使用以下列进行可视化:
X轴列名:[X轴使用的列]
Y轴列名:[Y轴使用的列,某些图表类型可选]
分组列名:[用于分组的列,可选]
图表标题:[图表标题,可选]
请生成可视化并解释图表显示的主要趋势或模式。
"""
if __name__ == "__main__":
server.run()
使用体验
用户可以请求:
- "加载我的sales_data.csv数据并命名为'销售数据'"
- "生成一个销售数据中月份与销售额的条形图"
- "创建一个按地区分组的销售表现散点图"
MCP服务器会处理数据并生成可视化图表,用户可以直接查看图像结果和分析。