Skip to content
Snippets Groups Projects
Select Git revision
  • e8fca72f5ee631febefc0c479d00577ec5898d1c
  • develop default protected
  • feature/nat-1829-fix-scoped-commercial-peers
  • release/4.28
  • validate_l2_circuit
  • master protected
  • feature/NAT-1797-netbox-migration
  • authorship-fix-from-develop
  • 1048-service-config-backfilling
  • feature/nat-1211-edgeport-lacp-xmit
  • fix/nat-1120-sdp-validation
  • NAT-1154-import-edge-port-update
  • fix/l3-imports
  • feature/10GGBS-NAT-980
  • fix/NAT-1009/fix-redeploy-base-config-if-there-is-a-vprn
  • 4.27
  • 4.26
  • 4.25
  • 4.24
  • 4.23
  • 4.22
  • 4.21
  • 4.20
  • 4.19
  • 4.18
  • 4.17
  • 4.16
  • 4.15
  • 4.14
  • 4.13
  • 4.12
  • 4.11
  • 4.10
  • 4.8
  • 4.5
35 results

test-docs.sh

Blame
  • config.py 3.14 KiB
    import json
    import logging.config
    import pathlib
    
    import jsonschema
    
    logger = logging.getLogger(__name__)
    
    ERROR_REPORT_CONFIG_SCHEMA = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "definitions": {
            "email-params": {
                "type": "object",
                "properties": {
                    "from": {"type": "string"},
                    "reply_to": {"type": "string"},
                    "to": {"$ref": "#/definitions/string-or-array"},
                    "cc": {"$ref": "#/definitions/string-or-array"},
                    "hostname": {"type": "string"},
                    "port": {"type": "integer"},
                    "username": {"type": "string"},
                    "password": {"type": "string"},
                    "starttls": {"type": "boolean"},
                },
                "required": [
                    "from",
                    "to",
                    "hostname",
                ],
                "additionalProperties": False,
            },
            "string-or-array": {
                "oneOf": [
                    {"type": "string"},
                    {"type": "array", "items": {"type": "string"}, "minItems": 1},
                ]
            },
            "influx-db-params": {
                "type": "object",
                "properties": {
                    "ssl": {"type": "boolean"},
                    "hostname": {"type": "string"},
                    "port": {"type": "integer"},
                    "username": {"type": "string"},
                    "password": {"type": "string"},
                    "database": {"type": "string"},
                    "measurement": {"type": "string"},
                },
                "required": [
                    # ssl, port are optional
                    "hostname",
                    "username",
                    "password",
                    "database",
                    "measurement",
                ],
                "additionalProperties": False,
            },
        },
        "type": "object",
        "properties": {
            "email": {"$ref": "#/definitions/email-params"},
            "inventory": {
                "type": "array",
                "items": {"type": "string", "format": "uri"},
                "minItems": 1,
            },
            "influx": {"$ref": "#/definitions/influx-db-params"},
            "exclude-interfaces": {
                "type": "array",
                "items": {
                    "type": "array",
                    "prefixItems": [
                        {"type": "string"},
                        {"type": "string"},
                    ],
                },
            },
        },
        "required": ["email", "influx"],
        "additionalProperties": False,
    }
    
    
    def load(config_file: pathlib.Path):
        """
        loads, validates and returns configuration parameters
    
        :param config_file: filename (file-like object, opened for reading)
        :return: a dict containing configuration parameters
        :raises: json.JSONDecodeError, jsonschema.ValidationError
        """
    
        config = json.loads(config_file.read_text())
        jsonschema.validate(config, ERROR_REPORT_CONFIG_SCHEMA)
    
        # convert str addresses into list of str
        if isinstance(config["email"]["to"], str):
            config["email"]["to"] = [config["email"]["to"]]
        if isinstance(config["email"].get("cc"), str):
            config["email"]["cc"] = [config["email"]["cc"]]
        return config