Skip to content
Snippets Groups Projects
Commit 962dce77 authored by Erik Reid's avatar Erik Reid
Browse files

dummy resource_manager api & tests

parent aee4f076
Branches
Tags
No related merge requests found
# mypy: ignore-errors
import requests
from typing import List
from enum import Enum, auto
from gso import settings
# TODO
# - fill in the implementations
# - consider the additional api methods
# - decided what to do with various error conditions (currently assertions)
def import_new_router(router_name: str, oss_params=settings.OSSParams) -> None:
r = requests.post(
f"{oss_params.RESOURCE_MANAGER_API_PREFIX}" f"/api/interfaces/initialize-router/{router_name}", timeout=10000
)
r.raise_for_status()
class InterfaceAllocationState(Enum):
AVAILABLE = auto()
RESERVED = auto()
ALLOCATED = auto()
def next_lag(router_name: str, oss_params=settings.OSSParams) -> dict:
r = requests.post(
f"{oss_params.RESOURCE_MANAGER_API_PREFIX}" f"/api/interfaces/next-lag/{router_name}", timeout=10000
)
r.raise_for_status()
response = r.json()
return response["name"]
def _dummy_router_interfaces():
return {
'lags': [],
'physical': [
{
'name': f'ifc-{x}',
'state': InterfaceAllocationState.AVAILABLE
} for x in range(250)]
}
def next_physical(router_name: str, lag_name: str, oss_params=settings.OSSParams) -> dict:
# TODO: speed needed (if first interface)
r = requests.post(
f"{oss_params.RESOURCE_MANAGER_API_PREFIX}" f"/api/interfaces/next-physical/{router_name}/{lag_name}",
timeout=10000,
)
r.raise_for_status()
response = r.json()
return response["name"]
_DUMMY_INVENTORY = {
'fqdn-a': _dummy_router_interfaces(),
'fqdn-b': _dummy_router_interfaces(),
'fqdn-c': _dummy_router_interfaces(),
'fqdn-d': _dummy_router_interfaces()
}
def import_new_router(
new_router_fqdn: str, oss_params=settings.OSSParams):
# TODO: this is a dummy implementation
# TODO: specifiy if this should be an error (and if now, what it means)
assert new_router_fqdn not in _DUMMY_INVENTORY
_DUMMY_INVENTORY[new_router_fqdn] = _dummy_router_interfaces()
def next_lag(router_fqdn: str, oss_params=settings.OSSParams) -> str:
# TODO: this is a dummy implementation
assert router_fqdn in _DUMMY_INVENTORY
lag_idx = 0
while True:
lag_name = f'ae-{lag_idx}'
if lag_name not in _DUMMY_INVENTORY[router_fqdn]['lags']:
_DUMMY_INVENTORY[router_fqdn]['lags'].append(lag_name)
return lag_name
lag_idx += 1
def available_physical_interfaces(
router_fqdn: str, oss_params=settings.OSSParams) -> List[str]:
# TODO: this is a dummy implementation
assert router_fqdn in _DUMMY_INVENTORY
return [
ifc['name'] for ifc in _DUMMY_INVENTORY[router_fqdn]['physical']
if ifc['state'] == InterfaceAllocationState.AVAILABLE
]
def _find_physical(router_fqdn: str, interface_name: str) -> dict:
assert router_fqdn in _DUMMY_INVENTORY
for ifc in _DUMMY_INVENTORY[router_fqdn]['physical']:
if ifc['name'] == interface_name:
return ifc
assert False, f'interface {interface_name} not found on {router_fqdn}'
def reserve_physical_interface(
router_fqdn: str,
interface_name: str,
oss_params=settings.OSSParams):
# TODO: this is a dummy implementation
ifc = _find_physical(router_fqdn, interface_name)
assert ifc['state'] == InterfaceAllocationState.AVAILABLE, \
f'interface {router_fqdn}:{interface_name} is not available'
ifc['state'] = InterfaceAllocationState.RESERVED
def allocate_physical_interface(
router_fqdn: str,
interface_name: str,
oss_params=settings.OSSParams):
# TODO: this is a dummy implementation
ifc = _find_physical(router_fqdn, interface_name)
# TODO: is there a use case for moving
# directly from AVAILABLE to ALLOCATED?
assert ifc['state'] == InterfaceAllocationState.RESERVED, \
f'interface {router_fqdn}:{interface_name} is not reserved'
ifc['state'] = InterfaceAllocationState.RESERVED
def free_physical_interface(
router_fqdn: str,
interface_name: str,
oss_params=settings.OSSParams):
# TODO: this is a dummy implementation
ifc = _find_physical(router_fqdn, interface_name)
# TODO: is this really an error that should be handled?
# ... or is it ok to ignore this?
assert ifc['state'] != InterfaceAllocationState.AVAILABLE, \
f'interface {router_fqdn}:{interface_name} is already available'
ifc['state'] = InterfaceAllocationState.AVAILABLE
def free_lag(
router_fqdn: str,
lag_name: str,
oss_params=settings.OSSParams):
# TODO: is this a use case that should be handled?
# e.g. who keeps track of the bundled physical interfaces?
pass
def remove_router(
router_fqdn: str, oss_params=settings.OSSParams):
# TODO: is this a use case that should be handled?
pass
def all_lags(
router_fqdn: str, oss_params=settings.OSSParams) -> List[str]:
# TODO: is this a use case that should be handled?
assert router_fqdn in _DUMMY_INVENTORY
return _DUMMY_INVENTORY[router_fqdn]['lags']
import random
import string
from gso.services import resource_manager
def _random_string(
n=None,
letters=string.ascii_letters + string.digits + string.punctuation):
if not n:
n = random.randint(1, 20)
return ''.join(random.choices(letters, k=n))
def test_new_router():
router_name = _random_string(10)
assert router_name not in resource_manager._DUMMY_INVENTORY
resource_manager.import_new_router(new_router_fqdn=router_name)
assert router_name in resource_manager._DUMMY_INVENTORY
def test_new_lag():
router_name = list(resource_manager._DUMMY_INVENTORY.keys())[0]
new_lags = {
resource_manager.next_lag(router_fqdn=router_name)
for _ in range(10)
}
assert len(new_lags) == 10
assert new_lags <= set(
resource_manager._DUMMY_INVENTORY[router_name]['lags'])
def test_physical_allocation_lifecycle_happy():
router_name = list(resource_manager._DUMMY_INVENTORY.keys())[0]
def _interfaces():
return resource_manager.available_physical_interfaces(
router_fqdn=router_name)
initial_available = _interfaces()
interface_name = initial_available[0]
resource_manager.reserve_physical_interface(
router_fqdn=router_name, interface_name=interface_name)
current_available = _interfaces()
assert interface_name not in current_available
resource_manager.allocate_physical_interface(
router_fqdn=router_name, interface_name=interface_name)
resource_manager.free_physical_interface(
router_fqdn=router_name, interface_name=interface_name)
current_available = _interfaces()
assert interface_name in current_available
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment