# ims_data.py (9.14 KiB)
# Authored by Robert Latta
import logging
import re
from collections import OrderedDict
from inventory_provider import environment
from inventory_provider.db import ims
from inventory_provider.db.ims import InventoryStatus
# Logging is configured as a side effect of importing this module; what
# exactly gets configured is decided by inventory_provider.environment.
environment.setup_logging()
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Dashboard V3
def lookup_pop_info(ds, hostname):
    """
    Build the equipment / PoP summary for a single node.

    :param ds: data source exposing ``get_entity_by_name`` and
        ``get_entity_by_id`` (presumably an IMS client - confirm with caller)
    :param hostname: name of the 'Node' entity to look up
    :return: dict with the keys 'equipment-name', 'status' and 'pop',
        or None when the node lookup returns nothing
    """
    site_nav_props = [
        ims.SITE_PROPERTIES[prop_name]
        for prop_name in ('City', 'SiteAliases', 'Country')
    ]

    node = ds.get_entity_by_name('Node', hostname)
    if not node:
        return None

    # NOTE(review): the meaning of the trailing True flag is defined by the
    # data source and is not visible from this module.
    site = ds.get_entity_by_id('Site', node['SiteId'], site_nav_props, True)
    city = site['City']

    try:
        abbreviation = site['SiteAliases'][0]['AliasName']
    except IndexError:
        # the site has no alias - fall back to an empty abbreviation
        abbreviation = ''

    equipment_name = node['Name']
    status = InventoryStatus(node['InventoryStatusId']).name
    pop = {
        'name': site['Name'],
        'city': city['Name'],
        'country': city['Country']['Name'],
        'abbreviation': abbreviation,
        'longitude': site['Longitude'],
        'latitude': site['Latitude'],
    }
    return {
        'equipment-name': equipment_name,
        'status': status,
        'pop': pop,
    }
# End of Dashboard V3 stuff
# Site names that lookup_lg_routers reports with type 'INTERNAL' instead of
# 'CORE'. Names containing "GÉANT" are listed both with and without the
# accent; matching is an exact, case-sensitive set lookup.
INTERNAL_POP_NAMES = {
    'Cambridge OC',
    'DANTE Lab',
    'GÉANT LAB',
    'GEANT LAB',
    'Amsterdam GEANT Office',
    'Amsterdam GÉANT Office'
}
def lookup_lg_routers(ds):
    """
    Generate one description per router taken from the nodes of the
    equipment definitions returned for the filter 'Name like mx'.

    A node is skipped unless its inventory status is IN_SERVICE or PLANNED,
    and it is also skipped when its name *starts with* 'vpn-proxy', 'vrr'
    or 'taas' (case-insensitive; ``re.match`` anchors at the beginning).

    :param ds: data source exposing ``get_filtered_entities`` and
        ``get_entity_by_id`` (presumably an IMS client - confirm with caller)
    :return: yields dicts with the keys 'equipment name', 'type' and 'pop'
    """
    excluded_prefix = re.compile("vpn-proxy|vrr|taas", re.IGNORECASE)

    def _is_wanted(candidate):
        status = InventoryStatus(candidate['inventorystatusid'])
        accepted = (
            InventoryStatus.IN_SERVICE,
            InventoryStatus.PLANNED,  # remove once data fully migrated
        )
        if status not in accepted:
            return False
        return not excluded_prefix.match(candidate['name'])

    site_nav_props = [
        ims.SITE_PROPERTIES[prop_name]
        for prop_name in ('SiteAliases', 'City', 'Country')
    ]

    eq_definitions = ds.get_filtered_entities(
        'EquipmentDefinition',
        'Name like mx',
        ims.EQUIP_DEF_PROPERTIES['Nodes'])

    # lazily flatten definitions -> nodes, keeping only the wanted nodes
    wanted_nodes = (
        node
        for eq_def in eq_definitions
        for node in eq_def['nodes']
        if _is_wanted(node)
    )

    for node in wanted_nodes:
        site = ds.get_entity_by_id(
            'Site', node['siteid'], site_nav_props, True)
        city = site['city']

        try:
            abbreviation = site['sitealiases'][0]['aliasname']
        except IndexError:
            # the site has no alias - fall back to an empty abbreviation
            abbreviation = ''

        equipment_name = node['name']
        if site['name'] in INTERNAL_POP_NAMES:
            router_type = 'INTERNAL'
        else:
            router_type = 'CORE'

        pop = {
            'name': site['name'],
            'city': city['name'],
            'country': city['country']['name'],
            'country code': city['country']['abbreviation'],
            'abbreviation': abbreviation,
            'longitude': site['longitude'],
            'latitude': site['latitude'],
        }
        yield {
            'equipment name': equipment_name,
            'type': router_type,
            'pop': pop,
        }
def otrs_get_customer_company_rows(ds):
    """
    Generate the rows of the OTRS customer-company export.

    The first row is the column header. It is followed by one row per
    'Customer' entity and then one row per 'Vendor' entity whose name has
    not been emitted yet. Only the id (name without spaces) and the name
    columns are filled in; the remaining columns are empty strings.

    :param ds: data source exposing ``get_all_entities``
    :return: yields lists of 8 strings
    """
    columns = ['customer_id', 'name', 'street', 'zip', 'city', 'country',
               'url', 'comments']
    yield columns

    def _company_row(company_name):
        padding = [''] * (len(columns) - 2)
        return [company_name.replace(' ', ''), company_name] + padding

    seen_names = set()

    # customers are always emitted, even when a name repeats
    for customer in ds.get_all_entities('Customer'):
        customer_name = customer['name']
        seen_names.add(customer_name)
        yield _company_row(customer_name)

    # vendors are emitted only for names not seen so far
    for vendor in ds.get_all_entities('Vendor'):
        vendor_name = vendor['name']
        if vendor_name in seen_names:
            continue
        seen_names.add(vendor_name)
        yield _company_row(vendor_name)
# TODO - check the rules for validation once model has been confirmed
def _is_valid_customer(cus):
if not cus['email']:
return False
return True
def otrs_get_customer_contacts(ds):
    """
    Generate OTRS customer-user records for the contacts related to
    'Customer' entities.

    Each record is an OrderedDict whose key order matches the column order
    of the customer-user export. Records that fail ``_is_valid_customer``
    (i.e. have no email) are skipped.

    :param ds: data source exposing ``get_all_entities``
    :return: yields OrderedDict instances
    """
    def _secondary_customer_id(customer_type):
        is_eu_nren = \
            customer_type['id'] == 3 or customer_type['name'] == 'EU NREN'
        return 'OTRS-GEANT-NREN' if is_eu_nren else ''

    nav_props = [
        ims.CUSTOMER_PROPERTIES['CustomerRelatedContacts'],
        ims.CUSTOMER_PROPERTIES['CustomerType']
    ]

    for customer in ds.get_all_entities('Customer', nav_props):
        # customers without related contacts contribute nothing
        for related in customer['customerrelatedcontacts'] or []:
            person = related['contact']
            mail = person['mail']
            record = OrderedDict([
                ('email', mail),
                ('username', mail),  # TODO if tal_id is going to be present use that # noqa
                ('customer_id', customer['name'].replace(' ', '')),
                ('customer_id_2',
                 _secondary_customer_id(customer['customertype'])),
                ('title', ''),
                ('firstname', person['name']),
                ('lastname', ' '.join(
                    part
                    for part in (person['infix'], person['lastname'])
                    if part)),
            ])
            # columns this export never populates
            for blank_column in ('phone', 'fax', 'mobile', 'street', 'zip',
                                 'city', 'country', 'comments'):
                record[blank_column] = ''

            if _is_valid_customer(record):
                yield record
def otrs_get_vendor_contacts(ds):
    """
    Generate OTRS customer-user records from 'VendorRelatedContact'
    entities.

    Each record is an OrderedDict whose key order matches the column order
    of the customer-user export; 'customer_id_2' is always empty for
    vendors. Records that fail ``_is_valid_customer`` (i.e. have no email)
    are skipped.

    :param ds: data source exposing ``get_all_entities``
    :return: yields OrderedDict instances
    """
    nav_props = [
        ims.VENDOR_RELATED_CONTACT_PROPERTIES['Vendor'],
        ims.VENDOR_RELATED_CONTACT_PROPERTIES['Contact']
    ]

    for vrc in ds.get_all_entities('VendorRelatedContact', nav_props):
        person = vrc['contact']
        mail = person['mail']
        record = OrderedDict([
            ('email', mail),
            ('username', mail),  # TODO if tal_id is going to be present use that # noqa
            ('customer_id', vrc['vendor']['name'].replace(' ', '')),
            ('customer_id_2', ''),
            ('title', ''),
            ('firstname', person['name']),
            ('lastname', ' '.join(
                part
                for part in (person['infix'], person['lastname'])
                if part)),
        ])
        # columns this export never populates
        for blank_column in ('phone', 'fax', 'mobile', 'street', 'zip',
                             'city', 'country', 'comments'):
            record[blank_column] = ''

        if _is_valid_customer(record):
            yield record
def otrs_get_customer_users_rows(ds, return_duplicates=False):
    """
    Generate the rows of the OTRS customer-user export.

    The first row is the column header. It is followed by one row per
    unique email, ordered by email. Customer contacts and vendor contacts
    are combined; a vendor contact is dropped when its email already
    belongs to a customer contact. When several records still share an
    email they are "weeded" by customer-id rank; emails that cannot be
    reduced to a single record are left out of the main rows and are only
    appended (after the main rows) when ``return_duplicates`` is true.

    The unresolved duplicates are emitted in a deterministic order:
    by email, then by customer id within each email.

    :param ds: data source passed on to ``otrs_get_customer_contacts`` and
        ``otrs_get_vendor_contacts``
    :param return_duplicates: also yield the rows of unresolved duplicates
    :return: yields lists of column values
    """
    yield ['email', 'username', 'customer_id', 'customer_id_2', 'title',
           'firstname', 'lastname', 'phone', 'fax', 'mobile', 'street', 'zip',
           'city', 'country', 'comments']

    def get_all_cus_user_rows():
        # customer contacts win over vendor contacts with the same email
        yielded_customer_emails = set()
        for c in otrs_get_customer_contacts(ds):
            yielded_customer_emails.add(c['email'])
            yield c
        for c in otrs_get_vendor_contacts(ds):
            if c['email'] not in yielded_customer_emails:
                yield c

    def weed_duplicates(duplicates):
        # Keep only the records with the highest customer-id rank.
        # The rank table is here to allow for additional rules.
        id_rank = {
            'geant': 1
        }
        top_rank = -1
        top_ranked = None
        for d in duplicates:
            rank = id_rank.get(d['customer_id'].lower(), 0)
            if rank > top_rank:
                top_rank = rank
                top_ranked = []
            if rank == top_rank:
                top_ranked.append(d)
        return top_ranked

    # group every record by email, noting the emails seen more than once
    cus_by_email = {}
    duplicate_emails = set()
    for cu in get_all_cus_user_rows():
        email = cu['email']
        cus_for_email = cus_by_email.setdefault(email, [])
        if cus_for_email:
            duplicate_emails.add(email)
        cus_for_email.append(cu)

    remaining_duplicates = []
    if duplicate_emails:
        logger.info('Duplicate emails found in OTRS customer-user export: '
                    f'{duplicate_emails} - attempting to weed')
        # Iterate in sorted order: a set of strings has no stable iteration
        # order between runs, which would make the exported duplicate rows
        # (and the debug log below) come out in a different order each time.
        for email in sorted(duplicate_emails):
            weeded = weed_duplicates(cus_by_email.pop(email))
            if len(weeded) == 1:
                cus_by_email[email] = weeded
            else:
                # x[2] is the 'customer_id' column
                remaining_duplicates.extend(
                    sorted(
                        [list(w.values()) for w in weeded],
                        key=lambda x: x[2])
                )

        if remaining_duplicates:
            logger.error('Duplicate emails remain after weeding, '
                         f'{"including" if return_duplicates else "excluding"}'
                         ' duplicates in returned data: ')

            for rd in remaining_duplicates:
                logger.debug(rd)

    # every remaining email maps to a list whose first record is exported
    for email in sorted(cus_by_email):
        yield list(cus_by_email[email][0].values())

    if return_duplicates:
        yield from remaining_duplicates