2018-12-13 18:01:40 +00:00
|
|
|
from aiohttp.test_utils import unused_port
|
|
|
|
|
2018-12-12 18:47:40 +00:00
|
|
|
from catfish import __version__, worker
|
2018-12-12 15:53:56 +00:00
|
|
|
from tests import BaseTestCase
|
|
|
|
|
|
|
|
|
|
|
|
class MainCLITestCase(BaseTestCase):
    """Checks for the top-level command-line options."""

    def test_version(self):
        # ``--version`` must exit successfully and mention both the package
        # version string and the program name in its output.
        outcome = self.run_cli(["--version"])
        self.assertEqual(outcome.exit_code, 0)
        for expected_text in (__version__, "catfish"):
            self.assertIn(expected_text, outcome.output)
|
2018-12-12 18:47:40 +00:00
|
|
|
|
|
|
|
|
|
|
|
class WorkerControlTestCase(BaseTestCase):
    """Checks for the ``start`` and ``stop`` commands that control the worker."""

    def test_starts_worker(self):
        # ``start`` must succeed, leave a worker running, and print the
        # PID of that worker.
        result = self.run_cli(["start"])
        self.assertEqual(result.exit_code, 0)
        self.assertTrue(worker.is_running())
        worker_process = worker.get_running_process()
        self.assertIn(str(worker_process.pid), result.output)

    def test_starts_worker_custom_port(self):
        port = unused_port()
        # Command-line arguments are text, so hand the port over as a string
        # instead of relying on the CLI layer tolerating an int in argv.
        result = self.run_cli(["start", "--port", str(port)])
        self.assertEqual(result.exit_code, 0)
        worker_process = worker.get_running_process()
        # Compare against the integer port: the socket address reports the
        # local port as a number.
        self.assertEqual(worker_process.connections("inet4")[0].laddr.port, port)

    def test_stops_worker(self):
        result = self.run_cli(["start"])
        self.assertEqual(result.exit_code, 0)
        self.assertTrue(worker.is_running())
        # Capture the PID before stopping; the process is expected to be
        # gone once ``stop`` has run.
        worker_pid = worker.get_running_process().pid
        result = self.run_cli(["stop"])
        self.assertEqual(result.exit_code, 0)
        self.assertIn(str(worker_pid), result.output)
        self.assertFalse(worker.is_running())

    def test_starting_when_already_running(self):
        result = self.run_cli(["start"])
        self.assertEqual(result.exit_code, 0)
        self.assertTrue(worker.is_running())
        # A second ``start`` must be rejected with exit code 2 and must not
        # disturb the worker that is already running.
        result = self.run_cli(["start"])
        self.assertEqual(result.exit_code, 2)
        self.assertIn("Worker already running", result.output)
        self.assertTrue(worker.is_running())

    def test_stop_when_not_running(self):
        # With no worker running, ``stop`` must fail with exit code 2 and
        # explain why.
        self.assertFalse(worker.is_running())
        result = self.run_cli(["stop"])
        self.assertEqual(result.exit_code, 2)
        self.assertIn("Worker not running", result.output)
|