diff --git a/gso/services/ipam.py b/gso/services/ipam.py index 67777e7b4948cf92ce3659a6d97798a7a34aeedb..3ddab7e73cdf9f90c1c0250ceb6255b0ccb48675 100644 --- a/gso/services/ipam.py +++ b/gso/services/ipam.py @@ -138,11 +138,18 @@ def assert_network_in_service( def find_networks(network_container: Optional[str] = "", network: Optional[str] = "", ip_version: int = 4): - """If network_container is not None, find all networks within the specified container. - Otherwise, if network is not None, find the specified network. - Otherwise find all networks. - A list of all found networks is returned (an HTTP 200 code - may be returned with an empty list). + """Get all networks optinally filtering by a container or by a network. + + Args: + ---- + network_container (str, optional): container to filter by + network (str, optional): network to filter by + ip_version (int): 4 or 6 + + Returns: + ------- + (list) all found networks mathing the args, which may be empty. + """ assert ip_version in [4, 6] oss = settings.load_oss_params() @@ -158,7 +165,7 @@ def find_networks(network_container: Optional[str] = "", network: Optional[str] f"{wapi(infoblox_params)}/{endpoint}", params=params, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, + verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" @@ -205,7 +212,7 @@ def allocate_network_inner( json=req_payload, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), headers={"content-type": "application/json"}, - verify=False, + verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT ) if not match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL): @@ -226,7 +233,19 @@ def allocate_network_inner( def allocate_ipv4_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V4ServiceNetwork: - """Allocate IPv4 network within the container of the specified service type.""" + """Allocate IPv4 network within the container of the specified service type. + + Args: + ---- + service_type (str): the name of the service type (e.g. "TRUNK") + comment (str, optional): a custom comment to write in the comment field in IPAM + extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"}) + + Returns: + ------- + (V4ServiceNetwork): the allocated network + + """ if extattrs is None: extattrs = {} oss = settings.load_oss_params() @@ -237,7 +256,19 @@ def allocate_ipv4_network(service_type: str = "", comment: Optional[str] = "", e def allocate_ipv6_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V6ServiceNetwork: - """Allocate IPv6 network within the container of the specified service type.""" + """Allocate IPv6 network within the container of the specified service type. + + Args: + ---- + service_type (str): the name of the service type (e.g. "TRUNK") + comment (str, optional): a custom comment to write in the comment field in IPAM + extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"}) + + Returns: + ------- + (V4ServiceNetwork): the allocated network + + """ if extattrs is None: extattrs = {} oss = settings.load_oss_params() @@ -266,13 +297,21 @@ def allocate_networks( def find_next_available_ip(infoblox_params, network_ref: str = ""): """Find the next available IP address from a network given its ref. - Returns "NETWORK_FULL" if there's no space in the network. - Otherwise returns the next available IP address in the network. + + Args: + ---- + infoblox_params (settings.InfoBloxParams): infoblox params + network_ref (str): the network to find the next available ip, in InfoBlox reference format + + Returns: + ------- + (str): next available ip in the network, or "NETWORK_FULL" if there's no space in the network + """ r = requests.post( f"{wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1", auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, + verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT ) @@ -286,7 +325,7 @@ def find_next_available_ip(infoblox_params, network_ref: str = ""): return received_ip[0] -def allocate_host_inner( +def allocate_host_inner( # noqa: max-complexity=15 hostname: str = "", addrs: Optional[Tuple] = None, networks: Optional[Tuple] = None, @@ -294,14 +333,7 @@ def allocate_host_inner( dns_view: Optional[str] = "default", extattrs: Optional[dict] = None ) -> Union[HostAddresses, str]: - """If networks is not None, allocate host in those networks. - Otherwise if addrs is not None, allocate host with those addresses. - hostname parameter must be full name including domain name. - Return "IPV4_NETWORK_FULL" or "IPV6_NETWORK_FULL" - if couldn't allocate host due to requested network being full. - Return "IPV4_NETWORK_NOT_FOUND" or "IPV6_NETWORK_NOT_FOUND" - if couldn't allocate host due to requested network not existing. - """ + # TODO: should hostnames be unique # (i.e. fail if hostname already exists in this domain/service)? if cname_aliases is None: @@ -313,6 +345,7 @@ def allocate_host_inner( assert oss.IPAM.INFOBLOX infoblox_params = oss.IPAM.INFOBLOX + # If networks is not None, allocate host in those networks. if networks: ipv4_network = networks[0] ipv6_network = networks[1] @@ -320,6 +353,7 @@ def allocate_host_inner( assert ip_network_version(ipv6_network) == 6 # Find the next available IP address in each network + # If requested error doesn't exist, return error network_info = find_networks(network=ipv4_network, ip_version=4) if len(network_info) != 1: return "IPV4_NETWORK_NOT_FOUND" @@ -339,12 +373,14 @@ def allocate_host_inner( if ipv6_addr == "NETWORK_FULL": return "IPV6_NETWORK_FULL" + # Otherwise if addrs is not None, allocate host with those addresses. else: ipv4_addr = addrs[0] ipv6_addr = addrs[1] assert ip_addr_version(ipv4_addr) == 4 assert ip_addr_version(ipv6_addr) == 6 + # hostname parameter must be full name including domain name. req_payload = { "ipv4addrs": [{"ipv4addr": ipv4_addr}], "ipv6addrs": [{"ipv6addr": ipv6_addr}], @@ -358,7 +394,7 @@ def allocate_host_inner( f"{wapi(infoblox_params)}/record:host", json=req_payload, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, + verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" @@ -374,7 +410,7 @@ def allocate_host_inner( f"{wapi(infoblox_params)}/record:cname", json=cname_req_payload, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, + verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" @@ -383,29 +419,29 @@ def allocate_host_inner( return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), v6=ipaddress.ip_address(ipv6_addr)) -def allocate_host( +def allocate_host( # noqa: max-complexity=15 hostname: str = "", service_type: str = "", service_networks: Optional[ServiceNetworks] = None, - host_addresses: Optional[ServiceNetworks] = None, + host_addresses: Optional[HostAddresses] = None, cname_aliases: Optional[list] = None, extattrs: Optional[dict] = None, ) -> HostAddresses: - """Allocate host record with both IPv4 and IPv6 address, and respective DNS - A and AAAA records. - - If service_networks is provided, and it's in a valid container, - that one is used. - - If service_networks is not provided, and host_addresses is provided, - those specific addresses are used. - - If neither is provided: - - If service has configured containers, new ipv4 and ipv6 networks are - created and those are used. Note that in this case extattrs is for the - hosts and not for the networks. - - If service doesn't have configured containers and has configured - networks instead, the configured networks are used (they are filled up - in order of appearance in the configuration file). - The domain name is taken from the service type and appended to the - specified hostname. + """Allocate host record with both IPv4 and IPv6 address, and respective DNS A and AAAA records. + + Args: + ---- + hostname (str): hostname of the host (without domain name, which is taken from the service type) + service_type (str): the name of the service type (e.g. "TRUNK") + service_networks (ServiceNetworks, optional): ipv4 and ipv6 network to allocate host, must be in a valid container for the service type + host_addresses (HostAddresses, optional): ipv4 and ipv6 addresses to allocate host (service_networks has precedence) + cname_aliases (list, optional): to create cname records in addition to the host record + extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"}) + + Returns: + ------- + (HostAddresses): ipv4 and ipv6 addresses of the allocated host + """ if cname_aliases is None: cname_aliases = [] @@ -432,6 +468,11 @@ def allocate_host( if cname_aliases: cname_aliases = [alias + domain_name for alias in cname_aliases] + # When neither service_networks not host_addresses are provided: + # If service has configured containers, new ipv4 and ipv6 networks are created and those are used. + # Note that in this case extattrs is for the hosts and not for the networks. + # If service doesn't have configured containers and has configured networks instead, the configured + # networks are used (they are filled up in order of appearance in the configuration file). if not service_networks and not host_addresses: if oss_ipv4_containers and oss_ipv6_containers: # This service has configured containers. @@ -555,7 +596,7 @@ def delete_network( r = requests.delete( f'{wapi(infoblox_params)}/{network_info[0]["_ref"]}', auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, + verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" @@ -576,6 +617,7 @@ def delete_host( service_type: str = "" ) -> HostAddresses: """Delete host record and associated CNAME records. + All arguments passed to this function must match together a host record in IPAM, and all CNAME records associated to it must also be passed exactly. """ @@ -617,7 +659,7 @@ def delete_host( "view": dns_view, }, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, + verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT ) host_data = r.json() @@ -633,7 +675,7 @@ def delete_host( "view": dns_view, }, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, + verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT ) cname_data = r.json() @@ -645,7 +687,7 @@ def delete_host( r = requests.delete( f"{wapi(infoblox_params)}/{host_ref}", auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, + verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" @@ -656,7 +698,7 @@ def delete_host( r = requests.delete( f"{wapi(infoblox_params)}/{cname_ref}", auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, + verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" @@ -669,8 +711,8 @@ def validate_network( network: ipaddress.ip_network = None ) -> Union[V4ServiceNetwork, V6ServiceNetwork]: """Validate IPv4 or IPv6 network. - Check if the specified network exist, and, if it does, - check if its comment field contains gso_subscription_id. + + Check if the specified network exist, and, if it does, check if its comment field contains gso_subscription_id. Returns the network if validation successful. """ assert network, "No network specified to validate." @@ -693,6 +735,7 @@ def validate_host( service_type: str = "" ) -> HostAddresses: """Validate host. + Check if all arguments passed to this function match together a host record in IPAM, and all CNAME records associated to it also match exactly. Returns the host if validation successful. @@ -721,7 +764,7 @@ def validate_host( "view": dns_view, }, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, + verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT ) host_data = r.json() @@ -736,7 +779,7 @@ def validate_host( "view": dns_view, }, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, + verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT ) cname_data = r.json()