MQTT

EdgeX - Levski Release

Overview

In this example, we use a script to simulate a custom-defined MQTT device instead of a real device. This provides a straightforward way to test the device-mqtt features using an MQTT broker.

MQTT Overview

Note

Multi-Level Topics move metadata (device name, command name, etc.) from the payload into the MQTT topics. Look for the sections marked Using Multi-level Topic: for the relevant input/output throughout this example.
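To illustrate what "metadata in the topic" means, here is a minimal sketch that recovers the device name, resource name, method, and request ID from a command topic. The five-level layout is inferred from the command topics used later in this example, and parseCommandTopic is a hypothetical helper, not part of device-mqtt.

```javascript
// Sketch: recover command metadata from a multi-level topic.
// Assumed layout (inferred from the topics in this example):
//   command/{deviceName}/{resourceName}/{method}/{uuid}
function parseCommandTopic(topic) {
    const parts = topic.split("/");
    if (parts.length !== 5 || parts[0] !== "command") {
        throw new Error("unexpected command topic: " + topic);
    }
    const [, deviceName, resourceName, method, uuid] = parts;
    return { deviceName, resourceName, method, uuid };
}

const parsed = parseCommandTopic(
    "command/my-custom-device/randnum/get/293d7a00-66e1-4374-ace0-07520103c95f"
);
console.log(parsed);
```

With this layout the payload only needs to carry the values themselves.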

Prepare the Custom Device Configuration

In this section, we create folders that contain files required for deployment of a customized device configuration to work with the existing device service:

- custom-config
  |- devices
     |- my.custom.device.config.yaml
  |- profiles
     |- my.custom.device.profile.yml

Device Configuration

Use this configuration file to define devices and schedule jobs. device-mqtt creates the corresponding device instance on start-up.

Create the device configuration file, named my.custom.device.config.yaml, as shown below:

# Pre-define Devices
deviceList:
  - name: "my-custom-device"
    profileName: "my-custom-device-profile"
    description: "MQTT device is created for test purpose"
    labels:
      - "MQTT"
      - "test"
    protocols:
      mqtt:
        CommandTopic: "command/my-custom-device"
    autoEvents:
      - interval: "30s"
        onChange: false
        sourceName: "message"

Note

CommandTopic is used to publish the GET or SET command request.

Device Profile

The DeviceProfile defines the device's values and operation method, which can be Read or Write.

Create a device profile, named my.custom.device.profile.yml, with the following content:

name: "my-custom-device-profile"
manufacturer: "iot"
model: "MQTT-DEVICE"
description: "Test device profile"
labels:
  - "mqtt"
  - "test"
deviceResources:
  -
    name: randnum
    isHidden: true
    description: "device random number"
    properties:
      valueType: "Float32"
      readWrite: "R"
  -
    name: ping
    isHidden: true
    description: "device awake"
    properties:
      valueType: "String"
      readWrite: "R"
  -
    name: message
    isHidden: false
    description: "device message"
    properties:
      valueType: "String"
      readWrite: "RW"
  -
    name: json
    isHidden: false
    description: "JSON message"
    properties:
      valueType: "Object"
      readWrite: "RW"
      mediaType: "application/json"

deviceCommands:
  -
    name: values
    readWrite: "R"
    isHidden: false
    resourceOperations:
        - { deviceResource: "randnum" }
        - { deviceResource: "ping" }
        - { deviceResource: "message" }
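The isHidden flags above control which entries are visible as commands. The sketch below works on a hand-copied excerpt of the profile and assumes that entries with isHidden: true are left out of core-command's listing, which is consistent with the command list shown later in this example. exposedCommands is a hypothetical helper for illustration only.

```javascript
// Excerpt mirroring my.custom.device.profile.yml (names and flags only).
const profile = {
    deviceResources: [
        { name: "randnum", isHidden: true,  readWrite: "R"  },
        { name: "ping",    isHidden: true,  readWrite: "R"  },
        { name: "message", isHidden: false, readWrite: "RW" },
        { name: "json",    isHidden: false, readWrite: "RW" },
    ],
    deviceCommands: [
        { name: "values", isHidden: false, readWrite: "R" },
    ],
};

// Assumption: hidden entries are not exposed through core-command.
function exposedCommands(p) {
    return [...p.deviceCommands, ...p.deviceResources]
        .filter((entry) => !entry.isHidden)
        .map((entry) => ({
            name: entry.name,
            get: entry.readWrite.includes("R"),
            set: entry.readWrite.includes("W"),
        }));
}

console.log(exposedCommands(profile));
```

Under that assumption, randnum and ping are still readable through the values device command even though they are not listed individually.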

Prepare docker-compose file

  1. Clone edgex-compose
    $ git clone git@github.com:edgexfoundry/edgex-compose.git
    $ cd edgex-compose
    $ git checkout main
    
    Note: Use the main branch until Levski is released.
  2. Generate the docker-compose.yml file (notice this includes mqtt-broker)
    $ cd compose-builder
    $ make gen ds-mqtt mqtt-broker no-secty ui
    
  3. Check the generated file
    $ ls | grep 'docker-compose.yml'
    docker-compose.yml
    

Mount the custom-config

Create a docker-compose file named docker-compose.override.yml to extend the compose file generated by the compose-builder. In this file, we add the volume path and environment variables as shown below:

 # docker-compose.override.yml

 version: '3.7'

 services:
     device-mqtt:
        environment:
          DEVICE_DEVICESDIR: /custom-config/devices
          DEVICE_PROFILESDIR: /custom-config/profiles
        volumes:
        - /path/to/custom-config:/custom-config

Note

Replace /path/to/custom-config in the example with the correct path.

Start EdgeX Foundry on Docker

Deploy EdgeX using the following commands:

$ cd edgex-compose/compose-builder
$ docker compose pull
$ docker compose -f docker-compose.yml -f docker-compose.override.yml up -d

Using an MQTT Device Simulator

Overview

MQTT Device Service

Expected Behaviors

Using the detailed script below as a simulator, there are three behaviors:

  1. Publish random number data every 15 seconds.

    The simulator publishes the data to the MQTT broker with the topic incoming/data/my-custom-device/values, and the message is similar to the following:

      {
        "randnum" : 4161.3549,
        "ping"    : "pong",
        "message" : "Hello World"
      }

  2. Receive the reading request, then return the response.

    1. The simulator receives the request from the MQTT broker. The topic is command/my-custom-device/randnum/get/293d7a00-66e1-4374-ace0-07520103c95f and the message returned is similar to the following:

      {"randnum":"42.0"}
      
    2. The simulator returns the response to the MQTT broker. The topic is command/response/# and the message is similar to the following:

      {"randnum":"4.20e+01"}
      
  3. Receive the set request, then change the device value.

    1. The simulator receives the request from the MQTT broker. The topic is command/my-custom-device/message/set/293d7a00-66e1-4374-ace0-07520103c95f and the message is similar to the following:

      {"message":"test message..."}
      
    2. The simulator changes the device value and returns the response to the MQTT broker. The topic is command/response/# and the message is similar to the following:

      {"message":"test message..."}
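The # in topics such as command/response/# is the MQTT multi-level wildcard. As a rough sketch of how a topic filter is matched against a concrete topic, the function below implements the standard + (one level) and # (all remaining levels) rules. It is a simplified illustration that ignores special cases such as topics beginning with $, and topicMatches is a hypothetical name.

```javascript
// Sketch of MQTT topic-filter matching.
// "+" matches exactly one level; "#" matches the parent and all remaining levels.
function topicMatches(filter, topic) {
    const f = filter.split("/");
    const t = topic.split("/");
    for (let i = 0; i < f.length; i++) {
        if (f[i] === "#") return true;                     // multi-level wildcard
        if (i >= t.length) return false;                   // topic has too few levels
        if (f[i] !== "+" && f[i] !== t[i]) return false;   // literal mismatch
    }
    return f.length === t.length;                          // no unmatched trailing levels
}

console.log(topicMatches("command/response/#", "command/response/293d7a00"));
console.log(topicMatches("incoming/data/#", "command/response/293d7a00"));
```

This is why a single subscription to command/my-custom-device/# is enough for the simulator to receive every GET and SET request for the device.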
      

Creating and Running an MQTT Device Simulator

To implement the simulated custom-defined MQTT device, create a JavaScript file named mock-device.js with the following content:

function getRandomFloat(min, max) {
    return Math.random() * (max - min) + min;
}

const deviceName = "my-custom-device";
let message = "test-message";
let json = {"name" : "My JSON"};

// DataSender sends async value to MQTT broker every 15 seconds
schedule('*/15 * * * * *', ()=>{
    var data = {};
    data.randnum = getRandomFloat(25,29).toFixed(1);
    data.ping = "pong";
    data.message = "Hello World";

    publish( 'incoming/data/my-custom-device/values', JSON.stringify(data));
});

// CommandHandler receives commands and sends response to MQTT broker
// 1. Receive the reading request, then return the response
// 2. Receive the set request, then change the device value
subscribe( "command/my-custom-device/#" , (topic, val) => {
    const words = topic.split('/');
    var cmd = words[2];
    var method = words[3];
    var uuid = words[4];
    var response = {};
    var data = val;

    if (method == "set") {
        switch(cmd) {
            case "message":
                message = data[cmd];
                break;
            case "json":
                json = data[cmd];
                break;
        }
    }else{
        switch(cmd) {
            case "ping":
                response.ping = "pong";
                break;
            case "message":
                response.message = message;
                break;
            case "randnum":
                response.randnum = 12.123;
                break;
            case "json":
                response.json = json;
                break;
        }
    }
    var sendTopic ="command/response/"+ uuid;
    publish( sendTopic, JSON.stringify(response));
});

To run the device simulator, enter the commands shown below, adjusting the path as described in the note:

$ mv mock-device.js /path/to/mqtt-scripts
$ docker run --rm --name=mqtt-scripts \
    -v /path/to/mqtt-scripts:/scripts  --network host \
    dersimn/mqtt-scripts --dir /scripts

Note

Replace /path/to/mqtt-scripts in the example mv and docker run commands with the correct path.

mqtt-scripts then logs output similar to the following:

2022-08-12 09:52:42.086 <info>  mqtt-scripts 1.2.2 starting
2022-08-12 09:52:42.227 <info>  mqtt connected mqtt://127.0.0.1
2022-08-12 09:52:42.733 <info>  /scripts/mock-device.js loading
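The command handling logic in mock-device.js can also be exercised without a broker. The sketch below restates the handler as a pure function so it can be run directly with Node. handleCommand and the state object are hypothetical names introduced here, and the payload is assumed to arrive already parsed into an object.

```javascript
// Broker-free restatement of the CommandHandler logic in mock-device.js.
// handleCommand is a hypothetical helper, not part of mqtt-scripts.
function handleCommand(state, topic, payload) {
    const [, , cmd, method, uuid] = topic.split("/");
    const known = Object.prototype.hasOwnProperty.call(state, cmd);
    const response = {};
    if (method === "set") {
        if (known) state[cmd] = payload[cmd];      // update the stored value
    } else if (cmd === "ping") {
        response.ping = "pong";
    } else if (cmd === "randnum") {
        response.randnum = 12.123;                 // fixed value, as in the script
    } else if (known) {
        response[cmd] = state[cmd];                // message or json
    }
    return { topic: "command/response/" + uuid, payload: JSON.stringify(response) };
}

const state = { message: "test-message", json: { name: "My JSON" } };
const setReply = handleCommand(state, "command/my-custom-device/message/set/abc", { message: "Hello!" });
const getReply = handleCommand(state, "command/my-custom-device/message/get/def", null);
console.log(setReply, getReply);
```

As in the original script, a SET request produces an empty JSON object as its response, and a later GET returns the stored value.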

Execute Commands

Now we're ready to run some commands.

Find Executable Commands

Use the following query to find executable commands:

$ curl http://localhost:59882/api/v3/device/all | json_pp

{
   "deviceCoreCommands" : [
      {
         "profileName" : "my-custom-device-profile",
         "coreCommands" : [
            {
               "name" : "values",
               "get" : true,
               "path" : "/api/v3/device/name/my-custom-device/values",
               "url" : "http://edgex-core-command:59882",
               "parameters" : [
                  {
                     "resourceName" : "randnum",
                     "valueType" : "Float32"
                  },
                  {
                     "resourceName" : "ping",
                     "valueType" : "String"
                  },
                  {
                     "valueType" : "String",
                     "resourceName" : "message"
                  }
               ]
            },
            {
               "url" : "http://edgex-core-command:59882",
               "parameters" : [
                  {
                     "resourceName" : "message",
                     "valueType" : "String"
                  }
               ],
               "name" : "message",
               "get" : true,
               "path" : "/api/v3/device/name/my-custom-device/message",
               "set" : true
            },
            {
                "name": "json",
                "get": true,
                "set": true,
                "path": "/api/v3/device/name/my-custom-device/json",
                "url" : "http://edgex-core-command:59882",
                "parameters": [
                    {
                        "resourceName": "json",
                        "valueType": "Object"
                    }
                ]
            }
         ],
         "deviceName" : "my-custom-device"
      }
   ],
   "apiVersion" : "v3",
   "statusCode" : 200
}
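To turn a response like the one above into a quick overview of callable endpoints, something along these lines could be used. It works on a trimmed, hand-copied excerpt of the response rather than live output; listCommands is a hypothetical helper, and PUT is used for SET because that is the HTTP method shown in the SET example in this document.

```javascript
// Trimmed copy of the response above; only the fields used here are kept.
const response = {
    deviceCoreCommands: [{
        deviceName: "my-custom-device",
        coreCommands: [
            { name: "values", get: true,
              path: "/api/v3/device/name/my-custom-device/values",
              url: "http://edgex-core-command:59882" },
            { name: "message", get: true, set: true,
              path: "/api/v3/device/name/my-custom-device/message",
              url: "http://edgex-core-command:59882" },
        ],
    }],
};

// Flatten the nested structure into one entry per command.
function listCommands(res) {
    return res.deviceCoreCommands.flatMap((device) =>
        device.coreCommands.map((c) => ({
            device: device.deviceName,
            name: c.name,
            methods: [c.get && "GET", c.set && "PUT"].filter(Boolean),
            endpoint: c.url + c.path,
        }))
    );
}

console.log(listCommands(response));
```

Note that the url field uses the Docker-internal host name; from the host machine, localhost is used instead, as in the curl commands in this example.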

Execute SET Command

Execute a SET command according to the url and parameters listed above. Replace localhost with the server IP if EdgeX is running on a different host.

$ curl http://localhost:59882/api/v3/device/name/my-custom-device/message \
    -H "Content-Type:application/json" -X PUT  \
    -d '{"message":"Hello!"}'
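The same request can be assembled in JavaScript. The sketch below only builds the arguments and does not send anything; buildSetRequest is a hypothetical helper, and the base URL is taken from the curl command above.

```javascript
// buildSetRequest is a hypothetical helper; it only assembles arguments for fetch().
function buildSetRequest(baseUrl, deviceName, commandName, values) {
    const url = baseUrl + "/api/v3/device/name/" +
        encodeURIComponent(deviceName) + "/" + encodeURIComponent(commandName);
    const init = {
        method: "PUT",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify(values),
    };
    return { url, init };
}

const req = buildSetRequest("http://localhost:59882", "my-custom-device", "message", { message: "Hello!" });
console.log(req.url, req.init.body);
```

To actually send it from Node 18 or later, or from a browser, pass the two parts to fetch(req.url, req.init).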

Execute GET Command

Execute a GET command as follows:

$ curl http://localhost:59882/api/v3/device/name/my-custom-device/message | json_pp

{
   "apiVersion":"v3",
   "event":{
      "apiVersion":"v3",
      "deviceName":"my-custom-device",
      "id":"13164041-2e6c-4454-9bc3-8e8987e85311",
      "origin":1660298227470009014,
      "profileName":"my-custom-device-profile",
      "readings":[
         {
            "deviceName":"my-custom-device",
            "id":"c58e65b4-62f0-4e41-b368-645993ec0bfd",
            "origin":1660298227470005426,
            "profileName":"my-custom-device-profile",
            "resourceName":"message",
            "value":"Hello!",
            "valueType":"String"
         }
      ],
      "sourceName":"message"
   },
   "statusCode":200
}
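When only the values matter, the event can be reduced to a simple map from resource name to value. This sketch uses a trimmed copy of the response above; readingValues is a hypothetical helper.

```javascript
// Trimmed copy of the GET response above.
const getResponse = {
    statusCode: 200,
    event: {
        deviceName: "my-custom-device",
        sourceName: "message",
        readings: [
            { resourceName: "message", value: "Hello!", valueType: "String" },
        ],
    },
};

// Collect each reading's value under its resource name.
function readingValues(res) {
    const values = {};
    for (const reading of res.event.readings) {
        values[reading.resourceName] = reading.value;
    }
    return values;
}

console.log(readingValues(getResponse));
```

For a device command such as values, the same function would return one entry per resource in the command.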

Schedule Job

The schedule job is defined in the autoEvents section of the device definition file:

    autoEvents:
      - interval: "30s"
        onChange: false
        sourceName: "message"

After the service starts, query core-data's reading API. The results show that the service automatically executes the command every 30 seconds, as shown below:

$ curl http://localhost:59880/api/v3/reading/resourceName/message | json_pp

{
   "statusCode" : 200,
   "readings" : [
      {
         "value" : "test-message",
         "id" : "e91b8ca6-c5c4-4509-bb61-bd4b09fe835c",
         "resourceName" : "message",
         "origin" : 1624418361324331392,
         "profileName" : "my-custom-device-profile",
         "deviceName" : "my-custom-device",
         "valueType" : "String"
      },
      {
         "resourceName" : "message",
         "value" : "test-message",
         "id" : "1da58cb7-2bf4-47f0-bbb8-9519797149a2",
         "deviceName" : "my-custom-device",
         "valueType" : "String",
         "profileName" : "my-custom-device-profile",
         "origin" : 1624418330822988843
      },
      ...
   ],
   "apiVersion" : "v3"
}
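The 30-second interval can be checked from the origin fields, which are nanosecond timestamps. These values are larger than Number.MAX_SAFE_INTEGER, so the sketch below writes them as BigInt literals instead of parsing them with JSON.parse, which would lose precision. intervalSeconds is a hypothetical helper.

```javascript
// origin values copied from the two readings above (nanoseconds since the epoch).
const newer = 1624418361324331392n;
const older = 1624418330822988843n;

// Subtract as BigInt first; the difference is small enough to convert safely.
function intervalSeconds(a, b) {
    return Number(a - b) / 1e9;
}

const seconds = intervalSeconds(newer, older);
console.log(seconds.toFixed(1));
```

The result is roughly 30.5 seconds, which is in line with the configured interval plus some scheduling and round-trip overhead.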

Async Device Reading

device-mqtt subscribes to a DataTopic and waits for the real device to send values to the MQTT broker. device-mqtt then parses each value and forwards it northbound.

The data format contains the following values:

  • name = device name
  • cmd = deviceResource name
  • method = get or set
  • [cmd value] = device reading (the field named after the deviceResource given in cmd, for example randnum)

The following results show that the mock device sent a reading every 15 seconds:

$ curl http://localhost:59880/api/v3/reading/resourceName/randnum | json_pp

{
  "readings" : [
    {
        "origin" : 1624418475007110946,
        "valueType" : "Float32",
        "deviceName" : "my-custom-device",
        "id" : "9b3d337e-8a8a-4a6c-8018-b4908b57abb8",
        "resourceName" : "randnum",
        "profileName" : "my-custom-device-profile",
        "value" : "2.630000e+01"
    },
    {
        "deviceName" : "my-custom-device",
        "valueType" : "Float32",
        "id" : "06918cbb-ada0-4752-8877-0ef8488620f6",
        "origin" : 1624418460007833720,
        "profileName" : "my-custom-device-profile",
        "value" : "2.570000e+01",
        "resourceName" : "randnum"
    },
    ...
  ],
  "statusCode" : 200,
  "apiVersion" : "v3"
}
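Float32 readings are returned as strings in exponent notation. A small sketch of converting them back to numbers and checking them against the range the simulator draws from (25 to 29 in mock-device.js) might look like this; the readings array is a trimmed copy of the output above.

```javascript
// Trimmed copy of the readings above.
const readings = [
    { resourceName: "randnum", value: "2.630000e+01", valueType: "Float32" },
    { resourceName: "randnum", value: "2.570000e+01", valueType: "Float32" },
];

// Number() accepts exponent notation, so no custom parsing is needed.
const numbers = readings.map((r) => Number(r.value));

// The simulator calls getRandomFloat(25, 29), so every value should fall in that range.
const inRange = numbers.every((n) => n >= 25 && n <= 29);

console.log(numbers, inRange);
```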

MQTT Device Service Configuration

MQTT Device Service has the following configurations to implement the MQTT protocol.

| Configuration | Default Value | Description |
|---|---|---|
| MQTTBrokerInfo.Schema | tcp | The URL schema |
| MQTTBrokerInfo.Host | localhost | The URL host |
| MQTTBrokerInfo.Port | 1883 | The URL port |
| MQTTBrokerInfo.Qos | 0 | Quality of Service: 0 (at most once), 1 (at least once), or 2 (exactly once) |
| MQTTBrokerInfo.KeepAlive | 3600 | Seconds between client pings when no data is flowing, to avoid the client being disconnected. Must be greater than 2 |
| MQTTBrokerInfo.ClientId | device-mqtt | Client ID used to connect to the broker |
| MQTTBrokerInfo.CredentialsRetryTime | 120 | Number of retries to get the credentials |
| MQTTBrokerInfo.CredentialsRetryWait | 1 | Wait time (seconds) between retries to get the credentials |
| MQTTBrokerInfo.ConnEstablishingRetry | 10 | Number of retries to establish the MQTT connection |
| MQTTBrokerInfo.ConnRetryWaitTime | 5 | Wait time (seconds) between retries to establish the MQTT connection |
| MQTTBrokerInfo.AuthMode | none | Indicates what to use when connecting to the broker. Must be one of "none" or "usernamepassword" |
| MQTTBrokerInfo.CredentialsPath | credentials | Name of the path in the secret provider from which to retrieve your secrets. Must be non-blank |
| MQTTBrokerInfo.IncomingTopic | DataTopic (incoming/data/#) | IncomingTopic is used to receive the async value |
| MQTTBrokerInfo.ResponseTopic | ResponseTopic (command/response/#) | ResponseTopic is used to receive the command response from the device |
| MQTTBrokerInfo.UseTopicLevels | false (true) | Boolean setting to use multi-level topics |
| MQTTBrokerInfo.Writable.ResponseFetchInterval | 500 | ResponseFetchInterval specifies the retry interval (milliseconds) to fetch the command response from the MQTT broker |

Note

Using Multi-level Topic: Remember to replace the defaults with the values shown in parentheses in the table above.
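As a small illustration of how these settings relate, the sketch below combines the Schema, Host, and Port defaults into a broker URL and selects the topic settings depending on whether multi-level topics are used. The values come from the table; brokerUrl and topicSettings are hypothetical helpers, and how device-mqtt assembles the URL internally is an assumption.

```javascript
// Defaults copied from the configuration table.
const defaults = { Schema: "tcp", Host: "localhost", Port: 1883 };

// Assumption: the three settings are joined into a conventional URL.
function brokerUrl(info) {
    return `${info.Schema}://${info.Host}:${info.Port}`;
}

// Values in parentheses in the table apply when multi-level topics are enabled.
function topicSettings(useTopicLevels) {
    return useTopicLevels
        ? { IncomingTopic: "incoming/data/#", ResponseTopic: "command/response/#", UseTopicLevels: true }
        : { IncomingTopic: "DataTopic", ResponseTopic: "ResponseTopic", UseTopicLevels: false };
}

console.log(brokerUrl(defaults));
console.log(topicSettings(true));
```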

Overriding with Environment Variables

Any of the above configurations can be overridden with environment variables to meet your requirements, for example:

# docker-compose.override.yml

  version: '3.7'

  services:
    device-mqtt:
      environment:
        MQTTBROKERINFO_CLIENTID: "my-device-mqtt"
        MQTTBROKERINFO_CONNRETRYWAITTIME: "10"
        MQTTBROKERINFO_USETOPICLEVELS: "false"
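The variable names in the example follow a visible pattern: the configuration key with dots replaced by underscores and all letters upper-cased. The sketch below captures that pattern as inferred from the names in this document; toEnvName is a hypothetical helper, and other keys are only assumed to follow the same rule.

```javascript
// Pattern inferred from the examples above: dots become underscores, letters are upper-cased.
function toEnvName(configKey) {
    return configKey.replace(/\./g, "_").toUpperCase();
}

const keys = [
    "MQTTBrokerInfo.ClientId",
    "MQTTBrokerInfo.ConnRetryWaitTime",
    "MQTTBrokerInfo.UseTopicLevels",
];
const envNames = keys.map(toEnvName);
console.log(envNames);
```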