Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions Lib/multiprocessing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,30 @@ def _helper_reraises_exception(ex):
# Class representing a process pool
#

class _ChangeNotifier:
"""Pipe-based notifier that does not depend on POSIX named semaphores.

Replaces the use of multiprocessing.SimpleQueue for the pool's change
notification mechanism. SimpleQueue's internal locks require sem_open(),
which fails on platforms without /dev/shm (AWS Lambda, Android, iOS).
The change notifier never crosses a process boundary, so named semaphores
were never needed.
"""

def __init__(self):
from .connection import Pipe
self._reader, self._writer = Pipe(duplex=False)

def put(self, obj):
self._writer.send_bytes(b'\0')

def get(self):
self._reader.recv_bytes()

def empty(self):
return not self._reader.poll(0)


class _PoolCache(dict):
"""
Class that implements a cache for the Pool class that will notify
Expand Down Expand Up @@ -190,10 +214,7 @@ def __init__(self, processes=None, initializer=None, initargs=(),
self._ctx = context or get_context()
self._setup_queues()
self._taskqueue = queue.SimpleQueue()
# The _change_notifier queue exist to wake up self._handle_workers()
# when the cache (self._cache) is empty or when there is a change in
# the _state variable of the thread that runs _handle_workers.
self._change_notifier = self._ctx.SimpleQueue()
self._change_notifier = _ChangeNotifier()
self._cache = _PoolCache(notifier=self._change_notifier)
self._maxtasksperchild = maxtasksperchild
self._initializer = initializer
Expand Down
68 changes: 68 additions & 0 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2828,6 +2828,9 @@ def raise_large_valuerror(wait):
def identity(x):
return x

def _kill_self():
os.kill(os.getpid(), signal.SIGKILL)

class CountedObject(object):
n_instances = 0

Expand Down Expand Up @@ -3068,6 +3071,43 @@ def test_make_pool(self):
p.close()
p.join()

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_change_notifier_no_semaphore(self):
# gh-134634: The pool's change notifier uses a pipe instead of
# a multiprocessing.SimpleQueue to avoid depending on sem_open(),
# which requires /dev/shm.
if self.TYPE == 'manager':
return
from multiprocessing.pool import _ChangeNotifier
p = self.Pool(2)
try:
self.assertIsInstance(p._change_notifier, _ChangeNotifier)
result = p.map(sqr, list(range(10)))
self.assertEqual(result, list(map(sqr, range(10))))
finally:
p.close()
p.join()

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_change_notifier_drain(self):
# gh-134634: Verify that multiple rapid notifications (from many
# tasks completing and cache emptying) are properly drained and
# the pool shuts down without hanging.
if self.TYPE == 'manager':
return
p = self.Pool(4)
try:
# Submit many short tasks so notifications pile up faster
# than the worker handler can drain them.
results = [p.apply_async(sqr, (i,)) for i in range(200)]
for i, r in enumerate(results):
self.assertEqual(r.get(timeout=support.SHORT_TIMEOUT), sqr(i))
finally:
# close() adds another notification on top of the cache-empty
# ones. If draining is broken, join() hangs.
p.close()
p.join()

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_terminate(self):
# Simulate slow tasks which take "forever" to complete
Expand Down Expand Up @@ -3331,6 +3371,34 @@ def test_pool_worker_lifetime_early_close(self):
for (j, res) in enumerate(results):
self.assertEqual(res.get(), sqr(j))

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_pool_worker_killed_mid_task(self):
# gh-134634: Verify the worker handler detects a killed worker
# via its sentinel fd and replaces it, keeping the pool functional.
p = multiprocessing.Pool(3)
orig_count = len(p._pool)
self.assertEqual(orig_count, 3)

# Kill one worker. The task will never return a result.
p.apply_async(_kill_self)

# Give the worker handler time to detect the death and replace.
deadline = time.monotonic() + support.SHORT_TIMEOUT
while time.monotonic() < deadline:
alive = [w for w in p._pool if w.is_alive()]
if len(alive) >= orig_count:
break
time.sleep(0.1)
self.assertEqual(len(p._pool), orig_count,
"worker handler did not replace the killed worker")

# Pool should still be functional with the replacement worker.
result = p.map(sqr, list(range(5)))
self.assertEqual(result, list(map(sqr, range(5))))

p.terminate()
p.join()

def test_pool_maxtasksperchild_invalid(self):
for value in [0, -1, 0.5, "12"]:
with self.assertRaises(ValueError):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
:class:`multiprocessing.pool.Pool` and :class:`multiprocessing.pool.ThreadPool`
no longer require POSIX named semaphores for the internal change notification
mechanism. This fixes ``Pool`` and ``ThreadPool`` on platforms where
``sem_open()`` is unavailable, such as AWS Lambda (no ``/dev/shm``), Android,
and iOS.
Loading