Skip to content
Snippets Groups Projects
eccs.py 7.22 KiB
#!/usr/bin/env python3

import argparse
import json
import pdb
import sys
import utils
import eccs_properties as e_p

from pathlib import Path

"""
The check works with the SAML Request generated for three SP (one fake SP and two SP provided by different NREN) and successed if the IdP Login Page appears and contains the fields "username" and "password" for each 'good' SP and not for the fake SP.
It is possible to disable the check by eccs_properties IDP_DISABLE_DICT or by "robots.txt" put on the SAMLRequest endpoint root web dir.
"""

# Extract IdP DisplayName by fixing input string
def get_display_name(display_name):
    display_name_equal_splitted = display_name.split('==')
    for elem in display_name_equal_splitted:
        if "en" in elem:
           if "'" in elem:
              elem = elem.replace("'","'")
           if '"' in elem:
              elem = elem.replace('"','\\"')
           return elem.split(';', 1)[1]

# Append the result of the check on a file
def store_eccs_result(idp,sp,check_results,idp_status,test):

    # Build the contacts lists: technical/support
    list_technical_contacts = utils.get_idp_contacts(idp,'technical')
    list_support_contacts = utils.get_idp_contacts(idp,'support')

    str_technical_contacts = ','.join(list_technical_contacts)
    str_support_contacts = ','.join(list_support_contacts)

    if (test):
       sys.stdout.write("\nECCS:")
       sys.stdout.write('{"displayName":"%s","entityID":"%s","registrationAuthority":"%s","contacts":{"technical":"%s","support":"%s"},"status":"%s","sp1":{"entityID":"%s","checkTime":"%s","checkResult":"%s"},"sp2":{"entityID":"%s","checkTime":"%s","checkResult":"%s"},"sp3":{"entityID":"%s","checkTime":"%s","checkResult":"%s"}}\n' % (
                   get_display_name(idp['displayname']),  # IdP-DisplayName
                   idp['entityID'],                       # IdP-entityID
                   idp['registrationAuthority'],          # IdP-RegAuth
                   str_technical_contacts,                # IdP-TechCtcsList
                   str_support_contacts,                  # IdP-SuppCtcsList
                   idp_status,                            # IdP-ECCS-Status
                   check_results[0][1],                   # SP-entityID-1
                   check_results[0][2],                   # SP-check-time-1
                   check_results[0][3],                   # SP-check-result-1
                   check_results[1][1],                   # SP-entityID-2
                   check_results[1][2],                   # SP-check-time-2
                   check_results[1][3],                   # SP-check-result-3
                   check_results[2][1],                   # SP-entityID-3
                   check_results[2][2],                   # SP-check-time-3
                   check_results[2][3]                    # SP-check-result-3
                   )
       )  


    else:
       # IdP-DisplayName;IdP-entityID;IdP-RegAuth;IdP-tech-ctc-1,IdP-tech-ctc-2;IdP-supp-ctc-1,IdP-supp-ctc-2;IdP-ECCS-Status;SP-entityID-1;SP-check-time-1;SP-result-1;SP-entityID-2;SP-check-time-2;SP-result-2;SP-entityID-3;SP-check-time-3;SP-result-3
       with open(f"{e_p.ECCS_OUTPUTDIR}/{e_p.ECCS_RESULTSLOG}", 'a') as f:
            try:
               f.write('{"displayName":"%s","entityID":"%s","registrationAuthority":"%s","contacts":{"technical":"%s","support":"%s"},"status":"%s","sp1":{"entityID":"%s","checkTime":"%s","checkResult":"%s"},"sp2":{"entityID":"%s","checkTime":"%s","checkResult":"%s"},"sp3":{"entityID":"%s","checkTime":"%s","checkResult":"%s"}}\n' % (
                   get_display_name(idp['displayname']),  # IdP-DisplayName
                   idp['entityID'],                     # IdP-entityID
                   idp['registrationAuthority'],        # IdP-RegAuth
                   str_technical_contacts,              # IdP-TechCtcsList
                   str_support_contacts,                # IdP-SuppCtcsList
                   idp_status,                          # IdP-ECCS-Status
                   check_results[0][1],                 # SP-entityID-1
                   check_results[0][2],                 # SP-check-time-1
                   check_results[0][3],                 # SP-check-result-1
                   check_results[1][1],                 # SP-entityID-2
                   check_results[1][2],                 # SP-check-time-2
                   check_results[1][3],                 # SP-check-result-2
                   check_results[2][1],                 # SP-entityID-3
                   check_results[2][2],                 # SP-check-time-3
                   check_results[2][3]                  # SP-check-result-3
                   )
               )
            except IOError:
               sys.stderr.write(f"Failed writing result on output file for {idp['entityID']} with {utils.get_label(sp['entityID'])}.\n\nRun {e_p.ECCS_DIR}/runEccs.py --idp {idp['entityID']} --replace\n")
               sys.exit(1)

# Check an IdP with 2 SPs + Fake SP.
def check(idp,test):
    check_results = []

    for sp in e_p.ECCS_SPS:
        result = utils.check_idp_response_selenium(sp,idp,test)
        if (result):
           check_results.append(result)
        else:
           sys.stderr.write(f"\nCheck failed for {idp['entityID']} with {utils.get_label(sp['entityID'])}.\n\nRun {e_p.ECCS_DIR}/runEccs.py --idp {idp['entityID']} --replace\n")
           sys.exit(1)
 
    if (len(check_results) == len(e_p.ECCS_SPS)):
       check_result_sp1 = check_results[0][3]
       check_result_sp2 = check_results[1][3]
       check_result_sp3 = check_results[2][3]
       check_result_weberr1 = check_results[0][4]
       check_result_weberr2 = check_results[1][4]
       check_result_weberr3 = check_results[2][4]

       # If the result of both good SPs is 'OK'
       # and if the result of the fake SP is not 'OK'
       # than the IdP is consuming correctly eduGAIN Metadata.
       if ((check_result_sp1 == check_result_sp2 == "OK") and check_result_sp3 != "OK"):
          store_eccs_result(idp,sp,check_results,'OK',test)

       elif (check_result_sp1 == check_result_sp2 == "DISABLED"):
            store_eccs_result(idp,sp,check_results,'DISABLED',test)

       elif ((check_result_sp1 == check_result_sp2 == check_result_sp3 == "OK") or
              (check_result_sp1 == "Unable-To-Check" or check_result_sp2 == "Unable-To-Check")):
            store_eccs_result(idp,sp,check_results,'UNKNOWN',test)

       else:
            store_eccs_result(idp,sp,check_results,'ERROR',test)

# MAIN
if __name__=="__main__":

   parser = argparse.ArgumentParser(description='Checks if the input IdP consumed correctly eduGAIN metadata by accessing two different SPs')
   parser.add_argument("idpJson", metavar="idpJson", nargs=1, help="An IdP in Json format")
   parser.add_argument("--test", action='store_true', help="Test the IdP without effects")
   parser.add_argument("--replace", action='store_true', help="Check an IdP and replace the result")

   args = parser.parse_args()

   idp = json.loads(args.idpJson[0])

   Path(f"{e_p.ECCS_HTMLDIR}/{e_p.DAY}").mkdir(parents=True, exist_ok=True)   # Create dir needed to page_source content

   if (args.replace and not args.test):
      utils.delete_line_with_word(f"{e_p.ECCS_OUTPUTDIR}/{e_p.ECCS_RESULTSLOG}",idp['entityID'])

   check(idp,args.test)