NVIDIA Deepstream Gst-nvmsgbroker

From RidgeRun Developer Connection
Jump to: navigation, search

Introduction to Gst-nvmsgbroker

This wiki serves as an example on how to use the gst-nvmsgbroker plugin provided in DeepStream 6.0: https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_plugin_gst-nvmsgbroker.html

As stated in the documentation, the plugin will receive GstBuffers with NvDsMeta meta attached in the GstMeta and use the NvDsPayload metadata attached to them on each frame metadata inside the user metadata list. It will then use the nvds_msgapi_* interface to send the messages inside the payload.

The plugin allows an easy configuration in which only one configuration file needs to be specified and a path to the protocol adapter library.

Below you can find a diagram of a possible pipeline to be used with the broker element:

nvmsgbroker pipeline

And below is the flow of configuration and messages sending into the broker by using the nvmsgbroker element:

NvMsgbroker flow

Protocol adapters

The documentation lists different adapters that can be used:

  • Kafka Protocol Adapter
  • Azure MQTT Protocol Adapter
  • AMQP Protocol Adapter
  • REDIS Protocol Adapter

All libraries for the adapters are located in the following path:

/opt/nvidia/deepstream/deepstream-6.0/lib

AMQP protocol adapter example

Below is an example for using this plugin with the AMQP adapter with all steps required to read messages.

Dependencies for AMQP

DS adapter dependencies

The AMQP underlaying protocol adapter implementation uses the librabbitmq.so library, this is build from rabbitmq-c (v0.8.0). To build and install it:

git clone -b v0.8.0  --recursive https://github.com/alanxz/rabbitmq-c.git
cd rabbitmq-c && mkdir build && cd build
cmake .. && cmake --build .
sudo cp ./librabbitmq/librabbitmq.so.4 /usr/lib/aarch64-linux-gnu/
sudo cp ./librabbitmq/librabbitmq.so.4 /opt/nvidia/deepstream/deepstream-6.0/lib/

Broker dependencies

You will also need a message broker running compatible with AMQP 0-9-1, you can install one by looking at here: https://www.rabbitmq.com/install-debian.html

Or you just can execute this to install the package for the system:

sudo apt-get install rabbitmq-server

Element creation

To create a gst-nvmsgbroker:

GstElement *nvmsgbroker1 = NULL;
nvmsgbroker1 = gst_element_factory_make ("nvmsgbroker", "nvmsgbroker1");

Element configuration

1. Configure element sync:

g_object_set (G_OBJECT (nvmsgbroker1), "sync", 0, "async", false, NULL);

2. Configure the protocol library on the element:

g_object_set (G_OBJECT (nvmsgbroker1), "proto-lib", "/opt/nvidia/deepstream/deepstream-6.0/lib/libnvds_amqp_proto.so", NULL);

This will automatically select AMQP as the element message protocol adapter.

3. Configure the element adapter:

g_object_set (G_OBJECT (nvmsgbroker1), "config", "/opt/nvidia/deepstream/deepstream-6.0/sources/libs/amqp_protocol_adaptor/cfg_amqp.txt", NULL);

In this case, the following file will be used:

[message-broker]
hostname = localhost
port = 5672
username = guest
password = guest
exchange = amq.topic
topic = topicname
#share-connection = 1

This will configure the element to send messages with this configuration for the broker.

Reading messages

To read the messages, follow a series of steps:

1. Enable the management plugin for rabbitmq:

sudo rabbitmq-plugins enable rabbitmq_management

2. Declare a new queue named myqueue for the user we are using (guest):

sudo rabbitmqadmin -u guest -p guest -V / declare queue name=myqueue durable=false auto_delete=true

3. Declare a binding to forward the messages received in the specified exchange and topic, into the queue.

sudo rabbitmqadmin -u guest -p guest -V / declare binding source=amq.topic destination=myqueue routing_key=topicname

4. Read the messages back from the queue:

rabbitmqadmin get queue=myqueue count=1 requeue=false

Adding NvDsPayload to NvDsMeta

Assuming that you have a batch_meta (NvDsBatchMeta*) and a frame_meta (NvDsFrameMeta*), you can add the payload like this to these:

/* Acquire new meta and add it to the frame */
NvDsUserMeta* new_user_meta = nvds_acquire_user_meta_from_pool(batch_meta);
new_user_meta->base_meta.meta_type = NVDS_PAYLOAD_META;
nvds_add_user_meta_to_frame(frame_meta, new_user_meta);

/* Create payload */
NvDsPayload* payload = g_new(NvDsPayload, 1);

payload->payloadSize = <PAYLOAD_SIZE>;
gchar* payload_message = (gchar*) g_malloc0(<PAYLOAD_SIZE>);

/* Copy the data to the payload_message ptr */
g_stpcpy(payload_message, <DATA_PTR>);

payload->payload = payload_message;
new_user_meta->user_meta_data = (void*) payload;

You may need to find a way to free this memory if necessary for your use case.


RidgeRun Resources

Quick Start Client Engagement Process RidgeRun Blog Homepage
Technical and Sales Support RidgeRun Online Store RidgeRun Videos Contact Us

OOjs UI icon message-progressive.svg Contact Us

Visit our Main Website for the RidgeRun Products and Online Store. RidgeRun Engineering informations are available in RidgeRun Professional Services, RidgeRun Subscription Model and Client Engagement Process wiki pages. Please email to support@ridgerun.com for technical questions and contactus@ridgerun.com for other queries. Contact details for sponsoring the RidgeRun GStreamer projects are available in Sponsor Projects page. Ridgerun-logo.svg
RR Contact Us.png