forked from panthershark/pipeline
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
119 lines (93 loc) · 3.46 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
var _ = require("lodash"),
util = require("util"),
events = require('events');
/* Example:
var Pipeline = require("pipeline/Pipeline"),
job = Pipeline.create(),
h = require("tests/pipeline/Hello"),
w = require("tests/pipeline/World");
job.use(new h());
job.use(new w());
job.on("end", function(dictionary) {
console.log(dictionary.hello + ' ' + dictionary.world);
process.exit();
});
job.execute();
*/
exports.create = function(name) {
return new Pipeline(name);
};
// A sequential chain of steps that reports progress through events.
// @name (string): Optional friendly name for the pipeline; '' is stored when it is omitted or falsy.
//
// Instance state:
//   name        - the label described above
//   currentStep - index of the next step to run
//   results     - values recorded by execute(), one per call
//   steps       - registered { name, run } entries, in the order they were added
//
// Two internal listeners are installed:
//   'step' (name, action) - runs `action` on the next tick
//   'next' (err, params)  - calls execute(err, params) on the next tick
var Pipeline = function(name) {
    // Run the parent constructor first so EventEmitter sets up its own
    // per-instance state before any listener is attached.
    events.EventEmitter.call(this);

    var that = this;

    this.name = name || '';
    this.currentStep = 0;
    this.results = [];
    this.steps = [];

    // Deferring with nextTick lets the code that emitted 'step' return before the step body runs.
    this.on('step', function(stepName, action) {
        process.nextTick(action);
    });

    // execute is looked up when 'next' fires and is invoked on the following tick,
    // with the pipeline as `this` and exactly the two arguments received here.
    this.on('next', function(err, params) {
        process.nextTick(that.execute.bind(that, err, params));
    });
};
util.inherits(Pipeline, events.EventEmitter);
// Appends a step to the end of the chain.
// @fn (function): The step body. execute() calls it as fn(results, next), where results is the
//   pipeline's results array and next is the pipeline's next(err, params) bound to the pipeline.
// @name (string): Optional friendly name for the step; it is passed along with the 'step' event.
// Returns the pipeline so calls can be chained.
Pipeline.prototype.use = function(fn, name) {
    var entry = { name: name, run: fn };
    this.steps.push(entry);
    return this;
};
// Runs the next step of the pipeline, or finishes it.
// Call it directly to start a run, e.g. pl.execute({ foo: "moo", poo: "doo" }). It is called again,
// through the 'next' event, each time a step invokes its callback.
// @err (Object): A standard convention callback param for reporting errors. It is ignored while no
//   step has run yet (currentStep is 0), because a single argument there is the initial params.
// @params (Object): The initial object state for the pipeline, or the value handed over by a step.
//
// The last argument of every call is appended to the results array, so the initial params are
// available at index zero; null is stored when the call has no arguments.
// When a step reported an error, or no steps remain, 'end' (err, results) is emitted, followed by
// 'error' (err, results) if there was an error. When a step throws or returns a truthy value, that
// value is treated as the error and 'error' is emitted before 'end'.
// Returns the pipeline.
Pipeline.prototype.execute = function(err, params) {
    // Only the last argument is recorded: execute(params) and execute(err, params) both store params.
    var last = arguments.length > 0 ? arguments[arguments.length - 1] : null;
    this.results.push(last);

    // Before any step has run nothing can have failed; `err` then holds the initial params
    // (or nothing), so it must not be reported as an error.
    var failure = this.currentStep > 0 ? err : null;

    // if the err was reported by a step, then abort the pipeline
    // alternately if there are no more steps, then end.
    if (failure || this.currentStep >= this.steps.length) {
        this.stop();
        this.emit('end', failure, this.results);

        if (failure) {
            this.emit('error', failure, this.results);
        }
        return this;
    }

    var that = this,
        step = this.steps[this.currentStep],
        // Captured now, so the step keeps this array and this callback even if the
        // pipeline's properties are replaced before the step actually runs.
        results = this.results,
        next = this.next.bind(this),
        action = function() {
            // catch an error thrown by the step. A truthy return value is reported the same way;
            // the callback also has an err field to report errors asynchronously.
            try {
                var stepErr = step.run(results, next);
                if (stepErr) {
                    throw stepErr;
                }
            } catch (e) {
                // TODO: add stack trace
                that.stop();
                that.emit('error', e, that.results);
                that.emit('end', e, that.results);
            }
        };

    // execute the step.
    this.currentStep++;
    this.emit('step', step.name, action);
    return this;
};
// Callback handed to each step: the step calls it to report that it has finished.
// @err (Object): A standard convention callback param for reporting errors.
// @params (Object): The value the step hands over to the rest of the pipeline.
//
// Emits 'next' with the same two arguments; the listener installed by the constructor then
// schedules execute(err, params). Returns nothing, so a step written as `return next(...)`
// returns undefined, which execute() does not treat as an error.
Pipeline.prototype.next = function(err, params) {
    var payload = ['next', err, params];
    this.emit.apply(this, payload);
};
// Rewinds the pipeline so it can be executed again from the first step.
// The registered steps are kept. The results are replaced with a new empty array
// rather than emptied in place.
// Returns the pipeline.
Pipeline.prototype.reset = function() {
    var pipeline = this;
    pipeline.currentStep = 0;
    pipeline.results = [];
    return pipeline;
};
// Prevents any further step from starting.
// Nothing is interrupted: currentStep is simply moved past the last step, so a step that is
// already running can finish, and the next execute() call ends the pipeline instead of
// starting another step.
// Returns the pipeline.
Pipeline.prototype.stop = function() {
    var pastLastStep = this.steps.length;
    this.currentStep = pastLastStep;
    return this;
};