
Stream services using mqtt and clearblade_async libraries

Introduction

A stream service is a code service that can be configured to handle requests continuously. Stream services within the ClearBlade Platform or Edge are most commonly used to listen to MQTT topics and handle MQTT message requests from devices/users.

When to use stream services

  • When users invoke a code service more than once a minute through a timer or trigger event

  • When users expect a real-time response, since stream services reduce the time needed to invoke the code service

  • When users choose to process incoming data in parallel, that is, to distribute the incoming load across multiple instances of the same stream service

Configuration

The configuration parameters that make a code service a stream service are found in the service’s Concurrency Settings in the Settings tab:

  1. Execution timeout set to NEVER.

  2. Auto balance selected. This is optional but recommended. This setting ensures the number of instances of the stream service running is the same on each pod / node in the environment. This setting also ensures that instances are restarted if they stop due to a failure.

  3. Enable Auto Scale. If selected, this allows setting a range for the number of instances of this stream service per pod / node. Selecting this disables the subsequent Concurrency setting.

  4. Concurrency. If Enable Auto Scale is not selected, this must be set to an integer of 1 or more. It defines a static number of instances of the stream service to run per pod / node.

Template for stream services

Stream services are designed to efficiently perform multiple tasks concurrently. This is because a stream service can have multiple instances running at a given time. The instances perform tasks based on incoming MQTT messages. To ensure that an incoming MQTT message is handled by a single instance, Shared Subscriptions are used.

Let’s say publishers publish to topic abc/def and these messages should be handled by instances of a stream service. The stream service instances would then subscribe to $share/shareGroupName/abc/def, where shareGroupName can be any unique name. By subscribing to this shared topic, instances avoid receiving duplicate messages: each message is delivered to only one instance in the share group, so the instances share the message load and work in parallel. See the Shared Subscriptions documentation for more details.
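
The shared topic format described above can be sketched with two small helpers. This is a minimal illustration, not part of the mqtt library: the function names toSharedTopic and parseSharedTopic are hypothetical, and the check on the group name assumes the MQTT 5 rule that a share name must not contain "/", "+", or "#".

```javascript
// Hypothetical helpers (not part of the ClearBlade mqtt library).

// Build a shared subscription topic of the form $share/<groupName>/<topicFilter>.
function toSharedTopic(groupName, topicFilter) {
    // Assumption: MQTT 5 forbids "/", "+" and "#" in the share name.
    if (!groupName || /[\/+#]/.test(groupName)) {
        throw new Error("groupName must be non-empty and must not contain '/', '+' or '#'");
    }
    return "$share/" + groupName + "/" + topicFilter;
}

// Split a shared subscription topic into its group name and topic filter.
// Returns null if the input is not a shared subscription topic.
function parseSharedTopic(sharedTopic) {
    const parts = sharedTopic.split("/");
    if (parts[0] !== "$share" || parts.length < 3) {
        return null;
    }
    return { groupName: parts[1], topicFilter: parts.slice(2).join("/") };
}

const exampleSharedTopic = toSharedTopic("shareGroupName", "abc/def");
console.log(exampleSharedTopic); // $share/shareGroupName/abc/def
console.log(parseSharedTopic(exampleSharedTopic).topicFilter); // abc/def
```

Publishers keep publishing to abc/def unchanged; only the subscribing instances use the $share/ prefix.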

The following template for a stream service uses a shared subscription. Add the native mqtt and clearblade_async libraries as service dependencies.

CODE
function SharedTopicService(req, resp) {

    // This stream service receives messages on topics that match the shared topic pattern.
    // It retrieves the serial number of the widget from the last part of the topic and gets the widget data from the message.
    // It then updates the row in 'widgets_collection' to correspond to the serial number if it is present. Otherwise, it adds a new row.
    const messaging = new MQTT.Client();
    const sharedTopic = "$share/shareGroup1/widgets/+";
    const errorsTopic = "errors";
    const widgetsCollection = ClearBladeAsync.Collection("widgets_collection");
    const widgetSerialNumberColumnName = "serial_number";

    function publishErrorMessage(originalErrorMessage) {
        // Error handling function
        messaging.publish(errorsTopic, originalErrorMessage).then(function () {
            resp.error(originalErrorMessage);
        }).catch(function (publishError) {
            const overallErrorMessage = originalErrorMessage + "; also failed to publish to 'errors' topic: " + JSON.stringify(publishError);
            resp.error(overallErrorMessage);
        });
    }

    // Subscribe to the shared topic; report the failure if the subscription is rejected.
    messaging.subscribe(sharedTopic, onMessage).catch(function (subscribeError) {
        const subscribeErrorMessage = "Failed to subscribe to sharedTopic: " + JSON.stringify(subscribeError);
        publishErrorMessage(subscribeErrorMessage);
    });

    function onMessage(incomingTopic, incomingMessage) {
        // Message handler
        var widgetData;
        try {
            // Get widget serial number from topic and widget data from message.
            // Combine both into one object used to upsert widgets_collection
            const incomingTopicArray = incomingTopic.split("/");
            const incomingSerialNumber = incomingTopicArray[incomingTopicArray.length - 1];
            widgetData = JSON.parse(incomingMessage.payload);
            widgetData[widgetSerialNumberColumnName] = incomingSerialNumber;
        } catch (parseError) {
            // JSON.stringify on an Error object yields "{}", so report its message text instead.
            const parseErrorMessage = "Failed to parse incoming message: " + parseError.message;
            publishErrorMessage(parseErrorMessage);
            return; // Do not attempt the upsert without valid widget data
        }
        // Upsert into widgets_collection: if a row already exists for the incoming serial number, then update that row;
        // otherwise create a new row for that serial number.
        widgetsCollection.upsert(widgetData, widgetSerialNumberColumnName).catch(function (widgetUpsertError) {
            const widgetUpsertErrorMessage = "Failed to upsert widget: " + JSON.stringify(widgetUpsertError);
            // publishErrorMessage also calls resp.error, so no separate resp.error call is needed here.
            publishErrorMessage(widgetUpsertErrorMessage);
        });
    }
}
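
The parsing step of the message handler can be tried outside the platform by pulling it into a pure function. The following is a minimal sketch: the helper name buildWidgetRow, the sample topic widgets/SN-001, and the sample payload are all hypothetical, and the check that the payload is a JSON object is an added assumption that the template itself does not make.

```javascript
// Hypothetical helper mirroring the parsing logic of the template's message handler.
// It takes the topic the message was published to and the raw payload string,
// and returns the row object that would be upserted.
function buildWidgetRow(incomingTopic, payload, serialNumberColumnName) {
    // The serial number is the last segment of the published topic.
    const topicParts = incomingTopic.split("/");
    const serialNumber = topicParts[topicParts.length - 1];

    // JSON.parse throws a SyntaxError on invalid JSON.
    const row = JSON.parse(payload);

    // Assumption: only JSON objects are valid widget data.
    if (row === null || typeof row !== "object" || Array.isArray(row)) {
        throw new Error("payload must be a JSON object");
    }

    row[serialNumberColumnName] = serialNumber;
    return row;
}

const exampleRow = buildWidgetRow("widgets/SN-001", '{"temperature": 21.5}', "serial_number");
console.log(JSON.stringify(exampleRow)); // {"temperature":21.5,"serial_number":"SN-001"}
```

Keeping this logic free of platform calls makes it possible to check topic and payload handling with ordinary unit tests before deploying the stream service.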