2018-12-08 13:07:26 +00:00
|
|
|
import os
|
2018-12-07 23:18:29 +00:00
|
|
|
import select
|
|
|
|
import socket
|
2018-12-08 13:31:22 +00:00
|
|
|
import threading
|
2018-12-07 23:18:29 +00:00
|
|
|
|
|
|
|
import ujson
|
2018-12-08 14:28:26 +00:00
|
|
|
from ipc_unix.utils import DEFAULT_SOCKET_READ_TIMEOUT, read_payload, socket_has_data
|
2018-12-07 23:18:29 +00:00
|
|
|
|
|
|
|
|
|
|
|
class Subscriber:
|
|
|
|
def __init__(self, socket_path):
|
|
|
|
self.socket_path = socket_path
|
2018-12-08 13:07:26 +00:00
|
|
|
self.socket = socket.socket(
|
|
|
|
socket.AF_UNIX, type=socket.SOCK_STREAM | socket.SOCK_NONBLOCK
|
|
|
|
)
|
2018-12-07 23:18:29 +00:00
|
|
|
self.socket.connect(self.socket_path)
|
|
|
|
|
2018-12-08 01:12:34 +00:00
|
|
|
@property
|
|
|
|
def has_data(self):
|
|
|
|
return socket_has_data(self.socket)
|
|
|
|
|
2018-12-07 23:18:29 +00:00
|
|
|
def listen(self):
|
|
|
|
while True:
|
2018-12-08 01:12:34 +00:00
|
|
|
yield from self.get_message()
|
2018-12-07 23:18:29 +00:00
|
|
|
|
2018-12-08 01:12:34 +00:00
|
|
|
def get_messages(self) -> dict:
|
2018-12-07 23:18:29 +00:00
|
|
|
return read_payload(self.socket)
|
|
|
|
|
2018-12-08 01:12:34 +00:00
|
|
|
def flush_data(self):
|
|
|
|
while self.has_data:
|
|
|
|
yield from self.get_messages()
|
|
|
|
|
|
|
|
def get_latest_message(self):
|
|
|
|
data = list(self.flush_data())
|
|
|
|
return data[-1] if data else None
|
|
|
|
|
2018-12-07 23:18:29 +00:00
|
|
|
def close(self):
|
|
|
|
self.socket.close()
|
|
|
|
|
|
|
|
|
|
|
|
class Publisher:
|
|
|
|
def __init__(self, socket_path):
|
|
|
|
self.socket_path = socket_path
|
2018-12-08 13:07:26 +00:00
|
|
|
self.master_socket = socket.socket(
|
|
|
|
socket.AF_UNIX, type=socket.SOCK_STREAM | socket.SOCK_NONBLOCK
|
|
|
|
)
|
2018-12-07 23:18:29 +00:00
|
|
|
self.master_socket.bind(self.socket_path)
|
2018-12-08 13:07:26 +00:00
|
|
|
self.master_socket.listen()
|
2018-12-07 23:18:29 +00:00
|
|
|
self.connections = []
|
2018-12-08 13:31:22 +00:00
|
|
|
self.accepting_new_connections = threading.Event()
|
|
|
|
self.accepting_new_connections.set()
|
|
|
|
self.new_connections_thread = threading.Thread(
|
|
|
|
target=self._accept_new_connections
|
|
|
|
)
|
|
|
|
|
|
|
|
def start(self):
|
|
|
|
self.accepting_new_connections.set()
|
|
|
|
self.new_connections_thread.start()
|
2018-12-07 23:18:29 +00:00
|
|
|
|
|
|
|
def close(self):
|
2018-12-08 13:31:22 +00:00
|
|
|
self.accepting_new_connections.clear()
|
|
|
|
if self.new_connections_thread.is_alive():
|
|
|
|
self.new_connections_thread.join()
|
2018-12-07 23:18:29 +00:00
|
|
|
self.master_socket.close()
|
2018-12-08 12:37:39 +00:00
|
|
|
for connection in self.connections:
|
|
|
|
connection.close()
|
2018-12-07 23:18:29 +00:00
|
|
|
self.connections.clear()
|
2018-12-08 13:06:08 +00:00
|
|
|
os.remove(self.socket_path)
|
2018-12-07 23:18:29 +00:00
|
|
|
|
2018-12-08 13:31:22 +00:00
|
|
|
def accept_outstanding_connections(self):
|
|
|
|
if self.new_connections_thread.is_alive():
|
|
|
|
raise Exception(
|
|
|
|
"Cannot accept connections manually whilst thread is running"
|
|
|
|
)
|
2018-12-08 12:30:47 +00:00
|
|
|
while socket_has_data(self.master_socket):
|
2018-12-07 23:18:29 +00:00
|
|
|
new_socket, _ = self.master_socket.accept()
|
|
|
|
self.connections.append(new_socket)
|
|
|
|
|
2018-12-08 13:31:22 +00:00
|
|
|
def _accept_new_connections(self):
|
|
|
|
while self.accepting_new_connections.is_set():
|
|
|
|
if socket_has_data(self.master_socket):
|
|
|
|
new_socket, _ = self.master_socket.accept()
|
|
|
|
self.connections.append(new_socket)
|
2018-12-07 23:18:29 +00:00
|
|
|
|
2018-12-08 13:31:22 +00:00
|
|
|
def write(self, message: dict):
|
2018-12-08 14:28:26 +00:00
|
|
|
_, writable, errorable = select.select(
|
|
|
|
[],
|
|
|
|
self.connections,
|
|
|
|
[],
|
|
|
|
DEFAULT_SOCKET_READ_TIMEOUT * len(self.connections),
|
|
|
|
)
|
2018-12-07 23:18:29 +00:00
|
|
|
|
|
|
|
dead_sockets = []
|
|
|
|
|
|
|
|
if writable:
|
|
|
|
data = ujson.dumps(message).encode() + b"\n"
|
|
|
|
for sock in writable:
|
|
|
|
try:
|
|
|
|
sock.send(data)
|
|
|
|
except BrokenPipeError:
|
|
|
|
dead_sockets.append(sock)
|
|
|
|
|
|
|
|
for sock in dead_sockets:
|
|
|
|
if sock in self.connections:
|
|
|
|
self.connections.remove(sock)
|
|
|
|
sock.close()
|