Skip to content
Snippets Groups Projects
utils.py 20.90 KiB
#!/usr/bin/env python3

import base64
import datetime
import json
import logging
import pathlib
import re
import requests
import six
import sys
import shutil
import time
import uuid
import zlib

import eccs_properties as e_p

from selenium import webdriver
from selenium.common.exceptions import WebDriverException,TimeoutException,NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from logging.handlers import RotatingFileHandler
from urllib3.util import parse_url
from urllib.parse import urlparse, urlencode

def sha1(idp_entity_id):
    """Compute the hexadecimal SHA1 digest of a string

    :param idp_entity_id: string to hash (typically an IdP entityID)
    :return: the SHA1 digest as lowercase hexadecimal
    :rtype: string
    """
    import hashlib

    # str.encode() defaults to UTF-8
    digest = hashlib.sha1()
    digest.update(idp_entity_id.encode())
    return digest.hexdigest()

def get_label(url_or_urn):
    """Derive a short label, usable in filenames, from a URL or a URN

    :param url_or_urn: a URL (anything starting with 'http') or a URN
    :return: the host part for a URL, otherwise the parsed path without leading '/'
    :rtype: string
    """
    is_http = url_or_urn.startswith('http')
    parts = parse_url(url_or_urn)
    # 'host' and 'path' are fields 2 and 4 of the tuple returned by parse_url
    return parts.host if is_http else parts.path.lstrip('/')

def get_reg_auth_dict(list_feds):
    """Map every federation name to its registration authority

    :param list_feds: mapping whose values are dictionaries with 'name' and 'reg_auth' keys
    :return: a dictionary of '{ nameFed:reg_auth }'
    :rtype: dict
    """
    return {fed['name']: fed['reg_auth'] for fed in list_feds.values()}

def get_idp_list(list_eccs_idps,reg_auth=None,idp_entityid=None):
    """Select the IdPs to work on

       - 'idp_entityid' given: only the IdPs with that entityID (it wins over 'reg_auth')
       - 'reg_auth' given: all IdPs registered by that registration authority
       - no filter given: every IdP of the input list

    :param list_eccs_idps: Python list containing all IdPs provided to ECCS
    :param reg_auth: a string containing a registration authority URL
    :param idp_entityid: a string containing an IdP entityID
    :return: a new list with the selected IdPs
    :rtype: list
    """

    if idp_entityid:
        return [entry for entry in list_eccs_idps if entry['entityID'] == idp_entityid]

    if reg_auth:
        return [entry for entry in list_eccs_idps if entry['registrationAuthority'] == reg_auth]

    # No filter: hand back a copy of the whole list
    return list(list_eccs_idps)

def get_list_from_url(url, dest_file):
    """Returns the JSON content of a URL as a Python object, using 'dest_file' as cache

    If 'dest_file' already exists its content is parsed and returned without
    any network access. Otherwise the URL is downloaded, parsed and, only if
    it is valid JSON, saved into 'dest_file'.

    :param url: URL to download
    :param dest_file: destination (cache) file
    :return: the resulting Python object (a list for the ECCS input files)
    :rtype: list
    :raises ValueError: if the downloaded or cached content is not valid JSON
    """
    path = pathlib.Path(dest_file)

    if path.exists():
        return json.loads(path.read_text(encoding='utf-8'))

    # Download and validate BEFORE creating the cache file: a failed request
    # or a non-JSON body must not leave an empty/broken file behind, because
    # that file would be picked up (and fail to parse) on every later call.
    content = requests.get(url).text
    data = json.loads(content)
    path.write_text(content, encoding='utf-8')
    return data

def get_logger(path, filename, mode="a", log_level="DEBUG"):
    """Returns a logger used by ECCS API Development Server to create its log file

    The logger is the module-level one (named after this module), so calling
    this function more than once returns the same object. A file handler is
    attached only if no handler for the same log file is already present,
    otherwise every record would be written once per call.

    :param path: directory path of the files
    :param filename: filename of the new file created
    :param mode: a(append),w(write),r(read)
    :param log_level: log level to use (DEBUG,INFO,WARN,ERROR,CRITICAL);
                      any other value leaves the levels untouched
    :return: a logger
    :rtype: logger object
    """
    import os.path

    levels = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARN": logging.WARN,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }

    logger = logging.getLogger(__name__)

    # FileHandler stores the absolute path in 'baseFilename': use the same
    # form to recognise a handler created by a previous call.
    log_file = os.path.abspath(f"{path}/{filename}")
    ch = next(
        (h for h in logger.handlers
         if isinstance(h, logging.handlers.RotatingFileHandler) and h.baseFilename == log_file),
        None,
    )

    if ch is None:
        # maxBytes=0 disables the size-based rollover, backupCount=5
        ch = logging.handlers.RotatingFileHandler(log_file, mode, 0, 5, 'utf-8')
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(module)s - %(message)s', datefmt='%d/%m/%Y %H:%M:%S')
        ch.setFormatter(formatter)
        logger.addHandler(ch)

    level = levels.get(log_level)
    if level is not None:
        logger.setLevel(level)
        ch.setLevel(level)

    return logger

def get_idp_contacts(idp,contactType):
    """Collect the first email address of every contact of a given type

    Contacts without an 'emailOrPhone' entry, or without any 'EmailAddress'
    in it, are reported with the placeholder 'missing email'.

    :param idp: dictionary containing the IdP info (must have a 'contacts' key)
    :param contactType: type of contact to consider
    :return: a python list with one entry per contact of the requested type
    :rtype: list
    """

    contacts = idp['contacts']
    if contactType not in contacts:
        return []

    emails = []
    for entry in contacts[contactType]:
        addresses = (entry.get('emailOrPhone') or {}).get('EmailAddress')
        emails.append(addresses[0] if addresses else 'missing email')
    return emails

def store_page_source(page_source,idp,sp,test):
    """Writes the login page source into a specific file

    The file is '<ECCS_HTMLDIR>/<DAY>/<sha1(IdP entityID)>---<SP label>.html'.

    :param page_source: content to write into a file
    :param idp: idp owner of the page source
    :param sp: sp who has been used
    :param test: flag needed to decide if write on the console instead of on the file
    :return: True if the content has been written, False if the file could not be opened or written
    :rtype: boolean
    """

    if (test):
        sys.stdout.write(f"{page_source}")
        return True

    html_path = f"{e_p.ECCS_HTMLDIR}/{e_p.DAY}/{sha1(idp['entityID'])}---{get_label(sp['entityID'])}.html"

    # open() is inside the try as well: a missing directory or a permission
    # problem must produce the documented False instead of an exception.
    # The encoding is explicit because login pages routinely contain
    # non-ASCII text and the locale default may not be able to encode it.
    try:
        with open(html_path, "w", encoding="utf-8") as html:
            html.write(page_source)
    except OSError:  # IOError is an alias of OSError on Python 3
        return False

    return True


# Get the Google Chrome Selenium Driver
def get_driver_selenium(idp=None,sp=None,debugSelenium=False):
    """Returns driver needed to perform the ECCS check

    A headless Chrome driver is created; if the first attempt fails, a second
    and last attempt is made after 3 seconds. An error on the second attempt
    is propagated to the caller.

    :param idp: the idp need to be checked (required only when debugSelenium is True)
    :param sp: the sp used to check the idp (required only when debugSelenium is True)
    :param debugSelenium: a flag needed to enable a more verbose logging
    :return: selenium driver
    :rtype: object
    """

    # Configure Web-driver
    # https://peter.sh/experiments/chromium-command-line-switches/
    chrome_options = Options()
    chrome_options.page_load_strategy = 'normal'

    for switch in (
        '--headless',
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu',
        '--disable-extensions',
        '--disable-dinosaur-easter-egg',
        '--disable-sync',
    ):
        chrome_options.add_argument(switch)

    # NOTE(review): the positional driver path and 'service_args' follow the
    # call style already used by this file; confirm it against the Selenium
    # version actually installed.
    driver_kwargs = {'options': chrome_options}

    # For DEBUG only (By default ChromeDriver logs only warnings/errors to stderr.
    # When debugging issues, it is helpful to enable more verbose logging.)
    if (debugSelenium):
        label_idp = get_label(idp['entityID'])
        label_sp = get_label(sp['entityID'])
        sha1_idp = sha1(idp['entityID'])
        driver_kwargs['service_args'] = ['--verbose', f'--log-path={e_p.ECCS_SELENIUMLOGDIR}/{sha1_idp}_{label_idp}_{label_sp}.log']

    try:
        return webdriver.Chrome(e_p.PATHCHROMEDRIVER, **driver_kwargs)
    except Exception:
        # Retry once for any ordinary error. 'Exception' (instead of a bare
        # except) lets KeyboardInterrupt/SystemExit stop the program at once
        # instead of sleeping and starting another browser.
        time.sleep(3)
        return webdriver.Chrome(e_p.PATHCHROMEDRIVER, **driver_kwargs)

def follow_all_nested_iframes(driver):
    """Descend into the first <iframe> of each page until a page without iframes is reached

    The driver is left switched into the innermost frame reached.

    :param driver: the selenium driver
    :return: the page source of the innermost frame reached
    :rtype: string
    """

    try:
        while True:
            iframe = driver.find_element(By.XPATH,'//iframe')
            if not iframe:
                break
            # Enter the first frame of the current document
            driver.switch_to.frame(0)
    except NoSuchElementException:
        # No more <iframe>: this is the innermost document
        return driver.page_source

def deflate_and_base64_encode(string_val):
    """Deflates and then base64 encodes a string

    :param string_val: The string (str, encoded as UTF-8, or bytes) to deflate and encode
    :return: The deflated and encoded value
    :rtype: bytes
    """
    # This module is Python 3 only (it uses f-strings): the native 'bytes'
    # type replaces the Python 2/3 compatibility alias six.binary_type.
    if not isinstance(string_val, bytes):
        string_val = string_val.encode('utf-8')

    # zlib.compress() returns a zlib stream: dropping its 2-byte header and
    # 4-byte Adler-32 trailer leaves the raw DEFLATE data.
    return base64.b64encode(zlib.compress(string_val)[2:-4])

def generate_login_url(sp_entity_id, sp_http_post_acs_location, idp_http_redirect_sso_location):
    """Returns a login url needed to get the IdP Login page for an SP

    A minimal, unsigned SAML AuthnRequest is built, deflated, base64 encoded
    and appended as 'SAMLRequest' query parameter to the IdP SSO location.

    :param sp_entity_id: the SP entityID
    :param sp_http_post_acs_location: the SP HTTP-POST AssertionConsumerService URL
    :param idp_http_redirect_sso_location: the IDP HTTP-Redirect SSO Location
    :return: a login url built upon a SAML Authn Request
    :rtype: string
    """
    from xml.sax.saxutils import escape

    def xml_value(value):
        # URLs with a query string contain '&': without escaping the
        # AuthnRequest would not be well-formed XML. '"' is escaped too
        # because the values are placed inside double-quoted attributes.
        return escape(str(value), {'"': '&quot;'})

    authn_request_id = f'_{uuid.uuid4().hex}'
    issue_instant = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec='seconds').replace('+00:00', 'Z')
    authn_request = '<samlp:AuthnRequest xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol" ' \
                    f'AssertionConsumerServiceURL="{xml_value(sp_http_post_acs_location)}" ' \
                    f'Destination="{xml_value(idp_http_redirect_sso_location)}" ' \
                    f'ID="{authn_request_id}" ' \
                    f'IssueInstant="{issue_instant}" ' \
                    'ProtocolBinding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" ' \
                    'Version="2.0">' \
                    f'<saml:Issuer xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion">{xml_value(sp_entity_id)}</saml:Issuer>' \
                    '<samlp:NameIDPolicy AllowCreate="1"/>' \
                    '</samlp:AuthnRequest>'

    query = urlencode({"SAMLRequest": deflate_and_base64_encode(authn_request)})
    # Append to an existing query string with '&', otherwise start one with '?'
    glue_char = "&" if urlparse(idp_http_redirect_sso_location).query else "?"
    return glue_char.join([idp_http_redirect_sso_location, query])

# ECCS Check made by Selenium
def _eccs_check_time():
    """Returns the current UTC time formatted as 'YYYY-MM-DDTHH:MM:SSZ'."""
    return datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S') + 'Z'


def check_idp_response_selenium(sp,idp,test):
    """Performs the ECCS check on an IdP

    Steps:
      1. IdPs/Federations listed as disabled are reported as "DISABLED";
      2. 'https://<IdP host>/robots.txt' is fetched: an SSL error (other than a
         missing local issuer certificate) gives "SSL-Error", a robots.txt
         disallowing ECCS gives "DISABLED";
      3. the SAML login URL is opened with Selenium and the page is classified
         as "OK", "No-eduGAIN-Metadata", "IdP-Error", "Unable-To-Check",
         "Timeout" or "Connection-Error".

    :param sp: the SP used to test the IDP
    :param idp: the IdP to test
    :param test: a flag to perform a check without changes (results are printed instead of stored)
    :return: the tuple (IdP entityID, SP entityID, check time, status, webdriver_error)
             or None when the page source cannot be stored or no driver is available
    """

    # Common variables
    fqdn_idp = get_label(idp['Location'])
    saml_request_url = generate_login_url(sp['entityID'], sp['http_post_acs_location'], idp['Location'])
    robots = None
    federations_disabled_dict = e_p.FEDS_DISABLED_DICT
    idps_disabled_dict = e_p.IDPS_DISABLED_DICT
    webdriver_error = 0 # No WebDriver Error
    driver = None       # Bound before the try so that 'finally' can always test it

    # Handle Disabled Idps/Federations
    if (idp['registrationAuthority'] in federations_disabled_dict):
        check_time = _eccs_check_time()
        page_source = federations_disabled_dict[idp['registrationAuthority']]
        store_page_source(page_source,idp,sp,test)
        return (idp['entityID'],sp['entityID'],check_time,"DISABLED",webdriver_error)

    if (idp['entityID'] in idps_disabled_dict):
        check_time = _eccs_check_time()
        page_source = idps_disabled_dict[idp['entityID']]
        store_page_source(page_source,idp,sp,test)
        return (idp['entityID'],sp['entityID'],check_time,"DISABLED",webdriver_error)

    # Robots + SSL Check
    check_time = _eccs_check_time()
    try:
        hdrs = {
           'User-Agent': f'{e_p.ROBOTS_USER_AGENT}'
        }
        robots = requests.get(f"https://{fqdn_idp}/robots.txt", headers=hdrs, verify=e_p.CA_BUNDLE_PATH, timeout=e_p.ECCS_REQUESTSTIMEOUT)
        # NOTE(review): a plain-HTTP fallback guarded by 'robots == ""' used to
        # follow here, but it could never run (the value is a Response object
        # at this point), so it has been dropped. Re-add it with a working
        # condition if the fallback is really wanted.

    # Catch SSL Exceptions and block the ECCS check
    except requests.exceptions.SSLError as e:
        if ('unable to get local issuer certificate' not in str(e)):
            if (test):
                page_source = f"\nAn SSL Error occurred while opening https://{fqdn_idp}/robots.txt:\n\n{e}\n\nCheck it on SSL Labs: https://www.ssllabs.com/ssltest/analyze.html?d={fqdn_idp}"
            else:
                page_source = f"<h1>SSL ERROR</h1><h2>An SSL error occurred for the server {fqdn_idp}:</h2><p>{e}</p><p>Check it on SSL Labs: <a href='https://www.ssllabs.com/ssltest/analyze.html?d={fqdn_idp}'>Click Here</a></p>"
            store_page_source(page_source,idp,sp,test)
            return (idp['entityID'],sp['entityID'],check_time,"SSL-Error",webdriver_error)

    # Do not consider any other Exception: robots.txt is optional
    except Exception:
        pass

    # A Response object is truthy only when its status code is below 400
    if (robots):
        check_time = _eccs_check_time()
        m = re.search(r'^User-Agent:\sECCS\sDisallow:\s\/\s*$', robots.text, re.MULTILINE)

        if (m):
            page_source = "<h1>IdP excluded from check by robots.txt</h1>"
            store_page_source(page_source,idp,sp,test)
            return (idp['entityID'],sp['entityID'],check_time,"DISABLED",webdriver_error)

    try:
        # WebDriver MUST be instanced here to avoid problems with SESSION
        driver = get_driver_selenium(idp,sp,e_p.ECCS_SELENIUMDEBUG)

        # Exception of WebDriver raises
        if (driver is None):
            sys.stderr.write(f"get_driver_selenium() returned None for IDP {idp['entityID']}(SHA1: {sha1(idp['entityID'])}) with SP {get_label(sp['entityID'])}")
            return None

        driver.set_page_load_timeout(e_p.ECCS_SELENIUMPAGELOADTIMEOUT)
        driver.set_script_timeout(e_p.ECCS_SELENIUMSCRIPTTIMEOUT)

        check_time = _eccs_check_time()

        driver.get(saml_request_url)
        pgsrc = driver.page_source

        # Support HTTP Basic Authentication
        unauthorized = re.search(r'401.(\D.|\s.)?Unauthorized', pgsrc, re.IGNORECASE)
        if (unauthorized):
            if (test): pgsrc = f"\n[PAGE_SOURCE]\n{pgsrc}\n[SP] {sp['entityID']} - 401 UNAUTHORIZED FOUND"
            stored = store_page_source(pgsrc,idp,sp,test)
            if (stored):
                return (idp['entityID'],sp['entityID'],check_time,"OK",webdriver_error)

        metadata_not_found = re.search(e_p.METADATAPATTERN, pgsrc, re.IGNORECASE)
        if (metadata_not_found):
            if (test): pgsrc = f"\n[PAGE_SOURCE]\n{pgsrc}\n[SP] {sp['entityID']} - METADATA NOT FOUND"
            stored = store_page_source(pgsrc,idp,sp,test)
            if (stored):
                return (idp['entityID'],sp['entityID'],check_time,"No-eduGAIN-Metadata",webdriver_error)

        idp_error = re.search(e_p.IDPERROR, pgsrc, re.IGNORECASE)
        if (idp_error):
            if (test): pgsrc = f"\n[PAGE_SOURCE]\n{pgsrc}\n[SP] {sp['entityID']} - IDP ERROR"
            stored = store_page_source(pgsrc,idp,sp,test)
            if (stored):
                return (idp['entityID'],sp['entityID'],check_time,"IdP-Error",webdriver_error)

        load_js = re.search(e_p.JAVASCRIPT, pgsrc, re.IGNORECASE)
        if (load_js):
            driver.refresh()

        # If meet <iframe> follow all iframes
        if ('<iframe' in pgsrc):
            pwd_found = re.search(e_p.PASSWORDPATTERN, pgsrc, re.IGNORECASE)
            if (not pwd_found):
                follow_all_nested_iframes(driver)

        WebDriverWait(driver, e_p.ECCS_SELENIUMPAGELOADTIMEOUT).until(
            EC.presence_of_element_located((By.XPATH,e_p.XPATH_CHECK_PATTERN))
        )

        # NOTE(review): in test mode a short "[SP] ... - [IDP] ... - OK" message
        # used to be assigned to 'pgsrc' here but was never used: the full page
        # source is what gets stored/printed, exactly as before.
        stored = store_page_source(driver.page_source,idp,sp,test)
        if (stored):
            return (idp['entityID'],sp['entityID'],check_time,"OK",webdriver_error)

    except TimeoutException:
        pgsrc = driver.page_source
        metadata_not_found = re.search(e_p.METADATAPATTERN, pgsrc, re.IGNORECASE)

        try:
            driver.find_element(By.XPATH, e_p.XPATH_CHECK_PATTERN)

        except NoSuchElementException:
            # This IF is for those IdP that doesn't consuming the eduGAIN metadata and reaching Timeout
            if (metadata_not_found):
                if (test): pgsrc = f"\n[PAGE_SOURCE]\n{pgsrc}\n[SP] {sp['entityID']} - METADATA NOT FOUND"
                stored = store_page_source(pgsrc,idp,sp,test)
                if (stored):
                    return (idp['entityID'],sp['entityID'],check_time,"No-eduGAIN-Metadata",webdriver_error)
            else:
                try:
                    response = requests.get(f"{driver.current_url}", timeout=e_p.ECCS_REQUESTSTIMEOUT)

                    if (response.status_code == 401):
                        if (test): pgsrc = f"\n[PAGE_SOURCE]\nHTTP Basic Authentication\n[URL]{driver.current_url} - 401 STATUS CODE FOUND"
                        stored = store_page_source(pgsrc,idp,sp,test)
                        if (stored):
                            return (idp['entityID'],sp['entityID'],check_time,"OK",webdriver_error)

                    if (response.status_code == 403):
                        if (test): pgsrc = f"\n[PAGE_SOURCE]\nForbidden\n[URL]{driver.current_url} - 403 STATUS CODE FOUND"
                        stored = store_page_source(pgsrc,idp,sp,test)
                        if (stored):
                            return (idp['entityID'],sp['entityID'],check_time,"IdP-Error",webdriver_error)
                except Exception:
                    pass   # ignore all requests exceptions

                # IdPs that do not show a Metadata error after reaching the Timeout and that raise an Exception on the "request"
                # A page with real content is "Unable-To-Check", an empty one is a "Timeout".
                # (The two comparisons were joined with 'or', which is always
                # true and made the Timeout branch unreachable.)
                if (pgsrc not in ("<html><head></head><body></body></html>", "")):
                    if (test): pgsrc = f"\n[PAGE_SOURCE]\n{pgsrc}\nUnable-To-Check: ECCS can't check the IdP login."
                    else: pgsrc = f"<h1>Unable To Check - ECCS can't check the IdP login</h1><h2>IDP LOGIN PAGE SOURCE:</h2><br/>{pgsrc}"
                    stored = store_page_source(pgsrc,idp,sp,test)
                    if (stored):
                        return (idp['entityID'],sp['entityID'],check_time,"Unable-To-Check",webdriver_error)
                else:
                    if (test): pgsrc = f"\n[PAGE_SOURCE]\n{pgsrc}\nTimeout: No valid login form loaded in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds."
                    else: pgsrc = f"<h1>Timeout - No valid login form found in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds.</h1>"
                    stored = store_page_source(pgsrc,idp,sp,test)
                    if (stored):
                        return (idp['entityID'],sp['entityID'],check_time,"Timeout",webdriver_error)

        # Exceptions that are not "NoSuchElementExceptions"
        # ('except e:' tried to match against an exception instance, which
        # raises TypeError instead of handling the error.)
        except Exception:
            if (test): pgsrc = f"\n[PAGE_SOURCE]\n{pgsrc}\nTimeout: No valid login form loaded in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds."
            stored = store_page_source(f"<h1>Timeout - No valid login form found in {e_p.ECCS_SELENIUMPAGELOADTIMEOUT} seconds.</h1><br/><p>{pgsrc}</p>",idp,sp,test)
            if (stored):
                return (idp['entityID'],sp['entityID'],check_time,"Timeout",webdriver_error)

        else:
            # input_xpath has been found
            # This is for those IdPs that Timeout is caused by an image or other that do not prevent the Login process.
            # Kept in the 'else' clause so that it runs only when the element
            # has really been found, never as a fall-through of the handlers above.
            if (test): pgsrc = f"\n[PAGE_SOURCE]\n{pgsrc} - Timeout but OK"
            stored = store_page_source(pgsrc,idp,sp,test)
            if (stored):
                return (idp['entityID'],sp['entityID'],check_time,"OK",webdriver_error)

    except WebDriverException as e:
        # Keep only the text before the first '(' of the WebDriver message; 'msg' may be None
        error = str(e.msg or '').split('(')[0].rstrip()
        # Report the URL that was actually opened instead of generating a new request
        if (test): pgsrc = f"\nA Connection error occurred while opening {saml_request_url}:\n\n{error}"
        else: pgsrc = f"<h1>CONNECTION ERROR</h1><h2>A Connection error occurred while opening <a href='{saml_request_url}'>SAML Request URL</a>:</h2><p>{error}</p>"
        webdriver_error = 1
        stored = store_page_source(pgsrc,idp,sp,test)
        if (stored):
            return (idp['entityID'],sp['entityID'],check_time,"Connection-Error",webdriver_error)

    finally:
        # The driver does not exist if its creation failed or returned None
        if (driver is not None):
            driver.quit()

def delete_line_with_word(filepath,word):
    """Rewrite a file dropping every line that contains a given word

    Nothing is done when 'filepath' is not an existing regular file.

    :param filepath: file where the lines have to be removed
    :param word: the text that identifies the lines to remove
    """

    import os.path

    if not os.path.isfile(filepath):
        return

    with open(filepath, "r") as source:
        rows = source.readlines()

    # The file is rewritten in place with the surviving lines only
    with open(filepath, "w") as target:
        target.writelines(row for row in rows if word not in row)