|
| 1 | +import logging |
| 2 | + |
| 3 | +from tornado import gen |
| 4 | +from tornado import ioloop |
| 5 | +from tornado import web |
| 6 | +from tornado.escape import json_encode |
| 7 | +from tornado.options import define, options |
| 8 | + |
| 9 | +from . import Fib, WeiboClient |
| 10 | + |
| 11 | +weibo_public_timeline_url = 'https://api.weibo.com/2/statuses/public_timeline.json?access_token={}' |
| 12 | +access_logger = logging.getLogger('tornado.access') |
| 13 | +app_logger = logging.getLogger('tornado.application') |
| 14 | +sleep_duration_origin = 3 |
| 15 | +sleep_duration_increase_factor = 2 |
| 16 | + |
| 17 | +# chunked transfer encoding delimiter |
| 18 | +CRLF = '\r\n' |
| 19 | + |
| 20 | +# define tornado command line arguments |
| 21 | +define('weibo_access_token') |
| 22 | +define('debug', default=False) |
| 23 | + |
| 24 | + |
| 25 | +def remote_ip(request): |
| 26 | + return request.headers.get('X-Real-IP') or request.remote_ip |
| 27 | + |
| 28 | + |
| 29 | +# noinspection PyAbstractClass |
| 30 | +class MainHandler(web.RequestHandler): |
| 31 | + def get(self): |
| 32 | + self.write(''' |
| 33 | + access /public_timeline to get a weibo public status stream |
| 34 | + ''') |
| 35 | + |
| 36 | + |
| 37 | +# noinspection PyAbstractClass |
| 38 | +class PublicTimelineHandler(web.RequestHandler): |
| 39 | + @gen.coroutine |
| 40 | + def get(self): |
| 41 | + client = WeiboClient(options.weibo_access_token) |
| 42 | + access_logger.info('start streaming to {}'.format(remote_ip(self.request))) |
| 43 | + self.set_header('transfer-encoding', 'chunked') |
| 44 | + self.set_header('content-type', 'application/json; charset=utf-8') |
| 45 | + fib = Fib() |
| 46 | + |
| 47 | + while True: |
| 48 | + statuses = yield client.public_timeline() |
| 49 | + if not self.request.connection.stream.closed(): |
| 50 | + statuses_count = len(statuses) |
| 51 | + if statuses_count > 0: |
| 52 | + app_logger.info('received {} new statuses'.format(statuses_count)) |
| 53 | + for s in statuses: |
| 54 | + chunked = json_encode(s) |
| 55 | + chunked_size = len(chunked) |
| 56 | + self.write('{:x}{}'.format(chunked_size + 1, CRLF)) |
| 57 | + self.write('{}\n{}'.format(chunked, CRLF)) |
| 58 | + self.flush() |
| 59 | + app_logger.info('last id updated to {}'.format(client.last_id)) |
| 60 | + fib.reset() |
| 61 | + sleep_duration = fib.next() |
| 62 | + else: |
| 63 | + sleep_duration = fib.next() |
| 64 | + app_logger.warn('no new statuses (sleeping {} seconds)'.format(sleep_duration)) |
| 65 | + |
| 66 | + yield gen.sleep(sleep_duration) |
| 67 | + else: |
| 68 | + # access_logger.info('stop streaming to {}'.format(remote_ip(self.request)) |
| 69 | + return |
| 70 | + # self.write('0' + CRLF) |
| 71 | + # self.write(CRLF) |
| 72 | + |
| 73 | + def on_connection_close(self): |
| 74 | + access_logger.info('close connection to {}'.format(remote_ip(self.request))) |
| 75 | + self.finish() |
| 76 | + |
| 77 | + |
| 78 | +if __name__ == '__main__': |
| 79 | + options.parse_command_line() |
| 80 | + |
| 81 | + app = web.Application([ |
| 82 | + (r'/', MainHandler), |
| 83 | + (r'/public_timeline', PublicTimelineHandler) |
| 84 | + ], |
| 85 | + debug=options.debug) |
| 86 | + # listen to default HTTP port |
| 87 | + app.listen(80) |
| 88 | + ioloop.IOLoop.current().start() |
0 commit comments