ansible-role-acme/files/acme_renew_cert.py

486 lines
19 KiB
Python
Raw Normal View History

2020-05-21 20:25:55 +02:00
#!/usr/bin/env python3
import glob
import grp
2020-05-21 20:25:55 +02:00
import logging
import os
import pwd
import subprocess
2020-05-21 20:25:55 +02:00
import sys
from datetime import datetime
from typing import List, Dict
2020-05-21 20:25:55 +02:00
import acme
import josepy as jose
import requests
import yaml
2020-05-21 20:25:55 +02:00
from acme import client, messages, challenges, crypto_util
from OpenSSL import crypto
2020-05-21 20:25:55 +02:00
SSL_CONFIG_DIR = "/etc/ssl"
SSL_CONFIG_FILE = os.path.join(SSL_CONFIG_DIR, "acme.yml")
SSL_ACCOUNTS_DIR = os.path.join(SSL_CONFIG_DIR, "accounts")
SSL_KEYS_DIR = os.path.join(SSL_CONFIG_DIR, "private")
SSL_CSR_DIR = os.path.join(SSL_CONFIG_DIR, "csr")
SSL_CERTS_DIR = os.path.join(SSL_CONFIG_DIR, "certs")
try:
SSL_GROUP = grp.getgrnam("ssl-cert").gr_name
except KeyError:
SSL_GROUP = "root"
ACME_CONFIG_FILE = f"{SSL_CONFIG_DIR}/acme.yml"
ACME_CONFIG_DIR = f"{SSL_CONFIG_DIR}/acme.d"
ACME_ACCOUNT_KEY = os.path.join(SSL_ACCOUNTS_DIR, "acme_account.key")
ACME_CHALLENGE_DIR = "/var/www/acme/.well-known/acme-challenge"
LETSENCRYPT_DIRECTORY_URL = "https://acme-v02.api.letsencrypt.org/directory"
2020-05-21 20:25:55 +02:00
OPENSSL_DATE_FORMAT = '%Y%m%d%H%M%SZ'
CERT_REMAINING_DAYS = 30
USER_AGENT = "python-acme"
OS_OWNERS = {}
OS_GROUPS = {}
def chown(path: str, owner: str, group: str, **kwargs) -> None:
if owner not in OS_OWNERS:
OS_OWNERS[owner] = pwd.getpwnam(owner).pw_uid
if group not in OS_GROUPS:
OS_GROUPS[group] = grp.getgrnam(group).gr_gid
os.chown(path, OS_OWNERS[owner], OS_GROUPS[group], **kwargs)
class PrivateKey:
def __init__(self, private_key: str, owner: str, group: str):
self.private_key = private_key
self.owner = owner
self.group = group
def load_or_create_private_key(self) -> crypto.PKey:
logging.info(f"Loading private key from {self.private_key}")
try:
with open(self.private_key, 'rb') as pem_in:
private_key_pem = crypto.load_privatekey(crypto.FILETYPE_PEM,
pem_in.read())
return private_key_pem
except IOError as e:
logging.warning(f"Unable to load private key: {e}")
logging.info("Generating a new private key")
private_key_pem = crypto.PKey()
private_key_pem.generate_key(crypto.TYPE_RSA, 4096)
logging.info(f"Writing private key into {self.private_key}")
private_key_dir = os.path.dirname(self.private_key)
os.makedirs(private_key_dir, mode=0o750, exist_ok=True)
chown(private_key_dir, self.owner, self.group)
ofd = os.open(self.private_key, os.O_CREAT | os.O_WRONLY, 0o640)
with open(ofd, 'wb') as pem_out:
pem_out.write(crypto.dump_privatekey(crypto.FILETYPE_PEM,
private_key_pem))
chown(self.private_key, self.owner, self.group)
return private_key_pem
class Account(PrivateKey):
def __init__(self, private_key: str, email: str, owner: str, group: str):
super().__init__(private_key, owner, group)
self.private_key_pem = self.load_or_create_private_key()
self.email = email
self.jwk = jose.JWKRSA(key=self.private_key_pem.to_cryptography_key())
self.jwk_pub = self.jwk.public_key()
def register(self, directory_url: str) -> acme.client.ClientV2:
client_network = client.ClientNetwork(self.jwk, user_agent=USER_AGENT)
directory_json = client_network.get(directory_url).json()
directory = messages.Directory.from_json(directory_json)
acme_client = client.ClientV2(directory, net=client_network)
try:
logging.info(f"Registering with a new ACME account")
registration = messages.NewRegistration.from_data(
email=self.email,
terms_of_service_agreed=True)
acme_client.new_account(registration)
except acme.errors.ConflictError as e:
logging.warning(f"Unable to create a new ACME account: {e}")
logging.info("Registering with an existing ACME account")
registration_message = messages.NewRegistration(
key=self.jwk_pub,
only_return_existing=True)
registration_response = acme_client._post(directory["newAccount"],
registration_message)
account = acme_client._regr_from_response(registration_response)
acme_client.net.account = account
return acme_client
class Challenge:
def __init__(self, path: str, acme_client: acme.client.ClientV2):
self.path = path
self.acme_client = acme_client
def run(self, csr_pem: str) -> str:
logging.info("Ordering ACME challenge")
order = self.acme_client.new_order(csr_pem)
logging.info("Selecting HTTP-01 ACME challenge")
challenge = self.select_http01_challenge(order)
logging.info("Performing HTTP-01 ACME challenge")
fullchain_pem = self.perform_http01_challenge(challenge, order)
return fullchain_pem
@staticmethod
def select_http01_challenge(order: messages.OrderResource) \
-> challenges.Challenge:
"""Select the HTTP-01 challenge from a given order
:param messages.OrderResource order: Order containing the challenges.
:returns: The HTTP-01 challenge.
:rtype: challenges.Challenge
"""
for auth in order.authorizations:
for challenge in auth.body.challenges:
if isinstance(challenge.chall, challenges.HTTP01):
return challenge
raise Exception("HTTP-01 challenge was not offered by the CA server.")
def perform_http01_challenge(self, challenge: challenges.Challenge,
order: messages.OrderResource) -> str:
"""Perform the HTTP-01 challenge in a given directory
:param challenges.Challenge challenge: A ACME challenge
:param messages.OrderResource order: An ACME order
:returns: The fullchain certificate in PEM file format
:rtype: str
"""
account_key = self.acme_client.net.key
response, validation = challenge.response_and_validation(account_key)
validation_filename, _ = validation.split('.')
challenge_path = os.path.join(self.path, validation_filename)
logging.info(f"Writing challenge into {challenge_path} directory")
os.makedirs(self.path, mode=0o755, exist_ok=True)
with open(challenge_path, 'w') as ofd:
ofd.write(validation)
self.acme_client.answer_challenge(challenge, response)
fullchain_pem = self.acme_client.poll_and_finalize(order).fullchain_pem
os.remove(challenge_path)
return fullchain_pem
class Domain(PrivateKey):
def __init__(self, name: str, owner: str, group: str,
alt_names: List[str] = None,
cert: str = None, chain_cert: str = None,
fullchain_cert: str = None, private_key: str = None,
csr: str = None, remaining_days: int = CERT_REMAINING_DAYS,
hooks: List[str] = None):
if private_key is None:
private_key = os.path.join(SSL_KEYS_DIR, name + '.key')
super().__init__(private_key, owner, group)
self.name = name
self.alt_names = [] if alt_names is None else alt_names
self.remaining_days = remaining_days
self.hooks = [] if hooks is None else hooks
certs_dir = os.path.join(SSL_CERTS_DIR, name + '.d')
if cert is None:
self.cert = os.path.join(certs_dir, 'cert.pem')
else:
self.cert = cert
if chain_cert is None:
self.chain_cert = os.path.join(certs_dir, 'chain.pem')
else:
self.chain_cert = chain_cert
if fullchain_cert is None:
self.fullchain_cert = os.path.join(certs_dir, 'fullchain.pem')
else:
self.fullchain_cert = fullchain_cert
if csr is None:
self.csr = os.path.join(SSL_CSR_DIR, name + '.csr')
else:
self.csr = csr
self.cert_expiration_days = None
def check_certificate_expiration_date(self) -> bool:
"""Indicate whether the certificate will expire soon or not.
:returns: True if the certificate expires soon.
:rtype: bool
"""
if self.cert_expiration_days is None:
logging.debug(f"Checking '{self.cert}' certificate expiration "
"date")
try:
with open(self.cert, 'rb') as pem_in:
cert_pem = crypto.load_certificate(crypto.FILETYPE_PEM,
pem_in.read())
except IOError as e:
logging.warning(f"Unable to load certificate: {e}")
self.cert_expiration_days = float("-inf")
return True
now = datetime.now()
cert_raw_expiration_date = cert_pem.get_notAfter().decode()
cert_expiration_date = datetime.strptime(cert_raw_expiration_date,
OPENSSL_DATE_FORMAT)
self.cert_expiration_days = (cert_expiration_date - now).days
logging.info(f"Certificate expires in {self.cert_expiration_days} "
"days.")
return self.cert_expiration_days <= self.remaining_days
def renew_cert(self, challenge: Challenge, force: bool = False) -> None:
if not (force or self.check_certificate_expiration_date()):
return
logging.info(f"Renewing {self.name} certificates")
csr_pem = self.load_or_create_csr()
fullchain_pem = challenge.run(csr_pem)
logging.info(f"Saving {self.name} certificates")
certs_pem = []
for line in fullchain_pem.split('\n'):
if 'BEGIN CERTIFICATE' in line:
cert_pem = ''
cert_pem += line + '\n'
if 'END CERTIFICATE' in line:
certs_pem.append(cert_pem)
logging.info(f"Writing '{self.cert}' certificate")
cert_dir = os.path.dirname(self.cert)
os.makedirs(cert_dir, mode=0o755, exist_ok=True)
with open(self.cert, 'w') as pem_out:
pem_out.write(certs_pem[0])
logging.info(f"Writing '{self.chain_cert}' chain certificate")
chain_cert_dir = os.path.dirname(self.chain_cert)
os.makedirs(chain_cert_dir, mode=0o755, exist_ok=True)
with open(self.chain_cert, 'w') as pem_out:
pem_out.write(''.join(certs_pem[1:]))
logging.info(f"Writing '{self.fullchain_cert}' full chain certificate")
fullchain_cert_dir = os.path.dirname(self.fullchain_cert)
os.makedirs(fullchain_cert_dir, mode=0o755, exist_ok=True)
with open(self.fullchain_cert, 'w') as pem_out:
pem_out.write(fullchain_pem)
self.run_hooks()
def load_or_create_csr(self) -> str:
"""Load or create a CSR for the domain.
:returns: The CSR in PEM file format.
:rtype: str
"""
logging.info(f"Loading {self.name} CSR file from '{self.csr}'")
try:
with open(self.csr, 'rb') as pem_in:
csr = pem_in.read()
except IOError as e:
logging.warning(f"Unable to load CSR file: {e}")
csr = self.create_csr()
2021-03-26 17:53:50 +01:00
csr_pem = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)
logging.info("Checking if CSR contains all the domains")
domain_names = set()
for ext in csr_pem.get_extensions():
if ext.get_short_name().decode() != "subjectAltName":
continue
subject_domains = str(ext).split(',')
for subject_domain in subject_domains:
_, domain = str(subject_domain).split(':')
domain_names.add(domain)
2020-12-25 23:39:00 +01:00
domain_names_diff = domain_names ^ set([self.name] + self.alt_names)
if len(domain_names_diff) > 0:
logging.warning(f"Differences found in CSR: {domain_names_diff}")
csr = self.create_csr()
return csr
def create_csr(self):
private_key_pem = self.load_or_create_private_key()
private_key = crypto.dump_privatekey(crypto.FILETYPE_PEM,
private_key_pem)
logging.info(f"Generating a new CSR")
csr = crypto_util.make_csr(private_key, [self.name] + self.alt_names)
logging.info(f"Writing CSR file into '{self.csr}'")
csr_dir = os.path.dirname(self.csr)
os.makedirs(csr_dir, mode=0o755, exist_ok=True)
with open(self.csr, 'wb') as pem_out:
pem_out.write(csr)
return csr
def run_hooks(self) -> None:
for hook in self.hooks:
logging.info(f"Running hook: '{hook}'")
2020-12-25 23:06:54 +01:00
hook_res = subprocess.run(hook.split(),
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE)
if hook_res.returncode != 0:
logging.error(f"Unable to run hook: {hook_res.stderr}")
class Config:
def __init__(self, filename, directory):
self.filename = filename
self.directory = directory
self.config = {}
self.domain_configs = {}
def load(self) -> None:
with open(self.filename, 'r') as ifd:
self._data = yaml.safe_load(ifd)
for pattern in ('*.yml', '*.yaml'):
config_glob = os.path.join(self.directory, pattern)
for config_file in glob.glob(config_glob):
with open(config_file, 'r') as ifd:
self.domain_configs[config_file] = yaml.safe_load(ifd)
def parse_domains(self) -> List[Domain]:
domains = self._data.get("domains", {})
domain_uniques = set()
domain_duplicates = set()
if isinstance(domains, list):
for domain in domains:
domain_name = domain["name"]
if domain_name not in domain_uniques:
domain_uniques.add(domain_name)
else:
domain_duplicates.add(domain_name)
domains = {domain["name"]: domain for domain in domains}
for config_file, domain_config in self.domain_configs.items():
domain_filename, _ = os.path.splitext(config_file)
domain_name = domain_config.get("name", domain_filename)
if domain_name not in domain_uniques:
domain_uniques.add(domain_name)
else:
domain_duplicates.add(domain_name)
domains[domain_name] = domain_config
if len(domain_duplicates) > 0:
raise RuntimeError(f"Duplicate domains found: {domain_duplicates}")
domains_parsed = []
for domain_name, domain_details in domains.items():
remaining_days = domain_details.get("remaining_days",
CERT_REMAINING_DAYS)
domain = Domain(
name=domain_name,
owner=domain_details.get("owner", "root"),
group=domain_details.get("group", SSL_GROUP),
alt_names=domain_details.get("alt_names"),
cert=domain_details.get("cert"),
chain_cert=domain_details.get("chain_cert"),
fullchain_cert=domain_details.get("fullchain_cert"),
private_key=domain_details.get("private_key"),
csr=domain_details.get("csr"),
remaining_days=remaining_days,
hooks=domain_details.get("hooks"))
domains_parsed.append(domain)
return domains_parsed
def parse_account(self) -> Account:
account_config = self._data["account"]
return Account(
private_key=account_config.get("private_key", ACME_ACCOUNT_KEY),
email=account_config["email"],
owner=account_config.get("owner", "root"),
group=account_config.get("group", "root"))
@property
def directory_url(self) -> str:
return self._data.get("directory_url", LETSENCRYPT_DIRECTORY_URL)
@property
def challenge_dir(self) -> str:
return self._data.get("challenge_dir", ACME_CHALLENGE_DIR)
def main():
parser = argparse.ArgumentParser(
description="Renew ACME certificates for a list of domains. This "
"script assumes you already have an account on the CA "
"server and a private key for each certificate.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--config", "-c", default=ACME_CONFIG_FILE,
help="Path to the config file.")
parser.add_argument("--config-dir", "-d", default=ACME_CONFIG_DIR,
help="Path to the config directory.")
parser.add_argument("--quiet", "-q", action="store_true",
help="Quiet mode.")
parser.add_argument("--verbose", "-v", action="store_true",
help="Increase verbosity.")
parser.add_argument("--force", "-f", action="store_true",
help="Force certificates renewal without checking "
"their expiration date.")
args = parser.parse_args()
2020-05-21 20:25:55 +02:00
if args.quiet:
log_level = logging.WARNING
elif args.verbose:
log_level = logging.DEBUG
else:
log_level = logging.INFO
2020-05-21 20:25:55 +02:00
logging.basicConfig(stream=sys.stdout, level=log_level,
format="%(levelname)s:%(message)s")
2020-05-21 20:25:55 +02:00
config = Config(args.config, args.config_dir)
2020-05-21 20:25:55 +02:00
try:
config.load()
2020-05-21 20:25:55 +02:00
except IOError as e:
logging.error(f"Unable to load config file: {e}")
return 1
2020-05-21 20:25:55 +02:00
try:
domains = config.parse_domains()
except RuntimeError as e:
logging.error(f"Unable to parse domains: {e}")
return 2
2020-12-25 20:32:30 +01:00
2020-05-21 20:25:55 +02:00
try:
domains_to_renew = []
for domain in domains:
if not (args.force or domain.check_certificate_expiration_date()):
logging.info(f"{domain.name} certificate will not be renewed.")
continue
logging.info(f"{domain.name} certificate will be renewed.")
domains_to_renew.append(domain)
if len(domains_to_renew) == 0:
logging.info("No domain to renew. Aborting.")
return 0
account = config.parse_account()
acme_client = account.register(config.directory_url)
challenge = Challenge(config.challenge_dir, acme_client)
for domain in domains_to_renew:
domain.renew_cert(challenge, force=args.force)
except Exception as e:
logging.exception(e)
2020-12-25 20:32:30 +01:00
return 3
2020-05-21 20:25:55 +02:00
if __name__ == '__main__':
import argparse
exit(main())