Skip to content
Snippets Groups Projects
Select Git revision
  • d2e2e6745cd865bcf6c9e7a395e7200ae2be5f6f
  • master default protected
  • opensearch
  • master-wo-opendistro-plugins
  • dev-bartos
  • tag-modification
  • java-upgrade
  • ports
  • nifi-update
  • kiril.kjiroski-master-patch-71888
  • user-mgmt-ui
  • interactive
  • cluster-support
  • dev5
  • dev4
  • dev02
  • dev01b
  • dev1
  • dev3
  • dev2
  • v1.0
  • v0.7
22 results

HOWTOS.md

Blame
  • db_publisher.py 19.33 KiB
    """
    survey_publisher_2022
    =========================
    
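    This module loads the 2022 survey answers (plus earlier-year answers for policies and
    institution URLs) from the survey database and writes them to the compendium-v2 models.
    It is registered as a click CLI command when compendium-v2 is installed.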
    
    """
    import logging
    import click
    import enum
    import math
    import json
    import html
    
    from sqlalchemy import delete, text
    from collections import defaultdict
    
    import compendium_v2
    from compendium_v2.db.model import FeeType
    from compendium_v2.environment import setup_logging
    from compendium_v2.config import load
    from compendium_v2.publishers.helpers import extract_urls
    from compendium_v2.survey_db import model as survey_model
    from compendium_v2.db import db, model
    from compendium_v2.publishers import helpers
    
    # set up logging (compendium_v2.environment.setup_logging) as soon as this module is imported
    setup_logging()

    logger = logging.getLogger(name='survey-publisher-2022')
    
    # Latest 2022 answer per NREN to the budget question (16402).
    BUDGET_QUERY = """
    SELECT DISTINCT ON (n.id, a.question_id)
        n.abbreviation AS nren,
        a.value AS budget
    FROM answers a
    JOIN nrens n ON a.nren_id = n.id
    JOIN questions q ON a.question_id = q.id
    JOIN sections s ON q.section_id = s.id
    JOIN compendia c ON s.compendium_id = c.id
    WHERE
        a.question_id = 16402
    AND c.year = 2022
    ORDER BY n.id, a.question_id, a.updated_at DESC
    """

    # Latest answer per NREN to a single question in a single survey year, skipping
    # placeholder answers. Fill in with QUESTION_TEMPLATE_QUERY.format(question_id, year).
    QUESTION_TEMPLATE_QUERY = """
    SELECT DISTINCT ON (n.id, a.question_id)
        n.abbreviation AS nren,
        a.value AS value
    FROM answers a
    JOIN nrens n ON a.nren_id = n.id
    JOIN questions q ON a.question_id = q.id
    JOIN sections s ON q.section_id = s.id
    JOIN compendia c ON s.compendium_id = c.id
    WHERE
        a.question_id = {}
        AND c.year = {}
        AND a.value NOT IN ('"NA"', '"N/A"', '[""]', '["-"]', '["/"]')
    ORDER BY n.id, a.question_id, a.updated_at DESC
    """

    # Latest institution-URL answer per NREN and question, for question 16507 and every question
    # reachable from it through equivalent_question_id links (presumably the same question in
    # earlier surveys). Placeholder answers are filtered after UPPER(), so every entry of the
    # NOT IN list must itself be upper-case or it can never match.
    INSTITUTIONS_URLS_QUERY_UNTIL_2022 = """
        WITH RECURSIVE parent_questions AS (
            -- Base case
            SELECT q.id, q.equivalent_question_id, c.year, q.title
            FROM questions q
                     JOIN sections s ON q.section_id = s.id
                     JOIN compendia c ON s.compendium_id = c.id
            WHERE q.id = 16507
            UNION ALL
            -- Recursive case
            SELECT q.id, q.equivalent_question_id, c.year, q.title
            FROM questions q
                     INNER JOIN parent_questions pq ON q.id = pq.equivalent_question_id
                     JOIN sections s ON q.section_id = s.id
                     JOIN compendia c ON s.compendium_id = c.id)
        SELECT DISTINCT ON (n.id, answers.question_id) answers.id,
                                                       UPPER(n.abbreviation) AS nren,
                                                       parent_questions.year,
                                                       answers.value         as answer
        FROM answers
                 JOIN parent_questions ON answers.question_id = parent_questions.id
                 JOIN nrens n on answers.nren_id = n.id
        WHERE UPPER(answers.value) NOT IN ('"NA"', '"N/A"', '[""]', '["-"]', '["/"]', '/', '["NA"]', '""', '[]', '[N/A]', '["N/A"]')
        ORDER BY n.id, answers.question_id, answers.updated_at DESC;
    """
    
    
    @enum.unique
    class FundingSource(enum.Enum):
        """
        2022 survey question ids for the funding source breakdown.

        Each answer is the share of an NREN's funding that comes from that source;
        the shares of one NREN are expected to add up to 100.
        """
        CLIENT_INSTITUTIONS = 16405
        EUROPEAN_FUNDING = 16406
        COMMERCIAL = 16407
        OTHER = 16408
        GOV_PUBLIC_BODIES = 16409
    
    
    @enum.unique
    class StaffQuestion(enum.Enum):
        """
        2022 survey question ids for NREN staffing.

        Answers are numbers expressed in FTEs (full time equivalents). Permanent plus
        subcontracted FTE is expected to equal technical plus non-technical FTE.
        """
        PERMANENT_FTE = 16414
        SUBCONTRACTED_FTE = 16413
        TECHNICAL_FTE = 16416
        NON_TECHNICAL_FTE = 16417
    
    
    @enum.unique
    class OrgQuestion(enum.Enum):
        """
        2022 survey question ids about the NREN's parent and sub-organisations.

        Answers are strings. Each of the five sub-organisation slots has a name question,
        a role choice question, and a free-text role question that is used when the
        chosen role is "other".
        """
        PARENT_ORG_NAME = 16419

        # sub-organisation slot 1: name / role choice / free-text role
        SUB_ORGS_1_NAME = 16422
        SUB_ORGS_1_CHOICE = 16449
        SUB_ORGS_1_ROLE = 16426

        # sub-organisation slot 2
        SUB_ORGS_2_NAME = 16429
        SUB_ORGS_2_CHOICE = 16448
        SUB_ORGS_2_ROLE = 16434

        # sub-organisation slot 3
        SUB_ORGS_3_NAME = 16430
        SUB_ORGS_3_CHOICE = 16446
        SUB_ORGS_3_ROLE = 16435

        # sub-organisation slot 4
        SUB_ORGS_4_NAME = 16432
        SUB_ORGS_4_CHOICE = 16451
        SUB_ORGS_4_ROLE = 16438

        # sub-organisation slot 5
        SUB_ORGS_5_NAME = 16433
        SUB_ORGS_5_CHOICE = 16450
        SUB_ORGS_5_ROLE = 16439
    
    
    @enum.unique
    class ECQuestion(enum.Enum):
        """
        2022 survey question id for the EC projects an NREN takes part in.

        Answers are parsed as JSON (expected to be a list of project names).
        """
        EC_PROJECT = 16453
    
    
    @enum.unique
    class ChargingStructure(enum.Enum):
        """
        2022 survey question id for the NREN's charging structure.

        Answers are strings, mapped onto ``FeeType`` by substring matching.
        """
        charging_structure = 16410
    
    
    def query_budget():
        """Return the latest 2022 budget answer per NREN, read from the survey database."""
        statement = text(BUDGET_QUERY)
        survey_engine = db.engines[survey_model.SURVEY_DB_BIND]
        return db.session.execute(statement, bind_arguments={'bind': survey_engine})
    
    
    def query_institutions_urls():
        """Return the institution-URL answers selected by ``INSTITUTIONS_URLS_QUERY_UNTIL_2022``."""
        statement = text(INSTITUTIONS_URLS_QUERY_UNTIL_2022)
        survey_bind = {'bind': db.engines[survey_model.SURVEY_DB_BIND]}
        return db.session.execute(statement, bind_arguments=survey_bind)
    
    
    def query_funding_sources():
        """
        Yield ``(source, result)`` for every ``FundingSource`` member.

        ``result`` holds the latest 2022 answer per NREN to that source's question.
        Queries run lazily, one per yielded pair.
        """
        for source in FundingSource:
            # identical to any other single-question lookup, so reuse the shared helper
            yield source, query_question_id(source.value, 2022)
    
    
    def query_question(question: enum.Enum):
        """
        Return the latest 2022 answer per NREN for a survey question.

        ``question`` is any enum member whose value is the survey question id.
        """
        question_id = question.value
        return query_question_id(question_id)
    
    
    def query_question_id(question_id: int, year: int = 2022):
        """
        Return the latest answer per NREN to ``question_id`` in the ``year`` survey.

        Placeholder answers ("NA", "N/A", and lists holding only "", "-" or "/") are
        excluded by the query.
        """
        sql = QUESTION_TEMPLATE_QUERY.format(question_id, year)
        survey_bind = {'bind': db.engines[survey_model.SURVEY_DB_BIND]}
        return db.session.execute(text(sql), bind_arguments=survey_bind)
    
    
    def transfer_budget(nren_dict):
        """
        Copy the 2022 budget answers (question 16402) into ``model.BudgetEntry``.

        Skipped answers:
        - answers that do not parse as a finite number;
        - the known-bad BREN answer;
        - answers from NRENs missing from ``nren_dict``.

        Budgets above 200 are logged but still stored. The log message treats the unit
        as millions of EUR.

        :param nren_dict: maps upper-cased NREN abbreviations to NREN objects
        """
        rows = query_budget()
        for row in rows:
            nren_name = row[0].upper()
            _budget = row[1]
            try:
                # str() turns a NULL answer into 'None', which float() rejects like any other text
                budget = float(str(_budget).replace('"', '').replace(',', ''))
            except ValueError:
                budget = None

            # float() also accepts 'nan' and 'inf', which are not usable budgets either
            if budget is None or not math.isfinite(budget):
                logger.info(f'{nren_name} has no budget for 2022. Skipping. ({_budget})')
                continue

            if nren_name == 'BREN':
                # obviously invalid datapoint
                continue

            if budget > 200:
                logger.info(f'{nren_name} has budget set to >200M EUR for 2022. ({budget})')

            if nren_name not in nren_dict:
                logger.info(f'{nren_name} unknown. Skipping.')
                continue

            budget_entry = model.BudgetEntry(
                nren=nren_dict[nren_name],
                nren_id=nren_dict[nren_name].id,
                budget=budget,
                year=2022,
            )
            db.session.merge(budget_entry)
        db.session.commit()
    
    
    def transfer_institutions_urls(nren_dict):
        """
        Copy the institution URL answers into ``model.InstitutionURLs``, one row per NREN and year.

        The answers come from ``INSTITUTIONS_URLS_QUERY_UNTIL_2022``. The stored URLs come from
        ``extract_urls``. The answer is also parsed as JSON, only as a cross-check, and any
        mismatch between the two results is logged.

        :param nren_dict: maps upper-cased NREN abbreviations to NREN objects
        """
        def _parse_json(value, nren_name):
            # answers are normally a JSON list; wrap anything else (e.g. a bare JSON string) in one
            if value and not value.startswith('['):
                value = f'[{value}]'

            try:
                parsed = json.loads(value)
            except json.decoder.JSONDecodeError:
                logger.info(f'JSON decode error for institution urls for {nren_name}.')
                return []

            # skip non-string entries (numbers, null, objects) instead of failing on .strip()
            return [url.strip() for url in parsed if isinstance(url, str) and url.strip()]

        rows = query_institutions_urls()

        for row in rows:
            _answer_id, nren_name, year, answer = row
            if nren_name not in nren_dict:
                logger.info(f'{nren_name} unknown. Skipping.')
                continue

            urls = extract_urls(text=answer)
            urls_json = _parse_json(answer, nren_name)
            if urls != urls_json:
                logger.info(f'Institution URLs for {nren_name} do not match between json and regex. {urls} != {urls_json}')

            if not urls:
                logger.info(f'{nren_name} has no urls for {year}. Skipping.')
                continue

            institution_urls = model.InstitutionURLs(
                nren=nren_dict[nren_name],
                nren_id=nren_dict[nren_name].id,
                urls=urls,
                year=year,
            )
            db.session.merge(institution_urls)

        db.session.commit()
    
    
    def transfer_funding_sources(nren_dict):
        """
        Copy the 2022 funding source shares into ``model.FundingSource``.

        Unparseable answers are logged and counted as 0. Sources an NREN did not answer
        default to 0. An info message is logged when an NREN's shares do not add up to
        100 (within 0.01).

        :param nren_dict: maps upper-cased NREN abbreviations to NREN objects
        """
        sourcedata = {}
        for source, data in query_funding_sources():
            for row in data:
                nren_name = row[0].upper()
                _value = row[1]
                try:
                    value = float(_value.replace('"', '').replace(',', ''))
                except ValueError:
                    logger.info(f'{nren_name} has invalid value for {source.name}. ({_value})')
                    value = 0

                # every source starts at 0 so unanswered sources still get a column value
                nren_info = sourcedata.setdefault(
                    nren_name,
                    {source_type: 0 for source_type in FundingSource}
                )
                nren_info[source] = value

        for nren_name, nren_info in sourcedata.items():
            total = sum(nren_info.values())

            if not math.isclose(total, 100, abs_tol=0.01):
                logger.info(f'{nren_name} funding sources do not sum to 100%. ({total})')

            if nren_name not in nren_dict:
                logger.info(f'{nren_name} unknown. Skipping.')
                continue

            funding_source = model.FundingSource(
                nren=nren_dict[nren_name],
                nren_id=nren_dict[nren_name].id,
                year=2022,
                client_institutions=nren_info[FundingSource.CLIENT_INSTITUTIONS],
                european_funding=nren_info[FundingSource.EUROPEAN_FUNDING],
                gov_public_bodies=nren_info[FundingSource.GOV_PUBLIC_BODIES],
                commercial=nren_info[FundingSource.COMMERCIAL],
                other=nren_info[FundingSource.OTHER],
            )
            db.session.merge(funding_source)
        db.session.commit()
    
    
    def transfer_staff_data(nren_dict):
        """
        Copy the 2022 staff FTE answers into ``model.NrenStaff``.

        Unparseable answers count as 0. If all four FTE answers of an NREN are 0, its
        existing 2022 ``NrenStaff`` row is deleted instead of written. A mismatch between
        the employment-type total and the role total is logged but still stored.

        :param nren_dict: maps upper-cased NREN abbreviations to NREN objects
        """
        answers_by_nren = {}
        for question in StaffQuestion:
            for abbreviation, raw_value in query_question(question):
                nren_name = abbreviation.upper()
                try:
                    fte = float(raw_value.replace('"', '').replace(',', ''))
                except ValueError:
                    fte = 0

                if nren_name not in nren_dict:
                    logger.info(f'{nren_name} unknown. Skipping.')
                    continue

                # only NRENs that answered at least one staff question get an entry
                if nren_name not in answers_by_nren:
                    answers_by_nren[nren_name] = dict.fromkeys(StaffQuestion, 0)
                answers_by_nren[nren_name][question] = fte

        for nren_name, answers in answers_by_nren.items():
            nren = nren_dict[nren_name]

            if sum(answers.values()) == 0:
                logger.info(f'{nren_name} has no staff data. Deleting if exists.')
                db.session.execute(delete(model.NrenStaff).where(
                    model.NrenStaff.nren_id == nren.id,
                    model.NrenStaff.year == 2022
                ))
                continue

            permanent = answers[StaffQuestion.PERMANENT_FTE]
            subcontracted = answers[StaffQuestion.SUBCONTRACTED_FTE]
            technical = answers[StaffQuestion.TECHNICAL_FTE]
            non_technical = answers[StaffQuestion.NON_TECHNICAL_FTE]

            by_contract = permanent + subcontracted
            by_role = technical + non_technical
            if not math.isclose(by_contract, by_role, abs_tol=0.01):
                logger.info(f'{nren_name} FTE do not equal across employed/technical categories.'
                            f' ({by_contract} != {by_role})')

            db.session.merge(model.NrenStaff(
                nren=nren,
                nren_id=nren.id,
                year=2022,
                permanent_fte=permanent,
                subcontracted_fte=subcontracted,
                technical_fte=technical,
                non_technical_fte=non_technical,
            ))
        db.session.commit()
    
    
    def transfer_nren_parent_org(nren_dict):
        """
        Replace all 2022 ``model.ParentOrganization`` rows with the parent organisation answers.

        Quotes and boilerplate phrases are removed from each answer, and the result is
        stripped. Answers that are empty after this cleanup are ignored.

        :param nren_dict: maps upper-cased NREN abbreviations to NREN objects
        """
        # clean up the data a bit by removing some strings
        strings_to_replace = [
            'We are affiliated to '
        ]

        db.session.execute(delete(model.ParentOrganization).where(model.ParentOrganization.year == 2022))

        rows = query_question(OrgQuestion.PARENT_ORG_NAME)
        for row in rows:
            nren_name = row[0].upper()
            value = str(row[1]).replace('"', '')

            for string in strings_to_replace:
                value = value.replace(string, '')
            value = value.strip()

            # check after cleanup: an answer made only of boilerplate/whitespace carries no name
            if not value:
                continue

            if nren_name not in nren_dict:
                logger.info(f'{nren_name} unknown. Skipping.')
                continue

            parent_org = model.ParentOrganization(
                nren=nren_dict[nren_name],
                nren_id=nren_dict[nren_name].id,
                year=2022,
                organization=value,
            )
            db.session.merge(parent_org)
        db.session.commit()
    
    
    def transfer_nren_sub_org(nren_dict):
        """
        Replace all 2022 ``model.SubOrganization`` rows with the sub-organisation answers.

        The survey has five (name, role choice, free-text role) question triples. For each
        NREN, the role is the chosen option, or the free-text role when "other" was chosen.
        Sub-organisations without a name or a role are skipped.

        Answers are matched by NREN abbreviation rather than by result-row position,
        because each question's query may return a different set of NRENs.

        :param nren_dict: maps upper-cased NREN abbreviations to NREN objects
        """
        suborg_questions = [
            (OrgQuestion.SUB_ORGS_1_NAME, OrgQuestion.SUB_ORGS_1_CHOICE, OrgQuestion.SUB_ORGS_1_ROLE),
            (OrgQuestion.SUB_ORGS_2_NAME, OrgQuestion.SUB_ORGS_2_CHOICE, OrgQuestion.SUB_ORGS_2_ROLE),
            (OrgQuestion.SUB_ORGS_3_NAME, OrgQuestion.SUB_ORGS_3_CHOICE, OrgQuestion.SUB_ORGS_3_ROLE),
            (OrgQuestion.SUB_ORGS_4_NAME, OrgQuestion.SUB_ORGS_4_CHOICE, OrgQuestion.SUB_ORGS_4_ROLE),
            (OrgQuestion.SUB_ORGS_5_NAME, OrgQuestion.SUB_ORGS_5_CHOICE, OrgQuestion.SUB_ORGS_5_ROLE)
        ]

        def _answers_by_nren(question):
            # latest answer per NREN abbreviation (as returned by the query), quotes removed
            return {row[0]: row[1].replace('"', '').strip() for row in query_question(question)}

        lookup = defaultdict(list)

        db.session.execute(delete(model.SubOrganization).where(model.SubOrganization.year == 2022))

        for name_question, choice_question, role_question in suborg_questions:
            names = _answers_by_nren(name_question)
            choices = _answers_by_nren(choice_question)
            free_text_roles = _answers_by_nren(role_question)

            for abbreviation, suborg_name in names.items():
                nren_name = abbreviation.upper()

                if nren_name not in nren_dict:
                    logger.info(f'{nren_name} unknown. Skipping.')
                    continue

                if not suborg_name:
                    continue

                role_choice = choices.get(abbreviation, '')
                if role_choice.lower() == 'other':
                    role_name = free_text_roles.get(abbreviation, '')
                else:
                    role_name = role_choice

                if not role_name:
                    continue

                lookup[nren_name].append((suborg_name, role_name))

        for nren_name, suborgs in lookup.items():
            for suborg_name, role_name in suborgs:
                suborg = model.SubOrganization(
                    nren=nren_dict[nren_name],
                    nren_id=nren_dict[nren_name].id,
                    year=2022,
                    organization=suborg_name,
                    role=role_name,
                )
                db.session.merge(suborg)
        db.session.commit()
    
    
    def transfer_charging_structure(nren_dict):
        """
        Copy the 2022 charging structure answers into ``model.ChargingStructure``.

        The fee type is chosen by the first case-sensitive marker substring found in the
        answer. Answers that match no marker are stored with ``fee_type=None``.

        :param nren_dict: maps upper-cased NREN abbreviations to NREN objects
        """
        # checked in this order; the first marker contained in the answer wins
        fee_type_markers = (
            ("do not charge", FeeType.no_charge),
            ("combination", FeeType.combination),
            ("flat", FeeType.flat_fee),
            ("usage-based", FeeType.usage_based_fee),
            ("Other", FeeType.other),
        )

        for row in query_question(ChargingStructure.charging_structure):
            nren_name = row[0].upper()
            answer = row[1].replace('"', '').strip()

            if nren_name not in nren_dict:
                logger.info(f'{nren_name} unknown. Skipping from charging structure.')
                continue

            fee_type = next((fee for marker, fee in fee_type_markers if marker in answer), None)

            nren = nren_dict[nren_name]
            db.session.merge(model.ChargingStructure(
                nren=nren,
                nren_id=nren.id,
                year=2022,
                fee_type=fee_type,
            ))
        db.session.commit()
    
    
    def transfer_ec_projects(nren_dict):
        """
        Replace all 2022 ``model.ECProject`` rows with the EC project answers.

        Each answer is parsed as JSON. A list yields one project per non-empty string entry,
        and a single JSON string is treated as a one-element list. HTML entities, NBSPs and
        any "(contract n..." suffix are removed from each name. A name is stored at most once
        per NREN.

        :param nren_dict: maps upper-cased NREN abbreviations to NREN objects
        """
        # delete all existing EC projects, in case something changed
        db.session.execute(
            delete(model.ECProject).where(model.ECProject.year == 2022)
        )

        rows = query_question(ECQuestion.EC_PROJECT)
        for row in rows:
            nren_name = row[0].upper()

            if nren_name not in nren_dict:
                logger.warning(f'{nren_name} unknown. Skipping.')
                continue

            try:
                value = json.loads(row[1])
            except json.decoder.JSONDecodeError:
                logger.info(f'JSON decode error for EC project data for {nren_name}. Skipping.')
                continue

            # a bare JSON string would otherwise be iterated character by character
            if isinstance(value, str):
                value = [value]
            elif not isinstance(value, list):
                logger.info(f'Unexpected EC project data for {nren_name}. Skipping.')
                continue

            seen_projects = set()
            for val in value:
                if not val:
                    logger.info(f'Empty EC project value for {nren_name}.')
                    continue

                if not isinstance(val, str):
                    logger.info(f'Unexpected EC project value for {nren_name}: {val!r}. Skipping.')
                    continue

                # strip html entities/NBSP from val
                project = html.unescape(val).replace('\xa0', ' ')

                # some answers include contract numbers, which we don't want here
                project = project.split('(contract n')[0].strip()

                # nothing left after cleanup, or the same project listed twice in one answer
                if not project or project in seen_projects:
                    continue
                seen_projects.add(project)

                ec_project = model.ECProject(
                    nren=nren_dict[nren_name],
                    nren_id=nren_dict[nren_name].id,
                    year=2022,
                    project=project
                )
                db.session.add(ec_project)
        db.session.commit()
    
    
    def transfer_policies(nren_dict):
        """
        Copy the policy answers for the 2018-2022 surveys into ``model.Policy``.

        Answers are strings that should be urls, but sometimes there's other stuff
        like email addresses or random text. Only the first whitespace-separated token
        of each answer is kept. Tokens containing neither '.' nor '@' are discarded, as
        are empty answers.

        :param nren_dict: maps upper-cased NREN abbreviations to NREN objects
        """
        # survey question id per policy, per survey year
        policy_questions = {
            'strategy':        {2022: 16469, 2021: 16064, 2020: 15720, 2019: 15305, 2018: 14910},
            'environment':     {2022: 16471, 2021: 16066, 2020: 15722, 2019: 15307, 2018: 14912},
            'equality':        {2022: 16473, 2021: 16378},
            'connectivity':    {2022: 16475, 2021: 16068, 2020: 15724, 2019: 15309, 2018: 14914},
            'acceptable_use':  {2022: 16477, 2021: 16070, 2020: 15726, 2019: 15311, 2018: 14916},
            'privacy':         {2022: 16479, 2021: 16072, 2020: 15728, 2019: 15575},
            'data_protection': {2022: 16481, 2021: 16074, 2020: 15730, 2019: 15577}
        }

        data = {}
        for year in [2018, 2019, 2020, 2021, 2022]:
            policy_questions_year = {key: years[year] for key, years in policy_questions.items() if year in years}
            for question_key, question_id in policy_questions_year.items():
                rows = query_question_id(question_id, year)
                for row in rows:
                    nren_name = row[0].upper()
                    _value = row[1]

                    if nren_name not in nren_dict:
                        logger.warning(f'{nren_name} unknown. Skipping.')
                        continue

                    tokens = _value.split()
                    if not tokens:
                        # empty or whitespace-only answer: there is no first token to keep
                        continue

                    value = tokens[0].strip('"')

                    if not value:
                        # don't warn on empty answers, just skip
                        continue

                    if value.upper() == 'N.A.' or ('.' not in value and '@' not in value):
                        # this test is a bit silly but does seem to filter out all the nonsense responses
                        logger.warning(f'"{value}" does not look like an email address or link. Skipping.')
                        continue

                    if _value not in [f'"{value}"', value]:
                        logger.info(f'Cleaned policy answer: "{_value}" became "{value}"')

                    # initialize on first use, so we don't add data for nrens with no answers
                    data.setdefault((nren_name, year), dict.fromkeys(policy_questions, ''))
                    data[(nren_name, year)][question_key] = value

        for (nren_name, year), nren_info in data.items():
            policy_data = model.Policy(
                nren=nren_dict[nren_name],
                nren_id=nren_dict[nren_name].id,
                year=year,
                strategic_plan=nren_info['strategy'],
                environmental=nren_info['environment'],
                equal_opportunity=nren_info['equality'],
                connectivity=nren_info['connectivity'],
                acceptable_use=nren_info['acceptable_use'],
                privacy_notice=nren_info['privacy'],
                data_protection=nren_info['data_protection'],
            )
            db.session.merge(policy_data)
        db.session.commit()
    
    
    def _cli(config, app):
        """
        Run every transfer step, in order, inside ``app``'s application context.

        ``config`` is not used by this function.
        """
        transfer_steps = (
            transfer_budget,
            transfer_funding_sources,
            transfer_staff_data,
            transfer_nren_parent_org,
            transfer_nren_sub_org,
            transfer_charging_structure,
            transfer_ec_projects,
            transfer_policies,
            transfer_institutions_urls,
        )
        with app.app_context():
            nren_dict = helpers.get_uppercase_nren_dict()
            for transfer_step in transfer_steps:
                transfer_step(nren_dict)
    
    
    @click.command()
    @click.option('--config', type=click.STRING, default='config.json')
    def cli(config):
        """
        Publish the survey data to compendium-v2 using the config file at CONFIG.

        The value of SURVEY_DATABASE_URI is registered as the survey database bind
        before the app is created.
        """
        # close the config file as soon as it has been parsed
        with open(config, 'r') as config_file:
            app_config = load(config_file)

        app_config['SQLALCHEMY_BINDS'] = {survey_model.SURVEY_DB_BIND: app_config['SURVEY_DATABASE_URI']}

        app = compendium_v2._create_app_with_db(app_config)
        _cli(app_config, app)


    if __name__ == "__main__":
        cli()