Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Release GIL around C functions #391

Open
wants to merge 8 commits into
base: branch-0.13
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 43 additions & 37 deletions ucp/_libs/core.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import os
import asyncio
import weakref
from functools import partial
from libc.stdint cimport uint64_t, uintptr_t
from libc.stdint cimport uint16_t, uint64_t, uintptr_t
import uuid
import socket
import logging
Expand Down Expand Up @@ -253,19 +253,20 @@ async def listener_handler(ucp_endpoint, ctx, ucp_worker, func, guarantee_msg_or
await _func(pub_ep)


cdef void _listener_callback(ucp_ep_h ep, void *args):
cdef void _listener_callback(ucp_ep_h ep, void *args) nogil:
cdef _listener_callback_args *a = <_listener_callback_args *> args
cdef object ctx = <object> a.py_ctx
cdef object func = <object> a.py_func
asyncio.ensure_future(
listener_handler(
int(<uintptr_t><void*>ep),
ctx,
int(<uintptr_t><void*>a.ucp_worker),
func,
a.guarantee_msg_order
with gil:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be with the gil ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh , any call back needs to be with the gil, correct ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right.

ctx = <object> a.py_ctx
func = <object> a.py_func
asyncio.ensure_future(
listener_handler(
int(<uintptr_t><void*>ep),
ctx,
int(<uintptr_t><void*>a.ucp_worker),
func,
a.guarantee_msg_order
)
)
)


async def _non_blocking_mode(weakref_ctx):
Expand Down Expand Up @@ -338,24 +339,26 @@ cdef class ApplicationContext:

self.config['VERSION'] = get_ucx_version()

memset(&ucp_params, 0, sizeof(ucp_params))
ucp_params.field_mask = (UCP_PARAM_FIELD_FEATURES | # noqa
UCP_PARAM_FIELD_REQUEST_SIZE | # noqa
UCP_PARAM_FIELD_REQUEST_INIT)

# We always request UCP_FEATURE_WAKEUP even when in blocking mode
# See <https://github.com/rapidsai/ucx-py/pull/377>
ucp_params.features = (UCP_FEATURE_TAG | # noqa
UCP_FEATURE_WAKEUP | # noqa
UCP_FEATURE_STREAM)

ucp_params.request_size = sizeof(ucp_request)
ucp_params.request_init = (
<ucp_request_init_callback_t>ucp_request_reset
)
with nogil:
memset(&ucp_params, 0, sizeof(ucp_params))
ucp_params.field_mask = (UCP_PARAM_FIELD_FEATURES | # noqa
UCP_PARAM_FIELD_REQUEST_SIZE | # noqa
UCP_PARAM_FIELD_REQUEST_INIT)

# We always request UCP_FEATURE_WAKEUP even when in blocking mode
# See <https://github.com/rapidsai/ucx-py/pull/377>
ucp_params.features = (UCP_FEATURE_TAG | # noqa
UCP_FEATURE_WAKEUP | # noqa
UCP_FEATURE_STREAM)

ucp_params.request_size = sizeof(ucp_request)
ucp_params.request_init = (
<ucp_request_init_callback_t>ucp_request_reset
)

cdef ucp_config_t *config = read_ucx_config(config_dict)
status = ucp_init(&ucp_params, config, &self.context)
with nogil:
status = ucp_init(&ucp_params, config, &self.context)
assert_ucs_status(status)

worker_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE
Expand Down Expand Up @@ -399,7 +402,8 @@ cdef class ApplicationContext:
if self.blocking_progress_mode:
close(self.epoll_fd)

def create_listener(self, callback_func, port, guarantee_msg_order):
def create_listener(self, callback_func,
port, guarantee_msg_order):
from ..public_api import Listener
self.continuous_ucx_progress()
if port in (None, 0):
Expand All @@ -421,14 +425,16 @@ cdef class ApplicationContext:
Py_INCREF(callback_func)

cdef ucp_listener_params_t params
cdef ucp_listener_accept_callback_t _listener_cb = (
<ucp_listener_accept_callback_t>_listener_callback
)
if c_util_get_ucp_listener_params(&params,
port,
_listener_cb,
<void*> &ret._cb_args):
raise MemoryError("Failed allocation of ucp_ep_params_t")
cdef uint16_t port_int = port
cdef ucp_listener_accept_callback_t _listener_cb
with nogil:
_listener_cb = <ucp_listener_accept_callback_t>_listener_callback
if c_util_get_ucp_listener_params(&params,
port_int,
_listener_cb,
<void*> &ret._cb_args):
with gil:
raise MemoryError("Failed allocation of ucp_ep_params_t")

logging.info("create_listener() - Start listening on port %d" % port)
cdef ucs_status_t status = ucp_listener_create(
Expand Down
8 changes: 4 additions & 4 deletions ucp/_libs/core_dep.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ from posix.unistd cimport close
from cpython.ref cimport PyObject, Py_INCREF, Py_DECREF


cdef extern from "src/c_util.h":
cdef extern from "src/c_util.h" nogil:
ctypedef struct ucp_listener_params_t:
pass

Expand All @@ -37,7 +37,7 @@ cdef extern from "src/c_util.h":
void c_util_get_ucp_ep_params_free(ucp_ep_params_t *param)


cdef extern from "ucp/api/ucp.h":
cdef extern from "ucp/api/ucp.h" nogil:
ctypedef struct ucp_context:
pass

Expand Down Expand Up @@ -202,7 +202,7 @@ cdef extern from "ucp/api/ucp.h":
ucs_status_t ucp_config_modify(ucp_config_t *config, const char *name,
const char *value)

cdef extern from "sys/epoll.h":
cdef extern from "sys/epoll.h" nogil:

cdef enum:
EPOLL_CTL_ADD = 1
Expand Down Expand Up @@ -245,7 +245,7 @@ cdef struct ucp_request:
int64_t received


cdef inline void ucp_request_reset(void* request):
cdef inline void ucp_request_reset(void* request) nogil:
cdef ucp_request *req = <ucp_request*> request
req.finished = False
req.future = NULL
Expand Down
187 changes: 99 additions & 88 deletions ucp/_libs/send_recv.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -53,26 +53,27 @@ cdef create_future_from_comm_status(ucs_status_ptr_t status,
return ret


cdef void _send_callback(void *request, ucs_status_t status):
cdef void _send_callback(void *request, ucs_status_t status) nogil:
cdef ucp_request *req = <ucp_request*> request
if req.future == NULL:
# This callback function was called before ucp_tag_send_nb() returned
req.finished = True
return
cdef object future = <object> req.future
cdef object log_str = <object> req.log_str
if asyncio.get_event_loop().is_closed():
pass
elif status == UCS_ERR_CANCELED:
future.set_exception(UCXCanceled())
elif status != UCS_OK:
msg = "Error sending%s " %(" \"%s\":" % log_str if log_str else ":")
msg += ucs_status_string(status).decode("utf-8")
future.set_exception(UCXError(msg))
else:
future.set_result(True)
Py_DECREF(future)
Py_DECREF(log_str)
with gil:
future = <object> req.future
log_str = <object> req.log_str
if asyncio.get_event_loop().is_closed():
pass
elif status == UCS_ERR_CANCELED:
future.set_exception(UCXCanceled())
elif status != UCS_OK:
msg = "Error sending%s " %(" \"%s\":" % log_str if log_str else ":")
msg += ucs_status_string(status).decode("utf-8")
future.set_exception(UCXError(msg))
else:
future.set_result(True)
Py_DECREF(future)
Py_DECREF(log_str)
ucp_request_reset(request)
ucp_request_free(request)

Expand All @@ -82,43 +83,47 @@ def tag_send(uintptr_t ucp_ep, buffer, size_t nbytes,
cdef ucp_ep_h ep = <ucp_ep_h><uintptr_t>ucp_ep
cdef void *data = <void*><uintptr_t>(get_buffer_data(buffer,
check_writable=False))
cdef ucp_send_callback_t _send_cb = <ucp_send_callback_t>_send_callback
cdef ucs_status_ptr_t status = ucp_tag_send_nb(ep,
data,
nbytes,
ucp_dt_make_contig(1),
tag,
_send_cb)
cdef ucp_send_callback_t _send_cb
cdef ucs_status_ptr_t status
with nogil:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be with the gil since it's a callback executed by C ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our Cython callbacks are acquiring the GIL before they call into Python. So we need to be sure to release the GIL beforehand.

_send_cb = <ucp_send_callback_t>_send_callback
status = ucp_tag_send_nb(ep,
data,
nbytes,
ucp_dt_make_contig(1),
tag,
_send_cb)
return create_future_from_comm_status(status, nbytes, pending_msg)


cdef void _tag_recv_callback(void *request, ucs_status_t status,
ucp_tag_recv_info_t *info):
ucp_tag_recv_info_t *info) nogil:
cdef ucp_request *req = <ucp_request*> request
if req.future == NULL:
# This callback function was called before ucp_tag_recv_nb() returned
req.finished = True
req.received = info.length
return
cdef object future = <object> req.future
cdef object log_str = <object> req.log_str
msg = "Error receiving%s " %(" \"%s\":" % log_str if log_str else ":")
if asyncio.get_event_loop().is_closed():
pass
elif status == UCS_ERR_CANCELED:
future.set_exception(UCXCanceled())
elif status != UCS_OK:
msg += ucs_status_string(status).decode("utf-8")
future.set_exception(UCXError(msg))
elif info.length != req.expected_receive:
msg += "length mismatch: %d (got) != %d (expected)" % (
info.length, req.expected_receive
)
future.set_exception(UCXError(msg))
else:
future.set_result(True)
Py_DECREF(future)
Py_DECREF(log_str)
with gil:
future = <object> req.future
log_str = <object> req.log_str
msg = "Error receiving%s " %(" \"%s\":" % log_str if log_str else ":")
if asyncio.get_event_loop().is_closed():
pass
elif status == UCS_ERR_CANCELED:
future.set_exception(UCXCanceled())
elif status != UCS_OK:
msg += ucs_status_string(status).decode("utf-8")
future.set_exception(UCXError(msg))
elif info.length != req.expected_receive:
msg += "length mismatch: %d (got) != %d (expected)" % (
info.length, req.expected_receive
)
future.set_exception(UCXError(msg))
else:
future.set_result(True)
Py_DECREF(future)
Py_DECREF(log_str)
ucp_request_reset(request)
ucp_request_free(request)

Expand All @@ -128,59 +133,64 @@ def tag_recv(uintptr_t ucp_worker, buffer, size_t nbytes,
cdef ucp_worker_h worker = <ucp_worker_h><uintptr_t>ucp_worker
cdef void *data = <void*><uintptr_t>(get_buffer_data(buffer,
check_writable=True))
cdef ucp_tag_recv_callback_t _tag_recv_cb = (
<ucp_tag_recv_callback_t>_tag_recv_callback
)
cdef ucs_status_ptr_t status = ucp_tag_recv_nb(worker,
data,
nbytes,
ucp_dt_make_contig(1),
tag,
-1,
_tag_recv_cb)
cdef ucp_tag_recv_callback_t _tag_recv_cb
cdef ucs_status_ptr_t status
with nogil:
_tag_recv_cb = <ucp_tag_recv_callback_t>_tag_recv_callback
status = ucp_tag_recv_nb(worker,
data,
nbytes,
ucp_dt_make_contig(1),
tag,
-1,
_tag_recv_cb)
return create_future_from_comm_status(status, nbytes, pending_msg)


def stream_send(uintptr_t ucp_ep, buffer, size_t nbytes, pending_msg=None):
cdef ucp_ep_h ep = <ucp_ep_h><uintptr_t>ucp_ep
cdef void *data = <void*><uintptr_t>(get_buffer_data(buffer,
check_writable=False))
cdef ucp_send_callback_t _send_cb = <ucp_send_callback_t>_send_callback
cdef ucs_status_ptr_t status = ucp_stream_send_nb(ep,
data,
nbytes,
ucp_dt_make_contig(1),
_send_cb,
0)
cdef ucp_send_callback_t _send_cb
cdef ucs_status_ptr_t status
with nogil:
_send_cb = <ucp_send_callback_t>_send_callback
status = ucp_stream_send_nb(ep,
data,
nbytes,
ucp_dt_make_contig(1),
_send_cb,
0)
return create_future_from_comm_status(status, nbytes, pending_msg)


cdef void _stream_recv_callback(void *request, ucs_status_t status,
size_t length):
size_t length) nogil:
cdef ucp_request *req = <ucp_request*> request
if req.future == NULL:
# This callback function was called before ucp_stream_recv_nb() returned
req.finished = True
req.received = length
return
cdef object future = <object> req.future
cdef object log_str = <object> req.log_str
msg = "Error receiving %s" %(" \"%s\":" % log_str if log_str else ":")
if asyncio.get_event_loop().is_closed():
pass
elif status == UCS_ERR_CANCELED:
future.set_exception(UCXCanceled())
elif status != UCS_OK:
msg += ucs_status_string(status).decode("utf-8")
future.set_exception(UCXError(msg))
elif length != req.expected_receive:
msg += "length mismatch: %d (got) != %d (expected)" % (
length, req.expected_receive)
future.set_exception(UCXError(msg))
else:
future.set_result(True)
Py_DECREF(future)
Py_DECREF(log_str)
with gil:
future = <object> req.future
log_str = <object> req.log_str
msg = "Error receiving %s" %(" \"%s\":" % log_str if log_str else ":")
if asyncio.get_event_loop().is_closed():
pass
elif status == UCS_ERR_CANCELED:
future.set_exception(UCXCanceled())
elif status != UCS_OK:
msg += ucs_status_string(status).decode("utf-8")
future.set_exception(UCXError(msg))
elif length != req.expected_receive:
msg += "length mismatch: %d (got) != %d (expected)" % (
length, req.expected_receive)
future.set_exception(UCXError(msg))
else:
future.set_result(True)
Py_DECREF(future)
Py_DECREF(log_str)
ucp_request_reset(request)
ucp_request_free(request)

Expand All @@ -191,14 +201,15 @@ def stream_recv(uintptr_t ucp_ep, buffer, size_t nbytes, pending_msg=None):
check_writable=True))
cdef size_t length
cdef ucp_request *req
cdef ucp_stream_recv_callback_t _stream_recv_cb = (
<ucp_stream_recv_callback_t>_stream_recv_callback
)
cdef ucs_status_ptr_t status = ucp_stream_recv_nb(ep,
data,
nbytes,
ucp_dt_make_contig(1),
_stream_recv_cb,
&length,
0)
cdef ucp_stream_recv_callback_t _stream_recv_cb
cdef ucs_status_ptr_t status
with nogil:
_stream_recv_cb = <ucp_stream_recv_callback_t>_stream_recv_callback
status = ucp_stream_recv_nb(ep,
data,
nbytes,
ucp_dt_make_contig(1),
_stream_recv_cb,
&length,
0)
return create_future_from_comm_status(status, nbytes, pending_msg)
Loading