-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
125 lines (100 loc) · 3.24 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import requests
import zipfile
import datetime
import xmltodict
import schedule
import os
import json
import time
import xml.etree.ElementTree as ET
from flask import Flask, jsonify
from flask_compress import Compress
from multiprocessing import Process, Manager
from io import BytesIO
# Startup banner. This is a module-level statement, so it is printed whenever
# the module is loaded, not only when it is run as a script.
print('Starting puller for DfT BODS...')
# Flask application object; the '/vm' route below is registered on it and it
# is the object handed to the WSGI server in the __main__ section.
app = Flask(__name__)
# Register the Flask-Compress extension on the application.
Compress(app)
def fetchUpdate(operatorList=None, *, timeout=30):
    """Download the DfT BODS bulk AVL archive and pick out selected operators.

    The archive is fetched over HTTP, its ``siri.xml`` member is parsed, and
    every ``VehicleActivity`` element whose ``MonitoredVehicleJourney`` has an
    ``OperatorRef`` equal to one of the requested codes is converted to a
    dict with ``xmltodict``.

    Args:
        operatorList: Iterable of operator (NOC) codes to keep. ``None``
            selects the historical default of ``['GNEL']``.
        timeout: Timeout in seconds handed to ``requests.get`` so that a
            stalled connection cannot block the caller indefinitely.

    Returns:
        A list with one dict per matching ``VehicleActivity`` element,
        grouped in the order of ``operatorList``; or ``None`` when the
        archive could not be downloaded (connection error, timeout or an
        HTTP error status).

    Raises:
        zipfile.BadZipFile: The downloaded payload is not a ZIP archive.
        KeyError: The archive has no ``siri.xml`` member.
        UnicodeDecodeError: ``siri.xml`` is not valid UTF-8.
        xml.etree.ElementTree.ParseError: ``siri.xml`` is not well-formed.
    """
    # A list literal in the signature would be one object shared by every
    # call, so the default is materialised here instead.
    if operatorList is None:
        operatorList = ['GNEL']

    print('Fetching update from DfT BODS...')
    try:
        r = requests.get(
            'https://data.bus-data.dft.gov.uk/avl/download/bulk_archive',
            timeout=timeout,
        )
        # An HTTP error page is not an archive; report it as a failed
        # download rather than handing it to the ZIP reader.
        r.raise_for_status()
    except requests.RequestException:
        # Only network/HTTP failures are handled here; KeyboardInterrupt and
        # programming errors are deliberately left to propagate.
        print('Unable to obtain updated bulk archive from DfT.')
        return None

    print('Extracting ZIP data...')
    # Context managers make sure the archive and member handles are closed
    # even if reading or decoding fails.
    with zipfile.ZipFile(BytesIO(r.content)) as ziphandler:
        with ziphandler.open('siri.xml') as siri:
            print('Decoding ZIP file...')
            siriXml = siri.read().decode('utf-8')

    print('Parsing XML tree...')
    siriNode = ET.fromstring(siriXml)

    print('Processing XML data...')
    siriNs = 'http://www.siri.org.uk/siri'
    ns = {'siri': siriNs}
    activityLocal = []
    for op in operatorList:
        print('Finding vehicles for NOC code "%s"...' % op)
        operatorFilter = ".='%s'" % op
        # Descend to the OperatorRef whose text equals the code, then climb
        # two levels ('..', '..') from OperatorRef through
        # MonitoredVehicleJourney back up to the enclosing VehicleActivity.
        xpathSelector = [
            'siri:ServiceDelivery',
            'siri:VehicleMonitoringDelivery',
            'siri:VehicleActivity',
            'siri:MonitoredVehicleJourney',
            'siri:OperatorRef[%s]' % operatorFilter,
            '..',
            '..'
        ]
        activityNodes = siriNode.findall('./%s' % '/'.join(xpathSelector), namespaces=ns)
        for activity in activityNodes:
            # Re-serialise the element with the SIRI namespace as the default
            # namespace and let xmltodict turn that XML into a dict.
            activityDict = xmltodict.parse(ET.tostring(activity, encoding='utf8', method='xml', default_namespace=siriNs))
            activityLocal.append(activityDict)

    print('Identified %u buses in the North East.' % len(activityLocal))
    return activityLocal
#print(json.dumps(activityLocal, indent=2))
def update(operatorList, returns):
    """Fetch fresh vehicle data and publish it under ``returns['localBuses']``.

    Args:
        operatorList: Operator codes forwarded to ``fetchUpdate``.
        returns: Mapping that receives the result; the ``'localBuses'`` key
            is the one read by the ``/vm`` endpoint.

    When ``fetchUpdate`` returns ``None`` (its failed-download result), the
    mapping is left untouched so that the previously published snapshot keeps
    being served instead of being replaced by ``None``.
    """
    activityLocal = fetchUpdate(operatorList=operatorList)
    if activityLocal is None:
        # Download failed: keep the last good snapshot.
        return
    returns['localBuses'] = activityLocal
def startUpdate(operatorList, returns):
    """Run ``update(operatorList, returns)`` in a newly started child process.

    The process is started and not waited for, so this call returns as soon
    as the child has been launched.
    """
    worker = Process(target=update, args=(operatorList, returns))
    worker.start()
@app.route('/vm', methods=['GET'])
def serveBuses():
    """Serve the most recently published vehicle list as JSON on ``GET /vm``.

    Reads the ``'localBuses'`` entry of the module-level ``returns`` mapping
    (assigned in the ``__main__`` section). The key does not exist until the
    first background update has finished, so it is looked up with ``.get()``
    and an empty JSON list is served while no snapshot is available, rather
    than failing the request with a ``KeyError``.

    Returns:
        A Flask JSON response carrying a permissive
        ``Access-Control-Allow-Origin: *`` header.
    """
    localBuses = returns.get('localBuses')
    if localBuses is None:
        # Nothing published yet (or a None was stored): answer with a
        # well-formed empty list.
        localBuses = []
    response = jsonify(localBuses)
    response.headers.add('Access-Control-Allow-Origin', '*')
    return response
def scheduler(nocTable, returns):
    """Trigger ``startUpdate(nocTable, returns)`` every 10 seconds, forever.

    Registers the recurring job with the ``schedule`` library and then polls
    for due jobs once per second. This function never returns.
    """
    refreshJob = schedule.every(10).seconds
    refreshJob.do(startUpdate, nocTable, returns)

    while True:
        schedule.run_pending()
        time.sleep(1)
if __name__ == "__main__":
    # Operators to include, identified by their codes from the NOC tables.
    nocTable = [
        'GNEL',                                                    # Go North East
        'SCHA', 'SCLV', 'SCNE', 'SCNS', 'SCSS', 'SCSU', 'SCTE',    # Stagecoach North East
        'ANEA', 'ANUM', 'ARDU',                                    # Arriva
    ]

    # Manager-backed dict handed to the worker processes; serveBuses reads
    # the module-level name `returns`, so that name must stay as it is.
    manager = Manager()
    returns = manager.dict()

    # Kick off the first update straight away.
    startUpdate(nocTable, returns)

    # All later updates are driven from a dedicated scheduler process.
    ps = Process(target=scheduler, args=(nocTable, returns))
    ps.start()

    # Serve web requests from this process.
    from waitress import serve
    serve(app, host="0.0.0.0", port=5000)