Skip to content
Snippets Groups Projects
Commit 8aece9f0 authored by Marco Malavolti's avatar Marco Malavolti
Browse files

Add mode test-only on a single IdP

parent 506890db
Branches
Tags
No related merge requests found
......@@ -35,7 +35,7 @@ def getIDPfqdn(entityIDidp):
# The function check that the IdP recognized the SP by presenting its Login page.
# If the IdP Login page contains "username" and "password" fields, than the test is passed.
def checkIdP(sp,idp):
def checkIdP(sp,idp,test):
# Chromedriver MUST be instanced here to avoid problems with SESSION
# Disable SSL requests warning messages
......@@ -60,15 +60,23 @@ def checkIdP(sp,idp):
if (idp['registrationAuthority'] in federation_blacklist):
check_time = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S') + 'Z'
with open("%s/%s/%s---%s.html" % (ECCS2HTMLDIR,DAY,fqdn_idp,fqdn_sp),"w") as html:
html.write("Federation excluded from check")
if (test is not True):
with open("%s/%s/%s---%s.html" % (ECCS2HTMLDIR,DAY,fqdn_idp,fqdn_sp),"w") as html:
html.write("Federation excluded from check")
else:
print("Federation excluded from check")
return (idp['entityID'],wayfless_url,check_time,"NULL","DISABLED")
if (idp['entityID'] in entities_blacklist):
check_time = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S') + 'Z'
with open("%s/%s/%s---%s.html" % (ECCS2HTMLDIR,DAY,fqdn_idp,fqdn_sp),"w") as html:
html.write("Identity Provider excluded from check")
if (test is not True):
with open("%s/%s/%s---%s.html" % (ECCS2HTMLDIR,DAY,fqdn_idp,fqdn_sp),"w") as html:
html.write("Identity Provider excluded from check")
else:
print("Identity Provider excluded from check")
return (idp['entityID'],wayfless_url,check_time,"NULL","DISABLED")
# Open SP, select the IDP from the EDS and press 'Enter' to reach the IdP login page to check
......@@ -78,14 +86,20 @@ def checkIdP(sp,idp):
page_source = driver.page_source
samlrequest_url = driver.current_url
# Put the page_source into an appropriate HTML file
with open("%s/%s/%s---%s.html" % (ECCS2HTMLDIR,DAY,fqdn_idp,fqdn_sp),"w") as html:
html.write(page_source)
if (test is not True):
# Put the page_source into an appropriate HTML file
with open("%s/%s/%s---%s.html" % (ECCS2HTMLDIR,DAY,fqdn_idp,fqdn_sp),"w") as html:
html.write(page_source)
else:
print("[page_source of %s]\n%s" % (fqdn_idp,page_source))
except TimeoutException as e:
# Put an empty string into the page_source file
with open("%s/%s/%s---%s.html" % (ECCS2HTMLDIR,DAY,fqdn_idp,fqdn_sp),"w") as html:
html.write("")
if (test is not True):
# Put an empty string into the page_source file
with open("%s/%s/%s---%s.html" % (ECCS2HTMLDIR,DAY,fqdn_idp,fqdn_sp),"w") as html:
html.write("")
else:
print("[page_source of %s]\nNo source code" % (fqdn_idp))
return (idp['entityID'],wayfless_url,check_time,"(failed)","Timeout")
except NoSuchElementException as e:
......@@ -169,7 +183,7 @@ def getDisplayName(display_name):
# Append the result of the check on a file
def storeECCS2result(idp,check_results,idp_status):
def storeECCS2result(idp,check_results,idp_status,test):
# Build the contacts lists: technical/support
list_technical_contacts = getIdPContacts(idp,'technical')
......@@ -178,48 +192,71 @@ def storeECCS2result(idp,check_results,idp_status):
str_technical_contacts = ','.join(list_technical_contacts)
str_support_contacts = ','.join(list_support_contacts)
# IdP-DisplayName;IdP-entityID;IdP-RegAuth;IdP-tech-ctc-1,IdP-tech-ctc-2;IdP-supp-ctc-1,IdP-supp-ctc-2;IdP-ECCS-Status;SP-wayfless-url-1;SP-check-time-1;SP-status-code-1;SP-result-1;SP-wayfless-url-2;SP-check-time-2;SP-status-code-2;SP-result-2
with open("%s/%s" % (ECCS2OUTPUTDIR,ECCS2RESULTSLOG), 'a') as f:
f.write("%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s\n" % (
getDisplayName(idp['displayname']), # IdP-DisplayName
idp['entityID'], # IdP-entityID
idp['registrationAuthority'], # IdP-RegAuth
str_technical_contacts, # IdP-TechCtcsList
str_support_contacts, # IdP-SuppCtcsList
idp_status, # IdP-ECCS-Status
check_results[0][1], # SP-wayfless-url-1
check_results[0][2], # SP-check-time-1
check_results[0][3], # SP-status-code-1
check_results[0][4], # SP-result-1
check_results[1][1], # SP-wayfless-url-2
check_results[1][2], # SP-check-time-2
check_results[1][3], # SP-status-code-2
check_results[1][4])) # SP-result-2
if (test is not True):
# IdP-DisplayName;IdP-entityID;IdP-RegAuth;IdP-tech-ctc-1,IdP-tech-ctc-2;IdP-supp-ctc-1,IdP-supp-ctc-2;IdP-ECCS-Status;SP-wayfless-url-1;SP-check-time-1;SP-status-code-1;SP-result-1;SP-wayfless-url-2;SP-check-time-2;SP-status-code-2;SP-result-2
with open("%s/%s" % (ECCS2OUTPUTDIR,ECCS2RESULTSLOG), 'a') as f:
f.write("%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s\n" % (
getDisplayName(idp['displayname']), # IdP-DisplayName
idp['entityID'], # IdP-entityID
idp['registrationAuthority'], # IdP-RegAuth
str_technical_contacts, # IdP-TechCtcsList
str_support_contacts, # IdP-SuppCtcsList
idp_status, # IdP-ECCS-Status
check_results[0][1], # SP-wayfless-url-1
check_results[0][2], # SP-check-time-1
check_results[0][3], # SP-status-code-1
check_results[0][4], # SP-result-1
check_results[1][1], # SP-wayfless-url-2
check_results[1][2], # SP-check-time-2
check_results[1][3], # SP-status-code-2
check_results[1][4])) # SP-result-2
else:
print("\nECCS2:")
print("%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s\n" % (
getDisplayName(idp['displayname']), # IdP-DisplayName
idp['entityID'], # IdP-entityID
idp['registrationAuthority'], # IdP-RegAuth
str_technical_contacts, # IdP-TechCtcsList
str_support_contacts, # IdP-SuppCtcsList
idp_status, # IdP-ECCS-Status
check_results[0][1], # SP-wayfless-url-1
check_results[0][2], # SP-check-time-1
check_results[0][3], # SP-status-code-1
check_results[0][4], # SP-result-1
check_results[1][1], # SP-wayfless-url-2
check_results[1][2], # SP-check-time-2
check_results[1][3], # SP-status-code-2
check_results[1][4])) # SP-result-2
# Check an IdP with 2 SPs.
def check(idp,sps):
def check(idp,sps,test):
check_results = []
for sp in sps:
result = checkIdP(sp,idp)
result = checkIdP(sp,idp,test)
if result is not None:
check_results.append(result)
if len(check_results) == 2:
with open("%s/%s" % (ECCS2OUTPUTDIR,ECCS2CHECKSLOG), 'a') as f:
for elem in check_results:
f.write(";".join(elem))
f.write("\n")
if (test is not True):
with open("%s/%s" % (ECCS2OUTPUTDIR,ECCS2CHECKSLOG), 'a') as f:
for elem in check_results:
f.write(";".join(elem))
f.write("\n")
else:
print("\nECCS2CHECKS:")
for elem in check_results:
print(";".join(elem))
# If all checks are 'OK', than the IdP consuming correctly eduGAIN Metadata.
if (check_results[0][4] == check_results[1][4] == "OK"):
storeECCS2result(idp,check_results,'OK')
storeECCS2result(idp,check_results,'OK',test)
elif (check_results[0][4] == check_results[1][4] == "DISABLED"):
storeECCS2result(idp,check_results,'DISABLED')
storeECCS2result(idp,check_results,'DISABLED',test)
else:
storeECCS2result(idp,check_results,'ERROR')
storeECCS2result(idp,check_results,'ERROR',test)
# MAIN
......@@ -229,6 +266,7 @@ if __name__=="__main__":
parser = argparse.ArgumentParser(description='Checks if the input IdP consumed correctly eduGAIN metadata by accessing two different SPs')
parser.add_argument("idpJson", metavar="idpJson", nargs=1, help="An IdP in Json format")
parser.add_argument("--test", action='store_true', help="Test the IdP without effects")
args = parser.parse_args()
......@@ -236,4 +274,4 @@ if __name__=="__main__":
Path("%s/%s" % (ECCS2HTMLDIR,DAY)).mkdir(parents=True, exist_ok=True) # Create dir needed to page_source content
check(idp,sps)
check(idp,sps,args.test)
......@@ -30,6 +30,9 @@ ECCS2LOGSDIR = "%s/logs" % ECCS2DIR
ECCS2STDOUT = "%s/stdout_%s.log" % (ECCS2LOGSDIR,DAY)
ECCS2STDERR = "%s/stderr_%s.log" % (ECCS2LOGSDIR,DAY)
ECCS2FAILEDCMD = "%s/failed-cmd.sh" % ECCS2LOGSDIR
ECCS2STDOUTIDP = "%s/stdout_idp_%s.log" % (ECCS2LOGSDIR,DAY)
ECCS2STDERRIDP = "%s/stderr_idp_%s.log" % (ECCS2LOGSDIR,DAY)
ECCS2FAILEDCMDIDP = "%s/failed-cmd-idp.sh" % ECCS2LOGSDIR
# Number of processes to run in parallel
ECCS2NUMPROCESSES = 25
......
#!/usr/bin/env python3.8
import argparse
import asyncio
import datetime
import eccs2properties
......@@ -7,7 +8,7 @@ import json
import time
from utils import getListFeds, getListEccsIdps, getRegAuthDict, getIdpList
from eccs2properties import ECCS2FAILEDCMD, ECCS2STDOUT, ECCS2STDERR, ECCS2DIR, ECCS2NUMPROCESSES, ECCS2LISTIDPSURL, ECCS2LISTIDPSFILE, ECCS2LISTFEDSURL, ECCS2LISTFEDSFILE
from eccs2properties import ECCS2FAILEDCMD, ECCS2FAILEDCMDIDP, ECCS2STDOUT, ECCS2STDERR, ECCS2STDOUTIDP, ECCS2STDERRIDP, ECCS2DIR, ECCS2NUMPROCESSES, ECCS2LISTIDPSURL, ECCS2LISTIDPSFILE, ECCS2LISTFEDSURL, ECCS2LISTFEDSFILE
from subprocess import Popen,PIPE
......@@ -67,6 +68,13 @@ async def main(cmd_list,stdout_file,stderr_file,cmd_file):
# MAIN
if __name__=="__main__":
parser = argparse.ArgumentParser(description='This script will call another script in parallel to check one or more IdP on the correct consuming of the eduGAIN metadata')
parser.add_argument("--idp", metavar="entityid", dest="idp_entityid", nargs=1, help="An IdP entityID")
parser.add_argument("--test", action='store_true', dest="test", help="Test without effects")
args = parser.parse_args()
start = time.time()
# Setup list_feds
......@@ -79,27 +87,47 @@ if __name__=="__main__":
dest_file = ECCS2LISTIDPSFILE
list_eccs_idps = getListEccsIdps(url, dest_file)
stdout_file = open(ECCS2STDOUT,"w+")
stderr_file = open(ECCS2STDERR,"w+")
cmd_file = open(ECCS2FAILEDCMD,"w+")
# Prepare input file for ECCS2
regAuthDict = getRegAuthDict(list_feds)
if (args.idp_entityid[0]):
stdout_file = open(ECCS2STDOUTIDP,"w+")
stderr_file = open(ECCS2STDERRIDP,"w+")
cmd_file = open(ECCS2FAILEDCMDIDP,"w+")
idpJsonList = getIdpList(list_eccs_idps,idp_entityid=args.idp_entityid[0])
for name,regAuth in regAuthDict.items():
idpJsonList = getIdpList(list_eccs_idps,regAuth)
if (args.test):
cmd = "%s/eccs2.py \'%s\' --test" % (ECCS2DIR,json.dumps(idpJsonList[0]))
print(cmd)
else:
cmd = "%s/eccs2.py \'%s\'" % (ECCS2DIR,json.dumps(idpJsonList[0]))
print(cmd)
num_idps = len(idpJsonList)
cmd_list = [["%s/eccs2.py \'%s\'" % (ECCS2DIR, json.dumps(idp))] for idp in idpJsonList]
proc_list = [cmd]
asyncio.run(main(proc_list,stdout_file,stderr_file,cmd_file))
proc_list = []
count = 0
while (count < num_idps):
cmd = "".join(cmd_list.pop())
proc_list.append(cmd)
count = count + 1
else:
stdout_file = open(ECCS2STDOUT,"w+")
stderr_file = open(ECCS2STDERR,"w+")
cmd_file = open(ECCS2FAILEDCMD,"w+")
# Prepare input file for ECCS2
regAuthDict = getRegAuthDict(list_feds)
for name,regAuth in regAuthDict.items():
idpJsonList = getIdpList(list_eccs_idps,regAuth)
num_idps = len(idpJsonList)
if (arg.test is not True):
cmd_list = [["%s/eccs2.py \'%s\'" % (ECCS2DIR, json.dumps(idp))] for idp in idpJsonList]
else:
cmd_list = [["%s/eccs2.py \'%s\' --test" % (ECCS2DIR, json.dumps(idp))] for idp in idpJsonList]
proc_list = []
count = 0
while (count < num_idps):
cmd = "".join(cmd_list.pop())
proc_list.append(cmd)
count = count + 1
asyncio.run(main(proc_list,stdout_file,stderr_file,cmd_file))
asyncio.run(main(proc_list,stdout_file,stderr_file,cmd_file))
end = time.time()
print("Time taken in hh:mm:ss - ", str(datetime.timedelta(seconds=end - start)))
......@@ -4,6 +4,7 @@ import json
import logging
import pathlib
import requests
import sys
from eccs2properties import ECCS2SELENIUMLOGDIR, ECCS2SELENIUMPAGELOADTIMEOUT, ECCS2SELENIUMSCRIPTTIMEOUT
from selenium import webdriver
......@@ -24,10 +25,13 @@ def getRegAuthDict(list_feds):
# Returns a list of IdP for a single federation
def getIdpList(list_eccs_idps,reg_auth=None):
def getIdpList(list_eccs_idps,reg_auth=None,idp_entityid=None):
fed_idp_list = []
for idp in list_eccs_idps:
if (reg_auth):
if (idp_entityid):
if (idp['entityID'] == idp_entityid):
fed_idp_list.append(idp)
elif (reg_auth):
if (idp['registrationAuthority'] == reg_auth):
fed_idp_list.append(idp)
else:
......@@ -127,8 +131,8 @@ def getDriver(fqdn_idp=None,debugSelenium=False):
else:
driver = webdriver.Chrome('chromedriver', options=chrome_options)
except WebDriverException as e:
print("!!! WEB DRIVER EXCEPTION - RUN AGAIN THE COMMAND!!!")
print (e.__str__())
sys.stderr.write("!!! WEB DRIVER EXCEPTION - RUN AGAIN THE COMMAND!!!")
sys.stderr.write(e.__str__())
return None
# Configure timeouts
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment