-
Notifications
You must be signed in to change notification settings - Fork 290
/
Copy pathtest_provisioning.py
341 lines (273 loc) · 11.6 KB
/
test_provisioning.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
"""Test Provisioning"""
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import asyncio
import json
import os
import signal
import sys
from subprocess import PIPE
from typing import Any, Optional
import pytest
from jupyter_core import paths
from traitlets import Int, Unicode
from jupyter_client.connect import KernelConnectionInfo
from jupyter_client.kernelspec import KernelSpecManager, NoSuchKernel
from jupyter_client.launcher import launch_kernel
from jupyter_client.manager import AsyncKernelManager
from jupyter_client.provisioning import (
KernelProvisionerBase,
KernelProvisionerFactory,
LocalProvisioner,
)
from jupyter_client.provisioning.factory import EntryPoint
pjoin = os.path.join
class SubclassedTestProvisioner(LocalProvisioner): # type:ignore
config_var_1: int = Int(config=True) # type:ignore
config_var_2: str = Unicode(config=True) # type:ignore
pass
class CustomTestProvisioner(KernelProvisionerBase): # type:ignore
process = None
pid = None
pgid = None
config_var_1: int = Int(config=True) # type:ignore
config_var_2: str = Unicode(config=True) # type:ignore
@property
def has_process(self) -> bool:
return self.process is not None
async def poll(self) -> Optional[int]:
ret = 0
if self.process:
ret = self.process.poll()
return ret
async def wait(self) -> Optional[int]:
ret = 0
if self.process:
while await self.poll() is None:
await asyncio.sleep(0.1)
# Process is no longer alive, wait and clear
ret = self.process.wait()
# Make sure all the fds get closed.
for attr in ["stdout", "stderr", "stdin"]:
fid = getattr(self.process, attr)
if fid:
fid.close()
self.process = None
return ret
async def send_signal(self, signum: int) -> None:
if self.process:
if signum == signal.SIGINT and sys.platform == "win32":
from jupyter_client.win_interrupt import send_interrupt
send_interrupt(self.process.win32_interrupt_event)
return
# Prefer process-group over process
if self.pgid and hasattr(os, "killpg"):
try:
os.killpg(self.pgid, signum)
return
except OSError:
pass
return self.process.send_signal(signum)
async def kill(self, restart: bool = False) -> None:
if self.process:
self.process.kill()
async def terminate(self, restart: bool = False) -> None:
if self.process:
self.process.terminate()
async def pre_launch(self, **kwargs: Any) -> dict[str, Any]:
km = self.parent
if km:
# save kwargs for use in restart
km._launch_args = kwargs.copy()
# build the Popen cmd
extra_arguments = kwargs.pop("extra_arguments", [])
# write connection file / get default ports
km.write_connection_file()
self.connection_info = km.get_connection_info()
kernel_cmd = km.format_kernel_cmd(
extra_arguments=extra_arguments
) # This needs to remain here for b/c
return await super().pre_launch(cmd=kernel_cmd, **kwargs)
return {}
async def launch_kernel(self, cmd: list[str], **kwargs: Any) -> KernelConnectionInfo:
scrubbed_kwargs = kwargs
self.process = launch_kernel(cmd, **scrubbed_kwargs)
pgid = None
if hasattr(os, "getpgid"):
try:
pgid = os.getpgid(self.process.pid)
except OSError:
pass
self.pid = self.process.pid
self.pgid = pgid
return self.connection_info
async def cleanup(self, restart: bool = False) -> None:
pass
class NewTestProvisioner(CustomTestProvisioner): # type:ignore
pass
def build_kernelspec(name: str, provisioner: Optional[str] = None) -> None:
spec: dict = {
"argv": [
sys.executable,
"-m",
"tests.signalkernel",
"-f",
"{connection_file}",
],
"display_name": f"Signal Test Kernel w {provisioner}",
"env": {"TEST_VARS": "${TEST_VARS}:test_var_2"},
"metadata": {},
}
if provisioner:
kernel_provisioner = {"kernel_provisioner": {"provisioner_name": provisioner}}
spec["metadata"].update(kernel_provisioner)
if provisioner != "local-provisioner":
spec["metadata"]["kernel_provisioner"]["config"] = {
"config_var_1": 42,
"config_var_2": name,
}
kernel_dir = pjoin(paths.jupyter_data_dir(), "kernels", name)
os.makedirs(kernel_dir)
with open(pjoin(kernel_dir, "kernel.json"), "w") as f:
f.write(json.dumps(spec))
def new_provisioner():
build_kernelspec("new_provisioner", "new-test-provisioner")
def custom_provisioner():
build_kernelspec("custom_provisioner", "custom-test-provisioner")
@pytest.fixture
def all_provisioners():
build_kernelspec("no_provisioner")
build_kernelspec("missing_provisioner", "missing-provisioner")
build_kernelspec("default_provisioner", "local-provisioner")
build_kernelspec("subclassed_provisioner", "subclassed-test-provisioner")
custom_provisioner()
@pytest.fixture(
params=[
"no_provisioner",
"default_provisioner",
"missing_provisioner",
"custom_provisioner",
"subclassed_provisioner",
]
)
def akm(request, all_provisioners):
return AsyncKernelManager(kernel_name=request.param)
initial_provisioner_map = {
"local-provisioner": "jupyter_client.provisioning:LocalProvisioner",
"subclassed-test-provisioner": "tests.test_provisioning:SubclassedTestProvisioner",
"custom-test-provisioner": "tests.test_provisioning:CustomTestProvisioner",
}
def mock_get_all_provisioners() -> list[EntryPoint]:
result = []
for name, epstr in initial_provisioner_map.items():
result.append(EntryPoint(name, epstr, KernelProvisionerFactory.GROUP_NAME))
return result
def mock_get_provisioner(_: str, name: str) -> EntryPoint:
if name == "new-test-provisioner":
return EntryPoint(
"new-test-provisioner",
"tests.test_provisioning:NewTestProvisioner",
KernelProvisionerFactory.GROUP_NAME,
)
if name in initial_provisioner_map:
return EntryPoint(name, initial_provisioner_map[name], KernelProvisionerFactory.GROUP_NAME)
raise ValueError("No such entry point")
@pytest.fixture
def kpf(monkeypatch):
"""Setup the Kernel Provisioner Factory, mocking the entrypoint fetch calls."""
monkeypatch.setattr(
KernelProvisionerFactory, "_get_all_provisioners", mock_get_all_provisioners
)
monkeypatch.setattr(KernelProvisionerFactory, "_get_provisioner", mock_get_provisioner)
factory = KernelProvisionerFactory.instance()
return factory
class TestDiscovery:
def test_find_all_specs(self, kpf, all_provisioners):
ksm = KernelSpecManager()
kernels = ksm.get_all_specs()
# Ensure specs for initial provisioners exist,
# and missing_provisioner & new_provisioner don't
assert "no_provisioner" in kernels
assert "default_provisioner" in kernels
assert "subclassed_provisioner" in kernels
assert "custom_provisioner" in kernels
assert "missing_provisioner" not in kernels
assert "new_provisioner" not in kernels
def test_get_missing(self, all_provisioners):
ksm = KernelSpecManager()
with pytest.raises(NoSuchKernel):
ksm.get_kernel_spec("missing_provisioner")
def test_get_new(self, kpf):
new_provisioner() # Introduce provisioner after initialization of KPF
ksm = KernelSpecManager()
kernel = ksm.get_kernel_spec("new_provisioner")
assert kernel.metadata["kernel_provisioner"]["provisioner_name"] == "new-test-provisioner"
class TestRuntime:
async def akm_test(self, kernel_mgr):
"""Starts a kernel, validates the associated provisioner's config, shuts down kernel"""
assert kernel_mgr.provisioner is None
if kernel_mgr.kernel_name == "missing_provisioner":
with pytest.raises(NoSuchKernel):
await kernel_mgr.start_kernel()
else:
await kernel_mgr.start_kernel()
TestRuntime.validate_provisioner(kernel_mgr)
await kernel_mgr.shutdown_kernel()
assert kernel_mgr.provisioner is not None
assert kernel_mgr.provisioner.has_process is False
async def test_existing(self, kpf, akm):
await self.akm_test(akm)
async def test_new(self, kpf):
new_provisioner() # Introduce provisioner after initialization of KPF
new_km = AsyncKernelManager(kernel_name="new_provisioner")
await self.akm_test(new_km)
async def test_custom_lifecycle(self, kpf):
custom_provisioner()
async_km = AsyncKernelManager(kernel_name="custom_provisioner")
await async_km.start_kernel(stdout=PIPE, stderr=PIPE)
is_alive = await async_km.is_alive()
assert is_alive
await async_km.restart_kernel(now=True)
is_alive = await async_km.is_alive()
assert is_alive
await async_km.interrupt_kernel()
assert isinstance(async_km, AsyncKernelManager)
await async_km.shutdown_kernel(now=True)
is_alive = await async_km.is_alive()
assert is_alive is False
assert async_km.context.closed
async def test_default_provisioner_config(self, kpf, all_provisioners):
kpf.default_provisioner_name = "custom-test-provisioner"
async_km = AsyncKernelManager(kernel_name="no_provisioner")
await async_km.start_kernel(stdout=PIPE, stderr=PIPE)
is_alive = await async_km.is_alive()
assert is_alive
assert isinstance(async_km.provisioner, CustomTestProvisioner)
assert async_km.provisioner.config_var_1 == 0 # Not in kernelspec, so default of 0 exists
await async_km.shutdown_kernel(now=True)
is_alive = await async_km.is_alive()
assert is_alive is False
assert async_km.context.closed
@staticmethod
def validate_provisioner(akm: AsyncKernelManager) -> None:
# Ensure the provisioner is managing a process at this point
assert akm.provisioner is not None and akm.provisioner.has_process
# Validate provisioner config
if akm.kernel_name in ["no_provisioner", "default_provisioner"]:
assert not hasattr(akm.provisioner, "config_var_1")
assert not hasattr(akm.provisioner, "config_var_2")
else:
assert akm.provisioner.config_var_1 == 42 # type:ignore
assert akm.provisioner.config_var_2 == akm.kernel_name # type:ignore
# Validate provisioner class
if akm.kernel_name in ["no_provisioner", "default_provisioner", "subclassed_provisioner"]:
assert isinstance(akm.provisioner, LocalProvisioner)
if akm.kernel_name == "subclassed_provisioner":
assert isinstance(akm.provisioner, SubclassedTestProvisioner)
else:
assert not isinstance(akm.provisioner, SubclassedTestProvisioner)
else:
assert isinstance(akm.provisioner, CustomTestProvisioner)
assert not isinstance(akm.provisioner, LocalProvisioner)
if akm.kernel_name == "new_provisioner":
assert isinstance(akm.provisioner, NewTestProvisioner)