"""Tests that exercise the catfish worker server through ``send_to_server``."""

from catfish.project import Project
from catfish.utils.processes import is_process_running
from catfish.worker import BASE_SOCKET_DIR
from catfish.worker.server import PayloadType, read_logs_for_process, send_to_server
from tests import BaseWorkerTestCase


class WorkerServerTestCase(BaseWorkerTestCase):
    """Checks on the responses returned for PROCESS and PING payloads."""

    def test_server_creates_process(self):
        """A PROCESS request returns a ``pid`` that belongs to a running process."""
        example_project = Project(self.EXAMPLE_DIR)
        target = example_project.get_process("bg")
        payload = {
            "path": str(example_project.root),
            "process": str(target.name),
        }

        reply = send_to_server(PayloadType.PROCESS, payload)

        pid = reply["pid"]
        self.assertTrue(is_process_running(pid))

    def test_ping(self):
        """A PING request with an empty payload is answered with ``{"ping": "pong"}``."""
        reply = send_to_server(PayloadType.PING, {})
        expected = {"ping": "pong"}
        self.assertEqual(reply, expected)


class ProcessLogsTestCase(BaseWorkerTestCase):
    """Checks on the logs socket and log output of the example ``bg`` process."""

    def setUp(self):
        """Load the example project and look up its ``bg`` process for each test."""
        super().setUp()
        self.project = Project(self.EXAMPLE_DIR)
        self.process = self.project.get_process("bg")

    def test_creates_socket(self):
        """After a PROCESS request, the process's logs socket path exists."""
        payload = {
            "path": str(self.project.root),
            "process": str(self.process.name),
        }
        send_to_server(PayloadType.PROCESS, payload)

        socket_path = BASE_SOCKET_DIR.joinpath(self.process.logs_socket)
        self.assertTrue(socket_path.exists())

    def test_gets_logs(self):
        """The first three log entries read back are ``Round 0`` to ``Round 2``."""
        payload = {
            "path": str(self.project.root),
            "process": str(self.process.name),
        }
        send_to_server(PayloadType.PROCESS, payload)

        log_lines = read_logs_for_process(self.process)
        for round_number in range(3):
            # Pull one entry at a time so an exhausted iterator surfaces as an
            # error instead of silently ending the loop early.
            received = next(log_lines)
            self.assertEqual(received, "Round {}".format(round_number))