-
Marco Malavolti authoredMarco Malavolti authored
api.py 7.60 KiB
#!/usr/bin/env python3.8
import json
import logging
import re
from eccs2properties import DAY, ECCS2LOGSDIR, ECCS2OUTPUTDIR, ECCS2LISTFEDSURL, ECCS2LISTFEDSFILE
from flask import Flask, request, jsonify
from flask_restful import Resource, Api
from utils import getLogger, getListFeds, getRegAuthDict
app = Flask(__name__)
api = Api(app)
### Functions
# Build Email Addresses Link for ECCS2 Web Gui
def buildEmailAddress(listContacts):
listCtcs = listContacts.split(",")
hrefList = []
for email in listCtcs:
hrefList.append("<a href='%s'>%s</a>" % (email,email.replace('mailto:', '')))
return hrefList
# Research the value of the research_item into ECCS2 output files
def existsInFile(file_path, value, research_item, eccsDataTable, date):
try:
with open(file_path,"r",encoding="utf-8") as fo:
lines = fo.readlines()
except FileNotFoundError as e:
if (eccsDataTable):
return ''
else:
return jsonify(error='FileNotFound: ECCS2 script has not been executed on %s yet' % date)
for line in lines:
aux = json.loads(line)
if (research_item == "entityID"):
if (value == aux['entityID']):
return True
if (research_item == "registrationAuthority"):
if (value == aux['registrationAuthority']):
return True
return False
### Classes
# /api/test
class Test(Resource):
def get(self):
return {'test':'It Works!'}
# /api/eccsresults
class EccsResults(Resource):
def get(self):
file_path = "%s/eccs2_%s.log" % (ECCS2OUTPUTDIR,DAY)
date = DAY
status = None
idp = None
reg_auth = None
eccsDataTable = False
if 'eccsdt' in request.args:
eccsDataTable = True
if 'date' in request.args:
date = request.args['date']
file_path = "%s/eccs2_%s.log" % (ECCS2OUTPUTDIR,date)
if 'status' in request.args:
status = request.args['status'].upper()
if (status not in ['OK','DISABLED','ERROR']):
return jsonify(error="Incorrect status provided. It can be 'ok','disabled','error'")
if 'idp' in request.args:
idp = request.args['idp']
if (not existsInFile(file_path, idp, "entityID", eccsDataTable, date)):
return jsonify(error="Identity Provider not found with the entityID: %s" % idp)
if 'reg_auth' in request.args:
reg_auth = request.args['reg_auth']
if (not existsInFile(file_path, reg_auth, "registrationAuthority", eccsDataTable, date)):
return jsonify(error="Identity Providers not found with the Registration Authority: %s" % reg_auth)
lines = []
results = []
try:
with open(file_path,"r",encoding="utf-8") as fo:
lines = fo.readlines()
except FileNotFoundError as e:
if (eccsDataTable):
return ''
else:
return jsonify(error='FileNotFound: ECCS2 script has not been executed on this day')
for line in lines:
# Strip the line feed and carriage return characters
line = line.rstrip("\n\r")
# Loads the json line into aux
aux = json.loads(line)
aux['date'] = date
# If the results are for ECCS2 DataTable, otherwise... remove only "mailto:" prefix
if (eccsDataTable):
aux['contacts']['technical'] = buildEmailAddress(aux['contacts']['technical'])
aux['contacts']['support'] = buildEmailAddress(aux['contacts']['support'])
else:
aux['contacts']['technical'] = aux['contacts']['technical'].replace("mailto:","")
aux['contacts']['support'] = aux['contacts']['support'].replace("mailto:","")
if (idp and status):
if (idp == aux['entityID'] and status == aux['status']):
results.append(aux)
elif (reg_auth and status):
if (reg_auth == aux['registrationAuthority'] and status == aux['status']):
results.append(aux)
elif (idp):
if (idp == aux['entityID']):
results.append(aux)
elif (reg_auth):
if (reg_auth == aux['registrationAuthority']):
results.append(aux)
elif (status):
if (status == aux['status']):
results.append(aux)
else:
results.append(aux)
return jsonify(results)
# /api/fedstats
class FedStats(Resource):
def get(self):
list_feds = getListFeds(ECCS2LISTFEDSURL, ECCS2LISTFEDSFILE)
regAuthDict = getRegAuthDict(list_feds)
file_path = "%s/eccs2_%s.log" % (ECCS2OUTPUTDIR,DAY)
date = DAY
reg_auth = None
eccsDataTable = False
if ('date' in request.args):
date = request.args['date']
file_path = "%s/eccs2_%s.log" % (ECCS2OUTPUTDIR,date)
if ('reg_auth' in request.args):
reg_auth = request.args['reg_auth']
if (not existsInFile(file_path, reg_auth, "registrationAuthority", eccsDataTable, date)):
return jsonify(error="Registration Authority not found")
lines = []
results = []
try:
with open(file_path,"r",encoding="utf-8") as fo:
lines = fo.readlines()
except FileNotFoundError as e:
if (eccsDataTable):
return ''
else:
return jsonify(error='FileNotFound: ECCS2 script has not been executed on %s yet' % date)
if (reg_auth):
resultDict = {'date': date, 'registrationAuthority': reg_auth, 'OK': 0, 'ERROR': 0, 'DISABLED': 0}
for line in lines:
# Strip the line feed and carriage return characters
line = line.rstrip("\n\r")
# Loads the json line into aux
aux = json.loads(line)
if (aux['registrationAuthority'] == reg_auth):
if (aux['status'] == "OK"):
resultDict['OK'] = resultDict['OK'] + 1
if (aux['status'] == "ERROR"):
resultDict['ERROR'] = resultDict['ERROR'] + 1
if (aux['status'] == "DISABLED"):
resultDict['DISABLED'] = resultDict['DISABLED'] + 1
results.append(resultDict)
return jsonify(results)
else:
for name,regAuth in regAuthDict.items():
resultDict = {'date': date, 'registrationAuthority': regAuth, 'OK': 0, 'ERROR': 0, 'DISABLED': 0}
for line in lines:
# Strip the line feed and carriage return characters
line = line.rstrip("\n\r")
# Loads the json line into aux
aux = json.loads(line)
if (regAuth == aux['registrationAuthority']):
if (aux['status'] == "OK"):
resultDict['OK'] = resultDict['OK'] + 1
if (aux['status'] == "ERROR"):
resultDict['ERROR'] = resultDict['ERROR'] + 1
if (aux['status'] == "DISABLED"):
resultDict['DISABLED'] = resultDict['DISABLED'] + 1
results.append(resultDict)
return jsonify(results)
# Routes
api.add_resource(Test, '/test') # Route_1
api.add_resource(EccsResults, '/eccsresults') # Route_2
api.add_resource(FedStats, '/fedstats') # Route_3
if __name__ == '__main__':
# Useful only for API development Server
#app.config['JSON_AS_ASCII'] = True
#app.logger.removeHandler(default_handler)
#app.logger = getLogger("eccs2api.log", ECCS2LOGSDIR, "w", "INFO")
app.run(port='5002')