Initial commit: LLM Automation Docs & Remediation Engine v2.0
Features: - Automated datacenter documentation generation - MCP integration for device connectivity - Auto-remediation engine with safety checks - Multi-factor reliability scoring (0-100%) - Human feedback learning loop - Pattern recognition and continuous improvement - Agentic chat support with AI - API for ticket resolution - Frontend React with Material-UI - CI/CD pipelines (GitLab + Gitea) - Docker & Kubernetes deployment - Complete documentation and guides v2.0 Highlights: - Auto-remediation with write operations (disabled by default) - Reliability calculator with 4-factor scoring - Human feedback system for continuous learning - Pattern-based progressive automation - Approval workflow for critical actions - Full audit trail and rollback capability
This commit is contained in:
663
requirements/data_collection_scripts.md
Normal file
663
requirements/data_collection_scripts.md
Normal file
@@ -0,0 +1,663 @@
|
||||
# Script di Raccolta Dati per Documentazione Datacenter
|
||||
|
||||
## 1. Script Python Principali
|
||||
|
||||
### 1.1 Main Orchestrator
|
||||
```python
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
main.py - Orchestrator principale per generazione documentazione
|
||||
"""
|
||||
|
||||
import sys
|
||||
import argparse
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
# Import moduli custom
|
||||
from collectors import (
|
||||
InfrastructureCollector,
|
||||
NetworkCollector,
|
||||
VirtualizationCollector,
|
||||
StorageCollector,
|
||||
SecurityCollector,
|
||||
BackupCollector,
|
||||
MonitoringCollector,
|
||||
DatabaseCollector,
|
||||
ProcedureCollector,
|
||||
ImprovementAnalyzer
|
||||
)
|
||||
|
||||
from generators import DocumentationGenerator
|
||||
from validators import DocumentValidator
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class DatacenterDocGenerator:
|
||||
def __init__(self, config_file='config.yaml'):
|
||||
self.config = self.load_config(config_file)
|
||||
self.sections = []
|
||||
|
||||
def load_config(self, config_file):
|
||||
"""Load configuration from YAML file"""
|
||||
import yaml
|
||||
with open(config_file, 'r') as f:
|
||||
return yaml.safe_load(f)
|
||||
|
||||
def collect_data(self, section=None):
|
||||
"""Collect data from all sources"""
|
||||
collectors = {
|
||||
'01': InfrastructureCollector(self.config),
|
||||
'02': NetworkCollector(self.config),
|
||||
'03': VirtualizationCollector(self.config),
|
||||
'04': StorageCollector(self.config),
|
||||
'05': SecurityCollector(self.config),
|
||||
'06': BackupCollector(self.config),
|
||||
'07': MonitoringCollector(self.config),
|
||||
'08': DatabaseCollector(self.config),
|
||||
'09': ProcedureCollector(self.config),
|
||||
}
|
||||
|
||||
data = {}
|
||||
sections_to_process = [section] if section else collectors.keys()
|
||||
|
||||
for section_id in sections_to_process:
|
||||
try:
|
||||
logger.info(f"Collecting data for section {section_id}")
|
||||
collector = collectors.get(section_id)
|
||||
if collector:
|
||||
data[section_id] = collector.collect()
|
||||
logger.info(f"✓ Section {section_id} data collected")
|
||||
except Exception as e:
|
||||
logger.error(f"✗ Failed to collect section {section_id}: {e}")
|
||||
data[section_id] = None
|
||||
|
||||
return data
|
||||
|
||||
def generate_documentation(self, data):
|
||||
"""Generate markdown documentation from collected data"""
|
||||
generator = DocumentationGenerator(self.config)
|
||||
|
||||
for section_id, section_data in data.items():
|
||||
if section_data:
|
||||
try:
|
||||
logger.info(f"Generating documentation for section {section_id}")
|
||||
output_file = f"output/section_{section_id}.md"
|
||||
generator.generate(section_id, section_data, output_file)
|
||||
|
||||
# Validate generated document
|
||||
validator = DocumentValidator()
|
||||
if validator.validate(output_file):
|
||||
logger.info(f"✓ Section {section_id} generated and validated")
|
||||
self.sections.append(section_id)
|
||||
else:
|
||||
logger.warning(f"⚠ Section {section_id} validation warnings")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"✗ Failed to generate section {section_id}: {e}")
|
||||
|
||||
# Generate improvement section based on all other sections
|
||||
if len(self.sections) > 0:
|
||||
logger.info("Analyzing for improvements...")
|
||||
analyzer = ImprovementAnalyzer(self.config)
|
||||
improvements = analyzer.analyze(data)
|
||||
generator.generate('10', improvements, "output/section_10.md")
|
||||
|
||||
def run(self, section=None, dry_run=False):
|
||||
"""Main execution flow"""
|
||||
logger.info("=" * 60)
|
||||
logger.info("Starting Datacenter Documentation Generation")
|
||||
logger.info(f"Timestamp: {datetime.now().isoformat()}")
|
||||
logger.info("=" * 60)
|
||||
|
||||
try:
|
||||
# Collect data
|
||||
data = self.collect_data(section)
|
||||
|
||||
if dry_run:
|
||||
logger.info("DRY RUN - Data collection complete, skipping generation")
|
||||
return True
|
||||
|
||||
# Generate documentation
|
||||
self.generate_documentation(data)
|
||||
|
||||
logger.info("=" * 60)
|
||||
logger.info(f"✓ Documentation generation completed successfully")
|
||||
logger.info(f"Sections updated: {', '.join(self.sections)}")
|
||||
logger.info("=" * 60)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Fatal error during documentation generation: {e}")
|
||||
return False
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Generate Datacenter Documentation')
|
||||
parser.add_argument('--section', help='Generate specific section only (01-10)')
|
||||
parser.add_argument('--dry-run', action='store_true', help='Collect data without generating docs')
|
||||
parser.add_argument('--config', default='config.yaml', help='Configuration file path')
|
||||
parser.add_argument('--debug', action='store_true', help='Enable debug logging')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.debug:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
generator = DatacenterDocGenerator(args.config)
|
||||
success = generator.run(section=args.section, dry_run=args.dry_run)
|
||||
|
||||
sys.exit(0 if success else 1)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 2. Collector Modules
|
||||
|
||||
### 2.1 Infrastructure Collector
|
||||
```python
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
collectors/infrastructure.py - Raccolta dati infrastruttura fisica
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Dict
|
||||
import requests
|
||||
from pysnmp.hlapi import *
|
||||
|
||||
@dataclass
|
||||
class UPSData:
|
||||
id: str
|
||||
model: str
|
||||
power_kva: float
|
||||
battery_capacity: float
|
||||
autonomy_minutes: int
|
||||
status: str
|
||||
last_test: str
|
||||
|
||||
class InfrastructureCollector:
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
self.asset_db = self.connect_asset_db()
|
||||
|
||||
def connect_asset_db(self):
|
||||
"""Connect to asset management database"""
|
||||
import mysql.connector
|
||||
return mysql.connector.connect(
|
||||
host=self.config['databases']['asset_db']['host'],
|
||||
user=self.config['databases']['asset_db']['user'],
|
||||
password=self.config['databases']['asset_db']['password'],
|
||||
database=self.config['databases']['asset_db']['database']
|
||||
)
|
||||
|
||||
def collect_ups_data(self) -> List[UPSData]:
|
||||
"""Collect UPS data via SNMP"""
|
||||
ups_devices = self.config['infrastructure']['ups_devices']
|
||||
ups_data = []
|
||||
|
||||
for ups in ups_devices:
|
||||
try:
|
||||
# Query UPS via SNMP
|
||||
iterator = getCmd(
|
||||
SnmpEngine(),
|
||||
CommunityData(self.config['snmp']['community']),
|
||||
UdpTransportTarget((ups['ip'], 161)),
|
||||
ContextData(),
|
||||
ObjectType(ObjectIdentity('UPS-MIB', 'upsIdentModel', 0)),
|
||||
ObjectType(ObjectIdentity('UPS-MIB', 'upsBatteryStatus', 0)),
|
||||
)
|
||||
|
||||
errorIndication, errorStatus, errorIndex, varBinds = next(iterator)
|
||||
|
||||
if errorIndication:
|
||||
logger.error(f"SNMP error for {ups['id']}: {errorIndication}")
|
||||
continue
|
||||
|
||||
# Parse SNMP response
|
||||
model = str(varBinds[0][1])
|
||||
status = str(varBinds[1][1])
|
||||
|
||||
ups_data.append(UPSData(
|
||||
id=ups['id'],
|
||||
model=model,
|
||||
power_kva=ups.get('power_kva', 0),
|
||||
battery_capacity=ups.get('battery_capacity', 0),
|
||||
autonomy_minutes=ups.get('autonomy_minutes', 0),
|
||||
status=status,
|
||||
last_test=ups.get('last_test', 'N/A')
|
||||
))
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to collect UPS {ups['id']}: {e}")
|
||||
|
||||
return ups_data
|
||||
|
||||
def collect_rack_data(self) -> List[Dict]:
|
||||
"""Collect rack inventory from asset database"""
|
||||
cursor = self.asset_db.cursor(dictionary=True)
|
||||
cursor.execute("""
|
||||
SELECT
|
||||
rack_id,
|
||||
location,
|
||||
total_units,
|
||||
occupied_units,
|
||||
max_power_kw
|
||||
FROM racks
|
||||
ORDER BY location, rack_id
|
||||
""")
|
||||
return cursor.fetchall()
|
||||
|
||||
def collect_environmental_sensors(self) -> List[Dict]:
|
||||
"""Collect temperature/humidity sensor data"""
|
||||
sensors_api = self.config['infrastructure']['sensors_api']
|
||||
response = requests.get(
|
||||
f"{sensors_api}/api/sensors/current",
|
||||
timeout=10
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def collect(self) -> Dict:
|
||||
"""Main collection method"""
|
||||
return {
|
||||
'ups_systems': self.collect_ups_data(),
|
||||
'racks': self.collect_rack_data(),
|
||||
'environmental': self.collect_environmental_sensors(),
|
||||
'cooling': self.collect_cooling_data(),
|
||||
'power_distribution': self.collect_pdu_data(),
|
||||
'timestamp': datetime.now().isoformat()
|
||||
}
|
||||
```
|
||||
|
||||
### 2.2 Network Collector
|
||||
```python
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
collectors/network.py - Raccolta configurazioni networking
|
||||
"""
|
||||
|
||||
from netmiko import ConnectHandler
|
||||
import paramiko
|
||||
|
||||
class NetworkCollector:
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
|
||||
def connect_device(self, device_config):
|
||||
"""SSH connection to network device"""
|
||||
return ConnectHandler(
|
||||
device_type=device_config['type'],
|
||||
host=device_config['host'],
|
||||
username=device_config['username'],
|
||||
password=device_config['password'],
|
||||
secret=device_config.get('enable_password')
|
||||
)
|
||||
|
||||
def collect_switch_inventory(self) -> List[Dict]:
|
||||
"""Collect switch inventory and configuration"""
|
||||
switches = []
|
||||
|
||||
for switch_config in self.config['network']['switches']:
|
||||
try:
|
||||
connection = self.connect_device(switch_config)
|
||||
|
||||
# Collect basic info
|
||||
version = connection.send_command('show version')
|
||||
interfaces = connection.send_command('show interfaces status')
|
||||
vlan = connection.send_command('show vlan brief')
|
||||
|
||||
switches.append({
|
||||
'hostname': switch_config['hostname'],
|
||||
'version': self.parse_version(version),
|
||||
'interfaces': self.parse_interfaces(interfaces),
|
||||
'vlans': self.parse_vlans(vlan),
|
||||
})
|
||||
|
||||
connection.disconnect()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to collect {switch_config['hostname']}: {e}")
|
||||
|
||||
return switches
|
||||
|
||||
def collect_firewall_rules(self) -> Dict:
|
||||
"""Collect firewall configuration"""
|
||||
# Implementation depends on firewall vendor
|
||||
pass
|
||||
|
||||
def collect(self) -> Dict:
|
||||
"""Main collection method"""
|
||||
return {
|
||||
'switches': self.collect_switch_inventory(),
|
||||
'routers': self.collect_router_data(),
|
||||
'firewalls': self.collect_firewall_rules(),
|
||||
'vlans': self.collect_vlan_config(),
|
||||
'timestamp': datetime.now().isoformat()
|
||||
}
|
||||
```
|
||||
|
||||
### 2.3 VMware Collector
|
||||
```python
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
collectors/virtualization.py - Raccolta dati VMware/Hypervisor
|
||||
"""
|
||||
|
||||
from pyVim.connect import SmartConnect, Disconnect
|
||||
from pyVmomi import vim
|
||||
import ssl
|
||||
|
||||
class VirtualizationCollector:
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
self.si = self.connect_vcenter()
|
||||
|
||||
def connect_vcenter(self):
|
||||
"""Connect to vCenter"""
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
|
||||
context.verify_mode = ssl.CERT_NONE
|
||||
|
||||
return SmartConnect(
|
||||
host=self.config['vmware']['vcenter_host'],
|
||||
user=self.config['vmware']['username'],
|
||||
pwd=self.config['vmware']['password'],
|
||||
sslContext=context
|
||||
)
|
||||
|
||||
def collect_vm_inventory(self) -> List[Dict]:
|
||||
"""Collect all VMs"""
|
||||
content = self.si.RetrieveContent()
|
||||
container = content.rootFolder
|
||||
viewType = [vim.VirtualMachine]
|
||||
recursive = True
|
||||
|
||||
containerView = content.viewManager.CreateContainerView(
|
||||
container, viewType, recursive
|
||||
)
|
||||
|
||||
vms = []
|
||||
for vm in containerView.view:
|
||||
if vm.config:
|
||||
vms.append({
|
||||
'name': vm.name,
|
||||
'power_state': vm.runtime.powerState,
|
||||
'vcpu': vm.config.hardware.numCPU,
|
||||
'memory_mb': vm.config.hardware.memoryMB,
|
||||
'guest_os': vm.config.guestFullName,
|
||||
'host': vm.runtime.host.name if vm.runtime.host else 'N/A',
|
||||
'storage_gb': sum(d.capacityInBytes for d in vm.config.hardware.device
|
||||
if isinstance(d, vim.vm.device.VirtualDisk)) / 1024**3
|
||||
})
|
||||
|
||||
return vms
|
||||
|
||||
def collect_host_inventory(self) -> List[Dict]:
|
||||
"""Collect ESXi hosts"""
|
||||
content = self.si.RetrieveContent()
|
||||
hosts = []
|
||||
|
||||
for datacenter in content.rootFolder.childEntity:
|
||||
if hasattr(datacenter, 'hostFolder'):
|
||||
for cluster in datacenter.hostFolder.childEntity:
|
||||
for host in cluster.host:
|
||||
hosts.append({
|
||||
'name': host.name,
|
||||
'cluster': cluster.name,
|
||||
'cpu_cores': host.hardware.cpuInfo.numCpuCores,
|
||||
'memory_gb': host.hardware.memorySize / 1024**3,
|
||||
'cpu_usage': host.summary.quickStats.overallCpuUsage,
|
||||
'memory_usage': host.summary.quickStats.overallMemoryUsage,
|
||||
'vms_count': len(host.vm),
|
||||
'uptime': host.summary.quickStats.uptime,
|
||||
})
|
||||
|
||||
return hosts
|
||||
|
||||
def collect(self) -> Dict:
|
||||
"""Main collection method"""
|
||||
data = {
|
||||
'vms': self.collect_vm_inventory(),
|
||||
'hosts': self.collect_host_inventory(),
|
||||
'datastores': self.collect_datastore_info(),
|
||||
'clusters': self.collect_cluster_config(),
|
||||
'timestamp': datetime.now().isoformat()
|
||||
}
|
||||
|
||||
Disconnect(self.si)
|
||||
return data
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 3. Helper Functions
|
||||
|
||||
### 3.1 SNMP Utilities
|
||||
```python
|
||||
"""
|
||||
utils/snmp_helper.py
|
||||
"""
|
||||
|
||||
from pysnmp.hlapi import *
|
||||
|
||||
def snmp_get(target, oid, community='public'):
|
||||
"""Simple SNMP GET"""
|
||||
iterator = getCmd(
|
||||
SnmpEngine(),
|
||||
CommunityData(community),
|
||||
UdpTransportTarget((target, 161)),
|
||||
ContextData(),
|
||||
ObjectType(ObjectIdentity(oid))
|
||||
)
|
||||
|
||||
errorIndication, errorStatus, errorIndex, varBinds = next(iterator)
|
||||
|
||||
if errorIndication:
|
||||
raise Exception(f"SNMP Error: {errorIndication}")
|
||||
|
||||
return str(varBinds[0][1])
|
||||
|
||||
def snmp_walk(target, oid, community='public'):
|
||||
"""Simple SNMP WALK"""
|
||||
results = []
|
||||
|
||||
for (errorIndication, errorStatus, errorIndex, varBinds) in nextCmd(
|
||||
SnmpEngine(),
|
||||
CommunityData(community),
|
||||
UdpTransportTarget((target, 161)),
|
||||
ContextData(),
|
||||
ObjectType(ObjectIdentity(oid)),
|
||||
lexicographicMode=False
|
||||
):
|
||||
if errorIndication:
|
||||
break
|
||||
|
||||
for varBind in varBinds:
|
||||
results.append((str(varBind[0]), str(varBind[1])))
|
||||
|
||||
return results
|
||||
```
|
||||
|
||||
### 3.2 Token Counter
|
||||
```python
|
||||
"""
|
||||
utils/token_counter.py
|
||||
"""
|
||||
|
||||
def count_tokens(text):
|
||||
"""
|
||||
Stima approssimativa dei token
|
||||
1 token ≈ 4 caratteri in inglese
|
||||
"""
|
||||
return len(text) // 4
|
||||
|
||||
def count_file_tokens(file_path):
|
||||
"""Count tokens in a file"""
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
return count_tokens(content)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 4. Configuration File Example
|
||||
|
||||
### 4.1 config.yaml
|
||||
```yaml
|
||||
# Configuration file for datacenter documentation generator
|
||||
|
||||
# Database connections
|
||||
databases:
|
||||
asset_db:
|
||||
host: db.company.local
|
||||
port: 3306
|
||||
user: readonly_user
|
||||
password: ${VAULT:asset_db_password}
|
||||
database: asset_management
|
||||
|
||||
# Infrastructure
|
||||
infrastructure:
|
||||
ups_devices:
|
||||
- id: UPS-01
|
||||
ip: 10.0.10.10
|
||||
power_kva: 100
|
||||
- id: UPS-02
|
||||
ip: 10.0.10.11
|
||||
power_kva: 100
|
||||
|
||||
sensors_api: http://sensors.company.local
|
||||
|
||||
# Network devices
|
||||
network:
|
||||
switches:
|
||||
- hostname: core-sw-01
|
||||
host: 10.0.10.20
|
||||
type: cisco_ios
|
||||
username: readonly
|
||||
password: ${VAULT:network_password}
|
||||
|
||||
# VMware
|
||||
vmware:
|
||||
vcenter_host: vcenter.company.local
|
||||
username: automation@vsphere.local
|
||||
password: ${VAULT:vmware_password}
|
||||
|
||||
# SNMP
|
||||
snmp:
|
||||
community: ${VAULT:snmp_community}
|
||||
version: 2c
|
||||
|
||||
# Output
|
||||
output:
|
||||
directory: /opt/datacenter-docs/output
|
||||
format: markdown
|
||||
|
||||
# Thresholds
|
||||
thresholds:
|
||||
cpu_warning: 80
|
||||
cpu_critical: 90
|
||||
memory_warning: 85
|
||||
memory_critical: 95
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 5. Deployment Script
|
||||
|
||||
### 5.1 deploy.sh
|
||||
```bash
|
||||
#!/bin/bash
|
||||
# Deploy datacenter documentation generator
|
||||
|
||||
set -e
|
||||
|
||||
INSTALL_DIR="/opt/datacenter-docs"
|
||||
VENV_DIR="$INSTALL_DIR/venv"
|
||||
LOG_DIR="/var/log/datacenter-docs"
|
||||
|
||||
echo "Installing datacenter documentation generator..."
|
||||
|
||||
# Create directories
|
||||
mkdir -p $INSTALL_DIR
|
||||
mkdir -p $LOG_DIR
|
||||
mkdir -p $INSTALL_DIR/output
|
||||
|
||||
# Create virtual environment
|
||||
python3 -m venv $VENV_DIR
|
||||
source $VENV_DIR/bin/activate
|
||||
|
||||
# Install dependencies
|
||||
pip install --upgrade pip
|
||||
pip install -r requirements.txt
|
||||
|
||||
# Copy files
|
||||
cp -r collectors $INSTALL_DIR/
|
||||
cp -r generators $INSTALL_DIR/
|
||||
cp -r validators $INSTALL_DIR/
|
||||
cp -r templates $INSTALL_DIR/
|
||||
cp main.py $INSTALL_DIR/
|
||||
cp config.yaml $INSTALL_DIR/
|
||||
|
||||
# Set permissions
|
||||
chown -R automation:automation $INSTALL_DIR
|
||||
chmod +x $INSTALL_DIR/main.py
|
||||
|
||||
# Install cron job
|
||||
cat > /etc/cron.d/datacenter-docs << 'CRON'
|
||||
# Datacenter documentation generation
|
||||
0 */6 * * * automation /opt/datacenter-docs/venv/bin/python /opt/datacenter-docs/main.py >> /var/log/datacenter-docs/cron.log 2>&1
|
||||
CRON
|
||||
|
||||
echo "✓ Installation complete!"
|
||||
echo "Run: cd $INSTALL_DIR && source venv/bin/activate && python main.py --help"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 6. Testing Framework
|
||||
|
||||
### 6.1 test_collectors.py
|
||||
```python
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
tests/test_collectors.py
|
||||
"""
|
||||
|
||||
import unittest
|
||||
from unittest.mock import Mock, patch
|
||||
from collectors.infrastructure import InfrastructureCollector
|
||||
|
||||
class TestInfrastructureCollector(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.config = {
|
||||
'databases': {'asset_db': {...}},
|
||||
'snmp': {'community': 'public'}
|
||||
}
|
||||
self.collector = InfrastructureCollector(self.config)
|
||||
|
||||
@patch('mysql.connector.connect')
|
||||
def test_asset_db_connection(self, mock_connect):
|
||||
"""Test database connection"""
|
||||
mock_connect.return_value = Mock()
|
||||
db = self.collector.connect_asset_db()
|
||||
self.assertIsNotNone(db)
|
||||
|
||||
def test_ups_data_collection(self):
|
||||
"""Test UPS data collection"""
|
||||
# Mock SNMP responses
|
||||
ups_data = self.collector.collect_ups_data()
|
||||
self.assertIsInstance(ups_data, list)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
**Documento Versione**: 1.0
|
||||
**Per Supporto**: automation-team@company.com
|
||||
Reference in New Issue
Block a user