This repository has been archived on 2023-03-26. You can view files and clone it, but cannot push or open issues or pull requests.
ipc-unix/tests/test_pubsub.py
2018-12-07 23:18:29 +00:00

28 lines
846 B
Python

from unittest import TestCase
from ipc_unix import pubsub
from tests import get_temp_file_path
class PubSubTestCase(TestCase):
    """Round-trip tests for ``pubsub.Publisher`` and ``pubsub.Subscriber``.

    Every test runs against a fresh publisher/subscriber pair that is
    constructed on the same path, obtained from ``get_temp_file_path()``.
    """

    def setUp(self):
        """Create the publisher, then a subscriber on the same path."""
        self.socket_path = get_temp_file_path()
        self.publisher = pubsub.Publisher(self.socket_path)
        try:
            self.subscriber = pubsub.Subscriber(self.socket_path)
        except Exception:
            # unittest does not call tearDown() when setUp() fails, so the
            # already-created publisher must be released here.
            self.publisher.close()
            raise

    def tearDown(self):
        """Close both endpoints, publisher first."""
        try:
            self.publisher.close()
        finally:
            # Still close the subscriber if closing the publisher raised.
            self.subscriber.close()

    def test_transmits(self):
        """A dict written by the publisher is read back equal by the subscriber."""
        self.publisher.write({"foo": "bar"})
        response = self.subscriber.get_message()
        self.assertEqual(response, {"foo": "bar"})

    def test_buffers_messages(self):
        """Five messages written before any read are received in write order."""
        for i in range(5):
            self.publisher.write(i)
        messages = [self.subscriber.get_message() for _ in range(5)]
        self.assertEqual(messages, [0, 1, 2, 3, 4])