diff --git a/gso/services/ipam.py b/gso/services/ipam.py index a1bbab8d26a3065e708a23e6c66ed8139b6a1ee7..67777e7b4948cf92ce3659a6d97798a7a34aeedb 100644 --- a/gso/services/ipam.py +++ b/gso/services/ipam.py @@ -270,7 +270,7 @@ def find_next_available_ip(infoblox_params, network_ref: str = ""): Otherwise returns the next available IP address in the network. """ r = requests.post( - f"{wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1", # noqa: E501 + f"{wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1", auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, timeout=REQUESTS_TIMEOUT @@ -517,7 +517,7 @@ def allocate_host( def delete_network( - network: Union[V4ServiceNetwork, V6ServiceNetwork] = None, + network: ipaddress.ip_network = None, service_type: str = "" ) -> Union[V4ServiceNetwork, V6ServiceNetwork]: """Delete IPv4 or IPv6 network by CIDR.""" @@ -666,8 +666,7 @@ def delete_host( def validate_network( gso_subscription_id: str = "", - network: Union[V4ServiceNetwork, V6ServiceNetwork] = None, - service_type: str = "" + network: ipaddress.ip_network = None ) -> Union[V4ServiceNetwork, V6ServiceNetwork]: """Validate IPv4 or IPv6 network. Check if the specified network exist, and, if it does, @@ -682,7 +681,9 @@ def validate_network( assert "comment" in network_info[0], "Network to validate does not have comment in IPAM." assert gso_subscription_id in network_info[0]["comment"], "GSO subscription ID does not match the one in the comment field of the IPAM network." - return network + if ip_version == 4: + return V4ServiceNetwork(v4=network) + return V6ServiceNetwork(v6=network) def validate_host( @@ -743,4 +744,4 @@ def validate_host( found_cnames = [item["name"] for item in cname_data if "name" in item] assert provided_cnames == found_cnames, "Provided CNAME alias names don't match the ones poiting to hostname." - return HostAddresses + return host_addresses diff --git a/test/test_ipam.py b/test/test_ipam.py index 8da9ad49ebf484b97879a62f6f6fd0eea5939325..7d637d6374ebc261e09ee31768b15728a0029b72 100644 --- a/test/test_ipam.py +++ b/test/test_ipam.py @@ -47,18 +47,6 @@ def test_allocate_host(data_config_filename: PathLike): json="record:host/ZG5zLmhvc3QkLm5vbl9ETlNfaG9zdF9yb290LjAuMTY4MzcwNTU4MzY3MC5nc28udGVzdA:test.lo/%20", # noqa: E501 ) - responses.add( - method=responses.POST, - url=re.compile(r".*/wapi.*/record:a$"), - json="record:a/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1NS44:test.lo/default", # noqa: E501 - ) - - responses.add( - method=responses.POST, - url=re.compile(r".*/wapi.*/record:aaaa$"), - json="record:aaaa/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1NS44:test.lo/default", # noqa: E501 - ) - responses.add( method=responses.GET, url=re.compile(r".*/wapi.*/network.*10.255.255.*"), @@ -378,3 +366,109 @@ def test_delete_host(data_config_filename: PathLike): service_type="LO", ) assert host_addresses is None + + +@responses.activate +def test_validate_network(data_config_filename: PathLike): + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"), + json=[ + { + "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 + "network": "10.255.255.0/26", + "network_view": "default", + "comment": "the subscription id is 0123456789abcdef" + } + ], + ) + + service_network = ipam.validate_network( + gso_subscription_id="0123456789abcdef", + network=ipam.ipaddress.ip_network("10.255.255.0/26") + ) + assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26")) + + # Fail because non-matching subscription id + with pytest.raises(AssertionError): + service_network = ipam.validate_network( + gso_subscription_id="1a2b3c4d5e6f7890", + network=ipam.ipaddress.ip_network("10.255.255.0/26") + ) + assert service_network is None + + +@responses.activate +def test_validate_host(data_config_filename: PathLike): + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*record:host.*"), + json=[ + { + "_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYV9sbw:ha_lo.gso/default", # noqa: E501 + "ipv4addrs": [ + { + "_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZ3NvLmhhX2xvLjEwLjI1NS4yNTUuMS40.255.255.1/ha_lo.gso/default", # noqa: E501 + "configure_for_dhcp": False, + "host": "ha_lo.gso", + "ipv4addr": "10.255.255.1", + } + ], + "ipv6addrs": [ + { + "_ref": "record:host_ipv6addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZvLmhhX2xvLmRlYWQ6YmVlZjo6MS4:dead%3Abeef%3A%3A1/ha_lo.gso/default", # noqa: E501 + "configure_for_dhcp": False, + "host": "ha_lo.gso", + "ipv6addr": "dead:beef::1", + } + ], + "name": "ha_lo.gso", + "view": "default", + } + ], + ) + + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*record:cname.*"), + json=[ + { + "_ref": "record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczE:alias1.ha.gso/default", # noqa: E501 + "canonical": "hA_LO.lo", + "name": "alias1.ha.lo", + "view": "default", + }, + { + "_ref": "record:cname/5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczI:alias2.ha.gso/default", # noqa: E501 + "canonical": "hA_LO.lo", + "name": "alias2.ha.lo", + "view": "default", + }, + ], + ) + + input_host_addresses = ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") + ) + host_addresses = ipam.validate_host( + hostname="ha_lo", + host_addresses=input_host_addresses, + cname_aliases=["alias1.ha", "alias2.ha"], + service_type="LO", + ) + assert host_addresses == ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") + ) + + # Fail because non-matching hostname + host_addresses = ipam.validate_host( + hostname="wrong_hostname", + host_addresses=input_host_addresses, + cname_aliases=["alias1.ha", "alias2.ha"], + service_type="LO", + ) + with pytest.raises(AssertionError): + host_addresses = ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") + ) + assert host_addresses is None