This repository has been archived on 2023-03-26. You can view files and clone it, but cannot push or open issues or pull requests.
catfish/tests/test_main.py

53 lines
2 KiB
Python

from aiohttp.test_utils import unused_port
from catfish import __version__, worker
from tests import BaseTestCase
class MainCLITestCase(BaseTestCase):
    """Tests for the top-level command-line interface."""

    def test_version(self):
        # Invoke the CLI with only the version flag; it must exit cleanly.
        outcome = self.run_cli(["--version"])
        self.assertEqual(outcome.exit_code, 0)

        # The printed text has to mention both the package version string
        # and the program name (checked in that order).
        for expected_fragment in (__version__, "catfish"):
            self.assertIn(expected_fragment, outcome.output)
class WorkerControlTestCase(BaseTestCase):
    """Tests for the ``start`` and ``stop`` CLI commands.

    Each test drives the CLI through ``self.run_cli`` and then inspects the
    worker state via ``worker.is_running()`` / ``worker.get_running_process()``.
    NOTE(review): no test here stops a worker it started (other than
    ``test_stops_worker``); presumably ``BaseTestCase`` handles cleanup —
    confirm.
    """

    def test_starts_worker(self):
        # "start" must succeed, leave a worker running, and print its PID.
        result = self.run_cli(["start"])
        self.assertEqual(result.exit_code, 0)
        self.assertTrue(worker.is_running())
        worker_process = worker.get_running_process()
        self.assertIn(str(worker_process.pid), result.output)

    def test_starts_worker_custom_port(self):
        # Command-line arguments are always strings in real invocations, so
        # pass the port the same way rather than relying on the runner to
        # tolerate an int.
        port = unused_port()
        result = self.run_cli(["start", "--port", str(port)])
        self.assertEqual(result.exit_code, 0)
        worker_process = worker.get_running_process()

        # Collect every local IPv4 port of the worker instead of indexing the
        # first connection: the listing order is not something this test
        # controls, and an empty listing should fail with a readable
        # assertion message rather than an IndexError.
        local_ports = [
            conn.laddr.port for conn in worker_process.connections("inet4")
        ]
        self.assertIn(port, local_ports)

    def test_stops_worker(self):
        # Start a worker first and remember its PID for the output check.
        result = self.run_cli(["start"])
        self.assertEqual(result.exit_code, 0)
        self.assertTrue(worker.is_running())
        worker_pid = worker.get_running_process().pid

        # "stop" must succeed, mention the stopped PID, and leave no worker.
        result = self.run_cli(["stop"])
        self.assertEqual(result.exit_code, 0)
        self.assertIn(str(worker_pid), result.output)
        self.assertFalse(worker.is_running())

    def test_starting_when_already_running(self):
        result = self.run_cli(["start"])
        self.assertEqual(result.exit_code, 0)
        self.assertTrue(worker.is_running())

        # A second "start" is rejected with exit code 2 and an explanatory
        # message, and the already-running worker is left in place.
        result = self.run_cli(["start"])
        self.assertEqual(result.exit_code, 2)
        self.assertIn("Worker already running", result.output)
        self.assertTrue(worker.is_running())

    def test_stop_when_not_running(self):
        # Precondition: nothing is running, so "stop" has nothing to act on
        # and must report that with exit code 2.
        self.assertFalse(worker.is_running())
        result = self.run_cli(["stop"])
        self.assertEqual(result.exit_code, 2)
        self.assertIn("Worker not running", result.output)