1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
#!/usr/bin/env python3
import argparse
import logging
import os
import pathlib
import socketserver
import ssl
import urllib.error
import urllib.request
# Server-side TLS context shared by every connection: Handler wraps each accepted
# socket with it, and main() installs an SNI callback on it that swaps in a
# per-host context carrying the certificate for the requested server name.
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
class Handler(socketserver.BaseRequestHandler):
    """Serve one Gemini request by fetching the same URL over HTTPS.

    The request line ``gemini://host/path<CR><LF>`` is rewritten to
    ``https://host/path``, fetched with ``Accept: text/gemini`` and the body is
    relayed to the client as a ``20 text/gemini`` response.  Malformed requests
    and upstream failures are answered with a Gemini error status instead of
    silently dropping the connection.
    """

    # A Gemini request is a URI of at most 1024 bytes followed by CRLF.
    MAX_REQUEST_BYTES = 1024 + 2
    # The server handles one connection at a time, so a stalled client or
    # upstream must not be allowed to block it forever.
    TIMEOUT_SECONDS = 30

    @staticmethod
    def _parse_request(request_line: bytes) -> str:
        """Validate a raw request line and return the absolute Gemini URI.

        Raises:
            ValueError: if the line is not ASCII, is not terminated by CRLF,
                does not use the ``gemini://`` scheme, or contains whitespace
                or control characters inside the URI.
        """
        # Explicit checks rather than ``assert``: assertions are stripped under
        # ``python -O`` and this is untrusted network input.
        try:
            text = request_line.decode("ASCII")
        except UnicodeDecodeError as err:
            raise ValueError(f"Received request {request_line!r} that is not ASCII") from err
        if not text.endswith("\r\n"):
            raise ValueError(f"Received request {text!r} that does not end in \\r\\n")
        absolute_uri = text.removesuffix("\r\n")
        if not absolute_uri.startswith("gemini://"):
            raise ValueError(f"Request for uri {absolute_uri} does not start with gemini://")
        if any(ord(char) <= 0x20 or ord(char) == 0x7F for char in absolute_uri):
            raise ValueError(f"Request for uri {absolute_uri!r} contains whitespace or control characters")
        return absolute_uri

    @classmethod
    def _read_request_line(cls, sock) -> bytes:
        """Read from *sock* until CRLF, EOF or the request size limit is reached."""
        # A single recv() may return only part of the line, so accumulate.
        received = bytearray()
        while b"\r\n" not in received and len(received) < cls.MAX_REQUEST_BYTES:
            chunk = sock.recv(cls.MAX_REQUEST_BYTES - len(received))
            if not chunk:
                break
            received += chunk
        return bytes(received)

    def handle(self):
        """Answer the single request carried by this connection.

        TLS handshake failures and client socket timeouts propagate to
        socketserver's error handling, which logs them and keeps serving.
        """
        # Set before wrapping so the timeout also covers the TLS handshake.
        self.request.settimeout(self.TIMEOUT_SECONDS)
        with context.wrap_socket(self.request, server_side=True) as sock:
            try:
                absolute_uri = self._parse_request(self._read_request_line(sock))
            except ValueError as err:
                logging.warning("%s", err)
                sock.sendall(b"59 Bad request\r\n")
                return
            logging.info(absolute_uri)
            request = urllib.request.Request("https://" + absolute_uri.removeprefix("gemini://"))
            request.add_header("Accept", "text/gemini")
            try:
                with urllib.request.urlopen(request, timeout=self.TIMEOUT_SECONDS) as f:
                    content = f.read().decode("UTF8")
            except urllib.error.HTTPError as err:
                logging.warning("%s: upstream answered HTTP %s", absolute_uri, err.code)
                err.close()
                sock.sendall(b"51 Not found\r\n" if err.code == 404 else b"43 Proxy error\r\n")
                return
            except (OSError, UnicodeDecodeError) as err:
                # OSError covers URLError, connection failures and timeouts.
                logging.warning("%s: upstream fetch failed: %s", absolute_uri, err)
                sock.sendall(b"43 Proxy error\r\n")
                return
            response = "20 text/gemini\r\n"
            response += content
            sock.sendall(response.encode("UTF8"))
def main():
    """Parse the command line and run the Gemini proxy until interrupted.

    Exactly one certificate source must be given:

    * ``--certificates-from-path DIR`` loads ``DIR/<server name>/pubcert.pem``
      and ``DIR/<server name>/privkey.pem``.
    * ``--certificates-from-credential PREFIX`` loads
      ``$CREDENTIALS_DIRECTORY/PREFIX_<server name>_pubcert.pem`` and the
      matching ``..._privkey.pem``.

    The certificate is selected per connection from the TLS SNI server name.
    """
    logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(message)s")
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=1965)
    # Required: with neither option no certificate can be located, and every
    # handshake would fail with a NameError inside the SNI callback.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--certificates-from-path", type=pathlib.Path)
    group.add_argument("--certificates-from-credential")
    args = parser.parse_args()

    if args.certificates_from_path is not None:
        def domain_to_path(server_name):
            domain_path = args.certificates_from_path / server_name
            return (domain_path / "pubcert.pem", domain_path / "privkey.pem")
    else:
        # Resolve the directory once so a missing variable is reported at
        # startup rather than as a KeyError on each handshake.
        try:
            credentials_directory = pathlib.Path(os.environ["CREDENTIALS_DIRECTORY"])
        except KeyError:
            parser.error("--certificates-from-credential requires the CREDENTIALS_DIRECTORY environment variable")
        prefix = args.certificates_from_credential

        def domain_to_path(server_name):
            return (
                credentials_directory / f"{prefix}_{server_name}_pubcert.pem",
                credentials_directory / f"{prefix}_{server_name}_privkey.pem",
            )

    def sni_callback(socket: ssl.SSLSocket, server_name, _context):
        # server_name is None when the client sends no SNI, and is otherwise
        # client-controlled; it must not be able to escape the certificate
        # directory via separators, dot-names or NUL bytes.
        if (
            not server_name
            or server_name.startswith(".")
            or any(char in server_name for char in "/\\\0")
        ):
            logging.warning("Rejecting TLS handshake with unusable server name %r", server_name)
            return ssl.ALERT_DESCRIPTION_UNRECOGNIZED_NAME
        certfile, keyfile = domain_to_path(server_name)
        host_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        try:
            host_context.load_cert_chain(certfile, keyfile)
        except OSError as err:  # includes ssl.SSLError and missing files
            logging.warning("No usable certificate for %r: %s", server_name, err)
            return ssl.ALERT_DESCRIPTION_UNRECOGNIZED_NAME
        socket.context = host_context
        return None

    # Installed on the module-level context that Handler wraps sockets with.
    context.sni_callback = sni_callback

    with socketserver.TCPServer((args.host, args.port), Handler) as server:
        server.serve_forever()
# Start the server only when run as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|