Skip to content

Commit

Permalink
gh-71936: Fix race condition in multiprocessing.Pool (GH-124973)
Browse files Browse the repository at this point in the history
* gh-71936: Fix race condition in multiprocessing.Pool

Proxes of shared objects register a Finalizer in BaseProxy._incref(), and it
will call BaseProxy._decref() when it is GCed. This may cause a race condition
with Pool(maxtasksperchild=None) on Windows.

A connection would be closed and raised TypeError when a GC occurs between
_ConnectionBase._check_writable() and _ConnectionBase._send_bytes() in
_ConnectionBase.send() in the second or later task, and a new object
is allocated that shares the id() of a previously deleted one.

Instead of using the id() of the token (or the proxy), use a unique,
non-reusable number.

Co-Authored-By: Akinori Hattori <[email protected]>
  • Loading branch information
encukou and hattya authored Nov 13, 2024
1 parent 1e40c5b commit ba088c8
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 13 deletions.
33 changes: 20 additions & 13 deletions Lib/multiprocessing/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,22 +759,29 @@ class BaseProxy(object):
_address_to_local = {}
_mutex = util.ForkAwareThreadLock()

# Each instance gets a `_serial` number. Unlike `id(...)`, this number
# is never reused.
_next_serial = 1

def __init__(self, token, serializer, manager=None,
authkey=None, exposed=None, incref=True, manager_owned=False):
with BaseProxy._mutex:
tls_idset = BaseProxy._address_to_local.get(token.address, None)
if tls_idset is None:
tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
BaseProxy._address_to_local[token.address] = tls_idset
tls_serials = BaseProxy._address_to_local.get(token.address, None)
if tls_serials is None:
tls_serials = util.ForkAwareLocal(), ProcessLocalSet()
BaseProxy._address_to_local[token.address] = tls_serials

self._serial = BaseProxy._next_serial
BaseProxy._next_serial += 1

# self._tls is used to record the connection used by this
# thread to communicate with the manager at token.address
self._tls = tls_idset[0]
self._tls = tls_serials[0]

# self._idset is used to record the identities of all shared
# objects for which the current process owns references and
# self._all_serials is a set used to record the identities of all
# shared objects for which the current process owns references and
# which are in the manager at token.address
self._idset = tls_idset[1]
self._all_serials = tls_serials[1]

self._token = token
self._id = self._token.id
Expand Down Expand Up @@ -857,20 +864,20 @@ def _incref(self):
dispatch(conn, None, 'incref', (self._id,))
util.debug('INCREF %r', self._token.id)

self._idset.add(self._id)
self._all_serials.add(self._serial)

state = self._manager and self._manager._state

self._close = util.Finalize(
self, BaseProxy._decref,
args=(self._token, self._authkey, state,
self._tls, self._idset, self._Client),
args=(self._token, self._serial, self._authkey, state,
self._tls, self._all_serials, self._Client),
exitpriority=10
)

@staticmethod
def _decref(token, authkey, state, tls, idset, _Client):
idset.discard(token.id)
def _decref(token, serial, authkey, state, tls, idset, _Client):
idset.discard(serial)

# check whether manager is still alive
if state is None or state.value == State.STARTED:
Expand Down
1 change: 1 addition & 0 deletions Misc/ACKS
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ Larry Hastings
Tim Hatch
Zac Hatfield-Dodds
Shane Hathaway
Akinori Hattori
Michael Haubenwallner
Janko Hauser
Flavian Hautbois
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a race condition in :class:`multiprocessing.pool.Pool`.

0 comments on commit ba088c8

Please sign in to comment.