241 lines
9.0 KiB
Python
Executable File
241 lines
9.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import logging
import os
import sys
from datetime import datetime, timezone

import acme
import josepy as jose
from acme import challenges, client, crypto_util, messages
from OpenSSL import crypto
|
|
|
|
# Base directory from which the command-line defaults for the account key,
# private keys, CSRs and certificate directories are built.
SSL_CONFIG_DIR = "/etc/ssl"

# strptime() pattern used to parse the value returned by get_notAfter().
OPENSSL_DATE_FORMAT = "%Y%m%d%H%M%SZ"

# User agent handed to the ACME ClientNetwork.
USER_AGENT = "python-acme"
|
|
|
|
def get_cert_expiration_days(cert_path: str):
    """Calculate remaining number of days before certificate's expiration.

    The certificate's ``notAfter`` value is parsed with
    ``OPENSSL_DATE_FORMAT``, whose trailing literal ``Z`` denotes UTC. It is
    therefore compared against the current UTC time, not the local wall
    clock, so the result does not depend on the host's timezone.

    If the file cannot be opened or read, a warning is logged and the
    certificate is treated as expiring right now.

    :param str cert_path: Path to a certificate file.

    :returns: Number of remaining days before certificate's expiration
        (negative once the certificate has expired, 0 when the file cannot
        be read).
    :rtype: int
    """
    now = datetime.now(timezone.utc)
    # Fallback when the certificate cannot be read: "expires now" (0 days).
    cert_expiration_date = now

    try:
        with open(cert_path, 'rb') as pem_in:
            fullchain_pem = crypto.load_certificate(crypto.FILETYPE_PEM, pem_in.read())
        not_after = fullchain_pem.get_notAfter().decode()
        # strptime() yields a naive datetime; tag it as UTC so that it can be
        # subtracted from the timezone-aware ``now``.
        cert_expiration_date = datetime.strptime(
            not_after, OPENSSL_DATE_FORMAT).replace(tzinfo=timezone.utc)
    except IOError as e:
        logging.warning(f"Unable to load certificate: {e}")

    return (cert_expiration_date - now).days
|
|
|
|
def load_or_create_csr(domain: str, csr_path: str, privkey_path: str):
    """Return the PEM-encoded CSR of a domain, generating it when missing.

    The CSR stored at ``csr_path`` is preferred. When that file cannot be
    read, a warning is logged and a new CSR covering ``domain`` is built from
    the private key stored at ``privkey_path``.

    :param str domain: A domain name.
    :param str csr_path: Path to a CSR file.
    :param str privkey_path: Path to a private key. Only read when the CSR
        file cannot be loaded.

    :returns: The content of the CSR file, or the value produced by
        ``crypto_util.make_csr`` when the CSR had to be generated.
    :raises IOError: If neither the CSR file nor the private key can be read.
    """
    logging.info(f"Loading {domain} CSR file from {csr_path}…")
    try:
        with open(csr_path, 'r') as csr_file:
            return csr_file.read()
    except IOError as error:
        logging.warning(f"Unable to load CSR file: {error}")

        # No usable CSR on disk: derive one from the domain's private key.
        logging.info(f"Loading {domain} private key from {privkey_path}…")
        with open(privkey_path, 'r') as key_file:
            private_key = key_file.read()

        logging.info("Generating a new CSR…")
        return crypto_util.make_csr(private_key, [domain])
|
|
|
|
def select_http01_challenge(order: messages.OrderResource):
    """Pick the first HTTP-01 challenge offered in a given order.

    Authorizations are scanned in order and, within each authorization, its
    challenges are scanned in order; the first entry whose ``chall``
    attribute is a ``challenges.HTTP01`` instance is returned.

    :param messages.OrderResource order: ACME order containing the challenges.

    :returns: The first matching challenge entry (the object exposing the
        ``chall`` attribute).
    :raises Exception: If no authorization offers an HTTP-01 challenge.
    """
    # Lazy scan: stops at the first match, like an early return from nested loops.
    http01_candidates = (
        candidate
        for authorization in order.authorizations
        for candidate in authorization.body.challenges
        if isinstance(candidate.chall, challenges.HTTP01)
    )
    for selected in http01_candidates:
        return selected
    raise Exception("HTTP-01 challenge was not offered by the CA server.")
|
|
|
|
def perform_http01_challenge(
        client_acme: client.ClientV2,
        challenge: challenges.Challenge,
        order: messages.OrderResource,
        challenge_dir: str):
    """Perform the HTTP-01 challenge in a given directory

    The validation string is written to a file in ``challenge_dir`` named
    after the part of the validation string preceding the dot. The challenge
    is then answered and the order is polled and finalized. The challenge
    file is removed afterwards, whether the exchange succeeded or failed.

    :param client.ClientV2 client_acme: An ACME v2 client
    :param challenges.Challenge challenge: An ACME challenge
    :param messages.OrderResource order: An ACME order
    :param str challenge_dir: The directory containing the challenge

    :returns: The fullchain certificate in PEM file format
    :rtype: str
    :raises ValueError: If the validation string does not contain exactly one dot.
    """
    response, validation = challenge.response_and_validation(client_acme.net.key)
    # The validation string is expected to be "<file name>.<suffix>".
    validation_filename, _ = validation.split('.')

    challenge_path = os.path.join(challenge_dir, validation_filename)
    with open(challenge_path, 'w') as f_out:
        f_out.write(validation)

    try:
        client_acme.answer_challenge(challenge, response)
        return client_acme.poll_and_finalize(order).fullchain_pem
    finally:
        # Never leave a stale challenge file behind, even when the CA
        # rejects the challenge or the polling fails.
        os.remove(challenge_path)
|
|
|
|
def renew_cert(domain: str,
               account_key_path: str,
               privkey_path: str,
               csr_path: str,
               certs_dir: str,
               challenge_dir: str,
               directory_url: str,
               days_before_renewal: int,
               force: bool = False):
    """Renew the certificate of a domain when it is close to expiration.

    The expiration date of ``<certs_dir>/fullchain.pem`` is checked first;
    unless ``force`` is set, nothing is done when at least
    ``days_before_renewal`` days remain. Otherwise the account key is loaded,
    the existing ACME account is looked up, a new order is placed for the
    domain's CSR, the HTTP-01 challenge is performed and the resulting
    certificates are written to ``cert.pem`` (first certificate),
    ``chain.pem`` (remaining certificates) and ``fullchain.pem`` (everything)
    inside ``certs_dir``.

    :param str domain: A domain name.
    :param str account_key_path: Path to the ACME account private key.
    :param str privkey_path: Path to the domain's private key (used when the
        CSR has to be generated).
    :param str csr_path: Path to the domain's CSR file.
    :param str certs_dir: Directory in which the certificates are stored.
    :param str challenge_dir: Directory in which the challenge file is written.
    :param str directory_url: URL of the ACME directory.
    :param int days_before_renewal: Renew when fewer days than this remain.
    :param bool force: Renew regardless of the expiration date.

    :returns: None
    :raises ValueError: If the issued chain does not contain any certificate.
    """
    logging.info(f"Checking {domain} certificate's expiration date…")
    fullchain_path = os.path.join(certs_dir, "fullchain.pem")

    cert_expiration_days = get_cert_expiration_days(fullchain_path)
    if not force and (cert_expiration_days >= days_before_renewal):
        logging.info(f"Certificate expires in {cert_expiration_days} days. Nothing to do.")
        return

    logging.info(f"Certificate expires in {cert_expiration_days} days! Renewing certificate…")

    logging.info(f"Loading account key from {account_key_path}…")
    with open(account_key_path, 'rb') as pem_in:
        account_pem = crypto.load_privatekey(crypto.FILETYPE_PEM, pem_in.read())
    account_jwk = jose.JWKRSA(key=account_pem.to_cryptography_key())
    account_jwk_pub = account_jwk.public_key()

    csr_pem = load_or_create_csr(domain, csr_path, privkey_path)

    client_network = client.ClientNetwork(account_jwk, user_agent=USER_AGENT)
    directory = messages.Directory.from_json(client_network.get(directory_url).json())
    client_acme = client.ClientV2(directory, net=client_network)

    logging.info("Registering with ACME account…")
    # Here we assume we already have an account
    # NOTE(review): _post and _regr_from_response are private ClientV2
    # members; confirm they exist in the installed acme version.
    registration_message = messages.NewRegistration(
        key=account_jwk_pub,
        only_return_existing=True)
    registration_response = client_acme._post(directory["newAccount"], registration_message)
    account = client_acme._regr_from_response(registration_response)
    client_acme.net.account = account

    logging.info("Ordering ACME challenge…")
    order = client_acme.new_order(csr_pem)

    logging.info("Selecting HTTP-01 ACME challenge…")
    challenge = select_http01_challenge(order)

    logging.info("Performing HTTP-01 ACME challenge…")
    fullchain_pem = perform_http01_challenge(client_acme, challenge, order, challenge_dir)

    logging.info(f"Writing {domain} certificates into {certs_dir}…")
    certs_pem = _split_pem_certificates(fullchain_pem)
    # Validate before opening any file: opening with 'w' truncates the
    # certificate currently in use.
    if not certs_pem:
        raise ValueError("The issued certificate chain does not contain any certificate.")

    cert_path = os.path.join(certs_dir, "cert.pem")
    with open(cert_path, 'w') as pem_out:
        pem_out.write(certs_pem[0])

    chain_path = os.path.join(certs_dir, "chain.pem")
    with open(chain_path, 'w') as pem_out:
        pem_out.write(''.join(certs_pem[1:]))

    with open(fullchain_path, 'w') as pem_out:
        pem_out.write(fullchain_pem)


def _split_pem_certificates(fullchain_pem: str):
    """Split a PEM bundle into a list of individual certificate PEM strings.

    Lines located outside a BEGIN/END CERTIFICATE pair are ignored.
    """
    certs_pem = []
    current_lines = None  # None while outside a BEGIN/END CERTIFICATE pair
    for line in fullchain_pem.split('\n'):
        if 'BEGIN CERTIFICATE' in line:
            current_lines = []
        if current_lines is None:
            continue
        current_lines.append(line + '\n')
        if 'END CERTIFICATE' in line:
            certs_pem.append(''.join(current_lines))
            current_lines = None
    return certs_pem
|
|
|
|
if __name__ == '__main__':
    import argparse

    # Command-line interface. The path options are templates: "{domain}" is
    # substituted with each domain name before use.
    cli = argparse.ArgumentParser(
        description="Renew ACME certificates for a list of domains. This script assumes you already have an account on the CA server and a private key for each certificate.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    cli.add_argument(
        "domains", nargs='+',
        help="List of domain names for which to renew the certificate.")
    cli.add_argument(
        "--account_key", "-a",
        default=os.path.join(SSL_CONFIG_DIR, "accounts", "acme_account.key"),
        help="Path to the account key.")
    cli.add_argument(
        "--privkey", "-p",
        default=os.path.join(SSL_CONFIG_DIR, "private", "{domain}.pem"),
        help="Path to the private certificate.")
    cli.add_argument(
        "--csr", "-r",
        default=os.path.join(SSL_CONFIG_DIR, "csr", "{domain}.pem"),
        help="Path to the CSR file. If the file doesn't exist, it will be generated from the private key and the domain name.")
    cli.add_argument(
        "--certs", "-o",
        default=os.path.join(SSL_CONFIG_DIR, "certs", "{domain}.d"),
        help="Path to the certificates directory.")
    cli.add_argument(
        "--challenge", "-c",
        default="/var/www/html/.well-known/acme-challenge",
        help="Path to the challenge directory.")
    cli.add_argument(
        "--directory_url", "-u",
        default="https://acme-v02.api.letsencrypt.org/directory",
        help="Directory URL on which performing ACME challenges. Only ACME v2 is supported.")
    cli.add_argument(
        "--days", "-d", type=int,
        default=30,
        help="Days before attempting to renew the certificates.")
    cli.add_argument(
        "--quiet", "-q",
        action="store_true",
        help="Quiet mode.")
    cli.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Increase verbosity.")
    cli.add_argument(
        "--force", "-f",
        action="store_true",
        help="Force certificates renewal without checking their expiration date.")
    options = cli.parse_args()

    # --quiet takes precedence over --verbose when both are given.
    verbosity = (
        logging.WARNING if options.quiet
        else logging.DEBUG if options.verbose
        else logging.INFO
    )
    logging.basicConfig(stream=sys.stdout, level=verbosity, format="%(levelname)s:%(message)s")

    for domain in options.domains:
        # Expand the "{domain}" placeholder of every path template.
        paths = {
            option: getattr(options, option).format(domain=domain)
            for option in ("account_key", "privkey", "csr", "certs", "challenge")
        }

        renew_cert(domain,
                   account_key_path=paths["account_key"],
                   privkey_path=paths["privkey"],
                   csr_path=paths["csr"],
                   certs_dir=paths["certs"],
                   challenge_dir=paths["challenge"],
                   directory_url=options.directory_url,
                   days_before_renewal=options.days,
                   force=options.force)