Skip to content
Snippets Groups Projects

Feature/nat 212 213

Merged JORGE SASIAIN requested to merge feature/nat-212-213 into develop
4 files
+ 597
468
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 128
0
@@ -84,10 +84,17 @@ def ip_network_version(network: str = ""):
def assert_host_in_service(
ipv4_addr: str = "",
ipv6_addr: str = "",
<<<<<<< HEAD
oss_ipv4_containers=None,
oss_ipv6_containers=None,
oss_ipv4_networks=None,
oss_ipv6_networks=None,
=======
oss_ipv4_containers = None,
oss_ipv6_containers = None,
oss_ipv4_networks = None,
oss_ipv6_networks = None,
>>>>>>> 232046e (Fix conflicts in workflows)
):
# IPv4
if oss_ipv4_containers:
@@ -110,6 +117,7 @@ def assert_host_in_service(
), "Host's IPv6 address doesn't belong to service type."
<<<<<<< HEAD
def assert_network_in_service(
ipv4_network: Optional[V4ServiceNetwork] = None,
ipv6_network: Optional[V6ServiceNetwork] = None,
@@ -150,6 +158,14 @@ def find_networks(network_container: Optional[str] = "", network: Optional[str]
-------
(list) all found networks mathing the args, which may be empty.
=======
def find_networks(network_container: Optional[str] = "", network: Optional[str] = "", ip_version: int = 4):
"""If network_container is not None, find all networks within the specified container.
Otherwise, if network is not None, find the specified network.
Otherwise find all networks.
A list of all found networks is returned (an HTTP 200 code
may be returned with an empty list).
>>>>>>> 232046e (Fix conflicts in workflows)
"""
assert ip_version in [4, 6]
oss = settings.load_oss_params()
@@ -165,8 +181,13 @@ def find_networks(network_container: Optional[str] = "", network: Optional[str]
f"{wapi(infoblox_params)}/{endpoint}",
params=params,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
return r.json()
@@ -212,8 +233,13 @@ def allocate_network_inner(
json=req_payload,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
headers={"content-type": "application/json"},
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
if not match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL):
break
@@ -232,6 +258,7 @@ def allocate_network_inner(
return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network))
<<<<<<< HEAD
def allocate_ipv4_network(
service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None
) -> V4ServiceNetwork:
@@ -248,6 +275,10 @@ def allocate_ipv4_network(
(V4ServiceNetwork): the allocated network
"""
=======
def allocate_ipv4_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V4ServiceNetwork:
"""Allocate IPv4 network within the container of the specified service type."""
>>>>>>> 232046e (Fix conflicts in workflows)
if extattrs is None:
extattrs = {}
oss = settings.load_oss_params()
@@ -257,6 +288,7 @@ def allocate_ipv4_network(
return allocate_network_inner(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4, comment, extattrs)
<<<<<<< HEAD
def allocate_ipv6_network(
service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None
) -> V6ServiceNetwork:
@@ -273,6 +305,10 @@ def allocate_ipv6_network(
(V4ServiceNetwork): the allocated network
"""
=======
def allocate_ipv6_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V6ServiceNetwork:
"""Allocate IPv6 network within the container of the specified service type."""
>>>>>>> 232046e (Fix conflicts in workflows)
if extattrs is None:
extattrs = {}
oss = settings.load_oss_params()
@@ -283,13 +319,28 @@ def allocate_ipv6_network(
def allocate_networks(
<<<<<<< HEAD
service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None
=======
service_type: str = "",
comment: Optional[str] = "",
extattrs: Optional[dict] = None
>>>>>>> 232046e (Fix conflicts in workflows)
) -> ServiceNetworks:
"""Allocate IPv4 and IPv6 network for the specified service type."""
if extattrs is None:
extattrs = {}
<<<<<<< HEAD
v4_service_network = allocate_ipv4_network(service_type=service_type, comment=comment, extattrs=extattrs)
v6_service_network = allocate_ipv6_network(service_type=service_type, comment=comment, extattrs=extattrs)
=======
v4_service_network = allocate_ipv4_network(
service_type=service_type, comment=comment, extattrs=extattrs
)
v6_service_network = allocate_ipv6_network(
service_type=service_type, comment=comment, extattrs=extattrs
)
>>>>>>> 232046e (Fix conflicts in workflows)
return ServiceNetworks(v4=v4_service_network.v4, v6=v6_service_network.v6)
@@ -309,8 +360,13 @@ def find_next_available_ip(infoblox_params, network_ref: str = ""):
r = requests.post(
f"{wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1",
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
if match_error_code(response=r, error_code=IPAMErrors.NETWORK_FULL):
@@ -323,21 +379,37 @@ def find_next_available_ip(infoblox_params, network_ref: str = ""):
return received_ip[0]
<<<<<<< HEAD
def allocate_host_inner( # noqa: C901
=======
def allocate_host_inner(
>>>>>>> 232046e (Fix conflicts in workflows)
hostname: str = "",
addrs: Optional[Tuple] = None,
networks: Optional[Tuple] = None,
cname_aliases: Optional[list] = None,
dns_view: Optional[str] = "default",
<<<<<<< HEAD
extattrs: Optional[dict] = None,
=======
extattrs: Optional[dict] = None
>>>>>>> 232046e (Fix conflicts in workflows)
) -> Union[HostAddresses, str]:
# TODO: should hostnames be unique
<<<<<<< HEAD
# (fail if hostname already exists in this domain/service)?
=======
# (i.e. fail if hostname already exists in this domain/service)?
>>>>>>> 232046e (Fix conflicts in workflows)
if cname_aliases is None:
cname_aliases = []
if extattrs is None:
extattrs = {}
<<<<<<< HEAD
assert addrs or networks, "Neither networks nor host addresses could be derived to allocate host."
=======
assert addrs or networks, "You must specify either the host addresses or the networks CIDR."
>>>>>>> 232046e (Fix conflicts in workflows)
oss = settings.load_oss_params()
assert oss.IPAM.INFOBLOX
infoblox_params = oss.IPAM.INFOBLOX
@@ -391,8 +463,13 @@ def allocate_host_inner( # noqa: C901
f"{wapi(infoblox_params)}/record:host",
json=req_payload,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
assert isinstance(r.json(), str)
@@ -407,8 +484,13 @@ def allocate_host_inner( # noqa: C901
f"{wapi(infoblox_params)}/record:cname",
json=cname_req_payload,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
assert r.json().startswith("record:cname/")
@@ -416,11 +498,19 @@ def allocate_host_inner( # noqa: C901
return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), v6=ipaddress.ip_address(ipv6_addr))
<<<<<<< HEAD
def allocate_host( # noqa: C901
hostname: str = "",
service_type: str = "",
service_networks: Optional[ServiceNetworks] = None,
host_addresses: Optional[HostAddresses] = None,
=======
def allocate_host(
hostname: str = "",
service_type: str = "",
service_networks: Optional[ServiceNetworks] = None,
host_addresses: Optional[ServiceNetworks] = None,
>>>>>>> 232046e (Fix conflicts in workflows)
cname_aliases: Optional[list] = None,
extattrs: Optional[dict] = None,
) -> HostAddresses:
@@ -555,7 +645,12 @@ def allocate_host( # noqa: C901
def delete_network(
<<<<<<< HEAD
network: ipaddress.ip_network = None, service_type: str = ""
=======
network: Union[V4ServiceNetwork, V6ServiceNetwork] = None,
service_type: str = ""
>>>>>>> 232046e (Fix conflicts in workflows)
) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
"""Delete IPv4 or IPv6 network by CIDR."""
oss = settings.load_oss_params()
@@ -592,8 +687,13 @@ def delete_network(
r = requests.delete(
f'{wapi(infoblox_params)}/{network_info[0]["_ref"]}',
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
@@ -610,8 +710,13 @@ def delete_host(
hostname: str = "",
host_addresses: HostAddresses = None,
cname_aliases: Optional[list] = None,
<<<<<<< HEAD
service_type: str = "",
) -> HostAddresses:
=======
service_type: str = ""
) -> Union[V4HostAddress, V6HostAddress]:
>>>>>>> 232046e (Fix conflicts in workflows)
"""Delete host record and associated CNAME records.
All arguments passed to this function must match together a host record in
@@ -655,8 +760,13 @@ def delete_host(
"view": dns_view,
},
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
host_data = r.json()
assert len(host_data) == 1, "Host to delete does not exist in IPAM."
@@ -671,8 +781,13 @@ def delete_host(
"view": dns_view,
},
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
cname_data = r.json()
provided_cnames = [item + domain_name for item in cname_aliases]
@@ -683,8 +798,13 @@ def delete_host(
r = requests.delete(
f"{wapi(infoblox_params)}/{host_ref}",
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
@@ -694,12 +814,18 @@ def delete_host(
r = requests.delete(
f"{wapi(infoblox_params)}/{cname_ref}",
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
return host_addresses
<<<<<<< HEAD
def validate_network(
@@ -786,3 +912,5 @@ def validate_host(
return host_addresses
=======
>>>>>>> 232046e (Fix conflicts in workflows)
Loading