Normalisation, quantification, Recherche hybride et surveillance qualité : les piliers d'une base de connaissances IA performante, illustrés en Python.

NORMALISATION DES VECTEURS ET MÉTADONNÉES

Avant d'indexer des documents, il est crucial de préparer les vecteurs d'embedding. La normalisation L2 ramène chaque vecteur à une longueur unitaire, ce qui transforme un simple produit scalaire en une mesure de similarité cosinus, standard dans beaucoup de bases vectorielles. La fonction prepare_record enrichit aussi chaque entrée avec des métadonnées (source, aperçu du texte, nombre de caractères) pour permettre un filtrage fin lors des recherches.

import math
import time
import json
from dataclasses import dataclass, field
from typing import Any

import numpy as np


def normalize_l2(vector: list[float]) -> list[float]:
    """
    Return an L2-normalized copy of vec.
    Many vector stores use dot-product similarity. If you normalize vectors to
    unit length, dot-product becomes equivalent to cosine similarity.
    """
    arr = np.array(vector, dtype=np.float32)
    norm = np.linalg.norm(arr)
    if norm == 0:
        return vector
    return (arr / norm).tolist()


def prepare_record(
    doc_id: str,
    embedding: list[float],
    text: str,
    source: str,
    extra_metadata: dict[str, Any] | None = None,
) -> dict:
    """
    Prepare a single record for vector DB upsert.
    Metadata serves two purposes:
    - Filtering: narrow down search to a subset
    """
    metadata = {
        "source": source,
        "text_preview": text[:500],
        "char_count": len(text),
    }
    if extra_metadata:
        metadata.update(extra_metadata)

    return {
        "id": doc_id,
        "values": normalize_l2(embedding),
        "metadata": metadata,
    }

QUANTIFICATION SCALAIRE : COMPRESSION EN UINT8

Stocker des vecteurs en float32 consomme beaucoup de mémoire. La quantification scalaire compresse chaque dimension de 32 bits à 8 bits, en mappant les valeurs min/max sur l'intervalle [0, 255]. La décompression reconstruit une approximation du vecteur d'origine, avec une perte minime tant que la plage de valeurs n'est pas trop extrême.

def scalarquantization(inputvec) -> dict:
    """
    This funtion demonstrates 
      how to compress float32 input_vec to uint8
    """
    inputarr = np.array(inputvec, dtype=np.float32)
    min, max = inputarr.min(), inputarr.max()
    range = (max - min)
    if range == 0:
        quantized = np.zeros_like(arr, dtype=np.uint8)
    else:
        quantized = ((input_arr - min) / range * 255).astype(np.uint8)

    return {
        "quantized": quantized.tolist(),
        "min": float(min),
        "max": float(max),
    }


def scalar_dequantization(record: dict) -> list[float]:
    """
    You can Reconstruct the original vector 
      by approximate float32 vector from uint8.
    """
    arr = np.array(record["quantized"], dtype=np.float32)
    return (arr / 255 * (record["max"] - record["min"]) + record["min"]).tolist()

QUANTIFICATION PAR PRODUIT : ENCORE PLUS DE COMPRESSION

La quantification par produit (PQ) pousse la compression plus loin. Elle découpe chaque vecteur en sous-vecteurs et applique un clustering K-means indépendant sur chaque groupe. Le vecteur d'origine est ainsi remplacé par une séquence d'indices (un par sous-vecteur) pointant vers les centroïdes les plus proches. Résultat : un taux de compression pouvant atteindre 95% selon le nombre de sous-vecteurs et de centroïdes.

def trainproductquantizer( vectors, numsubvectors: int = 8, numcentroids: int = 256, max_iterations: int = 20) -> list:
    """
    This function demonstrates 
      split vector into subvectors, cluster each independently
    """
    from sklearn.cluster import KMeans

    dim = vectors.shape[1]
    assert dim % numsubvectors == 0, "dim must be divisible by numsubvectors"
    subdim = dim // numsubvectors

    codebooks = []
    for i in range(num_subvectors):
        subvectors = vectors[:, i  subdim : (i + 1)  sub_dim]
        kmeans = KMeans(nclusters=numcentroids, maxiter=maxiterations, n_init=1)
        kmeans.fit(sub_vectors)
        codebooks.append(kmeans.clustercenters)

    return codebooks


def pq_encode(vector: np.ndarray, codebooks: list[np.ndarray]) -> list[int]:
    """
    Encode a single vector into PQ codes (one uint8 per subvector)
    """
    num_subvectors = len(codebooks)
    subdim = len(vector) // numsubvectors
    codes = []

    for i, codebook in enumerate(codebooks):
        subvec = vector[i  subdim : (i + 1)  sub_dim]
        distances = np.linalg.norm(codebook - sub_vec, axis=1)
        codes.append(int(np.argmin(distances)))

    return codes


def pq_decode(codes: list[int], codebooks: list[np.ndarray]) -> np.ndarray:
    """
    Reconstruct approximate vector from PQ codes
    """
    return np.concatenate(
        [codebook[code] for code, codebook in zip(codes, codebooks)]
    )

RECHERCHE HYBRIDE : MOTS-CLÉS ET SÉMANTIQUE

Aucune approche de recherche n'est parfaite seule. La recherche par mots-clés (type BM25) excelle pour les correspondances exactes mais rate les synonymes ; la recherche vectorielle capture le sens mais peut ignorer les mots importants. Une recherche hybride combine les deux pour tirer le meilleur de chaque monde.

INDEXATION BM25

BM25 est un modèle probabiliste qui pondère la fréquence d'un terme dans un document par sa rareté dans le corpus, tout en normalisant par la longueur du document. Les paramètres k1 et b contrôlent respectivement la saturation de la fréquence et l'impact de la longueur.

class BestMatching25Index:
    def __init__(self, k1: float = 1.5, b: float = 0.75):
        self.k1 = k1
        self.b = b
        self.doc_lengths: dict[str, int] = {}
        self.avgdoclength: float = 0
        self.doc_freqs: dict[str, int] = {} 
        self.term_freqs: dict[str, dict[str, int]] = {} 
        self.corpus_size: int = 0

    def _tokenize(self, text: str) -> list[str]:
        return text.lower().split()

    def index(self, documents: list[Document]) -> None:
        self.corpus_size = len(documents)

        for doc in documents:
            tokens = self._tokenize(doc.text)
            self.doc_lengths[doc.id] = len(tokens)
            self.term_freqs[doc.id] = {}

            seen_terms: set[str] = set()
            for token in tokens:
                self.termfreqs[doc.id][token] = self.termfreqs[doc.id].get(token, 0) + 1
                if token not in seen_terms:
                    self.docfreqs[token] = self.docfreqs.get(token, 0) + 1
                    seen_terms.add(token)

        self.avgdoclength = sum(self.doclengths.values()) / self.corpussize

    def score(self, query: str, doc_id: str) -> float:
        queryterms = self.tokenize(query)
        doclen = self.doclengths[doc_id]
        score = 0.0

        for term in query_terms:
            if term not in self.docfreqs or term not in self.termfreqs.get(doc_id, {}):
                continue

            tf = self.termfreqs[docid][term]
            df = self.doc_freqs[term]
            idf = math.log((self.corpus_size - df + 0.5) / (df + 0.5) + 1)
            tf_norm = (tf * (self.k1 + 1)) / (
                tf + self.k1  (1 - self.b + self.b  doclen / self.avgdoc_length)
            )
            score += idf * tf_norm

        return score

    def search(self, query: str, top_k: int = 10) -> list[tuple[str, float]]:
        scores = [
            (docid, self.score(query, docid))
            for docid in self.doclengths
        ]
        scores.sort(key=lambda x: x[1], reverse=True)
        return scores[:top_k]

INDEXATION VECTORIELLE

Un index vectoriel stocke les embeddings normalisés et calcule la similarité cosinus via un produit scalaire. La recherche renvoie les documents les plus similaires à la requête, en se basant sur la proximité sémantique.

class VectorIndex:
    """
    Smart search using hybrid search.
    The index function normalizes and stores documents.
    Search implements cosine similarity search.
    """
    def __init__(self):
        self.documents: dict[str, np.ndarray] = {}

    def index(self, documents: list[Document]) -> None:
        for doc in documents:
            arr = np.array(doc.embedding, dtype=np.float32)
            norm = np.linalg.norm(arr)
            self.documents[doc.id] = arr / norm if norm > 0 else arr

    def search(self, queryembedding: list[float], topk: int = 10) -> list[tuple[str, float]]:
        q = np.array(query_embedding, dtype=np.float32)
        q = q / np.linalg.norm(q)

        scores = [
            (doc_id, float(np.dot(q, emb)))
            for doc_id, emb in self.documents.items()
        ]
        scores.sort(key=lambda x: x[1], reverse=True)
        return scores[:top_k]
Un alpha de 0,5 donne un poids égal aux scores vectoriels et par mots-clés : un excellent point de départ.

COMBINAISON PONDÉRÉE

La fonction hybridsearchweighted fusionne les classements BM25 et vectoriel. Chaque liste de scores est d'abord normalisée entre 0 et 1, puis combinée via un paramètre alpha. Avec alpha=1, seule la recherche vectorielle compte ; avec alpha=0, seule la recherche par mots-clés.

def hybridsearchweighted(
    query: str,
    query_embedding: list[float],
    bm25_index: BestMatching25Index,
    vector_index: VectorIndex,
    alpha: float = 0.5,
    top_k: int = 10,
) -> list[dict]:
    """Combine keyword and vector scores with a tunable weight.

    alpha = 1.0 → pure vector search
    alpha = 0.0 → pure keyword search
    alpha = 0.5 → equal weight (good starting point)
    """
    keywordresults = bm25index.search(query, topk=topk * 2)
    vectorresults = vectorindex.search(queryembedding, topk=top_k * 2)

    # Normalize (min-max) each score list to [0, 1]
    def normalize_scores(results: list[tuple[str, float]]) -> dict[str, float]:
        if not results:
            return {}
        scores = [s for _, s in results]
        mins, maxs = min(scores), max(scores)
        rng = maxs - mins
        if rng == 0:
            return {docid: 1.0 for docid, _ in results}
        return {docid: (s - mins) / rng for doc_id, s in results}

    keywordscores = normalizescores(keyword_results)
    vectorscores = normalizescores(vector_results)

    # Merge
    alldocids = set(keywordscores) | set(vectorscores)
    combined = []
    for docid in alldoc_ids:
        ks = keywordscores.get(docid, 0.0)
        vs = vectorscores.get(docid, 0.0)
        combined.append({
            "id": doc_id,
            "score": alpha  vs + (1 - alpha)  ks,
            "keyword_score": ks,
            "vector_score": vs,
        })

    combined.sort(key=lambda x: x["score"], reverse=True)
    return combined[:top_k]

FUSION PAR RANG RÉCIPROQUE (RRF)

La RRF est une alternative élégante à la combinaison pondérée : elle travaille sur les rangs, non sur les scores bruts. Chaque document reçoit un score égal à la somme de 1/(k + rang) pour chaque liste, où k est une constante (souvent 60). Aucune normalisation préalable n'est nécessaire, ce qui la rend robuste quel que soit la distribution des scores. Utilisée par Elasticsearch, Pinecone et Weaviate.

def reciprocalrankfusion(
    *ranked_lists: list[tuple[str, float]],
    k: int = 60,
    top_n: int = 10,
) -> list[dict]:
    """
    Merge multiple ranked lists using RRF (Reciprocal Rank Fusion)

    RRF score = sum over all lists of: 1 / (k + rank)
    """
    rrf_scores: dict[str, float] = defaultdict(float)
    doc_details: dict[str, dict] = {}

    for listidx, rankedlist in enumerate(ranked_lists):
        for rank, (docid, rawscore) in enumerate(ranked_list, start=1):
            rrfscores[docid] += 1.0 / (k + rank)
            if docid not in docdetails:
                docdetails[docid] = {}
            docdetails[docid][f"list{listidx}_rank"] = rank
            docdetails[docid][f"list{listidx}score"] = rawscore

    results = []
    for docid, rrfscore in rrf_scores.items():
        results.append({
            "id": doc_id,
            "rrfscore": round(rrfscore, 6),
            **docdetails[docid],
        })

    results.sort(key=lambda x: x["rrf_score"], reverse=True)
    return results[:top_n]


def hybridsearchrrf(
    query: str,
    query_embedding: list[float],
    bm25_index: BestMatching25Index,
    vector_index: VectorIndex,
    top_k: int = 10,
) -> list[dict]:
    keywordresults = bm25index.search(query, topk=topk * 2)
    vectorresults = vectorindex.search(queryembedding, topk=top_k * 2)

    return reciprocalrankfusion(keywordresults, vectorresults, topn=topk)

SURVEILLANCE DE LA QUALITÉ DE LA BASE

Une base de connaissances n'est jamais figée. La pertinence des réponses, la fraîcheur des documents et la compatibilité des embeddings dans le temps doivent être surveillées en continu. Des outils comme DeepEval et TruLens automatisent ces contrôles.

MÉTRIQUES AVEC DEEPEVAL

DeepEval évalue la qualité de la récupération à l'aide d'un LLM juge. Quatre métriques sont calculées : la pertinence de la réponse (adresse-t-elle la question ?), la fidélité (la réponse est-elle fondée sur le contexte récupéré, sans hallucination ?), la précision contextuelle (les documents récupérés sont-ils réellement pertinents ?) et le rappel contextuel (a-t-on récupéré tout le nécessaire pour répondre ?).

def setupdeepevalmetrics():
    """Define retrieval quality metrics using DeepEval."""
    from deepeval.metrics import (
        AnswerRelevancyMetric,
        FaithfulnessMetric,
        ContextualPrecisionMetric,
        ContextualRecallMetric,
    )
    from deepeval.test_case import LLMTestCase

    metrics = {
        "relevancy": AnswerRelevancyMetric(threshold=0.7),
        "faithfulness": FaithfulnessMetric(threshold=0.7),
        "context_precision": ContextualPrecisionMetric(threshold=0.7),
        "context_recall": ContextualRecallMetric(threshold=0.7),
    }

    return metrics, LLMTestCase


def evaluateretrievalquality(
    rag_pipeline: Callable,
    test_cases: list[dict],
) -> list[dict]:
    """Run a set of test queries through your RAG pipeline and score them."""
    from deepeval import evaluate
    from deepeval.test_case import LLMTestCase
    from deepeval.metrics import (
        AnswerRelevancyMetric,
        FaithfulnessMetric,
        ContextualPrecisionMetric,
        ContextualRecallMetric,
    )

    results = []

    for tc in test_cases:
        response = rag_pipeline(tc["query"])

        test_case = LLMTestCase(
            input=tc["query"],
            actual_output=response["answer"],
            expectedoutput=tc["expectedanswer"],
            retrievalcontext=response["retrievedcontexts"],
        )

        metrics = [
            AnswerRelevancyMetric(threshold=0.7),
            FaithfulnessMetric(threshold=0.7),
            ContextualPrecisionMetric(threshold=0.7),
            ContextualRecallMetric(threshold=0.7),
        ]

        for metric in metrics:
            metric.measure(test_case)

        results.append({
            "query": tc["query"],
            "scores": {m.__class__.__name__: m.score for m in metrics},
            "passed": all(m.is_successful() for m in metrics),
        })

    return results

SURVEILLANCE CONTINUE AVEC TRULENS

TruLens enregistre chaque interaction avec le pipeline RAG et lui applique des feedbacks asynchrones. En utilisant un fournisseur LLM (comme OpenAI), il juge la pertinence, l'ancrage contextuel et la qualité du contexte récupéré. Un tableau de bord (localhost:8501) permet de suivre l'évolution de la qualité dans le temps.

def setuptrulensmonitoring(ragpipeline: Callable, appname: str = "my_kb"):
    """Wrap your RAG pipeline with TruLens for continuous feedback logging."""
    from trulens.core import TruSession, Feedback, Select
    from trulens.providers.openai import OpenAI as TruLensOpenAI
    from trulens.apps.custom import TruCustomApp, instrument

    session = TruSession()

    provider = TruLensOpenAI()

    feedbacks = [
        Feedback(provider.relevance)
        .on_input()
        .on_output(),

        Feedback(provider.groundednessmeasurewithcotreasons)
        .on(Select.RecordCalls.retrieve.rets)
        .on_output(),

        Feedback(provider.context_relevance)
        .on_input()
        .on(Select.RecordCalls.retrieve.rets),
    ]

    @instrument
    class InstrumentedRAG:
        def __init__(self, pipeline):
            self._pipeline = pipeline

        @instrument
        def retrieve(self, query: str) -> list[str]:
            result = self._pipeline(query)
            return result["retrieved_contexts"]

        @instrument
        def query(self, query: str) -> str:
            result = self._pipeline(query)
            return result["answer"]

    instrumented = InstrumentedRAG(rag_pipeline)

    tru_app = TruCustomApp(
        instrumented,
        appname=appname,
        feedbacks=feedbacks,
    )

    return tru_app, session


def gettrulensdashboard_url(session) -> str:
    """Launch the TruLens dashboard to visualize quality over time."""
    session.run_dashboard(port=8501)
    return "http://localhost:8501"

FRAÎCHEUR DES DOCUMENTS

Un document peut devenir obsolète si son contenu évolue sans que son embedding soit recalculé. Le FreshnessMonitor vérifie périodiquement l'âge des embeddings et compare les empreintes (hash) du contenu source. Si l'écart dépasse un seuil (par défaut 30 jours), le document est signalé comme périmé.

@dataclass
class DocumentFreshness:
    doc_id: str
    last_updated: datetime
    last_embedded: datetime
    source_hash: str


class FreshnessMonitor:
    def __init__(self, stalenessthresholddays: int = 30):
        self.threshold = timedelta(days=stalenessthresholddays)
        self.freshness_records: dict[str, DocumentFreshness] = {}

    def register(self, docid: str, sourcehash: str) -> None:
        now = datetime.utcnow()
        self.freshnessrecords[docid] = DocumentFreshness(
            docid=docid,
            last_updated=now,
            last_embedded=now,
            sourcehash=sourcehash,
        )

    def check_staleness(self) -> dict:
        now = datetime.utcnow()
        stale, fresh = [], []

        for docid, record in self.freshnessrecords.items():
            age = now - record.last_embedded
            if age > self.threshold:
                stale.append({"id": docid, "daysstale": age.days})
            else:
                fresh.append(doc_id)

        return {
            "total": len(self.freshness_records),
            "fresh": len(fresh),
            "stale": len(stale),
            "stale_documents": stale,
        }

    def checkcontentdrift(
        self, docid: str, currentsource_hash: str
    ) -> bool:
        record = self.freshnessrecords.get(docid)
        if not record:
            return True
        return record.sourcehash .= currentsource_hash

DÉRIVE DES EMBEDDINGS

Changer de modèle d'embedding ou le mettre à jour peut rendre les anciens vecteurs incompatibles. La fonction detectembeddingdrift compare deux ensembles de vecteurs pour un même document et signale ceux dont la distance cosinus dépasse un seuil (par défaut 0,1). Un rappel pour ré-indexer.

def detectembeddingdrift(
    old_embeddings: dict[str, list[float]],
    new_embeddings: dict[str, list[float]],
    drift_threshold: float = 0.1,
) -> dict:
    drifted = []
    commonids = set(oldembeddings) & set(new_embeddings)

    for docid in commonids:
        old = np.array(oldembeddings[docid])
        new = np.array(newembeddings[docid])

        cos_sim = np.dot(old, new) / (np.linalg.norm(old) * np.linalg.norm(new))
        cosdist = 1 - cossim

        if cosdist > driftthreshold:
            drifted.append({
                "id": doc_id,
            })

    return drifted
Surveiller la qualité n'est pas optionnel : c'est la garantie que votre base reste fiable dans le temps.

En combinant normalisation, quantification, recherche hybride et une boucle de surveillance automatisée, vous posez les fondations d'une base de connaissances robuste et évolutive pour vos modèles d'IA.

Sources :
  • Towards Data Science

L'indépendance de CLODCO est votre garantie.

Pour que l'actualité de l'IA reste sans filtre et sans concession, votre soutien est indispensable. Votre contribution est le seul moteur de notre liberté éditoriale.

Soutenir CLODCO