-
Saket Agrahari authoredSaket Agrahari authored
common.py 24.09 KiB
"""
Utilities used by multiple route blueprints.
"""
import functools
import logging
from itertools import product
from compendium_v2 import db
from compendium_v2.db.presentation_model_enums import CarryMechanism, ConnectionMethod, ConnectivityCoverage, \
UserCategory
from compendium_v2.db.presentation_models import NREN, AlienWave, BudgetEntry, Capacity, CentralProcurement, \
CertificateProviders, ChargingStructure, CommercialChargingLevel, CommercialConnectivity, ConnectedProportion, \
ConnectionCarrier, ConnectivityGrowth, ConnectivityLevel, ConnectivityLoad, \
CrisisExercises, \
DarkFibreInstalled, ECProject, EOSCListings, ExternalConnections, FibreLight, FundingSource, InstitutionURLs, \
MonitoringTools, NRENService, NetworkAutomation, \
NetworkFunctionVirtualisation, NetworkMapUrls, NonREPeers, NrenStaff, OpsAutomation, ParentOrganization, \
PassiveMonitoring, PertTeam, \
DarkFibreLease, Policy, RemoteCampuses, SecurityControls, ServiceManagement, ServiceUserTypes, SiemVendors, \
Standards, SubOrganization, \
TrafficRatio, TrafficStatistics, TrafficVolume, WeatherMap, PreviewYear
from flask import Response, request
from flask_login import current_user # type: ignore
from sqlalchemy import select, distinct, and_
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def require_accepts_json(f):
    """
    Route handler decorator that only lets a request through when it
    accepts responses of type "application/json".

    Any other request is answered with a 406 response and the wrapped
    handler is not called.

    :param f: the function to be decorated
    :return: the decorated function
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        # TODO: use best_match to disallow */* ...?
        if request.accept_mimetypes.accept_json:
            return f(*args, **kwargs)
        return Response(
            response='response will be json',
            status=406,
            mimetype='text/html')

    return wrapper
def init_background_task():
    """
    Placeholder hook; it currently performs no work.

    :return: None
    """
    return None
def after_request(response):
    """
    Generic function to do additional logging of requests & responses.

    Every response whose status code is not 200 is logged as a warning
    containing the request method, the request path, the decoded response
    body and the status code.

    :param response: the response object handed to this hook
    :return: the same response object, unmodified
    """
    if response.status_code != 200:
        try:
            data = response.data.decode('utf-8')
        except Exception:
            # never expected to happen, but we don't want any failures here
            logger.exception('INTERNAL DECODING ERROR')
            data = 'decoding error (see logs)'

        # arguments are passed separately so that formatting only happens
        # when the record is actually emitted
        logger.warning(
            '"%s %s" "%s" %s',
            request.method,
            request.path,
            data,
            response.status_code)
    return response
def get_data(table_class):
    """
    Select all rows of ``table_class`` joined to their NREN, ordered by
    NREN name (ascending) and then by year (descending).

    Rows whose year is listed in PreviewYear are excluded, unless the
    current user is an admin and the request carries a ``preview``
    query argument.

    :param table_class: the model class to select from
    :return: the result of ``db.session.scalars`` for the built statement
    """
    query = (
        select(table_class)
        .join(NREN)
        .order_by(NREN.name.asc(), table_class.year.desc())
    )

    user_is_admin = not current_user.is_anonymous and current_user.is_admin
    # the request arguments are only consulted for admin users
    if user_is_admin and request.args.get('preview') is not None:
        return db.session.scalars(query)

    preview_years = select(PreviewYear.year)
    return db.session.scalars(query.where(table_class.year.not_in(preview_years)))
def fetch_data_from_table(table_model, extract_function):
    """
    Extract one record per NREN for every year present in ``table_model``.

    For each distinct year stored in ``table_model`` (ascending), all NRENs
    are queried ordered by name and outer-joined to the entries of that year,
    so an NREN without an entry for that year still yields a row, with
    ``None`` in place of the model instance.

    :param table_model: the model class to read; its ``year`` and ``nren_id``
        attributes are used to build the queries
    :param extract_function: callable invoked as
        ``extract_function(nren, year, entry_model)`` for every row; for
        ExternalConnections and RemoteCampuses it must return an iterable,
        whose items are flattened into the result
    :return: list of the values produced by ``extract_function``
    """
    distinct_years = (
        db.session.query(distinct(table_model.year))
        .order_by(table_model.year.asc())
        .all()
    )

    result = []
    for distinct_year in distinct_years:
        year = distinct_year[0]
        entries = (
            db.session.query(NREN, year, table_model)
            .outerjoin(table_model, and_(NREN.id == table_model.nren_id, table_model.year == year))
            .order_by(NREN.name.asc())
            .all()
        )
        result.extend(entries)

    # for these models the extract function returns several records per
    # entry, so its results are flattened into a single list
    if table_model in (ExternalConnections, RemoteCampuses):
        return [
            item
            for nren, year, entry_model in result
            for item in extract_function(nren, year, entry_model)
        ]
    return [extract_function(nren, year, entry_model) for nren, year, entry_model in result]
def fetch_data_from_table_with_user_category(table_model, extract_function):
    """
    Extract one record per NREN for every combination of a year present in
    ``table_model`` and a member of UserCategory.

    NRENs are outer-joined to the entries matching the year and the user
    category. Where no entry matches, a placeholder record holding only the
    NREN name, NREN country, year and user category value is produced
    instead of calling ``extract_function``.

    :param table_model: the model class to read; its ``year``, ``nren_id``
        and ``user_category`` attributes are used to build the queries
    :param extract_function: callable invoked as
        ``extract_function(nren, year, entry_model)`` for every matched entry
    :return: list of extracted and placeholder records
    """
    year_rows = (
        db.session.query(distinct(table_model.year))
        .order_by(table_model.year.asc())
        .all()
    )

    records = []
    for year_row, user_category in product(year_rows, UserCategory):
        wanted_year = year_row[0]
        join_condition = and_(
            NREN.id == table_model.nren_id,
            table_model.year == wanted_year,
            user_category == table_model.user_category,
        )
        matches = (
            db.session.query(NREN, wanted_year, table_model)
            .outerjoin(table_model, join_condition)
            .order_by(NREN.name.asc())
            .all()
        )

        for nren, row_year, entry_model in matches:
            if entry_model is not None:
                records.append(extract_function(nren, row_year, entry_model))
                continue
            # Placeholder entry
            records.append({
                'nren': nren.name,
                'nren_country': nren.country,
                'year': row_year,
                'user_category': user_category.value,
            })
    return records
def extract_model_data(nren, year, entry_model):
    """
    Convert a single model instance into a flat dictionary.

    The type of ``entry_model`` selects which fields are included. Every
    dictionary holds the NREN name, the NREN country and the year; enum
    attributes are replaced by their ``value`` and list attributes are joined
    into a single string.

    When ``entry_model`` is none of the handled model types (for example
    ``None``), only the name and country of ``nren`` and ``int(year)`` are
    returned.

    :param nren: the NREN the record belongs to; only read in the fallback case
    :param year: the year of the record; only read for DarkFibreLease and in
        the fallback case
    :param entry_model: the model instance to convert
    :return: a dictionary describing the entry
    """
    if isinstance(entry_model, DarkFibreLease):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(year),
            'iru_or_lease': entry_model.iru_or_lease,
            'fibre_length_in_country': entry_model.fibre_length_in_country,
            'fibre_length_outside_country': entry_model.fibre_length_outside_country,
            'iru_duration': entry_model.iru_duration
        }

    if isinstance(entry_model, DarkFibreInstalled):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'installed': entry_model.installed,
            'fibre_length_in_country': entry_model.fibre_length_in_country,
        }

    if isinstance(entry_model, FibreLight):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'light_description': entry_model.light_description
        }

    if isinstance(entry_model, NetworkMapUrls):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'urls': ' , '.join(entry_model.urls)
        }

    if isinstance(entry_model, MonitoringTools):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'tool_descriptions': ', '.join(entry_model.tool_descriptions),
            'netflow_processing_description': entry_model.netflow_processing_description
        }

    if isinstance(entry_model, PassiveMonitoring):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'monitoring': entry_model.monitoring,
            'method': entry_model.method.value if entry_model.method is not None else None,
        }

    if isinstance(entry_model, TrafficStatistics):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'traffic_statistics': entry_model.traffic_statistics,
            'urls': ' , '.join(entry_model.urls)
        }

    if isinstance(entry_model, SiemVendors):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'vendor_names': ', '.join(entry_model.vendor_names)
        }

    if isinstance(entry_model, CertificateProviders):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'provider_names': ', '.join(entry_model.provider_names)
        }

    if isinstance(entry_model, WeatherMap):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'weather_map': entry_model.weather_map,
            'url': entry_model.url
        }

    if isinstance(entry_model, PertTeam):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'pert_team': entry_model.pert_team.value if entry_model.pert_team is not None else None,
        }

    if isinstance(entry_model, AlienWave):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'alien_wave_third_pary': entry_model.alien_wave_third_pary.value
            if entry_model.alien_wave_third_pary is not None else None,
            'nr_of_alien_wave_third_party_services': entry_model.nr_of_alien_wave_third_party_services,
            'alien_wave_internal': entry_model.alien_wave_internal
        }

    if isinstance(entry_model, Capacity):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'largest_link_capacity': entry_model.largest_link_capacity,
            'typical_backbone_capacity': entry_model.typical_backbone_capacity,
        }

    if isinstance(entry_model, NonREPeers):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'nr_of_non_r_and_e_peers': entry_model.nr_of_non_r_and_e_peers
        }

    if isinstance(entry_model, TrafficRatio):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'r_and_e_percentage': entry_model.r_and_e_percentage,
            'commodity_percentage': entry_model.commodity_percentage,
        }

    if isinstance(entry_model, OpsAutomation):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'ops_automation': entry_model.ops_automation.value if entry_model.ops_automation is not None else None,
            'ops_automation_specifics': entry_model.ops_automation_specifics,
        }

    if isinstance(entry_model, NetworkFunctionVirtualisation):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'nfv': entry_model.nfv.value if entry_model.nfv is not None else None,
            'nfv_specifics': ', '.join(entry_model.nfv_specifics)
        }

    if isinstance(entry_model, NetworkAutomation):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'network_automation': entry_model.network_automation.value
            if entry_model.network_automation is not None else None,
            'network_automation_specifics': ', '.join(entry_model.network_automation_specifics),
        }

    if isinstance(entry_model, CrisisExercises):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'exercise_descriptions': ', '.join(entry_model.exercise_descriptions),
        }

    if isinstance(entry_model, SecurityControls):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'security_control_descriptions': ', '.join(entry_model.security_control_descriptions),
        }

    if isinstance(entry_model, CentralProcurement):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'central_procurement': entry_model.central_procurement,
            'amount': entry_model.amount,
        }

    if isinstance(entry_model, ServiceManagement):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'service_management_framework': entry_model.service_management_framework,
            # each key reports its own attribute of the entry
            'service_level_targets': entry_model.service_level_targets
        }

    if isinstance(entry_model, ServiceUserTypes):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'user_category': entry_model.user_category.value,
            'service_category': entry_model.service_category.value
            if entry_model.service_category is not None else None,
        }

    if isinstance(entry_model, EOSCListings):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'service_names': ', '.join(entry_model.service_names),
        }

    if isinstance(entry_model, Standards):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'audits': entry_model.audits,
            'audit_specifics': entry_model.audit_specifics,
            'business_continuity_plans': entry_model.business_continuity_plans,
            'business_continuity_plans_specifics': entry_model.business_continuity_plans_specifics,
            'crisis_management_procedure': entry_model.crisis_management_procedure,
        }

    if isinstance(entry_model, TrafficVolume):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': entry_model.year,
            'to_customers': float(entry_model.to_customers),
            'from_customers': float(entry_model.from_customers),
            'to_external': float(entry_model.to_external),
            'from_external': float(entry_model.from_external)
        }

    if isinstance(entry_model, NrenStaff):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': entry_model.year,
            'permanent_fte': float(entry_model.permanent_fte),
            'subcontracted_fte': float(entry_model.subcontracted_fte),
            'technical_fte': float(entry_model.technical_fte),
            'non_technical_fte': float(entry_model.non_technical_fte)
        }

    if isinstance(entry_model, Policy):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': entry_model.year,
            'strategic_plan': entry_model.strategic_plan,
            'environmental': entry_model.environmental,
            'equal_opportunity': entry_model.equal_opportunity,
            'connectivity': entry_model.connectivity,
            'acceptable_use': entry_model.acceptable_use,
            'privacy_notice': entry_model.privacy_notice,
            'data_protection': entry_model.data_protection,
            'gender_equality': entry_model.gender_equality
        }

    if isinstance(entry_model, SubOrganization):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': entry_model.year,
            'name': entry_model.organization,
            'role': entry_model.role
        }

    if isinstance(entry_model, ParentOrganization):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': entry_model.year,
            'name': entry_model.organization
        }

    if isinstance(entry_model, NRENService):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': entry_model.year,
            'product_name': entry_model.product_name,
            'additional_information': entry_model.additional_information,
            'official_description': entry_model.official_description,
            'service_name': entry_model.service.name,
            'service_category': entry_model.service.category.value,
            'service_description': entry_model.service.description
        }

    if isinstance(entry_model, InstitutionURLs):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': entry_model.year,
            'urls': ' , '.join(entry_model.urls)
        }

    if isinstance(entry_model, ECProject):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': entry_model.year,
            'project': entry_model.project
        }

    if isinstance(entry_model, CommercialConnectivity):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': entry_model.year,
            'commercial_r_and_e': entry_model.commercial_r_and_e.value
            if entry_model.commercial_r_and_e is not None else None,
            'commercial_general': entry_model.commercial_general.value
            if entry_model.commercial_general is not None else None,
            'commercial_collaboration': entry_model.commercial_collaboration.value
            if entry_model.commercial_collaboration is not None else None,
            'commercial_service_provider': entry_model.commercial_service_provider.value
            if entry_model.commercial_service_provider is not None else None,
            'university_spin_off': entry_model.university_spin_off.value
            if entry_model.university_spin_off is not None else None,
        }

    if isinstance(entry_model, ConnectivityGrowth):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': entry_model.year,
            'user_category': entry_model.user_category.value,
            'growth': entry_model.growth,
        }

    if isinstance(entry_model, ConnectivityLoad):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': entry_model.year,
            'user_category': entry_model.user_category.value,
            'average_load_from_institutions': entry_model.average_load_from_institutions,
            'average_load_to_institutions': entry_model.average_load_to_institutions,
            'peak_load_from_institutions': entry_model.peak_load_from_institutions,
            'peak_load_to_institutions': entry_model.peak_load_to_institutions,
        }

    if isinstance(entry_model, ConnectionCarrier):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': entry_model.year,
            'user_category': entry_model.user_category.value,
            # a missing carry mechanism is reported as "other"
            'carry_mechanism': entry_model.carry_mechanism.value
            if entry_model.carry_mechanism is not None else CarryMechanism.other.value,
        }

    if isinstance(entry_model, ConnectivityLevel):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': entry_model.year,
            'user_category': entry_model.user_category.value,
            'typical_speed': entry_model.typical_speed,
            'highest_speed': entry_model.highest_speed,
            'highest_speed_proportion': entry_model.highest_speed_proportion,
        }

    if isinstance(entry_model, ConnectedProportion):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': entry_model.year,
            'user_category': entry_model.user_category.value
            if entry_model.user_category is not None else None,
            # a missing coverage is reported as "unsure"
            'coverage': entry_model.coverage.value
            if entry_model.coverage is not None else ConnectivityCoverage.unsure.value,
            'number_connected': entry_model.number_connected,
            'market_share': entry_model.market_share,
            'users_served': entry_model.users_served
        }

    if isinstance(entry_model, CommercialChargingLevel):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': entry_model.year,
            'collaboration': entry_model.collaboration.value
            if entry_model.collaboration is not None else None,
            'service_supplier': entry_model.service_supplier.value
            if entry_model.service_supplier is not None else None,
            'direct_peering': entry_model.direct_peering.value
            if entry_model.direct_peering is not None else None,
        }

    if isinstance(entry_model, ChargingStructure):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'fee_type': entry_model.fee_type.value
            if entry_model.fee_type is not None else None,
        }

    if isinstance(entry_model, FundingSource):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': entry_model.year,
            'client_institutions': float(entry_model.client_institutions),
            'european_funding': float(entry_model.european_funding),
            'gov_public_bodies': float(entry_model.gov_public_bodies),
            'commercial': float(entry_model.commercial),
            'other': float(entry_model.other)
        }

    if isinstance(entry_model, BudgetEntry):
        return {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            # the stored budget is multiplied by 1,000,000 and rounded to a
            # whole number (presumably it is stored in millions - TODO confirm)
            'budget': float("{:.0f}".format(entry_model.budget * 1000000)),
            'year': entry_model.year,
        }

    # no handled model type: only the NREN and the year can be reported
    return {
        'nren': nren.name,
        'nren_country': nren.country,
        'year': int(year),
    }
def extract_special_model_data(nren, year, entry_model):
    """
    Flatten a model instance that holds a list of connections into a list
    of dictionaries.

    * ExternalConnections: one dictionary per connection.
    * RemoteCampuses: one dictionary per connection, or a single dictionary
      without connection fields when the entry has no connections.
    * anything else: a single dictionary holding only the name and country
      of ``nren`` and ``int(year)``.

    :param nren: the NREN the record belongs to; only read in the fallback case
    :param year: the year of the record; only read in the fallback case
    :param entry_model: the model instance to flatten
    :return: a list of dictionaries
    """
    if not isinstance(entry_model, (ExternalConnections, RemoteCampuses)):
        return [{
            'nren': nren.name,
            'nren_country': nren.country,
            'year': int(year),
        }]

    if isinstance(entry_model, ExternalConnections):
        return [
            {
                'nren': entry_model.nren.name,
                'nren_country': entry_model.nren.country,
                'year': int(entry_model.year),
                'link_name': link['link_name'],
                'capacity': link['capacity'],
                'from_organization': link['from_organization'],
                'to_organization': link['to_organization'],
                'interconnection_method': ConnectionMethod(link['interconnection_method']).value
                if link['interconnection_method'] is not None else None
            }
            for link in entry_model.connections
        ]

    # RemoteCampuses
    if not entry_model.connections:
        return [{
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'remote_campus_connectivity': entry_model.remote_campus_connectivity,
        }]

    return [
        {
            'nren': entry_model.nren.name,
            'nren_country': entry_model.nren.country,
            'year': int(entry_model.year),
            'remote_campus_connectivity': entry_model.remote_campus_connectivity,
            'country': campus['country'],
            'local_r_and_e_connection': campus['local_r_and_e_connection']
        }
        for campus in entry_model.connections
    ]