Skip to content
Snippets Groups Projects
Commit 2f5c6345 authored by Marco Malavolti's avatar Marco Malavolti
Browse files

Enriched 'utils.py' with RST comments

parent 9db2cd7d
Branches
Tags
No related merge requests found
...@@ -81,10 +81,10 @@ if __name__=="__main__": ...@@ -81,10 +81,10 @@ if __name__=="__main__":
start = time.time() start = time.time()
# Setup list_feds # Setup list_feds
list_feds = utils.get_list_feds(e_p.ECCS_LISTFEDSURL, e_p.ECCS_LISTFEDSFILE) list_feds = utils.get_list_from_url(e_p.ECCS_LISTFEDSURL, e_p.ECCS_LISTFEDSFILE)
# Setup list_eccs_idps # Setup list_eccs_idps
list_eccs_idps = utils.get_list_eccs_idps(e_p.ECCS_LISTIDPSURL, e_p.ECCS_LISTIDPSFILE) list_eccs_idps = utils.get_list_from_url(e_p.ECCS_LISTIDPSURL, e_p.ECCS_LISTIDPSFILE)
if (args.idp_entityid): if (args.idp_entityid):
stdout_file = open(e_p.ECCS_STDOUTIDP,"w+") stdout_file = open(e_p.ECCS_STDOUTIDP,"w+")
......
...@@ -27,21 +27,35 @@ from urllib3.util import parse_url ...@@ -27,21 +27,35 @@ from urllib3.util import parse_url
from urllib.parse import urlparse, urlencode from urllib.parse import urlparse, urlencode
def sha1(idp_entity_id): def sha1(idp_entity_id):
"""Returns the SHA1 of the input value
:param idp_entity_id: input value
:return: SHA1 of the entityID
:rtype: string
"""
import hashlib import hashlib
result = hashlib.sha1(idp_entity_id.encode()) result = hashlib.sha1(idp_entity_id.encode())
return result.hexdigest() return result.hexdigest()
# Return a label useful for a filename
def get_label(url_or_urn): def get_label(url_or_urn):
"""Returns a label usable for filenames
:param url_or_urn: input value
:return: a label
:rtype: string
"""
if url_or_urn.startswith('http'): if url_or_urn.startswith('http'):
return parse_url(url_or_urn)[2] return parse_url(url_or_urn)[2]
else: else:
return parse_url(url_or_urn)[4].lstrip('/') return parse_url(url_or_urn)[4].lstrip('/')
# Returns a Dict of "{ nameFed:reg_auth }"
def get_reg_auth_dict(list_feds): def get_reg_auth_dict(list_feds):
"""Returns a dictionary of Federation and their registration authorities
:param list_feds: Python list containing all federations
:return: a dictionary of '{ nameFed:reg_auth }'
:rtype: dict
"""
regAuth_dict = {} regAuth_dict = {}
for key,value in list_feds.items(): for key,value in list_feds.items():
...@@ -52,9 +66,20 @@ def get_reg_auth_dict(list_feds): ...@@ -52,9 +66,20 @@ def get_reg_auth_dict(list_feds):
return regAuth_dict return regAuth_dict
# Returns a list of IdP for a single federation
def get_idp_list(list_eccs_idps,reg_auth=None,idp_entityid=None): def get_idp_list(list_eccs_idps,reg_auth=None,idp_entityid=None):
"""Returns a list of IdPs or a list of only one IdP
- Only one if 'idp_entityid' parameter is used
- All IdPs of a specific Federation if 'reg_auth' parameter is used
- All eduGAIN IdPs if no parameter is used
:param list_eccs_idps: Python list containing all IdPs provided to ECCS
:param reg_auth: a string containing a registration authority URL
:param idp_entityid: a string containing an IdP entityID
:return: a list of IdPs
:rtype: list
"""
fed_idp_list = [] fed_idp_list = []
for idp in list_eccs_idps: for idp in list_eccs_idps:
if (idp_entityid): if (idp_entityid):
...@@ -68,39 +93,34 @@ def get_idp_list(list_eccs_idps,reg_auth=None,idp_entityid=None): ...@@ -68,39 +93,34 @@ def get_idp_list(list_eccs_idps,reg_auth=None,idp_entityid=None):
return fed_idp_list return fed_idp_list
def get_list_from_url(url, dest_file):
"""Downloads the content of a web page retrieved by its url into a directory and then use the saved file to return a Python list of the converted version of its content
# Download all eduGAIN Federations from URL, store them on a local file and returns a Python Dictionary :param url: URL to download
def get_list_feds(url, dest_file): :param des_file: destination file
from pathlib import Path :return: the resulting Python list
:rtype: list
# If file does not exists... download it into the dest_file """
path = pathlib.Path(dest_file)
if(path.exists() == False):
with open("%s" % (dest_file), mode="w+", encoding='utf-8') as f:
f.write(requests.get(url).text)
# then open it and work with local file
with open("%s" % (dest_file), mode="r", encoding='utf-8') as f:
return json.loads(f.read().replace("'","'"))
# Download all eduGAIN IdPs from URL, store them on a local file and returns a Python List
def get_list_eccs_idps(url, dest_file):
from pathlib import Path from pathlib import Path
# If file does not exists... download it into the dest_file
path = pathlib.Path(dest_file) path = pathlib.Path(dest_file)
if(path.exists() == False): if(path.exists() == False):
with open("%s" % (dest_file), mode="w+", encoding='utf-8') as f: with open("%s" % (dest_file), mode="w+", encoding='utf-8') as f:
f.write(requests.get(url).text) f.write(requests.get(url).text)
# then open it and work with local file
with open("%s" % (dest_file), mode="r", encoding='utf-8') as f: with open("%s" % (dest_file), mode="r", encoding='utf-8') as f:
return json.loads(f.read().replace("'","'")) return json.loads(f.read().replace("'","'"))
# Use logger to produce files consumed by ECCS API
def get_logger(path, filename, mode="a", log_level="DEBUG"): def get_logger(path, filename, mode="a", log_level="DEBUG"):
"""Returns a logger used by ECCS API Development Server to create its log file
:param path: directory path of the files
:param filename: filename of the new file created
:param mode: a(append),w(write),r(read)
:param log_level: log level to use (DEBUG,INFO,WARN,ERROR,CRITICAL)
:return: a logger
:rtype: logger object
"""
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
ch = logging.handlers.RotatingFileHandler(f"{path}/{filename}", mode, 0, 5, 'utf-8') ch = logging.handlers.RotatingFileHandler(f"{path}/{filename}", mode, 0, 5, 'utf-8')
...@@ -127,9 +147,15 @@ def get_logger(path, filename, mode="a", log_level="DEBUG"): ...@@ -127,9 +147,15 @@ def get_logger(path, filename, mode="a", log_level="DEBUG"):
return logger return logger
# Return a list of email address for a specific type of contact
def get_idp_contacts(idp,contactType): def get_idp_contacts(idp,contactType):
"""Returns a list of email address for a specific type of contact
:param idp: dictionary containing the IdP info
:param contactType: type of contact to consider
:return: a python list containing all contacts email addresses
:rtype: list
"""
ctcList = [] ctcList = []
for ctcType in idp['contacts']: for ctcType in idp['contacts']:
if (ctcType == contactType): if (ctcType == contactType):
...@@ -143,9 +169,17 @@ def get_idp_contacts(idp,contactType): ...@@ -143,9 +169,17 @@ def get_idp_contacts(idp,contactType):
ctcList.append('missing email') ctcList.append('missing email')
return ctcList return ctcList
# Write the login page source code into its file
def store_page_source(page_source,idp,sp,test): def store_page_source(page_source,idp,sp,test):
"""Writes the login page source into a specific file
:param page_souce: content to write into a file
:param idp: idp owner of the page source
:param sp: sp who has been used
:param test: flag needed to decide if write on the console instead of on the file
:return: True or False
:rtype: boolean
"""
if (test): if (test):
sys.stdout.write(f"{page_source}") sys.stdout.write(f"{page_source}")
return True return True
...@@ -159,8 +193,16 @@ def store_page_source(page_source,idp,sp,test): ...@@ -159,8 +193,16 @@ def store_page_source(page_source,idp,sp,test):
return False return False
# Get the Google Chrom Selenium Driver # Get the Google Chrome Selenium Driver
def get_driver_selenium(idp=None,sp=None,debugSelenium=False): def get_driver_selenium(idp=None,sp=None,debugSelenium=False):
"""Returns driver needed to perform the ECCS check
:param idp: the idp need to be checked
:param sp: the sp used to check the idp
:param debugSelenium: a flag needed to enable a more verbose logging
:return: selenium driver
:rtype: object
"""
# Configure Web-driver # Configure Web-driver
# https://peter.sh/experiments/chromium-command-line-switches/ # https://peter.sh/experiments/chromium-command-line-switches/
...@@ -196,6 +238,13 @@ def get_driver_selenium(idp=None,sp=None,debugSelenium=False): ...@@ -196,6 +238,13 @@ def get_driver_selenium(idp=None,sp=None,debugSelenium=False):
return driver return driver
def follow_all_nested_iframes(driver): def follow_all_nested_iframes(driver):
"""Returns the page source reached by following all the first <iframe> found on the web pages
:param driver: the selenium driver
:return: the IdP login page source reached
:rtype: string
"""
try: try:
while (driver.find_element(By.XPATH,'//iframe')): while (driver.find_element(By.XPATH,'//iframe')):
driver.switch_to.frame(0) driver.switch_to.frame(0)
...@@ -203,8 +252,8 @@ def follow_all_nested_iframes(driver): ...@@ -203,8 +252,8 @@ def follow_all_nested_iframes(driver):
return driver.page_source return driver.page_source
def deflate_and_base64_encode(string_val): def deflate_and_base64_encode(string_val):
""" """Deflates and the base64 encodes a string
Deflates and the base64 encodes a string
:param string_val: The string to deflate and encode :param string_val: The string to deflate and encode
:return: The deflated and encoded string :return: The deflated and encoded string
""" """
...@@ -212,8 +261,16 @@ def deflate_and_base64_encode(string_val): ...@@ -212,8 +261,16 @@ def deflate_and_base64_encode(string_val):
string_val = string_val.encode('utf-8') string_val = string_val.encode('utf-8')
return base64.b64encode(zlib.compress(string_val)[2:-4]) return base64.b64encode(zlib.compress(string_val)[2:-4])
def generate_login_url(sp_entity_id, sp_http_post_acs_location, idp_http_redirect_sso_location): def generate_login_url(sp_entity_id, sp_http_post_acs_location, idp_http_redirect_sso_location):
"""Returns a login url needed to get the IdP Login page for an SP
:param sp_entity_id: the SP entityID
:param sp_http_post_acs_location: the SP HTTP-POST AssertionConsumerService URL
:param idp_http_redirect_sso_location: the IDP HTTP-Redirect SSO Location
:return: a login url built upon a SAML Authn Request
:rtype: string
"""
authn_request_id = f'_{str(uuid.uuid4()).replace("-", "")}' authn_request_id = f'_{str(uuid.uuid4()).replace("-", "")}'
issue_instant = str(datetime.datetime.now(datetime.timezone.utc).isoformat(timespec='seconds')).replace('+00:00', 'Z') issue_instant = str(datetime.datetime.now(datetime.timezone.utc).isoformat(timespec='seconds')).replace('+00:00', 'Z')
authn_request = '<samlp:AuthnRequest xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol" ' \ authn_request = '<samlp:AuthnRequest xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol" ' \
...@@ -233,6 +290,12 @@ def generate_login_url(sp_entity_id, sp_http_post_acs_location, idp_http_redirec ...@@ -233,6 +290,12 @@ def generate_login_url(sp_entity_id, sp_http_post_acs_location, idp_http_redirec
# ECCS Check made by Selenium # ECCS Check made by Selenium
def check_idp_response_selenium(sp,idp,test): def check_idp_response_selenium(sp,idp,test):
"""Performs the ECCS check on an IdP
:param sp: the SP used to test the IDP
:param idp: the IdP to test
:param test: a flag to perform a check without changes
"""
# Common variables # Common variables
fqdn_idp = get_label(idp['Location']) fqdn_idp = get_label(idp['Location'])
...@@ -423,6 +486,12 @@ def check_idp_response_selenium(sp,idp,test): ...@@ -423,6 +486,12 @@ def check_idp_response_selenium(sp,idp,test):
driver.quit() driver.quit()
def delete_line_with_word(filepath,word): def delete_line_with_word(filepath,word):
"""Deletes a line from a file by providing a word
:param filepath: file where the line has to be removed
:param word: the word that identify the line to remove
"""
import os.path import os.path
if os.path.isfile(filepath): if os.path.isfile(filepath):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment