Skip to content
Snippets Groups Projects
Commit 96b3f7be authored by JORGE SASIAIN's avatar JORGE SASIAIN
Browse files

NAT-212/213: tox

parent 80f02509
No related branches found
No related tags found
1 merge request!52Feature/nat 212 213
Pipeline #83763 failed
...@@ -138,11 +138,18 @@ def assert_network_in_service( ...@@ -138,11 +138,18 @@ def assert_network_in_service(
def find_networks(network_container: Optional[str] = "", network: Optional[str] = "", ip_version: int = 4): def find_networks(network_container: Optional[str] = "", network: Optional[str] = "", ip_version: int = 4):
"""If network_container is not None, find all networks within the specified container. """Get all networks optinally filtering by a container or by a network.
Otherwise, if network is not None, find the specified network.
Otherwise find all networks. Args:
A list of all found networks is returned (an HTTP 200 code ----
may be returned with an empty list). network_container (str, optional): container to filter by
network (str, optional): network to filter by
ip_version (int): 4 or 6
Returns:
-------
(list) all found networks mathing the args, which may be empty.
""" """
assert ip_version in [4, 6] assert ip_version in [4, 6]
oss = settings.load_oss_params() oss = settings.load_oss_params()
...@@ -158,7 +165,7 @@ def find_networks(network_container: Optional[str] = "", network: Optional[str] ...@@ -158,7 +165,7 @@ def find_networks(network_container: Optional[str] = "", network: Optional[str]
f"{wapi(infoblox_params)}/{endpoint}", f"{wapi(infoblox_params)}/{endpoint}",
params=params, params=params,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT timeout=REQUESTS_TIMEOUT
) )
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
...@@ -205,7 +212,7 @@ def allocate_network_inner( ...@@ -205,7 +212,7 @@ def allocate_network_inner(
json=req_payload, json=req_payload,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
headers={"content-type": "application/json"}, headers={"content-type": "application/json"},
verify=False, verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT timeout=REQUESTS_TIMEOUT
) )
if not match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL): if not match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL):
...@@ -226,7 +233,19 @@ def allocate_network_inner( ...@@ -226,7 +233,19 @@ def allocate_network_inner(
def allocate_ipv4_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V4ServiceNetwork: def allocate_ipv4_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V4ServiceNetwork:
"""Allocate IPv4 network within the container of the specified service type.""" """Allocate IPv4 network within the container of the specified service type.
Args:
----
service_type (str): the name of the service type (e.g. "TRUNK")
comment (str, optional): a custom comment to write in the comment field in IPAM
extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"})
Returns:
-------
(V4ServiceNetwork): the allocated network
"""
if extattrs is None: if extattrs is None:
extattrs = {} extattrs = {}
oss = settings.load_oss_params() oss = settings.load_oss_params()
...@@ -237,7 +256,19 @@ def allocate_ipv4_network(service_type: str = "", comment: Optional[str] = "", e ...@@ -237,7 +256,19 @@ def allocate_ipv4_network(service_type: str = "", comment: Optional[str] = "", e
def allocate_ipv6_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V6ServiceNetwork: def allocate_ipv6_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V6ServiceNetwork:
"""Allocate IPv6 network within the container of the specified service type.""" """Allocate IPv6 network within the container of the specified service type.
Args:
----
service_type (str): the name of the service type (e.g. "TRUNK")
comment (str, optional): a custom comment to write in the comment field in IPAM
extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"})
Returns:
-------
(V4ServiceNetwork): the allocated network
"""
if extattrs is None: if extattrs is None:
extattrs = {} extattrs = {}
oss = settings.load_oss_params() oss = settings.load_oss_params()
...@@ -266,13 +297,21 @@ def allocate_networks( ...@@ -266,13 +297,21 @@ def allocate_networks(
def find_next_available_ip(infoblox_params, network_ref: str = ""): def find_next_available_ip(infoblox_params, network_ref: str = ""):
"""Find the next available IP address from a network given its ref. """Find the next available IP address from a network given its ref.
Returns "NETWORK_FULL" if there's no space in the network.
Otherwise returns the next available IP address in the network. Args:
----
infoblox_params (settings.InfoBloxParams): infoblox params
network_ref (str): the network to find the next available ip, in InfoBlox reference format
Returns:
-------
(str): next available ip in the network, or "NETWORK_FULL" if there's no space in the network
""" """
r = requests.post( r = requests.post(
f"{wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1", f"{wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1",
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT timeout=REQUESTS_TIMEOUT
) )
...@@ -286,7 +325,7 @@ def find_next_available_ip(infoblox_params, network_ref: str = ""): ...@@ -286,7 +325,7 @@ def find_next_available_ip(infoblox_params, network_ref: str = ""):
return received_ip[0] return received_ip[0]
def allocate_host_inner( def allocate_host_inner( # noqa: max-complexity=15
hostname: str = "", hostname: str = "",
addrs: Optional[Tuple] = None, addrs: Optional[Tuple] = None,
networks: Optional[Tuple] = None, networks: Optional[Tuple] = None,
...@@ -294,14 +333,7 @@ def allocate_host_inner( ...@@ -294,14 +333,7 @@ def allocate_host_inner(
dns_view: Optional[str] = "default", dns_view: Optional[str] = "default",
extattrs: Optional[dict] = None extattrs: Optional[dict] = None
) -> Union[HostAddresses, str]: ) -> Union[HostAddresses, str]:
"""If networks is not None, allocate host in those networks.
Otherwise if addrs is not None, allocate host with those addresses.
hostname parameter must be full name including domain name.
Return "IPV4_NETWORK_FULL" or "IPV6_NETWORK_FULL"
if couldn't allocate host due to requested network being full.
Return "IPV4_NETWORK_NOT_FOUND" or "IPV6_NETWORK_NOT_FOUND"
if couldn't allocate host due to requested network not existing.
"""
# TODO: should hostnames be unique # TODO: should hostnames be unique
# (i.e. fail if hostname already exists in this domain/service)? # (i.e. fail if hostname already exists in this domain/service)?
if cname_aliases is None: if cname_aliases is None:
...@@ -313,6 +345,7 @@ def allocate_host_inner( ...@@ -313,6 +345,7 @@ def allocate_host_inner(
assert oss.IPAM.INFOBLOX assert oss.IPAM.INFOBLOX
infoblox_params = oss.IPAM.INFOBLOX infoblox_params = oss.IPAM.INFOBLOX
# If networks is not None, allocate host in those networks.
if networks: if networks:
ipv4_network = networks[0] ipv4_network = networks[0]
ipv6_network = networks[1] ipv6_network = networks[1]
...@@ -320,6 +353,7 @@ def allocate_host_inner( ...@@ -320,6 +353,7 @@ def allocate_host_inner(
assert ip_network_version(ipv6_network) == 6 assert ip_network_version(ipv6_network) == 6
# Find the next available IP address in each network # Find the next available IP address in each network
# If requested error doesn't exist, return error
network_info = find_networks(network=ipv4_network, ip_version=4) network_info = find_networks(network=ipv4_network, ip_version=4)
if len(network_info) != 1: if len(network_info) != 1:
return "IPV4_NETWORK_NOT_FOUND" return "IPV4_NETWORK_NOT_FOUND"
...@@ -339,12 +373,14 @@ def allocate_host_inner( ...@@ -339,12 +373,14 @@ def allocate_host_inner(
if ipv6_addr == "NETWORK_FULL": if ipv6_addr == "NETWORK_FULL":
return "IPV6_NETWORK_FULL" return "IPV6_NETWORK_FULL"
# Otherwise if addrs is not None, allocate host with those addresses.
else: else:
ipv4_addr = addrs[0] ipv4_addr = addrs[0]
ipv6_addr = addrs[1] ipv6_addr = addrs[1]
assert ip_addr_version(ipv4_addr) == 4 assert ip_addr_version(ipv4_addr) == 4
assert ip_addr_version(ipv6_addr) == 6 assert ip_addr_version(ipv6_addr) == 6
# hostname parameter must be full name including domain name.
req_payload = { req_payload = {
"ipv4addrs": [{"ipv4addr": ipv4_addr}], "ipv4addrs": [{"ipv4addr": ipv4_addr}],
"ipv6addrs": [{"ipv6addr": ipv6_addr}], "ipv6addrs": [{"ipv6addr": ipv6_addr}],
...@@ -358,7 +394,7 @@ def allocate_host_inner( ...@@ -358,7 +394,7 @@ def allocate_host_inner(
f"{wapi(infoblox_params)}/record:host", f"{wapi(infoblox_params)}/record:host",
json=req_payload, json=req_payload,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT timeout=REQUESTS_TIMEOUT
) )
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
...@@ -374,7 +410,7 @@ def allocate_host_inner( ...@@ -374,7 +410,7 @@ def allocate_host_inner(
f"{wapi(infoblox_params)}/record:cname", f"{wapi(infoblox_params)}/record:cname",
json=cname_req_payload, json=cname_req_payload,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT timeout=REQUESTS_TIMEOUT
) )
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
...@@ -383,29 +419,29 @@ def allocate_host_inner( ...@@ -383,29 +419,29 @@ def allocate_host_inner(
return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), v6=ipaddress.ip_address(ipv6_addr)) return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), v6=ipaddress.ip_address(ipv6_addr))
def allocate_host( def allocate_host( # noqa: max-complexity=15
hostname: str = "", hostname: str = "",
service_type: str = "", service_type: str = "",
service_networks: Optional[ServiceNetworks] = None, service_networks: Optional[ServiceNetworks] = None,
host_addresses: Optional[ServiceNetworks] = None, host_addresses: Optional[HostAddresses] = None,
cname_aliases: Optional[list] = None, cname_aliases: Optional[list] = None,
extattrs: Optional[dict] = None, extattrs: Optional[dict] = None,
) -> HostAddresses: ) -> HostAddresses:
"""Allocate host record with both IPv4 and IPv6 address, and respective DNS """Allocate host record with both IPv4 and IPv6 address, and respective DNS A and AAAA records.
A and AAAA records.
- If service_networks is provided, and it's in a valid container, Args:
that one is used. ----
- If service_networks is not provided, and host_addresses is provided, hostname (str): hostname of the host (without domain name, which is taken from the service type)
those specific addresses are used. service_type (str): the name of the service type (e.g. "TRUNK")
- If neither is provided: service_networks (ServiceNetworks, optional): ipv4 and ipv6 network to allocate host, must be in a valid container for the service type
- If service has configured containers, new ipv4 and ipv6 networks are host_addresses (HostAddresses, optional): ipv4 and ipv6 addresses to allocate host (service_networks has precedence)
created and those are used. Note that in this case extattrs is for the cname_aliases (list, optional): to create cname records in addition to the host record
hosts and not for the networks. extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"})
- If service doesn't have configured containers and has configured
networks instead, the configured networks are used (they are filled up Returns:
in order of appearance in the configuration file). -------
The domain name is taken from the service type and appended to the (HostAddresses): ipv4 and ipv6 addresses of the allocated host
specified hostname.
""" """
if cname_aliases is None: if cname_aliases is None:
cname_aliases = [] cname_aliases = []
...@@ -432,6 +468,11 @@ def allocate_host( ...@@ -432,6 +468,11 @@ def allocate_host(
if cname_aliases: if cname_aliases:
cname_aliases = [alias + domain_name for alias in cname_aliases] cname_aliases = [alias + domain_name for alias in cname_aliases]
# When neither service_networks not host_addresses are provided:
# If service has configured containers, new ipv4 and ipv6 networks are created and those are used.
# Note that in this case extattrs is for the hosts and not for the networks.
# If service doesn't have configured containers and has configured networks instead, the configured
# networks are used (they are filled up in order of appearance in the configuration file).
if not service_networks and not host_addresses: if not service_networks and not host_addresses:
if oss_ipv4_containers and oss_ipv6_containers: if oss_ipv4_containers and oss_ipv6_containers:
# This service has configured containers. # This service has configured containers.
...@@ -555,7 +596,7 @@ def delete_network( ...@@ -555,7 +596,7 @@ def delete_network(
r = requests.delete( r = requests.delete(
f'{wapi(infoblox_params)}/{network_info[0]["_ref"]}', f'{wapi(infoblox_params)}/{network_info[0]["_ref"]}',
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT timeout=REQUESTS_TIMEOUT
) )
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
...@@ -576,6 +617,7 @@ def delete_host( ...@@ -576,6 +617,7 @@ def delete_host(
service_type: str = "" service_type: str = ""
) -> HostAddresses: ) -> HostAddresses:
"""Delete host record and associated CNAME records. """Delete host record and associated CNAME records.
All arguments passed to this function must match together a host record in All arguments passed to this function must match together a host record in
IPAM, and all CNAME records associated to it must also be passed exactly. IPAM, and all CNAME records associated to it must also be passed exactly.
""" """
...@@ -617,7 +659,7 @@ def delete_host( ...@@ -617,7 +659,7 @@ def delete_host(
"view": dns_view, "view": dns_view,
}, },
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT timeout=REQUESTS_TIMEOUT
) )
host_data = r.json() host_data = r.json()
...@@ -633,7 +675,7 @@ def delete_host( ...@@ -633,7 +675,7 @@ def delete_host(
"view": dns_view, "view": dns_view,
}, },
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT timeout=REQUESTS_TIMEOUT
) )
cname_data = r.json() cname_data = r.json()
...@@ -645,7 +687,7 @@ def delete_host( ...@@ -645,7 +687,7 @@ def delete_host(
r = requests.delete( r = requests.delete(
f"{wapi(infoblox_params)}/{host_ref}", f"{wapi(infoblox_params)}/{host_ref}",
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT timeout=REQUESTS_TIMEOUT
) )
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
...@@ -656,7 +698,7 @@ def delete_host( ...@@ -656,7 +698,7 @@ def delete_host(
r = requests.delete( r = requests.delete(
f"{wapi(infoblox_params)}/{cname_ref}", f"{wapi(infoblox_params)}/{cname_ref}",
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT timeout=REQUESTS_TIMEOUT
) )
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
...@@ -669,8 +711,8 @@ def validate_network( ...@@ -669,8 +711,8 @@ def validate_network(
network: ipaddress.ip_network = None network: ipaddress.ip_network = None
) -> Union[V4ServiceNetwork, V6ServiceNetwork]: ) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
"""Validate IPv4 or IPv6 network. """Validate IPv4 or IPv6 network.
Check if the specified network exist, and, if it does,
check if its comment field contains gso_subscription_id. Check if the specified network exist, and, if it does, check if its comment field contains gso_subscription_id.
Returns the network if validation successful. Returns the network if validation successful.
""" """
assert network, "No network specified to validate." assert network, "No network specified to validate."
...@@ -693,6 +735,7 @@ def validate_host( ...@@ -693,6 +735,7 @@ def validate_host(
service_type: str = "" service_type: str = ""
) -> HostAddresses: ) -> HostAddresses:
"""Validate host. """Validate host.
Check if all arguments passed to this function match together a host record in Check if all arguments passed to this function match together a host record in
IPAM, and all CNAME records associated to it also match exactly. IPAM, and all CNAME records associated to it also match exactly.
Returns the host if validation successful. Returns the host if validation successful.
...@@ -721,7 +764,7 @@ def validate_host( ...@@ -721,7 +764,7 @@ def validate_host(
"view": dns_view, "view": dns_view,
}, },
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT timeout=REQUESTS_TIMEOUT
) )
host_data = r.json() host_data = r.json()
...@@ -736,7 +779,7 @@ def validate_host( ...@@ -736,7 +779,7 @@ def validate_host(
"view": dns_view, "view": dns_view,
}, },
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT timeout=REQUESTS_TIMEOUT
) )
cname_data = r.json() cname_data = r.json()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment