Files
llm-automation-docs-and-rem…/requirements/data_collection_scripts.md
LLM Automation System 1ba5ce851d Initial commit: LLM Automation Docs & Remediation Engine v2.0
Features:
- Automated datacenter documentation generation
- MCP integration for device connectivity
- Auto-remediation engine with safety checks
- Multi-factor reliability scoring (0-100%)
- Human feedback learning loop
- Pattern recognition and continuous improvement
- Agentic chat support with AI
- API for ticket resolution
- Frontend React with Material-UI
- CI/CD pipelines (GitLab + Gitea)
- Docker & Kubernetes deployment
- Complete documentation and guides

v2.0 Highlights:
- Auto-remediation with write operations (disabled by default)
- Reliability calculator with 4-factor scoring
- Human feedback system for continuous learning
- Pattern-based progressive automation
- Approval workflow for critical actions
- Full audit trail and rollback capability
2025-10-17 23:47:28 +00:00

664 lines
20 KiB
Markdown

# Script di Raccolta Dati per Documentazione Datacenter
## 1. Script Python Principali
### 1.1 Main Orchestrator
```python
#!/usr/bin/env python3
"""
main.py - Orchestrator principale per generazione documentazione
"""
import sys
import argparse
import logging
from datetime import datetime
from pathlib import Path
# Import moduli custom
from collectors import (
InfrastructureCollector,
NetworkCollector,
VirtualizationCollector,
StorageCollector,
SecurityCollector,
BackupCollector,
MonitoringCollector,
DatabaseCollector,
ProcedureCollector,
ImprovementAnalyzer
)
from generators import DocumentationGenerator
from validators import DocumentValidator
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DatacenterDocGenerator:
    """Orchestrate data collection and Markdown generation for all sections.

    The configuration is loaded once at construction time. ``self.sections``
    accumulates the ids of the sections that were generated *and* passed
    validation.
    """

    def __init__(self, config_file='config.yaml'):
        self.config = self.load_config(config_file)
        self.sections = []

    def load_config(self, config_file):
        """Load configuration from a YAML file and return the parsed object."""
        import yaml
        with open(config_file, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    def _collector_classes(self):
        """Return the mapping of section id -> collector class.

        Classes (not instances) are returned so that a collector is only
        constructed when its section is actually processed.
        """
        return {
            '01': InfrastructureCollector,
            '02': NetworkCollector,
            '03': VirtualizationCollector,
            '04': StorageCollector,
            '05': SecurityCollector,
            '06': BackupCollector,
            '07': MonitoringCollector,
            '08': DatabaseCollector,
            '09': ProcedureCollector,
        }

    def collect_data(self, section=None):
        """Collect data for one section, or for every section when None.

        Args:
            section: Section id such as ``'01'``; a falsy value means "all".

        Returns:
            Dict mapping each processed section id to the collector's result,
            or to ``None`` when constructing or running the collector raised.
            Section ids without a registered collector are skipped.
        """
        registry = self._collector_classes()
        sections_to_process = [section] if section else list(registry)
        data = {}
        for section_id in sections_to_process:
            collector_cls = registry.get(section_id)
            if collector_cls is None:
                logger.warning(f"No collector registered for section {section_id}, skipping")
                continue
            try:
                logger.info(f"Collecting data for section {section_id}")
                # Construct inside the try: some collectors open connections
                # in __init__, and one unreachable backend must not abort
                # the remaining sections.
                data[section_id] = collector_cls(self.config).collect()
                logger.info(f"✓ Section {section_id} data collected")
            except Exception as e:
                logger.error(f"✗ Failed to collect section {section_id}: {e}")
                data[section_id] = None
        return data

    def generate_documentation(self, data):
        """Generate and validate one Markdown file per collected section.

        Sections whose data is falsy are skipped. A failure in one section is
        logged and does not stop the others. When at least one section was
        validated, the improvement section ('10') is generated from ``data``.
        """
        generator = DocumentationGenerator(self.config)
        validator = DocumentValidator()
        # The generator is handed paths under "output/"; make sure it exists.
        Path("output").mkdir(parents=True, exist_ok=True)
        for section_id, section_data in data.items():
            if not section_data:
                continue
            try:
                logger.info(f"Generating documentation for section {section_id}")
                output_file = f"output/section_{section_id}.md"
                generator.generate(section_id, section_data, output_file)
                # Validate generated document
                if validator.validate(output_file):
                    logger.info(f"✓ Section {section_id} generated and validated")
                    self.sections.append(section_id)
                else:
                    logger.warning(f"⚠ Section {section_id} validation warnings")
            except Exception as e:
                logger.error(f"✗ Failed to generate section {section_id}: {e}")
        # Generate improvement section based on all other sections
        if self.sections:
            logger.info("Analyzing for improvements...")
            analyzer = ImprovementAnalyzer(self.config)
            improvements = analyzer.analyze(data)
            generator.generate('10', improvements, "output/section_10.md")

    def run(self, section=None, dry_run=False):
        """Run collection and, unless ``dry_run``, generation.

        Returns:
            True on success. False when an unexpected exception escaped, or
            when generation finished without producing any validated section
            (so the process exit code reflects that nothing was updated).
        """
        banner = "=" * 60
        logger.info(banner)
        logger.info("Starting Datacenter Documentation Generation")
        logger.info(f"Timestamp: {datetime.now().isoformat()}")
        logger.info(banner)
        try:
            # Collect data
            data = self.collect_data(section)
            if dry_run:
                logger.info("DRY RUN - Data collection complete, skipping generation")
                return True
            # Generate documentation
            self.generate_documentation(data)
            if not self.sections:
                logger.error("No section was generated successfully")
                return False
            logger.info(banner)
            logger.info(f"✓ Documentation generation completed successfully")
            logger.info(f"Sections updated: {', '.join(self.sections)}")
            logger.info(banner)
            return True
        except Exception as e:
            logger.exception(f"Fatal error during documentation generation: {e}")
            return False
def main():
    """Parse the command line, run the generator and exit with its status.

    Exits with code 0 when ``DatacenterDocGenerator.run`` returns a truthy
    value and with code 1 otherwise.
    """
    cli = argparse.ArgumentParser(description='Generate Datacenter Documentation')
    cli.add_argument('--section', help='Generate specific section only (01-10)')
    cli.add_argument('--dry-run', action='store_true', help='Collect data without generating docs')
    cli.add_argument('--config', default='config.yaml', help='Configuration file path')
    cli.add_argument('--debug', action='store_true', help='Enable debug logging')
    options = cli.parse_args()

    # --debug raises verbosity on the root logger, so every module is affected.
    if options.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    doc_generator = DatacenterDocGenerator(options.config)
    succeeded = doc_generator.run(section=options.section, dry_run=options.dry_run)
    exit_code = 0 if succeeded else 1
    sys.exit(exit_code)


if __name__ == '__main__':
    main()
```
---
## 2. Collector Modules
### 2.1 Infrastructure Collector
```python
#!/usr/bin/env python3
"""
collectors/infrastructure.py - Raccolta dati infrastruttura fisica
"""
from dataclasses import dataclass
from typing import List, Dict
import requests
from pysnmp.hlapi import *
@dataclass
class UPSData:
    """Record describing one UPS unit as assembled by the collector."""

    # Identifier taken from the device entry in the configuration.
    id: str
    # Model string as reported by the device.
    model: str
    # Rated power; presumably kVA, as the field name suggests.
    power_kva: float
    # Battery capacity (unit not stated in this file).
    battery_capacity: float
    # Autonomy on battery, in minutes per the field name.
    autonomy_minutes: int
    # Battery status as reported by the device, stored as text.
    status: str
    # Last test reference, stored as free text.
    last_test: str
class InfrastructureCollector:
    """Collect physical-infrastructure data (UPS, racks, sensors).

    A connection to the asset database is opened at construction time and
    kept in ``self.asset_db``.
    """

    def __init__(self, config):
        self.config = config
        self.asset_db = self.connect_asset_db()

    def connect_asset_db(self):
        """Connect to the asset management database and return the connection.

        Reads ``host``, ``user``, ``password``, ``database`` and the optional
        ``port`` (default 3306) from ``config['databases']['asset_db']``.
        """
        import mysql.connector
        db_cfg = self.config['databases']['asset_db']
        return mysql.connector.connect(
            host=db_cfg['host'],
            port=db_cfg.get('port', 3306),
            user=db_cfg['user'],
            password=db_cfg['password'],
            database=db_cfg['database'],
        )

    def collect_ups_data(self) -> List["UPSData"]:
        """Collect UPS data via SNMP.

        Each device in ``config['infrastructure']['ups_devices']`` is queried
        for its model and battery status. Devices that fail (transport error,
        agent-reported error, or any exception) are logged and skipped.

        Returns:
            One ``UPSData`` per device that answered successfully.
        """
        import logging
        logger = logging.getLogger(__name__)

        ups_devices = self.config['infrastructure']['ups_devices']
        ups_data = []
        for ups in ups_devices:
            try:
                # Query UPS via SNMP
                iterator = getCmd(
                    SnmpEngine(),
                    CommunityData(self.config['snmp']['community']),
                    UdpTransportTarget((ups['ip'], 161)),
                    ContextData(),
                    ObjectType(ObjectIdentity('UPS-MIB', 'upsIdentModel', 0)),
                    ObjectType(ObjectIdentity('UPS-MIB', 'upsBatteryStatus', 0)),
                )
                errorIndication, errorStatus, errorIndex, varBinds = next(iterator)
                if errorIndication:
                    logger.error(f"SNMP error for {ups['id']}: {errorIndication}")
                    continue
                # errorStatus is set when the agent itself rejects the request
                # (e.g. unknown OID); the var-binds are not usable then.
                if errorStatus:
                    logger.error(
                        f"SNMP error for {ups['id']}: "
                        f"{errorStatus.prettyPrint()} at index {errorIndex}"
                    )
                    continue
                # Parse SNMP response
                model = str(varBinds[0][1])
                status = str(varBinds[1][1])
                ups_data.append(UPSData(
                    id=ups['id'],
                    model=model,
                    power_kva=ups.get('power_kva', 0),
                    battery_capacity=ups.get('battery_capacity', 0),
                    autonomy_minutes=ups.get('autonomy_minutes', 0),
                    status=status,
                    last_test=ups.get('last_test', 'N/A'),
                ))
            except Exception as e:
                logger.error(f"Failed to collect UPS {ups['id']}: {e}")
        return ups_data

    def collect_rack_data(self) -> "List[Dict]":
        """Collect rack inventory from the asset database.

        Returns:
            All rows of the ``racks`` table as dicts, ordered by location and
            rack id.
        """
        cursor = self.asset_db.cursor(dictionary=True)
        try:
            cursor.execute("""
                SELECT
                    rack_id,
                    location,
                    total_units,
                    occupied_units,
                    max_power_kw
                FROM racks
                ORDER BY location, rack_id
            """)
            return cursor.fetchall()
        finally:
            # Release the cursor even when the query fails.
            cursor.close()

    def collect_environmental_sensors(self) -> "List[Dict]":
        """Fetch current sensor readings from the sensors HTTP API.

        Raises:
            requests.HTTPError: when the API answers with an error status.
        """
        sensors_api = self.config['infrastructure']['sensors_api']
        response = requests.get(
            f"{sensors_api}/api/sensors/current",
            timeout=10,
        )
        response.raise_for_status()
        return response.json()

    def collect(self) -> "Dict":
        """Main collection method: gather every data set plus a timestamp."""
        from datetime import datetime

        # NOTE(review): collect_cooling_data and collect_pdu_data are not
        # defined in the visible part of this class - confirm they exist.
        return {
            'ups_systems': self.collect_ups_data(),
            'racks': self.collect_rack_data(),
            'environmental': self.collect_environmental_sensors(),
            'cooling': self.collect_cooling_data(),
            'power_distribution': self.collect_pdu_data(),
            'timestamp': datetime.now().isoformat(),
        }
```
### 2.2 Network Collector
```python
#!/usr/bin/env python3
"""
collectors/network.py - Raccolta configurazioni networking
"""
from netmiko import ConnectHandler
import paramiko
class NetworkCollector:
    """Collect configuration and inventory data from network devices."""

    def __init__(self, config):
        self.config = config

    def connect_device(self, device_config):
        """Open an SSH connection to a network device and return it.

        Args:
            device_config: Dict with ``type``, ``host``, ``username``,
                ``password`` and optionally ``enable_password``.
        """
        return ConnectHandler(
            device_type=device_config['type'],
            host=device_config['host'],
            username=device_config['username'],
            password=device_config['password'],
            secret=device_config.get('enable_password'),
        )

    def collect_switch_inventory(self) -> "List[Dict]":
        """Collect version, interface and VLAN information from each switch.

        Switches that cannot be reached or queried are logged and skipped.
        The SSH session is always closed, including when a command fails.

        Returns:
            One dict per successfully queried switch with the keys
            ``hostname``, ``version``, ``interfaces`` and ``vlans``.
        """
        import logging
        logger = logging.getLogger(__name__)

        switches = []
        for switch_config in self.config['network']['switches']:
            try:
                connection = self.connect_device(switch_config)
                try:
                    # Collect basic info
                    version = connection.send_command('show version')
                    interfaces = connection.send_command('show interfaces status')
                    vlan = connection.send_command('show vlan brief')
                    # NOTE(review): parse_version / parse_interfaces /
                    # parse_vlans are not defined in the visible part of
                    # this class - confirm they exist.
                    switches.append({
                        'hostname': switch_config['hostname'],
                        'version': self.parse_version(version),
                        'interfaces': self.parse_interfaces(interfaces),
                        'vlans': self.parse_vlans(vlan),
                    })
                finally:
                    # Do not leak the session when a command or parser raises.
                    connection.disconnect()
            except Exception as e:
                logger.error(f"Failed to collect {switch_config['hostname']}: {e}")
        return switches

    def collect_firewall_rules(self) -> "Dict":
        """Collect firewall configuration.

        Placeholder: the implementation depends on the firewall vendor.
        Returns an empty dict so the result matches the annotated type.
        """
        return {}

    def collect(self) -> "Dict":
        """Main collection method: gather every data set plus a timestamp."""
        from datetime import datetime

        # NOTE(review): collect_router_data and collect_vlan_config are not
        # defined in the visible part of this class - confirm they exist.
        return {
            'switches': self.collect_switch_inventory(),
            'routers': self.collect_router_data(),
            'firewalls': self.collect_firewall_rules(),
            'vlans': self.collect_vlan_config(),
            'timestamp': datetime.now().isoformat(),
        }
```
### 2.3 VMware Collector
```python
#!/usr/bin/env python3
"""
collectors/virtualization.py - Raccolta dati VMware/Hypervisor
"""
from pyVim.connect import SmartConnect, Disconnect
from pyVmomi import vim
import ssl
class VirtualizationCollector:
    """Collect VM, host, datastore and cluster data from vCenter.

    A vCenter session is opened at construction time (``self.si``) and is
    closed at the end of ``collect()``.
    """

    def __init__(self, config):
        self.config = config
        self.si = self.connect_vcenter()

    def connect_vcenter(self):
        """Connect to vCenter and return the service instance."""
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        # SECURITY NOTE(review): certificate verification is disabled, so the
        # vCenter identity is not checked. Confirm this is acceptable or
        # switch to a verified context with the proper CA bundle.
        context.verify_mode = ssl.CERT_NONE
        return SmartConnect(
            host=self.config['vmware']['vcenter_host'],
            user=self.config['vmware']['username'],
            pwd=self.config['vmware']['password'],
            sslContext=context,
        )

    def collect_vm_inventory(self) -> "List[Dict]":
        """Collect all VMs that expose a configuration.

        Returns:
            One dict per VM with name, power state, vCPU count, memory (MB),
            guest OS, host name (``'N/A'`` when absent) and total virtual
            disk capacity in GiB.
        """
        content = self.si.RetrieveContent()
        container_view = content.viewManager.CreateContainerView(
            content.rootFolder, [vim.VirtualMachine], True
        )
        vms = []
        try:
            for vm in container_view.view:
                if not vm.config:
                    continue
                disk_bytes = sum(
                    d.capacityInBytes
                    for d in vm.config.hardware.device
                    if isinstance(d, vim.vm.device.VirtualDisk)
                )
                vms.append({
                    'name': vm.name,
                    'power_state': vm.runtime.powerState,
                    'vcpu': vm.config.hardware.numCPU,
                    'memory_mb': vm.config.hardware.memoryMB,
                    'guest_os': vm.config.guestFullName,
                    'host': vm.runtime.host.name if vm.runtime.host else 'N/A',
                    'storage_gb': disk_bytes / 1024**3,
                })
        finally:
            # Views are server-side objects; release it once iterated.
            container_view.Destroy()
        return vms

    def collect_host_inventory(self) -> "List[Dict]":
        """Collect ESXi hosts from every datacenter's host folder.

        Root-folder children without a ``hostFolder`` and host-folder children
        without a ``host`` list (for example plain sub-folders) are skipped.
        """
        content = self.si.RetrieveContent()
        hosts = []
        for datacenter in content.rootFolder.childEntity:
            if not hasattr(datacenter, 'hostFolder'):
                continue
            for cluster in datacenter.hostFolder.childEntity:
                # Not every child is a compute resource; folders have no
                # `host` attribute and must not abort the whole inventory.
                for host in getattr(cluster, 'host', None) or []:
                    hosts.append({
                        'name': host.name,
                        'cluster': cluster.name,
                        'cpu_cores': host.hardware.cpuInfo.numCpuCores,
                        'memory_gb': host.hardware.memorySize / 1024**3,
                        'cpu_usage': host.summary.quickStats.overallCpuUsage,
                        'memory_usage': host.summary.quickStats.overallMemoryUsage,
                        'vms_count': len(host.vm),
                        'uptime': host.summary.quickStats.uptime,
                    })
        return hosts

    def collect(self) -> "Dict":
        """Main collection method: gather every data set plus a timestamp.

        The vCenter session is disconnected afterwards, also on failure.
        """
        from datetime import datetime

        try:
            # NOTE(review): collect_datastore_info and collect_cluster_config
            # are not defined in the visible part of this class - confirm
            # they exist.
            return {
                'vms': self.collect_vm_inventory(),
                'hosts': self.collect_host_inventory(),
                'datastores': self.collect_datastore_info(),
                'clusters': self.collect_cluster_config(),
                'timestamp': datetime.now().isoformat(),
            }
        finally:
            Disconnect(self.si)
```
---
## 3. Helper Functions
### 3.1 SNMP Utilities
```python
"""
utils/snmp_helper.py
"""
from pysnmp.hlapi import *
def snmp_get(target, oid, community='public'):
    """Perform a single SNMP GET and return the value as a string.

    Args:
        target: Host name or IP address of the agent (UDP port 161 is used).
        oid: Object identifier to read.
        community: SNMP community string.

    Raises:
        Exception: on a transport/engine error (``errorIndication``) or when
            the agent reports an error for the request (``errorStatus``).
    """
    iterator = getCmd(
        SnmpEngine(),
        CommunityData(community),
        UdpTransportTarget((target, 161)),
        ContextData(),
        ObjectType(ObjectIdentity(oid)),
    )
    errorIndication, errorStatus, errorIndex, varBinds = next(iterator)
    if errorIndication:
        raise Exception(f"SNMP Error: {errorIndication}")
    # The agent answered but rejected the request; the var-binds carry no
    # usable value in that case.
    if errorStatus:
        raise Exception(
            f"SNMP Error: {errorStatus.prettyPrint()} at index {errorIndex}"
        )
    return str(varBinds[0][1])
def snmp_walk(target, oid, community='public'):
    """Walk the subtree under ``oid`` and return ``(oid, value)`` string pairs.

    The walk is best-effort: on a transport error or an agent-reported error
    it stops, logs a warning and returns whatever was collected so far.

    Args:
        target: Host name or IP address of the agent (UDP port 161 is used).
        oid: Root object identifier of the subtree to walk.
        community: SNMP community string.
    """
    import logging
    log = logging.getLogger(__name__)

    results = []
    for (errorIndication, errorStatus, errorIndex, varBinds) in nextCmd(
        SnmpEngine(),
        CommunityData(community),
        UdpTransportTarget((target, 161)),
        ContextData(),
        ObjectType(ObjectIdentity(oid)),
        lexicographicMode=False,  # stay inside the requested subtree
    ):
        if errorIndication or errorStatus:
            # Keep the partial result, but make the truncation visible.
            problem = errorIndication or errorStatus.prettyPrint()
            log.warning(f"SNMP walk of {oid} on {target} stopped early: {problem}")
            break
        for varBind in varBinds:
            results.append((str(varBind[0]), str(varBind[1])))
    return results
```
### 3.2 Token Counter
```python
"""
utils/token_counter.py
"""
def count_tokens(text):
    """
    Rough token estimate for a piece of text.

    Uses the rule of thumb that 1 token is about 4 characters of English
    text; the result is rounded down.
    """
    chars_per_token = 4
    char_total = len(text)
    return char_total // chars_per_token
def count_file_tokens(file_path):
    """Return the estimated token count of a UTF-8 text file.

    The whole file is read into memory and passed to ``count_tokens``.
    """
    with open(file_path, mode='r', encoding='utf-8') as handle:
        return count_tokens(handle.read())
```
---
## 4. Configuration File Example
### 4.1 config.yaml
```yaml
# Configuration file for datacenter documentation generator
# NOTE(review): the ${VAULT:...} values are placeholders. The loader shown in
# this document only calls yaml.safe_load, which does not resolve them -
# confirm where the substitution happens.

# Database connections
databases:
  asset_db:
    host: db.company.local
    port: 3306
    user: readonly_user
    password: ${VAULT:asset_db_password}
    database: asset_management

# Infrastructure
infrastructure:
  ups_devices:
    - id: UPS-01
      ip: 10.0.10.10
      power_kva: 100
    - id: UPS-02
      ip: 10.0.10.11
      power_kva: 100
  sensors_api: http://sensors.company.local

# Network devices
network:
  switches:
    - hostname: core-sw-01
      host: 10.0.10.20
      type: cisco_ios
      username: readonly
      password: ${VAULT:network_password}

# VMware
vmware:
  vcenter_host: vcenter.company.local
  username: automation@vsphere.local
  password: ${VAULT:vmware_password}

# SNMP
snmp:
  community: ${VAULT:snmp_community}
  version: 2c

# Output
output:
  directory: /opt/datacenter-docs/output
  format: markdown

# Thresholds
thresholds:
  cpu_warning: 80
  cpu_critical: 90
  memory_warning: 85
  memory_critical: 95
```
---
## 5. Deployment Script
### 5.1 deploy.sh
```bash
#!/bin/bash
# Deploy datacenter documentation generator.
# Run as root from the project checkout (the directory that contains
# main.py, config.yaml, requirements.txt and the package directories).
set -euo pipefail

INSTALL_DIR="/opt/datacenter-docs"
VENV_DIR="$INSTALL_DIR/venv"
LOG_DIR="/var/log/datacenter-docs"
SERVICE_USER="automation"
CRON_FILE="/etc/cron.d/datacenter-docs"

# Writing to /opt, /var/log and /etc/cron.d requires root.
if [ "$(id -u)" -ne 0 ]; then
    echo "Error: this script must be run as root" >&2
    exit 1
fi

# chown and the cron entry below both need this account to exist.
if ! id "$SERVICE_USER" >/dev/null 2>&1; then
    echo "Error: user '$SERVICE_USER' does not exist" >&2
    exit 1
fi

echo "Installing datacenter documentation generator..."

# Create directories
mkdir -p "$INSTALL_DIR" "$LOG_DIR" "$INSTALL_DIR/output"

# Create virtual environment
python3 -m venv "$VENV_DIR"

# Install dependencies (call the venv's pip directly instead of sourcing
# the activate script, which is not guaranteed to be safe under `set -u`)
"$VENV_DIR/bin/pip" install --upgrade pip
"$VENV_DIR/bin/pip" install -r requirements.txt

# Copy files
cp -r collectors "$INSTALL_DIR/"
cp -r generators "$INSTALL_DIR/"
cp -r validators "$INSTALL_DIR/"
cp -r templates "$INSTALL_DIR/"
cp main.py "$INSTALL_DIR/"
cp config.yaml "$INSTALL_DIR/"

# Set permissions
# The log directory must be writable by the cron user, otherwise the
# output redirection in the cron entry fails.
chown -R "$SERVICE_USER:$SERVICE_USER" "$INSTALL_DIR" "$LOG_DIR"
chmod +x "$INSTALL_DIR/main.py"
# The configuration holds connection settings; keep it away from other users.
chmod 640 "$INSTALL_DIR/config.yaml"

# Install cron job
# main.py resolves config.yaml and output/ relative to the working
# directory, so the job changes into the install directory first.
cat > "$CRON_FILE" << CRON
# Datacenter documentation generation
0 */6 * * * $SERVICE_USER cd $INSTALL_DIR && $VENV_DIR/bin/python $INSTALL_DIR/main.py >> $LOG_DIR/cron.log 2>&1
CRON
chmod 644 "$CRON_FILE"

echo "✓ Installation complete!"
echo "Run: cd $INSTALL_DIR && source venv/bin/activate && python main.py --help"
```
---
## 6. Testing Framework
### 6.1 test_collectors.py
```python
#!/usr/bin/env python3
"""
tests/test_collectors.py
"""
import unittest
from unittest.mock import Mock, patch
from collectors.infrastructure import InfrastructureCollector
class TestInfrastructureCollector(unittest.TestCase):
    """Unit tests for InfrastructureCollector that need no real backend."""

    def setUp(self):
        self.config = {
            'databases': {
                'asset_db': {
                    'host': 'localhost',
                    'user': 'test',
                    'password': 'test',
                    'database': 'asset_management',
                }
            },
            'snmp': {'community': 'public'},
            'infrastructure': {
                'ups_devices': [
                    {'id': 'UPS-01', 'ip': '127.0.0.1', 'power_kva': 100},
                ],
                'sensors_api': 'http://localhost',
            },
        }
        # The collector connects to the database in __init__, so the patch
        # has to be active before the instance is created.
        patcher = patch('mysql.connector.connect')
        self.mock_connect = patcher.start()
        self.addCleanup(patcher.stop)
        self.mock_connect.return_value = Mock()
        self.collector = InfrastructureCollector(self.config)

    def test_asset_db_connection(self):
        """Test database connection"""
        db = self.collector.connect_asset_db()
        self.assertIsNotNone(db)
        self.assertTrue(self.mock_connect.called)

    def test_ups_data_collection(self):
        """Test UPS data collection"""
        # Mock SNMP responses: one reply with no error and two var-binds
        # (model, battery status), shaped as (oid, value) pairs.
        snmp_reply = (None, 0, 0, [('model-oid', 'Model X'), ('status-oid', '2')])
        with patch('collectors.infrastructure.getCmd',
                   return_value=iter([snmp_reply])):
            ups_data = self.collector.collect_ups_data()
        self.assertIsInstance(ups_data, list)
        self.assertEqual(len(ups_data), 1)
        self.assertEqual(ups_data[0].id, 'UPS-01')
        self.assertEqual(ups_data[0].model, 'Model X')
        self.assertEqual(ups_data[0].status, '2')
        self.assertEqual(ups_data[0].power_kva, 100)


if __name__ == '__main__':
    unittest.main()
```
---
**Versione Documento**: 1.0
**Per Supporto**: automation-team@company.com