Skip to main content
Skip table of contents

Stream services using mqtt and clearblade_async libraries

Introduction

A stream service is a software service that can be configured to handle requests continuously. Stream services within the ClearBlade Platform or Edge are most commonly used to listen to MQTT topics and handle MQTT message requests from devices/users.

When to use stream services?

  • When users invoke a code service more than once a minute through a timer or trigger event

  • When users expect a real-time response since they reduce the time to invoke the code service

  • When users choose to perform parallel processing of incoming data, i.e., distribution of the incoming loads across multiple instances of the same stream service

Configuration

Set execution timeout to never.

Template for stream services

Parallel execution using shared topic

Stream services are very efficient and can be designed to perform parallel processing. In this template, users can have ten stream service instances.

The template code below is written so that every stream service instance acts as a unique client to the shared MQTT group topic. Shared subscriptions allow for uniform load distribution across all the stream service instances. Refer to Shared subscriptions to understand how shared topics work.

Add the native mqtt and clearblade_async libraries as service dependencies.

CODE
function SharedTopicService(req, resp) {

    // This stream service receives messages on topics that match the shared topic pattern.
    // It retrieves the serial number of the widget from the last part of the topic and gets the widget data from the message.
    // It then updates the row in 'widgets_collection' to correspond to the serial number if it is present. Otherwise, it adds a new row.
    const messaging = new MQTT.Client();
    const sharedTopic = "$share/shareGroup1/widgets/+";
    const errorsTopic = "errors"
    const widgetsCollection = ClearBladeAsync.Collection("widgets_collection");
    const widgetSerialNumberColumnName = "serial_number";

    function publishErrorMessage(originalErrorMessage) {
        // Error handling function
        messaging.publish(errorsTopic, originalErrorMessage).then(function () {
            resp.error(originalErrorMessage);
        }).catch(function (publishError) {
            const overallErrorMessage = originalErrorMessage + "; also failed to publish to 'errors' topic: " + JSON.stringify(publishError);
            resp.error(overallErrorMessage);
        })
    }

    messaging.subscribe(sharedTopic, onMessage).catch(function (subscribeError) {
        // Subscribe to shared topic
        const subscribeErrorMessage = "Failed to subscribe to sharedTopic: " + JSON.stringify(subscribeError);
        publishErrorMessage(subscribeErrorMessage);    
    });

    function onMessage(incomingTopic, incomingMessage) {
        // Message handler
        try {
            // Get widget serial number from topic and widget data from message.
            // Combine both into one object used to upsert widgets_collection
            const incomingTopicArray = incomingTopic.split("/");
            const incomingSerialNumber = incomingTopicArray[incomingTopicArray.length - 1];
            var widgetData = JSON.parse(incomingMessage.payload);
            widgetData[widgetSerialNumberColumnName] = incomingSerialNumber;
        } catch (parseError) {
            const parseErrorMessage = "Failed to parse incoming message: " + JSON.stringify(parseError);
            publishErrorMessage(parseErrorMessage);
        };
        widgetsCollection.upsert(widgetData, "serial_number").catch(function(widgetUpsertError) {
            // Upsert into widgets_collection: if a row already exists for the incoming serial number, then update that row;
            // Otherwise create a new row for that serial number including the 
            const widgetUpsertErrorMessage = "Failed to upsert widget: " + JSON.stringify(widgetUpsertError);
            publishErrorMessage(widgetUpsertError);
            resp.error(widgetUpsertError);
        })
    }
}
JavaScript errors detected

Please note, these errors can depend on your browser setup.

If this problem persists, please contact our support.