Skip to main content
Skip table of contents

Using gateways with the MQTT bridge

This page explains how gateways can use the MQTT bridge to communicate with ClearBlade IoT Core and publish telemetry events on behalf of bound devices. Before you begin, read Using the MQTT bridge for information on using the MQTT bridge with ClearBlade IoT Core.

Using gateways with the MQTT bridge

1. After you've created and configured the gateway, connect it to ClearBlade IoT Core over the MQTT bridge.

2. Create devices if you haven't already.

3. Optional: Bind the devices to the gateway. When using the MQTT bridge, you only need to bind the devices if they can't generate their JWTs.

4. Optional: Subscribe to the system error topic to get feedback on whether device operations are successful.

5. Attach the devices to the gateway.

6. Use the gateway to relay telemetry, device state, and configuration messages on its devices' behalf.

Gateway messages

After the gateway connects to ClearBlade IoT Core over the MQTT bridge, it can send or receive three message types:

  • Control messages: Attaches a device to the gateway or detaches a device from the gateway. These messages are sent between the gateway and ClearBlade IoT Core. ClearBlade IoT Core accepts control messages only from gateways; if another device type attempts to send a control message, ClearBlade IoT Core closes the connection.

  • Messages from gateways and devices: Can be relayed by the gateway on a device’s behalf or sent directly from the gateway.

  • System error messages: When the gateway is subscribed to the MQTT system error topic on the device’s behalf, ClearBlade IoT Core sends error messages whenever the device encounters an error.

Attaching devices to a gateway

To enable the gateway to proxy device communications with ClearBlade IoT Core, have the gateway publish a QoS 1 /devices/{device_ID_to_attach}/attach control message over the MQTT bridge.

If you configured the gateway to authenticate devices using the devices' JWTs, the attached message’s payload must include the token in JSON format: { "authorization" : "{JWT_token}" }. Otherwise, ClearBlade IoT Core authenticates the device by checking its gateway’s association.

Success response

After the device is authorized, ClearBlade IoT Core sends a PUBACK message to the gateway in response to the attached message. After the gateway receives the PUBACK message, it can publish and subscribe to ClearBlade IoT Core topics on the device’s behalf, such as telemetry or configuration messages.

If a device is already attached when the gateway sends the attached message, ClearBlade IoT Core responds with a PUBACK message.

Detaching devices from the gateway

To detach a device from the gateway, have the gateway publish a QoS 1 /devices/{device_ID}/detach control message over the MQTT bridge. If the device isn't attached when the message is sent, ClearBlade IoT Core ignores the detach control message and sends a PUBACK message.

Troubleshooting

To be notified when a device encounters an error, subscribe the gateway to the MQTT /devices/{gateway_ID}/errors topic using QoS level 0.

These code samples illustrate how to subscribe the gateway to the MQTT/devices/{gateway_ID}/errors topic using QoS level 0:

Node.js

CODE
// const deviceId = `myDevice`;
// const gatewayId = `mygateway`;
// const registryId = `myRegistry`;
// const projectId = `my-project-123`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
/**
 * @see https://clearblade.atlassian.net/wiki/spaces/IC/pages/2210299905/Retargeting+Devices#Production-URL-%2F-URLs for a full URL list
 */
// const mqttBridgeHostname = `us-central1-mqtt.clearblade.com`;
// const mqttBridgePort = 8883;
// const clientDuration = 60000;

const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${gatewayId}`;
console.log(mqttClientId);
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: "unused",
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: "mqtts",
  qos: 1,
  secureProtocol: "TLSv1_2_method",
  ca: [readFileSync(serverCertFile)],
};

// Create a client and connect to the ClearBlade MQTT bridge.
const client = mqtt.connect(connectionArgs);

client.on("connect", (success) => {
  if (!success) {
    console.log("Client not connected...");
  } else {
    setTimeout(() => {
      // Subscribe to gateway error topic.
      client.subscribe(`/devices/${gatewayId}/errors`, { qos: 0 });

      attachDevice(deviceId, client);

      setTimeout(() => {
        console.log("Closing connection to MQTT. Goodbye!");
        client.end(true);
      }, clientDuration); // Safely detach the device and close the connection.
    }, 5000);
  }
});

client.on("close", () => {
  console.log("Connection closed");
  shouldBackoff = true;
});

client.on("error", (err) => {
  console.log("error", err);
});

client.on("message", (topic, message) => {
  const decodedMessage = Buffer.from(message, "base64").toString("ascii");

  console.log(`message received on error topic ${topic}: ${decodedMessage}`);
});

client.on("packetsend", () => {
  // Logging packet send is very verbose
});

Python

CODE
import time
import datetime
import ssl
import jwt
import random
import paho.mqtt.client as mqtt

# The initial backoff time after a disconnection occurs in seconds.
minimum_backoff_time = 1

# The maximum backoff time before giving up, in seconds.
MAXIMUM_BACKOFF_TIME = 32

# Whether to wait with exponential backoff before publishing.
should_backoff = False

def create_jwt(project_id, private_key_file, algorithm):
  """Creates a JWT (https://jwt.io) to establish an MQTT connection.
  Args:
    project_id: The cloud project ID this device belongs to
    private_key_file: A path to a file containing an RSA256 or
            ES256 private key.
    algorithm: The encryption algorithm to use (RS256 or ES256)
  Returns:
      A JWT generated from the given project_id and private key, which
      expires in 20 minutes. After 20 minutes, your client will be
      disconnected, and a new JWT will have to be generated.
  Raises:
      ValueError: If the private_key_file does not contain a known key.
  """

  token = {
    # The time that the token was issued at
    "iat": datetime.datetime.now(tz=datetime.timezone.utc),
    # The time the token expires.
    "exp": datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(minutes=20),
    # The audience field should always be set to the GCP project id.
    "aud": project_id,
  }

  # Read the private key file.
  with open(private_key_file, "r") as f:
    private_key = f.read()

  print(
    "Creating JWT using {} from private key file {}".format(
      algorithm, private_key_file
    )
  )

  return jwt.encode(token, private_key, algorithm=algorithm)

def error_str(rc):
  """Convert a Paho error to a human-readable string."""
  return "{}: {}".format(rc, mqtt.error_string(rc))


def on_connect(unused_client, unused_userdata, unused_flags, rc):
  """Callback for when a device connects."""
  print("on_connect", mqtt.connack_string(rc))

  # After a successful connect, reset backoff time and stop backing off.
  global should_backoff
  global minimum_backoff_time
  should_backoff = False
  minimum_backoff_time = 1


def on_disconnect(unused_client, unused_userdata, rc):
  """Paho callback for when a device disconnects."""
  print("on_disconnect", error_str(rc))

  # Since a disconnect occurred, the next loop iteration will wait with
  # exponential backoff.
  global should_backoff
  should_backoff = True


def on_publish(unused_client, unused_userdata, unused_mid):
  """Paho callback when a message is sent to the broker."""
  print("on_publish")


def on_message(unused_client, unused_userdata, message):
  """Callback when the device receives a message on a subscription."""
  payload = str(message.payload.decode("utf-8"))
  print(
    "Received message '{}' on topic '{}' with Qos {}".format(
      payload, message.topic, str(message.qos)
    )
  )


def attach_device(client, device_id, auth):
    """Attach the device to the gateway."""
    # [START iot_attach_device]
    attach_topic = "/devices/{}/attach".format(device_id)
    attach_payload = '{{"authorization" : "{}"}}'.format(auth)
    client.publish(attach_topic, attach_payload, qos=1)
    # [END iot_attach_device]


def detach_device(client, device_id):
    """Detach the device from the gateway."""
    # [START iot_detach_device]
    detach_topic = "/devices/{}/detach".format(device_id)
    print("Detaching: {}".format(detach_topic))
    client.publish(detach_topic, "{}", qos=1)
    # [END iot_detach_device]


def get_client(
  project_id,
  cloud_region,
  registry_id,
  device_id,
  private_key_file,
  algorithm,
  ca_certs,
  mqtt_bridge_hostname,
  mqtt_bridge_port,
):
  """Create our MQTT client. The client_id is a unique string that identifies
  this device. For Google Cloud IoT Core, it must be in the format below."""
  client_id = "projects/{}/locations/{}/registries/{}/devices/{}".format(
    project_id, cloud_region, registry_id, device_id
  )
  print("Device client_id is '{}'".format(client_id))

  client = mqtt.Client(client_id=client_id)

  # With Google Cloud IoT Core, the username field is ignored, and the
  # password field is used to transmit a JWT to authorize the device.
  client.username_pw_set(
    username="unused", password=create_jwt(project_id, private_key_file, algorithm)
  )

  # Enable SSL/TLS support.
  client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

  # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
  # describes additional callbacks that Paho supports. In this example, the
  # callbacks just print to standard out.
  client.on_connect = on_connect
  client.on_publish = on_publish
  client.on_disconnect = on_disconnect
  client.on_message = on_message

  # Connect to the Google MQTT bridge.
  client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

  # This is the topic on which the device will receive configuration updates.
  mqtt_config_topic = "/devices/{}/config".format(device_id)

  # Subscribe to the config topic.
  client.subscribe(mqtt_config_topic, qos=1)

  # The topic that the device will receive commands on.
  mqtt_command_topic = "/devices/{}/commands/#".format(device_id)

  # Subscribe to the commands topic; QoS 1 enables message acknowledgment.
  print("Subscribing to {}".format(mqtt_command_topic))
  client.subscribe(mqtt_command_topic, qos=0)

  return client


def listen_for_messages(
  service_account_json,
  project_id,
  cloud_region,
  registry_id,
  device_id,
  gateway_id,
  num_messages,
  private_key_file,
  algorithm,
  ca_certs,
  mqtt_bridge_hostname,
  mqtt_bridge_port,
  jwt_expires_minutes,
  duration,
  cb=None,
):
  """Listens for messages sent to the gateway and bound devices."""
  # [START iot_listen_for_messages]
  global minimum_backoff_time

  jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
  jwt_exp_mins = jwt_expires_minutes
  # Use the gateway to connect to the server
  client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
  )

  attach_device(client, device_id, "")
  print("Waiting for device to attach.")
  time.sleep(5)

  # The topic devices receive configuration updates on.
  device_config_topic = "/devices/{}/config".format(device_id)
  client.subscribe(device_config_topic, qos=1)
  
  # The topic gateways receive configuration updates on.
  gateway_config_topic = "/devices/{}/config".format(gateway_id)
  client.subscribe(gateway_config_topic, qos=1)
  
  # The topic gateways receive error updates on. QoS must be 0.
  error_topic = "/devices/{}/errors".format(gateway_id)
  client.subscribe(error_topic, qos=0)
  
  # Wait about a minute for config messages.
  for i in range(1, duration):
    client.loop()
    
    if cb is not None:
      cb(client)

    if should_backoff:
      # If backoff time is too large, give up.
      if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
        print("Exceeded maximum backoff time. Giving up.")
        break

      delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
      time.sleep(delay)
      minimum_backoff_time *= 2
      client.connect(mqtt_bridge_hostname, mqtt_bridge_port)
    
    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
      print("Refreshing token after {}s".format(seconds_since_issue))
      jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
      client = get_client(
        project_id,
        cloud_region,
        registry_id,
        gateway_id,
        private_key_file,
        algorithm,
        ca_certs,
        mqtt_bridge_hostname,
        mqtt_bridge_port,
      )
    
    time.sleep(1)

  detach_device(client, device_id)

  print("Finished.")
  # [END iot_listen_for_messages]



service_account_json = "/path/to/your/credentials.json"
project_id = "YOUR_PROJECT_ID"
cloud_region = "us-central1"
registry_id = "your-registry-id"
gateway_id = "your-gateway-id"
device_id = "your-test-device"

gateway_private_key_file = "/path/to/your/gateway/rsa_private.pem"
algorithm = "RS256"
ca_certs = "DigiCertGlobalRootCA.crt.pem"
mqtt_bridge_hostname = "us-central1-mqtt.clearblade.com"
mqtt_bridge_port = 8883

num_messages = 10
jwt_expires_minutes = 60
duration = 3

listen_for_messages(
  service_account_json,
  project_id,
  cloud_region,
  registry_id,
  device_id,
  gateway_id,
  num_messages,
  gateway_private_key_file,
  algorithm,
  ca_certs,
  mqtt_bridge_hostname,
  mqtt_bridge_port,
  jwt_expires_minutes,
  duration,
  cb=None,
)

ClearBlade IoT Core sends gateway errors on a best-effort basis, delivered over QoS 0. If the gateway isn't subscribed to /devices/{gateway_ID}/errors, ClearBlade IoT Core logs failure events but doesn't send a PUBACK message.

MQTT errors have this structure:

CODE
string error_type;  // An error type's string description.
string device_id;   // The device ID that caused the error.
string description; // An error's description.

If the error message were triggered by an MQTT message, this information would be attached as well:

CODE
string message_type;  // The string MQTT message type.
string topic;  // The MQTT topic, if applicable; otherwise, it is empty.
int packet_id;  // The MQTT message's packet ID, if applicable; otherwise, it is zero.

Error codes and error handling

Error code

Description

Recommended action

GATEWAY_ATTACHMENT_ERROR

A gateway attachment request failed.

Do not retry without fixing the problem.

GATEWAY_DEVICE_NOT_FOUND

The gateway could not find an attached device to handle an incoming message.

Do not retry without fixing the problem.

GATEWAY_INVALID_MQTT_TOPIC

The gateway could not parse the specified MQTT topic, which was ill-formatted or contained an invalid device ID or name.

Do not retry without fixing the problem.

GATEWAY_UNEXPECTED_PACKET_ID

The gateway could not process the message based on its packet ID. For example, a PUBACK may have contained a packet ID, but nothing awaited the response.

Do not retry without fixing the problem.

GATEWAY_UNEXPECTED_MESSAGE_TYPE

The gateway received unexpected messages, such as unsupported PUBREL, PUBREC, etc.

Do not retry without fixing the problem.

GATEWAY_DETACHMENT_DEVICE_ERROR

The gateway detached a device because of a device error.

Do not retry without fixing the problem.

UNKNOWN

The error is unknown.

Retry using exponential backoff.

JavaScript errors detected

Please note, these errors can depend on your browser setup.

If this problem persists, please contact our support.