97 lines
2.9 KiB
Python
97 lines
2.9 KiB
Python
import functools
|
|
import os
|
|
import subprocess
|
|
import time
|
|
from pathlib import Path
|
|
from contextlib import contextmanager
|
|
from unittest import TestCase
|
|
|
|
import psutil
|
|
from aiohttp.test_utils import AioHTTPTestCase, unused_port
|
|
from aiounittest import AsyncTestCase
|
|
from click.testing import CliRunner
|
|
|
|
from catfish.__main__ import cli
|
|
from catfish.router import get_server
|
|
from catfish.utils.processes import (
|
|
get_root_process,
|
|
terminate_processes,
|
|
terminate_subprocesses,
|
|
)
|
|
from catfish.utils.sockets import create_base_socket_dir, delete_base_socket_dir
|
|
from catfish.worker.server import WORKER_SERVER_SOCKET
|
|
|
|
|
|
class BaseTestCase(TestCase):
    """Shared fixture for catfish tests.

    Each test gets a freshly created base socket directory, a click
    ``CliRunner`` bound to the catfish CLI, and helpers for running commands
    from inside the example project directory.
    """

    # Directory containing this test module.
    TESTS_DIR = Path(os.path.dirname(__file__))
    # Example project that lives next to the tests directory.
    EXAMPLE_DIR = TESTS_DIR.parent / "example"
    # Script whose path is used to recognise dummy processes for cleanup.
    DUMMY_EXE = EXAMPLE_DIR / "src" / "dummy_program.py"

    def setUp(self):
        """Create the socket directory and wire up the CLI helpers."""
        create_base_socket_dir()
        self.cli_runner = CliRunner()
        self.cli = cli
        # ``self.run_cli(args, ...)`` is ``cli_runner.invoke(cli, args, ...)``.
        self.run_cli = functools.partial(self.cli_runner.invoke, self.cli)
        # ``with self.in_example_dir():`` runs the body inside EXAMPLE_DIR.
        self.in_example_dir = functools.partial(self.change_dir, self.EXAMPLE_DIR)

    def terminate_dummy_processes(self):
        """Terminate descendants of the root process that run ``DUMMY_EXE``.

        A descendant matches when the ``DUMMY_EXE`` path occurs anywhere in
        its space-joined command line. Processes that disappear while being
        inspected are skipped.
        """
        needle = str(self.DUMMY_EXE)
        matching = []
        for child in get_root_process().children(recursive=True):
            try:
                command_line = " ".join(child.cmdline())
            except psutil.NoSuchProcess:
                # The process exited between listing and inspection.
                continue
            if needle in command_line:
                matching.append(child)
        terminate_processes(matching)

    def tearDown(self):
        """Stop leftover processes, then delete the socket directory."""
        terminate_subprocesses()
        self.terminate_dummy_processes()
        delete_base_socket_dir()

    @contextmanager
    def change_dir(self, newdir):
        """Context manager that runs its body with ``newdir`` as the cwd.

        ``newdir`` is passed through ``os.path.expanduser`` first. The
        previous working directory is restored on exit, even if the body
        raises.
        """
        original_cwd = os.getcwd()
        destination = os.path.expanduser(newdir)
        os.chdir(destination)
        try:
            yield
        finally:
            os.chdir(original_cwd)
|
|
|
|
|
|
class BaseWorkerTestCase(BaseTestCase):
    """Test case that runs a catfish worker in a subprocess for each test.

    ``setUp`` launches ``python -m catfish start --port <port> --no-fork``
    and blocks until ``WORKER_SERVER_SOCKET`` exists. ``tearDown`` stops the
    worker and then runs the base-class cleanup.
    """

    # Upper bound, in seconds, on how long setUp waits for the worker socket.
    WORKER_START_TIMEOUT = 30

    def setUp(self):
        """Start the worker process and wait for its server socket.

        Raises:
            RuntimeError: if the worker exits before the socket appears, or
                the socket does not appear within ``WORKER_START_TIMEOUT``
                seconds.
        """
        super().setUp()
        self.unused_port = unused_port()
        self.worker_process = subprocess.Popen(
            [
                "python",
                "-m",
                "catfish",
                "start",
                "--port",
                str(self.unused_port),
                "--no-fork",
            ],
            universal_newlines=True,
        )
        deadline = time.monotonic() + self.WORKER_START_TIMEOUT
        while not WORKER_SERVER_SOCKET.exists():
            exit_code = self.worker_process.poll()
            if exit_code is not None:
                # unittest does not call tearDown when setUp raises, so clean
                # up here. The worker has already exited and been reaped, so
                # only the base-class cleanup is needed.
                super().tearDown()
                raise RuntimeError(
                    "catfish worker exited with code {} before creating {}".format(
                        exit_code, WORKER_SERVER_SOCKET
                    )
                )
            if time.monotonic() >= deadline:
                # Worker is still running but never became ready: stop it and
                # fail instead of polling forever.
                self.tearDown()
                raise RuntimeError(
                    "catfish worker did not create {} within {} seconds".format(
                        WORKER_SERVER_SOCKET, self.WORKER_START_TIMEOUT
                    )
                )
            time.sleep(0.1)

    def tearDown(self):
        """Stop the worker and its subprocesses, then run base cleanup.

        The base-class ``tearDown`` runs even if stopping the worker raises,
        so the socket directory is not left behind.
        """
        try:
            terminate_subprocesses(psutil.Process(self.worker_process.pid))
            self.worker_process.terminate()
            self.worker_process.wait()
        finally:
            super().tearDown()
|
|
|
|
|
|
class BaseRouterTestCase(AsyncTestCase, AioHTTPTestCase):
    """Base class for tests that exercise the catfish router application."""

    async def get_application(self):
        """Return the application under test, built by ``get_server()``."""
        return get_server()

    def __getattribute__(self, name):
        """Look up ``name``, wrapping ``test_*`` callables to run on the loop.

        The lookup starts after ``AioHTTPTestCase`` in the MRO. Any callable
        attribute whose name starts with ``test_`` is returned as a
        zero-argument function that calls it and runs the result to
        completion on ``self.loop``. Everything else is returned unchanged.
        """
        value = super(AioHTTPTestCase, self).__getattribute__(name)
        if not (name.startswith("test_") and callable(value)):
            return value
        return lambda: self.loop.run_until_complete(value())
|