Cas d'usage 2: Répertoire PDF

Implémentation de RAG sur un répertoire contenant des fichiers PDF

Implémentation de RAG sur un Répertoire de Fichiers PDF

Dans de nombreux cas d'usage réels, les informations sont réparties dans plusieurs fichiers PDF plutôt que dans un seul document. Ce cas d'usage explore comment implémenter la technique Naive RAG sur un répertoire contenant plusieurs fichiers PDF.

Architecture du Système

Traitement RAG sur un répertoire de fichiers PDF +----------------------+ | Répertoire de | | fichiers PDF | +----------------------+ | v +----------------------+ | Chargement et | | indexation des | | fichiers | +----------------------+ | v +----------------------+ | Extraction de texte | | de chaque PDF | +----------------------+ | v +----------------------+ | Chunking des | | documents | +----------------------+ | v +----------------------+ | Création des | | embeddings | +----------------------+ | v +----------------------+ | Stockage dans | | base vectorielle | +----------------------+ | v +----------------------+ +----------------------+ | Requête utilisateur | <---- | Interface utilisateur| +----------------------+ +----------------------+ | v +----------------------+ | Recherche des chunks | | pertinents | +----------------------+ | v +----------------------+ | Génération de | | réponse avec | | référence aux | | sources | +----------------------+ | v +----------------------+ | Présentation de la | | réponse avec liens | | vers les documents | | sources | +----------------------+

Implémentation en Python

Étape 1: Installation des dépendances

pip install langchain pypdf openai chromadb tiktoken tqdm

Étape 2: Chargement des fichiers PDF du répertoire

import os
from langchain.document_loaders import PyPDFLoader
from tqdm import tqdm

def load_pdfs_from_directory(directory_path):
    """Charge tous les fichiers PDF d'un répertoire."""
    documents = []
    pdf_files = [f for f in os.listdir(directory_path) if f.endswith('.pdf')]
    
    for pdf_file in tqdm(pdf_files, desc="Chargement des PDFs"):
        file_path = os.path.join(directory_path, pdf_file)
        try:
            loader = PyPDFLoader(file_path)
            docs = loader.load()
            # Ajouter le nom du fichier comme métadonnée
            for doc in docs:
                doc.metadata["source"] = pdf_file
            documents.extend(docs)
            print(f"Chargé: {pdf_file} - {len(docs)} pages")
        except Exception as e:
            print(f"Erreur lors du chargement de {pdf_file}: {e}")
    
    return documents

# Charger tous les PDFs du répertoire
pdf_directory = "chemin/vers/repertoire_pdf"
all_documents = load_pdfs_from_directory(pdf_directory)
print(f"Total de documents chargés: {len(all_documents)}")

Étape 3: Chunking des documents

from langchain.text_splitter import RecursiveCharacterTextSplitter

# Diviser les documents en chunks
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    length_function=len
)
chunks = text_splitter.split_documents(all_documents)
print(f"Nombre de chunks créés: {len(chunks)}")

Étape 4: Création des embeddings et stockage

from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
import uuid

# Générer un ID unique pour cette base de connaissances
knowledge_base_id = str(uuid.uuid4())
persist_directory = f"./chroma_db_{knowledge_base_id}"

# Initialiser le modèle d'embedding
embeddings = OpenAIEmbeddings()

# Créer et persister la base vectorielle
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory=persist_directory
)
vectorstore.persist()
print(f"Base vectorielle créée et persistée dans: {persist_directory}")

Étape 5: Configuration du modèle de langage et de la chaîne RAG

from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQAWithSourcesChain

# Initialiser le modèle de langage
llm = ChatOpenAI(model_name="gpt-3.5-turbo")

# Créer la chaîne de traitement RAG avec sources
qa_chain = RetrievalQAWithSourcesChain.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 4}),
    return_source_documents=True
)

Étape 6: Fonction d'interrogation avec références aux sources

def query_with_sources(query_text):
    """Interroge la base de connaissances et retourne la réponse avec les sources."""
    result = qa_chain({"question": query_text})
    
    # Extraire la réponse et les sources
    answer = result["answer"]
    sources = result["sources"].split(", ")
    source_docs = result["source_documents"]
    
    # Créer un dictionnaire des sources uniques avec leurs pages
    unique_sources = {}
    for doc in source_docs:
        source = doc.metadata["source"]
        page = doc.metadata.get("page", 1)
        if source in unique_sources:
            if page not in unique_sources[source]:
                unique_sources[source].append(page)
        else:
            unique_sources[source] = [page]
    
    # Formater les sources pour l'affichage
    formatted_sources = []
    for source, pages in unique_sources.items():
        pages_str = ", ".join([str(p) for p in pages])
        formatted_sources.append(f"{source} (pages: {pages_str})")
    
    return {
        "answer": answer,
        "sources": formatted_sources
    }

# Exemple d'utilisation
query = "Quels sont les principaux défis mentionnés dans les documents?"
result = query_with_sources(query)
print(f"Réponse: {result['answer']}\n")
print("Sources:")
for source in result['sources']:
    print(f"- {source}")

Implémentation dans n8n

Voici comment implémenter un workflow RAG pour un répertoire de PDFs dans n8n :

Workflow pour Répertoire PDF

  1. HTTP Request Node : Pour déclencher le workflow via une API
  2. Directory List Node : Pour lister tous les fichiers PDF du répertoire
  3. Split In Batches Node : Pour traiter les fichiers par lots
  4. Read Binary Files Node : Pour lire chaque fichier PDF
  5. Code Node : Pour extraire et segmenter le texte de chaque PDF
    const { PythonShell } = require('python-shell');
    const fs = require('fs');
    
    // Écrire le PDF dans un fichier temporaire
    const pdfBuffer = $input.item.binary.data;
    const fileName = $input.item.json.name;
    const tempPath = `/tmp/${fileName}`;
    fs.writeFileSync(tempPath, Buffer.from(pdfBuffer, 'base64'));
    
    // Script Python pour extraire et segmenter le texte
    const pythonScript = `
    import pypdf
    import json
    
    reader = pypdf.PdfReader('${tempPath}')
    chunks = []
    
    # Extraire le texte page par page
    for i, page in enumerate(reader.pages):
        text = page.extract_text()
        
        # Diviser en paragraphes
        paragraphs = text.split('\\n\\n')
        
        # Créer des chunks de taille raisonnable
        current_chunk = ""
        for para in paragraphs:
            if len(current_chunk) + len(para) < 1000:
                current_chunk += para + " "
            else:
                chunks.append({
                    "text": current_chunk, 
                    "page": i+1,
                    "source": "${fileName}"
                })
                current_chunk = para + " "
        
        if current_chunk:
            chunks.append({
                "text": current_chunk, 
                "page": i+1,
                "source": "${fileName}"
            })
    
    print(json.dumps(chunks))
    `;
    
    // Exécuter le script Python
    return new Promise((resolve, reject) => {
      PythonShell.runString(pythonScript, null, (err, results) => {
        if (err) reject(err);
        else {
          const chunks = JSON.parse(results[0]);
          return resolve([{json: {
            fileName: fileName,
            chunks: chunks
          }}]);
        }
      });
    });
  6. Merge Node : Pour combiner tous les chunks de tous les fichiers
  7. OpenAI Node : Pour créer les embeddings de chaque chunk
  8. Function Node : Pour stocker les embeddings dans une base de données
  9. HTTP Request Node : Pour recevoir la requête de l'utilisateur
  10. OpenAI Node : Pour créer l'embedding de la requête
  11. Function Node : Pour rechercher les chunks les plus similaires
  12. OpenAI Node : Pour générer une réponse basée sur la requête et les chunks pertinents
  13. Function Node : Pour formater la réponse avec les références aux sources
  14. Respond to Webhook Node : Pour renvoyer la réponse

Optimisations et Bonnes Pratiques

Gestion des métadonnées

Pour un répertoire de PDFs, les métadonnées sont cruciales pour tracer l'origine des informations :

Stratégies d'indexation

def update_vectorstore(directory_path, vectorstore):
    """Met à jour la base vectorielle avec de nouveaux documents."""
    pdf_files = [f for f in os.listdir(directory_path) if f.endswith('.pdf')]
    
    # Récupérer les sources déjà indexées
    existing_sources = set()
    for doc in vectorstore.get():
        if "source" in doc.metadata:
            existing_sources.add(doc.metadata["source"])
    
    # Identifier les nouveaux fichiers
    new_files = [f for f in pdf_files if f not in existing_sources]
    
    if not new_files:
        print("Aucun nouveau fichier à indexer.")
        return vectorstore
    
    # Charger et indexer les nouveaux fichiers
    new_documents = []
    for pdf_file in tqdm(new_files, desc="Chargement des nouveaux PDFs"):
        file_path = os.path.join(directory_path, pdf_file)
        try:
            loader = PyPDFLoader(file_path)
            docs = loader.load()
            for doc in docs:
                doc.metadata["source"] = pdf_file
            new_documents.extend(docs)
            print(f"Chargé: {pdf_file} - {len(docs)} pages")
        except Exception as e:
            print(f"Erreur lors du chargement de {pdf_file}: {e}")
    
    # Chunking des nouveaux documents
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len
    )
    new_chunks = text_splitter.split_documents(new_documents)
    
    # Ajouter les nouveaux chunks à la base vectorielle
    vectorstore.add_documents(new_chunks)
    vectorstore.persist()
    
    print(f"Base vectorielle mise à jour avec {len(new_chunks)} nouveaux chunks.")
    return vectorstore

Considérations de performance

import multiprocessing
from functools import partial

def process_pdf(file_path):
    """Traite un seul fichier PDF."""
    try:
        loader = PyPDFLoader(file_path)
        docs = loader.load()
        for doc in docs:
            doc.metadata["source"] = os.path.basename(file_path)
        return docs
    except Exception as e:
        print(f"Erreur lors du traitement de {file_path}: {e}")
        return []

def load_pdfs_parallel(directory_path, max_workers=None):
    """Charge les PDFs en parallèle."""
    pdf_files = [os.path.join(directory_path, f) for f in os.listdir(directory_path) if f.endswith('.pdf')]
    
    if not max_workers:
        max_workers = multiprocessing.cpu_count()
    
    with multiprocessing.Pool(processes=max_workers) as pool:
        results = list(tqdm(
            pool.imap(process_pdf, pdf_files),
            total=len(pdf_files),
            desc="Traitement des PDFs"
        ))
    
    # Aplatir la liste de listes
    documents = [doc for sublist in results for doc in sublist]
    return documents