Prerequisites for MQTT publishing

(The broker configuration and certificate generation steps described here are for development and testing purposes only.)

Before EVAM can publish, an MQTT broker and a subscriber need to be configured and started.

Configure and start MQTT broker

The MQTT broker must be configured to accept connections. Start the broker from the [WORK_DIR]/IEdgeInsights/EdgeVideoAnalyticsMicroservice directory. For example, start the Eclipse Mosquitto MQTT broker using the configuration in the mosquitto folder ([WORK_DIR]/IEdgeInsights/EdgeVideoAnalyticsMicroservice/mosquitto) as shown below. Make sure echo $PWD shows the root of the EVAM repository.

docker run -d --name=mqtt_broker -p 1883:1883 -v $PWD/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto

With the above configuration, the broker listens on port 1883.
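A quick way to confirm that the broker container is accepting connections is to open a TCP connection to the published port. The sketch below uses only the Python standard library; broker_port_open is a hypothetical helper name, and it only checks reachability, not that the listener speaks MQTT.

```python
import socket


def broker_port_open(host: str, port: int = 1883, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the host and connects, or raises OSError.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Example usage (the host name is an assumption, use the broker address):
#   broker_port_open("localhost", 1883)
```

A False result usually means the container is not running or the port mapping (-p 1883:1883) is missing.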

Start MQTT Subscriber

Once the MQTT broker is configured and running, connect to it and subscribe to topics in order to receive the messages that will be published.

  • For example, run the below commands to subscribe using the mosquitto_sub client:

    sudo apt install mosquitto-clients
    
    mosquitto_sub --topic <topic name> -p 1883 -h <mqtt broker address>
    
  • Alternatively, the Eclipse Paho MQTT Python Client can be used to subscribe to the broker and receive messages.

    Make sure to update the topic name and <mqtt broker address> placeholders in the scripts before running them.

    Make sure to install the Python packages:

    pip install paho-mqtt opencv-python numpy
    
    import paho.mqtt.subscribe as subscribe
    
    def on_message(client, userdata, message):
        print("%s %s" % (message.topic, message.payload))
    
    subscribe.callback(on_message, "<topic name>", hostname="<mqtt broker address>")
    

    Here is a sample subscriber script that receives both the frame (JPEG encoded) and the metadata, prints the metadata, and saves the frame as a JPG image.

    import paho.mqtt.subscribe as subscribe
    import json
    import base64
    import numpy as np
    import cv2
    
    def on_message(client, userdata, message):
        print("Receiving frame and metadata")
        msg = json.loads(message.payload)
    
        metadata = msg[0]
        print("Metadata:", metadata)
    
        output_file_name = "output_" + str(metadata['frame_id']) + ".jpg"
        print("Saving frame to {}".format(output_file_name))
        img = base64.b64decode(msg[1].encode("utf-8"))
        img = np.frombuffer(img, dtype="uint8")
        img = cv2.imdecode(img, cv2.IMREAD_UNCHANGED)
        cv2.imwrite(output_file_name, img)
    
    subscribe.callback(on_message, "<topic_name>", hostname="<mqtt broker address>")
    

    More details on the subscribe helper functions can be found in the Eclipse Paho MQTT Python Client documentation.
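The frame-saving script above reads the payload as a JSON list, with the metadata at index 0 and the Base64-encoded frame at index 1. To exercise a subscriber without a running pipeline, a payload of that shape can be built by hand. The following is a minimal sketch under that assumption; build_payload and parse_payload are hypothetical helper names, and the frame bytes are a placeholder rather than a decodable JPEG.

```python
import base64
import json


def build_payload(metadata: dict, frame_bytes: bytes) -> str:
    """Serialize metadata and a frame as a JSON list [metadata, base64_frame]."""
    frame_b64 = base64.b64encode(frame_bytes).decode("utf-8")
    return json.dumps([metadata, frame_b64])


def parse_payload(payload: str):
    """Reverse build_payload: return (metadata, raw frame bytes)."""
    metadata, frame_b64 = json.loads(payload)
    return metadata, base64.b64decode(frame_b64.encode("utf-8"))


# Placeholder bytes only; pass real JPEG bytes to test image saving.
sample = build_payload({"frame_id": 7}, b"\xff\xd8 placeholder bytes \xff\xd9")
metadata, frame = parse_payload(sample)
print(metadata["frame_id"], len(frame))
```

The resulting string can then be published to the subscribed topic with any MQTT client.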

Configuration

Configuration options

Add the configuration below to config.json ([WORK_DIR]/IEdgeInsights/EdgeVideoAnalyticsMicroservice/eii/docs/config.json) to enable publishing to the MQTT broker.

"mqtt_publisher": {
  "host": "<mqtt broker address>",
  "port": 1883
}

  • host: MQTT broker hostname or IP address

  • port: port to connect to the broker

  • topic: topic to which messages will be published. Defaults to edge_video_analytics_results (optional)

  • publish_frame: whether to publish only metadata, or both frames and metadata, to the MQTT broker. Defaults to false (optional)

    • When publish_frame is false, only metadata is published.

    • When publish_frame is true, both metadata and frame are published.

      | Payload | Type |
      | :--- | :--- |
      | Only metadata | JSON (metadata) |
      | Both frame and metadata | JSON (metadata, frame), where the frame is a Base64-encoded UTF-8 string |

  • qos: quality of service level to use. Values can be 0, 1, or 2. Defaults to 0 (optional). More details on the QoS levels can be found in the MQTT specification.

  • protocol: protocol version to use. Values can be 3, 4, or 5, corresponding to MQTTv31, MQTTv311, and MQTTv5 respectively. Defaults to 4, i.e. MQTTv311 (optional)
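The options and defaults listed above can be summarized in a small sketch that merges a user-supplied mqtt_publisher block with the documented defaults and rejects out-of-range values. This is illustrative only and not EVAM's own validation: resolve_mqtt_config and DEFAULTS are hypothetical names, and treating host and port as required is an assumption drawn from the fact that only the other options are marked optional.

```python
# Documented defaults for the optional settings.
DEFAULTS = {
    "topic": "edge_video_analytics_results",
    "publish_frame": False,
    "qos": 0,
    "protocol": 4,  # MQTTv311
}
VALID_QOS = (0, 1, 2)
VALID_PROTOCOLS = (3, 4, 5)


def resolve_mqtt_config(user_config: dict) -> dict:
    """Return user_config merged over DEFAULTS, raising ValueError on bad input."""
    # Assumption: host and port are required since they are not marked optional.
    missing = [key for key in ("host", "port") if key not in user_config]
    if missing:
        raise ValueError("missing required option(s): " + ", ".join(missing))
    config = {**DEFAULTS, **user_config}
    if config["qos"] not in VALID_QOS:
        raise ValueError("qos must be 0, 1, or 2")
    if config["protocol"] not in VALID_PROTOCOLS:
        raise ValueError("protocol must be 3, 4, or 5")
    return config


resolved = resolve_mqtt_config({"host": "localhost", "port": 1883, "qos": 1})
print(resolved)
```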

Metadata filtering

The configuration below can optionally be used to filter the messages sent to the MQTT broker for classification and detection use cases.

  • For detection:

    "mqtt_publisher": {
      ...
      "filter": {
        "type": "detection",
        "label_score": {"Person": 0.4}
      }
      ...
    }
    
  • For classification:

    "mqtt_publisher": {
      ...
      "filter": {
        "type": "classification",
        "label_score": {
          "anomalous": 0.5
        }
      }
      ...
    }
    
    • type: the type of filter to apply, either classification or detection

    • label_score: key-value pairs of the form class label: threshold. Any detections with a score below the threshold will be skipped.

    • Note:

      • For detection, metadata is expected to have, for example, …'predictions': {'labels_to_revisit_fullscene': None, 'kind': 'prediction', 'id': None, 'maps': [], 'media_identifier': None, 'modified': None, 'annotations': [{'labels_to_revisit': None, 'shape': {'type': 'RECTANGLE', 'x': 196, 'height': 328, 'y': 567, 'width': 272}, 'id': None, 'labels': [{'id': None, 'probability': 0.527821958065033, 'source': None, 'color': '#25a18eff', 'name': 'Person'}], 'modified': None}… or …'annotations': {'objects': [{'label': 'Person', 'score': 0.6827021241188049, 'bbox': [873, 484, 1045, 702], 'attributes': {'rotation': 0, 'occluded': 0}}],…

      • For classification, metadata is expected to have, for example, …{'label': 'Person', 'score': 0.5}…
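To make the thresholding rule concrete, the sketch below applies a label_score mapping to a list of detections shaped like the 'objects' entries in the second detection example above. It is an illustration, not EVAM's filter implementation: filter_detections is a hypothetical name, and passing through labels that have no configured threshold is an assumption, since the behavior for unlisted labels is not described here.

```python
def filter_detections(objects: list, label_score: dict) -> list:
    """Keep detections whose score is at least the threshold for their label."""
    kept = []
    for obj in objects:
        threshold = label_score.get(obj["label"])
        # Assumption: labels without a configured threshold are not filtered.
        if threshold is None or obj["score"] >= threshold:
            kept.append(obj)
    return kept


detections = [
    {"label": "Person", "score": 0.68, "bbox": [873, 484, 1045, 702]},
    {"label": "Person", "score": 0.31, "bbox": [10, 20, 110, 220]},
]
kept = filter_detections(detections, {"Person": 0.4})
print(len(kept))
```

With the threshold of 0.4 from the detection example, the second detection (score 0.31) is skipped and only the first is kept.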

Error Handling

  1. If connection to MQTT broker is successful, messages are published to the broker.

  2. If there are connection issues with MQTT broker, messages will not be published to the broker.

  3. Reconnection is attempted automatically when the connection is lost. The delay between successive reconnect attempts starts at 1s and doubles with every attempt until it reaches a maximum of 30s, after which it stays at 30s.

  4. If connection is re-established, subsequent messages will be published to the broker.

  5. Publishing to EII Message bus remains unimpacted.
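The reconnect schedule described in item 3 is a capped exponential backoff. The sketch below generates that sequence of delays; reconnect_delays is a hypothetical name used for illustration and is not the function EVAM or its MQTT client uses internally.

```python
from itertools import islice


def reconnect_delays(initial: float = 1.0, maximum: float = 30.0):
    """Yield delays in seconds: start at initial, double each time, cap at maximum."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * 2, maximum)


print(list(islice(reconnect_delays(), 8)))
# -> [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0, 30.0]
```

The cap is reached on the sixth attempt, because doubling 16s would give 32s, which exceeds the 30s maximum.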

Secure Publishing

MQTT publishing to the broker can be done over a secure communication channel that provides encryption and authentication over TLS.

Follow the steps below to establish a secure connection with the MQTT broker:

  1. Generate Certificates

    CA (Certificate Authority), client, and server certificates need to be generated. They are used to configure the MQTT broker and EVAM.

    The script below can be used to generate the certificates using openssl. (Command reference: https://mosquitto.org/man/mosquitto-tls-7.html).

    • Make sure to replace <IP address of broker> in the script with the address of the broker.

    • The script uses process substitution (<(...)), so run it with bash rather than sh.

    • Executing the script below will ask for a password for ca.key. Enter the same password again when prompted during the signing steps.

      echo "Creating CA Key and Certificate"
      openssl req -new -x509 -days 365 -extensions v3_ca -keyout ca.key -out ca.crt -subj "/CN=example.com"
      
      echo "Creating Server key"
      openssl genrsa -out server.key 2048
      
      echo "Creating Server Certificate signing request"
      openssl req -subj "/CN=<IP address of broker>" -out server.csr -key server.key -new
      
      echo "Signing Server certificate"
      openssl x509 -req -in server.csr -extfile <(echo "subjectAltName=IP:<IP address of broker>") -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 365
      
      echo "Creating Client key"
      openssl genrsa -out client.key 2048
      
      echo "Creating Client Certificate signing request"
      openssl req -subj "/" -out client.csr -key client.key -new
      
      echo "Signing Client certificate"
      openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out client.crt -days 365
      
      mkdir -p client
      mv client.* client/
      
      mkdir -p server
      mv server.* server/
      

      Once the certificates are generated, copy them to the right locations. Make sure echo $PWD shows the root of the EVAM repository.

      • Copy ca.crt, server/server.crt, server/server.key to $PWD/mosquitto/certificates

        mkdir -p $PWD/mosquitto/certificates
        mkdir -p $PWD/mosquitto/certificates/server
        cp <path to ca.crt> $PWD/mosquitto/certificates
        cp <path to server/server.crt> $PWD/mosquitto/certificates/server
        cp <path to server/server.key> $PWD/mosquitto/certificates/server
        
      • Copy ca.crt, client/client.crt, client/client.key to $PWD/certificates

        mkdir -p $PWD/certificates
        mkdir -p $PWD/certificates/client
        cp <path to ca.crt> $PWD/certificates
        cp <path to client/client.crt> $PWD/certificates/client
        cp <path to client/client.key> $PWD/certificates/client
        
      • Change permission for client.key in $PWD/certificates/client

        sudo chmod 644 $PWD/certificates/client/client.key
        
  2. Configure and start MQTT broker

    • Modify the mosquitto.conf ([WORK_DIR]/IEdgeInsights/EdgeVideoAnalyticsMicroservice/mosquitto/mosquitto.conf) file as below.

      allow_anonymous true
      
      listener 8883
      
      cafile /mosquitto/config/certificates/ca.crt
      keyfile /mosquitto/config/certificates/server/server.key
      certfile /mosquitto/config/certificates/server/server.crt
      
      require_certificate true
      

      With the above configuration, the broker listens on port 8883 and is set up for mutual authentication.

      • Start MQTT broker

        docker run -d --name=mqtt_broker -p 8883:8883 -v $PWD/mosquitto:/mosquitto/config eclipse-mosquitto
        
  3. Configure and start MQTT subscriber

    As the broker is configured for secure connections, the CA certificate and the client certificate and key need to be provided when starting the subscriber.

    • For example, run the below commands to subscribe using the mosquitto_sub client:

      sudo apt install mosquitto-clients
      
      mosquitto_sub --topic <topic name> -p 8883 -h <mqtt broker address> --cafile $PWD/certificates/ca.crt --cert $PWD/certificates/client/client.crt --key $PWD/certificates/client/client.key
      
    • Alternatively, the Eclipse Paho MQTT Python Client can be used to subscribe to the broker and receive messages.

      Make sure to update the topic name, <mqtt broker address>, <path to ca.crt>, <path to client.crt>, and <path to client.key> placeholders in the scripts before running them.

      Make sure to install the Python packages:

      pip install paho-mqtt opencv-python numpy
      
      import paho.mqtt.subscribe as subscribe
      
      def on_message(client, userdata, message):
          print("%s %s" % (message.topic, message.payload))
      
      subscribe.callback(on_message, "<topic name>", hostname="<mqtt broker address>", port=8883, tls={'ca_certs': "<path to ca.crt>", 'certfile': "<path to client.crt>", 'keyfile': "<path to client.key>"})
      

      Here is a sample subscriber script that receives both the frame (JPEG encoded) and the metadata, prints the metadata, and saves the frame as a JPG image.

      import paho.mqtt.subscribe as subscribe
      import json
      import base64
      import numpy as np
      import cv2
      
      def on_message(client, userdata, message):
          print("Receiving frame and metadata")
          msg = json.loads(message.payload)
      
          metadata = msg[0]
          print("Metadata:", metadata)
      
          output_file_name = "output_" + str(metadata['frame_id']) + ".jpg"
          print("Saving frame to {}".format(output_file_name))
          img = base64.b64decode(msg[1].encode("utf-8"))
          img = np.frombuffer(img, dtype="uint8")
          img = cv2.imdecode(img, cv2.IMREAD_UNCHANGED)
          cv2.imwrite(output_file_name, img)
      
      subscribe.callback(on_message, "<topic_name>", hostname="<mqtt broker address>", port=8883, tls={'ca_certs': "<path to ca.crt>", 'certfile': "<path to client.crt>", 'keyfile': "<path to client.key>"})
      
  4. Configure EVAM for establishing secure connection with MQTT broker

    Modify config.json ([WORK_DIR]/IEdgeInsights/EdgeVideoAnalyticsMicroservice/eii/config.json) as below.

    "mqtt_publisher": {
      "host": "<mqtt broker address>",
      "port": 8883,
      "tls": {
          "ca_cert": "/MqttCerts/ca.crt",
          "client_key": "/MqttCerts/client/client.key",
          "client_cert": "/MqttCerts/client/client.crt"
      }
    }
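Before starting EVAM, the secure setup from the steps above can be checked from the host with the Python standard library. The sketch below is not part of EVAM: check_mutual_tls is a hypothetical helper, and the host-side certificate paths in the usage comment are assumptions based on the locations used in step 1. It verifies the broker certificate against ca.crt and presents the client certificate during the handshake.

```python
import socket
import ssl
from pathlib import Path


def check_mutual_tls(host, port, ca_cert, client_cert, client_key, timeout=5.0):
    """Open a TLS connection to the broker and return the negotiated TLS version."""
    # Fail early with a clear message if a certificate file is missing.
    for path in (ca_cert, client_cert, client_key):
        if not Path(path).is_file():
            raise FileNotFoundError(f"certificate file not found: {path}")

    # Trust only the generated CA and present the client certificate.
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=ca_cert)
    context.load_cert_chain(certfile=client_cert, keyfile=client_key)

    with socket.create_connection((host, port), timeout=timeout) as raw_sock:
        # server_hostname must match the CN or subjectAltName in server.crt.
        with context.wrap_socket(raw_sock, server_hostname=host) as tls_sock:
            return tls_sock.version()


# Example usage (paths are assumptions, adjust to where the files were copied):
#   check_mutual_tls("<mqtt broker address>", 8883,
#                    "certificates/ca.crt",
#                    "certificates/client/client.crt",
#                    "certificates/client/client.key")
```

A returned version string means the broker certificate verified against the CA. With TLS 1.3, a rejected client certificate may only surface on a later read or write, so a subscriber test with mosquitto_sub remains the more complete check.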