Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
xicilion committed Dec 11, 2017
0 parents commit 9c2c461
Show file tree
Hide file tree
Showing 5 changed files with 363 additions and 0 deletions.
1 change: 1 addition & 0 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
test
136 changes: 136 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
var ws = require('ws');
var Link = require('./link');

var chs = {};
var idles = new Link();
var idle_limit = 100;
var msg_limit = 100;

function channel(name) {
var conns = new Link();
var msgs = new Link();
var idle_node;
var start_timestamp = new Date();

this.name = name;
this.on = (ws, timestamp) => {
if (idle_node) {
idles.remove(idle_node);
idle_node = undefined;
}

var m = msgs.head();
if (m != undefined) {
if (m.data.timestamp > timestamp)
ws.send(JSON.stringify({
"timestamp": m.data.timestamp
}));

while (m !== undefined && m.data.timestamp < timestamp)
m = m.next;

while (m !== undefined) {
ws.send(m.data.data);
m = m.next;
}
} else {
if (start_timestamp > timestamp)
ws.send(JSON.stringify({
"timestamp": start_timestamp
}));
}

return conns.addTail(ws);
};

this.off = node => {
if (conns.remove(node) === 0) {
idle_node = idles.addTail(this);
if (idles.count() > idle_limit) {
var head = idles.head();
delete chs[head.data.name];
idles.remove(head);
}
}
};

function post(data) {
if (Array.isArray(data))
return data.forEach(d => post(d));

var timestamp = new Date();
var json = JSON.stringify({
timestamp: timestamp,
ch: name,
data: data
});

msgs.addTail({
timestamp: timestamp,
data: json
});

if (msgs.count() > msg_limit)
msgs.remove(msgs.head());

var node = conns.head();
while (node !== undefined) {
node.data.send(json);
node = node.next;
}
};

this.post = post;

this.status = () => {
return conns.toJSON();
}
}

exports.on = (ch, ws, timestamp) => {
if (Array.isArray(ch))
return ch.forEach(c => exports.on(c, ws, timestamp));

var ons = ws._ons;
if (ons === undefined) {
ws._ons = ons = {};
ws.onclose = ev => {
for (var ch in ons)
exports.off(ch, ws);
}
}

var cho = chs[ch];
if (cho === undefined)
chs[ch] = cho = new channel(ch);
ons[ch] = cho.on(ws, timestamp);
};

exports.off = (ch, ws) => {
if (Array.isArray(ch))
return ch.forEach(c => exports.off(c, ws));

var ons = ws._ons;
if (ons !== undefined && ons[ch] !== undefined) {
chs[ch].off(ons[ch]);
delete ons[ch];
}
};

exports.post = (ch, data) => {
var cho = chs[ch];
if (cho !== undefined)
cho.post(data);
};

exports.status = () => {
var r = {};
for (ch in chs)
r[ch] = chs[ch].status();
return r;
};

exports.config = opts => {
idle_limit = opts.idle_limit || idle_limit;
msg_limit = opts.msg_limit || msg_limit;
}
79 changes: 79 additions & 0 deletions lib/link.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
module.exports = function Link() {
var _head, _tail;
var _count = 0;

this.head = () => {
return _head;
}

this.tail = () => {
return _tail;
}

this.count = () => {
return _count;
}

this.addHead = (data, timestamp) => {
var node = {
next: _head,
data: data
};

if (_head)
_head.prev = node;
else
_tail = node;

_head = node;

_count++;

return node;
};

this.addTail = (data, timestamp) => {
var node = {
prev: _tail,
data: data
};

if (_tail)
_tail.next = node;
else
_head = node;

_tail = node;

_count++;

return node;
};

this.remove = node => {
if (_head === node)
_head = node.next;
else
node.prev.next = node.next;

if (_tail === node)
_tail = node.prev;
else
node.next.prev = node.prev;

_count--;

return _count;
};

this.toJSON = () => {
var a = [];

var node = _head;
while (node !== undefined) {
a.push(node.data);
node = node.next;
}
return a;
}
};
8 changes: 8 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"name": "fib-push",
"version": "1.0.0",
"description": "",
"main": "lib/index",
"author": "",
"license": "ISC"
}
139 changes: 139 additions & 0 deletions test/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
const test = require('test');
test.setup();

const push = require('..');

push.config({
idle_limit: 10,
msg_limit: 10
});

describe("push", () => {
it("on/off", () => {
var ws = {
value: 1024
};

var ws1 = {
value: 1025
};

var ws2 = {
value: 1025
};

push.on("aaa", ws);
assert.deepEqual(push.status().aaa, [ws]);

push.on("aaa", ws1);
assert.deepEqual(push.status().aaa, [ws, ws1]);

push.on("aaa", ws2);
assert.deepEqual(push.status().aaa, [ws, ws1, ws2]);

push.off("aaa", ws1);
assert.deepEqual(push.status().aaa, [ws, ws2]);

push.off("aaa", ws2);
assert.deepEqual(push.status().aaa, [ws]);

ws.onclose();
assert.deepEqual(push.status().aaa, []);
});

it("post", () => {
var r = [];
var ws = {
send: m => r.push(m)
};

push.on("aaa", ws);
push.post("aaa", {
a: 100,
b: 200
});

assert.deepEqual(JSON.parse(r[0]).data, {
a: 100,
b: 200
});

push.post("aaa", {
a: 200
});

assert.deepEqual(JSON.parse(r[1]).data, {
a: 200
});
});

it("not post empty channel", () => {
push.post("aaa1", {
a: 100,
b: 200
});

var r = [];
var ws = {
send: m => r.push(m)
};

push.on("aaa1", ws, 0);
assert.equal(r.length, 1);
assert.strictEqual(JSON.parse(r[0]).data, undefined);
assert.property(JSON.parse(r[0]), "timestamp");
});

it("post limit", () => {
var ws = {
send: m => {}
};

push.on("aaa2", ws, 0);
for (var i = 0; i < 100; i++)
push.post("aaa2", {
a: i
});

var r = [];
var ws1 = {
send: m => r.push(m)
};

push.on("aaa2", ws1, 0);

assert.equal(r.length, 11);
assert.strictEqual(JSON.parse(r[0]).data, undefined);
assert.property(JSON.parse(r[0]), "timestamp");

for (var i = 1; i < 11; i++)
assert.strictEqual(JSON.parse(r[i]).data.a, i + 89);
});

it("idle limit", () => {
var ws = {
send: m => {}
};

var chs = Object.keys(push.status());
for (var i = 0; i < 100; i++)
push.on(`idle_${i}`, ws);
ws.onclose();
var chs1 = Object.keys(push.status());

assert.deepEqual(chs1.slice(chs.length), [
"idle_90",
"idle_91",
"idle_92",
"idle_93",
"idle_94",
"idle_95",
"idle_96",
"idle_97",
"idle_98",
"idle_99"
]);
});
});

test.run();

0 comments on commit 9c2c461

Please sign in to comment.