-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathstream.js
141 lines (116 loc) · 3.66 KB
/
stream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// twaaaj streamer
// Authors: Axel Kittenberger ([email protected])
"use strict";
var util = require('util');
var http = require('http');
var url = require('url');
var fs = require('fs');
var mongo = require('mongodb');
var twitter = require('./twitter');
var timers = require('timers');
var keygrip = require('keygrip');
var keys = require('./keys');
var dbhost = '127.0.0.1';
var dbport = mongo.Connection.DEFAULT_PORT;
var dbserver = new mongo.Server(dbhost, dbport, {});
var dbconnector = new mongo.Db('twaaaj', dbserver, {});
var dbclient = null;
var twit = new twitter({
consumer_key : keys.consumer_key,
consumer_secret : keys.consumer_secret,
access_token_key : keys.access_token_key,
access_token_secret : keys.access_token_secret,
keygrip : new keygrip(keys.keygrip),
});
var logst = fs.createWriteStream('./twaaaj.log', {'flags': 'a'});
// logs a console message
function log(text) {
text = (new Date()) + ": " + text;
console.log(text);
logst.write(text);
logst.write('\n');
}
var usersstr = fs.readFileSync('users.json');
var users = JSON.parse(usersstr);
var follow = [];
var track = [];
for(var u = 0; u < users.length; u++) {
follow[follow.length] = users[u].id;
track[track.length] = '@'+users[u].name;
}
function connectDB(callback) {
log('Connecting to database');
dbconnector.open(function(error, p_client) {
if(error) {
log('cannot connect to database');
util.puts(util.inspect(error));
return;
}
// sets the global client variable for easy reuse
dbclient = p_client;
if (callback) callback();
});
};
var reconnectDelay = 500;
var maxReconnectDelay = 5 * 60 * 1000;
var timeoutDelay = 10 * 60 * 1000;
function tweetStream(callback) {
log('--STREAM START--');
var followstr = follow.join(',');
var trackstr = track.join(',');
function dberror(error) {
log('Database error!');
util.puts(util.inspect(error));
process.exit(1);
}
twit.stream('statuses/filter', {'follow' : followstr, 'track' : trackstr}, function(stream) {
var timeoutID = timers.setTimeout(stream.destroy, timeoutDelay);
stream.on('data', function (data) {
timers.clearTimeout(timeoutID);
timeoutID = timers.setTimeout(stream.destroy, timeoutDelay);
reconnectDelay = 500;
if (!data.user || !data.text || !data.id) {
log('Unknown data:'+util.inspect(data));
return;
}
data._id = data.id_str;
data.timestamp = new Date(Date.parse(data.created_at)).getTime();
log(data.user.screen_name+':'+data.text);
dbclient.collection('twdata', function(error, twdata) {
if (error) { dberror(error); return; }
twdata.insert(data, function(error, count) {
if (error) { dberror(error); return; }
});
});
});
stream.on('error', function(error) {
timers.clearTimeout(timeoutID);
log('--STREAM ERROR--');
log(util.inspect(error)); // xxx
reconnectDelay = Math.min(reconnectDelay * 2, maxReconnectDelay);
log('trying to reconnect in '+(reconnectDelay / 1000)+' seconds');
timers.setTimeout(tweetStream, reconnectDelay);
});
stream.on('end', function(error) {
timers.clearTimeout(timeoutID);
log('--STREAM END--');
log('Status: '+error.statusCode);
log(util.inspect(error)); // xxx
reconnectDelay = Math.min(reconnectDelay * 2, maxReconnectDelay);
log('trying to reconnect in '+(reconnectDelay / 1000)+' seconds');
timers.setTimeout(tweetStream, reconnectDelay);
});
if (callback) callback();
});
}
// startup sequence
var callstack = [connectDB, tweetStream];
function workStack() {
if (callstack.length == 0) {
log("Finished startup");
return;
}
var f = callstack.shift();
f(workStack);
}
workStack();