This repository has been archived on 2023-03-26. You can view files and clone it, but cannot push or open issues or pull requests.
catfish/tests/__init__.py

38 lines
1.0 KiB
Python
Raw Normal View History

2018-12-12 22:34:51 +00:00
import functools
from multiprocessing import Process
2018-12-12 21:55:45 +00:00
from aiohttp.test_utils import unused_port
2018-12-12 22:34:51 +00:00
from aiounittest import AsyncTestCase
2018-12-12 15:53:56 +00:00
from click.testing import CliRunner
2018-12-12 22:34:51 +00:00
2018-12-12 18:47:40 +00:00
from catfish import worker
2018-12-12 22:34:51 +00:00
from catfish.__main__ import cli
from catfish.utils.processes import CURRENT_PROCESS, terminate_processes
2018-12-12 15:53:56 +00:00
2018-12-12 21:55:45 +00:00
class BaseTestCase(AsyncTestCase):
    """Common base for tests: provides CLI helpers and per-test cleanup.

    Attributes set in ``setUp``:
        cli_runner: a fresh ``click.testing.CliRunner``.
        cli: the ``cli`` object imported from ``catfish.__main__``.
        run_cli: ``cli_runner.invoke`` pre-bound to ``cli``, so tests can
            call ``self.run_cli([...])`` directly.
    """

    def setUp(self):
        """Call ``worker.stop_worker()`` and build the CLI helpers."""
        # Presumably guarantees no worker from a previous test is still
        # running -- see worker.stop_worker for the exact semantics.
        worker.stop_worker()
        self.cli_runner = CliRunner()
        self.cli = cli
        self.run_cli = functools.partial(self.cli_runner.invoke, self.cli)

    def tearDown(self):
        """Call ``worker.stop_worker()`` and terminate all child processes.

        Every process returned by
        ``CURRENT_PROCESS.children(recursive=True)`` is passed to
        ``terminate_processes``.
        """
        try:
            worker.stop_worker()
        finally:
            # Always clean up children, even if stopping the worker raised;
            # otherwise stray processes could leak into later tests.
            terminate_processes(CURRENT_PROCESS.children(recursive=True))
2018-12-12 21:55:45 +00:00
class BaseWorkerTestCase(BaseTestCase):
    """Base for tests that need ``worker.run`` running in a child process.

    Attributes set in ``setUp``:
        unused_port: port obtained from ``aiohttp.test_utils.unused_port``
            and passed to ``worker.run``.
        router_process: the daemon ``multiprocessing.Process`` executing
            ``worker.run``.
    """

    # Upper bound (seconds) to wait for the router process to exit after
    # terminate(), so a child that ignores the signal cannot hang the suite.
    _ROUTER_JOIN_TIMEOUT = 5

    def setUp(self):
        """Start ``worker.run`` in a child process and wait until it is up."""
        super().setUp()
        self.unused_port = unused_port()
        self.router_process = Process(
            target=worker.run, args=(self.unused_port,), daemon=True
        )
        self.router_process.start()
        try:
            worker.wait_for_running_worker()
        except BaseException:
            # unittest does not call tearDown() when setUp() raises, so the
            # process started above must be stopped here or it would leak.
            self._stop_router_process()
            raise

    def tearDown(self):
        """Stop the router process, then run the base-class cleanup."""
        try:
            self._stop_router_process()
        finally:
            # The base cleanup must run even if stopping the router failed.
            super().tearDown()

    def _stop_router_process(self):
        """Terminate the router process and wait (bounded) for it to exit."""
        self.router_process.terminate()
        # join() reaps the child so it does not linger between tests.
        self.router_process.join(self._ROUTER_JOIN_TIMEOUT)