Select Git revision
db_publisher.py 19.33 KiB
"""
survey_publisher_2022
=========================
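This module loads survey answers (mostly from the 2022 survey, plus some earlier years for policies and institution URLs) from the survey database and writes them to the compendium database.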
Registered as click cli command when installing compendium-v2.
"""
import logging
import click
import enum
import math
import json
import html
from sqlalchemy import delete, text
from collections import defaultdict
import compendium_v2
from compendium_v2.db.model import FeeType
from compendium_v2.environment import setup_logging
from compendium_v2.config import load
from compendium_v2.publishers.helpers import extract_urls
from compendium_v2.survey_db import model as survey_model
from compendium_v2.db import db, model
from compendium_v2.publishers import helpers
# Configure logging as soon as the module is imported (note: this is an
# import-time side effect, not something done inside cli()).
setup_logging()
# Logger shared by every transfer_* step in this module.
logger = logging.getLogger('survey-publisher-2022')
# Raw answer to the 2022 budget question (question id 16402), one row per NREN.
# DISTINCT ON keeps only the first row of each (nren, question) group under the
# ORDER BY, i.e. the most recently updated answer. The value is returned as
# stored (no placeholder filtering); transfer_budget parses and validates it.
BUDGET_QUERY = """
SELECT DISTINCT ON (n.id, a.question_id)
n.abbreviation AS nren,
a.value AS budget
FROM answers a
JOIN nrens n ON a.nren_id = n.id
JOIN questions q ON a.question_id = q.id
JOIN sections s ON q.section_id = s.id
JOIN compendia c ON s.compendium_id = c.id
WHERE
a.question_id = 16402
AND c.year = 2022
ORDER BY n.id, a.question_id, a.updated_at DESC
"""
# Template for "latest answer per NREN to one question in one survey year".
# Fill it with QUESTION_TEMPLATE_QUERY.format(question_id, year). The values are
# interpolated directly into the SQL text, so only pass trusted integers.
# Answers whose raw stored value is exactly one of the listed placeholder
# strings (compared case-sensitively) are excluded before DISTINCT ON picks the
# most recently updated remaining answer.
QUESTION_TEMPLATE_QUERY = """
SELECT DISTINCT ON (n.id, a.question_id)
n.abbreviation AS nren,
a.value AS value
FROM answers a
JOIN nrens n ON a.nren_id = n.id
JOIN questions q ON a.question_id = q.id
JOIN sections s ON q.section_id = s.id
JOIN compendia c ON s.compendium_id = c.id
WHERE
a.question_id = {}
AND c.year = {}
AND a.value NOT IN ('"NA"', '"N/A"', '[""]', '["-"]', '["/"]')
ORDER BY n.id, a.question_id, a.updated_at DESC
"""
# Institution-URL answers for the 2022 question (id 16507) and, by following
# questions.equivalent_question_id recursively, its counterparts in earlier
# survey years. Returns one row per (NREN, question): the latest answer, its
# survey year, and the upper-cased NREN abbreviation.
# Placeholder answers are filtered by comparing UPPER(answers.value), so every
# literal in the NOT IN list must itself be upper case to be able to match
# (previously '[n/a]' was lowercase and could never match).
INSTITUTIONS_URLS_QUERY_UNTIL_2022 = """
WITH RECURSIVE parent_questions AS (
-- Base case
SELECT q.id, q.equivalent_question_id, c.year, q.title
FROM questions q
JOIN sections s ON q.section_id = s.id
JOIN compendia c ON s.compendium_id = c.id
WHERE q.id = 16507
UNION ALL
-- Recursive case
SELECT q.id, q.equivalent_question_id, c.year, q.title
FROM questions q
INNER JOIN parent_questions pq ON q.id = pq.equivalent_question_id
JOIN sections s ON q.section_id = s.id
JOIN compendia c ON s.compendium_id = c.id)
SELECT DISTINCT ON (n.id, answers.question_id) answers.id,
UPPER(n.abbreviation) AS nren,
parent_questions.year,
answers.value as answer
FROM answers
JOIN parent_questions ON answers.question_id = parent_questions.id
JOIN nrens n on answers.nren_id = n.id
WHERE UPPER(answers.value) NOT IN ('"NA"', '"N/A"', '[""]', '["-"]', '["/"]', '/', '["NA"]', '""', '[]', '[N/A]')
ORDER BY n.id, answers.question_id, answers.updated_at DESC;
"""
class FundingSource(enum.Enum):
    """
    Survey question ids for the 2022 funding-source breakdown.

    Each answer is parsed as a number by transfer_funding_sources, which
    expects the values of all members to sum to 100 (percentages).
    """
    CLIENT_INSTITUTIONS = 16405
    EUROPEAN_FUNDING = 16406
    COMMERCIAL = 16407
    OTHER = 16408
    GOV_PUBLIC_BODIES = 16409
class StaffQuestion(enum.Enum):
    """
    Survey question ids for the 2022 staffing questions.

    Answers are numbers expressed in FTEs (full time equivalents).
    PERMANENT + SUBCONTRACTED and TECHNICAL + NON_TECHNICAL are two breakdowns
    that transfer_staff_data expects to add up to the same total.
    """
    PERMANENT_FTE = 16414
    SUBCONTRACTED_FTE = 16413
    TECHNICAL_FTE = 16416
    NON_TECHNICAL_FTE = 16417
class OrgQuestion(enum.Enum):
    """
    Survey question ids for the 2022 parent / sub-organisation questions.

    Answers are strings. There are up to five sub-organisations, each with a
    NAME, a role CHOICE, and a free-text ROLE that transfer_nren_sub_org uses
    when the chosen role is "other".
    """
    PARENT_ORG_NAME = 16419
    SUB_ORGS_1_NAME = 16422
    SUB_ORGS_1_CHOICE = 16449
    SUB_ORGS_1_ROLE = 16426
    SUB_ORGS_2_NAME = 16429
    SUB_ORGS_2_CHOICE = 16448
    SUB_ORGS_2_ROLE = 16434
    SUB_ORGS_3_NAME = 16430
    SUB_ORGS_3_CHOICE = 16446
    SUB_ORGS_3_ROLE = 16435
    SUB_ORGS_4_NAME = 16432
    SUB_ORGS_4_CHOICE = 16451
    SUB_ORGS_4_ROLE = 16438
    SUB_ORGS_5_NAME = 16433
    SUB_ORGS_5_CHOICE = 16450
    SUB_ORGS_5_ROLE = 16439
class ECQuestion(enum.Enum):
    """
    Survey question id for the 2022 EC projects question.

    transfer_ec_projects parses the answer as JSON and iterates over it as a
    list of project names.
    """
    EC_PROJECT = 16453
class ChargingStructure(enum.Enum):
    """
    Survey question id for the 2022 charging-structure question.

    Answers are strings; transfer_charging_structure maps them to a FeeType
    by substring matching.
    """
    # NOTE(review): lower-case member name, unlike the other question enums.
    charging_structure = 16410
def query_budget():
    """
    Run BUDGET_QUERY against the survey database.

    :return: result rows of (NREN abbreviation, raw budget answer)
    """
    statement = text(BUDGET_QUERY)
    survey_engine = db.engines[survey_model.SURVEY_DB_BIND]
    return db.session.execute(statement, bind_arguments={'bind': survey_engine})
def query_institutions_urls():
    """
    Run INSTITUTIONS_URLS_QUERY_UNTIL_2022 against the survey database.

    :return: result rows of (answer id, upper-cased NREN abbreviation,
             survey year, raw answer)
    """
    statement = text(INSTITUTIONS_URLS_QUERY_UNTIL_2022)
    survey_engine = db.engines[survey_model.SURVEY_DB_BIND]
    return db.session.execute(statement, bind_arguments={'bind': survey_engine})
def query_funding_sources():
    """
    Yield one ``(source, rows)`` pair per FundingSource member.

    ``rows`` holds the latest 2022 answer of each NREN to that funding-source
    question. Each query runs when its pair is yielded.
    """
    for funding_source in FundingSource:
        yield funding_source, query_question_id(funding_source.value, 2022)
def query_question(question: enum.Enum, year: int = 2022):
    """
    Fetch the latest answer of each NREN to a question given as an enum member.

    :param question: enum member whose ``value`` is the survey question id
    :param year: survey year to query (defaults to 2022, as before)
    :return: result rows of (NREN abbreviation, raw answer)
    """
    return query_question_id(question.value, year)
def query_question_id(question_id: int, year: int = 2022):
    """
    Fetch the latest answer of each NREN to one survey question.

    The ids are substituted into QUESTION_TEMPLATE_QUERY with ``str.format``,
    so they must be trusted integers.

    :param question_id: survey question id
    :param year: survey (compendium) year
    :return: result rows of (NREN abbreviation, raw answer)
    """
    sql = text(QUESTION_TEMPLATE_QUERY.format(question_id, year))
    survey_engine = db.engines[survey_model.SURVEY_DB_BIND]
    return db.session.execute(sql, bind_arguments={'bind': survey_engine})
def transfer_budget(nren_dict):
    """
    Copy each NREN's 2022 budget answer into model.BudgetEntry rows.

    Answers that cannot be parsed as a number (including non-string values)
    are logged and skipped. BREN's answer is always skipped. Budgets above 200
    are logged as suspicious but still stored.

    :param nren_dict: mapping of upper-cased NREN abbreviation to the NREN
                      object (used for the ``nren`` relationship and ``.id``)
    """
    rows = query_budget()
    for row in rows:
        nren_name = row[0].upper()
        _budget = row[1]
        try:
            # answers are stored as text, possibly quoted and with thousands separators
            budget = float(_budget.replace('"', '').replace(',', ''))
        except (ValueError, AttributeError):
            # AttributeError: the stored answer is not a string (e.g. NULL)
            logger.info(f'{nren_name} has no budget for 2022. Skipping. ({_budget})')
            continue

        if nren_name == 'BREN':
            # obviously invalid datapoint
            continue

        if budget > 200:
            logger.info(f'{nren_name} has budget set to >200M EUR for 2022. ({budget})')

        if nren_name not in nren_dict:
            logger.info(f'{nren_name} unknown. Skipping.')
            continue

        nren = nren_dict[nren_name]
        budget_entry = model.BudgetEntry(
            nren=nren,
            nren_id=nren.id,
            budget=budget,
            year=2022,
        )
        db.session.merge(budget_entry)
    db.session.commit()
def transfer_institutions_urls(nren_dict):
    """
    Copy institution-URL answers (2022 and equivalent earlier questions) into
    model.InstitutionURLs rows, one per NREN and survey year.

    URLs are extracted from the raw answer with ``extract_urls``. The answer is
    also parsed as JSON purely as a cross-check; a mismatch between the two is
    logged. Answers yielding no URLs are skipped.

    :param nren_dict: mapping of upper-cased NREN abbreviation to the NREN object
    """
    def _parse_json(value, nren_name):
        # Answers are usually JSON lists; bare JSON values are wrapped in
        # brackets so both shapes parse into a list.
        if value and not value.startswith('['):
            value = f'[{value}]'
        try:
            parsed = json.loads(value)
        except (json.decoder.JSONDecodeError, TypeError):
            logger.info(f'JSON decode error for institution urls for {nren_name}.')
            return []
        # ignore non-string items (numbers, nested lists, ...) instead of crashing on .strip()
        return [url.strip() for url in parsed if isinstance(url, str) and url.strip()]

    rows = query_institutions_urls()
    for row in rows:
        _answer_id, nren_name, year, answer = row

        if nren_name not in nren_dict:
            logger.info(f'{nren_name} unknown. Skipping.')
            continue

        urls = extract_urls(text=answer)
        urls_json = _parse_json(answer, nren_name)
        if urls != urls_json:
            logger.info(f'Institution URLs for {nren_name} do not match between json and regex. {urls} != {urls_json}')

        if not urls:
            logger.info(f'{nren_name} has no urls for {year}. Skipping.')
            continue

        nren = nren_dict[nren_name]
        institution_urls = model.InstitutionURLs(
            nren=nren,
            nren_id=nren.id,
            urls=urls,
            year=year,
        )
        db.session.merge(institution_urls)
    db.session.commit()
def transfer_funding_sources(nren_dict):
    """
    Copy each NREN's 2022 funding-source breakdown into model.FundingSource rows.

    Every FundingSource question is answered with a percentage. A question an
    NREN did not answer counts as 0. An answer that cannot be parsed also counts
    as 0, and is logged. A total not within 0.01 of 100 is logged but still stored.

    :param nren_dict: mapping of upper-cased NREN abbreviation to the NREN object
    """
    sourcedata = {}
    for source, data in query_funding_sources():
        for row in data:
            nren_name = row[0].upper()
            _value = row[1]
            try:
                value = float(_value.replace('"', '').replace(',', ''))
            except ValueError:
                logger.info(f'{nren_name} has invalid value for {source.name}. ({_value})')
                value = 0

            # initialise on first use with 0 for every source
            nren_info = sourcedata.setdefault(nren_name, dict.fromkeys(FundingSource, 0))
            nren_info[source] = value

    for nren_name, nren_info in sourcedata.items():
        # check this first so we don't log sum warnings for NRENs we skip anyway
        if nren_name not in nren_dict:
            logger.info(f'{nren_name} unknown. Skipping.')
            continue

        total = sum(nren_info.values())
        if not math.isclose(total, 100, abs_tol=0.01):
            logger.info(f'{nren_name} funding sources do not sum to 100%. ({total})')

        nren = nren_dict[nren_name]
        funding_source = model.FundingSource(
            nren=nren,
            nren_id=nren.id,
            year=2022,
            client_institutions=nren_info[FundingSource.CLIENT_INSTITUTIONS],
            european_funding=nren_info[FundingSource.EUROPEAN_FUNDING],
            gov_public_bodies=nren_info[FundingSource.GOV_PUBLIC_BODIES],
            commercial=nren_info[FundingSource.COMMERCIAL],
            other=nren_info[FundingSource.OTHER],
        )
        db.session.merge(funding_source)
    db.session.commit()
def transfer_staff_data(nren_dict):
    """
    Copy each NREN's 2022 staffing answers (FTE counts) into model.NrenStaff rows.

    Unparseable answers count as 0. If all of an NREN's answers are 0, its
    existing 2022 NrenStaff row (if any) is deleted instead. A mismatch between
    permanent+subcontracted and technical+non-technical totals is logged but the
    data is still stored.

    :param nren_dict: mapping of upper-cased NREN abbreviation to the NREN object
    """
    answers_by_nren = {}
    for staff_question in StaffQuestion:
        for row in query_question(staff_question):
            nren_name = row[0].upper()
            raw_answer = row[1]
            try:
                fte = float(raw_answer.replace('"', '').replace(',', ''))
            except ValueError:
                fte = 0

            if nren_name not in nren_dict:
                logger.info(f'{nren_name} unknown. Skipping.')
                continue

            # only NRENs that answered at least one question get an entry
            if nren_name not in answers_by_nren:
                answers_by_nren[nren_name] = dict.fromkeys(StaffQuestion, 0)
            answers_by_nren[nren_name][staff_question] = fte

    for nren_name, ftes in answers_by_nren.items():
        nren = nren_dict[nren_name]

        if sum(ftes.values()) == 0:
            logger.info(f'{nren_name} has no staff data. Deleting if exists.')
            db.session.execute(delete(model.NrenStaff).where(
                model.NrenStaff.nren_id == nren.id,
                model.NrenStaff.year == 2022
            ))
            continue

        permanent = ftes[StaffQuestion.PERMANENT_FTE]
        subcontracted = ftes[StaffQuestion.SUBCONTRACTED_FTE]
        technical = ftes[StaffQuestion.TECHNICAL_FTE]
        non_technical = ftes[StaffQuestion.NON_TECHNICAL_FTE]

        # the two breakdowns should describe the same total headcount
        by_contract = permanent + subcontracted
        by_role = technical + non_technical
        if not math.isclose(by_contract, by_role, abs_tol=0.01):
            logger.info(f'{nren_name} FTE do not equal across employed/technical categories.'
                        f' ({by_contract} != {by_role})')

        db.session.merge(model.NrenStaff(
            nren=nren,
            nren_id=nren.id,
            year=2022,
            permanent_fte=permanent,
            subcontracted_fte=subcontracted,
            technical_fte=technical,
            non_technical_fte=non_technical,
        ))
    db.session.commit()
def transfer_nren_parent_org(nren_dict):
    """
    Replace all 2022 model.ParentOrganization rows with the parent organisation
    names from the 2022 survey.

    Quotes and known boilerplate phrases are removed, and surrounding whitespace
    is stripped. Answers that are empty after this cleaning are skipped.

    :param nren_dict: mapping of upper-cased NREN abbreviation to the NREN object
    """
    # clean up the data a bit by removing some strings
    strings_to_replace = [
        'We are affiliated to '
    ]

    db.session.execute(delete(model.ParentOrganization).where(model.ParentOrganization.year == 2022))

    rows = query_question(OrgQuestion.PARENT_ORG_NAME)
    for row in rows:
        nren_name = row[0].upper()
        value = str(row[1]).replace('"', '')
        for string in strings_to_replace:
            value = value.replace(string, '')
        value = value.strip()

        # check emptiness only after cleaning, so boilerplate-only answers are dropped
        if not value:
            continue

        if nren_name not in nren_dict:
            logger.info(f'{nren_name} unknown. Skipping.')
            continue

        nren = nren_dict[nren_name]
        parent_org = model.ParentOrganization(
            nren=nren,
            nren_id=nren.id,
            year=2022,
            organization=value,
        )
        db.session.merge(parent_org)
    db.session.commit()
def transfer_nren_sub_org(nren_dict):
    """
    Replace all 2022 model.SubOrganization rows with the sub-organisations
    reported in the 2022 survey.

    The survey has five (name, role choice, free-text role) question triples.
    If the role choice is "other" (case-insensitive), the free-text role is used.
    Entries with an empty name or without a resolvable role are skipped.

    :param nren_dict: mapping of upper-cased NREN abbreviation to the NREN object
    """
    suborg_questions = [
        (OrgQuestion.SUB_ORGS_1_NAME, OrgQuestion.SUB_ORGS_1_CHOICE, OrgQuestion.SUB_ORGS_1_ROLE),
        (OrgQuestion.SUB_ORGS_2_NAME, OrgQuestion.SUB_ORGS_2_CHOICE, OrgQuestion.SUB_ORGS_2_ROLE),
        (OrgQuestion.SUB_ORGS_3_NAME, OrgQuestion.SUB_ORGS_3_CHOICE, OrgQuestion.SUB_ORGS_3_ROLE),
        (OrgQuestion.SUB_ORGS_4_NAME, OrgQuestion.SUB_ORGS_4_CHOICE, OrgQuestion.SUB_ORGS_4_ROLE),
        (OrgQuestion.SUB_ORGS_5_NAME, OrgQuestion.SUB_ORGS_5_CHOICE, OrgQuestion.SUB_ORGS_5_ROLE)
    ]

    def _answers_by_nren(question):
        # Map raw NREN abbreviation -> cleaned answer. Matching the name, choice
        # and role answers by NREN (rather than zipping rows by position) keeps
        # them aligned when an NREN answered only some of the questions.
        return {row[0]: row[1].replace('"', '').strip() for row in query_question(question)}

    lookup = defaultdict(list)
    db.session.execute(delete(model.SubOrganization).where(model.SubOrganization.year == 2022))

    for name_question, choice_question, role_question in suborg_questions:
        names = _answers_by_nren(name_question)
        choices = _answers_by_nren(choice_question)
        free_text_roles = _answers_by_nren(role_question)

        for abbreviation, suborg_name in names.items():
            nren_name = abbreviation.upper()
            if nren_name not in nren_dict:
                logger.info(f'{nren_name} unknown. Skipping.')
                continue

            role_choice = choices.get(abbreviation, '')
            if role_choice.lower() == 'other':
                # fresh lookup per NREN: never reuse a role from a previous NREN
                role = free_text_roles.get(abbreviation, '')
            else:
                role = role_choice

            if not suborg_name or not role:
                continue
            lookup[nren_name].append((suborg_name, role))

    for nren_name, suborgs in lookup.items():
        nren = nren_dict[nren_name]
        for suborg_name, role in suborgs:
            suborg = model.SubOrganization(
                nren=nren,
                nren_id=nren.id,
                year=2022,
                organization=suborg_name,
                role=role,
            )
            db.session.merge(suborg)
    db.session.commit()
def transfer_charging_structure(nren_dict):
    """
    Copy each NREN's 2022 charging-structure answer into model.ChargingStructure rows.

    The answer text is mapped to a FeeType by substring. The first marker found
    in the answer wins. Answers matching no marker are stored with
    ``fee_type=None``.

    :param nren_dict: mapping of upper-cased NREN abbreviation to the NREN object
    """
    # Checked in this order; the first matching substring determines the fee type.
    fee_type_markers = (
        ("do not charge", FeeType.no_charge),
        ("combination", FeeType.combination),
        ("flat", FeeType.flat_fee),
        ("usage-based", FeeType.usage_based_fee),
        ("Other", FeeType.other),
    )

    for row in query_question(ChargingStructure.charging_structure):
        nren_name = row[0].upper()
        answer = row[1].replace('"', '').strip()

        if nren_name not in nren_dict:
            logger.info(f'{nren_name} unknown. Skipping from charging structure.')
            continue

        fee_type = next((fee for marker, fee in fee_type_markers if marker in answer), None)

        nren = nren_dict[nren_name]
        db.session.merge(model.ChargingStructure(
            nren=nren,
            nren_id=nren.id,
            year=2022,
            fee_type=fee_type,
        ))
    db.session.commit()
def transfer_ec_projects(nren_dict):
    """
    Replace all 2022 model.ECProject rows with the EC projects listed in the
    2022 survey.

    Each answer is expected to be a JSON list of project names; a single JSON
    string is treated as a one-element list, and any other JSON value is logged
    and skipped. Names are HTML-unescaped, NBSPs become spaces, and everything
    from "(contract n" onwards is dropped. Names left empty are skipped.

    :param nren_dict: mapping of upper-cased NREN abbreviation to the NREN object
    """
    # delete all existing EC projects, in case something changed
    db.session.execute(
        delete(model.ECProject).where(model.ECProject.year == 2022)
    )

    rows = query_question(ECQuestion.EC_PROJECT)
    for row in rows:
        nren_name = row[0].upper()

        if nren_name not in nren_dict:
            logger.warning(f'{nren_name} unknown. Skipping.')
            continue

        try:
            value = json.loads(row[1])
        except json.decoder.JSONDecodeError:
            logger.info(f'JSON decode error for EC project data for {nren_name}. Skipping.')
            continue

        if isinstance(value, str):
            # a bare JSON string would otherwise be iterated character by character
            value = [value]
        elif not isinstance(value, list):
            logger.info(f'Unexpected EC project data for {nren_name}. Skipping.')
            continue

        nren = nren_dict[nren_name]
        for val in value:
            if not val:
                logger.info(f'Empty EC project value for {nren_name}.')
                continue

            # strip html entities/NBSP from val
            project = html.unescape(str(val)).replace('\xa0', ' ')
            # some answers include contract numbers, which we don't want here
            project = project.split('(contract n')[0].strip()
            if not project:
                logger.info(f'Empty EC project value for {nren_name}.')
                continue

            ec_project = model.ECProject(
                nren=nren,
                nren_id=nren.id,
                year=2022,
                project=project
            )
            db.session.add(ec_project)
    db.session.commit()
def transfer_policies(nren_dict):
    """
    Copy policy-document answers for 2018-2022 into model.Policy rows, one per
    (NREN, year).

    Answers are strings that should be urls, but sometimes there's other stuff
    like email addresses or random text. Only the first whitespace-separated
    token of each answer is kept (quotes stripped). Empty or whitespace-only
    answers are skipped silently. Tokens equal to "N.A." (case-insensitive), or
    containing neither '.' nor '@', are logged and skipped.

    :param nren_dict: mapping of upper-cased NREN abbreviation to the NREN object
    """
    # policy kind -> {survey year: question id}; not every kind exists every year
    policy_questions = {
        'strategy': {2022: 16469, 2021: 16064, 2020: 15720, 2019: 15305, 2018: 14910},
        'environment': {2022: 16471, 2021: 16066, 2020: 15722, 2019: 15307, 2018: 14912},
        'equality': {2022: 16473, 2021: 16378},
        'connectiviy': {2022: 16475, 2021: 16068, 2020: 15724, 2019: 15309, 2018: 14914},
        'acceptable_use': {2022: 16477, 2021: 16070, 2020: 15726, 2019: 15311, 2018: 14916},
        'privacy': {2022: 16479, 2021: 16072, 2020: 15728, 2019: 15575},
        'data_protection': {2022: 16481, 2021: 16074, 2020: 15730, 2019: 15577}
    }

    data = {}
    for year in [2018, 2019, 2020, 2021, 2022]:
        policy_questions_year = {key: years[year] for key, years in policy_questions.items() if year in years}
        for question_key, question_id in policy_questions_year.items():
            rows = query_question_id(question_id, year)
            for row in rows:
                nren_name = row[0].upper()
                _value = row[1]

                if nren_name not in nren_dict:
                    logger.warning(f'{nren_name} unknown. Skipping.')
                    continue

                # a whitespace-only or missing answer has no first token
                tokens = _value.split() if _value else []
                if not tokens:
                    continue
                value = tokens[0].strip('"')

                if not value:
                    # don't warn on empty answers, just skip
                    continue

                if value.upper() == 'N.A.' or ('.' not in value and '@' not in value):
                    # this test is a bit silly but does seem to filter out all the nonsense responses
                    logger.warning(f'"{value}" does not look like an email address or link. Skipping.')
                    continue

                if _value not in [f'"{value}"', value]:
                    logger.info(f'Cleaned policy answer: "{_value}" became "{value}"')

                # initialize on first use, so we don't add data for nrens with no answers
                data.setdefault((nren_name, year), {q: '' for q in policy_questions.keys()})
                data[(nren_name, year)][question_key] = value

    for (nren_name, year), nren_info in data.items():
        nren = nren_dict[nren_name]
        policy_data = model.Policy(
            nren=nren,
            nren_id=nren.id,
            year=year,
            strategic_plan=nren_info['strategy'],
            environmental=nren_info['environment'],
            equal_opportunity=nren_info['equality'],
            connectivity=nren_info['connectiviy'],
            acceptable_use=nren_info['acceptable_use'],
            privacy_notice=nren_info['privacy'],
            data_protection=nren_info['data_protection'],
        )
        db.session.merge(policy_data)
    db.session.commit()
def _cli(config, app):
    """
    Run every transfer step, in a fixed order, inside ``app``'s application context.

    :param config: loaded app configuration (accepted for interface
                   compatibility; not used here)
    :param app: application whose context provides the database session
    """
    transfer_steps = (
        transfer_budget,
        transfer_funding_sources,
        transfer_staff_data,
        transfer_nren_parent_org,
        transfer_nren_sub_org,
        transfer_charging_structure,
        transfer_ec_projects,
        transfer_policies,
        transfer_institutions_urls,
    )
    with app.app_context():
        nren_dict = helpers.get_uppercase_nren_dict()
        for step in transfer_steps:
            step(nren_dict)
@click.command()
@click.option('--config', type=click.STRING, default='config.json')
def cli(config):
    """Publish survey data from the survey database to the compendium database."""
    # Close the config file as soon as it has been loaded (it was previously leaked).
    with open(config, 'r') as config_file:
        app_config = load(config_file)

    # Register the survey database as an extra SQLAlchemy bind next to the main one.
    app_config['SQLALCHEMY_BINDS'] = {survey_model.SURVEY_DB_BIND: app_config['SURVEY_DATABASE_URI']}

    app = compendium_v2._create_app_with_db(app_config)
    _cli(app_config, app)
# Allow running this module directly as a script, in addition to the click
# command registered when installing compendium-v2.
if __name__ == "__main__":
    cli()