-
Marco Malavolti authoredMarco Malavolti authored
utils.py 23.36 KiB
#!/usr/bin/env python3
import base64
import datetime
import html
import json
import logging
import pathlib
import re
import requests
import six
import sys
import shutil
import time
import uuid
import zlib
import eccs_properties as e_p
from selenium import webdriver
from selenium.common.exceptions import WebDriverException,TimeoutException,NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from logging.handlers import RotatingFileHandler
from urllib3.util import parse_url
from urllib.parse import urlparse, urlencode
def sha1(idp_entity_id):
"""Returns the SHA1 of the input value
:param idp_entity_id: input value
:return: SHA1 of the entityID
:rtype: string
"""
import hashlib
result = hashlib.sha1(idp_entity_id.encode())
return result.hexdigest()
def get_label(url_or_urn):
"""Returns a label usable for filenames
:param url_or_urn: input value
:return: a label
:rtype: string
"""
if url_or_urn.startswith('http'):
return parse_url(url_or_urn)[2]
else:
return parse_url(url_or_urn)[4].lstrip('/')
def get_reg_auth_dict(list_feds):
"""Returns a dictionary of Federation and their registration authorities
:param list_feds: Python list containing all federations
:return: a dictionary of '{ nameFed:reg_auth }'
:rtype: dict
"""
regAuth_dict = {}
for key,value in list_feds.items():
name = value['name']
reg_auth = value['reg_auth']
regAuth_dict[name] = reg_auth
return regAuth_dict
def get_idp_list(list_eccs_idps,reg_auth=None,idp_entityid=None):
"""Returns a list of IdPs or a list of only one IdP
- Only one if 'idp_entityid' parameter is used
- All IdPs of a specific Federation if 'reg_auth' parameter is used
- All eduGAIN IdPs if no parameter is used
:param list_eccs_idps: Python list containing all IdPs provided to ECCS
:param reg_auth: a string containing a registration authority URL
:param idp_entityid: a string containing an IdP entityID
:return: a list of IdPs
:rtype: list
"""
fed_idp_list = []
for idp in list_eccs_idps:
if (idp_entityid):
if (idp['entityID'] == idp_entityid):
fed_idp_list.append(idp)
elif (reg_auth):
if (idp['registrationAuthority'] == reg_auth):
fed_idp_list.append(idp)
else:
fed_idp_list.append(idp)
return fed_idp_list
def get_list_from_url(url, dest_file):
"""Downloads the content of a web page retrieved by its url into a directory and then use the saved file to return a Python list of the converted version of its content
:param url: URL to download
:param des_file: destination file
:return: the resulting Python list
:rtype: list
"""
from pathlib import Path
path = pathlib.Path(dest_file)
if(path.exists() == False):
with open("%s" % (dest_file), mode="w+", encoding='utf-8') as f:
f.write(requests.get(url).text)
with open("%s" % (dest_file), mode="r", encoding='utf-8') as f:
return json.loads(f.read().replace("'","'"))
def get_logger(path, filename, mode="a", log_level="DEBUG"):
"""Returns a logger used by ECCS API Development Server to create its log file
:param path: directory path of the files
:param filename: filename of the new file created
:param mode: a(append),w(write),r(read)
:param log_level: log level to use (DEBUG,INFO,WARN,ERROR,CRITICAL)
:return: a logger
:rtype: logger object
"""
logger = logging.getLogger(__name__)
ch = logging.handlers.RotatingFileHandler(f"{path}/{filename}", mode, 0, 5, 'utf-8')
if (log_level == "DEBUG"):
logger.setLevel(logging.DEBUG)
ch.setLevel(logging.DEBUG)
elif (log_level == "INFO"):
logger.setLevel(logging.INFO)
ch.setLevel(logging.INFO)
elif (log_level == "WARN"):
logger.setLevel(logging.WARN)
ch.setLevel(logging.WARN)
elif (log_level == "ERROR"):
logger.setLevel(logging.ERROR)
ch.setLevel(logging.ERROR)
elif (log_level == "CRITICAL"):
logger.setLevel(logging.CRITICAL)
ch.setLevel(logging.CRITICAL)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(module)s - %(message)s', datefmt='%d/%m/%Y %H:%M:%S')
ch.setFormatter(formatter)
logger.addHandler(ch)
return logger
def get_idp_contacts(idp,contactType):
"""Returns a list of email address for a specific type of contact
:param idp: dictionary containing the IdP info
:param contactType: type of contact to consider
:return: a python list containing all contacts email addresses
:rtype: list
"""
ctcList = []
for ctcType in idp['contacts']:
if (ctcType == contactType):
for ctc in idp['contacts'][contactType]:
if (ctc.get('emailOrPhone')):
if (ctc['emailOrPhone'].get('EmailAddress')):
ctcList.append(ctc['emailOrPhone']['EmailAddress'][0])
else:
ctcList.append('missing email')
else:
ctcList.append('missing email')
return ctcList
def store_page_source(idp,sp,test,page_source="",header="",footer=""):
"""Writes the login page source into a specific file
:param page_souce: content to write into a file
:param idp: idp owner of the page source
:param sp: sp who has been used
:param test: flag needed to decide if write on the console instead of on the file
:return: True or False
:rtype: boolean
"""
if (test):
sys.stdout.write(f"{header}\n{html.escape(page_source)}\n{footer}")
return True
else:
# Put the page_source into an appropriate HTML file
with open(f"{e_p.ECCS_HTMLDIR}/{e_p.DAY}/{sha1(idp['entityID'])}---{get_label(sp['entityID'])}.html","w") as h:
try:
h.write(f"{header}{html.escape(page_source)}{footer}")
return True
except IOError:
return False
# Get the Google Chrome Selenium Driver
def get_driver_selenium(idp=None,sp=None,debugSelenium=False):
"""Returns driver needed to perform the ECCS check
:param idp: the idp need to be checked
:param sp: the sp used to check the idp
:param debugSelenium: a flag needed to enable a more verbose logging
:return: selenium driver
:rtype: object
"""
# Configure Web-driver
# https://peter.sh/experiments/chromium-command-line-switches/
chrome_options = Options()
chrome_options.page_load_strategy = 'normal'
#chrome_options.add_argument('--start-in-incognito')
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--disable-extensions')
chrome_options.add_argument('--disable-dinosaur-easter-egg')
chrome_options.add_argument('--disable-sync')
# For DEBUG only (By default ChromeDriver logs only warnings/errors to stderr.
# When debugging issues, it is helpful to enable more verbose logging.)
if (debugSelenium):
label_idp = get_label(idp['entityID'])
label_sp = get_label(sp['entityID'])
sha1_idp = sha1(idp['entityID'])
try:
driver = webdriver.Chrome(e_p.PATHCHROMEDRIVER, options=chrome_options, service_args=['--verbose', f'--log-path={e_p.ECCS_SELENIUMLOGDIR}/{sha1_idp}_{label_idp}_{label_sp}.log'])
except: # Wait 3 seconds before try again to get the webdriver for all kind of exception will occur
time.sleep(3)
driver = webdriver.Chrome(e_p.PATHCHROMEDRIVER, options=chrome_options, service_args=['--verbose', f'--log-path={e_p.ECCS_SELENIUMLOGDIR}/{sha1_idp}_{label_idp}_{label_sp}.log'])
else:
try:
driver = webdriver.Chrome(e_p.PATHCHROMEDRIVER, options=chrome_options)
except: # Wait 3 seconds before try again to get the webdriver for all kind of exception will occur
time.sleep(3)
driver = webdriver.Chrome(e_p.PATHCHROMEDRIVER, options=chrome_options)
return driver
def follow_all_nested_iframes(driver):
"""Returns the page source reached by following all the first <iframe> found on the web pages
:param driver: the selenium driver
:return: the IdP login page source reached
:rtype: string
"""
try:
while (driver.find_element(By.XPATH,'//iframe')):
driver.switch_to.frame(0)
except NoSuchElementException:
return driver.page_source
def deflate_and_base64_encode(string_val):
"""Deflates and the base64 encodes a string
:param string_val: The string to deflate and encode
:return: The deflated and encoded string
"""
if not isinstance(string_val, six.binary_type):
string_val = string_val.encode('utf-8')
return base64.b64encode(zlib.compress(string_val)[2:-4])
def generate_login_url(sp_entity_id, sp_http_post_acs_location, idp_http_redirect_sso_location):
"""Returns a login url needed to get the IdP Login page for an SP
:param sp_entity_id: the SP entityID
:param sp_http_post_acs_location: the SP HTTP-POST AssertionConsumerService URL
:param idp_http_redirect_sso_location: the IDP HTTP-Redirect SSO Location
:return: a login url built upon a SAML Authn Request
:rtype: string
"""
authn_request_id = f'_{str(uuid.uuid4()).replace("-", "")}'
issue_instant = str(datetime.datetime.now(datetime.timezone.utc).isoformat(timespec='seconds')).replace('+00:00', 'Z')
authn_request = '<samlp:AuthnRequest xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol" ' \
f'AssertionConsumerServiceURL="{sp_http_post_acs_location}" ' \
f'Destination="{idp_http_redirect_sso_location}" ' \
f'ID="{authn_request_id}" ' \
f'IssueInstant="{issue_instant}" ' \
'ProtocolBinding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" ' \
'Version="2.0">' \
f'<saml:Issuer xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion">{sp_entity_id}</saml:Issuer>' \
'<samlp:NameIDPolicy AllowCreate="1"/>' \
'</samlp:AuthnRequest>'
args = {"SAMLRequest": deflate_and_base64_encode(authn_request)}
string = urlencode(args)
glue_char = "&" if urlparse(idp_http_redirect_sso_location).query else "?"
return glue_char.join([idp_http_redirect_sso_location, string])
# ECCS Check made by Selenium
def check_idp_response_selenium(sp,idp,test):
"""Performs the ECCS check on an IdP
:param sp: the SP used to test the IDP
:param idp: the IdP to test
:param test: a flag to perform a check without changes
"""
# Common variables
fqdn_idp = get_label(idp['Location'])
saml_request_url = generate_login_url(sp['entityID'], sp['http_post_acs_location'], idp['Location'])
robots = ""
federations_disabled_dict = e_p.FEDS_DISABLED_DICT
idps_disabled_dict = e_p.IDPS_DISABLED_DICT
webdriver_error = 0 # No WebDriver Error
# Handle Disabled Idps/Federations
if (idp['registrationAuthority'] in federations_disabled_dict.keys()):
check_time = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S') + 'Z'
pgsrc = federations_disabled_dict[idp['registrationAuthority']]
store_page_source(idp,sp,test,pgsrc)
return (idp['entityID'],sp['entityID'],check_time,"DISABLED",webdriver_error)
if (idp['entityID'] in idps_disabled_dict.keys()):
check_time = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S') + 'Z'
pgsrc = idps_disabled_dict[idp['entityID']]
store_page_source(idp,sp,test,pgsrc)
return (idp['entityID'],sp['entityID'],check_time,"DISABLED",webdriver_error)
# Robots + SSL Check
try:
hdrs = {
'User-Agent': f'{e_p.ROBOTS_USER_AGENT}'
}
check_time = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S') + 'Z'
robots = requests.get(f"https://{fqdn_idp}/robots.txt", headers=hdrs, verify=e_p.CA_BUNDLE_PATH, timeout=e_p.ECCS_REQUESTSTIMEOUT)
if (robots == ""):
robots = requests.get(f"http://{fqdn_idp}/robots.txt", headers=hdrs, verify=False, timeout=e_p.ECCS_REQUESTSTIMEOUT)
# Catch SSL Exceptions and block the ECCS check
except requests.exceptions.SSLError as e:
if ('unable to get local issuer certificate' not in str(e)):
if (test):
header = f"\nAn SSL Error occurred while opening https://{fqdn_idp}/robots.txt:\n\n{e}\n\nCheck it on SSL Labs: https://www.ssllabs.com/ssltest/analyze.html?d={fqdn_idp}"
else:
header = f"<h1>SSL ERROR</h1><h2>An SSL error occurred for the server {fqdn_idp}:</h2><p>{e}</p><p>Check it on SSL Labs: <a href='https://www.ssllabs.com/ssltest/analyze.html?d={fqdn_idp}'>Click Here</a></p>"
pgsrc = ""
store_page_source(idp,sp,test,pgsrc,header)
return (idp['entityID'],sp['entityID'],check_time,"SSL-Error",webdriver_error)
else:
pass
# Do not consider any other Exception
except:
pass
if (robots):
check_time = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S') + 'Z'
p = re.compile('^User-Agent:\sECCS\sDisallow:\s\/\s*$', re.MULTILINE)
m = p.search(robots.text)
if (m):
header = "<h1>IdP excluded from check by robots.txt</h1>"
pgsrc = ""
store_page_source(page_source,idp,sp,test,pgsrc,header)
return (idp['entityID'],sp['entityID'],check_time,"DISABLED",webdriver_error)
try:
# WebDriver MUST be instanced here to avoid problems with SESSION
driver = get_driver_selenium(idp,sp,e_p.ECCS_SELENIUMDEBUG)
# Exception of WebDriver raises
if (driver == None):
sys.stderr.write(f"get_driver_selenium() returned None for IDP {idp['entityID']}(SHA1: {sha1(idp['entityID'])}) with SP {get_label(sp['entityID'])}")
return None
driver.set_page_load_timeout(e_p.ECCS_SELENIUMPAGELOADTIMEOUT)
driver.set_script_timeout(e_p.ECCS_SELENIUMSCRIPTTIMEOUT)
check_time = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S') + 'Z'
driver.get(saml_request_url)
pgsrc = driver.page_source
# Support HTTP Basic Authentication
unauthorized = re.search('401.(\D.|\s.)?Unauthorized', pgsrc, re.IGNORECASE)
if (unauthorized):
if (test):
header = f"\n[SP] {sp['entityID']} - 401 UNATHORIZED FOUND\n\n[PAGE_SOURCE]\n"
else:
header = f"<h1>SP {sp['entityID']} - 401 UNAUTHORIZED FOUND</h1><h2>[PAGE_SOURCE]</h2>"
stored = store_page_source(idp,sp,test,pgsrc,header)
if (stored):
return (idp['entityID'],sp['entityID'],check_time,"OK",webdriver_error)
metadata_not_found = re.search(e_p.METADATAPATTERN, pgsrc, re.IGNORECASE)
if (metadata_not_found):
if (test):
header = f"\n[SP] {sp['entityID']} - METADATA NOT FOUND\n[PAGE_SOURCE]\n"
else:
header = f"<h1>SP {sp['entityID']} - METADATA NOT FOUND</h1><h2>[PAGE_SOURCE]</h2>"
stored = store_page_source(idp,sp,test,pgsrc,header)
if (stored):
return (idp['entityID'],sp['entityID'],check_time,"No-SP-Metadata-Error",webdriver_error)
idp_error = re.search(e_p.IDPERROR, pgsrc, re.IGNORECASE)
if (idp_error):
if (test):
header = f"\n[SP] {sp['entityID']} - IDP ERROR\n[PAGE_SOURCE]\n"
else:
header = f"<h1>SP {sp['entityID']} - IDP Error</h1><h2>[PAGE_SOURCE]</h2>"
stored = store_page_source(idp,sp,test,pgsrc,header)
if (stored):
return (idp['entityID'],sp['entityID'],check_time,"IdP-Generic-Error",webdriver_error)
load_js = re.search(e_p.JAVASCRIPT, pgsrc, re.IGNORECASE)
if (load_js):
driver.refresh()
# If meet <iframe> follow all iframes
if ('<iframe' in pgsrc):
pwd_found = re.search(e_p.PASSWORDPATTERN, pgsrc, re.IGNORECASE)
if (not pwd_found):
follow_all_nested_iframes(driver)
WebDriverWait(driver, e_p.ECCS_SELENIUMPAGELOADTIMEOUT).until(
EC.presence_of_element_located((By.XPATH,e_p.XPATH_CHECK_PATTERN))
)
if (test):
pgsrc = f"\n[SP] {sp['entityID']} - [IDP] {idp['entityID']} - OK"
else:
pgsrc = driver.page_source
stored = store_page_source(idp,sp,test,pgsrc)
if (stored):
return (idp['entityID'],sp['entityID'],check_time,"OK",webdriver_error)
except TimeoutException as e:
pgsrc = driver.page_source
metadata_not_found = re.search(e_p.METADATAPATTERN, pgsrc, re.IGNORECASE)
try:
input_xpath_found = driver.find_element(By.XPATH, e_p.XPATH_CHECK_PATTERN)
except NoSuchElementException as e:
exception_msg = ""
# This IF is for those IdP that doesn't consuming the eduGAIN metadata and reaching Timeout
if (metadata_not_found):
if (test):
header = f"\n[SP] {sp['entityID']} - METADATA NOT FOUND\n[PAGE_SOURCE]\n"
else:
header = f"<h1>SP {sp['entityID']} - METADATA NOT FOUND</h1><h2>[PAGE_SOURCE]</h2>"
stored = store_page_source(idp,sp,test,pgsrc,header)
if (stored):
return (idp['entityID'],sp['entityID'],check_time,"No-SP-Metadata-Error",webdriver_error)
else:
try:
response = requests.get(f"{driver.current_url}", timeout=e_p.ECCS_REQUESTSTIMEOUT)
if (response.status_code == 401):
if (test):
header = f"\nHTTP Basic Authentication found\n[URL]{driver.current_url} - 401 STATUS CODE FOUND"
else:
header = f"<h1>401 HTTP Basic Authentication found</h1><h2>[PAGE_SOURCE]</h2>"
pgsrc = ""
stored = store_page_source(idp,sp,test,pgsrc,header)
if (stored):
return (idp['entityID'],sp['entityID'],check_time,"OK",webdriver_error)
if (response.status_code == 403):
if (test):
header = f"\nForbidden found\n[URL]{driver.current_url} - 403 STATUS CODE FOUND"
else:
header = f"<h1>403 Forbidden found</h1><h2>[PAGE_SOURCE]</h2>"
pgsrc = ""
stored = store_page_source(idp,sp,test,pgsrc,header)
if (stored):
return (idp['entityID'],sp['entityID'],check_time,"403-Forbidden",webdriver_error)
except requests.exceptions.SSLError as e:
if ('unable to get local issuer certificate' not in str(e)):
if (test):
header = f"\nAn SSL Error occurred while opening https://{fqdn_idp}/robots.txt:\n\n{e}\n\nCheck it on SSL Labs: https://www.ssllabs.com/ssltest/analyze.html?d={fqdn_idp}\n[PAGE_SOURCE]\n"
else:
header = f"<h1>SSL ERROR</h1><h2>An SSL error occurred for the server {fqdn_idp}:</h2><p>{e}</p><p>Check it on SSL Labs: <a href='https://www.ssllabs.com/ssltest/analyze.html?d={fqdn_idp}'>Click Here</a></p><h3>[PAGE_SOURCE]</h3>"
pgsrc = ""
store_page_source(idp,sp,test,pgsrc,header)
return (idp['entityID'],sp['entityID'],check_time,"SSL-Error",webdriver_error)
else:
pass
except Exception as e:
exception_msg = e
pass # ignore all requests exceptions
# IdPs that do not show a Metadata error after reaching the Timeout and that raise an Exception on the "request"
if (pgsrc != "<html><head></head><body></body></html>" or pgsrc != ""):
if (test):
header = f"\nUnable-To-Check: ECCS can't check the IdP login for {sp['entityID']}.\nError Message: {exception_msg}\n[PAGE_SOURCE]\n{pgsrc}"
else:
header = f"<h1>Unable To Check<h1><h2>ECCS can't check the IdP login for {sp['entityID']}</h2><h3>Error Message:</h3>{exception_msg}<br/><h3>[PAGE SOURCE]</h3>"
stored = store_page_source(idp,sp,test,pgsrc,header)
if (stored):
return (idp['entityID'],sp['entityID'],check_time,"Unable-To-Check",webdriver_error)
else:
if (test):
header = f"\nTimeout: No valid login form loaded in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds.\n[PAGE_SOURCE]\n{pgsrc}"
else:
header = f"<h1>Timeout</h1><h2>No valid login form found in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds.</h2><h3>[PAGE_SOURCE]</h3>"
stored = store_page_source(idp,sp,test,pgsrc,header)
if (stored):
return (idp['entityID'],sp['entityID'],check_time,"Timeout",webdriver_error)
# Exceptions that are not "NoSuchElementExceptions"
except e:
if (test):
header = f"\nTimeout: No valid login form loaded in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds.\n[PAGE_SOURCE]\n{pgsrc}"
else:
header = f"<h1>Timeout - No valid login form found in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds.</h1><h2>[PAGE_SOURCE]</h2>"
pgsrc = ""
stored = store_page_source(idp,sp,test,pgsrc,header)
if (stored):
return (idp['entityID'],sp['entityID'],check_time,"Timeout",webdriver_error)
# input_xpath has been found
# This IF is for those IdPs that Timeout is caused by an image or other that do not prevent the Login process.
if (test):
header = f"\nTimeout but IdP Login found\n[PAGE_SOURCE]\n"
else:
header = f"<h1>Timeout due to a media - But IdP Login is provided.</h1><h2>[PAGE_SOURCE]</h2>"
stored = store_page_source(idp,sp,test,pgsrc,header)
if (stored):
return (idp['entityID'],sp['entityID'],check_time,"OK",webdriver_error)
except WebDriverException as e:
error = e.__dict__['msg'].split('(')[0].rstrip()
if (test):
header = f"\nA Connection error occurred while opening {generate_login_url(sp['entityID'], sp['http_post_acs_location'], idp['Location'])}:\n\n{error}"
else:
header = f"<h1>CONNECTION ERROR</h1><h2>A Connection error occurred while opening <a href='{generate_login_url(sp['entityID'], sp['http_post_acs_location'], idp['Location'])}'>SAML Request URL</a>:</h2><p>{error}</p>"
webdriver_error = 1
pgsrc = ""
stored = store_page_source(idp,sp,test,pgsrc,header)
if (stored):
return (idp['entityID'],sp['entityID'],check_time,"Connection-Error",webdriver_error)
finally:
driver.quit()
def delete_line_with_word(filepath,word):
"""Deletes a line from a file by providing a word
:param filepath: file where the line has to be removed
:param word: the word that identify the line to remove
"""
import os.path
if os.path.isfile(filepath):
with open(filepath, "r") as f:
lines = f.readlines()
with open(filepath, "w") as f:
for line in lines:
if word not in line:
f.write(line)