-
Marco Malavolti authoredMarco Malavolti authored
utils.py 20.90 KiB
#!/usr/bin/env python3
import base64
import datetime
import json
import logging
import pathlib
import re
import requests
import six
import sys
import shutil
import time
import uuid
import zlib
import eccs_properties as e_p
from selenium import webdriver
from selenium.common.exceptions import WebDriverException,TimeoutException,NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from logging.handlers import RotatingFileHandler
from urllib3.util import parse_url
from urllib.parse import urlparse, urlencode
def sha1(idp_entity_id):
    """Compute the SHA1 hex digest of a string.

    :param idp_entity_id: string to hash (it is encoded with str.encode() first)
    :return: hexadecimal SHA1 digest of the encoded input
    :rtype: string
    """
    # Imported locally under an alias so it does not clash with this function's name.
    from hashlib import sha1 as _hashlib_sha1

    return _hashlib_sha1(idp_entity_id.encode()).hexdigest()
def get_label(url_or_urn):
    """Build a short label, usable in filenames, from a URL or a URN.

    :param url_or_urn: the URL or URN to convert
    :return: element 2 of the urllib3 parse result (the host) when the value
             starts with 'http', otherwise element 4 (the path) with any
             leading '/' removed
    :rtype: string
    """
    looks_like_url = url_or_urn.startswith('http')
    parsed = parse_url(url_or_urn)
    return parsed[2] if looks_like_url else parsed[4].lstrip('/')
def get_reg_auth_dict(list_feds):
    """Map every federation name to its registration authority.

    :param list_feds: mapping whose values are dictionaries holding at least
                      the 'name' and 'reg_auth' keys (the mapping keys are ignored)
    :return: a dictionary of '{ nameFed:reg_auth }'
    :rtype: dict
    """
    # When two federations share a name, the one iterated last wins.
    return {fed['name']: fed['reg_auth'] for fed in list_feds.values()}
def get_idp_list(list_eccs_idps,reg_auth=None,idp_entityid=None):
    """Select the IdPs to check.

    The filters are applied with this precedence:
      - 'idp_entityid' given: only the IdPs having that entityID
      - else 'reg_auth' given: only the IdPs registered by that authority
      - else: every IdP received

    :param list_eccs_idps: Python list containing all IdPs provided to ECCS
    :param reg_auth: a string containing a registration authority URL
    :param idp_entityid: a string containing an IdP entityID
    :return: a new list with the selected IdPs
    :rtype: list
    """
    if idp_entityid:
        return [idp for idp in list_eccs_idps if idp['entityID'] == idp_entityid]
    if reg_auth:
        return [idp for idp in list_eccs_idps if idp['registrationAuthority'] == reg_auth]
    # No filter: hand back a copy so the caller's list is never aliased.
    return list(list_eccs_idps)
def get_list_from_url(url, dest_file):
    """Return the JSON content of a URL as a Python object, caching it in a file.

    If 'dest_file' does not exist yet, the URL is downloaded and its body is
    saved there; the saved file is then parsed as JSON. If the file already
    exists it is used as is and no download is performed.

    :param url: URL to download
    :param dest_file: destination file used as cache
    :return: the resulting Python list
    :rtype: list
    :raises requests.exceptions.RequestException: on connection problems,
            timeouts or a non-successful HTTP status while downloading
    :raises json.JSONDecodeError: if the cached file does not contain valid JSON
    """
    path = pathlib.Path(dest_file)
    if not path.exists():
        # Download and validate BEFORE creating the file: otherwise a failed
        # request would leave an empty (or error-page) cache file behind and
        # every later call would keep failing on it without retrying.
        response = requests.get(url, timeout=e_p.ECCS_REQUESTSTIMEOUT)
        response.raise_for_status()
        with open(path, mode="w+", encoding='utf-8') as f:
            f.write(response.text)
    with open(path, mode="r", encoding='utf-8') as f:
        # NOTE(review): as written this replaces an apostrophe with itself
        # (a no-op); possibly an HTML entity was intended - confirm.
        return json.loads(f.read().replace("'","'"))
def get_logger(path, filename, mode="a", log_level="DEBUG"):
    """Build the file logger used by the ECCS API Development Server.

    Every call attaches a new RotatingFileHandler (writing to
    '<path>/<filename>') to the logger named after this module.

    :param path: directory path of the files
    :param filename: filename of the new file created
    :param mode: a(append),w(write),r(read)
    :param log_level: log level to use (DEBUG,INFO,WARN,ERROR,CRITICAL);
                      any other value leaves the levels untouched
    :return: a logger
    :rtype: logger object
    """
    log = logging.getLogger(__name__)
    file_handler = RotatingFileHandler(
        f"{path}/{filename}",
        mode=mode,
        maxBytes=0,
        backupCount=5,
        encoding='utf-8',
    )

    # The accepted names are exactly the attribute names of the logging module.
    if log_level in ("DEBUG", "INFO", "WARN", "ERROR", "CRITICAL"):
        level = getattr(logging, log_level)
        log.setLevel(level)
        file_handler.setLevel(level)

    file_handler.setFormatter(
        logging.Formatter(
            '%(asctime)s - %(levelname)s - %(module)s - %(message)s',
            datefmt='%d/%m/%Y %H:%M:%S',
        )
    )
    log.addHandler(file_handler)
    return log
def get_idp_contacts(idp,contactType):
    """Collect one e-mail address for each contact of the requested type.

    For every contact the first entry of ['emailOrPhone']['EmailAddress'] is
    taken; when that information is absent or empty the placeholder
    'missing email' is used instead.

    :param idp: dictionary containing the IdP info (must hold a 'contacts' key)
    :param contactType: type of contact to consider
    :return: a python list containing all contacts email addresses
             (empty when the IdP has no contact of that type)
    :rtype: list
    """
    contacts = idp['contacts']
    if contactType not in contacts:
        return []

    emails = []
    for contact in contacts[contactType]:
        email_or_phone = contact.get('emailOrPhone')
        addresses = email_or_phone.get('EmailAddress') if email_or_phone else None
        emails.append(addresses[0] if addresses else 'missing email')
    return emails
def store_page_source(page_source,idp,sp,test):
    """Writes the login page source into a specific file

    The file is '<ECCS_HTMLDIR>/<DAY>/<sha1 of the IdP entityID>---<SP label>.html'.

    :param page_source: content to write into a file
    :param idp: idp owner of the page source
    :param sp: sp who has been used
    :param test: flag needed to decide if write on the console instead of on the file
    :return: True when the content has been written, False when the file
             could not be opened or written
    :rtype: boolean
    """
    if test:
        sys.stdout.write(f"{page_source}")
        return True

    # Put the page_source into an appropriate HTML file
    html_path = f"{e_p.ECCS_HTMLDIR}/{e_p.DAY}/{sha1(idp['entityID'])}---{get_label(sp['entityID'])}.html"
    try:
        # open() is inside the try so that a missing directory or a permission
        # problem yields False, as documented, instead of an exception.
        # The encoding is explicit so the result does not depend on the locale.
        with open(html_path, "w", encoding="utf-8") as html:
            html.write(page_source)
        return True
    except IOError:
        return False
# Get the Google Chrome Selenium Driver
def get_driver_selenium(idp=None,sp=None,debugSelenium=False):
    """Create the headless Chrome driver used to perform the ECCS check.

    If the first attempt to start the driver fails, for whatever reason, a
    second and last attempt is made after 3 seconds; an error raised by the
    second attempt is propagated to the caller.

    :param idp: the idp need to be checked (used only to name the debug log)
    :param sp: the sp used to check the idp (used only to name the debug log)
    :param debugSelenium: a flag needed to enable a more verbose logging
    :return: selenium driver
    :rtype: object
    """
    # Configure Web-driver
    # https://peter.sh/experiments/chromium-command-line-switches/
    chrome_options = Options()
    chrome_options.page_load_strategy = 'normal'
    for switch in (
        '--headless',
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu',
        '--disable-extensions',
        '--disable-dinosaur-easter-egg',
        '--disable-sync',
    ):
        chrome_options.add_argument(switch)

    # For DEBUG only (By default ChromeDriver logs only warnings/errors to stderr.
    # When debugging issues, it is helpful to enable more verbose logging.)
    if debugSelenium:
        label_idp = get_label(idp['entityID'])
        label_sp = get_label(sp['entityID'])
        sha1_idp = sha1(idp['entityID'])

    def _start_chrome():
        # Builds fresh arguments on every attempt, like two separate calls would.
        if debugSelenium:
            return webdriver.Chrome(
                e_p.PATHCHROMEDRIVER,
                options=chrome_options,
                service_args=['--verbose', f'--log-path={e_p.ECCS_SELENIUMLOGDIR}/{sha1_idp}_{label_idp}_{label_sp}.log'],
            )
        return webdriver.Chrome(e_p.PATHCHROMEDRIVER, options=chrome_options)

    try:
        return _start_chrome()
    except:  # Wait 3 seconds before try again to get the webdriver for all kind of exception will occur
        time.sleep(3)
        return _start_chrome()
def follow_all_nested_iframes(driver):
    """Descend into nested <iframe> elements until none is left.

    While the current document contains an <iframe>, the driver is switched
    into frame 0; once no <iframe> can be found the page source reached is
    returned.

    :param driver: the selenium driver
    :return: the IdP login page source reached
    :rtype: string
    """
    try:
        while True:
            if not driver.find_element(By.XPATH,'//iframe'):
                break
            driver.switch_to.frame(0)
    except NoSuchElementException:
        # No more <iframe> in the current document: this is the innermost page.
        return driver.page_source
def deflate_and_base64_encode(string_val):
    """Deflates and then base64 encodes a string

    :param string_val: The string to deflate and encode; text is encoded as
                       UTF-8 first, bytes are used unchanged
    :return: The deflated and encoded string
    :rtype: bytes
    """
    # This module is Python 3 only, so the builtin 'bytes' replaces the
    # Python 2/3 compatibility alias previously taken from 'six'.
    if not isinstance(string_val, bytes):
        string_val = string_val.encode('utf-8')
    # zlib.compress() wraps the DEFLATE stream in a 2-byte header and a 4-byte
    # checksum trailer; the slice keeps only the raw DEFLATE data.
    return base64.b64encode(zlib.compress(string_val)[2:-4])
def generate_login_url(sp_entity_id, sp_http_post_acs_location, idp_http_redirect_sso_location):
    """Returns a login url needed to get the IdP Login page for an SP

    A minimal SAML AuthnRequest is built, deflated, base64 encoded and appended
    to the IdP SSO location as the 'SAMLRequest' query parameter.

    :param sp_entity_id: the SP entityID
    :param sp_http_post_acs_location: the SP HTTP-POST AssertionConsumerService URL
    :param idp_http_redirect_sso_location: the IDP HTTP-Redirect SSO Location
    :return: a login url built upon a SAML Authn Request
    :rtype: string
    """
    # Local import, in line with the other function-scope imports of this module.
    from xml.sax.saxutils import escape, quoteattr

    authn_request_id = f'_{uuid.uuid4().hex}'
    issue_instant = str(datetime.datetime.now(datetime.timezone.utc).isoformat(timespec='seconds')).replace('+00:00', 'Z')

    # The values below come from metadata and may contain XML special characters
    # (typically '&' inside a query string): they must be escaped or the request
    # is not well-formed XML. quoteattr() also adds the surrounding quotes, and
    # values without special characters are emitted exactly as before.
    authn_request = '<samlp:AuthnRequest xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol" ' \
                    f'AssertionConsumerServiceURL={quoteattr(sp_http_post_acs_location)} ' \
                    f'Destination={quoteattr(idp_http_redirect_sso_location)} ' \
                    f'ID="{authn_request_id}" ' \
                    f'IssueInstant="{issue_instant}" ' \
                    'ProtocolBinding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" ' \
                    'Version="2.0">' \
                    f'<saml:Issuer xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion">{escape(sp_entity_id)}</saml:Issuer>' \
                    '<samlp:NameIDPolicy AllowCreate="1"/>' \
                    '</samlp:AuthnRequest>'

    query = urlencode({"SAMLRequest": deflate_and_base64_encode(authn_request)})
    # Extend an already present query string instead of starting a new one.
    glue_char = "&" if urlparse(idp_http_redirect_sso_location).query else "?"
    return glue_char.join([idp_http_redirect_sso_location, query])
# ECCS Check made by Selenium
def _eccs_check_time():
    """Return the current UTC time formatted as 'YYYY-MM-DDTHH:MM:SSZ'."""
    return datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S') + 'Z'


def check_idp_response_selenium(sp,idp,test):
    """Performs the ECCS check on an IdP

    Steps, in order:
      1. IdPs/federations listed as disabled in the ECCS properties are skipped.
      2. robots.txt of the IdP host is fetched: an SSL error stops the check and
         a rule excluding ECCS skips the IdP.
      3. The SAML login URL is opened with Selenium and the page reached is
         classified.

    The page source (or a message describing the outcome) is stored through
    store_page_source() before each result is returned.

    :param sp: the SP used to test the IDP
    :param idp: the IdP to test
    :param test: a flag to perform a check without changes
    :return: the tuple (IdP entityID, SP entityID, check time, status,
             webdriver_error) where status is one of "OK", "DISABLED",
             "SSL-Error", "No-eduGAIN-Metadata", "IdP-Error", "Unable-To-Check",
             "Timeout", "Connection-Error" and webdriver_error is 1 only for
             "Connection-Error"; None when no driver is available or when the
             outcome could not be stored
    :rtype: tuple
    """
    # Common variables
    fqdn_idp = get_label(idp['Location'])
    saml_request_url = generate_login_url(sp['entityID'], sp['http_post_acs_location'], idp['Location'])
    robots = ""
    federations_disabled_dict = e_p.FEDS_DISABLED_DICT
    idps_disabled_dict = e_p.IDPS_DISABLED_DICT
    webdriver_error = 0 # No WebDriver Error
    check_time = _eccs_check_time()

    def outcome(status):
        # Reads check_time/webdriver_error at call time, i.e. their latest values.
        return (idp['entityID'],sp['entityID'],check_time,status,webdriver_error)

    # Handle Disabled Idps/Federations
    if idp['registrationAuthority'] in federations_disabled_dict:
        check_time = _eccs_check_time()
        page_source = federations_disabled_dict[idp['registrationAuthority']]
        store_page_source(page_source,idp,sp,test)
        return outcome("DISABLED")

    if idp['entityID'] in idps_disabled_dict:
        check_time = _eccs_check_time()
        page_source = idps_disabled_dict[idp['entityID']]
        store_page_source(page_source,idp,sp,test)
        return outcome("DISABLED")

    # Robots + SSL Check
    try:
        hdrs = {
            'User-Agent': f'{e_p.ROBOTS_USER_AGENT}'
        }
        check_time = _eccs_check_time()
        robots = requests.get(f"https://{fqdn_idp}/robots.txt", headers=hdrs, verify=e_p.CA_BUNDLE_PATH, timeout=e_p.ECCS_REQUESTSTIMEOUT)
        # NOTE(review): requests.get() returns a Response object, which never
        # equals "", so this plain HTTP fallback is never executed. It is kept
        # unchanged because the intended trigger condition is not clear.
        if (robots == ""):
            robots = requests.get(f"http://{fqdn_idp}/robots.txt", headers=hdrs, verify=False, timeout=e_p.ECCS_REQUESTSTIMEOUT)
    # Catch SSL Exceptions and block the ECCS check
    except requests.exceptions.SSLError as e:
        # A missing local issuer certificate is tolerated: the check goes on.
        if 'unable to get local issuer certificate' not in str(e):
            if (test): page_source = f"\nAn SSL Error occurred while opening https://{fqdn_idp}/robots.txt:\n\n{e}\n\nCheck it on SSL Labs: https://www.ssllabs.com/ssltest/analyze.html?d={fqdn_idp}"
            else: page_source = f"<h1>SSL ERROR</h1><h2>An SSL error occurred for the server {fqdn_idp}:</h2><p>{e}</p><p>Check it on SSL Labs: <a href='https://www.ssllabs.com/ssltest/analyze.html?d={fqdn_idp}'>Click Here</a></p>"
            store_page_source(page_source,idp,sp,test)
            return outcome("SSL-Error")
    # Do not consider any other Exception (robots.txt is a best-effort check)
    except Exception:
        pass

    if robots:
        check_time = _eccs_check_time()
        if re.search(r'^User-Agent:\sECCS\sDisallow:\s\/\s*$', robots.text, re.MULTILINE):
            page_source = "<h1>IdP excluded from check by robots.txt</h1>"
            store_page_source(page_source,idp,sp,test)
            return outcome("DISABLED")

    # Bound before the try so that the 'finally' clause can always inspect it.
    driver = None
    try:
        # WebDriver MUST be instanced here to avoid problems with SESSION
        driver = get_driver_selenium(idp,sp,e_p.ECCS_SELENIUMDEBUG)

        # Exception of WebDriver raises
        if driver is None:
            sys.stderr.write(f"get_driver_selenium() returned None for IDP {idp['entityID']}(SHA1: {sha1(idp['entityID'])}) with SP {get_label(sp['entityID'])}")
            return None

        driver.set_page_load_timeout(e_p.ECCS_SELENIUMPAGELOADTIMEOUT)
        driver.set_script_timeout(e_p.ECCS_SELENIUMSCRIPTTIMEOUT)

        check_time = _eccs_check_time()

        driver.get(saml_request_url)
        pgsrc = driver.page_source

        # Support HTTP Basic Authentication
        unauthorized = re.search(r'401.(\D.|\s.)?Unauthorized', pgsrc, re.IGNORECASE)
        if unauthorized:
            if (test): pgsrc = f"\n[PAGE_SOURCE]\n{pgsrc}\n[SP] {sp['entityID']} - 401 UNAUTHORIZED FOUND"
            stored = store_page_source(pgsrc,idp,sp,test)
            if stored:
                return outcome("OK")

        metadata_not_found = re.search(e_p.METADATAPATTERN, pgsrc, re.IGNORECASE)
        if metadata_not_found:
            if (test): pgsrc = f"\n[PAGE_SOURCE]\n{pgsrc}\n[SP] {sp['entityID']} - METADATA NOT FOUND"
            stored = store_page_source(pgsrc,idp,sp,test)
            if stored:
                return outcome("No-eduGAIN-Metadata")

        idp_error = re.search(e_p.IDPERROR, pgsrc, re.IGNORECASE)
        if idp_error:
            if (test): pgsrc = f"\n[PAGE_SOURCE]\n{pgsrc}\n[SP] {sp['entityID']} - IDP ERROR"
            stored = store_page_source(pgsrc,idp,sp,test)
            if stored:
                return outcome("IdP-Error")

        load_js = re.search(e_p.JAVASCRIPT, pgsrc, re.IGNORECASE)
        if load_js:
            driver.refresh()

        # If meet <iframe> follow all iframes
        if '<iframe' in pgsrc:
            pwd_found = re.search(e_p.PASSWORDPATTERN,pgsrc, re.IGNORECASE)
            if not pwd_found:
                follow_all_nested_iframes(driver)

        WebDriverWait(driver, e_p.ECCS_SELENIUMPAGELOADTIMEOUT).until(
            EC.presence_of_element_located((By.XPATH,e_p.XPATH_CHECK_PATTERN))
        )

        # NOTE(review): this message is never used, the current page source is
        # what gets stored/printed below - confirm which one is wanted in test mode.
        if (test): pgsrc = f"\n[SP] {sp['entityID']} - [IDP] {idp['entityID']} - OK"
        stored = store_page_source(driver.page_source,idp,sp,test)
        if stored:
            return outcome("OK")

    except TimeoutException:
        pgsrc = driver.page_source
        metadata_not_found = re.search(e_p.METADATAPATTERN, pgsrc, re.IGNORECASE)
        try:
            driver.find_element(By.XPATH, e_p.XPATH_CHECK_PATTERN)
        except NoSuchElementException:
            # This IF is for those IdP that doesn't consuming the eduGAIN metadata and reaching Timeout
            if metadata_not_found:
                if (test): pgsrc = f"\n[PAGE_SOURCE]\n{pgsrc}\n[SP] {sp['entityID']} - METADATA NOT FOUND"
                stored = store_page_source(pgsrc,idp,sp,test)
                if stored:
                    return outcome("No-eduGAIN-Metadata")
            else:
                try:
                    response = requests.get(f"{driver.current_url}", timeout=e_p.ECCS_REQUESTSTIMEOUT)
                    if (response.status_code == 401):
                        if (test): pgsrc = f"\n[PAGE_SOURCE]\nHTTP Basic Authentication\n[URL]{driver.current_url} - 401 STATUS CODE FOUND"
                        stored = store_page_source(pgsrc,idp,sp,test)
                        if stored:
                            return outcome("OK")
                    if (response.status_code == 403):
                        if (test): pgsrc = f"\n[PAGE_SOURCE]\nForbidden\n[URL]{driver.current_url} - 403 STATUS CODE FOUND"
                        stored = store_page_source(pgsrc,idp,sp,test)
                        if stored:
                            return outcome("IdP-Error")
                except Exception:
                    pass # ignore all requests exceptions

                # IdPs that do not show a Metadata error after reaching the Timeout and that raise an Exception on the "request"
                # 'and' is required here: with 'or' the condition was always true
                # and the "Timeout" branch below could never be reached.
                if (pgsrc != "<html><head></head><body></body></html>" and pgsrc != ""):
                    if (test): pgsrc = f"\n[PAGE_SOURCE]\n{pgsrc}\nUnable-To-Check: ECCS can't check the IdP login."
                    else: pgsrc = f"<h1>Unable To Check - ECCS can't check the IdP login</h1><h2>IDP LOGIN PAGE SOURCE:</h2><br/>{pgsrc}"
                    stored = store_page_source(pgsrc,idp,sp,test)
                    if stored:
                        return outcome("Unable-To-Check")
                else:
                    if (test): pgsrc = f"\n[PAGE_SOURCE]\n{pgsrc}\nTimeout: No valid login form loaded in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds."
                    else: pgsrc = f"<h1>Timeout - No valid login form found in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds.</h1>"
                    stored = store_page_source(pgsrc,idp,sp,test)
                    if stored:
                        return outcome("Timeout")
        # Exceptions that are not "NoSuchElementExceptions"
        # (an exception class is required here: naming an exception instance
        # makes Python raise a TypeError instead of handling the error)
        except Exception:
            if (test): pgsrc = f"\n[PAGE_SOURCE]\n{pgsrc}\nTimeout: No valid login form loaded in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds."
            stored = store_page_source(f"<h1>Timeout - No valid login form found in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds.</h1><br/><p>{pgsrc}</p>",idp,sp,test)
            if stored:
                return outcome("Timeout")
        else:
            # input_xpath has been found
            # This IF is for those IdPs that Timeout is caused by an image or other that do not prevent the Login process.
            # Being in the 'else' clause, it runs only when the element was
            # really found, never after one of the handlers above.
            if (test): pgsrc = f"\n[PAGE_SOURCE]\n{pgsrc} - Timeout but OK"
            stored = store_page_source(pgsrc,idp,sp,test)
            if stored:
                return outcome("OK")

    except WebDriverException as e:
        # Keep only the text preceding the first '(' of the driver message.
        error = (e.msg or "").split('(')[0].rstrip()
        # The URL reported is the one that was actually opened.
        if (test): pgsrc = f"\nA Connection error occurred while opening {saml_request_url}:\n\n{error}"
        else: pgsrc = f"<h1>CONNECTION ERROR</h1><h2>A Connection error occurred while opening <a href='{saml_request_url}'>SAML Request URL</a>:</h2><p>{error}</p>"
        webdriver_error = 1
        stored = store_page_source(pgsrc,idp,sp,test)
        if stored:
            return outcome("Connection-Error")

    finally:
        # The driver may be missing when its creation failed or returned None.
        if driver is not None:
            driver.quit()
def delete_line_with_word(filepath,word):
    """Remove from a file every line that contains the given word.

    Nothing happens when 'filepath' is not an existing regular file.

    :param filepath: file where the line has to be removed
    :param word: the word that identify the line to remove
    """
    import os.path

    if not os.path.isfile(filepath):
        return

    with open(filepath, "r") as source:
        all_lines = source.readlines()

    # The file is rewritten in place keeping only the lines without the word.
    with open(filepath, "w") as target:
        target.writelines(line for line in all_lines if word not in line)