| #!/usr/bin/env python3 |
| """ |
| Telegram图片处理MCP工具服务器 |
| 提供analyze_telegram_image工具,用于读取和分析Telegram图片 |
| """ |
| |
| import base64 |
| import logging |
| import os |
| from pathlib import Path |
| from typing import Any |
| |
| from mcp.server import Server |
| from mcp.server.stdio import stdio_server |
| from mcp.types import Tool, TextContent, ImageContent |
| |
| logger = logging.getLogger(__name__) |
| |
| # 创建MCP服务器 |
| app = Server("telegram-image-tools") |
| |
| |
| @app.list_tools() |
| async def list_tools() -> list[Tool]: |
| """列出可用的工具""" |
| return [ |
| Tool( |
| name="analyze_telegram_image", |
| description="分析Telegram用户发送的图片。读取指定路径的图片文件,返回base64编码的图片内容供Claude分析。", |
| inputSchema={ |
| "type": "object", |
| "properties": { |
| "image_path": { |
| "type": "string", |
| "description": "图片文件的绝对路径" |
| }, |
| "caption": { |
| "type": "string", |
| "description": "用户发送图片时附带的文字说明(可选)" |
| } |
| }, |
| "required": ["image_path"] |
| } |
| ) |
| ] |
| |
| |
| @app.call_tool() |
| async def call_tool(name: str, arguments: Any) -> list[TextContent | ImageContent]: |
| """处理工具调用""" |
| if name != "analyze_telegram_image": |
| raise ValueError(f"Unknown tool: {name}") |
| |
| # 获取参数 |
| image_path = arguments.get("image_path") |
| caption = arguments.get("caption", "") |
| |
| if not image_path: |
| raise ValueError("Missing required parameter: image_path") |
| |
| # 检查文件是否存在 |
| if not os.path.exists(image_path): |
| return [ |
| TextContent( |
| type="text", |
| text=f"❌ 错误:图片文件不存在: {image_path}" |
| ) |
| ] |
| |
| try: |
| # 读取图片文件 |
| with open(image_path, "rb") as f: |
| image_data = f.read() |
| |
| # Base64编码 |
| encoded_image = base64.b64encode(image_data).decode("utf-8") |
| |
| # 检测MIME类型 |
| file_ext = Path(image_path).suffix.lower() |
| mime_type_map = { |
| ".jpg": "image/jpeg", |
| ".jpeg": "image/jpeg", |
| ".png": "image/png", |
| ".gif": "image/gif", |
| ".webp": "image/webp", |
| ".bmp": "image/bmp" |
| } |
| mime_type = mime_type_map.get(file_ext, "image/jpeg") |
| |
| # 构建返回内容(MCP标准格式) |
| result = [] |
| |
| # 添加说明文本 |
| if caption: |
| result.append( |
| TextContent( |
| type="text", |
| text=f"📸 用户发送了图片,附言:{caption}" |
| ) |
| ) |
| else: |
| result.append( |
| TextContent( |
| type="text", |
| text="📸 用户发送了图片,请分析图片内容并回复用户。" |
| ) |
| ) |
| |
| # 添加图片内容(MCP标准格式) |
| result.append( |
| ImageContent( |
| type="image", |
| data=encoded_image, |
| mimeType=mime_type |
| ) |
| ) |
| |
| logger.info(f"成功处理图片: {image_path}, MIME: {mime_type}, 大小: {len(encoded_image)} bytes") |
| |
| return result |
| |
| except Exception as e: |
| logger.error(f"处理图片失败: {e}") |
| return [ |
| TextContent( |
| type="text", |
| text=f"❌ 读取图片失败: {str(e)}" |
| ) |
| ] |
| |
| |
| async def main(): |
| """启动MCP服务器""" |
| async with stdio_server() as (read_stream, write_stream): |
| await app.run( |
| read_stream, |
| write_stream, |
| app.create_initialization_options() |
| ) |
| |
| |
| if __name__ == "__main__": |
| import asyncio |
| asyncio.run(main()) |