This repository has been archived by the owner on Jan 25, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathvalidator.py
124 lines (101 loc) · 3.71 KB
/
validator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import os
import zlib
from threading import Timer
from typing import Callable, Dict
import cbor2
import cwt
from base45 import b45decode
from cwt import Claims
from cert_loaders.at import CertificateLoader_AT
from cert_loaders.at_test import CertificateLoader_AT_TEST
from cert_loaders.de import CertificateLoader_DE
from cert_loaders.test import CertificateLoader_XX
class DCCValidator():
def __init__(self, country, certs=None, dev_mode=False):
self.CERT_LOADERS: Dict[str, Callable[[], None]] = {
'DE': CertificateLoader_DE,
'AT': CertificateLoader_AT,
'AT_TEST': CertificateLoader_AT_TEST,
'XX': CertificateLoader_XX
}
self.DEV_MODE = dev_mode
# initiates the certificate loader if no certificate is passed to the object
# passing the certs to the object will not initiate the certificate loader
# this is useful for testing
if certs is None:
self._cert_loader = self._get_cert_loader(country)()
# loads the certificates from the loader instance
self._certs = self._cert_loader()
else:
self._certs = certs
print("Loaded %i certificates from %s certificate service." %
(len(self._certs), country))
self._start_update_timer()
def validate(self, dcc):
dcc = self._decode(dcc)
if dcc is None:
return [False, None]
try:
decoded = cwt.decode(dcc, keys=self._certs)
claims = Claims.new(decoded)
return [True, claims.to_dict()]
except Exception as e:
try:
decoded_noverify = cbor2.loads(dcc)
decoded_noverify = cbor2.loads(decoded_noverify.value[2])
print("Could not validate certificate.")
except Exception as e:
print("Could not decode certificate.")
return [False, {}]
return [False, decoded_noverify]
def _decode(self, dcc):
dcc = dcc.encode()
if dcc.startswith(b'HC1:'):
dcc = dcc[4:]
try:
dcc = b45decode(dcc)
except Exception as e:
if self.DEV_MODE:
print(e)
if (e is ValueError):
return None
if dcc.startswith(b'x'):
try:
dcc = zlib.decompress(dcc)
except Exception as e:
if self.DEV_MODE:
print(e)
return None
return dcc
def _get_cert_loader(self, country):
return self.CERT_LOADERS[country]
def get_business_rules(self):
return self._cert_loader.rules
def get_status(self):
return "{status}"
def update_certs(self):
print ("Update timer ran out. Updating certificates.")
self._cert_loader.update_certs()
self.certs = self._cert_loader()
self._update_timer.cancel()
self._start_update_timer()
def _start_update_timer(self):
"""
The certificate lists should be updated every day.
This starts a timer to refresh them continously.
"""
self._update_timer = Timer(86400, self.update_certs)
self._update_timer.daemon = True
self._update_timer.start()
def main(dev_mode=False):
# Retrieve the requrested country for the certificates
# from env or fall back to DE
CERT_COUNTRY = os.getenv("CERT_COUNTRY", "AT")
# Retrieve DCC from env or fall back on empty string
DCC = os.getenv("DCC", "")
# Initiate the DSCValidator
validator = DCCValidator(country=CERT_COUNTRY, dev_mode=dev_mode)
# Validate the DCC
print(validator.validate(DCC))
if __name__ == '__main__':
main(True)