No description
This repository has been archived on 2023-03-26. You can view files and clone it, but cannot push or open issues or pull requests.
Find a file
2018-12-08 16:06:54 +00:00
.circleci Move linting to generic script 2018-12-08 16:06:54 +00:00
ipc_unix Enable use as a context manager 2018-12-08 15:32:29 +00:00
scripts Move linting to generic script 2018-12-08 16:06:54 +00:00
tests Refactor simple server client to class 2018-12-08 14:23:16 +00:00
.gitignore Setup project 2018-12-07 13:54:46 +00:00
.travis.yml Build on xenial on travis 2018-12-08 16:02:27 +00:00
dev-requirements.txt Move black to separate script 2018-12-08 15:55:56 +00:00
LICENSE Create LICENSE 2018-12-08 15:08:54 +00:00
README.md Add travis ci badge 2018-12-08 15:41:16 +00:00
setup.cfg Force payload to be dict, increase stability and speed 2018-12-08 01:12:34 +00:00
setup.py Run linters on setup.py 2018-12-07 21:10:52 +00:00

IPC-Unix

CircleCI Build Status

Simple Inter-Process Communication using unix sockets for Python.

Note: Whilst mostly working, there's still some fairly common cases which haven't been tested. Use at your own risk until this message is removed.

Examples

For some more concrete examples, check out the tests/ directory.

Call / Response

from ipc_unix import Server, Client

class EchoServer(Server):
    def handle_request(self, request):
        return request

socket_path = '/tmp/sock.sock'
server = EchoServer(socket_path)
client = Client(socket_path)

print(client.send({"foo": "bar"}))
>>> {"foo": "bar"}

Pub-Sub

from ipc_unix import pubsub

socket_path = '/tmp/sock.sock'
publisher = pubsub.Publisher(socket_path)
subscriber = pubsub.Subscriber(socket_path)

publisher.write({"foo": "bar"})
print(self.subscriber.get_latest_message())
>>> {"foo": "bar"}

publisher.close()
subscriber.close()