import os import glob import pandas as pd from docx import Document from docx.document import Document as _Document from docx.oxml.text.paragraph import CT_P from docx.oxml.table import CT_Tbl from docx.table import _Cell, Table from docx.text.paragraph import Paragraph import google.generativeai as genai from dotenv import load_dotenv import time # Carica le variabili d'ambiente load_dotenv() # Configurazione directory INPUT_DIR = './input' OUTPUT_DIR = './output' # Assicurati che le directory esistano os.makedirs(OUTPUT_DIR, exist_ok=True) # Configura Gemini # Nota: Assicurati di aver impostato GOOGLE_API_KEY nel tuo file .env api_key = os.getenv("GOOGLE_API_KEY") if not api_key: print("Warning: GOOGLE_API_KEY non trovata nel file .env") else: genai.configure(api_key=api_key) def iter_block_items(parent): """ Yield each paragraph and table child within *parent*, in document order. Each returned value is an instance of either Table or Paragraph. """ if isinstance(parent, _Document): parent_elm = parent.element.body elif isinstance(parent, _Cell): parent_elm = parent._tc else: raise ValueError("something's not right") for child in parent_elm.iterchildren(): if isinstance(child, CT_P): yield Paragraph(child, parent) elif isinstance(child, CT_Tbl): yield Table(child, parent) def read_docx_chunks(file_path, chunk_size=4000): """ Legge il file .docx e restituisce un generatore di chunk di testo. Mantiene l'ordine di paragrafi e tabelle. chunk_size: numero approssimativo di caratteri per chunk. """ try: doc = Document(file_path) current_chunk = [] current_length = 0 for block in iter_block_items(doc): text = "" if isinstance(block, Paragraph): text = block.text.strip() if text: text += "\n" elif isinstance(block, Table): # Converti tabella in testo (markdown-like o pipe separated) for row in block.rows: row_data = [cell.text.strip() for cell in row.cells] text += "| " + " | ".join(row_data) + " |\n" text += "\n" if text: current_chunk.append(text) current_length += len(text) # Se superiamo la dimensione del chunk, yieldiamo if current_length >= chunk_size: yield "".join(current_chunk) current_chunk = [] current_length = 0 # Yield dell'ultimo chunk se presente if current_chunk: yield "".join(current_chunk) except Exception as e: print(f"Errore lettura {file_path}: {e}") yield None def parse_markdown_table(text): """ Analizza un testo contenente una tabella Markdown e restituisce un DataFrame pandas. """ lines = text.split('\n') table_lines = [line.strip() for line in lines if line.strip().startswith('|')] if len(table_lines) < 2: return None # Trova la riga separatrice (es. |---|---|) separator_index = -1 for i, line in enumerate(table_lines): # Rimuovi pipe e spazi per controllare se contiene solo trattini/due punti content = line.replace('|', '').replace(':', '').replace('-', '').strip() if not content: # Se vuota dopo aver rimosso i caratteri separatori, è una riga separatrice separator_index = i break if separator_index <= 0: return None def split_row(row_str): # Split semplice per pipe parts = row_str.split('|') # Rimuovi primo e ultimo elemento se vuoti (dovuti ai pipe esterni) if row_str.startswith('|'): parts.pop(0) if row_str.endswith('|'): parts.pop(-1) return [p.strip() for p in parts] headers = split_row(table_lines[separator_index - 1]) data_rows = table_lines[separator_index + 1:] data = [] for row in data_rows: cols = split_row(row) # Allinea colonne if len(cols) == len(headers): data.append(cols) elif len(cols) < len(headers): data.append(cols + [''] * (len(headers) - len(cols))) else: data.append(cols[:len(headers)]) if not data: return None return pd.DataFrame(data, columns=headers) def step1_process_pages(): """Step 1: Processa pagine -> Markdown -> CSV (Algoritmico)""" print("\n--- INIZIO STEP 1: Word -> Markdown -> CSV (Gemini) ---") word_files = glob.glob(os.path.join(INPUT_DIR, "*.docx")) if not word_files: print("Nessun file .docx trovato in input.") return # Colonne attese per validazione (opzionale, ma utile per il prompt) expected_columns = ["ID", "Descrizione", "Risultato Atteso", "Risultato Ottenuto", "Stato"] # Definizione System Prompt system_prompt = f""" Sei un esperto QA. Analizza il frammento di documento fornito. Estrai TUTTI i dati relativi a Use Case, Test Case, Scenari e Verifiche senza rielaborare il testo, il testo di Use Case, Test Case, Scenari e Verifiche deve rimanere IDENTICO a quello letto. Output richiesto: - Genera una TABELLA MARKDOWN valida. - La tabella DEVE avere ESATTAMENTE queste colonne: {', '.join(expected_columns)}. - Se un dato non è presente, lascia la cella vuota. - NON aggiungere altro testo prima o dopo la tabella. - NON inventare ID, se li trovi usali altrimenti lascia vuoto. """ # Configurazione Modello # Utilizziamo gemini-flash-latest come richiesto (o la versione più vicina disponibile) generation_config = { "temperature": 0.1, } # Configurazione Safety Settings safety_settings = { genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT: genai.types.HarmBlockThreshold.BLOCK_NONE, genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: genai.types.HarmBlockThreshold.BLOCK_NONE, genai.types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: genai.types.HarmBlockThreshold.BLOCK_NONE, genai.types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: genai.types.HarmBlockThreshold.BLOCK_NONE, } try: model = genai.GenerativeModel( model_name="gemini-flash-latest", system_instruction=system_prompt, generation_config=generation_config, safety_settings=safety_settings ) except Exception as e: print(f"Errore inizializzazione modello Gemini: {e}") return for file_path in word_files: filename = os.path.basename(file_path) file_base_name = os.path.splitext(filename)[0] print(f"Elaborazione: {filename}...") # Directory output output_subdir = os.path.join(OUTPUT_DIR, file_base_name) os.makedirs(output_subdir, exist_ok=True) chunk_count = 0 for chunk in read_docx_chunks(file_path): if not chunk: continue chunk_count += 1 print(f" -> Elaborazione Pagina/Chunk {chunk_count}...") user_prompt = f"Frammento {chunk_count}:\n\n{chunk}" try: # Chiamata a Gemini response = model.generate_content(user_prompt) try: content = response.text.strip() except ValueError: print(f" -> Warning: response.text non disponibile. Finish reason: {response.candidates[0].finish_reason if response.candidates else 'N/A'}") content = "" # Pulisci markdown fences if content.startswith("```markdown"): content = content.replace("```markdown", "").replace("```", "") elif content.startswith("```"): content = content.replace("```", "") content = content.strip() if content and "NO_DATA" not in content: # 1. Salva Markdown md_path = os.path.join(output_subdir, f"chunk_{chunk_count}.md") with open(md_path, 'w', encoding='utf-8') as f: f.write(content) # 2. Converti in CSV algoritmicamente df = parse_markdown_table(content) if df is not None and not df.empty: csv_path = os.path.join(output_subdir, f"chunk_{chunk_count}.csv") df.to_csv(csv_path, index=False, encoding='utf-8') # print(f" -> Generato CSV: {csv_path}") else: print(f" -> Warning: Impossibile parsare tabella in chunk {chunk_count}") else: pass # Rispetta i rate limit (opzionale ma consigliato) time.sleep(1) except Exception as e: print(f"Errore chunk {chunk_count} di {filename}: {e}") def step2_aggregate_csvs(): """Step 2: Aggrega i frammenti CSV in un unico file finale per documento.""" print("\n--- INIZIO STEP 2: Aggregazione CSV ---") subdirs = [d for d in glob.glob(os.path.join(OUTPUT_DIR, "*")) if os.path.isdir(d)] if not subdirs: print("Nessuna cartella di frammenti trovata in output.") return for fragments_dir in subdirs: doc_name = os.path.basename(fragments_dir) print(f"Aggregazione per: {doc_name}...") all_files = glob.glob(os.path.join(fragments_dir, "*.csv")) try: all_files.sort(key=lambda x: int(os.path.basename(x).split('_')[1].split('.')[0])) except: all_files.sort() if not all_files: print(f" -> Nessun frammento CSV trovato in {fragments_dir}") continue combined_df = pd.concat((pd.read_csv(f) for f in all_files), ignore_index=True) output_csv_path = os.path.join(OUTPUT_DIR, f"{doc_name}.csv") combined_df.to_csv(output_csv_path, index=False, encoding='utf-8') print(f" -> Creato CSV finale: {output_csv_path}") def main(): step1_process_pages() step2_aggregate_csvs() print("\nProcesso completato.") if __name__ == "__main__": main()