ansible-role-acme/files/acme_renew_cert.py

486 lines
19 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import glob
import grp
import logging
import os
import pwd
import subprocess
import sys
from datetime import datetime, timezone
from typing import List, Dict

import acme
import josepy as jose
import requests
import yaml
from acme import client, messages, challenges, crypto_util
from OpenSSL import crypto
# Filesystem layout for the configuration, keys, CSRs and certificates.
SSL_CONFIG_DIR = "/etc/ssl"
SSL_CONFIG_FILE = os.path.join(SSL_CONFIG_DIR, "acme.yml")
SSL_ACCOUNTS_DIR = os.path.join(SSL_CONFIG_DIR, "accounts")
SSL_KEYS_DIR = os.path.join(SSL_CONFIG_DIR, "private")
SSL_CSR_DIR = os.path.join(SSL_CONFIG_DIR, "csr")
SSL_CERTS_DIR = os.path.join(SSL_CONFIG_DIR, "certs")

# Use the "ssl-cert" group when the host defines it, "root" otherwise.
try:
    SSL_GROUP = grp.getgrnam("ssl-cert").gr_name
except KeyError:
    SSL_GROUP = "root"

# ACME specific paths and endpoints.
ACME_CONFIG_FILE = os.path.join(SSL_CONFIG_DIR, "acme.yml")
ACME_CONFIG_DIR = os.path.join(SSL_CONFIG_DIR, "acme.d")
ACME_ACCOUNT_KEY = os.path.join(SSL_ACCOUNTS_DIR, "acme_account.key")
ACME_CHALLENGE_DIR = "/var/www/acme/.well-known/acme-challenge"
LETSENCRYPT_DIRECTORY_URL = "https://acme-v02.api.letsencrypt.org/directory"

# strptime() format matching the value returned by X509.get_notAfter().
OPENSSL_DATE_FORMAT = '%Y%m%d%H%M%SZ'
# Default number of remaining days under which a certificate is renewed.
CERT_REMAINING_DAYS = 30
# User agent announced to the ACME server.
USER_AGENT = "python-acme"
# Caches mapping user / group names to their numeric ids, filled lazily by
# chown() so each name is resolved at most once per run.
OS_OWNERS = {}
OS_GROUPS = {}


def chown(path: str, owner: str, group: str, **kwargs) -> None:
    """Change the owner and group of a path, both given by name.

    :param str path: Path whose ownership is changed.
    :param str owner: Name of the new owning user.
    :param str group: Name of the new owning group.
    :param kwargs: Extra keyword arguments forwarded to ``os.chown``.
    :raises KeyError: If the user or the group name is unknown.
    """
    uid = OS_OWNERS.get(owner)
    if uid is None:
        uid = pwd.getpwnam(owner).pw_uid
        OS_OWNERS[owner] = uid

    gid = OS_GROUPS.get(group)
    if gid is None:
        gid = grp.getgrnam(group).gr_gid
        OS_GROUPS[group] = gid

    os.chown(path, uid, gid, **kwargs)
class PrivateKey:
    """A private key stored as a PEM file with a given owner and group.

    :param str private_key: Path of the PEM file holding the private key.
    :param str owner: Name of the user that owns the key file.
    :param str group: Name of the group that owns the key file.
    """

    def __init__(self, private_key: str, owner: str, group: str):
        self.private_key = private_key
        self.owner = owner
        self.group = group

    def load_or_create_private_key(self) -> crypto.PKey:
        """Load the private key, generating a new one if it cannot be read.

        When the key file cannot be opened or read, a 4096-bit RSA key is
        generated and written to ``self.private_key``; the parent directory
        is created if needed and both are chowned to the configured owner
        and group. Only ``IOError`` triggers the generation: any other
        error raised while loading the key is propagated.

        :returns: The loaded or newly generated private key.
        :rtype: crypto.PKey
        """
        logging.info(f"Loading private key from {self.private_key}")
        try:
            with open(self.private_key, 'rb') as pem_in:
                return crypto.load_privatekey(crypto.FILETYPE_PEM,
                                              pem_in.read())
        except IOError as e:
            logging.warning(f"Unable to load private key: {e}")

        logging.info("Generating a new private key")
        private_key_pem = crypto.PKey()
        private_key_pem.generate_key(crypto.TYPE_RSA, 4096)

        logging.info(f"Writing private key into {self.private_key}")
        private_key_dir = os.path.dirname(self.private_key)
        os.makedirs(private_key_dir, mode=0o750, exist_ok=True)
        chown(private_key_dir, self.owner, self.group)
        # O_TRUNC: without it, writing over an existing longer file would
        # leave trailing bytes after the new PEM and corrupt the key file.
        ofd = os.open(self.private_key,
                      os.O_CREAT | os.O_WRONLY | os.O_TRUNC, 0o640)
        with open(ofd, 'wb') as pem_out:
            pem_out.write(crypto.dump_privatekey(crypto.FILETYPE_PEM,
                                                 private_key_pem))
        chown(self.private_key, self.owner, self.group)
        return private_key_pem
class Account(PrivateKey):
    """An ACME account identified by a private key and a contact email.

    The account key is loaded (or generated) as soon as the object is
    built, and wrapped into the JWK used to sign ACME requests.

    :param str private_key: Path of the account private key file.
    :param str email: Contact email sent when registering the account.
    :param str owner: Name of the user that owns the key file.
    :param str group: Name of the group that owns the key file.
    """

    def __init__(self, private_key: str, email: str, owner: str, group: str):
        super().__init__(private_key, owner, group)
        self.private_key_pem = self.load_or_create_private_key()
        self.email = email
        cryptography_key = self.private_key_pem.to_cryptography_key()
        self.jwk = jose.JWKRSA(key=cryptography_key)
        self.jwk_pub = self.jwk.public_key()

    def register(self, directory_url: str) -> acme.client.ClientV2:
        """Build an ACME client bound to this account.

        A new account is registered first; when the server reports a
        conflict, the existing account bound to the key is looked up
        instead and attached to the client.

        :param str directory_url: URL of the ACME directory.
        :returns: An ACME client ready to order certificates.
        :rtype: acme.client.ClientV2
        """
        network = client.ClientNetwork(self.jwk, user_agent=USER_AGENT)
        directory = messages.Directory.from_json(
            network.get(directory_url).json())
        acme_client = client.ClientV2(directory, net=network)

        try:
            logging.info("Registering with a new ACME account")
            new_registration = messages.NewRegistration.from_data(
                email=self.email,
                terms_of_service_agreed=True)
            acme_client.new_account(new_registration)
        except acme.errors.ConflictError as e:
            logging.warning(f"Unable to create a new ACME account: {e}")
            logging.info("Registering with an existing ACME account")
            # Ask the server for the account already bound to this key.
            lookup = messages.NewRegistration(key=self.jwk_pub,
                                              only_return_existing=True)
            response = acme_client._post(directory["newAccount"], lookup)
            acme_client.net.account = \
                acme_client._regr_from_response(response)

        return acme_client
class Challenge:
    """Validate ACME orders through the HTTP-01 challenge.

    :param str path: Directory in which the challenge files are written.
    :param acme.client.ClientV2 acme_client: ACME client used to place
        orders and answer challenges.
    """

    def __init__(self, path: str, acme_client: acme.client.ClientV2):
        self.path = path
        self.acme_client = acme_client

    def run(self, csr_pem: bytes) -> str:
        """Order a certificate for a CSR and validate it with HTTP-01.

        :param bytes csr_pem: The CSR in PEM file format.
        :returns: The fullchain certificate in PEM file format.
        :rtype: str
        """
        # NOTE(review): only the first HTTP-01 challenge found in the order
        # is answered. An order covering several names may carry one
        # authorization per name -- confirm multi-name orders validate.
        logging.info("Ordering ACME challenge")
        order = self.acme_client.new_order(csr_pem)
        logging.info("Selecting HTTP-01 ACME challenge")
        challenge = self.select_http01_challenge(order)
        logging.info("Performing HTTP-01 ACME challenge")
        fullchain_pem = self.perform_http01_challenge(challenge, order)
        return fullchain_pem

    @staticmethod
    def select_http01_challenge(order: messages.OrderResource) \
            -> challenges.Challenge:
        """Select the HTTP-01 challenge from a given order

        :param messages.OrderResource order: Order containing the challenges.
        :returns: The first HTTP-01 challenge found in the order.
        :rtype: challenges.Challenge
        :raises Exception: If the order offers no HTTP-01 challenge.
        """
        for auth in order.authorizations:
            for challenge in auth.body.challenges:
                if isinstance(challenge.chall, challenges.HTTP01):
                    return challenge
        raise Exception("HTTP-01 challenge was not offered by the CA server.")

    def perform_http01_challenge(self, challenge: challenges.Challenge,
                                 order: messages.OrderResource) -> str:
        """Perform the HTTP-01 challenge in a given directory

        The validation content is written into a file of ``self.path``
        named after the part of the validation string before the dot. The
        file is removed once the order is finalized, and also when
        answering or polling fails.

        :param challenges.Challenge challenge: A ACME challenge
        :param messages.OrderResource order: An ACME order
        :returns: The fullchain certificate in PEM file format
        :rtype: str
        """
        account_key = self.acme_client.net.key
        response, validation = challenge.response_and_validation(account_key)
        validation_filename, _ = validation.split('.')
        challenge_path = os.path.join(self.path, validation_filename)

        logging.info(f"Writing challenge into {challenge_path} directory")
        os.makedirs(self.path, mode=0o755, exist_ok=True)
        with open(challenge_path, 'w') as ofd:
            ofd.write(validation)

        try:
            self.acme_client.answer_challenge(challenge, response)
            finalized_order = self.acme_client.poll_and_finalize(order)
            return finalized_order.fullchain_pem
        finally:
            # Never leave a stale challenge file behind, even on failure.
            os.remove(challenge_path)
class Domain(PrivateKey):
    """A domain whose certificate is issued and renewed through ACME.

    Every path left to ``None`` gets a default derived from the domain
    name under the ``SSL_*`` directories.

    :param str name: Main domain name of the certificate.
    :param str owner: Name of the user that owns the private key file.
    :param str group: Name of the group that owns the private key file.
    :param alt_names: Additional domain names requested in the CSR.
    :param str cert: Path of the certificate file.
    :param str chain_cert: Path of the chain certificate file.
    :param str fullchain_cert: Path of the full chain certificate file.
    :param str private_key: Path of the private key file.
    :param str csr: Path of the CSR file.
    :param int remaining_days: Renew the certificate when it expires in
        this many days or fewer.
    :param hooks: Commands run after the certificate files are written.
    """

    def __init__(self, name: str, owner: str, group: str,
                 alt_names: List[str] = None,
                 cert: str = None, chain_cert: str = None,
                 fullchain_cert: str = None, private_key: str = None,
                 csr: str = None, remaining_days: int = CERT_REMAINING_DAYS,
                 hooks: List[str] = None):
        if private_key is None:
            private_key = os.path.join(SSL_KEYS_DIR, name + '.key')
        super().__init__(private_key, owner, group)

        self.name = name
        self.alt_names = [] if alt_names is None else alt_names
        self.remaining_days = remaining_days
        self.hooks = [] if hooks is None else hooks

        certs_dir = os.path.join(SSL_CERTS_DIR, name + '.d')
        self.cert = (os.path.join(certs_dir, 'cert.pem')
                     if cert is None else cert)
        self.chain_cert = (os.path.join(certs_dir, 'chain.pem')
                           if chain_cert is None else chain_cert)
        self.fullchain_cert = (os.path.join(certs_dir, 'fullchain.pem')
                               if fullchain_cert is None else fullchain_cert)
        self.csr = (os.path.join(SSL_CSR_DIR, name + '.csr')
                    if csr is None else csr)

        # Cached by check_certificate_expiration_date(); None until then.
        self.cert_expiration_days = None

    def check_certificate_expiration_date(self) -> bool:
        """Indicate whether the certificate will expire soon or not.

        The number of days left is computed once and cached in
        ``self.cert_expiration_days``. A certificate file that cannot be
        read counts as expiring.

        :returns: True if the certificate expires soon.
        :rtype: bool
        """
        if self.cert_expiration_days is None:
            logging.debug(f"Checking '{self.cert}' certificate expiration "
                          "date")
            try:
                with open(self.cert, 'rb') as pem_in:
                    cert_pem = crypto.load_certificate(crypto.FILETYPE_PEM,
                                                       pem_in.read())
            except IOError as e:
                logging.warning(f"Unable to load certificate: {e}")
                self.cert_expiration_days = float("-inf")
                return True

            # The notAfter timestamp ends with "Z" (UTC), so it has to be
            # compared with the current UTC time, not the local time.
            now = datetime.now(timezone.utc)
            cert_raw_expiration_date = cert_pem.get_notAfter().decode()
            cert_expiration_date = datetime.strptime(
                cert_raw_expiration_date,
                OPENSSL_DATE_FORMAT).replace(tzinfo=timezone.utc)
            self.cert_expiration_days = (cert_expiration_date - now).days
            logging.info(f"Certificate expires in {self.cert_expiration_days} "
                         "days.")
        return self.cert_expiration_days <= self.remaining_days

    def renew_cert(self, challenge: Challenge, force: bool = False) -> None:
        """Renew the certificate and write the resulting PEM files.

        Nothing is done unless ``force`` is set or the certificate expires
        soon. The hooks are run once the three files are written.

        :param Challenge challenge: Challenge used to obtain the
            certificate.
        :param bool force: Renew without checking the expiration date.
        :raises ValueError: If the issued chain contains no certificate.
        """
        if not (force or self.check_certificate_expiration_date()):
            return

        logging.info(f"Renewing {self.name} certificates")
        csr_pem = self.load_or_create_csr()
        fullchain_pem = challenge.run(csr_pem)

        logging.info(f"Saving {self.name} certificates")
        certs_pem = self._split_pem_chain(fullchain_pem)
        if not certs_pem:
            raise ValueError(
                f"No certificate found in the chain issued for {self.name}")

        logging.info(f"Writing '{self.cert}' certificate")
        self._write_pem(self.cert, certs_pem[0])
        logging.info(f"Writing '{self.chain_cert}' chain certificate")
        self._write_pem(self.chain_cert, ''.join(certs_pem[1:]))
        logging.info(f"Writing '{self.fullchain_cert}' full chain certificate")
        self._write_pem(self.fullchain_cert, fullchain_pem)

        self.run_hooks()

    @staticmethod
    def _split_pem_chain(fullchain_pem: str) -> List[str]:
        """Split a PEM chain into one PEM string per certificate."""
        certs_pem = []
        current = None
        for line in fullchain_pem.split('\n'):
            if 'BEGIN CERTIFICATE' in line:
                current = []
            if current is None:
                # Ignore anything located outside a certificate block.
                continue
            current.append(line + '\n')
            if 'END CERTIFICATE' in line:
                certs_pem.append(''.join(current))
                current = None
        return certs_pem

    @staticmethod
    def _write_pem(path: str, content: str) -> None:
        """Write a PEM text file, creating its parent directory first."""
        os.makedirs(os.path.dirname(path), mode=0o755, exist_ok=True)
        with open(path, 'w') as pem_out:
            pem_out.write(content)

    def load_or_create_csr(self) -> bytes:
        """Load or create a CSR for the domain.

        A new CSR is created when the file cannot be read, or when the
        names of its subjectAltName extension differ from the domain name
        and its alternative names.

        :returns: The CSR in PEM file format.
        :rtype: bytes
        """
        logging.info(f"Loading {self.name} CSR file from '{self.csr}'")
        try:
            with open(self.csr, 'rb') as pem_in:
                csr = pem_in.read()
        except IOError as e:
            logging.warning(f"Unable to load CSR file: {e}")
            # A freshly created CSR holds exactly the expected names, and
            # there is no parsed CSR to inspect: return it right away.
            return self.create_csr()
        csr_pem = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)

        logging.info("Checking if CSR contains all the domains")
        domain_names = set()
        for ext in csr_pem.get_extensions():
            if ext.get_short_name().decode() != "subjectAltName":
                continue
            for subject_domain in str(ext).split(','):
                # Entries look like "DNS:example.org"; split on the first
                # colon only so that values containing colons stay whole.
                _, domain = subject_domain.strip().split(':', 1)
                domain_names.add(domain)

        domain_names_diff = domain_names ^ {self.name, *self.alt_names}
        if domain_names_diff:
            logging.warning(f"Differences found in CSR: {domain_names_diff}")
            return self.create_csr()
        return csr

    def create_csr(self) -> bytes:
        """Create a CSR for the domain names and write it to ``self.csr``.

        The domain private key is loaded, or generated when missing.

        :returns: The CSR in PEM file format.
        :rtype: bytes
        """
        private_key_pem = self.load_or_create_private_key()
        private_key = crypto.dump_privatekey(crypto.FILETYPE_PEM,
                                             private_key_pem)

        logging.info("Generating a new CSR")
        csr = crypto_util.make_csr(private_key, [self.name] + self.alt_names)

        logging.info(f"Writing CSR file into '{self.csr}'")
        csr_dir = os.path.dirname(self.csr)
        os.makedirs(csr_dir, mode=0o755, exist_ok=True)
        with open(self.csr, 'wb') as pem_out:
            pem_out.write(csr)
        return csr

    def run_hooks(self) -> None:
        """Run every hook command, logging those that exit with an error.

        Each hook is split on whitespace and executed without a shell; a
        failing hook does not prevent the next ones from running.
        """
        for hook in self.hooks:
            logging.info(f"Running hook: '{hook}'")
            hook_res = subprocess.run(hook.split(),
                                      stdout=subprocess.DEVNULL,
                                      stderr=subprocess.PIPE)
            if hook_res.returncode != 0:
                logging.error(f"Unable to run hook: {hook_res.stderr}")
class Config:
    """Configuration read from a main YAML file and a directory of files.

    :param filename: Path of the main YAML configuration file.
    :param directory: Directory holding one YAML file per domain
        (``*.yml`` and ``*.yaml``).
    """

    def __init__(self, filename, directory):
        self.filename = filename
        self.directory = directory
        self.config = {}
        # Content of the main configuration file, filled by load().
        self._data = {}
        # Content of each per-domain file, indexed by file path.
        self.domain_configs = {}

    def load(self) -> None:
        """Read the main configuration file and the per-domain files.

        An empty YAML document is treated as an empty mapping.

        :raises IOError: If a configuration file cannot be read.
        """
        with open(self.filename, 'r') as ifd:
            self._data = yaml.safe_load(ifd) or {}

        for pattern in ('*.yml', '*.yaml'):
            config_glob = os.path.join(self.directory, pattern)
            for config_file in glob.glob(config_glob):
                with open(config_file, 'r') as ifd:
                    self.domain_configs[config_file] = \
                        yaml.safe_load(ifd) or {}

    def parse_domains(self) -> List[Domain]:
        """Build the domains declared in the configuration.

        The ``domains`` section of the main file is either a list of
        mappings carrying a ``name`` key, or a mapping indexed by domain
        name. Per-domain files are named after their ``name`` key, or
        after the file name without its extension.

        :returns: The configured domains.
        :rtype: List[Domain]
        :raises RuntimeError: If a domain name is declared several times.
        """
        domains = self._data.get("domains") or {}
        domain_uniques = set()
        domain_duplicates = set()

        def register(domain_name):
            # Record a name, remembering those that were already seen.
            if domain_name not in domain_uniques:
                domain_uniques.add(domain_name)
            else:
                domain_duplicates.add(domain_name)

        if isinstance(domains, list):
            for domain in domains:
                register(domain["name"])
            domains = {domain["name"]: domain for domain in domains}
        else:
            # NOTE(review): names of a mapping-style section are not
            # registered, so a per-domain file with the same name silently
            # overrides the entry -- confirm this is intended.
            # Copy so the loaded configuration is not modified below.
            domains = dict(domains)

        for config_file, domain_config in self.domain_configs.items():
            # config_file is the full path returned by glob: only its base
            # name, without extension, is a sensible default domain name.
            domain_filename, _ = os.path.splitext(
                os.path.basename(config_file))
            domain_name = domain_config.get("name", domain_filename)
            register(domain_name)
            domains[domain_name] = domain_config

        if domain_duplicates:
            raise RuntimeError(f"Duplicate domains found: {domain_duplicates}")

        domains_parsed = []
        for domain_name, domain_details in domains.items():
            # An entry without any setting is loaded as None by YAML.
            domain_details = domain_details or {}
            remaining_days = domain_details.get("remaining_days",
                                                CERT_REMAINING_DAYS)
            domain = Domain(
                name=domain_name,
                owner=domain_details.get("owner", "root"),
                group=domain_details.get("group", SSL_GROUP),
                alt_names=domain_details.get("alt_names"),
                cert=domain_details.get("cert"),
                chain_cert=domain_details.get("chain_cert"),
                fullchain_cert=domain_details.get("fullchain_cert"),
                private_key=domain_details.get("private_key"),
                csr=domain_details.get("csr"),
                remaining_days=remaining_days,
                hooks=domain_details.get("hooks"))
            domains_parsed.append(domain)
        return domains_parsed

    def parse_account(self) -> Account:
        """Build the ACME account from the ``account`` section.

        :returns: The configured account.
        :rtype: Account
        :raises KeyError: If the section or its ``email`` key is missing.
        """
        account_config = self._data["account"]
        return Account(
            private_key=account_config.get("private_key", ACME_ACCOUNT_KEY),
            email=account_config["email"],
            owner=account_config.get("owner", "root"),
            group=account_config.get("group", "root"))

    @property
    def directory_url(self) -> str:
        """URL of the ACME directory, Let's Encrypt by default."""
        return self._data.get("directory_url", LETSENCRYPT_DIRECTORY_URL)

    @property
    def challenge_dir(self) -> str:
        """Directory in which the HTTP-01 challenge files are written."""
        return self._data.get("challenge_dir", ACME_CHALLENGE_DIR)
def main():
    """Renew the certificates of the configured domains.

    :returns: 0 on success (including when nothing has to be renewed),
        1 if the configuration cannot be loaded, 2 if the domains cannot
        be parsed, 3 if the renewal fails.
    :rtype: int
    """
    parser = argparse.ArgumentParser(
        description="Renew ACME certificates for a list of domains. This "
                    "script assumes you already have an account on the CA "
                    "server and a private key for each certificate.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--config", "-c", default=ACME_CONFIG_FILE,
                        help="Path to the config file.")
    parser.add_argument("--config-dir", "-d", default=ACME_CONFIG_DIR,
                        help="Path to the config directory.")
    parser.add_argument("--quiet", "-q", action="store_true",
                        help="Quiet mode.")
    parser.add_argument("--verbose", "-v", action="store_true",
                        help="Increase verbosity.")
    parser.add_argument("--force", "-f", action="store_true",
                        help="Force certificates renewal without checking "
                             "their expiration date.")
    args = parser.parse_args()

    if args.quiet:
        log_level = logging.WARNING
    elif args.verbose:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    logging.basicConfig(stream=sys.stdout, level=log_level,
                        format="%(levelname)s:%(message)s")

    config = Config(args.config, args.config_dir)
    try:
        config.load()
    except (IOError, yaml.YAMLError) as e:
        # A malformed YAML file is reported like an unreadable one instead
        # of ending the script with a traceback.
        logging.error(f"Unable to load config file: {e}")
        return 1

    try:
        domains = config.parse_domains()
    except RuntimeError as e:
        logging.error(f"Unable to parse domains: {e}")
        return 2

    try:
        domains_to_renew = []
        for domain in domains:
            if not (args.force or domain.check_certificate_expiration_date()):
                logging.info(f"{domain.name} certificate will not be renewed.")
                continue
            logging.info(f"{domain.name} certificate will be renewed.")
            domains_to_renew.append(domain)

        if not domains_to_renew:
            logging.info("No domain to renew. Aborting.")
            return 0

        # The account is only registered when something has to be renewed.
        account = config.parse_account()
        acme_client = account.register(config.directory_url)
        challenge = Challenge(config.challenge_dir, acme_client)
        for domain in domains_to_renew:
            domain.renew_cert(challenge, force=args.force)
    except Exception as e:
        logging.exception(e)
        return 3

    return 0
# Script entry point: the return value of main() becomes the exit status.
if __name__ == '__main__':
    sys.exit(main())