This repository has been archived on 2023-03-26. You can view files and clone it, but cannot push or open issues or pull requests.
catfish/tests/test_utils/test_processes.py

47 lines
1.5 KiB
Python

import os
import subprocess
import psutil
from catfish.utils import processes
from tests import BaseTestCase
class RootProcessTestCase(BaseTestCase):
    """Checks for ``processes.get_root_process`` and ``processes.CURRENT_PROCESS``."""

    def test_root_processes(self):
        # The root process must sit at the top of the tree (no parent) ...
        root = processes.get_root_process()
        self.assertIsNone(root.parent())

        # ... and the process running this test must be one of its descendants.
        descendant_pids = [child.pid for child in root.children(recursive=True)]
        self.assertIn(os.getpid(), descendant_pids)

    def test_current_process(self):
        # CURRENT_PROCESS should carry the PID of the interpreter running this test.
        own_pid = os.getpid()
        self.assertEqual(processes.CURRENT_PROCESS.pid, own_pid)
class TerminateProcessesTestCase(BaseTestCase):
    """Checks that ``processes.terminate_processes`` stops a batch of children."""

    def test_kills_lots_of_processes(self):
        created_processes = []
        try:
            # Spawn long-running children; ``yes`` never exits on its own.
            for _ in range(10):
                created_processes.append(
                    subprocess.Popen("yes", stdout=subprocess.PIPE)
                )

            # Sanity check: every child is alive before we try to terminate it.
            for proc in created_processes:
                self.assertIsNone(proc.poll())

            still_alive = processes.terminate_processes(
                [psutil.Process(proc.pid) for proc in created_processes]
            )
            self.assertIsNone(still_alive)

            # NOTE(review): a child killed by a signal normally reports a
            # negative return code; 0 here presumably means the child was
            # already reaped by terminate_processes, so Popen cannot read the
            # real status - confirm against the implementation.
            for proc in created_processes:
                self.assertEqual(proc.poll(), 0)
        finally:
            # Never leak children or pipe descriptors, even when an assertion
            # above fails or terminate_processes raises.
            for proc in created_processes:
                if proc.poll() is None:
                    proc.kill()
                    proc.wait()
                proc.stdout.close()
class RunningProcessTestCase(BaseTestCase):
    """Checks for ``processes.is_process_running``."""

    def test_current_process_running(self):
        # The process executing this test is, by definition, running.
        own_pid = os.getpid()
        self.assertTrue(processes.is_process_running(own_pid))

    def test_root_process_running(self):
        # The PID of whatever get_root_process() returns must be reported as running.
        root = processes.get_root_process()
        self.assertTrue(processes.is_process_running(root.pid))

    def test_impossible_running_process(self):
        # A PID this large is not expected to belong to any live process.
        bogus_pid = 2938420983094892
        self.assertFalse(processes.is_process_running(bogus_pid))