Skip to content
Snippets Groups Projects
utils.py 14.84 KiB
#!/usr/bin/env python3

import datetime
import json
import logging
import pathlib
import re
import requests
import sys
import shutil
import time

import eccs2properties as e2p

from selenium import webdriver
from selenium.common.exceptions import WebDriverException,TimeoutException,NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from logging.handlers import RotatingFileHandler
from urllib3.util import parse_url

def sha1(idp_entity_id):
    import hashlib
    result = hashlib.sha1(idp_entity_id.encode())
    return result.hexdigest()


# Return a label useful for a filename
def get_label(url_or_urn):
    if url_or_urn.startswith('http'):
       return parse_url(url_or_urn)[2]
    else:
       return parse_url(url_or_urn)[4].lstrip('/') 


# Returns a Dict of "{ nameFed:reg_auth }"
def get_reg_auth_dict(list_feds):
    regAuth_dict = {}

    for key,value in list_feds.items():
       name = value['name']
       reg_auth = value['reg_auth']

       regAuth_dict[name] = reg_auth

    return regAuth_dict


# Returns a list of IdP for a single federation
def get_idp_list(list_eccs_idps,reg_auth=None,idp_entityid=None):
    fed_idp_list = []
    for idp in list_eccs_idps:
       if (idp_entityid):
          if (idp['entityID'] == idp_entityid):
             fed_idp_list.append(idp)
       elif (reg_auth):
          if (idp['registrationAuthority'] == reg_auth):
             fed_idp_list.append(idp)
       else:
          fed_idp_list.append(idp)

    return fed_idp_list


# Download all eduGAIN Federations from URL, store them on a local file and returns a Python Dictionary
def get_list_feds(url, dest_file):
    from pathlib import Path

    # If file does not exists... download it into the dest_file
    path = pathlib.Path(dest_file)
    if(path.exists() == False):
       with open("%s" % (dest_file), mode="w+", encoding='utf-8') as f:
            f.write(requests.get(url).text)

    # then open it and work with local file
    with open("%s" % (dest_file), mode="r", encoding='utf-8') as f:
         return json.loads(f.read().replace("'","'"))


# Download all eduGAIN IdPs from URL, store them on a local file and returns a Python List
def get_list_eccs_idps(url, dest_file):
    from pathlib import Path

    # If file does not exists... download it into the dest_file
    path = pathlib.Path(dest_file)
    if(path.exists() == False):
       with open("%s" % (dest_file), mode="w+", encoding='utf-8') as f:
            f.write(requests.get(url).text)

    # then open it and work with local file
    with open("%s" % (dest_file), mode="r", encoding='utf-8') as f:
         return json.loads(f.read().replace("'","'"))


# Use logger to produce files consumed by ECCS-2 API
def get_logger(path, filename, mode="a", log_level="DEBUG"):

    logger = logging.getLogger(__name__)
    ch = logging.handlers.RotatingFileHandler(f"{path}/{filename}", mode, 0, 5, 'utf-8')

    if (log_level == "DEBUG"):
       logger.setLevel(logging.DEBUG)
       ch.setLevel(logging.DEBUG)
    elif (log_level == "INFO"):
       logger.setLevel(logging.INFO)
       ch.setLevel(logging.INFO)
    elif (log_level == "WARN"):
       logger.setLevel(logging.WARN)
       ch.setLevel(logging.WARN)
    elif (log_level == "ERROR"):
       logger.setLevel(logging.ERROR)
       ch.setLevel(logging.ERROR)
    elif (log_level == "CRITICAL"):
       logger.setLevel(logging.CRITICAL)
       ch.setLevel(logging.CRITICAL)

    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(module)s - %(message)s', datefmt='%d/%m/%Y %H:%M:%S')
    ch.setFormatter(formatter)
    logger.addHandler(ch)

    return logger


# Return a list of email address for a specific type of contact
def get_idp_contacts(idp,contactType):
    ctcList = []
    for ctcType in idp['contacts']:
        if (ctcType == contactType):
           for ctc in idp['contacts'][contactType]:
               if (ctc.get('emailOrPhone')):
                  if (ctc['emailOrPhone'].get('EmailAddress')):
                     ctcList.append(ctc['emailOrPhone']['EmailAddress'][0])
                  else:
                     ctcList.append('missing email')
               else:
                  ctcList.append('missing email')
    return ctcList


# Write the login page source code into its file
def store_page_source(page_source,idp,sp,test):
    if (test):
        sys.stdout.write(f"{page_source}")
        return True
    else:
       # Put the page_source into an appropriate HTML file
       with open(f"{e2p.ECCS2HTMLDIR}/{e2p.DAY}/{sha1(idp['entityID'])}---{get_label(sp)}.html","w") as html:
            try:
               html.write(page_source)
               return True
            except IOError:
               return False


# Get the Google Chrom Selenium Driver
def get_driver_selenium(idp=None,sp=None,debugSelenium=False):

    # Configure Web-driver
    # https://peter.sh/experiments/chromium-command-line-switches/
    chrome_options = Options()
    chrome_options.page_load_strategy = 'normal'
 
    chrome_options.add_argument('--start-in-incognito')
    chrome_options.add_argument('--headless')
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    chrome_options.add_argument('--disable-gpu')
    chrome_options.add_argument('--disable-extensions')
    chrome_options.add_argument('--disable-dinosaur-easter-egg')
    chrome_options.add_argument('--disable-sync')

    # For DEBUG only (By default ChromeDriver logs only warnings/errors to stderr.
    # When debugging issues, it is helpful to enable more verbose logging.)
    if (debugSelenium):
       label_idp = get_label(idp['entityID'])
       label_sp = get_label(sp)
       sha1_idp = sha1(idp['entityID'])
       try:
          driver = webdriver.Chrome(e2p.PATHCHROMEDRIVER, options=chrome_options, service_args=['--verbose', f'--log-path={e2p.ECCS2SELENIUMLOGDIR}/{sha1_idp}_{label_idp}_{label_sp}.log'])
       except: # Wait 3 seconds before try again to get the webdriver for all kind of exception will occur
          time.sleep(3)
          driver = webdriver.Chrome(e2p.PATHCHROMEDRIVER, options=chrome_options, service_args=['--verbose', f'--log-path={e2p.ECCS2SELENIUMLOGDIR}/{sha1_idp}_{label_idp}_{label_sp}.log'])
    else:
       try:
          driver = webdriver.Chrome(e2p.PATHCHROMEDRIVER, options=chrome_options)
       except: # Wait 3 seconds before try again to get the webdriver for all kind of exception will occur
          time.sleep(3)
          driver = webdriver.Chrome(e2p.PATHCHROMEDRIVER, options=chrome_options)
    return driver

def follow_all_nested_iframes(driver):
    try:
       while (driver.find_element(By.XPATH,'//iframe')):
           driver.switch_to.frame(0)
    except NoSuchElementException:
       return driver.page_source

# ECCS2 Check made by Selenium
def check_idp_response_selenium(sp,idp,test):

    # Disable SSL requests warning messages
    #requests.packages.urllib3.disable_warnings()

    # Common variables
    fqdn_idp = get_label(idp['Location'])
    wayfless_url = f"{sp}{idp['entityID']}"
    robots = ""
    federations_disabled_dict = e2p.FEDS_DISABLED_DICT
    idps_disabled_dict = e2p.IDPS_DISABLED_DICT
    webdriver_error = 0 # No WebDriver Error

    # Handle Disabled Idps/Federations
    if (idp['registrationAuthority'] in federations_disabled_dict.keys()):
       check_time = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S') + 'Z'
       page_source = federations_disabled_dict[idp['registrationAuthority']]
       store_page_source(page_source,idp,sp,test)
       return (idp['entityID'],wayfless_url,check_time,"DISABLED",webdriver_error)

    if (idp['entityID'] in idps_disabled_dict.keys()):
       check_time = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S') + 'Z'
       page_source = idps_disabled_dict[idp['entityID']]
       store_page_source(page_source,idp,sp,test)
       return (idp['entityID'],wayfless_url,check_time,"DISABLED",webdriver_error)

    # Robots + SSL Check
    try:
       hdrs = {
          'User-Agent': f'{e2p.ROBOTS_USER_AGENT}'
       }
       check_time = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S') + 'Z'
       robots = requests.get(f"https://{fqdn_idp}/robots.txt", headers=hdrs, verify=True, timeout=e2p.ECCS2REQUESTSTIMEOUT)

       if (robots == ""):
          robots  = requests.get(f"http://{fqdn_idp}/robots.txt", headers=hdrs, verify=False, timeout=e2p.ECCS2REQUESTSTIMEOUT)

    # Catch SSL Exceptions and block the ECCS check
    except requests.exceptions.SSLError as e:
       if (test): page_source = f"\nAn SSL Error occurred while opening https://{fqdn_idp}/robots.txt:\n\n{e}\n\nCheck it on SSL Labs: https://www.ssllabs.com/ssltest/analyze.html?d={fqdn_idp}"
       else: page_source = f"<h1>SSL ERROR</h1><h2>An SSL error occurred for the server {fqdn_idp}:</h2><p>{e}</p><p>Check it on SSL Labs: <a href='https://www.ssllabs.com/ssltest/analyze.html?d={fqdn_idp}'>Click Here</a></p>"
       store_page_source(page_source,idp,sp,test)
       return (idp['entityID'],wayfless_url,check_time,"SSL-Error",webdriver_error)

    # Do not consider any other Exception
    except:
       pass

    if (robots):
       check_time = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S') + 'Z'
       p = re.compile('^User-Agent:\sECCS\sDisallow:\s\/\s*$', re.MULTILINE)
       m = p.search(robots.text)

       if (m):
          page_source = "<h1>IdP excluded from check by robots.txt</h1>"
          store_page_source(page_source,idp,sp,test)
          return (idp['entityID'],wayfless_url,check_time,"DISABLED",webdriver_error)

    try:
       # WebDriver MUST be instanced here to avoid problems with SESSION
       driver = get_driver_selenium(idp,sp,e2p.ECCS2SELENIUMDEBUG)

       # Exception of WebDriver raises
       if (driver == None):
          sys.stderr.write(f"get_driver_selenium() returned None for IDP {idp['entityID']}(SHA1: {sha1(idp['entityID'])}) with SP {get_label(sp)}")
          return None

       driver.set_page_load_timeout(e2p.ECCS2SELENIUMPAGELOADTIMEOUT)
       driver.set_script_timeout(e2p.ECCS2SELENIUMSCRIPTTIMEOUT)

       check_time = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S') + 'Z'
       driver.get(wayfless_url)

       metadata_not_found = re.search(e2p.METADATAPATTERN,driver.page_source, re.I)

       if (metadata_not_found):
          if (test): pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\n[WAYFLESS URL]{wayfless_url} - METADATA NOT FOUND"
          else: pgsrc = driver.page_source
          stored = store_page_source(pgsrc,idp,sp,test)
          if (stored):
             return (idp['entityID'],wayfless_url,check_time,"No-eduGAIN-Metadata",webdriver_error)

       # If meet <iframe> follow all iframes
       if ('<iframe' in driver.page_source):
          follow_all_nested_iframes(driver)

       WebDriverWait(driver, e2p.ECCS2SELENIUMPAGELOADTIMEOUT).until(
          EC.presence_of_element_located((By.XPATH,'//input[@type="password"]'))
       )

       if (test): pgsrc = f"\n[WAYFLESS_URL]\n{wayfless_url} - OK"
       else: pgsrc = driver.page_source
       stored = store_page_source(pgsrc,idp,sp,test)
       if (stored):
          return (idp['entityID'],wayfless_url,check_time,"OK",webdriver_error)


    except TimeoutException as e:
       metadata_not_found = re.search(e2p.METADATAPATTERN,driver.page_source, re.I)

       try:
          input_password_found = driver.find_element(By.XPATH,'//input[@type="password"]')

       except NoSuchElementException as e:
          # This IF is for those IdP that doesn't consuming the eduGAIN metadata and reaching Timeout
          if (metadata_not_found):
             if (test): pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\n[WAYFLESS URL]{wayfless_url} - METADATA NOT FOUND"
             else: pgsrc = driver.page_source
             stored = store_page_source(pgsrc,idp,sp,test)
             if (stored):
                return (idp['entityID'],wayfless_url,check_time,"No-eduGAIN-Metadata",webdriver_error)
          elif(driver.page_source != "<html><head></head><body></body></html>"):
             if (test): pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\nInvalid-Form: No valid login form found in {e2p.ECCS2SELENIUMPAGELOADTIMEOUT} seconds"
             else: pgsrc = f"<h1>Invalid Form: no valid login form found in {e2p.ECCS2SELENIUMPAGELOADTIMEOUT} seconds</h1><h2>PAGE SOURCE:</h2><br/>{driver.page_source}"
             stored = store_page_source(pgsrc,idp,sp,test)
             if (stored):
                return (idp['entityID'],wayfless_url,check_time,"Invalid-Form",webdriver_error)
          else:
             if (test): pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\nTimeout: No valid login form loaded in {e2p.ECCS2SELENIUMPAGELOADTIMEOUT} seconds."
             else: pgsrc = f"<h1>Timeout - No valid login form found in {e2p.ECCS2SELENIUMPAGELOADTIMEOUT} seconds.</h1><h2>PAGE SOURCE:</h2><br/>{driver.page_source}"
             stored = store_page_source(pgsrc,idp,sp,test)
             if (stored):
                return (idp['entityID'],wayfless_url,check_time,"Timeout",webdriver_error)

       except e:
          if (test): pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source}\nTimeout: No valid login form loaded in {e2p.ECCS2SELENIUMPAGELOADTIMEOUT} seconds."
          else: pgsrc = driver.page_source
          stored = store_page_source(f"<h1>Timeout - No valid login form found in {e2p.ECCS2SELENIUMPAGELOADTIMEOUT} seconds.</h1><br/><p>{pgsrc}</p>",idp,sp,test)
          if (stored):
             return (idp['entityID'],wayfless_url,check_time,"Timeout",webdriver_error)


       # <input type="password"> found
       # This IF is for those IdPs that Timeout is caused by an image or other that do now prevent the Login process.
       if (test): pgsrc = f"\n[PAGE_SOURCE]\n{driver.page_source} - Timeout but OK"
       else: pgsrc = driver.page_source
       stored = store_page_source(pgsrc,idp,sp,test)
       if (stored):
          return (idp['entityID'],wayfless_url,check_time,"OK",webdriver_error)

    except WebDriverException as e:
           error = e.__dict__['msg'].split('(')[0].rstrip()
           if (test): pgsrc = f"\nA Connection error occurred while opening {wayfless_url}:\n\n{error}"
           else: pgsrc = f"<h1>CONNECTION ERROR</h1><h2>A Connection error occurred while opening <a href='{wayfless_url}'>{wayfless_url}</a>:</h2><p>{error}</p>"
           webdriver_error = 1
           stored = store_page_source(pgsrc,idp,sp,test)
           if (stored):
              return (idp['entityID'],wayfless_url,check_time,"Connection-Error",webdriver_error)

    finally:
       driver.quit()

def delete_line_with_word(filepath,word):
    import os.path
 
    if os.path.isfile(filepath):
       with open(filepath, "r") as f:
            lines = f.readlines()

       with open(filepath, "w") as f:
            for line in lines:
                if word not in line:
                   f.write(line)