""" Generic LLM Client using OpenAI-compatible API This client works with: - OpenAI - Anthropic (via OpenAI-compatible endpoint) - LLMStudio - Open-WebUI - Ollama - LocalAI - Any other OpenAI-compatible provider """ import logging from typing import Any, AsyncIterator, Dict, List, Optional, Union, cast import httpx from openai import AsyncOpenAI from openai.types.chat import ChatCompletion, ChatCompletionChunk try: from openai.lib.streaming import AsyncStream # type: ignore[attr-defined] except ImportError: from openai._streaming import AsyncStream # type: ignore[import, attr-defined] from .config import get_settings logger = logging.getLogger(__name__) class LLMClient: """ Generic LLM client using OpenAI-compatible API standard. This allows switching between different LLM providers without code changes, just by updating configuration (base_url, api_key, model). Examples: # OpenAI LLM_BASE_URL=https://api.openai.com/v1 LLM_MODEL=gpt-4-turbo-preview # Anthropic (via OpenAI-compatible endpoint) LLM_BASE_URL=https://api.anthropic.com/v1 LLM_MODEL=claude-sonnet-4-20250514 # LLMStudio LLM_BASE_URL=http://localhost:1234/v1 LLM_MODEL=local-model # Open-WebUI LLM_BASE_URL=http://localhost:8080/v1 LLM_MODEL=llama3 """ def __init__( self, base_url: Optional[str] = None, api_key: Optional[str] = None, model: Optional[str] = None, temperature: Optional[float] = None, max_tokens: Optional[int] = None, ): """ Initialize LLM client with OpenAI-compatible API. Args: base_url: Base URL of the API endpoint (e.g., https://api.openai.com/v1) api_key: API key for authentication model: Model name to use (e.g., gpt-4, claude-sonnet-4, llama3) temperature: Sampling temperature (0.0-1.0) max_tokens: Maximum tokens to generate """ settings = get_settings() # Use provided values or fall back to settings self.base_url = base_url or settings.LLM_BASE_URL self.api_key = api_key or settings.LLM_API_KEY self.model = model or settings.LLM_MODEL self.temperature = temperature if temperature is not None else settings.LLM_TEMPERATURE self.max_tokens = max_tokens or settings.LLM_MAX_TOKENS # Initialize AsyncOpenAI client with custom HTTP client (disable SSL verification for self-signed certs) # Increased timeout to 120s for documentation generation (large prompts) http_client = httpx.AsyncClient(verify=False, timeout=120.0) self.client = AsyncOpenAI( base_url=self.base_url, api_key=self.api_key, http_client=http_client ) logger.info(f"Initialized LLM client: base_url={self.base_url}, model={self.model}") async def chat_completion( self, messages: List[Dict[str, str]], temperature: Optional[float] = None, max_tokens: Optional[int] = None, stream: bool = False, **kwargs: Any, ) -> Dict[str, Any]: """ Generate chat completion using OpenAI-compatible API. 
    async def chat_completion(
        self,
        messages: List[Dict[str, str]],
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
        stream: bool = False,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """
        Generate chat completion using OpenAI-compatible API.

        Args:
            messages: List of messages [{"role": "user", "content": "..."}]
            temperature: Override default temperature
            max_tokens: Override default max_tokens
            stream: Enable streaming response
            **kwargs: Additional parameters for the API

        Returns:
            Response with generated text and metadata
        """
        try:
            response: Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=messages,  # type: ignore[arg-type]
                temperature=temperature or self.temperature,
                max_tokens=max_tokens or self.max_tokens,
                stream=stream,
                **kwargs,
            )

            if stream:
                # Return generator for streaming
                return {"stream": response}

            # Type guard: we know it's ChatCompletion when stream=False
            response = cast(ChatCompletion, response)

            # Check for None response or empty choices
            if response is None:
                raise ValueError("LLM returned None response")
            if not response.choices or len(response.choices) == 0:
                raise ValueError("LLM returned empty choices")

            # Extract text from first choice
            message = response.choices[0].message
            content = message.content or ""

            return {
                "content": content,
                "model": response.model,
                "usage": {
                    "prompt_tokens": response.usage.prompt_tokens if response.usage else 0,
                    "completion_tokens": (
                        response.usage.completion_tokens if response.usage else 0
                    ),
                    "total_tokens": response.usage.total_tokens if response.usage else 0,
                },
                "finish_reason": response.choices[0].finish_reason,
            }

        except Exception as e:
            logger.error(f"LLM API call failed: {e}")
            raise

    async def generate_with_system(
        self,
        system_prompt: str,
        user_prompt: str,
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
        **kwargs: Any,
    ) -> str:
        """
        Generate completion with system and user prompts.

        Args:
            system_prompt: System instruction
            user_prompt: User message
            temperature: Override default temperature
            max_tokens: Override default max_tokens
            **kwargs: Additional API parameters

        Returns:
            Generated text content
        """
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        response = await self.chat_completion(
            messages=messages, temperature=temperature, max_tokens=max_tokens, **kwargs
        )
        return str(response["content"])

    async def generate_json(
        self,
        messages: List[Dict[str, str]],
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Generate JSON response (if provider supports response_format).

        Args:
            messages: List of messages
            temperature: Override default temperature
            max_tokens: Override default max_tokens

        Returns:
            Parsed JSON response
        """
        import json

        try:
            # Try with response_format if supported
            response = await self.chat_completion(
                messages=messages,
                temperature=temperature or 0.3,  # Lower temp for structured output
                max_tokens=max_tokens,
                response_format={"type": "json_object"},
            )
        except Exception as e:
            logger.warning(f"response_format not supported, using plain completion: {e}")
            # Fallback to plain completion
            response = await self.chat_completion(
                messages=messages,
                temperature=temperature or 0.3,
                max_tokens=max_tokens,
            )

        # Parse JSON from content
        content = str(response["content"])
        try:
            result: Dict[str, Any] = json.loads(content)
            return result
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON response: {e}")
            logger.debug(f"Raw content: {content}")
            raise ValueError(f"LLM did not return valid JSON: {content[:200]}...")

    async def generate_stream(
        self,
        messages: List[Dict[str, str]],
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
    ) -> AsyncIterator[str]:
        """
        Generate streaming completion.

        Args:
            messages: List of messages
            temperature: Override default temperature
            max_tokens: Override default max_tokens

        Yields:
            Text chunks as they arrive
        """
        response = await self.chat_completion(
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            stream=True,
        )

        stream = cast(AsyncStream[ChatCompletionChunk], response["stream"])
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
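# Example (not part of the original module): a hedged sketch showing how the
# client can be pointed at a specific provider programmatically instead of via
# environment configuration, as described in the class docstring. The Ollama
# endpoint, placeholder API key, and model name below are illustrative
# assumptions only.
def example_local_client() -> LLMClient:
    """Sketch: build a client for a local OpenAI-compatible server (e.g. Ollama)."""
    return LLMClient(
        base_url="http://localhost:11434/v1",  # assumed Ollama OpenAI-compatible endpoint
        api_key="ollama",  # local servers typically ignore the key, but it must be non-empty
        model="llama3",
    )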
# Singleton instance
_llm_client: Optional[LLMClient] = None


def get_llm_client() -> LLMClient:
    """Get or create singleton LLM client instance."""
    global _llm_client
    if _llm_client is None:
        _llm_client = LLMClient()
    return _llm_client


# Example usage
async def example_usage() -> None:
    """Example of using the LLM client"""
    client = get_llm_client()

    # Simple completion
    messages = [
        {"role": "system", "content": "You are a helpful datacenter expert."},
        {"role": "user", "content": "Explain what a VLAN is in 2 sentences."},
    ]
    response = await client.chat_completion(messages)
    print(f"Response: {response['content']}")
    print(f"Tokens used: {response['usage']['total_tokens']}")

    # JSON response
    json_messages = [
        {
            "role": "user",
            "content": 'List 3 common datacenter problems in JSON: {"problems": [...]}',
        }
    ]
    json_response = await client.generate_json(json_messages)
    print(f"JSON: {json_response}")

    # Streaming
    stream_messages = [{"role": "user", "content": "Count from 1 to 5"}]
    print("Streaming: ", end="")
    async for chunk in client.generate_stream(stream_messages):
        print(chunk, end="", flush=True)
    print()


if __name__ == "__main__":
    import asyncio

    asyncio.run(example_usage())
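# The ".config" module is not included here. Based on the attributes read in
# LLMClient.__init__, get_settings() is expected to return an object exposing
# at least the following fields (the names come from this file; the types and
# example values are assumptions):
#
#   LLM_BASE_URL: str        e.g. "https://api.openai.com/v1"
#   LLM_API_KEY: str
#   LLM_MODEL: str           e.g. "gpt-4-turbo-preview"
#   LLM_TEMPERATURE: float   e.g. 0.7
#   LLM_MAX_TOKENS: int      e.g. 2048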