1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
| import anyio import click import httpx import mcp.types as types from mcp.server.lowlevel import Server from duckduckgo_search import DDGS
async def search_from_network(
    query: str,
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """Search the web for *query* and return the text of the top results.

    Runs a DuckDuckGo text search limited to two results. A failed attempt is
    retried after a one-second pause, for at most ten attempts in total.

    Args:
        query: The search string handed to ``DDGS().text``.

    Returns:
        One ``TextContent`` per result (at most two) carrying that result's
        ``body`` field; a single "No results found" item when the search
        comes back empty; or a single "Error occurred while searching" item
        when every attempt raised.
    """
    # Function-scope imports so this block does not depend on edits to the
    # module's import section.
    import sys

    from anyio import to_thread

    max_attempts = 10
    results = None
    for attempt in range(1, max_attempts + 1):
        try:
            # DDGS().text is a synchronous call; running it in a worker
            # thread keeps the event loop free while the search is pending.
            results = await to_thread.run_sync(
                lambda: DDGS().text(query, max_results=2)
            )
            break
        except Exception as e:
            # Diagnostics go to stderr: under the stdio transport, stdout
            # carries the protocol messages and must not receive stray text.
            print(f"Error occurred while searching: {e}", file=sys.stderr)
            if attempt == max_attempts:
                return [
                    types.TextContent(
                        type="text", text="Error occurred while searching"
                    )
                ]
            await anyio.sleep(1)

    print(f'result={results}', file=sys.stderr)
    if not results:
        return [types.TextContent(type="text", text="No results found")]

    # Build the reply from however many results came back (one or two);
    # indexing results[1] unconditionally would fail on a single hit.
    return [
        types.TextContent(type="text", text=item['body'])
        for item in results[:2]
    ]
async def fetch_website(
    url: str,
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """Download *url* and return the response body as one text item.

    Redirects are followed, and a fixed ``User-Agent`` header is sent with
    the request.

    Args:
        url: Address to request with HTTP GET.

    Returns:
        A one-element list holding a ``TextContent`` with the response text.

    Raises:
        httpx.HTTPStatusError: Propagated from ``raise_for_status()`` when
            the final response has an error status.
    """
    user_agent = "MCP Test Server (github.com/modelcontextprotocol/python-sdk)"
    http_client = httpx.AsyncClient(
        follow_redirects=True,
        headers={"User-Agent": user_agent},
    )
    async with http_client as session:
        page = await session.get(url)
        page.raise_for_status()
        content = types.TextContent(type="text", text=page.text)
    return [content]
@click.command()
@click.option("--port", default=8000, help="Port to listen on for SSE")
@click.option(
    "--transport",
    type=click.Choice(["stdio", "sse"]),
    default="stdio",
    help="Transport type",
)
def main(port: int, transport: str) -> int:
    """Run the mcp-demo MCP server over stdio or SSE."""
    # NOTE: click renders the docstring above in ``--help``, so it is kept to
    # one user-facing line; implementation notes live in comments instead.
    #
    # The server exposes two tools, "fetch" and "search". With the "sse"
    # transport it is served by uvicorn on 0.0.0.0:<port>; otherwise it
    # talks over the process's stdio streams. Returns 0 once the server stops.
    app = Server("mcp-demo")

    @app.call_tool()
    async def fetch_tool(
        name: str, arguments: dict
    ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Dispatch a tool call by name, validating its required argument.

        Raises:
            ValueError: If the tool name is unknown or the required argument
                for the tool is missing from ``arguments``.
        """
        if name == "fetch":
            if "url" not in arguments:
                raise ValueError("Missing required argument 'url'")
            return await fetch_website(arguments["url"])
        elif name == "search":
            if "query" not in arguments:
                raise ValueError("Missing required argument 'query'")
            return await search_from_network(arguments["query"])
        else:
            raise ValueError(f"Unknown tool: {name}")

    @app.list_tools()
    async def list_tools() -> list[types.Tool]:
        """Describe the tools this server offers and their input schemas."""
        return [
            types.Tool(
                name="fetch",
                description="Fetches a website and returns its content",
                inputSchema={
                    "type": "object",
                    "required": ["url"],
                    "properties": {
                        "url": {
                            "type": "string",
                            "description": "URL to fetch",
                        }
                    },
                },
            ),
            types.Tool(
                name="search",
                description="从网络查询信息",
                inputSchema={
                    "type": "object",
                    "required": ["query"],
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "需要查询的内容",
                        }
                    },
                },
            ),
        ]

    if transport == "sse":
        # Web-stack imports stay local so the stdio path does not need them.
        import uvicorn
        from mcp.server.sse import SseServerTransport
        from starlette.applications import Starlette
        from starlette.responses import Response
        from starlette.routing import Mount, Route

        sse = SseServerTransport("/sse/messages/")

        async def handle_sse(request):
            """Serve one SSE client session for the lifetime of its connection."""
            async with sse.connect_sse(
                request.scope, request.receive, request._send
            ) as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )
            # Starlette calls whatever a Route endpoint returns as a response;
            # returning None makes it raise "'NoneType' object is not
            # callable" once the session ends, so hand back an empty response.
            return Response()

        starlette_app = Starlette(
            debug=True,
            routes=[
                Route("/sse", endpoint=handle_sse),
                Mount("/sse/messages/", app=sse.handle_post_message),
            ],
        )

        uvicorn.run(starlette_app, host="0.0.0.0", port=port)
    else:
        from mcp.server.stdio import stdio_server

        async def arun():
            """Run the server over the stdio streams until they close."""
            async with stdio_server() as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )

        anyio.run(arun)

    return 0
# Script entry point: invoke the click command when this file is run directly.
if __name__ == "__main__": main()
|