Skip to main content
Skip table of contents

Streaming MQTT Connectors

ClearBlade’s streaming MQTT Connectors along for setting standard bi-directional mappings between data flowing over ClearBlade Brokered MQTT topics to Third party cloud streaming services and middleware.

This is ideal for sending MQTT messages into your cloud services or integrating data from other services directly with your ClearBlade application.

Currently supporting third party streaming offerings are

  • Google Pub/Sub - supports offline caching up to 10k messages

  • AWS Kinesis

  • Apache Kafka

  • Apache Pulsar

Technical Overview

To create a stream MQTT connect all that is required is to make a REST API call to the mqtt-connector endpoints and the system will immediately began honoring the configured mapping. ClearBlade recommends making this REST API call in a microservice that is triggered to run at IoT Enterprise environment start.

Environment Setup

Generally MQTT Connectors are designed to be started as the IoT Enterprise comes online and begins to allow device connections. When the service runs at start up it will make an HTTP call to the platform The recommended way to implement this is

  1. Create a new Code Service of type micro-service

  2. Include the http library

  3. Click save

Next: Follow the connector setup. Use the sample code below and the appropriate configuration settings for your enterprise streaming binding.

  1. When you are done click “Save and Test” to apply your mapping.

Connector Setup

The triggered micro-service will need to make an API call to the IoT Enterprise platform with the necessary configuration.

API:

Create

URI: /api/v/1/mqtt-connectors/{systemmKey}/name

Method: POST

Headers: 'ClearBlade-UserToken': <userToken>

Body: { "type": <string>, "credentials": <object>, "config": <object> }

Update

URI: /api/v/1/mqtt-connectors/{systemmKey}/name

Method: PUT

Headers: 'ClearBlade-UserToken': <userToken>

Body: { "type": <string>, "credentials": <object>, "config": <object> }

Get

URI: /api/v/1/mqtt-connectors/{systemmKey}/name

Method: PUT

Headers: 'ClearBlade-UserToken': <userToken>

Body:

Get All

URI: /api/v/1/mqtt-connectors/{systemmKey}/

Method: GET

Headers: 'ClearBlade-UserToken': <userToken>

Body:

Delete

URI: /api/v/1/mqtt-connectors/{systemmKey}/name

Method: DELETE

Headers: 'ClearBlade-UserToken': <userToken>

Body:

Sample Code

Ex: A sample micro-service to create a new MQTT Connector Configuration

CODE
function StartMQTTConnector(req, resp) {

  const params = req.params;

  const url = cbmeta.platform_url+"/api/v/1/mqtt-connectors/req.systemKey/{name}";
  const connectorConfig = {
    "type": "pubsub",   // pubsub, kinesis, kafka, pulsar
    "credentials": {},  // specific to each type of connector
    "config": {}        // populate according to the connector
  }

  var options = {
    "uri":cbmeta.platform_url,
    "body": connectorConfig,
    "headers": {
      "Content-Type" : "application/json",
      'ClearBlade-UserToken': req.userToken
    },
  }
  var requestObject = Requests();
  // POST to create the MQTT connector
  requestObject.post(options,function(err,body){
    if(err){
      //the error is a JSON value of the error in question, shaped like {"error":"message"}
      resp.error("Failed to create MQTT Connector:" +err);
    }else{
      //body is JSON of the response
      resp.success("Created MQTT Connector: "+body)
    }
  });
  
}

Connector Configs

Google Pubsub

Credential

CODE
"credentials": { // This is the service account JSON object
        "type": "service_account",
        "project_id": "myprojectid",
        "private_key_id": "blahblahblah",
        "private_key": "<priv_key>",
        "client_email": "a@a.com",
        "client_id": "9126871624161341",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token",
        "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
        "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/dahjkdhajkd%40dajsdgajdadsad.iam.gserviceaccount.com"
    },

Config

CODE
{
  "topic_mappings": [
    {
      "pubsub_topic": "my-topic",
      "outgoing": "foo/+/bar",
      "type": 0,
      "sub_folder": ""
    },
    {
      "pubsub_topic": "my-subscription",
      "incoming": "foo/bar/baz",
      "type": 1,
      "sub_folder": "bob"
    }
  ]
}


For Non IoT Core integrations, type is not used.

The following parameters exist for setting up your Google Pub/Sub forwarders to ClearBlade MQTT:

ack_strategy: 0 = ack immediately, 1 = never ack. For now, 0 should always be used.

poll_rate: Time (ms) between sending message batches to the broker. 0 will send messages immediately (deprecated after 9.36.x - ignored if included)

max_messages_per_poll: Max number of messages to send in a single batch to the broker. Implicit minimum value of 1 (deprecated after 9.36.x - ignored if included)

Messages are received by ClearBlade as soon as possible from Google. ClearBlade will buffer them internally and send them in batches as defined above.

AWS Kinesis

Credentials Object

CODE
{
  "access_key": <string>,
  "secret": <string>,
  "region": <string>
}

Config Object:

CODE
{
  "topic_mappings": {
      <stream_name::string>: {
          "outgoing": <outgoing_mqtt_topic::string>,
          "incoming": <incoming_mqtt_topic::string>
      }
  },
  "partition_key": <string>
}

All fields required.

stream_name is kinesis stream name.

Data published on theoutgoing_mqtt_topic will be sent to the respective kinesis stream. This topic can have wildcards and it can be a shared topic.

Data received from the kinesis stream will be published on the incoming_mqtt_topic. This topic cannot contain wildcards or be a shared topic.

partition_key specifies the type of partition key used when putting records on the kinesis stream. The partition key has three types:

  • clientid: The MQTT client ID of the publisher will be used as the partition key

  • topic: The MQTT topic path will be used as the partition key

  • random: A randomly generated string will be used as the partition key

Apache Pulsar

Credentials object:

CODE
{
  "url": <string>,
  "tls_certificate": <string>,
  "tls_key": <string>,
  "tls_trusted_certificate": <string>
}

url field is required, TLS fields are optional.

Config object:

CODE
{
    "outgoing": [{
        "pulsar_publish_topic": <string>,
        "mqtt_subscribe_topic": <string>,
        "key": <string>,
        "ordering_key": <string>,
        "hashing_scheme": <int>,
        "compression_type": <int>,
        "compression_level": <int>,
        "router_type": <int>
    }],
    "incoming": [{
        "pulsar_subscribe_topic_pattern": <string>,
        "mqtt_publish_topic": <string>,
        "subscription_name": <string>,
        "subscription_type": <int>
    }]
}

Outgoing config options:

pulsar_publish_topic and mqtt_subscribe_topic fields are required.

hashing_scheme: 0 (default) = Java string hash, 1 = Murmur3 hash

compression_type: 0 (default) = no compression, 1 = LZ4, 2 = zlib, 3 = zstd

compression_level: 0 (default) = standard, 1 = faster, 2 = better

router_type: 0 (default) = round robin, 1 = single partition

Incoming config options:

pulsar_subscribe_topic_pattern, mqtt_publish_topic, and subscription_name are required.

subscription_type: 0 (default) = exclusive, 1 = shared, 2 = failover, 3 = key shared

! Note ! exclusive subscriptions will cause errors in clusters, as each cluster node will attempt to establish an exclusive subscription. Either shared or key shared is recommended in clusters.

Apache Kafka

Kafka credentials and config

Credentials object:

CODE
{
  "seed_brokers": [<string>],
  "tls": <bool>
  "auth_mechanism": <string>,
  "auth_params": <object>
},

At least one seed broker url required. auth_mechanism required. If auth_mechanism != none, auth_params is required.

auth_mechanism can be one of: none, plain, oauth, aws, scram256, scram512.

auth_params:

CODE
plain: {
  Zid: <string>, // optional authorization id
  User: <string>,
  Pass: <string>
}
oauth: {
  Zid: <string>, // optional authorization id
  Token: <string>,
  Extensions: <object>
}
aws: {
  AccessKey: <string>,
  SecretKey: <string>,
  SessionToken: <string>, // optional
  UserAgent: <string> // optional
}
scram256/scram512: {
  Zid: <string>, // optional authorization id
  User: <string>,
  Pass: <string>,
  Nonce: <[byte]>, // optional
  IsToken: <bool>, // set if user/pass are from a delegation token
}

Config object:

CODE
{
  "topic_mappings": {
      <string>: {
        "incoming": <string>,
        "outgoing": <string>
      },
      ...
  }
}

The topic_mappings object keys are Kafka topic names. The incoming and outgoing fields are both MQTT topics. You must provide either an incoming or outgoing MQTT topic (or both) for every Kafka topic.

JavaScript errors detected

Please note, these errors can depend on your browser setup.

If this problem persists, please contact our support.