#!/usr/bin/env python3
# utils.py

import datetime
import json
import logging
import pathlib
import re
import requests
import sys
import shutil
import time

import eccs_properties as e_p

from selenium import webdriver
from selenium.common.exceptions import WebDriverException,TimeoutException,NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from logging.handlers import RotatingFileHandler
from urllib3.util import parse_url

def sha1(idp_entity_id):
    """Return the hexadecimal SHA-1 digest of *idp_entity_id*.

    The string is encoded with ``str.encode()`` defaults before hashing.
    """
    import hashlib

    encoded_id = idp_entity_id.encode()
    return hashlib.sha1(encoded_id).hexdigest()


def get_label(url_or_urn):
    """Return a label useful for a filename.

    For values starting with ``http`` the label is the host component of the
    parsed URL; for anything else it is the parsed path component with its
    leading ``/`` characters removed.
    """
    parsed = parse_url(url_or_urn)
    if url_or_urn.startswith('http'):
        return parsed.host
    return parsed.path.lstrip('/')


def get_reg_auth_dict(list_feds):
    """Return a dict mapping each federation's ``name`` to its ``reg_auth``.

    *list_feds* is a mapping whose values are dicts carrying the ``name`` and
    ``reg_auth`` keys; the keys of *list_feds* itself are not used.
    """
    return {fed['name']: fed['reg_auth'] for fed in list_feds.values()}


def get_idp_list(list_eccs_idps,reg_auth=None,idp_entityid=None):
    """Return the IdPs selected from *list_eccs_idps*.

    A truthy *idp_entityid* takes precedence and keeps only the IdPs whose
    ``entityID`` equals it.  Otherwise a truthy *reg_auth* keeps only the IdPs
    whose ``registrationAuthority`` equals it.  With neither filter, every IdP
    is returned.  The result is always a new list.
    """
    if idp_entityid:
        return [entry for entry in list_eccs_idps if entry['entityID'] == idp_entityid]

    if reg_auth:
        return [entry for entry in list_eccs_idps if entry['registrationAuthority'] == reg_auth]

    return list(list_eccs_idps)


def _load_cached_json(url, dest_file):
    """Return the JSON document stored in *dest_file*, downloading it first if needed.

    When *dest_file* does not exist, *url* is fetched and its body is written
    to *dest_file* (UTF-8).  The local file is then parsed and returned.

    Raises whatever ``requests`` raises for a failed request or an HTTP error
    status, and ``json.JSONDecodeError`` if the local file is not valid JSON.
    """
    path = pathlib.Path(dest_file)

    if not path.exists():
        # Fetch BEFORE opening the destination: opening it first would leave an
        # empty cache file behind when the request fails, and every later call
        # would then try to parse that empty file instead of downloading again.
        response = requests.get(url)
        # Do not cache an HTTP error page as if it were the expected JSON.
        response.raise_for_status()
        with open(path, mode="w", encoding='utf-8') as f:
            f.write(response.text)

    # Always work with the local copy.
    with open(path, mode="r", encoding='utf-8') as f:
        return json.load(f)


def get_list_feds(url, dest_file):
    """Return the eduGAIN federations as a Python dictionary.

    The data is downloaded from *url* into *dest_file* only when that file
    does not exist yet; otherwise the existing local file is used.
    """
    return _load_cached_json(url, dest_file)


def get_list_eccs_idps(url, dest_file):
    """Return the eduGAIN IdPs as a Python list.

    The data is downloaded from *url* into *dest_file* only when that file
    does not exist yet; otherwise the existing local file is used.
    """
    return _load_cached_json(url, dest_file)


def get_logger(path, filename, mode="a", log_level="DEBUG"):
    """Return the module logger writing to ``{path}/{filename}``.

    The logger is the one named after this module, so every call returns the
    same logger object.  A ``RotatingFileHandler`` (UTF-8, ``maxBytes=0``,
    ``backupCount=5``) is attached for the requested file; if a handler for
    that same file is already attached it is reused, so calling this function
    repeatedly does not make each record appear several times in the file.

    Parameters:
        path: directory of the log file.
        filename: name of the log file inside *path*.
        mode: file open mode used when the handler is first created.
        log_level: one of "DEBUG", "INFO", "WARN", "ERROR", "CRITICAL";
            applied to both the logger and the handler.  Any other value
            leaves the current levels untouched.
    """
    import os.path

    levels = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARN": logging.WARN,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }

    logger = logging.getLogger(__name__)

    # FileHandler stores the absolute path in ``baseFilename``; compare like with like.
    log_file = os.path.abspath(f"{path}/{filename}")

    ch = None
    for handler in logger.handlers:
        if isinstance(handler, logging.handlers.RotatingFileHandler) and handler.baseFilename == log_file:
            ch = handler
            break

    if ch is None:
        ch = logging.handlers.RotatingFileHandler(log_file, mode, 0, 5, 'utf-8')
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(module)s - %(message)s', datefmt='%d/%m/%Y %H:%M:%S')
        ch.setFormatter(formatter)
        logger.addHandler(ch)

    level = levels.get(log_level)
    if level is not None:
        logger.setLevel(level)
        ch.setLevel(level)

    return logger


def get_idp_contacts(idp,contactType):
    """Return the e-mail addresses of the *contactType* contacts of *idp*.

    For every contact listed under ``idp['contacts'][contactType]`` the first
    entry of ``emailOrPhone.EmailAddress`` is collected; a contact without a
    usable address contributes the placeholder ``'missing email'``.  An empty
    list is returned when *idp* has no contacts of that type.
    """
    contacts = idp['contacts']
    if contactType not in contacts:
        return []

    emails = []
    for contact in contacts[contactType]:
        email_or_phone = contact.get('emailOrPhone')
        addresses = email_or_phone.get('EmailAddress') if email_or_phone else None
        emails.append(addresses[0] if addresses else 'missing email')
    return emails


def store_page_source(page_source,idp,sp,test):
    """Write the login page source code into its file.

    In test mode (*test* truthy) the page source is written to stdout and
    nothing is stored on disk.  Otherwise it is written, UTF-8 encoded, to
    ``{ECCS_HTMLDIR}/{DAY}/{sha1(entityID)}---{label(sp)}.html``.

    Returns True on success and False when writing the file raises IOError.
    Errors raised while opening the file are not caught here.
    """
    if (test):
        sys.stdout.write(f"{page_source}")
        return True

    html_path = f"{e_p.ECCS_HTMLDIR}/{e_p.DAY}/{sha1(idp['entityID'])}---{get_label(sp)}.html"

    # Page sources routinely contain non-ASCII text: write them as UTF-8
    # explicitly instead of depending on the platform default encoding.
    with open(html_path, "w", encoding="utf-8") as html:
        try:
            html.write(page_source)
            return True
        except IOError:
            return False


def get_driver_selenium(idp=None,sp=None,debugSelenium=False):
    """Get the Google Chrome Selenium driver.

    The driver is started headless with the 'normal' page load strategy.  When
    *debugSelenium* is truthy, ChromeDriver is started with verbose logging to
    ``{ECCS_SELENIUMLOGDIR}/{sha1(entityID)}_{label(entityID)}_{label(sp)}.log``;
    in that case *idp* and *sp* are required.

    If creating the driver fails, one more attempt is made after 3 seconds;
    an error raised by the second attempt propagates to the caller.
    """
    # Configure Web-driver
    # https://peter.sh/experiments/chromium-command-line-switches/
    chrome_options = Options()
    chrome_options.page_load_strategy = 'normal'

    for switch in (
        '--start-in-incognito',
        '--headless',
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu',
        '--disable-extensions',
        '--disable-dinosaur-easter-egg',
        '--disable-sync',
    ):
        chrome_options.add_argument(switch)

    driver_kwargs = {'options': chrome_options}

    # For DEBUG only (By default ChromeDriver logs only warnings/errors to stderr.
    # When debugging issues, it is helpful to enable more verbose logging.)
    if (debugSelenium):
        label_idp = get_label(idp['entityID'])
        label_sp = get_label(sp)
        sha1_idp = sha1(idp['entityID'])
        driver_kwargs['service_args'] = ['--verbose', f'--log-path={e_p.ECCS_SELENIUMLOGDIR}/{sha1_idp}_{label_idp}_{label_sp}.log']

    try:
        return webdriver.Chrome(e_p.PATHCHROMEDRIVER, **driver_kwargs)
    except Exception:
        # Whatever went wrong, wait 3 seconds and try once more.  Only
        # ``Exception`` is caught so that Ctrl-C / interpreter exit are not
        # turned into a retry.
        time.sleep(3)
        return webdriver.Chrome(e_p.PATHCHROMEDRIVER, **driver_kwargs)

def follow_all_nested_iframes(driver):
    """Descend into nested iframes until the current document has none.

    While an ``<iframe>`` can be found in the current context, *driver* is
    switched to the first frame.  As soon as the lookup raises
    NoSuchElementException, the page source of the context reached is returned.
    """
    try:
        while True:
            iframe = driver.find_element(By.XPATH,'//iframe')
            if not iframe:
                return None
            driver.switch_to.frame(0)
    except NoSuchElementException:
        return driver.page_source

def _utc_timestamp():
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S') + 'Z'


def check_idp_response_selenium(sp,idp,test):
    """Run the ECCS check of *idp* against *sp* with Selenium.

    Steps, in order:
      1. IdPs that are disabled (by registration authority or by entityID in
         the ECCS properties) are reported as "DISABLED" without any request.
      2. ``https://<idp host>/robots.txt`` is requested: an SSL error ends the
         check with "SSL-Error"; any other failure is ignored; a robots.txt
         matching the ECCS disallow rule ends the check with "DISABLED".
      3. The WAYFless URL (``sp`` + entityID) is opened in Chrome and the page
         is inspected for the metadata-not-found / IdP-error patterns and for
         an ``<input type="password">`` element.

    Every outcome stores a page source through ``store_page_source`` (printed
    to stdout when *test* is truthy).

    Returns:
        A tuple ``(entityID, wayfless_url, check_time, status, webdriver_error)``
        where *status* is one of "DISABLED", "SSL-Error", "No-eduGAIN-Metadata",
        "IdP-Error", "OK", "Invalid-Form", "Timeout", "Connection-Error" and
        *webdriver_error* is 1 only for "Connection-Error".  None is returned
        when the driver is None or when the page source could not be stored.
    """

    # Disable SSL requests warning messages
    #requests.packages.urllib3.disable_warnings()

    # Common variables
    fqdn_idp = get_label(idp['Location'])
    wayfless_url = f"{sp}{idp['entityID']}"
    robots = ""
    federations_disabled_dict = e_p.FEDS_DISABLED_DICT
    idps_disabled_dict = e_p.IDPS_DISABLED_DICT
    webdriver_error = 0 # No WebDriver Error
    driver = None       # Lets the final cleanup know whether a driver was ever created

    # Handle Disabled Idps/Federations
    if (idp['registrationAuthority'] in federations_disabled_dict):
        check_time = _utc_timestamp()
        page_source = federations_disabled_dict[idp['registrationAuthority']]
        store_page_source(page_source,idp,sp,test)
        return (idp['entityID'],wayfless_url,check_time,"DISABLED",webdriver_error)

    if (idp['entityID'] in idps_disabled_dict):
        check_time = _utc_timestamp()
        page_source = idps_disabled_dict[idp['entityID']]
        store_page_source(page_source,idp,sp,test)
        return (idp['entityID'],wayfless_url,check_time,"DISABLED",webdriver_error)

    # Robots + SSL Check
    try:
        hdrs = {
            'User-Agent': f'{e_p.ROBOTS_USER_AGENT}'
        }
        check_time = _utc_timestamp()
        robots = requests.get(f"https://{fqdn_idp}/robots.txt", headers=hdrs, verify=True, timeout=e_p.ECCS_REQUESTSTIMEOUT)

        # NOTE(review): requests.get() either raises or returns a Response, so
        # this comparison is never true and the plain-HTTP fallback never runs.
        # Kept unchanged; enabling the fallback would add a new network request
        # and should be a deliberate decision.
        if (robots == ""):
            robots = requests.get(f"http://{fqdn_idp}/robots.txt", headers=hdrs, verify=False, timeout=e_p.ECCS_REQUESTSTIMEOUT)

    # Catch SSL Exceptions and block the ECCS check
    except requests.exceptions.SSLError as e:
        if (test):
            page_source = f"\nAn SSL Error occurred while opening https://{fqdn_idp}/robots.txt:\n\n{e}\n\nCheck it on SSL Labs: https://www.ssllabs.com/ssltest/analyze.html?d={fqdn_idp}"
        else:
            page_source = f"<h1>SSL ERROR</h1><h2>An SSL error occurred for the server {fqdn_idp}:</h2><p>{e}</p><p>Check it on SSL Labs: <a href='https://www.ssllabs.com/ssltest/analyze.html?d={fqdn_idp}'>Click Here</a></p>"
        store_page_source(page_source,idp,sp,test)
        return (idp['entityID'],wayfless_url,check_time,"SSL-Error",webdriver_error)

    # The robots.txt lookup is best effort: do not consider any other error
    except Exception:
        pass

    if (robots):
        check_time = _utc_timestamp()
        p = re.compile(r'^User-Agent:\sECCS\sDisallow:\s\/\s*$', re.MULTILINE)
        m = p.search(robots.text)

        if (m):
            page_source = "<h1>IdP excluded from check by robots.txt</h1>"
            store_page_source(page_source,idp,sp,test)
            return (idp['entityID'],wayfless_url,check_time,"DISABLED",webdriver_error)

    try:
        # WebDriver MUST be instanced here to avoid problems with SESSION
        driver = get_driver_selenium(idp,sp,e_p.ECCS_SELENIUMDEBUG)

        # Exception of WebDriver raises
        if (driver is None):
            sys.stderr.write(f"get_driver_selenium() returned None for IDP {idp['entityID']}(SHA1: {sha1(idp['entityID'])}) with SP {get_label(sp)}")
            return None

        driver.set_page_load_timeout(e_p.ECCS_SELENIUMPAGELOADTIMEOUT)
        driver.set_script_timeout(e_p.ECCS_SELENIUMSCRIPTTIMEOUT)

        check_time = _utc_timestamp()
        driver.get(wayfless_url)

        metadata_not_found = re.search(e_p.METADATAPATTERN,driver.page_source, re.I)

        idp_error = re.search(e_p.IDPERROR,driver.page_source, re.I)

        if (metadata_not_found):
            if (test):
                pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\n[WAYFLESS URL]{wayfless_url} - METADATA NOT FOUND"
            else:
                pgsrc = driver.page_source
            stored = store_page_source(pgsrc,idp,sp,test)
            if (stored):
                return (idp['entityID'],wayfless_url,check_time,"No-eduGAIN-Metadata",webdriver_error)

        if (idp_error):
            if (test):
                pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\n[WAYFLESS URL]{wayfless_url} - IDP ERROR"
            else:
                pgsrc = driver.page_source
            stored = store_page_source(pgsrc,idp,sp,test)
            if (stored):
                return (idp['entityID'],wayfless_url,check_time,"IdP-Error",webdriver_error)

        # If meet <iframe> follow all iframes
        if ('<iframe' in driver.page_source):
            follow_all_nested_iframes(driver)

        driver.refresh()

        WebDriverWait(driver, e_p.ECCS_SELENIUMPAGELOADTIMEOUT).until(
            EC.presence_of_element_located((By.XPATH,'//input[@type="password"]'))
        )

        if (test):
            pgsrc = f"\n[WAYFLESS_URL]\n{wayfless_url} - OK"
        else:
            pgsrc = driver.page_source
        stored = store_page_source(pgsrc,idp,sp,test)
        if (stored):
            return (idp['entityID'],wayfless_url,check_time,"OK",webdriver_error)

    except TimeoutException:
        metadata_not_found = re.search(e_p.METADATAPATTERN,driver.page_source, re.I)

        try:
            driver.find_element(By.XPATH,'//input[@type="password"]')

        except NoSuchElementException:
            # This IF is for those IdPs that do not consume the eduGAIN metadata and reach the Timeout
            if (metadata_not_found):
                if (test):
                    pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\n[WAYFLESS URL]{wayfless_url} - METADATA NOT FOUND"
                else:
                    pgsrc = driver.page_source
                stored = store_page_source(pgsrc,idp,sp,test)
                if (stored):
                    return (idp['entityID'],wayfless_url,check_time,"No-eduGAIN-Metadata",webdriver_error)
            elif (driver.page_source != "<html><head></head><body></body></html>"):
                if (test):
                    pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\nInvalid-Form: No valid login form found in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds"
                else:
                    pgsrc = f"<h1>Invalid Form: no valid login form found in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds</h1><h2>PAGE SOURCE:</h2><br/>{driver.page_source}"
                stored = store_page_source(pgsrc,idp,sp,test)
                if (stored):
                    return (idp['entityID'],wayfless_url,check_time,"Invalid-Form",webdriver_error)
            else:
                if (test):
                    pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\nTimeout: No valid login form loaded in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds."
                else:
                    pgsrc = f"<h1>Timeout - No valid login form found in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds.</h1><h2>PAGE SOURCE:</h2><br/>{driver.page_source}"
                stored = store_page_source(pgsrc,idp,sp,test)
                if (stored):
                    return (idp['entityID'],wayfless_url,check_time,"Timeout",webdriver_error)

        # Any other failure while looking for the password field is reported as
        # a Timeout.  (An exception *class* is required here: naming the caught
        # exception instance instead makes Python raise TypeError.)
        except Exception:
            if (test):
                pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\nTimeout: No valid login form loaded in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds."
            else:
                pgsrc = driver.page_source
            stored = store_page_source(f"<h1>Timeout - No valid login form found in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds.</h1><br/><p>{pgsrc}</p>",idp,sp,test)
            if (stored):
                return (idp['entityID'],wayfless_url,check_time,"Timeout",webdriver_error)

        # <input type="password"> found
        # This IF is for those IdPs whose Timeout is caused by an image or something else that does not prevent the Login process.
        if (test):
            pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source} - Timeout but OK"
        else:
            pgsrc = driver.page_source
        stored = store_page_source(pgsrc,idp,sp,test)
        if (stored):
            return (idp['entityID'],wayfless_url,check_time,"OK",webdriver_error)

    except WebDriverException as e:
        # Keep only the text before the first '(' of the driver message; the
        # message attribute may be None.
        error = str(e.msg or '').split('(')[0].rstrip()
        if (test):
            pgsrc = f"\nA Connection error occurred while opening {wayfless_url}:\n\n{error}"
        else:
            pgsrc = f"<h1>CONNECTION ERROR</h1><h2>A Connection error occurred while opening <a href='{wayfless_url}'>{wayfless_url}</a>:</h2><p>{error}</p>"
        webdriver_error = 1
        stored = store_page_source(pgsrc,idp,sp,test)
        if (stored):
            return (idp['entityID'],wayfless_url,check_time,"Connection-Error",webdriver_error)

    finally:
        # The driver is missing when its creation failed; quitting
        # unconditionally would replace the result with a new exception.
        if (driver is not None):
            driver.quit()

def delete_line_with_word(filepath,word):
    """Rewrite *filepath* in place without the lines that contain *word*.

    Nothing happens when *filepath* is not an existing regular file.
    """
    import os.path

    if not os.path.isfile(filepath):
        return

    with open(filepath, "r") as source:
        rows = source.readlines()

    with open(filepath, "w") as target:
        target.writelines(row for row in rows if word not in row)