Stream services using mqtt and clearblade_async libraries
Introduction
A stream service is a code service that can be configured to handle requests continuously. Stream services within the ClearBlade Platform or Edge are most commonly used to listen to MQTT topics and handle MQTT message requests from devices/users.
When to use stream services
When users invoke a code service more than once a minute through a timer or trigger event
When users expect a real-time response, since stream services reduce the time needed to invoke the code service
When users need parallel processing of incoming data, i.e., distribution of the incoming load across multiple instances of the same stream service
Configuration
The configuration parameters that differentiate a stream service from other code services are found in the service’s Concurrency Settings in the Settings tab:

Execution timeout set to NEVER.
Auto balance selected. This is optional but recommended. This setting ensures the number of instances of the stream service running is the same on each pod / node in the environment. This setting also ensures that instances are restarted if they stop due to a failure.
Enable Auto Scale. If selected, this allows setting a range for the number of instances of this stream service per pod / node. Selecting this disables the subsequent Concurrency setting.
Concurrency. If Enable Auto Scale is not selected, this must be set to an integer of 1 or more. It defines a static number of instances of the stream service to run per pod / node.
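The rules above can be summarized as a small validation sketch. The settings object and its field names (executionTimeout, autoBalance, autoScale, minInstances, maxInstances, concurrency) are hypothetical and exist only for illustration; they are not a ClearBlade API or export format, and the real settings are made in the Settings tab. The check that the Auto Scale range has a minimum no greater than its maximum is an assumption.

```javascript
// Hypothetical settings shape for illustration only; not a ClearBlade API.
function validateStreamSettings(settings) {
  const errors = [];
  const warnings = [];

  // A stream service runs continuously, so it must never time out.
  if (settings.executionTimeout !== "NEVER") {
    errors.push("Execution timeout must be set to NEVER");
  }

  // Auto balance is optional but recommended.
  if (!settings.autoBalance) {
    warnings.push("Auto balance is recommended");
  }

  if (settings.autoScale) {
    // Assumption: the range is a pair of integers with min <= max.
    const min = settings.minInstances;
    const max = settings.maxInstances;
    if (!Number.isInteger(min) || !Number.isInteger(max) || min > max) {
      errors.push("Auto Scale needs an integer range with min <= max");
    }
  } else if (!Number.isInteger(settings.concurrency) || settings.concurrency < 1) {
    // Without Auto Scale, Concurrency is a static instance count per pod / node.
    errors.push("Concurrency must be an integer of 1 or more");
  }

  return { errors: errors, warnings: warnings };
}

const exampleCheck = validateStreamSettings({
  executionTimeout: "NEVER",
  autoBalance: true,
  autoScale: false,
  concurrency: 2
});
console.log(exampleCheck.errors.length); // 0
```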
Template for stream services
Stream services are designed to efficiently perform multiple tasks concurrently. This is because a stream service can have multiple instances running at a given time. The instances perform tasks based on incoming MQTT messages. To ensure that an incoming MQTT message is handled by a single instance, Shared Subscriptions are used.
Suppose publishers publish to the topic abc/def and these messages should be handled by instances of a stream service. The stream service instances would then subscribe to $share/shareGroupName/abc/def, where shareGroupName can be any unique name. By subscribing to this shared topic, the instances avoid receiving duplicate messages, share the message load, and work in parallel. See the Shared Subscriptions documentation for more details.
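The structure of a shared topic can be illustrated with a few plain JavaScript helpers that run outside the platform. This is a minimal sketch: the function names toSharedTopic, parseSharedTopic, and topicMatches are hypothetical and are not part of the mqtt library. The matching logic is a simplified version of standard MQTT wildcard matching ("+" for one level, "#" for all remaining levels), and the share name check follows the MQTT 5 rule that a share name must not contain "/", "+", or "#".

```javascript
// Build a shared subscription filter of the form $share/<shareName>/<topicFilter>.
function toSharedTopic(shareName, topicFilter) {
  // MQTT 5 forbids "/", "+", and "#" in the share name.
  if (shareName.length === 0 || /[\/+#]/.test(shareName)) {
    throw new Error("Invalid share name: " + shareName);
  }
  return "$share/" + shareName + "/" + topicFilter;
}

// Recover the share name and the underlying topic filter from a shared topic.
function parseSharedTopic(sharedTopic) {
  const parts = sharedTopic.split("/");
  if (parts[0] !== "$share" || parts.length < 3) {
    return null; // not a shared subscription
  }
  return { shareName: parts[1], topicFilter: parts.slice(2).join("/") };
}

// Simplified matcher: "+" matches exactly one level, "#" matches all remaining levels.
function topicMatches(topicFilter, topic) {
  const filterLevels = topicFilter.split("/");
  const topicLevels = topic.split("/");
  for (let i = 0; i < filterLevels.length; i++) {
    if (filterLevels[i] === "#") return true;
    if (i >= topicLevels.length) return false;
    if (filterLevels[i] !== "+" && filterLevels[i] !== topicLevels[i]) return false;
  }
  return filterLevels.length === topicLevels.length;
}

const shared = toSharedTopic("shareGroupName", "abc/def");
console.log(shared); // $share/shareGroupName/abc/def

const parsed = parseSharedTopic("$share/shareGroup1/widgets/+");
console.log(topicMatches(parsed.topicFilter, "widgets/SN-001")); // true
```

Publishers are unaffected by the share group: they keep publishing to abc/def, and only the subscribers use the $share prefix.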
The following is a template for a stream service that uses a shared subscription. Add the native mqtt and clearblade_async libraries as service dependencies.
function SharedTopicService(req, resp) {
  // This stream service receives messages on topics that match the shared topic pattern.
  // It retrieves the serial number of the widget from the last part of the topic and gets the widget data from the message.
  // It then updates the row in 'widgets_collection' that corresponds to the serial number if it is present. Otherwise, it adds a new row.
  const messaging = new MQTT.Client();
  const sharedTopic = "$share/shareGroup1/widgets/+";
  const errorsTopic = "errors";
  const widgetsCollection = ClearBladeAsync.Collection("widgets_collection");
  const widgetSerialNumberColumnName = "serial_number";

  // Error handling function: publish the error to the errors topic, then end the service with that error.
  function publishErrorMessage(originalErrorMessage) {
    messaging.publish(errorsTopic, originalErrorMessage).then(function () {
      resp.error(originalErrorMessage);
    }).catch(function (publishError) {
      const overallErrorMessage = originalErrorMessage + "; also failed to publish to 'errors' topic: " + JSON.stringify(publishError);
      resp.error(overallErrorMessage);
    });
  }

  // Subscribe to shared topic
  messaging.subscribe(sharedTopic, onMessage).catch(function (subscribeError) {
    const subscribeErrorMessage = "Failed to subscribe to sharedTopic: " + JSON.stringify(subscribeError);
    publishErrorMessage(subscribeErrorMessage);
  });

  // Message handler
  function onMessage(incomingTopic, incomingMessage) {
    var widgetData;
    try {
      // Get widget serial number from topic and widget data from message.
      // Combine both into one object used to upsert widgets_collection
      const incomingTopicArray = incomingTopic.split("/");
      const incomingSerialNumber = incomingTopicArray[incomingTopicArray.length - 1];
      widgetData = JSON.parse(incomingMessage.payload);
      widgetData[widgetSerialNumberColumnName] = incomingSerialNumber;
    } catch (parseError) {
      // JSON.stringify of an Error object yields "{}", so report its message instead
      const parseErrorMessage = "Failed to parse incoming message: " + parseError.message;
      publishErrorMessage(parseErrorMessage);
      return; // Do not attempt the upsert without valid widget data
    }

    // Upsert into widgets_collection: if a row already exists for the incoming serial number, then update that row;
    // otherwise create a new row for that serial number.
    widgetsCollection.upsert(widgetData, widgetSerialNumberColumnName).catch(function (widgetUpsertError) {
      const widgetUpsertErrorMessage = "Failed to upsert widget: " + JSON.stringify(widgetUpsertError);
      // publishErrorMessage also calls resp.error, so it is not called again here
      publishErrorMessage(widgetUpsertErrorMessage);
    });
  }
}
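The topic and payload handling inside onMessage can be tried outside the platform as a pure function. The sketch below assumes, as the template implies, that the handler receives the topic the message was published on (for example widgets/SN-001) rather than the $share filter, and that the payload is a JSON string. The function name buildWidgetRow and the sample values are hypothetical, and the check that the payload decodes to an object is an extra guard that the template does not include.

```javascript
// Hypothetical helper: combine the serial number from the topic with the
// widget data from the payload into one row object suitable for an upsert.
function buildWidgetRow(incomingTopic, payload, serialNumberColumn) {
  const topicLevels = incomingTopic.split("/");
  const serialNumber = topicLevels[topicLevels.length - 1];

  // JSON.parse throws a SyntaxError on malformed JSON.
  const widgetData = JSON.parse(payload);

  // Extra guard (not in the template): only plain objects can take a new column.
  if (widgetData === null || typeof widgetData !== "object" || Array.isArray(widgetData)) {
    throw new Error("Payload must be a JSON object");
  }

  widgetData[serialNumberColumn] = serialNumber;
  return widgetData;
}

const row = buildWidgetRow("widgets/SN-001", '{"temperature": 21.5}', "serial_number");
console.log(JSON.stringify(row)); // {"temperature":21.5,"serial_number":"SN-001"}
```

Keeping this step free of platform calls makes it possible to check topic and payload handling with sample messages before deploying the stream service.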