|
62 | 62 | PyBytes_Size,
|
63 | 63 | PyErr_CheckSignals,
|
64 | 64 | )
|
| 65 | +from cython.cimports.cpython.buffer import Py_buffer, PyBuffer_IsContiguous |
65 | 66 | from cython.cimports.cpython.memoryview import PyMemoryView_GET_BUFFER
|
66 | 67 | from cython.cimports.libc.errno import EAGAIN, EINTR, ENAMETOOLONG, ENOENT, ENOTSOCK
|
67 | 68 |
|
|
141 | 142 | )
|
142 | 143 | from cython.cimports.zmq.backend.cython.libzmq import zmq_errno as _zmq_errno
|
143 | 144 | from cython.cimports.zmq.backend.cython.libzmq import zmq_poll as zmq_poll_c
|
144 |
| -from cython.cimports.zmq.utils.buffers import asbuffer_r |
145 | 145 |
|
146 | 146 | import zmq
|
147 | 147 | from zmq.constants import SocketOption, _OptType
|
@@ -247,6 +247,18 @@ def _copy_zmq_msg_bytes(zmq_msg: pointer(zmq_msg_t)) -> bytes:
|
247 | 247 | return PyBytes_FromStringAndSize(data_c, data_len_c)
|
248 | 248 |
|
249 | 249 |
|
| 250 | +@cfunc |
| 251 | +@inline |
| 252 | +def _asbuffer(buf, data_c: pointer(p_void)) -> size_t: |
| 253 | + """Get a C buffer from a memoryview""" |
| 254 | + view = memoryview(buf) |
| 255 | + pybuf: pointer(Py_buffer) = PyMemoryView_GET_BUFFER(view) |
| 256 | + if not PyBuffer_IsContiguous(pybuf, 'A'): |
| 257 | + raise BufferError("memoryview: underlying buffer is not contiguous") |
| 258 | + data_c[0] = pybuf.buf |
| 259 | + return pybuf.len |
| 260 | + |
| 261 | + |
250 | 262 | _gc = None
|
251 | 263 |
|
252 | 264 |
|
@@ -288,7 +300,7 @@ def __init__(
|
288 | 300 | self._failed_init = False
|
289 | 301 | return
|
290 | 302 |
|
291 |
| - asbuffer_r(data, cast(pointer(p_void), address(data_c)), address(data_len_c)) |
| 303 | + data_len_c = _asbuffer(data, cast(pointer(p_void), address(data_c))) |
292 | 304 |
|
293 | 305 | # copy unspecified, apply copy_threshold
|
294 | 306 | if copy is None:
|
@@ -1193,7 +1205,12 @@ def recv(self, flags=0, copy: bint = True, track: bint = False):
|
1193 | 1205 | return _recv_copy(self.handle, flags)
|
1194 | 1206 | else:
|
1195 | 1207 | frame = _recv_frame(self.handle, flags, track)
|
1196 |
| - frame.more = self.get(zmq.RCVMORE) |
| 1208 | + more: bint = False |
| 1209 | + sz: size_t = sizeof(bint) |
| 1210 | + _getsockopt( |
| 1211 | + self.handle, ZMQ_RCVMORE, cast(p_void, address(more)), address(sz) |
| 1212 | + ) |
| 1213 | + frame.more = more |
1197 | 1214 | return frame
|
1198 | 1215 |
|
1199 | 1216 | def recv_into(self, buffer, /, *, nbytes=0, flags=0) -> C.int:
|
@@ -1238,22 +1255,20 @@ def recv_into(self, buffer, /, *, nbytes=0, flags=0) -> C.int:
|
1238 | 1255 | """
|
1239 | 1256 | c_flags: C.int = flags
|
1240 | 1257 | _check_closed(self)
|
1241 |
| - view = memoryview(buffer) |
1242 |
| - if not view.contiguous: |
1243 |
| - raise ValueError("Can only recv_into contiguous buffers") |
1244 |
| - if nbytes < 0: |
| 1258 | + c_nbytes: C.int = nbytes |
| 1259 | + if c_nbytes < 0: |
1245 | 1260 | raise ValueError(f"{nbytes=} must be non-negative")
|
1246 |
| - view_bytes: size_t = view.nbytes |
1247 |
| - c_nbytes: size_t = nbytes |
1248 |
| - if nbytes == 0: |
1249 |
| - c_nbytes = view_bytes |
1250 |
| - elif c_nbytes > view_bytes: |
1251 |
| - raise ValueError(f"{nbytes=} too big for memoryview of {view_bytes}B") |
1252 |
| - |
| 1261 | + view = memoryview(buffer) |
1253 | 1262 | # get C buffer
|
1254 |
| - py_buf = PyMemoryView_GET_BUFFER(view) |
| 1263 | + py_buf: pointer(Py_buffer) = PyMemoryView_GET_BUFFER(view) |
1255 | 1264 | if py_buf.readonly:
|
1256 | 1265 | raise ValueError("Cannot recv_into readonly buffer")
|
| 1266 | + if not PyBuffer_IsContiguous(py_buf, 'A'): |
| 1267 | + raise ValueError("Can only recv_into contiguous buffer") |
| 1268 | + if nbytes == 0: |
| 1269 | + c_nbytes = py_buf.len |
| 1270 | + elif c_nbytes > py_buf.len: |
| 1271 | + raise ValueError(f"{nbytes=} too big for memoryview of {py_buf.len}B") |
1257 | 1272 |
|
1258 | 1273 | # call zmq_recv, with retries
|
1259 | 1274 | while True:
|
@@ -1391,11 +1406,10 @@ def _send_copy(handle: p_void, buf, flags: C.int = 0):
|
1391 | 1406 | """Send a message on this socket by copying its content."""
|
1392 | 1407 | rc: C.int
|
1393 | 1408 | msg = declare(zmq_msg_t)
|
1394 |
| - c_bytes = declare(p_char) |
1395 |
| - c_bytes_len: Py_ssize_t = 0 |
| 1409 | + c_bytes = declare(p_void) |
1396 | 1410 |
|
1397 | 1411 | # copy to c array:
|
1398 |
| - asbuffer_r(buf, cast(pointer(p_void), address(c_bytes)), address(c_bytes_len)) |
| 1412 | + c_bytes_len = _asbuffer(buf, address(c_bytes)) |
1399 | 1413 |
|
1400 | 1414 | # Copy the msg before sending. This avoids any complications with
|
1401 | 1415 | # the GIL, etc.
|
@@ -1976,21 +1990,21 @@ def monitored_queue(
|
1976 | 1990 | in_msg = declare(zmq_msg_t)
|
1977 | 1991 | out_msg = declare(zmq_msg_t)
|
1978 | 1992 | swap_ids: bint
|
1979 |
| - msg_c: p_char = NULL |
| 1993 | + msg_c: p_void = NULL |
1980 | 1994 | msg_c_len = declare(Py_ssize_t)
|
1981 | 1995 | rc: C.int
|
1982 | 1996 |
|
1983 | 1997 | # force swap_ids if both ROUTERs
|
1984 | 1998 | swap_ids = in_socket.type == ZMQ_ROUTER and out_socket.type == ZMQ_ROUTER
|
1985 | 1999 |
|
1986 | 2000 | # build zmq_msg objects from str prefixes
|
1987 |
| - asbuffer_r(in_prefix, cast(pointer(p_void), address(msg_c)), address(msg_c_len)) |
| 2001 | + msg_c_len = _asbuffer(in_prefix, address(msg_c)) |
1988 | 2002 | rc = zmq_msg_init_size(address(in_msg), msg_c_len)
|
1989 | 2003 | _check_rc(rc)
|
1990 | 2004 |
|
1991 | 2005 | memcpy(zmq_msg_data(address(in_msg)), msg_c, zmq_msg_size(address(in_msg)))
|
1992 | 2006 |
|
1993 |
| - asbuffer_r(out_prefix, cast(pointer(p_void), address(msg_c)), address(msg_c_len)) |
| 2007 | + msg_c_len = _asbuffer(out_prefix, address(msg_c)) |
1994 | 2008 |
|
1995 | 2009 | rc = zmq_msg_init_size(address(out_msg), msg_c_len)
|
1996 | 2010 | _check_rc(rc)
|
|
0 commit comments