|
| 1 | +import argparse |
| 2 | + |
| 3 | +from OpenSSL import crypto |
| 4 | +import re |
| 5 | +from requests import Session |
| 6 | +import sys |
| 7 | + |
| 8 | +ca_url_pattern = re.compile(r'CA Issuers \- URI:(\S+)\n') |
| 9 | + |
| 10 | +class CertChainBuilder: |
| 11 | + def get_next_cert_url(self, cert_data: bytes): |
| 12 | + is_pem = False |
| 13 | + try: |
| 14 | + cert_data.decode("utf-8") |
| 15 | + is_pem = True |
| 16 | + except UnicodeDecodeError: |
| 17 | + pass |
| 18 | + if is_pem: |
| 19 | + cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_data) |
| 20 | + else: |
| 21 | + cert = crypto.load_certificate(crypto.FILETYPE_ASN1, cert_data) |
| 22 | + self.certs.append(cert) |
| 23 | + for i in range(cert.get_extension_count()): |
| 24 | + ext = cert.get_extension(i) |
| 25 | + if ext.get_short_name() == b"authorityInfoAccess": |
| 26 | + if match := ca_url_pattern.search(str(ext)): |
| 27 | + return match.group(1) |
| 28 | + return None |
| 29 | + return None |
| 30 | + |
| 31 | + def __init__(self, verbose: bool): |
| 32 | + self.certs = [] |
| 33 | + self.session = Session() |
| 34 | + self.verbose = verbose |
| 35 | + |
| 36 | + def get_cert_from_url(self, url: str) -> bytes: |
| 37 | + with self.session.get(url) as response: |
| 38 | + return response.content |
| 39 | + |
| 40 | + @staticmethod |
| 41 | + def get_cert_common_name(cert: crypto.X509) -> str: |
| 42 | + return cert.get_subject().CN |
| 43 | + |
| 44 | + |
| 45 | + def feed(self, inital_cert: bytes): |
| 46 | + next_url = self.get_next_cert_url(inital_cert) |
| 47 | + while next_url: |
| 48 | + if self.verbose: |
| 49 | + print(f"Fetching next certificate from {next_url}", file=sys.stderr) |
| 50 | + next_cert = self.get_cert_from_url(next_url) |
| 51 | + next_url = self.get_next_cert_url(next_cert) |
| 52 | + |
| 53 | + @staticmethod |
| 54 | + def get_cert_pem(cert: crypto.X509) -> str: |
| 55 | + return crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode("utf-8").rstrip("\n") |
| 56 | + |
| 57 | + def build_chain(self) -> str: |
| 58 | + # Order: Initial, Intermediate Certs, Root |
| 59 | + # Intermediate Certs: Highest -> Lowest (in the context of self.certs, this means from higher indices to lower) |
| 60 | + # Initial: self.certs[0] |
| 61 | + # Root: self.certs[-1] |
| 62 | + chain = [self.get_cert_pem(self.certs[0])] |
| 63 | + for cert in self.certs[-2:0:-1]: # Starts from the second highest index and goes to the second lowest index |
| 64 | + chain.append(self.get_cert_pem(cert)) |
| 65 | + chain.append(self.get_cert_pem(self.certs[-1])) |
| 66 | + return "\n".join(chain) |
| 67 | + |
| 68 | +def main(args=None): |
| 69 | + parser = argparse.ArgumentParser(description="Build a certificate chain from a certificate") |
| 70 | + parser.add_argument("cert", help="The certificate to build the chain from") |
| 71 | + parser.add_argument("--out", help="The file to write the chain to (otherwise writes to stdout)") |
| 72 | + parser.add_argument("--verbose", help="Logs intermediate certificate URLs", action="store_true") |
| 73 | + parser.add_argument("--save-intermediate-certificates", help="Saves intermediate certificates", action="store_true") |
| 74 | + args = parser.parse_args(args) |
| 75 | + with open(args.cert, "rb") as f: |
| 76 | + cert_data = f.read() |
| 77 | + builder = CertChainBuilder(verbose=args.verbose) |
| 78 | + builder.feed(cert_data) |
| 79 | + chain = builder.build_chain() |
| 80 | + if args.out: |
| 81 | + with open(args.out, "w") as f: |
| 82 | + f.write(chain) |
| 83 | + else: |
| 84 | + print(chain) |
| 85 | + return |
| 86 | + |
| 87 | +if __name__ == "__main__": |
| 88 | + main() |
0 commit comments