import psutil from catfish.project import Project from catfish.utils.processes import is_process_running from catfish.worker import BASE_SOCKET_DIR from catfish.worker.server import PayloadType, read_logs_for_process, send_to_server from tests import BaseWorkerTestCase class WorkerServerTestCase(BaseWorkerTestCase): def test_ping(self): response = send_to_server(PayloadType.PING, {}) self.assertEqual(response, {"ping": "pong"}) class ProcessWorkerTestCase(BaseWorkerTestCase): def setUp(self): super().setUp() self.project = Project(self.EXAMPLE_DIR) self.process = self.project.get_process("bg") def test_server_creates_process(self): response = send_to_server( PayloadType.PROCESS, {"path": str(self.project.root), "process": str(self.process.name)}, ) self.assertTrue(is_process_running(response["pid"])) def test_additional_environment(self): response = send_to_server( PayloadType.PROCESS, {"path": str(self.project.root), "process": str(self.process.name)}, ) env = psutil.Process(response["pid"]).environ() self.assertEqual(env["PYTHONUNBUFFERED"], "1") self.assertEqual(env["CATFISH_IDENT"], self.process.ident) path_dirs = env["PATH"].split(":") for path in self.project.get_extra_path(): self.assertIn(str(path), path_dirs) class ProcessLogsTestCase(BaseWorkerTestCase): def setUp(self): super().setUp() self.project = Project(self.EXAMPLE_DIR) self.process = self.project.get_process("bg") def test_creates_socket(self): send_to_server( PayloadType.PROCESS, {"path": str(self.project.root), "process": str(self.process.name)}, ) stdout_socket = BASE_SOCKET_DIR.joinpath(self.process.logs_socket) self.assertTrue(stdout_socket.exists()) def test_gets_logs(self): send_to_server( PayloadType.PROCESS, {"path": str(self.project.root), "process": str(self.process.name)}, ) stdout_iter = read_logs_for_process(self.process) for i in range(3): self.assertEqual(next(stdout_iter), "Round {}".format(i))