# Script di Raccolta Dati per Documentazione Datacenter ## 1. Script Python Principali ### 1.1 Main Orchestrator ```python #!/usr/bin/env python3 """ main.py - Orchestrator principale per generazione documentazione """ import sys import argparse import logging from datetime import datetime from pathlib import Path # Import moduli custom from collectors import ( InfrastructureCollector, NetworkCollector, VirtualizationCollector, StorageCollector, SecurityCollector, BackupCollector, MonitoringCollector, DatabaseCollector, ProcedureCollector, ImprovementAnalyzer ) from generators import DocumentationGenerator from validators import DocumentValidator logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class DatacenterDocGenerator: def __init__(self, config_file='config.yaml'): self.config = self.load_config(config_file) self.sections = [] def load_config(self, config_file): """Load configuration from YAML file""" import yaml with open(config_file, 'r') as f: return yaml.safe_load(f) def collect_data(self, section=None): """Collect data from all sources""" collectors = { '01': InfrastructureCollector(self.config), '02': NetworkCollector(self.config), '03': VirtualizationCollector(self.config), '04': StorageCollector(self.config), '05': SecurityCollector(self.config), '06': BackupCollector(self.config), '07': MonitoringCollector(self.config), '08': DatabaseCollector(self.config), '09': ProcedureCollector(self.config), } data = {} sections_to_process = [section] if section else collectors.keys() for section_id in sections_to_process: try: logger.info(f"Collecting data for section {section_id}") collector = collectors.get(section_id) if collector: data[section_id] = collector.collect() logger.info(f"✓ Section {section_id} data collected") except Exception as e: logger.error(f"✗ Failed to collect section {section_id}: {e}") data[section_id] = None return data def generate_documentation(self, data): """Generate markdown documentation from collected data""" generator = DocumentationGenerator(self.config) for section_id, section_data in data.items(): if section_data: try: logger.info(f"Generating documentation for section {section_id}") output_file = f"output/section_{section_id}.md" generator.generate(section_id, section_data, output_file) # Validate generated document validator = DocumentValidator() if validator.validate(output_file): logger.info(f"✓ Section {section_id} generated and validated") self.sections.append(section_id) else: logger.warning(f"⚠ Section {section_id} validation warnings") except Exception as e: logger.error(f"✗ Failed to generate section {section_id}: {e}") # Generate improvement section based on all other sections if len(self.sections) > 0: logger.info("Analyzing for improvements...") analyzer = ImprovementAnalyzer(self.config) improvements = analyzer.analyze(data) generator.generate('10', improvements, "output/section_10.md") def run(self, section=None, dry_run=False): """Main execution flow""" logger.info("=" * 60) logger.info("Starting Datacenter Documentation Generation") logger.info(f"Timestamp: {datetime.now().isoformat()}") logger.info("=" * 60) try: # Collect data data = self.collect_data(section) if dry_run: logger.info("DRY RUN - Data collection complete, skipping generation") return True # Generate documentation self.generate_documentation(data) logger.info("=" * 60) logger.info(f"✓ Documentation generation completed successfully") logger.info(f"Sections updated: {', '.join(self.sections)}") logger.info("=" * 60) return True except Exception as e: logger.exception(f"Fatal error during documentation generation: {e}") return False def main(): parser = argparse.ArgumentParser(description='Generate Datacenter Documentation') parser.add_argument('--section', help='Generate specific section only (01-10)') parser.add_argument('--dry-run', action='store_true', help='Collect data without generating docs') parser.add_argument('--config', default='config.yaml', help='Configuration file path') parser.add_argument('--debug', action='store_true', help='Enable debug logging') args = parser.parse_args() if args.debug: logging.getLogger().setLevel(logging.DEBUG) generator = DatacenterDocGenerator(args.config) success = generator.run(section=args.section, dry_run=args.dry_run) sys.exit(0 if success else 1) if __name__ == '__main__': main() ``` --- ## 2. Collector Modules ### 2.1 Infrastructure Collector ```python #!/usr/bin/env python3 """ collectors/infrastructure.py - Raccolta dati infrastruttura fisica """ from dataclasses import dataclass from typing import List, Dict import requests from pysnmp.hlapi import * @dataclass class UPSData: id: str model: str power_kva: float battery_capacity: float autonomy_minutes: int status: str last_test: str class InfrastructureCollector: def __init__(self, config): self.config = config self.asset_db = self.connect_asset_db() def connect_asset_db(self): """Connect to asset management database""" import mysql.connector return mysql.connector.connect( host=self.config['databases']['asset_db']['host'], user=self.config['databases']['asset_db']['user'], password=self.config['databases']['asset_db']['password'], database=self.config['databases']['asset_db']['database'] ) def collect_ups_data(self) -> List[UPSData]: """Collect UPS data via SNMP""" ups_devices = self.config['infrastructure']['ups_devices'] ups_data = [] for ups in ups_devices: try: # Query UPS via SNMP iterator = getCmd( SnmpEngine(), CommunityData(self.config['snmp']['community']), UdpTransportTarget((ups['ip'], 161)), ContextData(), ObjectType(ObjectIdentity('UPS-MIB', 'upsIdentModel', 0)), ObjectType(ObjectIdentity('UPS-MIB', 'upsBatteryStatus', 0)), ) errorIndication, errorStatus, errorIndex, varBinds = next(iterator) if errorIndication: logger.error(f"SNMP error for {ups['id']}: {errorIndication}") continue # Parse SNMP response model = str(varBinds[0][1]) status = str(varBinds[1][1]) ups_data.append(UPSData( id=ups['id'], model=model, power_kva=ups.get('power_kva', 0), battery_capacity=ups.get('battery_capacity', 0), autonomy_minutes=ups.get('autonomy_minutes', 0), status=status, last_test=ups.get('last_test', 'N/A') )) except Exception as e: logger.error(f"Failed to collect UPS {ups['id']}: {e}") return ups_data def collect_rack_data(self) -> List[Dict]: """Collect rack inventory from asset database""" cursor = self.asset_db.cursor(dictionary=True) cursor.execute(""" SELECT rack_id, location, total_units, occupied_units, max_power_kw FROM racks ORDER BY location, rack_id """) return cursor.fetchall() def collect_environmental_sensors(self) -> List[Dict]: """Collect temperature/humidity sensor data""" sensors_api = self.config['infrastructure']['sensors_api'] response = requests.get( f"{sensors_api}/api/sensors/current", timeout=10 ) response.raise_for_status() return response.json() def collect(self) -> Dict: """Main collection method""" return { 'ups_systems': self.collect_ups_data(), 'racks': self.collect_rack_data(), 'environmental': self.collect_environmental_sensors(), 'cooling': self.collect_cooling_data(), 'power_distribution': self.collect_pdu_data(), 'timestamp': datetime.now().isoformat() } ``` ### 2.2 Network Collector ```python #!/usr/bin/env python3 """ collectors/network.py - Raccolta configurazioni networking """ from netmiko import ConnectHandler import paramiko class NetworkCollector: def __init__(self, config): self.config = config def connect_device(self, device_config): """SSH connection to network device""" return ConnectHandler( device_type=device_config['type'], host=device_config['host'], username=device_config['username'], password=device_config['password'], secret=device_config.get('enable_password') ) def collect_switch_inventory(self) -> List[Dict]: """Collect switch inventory and configuration""" switches = [] for switch_config in self.config['network']['switches']: try: connection = self.connect_device(switch_config) # Collect basic info version = connection.send_command('show version') interfaces = connection.send_command('show interfaces status') vlan = connection.send_command('show vlan brief') switches.append({ 'hostname': switch_config['hostname'], 'version': self.parse_version(version), 'interfaces': self.parse_interfaces(interfaces), 'vlans': self.parse_vlans(vlan), }) connection.disconnect() except Exception as e: logger.error(f"Failed to collect {switch_config['hostname']}: {e}") return switches def collect_firewall_rules(self) -> Dict: """Collect firewall configuration""" # Implementation depends on firewall vendor pass def collect(self) -> Dict: """Main collection method""" return { 'switches': self.collect_switch_inventory(), 'routers': self.collect_router_data(), 'firewalls': self.collect_firewall_rules(), 'vlans': self.collect_vlan_config(), 'timestamp': datetime.now().isoformat() } ``` ### 2.3 VMware Collector ```python #!/usr/bin/env python3 """ collectors/virtualization.py - Raccolta dati VMware/Hypervisor """ from pyVim.connect import SmartConnect, Disconnect from pyVmomi import vim import ssl class VirtualizationCollector: def __init__(self, config): self.config = config self.si = self.connect_vcenter() def connect_vcenter(self): """Connect to vCenter""" context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) context.verify_mode = ssl.CERT_NONE return SmartConnect( host=self.config['vmware']['vcenter_host'], user=self.config['vmware']['username'], pwd=self.config['vmware']['password'], sslContext=context ) def collect_vm_inventory(self) -> List[Dict]: """Collect all VMs""" content = self.si.RetrieveContent() container = content.rootFolder viewType = [vim.VirtualMachine] recursive = True containerView = content.viewManager.CreateContainerView( container, viewType, recursive ) vms = [] for vm in containerView.view: if vm.config: vms.append({ 'name': vm.name, 'power_state': vm.runtime.powerState, 'vcpu': vm.config.hardware.numCPU, 'memory_mb': vm.config.hardware.memoryMB, 'guest_os': vm.config.guestFullName, 'host': vm.runtime.host.name if vm.runtime.host else 'N/A', 'storage_gb': sum(d.capacityInBytes for d in vm.config.hardware.device if isinstance(d, vim.vm.device.VirtualDisk)) / 1024**3 }) return vms def collect_host_inventory(self) -> List[Dict]: """Collect ESXi hosts""" content = self.si.RetrieveContent() hosts = [] for datacenter in content.rootFolder.childEntity: if hasattr(datacenter, 'hostFolder'): for cluster in datacenter.hostFolder.childEntity: for host in cluster.host: hosts.append({ 'name': host.name, 'cluster': cluster.name, 'cpu_cores': host.hardware.cpuInfo.numCpuCores, 'memory_gb': host.hardware.memorySize / 1024**3, 'cpu_usage': host.summary.quickStats.overallCpuUsage, 'memory_usage': host.summary.quickStats.overallMemoryUsage, 'vms_count': len(host.vm), 'uptime': host.summary.quickStats.uptime, }) return hosts def collect(self) -> Dict: """Main collection method""" data = { 'vms': self.collect_vm_inventory(), 'hosts': self.collect_host_inventory(), 'datastores': self.collect_datastore_info(), 'clusters': self.collect_cluster_config(), 'timestamp': datetime.now().isoformat() } Disconnect(self.si) return data ``` --- ## 3. Helper Functions ### 3.1 SNMP Utilities ```python """ utils/snmp_helper.py """ from pysnmp.hlapi import * def snmp_get(target, oid, community='public'): """Simple SNMP GET""" iterator = getCmd( SnmpEngine(), CommunityData(community), UdpTransportTarget((target, 161)), ContextData(), ObjectType(ObjectIdentity(oid)) ) errorIndication, errorStatus, errorIndex, varBinds = next(iterator) if errorIndication: raise Exception(f"SNMP Error: {errorIndication}") return str(varBinds[0][1]) def snmp_walk(target, oid, community='public'): """Simple SNMP WALK""" results = [] for (errorIndication, errorStatus, errorIndex, varBinds) in nextCmd( SnmpEngine(), CommunityData(community), UdpTransportTarget((target, 161)), ContextData(), ObjectType(ObjectIdentity(oid)), lexicographicMode=False ): if errorIndication: break for varBind in varBinds: results.append((str(varBind[0]), str(varBind[1]))) return results ``` ### 3.2 Token Counter ```python """ utils/token_counter.py """ def count_tokens(text): """ Stima approssimativa dei token 1 token ≈ 4 caratteri in inglese """ return len(text) // 4 def count_file_tokens(file_path): """Count tokens in a file""" with open(file_path, 'r', encoding='utf-8') as f: content = f.read() return count_tokens(content) ``` --- ## 4. Configuration File Example ### 4.1 config.yaml ```yaml # Configuration file for datacenter documentation generator # Database connections databases: asset_db: host: db.company.local port: 3306 user: readonly_user password: ${VAULT:asset_db_password} database: asset_management # Infrastructure infrastructure: ups_devices: - id: UPS-01 ip: 10.0.10.10 power_kva: 100 - id: UPS-02 ip: 10.0.10.11 power_kva: 100 sensors_api: http://sensors.company.local # Network devices network: switches: - hostname: core-sw-01 host: 10.0.10.20 type: cisco_ios username: readonly password: ${VAULT:network_password} # VMware vmware: vcenter_host: vcenter.company.local username: automation@vsphere.local password: ${VAULT:vmware_password} # SNMP snmp: community: ${VAULT:snmp_community} version: 2c # Output output: directory: /opt/datacenter-docs/output format: markdown # Thresholds thresholds: cpu_warning: 80 cpu_critical: 90 memory_warning: 85 memory_critical: 95 ``` --- ## 5. Deployment Script ### 5.1 deploy.sh ```bash #!/bin/bash # Deploy datacenter documentation generator set -e INSTALL_DIR="/opt/datacenter-docs" VENV_DIR="$INSTALL_DIR/venv" LOG_DIR="/var/log/datacenter-docs" echo "Installing datacenter documentation generator..." # Create directories mkdir -p $INSTALL_DIR mkdir -p $LOG_DIR mkdir -p $INSTALL_DIR/output # Create virtual environment python3 -m venv $VENV_DIR source $VENV_DIR/bin/activate # Install dependencies pip install --upgrade pip pip install -r requirements.txt # Copy files cp -r collectors $INSTALL_DIR/ cp -r generators $INSTALL_DIR/ cp -r validators $INSTALL_DIR/ cp -r templates $INSTALL_DIR/ cp main.py $INSTALL_DIR/ cp config.yaml $INSTALL_DIR/ # Set permissions chown -R automation:automation $INSTALL_DIR chmod +x $INSTALL_DIR/main.py # Install cron job cat > /etc/cron.d/datacenter-docs << 'CRON' # Datacenter documentation generation 0 */6 * * * automation /opt/datacenter-docs/venv/bin/python /opt/datacenter-docs/main.py >> /var/log/datacenter-docs/cron.log 2>&1 CRON echo "✓ Installation complete!" echo "Run: cd $INSTALL_DIR && source venv/bin/activate && python main.py --help" ``` --- ## 6. Testing Framework ### 6.1 test_collectors.py ```python #!/usr/bin/env python3 """ tests/test_collectors.py """ import unittest from unittest.mock import Mock, patch from collectors.infrastructure import InfrastructureCollector class TestInfrastructureCollector(unittest.TestCase): def setUp(self): self.config = { 'databases': {'asset_db': {...}}, 'snmp': {'community': 'public'} } self.collector = InfrastructureCollector(self.config) @patch('mysql.connector.connect') def test_asset_db_connection(self, mock_connect): """Test database connection""" mock_connect.return_value = Mock() db = self.collector.connect_asset_db() self.assertIsNotNone(db) def test_ups_data_collection(self): """Test UPS data collection""" # Mock SNMP responses ups_data = self.collector.collect_ups_data() self.assertIsInstance(ups_data, list) if __name__ == '__main__': unittest.main() ``` --- **Documento Versione**: 1.0 **Per Supporto**: automation-team@company.com