Skip to content
Snippets Groups Projects
Commit 53123d2d authored by JORGE SASIAIN's avatar JORGE SASIAIN
Browse files

NAT-152: fix all tox errors except lines too long

parent e5b12bd6
No related branches found
No related tags found
1 merge request!9Ipam service
import ipaddress
import json
import random
import requests
from enum import Enum
from pydantic import BaseSettings
......@@ -37,9 +36,10 @@ class HostAddresses(BaseSettings):
class IPAMErrors(Enum):
# HTTP error code, match in error message
# HTTP error code, match in error message
CONTAINER_FULL = 400, "Can not find requested number of networks"
# TODO: remove this!
# lab infoblox cert is not valid for the ipv4 address
# ... disable warnings for now
......@@ -49,10 +49,12 @@ requests.packages.urllib3.disable_warnings()
def _match_error_code(response, error_code):
return response.status_code == error_code.value[0] and error_code.value[1] in response.text
def _wapi(infoblox_params: settings.InfoBloxParams):
return (f'https://{infoblox_params.host}'
f'/wapi/{infoblox_params.wapi_version}')
def _ip_addr_version(addr):
ip_version = None
ip_addr = ipaddress.ip_address(addr)
......@@ -63,6 +65,7 @@ def _ip_addr_version(addr):
assert ip_version in [4, 6]
return ip_version
def _ip_network_version(network):
ip_version = None
ip_network = ipaddress.ip_network(network)
......@@ -73,6 +76,7 @@ def _ip_network_version(network):
assert ip_version in [4, 6]
return ip_version
def _find_networks(network_container=None, network=None, ip_version=4):
"""
If network_container is not None, find all networks within the specified container.
......@@ -86,9 +90,9 @@ def _find_networks(network_container=None, network=None, ip_version=4):
endpoint = 'network' if ip_version == 4 else 'ipv6network'
params = None
if network_container:
params={'network_container': network_container}
params = {'network_container': network_container}
elif network:
params={'network': network}
params = {'network': network}
r = requests.get(
f'{_wapi(infoblox_params)}/{endpoint}',
params=params,
......@@ -99,6 +103,7 @@ def _find_networks(network_container=None, network=None, ip_version=4):
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
return r.json()
def _get_network_capacity(network=None):
"""
Get utilization of a IPv4 network in a fraction of 1000.
......@@ -109,7 +114,7 @@ def _get_network_capacity(network=None):
ip_version = _ip_network_version(network)
assert ip_version == 4, "Utilization is only available for IPv4 networks."
params={'network': network, '_return_fields': 'network,total_hosts,utilization'}
params = {'network': network, '_return_fields': 'network,total_hosts,utilization'}
r = requests.get(
f'{_wapi(infoblox_params)}/network',
......@@ -141,7 +146,7 @@ def _allocate_network(infoblox_params: settings.InfoBloxParams, network_params:
"_object_parameters": {
"network": str(network_params.containers[0])
},
"_result_field": "networks", # only return in the response the allocated network, not all available
"_result_field": "networks", # only return in the response the allocated network, not all available
}
}
......@@ -167,11 +172,12 @@ def _allocate_network(infoblox_params: settings.InfoBloxParams, network_params:
assert 'network' in r.json()
allocated_network = r.json()['network']
if ip_version==4:
if ip_version == 4:
return V4ServiceNetwork(v4=allocated_network)
else:
return V6ServiceNetwork(v6=allocated_network)
def allocate_service_ipv4_network(service_type) -> V4ServiceNetwork:
"""
Allocate IPv4 network within the container of the specified service type.
......@@ -182,6 +188,7 @@ def allocate_service_ipv4_network(service_type) -> V4ServiceNetwork:
assert hasattr(ipam_params, service_type) and service_type != 'INFOBLOX', "Invalid service type."
return _allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4)
def allocate_service_ipv6_network(service_type) -> V6ServiceNetwork:
"""
Allocate IPv6 network within the container of the specified service type.
......@@ -192,6 +199,7 @@ def allocate_service_ipv6_network(service_type) -> V6ServiceNetwork:
assert hasattr(ipam_params, service_type) and service_type != 'INFOBLOX', "Invalid service type."
return _allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V6, 6)
def _find_next_available_ip(infoblox_params, network_ref):
r = requests.post(
f'{_wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1',
......@@ -205,6 +213,7 @@ def _find_next_available_ip(infoblox_params, network_ref):
assert len(received_ip) == 1
return received_ip[0]
def _allocate_host(hostname=None, addr=None, network=None) -> Union[V4HostAddress, V6HostAddress]:
"""
If network is not None, allocate host in that network.
......@@ -249,20 +258,13 @@ def _allocate_host(hostname=None, addr=None, network=None) -> Union[V4HostAddres
assert isinstance(r.json(), str)
assert r.json().startswith("record:host/")
a_record_payload = {
"ipv4addr": addr,
"name": hostname,
"view": "default"
}
aaaa_record_payload = {
"ipv6addr": addr,
dns_req_payload = {
f"ipv{ip_req_payload}addr": addr,
"name": hostname,
"view": "default"
}
endpoint = 'record:a' if ip_version == 4 else 'record:aaaa'
dns_req_payload = a_record_payload if ip_version == 4 else aaaa_record_payload
r = requests.post(
f'{_wapi(infoblox_params)}/{endpoint}',
......@@ -279,6 +281,7 @@ def _allocate_host(hostname=None, addr=None, network=None) -> Union[V4HostAddres
else:
return V6HostAddress(v6=addr)
def allocate_service_host(hostname=None, service_type=None, service_networks: ServiceNetworks = None, host_addresses: HostAddresses = None) -> HostAddresses:
"""
Allocate host with both IPv4 and IPv6 address (and respective DNS records).
......@@ -323,7 +326,6 @@ def allocate_service_host(hostname=None, service_type=None, service_networks: Se
assert any(addr in ipv4_container for ipv4_container in ipv4_containers)
v4_host = _allocate_host(hostname=hostname+domain_name, addr=str(addr))
# IPv6
if not service_networks and not host_addresses:
# ipv6 does not support capacity fetching (not even the GUI displays it).
......@@ -342,12 +344,14 @@ def allocate_service_host(hostname=None, service_type=None, service_networks: Se
assert any(addr in ipv6_container for ipv6_container in ipv6_containers)
v6_host = _allocate_host(hostname=hostname+domain_name, addr=str(addr))
return HostAddresses(v4=v4_host.v4,v6=v6_host.v6)
return HostAddresses(v4=v4_host.v4, v6=v6_host.v6)
"""
Below methods are not used for supported outside calls
"""
def _find_containers(network=None, ip_version=4):
"""
If network is not None, find that container.
......@@ -367,6 +371,7 @@ def _find_containers(network=None, ip_version=4):
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
return r.json()
def _delete_network(network) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
"""
Delete IPv4 or IPv6 network by CIDR.
......@@ -398,6 +403,7 @@ def _delete_network(network) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
else:
return V6ServiceNetwork(v6=network_address)
def _delete_host_by_ip(addr) -> Union[V4HostAddress, V6HostAddress]:
"""
Delete IPv4 or IPv6 host by its address.
......@@ -455,6 +461,7 @@ def _delete_host_by_ip(addr) -> Union[V4HostAddress, V6HostAddress]:
else:
return V6HostAddress(v6=addr)
if __name__ == '__main__':
while True:
print("1. Find all containers")
......
import ipaddress
from pydantic import BaseSettings
from gso import settings
import _ipam
......@@ -63,4 +62,4 @@ if __name__ == '__main__':
# h1-h2 trunk
trunk12_service_networks = new_service_networks('TRUNK')
new_service_host(hostname_A, 'TRUNK', trunk12_service_networks)
new_service_host(hostname_B, 'TRUNK', trunk12_service_networks)
\ No newline at end of file
new_service_host(hostname_B, 'TRUNK', trunk12_service_networks)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment