# Skip to content
# Snippets Groups Projects
# tasks.py 18.37 KiB
# -*- coding: utf-8 -*- vim:fileencoding=utf-8:
# vim: tabstop=4:shiftwidth=4:softtabstop=4:expandtab

# Copyright (C) 2010-2014 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import pytest
from utils import proxy as PR
from celery import shared_task, subtask
import logging
import json
from django.conf import settings
import datetime
from django.core.mail import send_mail
from django.template.loader import render_to_string
import os
from celery.exceptions import TimeLimitExceeded, SoftTimeLimitExceeded
from ipaddress import *
from os import fork,_exit
from sys import exit
import time
import redis

# Path of the log file used by this module: 'celery_jobs.log' inside the
# directory configured as settings.LOG_FILE_LOCATION.
LOG_FILENAME = os.path.join(settings.LOG_FILE_LOCATION, 'celery_jobs.log')

# FORMAT = '%(asctime)s %(levelname)s: %(message)s'
# logging.basicConfig(format=FORMAT)
# Record layout: timestamp, level name, then the message text.
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')

# Module-level logger used by every task below. It accepts records from DEBUG
# upwards and writes them to LOG_FILENAME through a FileHandler that is
# attached when this module is imported.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler(LOG_FILENAME)
handler.setFormatter(formatter)
logger.addHandler(handler)

@shared_task(ignore_result=True, autoretry_for=(TimeLimitExceeded, SoftTimeLimitExceeded), retry_backoff=True)
def add(routepk, callback=None):
    """Apply a route through the proxy applier and record the outcome.

    The route is loaded by primary key, handed to ``PR.Applier`` and, depending
    on the commit flag returned by ``apply()``, stored as ``ACTIVE`` or
    ``ERROR`` together with the applier's response.  Every outcome is announced
    to the applier via ``announce``.

    Args:
        routepk: primary key of the ``Route`` to apply.
        callback: accepted for compatibility; not used by this function.
    """
    from flowspec.models import Route
    route = Route.objects.get(pk=routepk)
    try:
        applier = PR.Applier(route_object=route)
        commit, response = applier.apply()
        if commit:
            status = "ACTIVE"
            # The statistics bookkeeping is called synchronously (not via
            # .delay()).  A failure here must not turn a successfully
            # committed rule into ERROR, so it is only logged -- the same way
            # edit() treats this call.
            try:
                snmp_add_initial_zero_value(str(route.id), True)
            except Exception as e:
                logger.error("tasks::add(): route="+str(route)+", ACTIVE, add_initial_zero_value failed: "+str(e))
        else:
            status = "ERROR"
        route.status = status
        route.response = response
        route.save()
        announce("[%s] Rule add: %s - Result: %s" % (route.applier_username_nice, route.name, response), route.applier, route)
    except (TimeLimitExceeded, SoftTimeLimitExceeded):
        # Both hard and soft time limits are reported identically.
        route.status = "ERROR"
        route.response = "Task timeout"
        route.save()
        announce("[%s] Rule add: %s - Result: %s" % (route.applier_username_nice, route.name, route.response), route.applier, route)
    except Exception as e:
        # Keep the cause in the log; the user only sees the generic "Error".
        logger.error("tasks::add(): route="+str(route)+", got unexpected exception="+str(e))
        route.status = "ERROR"
        route.response = "Error"
        route.save()
        announce("[%s] Rule add: %s - Result: %s" % (route.applier_username_nice, route.name, route.response), route.applier, route)


@shared_task(ignore_result=True, autoretry_for=(TimeLimitExceeded, SoftTimeLimitExceeded), retry_backoff=True)
def edit(routepk, callback=None):
    """Replace an existing route via the proxy applier and store the result.

    The route is loaded by primary key and applied with
    ``operation="replace"``.  A successful commit marks the route ``ACTIVE``
    (and triggers the SNMP zero-value bookkeeping, whose failures are only
    logged); an unsuccessful one marks it ``ERROR``.  Timeouts and any other
    exception also end in ``ERROR``.  Every outcome is announced.

    Args:
        routepk: primary key of the ``Route`` to re-apply.
        callback: accepted for compatibility; not used by this function.
    """
    from flowspec.models import Route
    route = Route.objects.get(pk=routepk)

    def _record_failure(failure_text, error=None):
        # Common tail of all failure paths: persist ERROR, optionally log the
        # exception text, then notify the applier.
        route.status = "ERROR"
        route.response = failure_text
        route.save()
        if error is not None:
            logger.error(str(error))
        announce("[%s] Rule edit: %s - Result: %s" % (route.applier_username_nice, route.name, route.response), route.applier, route)

    try:
        status_pre = route.status
        logger.info("tasks::edit(): route=" + str(route) + ", status_pre=" + str(status_pre))
        commit, response = PR.Applier(route_object=route).apply(operation="replace")
        if not commit:
            status = "ERROR"
        else:
            status = "ACTIVE"
            try:
                # Called synchronously rather than through .delay().
                snmp_add_initial_zero_value(str(route.id), True)
            except Exception as snmp_error:
                logger.error("tasks::edit(): route=" + str(route) + ", ACTIVE, add_initial_zero_value failed: " + str(snmp_error))
        route.status = status
        route.response = response
        route.save()
        announce("[%s] Rule edit: %s - Result: %s" % (route.applier_username_nice, route.name, response), route.applier, route)
    except (TimeLimitExceeded, SoftTimeLimitExceeded):
        _record_failure("Task timeout")
    except Exception as e:
        _record_failure("Error", e)


@shared_task(ignore_result=True, autoretry_for=(TimeLimitExceeded, SoftTimeLimitExceeded), retry_backoff=True)
def delete(routepk, **kwargs):
    """Remove a route through the proxy applier and update or delete its DB row.

    The route is loaded by primary key and applied with
    ``operation="delete"``.

    * Commit succeeded and the route was ``PENDING_TODELETE`` when the task
      started: the outcome is announced and the row is deleted.
    * Commit succeeded otherwise: the route is stored as ``INACTIVE``.
    * Commit failed: the route is kept and stored as ``EXPIRED`` when
      ``reason="EXPIRED"`` was passed, else as ``ERROR``.
    * Timeouts and other exceptions: the route is stored as ``ERROR``.

    Args:
        routepk: primary key of the ``Route`` to remove.
        **kwargs: only ``reason`` is inspected (see above).
    """
    from flowspec.models import Route
    route = Route.objects.get(pk=routepk)
    initial_status = route.status
    try:
        applier = PR.Applier(route_object=route)
        commit, response = applier.apply(operation="delete")
        reason_text = ''
        logger.info("tasks::delete(): initial_status="+str(initial_status))
        if commit:
            # PENDING_TODELETE is the special case of fully deleting a rule
            # via the REST API (only for users/admins authorized by special
            # settings); any other status keeps the row in the DB.
            fully_delete = initial_status == "PENDING_TODELETE"
            route.status = "INACTIVE"
            if fully_delete:
                msg1 = "[%s] Fully deleted route : %s%s- Result %s" % (route.applier, route.name, reason_text, response)
                logger.info("tasks::delete(): FULLY DELETED msg="+msg1)
            else:
                msg1 = "[%s] Deleted route : %s%s- Result %s" % (route.applier, route.name, reason_text, response)
                logger.info("tasks::delete(): DELETED msg="+msg1)
            announce(msg1, route.applier, route)
            try:
                snmp_add_initial_zero_value(str(route.id), False)
            except Exception as e:
                # Statistics bookkeeping is best effort; the removal itself
                # already succeeded.
                logger.error("tasks::delete(): route="+str(route)+", INACTIVE, add_null_value failed: "+str(e))
            if fully_delete:
                route.delete()
            else:
                route.response = response
                route.save()
            return

        # The "delete" operation failed: keep the object in the DB.
        if "reason" in kwargs and kwargs['reason'] == 'EXPIRED':
            status = 'EXPIRED'
            reason_text = " Reason: %s " % status
        else:
            status = "ERROR"
        route.status = status
        route.response = response
        route.save()
        announce("[%s] Suspending rule : %s%s- Result %s" % (route.applier_username_nice, route.name, reason_text, response), route.applier, route)
    except (TimeLimitExceeded, SoftTimeLimitExceeded):
        # Both hard and soft time limits are reported identically.
        route.status = "ERROR"
        route.response = "Task timeout"
        route.save()
        announce("[%s] Suspending rule : %s - Result: %s" % (route.applier_username_nice, route.name, route.response), route.applier, route)
    except Exception as e:
        logger.error("tasks::delete(): route="+str(route)+", got unexpected exception="+str(e))
        route.status = "ERROR"
        route.response = "Error"
        route.save()
        announce("[%s] Suspending rule : %s - Result: %s" % (route.applier_username_nice, route.name, route.response), route.applier, route)


# May not work in the first place... proxy is not aware of Route models
@shared_task
def batch_delete(routes, **kwargs):
    """Remove several routes with a single applier call.

    Every route is first stored as ``PENDING``.  The applier then builds one
    configuration with ``delete_routes()`` and applies it.  On a successful
    commit the routes become ``INACTIVE``, or take the value of the ``reason``
    keyword argument when one is given; otherwise they become ``ERROR``.  Each
    route gets today's date as ``expires`` and is announced individually.

    Args:
        routes: iterable of route objects; an empty/false value short-circuits.
        **kwargs: only ``reason`` is inspected.

    Returns:
        ``False`` when ``routes`` is empty, otherwise ``None``.
    """
    if not routes:
        return False

    for pending_route in routes:
        pending_route.status = 'PENDING'
        pending_route.save()

    applier = PR.Applier(route_objects=routes)
    conf = applier.delete_routes()
    commit, response = applier.apply(configuration=conf)

    reason_text = ''
    if not commit:
        status = "ERROR"
    else:
        status = "INACTIVE"
        if "reason" in kwargs:
            reason = kwargs['reason']
            status = 'EXPIRED' if reason == 'EXPIRED' else reason
            reason_text = " Reason: %s " % status

    for route in routes:
        route.status = status
        route.response = response
        route.expires = datetime.date.today()
        route.save()
        announce("[%s] Rule removal: %s%s- Result %s" % (route.applier_username_nice, route.name, reason_text, response), route.applier, route)


@shared_task(ignore_result=True)
def announce(messg, user, route):
    """Publish a notification message to a Redis stream.

    The stream key is ``notifstream_<name>``.  ``<name>`` defaults to the
    user's username and is replaced by a peer's ``peer_tag`` when the route's
    destination network lies inside one of that peer's networks.  The stream
    is capped at ``settings.NOTIF_STREAM_MAXSIZE`` entries and expires after
    ``settings.NOTIF_STREAM_MAXLIFE``.

    Args:
        messg: message to publish; converted with ``str()``.
        user: user whose profile peers are inspected.
        route: route whose ``destination`` is matched against peer networks.
    """
    peers = user.userprofile.peers.all()
    username = user.username
    tgt_net = ip_network(route.destination)
    for peer in peers:
        for network in peer.networks.all():
            net = ip_network(network)
            logger.info("ANNOUNCE check ip " + str(tgt_net) + str(type(tgt_net)) + " in net " + str(net) + str(type(net)))
            # Ordering comparisons between IPv4 and IPv6 addresses raise
            # TypeError, and a network of the other family can never contain
            # the target anyway.
            if net.version != tgt_net.version:
                continue
            # check if the target is a subnet of peer range (python3.6 doesn't have subnet_of())
            if tgt_net.network_address >= net.network_address and tgt_net.broadcast_address <= net.broadcast_address:
                username = peer.peer_tag
                logger.info("ANNOUNCE found peer " + str(username))
                # NOTE(review): this leaves only the inner loop, so a later
                # matching peer overrides this one -- confirm that is intended.
                break

    messg = str(messg)
    logger.info("ANNOUNCE " + messg)
    r = redis.StrictRedis()
    key = "notifstream_%s" % username
    obj = {"m": messg, "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
    logger.info("ANNOUNCE " + str(obj))
    lastid = r.xadd(key, obj, maxlen=settings.NOTIF_STREAM_MAXSIZE, approximate=False)
    logger.info("ANNOUNCE key " + key + " with lastid " + lastid.decode("utf-8"))
    r.expire(key, settings.NOTIF_STREAM_MAXLIFE)

@shared_task(ignore_result=True,default_retry_delay=5,max_retries=2,autoretry_for=(TimeLimitExceeded, SoftTimeLimitExceeded))
def check_sync(route_name=None, selected_routes=None):
    """Expire overdue routes and run the sync check on the remaining ones.

    Args:
        route_name: when given, restricts the routes with
            ``.filter(name=route_name)``; the route collection must therefore
            support ``.filter`` in that case.
        selected_routes: routes to inspect; all ``Route`` objects are used
            when this is empty or ``None``.
    """
    from flowspec.models import Route
    if not selected_routes:
        routes = Route.objects.all()
    else:
        routes = selected_routes
    if route_name:
        routes = routes.filter(name=route_name)

    # Statuses for which an expired route is left alone.
    not_expirable = ('EXPIRED', 'ADMININACTIVE', 'INACTIVE', 'INACTIVE_TODELETE', 'PENDING_TODELETE')
    for route in routes:
        if route.has_expired() and route.status not in not_expirable:
            if route.status != 'ERROR':
                logger.info('Expiring %s route %s' %(route.status, route.name))
                # delete() looks the route up with Route.objects.get(pk=...),
                # so it must be given the primary key, not the model instance.
                subtask(delete).delay(route.pk, reason="EXPIRED")
        else:
            if route.status != 'EXPIRED':
                route.check_sync()


@shared_task(ignore_result=True)
def notify_expired():
    """E-mail appliers whose routes are about to expire.

    Routes in one of the skipped statuses are ignored.  For the others, a mail
    rendered from ``rule_action.txt`` is sent to the applier when the number
    of days until ``route.expires`` is below
    ``settings.EXPIRATION_NOTIFY_DAYS``.  Failures while building or sending a
    mail are logged and do not stop the loop.
    """
    from flowspec.models import Route
    from django.contrib.sites.models import Site
    logger.info('Initializing expiration notification')
    skipped_statuses = ['EXPIRED', 'ADMININACTIVE', 'INACTIVE', 'INACTIVE_TODELETE', 'PENDING_TODELETE', 'ERROR']
    for route in Route.objects.all():
        if route.status in skipped_statuses:
            continue
        expiration_days = (route.expires - datetime.date.today()).days
        if expiration_days >= settings.EXPIRATION_NOTIFY_DAYS:
            continue
        try:
            fqdn = Site.objects.get_current().domain
            admin_url = "https://%s%s" % (fqdn, "/edit/%s" % route.name)
            template_context = {
                "route": route,
                'expiration_days': expiration_days,
                'action': 'expires',
                'url': admin_url,
            }
            mail_body = render_to_string("rule_action.txt", template_context)
            # Wording: "expires today", "expires in 1 day", "expires in N days".
            if expiration_days == 0:
                expiration_days_text = ''
                days_num = ' today'
            else:
                expiration_days_text = "%s %s" % ('in', expiration_days)
                days_num = ' day' if expiration_days == 1 else ' days'
            logger.info('Route %s expires %s%s. Notifying %s (%s)' % (route.name, expiration_days_text, days_num, route.applier.username, route.applier.email))
            subject = settings.EMAIL_SUBJECT_PREFIX + "Rule %s expires %s%s" % (route.name, expiration_days_text, days_num)
            send_mail(subject, mail_body, settings.SERVER_EMAIL, [route.applier.email])
        except Exception as e:
            logger.info("Exception: %s" % e)
    logger.info('Expiration notification process finished')

##############################################################################
##############################################################################
# snmp task handling (including helper functions)

import os
import signal

def handleSIGCHLD(signal, frame):
    """SIGCHLD handler: reap every child process that has already exited.

    Args:
        signal: signal number passed by the interpreter (unused).
        frame: current stack frame passed by the interpreter (unused).
    """
    logger.info("handleSIGCHLD(): reaping childs")
    # Several exits may be reported by one SIGCHLD delivery, so keep reaping
    # until nothing is left.  WNOHANG keeps the handler from blocking.
    while True:
        try:
            reaped_pid, _status = os.waitpid(-1, os.WNOHANG)
        except ChildProcessError:
            # No child processes at all; nothing to reap.
            break
        if reaped_pid == 0:
            # Children exist, but none has exited yet.
            break

def snmp_lock_create(wait=0):
    """Acquire the SNMP poll lock by creating ``settings.SNMP_POLL_LOCK``.

    The lock is a directory: creating it succeeds for exactly one caller.

    Args:
        wait: when true, retry once per second until the directory could be
            created; when false, try exactly once.

    Returns:
        ``1`` if the lock directory was created, ``0`` otherwise.
    """
    while True:
        try:
            os.mkdir(settings.SNMP_POLL_LOCK)
        except FileExistsError as e:
            # Normal contention: somebody else holds the lock.
            logger.info("snmp_lock_create(): Lock already exists")
            logger.info("snmp_lock_create(): creating lock dir failed: OSError: "+str(e))
        except OSError as e:
            logger.error("snmp_lock_create(): creating lock dir failed: OSError: "+str(e))
        except Exception as e:
            logger.error("snmp_lock_create(): creating lock dir failed: "+str(e))
        else:
            logger.info("snmp_lock_create(): creating lock dir succeeded")
            return 1
        if not wait:
            return 0
        time.sleep(1)

def snmp_lock_remove():
    """Release the SNMP poll lock by removing ``settings.SNMP_POLL_LOCK``.

    Any failure is logged at INFO level and otherwise ignored.
    """
    try:
        lock_dir = settings.SNMP_POLL_LOCK
        os.rmdir(lock_dir)
    except Exception as err:
        logger.info("snmp_lock_remove(): failed " + str(err))

def exit_process():
    """Log the current pid and terminate the process by raising ``SystemExit``.

    Raises:
        SystemExit: always, via ``sys.exit()``.
    """
    import sys
    pid = os.getpid()
    logger.info("exit_process(): before exit in child process (pid="+str(pid)+")")
    # sys.exit() raises SystemExit, so nothing after this line can run.
    sys.exit()

#@shared_task(ignore_result=True, time_limit=580, soft_time_limit=550)
@shared_task(ignore_result=True, max_retries=0)
def poll_snmp_statistics():
    """Run ``snmpstats.poll_snmp_statistics()`` in a forked child process.

    The task returns immediately when the SNMP poll lock cannot be acquired.
    Otherwise it installs the SIGCHLD handler and forks: the parent returns
    right away, the child runs the poll, releases the lock and exits.

    Raises:
        Exception: whatever signal setup or ``os.fork()`` raised; the lock is
            released before the exception is propagated.
    """
    from flowspec import snmpstats

    if not snmp_lock_create(0):
        return

    try:
        signal.signal(signal.SIGCHLD, handleSIGCHLD)

        pid = os.getpid()
        logger.info("poll_snmp_statistics(): before fork (pid="+str(pid)+")")
        # os.fork() reports failure by raising OSError; it never returns -1.
        npid = os.fork()
    except Exception:
        # No child will ever release the lock, so do it here; otherwise every
        # later poll would be skipped.
        snmp_lock_remove()
        raise

    if npid > 0:
        logger.info("poll_snmp_statistics(): returning in parent process (pid="+str(pid)+", npid="+str(npid)+")")
        return

    # Child process.  Note that pid still holds the value read before the fork.
    logger.info("poll_snmp_statistics(): in child process (pid="+str(pid)+", npid="+str(npid)+")")
    try:
        snmpstats.poll_snmp_statistics()
    except Exception as e:
        logger.error("poll_snmp_statistics(): exception occured in snmp poll (pid="+str(pid)+", npid="+str(npid)+"): "+str(e))
    finally:
        snmp_lock_remove()
    logger.info("exit_process(): before exit in child process (pid="+str(pid)+", npid="+str(npid)+")")
    # Terminates the child by raising SystemExit; nothing after this can run.
    exit()

@shared_task(ignore_result=True, max_retries=0)
def snmp_add_initial_zero_value(rule_id, zero_or_null=True):
    """Forward to ``snmpstats.add_initial_zero_value(rule_id, zero_or_null)``.

    With the local ``use_fork`` switch off (the current setting) the call is
    made directly and its exceptions propagate to the caller.  With the switch
    on, the call is made in a forked child that logs failures and then exits.

    Args:
        rule_id: identifier of the rule, passed through unchanged.
        zero_or_null: flag passed through unchanged.
    """
    from flowspec import snmpstats

    # Forking is currently disabled; the forked variant is kept below.
    use_fork = False

    if not use_fork:
        snmpstats.add_initial_zero_value(rule_id, zero_or_null)
        return

    signal.signal(signal.SIGCHLD, handleSIGCHLD)

    pid = os.getpid()
    logger.info("snmp_add_initial_zero_value(): before fork (pid="+str(pid)+" rule_id="+str(rule_id)+","+str(zero_or_null)+")")
    # os.fork() reports failure by raising OSError; it never returns -1.
    npid = os.fork()
    if npid > 0:
        logger.info("snmp_add_initial_zero_value(): returning in parent process (pid="+str(pid)+", npid="+str(npid)+")")
        return

    # Child process.  Note that pid still holds the value read before the fork.
    logger.info("snmp_add_initial_zero_value(): in child process (pid="+str(pid)+", npid="+str(npid)+")")
    try:
        snmpstats.add_initial_zero_value(rule_id, zero_or_null)
        logger.info("snmp_add_initial_zero_value(): rule_id="+str(rule_id)+","+str(zero_or_null)+" sucesss")
    except Exception as e:
        logger.error("snmp_add_initial_zero_value(): rule_id="+str(rule_id)+","+str(zero_or_null)+" failed: "+str(e))

    logger.info("exit_process(): before exit in child process (pid="+str(pid)+", npid="+str(npid)+")")
    # Terminates the child by raising SystemExit; nothing after this can run.
    exit()


@pytest.mark.skip
@shared_task(ignore_result=True,default_retry_delay=5,max_retries=2,autoretry_for=(TimeoutError,))
def testcelerytask():
    """Exercise Celery's automatic retry using a lock directory in /tmp.

    If ``/tmp/testlock`` can be created the task logs and returns.  If it
    already exists, the directory is removed and ``TimeoutError`` is raised,
    which is listed in ``autoretry_for``.

    Raises:
        TimeoutError: when the lock directory already existed.
    """
    lock_dir = "/tmp/testlock"
    try:
        os.mkdir(lock_dir)
    except FileExistsError:
        logger.info("testcelerytask: SKipping, raising exception for repeating")
        os.rmdir(lock_dir)
        raise TimeoutError
    else:
        logger.info("testcelerytask: Do something and return")
        return