"""Tests for ``gso.services.ipam`` with ``/wapi`` HTTP traffic mocked.

Every outgoing request is intercepted by ``responses``; the canned replies
below are the only data the code under test ever receives.
"""
import ipaddress
import re

import responses

from gso.services import ipam

# Canned values shared by the mocked replies and the expected results.
_V4_ADDRESS = '10.255.255.20'
_V6_ADDRESS = 'dead:beef::18'
_V4_NETWORK = '10.255.255.20/32'
_V6_NETWORK = 'dead:beef::18/128'
_V4_NETWORK_REF = 'network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default'  # noqa: E501
_V6_NETWORK_REF = 'ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default'  # noqa: E501
_HOST_RECORD_REF = 'record:host/ZG5zLmhvc3QkLm5vbl9ETlNfaG9zdF9yb290LjAuMTY4MzcwNTU4MzY3MC5nc28udGVzdA:test.lo/%20'  # noqa: E501
_A_RECORD_REF = 'record:a/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1NS44:test.lo/default'  # noqa: E501
_AAAA_RECORD_REF = 'record:aaaa/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1NS44:test.lo/default'  # noqa: E501


def _mock_reply(method, url_pattern, payload):
    """Register one canned JSON reply for URLs matching ``url_pattern``."""
    responses.add(method=method, url=re.compile(url_pattern), json=payload)


def _network_object(ref, cidr):
    """Build the ``_ref``/``network`` mapping used in network replies."""
    return {'_ref': ref, 'network': cidr}


def _network_listing(ref, cidr):
    """Build the one-element list returned for mocked network GETs."""
    return [{**_network_object(ref, cidr), 'network_view': 'default'}]


def _expected_host_addresses():
    """Return a fresh ``HostAddresses`` holding the canned v4/v6 pair."""
    return ipam.HostAddresses(
        v4=ipaddress.ip_address(_V4_ADDRESS),
        v6=ipaddress.ip_address(_V6_ADDRESS),
    )


def _expected_service_networks():
    """Return a fresh ``ServiceNetworks`` holding the canned v4/v6 pair."""
    return ipam.ServiceNetworks(
        v4=ipaddress.ip_network(_V4_NETWORK),
        v6=ipaddress.ip_network(_V6_NETWORK),
    )


@responses.activate
def test_new_service_networks(data_config_filename):
    """``new_service_networks`` returns the networks from the mocked POSTs.

    ``data_config_filename`` is a pytest fixture that is not referenced in
    the body; presumably it provides the configuration ``ipam`` reads
    (verify against conftest).
    """
    _mock_reply(
        responses.POST,
        r'.*/wapi.*/network.*',
        _network_object(_V4_NETWORK_REF, _V4_NETWORK),
    )
    _mock_reply(
        responses.POST,
        r'.*/wapi.*/ipv6network.*',
        _network_object(_V6_NETWORK_REF, _V6_NETWORK),
    )

    allocated = ipam.new_service_networks(service_type='TRUNK')

    assert allocated == _expected_service_networks()


@responses.activate
def test_new_service_host(data_config_filename):
    """``new_service_host`` yields the canned addresses for each call form.

    Four call forms are exercised: explicit host addresses, explicit service
    networks, and no address information for the ``'LO'`` and ``'TRUNK'``
    service types. All of them are expected to return the same address pair.

    ``data_config_filename`` is a pytest fixture that is not referenced in
    the body; presumably it provides the configuration ``ipam`` reads
    (verify against conftest).
    """
    # NOTE: mocks are registered in a fixed order; keep it, since several
    # patterns can match the same URL and ``responses`` resolves by order.

    # Record creation replies (bare reference strings).
    _mock_reply(responses.POST, r'.*/wapi.*/record:host$', _HOST_RECORD_REF)
    _mock_reply(responses.POST, r'.*/wapi.*/record:a$', _A_RECORD_REF)
    _mock_reply(responses.POST, r'.*/wapi.*/record:aaaa$', _AAAA_RECORD_REF)

    # Network lookups.
    _mock_reply(
        responses.GET,
        r'.*/wapi.*/network.*',
        _network_listing(_V4_NETWORK_REF, _V4_NETWORK),
    )
    _mock_reply(
        responses.GET,
        r'.*/wapi.*/ipv6network.*',
        _network_listing(_V6_NETWORK_REF, _V6_NETWORK),
    )

    # Next-available-IP function calls.
    _mock_reply(
        responses.POST,
        r'.*/wapi.*/network.*/.*?_function=next_available_ip&num=1$',
        {'ips': [_V4_ADDRESS]},
    )
    _mock_reply(
        responses.POST,
        r'.*/wapi.*/ipv6network.*/.*?_function=next_available_ip&num=1$',
        {'ips': [_V6_ADDRESS]},
    )

    # Network creation replies.
    _mock_reply(
        responses.POST,
        r'.*/wapi.*/network.*_return_fields.*',
        _network_object(_V4_NETWORK_REF, _V4_NETWORK),
    )
    _mock_reply(
        responses.POST,
        r'.*/wapi.*/ipv6network.*_return_fields.*',
        _network_object(_V6_NETWORK_REF, _V6_NETWORK),
    )

    # 1) Caller supplies the host addresses directly.
    from_addresses = ipam.new_service_host(
        hostname='test',
        service_type='TRUNK',
        host_addresses=_expected_host_addresses(),
    )
    assert from_addresses == _expected_host_addresses()

    # 2) Caller supplies the service networks.
    from_networks = ipam.new_service_host(
        hostname='test',
        service_type='TRUNK',
        service_networks=_expected_service_networks(),
    )
    assert from_networks == _expected_host_addresses()

    # 3) No address information, 'LO' service type.
    for_loopback = ipam.new_service_host(hostname='test', service_type='LO')
    assert for_loopback == _expected_host_addresses()

    # 4) No address information, 'TRUNK' service type.
    for_trunk = ipam.new_service_host(hostname='test', service_type='TRUNK')
    assert for_trunk == _expected_host_addresses()