-
Marco Malavolti authoredMarco Malavolti authored
utils.py 15.26 KiB
#!/usr/bin/env python3
import datetime
import json
import logging
import pathlib
import re
import requests
import sys
import shutil
import time
import eccs_properties as e_p
from selenium import webdriver
from selenium.common.exceptions import WebDriverException,TimeoutException,NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from logging.handlers import RotatingFileHandler
from urllib3.util import parse_url
def sha1(idp_entity_id):
import hashlib
result = hashlib.sha1(idp_entity_id.encode())
return result.hexdigest()
# Return a label useful for a filename
def get_label(url_or_urn):
    """Return a short label for *url_or_urn*, suitable for use in a filename.

    Values starting with ``http`` yield component 2 of the ``parse_url``
    result; any other value yields component 4 with leading slashes removed.
    NOTE(review): indexes 2 and 4 are presumably the host and the path of
    urllib3's ``Url`` tuple - confirm against the urllib3 version in use.
    """
    is_http = url_or_urn.startswith('http')
    parts = parse_url(url_or_urn)
    return parts[2] if is_http else parts[4].lstrip('/')
# Returns a Dict of "{ nameFed:reg_auth }"
def get_reg_auth_dict(list_feds):
    """Build a ``{federation name: registration authority}`` dictionary.

    *list_feds* is a mapping whose values each carry a ``'name'`` and a
    ``'reg_auth'`` entry; the mapping's own keys are not used.
    """
    return {fed['name']: fed['reg_auth'] for fed in list_feds.values()}
# Returns the list of IdPs, optionally filtered by entityID or by registration authority (federation)
def get_idp_list(list_eccs_idps,reg_auth=None,idp_entityid=None):
    """Return the IdPs of *list_eccs_idps* selected by the given filter.

    Precedence of the filters:
      1. *idp_entityid* (if truthy): only IdPs whose ``'entityID'`` matches.
      2. *reg_auth* (if truthy): only IdPs whose ``'registrationAuthority'``
         matches.
      3. Neither given: every IdP, in the original order.
    """
    if idp_entityid:
        return [idp for idp in list_eccs_idps if idp['entityID'] == idp_entityid]
    if reg_auth:
        return [idp for idp in list_eccs_idps if idp['registrationAuthority'] == reg_auth]
    return list(list_eccs_idps)
# Download all eduGAIN Federations from URL, store them on a local file and returns a Python Dictionary
def get_list_feds(url, dest_file):
    """Return the eduGAIN federations decoded from JSON, cached in *dest_file*.

    If *dest_file* does not exist yet, the content of *url* is downloaded and
    written to it; the function then always reads the local file and returns
    the result of ``json.loads`` on its content.

    Raises whatever ``requests.get`` / ``raise_for_status`` raise when the
    download fails; in that case no cache file is created.
    """
    path = pathlib.Path(dest_file)

    # If the file does not exist... download it into dest_file
    if not path.exists():
        # Download BEFORE opening the destination: opening first would leave
        # an empty cache file behind on a failed request, and every later
        # call would then try to parse that empty file instead of retrying.
        response = requests.get(url)
        # Do not cache an HTTP error page as if it were the JSON list.
        response.raise_for_status()
        with open(path, mode="w+", encoding='utf-8') as f:
            f.write(response.text)

    # then open it and work with the local file
    with open(path, mode="r", encoding='utf-8') as f:
        # NOTE(review): as written this replace() swaps an apostrophe for an
        # identical apostrophe, i.e. it is a no-op. Presumably one side was
        # an HTML entity lost in transcription - confirm before removing it.
        return json.loads(f.read().replace("'","'"))
# Download all eduGAIN IdPs from URL, store them on a local file and returns a Python List
def get_list_eccs_idps(url, dest_file):
    """Return the eduGAIN IdPs decoded from JSON, cached in *dest_file*.

    If *dest_file* does not exist yet, the content of *url* is downloaded and
    written to it; the function then always reads the local file and returns
    the result of ``json.loads`` on its content.

    Raises whatever ``requests.get`` / ``raise_for_status`` raise when the
    download fails; in that case no cache file is created.
    """
    path = pathlib.Path(dest_file)

    # If the file does not exist... download it into dest_file
    if not path.exists():
        # Download BEFORE opening the destination: opening first would leave
        # an empty cache file behind on a failed request, and every later
        # call would then try to parse that empty file instead of retrying.
        response = requests.get(url)
        # Do not cache an HTTP error page as if it were the JSON list.
        response.raise_for_status()
        with open(path, mode="w+", encoding='utf-8') as f:
            f.write(response.text)

    # then open it and work with the local file
    with open(path, mode="r", encoding='utf-8') as f:
        # NOTE(review): as written this replace() swaps an apostrophe for an
        # identical apostrophe, i.e. it is a no-op. Presumably one side was
        # an HTML entity lost in transcription - confirm before removing it.
        return json.loads(f.read().replace("'","'"))
# Use logger to produce files consumed by ECCS API
def get_logger(path, filename, mode="a", log_level="DEBUG"):
    """Return this module's logger, writing to the file ``{path}/{filename}``.

    Parameters:
      path, filename: directory and name of the log file.
      mode: file open mode passed to the handler when it is first created.
      log_level: one of "DEBUG", "INFO", "WARN", "ERROR", "CRITICAL". Any
        other value leaves the logger and handler levels untouched.

    The logger is always ``logging.getLogger(__name__)``, so repeated calls
    return the same object. A handler for a given file is attached only once:
    asking again for the same file reuses the existing handler (and does not
    reopen the file with *mode*) instead of stacking a duplicate that would
    write every record twice.
    """
    import os.path

    levels = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARN": logging.WARN,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }

    logger = logging.getLogger(__name__)
    log_file = f"{path}/{filename}"
    # FileHandler stores the absolute path in baseFilename; compare like with like.
    target = os.path.abspath(log_file)

    ch = next(
        (
            handler for handler in logger.handlers
            if isinstance(handler, logging.handlers.RotatingFileHandler)
            and handler.baseFilename == target
        ),
        None,
    )
    is_new_handler = ch is None
    if is_new_handler:
        # maxBytes=0 disables rollover, so *mode* is honoured by the handler.
        ch = logging.handlers.RotatingFileHandler(log_file, mode, 0, 5, 'utf-8')
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(module)s - %(message)s', datefmt='%d/%m/%Y %H:%M:%S')
        ch.setFormatter(formatter)

    level = levels.get(log_level)
    if level is not None:
        logger.setLevel(level)
        ch.setLevel(level)

    if is_new_handler:
        logger.addHandler(ch)

    return logger
# Return a list of email addresses for a specific type of contact
def get_idp_contacts(idp,contactType):
    """Return one entry per *contactType* contact of *idp*.

    For every contact listed under ``idp['contacts'][contactType]`` the
    result holds the first item of its ``['emailOrPhone']['EmailAddress']``
    list, or the placeholder ``'missing email'`` when that list is absent or
    empty. An IdP without contacts of the requested type yields ``[]``.
    """
    contacts = idp['contacts']
    if contactType not in contacts:
        return []

    emails = []
    for contact in contacts[contactType]:
        addresses = (contact.get('emailOrPhone') or {}).get('EmailAddress')
        emails.append(addresses[0] if addresses else 'missing email')
    return emails
# Write the login page source code into its file
def store_page_source(page_source,idp,sp,test):
    """Emit *page_source* for the check of *idp* against *sp*.

    When *test* is truthy the text is written to stdout and nothing is
    stored. Otherwise it is written to
    ``{ECCS_HTMLDIR}/{DAY}/{sha1(entityID)}---{label(sp)}.html``.

    Returns True on success and False when writing the file raises IOError.
    Errors raised while *opening* the file are not caught and propagate.
    """
    if test:
        sys.stdout.write(f"{page_source}")
        return True

    # Put the page_source into an appropriate HTML file
    html_path = f"{e_p.ECCS_HTMLDIR}/{e_p.DAY}/{sha1(idp['entityID'])}---{get_label(sp)}.html"
    # Explicit UTF-8, like the other open() calls in this file: the locale
    # default may be unable to encode non-ASCII login pages.
    with open(html_path, "w", encoding="utf-8") as html:
        try:
            html.write(page_source)
            return True
        except IOError:
            return False
# Get the Google Chrome Selenium Driver
def get_driver_selenium(idp=None,sp=None,debugSelenium=False):
    """Create and return a headless Chrome WebDriver.

    Parameters:
      idp, sp: only read when *debugSelenium* is truthy, to build the name of
        the ChromeDriver log file under ``ECCS_SELENIUMLOGDIR``.
      debugSelenium: when truthy, start ChromeDriver with ``--verbose`` and a
        per-IdP/SP ``--log-path``.

    If the first attempt to start the driver fails, the function waits 3
    seconds and tries exactly once more; a failure of the second attempt
    propagates to the caller.
    """
    # Configure Web-driver
    # https://peter.sh/experiments/chromium-command-line-switches/
    chrome_options = Options()
    chrome_options.page_load_strategy = 'normal'
    for switch in (
        '--start-in-incognito',
        '--headless',
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu',
        '--disable-extensions',
        '--disable-dinosaur-easter-egg',
        '--disable-sync',
    ):
        chrome_options.add_argument(switch)

    driver_kwargs = {'options': chrome_options}

    # For DEBUG only (By default ChromeDriver logs only warnings/errors to stderr.
    # When debugging issues, it is helpful to enable more verbose logging.)
    if debugSelenium:
        label_idp = get_label(idp['entityID'])
        label_sp = get_label(sp)
        sha1_idp = sha1(idp['entityID'])
        driver_kwargs['service_args'] = ['--verbose', f'--log-path={e_p.ECCS_SELENIUMLOGDIR}/{sha1_idp}_{label_idp}_{label_sp}.log']

    try:
        return webdriver.Chrome(e_p.PATHCHROMEDRIVER, **driver_kwargs)
    except Exception:
        # Retry once after 3 seconds for any kind of error. `Exception`
        # rather than a bare `except` so that Ctrl-C / SystemExit still
        # interrupt the program instead of triggering the retry.
        time.sleep(3)
        return webdriver.Chrome(e_p.PATHCHROMEDRIVER, **driver_kwargs)
def follow_all_nested_iframes(driver):
    """Descend into nested iframes until the current frame contains none.

    As long as an ``<iframe>`` element is found in the current context, the
    driver is switched to the frame with index 0. When the lookup raises
    ``NoSuchElementException`` the page source of the frame reached so far
    is returned.
    """
    iframe_locator = (By.XPATH, '//iframe')
    try:
        iframe = driver.find_element(*iframe_locator)
        while iframe:
            driver.switch_to.frame(0)
            iframe = driver.find_element(*iframe_locator)
    except NoSuchElementException:
        return driver.page_source
# ECCS Check made by Selenium
def _utc_check_time():
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    # Timezone-aware replacement for the deprecated datetime.utcnow();
    # the formatted string is identical.
    return datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S') + 'Z'


def check_idp_response_selenium(sp,idp,test):
    """Run the ECCS login-page check of *idp* against the service provider *sp*.

    Parameters:
      sp: WAYFless URL prefix of the SP; the IdP entityID is appended to it.
      idp: dict read for 'entityID', 'Location' and 'registrationAuthority'.
      test: passed to store_page_source(); when truthy, page sources go to
        stdout (with extra diagnostic text) instead of being stored on file.

    Steps, in order:
      1. IdPs/federations listed in the disabled dictionaries -> "DISABLED".
      2. robots.txt is fetched over HTTPS: an SSL error -> "SSL-Error";
         a rule disallowing the ECCS user agent -> "DISABLED"; any other
         failure of this request is ignored.
      3. The WAYFless URL is opened with Selenium and the page is classified
         as "No-eduGAIN-Metadata", "IdP-Error", "OK", "Invalid-Form",
         "Timeout" or "Connection-Error".

    Returns the tuple
    ``(entityID, wayfless_url, check_time, status, webdriver_error)`` where
    *webdriver_error* is 1 only for "Connection-Error". Returns None when no
    driver could be obtained or when the page source could not be stored.
    The WebDriver, if one was created, is always quit before returning.
    """
    # Disable SSL requests warning messages
    #requests.packages.urllib3.disable_warnings()

    # Common variables
    fqdn_idp = get_label(idp['Location'])
    wayfless_url = f"{sp}{idp['entityID']}"
    robots = ""
    federations_disabled_dict = e_p.FEDS_DISABLED_DICT
    idps_disabled_dict = e_p.IDPS_DISABLED_DICT
    webdriver_error = 0  # No WebDriver Error

    # Handle Disabled Idps/Federations
    if idp['registrationAuthority'] in federations_disabled_dict:
        check_time = _utc_check_time()
        page_source = federations_disabled_dict[idp['registrationAuthority']]
        store_page_source(page_source,idp,sp,test)
        return (idp['entityID'],wayfless_url,check_time,"DISABLED",webdriver_error)

    if idp['entityID'] in idps_disabled_dict:
        check_time = _utc_check_time()
        page_source = idps_disabled_dict[idp['entityID']]
        store_page_source(page_source,idp,sp,test)
        return (idp['entityID'],wayfless_url,check_time,"DISABLED",webdriver_error)

    # Robots + SSL Check
    # Set before the try so that every handler below can rely on check_time.
    check_time = _utc_check_time()
    try:
        hdrs = {
            'User-Agent': f'{e_p.ROBOTS_USER_AGENT}'
        }
        robots = requests.get(f"https://{fqdn_idp}/robots.txt", headers=hdrs, verify=True, timeout=e_p.ECCS_REQUESTSTIMEOUT)
        # NOTE(review): `robots` is a Response object here, so this comparison
        # with "" is never true and the plain-HTTP fallback never runs. Kept
        # as-is to avoid introducing new unverified HTTP requests - confirm
        # the intended behaviour.
        if (robots == ""):
            robots = requests.get(f"http://{fqdn_idp}/robots.txt", headers=hdrs, verify=False, timeout=e_p.ECCS_REQUESTSTIMEOUT)
    # Catch SSL Exceptions and block the ECCS check
    except requests.exceptions.SSLError as e:
        if test:
            page_source = f"\nAn SSL Error occurred while opening https://{fqdn_idp}/robots.txt:\n\n{e}\n\nCheck it on SSL Labs: https://www.ssllabs.com/ssltest/analyze.html?d={fqdn_idp}"
        else:
            page_source = f"<h1>SSL ERROR</h1><h2>An SSL error occurred for the server {fqdn_idp}:</h2><p>{e}</p><p>Check it on SSL Labs: <a href='https://www.ssllabs.com/ssltest/analyze.html?d={fqdn_idp}'>Click Here</a></p>"
        store_page_source(page_source,idp,sp,test)
        return (idp['entityID'],wayfless_url,check_time,"SSL-Error",webdriver_error)
    # Do not consider any other Exception: the robots.txt lookup is best
    # effort. `Exception` (not a bare except) so Ctrl-C is not swallowed.
    except Exception:
        pass

    # A requests Response is truthy only for a non-error status code.
    if (robots):
        check_time = _utc_check_time()
        # Raw string: same pattern as before, without invalid-escape warnings.
        p = re.compile(r'^User-Agent:\sECCS\sDisallow:\s\/\s*$', re.MULTILINE)
        m = p.search(robots.text)
        if (m):
            page_source = "<h1>IdP excluded from check by robots.txt</h1>"
            store_page_source(page_source,idp,sp,test)
            return (idp['entityID'],wayfless_url,check_time,"DISABLED",webdriver_error)

    # Bound before the try so that `finally` can tell whether a driver exists.
    driver = None
    try:
        # WebDriver MUST be instanced here to avoid problems with SESSION
        driver = get_driver_selenium(idp,sp,e_p.ECCS_SELENIUMDEBUG)

        # Exception of WebDriver raises
        if driver is None:
            sys.stderr.write(f"get_driver_selenium() returned None for IDP {idp['entityID']}(SHA1: {sha1(idp['entityID'])}) with SP {get_label(sp)}")
            return None

        driver.set_page_load_timeout(e_p.ECCS_SELENIUMPAGELOADTIMEOUT)
        driver.set_script_timeout(e_p.ECCS_SELENIUMSCRIPTTIMEOUT)

        check_time = _utc_check_time()

        driver.get(wayfless_url)

        metadata_not_found = re.search(e_p.METADATAPATTERN,driver.page_source, re.I)
        idp_error = re.search(e_p.IDPERROR,driver.page_source, re.I)

        if (metadata_not_found):
            if test:
                pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\n[WAYFLESS URL]{wayfless_url} - METADATA NOT FOUND"
            else:
                pgsrc = driver.page_source
            stored = store_page_source(pgsrc,idp,sp,test)
            if (stored):
                return (idp['entityID'],wayfless_url,check_time,"No-eduGAIN-Metadata",webdriver_error)

        if (idp_error):
            if test:
                pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\n[WAYFLESS URL]{wayfless_url} - IDP ERROR"
            else:
                pgsrc = driver.page_source
            stored = store_page_source(pgsrc,idp,sp,test)
            if (stored):
                return (idp['entityID'],wayfless_url,check_time,"IdP-Error",webdriver_error)

        # If meet <iframe> follow all iframes
        if ('<iframe' in driver.page_source):
            follow_all_nested_iframes(driver)
            # NOTE(review): the reviewed copy of this file had lost its
            # indentation; refresh() is assumed to belong to this branch -
            # confirm against the upstream source.
            driver.refresh()

        WebDriverWait(driver, e_p.ECCS_SELENIUMPAGELOADTIMEOUT).until(
            EC.presence_of_element_located((By.XPATH,'//input[@type="password"]'))
        )

        if test:
            pgsrc = f"\n[WAYFLESS_URL]\n{wayfless_url} - OK"
        else:
            pgsrc = driver.page_source
        stored = store_page_source(pgsrc,idp,sp,test)
        if (stored):
            return (idp['entityID'],wayfless_url,check_time,"OK",webdriver_error)

    except TimeoutException:
        metadata_not_found = re.search(e_p.METADATAPATTERN,driver.page_source, re.I)
        try:
            driver.find_element(By.XPATH,'//input[@type="password"]')
        except NoSuchElementException:
            # This IF is for those IdP that doesn't consuming the eduGAIN metadata and reaching Timeout
            if (metadata_not_found):
                if test:
                    pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\n[WAYFLESS URL]{wayfless_url} - METADATA NOT FOUND"
                else:
                    pgsrc = driver.page_source
                stored = store_page_source(pgsrc,idp,sp,test)
                if (stored):
                    return (idp['entityID'],wayfless_url,check_time,"No-eduGAIN-Metadata",webdriver_error)
            elif (driver.page_source != "<html><head></head><body></body></html>"):
                if test:
                    pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\nInvalid-Form: No valid login form found in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds"
                else:
                    pgsrc = f"<h1>Invalid Form: no valid login form found in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds</h1><h2>PAGE SOURCE:</h2><br/>{driver.page_source}"
                stored = store_page_source(pgsrc,idp,sp,test)
                if (stored):
                    return (idp['entityID'],wayfless_url,check_time,"Invalid-Form",webdriver_error)
            else:
                if test:
                    pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\nTimeout: No valid login form loaded in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds."
                else:
                    pgsrc = f"<h1>Timeout - No valid login form found in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds.</h1><h2>PAGE SOURCE:</h2><br/>{driver.page_source}"
                stored = store_page_source(pgsrc,idp,sp,test)
                if (stored):
                    return (idp['entityID'],wayfless_url,check_time,"Timeout",webdriver_error)
        except Exception:
            # Was `except e:`, which names an exception *instance* and makes
            # Python raise TypeError instead of running this handler.
            if test:
                pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\nTimeout: No valid login form loaded in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds."
            else:
                pgsrc = driver.page_source
            stored = store_page_source(f"<h1>Timeout - No valid login form found in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds.</h1><br/><p>{pgsrc}</p>",idp,sp,test)
            if (stored):
                return (idp['entityID'],wayfless_url,check_time,"Timeout",webdriver_error)
        else:
            # <input type="password"> found
            # This IF is for those IdPs that Timeout is caused by an image or other that do not prevent the Login process.
            # Runs only when the lookup succeeded, so a failed store in one of
            # the handlers above can no longer be reported as "OK".
            if test:
                pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source} - Timeout but OK"
            else:
                pgsrc = driver.page_source
            stored = store_page_source(pgsrc,idp,sp,test)
            if (stored):
                return (idp['entityID'],wayfless_url,check_time,"OK",webdriver_error)

    except WebDriverException as e:
        # Keep only the text before the first "(" of the driver message;
        # tolerate an exception raised without a message.
        error = (e.msg or "").split('(')[0].rstrip()
        if test:
            pgsrc = f"\nA Connection error occurred while opening {wayfless_url}:\n\n{error}"
        else:
            pgsrc = f"<h1>CONNECTION ERROR</h1><h2>A Connection error occurred while opening <a href='{wayfless_url}'>{wayfless_url}</a>:</h2><p>{error}</p>"
        webdriver_error = 1
        stored = store_page_source(pgsrc,idp,sp,test)
        if (stored):
            return (idp['entityID'],wayfless_url,check_time,"Connection-Error",webdriver_error)

    finally:
        # The driver may never have been created (start-up failure or None):
        # quitting unconditionally would raise here and mask the real result.
        if driver is not None:
            driver.quit()
def delete_line_with_word(filepath,word):
    """Rewrite *filepath* in place without the lines that contain *word*.

    The whole file is read first, then reopened for writing and only the
    lines in which *word* does not occur are written back. Nothing happens
    when *filepath* is not an existing regular file.
    """
    import os.path

    if not os.path.isfile(filepath):
        return

    with open(filepath, "r") as source:
        all_lines = source.readlines()

    with open(filepath, "w") as target:
        target.writelines(line for line in all_lines if word not in line)