ansible-role-acme/files/acme_renew_cert.py

409 lines
16 KiB
Python
Raw Normal View History

2020-05-21 20:25:55 +02:00
#!/usr/bin/env python3
import grp
2020-05-21 20:25:55 +02:00
import logging
import os
import pwd
import subprocess
2020-05-21 20:25:55 +02:00
import sys
from datetime import datetime
from typing import List, Dict
2020-05-21 20:25:55 +02:00
import acme
import josepy as jose
import requests
import yaml
2020-05-21 20:25:55 +02:00
from acme import client, messages, challenges, crypto_util
from OpenSSL import crypto
2020-05-21 20:25:55 +02:00
# Root directory under which the configuration file and all generated
# material (account keys, private keys, CSRs, certificates) are stored.
SSL_CONFIG_DIR = "/etc/ssl"
# Default YAML configuration file; main() lets --config override it.
SSL_CONFIG_FILE = os.path.join(SSL_CONFIG_DIR, "acme.yml")
# Directory holding the ACME account key.
SSL_ACCOUNTS_DIR = os.path.join(SSL_CONFIG_DIR, "accounts")
# Directory holding one "<name>.key" private key per domain.
SSL_KEYS_DIR = os.path.join(SSL_CONFIG_DIR, "private")
# Directory holding one "<name>.csr" signing request per domain.
SSL_CSR_DIR = os.path.join(SSL_CONFIG_DIR, "csr")
# Directory holding one "<name>.d" certificate directory per domain.
SSL_CERTS_DIR = os.path.join(SSL_CONFIG_DIR, "certs")
# Default group for domain files: "ssl-cert" when that group exists on the
# host (grp.getgrnam raises KeyError otherwise), "root" as a fallback.
try:
    SSL_GROUP = grp.getgrnam("ssl-cert").gr_name
except KeyError:
    SSL_GROUP = "root"
# Default location of the ACME account private key.
ACME_ACCOUNT_KEY = os.path.join(SSL_ACCOUNTS_DIR, "acme_account.key")
# Default directory in which HTTP-01 validation files are written.
ACME_CHALLENGE_DIR = "/var/www/acme/.well-known/acme-challenge"
# Default ACME directory URL, used when the configuration sets none.
LETSENCRYPT_DIRECTORY_URL = "https://acme-v02.api.letsencrypt.org/directory"
2020-05-21 20:25:55 +02:00
# strptime() format used to parse the value returned by
# X509.get_notAfter() (e.g. "20250131235959Z").
OPENSSL_DATE_FORMAT = '%Y%m%d%H%M%SZ'
# Default renewal threshold: a certificate is renewed once its remaining
# validity, in days, is less than or equal to this value.
CERT_REMAINING_DAYS = 30
# User agent passed to the ACME client network layer.
USER_AGENT = "python-acme"
# Caches filled by chown(): user name -> uid and group name -> gid.
OS_OWNERS = {}
OS_GROUPS = {}
def chown(path: str, owner: str, group: str, **kwargs) -> None:
    """Change the ownership of a path using user and group names.

    Resolved ids are memoised in the module-level ``OS_OWNERS`` and
    ``OS_GROUPS`` dictionaries so each name is looked up only once.

    :param str path: Path whose ownership is changed.
    :param str owner: Name of the new owning user.
    :param str group: Name of the new owning group.
    :param kwargs: Extra keyword arguments forwarded to ``os.chown``.
    :raises KeyError: If the user or the group does not exist.
    """
    uid = OS_OWNERS.get(owner)
    if uid is None:
        uid = OS_OWNERS[owner] = pwd.getpwnam(owner).pw_uid

    gid = OS_GROUPS.get(group)
    if gid is None:
        gid = OS_GROUPS[group] = grp.getgrnam(group).gr_gid

    os.chown(path, uid, gid, **kwargs)
class PrivateKey:
    """A PEM private key stored on disk and generated on first use.

    :param str private_key: Path to the PEM private key file.
    :param str owner: Name of the user owning the key file and directory.
    :param str group: Name of the group owning the key file and directory.
    """

    def __init__(self, private_key: str, owner: str, group: str):
        self.private_key = private_key
        self.owner = owner
        self.group = group

    def load_or_create_private_key(self) -> crypto.PKey:
        """Load the private key, generating and saving one if unreadable.

        When the key file cannot be opened or read, a new 4096-bit RSA key
        is generated, written to ``self.private_key`` and handed over to
        ``self.owner``/``self.group`` together with its parent directory.

        :returns: The loaded or freshly generated private key.
        :rtype: crypto.PKey
        """
        logging.info(f"Loading private key from {self.private_key}")
        try:
            with open(self.private_key, 'rb') as pem_in:
                return crypto.load_privatekey(crypto.FILETYPE_PEM,
                                              pem_in.read())
        except IOError as e:
            logging.warning(f"Unable to load private key: {e}")

        logging.info("Generating a new private key")
        private_key_pem = crypto.PKey()
        private_key_pem.generate_key(crypto.TYPE_RSA, 4096)

        logging.info(f"Writing private key into {self.private_key}")
        private_key_dir = os.path.dirname(self.private_key)
        os.makedirs(private_key_dir, mode=0o750, exist_ok=True)
        chown(private_key_dir, self.owner, self.group)

        # os.open() is used so the file is created with restrictive
        # permissions from the start. O_TRUNC guarantees that a pre-existing
        # (longer) file cannot leave trailing bytes after the new PEM data.
        ofd = os.open(self.private_key,
                      os.O_CREAT | os.O_WRONLY | os.O_TRUNC, 0o640)
        with open(ofd, 'wb') as pem_out:
            pem_out.write(crypto.dump_privatekey(crypto.FILETYPE_PEM,
                                                 private_key_pem))
        chown(self.private_key, self.owner, self.group)
        return private_key_pem
class Account(PrivateKey):
    """ACME account identified by an on-disk private key and an email.

    :param str private_key: Path to the account private key file.
    :param str email: Contact email sent when registering the account.
    :param str owner: Name of the user owning the key file.
    :param str group: Name of the group owning the key file.
    """

    def __init__(self, private_key: str, email: str, owner: str, group: str):
        super().__init__(private_key, owner, group)
        self.private_key_pem = self.load_or_create_private_key()
        self.email = email
        # JSON Web Key pair derived from the account private key.
        self.jwk = jose.JWKRSA(key=self.private_key_pem.to_cryptography_key())
        self.jwk_pub = self.jwk.public_key()

    def register(self, directory_url: str) -> acme.client.ClientV2:
        """Build an ACME client bound to this account.

        A new account is registered first; when the server answers with a
        conflict, the existing account matching the key is fetched instead.

        :param str directory_url: URL of the ACME directory.
        :returns: A client whose network layer carries the account.
        :rtype: acme.client.ClientV2
        """
        net = client.ClientNetwork(self.jwk, user_agent=USER_AGENT)
        directory = messages.Directory.from_json(
            net.get(directory_url).json())
        acme_client = client.ClientV2(directory, net=net)

        try:
            logging.info("Registering with a new ACME account")
            acme_client.new_account(
                messages.NewRegistration.from_data(
                    email=self.email,
                    terms_of_service_agreed=True))
        except acme.errors.ConflictError as e:
            logging.warning(f"Unable to create a new ACME account: {e}")
            logging.info("Registering with an existing ACME account")
            lookup_request = messages.NewRegistration(
                key=self.jwk_pub,
                only_return_existing=True)
            lookup_response = acme_client._post(directory["newAccount"],
                                                lookup_request)
            acme_client.net.account = acme_client._regr_from_response(
                lookup_response)

        return acme_client
class Challenge:
    """Solve ACME HTTP-01 challenges by writing validation files to disk.

    :param str path: Directory in which validation files are written. It
        is expected to be the directory the web server exposes for ACME
        challenges.
    :param acme.client.ClientV2 acme_client: Registered ACME client.
    """

    def __init__(self, path: str, acme_client: acme.client.ClientV2):
        self.path = path
        self.acme_client = acme_client

    def run(self, csr_pem: str) -> str:
        """Order a certificate for a CSR and complete its challenges.

        Every authorization of the order that is not already valid gets its
        HTTP-01 challenge published and answered before the order is
        finalized, so certificates covering several names can be issued.

        :param csr_pem: The CSR in PEM file format.
        :returns: The fullchain certificate in PEM file format
        :rtype: str
        :raises Exception: If an authorization offers no HTTP-01 challenge.
        """
        logging.info("Ordering ACME challenge")
        order = self.acme_client.new_order(csr_pem)
        logging.info("Selecting HTTP-01 ACME challenge")
        pending_challenges = self._select_pending_http01_challenges(order)
        logging.info("Performing HTTP-01 ACME challenge")
        published_paths = []
        try:
            for challenge in pending_challenges:
                response = self._publish_validation(challenge,
                                                    published_paths)
                self.acme_client.answer_challenge(challenge, response)
            return self.acme_client.poll_and_finalize(order).fullchain_pem
        finally:
            # Validation files are removed whether issuance succeeded or not.
            self._remove_validations(published_paths)

    @staticmethod
    def select_http01_challenge(order: messages.OrderResource) \
            -> challenges.Challenge:
        """Select the first HTTP-01 challenge from a given order

        :param messages.OrderResource order: Order containing the challenges.
        :returns: The HTTP-01 challenge.
        :rtype: challenges.Challenge
        :raises Exception: If no authorization offers an HTTP-01 challenge.
        """
        for auth in order.authorizations:
            for challenge in auth.body.challenges:
                if isinstance(challenge.chall, challenges.HTTP01):
                    return challenge
        raise Exception("HTTP-01 challenge was not offered by the CA server.")

    @staticmethod
    def _select_pending_http01_challenges(order: messages.OrderResource) \
            -> list:
        """Return one HTTP-01 challenge per authorization still to validate.

        Authorizations whose status is already valid are skipped.

        :param messages.OrderResource order: Order containing the challenges.
        :returns: The HTTP-01 challenges to answer, one per authorization.
        :rtype: list
        :raises Exception: If an authorization offers no HTTP-01 challenge.
        """
        selected = []
        for auth in order.authorizations:
            if auth.body.status == messages.STATUS_VALID:
                continue
            for challenge in auth.body.challenges:
                if isinstance(challenge.chall, challenges.HTTP01):
                    selected.append(challenge)
                    break
            else:
                raise Exception(
                    "HTTP-01 challenge was not offered by the CA server.")
        return selected

    def _publish_validation(self, challenge: challenges.Challenge,
                            published_paths: list):
        """Write the validation file of a challenge and return its response.

        The file path is appended to ``published_paths`` before the file is
        written so the caller can clean it up even after a partial write.
        """
        account_key = self.acme_client.net.key
        response, validation = challenge.response_and_validation(account_key)
        # The validation string is "<token>.<thumbprint>"; the file is named
        # after the token part.
        validation_filename, _ = validation.split('.')
        challenge_path = os.path.join(self.path, validation_filename)
        logging.info(f"Writing challenge into {challenge_path} directory")
        os.makedirs(self.path, mode=0o755, exist_ok=True)
        published_paths.append(challenge_path)
        with open(challenge_path, 'w') as ofd:
            ofd.write(validation)
        return response

    @staticmethod
    def _remove_validations(published_paths: list) -> None:
        """Delete validation files, logging instead of raising on failure."""
        for challenge_path in published_paths:
            try:
                os.remove(challenge_path)
            except OSError as e:
                logging.warning(f"Unable to remove challenge file: {e}")

    def perform_http01_challenge(self, challenge: challenges.Challenge,
                                 order: messages.OrderResource) -> str:
        """Perform a single HTTP-01 challenge and finalize the order

        :param challenges.Challenge challenge: An ACME challenge
        :param messages.OrderResource order: An ACME order
        :returns: The fullchain certificate in PEM file format
        :rtype: str
        """
        published_paths = []
        try:
            response = self._publish_validation(challenge, published_paths)
            self.acme_client.answer_challenge(challenge, response)
            return self.acme_client.poll_and_finalize(order).fullchain_pem
        finally:
            self._remove_validations(published_paths)
class Domain(PrivateKey):
    """A domain whose private key, CSR and certificates live on disk.

    :param str name: Main domain name; also used to derive file names.
    :param str owner: Name of the user owning the private key.
    :param str group: Name of the group owning the private key.
    :param alt_names: Additional domain names put in the certificate.
    :param int remaining_days: Renew once the certificate has this many
        days of validity left, or fewer.
    :param hooks: Commands run after a successful renewal.
    """

    def __init__(self, name: str, owner: str, group: str,
                 alt_names: List[str] = None,
                 remaining_days: int = CERT_REMAINING_DAYS,
                 hooks: List[str] = None):
        private_key = os.path.join(SSL_KEYS_DIR, name + '.key')
        super().__init__(private_key, owner, group)
        self.name = name
        self.alt_names = [] if alt_names is None else alt_names
        self.remaining_days = remaining_days
        self.hooks = [] if hooks is None else hooks
        certs_dir = os.path.join(SSL_CERTS_DIR, name + '.d')
        self.csr = os.path.join(SSL_CSR_DIR, name + '.csr')
        self.cert = os.path.join(certs_dir, 'cert.pem')
        self.chain_cert = os.path.join(certs_dir, 'chain.pem')
        self.fullchain_cert = os.path.join(certs_dir, 'fullchain.pem')
        # Cached number of days before expiration; None until computed,
        # float("-inf") when no certificate could be read.
        self.cert_expiration_days = None

    def check_certificate_expiration_date(self) -> bool:
        """Indicate whether the certificate will expire soon or not.

        The remaining validity is computed once and cached in
        ``self.cert_expiration_days``. A missing or unreadable certificate
        file counts as expiring.

        :returns: True if the certificate expires soon.
        :rtype: bool
        """
        if self.cert_expiration_days is None:
            from datetime import timezone

            logging.debug(f"Checking '{self.cert}' certificate expiration "
                          "date")
            try:
                with open(self.cert, 'rb') as pem_in:
                    cert_pem = crypto.load_certificate(crypto.FILETYPE_PEM,
                                                       pem_in.read())
            except IOError as e:
                logging.warning(f"Unable to load certificate: {e}")
                self.cert_expiration_days = float("-inf")
                return True
            # notAfter is expressed in UTC (trailing "Z"), so it must be
            # compared with the current UTC time, not the local time.
            now = datetime.now(timezone.utc)
            cert_raw_expiration_date = cert_pem.get_notAfter().decode()
            cert_expiration_date = datetime.strptime(
                cert_raw_expiration_date,
                OPENSSL_DATE_FORMAT).replace(tzinfo=timezone.utc)
            self.cert_expiration_days = (cert_expiration_date - now).days
            logging.info(f"Certificate expires in {self.cert_expiration_days} "
                         "days.")
        return self.cert_expiration_days <= self.remaining_days

    def renew_cert(self, challenge: Challenge, force: bool = False) -> None:
        """Renew the certificate if needed, save it and run the hooks.

        Writes ``cert.pem`` (leaf certificate), ``chain.pem`` (remaining
        certificates) and ``fullchain.pem`` (server answer, unmodified).

        :param Challenge challenge: Challenge solver used to get the chain.
        :param bool force: Renew without checking the expiration date.
        :raises ValueError: If the returned chain contains no certificate.
        """
        if not (force or self.check_certificate_expiration_date()):
            return
        logging.info(f"Renewing {self.name} certificates")
        csr_pem = self.load_or_create_csr()
        fullchain_pem = challenge.run(csr_pem)

        logging.info(f"Saving {self.name} certificates")
        certs_pem = self._split_fullchain(fullchain_pem)
        if not certs_pem:
            raise ValueError(
                f"No certificate found in the chain issued for {self.name}")

        outputs = (
            (self.cert, "certificate", certs_pem[0]),
            (self.chain_cert, "chain certificate", ''.join(certs_pem[1:])),
            (self.fullchain_cert, "full chain certificate", fullchain_pem),
        )
        for path, label, content in outputs:
            logging.info(f"Writing '{path}' {label}")
            os.makedirs(os.path.dirname(path), mode=0o755, exist_ok=True)
            with open(path, 'w') as pem_out:
                pem_out.write(content)
        self.run_hooks()

    @staticmethod
    def _split_fullchain(fullchain_pem: str) -> List[str]:
        """Split a PEM chain into one PEM string per certificate.

        Text located outside BEGIN/END CERTIFICATE markers is ignored.
        """
        certs_pem = []
        current_lines = None
        for line in fullchain_pem.split('\n'):
            if 'BEGIN CERTIFICATE' in line:
                current_lines = []
            if current_lines is None:
                continue
            current_lines.append(line + '\n')
            if 'END CERTIFICATE' in line:
                certs_pem.append(''.join(current_lines))
                current_lines = None
        return certs_pem

    def load_or_create_csr(self) -> bytes:
        """Load or create a CSR for the domain.

        The stored CSR is reused only when its subjectAltName entries match
        exactly the domain name plus its alternative names; otherwise, or
        when the file cannot be read or parsed, a new CSR is generated.

        :returns: The CSR in PEM file format.
        :rtype: bytes
        """
        logging.info(f"Loading {self.name} CSR file from '{self.csr}'")
        try:
            with open(self.csr, 'rb') as pem_in:
                csr = pem_in.read()
            csr_pem = crypto.load_certificate_request(crypto.FILETYPE_PEM,
                                                      csr)
        except (IOError, crypto.Error) as e:
            logging.warning(f"Unable to load CSR file: {e}")
            # Nothing to compare against: a fresh CSR is the answer.
            return self.create_csr()

        logging.info("Checking if CSR contains all the domains")
        domain_names = set()
        for ext in csr_pem.get_extensions():
            if ext.get_short_name().decode() != "subjectAltName":
                continue
            # str(ext) looks like "DNS:a.example, DNS:b.example".
            for subject_domain in str(ext).split(','):
                _, _, domain = subject_domain.strip().partition(':')
                domain_names.add(domain)

        # Symmetric difference: names missing from the CSR as well as names
        # the CSR should no longer contain.
        domain_names_diff = domain_names ^ set([self.name] + self.alt_names)
        if domain_names_diff:
            logging.warning(f"Differences found in CSR: {domain_names_diff}")
            csr = self.create_csr()
        return csr

    def create_csr(self):
        """Generate a CSR for the domain and its alternative names.

        The CSR is signed with the domain private key (created if needed)
        and written to ``self.csr``.

        :returns: The CSR in PEM file format.
        """
        private_key_pem = self.load_or_create_private_key()
        private_key = crypto.dump_privatekey(crypto.FILETYPE_PEM,
                                             private_key_pem)
        logging.info("Generating a new CSR")
        csr = crypto_util.make_csr(private_key, [self.name] + self.alt_names)
        logging.info(f"Writing CSR file into '{self.csr}'")
        csr_dir = os.path.dirname(self.csr)
        os.makedirs(csr_dir, mode=0o755, exist_ok=True)
        with open(self.csr, 'wb') as pem_out:
            pem_out.write(csr)
        return csr

    def run_hooks(self) -> None:
        """Run every configured hook command, in order.

        Each hook is split on whitespace and executed without a shell. A
        non-zero exit status is logged and does not stop the other hooks.
        """
        for hook in self.hooks:
            logging.info(f"Running hook: '{hook}'")
            completed = subprocess.run(hook.split())
            if completed.returncode != 0:
                logging.warning(f"Hook '{hook}' exited with status "
                                f"{completed.returncode}")
def main():
    """Renew the certificates of the configured domains.

    :returns: Process exit status: 0 on success or when nothing needs to be
        renewed, 1 when the configuration cannot be loaded, 2 when the
        configuration lists the same domain twice, 3 when a renewal fails.
    :rtype: int
    """
    # Imported locally so main() does not depend on the script entry point
    # having imported argparse beforehand.
    import argparse

    parser = argparse.ArgumentParser(
        description="Renew ACME certificates for a list of domains. This "
                    "script assumes you already have an account on the CA "
                    "server and a private key for each certificate.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--config", "-c", default=SSL_CONFIG_FILE,
                        help="Path to the config file.")
    parser.add_argument("--quiet", "-q", action="store_true",
                        help="Quiet mode.")
    parser.add_argument("--verbose", "-v", action="store_true",
                        help="Increase verbosity.")
    parser.add_argument("--force", "-f", action="store_true",
                        help="Force certificates renewal without checking "
                             "their expiration date.")
    args = parser.parse_args()

    if args.quiet:
        log_level = logging.WARNING
    elif args.verbose:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO

    logging.basicConfig(stream=sys.stdout, level=log_level,
                        format="%(levelname)s:%(message)s")

    try:
        with open(args.config, 'r') as ifd:
            config = yaml.safe_load(ifd)
    except (IOError, yaml.YAMLError) as e:
        logging.error(f"Unable to load config file: {e}")
        return 1

    # An empty YAML document loads as None; report it instead of crashing.
    if not isinstance(config, dict) or "domains" not in config:
        logging.error("Unable to load config file: no 'domains' entry found")
        return 1

    # Domains may be given as a list of mappings carrying a "name" key; they
    # are normalized into a mapping indexed by domain name.
    if isinstance(config["domains"], list):
        domains = {domain["name"]: domain for domain in config["domains"]}
        if len(domains) != len(config["domains"]):
            domain_uniques = set()
            domain_duplicates = set()
            for domain in config["domains"]:
                domain_name = domain["name"]
                if domain_name in domain_uniques:
                    domain_duplicates.add(domain_name)
                else:
                    domain_uniques.add(domain_name)
            logging.error(
                f"Duplicate domain name(s) found: {domain_duplicates}")
            return 2
        config["domains"] = domains

    try:
        domains_to_renew = []
        for domain_name, domain_details in config["domains"].items():
            remaining_days = domain_details.get("remaining_days",
                                                CERT_REMAINING_DAYS)
            domain = Domain(domain_name,
                            owner=domain_details.get("owner", "root"),
                            group=domain_details.get("group", SSL_GROUP),
                            alt_names=domain_details.get("alt_names"),
                            remaining_days=remaining_days,
                            hooks=domain_details.get("hooks"))
            if not (args.force or domain.check_certificate_expiration_date()):
                logging.info(f"{domain.name} certificate will not be renewed.")
                continue
            logging.info(f"{domain.name} certificate will be renewed.")
            domains_to_renew.append(domain)

        if not domains_to_renew:
            logging.info("No domain to renew. Aborting.")
            return 0

        # The ACME account is only contacted when something must be renewed.
        account_config = config["account"]
        account = Account(
            private_key=account_config.get("private_key", ACME_ACCOUNT_KEY),
            email=account_config["email"],
            owner=account_config.get("owner", "root"),
            group=account_config.get("group", "root"))
        directory_url = config.get("directory_url", LETSENCRYPT_DIRECTORY_URL)
        acme_client = account.register(directory_url)
        challenge_dir = config.get("challenge_dir", ACME_CHALLENGE_DIR)
        challenge = Challenge(challenge_dir, acme_client)
        for domain in domains_to_renew:
            domain.renew_cert(challenge, force=args.force)
    except Exception as e:
        logging.exception(e)
        return 3
    return 0
2020-05-21 20:25:55 +02:00
if __name__ == '__main__':
    # Imported only when the file is executed as a script.
    import argparse
    # sys.exit() is used rather than the exit() helper, which is injected
    # by the site module and is not guaranteed to exist (e.g. python -S).
    sys.exit(main())