-
Marco Malavolti authoredMarco Malavolti authored
api.py 11.50 KiB
#!/usr/bin/env python3.8
import logging
import re
from eccs2properties import DAY,ECCS2LOGSDIR
from flask import Flask, request, jsonify
from flask_restful import Resource, Api
from json import dumps, loads
from logging.handlers import RotatingFileHandler
from pathlib import PurePath
app = Flask(__name__)
api = Api(app)
def getLogger(filename,log_level="DEBUG",path="./"):
logger = logging.getLogger(filename)
ch = logging.FileHandler(path+filename,'w','utf-8')
if (log_level == "DEBUG"):
logger.setLevel(logging.DEBUG)
ch.setLevel(logging.DEBUG)
elif (log_level == "INFO"):
logger.setLevel(logging.INFO)
ch.setLevel(logging.INFO)
elif (log_level == "WARN"):
logger.setLevel(logging.WARN)
ch.setLevel(logging.WARN)
elif (log_level == "ERROR"):
logger.setLevel(logging.ERROR)
ch.setLevel(logging.ERROR)
elif (log_level == "CRITICAL"):
logger.setLevel(logging.CRITICAL)
ch.setLevel(logging.CRITICAL)
formatter = logging.Formatter('%(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
return logger
# Setup Chromium Webdriver
def setup():
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
driver = webdriver.Chrome('chromedriver', chrome_options=chrome_options)
# Configure timeouts
driver.set_page_load_timeout(30)
driver.set_script_timeout(30)
return driver
# /eccs2/test
class Test(Resource):
def get(self):
app.logger.info("Test Passed!")
return {'test':'It Works!'}
class Checks(Resource):
def get(self):
app.logger.info("Request 'Checks'")
file_path = "%s/eccs2checks_%s.log" % (ECCS2LOGSDIR,DAY)
date = PurePath(file_path).parts[-1].split('_')[1].split('.')[0]
pretty = 0
status = None
idp = None
if 'date' in request.args:
app.logger.info("'date' parameter inserted")
date = request.args['date']
file_path = "%s/eccs2checks_%s.log" % (ECCS2LOGSDIR,date)
if 'pretty' in request.args:
app.logger.info("'pretty' parameter inserted")
pretty = request.args['pretty']
if 'status' in request.args:
app.logger.info("'status' parameter inserted")
status = request.args['status']
if 'idp' in request.args:
app.logger.info("'idp' parameter inserted")
idp = request.args['idp']
app.logger.info(idp)
fo = open(file_path,"r",encoding="utf-8")
result = []
lines = fo.readlines()
for line in lines:
check = line.split(";")
check_idp = check[0]
check_sp = check[1]
check_status = check[2].rstrip("\n\r")
if (idp and status):
app.logger.info("Checks for 'idp' and 'status'.")
if (idp == check_idp and status == check_status):
result.append( { 'sp' : check_sp,
'idp' : check_idp,
'status' : check_status,
'date': date
} )
elif (idp):
#app.logger.info(re.search(".*."+idp+".*.", check_idp, re.IGNORECASE))
#app.logger.info(check_idp))
app.logger.info("Checks for Idp '%s'." % idp)
if (re.search(".*."+idp+".*.", check_idp, re.IGNORECASE)):
result.append( { 'sp' : check_sp,
'idp' : check_idp,
'status' : check_status,
'date': date
} )
elif (status):
app.logger.info("Check for the status '%s'." % status)
if (status == check_status):
result.append( { 'sp' : check_sp,
'idp' : check_idp,
'status' : check_status,
'date': date
} )
else:
app.logger.info("All checks.")
result.append( { 'sp' : check_sp,
'idp' : check_idp,
'status' : check_status,
'date': date
} )
if (pretty):
pp_json = dumps(result, indent=4, sort_keys=True)
return jsonify(pp_json)
else:
return jsonify(result)
# Build Email Addresses Link for ECCS2 Web Gui
def buildEmailAddress(listContacts):
listCtcs = listContacts.split(",")
hrefList = []
for email in listCtcs:
hrefList.append("<a href='%s'>%s</a>" % (email,email.replace('mailto:', '')))
return hrefList
class EccsResults(Resource):
def get(self):
app.logger.info("Request 'EccsResults'")
file_path = "%s/eccs2_%s.log" % (ECCS2LOGSDIR,DAY)
date = PurePath(file_path).parts[-1].split('_')[1].split('.')[0]
pretty = 0
status = None
idp = None
if 'date' in request.args:
app.logger.info("'date' parameter inserted")
file_path = "%s/eccs2_%s.log" % (ECCS2LOGSDIR,DAY)
date = request.args['date']
if 'pretty' in request.args:
app.logger.info("'pretty' parameter inserted")
pretty = request.args['pretty']
if 'status' in request.args:
app.logger.info("'status' parameter inserted")
status = request.args['status']
if 'idp' in request.args:
app.logger.info("'idp' parameter inserted")
idp = request.args['idp']
app.logger.info(idp)
fo = open(file_path,"r",encoding="utf-8")
result = []
lines = fo.readlines()
for line in lines:
# Line:
# IdP-DisplayName; check[0]
# IdP-entityID; check[1]
# IdP-RegAuth; check[2]
# IdP-tech-ctc-1,IdP-tech-ctc-2; check[3]
# IdP-supp-ctc-1,IdP-supp-ctc-2; check[4]
# Status; check[5]
# SP-entityID-1; check[6]
# SP-status-1; check[7]
# SP-entityID-2; check[8]
# SP-status-2 check[9]
check = line.split(";")
idp_displayname = check[0].rstrip("\n\r")
idp_entity_id = check[1].rstrip("\n\r")
idp_reg_auth = check[2].rstrip("\n\r")
idp_tech_ctcs = check[3].rstrip("\n\r")
idp_supp_ctcs = check[4].rstrip("\n\r")
idp_checks_status = check[5].rstrip("\n\r")
sp1_entity_id = check[6].rstrip("\n\r")
sp1_check_status = check[7].rstrip("\n\r")
sp2_entity_id = check[8].rstrip("\n\r")
sp2_check_status = check[9].rstrip("\n\r")
if (idp and status):
app.logger.info("Results for the idp '%s' with status '%s'" % (idp, status))
if (idp == idp_entity_id and status == idp_checks_status):
result.append(
{
'displayName' : idp_displayname,
'entityID' : idp_entity_id,
'registrationAuthority' : idp_reg_auth,
'contacts' : {
'technical' : buildEmailAddress(idp_tech_ctcs),
'support' : buildEmailAddress(idp_supp_ctcs),
},
'date' : date,
'sp1' : {
'entityID' : sp1_entity_id,
'status' : sp1_check_status
},
'sp2' : {
'entityID' : sp2_entity_id,
'status' : sp2_check_status
},
'status' : idp_checks_status
} )
elif (idp):
#app.logger.info(re.search(".*."+idp+".*.", idp_entity_id, re.IGNORECASE))
#app.logger.info(idp_entity_id))
app.logger.info("Results for IdP '%s'." % idp)
if (re.search(".*."+idp+".*.", idp_entity_id, re.IGNORECASE)):
result.append(
{
'displayName' : idp_displayname,
'entityID' : idp_entity_id,
'registrationAuthority' : idp_reg_auth,
'contacts' : {
'technical' : buildEmailAddress(idp_tech_ctcs),
'support' : buildEmailAddress(idp_supp_ctcs),
},
'date' : date,
'sp1' : {
'entityID' : sp1_entity_id,
'status' : sp1_check_status
},
'sp2' : {
'entityID' : sp2_entity_id,
'status' : sp2_check_status
},
'status' : idp_checks_status
} )
elif (status):
if (status == idp_checks_status):
result.append(
{
'displayName' : idp_displayname,
'entityID' : idp_entity_id,
'registrationAuthority' : idp_reg_auth,
'contacts' : {
'technical' : buildEmailAddress(idp_tech_ctcs),
'support' : buildEmailAddress(idp_supp_ctcs),
},
'date' : date,
'sp1' : {
'entityID' : sp1_entity_id,
'status' : sp1_check_status
},
'sp2' : {
'entityID' : sp2_entity_id,
'status' : sp2_check_status
},
'status' : idp_checks_status
} )
else:
result.append(
{
'displayName' : idp_displayname,
'entityID' : idp_entity_id,
'registrationAuthority' : idp_reg_auth,
'contacts' : {
'technical' : buildEmailAddress(idp_tech_ctcs),
'support' : buildEmailAddress(idp_supp_ctcs),
},
'date' : date,
'sp1' : {
'entityID' : sp1_entity_id,
'status' : sp1_check_status
},
'sp2' : {
'entityID' : sp2_entity_id,
'status' : sp2_check_status
},
'status' : idp_checks_status
} )
if (pretty):
pp_json = dumps(result, indent=4, sort_keys=True)
return jsonify(pp_json)
else:
return jsonify(result)
# Run check for a specific IDP
# <idpdisc:DiscoveryResponse Location>?entityID=<IDP_ENITIYID>&target=<DESTINATION_RESOURCE_URL> (tutto url encoded)
#class RunCheck(Resource):
# def get(self):
api.add_resource(Test, '/eccs/test') # Route_1
api.add_resource(Checks, '/eccs/checks') # Route_2
api.add_resource(EccsResults, '/eccs/eccsresults') # Route_3
#api.add_resource(RunCheck, '/eccs/runcheck') # Route_4
if __name__ == '__main__':
app.config['JSON_AS_ASCII'] = False
app.logger = getLogger("eccs2api.log", "INFO", ECCS2LOGSDIR)
app.run(port='5002')