Skip to content
Snippets Groups Projects
Commit f4fb59ad authored by JORGE SASIAIN's avatar JORGE SASIAIN
Browse files

NAT-212/213: Make tox (black) sotp failing

parent 5ed5cb0d
No related branches found
No related tags found
1 merge request!52Feature/nat 212 213
Pipeline #83771 failed
......@@ -84,10 +84,10 @@ def ip_network_version(network: str = ""):
def assert_host_in_service(
ipv4_addr: str = "",
ipv6_addr: str = "",
oss_ipv4_containers = None,
oss_ipv6_containers = None,
oss_ipv4_networks = None,
oss_ipv6_networks = None
oss_ipv4_containers=None,
oss_ipv6_containers=None,
oss_ipv4_networks=None,
oss_ipv6_networks=None,
):
# IPv4
if oss_ipv4_containers:
......@@ -113,10 +113,10 @@ def assert_host_in_service(
def assert_network_in_service(
ipv4_network: Optional[V4ServiceNetwork] = None,
ipv6_network: Optional[V6ServiceNetwork] = None,
oss_ipv4_containers = None,
oss_ipv6_containers = None,
oss_ipv4_networks = None,
oss_ipv6_networks = None
oss_ipv4_containers=None,
oss_ipv6_containers=None,
oss_ipv4_networks=None,
oss_ipv6_networks=None,
):
# IPv4
if ipv4_network:
......@@ -166,7 +166,7 @@ def find_networks(network_container: Optional[str] = "", network: Optional[str]
params=params,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
return r.json()
......@@ -213,7 +213,7 @@ def allocate_network_inner(
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
headers={"content-type": "application/json"},
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
if not match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL):
break
......@@ -232,7 +232,9 @@ def allocate_network_inner(
return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network))
def allocate_ipv4_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V4ServiceNetwork:
def allocate_ipv4_network(
service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None
) -> V4ServiceNetwork:
"""Allocate IPv4 network within the container of the specified service type.
Args:
......@@ -255,7 +257,9 @@ def allocate_ipv4_network(service_type: str = "", comment: Optional[str] = "", e
return allocate_network_inner(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4, comment, extattrs)
def allocate_ipv6_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V6ServiceNetwork:
def allocate_ipv6_network(
service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None
) -> V6ServiceNetwork:
"""Allocate IPv6 network within the container of the specified service type.
Args:
......@@ -279,19 +283,13 @@ def allocate_ipv6_network(service_type: str = "", comment: Optional[str] = "", e
def allocate_networks(
service_type: str = "",
comment: Optional[str] = "",
extattrs: Optional[dict] = None
service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None
) -> ServiceNetworks:
"""Allocate IPv4 and IPv6 network for the specified service type."""
if extattrs is None:
extattrs = {}
v4_service_network = allocate_ipv4_network(
service_type=service_type, comment=comment, extattrs=extattrs
)
v6_service_network = allocate_ipv6_network(
service_type=service_type, comment=comment, extattrs=extattrs
)
v4_service_network = allocate_ipv4_network(service_type=service_type, comment=comment, extattrs=extattrs)
v6_service_network = allocate_ipv6_network(service_type=service_type, comment=comment, extattrs=extattrs)
return ServiceNetworks(v4=v4_service_network.v4, v6=v6_service_network.v6)
......@@ -312,7 +310,7 @@ def find_next_available_ip(infoblox_params, network_ref: str = ""):
f"{wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1",
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
if match_error_code(response=r, error_code=IPAMErrors.NETWORK_FULL):
......@@ -331,9 +329,8 @@ def allocate_host_inner( # noqa: C901
networks: Optional[Tuple] = None,
cname_aliases: Optional[list] = None,
dns_view: Optional[str] = "default",
extattrs: Optional[dict] = None
extattrs: Optional[dict] = None,
) -> Union[HostAddresses, str]:
# TODO: should hostnames be unique
# (i.e. fail if hostname already exists in this domain/service)?
if cname_aliases is None:
......@@ -395,7 +392,7 @@ def allocate_host_inner( # noqa: C901
json=req_payload,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
assert isinstance(r.json(), str)
......@@ -411,7 +408,7 @@ def allocate_host_inner( # noqa: C901
json=cname_req_payload,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
assert r.json().startswith("record:cname/")
......@@ -433,7 +430,7 @@ def allocate_host( # noqa: C901
----
hostname (str): hostname of the host (without domain name, which is taken from the service type)
service_type (str): the name of the service type (e.g. "TRUNK")
service_networks (ServiceNetworks, optional): ipv4 and ipv6 network to allocate host, must be in a valid container for the service type
service_networks (ServiceNetworks, optional): ipv4 and ipv6 network to allocate host
host_addresses (HostAddresses, optional): ipv4 and ipv6 addresses to allocate host (service_networks has precedence)
cname_aliases (list, optional): to create cname records in addition to the host record
extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"})
......@@ -558,8 +555,7 @@ def allocate_host( # noqa: C901
def delete_network(
network: ipaddress.ip_network = None,
service_type: str = ""
network: ipaddress.ip_network = None, service_type: str = ""
) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
"""Delete IPv4 or IPv6 network by CIDR."""
oss = settings.load_oss_params()
......@@ -597,7 +593,7 @@ def delete_network(
f'{wapi(infoblox_params)}/{network_info[0]["_ref"]}',
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
......@@ -614,7 +610,7 @@ def delete_host(
hostname: str = "",
host_addresses: HostAddresses = None,
cname_aliases: Optional[list] = None,
service_type: str = ""
service_type: str = "",
) -> HostAddresses:
"""Delete host record and associated CNAME records.
......@@ -660,7 +656,7 @@ def delete_host(
},
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
host_data = r.json()
assert len(host_data) == 1, "Host to delete does not exist in IPAM."
......@@ -676,7 +672,7 @@ def delete_host(
},
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
cname_data = r.json()
provided_cnames = [item + domain_name for item in cname_aliases]
......@@ -688,7 +684,7 @@ def delete_host(
f"{wapi(infoblox_params)}/{host_ref}",
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
......@@ -699,7 +695,7 @@ def delete_host(
f"{wapi(infoblox_params)}/{cname_ref}",
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
......@@ -707,8 +703,7 @@ def delete_host(
def validate_network(
gso_subscription_id: str = "",
network: ipaddress.ip_network = None
gso_subscription_id: str = "", network: ipaddress.ip_network = None
) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
"""Validate IPv4 or IPv6 network.
......@@ -721,7 +716,9 @@ def validate_network(
network_info = find_networks(network=str(network), ip_version=ip_version)
assert len(network_info) == 1 and "_ref" in network_info[0], "Network to validate not found in IPAM."
assert "comment" in network_info[0], "Network to validate does not have comment in IPAM."
assert gso_subscription_id in network_info[0]["comment"], "GSO subscription ID does not match the one in the comment field of the IPAM network."
assert (
gso_subscription_id in network_info[0]["comment"]
), "GSO subscription ID does not match the one in the comment field of the IPAM network."
if ip_version == 4:
return V4ServiceNetwork(v4=network)
......@@ -732,7 +729,7 @@ def validate_host(
hostname: str = "",
host_addresses: HostAddresses = None,
cname_aliases: Optional[list] = None,
service_type: str = ""
service_type: str = "",
) -> HostAddresses:
"""Validate host.
......@@ -765,7 +762,7 @@ def validate_host(
},
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
host_data = r.json()
assert len(host_data) == 1, "Host to validate does not exist in IPAM."
......@@ -780,7 +777,7 @@ def validate_host(
},
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
cname_data = r.json()
provided_cnames = [item + domain_name for item in cname_aliases]
......
......@@ -11,7 +11,7 @@ from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products.product_types.device import Device
from gso.services import ipam
from gso.services.ipam import V4HostAddress, V4ServiceNetwork, V6HostAddress, V6ServiceNetwork
from gso.services.ipam import HostAddresses, V4ServiceNetwork, V6ServiceNetwork
logger = logging.getLogger(__name__)
......@@ -31,7 +31,7 @@ def _deprovision_in_user_management_system(fqdn: str) -> None:
@step("Deprovision loopback IPs from IPAM/DNS")
def deprovision_loopback_ips(subscription: Device) -> dict[str, V4HostAddress | V6HostAddress]:
def deprovision_loopback_ips(subscription: Device) -> dict[str, HostAddresses]:
input_host_addresses = ipam.HostAddresses(
v4=ipaddress.IPv4Address(subscription.device.device_lo_ipv4_address),
v6=ipaddress.IPv6Address(subscription.device.device_lo_ipv6_address),
......
......@@ -259,20 +259,14 @@ def test_delete_network(data_config_filename: PathLike):
assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26"))
with pytest.raises(AssertionError):
service_network = ipam.delete_network(
network=ipaddress.ip_network("10.255.255.20/32"), service_type="LO"
)
service_network = ipam.delete_network(network=ipaddress.ip_network("10.255.255.20/32"), service_type="LO")
assert service_network is None
service_network = ipam.delete_network(
network=ipaddress.ip_network("dead:beef::18/128"), service_type="TRUNK"
)
service_network = ipam.delete_network(network=ipaddress.ip_network("dead:beef::18/128"), service_type="TRUNK")
assert service_network == ipam.V6ServiceNetwork(v6=ipaddress.ip_network("dead:beef::18/128"))
with pytest.raises(AssertionError):
service_network = ipam.delete_network(
network=ipaddress.ip_network("beef:dead::18/128"), service_type="TRUNK"
)
service_network = ipam.delete_network(network=ipaddress.ip_network("beef:dead::18/128"), service_type="TRUNK")
assert service_network is None
......@@ -378,22 +372,20 @@ def test_validate_network(data_config_filename: PathLike):
"_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501
"network": "10.255.255.0/26",
"network_view": "default",
"comment": "the subscription id is 0123456789abcdef"
"comment": "the subscription id is 0123456789abcdef",
}
],
)
service_network = ipam.validate_network(
gso_subscription_id="0123456789abcdef",
network=ipam.ipaddress.ip_network("10.255.255.0/26")
gso_subscription_id="0123456789abcdef", network=ipam.ipaddress.ip_network("10.255.255.0/26")
)
assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26"))
# Fail because non-matching subscription id
with pytest.raises(AssertionError):
service_network = ipam.validate_network(
gso_subscription_id="1a2b3c4d5e6f7890",
network=ipam.ipaddress.ip_network("10.255.255.0/26")
gso_subscription_id="1a2b3c4d5e6f7890", network=ipam.ipaddress.ip_network("10.255.255.0/26")
)
assert service_network is None
......
......@@ -27,6 +27,6 @@ commands =
coverage report
isort -c .
ruff .
black --check .
black .
mypy .
flake8
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment