archive
/
catfish
Archived
1
Fork 0
This repository has been archived on 2023-03-26. You can view files and clone it, but cannot push or open issues or pull requests.
catfish/tests/__init__.py

98 lines
2.9 KiB
Python

import functools
import os
import subprocess
import time
from contextlib import contextmanager
from pathlib import Path
from unittest import TestCase
import psutil
from aiohttp.test_utils import AioHTTPTestCase, unused_port
from aiounittest import AsyncTestCase
from click.testing import CliRunner
from catfish.__main__ import cli
from catfish.router import get_server
from catfish.utils.processes import (
get_root_process,
terminate_processes,
terminate_subprocesses,
)
from catfish.utils.sockets import create_base_socket_dir, delete_base_socket_dir
from catfish.worker.server import WORKER_SERVER_SOCKET
class BaseTestCase(TestCase):
    """Common fixture shared by the catfish test cases.

    ``setUp`` creates the base socket directory and prepares a click
    ``CliRunner`` bound to the catfish CLI. ``tearDown`` terminates leftover
    processes and deletes the base socket directory again.
    """

    # Directory containing this test package.
    TESTS_DIR = Path(os.path.dirname(__file__))
    # Example project that sits next to the tests directory.
    EXAMPLE_DIR = TESTS_DIR.parent.joinpath("example")
    # Script whose path is used to recognise dummy processes by command line.
    DUMMY_EXE = EXAMPLE_DIR.joinpath("src", "dummy_program.py")

    def setUp(self):
        """Create the socket directory and build the CLI helpers."""
        create_base_socket_dir()
        self.cli_runner = CliRunner()
        self.cli = cli
        # ``self.run_cli(args)`` invokes the catfish CLI through the runner.
        self.run_cli = functools.partial(self.cli_runner.invoke, self.cli)
        # ``with self.in_example_dir():`` runs a block inside EXAMPLE_DIR.
        self.in_example_dir = functools.partial(self.change_dir, self.EXAMPLE_DIR)

    def terminate_dummy_processes(self):
        """Terminate every descendant whose command line mentions DUMMY_EXE."""
        dummy_exe = str(self.DUMMY_EXE)
        dummy_processes = []
        for process in get_root_process().children(recursive=True):
            try:
                if dummy_exe in " ".join(process.cmdline()):
                    dummy_processes.append(process)
            except psutil.NoSuchProcess:
                # The process went away while it was being inspected.
                continue
        terminate_processes(dummy_processes)

    def tearDown(self):
        """Terminate leftover processes and delete the socket directory.

        Each cleanup step runs even when an earlier one raises, so a failure
        while terminating processes cannot leave the socket directory behind
        for the next test. The first exception still propagates.
        """
        try:
            try:
                terminate_subprocesses()
            finally:
                self.terminate_dummy_processes()
        finally:
            delete_base_socket_dir()

    @contextmanager
    def change_dir(self, newdir):
        """Temporarily switch the working directory to ``newdir``.

        ``newdir`` is passed through ``os.path.expanduser``. The previous
        working directory is restored on exit, including when the managed
        block raises.
        """
        prevdir = os.getcwd()
        os.chdir(os.path.expanduser(newdir))
        try:
            yield
        finally:
            os.chdir(prevdir)
class BaseWorkerTestCase(BaseTestCase):
    """Test case that runs a catfish worker in a child process.

    ``setUp`` launches ``python -m catfish start --port <port> --no-fork``
    and waits for ``WORKER_SERVER_SOCKET`` to appear. ``tearDown`` terminates
    the worker's subprocesses and then the worker itself.
    """

    # Maximum time, in seconds, to wait for the worker socket to appear.
    WORKER_STARTUP_TIMEOUT = 30.0
    # Delay, in seconds, between checks for the worker socket.
    WORKER_POLL_INTERVAL = 0.1

    def setUp(self):
        """Start the worker process and block until its socket exists.

        Raises:
            RuntimeError: if the worker exits before creating its socket, or
                the socket does not appear within WORKER_STARTUP_TIMEOUT.
        """
        super().setUp()
        self.unused_port = unused_port()
        self.worker_process = subprocess.Popen(
            [
                "python",
                "-m",
                "catfish",
                "start",
                "--port",
                str(self.unused_port),
                "--no-fork",
            ],
            universal_newlines=True,
        )
        try:
            self._wait_for_worker_socket()
        except BaseException:
            # unittest does not call tearDown when setUp raises, so release
            # the child process and the socket directory here.
            self.tearDown()
            raise

    def _wait_for_worker_socket(self):
        """Poll for WORKER_SERVER_SOCKET, failing fast instead of hanging."""
        deadline = time.monotonic() + self.WORKER_STARTUP_TIMEOUT
        while not WORKER_SERVER_SOCKET.exists():
            # NOTE(review): assumes --no-fork keeps the worker in this child
            # process, so an exit here means startup failed - confirm.
            exit_code = self.worker_process.poll()
            if exit_code is not None:
                raise RuntimeError(
                    "catfish worker exited with code {} before creating {}".format(
                        exit_code, WORKER_SERVER_SOCKET
                    )
                )
            if time.monotonic() >= deadline:
                raise RuntimeError(
                    "catfish worker did not create {} within {} seconds".format(
                        WORKER_SERVER_SOCKET, self.WORKER_STARTUP_TIMEOUT
                    )
                )
            time.sleep(self.WORKER_POLL_INTERVAL)

    def tearDown(self):
        """Terminate the worker's subprocesses, the worker, then the base fixture."""
        try:
            try:
                try:
                    worker = psutil.Process(self.worker_process.pid)
                except psutil.NoSuchProcess:
                    # The worker has already exited and been reaped.
                    worker = None
                if worker is not None:
                    terminate_subprocesses(worker)
            finally:
                self.worker_process.terminate()
                self.worker_process.wait()
        finally:
            # Always run the base cleanup, even if stopping the worker failed.
            super().tearDown()
class BaseRouterTestCase(AsyncTestCase, AioHTTPTestCase):
    """Base class for tests that run against the application from ``get_server()``."""

    async def get_application(self):
        """Return the application under test."""
        return get_server()

    def __getattribute__(self, name):
        """Look up ``name``, wrapping callable ``test_*`` attributes.

        The lookup starts after ``AioHTTPTestCase`` in the MRO. A callable
        attribute whose name starts with ``test_`` is returned as a
        zero-argument callable that runs ``attr()`` to completion on
        ``self.loop``; anything else is returned unchanged.
        """
        value = super(AioHTTPTestCase, self).__getattribute__(name)
        is_test_callable = name.startswith("test_") and callable(value)
        if not is_test_callable:
            return value
        # NOTE(review): presumably test methods are coroutine functions, so
        # value() yields an awaitable for the loop to drive - confirm.
        return lambda: self.loop.run_until_complete(value())