#!/usr/bin/env python3
"""Renew ACME certificates for the domains listed in a YAML config file.

For every configured domain the script checks the expiration date of the
current certificate and, when it expires soon (or when forced), orders a new
certificate from the ACME directory using the HTTP-01 challenge, writes the
resulting certificate files under ``SSL_CERTS_DIR`` and runs the configured
hooks.
"""
import argparse
import grp
import logging
import os
import pwd
import shlex
import subprocess
import sys
from datetime import datetime, timezone
from typing import List, Dict, Optional

import acme
import josepy as jose
import requests
import yaml
from acme import client, messages, challenges, crypto_util
from OpenSSL import crypto

SSL_CONFIG_DIR = "/etc/ssl"
SSL_CONFIG_FILE = os.path.join(SSL_CONFIG_DIR, "acme.yml")
SSL_ACCOUNTS_DIR = os.path.join(SSL_CONFIG_DIR, "accounts")
SSL_KEYS_DIR = os.path.join(SSL_CONFIG_DIR, "private")
SSL_CSR_DIR = os.path.join(SSL_CONFIG_DIR, "csr")
SSL_CERTS_DIR = os.path.join(SSL_CONFIG_DIR, "certs")
# Fall back to the "root" group when the "ssl-cert" group does not exist.
try:
    SSL_GROUP = grp.getgrnam("ssl-cert").gr_name
except KeyError:
    SSL_GROUP = "root"

ACME_ACCOUNT_KEY = os.path.join(SSL_ACCOUNTS_DIR, "acme_account.key")
ACME_CHALLENGE_DIR = "/var/www/acme/.well-known/acme-challenge"
LETSENCRYPT_DIRECTORY_URL = "https://acme-v02.api.letsencrypt.org/directory"

OPENSSL_DATE_FORMAT = '%Y%m%d%H%M%SZ'
CERT_REMAINING_DAYS = 30
USER_AGENT = "python-acme"

# Caches of user name -> uid and group name -> gid lookups used by chown().
OS_OWNERS = {}
OS_GROUPS = {}


def chown(path: str, owner: str, group: str, **kwargs) -> None:
    """Change the owner and group of a path using user and group names.

    Resolved ids are cached in ``OS_OWNERS`` / ``OS_GROUPS``.

    :param str path: Path whose ownership is changed.
    :param str owner: User name of the new owner.
    :param str group: Group name of the new group.
    :param kwargs: Extra keyword arguments forwarded to :func:`os.chown`.
    :raises KeyError: If the user or the group does not exist.
    """
    if owner not in OS_OWNERS:
        OS_OWNERS[owner] = pwd.getpwnam(owner).pw_uid
    if group not in OS_GROUPS:
        OS_GROUPS[group] = grp.getgrnam(group).gr_gid
    os.chown(path, OS_OWNERS[owner], OS_GROUPS[group], **kwargs)


class PrivateKey:
    """A private key file together with the owner/group it must belong to."""

    def __init__(self, private_key: str, owner: str, group: str):
        """
        :param str private_key: Path of the private key file.
        :param str owner: User name owning the key file and its directory.
        :param str group: Group name owning the key file and its directory.
        """
        self.private_key = private_key
        self.owner = owner
        self.group = group

    def load_or_create_private_key(self) -> crypto.PKey:
        """Load the private key, generating a new one if it cannot be read.

        A new key is a 4096-bit RSA key written to ``self.private_key``; the
        file and its parent directory are chowned to the configured
        owner/group.

        :returns: The loaded or newly generated private key.
        :rtype: crypto.PKey
        """
        logging.info(f"Loading private key from {self.private_key}")
        try:
            with open(self.private_key, 'rb') as pem_in:
                return crypto.load_privatekey(crypto.FILETYPE_PEM,
                                              pem_in.read())
        except IOError as e:
            logging.warning(f"Unable to load private key: {e}")

        logging.info("Generating a new private key")
        private_key_pem = crypto.PKey()
        private_key_pem.generate_key(crypto.TYPE_RSA, 4096)

        logging.info(f"Writing private key into {self.private_key}")
        private_key_dir = os.path.dirname(self.private_key)
        os.makedirs(private_key_dir, mode=0o750, exist_ok=True)
        chown(private_key_dir, self.owner, self.group)
        # O_TRUNC: never leave trailing bytes of a previous, longer file
        # behind the freshly written key.
        ofd = os.open(self.private_key,
                      os.O_CREAT | os.O_WRONLY | os.O_TRUNC, 0o640)
        with open(ofd, 'wb') as pem_out:
            pem_out.write(crypto.dump_privatekey(crypto.FILETYPE_PEM,
                                                 private_key_pem))
        chown(self.private_key, self.owner, self.group)
        return private_key_pem


class Account(PrivateKey):
    """An ACME account identified by its private key and contact email."""

    def __init__(self, private_key: str, email: str, owner: str, group: str):
        """
        :param str private_key: Path of the account private key file.
        :param str email: Contact email used when registering the account.
        :param str owner: User name owning the key file.
        :param str group: Group name owning the key file.
        """
        super().__init__(private_key, owner, group)
        self.private_key_pem = self.load_or_create_private_key()
        self.email = email
        self.jwk = jose.JWKRSA(key=self.private_key_pem.to_cryptography_key())
        self.jwk_pub = self.jwk.public_key()

    def register(self, directory_url: str) -> acme.client.ClientV2:
        """Create an ACME client registered with this account.

        A new account is requested first; when the server answers with a
        conflict the existing account bound to the key is looked up instead.

        :param str directory_url: URL of the ACME directory.
        :returns: An ACME client bound to the account.
        :rtype: acme.client.ClientV2
        """
        client_network = client.ClientNetwork(self.jwk, user_agent=USER_AGENT)
        directory_json = client_network.get(directory_url).json()
        directory = messages.Directory.from_json(directory_json)
        acme_client = client.ClientV2(directory, net=client_network)

        try:
            logging.info("Registering with a new ACME account")
            registration = messages.NewRegistration.from_data(
                email=self.email, terms_of_service_agreed=True)
            acme_client.new_account(registration)
        except acme.errors.ConflictError as e:
            logging.warning(f"Unable to create a new ACME account: {e}")
            logging.info("Registering with an existing ACME account")
            registration_message = messages.NewRegistration(
                key=self.jwk_pub, only_return_existing=True)
            # NOTE(review): relies on private ClientV2 helpers (_post,
            # _regr_from_response); confirm they still exist when upgrading
            # the acme package.
            registration_response = acme_client._post(directory["newAccount"],
                                                      registration_message)
            account = acme_client._regr_from_response(registration_response)
            acme_client.net.account = account

        return acme_client


class Challenge:
    """Run HTTP-01 challenges by writing files into a web-served directory."""

    def __init__(self, path: str, acme_client: acme.client.ClientV2):
        """
        :param str path: Directory in which challenge files are written.
        :param acme.client.ClientV2 acme_client: Registered ACME client.
        """
        self.path = path
        self.acme_client = acme_client

    def run(self, csr_pem: bytes) -> str:
        """Order a certificate for a CSR and validate it through HTTP-01.

        One HTTP-01 challenge is answered for every authorization of the
        order, so CSRs covering several names can be validated.

        :param bytes csr_pem: The CSR in PEM file format.
        :returns: The fullchain certificate in PEM file format.
        :rtype: str
        """
        logging.info("Ordering ACME challenge")
        order = self.acme_client.new_order(csr_pem)

        logging.info("Selecting HTTP-01 ACME challenge")
        http01_challenges = self.select_http01_challenges(order)

        logging.info("Performing HTTP-01 ACME challenge")
        return self.perform_http01_challenges(http01_challenges, order)

    @staticmethod
    def select_http01_challenge(order: messages.OrderResource) \
            -> messages.ChallengeBody:
        """Select the first HTTP-01 challenge from a given order

        :param messages.OrderResource order: Order containing the challenges.
        :returns: The HTTP-01 challenge.
        :rtype: messages.ChallengeBody
        :raises Exception: If no authorization offers an HTTP-01 challenge.
        """
        for auth in order.authorizations:
            for challenge in auth.body.challenges:
                if isinstance(challenge.chall, challenges.HTTP01):
                    return challenge
        raise Exception("HTTP-01 challenge was not offered by the CA server.")

    @staticmethod
    def select_http01_challenges(order: messages.OrderResource) \
            -> List[messages.ChallengeBody]:
        """Select one HTTP-01 challenge per authorization of a given order

        :param messages.OrderResource order: Order containing the challenges.
        :returns: One HTTP-01 challenge for each authorization.
        :rtype: List[messages.ChallengeBody]
        :raises Exception: If the order has no authorization, or if one of
            them does not offer an HTTP-01 challenge.
        """
        selected = []
        for auth in order.authorizations:
            for challenge in auth.body.challenges:
                if isinstance(challenge.chall, challenges.HTTP01):
                    selected.append(challenge)
                    break
            else:
                # This authorization cannot be validated over HTTP-01.
                raise Exception(
                    "HTTP-01 challenge was not offered by the CA server.")
        if not selected:
            raise Exception(
                "HTTP-01 challenge was not offered by the CA server.")
        return selected

    def perform_http01_challenge(self, challenge: messages.ChallengeBody,
                                 order: messages.OrderResource) -> str:
        """Perform a single HTTP-01 challenge in the challenge directory

        :param messages.ChallengeBody challenge: An ACME challenge
        :param messages.OrderResource order: An ACME order
        :returns: The fullchain certificate in PEM file format
        :rtype: str
        """
        return self.perform_http01_challenges([challenge], order)

    def perform_http01_challenges(
            self, http01_challenges: List[messages.ChallengeBody],
            order: messages.OrderResource) -> str:
        """Perform several HTTP-01 challenges, then finalize the order

        Every challenge file is removed afterwards, whether the validation
        succeeded or not.

        :param http01_challenges: The ACME challenges to answer
        :param messages.OrderResource order: An ACME order
        :returns: The fullchain certificate in PEM file format
        :rtype: str
        """
        account_key = self.acme_client.net.key
        os.makedirs(self.path, mode=0o755, exist_ok=True)

        challenge_paths = []
        try:
            for challenge in http01_challenges:
                response, validation = \
                    challenge.response_and_validation(account_key)
                # The validation string is "<token>.<thumbprint>"; the file
                # is named after the token part.
                validation_filename, _ = validation.split('.')
                challenge_path = os.path.join(self.path, validation_filename)

                logging.info(
                    f"Writing challenge into {challenge_path} directory")
                challenge_paths.append(challenge_path)
                with open(challenge_path, 'w') as ofd:
                    ofd.write(validation)

                self.acme_client.answer_challenge(challenge, response)

            return self.acme_client.poll_and_finalize(order).fullchain_pem
        finally:
            for challenge_path in challenge_paths:
                try:
                    os.remove(challenge_path)
                except OSError as e:
                    logging.warning(f"Unable to remove challenge file: {e}")


class Domain(PrivateKey):
    """A domain with its private key, CSR and certificate files."""

    def __init__(self, name: str, owner: str, group: str,
                 alt_names: Optional[List[str]] = None,
                 remaining_days: int = CERT_REMAINING_DAYS,
                 hooks: Optional[List[str]] = None):
        """
        :param str name: Main domain name of the certificate.
        :param str owner: User name owning the private key.
        :param str group: Group name owning the private key.
        :param alt_names: Additional domain names of the certificate.
        :param int remaining_days: Renew when the certificate expires within
            this number of days.
        :param hooks: Commands to run after a successful renewal.
        """
        private_key = os.path.join(SSL_KEYS_DIR, name + '.key')
        super().__init__(private_key, owner, group)
        self.name = name
        self.alt_names = [] if alt_names is None else alt_names
        self.remaining_days = remaining_days
        self.hooks = [] if hooks is None else hooks

        certs_dir = os.path.join(SSL_CERTS_DIR, name + '.d')
        self.csr = os.path.join(SSL_CSR_DIR, name + '.csr')
        self.cert = os.path.join(certs_dir, 'cert.pem')
        self.chain_cert = os.path.join(certs_dir, 'chain.pem')
        self.fullchain_cert = os.path.join(certs_dir, 'fullchain.pem')

        # Cached result of the expiration check (None until first computed).
        self.cert_expiration_days = None

    def check_certificate_expiration_date(self) -> bool:
        """Indicate whether the certificate will expire soon or not.

        The number of remaining days is computed once and cached; a
        certificate that cannot be read is considered expired.

        :returns: True if the certificate expires soon.
        :rtype: bool
        """
        if self.cert_expiration_days is None:
            logging.debug(f"Checking '{self.cert}' certificate expiration "
                          "date")
            try:
                with open(self.cert, 'rb') as pem_in:
                    cert_pem = crypto.load_certificate(crypto.FILETYPE_PEM,
                                                       pem_in.read())
            except IOError as e:
                logging.warning(f"Unable to load certificate: {e}")
                self.cert_expiration_days = float("-inf")
                return True

            # notAfter is expressed in UTC (trailing "Z"), so compare it
            # with the current UTC time rather than the local time.
            now = datetime.now(timezone.utc)
            cert_raw_expiration_date = cert_pem.get_notAfter().decode()
            cert_expiration_date = datetime.strptime(
                cert_raw_expiration_date,
                OPENSSL_DATE_FORMAT).replace(tzinfo=timezone.utc)
            self.cert_expiration_days = (cert_expiration_date - now).days

        logging.info(f"Certificate expires in {self.cert_expiration_days} "
                     "days.")
        return self.cert_expiration_days <= self.remaining_days

    @staticmethod
    def _split_pem_chain(fullchain_pem: str) -> List[str]:
        """Split a PEM bundle into its individual certificates.

        :param str fullchain_pem: Concatenated PEM certificates.
        :returns: The certificates, in their original order.
        :rtype: List[str]
        """
        certs_pem = []
        current_lines = None
        for line in fullchain_pem.split('\n'):
            if 'BEGIN CERTIFICATE' in line:
                current_lines = []
            if current_lines is None:
                # Text outside of a certificate block is ignored.
                continue
            current_lines.append(line + '\n')
            if 'END CERTIFICATE' in line:
                certs_pem.append(''.join(current_lines))
                current_lines = None
        return certs_pem

    def renew_cert(self, challenge: Challenge, force: bool = False) -> None:
        """Renew the certificate if it expires soon, then run the hooks.

        Writes the leaf certificate, the chain (every certificate but the
        leaf) and the full chain into the domain certificate directory.

        :param Challenge challenge: Challenge runner used to get the
            certificate.
        :param bool force: Renew without checking the expiration date.
        :raises ValueError: If the CA response contains no certificate.
        """
        if not (force or self.check_certificate_expiration_date()):
            return

        logging.info(f"Renewing {self.name} certificates")
        csr_pem = self.load_or_create_csr()
        fullchain_pem = challenge.run(csr_pem)

        logging.info(f"Saving {self.name} certificates")
        certs_pem = self._split_pem_chain(fullchain_pem)
        if not certs_pem:
            raise ValueError(
                f"No certificate found in the {self.name} full chain")

        # cert, chain and fullchain files all live in the same directory.
        os.makedirs(os.path.dirname(self.cert), mode=0o755, exist_ok=True)

        logging.info(f"Writing '{self.cert}' certificate")
        with open(self.cert, 'w') as pem_out:
            pem_out.write(certs_pem[0])

        logging.info(f"Writing '{self.chain_cert}' chain certificate")
        with open(self.chain_cert, 'w') as pem_out:
            pem_out.write(''.join(certs_pem[1:]))

        logging.info(f"Writing '{self.fullchain_cert}' full chain certificate")
        with open(self.fullchain_cert, 'w') as pem_out:
            pem_out.write(fullchain_pem)

        self.run_hooks()

    def load_or_create_csr(self) -> bytes:
        """Load or create a CSR for the domain.

        The CSR is regenerated when its file cannot be read or when its
        subjectAltName entries do not match the configured domain names.

        :returns: The CSR in PEM file format.
        :rtype: bytes
        """
        logging.info(f"Loading {self.name} CSR file from '{self.csr}'")
        try:
            with open(self.csr, 'rb') as pem_in:
                csr = pem_in.read()
            csr_pem = crypto.load_certificate_request(crypto.FILETYPE_PEM,
                                                      csr)
        except IOError as e:
            logging.warning(f"Unable to load CSR file: {e}")
            # Nothing to compare against: a fresh CSR is the answer.
            return self.create_csr()

        logging.info("Checking if CSR contains all the domains")
        domain_names = set()
        for ext in csr_pem.get_extensions():
            if ext.get_short_name().decode() != "subjectAltName":
                continue
            # str(ext) looks like "DNS:a.example, DNS:b.example".
            for subject_domain in str(ext).split(','):
                _, _, domain = subject_domain.strip().partition(':')
                if domain:
                    domain_names.add(domain)

        # Names present on only one side (CSR or configuration).
        domain_names_diff = domain_names ^ {self.name, *self.alt_names}
        if domain_names_diff:
            logging.warning(f"Differences found in CSR: {domain_names_diff}")
            csr = self.create_csr()

        return csr

    def create_csr(self) -> bytes:
        """Generate a CSR for the domain names and write it to ``self.csr``.

        :returns: The CSR in PEM file format.
        :rtype: bytes
        """
        private_key_pem = self.load_or_create_private_key()
        private_key = crypto.dump_privatekey(crypto.FILETYPE_PEM,
                                             private_key_pem)

        logging.info("Generating a new CSR")
        csr = crypto_util.make_csr(private_key, [self.name] + self.alt_names)

        logging.info(f"Writing CSR file into '{self.csr}'")
        csr_dir = os.path.dirname(self.csr)
        os.makedirs(csr_dir, mode=0o755, exist_ok=True)
        with open(self.csr, 'wb') as pem_out:
            pem_out.write(csr)

        return csr

    def run_hooks(self) -> None:
        """Run every configured hook command, without a shell.

        A hook exiting with a non-zero status is reported but does not stop
        the remaining hooks.
        """
        for hook in self.hooks:
            logging.info(f"Running hook: '{hook}'")
            # shlex keeps quoted arguments together, unlike str.split().
            result = subprocess.run(shlex.split(hook))
            if result.returncode != 0:
                logging.warning(f"Hook '{hook}' exited with status "
                                f"{result.returncode}")


def main():
    """Parse the command line and renew the certificates that need it.

    :returns: 0 on success, 1 if the config file cannot be loaded, 2 if an
        error occurs while renewing.
    :rtype: int
    """
    parser = argparse.ArgumentParser(
        description="Renew ACME certificates for a list of domains. This "
                    "script assumes you already have an account on the CA "
                    "server and a private key for each certificate.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--config", "-c", default=SSL_CONFIG_FILE,
                        help="Path to the config file.")
    parser.add_argument("--quiet", "-q", action="store_true",
                        help="Quiet mode.")
    parser.add_argument("--verbose", "-v", action="store_true",
                        help="Increase verbosity.")
    parser.add_argument("--force", "-f", action="store_true",
                        help="Force certificates renewal without checking "
                             "their expiration date.")
    args = parser.parse_args()

    if args.quiet:
        log_level = logging.WARNING
    elif args.verbose:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    logging.basicConfig(stream=sys.stdout, level=log_level,
                        format="%(levelname)s:%(message)s")

    try:
        with open(args.config, 'r') as ifd:
            config = yaml.safe_load(ifd)
    except (IOError, yaml.YAMLError) as e:
        logging.error(f"Unable to load config file: {e}")
        return 1

    try:
        domains_to_renew = []
        for domain_name, domain_details in config["domains"].items():
            # A domain listed without any setting is parsed as None.
            domain_details = domain_details or {}
            remaining_days = domain_details.get("remaining_days",
                                                CERT_REMAINING_DAYS)
            domain = Domain(domain_name,
                            owner=domain_details.get("owner", "root"),
                            group=domain_details.get("group", SSL_GROUP),
                            alt_names=domain_details.get("alt_names"),
                            remaining_days=remaining_days,
                            hooks=domain_details.get("hooks"))

            if not (args.force or domain.check_certificate_expiration_date()):
                logging.info(f"{domain.name} certificate will not be renewed.")
                continue

            logging.info(f"{domain.name} certificate will be renewed.")
            domains_to_renew.append(domain)

        if not domains_to_renew:
            logging.info("No domain to renew. Aborting.")
            return 0

        # The ACME account is only contacted when there is work to do.
        account_config = config["account"]
        account = Account(
            private_key=account_config.get("private_key", ACME_ACCOUNT_KEY),
            email=account_config["email"],
            owner=account_config.get("owner", "root"),
            group=account_config.get("group", "root"))
        directory_url = config.get("directory_url", LETSENCRYPT_DIRECTORY_URL)
        acme_client = account.register(directory_url)

        challenge_dir = config.get("challenge_dir", ACME_CHALLENGE_DIR)
        challenge = Challenge(challenge_dir, acme_client)

        for domain in domains_to_renew:
            domain.renew_cert(challenge, force=args.force)
    except Exception as e:
        # Top-level boundary: log the traceback and report failure.
        logging.exception(e)
        return 2

    return 0


if __name__ == '__main__':
    sys.exit(main())