NVIDIA DeepStream offre des pipelines d'analyse vidéo prêts à l'emploi. Mais pour des modèles personnalisés ou des traitements spécifiques, il faut créer son propre plugin GStreamer en Python. Voici comment faire, étape par étape.

POURQUOI UN PLUGIN PERSONNALISÉ POUR DEEPSTREAM ?

NVIDIA DeepStream fournit un pipeline clé en main pour l'analyse vidéo multi-flux : décodage accéléré par Matériel, suivi d'objets, affichage à l'écran et envoi de messages, le tout connecté via GStreamer. Pour les modèles de détection standard exportés vers TensorRT, le composant nvinfer gère tout automatiquement.

Mais cette solution a des limites. Les modèles vision-langage, les traitements post-détection personnalisés, les boîtes englobantes inclinées, ou encore le besoin de changer de modèle à chaud pendant l'exécution, sont autant de cas où les hypothèses de nvinfer ne fonctionnent plus. Parfois, une équipe a déjà un pipeline d'inférence PyTorch bien optimisé et souhaite que DeepStream l'utilise directement, sans tout réécrire dans un fichier de configuration.

Pour les modèles YOLO en particulier, le projet DeepStream-Yolo de Marcos Luciano propose déjà une excellente implémentation du post-traitement personnalisé en C++. Si le C++ est une option, commencez par là. Cet article prend une autre voie : obtenir le même résultat entièrement en Python, avec un plugin GStreamer personnalisé utilisant pyservicemaker, sans sacrifier le débit.

L'idée clé qui rend cela possible : les éléments en aval comme nvtracker, nvdsosd et nvmsgconv ne se soucient pas de savoir quel élément a produit les métadonnées de détection. Si vous écrivez correctement dans la structure de métadonnées de DeepStream, le reste de l'écosystème fonctionne comme si nvinfer n'avait jamais existé.

COMPRENDRE LES MÉTADONNÉES DE DEEPSTREAM

Chaque buffer qui circule dans un pipeline DeepStream transporte plus que des données pixel. Dès que les images passent par nvstreammux, chaque GstBuffer possède une structure NvDsBatchMeta attachée. Cette hiérarchie est claire et documentée officiellement :

NvDsBatchMeta
├── NvDsUserMeta                        (métadonnées personnalisées au niveau du lot)
└── NvDsFrameMeta                       (une par flux source)
    ├── NvDsUserMeta                    (métadonnées personnalisées au niveau image)
    └── NvDsObjectMeta                  (une par objet détecté)
        ├── NvDsClassifierMeta
        └── NvDsUserMeta                (métadonnées personnalisées au niveau objet)

NvDsBatchMeta décrit l'ensemble du lot. Chaque NvDsFrameMeta correspond à un flux source et contient des informations comme l'identifiant de la source et le numéro de l'image. Chaque NvDsObjectMeta représente une seule détection. Quand notre plugin écrit des détections, il crée une instance de NvDsObjectMeta pour chacune d'elles.

Le point crucial à comprendre : aucune de ces structures n'appartient à nvinfer. Il s'agit d'un contrat de données partagé. Tout élément GStreamer dans le pipeline peut lire ces données, y écrire, ou les deux.

Notre plugin personnalisé va simplement écrire les détections dans cette structure, exactement comme le ferait nvinfer. Tout ce qui est en aval récupère ces données sans modification.

CONTRAINTES IMPORTANTES AVANT DE CODER

Une contrainte majeure est à comprendre avant d'écrire la moindre ligne de code : les instances de NvDsObjectMeta ne peuvent pas être construites directement en Python. Essayer d'instancier cette classe déclenche une erreur No constructor defined. au moment de l'exécution.

La raison est architecturale. DeepStream gère ses objets de métadonnées via des pools de mémoire, des blocs pré-alloués qui sont recyclés entre les images pour éviter le surcoût des allocations et désallocations répétées dans un pipeline à haut débit. Ces pools sont gérés côté C et appartiennent à NvDsBatchMeta. Les bindings Python exposent l'accès à ces pools, mais ne fournissent délibérément pas de constructeur côté Python. Créer une instance de NvDsObjectMeta en dehors du pool contournerait la gestion du cycle de vie qui maintient l'utilisation mémoire de DeepStream prévisible.

La bonne façon d'en obtenir une est de demander au lot : batchmeta.acquireobject_meta(), qui vous donne une instance pré-allouée depuis le pool. Quand l'image est traitée, DeepStream la retourne automatiquement au pool.

INTERAGIR AVEC LES MÉTADONNÉES DE DEEPSTREAM EN PYTHON

Pour interagir avec les métadonnées de DeepStream depuis Python, on utilise pyservicemaker, le SDK Python actuel et supporté par NVIDIA pour DeepStream. La documentation officielle couvre les bases des pipelines et des flux, mais ne montre pas comment écrire et attacher des métadonnées depuis un élément d'inférence personnalisé. C'est ce vide que cet article comble.

L'abstraction clé est BatchMetadataOperator. En sous-classant cette classe et en implémentant handlemetadata(batchmeta), vous obtenez accès à l'intégralité de NvDsBatchMeta pour chaque buffer circulant dans le pipeline. Depuis là, itérer sur les images est aussi simple que d'utiliser batchmeta.frameitems et d'attacher un objet de détection.

pyservicemaker fournit également un wrapper Buffer autour de Gst.Buffer qui expose directement batch_meta, et surtout une méthode extract(batch_id) qui retourne un handle DLPack vers la mémoire GPU de chaque image. C'est ce qui rend possible l'inférence sans copie, car on peut passer l'image directement à TensorRT sans jamais quitter le GPU.

DÉCOUVRIR UN PLUGIN GSTREAMER EN PYTHON

GStreamer découvre les plugins au moment de l'exécution en scannant les répertoires listés dans GSTPLUGINPATH. Pour les plugins Python en particulier, il cherche dans un sous-répertoire python/ à l'intérieur de chacun de ces chemins. Cela signifie que votre plugin n'est qu'un fichier .py déposé au bon endroit, sans compilation, sans CMake, sans bibliothèque partagée.

Le compromis est que le motif d'enregistrement est strict et qu'une erreur de configuration produit des échecs silencieux difficiles à déboguer :

$GSTPLUGINPATH/
└── python/
    └── gstexampleplugin.py   # votre plugin

Définissez GSTPLUGINPATH pour pointer vers le répertoire parent et GStreamer trouvera automatiquement python/gstexampleplugin.py au prochain lancement du pipeline.

LE SQUELETTE MINIMAL D'UN ÉLÉMENT D'INFÉRENCE

Voici le squelette minimal d'un élément d'inférence en passe-through : il reçoit des buffers vidéo en lot, exécute l'inférence, attache les métadonnées, et passe le buffer en aval sans modification.

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
from gi.repository import Gst, GstBase, GObject

import torch
from pyservicemaker import Buffer

GSTPLUGINNAME = "gstexampleplugin"

Gst.init(None)

class GstExamplePlugin(GstBase.BaseTransform):

    __gstmetadata__ = (
        'GstExamplePlugin',                     # nom
        'Filter/Effect/Video',                  # classification
        'Custom inference element',             # description
        'Your Name'                             # auteur
    )

    srcformat = Gst.Caps.fromstring(
        "video/x-raw(memory:NVMM), format=RGB, "
        "width=(int)[ 1, 2147483647 ], height=(int)[ 1, 2147483647 ], "
        "framerate=(fraction)[ 0/1, 2147483647/1 ]"
    )
    sinkformat = Gst.Caps.fromstring(
        "video/x-raw(memory:NVMM), format=RGB, "
        "width=(int)[ 1, 2147483647 ], height=(int)[ 1, 2147483647 ], "
        "framerate=(fraction)[ 0/1, 2147483647/1 ]"
    )

    srcpadtemplate = Gst.PadTemplate.new(
        "src", Gst.PadDirection.SRC, Gst.PadPresence.ALWAYS, src_format
    )
    sinkpadtemplate = Gst.PadTemplate.new(
        "sink", Gst.PadDirection.SINK, Gst.PadPresence.ALWAYS, sink_format
    )
    __gsttemplates__ = (srcpadtemplate, sinkpadtemplate)

    __gproperties__ = {
        'model-engine': (
            str,
            'TensorRT engine path',
            'Path to the .engine file',
            '',
            GObject.ParamFlags.READWRITE
        ),
        'confidence-threshold': (
            float,
            'Confidence threshold',
            'Minimum confidence to attach a detection',
            0.0, 1.0, 0.5,
            GObject.ParamFlags.READWRITE
        ),
    }

    def __init__(self):
        super().__init__()
        self.model_engine = ''
        self.confidence_threshold = 0.5
        self.engine = None

    def dogetproperty(self, prop):
        if prop.name == 'model-engine':
            return self.model_engine
        elif prop.name == 'confidence-threshold':
            return self.confidence_threshold

    def dosetproperty(self, prop, value):
        if prop.name == 'model-engine':
            self.model_engine = value
        elif prop.name == 'confidence-threshold':
            self.confidence_threshold = value

    def do_start(self):
        # Chargez votre moteur TensorRT ici
        self.engine = loadengine(self.modelengine) # Cette fonction doit être implémentée
        return True

    def dotransformip(self, gst_buffer: Gst.Buffer) -> Gst.FlowReturn:
        """Transformation en place : attache les métadonnées, passe le buffer inchangé."""
        buffer = Buffer(gst_buffer)
        batchmeta = buffer.batchmeta

        frames = []
        for framemeta in batchmeta.frame_items:
            t = torch.utils.dlpack.fromdlpack(buffer.extract(framemeta.batch_id))
            frames.append(t)
        batch = torch.stack(frames, dim=0)

        # Exécutez votre inférence de modèle
        results = self.engine(batch)

        # Il faut maintenant itérer sur les résultats pour chaque image
        # et les attacher à l'objet_meta dans le cas d'une détection/ségmentation
        # sinon, vous pouvez le faire comme user_meta
        # Le code suivant est du pseudocode, qui dépend de votre inférence
        for framemeta in batchmeta.frame_items:
            for det in results:
                obj = batchmeta.acquireobject_meta()
                # Remplissez l'objet avec chaque détection
                .
                frame_meta.append(obj)

        return Gst.FlowReturn.OK


# --- Enregistrement ---
GObject.type_register(GstExamplePlugin)
__gstelementfactory__ = (GSTPLUGINNAME, Gst.Rank.NONE, GstExamplePlugin)

Quelques points importants à noter sur ce squelette :

  • GstBase.BaseTransform est la bonne classe de base pour un filtre en place, un élément qui reçoit un buffer, le modifie (en attachant des métadonnées) et le passe en aval. On surcharge dotransformip plutôt que do_transform car on n'alloue pas de nouveau buffer de sortie.
  • __gstmetadata__ et __gsttemplates__ ne sont pas optionnels. GStreamer ne va pas enregistrer l'élément sans eux. La chaîne de capacités video/x-raw(memory:NVMM) indique à GStreamer que cet élément fonctionne avec la mémoire NVIDIA, ce qui est essentiel pour rester sur GPU dans un pipeline DeepStream.
  • __gproperties__ expose model-engine et confidence-threshold comme propriétés de premier niveau de GStreamer. Cela signifie que vous pouvez les définir depuis une commande gst-launch ou depuis du code Python de pipeline sans toucher au code source.
  • Les deux dernières lignes sont obligatoires pour l'enregistrement : GObject.type_register informe le système de types GObject de l'existence de la classe, et __gstelementfactory__ indique à GStreamer quel nom d'élément exposer et quelle classe instancier.

VÉRIFIER LE PLUGIN

Une fois le fichier en place et le cache vidé, vérifiez l'enregistrement avec :

GSTPLUGINPATH=/path/to/your/plugins gst-inspect-1.0 gstexampleplugin

Vous devriez voir les métadonnées de l'élément, les modèles de pads et les deux propriétés listées. Si vous les voyez, GStreamer connaît votre plugin et vous êtes prêt à l'intégrer dans un pipeline.

Le squelette du plugin est en place. Le prochain défi : remplir la logique d'inférence.

EXEMPLE DE PIPELINE SIMPLE

Voici un exemple simple qui exécute l'inférence et affiche les images par seconde :

gst-launch-1.0 -v \
  nvstreammux name=m width=1280 height=720 batch-size=1 \
    batched-push-timeout=33000 . \
  nvvideoconvert nvbuf-memory-type=0 . \
  'video/x-raw(memory:NVMM), format=RGB' . \
  gstyoloplugin model-path=/path/to/yolo26s.engine . \
  fpsdisplaysink text-overlay=false silent=false sync=false \
    video-sink=fakesink \
  uridecodebin uri=file:///path/to/video.mp4 . m.sink_0

PROBLÈME DE COMPATIBILITÉ : TENSORRT ET PYGIOBJECT

Si vous lisez le code, vous avez peut-être remarqué qu'on surcharge l'objet tuple, mais uniquement à l'intérieur du module ultralytics.nn.backends.tensorrt, car c'est là que se situe le problème. Il existe un cas limite connu de compatibilité entre les bindings Python de TensorRT et le framework de wrapper Python de GStreamer (PyGObject) qui fera planter votre pipeline avec le célèbre message « Segmentation fault (core dumped) ». C'est pourquoi il a fallu créer ce snippet de code qui aide à préserver le comportement attendu :

import ultralytics.nn.backends.tensorrt as trt_backend

originaltuple = tuple

def safe_tuple(obj):
    if "tensorrt" in type(obj).__module__ and type(obj).__name__ == "Dims":
        return originaltuple(obj[i] for i in range(len(obj)))
    return originaltuple(obj)

trtbackend.tuple = safetuple

Ce code remplace la référence à tuple à l'intérieur de l'espace de noms du backend Ultralytics, au moment de l'import, par une version qui utilise l'accès par index pour les objets Dims, sans toucher au reste. Ce n'est pas élégant, mais c'est chirurgical et cela doit se faire au moment de l'import, avant que le modèle ne soit instancié.

LA BOUCLE D'INFÉRENCE EN ZÉRO-COPY

Voici le snippet pour l'inférence sans copie grâce à DLPack :

frames = []
for framemeta in batchmeta.frame_items:
    t = torch.utils.dlpack.fromdlpack(buffer.extract(framemeta.batch_id))
    frames.append(t)
batch = torch.stack(frames, dim=0)

PRÉTRAITEMENT DES IMAGES POUR YOLO

Les modèles YOLO, lorsqu'ils reçoivent un torch.Tensor, attendent une forme d'entrée fixe (N, 3, 640, 640) selon la documentation. Pourtant, les images sortant de nvstreammux auront la résolution de votre source. L'approche utilisée est le letterboxing : redimensionnez l'image pour qu'elle rentre dans les dimensions cibles tout en conservant les proportions, puis remplissez le reste d'espace.

L'idée clé ici est qu'on peut faire cela entièrement sur le GPU, sur l'ensemble du lot en une seule fois, sans jamais toucher à la mémoire CPU. Avec l'extraction des images, le letterboxing, l'inférence et l'inversion des coordonnées qui se déroulent tous sur le GPU dans un seul appel à dotransformip, le plugin se comporte exactement comme nvinfer du point de vue de tous les éléments en aval, mais avec toute la flexibilité d'un pipeline d'inférence Python en dessous.

À partir de là, le reste du pipeline DeepStream prend le relais : nvtracker assigne des identifiants, nvdsosd dessine les surimpressions et nvmsgconv sérialise les charges utiles.

Si vous avez suivi jusqu'ici, vous avez un modèle fonctionnel pour remplacer nvinfer par votre propre élément d'inférence Python, et surtout, vous comprenez pourquoi chaque pièce est conçue de cette façon.

CODE COMPLET DISPONIBLE

Le code complet du plugin est disponible sous forme de GitHub Gist. Si vous construisez quelque chose dessus : un modèle différent, une configuration multi-flux ou une intégration VLM, je serais ravi d'entendre comment cela se passe. Bon codage .

Sources :
  • Towards Data Science

L'indépendance de CLODCO est votre garantie.

Pour que l'actualité de l'IA reste sans filtre et sans concession, votre soutien est indispensable. Votre contribution est le seul moteur de notre liberté éditoriale.

Soutenir CLODCO