Skip to content
Snippets Groups Projects
api.py 11.50 KiB
#!/usr/bin/env python3.8

import logging
import re

from eccs2properties import DAY,ECCS2LOGSDIR
from flask import Flask, request, jsonify
from flask_restful import Resource, Api
from json import dumps, loads
from logging.handlers import RotatingFileHandler
from pathlib import PurePath

app = Flask(__name__)
api = Api(app)

def getLogger(filename,log_level="DEBUG",path="./"):

    logger = logging.getLogger(filename)
    ch = logging.FileHandler(path+filename,'w','utf-8')

    if (log_level == "DEBUG"):
       logger.setLevel(logging.DEBUG)
       ch.setLevel(logging.DEBUG)
    elif (log_level == "INFO"):
       logger.setLevel(logging.INFO)
       ch.setLevel(logging.INFO)
    elif (log_level == "WARN"):
       logger.setLevel(logging.WARN)
       ch.setLevel(logging.WARN)
    elif (log_level == "ERROR"):
       logger.setLevel(logging.ERROR)
       ch.setLevel(logging.ERROR)
    elif (log_level == "CRITICAL"):
       logger.setLevel(logging.CRITICAL)
       ch.setLevel(logging.CRITICAL)

    formatter = logging.Formatter('%(message)s')
    ch.setFormatter(formatter)
    logger.addHandler(ch)

    return logger


# Setup Chromium Webdriver
def setup():

   chrome_options = webdriver.ChromeOptions()
   chrome_options.add_argument('--headless')
   chrome_options.add_argument('--no-sandbox')

   driver = webdriver.Chrome('chromedriver', chrome_options=chrome_options)

   # Configure timeouts
   driver.set_page_load_timeout(30)
   driver.set_script_timeout(30)

   return driver

# /eccs2/test
class Test(Resource):
    def get(self):
        app.logger.info("Test Passed!")
        return {'test':'It Works!'}


class Checks(Resource):
    def get(self):
       app.logger.info("Request 'Checks'")

       file_path = "%s/eccs2checks_%s.log" % (ECCS2LOGSDIR,DAY) 
       date = PurePath(file_path).parts[-1].split('_')[1].split('.')[0]
       pretty = 0
       status = None 
       idp = None

       if 'date' in request.args:
          app.logger.info("'date' parameter inserted")
          date = request.args['date']
          file_path = "%s/eccs2checks_%s.log" % (ECCS2LOGSDIR,date)
       if 'pretty' in request.args:
          app.logger.info("'pretty' parameter inserted")
          pretty = request.args['pretty']
       if 'status' in request.args:
          app.logger.info("'status' parameter inserted")
          status = request.args['status']
       if 'idp' in request.args:
          app.logger.info("'idp' parameter inserted")
          idp = request.args['idp']
          app.logger.info(idp)

       fo = open(file_path,"r",encoding="utf-8")
       result = []
       lines = fo.readlines()

       for line in lines:
          check = line.split(";")

          check_idp = check[0]
          check_sp = check[1]
          check_status = check[2].rstrip("\n\r")

          if (idp and status):
              app.logger.info("Checks for 'idp' and 'status'.")
              if (idp == check_idp and status == check_status):
                 result.append( { 'sp' : check_sp,
                                  'idp' : check_idp,
                                  'status' : check_status,
                                  'date': date
                                } )
          elif (idp):
              #app.logger.info(re.search(".*."+idp+".*.", check_idp, re.IGNORECASE))
              #app.logger.info(check_idp))
              app.logger.info("Checks for Idp '%s'." % idp)
              if (re.search(".*."+idp+".*.", check_idp, re.IGNORECASE)):
                 result.append( { 'sp' : check_sp,
                                  'idp' : check_idp,
                                  'status' : check_status,
                                  'date': date
                                } )
          elif (status):
              app.logger.info("Check for the status '%s'." % status)
              if (status == check_status):
                 result.append( { 'sp' : check_sp,
                                  'idp' : check_idp,
                                  'status' : check_status,
                                  'date': date
                                } )
          else:
             app.logger.info("All checks.")
             result.append( { 'sp' : check_sp,
                              'idp' : check_idp,
                              'status' : check_status,
                              'date': date
                            } )

       if (pretty):
          pp_json = dumps(result, indent=4, sort_keys=True)
          return jsonify(pp_json)
       else:
          return jsonify(result)


# Build Email Addresses Link for ECCS2 Web Gui
def buildEmailAddress(listContacts):
    listCtcs = listContacts.split(",")
    hrefList = []

    for email in listCtcs:
       hrefList.append("<a href='%s'>%s</a>" % (email,email.replace('mailto:', '')))
 
    return hrefList

class EccsResults(Resource):
    def get(self):
       app.logger.info("Request 'EccsResults'")

       file_path = "%s/eccs2_%s.log" % (ECCS2LOGSDIR,DAY) 
       date = PurePath(file_path).parts[-1].split('_')[1].split('.')[0]
       pretty = 0
       status = None
       idp = None

       if 'date' in request.args:
          app.logger.info("'date' parameter inserted")
          file_path = "%s/eccs2_%s.log" % (ECCS2LOGSDIR,DAY)
          date = request.args['date']
       if 'pretty' in request.args:
          app.logger.info("'pretty' parameter inserted")
          pretty = request.args['pretty']
       if 'status' in request.args:
          app.logger.info("'status' parameter inserted")
          status = request.args['status']
       if 'idp' in request.args:
          app.logger.info("'idp' parameter inserted")
          idp = request.args['idp']
          app.logger.info(idp)

       fo = open(file_path,"r",encoding="utf-8")
       result = []
       lines = fo.readlines()

       for line in lines:
          # Line: 
          # IdP-DisplayName;                 check[0]
          # IdP-entityID;                    check[1]
          # IdP-RegAuth;                     check[2]
          # IdP-tech-ctc-1,IdP-tech-ctc-2;   check[3]
          # IdP-supp-ctc-1,IdP-supp-ctc-2;   check[4]
          # Status;                          check[5]
          # SP-entityID-1;                   check[6]
          # SP-status-1;                     check[7]
          # SP-entityID-2;                   check[8]
          # SP-status-2                      check[9]
          check = line.split(";")

          idp_displayname = check[0].rstrip("\n\r")
          idp_entity_id = check[1].rstrip("\n\r")
          idp_reg_auth = check[2].rstrip("\n\r")
          idp_tech_ctcs = check[3].rstrip("\n\r")
          idp_supp_ctcs = check[4].rstrip("\n\r")
          idp_checks_status = check[5].rstrip("\n\r")
          sp1_entity_id = check[6].rstrip("\n\r")
          sp1_check_status = check[7].rstrip("\n\r")
          sp2_entity_id = check[8].rstrip("\n\r")
          sp2_check_status = check[9].rstrip("\n\r")

          if (idp and status):
              app.logger.info("Results for the idp '%s' with status '%s'" % (idp, status))
              if (idp == idp_entity_id and status == idp_checks_status):
                 result.append( 
                    { 
                        'displayName' : idp_displayname,
                        'entityID' : idp_entity_id,
                        'registrationAuthority' : idp_reg_auth,
                        'contacts' : { 
                            'technical' : buildEmailAddress(idp_tech_ctcs),
                            'support' : buildEmailAddress(idp_supp_ctcs),
                        },
                        'date' : date,
                        'sp1' : {
                            'entityID' : sp1_entity_id,
                            'status' : sp1_check_status
                        },
                        'sp2' : {
                            'entityID' : sp2_entity_id,
                            'status' : sp2_check_status
                        },
                        'status' : idp_checks_status
                    } )
          elif (idp):
              #app.logger.info(re.search(".*."+idp+".*.", idp_entity_id, re.IGNORECASE))
              #app.logger.info(idp_entity_id))
              app.logger.info("Results for IdP '%s'." % idp)
              if (re.search(".*."+idp+".*.", idp_entity_id, re.IGNORECASE)):
                 result.append( 
                    { 
                        'displayName' : idp_displayname,
                        'entityID' : idp_entity_id,
                        'registrationAuthority' : idp_reg_auth,
                        'contacts' : { 
                            'technical' : buildEmailAddress(idp_tech_ctcs),
                            'support' : buildEmailAddress(idp_supp_ctcs),
                        },
                        'date' : date,
                        'sp1' : {
                            'entityID' : sp1_entity_id,
                            'status' : sp1_check_status
                        },
                        'sp2' : {
                            'entityID' : sp2_entity_id,
                            'status' : sp2_check_status
                        },
                        'status' : idp_checks_status
                    } )
          elif (status):
              if (status == idp_checks_status):
                 result.append( 
                    { 
                        'displayName' : idp_displayname,
                        'entityID' : idp_entity_id,
                        'registrationAuthority' : idp_reg_auth,
                        'contacts' : { 
                            'technical' : buildEmailAddress(idp_tech_ctcs),
                            'support' : buildEmailAddress(idp_supp_ctcs),
                        },
                        'date' : date,
                        'sp1' : {
                           'entityID' : sp1_entity_id,
                           'status' : sp1_check_status
                        },
                        'sp2' : {
                           'entityID' : sp2_entity_id,
                           'status' : sp2_check_status
                        },
                        'status' : idp_checks_status
                    } )
          else:
             result.append( 
             { 
                 'displayName' : idp_displayname,
                 'entityID' : idp_entity_id,
                 'registrationAuthority' : idp_reg_auth,
                 'contacts' : { 
                    'technical' : buildEmailAddress(idp_tech_ctcs),
                    'support' : buildEmailAddress(idp_supp_ctcs),
                 },
                 'date' : date,
                 'sp1' : {
                    'entityID' : sp1_entity_id,
                    'status' : sp1_check_status
                 },
                 'sp2' : {
                    'entityID' : sp2_entity_id,
                    'status' : sp2_check_status
                 },
                 'status' : idp_checks_status
             } )

       if (pretty):
          pp_json = dumps(result, indent=4, sort_keys=True)
          return jsonify(pp_json)
       else:
          return jsonify(result)

# Run check for a specific IDP
# <idpdisc:DiscoveryResponse Location>?entityID=<IDP_ENITIYID>&target=<DESTINATION_RESOURCE_URL> (tutto url encoded)
#class RunCheck(Resource):
#    def get(self):

api.add_resource(Test, '/eccs/test') # Route_1
api.add_resource(Checks, '/eccs/checks') # Route_2
api.add_resource(EccsResults, '/eccs/eccsresults') # Route_3
#api.add_resource(RunCheck, '/eccs/runcheck') # Route_4

if __name__ == '__main__':
   
   app.config['JSON_AS_ASCII'] = False
   app.logger = getLogger("eccs2api.log", "INFO", ECCS2LOGSDIR)
   app.run(port='5002')