feat: Add document processing scripts using Gemini and OpenAI to generate structured use case chunks from input documents.

2025-12-03 16:28:43 +01:00
commit 8a374be90a
9 changed files with 771 additions and 0 deletions

2
.env.example Normal file
View File

@@ -0,0 +1,2 @@
OPENAI_API_KEY=xxxxxx
GOOGLE_API_KEY=xxxxxx
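Both scripts load these keys at startup with python-dotenv. A minimal sketch (mirroring the configuration blocks in process_files_gemini.py and process_files_openai.py) of how the values are read:

import os
from dotenv import load_dotenv

load_dotenv()  # reads the .env file from the working directory
openai_key = os.getenv("OPENAI_API_KEY")   # used by process_files_openai.py
google_key = os.getenv("GOOGLE_API_KEY")   # used by process_files_gemini.py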

220
.gitignore vendored Normal file
View File

@@ -0,0 +1,220 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py.cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
# Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
# poetry.lock
# poetry.toml
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
# https://pdm-project.org/en/latest/usage/project/#working-with-version-control
# pdm.lock
# pdm.toml
.pdm-python
.pdm-build/
# pixi
# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control.
# pixi.lock
# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one
# in the .venv directory. It is recommended not to include this directory in version control.
.pixi
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# Redis
*.rdb
*.aof
*.pid
# RabbitMQ
mnesia/
rabbitmq/
rabbitmq-data/
# ActiveMQ
activemq-data/
# SageMath parsed files
*.sage.py
# Environments
.env
.envrc
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
# .idea/
# Abstra
# Abstra is an AI-powered process automation framework.
# Ignore directories containing user credentials, local state, and settings.
# Learn more at https://abstra.io/docs
.abstra/
# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
# .vscode/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# Marimo
marimo/_static/
marimo/_lsp/
__marimo__/
# Streamlit
.streamlit/secrets.toml
*.docx
output/**/**.csv
output/**/**.md

0
input/.gitkeep Normal file
View File

0
output/.gitkeep Normal file
View File

287
process_files_gemini.py Normal file
View File

@@ -0,0 +1,287 @@
import os
import glob
import pandas as pd
from docx import Document
from docx.document import Document as _Document
from docx.oxml.text.paragraph import CT_P
from docx.oxml.table import CT_Tbl
from docx.table import _Cell, Table
from docx.text.paragraph import Paragraph
import google.generativeai as genai
from dotenv import load_dotenv
import time
# Load environment variables
load_dotenv()

# Directory configuration
INPUT_DIR = './input'
OUTPUT_DIR = './output'

# Make sure the output directory exists
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Configure Gemini
# Note: make sure GOOGLE_API_KEY is set in your .env file
api_key = os.getenv("GOOGLE_API_KEY")
if not api_key:
    print("Warning: GOOGLE_API_KEY not found in the .env file")
else:
    genai.configure(api_key=api_key)

def iter_block_items(parent):
    """
    Yield each paragraph and table child within *parent*, in document order.
    Each returned value is an instance of either Table or Paragraph.
    """
    if isinstance(parent, _Document):
        parent_elm = parent.element.body
    elif isinstance(parent, _Cell):
        parent_elm = parent._tc
    else:
        raise ValueError("something's not right")
    for child in parent_elm.iterchildren():
        if isinstance(child, CT_P):
            yield Paragraph(child, parent)
        elif isinstance(child, CT_Tbl):
            yield Table(child, parent)

def read_docx_chunks(file_path, chunk_size=4000):
    """
    Read a .docx file and return a generator of text chunks.
    Preserves the order of paragraphs and tables.
    chunk_size: approximate number of characters per chunk.
    """
    try:
        doc = Document(file_path)
        current_chunk = []
        current_length = 0
        for block in iter_block_items(doc):
            text = ""
            if isinstance(block, Paragraph):
                text = block.text.strip()
                if text:
                    text += "\n"
            elif isinstance(block, Table):
                # Convert the table to text (markdown-like, pipe separated)
                for row in block.rows:
                    row_data = [cell.text.strip() for cell in row.cells]
                    text += "| " + " | ".join(row_data) + " |\n"
                text += "\n"
            if text:
                current_chunk.append(text)
                current_length += len(text)
                # If we exceed the chunk size, yield what we have so far
                if current_length >= chunk_size:
                    yield "".join(current_chunk)
                    current_chunk = []
                    current_length = 0
        # Yield the last chunk, if any
        if current_chunk:
            yield "".join(current_chunk)
    except Exception as e:
        print(f"Error reading {file_path}: {e}")
        yield None
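
# Example (with a hypothetical file name) of how the generator above is
# consumed; step1_process_pages() below follows the same pattern:
#
#   for i, chunk in enumerate(read_docx_chunks("./input/spec.docx"), start=1):
#       if chunk:
#           print(f"chunk {i}: {len(chunk)} characters")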

def parse_markdown_table(text):
    """
    Parse a text containing a Markdown table and return a pandas DataFrame.
    """
    lines = text.split('\n')
    table_lines = [line.strip() for line in lines if line.strip().startswith('|')]
    if len(table_lines) < 2:
        return None
    # Find the separator row (e.g. |---|---|)
    separator_index = -1
    for i, line in enumerate(table_lines):
        # Strip pipes and spaces to check whether only dashes/colons remain
        content = line.replace('|', '').replace(':', '').replace('-', '').strip()
        if not content:  # Empty after removing separator characters: this is the separator row
            separator_index = i
            break
    if separator_index <= 0:
        return None

    def split_row(row_str):
        # Simple split on pipes
        parts = row_str.split('|')
        # Drop the first and last elements if empty (caused by the outer pipes)
        if row_str.startswith('|'): parts.pop(0)
        if row_str.endswith('|'): parts.pop(-1)
        return [p.strip() for p in parts]

    headers = split_row(table_lines[separator_index - 1])
    data_rows = table_lines[separator_index + 1:]
    data = []
    for row in data_rows:
        cols = split_row(row)
        # Align columns with the header
        if len(cols) == len(headers):
            data.append(cols)
        elif len(cols) < len(headers):
            data.append(cols + [''] * (len(headers) - len(cols)))
        else:
            data.append(cols[:len(headers)])
    if not data:
        return None
    return pd.DataFrame(data, columns=headers)
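
# Worked example (hypothetical values) for the parser above:
#
#   md = "| ID | Stato |\n|----|-------|\n| UC-01 | OK |"
#   df = parse_markdown_table(md)
#   # -> DataFrame with columns ["ID", "Stato"] and a single row ["UC-01", "OK"]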

def step1_process_pages():
    """Step 1: Process pages -> Markdown -> CSV (algorithmic conversion)."""
    print("\n--- START STEP 1: Word -> Markdown -> CSV (Gemini) ---")
    word_files = glob.glob(os.path.join(INPUT_DIR, "*.docx"))
    if not word_files:
        print("No .docx files found in the input directory.")
        return

    # Expected columns, used for validation (optional, but useful for the prompt)
    expected_columns = ["ID", "Descrizione", "Risultato Atteso", "Risultato Ottenuto", "Stato"]

    # System prompt definition (in Italian, matching the documents being processed)
    system_prompt = f"""
Sei un esperto QA. Analizza il frammento di documento fornito.
Estrai TUTTI i dati relativi a Use Case, Test Case, Scenari e Verifiche senza rielaborare il testo, il testo di Use Case, Test Case, Scenari e Verifiche deve rimanere IDENTICO a quello letto.
Output richiesto:
- Genera una TABELLA MARKDOWN valida.
- La tabella DEVE avere ESATTAMENTE queste colonne: {', '.join(expected_columns)}.
- Se un dato non è presente, lascia la cella vuota.
- Se non trovi NESSUN dato rilevante, rispondi ESATTAMENTE con "NO_DATA".
- NON aggiungere altro testo prima o dopo la tabella.
- NON inventare ID, se li trovi usali altrimenti lascia vuoto.
"""

    # Model configuration
    # gemini-flash-latest is used as requested (or the closest available version)
    generation_config = {
        "temperature": 0.1,
    }

    # Safety settings configuration
    safety_settings = {
        genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT: genai.types.HarmBlockThreshold.BLOCK_NONE,
        genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: genai.types.HarmBlockThreshold.BLOCK_NONE,
        genai.types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: genai.types.HarmBlockThreshold.BLOCK_NONE,
        genai.types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: genai.types.HarmBlockThreshold.BLOCK_NONE,
    }

    try:
        model = genai.GenerativeModel(
            model_name="gemini-flash-latest",
            system_instruction=system_prompt,
            generation_config=generation_config,
            safety_settings=safety_settings
        )
    except Exception as e:
        print(f"Error initializing the Gemini model: {e}")
        return
    for file_path in word_files:
        filename = os.path.basename(file_path)
        file_base_name = os.path.splitext(filename)[0]
        print(f"Processing: {filename}...")

        # Per-document output directory
        output_subdir = os.path.join(OUTPUT_DIR, file_base_name)
        os.makedirs(output_subdir, exist_ok=True)

        chunk_count = 0
        for chunk in read_docx_chunks(file_path):
            if not chunk:
                continue
            chunk_count += 1
            print(f" -> Processing page/chunk {chunk_count}...")
            user_prompt = f"Frammento {chunk_count}:\n\n{chunk}"
            try:
                # Call Gemini
                response = model.generate_content(user_prompt)
                try:
                    content = response.text.strip()
                except ValueError:
                    print(f" -> Warning: response.text not available. Finish reason: {response.candidates[0].finish_reason if response.candidates else 'N/A'}")
                    content = ""
                # Strip markdown fences
                if content.startswith("```markdown"):
                    content = content.replace("```markdown", "").replace("```", "")
                elif content.startswith("```"):
                    content = content.replace("```", "")
                content = content.strip()
                if content and "NO_DATA" not in content:
                    # 1. Save the Markdown fragment
                    md_path = os.path.join(output_subdir, f"chunk_{chunk_count}.md")
                    with open(md_path, 'w', encoding='utf-8') as f:
                        f.write(content)
                    # 2. Convert to CSV algorithmically
                    df = parse_markdown_table(content)
                    if df is not None and not df.empty:
                        csv_path = os.path.join(output_subdir, f"chunk_{chunk_count}.csv")
                        df.to_csv(csv_path, index=False, encoding='utf-8')
                        # print(f" -> Generated CSV: {csv_path}")
                    else:
                        print(f" -> Warning: could not parse a table in chunk {chunk_count}")
                else:
                    pass
                # Respect rate limits (optional but recommended)
                time.sleep(1)
            except Exception as e:
                print(f"Error on chunk {chunk_count} of {filename}: {e}")

def step2_aggregate_csvs():
    """Step 2: Aggregate the CSV fragments into a single final file per document."""
    print("\n--- START STEP 2: CSV aggregation ---")
    subdirs = [d for d in glob.glob(os.path.join(OUTPUT_DIR, "*")) if os.path.isdir(d)]
    if not subdirs:
        print("No fragment folders found in the output directory.")
        return
    for fragments_dir in subdirs:
        doc_name = os.path.basename(fragments_dir)
        print(f"Aggregating: {doc_name}...")
        all_files = glob.glob(os.path.join(fragments_dir, "*.csv"))
        try:
            all_files.sort(key=lambda x: int(os.path.basename(x).split('_')[1].split('.')[0]))
        except (IndexError, ValueError):
            all_files.sort()
        if not all_files:
            print(f" -> No CSV fragments found in {fragments_dir}")
            continue
        combined_df = pd.concat((pd.read_csv(f) for f in all_files), ignore_index=True)
        output_csv_path = os.path.join(OUTPUT_DIR, f"{doc_name}.csv")
        combined_df.to_csv(output_csv_path, index=False, encoding='utf-8')
        print(f" -> Final CSV created: {output_csv_path}")


def main():
    step1_process_pages()
    step2_aggregate_csvs()
    print("\nProcess completed.")


if __name__ == "__main__":
    main()

256
process_files_openai.py Normal file
View File

@@ -0,0 +1,256 @@
import os
import json
import glob
import pandas as pd
from docx import Document
from docx.document import Document as _Document
from docx.oxml.text.paragraph import CT_P
from docx.oxml.table import CT_Tbl
from docx.table import _Cell, Table
from docx.text.paragraph import Paragraph
from openai import OpenAI
from dotenv import load_dotenv
# Load environment variables
load_dotenv()

# Directory configuration
INPUT_DIR = './input'
OUTPUT_DIR = './output'
TEMPLATE_FILE = './template/use_case_template.csv'

# Make sure the output directory exists
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Initialize the OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

def iter_block_items(parent):
    """
    Yield each paragraph and table child within *parent*, in document order.
    Each returned value is an instance of either Table or Paragraph.
    """
    if isinstance(parent, _Document):
        parent_elm = parent.element.body
    elif isinstance(parent, _Cell):
        parent_elm = parent._tc
    else:
        raise ValueError("something's not right")
    for child in parent_elm.iterchildren():
        if isinstance(child, CT_P):
            yield Paragraph(child, parent)
        elif isinstance(child, CT_Tbl):
            yield Table(child, parent)

def read_docx_chunks(file_path, chunk_size=4000):
    """
    Read a .docx file and return a generator of text chunks.
    Preserves the order of paragraphs and tables.
    chunk_size: approximate number of characters per chunk.
    """
    try:
        doc = Document(file_path)
        current_chunk = []
        current_length = 0
        for block in iter_block_items(doc):
            text = ""
            if isinstance(block, Paragraph):
                text = block.text.strip()
                if text:
                    text += "\n"
            elif isinstance(block, Table):
                # Convert the table to text (markdown-like, pipe separated)
                for row in block.rows:
                    row_data = [cell.text.strip() for cell in row.cells]
                    text += "| " + " | ".join(row_data) + " |\n"
                text += "\n"
            if text:
                current_chunk.append(text)
                current_length += len(text)
                # If we exceed the chunk size, yield what we have so far
                if current_length >= chunk_size:
                    yield "".join(current_chunk)
                    current_chunk = []
                    current_length = 0
        # Yield the last chunk, if any
        if current_chunk:
            yield "".join(current_chunk)
    except Exception as e:
        print(f"Error reading {file_path}: {e}")
        yield None

def parse_markdown_table(text):
    """
    Parse a text containing a Markdown table and return a pandas DataFrame.
    """
    lines = text.split('\n')
    table_lines = [line.strip() for line in lines if line.strip().startswith('|')]
    if len(table_lines) < 2:
        return None
    # Find the separator row (e.g. |---|---|)
    separator_index = -1
    for i, line in enumerate(table_lines):
        # Strip pipes and spaces to check whether only dashes/colons remain
        content = line.replace('|', '').replace(':', '').replace('-', '').strip()
        if not content:  # Empty after removing separator characters: this is the separator row
            separator_index = i
            break
    if separator_index <= 0:
        return None

    def split_row(row_str):
        # Simple split on pipes
        parts = row_str.split('|')
        # Drop the first and last elements if empty (caused by the outer pipes)
        if row_str.startswith('|'): parts.pop(0)
        if row_str.endswith('|'): parts.pop(-1)
        return [p.strip() for p in parts]

    headers = split_row(table_lines[separator_index - 1])
    data_rows = table_lines[separator_index + 1:]
    data = []
    for row in data_rows:
        cols = split_row(row)
        # Align columns with the header
        if len(cols) == len(headers):
            data.append(cols)
        elif len(cols) < len(headers):
            data.append(cols + [''] * (len(headers) - len(cols)))
        else:
            data.append(cols[:len(headers)])
    if not data:
        return None
    return pd.DataFrame(data, columns=headers)

def step1_process_pages():
    """Step 1: Process pages -> Markdown -> CSV (algorithmic conversion)."""
    print("\n--- START STEP 1: Word -> Markdown -> CSV ---")
    word_files = glob.glob(os.path.join(INPUT_DIR, "*.docx"))
    if not word_files:
        print("No .docx files found in the input directory.")
        return

    # Expected columns, used for validation (optional, but useful for the prompt)
    expected_columns = ["ID", "Descrizione", "Risultato Atteso", "Risultato Ottenuto", "Stato"]

    for file_path in word_files:
        filename = os.path.basename(file_path)
        file_base_name = os.path.splitext(filename)[0]
        print(f"Processing: {filename}...")

        # Per-document output directory
        output_subdir = os.path.join(OUTPUT_DIR, file_base_name)
        os.makedirs(output_subdir, exist_ok=True)

        chunk_count = 0
        for chunk in read_docx_chunks(file_path):
            if not chunk:
                continue
            chunk_count += 1
            print(f" -> Processing page/chunk {chunk_count}...")
            system_prompt = f"""
Sei un esperto QA. Analizza il frammento di documento fornito.
Estrai TUTTI i dati relativi a Use Case, Test Case, Scenari e Verifiche senza rielaborare il testo, il testo di Use Case, Test Case, Scenari e Verifiche deve rimanere IDENTICO a quello letto.
Output richiesto:
- Genera una TABELLA MARKDOWN valida.
- La tabella DEVE avere ESATTAMENTE queste colonne: {', '.join(expected_columns)}.
- Se un dato non è presente, lascia la cella vuota.
- Se non trovi NESSUN dato rilevante, rispondi ESATTAMENTE con "NO_DATA".
- NON aggiungere altro testo prima o dopo la tabella.
"""
user_prompt = f"Frammento {chunk_count}:\n\n{chunk}"
try:
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.1
)
content = response.choices[0].message.content.strip()
# Pulisci markdown fences
if content.startswith("```markdown"):
content = content.replace("```markdown", "").replace("```", "")
elif content.startswith("```"):
content = content.replace("```", "")
content = content.strip()
if content and "NO_DATA" not in content:
# 1. Salva Markdown
md_path = os.path.join(output_subdir, f"chunk_{chunk_count}.md")
with open(md_path, 'w', encoding='utf-8') as f:
f.write(content)
# 2. Converti in CSV algoritmicamente
df = parse_markdown_table(content)
if df is not None and not df.empty:
csv_path = os.path.join(output_subdir, f"chunk_{chunk_count}.csv")
df.to_csv(csv_path, index=False, encoding='utf-8')
# print(f" -> Generato CSV: {csv_path}")
else:
print(f" -> Warning: Impossibile parsare tabella in chunk {chunk_count}")
else:
pass
except Exception as e:
print(f"Errore chunk {chunk_count} di {filename}: {e}")

def step2_aggregate_csvs():
    """Step 2: Aggregate the CSV fragments into a single final file per document."""
    print("\n--- START STEP 2: CSV aggregation ---")
    subdirs = [d for d in glob.glob(os.path.join(OUTPUT_DIR, "*")) if os.path.isdir(d)]
    if not subdirs:
        print("No fragment folders found in the output directory.")
        return
    for fragments_dir in subdirs:
        doc_name = os.path.basename(fragments_dir)
        print(f"Aggregating: {doc_name}...")
        all_files = glob.glob(os.path.join(fragments_dir, "*.csv"))
        try:
            all_files.sort(key=lambda x: int(os.path.basename(x).split('_')[1].split('.')[0]))
        except (IndexError, ValueError):
            all_files.sort()
        if not all_files:
            print(f" -> No CSV fragments found in {fragments_dir}")
            continue
        combined_df = pd.concat((pd.read_csv(f) for f in all_files), ignore_index=True)
        output_csv_path = os.path.join(OUTPUT_DIR, f"{doc_name}.csv")
        combined_df.to_csv(output_csv_path, index=False, encoding='utf-8')
        print(f" -> Final CSV created: {output_csv_path}")


def main():
    step1_process_pages()
    step2_aggregate_csvs()
    print("\nProcess completed.")


if __name__ == "__main__":
    main()

5
requirements.txt Normal file
View File

@@ -0,0 +1,5 @@
openai
python-docx
pandas
python-dotenv
google-generativeai

0
template/.gitkeep Normal file
View File

1
template/use_case_template.csv Normal file
View File

@@ -0,0 +1 @@
ID,Descrizione,Risultato Atteso,Risultato Ottenuto,Stato
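
The header line above matches the expected_columns list hard-coded in both scripts. A quick, hypothetical sanity check (not part of this commit) that keeps the template and the scripts in sync:

import pandas as pd

# Columns the scripts expect to extract (see expected_columns in both scripts)
EXPECTED = ["ID", "Descrizione", "Risultato Atteso", "Risultato Ottenuto", "Stato"]
template_cols = list(pd.read_csv("template/use_case_template.csv", nrows=0).columns)
assert template_cols == EXPECTED, f"Template header mismatch: {template_cols}"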