""" Sample MCP Weather Server Demonstrates integration with external APIs and real-world MCP server implementation """ import asyncio import json import logging from datetime import datetime, timedelta from typing import Dict, List, Any, Optional import aiohttp import structlog # Configure structured logging logging.basicConfig(level=logging.INFO) logger = structlog.get_logger() class WeatherAPI: """Mock weather API for demonstration purposes.""" def __init__(self): # Mock weather data for different locations self.mock_data = { "New York": { "temperature": 22, "humidity": 65, "conditions": "Partly Cloudy", "wind_speed": 8.5, "wind_direction": "NW", "pressure": 1013.2, "visibility": 10.0, "uv_index": 6, "forecast": [ {"date": "2024-11-30", "high": 24, "low": 18, "conditions": "Sunny"}, {"date": "2024-12-01", "high": 20, "low": 15, "conditions": "Cloudy"}, {"date": "2024-12-02", "high": 23, "low": 19, "conditions": "Partly Cloudy"} ] }, "London": { "temperature": 15, "humidity": 78, "conditions": "Rainy", "wind_speed": 12.3, "wind_direction": "SW", "pressure": 1008.5, "visibility": 8.0, "uv_index": 2, "forecast": [ {"date": "2024-11-30", "high": 17, "low": 12, "conditions": "Rainy"}, {"date": "2024-12-01", "high": 14, "low": 10, "conditions": "Overcast"}, {"date": "2024-12-02", "high": 16, "low": 11, "conditions": "Drizzle"} ] }, "Tokyo": { "temperature": 18, "humidity": 72, "conditions": "Clear", "wind_speed": 5.2, "wind_direction": "NE", "pressure": 1016.8, "visibility": 12.0, "uv_index": 4, "forecast": [ {"date": "2024-11-30", "high": 21, "low": 16, "conditions": "Clear"}, {"date": "2024-12-01", "high": 19, "low": 14, "conditions": "Partly Cloudy"}, {"date": "2024-12-02", "high": 22, "low": 17, "conditions": "Sunny"} ] } } async def get_current_weather(self, location: str) -> Dict[str, Any]: """Get current weather for a location.""" # Simulate API delay await asyncio.sleep(0.1) location = location.strip().title() if location in self.mock_data: data = self.mock_data[location].copy() data["location"] = location data["timestamp"] = datetime.utcnow().isoformat() return data else: raise ValueError(f"Weather data not available for location: {location}") async def get_forecast(self, location: str, days: int = 3) -> Dict[str, Any]: """Get weather forecast for a location.""" await asyncio.sleep(0.15) location = location.strip().title() if location in self.mock_data: forecast_data = self.mock_data[location]["forecast"][:days] return { "location": location, "forecast": forecast_data, "generated_at": datetime.utcnow().isoformat() } else: raise ValueError(f"Forecast data not available for location: {location}") async def search_locations(self, query: str) -> List[str]: """Search for available locations.""" await asyncio.sleep(0.05) query = query.lower() locations = [] for location in self.mock_data.keys(): if query in location.lower(): locations.append(location) return locations class WeatherMCPServer: """Model Context Protocol Weather Server implementation.""" def __init__(self, port: int = 8001): self.port = port self.api = WeatherAPI() self.tools = { "get_current_weather": { "name": "get_current_weather", "description": "Get current weather conditions for a specified location", "inputSchema": { "type": "object", "properties": { "location": { "type": "string", "description": "City or location name (e.g., 'New York', 'London', 'Tokyo')", "minLength": 1, "maxLength": 100 } }, "required": ["location"] }, "outputSchema": { "type": "object", "properties": { "location": {"type": "string"}, "temperature": {"type": "number", "description": "Temperature in Celsius"}, "humidity": {"type": "number", "description": "Humidity percentage"}, "conditions": {"type": "string", "description": "Weather conditions"}, "wind_speed": {"type": "number", "description": "Wind speed in km/h"}, "wind_direction": {"type": "string", "description": "Wind direction"}, "pressure": {"type": "number", "description": "Atmospheric pressure in hPa"}, "visibility": {"type": "number", "description": "Visibility in km"}, "uv_index": {"type": "number", "description": "UV index"}, "timestamp": {"type": "string", "description": "ISO timestamp"} } }, "examples": [ {"location": "New York"}, {"location": "London"}, {"location": "Tokyo"} ], "tags": ["weather", "current", "conditions"] }, "get_weather_forecast": { "name": "get_weather_forecast", "description": "Get weather forecast for a specified location and number of days", "inputSchema": { "type": "object", "properties": { "location": { "type": "string", "description": "City or location name", "minLength": 1, "maxLength": 100 }, "days": { "type": "integer", "description": "Number of forecast days (1-7)", "minimum": 1, "maximum": 7, "default": 3 } }, "required": ["location"] }, "outputSchema": { "type": "object", "properties": { "location": {"type": "string"}, "forecast": { "type": "array", "items": { "type": "object", "properties": { "date": {"type": "string"}, "high": {"type": "number", "description": "High temperature"}, "low": {"type": "number", "description": "Low temperature"}, "conditions": {"type": "string"} } } }, "generated_at": {"type": "string"} } }, "examples": [ {"location": "New York", "days": 5}, {"location": "London", "days": 3} ], "tags": ["weather", "forecast", "planning"] }, "search_weather_locations": { "name": "search_weather_locations", "description": "Search for available weather locations", "inputSchema": { "type": "object", "properties": { "query": { "type": "string", "description": "Search query for location names", "minLength": 1, "maxLength": 50 } }, "required": ["query"] }, "outputSchema": { "type": "object", "properties": { "locations": { "type": "array", "items": {"type": "string"}, "description": "List of matching locations" } } }, "examples": [ {"query": "New"}, {"query": "London"} ], "tags": ["search", "locations", "discovery"] } } async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]: """Handle incoming MCP requests.""" method = request.get("method") params = request.get("params", {}) request_id = request.get("id") try: if method == "tools/list": return await self._handle_tools_list(request_id) elif method == "tools/call": return await self._handle_tools_call(request_id, params) elif method == "initialize": return await self._handle_initialize(request_id, params) else: return self._error_response(request_id, -32601, "Method not found") except Exception as e: logger.error("Request handling failed", method=method, error=str(e)) return self._error_response(request_id, -32603, f"Internal error: {str(e)}") async def _handle_tools_list(self, request_id: str) -> Dict[str, Any]: """Handle tools/list request.""" tools_list = [] for tool_name, tool_info in self.tools.items(): tools_list.append({ "name": tool_info["name"], "description": tool_info["description"], "inputSchema": tool_info["inputSchema"], "outputSchema": tool_info.get("outputSchema"), "examples": tool_info.get("examples", []), "tags": tool_info.get("tags", []) }) return { "jsonrpc": "2.0", "id": request_id, "result": { "tools": tools_list } } async def _handle_tools_call(self, request_id: str, params: Dict[str, Any]) -> Dict[str, Any]: """Handle tools/call request.""" tool_name = params.get("name") arguments = params.get("arguments", {}) if tool_name not in self.tools: return self._error_response(request_id, -32601, f"Tool '{tool_name}' not found") # Execute the tool try: if tool_name == "get_current_weather": result = await self._get_current_weather(arguments) elif tool_name == "get_weather_forecast": result = await self._get_weather_forecast(arguments) elif tool_name == "search_weather_locations": result = await self._search_weather_locations(arguments) else: return self._error_response(request_id, -32601, f"Tool '{tool_name}' not implemented") return { "jsonrpc": "2.0", "id": request_id, "result": { "content": [ { "type": "text", "text": json.dumps(result, indent=2) } ] } } except Exception as e: logger.error("Tool execution failed", tool=tool_name, error=str(e)) return self._error_response(request_id, -32603, f"Tool execution failed: {str(e)}") async def _handle_initialize(self, request_id: str, params: Dict[str, Any]) -> Dict[str, Any]: """Handle initialize request.""" return { "jsonrpc": "2.0", "id": request_id, "result": { "protocolVersion": "2024-11-05", "capabilities": { "tools": {} }, "serverInfo": { "name": "Weather MCP Server", "version": "1.0.0", "description": "Provides weather information and forecasts" } } } async def _get_current_weather(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Execute get_current_weather tool.""" location = arguments.get("location") if not location: raise ValueError("Location is required") return await self.api.get_current_weather(location) async def _get_weather_forecast(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Execute get_weather_forecast tool.""" location = arguments.get("location") days = arguments.get("days", 3) if not location: raise ValueError("Location is required") if not isinstance(days, int) or days < 1 or days > 7: raise ValueError("Days must be an integer between 1 and 7") return await self.api.get_forecast(location, days) async def _search_weather_locations(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Execute search_weather_locations tool.""" query = arguments.get("query") if not query: raise ValueError("Query is required") locations = await self.api.search_locations(query) return {"locations": locations} def _error_response(self, request_id: str, code: int, message: str) -> Dict[str, Any]: """Create error response.""" return { "jsonrpc": "2.0", "id": request_id, "error": { "code": code, "message": message } } async def start_server(self): """Start the MCP server.""" from aiohttp import web, ClientTimeout import aiohttp_cors # Create aiohttp application app = web.Application() # Add CORS support cors = aiohttp_cors.setup(app, defaults={ "*": aiohttp_cors.ResourceOptions( allow_credentials=True, expose_headers="*", allow_headers="*", allow_methods="*" ) }) # Add MCP endpoint async def mcp_handler(request): try: data = await request.json() result = await self.handle_request(data) return web.json_response(result) except json.JSONDecodeError: return web.json_response({ "jsonrpc": "2.0", "id": None, "error": {"code": -32700, "message": "Parse error"} }, status=400) except Exception as e: logger.error("Request processing failed", error=str(e)) return web.json_response({ "jsonrpc": "2.0", "id": None, "error": {"code": -32603, "message": "Internal error"} }, status=500) # Add routes app.router.add_post('/mcp', mcp_handler) app.router.add_get('/health', self._health_check) # Configure CORS for all routes for route in list(app.router.routes()): cors.add(route) # Start server runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, 'localhost', self.port) await site.start() logger.info(f"Weather MCP Server started on port {self.port}") return runner async def _health_check(self, request): """Health check endpoint.""" return web.json_response({ "status": "healthy", "service": "Weather MCP Server", "version": "1.0.0", "timestamp": datetime.utcnow().isoformat() }) async def stop_server(self, runner): """Stop the MCP server.""" await runner.cleanup() async def main(): """Main entry point for the weather server.""" server = WeatherMCPServer() runner = await server.start_server() try: # Keep server running while True: await asyncio.sleep(1) except KeyboardInterrupt: logger.info("Shutting down weather server...") await server.stop_server(runner) if __name__ == "__main__": asyncio.run(main())