This repository has been archived on 2023-03-26. You can view files and clone it, but cannot push or open issues or pull requests.
catfish/catfish/utils/processes.py

50 lines
1.1 KiB
Python

import time
from typing import List
import psutil
# Module-level psutil handle created without an explicit PID (psutil documents
# this as referring to the calling process). It is the starting point for the
# child enumeration and parent walk performed by the helpers in this module.
CURRENT_PROCESS = psutil.Process()
def terminate_processes(procs: List[psutil.Process], timeout=3):
    """Terminate the given processes, escalating to a kill when needed.

    Every process is first asked to terminate. Processes still alive after
    ``timeout`` seconds are killed and waited on for another ``timeout``
    seconds.

    Args:
        procs: Processes to stop.
        timeout: Seconds to wait after each signalling round.

    Returns:
        The processes that are still alive after the last wait that was
        performed (empty when everything exited).
    """
    # https://psutil.readthedocs.io/en/latest/#terminate-my-children
    for p in procs:
        try:
            p.terminate()
        except psutil.NoSuchProcess:
            # The process exited between enumeration and signalling; there is
            # nothing left to signal, and the others must still be handled.
            pass
    _, alive = psutil.wait_procs(procs, timeout=timeout)
    if alive:
        # Escalate: forcibly kill whatever ignored the terminate request.
        for p in alive:
            try:
                p.kill()
            except psutil.NoSuchProcess:
                # Exited on its own after the first wait timed out.
                pass
        _, alive = psutil.wait_procs(alive, timeout=timeout)
    return alive
def terminate_subprocesses():
    """Terminate every descendant of ``CURRENT_PROCESS``.

    Returns:
        Whatever ``terminate_processes`` returns for the recursively
        collected child list.
    """
    descendants = CURRENT_PROCESS.children(recursive=True)
    return terminate_processes(descendants)
def get_root_process() -> psutil.Process:
    """Return the top-most ancestor of ``CURRENT_PROCESS``.

    Walks the parent chain starting at ``CURRENT_PROCESS`` until a process
    reports no parent, and returns that process. If ``CURRENT_PROCESS`` has no
    parent, it is returned itself.
    """
    proc = CURRENT_PROCESS
    while True:
        # Query the parent exactly once per step: asking twice (once for the
        # loop test, once for the assignment) can yield None the second time
        # if the parent exits in between, which would break the next lookup.
        parent = proc.parent()
        if parent is None:
            return proc
        proc = parent
def is_process_running(pid: int) -> bool:
    """Report whether psutil can open a process handle for ``pid``.

    Args:
        pid: Process ID to probe.

    Returns:
        ``False`` when constructing ``psutil.Process(pid)`` raises
        ``psutil.NoSuchProcess``; ``True`` when construction succeeds.
    """
    try:
        # Only the construction matters; the handle itself is discarded.
        psutil.Process(pid)
    except psutil.NoSuchProcess:
        return False
    return True
def wait_for_process_start(pid: int, timeout=None, poll_interval: float = 0.1):
    """Block until ``is_process_running(pid)`` becomes true.

    Args:
        pid: Process ID to wait for.
        timeout: Maximum number of seconds to wait. ``None`` (the default)
            waits indefinitely, matching the historical behaviour.
        poll_interval: Seconds to sleep between checks.

    Raises:
        TimeoutError: If ``timeout`` is given and the process has not been
            seen by the time it elapses.
    """
    # Monotonic clock so wall-clock adjustments cannot stretch or cut the wait.
    deadline = None if timeout is None else time.monotonic() + timeout
    while not is_process_running(pid):
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                "process {} did not start within {} seconds".format(pid, timeout)
            )
        time.sleep(poll_interval)
def wait_for_process_terminate(pid: int, timeout=None, poll_interval: float = 0.1):
    """Block until ``is_process_running(pid)`` becomes false.

    Args:
        pid: Process ID to wait on.
        timeout: Maximum number of seconds to wait. ``None`` (the default)
            waits indefinitely, matching the historical behaviour.
        poll_interval: Seconds to sleep between checks.

    Raises:
        TimeoutError: If ``timeout`` is given and the process is still
            reported as running when it elapses.
    """
    # Monotonic clock so wall-clock adjustments cannot stretch or cut the wait.
    deadline = None if timeout is None else time.monotonic() + timeout
    while is_process_running(pid):
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                "process {} did not terminate within {} seconds".format(pid, timeout)
            )
        time.sleep(poll_interval)