Messaging with the native mqtt library
Introduction
The ClearBlade Platform supports MQTT, a lightweight IoT messaging protocol. ClearBlade has built an OAuth 2 security model into the MQTT broker to provide out-of-the-box messaging security.
Click here to learn about sending messages.
Purpose
Lightweight messaging is key to moving data around an IoT system at scale.
Workflows
Workflow 1: A message is published, a code service processes it, and the service publishes the output.
Workflow 2: A code service is invoked, performs a computation, and publishes the output.
MQTT
The ClearBlade Platform implements the full MQTT 3.1.1 specification, including its multiple quality of service (QoS) levels, so developers can choose the setting that best fits their use case.
QoS
Quality of service | Definition | Description |
---|---|---|
0 | At most once | The message is only sent once whether it makes it to the subscriber or not. |
1 | At least once | The message will be sent one or more times to the subscriber until it is received. |
2 | Exactly once | The message will be received by the subscriber exactly once. |
MQTT ports and their use
Port | TLS | Information | Requirements |
---|---|---|---|
1883 | False | MQTT Pub/Sub | user-token |
1884 | True | MQTT Pub/Sub | user-token |
8903 | False | WebSockets, usually used by browsers/applications (cannot communicate directly over MQTT) | dev-token |
8904 | True | WebSockets | dev-token |
8905 | False | Auth over MQTT | SystemKey, Secret, Username, Password, ClientId |
8906 | True | Auth over MQTT | |
8907 | False | Auth over MQTT via WebSockets | TODO |
8908 | True | Auth over MQTT via WebSockets | TODO |
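The port table above can be captured as a small lookup, which may help when a client has to pick a port from its transport and TLS settings. This is a minimal sketch in plain JavaScript: the port numbers come from the table, but the function name `selectMqttPort` and the option names `purpose`, `transport`, and `tls` are hypothetical and not part of any ClearBlade API.

```javascript
// Port numbers are copied from the table above. The "purpose" and "transport"
// labels are illustrative names chosen for this sketch.
const MQTT_PORTS = [
  { port: 1883, tls: false, purpose: "pubsub", transport: "tcp" },
  { port: 1884, tls: true, purpose: "pubsub", transport: "tcp" },
  { port: 8903, tls: false, purpose: "pubsub", transport: "websocket" },
  { port: 8904, tls: true, purpose: "pubsub", transport: "websocket" },
  { port: 8905, tls: false, purpose: "auth", transport: "tcp" },
  { port: 8906, tls: true, purpose: "auth", transport: "tcp" },
  { port: 8907, tls: false, purpose: "auth", transport: "websocket" },
  { port: 8908, tls: true, purpose: "auth", transport: "websocket" },
];

// Returns the port whose row matches all three options, or throws if none does.
function selectMqttPort(options) {
  const match = MQTT_PORTS.find(function (entry) {
    return entry.purpose === options.purpose &&
      entry.transport === options.transport &&
      entry.tls === options.tls;
  });
  if (!match) {
    throw new Error("No port matches " + JSON.stringify(options));
  }
  return match.port;
}

console.log(selectMqttPort({ purpose: "pubsub", transport: "tcp", tls: true })); // 1884
```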
Working with MQTT
Working with MQTT involves the following steps:
Prerequisites
You must have a user or device with valid permissions to publish messages.
To set this up, follow these steps:
1. Log in or create a developer account on a ClearBlade Platform instance.
2. View or create a system.
3. View or create a user.
4. View or assign a role to the user.
5. View the Roles page.
6. Add a topic to the message topics list enabled for that role (for example, mytopic).
Broker connection
The MQTT protocol allows the connect action to carry a username and password. ClearBlade repurposes those fields to accommodate its OAuth-styled token model:
Key | Value | Example |
---|---|---|
URL | URL_OF_BROKER | |
PORT | PORT | 1883 |
Username | USER_TOKEN | abcdefabcdef01234567890 |
Password | SYSTEM_KEY | f0cbf0cbf0cbf0cbf0cbf0cbf0cb |
ClientID | UNIQUE_CLIENT_ID | sjdbfkasdbf |
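The field mapping in the table above can be sketched as a small helper that assembles connection settings. This is only an illustration: `buildBrokerConnectOptions` and the property names it returns are hypothetical, and `broker.example.com` is a placeholder. Adapt the output to whatever MQTT client library is in use.

```javascript
// Hypothetical helper: maps ClearBlade credentials onto the MQTT CONNECT
// fields as described in the table above.
function buildBrokerConnectOptions(brokerUrl, port, userToken, systemKey, clientId) {
  if (!userToken || !systemKey || !clientId) {
    throw new Error("userToken, systemKey, and clientId are all required");
  }
  return {
    host: brokerUrl,
    port: port,
    username: userToken, // the user token travels in the MQTT username field
    password: systemKey, // the system key travels in the MQTT password field
    clientId: clientId, // should be unique per connected client
  };
}

const brokerOptions = buildBrokerConnectOptions(
  "broker.example.com", // placeholder for URL_OF_BROKER
  1883,
  "abcdefabcdef01234567890", // example token from the table
  "f0cbf0cbf0cbf0cbf0cbf0cbf0cb", // example system key from the table
  "sjdbfkasdbf"
);
console.log(brokerOptions);
```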
Duplicate client ID behavior
If two clients connect to a broker with the same client ID, the broker behavior can be configured to be one of the following:
1. The new client can connect, and the existing client is kicked off. Depending on the client firmware, this could result in the two clients alternately connecting and disconnecting repeatedly.
2. The existing client connection is maintained, and the new client is rejected.
The chosen configuration is set when an environment is deployed and applies to all systems (brokers) in the given environment. Configuration #1 conforms to the MQTT specification.
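The difference between the two configurations can be illustrated with a toy model of a broker's session table. This is not ClearBlade's implementation: the function `connectClient` and the policy names `"replace"` (configuration 1) and `"reject"` (configuration 2) are made up for this sketch.

```javascript
// Toy model: `sessions` maps a client ID to the connection currently holding it.
function connectClient(sessions, clientId, connectionId, policy) {
  if (policy !== "replace" && policy !== "reject") {
    throw new Error("Unknown policy: " + policy);
  }
  const existing = sessions.get(clientId);
  if (existing === undefined) {
    // No clash: the client simply connects.
    sessions.set(clientId, connectionId);
    return { accepted: true, disconnected: null };
  }
  if (policy === "replace") {
    // Configuration 1: the new client wins and the existing one is kicked off.
    sessions.set(clientId, connectionId);
    return { accepted: true, disconnected: existing };
  }
  // Configuration 2: the existing connection is kept and the new one is rejected.
  return { accepted: false, disconnected: null };
}

const sessions = new Map();
connectClient(sessions, "sensor-1", "conn-A", "replace");
const second = connectClient(sessions, "sensor-1", "conn-B", "replace");
console.log(second); // conn-B is accepted and conn-A is disconnected
```

If the kicked-off client reconnects automatically with the same ID, the same replacement happens in reverse, which is the alternating connect and disconnect cycle described for configuration 1.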
MQTT authentication
A ClearBlade user token is required to communicate with a broker. A token can be obtained via a REST endpoint call or MQTT.
Connect
Required keys | Description | Example values |
---|---|---|
URL | <PLATFORM_IP> | |
Port | <PORT_NUMBER> | 8905 |
Username | <SYSTEM_KEY> | bacb8fb60bb4d7c2c2c0e4bb9701 |
Password | <SYSTEM_SECRET> | BACB8FB60BFDDB7DB97D7A8BF01 |
ClientId | <USER_EMAIL>:<PASSWORD> for User, <DEVICE_NAME>:<ACTIVE_KEY> for devices | cbman@clearblade.com:cl34r8l4d3 or temperature-sensor:faqb8fb60bc2c2b1c0e4bb9701 |
TLS (optional) | <ALLOW_TLS> | true |
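The connect fields in the table above differ from a normal broker connection, so a short sketch may help. The helper names (`buildAuthClientId`, `buildAuthConnectOptions`), the `identity` object shape, the returned property names, and the host `platform.example.com` are all hypothetical. The ports 8905 and 8906 are the non-TLS and TLS "Auth over MQTT" ports listed earlier.

```javascript
// Builds the ClientId value: <USER_EMAIL>:<PASSWORD> for users,
// <DEVICE_NAME>:<ACTIVE_KEY> for devices.
function buildAuthClientId(identity) {
  if (identity.type === "user") {
    return identity.email + ":" + identity.password;
  }
  if (identity.type === "device") {
    return identity.deviceName + ":" + identity.activeKey;
  }
  throw new Error("identity.type must be 'user' or 'device'");
}

// Hypothetical helper: assembles the settings for an auth-over-MQTT connection.
function buildAuthConnectOptions(platformHost, systemKey, systemSecret, identity, useTls) {
  return {
    host: platformHost,
    port: useTls ? 8906 : 8905,
    username: systemKey, // the system key travels in the username field
    password: systemSecret, // the system secret travels in the password field
    clientId: buildAuthClientId(identity),
    tls: useTls,
  };
}

const authOptions = buildAuthConnectOptions(
  "platform.example.com", // placeholder for <PLATFORM_IP>
  "bacb8fb60bb4d7c2c2c0e4bb9701", // example system key from the table
  "BACB8FB60BFDDB7DB97D7A8BF01", // example system secret from the table
  { type: "user", email: "cbman@clearblade.com", password: "cl34r8l4d3" },
  false
);
console.log(authOptions.clientId);
```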
Subscribe
To receive the new JWT token, subscribe to the auth topic (recommended).
To receive the legacy token, subscribe to the v/1/auth topic.
Extract token
ClearBlade publishes the user token on the auth message topic as a binary-encoded payload.
The payload is a sequence of length blocks, each followed by its data block:
Each length block is a 2-byte (16-bit) unsigned integer.
Each data block is UTF-8 encoded bytes.
Block mapping in the packet structure:
Block-Num | Description |
---|---|
1 | Token length |
2 | Token |
3 | User ID or device name length |
4 | User ID or device name |
5 | Messaging URL length |
6 | Messaging URL |
Once the client is authenticated, the token can be extracted, and the connection can be established.
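The block layout above can be decoded with a few lines of JavaScript. This is a sketch under one important assumption that the text does not state: the 2-byte length is read as big-endian (network byte order, the convention MQTT itself uses for length prefixes). The function names are hypothetical, and `encodeBlock` exists only to build a sample payload for the demonstration.

```javascript
// Reads one length-prefixed block starting at `offset` in a Uint8Array.
// ASSUMPTION: the 2-byte length is big-endian.
function readBlock(bytes, offset) {
  if (offset + 2 > bytes.length) {
    throw new Error("Truncated length block at offset " + offset);
  }
  const length = (bytes[offset] << 8) | bytes[offset + 1];
  const start = offset + 2;
  const end = start + length;
  if (end > bytes.length) {
    throw new Error("Truncated data block at offset " + start);
  }
  const text = new TextDecoder("utf-8").decode(bytes.subarray(start, end));
  return { text: text, next: end };
}

// Decodes the three length/data pairs listed in the block mapping table.
function parseAuthPayload(bytes) {
  const token = readBlock(bytes, 0);
  const identity = readBlock(bytes, token.next);
  const messagingUrl = readBlock(bytes, identity.next);
  return {
    token: token.text,
    userIdOrDeviceName: identity.text,
    messagingUrl: messagingUrl.text,
  };
}

// Demo-only helper: encodes a string as a length block plus a data block.
function encodeBlock(text) {
  const data = new TextEncoder().encode(text);
  if (data.length > 0xffff) {
    throw new Error("Block too long for a 16-bit length");
  }
  const block = new Uint8Array(2 + data.length);
  block[0] = data.length >> 8;
  block[1] = data.length & 0xff;
  block.set(data, 2);
  return block;
}

// Build a made-up sample payload and decode it again.
const sampleBlocks = ["sample-token", "temperature-sensor", "broker.example.com:1883"].map(encodeBlock);
const samplePayload = new Uint8Array(sampleBlocks.reduce(function (n, b) { return n + b.length; }, 0));
let writeOffset = 0;
sampleBlocks.forEach(function (b) {
  samplePayload.set(b, writeOffset);
  writeOffset += b.length;
});
console.log(parseAuthPayload(samplePayload));
```

If the decoded strings look wrong against a real payload, the byte-order assumption is the first thing to check.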
Error codes
The connect return codes below are defined by the MQTT 3.1.1 specification:
Value | Return code response | Description |
---|---|---|
0 | 0x00 Connection Accepted | Connection accepted |
1 | 0x01 Connection Refused, unacceptable protocol version | The server does not support the MQTT protocol level requested by the client |
2 | 0x02 Connection Refused, identifier rejected | The client identifier is correct UTF-8 but not allowed by the server (for example, a duplicate client ID) |
3 | 0x03 Connection Refused, Server unavailable | The network connection has been made, but the MQTT service is unavailable |
4 | 0x04 Connection Refused, bad user name or password | The data in the user name or password is malformed |
5 | 0x05 Connection Refused, not authorized | The client is not authorized to connect. For example, entering the incorrect password |
6-255 | | Reserved for future use |
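A client can turn the return codes in the table above into readable log messages with a simple lookup. The sketch below is plain JavaScript, and the name `describeConnackCode` is hypothetical. How the numeric code is obtained depends on the MQTT client library and is not shown.

```javascript
// Descriptions taken from the return code table above.
const CONNECT_RETURN_CODES = {
  0: "Connection accepted",
  1: "Connection refused, unacceptable protocol version",
  2: "Connection refused, identifier rejected",
  3: "Connection refused, server unavailable",
  4: "Connection refused, bad user name or password",
  5: "Connection refused, not authorized",
};

// Returns a description for a connect return code in the range 0-255.
function describeConnackCode(code) {
  if (!Number.isInteger(code) || code < 0 || code > 255) {
    throw new RangeError("Return code must be an integer from 0 to 255");
  }
  if (Object.prototype.hasOwnProperty.call(CONNECT_RETURN_CODES, code)) {
    return CONNECT_RETURN_CODES[code];
  }
  return "Reserved for future use";
}

console.log(describeConnackCode(5)); // Connection refused, not authorized
```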
Message topics
Topics are used to separate messages and keep them organized based on what they are about.
There are three topic types:
Regular topics
Topics composed of string literals, for example: /devices
Wildcard topics
A wildcard subscription subscribes a client to multiple topics simultaneously. There are two types:
Single level (+)
The single-level wildcard (+) matches exactly one entire level of the topic path.
/device/+/temp
Topic | Matches |
---|---|
/device/a/temp | ✓ |
/device/b/temp | ✓ |
/device/123/temp | ✓ |
/device/temp | ✗ |
/device/temp/a | ✗ |
Multi-level (#)
The multi-level wildcard (#) matches any number of topic levels from its position onward.
The following subscribes to every topic:
#
See this example:
/device/#
Topic | Matches |
---|---|
/device/1/2/3 | ✓ |
/device/a | ✓ |
/abc | ✗ |
/abc/device/ | ✗ |
A subscription topic like /Foo/#/Bar is invalid. When a subscription contains a #, it must be the last element of that subscription.
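The wildcard rules above can be expressed as a short matcher, which is handy for unit-testing subscription filters before deploying them. This is a sketch of the standard MQTT 3.1.1 level-by-level matching, not ClearBlade's broker code: the function names are hypothetical, and the sketch deliberately ignores the special handling of topics that begin with `$`.

```javascript
// A filter is valid when each wildcard occupies a whole level
// and "#" appears only as the last level.
function isValidTopicFilter(filter) {
  const levels = filter.split("/");
  return levels.every(function (level, index) {
    if (level === "#") return index === levels.length - 1;
    if (level === "+") return true;
    return level.indexOf("#") === -1 && level.indexOf("+") === -1;
  });
}

// Compares a subscription filter with a concrete topic, level by level.
function topicMatchesFilter(filter, topic) {
  if (!isValidTopicFilter(filter)) {
    throw new Error("Invalid topic filter: " + filter);
  }
  const filterLevels = filter.split("/");
  const topicLevels = topic.split("/");
  for (let i = 0; i < filterLevels.length; i++) {
    if (filterLevels[i] === "#") return true; // matches everything from here on
    if (i >= topicLevels.length) return false; // topic is shorter than the filter
    if (filterLevels[i] !== "+" && filterLevels[i] !== topicLevels[i]) return false;
  }
  // Without "#", both must have the same number of levels.
  return filterLevels.length === topicLevels.length;
}

console.log(topicMatchesFilter("/device/+/temp", "/device/a/temp")); // true
console.log(topicMatchesFilter("/device/+/temp", "/device/temp")); // false
console.log(topicMatchesFilter("/device/#", "/device/1/2/3")); // true
console.log(isValidTopicFilter("/Foo/#/Bar")); // false
```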
Message Relay
The ClearBlade Message Relay allows edge MQTT clients to communicate with platform MQTT clients and with other edge MQTT clients. To achieve this, a few MQTT topic paths are reserved. The Message Relay uses the following six topic paths to route messages between edges and their parent platform. Message Relay applies only to communication between edges and platforms within a single system.
Message type | Topic path |
---|---|
Edge to Platform | <TOPIC_NAME>/_platform |
Platform to Edge | <TOPIC_NAME>/_edge/<NAME_OF_EDGE> |
Edge to Edge | <TOPIC_NAME>/_edge/<NAME_OF_EDGE> |
Platform to All Broadcast | <TOPIC_NAME>/_broadcast |
Edge to All Broadcast | <TOPIC_NAME>/_broadcast |
Edge to Edge and Platform | <TOPIC_NAME>/_edgeAndPlatform/<NAME_OF_EDGE> |
These topic paths are for sending messages. To receive messages, subscribe to the same topics, except for Edge to Edge and Platform, for which you subscribe to the same topics used for Platform and Edge.
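The reserved suffixes in the table above can be appended by a small helper, which keeps the relay paths in one place. This is only a sketch: `buildRelayTopic` and its `target` values are hypothetical names, while the suffixes themselves come from the table.

```javascript
// Builds a Message Relay publish topic from a base topic name.
// `target` is one of "platform", "edge", "broadcast", or "edgeAndPlatform".
function buildRelayTopic(topicName, target, edgeName) {
  const needsEdge = target === "edge" || target === "edgeAndPlatform";
  if (needsEdge && !edgeName) {
    throw new Error("edgeName is required for target '" + target + "'");
  }
  switch (target) {
    case "platform":
      return topicName + "/_platform";
    case "edge":
      return topicName + "/_edge/" + edgeName;
    case "broadcast":
      return topicName + "/_broadcast";
    case "edgeAndPlatform":
      return topicName + "/_edgeAndPlatform/" + edgeName;
    default:
      throw new Error("Unknown relay target: " + target);
  }
}

console.log(buildRelayTopic("mytopic", "platform")); // mytopic/_platform
console.log(buildRelayTopic("mytopic", "edge", "edge-1")); // mytopic/_edge/edge-1
```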
Event topics
Event topics allow for greater scalability when handling events by enabling the platform to send an MQTT message for each event. A system user can subscribe to events by subscribing to a specific topic for an event or using an MQTT wildcard subscription to subscribe to multiple events. This can be done in stream services by using subscribe() and waitForMessage(), as well as with shared subscriptions.
Click here to view the topic paths.
The message payload of each event message consists of the same JSON-encoded object data delivered to an event microservice. Each key in the object corresponds to a key set (by the event) in the req.params input to a microservice. Call JSON.parse(payload) on the message payload before using the message. Below is an example of working with specific event types called triggers. Add the mqtt native library as a dependency of this service.
Example
function triggerStreamServiceMqtt(req, resp) {
  const deviceCreatedTopic = "$trigger/device/created";
  const deviceConnectedTopic = "$trigger/messaging/device/connected";
  const errorsTopic = "service/error";
  const messaging = new MQTT.Client();

  // Publish the error to the errors topic, then end the service with that error
  function publishErrorMessage(originalErrorMessage) {
    messaging.publish(errorsTopic, originalErrorMessage).then(function () {
      resp.error(originalErrorMessage);
    }).catch(function (publishError) {
      const overallErrorMessage = originalErrorMessage + "; also failed to publish to topic '" + errorsTopic + "': " + JSON.stringify(publishError);
      resp.error(overallErrorMessage);
    });
  }

  // Dispatch on the last topic level, which names the event action
  function onMessage(topic, message) {
    // Event payloads are JSON-encoded; call JSON.parse(messagePayload) before use
    const messagePayload = message.payload;
    const topicArray = topic.split("/");
    const action = topicArray[topicArray.length - 1];
    switch (action) {
      case "created":
        // Code to run if device created
        break;
      case "connected":
        // Code to run if device connected
        break;
      default:
        // Code to run for any other actions
        break;
    }
  }

  messaging.subscribe(deviceCreatedTopic, onMessage).catch(function (subscribeDeviceCreatedError) {
    const subscribeDeviceCreatedErrorMessage = "Failed to subscribe to device-created topic: " + JSON.stringify(subscribeDeviceCreatedError);
    publishErrorMessage(subscribeDeviceCreatedErrorMessage);
  });

  messaging.subscribe(deviceConnectedTopic, onMessage).catch(function (subscribeDeviceConnectedError) {
    const subscribeDeviceConnectedErrorMessage = "Failed to subscribe to device-connected topic: " + JSON.stringify(subscribeDeviceConnectedError);
    publishErrorMessage(subscribeDeviceConnectedErrorMessage);
  });
}
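Because event payloads arrive JSON-encoded, a message handler may want to guard the parsing step so that one malformed message does not stop the service. The following is a minimal sketch in plain JavaScript: `parseEventPayload` is a hypothetical helper, and the `deviceName` key in the sample is made up for illustration (the real keys depend on the event).

```javascript
// Parses a JSON-encoded event payload without throwing.
// Returns { ok: true, data } on success or { ok: false, error } on failure.
function parseEventPayload(payload) {
  try {
    return { ok: true, data: JSON.parse(payload) };
  } catch (parseError) {
    return { ok: false, error: "Invalid JSON payload: " + parseError.message };
  }
}

// The key name below is hypothetical sample data.
const parsedEvent = parseEventPayload('{"deviceName":"temperature-sensor"}');
if (parsedEvent.ok) {
  console.log(parsedEvent.data.deviceName);
} else {
  console.log(parsedEvent.error);
}
```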
Shared subscriptions
Shared subscriptions allow clients to share the same subscription on the MQTT broker. Clients are placed in the same subscription group and will receive the messages in a queue format. The message load of a single topic is distributed among all clients in a subscription group. Multiple shared groups on one topic are not supported in shared subscriptions.
There are three parts to the topic structure for sharing:
$share/<GroupID>/<Topic>
$share keyword: A static shared subscription identifier.
GroupID: Used to identify a group.
Topic: A standard MQTT topic (including wildcards).
Sample topic: $share/StreamServiceGroup/devices/+/+/event
Add the mqtt native library as a dependency of this service.
Example
function SharedTopic(req, resp) {
  const messaging = new MQTT.Client();
  const topic = "topic/+";
  const sharedTopic = "$share/group_id/" + topic;
  const errorsTopic = "service/error";

  // Publish the error to the errors topic, then end the service with that error
  function publishErrorMessage(originalErrorMessage) {
    messaging.publish(errorsTopic, originalErrorMessage).then(function () {
      resp.error(originalErrorMessage);
    }).catch(function (publishError) {
      const overallErrorMessage = originalErrorMessage + "; also failed to publish to topic '" + errorsTopic + "': " + JSON.stringify(publishError);
      resp.error(overallErrorMessage);
    });
  }

  function onMessage(topic, message) {
    const messagePayload = message.payload;
    // Message handling code
  }

  messaging.subscribe(sharedTopic, onMessage).catch(function (subscribeError) {
    const subscribeErrorMessage = "Failed to subscribe to topic '" + sharedTopic + "': " + JSON.stringify(subscribeError);
    publishErrorMessage(subscribeErrorMessage);
  });
}
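The three-part structure of a shared subscription topic can be built and taken apart with two small helpers. This is a sketch in plain JavaScript: `buildSharedTopic` and `parseSharedTopic` are hypothetical names, and the only validation applied to the group ID (non-empty, no `/`) is an assumption made for this example.

```javascript
// Builds "$share/<GroupID>/<Topic>" from its parts.
function buildSharedTopic(groupId, topic) {
  if (!groupId || groupId.indexOf("/") !== -1) {
    throw new Error("groupId must be non-empty and must not contain '/'");
  }
  if (!topic) {
    throw new Error("topic must be non-empty");
  }
  return "$share/" + groupId + "/" + topic;
}

// Splits a shared subscription topic into its group ID and underlying topic.
// Returns null when the input is not a shared subscription topic.
function parseSharedTopic(sharedTopic) {
  const parts = sharedTopic.split("/");
  if (parts.length < 3 || parts[0] !== "$share" || parts[1] === "") {
    return null;
  }
  return { groupId: parts[1], topic: parts.slice(2).join("/") };
}

const shared = buildSharedTopic("StreamServiceGroup", "devices/+/+/event");
console.log(shared); // $share/StreamServiceGroup/devices/+/+/event
console.log(parseSharedTopic(shared));
```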
Sync topics
These topics allow users to monitor a deployment. They give the deployment status on edges and sync at runtime. The payload history can be viewed in Notification history under the username on the top right of the page.
Name | Topic | Description | Payload |
---|---|---|---|
Sync Status | | Sync is successfully deployed, and the status is returned | See Sync status |
Sync Error | | This error happens during a sync (e.g., the device failed to deploy) | See Sync error |
Sync Generic Error | | This error happens before a sync takes place (e.g., edge not found) | See Sync generic error |
Sync status
{
  "asset_class": <assetClass>,
  "asset_id": <assetId>,
  "edge": <edge>,
  "status": <syncStatus>
}
Sync error
{
  "asset_class": <assetClass>,
  "asset_id": <assetId>,
  "edge": <edge>,
  "originator": <originator>,
  "destination": <destination>,
  "message": <errorMessage>,
  "event": <crudEvent>,
  "timestamp": <timestamp>
}
Sync generic error
{
  "table": <table>,
  "error": <errMsg>
}
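A subscriber that receives all three payload shapes may need to tell them apart. One possible approach, sketched below, is to classify a parsed payload by the keys it contains. The function name `classifySyncPayload`, the returned labels, and the sample values are hypothetical. Only the key names come from the payload formats above.

```javascript
// Classifies a parsed sync payload by its distinguishing keys.
function classifySyncPayload(payload) {
  const has = function (key) {
    return Object.prototype.hasOwnProperty.call(payload, key);
  };
  if (has("table") && has("error")) return "sync-generic-error";
  if (has("originator") && has("message")) return "sync-error";
  if (has("asset_class") && has("status")) return "sync-status";
  return "unknown";
}

// Sample values are made up for illustration.
const samplePayloadKind = classifySyncPayload({
  asset_class: "device",
  asset_id: "temperature-sensor",
  edge: "edge-1",
  status: "ok",
});
console.log(samplePayloadKind); // sync-status
```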
FAQ
1. Why can’t I see messages from a topic with a new shared group?
If you have previously subscribed to a topic with one shared group, you cannot see the messages for the same topic with a different shared group. See Shared subscriptions.
2. How can I do load balancing with trigger topics?
Use the topic path $share/<TriggerGroup>/$trigger/#.