Features: - Automated datacenter documentation generation - MCP integration for device connectivity - Auto-remediation engine with safety checks - Multi-factor reliability scoring (0-100%) - Human feedback learning loop - Pattern recognition and continuous improvement - Agentic chat support with AI - API for ticket resolution - Frontend React with Material-UI - CI/CD pipelines (GitLab + Gitea) - Docker & Kubernetes deployment - Complete documentation and guides v2.0 Highlights: - Auto-remediation with write operations (disabled by default) - Reliability calculator with 4-factor scoring - Human feedback system for continuous learning - Pattern-based progressive automation - Approval workflow for critical actions - Full audit trail and rollback capability
664 lines
20 KiB
Markdown
664 lines
20 KiB
Markdown
# Script di Raccolta Dati per Documentazione Datacenter
|
|
|
|
## 1. Script Python Principali
|
|
|
|
### 1.1 Main Orchestrator
|
|
```python
|
|
#!/usr/bin/env python3
|
|
"""
|
|
main.py - Orchestrator principale per generazione documentazione
|
|
"""
|
|
|
|
import sys
|
|
import argparse
|
|
import logging
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# Import moduli custom
|
|
from collectors import (
|
|
InfrastructureCollector,
|
|
NetworkCollector,
|
|
VirtualizationCollector,
|
|
StorageCollector,
|
|
SecurityCollector,
|
|
BackupCollector,
|
|
MonitoringCollector,
|
|
DatabaseCollector,
|
|
ProcedureCollector,
|
|
ImprovementAnalyzer
|
|
)
|
|
|
|
from generators import DocumentationGenerator
|
|
from validators import DocumentValidator
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class DatacenterDocGenerator:
|
|
def __init__(self, config_file='config.yaml'):
|
|
self.config = self.load_config(config_file)
|
|
self.sections = []
|
|
|
|
def load_config(self, config_file):
|
|
"""Load configuration from YAML file"""
|
|
import yaml
|
|
with open(config_file, 'r') as f:
|
|
return yaml.safe_load(f)
|
|
|
|
def collect_data(self, section=None):
|
|
"""Collect data from all sources"""
|
|
collectors = {
|
|
'01': InfrastructureCollector(self.config),
|
|
'02': NetworkCollector(self.config),
|
|
'03': VirtualizationCollector(self.config),
|
|
'04': StorageCollector(self.config),
|
|
'05': SecurityCollector(self.config),
|
|
'06': BackupCollector(self.config),
|
|
'07': MonitoringCollector(self.config),
|
|
'08': DatabaseCollector(self.config),
|
|
'09': ProcedureCollector(self.config),
|
|
}
|
|
|
|
data = {}
|
|
sections_to_process = [section] if section else collectors.keys()
|
|
|
|
for section_id in sections_to_process:
|
|
try:
|
|
logger.info(f"Collecting data for section {section_id}")
|
|
collector = collectors.get(section_id)
|
|
if collector:
|
|
data[section_id] = collector.collect()
|
|
logger.info(f"✓ Section {section_id} data collected")
|
|
except Exception as e:
|
|
logger.error(f"✗ Failed to collect section {section_id}: {e}")
|
|
data[section_id] = None
|
|
|
|
return data
|
|
|
|
def generate_documentation(self, data):
|
|
"""Generate markdown documentation from collected data"""
|
|
generator = DocumentationGenerator(self.config)
|
|
|
|
for section_id, section_data in data.items():
|
|
if section_data:
|
|
try:
|
|
logger.info(f"Generating documentation for section {section_id}")
|
|
output_file = f"output/section_{section_id}.md"
|
|
generator.generate(section_id, section_data, output_file)
|
|
|
|
# Validate generated document
|
|
validator = DocumentValidator()
|
|
if validator.validate(output_file):
|
|
logger.info(f"✓ Section {section_id} generated and validated")
|
|
self.sections.append(section_id)
|
|
else:
|
|
logger.warning(f"⚠ Section {section_id} validation warnings")
|
|
|
|
except Exception as e:
|
|
logger.error(f"✗ Failed to generate section {section_id}: {e}")
|
|
|
|
# Generate improvement section based on all other sections
|
|
if len(self.sections) > 0:
|
|
logger.info("Analyzing for improvements...")
|
|
analyzer = ImprovementAnalyzer(self.config)
|
|
improvements = analyzer.analyze(data)
|
|
generator.generate('10', improvements, "output/section_10.md")
|
|
|
|
def run(self, section=None, dry_run=False):
|
|
"""Main execution flow"""
|
|
logger.info("=" * 60)
|
|
logger.info("Starting Datacenter Documentation Generation")
|
|
logger.info(f"Timestamp: {datetime.now().isoformat()}")
|
|
logger.info("=" * 60)
|
|
|
|
try:
|
|
# Collect data
|
|
data = self.collect_data(section)
|
|
|
|
if dry_run:
|
|
logger.info("DRY RUN - Data collection complete, skipping generation")
|
|
return True
|
|
|
|
# Generate documentation
|
|
self.generate_documentation(data)
|
|
|
|
logger.info("=" * 60)
|
|
logger.info(f"✓ Documentation generation completed successfully")
|
|
logger.info(f"Sections updated: {', '.join(self.sections)}")
|
|
logger.info("=" * 60)
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.exception(f"Fatal error during documentation generation: {e}")
|
|
return False
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='Generate Datacenter Documentation')
|
|
parser.add_argument('--section', help='Generate specific section only (01-10)')
|
|
parser.add_argument('--dry-run', action='store_true', help='Collect data without generating docs')
|
|
parser.add_argument('--config', default='config.yaml', help='Configuration file path')
|
|
parser.add_argument('--debug', action='store_true', help='Enable debug logging')
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.debug:
|
|
logging.getLogger().setLevel(logging.DEBUG)
|
|
|
|
generator = DatacenterDocGenerator(args.config)
|
|
success = generator.run(section=args.section, dry_run=args.dry_run)
|
|
|
|
sys.exit(0 if success else 1)
|
|
|
|
if __name__ == '__main__':
|
|
main()
|
|
```
|
|
|
|
---
|
|
|
|
## 2. Collector Modules
|
|
|
|
### 2.1 Infrastructure Collector
|
|
```python
|
|
#!/usr/bin/env python3
|
|
"""
|
|
collectors/infrastructure.py - Raccolta dati infrastruttura fisica
|
|
"""
|
|
|
|
from dataclasses import dataclass
|
|
from typing import List, Dict
|
|
import requests
|
|
from pysnmp.hlapi import *
|
|
|
|
@dataclass
|
|
class UPSData:
|
|
id: str
|
|
model: str
|
|
power_kva: float
|
|
battery_capacity: float
|
|
autonomy_minutes: int
|
|
status: str
|
|
last_test: str
|
|
|
|
class InfrastructureCollector:
|
|
def __init__(self, config):
|
|
self.config = config
|
|
self.asset_db = self.connect_asset_db()
|
|
|
|
def connect_asset_db(self):
|
|
"""Connect to asset management database"""
|
|
import mysql.connector
|
|
return mysql.connector.connect(
|
|
host=self.config['databases']['asset_db']['host'],
|
|
user=self.config['databases']['asset_db']['user'],
|
|
password=self.config['databases']['asset_db']['password'],
|
|
database=self.config['databases']['asset_db']['database']
|
|
)
|
|
|
|
def collect_ups_data(self) -> List[UPSData]:
|
|
"""Collect UPS data via SNMP"""
|
|
ups_devices = self.config['infrastructure']['ups_devices']
|
|
ups_data = []
|
|
|
|
for ups in ups_devices:
|
|
try:
|
|
# Query UPS via SNMP
|
|
iterator = getCmd(
|
|
SnmpEngine(),
|
|
CommunityData(self.config['snmp']['community']),
|
|
UdpTransportTarget((ups['ip'], 161)),
|
|
ContextData(),
|
|
ObjectType(ObjectIdentity('UPS-MIB', 'upsIdentModel', 0)),
|
|
ObjectType(ObjectIdentity('UPS-MIB', 'upsBatteryStatus', 0)),
|
|
)
|
|
|
|
errorIndication, errorStatus, errorIndex, varBinds = next(iterator)
|
|
|
|
if errorIndication:
|
|
logger.error(f"SNMP error for {ups['id']}: {errorIndication}")
|
|
continue
|
|
|
|
# Parse SNMP response
|
|
model = str(varBinds[0][1])
|
|
status = str(varBinds[1][1])
|
|
|
|
ups_data.append(UPSData(
|
|
id=ups['id'],
|
|
model=model,
|
|
power_kva=ups.get('power_kva', 0),
|
|
battery_capacity=ups.get('battery_capacity', 0),
|
|
autonomy_minutes=ups.get('autonomy_minutes', 0),
|
|
status=status,
|
|
last_test=ups.get('last_test', 'N/A')
|
|
))
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to collect UPS {ups['id']}: {e}")
|
|
|
|
return ups_data
|
|
|
|
def collect_rack_data(self) -> List[Dict]:
|
|
"""Collect rack inventory from asset database"""
|
|
cursor = self.asset_db.cursor(dictionary=True)
|
|
cursor.execute("""
|
|
SELECT
|
|
rack_id,
|
|
location,
|
|
total_units,
|
|
occupied_units,
|
|
max_power_kw
|
|
FROM racks
|
|
ORDER BY location, rack_id
|
|
""")
|
|
return cursor.fetchall()
|
|
|
|
def collect_environmental_sensors(self) -> List[Dict]:
|
|
"""Collect temperature/humidity sensor data"""
|
|
sensors_api = self.config['infrastructure']['sensors_api']
|
|
response = requests.get(
|
|
f"{sensors_api}/api/sensors/current",
|
|
timeout=10
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def collect(self) -> Dict:
|
|
"""Main collection method"""
|
|
return {
|
|
'ups_systems': self.collect_ups_data(),
|
|
'racks': self.collect_rack_data(),
|
|
'environmental': self.collect_environmental_sensors(),
|
|
'cooling': self.collect_cooling_data(),
|
|
'power_distribution': self.collect_pdu_data(),
|
|
'timestamp': datetime.now().isoformat()
|
|
}
|
|
```
|
|
|
|
### 2.2 Network Collector
|
|
```python
|
|
#!/usr/bin/env python3
|
|
"""
|
|
collectors/network.py - Raccolta configurazioni networking
|
|
"""
|
|
|
|
from netmiko import ConnectHandler
|
|
import paramiko
|
|
|
|
class NetworkCollector:
|
|
def __init__(self, config):
|
|
self.config = config
|
|
|
|
def connect_device(self, device_config):
|
|
"""SSH connection to network device"""
|
|
return ConnectHandler(
|
|
device_type=device_config['type'],
|
|
host=device_config['host'],
|
|
username=device_config['username'],
|
|
password=device_config['password'],
|
|
secret=device_config.get('enable_password')
|
|
)
|
|
|
|
def collect_switch_inventory(self) -> List[Dict]:
|
|
"""Collect switch inventory and configuration"""
|
|
switches = []
|
|
|
|
for switch_config in self.config['network']['switches']:
|
|
try:
|
|
connection = self.connect_device(switch_config)
|
|
|
|
# Collect basic info
|
|
version = connection.send_command('show version')
|
|
interfaces = connection.send_command('show interfaces status')
|
|
vlan = connection.send_command('show vlan brief')
|
|
|
|
switches.append({
|
|
'hostname': switch_config['hostname'],
|
|
'version': self.parse_version(version),
|
|
'interfaces': self.parse_interfaces(interfaces),
|
|
'vlans': self.parse_vlans(vlan),
|
|
})
|
|
|
|
connection.disconnect()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to collect {switch_config['hostname']}: {e}")
|
|
|
|
return switches
|
|
|
|
def collect_firewall_rules(self) -> Dict:
|
|
"""Collect firewall configuration"""
|
|
# Implementation depends on firewall vendor
|
|
pass
|
|
|
|
def collect(self) -> Dict:
|
|
"""Main collection method"""
|
|
return {
|
|
'switches': self.collect_switch_inventory(),
|
|
'routers': self.collect_router_data(),
|
|
'firewalls': self.collect_firewall_rules(),
|
|
'vlans': self.collect_vlan_config(),
|
|
'timestamp': datetime.now().isoformat()
|
|
}
|
|
```
|
|
|
|
### 2.3 VMware Collector
|
|
```python
|
|
#!/usr/bin/env python3
|
|
"""
|
|
collectors/virtualization.py - Raccolta dati VMware/Hypervisor
|
|
"""
|
|
|
|
from pyVim.connect import SmartConnect, Disconnect
|
|
from pyVmomi import vim
|
|
import ssl
|
|
|
|
class VirtualizationCollector:
|
|
def __init__(self, config):
|
|
self.config = config
|
|
self.si = self.connect_vcenter()
|
|
|
|
def connect_vcenter(self):
|
|
"""Connect to vCenter"""
|
|
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
|
|
context.verify_mode = ssl.CERT_NONE
|
|
|
|
return SmartConnect(
|
|
host=self.config['vmware']['vcenter_host'],
|
|
user=self.config['vmware']['username'],
|
|
pwd=self.config['vmware']['password'],
|
|
sslContext=context
|
|
)
|
|
|
|
def collect_vm_inventory(self) -> List[Dict]:
|
|
"""Collect all VMs"""
|
|
content = self.si.RetrieveContent()
|
|
container = content.rootFolder
|
|
viewType = [vim.VirtualMachine]
|
|
recursive = True
|
|
|
|
containerView = content.viewManager.CreateContainerView(
|
|
container, viewType, recursive
|
|
)
|
|
|
|
vms = []
|
|
for vm in containerView.view:
|
|
if vm.config:
|
|
vms.append({
|
|
'name': vm.name,
|
|
'power_state': vm.runtime.powerState,
|
|
'vcpu': vm.config.hardware.numCPU,
|
|
'memory_mb': vm.config.hardware.memoryMB,
|
|
'guest_os': vm.config.guestFullName,
|
|
'host': vm.runtime.host.name if vm.runtime.host else 'N/A',
|
|
'storage_gb': sum(d.capacityInBytes for d in vm.config.hardware.device
|
|
if isinstance(d, vim.vm.device.VirtualDisk)) / 1024**3
|
|
})
|
|
|
|
return vms
|
|
|
|
def collect_host_inventory(self) -> List[Dict]:
|
|
"""Collect ESXi hosts"""
|
|
content = self.si.RetrieveContent()
|
|
hosts = []
|
|
|
|
for datacenter in content.rootFolder.childEntity:
|
|
if hasattr(datacenter, 'hostFolder'):
|
|
for cluster in datacenter.hostFolder.childEntity:
|
|
for host in cluster.host:
|
|
hosts.append({
|
|
'name': host.name,
|
|
'cluster': cluster.name,
|
|
'cpu_cores': host.hardware.cpuInfo.numCpuCores,
|
|
'memory_gb': host.hardware.memorySize / 1024**3,
|
|
'cpu_usage': host.summary.quickStats.overallCpuUsage,
|
|
'memory_usage': host.summary.quickStats.overallMemoryUsage,
|
|
'vms_count': len(host.vm),
|
|
'uptime': host.summary.quickStats.uptime,
|
|
})
|
|
|
|
return hosts
|
|
|
|
def collect(self) -> Dict:
|
|
"""Main collection method"""
|
|
data = {
|
|
'vms': self.collect_vm_inventory(),
|
|
'hosts': self.collect_host_inventory(),
|
|
'datastores': self.collect_datastore_info(),
|
|
'clusters': self.collect_cluster_config(),
|
|
'timestamp': datetime.now().isoformat()
|
|
}
|
|
|
|
Disconnect(self.si)
|
|
return data
|
|
```
|
|
|
|
---
|
|
|
|
## 3. Helper Functions
|
|
|
|
### 3.1 SNMP Utilities
|
|
```python
|
|
"""
|
|
utils/snmp_helper.py
|
|
"""
|
|
|
|
from pysnmp.hlapi import *
|
|
|
|
def snmp_get(target, oid, community='public'):
|
|
"""Simple SNMP GET"""
|
|
iterator = getCmd(
|
|
SnmpEngine(),
|
|
CommunityData(community),
|
|
UdpTransportTarget((target, 161)),
|
|
ContextData(),
|
|
ObjectType(ObjectIdentity(oid))
|
|
)
|
|
|
|
errorIndication, errorStatus, errorIndex, varBinds = next(iterator)
|
|
|
|
if errorIndication:
|
|
raise Exception(f"SNMP Error: {errorIndication}")
|
|
|
|
return str(varBinds[0][1])
|
|
|
|
def snmp_walk(target, oid, community='public'):
|
|
"""Simple SNMP WALK"""
|
|
results = []
|
|
|
|
for (errorIndication, errorStatus, errorIndex, varBinds) in nextCmd(
|
|
SnmpEngine(),
|
|
CommunityData(community),
|
|
UdpTransportTarget((target, 161)),
|
|
ContextData(),
|
|
ObjectType(ObjectIdentity(oid)),
|
|
lexicographicMode=False
|
|
):
|
|
if errorIndication:
|
|
break
|
|
|
|
for varBind in varBinds:
|
|
results.append((str(varBind[0]), str(varBind[1])))
|
|
|
|
return results
|
|
```
|
|
|
|
### 3.2 Token Counter
|
|
```python
|
|
"""
|
|
utils/token_counter.py
|
|
"""
|
|
|
|
def count_tokens(text):
|
|
"""
|
|
Stima approssimativa dei token
|
|
1 token ≈ 4 caratteri in inglese
|
|
"""
|
|
return len(text) // 4
|
|
|
|
def count_file_tokens(file_path):
|
|
"""Count tokens in a file"""
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
return count_tokens(content)
|
|
```
|
|
|
|
---
|
|
|
|
## 4. Configuration File Example
|
|
|
|
### 4.1 config.yaml
|
|
```yaml
|
|
# Configuration file for datacenter documentation generator
|
|
|
|
# Database connections
|
|
databases:
|
|
asset_db:
|
|
host: db.company.local
|
|
port: 3306
|
|
user: readonly_user
|
|
password: ${VAULT:asset_db_password}
|
|
database: asset_management
|
|
|
|
# Infrastructure
|
|
infrastructure:
|
|
ups_devices:
|
|
- id: UPS-01
|
|
ip: 10.0.10.10
|
|
power_kva: 100
|
|
- id: UPS-02
|
|
ip: 10.0.10.11
|
|
power_kva: 100
|
|
|
|
sensors_api: http://sensors.company.local
|
|
|
|
# Network devices
|
|
network:
|
|
switches:
|
|
- hostname: core-sw-01
|
|
host: 10.0.10.20
|
|
type: cisco_ios
|
|
username: readonly
|
|
password: ${VAULT:network_password}
|
|
|
|
# VMware
|
|
vmware:
|
|
vcenter_host: vcenter.company.local
|
|
username: automation@vsphere.local
|
|
password: ${VAULT:vmware_password}
|
|
|
|
# SNMP
|
|
snmp:
|
|
community: ${VAULT:snmp_community}
|
|
version: 2c
|
|
|
|
# Output
|
|
output:
|
|
directory: /opt/datacenter-docs/output
|
|
format: markdown
|
|
|
|
# Thresholds
|
|
thresholds:
|
|
cpu_warning: 80
|
|
cpu_critical: 90
|
|
memory_warning: 85
|
|
memory_critical: 95
|
|
```
|
|
|
|
---
|
|
|
|
## 5. Deployment Script
|
|
|
|
### 5.1 deploy.sh
|
|
```bash
|
|
#!/bin/bash
|
|
# Deploy datacenter documentation generator
|
|
|
|
set -e
|
|
|
|
INSTALL_DIR="/opt/datacenter-docs"
|
|
VENV_DIR="$INSTALL_DIR/venv"
|
|
LOG_DIR="/var/log/datacenter-docs"
|
|
|
|
echo "Installing datacenter documentation generator..."
|
|
|
|
# Create directories
|
|
mkdir -p $INSTALL_DIR
|
|
mkdir -p $LOG_DIR
|
|
mkdir -p $INSTALL_DIR/output
|
|
|
|
# Create virtual environment
|
|
python3 -m venv $VENV_DIR
|
|
source $VENV_DIR/bin/activate
|
|
|
|
# Install dependencies
|
|
pip install --upgrade pip
|
|
pip install -r requirements.txt
|
|
|
|
# Copy files
|
|
cp -r collectors $INSTALL_DIR/
|
|
cp -r generators $INSTALL_DIR/
|
|
cp -r validators $INSTALL_DIR/
|
|
cp -r templates $INSTALL_DIR/
|
|
cp main.py $INSTALL_DIR/
|
|
cp config.yaml $INSTALL_DIR/
|
|
|
|
# Set permissions
|
|
chown -R automation:automation $INSTALL_DIR
|
|
chmod +x $INSTALL_DIR/main.py
|
|
|
|
# Install cron job
|
|
cat > /etc/cron.d/datacenter-docs << 'CRON'
|
|
# Datacenter documentation generation
|
|
0 */6 * * * automation /opt/datacenter-docs/venv/bin/python /opt/datacenter-docs/main.py >> /var/log/datacenter-docs/cron.log 2>&1
|
|
CRON
|
|
|
|
echo "✓ Installation complete!"
|
|
echo "Run: cd $INSTALL_DIR && source venv/bin/activate && python main.py --help"
|
|
```
|
|
|
|
---
|
|
|
|
## 6. Testing Framework
|
|
|
|
### 6.1 test_collectors.py
|
|
```python
|
|
#!/usr/bin/env python3
|
|
"""
|
|
tests/test_collectors.py
|
|
"""
|
|
|
|
import unittest
|
|
from unittest.mock import Mock, patch
|
|
from collectors.infrastructure import InfrastructureCollector
|
|
|
|
class TestInfrastructureCollector(unittest.TestCase):
|
|
def setUp(self):
|
|
self.config = {
|
|
'databases': {'asset_db': {...}},
|
|
'snmp': {'community': 'public'}
|
|
}
|
|
self.collector = InfrastructureCollector(self.config)
|
|
|
|
@patch('mysql.connector.connect')
|
|
def test_asset_db_connection(self, mock_connect):
|
|
"""Test database connection"""
|
|
mock_connect.return_value = Mock()
|
|
db = self.collector.connect_asset_db()
|
|
self.assertIsNotNone(db)
|
|
|
|
def test_ups_data_collection(self):
|
|
"""Test UPS data collection"""
|
|
# Mock SNMP responses
|
|
ups_data = self.collector.collect_ups_data()
|
|
self.assertIsInstance(ups_data, list)
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|
|
```
|
|
|
|
---
|
|
|
|
**Documento Versione**: 1.0
|
|
**Per Supporto**: automation-team@company.com
|