#!/usr/bin/env python3
"""JSON API exposing the ECCS check results and per-federation statistics.

The results are read from log files that contain one JSON object per line.
"""

from collections import Counter
import html
import json
import logging
import re

from flask import Flask, request, jsonify
from flask_restful import Resource, Api

from eccs_properties import DAY, ECCS_LOGSDIR, ECCS_OUTPUTDIR, ECCS_LISTFEDSURL, ECCS_LISTFEDSFILE, ECCS_RESULTSLOG
from utils import get_logger, get_list_feds, get_reg_auth_dict

app = Flask(__name__)
api = Api(app)

# Values accepted by the 'status' and 'check_result' query parameters.
_VALID_STATUSES = ('OK', 'DISABLED', 'ERROR')
_VALID_CHECK_RESULTS = (
    'OK',
    'Timeout',
    'Invalid-Form',
    'Connection-Error',
    'No-eduGAIN-Metadata',
    'SSL-Error',
    'DISABLED',
)

# Record keys that existsInFile() is able to search on.
_SEARCHABLE_ITEMS = ('entityID', 'registrationAuthority')

# A user supplied date ends up inside a file name, so it must not be able to
# carry path separators.
_SAFE_DATE = re.compile(r"[A-Za-z0-9_.-]+")


### Functions

# Build Email Addresses Link for ECCS Web Gui
def buildEmailAddress(listContacts):
    """Turn a comma separated list of contacts into HTML anchor elements.

    Each contact is used as the ``href`` of the anchor, and the same value
    without the ``mailto:`` prefix is used as the visible text.  Both are
    HTML-escaped because the values come from the results file and are
    embedded into markup.

    :param listContacts: comma separated contacts (e.g. ``mailto:a@b.org``)
    :return: list of ``<a href='...'>...</a>`` strings, one per contact
    """
    links = []
    for contact in listContacts.split(","):
        href = html.escape(contact, quote=True)
        label = html.escape(contact.replace('mailto:', ''), quote=True)
        links.append(f"<a href='{href}'>{label}</a>")
    return links


def _dated_results_path(date):
    """Return the results file path for *date*, or None if *date* is unsafe.

    A date is considered unsafe when it is empty or contains anything other
    than letters, digits, ``_``, ``.`` and ``-``; callers treat such a date
    exactly like a date for which no results file exists.
    """
    if not _SAFE_DATE.fullmatch(date):
        return None
    return f"{ECCS_OUTPUTDIR}/eccs_{date}.log"


def _read_results(file_path):
    """Parse a results file into a list of dicts (one per non-blank line).

    :param file_path: path of the file to read, or None
    :return: the parsed records, or None when *file_path* is None or the
             file does not exist
    :raises json.JSONDecodeError: if a non-blank line is not valid JSON
    """
    if file_path is None:
        return None
    try:
        with open(file_path, "r", encoding="utf-8") as fo:
            lines = fo.readlines()
    except FileNotFoundError:
        return None
    return [json.loads(line) for line in lines if line.strip()]


def _has_value(records, key, value):
    """Return True if any record in *records* has ``record[key] == value``."""
    return any(record[key] == value for record in records)


# Research the value of the research_item into ECCS output files
def existsInFile(file_path, value, research_item, eccsDataTable, date):
    """Tell whether *value* appears as *research_item* in a results file.

    Only ``"entityID"`` and ``"registrationAuthority"`` are searched; any
    other *research_item* yields False.

    NOTE: when the file is missing this does not return a boolean: it returns
    ``''`` if *eccsDataTable* is true and a JSON error response otherwise.
    That behaviour is kept as-is for existing callers; the API resources in
    this module work on already parsed records instead.

    :param file_path: path of the results file (one JSON object per line)
    :param value: value to look for
    :param research_item: name of the record key to compare against
    :param eccsDataTable: selects the return value used for a missing file
    :param date: date reported in the "file not found" error message
    :return: True/False, or ``''``/an error response if the file is missing
    """
    try:
        with open(file_path, "r", encoding="utf-8") as fo:
            for line in fo:
                if not line.strip():
                    continue
                record = json.loads(line)
                if research_item in _SEARCHABLE_ITEMS and record[research_item] == value:
                    return True
    except FileNotFoundError:
        if eccsDataTable:
            return ''
        return jsonify(error=f'FileNotFound: ECCS script has not been executed on {date} yet')
    return False


def getSimpleDict(aux):
    """Return the reduced ("simple") representation of a result record.

    :param aux: full result record; must provide the keys ``date``,
                ``displayName``, ``entityID``, ``registrationAuthority``,
                ``status`` and the ``checkResult`` of ``sp1`` and ``sp2``
    :return: new dict with those values; ``checkResult`` is the two element
             list ``[sp1 result, sp2 result]``
    """
    return {
        "date": aux['date'],
        "displayName": aux['displayName'],
        "entityID": aux['entityID'],
        "registrationAuthority": aux['registrationAuthority'],
        "status": aux['status'],
        "checkResult": [aux["sp1"]["checkResult"], aux["sp2"]["checkResult"]],
    }


def _matches(record, idp, reg_auth, status, check_result):
    """Return True if *record* satisfies every filter that was provided.

    A filter that is None (or empty) is ignored.  *check_result* matches when
    it equals the ``checkResult`` of either ``sp1`` or ``sp2``.
    """
    if idp and record['entityID'] != idp:
        return False
    if reg_auth and record['registrationAuthority'] != reg_auth:
        return False
    if status and record['status'] != status:
        return False
    if check_result and check_result not in (
        record['sp1']['checkResult'],
        record['sp2']['checkResult'],
    ):
        return False
    return True


### Classes

# /api/test
class Test(Resource):
    """Trivial endpoint to verify that the API is reachable."""

    def get(self):
        return {'test': 'It Works!'}


# /api/eccsresults
class EccsResults(Resource):
    """Return the result records, optionally filtered.

    Query parameters:
      - ``eccsdt``: if present, contacts are rendered as HTML links and a
        missing results file yields ``''`` instead of an error
      - ``date``: read ``eccs_<date>.log`` instead of the default results file
      - ``status``: keep records with this status (case-insensitive)
      - ``idp``: keep the record with this entityID
      - ``reg_auth``: keep records of this registration authority
      - ``check_result``: keep records where sp1 or sp2 has this result
      - ``format=simple``: return the reduced representation of each record

    All provided filters must match for a record to be returned.
    """

    def get(self):
        args = request.args
        eccsDataTable = 'eccsdt' in args
        simple = args.get('format') == 'simple'

        date = DAY
        file_path = f"{ECCS_OUTPUTDIR}/{ECCS_RESULTSLOG}"
        if 'date' in args:
            date = args['date']
            file_path = _dated_results_path(date)

        status = None
        if 'status' in args:
            status = args['status'].upper()
            if status not in _VALID_STATUSES:
                return jsonify(error="Incorrect status provided. It can be 'ok','disabled','error'")

        # Parse the file once; None means "no results file for that date".
        # The existence checks below are skipped in that case so that the
        # missing file is reported as such, not as an unknown IdP.
        records = _read_results(file_path)

        idp = args.get('idp')
        if idp is not None and records is not None and not _has_value(records, 'entityID', idp):
            return jsonify(error=f"Identity Provider not found with the entityID: {idp}")

        reg_auth = args.get('reg_auth')
        if (reg_auth is not None and records is not None
                and not _has_value(records, 'registrationAuthority', reg_auth)):
            return jsonify(error=f"Identity Providers not found with the Registration Authority: {reg_auth}")

        check_result = args.get('check_result')
        if check_result is not None and check_result not in _VALID_CHECK_RESULTS:
            return jsonify(error="Incorrect check_result value provided. It can be 'OK','Timeout','Invalid-Form','Connection-Error','No-eduGAIN-Metadata','SSL-Error' or 'DISABLED'")

        if records is None:
            if eccsDataTable:
                return ''
            return jsonify(error=f'FileNotFound: ECCS script has not been executed on {date}')

        results = []
        for record in records:
            if not _matches(record, idp, reg_auth, status, check_result):
                continue

            record['date'] = date

            # If the results are for ECCS DataTable build HTML links,
            # otherwise remove only the "mailto:" prefix
            contacts = record['contacts']
            if eccsDataTable:
                contacts['technical'] = buildEmailAddress(contacts['technical'])
                contacts['support'] = buildEmailAddress(contacts['support'])
            else:
                contacts['technical'] = contacts['technical'].replace("mailto:", "")
                contacts['support'] = contacts['support'].replace("mailto:", "")

            results.append(getSimpleDict(record) if simple else record)

        return jsonify(results)


# /api/fedstats
class FedStats(Resource):
    """Return, per registration authority, the number of records per status.

    Query parameters:
      - ``date``: read ``eccs_<date>.log`` instead of the default results file
      - ``reg_auth``: report only this registration authority; without it one
        entry is returned for every value of the registration authority
        dictionary built by ``get_reg_auth_dict``
    """

    def get(self):
        list_feds = get_list_feds(ECCS_LISTFEDSURL, ECCS_LISTFEDSFILE)
        regAuthDict = get_reg_auth_dict(list_feds)

        date = DAY
        file_path = f"{ECCS_OUTPUTDIR}/{ECCS_RESULTSLOG}"
        if 'date' in request.args:
            date = request.args['date']
            file_path = _dated_results_path(date)

        records = _read_results(file_path)
        if records is None:
            return jsonify(error=f'FileNotFound: ECCS script has not been executed on {date} yet')

        reg_auth = request.args.get('reg_auth')
        if reg_auth is not None and not _has_value(records, 'registrationAuthority', reg_auth):
            return jsonify(error="Registration Authority not found")

        # Tally every (registration authority, status) pair in a single pass
        # instead of re-parsing the whole file for each authority.
        counts = Counter(
            (record['registrationAuthority'], record['status'])
            for record in records
        )

        authorities = [reg_auth] if reg_auth else regAuthDict.values()
        results = [
            {
                'date': date,
                'registrationAuthority': authority,
                'OK': counts[(authority, 'OK')],
                'ERROR': counts[(authority, 'ERROR')],
                'DISABLED': counts[(authority, 'DISABLED')],
            }
            for authority in authorities
        ]
        return jsonify(results)


# /api/
class Help(Resource):
    """Point the caller to the documentation of the JSON interface."""

    def get(self):
        return {
            'ECCS JSON Interface': 'https://wiki.geant.org/display/eduGAIN/eduGAIN+Connectivity+Check+2#eduGAINConnectivityCheck2-JSONinterface'
        }


# Routes
api.add_resource(Test, '/test')  # Route_1
api.add_resource(EccsResults, '/eccsresults')  # Route_2
api.add_resource(FedStats, '/fedstats')  # Route_3
api.add_resource(Help, '/')  # Route_4

if __name__ == '__main__':
    # Useful only for API development Server
    #app.config['JSON_AS_ASCII'] = True
    #app.logger.removeHandler(default_handler)
    #app.logger = get_logger("eccs_api.log", ECCS_LOGSDIR, "w", "INFO")
    app.run(port=5002)