Skip to content
Snippets Groups Projects
Commit dc666b5f authored by Marco Malavolti's avatar Marco Malavolti
Browse files

Added timeout to requests

parent 87e0522b
Branches
Tags
No related merge requests found
......@@ -253,7 +253,6 @@ class EccsResults(Resource):
'status' : idp_checks_status
} )
elif (status):
app.logger.info("Results for status '%s'." % status)
if (status == idp_checks_status):
result.append(
{
......@@ -276,7 +275,6 @@ class EccsResults(Resource):
'status' : idp_checks_status
} )
else:
app.logger.info("All checks.")
result.append(
{
'displayName' : idp_displayname,
......
......@@ -2,24 +2,20 @@
import argparse
import json
import logging
import os
import signal
import re
import requests
import time
from datetime import date
from eccs2properties import ECCS2LOGSDIR, ECCS2RESULTSLOG, ECCS2CHECKSLOG, ECCS2SELENIUMLOGDIR, FEDS_BLACKLIST, IDPS_BLACKLIST, ECCS2SELENIUMPAGELOADTIMEOUT, ECCS2SELENIUMSCRIPTTIMEOUT, ECCS2SPS
from selenium import webdriver
from eccs2properties import ECCS2LOGSDIR, ECCS2RESULTSLOG, ECCS2CHECKSLOG, FEDS_BLACKLIST, IDPS_BLACKLIST, ECCS2SPS, ECCS2SELENIUMDEBUG
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import Select, WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.remote.remote_connection import LOGGER
from selenium.common.exceptions import NoSuchElementException, TimeoutException, WebDriverException, UnexpectedAlertPresentException
from urllib3.exceptions import MaxRetryError
from urllib3.util import parse_url
from utils import getLogger, getIdPContacts, getDriver
"""
This script use Selenium and Chromium to select the IdP to check from a Shibboleth SP with the Shibboleth Embedded Discovery Service installed and configured to answer to all eduGAIN IdPs.
......@@ -27,28 +23,15 @@ from urllib3.util import parse_url
The check will be passed when both SPs will return the authentication page of the IdP checked.
"""
#def checkIdP(sp,idp,logger,driver):
def checkIdP(sp,idp,logger):
# Chromedriver MUST be instanced here to avoid problems with SESSION
# Disable SSL requests warning messages
requests.packages.urllib3.disable_warnings()
# Configure Web-driver
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--ignore-certificate-errors')
driver = webdriver.Chrome('chromedriver', options=chrome_options)
# For DEBUG only (By default ChromeDriver logs only warnings/errors to stderr. When debugging issues, it is helpful to enable more verbose logging.)
#driver = webdriver.Chrome('chromedriver', options=chrome_options, service_args=['--verbose', '--log-path=%s/%s.log' % (ECCS2SELENIUMLOGDIR, parse_url(idp['entityID'])[2])])
# Configure timeouts
driver.set_page_load_timeout("%d" % ECCS2SELENIUMPAGELOADTIMEOUT)
driver.set_script_timeout("%d" % ECCS2SELENIUMSCRIPTTIMEOUT)
debugSelenium = ECCS2SELENIUMDEBUG
fqdn_idp = parse_url(idp['entityID'])[2]
driver = getDriver(fqdn_idp,debugSelenium)
# Configure Blacklists
federation_blacklist = FEDS_BLACKLIST
......@@ -68,8 +51,7 @@ def checkIdP(sp,idp,logger):
element = WebDriverWait(driver, 50).until(EC.presence_of_element_located((By.ID,"idpSelectInput")))
element.send_keys(idp['entityID'] + Keys.ENTER)
page_source = driver.page_source
status_code = requests.get(driver.current_url, verify=False).status_code
driver.quit()
samlrequest_url = driver.current_url
except TimeoutException as e:
logger.info("%s;%s;999;Timeout" % (idp['entityID'],sp))
......@@ -90,24 +72,13 @@ def checkIdP(sp,idp,logger):
except WebDriverException as e:
print("!!! WEB DRIVER EXCEPTION - RUN AGAIN THE COMMAND!!!")
print (e.__str__())
return None
except requests.exceptions.ConnectionError as e:
logger.info("%s;%s;000;ConnectionError" % (idp['entityID'],sp))
return "ERROR"
except requests.exceptions.TooManyRedirects as e:
logger.info("%s;%s;111;TooManyRedirects" % (idp['entityID'],sp))
return "ERROR"
except requests.exceptions.RequestException as e:
print ("!!! REQUESTS EXCEPTION !!!")
print (e.__str__())
print ("IdP: %s\nSP: %s" % (idp['entityID'],sp))
return None
except Exception as e:
print ("!!! EXCEPTION !!!")
print (e.__str__())
print ("IdP: %s\nSP: %s" % (idp['entityID'],sp))
return None
finally:
......@@ -122,6 +93,41 @@ def checkIdP(sp,idp,logger):
username_found = re.search(pattern_username,page_source, re.I)
password_found = re.search(pattern_password,page_source, re.I)
try:
headers = {'User-Agent':'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36'}
status_code = requests.get(samlrequest_url, headers=headers, verify=False, timeout=30).status_code
except requests.exceptions.ConnectionError as e:
#print("!!! REQUESTS STATUS CODE CONNECTION ERROR EXCEPTION !!!")
#print (e.__str__())
#print ("IdP: %s\nSP: %s" % (idp['entityID'],sp))
status_code = 000
except requests.exceptions.Timeout as e:
#print("!!! REQUESTS STATUS CODE TIMEOUT EXCEPTION !!!")
#print (e.__str__())
#print ("IdP: %s\nSP: %s" % (idp['entityID'],sp))
status_code = 111
except requests.exceptions.TooManyRedirects as e:
#print("!!! REQUESTS TOO MANY REDIRECTS EXCEPTION !!!")
#print (e.__str__())
#print ("IdP: %s\nSP: %s" % (idp['entityID'],sp))
status_code = 222
except requests.exceptions.RequestException as e:
print ("!!! REQUESTS EXCEPTION !!!")
print (e.__str__())
print ("IdP: %s\nSP: %s" % (idp['entityID'],sp))
status_code = 333
except Exception as e:
print ("!!! EXCEPTION !!!")
print (e.__str__())
print ("IdP: %s\nSP: %s" % (idp['entityID'],sp))
status_code = 555
if(metadata_not_found):
logger.info("%s;%s;%s;No-eduGAIN-Metadata" % (idp['entityID'],sp,status_code))
return "No-eduGAIN-Metadata"
......@@ -133,53 +139,7 @@ def checkIdP(sp,idp,logger):
return "OK"
# Use logger to produce files consumed by ECCS-2 API
def getLogger(filename, path=".", log_level="DEBUG"):
logger = logging.getLogger(filename)
ch = logging.FileHandler(path + '/' + filename,'a','utf-8')
if (log_level == "DEBUG"):
logger.setLevel(logging.DEBUG)
ch.setLevel(logging.DEBUG)
elif (log_level == "INFO"):
logger.setLevel(logging.INFO)
ch.setLevel(logging.INFO)
elif (log_level == "WARN"):
logger.setLevel(logging.WARN)
ch.setLevel(logging.WARN)
elif (log_level == "ERROR"):
logger.setLevel(logging.ERROR)
ch.setLevel(logging.ERROR)
elif (log_level == "CRITICAL"):
logger.setLevel(logging.CRITICAL)
ch.setLevel(logging.CRITICAL)
formatter = logging.Formatter('%(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
return logger
# Return a list of email address for a specific type of contact
def getIdPContacts(idp,contactType):
ctcList = []
for ctcType in idp['contacts']:
if (ctcType == contactType):
for ctc in idp['contacts'][contactType]:
if (ctc.get('emailOrPhone')):
if (ctc['emailOrPhone'].get('EmailAddress')):
ctcList.append(ctc['emailOrPhone']['EmailAddress'][0])
else:
ctcList.append('missing email')
else:
ctcList.append('missing email')
return ctcList
def checkIdp(idp,sps,eccs2log,eccs2checksLog):
def check(idp,sps,eccs2log,eccs2checksLog):
result = []
for sp in sps:
resultCheck = checkIdP(sp,idp,eccs2checksLog)
......@@ -236,8 +196,8 @@ def checkIdp(idp,sps,eccs2log,eccs2checksLog):
# MAIN
if __name__=="__main__":
eccs2log = getLogger(ECCS2RESULTSLOG, ECCS2LOGSDIR, "INFO")
eccs2checksLog = getLogger(ECCS2CHECKSLOG, ECCS2LOGSDIR, "INFO")
eccs2log = getLogger(ECCS2RESULTSLOG, ECCS2LOGSDIR, 'a', "INFO")
eccs2checksLog = getLogger(ECCS2CHECKSLOG, ECCS2LOGSDIR, 'a', "INFO")
sps = ECCS2SPS
......@@ -248,4 +208,4 @@ if __name__=="__main__":
idp = json.loads(args.idpJson[0])
checkIdp(idp,sps,eccs2log,eccs2checksLog)
check(idp,sps,eccs2log,eccs2checksLog)
......@@ -25,6 +25,9 @@ ECCS2STDERR = "%s/stderr.log" % ECCS2LOGSDIR
ECCS2SELENIUMPAGELOADTIMEOUT = 30
ECCS2SELENIUMSCRIPTTIMEOUT = 30
# Selenium Debug Enable/Disable
ECCS2SELENIUMDEBUG = True
# Number of processes to run in parallel
ECCS2NUMPROCESSES = 20
......
......@@ -4,62 +4,12 @@ import asyncio
import datetime
import eccs2properties
import json
import pathlib
import requests
import sys
import time
from utils import getListFeds, getListEccsIdps, getRegAuthDict, getIdpList
from eccs2properties import ECCS2STDOUT, ECCS2STDERR, ECCS2DIR, ECCS2NUMPROCESSES, ECCS2LISTIDPSURL, ECCS2LISTIDPSFILE, ECCS2LISTFEDSURL, ECCS2LISTFEDSFILE
from subprocess import Popen,PIPE
# Returns a Dict on "{ nameFed:reg_auth }"
def getRegAuthDict(list_feds):
regAuth_dict = {}
for key,value in list_feds.items():
name = value['name']
reg_auth = value['reg_auth']
regAuth_dict[name] = reg_auth
return regAuth_dict
# Returns a list of IdP for a single federation
def getIdpList(list_eccs_idps,reg_auth):
fed_idp_list = []
for idp in list_eccs_idps:
if (idp['registrationAuthority'] == reg_auth):
fed_idp_list.append(idp)
return fed_idp_list
# Returns a Python Dictionary
def getListFeds(url, dest_file):
# If file does not exists... download it into the dest_file
path = pathlib.Path(dest_file)
if(path.exists() == False):
with open("%s" % (dest_file), mode="w+", encoding='utf-8') as f:
f.write(requests.get(url).text)
# then open it and work with local file
with open("%s" % (dest_file), mode="r", encoding='utf-8') as f:
return json.loads(f.read().replace("'", "'"))
# Returns a Python List
def getListEccsIdps(url, dest_file):
# If file does not exists... download it into the dest_file
path = pathlib.Path(dest_file)
if(path.exists() == False):
with open("%s" % (dest_file), mode="w+", encoding='utf-8') as f:
f.write(requests.get(url).text)
# then open it and work with local file
with open("%s" % (dest_file), mode="r", encoding='utf-8') as f:
return json.loads(f.read().replace("'", "'"))
# Run Command
async def run(name,queue,stdout_file,stderr_file):
......
utils.py 0 → 100644
#!/usr/bin/env python3.8
import json
import logging
import pathlib
import requests
from eccs2properties import ECCS2SELENIUMLOGDIR, ECCS2SELENIUMPAGELOADTIMEOUT, ECCS2SELENIUMSCRIPTTIMEOUT
from selenium import webdriver
# Returns a Dict of "{ nameFed:reg_auth }"
def getRegAuthDict(list_feds):
regAuth_dict = {}
for key,value in list_feds.items():
name = value['name']
reg_auth = value['reg_auth']
regAuth_dict[name] = reg_auth
return regAuth_dict
# Returns a list of IdP for a single federation
def getIdpList(list_eccs_idps,reg_auth=None):
fed_idp_list = []
for idp in list_eccs_idps:
if (reg_auth):
if (idp['registrationAuthority'] == reg_auth):
fed_idp_list.append(idp)
else:
fed_idp_list.append(idp)
return fed_idp_list
# Returns a Python Dictionary
def getListFeds(url, dest_file):
# If file does not exists... download it into the dest_file
path = pathlib.Path(dest_file)
if(path.exists() == False):
with open("%s" % (dest_file), mode="w+", encoding='utf-8') as f:
f.write(requests.get(url).text)
# then open it and work with local file
with open("%s" % (dest_file), mode="r", encoding='utf-8') as f:
return json.loads(f.read().replace("'", "'"))
# Download all eduGAIN IdPs from URL, store them on a local file and returns a Python List
def getListEccsIdps(url, dest_file):
# If file does not exists... download it into the dest_file
path = pathlib.Path(dest_file)
if(path.exists() == False):
with open("%s" % (dest_file), mode="w+", encoding='utf-8') as f:
f.write(requests.get(url).text)
# then open it and work with local file
with open("%s" % (dest_file), mode="r", encoding='utf-8') as f:
return json.loads(f.read().replace("'", "'"))
# Use logger to produce files consumed by ECCS-2 API
def getLogger(filename, path, mode, log_level="DEBUG"):
logger = logging.getLogger(filename)
ch = logging.FileHandler("%s/%s" % (path,filename), mode,'utf-8')
if (log_level == "DEBUG"):
logger.setLevel(logging.DEBUG)
ch.setLevel(logging.DEBUG)
elif (log_level == "INFO"):
logger.setLevel(logging.INFO)
ch.setLevel(logging.INFO)
elif (log_level == "WARN"):
logger.setLevel(logging.WARN)
ch.setLevel(logging.WARN)
elif (log_level == "ERROR"):
logger.setLevel(logging.ERROR)
ch.setLevel(logging.ERROR)
elif (log_level == "CRITICAL"):
logger.setLevel(logging.CRITICAL)
ch.setLevel(logging.CRITICAL)
formatter = logging.Formatter('%(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
return logger
# Return a list of email address for a specific type of contact
def getIdPContacts(idp,contactType):
ctcList = []
for ctcType in idp['contacts']:
if (ctcType == contactType):
for ctc in idp['contacts'][contactType]:
if (ctc.get('emailOrPhone')):
if (ctc['emailOrPhone'].get('EmailAddress')):
ctcList.append(ctc['emailOrPhone']['EmailAddress'][0])
else:
ctcList.append('missing email')
else:
ctcList.append('missing email')
return ctcList
def getDriver(fqdn_idp=None,debugSelenium=False):
# Disable SSL requests warning messages
requests.packages.urllib3.disable_warnings()
# Configure Web-driver
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--ignore-certificate-errors')
#chrome_options.add_argument('--start-maximized')
# For DEBUG only (By default ChromeDriver logs only warnings/errors to stderr.
# When debugging issues, it is helpful to enable more verbose logging.)
if (debugSelenium and fqdn_idp):
driver = webdriver.Chrome('chromedriver', options=chrome_options, service_args=['--verbose', '--log-path=%s/%s.log' % (ECCS2SELENIUMLOGDIR, fqdn_idp)])
else:
driver = webdriver.Chrome('chromedriver', options=chrome_options)
# Configure timeouts
driver.set_page_load_timeout("%d" % ECCS2SELENIUMPAGELOADTIMEOUT)
driver.set_script_timeout("%d" % ECCS2SELENIUMSCRIPTTIMEOUT)
return driver
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment