from typing import NamedTuple import re import sys import tomllib from pathlib import Path from typing import TypedDict from collections import defaultdict from urllib.request import urlopen import argparse import json import subprocess TRAEFIK_HOST_RE = re.compile(r"Host\(`([a-z0-9\.-]+)`\)") class Route(NamedTuple): name: str destination: str hostname: str class Source(TypedDict): type: str url: str destination: str class Config(TypedDict): source: list[Source] def parse_traefik_rule(rule: str) -> list[str]: if "Host(" not in rule: return [] return TRAEFIK_HOST_RE.findall(rule) def get_traefik_routes(traefik_host: str, traefik_route: str): with urlopen(f"{traefik_host}/api/http/routers") as response: api_response = json.load(response) routes = set() for router in api_response: hosts = parse_traefik_rule(router["rule"]) if not hosts: print(f"Failed to find host for {router['rule']}", file=sys.stderr) continue if len(hosts) > 1: print(f"WARNING: Found multiple hosts for rule: {router['rule']}", file=sys.stderr) routes.add(Route( router["service"], traefik_route, hosts[0] )) return routes def get_dokku_routes(dokku_exporter_url: str, dokku_route: str): with urlopen(dokku_exporter_url) as response: api_response = json.load(response) routes = set() for app in api_response: for vhost in app["vhosts"]: routes.add(Route( app["app"], dokku_route, vhost )) return routes def main(): parser = argparse.ArgumentParser() parser.add_argument("--config", "-c", default="./config.toml", type=Path, help="Config file. Default: %(default)s") parser.add_argument("--output", "-o", type=Path, default="./map.txt", help="Output file. Default: %(default)s") parser.add_argument("--reload", action="store_true", help="Reload nginx on completion") args = parser.parse_args() config: Config = tomllib.loads(args.config.read_text()) routes = [] for source in config["source"]: match source["type"]: case "traefik": routes.extend(get_traefik_routes(source["url"], source["destination"])) case "dokku": routes.extend(get_dokku_routes(source["url"], source["destination"])) if len(routes) != len(set(routes)): raise ValueError("Conflict found!") grouped_routes = defaultdict(set) for route in routes: grouped_routes[route.hostname, route.destination].add(route.name) with args.output.open("w", buffering=1024 * 1024) as f: for (hostname, destination), names in sorted(grouped_routes.items()): print(f"{hostname}\t{destination}; # {', '.join(names)}", file=f) print("Found", len(routes), "routes, grouped into", len(grouped_routes), "groups") if args.reload: print("Reloading nginx") subprocess.check_call( [ "nginx", "-s", "reload" ], ) if __name__ == "__main__": main()