diff --git a/gso/services/_ipam.py b/gso/services/_ipam.py deleted file mode 100644 index 85555619c51a3950409b6d354d36f3405b1fc7e1..0000000000000000000000000000000000000000 --- a/gso/services/_ipam.py +++ /dev/null @@ -1,687 +0,0 @@ -# mypy: ignore-errors -import ipaddress -from enum import Enum -from typing import Union - -import requests -from pydantic import BaseSettings -from requests.auth import HTTPBasicAuth - -from gso import settings - - -class V4ServiceNetwork(BaseSettings): - v4: ipaddress.IPv4Network - - -class V6ServiceNetwork(BaseSettings): - v6: ipaddress.IPv6Network - - -class ServiceNetworks(BaseSettings): - v4: ipaddress.IPv4Network - v6: ipaddress.IPv6Network - - -class V4HostAddress(BaseSettings): - v4: ipaddress.IPv4Address - - -class V6HostAddress(BaseSettings): - v6: ipaddress.IPv6Address - - -class HostAddresses(BaseSettings): - v4: ipaddress.IPv4Address - v6: ipaddress.IPv6Address - - -class IPAMErrors(Enum): - # HTTP error code, match in error message - CONTAINER_FULL = 400, "Can not find requested number of networks" - NETWORK_FULL = 400, "Cannot find 1 available IP address(es) in this network" - EXTATTR_UNKNOWN = 400, "Unknown extensible attribute" - EXTATTR_BADVALUE = 400, "Bad value for extensible attribute" - - -# TODO: remove this! -# lab infoblox cert is not valid for the ipv4 address -# ... disable warnings for now -requests.packages.urllib3.disable_warnings() - - -def _match_error_code(response, error_code): - return response.status_code == error_code.value[0] and error_code.value[1] in response.text - - -def _wapi(infoblox_params: settings.InfoBloxParams): - return f"https://{infoblox_params.host}" f"/wapi/{infoblox_params.wapi_version}" - - -def _ip_addr_version(addr): - ip_version = None - ip_addr = ipaddress.ip_address(addr) - if isinstance(ip_addr, ipaddress.IPv4Address): - ip_version = 4 - elif isinstance(ip_addr, ipaddress.IPv6Address): - ip_version = 6 - assert ip_version in [4, 6] - return ip_version - - -def _ip_network_version(network): - ip_version = None - ip_network = ipaddress.ip_network(network) - if isinstance(ip_network, ipaddress.IPv4Network): - ip_version = 4 - elif isinstance(ip_network, ipaddress.IPv6Network): - ip_version = 6 - assert ip_version in [4, 6] - return ip_version - - -def _assert_host_in_service( - ipv4_addr="", - ipv6_addr="", - oss_ipv4_containers=None, - oss_ipv6_containers=None, - oss_ipv4_networks=None, - oss_ipv6_networks=None, -): - # IPv4 - if oss_ipv4_containers: - assert any( - ipv4_addr in oss_ipv4_container for oss_ipv4_container in oss_ipv4_containers - ), "Host's IPv4 address doesn't belong to service type." - else: - assert any( - ipv4_addr in oss_ipv4_network for oss_ipv4_network in oss_ipv4_networks - ), "Host's IPv4 address doesn't belong to service type." - - # IPv6 - if oss_ipv6_containers: - assert any( - ipv6_addr in oss_ipv6_container for oss_ipv6_container in oss_ipv6_containers - ), "Host's IPv6 address doesn't belong to service type." - else: - assert any( - ipv6_addr in oss_ipv6_network for oss_ipv6_network in oss_ipv6_networks - ), "Host's IPv6 address doesn't belong to service type." - - -def _find_networks(network_container=None, network=None, ip_version=4): - """If network_container is not None, find all networks within the specified - container. - Otherwise, if network is not None, find the specified network. - Otherwise find all networks. - A list of all found networks is returned (an HTTP 200 code - may be returned with an empty list.). - """ - assert ip_version in [4, 6] - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - endpoint = "network" if ip_version == 4 else "ipv6network" - params = None - if network_container: - params = {"network_container": network_container} - elif network: - params = {"network": network} - r = requests.get( - f"{_wapi(infoblox_params)}/{endpoint}", - params=params, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - return r.json() - - -def _allocate_network( - infoblox_params: settings.InfoBloxParams, - network_params: Union[settings.V4NetworkParams, settings.V6NetworkParams], - ip_version=4, - comment="", - extattrs={}, -) -> Union[V4ServiceNetwork, V6ServiceNetwork]: - assert ip_version in [4, 6] - endpoint = "network" if ip_version == 4 else "ipv6network" - ip_container = "networkcontainer" if ip_version == 4 else "ipv6networkcontainer" - - assert network_params.containers, ( - "No containers available to allocate networks for this service." - "Maybe you want to allocate a host from a network directly?" - ) - - # only return in the response the allocated network, not all available - # TODO: any validation needed for extrattrs wherever it's used? - req_payload = { - "network": { - "_object_function": "next_available_network", - "_parameters": {"cidr": network_params.mask}, - "_object": ip_container, - "_object_parameters": {"network": str(network_params.containers[0])}, - "_result_field": "networks", - }, - "comment": comment, - "extattrs": extattrs, - } - - container_index = 0 - while True: - r = requests.post( - f"{_wapi(infoblox_params)}/{endpoint}", - params={"_return_fields": "network"}, - json=req_payload, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - headers={"content-type": "application/json"}, - verify=False, - ) - if not _match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL): - break - # Container full: try with next valid container for service (if any) - container_index += 1 - if len(network_params.containers) < (container_index + 1): - break - req_payload["network"]["_object_parameters"]["network"] = str(network_params.containers[container_index]) - - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - assert "network" in r.json() - allocated_network = r.json()["network"] - if ip_version == 4: - return V4ServiceNetwork(v4=ipaddress.ip_network(allocated_network)) - else: - return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network)) - - -def allocate_service_ipv4_network(service_type="", comment="", extattrs={}) -> V4ServiceNetwork: - """Allocate IPv4 network within the container of the specified service type.""" - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." - return _allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4, comment, extattrs) - - -def allocate_service_ipv6_network(service_type="", comment="", extattrs={}) -> V6ServiceNetwork: - """Allocate IPv6 network within the container of the specified service type.""" - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." - return _allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V6, 6, comment, extattrs) - - -def _find_next_available_ip(infoblox_params, network_ref=""): - """Find the next available IP address from a network given its ref. - Returns "NETWORK_FULL" if there's no space in the network. - Otherwise returns the next available IP address in the network. - """ - r = requests.post( - f"{_wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1", # noqa: E501 - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, - ) - - if _match_error_code(response=r, error_code=IPAMErrors.NETWORK_FULL): - return "NETWORK_FULL" - - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - assert "ips" in r.json() - received_ip = r.json()["ips"] - assert len(received_ip) == 1 - return received_ip[0] - - -def _allocate_host( - hostname="", addrs=None, networks=None, cname_aliases=[], dns_view="default", extattrs={} -) -> Union[HostAddresses, str]: - """If networks is not None, allocate host in those networks. - Otherwise if addrs is not None, allocate host with those addresses. - hostname parameter must be full name including domain name. - Return "IPV4_NETWORK_FULL" or "IPV6_NETWORK_FULL" - if couldn't allocate host due to requested network being full. - Return "IPV4_NETWORK_NOT_FOUND" or "IPV6_NETWORK_NOT_FOUND" - if couldn't allocate host due to requested network not existing. - """ - # TODO: should hostnames be unique - # (that is, fail if hostname already exists in this domain/service)? - assert addrs or networks, "You must specify either the host addresses or the networks CIDR." - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - if networks: - ipv4_network = networks[0] - ipv6_network = networks[1] - assert _ip_network_version(ipv4_network) == 4 - assert _ip_network_version(ipv6_network) == 6 - - # Find the next available IP address in each network - network_info = _find_networks(network=ipv4_network, ip_version=4) - if len(network_info) != 1: - return "IPV4_NETWORK_NOT_FOUND" - assert "_ref" in network_info[0] - ipv4_addr = _find_next_available_ip(infoblox_params, network_info[0]["_ref"]) - - network_info = _find_networks(network=ipv6_network, ip_version=6) - if len(network_info) != 1: - return "IPV6_NETWORK_NOT_FOUND" - assert "_ref" in network_info[0] - ipv6_addr = _find_next_available_ip(infoblox_params, network_info[0]["_ref"]) - - # If couldn't find next available IPs, return error - if ipv4_addr == "NETWORK_FULL" or ipv6_addr == "NETWORK_FULL": - if ipv4_addr == "NETWORK_FULL": - return "IPV4_NETWORK_FULL" - if ipv6_addr == "NETWORK_FULL": - return "IPV6_NETWORK_FULL" - - else: - ipv4_addr = addrs[0] - ipv6_addr = addrs[1] - assert _ip_addr_version(ipv4_addr) == 4 - assert _ip_addr_version(ipv6_addr) == 6 - - req_payload = { - "ipv4addrs": [{"ipv4addr": ipv4_addr}], - "ipv6addrs": [{"ipv6addr": ipv6_addr}], - "name": hostname, - "configure_for_dns": True, - "view": dns_view, - "extattrs": extattrs, - } - - r = requests.post( - f"{_wapi(infoblox_params)}/record:host", - json=req_payload, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - assert isinstance(r.json(), str) - assert r.json().startswith("record:host/") - - if cname_aliases: - cname_req_payload = {"name": "", "canonical": hostname, "view": dns_view, "extattrs": extattrs} - - for alias in cname_aliases: - cname_req_payload["name"] = alias - r = requests.post( - f"{_wapi(infoblox_params)}/record:cname", - json=cname_req_payload, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - assert r.json().startswith("record:cname/") - - return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), v6=ipaddress.ip_address(ipv6_addr)) - - -def allocate_service_host( - hostname="", - service_type="", - service_networks: ServiceNetworks = None, - host_addresses: HostAddresses = None, - cname_aliases=None, - extattrs={}, -) -> HostAddresses: - """Allocate host record with both IPv4 and IPv6 address, and respective DNS - A and AAAA records. - - If service_networks is provided, and it's in a valid container, - that one is used. - - If service_networks is not provided, and host_addresses is provided, - those specific addresses are used. - - If neither is provided: - - If service has configured containers, new ipv4 and ipv6 networks are - created and those are used. Note that in this case extattrs is for the - hosts and not for the networks. - - If service doesn't have configured containers and has configured - networks instead, the configured networks are used (they are filled up - in order of appearance in the configuration file). - The domain name is taken from the service type and appended to the - specified hostname. - """ - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - - assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." - oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers - oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers - oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks - oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks - domain_name = getattr(ipam_params, service_type).domain_name - dns_view = getattr(ipam_params, service_type).dns_view - - assert (oss_ipv4_containers and oss_ipv6_containers) or ( - oss_ipv4_networks and oss_ipv6_networks - ), "This service is missing either containers or networks configuration." - assert domain_name, "This service is missing domain_name configuration." - assert dns_view, "This service is missing dns_view configuration." - - if cname_aliases: - cname_aliases = [alias + domain_name for alias in cname_aliases] - - if not service_networks and not host_addresses: - if oss_ipv4_containers and oss_ipv6_containers: - # This service has configured containers. - # Use them to allocate new networks that can allocate the hosts. - - # IPv4 - ipv4_network = str(allocate_service_ipv4_network(service_type=service_type).v4) - assert ipv4_network, "No available space for IPv4 networks for this service type." - - # IPv6 - ipv6_network = str(allocate_service_ipv6_network(service_type=service_type).v6) - assert ipv6_network, "No available space for IPv6 networks for this service type." - - elif oss_ipv4_networks and oss_ipv6_networks: - # This service has configured networks. - # Allocate a host inside an ipv4 and ipv6 network from among them. - ipv4_network = str(oss_ipv4_networks[0]) - ipv6_network = str(oss_ipv6_networks[0]) - - ipv4_network_index = 0 - ipv6_network_index = 0 - while True: - network_tuple = (ipv4_network, ipv6_network) - host = _allocate_host( - hostname=hostname + domain_name, - networks=network_tuple, - cname_aliases=cname_aliases, - dns_view=dns_view, - extattrs=extattrs, - ) - - if "NETWORK_FULL" not in host and "NETWORK_NOT_FOUND" not in host: - break - elif "IPV4" in host: - ipv4_network_index += 1 - assert oss_ipv4_networks, "No available space in any IPv4 network for this service." - assert ipv4_network_index < len( - oss_ipv4_networks - ), "No available space in any IPv4 network for this service." - ipv4_network = str(oss_ipv4_networks[ipv4_network_index]) - else: # "IPV6" in host - ipv6_network_index += 1 - assert oss_ipv6_networks, "No available space in any IPv6 network for this service." - assert ipv6_network_index < len( - oss_ipv6_networks - ), "No available space in any IPv6 network for this service." - ipv6_network = str(oss_ipv6_networks[ipv6_network_index]) - - elif service_networks: - # IPv4 - ipv4_network = service_networks.v4 - if oss_ipv4_containers: - assert any(ipv4_network.subnet_of(oss_ipv4_container) for oss_ipv4_container in oss_ipv4_containers) - else: - assert ipv4_network in oss_ipv4_networks - - # IPv6 - ipv6_network = service_networks.v6 - if oss_ipv6_containers: - assert any(ipv6_network.subnet_of(oss_ipv6_container) for oss_ipv6_container in oss_ipv6_containers) - else: - assert ipv6_network in oss_ipv6_networks - - host = _allocate_host( - hostname=hostname + domain_name, - networks=(str(ipv4_network), str(ipv6_network)), - cname_aliases=cname_aliases, - dns_view=dns_view, - extattrs=extattrs, - ) - assert "NETWORK_FULL" not in host, "Network is full." - assert "NETWORK_NOT_FOUND" not in host, "Network does not exist. Create it first." - - elif host_addresses: - ipv4_addr = host_addresses.v4 - ipv6_addr = host_addresses.v6 - _assert_host_in_service( - ipv4_addr, ipv6_addr, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks - ) - - host = _allocate_host( - hostname=hostname + domain_name, - addrs=(str(ipv4_addr), str(ipv6_addr)), - cname_aliases=cname_aliases, - dns_view=dns_view, - extattrs=extattrs, - ) - assert "NETWORK_FULL" not in host - - return host - - -def delete_service_network(ipnetwork=None, service_type="") -> Union[V4ServiceNetwork, V6ServiceNetwork]: - """Delete IPv4 or IPv6 network by CIDR.""" - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - assert ipam_params.INFOBLOX - infoblox_params = ipam_params.INFOBLOX - - assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." - - network = str(ipnetwork) - ip_version = _ip_network_version(network) - - # Ensure that the network to be deleted is under the service type. - # Otherwise user is not allowed to delete it - if ip_version == 4: - oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers - oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks - if oss_ipv4_containers: - assert any( - ipnetwork.subnet_of(oss_ipv4_container) for oss_ipv4_container in oss_ipv4_containers - ), "Can't delete: network doesn't belong to service type." - else: - assert ipnetwork in oss_ipv4_networks, "Can't delete: network doesn't belong to service type." - - else: - oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers - oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks - if oss_ipv6_containers: - assert any( - ipnetwork.subnet_of(oss_ipv6_container) for oss_ipv6_container in oss_ipv6_containers - ), "Can't delete: network doesn't belong to service type." - else: - assert ipnetwork in oss_ipv6_networks, "Can't delete: network doesn't belong to service type." - - network_info = _find_networks(network=network, ip_version=ip_version) - assert len(network_info) == 1, "Network does not exist." - assert "_ref" in network_info[0] - - r = requests.delete( - f'{_wapi(infoblox_params)}/{network_info[0]["_ref"]}', - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - # Extract ipv4/ipv6 address from the network reference obtained in the - # response - r_text = r.text - print(r_text) - network_address = ipaddress.ip_network(r_text.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":")) - if ip_version == 4: - return V4ServiceNetwork(v4=ipaddress.ip_network(network_address)) - else: - return V6ServiceNetwork(v6=ipaddress.ip_network(network_address)) - - -def delete_service_host( - hostname="", host_addresses: HostAddresses = None, cname_aliases=[], service_type="" -) -> Union[V4HostAddress, V6HostAddress]: - """Delete host record and associated CNAME records. - All arguments passed to this function must match together a host record in - IPAM, and all CNAME records associated to it must also be passed exactly. - """ - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - assert ipam_params.INFOBLOX - infoblox_params = ipam_params.INFOBLOX - - assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." - oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers - oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers - oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks - oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks - domain_name = getattr(ipam_params, service_type).domain_name - dns_view = getattr(ipam_params, service_type).dns_view - ipv4_addr = str(host_addresses.v4) - ipv6_addr = str(host_addresses.v6) - - _assert_host_in_service( - host_addresses.v4, - host_addresses.v6, - oss_ipv4_containers, - oss_ipv6_containers, - oss_ipv4_networks, - oss_ipv6_networks, - ) - - # Find host record reference - r = requests.get( - f"{_wapi(infoblox_params)}/record:host", - params={ - "name": (hostname + domain_name).lower(), # hostnames are lowercase - "ipv4addr": ipv4_addr, - "ipv6addr": ipv6_addr, - "view": dns_view, - }, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, - ) - host_data = r.json() - assert len(host_data) == 1, "Host does not exist." - assert "_ref" in host_data[0] - host_ref = host_data[0]["_ref"] - - # Find CNAME records reference - r = requests.get( - f"{_wapi(infoblox_params)}/record:cname", - params={ - "canonical": hostname + domain_name, - "view": dns_view, - }, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, - ) - cname_data = r.json() - provided_cnames = [item + domain_name for item in cname_aliases] - found_cnames = [item["name"] for item in cname_data if "name" in item] - assert provided_cnames == found_cnames, "Provided CNAME alias names don't match the ones poiting to hostname." - - # Delete the host record - r = requests.delete( - f"{_wapi(infoblox_params)}/{host_ref}", - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - # Delete the CNAME records - cname_refs = [item["_ref"] for item in cname_data if "name" in item] - for cname_ref in cname_refs: - r = requests.delete( - f"{_wapi(infoblox_params)}/{cname_ref}", - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - return host_addresses - - -""" -Below methods are not used for supported outside calls -""" - -''' -def _find_containers(network=None, ip_version=4): - """ - If network is not None, find that container. - Otherwise find all containers. - """ - assert ip_version in [4, 6] - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - endpoint = 'networkcontainer' if ip_version == 4 \ - else 'ipv6networkcontainer' - r = requests.get( - f'{_wapi(infoblox_params)}/{endpoint}', - params={'network': network} if network else None, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - return r.json() - - -def _get_network_capacity(network=None): - """ - Get utilization of a IPv4 network in a fraction of 1000. - """ - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - ip_version = _ip_network_version(network) - assert ip_version == 4, "Utilization is only available for IPv4 networks." - params = { - 'network': network, - '_return_fields': 'network,total_hosts,utilization' - } - - r = requests.get( - f'{_wapi(infoblox_params)}/network', - params=params, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - # Utilization info takes several minutes to converge. - # The IPAM utilization bar in the GUI as well. Why? - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - capacity_info = r.json() - assert len(capacity_info) == 1, "Requested IPv4 network doesn't exist." - assert 'utilization' in capacity_info[0] - utilization = capacity_info[0]['utilization'] - return utilization - - -def _get_network_usage_status(network): - """ - Get status and usage fields of all hosts in the specified ipv4 or ipv6 - network. - """ - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - ip_version = _ip_network_version(network) - endpoint = 'ipv4address' if ip_version == 4 else 'ipv6address' - - r = requests.get( - f'{_wapi(infoblox_params)}/{endpoint}', - params={ - '_return_fields': 'ip_address,status,usage', - 'network': network}, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - return r.json() -'''