"""
NOTE: This is probably deprecated/unnecessary since we aren't rectangle-ing any RT data
via Python/GCS; it's going in BigQuery
"""
import os
from pathlib import Path
import pandas as pd
from google.protobuf import json_format
from google.transit import gtfs_realtime_pb2
def parse_pb(path):
    """
    Parse a GTFS-Realtime protobuf file into a Python dictionary.

    Parameters
    ----------
    path : str or Path
        Location of a FeedMessage .pb file on disk.

    Returns
    -------
    dict
        The FeedMessage contents as nested dicts (camelCase keys, per the
        protobuf JSON mapping used by ``json_format.MessageToDict``).
    """
    feed = gtfs_realtime_pb2.FeedMessage()
    # Context manager ensures the file handle is closed promptly;
    # the original left the open file to the garbage collector.
    with open(path, "rb") as f:
        feed.ParseFromString(f.read())
    return json_format.MessageToDict(feed)
def pull_value(x, path):
    """
    Safely extract a nested value from a dict using a dotted key path.

    Parameters
    ----------
    x : dict
        Nested dictionary (e.g. a parsed FeedMessage).
    path : str
        Dot-separated key path, e.g. ``"vehicle.position.latitude"``.

    Returns
    -------
    The value found at the path, or None if any key along the way is
    missing or an intermediate value is not subscriptable by string key.
    """
    crnt_obj = x
    for attr in path.split("."):
        try:
            crnt_obj = crnt_obj[attr]
        except (KeyError, TypeError):
            # KeyError: the key is absent.
            # TypeError: an intermediate value is None or a scalar, which
            # the original version crashed on instead of returning None.
            return None
    return crnt_obj
def get_header_details(x):
    """
    Build a dict of feed-header fields to attach to a dataframe.

    Currently only the header timestamp is extracted; the value is None
    when the parsed feed has no ``header.timestamp`` entry.
    """
    timestamp = pull_value(x, "header.timestamp")
    return {"header_timestamp": timestamp}
def get_entity_details(x):
    """
    Extract per-entity vehicle fields from a parsed feed.

    Returns a list of dicts (one per feed entity), mapping output column
    names to the values found at each entity's dotted path. Missing paths
    yield None via pull_value. An empty list is returned when the feed
    has no entities.
    """
    # Output column -> dotted path within each entity.
    field_paths = {
        "entity_id": "id",
        "vehicle_id": "vehicle.vehicle.id",
        "vehicle_trip_id": "vehicle.trip.tripId",
        "vehicle_timestamp": "vehicle.timestamp",
        "vehicle_position_latitude": "vehicle.position.latitude",
        "vehicle_position_longitude": "vehicle.position.longitude",
    }
    entities = x.get("entity") or []
    return [
        {column: pull_value(e, path) for column, path in field_paths.items()}
        for e in entities
    ]
def rectangle_positions(x):
    """
    Build a vehicle-positions dataframe from one parsed pb feed.

    Each entity becomes a row; header fields are repeated across all rows
    as extra columns. Returns None when the feed contains no entities.
    """
    entity_details = get_entity_details(x)
    if not entity_details:
        return None
    frame = pd.DataFrame(entity_details)
    # Broadcast each header value down the whole column.
    for column, value in get_header_details(x).items():
        frame[column] = value
    return frame
def _file_size_mb(path):
    """Return the size of *path* in MB, rounded to 2 decimal places."""
    return round(os.path.getsize(path) / (1024 * 1024), 2)


# Files of interest: every vehicle_positions pb under the local bucket copy.
positions_pb = list(Path("data/bucket/126/rt").glob("vehicle_positions__*.pb"))
positions_parsed = [*map(parse_pb, positions_pb)]
positions_dfs = [*map(rectangle_positions, positions_parsed)]
# Drop feeds that had no entities (rectangle_positions returns None for those).
positions_rectangle = pd.concat([df for df in positions_dfs if df is not None])
positions_rectangle.to_csv("output/positions_rectangle.csv", index=False)
# https://pandas.pydata.org/pandas-docs/version/1.1.5/reference/api/pandas.DataFrame.to_parquet.html
# Could be a benefit to use partition_cols
positions_rectangle.to_parquet("output/positions_rectangle.parquet", index=False)

# Raw size in MB: sum the individual pb byte counts, then round once.
raw_file_size = round(sum(map(os.path.getsize, positions_pb)) / (1024 * 1024), 2)
print("The raw vehicle positions pb files are {size} MB".format(size=raw_file_size))
print("")

csv_file_size = _file_size_mb("output/positions_rectangle.csv")
print("The rectangle'd vehicle positions csv is {size} MB".format(size=csv_file_size))
print("")

parquet_file_size = _file_size_mb("output/positions_rectangle.parquet")
print(
    "The rectangle'd vehicle positions parquet is {size} MB".format(
        size=parquet_file_size
    )
)
# QUESTION: How will we get file name?