diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index f979890170b1a1..fd06593e864a8d 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -147,6 +147,30 @@ def _helper_reraises_exception(ex): # Class representing a process pool # +class _ChangeNotifier: + """Pipe-based notifier that does not depend on POSIX named semaphores. + + Replaces the use of multiprocessing.SimpleQueue for the pool's change + notification mechanism. SimpleQueue's internal locks require sem_open(), + which fails on platforms without /dev/shm (AWS Lambda, Android, iOS). + The change notifier never crosses a process boundary, so named semaphores + were never needed. + """ + + def __init__(self): + from .connection import Pipe + self._reader, self._writer = Pipe(duplex=False) + + def put(self, obj): + self._writer.send_bytes(b'\0') + + def get(self): + self._reader.recv_bytes() + + def empty(self): + return not self._reader.poll(0) + + class _PoolCache(dict): """ Class that implements a cache for the Pool class that will notify @@ -190,10 +214,7 @@ def __init__(self, processes=None, initializer=None, initargs=(), self._ctx = context or get_context() self._setup_queues() self._taskqueue = queue.SimpleQueue() - # The _change_notifier queue exist to wake up self._handle_workers() - # when the cache (self._cache) is empty or when there is a change in - # the _state variable of the thread that runs _handle_workers. - self._change_notifier = self._ctx.SimpleQueue() + self._change_notifier = _ChangeNotifier() self._cache = _PoolCache(notifier=self._change_notifier) self._maxtasksperchild = maxtasksperchild self._initializer = initializer diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 490c7ae5e8076c..938574ab666c0b 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2828,6 +2828,9 @@ def raise_large_valuerror(wait): def identity(x): return x +def _kill_self(): + os.kill(os.getpid(), signal.SIGKILL) + class CountedObject(object): n_instances = 0 @@ -3068,6 +3071,43 @@ def test_make_pool(self): p.close() p.join() + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_change_notifier_no_semaphore(self): + # gh-134634: The pool's change notifier uses a pipe instead of + # a multiprocessing.SimpleQueue to avoid depending on sem_open(), + # which requires /dev/shm. + if self.TYPE == 'manager': + return + from multiprocessing.pool import _ChangeNotifier + p = self.Pool(2) + try: + self.assertIsInstance(p._change_notifier, _ChangeNotifier) + result = p.map(sqr, list(range(10))) + self.assertEqual(result, list(map(sqr, range(10)))) + finally: + p.close() + p.join() + + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_change_notifier_drain(self): + # gh-134634: Verify that multiple rapid notifications (from many + # tasks completing and cache emptying) are properly drained and + # the pool shuts down without hanging. + if self.TYPE == 'manager': + return + p = self.Pool(4) + try: + # Submit many short tasks so notifications pile up faster + # than the worker handler can drain them. + results = [p.apply_async(sqr, (i,)) for i in range(200)] + for i, r in enumerate(results): + self.assertEqual(r.get(timeout=support.SHORT_TIMEOUT), sqr(i)) + finally: + # close() adds another notification on top of the cache-empty + # ones. If draining is broken, join() hangs. + p.close() + p.join() + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() def test_terminate(self): # Simulate slow tasks which take "forever" to complete @@ -3331,6 +3371,34 @@ def test_pool_worker_lifetime_early_close(self): for (j, res) in enumerate(results): self.assertEqual(res.get(), sqr(j)) + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_pool_worker_killed_mid_task(self): + # gh-134634: Verify the worker handler detects a killed worker + # via its sentinel fd and replaces it, keeping the pool functional. + p = multiprocessing.Pool(3) + orig_count = len(p._pool) + self.assertEqual(orig_count, 3) + + # Kill one worker. The task will never return a result. + p.apply_async(_kill_self) + + # Give the worker handler time to detect the death and replace. + deadline = time.monotonic() + support.SHORT_TIMEOUT + while time.monotonic() < deadline: + alive = [w for w in p._pool if w.is_alive()] + if len(alive) >= orig_count: + break + time.sleep(0.1) + self.assertEqual(len(p._pool), orig_count, + "worker handler did not replace the killed worker") + + # Pool should still be functional with the replacement worker. + result = p.map(sqr, list(range(5))) + self.assertEqual(result, list(map(sqr, range(5)))) + + p.terminate() + p.join() + def test_pool_maxtasksperchild_invalid(self): for value in [0, -1, 0.5, "12"]: with self.assertRaises(ValueError): diff --git a/Misc/NEWS.d/next/Library/2026-04-03-20-00-00.gh-issue-134634.Xk9fRe.rst b/Misc/NEWS.d/next/Library/2026-04-03-20-00-00.gh-issue-134634.Xk9fRe.rst new file mode 100644 index 00000000000000..9637f76710d656 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-04-03-20-00-00.gh-issue-134634.Xk9fRe.rst @@ -0,0 +1,5 @@ +:class:`multiprocessing.pool.Pool` and :class:`multiprocessing.pool.ThreadPool` +no longer require POSIX named semaphores for the internal change notification +mechanism. This fixes ``Pool`` and ``ThreadPool`` on platforms where +``sem_open()`` is unavailable, such as AWS Lambda (no ``/dev/shm``), Android, +and iOS.