Skip to content
Snippets Groups Projects

Migrate to Traefik v2

Merged Quentin Duchemin requested to merge traefik-v2 into master
39 files
+ 280
962
Compare changes
  • Side-by-side
  • Inline
Files
39
+ 0
200
""" Traefik Certificate extraction and update module
Provides tools to extract TLS certificates from Treafik acme.json files.
Classes
-------
CertUpdater:
This class handles acme files decoding and stores or updates the resulting
certificates in separate folders.
"""
import os
import errno
import json
from base64 import b64decode
class CertUpdater:
"""Decodes acme files and extact the resulting certificates if needed.
This class keeps flat certificates for specified domain names
in sync with the contents of an acme.json file. The certificates are stored in separated
subdirectories named after the main domain name they are valid for.
If the certificates specify x509 SAN records, only the main CN is used (a single
directory is created, regardless of the number of alternate names.)
Attributes
----------
acme_file : str
Name of the acme.json file containing the certificates
names : list
List of domain names to keep updated
certs_root_path : str
Directory where the certificates will be stored. The certificates are
stored in subdirectories of this directory
Methods
-------
add(name)
Adds a domain name to the updater
remove(name)
Removes a domain name from the updater
update()
Decodes the acme.json file and updates the certicates that need to be updated.
"""
def __init__(self, acme_file, certs_root_path):
"""
Parameters
----------
acme_file : str
path of the acme file containing the certificates
certs_root_path : str
root directory where certificates will be extracted.
They are extracted in subdirectories named after the CN stored in
the certificate.
"""
self.acme_file = acme_file
self.names = []
self.certs_root_path = certs_root_path + '/'
def add(self, name):
"""Adds a domain name to the updater.
Parameters
----------
name : str
domain name to add
"""
self.names.append(name)
# Create directories etc if they do not exist
try:
os.makedirs(self.certs_root_path + name + '/')
except OSError as error:
if error.errno != errno.EEXIST:
raise
def remove(self, name):
"""Removes a domain name from the updater.
Parameters
----------
name : str
domain name to remove
"""
try:
self.names.remove(name)
except ValueError:
pass
def update(self):
"""Decodes the acme.json file and updates the certicates that need to be updated.
Returns
-------
list
a list of domain names that have been updated. An empty list if no certificate
has been updated.
"""
# Open and decode the acme file
try:
with open(self.acme_file) as acme:
data = json.loads(acme.read())
except FileNotFoundError:
print('ACME file {0} not found when trying to decode.'.format(self.acme_file))
return []
except json.JSONDecodeError:
print('File {0} does not look like an ACME file.'.format(self.acme_file))
return []
# Get the acme file version
try:
acme_ver = 2 if 'acme-v02' in data['Account']['Registration']['uri'] else 1
except TypeError:
if 'DomainsCertificate' in data:
acme_ver = 1
else:
acme_ver = 2
# Get the certificates
if acme_ver == 1:
certs = data['DomainsCertificate']['Certs']
elif acme_ver == 2:
certs = data['Certificates']
# Iterate over certificates. If a certificate has been updated,
# add its name to the updated_names.
updated_names = []
for c in certs:
if acme_ver == 1:
name = c['Certificate']['Domain']
key = c['Certificate']['PrivateKey']
fullchain = c['Certificate']['Certificate']
elif acme_ver == 2:
name = c['Domain']['Main']
key = c['Key']
fullchain = c['Certificate']
if name in self.names:
key = b64decode(key).decode('utf-8')
fullchain = b64decode(fullchain).decode('utf-8')
chain_start = fullchain.find('-----BEGIN CERTIFICATE-----', 1)
cert = fullchain[0:chain_start]
chain = fullchain[chain_start:]
print('Updating certificates for {0}'.format(name))
if self._needs_updating(name, fullchain):
path = self.certs_root_path + name + '/'
with open(path + 'privkey.pem', 'w') as f:
f.write(key)
with open(path + 'cert.pem', 'w') as f:
f.write(cert)
with open(path + 'chain.pem', 'w') as f:
f.write(chain)
with open(path + 'fullchain.pem', 'w') as f:
f.write(fullchain)
print('Certificates updated')
updated_names.append(name)
else:
print('Cetificates are already up-to-date')
return updated_names
def _needs_updating(self, name, fullchain):
"""Checks if a certificate has changed
Parameters
----------
name : str
Name of the directory containing the certificates
fullchain : str
Full certificates extracted from the acme.json file
Returns
-------
bool
True if the contents of the current certificates are
different from the fullchain. False otherwise.
"""
path = self.certs_root_path + name + '/fullchain.pem'
try:
with open(path, 'r') as f:
return f.read() != fullchain
except FileNotFoundError:
return True
\ No newline at end of file
Loading