From 2f5c6345842cb48a2e96bee1a8553c68bc51abb0 Mon Sep 17 00:00:00 2001 From: Marco Malavolti <marco.malavolti@gmail.com> Date: Fri, 29 Apr 2022 16:32:52 +0200 Subject: [PATCH] Enriched 'utils.py' with RST comments --- runEccs.py | 4 +- utils.py | 139 +++++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 106 insertions(+), 37 deletions(-) diff --git a/runEccs.py b/runEccs.py index ce96ebc..b6a1883 100755 --- a/runEccs.py +++ b/runEccs.py @@ -81,10 +81,10 @@ if __name__=="__main__": start = time.time() # Setup list_feds - list_feds = utils.get_list_feds(e_p.ECCS_LISTFEDSURL, e_p.ECCS_LISTFEDSFILE) + list_feds = utils.get_list_from_url(e_p.ECCS_LISTFEDSURL, e_p.ECCS_LISTFEDSFILE) # Setup list_eccs_idps - list_eccs_idps = utils.get_list_eccs_idps(e_p.ECCS_LISTIDPSURL, e_p.ECCS_LISTIDPSFILE) + list_eccs_idps = utils.get_list_from_url(e_p.ECCS_LISTIDPSURL, e_p.ECCS_LISTIDPSFILE) if (args.idp_entityid): stdout_file = open(e_p.ECCS_STDOUTIDP,"w+") diff --git a/utils.py b/utils.py index ae90075..aadb29c 100644 --- a/utils.py +++ b/utils.py @@ -27,21 +27,35 @@ from urllib3.util import parse_url from urllib.parse import urlparse, urlencode def sha1(idp_entity_id): + """Returns the SHA1 of the input value + + :param idp_entity_id: input value + :return: SHA1 of the entityID + :rtype: string + """ import hashlib result = hashlib.sha1(idp_entity_id.encode()) return result.hexdigest() - -# Return a label useful for a filename def get_label(url_or_urn): + """Returns a label usable for filenames + + :param url_or_urn: input value + :return: a label + :rtype: string + """ if url_or_urn.startswith('http'): return parse_url(url_or_urn)[2] else: return parse_url(url_or_urn)[4].lstrip('/') - -# Returns a Dict of "{ nameFed:reg_auth }" def get_reg_auth_dict(list_feds): + """Returns a dictionary of Federation and their registration authorities + + :param list_feds: Python list containing all federations + :return: a dictionary of '{ nameFed:reg_auth }' + :rtype: dict + """ regAuth_dict = {} for key,value in list_feds.items(): @@ -52,9 +66,20 @@ def get_reg_auth_dict(list_feds): return regAuth_dict - -# Returns a list of IdP for a single federation def get_idp_list(list_eccs_idps,reg_auth=None,idp_entityid=None): + """Returns a list of IdPs or a list of only one IdP + + - Only one if 'idp_entityid' parameter is used + - All IdPs of a specific Federation if 'reg_auth' parameter is used + - All eduGAIN IdPs if no parameter is used + + :param list_eccs_idps: Python list containing all IdPs provided to ECCS + :param reg_auth: a string containing a registration authority URL + :param idp_entityid: a string containing an IdP entityID + :return: a list of IdPs + :rtype: list + """ + fed_idp_list = [] for idp in list_eccs_idps: if (idp_entityid): @@ -68,39 +93,34 @@ def get_idp_list(list_eccs_idps,reg_auth=None,idp_entityid=None): return fed_idp_list +def get_list_from_url(url, dest_file): + """Downloads the content of a web page retrieved by its url into a directory and then use the saved file to return a Python list of the converted version of its content -# Download all eduGAIN Federations from URL, store them on a local file and returns a Python Dictionary -def get_list_feds(url, dest_file): - from pathlib import Path - - # If file does not exists... download it into the dest_file - path = pathlib.Path(dest_file) - if(path.exists() == False): - with open("%s" % (dest_file), mode="w+", encoding='utf-8') as f: - f.write(requests.get(url).text) - - # then open it and work with local file - with open("%s" % (dest_file), mode="r", encoding='utf-8') as f: - return json.loads(f.read().replace("'","'")) - - -# Download all eduGAIN IdPs from URL, store them on a local file and returns a Python List -def get_list_eccs_idps(url, dest_file): + :param url: URL to download + :param des_file: destination file + :return: the resulting Python list + :rtype: list + """ from pathlib import Path - # If file does not exists... download it into the dest_file path = pathlib.Path(dest_file) if(path.exists() == False): with open("%s" % (dest_file), mode="w+", encoding='utf-8') as f: f.write(requests.get(url).text) - # then open it and work with local file with open("%s" % (dest_file), mode="r", encoding='utf-8') as f: return json.loads(f.read().replace("'","'")) - -# Use logger to produce files consumed by ECCS API def get_logger(path, filename, mode="a", log_level="DEBUG"): + """Returns a logger used by ECCS API Development Server to create its log file + + :param path: directory path of the files + :param filename: filename of the new file created + :param mode: a(append),w(write),r(read) + :param log_level: log level to use (DEBUG,INFO,WARN,ERROR,CRITICAL) + :return: a logger + :rtype: logger object + """ logger = logging.getLogger(__name__) ch = logging.handlers.RotatingFileHandler(f"{path}/{filename}", mode, 0, 5, 'utf-8') @@ -127,9 +147,15 @@ def get_logger(path, filename, mode="a", log_level="DEBUG"): return logger - -# Return a list of email address for a specific type of contact def get_idp_contacts(idp,contactType): + """Returns a list of email address for a specific type of contact + + :param idp: dictionary containing the IdP info + :param contactType: type of contact to consider + :return: a python list containing all contacts email addresses + :rtype: list + """ + ctcList = [] for ctcType in idp['contacts']: if (ctcType == contactType): @@ -143,9 +169,17 @@ def get_idp_contacts(idp,contactType): ctcList.append('missing email') return ctcList - -# Write the login page source code into its file def store_page_source(page_source,idp,sp,test): + """Writes the login page source into a specific file + + :param page_souce: content to write into a file + :param idp: idp owner of the page source + :param sp: sp who has been used + :param test: flag needed to decide if write on the console instead of on the file + :return: True or False + :rtype: boolean + """ + if (test): sys.stdout.write(f"{page_source}") return True @@ -159,8 +193,16 @@ def store_page_source(page_source,idp,sp,test): return False -# Get the Google Chrom Selenium Driver +# Get the Google Chrome Selenium Driver def get_driver_selenium(idp=None,sp=None,debugSelenium=False): + """Returns driver needed to perform the ECCS check + + :param idp: the idp need to be checked + :param sp: the sp used to check the idp + :param debugSelenium: a flag needed to enable a more verbose logging + :return: selenium driver + :rtype: object + """ # Configure Web-driver # https://peter.sh/experiments/chromium-command-line-switches/ @@ -196,6 +238,13 @@ def get_driver_selenium(idp=None,sp=None,debugSelenium=False): return driver def follow_all_nested_iframes(driver): + """Returns the page source reached by following all the first <iframe> found on the web pages + + :param driver: the selenium driver + :return: the IdP login page source reached + :rtype: string + """ + try: while (driver.find_element(By.XPATH,'//iframe')): driver.switch_to.frame(0) @@ -203,8 +252,8 @@ def follow_all_nested_iframes(driver): return driver.page_source def deflate_and_base64_encode(string_val): - """ - Deflates and the base64 encodes a string + """Deflates and the base64 encodes a string + :param string_val: The string to deflate and encode :return: The deflated and encoded string """ @@ -212,8 +261,16 @@ def deflate_and_base64_encode(string_val): string_val = string_val.encode('utf-8') return base64.b64encode(zlib.compress(string_val)[2:-4]) - def generate_login_url(sp_entity_id, sp_http_post_acs_location, idp_http_redirect_sso_location): + """Returns a login url needed to get the IdP Login page for an SP + + :param sp_entity_id: the SP entityID + :param sp_http_post_acs_location: the SP HTTP-POST AssertionConsumerService URL + :param idp_http_redirect_sso_location: the IDP HTTP-Redirect SSO Location + :return: a login url built upon a SAML Authn Request + :rtype: string + """ + authn_request_id = f'_{str(uuid.uuid4()).replace("-", "")}' issue_instant = str(datetime.datetime.now(datetime.timezone.utc).isoformat(timespec='seconds')).replace('+00:00', 'Z') authn_request = '<samlp:AuthnRequest xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol" ' \ @@ -233,6 +290,12 @@ def generate_login_url(sp_entity_id, sp_http_post_acs_location, idp_http_redirec # ECCS Check made by Selenium def check_idp_response_selenium(sp,idp,test): + """Performs the ECCS check on an IdP + + :param sp: the SP used to test the IDP + :param idp: the IdP to test + :param test: a flag to perform a check without changes + """ # Common variables fqdn_idp = get_label(idp['Location']) @@ -423,6 +486,12 @@ def check_idp_response_selenium(sp,idp,test): driver.quit() def delete_line_with_word(filepath,word): + """Deletes a line from a file by providing a word + + :param filepath: file where the line has to be removed + :param word: the word that identify the line to remove + """ + import os.path if os.path.isfile(filepath): -- GitLab