Skip to main content
Skip table of contents

Stream services

Introduction

A stream service is a software service that can be configured to handle requests continuously. The most common use of stream services within the ClearBlade Platform or Edge is to listen to MQTT topics and handle MQTT message requests from devices/users.

When to use stream services?

  • When users invoke a code service more than once a minute through a timer or trigger event

  • When users expect a real-time response since they reduce the time to invoke the code service

  • When users choose to perform parallel processing of incoming data, i.e., distribution of the incoming loads across multiple instances of the same stream service

Configuration

Set execution timeout to never.

Template for stream services

Parallel execution using shared topic

Stream services are very efficient and can be designed to perform parallel processing. In this template, users can have ten stream service instances.

The template code below is written so that every stream service instance acts as a unique client to the shared MQTT group topic. Shared subscriptions allow for uniform load distribution across all the stream service instances. Refer to Shared subscriptions to understand how shared topics work.

CODE
function SharedTopicService(req, resp) {
  ClearBlade.init({ request: req });
  var messaging = ClearBlade.Messaging();
  
  var sharedTopic = "$share/Group1/device/+";

  var deviceCollection = ClearBlade.Collection({
    collectionName: "some_collection"
  });

  // DEBUG MESSAGE
  messaging.publish("success", "Service Started");
  messaging.subscribe(sharedTopic, function(err, errMsg) {
    if (err) {
      // DEBUG MESSAGE
      messaging.publish("error", "Subscribe failed: " + errMsg);
      resp.error();
    }
    // DEBUG MESSAGE
    messaging.publish("success", "Subscribed to Shared Topic");
    // Once successfully subscribed
    WaitLoop();
  });

  function WaitLoop() {
    // DEBUG MESSAGE
    messaging.publish("success", "Starting the Loop");
    while (true) {
      messaging.waitForMessage([sharedTopic], function(err, msg, topic) {
        if (err) {
          // DEBUG MESSAGE
          messaging.publish("error", "Failed to wait for message: " + err + " " + msg + "  " + topic);
          resp.error("Failed to wait for message: " + err + " " + msg + "    " + topic);
        } else {
          // any action
          addCollectionRow(msg);
        }
      });
    }
  }

  function addCollectionRow(msg) {
    try {
      var parseMsg = JSON.parse(msg);
    } catch (e) {
      // DEBUG MESSAGE
      messaging.publish("error","Problem with parsing: " + e);
      
      resp.error("Problem with parsing: " + e);
    }
    var data = {
      column_1_name: "column_1_data",
      column_2_name: "column_2_data"
    };
    //debugging
    deviceCollection.create(data, function(err, result) {
      if (err) {
        // DEBUG MESSAGE
        messaging.publish("error", "failed to create: " + result);
        resp.error("create failed: " + result);
      } else {
        //no op
      }
    });
  }

}
JavaScript errors detected

Please note, these errors can depend on your browser setup.

If this problem persists, please contact our support.