Messaging with the native mqtt library

Introduction

The ClearBlade Platform supports MQTT, a lightweight IoT messaging protocol. ClearBlade has built an OAuth 2 security model into the MQTT broker to provide out-of-the-box messaging security.

Click here to learn about sending messages.

Purpose

Lightweight messaging is key to moving data around an IoT system at scale.

Workflows

Workflow 1: When a message is published, it is processed by code services, which publish the output.

Workflow 2: A code service is invoked, performs a computation, and publishes the output.

MQTT

The ClearBlade Platform implements the full specification of MQTT 3.1.1 to allow for multiple quality of service (QoS) settings that best fit a developer's use case.

QoS

| Quality of service | Definition | Description |
|---|---|---|
| 0 | At most once | The message is only sent once whether it makes it to the subscriber or not. |
| 1 | At least once | The message will be sent one or more times to the subscriber until it is received. |
| 2 | Exactly once | The message will be received by the subscriber exactly once. |

MQTT ports and their use

| Port | TLS | Information | Requirements |
|---|---|---|---|
| 1883 | False | MQTT Pub/Sub | user-token |
| 1884 | True | MQTT Pub/Sub | user-token |
| 8903 | False | WebSockets, usually used by browsers/applications (cannot communicate directly over MQTT) | dev-token |
| 8904 | True | WebSockets | dev-token |
| 8905 | False | Auth over MQTT | SystemKey, Secret, Username, Password, ClientId |
| 8906 | True | Auth over MQTT | |
| 8907 | False | Auth over MQTT via WebSockets | TODO |
| 8908 | True | Auth over MQTT via WebSockets | TODO |
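
The port table above can be captured as a small lookup. This is only a sketch: the function name `selectPort` and the `purpose`/`transport` labels are hypothetical and not part of any ClearBlade API, and treating the plain "WebSockets" rows as pub/sub over WebSockets is an assumption.

```javascript
// Port numbers copied from the table above. The keys are hypothetical labels,
// and mapping the "WebSockets" rows to pub/sub is an assumption.
const MQTT_PORTS = {
  "pubsub:tcp": { plain: 1883, tls: 1884 },
  "pubsub:websocket": { plain: 8903, tls: 8904 },
  "auth:tcp": { plain: 8905, tls: 8906 },
  "auth:websocket": { plain: 8907, tls: 8908 },
};

// Returns the port for a purpose ("pubsub" or "auth"), a transport
// ("tcp" or "websocket"), and whether TLS is wanted.
function selectPort(purpose, transport, useTls) {
  const entry = MQTT_PORTS[purpose + ":" + transport];
  if (!entry) {
    throw new Error("Unknown purpose/transport: " + purpose + "/" + transport);
  }
  return useTls ? entry.tls : entry.plain;
}

console.log(selectPort("pubsub", "tcp", true));
console.log(selectPort("auth", "tcp", false));
```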

Working with MQTT

Working with MQTT involves these steps:

1. Prerequisites

2. Broker connection

3. MQTT authentication

Prerequisites

You must have a user or device with valid permissions to publish messages.

To set this up:

1. Log in or create a developer account on a ClearBlade Platform instance.

2. View or create a system.

3. View or create a user.

4. View or assign a role to the user.

5. View the Roles page.

6. Add a topic to the message topics list enabled for that role (for example, mytopic).

Broker connection

The MQTT protocol allows the connect action to provide a username and password. ClearBlade repurposes those fields to accommodate its OAuth-style token model.

| Key | Value | Example |
|---|---|---|
| URL | URL_OF_BROKER | https://platform.clearblade.com/ |
| PORT | PORT | 1883 |
| Username | USER_TOKEN | abcdefabcdef01234567890 |
| Password | SYSTEM_KEY | f0cbf0cbf0cbf0cbf0cbf0cbf0cb |
| ClientID | UNIQUE_CLIENT_ID | sjdbfkasdbf |
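
As a minimal sketch of the field mapping above, the helper below assembles the connect fields into a plain object. The function name `buildBrokerConnectOptions` and the option names are hypothetical; adapt them to whatever MQTT client library you use. The values are the examples from the table.

```javascript
// Builds the MQTT connect fields described in the table above.
// The function and property names are illustrative, not a ClearBlade API.
function buildBrokerConnectOptions(params) {
  const required = ["host", "port", "userToken", "systemKey", "clientId"];
  required.forEach(function (key) {
    if (!params[key]) {
      throw new Error("Missing required field: " + key);
    }
  });
  return {
    host: params.host,
    port: params.port,
    username: params.userToken, // the user token goes in the username field
    password: params.systemKey, // the system key goes in the password field
    clientId: params.clientId,  // should be unique per client
  };
}

const brokerOptions = buildBrokerConnectOptions({
  host: "platform.clearblade.com",
  port: 1883,
  userToken: "abcdefabcdef01234567890",
  systemKey: "f0cbf0cbf0cbf0cbf0cbf0cbf0cb",
  clientId: "sjdbfkasdbf",
});
console.log(brokerOptions);
```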

Duplicate client ID behavior

If two clients connect to a broker with the same client ID, the broker behavior can be configured to be one of the following:

1. The new client can connect, and the existing client is kicked off. Depending on the client firmware, this could result in the two clients alternately connecting and disconnecting repeatedly.

2. The existing client connection is maintained, and the new client is rejected.

The chosen configuration is set when an environment is deployed and applies to all systems (brokers) in the given environment. Configuration #1 conforms to the MQTT specification.

MQTT authentication

A ClearBlade user token is required to communicate with a broker. A token can be obtained via a REST endpoint call or MQTT.

Connect

| Required keys | Description | Example values |
|---|---|---|
| URL | <PLATFORM_IP> | https://platform.clearblade.com/ |
| Port | <PORT_NUMBER> | 8905 |
| Username | <SYSTEM_KEY> | bacb8fb60bb4d7c2c2c0e4bb9701 |
| Password | <SYSTEM_SECRET> | BACB8FB60BFDDB7DB97D7A8BF01 |
| ClientId | <USER_EMAIL>:<PASSWORD> for User, <DEVICE_NAME>:<ACTIVE_KEY> for devices | cbman@clearblade.com:cl34r8l4d3 or temperature-sensor:faqb8fb60bc2c2b1c0e4bb9701 |
| TLS (optional) | <ALLOW_TLS> | true |
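
The sketch below shows one way to assemble the auth connect fields from the table above, including the colon-joined ClientId. The helper names are hypothetical, and the choice of port 8905 or 8906 follows the "Auth over MQTT" rows of the ports table earlier on this page.

```javascript
// Ports for "Auth over MQTT", taken from the ports table earlier on this page.
const AUTH_PORT_PLAIN = 8905;
const AUTH_PORT_TLS = 8906;

// Joins <USER_EMAIL>:<PASSWORD> (users) or <DEVICE_NAME>:<ACTIVE_KEY> (devices).
function buildAuthClientId(name, secret) {
  if (!name || !secret) {
    throw new Error("Both parts of the ClientId are required");
  }
  return name + ":" + secret;
}

// Hypothetical helper: returns the connect fields as a plain object.
function buildAuthConnectOptions(params) {
  return {
    host: params.host,
    port: params.useTls ? AUTH_PORT_TLS : AUTH_PORT_PLAIN,
    username: params.systemKey,    // system key goes in the username field
    password: params.systemSecret, // system secret goes in the password field
    clientId: buildAuthClientId(params.name, params.secret),
  };
}

const authOptions = buildAuthConnectOptions({
  host: "platform.clearblade.com",
  useTls: false,
  systemKey: "bacb8fb60bb4d7c2c2c0e4bb9701",
  systemSecret: "BACB8FB60BFDDB7DB97D7A8BF01",
  name: "temperature-sensor",
  secret: "faqb8fb60bc2c2b1c0e4bb9701",
});
console.log(authOptions.clientId);
```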

Subscribe

  • To receive the new JWT token, subscribe to the auth topic (recommended).

  • To receive the legacy token, subscribe to the v/1/auth topic.

Extract token

  • ClearBlade publishes the user token on the auth message topic with bit-level encoding.

  • The payload is a sequence of length-prefixed blocks: each length block is followed by its data block.

  • The length block is a 2-byte (16-bit) unsigned integer.

  • The data block is UTF-8 encoded bytes.

Block mapping in the packet structure:

| Block-Num | Description |
|---|---|
| 1 | Token length |
| 2 | Token |
| 3 | User ID or device name length |
| 4 | User ID or device name |
| 5 | Messaging URL length |
| 6 | Messaging URL |

Once the client is authenticated, the token can be extracted, and the connection can be established.
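
The six-block layout can be decoded with a short length-prefixed reader. The following is a sketch under stated assumptions: the byte order of the 2-byte length is not given here, so big-endian (network order) is assumed; the payload is assumed to arrive as a `Uint8Array`; and all function names and sample values are hypothetical. `encodeAuthPayload` exists only to build a sample payload for the demonstration.

```javascript
// ASSUMPTION: each 2-byte length is big-endian (network byte order).
// Verify this against a real payload before relying on it.
const LITTLE_ENDIAN = false; // DataView flag; false selects big-endian

// Reads one length-prefixed UTF-8 block that starts at `offset`.
function readBlock(bytes, offset) {
  const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
  if (offset + 2 > bytes.length) {
    throw new Error("Truncated length block at offset " + offset);
  }
  const length = view.getUint16(offset, LITTLE_ENDIAN);
  const start = offset + 2;
  const end = start + length;
  if (end > bytes.length) {
    throw new Error("Truncated data block at offset " + start);
  }
  const text = new TextDecoder("utf-8").decode(bytes.subarray(start, end));
  return { text: text, next: end };
}

// Decodes blocks 1-6: token, user ID or device name, messaging URL.
function parseAuthPayload(bytes) {
  const token = readBlock(bytes, 0);
  const id = readBlock(bytes, token.next);
  const url = readBlock(bytes, id.next);
  return {
    token: token.text,
    userIdOrDeviceName: id.text,
    messagingUrl: url.text,
  };
}

// Demo-only helper: builds a payload in the same layout.
function encodeAuthPayload(fields) {
  const encoder = new TextEncoder();
  const chunks = fields.map(function (field) {
    const data = encoder.encode(field);
    const chunk = new Uint8Array(2 + data.length);
    new DataView(chunk.buffer).setUint16(0, data.length, LITTLE_ENDIAN);
    chunk.set(data, 2);
    return chunk;
  });
  const total = chunks.reduce(function (sum, c) { return sum + c.length; }, 0);
  const out = new Uint8Array(total);
  let offset = 0;
  chunks.forEach(function (c) {
    out.set(c, offset);
    offset += c.length;
  });
  return out;
}

// Sample values are made up for illustration.
const samplePayload = encodeAuthPayload([
  "example-token",
  "temperature-sensor",
  "example.invalid:1883",
]);
const parsedAuth = parseAuthPayload(samplePayload);
console.log(parsedAuth);
```

The reader throws on truncated input rather than returning partial fields, so a malformed payload is never mistaken for a valid token.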

Error codes

The following connect return codes are defined in the MQTT 3.1.1 specification:

| Value | Return code response | Description |
|---|---|---|
| 0 | 0x00 Connection Accepted | Connection accepted |
| 1 | 0x01 Connection Refused, unacceptable protocol version | The server does not support the MQTT protocol level requested by the client |
| 2 | 0x02 Connection Refused, identifier rejected | The client identifier is correct UTF-8 but not allowed by the server. A duplicate client ID can cause this |
| 3 | 0x03 Connection Refused, Server unavailable | The network connection has been made, but the MQTT service is unavailable |
| 4 | 0x04 Connection Refused, bad user name or password | The data in the user name or password is malformed |
| 5 | 0x05 Connection Refused, not authorized | The client is not authorized to connect. For example, entering the incorrect password |
| 6-255 | Reserved for future use | |
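
When logging connection failures, it can help to translate the numeric return code into the text from the table above. A minimal sketch, with the hypothetical names `CONNECT_RETURN_CODES` and `describeReturnCode`:

```javascript
// Return code descriptions copied from the table above.
const CONNECT_RETURN_CODES = {
  0: "Connection accepted",
  1: "Connection refused, unacceptable protocol version",
  2: "Connection refused, identifier rejected",
  3: "Connection refused, server unavailable",
  4: "Connection refused, bad user name or password",
  5: "Connection refused, not authorized",
};

// Maps a connect return code (0-255) to a readable description.
function describeReturnCode(code) {
  if (!Number.isInteger(code) || code < 0 || code > 255) {
    throw new RangeError("Return code must be an integer from 0 to 255");
  }
  if (Object.prototype.hasOwnProperty.call(CONNECT_RETURN_CODES, code)) {
    return CONNECT_RETURN_CODES[code];
  }
  return "Reserved for future use";
}

console.log(describeReturnCode(5));
```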

Message topics

Topics are used to separate messages and keep them organized based on what they are about.

There are three topic types:

Regular topics

Topics comprised of string literals: /devices

Wildcard topics

A wildcard subscription subscribes a client to multiple topics simultaneously. There are two types:

Single level (+)

The single-level wildcard (+) matches exactly one entire level of the topic path.

CODE
/device/+/temp

| Topic | Matches |
|---|---|
| /device/a/temp | Yes |
| /device/b/temp | Yes |
| /device/123/temp | Yes |
| /device/temp | No |
| /device/temp/a | No |

Multi-level (#)

The multi-level wildcard (#) matches every level from its position onward.

The following subscribes to every topic:

CODE
#

See this example:

CODE
/device/#

| Topic | Matches |
|---|---|
| /device/1/2/3 | Yes |
| /device/a | Yes |
| /abc | No |
| /abc/device/ | No |

A subscription topic like /Foo/#/Bar is invalid. The # wildcard must be the last element in a subscription.
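
The wildcard rules can be checked with a small matcher. This is an illustrative sketch and not the broker's implementation: the function names are hypothetical, and special handling of topics that begin with `$` is deliberately left out.

```javascript
// A filter is valid when "+" and "#" each occupy a whole level and
// "#" appears only as the last level.
function isValidFilter(filter) {
  const levels = filter.split("/");
  return levels.every(function (level, index) {
    if (level === "#") {
      return index === levels.length - 1;
    }
    if (level === "+") {
      return true;
    }
    return level.indexOf("#") === -1 && level.indexOf("+") === -1;
  });
}

// Returns true when `topic` is matched by the subscription `filter`.
function topicMatches(filter, topic) {
  if (!isValidFilter(filter)) {
    throw new Error("Invalid topic filter: " + filter);
  }
  const filterLevels = filter.split("/");
  const topicLevels = topic.split("/");
  for (let i = 0; i < filterLevels.length; i++) {
    if (filterLevels[i] === "#") {
      return true; // "#" matches everything from here on
    }
    if (i >= topicLevels.length) {
      return false; // the topic has fewer levels than the filter
    }
    if (filterLevels[i] !== "+" && filterLevels[i] !== topicLevels[i]) {
      return false; // literal levels must match exactly
    }
  }
  return filterLevels.length === topicLevels.length;
}

console.log(topicMatches("/device/+/temp", "/device/a/temp"));
console.log(topicMatches("/device/#", "/abc"));
```

Validating the filter first means a pattern such as /Foo/#/Bar fails loudly instead of silently matching nothing.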

Message Relay

The ClearBlade Message Relay allows edge MQTT clients to communicate with platform MQTT clients and with other edge MQTT clients. To achieve this, a few MQTT topic paths are reserved. The Message Relay uses the following six topic paths to route messages between edges and their parent platform. Message Relay applies to communication between edges and platforms within a single system.

| Message type | Topic path |
|---|---|
| Edge to Platform | <TOPIC_NAME>/_platform |
| Platform to Edge | <TOPIC_NAME>/_edge/<NAME_OF_EDGE> |
| Edge to Edge | <TOPIC_NAME>/_edge/<NAME_OF_EDGE> |
| Platform to All Broadcast | <TOPIC_NAME>/_broadcast |
| Edge to All Broadcast | <TOPIC_NAME>/_broadcast |
| Edge to Edge and Platform | <TOPIC_NAME>/_edgeAndPlatform/<NAME_OF_EDGE> |

These topic paths are for sending messages. To receive messages, subscribe to the same topics, except for Edge to Edge and Platform, where you subscribe to the same topics used for platform and edge.
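
A helper can keep the reserved suffixes in one place when building relay topics for publishing. This is a sketch only: `relayTopic` and its `route` labels are hypothetical names, and the suffixes are the ones listed in the topic path table above.

```javascript
// Builds a Message Relay publish topic from a base topic and a route.
// Routes: "platform", "edge", "broadcast", "edgeAndPlatform".
function relayTopic(baseTopic, route, edgeName) {
  const needsEdge = route === "edge" || route === "edgeAndPlatform";
  if (needsEdge && !edgeName) {
    throw new Error("Route '" + route + "' requires an edge name");
  }
  switch (route) {
    case "platform":
      return baseTopic + "/_platform";
    case "edge":
      return baseTopic + "/_edge/" + edgeName;
    case "broadcast":
      return baseTopic + "/_broadcast";
    case "edgeAndPlatform":
      return baseTopic + "/_edgeAndPlatform/" + edgeName;
    default:
      throw new Error("Unknown relay route: " + route);
  }
}

console.log(relayTopic("sensors/temp", "platform"));
console.log(relayTopic("sensors/temp", "edge", "edge-1"));
```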

Event topics

Event topics allow for greater scalability when handling events by enabling the platform to send an MQTT message for each event. A system user can subscribe to events by subscribing to a specific topic for an event or using an MQTT wildcard subscription to subscribe to multiple events. This can be done in stream services by using subscribe() and waitForMessage(), as well as with shared subscriptions.

Click here to view the topic paths.

The message payload of each event message consists of the same JSON-encoded object data delivered to an event microservice. Each key in the object corresponds to a key set (by the event) in the req.params input to a microservice. Call JSON.parse(payload) on the message payload before using the message. Below is an example of working with specific event types called triggers. Add the mqtt native library as a dependency of this service.

Example

CODE
function triggerStreamServiceMqtt(req, resp) {
  const deviceCreatedTopic = "$trigger/device/created";
  const deviceConnectedTopic = "$trigger/messaging/device/connected";
  const errorsTopic = "service/error";

  const messaging = new MQTT.Client();

  function publishErrorMessage(originalErrorMessage) {
    // Error handling function
    messaging.publish(errorsTopic, originalErrorMessage).then(function () {
      resp.error(originalErrorMessage);
    }).catch(function (publishError) {
      const overallErrorMessage = originalErrorMessage + "; also failed to publish to 'errors' topic: " + JSON.stringify(publishError);
      resp.error(overallErrorMessage);
    });
  }

  function onMessage(topic, message) {
    const messagePayload = message.payload;
    const topicArray = topic.split("/");
    const action = topicArray[topicArray.length - 1];
    switch(action) {
      case "created":
        // Code to run if device created
        break;
      case "connected":
        // Code to run if device connected
        break;
      default:
        // Code to run for any other actions
        break;
    }
  }

  messaging.subscribe(deviceCreatedTopic, onMessage).catch(function(subscribeDeviceCreatedError) {
    const subscribeDeviceCreatedErrorMessage = "Failed to subscribe to device-created topic: " + JSON.stringify(subscribeDeviceCreatedError);
    publishErrorMessage(subscribeDeviceCreatedErrorMessage);
  });

  messaging.subscribe(deviceConnectedTopic, onMessage).catch(function(subscribeDeviceConnectedError) {
    // Stringify the caught error and report the device-connected message
    const subscribeDeviceConnectedErrorMessage = "Failed to subscribe to device-connected topic: " + JSON.stringify(subscribeDeviceConnectedError);
    publishErrorMessage(subscribeDeviceConnectedErrorMessage);
  });
}

Shared subscriptions

Shared subscriptions allow clients to share the same subscription on the MQTT broker. Clients are placed in the same subscription group and will receive the messages in a queue format. The message load of a single topic is distributed among all clients in a subscription group. Multiple shared groups on one topic are not supported in shared subscriptions.

There are three parts to the topic structure for sharing:

$share/<GroupID>/<Topic>

  • $share keyword: A static shared subscription identifier.

  • GroupID: Used to identify a group.

  • Topic: A standard MQTT topic (including wildcards).

Sample topic: $share/StreamServiceGroup/devices/+/+/event
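
The three-part structure can be composed and split with a pair of small helpers. This is a sketch with hypothetical function names. It assumes the group ID contains no "/" character, since the group occupies exactly one topic level.

```javascript
// Builds "$share/<GroupID>/<Topic>" from its parts.
function buildSharedTopic(groupId, topic) {
  if (!groupId || groupId.indexOf("/") !== -1) {
    throw new Error("GroupID must be non-empty and must not contain '/'");
  }
  if (!topic) {
    throw new Error("Topic must be non-empty");
  }
  return "$share/" + groupId + "/" + topic;
}

// Splits a shared subscription topic into its group and underlying topic.
// Returns null when the input is not a shared subscription topic.
function parseSharedTopic(sharedTopic) {
  const parts = sharedTopic.split("/");
  if (parts.length < 3 || parts[0] !== "$share" || parts[1] === "") {
    return null;
  }
  return { groupId: parts[1], topic: parts.slice(2).join("/") };
}

const shared = buildSharedTopic("StreamServiceGroup", "devices/+/+/event");
console.log(shared);
console.log(parseSharedTopic(shared));
```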

Add the mqtt native library as a dependency of this service.

Example

CODE
function SharedTopic(req, resp) {
  const messaging = new MQTT.Client();

  const topic = "topic/+";
  const sharedTopic = "$share/group_id/" + topic;
  const errorsTopic = "service/error";

  function publishErrorMessage(originalErrorMessage) {
    // Error handling function
    messaging.publish(errorsTopic, originalErrorMessage).then(function () {
      resp.error(originalErrorMessage);
    }).catch(function (publishError) {
      const overallErrorMessage = originalErrorMessage + "; also failed to publish to topic '" + errorsTopic + "': " + JSON.stringify(publishError);
      resp.error(overallErrorMessage);
    });
  }

  function onMessage(topic, message) {
    const messagePayload = message.payload;
    // Message handling code
  }

  messaging.subscribe(sharedTopic, onMessage).catch(function (subscribeError) {
    const subscribeErrorMessage = "Failed to subscribe to topic '" + sharedTopic + "': " + JSON.stringify(subscribeError);
    publishErrorMessage(subscribeErrorMessage);
  });
}

Sync topics

These topics allow users to monitor a deployment. They give the deployment status on edges and sync at runtime. The payload history can be viewed in Notification history under the username on the top right of the page.

| Name | Description | Topic | Payload |
|---|---|---|---|
| Sync Status | Sync is successfully deployed, and the status is returned | $notification/sync/status | See Sync status |
| Sync Error | This error happens during a sync (e.g., the device failed to deploy) | $notification/sync/error | See Sync error |
| Sync Generic Error | This error happens before a sync takes place (e.g., edge not found) | $notification/sync/generic_error | See Sync generic error |

Sync status

CODE
{
    "asset_class": <assetClass>,
    "asset_id":    <assetId>,
    "edge":        <edge>,
    "status":      <syncStatus>
}

Sync error

CODE
{
    "asset_class": <assetClass>,
    "asset_id":    <assetId>,
    "edge":        <edge>,
    "originator":  <originator>,
    "destination": <destination>,
    "message":     <errorMessage>,
    "event":       <crudEvent>,
    "timestamp":   <timestamp>
}

Sync generic error

CODE
{
    "table": <table>,
    "error": <errMsg>
}

FAQ

1. Why can’t I see messages from a topic with a new shared group?

If you have previously subscribed to a topic with one shared group, you cannot see the messages for the same topic with a different shared group. See Shared subscriptions.

2. How can I do load balancing with trigger topics?

Subscribe using the topic path $share/<TriggerGroup>/$trigger/#.
