This repository has been archived on 2023-03-26. You can view files and clone it, but cannot push or open issues or pull requests.
ipc-unix/ipc_unix/pubsub.py

112 lines
3.2 KiB
Python
Raw Normal View History

2018-12-08 13:07:26 +00:00
import os
2018-12-07 23:18:29 +00:00
import select
import socket
import threading
2018-12-07 23:18:29 +00:00
import ujson
2018-12-08 14:36:37 +00:00
from ipc_unix.utils import (
DEFAULT_SOCKET_READ_TIMEOUT,
NEW_LINE,
read_payload,
socket_has_data,
)
2018-12-07 23:18:29 +00:00
class Subscriber:
    """Client end of a Unix-socket publish/subscribe channel.

    Connects a non-blocking stream socket to the filesystem path given at
    construction time and exposes whatever ``read_payload`` extracts from
    that socket.
    """

    def __init__(self, socket_path):
        """Create the socket and connect it to *socket_path*."""
        self.socket_path = socket_path
        self.socket = socket.socket(
            socket.AF_UNIX, type=socket.SOCK_STREAM | socket.SOCK_NONBLOCK
        )
        self.socket.connect(self.socket_path)

    @property
    def has_data(self):
        """Return what ``socket_has_data`` reports for the subscriber socket."""
        return socket_has_data(self.socket)

    def listen(self):
        """Yield messages indefinitely, one at a time.

        Each iteration of the loop reads one batch via ``get_messages`` and
        yields its items individually. The generator never finishes on its
        own; the caller decides when to stop consuming it.
        """
        while True:
            yield from self.get_messages()

    def get_messages(self) -> dict:
        """Return the result of ``read_payload`` for the subscriber socket.

        NOTE(review): the annotation says ``dict``, but ``listen`` and
        ``flush_data`` iterate the result as a sequence of messages --
        confirm against ``read_payload`` and adjust the annotation.
        """
        return read_payload(self.socket)

    def flush_data(self):
        """Yield every message readable while ``has_data`` stays true."""
        while self.has_data:
            yield from self.get_messages()

    def get_latest_message(self):
        """Drain the socket and return the last message, or ``None`` if empty."""
        data = list(self.flush_data())
        return data[-1] if data else None

    def close(self):
        """Close the subscriber socket."""
        self.socket.close()
class Publisher:
    """Server end of a Unix-socket publish/subscribe channel.

    Binds a non-blocking listening socket to a filesystem path, collects
    subscriber connections (on a background thread after ``start()``, or
    on demand via ``accept_outstanding_connections()``) and broadcasts
    JSON-encoded messages to the connections that are ready for writing.
    """

    def __init__(self, socket_path):
        """Bind and listen on *socket_path*; the accept thread is not started."""
        self.socket_path = socket_path
        self.master_socket = socket.socket(
            socket.AF_UNIX, type=socket.SOCK_STREAM | socket.SOCK_NONBLOCK
        )
        self.master_socket.bind(self.socket_path)
        self.master_socket.listen()
        # Sockets returned by accept(), one per connected subscriber.
        self.connections = []
        # While set, the background thread keeps polling for connections.
        self.accepting_new_connections = threading.Event()
        self.accepting_new_connections.set()
        self.new_connections_thread = threading.Thread(
            target=self._accept_new_connections
        )

    def start(self):
        """Start the background thread that accepts new connections."""
        self.accepting_new_connections.set()
        self.new_connections_thread.start()

    def close(self):
        """Stop accepting, close every socket and remove the socket file.

        Safe to call more than once: a socket file that is already gone is
        not treated as an error.
        """
        # Clear the flag first so the accept loop can exit before joining.
        self.accepting_new_connections.clear()
        if self.new_connections_thread.is_alive():
            self.new_connections_thread.join()
        self.master_socket.close()
        for connection in self.connections:
            connection.close()
        self.connections.clear()
        try:
            os.remove(self.socket_path)
        except FileNotFoundError:
            # Already removed (e.g. by an earlier close()); nothing to do.
            pass

    def accept_outstanding_connections(self):
        """Accept every connection currently pending on the listening socket.

        Raises:
            Exception: if the background accept thread is running, since
                both would be accepting from the same socket.
        """
        if self.new_connections_thread.is_alive():
            raise Exception(
                "Cannot accept connections manually whilst thread is running"
            )
        while socket_has_data(self.master_socket):
            self._accept_connection()

    def _accept_new_connections(self):
        """Thread target: accept connections until the event is cleared."""
        while self.accepting_new_connections.is_set():
            if socket_has_data(self.master_socket):
                self._accept_connection()

    def _accept_connection(self):
        """Accept one pending connection and remember it, if still there."""
        try:
            new_socket, _ = self.master_socket.accept()
        except BlockingIOError:
            # The listening socket is non-blocking; the pending connection
            # can disappear between the readiness check and accept().
            return
        self.connections.append(new_socket)

    def write(self, message: dict):
        """Send *message* to every connection that is ready for writing.

        The message is serialised with ``ujson``, encoded to bytes and
        terminated with ``NEW_LINE``. Connections whose peer has gone away
        are closed and dropped from ``self.connections``.
        """
        # Snapshot: the accept thread may append while we are selecting.
        connections = list(self.connections)
        if not connections:
            return
        _, writable, _ = select.select(
            [],
            connections,
            [],
            DEFAULT_SOCKET_READ_TIMEOUT * len(connections),
        )
        if not writable:
            return
        data = ujson.dumps(message).encode() + NEW_LINE
        dead_sockets = []
        for sock in writable:
            try:
                sock.send(data)
            except (BrokenPipeError, ConnectionResetError):
                # Peer closed its end; collect now, remove after the loop.
                dead_sockets.append(sock)
        for sock in dead_sockets:
            if sock in self.connections:
                self.connections.remove(sock)
            sock.close()