EdgeX MessageBus

Introduction

EdgeX has an internal message bus, referred to as the EdgeX MessageBus, which is used for internal communications between EdgeX services. An EdgeX Service is any Core/Support/Application/Device Service from EdgeX or any custom Application or Device Service built with the EdgeX SDKs.

The following diagram shows how each EdgeX Service uses the EdgeX MessageBus.

[Insert Image of EdgeX MessageBus]

The EdgeX MessageBus is meant for internal EdgeX service-to-service communication. It is not meant as an entry point for external services to communicate with the internal EdgeX services. The eKuiper Rules Engine is an exception, as it is tightly integrated with EdgeX.

The EdgeX services intended as external entry points are:

  • REST API on all the EdgeX services - Accessed directly in non-secure mode or via the API Gateway when running in secure mode

  • App Service using External MQTT Trigger - An App Service configured to use the External MQTT Trigger will accept data from external services on an "external" MQTT connection

  • App Service using HTTP Trigger - An App Service configured to use the HTTP Trigger will accept data from external services on an "external" REST connection. Accessed in the same manner as other EdgeX REST APIs.

  • App Service using Custom Trigger - An App Service configured to use a Custom Trigger can accept data from external services or over additional protocols with few limitations. See Custom Trigger Example for an example.

  • Core Command External MQTT Connection - Core Command now receives command requests and publishes responses via an external MQTT connection that is separate from the EdgeX MessageBus. The requests are forwarded to the EdgeX MessageBus and the corresponding responses are forwarded back to the external MQTT connection.

Originally the EdgeX MessageBus was only used to send Events/Readings from Core Data to the Application Services layer. In recent V2 releases, more services use the EdgeX MessageBus rather than REST for inter-service communication:

  • Device Services publish Events/Readings directly to the EdgeX MessageBus rather than sending them via REST to Core Data.
  • Service Metrics are published to the EdgeX MessageBus.
  • System Events are published to the EdgeX MessageBus.
  • Command Requests/Responses are now published to the EdgeX MessageBus by Core Command and Device Services.

This trend away from REST to the EdgeX MessageBus for inter-service communication continues. In the future, Device Services will receive Device System Events via the EdgeX MessageBus rather than the current REST callbacks used when devices are added/updated/deleted in Core Metadata.

Message Envelope

All messages published to the EdgeX MessageBus are wrapped in a MessageEnvelope. This envelope contains metadata describing the message payload, such as the payload Content Type (JSON or CBOR), Correlation Id, etc.

Note

Unless noted below, the MessageEnvelope is JSON encoded when publishing it to the EdgeX MessageBus. This does result in the MessageEnvelope's payload being double encoded.
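
To make the double encoding concrete, here is a minimal Python sketch. The field names (correlationID, contentType, payload) and the base64 step are assumptions made for illustration; they are not taken from this page and should not be read as the authoritative MessageEnvelope schema.

```python
import base64
import json


def wrap_in_envelope(payload_obj, correlation_id):
    """Build a JSON-encoded envelope around a JSON-encoded payload.

    Field names are illustrative assumptions, not the real schema.
    """
    # First encoding: the payload itself becomes JSON bytes.
    payload_bytes = json.dumps(payload_obj).encode("utf-8")
    envelope = {
        "correlationID": correlation_id,
        "contentType": "application/json",
        # Raw bytes cannot sit inside JSON, so they are carried as base64 text.
        "payload": base64.b64encode(payload_bytes).decode("ascii"),
    }
    # Second encoding: the whole envelope becomes JSON.
    return json.dumps(envelope)


def unwrap_envelope(message):
    """Reverse both encodings and return (metadata, payload object)."""
    envelope = json.loads(message)
    payload_bytes = base64.b64decode(envelope.pop("payload"))
    return envelope, json.loads(payload_bytes)


if __name__ == "__main__":
    event = {"deviceName": "camera-001", "sourceName": "status"}
    wire = wrap_in_envelope(event, "example-correlation-id")
    meta, decoded = unwrap_envelope(wire)
    print(meta["contentType"], decoded["deviceName"])
```

A subscriber therefore has to decode twice: once for the envelope and once for the payload it carries.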

Implementations

The EdgeX MessageBus is defined by the message bus abstraction implemented in go-mod-messaging. This module defines an abstract client API which currently has five implementations of the API for the different underlying message bus protocols.

Common Configuration

Each service that uses the EdgeX MessageBus has a configuration section that defines which implementation to use and how to connect to and configure the underlying protocol client. This section is [Trigger.EdgexMessageBus] for Application Services and [MessageQueue] for Core Data, Core Metadata and Device Services. The Type setting specifies which of the following implementations to use.

  • Redis Pub/Sub (default) - Type=redis
  • MQTT 3.1 - Type=mqtt
  • NATS Core - Type=nats-core
  • NATS JetStream - Type=nats-jetstream
  • ZeroMQ (DEPRECATED) - Type=zero

Note

In general all EdgeX Services running in a deployment must be configured to use the same EdgeX MessageBus implementation. By default all services that use the EdgeX MessageBus are configured to use the Redis Pub/Sub implementation. NATS does support a compatibility mode with MQTT. See the NATS MQTT Mode section below for details.

Redis Pub/Sub

As stated above this is the default implementation that all EdgeX Services are configured to use. It takes advantage of the existing Redis DB instance for the broker. Redis Pub/Sub is a fire and forget protocol, so delivery is not guaranteed. If more robustness is required, use the MQTT or NATS implementations.

Configuration

See Common Configuration section above for the common configuration elements for all implementations.

Security Configuration

| Option | Default Value | Description |
|---|---|---|
| AuthMode | usernamepassword | Mode of authentication to use. Values are none, usernamepassword, clientcert, or cacert |
| SecretName | redisdb | Secret name used to look up credentials in the service's SecretStore |

Additional Configuration

This implementation does not have any additional configuration.

MQTT 3.1

MQTT 3.1 is a robust message bus protocol with additional configuration options for robustness. It requires an additional MQTT Broker to be running. See MQTT Spec for more details on this protocol.

Configuration

See Common Configuration section above for the common configuration elements for all implementations.

Security Configuration

| Option | Default Value | Description |
|---|---|---|
| AuthMode | none | Mode of authentication to use. Values are none, usernamepassword, clientcert, or cacert. The MQTT Broker is currently not secured in secure mode. |
| SecretName | blank | Secret name used to look up credentials in the service's SecretStore |

Additional Configuration

| Option | Default Value | Description |
|---|---|---|
| ClientId | service key | Unique name of the client connecting to the MQTT broker |
| Qos | 0 | Quality of Service level: 0 (at most once delivery), 1 (at least once delivery), 2 (exactly once delivery). See the MQTT QOS Spec for more details |
| KeepAlive | 10 | Maximum time interval in seconds that is permitted to elapse between the point at which the client finishes transmitting one control packet and the point it starts sending the next. If exceeded, the broker will close the client connection |
| Retained | false | If true, Server MUST store the Application Message and its QoS, so that it can be delivered to future subscribers whose subscriptions match its topic name. See Retained Messages for more details. |
| AutoReconnect | true | If true, automatically attempts to reconnect to the broker when connection is lost |
| ConnectTimeout | 30 | Timeout in seconds for the connection to the broker to be successful |
| CleanSession | false | If true, Server MUST discard any previous Session and start a new one. This Session lasts as long as the Network Connection |

NATS

EdgeX 2.3

The NATS implementation is new for EdgeX 2.3

NATS is a high performance messaging system that offers some interesting options for local deployments. It uses a lightweight text-based protocol notably similar to HTTP. This protocol includes full header support, which allows the EdgeX MessageEnvelope to be conveyed across service boundaries without double encoding, provided all services in the deployment are using NATS. Currently, services must be specially built with the include_nats_messaging tag to enable this option.

NATS Core

An ordinary NATS server uses interest, or existence of a client subscription, as the basis for subject availability on the server. This makes Publish a fire and forget operation much like Redis, and gives the system an at most once quality of service.

NATS JetStream

The JetStream persistence layer binds NATS subjects to persistent streams. This enables the server to collect messages for subjects that have no registered interest and allows support for at least once quality of service. Notably, services running in nats-core mode can still subscribe and publish to JetStream-enabled subjects without the additional overhead associated with publish acknowledgement.
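
The difference between interest-based delivery and stream-backed delivery can be illustrated with a toy in-memory model. ToyBroker and its methods are hypothetical names invented for this sketch. It is not the NATS client API, and it ignores acknowledgements, redelivery and the Deliver setting.

```python
from collections import defaultdict


class ToyBroker:
    """Toy model of interest-based vs. stream-backed delivery (hypothetical)."""

    def __init__(self, persist=False):
        # persist=True mimics a stream bound to the subjects.
        self.persist = persist
        self.subscribers = defaultdict(list)  # subject -> list of inboxes
        self.stored = defaultdict(list)       # subject -> retained messages

    def publish(self, subject, message):
        if self.persist:
            # Stream-backed: keep the message even if nobody is subscribed.
            self.stored[subject].append(message)
        # Deliver to every inbox currently subscribed to the subject.
        for inbox in self.subscribers[subject]:
            inbox.append(message)

    def subscribe(self, subject):
        # Replay retained messages (empty when nothing was persisted).
        inbox = list(self.stored[subject])
        self.subscribers[subject].append(inbox)
        return inbox


if __name__ == "__main__":
    core = ToyBroker(persist=False)
    core.publish("edgex.events", "published before anyone subscribed")
    print("core-like inbox:", core.subscribe("edgex.events"))

    stream = ToyBroker(persist=True)
    stream.publish("edgex.events", "published before anyone subscribed")
    print("stream-like inbox:", stream.subscribe("edgex.events"))
```

In the first case the early message is simply dropped, which is the at most once behaviour described for NATS Core. In the second case the late subscriber still receives it.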

Configuration

See Common Configuration section above for the common configuration elements for all implementations.

Security Configuration

| Option | Default Value | Description |
|---|---|---|
| AuthMode | none | Mode of authentication to use. Values are none, usernamepassword, clientcert, or cacert. The NATS Server is currently not secured in secure mode. |
| SecretName | blank | Secret name used to look up credentials in the service's SecretStore |
| NKeySeedFile | blank | Path to a seed file to use for authentication. See the NATS documentation for more detail |
| CredentialsFile | blank | Path to a credentials file to use for authentication. See the NATS documentation for more detail |

Additional Configuration

| Option | Default Value | Description |
|---|---|---|
| ClientId | service key | Unique name of the client connecting to the NATS Server |
| Format | nats | Format of the actual message published. Valid values are: nats (metadata from the MessageEnvelope is put into the NATS header and the payload from the MessageEnvelope is published as is; preferred format when all services are using NATS) and json (JSON encodes the MessageEnvelope and publishes it as the message; use this format for compatibility when other services are using MQTT 3.1 and the NATS Server is running in MQTT mode) |
| ConnectTimeout | 30 | Timeout in seconds for the connection to the broker to be successful |
| RetryOnFailedConnect | false | Retry on connection failure - expects a string representation of a boolean |
| QueueGroup | blank | Specifies a queue group to distribute messages from a stream to a pool of worker services |
| Durable | blank | Specifies a durable consumer should be used with the given name. Note that if a durable consumer with the specified name does not exist it will be considered ephemeral and deleted by the client on drain / unsubscribe (JetStream only) |
| Subject | blank | Specifies the subject for subscribing stream if a Durable is not specified - will also be formatted into a stream name to be used on subscription. This subject is used for auto-provisioning the stream if needed as well and should be configured with the 'root' topic common to all subscriptions (e.g. edgex/#) to ensure that all topics on the bus are covered. (JetStream only) |
| AutoProvision | false | Automatically provision NATS streams. (JetStream only) |
| Deliver | new | Specifies delivery mode for subscriptions - options are "new", "all", "last" or "lastpersubject". See the NATS documentation for more detail (JetStream only) |
| DefaultPubRetryAttempts | 2 | Number of times to attempt to retry on failed publish (JetStream only) |
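
The two Format values can be contrasted with a short Python sketch. The function name, the header keys and the envelope field names are all hypothetical and chosen only to illustrate the two layouts; the real header names used by the NATS implementation are not given on this page.

```python
import base64
import json


def encode_for_nats(metadata, payload, fmt="nats"):
    """Return (headers, body) for the given Format value.

    `metadata` is a dict of envelope fields and `payload` is bytes.
    All names are illustrative assumptions.
    """
    if fmt == "nats":
        # Metadata travels in message headers; the payload is sent as is.
        headers = {key: str(value) for key, value in metadata.items()}
        return headers, payload
    if fmt == "json":
        # The whole envelope, payload included, is JSON encoded into the body.
        envelope = dict(metadata)
        envelope["payload"] = base64.b64encode(payload).decode("ascii")
        return {}, json.dumps(envelope).encode("utf-8")
    raise ValueError(f"unsupported format: {fmt}")


if __name__ == "__main__":
    meta = {"contentType": "application/json", "correlationID": "example-id"}
    body = b'{"deviceName": "camera-001"}'
    print(encode_for_nats(meta, body, "nats"))
    print(encode_for_nats(meta, body, "json"))
```

With nats the payload bytes reach the subscriber untouched, while json reproduces the double-encoded envelope that MQTT clients expect.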

Resource Provisioning with nats-box

While the SDK will attempt to auto-provision streams needed if configured to do so, if you need specific features or policies enabled it is generally best to provision your own. A nats-box docker image is available preloaded with various utilities to make this easier.

For information on stream provisioning using the nats cli see here.

For nkey generation a utility called nk is provided with nats-box. For generating nkey seed files see here.

For credential management a utility called nsc is provided with nats-box. For using credentials files see documentation on resolvers and the companion memory resolver tutorial.

NATS MQTT Mode

A JetStream enabled server can support MQTT connections on the same set of underlying subjects. This can be especially useful if you are using prebuilt EdgeX services like device-onvif-camera but want to transition your system towards using NATS. Note that format=json must be used so that the NATS messagebus client can read the double-encoded envelopes sent by MQTT clients. For more information see NATS MQTT Documentation.

ZeroMQ (DEPRECATED)

ZeroMQ is a broker-less TCP based message bus protocol. Since it is broker-less, there can only be a single publisher and many subscribers. Once Device Services also started publishing to the EdgeX MessageBus, ZeroMQ became unusable, so it has been deprecated and will be removed in the next major release.

Multi-level topics and wildcards

The EdgeX MessageBus uses multi-level topics and wildcards to allow filtering of data via subscriptions and has standardized on an MQTT-like scheme. See MQTT multi-level topics and wildcards for more information.

The Redis implementation converts the Redis Pub/Sub multi-level topic scheme to one similar to MQTT. In Redis Pub/Sub the "." is used as a level separator and the "*" is used as a wildcard. These are converted to "/" and "#" respectively, which are used by MQTT. MQTT additionally uses the "+" as a wildcard for single levels and the "#" for multi-level (only at the end). The Redis implementation uses the "#" for both the single and multi-level wildcard.

Note

The Redis implementation's handling of multi-level topics and wildcards is not fully compliant with the MQTT scheme. This inconsistency will be resolved in the next major release.

The NATS implementations convert the NATS multi-level topic scheme to match that of MQTT. In NATS, "." is used as a level separator, "*" is used as the single-level wildcard and ">" is used as the multi-level wildcard. These are converted to "/", "+" and "#" respectively, which are compliant with the MQTT scheme.
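
The mapping just described can be sketched in a few lines of Python. The function names are hypothetical and this is not the code used by go-mod-messaging; it only applies the character mapping stated above, level by level.

```python
def to_nats_subject(edgex_topic):
    """Convert an MQTT-style topic or filter to a NATS subject."""
    mapping = {"+": "*", "#": ">"}
    levels = edgex_topic.split("/")
    return ".".join(mapping.get(level, level) for level in levels)


def from_nats_subject(subject):
    """Convert a NATS subject back to an MQTT-style topic or filter."""
    mapping = {"*": "+", ">": "#"}
    tokens = subject.split(".")
    return "/".join(mapping.get(token, token) for token in tokens)


if __name__ == "__main__":
    print(to_nats_subject("edgex/events/+/+/camera-001/#"))
    print(from_nats_subject("edgex.events.device.>"))
```

One consequence of this mapping is that a topic level containing a literal "." would be split into two NATS tokens, so names used in topics are best kept free of that character.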

Example Multi-level topics and wildcards for EdgeX MessageBus - Redis implementation

  • edgex/events/#

    All events coming from any device service or core data for any device profile, device or source

  • edgex/events/device/#

    All events coming from any device service for any device profile, device or source

  • edgex/events/#/#/camera-001/#

    Events coming from any device service or core data for any device profile, but only for the device "camera-001" and for any source

  • edgex/events/device/onvif/#/status

    Events coming from any device service for only the device profile "onvif", and any device and only for the source "status"
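
The Redis conversion and its wildcard behaviour can be approximated in Python. The function names are hypothetical, and fnmatchcase is used here only as a stand-in for Redis glob-style pattern matching, which is an assumption of this sketch rather than something stated on this page.

```python
from fnmatch import fnmatchcase


def to_redis_pattern(edgex_topic):
    """Convert an EdgeX (MQTT-like) topic filter to a Redis Pub/Sub pattern."""
    return edgex_topic.replace("/", ".").replace("#", "*")


def from_redis_channel(channel):
    """Convert a Redis channel name back to an EdgeX topic."""
    return channel.replace(".", "/")


if __name__ == "__main__":
    channel = "edgex.events.device.onvif.camera-001.status"
    for topic_filter in (
        "edgex/events/#",
        "edgex/events/#/#/camera-001/#",
        "edgex/events/device/onvif/#/status",
    ):
        pattern = to_redis_pattern(topic_filter)
        print(pattern, fnmatchcase(channel, pattern))
```

Because the glob "*" is not limited to a single level, a "#" in the middle of a filter can also match several levels, which is the non-compliance with the MQTT scheme noted above.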

Example Multi-level topics and wildcards for EdgeX MessageBus - MQTT 3.1 and NATS implementations

  • edgex/events/#

    All events coming from any device service or core data for any device profile, device or source

  • edgex/events/device/#

    All events coming from any device service for any device profile, device or source

  • edgex/events/+/+/camera-001/#

    Events coming from any device service or core data for any device profile, but only for the device "camera-001" and for any source

  • edgex/events/device/onvif/+/status

    Events coming from any device service for only the device profile "onvif", and any device and only for the source "status"
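
The filters above can be checked with a small MQTT-style matcher. This is a minimal sketch with a hypothetical function name, not the matching code of any MQTT broker or of go-mod-messaging, and it ignores special cases such as topics beginning with "$".

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT-style filter matches a concrete topic."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last level,
            # where it matches everything that remains.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            # "+" matches exactly one level; anything else must be equal.
            return False
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    topic = "edgex/events/device/onvif/camera-001/status"
    for topic_filter in (
        "edgex/events/#",
        "edgex/events/device/#",
        "edgex/events/+/+/camera-001/#",
        "edgex/events/device/onvif/+/status",
        "edgex/events/+",
    ):
        print(topic_filter, topic_matches(topic_filter, topic))
```

The example topic assumes the layout implied by the filters above: profile, device and source follow edgex/events/device in that order.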

Deployment

Redis Pub/Sub (default)

All EdgeX services are capable of using Redis Pub/Sub without any changes to configuration. The released compose files and snaps use Redis Pub/Sub.

MQTT 3.1

All EdgeX services are capable of using MQTT 3.1 by simply making changes to each service's configuration.

Note

As mentioned above, the MQTT 3.1 implementation requires an additional MQTT Broker service to be running.

Configuration Changes

Example MQTT configuration changes for Core/Support/Device Services

The following MessageQueue configuration settings must be changed for all EdgeX Core/Support/Device Services to use MQTT 3.1.

[MessageQueue]
Type = "mqtt"
Protocol = "tcp"
Host = "localhost" # in docker this must be overridden to be the docker host name of the MQTT Broker
Port = 1883
AuthMode = "none" # Currently in secure mode the MQTT Broker is not secured
  [MessageQueue.Optional]
  ClientId = "<service-key>" # must be a unique name for the service, thus the service key (core-data, etc.) is used
  Qos = "0" # Quality of Service values are 0 (At most once), 1 (At least once) or 2 (Exactly once)
  KeepAlive = "10" # Seconds (must be 2 or greater)
  Retained = "false"
  AutoReconnect = "true"
  ConnectTimeout = "5" # Seconds

Example MQTT configuration changes for Application Services

The following Trigger.EdgexMessageBus configuration settings must be changed for all EdgeX Application Services to use MQTT 3.1.

[Trigger]
Type = "edgex-messagebus"
  [Trigger.EdgexMessageBus]
  Type = "mqtt"
    [Trigger.EdgexMessageBus.SubscribeHost]
    Protocol = "tcp"
    Host = "localhost" # in docker this must be overridden to be the docker host name of the MQTT Broker
    Port = 1883
    [Trigger.EdgexMessageBus.PublishHost]
    Protocol = "tcp"
    Host = "localhost" # in docker this must be overridden to be the docker host name of the MQTT Broker
    Port = 1883
    [Trigger.EdgexMessageBus.Optional]
    ClientId = "<service-key>" # must be a unique name for the service, thus the service key (app-rule-engine, etc.) is used
    Qos = "0" # Quality of Service values are 0 (At most once), 1 (At least once) or 2 (Exactly once)
    KeepAlive = "10" # Seconds (must be 2 or greater)
    Retained = "false"
    AutoReconnect = "true"
    ConnectTimeout = "5" # Seconds
    authmode = "none" # Currently in secure mode the MQTT Broker is not secured

Docker

The EdgeX Compose Builder utility provides an option to easily generate a compose file with all the selected services re-configured for MQTT 3.1 using environment overrides. This is accomplished by using the mqtt-bus option. See Compose Builder README for details on all available options.

Example Secure mode compose generation for MQTT 3.1

make gen ds-virtual ds-rest mqtt-bus

Non-secure mode compose generation for MQTT 3.1

make gen no-secty ds-virtual ds-rest mqtt-bus

Note

The run command can be used to generate and run the compose file in one command, but any changes made to the generated compose file will be overridden the next time run is used. An alternative is to use the up command, which runs the latest generated compose file with any modifications that may have been made.
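
The generated compose file applies these settings through environment overrides. As a rough sketch, assume the override name is the configuration path in upper case with separators replaced by underscores. That convention is an assumption not stated on this page, so verify it against the configuration documentation for your release. Under that assumption, a hypothetical helper could derive the names to look for in the generated file:

```python
def override_name(config_path):
    """Map a dotted configuration path to an environment variable name.

    Assumes the upper-case, underscore-joined convention described in
    the lead-in; this helper is illustrative and not part of EdgeX.
    """
    return config_path.replace(".", "_").replace("-", "_").upper()


if __name__ == "__main__":
    for path in (
        "MessageQueue.Type",
        "MessageQueue.Optional.ClientId",
        "Trigger.EdgexMessageBus.SubscribeHost.Host",
    ):
        print(path, "->", override_name(path))
```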

Snaps

For Snap deployment, each service's configuration has to be modified manually or via environment overrides after install. For more details see the Configuration section in the Snaps getting started guide.

NATS

The EdgeX Go based services are not capable of using the NATS implementation unless they are rebuilt with the include_nats_messaging build tag. Any EdgeX Core/Support/Go Device/Application Service targeted to use NATS in a deployment must have its Makefile modified to add this build tag. The service can then be rebuilt for native and/or Docker.

Core Data make target modified to include NATS

cmd/core-data/core-data:
    $(GOCGO) build -tags "include_nats_messaging $(NON_DELAYED_START_GO_BUILD_TAG_FOR_CORE)" $(CGOFLAGS) -o $@ ./cmd/core-data

Note

The C Device SDK does not currently have a NATS implementation, so C Device Services cannot be used with the NATS based EdgeX MessageBus.

Configuration Changes

Example NATS configuration changes for Core/Support/Device Services

The following MessageQueue configuration settings must be changed for all EdgeX Core/Support/Device Services to use NATS JetStream.

[MessageQueue]
Type = "nats-jetstream"
Protocol = "tcp"
Host = "localhost" # in docker this must be overridden to be the docker host name of the NATS Server
Port = 4222
AuthMode = "none" # Currently in secure mode the NATS Server is not secured
  [MessageQueue.Optional]
  ClientId = "<service-key>" # must be a unique name for the service, thus the service key (core-data, etc.) is used
  # Connection information
  Format = "nats" # Use 'json' for backward compatibility with services using MQTT
  ConnectTimeout = "5" # Seconds
  RetryOnFailedConnect = "true"
  QueueGroup = ""
  Durable = "" # JetStream only
  AutoProvision = "true" # JetStream only
  Deliver = "new" # JetStream only
Example NATS configuration changes for Application Services

The following Trigger.EdgexMessageBus configuration settings must be changed for all EdgeX Application Services to use NATS JetStream.

[Trigger]
Type = "edgex-messagebus"
  [Trigger.EdgexMessageBus]
  Type = "nats-jetstream"
    [Trigger.EdgexMessageBus.SubscribeHost]
    Protocol = "tcp"
    Host = "localhost" # in docker this must be overridden to be the docker host name of the NATS Server
    Port = 4222
    [Trigger.EdgexMessageBus.PublishHost]
    Protocol = "tcp"
    Host = "localhost" # in docker this must be overridden to be the docker host name of the NATS Server
    Port = 4222
    [Trigger.EdgexMessageBus.Optional]
    ClientId = "<service-key>" # must be a unique name for the service, thus the service key (app-rule-engine, etc.) is used
    Format = "nats" # Use 'json' for backward compatibility with services using MQTT
    ConnectTimeout = "5" # Seconds
    RetryOnFailedConnect = "true"
    QueueGroup = ""
    Durable = "" # JetStream only
    AutoProvision = "true" # JetStream only
    Deliver = "new" # JetStream only
    authmode = "none" # Currently in secure mode the NATS Server is not secured

Docker

The EdgeX Compose Builder utility provides an option to easily generate a compose file with all the selected services re-configured for NATS using environment overrides. This is accomplished by using the nats-bus option, which configures the services to use the NATS JetStream implementation. See Compose Builder README for details on all available options. If NATS Core is preferred, search for nats-jetstream in the generated compose file and replace it with nats-core.

Example Secure mode compose generation for NATS

make gen ds-virtual ds-rest nats-bus

Non-secure mode compose generation for NATS

make gen no-secty ds-virtual ds-rest nats-bus

Snaps

The published Snaps are built without NATS included, so NATS cannot be used with those Snaps. One could modify the Makefiles as described above and then build and install local snap packages. In this case it would be easier to modify each service's configuration as described above before building, so that the locally built and installed snaps are already configured for NATS.

ZeroMQ (DEPRECATED)

Since ZeroMQ only allows one publisher, it has been deprecated and will be removed in the next major release. Please deploy using one of the other implementations.