geant_acme.py 6.33 KiB
#!/usr/bin/env python3
#
"""Geant Acme
Usage:
geant_acme.py --domain <DOMAIN>... (--provider <PROVIDER> | --wildcard) (--client <CLIENT>... | --wildcard)
geant_acme.py (-h | --help)
Options:
-h --help Show this screen.
-c CLIENT --client=CLIENT Client
-d DOMAIN --domain=DOMAIN Domain
-p PROVIDER --provider=PROVIDER Provider
-w --wildcard Use wildcard
"""
import os
import datetime
import subprocess as sp
import configparser
from docopt import docopt
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning #pylint: disable=E0401
API_URL = 'https://infoblox.geant.org/wapi/v2.6.1'
SEP = '+' + 72*'-' + '+'
def os_exit():
    """Write the job-end footer to the log and terminate the process."""
    finished_at = time_log()
    closing_rule = '+' + 72*'=' + '+'
    # footer: separator line, timestamped end marker, then the closing rule
    print('{}\n[{}] Job ended'.format(SEP, finished_at))
    print(closing_rule)
    os.sys.exit()
def time_log():
    """Return the current date and time as a log timestamp string."""
    stamp_format = "%d %b %Y %H:%M:%S"
    current = datetime.datetime.now()
    return current.strftime(stamp_format)
def get_reference(iblox_domain, iblox_user, iblox_pw):
    """Look up the reference of a domain's ACME challenge TXT record.

    Queries ``record:txt`` on the API for ``_acme-challenge.<iblox_domain>``.

    Args:
        iblox_domain: domain name whose challenge record is looked up.
        iblox_user: user name for HTTP basic authentication.
        iblox_pw: password for HTTP basic authentication.

    Returns:
        The ``_ref`` string of the first matching record, ``None`` when no
        record exists, or the string ``'error'`` when the request fails,
        the API does not answer with HTTP 200, or the reply cannot be parsed.
    """
    # NOTE(review): certificate verification is disabled for this API;
    # the warning is silenced only because of that.
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning) #pylint: disable=E1101
    try:
        ref_obj = requests.get(
            '{}/record:txt'.format(API_URL),
            # let requests build (and escape) the query string
            params={
                'name': '_acme-challenge.{}'.format(iblox_domain),
                '_return_as_object': 1,
            },
            auth=(iblox_user, iblox_pw),
            verify=False,
            # without a timeout a stalled API would hang the job forever
            timeout=30,
        )
    except requests.exceptions.RequestException as req_err:
        print('{} request to Infoblox failed: {}'.format(iblox_domain, req_err))
        return 'error'

    if ref_obj.status_code != 200:
        return 'error'

    try:
        results = ref_obj.json()['result']
        ref = results[0]['_ref'] if results else None
    except (ValueError, KeyError, TypeError):
        # HTTP 200 but the body is not the expected result object
        print('{} unexpected reply from Infoblox'.format(iblox_domain))
        return 'error'

    if ref:
        print('got reference object: {}'.format(ref))
    else:
        print('{} empty reference object: no challenge to delete\n{}'.format(
            iblox_domain, SEP))
    return ref
def delete_challenge(object_reference, iblox_user, iblox_pw):
    """Send a DELETE for *object_reference* and return the HTTP status code.

    The status code is also printed to the log.
    """
    # verification is switched off below, so mute urllib3's warning about it
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning) #pylint: disable=E1101
    target_url = '{}/{}'.format(API_URL, object_reference)
    credentials = (iblox_user, iblox_pw)
    response = requests.delete(target_url, auth=credentials, verify=False)
    status = response.status_code
    print('delete challenge - http code (200 expected): {}\n{}'.format(status, SEP))
    return status
def create_challenge(iblox_domain, acme_token, iblox_user, iblox_pw):
    """POST a ``_acme-challenge`` TXT record and return the HTTP status code.

    The record text is *acme_token*; the status code and reason of the
    response are printed to the log.
    """
    print('{}\ncreating challenge _acme-challenge.{}'.format(SEP, iblox_domain))
    # verification is switched off below, so mute urllib3's warning about it
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning) #pylint: disable=E1101
    endpoint = '{}/record:txt'.format(API_URL)
    record = {
        'name': '_acme-challenge.{}'.format(iblox_domain),
        'text': acme_token,
        'ttl': '30',
        "view": "External"
    }
    response = requests.post(
        endpoint,
        auth=(iblox_user, iblox_pw),
        data=record,
        verify=False
    )
    status = response.status_code
    print('create challenge - http code (201 expected): {} {}\n{}'.format(
        status, response.reason, SEP))
    return status
def run_certbot(cbot_domain, provider, wild_card=None):
    """Request a certificate from the ACME provider by running certbot.

    Args:
        cbot_domain: sequence of domain names; the first one is also used
            as the certbot ``--cert-name``.
        provider: provider name, used to select ``/etc/<provider>/cli.ini``
            and the ``/root/bin/infoblox_hook_<provider>`` auth hook.
        wild_card: when truthy, the first domain is requested as
            ``*.<domain>``.

    Returns:
        certbot's standard output with everything from the last newline on
        removed.  If certbot cannot be started or exits with a non-zero
        status, the error is logged and the job ends through ``os_exit()``.
    """
    requested = list(cbot_domain)
    if wild_card:
        # only the first name receives the wildcard prefix
        requested[0] = '*.{}'.format(requested[0])

    # Argument list instead of a shell string: no shell is involved, so
    # "*.domain" cannot be glob-expanded and arguments cannot inject commands.
    cbot_cmd = [
        '/usr/local/bin/certbot', 'certonly', '--non-interactive',
        '-c', '/etc/{}/cli.ini'.format(provider),
        '--manual-auth-hook', '/root/bin/infoblox_hook_{}'.format(provider),
        '--cert-name', cbot_domain[0],
    ]
    for name in requested:
        cbot_cmd.extend(['-d', name])
    print('running: {}'.format(' '.join(cbot_cmd)))

    try:
        cbot_child = sp.Popen(cbot_cmd, stdout=sp.PIPE, stderr=sp.PIPE)
    except OSError as start_err:
        # e.g. certbot binary missing or not executable
        print("error running certbot: {}".format(start_err))
        os_exit()
    cbot_out, cbot_err = cbot_child.communicate()
    if cbot_child.returncode != 0:
        print("error running certbot: {}".format(cbot_err.decode("utf-8")))
        os_exit()

    decoded_msg = cbot_out.decode("utf-8")
    # rfind() yields -1 when there is no newline; slicing with it would
    # silently drop the last character, so keep the text whole in that case
    last_newline = decoded_msg.rfind('\n')
    msg = decoded_msg if last_newline == -1 else decoded_msg[:last_newline]
    print(msg)
    return msg
def _remove_challenges(domains, iblox_user, iblox_pw):
    """Delete the ACME challenge TXT record found for each of *domains*.

    Ends the job through ``os_exit()`` when a lookup or a deletion fails.
    """
    for domain_item in domains:
        ref_obj = get_reference(domain_item, iblox_user, iblox_pw)
        if ref_obj == 'error':
            print('error retrieving reference object')
            os_exit()
        if ref_obj:
            del_status = delete_challenge(ref_obj, iblox_user, iblox_pw)
            if del_status != 200:
                print('{}: {} error deleting challenge on Infoblox'.format(
                    domain_item, del_status))
                os_exit()


# Here we Go.
if __name__ == "__main__":
    ARGS = docopt(__doc__)
    DOMAIN = ARGS['--domain']
    CLIENTS = ARGS['--client']
    WILDCARD = ARGS['--wildcard']
    # --wildcard and --provider are mutually exclusive on the command line,
    # so --provider is None for wildcard jobs.  Those jobs log to
    # acme_letsencrypt and upload with "-p letsencrypt"; use the same
    # provider for certbot instead of formatting "None" into its paths.
    PROVIDER = 'letsencrypt' if WILDCARD else ARGS['--provider']
    LOG_FILE = '/var/log/acme_{}/acme.log'.format(PROVIDER)

    # everything printed from here on goes to the line-buffered log file
    os.sys.stdout = os.sys.stderr = open(LOG_FILE, 'a', 1)

    # start logging
    START_DATE = time_log()
    CMD_LINE = ' '.join(os.sys.argv)
    print('+' + 72*'=' + '+')
    print('[{}] Job started: {}\n{}'.format(START_DATE, CMD_LINE, SEP))

    CONFIG = configparser.RawConfigParser()
    with open('/root/.geant_acme.ini') as config_file:
        CONFIG.read_file(config_file)
    IBLOX_PASS = CONFIG.get('geant_acme', 'iblox_pass')
    IBLOX_USER = CONFIG.get('geant_acme', 'iblox_user')

    # Maybe there is an Acme challenge left over. We try to delete it first.
    _remove_challenges(DOMAIN, IBLOX_USER, IBLOX_PASS)

    # run certbot
    run_certbot(DOMAIN, PROVIDER, WILDCARD)
    print(SEP)

    # remove the Acme challenges from Infoblox
    _remove_challenges(DOMAIN, IBLOX_USER, IBLOX_PASS)

    # if we are here, everything went fine and we can upload the certificates
    UPLOADER = '/root/bin/geant_acme_uploader.py'
    if WILDCARD:
        UPLOAD_CMDS = [[UPLOADER, '-d', DOMAIN[0], '-p', PROVIDER, '-w']]
    else:
        UPLOAD_CMDS = [
            [UPLOADER, '-d', DOMAIN[0], '-c', client, '-p', PROVIDER]
            for client in CLIENTS
        ]
    for upload_cmd in UPLOAD_CMDS:
        # argument lists keep domain/client values away from a shell
        try:
            UPLOAD_STATUS = sp.call(upload_cmd)
        except OSError as upload_err:
            print('could not run {}: {}'.format(' '.join(upload_cmd), upload_err))
            continue
        if UPLOAD_STATUS != 0:
            print('{} exited with status {}'.format(
                ' '.join(upload_cmd), UPLOAD_STATUS))
    os_exit()