This repository was archived by the owner on Feb 6, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js.js
122 lines (113 loc) · 3.22 KB
/
index.js.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// Populate process.env (dotenv) before any environment variable is read below.
require('dotenv').config()
// HTTP client used for the rules requests and for the stream connection.
const needle = require('needle');
// Bearer token placed in the authorization header of every request in this file.
const token = process.env.TWITTER_TOKEN;
// Endpoint used to list, delete and add stream rules.
const rulesURL = 'https://api.twitter.com/2/tweets/search/stream/rules';
// Endpoint the long-lived stream connection is opened against.
const streamURL = 'https://api.twitter.com/2/tweets/search/stream';
// Sent verbatim as the "add" payload by setRules().
// NOTE(review): presumably an array of rule objects — confirm against rules.json.
const rules = require("./rules.json")
/**
 * Fetches the stream rules currently registered at `rulesURL`.
 *
 * @returns {Promise<*>} The response body exactly as needle delivers it.
 * @throws {Error} When the response status is not 200. The message holds the
 *   response body (serialised as JSON when the body is an object).
 */
async function getAllRules() {
  const response = await needle('get', rulesURL, {
    headers: {
      "authorization": `Bearer ${token}`
    }
  });

  if (response.statusCode !== 200) {
    logger.error("Error:", response.statusMessage, response.statusCode);

    // `new Error(obj)` coerces an object body to "[object Object]", which hides
    // the actual failure; serialise it instead. Buffers and primitives already
    // stringify to their content, so they are passed through untouched.
    const body = response.body;
    const isPlainObject = body !== null && typeof body === 'object' && !Buffer.isBuffer(body);
    throw new Error(isPlainObject ? JSON.stringify(body) : body);
  }

  return response.body;
}
/**
 * Deletes every rule listed in a rules response by posting their ids to
 * `rulesURL`.
 *
 * @param {{data?: Array<{id: string}>}} rules - A rules response such as the
 *   value resolved by getAllRules().
 * @returns {Promise<*|null>} The response body, or `null` when there is nothing
 *   to delete (no argument, no `data` array, or an empty `data` array).
 * @throws {Error} When the response status is not 200. The message holds the
 *   response body (serialised as JSON when the body is an object).
 */
async function deleteAllRules(rules) {
  // A missing argument or a response without a `data` array means no rules exist.
  if (!rules || !Array.isArray(rules.data)) {
    return null;
  }

  const ids = rules.data.map(rule => rule.id);
  // Do not send a delete request that names no rules.
  if (ids.length === 0) {
    return null;
  }

  const data = {
    "delete": {
      "ids": ids
    }
  };

  const response = await needle('post', rulesURL, data, {
    headers: {
      "content-type": "application/json",
      "authorization": `Bearer ${token}`
    }
  });

  if (response.statusCode !== 200) {
    // `new Error(obj)` coerces an object body to "[object Object]"; serialise it
    // so the failure reason survives. Buffers and primitives pass through.
    const body = response.body;
    const isPlainObject = body !== null && typeof body === 'object' && !Buffer.isBuffer(body);
    throw new Error(isPlainObject ? JSON.stringify(body) : body);
  }

  return response.body;
}
/**
 * Registers the module-level `rules` by posting them as the "add" payload to
 * `rulesURL`.
 *
 * @returns {Promise<*>} The response body exactly as needle delivers it.
 * @throws {Error} When the response status is not 201. The message holds the
 *   response body (serialised as JSON when the body is an object).
 */
async function setRules() {
  const data = {
    "add": rules
  };

  const response = await needle('post', rulesURL, data, {
    headers: {
      "content-type": "application/json",
      "authorization": `Bearer ${token}`
    }
  });

  if (response.statusCode !== 201) {
    // `new Error(obj)` coerces an object body to "[object Object]"; serialise it
    // so the failure reason survives. Buffers and primitives pass through.
    const body = response.body;
    const isPlainObject = body !== null && typeof body === 'object' && !Buffer.isBuffer(body);
    throw new Error(isPlainObject ? JSON.stringify(body) : body);
  }

  return response.body;
}
/**
 * Opens the stream at `streamURL` and forwards every chunk that parses as JSON
 * to handleStreamMessage().
 *
 * On an 'err' event with code 'ECONNRESET' the function schedules a new call to
 * itself after `2 ** retryAttempt` milliseconds; any other error code, or a
 * chunk reporting the connection-limit message, logs and exits with status 1.
 *
 * @param {number} retryAttempt - Number of consecutive reconnects so far; reset
 *   to 0 whenever a chunk is parsed and handled successfully.
 * @returns {*} The stream object returned by `needle.get`.
 */
function streamConnect(retryAttempt) {
  const stream = needle.get(streamURL, {
    headers: {
      "User-Agent": "v2FilterStreamJS",
      "Authorization": `Bearer ${token}`
    },
    timeout: 20000
  });

  stream.on('data', data => {
    try {
      const json = JSON.parse(data);
      handleStreamMessage(json);
      retryAttempt = 0;
    } catch (e) {
      // This comparison can only match when `data` is an object carrying a
      // `detail` field; raw text/Buffer chunks have none.
      if (data.detail === "This stream is currently at the maximum allowed connection limit.") {
        // Fixed: this previously called the undefined `clogger`, so the handler
        // threw a ReferenceError instead of logging and exiting.
        logger.error(data.detail);
        process.exit(1);
      }
      // Any other unparseable chunk is deliberately ignored.
      // NOTE(review): presumably these are keep-alive signals — confirm.
    }
  }).on('err', error => {
    if (error.code !== 'ECONNRESET') {
      logger.error(error.code);
      process.exit(1);
    } else {
      // Reconnect after a delay that doubles with each consecutive attempt.
      setTimeout(() => {
        logger.info("A connection error occurred. Reconnecting...");
        streamConnect(++retryAttempt);
      }, 2 ** retryAttempt);
    }
  });

  return stream;
}
// Startup sequence: replace the registered rules with the ones from this
// project, then open the stream. Any failure while syncing rules is logged and
// ends the process with status 1.
(async () => {
  try {
    const existingRules = await getAllRules();
    await deleteAllRules(existingRules);
    await setRules();
  } catch (err) {
    logger.error(err);
    process.exit(1);
  }

  // Only reached once the rules are in place; start with zero retry attempts.
  streamConnect(0);
})();
// NOTE(review): dotenv is already configured at the top of this file; this
// second call looks redundant.
require('dotenv').config()
// HTTP server created without a request handler; socket.io is attached to it.
const httpServer = require("http").createServer();
const io = require("socket.io")(httpServer, {});
// NOTE(review): `logger` is a `const` declared here, after the functions that
// reference it. Those references are only safe if they execute after this line
// has run — keep that in mind before reordering or adding synchronous callers.
const logger = require("@bunnylogger/bunnylogger")
// This message is logged before listen() is called, so it does not confirm that
// the port was actually bound.
logger.start(`Server running on ${process.env.SERVER_IP}:${process.env.SERVER_PORT}`)
httpServer.listen(process.env.SERVER_PORT);
/**
 * Broadcasts one parsed stream message to all socket.io clients as a 'tweet'
 * event whose payload is `{ message }`.
 *
 * @param {*} message - The parsed stream message to forward.
 * @returns {void} Emit failures are logged through `logger.error`, not thrown.
 */
function handleStreamMessage(message) {
  const payload = { message };

  try {
    io.emit('tweet', payload);
  } catch (emitError) {
    logger.error(emitError);
  }
}