readmongo_service

App

src: readmongo/swarmlab-app/src/run/

stream2mongo

async function onCollectionNew(err, collection) {
  let options = {
    tailable: true,
    awaitdata: true,
    numberOfRetries: -1,
    tailableRetryInterval: 500,
  };
  var cursor = collection.find({}, options).stream();
  var itemsProcessed = 0;
  var room = this.user;
  var sid = this.id;
  console.log("Inside callback: " + room + " Id: " + sid);
  var rep = setUser(sid, room);

  cursor.on("data", async function (data) {
    cursor.pause();
    var res = await getkey(sid);

    if (res == "1") {
      cursor.resume();
      var obj = JSON.parse(JSON.stringify(data));
      io.in(room).emit("logsend", obj);
    } else if (res == "2") {
      cursor.resume();
      console.log("Cursor is closing...");
      cursor.close();
    }
  });
}
var cursor = collection.find({}, options).stream();

A Tailable Cursor remains open after the client exhausts the results in the initial cursor.

Tailable cursors are conceptually equivalent to the tail Unix command with the -f option (i.e. with "follow" mode).

After clients insert new additional documents into a capped collection, the tailable cursor will continue to retrieve documents.

socket (open,checkstream,event)

const pubClient = new Redis({
  host: REDIS,
  port: REDIS_PORT,
});

  // ------------------------------
  // read from redis
  // ------------------------------
async function getkey(id) {
  return new Promise((resolve) => {
    pubClient.get(id, function (err, reply) {
      if (err) {
        resolve(null);
      } else {
        if (reply) {
          //console.log("---------fount----------");
          resolve(1);
        } else {
          console.log("----------not fount------------");
          resolve(2);
        }
      }
    });
  });
}

  // ------------------------------
  //   check if stream exists
  // ------------------------------
async function checkstream(data) {
  var res = await getkey(data.id);
  if (res == "1") {
    console.log("Stream is on!");
  } else {
    console.log("Creating Stream....");

    var url = URL;
    MongoClient.connect(
      url,
      { useNewUrlParser: true, useUnifiedTopology: true },
      function (err, db) {
        if (err) throw err;
        var dbo = db.db(DATABASE);
        dbo.collection(COLLECTION, onCollectionNew.bind(data));
      }
    );
  }
}

  // ------------------------------
  // --- open socket -------------
  // ------------------------------
io.on("connection", (s) => {
  console.error("socket connection");
  var usersession = new Object();
  usersession.SOCKET = {};
  usersession.SOCKET.error = {};
  console.error("socket ...");
  s.auth = false;

  // ------------------------------
  // --- authenticate
  // ------------------------------
  s.on("authenticate", function (data) {
    const token = data;
    (async () => {
      var isvalid = await checkToken(token);
      if (isvalid.action == "ok") {
        usersession.SOCKET.user = isvalid.user;
        usersession.SOCKET.scope = isvalid.scope; // space delimeter
        usersession.SOCKET.token = isvalid.token;
        usersession.SOCKET.id = s.id;
        s.auth = true;
      } else {
        s.auth = false;
      }
    })();
  });

  // ------------------------------
  // --- event ----------------
  // ------------------------------
  s.on("onevent", function (data) {
    var binddata = {
      user: data,
      id: s.id,
    };
    checkstream(binddata);
  });

}

restart

To make changes become effective a restart is not required

You may have to wait (couple of minutes) for the system to fully provision resources. You may have to refresh the web interface a couple of times!