Compare commits

..

No commits in common. "master" and "5-trigger-hooks-after-certificate-renewal" have entirely different histories.

3 changed files with 70 additions and 168 deletions

View File

@ -1,23 +1,22 @@
acme_ssl_dir: /etc/ssl acme_config_dir: /etc/ssl
acme_config_dir: "{{ acme_ssl_dir }}/acme.d" acme_config_file: "{{ acme_config_dir }}/acme.yml"
acme_config_file: "{{ acme_ssl_dir }}/acme.yml" acme_keys_dir: "{{ acme_config_dir }}/private"
acme_keys_dir: "{{ acme_ssl_dir }}/private" acme_csr_dir: "{{ acme_config_dir }}/csr"
acme_csr_dir: "{{ acme_ssl_dir }}/csr" acme_certs_dir: "{{ acme_config_dir }}/certs"
acme_certs_dir: "{{ acme_ssl_dir }}/certs" acme_accounts_dir: "{{ acme_config_dir }}/accounts"
acme_accounts_dir: "{{ acme_ssl_dir }}/accounts"
acme_script_dir: /opt/acme acme_script_dir: /opt/acme
acme_script_bin: /usr/local/bin/acme-renew-cert acme_script_bin: /usr/local/bin/acme-renew-cert
acme_ssl_group: ssl-cert acme_ssl_group: ssl-cert
acme_config:
acme_account_private_key: "{{ acme_accounts_dir }}/acme_account.key" account:
acme_account_email: acme@example.com private_key: "{{ acme_accounts_dir }}/acme_account.key"
acme_account_owner: root email: acme@example.com
acme_account_group: "{{ acme_account_group }}" owner: root
acme_directory_url: https://acme-staging-v02.api.letsencrypt.org/directory group: root
acme_root_dir: /var/www/acme directory_url: https://acme-staging-v02.api.letsencrypt.org/directory
acme_challenge_dir: "{{ acme_challenge_root_dir }}/.well-known/acme-challenge" challenge_dir: /var/www/acme/.well-known/acme-challenge
acme_domains: domains:
example.com: example.com:
alt_names: alt_names:
- test.example.com - test.example.com
@ -26,11 +25,3 @@ acme_domains:
remaining_days: 30 remaining_days: 30
hooks: hooks:
- systemctl reload nginx - systemctl reload nginx
acme_config:
account:
private_key: "{{ acme_account_private_key }}"
email: "{{ acme_account_email }}"
owner: "{{ acme_account_owner }}"
group: "{{ acme_account_group }}"
directory_url: "{{ acme_directory_url }}"
challenge_dir: "{{ acme_challenge_dir }}"

View File

@ -1,6 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import glob
import grp import grp
import logging import logging
import os import os
@ -29,8 +28,6 @@ try:
except KeyError: except KeyError:
SSL_GROUP = "root" SSL_GROUP = "root"
ACME_CONFIG_FILE = f"{SSL_CONFIG_DIR}/acme.yml"
ACME_CONFIG_DIR = f"{SSL_CONFIG_DIR}/acme.d"
ACME_ACCOUNT_KEY = os.path.join(SSL_ACCOUNTS_DIR, "acme_account.key") ACME_ACCOUNT_KEY = os.path.join(SSL_ACCOUNTS_DIR, "acme_account.key")
ACME_CHALLENGE_DIR = "/var/www/acme/.well-known/acme-challenge" ACME_CHALLENGE_DIR = "/var/www/acme/.well-known/acme-challenge"
LETSENCRYPT_DIRECTORY_URL = "https://acme-v02.api.letsencrypt.org/directory" LETSENCRYPT_DIRECTORY_URL = "https://acme-v02.api.letsencrypt.org/directory"
@ -179,11 +176,8 @@ class Challenge:
class Domain(PrivateKey): class Domain(PrivateKey):
def __init__(self, name: str, owner: str, group: str, def __init__(self, name: str, owner: str, group: str,
alt_names: List[str] = None, alt_names: List[str] = None,
cert: str = None, chain_cert: str = None, remaining_days: int = CERT_REMAINING_DAYS,
fullchain_cert: str = None, private_key: str = None,
csr: str = None, remaining_days: int = CERT_REMAINING_DAYS,
hooks: List[str] = None): hooks: List[str] = None):
if private_key is None:
private_key = os.path.join(SSL_KEYS_DIR, name + '.key') private_key = os.path.join(SSL_KEYS_DIR, name + '.key')
super().__init__(private_key, owner, group) super().__init__(private_key, owner, group)
@ -193,23 +187,10 @@ class Domain(PrivateKey):
self.hooks = [] if hooks is None else hooks self.hooks = [] if hooks is None else hooks
certs_dir = os.path.join(SSL_CERTS_DIR, name + '.d') certs_dir = os.path.join(SSL_CERTS_DIR, name + '.d')
if cert is None:
self.cert = os.path.join(certs_dir, 'cert.pem')
else:
self.cert = cert
if chain_cert is None:
self.chain_cert = os.path.join(certs_dir, 'chain.pem')
else:
self.chain_cert = chain_cert
if fullchain_cert is None:
self.fullchain_cert = os.path.join(certs_dir, 'fullchain.pem')
else:
self.fullchain_cert = fullchain_cert
if csr is None:
self.csr = os.path.join(SSL_CSR_DIR, name + '.csr') self.csr = os.path.join(SSL_CSR_DIR, name + '.csr')
else: self.cert = os.path.join(certs_dir, 'cert.pem')
self.csr = csr self.chain_cert = os.path.join(certs_dir, 'chain.pem')
self.fullchain_cert = os.path.join(certs_dir, 'fullchain.pem')
self.cert_expiration_days = None self.cert_expiration_days = None
def check_certificate_expiration_date(self) -> bool: def check_certificate_expiration_date(self) -> bool:
@ -288,10 +269,10 @@ class Domain(PrivateKey):
try: try:
with open(self.csr, 'rb') as pem_in: with open(self.csr, 'rb') as pem_in:
csr = pem_in.read() csr = pem_in.read()
csr_pem = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)
except IOError as e: except IOError as e:
logging.warning(f"Unable to load CSR file: {e}") logging.warning(f"Unable to load CSR file: {e}")
csr = self.create_csr() csr = self.create_csr()
csr_pem = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)
logging.info("Checking if CSR contains all the domains") logging.info("Checking if CSR contains all the domains")
domain_names = set() domain_names = set()
@ -334,95 +315,14 @@ class Domain(PrivateKey):
logging.error(f"Unable to run hook: {hook_res.stderr}") logging.error(f"Unable to run hook: {hook_res.stderr}")
class Config:
def __init__(self, filename, directory):
self.filename = filename
self.directory = directory
self.config = {}
self.domain_configs = {}
def load(self) -> None:
with open(self.filename, 'r') as ifd:
self._data = yaml.safe_load(ifd)
for pattern in ('*.yml', '*.yaml'):
config_glob = os.path.join(self.directory, pattern)
for config_file in glob.glob(config_glob):
with open(config_file, 'r') as ifd:
self.domain_configs[config_file] = yaml.safe_load(ifd)
def parse_domains(self) -> List[Domain]:
domains = self._data.get("domains", {})
domain_uniques = set()
domain_duplicates = set()
if isinstance(domains, list):
for domain in domains:
domain_name = domain["name"]
if domain_name not in domain_uniques:
domain_uniques.add(domain_name)
else:
domain_duplicates.add(domain_name)
domains = {domain["name"]: domain for domain in domains}
for config_file, domain_config in self.domain_configs.items():
domain_filename, _ = os.path.splitext(config_file)
domain_name = domain_config.get("name", domain_filename)
if domain_name not in domain_uniques:
domain_uniques.add(domain_name)
else:
domain_duplicates.add(domain_name)
domains[domain_name] = domain_config
if len(domain_duplicates) > 0:
raise RuntimeError(f"Duplicate domains found: {domain_duplicates}")
domains_parsed = []
for domain_name, domain_details in domains.items():
remaining_days = domain_details.get("remaining_days",
CERT_REMAINING_DAYS)
domain = Domain(
name=domain_name,
owner=domain_details.get("owner", "root"),
group=domain_details.get("group", SSL_GROUP),
alt_names=domain_details.get("alt_names"),
cert=domain_details.get("cert"),
chain_cert=domain_details.get("chain_cert"),
fullchain_cert=domain_details.get("fullchain_cert"),
private_key=domain_details.get("private_key"),
csr=domain_details.get("csr"),
remaining_days=remaining_days,
hooks=domain_details.get("hooks"))
domains_parsed.append(domain)
return domains_parsed
def parse_account(self) -> Account:
account_config = self._data["account"]
return Account(
private_key=account_config.get("private_key", ACME_ACCOUNT_KEY),
email=account_config["email"],
owner=account_config.get("owner", "root"),
group=account_config.get("group", "root"))
@property
def directory_url(self) -> str:
return self._data.get("directory_url", LETSENCRYPT_DIRECTORY_URL)
@property
def challenge_dir(self) -> str:
return self._data.get("challenge_dir", ACME_CHALLENGE_DIR)
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Renew ACME certificates for a list of domains. This " description="Renew ACME certificates for a list of domains. This "
"script assumes you already have an account on the CA " "script assumes you already have an account on the CA "
"server and a private key for each certificate.", "server and a private key for each certificate.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter) formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--config", "-c", default=ACME_CONFIG_FILE, parser.add_argument("--config", "-c", default=SSL_CONFIG_FILE,
help="Path to the config file.") help="Path to the config file.")
parser.add_argument("--config-dir", "-d", default=ACME_CONFIG_DIR,
help="Path to the config directory.")
parser.add_argument("--quiet", "-q", action="store_true", parser.add_argument("--quiet", "-q", action="store_true",
help="Quiet mode.") help="Quiet mode.")
parser.add_argument("--verbose", "-v", action="store_true", parser.add_argument("--verbose", "-v", action="store_true",
@ -442,22 +342,40 @@ def main():
logging.basicConfig(stream=sys.stdout, level=log_level, logging.basicConfig(stream=sys.stdout, level=log_level,
format="%(levelname)s:%(message)s") format="%(levelname)s:%(message)s")
config = Config(args.config, args.config_dir)
try: try:
config.load() with open(args.config, 'r') as ifd:
config = yaml.safe_load(ifd)
except IOError as e: except IOError as e:
logging.error(f"Unable to load config file: {e}") logging.error(f"Unable to load config file: {e}")
return 1 return 1
try: if isinstance(config["domains"], list):
domains = config.parse_domains() domains = {domain["name"]: domain for domain in config["domains"]}
except RuntimeError as e: if len(domains) != len(config["domains"]):
logging.error(f"Unable to parse domains: {e}") domain_uniques = set()
domain_duplicates = set()
for domain in config["domains"]:
domain_name = domain["name"]
if domain_name not in domain_uniques:
domain_uniques.add(domain_name)
else:
domain_duplicates.add(domain_name)
logging.error(f"Duplicate domain name(s) found: {domain_uniques}")
return 2 return 2
config["domains"] = domains
try: try:
domains_to_renew = [] domains_to_renew = []
for domain in domains: for domain_name, domain_details in config["domains"].items():
remaining_days = domain_details.get("remaining_days",
CERT_REMAINING_DAYS)
domain = Domain(domain_name,
owner=domain_details.get("owner", "root"),
group=domain_details.get("group", SSL_GROUP),
alt_names=domain_details.get("alt_names"),
remaining_days=remaining_days,
hooks=domain_details.get("hooks"))
if not (args.force or domain.check_certificate_expiration_date()): if not (args.force or domain.check_certificate_expiration_date()):
logging.info(f"{domain.name} certificate will not be renewed.") logging.info(f"{domain.name} certificate will not be renewed.")
continue continue
@ -469,9 +387,18 @@ def main():
logging.info("No domain to renew. Aborting.") logging.info("No domain to renew. Aborting.")
return 0 return 0
account = config.parse_account() account_config = config["account"]
acme_client = account.register(config.directory_url) account = Account(
challenge = Challenge(config.challenge_dir, acme_client) private_key=account_config.get("private_key", ACME_ACCOUNT_KEY),
email=account_config["email"],
owner=account_config.get("owner", "root"),
group=account_config.get("group", "root"))
directory_url = config.get("directory_url", LETSENCRYPT_DIRECTORY_URL)
acme_client = account.register(directory_url)
challenge_dir = config.get("challenge_dir", ACME_CHALLENGE_DIR)
challenge = Challenge(challenge_dir, acme_client)
for domain in domains_to_renew: for domain in domains_to_renew:
domain.renew_cert(challenge, force=args.force) domain.renew_cert(challenge, force=args.force)

View File

@ -21,7 +21,6 @@
group: root group: root
mode: "755" mode: "755"
loop: loop:
- "{{ acme_ssl_dir }}"
- "{{ acme_config_dir }}" - "{{ acme_config_dir }}"
- "{{ acme_certs_dir }}" - "{{ acme_certs_dir }}"
- "{{ acme_csr_dir }}" - "{{ acme_csr_dir }}"
@ -53,22 +52,7 @@
dest: "{{ acme_config_file }}" dest: "{{ acme_config_file }}"
owner: root owner: root
group: root group: root
mode: "640" mode: "600"
tags: [acme_install, acme_config]
- name: Copy ACME domain config files
copy:
content: "{{ domain | to_nice_yaml(indent=2) }}"
dest: "{{ acme_config_dir }}/{{ domain_name }}.yml"
owner: root
group: root
mode: "640"
loop: "{{ (acme_domains.keys() | list) if acme_domains is mapping else acme_domains }}"
loop_control:
label: "{{ domain_name }}"
vars:
domain_name: "{{ item if item is string else item.name }}"
domain: "{{ acme_domains[item] if item is string else item }}"
tags: [acme_install, acme_config] tags: [acme_install, acme_config]
- name: Create directory for certificate renewal tool - name: Create directory for certificate renewal tool
@ -100,7 +84,7 @@
tags: acme_install tags: acme_install
- name: Perform ACME challenge for each domain - name: Perform ACME challenge for each domain
command: acme-renew-cert -c {{ acme_config_file | quote }} -d {{ acme_config_dir | quote }} command: acme-renew-cert -c {{ acme_config_file | quote }}
changed_when: "'No domain to renew' not in _acme_challenge.stdout" changed_when: "'No domain to renew' not in _acme_challenge.stdout"
register: _acme_challenge register: _acme_challenge
tags: acme_challenge tags: acme_challenge
@ -110,7 +94,7 @@
user: root user: root
name: acme-renew-cert name: acme-renew-cert
cron_file: acme-renew-cert cron_file: acme-renew-cert
job: "{{ acme_script_bin }} -q -c {{ acme_config_file | quote }} -d {{ acme_config_dir | quote }}" job: "{{ acme_script_bin }} -q -c {{ acme_config_file | quote }}"
minute: "30" minute: "30"
hour: "2" hour: "2"
state: present state: present