Trigger hooks after certificate renewal #6
|
@ -0,0 +1 @@
|
||||||
|
.vscode
|
|
@ -1,10 +1,27 @@
|
||||||
acme_directory: https://acme-v02.api.letsencrypt.org/directory
|
|
||||||
acme_config_dir: /etc/ssl
|
acme_config_dir: /etc/ssl
|
||||||
|
acme_config_file: "{{ acme_config_dir }}/acme.yml"
|
||||||
acme_keys_dir: "{{ acme_config_dir }}/private"
|
acme_keys_dir: "{{ acme_config_dir }}/private"
|
||||||
acme_csr_dir: "{{ acme_config_dir }}/csr"
|
acme_csr_dir: "{{ acme_config_dir }}/csr"
|
||||||
acme_certs_dir: "{{ acme_config_dir }}/certs"
|
acme_certs_dir: "{{ acme_config_dir }}/certs"
|
||||||
acme_accounts_dir: "{{ acme_config_dir }}/accounts"
|
acme_accounts_dir: "{{ acme_config_dir }}/accounts"
|
||||||
acme_account_key: "acme_account.key"
|
acme_script_dir: /opt/acme
|
||||||
|
acme_script_bin: /usr/local/bin/acme-renew-cert
|
||||||
|
|
||||||
acme_ssl_group: ssl-cert
|
acme_ssl_group: ssl-cert
|
||||||
acme_challenge_dir: /var/www/acme
|
acme_config:
|
||||||
acme_domains: []
|
account:
|
||||||
|
private_key: "{{ acme_accounts_dir }}/acme_account.key"
|
||||||
|
email: acme@example.com
|
||||||
|
owner: root
|
||||||
|
group: root
|
||||||
|
directory_url: https://acme-staging-v02.api.letsencrypt.org/directory
|
||||||
|
challenge_dir: /var/www/acme/.well-known/acme-challenge
|
||||||
|
domains:
|
||||||
|
example.com:
|
||||||
|
alt_names:
|
||||||
|
- test.example.com
|
||||||
|
owner: root
|
||||||
|
group: "{{ acme_ssl_group }}"
|
||||||
|
remaining_days: 30
|
||||||
|
hooks:
|
||||||
|
- systemctl reload nginx
|
|
@ -1,68 +1,141 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import grp
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import pwd
|
||||||
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from typing import List, Dict
|
||||||
|
|
||||||
import josepy as jose
|
|
||||||
import acme
|
import acme
|
||||||
from OpenSSL import crypto
|
import josepy as jose
|
||||||
|
import requests
|
||||||
|
import yaml
|
||||||
from acme import client, messages, challenges, crypto_util
|
from acme import client, messages, challenges, crypto_util
|
||||||
|
from OpenSSL import crypto
|
||||||
|
|
||||||
SSL_CONFIG_DIR = "/etc/ssl"
|
SSL_CONFIG_DIR = "/etc/ssl"
|
||||||
|
SSL_CONFIG_FILE = os.path.join(SSL_CONFIG_DIR, "acme.yml")
|
||||||
|
SSL_ACCOUNTS_DIR = os.path.join(SSL_CONFIG_DIR, "accounts")
|
||||||
|
SSL_KEYS_DIR = os.path.join(SSL_CONFIG_DIR, "private")
|
||||||
|
SSL_CSR_DIR = os.path.join(SSL_CONFIG_DIR, "csr")
|
||||||
|
SSL_CERTS_DIR = os.path.join(SSL_CONFIG_DIR, "certs")
|
||||||
|
|
||||||
|
try:
|
||||||
|
SSL_GROUP = grp.getgrnam("ssl-cert").gr_name
|
||||||
|
except KeyError:
|
||||||
|
SSL_GROUP = "root"
|
||||||
|
|
||||||
|
ACME_ACCOUNT_KEY = os.path.join(SSL_ACCOUNTS_DIR, "acme_account.key")
|
||||||
|
ACME_CHALLENGE_DIR = "/var/www/acme/.well-known/acme-challenge"
|
||||||
|
LETSENCRYPT_DIRECTORY_URL = "https://acme-v02.api.letsencrypt.org/directory"
|
||||||
OPENSSL_DATE_FORMAT = '%Y%m%d%H%M%SZ'
|
OPENSSL_DATE_FORMAT = '%Y%m%d%H%M%SZ'
|
||||||
|
CERT_REMAINING_DAYS = 30
|
||||||
USER_AGENT = "python-acme"
|
USER_AGENT = "python-acme"
|
||||||
|
|
||||||
def get_cert_expiration_days(cert_path: str):
|
OS_OWNERS = {}
|
||||||
"""Calculate remaining number of days before certificate's expiration.
|
OS_GROUPS = {}
|
||||||
|
|
||||||
:param str cert_path: Path to a certificate file.
|
|
||||||
|
|
||||||
:returns: Number of remaining days before certificate's expiration.
|
def chown(path: str, owner: str, group: str, **kwargs) -> None:
|
||||||
:rtype: int
|
if owner not in OS_OWNERS:
|
||||||
"""
|
OS_OWNERS[owner] = pwd.getpwnam(owner).pw_uid
|
||||||
now = datetime.now()
|
if group not in OS_GROUPS:
|
||||||
cert_expiration_date = now
|
OS_GROUPS[group] = grp.getgrnam(group).gr_gid
|
||||||
|
os.chown(path, OS_OWNERS[owner], OS_GROUPS[group], **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class PrivateKey:
|
||||||
|
def __init__(self, private_key: str, owner: str, group: str):
|
||||||
|
self.private_key = private_key
|
||||||
|
self.owner = owner
|
||||||
|
self.group = group
|
||||||
|
|
||||||
|
def load_or_create_private_key(self) -> crypto.PKey:
|
||||||
|
logging.info(f"Loading private key from {self.private_key}")
|
||||||
|
try:
|
||||||
|
with open(self.private_key, 'rb') as pem_in:
|
||||||
|
private_key_pem = crypto.load_privatekey(crypto.FILETYPE_PEM,
|
||||||
|
pem_in.read())
|
||||||
|
return private_key_pem
|
||||||
|
except IOError as e:
|
||||||
|
logging.warning(f"Unable to load private key: {e}")
|
||||||
|
|
||||||
|
logging.info("Generating a new private key")
|
||||||
|
private_key_pem = crypto.PKey()
|
||||||
|
private_key_pem.generate_key(crypto.TYPE_RSA, 4096)
|
||||||
|
|
||||||
|
logging.info(f"Writing private key into {self.private_key}")
|
||||||
|
private_key_dir = os.path.dirname(self.private_key)
|
||||||
|
os.makedirs(private_key_dir, mode=0o750, exist_ok=True)
|
||||||
|
chown(private_key_dir, self.owner, self.group)
|
||||||
|
ofd = os.open(self.private_key, os.O_CREAT | os.O_WRONLY, 0o640)
|
||||||
|
with open(ofd, 'wb') as pem_out:
|
||||||
|
pem_out.write(crypto.dump_privatekey(crypto.FILETYPE_PEM,
|
||||||
|
private_key_pem))
|
||||||
|
chown(self.private_key, self.owner, self.group)
|
||||||
|
return private_key_pem
|
||||||
|
|
||||||
|
|
||||||
|
class Account(PrivateKey):
|
||||||
|
def __init__(self, private_key: str, email: str, owner: str, group: str):
|
||||||
|
super().__init__(private_key, owner, group)
|
||||||
|
self.private_key_pem = self.load_or_create_private_key()
|
||||||
|
self.email = email
|
||||||
|
|
||||||
|
self.jwk = jose.JWKRSA(key=self.private_key_pem.to_cryptography_key())
|
||||||
|
self.jwk_pub = self.jwk.public_key()
|
||||||
|
|
||||||
|
def register(self, directory_url: str) -> acme.client.ClientV2:
|
||||||
|
client_network = client.ClientNetwork(self.jwk, user_agent=USER_AGENT)
|
||||||
|
|
||||||
|
directory_json = client_network.get(directory_url).json()
|
||||||
|
directory = messages.Directory.from_json(directory_json)
|
||||||
|
acme_client = client.ClientV2(directory, net=client_network)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with open(cert_path, 'rb') as pem_in:
|
logging.info(f"Registering with a new ACME account")
|
||||||
fullchain_pem = crypto.load_certificate(crypto.FILETYPE_PEM, pem_in.read())
|
registration = messages.NewRegistration.from_data(
|
||||||
cert_expiration_date = datetime.strptime(fullchain_pem.get_notAfter().decode(), OPENSSL_DATE_FORMAT)
|
email=self.email,
|
||||||
except IOError as e:
|
terms_of_service_agreed=True)
|
||||||
logging.warning(f"Unable to load certificate: {e}")
|
acme_client.new_account(registration)
|
||||||
|
except acme.errors.ConflictError as e:
|
||||||
|
logging.warning(f"Unable to create a new ACME account: {e}")
|
||||||
|
logging.info("Registering with an existing ACME account")
|
||||||
|
registration_message = messages.NewRegistration(
|
||||||
|
key=self.jwk_pub,
|
||||||
|
only_return_existing=True)
|
||||||
|
registration_response = acme_client._post(directory["newAccount"],
|
||||||
|
registration_message)
|
||||||
|
account = acme_client._regr_from_response(registration_response)
|
||||||
|
acme_client.net.account = account
|
||||||
|
return acme_client
|
||||||
|
|
||||||
cert_expiration_days = (cert_expiration_date - now).days
|
|
||||||
return cert_expiration_days
|
|
||||||
|
|
||||||
def load_or_create_csr(domain: str, csr_path: str, privkey_path: str):
|
class Challenge:
|
||||||
"""Load or create a CSR for a given domain.
|
def __init__(self, path: str, acme_client: acme.client.ClientV2):
|
||||||
|
self.path = path
|
||||||
|
self.acme_client = acme_client
|
||||||
|
|
||||||
:param str domain: A domain name.
|
def run(self, csr_pem: str) -> str:
|
||||||
:param str csr_path: Path to a CSR file.
|
logging.info("Ordering ACME challenge")
|
||||||
:param str privkey_path: Path to a private key. It is used as fallback when the CSR doesn't exist.
|
order = self.acme_client.new_order(csr_pem)
|
||||||
|
|
||||||
:returns: The CSR in PEM file format.
|
logging.info("Selecting HTTP-01 ACME challenge")
|
||||||
:rtype: str
|
challenge = self.select_http01_challenge(order)
|
||||||
"""
|
|
||||||
logging.info(f"Loading {domain} CSR file from {csr_path}…")
|
|
||||||
try:
|
|
||||||
with open(csr_path, 'r') as pem_in:
|
|
||||||
csr_pem = pem_in.read()
|
|
||||||
except IOError as e:
|
|
||||||
logging.warning(f"Unable to load CSR file: {e}")
|
|
||||||
logging.info(f"Loading {domain} private key from {privkey_path}…")
|
|
||||||
with open(privkey_path, 'r') as pem_in:
|
|
||||||
privkey_pem = pem_in.read()
|
|
||||||
|
|
||||||
logging.info(f"Generating a new CSR…")
|
logging.info("Performing HTTP-01 ACME challenge")
|
||||||
csr_pem = crypto_util.make_csr(privkey_pem, [domain])
|
fullchain_pem = self.perform_http01_challenge(challenge, order)
|
||||||
return csr_pem
|
return fullchain_pem
|
||||||
|
|
||||||
def select_http01_challenge(order: messages.OrderResource):
|
@staticmethod
|
||||||
|
def select_http01_challenge(order: messages.OrderResource) \
|
||||||
|
-> challenges.Challenge:
|
||||||
"""Select the HTTP-01 challenge from a given order
|
"""Select the HTTP-01 challenge from a given order
|
||||||
|
|
||||||
:param messages.OrderResource order: ACME order containing the challenges.
|
:param messages.OrderResource order: Order containing the challenges.
|
||||||
|
|
||||||
:returns: The HTTP-01 challenge.
|
:returns: The HTTP-01 challenge.
|
||||||
:rtype: challenges.Challenge
|
:rtype: challenges.Challenge
|
||||||
|
@ -73,86 +146,89 @@ def select_http01_challenge(order: messages.OrderResource):
|
||||||
return challenge
|
return challenge
|
||||||
raise Exception("HTTP-01 challenge was not offered by the CA server.")
|
raise Exception("HTTP-01 challenge was not offered by the CA server.")
|
||||||
|
|
||||||
def perform_http01_challenge(
|
def perform_http01_challenge(self, challenge: challenges.Challenge,
|
||||||
client_acme: client.ClientV2,
|
order: messages.OrderResource) -> str:
|
||||||
challenge: challenges.Challenge,
|
|
||||||
order: messages.OrderResource,
|
|
||||||
challenge_dir: str):
|
|
||||||
"""Perform the HTTP-01 challenge in a given directory
|
"""Perform the HTTP-01 challenge in a given directory
|
||||||
|
|
||||||
:param client.ClientV2 client_acme: A ACME v2 client
|
|
||||||
:param challenges.Challenge challenge: A ACME challenge
|
:param challenges.Challenge challenge: A ACME challenge
|
||||||
:param messages.OrderResource order: An ACME order
|
:param messages.OrderResource order: An ACME order
|
||||||
:param str challenge_dir: The directory containing the challenge
|
|
||||||
|
|
||||||
:returns: The fullchain certificate in PEM file format
|
:returns: The fullchain certificate in PEM file format
|
||||||
:rtype: str
|
:rtype: str
|
||||||
"""
|
"""
|
||||||
response, validation = challenge.response_and_validation(client_acme.net.key)
|
account_key = self.acme_client.net.key
|
||||||
|
response, validation = challenge.response_and_validation(account_key)
|
||||||
validation_filename, _ = validation.split('.')
|
validation_filename, _ = validation.split('.')
|
||||||
|
|
||||||
challenge_path = os.path.join(challenge_dir, validation_filename)
|
challenge_path = os.path.join(self.path, validation_filename)
|
||||||
with open(challenge_path, 'w') as f_out:
|
logging.info(f"Writing challenge into {challenge_path} directory")
|
||||||
f_out.write(validation)
|
os.makedirs(self.path, mode=0o755, exist_ok=True)
|
||||||
|
with open(challenge_path, 'w') as ofd:
|
||||||
|
ofd.write(validation)
|
||||||
|
|
||||||
client_acme.answer_challenge(challenge, response)
|
self.acme_client.answer_challenge(challenge, response)
|
||||||
|
|
||||||
fullchain_pem = client_acme.poll_and_finalize(order).fullchain_pem
|
|
||||||
|
|
||||||
|
fullchain_pem = self.acme_client.poll_and_finalize(order).fullchain_pem
|
||||||
os.remove(challenge_path)
|
os.remove(challenge_path)
|
||||||
|
|
||||||
return fullchain_pem
|
return fullchain_pem
|
||||||
|
|
||||||
def renew_cert(domain: str,
|
|
||||||
account_key_path: str,
|
|
||||||
privkey_path: str,
|
|
||||||
csr_path: str,
|
|
||||||
certs_dir: str,
|
|
||||||
challenge_dir: str,
|
|
||||||
directory_url: str,
|
|
||||||
days_before_renewal: int,
|
|
||||||
force: bool = False):
|
|
||||||
logging.info(f"Checking {domain} certificate's expiration date…")
|
|
||||||
fullchain_path = os.path.join(certs_dir, "fullchain.pem")
|
|
||||||
|
|
||||||
cert_expiration_days = get_cert_expiration_days(fullchain_path)
|
class Domain(PrivateKey):
|
||||||
if not force and (cert_expiration_days >= days_before_renewal):
|
def __init__(self, name: str, owner: str, group: str,
|
||||||
logging.info(f"Certificate expires in {cert_expiration_days} days. Nothing to do.")
|
alt_names: List[str] = None,
|
||||||
|
remaining_days: int = CERT_REMAINING_DAYS,
|
||||||
|
hooks: List[str] = None):
|
||||||
|
private_key = os.path.join(SSL_KEYS_DIR, name + '.key')
|
||||||
|
super().__init__(private_key, owner, group)
|
||||||
|
|
||||||
|
self.name = name
|
||||||
|
self.alt_names = [] if alt_names is None else alt_names
|
||||||
|
self.remaining_days = remaining_days
|
||||||
|
self.hooks = [] if hooks is None else hooks
|
||||||
|
|
||||||
|
certs_dir = os.path.join(SSL_CERTS_DIR, name + '.d')
|
||||||
|
self.csr = os.path.join(SSL_CSR_DIR, name + '.csr')
|
||||||
|
self.cert = os.path.join(certs_dir, 'cert.pem')
|
||||||
|
self.chain_cert = os.path.join(certs_dir, 'chain.pem')
|
||||||
|
self.fullchain_cert = os.path.join(certs_dir, 'fullchain.pem')
|
||||||
|
self.cert_expiration_days = None
|
||||||
|
|
||||||
|
def check_certificate_expiration_date(self) -> bool:
|
||||||
|
"""Indicate whether the certificate will expire soon or not.
|
||||||
|
|
||||||
|
:returns: True if the certificate expires soon.
|
||||||
|
:rtype: bool
|
||||||
|
"""
|
||||||
|
if self.cert_expiration_days is None:
|
||||||
|
logging.debug(f"Checking '{self.cert}' certificate expiration "
|
||||||
|
"date")
|
||||||
|
try:
|
||||||
|
with open(self.cert, 'rb') as pem_in:
|
||||||
|
cert_pem = crypto.load_certificate(crypto.FILETYPE_PEM,
|
||||||
|
pem_in.read())
|
||||||
|
except IOError as e:
|
||||||
|
logging.warning(f"Unable to load certificate: {e}")
|
||||||
|
self.cert_expiration_days = float("-inf")
|
||||||
|
return True
|
||||||
|
|
||||||
|
now = datetime.now()
|
||||||
|
cert_raw_expiration_date = cert_pem.get_notAfter().decode()
|
||||||
|
cert_expiration_date = datetime.strptime(cert_raw_expiration_date,
|
||||||
|
OPENSSL_DATE_FORMAT)
|
||||||
|
self.cert_expiration_days = (cert_expiration_date - now).days
|
||||||
|
logging.info(f"Certificate expires in {self.cert_expiration_days} "
|
||||||
|
"days.")
|
||||||
|
return self.cert_expiration_days <= self.remaining_days
|
||||||
|
|
||||||
|
def renew_cert(self, challenge: Challenge, force: bool = False) -> None:
|
||||||
|
if not (force or self.check_certificate_expiration_date()):
|
||||||
return
|
return
|
||||||
|
logging.info(f"Renewing {self.name} certificates")
|
||||||
|
|
||||||
logging.info(f"Certificate expires in {cert_expiration_days} days! Renewing certificate…")
|
csr_pem = self.load_or_create_csr()
|
||||||
|
fullchain_pem = challenge.run(csr_pem)
|
||||||
|
|
||||||
logging.info(f"Loading account key from {account_key_path}…")
|
logging.info(f"Saving {self.name} certificates")
|
||||||
with open(account_key_path, 'rb') as pem_in:
|
|
||||||
account_pem = crypto.load_privatekey(crypto.FILETYPE_PEM, pem_in.read())
|
|
||||||
account_jwk = jose.JWKRSA(key=account_pem.to_cryptography_key())
|
|
||||||
account_jwk_pub = account_jwk.public_key()
|
|
||||||
|
|
||||||
csr_pem = load_or_create_csr(domain, csr_path, privkey_path)
|
|
||||||
|
|
||||||
client_network = client.ClientNetwork(account_jwk, user_agent=USER_AGENT)
|
|
||||||
directory = messages.Directory.from_json(client_network.get(directory_url).json())
|
|
||||||
client_acme = client.ClientV2(directory, net=client_network)
|
|
||||||
|
|
||||||
logging.info("Registering with ACME account…")
|
|
||||||
# Here we assume we already have an account
|
|
||||||
registration_message = messages.NewRegistration(
|
|
||||||
key=account_jwk_pub,
|
|
||||||
only_return_existing=True)
|
|
||||||
registration_response = client_acme._post(directory["newAccount"], registration_message)
|
|
||||||
account = client_acme._regr_from_response(registration_response)
|
|
||||||
client_acme.net.account = account
|
|
||||||
|
|
||||||
logging.info("Ordering ACME challenge…")
|
|
||||||
order = client_acme.new_order(csr_pem)
|
|
||||||
|
|
||||||
logging.info("Selecting HTTP-01 ACME challenge…")
|
|
||||||
challenge = select_http01_challenge(order)
|
|
||||||
|
|
||||||
logging.info("Performing HTTP-01 ACME challenge…")
|
|
||||||
fullchain_pem = perform_http01_challenge(client_acme, challenge, order, challenge_dir)
|
|
||||||
|
|
||||||
logging.info(f"Writing {domain} certificates into {certs_dir}…")
|
|
||||||
certs_pem = []
|
certs_pem = []
|
||||||
for line in fullchain_pem.split('\n'):
|
for line in fullchain_pem.split('\n'):
|
||||||
if 'BEGIN CERTIFICATE' in line:
|
if 'BEGIN CERTIFICATE' in line:
|
||||||
|
@ -163,55 +239,97 @@ def renew_cert(domain: str,
|
||||||
if 'END CERTIFICATE' in line:
|
if 'END CERTIFICATE' in line:
|
||||||
certs_pem.append(cert_pem)
|
certs_pem.append(cert_pem)
|
||||||
|
|
||||||
cert_path = os.path.join(certs_dir, "cert.pem")
|
logging.info(f"Writing '{self.cert}' certificate")
|
||||||
with open(cert_path, 'w') as pem_out:
|
cert_dir = os.path.dirname(self.cert)
|
||||||
|
os.makedirs(cert_dir, mode=0o755, exist_ok=True)
|
||||||
|
with open(self.cert, 'w') as pem_out:
|
||||||
pem_out.write(certs_pem[0])
|
pem_out.write(certs_pem[0])
|
||||||
|
|
||||||
chain_path = os.path.join(certs_dir, "chain.pem")
|
logging.info(f"Writing '{self.chain_cert}' chain certificate")
|
||||||
with open(chain_path, 'w') as pem_out:
|
chain_cert_dir = os.path.dirname(self.chain_cert)
|
||||||
|
os.makedirs(chain_cert_dir, mode=0o755, exist_ok=True)
|
||||||
|
with open(self.chain_cert, 'w') as pem_out:
|
||||||
pem_out.write(''.join(certs_pem[1:]))
|
pem_out.write(''.join(certs_pem[1:]))
|
||||||
|
|
||||||
with open(fullchain_path, 'w') as pem_out:
|
logging.info(f"Writing '{self.fullchain_cert}' full chain certificate")
|
||||||
|
fullchain_cert_dir = os.path.dirname(self.fullchain_cert)
|
||||||
|
os.makedirs(fullchain_cert_dir, mode=0o755, exist_ok=True)
|
||||||
|
with open(self.fullchain_cert, 'w') as pem_out:
|
||||||
pem_out.write(fullchain_pem)
|
pem_out.write(fullchain_pem)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
self.run_hooks()
|
||||||
import argparse
|
|
||||||
|
|
||||||
|
def load_or_create_csr(self) -> str:
|
||||||
|
"""Load or create a CSR for the domain.
|
||||||
|
|
||||||
|
:returns: The CSR in PEM file format.
|
||||||
|
:rtype: str
|
||||||
|
"""
|
||||||
|
logging.info(f"Loading {self.name} CSR file from '{self.csr}'")
|
||||||
|
try:
|
||||||
|
with open(self.csr, 'rb') as pem_in:
|
||||||
|
csr = pem_in.read()
|
||||||
|
csr_pem = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)
|
||||||
|
except IOError as e:
|
||||||
|
logging.warning(f"Unable to load CSR file: {e}")
|
||||||
|
csr = self.create_csr()
|
||||||
|
|
||||||
|
logging.info("Checking if CSR contains all the domains")
|
||||||
|
domain_names = set()
|
||||||
|
for ext in csr_pem.get_extensions():
|
||||||
|
if ext.get_short_name().decode() != "subjectAltName":
|
||||||
|
continue
|
||||||
|
subject_domains = str(ext).split(',')
|
||||||
|
for subject_domain in subject_domains:
|
||||||
|
_, domain = str(subject_domain).split(':')
|
||||||
|
domain_names.add(domain)
|
||||||
|
domain_names_diff = domain_names ^ set([self.name] + self.alt_names)
|
||||||
|
if len(domain_names_diff) > 0:
|
||||||
|
logging.warning(f"Differences found in CSR: {domain_names_diff}")
|
||||||
|
csr = self.create_csr()
|
||||||
|
|
||||||
|
return csr
|
||||||
|
|
||||||
|
def create_csr(self):
|
||||||
|
private_key_pem = self.load_or_create_private_key()
|
||||||
|
private_key = crypto.dump_privatekey(crypto.FILETYPE_PEM,
|
||||||
|
private_key_pem)
|
||||||
|
|
||||||
|
logging.info(f"Generating a new CSR")
|
||||||
|
csr = crypto_util.make_csr(private_key, [self.name] + self.alt_names)
|
||||||
|
|
||||||
|
logging.info(f"Writing CSR file into '{self.csr}'")
|
||||||
|
csr_dir = os.path.dirname(self.csr)
|
||||||
|
os.makedirs(csr_dir, mode=0o755, exist_ok=True)
|
||||||
|
with open(self.csr, 'wb') as pem_out:
|
||||||
|
pem_out.write(csr)
|
||||||
|
return csr
|
||||||
|
|
||||||
|
def run_hooks(self) -> None:
|
||||||
|
for hook in self.hooks:
|
||||||
|
logging.info(f"Running hook: '{hook}'")
|
||||||
|
hook_res = subprocess.run(hook.split(),
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.PIPE)
|
||||||
|
if hook_res.returncode != 0:
|
||||||
|
logging.error(f"Unable to run hook: {hook_res.stderr}")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
description="Renew ACME certificates for a list of domains. This script assumes you already have an account on the CA server and a private key for each certificate.",
|
description="Renew ACME certificates for a list of domains. This "
|
||||||
|
"script assumes you already have an account on the CA "
|
||||||
|
"server and a private key for each certificate.",
|
||||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||||
parser.add_argument("domains", nargs='+',
|
parser.add_argument("--config", "-c", default=SSL_CONFIG_FILE,
|
||||||
help="List of domain names for which to renew the certificate.")
|
help="Path to the config file.")
|
||||||
parser.add_argument("--account_key", "-a",
|
parser.add_argument("--quiet", "-q", action="store_true",
|
||||||
default=os.path.join(SSL_CONFIG_DIR, "accounts", "acme_account.key"),
|
|
||||||
help="Path to the account key.")
|
|
||||||
parser.add_argument("--privkey", "-p",
|
|
||||||
default=os.path.join(SSL_CONFIG_DIR, "private", "{domain}.pem"),
|
|
||||||
help="Path to the private certificate.")
|
|
||||||
parser.add_argument("--csr", "-r",
|
|
||||||
default=os.path.join(SSL_CONFIG_DIR, "csr", "{domain}.pem"),
|
|
||||||
help="Path to the CSR file. If the file doesn't exist, it will be generated from the private key and the domain name.")
|
|
||||||
parser.add_argument("--certs", "-o",
|
|
||||||
default=os.path.join(SSL_CONFIG_DIR, "certs", "{domain}.d"),
|
|
||||||
help="Path to the certificates directory.")
|
|
||||||
parser.add_argument("--challenge", "-c",
|
|
||||||
default="/var/www/html/.well-known/acme-challenge",
|
|
||||||
help="Path to the challenge directory.")
|
|
||||||
parser.add_argument("--directory_url", "-u",
|
|
||||||
default="https://acme-v02.api.letsencrypt.org/directory",
|
|
||||||
help="Directory URL on which performing ACME challenges. Only ACME v2 is supported.")
|
|
||||||
parser.add_argument("--days", "-d", type=int,
|
|
||||||
default=30,
|
|
||||||
help="Days before attempting to renew the certificates.")
|
|
||||||
parser.add_argument("--quiet", "-q",
|
|
||||||
action="store_true",
|
|
||||||
help="Quiet mode.")
|
help="Quiet mode.")
|
||||||
parser.add_argument("--verbose", "-v",
|
parser.add_argument("--verbose", "-v", action="store_true",
|
||||||
action="store_true",
|
|
||||||
help="Increase verbosity.")
|
help="Increase verbosity.")
|
||||||
parser.add_argument("--force", "-f",
|
parser.add_argument("--force", "-f", action="store_true",
|
||||||
action="store_true",
|
help="Force certificates renewal without checking "
|
||||||
help="Force certificates renewal without checking their expiration date.")
|
"their expiration date.")
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
if args.quiet:
|
if args.quiet:
|
||||||
|
@ -221,21 +339,74 @@ if __name__ == '__main__':
|
||||||
else:
|
else:
|
||||||
log_level = logging.INFO
|
log_level = logging.INFO
|
||||||
|
|
||||||
logging.basicConfig(stream=sys.stdout, level=log_level, format="%(levelname)s:%(message)s")
|
logging.basicConfig(stream=sys.stdout, level=log_level,
|
||||||
|
format="%(levelname)s:%(message)s")
|
||||||
|
|
||||||
for domain in args.domains:
|
try:
|
||||||
account_key = args.account_key.format(domain=domain)
|
with open(args.config, 'r') as ifd:
|
||||||
privkey = args.privkey.format(domain=domain)
|
config = yaml.safe_load(ifd)
|
||||||
csr = args.csr.format(domain=domain)
|
except IOError as e:
|
||||||
certs_dir = args.certs.format(domain=domain)
|
logging.error(f"Unable to load config file: {e}")
|
||||||
challenge_dir = args.challenge.format(domain=domain)
|
return 1
|
||||||
|
|
||||||
renew_cert(domain,
|
if isinstance(config["domains"], list):
|
||||||
account_key_path=account_key,
|
domains = {domain["name"]: domain for domain in config["domains"]}
|
||||||
privkey_path=privkey,
|
if len(domains) != len(config["domains"]):
|
||||||
csr_path=csr,
|
domain_uniques = set()
|
||||||
certs_dir=certs_dir,
|
domain_duplicates = set()
|
||||||
challenge_dir=challenge_dir,
|
for domain in config["domains"]:
|
||||||
directory_url=args.directory_url,
|
domain_name = domain["name"]
|
||||||
days_before_renewal=args.days,
|
if domain_name not in domain_uniques:
|
||||||
force=args.force)
|
domain_uniques.add(domain_name)
|
||||||
|
else:
|
||||||
|
domain_duplicates.add(domain_name)
|
||||||
|
logging.error(f"Duplicate domain name(s) found: {domain_uniques}")
|
||||||
|
return 2
|
||||||
|
config["domains"] = domains
|
||||||
|
|
||||||
|
try:
|
||||||
|
domains_to_renew = []
|
||||||
|
for domain_name, domain_details in config["domains"].items():
|
||||||
|
remaining_days = domain_details.get("remaining_days",
|
||||||
|
CERT_REMAINING_DAYS)
|
||||||
|
domain = Domain(domain_name,
|
||||||
|
owner=domain_details.get("owner", "root"),
|
||||||
|
group=domain_details.get("group", SSL_GROUP),
|
||||||
|
alt_names=domain_details.get("alt_names"),
|
||||||
|
remaining_days=remaining_days,
|
||||||
|
hooks=domain_details.get("hooks"))
|
||||||
|
|
||||||
|
if not (args.force or domain.check_certificate_expiration_date()):
|
||||||
|
logging.info(f"{domain.name} certificate will not be renewed.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
logging.info(f"{domain.name} certificate will be renewed.")
|
||||||
|
domains_to_renew.append(domain)
|
||||||
|
|
||||||
|
if len(domains_to_renew) == 0:
|
||||||
|
logging.info("No domain to renew. Aborting.")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
account_config = config["account"]
|
||||||
|
account = Account(
|
||||||
|
private_key=account_config.get("private_key", ACME_ACCOUNT_KEY),
|
||||||
|
email=account_config["email"],
|
||||||
|
owner=account_config.get("owner", "root"),
|
||||||
|
group=account_config.get("group", "root"))
|
||||||
|
|
||||||
|
directory_url = config.get("directory_url", LETSENCRYPT_DIRECTORY_URL)
|
||||||
|
acme_client = account.register(directory_url)
|
||||||
|
|
||||||
|
challenge_dir = config.get("challenge_dir", ACME_CHALLENGE_DIR)
|
||||||
|
challenge = Challenge(challenge_dir, acme_client)
|
||||||
|
|
||||||
|
for domain in domains_to_renew:
|
||||||
|
domain.renew_cert(challenge, force=args.force)
|
||||||
|
except Exception as e:
|
||||||
|
logging.exception(e)
|
||||||
|
return 3
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
import argparse
|
||||||
|
exit(main())
|
||||||
|
|
|
@ -3,8 +3,13 @@
|
||||||
gather_facts: no
|
gather_facts: no
|
||||||
|
|
||||||
tasks:
|
tasks:
|
||||||
- name: Update apt cache
|
- name: Update APT cache
|
||||||
raw: apt update
|
raw: apt update
|
||||||
|
|
||||||
- name: Install python3 package
|
- name: Install Python3 package
|
||||||
raw: apt install -y --no-install-recommends python3
|
raw: apt install -y --no-install-recommends python3
|
||||||
|
|
||||||
|
- name: Install cron package
|
||||||
|
package:
|
||||||
|
name: cron
|
||||||
|
state: present
|
|
@ -1,74 +0,0 @@
|
||||||
- name: Create {{ domain_name }} certificates directory
|
|
||||||
file:
|
|
||||||
path: "{{ acme_certs_dir }}/{{ domain_name }}.d"
|
|
||||||
state: directory
|
|
||||||
owner: root
|
|
||||||
group: "{{ acme_ssl_group }}"
|
|
||||||
mode: "755"
|
|
||||||
tags: acme_install
|
|
||||||
|
|
||||||
- name: Generate Let's Encrypt account key
|
|
||||||
openssl_privatekey:
|
|
||||||
path: "{{ acme_accounts_dir }}/{{ acme_account_key }}"
|
|
||||||
owner: root
|
|
||||||
group: root
|
|
||||||
mode: "600"
|
|
||||||
type: RSA
|
|
||||||
size: 4096
|
|
||||||
tags: acme_account
|
|
||||||
|
|
||||||
- name: Generate Let's Encrypt private key for {{ domain_name }}
|
|
||||||
openssl_privatekey:
|
|
||||||
path: "{{ acme_keys_dir }}/{{ domain_name }}.pem"
|
|
||||||
owner: root
|
|
||||||
group: "{{ acme_ssl_group }}"
|
|
||||||
mode: "640"
|
|
||||||
type: RSA
|
|
||||||
size: 4096
|
|
||||||
|
|
||||||
- name: Generate Let's Encrypt CSR for {{ domain_name }}
|
|
||||||
openssl_csr:
|
|
||||||
path: "{{ acme_csr_dir }}/{{ domain_name }}.csr"
|
|
||||||
owner: root
|
|
||||||
group: "{{ acme_ssl_group }}"
|
|
||||||
mode: "644"
|
|
||||||
privatekey_path: "{{ acme_keys_dir }}/{{ domain_name }}.pem"
|
|
||||||
common_name: "{{ domain_name }}"
|
|
||||||
|
|
||||||
- name: Begin Let's Encrypt challenges for {{ domain_name }}
|
|
||||||
acme_certificate:
|
|
||||||
acme_directory: "{{ acme_directory }}"
|
|
||||||
acme_version: "{{ acme_version }}"
|
|
||||||
account_key_src: "{{ acme_accounts_dir }}/{{ acme_account_key }}"
|
|
||||||
account_email: "{{ acme_email }}"
|
|
||||||
terms_agreed: yes
|
|
||||||
challenge: http-01
|
|
||||||
csr: "{{ acme_csr_dir }}/{{ domain_name }}.csr"
|
|
||||||
dest: "{{ acme_certs_dir }}/{{ domain_name }}.d/cert.pem"
|
|
||||||
fullchain_dest: "{{ acme_certs_dir }}/{{ domain_name }}.d/fullchain.pem"
|
|
||||||
remaining_days: 30
|
|
||||||
register: _acme_challenge
|
|
||||||
|
|
||||||
- name: Implement and complete Let's Encrypt challenge for {{ domain_name }}
|
|
||||||
when: _acme_challenge is changed
|
|
||||||
block:
|
|
||||||
- name: Implement http-01 challenge files for {{ domain_name }}
|
|
||||||
copy:
|
|
||||||
content: "{{ _acme_challenge.challenge_data[domain_name]['http-01'].resource_value }}"
|
|
||||||
dest: "{{ acme_challenge_dir }}/{{ _acme_challenge.challenge_data[domain_name]['http-01'].resource }}"
|
|
||||||
owner: root
|
|
||||||
group: root
|
|
||||||
mode: "644"
|
|
||||||
|
|
||||||
- name: Complete Let's Encrypt challenges for {{ domain_name }}
|
|
||||||
acme_certificate:
|
|
||||||
acme_directory: "{{ acme_directory }}"
|
|
||||||
acme_version: "{{ acme_version }}"
|
|
||||||
account_key_src: "{{ acme_accounts_dir }}/{{ acme_account_key }}"
|
|
||||||
account_email: "{{ acme_email }}"
|
|
||||||
challenge: http-01
|
|
||||||
csr: "{{ acme_csr_dir }}/{{ domain_name }}.csr"
|
|
||||||
dest: "{{ acme_certs_dir }}/{{ domain_name }}.d/cert.pem"
|
|
||||||
chain_dest: "{{ acme_certs_dir }}/{{ domain_name }}.d/chain.pem"
|
|
||||||
fullchain_dest: "{{ acme_certs_dir }}/{{ domain_name }}.d/fullchain.pem"
|
|
||||||
data: "{{ _acme_challenge }}"
|
|
|
@ -1,48 +1,59 @@
|
||||||
- name: Install ACME dependencies
|
- name: Install ACME dependencies
|
||||||
package:
|
package:
|
||||||
name: python3-acme
|
name: "{{ package }}"
|
||||||
state: present
|
state: present
|
||||||
|
loop: "{{ acme_packages }}"
|
||||||
|
loop_control:
|
||||||
|
loop_var: package
|
||||||
tags: acme_install
|
tags: acme_install
|
||||||
|
|
||||||
- name: Install SSL dependencies
|
- name: Install SSL dependencies
|
||||||
package:
|
package:
|
||||||
name: ssl-cert
|
name: ssl-cert
|
||||||
state: present
|
state: present
|
||||||
|
tags: acme_install
|
||||||
|
|
||||||
- name: Create Let's Encrypt config directories
|
- name: Create ACME config directories
|
||||||
file:
|
file:
|
||||||
path: "{{ config_dir }}"
|
path: "{{ config_dir }}"
|
||||||
state: directory
|
state: directory
|
||||||
owner: root
|
owner: root
|
||||||
group: "{{ acme_ssl_group }}"
|
group: root
|
||||||
mode: "711"
|
mode: "755"
|
||||||
loop:
|
loop:
|
||||||
- "{{ acme_config_dir }}"
|
- "{{ acme_config_dir }}"
|
||||||
- "{{ acme_keys_dir }}"
|
- "{{ acme_certs_dir }}"
|
||||||
- "{{ acme_accounts_dir }}"
|
|
||||||
- "{{ acme_csr_dir }}"
|
- "{{ acme_csr_dir }}"
|
||||||
loop_control:
|
loop_control:
|
||||||
loop_var: config_dir
|
loop_var: config_dir
|
||||||
tags: acme_install
|
tags: acme_install
|
||||||
|
|
||||||
- name: Create challenge directory
|
- name: Create ACME private keys directory
|
||||||
file:
|
file:
|
||||||
path: "{{ acme_challenge_dir }}/.well-known/acme-challenge"
|
path: "{{ acme_keys_dir }}"
|
||||||
|
state: directory
|
||||||
|
owner: root
|
||||||
|
group: "{{ acme_ssl_group }}"
|
||||||
|
mode: "750"
|
||||||
|
tags: acme_install
|
||||||
|
|
||||||
|
- name: Create ACME accounts directory
|
||||||
|
file:
|
||||||
|
path: "{{ acme_accounts_dir }}"
|
||||||
state: directory
|
state: directory
|
||||||
owner: root
|
owner: root
|
||||||
group: root
|
group: root
|
||||||
mode: "755"
|
mode: "750"
|
||||||
tags: acme_install
|
tags: acme_install
|
||||||
|
|
||||||
- name: Perform ACME challenge for each domain
|
- name: Copy ACME config file
|
||||||
include_tasks:
|
copy:
|
||||||
file: acme_challenge.yml
|
content: "{{ acme_config | to_nice_yaml(indent=2) }}"
|
||||||
apply:
|
dest: "{{ acme_config_file }}"
|
||||||
tags: acme_challenge
|
owner: root
|
||||||
loop: "{{ acme_domains | unique }}"
|
group: root
|
||||||
loop_control:
|
mode: "600"
|
||||||
loop_var: domain_name
|
tags: [acme_install, acme_config]
|
||||||
tags: acme_challenge
|
|
||||||
|
|
||||||
- name: Create directory for certificate renewal tool
|
- name: Create directory for certificate renewal tool
|
||||||
file:
|
file:
|
||||||
|
@ -51,31 +62,40 @@
|
||||||
group: root
|
group: root
|
||||||
mode: "755"
|
mode: "755"
|
||||||
state: directory
|
state: directory
|
||||||
tags: acme_renew
|
tags: acme_install
|
||||||
|
|
||||||
- name: Copy script to renew ACME certificates
|
- name: Copy script to renew ACME certificates
|
||||||
copy:
|
copy:
|
||||||
src: acme_renew_cert.py
|
src: acme_renew_cert.py
|
||||||
dest: /opt/acme/acme_renew_cert.py
|
dest: "{{ acme_script_dir }}/acme_renew_cert.py"
|
||||||
owner: root
|
owner: root
|
||||||
group: root
|
group: root
|
||||||
mode: "755"
|
mode: "755"
|
||||||
tags: acme_renew
|
tags: acme_install
|
||||||
|
|
||||||
|
- name: Create '{{ acme_script_bin }}' symlink for ACME renewal script
|
||||||
|
file:
|
||||||
|
src: "{{ acme_script_dir }}/acme_renew_cert.py"
|
||||||
|
dest: "{{ acme_script_bin }}"
|
||||||
|
state: link
|
||||||
|
owner: root
|
||||||
|
group: root
|
||||||
|
mode: "755"
|
||||||
|
tags: acme_install
|
||||||
|
|
||||||
|
- name: Perform ACME challenge for each domain
|
||||||
|
command: acme-renew-cert -c {{ acme_config_file | quote }}
|
||||||
|
changed_when: "'No domain to renew' not in _acme_challenge.stdout"
|
||||||
|
register: _acme_challenge
|
||||||
|
tags: acme_challenge
|
||||||
|
|
||||||
- name: Setup cron job for ACME certificates renewal of {{ domain_name }}
|
- name: Setup cron job for ACME certificates renewal of {{ domain_name }}
|
||||||
cron:
|
cron:
|
||||||
name: acme renew {{ domain_name }} cert
|
user: root
|
||||||
job: >-
|
name: acme-renew-cert
|
||||||
bash -c 'sleep $((RANDOM \% 3600))' && /opt/acme/acme_renew_cert.py {{ domain_name }} -q
|
cron_file: acme-renew-cert
|
||||||
-a {{ (acme_accounts_dir + '/' + acme_account_key) | quote }}
|
job: "{{ acme_script_bin }} -q -c {{ acme_config_file | quote }}"
|
||||||
-p {{ acme_keys_dir | quote }}/{domain}.pem
|
|
||||||
-r {{ acme_csr_dir | quote }}/{domain}.csr
|
|
||||||
-o {{ acme_certs_dir | quote }}/{domain}.d
|
|
||||||
-c {{ acme_challenge_dir | quote }}/.well-known/acme-challenge
|
|
||||||
minute: "30"
|
minute: "30"
|
||||||
hour: "2"
|
hour: "2"
|
||||||
state: present
|
state: present
|
||||||
loop: "{{ acme_domains | unique }}"
|
tags: acme_cron
|
||||||
loop_control:
|
|
||||||
loop_var: domain_name
|
|
||||||
tags: acme_renew
|
|
|
@ -1 +1,4 @@
|
||||||
acme_version: 2
|
acme_version: 2
|
||||||
|
acme_packages:
|
||||||
|
- python3-acme
|
||||||
|
- python3-yaml
|
||||||
|
|
Loading…
Reference in New Issue