-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathqueue.js
113 lines (102 loc) · 2.86 KB
/
queue.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
module.exports = function queue({
close,
min = 0,
max,
drain,
drainInterval,
skipDrain
}) {
// create a buffer for data
// that have been pushed
// but not yet pulled.
let buffer = [];
// a pushable is a source stream
// (abort, cb) => cb(end, data)
//
// when pushable is pulled,
// keep references to abort and cb
// so we can call back after
// .end(end) or .push(data)
let abort, callback;
let timer;
let ended;
function drainTimerReset() {
if (drain && drainInterval) {
timer && clearTimeout(timer);
timer = setTimeout(notifyDrain, drainInterval);
}
}
function notifyDrain() {
if (ended) return;
drainTimerReset();
if (drain && buffer.length <= min) {
drain(buffer.length);
}
}
function read(_abort, _callback) {
if (_abort) {
abort = _abort;
timer && clearTimeout(timer);
// if there is already a cb waiting, abort it.
if (callback) invokeCallback(abort);
}
callback = _callback;
tryCallback();
}
function end(end) {
ended = ended || end || true;
timer && clearTimeout(timer);
// attempt to drain
tryCallback();
}
function processValue(error, value) {
let skipDrain;
try {
skipDrain = invokeCallback(error, value);
} finally {
if (buffer.length === min && !skipDrain) notifyDrain();
}
}
function push(value) {
if (ended) return;
// if sink already waiting,
// we can call back directly.
if (callback) {
processValue(abort, value);
return;
}
// otherwise push data and
buffer.push(value);
}
return {
push: push,
end: end,
source: read,
length: () => buffer.length,
start: notifyDrain
};
// `tryCallback` calls back to (if any) waiting
// sink with abort, end, or next data.
function tryCallback() {
if (!callback) return;
if (abort) invokeCallback(abort);
else if (!buffer.length && ended) invokeCallback(ended);
else if (buffer.length) processValue(null, buffer.shift());
}
// `callback` calls back to waiting sink,
// and removes references to sink callback.
function invokeCallback(err, val) {
let _callback = callback;
// if error and pushable passed onClose, call it
// the first time this stream ends or errors.
if (err && close) {
let callClose = close;
close = null;
callClose(err === true ? null : err);
}
callback = null;
let result = typeof skipDrain === 'function' && skipDrain(val);
_callback(err, val);
return result;
}
};