#!/usr/bin/env python3
"""Utility functions for the ECCS2 checks.

Covers loading the eduGAIN federation/IdP lists, building loggers, storing
the collected page sources and running the Selenium-based IdP login check.
"""

import datetime
import hashlib
import json
import logging
import os
import pathlib
import re
import sys
import shutil
import time
from logging.handlers import RotatingFileHandler

import requests
from selenium import webdriver
from selenium.common.exceptions import WebDriverException,TimeoutException,NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from urllib3.util import parse_url

import eccs2properties as e2p


# Maps the textual log level accepted by get_logger() to the logging constant.
_LOG_LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARN": logging.WARN,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}

# Matches a robots.txt section that disallows everything for the "ECCS" agent.
# Raw string: the pattern is identical to the original, without invalid
# string escapes.
_ROBOTS_DISALLOW_RE = re.compile(r'^User-Agent:\sECCS\sDisallow:\s\/\s*$', re.MULTILINE)


def _utc_timestamp():
    """Return the current UTC time formatted as 'YYYY-MM-DDTHH:MM:SSZ'."""
    now = datetime.datetime.now(datetime.timezone.utc)
    return now.strftime('%Y-%m-%dT%H:%M:%S') + 'Z'


def sha1(idp_entity_id):
    """Return the hexadecimal SHA-1 digest of the encoded entityID string."""
    return hashlib.sha1(idp_entity_id.encode()).hexdigest()


# Return a label useful for a filename
def get_label(url_or_urn):
    """Return the host of an 'http...' URL, or the path (without leading
    slashes) for anything else, as parsed by urllib3's parse_url()."""
    parsed = parse_url(url_or_urn)
    if url_or_urn.startswith('http'):
        return parsed.host
    return parsed.path.lstrip('/')


# Returns a Dict of "{ nameFed:reg_auth }"
def get_reg_auth_dict(list_feds):
    """Map each federation 'name' to its 'reg_auth'.

    Only the values of ``list_feds`` are read; its keys are ignored.
    """
    return {fed['name']: fed['reg_auth'] for fed in list_feds.values()}


# Returns a list of IdP for a single federation
def get_idp_list(list_eccs_idps,reg_auth=None,idp_entityid=None):
    """Filter the IdP list.

    ``idp_entityid`` takes precedence over ``reg_auth``; when neither is
    given (or both are falsy) a copy of the whole list is returned.
    """
    if idp_entityid:
        return [idp for idp in list_eccs_idps if idp['entityID'] == idp_entityid]
    if reg_auth:
        return [idp for idp in list_eccs_idps if idp['registrationAuthority'] == reg_auth]
    return list(list_eccs_idps)


def _load_json_with_local_cache(url, dest_file):
    """Download ``url`` into ``dest_file`` if that file does not exist yet,
    then parse and return the JSON content of the local file."""
    path = pathlib.Path(dest_file)
    if not path.exists():
        # Fetch before opening the file: a failed request must not leave an
        # empty cache file behind that would break every following run.
        content = requests.get(url).text
        with open(path, mode="w", encoding='utf-8') as f:
            f.write(content)

    # then open it and work with local file
    with open(path, mode="r", encoding='utf-8') as f:
        # NOTE(review): as written this replace() swaps a quote with itself
        # (a no-op); possibly an HTML entity was lost here - confirm.
        return json.loads(f.read().replace("'","'"))


# Download all eduGAIN Federations from URL, store them on a local file and returns a Python Dictionary
def get_list_feds(url, dest_file):
    """Return the parsed JSON of the federations file, downloading it from
    ``url`` into ``dest_file`` first when the file does not exist."""
    return _load_json_with_local_cache(url, dest_file)


# Download all eduGAIN IdPs from URL, store them on a local file and returns a Python List
def get_list_eccs_idps(url, dest_file):
    """Return the parsed JSON of the IdPs file, downloading it from ``url``
    into ``dest_file`` first when the file does not exist."""
    return _load_json_with_local_cache(url, dest_file)


# Use logger to produce files consumed by ECCS-2 API
def get_logger(path, filename, mode="a", log_level="DEBUG"):
    """Return this module's logger with a RotatingFileHandler attached.

    The handler writes to ``{path}/{filename}`` with maxBytes=0,
    backupCount=5 and UTF-8 encoding. ``log_level`` is one of "DEBUG",
    "INFO", "WARN", "ERROR", "CRITICAL"; any other value leaves the logger
    and handler levels untouched.

    NOTE: every call adds one more handler to the same
    ``logging.getLogger(__name__)`` instance.
    """
    logger = logging.getLogger(__name__)
    ch = RotatingFileHandler(f"{path}/{filename}", mode, 0, 5, 'utf-8')

    level = _LOG_LEVELS.get(log_level)
    if level is not None:
        logger.setLevel(level)
        ch.setLevel(level)

    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(module)s - %(message)s', datefmt='%d/%m/%Y %H:%M:%S')
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger


# Return a list of email address for a specific type of contact
def get_idp_contacts(idp,contactType):
    """Return the first e-mail address of every contact of ``contactType``.

    Contacts without an e-mail address contribute the placeholder string
    'missing email'. An unknown contact type yields an empty list.
    """
    ctcList = []
    contacts = idp['contacts']
    if contactType in contacts:
        for ctc in contacts[contactType]:
            email_or_phone = ctc.get('emailOrPhone')
            if email_or_phone and email_or_phone.get('EmailAddress'):
                ctcList.append(email_or_phone['EmailAddress'][0])
            else:
                ctcList.append('missing email')
    return ctcList


# Write the login page source code into its file
def store_page_source(page_source,idp,sp,test):
    """Store ``page_source`` for the given IdP/SP pair.

    In test mode the page source is written to stdout. Otherwise it is
    written to ``{ECCS2HTMLDIR}/{DAY}/{sha1(entityID)}---{label(sp)}.html``.

    Returns True on success and False when writing to the file raises
    IOError. Errors raised while opening the file are not caught.
    """
    if test:
        sys.stdout.write(f"{page_source}")
        return True

    # Put the page_source into an appropriate HTML file
    html_path = f"{e2p.ECCS2HTMLDIR}/{e2p.DAY}/{sha1(idp['entityID'])}---{get_label(sp)}.html"
    with open(html_path,"w") as html:
        try:
            html.write(page_source)
            return True
        except IOError:
            return False


# Get the Google Chrome Selenium Driver
def get_driver_selenium(idp=None,sp=None,debugSelenium=False):
    """Create a headless Chrome WebDriver.

    When ``debugSelenium`` is true, ChromeDriver verbose logging is enabled
    and written to a per-IdP/SP file under ``e2p.ECCS2SELENIUMLOGDIR``
    (``idp`` and ``sp`` are required in that case). If the first attempt to
    create the driver fails, a single retry is made after 3 seconds; a
    failure of the retry propagates to the caller.
    """
    # Configure Web-driver
    # https://peter.sh/experiments/chromium-command-line-switches/
    chrome_options = Options()
    chrome_options.page_load_strategy = 'normal'

    for switch in (
        '--start-in-incognito',
        '--headless',
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu',
        '--disable-extensions',
        '--disable-dinosaur-easter-egg',
        '--disable-sync',
    ):
        chrome_options.add_argument(switch)

    driver_kwargs = {'options': chrome_options}

    # For DEBUG only (By default ChromeDriver logs only warnings/errors to stderr.
    # When debugging issues, it is helpful to enable more verbose logging.)
    if debugSelenium:
        label_idp = get_label(idp['entityID'])
        label_sp = get_label(sp)
        sha1_idp = sha1(idp['entityID'])
        driver_kwargs['service_args'] = ['--verbose', f'--log-path={e2p.ECCS2SELENIUMLOGDIR}/{sha1_idp}_{label_idp}_{label_sp}.log']

    try:
        driver = webdriver.Chrome(e2p.PATHCHROMEDRIVER, **driver_kwargs)
    except Exception:
        # Whatever went wrong, wait 3 seconds and try once more.
        # (Exception, not a bare except, so Ctrl-C/SystemExit are not retried.)
        time.sleep(3)
        driver = webdriver.Chrome(e2p.PATHCHROMEDRIVER, **driver_kwargs)

    return driver


def follow_all_nested_iframes(driver):
    """Switch into the first frame repeatedly while an <iframe> is found.

    find_element() raises NoSuchElementException once no <iframe> is left;
    at that point the page source of the current frame is returned.
    """
    try:
        while driver.find_element(By.XPATH,'//iframe'):
            driver.switch_to.frame(0)
    except NoSuchElementException:
        return driver.page_source


# ECCS2 Check made by Selenium
def check_idp_response_selenium(sp,idp,test):
    """Run the ECCS2 check of one IdP against one SP.

    Parameters:
        sp:   SP URL prefix; the IdP entityID is appended to it to build the
              wayfless URL.
        idp:  dict with at least 'entityID', 'registrationAuthority' and
              'Location'.
        test: when true, results are printed to stdout instead of being
              stored in HTML files (see store_page_source()).

    Returns a tuple ``(entityID, wayfless_url, check_time, status,
    webdriver_error)`` where status is one of "DISABLED", "SSL-Error",
    "No-eduGAIN-Metadata", "OK", "Invalid-Form", "Timeout" or
    "Connection-Error", and webdriver_error is 1 only for "Connection-Error".
    Returns None when the driver could not be obtained or when the page
    source of the result could not be stored.
    """
    # Disable SSL requests warning messages
    #requests.packages.urllib3.disable_warnings()

    # Common variables
    fqdn_idp = get_label(idp['Location'])
    wayfless_url = f"{sp}{idp['entityID']}"
    robots = ""
    federations_disabled_dict = e2p.FEDS_DISABLED_DICT
    idps_disabled_dict = e2p.IDPS_DISABLED_DICT
    webdriver_error = 0 # No WebDriver Error
    page_load_timeout = e2p.ECCS2SELENIUMPAGELOADTIMEOUT

    # Handle Disabled Idps/Federations
    if idp['registrationAuthority'] in federations_disabled_dict:
        check_time = _utc_timestamp()
        page_source = federations_disabled_dict[idp['registrationAuthority']]
        store_page_source(page_source,idp,sp,test)
        return (idp['entityID'],wayfless_url,check_time,"DISABLED",webdriver_error)

    if idp['entityID'] in idps_disabled_dict:
        check_time = _utc_timestamp()
        page_source = idps_disabled_dict[idp['entityID']]
        store_page_source(page_source,idp,sp,test)
        return (idp['entityID'],wayfless_url,check_time,"DISABLED",webdriver_error)

    # Robots + SSL Check
    try:
        hdrs = { 'User-Agent': f'{e2p.ROBOTS_USER_AGENT}' }
        check_time = _utc_timestamp()
        robots = requests.get(f"https://{fqdn_idp}/robots.txt", headers=hdrs, verify=True, timeout=e2p.ECCS2REQUESTSTIMEOUT)
        # NOTE(review): requests.get() returns a Response object, so this
        # comparison is never true and the plain-HTTP fallback never runs.
        # Left unchanged because the intended trigger is not clear from here.
        if (robots == ""):
            robots = requests.get(f"http://{fqdn_idp}/robots.txt", headers=hdrs, verify=False, timeout=e2p.ECCS2REQUESTSTIMEOUT)

    # Catch SSL Exceptions and block the ECCS check
    except requests.exceptions.SSLError as e:
        if test:
            page_source = f"\nAn SSL Error occurred while opening https://{fqdn_idp}/robots.txt:\n\n{e}\n\nCheck it on SSL Labs: https://www.ssllabs.com/ssltest/analyze.html?d={fqdn_idp}"
        else:
            page_source = f"<h1>SSL ERROR</h1><h2>An SSL error occurred for the server {fqdn_idp}:</h2><p>{e}</p><p>Check it on SSL Labs: <a href='https://www.ssllabs.com/ssltest/analyze.html?d={fqdn_idp}'>Click Here</a></p>"
        store_page_source(page_source,idp,sp,test)
        return (idp['entityID'],wayfless_url,check_time,"SSL-Error",webdriver_error)

    # Do not consider any other Exception: the robots.txt lookup is
    # deliberately best-effort and must not block the check.
    except Exception:
        pass

    # A Response is truthy only for a successful (non 4xx/5xx) status.
    if robots:
        check_time = _utc_timestamp()
        if _ROBOTS_DISALLOW_RE.search(robots.text):
            page_source = "<h1>IdP excluded from check by robots.txt</h1>"
            store_page_source(page_source,idp,sp,test)
            return (idp['entityID'],wayfless_url,check_time,"DISABLED",webdriver_error)

    # Bound before the try so that the finally clause can always test it,
    # even when get_driver_selenium() raises.
    driver = None
    try:
        # WebDriver MUST be instanced here to avoid problems with SESSION
        driver = get_driver_selenium(idp,sp,e2p.ECCS2SELENIUMDEBUG)

        # Exception of WebDriver raises
        if driver is None:
            sys.stderr.write(f"get_driver_selenium() returned None for IDP {idp['entityID']}(SHA1: {sha1(idp['entityID'])}) with SP {get_label(sp)}")
            return None

        driver.set_page_load_timeout(page_load_timeout)
        driver.set_script_timeout(e2p.ECCS2SELENIUMSCRIPTTIMEOUT)

        check_time = _utc_timestamp()
        driver.get(wayfless_url)

        metadata_not_found = re.search(e2p.METADATAPATTERN,driver.page_source, re.I)
        if metadata_not_found:
            if test:
                pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\n[WAYFLESS URL]{wayfless_url} - METADATA NOT FOUND"
            else:
                pgsrc = driver.page_source
            stored = store_page_source(pgsrc,idp,sp,test)
            if stored:
                return (idp['entityID'],wayfless_url,check_time,"No-eduGAIN-Metadata",webdriver_error)

        # If meet <iframe> follow all iframes
        if '<iframe' in driver.page_source:
            follow_all_nested_iframes(driver)

        WebDriverWait(driver, page_load_timeout).until(
            EC.presence_of_element_located((By.XPATH,'//input[@type="password"]'))
        )

        if test:
            pgsrc = f"\n[WAYFLESS_URL]\n{wayfless_url} - OK"
        else:
            pgsrc = driver.page_source
        stored = store_page_source(pgsrc,idp,sp,test)
        if stored:
            return (idp['entityID'],wayfless_url,check_time,"OK",webdriver_error)

    except TimeoutException:
        metadata_not_found = re.search(e2p.METADATAPATTERN,driver.page_source, re.I)
        try:
            driver.find_element(By.XPATH,'//input[@type="password"]')
        except NoSuchElementException:
            # This IF is for those IdPs that do not consume the eduGAIN metadata and reach the Timeout
            if metadata_not_found:
                if test:
                    pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\n[WAYFLESS URL]{wayfless_url} - METADATA NOT FOUND"
                else:
                    pgsrc = driver.page_source
                stored = store_page_source(pgsrc,idp,sp,test)
                if stored:
                    return (idp['entityID'],wayfless_url,check_time,"No-eduGAIN-Metadata",webdriver_error)
            elif driver.page_source != "<html><head></head><body></body></html>":
                if test:
                    pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\nInvalid-Form: No valid login form found in {e2p.ECCS2SELENIUMPAGELOADTIMEOUT} seconds"
                else:
                    pgsrc = f"<h1>Invalid Form: no valid login form found in {e2p.ECCS2SELENIUMPAGELOADTIMEOUT} seconds</h1><h2>PAGE SOURCE:</h2><br/>{driver.page_source}"
                stored = store_page_source(pgsrc,idp,sp,test)
                if stored:
                    return (idp['entityID'],wayfless_url,check_time,"Invalid-Form",webdriver_error)
            else:
                if test:
                    pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\nTimeout: No valid login form loaded in {e2p.ECCS2SELENIUMPAGELOADTIMEOUT} seconds."
                else:
                    pgsrc = f"<h1>Timeout - No valid login form found in {e2p.ECCS2SELENIUMPAGELOADTIMEOUT} seconds.</h1><h2>PAGE SOURCE:</h2><br/>{driver.page_source}"
                stored = store_page_source(pgsrc,idp,sp,test)
                if stored:
                    return (idp['entityID'],wayfless_url,check_time,"Timeout",webdriver_error)
        except Exception:
            # Any other failure while looking for the password field is
            # reported as a Timeout. (The original "except e:" used an
            # exception instance as the match expression, which raises
            # TypeError instead of catching anything.)
            if test:
                pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\nTimeout: No valid login form loaded in {e2p.ECCS2SELENIUMPAGELOADTIMEOUT} seconds."
            else:
                pgsrc = driver.page_source
            stored = store_page_source(f"<h1>Timeout - No valid login form found in {e2p.ECCS2SELENIUMPAGELOADTIMEOUT} seconds.</h1><br/><p>{pgsrc}</p>",idp,sp,test)
            if stored:
                return (idp['entityID'],wayfless_url,check_time,"Timeout",webdriver_error)
        else:
            # <input type="password"> found
            # This branch is for those IdPs whose Timeout is caused by an image or other
            # resource that does not prevent the Login process.
            # It lives in the "else" clause so that it only runs when the
            # password field really was found.
            if test:
                pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source} - Timeout but OK"
            else:
                pgsrc = driver.page_source
            stored = store_page_source(pgsrc,idp,sp,test)
            if stored:
                return (idp['entityID'],wayfless_url,check_time,"OK",webdriver_error)

    except WebDriverException as e:
        # Keep only the part of the message before the first "(".
        error = (e.msg or "").split('(')[0].rstrip()
        if test:
            pgsrc = f"\nA Connection error occurred while opening {wayfless_url}:\n\n{error}"
        else:
            pgsrc = f"<h1>CONNECTION ERROR</h1><h2>A Connection error occurred while opening <a href='{wayfless_url}'>{wayfless_url}</a>:</h2><p>{error}</p>"
        webdriver_error = 1
        stored = store_page_source(pgsrc,idp,sp,test)
        if stored:
            return (idp['entityID'],wayfless_url,check_time,"Connection-Error",webdriver_error)

    finally:
        # The driver may never have been created (or may be None).
        if driver is not None:
            driver.quit()

    # Reached only when the page source could not be stored.
    return None


def delete_line_with_word(filepath,word):
    """Rewrite ``filepath`` in place, dropping every line containing ``word``.

    Does nothing when ``filepath`` is not an existing regular file.
    """
    if os.path.isfile(filepath):
        with open(filepath, "r") as f:
            lines = f.readlines()
        with open(filepath, "w") as f:
            f.writelines(line for line in lines if word not in line)