Skip to content
Snippets Groups Projects
Unverified Commit 560a88b6 authored by Adeel Ahmad's avatar Adeel Ahmad
Browse files

Create decorator to allow per route authorisation

parent 322fa06f
No related branches found
No related tags found
1 merge request!50Dboard3 1142/token auth
......@@ -3,7 +3,7 @@ automatically invoked app factory
"""
import logging
import os
from flask import g, Flask, request, jsonify
from flask import Flask
from flask_cors import CORS
from inventory_provider import environment
......@@ -53,25 +53,7 @@ def create_app(setup_logging=True):
@app.before_request
@auth.login_required
def secure_before_request():
"""Enforces authentication for all routes"""
client = g.get("auth_service")
if not client:
# This allows clients to access any resource without providing an API key
# TODO: Only for testing, should be removed in Production
return
# return jsonify({"error": "Unauthorized"}), 403
CLIENT_PERMISSIONS = {
"serviceA": ["msr"],
"serviceB": ["testing"],
}
allowed_routes = CLIENT_PERMISSIONS.get(client, [])
route = request.path.strip("/").split("/")[0]
if route not in allowed_routes:
return jsonify({"error": "Forbidden"}), 403
pass
# IMS based routes
......
from flask import Blueprint, current_app, g
from flask import current_app, g, jsonify
from flask_httpauth import HTTPTokenAuth
from functools import wraps
auth = HTTPTokenAuth(scheme="ApiKey")
......@@ -8,11 +9,34 @@ def verify_api_key(api_key):
config = current_app.config["INVENTORY_PROVIDER_CONFIG"]
# This is to enable anonymous access for testing.
if not api_key:
return "test"
g.auth_client = "anonymous"
return "anonymous"
for service, details in config['api-keys'].items():
for client, details in config['api-keys'].items():
if details.get('api-key') == api_key:
g.auth_service = service
return service
g.auth_client = client
return client
return None
def authorize(*, allowed_clients):
"""Decorator to restrict route access to specific clients."""
if not isinstance(allowed_clients, (list, tuple)):
allowed_clients = [allowed_clients] # Convert single client to list
def decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
client = g.get("auth_client")
if not client:
return jsonify({"error": "Unauthorized"}), 403
if client not in allowed_clients:
# Anonymous clients are allowed to access any resource without providing an API key
# TODO: Only for testing, should be removed in Production
if client != "anonymous":
return jsonify({"error": "Forbidden"}), 403
return f(*args, **kwargs)
return wrapped
return decorator
\ No newline at end of file
......@@ -118,6 +118,7 @@ from inventory_provider.routes.common import _ignore_cache_or_retrieve, \
ims_equipment_to_hostname
from inventory_provider.routes.poller import get_services
from inventory_provider.tasks import common as tasks_common
from inventory_provider.auth import authorize
routes = Blueprint('msr-query-routes', __name__)
logger = logging.getLogger(__name__)
......@@ -1447,6 +1448,7 @@ def _asn_peers(asn, group, instance):
@routes.route('/asn-peers', methods=['GET'], defaults={'asn': None})
@routes.route('/asn-peers/<int:asn>', methods=['GET'])
@common.require_accepts_json
@authorize(allowed_clients="reporting")
def asn_peers_get(asn):
"""
cf. doc for _asn_peers
......
......@@ -12,6 +12,7 @@ from inventory_provider import juniper
from inventory_provider.routes import common
from inventory_provider.tasks import worker
from inventory_provider.tasks import common as worker_common
from inventory_provider.auth import authorize
routes = Blueprint("inventory-data-testing-support-routes", __name__)
......@@ -110,6 +111,7 @@ def routers_from_config_dir():
@routes.route("latchdb", methods=['GET'])
@authorize(allowed_clients=("brian", "dashboard"))
def latch_db():
config = current_app.config["INVENTORY_PROVIDER_CONFIG"]
worker_common.latch_db(config)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment