archive
/
tcp-nat-proxy
Archived
1
Fork 0
This repository has been archived on 2023-03-26. You can view files and clone it, but cannot push or open issues or pull requests.
tcp-nat-proxy/tcp_nat_proxy.py

85 lines
2.3 KiB
Python

import argparse
import asyncio
import logging
from collections import namedtuple
from functools import partial
# Log records are rendered as "<LEVEL>: <message>" by the root handler.
FORMAT = "%(levelname)s: %(message)s"
logging.basicConfig(format=FORMAT)
logger = logging.getLogger(__name__)

# Maximum number of bytes requested per read while relaying data (128 KiB).
BUFFER_SIZE = 128 * 1024

# One forwarding rule: accept on listen_port, relay to
# destination_host:destination_port.
Route = namedtuple("Route", ("listen_port", "destination_host", "destination_port"))
def destination_host_display(route: Route):
    """Return the destination of ``route`` formatted as ``host:port``."""
    host, port = route.destination_host, route.destination_port
    return "{}:{}".format(host, port)
def _check_port(text, label):
    """Reject ``text`` when it is a number outside the TCP port range."""
    try:
        number = int(text)
    except ValueError:
        # Not numeric: leave it alone. NOTE(review): such values may be
        # service names (e.g. "http") that the resolver understands later.
        return
    if not 0 <= number <= 65535:
        raise argparse.ArgumentTypeError(
            "{} must be between 0 and 65535, got {}".format(label, number)
        )


def parse_argument(value):
    """Parse a ``LISTEN_PORT:DESTINATION_HOST:DESTINATION_PORT`` route spec.

    Intended as an argparse ``type=`` callable.

    Args:
        value: The raw command-line string, with exactly three
            colon-separated fields.

    Returns:
        A ``Route`` whose three fields are the (string) parts of ``value``,
        in order.

    Raises:
        argparse.ArgumentTypeError: If ``value`` does not have exactly three
            fields, or a numeric port lies outside 0-65535. argparse turns
            this into a usage error carrying the message.
    """
    parts = value.split(":")
    if len(parts) != 3:
        raise argparse.ArgumentTypeError(
            "invalid route {!r}: expected "
            "LISTEN_PORT:DESTINATION_HOST:DESTINATION_PORT".format(value)
        )
    listen_port, _, destination_port = parts
    # Validate up front so a bad port is reported at startup instead of
    # surfacing later when binding or when the first client connects.
    _check_port(listen_port, "listen port")
    _check_port(destination_port, "destination port")
    return Route(*parts)
async def pipe(reader, writer):
    """Relay bytes from ``reader`` to ``writer`` until ``reader`` is at EOF.

    Data is read in chunks of at most ``BUFFER_SIZE`` bytes. ``writer`` is
    closed when the relay stops, whether it finished normally or an
    exception (including cancellation) escaped the loop.

    Args:
        reader: Stream to read from; must provide ``at_eof()`` and an
            awaitable ``read(n)``.
        writer: Stream to write to; must provide ``write()``, an awaitable
            ``drain()`` and ``close()``.
    """
    try:
        while True:
            if reader.at_eof():
                break
            chunk = await reader.read(BUFFER_SIZE)
            writer.write(chunk)
            # Wait for the writer to accept more data before reading again.
            await writer.drain()
    finally:
        writer.close()
async def handle_client(route, local_reader, local_writer):
    """Relay one accepted client connection to the route's destination.

    Opens a connection to ``route.destination_host`` /
    ``route.destination_port`` and copies data in both directions until the
    relay ends. Socket-level failures are logged as warnings and not
    re-raised; any other exception propagates to the caller.

    Args:
        route: The ``Route`` describing where to forward the client.
        local_reader: Stream reader for the accepted client connection.
        local_writer: Stream writer for the accepted client connection.
    """
    destination = destination_host_display(route)
    remote_writer = None
    try:
        logger.debug("Openning connection to %s", destination)
        remote_reader, remote_writer = await asyncio.open_connection(
            route.destination_host, route.destination_port
        )
        await asyncio.gather(
            pipe(local_reader, remote_writer), pipe(remote_reader, local_writer)
        )
    except ConnectionRefusedError as e:
        logger.warning("Connection to %s refused: %s", destination, e)
    except OSError as e:
        # Resets, timeouts, resolution errors, etc. are not refusals, so
        # report them as generic failures.
        logger.warning("Connection to %s failed: %s", destination, e)
    finally:
        # Close both ends here so neither socket outlives this handler, even
        # if the relay was interrupted before the pipes could clean up.
        # Closing a writer that is already closed is harmless.
        local_writer.close()
        if remote_writer is not None:
            remote_writer.close()
async def create_proxy_pipe(route: Route):
    """Start a listener for ``route`` and wait until that listener is closed.

    The server binds to ``0.0.0.0`` on ``route.listen_port`` and hands every
    accepted connection to ``handle_client`` with ``route`` pre-bound.

    Args:
        route: The forwarding rule to serve.
    """
    on_client = partial(handle_client, route)
    server = await asyncio.start_server(on_client, "0.0.0.0", route.listen_port)

    destination = destination_host_display(route)
    logger.info("Routing from {} to {}".format(route.listen_port, destination))

    # Nothing in this function closes the server, so this keeps the
    # coroutine alive for as long as the listener exists.
    await server.wait_closed()
async def main():
    """Parse the command line and run one proxy listener per route.

    Options:
        -R / --route: Required and repeatable; each value is converted by
            ``parse_argument``.
        --verbose: Log at DEBUG level instead of INFO.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "-R", "--route", action="append", required=True, type=parse_argument
    )
    cli.add_argument("--verbose", action="store_true")
    options = cli.parse_args()

    if options.verbose:
        level = logging.DEBUG
    else:
        level = logging.INFO
    logger.setLevel(level)

    proxies = [create_proxy_pipe(rule) for rule in options.route]
    logger.debug("Starting servers...")
    # Run all listeners concurrently; returns only when every one has ended.
    await asyncio.gather(*proxies)
if __name__ == "__main__":
    # Script entry point: run the proxy until it ends or the user interrupts.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Report the interruption through the logger instead of a traceback.
        logger.error("Process terminated")