import functools import os import shutil import subprocess import time from pathlib import Path from unittest import TestCase from aiohttp.test_utils import AioHTTPTestCase, unused_port from aiounittest import AsyncTestCase from click.testing import CliRunner from catfish.__main__ import cli from catfish.router import get_server from catfish.utils.processes import ( get_root_process, terminate_processes, terminate_subprocesses, ) from catfish.utils.sockets import create_base_socket_dir, delete_base_socket_dir from catfish.worker.server import WORKER_SERVER_SOCKET class BaseTestCase(TestCase): TESTS_DIR = Path(os.path.dirname(__file__)) EXAMPLE_DIR = TESTS_DIR.parent.joinpath("example") DUMMY_EXE = EXAMPLE_DIR.joinpath("src", "dummy_program.py") def setUp(self): create_base_socket_dir() self.cli_runner = CliRunner() self.cli = cli self.run_cli = functools.partial(self.cli_runner.invoke, self.cli) def terminate_dummy_processes(self): dummy_processes = [] for process in get_root_process().children(recursive=True): if str(self.DUMMY_EXE) in " ".join(process.cmdline()): dummy_processes.append(process) terminate_processes(dummy_processes) def tearDown(self): terminate_subprocesses() self.terminate_dummy_processes() delete_base_socket_dir() class BaseWorkerTestCase(BaseTestCase): def setUp(self): super().setUp() self.unused_port = unused_port() self.worker_process = subprocess.Popen( [ shutil.which("ctf"), "start", "--port", str(self.unused_port), "--no-fork", ], universal_newlines=True, ) while not WORKER_SERVER_SOCKET.exists(): time.sleep(0.1) def tearDown(self): self.worker_process.terminate() self.worker_process.wait() super().tearDown() class BaseRouterTestCase(AsyncTestCase, AioHTTPTestCase): async def get_application(self): return get_server() def __getattribute__(self, name): attr = super(AioHTTPTestCase, self).__getattribute__(name) if name.startswith("test_") and callable(attr): return lambda: self.loop.run_until_complete(attr()) else: return attr