Skip to content

Commit 248823e

Browse files
committed
concurrent.futures.Executor.map: raise after executor shutdown
1 parent da3fea3 commit 248823e

File tree

2 files changed

+16
-9
lines changed

2 files changed

+16
-9
lines changed

Lib/concurrent/futures/_base.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -614,12 +614,10 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
614614
fs = collections.deque(
615615
self.submit(fn, *args) for args in islice(zipped_iterables, buffersize)
616616
)
617+
executor = self
617618
else:
618619
fs = [self.submit(fn, *args) for args in zipped_iterables]
619-
620-
# Use a weak reference to ensure that the executor can be garbage
621-
# collected independently of the result_iterator closure.
622-
executor_weakref = weakref.ref(self)
620+
executor = None
623621

624622
# Yield must be hidden in closure so that the futures are submitted
625623
# before the first iterator value is required.
@@ -628,11 +626,7 @@ def result_iterator():
628626
# reverse to keep finishing order
629627
fs.reverse()
630628
while fs:
631-
if (
632-
buffersize
633-
and (executor := executor_weakref())
634-
and (args := next(zipped_iterables, None))
635-
):
629+
if buffersize and (args := next(zipped_iterables, None)):
636630
fs.appendleft(executor.submit(fn, *args))
637631
# Careful not to keep a reference to the popped future
638632
if timeout is None:

Lib/test/test_concurrent_futures/executor.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,19 @@ def test_map_buffersize_on_multiple_infinite_iterables(self):
139139
self.assertEqual(next(res, None), 2)
140140
self.assertEqual(next(res, None), 4)
141141

142+
def test_map_buffersize_raise_on_shutdown(self):
143+
for start_iteration in (False, True):
144+
with self.subTest(start_iteration=start_iteration):
145+
with self.executor_type(max_workers=1) as executor:
146+
results = executor.map(str, range(4), buffersize=1)
147+
if start_iteration:
148+
next(results)
149+
with self.assertRaisesRegex(
150+
RuntimeError,
151+
"cannot schedule new futures after shutdown",
152+
):
153+
next(results)
154+
142155
def test_map_buffersize_on_empty_iterable(self):
143156
res = self.executor.map(str, [], buffersize=2)
144157
self.assertIsNone(next(res, None))

0 commit comments

Comments
 (0)