Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
I
inventory-provider
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
geant-swd
dashboardv3
inventory-provider
Commits
4928c78a
Commit
4928c78a
authored
3 years ago
by
Robert Latta
Browse files
Options
Downloads
Patches
Plain Diff
added chorded update task and test route to trigger
parent
763d8a7f
Branches
Branches containing commit
Tags
Tags containing commit
No related merge requests found
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
inventory_provider/routes/testing.py
+6
-0
6 additions, 0 deletions
inventory_provider/routes/testing.py
inventory_provider/tasks/worker.py
+625
-2
625 additions, 2 deletions
inventory_provider/tasks/worker.py
test/test_worker.py
+301
-2
301 additions, 2 deletions
test/test_worker.py
with
932 additions
and
4 deletions
inventory_provider/routes/testing.py
+
6
−
0
View file @
4928c78a
...
@@ -18,6 +18,12 @@ routes = Blueprint("inventory-data-testing-support-routes", __name__)
...
@@ -18,6 +18,12 @@ routes = Blueprint("inventory-data-testing-support-routes", __name__)
logger
=
logging
.
getLogger
(
__name__
)
logger
=
logging
.
getLogger
(
__name__
)
@routes.route
(
"
chord-update
"
,
methods
=
[
'
GET
'
,
'
POST
'
])
def
chord_update
():
r
=
worker
.
update_entry_point
.
delay
().
get
()
return
jsonify
(
r
)
@routes.route
(
"
flushdb
"
,
methods
=
[
'
GET
'
,
'
POST
'
])
@routes.route
(
"
flushdb
"
,
methods
=
[
'
GET
'
,
'
POST
'
])
def
flushdb
():
def
flushdb
():
common
.
get_current_redis
().
flushdb
()
common
.
get_current_redis
().
flushdb
()
...
...
This diff is collapsed.
Click to expand it.
inventory_provider/tasks/worker.py
+
625
−
2
View file @
4928c78a
import
concurrent.futures
import
functools
import
functools
import
json
import
json
import
logging
import
logging
...
@@ -10,7 +11,7 @@ from typing import List
...
@@ -10,7 +11,7 @@ from typing import List
from
redis.exceptions
import
RedisError
from
redis.exceptions
import
RedisError
from
kombu.exceptions
import
KombuError
from
kombu.exceptions
import
KombuError
from
celery
import
Task
,
states
from
celery
import
Task
,
states
,
chord
from
celery.result
import
AsyncResult
from
celery.result
import
AsyncResult
from
collections
import
defaultdict
from
collections
import
defaultdict
...
@@ -23,7 +24,7 @@ from inventory_provider.tasks.app import app
...
@@ -23,7 +24,7 @@ from inventory_provider.tasks.app import app
from
inventory_provider.tasks.common
\
from
inventory_provider.tasks.common
\
import
get_next_redis
,
get_current_redis
,
\
import
get_next_redis
,
get_current_redis
,
\
latch_db
,
get_latch
,
set_latch
,
update_latch_status
,
\
latch_db
,
get_latch
,
set_latch
,
update_latch_status
,
\
ims_sorted_service_type_key
ims_sorted_service_type_key
,
set_single_latch
from
inventory_provider.tasks
import
monitor
from
inventory_provider.tasks
import
monitor
from
inventory_provider
import
config
from
inventory_provider
import
config
from
inventory_provider
import
environment
from
inventory_provider
import
environment
...
@@ -1145,3 +1146,625 @@ def check_task_status(task_id, parent=None, forget=False):
...
@@ -1145,3 +1146,625 @@ def check_task_status(task_id, parent=None, forget=False):
r
.
forget
()
r
.
forget
()
yield
result
yield
result
# =================================== chorded - currently only here for testing
# new
@app.task
(
base
=
InventoryTask
,
bind
=
True
,
name
=
'
update_entry_point
'
)
@log_task_entry_and_exit
def
update_entry_point
(
self
):
self
.
log_info
(
'
querying netdash for managed routers
'
)
routers
=
list
(
juniper
.
load_routers_from_netdash
(
InventoryTask
.
config
[
'
managed-routers
'
]))
self
.
log_info
(
f
'
found
{
len
(
routers
)
}
routers
'
)
lab_routers
=
InventoryTask
.
config
.
get
(
'
lab-routers
'
,
[])
_erase_next_db_chorded
(
InventoryTask
.
config
)
update_latch_status
(
InventoryTask
.
config
,
pending
=
True
)
tasks
=
chord
(
(
ims_task
.
s
().
on_error
(
task_error_handler
.
s
()),
chord
(
(
reload_router_config_chorded
.
s
(
r
).
on_error
(
task_error_handler
.
s
())
for
r
in
routers
),
empty_task
.
si
(
'
router tasks complete
'
)
),
chord
(
(
reload_lab_router_config_chorded
.
s
(
r
).
on_error
(
task_error_handler
.
s
())
for
r
in
lab_routers
),
empty_task
.
si
(
'
lab router tasks complete
'
)
)
),
final_task
.
si
().
on_error
(
task_error_handler
.
s
())
)()
return
tasks
# new
@app.task
def
task_error_handler
(
request
,
exc
,
traceback
):
update_latch_status
(
InventoryTask
.
config
,
pending
=
False
,
failure
=
True
)
logger
.
warning
(
'
Task {0!r} raised error: {1!r}
'
.
format
(
request
.
id
,
exc
))
# new
@app.task
(
base
=
InventoryTask
,
bind
=
True
,
name
=
'
empty_task
'
)
def
empty_task
(
self
,
message
):
logger
.
warning
(
f
'
message from empty task:
{
message
}
'
)
# updated with tramsaction
def
_erase_next_db_chorded
(
config
):
"""
flush next db, but first save latch and then restore afterwards
TODO: handle the no latch scenario nicely
:param config:
:return:
"""
r
=
get_next_redis
(
config
)
saved_latch
=
get_latch
(
r
)
if
saved_latch
:
# execute as transaction to ensure that latch is always available in
# db that is being flushed
rp
=
r
.
pipeline
()
rp
.
multi
()
rp
.
flushdb
()
set_single_latch
(
rp
,
saved_latch
[
'
this
'
],
saved_latch
[
'
current
'
],
saved_latch
[
'
next
'
],
saved_latch
.
get
(
'
timestamp
'
,
0
)
)
rp
.
execute
()
# ensure latch is consistent in all dbs
set_latch
(
config
,
new_current
=
saved_latch
[
'
current
'
],
new_next
=
saved_latch
[
'
next
'
],
timestamp
=
saved_latch
.
get
(
'
timestamp
'
,
0
))
# updated
@app.task
(
base
=
InventoryTask
,
bind
=
True
,
name
=
'
reload_lab_router_config
'
)
@log_task_entry_and_exit
def
reload_lab_router_config_chorded
(
self
,
hostname
):
self
.
log_info
(
f
'
loading netconf data for lab
{
hostname
}
RL
'
)
# load new netconf data, in this thread
netconf_str
=
retrieve_and_persist_netconf_config
(
hostname
,
lab
=
True
)
netconf_doc
=
etree
.
fromstring
(
netconf_str
.
decode
(
'
utf-8
'
))
refresh_juniper_interface_list
(
hostname
,
netconf_doc
,
lab
=
True
)
# load snmp indexes
community
=
juniper
.
snmp_community_string
(
netconf_doc
)
if
not
community
:
raise
InventoryTaskError
(
f
'
error extracting community string for
{
hostname
}
'
)
else
:
self
.
log_info
(
f
'
refreshing snmp interface indexes for
{
hostname
}
'
)
logical_systems
=
juniper
.
logical_systems
(
netconf_doc
)
# load snmp data, in this thread
snmp_refresh_interfaces
(
hostname
,
community
,
logical_systems
)
self
.
log_info
(
f
'
updated configuration for lab
{
hostname
}
'
)
# updated
@app.task
(
base
=
InventoryTask
,
bind
=
True
,
name
=
'
reload_router_config
'
)
@log_task_entry_and_exit
def
reload_router_config_chorded
(
self
,
hostname
):
self
.
log_info
(
f
'
loading netconf data for
{
hostname
}
RL
'
)
netconf_str
=
retrieve_and_persist_netconf_config
(
hostname
)
netconf_doc
=
etree
.
fromstring
(
netconf_str
.
decode
(
'
utf-8
'
))
# clear cached classifier responses for this router, and
# refresh peering data
logger
.
info
(
f
'
refreshing peers & clearing cache for
{
hostname
}
'
)
refresh_juniper_bgp_peers
(
hostname
,
netconf_doc
)
refresh_juniper_interface_list
(
hostname
,
netconf_doc
)
# load snmp indexes
community
=
juniper
.
snmp_community_string
(
netconf_doc
)
if
not
community
:
raise
InventoryTaskError
(
f
'
error extracting community string for
{
hostname
}
'
)
else
:
self
.
log_info
(
f
'
refreshing snmp interface indexes for
{
hostname
}
'
)
logical_systems
=
juniper
.
logical_systems
(
netconf_doc
)
# load snmp data, in this thread
snmp_refresh_interfaces_chorded
(
hostname
,
community
,
logical_systems
,
self
.
log_info
)
snmp_refresh_peerings_chorded
(
hostname
,
community
,
logical_systems
)
logger
.
info
(
f
'
updated configuration for
{
hostname
}
'
)
# new
def
retrieve_and_persist_netconf_config
(
hostname
,
lab
=
False
):
redis_key
=
f
'
netconf:
{
hostname
}
'
if
lab
:
redis_key
=
f
'
lab:
{
redis_key
}
'
try
:
netconf_doc
=
juniper
.
load_config
(
hostname
,
InventoryTask
.
config
[
"
ssh
"
])
netconf_str
=
etree
.
tostring
(
netconf_doc
,
encoding
=
'
unicode
'
)
except
(
ConnectionError
,
juniper
.
NetconfHandlingError
):
msg
=
f
'
error loading netconf data from
{
hostname
}
'
logger
.
exception
(
msg
)
r
=
get_current_redis
(
InventoryTask
.
config
)
netconf_str
=
r
.
get
(
redis_key
)
if
not
netconf_str
:
raise
InventoryTaskError
(
f
'
netconf error with
{
hostname
}
'
f
'
and no cached netconf data found
'
)
logger
.
info
(
f
'
Returning cached netconf data for
{
hostname
}
'
)
r
=
get_next_redis
(
InventoryTask
.
config
)
r
.
set
(
redis_key
,
netconf_str
)
logger
.
info
(
f
'
netconf info loaded from
{
hostname
}
'
)
return
netconf_str
# updated as is no longer a task
@log_task_entry_and_exit
def
snmp_refresh_interfaces_chorded
(
hostname
,
community
,
logical_systems
,
update_callback
=
lambda
s
:
None
):
try
:
interfaces
=
list
(
snmp
.
get_router_snmp_indexes
(
hostname
,
community
,
logical_systems
))
except
ConnectionError
:
msg
=
f
'
error loading snmp interface data from
{
hostname
}
'
logger
.
exception
(
msg
)
update_callback
(
msg
)
r
=
get_current_redis
(
InventoryTask
.
config
)
interfaces
=
r
.
get
(
f
'
snmp-interfaces:
{
hostname
}
'
)
if
not
interfaces
:
raise
InventoryTaskError
(
f
'
snmp error with
{
hostname
}
'
f
'
and no cached snmp interface data found
'
)
# unnecessary json encode/decode here ... could be optimized
interfaces
=
json
.
loads
(
interfaces
.
decode
(
'
utf-8
'
))
update_callback
(
f
'
using cached snmp interface data for
{
hostname
}
'
)
r
=
get_next_redis
(
InventoryTask
.
config
)
rp
=
r
.
pipeline
()
rp
.
set
(
f
'
snmp-interfaces:
{
hostname
}
'
,
json
.
dumps
(
interfaces
))
# optimization for DBOARD3-372
# interfaces is a list of dicts like: {'name': str, 'index': int}
for
ifc
in
interfaces
:
ifc
[
'
hostname
'
]
=
hostname
rp
.
set
(
f
'
snmp-interfaces-single:
{
hostname
}
:
{
ifc
[
"
name
"
]
}
'
,
json
.
dumps
(
ifc
))
rp
.
execute
()
update_callback
(
f
'
snmp interface info loaded from
{
hostname
}
'
)
# updated as is no longer a task
@log_task_entry_and_exit
def
snmp_refresh_peerings_chorded
(
hostname
,
community
,
logical_systems
,
update_callback
=
lambda
S
:
None
):
try
:
peerings
=
list
(
snmp
.
get_peer_state_info
(
hostname
,
community
,
logical_systems
))
except
ConnectionError
:
msg
=
f
'
error loading snmp peering data from
{
hostname
}
'
logger
.
exception
(
msg
)
update_callback
(
msg
)
r
=
get_current_redis
(
InventoryTask
.
config
)
peerings
=
r
.
get
(
f
'
snmp-peerings:hosts:
{
hostname
}
'
)
if
peerings
is
None
:
raise
InventoryTaskError
(
f
'
snmp error with
{
peerings
}
'
f
'
and no cached peering data found
'
)
# unnecessary json encode/decode here ... could be optimized
peerings
=
json
.
loads
(
peerings
.
decode
(
'
utf-8
'
))
update_callback
(
f
'
using cached snmp peering data for
{
hostname
}
'
)
r
=
get_next_redis
(
InventoryTask
.
config
)
r
.
set
(
f
'
snmp-peerings:hosts:
{
hostname
}
'
,
json
.
dumps
(
peerings
))
update_callback
(
f
'
snmp peering info loaded from
{
hostname
}
'
)
# new
@app.task
(
base
=
InventoryTask
,
bind
=
True
,
name
=
'
ims_task
'
)
@log_task_entry_and_exit
def
ims_task
(
self
,
use_current
=
False
):
extracted_data
=
extract_ims_data
()
transformed_data
=
transform_ims_data
(
extracted_data
)
transformed_data
[
'
locations
'
]
=
extracted_data
[
'
locations
'
]
transformed_data
[
'
lg_routers
'
]
=
extracted_data
[
'
lg_routers
'
]
persist_ims_data
(
transformed_data
,
use_current
)
# new
def
extract_ims_data
():
c
=
InventoryTask
.
config
[
"
ims
"
]
ds1
=
IMS
(
c
[
'
api
'
],
c
[
'
username
'
],
c
[
'
password
'
])
ds2
=
IMS
(
c
[
'
api
'
],
c
[
'
username
'
],
c
[
'
password
'
])
ds3
=
IMS
(
c
[
'
api
'
],
c
[
'
username
'
],
c
[
'
password
'
])
ds4
=
IMS
(
c
[
'
api
'
],
c
[
'
username
'
],
c
[
'
password
'
])
ds5
=
IMS
(
c
[
'
api
'
],
c
[
'
username
'
],
c
[
'
password
'
])
locations
=
{}
lg_routers
=
[]
customer_contacts
=
{}
circuit_ids_to_monitor
=
[]
additional_circuit_customer_ids
=
{}
hierarchy
=
{}
port_id_details
=
defaultdict
(
list
)
port_id_services
=
defaultdict
(
list
)
def
_populate_locations
():
nonlocal
locations
locations
=
{
k
:
v
for
k
,
v
in
ims_data
.
get_node_locations
(
ds1
)}
def
_populate_lg_routers
():
nonlocal
lg_routers
lg_routers
=
list
(
ims_data
.
lookup_lg_routers
(
ds5
))
def
_populate_customer_contacts
():
nonlocal
customer_contacts
customer_contacts
=
\
{
k
:
v
for
k
,
v
in
ims_data
.
get_customer_service_emails
(
ds2
)}
def
_populate_circuit_ids_to_monitor
():
nonlocal
circuit_ids_to_monitor
circuit_ids_to_monitor
=
\
list
(
ims_data
.
get_monitored_circuit_ids
(
ds3
))
def
_populate_additional_circuit_customer_ids
():
nonlocal
additional_circuit_customer_ids
additional_circuit_customer_ids
=
\
ims_data
.
get_circuit_related_customer_ids
(
ds4
)
exceptions
=
{}
with
concurrent
.
futures
.
ThreadPoolExecutor
()
as
executor
:
futures
=
{
executor
.
submit
(
_populate_locations
):
'
locations
'
,
executor
.
submit
(
_populate_lg_routers
):
'
lg_routers
'
,
executor
.
submit
(
_populate_customer_contacts
):
'
customer_contacts
'
,
executor
.
submit
(
_populate_circuit_ids_to_monitor
):
'
circuit_ids_to_monitor
'
,
executor
.
submit
(
_populate_additional_circuit_customer_ids
):
'
additional_circuit_customer_ids
'
}
for
future
in
concurrent
.
futures
.
as_completed
(
futures
):
if
future
.
exception
():
exceptions
[
futures
[
future
]]
=
str
(
future
.
exception
())
if
exceptions
:
raise
InventoryTaskError
(
json
.
dumps
(
exceptions
,
indent
=
2
))
def
_populate_hierarchy
():
nonlocal
hierarchy
hierarchy
=
{
d
[
'
id
'
]:
d
for
d
in
ims_data
.
get_circuit_hierarchy
(
ds1
)}
logger
.
debug
(
"
hierarchy complete
"
)
def
_populate_port_id_details
():
nonlocal
port_id_details
for
x
in
ims_data
.
get_port_details
(
ds2
):
pd
=
port_id_details
[
x
[
'
port_id
'
]]
pd
.
append
(
x
)
logger
.
debug
(
"
Port details complete
"
)
def
_populate_circuit_info
():
for
x
in
ims_data
.
get_port_id_services
(
ds3
):
port_id_services
[
x
[
'
port_a_id
'
]].
append
(
x
)
logger
.
debug
(
"
port circuits complete
"
)
with
concurrent
.
futures
.
ThreadPoolExecutor
()
as
executor
:
futures
=
{
executor
.
submit
(
_populate_hierarchy
):
'
hierarchy
'
,
executor
.
submit
(
_populate_port_id_details
):
'
port_id_details
'
,
executor
.
submit
(
_populate_circuit_info
):
'
circuit_info
'
}
for
future
in
concurrent
.
futures
.
as_completed
(
futures
):
if
future
.
exception
():
exceptions
[
futures
[
future
]]
=
str
(
future
.
exception
())
if
exceptions
:
raise
InventoryTaskError
(
json
.
dumps
(
exceptions
,
indent
=
2
))
return
{
'
locations
'
:
locations
,
'
lg_routers
'
:
lg_routers
,
'
customer_contacts
'
:
customer_contacts
,
'
circuit_ids_to_monitor
'
:
circuit_ids_to_monitor
,
'
additional_circuit_customer_ids
'
:
additional_circuit_customer_ids
,
'
hierarchy
'
:
hierarchy
,
'
port_id_details
'
:
port_id_details
,
'
port_id_services
'
:
port_id_services
}
# new
def
transform_ims_data
(
data
):
locations
=
data
[
'
locations
'
]
customer_contacts
=
data
[
'
customer_contacts
'
]
circuit_ids_to_monitor
=
data
[
'
circuit_ids_to_monitor
'
]
additional_circuit_customer_ids
=
data
[
'
additional_circuit_customer_ids
'
]
hierarchy
=
data
[
'
hierarchy
'
]
port_id_details
=
data
[
'
port_id_details
'
]
port_id_services
=
data
[
'
port_id_services
'
]
def
_get_circuit_contacts
(
c
):
customer_ids
=
{
c
[
'
customerid
'
]}
customer_ids
.
update
(
additional_circuit_customer_ids
.
get
(
c
[
'
id
'
],
[]))
return
set
().
union
(
*
[
customer_contacts
.
get
(
cid
,
[])
for
cid
in
customer_ids
])
for
d
in
hierarchy
.
values
():
d
[
'
contacts
'
]
=
sorted
(
list
(
_get_circuit_contacts
(
d
)))
def
_convert_to_bits
(
value
,
unit
):
unit
=
unit
.
lower
()
conversions
=
{
'
m
'
:
1
<<
20
,
'
mb
'
:
1
<<
20
,
'
g
'
:
1
<<
30
,
'
gbe
'
:
1
<<
30
,
}
return
int
(
value
)
*
conversions
[
unit
]
def
_get_speed
(
circuit_id
):
c
=
hierarchy
[
circuit_id
]
if
c
[
'
status
'
]
!=
'
operational
'
:
return
0
pattern
=
re
.
compile
(
r
'
^(\d+)([a-zA-z]+)$
'
)
m
=
pattern
.
match
(
c
[
'
speed
'
])
if
m
:
try
:
return
_convert_to_bits
(
m
[
1
],
m
[
2
])
except
KeyError
as
e
:
logger
.
debug
(
f
'
Could not find key:
{
e
}
'
f
'
for circuit:
{
circuit_id
}
'
)
return
0
else
:
if
c
[
'
circuit-type
'
]
==
'
service
'
\
or
c
[
'
product
'
].
lower
()
==
'
ethernet
'
:
return
sum
(
(
_get_speed
(
x
)
for
x
in
c
[
'
carrier-circuits
'
])
)
else
:
return
0
def
_get_fibre_routes
(
c_id
):
_circ
=
hierarchy
.
get
(
c_id
,
None
)
if
_circ
is
None
:
return
if
_circ
[
'
speed
'
].
lower
()
==
'
fibre_route
'
:
yield
_circ
[
'
id
'
]
else
:
for
cc
in
_circ
[
'
carrier-circuits
'
]:
yield
from
_get_fibre_routes
(
cc
)
def
_get_related_services
(
circuit_id
:
str
)
->
List
[
dict
]:
rs
=
{}
c
=
hierarchy
.
get
(
circuit_id
,
None
)
if
c
:
if
c
[
'
circuit-type
'
]
==
'
service
'
:
rs
[
c
[
'
id
'
]]
=
{
'
id
'
:
c
[
'
id
'
],
'
name
'
:
c
[
'
name
'
],
'
circuit_type
'
:
c
[
'
circuit-type
'
],
'
service_type
'
:
c
[
'
product
'
],
'
project
'
:
c
[
'
project
'
],
'
contacts
'
:
c
[
'
contacts
'
]
}
if
c
[
'
id
'
]
in
circuit_ids_to_monitor
:
rs
[
c
[
'
id
'
]][
'
status
'
]
=
c
[
'
status
'
]
else
:
rs
[
c
[
'
id
'
]][
'
status
'
]
=
'
non-monitored
'
if
c
[
'
sub-circuits
'
]:
for
sub
in
c
[
'
sub-circuits
'
]:
temp_parents
=
\
_get_related_services
(
sub
)
rs
.
update
({
t
[
'
id
'
]:
t
for
t
in
temp_parents
})
return
list
(
rs
.
values
())
def
_format_service
(
s
):
if
s
[
'
circuit_type
'
]
==
'
service
'
\
and
s
[
'
id
'
]
not
in
circuit_ids_to_monitor
:
s
[
'
status
'
]
=
'
non-monitored
'
pd_a
=
port_id_details
[
s
[
'
port_a_id
'
]][
0
]
location_a
=
locations
.
get
(
pd_a
[
'
equipment_name
'
],
None
)
if
location_a
:
loc_a
=
location_a
[
'
pop
'
]
else
:
loc_a
=
locations
[
'
UNKNOWN_LOC
'
][
'
pop
'
]
logger
.
warning
(
f
'
Unable to find location for
{
pd_a
[
"
equipment_name
"
]
}
-
'
f
'
Service ID
{
s
[
"
id
"
]
}
'
)
s
[
'
pop_name
'
]
=
loc_a
[
'
name
'
]
s
[
'
pop_abbreviation
'
]
=
loc_a
[
'
abbreviation
'
]
s
[
'
equipment
'
]
=
pd_a
[
'
equipment_name
'
]
s
[
'
card_id
'
]
=
''
# this is redundant I believe
s
[
'
port
'
]
=
pd_a
[
'
interface_name
'
]
s
[
'
logical_unit
'
]
=
''
# this is redundant I believe
if
'
port_b_id
'
in
s
:
pd_b
=
port_id_details
[
s
[
'
port_b_id
'
]][
0
]
location_b
=
locations
.
get
(
pd_b
[
'
equipment_name
'
],
None
)
if
location_b
:
loc_b
=
location_b
[
'
pop
'
]
else
:
loc_b
=
locations
[
'
UNKNOWN_LOC
'
][
'
pop
'
]
logger
.
warning
(
f
'
Unable to find location for
{
pd_b
[
"
equipment_name
"
]
}
-
'
f
'
Service ID
{
s
[
"
id
"
]
}
'
)
s
[
'
other_end_pop_name
'
]
=
loc_b
[
'
name
'
]
s
[
'
other_end_pop_abbreviation
'
]
=
loc_b
[
'
abbreviation
'
]
s
[
'
other_end_equipment
'
]
=
pd_b
[
'
equipment_name
'
]
s
[
'
other_end_port
'
]
=
pd_b
[
'
interface_name
'
]
else
:
s
[
'
other_end_pop_name
'
]
=
''
s
[
'
other_end_pop_abbreviation
'
]
=
''
s
[
'
other_end_equipment
'
]
=
''
s
[
'
other_end_port
'
]
=
''
s
.
pop
(
'
port_a_id
'
,
None
)
s
.
pop
(
'
port_b_id
'
,
None
)
services_by_type
=
{}
interface_services
=
defaultdict
(
list
)
for
key
,
value
in
port_id_details
.
items
():
for
details
in
value
:
k
=
f
"
{
details
[
'
equipment_name
'
]
}
:
"
\
f
"
{
details
[
'
interface_name
'
]
}
"
circuits
=
port_id_services
.
get
(
details
[
'
port_id
'
],
[])
for
circ
in
circuits
:
contacts
=
_get_circuit_contacts
(
circ
)
circ
[
'
fibre-routes
'
]
=
[]
for
x
in
set
(
_get_fibre_routes
(
circ
[
'
id
'
])):
c
=
{
'
id
'
:
hierarchy
[
x
][
'
id
'
],
'
name
'
:
hierarchy
[
x
][
'
name
'
],
'
status
'
:
hierarchy
[
x
][
'
status
'
]
}
circ
[
'
fibre-routes
'
].
append
(
c
)
circ
[
'
related-services
'
]
=
\
_get_related_services
(
circ
[
'
id
'
])
for
tlc
in
circ
[
'
related-services
'
]:
contacts
.
update
(
tlc
.
pop
(
'
contacts
'
))
circ
[
'
contacts
'
]
=
sorted
(
list
(
contacts
))
circ
[
'
calculated-speed
'
]
=
_get_speed
(
circ
[
'
id
'
])
_format_service
(
circ
)
type_services
=
services_by_type
.
setdefault
(
ims_sorted_service_type_key
(
circ
[
'
service_type
'
]),
dict
())
type_services
[
circ
[
'
id
'
]]
=
circ
interface_services
[
k
].
extend
(
circuits
)
return
{
'
hierarchy
'
:
hierarchy
,
'
interface_services
'
:
interface_services
,
'
services_by_type
'
:
services_by_type
}
# new
def
persist_ims_data
(
data
,
use_current
=
False
):
hierarchy
=
data
[
'
hierarchy
'
]
locations
=
data
[
'
locations
'
]
lg_routers
=
data
[
'
lg_routers
'
]
interface_services
=
data
[
'
interface_services
'
]
services_by_type
=
data
[
'
services_by_type
'
]
if
use_current
:
r
=
get_current_redis
(
InventoryTask
.
config
)
# only need to delete the individual keys if it's just an IMS update
# rather than a complete update (the db will have been flushed)
for
key_pattern
in
[
'
ims:location:*
'
,
'
ims:lg:*
'
,
'
ims:circuit_hierarchy:*
'
,
'
ims:interface_services:*
'
,
'
ims:access_services:*
'
,
'
ims:gws_indirect:*
'
]:
rp
=
r
.
pipeline
()
for
k
in
r
.
scan_iter
(
key_pattern
,
count
=
1000
):
rp
.
delete
(
k
)
else
:
r
=
get_next_redis
(
InventoryTask
.
config
)
rp
=
r
.
pipeline
()
for
h
,
d
in
locations
.
items
():
rp
.
set
(
f
'
ims:location:
{
h
}
'
,
json
.
dumps
([
d
]))
rp
.
execute
()
rp
=
r
.
pipeline
()
for
router
in
lg_routers
:
rp
.
set
(
f
'
ims:lg:
{
router
[
"
equipment name
"
]
}
'
,
json
.
dumps
([
router
]))
rp
.
execute
()
rp
=
r
.
pipeline
()
for
circ
in
hierarchy
.
values
():
rp
.
set
(
f
'
ims:circuit_hierarchy:
{
circ
[
"
id
"
]
}
'
,
json
.
dumps
([
circ
]))
rp
.
execute
()
rp
=
r
.
pipeline
()
for
k
,
v
in
interface_services
.
items
():
rp
.
set
(
f
'
ims:interface_services:
{
k
}
'
,
json
.
dumps
(
v
))
rp
.
execute
()
rp
=
r
.
pipeline
()
populate_poller_cache
(
interface_services
,
r
)
for
service_type
,
services
in
services_by_type
.
items
():
for
v
in
services
.
values
():
rp
.
set
(
f
'
ims:services:
{
service_type
}
:
{
v
[
"
name
"
]
}
'
,
json
.
dumps
({
'
id
'
:
v
[
'
id
'
],
'
name
'
:
v
[
'
name
'
],
'
project
'
:
v
[
'
project
'
],
'
here
'
:
{
'
pop
'
:
{
'
name
'
:
v
[
'
pop_name
'
],
'
abbreviation
'
:
v
[
'
pop_abbreviation
'
]
},
'
equipment
'
:
v
[
'
equipment
'
],
'
port
'
:
v
[
'
port
'
],
},
'
there
'
:
{
'
pop
'
:
{
'
name
'
:
v
[
'
other_end_pop_name
'
],
'
abbreviation
'
:
v
[
'
other_end_pop_abbreviation
'
]
},
'
equipment
'
:
v
[
'
other_end_equipment
'
],
'
port
'
:
v
[
'
other_end_port
'
],
},
'
speed_value
'
:
v
[
'
calculated-speed
'
],
'
speed_unit
'
:
'
n/a
'
,
'
status
'
:
v
[
'
status
'
],
'
type
'
:
v
[
'
service_type
'
]
}))
rp
.
execute
()
# new
@app.task
(
base
=
InventoryTask
,
bind
=
True
,
name
=
'
final_task
'
)
@log_task_entry_and_exit
def
final_task
(
self
):
_build_subnet_db
(
update_callback
=
self
.
log_info
)
_build_snmp_peering_db
(
update_callback
=
self
.
log_info
)
_build_juniper_peering_db
(
update_callback
=
self
.
log_info
)
latch_db
(
InventoryTask
.
config
)
self
.
log_info
(
'
latched current/next dbs
'
)
This diff is collapsed.
Click to expand it.
test/test_worker.py
+
301
−
2
View file @
4928c78a
from
inventory_provider.tasks
import
common
from
inventory_provider.tasks.worker
import
transform_ims_data
,
\
extract_ims_data
,
persist_ims_data
def
test_place_holder
():
def
test_extract_ims_data
(
mocker
):
assert
True
mocker
.
patch
(
'
inventory_provider.tasks.worker.InventoryTask.config
'
)
mocker
.
patch
(
'
inventory_provider.tasks.worker.ims_data.get_node_locations
'
,
return_value
=
[(
'
loc_a
'
,
'
LOC A
'
),
(
'
loc_b
'
,
'
LOC B
'
)]
)
mocker
.
patch
(
'
inventory_provider.tasks.worker.ims_data.lookup_lg_routers
'
,
return_value
=
[
'
lg router 1
'
,
'
lg router 2
'
]
)
mocker
.
patch
(
'
inventory_provider.tasks.worker.ims_data.get_customer_service_emails
'
,
return_value
=
[(
'
123
'
,
'
CON A
'
),
(
'
456
'
,
'
CON B
'
)]
)
mocker
.
patch
(
'
inventory_provider.tasks.worker.ims_data.get_monitored_circuit_ids
'
,
return_value
=
[
123
,
456
,
789
]
)
mocker
.
patch
(
'
inventory_provider.tasks.worker.ims_data.
'
'
get_circuit_related_customer_ids
'
,
return_value
=
[{
'
id a
'
:
[
'
A
'
,
'
A2
'
]},
{
'
id b
'
:
[
'
B
'
]}]
)
mocker
.
patch
(
'
inventory_provider.tasks.worker.ims_data.get_circuit_hierarchy
'
,
return_value
=
[
{
'
id
'
:
'
1
'
,
'
value
'
:
'
A
'
},
{
'
id
'
:
'
2
'
,
'
value
'
:
'
B
'
}
]
)
mocker
.
patch
(
'
inventory_provider.tasks.worker.ims_data.get_port_details
'
,
return_value
=
[
{
'
port_id
'
:
'
A
'
,
'
value
'
:
'
a
'
},
{
'
port_id
'
:
'
B
'
,
'
value
'
:
'
b
'
},
{
'
port_id
'
:
'
B
'
,
'
value
'
:
'
c
'
}
]
)
mocker
.
patch
(
'
inventory_provider.tasks.worker.ims_data.get_port_id_services
'
,
return_value
=
[
{
'
port_a_id
'
:
'
1
'
,
'
value
'
:
'
1A
'
},
{
'
port_a_id
'
:
'
1
'
,
'
value
'
:
'
1B
'
},
{
'
port_a_id
'
:
'
2
'
,
'
value
'
:
'
2A
'
}
]
)
res
=
extract_ims_data
()
assert
res
[
'
locations
'
]
==
{
'
loc_a
'
:
'
LOC A
'
,
'
loc_b
'
:
'
LOC B
'
}
assert
res
[
'
lg_routers
'
]
==
[
'
lg router 1
'
,
'
lg router 2
'
]
assert
res
[
'
customer_contacts
'
]
==
{
'
123
'
:
'
CON A
'
,
'
456
'
:
'
CON B
'
}
assert
res
[
'
circuit_ids_to_monitor
'
]
==
[
123
,
456
,
789
]
assert
res
[
'
additional_circuit_customer_ids
'
]
==
\
[{
'
id a
'
:
[
'
A
'
,
'
A2
'
]},
{
'
id b
'
:
[
'
B
'
]}]
assert
res
[
'
hierarchy
'
]
==
{
'
1
'
:
{
'
id
'
:
'
1
'
,
'
value
'
:
'
A
'
},
'
2
'
:
{
'
id
'
:
'
2
'
,
'
value
'
:
'
B
'
}
}
assert
res
[
'
port_id_details
'
]
==
{
'
A
'
:
[{
'
port_id
'
:
'
A
'
,
'
value
'
:
'
a
'
}],
'
B
'
:
[
{
'
port_id
'
:
'
B
'
,
'
value
'
:
'
b
'
},
{
'
port_id
'
:
'
B
'
,
'
value
'
:
'
c
'
}
]
}
assert
res
[
'
port_id_services
'
]
==
{
'
1
'
:
[
{
'
port_a_id
'
:
'
1
'
,
'
value
'
:
'
1A
'
},
{
'
port_a_id
'
:
'
1
'
,
'
value
'
:
'
1B
'
}
],
'
2
'
:
[{
'
port_a_id
'
:
'
2
'
,
'
value
'
:
'
2A
'
}]
}
def
test_transform_ims_data
():
locations
=
{
"
eq_a
"
:
{
"
pop
"
:
{
"
name
"
:
"
pop_loc_a
"
,
"
abbreviation
"
:
"
pla
"
,
}
},
"
eq_b
"
:
{
"
pop
"
:
{
"
name
"
:
"
pop_loc_b
"
,
"
abbreviation
"
:
"
plb
"
,
}
},
"
UNKNOWN_LOC
"
:
{
"
pop
"
:
{
"
name
"
:
"
UNKNOWN
"
,
"
abbreviation
"
:
"
UNKNOWN
"
,
}
}
}
additional_circuit_customer_ids
=
{
"
circ_id_1
"
:
"
cu_1_1
"
}
customer_contacts
=
{
"
cu_1
"
:
[
"
customer_1@a.org
"
],
"
cu_1_1
"
:
[
"
customer_1_1@a.org
"
]
}
port_id_details
=
{
"
port_id_1
"
:
[{
"
equipment_name
"
:
"
eq_a
"
,
"
interface_name
"
:
"
if_a
"
,
"
port_id
"
:
"
port_id_1
"
}],
"
port_id_2
"
:
[{
"
equipment_name
"
:
"
eq_b
"
,
"
interface_name
"
:
"
if_b
"
,
"
port_id
"
:
"
port_id_2
"
}]
}
port_id_services
=
{
"
port_id_1
"
:
[
{
"
id
"
:
"
circ_id_1
"
,
"
customerid
"
:
"
cu_1
"
,
"
circuit_type
"
:
"
circuit
"
,
"
service_type
"
:
"
ETHERNET
"
,
"
status
"
:
"
operational
"
,
"
port_a_id
"
:
"
port_id_1
"
,
"
port_b_id
"
:
"
port_id_2
"
,
}
],
"
port_id_2
"
:
[
{
"
id
"
:
"
circ_id_1
"
,
"
customerid
"
:
"
cu_1
"
,
"
circuit_type
"
:
"
circuit
"
,
"
service_type
"
:
"
ETHERNET
"
,
"
status
"
:
"
operational
"
,
"
port_a_id
"
:
"
port_id_2
"
,
"
port_b_id
"
:
"
port_id_1
"
,
}
]
}
hierarchy
=
{
"
circ_id_1
"
:
{
"
id
"
:
"
circ_id_1
"
,
"
name
"
:
"
circ_name_1
"
,
"
status
"
:
"
operational
"
,
"
circuit-type
"
:
"
circuit
"
,
"
product
"
:
"
ethernet
"
,
"
speed
"
:
"
not fibre_route
"
,
"
carrier-circuits
"
:
[
"
carrier_id_1
"
],
"
sub-circuits
"
:
[
"
sub_circuit_1
"
],
"
customerid
"
:
"
cu_1
"
,
},
"
carrier_id_1
"
:
{
"
id
"
:
"
carrier_id_1
"
,
"
name
"
:
"
circ_carrier_name_1
"
,
"
status
"
:
"
operational
"
,
"
circuit-type
"
:
"
circuit
"
,
"
product
"
:
"
ethernet
"
,
"
speed
"
:
"
10G
"
,
"
carrier-circuits
"
:
[
"
carrier_id_2
"
],
"
sub-circuits
"
:
[
"
circ_id_1
"
],
"
customerid
"
:
"
cu_1
"
,
},
"
carrier_id_2
"
:
{
"
id
"
:
"
carrier_id_2
"
,
"
name
"
:
"
circ_carrier_name_3
"
,
"
status
"
:
"
operational
"
,
"
circuit-type
"
:
"
circuit
"
,
"
product
"
:
"
ethernet
"
,
"
speed
"
:
"
not fibre_route
"
,
"
carrier-circuits
"
:
[
"
carrier_id_3
"
],
"
sub-circuits
"
:
[
"
carrier_id_1
"
],
"
customerid
"
:
"
cu_1
"
,
},
"
carrier_id_3
"
:
{
"
id
"
:
"
carrier_id_3
"
,
"
name
"
:
"
Fiber Route Circuit
"
,
"
status
"
:
"
operational
"
,
"
circuit-type
"
:
"
circuit
"
,
"
product
"
:
"
OCG4
"
,
"
speed
"
:
"
fibre_route
"
,
"
carrier-circuits
"
:
[],
"
sub-circuits
"
:
[
"
carrier_id_2
"
],
"
customerid
"
:
"
cu_1
"
,
},
"
sub_circuit_1
"
:
{
"
id
"
:
"
sub_circuit_1
"
,
"
name
"
:
"
sub_circuit_name_1
"
,
"
status
"
:
"
operational
"
,
"
circuit-type
"
:
"
circuit
"
,
"
product
"
:
"
ethernet
"
,
"
speed
"
:
"
not fibre_route
"
,
"
carrier-circuits
"
:
[
"
circ_id_1
"
],
"
sub-circuits
"
:
[
"
sub_circuit_2
"
],
"
customerid
"
:
"
cu_1
"
,
},
"
sub_circuit_2
"
:
{
"
id
"
:
"
sub_circuit_2
"
,
"
name
"
:
"
sub_circuit_name_2
"
,
"
status
"
:
"
operational
"
,
"
circuit-type
"
:
"
service
"
,
"
product
"
:
"
PEERING R & E
"
,
"
speed
"
:
"
not fiber route
"
,
"
project
"
:
"
Project A
"
,
"
carrier-circuits
"
:
[
"
sub_circuit_1
"
],
"
sub-circuits
"
:
[],
"
customerid
"
:
"
cu_1
"
,
}
}
data
=
{
"
locations
"
:
locations
,
"
customer_contacts
"
:
customer_contacts
,
"
circuit_ids_to_monitor
"
:
[],
"
additional_circuit_customer_ids
"
:
additional_circuit_customer_ids
,
"
hierarchy
"
:
hierarchy
,
"
port_id_details
"
:
port_id_details
,
"
port_id_services
"
:
port_id_services
}
res
=
transform_ims_data
(
data
)
ifs
=
res
[
"
interface_services
"
]
assert
list
(
ifs
.
keys
())
==
[
"
eq_a:if_a
"
,
"
eq_b:if_b
"
]
for
v
in
ifs
.
values
():
assert
len
(
v
)
==
1
assert
len
(
v
[
0
][
"
related-services
"
])
==
1
assert
v
[
0
][
"
related-services
"
][
0
][
"
id
"
]
==
"
sub_circuit_2
"
assert
len
(
v
[
0
][
"
fibre-routes
"
])
==
1
assert
v
[
0
][
"
fibre-routes
"
][
0
][
"
id
"
]
==
"
carrier_id_3
"
def
test_persist_ims_data
(
mocker
,
data_config
,
mocked_redis
):
r
=
common
.
_get_redis
(
data_config
)
mocker
.
patch
(
'
inventory_provider.tasks.worker.get_next_redis
'
,
return_value
=
r
)
data
=
{
"
locations
"
:
{
"
loc_a
"
:
"
LOC A
"
,
"
loc_b
"
:
"
LOC B
"
},
"
lg_routers
"
:
[
{
"
equipment name
"
:
"
lg_eq1
"
},
{
"
equipment name
"
:
"
lg_eq2
"
}
],
"
hierarchy
"
:
{
"
c1
"
:
{
"
id
"
:
"
123
"
},
"
c2
"
:
{
"
id
"
:
"
456
"
}},
"
interface_services
"
:
{
"
if1
"
:
[
{
"
equipment
"
:
"
eq1
"
,
"
port
"
:
"
port1
"
,
"
id
"
:
"
id1
"
,
"
name
"
:
"
name1
"
,
"
service_type
"
:
"
type1
"
,
"
status
"
:
"
operational
"
},
{
"
equipment
"
:
"
eq1
"
,
"
port
"
:
"
port2
"
,
"
id
"
:
"
id3
"
,
"
name
"
:
"
name2
"
,
"
service_type
"
:
"
type2
"
,
"
status
"
:
"
operational
"
}
],
"
if2
"
:
[
{
"
equipment
"
:
"
eq2
"
,
"
port
"
:
"
port1
"
,
"
id
"
:
"
id3
"
,
"
name
"
:
"
name3
"
,
"
service_type
"
:
"
type1
"
,
"
status
"
:
"
operational
"
}
]
},
"
services_by_type
"
:
{},
}
for
k
in
r
.
keys
(
"
ims:*
"
):
r
.
delete
(
k
)
persist_ims_data
(
data
)
assert
[
k
.
decode
(
"
utf-8
"
)
for
k
in
r
.
keys
(
"
ims:location:*
"
)]
==
\
[
"
ims:location:loc_a
"
,
"
ims:location:loc_b
"
]
assert
[
k
.
decode
(
"
utf-8
"
)
for
k
in
r
.
keys
(
"
ims:lg:*
"
)]
==
\
[
"
ims:lg:lg_eq1
"
,
"
ims:lg:lg_eq2
"
]
assert
[
k
.
decode
(
"
utf-8
"
)
for
k
in
r
.
keys
(
"
ims:circuit_hierarchy:*
"
)]
==
\
[
"
ims:circuit_hierarchy:123
"
,
"
ims:circuit_hierarchy:456
"
]
assert
[
k
.
decode
(
"
utf-8
"
)
for
k
in
r
.
keys
(
"
ims:interface_services:*
"
)]
==
\
[
"
ims:interface_services:if1
"
,
"
ims:interface_services:if2
"
]
assert
[
k
.
decode
(
"
utf-8
"
)
for
k
in
r
.
keys
(
"
poller_cache:*
"
)]
==
\
[
"
poller_cache:eq1
"
,
"
poller_cache:eq2
"
]
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment