Skip to content
This repository was archived by the owner on Jan 1, 2023. It is now read-only.

Commit 0d8b89f

Browse files
committed
Initial commit
0 parents  commit 0d8b89f

File tree

4 files changed

+306
-0
lines changed

4 files changed

+306
-0
lines changed

.gitignore

Whitespace-only changes.

README.md

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# ImpfChecker
2+
3+
Basic checker for Bavaria's booking portal for vaccination appointments.
4+
Checks the portal for the first available appointment and prints it.
5+
6+
## Requirements
7+
8+
- You need to be registered with the portal at https://impfzentren.bayern/citizen/
9+
- You need to have selected a vaccination centre in the portal
10+
- Python 3
11+
12+
## How to run
13+
14+
### Install the dependencies
15+
16+
Just run
17+
```shell
18+
pip install -r requirements.txt
19+
```
20+
21+
### Run the checker
22+
23+
The checker requires your username, password, and citizen ID in the environment variables `USER`, `PASSWORD`,
24+
and `CITIZEN_ID`, respectively.
25+
26+
You can find your citizen ID if you login to the portal and select the person. The address bar in your browser contains
27+
the ID in the following format: `https://impfzentren.bayern/citizen/overview/{CITIZEN_ID}`.
28+
29+
```shell
30+
python impf.py
31+
```

impf.py

+273
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
import json
2+
import logging
3+
import os
4+
from datetime import date
5+
from typing import Dict, Optional, Union
6+
from urllib.parse import urlsplit, urlencode, urlunsplit, parse_qs
7+
from uuid import uuid4
8+
9+
import requests
10+
from bs4 import BeautifulSoup
11+
from requests import Response, HTTPError
12+
13+
logging.basicConfig(
14+
format="%(asctime)s %(levelname)-8s %(message)s",
15+
level=logging.DEBUG,
16+
datefmt="%Y-%m-%d %H:%M:%S",
17+
)
18+
19+
20+
def url_with_params(url: str, query_params: Dict[str, str]) -> str:
21+
scheme, netloc, path, _, fragment = urlsplit(url)
22+
query_string = urlencode(query_params, doseq=True)
23+
return urlunsplit((scheme, netloc, path, query_string, fragment))
24+
25+
26+
class ImpfChecker:
27+
MAIN_PAGE = "https://impfzentren.bayern/citizen/"
28+
LOGIN_URL = "https://ciam.impfzentren.bayern/auth/realms/C19V-Citizen/protocol/openid-connect/auth"
29+
TOKEN_URL = "https://ciam.impfzentren.bayern/auth/realms/C19V-Citizen/protocol/openid-connect/token"
30+
APPOINTMENTS_URL_FORMAT = (
31+
"https://impfzentren.bayern/api/v1/citizens/{}/appointments"
32+
)
33+
34+
@property
35+
def auth_token(self):
36+
if self._auth_token is None:
37+
try:
38+
self._auth_token = self._login()
39+
except HTTPError:
40+
logging.error("Login failed.")
41+
exit(1)
42+
return self._auth_token
43+
44+
def __init__(self, username: str, password: str, citizen_id: str):
45+
self._user = username
46+
self._password = password
47+
self.citizen_id = citizen_id
48+
self.session = requests.Session()
49+
self._auth_token = None
50+
51+
def _submit_form(
52+
self,
53+
url: str,
54+
body: Dict[str, str],
55+
allow_redirects: bool = True,
56+
) -> Response:
57+
"""
58+
Submits a form to the API
59+
60+
:param url: The endpoint to post the form to
61+
:param body: The form fields and their values
62+
:param allow_redirects: Whether or not to follow redirects returned by the API
63+
64+
:return: The full Response object
65+
"""
66+
67+
return self.session.post(
68+
url,
69+
headers=self._headers(
70+
**{"Content-Type": "application/x-www-form-urlencoded"}
71+
),
72+
data=body,
73+
allow_redirects=allow_redirects,
74+
)
75+
76+
def _headers(self, with_auth: bool = False, **additional_headers):
77+
"""
78+
:param with_auth: Whether or not to include the Authorization header
79+
:param additional_headers: A mapping of additional headers to add
80+
81+
:return: a dictionary of headers required to make calls to the API
82+
"""
83+
84+
headers = {
85+
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:95.0) Gecko/20100101 Firefox/95.0",
86+
"Accept": "*/*",
87+
}
88+
89+
if with_auth:
90+
headers["Authorization"] = f"Bearer {self.auth_token}"
91+
92+
headers.update(**additional_headers)
93+
return headers
94+
95+
@property
96+
def _login_url(self) -> str:
97+
query_params = dict(
98+
client_id="c19v-frontend",
99+
redirect_uri=self.MAIN_PAGE,
100+
state=uuid4(),
101+
response_mode="fragment",
102+
response_type="code",
103+
scope="openid",
104+
nonce=uuid4(),
105+
ui_locales="de",
106+
)
107+
return url_with_params(self.LOGIN_URL, query_params)
108+
109+
def _get_login_action(self) -> str:
110+
"""
111+
:return: The URL of the endpoint to which the login form posts
112+
"""
113+
login_form_rsp = self.session.get(self._login_url, headers=self._headers())
114+
login_form_rsp.raise_for_status()
115+
return BeautifulSoup(login_form_rsp.text, "html.parser").find(
116+
id="kc-form-login"
117+
)["action"]
118+
119+
def _login(self):
120+
"""
121+
Attempts to log the user in
122+
123+
:raise HTTPError if unsuccessful
124+
:return: The authentication token
125+
"""
126+
login_resp = self._submit_form(
127+
self._get_login_action(),
128+
{
129+
"username": self._user,
130+
"password": self._password,
131+
"credentialId": "",
132+
},
133+
allow_redirects=False,
134+
)
135+
136+
login_resp.raise_for_status()
137+
138+
_, state = login_resp.headers.get("Location").split("#", maxsplit=1)
139+
code = parse_qs(state)["code"][0]
140+
141+
token_rsp = self._submit_form(
142+
self.TOKEN_URL,
143+
{
144+
"code": code,
145+
"grant_type": "authorization_code",
146+
"client_id": "c19v-frontend",
147+
"redirect_uri": "https://impfzentren.bayern/citizen/",
148+
},
149+
)
150+
token_rsp.raise_for_status()
151+
return token_rsp.json()["access_token"]
152+
153+
def _appointments_url(self, resource: Optional[str] = None):
154+
return self.APPOINTMENTS_URL_FORMAT.format(self.citizen_id) + (
155+
resource if resource is not None else ""
156+
)
157+
158+
def _find_appointment(self, earliest_day) -> Optional[Dict]:
159+
"""
160+
Finds an appointment in the user's vaccination centre
161+
162+
:param earliest_day: The earliest acceptable day in ISO format (YYYY-MM-DD)
163+
164+
:return: The JSON payload if an appointment was found, otherwise None
165+
"""
166+
appt_rsp = self.session.get(
167+
url_with_params(
168+
self._appointments_url("/next"),
169+
{
170+
"timeOfDay": "ALL_DAY",
171+
"lastDate": earliest_day,
172+
"lastTime": "00:00",
173+
},
174+
),
175+
headers=self._headers(with_auth=True),
176+
)
177+
178+
if appt_rsp.status_code == 404:
179+
return None
180+
181+
return appt_rsp.json()
182+
183+
def find(self, earliest_day: Optional[str] = None, *, book: bool = False) -> bool:
184+
"""
185+
Finds an appointment in the user's vaccination centre
186+
187+
:param earliest_day: The earliest acceptable day in ISO format (YYYY-MM-DD)
188+
:param book: Whether or not to book the appointment
189+
190+
:return: False if no appointment found or booking failed.
191+
True if the booking was successful or an appointment was found and no booking requested.
192+
"""
193+
if earliest_day is None:
194+
earliest_day = date.today().isoformat()
195+
196+
appt = self._find_appointment(earliest_day)
197+
if appt is None:
198+
logging.info("No appointment available")
199+
return False
200+
201+
logging.info("Found appointment: %s", json.dumps(appt))
202+
203+
if book:
204+
return self._book(appt)
205+
206+
return True
207+
208+
def _book(self, payload: Dict[str, Union[str, Dict[str, bool]]]) -> bool:
209+
"""
210+
Books an appointment
211+
212+
:param payload: The appointment payload as returned by the _find() method
213+
:return: True if the booking was successful, False otherwise
214+
"""
215+
216+
payload["reminderChannel"] = {
217+
"reminderByEmail": True,
218+
"reminderBySms": True,
219+
}
220+
221+
book_rsp = self.session.post(
222+
self._appointments_url(), payload, headers=self._headers(with_auth=True)
223+
)
224+
225+
if book_rsp.status_code != 200:
226+
logging.error(f"Error booking appointment. Status %d", book_rsp.status_code)
227+
return False
228+
229+
logging.info("Appointment booked.")
230+
return True
231+
232+
def print_appointments(self):
233+
"""
234+
Prints the upcoming appointments
235+
"""
236+
237+
appts_rsp = self.session.get(
238+
self._appointments_url(), headers=self._headers(with_auth=True)
239+
)
240+
241+
if appts_rsp.status_code != 200:
242+
logging.error("Error retrieving appointments")
243+
return
244+
245+
appts = appts_rsp.json().get("futureAppointments")
246+
247+
if not appts:
248+
logging.info("No appointments found")
249+
return
250+
251+
for appt in appts:
252+
address = appt["site"]["address"]
253+
254+
logging.info(
255+
"Upcoming appointment at %s (%s) on %s at %s",
256+
appt["site"]["name"],
257+
"{} {}, {} {}".format(
258+
address["street"],
259+
address["streetNumber"],
260+
address["zip"],
261+
address["city"],
262+
),
263+
appt["slotId"]["date"],
264+
appt["slotId"]["time"],
265+
)
266+
267+
268+
checker = ImpfChecker(
269+
username=os.environ.get("USER"),
270+
password=os.environ.get("PASSWORD"),
271+
citizen_id=os.environ.get("CITIZEN_ID"),
272+
)
273+
checker.find()

requirements.txt

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
beautifulsoup4==4.10.0
2+
requests==2.26.0

0 commit comments

Comments
 (0)