Skip to content
Snippets Groups Projects
Select Git revision
  • dad7a5a64bc20fb96a32879100f66d997ab327a7
  • develop default
  • master protected
  • feature/frontend-tests
  • 0.99
  • 0.98
  • 0.97
  • 0.96
  • 0.95
  • 0.94
  • 0.93
  • 0.92
  • 0.91
  • 0.90
  • 0.89
  • 0.88
  • 0.87
  • 0.86
  • 0.85
  • 0.84
  • 0.83
  • 0.82
  • 0.81
  • 0.80
24 results

test_policies.py

Blame
  • eccs2.py 9.46 KiB
    #!/usr/bin/env python3.8
    
    import argparse
    import json
    import logging
    import os
    import signal
    import re
    import requests
    
    from datetime import date
    from eccs2properties import ECCS2LOGSPATH, ECCS2RESULTSLOG, ECCS2CHECKSLOG, ECCS2SELENIUMLOG, ECCS2SELENIUMLOGLEVEL, FEDS_BLACKLIST, IDPS_BLACKLIST
    from selenium import webdriver
    from selenium.webdriver.common.by import By
    from selenium.webdriver.common.keys import Keys
    from selenium.webdriver.support.ui import Select
    from selenium.webdriver.remote.remote_connection import LOGGER
    from selenium.common.exceptions import NoSuchElementException
    from selenium.common.exceptions import TimeoutException
    from selenium.common.exceptions import WebDriverException
    from urllib3.exceptions import MaxRetryError
    
    
    """
      This script use Selenium and Chromium to select the IdP to check from a Shibboleth SP with the Shibboleth Embedded Discovery Service installed and configured to answer to all eduGAIN IdPs.
      The SPs used to check an IdP will be SP24(IDEM) and Attribute Viewer (SWITCH). 
      The check will be passed when both SPs will return the authentication page of the IdP checked.
    """
    
    def checkIdP(sp,idp,logger,driver):
    
       # Configure Blacklists
       federation_blacklist = FEDS_BLACKLIST
       entities_blacklist = IDPS_BLACKLIST 
    
       if (idp['registrationAuthority'] in federation_blacklist):
          logger.info("%s;%s;NULL;Federation excluded from checks" % (idp['entityID'],sp))
          return "DISABLED"
    
       if (idp['entityID'] in entities_blacklist):
          logger.info("%s;%s;NULL;IdP excluded from checks" % (idp['entityID'],sp))
          return "DISABLED"
    
       # Open SP, select the IDP from the EDS and press 'Enter' to reach the IdP login page to check
       try:
          driver.get(sp)
          driver.find_element_by_id("idpSelectInput").send_keys(idp['entityID'] + Keys.ENTER)
          page_source = driver.page_source
          status_code = requests.get(driver.current_url, verify=False).status_code
    
       except TimeoutException as e:
         logger.info("%s;%s;999;TIMEOUT" % (idp['entityID'],sp))
         return "TIMEOUT"
    
       except NoSuchElementException as e:
         logger.info("%s;%s;888;NoSuchElement" % (idp['entityID'],sp))
         print("!!! NO SUCH ELEMENT EXCEPTION !!!")
         raise e
    
       except WebDriverException as e:
         logger.info("%s;%s;777;ConnectionError" % (idp['entityID'],sp))
         print("!!! WEB DRIVER EXCEPTION !!!")
         print(e.__str__())
         return "ERROR"
    
       except requests.exceptions.ConnectionError as e:
          logger.info("%s;%s;000;ConnectionError" % (idp['entityID'],sp))
          return "ERROR"
    
       except requests.exceptions.TooManyRedirects as e:
          logger.info("%s;%s;111;TooManyRedirects" % (idp['entityID'],sp))
          return "ERROR"
    
       except requests.exceptions.RequestException as e:
          logger.info("%s;%s;222;ConnectionError" % (idp['entityID'],sp))
          print ("!!! REQUESTS EXCEPTION !!!")
          print(e.__str__())
          return "ERROR"
    
       except Exception as e:
         logger.info("%s;%s;555;ConnectionError" % (idp['entityID'],sp))
         print ("!!! EXCEPTION !!!")
         print(e.__str__())
         return "ERROR"
    
       pattern_metadata = "Unable.to.locate(\sissuer.in|).metadata(\sfor|)|no.metadata.found|profile.is.not.configured.for.relying.party|Cannot.locate.entity|fail.to.load.unknown.provider|does.not.recognise.the.service|unable.to.load.provider|Nous.n'avons.pas.pu.(charg|charger).le.fournisseur.de service|Metadata.not.found|application.you.have.accessed.is.not.registered.for.use.with.this.service|Message.did.not.meet.security.requirements"
    
       pattern_username = '<input[\s]+[^>]*((type=\s*[\'"](text|email)[\'"]|user)|(name=\s*[\'"](name)[\'"]))[^>]*>';
       pattern_password = '<input[\s]+[^>]*(type=\s*[\'"]password[\'"]|password)[^>]*>';
    
       metadata_not_found = re.search(pattern_metadata,page_source, re.I)
       username_found = re.search(pattern_username,page_source, re.I)
       password_found = re.search(pattern_password,page_source, re.I)
    
       if(metadata_not_found):
          #print("MD-NOT-FOUND - driver.current_url: %s" % (driver.current_url))
          logger.info("%s;%s;%s;No-eduGAIN-Metadata" % (idp['entityID'],sp,status_code))
          return "No-eduGAIN-Metadata"
       elif not username_found or not password_found:
          #print("INVALID-FORM - entityID: %s, sp: %s, driver.current_url: %s" % (idp['entityID'],sp,driver.current_url))
          logger.info("%s;%s;%s;Invalid-Form" % (idp['entityID'],sp,status_code))
          return "Invalid-Form"
       else:
          #print("MD-FOUND - driver.current_url: %s" % (driver.current_url))
          logger.info("%s;%s;%s;OK" % (idp['entityID'],sp,status_code))
          return "OK"
    
    
    # Use logger to produce files consumed by ECCS-2 API
    def getLogger(filename, path=".", log_level="DEBUG"):
    
        logger = logging.getLogger(filename)
        ch = logging.FileHandler(path + '/' + filename,'a','utf-8')
    
        if (log_level == "DEBUG"):
           logger.setLevel(logging.DEBUG)
           ch.setLevel(logging.DEBUG)
        elif (log_level == "INFO"):
           logger.setLevel(logging.INFO)
           ch.setLevel(logging.INFO)
        elif (log_level == "WARN"):
           logger.setLevel(logging.WARN)
           ch.setLevel(logging.WARN)
        elif (log_level == "ERROR"):
           logger.setLevel(logging.ERROR)
           ch.setLevel(logging.ERROR)
        elif (log_level == "CRITICAL"):
           logger.setLevel(logging.CRITICAL)
           ch.setLevel(logging.CRITICAL)
    
        formatter = logging.Formatter('%(message)s')
        ch.setFormatter(formatter)
        logger.addHandler(ch)
    
        return logger
    
    
    # Return a list of email address for a specific type of contact
    def getIdPContacts(idp,contactType):
    
       ctcList = []
       for ctcType in idp['contacts']:
          if (ctcType == contactType):
             for ctc in idp['contacts'][contactType]:
                if (ctc.get('emailOrPhone')):
                   if (ctc['emailOrPhone'].get('EmailAddress')):
                      ctcList.append(ctc['emailOrPhone']['EmailAddress'][0])
                   else:
                      ctcList.append('missing email')
                else:
                   ctcList.append('missing email')
    
       return ctcList
    
    def checkIdp(idp,sps,eccs2log,eccs2checksLog,driver):
          result = []
          for sp in sps:
             resultCheck = checkIdP(sp,idp,eccs2checksLog,driver)
             result.append(resultCheck)
    
          listTechContacts = getIdPContacts(idp,'technical')
          listSuppContacts = getIdPContacts(idp,'support')
    
          strTechContacts = ','.join(listTechContacts)
          strSuppContacts = ','.join(listSuppContacts)
    
          # If all checks are 'OK', than the IdP consuming correctly eduGAIN Metadata.
          if (result[0] == result[1] == "OK"):
             # IdP-DisplayName;IdP-entityID;IdP-RegAuth;IdP-tech-ctc-1,IdP-tech-ctc-2;IdP-supp-ctc-1,IdP-supp-ctc-2;Status;SP-entityID-1;SP-status-1;SP-entityID-2;SP-status-2
             eccs2log.info("%s;%s;%s;%s;%s;%s;%s;%s;%s;%s" % (
                 idp['displayname'].split(';')[1].split('==')[0],
                 idp['entityID'],
                 idp['registrationAuthority'],
                 strTechContacts,
                 strSuppContacts,
                 'OK',
                 sps[0],
                 result[0],
                 sps[1],
                 result[1]))
          elif (result[0] == result[1] == "DISABLED"):
             eccs2log.info("%s;%s;%s;%s;%s;%s;%s;%s;%s;%s" % (
                 idp['displayname'].split(';')[1].split('==')[0],
                 idp['entityID'],
                 idp['registrationAuthority'],
                 strTechContacts,
                 strSuppContacts,
                 'DISABLE',
                 sps[0],
                 result[0],
                 sps[1],
                 result[1]))
          else:
             eccs2log.info("%s;%s;%s;%s;%s;%s;%s;%s;%s;%s" % (
                 idp['displayname'].split(';')[1].split('==')[0],
                 idp['entityID'],
                 idp['registrationAuthority'],
                 strTechContacts,
                 strSuppContacts,
                 'ERROR',
                 sps[0],
                 result[0],
                 sps[1],
                 result[1]))
    
    # MAIN
    if __name__=="__main__":
    
       eccs2log = getLogger(ECCS2RESULTSLOG, ECCS2LOGSPATH, "INFO")
       eccs2checksLog = getLogger(ECCS2CHECKSLOG, ECCS2LOGSPATH, "INFO")
    
       sps = ["https://sp24-test.garr.it/secure", "https://attribute-viewer.aai.switch.ch/eds/"]
    
       parser = argparse.ArgumentParser(description='Checks if the input IdP consumed correctly eduGAIN metadata by accessing two different SPs')
       parser.add_argument("idpJson", metavar="idpJson", nargs=1, help="An IdP in Json format")
    
       args = parser.parse_args()
    
       idp = json.loads(args.idpJson[0])
    
       # Disable SSL requests warning messages
       requests.packages.urllib3.disable_warnings()
    
       # Configure Web-driver
       chrome_options = webdriver.ChromeOptions()
       chrome_options.add_argument('--headless')
       chrome_options.add_argument('--no-sandbox')
       chrome_options.add_argument('--disable-dev-shm-usage')
       chrome_options.add_argument('--ignore-certificate-errors')
       chrome_options.add_argument('--start-maximized')
       chrome_options.add_argument('--disable-extensions')
    
       LOGGER.setLevel(ECCS2SELENIUMLOGLEVEL)
    
       driver = webdriver.Chrome('chromedriver', options=chrome_options,  service_args=['--log-level=%d' % ECCS2SELENIUMLOGLEVEL, '--log-path=%s' % ECCS2SELENIUMLOG])
       # Utility for DEBUG
       #driver = webdriver.Chrome('chromedriver', options=chrome_options,  service_args=['--verbose', '--log-path=%s' % ECCS2SELENIUMLOG])
    
       # Configure timeouts: 30 sec
       driver.set_page_load_timeout(30)
       driver.set_script_timeout(30)
    
       checkIdp(idp,sps,eccs2log,eccs2checksLog,driver)
    
       #driver.delete_all_cookies()
       driver.close()
       driver.quit()
    
       # Kill process to release resources and to avoid zombies - this reaise an issue
       #pid = os.getpid()
       #os.kill(pid, signal.SIGTERM)