Commit 4f334d0d authored by Bjarke Madsen's avatar Bjarke Madsen

Add back in data publishers until we clean the survey data

parent ea2254dd
"""
survey_publisher_v1
=========================
This module loads pre-2022 survey data from legacy Excel files.
Missing information for some questions is filled in from the survey database.
Registered as a click CLI command when installing compendium-v2.
"""
from __future__ import annotations
import itertools
import logging
import math
import click
from sqlalchemy import select, delete
from collections import defaultdict
import compendium_v2
from compendium_v2.config import load
from compendium_v2.db import db, presentation_models
from compendium_v2.environment import setup_logging
from compendium_v2.publishers import helpers, excel_parser
from compendium_v2.survey_db import model as survey_model
setup_logging()
logger = logging.getLogger('survey-publisher-legacy-excel')
def db_budget_migration(nren_dict):
# move data from Survey DB budget table
data = db.session.scalars(select(survey_model.Nrens))
db.session.execute(delete(presentation_models.BudgetEntry).where(
presentation_models.BudgetEntry.year < 2022))
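    # Budget entries are keyed by nren id and then year, so values from the Excel
    # sheet imported further below overwrite survey-DB values for the same year.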
inserts = defaultdict(dict)
for nren in data:
for budget in nren.budgets:
abbrev = nren.abbreviation.upper()
year = budget.year
if float(budget.budget) > 400:
logger.warning(f'Incorrect Data: {abbrev} has budget set >400M EUR for {year}. ({budget.budget})')
continue
if float(budget.budget) == 0:
logger.warning(f'Incorrect Data: {abbrev} has budget set to 0 EUR for {year}.')
continue
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
budget_entry = {
'nren': nren_dict[abbrev],
'nren_id': nren_dict[abbrev].id,
'budget': float(budget.budget),
'year': year
}
inserts[nren_dict[abbrev].id][year] = budget_entry
# Import the data from excel sheet to database
exceldata = excel_parser.fetch_budget_excel_data()
for abbrev, budget, year in exceldata:
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
if budget > 400:
            logger.warning(f'{abbrev} has budget set to >400M EUR for {year}. ({budget})')
continue
budget_entry = {
'nren': nren_dict[abbrev],
'nren_id': nren_dict[abbrev].id,
'budget': budget,
'year': year
}
inserts[nren_dict[abbrev].id][year] = budget_entry
    all_inserts = [entry for by_year in inserts.values() for entry in by_year.values()]
db.session.bulk_insert_mappings(presentation_models.BudgetEntry, all_inserts)
db.session.commit()
def db_funding_migration(nren_dict):
# Import the data to database
data = excel_parser.fetch_funding_excel_data()
db.session.execute(delete(presentation_models.FundingSource).where(
presentation_models.FundingSource.year < 2022))
inserts = []
for (abbrev, year, client_institution,
european_funding,
gov_public_bodies,
commercial, other) in data:
_data = [client_institution, european_funding, gov_public_bodies, commercial, other]
total = sum(_data)
if not math.isclose(total, 100, abs_tol=0.01):
logger.warning(f'{abbrev} funding sources for {year} do not sum to 100% ({total})')
continue
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
inserts.append({
'nren': nren_dict[abbrev],
'nren_id': nren_dict[abbrev].id,
'year': year,
'client_institutions': client_institution,
'european_funding': european_funding,
'gov_public_bodies': gov_public_bodies,
'commercial': commercial,
'other': other
})
db.session.bulk_insert_mappings(presentation_models.FundingSource, inserts)
db.session.commit()
def db_charging_structure_migration(nren_dict):
# Import the data to database
data = excel_parser.fetch_charging_structure_excel_data()
db.session.execute(delete(presentation_models.ChargingStructure).where(
presentation_models.ChargingStructure.year < 2022))
inserts = []
for (abbrev, year, charging_structure) in data:
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
inserts.append({
'nren': nren_dict[abbrev],
'nren_id': nren_dict[abbrev].id,
'year': year,
'fee_type': charging_structure
})
db.session.bulk_insert_mappings(presentation_models.ChargingStructure, inserts)
db.session.commit()
def db_staffing_migration(nren_dict):
db.session.execute(delete(presentation_models.NrenStaff).where(
presentation_models.NrenStaff.year < 2022))
staff_data = list(excel_parser.fetch_staffing_excel_data())
nren_staff_map = {}
inserts = []
for (abbrev, year, permanent_fte, subcontracted_fte) in staff_data:
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping staff data.')
continue
nren = nren_dict[abbrev]
nren_staff_map[(nren.id, year)] = {
'nren': nren,
'nren_id': nren.id,
'year': year,
'permanent_fte': permanent_fte,
'subcontracted_fte': subcontracted_fte,
'technical_fte': 0,
'non_technical_fte': 0
}
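    # Merge in the technical/non-technical split from the staff-function sheet;
    # rows present in only one of the two sheets keep zero FTEs for the missing side.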
function_data = excel_parser.fetch_staff_function_excel_data()
for (abbrev, year, technical_fte, non_technical_fte) in function_data:
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping staff function data.')
continue
nren = nren_dict[abbrev]
if (nren.id, year) in nren_staff_map:
nren_staff_map[(nren.id, year)]['technical_fte'] = technical_fte
nren_staff_map[(nren.id, year)]['non_technical_fte'] = non_technical_fte
else:
nren_staff_map[(nren.id, year)] = {
'nren': nren,
'nren_id': nren.id,
'year': year,
'permanent_fte': 0,
'subcontracted_fte': 0,
'technical_fte': technical_fte,
'non_technical_fte': non_technical_fte
}
for nren_staff_model in nren_staff_map.values():
employed = nren_staff_model['permanent_fte'] + nren_staff_model['subcontracted_fte']
technical = nren_staff_model['technical_fte'] + nren_staff_model['non_technical_fte']
if not math.isclose(employed, technical, abs_tol=0.01) and employed != 0 and technical != 0:
logger.warning(f'{nren_staff_model["nren"].name} in {nren_staff_model["year"]}:'
f' FTE do not equal across employed/technical categories ({employed} != {technical})')
del nren_staff_model['nren']
inserts.append(nren_staff_model)
db.session.bulk_insert_mappings(presentation_models.NrenStaff, inserts)
db.session.commit()
def db_ecprojects_migration(nren_dict):
db.session.execute(delete(presentation_models.ECProject).where(
presentation_models.ECProject.year < 2022))
ecproject_data = excel_parser.fetch_ecproject_excel_data()
inserted = set()
inserts = []
for (abbrev, year, project) in ecproject_data:
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
nren = nren_dict[abbrev]
if (nren.id, year, project) in inserted:
logger.warning(f'{nren.name} has duplicate EC project {project} in {year}. Skipping.')
continue
inserted.add((nren.id, year, project))
inserts.append({
'nren_id': nren.id,
'year': year,
'project': project
})
db.session.bulk_insert_mappings(presentation_models.ECProject, inserts)
db.session.commit()
def db_organizations_migration(nren_dict):
db.session.execute(delete(presentation_models.ParentOrganization).where(
presentation_models.ParentOrganization.year < 2022))
organization_data = excel_parser.fetch_organization_excel_data()
inserts = []
for (abbrev, year, org) in organization_data:
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
nren = nren_dict[abbrev]
inserts.append({
'nren_id': nren.id,
'year': year,
'organization': org
})
db.session.bulk_insert_mappings(presentation_models.ParentOrganization, inserts)
db.session.commit()
def db_traffic_volume_migration(nren_dict):
db.session.execute(delete(presentation_models.TrafficVolume).where(
presentation_models.TrafficVolume.year < 2023))
traffic_data = excel_parser.fetch_traffic_excel_data()
inserts = []
for (abbrev, year, from_external, to_external, from_customers, to_customers) in traffic_data:
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
nren = nren_dict[abbrev]
        if nren.name == 'CESNET' and year == 2019:
            # COMP-447: correct CESNET traffic data for 2019
            to_customers = 222766
inserts.append({
'nren_id': nren.id,
'year': year,
'from_customers': from_customers,
'to_customers': to_customers,
'from_external': from_external,
'to_external': to_external
})
db.session.bulk_insert_mappings(presentation_models.TrafficVolume, inserts)
db.session.commit()
def db_nren_services_migration(nren_dict):
services = [s for s in db.session.scalars(select(presentation_models.Service))]
for service_info in excel_parser.fetch_nren_services_excel_data():
[service] = [s for s in services if s.name_key == service_info['service_name_key']]
abbrev = service_info['nren_name']
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
nren = nren_dict[abbrev]
nren_service = presentation_models.NRENService(
nren=nren,
nren_id=nren.id,
year=service_info['year'],
service=service,
service_key=service.name_key,
product_name=service_info['product_name'],
additional_information=service_info['additional_information'],
official_description=service_info['official_description']
)
db.session.merge(nren_service)
db.session.commit()
def db_connected_proportion_migration(nren_dict):
db.session.execute(delete(presentation_models.ConnectedProportion).where(
presentation_models.ConnectedProportion.year < 2023))
remit = excel_parser.fetch_remit_excel_data()
nr_connected = excel_parser.fetch_nr_connected_excel_data()
market_share = excel_parser.fetch_market_share_excel_data()
users_served = excel_parser.fetch_users_served_excel_data()
data_by_nren = defaultdict(dict)
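    # One record per (nren id, year, user category); the remit, connected,
    # market-share and users-served sheets are merged into the same dict entry.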
for key in itertools.chain(remit.keys(), nr_connected.keys(), market_share.keys(), users_served.keys()):
(abbrev, year, user_category) = key
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
nren = nren_dict[abbrev]
to_add = (nren.id, year, user_category)
data_by_nren[to_add].update({
'nren_id': nren.id,
'year': year,
'user_category': user_category,
'coverage': remit.get(key),
'number_connected': nr_connected.get(key, data_by_nren[to_add].get('number_connected')),
'market_share': market_share.get(key, data_by_nren[to_add].get('market_share')),
'users_served': users_served.get(key, data_by_nren[to_add].get('users_served'))
})
inserts = list(data_by_nren.values())
db.session.bulk_insert_mappings(presentation_models.ConnectedProportion, inserts)
db.session.commit()
def db_connectivity_level_migration(nren_dict):
db.session.execute(delete(presentation_models.ConnectivityLevel).where(
presentation_models.ConnectivityLevel.year < 2023))
typical_speed = excel_parser.fetch_typical_speed_excel_data()
highest_speed = excel_parser.fetch_highest_speed_excel_data()
highest_speed_proportion = excel_parser.fetch_highest_speed_proportion_excel_data()
data_by_nren = defaultdict(dict)
for key in itertools.chain(typical_speed.keys(), highest_speed.keys(), highest_speed_proportion.keys()):
(abbrev, year, user_category) = key
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
nren = nren_dict[abbrev]
to_add = (nren.id, year, user_category)
data_by_nren[to_add].update({
'nren_id': nren.id,
'year': year,
'user_category': user_category,
'typical_speed': typical_speed.get(key, data_by_nren[to_add].get('typical_speed')),
'highest_speed': highest_speed.get(key, data_by_nren[to_add].get('highest_speed')),
'highest_speed_proportion': highest_speed_proportion.get(
key, data_by_nren[to_add].get('highest_speed_proportion'))
})
inserts = list(data_by_nren.values())
db.session.bulk_insert_mappings(presentation_models.ConnectivityLevel, inserts)
db.session.commit()
def db_connection_carrier_migration(nren_dict):
db.session.execute(delete(presentation_models.ConnectionCarrier).where(
presentation_models.ConnectionCarrier.year < 2023))
carriers = excel_parser.fetch_carriers_excel_data()
inserts = []
for key, carry_mechanism in carriers.items():
(abbrev, year, user_category) = key
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
nren = nren_dict[abbrev]
inserts.append({
'nren_id': nren.id,
'year': year,
'user_category': user_category,
'carry_mechanism': carry_mechanism
})
db.session.bulk_insert_mappings(presentation_models.ConnectionCarrier, inserts)
db.session.commit()
def db_connectivity_growth_migration(nren_dict):
db.session.execute(delete(presentation_models.ConnectivityGrowth).where(
presentation_models.ConnectivityGrowth.year < 2023))
growth = excel_parser.fetch_growth_excel_data()
inserts = []
for key, growth_percent in growth.items():
(abbrev, year, user_category) = key
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
nren = nren_dict[abbrev]
inserts.append({
'nren_id': nren.id,
'year': year,
'user_category': user_category,
'growth': growth_percent
})
db.session.bulk_insert_mappings(presentation_models.ConnectivityGrowth, inserts)
db.session.commit()
def db_connectivity_load_migration(nren_dict):
db.session.execute(delete(presentation_models.ConnectivityLoad).where(
presentation_models.ConnectivityLoad.year < 2023))
average = excel_parser.fetch_average_traffic_excel_data()
peak = excel_parser.fetch_peak_traffic_excel_data()
all_entry_keys = set()
all_entry_keys.update(average.keys())
all_entry_keys.update(peak.keys())
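    # Both sheets map each key to a (from institutions, to institutions) tuple;
    # keys missing from a sheet fall back to (None, None) and end up as NULLs.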
inserts = []
for key in all_entry_keys:
(abbrev, year, user_category) = key
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
nren = nren_dict[abbrev]
inserts.append({
'nren_id': nren.id,
'year': year,
'user_category': user_category,
'average_load_from_institutions': average.get(key, (None, None))[0],
'average_load_to_institutions': average.get(key, (None, None))[1],
'peak_load_from_institutions': peak.get(key, (None, None))[0],
'peak_load_to_institutions': peak.get(key, (None, None))[1]
})
db.session.bulk_insert_mappings(presentation_models.ConnectivityLoad, inserts)
db.session.commit()
def db_remote_campuses_migration(nren_dict):
db.session.execute(delete(presentation_models.RemoteCampuses).where(
presentation_models.RemoteCampuses.year < 2023))
campuses = excel_parser.fetch_remote_campuses_excel_data()
inserts = []
for (abbrev, year, connectivity, country, connected_to_r_e) in campuses:
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
connections = []
if country:
connections.append({'country': country, 'local_r_and_e_connection': connected_to_r_e})
nren = nren_dict[abbrev]
inserts.append({
'nren_id': nren.id,
'year': year,
'remote_campus_connectivity': connectivity,
'connections': connections
})
db.session.bulk_insert_mappings(presentation_models.RemoteCampuses, inserts)
db.session.commit()
def db_dark_fibre_lease_migration(nren_dict):
db.session.execute(delete(presentation_models.DarkFibreLease).where(
presentation_models.DarkFibreLease.year < 2023))
data_rows = excel_parser.fetch_dark_fibre_iru_excel_data()
iru_duration = excel_parser.fetch_iru_duration_excel_data()
inserts = []
for (abbrev, year, iru, length_in_country, length_out_country) in data_rows:
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
nren = nren_dict[abbrev]
inserts.append({
'nren_id': nren.id,
'year': year,
'iru_or_lease': iru,
'fibre_length_in_country': length_in_country,
'fibre_length_outside_country': length_out_country,
'iru_duration': iru_duration.get((abbrev, year))
})
db.session.bulk_insert_mappings(presentation_models.DarkFibreLease, inserts)
db.session.commit()
def db_dark_fibre_installed_migration(nren_dict):
db.session.execute(delete(presentation_models.DarkFibreInstalled).where(
presentation_models.DarkFibreInstalled.year < 2023))
data_rows = excel_parser.fetch_dark_fibre_installed_excel_data()
inserts = []
for (abbrev, year, installed, length) in data_rows:
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
nren = nren_dict[abbrev]
inserts.append({
'nren_id': nren.id,
'year': year,
'installed': installed,
'fibre_length_in_country': length
})
db.session.bulk_insert_mappings(presentation_models.DarkFibreInstalled, inserts)
db.session.commit()
def db_passive_monitoring_migration(nren_dict):
db.session.execute(delete(presentation_models.PassiveMonitoring).where(
presentation_models.PassiveMonitoring.year < 2023))
data_rows = excel_parser.fetch_passive_monitoring_excel_data()
inserts = []
for (abbrev, year, monitoring, method) in data_rows:
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
nren = nren_dict[abbrev]
inserts.append({
'nren_id': nren.id,
'year': year,
'monitoring': monitoring,
'method': method
})
db.session.bulk_insert_mappings(presentation_models.PassiveMonitoring, inserts)
db.session.commit()
def db_capacity_migration(nren_dict):
db.session.execute(delete(presentation_models.Capacity).where(
presentation_models.Capacity.year < 2023))
largest_data_rows = excel_parser.fetch_largest_link_capacity_excel_data()
typical_data_rows = excel_parser.fetch_typical_backbone_capacity_excel_data()
by_nren = defaultdict(dict)
for key in itertools.chain(largest_data_rows.keys(), typical_data_rows.keys()):
(abbrev, year) = key
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
nren = nren_dict[abbrev]
to_add = (nren.id, year)
by_nren[to_add].update({
'nren_id': nren.id,
'year': year,
'largest_link_capacity': largest_data_rows.get(key, by_nren[to_add].get('largest_link_capacity')),
'typical_backbone_capacity': typical_data_rows.get(key, by_nren[to_add].get('typical_backbone_capacity'))
})
inserts = list(by_nren.values())
db.session.bulk_insert_mappings(presentation_models.Capacity, inserts)
db.session.commit()
def db_non_r_e_peers_migration(nren_dict):
db.session.execute(delete(presentation_models.NonREPeers).where(
presentation_models.NonREPeers.year < 2023))
data_rows = excel_parser.fetch_non_r_e_peers_excel_data()
inserts = []
for (abbrev, year, nr_of_non_r_and_e_peers) in data_rows:
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
nren = nren_dict[abbrev]
inserts.append({
'nren_id': nren.id,
'year': year,
'nr_of_non_r_and_e_peers': nr_of_non_r_and_e_peers
})
db.session.bulk_insert_mappings(presentation_models.NonREPeers, inserts)
db.session.commit()
def db_ops_automation_migration(nren_dict):
db.session.execute(delete(presentation_models.OpsAutomation).where(
presentation_models.OpsAutomation.year < 2023))
data_rows = excel_parser.fetch_ops_automation_excel_data()
inserts = []
for (abbrev, year, automation, specifics) in data_rows:
if abbrev not in nren_dict:
logger.warning(f'{abbrev} unknown. Skipping.')
continue
nren = nren_dict[abbrev]
inserts.append({
'nren_id': nren.id,
'year': year,
'ops_automation': automation,
'ops_automation_specifics': specifics
})
db.session.bulk_insert_mappings(presentation_models.OpsAutomation, inserts)
db.session.commit()
def _cli(app):
with app.app_context():
nren_dict = helpers.get_uppercase_nren_dict()
db_budget_migration(nren_dict)
db_funding_migration(nren_dict)
db_charging_structure_migration(nren_dict)
db_staffing_migration(nren_dict)
db_ecprojects_migration(nren_dict)
db_organizations_migration(nren_dict)
db_traffic_volume_migration(nren_dict)
db_nren_services_migration(nren_dict)
db_connected_proportion_migration(nren_dict)
db_connectivity_level_migration(nren_dict)
db_connection_carrier_migration(nren_dict)
db_connectivity_growth_migration(nren_dict)
db_connectivity_load_migration(nren_dict)
db_remote_campuses_migration(nren_dict)
db_dark_fibre_lease_migration(nren_dict)
db_dark_fibre_installed_migration(nren_dict)
db_passive_monitoring_migration(nren_dict)
db_capacity_migration(nren_dict)
db_non_r_e_peers_migration(nren_dict)
db_ops_automation_migration(nren_dict)
@click.command()
@click.option('--config', type=click.STRING, default='config.json')
def cli(config):
    with open(config, 'r') as config_file:
        app_config = load(config_file)
app_config['SQLALCHEMY_BINDS'] = {survey_model.SURVEY_DB_BIND: app_config['SURVEY_DATABASE_URI']}
app = compendium_v2._create_app_with_db(app_config)
print("survey-publisher-v1 starting")
_cli(app)
if __name__ == "__main__":
cli()
@@ -33,6 +33,8 @@ setup(
             'conversion=compendium_v2.conversion.conversion:cli',
             'dump_survey_model=compendium_v2.migrations.dump_survey_model:cli',
             'legacy-survey-publisher=compendium_v2.publishers.legacy_publisher.survey_publisher_legacy:cli',
+            'excel-survey-publisher=compendium_v2.publishers.survey_publisher_legacy_excel:cli',
+            'db-publisher-2022=compendium_v2.publishers.survey_publisher_old_db_2022:cli',
         ]
     },
     license='MIT',
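Each console_scripts entry maps an executable name to a module-level click command, so after a standard pip install of compendium-v2 the hunk above exposes the publisher in this commit as the shell command excel-survey-publisher --config config.json (and db-publisher-2022 analogously, though that module's options are not part of this commit).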