-
-
Notifications
You must be signed in to change notification settings - Fork 377
/
Copy pathtest_async.py
74 lines (60 loc) · 2.24 KB
/
test_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""Test async/await integration"""
import os
import time
import pytest
from .test_message_spec import validate_message
from .utils import TIMEOUT, execute, flush_channels, start_new_kernel
# Kernel manager (KM) and kernel client (KC) shared by the tests in this
# module. Both start out unset and are rebound by the autouse ``_setup_env``
# fixture before each test runs.
KM = None
KC = None
@pytest.fixture(autouse=True)
def _setup_env():
    """Start a fresh kernel for every test and shut it down afterwards.

    The manager and client returned by ``start_new_kernel()`` are published
    through the module-level ``KM`` and ``KC`` globals; nothing is yielded to
    the test itself. Because the fixture is ``autouse``, it wraps every test
    in this module.
    """
    global KM, KC
    manager, client = start_new_kernel()
    KM, KC = manager, client
    try:
        flush_channels(client)
        yield
    finally:
        # Teardown must always reach shutdown_kernel(): a failure while
        # flushing during setup or while stopping the channels would
        # otherwise skip it and leave the kernel running.
        try:
            client.stop_channels()
        finally:
            manager.shutdown_kernel(now=True)
def test_async_await():
    """A cell using top-level ``await`` must execute with status ``ok``."""
    flush_channels(KC)
    code = "import asyncio; await asyncio.sleep(0.1)"
    # Only the reply content matters here; the message id is not needed.
    _, reply = execute(code, KC)
    assert reply["status"] == "ok", reply
# FIXME: @pytest.mark.parametrize("asynclib", ["asyncio", "trio", "curio"])
@pytest.mark.skipif(os.name == "nt", reason="Cannot interrupt on Windows")
@pytest.mark.parametrize("asynclib", ["asyncio"])
def test_async_interrupt(asynclib, request):
    """Interrupting a cell that is blocked in ``await`` yields an error reply.

    The test switches ``%autoawait`` to *asynclib*, starts a cell that prints
    ``begin`` and then sleeps for 5 seconds inside the async library, waits
    until ``begin`` has arrived on iopub, interrupts the kernel, and checks
    that the shell reply reports ``CancelledError`` or ``KeyboardInterrupt``.

    Skipped on Windows and when *asynclib* cannot be imported.
    """
    assert KC is not None
    assert KM is not None
    try:
        __import__(asynclib)
    except ImportError:
        pytest.skip(f"Requires {asynclib}")
    # Put the kernel back on the asyncio runner once the test is over.
    request.addfinalizer(lambda: execute("%autoawait asyncio", KC))

    flush_channels(KC)
    msg_id, content = execute("%autoawait " + asynclib, KC)
    assert content["status"] == "ok", content

    flush_channels(KC)
    msg_id = KC.execute(f"print('begin'); import {asynclib}; await {asynclib}.sleep(5)")
    busy = KC.get_iopub_msg(timeout=TIMEOUT)
    validate_message(busy, "status", msg_id)
    assert busy["content"]["execution_state"] == "busy"
    echo = KC.get_iopub_msg(timeout=TIMEOUT)
    validate_message(echo, "execute_input")

    # wait for the stream output to be sure kernel is in the async block
    stream = ""
    t0 = time.monotonic()
    while True:
        msg = KC.get_iopub_msg(timeout=TIMEOUT)
        validate_message(msg, "stream")
        stream += msg["content"]["text"]
        # The text may arrive split over several stream messages, so only
        # require that what has been received so far is a prefix of it.
        assert "begin\n".startswith(stream)
        if stream == "begin\n":
            break
        if time.monotonic() - t0 > TIMEOUT:
            raise TimeoutError(f"incomplete stream output after {TIMEOUT}s: {stream!r}")

    KM.interrupt_kernel()
    # Bound the wait like every other receive in this test, so a lost
    # interrupt fails the test instead of hanging it forever.
    reply = KC.get_shell_msg(timeout=TIMEOUT)["content"]
    assert reply["status"] == "error", reply
    assert reply["ename"] in {"CancelledError", "KeyboardInterrupt"}

    flush_channels(KC)