Skip to content
Snippets Groups Projects
Commit 3eadc8b9 authored by Marco Malavolti's avatar Marco Malavolti
Browse files

Refactoring

parent 6f0500cf
No related branches found
No related tags found
No related merge requests found
......@@ -3,10 +3,8 @@
import argparse
import json
import logging
import time
import os
import eccs2properties
import psutil
import signal
import re
import requests
......@@ -21,105 +19,47 @@ from selenium.common.exceptions import TimeoutException
"""
Apre un SP con Discovery Service, seleziona l'IdP di cui fare il test e lo raggiunge iniziando una vera sessione via browser.
A noi serve fare un test di accesso e presentazione della pagina di Login su 2 SP dislocati geograficamente in punti diversi.
Per questo sono stati scelti SP24(IDEM) e l'Attribute Viewer (SWITCH). Se il test fallisce su entrambi, allora non va bene.
Questo script funziona SOLO con SP aventi Embedded Discovery Service come DS.
This script use Selenium and Chromium to select the IdP to check from a Shibboleth SP with the Shibboleth Embedded Discovery Service installed and configured to answer to all eduGAIN IdPs.
The SPs used to check an IdP will be SP24(IDEM) and Attribute Viewer (SWITCH).
The check will be passed when both SPs will return the authentication page of the IdP checked.
"""
def getIdpListFromUrl():
import certifi
import urllib3
import json
manager = urllib3.PoolManager(
cert_reqs='CERT_REQUIRED',
ca_certs=certifi.where()
)
url = "https://technical.edugain.org/api.php?action=list_eccs_idps"
json_data = manager.request('GET', url)
data = json.loads(json_data.data.decode('utf-8'))
return data
def getIdpListFromFile():
import json
#with open('list_eccs_idps-idem.txt','r',encoding='utf-8') as f:
with open('federation_idps.txt','r',encoding='utf-8') as f:
json_data = json.loads(f.read())
return json_data
def checkIdP(sp,idp,logger):
# Disable SSL requests warning messages
requests.packages.urllib3.disable_warnings()
# Configure Web-driver
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--ignore-certificate-errors')
driver = webdriver.Chrome('chromedriver', options=chrome_options, service_args=['--log-path=./selenium_chromedriver.log'])
#driver = webdriver.Chrome('chromedriver', chrome_options=chrome_options, service_args=['--verbose', '--log-path=./selenium_chromedriver.log'])
#driver = webdriver.Chrome('chromedriver', chrome_options=chrome_options)
# Configure timeouts: 30 sec
driver.set_page_load_timeout(30)
driver.set_script_timeout(30)
def checkIdP(sp,idp,logger,driver):
# Configure Blacklists
federation_blacklist = [
'http://www.surfconext.nl/',
'https://www.wayf.dk',
'http://feide.no/'
]
entities_blacklist = [
'https://idp.eie.gr/idp/shibboleth',
'https://gn-vho.grnet.gr/idp/shibboleth',
'https://wtc.tu-chemnitz.de/shibboleth',
'https://wtc.tu-chemnitz.de/shibboleth',
'https://idp.fraunhofer.de/idp/shibboleth',
'https://login.hs-owl.de/nidp/saml2/metadata',
'https://idp.dfn-cert.de/idp/shibboleth'
]
federation_blacklist = eccs2properties.FEDS_BLACKLIST
entities_blacklist = eccs2properties.IDPS_BLACKLIST
if (idp['registrationAuthority'] in federation_blacklist):
logger.info("%s;%s;NULL;Federation excluded from checks" % (idp['entityID'],sp))
driver.close()
driver.quit()
return "DISABLED"
if (idp['entityID'] in entities_blacklist):
logger.info("%s;%s;NULL;IdP excluded from checks" % (idp['entityID'],sp))
driver.close()
driver.quit()
return "DISABLED"
# Open SP, select the IDP from the EDS and press 'Enter' to reach the IdP login page to check
try:
driver.get(sp)
driver.find_element_by_id("idpSelectInput").send_keys(idp['entityID'] + Keys.ENTER)
driver.find_element_by_id("username")
driver.find_element_by_id("password")
except TimeoutException as e:
driver.delete_all_cookies()
print("TIMEOUT - driver.current_url: %s" % (driver.current_url))
status_code = requests.get(driver.current_url, verify=False).status_code
logger.info("%s;%s;%s;TIMEOUT" % (idp['entityID'],sp,status_code))
driver.close()
driver.quit()
logger.info("%s;%s;999;TIMEOUT" % (idp['entityID'],sp))
return "TIMEOUT"
except NoSuchElementException as e:
driver.delete_all_cookies()
print("!!! NO SUCH ELEMENT EXCEPTION !!!")
print(e.__str__())
pass
except WebDriverException as e:
if "ConnectionRefusedError" in e.__str__():
logger.info("%s;%s;000;ConnectionError" % (idp['entityID'],sp))
return "Connection-Error"
else:
print("!!! UN-HANDLE WEB DRIVER EXCEPTION !!!")
raise e
except:
print("!!! UN-HANDLE OTHER EXCEPTION !!!")
raise e
pattern_metadata = "Unable.to.locate(\sissuer.in|).metadata(\sfor|)|no.metadata.found|profile.is.not.configured.for.relying.party|Cannot.locate.entity|fail.to.load.unknown.provider|does.not.recognise.the.service|unable.to.load.provider|Nous.n'avons.pas.pu.(charg|charger).le.fournisseur.de service|Metadata.not.found|application.you.have.accessed.is.not.registered.for.use.with.this.service|Message.did.not.meet.security.requirements"
......@@ -130,29 +70,35 @@ def checkIdP(sp,idp,logger):
username_found = re.search(pattern_username,driver.page_source, re.I)
password_found = re.search(pattern_password,driver.page_source, re.I)
if(metadata_not_found):
#print("MD-NOT-FOUND - driver.current_url: %s" % (driver.current_url))
status_code = requests.get(driver.current_url, verify=False).status_code
logger.info("%s;%s;%s;No-eduGAIN-Metadata" % (idp['entityID'],sp,status_code))
try:
r = requests.get(driver.current_url, verify=False)
status_code = r.status_code
except requests.exceptions.ConnectionError as e:
driver.delete_all_cookies()
driver.close()
driver.quit()
logger.info("%s;%s;000;ConnectionError" % (idp['entityID'],sp))
return "Connection-Error"
except requests.exceptions.RequestException as e:
driver.delete_all_cookies()
driver.close()
driver.quit()
print("!!! UN-HANDLE REQUEST EXCEPTION !!!")
raise SystemExit(e)
if(metadata_not_found):
#print("MD-NOT-FOUND - driver.current_url: %s" % (driver.current_url))
logger.info("%s;%s;%s;No-eduGAIN-Metadata" % (idp['entityID'],sp,status_code))
return "No-eduGAIN-Metadata"
elif not username_found or not password_found:
#print("INVALID-FORM - entityID: %s, sp: %s, driver.current_url: %s" % (idp['entityID'],sp,driver.current_url))
status_code = requests.get(driver.current_url, verify=False).status_code
logger.info("%s;%s;%s;Invalid-Form" % (idp['entityID'],sp,status_code))
driver.delete_all_cookies()
driver.close()
driver.quit()
return "Invalid Form"
return "Invalid-Form"
else:
#print("MD-FOUND - driver.current_url: %s" % (driver.current_url))
status_code = requests.get(driver.current_url, verify=False).status_code
logger.info("%s;%s;%s;OK" % (idp['entityID'],sp,status_code))
driver.delete_all_cookies()
driver.close()
driver.quit()
return "OK"
......@@ -196,10 +142,10 @@ def getIdPContacts(idp,contactType):
return ctcList
def checkIdp(idp,sps,eccs2log,eccs2checksLog):
def checkIdp(idp,sps,eccs2log,eccs2checksLog,driver):
result = []
for sp in sps:
resultCheck = checkIdP(sp,idp,eccs2checksLog)
resultCheck = checkIdP(sp,idp,eccs2checksLog,driver)
result.append(resultCheck)
listTechContacts = getIdPContacts(idp,'technical')
......@@ -254,7 +200,6 @@ if __name__=="__main__":
eccs2checksLog = getLogger("logs/"+eccs2properties.ECCS2CHECKSLOGPATH,"INFO")
sps = ["https://sp24-test.garr.it/secure", "https://attribute-viewer.aai.switch.ch/eds/"]
#sps = ["https://attribute-viewer.aai.switch.ch/eds/", "https://attribute-viewer.aai.switch.ch/eds/"]
parser = argparse.ArgumentParser(description='Checks if the input IdP consumed correctly eduGAIN metadata by accessing two different SPs')
parser.add_argument("idpJson", metavar="idpJson", nargs=1, help="An IdP in Json format")
......@@ -263,4 +208,32 @@ if __name__=="__main__":
idp = json.loads(args.idpJson[0])
checkIdp(idp,sps,eccs2log,eccs2checksLog)
# Disable SSL requests warning messages
requests.packages.urllib3.disable_warnings()
# Configure Web-driver
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--ignore-certificate-errors')
chrome_options.add_argument('--start-maximized')
chrome_options.add_argument('--disable-extensions')
#driver = webdriver.Chrome('chromedriver', options=chrome_options, service_args=['--log-path=./selenium_chromedriver.log'])
#driver = webdriver.Chrome('chromedriver', options=chrome_options, service_args=['--verbose', '--log-path=./selenium_chromedriver.log'])
driver = webdriver.Chrome('chromedriver', options=chrome_options)
# Configure timeouts: 30 sec
driver.set_page_load_timeout(30)
driver.set_script_timeout(30)
checkIdp(idp,sps,eccs2log,eccs2checksLog,driver)
driver.delete_all_cookies()
driver.close()
driver.quit()
# Kill process to release resources and to avoid zombies
# pid = os.getpid()
# os.kill(pid, signal.SIGTERM)
......@@ -3,46 +3,69 @@
import asyncio
import eccs2properties
import json
import pathlib
import requests
import sys
import time
from subprocess import Popen,PIPE
def getIdPs():
import certifi
import urllib3
import json
# returns a Dict on "{ nameFed:reg_auth }"
def getRegAuthDict(list_feds):
regAuth_dict = {}
for key,value in list_feds.items():
name = value['name']
reg_auth = value['reg_auth']
regAuth_dict[name] = reg_auth
return regAuth_dict
manager = urllib3.PoolManager(
cert_reqs='CERT_REQUIRED',
ca_certs=certifi.where()
)
url = "https://technical.edugain.org/api.php?action=list_eccs_idps"
idp_json = manager.request('GET', url)
# returns a list of IdP for a single federation
def getIdpList(list_eccs_idps,reg_auth):
idp_dict = json.loads(idp_json.data.decode('utf-8'))
fed_idp_list = []
for idp in list_eccs_idps:
if (idp['registrationAuthority'] == reg_auth):
fed_idp_list.append(idp)
idp_list = []
return fed_idp_list
#federation = input("Insert the registrationAuthority: ")
federation = "http://www.idem.garr.it/"
for idp in idp_dict:
if (idp['registrationAuthority'] == federation):
idp_list.append(idp)
# Returns a Python Dictionary
def getListFeds(url, filename):
# If file does not exists... download it into the filename
path = pathlib.Path(filename)
if(path.exists() == False):
with open("%s" % (filename), mode="w+", encoding='utf-8') as f:
f.write(requests.get(url).text)
return json.dumps(idp_list)
# then open it and work with local file
with open("%s" % (filename), mode="r", encoding='utf-8') as f:
return json.loads(f.read())
def getIdpListFromFile():
import json
# Returns a Python List
def getListEccsIdps(url, filename):
# If file does not exists... download it into the filename
path = pathlib.Path(filename)
if(path.exists() == False):
with open("%s" % (filename), mode="w+", encoding='utf-8') as f:
f.write(requests.get(url).text)
#with open('list_eccs_idps-idem.txt','r',encoding='utf-8') as f:
with open('federation_idps.txt','r',encoding='utf-8') as f:
json_data = json.loads(f.read())
return json_data
# then open it and work with local file
with open("%s" % (filename), mode="r", encoding='utf-8') as f:
return json.loads(f.read())
# Prepare input file for ECCS2
def genEccs2input(reg_auth_dict):
for name,regAuth in reg_auth_dict.items():
fed_idp_list = getIdpList(list_eccs_idps,regAuth)
filename = "/tmp/data/inputEccs2/%s.txt" % name
with open("%s" % (filename), mode="w+", encoding='utf-8') as f:
f.write(','.join(str(idp) for idp in fed_idp_list))
async def run(name,queue,stdout_file,stderr_file):
while True:
......@@ -59,9 +82,9 @@ async def run(name,queue,stdout_file,stderr_file):
stdout, stderr = await proc.communicate()
if stdout:
stdout_file.write(f'[stdout]\n{stdout.decode()}')
stdout_file.write(f'-----\n[cmd-out]\n{cmd}\n[stdout]\n{stdout.decode()}')
if stderr:
stderr_file.write(f'[stderr]\n{stderr.decode()}\n\n[cmd]\n{cmd}')
stderr_file.write(f'-----\n[cmd-err]\n{cmd}\n[stderr]\n{stderr.decode()}')
# Notify the queue that the "work cmd" has been processed.
queue.task_done()
......@@ -77,7 +100,9 @@ async def main(cmd_list,stdout_file,stderr_file):
# Create worker tasks to process the queue concurrently.
tasks = []
for i in range(30):
#for i in range(15): # !!!-WORKING-!!!
#for i in range(30): # !!!-WORSTE-!!!
for i in range(10):
task = asyncio.create_task(run("cmd-{%d}" % i, queue, stdout_file, stderr_file))
tasks.append(task)
......@@ -97,30 +122,40 @@ async def main(cmd_list,stdout_file,stderr_file):
# MAIN
if __name__=="__main__":
start = time.time()
start = time.time()
# Setup list_feds
url = 'https://technical.edugain.org/api.php?action=list_feds&opt=1'
filename = "/tmp/data/list_feds.txt"
list_feds = getListFeds(url, filename)
# Setup list_eccs_idps
url = 'https://technical.edugain.org/api.php?action=list_eccs_idps'
filename = "/tmp/data/list_eccs_idps.txt"
list_eccs_idps = getListEccsIdps(url, filename)
stdout_file = open(eccs2properties.ECCS2STDOUT,"w+")
stderr_file = open(eccs2properties.ECCS2STDERR,"w+")
'''
data = getIdPs()
# Prepare input file for ECCS2
regAuthDict = getRegAuthDict(list_feds)
#genEccs2input(regAuthDict)
f = open('federation_idps.txt', 'w')
f.write(data)
f.close()
'''
stdout_file = open(eccs2properties.ECCS2STDOUT,"w+")
stderr_file = open(eccs2properties.ECCS2STDERR,"w+")
for name,regAuth in regAuthDict.items():
idpJsonList = getIdpList(list_eccs_idps,regAuth)
idpJsonList = getIdpListFromFile()
num_idps = len(idpJsonList)
cmd_list = [["%s/eccs2.py \'%s\'" % (eccs2properties.ECCS2PATH, json.dumps(idp))] for idp in idpJsonList]
num_idps = len(idpJsonList)
cmd_list = [["%s/eccs2.py \'%s\'" % (eccs2properties.ECCS2PATH, json.dumps(idp))] for idp in idpJsonList]
proc_list = []
count = 0
while (count < num_idps):
cmd = "".join(cmd_list.pop())
proc_list.append(cmd)
count = count + 1
proc_list = []
count = 0
while (count < num_idps):
cmd = "".join(cmd_list.pop())
proc_list.append(cmd)
count = count + 1
asyncio.run(main(proc_list,stdout_file,stderr_file))
asyncio.run(main(proc_list,stdout_file,stderr_file))
# asyncio.run(main(cmd_list,stdout_file,stderr_file))
end = time.time()
print("Time taken in seconds - ", end - start)
end = time.time()
print("Time taken in seconds - ", end - start)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment