Data Collection Scripts for Datacenter Documentation

1. Main Python Scripts

1.1 Main Orchestrator

#!/usr/bin/env python3
"""
main.py - Main orchestrator for documentation generation
"""

import sys
import argparse
import logging
from datetime import datetime
from pathlib import Path

# Import custom modules
from collectors import (
    InfrastructureCollector,
    NetworkCollector,
    VirtualizationCollector,
    StorageCollector,
    SecurityCollector,
    BackupCollector,
    MonitoringCollector,
    DatabaseCollector,
    ProcedureCollector,
    ImprovementAnalyzer
)

from generators import DocumentationGenerator
from validators import DocumentValidator

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DatacenterDocGenerator:
    def __init__(self, config_file='config.yaml'):
        self.config = self.load_config(config_file)
        self.sections = []
        
    def load_config(self, config_file):
        """Load configuration from YAML file"""
        import yaml
        with open(config_file, 'r') as f:
            return yaml.safe_load(f)
    
    def collect_data(self, section=None):
        """Collect data from all sources"""
        collectors = {
            '01': InfrastructureCollector(self.config),
            '02': NetworkCollector(self.config),
            '03': VirtualizationCollector(self.config),
            '04': StorageCollector(self.config),
            '05': SecurityCollector(self.config),
            '06': BackupCollector(self.config),
            '07': MonitoringCollector(self.config),
            '08': DatabaseCollector(self.config),
            '09': ProcedureCollector(self.config),
        }
        
        data = {}
        sections_to_process = [section] if section else collectors.keys()
        
        for section_id in sections_to_process:
            try:
                logger.info(f"Collecting data for section {section_id}")
                collector = collectors.get(section_id)
                if collector:
                    data[section_id] = collector.collect()
                    logger.info(f"✓ Section {section_id} data collected")
            except Exception as e:
                logger.error(f"✗ Failed to collect section {section_id}: {e}")
                data[section_id] = None
                
        return data
    
    def generate_documentation(self, data):
        """Generate markdown documentation from collected data"""
        generator = DocumentationGenerator(self.config)
        
        for section_id, section_data in data.items():
            if section_data:
                try:
                    logger.info(f"Generating documentation for section {section_id}")
                    output_file = f"output/section_{section_id}.md"
                    generator.generate(section_id, section_data, output_file)
                    
                    # Validate generated document
                    validator = DocumentValidator()
                    if validator.validate(output_file):
                        logger.info(f"✓ Section {section_id} generated and validated")
                        self.sections.append(section_id)
                    else:
                        logger.warning(f"⚠ Section {section_id} validation warnings")
                        
                except Exception as e:
                    logger.error(f"✗ Failed to generate section {section_id}: {e}")
        
        # Generate improvement section based on all other sections
        if len(self.sections) > 0:
            logger.info("Analyzing for improvements...")
            analyzer = ImprovementAnalyzer(self.config)
            improvements = analyzer.analyze(data)
            generator.generate('10', improvements, "output/section_10.md")
    
    def run(self, section=None, dry_run=False):
        """Main execution flow"""
        logger.info("=" * 60)
        logger.info("Starting Datacenter Documentation Generation")
        logger.info(f"Timestamp: {datetime.now().isoformat()}")
        logger.info("=" * 60)
        
        try:
            # Collect data
            data = self.collect_data(section)
            
            if dry_run:
                logger.info("DRY RUN - Data collection complete, skipping generation")
                return True
            
            # Generate documentation
            self.generate_documentation(data)
            
            logger.info("=" * 60)
            logger.info(f"✓ Documentation generation completed successfully")
            logger.info(f"Sections updated: {', '.join(self.sections)}")
            logger.info("=" * 60)
            
            return True
            
        except Exception as e:
            logger.exception(f"Fatal error during documentation generation: {e}")
            return False

def main():
    parser = argparse.ArgumentParser(description='Generate Datacenter Documentation')
    parser.add_argument('--section', help='Generate specific section only (01-10)')
    parser.add_argument('--dry-run', action='store_true', help='Collect data without generating docs')
    parser.add_argument('--config', default='config.yaml', help='Configuration file path')
    parser.add_argument('--debug', action='store_true', help='Enable debug logging')
    
    args = parser.parse_args()
    
    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)
    
    generator = DatacenterDocGenerator(args.config)
    success = generator.run(section=args.section, dry_run=args.dry_run)
    
    sys.exit(0 if success else 1)

if __name__ == '__main__':
    main()
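
main.py imports every collector from a single collectors package. A minimal sketch of the collectors/__init__.py that this import block assumes is shown below (the individual module file names are an assumption; adjust them to the actual layout):

"""
collectors/__init__.py - Re-export the collector classes used by main.py
(module names below are an assumption; adjust to the real file layout)
"""

from .infrastructure import InfrastructureCollector
from .network import NetworkCollector
from .virtualization import VirtualizationCollector
from .storage import StorageCollector
from .security import SecurityCollector
from .backup import BackupCollector
from .monitoring import MonitoringCollector
from .database import DatabaseCollector
from .procedures import ProcedureCollector
from .improvements import ImprovementAnalyzer

__all__ = [
    'InfrastructureCollector', 'NetworkCollector', 'VirtualizationCollector',
    'StorageCollector', 'SecurityCollector', 'BackupCollector',
    'MonitoringCollector', 'DatabaseCollector', 'ProcedureCollector',
    'ImprovementAnalyzer',
]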

2. Collector Modules

2.1 Infrastructure Collector

#!/usr/bin/env python3
"""
collectors/infrastructure.py - Raccolta dati infrastruttura fisica
"""

from dataclasses import dataclass
from typing import List, Dict
import requests
from pysnmp.hlapi import *

@dataclass
class UPSData:
    id: str
    model: str
    power_kva: float
    battery_capacity: float
    autonomy_minutes: int
    status: str
    last_test: str

class InfrastructureCollector:
    def __init__(self, config):
        self.config = config
        self.asset_db = self.connect_asset_db()
    
    def connect_asset_db(self):
        """Connect to asset management database"""
        import mysql.connector
        return mysql.connector.connect(
            host=self.config['databases']['asset_db']['host'],
            user=self.config['databases']['asset_db']['user'],
            password=self.config['databases']['asset_db']['password'],
            database=self.config['databases']['asset_db']['database']
        )
    
    def collect_ups_data(self) -> List[UPSData]:
        """Collect UPS data via SNMP"""
        ups_devices = self.config['infrastructure']['ups_devices']
        ups_data = []
        
        for ups in ups_devices:
            try:
                # Query UPS via SNMP
                iterator = getCmd(
                    SnmpEngine(),
                    CommunityData(self.config['snmp']['community']),
                    UdpTransportTarget((ups['ip'], 161)),
                    ContextData(),
                    ObjectType(ObjectIdentity('UPS-MIB', 'upsIdentModel', 0)),
                    ObjectType(ObjectIdentity('UPS-MIB', 'upsBatteryStatus', 0)),
                )
                
                errorIndication, errorStatus, errorIndex, varBinds = next(iterator)
                
                if errorIndication:
                    logger.error(f"SNMP error for {ups['id']}: {errorIndication}")
                    continue
                
                # Parse SNMP response
                model = str(varBinds[0][1])
                status = str(varBinds[1][1])
                
                ups_data.append(UPSData(
                    id=ups['id'],
                    model=model,
                    power_kva=ups.get('power_kva', 0),
                    battery_capacity=ups.get('battery_capacity', 0),
                    autonomy_minutes=ups.get('autonomy_minutes', 0),
                    status=status,
                    last_test=ups.get('last_test', 'N/A')
                ))
                
            except Exception as e:
                logger.error(f"Failed to collect UPS {ups['id']}: {e}")
                
        return ups_data
    
    def collect_rack_data(self) -> List[Dict]:
        """Collect rack inventory from asset database"""
        cursor = self.asset_db.cursor(dictionary=True)
        cursor.execute("""
            SELECT 
                rack_id,
                location,
                total_units,
                occupied_units,
                max_power_kw
            FROM racks
            ORDER BY location, rack_id
        """)
        return cursor.fetchall()
    
    def collect_environmental_sensors(self) -> List[Dict]:
        """Collect temperature/humidity sensor data"""
        sensors_api = self.config['infrastructure']['sensors_api']
        response = requests.get(
            f"{sensors_api}/api/sensors/current",
            timeout=10
        )
        response.raise_for_status()
        return response.json()
    
    def collect(self) -> Dict:
        """Main collection method"""
        return {
            'ups_systems': self.collect_ups_data(),
            'racks': self.collect_rack_data(),
            'environmental': self.collect_environmental_sensors(),
            'cooling': self.collect_cooling_data(),
            'power_distribution': self.collect_pdu_data(),
            'timestamp': datetime.now().isoformat()
        }
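
The collect() method above also references collect_cooling_data() and collect_pdu_data(), which are not shown. A hedged sketch of collect_pdu_data(), assuming the PDUs answer SNMP like the UPS devices and reusing the snmp_get helper from section 3.1 (the pdu_devices config key and the OID are placeholders); collect_cooling_data() would follow the same pattern:

    def collect_pdu_data(self) -> List[Dict]:
        """Collect per-PDU load via SNMP (sketch; config key and OID are placeholders)."""
        from utils.snmp_helper import snmp_get  # helper defined in section 3.1

        pdu_data = []
        for pdu in self.config['infrastructure'].get('pdu_devices', []):
            try:
                # Placeholder OID (sysDescr); replace with the vendor's load OID
                load = snmp_get(pdu['ip'], '1.3.6.1.2.1.1.1.0',
                                community=self.config['snmp']['community'])
                pdu_data.append({'id': pdu['id'], 'load': load})
            except Exception as e:
                logger.error(f"Failed to collect PDU {pdu['id']}: {e}")
        return pdu_data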

2.2 Network Collector

#!/usr/bin/env python3
"""
collectors/network.py - Raccolta configurazioni networking
"""

from netmiko import ConnectHandler
import paramiko

class NetworkCollector:
    def __init__(self, config):
        self.config = config
    
    def connect_device(self, device_config):
        """SSH connection to network device"""
        return ConnectHandler(
            device_type=device_config['type'],
            host=device_config['host'],
            username=device_config['username'],
            password=device_config['password'],
            secret=device_config.get('enable_password')
        )
    
    def collect_switch_inventory(self) -> List[Dict]:
        """Collect switch inventory and configuration"""
        switches = []
        
        for switch_config in self.config['network']['switches']:
            try:
                connection = self.connect_device(switch_config)
                
                # Collect basic info
                version = connection.send_command('show version')
                interfaces = connection.send_command('show interfaces status')
                vlan = connection.send_command('show vlan brief')
                
                switches.append({
                    'hostname': switch_config['hostname'],
                    'version': self.parse_version(version),
                    'interfaces': self.parse_interfaces(interfaces),
                    'vlans': self.parse_vlans(vlan),
                })
                
                connection.disconnect()
                
            except Exception as e:
                logger.error(f"Failed to collect {switch_config['hostname']}: {e}")
                
        return switches
    
    def collect_firewall_rules(self) -> Dict:
        """Collect firewall configuration"""
        # Implementation depends on firewall vendor
        pass
    
    def collect(self) -> Dict:
        """Main collection method"""
        return {
            'switches': self.collect_switch_inventory(),
            'routers': self.collect_router_data(),
            'firewalls': self.collect_firewall_rules(),
            'vlans': self.collect_vlan_config(),
            'timestamp': datetime.now().isoformat()
        }
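
The parsing helpers referenced above (parse_version, parse_interfaces, parse_vlans) are left to the implementation. One option, instead of hand-parsing CLI text, is Netmiko's TextFSM integration (requires the ntc-templates package); a sketch of an alternative collection method under that assumption:

    def collect_switch_structured(self, switch_config) -> Dict:
        """Alternative to hand parsing: ask Netmiko for structured output.

        send_command(..., use_textfsm=True) returns a list of dicts when a
        TextFSM template exists for the platform/command (ntc-templates).
        """
        connection = self.connect_device(switch_config)
        try:
            return {
                'hostname': switch_config['hostname'],
                'version': connection.send_command('show version', use_textfsm=True),
                'interfaces': connection.send_command('show interfaces status', use_textfsm=True),
                'vlans': connection.send_command('show vlan brief', use_textfsm=True),
            }
        finally:
            connection.disconnect()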

2.3 VMware Collector

#!/usr/bin/env python3
"""
collectors/virtualization.py - Raccolta dati VMware/Hypervisor
"""

from pyVim.connect import SmartConnect, Disconnect
from pyVmomi import vim
import ssl

class VirtualizationCollector:
    def __init__(self, config):
        self.config = config
        self.si = self.connect_vcenter()
    
    def connect_vcenter(self):
        """Connect to vCenter"""
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        context.verify_mode = ssl.CERT_NONE
        
        return SmartConnect(
            host=self.config['vmware']['vcenter_host'],
            user=self.config['vmware']['username'],
            pwd=self.config['vmware']['password'],
            sslContext=context
        )
    
    def collect_vm_inventory(self) -> List[Dict]:
        """Collect all VMs"""
        content = self.si.RetrieveContent()
        container = content.rootFolder
        viewType = [vim.VirtualMachine]
        recursive = True
        
        containerView = content.viewManager.CreateContainerView(
            container, viewType, recursive
        )
        
        vms = []
        for vm in containerView.view:
            if vm.config:
                vms.append({
                    'name': vm.name,
                    'power_state': vm.runtime.powerState,
                    'vcpu': vm.config.hardware.numCPU,
                    'memory_mb': vm.config.hardware.memoryMB,
                    'guest_os': vm.config.guestFullName,
                    'host': vm.runtime.host.name if vm.runtime.host else 'N/A',
                    'storage_gb': sum(d.capacityInBytes for d in vm.config.hardware.device 
                                     if isinstance(d, vim.vm.device.VirtualDisk)) / 1024**3
                })
        
        containerView.Destroy()
        return vms
    
    def collect_host_inventory(self) -> List[Dict]:
        """Collect ESXi hosts"""
        content = self.si.RetrieveContent()
        hosts = []
        
        for datacenter in content.rootFolder.childEntity:
            if hasattr(datacenter, 'hostFolder'):
                for cluster in datacenter.hostFolder.childEntity:
                    for host in cluster.host:
                        hosts.append({
                            'name': host.name,
                            'cluster': cluster.name,
                            'cpu_cores': host.hardware.cpuInfo.numCpuCores,
                            'memory_gb': host.hardware.memorySize / 1024**3,
                            'cpu_usage': host.summary.quickStats.overallCpuUsage,
                            'memory_usage': host.summary.quickStats.overallMemoryUsage,
                            'vms_count': len(host.vm),
                            'uptime': host.summary.quickStats.uptime,
                        })
        
        return hosts
    
    def collect(self) -> Dict:
        """Main collection method"""
        data = {
            'vms': self.collect_vm_inventory(),
            'hosts': self.collect_host_inventory(),
            'datastores': self.collect_datastore_info(),
            'clusters': self.collect_cluster_config(),
            'timestamp': datetime.now().isoformat()
        }
        
        Disconnect(self.si)
        return data
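
collect() also calls collect_datastore_info() and collect_cluster_config(), which are not shown above. A sketch of collect_datastore_info() following the same container-view pattern used for the VM inventory:

    def collect_datastore_info(self) -> List[Dict]:
        """Collect datastore capacity and free space (sketch)."""
        content = self.si.RetrieveContent()
        view = content.viewManager.CreateContainerView(
            content.rootFolder, [vim.Datastore], True
        )
        datastores = []
        for ds in view.view:
            summary = ds.summary
            datastores.append({
                'name': summary.name,
                'type': summary.type,
                'capacity_gb': summary.capacity / 1024**3,
                'free_gb': summary.freeSpace / 1024**3,
                'accessible': summary.accessible,
            })
        view.Destroy()
        return datastores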

3. Helper Functions

3.1 SNMP Utilities

"""
utils/snmp_helper.py
"""

from pysnmp.hlapi import *

def snmp_get(target, oid, community='public'):
    """Simple SNMP GET"""
    iterator = getCmd(
        SnmpEngine(),
        CommunityData(community),
        UdpTransportTarget((target, 161)),
        ContextData(),
        ObjectType(ObjectIdentity(oid))
    )
    
    errorIndication, errorStatus, errorIndex, varBinds = next(iterator)
    
    if errorIndication:
        raise Exception(f"SNMP Error: {errorIndication}")
    
    return str(varBinds[0][1])

def snmp_walk(target, oid, community='public'):
    """Simple SNMP WALK"""
    results = []
    
    for (errorIndication, errorStatus, errorIndex, varBinds) in nextCmd(
        SnmpEngine(),
        CommunityData(community),
        UdpTransportTarget((target, 161)),
        ContextData(),
        ObjectType(ObjectIdentity(oid)),
        lexicographicMode=False
    ):
        if errorIndication:
            break
        
        for varBind in varBinds:
            results.append((str(varBind[0]), str(varBind[1])))
    
    return results
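
Example usage of the helpers (the target address is a placeholder; the OIDs are the standard MIB-2 sysDescr and ifDescr):

if __name__ == '__main__':
    # System description of a single device
    print(snmp_get('10.0.10.20', '1.3.6.1.2.1.1.1.0', community='public'))

    # Walk the interface description table (ifDescr)
    for oid, value in snmp_walk('10.0.10.20', '1.3.6.1.2.1.2.2.1.2', community='public'):
        print(oid, value)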

3.2 Token Counter

"""
utils/token_counter.py
"""

def count_tokens(text):
    """
    Rough token estimate:
    1 token ≈ 4 characters of English text
    """
    return len(text) // 4

def count_file_tokens(file_path):
    """Count tokens in a file"""
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()
    return count_tokens(content)
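
Example usage, e.g. to check whether a generated section still fits a model's context window (the 30,000-token budget below is only an illustrative number):

if __name__ == '__main__':
    TOKEN_BUDGET = 30000  # illustrative budget, not a specific model's limit
    tokens = count_file_tokens('output/section_01.md')
    print(f"Estimated tokens: {tokens}")
    if tokens > TOKEN_BUDGET:
        print("Warning: section may need to be split before sending it to the LLM")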

4. Configuration File Example

4.1 config.yaml

# Configuration file for datacenter documentation generator

# Database connections
databases:
  asset_db:
    host: db.company.local
    port: 3306
    user: readonly_user
    password: ${VAULT:asset_db_password}
    database: asset_management

# Infrastructure
infrastructure:
  ups_devices:
    - id: UPS-01
      ip: 10.0.10.10
      power_kva: 100
    - id: UPS-02
      ip: 10.0.10.11
      power_kva: 100
  
  sensors_api: http://sensors.company.local

# Network devices
network:
  switches:
    - hostname: core-sw-01
      host: 10.0.10.20
      type: cisco_ios
      username: readonly
      password: ${VAULT:network_password}

# VMware
vmware:
  vcenter_host: vcenter.company.local
  username: automation@vsphere.local
  password: ${VAULT:vmware_password}

# SNMP
snmp:
  community: ${VAULT:snmp_community}
  version: 2c

# Output
output:
  directory: /opt/datacenter-docs/output
  format: markdown
  
# Thresholds
thresholds:
  cpu_warning: 80
  cpu_critical: 90
  memory_warning: 85
  memory_critical: 95
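
Note that yaml.safe_load in load_config() does not resolve the ${VAULT:...} placeholders; a resolution pass is needed after loading. A minimal sketch that substitutes placeholders from environment variables as a stand-in for a real Vault lookup (the regex and the VAULT_<KEY> env-var convention are assumptions):

import os
import re

VAULT_PATTERN = re.compile(r'\$\{VAULT:([A-Za-z0-9_]+)\}')

def resolve_secrets(node):
    """Recursively replace ${VAULT:key} placeholders in a loaded config.

    Here secrets are read from environment variables (VAULT_<KEY>); a real
    deployment would query the Vault API instead.
    """
    if isinstance(node, dict):
        return {key: resolve_secrets(value) for key, value in node.items()}
    if isinstance(node, list):
        return [resolve_secrets(value) for value in node]
    if isinstance(node, str):
        return VAULT_PATTERN.sub(
            lambda m: os.environ.get(f"VAULT_{m.group(1).upper()}", m.group(0)),
            node
        )
    return node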

5. Deployment Script

5.1 deploy.sh

#!/bin/bash
# Deploy datacenter documentation generator

set -e

INSTALL_DIR="/opt/datacenter-docs"
VENV_DIR="$INSTALL_DIR/venv"
LOG_DIR="/var/log/datacenter-docs"

echo "Installing datacenter documentation generator..."

# Create directories
mkdir -p "$INSTALL_DIR"
mkdir -p "$LOG_DIR"
mkdir -p "$INSTALL_DIR/output"

# Create virtual environment
python3 -m venv "$VENV_DIR"
source "$VENV_DIR/bin/activate"

# Install dependencies
pip install --upgrade pip
pip install -r requirements.txt

# Copy files
cp -r collectors "$INSTALL_DIR/"
cp -r generators "$INSTALL_DIR/"
cp -r validators "$INSTALL_DIR/"
cp -r templates "$INSTALL_DIR/"
cp main.py "$INSTALL_DIR/"
cp config.yaml "$INSTALL_DIR/"

# Set permissions
chown -R automation:automation "$INSTALL_DIR"
chmod +x "$INSTALL_DIR/main.py"

# Install cron job
cat > /etc/cron.d/datacenter-docs << 'CRON'
# Datacenter documentation generation
0 */6 * * * automation /opt/datacenter-docs/venv/bin/python /opt/datacenter-docs/main.py >> /var/log/datacenter-docs/cron.log 2>&1
CRON

echo "✓ Installation complete!"
echo "Run: cd $INSTALL_DIR && source venv/bin/activate && python main.py --help"

6. Testing Framework

6.1 test_collectors.py

#!/usr/bin/env python3
"""
tests/test_collectors.py
"""

import unittest
from unittest.mock import Mock, patch
from collectors.infrastructure import InfrastructureCollector

class TestInfrastructureCollector(unittest.TestCase):
    def setUp(self):
        # Placeholder connection values; the real ones come from config.yaml / Vault
        self.config = {
            'databases': {'asset_db': {
                'host': 'localhost', 'user': 'test',
                'password': 'test', 'database': 'asset_management'
            }},
            'snmp': {'community': 'public'},
            'infrastructure': {'ups_devices': []}
        }
        # The constructor opens the asset DB connection, so mock it for every test
        patcher = patch('mysql.connector.connect', return_value=Mock())
        self.mock_connect = patcher.start()
        self.addCleanup(patcher.stop)
        self.collector = InfrastructureCollector(self.config)

    def test_asset_db_connection(self):
        """Test database connection"""
        db = self.collector.connect_asset_db()
        self.assertIsNotNone(db)
        self.mock_connect.assert_called()

    def test_ups_data_collection(self):
        """Test UPS data collection (no devices configured, so an empty list)"""
        ups_data = self.collector.collect_ups_data()
        self.assertIsInstance(ups_data, list)
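
# A similar test can cover NetworkCollector by mocking Netmiko's ConnectHandler
# so no real device is contacted (a sketch; host/credential values are
# placeholders and the import path assumes collectors/network.py).
class TestNetworkCollector(unittest.TestCase):
    @patch('collectors.network.ConnectHandler')
    def test_connect_device(self, mock_handler):
        from collectors.network import NetworkCollector

        device = {
            'hostname': 'core-sw-01', 'host': '10.0.10.20',
            'type': 'cisco_ios', 'username': 'readonly', 'password': 'secret'
        }
        collector = NetworkCollector({'network': {'switches': [device]}})
        collector.connect_device(device)

        # connect_device should pass the device parameters straight to Netmiko
        mock_handler.assert_called_once_with(
            device_type='cisco_ios',
            host='10.0.10.20',
            username='readonly',
            password='secret',
            secret=None
        )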

if __name__ == '__main__':
    unittest.main()

Document Version: 1.0
Support: automation-team@company.com