Skip to content
Snippets Groups Projects
  • Tomáš Čejka's avatar
    65f2497c
    restapi: reworked deactivate and delete · 65f2497c
    Tomáš Čejka authored
    Added new url for deactivate to split the functionality of deactivation and deletion.
    Delete process is allowed only for enabled users, the process includes deactivation as well.
    Reworked "retry" of the Celery tasks and added new settings variable NETCONF_MAX_RETRY_BEFORE_ERROR.
    65f2497c
    History
    restapi: reworked deactivate and delete
    Tomáš Čejka authored
    Added new url for deactivate to split the functionality of deactivation and deletion.
    Delete process is allowed only for enabled users, the process includes deactivation as well.
    Reworked "retry" of the Celery tasks and added new settings variable NETCONF_MAX_RETRY_BEFORE_ERROR.
tasks.py 17.40 KiB
# -*- coding: utf-8 -*- vim:fileencoding=utf-8:
# vim: tabstop=4:shiftwidth=4:softtabstop=4:expandtab

# Copyright (C) 2010-2014 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import pytest
from utils import proxy as PR
from celery import shared_task, subtask
import logging
import json
from django.conf import settings
import datetime
from django.core.mail import send_mail
from django.template.loader import render_to_string
import os
from celery.exceptions import TimeLimitExceeded, SoftTimeLimitExceeded
from ipaddress import *
from os import fork,_exit
from sys import exit
import time
import redis

# Dedicated log file for Celery jobs; the directory comes from Django settings.
LOG_FILENAME = os.path.join(settings.LOG_FILE_LOCATION, 'celery_jobs.log')

# FORMAT = '%(asctime)s %(levelname)s: %(message)s'
# logging.basicConfig(format=FORMAT)
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')

# Module-level logger used by every task in this file: DEBUG and above,
# appended to LOG_FILENAME via a FileHandler.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler(LOG_FILENAME)
handler.setFormatter(formatter)
logger.addHandler(handler)

@shared_task(ignore_result=True, autoretry_for=(TimeoutError, TimeLimitExceeded, SoftTimeLimitExceeded), retry_backoff=True, retry_kwargs={'max_retries': settings.NETCONF_MAX_RETRY_BEFORE_ERROR})
def add(routepk, callback=None):
    """Install a new rule on the router via NETCONF and update its DB state.

    On success the route becomes ACTIVE and SNMP counters are initialised;
    on transient NETCONF failure the task retries (with backoff) up to
    NETCONF_MAX_RETRY_BEFORE_ERROR times, then marks the route ERROR.
    The outcome is announced on the user's notification stream.
    """
    from flowspec.models import Route
    route = Route.objects.get(pk=routepk)
    applier = PR.Applier(route_object=route)
    commit, response = applier.apply()
    if commit:
        route.status = "ACTIVE"
        # Initialise SNMP counters for the newly activated rule (synchronous call).
        #snmp_add_initial_zero_value.delay(str(route.id), True)
        snmp_add_initial_zero_value(str(route.id), True)
    else:
        # BUG FIX: consult this task's own retry counter; the original read
        # deactivate_route.request.retries (copy-paste error).
        if add.request.retries < settings.NETCONF_MAX_RETRY_BEFORE_ERROR:
            # Raise to trigger Celery's autoretry with backoff.
            raise TimeoutError()
        route.status = "ERROR"
    route.response = response
    route.save()
    announce("[%s] Rule add: %s - Result: %s" % (route.applier_username_nice, route.name, response), route.applier, route)


@shared_task(ignore_result=True, autoretry_for=(TimeoutError, TimeLimitExceeded, SoftTimeLimitExceeded), retry_backoff=True, retry_kwargs={'max_retries': settings.NETCONF_MAX_RETRY_BEFORE_ERROR})
def edit(routepk, callback=None):
    """Replace an existing rule on the router via NETCONF and update its DB state.

    On success the route becomes ACTIVE and SNMP counters are re-initialised;
    on persistent NETCONF failure the route is marked ERROR. The outcome is
    announced on the user's notification stream.
    """
    from flowspec.models import Route
    route = Route.objects.get(pk=routepk)
    status_pre = route.status
    logger.info("tasks::edit(): route="+str(route)+", status_pre="+str(status_pre))
    applier = PR.Applier(route_object=route)
    commit, response = applier.apply(operation="replace")
    if commit:
        route.status = "ACTIVE"
        try:
            # Re-initialise SNMP counters; a failure here must not abort the edit.
            #snmp_add_initial_zero_value.delay(str(route.id), True)
            snmp_add_initial_zero_value(str(route.id), True)
        except Exception as e:
            logger.error("tasks::edit(): route="+str(route)+", ACTIVE, add_initial_zero_value failed: "+str(e))
    else:
        # BUG FIX: consult this task's own retry counter (the original read
        # deactivate_route.request.retries), and TimeoutError is now listed in
        # autoretry_for so raising it actually retries instead of erroring out.
        if edit.request.retries < settings.NETCONF_MAX_RETRY_BEFORE_ERROR:
            # repeat the action
            raise TimeoutError()
        route.status = "ERROR"
    route.response = response
    route.save()
    announce("[%s] Rule edit: %s - Result: %s" % (route.applier_username_nice, route.name, response), route.applier, route)

@shared_task(ignore_result=True, autoretry_for=(TimeoutError, TimeLimitExceeded, SoftTimeLimitExceeded), retry_backoff=True, retry_kwargs={'max_retries': settings.NETCONF_MAX_RETRY_BEFORE_ERROR})
def deactivate_route(routepk, **kwargs):
    """Deactivate the Route in ACTIVE state. Permissions must be checked before this call.

    Removes the rule from the router via NETCONF. On success the route becomes
    INACTIVE; on persistent failure it becomes ERROR (or EXPIRED when called
    with reason='EXPIRED'). Transient failures retry via TimeoutError/autoretry.
    """
    from flowspec.models import Route
    route = Route.objects.get(pk=routepk)
    initial_status = route.status
    if initial_status not in ("ACTIVE", "PENDING", "ERROR"):
        logger.error("tasks::deactivate(): Cannot deactivate route that is not in ACTIVE or potential ACTIVE status.")
        return

    applier = PR.Applier(route_object=route)
    # Delete from router via NETCONF
    commit, response = applier.apply(operation="delete")
    reason_text = ''
    # BUG FIX: the log label said "tasks::delete()" although this is deactivation.
    logger.info("tasks::deactivate_route(): initial_status="+str(initial_status))
    if commit:
        # Write a null statistics sample now that the rule stopped collecting (best effort).
        try:
            snmp_add_initial_zero_value(str(route.id), False)
        except Exception as e:
            # BUG FIX: log label corrected (said "edit()").
            logger.error("deactivate_route(): route="+str(route)+", INACTIVE, add_null_value failed: "+str(e))

        announce("[%s] Suspending rule : %s%s- Result %s" % (route.applier_username_nice, route.name, reason_text, response), route.applier, route)
        # Note: the duplicate pre-try status assignment was removed; status is set once here.
        route.status = "INACTIVE"
        route.response = response
        route.save()
        route.commit_deactivate()
        return
    else: # removing rule in NETCONF failed, it is still ACTIVE and also collects statistics
        # NETCONF "delete" operation failed, keep the object in DB
        if deactivate_route.request.retries < settings.NETCONF_MAX_RETRY_BEFORE_ERROR:
            # repeat the action
            raise TimeoutError()
        else:
            if "reason" in kwargs and kwargs['reason'] == 'EXPIRED':
                status = 'EXPIRED'
                reason_text = " Reason: %s " % status
            else:
                status = "ERROR"
            route.status = status
            route.response = response
            route.save()
            announce("[%s] Suspending rule : %s%s- Result %s" % (route.applier_username_nice, route.name, reason_text, response), route.applier, route)

@shared_task(ignore_result=True, autoretry_for=(TimeoutError, TimeLimitExceeded, SoftTimeLimitExceeded), retry_backoff=True, retry_kwargs={'max_retries': settings.NETCONF_MAX_RETRY_BEFORE_ERROR})
def delete_route(routepk, **kwargs):
    """For Route in ACTIVE state, deactivate it at first. Finally, delete the Route from the DB. Permissions must be checked before this call."""
    from flowspec.models import Route
    route = Route.objects.get(pk=routepk)
    if route.status != "INACTIVE":
        logger.info("Deactivating active route...")
        # call deactivate_route() directly since we are already on background (celery task)
        try:
            deactivate_route(routepk)
        except TimeoutError:
            # deactivate_route signals a transient NETCONF failure this way; we
            # re-check the resulting status below and retry this task instead.
            pass
        # BUG FIX: deactivate_route() loaded and saved its own Route instance,
        # so the local object here is stale; reload it before checking status,
        # otherwise a successful deactivation still looked like a failure.
        route.refresh_from_db()
        if route.status != "INACTIVE" and delete_route.request.retries < settings.NETCONF_MAX_RETRY_BEFORE_ERROR:
            # Repeat due to error in deactivation
            route.status = "PENDING"
            route.save()
            logger.error("Deactivation failed, repeat the deletion process.")
            raise TimeoutError()

    if route.status == "INACTIVE":
        logger.info("Deleting inactive route...")
        route.delete()
        logger.info("Deleting finished.")
    else:
        route.status = "ERROR"
        route.save()
        logger.error("Deleting Route failed, it could not be deactivated - remaining in DB.")
    return

# May not work in the first place... proxy is not aware of Route models
@shared_task
def batch_delete(routes, **kwargs):
    """Remove a batch of routes from the router in one NETCONF commit.

    Marks each route PENDING, applies a combined delete configuration, then
    records the common outcome (INACTIVE, the caller-supplied reason, or
    ERROR) on every route and announces it. Returns False for an empty batch.
    """
    if not routes:
        return False

    for rt in routes:
        rt.status = 'PENDING'
        rt.save()

    applier = PR.Applier(route_objects=routes)
    conf = applier.delete_routes()
    commit, response = applier.apply(configuration=conf)

    reason_text = ''
    if not commit:
        status = "ERROR"
    else:
        status = "INACTIVE"
        if "reason" in kwargs:
            # Any explicit reason (EXPIRED or otherwise) becomes the new status.
            status = kwargs['reason']
            reason_text = " Reason: %s " % status

    for rt in routes:
        rt.status = status
        rt.response = response
        rt.expires = datetime.date.today()
        rt.save()
        announce("[%s] Rule removal: %s%s- Result %s" % (rt.applier_username_nice, rt.name, reason_text, response), rt.applier, rt)


@shared_task(ignore_result=True)
def announce(messg, user, route):
    """Push a notification message onto the Redis stream of the relevant recipient.

    If the route's destination lies inside one of the user's peer networks,
    the peer tag is used as the stream owner instead of the username.
    """
    username = user.username
    target = ip_network(route.destination)
    # Look for a peer network containing the target; python3.6 has no
    # subnet_of(), so the range boundaries are compared directly.
    for peer in user.userprofile.peers.all():
        for network in peer.networks.all():
            candidate = ip_network(network)
            logger.info("ANNOUNCE check ip " + str(target) + str(type(target)) + " in net " + str(candidate) + str(type(candidate)))
            if candidate.network_address <= target.network_address and target.broadcast_address <= candidate.broadcast_address:
                username = peer.peer_tag
                logger.info("ANNOUNCE found peer " + str(username))
                break

    text = str(messg)
    logger.info("ANNOUNCE " + text)
    client = redis.StrictRedis()
    stream_key = "notifstream_%s" % username
    entry = {"m": text, "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
    logger.info("ANNOUNCE " + str(entry))
    entry_id = client.xadd(stream_key, entry, maxlen=settings.NOTIF_STREAM_MAXSIZE, approximate=False)
    logger.info("ANNOUNCE key " + stream_key + " with lastid " + entry_id.decode("utf-8"))
    client.expire(stream_key, settings.NOTIF_STREAM_MAXLIFE)

@shared_task(ignore_result=True,default_retry_delay=5,max_retries=2,autoretry_for=(TimeLimitExceeded, SoftTimeLimitExceeded))
def check_sync(route_name=None, selected_routes=None):
    """Periodic task: expire overdue routes and re-sync the rest with the router.

    route_name: optionally restrict the check to one route by name.
    selected_routes: optional queryset/iterable of routes; defaults to all.
    """
    from flowspec.models import Route
    # BUG FIX: the default was a mutable [] (shared across calls); use None.
    if not selected_routes:
        routes = Route.objects.all()
    else:
        routes = selected_routes
    if route_name:
        routes = routes.filter(name=route_name)
    # Statuses that are already inactive-like and need no expiry handling.
    inactive_like = ('EXPIRED', 'ADMININACTIVE', 'INACTIVE', 'INACTIVE_TODELETE', 'PENDING_TODELETE')
    for route in routes:
        if route.has_expired() and route.status not in inactive_like:
            if route.status != 'ERROR':
                logger.info('Expiring %s route %s' %(route.status, route.name))
                # BUG FIX: the original scheduled subtask(delete), but `delete`
                # no longer exists after the deactivate/delete rework (NameError).
                # Deactivate by pk with the EXPIRED reason instead.
                deactivate_route.delay(route.pk, reason="EXPIRED")
        else:
            if route.status != 'EXPIRED':
                route.check_sync()


@shared_task(ignore_result=True)
def notify_expired():
    """Mail the applier of every active rule expiring within EXPIRATION_NOTIFY_DAYS."""
    from flowspec.models import Route
    from django.contrib.sites.models import Site
    logger.info('Initializing expiration notification')
    skip_states = ['EXPIRED', 'ADMININACTIVE', 'INACTIVE', 'INACTIVE_TODELETE', 'PENDING_TODELETE', 'ERROR']
    for route in Route.objects.all():
        if route.status in skip_states:
            continue
        expiration_days = (route.expires - datetime.date.today()).days
        if expiration_days >= settings.EXPIRATION_NOTIFY_DAYS:
            continue
        try:
            fqdn = Site.objects.get_current().domain
            admin_url = "https://%s%s" % (fqdn, "/edit/%s" % route.name)
            mail_body = render_to_string("rule_action.txt",
                                         {"route": route, 'expiration_days': expiration_days, 'action': 'expires', 'url': admin_url})
            # Wording depends on how close the expiry is.
            if expiration_days == 0:
                days_num = ' today'
                expiration_days_text = ''
            elif expiration_days == 1:
                days_num = ' day'
                expiration_days_text = "%s %s" % ('in', expiration_days)
            else:
                days_num = ' days'
                expiration_days_text = "%s %s" % ('in', expiration_days)
            logger.info('Route %s expires %s%s. Notifying %s (%s)' % (route.name, expiration_days_text, days_num, route.applier.username, route.applier.email))
            send_mail(settings.EMAIL_SUBJECT_PREFIX + "Rule %s expires %s%s" % (route.name, expiration_days_text, days_num),
                      mail_body, settings.SERVER_EMAIL,
                      [route.applier.email])
        except Exception as e:
            # Best effort: a failure for one route must not stop the sweep.
            logger.info("Exception: %s" % e)
    logger.info('Expiration notification process finished')

##############################################################################
##############################################################################
# snmp task handling (including helper functions)

import os
import signal

def handleSIGCHLD(signal, frame):
  """SIGCHLD handler: reap finished forked SNMP children so they do not linger as zombies."""
  logger.info("handleSIGCHLD(): reaping childs")
  # BUG FIX: a single waitpid() reaps at most one child and raises
  # ChildProcessError when no children exist; loop until nothing is left.
  try:
    while True:
      pid, _status = os.waitpid(-1, os.WNOHANG)
      if pid == 0:
        break
  except ChildProcessError:
    pass

def snmp_lock_create(wait=0):
    """Acquire the SNMP polling lock by creating the lock directory (mkdir is atomic).

    wait: falsy -> a single attempt; truthy -> block, retrying once per second,
    until the lock is obtained.
    Returns 1 on success, 0 on failure (only reachable when wait is falsy).
    """
    while True:
        try:
            os.mkdir(settings.SNMP_POLL_LOCK)
            # BUG FIX: success was logged at ERROR level.
            logger.info("snmp_lock_create(): creating lock dir succeeded")
            return 1
        except OSError as e:
            # FileExistsError (lock already held) and all other mkdir failures
            # are OSError subclasses, so one handler suffices; the original's
            # extra `except Exception` branch was unreachable.
            logger.error("snmp_lock_create(): creating lock dir failed: OSError: "+str(e))
        if not wait:
            return 0
        time.sleep(1)

def snmp_lock_remove():
    """Release the SNMP polling lock by removing the lock directory; failures are only logged."""
    try:
        os.rmdir(settings.SNMP_POLL_LOCK)
    except Exception as exc:
        logger.info("snmp_lock_remove(): failed " + str(exc))

def exit_process():
    """Terminate the current (forked) child process immediately.

    BUG FIX: the original chained exit(), sys.exit() and os._exit() with log
    lines in between — everything after the first exit() was unreachable, and
    os._exit() was called without its mandatory status argument. In a child
    forked from a Celery worker, SystemExit (raised by exit()/sys.exit()) can
    be caught by the surrounding worker machinery, leaving a duplicate worker
    alive; os._exit() terminates without unwinding the stack, which is the
    correct way out of a forked child.
    """
    pid = os.getpid()
    logger.info("exit_process(): before exit in child process (pid="+str(pid)+")")
    os._exit(0)

#@shared_task(ignore_result=True, time_limit=580, soft_time_limit=550)
@shared_task(ignore_result=True, max_retries=0)
def poll_snmp_statistics():
    """Poll SNMP statistics in a forked child process, guarded by a filesystem lock.

    The parent returns immediately after forking; the child performs the poll,
    releases the lock and terminates. Only one poller can run at a time.
    """
    from flowspec import snmpstats

    # Bail out if another poller currently holds the lock.
    if not snmp_lock_create(0):
      return

    # Reap the child on termination so it does not become a zombie.
    signal.signal(signal.SIGCHLD, handleSIGCHLD)

    pid = os.getpid()
    logger.info("poll_snmp_statistics(): before fork (pid="+str(pid)+")")
    # NOTE: os.fork() raises OSError on failure — it never returns -1 in
    # Python, so the original `npid == -1` branch was dead code. If the fork
    # fails, release the lock before propagating, otherwise it leaks forever.
    try:
      npid = os.fork()
    except OSError:
      snmp_lock_remove()
      raise

    if npid > 0:
      logger.info("poll_snmp_statistics(): returning in parent process (pid="+str(pid)+", npid="+str(npid)+")")
      return

    # Child process: poll, release the lock, and terminate with os._exit(0).
    # BUG FIX: the original used exit()/sys.exit()/os._exit() in sequence —
    # everything after the first exit() was unreachable and os._exit() lacked
    # its required argument. SystemExit could be caught by the Celery worker
    # we forked from, leaving a duplicate worker; os._exit() cannot.
    logger.info("poll_snmp_statistics(): in child process (pid="+str(pid)+", npid="+str(npid)+")")
    try:
      snmpstats.poll_snmp_statistics()
    except Exception as e:
      logger.error("poll_snmp_statistics(): exception occured in snmp poll (pid="+str(pid)+", npid="+str(npid)+"): "+str(e))
    finally:
      snmp_lock_remove()
      os._exit(0)

@shared_task(ignore_result=True, max_retries=0)
def snmp_add_initial_zero_value(rule_id, zero_or_null=True):
    """Record an initial statistics sample for a rule.

    rule_id: the rule's id as a string.
    zero_or_null: True writes a zero sample (rule activated), False a null
    sample (rule deactivated) — semantics per snmpstats.add_initial_zero_value.
    """
    from flowspec import snmpstats

    # Fork-based variant kept for debugging only; the direct call is the default.
    use_fork = False

    if not use_fork:
      snmpstats.add_initial_zero_value(rule_id, zero_or_null)
      return

    # Reap the child on termination so it does not become a zombie.
    signal.signal(signal.SIGCHLD, handleSIGCHLD)

    pid = os.getpid()
    logger.info("snmp_add_initial_zero_value(): before fork (pid="+str(pid)+" rule_id="+str(rule_id)+","+str(zero_or_null)+")")
    npid = os.fork()
    if npid > 0:
      logger.info("snmp_add_initial_zero_value(): returning in parent process (pid="+str(pid)+", npid="+str(npid)+")")
      return

    # Child process.
    logger.info("snmp_add_initial_zero_value(): in child process (pid="+str(pid)+", npid="+str(npid)+")")
    try:
      snmpstats.add_initial_zero_value(rule_id, zero_or_null)
      # BUG FIX: log typo corrected ("sucesss").
      logger.info("snmp_add_initial_zero_value(): rule_id="+str(rule_id)+","+str(zero_or_null)+" success")
    except Exception as e:
      logger.error("snmp_add_initial_zero_value(): rule_id="+str(rule_id)+","+str(zero_or_null)+" failed: "+str(e))
    finally:
      # BUG FIX: the original exit chain was dead code after the first exit(),
      # referenced `sys` which is not bound in this module (only
      # `from sys import exit` is done), and called os._exit() without its
      # mandatory status argument. A forked child must leave via os._exit()
      # so SystemExit cannot be caught by the Celery worker it forked from.
      os._exit(0)


@pytest.mark.skip
@shared_task(ignore_result=True,default_retry_delay=5,max_retries=2,autoretry_for=(TimeoutError,))
def testcelerytask():
    """Smoke-test task for the Celery retry machinery.

    Uses a lock directory under /tmp: if it does not exist, the task creates
    it and returns; if it already exists, the task removes it and raises
    TimeoutError so Celery retries (at most twice, 5 s apart). The pytest
    skip marker keeps test collection from picking this up as a unit test.
    """
    lockname = "/tmp/testlock"
    try:
        os.mkdir(lockname)
        logger.info("testcelerytask: Do something and return")
        return
    except FileExistsError:
        # BUG FIX: log message typo corrected ("SKipping" -> "Skipping").
        logger.info("testcelerytask: Skipping, raising exception for repeating")
        os.rmdir(lockname)
        raise TimeoutError