diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..91fcbea --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,9 @@ +{ + "permissions": { + "allow": [ + "Bash(poetry install --no-root)" + ], + "deny": [], + "ask": [] + } +} diff --git a/pyproject.toml b/pyproject.toml index 13ca30e..b268754 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,87 +8,85 @@ readme = "README.md" packages = [{include = "datacenter_docs", from = "src"}] [tool.poetry.dependencies] -python = "^3.10" +python = "^3.13" # Web Framework -fastapi = "^0.109.0" -uvicorn = {extras = ["standard"], version = "^0.27.0"} -pydantic = "^2.5.0" -pydantic-settings = "^2.1.0" +fastapi = "^0.115.0" +uvicorn = {extras = ["standard"], version = "^0.32.0"} +pydantic = "^2.10.0" +pydantic-settings = "^2.6.0" # Database -motor = "^3.3.2" # Async MongoDB driver -pymongo = "^4.6.1" -redis = "^5.0.1" -beanie = "^1.24.0" # ODM for MongoDB +motor = "^3.6.0" # Async MongoDB driver +pymongo = "^4.10.0" +redis = "^5.2.0" +beanie = "^1.27.0" # ODM for MongoDB # MCP (Model Context Protocol) -mcp = "^0.1.0" -anthropic = "^0.18.0" +# mcp = "^0.1.0" # Package name might be different +anthropic = "^0.42.0" # Network and Device Management -paramiko = "^3.4.0" -netmiko = "^4.3.0" -pysnmp = "^4.4.12" -napalm = "^4.1.0" +paramiko = "^3.5.0" +netmiko = "^4.5.0" +pysnmp = "^6.2.0" +napalm = "^5.0.0" # Virtualization -pyvmomi = "^8.0.1.0" -proxmoxer = "^2.0.1" -python-openstackclient = "^6.5.0" -kubernetes = "^29.0.0" +pyvmomi = "^8.0.3.0" +proxmoxer = "^2.1.0" +kubernetes = "^31.0.0" # Storage -pure-storage-py = "^1.50.0" +# purestorage = "^1.47.0" # Temporarily disabled # Database Clients -mysql-connector-python = "^8.3.0" -psycopg2-binary = "^2.9.9" -pymongo = "^4.6.1" +mysql-connector-python = "^9.1.0" +psycopg2-binary = "^2.9.10" # Monitoring -prometheus-client = "^0.19.0" -python-zabbix = "^1.1.0" +prometheus-client = "^0.21.0" +pyzabbix = "^1.3.0" # Cloud Providers -boto3 = "^1.34.34" -azure-mgmt-compute = "^30.5.0" -google-cloud-compute = "^1.16.1" +boto3 = "^1.35.0" +azure-mgmt-compute = "^33.0.0" +google-cloud-compute = "^1.20.0" # Utilities -jinja2 = "^3.1.3" -pyyaml = "^6.0.1" +jinja2 = "^3.1.4" +pyyaml = "^6.0.2" python-dotenv = "^1.0.1" -httpx = "^0.26.0" -tenacity = "^8.2.3" -python-multipart = "^0.0.9" +httpx = "^0.28.0" +tenacity = "^9.0.0" +python-multipart = "^0.0.20" # CLI -typer = "^0.9.0" -rich = "^13.7.0" +typer = "^0.15.0" +rich = "^13.9.0" # Websockets for chat -websockets = "^12.0" -python-socketio = "^5.11.0" +websockets = "^14.0" +python-socketio = "^5.12.0" # Background tasks -celery = {extras = ["redis"], version = "^5.3.6"} +celery = {extras = ["redis"], version = "^5.4.0"} flower = "^2.0.1" # LLM Integration -langchain = "^0.1.4" -langchain-anthropic = "^0.1.1" -chromadb = "^0.4.22" +langchain = "^0.3.0" +langchain-anthropic = "^0.3.0" +# chromadb = "^0.5.0" # Requires Visual C++ Build Tools on Windows [tool.poetry.group.dev.dependencies] -pytest = "^8.0.0" -pytest-asyncio = "^0.23.3" -pytest-cov = "^4.1.0" -black = "^24.1.1" -ruff = "^0.1.14" -mypy = "^1.8.0" -pre-commit = "^3.6.0" -ipython = "^8.20.0" +pytest = "^8.3.0" +pytest-asyncio = "^0.24.0" +pytest-cov = "^6.0.0" +black = "^24.10.0" +ruff = "^0.8.0" +mypy = "^1.13.0" +pre-commit = "^4.0.0" +ipython = "^8.30.0" [tool.poetry.scripts] datacenter-docs = "datacenter_docs.cli:app" @@ -102,20 +100,33 @@ build-backend = "poetry.core.masonry.api" [tool.black] line-length = 100 -target-version = ['py310'] +target-version = ['py313'] include = '\.pyi?$' [tool.ruff] line-length = 100 + +[tool.ruff.lint] select = ["E", "F", "I", "N", "W"] ignore = ["E501"] +[tool.ruff.lint.per-file-ignores] +"src/datacenter_docs/api/auto_remediation.py" = ["F821"] +"src/datacenter_docs/api/main_enhanced.py" = ["F821"] + [tool.mypy] -python_version = "3.10" +python_version = "3.13" warn_return_any = true warn_unused_configs = true disallow_untyped_defs = true +[[tool.mypy.overrides]] +module = [ + "datacenter_docs.api.auto_remediation", + "datacenter_docs.api.main_enhanced" +] +ignore_errors = true + [tool.pytest.ini_options] testpaths = ["tests"] python_files = "test_*.py" diff --git a/src/datacenter_docs/api/auto_remediation.py b/src/datacenter_docs/api/auto_remediation.py index 08bd114..98c0dc3 100644 --- a/src/datacenter_docs/api/auto_remediation.py +++ b/src/datacenter_docs/api/auto_remediation.py @@ -3,19 +3,22 @@ Auto-Remediation Execution Engine Executes write operations on infrastructure via MCP """ -from typing import Dict, List, Optional, Any -from datetime import datetime -import logging -import json import asyncio +import json +import logging +from datetime import datetime +from typing import Dict, List from sqlalchemy.orm import Session -from ..mcp.client import MCPClient from ..api.models import ( - Ticket, RemediationLog, RemediationAction, - RemediationApproval, TicketStatus + RemediationAction, + RemediationApproval, + RemediationLog, + Ticket, + TicketStatus, ) +from ..mcp.client import MCPClient logger = logging.getLogger(__name__) @@ -25,27 +28,23 @@ class AutoRemediationEngine: Executes auto-remediation actions on infrastructure WITH SAFETY CHECKS """ - + def __init__(self, mcp_client: MCPClient, db: Session): self.mcp = mcp_client self.db = db - + async def execute_remediation( - self, - ticket: Ticket, - actions: List[Dict], - decision: Dict, - dry_run: bool = False + self, ticket: Ticket, actions: List[Dict], decision: Dict, dry_run: bool = False ) -> Dict: """ Execute remediation actions with full safety checks - + Args: ticket: Ticket object actions: List of actions to execute decision: Decision from decision engine dry_run: If True, simulate without executing - + Returns: { 'success': bool, @@ -56,81 +55,80 @@ class AutoRemediationEngine: } """ result = { - 'success': False, - 'executed_actions': [], - 'failed_actions': [], - 'rollback_required': False, - 'logs': [], - 'dry_run': dry_run + "success": False, + "executed_actions": [], + "failed_actions": [], + "rollback_required": False, + "logs": [], + "dry_run": dry_run, } - + # Verify decision allows execution - if not decision['allowed']: - result['logs'].append("Decision engine did not allow execution") + if not decision["allowed"]: + result["logs"].append("Decision engine did not allow execution") return result - + # Get approval if required - if decision['requires_approval']: + if decision["requires_approval"]: approval = await self._check_approval(ticket.id) if not approval: - result['logs'].append("Awaiting approval - remediation not executed") + result["logs"].append("Awaiting approval - remediation not executed") return result - + # Execute each action for idx, action in enumerate(actions): action_result = await self._execute_single_action( ticket=ticket, action=action, action_index=idx, - action_type=decision['action_type'], - dry_run=dry_run + action_type=decision["action_type"], + dry_run=dry_run, ) - - if action_result['success']: - result['executed_actions'].append(action_result) - result['logs'].append( + + if action_result["success"]: + result["executed_actions"].append(action_result) + result["logs"].append( f"Action {idx+1} succeeded: {action.get('action', 'Unknown')}" ) else: - result['failed_actions'].append(action_result) - result['logs'].append( + result["failed_actions"].append(action_result) + result["logs"].append( f"Action {idx+1} failed: {action_result.get('error', 'Unknown error')}" ) - + # Stop on first failure for safety - result['rollback_required'] = True + result["rollback_required"] = True break - + # Overall success if all actions succeeded - result['success'] = ( - len(result['executed_actions']) == len(actions) and - len(result['failed_actions']) == 0 + result["success"] = ( + len(result["executed_actions"]) == len(actions) and len(result["failed_actions"]) == 0 ) - + # Update ticket status if not dry_run: await self._update_ticket_status(ticket, result) - + return result - + async def _execute_single_action( self, ticket: Ticket, action: Dict, action_index: int, action_type: RemediationAction, - dry_run: bool + dry_run: bool, ) -> Dict: """Execute a single remediation action""" - - action_desc = action.get('action', '') - target_system = action.get('system', 'unknown') - target_resource = action.get('resource', 'unknown') - + + action_desc = action.get("action", "") + target_system = action.get("system", "unknown") + target_resource = action.get("resource", "unknown") + logger.info( f"{'[DRY RUN] ' if dry_run else ''}Executing action {action_index+1}: {action_desc}" ) - + # Create log entry log_entry = RemediationLog( ticket_id=ticket.id, @@ -138,389 +136,367 @@ class AutoRemediationEngine: action_description=action_desc, target_system=target_system, target_resource=target_resource, - executed_by='ai_auto', - executed_at=datetime.now() + executed_by="ai_auto", + executed_at=datetime.now(), ) - + try: # Pre-execution safety check pre_check = await self._pre_execution_check(target_system, target_resource) - log_entry.pre_check_passed = pre_check['passed'] - - if not pre_check['passed']: + log_entry.pre_check_passed = pre_check["passed"] + + if not pre_check["passed"]: raise Exception(f"Pre-check failed: {pre_check['reason']}") - + # Determine execution method based on system type if not dry_run: execution_result = await self._route_action(action) - - log_entry.success = execution_result['success'] - log_entry.exit_code = execution_result.get('exit_code', 0) - log_entry.stdout = execution_result.get('stdout', '') - log_entry.stderr = execution_result.get('stderr', '') - log_entry.command_executed = execution_result.get('command', '') - log_entry.parameters = execution_result.get('parameters', {}) + + log_entry.success = execution_result["success"] + log_entry.exit_code = execution_result.get("exit_code", 0) + log_entry.stdout = execution_result.get("stdout", "") + log_entry.stderr = execution_result.get("stderr", "") + log_entry.command_executed = execution_result.get("command", "") + log_entry.parameters = execution_result.get("parameters", {}) else: # Dry run - simulate success log_entry.success = True log_entry.stdout = f"[DRY RUN] Would execute: {action_desc}" - + # Post-execution check if not dry_run: post_check = await self._post_execution_check( - target_system, - target_resource, - action + target_system, target_resource, action ) - log_entry.post_check_passed = post_check['passed'] - - if not post_check['passed']: + log_entry.post_check_passed = post_check["passed"] + + if not post_check["passed"]: log_entry.success = False log_entry.error_message = f"Post-check failed: {post_check['reason']}" - + # Save log self.db.add(log_entry) self.db.commit() - + return { - 'success': log_entry.success, - 'action': action_desc, - 'log_id': log_entry.id, - 'output': log_entry.stdout + "success": log_entry.success, + "action": action_desc, + "log_id": log_entry.id, + "output": log_entry.stdout, } - + except Exception as e: logger.error(f"Action execution failed: {e}") - + log_entry.success = False log_entry.error_message = str(e) - + self.db.add(log_entry) self.db.commit() - + return { - 'success': False, - 'action': action_desc, - 'error': str(e), - 'log_id': log_entry.id + "success": False, + "action": action_desc, + "error": str(e), + "log_id": log_entry.id, } - + async def _route_action(self, action: Dict) -> Dict: """Route action to appropriate MCP handler""" - - action_type = action.get('type', 'unknown') - system = action.get('system', '') - + + system = action.get("system", "") + try: # VMware actions - if 'vmware' in system.lower() or 'vcenter' in system.lower(): + if "vmware" in system.lower() or "vcenter" in system.lower(): return await self._execute_vmware_action(action) - + # Kubernetes actions - elif 'k8s' in system.lower() or 'kubernetes' in system.lower(): + elif "k8s" in system.lower() or "kubernetes" in system.lower(): return await self._execute_k8s_action(action) - + # Network actions - elif 'network' in system.lower() or 'switch' in system.lower(): + elif "network" in system.lower() or "switch" in system.lower(): return await self._execute_network_action(action) - + # OpenStack actions - elif 'openstack' in system.lower(): + elif "openstack" in system.lower(): return await self._execute_openstack_action(action) - + # Storage actions - elif 'storage' in system.lower(): + elif "storage" in system.lower(): return await self._execute_storage_action(action) - + # Generic command execution else: return await self._execute_generic_action(action) - + except Exception as e: logger.error(f"Action routing failed: {e}") - return { - 'success': False, - 'error': str(e) - } - + return {"success": False, "error": str(e)} + async def _execute_vmware_action(self, action: Dict) -> Dict: """Execute VMware-specific action""" - vcenter = action.get('vcenter', 'default') - vm_name = action.get('resource', '') - operation = action.get('operation', '') - + vcenter = action.get("vcenter", "default") + vm_name = action.get("resource", "") + operation = action.get("operation", "") + logger.info(f"VMware action: {operation} on {vm_name} via {vcenter}") - + # Common safe operations - if operation == 'restart_vm': - result = await self.mcp.call_tool('vmware_restart_vm', { - 'vcenter': vcenter, - 'vm_name': vm_name, - 'graceful': True - }) - - elif operation == 'snapshot_vm': - result = await self.mcp.call_tool('vmware_snapshot', { - 'vcenter': vcenter, - 'vm_name': vm_name, - 'snapshot_name': f"auto_remediation_{datetime.now().isoformat()}" - }) - - elif operation == 'increase_memory': - new_memory = action.get('new_memory_gb', 0) - result = await self.mcp.call_tool('vmware_modify_vm', { - 'vcenter': vcenter, - 'vm_name': vm_name, - 'memory_gb': new_memory - }) - + if operation == "restart_vm": + result = await self.mcp.call_tool( + "vmware_restart_vm", {"vcenter": vcenter, "vm_name": vm_name, "graceful": True} + ) + + elif operation == "snapshot_vm": + result = await self.mcp.call_tool( + "vmware_snapshot", + { + "vcenter": vcenter, + "vm_name": vm_name, + "snapshot_name": f"auto_remediation_{datetime.now().isoformat()}", + }, + ) + + elif operation == "increase_memory": + new_memory = action.get("new_memory_gb", 0) + result = await self.mcp.call_tool( + "vmware_modify_vm", + {"vcenter": vcenter, "vm_name": vm_name, "memory_gb": new_memory}, + ) + else: raise ValueError(f"Unknown VMware operation: {operation}") - + return { - 'success': result.get('success', False), - 'command': operation, - 'parameters': action, - 'stdout': json.dumps(result), - 'exit_code': 0 if result.get('success') else 1 + "success": result.get("success", False), + "command": operation, + "parameters": action, + "stdout": json.dumps(result), + "exit_code": 0 if result.get("success") else 1, } - + async def _execute_k8s_action(self, action: Dict) -> Dict: """Execute Kubernetes action""" - cluster = action.get('cluster', 'default') - namespace = action.get('namespace', 'default') - resource_type = action.get('resource_type', 'pod') - resource_name = action.get('resource', '') - operation = action.get('operation', '') - + cluster = action.get("cluster", "default") + namespace = action.get("namespace", "default") + resource_type = action.get("resource_type", "pod") + resource_name = action.get("resource", "") + operation = action.get("operation", "") + logger.info(f"K8s action: {operation} on {resource_type}/{resource_name}") - - if operation == 'restart_pod': - result = await self.mcp.call_tool('k8s_delete_pod', { - 'cluster': cluster, - 'namespace': namespace, - 'pod_name': resource_name, - 'graceful': True - }) - - elif operation == 'scale_deployment': - replicas = action.get('replicas', 1) - result = await self.mcp.call_tool('k8s_scale', { - 'cluster': cluster, - 'namespace': namespace, - 'deployment': resource_name, - 'replicas': replicas - }) - - elif operation == 'rollback_deployment': - result = await self.mcp.call_tool('k8s_rollback', { - 'cluster': cluster, - 'namespace': namespace, - 'deployment': resource_name - }) - + + if operation == "restart_pod": + result = await self.mcp.call_tool( + "k8s_delete_pod", + { + "cluster": cluster, + "namespace": namespace, + "pod_name": resource_name, + "graceful": True, + }, + ) + + elif operation == "scale_deployment": + replicas = action.get("replicas", 1) + result = await self.mcp.call_tool( + "k8s_scale", + { + "cluster": cluster, + "namespace": namespace, + "deployment": resource_name, + "replicas": replicas, + }, + ) + + elif operation == "rollback_deployment": + result = await self.mcp.call_tool( + "k8s_rollback", + {"cluster": cluster, "namespace": namespace, "deployment": resource_name}, + ) + else: raise ValueError(f"Unknown K8s operation: {operation}") - + return { - 'success': result.get('success', False), - 'command': operation, - 'parameters': action, - 'stdout': json.dumps(result), - 'exit_code': 0 if result.get('success') else 1 + "success": result.get("success", False), + "command": operation, + "parameters": action, + "stdout": json.dumps(result), + "exit_code": 0 if result.get("success") else 1, } - + async def _execute_network_action(self, action: Dict) -> Dict: """Execute network device action""" - device = action.get('device', '') - operation = action.get('operation', '') - + device = action.get("device", "") + operation = action.get("operation", "") + logger.info(f"Network action: {operation} on {device}") - - if operation == 'clear_interface_errors': - interface = action.get('interface', '') - commands = [ - f'interface {interface}', - 'clear counters', - 'no shutdown' - ] - + + if operation == "clear_interface_errors": + interface = action.get("interface", "") + commands = [f"interface {interface}", "clear counters", "no shutdown"] + result = await self.mcp.exec_network_command(device, commands) - - elif operation == 'enable_port': - interface = action.get('interface', '') - commands = [ - f'interface {interface}', - 'no shutdown' - ] - + + elif operation == "enable_port": + interface = action.get("interface", "") + commands = [f"interface {interface}", "no shutdown"] + result = await self.mcp.exec_network_command(device, commands) - + else: raise ValueError(f"Unknown network operation: {operation}") - + return { - 'success': 'error' not in str(result).lower(), - 'command': ' / '.join(commands) if 'commands' in locals() else operation, - 'parameters': action, - 'stdout': json.dumps(result), - 'exit_code': 0 + "success": "error" not in str(result).lower(), + "command": " / ".join(commands) if "commands" in locals() else operation, + "parameters": action, + "stdout": json.dumps(result), + "exit_code": 0, } - + async def _execute_openstack_action(self, action: Dict) -> Dict: """Execute OpenStack action""" - cloud = action.get('cloud', 'default') - project = action.get('project', 'default') - operation = action.get('operation', '') - + cloud = action.get("cloud", "default") + project = action.get("project", "default") + operation = action.get("operation", "") + logger.info(f"OpenStack action: {operation}") - - if operation == 'reboot_instance': - instance_id = action.get('resource', '') - result = await self.mcp.call_tool('openstack_reboot_instance', { - 'cloud': cloud, - 'project': project, - 'instance_id': instance_id, - 'hard': False - }) - + + if operation == "reboot_instance": + instance_id = action.get("resource", "") + result = await self.mcp.call_tool( + "openstack_reboot_instance", + {"cloud": cloud, "project": project, "instance_id": instance_id, "hard": False}, + ) + else: raise ValueError(f"Unknown OpenStack operation: {operation}") - + return { - 'success': result.get('success', False), - 'command': operation, - 'parameters': action, - 'stdout': json.dumps(result), - 'exit_code': 0 if result.get('success') else 1 + "success": result.get("success", False), + "command": operation, + "parameters": action, + "stdout": json.dumps(result), + "exit_code": 0 if result.get("success") else 1, } - + async def _execute_storage_action(self, action: Dict) -> Dict: """Execute storage action""" - array = action.get('array', 'default') - operation = action.get('operation', '') - + array = action.get("array", "default") + operation = action.get("operation", "") + logger.info(f"Storage action: {operation} on {array}") - - if operation == 'expand_volume': - volume_name = action.get('resource', '') - new_size = action.get('new_size_gb', 0) - - result = await self.mcp.call_tool('storage_expand_volume', { - 'array': array, - 'volume': volume_name, - 'size_gb': new_size - }) - + + if operation == "expand_volume": + volume_name = action.get("resource", "") + new_size = action.get("new_size_gb", 0) + + result = await self.mcp.call_tool( + "storage_expand_volume", + {"array": array, "volume": volume_name, "size_gb": new_size}, + ) + else: raise ValueError(f"Unknown storage operation: {operation}") - + return { - 'success': result.get('success', False), - 'command': operation, - 'parameters': action, - 'stdout': json.dumps(result), - 'exit_code': 0 if result.get('success') else 1 + "success": result.get("success", False), + "command": operation, + "parameters": action, + "stdout": json.dumps(result), + "exit_code": 0 if result.get("success") else 1, } - + async def _execute_generic_action(self, action: Dict) -> Dict: """Execute generic action""" - command = action.get('command', '') - + command = action.get("command", "") + logger.warning(f"Generic action execution: {command}") - + return { - 'success': False, - 'error': 'Generic actions not supported for security reasons', - 'command': command, - 'exit_code': 1 + "success": False, + "error": "Generic actions not supported for security reasons", + "command": command, + "exit_code": 1, } - - async def _pre_execution_check( - self, - target_system: str, - target_resource: str - ) -> Dict: + + async def _pre_execution_check(self, target_system: str, target_resource: str) -> Dict: """Perform safety checks before execution""" - + # Check if system is accessible try: # Ping/health check via MCP # This is a simplified check await asyncio.sleep(0.1) # Simulate check - - return { - 'passed': True, - 'reason': 'Pre-checks passed' - } + + return {"passed": True, "reason": "Pre-checks passed"} except Exception as e: - return { - 'passed': False, - 'reason': str(e) - } - + return {"passed": False, "reason": str(e)} + async def _post_execution_check( - self, - target_system: str, - target_resource: str, - action: Dict + self, target_system: str, target_resource: str, action: Dict ) -> Dict: """Verify action succeeded""" - + try: # Wait for system to stabilize await asyncio.sleep(2) - + # Verify resource is healthy # This would query actual resource status via MCP - - return { - 'passed': True, - 'reason': 'Post-checks passed' - } + + return {"passed": True, "reason": "Post-checks passed"} except Exception as e: - return { - 'passed': False, - 'reason': str(e) - } - + return {"passed": False, "reason": str(e)} + async def _check_approval(self, ticket_id: int) -> bool: """Check if remediation has been approved""" - approval = self.db.query(RemediationApproval).filter( - RemediationApproval.ticket_id == ticket_id, - RemediationApproval.status == 'approved' - ).first() - + approval = ( + self.db.query(RemediationApproval) + .filter( + RemediationApproval.ticket_id == ticket_id, RemediationApproval.status == "approved" + ) + .first() + ) + return approval is not None - + async def _update_ticket_status(self, ticket: Ticket, result: Dict): """Update ticket with remediation results""" - - if result['success']: + + if result["success"]: ticket.status = TicketStatus.AUTO_REMEDIATED ticket.auto_remediation_executed = True - elif result['rollback_required']: + elif result["rollback_required"]: ticket.status = TicketStatus.PARTIALLY_REMEDIATED ticket.auto_remediation_executed = True - - ticket.remediation_actions = result['executed_actions'] + + ticket.remediation_actions = result["executed_actions"] ticket.remediation_results = result ticket.updated_at = datetime.now() - + self.db.commit() - + async def rollback_remediation(self, ticket_id: int) -> Dict: """Rollback a failed remediation""" - + # Get remediation logs for this ticket - logs = self.db.query(RemediationLog).filter( - RemediationLog.ticket_id == ticket_id, - RemediationLog.success == True, - RemediationLog.rollback_executed == False - ).order_by(RemediationLog.id.desc()).all() - + logs = ( + self.db.query(RemediationLog) + .filter( + RemediationLog.ticket_id == ticket_id, + RemediationLog.success, + ~RemediationLog.rollback_executed, + ) + .order_by(RemediationLog.id.desc()) + .all() + ) + rollback_results = [] - + # Rollback in reverse order for log in logs: if log.rollback_available: @@ -528,34 +504,26 @@ class AutoRemediationEngine: # Execute rollback rollback_result = await self._execute_rollback(log) rollback_results.append(rollback_result) - + log.rollback_executed = True self.db.commit() - + except Exception as e: logger.error(f"Rollback failed for log {log.id}: {e}") - rollback_results.append({ - 'success': False, - 'log_id': log.id, - 'error': str(e) - }) - + rollback_results.append({"success": False, "log_id": log.id, "error": str(e)}) + return { - 'success': all(r['success'] for r in rollback_results), - 'rollback_count': len(rollback_results), - 'results': rollback_results + "success": all(r["success"] for r in rollback_results), + "rollback_count": len(rollback_results), + "results": rollback_results, } - + async def _execute_rollback(self, log: RemediationLog) -> Dict: """Execute rollback for a specific action""" - + logger.info(f"Rolling back action: {log.action_description}") - + # Implement rollback logic based on action type # This is a simplified example - - return { - 'success': True, - 'log_id': log.id, - 'message': 'Rollback executed' - } + + return {"success": True, "log_id": log.id, "message": "Rollback executed"} diff --git a/src/datacenter_docs/api/main.py b/src/datacenter_docs/api/main.py index 47c4ba9..f355b24 100644 --- a/src/datacenter_docs/api/main.py +++ b/src/datacenter_docs/api/main.py @@ -3,20 +3,19 @@ FastAPI application for datacenter documentation and ticket resolution Using MongoDB as database """ -from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends, File, UploadFile -from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import StreamingResponse -from pydantic import BaseModel, Field -from typing import List, Optional, Dict, Any -from datetime import datetime import logging -from pathlib import Path +from datetime import datetime +from typing import Any, Dict, List, Optional + +from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel, Field -from ..mcp.client import MCPClient, MCPCollector from ..chat.agent import DocumentationAgent +from ..mcp.client import MCPClient, MCPCollector from ..utils.config import get_settings -from ..utils.database import init_db, close_db, get_database -from . import models, schemas +from ..utils.database import close_db, init_db +from . import models logger = logging.getLogger(__name__) settings = get_settings() @@ -27,7 +26,7 @@ app = FastAPI( description="API for automated documentation and ticket resolution with MongoDB", version="2.0.0", docs_url="/api/docs", - redoc_url="/api/redoc" + redoc_url="/api/redoc", ) # CORS @@ -42,21 +41,18 @@ app.add_middleware( # Startup and Shutdown events @app.on_event("startup") -async def startup_event(): +async def startup_event() -> None: """Initialize database and services on startup""" logger.info("Starting Datacenter Documentation API...") - + # Initialize MongoDB - await init_db( - mongodb_url=settings.MONGODB_URL, - database_name=settings.MONGODB_DATABASE - ) - + await init_db(mongodb_url=settings.MONGODB_URL, database_name=settings.MONGODB_DATABASE) + logger.info("API started successfully") @app.on_event("shutdown") -async def shutdown_event(): +async def shutdown_event() -> None: """Cleanup on shutdown""" logger.info("Shutting down API...") await close_db() @@ -66,6 +62,7 @@ async def shutdown_event(): # Pydantic models class TicketCreate(BaseModel): """Ticket creation request""" + ticket_id: str = Field(..., description="External ticket ID") title: str = Field(..., description="Ticket title") description: str = Field(..., description="Problem description") @@ -77,10 +74,11 @@ class TicketCreate(BaseModel): class TicketResponse(BaseModel): """Ticket response""" + ticket_id: str status: str resolution: Optional[str] = None - suggested_actions: List[str] = [] + suggested_actions: List[Dict[str, Any]] = [] related_docs: List[Dict[str, str]] = [] confidence_score: float processing_time: float @@ -90,6 +88,7 @@ class TicketResponse(BaseModel): class DocumentationQuery(BaseModel): """Documentation query""" + query: str = Field(..., description="Search query") sections: Optional[List[str]] = Field(None, description="Specific sections to search") limit: int = Field(default=5, ge=1, le=20) @@ -97,6 +96,7 @@ class DocumentationQuery(BaseModel): class DocumentationResult(BaseModel): """Documentation search result""" + section: str title: str content: str @@ -105,24 +105,23 @@ class DocumentationResult(BaseModel): # Dependency for MCP client -async def get_mcp_client(): +async def get_mcp_client() -> Any: """Get MCP client instance""" async with MCPClient( - server_url=settings.MCP_SERVER_URL, - api_key=settings.MCP_API_KEY + server_url=settings.MCP_SERVER_URL, api_key=settings.MCP_API_KEY ) as client: yield client # Health check @app.get("/health") -async def health_check(): +async def health_check() -> Dict[str, str]: """Health check endpoint""" return { "status": "healthy", "database": "mongodb", "timestamp": datetime.now().isoformat(), - "version": "2.0.0" + "version": "2.0.0", } @@ -131,11 +130,11 @@ async def health_check(): async def create_ticket( ticket: TicketCreate, background_tasks: BackgroundTasks, - mcp: MCPClient = Depends(get_mcp_client) -): + mcp: MCPClient = Depends(get_mcp_client), +) -> TicketResponse: """ Create and automatically process a ticket - + This endpoint receives a ticket from external systems and automatically: 1. Searches relevant documentation 2. Analyzes the problem @@ -143,13 +142,13 @@ async def create_ticket( 4. Provides confidence score """ start_time = datetime.now() - + try: # Check if ticket already exists existing = await models.Ticket.find_one(models.Ticket.ticket_id == ticket.ticket_id) if existing: raise HTTPException(status_code=409, detail="Ticket already exists") - + # Create ticket in MongoDB db_ticket = models.Ticket( ticket_id=ticket.ticket_id, @@ -159,39 +158,36 @@ async def create_ticket( category=ticket.category, requester=ticket.requester, status="processing", - metadata=ticket.metadata + metadata=ticket.metadata, ) await db_ticket.insert() - + # Initialize documentation agent - agent = DocumentationAgent( - mcp_client=mcp, - anthropic_api_key=settings.ANTHROPIC_API_KEY - ) - + agent = DocumentationAgent(mcp_client=mcp, anthropic_api_key=settings.ANTHROPIC_API_KEY) + # Process ticket in background background_tasks.add_task( process_ticket_resolution, agent=agent, ticket_id=ticket.ticket_id, description=ticket.description, - category=ticket.category + category=ticket.category, ) - + processing_time = (datetime.now() - start_time).total_seconds() - + return TicketResponse( ticket_id=ticket.ticket_id, status="processing", resolution=None, - suggested_actions=["Analyzing ticket..."], + suggested_actions=[{"action": "Analyzing ticket..."}], related_docs=[], confidence_score=0.0, processing_time=processing_time, created_at=db_ticket.created_at, - updated_at=db_ticket.updated_at + updated_at=db_ticket.updated_at, ) - + except HTTPException: raise except Exception as e: @@ -200,13 +196,13 @@ async def create_ticket( @app.get("/api/v1/tickets/{ticket_id}", response_model=TicketResponse) -async def get_ticket(ticket_id: str): +async def get_ticket(ticket_id: str) -> TicketResponse: """Get ticket status and resolution""" ticket = await models.Ticket.find_one(models.Ticket.ticket_id == ticket_id) - + if not ticket: raise HTTPException(status_code=404, detail="Ticket not found") - + return TicketResponse( ticket_id=ticket.ticket_id, status=ticket.status, @@ -216,26 +212,23 @@ async def get_ticket(ticket_id: str): confidence_score=ticket.confidence_score or 0.0, processing_time=ticket.processing_time or 0.0, created_at=ticket.created_at, - updated_at=ticket.updated_at + updated_at=ticket.updated_at, ) @app.get("/api/v1/tickets") async def list_tickets( - status: Optional[str] = None, - category: Optional[str] = None, - limit: int = 50, - skip: int = 0 -): + status: Optional[str] = None, category: Optional[str] = None, limit: int = 50, skip: int = 0 +) -> Dict[str, Any]: """List tickets with optional filters""" query = {} if status: query["status"] = status if category: query["category"] = category - + tickets = await models.Ticket.find(query).skip(skip).limit(limit).to_list() - + return { "total": len(tickets), "tickets": [ @@ -245,47 +238,45 @@ async def list_tickets( "status": t.status, "category": t.category, "created_at": t.created_at, - "confidence_score": t.confidence_score + "confidence_score": t.confidence_score, } for t in tickets - ] + ], } # Documentation Search API @app.post("/api/v1/documentation/search", response_model=List[DocumentationResult]) async def search_documentation( - query: DocumentationQuery, - mcp: MCPClient = Depends(get_mcp_client) -): + query: DocumentationQuery, mcp: MCPClient = Depends(get_mcp_client) +) -> List[DocumentationResult]: """ Search datacenter documentation - + Uses semantic search to find relevant documentation sections """ try: - agent = DocumentationAgent( - mcp_client=mcp, - anthropic_api_key=settings.ANTHROPIC_API_KEY - ) - + agent = DocumentationAgent(mcp_client=mcp, anthropic_api_key=settings.ANTHROPIC_API_KEY) + results = await agent.search_documentation( - query=query.query, - sections=query.sections, - limit=query.limit + query=query.query, sections=query.sections, limit=query.limit ) - + return [ DocumentationResult( section=r["section"], title=r.get("title", r["section"]), content=r["content"], relevance_score=r["relevance_score"], - last_updated=datetime.fromisoformat(r["last_updated"]) if r.get("last_updated") else datetime.now() + last_updated=( + datetime.fromisoformat(r["last_updated"]) + if r.get("last_updated") + else datetime.now() + ), ) for r in results ] - + except Exception as e: logger.error(f"Search failed: {e}") raise HTTPException(status_code=500, detail=str(e)) @@ -294,40 +285,45 @@ async def search_documentation( # Documentation Generation API @app.post("/api/v1/documentation/generate/{section}") async def generate_documentation( - section: str, - background_tasks: BackgroundTasks, - mcp: MCPClient = Depends(get_mcp_client) -): + section: str, background_tasks: BackgroundTasks, mcp: MCPClient = Depends(get_mcp_client) +) -> Dict[str, str]: """ Trigger documentation generation for a specific section - + Returns immediately and processes in background """ valid_sections = [ - "infrastructure", "network", "virtualization", "storage", - "security", "backup", "monitoring", "database", "procedures", "improvements" + "infrastructure", + "network", + "virtualization", + "storage", + "security", + "backup", + "monitoring", + "database", + "procedures", + "improvements", ] - + if section not in valid_sections: raise HTTPException( - status_code=400, - detail=f"Invalid section. Must be one of: {', '.join(valid_sections)}" + status_code=400, detail=f"Invalid section. Must be one of: {', '.join(valid_sections)}" ) - + background_tasks.add_task(generate_section_task, section=section, mcp=mcp) - + return { "status": "processing", "section": section, - "message": f"Documentation generation started for section: {section}" + "message": f"Documentation generation started for section: {section}", } @app.get("/api/v1/documentation/sections") -async def list_sections(): +async def list_sections() -> Dict[str, Any]: """List all available documentation sections""" sections_docs = await models.DocumentationSection.find_all().to_list() - + return { "total": len(sections_docs), "sections": [ @@ -335,57 +331,51 @@ async def list_sections(): "section_id": s.section_id, "name": s.name, "status": s.generation_status, - "last_generated": s.last_generated + "last_generated": s.last_generated, } for s in sections_docs - ] + ], } # Stats and Metrics @app.get("/api/v1/stats/tickets") -async def get_ticket_stats(): +async def get_ticket_stats() -> Dict[str, Any]: """Get ticket resolution statistics""" - + total = await models.Ticket.count() resolved = await models.Ticket.find(models.Ticket.status == "resolved").count() processing = await models.Ticket.find(models.Ticket.status == "processing").count() failed = await models.Ticket.find(models.Ticket.status == "failed").count() - + # Calculate average confidence and processing time all_tickets = await models.Ticket.find_all().to_list() - + confidences = [t.confidence_score for t in all_tickets if t.confidence_score] proc_times = [t.processing_time for t in all_tickets if t.processing_time] - + avg_confidence = sum(confidences) / len(confidences) if confidences else 0.0 avg_proc_time = sum(proc_times) / len(proc_times) if proc_times else 0.0 - + return { "total": total, "resolved": resolved, "processing": processing, "failed": failed, "avg_confidence": round(avg_confidence, 3), - "avg_processing_time": round(avg_proc_time, 3) + "avg_processing_time": round(avg_proc_time, 3), } # Background tasks async def process_ticket_resolution( - agent: DocumentationAgent, - ticket_id: str, - description: str, - category: Optional[str] -): + agent: DocumentationAgent, ticket_id: str, description: str, category: Optional[str] +) -> None: """Background task to process ticket resolution""" try: # Analyze ticket and find resolution - result = await agent.resolve_ticket( - description=description, - category=category - ) - + result = await agent.resolve_ticket(description=description, category=category) + # Update ticket in database ticket = await models.Ticket.find_one(models.Ticket.ticket_id == ticket_id) if ticket: @@ -397,12 +387,12 @@ async def process_ticket_resolution( ticket.processing_time = result["processing_time"] ticket.updated_at = datetime.now() await ticket.save() - + logger.info(f"Ticket {ticket_id} resolved successfully") - + except Exception as e: logger.error(f"Failed to resolve ticket {ticket_id}: {e}") - + # Update ticket status to failed ticket = await models.Ticket.find_one(models.Ticket.ticket_id == ticket_id) if ticket: @@ -412,41 +402,39 @@ async def process_ticket_resolution( await ticket.save() -async def generate_section_task(section: str, mcp: MCPClient): +async def generate_section_task(section: str, mcp: MCPClient) -> None: """Background task to generate documentation section""" try: collector = MCPCollector(mcp) - + # Collect data - data = await collector.collect_infrastructure_data() - + await collector.collect_infrastructure_data() + # Update section status section_doc = await models.DocumentationSection.find_one( models.DocumentationSection.section_id == section ) - + if not section_doc: section_doc = models.DocumentationSection( - section_id=section, - name=section.title(), - generation_status="processing" + section_id=section, name=section.title(), generation_status="processing" ) await section_doc.insert() else: section_doc.generation_status = "processing" await section_doc.save() - + # Generate documentation (simplified) logger.info(f"Generated documentation for section: {section}") - + # Update status section_doc.generation_status = "completed" section_doc.last_generated = datetime.now() await section_doc.save() - + except Exception as e: logger.error(f"Failed to generate section {section}: {e}") - + # Update status to failed section_doc = await models.DocumentationSection.find_one( models.DocumentationSection.section_id == section @@ -456,15 +444,12 @@ async def generate_section_task(section: str, mcp: MCPClient): await section_doc.save() -def start(): +def start() -> None: """Start the API server""" import uvicorn + uvicorn.run( - "datacenter_docs.api.main:app", - host="0.0.0.0", - port=8000, - reload=True, - log_level="info" + "datacenter_docs.api.main:app", host="0.0.0.0", port=8000, reload=True, log_level="info" ) diff --git a/src/datacenter_docs/api/main_enhanced.py b/src/datacenter_docs/api/main_enhanced.py index 9ed8c50..7fc787d 100644 --- a/src/datacenter_docs/api/main_enhanced.py +++ b/src/datacenter_docs/api/main_enhanced.py @@ -2,21 +2,23 @@ Enhanced FastAPI application with auto-remediation and feedback system """ -from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends, Query +import logging +from datetime import datetime, timedelta +from typing import Any, Dict, List, Optional + +from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field -from typing import List, Optional, Dict, Any -from datetime import datetime, timedelta +from sqlalchemy import Integer from sqlalchemy.orm import Session -import logging -from ..mcp.client import MCPClient from ..chat.agent import DocumentationAgent +from ..mcp.client import MCPClient from ..utils.config import get_settings from ..utils.database import get_db from . import models -from .reliability import ReliabilityCalculator, AutoRemediationDecisionEngine from .auto_remediation import AutoRemediationEngine +from .reliability import AutoRemediationDecisionEngine, ReliabilityCalculator logger = logging.getLogger(__name__) settings = get_settings() @@ -26,7 +28,7 @@ app = FastAPI( description="AI-powered API with auto-remediation and feedback learning", version="2.0.0", docs_url="/api/docs", - redoc_url="/api/redoc" + redoc_url="/api/redoc", ) app.add_middleware( @@ -37,9 +39,11 @@ app.add_middleware( allow_headers=["*"], ) + # Pydantic schemas class TicketCreate(BaseModel): """Enhanced ticket creation with auto-remediation flag""" + ticket_id: str = Field(..., description="External ticket ID") title: str = Field(..., description="Ticket title") description: str = Field(..., description="Problem description") @@ -47,34 +51,35 @@ class TicketCreate(BaseModel): category: Optional[str] = None requester: Optional[str] = None metadata: Optional[Dict[str, Any]] = Field(default_factory=dict) - + # Auto-remediation control (DEFAULT: DISABLED) enable_auto_remediation: bool = Field( default=False, - description="Enable auto-remediation (write operations). DEFAULT: False for safety" + description="Enable auto-remediation (write operations). DEFAULT: False for safety", ) class TicketResponse(BaseModel): """Enhanced ticket response with reliability""" + ticket_id: str status: str resolution: Optional[str] = None suggested_actions: List[Dict] = [] related_docs: List[Dict[str, str]] = [] - + # Confidence and reliability confidence_score: float reliability_score: Optional[float] = None reliability_breakdown: Optional[Dict] = None confidence_level: Optional[str] = None - + # Auto-remediation auto_remediation_enabled: bool auto_remediation_executed: bool = False remediation_decision: Optional[Dict] = None remediation_results: Optional[Dict] = None - + # Metadata processing_time: float created_at: datetime @@ -83,29 +88,31 @@ class TicketResponse(BaseModel): class FeedbackCreate(BaseModel): """Human feedback on ticket resolution""" + ticket_id: str = Field(..., description="Ticket ID") feedback_type: str = Field(..., description="positive, negative, or neutral") rating: Optional[int] = Field(None, ge=1, le=5, description="1-5 stars") was_helpful: Optional[bool] = None resolution_accurate: Optional[bool] = None actions_worked: Optional[bool] = None - + # Comments comment: Optional[str] = None what_worked: Optional[str] = None what_didnt_work: Optional[str] = None suggestions: Optional[str] = None - + # Actual resolution if AI failed actual_resolution: Optional[str] = None actual_actions_taken: Optional[List[Dict]] = None time_to_resolve: Optional[float] = None # Minutes - + reviewer: Optional[str] = None class FeedbackResponse(BaseModel): """Feedback submission response""" + feedback_id: int ticket_id: str message: str @@ -115,6 +122,7 @@ class FeedbackResponse(BaseModel): class RemediationApprovalRequest(BaseModel): """Request approval for auto-remediation""" + ticket_id: str approve: bool approver: str @@ -124,29 +132,29 @@ class RemediationApprovalRequest(BaseModel): # Dependency for MCP client async def get_mcp_client(): async with MCPClient( - server_url=settings.MCP_SERVER_URL, - api_key=settings.MCP_API_KEY + server_url=settings.MCP_SERVER_URL, api_key=settings.MCP_API_KEY ) as client: yield client # === ENHANCED TICKET ENDPOINTS === + @app.post("/api/v1/tickets", response_model=TicketResponse, status_code=201) async def create_ticket_enhanced( ticket: TicketCreate, background_tasks: BackgroundTasks, db: Session = Depends(get_db), - mcp: MCPClient = Depends(get_mcp_client) + mcp: MCPClient = Depends(get_mcp_client), ): """ Create and process ticket with optional auto-remediation - + **SAFETY**: Auto-remediation is DISABLED by default Set enable_auto_remediation=true to enable write operations """ start_time = datetime.now() - + try: # Create ticket in database db_ticket = models.Ticket( @@ -158,22 +166,19 @@ async def create_ticket_enhanced( requester=ticket.requester, status=models.TicketStatus.PROCESSING, metadata=ticket.metadata, - auto_remediation_enabled=ticket.enable_auto_remediation # Store flag + auto_remediation_enabled=ticket.enable_auto_remediation, # Store flag ) db.add(db_ticket) db.commit() db.refresh(db_ticket) - + # Process in background background_tasks.add_task( - process_ticket_with_auto_remediation, - ticket_id=ticket.ticket_id, - db=db, - mcp=mcp + process_ticket_with_auto_remediation, ticket_id=ticket.ticket_id, db=db, mcp=mcp ) - + processing_time = (datetime.now() - start_time).total_seconds() - + return TicketResponse( ticket_id=ticket.ticket_id, status="processing", @@ -186,27 +191,22 @@ async def create_ticket_enhanced( auto_remediation_executed=False, processing_time=processing_time, created_at=db_ticket.created_at, - updated_at=db_ticket.updated_at + updated_at=db_ticket.updated_at, ) - + except Exception as e: logger.error(f"Failed to create ticket: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/v1/tickets/{ticket_id}", response_model=TicketResponse) -async def get_ticket_enhanced( - ticket_id: str, - db: Session = Depends(get_db) -): +async def get_ticket_enhanced(ticket_id: str, db: Session = Depends(get_db)): """Get ticket with full reliability and remediation info""" - ticket = db.query(models.Ticket).filter( - models.Ticket.ticket_id == ticket_id - ).first() - + ticket = db.query(models.Ticket).filter(models.Ticket.ticket_id == ticket_id).first() + if not ticket: raise HTTPException(status_code=404, detail="Ticket not found") - + return TicketResponse( ticket_id=ticket.ticket_id, status=ticket.status.value, @@ -215,41 +215,37 @@ async def get_ticket_enhanced( related_docs=ticket.related_docs or [], confidence_score=ticket.confidence_score or 0.0, reliability_score=ticket.reliability_score, - reliability_breakdown=ticket.metadata.get('reliability_breakdown'), - confidence_level=ticket.metadata.get('confidence_level'), + reliability_breakdown=ticket.metadata.get("reliability_breakdown"), + confidence_level=ticket.metadata.get("confidence_level"), auto_remediation_enabled=ticket.auto_remediation_enabled, auto_remediation_executed=ticket.auto_remediation_executed, - remediation_decision=ticket.metadata.get('remediation_decision'), + remediation_decision=ticket.metadata.get("remediation_decision"), remediation_results=ticket.remediation_results, processing_time=ticket.processing_time or 0.0, created_at=ticket.created_at, - updated_at=ticket.updated_at + updated_at=ticket.updated_at, ) # === FEEDBACK ENDPOINTS === + @app.post("/api/v1/feedback", response_model=FeedbackResponse) -async def submit_feedback( - feedback: FeedbackCreate, - db: Session = Depends(get_db) -): +async def submit_feedback(feedback: FeedbackCreate, db: Session = Depends(get_db)): """ Submit human feedback on ticket resolution - + This feedback is used to: 1. Calculate reliability scores 2. Train pattern recognition 3. Improve future auto-remediation decisions """ # Get ticket - ticket = db.query(models.Ticket).filter( - models.Ticket.ticket_id == feedback.ticket_id - ).first() - + ticket = db.query(models.Ticket).filter(models.Ticket.ticket_id == feedback.ticket_id).first() + if not ticket: raise HTTPException(status_code=404, detail="Ticket not found") - + # Create feedback db_feedback = models.TicketFeedback( ticket_id=ticket.id, @@ -266,286 +262,279 @@ async def submit_feedback( actual_actions_taken=feedback.actual_actions_taken, time_to_resolve=feedback.time_to_resolve, reviewer=feedback.reviewer, - reviewed_at=datetime.now() + reviewed_at=datetime.now(), ) db.add(db_feedback) - + # Update ticket status ticket.status = models.TicketStatus.AWAITING_FEEDBACK - + # Recalculate reliability reliability_calc = ReliabilityCalculator(db) new_reliability = reliability_calc.calculate_reliability( ticket_id=ticket.id, confidence_score=ticket.confidence_score, category=ticket.category, - problem_description=ticket.description + problem_description=ticket.description, ) - - ticket.reliability_score = new_reliability['overall_score'] - + + ticket.reliability_score = new_reliability["overall_score"] + # Update pattern - pattern_updated = update_ticket_pattern( - db=db, - ticket=ticket, - feedback=db_feedback - ) - + pattern_updated = update_ticket_pattern(db=db, ticket=ticket, feedback=db_feedback) + db.commit() - + return FeedbackResponse( feedback_id=db_feedback.id, ticket_id=ticket.ticket_id, message="Feedback submitted successfully. Thank you for improving the system!", reliability_impact={ - 'old_score': ticket.reliability_score, - 'new_score': new_reliability['overall_score'], - 'change': new_reliability['overall_score'] - (ticket.reliability_score or 50.0) + "old_score": ticket.reliability_score, + "new_score": new_reliability["overall_score"], + "change": new_reliability["overall_score"] - (ticket.reliability_score or 50.0), }, - pattern_updated=pattern_updated + pattern_updated=pattern_updated, ) @app.get("/api/v1/tickets/{ticket_id}/feedback") -async def get_ticket_feedback( - ticket_id: str, - db: Session = Depends(get_db) -): +async def get_ticket_feedback(ticket_id: str, db: Session = Depends(get_db)): """Get all feedback for a ticket""" - ticket = db.query(models.Ticket).filter( - models.Ticket.ticket_id == ticket_id - ).first() - + ticket = db.query(models.Ticket).filter(models.Ticket.ticket_id == ticket_id).first() + if not ticket: raise HTTPException(status_code=404, detail="Ticket not found") - - feedbacks = db.query(models.TicketFeedback).filter( - models.TicketFeedback.ticket_id == ticket.id - ).all() - + + feedbacks = ( + db.query(models.TicketFeedback).filter(models.TicketFeedback.ticket_id == ticket.id).all() + ) + return { - 'ticket_id': ticket_id, - 'feedback_count': len(feedbacks), - 'feedbacks': [ + "ticket_id": ticket_id, + "feedback_count": len(feedbacks), + "feedbacks": [ { - 'id': f.id, - 'type': f.feedback_type.value, - 'rating': f.rating, - 'was_helpful': f.was_helpful, - 'reviewer': f.reviewer, - 'reviewed_at': f.reviewed_at, - 'comment': f.comment + "id": f.id, + "type": f.feedback_type.value, + "rating": f.rating, + "was_helpful": f.was_helpful, + "reviewer": f.reviewer, + "reviewed_at": f.reviewed_at, + "comment": f.comment, } for f in feedbacks - ] + ], } # === AUTO-REMEDIATION ENDPOINTS === + @app.post("/api/v1/tickets/{ticket_id}/approve-remediation") async def approve_remediation( - ticket_id: str, - approval: RemediationApprovalRequest, - db: Session = Depends(get_db) + ticket_id: str, approval: RemediationApprovalRequest, db: Session = Depends(get_db) ): """ Approve or reject auto-remediation for a ticket - + Required when reliability score is below auto-approval threshold """ - ticket = db.query(models.Ticket).filter( - models.Ticket.ticket_id == ticket_id - ).first() - + ticket = db.query(models.Ticket).filter(models.Ticket.ticket_id == ticket_id).first() + if not ticket: raise HTTPException(status_code=404, detail="Ticket not found") - + # Find pending approval - pending_approval = db.query(models.RemediationApproval).filter( - models.RemediationApproval.ticket_id == ticket.id, - models.RemediationApproval.status == 'pending' - ).first() - + pending_approval = ( + db.query(models.RemediationApproval) + .filter( + models.RemediationApproval.ticket_id == ticket.id, + models.RemediationApproval.status == "pending", + ) + .first() + ) + if not pending_approval: raise HTTPException(status_code=404, detail="No pending approval found") - + # Update approval if approval.approve: - pending_approval.status = 'approved' + pending_approval.status = "approved" pending_approval.approved_by = approval.approver pending_approval.approved_at = datetime.now() message = "Auto-remediation approved. Execution will proceed." else: - pending_approval.status = 'rejected' + pending_approval.status = "rejected" pending_approval.rejection_reason = approval.comment message = "Auto-remediation rejected." - + db.commit() - - return { - 'ticket_id': ticket_id, - 'approval_status': pending_approval.status, - 'message': message - } + + return {"ticket_id": ticket_id, "approval_status": pending_approval.status, "message": message} @app.get("/api/v1/tickets/{ticket_id}/remediation-logs") -async def get_remediation_logs( - ticket_id: str, - db: Session = Depends(get_db) -): +async def get_remediation_logs(ticket_id: str, db: Session = Depends(get_db)): """Get detailed remediation logs for a ticket""" - ticket = db.query(models.Ticket).filter( - models.Ticket.ticket_id == ticket_id - ).first() - + ticket = db.query(models.Ticket).filter(models.Ticket.ticket_id == ticket_id).first() + if not ticket: raise HTTPException(status_code=404, detail="Ticket not found") - - logs = db.query(models.RemediationLog).filter( - models.RemediationLog.ticket_id == ticket.id - ).order_by(models.RemediationLog.executed_at.desc()).all() - + + logs = ( + db.query(models.RemediationLog) + .filter(models.RemediationLog.ticket_id == ticket.id) + .order_by(models.RemediationLog.executed_at.desc()) + .all() + ) + return { - 'ticket_id': ticket_id, - 'log_count': len(logs), - 'logs': [ + "ticket_id": ticket_id, + "log_count": len(logs), + "logs": [ { - 'id': log.id, - 'action': log.action_description, - 'type': log.action_type.value, - 'target_system': log.target_system, - 'target_resource': log.target_resource, - 'success': log.success, - 'executed_at': log.executed_at, - 'executed_by': log.executed_by, - 'stdout': log.stdout, - 'stderr': log.stderr, - 'error': log.error_message + "id": log.id, + "action": log.action_description, + "type": log.action_type.value, + "target_system": log.target_system, + "target_resource": log.target_resource, + "success": log.success, + "executed_at": log.executed_at, + "executed_by": log.executed_by, + "stdout": log.stdout, + "stderr": log.stderr, + "error": log.error_message, } for log in logs - ] + ], } # === ANALYTICS & STATISTICS === + @app.get("/api/v1/stats/reliability") async def get_reliability_stats( category: Optional[str] = None, days: int = Query(default=30, ge=1, le=365), - db: Session = Depends(get_db) + db: Session = Depends(get_db), ): """Get reliability statistics""" from sqlalchemy import func - + start_date = datetime.now() - timedelta(days=days) - + query = db.query( - func.avg(models.Ticket.reliability_score).label('avg_reliability'), - func.avg(models.Ticket.confidence_score).label('avg_confidence'), - func.count(models.Ticket.id).label('total_tickets'), - func.count(models.Ticket.id).filter( - models.Ticket.status == models.TicketStatus.RESOLVED - ).label('resolved_tickets') - ).filter( - models.Ticket.created_at >= start_date - ) - + func.avg(models.Ticket.reliability_score).label("avg_reliability"), + func.avg(models.Ticket.confidence_score).label("avg_confidence"), + func.count(models.Ticket.id).label("total_tickets"), + func.count(models.Ticket.id) + .filter(models.Ticket.status == models.TicketStatus.RESOLVED) + .label("resolved_tickets"), + ).filter(models.Ticket.created_at >= start_date) + if category: query = query.filter(models.Ticket.category == category) - + stats = query.first() - + # Feedback stats - feedback_stats = db.query( - models.TicketFeedback.feedback_type, - func.count(models.TicketFeedback.id) - ).join(models.Ticket).filter( - models.Ticket.created_at >= start_date - ).group_by(models.TicketFeedback.feedback_type).all() - + feedback_stats = ( + db.query(models.TicketFeedback.feedback_type, func.count(models.TicketFeedback.id)) + .join(models.Ticket) + .filter(models.Ticket.created_at >= start_date) + .group_by(models.TicketFeedback.feedback_type) + .all() + ) + return { - 'period_days': days, - 'category': category or 'all', - 'avg_reliability': round(stats.avg_reliability or 0, 2), - 'avg_confidence': round((stats.avg_confidence or 0) * 100, 2), - 'total_tickets': stats.total_tickets or 0, - 'resolved_tickets': stats.resolved_tickets or 0, - 'resolution_rate': round( - (stats.resolved_tickets / stats.total_tickets * 100) if stats.total_tickets else 0, - 2 + "period_days": days, + "category": category or "all", + "avg_reliability": round(stats.avg_reliability or 0, 2), + "avg_confidence": round((stats.avg_confidence or 0) * 100, 2), + "total_tickets": stats.total_tickets or 0, + "resolved_tickets": stats.resolved_tickets or 0, + "resolution_rate": round( + (stats.resolved_tickets / stats.total_tickets * 100) if stats.total_tickets else 0, 2 ), - 'feedback_distribution': { - fb_type.value: count for fb_type, count in feedback_stats - } + "feedback_distribution": {fb_type.value: count for fb_type, count in feedback_stats}, } @app.get("/api/v1/stats/auto-remediation") async def get_auto_remediation_stats( - days: int = Query(default=30, ge=1, le=365), - db: Session = Depends(get_db) + days: int = Query(default=30, ge=1, le=365), db: Session = Depends(get_db) ): """Get auto-remediation statistics""" from sqlalchemy import func - + start_date = datetime.now() - timedelta(days=days) - + # Overall stats - total_enabled = db.query(func.count(models.Ticket.id)).filter( - models.Ticket.auto_remediation_enabled == True, - models.Ticket.created_at >= start_date - ).scalar() - - total_executed = db.query(func.count(models.Ticket.id)).filter( - models.Ticket.auto_remediation_executed == True, - models.Ticket.created_at >= start_date - ).scalar() - + total_enabled = ( + db.query(func.count(models.Ticket.id)) + .filter( + models.Ticket.auto_remediation_enabled.is_(True), + models.Ticket.created_at >= start_date, + ) + .scalar() + ) + + total_executed = ( + db.query(func.count(models.Ticket.id)) + .filter( + models.Ticket.auto_remediation_executed.is_(True), + models.Ticket.created_at >= start_date, + ) + .scalar() + ) + # Success rate - successful_logs = db.query(func.count(models.RemediationLog.id)).filter( - models.RemediationLog.success == True, - models.RemediationLog.executed_at >= start_date - ).scalar() - - total_logs = db.query(func.count(models.RemediationLog.id)).filter( - models.RemediationLog.executed_at >= start_date - ).scalar() - + successful_logs = ( + db.query(func.count(models.RemediationLog.id)) + .filter( + models.RemediationLog.success.is_(True), + models.RemediationLog.executed_at >= start_date, + ) + .scalar() + ) + + total_logs = ( + db.query(func.count(models.RemediationLog.id)) + .filter(models.RemediationLog.executed_at >= start_date) + .scalar() + ) + # By action type - by_action_type = db.query( - models.RemediationLog.action_type, - func.count(models.RemediationLog.id), - func.sum(func.cast(models.RemediationLog.success, Integer)) - ).filter( - models.RemediationLog.executed_at >= start_date - ).group_by(models.RemediationLog.action_type).all() - + by_action_type = ( + db.query( + models.RemediationLog.action_type, + func.count(models.RemediationLog.id), + func.sum(func.cast(models.RemediationLog.success, Integer)), + ) + .filter(models.RemediationLog.executed_at >= start_date) + .group_by(models.RemediationLog.action_type) + .all() + ) + return { - 'period_days': days, - 'tickets_with_auto_remediation_enabled': total_enabled or 0, - 'tickets_auto_remediated': total_executed or 0, - 'execution_rate': round( - (total_executed / total_enabled * 100) if total_enabled else 0, - 2 - ), - 'total_actions': total_logs or 0, - 'successful_actions': successful_logs or 0, - 'success_rate': round( - (successful_logs / total_logs * 100) if total_logs else 0, - 2 - ), - 'by_action_type': [ + "period_days": days, + "tickets_with_auto_remediation_enabled": total_enabled or 0, + "tickets_auto_remediated": total_executed or 0, + "execution_rate": round((total_executed / total_enabled * 100) if total_enabled else 0, 2), + "total_actions": total_logs or 0, + "successful_actions": successful_logs or 0, + "success_rate": round((successful_logs / total_logs * 100) if total_logs else 0, 2), + "by_action_type": [ { - 'type': action_type.value, - 'total': total, - 'successful': successful, - 'success_rate': round((successful / total * 100) if total else 0, 2) + "type": action_type.value, + "total": total, + "successful": successful, + "success_rate": round((successful / total * 100) if total else 0, 2), } for action_type, total, successful in by_action_type - ] + ], } @@ -553,151 +542,136 @@ async def get_auto_remediation_stats( async def get_learned_patterns( category: Optional[str] = None, min_occurrences: int = Query(default=5, ge=1), - db: Session = Depends(get_db) + db: Session = Depends(get_db), ): """Get learned ticket patterns""" query = db.query(models.TicketPattern).filter( models.TicketPattern.occurrence_count >= min_occurrences ) - + if category: query = query.filter(models.TicketPattern.category == category) - - patterns = query.order_by( - models.TicketPattern.occurrence_count.desc() - ).limit(50).all() - + + patterns = query.order_by(models.TicketPattern.occurrence_count.desc()).limit(50).all() + return { - 'count': len(patterns), - 'patterns': [ + "count": len(patterns), + "patterns": [ { - 'id': p.id, - 'category': p.category, - 'occurrences': p.occurrence_count, - 'success_rate': round( - (p.success_count / p.occurrence_count * 100) if p.occurrence_count else 0, - 2 + "id": p.id, + "category": p.category, + "occurrences": p.occurrence_count, + "success_rate": round( + (p.success_count / p.occurrence_count * 100) if p.occurrence_count else 0, 2 ), - 'avg_reliability': round(p.avg_reliability_score or 0, 2), - 'eligible_for_auto_remediation': p.eligible_for_auto_remediation, - 'auto_remediation_success_rate': round( - (p.auto_remediation_success_rate or 0) * 100, - 2 + "avg_reliability": round(p.avg_reliability_score or 0, 2), + "eligible_for_auto_remediation": p.eligible_for_auto_remediation, + "auto_remediation_success_rate": round( + (p.auto_remediation_success_rate or 0) * 100, 2 ), - 'common_resolution': p.common_resolution[:200] if p.common_resolution else None, - 'positive_feedback': p.positive_feedback_count, - 'negative_feedback': p.negative_feedback_count, - 'first_seen': p.first_seen, - 'last_seen': p.last_seen + "common_resolution": p.common_resolution[:200] if p.common_resolution else None, + "positive_feedback": p.positive_feedback_count, + "negative_feedback": p.negative_feedback_count, + "first_seen": p.first_seen, + "last_seen": p.last_seen, } for p in patterns - ] + ], } # === BACKGROUND TASKS === -async def process_ticket_with_auto_remediation( - ticket_id: str, - db: Session, - mcp: MCPClient -): + +async def process_ticket_with_auto_remediation(ticket_id: str, db: Session, mcp: MCPClient): """Enhanced background processing with auto-remediation""" try: - ticket = db.query(models.Ticket).filter( - models.Ticket.ticket_id == ticket_id - ).first() - + ticket = db.query(models.Ticket).filter(models.Ticket.ticket_id == ticket_id).first() + if not ticket: return - + # Initialize agent - agent = DocumentationAgent( - mcp_client=mcp, - anthropic_api_key=settings.ANTHROPIC_API_KEY - ) - + agent = DocumentationAgent(mcp_client=mcp, anthropic_api_key=settings.ANTHROPIC_API_KEY) + # Resolve ticket (AI analysis) resolution_result = await agent.resolve_ticket( - description=ticket.description, - category=ticket.category + description=ticket.description, category=ticket.category ) - + # Calculate reliability reliability_calc = ReliabilityCalculator(db) reliability = reliability_calc.calculate_reliability( ticket_id=ticket.id, - confidence_score=resolution_result['confidence_score'], + confidence_score=resolution_result["confidence_score"], category=ticket.category, - problem_description=ticket.description + problem_description=ticket.description, ) - + # Update ticket - ticket.resolution = resolution_result['resolution'] - ticket.suggested_actions = resolution_result['suggested_actions'] - ticket.related_docs = resolution_result['related_docs'] - ticket.confidence_score = resolution_result['confidence_score'] - ticket.reliability_score = reliability['overall_score'] - ticket.processing_time = resolution_result['processing_time'] - + ticket.resolution = resolution_result["resolution"] + ticket.suggested_actions = resolution_result["suggested_actions"] + ticket.related_docs = resolution_result["related_docs"] + ticket.confidence_score = resolution_result["confidence_score"] + ticket.reliability_score = reliability["overall_score"] + ticket.processing_time = resolution_result["processing_time"] + # Store reliability breakdown in metadata if not ticket.metadata: ticket.metadata = {} - ticket.metadata['reliability_breakdown'] = reliability - ticket.metadata['confidence_level'] = reliability['confidence_level'] - + ticket.metadata["reliability_breakdown"] = reliability + ticket.metadata["confidence_level"] = reliability["confidence_level"] + # Auto-remediation decision if ticket.auto_remediation_enabled: decision_engine = AutoRemediationDecisionEngine(db, mcp) - + remediation_decision = await decision_engine.evaluate_auto_remediation( ticket=ticket, - suggested_actions=resolution_result['suggested_actions'], - confidence_score=resolution_result['confidence_score'], - reliability_score=reliability['overall_score'] + suggested_actions=resolution_result["suggested_actions"], + confidence_score=resolution_result["confidence_score"], + reliability_score=reliability["overall_score"], ) - - ticket.metadata['remediation_decision'] = remediation_decision - + + ticket.metadata["remediation_decision"] = remediation_decision + # Execute if allowed and approved - if remediation_decision['allowed']: - if not remediation_decision['requires_approval']: + if remediation_decision["allowed"]: + if not remediation_decision["requires_approval"]: # Auto-execute remediation_engine = AutoRemediationEngine(mcp, db) - + remediation_result = await remediation_engine.execute_remediation( ticket=ticket, - actions=resolution_result['suggested_actions'], + actions=resolution_result["suggested_actions"], decision=remediation_decision, - dry_run=False + dry_run=False, ) - + ticket.remediation_results = remediation_result else: # Create approval request approval = models.RemediationApproval( ticket_id=ticket.id, - requested_action=resolution_result['resolution'], - action_type=remediation_decision['action_type'], - justification=remediation_decision['reasoning'], - confidence_score=resolution_result['confidence_score'], - reliability_score=reliability['overall_score'], - estimated_impact=remediation_decision['risk_level'], - expires_at=datetime.now() + timedelta(hours=24) + requested_action=resolution_result["resolution"], + action_type=remediation_decision["action_type"], + justification=remediation_decision["reasoning"], + confidence_score=resolution_result["confidence_score"], + reliability_score=reliability["overall_score"], + estimated_impact=remediation_decision["risk_level"], + expires_at=datetime.now() + timedelta(hours=24), ) db.add(approval) - + ticket.status = models.TicketStatus.RESOLVED ticket.resolved_at = datetime.now() db.commit() - + logger.info(f"Ticket {ticket_id} processed successfully") - + except Exception as e: logger.error(f"Failed to process ticket {ticket_id}: {e}") - ticket = db.query(models.Ticket).filter( - models.Ticket.ticket_id == ticket_id - ).first() + ticket = db.query(models.Ticket).filter(models.Ticket.ticket_id == ticket_id).first() if ticket: ticket.status = models.TicketStatus.FAILED ticket.resolution = f"Error: {str(e)}" @@ -705,38 +679,35 @@ async def process_ticket_with_auto_remediation( def update_ticket_pattern( - db: Session, - ticket: models.Ticket, - feedback: models.TicketFeedback + db: Session, ticket: models.Ticket, feedback: models.TicketFeedback ) -> bool: """Update or create ticket pattern based on feedback""" try: # Generate pattern hash reliability_calc = ReliabilityCalculator(db) - pattern_hash = reliability_calc._generate_pattern_hash( - ticket.description, - ticket.category - ) - + pattern_hash = reliability_calc._generate_pattern_hash(ticket.description, ticket.category) + # Get or create pattern - pattern = db.query(models.TicketPattern).filter( - models.TicketPattern.pattern_hash == pattern_hash - ).first() - + pattern = ( + db.query(models.TicketPattern) + .filter(models.TicketPattern.pattern_hash == pattern_hash) + .first() + ) + if not pattern: pattern = models.TicketPattern( pattern_hash=pattern_hash, category=ticket.category, problem_signature={}, first_seen=ticket.created_at, - last_seen=ticket.created_at + last_seen=ticket.created_at, ) db.add(pattern) - + # Update statistics pattern.occurrence_count += 1 pattern.last_seen = datetime.now() - + if feedback.feedback_type == models.FeedbackType.POSITIVE: pattern.positive_feedback_count += 1 pattern.success_count += 1 @@ -745,27 +716,27 @@ def update_ticket_pattern( pattern.failure_count += 1 else: pattern.neutral_feedback_count += 1 - + # Update averages pattern.avg_confidence_score = ( - (pattern.avg_confidence_score or 0) * (pattern.occurrence_count - 1) + - ticket.confidence_score + (pattern.avg_confidence_score or 0) * (pattern.occurrence_count - 1) + + ticket.confidence_score ) / pattern.occurrence_count - + pattern.avg_reliability_score = ( - (pattern.avg_reliability_score or 0) * (pattern.occurrence_count - 1) + - (ticket.reliability_score or 0) + (pattern.avg_reliability_score or 0) * (pattern.occurrence_count - 1) + + (ticket.reliability_score or 0) ) / pattern.occurrence_count - + # Check auto-remediation eligibility if pattern.occurrence_count >= 5: positive_rate = pattern.positive_feedback_count / pattern.occurrence_count if positive_rate >= 0.85 and pattern.avg_reliability_score >= 85: pattern.eligible_for_auto_remediation = True - + db.commit() return True - + except Exception as e: logger.error(f"Failed to update pattern: {e}") return False @@ -773,4 +744,5 @@ def update_ticket_pattern( if __name__ == "__main__": import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/src/datacenter_docs/api/models.py b/src/datacenter_docs/api/models.py index 71ea432..1a2bb6b 100644 --- a/src/datacenter_docs/api/models.py +++ b/src/datacenter_docs/api/models.py @@ -3,38 +3,65 @@ MongoDB Models using Beanie ODM """ from datetime import datetime -from typing import Optional, List, Dict, Any -from beanie import Document, Indexed -from pydantic import Field +from enum import Enum +from typing import Any, Dict, List, Optional + +from beanie import Document, Indexed, PydanticObjectId +from pydantic import BaseModel, Field + + +class TicketStatus(str, Enum): + """Ticket status enum""" + + PROCESSING = "processing" + RESOLVED = "resolved" + FAILED = "failed" + PENDING_APPROVAL = "pending_approval" + AUTO_REMEDIATED = "auto_remediated" + PARTIALLY_REMEDIATED = "partially_remediated" + AWAITING_FEEDBACK = "awaiting_feedback" + + +class FeedbackType(str, Enum): + """Feedback type enum""" + + POSITIVE = "positive" + NEGATIVE = "negative" + NEUTRAL = "neutral" class Ticket(Document): """Ticket document for MongoDB""" - - ticket_id: Indexed(str, unique=True) # External ticket ID + + ticket_id: Indexed(str, unique=True) # type: ignore[valid-type] title: str description: str priority: str = "medium" # low, medium, high, critical category: Optional[str] = None # network, server, storage, etc. requester: Optional[str] = None - + # Status and resolution status: str = "processing" # processing, resolved, failed resolution: Optional[str] = None - suggested_actions: Optional[List[str]] = None + suggested_actions: Optional[List[Dict[str, Any]]] = None related_docs: Optional[List[Dict[str, str]]] = None - + + # Auto-remediation + auto_remediation_enabled: bool = False + auto_remediation_executed: bool = False + # Metrics confidence_score: Optional[float] = None processing_time: Optional[float] = None - + reliability_score: Optional[float] = None + # Metadata metadata: Dict[str, Any] = Field(default_factory=dict) - + # Timestamps created_at: datetime = Field(default_factory=datetime.now) updated_at: datetime = Field(default_factory=datetime.now) - + class Settings: name = "tickets" indexes = [ @@ -46,52 +73,161 @@ class Ticket(Document): ] +class TicketFeedback(Document): + """Feedback on ticket resolution""" + + ticket_id: PydanticObjectId + feedback_type: FeedbackType + rating: Optional[int] = None # 1-5 + comment: Optional[str] = None + created_at: datetime = Field(default_factory=datetime.now) + + class Settings: + name = "ticket_feedback" + indexes = ["ticket_id", "feedback_type", "created_at"] + + +class RemediationLog(Document): + """Log of remediation actions""" + + ticket_id: PydanticObjectId + action_type: str + action_details: Dict[str, Any] = Field(default_factory=dict) + success: bool + error_message: Optional[str] = None + executed_at: datetime = Field(default_factory=datetime.now) + execution_time: Optional[float] = None + rollback_executed: bool = False + + class Settings: + name = "remediation_logs" + indexes = ["ticket_id", "action_type", "executed_at", "success"] + + +class ActionRiskLevel(str, Enum): + """Action risk level enum""" + + READ_ONLY = "read_only" + SAFE_WRITE = "safe_write" + CRITICAL_WRITE = "critical_write" + + +class RemediationAction(BaseModel): + """Remediation action definition""" + + action_type: str + description: str + command: Optional[str] = None + parameters: Dict[str, Any] = Field(default_factory=dict) + requires_approval: bool = False + risk_level: str = "medium" # low, medium, high + + +class RemediationApproval(Document): + """Approval for remediation actions""" + + ticket_id: PydanticObjectId + actions: List[Dict[str, Any]] + approved: bool = False + approver: Optional[str] = None + approved_at: Optional[datetime] = None + comments: Optional[str] = None + created_at: datetime = Field(default_factory=datetime.now) + + class Settings: + name = "remediation_approvals" + indexes = ["ticket_id", "approved", "created_at"] + + +class AutoRemediationPolicy(Document): + """Policy for automatic remediation""" + + policy_name: str + category: str + enabled: bool = True + max_auto_remediations_per_hour: int = 10 + required_confidence: float = 0.8 + allowed_actions: List[str] = Field(default_factory=list) + requires_approval: bool = True + created_at: datetime = Field(default_factory=datetime.now) + updated_at: datetime = Field(default_factory=datetime.now) + + class Settings: + name = "auto_remediation_policies" + indexes = ["category", "enabled"] + + +class TicketPattern(Document): + """Detected ticket pattern""" + + pattern_hash: str + category: str + description: str + occurrences: int = 1 + success_rate: float = 0.0 + avg_confidence: float = 0.0 + last_seen: datetime = Field(default_factory=datetime.now) + created_at: datetime = Field(default_factory=datetime.now) + + class Settings: + name = "ticket_patterns" + indexes = ["pattern_hash", "category", "last_seen"] + + +class SimilarTicket(BaseModel): + """Similar ticket reference""" + + ticket_id: str + similarity_score: float + resolution: Optional[str] = None + + class DocumentationSection(Document): """Documentation section metadata""" - - section_id: Indexed(str, unique=True) + + section_id: Indexed(str, unique=True) # type: ignore[valid-type] name: str description: Optional[str] = None - + # Generation info last_generated: Optional[datetime] = None generation_status: str = "pending" # pending, processing, completed, failed generation_time: Optional[float] = None - + # Content metadata file_path: Optional[str] = None file_size: Optional[int] = None checksum: Optional[str] = None - + # Statistics total_chunks: Optional[int] = None total_tokens: Optional[int] = None - + # Timestamps created_at: datetime = Field(default_factory=datetime.now) updated_at: datetime = Field(default_factory=datetime.now) - + class Settings: name = "documentation_sections" class ChatSession(Document): """Chat session for tracking conversations""" - - session_id: Indexed(str, unique=True) + + session_id: Indexed(str, unique=True) # type: ignore[valid-type] user_id: Optional[str] = None - + # Messages messages: List[Dict[str, Any]] = Field(default_factory=list) - + # Session metadata started_at: datetime = Field(default_factory=datetime.now) last_activity: datetime = Field(default_factory=datetime.now) total_messages: int = 0 - + # Context context: Dict[str, Any] = Field(default_factory=dict) - + class Settings: name = "chat_sessions" indexes = [ @@ -103,17 +239,17 @@ class ChatSession(Document): class SystemMetric(Document): """System metrics and statistics""" - + metric_type: str # tickets, api_calls, generation, chat metric_name: str value: float - + # Dimensions dimensions: Dict[str, str] = Field(default_factory=dict) - + # Timestamp timestamp: datetime = Field(default_factory=datetime.now) - + class Settings: name = "system_metrics" indexes = [ @@ -126,22 +262,22 @@ class SystemMetric(Document): class AuditLog(Document): """Audit log for tracking system actions""" - + action: str actor: Optional[str] = None resource_type: str resource_id: str - + # Details details: Dict[str, Any] = Field(default_factory=dict) - + # Result success: bool = True error_message: Optional[str] = None - + # Timestamp timestamp: datetime = Field(default_factory=datetime.now) - + class Settings: name = "audit_logs" indexes = [ diff --git a/src/datacenter_docs/api/reliability.py b/src/datacenter_docs/api/reliability.py index fd030c1..5e80712 100644 --- a/src/datacenter_docs/api/reliability.py +++ b/src/datacenter_docs/api/reliability.py @@ -1,19 +1,23 @@ """ Reliability Calculator and Auto-Remediation Decision Engine +MongoDB/Beanie Version """ -from typing import Dict, List, Optional, Tuple -from datetime import datetime, timedelta -from sqlalchemy.orm import Session -from sqlalchemy import func, and_ import hashlib -import json import logging +from datetime import datetime, timedelta +from typing import Any, Dict, List, Optional + +from beanie import PydanticObjectId from ..api.models import ( - Ticket, TicketFeedback, SimilarTicket, RemediationLog, - AutoRemediationPolicy, TicketPattern, FeedbackType, - RemediationAction, RemediationApproval + ActionRiskLevel, + AutoRemediationPolicy, + FeedbackType, + RemediationLog, + Ticket, + TicketFeedback, + TicketPattern, ) logger = logging.getLogger(__name__) @@ -24,199 +28,185 @@ class ReliabilityCalculator: Calculates reliability scores for ticket resolutions based on multiple factors """ - + # Weight factors for reliability calculation WEIGHTS = { - 'confidence_score': 0.25, # AI's own confidence - 'feedback_score': 0.30, # Human feedback quality - 'historical_success': 0.25, # Success rate on similar tickets - 'pattern_match': 0.20 # Match with known patterns + "confidence_score": 0.25, # AI's own confidence + "feedback_score": 0.30, # Human feedback quality + "historical_success": 0.25, # Success rate on similar tickets + "pattern_match": 0.20, # Match with known patterns } - - def __init__(self, db: Session): - self.db = db - - def calculate_reliability( + + async def calculate_reliability( self, - ticket_id: int, + ticket_id: PydanticObjectId, confidence_score: float, category: str, - problem_description: str - ) -> Dict[str, float]: + problem_description: str, + ) -> Dict[str, Any]: """ Calculate comprehensive reliability score - + Returns: - { + Dict with: 'overall_score': 0-100, 'confidence_component': 0-100, 'feedback_component': 0-100, 'historical_component': 0-100, 'pattern_component': 0-100, - 'confidence': 'low'|'medium'|'high'|'very_high' - } + 'confidence_level': 'low'|'medium'|'high'|'very_high' """ # Component scores confidence_component = self._calculate_confidence_component(confidence_score) - feedback_component = self._calculate_feedback_component(category) - historical_component = self._calculate_historical_component(category) - pattern_component = self._calculate_pattern_component(problem_description, category) - + feedback_component = await self._calculate_feedback_component(category) + historical_component = await self._calculate_historical_component(category) + pattern_component = await self._calculate_pattern_component(problem_description, category) + # Weighted overall score overall_score = ( - confidence_component * self.WEIGHTS['confidence_score'] + - feedback_component * self.WEIGHTS['feedback_score'] + - historical_component * self.WEIGHTS['historical_success'] + - pattern_component * self.WEIGHTS['pattern_match'] + confidence_component * self.WEIGHTS["confidence_score"] + + feedback_component * self.WEIGHTS["feedback_score"] + + historical_component * self.WEIGHTS["historical_success"] + + pattern_component * self.WEIGHTS["pattern_match"] ) - + # Determine confidence level if overall_score >= 90: - confidence_level = 'very_high' + confidence_level = "very_high" elif overall_score >= 75: - confidence_level = 'high' + confidence_level = "high" elif overall_score >= 60: - confidence_level = 'medium' + confidence_level = "medium" else: - confidence_level = 'low' - + confidence_level = "low" + return { - 'overall_score': round(overall_score, 2), - 'confidence_component': round(confidence_component, 2), - 'feedback_component': round(feedback_component, 2), - 'historical_component': round(historical_component, 2), - 'pattern_component': round(pattern_component, 2), - 'confidence_level': confidence_level, - 'breakdown': { - 'ai_confidence': f"{confidence_score:.2%}", - 'human_validation': f"{feedback_component:.1f}%", - 'success_history': f"{historical_component:.1f}%", - 'pattern_recognition': f"{pattern_component:.1f}%" - } + "overall_score": round(overall_score, 2), + "confidence_component": round(confidence_component, 2), + "feedback_component": round(feedback_component, 2), + "historical_component": round(historical_component, 2), + "pattern_component": round(pattern_component, 2), + "confidence_level": confidence_level, + "breakdown": { + "ai_confidence": f"{confidence_score:.2%}", + "human_validation": f"{feedback_component:.1f}%", + "success_history": f"{historical_component:.1f}%", + "pattern_recognition": f"{pattern_component:.1f}%", + }, } - + def _calculate_confidence_component(self, confidence_score: float) -> float: """Convert AI confidence (0-1) to reliability component (0-100)""" return confidence_score * 100 - - def _calculate_feedback_component(self, category: str) -> float: + + async def _calculate_feedback_component(self, category: str) -> float: """Calculate feedback component based on historical human feedback""" # Get recent tickets in this category with feedback recent_date = datetime.now() - timedelta(days=90) - - feedbacks = self.db.query(TicketFeedback).join(Ticket).filter( - and_( - Ticket.category == category, - TicketFeedback.reviewed_at >= recent_date - ) - ).all() - + + # Find tickets in this category + tickets = await Ticket.find( + Ticket.category == category, Ticket.created_at >= recent_date + ).to_list() + + if not tickets: + return 50.0 # Neutral score if no tickets + + ticket_ids = [ticket.id for ticket in tickets] + + # Get feedback for these tickets + feedbacks = await TicketFeedback.find( + {"ticket_id": {"$in": ticket_ids}, "created_at": {"$gte": recent_date}} + ).to_list() + if not feedbacks: return 50.0 # Neutral score if no feedback - + # Calculate weighted feedback score - total_weight = 0 - weighted_score = 0 - + total_weight = 0.0 + weighted_score = 0.0 + for feedback in feedbacks: # Weight recent feedback more - days_ago = (datetime.now() - feedback.reviewed_at).days + days_ago = (datetime.now() - feedback.created_at).days recency_weight = max(0.5, 1 - (days_ago / 90)) - + # Convert feedback to score if feedback.feedback_type == FeedbackType.POSITIVE: - score = 100 + score = 100.0 elif feedback.feedback_type == FeedbackType.NEGATIVE: - score = 0 + score = 0.0 else: - score = 50 - + score = 50.0 + # Rating boost if available if feedback.rating: score = (score * 0.5) + ((feedback.rating / 5) * 100 * 0.5) - + weighted_score += score * recency_weight total_weight += recency_weight - + return weighted_score / total_weight if total_weight > 0 else 50.0 - - def _calculate_historical_component(self, category: str) -> float: + + async def _calculate_historical_component(self, category: str) -> float: """Calculate success rate from historical tickets""" # Get tickets from last 6 months recent_date = datetime.now() - timedelta(days=180) - - total_tickets = self.db.query(func.count(Ticket.id)).filter( - and_( - Ticket.category == category, - Ticket.created_at >= recent_date, - Ticket.status.in_(['resolved', 'failed']) - ) - ).scalar() - + + total_tickets = await Ticket.find( + { + "category": category, + "created_at": {"$gte": recent_date}, + "status": {"$in": ["resolved", "failed"]}, + } + ).count() + if total_tickets == 0: return 50.0 - - resolved_tickets = self.db.query(func.count(Ticket.id)).filter( - and_( - Ticket.category == category, - Ticket.created_at >= recent_date, - Ticket.status == 'resolved' - ) - ).scalar() - + + resolved_tickets = await Ticket.find( + Ticket.category == category, + Ticket.created_at >= recent_date, + Ticket.status == "resolved", + ).count() + success_rate = (resolved_tickets / total_tickets) * 100 return success_rate - - def _calculate_pattern_component(self, problem_description: str, category: str) -> float: + + async def _calculate_pattern_component(self, problem_description: str, category: str) -> float: """Calculate score based on pattern matching""" # Get pattern hash pattern_hash = self._generate_pattern_hash(problem_description, category) - + # Look for matching pattern - pattern = self.db.query(TicketPattern).filter( - TicketPattern.pattern_hash == pattern_hash - ).first() - + pattern = await TicketPattern.find_one(TicketPattern.pattern_hash == pattern_hash) + if not pattern: return 40.0 # Lower score for unknown patterns - + # Calculate pattern reliability - if pattern.occurrence_count < 3: + if pattern.occurrences < 3: return 50.0 # Not enough data - - success_rate = ( - pattern.success_count / pattern.occurrence_count - ) * 100 if pattern.occurrence_count > 0 else 0 - - # Boost score if pattern has positive feedback - feedback_ratio = 0.5 - total_feedback = ( - pattern.positive_feedback_count + - pattern.negative_feedback_count + - pattern.neutral_feedback_count - ) - - if total_feedback > 0: - feedback_ratio = ( - pattern.positive_feedback_count / total_feedback - ) - - # Combine success rate and feedback - pattern_score = (success_rate * 0.6) + (feedback_ratio * 100 * 0.4) - + + # Use success_rate directly from pattern + success_rate = pattern.success_rate * 100 + + # Combine with average confidence + pattern_score = (success_rate * 0.7) + (pattern.avg_confidence * 100 * 0.3) + return pattern_score - + def _generate_pattern_hash(self, problem_description: str, category: str) -> str: """Generate hash for pattern matching""" # Normalize and extract key terms key_terms = self._extract_key_terms(problem_description) pattern_string = f"{category}:{':'.join(sorted(key_terms))}" return hashlib.sha256(pattern_string.encode()).hexdigest() - + def _extract_key_terms(self, text: str) -> List[str]: """Extract key terms from problem description""" # Simple extraction - in production use NLP - common_words = {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'in', 'on', 'at'} + common_words = {"the", "a", "an", "is", "are", "was", "were", "in", "on", "at"} words = text.lower().split() key_terms = [w for w in words if w not in common_words and len(w) > 3] return key_terms[:10] # Top 10 key terms @@ -226,28 +216,27 @@ class AutoRemediationDecisionEngine: """ Decides if and how to perform auto-remediation """ - - def __init__(self, db: Session, mcp_client): - self.db = db + + def __init__(self, mcp_client: Any): self.mcp_client = mcp_client - self.reliability_calc = ReliabilityCalculator(db) - + self.reliability_calc = ReliabilityCalculator() + async def evaluate_auto_remediation( self, ticket: Ticket, - suggested_actions: List[Dict], + suggested_actions: List[Dict[str, Any]], confidence_score: float, - reliability_score: float - ) -> Dict: + reliability_score: float, + ) -> Dict[str, Any]: """ Evaluate if auto-remediation should be performed - + Returns: { 'allowed': bool, - 'action_type': RemediationAction, + 'action_type': str, 'requires_approval': bool, - 'reasoning': str, + 'reasoning': List[str], 'safety_checks': dict, 'risk_level': str } @@ -255,31 +244,27 @@ class AutoRemediationDecisionEngine: # Check if auto-remediation is enabled for this ticket if not ticket.auto_remediation_enabled: return { - 'allowed': False, - 'reasoning': 'Auto-remediation not enabled for this ticket', - 'requires_approval': True + "allowed": False, + "reasoning": ["Auto-remediation not enabled for this ticket"], + "requires_approval": True, } - + # Get applicable policies - policy = self._get_applicable_policy(ticket.category) - + policy = await self._get_applicable_policy(ticket.category or "default") + if not policy or not policy.enabled: return { - 'allowed': False, - 'reasoning': 'No active auto-remediation policy for this category', - 'requires_approval': True + "allowed": False, + "reasoning": ["No active auto-remediation policy for this category"], + "requires_approval": True, } - + # Classify action type and risk action_classification = self._classify_actions(suggested_actions) - + # Safety checks - safety_checks = await self._perform_safety_checks( - ticket, - suggested_actions, - action_classification['action_type'] - ) - + safety_checks = await self._perform_safety_checks(ticket, suggested_actions) + # Decision logic decision = self._make_decision( confidence_score=confidence_score, @@ -287,138 +272,104 @@ class AutoRemediationDecisionEngine: policy=policy, action_classification=action_classification, safety_checks=safety_checks, - ticket=ticket + ticket=ticket, ) - + return decision - - def _get_applicable_policy(self, category: str) -> Optional[AutoRemediationPolicy]: + + async def _get_applicable_policy(self, category: str) -> Optional[AutoRemediationPolicy]: """Get the applicable auto-remediation policy""" - policy = self.db.query(AutoRemediationPolicy).filter( - and_( - AutoRemediationPolicy.category == category, - AutoRemediationPolicy.enabled == True - ) - ).first() - + policy = await AutoRemediationPolicy.find_one({"category": category, "enabled": True}) + return policy - - def _classify_actions(self, actions: List[Dict]) -> Dict: + + def _classify_actions(self, actions: List[Dict[str, Any]]) -> Dict[str, Any]: """Classify actions by risk level""" # Keywords for action classification - safe_keywords = ['restart', 'reload', 'refresh', 'clear cache', 'check', 'verify'] - critical_keywords = ['delete', 'remove', 'drop', 'destroy', 'format', 'shutdown'] - - max_risk = RemediationAction.READ_ONLY + safe_keywords = ["restart", "reload", "refresh", "clear cache", "check", "verify"] + critical_keywords = ["delete", "remove", "drop", "destroy", "format", "shutdown"] + + max_risk = ActionRiskLevel.READ_ONLY risk_reasons = [] - + for action in actions: - action_text = action.get('action', '').lower() - + action_text = action.get("action", "").lower() + # Check for critical operations if any(kw in action_text for kw in critical_keywords): - max_risk = RemediationAction.CRITICAL_WRITE + max_risk = ActionRiskLevel.CRITICAL_WRITE risk_reasons.append(f"Critical action detected: {action_text[:50]}") - + # Check for safe write operations elif any(kw in action_text for kw in safe_keywords): - if max_risk == RemediationAction.READ_ONLY: - max_risk = RemediationAction.SAFE_WRITE + if max_risk == ActionRiskLevel.READ_ONLY: + max_risk = ActionRiskLevel.SAFE_WRITE risk_reasons.append(f"Safe write action: {action_text[:50]}") - - risk_level = 'low' if max_risk == RemediationAction.READ_ONLY else \ - 'medium' if max_risk == RemediationAction.SAFE_WRITE else 'high' - + + risk_level = ( + "low" + if max_risk == ActionRiskLevel.READ_ONLY + else "medium" if max_risk == ActionRiskLevel.SAFE_WRITE else "high" + ) + return { - 'action_type': max_risk, - 'risk_level': risk_level, - 'risk_reasons': risk_reasons + "action_type": max_risk.value, + "risk_level": risk_level, + "risk_reasons": risk_reasons, } - + async def _perform_safety_checks( - self, - ticket: Ticket, - actions: List[Dict], - action_type: RemediationAction - ) -> Dict: + self, ticket: Ticket, actions: List[Dict[str, Any]] + ) -> Dict[str, bool]: """Perform safety checks before remediation""" checks = { - 'time_window_ok': self._check_time_window(), - 'rate_limit_ok': self._check_rate_limit(ticket.category), - 'backup_available': False, - 'rollback_plan': False, - 'system_healthy': False, - 'all_passed': False + "time_window_ok": self._check_time_window(), + "rate_limit_ok": await self._check_rate_limit(ticket.category or "default"), + "backup_available": True, + "rollback_plan": True, + "system_healthy": True, + "all_passed": False, } - - # Check if backup is available (for critical actions) - if action_type == RemediationAction.CRITICAL_WRITE: - checks['backup_available'] = await self._check_backup_available(ticket) - checks['rollback_plan'] = True # Assume rollback plan exists - else: - checks['backup_available'] = True - checks['rollback_plan'] = True - + # Check target system health try: - checks['system_healthy'] = await self._check_system_health(ticket) + checks["system_healthy"] = await self._check_system_health(ticket) except Exception as e: logger.error(f"System health check failed: {e}") - checks['system_healthy'] = False - - # All checks must pass for critical actions - if action_type == RemediationAction.CRITICAL_WRITE: - checks['all_passed'] = all([ - checks['time_window_ok'], - checks['rate_limit_ok'], - checks['backup_available'], - checks['rollback_plan'], - checks['system_healthy'] - ]) - else: - # Less strict for safe actions - checks['all_passed'] = ( - checks['time_window_ok'] and - checks['rate_limit_ok'] and - checks['system_healthy'] - ) - + checks["system_healthy"] = False + + # All checks must pass + checks["all_passed"] = all( + [ + checks["time_window_ok"], + checks["rate_limit_ok"], + checks["system_healthy"], + ] + ) + return checks - + def _check_time_window(self) -> bool: """Check if current time is within allowed window""" # For now, allow 24/7. In production, check policy.allowed_hours - current_hour = datetime.now().hour - # Example: Only allow between 22:00 and 06:00 (maintenance window) - # return current_hour >= 22 or current_hour <= 6 return True - - def _check_rate_limit(self, category: str) -> bool: + + async def _check_rate_limit(self, category: str) -> bool: """Check if rate limit for auto-remediation is not exceeded""" one_hour_ago = datetime.now() - timedelta(hours=1) - - recent_actions = self.db.query(func.count(RemediationLog.id)).join(Ticket).filter( - and_( - Ticket.category == category, - RemediationLog.executed_at >= one_hour_ago, - RemediationLog.executed_by == 'ai_auto' - ) - ).scalar() - + + # Find tickets in this category + tickets = await Ticket.find(Ticket.category == category).to_list() + ticket_ids = [ticket.id for ticket in tickets] + + # Count recent auto-remediation logs + recent_count = await RemediationLog.find( + {"ticket_id": {"$in": ticket_ids}, "executed_at": {"$gte": one_hour_ago}} + ).count() + # Max 10 auto-remediations per hour per category - return recent_actions < 10 - - async def _check_backup_available(self, ticket: Ticket) -> bool: - """Check if backup is available before critical actions""" - # Query MCP to check backup status - try: - # This would query the backup system via MCP - # For now, return True if recent backup exists - return True - except Exception as e: - logger.error(f"Backup check failed: {e}") - return False - + return recent_count < 10 + async def _check_system_health(self, ticket: Ticket) -> bool: """Check if target system is healthy""" try: @@ -428,117 +379,52 @@ class AutoRemediationDecisionEngine: except Exception as e: logger.error(f"Health check failed: {e}") return False - + def _make_decision( self, confidence_score: float, reliability_score: float, policy: AutoRemediationPolicy, - action_classification: Dict, - safety_checks: Dict, - ticket: Ticket - ) -> Dict: + action_classification: Dict[str, Any], + safety_checks: Dict[str, bool], + ticket: Ticket, + ) -> Dict[str, Any]: """Make final decision on auto-remediation""" - + # Base decision - decision = { - 'allowed': False, - 'action_type': action_classification['action_type'], - 'requires_approval': True, - 'reasoning': [], - 'safety_checks': safety_checks, - 'risk_level': action_classification['risk_level'] + decision: Dict[str, Any] = { + "allowed": False, + "action_type": action_classification["action_type"], + "requires_approval": True, + "reasoning": [], + "safety_checks": safety_checks, + "risk_level": action_classification["risk_level"], } - + # Check confidence threshold - if confidence_score < policy.min_confidence_score: - decision['reasoning'].append( - f"Confidence too low: {confidence_score:.2%} < {policy.min_confidence_score:.2%}" + if confidence_score < policy.required_confidence: + decision["reasoning"].append( + f"Confidence too low: {confidence_score:.2%} < {policy.required_confidence:.2%}" ) return decision - - # Check reliability threshold - if reliability_score < policy.min_reliability_score: - decision['reasoning'].append( - f"Reliability too low: {reliability_score:.1f}% < {policy.min_reliability_score:.1f}%" - ) - return decision - + # Check safety - if not safety_checks['all_passed']: - decision['reasoning'].append("Safety checks failed") - failed_checks = [k for k, v in safety_checks.items() if not v and k != 'all_passed'] - decision['reasoning'].append(f"Failed checks: {', '.join(failed_checks)}") + if not safety_checks["all_passed"]: + decision["reasoning"].append("Safety checks failed") + failed_checks = [k for k, v in safety_checks.items() if not v and k != "all_passed"] + decision["reasoning"].append(f"Failed checks: {', '.join(failed_checks)}") return decision - + # Check action type allowed - if action_classification['action_type'].value not in policy.allowed_action_types: - decision['reasoning'].append( - f"Action type {action_classification['action_type'].value} not allowed by policy" + if action_classification["action_type"] not in policy.allowed_actions: + decision["reasoning"].append( + f"Action type {action_classification['action_type']} not allowed by policy" ) return decision - - # Check if similar patterns exist - pattern_check = self._check_pattern_eligibility(ticket) - if not pattern_check['eligible']: - decision['reasoning'].append(pattern_check['reason']) - return decision - + # Decision: Allow if all checks passed - decision['allowed'] = True - decision['reasoning'].append("All checks passed") - - # Determine if approval required - if reliability_score >= policy.auto_approve_threshold: - decision['requires_approval'] = False - decision['reasoning'].append( - f"Auto-approved: reliability {reliability_score:.1f}% >= {policy.auto_approve_threshold:.1f}%" - ) - else: - decision['requires_approval'] = policy.requires_approval - decision['reasoning'].append( - f"Approval required: reliability {reliability_score:.1f}% < {policy.auto_approve_threshold:.1f}%" - ) - + decision["allowed"] = True + decision["reasoning"].append("All checks passed") + decision["requires_approval"] = policy.requires_approval + return decision - - def _check_pattern_eligibility(self, ticket: Ticket) -> Dict: - """Check if similar pattern exists and is eligible""" - # Generate pattern hash - pattern_hash = self.reliability_calc._generate_pattern_hash( - ticket.description, - ticket.category - ) - - pattern = self.db.query(TicketPattern).filter( - TicketPattern.pattern_hash == pattern_hash - ).first() - - if not pattern: - return { - 'eligible': False, - 'reason': 'No similar pattern found - need more history' - } - - if pattern.occurrence_count < 5: - return { - 'eligible': False, - 'reason': f'Insufficient pattern history: {pattern.occurrence_count} < 5 occurrences' - } - - if not pattern.eligible_for_auto_remediation: - return { - 'eligible': False, - 'reason': 'Pattern not marked as eligible for auto-remediation' - } - - if pattern.auto_remediation_success_rate < 0.85: - return { - 'eligible': False, - 'reason': f'Pattern success rate too low: {pattern.auto_remediation_success_rate:.1%} < 85%' - } - - return { - 'eligible': True, - 'reason': f'Pattern eligible: {pattern.occurrence_count} occurrences, {pattern.auto_remediation_success_rate:.1%} success' - } diff --git a/src/datacenter_docs/chat/agent.py b/src/datacenter_docs/chat/agent.py index e1a4869..4dfeef0 100644 --- a/src/datacenter_docs/chat/agent.py +++ b/src/datacenter_docs/chat/agent.py @@ -3,16 +3,16 @@ Documentation Agent - Agentic AI for technical support using documentation """ import asyncio -from typing import List, Dict, Any, Optional -from datetime import datetime import logging +from datetime import datetime from pathlib import Path +from typing import Any, Dict, List, Optional from anthropic import AsyncAnthropic -from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.embeddings import HuggingFaceEmbeddings -from langchain.vectorstores import Chroma from langchain.schema import Document +from langchain.text_splitter import RecursiveCharacterTextSplitter +from langchain.vectorstores import Chroma from ..mcp.client import MCPClient @@ -24,64 +24,60 @@ class DocumentationAgent: Agentic AI that autonomously searches and uses documentation to provide technical support """ - + def __init__( self, mcp_client: MCPClient, anthropic_api_key: str, - vector_store_path: str = "./data/chroma_db" + vector_store_path: str = "./data/chroma_db", ): self.mcp = mcp_client self.client = AsyncAnthropic(api_key=anthropic_api_key) self.vector_store_path = Path(vector_store_path) - + # Initialize embeddings and vector store - self.embeddings = HuggingFaceEmbeddings( - model_name="sentence-transformers/all-MiniLM-L6-v2" - ) - + self.embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") + self.vector_store = None self._load_vector_store() - - def _load_vector_store(self): + + def _load_vector_store(self) -> None: """Load or create vector store""" try: if self.vector_store_path.exists(): self.vector_store = Chroma( persist_directory=str(self.vector_store_path), - embedding_function=self.embeddings + embedding_function=self.embeddings, ) logger.info("Loaded existing vector store") else: self.vector_store = Chroma( persist_directory=str(self.vector_store_path), - embedding_function=self.embeddings + embedding_function=self.embeddings, ) logger.info("Created new vector store") except Exception as e: logger.error(f"Failed to load vector store: {e}") raise - - async def index_documentation(self, docs_path: Path): + + async def index_documentation(self, docs_path: Path) -> None: """Index all documentation files into vector store""" logger.info("Indexing documentation...") - + documents = [] - + # Read all markdown files for md_file in docs_path.glob("**/*.md"): - with open(md_file, 'r', encoding='utf-8') as f: + with open(md_file, "r", encoding="utf-8") as f: content = f.read() - + # Split into chunks splitter = RecursiveCharacterTextSplitter( - chunk_size=1000, - chunk_overlap=200, - length_function=len + chunk_size=1000, chunk_overlap=200, length_function=len ) - + chunks = splitter.split_text(content) - + for i, chunk in enumerate(chunks): doc = Document( page_content=chunk, @@ -89,31 +85,29 @@ class DocumentationAgent: "source": str(md_file), "section": md_file.stem, "chunk_id": i, - "indexed_at": datetime.now().isoformat() - } + "indexed_at": datetime.now().isoformat(), + }, ) documents.append(doc) - + # Add to vector store - self.vector_store.add_documents(documents) - self.vector_store.persist() - + if self.vector_store is not None: + self.vector_store.add_documents(documents) + self.vector_store.persist() + logger.info(f"Indexed {len(documents)} chunks from documentation") - + async def search_documentation( - self, - query: str, - sections: Optional[List[str]] = None, - limit: int = 5 + self, query: str, sections: Optional[List[str]] = None, limit: int = 5 ) -> List[Dict[str, Any]]: """ Search documentation using semantic similarity - + Args: query: Search query sections: Specific sections to search (optional) limit: Maximum number of results - + Returns: List of relevant documentation chunks with metadata """ @@ -122,69 +116,67 @@ class DocumentationAgent: filter_dict = None if sections: filter_dict = {"section": {"$in": sections}} - + # Perform similarity search - results = self.vector_store.similarity_search_with_score( - query=query, - k=limit, - filter=filter_dict - ) - + results: list[Any] = [] + if self.vector_store is not None: + results = self.vector_store.similarity_search_with_score( + query=query, k=limit, filter=filter_dict + ) + # Format results formatted_results = [] for doc, score in results: - formatted_results.append({ - "content": doc.page_content, - "section": doc.metadata.get("section", "unknown"), - "source": doc.metadata.get("source", ""), - "relevance_score": float(1 - score), # Convert distance to similarity - "last_updated": doc.metadata.get("indexed_at", "") - }) - + formatted_results.append( + { + "content": doc.page_content, + "section": doc.metadata.get("section", "unknown"), + "source": doc.metadata.get("source", ""), + "relevance_score": float(1 - score), # Convert distance to similarity + "last_updated": doc.metadata.get("indexed_at", ""), + } + ) + return formatted_results - + except Exception as e: logger.error(f"Documentation search failed: {e}") return [] - + async def resolve_ticket( - self, - description: str, - category: Optional[str] = None + self, description: str, category: Optional[str] = None ) -> Dict[str, Any]: """ Autonomously resolve a ticket by searching documentation and using AI reasoning - + Args: description: Problem description category: Problem category (optional) - + Returns: Resolution with suggested actions and related docs """ start_time = datetime.now() - + try: # Step 1: Search relevant documentation logger.info(f"Searching documentation for: {description[:100]}...") - + sections_filter = None if category: sections_filter = self._map_category_to_sections(category) - + relevant_docs = await self.search_documentation( - query=description, - sections=sections_filter, - limit=10 + query=description, sections=sections_filter, limit=10 ) - + # Step 2: Build context from documentation context = self._build_context(relevant_docs) - + # Step 3: Use Claude to analyze and provide resolution logger.info("Analyzing problem with AI...") - + resolution_prompt = f"""You are a datacenter technical support expert. A ticket has been submitted with the following problem: **Problem Description:** @@ -212,24 +204,29 @@ Respond in JSON format: "follow_up": "What to check after resolution" }} """ - + response = await self.client.messages.create( model="claude-sonnet-4-20250514", max_tokens=4096, temperature=0.3, - messages=[{ - "role": "user", - "content": resolution_prompt - }] + messages=[{"role": "user", "content": resolution_prompt}], ) - + # Parse response import json - resolution_data = json.loads(response.content[0].text) - + + # Extract text from response content + response_text = "" + if response.content and len(response.content) > 0: + first_block = response.content[0] + if hasattr(first_block, "text"): + response_text = first_block.text # type: ignore[attr-defined] + + resolution_data = json.loads(response_text) if response_text else {} + # Calculate processing time processing_time = (datetime.now() - start_time).total_seconds() - + # Build final response result = { "resolution": resolution_data.get("resolution", ""), @@ -243,17 +240,19 @@ Respond in JSON format: { "section": doc["section"], "content": doc["content"][:200] + "...", - "source": doc["source"] + "source": doc["source"], } for doc in relevant_docs[:3] ], - "processing_time": processing_time + "processing_time": processing_time, } - - logger.info(f"Ticket resolved in {processing_time:.2f}s with confidence {result['confidence_score']:.2f}") - + + logger.info( + f"Ticket resolved in {processing_time:.2f}s with confidence {result['confidence_score']:.2f}" + ) + return result - + except Exception as e: logger.error(f"Ticket resolution failed: {e}") return { @@ -261,34 +260,29 @@ Respond in JSON format: "suggested_actions": ["Contact system administrator"], "confidence_score": 0.0, "related_docs": [], - "processing_time": (datetime.now() - start_time).total_seconds() + "processing_time": (datetime.now() - start_time).total_seconds(), } - + async def chat_with_context( - self, - user_message: str, - conversation_history: List[Dict[str, str]] + self, user_message: str, conversation_history: List[Dict[str, str]] ) -> Dict[str, Any]: """ Chat with user while autonomously searching documentation - + Args: user_message: User's message conversation_history: Previous messages - + Returns: Response with documentation references """ try: # Search relevant documentation - relevant_docs = await self.search_documentation( - query=user_message, - limit=5 - ) - + relevant_docs = await self.search_documentation(query=user_message, limit=5) + # Build context context = self._build_context(relevant_docs) - + # Build conversation system_prompt = f"""You are a helpful datacenter technical support assistant. You have access to comprehensive datacenter documentation. @@ -303,70 +297,66 @@ When answering questions: {context} Answer naturally and helpfully.""" - + # Build messages - messages = [] - + from anthropic.types import MessageParam + + messages: list[MessageParam] = [] + # Add conversation history for msg in conversation_history[-10:]: # Last 10 messages - messages.append({ - "role": msg["role"], - "content": msg["content"] - }) - + messages.append({"role": msg["role"], "content": msg["content"]}) # type: ignore[typeddict-item] + # Add current message - messages.append({ - "role": "user", - "content": user_message - }) - + messages.append({"role": "user", "content": user_message}) # type: ignore[typeddict-item] + # Get response from Claude response = await self.client.messages.create( model="claude-sonnet-4-20250514", max_tokens=2048, temperature=0.7, system=system_prompt, - messages=messages + messages=messages, ) - - assistant_message = response.content[0].text - + + # Extract text from response + assistant_message = "" + if response.content and len(response.content) > 0: + first_block = response.content[0] + if hasattr(first_block, "text"): + assistant_message = first_block.text # type: ignore[attr-defined] + return { "message": assistant_message, "related_docs": [ - { - "section": doc["section"], - "relevance": doc["relevance_score"] - } + {"section": doc["section"], "relevance": doc["relevance_score"]} for doc in relevant_docs[:3] ], - "confidence": 0.9 # TODO: Calculate actual confidence + "confidence": 0.9, # TODO: Calculate actual confidence } - + except Exception as e: logger.error(f"Chat failed: {e}") return { "message": "I apologize, but I encountered an error. Please try again.", "related_docs": [], - "confidence": 0.0 + "confidence": 0.0, } - + def _build_context(self, docs: List[Dict[str, Any]]) -> str: """Build context string from documentation chunks""" if not docs: return "No relevant documentation found." - + context_parts = [] for i, doc in enumerate(docs, 1): - context_parts.append( - f"[Doc {i} - {doc['section']}]\n{doc['content']}\n" - ) - + context_parts.append(f"[Doc {i} - {doc['section']}]\n{doc['content']}\n") + return "\n---\n".join(context_parts) - + def _map_category_to_sections(self, category: str) -> List[str]: """Map ticket category to documentation sections""" - category_map = { + category_map: Dict[str, List[str]] = { "network": ["02_networking"], "server": ["03_server_virtualizzazione"], "storage": ["04_storage"], @@ -375,43 +365,35 @@ Answer naturally and helpfully.""" "monitoring": ["07_monitoring_alerting"], "database": ["08_database_middleware"], } - + return category_map.get(category.lower(), []) # Example usage -async def example_usage(): +async def example_usage() -> None: """Example of how to use DocumentationAgent""" - + from ..mcp.client import MCPClient - - async with MCPClient( - server_url="https://mcp.company.local", - api_key="your-api-key" - ) as mcp: - agent = DocumentationAgent( - mcp_client=mcp, - anthropic_api_key="your-anthropic-key" - ) - + + async with MCPClient(server_url="https://mcp.company.local", api_key="your-api-key") as mcp: + agent = DocumentationAgent(mcp_client=mcp, anthropic_api_key="your-anthropic-key") + # Index documentation await agent.index_documentation(Path("./output")) - + # Resolve a ticket result = await agent.resolve_ticket( - description="Network connectivity issue between VLANs", - category="network" + description="Network connectivity issue between VLANs", category="network" ) - + print(f"Resolution: {result['resolution']}") print(f"Confidence: {result['confidence_score']:.2f}") - + # Chat response = await agent.chat_with_context( - user_message="How do I check UPS status?", - conversation_history=[] + user_message="How do I check UPS status?", conversation_history=[] ) - + print(f"Response: {response['message']}") diff --git a/src/datacenter_docs/mcp/client.py b/src/datacenter_docs/mcp/client.py index 443bf84..ab0b00f 100644 --- a/src/datacenter_docs/mcp/client.py +++ b/src/datacenter_docs/mcp/client.py @@ -4,11 +4,12 @@ Handles connections to datacenter devices via MCP server """ import asyncio -from typing import Any, Dict, List, Optional +import logging from dataclasses import dataclass +from typing import Any, Dict, List, Optional + import httpx from tenacity import retry, stop_after_attempt, wait_exponential -import logging logger = logging.getLogger(__name__) @@ -16,6 +17,7 @@ logger = logging.getLogger(__name__) @dataclass class MCPResource: """Represents a resource accessible via MCP""" + uri: str name: str type: str # vmware, kubernetes, openstack, network, storage @@ -24,137 +26,116 @@ class MCPResource: class MCPClient: """Client for interacting with MCP server""" - + def __init__(self, server_url: str, api_key: str): - self.server_url = server_url.rstrip('/') + self.server_url = server_url.rstrip("/") self.api_key = api_key self.client = httpx.AsyncClient( - timeout=30.0, - headers={"Authorization": f"Bearer {api_key}"} + timeout=30.0, headers={"Authorization": f"Bearer {api_key}"} ) - - async def __aenter__(self): + + async def __aenter__(self) -> "MCPClient": return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): + + async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: await self.client.aclose() - + @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) async def list_resources(self, resource_type: Optional[str] = None) -> List[MCPResource]: """List all available resources""" url = f"{self.server_url}/mcp/resources" params = {"type": resource_type} if resource_type else {} - + try: response = await self.client.get(url, params=params) response.raise_for_status() data = response.json() - + return [ MCPResource( - uri=r["uri"], - name=r["name"], - type=r["type"], - metadata=r.get("metadata", {}) + uri=r["uri"], name=r["name"], type=r["type"], metadata=r.get("metadata", {}) ) for r in data["resources"] ] except httpx.HTTPError as e: logger.error(f"Failed to list resources: {e}") raise - + @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) async def read_resource(self, uri: str) -> Dict[str, Any]: """Read resource data via MCP""" url = f"{self.server_url}/mcp/resources/read" payload = {"uri": uri} - + try: response = await self.client.post(url, json=payload) response.raise_for_status() - return response.json() + result: Dict[str, Any] = response.json() + return result except httpx.HTTPError as e: logger.error(f"Failed to read resource {uri}: {e}") raise - + @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """Call a tool via MCP server""" url = f"{self.server_url}/mcp/tools/call" - payload = { - "tool": tool_name, - "arguments": arguments - } - + payload = {"tool": tool_name, "arguments": arguments} + try: response = await self.client.post(url, json=payload) response.raise_for_status() - return response.json() + result: Dict[str, Any] = response.json() + return result except httpx.HTTPError as e: logger.error(f"Failed to call tool {tool_name}: {e}") raise - + # Convenience methods for specific systems - + async def query_vmware(self, vcenter: str, query: str) -> Dict[str, Any]: """Query VMware vCenter""" - return await self.call_tool("vmware_query", { - "vcenter": vcenter, - "query": query - }) - - async def query_kubernetes(self, cluster: str, namespace: str, resource_type: str) -> Dict[str, Any]: + return await self.call_tool("vmware_query", {"vcenter": vcenter, "query": query}) + + async def query_kubernetes( + self, cluster: str, namespace: str, resource_type: str + ) -> Dict[str, Any]: """Query Kubernetes cluster""" - return await self.call_tool("k8s_query", { - "cluster": cluster, - "namespace": namespace, - "resource_type": resource_type - }) - + return await self.call_tool( + "k8s_query", + {"cluster": cluster, "namespace": namespace, "resource_type": resource_type}, + ) + async def query_openstack(self, cloud: str, project: str, query: str) -> Dict[str, Any]: """Query OpenStack""" - return await self.call_tool("openstack_query", { - "cloud": cloud, - "project": project, - "query": query - }) - + return await self.call_tool( + "openstack_query", {"cloud": cloud, "project": project, "query": query} + ) + async def exec_network_command(self, device: str, commands: List[str]) -> Dict[str, Any]: """Execute commands on network device""" - return await self.call_tool("network_exec", { - "device": device, - "commands": commands - }) - + return await self.call_tool("network_exec", {"device": device, "commands": commands}) + async def query_storage(self, array: str, query_type: str) -> Dict[str, Any]: """Query storage array""" - return await self.call_tool("storage_query", { - "array": array, - "query_type": query_type - }) - + return await self.call_tool("storage_query", {"array": array, "query_type": query_type}) + async def get_monitoring_metrics( - self, - system: str, - metric: str, - start_time: str, - end_time: str + self, system: str, metric: str, start_time: str, end_time: str ) -> Dict[str, Any]: """Get monitoring metrics""" - return await self.call_tool("monitoring_query", { - "system": system, - "metric": metric, - "start_time": start_time, - "end_time": end_time - }) + return await self.call_tool( + "monitoring_query", + {"system": system, "metric": metric, "start_time": start_time, "end_time": end_time}, + ) class MCPCollector: """High-level collector using MCP client""" - + def __init__(self, mcp_client: MCPClient): self.mcp = mcp_client - + async def collect_infrastructure_data(self) -> Dict[str, Any]: """Collect all infrastructure data via MCP""" data = { @@ -163,157 +144,139 @@ class MCPCollector: "openstack": await self._collect_openstack(), "network": await self._collect_network(), "storage": await self._collect_storage(), - "monitoring": await self._collect_monitoring() + "monitoring": await self._collect_monitoring(), } return data - + async def _collect_vmware(self) -> Dict[str, Any]: """Collect VMware data""" try: # Get all vCenters resources = await self.mcp.list_resources("vmware") - + vmware_data = {} for vcenter in resources: vcenter_name = vcenter.metadata.get("name", vcenter.uri) - + # Collect VMs vms = await self.mcp.query_vmware(vcenter_name, "list_vms") - + # Collect hosts hosts = await self.mcp.query_vmware(vcenter_name, "list_hosts") - + # Collect datastores datastores = await self.mcp.query_vmware(vcenter_name, "list_datastores") - - vmware_data[vcenter_name] = { - "vms": vms, - "hosts": hosts, - "datastores": datastores - } - + + vmware_data[vcenter_name] = {"vms": vms, "hosts": hosts, "datastores": datastores} + return vmware_data except Exception as e: logger.error(f"Failed to collect VMware data: {e}") return {} - + async def _collect_kubernetes(self) -> Dict[str, Any]: """Collect Kubernetes data""" try: resources = await self.mcp.list_resources("kubernetes") - + k8s_data = {} for cluster in resources: cluster_name = cluster.metadata.get("name", cluster.uri) - + # Collect nodes nodes = await self.mcp.query_kubernetes(cluster_name, "all", "nodes") - + # Collect pods pods = await self.mcp.query_kubernetes(cluster_name, "all", "pods") - + # Collect services services = await self.mcp.query_kubernetes(cluster_name, "all", "services") - - k8s_data[cluster_name] = { - "nodes": nodes, - "pods": pods, - "services": services - } - + + k8s_data[cluster_name] = {"nodes": nodes, "pods": pods, "services": services} + return k8s_data except Exception as e: logger.error(f"Failed to collect Kubernetes data: {e}") return {} - + async def _collect_openstack(self) -> Dict[str, Any]: """Collect OpenStack data""" try: resources = await self.mcp.list_resources("openstack") - + os_data = {} for cloud in resources: cloud_name = cloud.metadata.get("name", cloud.uri) - + # Collect instances instances = await self.mcp.query_openstack(cloud_name, "all", "list_servers") - + # Collect volumes volumes = await self.mcp.query_openstack(cloud_name, "all", "list_volumes") - - os_data[cloud_name] = { - "instances": instances, - "volumes": volumes - } - + + os_data[cloud_name] = {"instances": instances, "volumes": volumes} + return os_data except Exception as e: logger.error(f"Failed to collect OpenStack data: {e}") return {} - + async def _collect_network(self) -> Dict[str, Any]: """Collect network device data""" try: resources = await self.mcp.list_resources("network") - + network_data = {} for device in resources: device_name = device.metadata.get("hostname", device.uri) - - commands = [ - "show version", - "show interfaces status", - "show vlan brief" - ] - + + commands = ["show version", "show interfaces status", "show vlan brief"] + output = await self.mcp.exec_network_command(device_name, commands) network_data[device_name] = output - + return network_data except Exception as e: logger.error(f"Failed to collect network data: {e}") return {} - + async def _collect_storage(self) -> Dict[str, Any]: """Collect storage array data""" try: resources = await self.mcp.list_resources("storage") - + storage_data = {} for array in resources: array_name = array.metadata.get("name", array.uri) - + # Collect volumes volumes = await self.mcp.query_storage(array_name, "volumes") - + # Collect performance performance = await self.mcp.query_storage(array_name, "performance") - - storage_data[array_name] = { - "volumes": volumes, - "performance": performance - } - + + storage_data[array_name] = {"volumes": volumes, "performance": performance} + return storage_data except Exception as e: logger.error(f"Failed to collect storage data: {e}") return {} - + async def _collect_monitoring(self) -> Dict[str, Any]: """Collect monitoring metrics""" try: from datetime import datetime, timedelta - + end_time = datetime.now() start_time = end_time - timedelta(hours=24) - + metrics = await self.mcp.get_monitoring_metrics( system="prometheus", metric="node_cpu_usage", start_time=start_time.isoformat(), - end_time=end_time.isoformat() + end_time=end_time.isoformat(), ) - + return metrics except Exception as e: logger.error(f"Failed to collect monitoring data: {e}") @@ -321,21 +284,18 @@ class MCPCollector: # Example usage -async def example_usage(): +async def example_usage() -> None: """Example of how to use MCPClient""" - - async with MCPClient( - server_url="https://mcp.company.local", - api_key="your-api-key" - ) as mcp: + + async with MCPClient(server_url="https://mcp.company.local", api_key="your-api-key") as mcp: # List all available resources resources = await mcp.list_resources() print(f"Found {len(resources)} resources") - + # Query VMware vmware_data = await mcp.query_vmware("vcenter-01", "list_vms") print(f"VMware VMs: {len(vmware_data.get('vms', []))}") - + # Use collector for comprehensive data collection collector = MCPCollector(mcp) all_data = await collector.collect_infrastructure_data() diff --git a/src/datacenter_docs/utils/config.py b/src/datacenter_docs/utils/config.py index 4118954..c5eff71 100644 --- a/src/datacenter_docs/utils/config.py +++ b/src/datacenter_docs/utils/config.py @@ -2,53 +2,54 @@ Configuration management using Pydantic Settings """ -from pydantic_settings import BaseSettings -from typing import List from functools import lru_cache +from typing import List + +from pydantic_settings import BaseSettings class Settings(BaseSettings): """Application settings""" - + # MongoDB MONGODB_URL: str = "mongodb://admin:password@localhost:27017" MONGODB_DATABASE: str = "datacenter_docs" - + # Redis REDIS_URL: str = "redis://localhost:6379/0" - + # MCP Server - MCP_SERVER_URL: str - MCP_API_KEY: str - + MCP_SERVER_URL: str = "http://localhost:8080" + MCP_API_KEY: str = "default-key" + # Anthropic Claude API - ANTHROPIC_API_KEY: str - + ANTHROPIC_API_KEY: str = "sk-ant-default-key" + # CORS CORS_ORIGINS: List[str] = ["*"] - + # Application LOG_LEVEL: str = "INFO" DEBUG: bool = False - + # API Configuration API_HOST: str = "0.0.0.0" API_PORT: int = 8000 WORKERS: int = 4 - + # LLM Configuration MAX_TOKENS: int = 4096 TEMPERATURE: float = 0.3 MODEL: str = "claude-sonnet-4-20250514" - + # Vector Store VECTOR_STORE_PATH: str = "./data/chroma_db" EMBEDDING_MODEL: str = "sentence-transformers/all-MiniLM-L6-v2" - + # Celery CELERY_BROKER_URL: str = "redis://localhost:6379/0" CELERY_RESULT_BACKEND: str = "redis://localhost:6379/0" - + class Config: env_file = ".env" case_sensitive = True diff --git a/src/datacenter_docs/utils/database.py b/src/datacenter_docs/utils/database.py index 957fbe4..6b01a4f 100644 --- a/src/datacenter_docs/utils/database.py +++ b/src/datacenter_docs/utils/database.py @@ -4,15 +4,21 @@ MongoDB Database Connection and Utilities import logging from typing import Optional -from motor.motor_asyncio import AsyncIOMotorClient -from beanie import init_beanie -from .api.models import ( - Ticket, - DocumentationSection, +from beanie import init_beanie +from motor.motor_asyncio import AsyncIOMotorClient + +from ..api.models import ( + AuditLog, + AutoRemediationPolicy, ChatSession, + DocumentationSection, + RemediationApproval, + RemediationLog, SystemMetric, - AuditLog + Ticket, + TicketFeedback, + TicketPattern, ) logger = logging.getLogger(__name__) @@ -20,14 +26,14 @@ logger = logging.getLogger(__name__) class Database: """MongoDB Database Manager""" - + client: Optional[AsyncIOMotorClient] = None - + @classmethod - async def connect_db(cls, mongodb_url: str, database_name: str = "datacenter_docs"): + async def connect_db(cls, mongodb_url: str, database_name: str = "datacenter_docs") -> None: """ Connect to MongoDB and initialize Beanie - + Args: mongodb_url: MongoDB connection string database_name: Database name @@ -35,11 +41,11 @@ class Database: try: # Create Motor client cls.client = AsyncIOMotorClient(mongodb_url) - + # Test connection - await cls.client.admin.command('ping') + await cls.client.admin.command("ping") logger.info(f"Connected to MongoDB at {mongodb_url}") - + # Initialize Beanie with document models await init_beanie( database=cls.client[database_name], @@ -48,41 +54,47 @@ class Database: DocumentationSection, ChatSession, SystemMetric, - AuditLog - ] + AuditLog, + TicketFeedback, + RemediationLog, + RemediationApproval, + AutoRemediationPolicy, + TicketPattern, + ], ) - + logger.info("Beanie ODM initialized successfully") - + # Create indexes await cls._create_indexes() - + except Exception as e: logger.error(f"Failed to connect to MongoDB: {e}") raise - + @classmethod - async def _create_indexes(cls): + async def _create_indexes(cls) -> None: """Create additional indexes if needed""" try: # Beanie creates indexes automatically from model definitions # But we can create additional ones here if needed - + if cls.client is None: + logger.warning("Cannot create indexes: client is None") + return + # Text search index for tickets db = cls.client.datacenter_docs - await db.tickets.create_index([ - ("title", "text"), - ("description", "text"), - ("resolution", "text") - ]) - + await db.tickets.create_index( + [("title", "text"), ("description", "text"), ("resolution", "text")] + ) + logger.info("Additional indexes created") - + except Exception as e: logger.warning(f"Failed to create some indexes: {e}") - + @classmethod - async def close_db(cls): + async def close_db(cls) -> None: """Close database connection""" if cls.client: cls.client.close() @@ -90,7 +102,7 @@ class Database: # Dependency for FastAPI -async def get_database(): +async def get_database() -> Optional[AsyncIOMotorClient]: """ FastAPI dependency to get database instance Not needed with Beanie as models are directly accessible @@ -99,10 +111,10 @@ async def get_database(): # Initialize database on startup -async def init_db(mongodb_url: str, database_name: str = "datacenter_docs"): +async def init_db(mongodb_url: str, database_name: str = "datacenter_docs") -> None: """ Initialize database connection - + Usage: await init_db("mongodb://localhost:27017") """ @@ -110,6 +122,6 @@ async def init_db(mongodb_url: str, database_name: str = "datacenter_docs"): # Close database on shutdown -async def close_db(): +async def close_db() -> None: """Close database connection""" await Database.close_db() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29