diff --git a/inventory_provider/__init__.py b/inventory_provider/__init__.py index 41575bae2d9eca554e522347bd9f2d8c85db71a8..915b79a2563c4a69925ea7b9bb679e323c204a11 100644 --- a/inventory_provider/__init__.py +++ b/inventory_provider/__init__.py @@ -3,7 +3,7 @@ automatically invoked app factory """ import logging import os -from flask import g, Flask, request, jsonify +from flask import Flask from flask_cors import CORS from inventory_provider import environment @@ -53,25 +53,7 @@ def create_app(setup_logging=True): @app.before_request @auth.login_required def secure_before_request(): - """Enforces authentication for all routes""" - client = g.get("auth_service") - - if not client: - # This allows clients to access any resource without providing an API key - # TODO: Only for testing, should be removed in Production - return - # return jsonify({"error": "Unauthorized"}), 403 - - CLIENT_PERMISSIONS = { - "serviceA": ["msr"], - "serviceB": ["testing"], - } - - allowed_routes = CLIENT_PERMISSIONS.get(client, []) - route = request.path.strip("/").split("/")[0] - - if route not in allowed_routes: - return jsonify({"error": "Forbidden"}), 403 + pass # IMS based routes diff --git a/inventory_provider/auth.py b/inventory_provider/auth.py index 466073e33c661ccf03fca9f5f60e9b2f8d8401f9..7ee9651e3b62d75b46f0942b1b33054d896cc5e4 100644 --- a/inventory_provider/auth.py +++ b/inventory_provider/auth.py @@ -1,5 +1,6 @@ -from flask import Blueprint, current_app, g +from flask import current_app, g, jsonify from flask_httpauth import HTTPTokenAuth +from functools import wraps auth = HTTPTokenAuth(scheme="ApiKey") @@ -8,11 +9,34 @@ def verify_api_key(api_key): config = current_app.config["INVENTORY_PROVIDER_CONFIG"] # This is to enable anonymous access for testing. if not api_key: - return "test" + g.auth_client = "anonymous" + return "anonymous" - for service, details in config['api-keys'].items(): + for client, details in config['api-keys'].items(): if details.get('api-key') == api_key: - g.auth_service = service - return service + g.auth_client = client + return client return None +def authorize(*, allowed_clients): + """Decorator to restrict route access to specific clients.""" + if not isinstance(allowed_clients, (list, tuple)): + allowed_clients = [allowed_clients] # Convert single client to list + def decorator(f): + @wraps(f) + def wrapped(*args, **kwargs): + client = g.get("auth_client") + + if not client: + return jsonify({"error": "Unauthorized"}), 403 + + if client not in allowed_clients: + # Anonymous clients are allowed to access any resource without providing an API key + # TODO: Only for testing, should be removed in Production + if client != "anonymous": + return jsonify({"error": "Forbidden"}), 403 + + return f(*args, **kwargs) + + return wrapped + return decorator \ No newline at end of file diff --git a/inventory_provider/routes/msr.py b/inventory_provider/routes/msr.py index ac2aa9b98afff35c6ea537e8f8ccad1853769573..fe40bca8378ee65031179f1f0b176244ee0f1088 100644 --- a/inventory_provider/routes/msr.py +++ b/inventory_provider/routes/msr.py @@ -118,6 +118,7 @@ from inventory_provider.routes.common import _ignore_cache_or_retrieve, \ ims_equipment_to_hostname from inventory_provider.routes.poller import get_services from inventory_provider.tasks import common as tasks_common +from inventory_provider.auth import authorize routes = Blueprint('msr-query-routes', __name__) logger = logging.getLogger(__name__) @@ -1447,6 +1448,7 @@ def _asn_peers(asn, group, instance): @routes.route('/asn-peers', methods=['GET'], defaults={'asn': None}) @routes.route('/asn-peers/<int:asn>', methods=['GET']) @common.require_accepts_json +@authorize(allowed_clients="reporting") def asn_peers_get(asn): """ cf. doc for _asn_peers diff --git a/inventory_provider/routes/testing.py b/inventory_provider/routes/testing.py index 5fcb7cce47a48eaa86ac4794d098448929ff69e4..c43dccb4b145876f436e6a4a8d05eb7e663656ba 100644 --- a/inventory_provider/routes/testing.py +++ b/inventory_provider/routes/testing.py @@ -12,6 +12,7 @@ from inventory_provider import juniper from inventory_provider.routes import common from inventory_provider.tasks import worker from inventory_provider.tasks import common as worker_common +from inventory_provider.auth import authorize routes = Blueprint("inventory-data-testing-support-routes", __name__) @@ -110,6 +111,7 @@ def routers_from_config_dir(): @routes.route("latchdb", methods=['GET']) +@authorize(allowed_clients=("brian", "dashboard")) def latch_db(): config = current_app.config["INVENTORY_PROVIDER_CONFIG"] worker_common.latch_db(config)