-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtwitter_stream_producer.js
136 lines (116 loc) · 4.42 KB
/
twitter_stream_producer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/***
Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Amazon Software License (the "License").
You may not use this file except in compliance with the License.
A copy of the License is located at
http://aws.amazon.com/asl/
or in the "license" file accompanying this file. This file is distributed
on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the specific language governing
permissions and limitations under the License.
***/
'use strict';
var config = require('./config');
var Twit = require('twit');
var util = require('util');
var logger = require('./util/logger');
function twitterStreamProducer(firehose) {
var log = logger().getLogger('producer');
var waitBetweenPutRecordsCallsInMilliseconds = config.waitBetweenPutRecordsCallsInMilliseconds;
var T = new Twit(config.twitter)
// Creates a new kinesis stream if one doesn't exist.
function _createStreamIfNotCreated(callback) {
firehose.describeDeliveryStream({DeliveryStreamName: config.firehose.DeliveryStreamName}, function(err, data) {
if (err) {
firehose.createDeliveryStream(config.firehose, function(err, data) {
if (err) {
// ResourceInUseException is returned when the stream is already created.
if (err.code !== 'ResourceInUseException') {
console.log(err);
callback(err);
return;
}
else {
var msg = util.format('%s stream is already created! Re-using it.', config.firehose.DeliveryStreamName);
console.log(msg);
log.info(msg);
}
}
else {
var msg = util.format('%s stream does not exist. Created a new stream with that name.', config.firehose.DeliveryStreamName);
console.log(msg);
log.info(msg);
}
// Poll to make sure stream is in ACTIVE state before start pushing data.
_waitForStreamToBecomeActive(callback);
});
}
else {
var msg = util.format('%s stream is already created! Re-using it.', config.firehose.DeliveryStreamName);
console.log(msg);
log.info(msg);
}
// Poll to make sure stream is in ACTIVE state before start pushing data.
_waitForStreamToBecomeActive(callback);
});
}
// Checks current status of the stream.
function _waitForStreamToBecomeActive(callback) {
firehose.describeDeliveryStream({DeliveryStreamName: config.firehose.DeliveryStreamName}, function(err, data) {
if (!err) {
if (data.DeliveryStreamDescription.DeliveryStreamStatus === 'ACTIVE') {
log.info('Current status of the stream is ACTIVE.');
callback(null);
}
else {
var msg = util.format('Current status of the stream is %s.', data.DeliveryStreamDescription.DeliveryStreamStatus);
console.log(msg);
log.info(msg);
setTimeout(function() {
_waitForStreamToBecomeActive(callback);
}, 1000 * config.waitBetweenDescribeCallsInSeconds);
}
}
});
}
function _sendToFirehose() {
// var locations = [ '-180,-90,180,90' ]; //all the world
//var stream = T.stream('statuses/filter', { locations: config.locations });
var stream = T.stream('statuses/sample');
var records = [];
var record = {};
var recordParams = {};
stream.on('tweet', function (tweet) {
if (tweet.lang){
if (tweet.lang === 'en'){
//console.log(JSON.stringify(tweet));
recordParams = {
DeliveryStreamName: config.firehose.DeliveryStreamName,
Record: {
Data: JSON.stringify(tweet)+',\n'
}
};
firehose.putRecord(recordParams, function(err, data) {
if (err) {
log.error(err);
}
});
}
}
});
}
return {
run: function() {
log.info(util.format('Configured wait between consecutive PutRecords call in milliseconds: %d',
waitBetweenPutRecordsCallsInMilliseconds));
_createStreamIfNotCreated(function(err) {
if (err) {
log.error(util.format('Error creating stream: %s', err));
return;
}
_sendToFirehose();
});
}
};
}
module.exports = twitterStreamProducer;