Skip to content
Snippets Groups Projects
Commit 73769173 authored by Simone Spinelli's avatar Simone Spinelli
Browse files

Manually fix more conflicts

parent a97bc163
No related branches found
No related tags found
1 merge request!52Feature/nat 212 213
......@@ -84,10 +84,17 @@ def ip_network_version(network: str = ""):
def assert_host_in_service(
ipv4_addr: str = "",
ipv6_addr: str = "",
<<<<<<< HEAD
oss_ipv4_containers=None,
oss_ipv6_containers=None,
oss_ipv4_networks=None,
oss_ipv6_networks=None,
=======
oss_ipv4_containers = None,
oss_ipv6_containers = None,
oss_ipv4_networks = None,
oss_ipv6_networks = None,
>>>>>>> 232046e (Fix conflicts in workflows)
):
# IPv4
if oss_ipv4_containers:
......@@ -110,6 +117,7 @@ def assert_host_in_service(
), "Host's IPv6 address doesn't belong to service type."
<<<<<<< HEAD
def assert_network_in_service(
ipv4_network: Optional[V4ServiceNetwork] = None,
ipv6_network: Optional[V6ServiceNetwork] = None,
......@@ -150,6 +158,14 @@ def find_networks(network_container: Optional[str] = "", network: Optional[str]
-------
(list) all found networks mathing the args, which may be empty.
=======
def find_networks(network_container: Optional[str] = "", network: Optional[str] = "", ip_version: int = 4):
"""If network_container is not None, find all networks within the specified container.
Otherwise, if network is not None, find the specified network.
Otherwise find all networks.
A list of all found networks is returned (an HTTP 200 code
may be returned with an empty list).
>>>>>>> 232046e (Fix conflicts in workflows)
"""
assert ip_version in [4, 6]
oss = settings.load_oss_params()
......@@ -165,8 +181,13 @@ def find_networks(network_container: Optional[str] = "", network: Optional[str]
f"{wapi(infoblox_params)}/{endpoint}",
params=params,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
return r.json()
......@@ -212,8 +233,13 @@ def allocate_network_inner(
json=req_payload,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
headers={"content-type": "application/json"},
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
if not match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL):
break
......@@ -232,6 +258,7 @@ def allocate_network_inner(
return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network))
<<<<<<< HEAD
def allocate_ipv4_network(
service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None
) -> V4ServiceNetwork:
......@@ -248,6 +275,10 @@ def allocate_ipv4_network(
(V4ServiceNetwork): the allocated network
"""
=======
def allocate_ipv4_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V4ServiceNetwork:
"""Allocate IPv4 network within the container of the specified service type."""
>>>>>>> 232046e (Fix conflicts in workflows)
if extattrs is None:
extattrs = {}
oss = settings.load_oss_params()
......@@ -257,6 +288,7 @@ def allocate_ipv4_network(
return allocate_network_inner(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4, comment, extattrs)
<<<<<<< HEAD
def allocate_ipv6_network(
service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None
) -> V6ServiceNetwork:
......@@ -273,6 +305,10 @@ def allocate_ipv6_network(
(V4ServiceNetwork): the allocated network
"""
=======
def allocate_ipv6_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V6ServiceNetwork:
"""Allocate IPv6 network within the container of the specified service type."""
>>>>>>> 232046e (Fix conflicts in workflows)
if extattrs is None:
extattrs = {}
oss = settings.load_oss_params()
......@@ -283,13 +319,28 @@ def allocate_ipv6_network(
def allocate_networks(
<<<<<<< HEAD
service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None
=======
service_type: str = "",
comment: Optional[str] = "",
extattrs: Optional[dict] = None
>>>>>>> 232046e (Fix conflicts in workflows)
) -> ServiceNetworks:
"""Allocate IPv4 and IPv6 network for the specified service type."""
if extattrs is None:
extattrs = {}
<<<<<<< HEAD
v4_service_network = allocate_ipv4_network(service_type=service_type, comment=comment, extattrs=extattrs)
v6_service_network = allocate_ipv6_network(service_type=service_type, comment=comment, extattrs=extattrs)
=======
v4_service_network = allocate_ipv4_network(
service_type=service_type, comment=comment, extattrs=extattrs
)
v6_service_network = allocate_ipv6_network(
service_type=service_type, comment=comment, extattrs=extattrs
)
>>>>>>> 232046e (Fix conflicts in workflows)
return ServiceNetworks(v4=v4_service_network.v4, v6=v6_service_network.v6)
......@@ -309,8 +360,13 @@ def find_next_available_ip(infoblox_params, network_ref: str = ""):
r = requests.post(
f"{wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1",
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
if match_error_code(response=r, error_code=IPAMErrors.NETWORK_FULL):
......@@ -323,21 +379,37 @@ def find_next_available_ip(infoblox_params, network_ref: str = ""):
return received_ip[0]
<<<<<<< HEAD
def allocate_host_inner( # noqa: C901
=======
def allocate_host_inner(
>>>>>>> 232046e (Fix conflicts in workflows)
hostname: str = "",
addrs: Optional[Tuple] = None,
networks: Optional[Tuple] = None,
cname_aliases: Optional[list] = None,
dns_view: Optional[str] = "default",
<<<<<<< HEAD
extattrs: Optional[dict] = None,
=======
extattrs: Optional[dict] = None
>>>>>>> 232046e (Fix conflicts in workflows)
) -> Union[HostAddresses, str]:
# TODO: should hostnames be unique
<<<<<<< HEAD
# (fail if hostname already exists in this domain/service)?
=======
# (i.e. fail if hostname already exists in this domain/service)?
>>>>>>> 232046e (Fix conflicts in workflows)
if cname_aliases is None:
cname_aliases = []
if extattrs is None:
extattrs = {}
<<<<<<< HEAD
assert addrs or networks, "Neither networks nor host addresses could be derived to allocate host."
=======
assert addrs or networks, "You must specify either the host addresses or the networks CIDR."
>>>>>>> 232046e (Fix conflicts in workflows)
oss = settings.load_oss_params()
assert oss.IPAM.INFOBLOX
infoblox_params = oss.IPAM.INFOBLOX
......@@ -391,8 +463,13 @@ def allocate_host_inner( # noqa: C901
f"{wapi(infoblox_params)}/record:host",
json=req_payload,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
assert isinstance(r.json(), str)
......@@ -407,8 +484,13 @@ def allocate_host_inner( # noqa: C901
f"{wapi(infoblox_params)}/record:cname",
json=cname_req_payload,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
assert r.json().startswith("record:cname/")
......@@ -416,11 +498,19 @@ def allocate_host_inner( # noqa: C901
return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), v6=ipaddress.ip_address(ipv6_addr))
<<<<<<< HEAD
def allocate_host( # noqa: C901
hostname: str = "",
service_type: str = "",
service_networks: Optional[ServiceNetworks] = None,
host_addresses: Optional[HostAddresses] = None,
=======
def allocate_host(
hostname: str = "",
service_type: str = "",
service_networks: Optional[ServiceNetworks] = None,
host_addresses: Optional[ServiceNetworks] = None,
>>>>>>> 232046e (Fix conflicts in workflows)
cname_aliases: Optional[list] = None,
extattrs: Optional[dict] = None,
) -> HostAddresses:
......@@ -555,7 +645,12 @@ def allocate_host( # noqa: C901
def delete_network(
<<<<<<< HEAD
network: ipaddress.ip_network = None, service_type: str = ""
=======
network: Union[V4ServiceNetwork, V6ServiceNetwork] = None,
service_type: str = ""
>>>>>>> 232046e (Fix conflicts in workflows)
) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
"""Delete IPv4 or IPv6 network by CIDR."""
oss = settings.load_oss_params()
......@@ -592,8 +687,13 @@ def delete_network(
r = requests.delete(
f'{wapi(infoblox_params)}/{network_info[0]["_ref"]}',
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
......@@ -610,8 +710,13 @@ def delete_host(
hostname: str = "",
host_addresses: HostAddresses = None,
cname_aliases: Optional[list] = None,
<<<<<<< HEAD
service_type: str = "",
) -> HostAddresses:
=======
service_type: str = ""
) -> Union[V4HostAddress, V6HostAddress]:
>>>>>>> 232046e (Fix conflicts in workflows)
"""Delete host record and associated CNAME records.
All arguments passed to this function must match together a host record in
......@@ -655,8 +760,13 @@ def delete_host(
"view": dns_view,
},
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
host_data = r.json()
assert len(host_data) == 1, "Host to delete does not exist in IPAM."
......@@ -671,8 +781,13 @@ def delete_host(
"view": dns_view,
},
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
cname_data = r.json()
provided_cnames = [item + domain_name for item in cname_aliases]
......@@ -683,8 +798,13 @@ def delete_host(
r = requests.delete(
f"{wapi(infoblox_params)}/{host_ref}",
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
......@@ -694,12 +814,18 @@ def delete_host(
r = requests.delete(
f"{wapi(infoblox_params)}/{cname_ref}",
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
<<<<<<< HEAD
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT,
=======
verify=False,
timeout=REQUESTS_TIMEOUT
>>>>>>> 232046e (Fix conflicts in workflows)
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
return host_addresses
<<<<<<< HEAD
def validate_network(
......@@ -786,3 +912,5 @@ def validate_host(
return host_addresses
=======
>>>>>>> 232046e (Fix conflicts in workflows)
......@@ -17,8 +17,8 @@ from gso.products.product_blocks import device as device_pb
from gso.products.product_types import device
from gso.products.product_types.device import DeviceInactive, DeviceProvisioning
from gso.products.product_types.site import Site
from gso.services import ipam, provisioning_proxy
from gso.services.provisioning_proxy import pp_interaction
from gso.services import ipam, provisioning_proxy
def site_selector() -> Choice:
......
......@@ -13,8 +13,8 @@ from gso.products.product_blocks import PhyPortCapacity
from gso.products.product_blocks.iptrunk import IptrunkType
from gso.products.product_types.device import Device
from gso.products.product_types.iptrunk import IptrunkInactive, IptrunkProvisioning
from gso.services import ipam, provisioning_proxy
from gso.services.provisioning_proxy import pp_interaction
from gso.services import ipam, provisioning_proxy
def initial_input_form_generator(product_name: str) -> FormGenerator:
......
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment