Cas d'usage 3: Formats Variés

Implémentation de RAG sur un répertoire contenant des formats variés (PPTX, PDF, DOCX, XLSX, MD)

Implémentation de RAG sur un Répertoire de Formats Variés

Dans un environnement professionnel réel, les informations sont souvent dispersées dans des documents de formats variés. Ce cas d'usage explore comment implémenter la technique Naive RAG sur un répertoire contenant différents types de fichiers : PPTX (présentations PowerPoint), PDF, DOCX (documents Word), XLSX (feuilles de calcul Excel) et MD (Markdown).

Architecture du Système Multi-Format

Traitement RAG sur un répertoire de formats variés +----------------------+ | Répertoire de | | formats variés | | (PDF, DOCX, PPTX, | | XLSX, MD) | +----------------------+ | v +----------------------+ | Détection du format | | et routage | +----------------------+ | +------------------------+------------------------+------------------------+------------------------+ | | | | | v v v v v +----------------------+ +----------------------+ +----------------------+ +----------------------+ +----------------------+ | Extracteur PDF | | Extracteur DOCX | | Extracteur PPTX | | Extracteur XLSX | | Extracteur MD | +----------------------+ +----------------------+ +----------------------+ +----------------------+ +----------------------+ | | | | | v v v v v +----------------------+ +----------------------+ +----------------------+ +----------------------+ +----------------------+ | Texte + métadonnées | | Texte + métadonnées | | Texte + métadonnées | | Texte + métadonnées | | Texte + métadonnées | +----------------------+ +----------------------+ +----------------------+ +----------------------+ +----------------------+ | | | | | +------------------------+------------------------+------------------------+------------------------+ | v +----------------------+ | Normalisation et | | nettoyage du texte | +----------------------+ | v +----------------------+ | Chunking adaptatif | | selon le format | +----------------------+ | v +----------------------+ | Création des | | embeddings | +----------------------+ | v +----------------------+ | Stockage dans | | base vectorielle | | avec métadonnées | | de format | +----------------------+ | v +----------------------+ +----------------------+ | Requête utilisateur | <---- | Interface utilisateur| +----------------------+ +----------------------+ | v +----------------------+ | Recherche avec | | filtrage par format | | (optionnel) | +----------------------+ | v +----------------------+ | Génération de | | réponse avec | | référence aux | | sources et formats | +----------------------+

Implémentation en Python

Étape 1: Installation des dépendances

pip install langchain pypdf python-docx pptx openpyxl markdown openai chromadb tiktoken tqdm

Étape 2: Création des extracteurs pour chaque format

import os
from langchain.document_loaders import (
    PyPDFLoader,
    Docx2txtLoader,
    UnstructuredPowerPointLoader,
    UnstructuredExcelLoader,
    TextLoader
)
from tqdm import tqdm

class MultiFormatLoader:
    """Classe pour charger des documents de différents formats."""
    
    def __init__(self):
        self.loaders = {
            ".pdf": PyPDFLoader,
            ".docx": Docx2txtLoader,
            ".pptx": UnstructuredPowerPointLoader,
            ".xlsx": UnstructuredExcelLoader,
            ".md": TextLoader
        }
    
    def load_document(self, file_path):
        """Charge un document en fonction de son extension."""
        _, ext = os.path.splitext(file_path)
        if ext.lower() not in self.loaders:
            raise ValueError(f"Format non supporté: {ext}")
        
        loader_class = self.loaders[ext.lower()]
        loader = loader_class(file_path)
        docs = loader.load()
        
        # Ajouter des métadonnées
        for doc in docs:
            doc.metadata["source"] = os.path.basename(file_path)
            doc.metadata["format"] = ext.lower().replace(".", "")
        
        return docs
    
    def load_directory(self, directory_path):
        """Charge tous les documents supportés d'un répertoire."""
        documents = []
        supported_extensions = set(self.loaders.keys())
        
        # Lister tous les fichiers du répertoire
        all_files = []
        for root, _, files in os.walk(directory_path):
            for file in files:
                _, ext = os.path.splitext(file)
                if ext.lower() in supported_extensions:
                    all_files.append(os.path.join(root, file))
        
        # Charger chaque fichier
        for file_path in tqdm(all_files, desc="Chargement des documents"):
            try:
                docs = self.load_document(file_path)
                documents.extend(docs)
                print(f"Chargé: {os.path.basename(file_path)} - {len(docs)} pages/sections")
            except Exception as e:
                print(f"Erreur lors du chargement de {file_path}: {e}")
        
        return documents

# Utilisation
loader = MultiFormatLoader()
documents = loader.load_directory("chemin/vers/repertoire_multi_format")
print(f"Total de documents chargés: {len(documents)}")

Étape 3: Normalisation et chunking adaptatif

from langchain.text_splitter import RecursiveCharacterTextSplitter

def normalize_text(text):
    """Normalise le texte extrait des différents formats."""
    # Supprimer les caractères de contrôle
    import re
    text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\xff]', '', text)
    
    # Normaliser les espaces
    text = re.sub(r'\s+', ' ', text)
    
    # Supprimer les lignes vides multiples
    text = re.sub(r'\n\s*\n', '\n\n', text)
    
    return text.strip()

def get_chunk_params(format_type):
    """Retourne les paramètres de chunking adaptés au format."""
    params = {
        "pdf": {"chunk_size": 1000, "chunk_overlap": 200},
        "docx": {"chunk_size": 1000, "chunk_overlap": 200},
        "pptx": {"chunk_size": 500, "chunk_overlap": 100},  # Plus petits pour les slides
        "xlsx": {"chunk_size": 300, "chunk_overlap": 50},   # Plus petits pour les cellules
        "md": {"chunk_size": 800, "chunk_overlap": 150}
    }
    return params.get(format_type, {"chunk_size": 1000, "chunk_overlap": 200})

# Normaliser le texte des documents
for doc in documents:
    doc.page_content = normalize_text(doc.page_content)

# Chunking adaptatif selon le format
chunks = []
for doc in tqdm(documents, desc="Chunking des documents"):
    format_type = doc.metadata.get("format", "pdf")
    params = get_chunk_params(format_type)
    
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=params["chunk_size"],
        chunk_overlap=params["chunk_overlap"],
        length_function=len
    )
    
    doc_chunks = text_splitter.split_documents([doc])
    chunks.extend(doc_chunks)

print(f"Nombre de chunks créés: {len(chunks)}")

Étape 4: Création des embeddings et stockage avec métadonnées

from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
import uuid

# Générer un ID unique pour cette base de connaissances
knowledge_base_id = str(uuid.uuid4())
persist_directory = f"./chroma_multi_format_{knowledge_base_id}"

# Initialiser le modèle d'embedding
embeddings = OpenAIEmbeddings()

# Créer et persister la base vectorielle
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory=persist_directory
)
vectorstore.persist()
print(f"Base vectorielle créée et persistée dans: {persist_directory}")

Étape 5: Recherche avec filtrage par format

def query_by_format(query_text, formats=None, k=4):
    """Interroge la base de connaissances avec filtrage optionnel par format."""
    # Préparer le filtre de métadonnées si des formats sont spécifiés
    filter_dict = None
    if formats:
        if isinstance(formats, str):
            formats = [formats]
        filter_dict = {"format": {"$in": formats}}
    
    # Effectuer la recherche
    retriever = vectorstore.as_retriever(
        search_kwargs={"k": k, "filter": filter_dict}
    )
    docs = retriever.get_relevant_documents(query_text)
    
    return docs

# Exemple d'utilisation
query = "Quelles sont les principales caractéristiques mentionnées dans les présentations?"
# Rechercher uniquement dans les présentations PowerPoint
pptx_results = query_by_format(query, formats=["pptx"])
print(f"Résultats des présentations PowerPoint: {len(pptx_results)}")

# Rechercher dans tous les formats
all_results = query_by_format(query)
print(f"Résultats de tous les formats: {len(all_results)}")

Étape 6: Génération de réponse avec référence aux sources et formats

from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQAWithSourcesChain

# Initialiser le modèle de langage
llm = ChatOpenAI(model_name="gpt-3.5-turbo")

# Créer la chaîne de traitement RAG avec sources
qa_chain = RetrievalQAWithSourcesChain.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 4}),
    return_source_documents=True
)

def query_with_format_sources(query_text, formats=None):
    """Interroge la base de connaissances et retourne la réponse avec les sources et formats."""
    # Préparer le filtre de métadonnées si des formats sont spécifiés
    filter_dict = None
    if formats:
        if isinstance(formats, str):
            formats = [formats]
        filter_dict = {"format": {"$in": formats}}
    
    # Créer un retriever avec le filtre
    retriever = vectorstore.as_retriever(
        search_kwargs={"k": 4, "filter": filter_dict}
    )
    
    # Créer une chaîne QA avec ce retriever
    chain = RetrievalQAWithSourcesChain.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True
    )
    
    # Exécuter la requête
    result = chain({"question": query_text})
    
    # Extraire la réponse et les sources
    answer = result["answer"]
    source_docs = result["source_documents"]
    
    # Créer un dictionnaire des sources uniques avec leurs formats
    unique_sources = {}
    for doc in source_docs:
        source = doc.metadata["source"]
        format_type = doc.metadata.get("format", "unknown")
        page = doc.metadata.get("page", 1)
        
        if source in unique_sources:
            if page not in unique_sources[source]["pages"]:
                unique_sources[source]["pages"].append(page)
        else:
            unique_sources[source] = {
                "format": format_type,
                "pages": [page]
            }
    
    # Formater les sources pour l'affichage
    formatted_sources = []
    for source, info in unique_sources.items():
        format_type = info["format"].upper()
        pages_str = ", ".join([str(p) for p in info["pages"]])
        formatted_sources.append(f"{source} ({format_type}, pages/sections: {pages_str})")
    
    return {
        "answer": answer,
        "sources": formatted_sources
    }

# Exemple d'utilisation
query = "Quels sont les principaux points abordés dans les documents?"
result = query_with_format_sources(query)
print(f"Réponse: {result['answer']}\n")
print("Sources:")
for source in result['sources']:
    print(f"- {source}")

Implémentation dans n8n

Voici comment implémenter un workflow RAG pour des formats variés dans n8n :

Workflow pour Formats Variés

  1. HTTP Request Node : Pour déclencher le workflow via une API
  2. Directory List Node : Pour lister tous les fichiers du répertoire
  3. Filter Node : Pour filtrer les fichiers par extension supportée
  4. Split In Batches Node : Pour traiter les fichiers par lots
  5. Switch Node : Pour router les fichiers selon leur format
    • PDF : Utiliser PyPDF
    • DOCX : Utiliser docx2txt
    • PPTX : Utiliser python-pptx
    • XLSX : Utiliser openpyxl
    • MD : Lecture directe
  6. Code Node : Pour extraire le texte selon le format
    const { PythonShell } = require('python-shell');
    const fs = require('fs');
    
    // Récupérer les informations du fichier
    const filePath = $input.item.json.path;
    const fileName = $input.item.json.name;
    const fileExt = fileName.split('.').pop().toLowerCase();
    
    // Script Python pour extraire le texte selon le format
    let pythonScript = '';
    
    switch (fileExt) {
      case 'pdf':
        pythonScript = `
    import pypdf
    import json
    
    reader = pypdf.PdfReader('${filePath}')
    chunks = []
    
    for i, page in enumerate(reader.pages):
        text = page.extract_text()
        chunks.append({
            "text": text,
            "page": i+1,
            "source": "${fileName}",
            "format": "pdf"
        })
    
    print(json.dumps(chunks))
    `;
        break;
        
      case 'docx':
        pythonScript = `
    import docx2txt
    import json
    
    text = docx2txt.process('${filePath}')
    paragraphs = text.split('\\n\\n')
    chunks = []
    
    for i, para in enumerate(paragraphs):
        if para.strip():
            chunks.append({
                "text": para,
                "section": i+1,
                "source": "${fileName}",
                "format": "docx"
            })
    
    print(json.dumps(chunks))
    `;
        break;
        
      case 'pptx':
        pythonScript = `
    from pptx import Presentation
    import json
    
    prs = Presentation('${filePath}')
    chunks = []
    
    for i, slide in enumerate(prs.slides):
        text = ""
        for shape in slide.shapes:
            if hasattr(shape, "text"):
                text += shape.text + "\\n"
        
        if text.strip():
            chunks.append({
                "text": text,
                "slide": i+1,
                "source": "${fileName}",
                "format": "pptx"
            })
    
    print(json.dumps(chunks))
    `;
        break;
        
      case 'xlsx':
        pythonScript = `
    import openpyxl
    import json
    
    wb = openpyxl.load_workbook('${filePath}')
    chunks = []
    
    for sheet_name in wb.sheetnames:
        sheet = wb[sheet_name]
        text = ""
        
        for row in sheet.iter_rows(values_only=True):
            row_text = " | ".join([str(cell) if cell is not None else "" for cell in row])
            if row_text.strip():
                text += row_text + "\\n"
        
        if text.strip():
            chunks.append({
                "text": text,
                "sheet": sheet_name,
                "source": "${fileName}",
                "format": "xlsx"
            })
    
    print(json.dumps(chunks))
    `;
        break;
        
      case 'md':
        pythonScript = `
    import json
    
    with open('${filePath}', 'r', encoding='utf-8') as f:
        text = f.read()
    
    sections = text.split('\\n## ')
    chunks = []
    
    # Traiter la première section (qui peut commencer par # ou non)
    first_section = sections[0]
    if not first_section.startswith('# '):
        first_section = '# ' + first_section
    chunks.append({
        "text": first_section,
        "section": 1,
        "source": "${fileName}",
        "format": "md"
    })
    
    # Traiter les sections restantes
    for i, section in enumerate(sections[1:], 2):
        chunks.append({
            "text": '## ' + section,
            "section": i,
            "source": "${fileName}",
            "format": "md"
        })
    
    print(json.dumps(chunks))
    `;
        break;
        
      default:
        return {json: {error: `Format non supporté: ${fileExt}`}};
    }
    
    // Exécuter le script Python
    return new Promise((resolve, reject) => {
      PythonShell.runString(pythonScript, null, (err, results) => {
        if (err) reject(err);
        else {
          try {
            const chunks = JSON.parse(results[0]);
            return resolve([{json: {
              fileName: fileName,
              format: fileExt,
              chunks: chunks
            }}]);
          } catch (e) {
            reject(e);
          }
        }
      });
    });
  7. Merge Node : Pour combiner tous les chunks de tous les formats
  8. OpenAI Node : Pour créer les embeddings de chaque chunk
  9. Function Node : Pour stocker les embeddings dans une base de données
  10. HTTP Request Node : Pour recevoir la requête de l'utilisateur
  11. Function Node : Pour extraire les formats à filtrer (optionnel)
  12. OpenAI Node : Pour créer l'embedding de la requête
  13. Function Node : Pour rechercher les chunks les plus similaires avec filtrage par format
  14. OpenAI Node : Pour générer une réponse basée sur la requête et les chunks pertinents
  15. Function Node : Pour formater la réponse avec les références aux sources et formats
  16. Respond to Webhook Node : Pour renvoyer la réponse

Défis Spécifiques et Solutions

Gestion des Formats Spécifiques

Format Défis Solutions
PDF - Extraction de texte formaté
- Gestion des PDF scannés
- Tables et graphiques
- PyPDF pour l'extraction de base
- OCR (Tesseract) pour les PDF scannés
- Extraction de tables avec Camelot ou Tabula
DOCX - Préservation de la structure
- Contenu imbriqué
- Styles et formatage
- docx2txt pour l'extraction simple
- python-docx pour une extraction plus structurée
- Chunking par paragraphes ou sections
PPTX - Contenu principalement visuel
- Notes de présentation
- Diapositives avec peu de texte
- python-pptx pour l'extraction
- Inclusion des notes de présentation
- Chunking par diapositive
XLSX - Données tabulaires
- Formules vs valeurs
- Multiples feuilles
- openpyxl pour l'extraction
- Conversion en format tabulaire texte
- Chunking par feuille ou par plages de cellules
MD - Syntaxe Markdown
- Liens et références
- Code imbriqué
- Lecture directe ou avec markdown-it
- Préservation des liens
- Chunking par titres ou sections

Stratégies de Chunking Adaptatives

def adaptive_chunking(documents):
    """Applique des stratégies de chunking adaptées à chaque format."""
    chunks = []
    
    for doc in documents:
        format_type = doc.metadata.get("format", "unknown")
        
        if format_type == "pdf":
            # Chunking par page ou paragraphes pour les PDF
            splitter = RecursiveCharacterTextSplitter(
                chunk_size=1000,
                chunk_overlap=200
            )
            doc_chunks = splitter.split_documents([doc])
            
        elif format_type == "docx":
            # Chunking par paragraphes pour les DOCX
            splitter = RecursiveCharacterTextSplitter(
                chunk_size=1000,
                chunk_overlap=200,
                separators=["\n\n", "\n", ". ", " ", ""]
            )
            doc_chunks = splitter.split_documents([doc])
            
        elif format_type == "pptx":
            # Pour les PPTX, chaque diapositive est déjà un chunk
            doc_chunks = [doc]
            
        elif format_type == "xlsx":
            # Pour les XLSX, chaque feuille est déjà un chunk
            doc_chunks = [doc]
            
        elif format_type == "md":
            # Chunking par sections pour les MD
            splitter = RecursiveCharacterTextSplitter(
                chunk_size=800,
                chunk_overlap=150,
                separators=["## ", "### ", "\n\n", "\n", ". ", " ", ""]
            )
            doc_chunks = splitter.split_documents([doc])
            
        else:
            # Chunking par défaut pour les formats inconnus
            splitter = RecursiveCharacterTextSplitter(
                chunk_size=1000,
                chunk_overlap=200
            )
            doc_chunks = splitter.split_documents([doc])
        
        chunks.extend(doc_chunks)
    
    return chunks

Interface Utilisateur avec Filtrage par Format

def create_ui_prompt():
    """Crée un prompt pour l'interface utilisateur avec options de filtrage."""
    prompt = """
Vous pouvez interroger la base de connaissances multi-format.
Votre requête : [Entrez votre question ici]

Options de filtrage (facultatif) :
[ ] PDF
[ ] DOCX
[ ] PPTX
[ ] XLSX
[ ] MD

Exemple : "Quelles sont les principales conclusions du projet XYZ?" [PDF, PPTX]
"""
    return prompt

def parse_user_query(user_input):
    """Parse l'entrée utilisateur pour extraire la requête et les filtres de format."""
    # Rechercher les formats entre crochets à la fin de la requête
    import re
    format_match = re.search(r'\[(.*?)\]$', user_input)
    
    formats = None
    query = user_input
    
    if format_match:
        # Extraire et nettoyer les formats
        formats_str = format_match.group(1)
        formats = [f.strip().lower() for f in formats_str.split(',')]
        
        # Retirer la partie des formats de la requête
        query = user_input[:format_match.start()].strip()
    
    return {
        "query": query,
        "formats": formats
    }

Bonnes Pratiques et Optimisations

Prétraitement Spécifique au Format

Métadonnées Enrichies

L'utilisation de métadonnées enrichies permet d'améliorer la pertinence des résultats :

def enrich_metadata(doc, file_path):
    """Enrichit les métadonnées du document selon son format."""
    _, ext = os.path.splitext(file_path)
    format_type = ext.lower().replace(".", "")
    
    # Métadonnées de base
    metadata = {
        "source": os.path.basename(file_path),
        "format": format_type,
        "path": file_path,
        "last_modified": os.path.getmtime(file_path)
    }
    
    # Métadonnées spécifiques au format
    if format_type == "pdf":
        try:
            import pypdf
            reader = pypdf.PdfReader(file_path)
            info = reader.metadata
            if info:
                metadata["title"] = info.get("/Title", "")
                metadata["author"] = info.get("/Author", "")
                metadata["creation_date"] = info.get("/CreationDate", "")
        except:
            pass
            
    elif format_type == "docx":
        try:
            from docx import Document
            doc_obj = Document(file_path)
            core_props = doc_obj.core_properties
            metadata["title"] = core_props.title or ""
            metadata["author"] = core_props.author or ""
            metadata["created"] = str(core_props.created) if core_props.created else ""
        except:
            pass
            
    # Ajouter les métadonnées au document
    doc.metadata.update(metadata)
    
    return doc

Considérations de Performance