NVIDIA DeepStream offre des pipelines d'analyse vidéo prêts à l'emploi. Mais pour des modèles personnalisés ou des traitements spécifiques, il faut créer son propre plugin GStreamer en Python. Voici comment faire, étape par étape.
POURQUOI UN PLUGIN PERSONNALISÉ POUR DEEPSTREAM ?
NVIDIA DeepStream fournit un pipeline clé en main pour l'analyse vidéo multi-flux : décodage accéléré par Matériel, suivi d'objets, affichage à l'écran et envoi de messages, le tout connecté via GStreamer. Pour les modèles de détection standard exportés vers TensorRT, le composant nvinfer gère tout automatiquement.
Mais cette solution a des limites. Les modèles vision-langage, les traitements post-détection personnalisés, les boîtes englobantes inclinées, ou encore le besoin de changer de modèle à chaud pendant l'exécution, sont autant de cas où les hypothèses de nvinfer ne fonctionnent plus. Parfois, une équipe a déjà un pipeline d'inférence PyTorch bien optimisé et souhaite que DeepStream l'utilise directement, sans tout réécrire dans un fichier de configuration.
L'idée clé qui rend cela possible : les éléments en aval comme nvtracker, nvdsosd et nvmsgconv ne se soucient pas de savoir quel élément a produit les métadonnées de détection. Si vous écrivez correctement dans la structure de métadonnées de DeepStream, le reste de l'écosystème fonctionne comme si nvinfer n'avait jamais existé.
COMPRENDRE LES MÉTADONNÉES DE DEEPSTREAM
Chaque buffer qui circule dans un pipeline DeepStream transporte plus que des données pixel. Dès que les images passent par nvstreammux, chaque GstBuffer possède une structure NvDsBatchMeta attachée. Cette hiérarchie est claire et documentée officiellement :
NvDsBatchMeta
├── NvDsUserMeta (métadonnées personnalisées au niveau du lot)
└── NvDsFrameMeta (une par flux source)
├── NvDsUserMeta (métadonnées personnalisées au niveau image)
└── NvDsObjectMeta (une par objet détecté)
├── NvDsClassifierMeta
└── NvDsUserMeta (métadonnées personnalisées au niveau objet)
NvDsBatchMeta décrit l'ensemble du lot. Chaque NvDsFrameMeta correspond à un flux source et contient des informations comme l'identifiant de la source et le numéro de l'image. Chaque NvDsObjectMeta représente une seule détection. Quand notre plugin écrit des détections, il crée une instance de NvDsObjectMeta pour chacune d'elles.
Le point crucial à comprendre : aucune de ces structures n'appartient à nvinfer. Il s'agit d'un contrat de données partagé. Tout élément GStreamer dans le pipeline peut lire ces données, y écrire, ou les deux.
Notre plugin personnalisé va simplement écrire les détections dans cette structure, exactement comme le ferait nvinfer. Tout ce qui est en aval récupère ces données sans modification.
CONTRAINTES IMPORTANTES AVANT DE CODER
Une contrainte majeure est à comprendre avant d'écrire la moindre ligne de code : les instances de NvDsObjectMeta ne peuvent pas être construites directement en Python. Essayer d'instancier cette classe déclenche une erreur No constructor defined. au moment de l'exécution.
La raison est architecturale. DeepStream gère ses objets de métadonnées via des pools de mémoire, des blocs pré-alloués qui sont recyclés entre les images pour éviter le surcoût des allocations et désallocations répétées dans un pipeline à haut débit. Ces pools sont gérés côté C et appartiennent à NvDsBatchMeta. Les bindings Python exposent l'accès à ces pools, mais ne fournissent délibérément pas de constructeur côté Python. Créer une instance de NvDsObjectMeta en dehors du pool contournerait la gestion du cycle de vie qui maintient l'utilisation mémoire de DeepStream prévisible.
La bonne façon d'en obtenir une est de demander au lot : batchmeta.acquireobject_meta(), qui vous donne une instance pré-allouée depuis le pool. Quand l'image est traitée, DeepStream la retourne automatiquement au pool.
INTERAGIR AVEC LES MÉTADONNÉES DE DEEPSTREAM EN PYTHON
Pour interagir avec les métadonnées de DeepStream depuis Python, on utilise pyservicemaker, le SDK Python actuel et supporté par NVIDIA pour DeepStream. La documentation officielle couvre les bases des pipelines et des flux, mais ne montre pas comment écrire et attacher des métadonnées depuis un élément d'inférence personnalisé. C'est ce vide que cet article comble.
L'abstraction clé est BatchMetadataOperator. En sous-classant cette classe et en implémentant handlemetadata(batchmeta), vous obtenez accès à l'intégralité de NvDsBatchMeta pour chaque buffer circulant dans le pipeline. Depuis là, itérer sur les images est aussi simple que d'utiliser batchmeta.frameitems et d'attacher un objet de détection.
pyservicemaker fournit également un wrapper Buffer autour de Gst.Buffer qui expose directement batch_meta, et surtout une méthode extract(batch_id) qui retourne un handle DLPack vers la mémoire GPU de chaque image. C'est ce qui rend possible l'inférence sans copie, car on peut passer l'image directement à TensorRT sans jamais quitter le GPU.
DÉCOUVRIR UN PLUGIN GSTREAMER EN PYTHON
GStreamer découvre les plugins au moment de l'exécution en scannant les répertoires listés dans GSTPLUGINPATH. Pour les plugins Python en particulier, il cherche dans un sous-répertoire python/ à l'intérieur de chacun de ces chemins. Cela signifie que votre plugin n'est qu'un fichier .py déposé au bon endroit, sans compilation, sans CMake, sans bibliothèque partagée.
Le compromis est que le motif d'enregistrement est strict et qu'une erreur de configuration produit des échecs silencieux difficiles à déboguer :
$GSTPLUGINPATH/
└── python/
└── gstexampleplugin.py # votre plugin
Définissez GSTPLUGINPATH pour pointer vers le répertoire parent et GStreamer trouvera automatiquement python/gstexampleplugin.py au prochain lancement du pipeline.
LE SQUELETTE MINIMAL D'UN ÉLÉMENT D'INFÉRENCE
Voici le squelette minimal d'un élément d'inférence en passe-through : il reçoit des buffers vidéo en lot, exécute l'inférence, attache les métadonnées, et passe le buffer en aval sans modification.
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
from gi.repository import Gst, GstBase, GObject
import torch
from pyservicemaker import Buffer
GSTPLUGINNAME = "gstexampleplugin"
Gst.init(None)
class GstExamplePlugin(GstBase.BaseTransform):
__gstmetadata__ = (
'GstExamplePlugin', # nom
'Filter/Effect/Video', # classification
'Custom inference element', # description
'Your Name' # auteur
)
srcformat = Gst.Caps.fromstring(
"video/x-raw(memory:NVMM), format=RGB, "
"width=(int)[ 1, 2147483647 ], height=(int)[ 1, 2147483647 ], "
"framerate=(fraction)[ 0/1, 2147483647/1 ]"
)
sinkformat = Gst.Caps.fromstring(
"video/x-raw(memory:NVMM), format=RGB, "
"width=(int)[ 1, 2147483647 ], height=(int)[ 1, 2147483647 ], "
"framerate=(fraction)[ 0/1, 2147483647/1 ]"
)
srcpadtemplate = Gst.PadTemplate.new(
"src", Gst.PadDirection.SRC, Gst.PadPresence.ALWAYS, src_format
)
sinkpadtemplate = Gst.PadTemplate.new(
"sink", Gst.PadDirection.SINK, Gst.PadPresence.ALWAYS, sink_format
)
__gsttemplates__ = (srcpadtemplate, sinkpadtemplate)
__gproperties__ = {
'model-engine': (
str,
'TensorRT engine path',
'Path to the .engine file',
'',
GObject.ParamFlags.READWRITE
),
'confidence-threshold': (
float,
'Confidence threshold',
'Minimum confidence to attach a detection',
0.0, 1.0, 0.5,
GObject.ParamFlags.READWRITE
),
}
def __init__(self):
super().__init__()
self.model_engine = ''
self.confidence_threshold = 0.5
self.engine = None
def dogetproperty(self, prop):
if prop.name == 'model-engine':
return self.model_engine
elif prop.name == 'confidence-threshold':
return self.confidence_threshold
def dosetproperty(self, prop, value):
if prop.name == 'model-engine':
self.model_engine = value
elif prop.name == 'confidence-threshold':
self.confidence_threshold = value
def do_start(self):
# Chargez votre moteur TensorRT ici
self.engine = loadengine(self.modelengine) # Cette fonction doit être implémentée
return True
def dotransformip(self, gst_buffer: Gst.Buffer) -> Gst.FlowReturn:
"""Transformation en place : attache les métadonnées, passe le buffer inchangé."""
buffer = Buffer(gst_buffer)
batchmeta = buffer.batchmeta
frames = []
for framemeta in batchmeta.frame_items:
t = torch.utils.dlpack.fromdlpack(buffer.extract(framemeta.batch_id))
frames.append(t)
batch = torch.stack(frames, dim=0)
# Exécutez votre inférence de modèle
results = self.engine(batch)
# Il faut maintenant itérer sur les résultats pour chaque image
# et les attacher à l'objet_meta dans le cas d'une détection/ségmentation
# sinon, vous pouvez le faire comme user_meta
# Le code suivant est du pseudocode, qui dépend de votre inférence
for framemeta in batchmeta.frame_items:
for det in results:
obj = batchmeta.acquireobject_meta()
# Remplissez l'objet avec chaque détection
.
frame_meta.append(obj)
return Gst.FlowReturn.OK
# --- Enregistrement ---
GObject.type_register(GstExamplePlugin)
__gstelementfactory__ = (GSTPLUGINNAME, Gst.Rank.NONE, GstExamplePlugin)
Quelques points importants à noter sur ce squelette :
- GstBase.BaseTransform est la bonne classe de base pour un filtre en place, un élément qui reçoit un buffer, le modifie (en attachant des métadonnées) et le passe en aval. On surcharge
dotransformipplutôt quedo_transformcar on n'alloue pas de nouveau buffer de sortie. - __gstmetadata__ et __gsttemplates__ ne sont pas optionnels. GStreamer ne va pas enregistrer l'élément sans eux. La chaîne de capacités
video/x-raw(memory:NVMM)indique à GStreamer que cet élément fonctionne avec la mémoire NVIDIA, ce qui est essentiel pour rester sur GPU dans un pipeline DeepStream. - __gproperties__ expose model-engine et confidence-threshold comme propriétés de premier niveau de GStreamer. Cela signifie que vous pouvez les définir depuis une commande
gst-launchou depuis du code Python de pipeline sans toucher au code source. - Les deux dernières lignes sont obligatoires pour l'enregistrement :
GObject.type_registerinforme le système de types GObject de l'existence de la classe, et __gstelementfactory__ indique à GStreamer quel nom d'élément exposer et quelle classe instancier.
VÉRIFIER LE PLUGIN
Une fois le fichier en place et le cache vidé, vérifiez l'enregistrement avec :
GSTPLUGINPATH=/path/to/your/plugins gst-inspect-1.0 gstexampleplugin
Vous devriez voir les métadonnées de l'élément, les modèles de pads et les deux propriétés listées. Si vous les voyez, GStreamer connaît votre plugin et vous êtes prêt à l'intégrer dans un pipeline.
EXEMPLE DE PIPELINE SIMPLE
Voici un exemple simple qui exécute l'inférence et affiche les images par seconde :
gst-launch-1.0 -v \
nvstreammux name=m width=1280 height=720 batch-size=1 \
batched-push-timeout=33000 . \
nvvideoconvert nvbuf-memory-type=0 . \
'video/x-raw(memory:NVMM), format=RGB' . \
gstyoloplugin model-path=/path/to/yolo26s.engine . \
fpsdisplaysink text-overlay=false silent=false sync=false \
video-sink=fakesink \
uridecodebin uri=file:///path/to/video.mp4 . m.sink_0
PROBLÈME DE COMPATIBILITÉ : TENSORRT ET PYGIOBJECT
Si vous lisez le code, vous avez peut-être remarqué qu'on surcharge l'objet tuple, mais uniquement à l'intérieur du module ultralytics.nn.backends.tensorrt, car c'est là que se situe le problème. Il existe un cas limite connu de compatibilité entre les bindings Python de TensorRT et le framework de wrapper Python de GStreamer (PyGObject) qui fera planter votre pipeline avec le célèbre message « Segmentation fault (core dumped) ». C'est pourquoi il a fallu créer ce snippet de code qui aide à préserver le comportement attendu :
import ultralytics.nn.backends.tensorrt as trt_backend
originaltuple = tuple
def safe_tuple(obj):
if "tensorrt" in type(obj).__module__ and type(obj).__name__ == "Dims":
return originaltuple(obj[i] for i in range(len(obj)))
return originaltuple(obj)
trtbackend.tuple = safetuple
Ce code remplace la référence à tuple à l'intérieur de l'espace de noms du backend Ultralytics, au moment de l'import, par une version qui utilise l'accès par index pour les objets Dims, sans toucher au reste. Ce n'est pas élégant, mais c'est chirurgical et cela doit se faire au moment de l'import, avant que le modèle ne soit instancié.
LA BOUCLE D'INFÉRENCE EN ZÉRO-COPY
Voici le snippet pour l'inférence sans copie grâce à DLPack :
frames = []
for framemeta in batchmeta.frame_items:
t = torch.utils.dlpack.fromdlpack(buffer.extract(framemeta.batch_id))
frames.append(t)
batch = torch.stack(frames, dim=0)
PRÉTRAITEMENT DES IMAGES POUR YOLO
Les modèles YOLO, lorsqu'ils reçoivent un torch.Tensor, attendent une forme d'entrée fixe (N, 3, 640, 640) selon la documentation. Pourtant, les images sortant de nvstreammux auront la résolution de votre source. L'approche utilisée est le letterboxing : redimensionnez l'image pour qu'elle rentre dans les dimensions cibles tout en conservant les proportions, puis remplissez le reste d'espace.
L'idée clé ici est qu'on peut faire cela entièrement sur le GPU, sur l'ensemble du lot en une seule fois, sans jamais toucher à la mémoire CPU. Avec l'extraction des images, le letterboxing, l'inférence et l'inversion des coordonnées qui se déroulent tous sur le GPU dans un seul appel à dotransformip, le plugin se comporte exactement comme nvinfer du point de vue de tous les éléments en aval, mais avec toute la flexibilité d'un pipeline d'inférence Python en dessous.
À partir de là, le reste du pipeline DeepStream prend le relais : nvtracker assigne des identifiants, nvdsosd dessine les surimpressions et nvmsgconv sérialise les charges utiles.
CODE COMPLET DISPONIBLE
Le code complet du plugin est disponible sous forme de GitHub Gist. Si vous construisez quelque chose dessus : un modèle différent, une configuration multi-flux ou une intégration VLM, je serais ravi d'entendre comment cela se passe. Bon codage .
- Towards Data Science
L'indépendance de CLODCO est votre garantie.
Pour que l'actualité de l'IA reste sans filtre et sans concession, votre soutien est indispensable. Votre contribution est le seul moteur de notre liberté éditoriale.
Soutenir CLODCO


