-
Notifications
You must be signed in to change notification settings - Fork 290
/
Copy pathsignalkernel.py
95 lines (79 loc) · 3.34 KB
/
signalkernel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
"""Test kernel for signalling subprocesses"""
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import os
import signal
import time
from subprocess import PIPE, Popen
from ipykernel.displayhook import ZMQDisplayHook
from ipykernel.kernelapp import IPKernelApp
from ipykernel.kernelbase import Kernel
class SignalTestKernel(Kernel):
    """Kernel for testing subprocess signaling.

    Instead of running user code, ``do_execute`` treats the submitted text as
    one of a few commands (``start``, ``check``, ``env``, ``sleep``); any
    other text yields an error reply.

    Environment variables read by this kernel:

    * ``NO_SIGTERM_REPLY`` -- when ``"1"``, ``SIGTERM`` is ignored.
    * ``NO_SHUTDOWN_REPLY`` -- when ``"1"``, shutdown requests are not
      forwarded to the base class.
    * ``TEST_VARS`` -- value reported by the ``env`` command.
    """

    implementation = "signaltest"
    implementation_version = "0.0"
    banner = ""

    def __init__(self, **kwargs):
        """Create the kernel with an empty list of tracked child processes."""
        # "user_ns" is dropped here rather than forwarded to the base class.
        kwargs.pop("user_ns", None)
        super().__init__(**kwargs)
        # Popen objects created by the "start" command, in creation order.
        self.children = []
        if os.environ.get("NO_SIGTERM_REPLY", None) == "1":
            signal.signal(signal.SIGTERM, signal.SIG_IGN)

    async def shutdown_request(self, stream, ident, parent):
        """Forward the shutdown request unless ``NO_SHUTDOWN_REPLY`` is ``"1"``."""
        if os.environ.get("NO_SHUTDOWN_REPLY") != "1":
            await super().shutdown_request(stream, ident, parent)

    async def do_execute(
        self, code, silent, store_history=True, user_expressions=None, allow_stdin=False
    ):
        """Run one test command and return the reply dictionary.

        Recognised commands (after stripping surrounding whitespace):

        * ``start`` -- spawn ``bash -i -c "sleep 30"`` and report its pid
          under ``user_expressions["pid"]``.
        * ``check`` -- report ``poll()`` of every spawned child under
          ``user_expressions["poll"]``.
        * ``env`` -- report the ``TEST_VARS`` environment variable (empty
          string when unset) under ``user_expressions["env"]``.
        * ``sleep`` -- sleep for 10 seconds and report under
          ``user_expressions["interrupted"]`` whether an interrupt arrived.

        Any other text produces a reply with ``status`` set to ``"error"``.
        """
        code = code.strip()
        reply: dict = {
            "status": "ok",
            "user_expressions": {},
        }
        if code == "start":
            child = Popen(["bash", "-i", "-c", "sleep 30"], stderr=PIPE)  # noqa
            self.children.append(child)
            reply["user_expressions"]["pid"] = self.children[-1].pid
        elif code == "check":
            reply["user_expressions"]["poll"] = [child.poll() for child in self.children]
        elif code == "env":
            reply["user_expressions"]["env"] = os.getenv("TEST_VARS", "")
        elif code == "sleep":
            import ipykernel

            if ipykernel.version_info < (7, 0):
                # ipykernel before anyio.
                try:
                    time.sleep(10)
                except KeyboardInterrupt:
                    reply["user_expressions"]["interrupted"] = True
                else:
                    reply["user_expressions"]["interrupted"] = False
            else:
                # ipykernel after anyio.
                from anyio import create_task_group, open_signal_receiver, sleep

                async def signal_handler(cancel_scope, reply):
                    # Record the first SIGINT and cancel the pending sleep.
                    with open_signal_receiver(signal.SIGINT) as signals:
                        async for _ in signals:
                            reply["user_expressions"]["interrupted"] = True
                            cancel_scope.cancel()
                            return

                async def sleep_then_stop(cancel_scope):
                    await sleep(10)
                    # The sleep finished without an interrupt.  Cancel the
                    # scope so the signal watcher stops too; otherwise the
                    # task group would wait on it indefinitely and no reply
                    # would ever be returned.
                    cancel_scope.cancel()

                # Default result; overwritten by signal_handler on SIGINT.
                reply["user_expressions"]["interrupted"] = False
                async with create_task_group() as tg:
                    tg.start_soon(signal_handler, tg.cancel_scope, reply)
                    tg.start_soon(sleep_then_stop, tg.cancel_scope)
        else:
            reply["status"] = "error"
            reply["ename"] = "Error"
            reply["evalue"] = code
            reply["traceback"] = [f"no such command: {code}"]
        return reply
class SignalTestApp(IPKernelApp):
    """Kernel application that uses ``SignalTestKernel`` as its kernel class."""

    kernel_class = SignalTestKernel  # type:ignore[assignment]

    def init_io(self):
        """Install only the ZMQ display hook.

        Overridden so that stdout/stderr capture stays disabled.
        """
        hook = ZMQDisplayHook(self.session, self.iopub_socket)
        self.displayhook = hook
if __name__ == "__main__":
    # Start slowly on purpose: the artificial delay exercises the client
    # logic that has to cope with slow-starting kernels.
    startup_delay_seconds = 2
    time.sleep(startup_delay_seconds)
    SignalTestApp.launch_instance()