Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
GÉANT Service Orchestrator
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Jira
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
GÉANT Orchestration and Automation Team
GAP
GÉANT Service Orchestrator
Commits
edb82318
Commit
edb82318
authored
2 years ago
by
JORGE SASIAIN
Browse files
Options
Downloads
Patches
Plain Diff
Experimenting with ipam networks and hosts - ipv4
parent
a51ac07e
No related branches found
No related tags found
1 merge request
!9
Ipam service
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
.gitignore
+2
-1
2 additions, 1 deletion
.gitignore
gso/services/_ipam.py
+277
-0
277 additions, 0 deletions
gso/services/_ipam.py
with
279 additions
and
1 deletion
.gitignore
+
2
−
1
View file @
edb82318
...
@@ -3,4 +3,5 @@ __pycache__/
...
@@ -3,4 +3,5 @@ __pycache__/
.coverage
.coverage
coverage.xml
coverage.xml
.tox/device_vendor
.tox/device_vendor
.vscode
.vscode
\ No newline at end of file
oss-params.json
This diff is collapsed.
Click to expand it.
gso/services/_ipam.py
0 → 100644
+
277
−
0
View file @
edb82318
import
requests
from
requests.auth
import
HTTPBasicAuth
import
json
import
ipaddress
from
pydantic
import
BaseSettings
from
typing
import
Union
from
gso
import
settings
class
V4ServiceNetwork
(
BaseSettings
):
v4
:
ipaddress
.
IPv4Network
class
V6ServiceNetwork
(
BaseSettings
):
v6
:
ipaddress
.
IPv6Network
class
ServiceNetworks
(
BaseSettings
):
v4
:
V4ServiceNetwork
v6
:
V6ServiceNetwork
class
V4HostAddress
(
BaseSettings
):
v4
:
ipaddress
.
IPv4Address
class
V6HostAddress
(
BaseSettings
):
v6
:
ipaddress
.
IPv6Address
class
HostAddresses
(
BaseSettings
):
v4
:
V4HostAddress
v6
:
V6HostAddress
# TODO: remove this!
# lab infoblox cert is not valid for the ipv4 address
# ... disable warnings for now
requests
.
packages
.
urllib3
.
disable_warnings
()
def
_wapi
(
infoblox_params
:
settings
.
InfoBloxParams
):
return
(
f
'
https://
{
infoblox_params
.
host
}
'
f
'
/wapi/
{
infoblox_params
.
wapi_version
}
'
)
def
find_containers
(
network
=
None
):
oss
=
settings
.
load_oss_params
()
assert
oss
.
IPAM
.
INFOBLOX
infoblox_params
=
oss
.
IPAM
.
INFOBLOX
r
=
requests
.
get
(
f
'
{
_wapi
(
infoblox_params
)
}
/networkcontainer
'
,
params
=
{
'
network
'
:
network
}
if
network
else
None
,
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
verify
=
False
)
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
return
r
.
json
()
def
find_networks
(
network
=
None
):
oss
=
settings
.
load_oss_params
()
assert
oss
.
IPAM
.
INFOBLOX
infoblox_params
=
oss
.
IPAM
.
INFOBLOX
r
=
requests
.
get
(
f
'
{
_wapi
(
infoblox_params
)
}
/network
'
,
params
=
{
'
network
'
:
network
}
if
network
else
None
,
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
verify
=
False
)
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
return
r
.
json
()
def
_allocate_network
(
infoblox_params
:
settings
.
InfoBloxParams
,
network_params
:
Union
[
settings
.
V4NetworkParams
,
settings
.
V6NetworkParams
],
ip_version
=
4
):
assert
ip_version
in
[
4
,
6
]
_req_payload
=
{
"
network
"
:
{
"
_object_function
"
:
"
next_available_network
"
,
"
_parameters
"
:
{
"
cidr
"
:
network_params
.
mask
},
"
_object
"
:
"
networkcontainer
"
,
"
_object_parameters
"
:
{
"
network
"
:
str
(
network_params
.
container
)
},
"
_result_field
"
:
"
networks
"
,
}
}
r
=
requests
.
post
(
f
'
{
_wapi
(
infoblox_params
)
}
/network
'
,
params
=
{
'
_return_fields
'
:
'
network
'
},
json
=
_req_payload
,
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
headers
=
{
'
content-type
'
:
"
application/json
"
},
verify
=
False
)
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
allocated_network
=
r
.
json
()[
'
network
'
]
if
ip_version
==
4
:
return
V4ServiceNetwork
(
v4
=
allocated_network
)
else
:
return
V6ServiceNetwork
(
v6
=
allocated_network
)
def
allocate_service_ipv4_network
(
service_type
):
oss
=
settings
.
load_oss_params
()
assert
oss
.
IPAM
ipam_params
=
oss
.
IPAM
# sanity: verify the service_type is a proper one
assert
hasattr
(
ipam_params
,
service_type
)
and
service_type
!=
'
INFOBLOX
'
return
_allocate_network
(
ipam_params
.
INFOBLOX
,
getattr
(
ipam_params
,
service_type
).
V4
,
4
)
def
allocate_service_ipv6_network
(
service_type
):
oss
=
settings
.
load_oss_params
()
assert
oss
.
IPAM
ipam_params
=
oss
.
IPAM
# sanity: verify the service_type is a proper one
assert
hasattr
(
ipam_params
,
service_type
)
and
service_type
!=
'
INFOBLOX
'
return
_allocate_network
(
ipam_params
.
INFOBLOX
,
getattr
(
ipam_params
,
service_type
).
V6
,
6
)
def
delete_network
(
network
):
oss
=
settings
.
load_oss_params
()
assert
oss
.
IPAM
.
INFOBLOX
infoblox_params
=
oss
.
IPAM
.
INFOBLOX
network_info
=
find_networks
(
network
=
network
)
assert
len
(
network_info
)
==
1
,
"
Network does not exist.
"
assert
'
_ref
'
in
network_info
[
0
]
r
=
requests
.
delete
(
f
'
{
_wapi
(
infoblox_params
)
}
/
{
network_info
[
0
][
"
_ref
"
]
}
'
,
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
verify
=
False
)
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
# Extract ipv4/ipv6 address from the network reference obtained in the response
r_json
=
r
.
json
()
network_address
=
ipaddress
.
ip_network
(
r_json
.
rsplit
(
"
/
"
,
1
)[
0
].
split
(
"
:
"
)[
1
])
if
isinstance
(
network_address
,
ipaddress
.
IPv4Network
):
return
V4ServiceNetwork
(
v4
=
network_address
)
elif
isinstance
(
network_address
,
ipaddress
.
IPv6Network
):
return
V6ServiceNetwork
(
v6
=
network_address
)
def
_find_next_available_ip
(
infoblox_params
,
network_ref
):
r
=
requests
.
post
(
f
'
{
_wapi
(
infoblox_params
)
}
/
{
network_ref
}
?_function=next_available_ip&num=1
'
,
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
verify
=
False
)
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
assert
'
ips
'
in
r
.
json
()
received_ip
=
r
.
json
()[
'
ips
'
]
assert
len
(
received_ip
)
==
1
return
received_ip
[
0
]
def
allocate_host
(
mac
=
None
,
hostname
=
None
,
ipv4addr
=
None
,
network_cidr
=
None
):
oss
=
settings
.
load_oss_params
()
assert
oss
.
IPAM
.
INFOBLOX
infoblox_params
=
oss
.
IPAM
.
INFOBLOX
if
network_cidr
:
# Find the next available IP address in the network
network_info
=
find_networks
(
network
=
network_cidr
)
assert
len
(
network_info
)
==
1
,
"
Network does not exist. Create it first.
"
assert
'
_ref
'
in
network_info
[
0
]
ipv4addr
=
_find_next_available_ip
(
infoblox_params
,
network_info
[
0
][
"
_ref
"
])
_req_payload
=
{
"
ipv4addrs
"
:
[
{
"
ipv4addr
"
:
ipv4addr
,
"
mac
"
:
mac
}
],
"
name
"
:
hostname
,
"
configure_for_dns
"
:
False
,
"
view
"
:
"
default
"
}
r
=
requests
.
post
(
f
'
{
_wapi
(
infoblox_params
)
}
/record:host
'
,
json
=
_req_payload
,
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
verify
=
False
)
assert
r
.
status_code
>=
200
and
r
.
status_code
<
300
,
f
"
HTTP error
{
r
.
status_code
}
:
{
r
.
reason
}
\n\n
{
r
.
text
}
"
assert
isinstance
(
r
.
json
(),
str
)
assert
r
.
json
().
startswith
(
"
record:host/
"
)
return
V4HostAddress
(
v4
=
ipv4addr
)
def
delete_host_by_ip
(
ipv4addr
):
oss
=
settings
.
load_oss_params
()
assert
oss
.
IPAM
.
INFOBLOX
infoblox_params
=
oss
.
IPAM
.
INFOBLOX
# Find host record reference
r
=
requests
.
get
(
f
'
{
_wapi
(
infoblox_params
)
}
/record:host
'
,
params
=
{
'
ipv4addr
'
:
ipv4addr
},
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
verify
=
False
)
host_data
=
r
.
json
()
assert
len
(
host_data
)
==
1
,
"
Host does not exist.
"
assert
'
_ref
'
in
host_data
[
0
]
host_ref
=
host_data
[
0
][
'
_ref
'
]
# Delete it
r
=
requests
.
delete
(
f
'
{
_wapi
(
infoblox_params
)
}
/
{
host_ref
}
'
,
auth
=
HTTPBasicAuth
(
infoblox_params
.
username
,
infoblox_params
.
password
),
verify
=
False
)
r
.
raise_for_status
()
return
V4HostAddress
(
v4
=
ipv4addr
)
if
__name__
==
'
__main__
'
:
while
True
:
print
(
"
1. Find all containers
"
)
print
(
"
2. Find all networks
"
)
print
(
"
3. Create new network
"
)
print
(
"
4. Delete network
"
)
print
(
"
5. Allocate host by IP
"
)
print
(
"
6. Allocate host by network CIDR
"
)
print
(
"
7. Delete host by IP
"
)
print
(
"
8. Exit
"
)
choice
=
input
(
"
Enter your choice:
"
)
if
choice
==
'
1
'
:
containers
=
find_containers
()
print
(
json
.
dumps
(
containers
,
indent
=
2
))
elif
choice
==
'
2
'
:
all_networks
=
find_networks
()
print
(
json
.
dumps
(
all_networks
,
indent
=
2
))
elif
choice
==
'
3
'
:
service_type
=
input
(
"
Enter service type:
"
)
ip_version
=
int
(
input
(
"
Enter IP version (4 or 6):
"
))
if
ip_version
==
4
:
new_network
=
allocate_service_ipv4_network
(
service_type
=
service_type
)
elif
ip_version
==
6
:
new_network
=
allocate_service_ipv6_network
(
service_type
=
service_type
)
else
:
print
(
"
Invalid IP version. Please enter either 4 or 6.
"
)
continue
print
(
json
.
dumps
(
str
(
new_network
),
indent
=
2
))
elif
choice
==
'
4
'
:
network
=
input
(
"
Enter network to delete (in CIDR notation):
"
)
deleted_network
=
delete_network
(
network
=
network
)
print
(
json
.
dumps
(
str
(
deleted_network
),
indent
=
2
))
elif
choice
==
'
5
'
:
hostname
=
input
(
"
Enter host name:
"
)
ipv4addr
=
input
(
"
Enter IPv4 address to allocate:
"
)
mac
=
input
(
"
Enter mac address:
"
)
alloc_ip
=
allocate_host
(
hostname
=
hostname
,
ipv4addr
=
ipv4addr
,
mac
=
mac
)
print
(
json
.
dumps
(
str
(
alloc_ip
),
indent
=
2
))
elif
choice
==
'
6
'
:
hostname
=
input
(
"
Enter host name:
"
)
network_cidr
=
input
(
"
Enter an existing network CIDR to allocate from:
"
)
mac
=
input
(
"
Enter mac address:
"
)
alloc_ip
=
allocate_host
(
hostname
=
hostname
,
network_cidr
=
network_cidr
,
mac
=
mac
)
print
(
json
.
dumps
(
str
(
alloc_ip
),
indent
=
2
))
elif
choice
==
'
7
'
:
ipv4addr
=
input
(
"
Enter IPv4 address:
"
)
deleted_host
=
delete_host_by_ip
(
ipv4addr
=
ipv4addr
)
print
(
json
.
dumps
(
str
(
deleted_host
),
indent
=
2
))
elif
choice
==
'
8
'
:
print
(
"
Exiting...
"
)
break
else
:
print
(
"
Invalid choice. Please try again.
"
)
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment