diff --git a/exabgp/run-exabgp-generic b/exabgp/run-exabgp-generic new file mode 100755 index 0000000000000000000000000000000000000000..f3ca23dfb3ea406785b19bf0cdfa6e72e2e99bc7 --- /dev/null +++ b/exabgp/run-exabgp-generic @@ -0,0 +1,106 @@ +#!/bin/bash + +if [ "$1" = "--conf" ]; then + shift 1 + + myrouter_id="$1" + shift 1 + myas="$1" + shift 1 + + shift_count=2 # used as global var + + ## + + [ -n "$myrouter_id" ] || myrouter_id="10.0.0.1" # defaults for own myrouter_id + + [ -n "$myas" ] || myas="9991" # defaults for myas + + ## + + myrouter_ip="$myrouter_id" + + echo "$0: conf: myrouter_id=$myrouter_id myrouter_ip=$myrouter_ip myas=$myas" 1>&2 + + ## + + [ $# -gt 0 ] || set -- "10.0.0.3" "9993" # defaults for neighbor_ip and neighbor_as + + echo "$0: loop args: $*" 1>&2 + + while [ $# -gt 0 ]; do + neighbor_id="$1" + shift 1 + shift_count=$(( $shift_count + 1 )) + + [ "$neighbor_id" != "--" ] || break + + neighbor_as="$1" + shift 1 + shift_count=$(( $shift_count + 1 )) + + neighbor_ip="$neighbor_id" + + #cat >> /etc/exabgp/exabgp.conf <<EOF + cat <<EOF +neighbor $neighbor_ip { + #router-id 10.0.0.1; + router-id $myrouter_id; + #local-address 10.0.0.1; + local-address $myrouter_ip; + #local-as 9991; + local-as $myas; + + #peer-as 9993; + peer-as $neighbor_as; + + #family { + # ipv4 unicast; + # #ipv4 multicast; + # #ipv4 nlri-mpls; + # #ipv4 mpls-vpn; + # #ipv4 flow; + # #ipv4 flow-vpn; + # #ipv6 unicast; + # #ipv6 flow; + # #ipv6 flow-vpn; + #} + + #static { + # route 10.10.0.0/32 next-hop self; + # route 10.100.0.0/32 next-hop self; + #} +} +EOF + + done + +else + + conf_filename="/etc/exabgp/exabgp.conf" + + shift_count=0 # global var changed by ". $0 --conf" + . "$0" --conf "$@" > "$conf_filename" + shift "${shift_count}" + + ## + + mkfifo /run/exabgp.{in,out} + + if id exabgp > /dev/null; then + chmod 0666 /run/exabgp.{in,out} + chown exabgp: /run/exabgp.{in,out} + fi + + ## + + if [ "$1" = "--bg" ]; then + shift 1 + nohup exabgp --debug "$conf_filename" "$@" > exbgp.log 1>&2 & + else + #/fod_vnet_router --mnexec h1 exabgp --debug /etc/exabgp/exabgp.conf + exec exabgp --debug "$conf_filename" "$@" + fi + +fi + diff --git a/flowspec/admin.py b/flowspec/admin.py index c7cd52c553f414a29d308526f442f4448fcb9c68..b1fdcc424b290f5f67a202b31833c505df36de13 100644 --- a/flowspec/admin.py +++ b/flowspec/admin.py @@ -21,7 +21,7 @@ from django.contrib import admin from flowspec.models import MatchPort, MatchDscp, MatchProtocol, FragmentType, ThenAction, Route from flowspec.forms import * from accounts.models import UserProfile -from utils import proxy as PR +from utils.proxy import PR0 as PR from django.contrib.auth.models import User from django.contrib.auth.admin import UserAdmin from peers.models import * diff --git a/flowspec/models.py b/flowspec/models.py index 255d52067a9eb9b589909197cc33ae97e7482e0b..2b0d3b1b0a464d0428c1c79336b7e5de788f5169 100644 --- a/flowspec/models.py +++ b/flowspec/models.py @@ -26,14 +26,14 @@ from django.urls import reverse from flowspec.tasks import * from flowspec.helpers import send_new_mail, get_peer_techc_mails -from utils import proxy as PR +from utils.proxy import PR0 as PR from ipaddress import * from ipaddress import ip_network import datetime import json from peers.models import PeerRange, Peer -from flowspec.junos import create_junos_name +from utils.rule_spec_utils import create_junos_name #import flowspec.iprange_match from flowspec.iprange_match import find_matching_peer_by_ipprefix__simple @@ -327,6 +327,9 @@ class Route(models.Model): def commit_edit(self, *args, **kwargs): peers = self.applier.userprofile.peers.all() + route_original = kwargs['route_original'] + logger.info("models::commit_edit(): self="+str(self)+" route_original="+str(route_original)+" kwargs="+str(kwargs)) + #username = None #for peer in peers: # if username: @@ -344,7 +347,7 @@ class Route(models.Model): peer = None send_message('[%s] Editing rule %s. Please wait...' % (self.applier_username_nice, self.name_visible), peer, self) - response = edit.delay(self.pk) + response = edit.delay(self.pk, route_original) logger.info('Got edit job id: %s' % response) if not settings.DISABLE_EMAIL_NOTIFICATION: fqdn = Site.objects.get_current().domain diff --git a/flowspec/serializers.py b/flowspec/serializers.py index 5d6bff6fced2d4809cd4c6904c8779ddcd233397..d8629e2b9c189fd4309be7a35c9a4e6394bd2631 100644 --- a/flowspec/serializers.py +++ b/flowspec/serializers.py @@ -85,23 +85,30 @@ class RouteSerializer(serializers.HyperlinkedModelSerializer): #def validate_source(self, attrs, source): def validate_source(self, source): - user = self.context.get('request').user - source_ip = source #attrs.get('source') - res = clean_source(user, source_ip) - if res != source_ip: - raise serializers.ValidationError(res) - #return attrs - return res + request = self.context.get('request') + if request==None: + return source + else: + user = request.user + res = clean_source(user, source) + if res != source: + raise serializers.ValidationError(res) + #return attrs + return res #def validate_destination(self, attrs, source): def validate_destination(self, destination): - user = self.context.get('request').user - #destination = attrs.get('destination') - res = clean_destination(user, destination) - if res != destination: + request = self.context.get('request') + if request==None: + return destination + else: + user = request.user + #destination = attrs.get('destination') + res = clean_destination(user, destination) + if res != destination: raise serializers.ValidationError(res) - #return attrs - return res + #return attrs + return res #def validate_expires(self, attrs, source): def validate_expires(self, expires): diff --git a/flowspec/snmpstats.py b/flowspec/snmpstats.py index b47a831150c2c18c4f78e23f5c7743d831803168..cf51ac07894d59b27583ac98e7795b321041e87f 100644 --- a/flowspec/snmpstats.py +++ b/flowspec/snmpstats.py @@ -25,7 +25,7 @@ import os import time from flowspec.models import Route -from flowspec.junos import create_junos_name +from utils.rule_spec_utils import create_junos_name import flowspec.logging_utils logger = flowspec.logging_utils.logger_init_default(__name__, "celery_snmpstats.log", False) diff --git a/flowspec/tasks.py b/flowspec/tasks.py index e426203ca9787732a4ebbe3110b371be18a6f8dd..2ac1f4743467164cf9a89483aeb2c4dd770ef43a 100644 --- a/flowspec/tasks.py +++ b/flowspec/tasks.py @@ -18,7 +18,7 @@ # import pytest -from utils import proxy as PR +from utils.proxy import PR0 as PR from celery import shared_task, subtask import json from django.conf import settings @@ -29,11 +29,14 @@ from celery.exceptions import TimeLimitExceeded, SoftTimeLimitExceeded from ipaddress import * from os import fork,_exit import os +import io from sys import exit import time import redis from django.forms.models import model_to_dict +#from flowspec.serializers import RouteSerializer + from peers.models import * import flowspec.logging_utils @@ -48,8 +51,9 @@ rule_changelog_logger.setLevel(logging.INFO) def add(routepk, callback=None): from flowspec.models import Route route = Route.objects.get(pk=routepk) - applier = PR.Applier(route_object=route) - commit, response = applier.apply() + #applier = PR.Applier(route_object=route) + applier = PR.Applier(route_object=route, route_objects_all=Route.objects.all()) + commit, response, response_lowlevel = applier.apply() if commit: route.status = "ACTIVE" #snmp_add_initial_zero_value.delay(str(route.id), True) @@ -67,13 +71,33 @@ def add(routepk, callback=None): @shared_task(ignore_result=True, autoretry_for=(TimeLimitExceeded, SoftTimeLimitExceeded), retry_backoff=True, retry_kwargs={'max_retries': settings.NETCONF_MAX_RETRY_BEFORE_ERROR}) -def edit(routepk, callback=None): +def edit(routepk, route_original__data, callback=None): from flowspec.models import Route route = Route.objects.get(pk=routepk) status_pre = route.status logger.info("tasks::edit(): route="+str(route)+", status_pre="+str(status_pre)) - applier = PR.Applier(route_object=route) - commit, response = applier.apply(operation="replace") + logger.info("tasks::edit(): route_original__data_type="+str(type(route_original__data))+" route_original__data="+str(route_original__data)) + + from flowspec.serializers import RouteSerializer + + ##data = json.loads(route_original__data) + #logger.info("data="+str(route_original__data)) + ##route_original__serializer = RouteSerializer(data=route_original__data, context={'request': request}) + #route_original__serializer = RouteSerializer(data=route_original__data) + #logger.info("route_original__serializer="+str(route_original__serializer)) + #route_original__serializer.is_valid() + #logger.info("route_original__serializer.is_valid="+str(route_original__serializer.is_valid())) + ##route_original__object = route_original__serializer.validated_data; + #route_original__object = route_original__serializer.create(route_original__serializer.validated_data); + #logger.info("route_original__object="+str(route_original__object)) + #logger.info("route_original__serializer=.dir="+str(dir(route_original__serializer))) + + #route_original__object = route_original__serializer.deserialize("json", route_original__data) + #logger.info("tasks::edit(): route_original__object_type="+str(type(route_original__object))+" route_original__object="+str(route_original__object)) + + #applier = PR.Applier(route_object=route) + applier = PR.Applier(route_object=route, route_objects_all=Route.objects.all(), route_object_original=route_original__data) + commit, response, response_lowlevel = applier.apply(operation="replace") if commit: route.status = "ACTIVE" try: @@ -112,9 +136,10 @@ def deactivate_route(routepk, **kwargs): announce("[%s] Suspending rule : %s. %sPlease wait..." % (route.applier_username_nice, route.name_visible, reason_text), route.applier, route) - applier = PR.Applier(route_object=route) + #applier = PR.Applier(route_object=route) + applier = PR.Applier(route_object=route, route_objects_all=Route.objects.all()) # Delete from router via NETCONF - commit, response = applier.apply(operation="delete") + commit, response, response_lowlevel = applier.apply(operation="delete") #reason_text = '' logger.info("tasks::deactivate_route(): commit="+str(commit)) if commit: @@ -190,9 +215,10 @@ def batch_delete(routes, **kwargs): if routes: for route in routes: route.status = 'PENDING';route.save() - applier = PR.Applier(route_objects=routes) + #applier = PR.Applier(route_objects=routes) + applier = PR.Applier(route_objects=routes, route_objects_all=Route.objects.all()) conf = applier.delete_routes() - commit, response = applier.apply(configuration=conf) + commit, response, response_lowlevel = applier.apply(configuration=conf) reason_text = '' if commit: status = "INACTIVE" diff --git a/flowspec/views.py b/flowspec/views.py index 17bc9c3ef53085450168303398cd697f8c7a421a..26036fdb24afe0527fd3f0367dd4d8383a32dadd 100644 --- a/flowspec/views.py +++ b/flowspec/views.py @@ -41,6 +41,8 @@ from flowspec.forms import * from flowspec.models import * from flowspec.model_utils import convert_container_to_queryset +from flowspec.serializers import RouteSerializer + from peers.models import * from django_registration.backends.activation.views import RegistrationView @@ -57,6 +59,7 @@ import datetime import flowspec.iprange_match from urllib.parse import urlencode + ############################################################################# ############################################################################# @@ -565,7 +568,11 @@ def edit_route(request, route_slug): route.save() if bool(set(changed_data) & set(critical_changed_values)) or (not route_original.status == 'ACTIVE'): form.save_m2m() - route.commit_edit() + + route_original__serializer = RouteSerializer(route_original) + logger.info("views::edit(): route_original="+str(route_original)) + route.commit_edit(route_original=route_original__serializer.data) + return HttpResponseRedirect(reverse("group-routes")) else: if not request.user.is_superuser: @@ -1222,7 +1229,7 @@ def routedetails(request, route_slug): @login_required def routestats(request, route_slug): route = get_object_or_404(Route, name=route_slug) - import flowspec.junos + import utils.rule_spec_utils import time res = {} try: diff --git a/flowspy/settings.py.dist b/flowspy/settings.py.dist index 6ecead596488b46e7c465a92332ec8731c919976..a5c265e98fa5fa4359004583b7aafe274d9a91a6 100644 --- a/flowspy/settings.py.dist +++ b/flowspy/settings.py.dist @@ -646,4 +646,10 @@ ENABLE_SETUP_VIEW = False ############################################################################## ############################################################################## +#PROXY_CLASS="proxy_netconf_junos" +PROXY_CLASS="proxy_exabgp" + +############################################################################## +############################################################################## + from flowspy.settings_local import * diff --git a/utils/exabgpcli.py b/utils/exabgpcli.py new file mode 100644 index 0000000000000000000000000000000000000000..379c6b034f03733029f6d5618926d6c931999b4d --- /dev/null +++ b/utils/exabgpcli.py @@ -0,0 +1,410 @@ +# -*- coding: utf-8 -*- vim:fileencoding=utf-8: +# vim: tabstop=4:shiftwidth=4:softtabstop=4:expandtab + +# /srv/venv/lib/python3.11/site-packages/exabgp/application/cli.py + +import os +import sys +import time +import select +import signal +import errno + +from exabgp.application.bgp import root_folder +from exabgp.application.bgp import named_pipe +from exabgp.application.bgp import get_envfile +from exabgp.application.bgp import get_env +from exabgp.application.control import check_fifo + +from exabgp.reactor.network.error import error +from exabgp.reactor.api.response.answer import Answer + +from exabgp.vendoring import docopt + + +from django.conf import settings +#from exabgp.application.cli import * + +import flowspec.logging_utils +logger = flowspec.logging_utils.logger_init_default(__name__, "celery_exabpg.log", False) + +## + +#print("loading exabgpcli") + +## + +class CustomError(Exception): + pass + +def open_reader(recv): + def open_timeout(signum, frame): + #sys.stderr.write('could not connect to read response from ExaBGP\n') + #sys.stderr.flush() + #sys.exit(1) + #return 1, 'could not connect to read response from ExaBGP\n', None + raise CustomError("could not connect to read response from ExaBGP") + + signal.signal(signal.SIGALRM, open_timeout) + signal.alarm(5) + + done = False + while not done: + try: + reader = os.open(recv, os.O_RDONLY | os.O_NONBLOCK) + done = True + except CustomError as exc: + return 1, str(exc), None + except IOError as exc: + if exc.args[0] in errno_block: + signal.signal(signal.SIGALRM, open_timeout) + signal.alarm(5) + continue + #sys.stdout.write('could not read answer from ExaBGP') + #sys.stdout.flush() + #sys.exit(1) + return 1, 'could not read answer from ExaBGP', None + signal.alarm(0) + return 0, "", reader + + +def open_writer(send): + def write_timeout(signum, frame): + #sys.stderr.write('could not send command to ExaBGP (command timeout)') + #sys.stderr.flush() + #sys.exit(1) + #return 1, 'could not send command to ExaBGP (command timeout)', None + raise CustomError("could not send command to ExaBGP (command timeout)") + + signal.signal(signal.SIGALRM, write_timeout) + signal.alarm(5) + + try: + writer = os.open(send, os.O_WRONLY) + except CustomError as exc: + return 1, str(exc), None + except OSError as exc: + if exc.errno == errno.ENXIO: + #sys.stdout.write('ExaBGP is not running / using the configured named pipe') + #sys.stdout.flush() + #sys.exit(1) + return 1, 'ExaBGP is not running / using the configured named pipe', None + #sys.stdout.write('could not communicate with ExaBGP') + #sys.stdout.flush() + #sys.exit(1) + return 1, 'could not communicate with ExaBGP', None + except IOError as exc: + #sys.stdout.write('could not communicate with ExaBGP') + #sys.stdout.flush() + #sys.exit(1) + return 1, 'could not communicate with ExaBGP', None + + signal.alarm(0) + return 0, "", writer + +## + +usage = """\ +The BGP swiss army knife of networking + +usage: exabgpcli [--root ROOT] +\t\t\t\t\t\t\t\t [--help|<command>...] +\t\t\t\t\t\t\t\t [--env ENV] + +positional arguments: +\tcommand valid exabgpcli command (see below) + +optional arguments: +\t--env ENV, -e ENV environment configuration file +\t--help, -h exabgp manual page +\t--root ROOT, -f ROOT root folder where etc,bin,sbin are located + +commands: +\thelp show the commands known by ExaBGP +""".replace( + '\t', ' ' +) + + +# adapted from exabgp.application.cli.main +def exabgp_interaction(command_argv): + + try: + + #print("debug0") + options = docopt.docopt(usage, help=False, argv = command_argv) + #print("debug1") + + if options['--env'] is None: + options['--env'] = '' + + root = root_folder(options, ['/bin/exabgpcli', '/sbin/exabgpcli', '/lib/exabgp/application/cli.py']) + prefix = '' if root == '/usr' else root + etc = prefix + '/etc/exabgp' + envfile = get_envfile(options, etc) + env = get_env(envfile) + pipename = env['api']['pipename'] + + if options['--help']: + logger.error(usage) + #sys.stdout.flush() + #sys.exit(0) + return 0, "" + + if not options['<command>']: + logger.error(usage) + #sys.stdout.flush() + #sys.exit(0) + return 0, "no command given" + + command = ' '.join(options['<command>']) + + pipes = named_pipe(root, pipename) + if len(pipes) != 1: + msg1 = 'could not find ExaBGP\'s named pipes (%s.in and %s.out) for the cli\n' % (pipename, pipename) + logger.error(msg1) + logger.error('we scanned the following folders (the number is your PID):\n - ') + logger.error('\n - '.join(pipes)) + #sys.stdout.flush() + #sys.exit(1) + return 1, msg1 + + send = pipes[0] + pipename + '.in' + recv = pipes[0] + pipename + '.out' + + if not check_fifo(send): + msg1 = 'could not find write named pipe to connect to ExaBGP' + logger.error(msg1) + #sys.stdout.flush() + #sys.exit(1) + return 1, msg1 + + if not check_fifo(recv): + msg1 = 'could not find read named pipe to connect to ExaBGP' + logger.error(msg1) + #sys.stdout.flush() + #sys.exit(1) + return 1, msg1 + + #reader = open_reader(recv) + status, msg1, reader = open_reader(recv) + if status!=0: + return 1, msg1 + + rbuffer = b'' + start = time.time() + while True: + try: + while select.select([reader], [], [], 0) != ([], [], []): + rbuffer += os.read(reader, 4096) + rbuffer = rbuffer[-AnswerStream.buffer_size :] + except IOError as exc: + if exc.errno in error.block: + continue + msg1 = 'could not clear named pipe from potential previous command data (%s)' % str(exc) + logger.error(msg1) + #sys.stdout.flush() + #sys.exit(1) + return 1, msg1 + except OSError as exc: + if exc.errno in error.block: + continue + msg1 = 'could not clear named pipe from potential previous command data (%s)' % str(exc) + logger.error(msg1) + logger.error(exc) + #sys.stdout.flush() + #sys.exit(1) + return 1, msg1 + + # we are not ack'ing the command and probably have read all there is + if time.time() > start + 1.5: + break + + # we read nothing, nothing to do + if not rbuffer: + break + + # we read some data but it is not ending by a new line (ie: not a command completion) + if rbuffer[-1] != 10: # \n + continue + if AnswerStream.done.endswith(rbuffer[-len(AnswerStream.done) :]): + break + if AnswerStream.error.endswith(rbuffer[-len(AnswerStream.error) :]): + break + if AnswerStream.shutdown.endswith(rbuffer[-len(AnswerStream.shutdown) :]): + break + + renamed = [''] + + for pos, token in enumerate(command.split()): + for nickname, name, match in ( + ('a', 'announce', lambda pos, pre: pos == 0 or pre.count('.') == 3 or pre.count(':') != 0), + ('a', 'attributes', lambda pos, pre: pre[-1] == 'announce' or pre[-1] == 'withdraw'), + ('c', 'configuration', lambda pos, pre: True), + ('e', 'eor', lambda pos, pre: pre[-1] == 'announce'), + ('e', 'extensive', lambda _, pre: 'show' in pre), + ('f', 'flow', lambda pos, pre: pre[-1] == 'announce' or pre[-1] == 'withdraw'), + ('f', 'flush', lambda pos, pre: pos == 0 or pre.count('.') == 3 or pre.count(':') != 0), + ('h', 'help', lambda pos, pre: pos == 0), + ('i', 'in', lambda pos, pre: pre[-1] == 'adj-rib'), + ('n', 'neighbor', lambda pos, pre: pos == 0 or pre[-1] == 'show'), + ('r', 'route', lambda pos, pre: pre == 'announce' or pre == 'withdraw'), + ('rr', 'route-refresh', lambda _, pre: pre == 'announce'), + ('s', 'show', lambda pos, pre: pos == 0), + ('t', 'teardown', lambda pos, pre: pos == 0 or pre.count('.') == 3 or pre.count(':') != 0), + ('s', 'summary', lambda pos, pre: pos != 0), + ('v', 'vps', lambda pos, pre: pre[-1] == 'announce' or pre[-1] == 'withdraw'), + ('o', 'operation', lambda pos, pre: pre[-1] == 'announce'), + ('o', 'out', lambda pos, pre: pre[-1] == 'adj-rib'), + ('a', 'adj-rib', lambda pos, pre: pre[-1] in ['clear', 'flush', 'show']), + ('w', 'withdraw', lambda pos, pre: pos == 0 or pre.count('.') == 3 or pre.count(':') != 0), + ('w', 'watchdog', lambda pos, pre: pre[-1] == 'announce' or pre[-1] == 'withdraw'), + ('neighbour', 'neighbor', lambda pos, pre: True), + ('neigbour', 'neighbor', lambda pos, pre: True), + ('neigbor', 'neighbor', lambda pos, pre: True), + ): + if (token == nickname or name.startswith(token)) and match(pos, renamed): + renamed.append(name) + break + else: + renamed.append(token) + + sending = ' '.join(renamed).strip() + + # This does not change the behaviour for well formed command + if sending != command: + print('command: %s' % sending) + + #writer = open_writer(send) + status, msg1, writer = open_writer(send) + if status!=0: + return 1, msg1 + + try: + os.write(writer, sending.encode('utf-8') + b'\n') + os.close(writer) + except IOError as exc: + msg1 = 'could not send command to ExaBGP (%s)' % str(exc) + logger.error(msg1) + #sys.stdout.flush() + #sys.exit(1) + return 1, msg1 + except OSError as exc: + msg1 = 'could not send command to ExaBGP (%s)' % str(exc) + logger.error(msg1) + #sys.stdout.flush() + #sys.exit(1) + return 1, msg1 + + if command == 'reset': + #sys.exit(0) + return 0, "reset done" + + string_all="" + + waited = 0.0 + buf = b'' + done = False + done_time_diff = 0.5 + while not done: + try: + r, _, _ = select.select([reader], [], [], 0.01) + except OSError as exc: + if exc.errno in error.block: + continue + msg1 = 'could not get answer from ExaBGP (%s)' % str(exc) + logger.error(msg1) + #sys.stdout.flush() + #sys.exit(1) + return 1, msg1 + except IOError as exc: + if exc.errno in error.block: + continue + msg1 = 'could not get answer from ExaBGP (%s)' % str(exc) + logger.error(msg1) + #sys.stdout.flush() + #sys.exit(1) + return 1, msg1 + + if waited > 5.0: + logger.warn('\n') + msg1 = 'warning: no end of command message received' + logger.warn(msg1+"\n") + logger.warn( + 'warning: normal if exabgp.api.ack is set to false otherwise some data may get stuck on the pipe\n' + ) + logger.warn('warning: otherwise it may cause exabgp reactor to block\n') + #sys.exit(0) + return 0, msg1 + elif not r: + waited += 0.01 + continue + else: + waited = 0.0 + + try: + raw = os.read(reader, 4096) + except OSError as exc: + if exc.errno in error.block: + continue + msg1 = 'could not read answer from ExaBGP (%s)' % str(exc) + logger.error(msg1) + #sys.stdout.flush() + #sys.exit(1) + return 1, msg1 + except IOError as exc: + if exc.errno in error.block: + continue + msg1 = 'could not read answer from ExaBGP (%s)' % str(exc) + logger.error(msg1) + #sys.stdout.flush() + #sys.exit(1) + return 1, msg1 + + buf += raw + while b'\n' in buf: + line, buf = buf.split(b'\n', 1) + string = line.decode() + if string == Answer.done: + done = True + break + if string == Answer.shutdown: + logger.warn('ExaBGP is shutting down, command aborted\n') + sys.stderr.flush() + done = True + break + if string == Answer.error: + done = True + logger.warn('ExaBGP returns an error (see ExaBGP\'s logs for more information)\n') + logger.warn('use help for a list of available commands\n') + sys.stderr.flush() + break + + logger.info('exabgp output: %s' % string) + + if string_all=="": + string_all = string + else: + string_all = string_all+"\n"+string + #sys.stdout.flush() + + if not env.get('api').get('ack') and not raw.decode(): + this_moment = time.time() + recv_epoch_time = os.path.getmtime(recv) + time_diff = this_moment - recv_epoch_time + if time_diff >= done_time_diff: + done = True + + try: + os.close(reader) + except Exception: + pass + + #sys.exit(0) + return 0, string_all + + except Exception as e: + logger.error("exabgpcli::exabgp_interaction(): got exception="+str(e), exc_info=True) + return 1, "got exception "+str(e) + + diff --git a/utils/proxy.py b/utils/proxy.py index 585b69dd85a3973846999a83d09851b65bf09628..8de6502610b7d0cdea6e1d11c59a62c585fd1044 100644 --- a/utils/proxy.py +++ b/utils/proxy.py @@ -1,428 +1,11 @@ # -*- coding: utf-8 -*- vim:fileencoding=utf-8: # vim: tabstop=4:shiftwidth=4:softtabstop=4:expandtab -# Copyright (C) 2010-2014 GRNET S.A. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -# - -from . import jncdevice as np -from ncclient import manager -from ncclient.transport.errors import AuthenticationError, SSHError -from ncclient.operations.rpc import RPCError -from lxml import etree as ET from django.conf import settings -import logging, os -from django.core.cache import cache -import redis -from celery.exceptions import TimeLimitExceeded, SoftTimeLimitExceeded -from .portrange import parse_portrange -import traceback -from ipaddress import ip_network -#import xml.etree.ElementTree as ET - -import flowspec.logging_utils -logger = flowspec.logging_utils.logger_init_default(__name__, "celery_netconf.log", False) - -cwd = os.getcwd() - -def fod_unknown_host_cb(host, fingerprint): - return True - - -class Retriever(object): - def __init__(self, device=settings.NETCONF_DEVICE, username=settings.NETCONF_USER, password=settings.NETCONF_PASS, filter=settings.ROUTES_FILTER, port=settings.NETCONF_PORT, route_name=None, xml=None): - self.device = device - self.username = username - self.password = password - self.port = port - self.filter = filter - self.xml = xml - if route_name: - #self.filter = settings.ROUTE_FILTER%route_name - self.filter = settings.ROUTE_FILTER.replace("%s", route_name) # allow for varying number-of, multiple instances of %s - - def fetch_xml(self): - with manager.connect(host=self.device, port=self.port, username=self.username, password=self.password, hostkey_verify=False) as m: - xmlconfig = m.get_config(source='running', filter=('subtree',self.filter)).data_xml - return xmlconfig - - def get_xml(self): - if self.xml: - xmlconfig = self.xml - else: - xmlconfig = self.fetch_xml() - return xmlconfig - - def proccess_xml(self): - xmlconfig = self.get_xml(); - parser = np.Parser() - parser.confile = xmlconfig - device = parser.export() - return device - - def proccess_xml_generic(self): - xmlconfig = self.get_xml(); - root = ET.fromstring(xmlconfig) - return root - - def fetch_device(self): - device = cache.get("device") - logger.info("[CACHE] hit! got device") - if device: - return device - else: - device = self.proccess_xml() - if device.routing_options: - cache.set("device", device, 3600) - logger.info("[CACHE] miss, setting device") - return device - else: - return False - - -class Applier(object): - def __init__(self, route_objects=[], route_object=None, device=settings.NETCONF_DEVICE, username=settings.NETCONF_USER, password=settings.NETCONF_PASS, port=settings.NETCONF_PORT): - self.route_object = route_object - self.route_objects = route_objects - self.device = device - self.username = username - self.password = password - self.port = port - - def helper_fill_source_and_destination_to_xml(self, route_obj, route, is_ipv4): - - if route_obj.source: - if is_ipv4: - logger.info("source ipv4") - route.match['source'].append(route_obj.source) - else: - logger.info("source ipv6") - route.match['source-v6'].append(route_obj.source) - - if route_obj.destination: - if is_ipv4: - logger.info("destination ipv4") - route.match['destination'].append(route_obj.destination) - else: - logger.info("destination ipv6") - route.match['destination-v6'].append(route_obj.destination) - - def to_xml(self, operation=None): - logger.info("Operation: %s"%operation) - - if self.route_object: - - try: - settings.PORTRANGE_LIMIT - except: - settings.PORTRANGE_LIMIT = 100 - logger.info("Generating XML config") - - route_obj = self.route_object - - is_ipv4 = self.route_object.is_ipv4() - logger.info("proxy::to_xml(): is_ipv4="+str(is_ipv4)) - - device = np.Device() - flow = np.Flow(is_ipv4) - route = np.Route() - flow.routes.append(route) - device.routing_options.append(flow) - route.name = route_obj.name - - if operation == "delete": - logger.info("Requesting a delete operation") - route.operation = operation - device = device.export(netconf_config=True) - return ET.tostring(device) - - self.helper_fill_source_and_destination_to_xml(route_obj, route, is_ipv4) - - try: - if route_obj.protocol: - for protocol in route_obj.protocol.all(): - route.match['protocol'].append(protocol.protocol) - except: - pass - try: - ports = [] - if route_obj.port: - portrange = str(route_obj.port) - for port in portrange.split(","): - route.match['port'].append(port) - except: - pass - try: - ports = [] - if route_obj.destinationport: - portrange = str(route_obj.destinationport) - for port in portrange.split(","): - route.match['destination-port'].append(port) - except: - pass - try: - if route_obj.sourceport: - portrange = str(route_obj.sourceport) - for port in portrange.split(","): - route.match['source-port'].append(port) - except: - pass - if route_obj.icmpcode: - route.match['icmp-code'].append(route_obj.icmpcode) - if route_obj.icmptype: - route.match['icmp-type'].append(route_obj.icmptype) - if route_obj.tcpflag: - route.match['tcp-flags'].append(route_obj.tcpflag) - try: - if route_obj.dscp: - for dscp in route_obj.dscp.all(): - route.match['dscp'].append(dscp.dscp) - except: - pass - - try: - if route_obj.fragmenttype: - for frag in route_obj.fragmenttype.all(): - route.match['fragment'].append(frag.fragmenttype) - except: - pass - - for thenaction in route_obj.then.all(): - if thenaction.action_value: - route.then[thenaction.action] = thenaction.action_value - else: - route.then[thenaction.action] = True - if operation == "replace": - logger.info("Requesting a replace operation") - route.operation = operation - device = device.export(netconf_config=True) - result = ET.tostring(device) - logger.info("result="+str(result)) - return result - else: - return False - - def delete_routes(self): - if self.route_objects: - logger.info("Generating XML config") - device = np.Device() - flow = np.Flow() - for route_object in self.route_objects: - route_obj = route_object - route = np.Route() - flow.routes.append(route) - route.name = route_obj.name - route.operation = 'delete' - device.routing_options.append(flow) - device = device.export(netconf_config=True) - return ET.tostring(device) - else: - return False - - def get_route_name(self): - route_name=None - if self.route_object: - # support for dummy route_object as dicts - if isinstance(self.route_object, dict): - route_name = self.route_object["name"] - else: - route_name = self.route_object.name - - return route_name - - def get_existing_config_xml(self): - route_name = self.get_route_name() - logger.info("get_existing_config_xml(): route_name="+str(route_name)) - retriever0 = Retriever(xml=None, route_name=route_name) - config_xml_running = retriever0.fetch_xml() - #logger.info("proxy::get_existing_config(): config_xml_running="+str(config_xml_running)) - return config_xml_running - - def get_existing_config_xml_generic(self): - route_name = self.get_route_name() - logger.info("get_existing_config_xml_generic(): route_name="+str(route_name)) - retriever0 = Retriever(xml=None, route_name=route_name) - config_xml_running = retriever0.proccess_xml_generic() - #logger.info("proxy::get_existing_config(): config_xml_running="+str(config_xml_running)) - return config_xml_running - - def get_existing_config(self): - route_name = self.get_route_name() - logger.info("get_existing_config_xml(): route_name="+str(route_name)) - retriever0 = Retriever(xml=None) - config_parsed = retriever0.proccess_xml() - #logger.info("proxy::get_existing_config(): config_parsed="+str(config_parsed)) - return config_parsed - - def get_existing_routes(self): - #config_parsed = self.get_existing_config_xml() - config_parsed = self.get_existing_config_xml_generic() - if True: - routes_existing = [] - logger.info("config_parsed="+str(config_parsed)) - #logger.info("config_parsed="+str(ET.dump(config_parsed))) - #flow = config_parsed.routing_options[0] - #for route in config_parsed.iter('ns1:route'): - for route in config_parsed.findall(".//{http://xml.juniper.net/xnm/1.1/xnm}route"): - logger.info("proxy::get_existing_routes(): found route="+str(route)) - routes_existing.append(route) - return routes_existing - else: - logger.info("proxy::get_existing_routes(): no routing_options or is empty") - return [] - - def get_existing_route_names(self): - routes_existing = self.get_existing_routes() - #route_ids_existing = [route.name for route in routes_existing] - #route_ids_existing = [ET.SubElement(route, './/{http://xml.juniper.net/xnm/1.1/xnm}name') for route in routes_existing] - route_ids_existing = [route.find('.//{http://xml.juniper.net/xnm/1.1/xnm}name').text for route in routes_existing] - logger.info("proxy::get_existing_route_names(): config_parsed.flow.routes.ids="+str(route_ids_existing)) - return route_ids_existing - - - def apply(self, configuration = None, operation=None): - reason = None - if not configuration: - configuration = self.to_xml(operation=operation) - edit_is_successful = False - commit_confirmed_is_successful = False - commit_is_successful = False - r = redis.StrictRedis() - lock = r.lock("netconf_lock") - lock.acquire(blocking=True) - try: - if configuration: - with manager.connect(host=self.device, port=self.port, username=self.username, password=self.password, hostkey_verify=False) as m: - assert(":candidate" in m.server_capabilities) - with m.locked(target='candidate'): - m.discard_changes() - try: - edit_response = m.edit_config(target='candidate', config=configuration.decode("utf-8"), test_option='test-then-set') - edit_is_successful, reason = is_successful(edit_response) - logger.info("Successfully edited @ %s" % self.device) - if not edit_is_successful: - raise Exception() - except SoftTimeLimitExceeded: - cause="Task timeout" - logger.error(cause) - return False, cause - except TimeLimitExceeded: - cause="Task timeout" - logger.error(cause) - return False, cause - except RPCError as e: - cause="NETCONF RPC Error: "+str(e) - logger.error(cause) - m.discard_changes() - return False, cause - except Exception as e: - traceback.print_exc() - cause = "Caught edit exception: type='%s' str='%s' => reason='%s'" % (type(e), str(e), reason) - cause = cause.replace('\n', '') - logger.error(cause) - m.discard_changes() - return False, cause - if edit_is_successful: - try: - if ":confirmed-commit" in m.server_capabilities: - commit_confirmed_response = m.commit(confirmed=True, timeout=settings.COMMIT_CONFIRMED_TIMEOUT) - commit_confirmed_is_successful, reason = is_successful(commit_confirmed_response) - if not commit_confirmed_is_successful: - raise Exception() - else: - logger.info("Successfully confirmed committed @ %s" % self.device) - if not settings.COMMIT: - return True, "Successfully confirmed committed" - else: - commit_response = m.commit(confirmed=False, timeout=settings.COMMIT_CONFIRMED_TIMEOUT) - if commit_response.ok: - logger.info("Successfully committed @ %s" % self.device) - return True, "Successfully committed" - else: - return False, "Failed to commit changes %s" % commit_response.errors - - except SoftTimeLimitExceeded: - cause="Task timeout" - logger.error(cause) - return False, cause - except TimeLimitExceeded: - cause="Task timeout" - logger.error(cause) - return False, cause - except RPCError as e: - cause="NETCONF RPC Error: "+str(e) - logger.error(cause) - m.discard_changes() - return False, cause - except Exception as e: - cause="Caught commit confirmed exception: type='%s' str='%s' => reason='%s'" %(type(e), str(e), reason) - cause=cause.replace('\n', '') - logger.error(cause) - return False, cause - - if settings.COMMIT: - if edit_is_successful and commit_confirmed_is_successful: - try: - commit_response = m.commit(confirmed=False) - commit_is_successful, reason = is_successful(commit_response) - logger.info("Successfully committed @ %s" % self.device) - newconfig = m.get_config(source='running', filter=('subtree',settings.ROUTES_FILTER)).data_xml - retrieve = Retriever(xml=newconfig) - logger.info("[CACHE] caching device configuration") - cache.set("device", retrieve.proccess_xml(), 3600) - - if not commit_is_successful: - raise Exception() - else: - logger.info("Successfully cached device configuration") - return True, "Successfully committed" - except SoftTimeLimitExceeded: - cause="Task timeout" - logger.error(cause) - return False, cause - except TimeLimitExceeded: - cause="Task timeout" - logger.error(cause) - return False, cause - except RPCError as e: - cause="NETCONF RPC Error: "+str(e) - logger.error(cause) - m.discard_changes() - return False, cause - except Exception as e: - cause="Caught commit exception: type='%s' str='%s' => reason='%s'" %(type(e), str(e), reason) - cause=cause.replace('\n', '') - logger.error(cause) - return False, cause - else: - return False, "No configuration was supplied" - except Exception as e: - cause="NETCONF connection exception: %s %s" %(e,reason) - cause=cause.replace('\n', '') - logger.error(cause) - cause_user="NETCONF connection failed" - return False, cause_user - finally: - lock.release() +if not hasattr(settings, "PROXY_CLASS") or settings.PROXY_CLASS == "proxy_netconf_junos": + from utils import proxy_netconf_junos as PR0 +elif settings.PROXY_CLASS == "proxy_exabgp": + from utils import proxy_exabgp as PR0 -def is_successful(response): - if response.ok: - return True, None - elif response.error: - return False, '%s %s' % (response.error.type, response.error.message) - else: - return False, "Unknown error" diff --git a/utils/proxy_exabgp.py b/utils/proxy_exabgp.py new file mode 100644 index 0000000000000000000000000000000000000000..18b34dd3fe9c1f2f0ec4d5ffacec3f0c32d367e9 --- /dev/null +++ b/utils/proxy_exabgp.py @@ -0,0 +1,404 @@ +# -*- coding: utf-8 -*- vim:fileencoding=utf-8: +# vim: tabstop=4:shiftwidth=4:softtabstop=4:expandtab + +# /srv/venv/lib/python3.11/site-packages/exabgp/application/cli.py +#from exabgp.application.cli import main as exabgp_cli_main + +# utils/exabgpcli.py + +from django.conf import settings +from utils.exabgpcli import exabgp_interaction +import utils.rule_spec_utils as route_spec_utils + +from . import jncdevice as np +from ncclient import manager +from ncclient.transport.errors import AuthenticationError, SSHError +from ncclient.operations.rpc import RPCError +from lxml import etree as ET +from django.conf import settings +import logging, os +from django.core.cache import cache +import redis +from celery.exceptions import TimeLimitExceeded, SoftTimeLimitExceeded +from .portrange import parse_portrange +import traceback +from ipaddress import ip_network +import xml.etree.ElementTree as ET + +import flowspec.logging_utils +logger = flowspec.logging_utils.logger_init_default(__name__, "celery_exabpg.log", False) + +#print("loading proxy_exabgp") + +cwd = os.getcwd() + +#def fod_unknown_host_cb(host, fingerprint): +# return True + +#print("loading proxy_exabgp: step1") + +from threading import Lock +lock = Lock() + +def do_exabgp_interaction(command_list): + pre1="[pid:"+str(os.getpid())+"] " + logger.info(pre1+"proxy_exabgp::do_exabgp_interaction(): called") + lock.acquire() + logger.info(pre1+"proxy_exabgp::do_exabgp_interaction(): lock acquired") + ret="" + try: + logger.info(pre1+"proxy_exabgp::do_exabgp_interaction(): before exabgp_interaction") + ret, msg = exabgp_interaction(command_list) + logger.info(pre1+"proxy_exabgp::do_exabgp_interaction(): done with exabgp_interaction") + except Exception as e: + logger.info(pre1+"proxy_exabgp::do_exabgp_interaction(): got exception "+str(e), exc_info=True) + except Error as e: + logger.info(pre1+"proxy_exabgp::do_exabgp_interaction(): got error "+str(e), exc_info=True) + except: + logger.info(pre1+"proxy_exabgp::do_exabgp_interaction(): got unknown error ", exc_info=True) + finally: + lock.release() #release lock + logger.info(pre1+"proxy_exabgp::do_exabgp_interaction(): lock released") + return ret, msg + +class Retriever(object): + def __init__(self, device=settings.NETCONF_DEVICE, username=settings.NETCONF_USER, password=settings.NETCONF_PASS, filter=settings.ROUTES_FILTER, port=settings.NETCONF_PORT, route_name=None, xml=None): + self.device = device + self.username = username + self.password = password + self.port = port + self.filter = filter + self.xml = xml + if route_name: + #self.filter = settings.ROUTE_FILTER%route_name + self.filter = settings.ROUTE_FILTER.replace("%s", route_name) # allow for varying number-of, multiple instances of %s + + def fetch_xml(self): + logger.info("proxy_exabgp::Retriever::fetch_xml(): called") + ret, msg = do_exabgp_interaction(["show adj-rib out"]) + logger.info("proxy_exabgp::Retriever::fetch_xml(): ret="+str(ret)) + return msg + +class Applier(object): + def __init__(self, route_objects=[], route_object=None, route_object_original=None, route_objects_all=[]): + logger.info("proxy_exabgp::Appplier::__init__") + self.route_object = route_object + self.route_objects = route_objects + self.route_object_original = route_object_original + self.route_objects_all = route_objects_all + + def get_existing_config_xml(self): + #route_name = self.get_route_name() + #logger.info("get_existing_config_xml(): route_name="+str(route_name)) + retriever0 = Retriever() + config_xml_running = retriever0.fetch_xml() + #logger.info("proxy::get_existing_config(): config_xml_running="+str(config_xml_running)) + return config_xml_running + + def helper_active_routes_with_same_parameters_exist(self, route, route_objects_all, include_route_self): + list2 = self.helper_get_active_routes_with_same_parameters(route, route_objects_all, True) + logger.info("proxy_exabgp::helper_get_active_routes_with_same_parameters(): route="+str(route)+" => list2="+str(list2)) + route_with_same_params__exists = len(list2)>0 + logger.info("proxy_exabgp::helper_get_active_routes_with_same_parameters(): => ret="+str(route_with_same_params__exists)) + return route_with_same_params__exists + + def helper_get_active_routes_with_same_parameters(self, route, route_objects_all, include_route_self): + ret = [] + route_par_str = self.helper_get_exabgp__route_parameter_string(route) + #logger.info("helper_get_active_exabgp__route_parameter_string(): route_par_str="+str(route_par_str)) + for route2 in route_objects_all: + #logger.info("helper_get_active_exabgp__route_parameter_string(): route2="+str(route2)) + if (include_route_self or route2!=route) and route2.status=="ACTIVE": + if self.helper_get_exabgp__route_parameter_string(route2)==route_par_str: + ret.add(route2) + + return ret + + # e.g.: neighbor 14.0.0.2 ipv4 flow flow destination-ipv4 20.20.20.1/32 source-ipv4 15.10.10.1/32 protocol =tcp destination-port [ >=200&<=400 ] source-port [ >=123&<=129 ] next-hop 14.0.0.2 + def helper_get_exabgp__route_parameter_string(self, route): + ret = "" + + if isinstance(route, dict): + source = route['source'] + destination = route['destination'] + sourceport = route['sourceport'] + destinationport = route['destinationport'] + protocols = route['protocol'] + fragtypes = route['fragmenttype'] + else: + source = route.source + destination = route.destination + sourceport = route.sourceport + destinationport = route.destinationport + protocols = route.protocol.all() + fragtypes = route.fragmenttype.all() + + ret = ret + " source-ipv4 " + str(source) + " " + ret = ret + " destination-ipv4 " + str(destination) + " " + + ip_version = 4 + ip_version1 = ip_network(source).version + ip_version2 = ip_network(destination).version + if ip_version1==4 or ip_version2==4: + ip_version = 4 + elif ip_version1==6 or ip_version2==6: + ip_version = 6 + + ## + + ret1 = route_spec_utils.get_protocols_numbers(protocols, ip_version, output_separator=" ") + if ret1 != "": + ret = ret + " protocol [ " + ret1 + " ]" + + ret1 = route_spec_utils.translate_ports(sourceport, output_separator=" ") + if ret1 != "": + ret = ret + " source-port [ " + ret1 + "]" + + ret1 = route_spec_utils.translate_ports(destinationport, output_separator=" ") + if ret1 != "": + ret = ret + " destination-port [ " + ret1 + "]" + + ret1 = "" + for fragtype in fragtypes: + ret1 = ret1 + str(fragtype) + " " + if ret1!="": + ret = ret + " fragment [ " + ret1 + "]" + + return ret + + def announce_route(self, route): + ret, msg = do_exabgp_interaction("announce flow route "+self.helper_get_exabgp__route_parameter_string(route)) + return ret==0, msg + + def withdraw_route(self, route): + ret, msg = do_exabgp_interaction("withdraw flow route "+self.helper_get_exabgp__route_parameter_string(route)) + return ret==0, msg + + ### + + def apply(self, configuration=None, operation=None): + logger.info("proxy_exabgp::apply(): called operation="+str(operation)) + + try: + route = self.route_object + route_objects_all = self.route_objects_all + route_original = self.route_object_original + if isinstance(route, dict): + route_original__status = route['status'] + else: + route_original__status = route.status + + if route==None or route_objects_all==None: + logger.error("proxy_exabgp::apply(): route and route_objects_all have to be defined") + return False, "route and route_objects_all have to be defined" + + logger.info("proxy_exabgp::apply(): route_object="+str(route)) + str1 = self.helper_get_exabgp__route_parameter_string(route) + logger.info("proxy_exabgp::apply(): => route_spec_str="+str(str1)) + + route_with_same_params__exists = self.helper_active_routes_with_same_parameters_exist(route, route_objects_all, False) + logger.info("proxy_exabgp::apply(): => route_with_same_params__exists="+str(route_with_same_params__exists)) + logger.info("proxy_exabgp::apply(): => route.status="+str(route.status)) + + ## + + if operation == "delete": + logger.info("proxy_exabgp::apply(): requesting a delete operation") + if route_with_same_params__exists: + logger.info("proxy_exabgp::apply(): route_with_same_params__exists, nothing todo; list2="+str(list2)) + status =True + msg = "route_with_same_params__exists, nothing todo" + elif route_original__status!="INACTIVE" and route_original__status!="PENDING": + logger.info("proxy_exabgp::apply(): route_original__status!=INACTIVE/PENDING, ignoring request") + status = True + msg = "status!=INACTIVE/PENDING, ignoring request" + else: + logger.info("proxy_exabgp::apply(): actually have to withdraw route") + status, msg1 = self.withdraw_route(route) + logger.info("proxy_exabgp::apply(): withdrawing done status="+str(status)+", "+str(msg1)) + msg = "withdraw route: "+str(msg1) + if status: + return status, "successfully committed", msg + else: + return status, msg, msg + + elif operation == "replace": + logger.info("proxy_exabgp::apply(): requesting a replace operation") + + logger.info("proxy_exabgp::apply(): route_original="+str(route_original)) + if route_original==None: + logger.error("proxy_exabgp::apply(): route_original has to be defined") + return False, "route_original has to be defined" + + route__spec = self.helper_get_exabgp__route_parameter_string(route) + logger.info("proxy_exabgp::apply(): route__spec="+str(route__spec)) + route_original__spec = self.helper_get_exabgp__route_parameter_string(route_original) + logger.info("proxy_exabgp::apply(): route_original__spec="+str(route_original__spec)) + + route_with_same_old_params__exists = self.helper_active_routes_with_same_parameters_exist(route_original, route_objects_all, False) + logger.info("proxy_exabgp::apply(): => route_with_same_old_params__exists="+str(route_with_same_old_params__exists)) + + route_status_changed = route_original__status!=route.status or route.status=="PENDING" + logger.info("proxy_exabgp::apply(): => route_original__status="+str(route_original__status)) + logger.info("proxy_exabgp::apply(): => route.status="+str(route.status)) + logger.info("proxy_exabgp::apply(): => route_status_changed="+str(route_status_changed)) + + if route.status!="ACTIVE" and route.status!="PENDING": + logger.info("proxy_exabgp::apply(): route status="+str(route.status)+"!=ACTIVE/PENDING, ignoring request") + status = True + msg = "status!=ACTIVE/PENDING, ignoring request" + elif route__spec==route_original__spec and not route_status_changed: + #logger.info("proxy_exabgp::apply(): route effetively did not change in parameters or status") + #return True, "nothing todo" + logger.info("proxy_exabgp::apply(): route effetively did not change in parameters or status; anyway ensuring route is announced") + status, msg1 = self.announce_route(route) + logger.info("proxy_exabgp::apply(): announcing done status="+str(status)+", "+str(msg1)) + msg = "re-announce unchanged flow: "+str(msg1) + elif route__spec==route_original__spec and not route_status_changed: + logger.info("proxy_exabgp::apply(): route effetively did not change in parameters but in status; announcing route") + status, msg1 = self.announce_route(route) + logger.info("proxy_exabgp::apply(): announcing done status="+str(status)+", "+str(msg1)) + msg = "announce (re)-added flow: "+str(msg1) + + else: + + status_del = True + if route_with_same_old_params__exists: + logger.info("proxy_exabgp::apply(): route_with_same_old_params__exists => no need to withdraw old route") + status_del = True + msg_del = "route_with_same_old_params__exists, nothing todo" + else: + logger.info("proxy_exabgp::apply(): NO route_with_same_old_params__exists => need to withdraw old route") + status_del, msg1 = self.withdraw_route(route_original) + logger.info("proxy_exabgp::apply(): withdrawing done status="+str(status_del)+", "+str(msg1)) + msg_del = "withdraw old flow: "+str(msg1)+"; " + + if route_with_same_params__exists: + #logger.info("proxy_exabgp::apply(): route_with_same_params__exists => no need to announce changed route") + logger.info("proxy_exabgp::apply(): route_with_same_params__exists; anyway ensuring route is announced") + status, msg1 = self.announce_route(route) + logger.info("proxy_exabgp::apply(): announcing done status="+str(status)+", "+str(msg1)) + status = status_del and status + msg = msg_del+"re-announced changed flow: "+str(msg1) + else: + logger.info("proxy_exabgp::apply(): NO route_with_same_params__exists => need to announce changed route") + status, msg1 = self.announce_route(route) + logger.info("proxy_exabgp::apply(): announcing done status="+str(status)+", "+str(msg1)) + status = status_del and status + msg = msg_del+"announced changed flow: "+str(msg1) + + if status: + return status, "successfully committed", msg + else: + return status, msg, msg + + else: # add operation + logger.info("proxy_exabgp::apply(): requesting (implicitly) an add operation") + + if route.status!="ACTIVE" and route.status!="PENDING": + logger.info("proxy_exabgp::apply(): route.status="+str(route.status)+", ignoring request") + status = True + msg = "status!=ACTIVE/PENDING, ignoring request" + elif route_with_same_params__exists: + logger.info("proxy_exabgp::apply(): route_with_same_params__exists, nothing todo; list2="+str(list2)) + status = True + msg = "route_with_same_params__exists, nothing todo" + else: + logger.info("proxy_exabgp::apply(): actually have to announce route") + status, msg1 = self.announce_route(route) + logger.info("proxy_exabgp::apply(): announcing done status="+str(status)+", "+str(msg1)) + msg = "announce new flow: "+str(msg1) + + if status: + return status, "successfully committed", msg + else: + return status, msg, msg + + except Exception as e: + logger.error("proxy_exabgp::apply(): got exception="+str(e), exc_info=True) + +# def delete_routes(self): +# if self.route_objects: +# logger.info("Generating XML config") +# device = np.Device() +# flow = np.Flow() +# for route_object in self.route_objects: +# route_obj = route_object +# route = np.Route() +# flow.routes.append(route) +# route.name = route_obj.name +# route.operation = 'delete' +# device.routing_options.append(flow) +# device = device.export(netconf_config=True) +# return ET.tostring(device) +# else: +# return False +# +# def get_route_name(self): +# route_name=None +# if self.route_object: +# # support for dummy route_object as dicts +# if isinstance(self.route_object, dict): +# route_name = self.route_object["name"] +# else: +# route_name = self.route_object.name +# +# return route_name +# +# def get_existing_config_xml(self): +# route_name = self.get_route_name() +# logger.info("get_existing_config_xml(): route_name="+str(route_name)) +# retriever0 = Retriever(xml=None, route_name=route_name) +# config_xml_running = retriever0.fetch_xml() +# #logger.info("proxy::get_existing_config(): config_xml_running="+str(config_xml_running)) +# return config_xml_running +# +# def get_existing_config_xml_generic(self): +# route_name = self.get_route_name() +# logger.info("get_existing_config_xml_generic(): route_name="+str(route_name)) +# retriever0 = Retriever(xml=None, route_name=route_name) +# config_xml_running = retriever0.proccess_xml_generic() +# #logger.info("proxy::get_existing_config(): config_xml_running="+str(config_xml_running)) +# return config_xml_running +# +# def get_existing_config(self): +# route_name = self.get_route_name() +# logger.info("get_existing_config_xml(): route_name="+str(route_name)) +# retriever0 = Retriever(xml=None) +# config_parsed = retriever0.proccess_xml() +# #logger.info("proxy::get_existing_config(): config_parsed="+str(config_parsed)) +# return config_parsed +# +# def get_existing_routes(self): +# #config_parsed = self.get_existing_config_xml() +# config_parsed = self.get_existing_config_xml_generic() +# if True: +# routes_existing = [] +# logger.info("config_parsed="+str(config_parsed)) +# #logger.info("config_parsed="+str(ET.dump(config_parsed))) +# #flow = config_parsed.routing_options[0] +# #for route in config_parsed.iter('ns1:route'): +# for route in config_parsed.findall(".//{http://xml.juniper.net/xnm/1.1/xnm}route"): +# logger.info("proxy::get_existing_routes(): found route="+str(route)) +# routes_existing.append(route) +# return routes_existing +# else: +# logger.info("proxy::get_existing_routes(): no routing_options or is empty") +# return [] +# +# def get_existing_route_names(self): +# routes_existing = self.get_existing_routes() +# #route_ids_existing = [route.name for route in routes_existing] +# #route_ids_existing = [ET.SubElement(route, './/{http://xml.juniper.net/xnm/1.1/xnm}name') for route in routes_existing] +# route_ids_existing = [route.find('.//{http://xml.juniper.net/xnm/1.1/xnm}name').text for route in routes_existing] +# logger.info("proxy::get_existing_route_names(): config_parsed.flow.routes.ids="+str(route_ids_existing)) +# return route_ids_existing +# +#def is_successful(response): +# if response.ok: +# return True, None +# elif response.error: +# return False, '%s %s' % (response.error.type, response.error.message) +# else: +# return False, "Unknown error" + diff --git a/utils/proxy_netconf_junos.py b/utils/proxy_netconf_junos.py new file mode 100644 index 0000000000000000000000000000000000000000..3051d06d3be0a2824548fca05de3a29972f54463 --- /dev/null +++ b/utils/proxy_netconf_junos.py @@ -0,0 +1,432 @@ +# -*- coding: utf-8 -*- vim:fileencoding=utf-8: +# vim: tabstop=4:shiftwidth=4:softtabstop=4:expandtab + +# Copyright (C) 2010-2014 GRNET S.A. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +from . import jncdevice as np +from ncclient import manager +from ncclient.transport.errors import AuthenticationError, SSHError +from ncclient.operations.rpc import RPCError +from lxml import etree as ET +from django.conf import settings +import logging, os +from django.core.cache import cache +import redis +from celery.exceptions import TimeLimitExceeded, SoftTimeLimitExceeded +from .portrange import parse_portrange +import traceback +from ipaddress import ip_network +#import xml.etree.ElementTree as ET + +import flowspec.logging_utils +logger = flowspec.logging_utils.logger_init_default(__name__, "celery_netconf.log", False) + +print("loading proxy_netconf_junos") + +cwd = os.getcwd() + +def fod_unknown_host_cb(host, fingerprint): + return True + + +class Retriever(object): + def __init__(self, device=settings.NETCONF_DEVICE, username=settings.NETCONF_USER, password=settings.NETCONF_PASS, filter=settings.ROUTES_FILTER, port=settings.NETCONF_PORT, route_name=None, xml=None): + self.device = device + self.username = username + self.password = password + self.port = port + self.filter = filter + self.xml = xml + if route_name: + #self.filter = settings.ROUTE_FILTER%route_name + self.filter = settings.ROUTE_FILTER.replace("%s", route_name) # allow for varying number-of, multiple instances of %s + + def fetch_xml(self): + with manager.connect(host=self.device, port=self.port, username=self.username, password=self.password, hostkey_verify=False) as m: + xmlconfig = m.get_config(source='running', filter=('subtree',self.filter)).data_xml + return xmlconfig + + def get_xml(self): + if self.xml: + xmlconfig = self.xml + else: + xmlconfig = self.fetch_xml() + return xmlconfig + + def proccess_xml(self): + xmlconfig = self.get_xml(); + parser = np.Parser() + parser.confile = xmlconfig + device = parser.export() + return device + + def proccess_xml_generic(self): + xmlconfig = self.get_xml(); + root = ET.fromstring(xmlconfig) + return root + + def fetch_device(self): + device = cache.get("device") + logger.info("[CACHE] hit! got device") + if device: + return device + else: + device = self.proccess_xml() + if device.routing_options: + cache.set("device", device, 3600) + logger.info("[CACHE] miss, setting device") + return device + else: + return False + + +class Applier(object): + def __init__(self, route_objects=[], route_object=None, route_object_original=None, route_objects_all=[], device=settings.NETCONF_DEVICE, username=settings.NETCONF_USER, password=settings.NETCONF_PASS, port=settings.NETCONF_PORT): + self.route_object = route_object + self.route_objects = route_objects + self.route_objects_all = route_objects_all + self.device = device + self.username = username + self.password = password + self.port = port + + def helper_fill_source_and_destination_to_xml(self, route_obj, route, is_ipv4): + + if route_obj.source: + if is_ipv4: + logger.info("source ipv4") + route.match['source'].append(route_obj.source) + else: + logger.info("source ipv6") + route.match['source-v6'].append(route_obj.source) + + if route_obj.destination: + if is_ipv4: + logger.info("destination ipv4") + route.match['destination'].append(route_obj.destination) + else: + logger.info("destination ipv6") + route.match['destination-v6'].append(route_obj.destination) + + def to_xml(self, operation=None): + logger.info("Operation: %s"%operation) + + if self.route_object: + + try: + settings.PORTRANGE_LIMIT + except: + settings.PORTRANGE_LIMIT = 100 + logger.info("Generating XML config") + + route_obj = self.route_object + + is_ipv4 = self.route_object.is_ipv4() + logger.info("proxy::to_xml(): is_ipv4="+str(is_ipv4)) + + device = np.Device() + flow = np.Flow(is_ipv4) + route = np.Route() + flow.routes.append(route) + device.routing_options.append(flow) + route.name = route_obj.name + + if operation == "delete": + logger.info("Requesting a delete operation") + route.operation = operation + device = device.export(netconf_config=True) + return ET.tostring(device) + + self.helper_fill_source_and_destination_to_xml(route_obj, route, is_ipv4) + + try: + if route_obj.protocol: + for protocol in route_obj.protocol.all(): + route.match['protocol'].append(protocol.protocol) + except: + pass + try: + ports = [] + if route_obj.port: + portrange = str(route_obj.port) + for port in portrange.split(","): + route.match['port'].append(port) + except: + pass + try: + ports = [] + if route_obj.destinationport: + portrange = str(route_obj.destinationport) + for port in portrange.split(","): + route.match['destination-port'].append(port) + except: + pass + try: + if route_obj.sourceport: + portrange = str(route_obj.sourceport) + for port in portrange.split(","): + route.match['source-port'].append(port) + except: + pass + if route_obj.icmpcode: + route.match['icmp-code'].append(route_obj.icmpcode) + if route_obj.icmptype: + route.match['icmp-type'].append(route_obj.icmptype) + if route_obj.tcpflag: + route.match['tcp-flags'].append(route_obj.tcpflag) + try: + if route_obj.dscp: + for dscp in route_obj.dscp.all(): + route.match['dscp'].append(dscp.dscp) + except: + pass + + try: + if route_obj.fragmenttype: + for frag in route_obj.fragmenttype.all(): + route.match['fragment'].append(frag.fragmenttype) + except: + pass + + for thenaction in route_obj.then.all(): + if thenaction.action_value: + route.then[thenaction.action] = thenaction.action_value + else: + route.then[thenaction.action] = True + if operation == "replace": + logger.info("Requesting a replace operation") + route.operation = operation + device = device.export(netconf_config=True) + result = ET.tostring(device) + logger.info("result="+str(result)) + return result + else: + return False + + def delete_routes(self): + if self.route_objects: + logger.info("Generating XML config") + device = np.Device() + flow = np.Flow() + for route_object in self.route_objects: + route_obj = route_object + route = np.Route() + flow.routes.append(route) + route.name = route_obj.name + route.operation = 'delete' + device.routing_options.append(flow) + device = device.export(netconf_config=True) + return ET.tostring(device) + else: + return False + + def get_route_name(self): + route_name=None + if self.route_object: + # support for dummy route_object as dicts + if isinstance(self.route_object, dict): + route_name = self.route_object["name"] + else: + route_name = self.route_object.name + + return route_name + + def get_existing_config_xml(self): + route_name = self.get_route_name() + logger.info("get_existing_config_xml(): route_name="+str(route_name)) + retriever0 = Retriever(xml=None, route_name=route_name) + config_xml_running = retriever0.fetch_xml() + #logger.info("proxy::get_existing_config(): config_xml_running="+str(config_xml_running)) + return config_xml_running + + def get_existing_config_xml_generic(self): + route_name = self.get_route_name() + logger.info("get_existing_config_xml_generic(): route_name="+str(route_name)) + retriever0 = Retriever(xml=None, route_name=route_name) + config_xml_running = retriever0.proccess_xml_generic() + #logger.info("proxy::get_existing_config(): config_xml_running="+str(config_xml_running)) + return config_xml_running + + def get_existing_config(self): + route_name = self.get_route_name() + logger.info("get_existing_config_xml(): route_name="+str(route_name)) + retriever0 = Retriever(xml=None) + config_parsed = retriever0.proccess_xml() + #logger.info("proxy::get_existing_config(): config_parsed="+str(config_parsed)) + return config_parsed + + def get_existing_routes(self): + #config_parsed = self.get_existing_config_xml() + config_parsed = self.get_existing_config_xml_generic() + if True: + routes_existing = [] + logger.info("config_parsed="+str(config_parsed)) + #logger.info("config_parsed="+str(ET.dump(config_parsed))) + #flow = config_parsed.routing_options[0] + #for route in config_parsed.iter('ns1:route'): + for route in config_parsed.findall(".//{http://xml.juniper.net/xnm/1.1/xnm}route"): + logger.info("proxy::get_existing_routes(): found route="+str(route)) + routes_existing.append(route) + return routes_existing + else: + logger.info("proxy::get_existing_routes(): no routing_options or is empty") + return [] + + def get_existing_route_names(self): + routes_existing = self.get_existing_routes() + #route_ids_existing = [route.name for route in routes_existing] + #route_ids_existing = [ET.SubElement(route, './/{http://xml.juniper.net/xnm/1.1/xnm}name') for route in routes_existing] + route_ids_existing = [route.find('.//{http://xml.juniper.net/xnm/1.1/xnm}name').text for route in routes_existing] + logger.info("proxy::get_existing_route_names(): config_parsed.flow.routes.ids="+str(route_ids_existing)) + return route_ids_existing + + + def apply(self, configuration = None, operation=None): + reason = None + if not configuration: + configuration = self.to_xml(operation=operation) + edit_is_successful = False + commit_confirmed_is_successful = False + commit_is_successful = False + r = redis.StrictRedis() + lock = r.lock("netconf_lock") + lock.acquire(blocking=True) + try: + if configuration: + with manager.connect(host=self.device, port=self.port, username=self.username, password=self.password, hostkey_verify=False) as m: + assert(":candidate" in m.server_capabilities) + with m.locked(target='candidate'): + m.discard_changes() + try: + edit_response = m.edit_config(target='candidate', config=configuration.decode("utf-8"), test_option='test-then-set') + edit_is_successful, reason = is_successful(edit_response) + logger.info("Successfully edited @ %s" % self.device) + if not edit_is_successful: + raise Exception() + except SoftTimeLimitExceeded: + cause="Task timeout" + logger.error(cause) + return False, cause, cause + except TimeLimitExceeded: + cause="Task timeout" + logger.error(cause) + return False, cause, cause + except RPCError as e: + cause="NETCONF RPC Error: "+str(e) + logger.error(cause) + m.discard_changes() + return False, cause, cause + except Exception as e: + traceback.print_exc() + cause = "Caught edit exception: type='%s' str='%s' => reason='%s'" % (type(e), str(e), reason) + cause = cause.replace('\n', '') + logger.error(cause) + m.discard_changes() + return False, cause, cause + if edit_is_successful: + try: + if ":confirmed-commit" in m.server_capabilities: + commit_confirmed_response = m.commit(confirmed=True, timeout=settings.COMMIT_CONFIRMED_TIMEOUT) + commit_confirmed_is_successful, reason = is_successful(commit_confirmed_response) + if not commit_confirmed_is_successful: + raise Exception() + else: + logger.info("Successfully confirmed committed @ %s" % self.device) + if not settings.COMMIT: + return True, "Successfully confirmed committed", "NETCONF: "+str(reason) + else: + commit_response = m.commit(confirmed=False, timeout=settings.COMMIT_CONFIRMED_TIMEOUT) + if commit_response.ok: + logger.info("Successfully committed @ %s" % self.device) + return True, "Successfully committed", "NETCONF OK" + else: + cause = "Failed to commit changes %s" % commit_response.erros + return False, cause, cause + + except SoftTimeLimitExceeded: + cause="Task timeout" + logger.error(cause) + return False, cause, cause + except TimeLimitExceeded: + cause="Task timeout" + logger.error(cause) + return False, cause, cause + except RPCError as e: + cause="NETCONF RPC Error: "+str(e) + logger.error(cause) + m.discard_changes() + return False, cause, cause + except Exception as e: + cause="Caught commit confirmed exception: type='%s' str='%s' => reason='%s'" %(type(e), str(e), reason) + cause=cause.replace('\n', '') + logger.error(cause) + return False, cause, cause + + if settings.COMMIT: + if edit_is_successful and commit_confirmed_is_successful: + try: + commit_response = m.commit(confirmed=False) + commit_is_successful, reason = is_successful(commit_response) + logger.info("Successfully committed @ %s" % self.device) + newconfig = m.get_config(source='running', filter=('subtree',settings.ROUTES_FILTER)).data_xml + retrieve = Retriever(xml=newconfig) + logger.info("[CACHE] caching device configuration") + cache.set("device", retrieve.proccess_xml(), 3600) + + if not commit_is_successful: + raise Exception() + else: + logger.info("Successfully cached device configuration") + return True, "Successfully committed", "NETCONF: OK" + except SoftTimeLimitExceeded: + cause="Task timeout" + logger.error(cause) + return False, cause, cause + except TimeLimitExceeded: + cause="Task timeout" + logger.error(cause) + return False, cause, cause + except RPCError as e: + cause="NETCONF RPC Error: "+str(e) + logger.error(cause) + m.discard_changes() + return False, cause, cause + except Exception as e: + cause="Caught commit exception: type='%s' str='%s' => reason='%s'" %(type(e), str(e), reason) + cause=cause.replace('\n', '') + logger.error(cause) + return False, cause, cause + else: + return False, "No configuration was supplied" + except Exception as e: + cause="NETCONF connection exception: %s %s" %(e,reason) + cause=cause.replace('\n', '') + logger.error(cause) + cause_user="NETCONF connection failed" + return False, cause_user, cause + finally: + lock.release() + + +def is_successful(response): + if response.ok: + return True, None + elif response.error: + return False, '%s %s' % (response.error.type, response.error.message) + else: + return False, "Unknown error" + diff --git a/flowspec/junos.py b/utils/rule_spec_utils.py similarity index 93% rename from flowspec/junos.py rename to utils/rule_spec_utils.py index f48a2d154860d8abb6c0176cfcd7d743ad2eef7a..c4f31394dfcfb7a6683874662d0bda663c313b5e 100644 --- a/flowspec/junos.py +++ b/utils/rule_spec_utils.py @@ -2,7 +2,7 @@ from django.conf import settings import flowspec.logging_utils -logger = flowspec.logging_utils.logger_init_default(__name__, "celery_junos.log", False) +logger = flowspec.logging_utils.logger_init_default(__name__, "celery_flow_spec_utils.log", False) PROTOCOL_NUMBERS = { 'HOPOPT': '0', @@ -146,17 +146,17 @@ PROTOCOL_NUMBERS = { 'ROHC': '142' } -def get_protocols_numbers(protocols_set, ip_version): +def get_protocols_numbers(protocols_set, ip_version, output_separator=","): if protocols_set: protocols = 'proto' for protocol in protocols_set: protoNo = PROTOCOL_NUMBERS.get(protocol.protocol.upper()) if ip_version==6 and (protoNo==1 or protocol.protocol=="icmp"): - protocols += '=%s,' % PROTOCOL_NUMBERS.get("IPv6-ICMP") + protocols += '=%s'+output_separator % PROTOCOL_NUMBERS.get("IPv6-ICMP") elif protoNo: - protocols += '=%s,' % PROTOCOL_NUMBERS.get(protocol.protocol.upper()) + protocols += '=%s'+output_separator % PROTOCOL_NUMBERS.get(protocol.protocol.upper()) else: - protocols += '=%s,' % protocol.protocol + protocols += '=%s'+output_separator % protocol.protocol return protocols else: return '' @@ -188,7 +188,7 @@ def get_range(addr_range): addr_range += '/%s' % mask return addr_range + ',' -def translate_ports(portstr): +def translate_ports(portstr, output_separator=","): res = [] if portstr: for p in portstr.split(","): @@ -198,7 +198,7 @@ def translate_ports(portstr): res.append(">=" + boundary[0] + "&<=" + boundary[1]) else: res.append("=" + p) - return ",".join(res) + return output_separator.join(res) else: return ""