|
- import os
- import pickle
- import subprocess
- import sys
- from collections import deque
- from importlib.util import module_from_spec, spec_from_file_location
- from typing import Callable, Deque, List, Optional, Set, Tuple, TypeVar, cast
-
- from ._core._eventloop import current_time, get_asynclib, get_cancelled_exc_class
- from ._core._exceptions import BrokenWorkerProcess
- from ._core._subprocesses import open_process
- from ._core._synchronization import CapacityLimiter
- from ._core._tasks import CancelScope, fail_after
- from .abc import ByteReceiveStream, ByteSendStream, Process
- from .lowlevel import RunVar, checkpoint_if_cancelled
- from .streams.buffered import BufferedByteReceiveStream
-
- WORKER_MAX_IDLE_TIME = 300 # 5 minutes
-
- T_Retval = TypeVar('T_Retval')
- _process_pool_workers: RunVar[Set[Process]] = RunVar('_process_pool_workers')
- _process_pool_idle_workers: RunVar[Deque[Tuple[Process, float]]] = RunVar(
- '_process_pool_idle_workers')
- _default_process_limiter: RunVar[CapacityLimiter] = RunVar('_default_process_limiter')
-
-
- async def run_sync(
- func: Callable[..., T_Retval], *args: object, cancellable: bool = False,
- limiter: Optional[CapacityLimiter] = None) -> T_Retval:
- """
- Call the given function with the given arguments in a worker process.
-
- If the ``cancellable`` option is enabled and the task waiting for its completion is cancelled,
- the worker process running it will be abruptly terminated using SIGKILL (or
- ``terminateProcess()`` on Windows).
-
- :param func: a callable
- :param args: positional arguments for the callable
- :param cancellable: ``True`` to allow cancellation of the operation while it's running
- :param limiter: capacity limiter to use to limit the total amount of processes running
- (if omitted, the default limiter is used)
- :return: an awaitable that yields the return value of the function.
-
- """
- async def send_raw_command(pickled_cmd: bytes) -> object:
- try:
- await stdin.send(pickled_cmd)
- response = await buffered.receive_until(b'\n', 50)
- status, length = response.split(b' ')
- if status not in (b'RETURN', b'EXCEPTION'):
- raise RuntimeError(f'Worker process returned unexpected response: {response!r}')
-
- pickled_response = await buffered.receive_exactly(int(length))
- except BaseException as exc:
- workers.discard(process)
- try:
- process.kill()
- with CancelScope(shield=True):
- await process.aclose()
- except ProcessLookupError:
- pass
-
- if isinstance(exc, get_cancelled_exc_class()):
- raise
- else:
- raise BrokenWorkerProcess from exc
-
- retval = pickle.loads(pickled_response)
- if status == b'EXCEPTION':
- assert isinstance(retval, BaseException)
- raise retval
- else:
- return retval
-
- # First pickle the request before trying to reserve a worker process
- await checkpoint_if_cancelled()
- request = pickle.dumps(('run', func, args), protocol=pickle.HIGHEST_PROTOCOL)
-
- # If this is the first run in this event loop thread, set up the necessary variables
- try:
- workers = _process_pool_workers.get()
- idle_workers = _process_pool_idle_workers.get()
- except LookupError:
- workers = set()
- idle_workers = deque()
- _process_pool_workers.set(workers)
- _process_pool_idle_workers.set(idle_workers)
- get_asynclib().setup_process_pool_exit_at_shutdown(workers)
-
- async with (limiter or current_default_process_limiter()):
- # Pop processes from the pool (starting from the most recently used) until we find one that
- # hasn't exited yet
- process: Process
- while idle_workers:
- process, idle_since = idle_workers.pop()
- if process.returncode is None:
- stdin = cast(ByteSendStream, process.stdin)
- buffered = BufferedByteReceiveStream(cast(ByteReceiveStream, process.stdout))
-
- # Prune any other workers that have been idle for WORKER_MAX_IDLE_TIME seconds or
- # longer
- now = current_time()
- killed_processes: List[Process] = []
- while idle_workers:
- if now - idle_workers[0][1] < WORKER_MAX_IDLE_TIME:
- break
-
- process, idle_since = idle_workers.popleft()
- process.kill()
- workers.remove(process)
- killed_processes.append(process)
-
- with CancelScope(shield=True):
- for process in killed_processes:
- await process.aclose()
-
- break
-
- workers.remove(process)
- else:
- command = [sys.executable, '-u', '-m', __name__]
- process = await open_process(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
- try:
- stdin = cast(ByteSendStream, process.stdin)
- buffered = BufferedByteReceiveStream(cast(ByteReceiveStream, process.stdout))
- with fail_after(20):
- message = await buffered.receive(6)
-
- if message != b'READY\n':
- raise BrokenWorkerProcess(
- f'Worker process returned unexpected response: {message!r}')
-
- main_module_path = getattr(sys.modules['__main__'], '__file__', None)
- pickled = pickle.dumps(('init', sys.path, main_module_path),
- protocol=pickle.HIGHEST_PROTOCOL)
- await send_raw_command(pickled)
- except (BrokenWorkerProcess, get_cancelled_exc_class()):
- raise
- except BaseException as exc:
- process.kill()
- raise BrokenWorkerProcess('Error during worker process initialization') from exc
-
- workers.add(process)
-
- with CancelScope(shield=not cancellable):
- try:
- return cast(T_Retval, await send_raw_command(request))
- finally:
- if process in workers:
- idle_workers.append((process, current_time()))
-
-
- def current_default_process_limiter() -> CapacityLimiter:
- """
- Return the capacity limiter that is used by default to limit the number of worker processes.
-
- :return: a capacity limiter object
-
- """
- try:
- return _default_process_limiter.get()
- except LookupError:
- limiter = CapacityLimiter(os.cpu_count() or 2)
- _default_process_limiter.set(limiter)
- return limiter
-
-
- def process_worker() -> None:
- # Redirect standard streams to os.devnull so that user code won't interfere with the
- # parent-worker communication
- stdin = sys.stdin
- stdout = sys.stdout
- sys.stdin = open(os.devnull)
- sys.stdout = open(os.devnull, 'w')
-
- stdout.buffer.write(b'READY\n')
- while True:
- retval = exception = None
- try:
- command, *args = pickle.load(stdin.buffer)
- except EOFError:
- return
- except BaseException as exc:
- exception = exc
- else:
- if command == 'run':
- func, args = args
- try:
- retval = func(*args)
- except BaseException as exc:
- exception = exc
- elif command == 'init':
- main_module_path: Optional[str]
- sys.path, main_module_path = args
- del sys.modules['__main__']
- if main_module_path:
- # Load the parent's main module but as __mp_main__ instead of __main__
- # (like multiprocessing does) to avoid infinite recursion
- try:
- spec = spec_from_file_location('__mp_main__', main_module_path)
- if spec and spec.loader:
- main = module_from_spec(spec)
- spec.loader.exec_module(main)
- sys.modules['__main__'] = main
- except BaseException as exc:
- exception = exc
-
- try:
- if exception is not None:
- status = b'EXCEPTION'
- pickled = pickle.dumps(exception, pickle.HIGHEST_PROTOCOL)
- else:
- status = b'RETURN'
- pickled = pickle.dumps(retval, pickle.HIGHEST_PROTOCOL)
- except BaseException as exc:
- exception = exc
- status = b'EXCEPTION'
- pickled = pickle.dumps(exc, pickle.HIGHEST_PROTOCOL)
-
- stdout.buffer.write(b'%s %d\n' % (status, len(pickled)))
- stdout.buffer.write(pickled)
-
- # Respect SIGTERM
- if isinstance(exception, SystemExit):
- raise exception
-
-
- if __name__ == '__main__':
- process_worker()
|