forked from fabioz/PyDev.Debugger
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_pydev_ipython_011.py
308 lines (257 loc) · 12.7 KB
/
test_pydev_ipython_011.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
import sys
import unittest
import threading
import os
from _pydev_bundle.pydev_imports import SimpleXMLRPCServer
from _pydev_bundle.pydev_localhost import get_localhost
from _pydev_bundle.pydev_console_utils import StdIn
import socket
import time
from _pydevd_bundle import pydevd_io
import pytest
def eq_(a, b):
if a != b:
raise AssertionError("%s != %s" % (a, b))
try:
from IPython import core
has_ipython = True
except:
has_ipython = False
@pytest.mark.skipif(not has_ipython, reason="IPython not available")
class TestBase(unittest.TestCase):
def setUp(self):
from _pydev_bundle.pydev_ipython_console_011 import get_pydev_frontend
# PyDevFrontEnd depends on singleton in IPython, so you
# can't make multiple versions. So we reuse self.front_end for
# all the tests
self.front_end = get_pydev_frontend(get_localhost(), 0)
from pydev_ipython.inputhook import set_return_control_callback
set_return_control_callback(lambda: True)
self.front_end.clear_buffer()
def tearDown(self):
pass
def add_exec(self, code, expected_more=False):
more = self.front_end.add_exec(code)
eq_(expected_more, more)
def redirect_stdout(self):
from IPython.utils import io
self.original_stdout = sys.stdout
sys.stdout = io.stdout = pydevd_io.IOBuf()
def restore_stdout(self):
from IPython.utils import io
io.stdout = sys.stdout = self.original_stdout
@pytest.mark.skipif(not has_ipython, reason="IPython not available")
class TestPyDevFrontEnd(TestBase):
def testAddExec_1(self):
self.add_exec("if True:", True)
def testAddExec_2(self):
# Change: 'more' must now be controlled in the client side after the initial 'True' returned.
self.add_exec("if True:\n testAddExec_a = 10\n", False)
assert "testAddExec_a" in self.front_end.get_namespace()
def testAddExec_3(self):
assert "testAddExec_x" not in self.front_end.get_namespace()
self.add_exec("if True:\n testAddExec_x = 10\n\n")
assert "testAddExec_x" in self.front_end.get_namespace()
eq_(self.front_end.get_namespace()["testAddExec_x"], 10)
def test_get_namespace(self):
assert "testGetNamespace_a" not in self.front_end.get_namespace()
self.add_exec("testGetNamespace_a = 10")
assert "testGetNamespace_a" in self.front_end.get_namespace()
eq_(self.front_end.get_namespace()["testGetNamespace_a"], 10)
def test_complete(self):
unused_text, matches = self.front_end.complete("%")
assert len(matches) > 1, "at least one magic should appear in completions"
def test_complete_does_not_do_python_matches(self):
# Test that IPython's completions do not do the things that
# PyDev's completions will handle
self.add_exec("testComplete_a = 5")
self.add_exec("testComplete_b = 10")
self.add_exec("testComplete_c = 15")
unused_text, matches = self.front_end.complete("testComplete_")
assert len(matches) == 0
def testGetCompletions_1(self):
# Test the merged completions include the standard completions
self.add_exec("testComplete_a = 5")
self.add_exec("testComplete_b = 10")
self.add_exec("testComplete_c = 15")
res = self.front_end.getCompletions("testComplete_", "testComplete_")
matches = [f[0] for f in res]
assert len(matches) == 3
eq_(set(["testComplete_a", "testComplete_b", "testComplete_c"]), set(matches))
def testGetCompletions_2(self):
# Test that we get IPython completions in results
# we do this by checking kw completion which PyDev does
# not do by default
self.add_exec("def ccc(ABC=123): pass")
res = self.front_end.getCompletions("ccc(", "")
matches = [f[0] for f in res]
assert "ABC=" in matches
def testGetCompletions_3(self):
# Test that magics return IPYTHON magic as type
res = self.front_end.getCompletions("%cd", "%cd")
assert len(res) == 1
eq_(res[0][3], "12") # '12' == IToken.TYPE_IPYTHON_MAGIC
assert len(res[0][1]) > 100, "docstring for %cd should be a reasonably long string"
@pytest.mark.skipif(not has_ipython, reason="IPython not available")
class TestRunningCode(TestBase):
def test_print(self):
self.redirect_stdout()
try:
self.add_exec('print("output")')
eq_(sys.stdout.getvalue(), "output\n")
finally:
self.restore_stdout()
def testQuestionMark_1(self):
self.redirect_stdout()
try:
self.add_exec("?")
found = sys.stdout.getvalue()
if len(found) < 1000:
raise AssertionError("Expected IPython help to be big. Found: %s" % (found,))
finally:
self.restore_stdout()
def testQuestionMark_2(self):
self.redirect_stdout()
try:
self.add_exec("int?")
found = sys.stdout.getvalue()
if "Convert" not in found:
raise AssertionError('Expected to find "Convert" in %s' % (found,))
finally:
self.restore_stdout()
def test_gui(self):
try:
import Tkinter
except:
return
else:
from pydev_ipython.inputhook import get_inputhook
assert get_inputhook() is None
self.add_exec("%gui tk")
# we can't test the GUI works here because we aren't connected to XML-RPC so
# nowhere for hook to run
assert get_inputhook() is not None
self.add_exec("%gui none")
assert get_inputhook() is None
def test_history(self):
"""Make sure commands are added to IPython's history"""
self.redirect_stdout()
try:
self.add_exec("a=1")
self.add_exec("b=2")
_ih = self.front_end.get_namespace()["_ih"]
eq_(_ih[-1], "b=2")
eq_(_ih[-2], "a=1")
self.add_exec("history")
hist = sys.stdout.getvalue().split("\n")
eq_(hist[-1], "")
eq_(hist[-2], "history")
eq_(hist[-3], "b=2")
eq_(hist[-4], "a=1")
finally:
self.restore_stdout()
def test_edit(self):
"""Make sure we can issue an edit command"""
if os.environ.get("TRAVIS") == "true":
# This test is too flaky on travis.
return
from _pydev_bundle.pydev_ipython_console_011 import get_pydev_frontend
called_RequestInput = [False]
called_IPythonEditor = [False]
def start_client_thread(client_port):
class ClientThread(threading.Thread):
def __init__(self, client_port):
threading.Thread.__init__(self)
self.client_port = client_port
def run(self):
class HandleRequestInput:
def RequestInput(self):
called_RequestInput[0] = True
return "\n"
def IPythonEditor(self, name, line):
called_IPythonEditor[0] = (name, line)
return True
handle_request_input = HandleRequestInput()
from _pydev_bundle import pydev_localhost
self.client_server = client_server = SimpleXMLRPCServer(
(pydev_localhost.get_localhost(), self.client_port), logRequests=False
)
client_server.register_function(handle_request_input.RequestInput)
client_server.register_function(handle_request_input.IPythonEditor)
client_server.serve_forever()
def shutdown(self):
return
self.client_server.shutdown()
client_thread = ClientThread(client_port)
client_thread.daemon = True
client_thread.start()
return client_thread
# PyDevFrontEnd depends on singleton in IPython, so you
# can't make multiple versions. So we reuse self.front_end for
# all the tests
s = socket.socket()
s.bind(("", 0))
self.client_port = client_port = s.getsockname()[1]
s.close()
self.front_end = get_pydev_frontend(get_localhost(), client_port)
client_thread = start_client_thread(self.client_port)
orig_stdin = sys.stdin
sys.stdin = StdIn(self, get_localhost(), self.client_port)
try:
filename = "made_up_file.py"
self.add_exec("%edit " + filename)
for i in range(10):
if called_IPythonEditor[0] == (os.path.abspath(filename), "0"):
break
time.sleep(0.1)
if not called_IPythonEditor[0]:
# File "/home/travis/miniconda/lib/python3.3/site-packages/IPython/core/interactiveshell.py", line 2883, in run_code
# exec(code_obj, self.user_global_ns, self.user_ns)
# File "<ipython-input-15-09583ca3bce1>", line 1, in <module>
# get_ipython().magic('edit made_up_file.py')
# File "/home/travis/miniconda/lib/python3.3/site-packages/IPython/core/interactiveshell.py", line 2205, in magic
# return self.run_line_magic(magic_name, magic_arg_s)
# File "/home/travis/miniconda/lib/python3.3/site-packages/IPython/core/interactiveshell.py", line 2126, in run_line_magic
# result = fn(*args,**kwargs)
# File "<string>", line 2, in edit
# File "/home/travis/miniconda/lib/python3.3/site-packages/IPython/core/magic.py", line 193, in <lambda>
# call = lambda f, *a, **k: f(*a, **k)
# File "/home/travis/miniconda/lib/python3.3/site-packages/IPython/core/magics/code.py", line 662, in edit
# self.shell.hooks.editor(filename,lineno)
# File "/home/travis/build/fabioz/PyDev.Debugger/pydev_ipython_console_011.py", line 70, in call_editor
# server.IPythonEditor(filename, str(line))
# File "/home/travis/miniconda/lib/python3.3/xmlrpc/client.py", line 1090, in __call__
# return self.__send(self.__name, args)
# File "/home/travis/miniconda/lib/python3.3/xmlrpc/client.py", line 1419, in __request
# verbose=self.__verbose
# File "/home/travis/miniconda/lib/python3.3/xmlrpc/client.py", line 1132, in request
# return self.single_request(host, handler, request_body, verbose)
# File "/home/travis/miniconda/lib/python3.3/xmlrpc/client.py", line 1143, in single_request
# http_conn = self.send_request(host, handler, request_body, verbose)
# File "/home/travis/miniconda/lib/python3.3/xmlrpc/client.py", line 1255, in send_request
# self.send_content(connection, request_body)
# File "/home/travis/miniconda/lib/python3.3/xmlrpc/client.py", line 1285, in send_content
# connection.endheaders(request_body)
# File "/home/travis/miniconda/lib/python3.3/http/client.py", line 1061, in endheaders
# self._send_output(message_body)
# File "/home/travis/miniconda/lib/python3.3/http/client.py", line 906, in _send_output
# self.send(msg)
# File "/home/travis/miniconda/lib/python3.3/http/client.py", line 844, in send
# self.connect()
# File "/home/travis/miniconda/lib/python3.3/http/client.py", line 822, in connect
# self.timeout, self.source_address)
# File "/home/travis/miniconda/lib/python3.3/socket.py", line 435, in create_connection
# raise err
# File "/home/travis/miniconda/lib/python3.3/socket.py", line 426, in create_connection
# sock.connect(sa)
# ConnectionRefusedError: [Errno 111] Connection refused
# I.e.: just warn that the test failing, don't actually fail.
sys.stderr.write(
"Test failed: this test is brittle in travis because sometimes the connection is refused (as above) and we do not have a callback.\n"
)
return
eq_(called_IPythonEditor[0], (os.path.abspath(filename), "0"))
assert called_RequestInput[0], "Make sure the 'wait' parameter has been respected"
finally:
sys.stdin = orig_stdin
client_thread.shutdown()