#!/usr/bin/env python3
"""
End-to-End Workflow Test Script

Tests the complete documentation generation workflow:
1. VMware Collector (with mock data)
2. Infrastructure Generator (with mock LLM)
3. MongoDB storage
4. API retrieval

This script validates the system architecture without requiring:
- Real VMware infrastructure
- Real LLM API credentials
"""

import asyncio
import logging
import sys
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


async def test_collector():
    """Run the VMware collector and log per-category object counts.

    Returns:
        The dict returned by ``collector.run()`` when its ``'success'`` entry
        is truthy, otherwise ``None``.
    """
    logger.info("=" * 70)
    logger.info("TEST 1: VMware Collector")
    logger.info("=" * 70)

    # Project import kept local so a missing package is reported by the
    # caller's error handling instead of breaking module import.
    from datacenter_docs.collectors.vmware_collector import VMwareCollector

    collector = VMwareCollector()

    logger.info(f"Collector name: {collector.name}")
    logger.info("Running collector.run()...")

    result = await collector.run()

    logger.info(f"Collection result: {result['success']}")

    if not result['success']:
        logger.error(f"❌ Collection failed: {result.get('error')}")
        return None

    data = result['data']
    logger.info("✅ Data collected successfully!")

    # The collected objects live under the nested 'data' key; look it up once
    # instead of once per category.
    inventory = data.get('data', {})
    categories = (
        ("VMs", 'vms'),
        ("Hosts", 'hosts'),
        ("Clusters", 'clusters'),
        ("Datastores", 'datastores'),
        ("Networks", 'networks'),
    )
    for label, key in categories:
        logger.info(f" - {label}: {len(inventory.get(key, []))}")

    return result


async def test_generator_structure():
    """Instantiate the infrastructure generator and format a sample payload.

    Only ``_format_data_summary`` is exercised here, on a small hand-built
    payload; no generation method is called.

    Returns:
        The ``InfrastructureGenerator`` instance that was created.
    """
    logger.info("\n" + "=" * 70)
    logger.info("TEST 2: Infrastructure Generator Structure")
    logger.info("=" * 70)

    from datacenter_docs.generators.infrastructure_generator import (
        InfrastructureGenerator,
    )

    generator = InfrastructureGenerator()

    logger.info(f"Generator name: {generator.name}")
    logger.info(f"Generator section: {generator.section}")
    logger.info(f"Generator LLM client configured: {generator.llm is not None}")

    # Test data formatting
    sample_data = {
        'metadata': {'collector': 'vmware', 'collected_at': datetime.now().isoformat()},
        'data': {
            'statistics': {'total_vms': 10, 'powered_on_vms': 8},
            'vms': [{'name': 'test-vm-01', 'power_state': 'poweredOn'}],
        },
    }

    summary = generator._format_data_summary(sample_data['data'])
    logger.info(f"✅ Data summary formatted ({len(summary)} chars)")
    logger.info(f" Summary preview: {summary[:200]}...")

    return generator


async def test_database_connection():
    """Ping MongoDB, initialise Beanie, and insert one ``DocumentationSection``.

    The Motor client is closed before returning, whether or not the checks
    succeeded.

    Returns:
        ``True`` if every step succeeded, ``False`` if any step inside the
        guarded section raised (the exception is logged with its traceback).
    """
    logger.info("\n" + "=" * 70)
    logger.info("TEST 3: Database Connection")
    logger.info("=" * 70)

    from beanie import init_beanie
    from motor.motor_asyncio import AsyncIOMotorClient

    from datacenter_docs.api.models import (
        AuditLog,
        AutoRemediationPolicy,
        ChatSession,
        DocumentationSection,
        RemediationApproval,
        RemediationLog,
        SystemMetric,
        Ticket,
        TicketFeedback,
        TicketPattern,
    )
    from datacenter_docs.utils.config import get_settings

    settings = get_settings()

    client = None
    try:
        # NOTE(review): the URL is logged verbatim; if it can embed
        # credentials, consider redacting it before logging.
        logger.info(f"Connecting to MongoDB: {settings.MONGODB_URL}")
        client = AsyncIOMotorClient(settings.MONGODB_URL)
        database = client[settings.MONGODB_DATABASE]

        # Test connection
        await database.command('ping')
        logger.info("✅ MongoDB connection successful!")

        # Initialize Beanie
        await init_beanie(
            database=database,
            document_models=[
                Ticket,
                TicketFeedback,
                RemediationLog,
                RemediationApproval,
                AutoRemediationPolicy,
                TicketPattern,
                DocumentationSection,
                ChatSession,
                SystemMetric,
                AuditLog,
            ],
        )
        logger.info("✅ Beanie ORM initialized!")

        # Test creating a document. The timestamp suffix keeps section_id
        # distinct between runs started in different seconds.
        # NOTE(review): the document is left in the collection afterwards.
        test_section = DocumentationSection(
            section_id="test_section_" + datetime.now().strftime("%Y%m%d_%H%M%S"),
            name="Test Section",
            description="This is a test section for validation",
        )
        await test_section.insert()
        logger.info(f"✅ Test document created: {test_section.section_id}")

        # Count documents
        count = await DocumentationSection.count()
        logger.info(f" Total DocumentationSection records: {count}")

        return True

    except Exception as e:
        logger.error(f"❌ Database test failed: {e}", exc_info=True)
        return False

    finally:
        # Release the client's connections; previously the client was never
        # closed.
        if client is not None:
            client.close()


async def test_full_workflow_mock():
    """Run the collector, generator-structure and database checks in order.

    Returns:
        ``True`` when all three steps succeed; ``False`` when the collector or
        database step reports failure or any step raises.
    """
    logger.info("\n" + "=" * 70)
    logger.info("TEST 4: Full Workflow (Mock)")
    logger.info("=" * 70)

    try:
        # Step 1: Collect data
        logger.info("Step 1: Collecting VMware data...")
        collector_result = await test_collector()

        if not collector_result or not collector_result['success']:
            logger.error("❌ Collector test failed, aborting workflow test")
            return False

        # Step 2: Test generator structure. The returned generator is not
        # needed here; a failure surfaces as an exception handled below.
        logger.info("\nStep 2: Testing generator structure...")
        await test_generator_structure()

        # Step 3: Test database
        logger.info("\nStep 3: Testing database connection...")
        db_ok = await test_database_connection()

        if not db_ok:
            logger.error("❌ Database test failed, aborting workflow test")
            return False

        logger.info("\n" + "=" * 70)
        logger.info("✅ WORKFLOW TEST PASSED (Mock)")
        logger.info("=" * 70)
        logger.info("Components validated:")
        logger.info(" ✅ VMware Collector (mock data)")
        logger.info(" ✅ Infrastructure Generator (structure)")
        logger.info(" ✅ MongoDB connection & storage")
        logger.info(" ✅ Beanie ORM models")
        logger.info("\nTo test with real LLM:")
        logger.info(" 1. Configure LLM API key in .env")
        logger.info(" 2. Run: poetry run datacenter-docs generate vmware")

        return True

    except Exception as e:
        logger.error(f"❌ Workflow test failed: {e}", exc_info=True)
        return False


async def main():
    """Run the workflow test and translate the outcome into an exit code.

    Returns:
        ``0`` if the workflow test passed, ``1`` if it failed or raised.
    """
    logger.info("🚀 Starting End-to-End Workflow Test")
    logger.info("=" * 70)

    try:
        success = await test_full_workflow_mock()

        if success:
            logger.info("\n🎉 All tests passed!")
            return 0
        else:
            logger.error("\n❌ Some tests failed")
            return 1

    except Exception as e:
        logger.error(f"\n💥 Test execution failed: {e}", exc_info=True)
        return 1


if __name__ == "__main__":
    exit_code = asyncio.run(main())
    # sys.exit rather than the builtin exit(): the latter is injected by the
    # `site` module for interactive use and is absent under `python -S`.
    sys.exit(exit_code)