Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
GÉANT Service Orchestrator
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Jira
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
GÉANT Orchestration and Automation Team
GAP
GÉANT Service Orchestrator
Commits
b14c9d0a
Commit
b14c9d0a
authored
2 years ago
by
JORGE SASIAIN
Browse files
Options
Downloads
Patches
Plain Diff
NAT-152
: fix typo and fix all remaining tox errors
parent
53123d2d
No related branches found
No related tags found
1 merge request
!9
Ipam service
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
gso/services/_ipam.py
+167
-82
167 additions, 82 deletions
gso/services/_ipam.py
gso/services/ipam.py
+13
-5
13 additions, 5 deletions
gso/services/ipam.py
with
180 additions
and
87 deletions
gso/services/_ipam.py
+
167
−
82
View file @
b14c9d0a
...
...
@@ -47,7 +47,8 @@ requests.packages.urllib3.disable_warnings()
def
_match_error_code
(
response
,
error_code
):
return
response
.
status_code
==
error_code
.
value
[
0
]
and
error_code
.
value
[
1
]
in
response
.
text
return
response
.
status_code
==
error_code
.
value
[
0
]
\
and
error_code
.
value
[
1
]
in
response
.
text
def
_wapi
(
infoblox_params
:
settings
.
InfoBloxParams
):
...
...
@@ -79,7 +80,8 @@ def _ip_network_version(network):
def
_find_networks
(
network_container
=
None
,
network
=
None
,
ip_version
=
4
):
"""
If network_container is not None, find all networks within the specified container.
If network_container is not None, find all networks within the specified
container.
Otherwise, if network is not None, find the specified network.
Otherwise find all networks.
"""
...
...
@@ -96,11 +98,13 @@ def _find_networks(network_container=None, network=None, ip_version=4):
r
=
requests
.
get
(
f
'
{
_wapi
(
infoblox_params
)
}
/
{
endpoint
}
'
,
params
=
params
,
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
verify
=
False
)
# TODO: propagate "network not found" error to caller
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
\
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
return
r
.
json
()
...
...
@@ -114,16 +118,22 @@ def _get_network_capacity(network=None):
ip_version
=
_ip_network_version
(
network
)
assert
ip_version
==
4
,
"
Utilization is only available for IPv4 networks.
"
params
=
{
'
network
'
:
network
,
'
_return_fields
'
:
'
network,total_hosts,utilization
'
}
params
=
{
'
network
'
:
network
,
'
_return_fields
'
:
'
network,total_hosts,utilization
'
}
r
=
requests
.
get
(
f
'
{
_wapi
(
infoblox_params
)
}
/network
'
,
params
=
params
,
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
verify
=
False
)
# Utilization info takes several minutes to converge. The IPAM utilization bar in the GUI as well. Why?
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
# Utilization info takes several minutes to converge.
# The IPAM utilization bar in the GUI as well. Why?
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
\
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
capacity_info
=
r
.
json
()
assert
len
(
capacity_info
)
==
1
,
"
Requested IPv4 network doesn
'
t exist.
"
assert
'
utilization
'
in
capacity_info
[
0
]
...
...
@@ -131,11 +141,17 @@ def _get_network_capacity(network=None):
return
utilization
def
_allocate_network
(
infoblox_params
:
settings
.
InfoBloxParams
,
network_params
:
Union
[
settings
.
V4NetworkParams
,
settings
.
V6NetworkParams
],
ip_version
=
4
)
->
Union
[
V4ServiceNetwork
,
V6ServiceNetwork
]:
def
_allocate_network
(
infoblox_params
:
settings
.
InfoBloxParams
,
network_params
:
Union
[
settings
.
V4NetworkParams
,
settings
.
V6NetworkParams
],
ip_version
=
4
)
->
Union
[
V4ServiceNetwork
,
V6ServiceNetwork
]:
assert
ip_version
in
[
4
,
6
]
endpoint
=
'
network
'
if
ip_version
==
4
else
'
ipv6network
'
ip_container
=
'
networkcontainer
'
if
ip_version
==
4
else
'
ipv6networkcontainer
'
ip_container
=
'
networkcontainer
'
if
ip_version
==
4
else
\
'
ipv6networkcontainer
'
# only return in the response the allocated network, not all available
req_payload
=
{
"
network
"
:
{
"
_object_function
"
:
"
next_available_network
"
,
...
...
@@ -146,7 +162,7 @@ def _allocate_network(infoblox_params: settings.InfoBloxParams, network_params:
"
_object_parameters
"
:
{
"
network
"
:
str
(
network_params
.
containers
[
0
])
},
"
_result_field
"
:
"
networks
"
,
# only return in the response the allocated network, not all available
"
_result_field
"
:
"
networks
"
,
}
}
...
...
@@ -156,19 +172,23 @@ def _allocate_network(infoblox_params: settings.InfoBloxParams, network_params:
f
'
{
_wapi
(
infoblox_params
)
}
/
{
endpoint
}
'
,
params
=
{
'
_return_fields
'
:
'
network
'
},
json
=
req_payload
,
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
headers
=
{
'
content-type
'
:
"
application/json
"
},
verify
=
False
)
if
not
_match_error_code
(
response
=
r
,
error_code
=
IPAMErrors
.
CONTAINER_FULL
):
if
not
_match_error_code
(
response
=
r
,
error_code
=
IPAMErrors
.
CONTAINER_FULL
):
break
# Container full: try with next valid container for
the
service (if any)
# Container full: try with next valid container for service (if any)
container_index
+=
1
if
len
(
network_params
.
containers
)
<
(
container_index
+
1
):
break
req_payload
[
"
network
"
][
"
_object_parameters
"
][
"
network
"
]
=
str
(
network_params
.
containers
[
container_index
])
req_payload
[
"
network
"
][
"
_object_parameters
"
][
"
network
"
]
=
\
str
(
network_params
.
containers
[
container_index
])
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
\
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
assert
'
network
'
in
r
.
json
()
allocated_network
=
r
.
json
()[
'
network
'
]
...
...
@@ -185,8 +205,11 @@ def allocate_service_ipv4_network(service_type) -> V4ServiceNetwork:
oss
=
settings
.
load_oss_params
()
assert
oss
.
IPAM
ipam_params
=
oss
.
IPAM
assert
hasattr
(
ipam_params
,
service_type
)
and
service_type
!=
'
INFOBLOX
'
,
"
Invalid service type.
"
return
_allocate_network
(
ipam_params
.
INFOBLOX
,
getattr
(
ipam_params
,
service_type
).
V4
,
4
)
assert
hasattr
(
ipam_params
,
service_type
)
\
and
service_type
!=
'
INFOBLOX
'
,
"
Invalid service type.
"
return
_allocate_network
(
ipam_params
.
INFOBLOX
,
getattr
(
ipam_params
,
service_type
).
V4
,
4
)
def
allocate_service_ipv6_network
(
service_type
)
->
V6ServiceNetwork
:
...
...
@@ -196,32 +219,40 @@ def allocate_service_ipv6_network(service_type) -> V6ServiceNetwork:
oss
=
settings
.
load_oss_params
()
assert
oss
.
IPAM
ipam_params
=
oss
.
IPAM
assert
hasattr
(
ipam_params
,
service_type
)
and
service_type
!=
'
INFOBLOX
'
,
"
Invalid service type.
"
return
_allocate_network
(
ipam_params
.
INFOBLOX
,
getattr
(
ipam_params
,
service_type
).
V6
,
6
)
assert
hasattr
(
ipam_params
,
service_type
)
\
and
service_type
!=
'
INFOBLOX
'
,
"
Invalid service type.
"
return
_allocate_network
(
ipam_params
.
INFOBLOX
,
getattr
(
ipam_params
,
service_type
).
V6
,
6
)
def
_find_next_available_ip
(
infoblox_params
,
network_ref
):
r
=
requests
.
post
(
f
'
{
_wapi
(
infoblox_params
)
}
/
{
network_ref
}
?_function=next_available_ip&num=1
'
,
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
f
'
{
_wapi
(
infoblox_params
)
}
/
{
network_ref
}
?_function=next_available_ip&num=1
'
,
# noqa: E501
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
verify
=
False
)
# TODO: propagate no more available IPs in the network
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
\
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
assert
'
ips
'
in
r
.
json
()
received_ip
=
r
.
json
()[
'
ips
'
]
assert
len
(
received_ip
)
==
1
return
received_ip
[
0
]
def
_allocate_host
(
hostname
=
None
,
addr
=
None
,
network
=
None
)
->
Union
[
V4HostAddress
,
V6HostAddress
]:
def
_allocate_host
(
hostname
=
None
,
addr
=
None
,
network
=
None
)
->
Union
[
V4HostAddress
,
V6HostAddress
]:
"""
If network is not None, allocate host in that network.
Otherwise if addr is not None, allocate host with that address.
hostname parameter must be full name including domain name.
"""
# TODO: should hostnames be unique (i.e. fail if hostname already exists in this domain/service)?
assert
addr
or
network
,
"
You must specify either the host address or the network CIDR.
"
# TODO: should hostnames be unique
# (i.e. fail if hostname already exists in this domain/service)?
assert
addr
or
network
,
\
"
You must specify either the host address or the network CIDR.
"
oss
=
settings
.
load_oss_params
()
assert
oss
.
IPAM
.
INFOBLOX
infoblox_params
=
oss
.
IPAM
.
INFOBLOX
...
...
@@ -230,9 +261,11 @@ def _allocate_host(hostname=None, addr=None, network=None) -> Union[V4HostAddres
ip_version
=
_ip_network_version
(
network
)
# Find the next available IP address in the network
network_info
=
_find_networks
(
network
=
network
,
ip_version
=
ip_version
)
assert
len
(
network_info
)
==
1
,
"
Network does not exist. Create it first.
"
assert
len
(
network_info
)
==
1
,
\
"
Network does not exist. Create it first.
"
assert
'
_ref
'
in
network_info
[
0
]
addr
=
_find_next_available_ip
(
infoblox_params
,
network_info
[
0
][
"
_ref
"
])
addr
=
_find_next_available_ip
(
infoblox_params
,
network_info
[
0
][
"
_ref
"
])
else
:
ip_version
=
_ip_addr_version
(
addr
)
...
...
@@ -251,15 +284,17 @@ def _allocate_host(hostname=None, addr=None, network=None) -> Union[V4HostAddres
r
=
requests
.
post
(
f
'
{
_wapi
(
infoblox_params
)
}
/record:host
'
,
json
=
ip_req_payload
,
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
verify
=
False
)
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
\
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
assert
isinstance
(
r
.
json
(),
str
)
assert
r
.
json
().
startswith
(
"
record:host/
"
)
dns_req_payload
=
{
f
"
ipv
{
ip_
req_payload
}
addr
"
:
addr
,
f
"
ipv
{
ip_
version
}
addr
"
:
addr
,
"
name
"
:
hostname
,
"
view
"
:
"
default
"
}
...
...
@@ -269,10 +304,12 @@ def _allocate_host(hostname=None, addr=None, network=None) -> Union[V4HostAddres
r
=
requests
.
post
(
f
'
{
_wapi
(
infoblox_params
)
}
/
{
endpoint
}
'
,
json
=
dns_req_payload
,
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
verify
=
False
)
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
\
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
assert
isinstance
(
r
.
json
(),
str
)
assert
r
.
json
().
startswith
(
f
"
{
endpoint
}
/
"
)
...
...
@@ -282,29 +319,42 @@ def _allocate_host(hostname=None, addr=None, network=None) -> Union[V4HostAddres
return
V6HostAddress
(
v6
=
addr
)
def
allocate_service_host
(
hostname
=
None
,
service_type
=
None
,
service_networks
:
ServiceNetworks
=
None
,
host_addresses
:
HostAddresses
=
None
)
->
HostAddresses
:
def
allocate_service_host
(
hostname
=
None
,
service_type
=
None
,
service_networks
:
ServiceNetworks
=
None
,
host_addresses
:
HostAddresses
=
None
)
->
HostAddresses
:
"""
Allocate host with both IPv4 and IPv6 address (and respective DNS records).
The domain name is also taken from the service type and appended to specified hostname.
Allocate host with both IPv4 and IPv6 address (and respective DNS
records).
The domain name is also taken from the service type and appended to
specified hostname.
If service_networks is provided, that one is used.
If service_networks is not provided, and host_addresses is provided, those specific addresses are used.
If neither is not provided, the first network with available space for this service type is used.
Note that if WFO will always specify the network/addresses after creating it, this mode won
'
t be needed.
Currently this mode doesn
'
t look further than the first container, so if needed, this will need to be updated.
If service_networks is not provided, and host_addresses is provided,
those specific addresses are used.
If neither is not provided, the first network with available space for
this service type is used.
Note that if WFO will always specify the network/addresses after
creating it, this mode won
'
t be needed. Currently this mode doesn
'
t
look further than the first container, so if needed, this will need
to be updated.
"""
oss
=
settings
.
load_oss_params
()
assert
oss
.
IPAM
ipam_params
=
oss
.
IPAM
assert
hasattr
(
ipam_params
,
service_type
)
and
service_type
!=
'
INFOBLOX
'
,
"
Invalid service type.
"
assert
hasattr
(
ipam_params
,
service_type
)
\
and
service_type
!=
'
INFOBLOX
'
,
"
Invalid service type.
"
ipv4_containers
=
getattr
(
ipam_params
,
service_type
).
V4
.
containers
ipv6_containers
=
getattr
(
ipam_params
,
service_type
).
V6
.
containers
domain_name
=
getattr
(
ipam_params
,
service_type
).
domain_name
# IPv4
if
not
service_networks
and
not
host_addresses
:
ipv4_networks_info
=
_find_networks
(
network_container
=
str
(
ipv4_containers
[
0
]),
ip_version
=
4
)
assert
len
(
ipv4_networks_info
)
>=
1
,
"
No IPv4 network exists in the container for this service type.
"
ipv4_networks_info
=
_find_networks
(
network_container
=
str
(
ipv4_containers
[
0
]),
ip_version
=
4
)
assert
len
(
ipv4_networks_info
)
>=
1
,
\
"
No IPv4 network exists in the container for this service type.
"
first_nonfull_ipv4_network
=
None
for
ipv4_network_info
in
ipv4_networks_info
:
assert
'
network
'
in
ipv4_network_info
...
...
@@ -312,37 +362,53 @@ def allocate_service_host(hostname=None, service_type=None, service_networks: Se
if
capacity
<
1000
:
first_nonfull_ipv4_network
=
ipv4_network_info
[
"
network
"
]
break
# Create a new network if the existing networks in the container for the service type are all full.
# Create a new network if the existing networks in the container for
# the service type are all full.
if
not
first_nonfull_ipv4_network
:
first_nonfull_ipv4_network
=
str
(
allocate_service_ipv4_network
(
service_type
=
service_type
).
v4
)
assert
first_nonfull_ipv4_network
,
"
No available IPv4 addresses for this service type.
"
v4_host
=
_allocate_host
(
hostname
=
hostname
+
domain_name
,
network
=
first_nonfull_ipv4_network
)
first_nonfull_ipv4_network
=
str
(
allocate_service_ipv4_network
(
service_type
=
service_type
).
v4
)
assert
first_nonfull_ipv4_network
,
\
"
No available IPv4 addresses for this service type.
"
v4_host
=
_allocate_host
(
hostname
=
hostname
+
domain_name
,
network
=
first_nonfull_ipv4_network
)
elif
service_networks
:
network
=
service_networks
.
v4
assert
any
(
network
.
subnet_of
(
ipv4_container
)
for
ipv4_container
in
ipv4_containers
)
v4_host
=
_allocate_host
(
hostname
=
hostname
+
domain_name
,
network
=
str
(
network
))
assert
any
(
network
.
subnet_of
(
ipv4_container
)
for
ipv4_container
in
ipv4_containers
)
v4_host
=
_allocate_host
(
hostname
=
hostname
+
domain_name
,
network
=
str
(
network
))
elif
host_addresses
:
addr
=
host_addresses
.
v4
assert
any
(
addr
in
ipv4_container
for
ipv4_container
in
ipv4_containers
)
v4_host
=
_allocate_host
(
hostname
=
hostname
+
domain_name
,
addr
=
str
(
addr
))
assert
any
(
addr
in
ipv4_container
for
ipv4_container
in
ipv4_containers
)
v4_host
=
_allocate_host
(
hostname
=
hostname
+
domain_name
,
addr
=
str
(
addr
))
# IPv6
if
not
service_networks
and
not
host_addresses
:
# ipv6 does not support capacity fetching (not even the GUI displays it).
# Maybe it's assumed that there is always available space?
ipv6_networks_info
=
_find_networks
(
network_container
=
str
(
ipv6_containers
[
0
]),
ip_version
=
6
)
assert
len
(
ipv6_networks_info
)
>=
1
,
"
No IPv6 network exists in the container for this service type.
"
# ipv6 does not support capacity fetching (not even the GUI displays
# it). Maybe it's assumed that there is always available space?
ipv6_networks_info
=
_find_networks
(
network_container
=
str
(
ipv6_containers
[
0
]),
ip_version
=
6
)
assert
len
(
ipv6_networks_info
)
>=
1
,
\
"
No IPv6 network exists in the container for this service type.
"
assert
'
network
'
in
ipv6_networks_info
[
0
]
# TODO: if "no available IP" error, create a new network?
v6_host
=
_allocate_host
(
hostname
=
hostname
+
domain_name
,
network
=
ipv6_networks_info
[
0
][
'
network
'
])
v6_host
=
_allocate_host
(
hostname
=
hostname
+
domain_name
,
network
=
ipv6_networks_info
[
0
][
'
network
'
])
elif
service_networks
:
network
=
service_networks
.
v6
assert
any
(
network
.
subnet_of
(
ipv6_container
)
for
ipv6_container
in
ipv6_containers
)
v6_host
=
_allocate_host
(
hostname
=
hostname
+
domain_name
,
network
=
str
(
network
))
assert
any
(
network
.
subnet_of
(
ipv6_container
)
for
ipv6_container
in
ipv6_containers
)
v6_host
=
_allocate_host
(
hostname
=
hostname
+
domain_name
,
network
=
str
(
network
))
elif
host_addresses
:
addr
=
host_addresses
.
v6
assert
any
(
addr
in
ipv6_container
for
ipv6_container
in
ipv6_containers
)
v6_host
=
_allocate_host
(
hostname
=
hostname
+
domain_name
,
addr
=
str
(
addr
))
assert
any
(
addr
in
ipv6_container
for
ipv6_container
in
ipv6_containers
)
v6_host
=
_allocate_host
(
hostname
=
hostname
+
domain_name
,
addr
=
str
(
addr
))
return
HostAddresses
(
v4
=
v4_host
.
v4
,
v6
=
v6_host
.
v6
)
...
...
@@ -361,14 +427,17 @@ def _find_containers(network=None, ip_version=4):
oss
=
settings
.
load_oss_params
()
assert
oss
.
IPAM
.
INFOBLOX
infoblox_params
=
oss
.
IPAM
.
INFOBLOX
endpoint
=
'
networkcontainer
'
if
ip_version
==
4
else
'
ipv6networkcontainer
'
endpoint
=
'
networkcontainer
'
if
ip_version
==
4
\
else
'
ipv6networkcontainer
'
r
=
requests
.
get
(
f
'
{
_wapi
(
infoblox_params
)
}
/
{
endpoint
}
'
,
params
=
{
'
network
'
:
network
}
if
network
else
None
,
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
verify
=
False
)
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
\
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
return
r
.
json
()
...
...
@@ -376,8 +445,9 @@ def _delete_network(network) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
"""
Delete IPv4 or IPv6 network by CIDR.
"""
# TODO: should we check that there are no hosts in this network before deleting?
# Deleting a network deletes the hosts in it, but not the associated DNS records.
# TODO: should we check that there are no hosts in this network before
# deleting? Deleting a network deletes the hosts in it, but not the
# associated DNS records.
oss
=
settings
.
load_oss_params
()
assert
oss
.
IPAM
.
INFOBLOX
infoblox_params
=
oss
.
IPAM
.
INFOBLOX
...
...
@@ -390,14 +460,18 @@ def _delete_network(network) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
r
=
requests
.
delete
(
f
'
{
_wapi
(
infoblox_params
)
}
/
{
network_info
[
0
][
"
_ref
"
]
}
'
,
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
verify
=
False
)
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
\
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
# Extract ipv4/ipv6 address from the network reference obtained in the response
# Extract ipv4/ipv6 address from the network reference obtained in the
# response
r_json
=
r
.
json
()
network_address
=
ipaddress
.
ip_network
(
r_json
.
rsplit
(
"
/
"
,
1
)[
0
].
split
(
"
:
"
)[
1
].
replace
(
"
%3A
"
,
"
:
"
))
network_address
=
ipaddress
.
ip_network
(
r_json
.
rsplit
(
"
/
"
,
1
)[
0
].
split
(
"
:
"
)[
1
].
replace
(
"
%3A
"
,
"
:
"
))
if
ip_version
==
4
:
return
V4ServiceNetwork
(
v4
=
network_address
)
else
:
...
...
@@ -419,7 +493,8 @@ def _delete_host_by_ip(addr) -> Union[V4HostAddress, V6HostAddress]:
r
=
requests
.
get
(
f
'
{
_wapi
(
infoblox_params
)
}
/record:host
'
,
params
=
{
ip_param
:
addr
},
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
verify
=
False
)
host_data
=
r
.
json
()
...
...
@@ -430,10 +505,12 @@ def _delete_host_by_ip(addr) -> Union[V4HostAddress, V6HostAddress]:
# Delete it
r
=
requests
.
delete
(
f
'
{
_wapi
(
infoblox_params
)
}
/
{
host_ref
}
'
,
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
verify
=
False
)
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
\
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
# Also find and delete the associated dns a/aaaa record
endpoint
=
'
record:a
'
if
ip_version
==
4
else
'
record:aaaa
'
...
...
@@ -441,7 +518,8 @@ def _delete_host_by_ip(addr) -> Union[V4HostAddress, V6HostAddress]:
r
=
requests
.
get
(
f
'
{
_wapi
(
infoblox_params
)
}
/
{
endpoint
}
'
,
params
=
{
ip_param
:
addr
},
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
verify
=
False
)
dns_data
=
r
.
json
()
...
...
@@ -451,10 +529,12 @@ def _delete_host_by_ip(addr) -> Union[V4HostAddress, V6HostAddress]:
r
=
requests
.
delete
(
f
'
{
_wapi
(
infoblox_params
)
}
/
{
dns_ref
}
'
,
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
verify
=
False
)
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
\
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
if
ip_version
==
4
:
return
V4HostAddress
(
v4
=
addr
)
...
...
@@ -496,9 +576,11 @@ if __name__ == '__main__':
service_type
=
input
(
"
Enter service type:
"
)
ip_version
=
int
(
input
(
"
Enter IP version (4 or 6):
"
))
if
ip_version
==
4
:
new_network
=
allocate_service_ipv4_network
(
service_type
=
service_type
)
new_network
=
allocate_service_ipv4_network
(
service_type
=
service_type
)
elif
ip_version
==
6
:
new_network
=
allocate_service_ipv6_network
(
service_type
=
service_type
)
new_network
=
allocate_service_ipv6_network
(
service_type
=
service_type
)
else
:
print
(
"
Invalid IP version. Please enter either 4 or 6.
"
)
continue
...
...
@@ -510,21 +592,24 @@ if __name__ == '__main__':
print
(
json
.
dumps
(
str
(
deleted_network
),
indent
=
2
))
elif
choice
==
'
6
'
:
hostname
=
input
(
"
Enter host name (full name
including
domain name):
"
)
hostname
=
input
(
"
Enter host name (full name
w/
domain name):
"
)
addr
=
input
(
"
Enter IP address to allocate:
"
)
alloc_ip
=
_allocate_host
(
hostname
=
hostname
,
addr
=
addr
)
print
(
json
.
dumps
(
str
(
alloc_ip
),
indent
=
2
))
elif
choice
==
'
7
'
:
hostname
=
input
(
"
Enter host name (full name including domain name):
"
)
network
=
input
(
"
Enter an existing network to allocate from (in CIDR notation):
"
)
hostname
=
input
(
"
Enter host name (full name w/ domain name):
"
)
network
=
input
(
"
Enter existing network to allocate from (CIDR notation):
"
)
alloc_ip
=
_allocate_host
(
hostname
=
hostname
,
network
=
network
)
print
(
json
.
dumps
(
str
(
alloc_ip
),
indent
=
2
))
elif
choice
==
'
8
'
:
hostname
=
input
(
"
Enter host name (
not including
domain name):
"
)
hostname
=
input
(
"
Enter host name (
w/o
domain name):
"
)
service_type
=
input
(
"
Enter service type:
"
)
alloc_ip
=
allocate_service_host
(
hostname
=
hostname
,
service_type
=
service_type
)
alloc_ip
=
allocate_service_host
(
hostname
=
hostname
,
service_type
=
service_type
)
print
(
json
.
dumps
(
str
(
alloc_ip
),
indent
=
2
))
elif
choice
==
'
9
'
:
...
...
This diff is collapsed.
Click to expand it.
gso/services/ipam.py
+
13
−
5
View file @
b14c9d0a
...
...
@@ -31,15 +31,22 @@ class HostAddresses(BaseSettings):
def
new_service_networks
(
service_type
)
->
ServiceNetworks
:
v4_service_network
=
_ipam
.
allocate_service_ipv4_network
(
service_type
=
service_type
)
v6_service_network
=
_ipam
.
allocate_service_ipv6_network
(
service_type
=
service_type
)
v4_service_network
=
_ipam
.
allocate_service_ipv4_network
(
service_type
=
service_type
)
v6_service_network
=
_ipam
.
allocate_service_ipv6_network
(
service_type
=
service_type
)
return
ServiceNetworks
(
v4
=
v4_service_network
.
v4
,
v6
=
v6_service_network
.
v6
)
def
new_service_host
(
hostname
,
service_type
,
service_networks
:
ServiceNetworks
)
->
HostAddresses
:
return
_ipam
.
allocate_service_host
(
hostname
=
hostname
,
service_type
=
service_type
,
service_networks
=
service_networks
)
def
new_service_host
(
hostname
,
service_type
,
service_networks
:
ServiceNetworks
)
->
HostAddresses
:
return
_ipam
.
allocate_service_host
(
hostname
=
hostname
,
service_type
=
service_type
,
service_networks
=
service_networks
)
if
__name__
==
'
__main__
'
:
...
...
@@ -54,7 +61,8 @@ if __name__ == '__main__':
lo1_service_networks
=
new_service_networks
(
'
LO
'
)
lo1_v4_host_address
=
lo1_service_networks
.
v4
.
network_address
lo1_v6_host_address
=
lo1_service_networks
.
v6
.
network_address
lo1_host_addresses
=
HostAddresses
(
v4
=
lo1_v4_host_address
,
v6
=
lo1_v6_host_address
)
lo1_host_addresses
=
HostAddresses
(
v4
=
lo1_v4_host_address
,
v6
=
lo1_v6_host_address
)
new_service_host
(
hostname_A
,
'
LO
'
,
lo1_service_networks
)
# h2 loopback
lo2_service_networks
=
new_service_networks
(
'
LO
'
)
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment