Implémentation de RAG sur un Répertoire de Fichiers PDF
Dans de nombreux cas d'usage réels, les informations sont réparties dans plusieurs fichiers PDF plutôt que dans un seul document. Ce cas d'usage explore comment implémenter la technique Naive RAG sur un répertoire contenant plusieurs fichiers PDF.
Architecture du Système
Traitement RAG sur un répertoire de fichiers PDF
+----------------------+
| Répertoire de |
| fichiers PDF |
+----------------------+
|
v
+----------------------+
| Chargement et |
| indexation des |
| fichiers |
+----------------------+
|
v
+----------------------+
| Extraction de texte |
| de chaque PDF |
+----------------------+
|
v
+----------------------+
| Chunking des |
| documents |
+----------------------+
|
v
+----------------------+
| Création des |
| embeddings |
+----------------------+
|
v
+----------------------+
| Stockage dans |
| base vectorielle |
+----------------------+
|
v
+----------------------+ +----------------------+
| Requête utilisateur | <---- | Interface utilisateur|
+----------------------+ +----------------------+
|
v
+----------------------+
| Recherche des chunks |
| pertinents |
+----------------------+
|
v
+----------------------+
| Génération de |
| réponse avec |
| référence aux |
| sources |
+----------------------+
|
v
+----------------------+
| Présentation de la |
| réponse avec liens |
| vers les documents |
| sources |
+----------------------+
Implémentation en Python
Étape 1: Installation des dépendances
pip install langchain pypdf openai chromadb tiktoken tqdm
Étape 2: Chargement des fichiers PDF du répertoire
import os
from langchain.document_loaders import PyPDFLoader
from tqdm import tqdm
def load_pdfs_from_directory(directory_path):
"""Charge tous les fichiers PDF d'un répertoire."""
documents = []
pdf_files = [f for f in os.listdir(directory_path) if f.endswith('.pdf')]
for pdf_file in tqdm(pdf_files, desc="Chargement des PDFs"):
file_path = os.path.join(directory_path, pdf_file)
try:
loader = PyPDFLoader(file_path)
docs = loader.load()
# Ajouter le nom du fichier comme métadonnée
for doc in docs:
doc.metadata["source"] = pdf_file
documents.extend(docs)
print(f"Chargé: {pdf_file} - {len(docs)} pages")
except Exception as e:
print(f"Erreur lors du chargement de {pdf_file}: {e}")
return documents
# Charger tous les PDFs du répertoire
pdf_directory = "chemin/vers/repertoire_pdf"
all_documents = load_pdfs_from_directory(pdf_directory)
print(f"Total de documents chargés: {len(all_documents)}")
Étape 3: Chunking des documents
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Diviser les documents en chunks
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len
)
chunks = text_splitter.split_documents(all_documents)
print(f"Nombre de chunks créés: {len(chunks)}")
Étape 4: Création des embeddings et stockage
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
import uuid
# Générer un ID unique pour cette base de connaissances
knowledge_base_id = str(uuid.uuid4())
persist_directory = f"./chroma_db_{knowledge_base_id}"
# Initialiser le modèle d'embedding
embeddings = OpenAIEmbeddings()
# Créer et persister la base vectorielle
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory=persist_directory
)
vectorstore.persist()
print(f"Base vectorielle créée et persistée dans: {persist_directory}")
Étape 5: Configuration du modèle de langage et de la chaîne RAG
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQAWithSourcesChain
# Initialiser le modèle de langage
llm = ChatOpenAI(model_name="gpt-3.5-turbo")
# Créer la chaîne de traitement RAG avec sources
qa_chain = RetrievalQAWithSourcesChain.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=vectorstore.as_retriever(search_kwargs={"k": 4}),
return_source_documents=True
)
Étape 6: Fonction d'interrogation avec références aux sources
def query_with_sources(query_text):
"""Interroge la base de connaissances et retourne la réponse avec les sources."""
result = qa_chain({"question": query_text})
# Extraire la réponse et les sources
answer = result["answer"]
sources = result["sources"].split(", ")
source_docs = result["source_documents"]
# Créer un dictionnaire des sources uniques avec leurs pages
unique_sources = {}
for doc in source_docs:
source = doc.metadata["source"]
page = doc.metadata.get("page", 1)
if source in unique_sources:
if page not in unique_sources[source]:
unique_sources[source].append(page)
else:
unique_sources[source] = [page]
# Formater les sources pour l'affichage
formatted_sources = []
for source, pages in unique_sources.items():
pages_str = ", ".join([str(p) for p in pages])
formatted_sources.append(f"{source} (pages: {pages_str})")
return {
"answer": answer,
"sources": formatted_sources
}
# Exemple d'utilisation
query = "Quels sont les principaux défis mentionnés dans les documents?"
result = query_with_sources(query)
print(f"Réponse: {result['answer']}\n")
print("Sources:")
for source in result['sources']:
print(f"- {source}")
Implémentation dans n8n
Voici comment implémenter un workflow RAG pour un répertoire de PDFs dans n8n :
Workflow pour Répertoire PDF
- HTTP Request Node : Pour déclencher le workflow via une API
- Directory List Node : Pour lister tous les fichiers PDF du répertoire
- Split In Batches Node : Pour traiter les fichiers par lots
- Read Binary Files Node : Pour lire chaque fichier PDF
- Code Node : Pour extraire et segmenter le texte de chaque PDF
const { PythonShell } = require('python-shell'); const fs = require('fs'); // Écrire le PDF dans un fichier temporaire const pdfBuffer = $input.item.binary.data; const fileName = $input.item.json.name; const tempPath = `/tmp/${fileName}`; fs.writeFileSync(tempPath, Buffer.from(pdfBuffer, 'base64')); // Script Python pour extraire et segmenter le texte const pythonScript = ` import pypdf import json reader = pypdf.PdfReader('${tempPath}') chunks = [] # Extraire le texte page par page for i, page in enumerate(reader.pages): text = page.extract_text() # Diviser en paragraphes paragraphs = text.split('\\n\\n') # Créer des chunks de taille raisonnable current_chunk = "" for para in paragraphs: if len(current_chunk) + len(para) < 1000: current_chunk += para + " " else: chunks.append({ "text": current_chunk, "page": i+1, "source": "${fileName}" }) current_chunk = para + " " if current_chunk: chunks.append({ "text": current_chunk, "page": i+1, "source": "${fileName}" }) print(json.dumps(chunks)) `; // Exécuter le script Python return new Promise((resolve, reject) => { PythonShell.runString(pythonScript, null, (err, results) => { if (err) reject(err); else { const chunks = JSON.parse(results[0]); return resolve([{json: { fileName: fileName, chunks: chunks }}]); } }); });
- Merge Node : Pour combiner tous les chunks de tous les fichiers
- OpenAI Node : Pour créer les embeddings de chaque chunk
- Function Node : Pour stocker les embeddings dans une base de données
- HTTP Request Node : Pour recevoir la requête de l'utilisateur
- OpenAI Node : Pour créer l'embedding de la requête
- Function Node : Pour rechercher les chunks les plus similaires
- OpenAI Node : Pour générer une réponse basée sur la requête et les chunks pertinents
- Function Node : Pour formater la réponse avec les références aux sources
- Respond to Webhook Node : Pour renvoyer la réponse
Optimisations et Bonnes Pratiques
Gestion des métadonnées
Pour un répertoire de PDFs, les métadonnées sont cruciales pour tracer l'origine des informations :
- Nom du fichier source
- Numéro de page
- Date de création/modification du document
- Auteur du document (si disponible)
- Titre du document (si disponible)
Stratégies d'indexation
- Indexation incrémentale : Ajouter de nouveaux documents sans recréer toute la base
- Détection des doublons : Éviter d'indexer plusieurs fois le même document
- Gestion des versions : Suivre les mises à jour des documents
def update_vectorstore(directory_path, vectorstore):
"""Met à jour la base vectorielle avec de nouveaux documents."""
pdf_files = [f for f in os.listdir(directory_path) if f.endswith('.pdf')]
# Récupérer les sources déjà indexées
existing_sources = set()
for doc in vectorstore.get():
if "source" in doc.metadata:
existing_sources.add(doc.metadata["source"])
# Identifier les nouveaux fichiers
new_files = [f for f in pdf_files if f not in existing_sources]
if not new_files:
print("Aucun nouveau fichier à indexer.")
return vectorstore
# Charger et indexer les nouveaux fichiers
new_documents = []
for pdf_file in tqdm(new_files, desc="Chargement des nouveaux PDFs"):
file_path = os.path.join(directory_path, pdf_file)
try:
loader = PyPDFLoader(file_path)
docs = loader.load()
for doc in docs:
doc.metadata["source"] = pdf_file
new_documents.extend(docs)
print(f"Chargé: {pdf_file} - {len(docs)} pages")
except Exception as e:
print(f"Erreur lors du chargement de {pdf_file}: {e}")
# Chunking des nouveaux documents
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len
)
new_chunks = text_splitter.split_documents(new_documents)
# Ajouter les nouveaux chunks à la base vectorielle
vectorstore.add_documents(new_chunks)
vectorstore.persist()
print(f"Base vectorielle mise à jour avec {len(new_chunks)} nouveaux chunks.")
return vectorstore
Considérations de performance
- Traitement par lots : Pour les grands répertoires, traiter les fichiers par lots
- Parallélisation : Utiliser le multiprocessing pour accélérer l'extraction et l'embedding
- Compression des embeddings : Réduire la taille de la base vectorielle
import multiprocessing
from functools import partial
def process_pdf(file_path):
"""Traite un seul fichier PDF."""
try:
loader = PyPDFLoader(file_path)
docs = loader.load()
for doc in docs:
doc.metadata["source"] = os.path.basename(file_path)
return docs
except Exception as e:
print(f"Erreur lors du traitement de {file_path}: {e}")
return []
def load_pdfs_parallel(directory_path, max_workers=None):
"""Charge les PDFs en parallèle."""
pdf_files = [os.path.join(directory_path, f) for f in os.listdir(directory_path) if f.endswith('.pdf')]
if not max_workers:
max_workers = multiprocessing.cpu_count()
with multiprocessing.Pool(processes=max_workers) as pool:
results = list(tqdm(
pool.imap(process_pdf, pdf_files),
total=len(pdf_files),
desc="Traitement des PDFs"
))
# Aplatir la liste de listes
documents = [doc for sublist in results for doc in sublist]
return documents