1
Fork 0
This repository has been archived on 2023-11-05. You can view files and clone it, but cannot push or open issues or pull requests.
dynamic-nginx-host-map/create-nginx-map.py

141 lines
3.7 KiB
Python
Raw Permalink Normal View History

2023-08-18 17:54:34 +01:00
from typing import NamedTuple
import re
from io import StringIO
import sys
2023-10-31 18:22:31 +00:00
import tomllib
from pathlib import Path
from typing import TypedDict
2023-10-31 19:15:26 +00:00
from collections import defaultdict
2023-11-01 21:45:14 +00:00
from urllib.request import urlopen
2023-11-02 21:59:49 +00:00
import argparse
2023-11-01 21:45:14 +00:00
import json
2023-11-02 22:08:51 +00:00
import subprocess
2023-08-18 17:54:34 +01:00
2023-11-02 22:48:23 +00:00
# Matches one Traefik rule matcher written as Name(`argument`), capturing the
# matcher name and the text between the backticks.
TRAEFIK_RULE_RE = re.compile(r"([a-zA-Z]+)\(`(.+?)`\)")
# Leading part of a HostRegexp argument that parse_traefik_rule rewrites to a
# "*" wildcard.
SUBDOMAIN_WILDCARD_PREFIX = "{subdomain:[a-z]+}"
2023-08-18 17:54:34 +01:00
2023-10-31 18:22:31 +00:00
2023-08-18 17:54:34 +01:00
class Route(NamedTuple):
    """A single hostname-to-destination mapping discovered from a source."""

    # Traefik service name or Dokku app name; only emitted in the trailing
    # comment of the generated map line.
    name: str
    # Value written after the hostname in the generated map.
    destination: str
    # Hostname written as the key of the generated map line.
    hostname: str
2023-10-31 18:22:31 +00:00
class Source(TypedDict):
    """One entry of the ``source`` list in the TOML config."""

    # Which backend to query; main() handles "traefik" and "dokku".
    type: str
    # Base URL of the Traefik API, or the full URL of the Dokku exporter.
    url: str
    # Destination recorded on every route collected from this source.
    destination: str
class Config(TypedDict):
    """Shape of the parsed TOML configuration file."""

    # Sources to collect routes from.
    source: list[Source]
def parse_traefik_rule(rule: str) -> str | None:
    """Extract a single hostname from a Traefik router rule.

    Every ``Matcher(`argument`)`` pair found by ``TRAEFIK_RULE_RE`` is
    collected per matcher name and then resolved in this order:

    1. The argument of the first ``Host`` matcher is returned unchanged.
    2. Otherwise, if the argument of the first ``HostRegexp`` matcher starts
       with ``SUBDOMAIN_WILDCARD_PREFIX`` followed by a dot, that prefix is
       replaced with ``*`` and the result is returned.

    Args:
        rule: The rule string of a Traefik router.

    Returns:
        The hostname, or ``None`` when the rule contains no supported matcher.
    """
    rules: defaultdict[str, list[str]] = defaultdict(list)
    for rule_fn, rule_arg in TRAEFIK_RULE_RE.findall(rule):
        rules[rule_fn].append(rule_arg)

    # An explicit Host matcher wins; only the first one is used.
    if host_rules := rules.get("Host"):
        return host_rules[0]

    if regex_rules := rules.get("HostRegexp"):
        regex_rule = regex_rules[0]
        # Only the subdomain-wildcard shape is understood; any other regular
        # expression falls through to None.
        if regex_rule.startswith(f"{SUBDOMAIN_WILDCARD_PREFIX}."):
            return regex_rule.replace(SUBDOMAIN_WILDCARD_PREFIX, "*", 1)

    return None
2023-08-18 17:54:34 +01:00
def get_traefik_routes(traefik_host: str, traefik_route: str):
    """Collect a set of ``Route`` entries from a Traefik instance.

    Fetches ``{traefik_host}/api/http/routers``, parses each router's rule
    with ``parse_traefik_rule`` and builds one ``Route`` per router whose
    hostname could be determined. Routers without a usable hostname are
    reported on stderr and skipped.

    Args:
        traefik_host: Base URL of the Traefik API (no trailing path).
        traefik_route: Destination stored on every returned route.

    Returns:
        A set of ``Route`` tuples named after each router's service.
    """
    with urlopen(f"{traefik_host}/api/http/routers") as response:
        routers = json.load(response)

    found = set()
    for router in routers:
        hostname = parse_traefik_rule(router["rule"])
        if hostname:
            found.add(
                Route(
                    name=router["service"],
                    destination=traefik_route,
                    hostname=hostname,
                )
            )
        else:
            print(f"Failed to find host for {router['rule']}", file=sys.stderr)
    return found
def get_dokku_routes(dokku_exporter_url: str, dokku_route: str):
    """Collect a set of ``Route`` entries from a Dokku exporter endpoint.

    The endpoint is read as JSON and iterated as a sequence of objects, each
    providing an ``"app"`` name and a ``"vhosts"`` iterable. One ``Route`` is
    produced per (app, vhost) pair.

    Args:
        dokku_exporter_url: Full URL that returns the exporter's JSON.
        dokku_route: Destination stored on every returned route.

    Returns:
        A set of ``Route`` tuples named after the owning app.
    """
    with urlopen(dokku_exporter_url) as response:
        apps = json.load(response)

    return {
        Route(name=entry["app"], destination=dokku_route, hostname=vhost)
        for entry in apps
        for vhost in entry["vhosts"]
    }
def main():
2023-11-02 21:59:49 +00:00
parser = argparse.ArgumentParser()
parser.add_argument("--config", "-c", default="./config.toml", type=Path, help="Config file. Default: %(default)s")
parser.add_argument("--output", "-o", type=Path, default="./map.txt", help="Output file. Default: %(default)s")
2023-11-02 22:08:51 +00:00
parser.add_argument("--reload", action="store_true", help="Reload nginx on completion")
2023-11-02 21:59:49 +00:00
args = parser.parse_args()
config: Config = tomllib.loads(args.config.read_text())
2023-10-31 18:22:31 +00:00
2023-08-18 17:54:34 +01:00
routes = []
2023-10-31 18:22:31 +00:00
for source in config["source"]:
match source["type"]:
case "traefik":
routes.extend(get_traefik_routes(source["url"], source["destination"]))
case "dokku":
routes.extend(get_dokku_routes(source["url"], source["destination"]))
2023-08-18 17:54:34 +01:00
2023-10-31 19:10:35 +00:00
if len(routes) != len(set(routes)):
raise ValueError("Conflict found!")
2023-10-31 19:15:26 +00:00
grouped_routes = defaultdict(set)
for route in routes:
grouped_routes[route.hostname, route.destination].add(route.name)
2023-08-18 17:54:34 +01:00
2023-10-31 19:15:26 +00:00
print("Found", len(routes), "routes, grouped into", len(grouped_routes), "groups")
2023-10-31 19:10:35 +00:00
output = StringIO()
for (hostname, destination), names in sorted(grouped_routes.items()):
output.write(f"{hostname}\t{destination}; # {', '.join(names)}\n")
if args.output.is_file():
current_output = args.output.read_text()
if current_output == output.getvalue():
print("Output identical - aborting")
return
args.output.write_text(output.getvalue())
2023-11-02 22:08:51 +00:00
if args.reload:
print("Reloading nginx")
subprocess.check_call(
[
"nginx",
"-s",
"reload"
],
)
2023-08-18 17:54:34 +01:00
# Run the generator only when executed as a script, not when imported.
if __name__ == "__main__":
    main()