Normalisation, quantification, Recherche hybride et surveillance qualité : les piliers d'une base de connaissances IA performante, illustrés en Python.
NORMALISATION DES VECTEURS ET MÉTADONNÉES
Avant d'indexer des documents, il est crucial de préparer les vecteurs d'embedding. La normalisation L2 ramène chaque vecteur à une longueur unitaire, ce qui transforme un simple produit scalaire en une mesure de similarité cosinus, standard dans beaucoup de bases vectorielles. La fonction prepare_record enrichit aussi chaque entrée avec des métadonnées (source, aperçu du texte, nombre de caractères) pour permettre un filtrage fin lors des recherches.
import math
import time
import json
from dataclasses import dataclass, field
from typing import Any
import numpy as np
def normalize_l2(vector: list[float]) -> list[float]:
"""
Return an L2-normalized copy of vec.
Many vector stores use dot-product similarity. If you normalize vectors to
unit length, dot-product becomes equivalent to cosine similarity.
"""
arr = np.array(vector, dtype=np.float32)
norm = np.linalg.norm(arr)
if norm == 0:
return vector
return (arr / norm).tolist()
def prepare_record(
doc_id: str,
embedding: list[float],
text: str,
source: str,
extra_metadata: dict[str, Any] | None = None,
) -> dict:
"""
Prepare a single record for vector DB upsert.
Metadata serves two purposes:
- Filtering: narrow down search to a subset
"""
metadata = {
"source": source,
"text_preview": text[:500],
"char_count": len(text),
}
if extra_metadata:
metadata.update(extra_metadata)
return {
"id": doc_id,
"values": normalize_l2(embedding),
"metadata": metadata,
}
QUANTIFICATION SCALAIRE : COMPRESSION EN UINT8
Stocker des vecteurs en float32 consomme beaucoup de mémoire. La quantification scalaire compresse chaque dimension de 32 bits à 8 bits, en mappant les valeurs min/max sur l'intervalle [0, 255]. La décompression reconstruit une approximation du vecteur d'origine, avec une perte minime tant que la plage de valeurs n'est pas trop extrême.
def scalarquantization(inputvec) -> dict:
"""
This funtion demonstrates
how to compress float32 input_vec to uint8
"""
inputarr = np.array(inputvec, dtype=np.float32)
min, max = inputarr.min(), inputarr.max()
range = (max - min)
if range == 0:
quantized = np.zeros_like(arr, dtype=np.uint8)
else:
quantized = ((input_arr - min) / range * 255).astype(np.uint8)
return {
"quantized": quantized.tolist(),
"min": float(min),
"max": float(max),
}
def scalar_dequantization(record: dict) -> list[float]:
"""
You can Reconstruct the original vector
by approximate float32 vector from uint8.
"""
arr = np.array(record["quantized"], dtype=np.float32)
return (arr / 255 * (record["max"] - record["min"]) + record["min"]).tolist()
QUANTIFICATION PAR PRODUIT : ENCORE PLUS DE COMPRESSION
La quantification par produit (PQ) pousse la compression plus loin. Elle découpe chaque vecteur en sous-vecteurs et applique un clustering K-means indépendant sur chaque groupe. Le vecteur d'origine est ainsi remplacé par une séquence d'indices (un par sous-vecteur) pointant vers les centroïdes les plus proches. Résultat : un taux de compression pouvant atteindre 95% selon le nombre de sous-vecteurs et de centroïdes.
def trainproductquantizer( vectors, numsubvectors: int = 8, numcentroids: int = 256, max_iterations: int = 20) -> list:
"""
This function demonstrates
split vector into subvectors, cluster each independently
"""
from sklearn.cluster import KMeans
dim = vectors.shape[1]
assert dim % numsubvectors == 0, "dim must be divisible by numsubvectors"
subdim = dim // numsubvectors
codebooks = []
for i in range(num_subvectors):
subvectors = vectors[:, i subdim : (i + 1) sub_dim]
kmeans = KMeans(nclusters=numcentroids, maxiter=maxiterations, n_init=1)
kmeans.fit(sub_vectors)
codebooks.append(kmeans.clustercenters)
return codebooks
def pq_encode(vector: np.ndarray, codebooks: list[np.ndarray]) -> list[int]:
"""
Encode a single vector into PQ codes (one uint8 per subvector)
"""
num_subvectors = len(codebooks)
subdim = len(vector) // numsubvectors
codes = []
for i, codebook in enumerate(codebooks):
subvec = vector[i subdim : (i + 1) sub_dim]
distances = np.linalg.norm(codebook - sub_vec, axis=1)
codes.append(int(np.argmin(distances)))
return codes
def pq_decode(codes: list[int], codebooks: list[np.ndarray]) -> np.ndarray:
"""
Reconstruct approximate vector from PQ codes
"""
return np.concatenate(
[codebook[code] for code, codebook in zip(codes, codebooks)]
)
RECHERCHE HYBRIDE : MOTS-CLÉS ET SÉMANTIQUE
Aucune approche de recherche n'est parfaite seule. La recherche par mots-clés (type BM25) excelle pour les correspondances exactes mais rate les synonymes ; la recherche vectorielle capture le sens mais peut ignorer les mots importants. Une recherche hybride combine les deux pour tirer le meilleur de chaque monde.
INDEXATION BM25
BM25 est un modèle probabiliste qui pondère la fréquence d'un terme dans un document par sa rareté dans le corpus, tout en normalisant par la longueur du document. Les paramètres k1 et b contrôlent respectivement la saturation de la fréquence et l'impact de la longueur.
class BestMatching25Index:
def __init__(self, k1: float = 1.5, b: float = 0.75):
self.k1 = k1
self.b = b
self.doc_lengths: dict[str, int] = {}
self.avgdoclength: float = 0
self.doc_freqs: dict[str, int] = {}
self.term_freqs: dict[str, dict[str, int]] = {}
self.corpus_size: int = 0
def _tokenize(self, text: str) -> list[str]:
return text.lower().split()
def index(self, documents: list[Document]) -> None:
self.corpus_size = len(documents)
for doc in documents:
tokens = self._tokenize(doc.text)
self.doc_lengths[doc.id] = len(tokens)
self.term_freqs[doc.id] = {}
seen_terms: set[str] = set()
for token in tokens:
self.termfreqs[doc.id][token] = self.termfreqs[doc.id].get(token, 0) + 1
if token not in seen_terms:
self.docfreqs[token] = self.docfreqs.get(token, 0) + 1
seen_terms.add(token)
self.avgdoclength = sum(self.doclengths.values()) / self.corpussize
def score(self, query: str, doc_id: str) -> float:
queryterms = self.tokenize(query)
doclen = self.doclengths[doc_id]
score = 0.0
for term in query_terms:
if term not in self.docfreqs or term not in self.termfreqs.get(doc_id, {}):
continue
tf = self.termfreqs[docid][term]
df = self.doc_freqs[term]
idf = math.log((self.corpus_size - df + 0.5) / (df + 0.5) + 1)
tf_norm = (tf * (self.k1 + 1)) / (
tf + self.k1 (1 - self.b + self.b doclen / self.avgdoc_length)
)
score += idf * tf_norm
return score
def search(self, query: str, top_k: int = 10) -> list[tuple[str, float]]:
scores = [
(docid, self.score(query, docid))
for docid in self.doclengths
]
scores.sort(key=lambda x: x[1], reverse=True)
return scores[:top_k]
INDEXATION VECTORIELLE
Un index vectoriel stocke les embeddings normalisés et calcule la similarité cosinus via un produit scalaire. La recherche renvoie les documents les plus similaires à la requête, en se basant sur la proximité sémantique.
class VectorIndex:
"""
Smart search using hybrid search.
The index function normalizes and stores documents.
Search implements cosine similarity search.
"""
def __init__(self):
self.documents: dict[str, np.ndarray] = {}
def index(self, documents: list[Document]) -> None:
for doc in documents:
arr = np.array(doc.embedding, dtype=np.float32)
norm = np.linalg.norm(arr)
self.documents[doc.id] = arr / norm if norm > 0 else arr
def search(self, queryembedding: list[float], topk: int = 10) -> list[tuple[str, float]]:
q = np.array(query_embedding, dtype=np.float32)
q = q / np.linalg.norm(q)
scores = [
(doc_id, float(np.dot(q, emb)))
for doc_id, emb in self.documents.items()
]
scores.sort(key=lambda x: x[1], reverse=True)
return scores[:top_k]
COMBINAISON PONDÉRÉE
La fonction hybridsearchweighted fusionne les classements BM25 et vectoriel. Chaque liste de scores est d'abord normalisée entre 0 et 1, puis combinée via un paramètre alpha. Avec alpha=1, seule la recherche vectorielle compte ; avec alpha=0, seule la recherche par mots-clés.
def hybridsearchweighted(
query: str,
query_embedding: list[float],
bm25_index: BestMatching25Index,
vector_index: VectorIndex,
alpha: float = 0.5,
top_k: int = 10,
) -> list[dict]:
"""Combine keyword and vector scores with a tunable weight.
alpha = 1.0 → pure vector search
alpha = 0.0 → pure keyword search
alpha = 0.5 → equal weight (good starting point)
"""
keywordresults = bm25index.search(query, topk=topk * 2)
vectorresults = vectorindex.search(queryembedding, topk=top_k * 2)
# Normalize (min-max) each score list to [0, 1]
def normalize_scores(results: list[tuple[str, float]]) -> dict[str, float]:
if not results:
return {}
scores = [s for _, s in results]
mins, maxs = min(scores), max(scores)
rng = maxs - mins
if rng == 0:
return {docid: 1.0 for docid, _ in results}
return {docid: (s - mins) / rng for doc_id, s in results}
keywordscores = normalizescores(keyword_results)
vectorscores = normalizescores(vector_results)
# Merge
alldocids = set(keywordscores) | set(vectorscores)
combined = []
for docid in alldoc_ids:
ks = keywordscores.get(docid, 0.0)
vs = vectorscores.get(docid, 0.0)
combined.append({
"id": doc_id,
"score": alpha vs + (1 - alpha) ks,
"keyword_score": ks,
"vector_score": vs,
})
combined.sort(key=lambda x: x["score"], reverse=True)
return combined[:top_k]
FUSION PAR RANG RÉCIPROQUE (RRF)
La RRF est une alternative élégante à la combinaison pondérée : elle travaille sur les rangs, non sur les scores bruts. Chaque document reçoit un score égal à la somme de 1/(k + rang) pour chaque liste, où k est une constante (souvent 60). Aucune normalisation préalable n'est nécessaire, ce qui la rend robuste quel que soit la distribution des scores. Utilisée par Elasticsearch, Pinecone et Weaviate.
def reciprocalrankfusion(
*ranked_lists: list[tuple[str, float]],
k: int = 60,
top_n: int = 10,
) -> list[dict]:
"""
Merge multiple ranked lists using RRF (Reciprocal Rank Fusion)
RRF score = sum over all lists of: 1 / (k + rank)
"""
rrf_scores: dict[str, float] = defaultdict(float)
doc_details: dict[str, dict] = {}
for listidx, rankedlist in enumerate(ranked_lists):
for rank, (docid, rawscore) in enumerate(ranked_list, start=1):
rrfscores[docid] += 1.0 / (k + rank)
if docid not in docdetails:
docdetails[docid] = {}
docdetails[docid][f"list{listidx}_rank"] = rank
docdetails[docid][f"list{listidx}score"] = rawscore
results = []
for docid, rrfscore in rrf_scores.items():
results.append({
"id": doc_id,
"rrfscore": round(rrfscore, 6),
**docdetails[docid],
})
results.sort(key=lambda x: x["rrf_score"], reverse=True)
return results[:top_n]
def hybridsearchrrf(
query: str,
query_embedding: list[float],
bm25_index: BestMatching25Index,
vector_index: VectorIndex,
top_k: int = 10,
) -> list[dict]:
keywordresults = bm25index.search(query, topk=topk * 2)
vectorresults = vectorindex.search(queryembedding, topk=top_k * 2)
return reciprocalrankfusion(keywordresults, vectorresults, topn=topk)
SURVEILLANCE DE LA QUALITÉ DE LA BASE
Une base de connaissances n'est jamais figée. La pertinence des réponses, la fraîcheur des documents et la compatibilité des embeddings dans le temps doivent être surveillées en continu. Des outils comme DeepEval et TruLens automatisent ces contrôles.
MÉTRIQUES AVEC DEEPEVAL
DeepEval évalue la qualité de la récupération à l'aide d'un LLM juge. Quatre métriques sont calculées : la pertinence de la réponse (adresse-t-elle la question ?), la fidélité (la réponse est-elle fondée sur le contexte récupéré, sans hallucination ?), la précision contextuelle (les documents récupérés sont-ils réellement pertinents ?) et le rappel contextuel (a-t-on récupéré tout le nécessaire pour répondre ?).
def setupdeepevalmetrics():
"""Define retrieval quality metrics using DeepEval."""
from deepeval.metrics import (
AnswerRelevancyMetric,
FaithfulnessMetric,
ContextualPrecisionMetric,
ContextualRecallMetric,
)
from deepeval.test_case import LLMTestCase
metrics = {
"relevancy": AnswerRelevancyMetric(threshold=0.7),
"faithfulness": FaithfulnessMetric(threshold=0.7),
"context_precision": ContextualPrecisionMetric(threshold=0.7),
"context_recall": ContextualRecallMetric(threshold=0.7),
}
return metrics, LLMTestCase
def evaluateretrievalquality(
rag_pipeline: Callable,
test_cases: list[dict],
) -> list[dict]:
"""Run a set of test queries through your RAG pipeline and score them."""
from deepeval import evaluate
from deepeval.test_case import LLMTestCase
from deepeval.metrics import (
AnswerRelevancyMetric,
FaithfulnessMetric,
ContextualPrecisionMetric,
ContextualRecallMetric,
)
results = []
for tc in test_cases:
response = rag_pipeline(tc["query"])
test_case = LLMTestCase(
input=tc["query"],
actual_output=response["answer"],
expectedoutput=tc["expectedanswer"],
retrievalcontext=response["retrievedcontexts"],
)
metrics = [
AnswerRelevancyMetric(threshold=0.7),
FaithfulnessMetric(threshold=0.7),
ContextualPrecisionMetric(threshold=0.7),
ContextualRecallMetric(threshold=0.7),
]
for metric in metrics:
metric.measure(test_case)
results.append({
"query": tc["query"],
"scores": {m.__class__.__name__: m.score for m in metrics},
"passed": all(m.is_successful() for m in metrics),
})
return results
SURVEILLANCE CONTINUE AVEC TRULENS
TruLens enregistre chaque interaction avec le pipeline RAG et lui applique des feedbacks asynchrones. En utilisant un fournisseur LLM (comme OpenAI), il juge la pertinence, l'ancrage contextuel et la qualité du contexte récupéré. Un tableau de bord (localhost:8501) permet de suivre l'évolution de la qualité dans le temps.
def setuptrulensmonitoring(ragpipeline: Callable, appname: str = "my_kb"):
"""Wrap your RAG pipeline with TruLens for continuous feedback logging."""
from trulens.core import TruSession, Feedback, Select
from trulens.providers.openai import OpenAI as TruLensOpenAI
from trulens.apps.custom import TruCustomApp, instrument
session = TruSession()
provider = TruLensOpenAI()
feedbacks = [
Feedback(provider.relevance)
.on_input()
.on_output(),
Feedback(provider.groundednessmeasurewithcotreasons)
.on(Select.RecordCalls.retrieve.rets)
.on_output(),
Feedback(provider.context_relevance)
.on_input()
.on(Select.RecordCalls.retrieve.rets),
]
@instrument
class InstrumentedRAG:
def __init__(self, pipeline):
self._pipeline = pipeline
@instrument
def retrieve(self, query: str) -> list[str]:
result = self._pipeline(query)
return result["retrieved_contexts"]
@instrument
def query(self, query: str) -> str:
result = self._pipeline(query)
return result["answer"]
instrumented = InstrumentedRAG(rag_pipeline)
tru_app = TruCustomApp(
instrumented,
appname=appname,
feedbacks=feedbacks,
)
return tru_app, session
def gettrulensdashboard_url(session) -> str:
"""Launch the TruLens dashboard to visualize quality over time."""
session.run_dashboard(port=8501)
return "http://localhost:8501"
FRAÎCHEUR DES DOCUMENTS
Un document peut devenir obsolète si son contenu évolue sans que son embedding soit recalculé. Le FreshnessMonitor vérifie périodiquement l'âge des embeddings et compare les empreintes (hash) du contenu source. Si l'écart dépasse un seuil (par défaut 30 jours), le document est signalé comme périmé.
@dataclass
class DocumentFreshness:
doc_id: str
last_updated: datetime
last_embedded: datetime
source_hash: str
class FreshnessMonitor:
def __init__(self, stalenessthresholddays: int = 30):
self.threshold = timedelta(days=stalenessthresholddays)
self.freshness_records: dict[str, DocumentFreshness] = {}
def register(self, docid: str, sourcehash: str) -> None:
now = datetime.utcnow()
self.freshnessrecords[docid] = DocumentFreshness(
docid=docid,
last_updated=now,
last_embedded=now,
sourcehash=sourcehash,
)
def check_staleness(self) -> dict:
now = datetime.utcnow()
stale, fresh = [], []
for docid, record in self.freshnessrecords.items():
age = now - record.last_embedded
if age > self.threshold:
stale.append({"id": docid, "daysstale": age.days})
else:
fresh.append(doc_id)
return {
"total": len(self.freshness_records),
"fresh": len(fresh),
"stale": len(stale),
"stale_documents": stale,
}
def checkcontentdrift(
self, docid: str, currentsource_hash: str
) -> bool:
record = self.freshnessrecords.get(docid)
if not record:
return True
return record.sourcehash .= currentsource_hash
DÉRIVE DES EMBEDDINGS
Changer de modèle d'embedding ou le mettre à jour peut rendre les anciens vecteurs incompatibles. La fonction detectembeddingdrift compare deux ensembles de vecteurs pour un même document et signale ceux dont la distance cosinus dépasse un seuil (par défaut 0,1). Un rappel pour ré-indexer.
def detectembeddingdrift(
old_embeddings: dict[str, list[float]],
new_embeddings: dict[str, list[float]],
drift_threshold: float = 0.1,
) -> dict:
drifted = []
commonids = set(oldembeddings) & set(new_embeddings)
for docid in commonids:
old = np.array(oldembeddings[docid])
new = np.array(newembeddings[docid])
cos_sim = np.dot(old, new) / (np.linalg.norm(old) * np.linalg.norm(new))
cosdist = 1 - cossim
if cosdist > driftthreshold:
drifted.append({
"id": doc_id,
})
return drifted
En combinant normalisation, quantification, recherche hybride et une boucle de surveillance automatisée, vous posez les fondations d'une base de connaissances robuste et évolutive pour vos modèles d'IA.
- Towards Data Science
L'indépendance de CLODCO est votre garantie.
Pour que l'actualité de l'IA reste sans filtre et sans concession, votre soutien est indispensable. Votre contribution est le seul moteur de notre liberté éditoriale.
Soutenir CLODCO

