diff --git a/.env.example b/.env.example index 28b7872..7b104e5 100644 --- a/.env.example +++ b/.env.example @@ -1,2 +1,3 @@ +LLM_PROVIDER=openai OPENAI_API_KEY=xxxxxx GOOGLE_API_KEY=xxxxxx \ No newline at end of file diff --git a/process_files_gemini.py b/process_files.py similarity index 75% rename from process_files_gemini.py rename to process_files.py index 24bb3d4..d70d100 100644 --- a/process_files_gemini.py +++ b/process_files.py @@ -8,6 +8,7 @@ from docx.oxml.table import CT_Tbl from docx.table import _Cell, Table from docx.text.paragraph import Paragraph import google.generativeai as genai +import openai from dotenv import load_dotenv import time @@ -140,7 +141,9 @@ def parse_markdown_table(text): def step1_process_pages(): """Step 1: Processa pagine -> Markdown -> CSV (Algoritmico)""" - print("\n--- INIZIO STEP 1: Word -> Markdown -> CSV (Gemini) ---") + llm_provider = os.getenv("LLM_PROVIDER", "gemini").lower() + print(f"\n--- INIZIO STEP 1: Word -> Markdown -> CSV ({llm_provider.upper()}) ---") + word_files = glob.glob(os.path.join(INPUT_DIR, "*.docx")) if not word_files: @@ -163,29 +166,47 @@ def step1_process_pages(): - NON inventare ID, se li trovi usali altrimenti lascia vuoto. """ - # Configurazione Modello - # Utilizziamo gemini-flash-latest come richiesto (o la versione più vicina disponibile) - generation_config = { - "temperature": 0.1, - } - - # Configurazione Safety Settings - safety_settings = { - genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT: genai.types.HarmBlockThreshold.BLOCK_NONE, - genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: genai.types.HarmBlockThreshold.BLOCK_NONE, - genai.types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: genai.types.HarmBlockThreshold.BLOCK_NONE, - genai.types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: genai.types.HarmBlockThreshold.BLOCK_NONE, - } + model = None + openai_client = None - try: - model = genai.GenerativeModel( - model_name="gemini-flash-latest", - system_instruction=system_prompt, - generation_config=generation_config, - safety_settings=safety_settings - ) - except Exception as e: - print(f"Errore inizializzazione modello Gemini: {e}") + if llm_provider == "gemini": + # Configurazione Modello Gemini + # Utilizziamo gemini-flash-latest come richiesto (o la versione più vicina disponibile) + generation_config = { + "temperature": 0.1, + } + + # Configurazione Safety Settings + safety_settings = { + genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT: genai.types.HarmBlockThreshold.BLOCK_NONE, + genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: genai.types.HarmBlockThreshold.BLOCK_NONE, + genai.types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: genai.types.HarmBlockThreshold.BLOCK_NONE, + genai.types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: genai.types.HarmBlockThreshold.BLOCK_NONE, + } + + try: + model = genai.GenerativeModel( + model_name="gemini-flash-latest", + system_instruction=system_prompt, + generation_config=generation_config, + safety_settings=safety_settings + ) + except Exception as e: + print(f"Errore inizializzazione modello Gemini: {e}") + return + + elif llm_provider == "openai": + api_key = os.getenv("OPENAI_API_KEY") + if not api_key: + print("Errore: OPENAI_API_KEY non trovata nel file .env") + return + try: + openai_client = openai.OpenAI(api_key=api_key) + except Exception as e: + print(f"Errore inizializzazione client OpenAI: {e}") + return + else: + print(f"Provider {llm_provider} non supportato. Usa 'gemini' o 'openai'.") return for file_path in word_files: @@ -208,14 +229,28 @@ def step1_process_pages(): user_prompt = f"Frammento {chunk_count}:\n\n{chunk}" try: - # Chiamata a Gemini - response = model.generate_content(user_prompt) - - try: - content = response.text.strip() - except ValueError: - print(f" -> Warning: response.text non disponibile. Finish reason: {response.candidates[0].finish_reason if response.candidates else 'N/A'}") - content = "" + content = "" + if llm_provider == "gemini": + # Chiamata a Gemini + response = model.generate_content(user_prompt) + + try: + content = response.text.strip() + except ValueError: + print(f" -> Warning: response.text non disponibile. Finish reason: {response.candidates[0].finish_reason if response.candidates else 'N/A'}") + content = "" + + elif llm_provider == "openai": + # Chiamata a OpenAI + response = openai_client.chat.completions.create( + model="gpt-4o", + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ], + temperature=0.1 + ) + content = response.choices[0].message.content.strip() # Pulisci markdown fences if content.startswith("```markdown"): diff --git a/process_files_openai.py b/process_files_openai.py deleted file mode 100644 index cac315f..0000000 --- a/process_files_openai.py +++ /dev/null @@ -1,256 +0,0 @@ -import os -import json -import glob -import pandas as pd -from docx import Document -from docx.document import Document as _Document -from docx.oxml.text.paragraph import CT_P -from docx.oxml.table import CT_Tbl -from docx.table import _Cell, Table -from docx.text.paragraph import Paragraph -from openai import OpenAI -from dotenv import load_dotenv - -# Carica le variabili d'ambiente -load_dotenv() - -# Configurazione directory -INPUT_DIR = './input' -OUTPUT_DIR = './output' -TEMPLATE_FILE = './template/use_case_template.csv' - -# Assicurati che le directory esistano -os.makedirs(OUTPUT_DIR, exist_ok=True) - -# Inizializza client OpenAI -client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) - -def iter_block_items(parent): - """ - Yield each paragraph and table child within *parent*, in document order. - Each returned value is an instance of either Table or Paragraph. - """ - if isinstance(parent, _Document): - parent_elm = parent.element.body - elif isinstance(parent, _Cell): - parent_elm = parent._tc - else: - raise ValueError("something's not right") - - for child in parent_elm.iterchildren(): - if isinstance(child, CT_P): - yield Paragraph(child, parent) - elif isinstance(child, CT_Tbl): - yield Table(child, parent) - -def read_docx_chunks(file_path, chunk_size=4000): - """ - Legge il file .docx e restituisce un generatore di chunk di testo. - Mantiene l'ordine di paragrafi e tabelle. - chunk_size: numero approssimativo di caratteri per chunk. - """ - try: - doc = Document(file_path) - current_chunk = [] - current_length = 0 - - for block in iter_block_items(doc): - text = "" - if isinstance(block, Paragraph): - text = block.text.strip() - if text: - text += "\n" - elif isinstance(block, Table): - # Converti tabella in testo (markdown-like o pipe separated) - for row in block.rows: - row_data = [cell.text.strip() for cell in row.cells] - text += "| " + " | ".join(row_data) + " |\n" - text += "\n" - - if text: - current_chunk.append(text) - current_length += len(text) - - # Se superiamo la dimensione del chunk, yieldiamo - if current_length >= chunk_size: - yield "".join(current_chunk) - current_chunk = [] - current_length = 0 - - # Yield dell'ultimo chunk se presente - if current_chunk: - yield "".join(current_chunk) - - except Exception as e: - print(f"Errore lettura {file_path}: {e}") - yield None - -def parse_markdown_table(text): - """ - Analizza un testo contenente una tabella Markdown e restituisce un DataFrame pandas. - """ - lines = text.split('\n') - table_lines = [line.strip() for line in lines if line.strip().startswith('|')] - - if len(table_lines) < 2: - return None - - # Trova la riga separatrice (es. |---|---|) - separator_index = -1 - for i, line in enumerate(table_lines): - # Rimuovi pipe e spazi per controllare se contiene solo trattini/due punti - content = line.replace('|', '').replace(':', '').replace('-', '').strip() - if not content: # Se vuota dopo aver rimosso i caratteri separatori, è una riga separatrice - separator_index = i - break - - if separator_index <= 0: - return None - - def split_row(row_str): - # Split semplice per pipe - parts = row_str.split('|') - # Rimuovi primo e ultimo elemento se vuoti (dovuti ai pipe esterni) - if row_str.startswith('|'): parts.pop(0) - if row_str.endswith('|'): parts.pop(-1) - return [p.strip() for p in parts] - - headers = split_row(table_lines[separator_index - 1]) - data_rows = table_lines[separator_index + 1:] - - data = [] - for row in data_rows: - cols = split_row(row) - # Allinea colonne - if len(cols) == len(headers): - data.append(cols) - elif len(cols) < len(headers): - data.append(cols + [''] * (len(headers) - len(cols))) - else: - data.append(cols[:len(headers)]) - - if not data: - return None - - return pd.DataFrame(data, columns=headers) - -def step1_process_pages(): - """Step 1: Processa pagine -> Markdown -> CSV (Algoritmico)""" - print("\n--- INIZIO STEP 1: Word -> Markdown -> CSV ---") - word_files = glob.glob(os.path.join(INPUT_DIR, "*.docx")) - - if not word_files: - print("Nessun file .docx trovato in input.") - return - - # Colonne attese per validazione (opzionale, ma utile per il prompt) - expected_columns = ["ID", "Descrizione", "Risultato Atteso", "Risultato Ottenuto", "Stato"] - - for file_path in word_files: - filename = os.path.basename(file_path) - file_base_name = os.path.splitext(filename)[0] - print(f"Elaborazione: {filename}...") - - # Directory output - output_subdir = os.path.join(OUTPUT_DIR, file_base_name) - os.makedirs(output_subdir, exist_ok=True) - - chunk_count = 0 - for chunk in read_docx_chunks(file_path): - if not chunk: - continue - - chunk_count += 1 - print(f" -> Elaborazione Pagina/Chunk {chunk_count}...") - - system_prompt = f""" - Sei un esperto QA. Analizza il frammento di documento fornito. - Estrai TUTTI i dati relativi a Use Case, Test Case, Scenari e Verifiche senza rielaborare il testo, il testo di Use Case, Test Case, Scenari e Verifiche deve rimanere IDENTICO a quello letto. - - Output richiesto: - - Genera una TABELLA MARKDOWN valida. - - La tabella DEVE avere ESATTAMENTE queste colonne: {', '.join(expected_columns)}. - - Se un dato non è presente, lascia la cella vuota. - - Se non trovi NESSUN dato rilevante, rispondi ESATTAMENTE con "NO_DATA". - - NON aggiungere altro testo prima o dopo la tabella. - """ - - user_prompt = f"Frammento {chunk_count}:\n\n{chunk}" - - try: - response = client.chat.completions.create( - model="gpt-4o", - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt} - ], - temperature=0.1 - ) - - content = response.choices[0].message.content.strip() - - # Pulisci markdown fences - if content.startswith("```markdown"): - content = content.replace("```markdown", "").replace("```", "") - elif content.startswith("```"): - content = content.replace("```", "") - - content = content.strip() - - if content and "NO_DATA" not in content: - # 1. Salva Markdown - md_path = os.path.join(output_subdir, f"chunk_{chunk_count}.md") - with open(md_path, 'w', encoding='utf-8') as f: - f.write(content) - - # 2. Converti in CSV algoritmicamente - df = parse_markdown_table(content) - if df is not None and not df.empty: - csv_path = os.path.join(output_subdir, f"chunk_{chunk_count}.csv") - df.to_csv(csv_path, index=False, encoding='utf-8') - # print(f" -> Generato CSV: {csv_path}") - else: - print(f" -> Warning: Impossibile parsare tabella in chunk {chunk_count}") - else: - pass - - except Exception as e: - print(f"Errore chunk {chunk_count} di {filename}: {e}") - -def step2_aggregate_csvs(): - """Step 2: Aggrega i frammenti CSV in un unico file finale per documento.""" - print("\n--- INIZIO STEP 2: Aggregazione CSV ---") - - subdirs = [d for d in glob.glob(os.path.join(OUTPUT_DIR, "*")) if os.path.isdir(d)] - - if not subdirs: - print("Nessuna cartella di frammenti trovata in output.") - return - - for fragments_dir in subdirs: - doc_name = os.path.basename(fragments_dir) - print(f"Aggregazione per: {doc_name}...") - - all_files = glob.glob(os.path.join(fragments_dir, "*.csv")) - try: - all_files.sort(key=lambda x: int(os.path.basename(x).split('_')[1].split('.')[0])) - except: - all_files.sort() - - if not all_files: - print(f" -> Nessun frammento CSV trovato in {fragments_dir}") - continue - - combined_df = pd.concat((pd.read_csv(f) for f in all_files), ignore_index=True) - - output_csv_path = os.path.join(OUTPUT_DIR, f"{doc_name}.csv") - combined_df.to_csv(output_csv_path, index=False, encoding='utf-8') - print(f" -> Creato CSV finale: {output_csv_path}") - -def main(): - step1_process_pages() - step2_aggregate_csvs() - print("\nProcesso completato.") - -if __name__ == "__main__": - main()