""" MCP Server - Model Context Protocol Server Fornisce metodi per LLM per connettersi e recuperare dati dalle infrastrutture """ import asyncio import json import logging from typing import Any, Dict, List, Optional from dataclasses import dataclass, asdict from datetime import datetime import os # Import per connessioni import paramiko from pysnmp.hlapi import * import requests from requests.auth import HTTPBasicAuth logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class ConnectionConfig: """Configurazione connessione""" type: str # ssh, snmp, api, database host: str port: int username: Optional[str] = None password: Optional[str] = None api_key: Optional[str] = None additional_params: Optional[Dict[str, Any]] = None @dataclass class CommandResult: """Risultato esecuzione comando""" success: bool output: Any error: Optional[str] = None timestamp: str = None duration_ms: int = 0 def __post_init__(self): if self.timestamp is None: self.timestamp = datetime.now().isoformat() class MCPServer: """ Model Context Protocol Server Espone metodi sicuri per LLM per accedere alle infrastrutture """ def __init__(self, config_file: str = "/app/config/mcp_config.json"): self.config_file = config_file self.connections: Dict[str, ConnectionConfig] = {} self.load_config() def load_config(self): """Carica configurazione connessioni""" if os.path.exists(self.config_file): with open(self.config_file, 'r') as f: config_data = json.load(f) for name, conn_data in config_data.get('connections', {}).items(): self.connections[name] = ConnectionConfig(**conn_data) logger.info(f"Loaded {len(self.connections)} connection configurations") # ===== SSH Methods ===== async def ssh_execute( self, connection_name: str, command: str, timeout: int = 30 ) -> CommandResult: """ Esegui comando SSH su device Args: connection_name: Nome connessione configurata command: Comando da eseguire timeout: Timeout in secondi Returns: CommandResult con output comando """ start_time = datetime.now() try: conn = self.connections.get(connection_name) if not conn or conn.type != 'ssh': return CommandResult( success=False, output=None, error=f"Connection {connection_name} not found or wrong type" ) # Esegui comando SSH ssh_client = paramiko.SSHClient() ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh_client.connect( hostname=conn.host, port=conn.port, username=conn.username, password=conn.password, timeout=timeout ) stdin, stdout, stderr = ssh_client.exec_command(command, timeout=timeout) output = stdout.read().decode('utf-8') error = stderr.read().decode('utf-8') ssh_client.close() duration = int((datetime.now() - start_time).total_seconds() * 1000) return CommandResult( success=True if not error else False, output=output, error=error if error else None, duration_ms=duration ) except Exception as e: logger.error(f"SSH execute error: {e}") duration = int((datetime.now() - start_time).total_seconds() * 1000) return CommandResult( success=False, output=None, error=str(e), duration_ms=duration ) async def ssh_get_config( self, connection_name: str, config_commands: List[str] = None ) -> CommandResult: """ Recupera configurazione da device via SSH Default commands per diversi vendor: - Cisco: show running-config - HP: show running-config - Linux: cat /etc/... """ if config_commands is None: # Default per dispositivi di rete config_commands = [ "show running-config", "show version", "show interfaces status" ] outputs = {} for cmd in config_commands: result = await self.ssh_execute(connection_name, cmd) if result.success: outputs[cmd] = result.output return CommandResult( success=len(outputs) > 0, output=outputs, error=None if outputs else "No commands executed successfully" ) # ===== SNMP Methods ===== async def snmp_get( self, connection_name: str, oid: str ) -> CommandResult: """ SNMP GET su OID specifico Args: connection_name: Nome connessione SNMP oid: OID da queryare Returns: CommandResult con valore OID """ start_time = datetime.now() try: conn = self.connections.get(connection_name) if not conn or conn.type != 'snmp': return CommandResult( success=False, output=None, error=f"Connection {connection_name} not found or wrong type" ) community = conn.additional_params.get('community', 'public') iterator = getCmd( SnmpEngine(), CommunityData(community), UdpTransportTarget((conn.host, conn.port)), ContextData(), ObjectType(ObjectIdentity(oid)) ) errorIndication, errorStatus, errorIndex, varBinds = next(iterator) if errorIndication: return CommandResult( success=False, output=None, error=str(errorIndication) ) output = { 'oid': oid, 'value': str(varBinds[0][1]) } duration = int((datetime.now() - start_time).total_seconds() * 1000) return CommandResult( success=True, output=output, duration_ms=duration ) except Exception as e: logger.error(f"SNMP get error: {e}") duration = int((datetime.now() - start_time).total_seconds() * 1000) return CommandResult( success=False, output=None, error=str(e), duration_ms=duration ) async def snmp_walk( self, connection_name: str, oid: str, max_results: int = 100 ) -> CommandResult: """ SNMP WALK su OID tree Args: connection_name: Nome connessione SNMP oid: OID base max_results: Numero massimo risultati Returns: CommandResult con lista valori """ start_time = datetime.now() try: conn = self.connections.get(connection_name) if not conn or conn.type != 'snmp': return CommandResult( success=False, output=None, error=f"Connection {connection_name} not found or wrong type" ) community = conn.additional_params.get('community', 'public') results = [] for (errorIndication, errorStatus, errorIndex, varBinds) in nextCmd( SnmpEngine(), CommunityData(community), UdpTransportTarget((conn.host, conn.port)), ContextData(), ObjectType(ObjectIdentity(oid)), lexicographicMode=False, maxRows=max_results ): if errorIndication: break for varBind in varBinds: results.append({ 'oid': str(varBind[0]), 'value': str(varBind[1]) }) duration = int((datetime.now() - start_time).total_seconds() * 1000) return CommandResult( success=True, output=results, duration_ms=duration ) except Exception as e: logger.error(f"SNMP walk error: {e}") duration = int((datetime.now() - start_time).total_seconds() * 1000) return CommandResult( success=False, output=None, error=str(e), duration_ms=duration ) # ===== API Methods ===== async def api_request( self, connection_name: str, endpoint: str, method: str = "GET", data: Optional[Dict] = None, headers: Optional[Dict] = None ) -> CommandResult: """ Esegui richiesta API REST Args: connection_name: Nome connessione API endpoint: Endpoint relativo (es: /api/v1/vms) method: HTTP method data: Body request (per POST/PUT) headers: Headers addizionali Returns: CommandResult con response API """ start_time = datetime.now() try: conn = self.connections.get(connection_name) if not conn or conn.type != 'api': return CommandResult( success=False, output=None, error=f"Connection {connection_name} not found or wrong type" ) # Costruisci URL base_url = f"https://{conn.host}:{conn.port}" if conn.port != 443 else f"https://{conn.host}" url = f"{base_url}{endpoint}" # Headers req_headers = headers or {} if conn.api_key: req_headers['Authorization'] = f"Bearer {conn.api_key}" # Auth auth = None if conn.username and conn.password: auth = HTTPBasicAuth(conn.username, conn.password) # Request response = requests.request( method=method, url=url, json=data, headers=req_headers, auth=auth, verify=False, # Per ambienti interni timeout=30 ) duration = int((datetime.now() - start_time).total_seconds() * 1000) # Parse response try: output = response.json() except: output = response.text return CommandResult( success=response.status_code < 400, output={ 'status_code': response.status_code, 'data': output }, error=None if response.status_code < 400 else f"HTTP {response.status_code}", duration_ms=duration ) except Exception as e: logger.error(f"API request error: {e}") duration = int((datetime.now() - start_time).total_seconds() * 1000) return CommandResult( success=False, output=None, error=str(e), duration_ms=duration ) # ===== VMware Specific ===== async def vmware_get_vms(self, connection_name: str) -> CommandResult: """Recupera lista VM da vCenter""" return await self.api_request( connection_name=connection_name, endpoint="/rest/vcenter/vm", method="GET" ) async def vmware_get_hosts(self, connection_name: str) -> CommandResult: """Recupera lista host ESXi""" return await self.api_request( connection_name=connection_name, endpoint="/rest/vcenter/host", method="GET" ) async def vmware_get_datastores(self, connection_name: str) -> CommandResult: """Recupera lista datastore""" return await self.api_request( connection_name=connection_name, endpoint="/rest/vcenter/datastore", method="GET" ) # ===== Cisco Specific ===== async def cisco_get_interfaces(self, connection_name: str) -> CommandResult: """Ottieni status interfacce Cisco""" return await self.ssh_execute( connection_name=connection_name, command="show interfaces status" ) async def cisco_get_vlans(self, connection_name: str) -> CommandResult: """Ottieni configurazione VLAN""" return await self.ssh_execute( connection_name=connection_name, command="show vlan brief" ) # ===== UPS Specific ===== async def ups_get_status(self, connection_name: str) -> CommandResult: """Recupera status UPS via SNMP""" # UPS OIDs standard (RFC 1628) oids = { 'battery_status': '.1.3.6.1.2.1.33.1.2.1.0', 'battery_runtime': '.1.3.6.1.2.1.33.1.2.3.0', 'output_load': '.1.3.6.1.2.1.33.1.4.4.1.5.1' } results = {} for name, oid in oids.items(): result = await self.snmp_get(connection_name, oid) if result.success: results[name] = result.output['value'] return CommandResult( success=len(results) > 0, output=results, error=None if results else "Failed to retrieve UPS data" ) # ===== Utility Methods ===== async def test_connection(self, connection_name: str) -> CommandResult: """ Testa connessione """ conn = self.connections.get(connection_name) if not conn: return CommandResult( success=False, output=None, error=f"Connection {connection_name} not found" ) if conn.type == 'ssh': return await self.ssh_execute(connection_name, "echo 'test'") elif conn.type == 'snmp': return await self.snmp_get(connection_name, '.1.3.6.1.2.1.1.1.0') # sysDescr elif conn.type == 'api': return await self.api_request(connection_name, "/", method="GET") return CommandResult( success=False, output=None, error=f"Unknown connection type: {conn.type}" ) def get_available_methods(self) -> List[Dict[str, Any]]: """ Ritorna lista metodi disponibili per LLM """ methods = [ { "name": "ssh_execute", "description": "Execute SSH command on network device or server", "parameters": ["connection_name", "command", "timeout"], "example": "await mcp.ssh_execute('switch-core-01', 'show version')" }, { "name": "ssh_get_config", "description": "Retrieve device configuration via SSH", "parameters": ["connection_name", "config_commands"], "example": "await mcp.ssh_get_config('router-01')" }, { "name": "snmp_get", "description": "SNMP GET on specific OID", "parameters": ["connection_name", "oid"], "example": "await mcp.snmp_get('ups-01', '.1.3.6.1.2.1.33.1.2.1.0')" }, { "name": "snmp_walk", "description": "SNMP WALK on OID tree", "parameters": ["connection_name", "oid", "max_results"], "example": "await mcp.snmp_walk('switch-01', '.1.3.6.1.2.1.2.2')" }, { "name": "api_request", "description": "Execute REST API request", "parameters": ["connection_name", "endpoint", "method", "data", "headers"], "example": "await mcp.api_request('vcenter', '/rest/vcenter/vm', 'GET')" }, { "name": "vmware_get_vms", "description": "Get list of VMs from vCenter", "parameters": ["connection_name"], "example": "await mcp.vmware_get_vms('vcenter-prod')" }, { "name": "vmware_get_hosts", "description": "Get list of ESXi hosts", "parameters": ["connection_name"], "example": "await mcp.vmware_get_hosts('vcenter-prod')" }, { "name": "cisco_get_interfaces", "description": "Get Cisco switch interface status", "parameters": ["connection_name"], "example": "await mcp.cisco_get_interfaces('switch-core-01')" }, { "name": "ups_get_status", "description": "Get UPS status via SNMP", "parameters": ["connection_name"], "example": "await mcp.ups_get_status('ups-01')" } ] return methods # Singleton instance mcp_server = MCPServer() # FastAPI integration from fastapi import FastAPI, HTTPException from pydantic import BaseModel mcp_app = FastAPI( title="MCP Server API", description="Model Context Protocol Server - Infrastructure Connection Methods", version="1.0.0" ) class CommandRequest(BaseModel): connection_name: str command: Optional[str] = None oid: Optional[str] = None endpoint: Optional[str] = None method: Optional[str] = "GET" data: Optional[Dict] = None @mcp_app.get("/methods") async def list_methods(): """Lista tutti i metodi disponibili""" return mcp_server.get_available_methods() @mcp_app.get("/connections") async def list_connections(): """Lista connessioni configurate""" return { name: { 'type': conn.type, 'host': conn.host, 'port': conn.port } for name, conn in mcp_server.connections.items() } @mcp_app.post("/execute/ssh") async def execute_ssh(request: CommandRequest): """Esegui comando SSH""" if not request.command: raise HTTPException(status_code=400, detail="Command required") result = await mcp_server.ssh_execute( request.connection_name, request.command ) return asdict(result) @mcp_app.post("/execute/snmp/get") async def execute_snmp_get(request: CommandRequest): """Esegui SNMP GET""" if not request.oid: raise HTTPException(status_code=400, detail="OID required") result = await mcp_server.snmp_get( request.connection_name, request.oid ) return asdict(result) @mcp_app.post("/execute/api") async def execute_api(request: CommandRequest): """Esegui richiesta API""" if not request.endpoint: raise HTTPException(status_code=400, detail="Endpoint required") result = await mcp_server.api_request( request.connection_name, request.endpoint, request.method, request.data ) return asdict(result) @mcp_app.get("/test/{connection_name}") async def test_connection(connection_name: str): """Testa una connessione""" result = await mcp_server.test_connection(connection_name) return asdict(result) if __name__ == "__main__": import uvicorn uvicorn.run(mcp_app, host="0.0.0.0", port=8001)