-
Tomáš Čejka authored
Added new url for deactivate to split the functionality of deactivation and deletion. Delete process is allowed only for enabled users, the process includes deactivation as well. Reworked "retry" of the Celery tasks and added new settings variable NETCONF_MAX_RETRY_BEFORE_ERROR.
Tomáš Čejka authoredAdded new url for deactivate to split the functionality of deactivation and deletion. Delete process is allowed only for enabled users, the process includes deactivation as well. Reworked "retry" of the Celery tasks and added new settings variable NETCONF_MAX_RETRY_BEFORE_ERROR.
tasks.py 17.40 KiB
# -*- coding: utf-8 -*- vim:fileencoding=utf-8:
# vim: tabstop=4:shiftwidth=4:softtabstop=4:expandtab
# Copyright (C) 2010-2014 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import pytest
from utils import proxy as PR
from celery import shared_task, subtask
import logging
import json
from django.conf import settings
import datetime
from django.core.mail import send_mail
from django.template.loader import render_to_string
import os
from celery.exceptions import TimeLimitExceeded, SoftTimeLimitExceeded
from ipaddress import *
from os import fork,_exit
from sys import exit
import time
import redis
# Path of the log file used by this module: "celery_jobs.log" inside
# settings.LOG_FILE_LOCATION.
LOG_FILENAME = os.path.join(settings.LOG_FILE_LOCATION, 'celery_jobs.log')
# FORMAT = '%(asctime)s %(levelname)s: %(message)s'
# logging.basicConfig(format=FORMAT)
# Each record is rendered as "<timestamp> <level>: <message>".
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
# Module-level logger shared by every function below; it accepts DEBUG and
# above and writes through a FileHandler attached at import time.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler(LOG_FILENAME)
handler.setFormatter(formatter)
logger.addHandler(handler)
@shared_task(ignore_result=True, autoretry_for=(TimeoutError, TimeLimitExceeded, SoftTimeLimitExceeded), retry_backoff=True, retry_kwargs={'max_retries': settings.NETCONF_MAX_RETRY_BEFORE_ERROR})
def add(routepk, callback=None):
    """Apply the Route with primary key *routepk* and record the outcome.

    The route is pushed through ``PR.Applier(...).apply()``.  On a successful
    commit the route becomes ``ACTIVE`` and its initial SNMP zero value is
    written.  On failure the task raises ``TimeoutError`` (listed in
    ``autoretry_for``) to request another attempt until
    ``settings.NETCONF_MAX_RETRY_BEFORE_ERROR`` retries have been used; after
    that the route is marked ``ERROR``.  In both final cases the applier
    response is stored on the route and a notification is announced.

    Args:
        routepk: primary key of the ``Route`` to apply.
        callback: accepted for interface compatibility; not used here.

    Raises:
        TimeoutError: the commit failed and retries are still available.
    """
    from flowspec.models import Route
    route = Route.objects.get(pk=routepk)
    applier = PR.Applier(route_object=route)
    commit, response = applier.apply()
    if commit:
        route.status = "ACTIVE"
        # Called synchronously on purpose (not via .delay()).
        snmp_add_initial_zero_value(str(route.id), True)
    else:
        # Use THIS task's retry counter.  Reading another task's request
        # (e.g. deactivate_route.request) yields a counter unrelated to the
        # current execution, so the ERROR fallback below would never run.
        if add.request.retries < settings.NETCONF_MAX_RETRY_BEFORE_ERROR:
            # repeat the action
            raise TimeoutError()
        route.status = "ERROR"
    route.response = response
    route.save()
    announce("[%s] Rule add: %s - Result: %s" % (route.applier_username_nice, route.name, response), route.applier, route)
@shared_task(ignore_result=True, autoretry_for=(TimeoutError, TimeLimitExceeded, SoftTimeLimitExceeded), retry_backoff=True, retry_kwargs={'max_retries': settings.NETCONF_MAX_RETRY_BEFORE_ERROR})
def edit(routepk, callback=None):
    """Re-apply an edited Route (operation ``replace``) and record the outcome.

    On a successful commit the route becomes ``ACTIVE`` and its initial SNMP
    zero value is written; a failure of that SNMP step is only logged.  On a
    failed commit the task raises ``TimeoutError`` to request a retry until
    ``settings.NETCONF_MAX_RETRY_BEFORE_ERROR`` retries have been used, after
    which the route is marked ``ERROR``.  The applier response is stored on
    the route and a notification is announced.

    Args:
        routepk: primary key of the ``Route`` to re-apply.
        callback: accepted for interface compatibility; not used here.

    Raises:
        TimeoutError: the commit failed and retries are still available.
    """
    from flowspec.models import Route
    route = Route.objects.get(pk=routepk)
    status_pre = route.status
    logger.info("tasks::edit(): route="+str(route)+", status_pre="+str(status_pre))
    applier = PR.Applier(route_object=route)
    commit, response = applier.apply(operation="replace")
    if commit:
        route.status = "ACTIVE"
        try:
            # Called synchronously on purpose (not via .delay()).
            snmp_add_initial_zero_value(str(route.id), True)
        except Exception as e:
            logger.error("tasks::edit(): route="+str(route)+", ACTIVE, add_initial_zero_value failed: "+str(e))
    else:
        # Use THIS task's retry counter; TimeoutError must also be listed in
        # autoretry_for above, otherwise raising it just fails the task
        # without any retry and without ever reaching the ERROR status.
        if edit.request.retries < settings.NETCONF_MAX_RETRY_BEFORE_ERROR:
            # repeat the action
            raise TimeoutError()
        route.status = "ERROR"
    route.response = response
    route.save()
    announce("[%s] Rule edit: %s - Result: %s" % (route.applier_username_nice, route.name, response), route.applier, route)
@shared_task(ignore_result=True, autoretry_for=(TimeoutError, TimeLimitExceeded, SoftTimeLimitExceeded), retry_backoff=True, retry_kwargs={'max_retries': settings.NETCONF_MAX_RETRY_BEFORE_ERROR})
def deactivate_route(routepk, **kwargs):
    """Deactivate the Route in ACTIVE state. Permissions must be checked before this call.

    Only routes whose status is ``ACTIVE``, ``PENDING`` or ``ERROR`` are
    processed; any other status is logged and the call returns.  The rule is
    removed through ``PR.Applier(...).apply(operation="delete")``.

    * Success: the route is stored as ``INACTIVE`` with the applier response,
      a "null" SNMP value is written (failure of that step is only logged),
      a notification is announced and ``route.commit_deactivate()`` is called.
    * Failure with retries left: ``TimeoutError`` is raised to request a retry.
    * Failure with retries exhausted: the route is stored as ``EXPIRED`` when
      ``reason='EXPIRED'`` was passed, otherwise as ``ERROR``.

    Args:
        routepk: primary key of the ``Route`` to deactivate.
        **kwargs: optional ``reason``; only the value ``'EXPIRED'`` is
            interpreted.

    Raises:
        TimeoutError: the removal failed and retries are still available.
    """
    from flowspec.models import Route
    route = Route.objects.get(pk=routepk)
    initial_status = route.status
    if initial_status not in ("ACTIVE", "PENDING", "ERROR"):
        logger.error("tasks::deactivate(): Cannot deactivate route that is not in ACTIVE or potential ACTIVE status.")
        return
    applier = PR.Applier(route_object=route)
    # Delete from router via NETCONF
    commit, response = applier.apply(operation="delete")
    reason_text = ''
    logger.info("tasks::deactivate(): initial_status="+str(initial_status))
    if commit:
        route.status = "INACTIVE"
        try:
            snmp_add_initial_zero_value(str(route.id), False)
        except Exception as e:
            logger.error("tasks::deactivate(): route="+str(route)+", INACTIVE, add_null_value failed: "+str(e))
        announce("[%s] Suspending rule : %s%s- Result %s" % (route.applier_username_nice, route.name, reason_text, response), route.applier, route)
        route.response = response
        route.save()
        route.commit_deactivate()
        return

    # Removing the rule via NETCONF failed: it is still active on the router
    # and still collects statistics, so the object is kept in the DB.
    if deactivate_route.request.retries < settings.NETCONF_MAX_RETRY_BEFORE_ERROR:
        # repeat the action
        raise TimeoutError()

    # Retries exhausted: record the final state.
    if "reason" in kwargs and kwargs['reason'] == 'EXPIRED':
        status = 'EXPIRED'
        reason_text = " Reason: %s " % status
    else:
        status = "ERROR"
    route.status = status
    route.response = response
    route.save()
    announce("[%s] Suspending rule : %s%s- Result %s" % (route.applier_username_nice, route.name, reason_text, response), route.applier, route)
@shared_task(ignore_result=True, autoretry_for=(TimeoutError, TimeLimitExceeded, SoftTimeLimitExceeded), retry_backoff=True, retry_kwargs={'max_retries': settings.NETCONF_MAX_RETRY_BEFORE_ERROR})
def delete_route(routepk, **kwargs):
    """For Route in ACTIVE state, deactivate it at first. Finally, delete the Route from the DB. Permissions must be checked before this call.

    A route that is not ``INACTIVE`` is first passed to ``deactivate_route()``
    (called directly, inside this task).  If it is still not ``INACTIVE``
    afterwards and retries remain, the route is stored as ``PENDING`` and
    ``TimeoutError`` is raised to request a retry.  Once the route is
    ``INACTIVE`` it is deleted from the DB; if retries are exhausted it is
    stored as ``ERROR`` and kept.

    Args:
        routepk: primary key of the ``Route`` to delete.
        **kwargs: accepted for interface compatibility; not used here.

    Raises:
        TimeoutError: deactivation failed and retries are still available.
    """
    from flowspec.models import Route
    route = Route.objects.get(pk=routepk)
    if route.status != "INACTIVE":
        logger.info("Deactivating active route...")
        # call deactivate_route() directly since we are already on background (celery task)
        try:
            deactivate_route(routepk)
        except TimeoutError:
            # Deactivation asked for a retry; handled by the status check below.
            pass
        # deactivate_route() loads and saves its own Route instance, so the
        # object held here is stale.  Reload it, otherwise a successful
        # deactivation is never noticed and gets overwritten with PENDING.
        route.refresh_from_db()
    if route.status != "INACTIVE" and delete_route.request.retries < settings.NETCONF_MAX_RETRY_BEFORE_ERROR:
        # Repeat due to error in deactivation
        route.status = "PENDING"
        route.save()
        logger.error("Deactivation failed, repeat the deletion process.")
        raise TimeoutError()
    if route.status == "INACTIVE":
        logger.info("Deleting inactive route...")
        route.delete()
        logger.info("Deleting finished.")
    else:
        route.status = "ERROR"
        route.save()
        logger.error("Deleting Route failed, it could not be deactivated - remaining in DB.")
    return
# May not work in the first place... proxy is not aware of Route models
@shared_task
def batch_delete(routes, **kwargs):
    """Remove several routes with one applier call and store the result on each.

    Every route is first saved as ``PENDING``.  A single removal
    configuration is built with ``applier.delete_routes()`` and applied.
    On success the final status is ``INACTIVE``, or the value of the
    ``reason`` keyword when one is given; on failure it is ``ERROR``.
    Each route then gets that status, the applier response and today's date
    as ``expires``, is saved, and a notification is announced for it.

    Args:
        routes: iterable of route objects to remove.
        **kwargs: optional ``reason`` used as the final status on success.

    Returns:
        ``False`` when *routes* is empty, otherwise ``None``.
    """
    if not routes:
        return False

    for pending_route in routes:
        pending_route.status = 'PENDING'
        pending_route.save()

    applier = PR.Applier(route_objects=routes)
    removal_conf = applier.delete_routes()
    commit, response = applier.apply(configuration=removal_conf)

    reason_text = ''
    if not commit:
        status = "ERROR"
    elif "reason" in kwargs:
        given_reason = kwargs['reason']
        status = 'EXPIRED' if given_reason == 'EXPIRED' else given_reason
        reason_text = " Reason: %s " % status
    else:
        status = "INACTIVE"

    for route in routes:
        route.status = status
        route.response = response
        route.expires = datetime.date.today()
        route.save()
        announce("[%s] Rule removal: %s%s- Result %s" % (route.applier_username_nice, route.name, reason_text, response), route.applier, route)
@shared_task(ignore_result=True)
def announce(messg, user, route):
    """Publish a notification message to a Redis stream.

    The stream key is ``notifstream_<name>``.  ``<name>`` defaults to the
    user's username; when the route destination lies inside one of the
    networks of the user's peers, that peer's ``peer_tag`` is used instead
    (if several peers match, the last matching peer wins).  The stream is
    capped at ``settings.NOTIF_STREAM_MAXSIZE`` entries and its expiry is
    reset to ``settings.NOTIF_STREAM_MAXLIFE``.

    Args:
        messg: message to publish; converted with ``str()``.
        user: user object providing ``username`` and ``userprofile.peers``.
        route: route object providing ``destination``.
    """
    peers = user.userprofile.peers.all()
    username = user.username
    tgt_net = ip_network(route.destination)
    for peer in peers:
        for network in peer.networks.all():
            net = ip_network(network)
            logger.info("ANNOUNCE check ip " + str(tgt_net) + str(type(tgt_net)) + " in net " + str(net) + str(type(net)))
            # Addresses of different IP versions cannot be ordered (the
            # comparison raises TypeError), and an IPv4 target can never be
            # part of an IPv6 range or vice versa - skip such pairs.
            if tgt_net.version != net.version:
                continue
            # check if the target is a subnet of peer range (python3.6 doesn't have subnet_of())
            if tgt_net.network_address >= net.network_address and tgt_net.broadcast_address <= net.broadcast_address:
                username = peer.peer_tag
                logger.info("ANNOUNCE found peer " + str(username))
                break
    messg = str(messg)
    logger.info("ANNOUNCE " + messg)
    r = redis.StrictRedis()
    key = "notifstream_%s" % username
    obj = {"m": messg, "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
    logger.info("ANNOUNCE " + str(obj))
    lastid = r.xadd(key, obj, maxlen=settings.NOTIF_STREAM_MAXSIZE, approximate=False)
    logger.info("ANNOUNCE key " + key + " with lastid " + lastid.decode("utf-8"))
    r.expire(key, settings.NOTIF_STREAM_MAXLIFE)
@shared_task(ignore_result=True,default_retry_delay=5,max_retries=2,autoretry_for=(TimeLimitExceeded, SoftTimeLimitExceeded))
def check_sync(route_name=None, selected_routes=None):
    """Expire outdated routes and synchronise the remaining ones.

    For every considered route:

    * if ``route.has_expired()`` and the route is not already in an
      expired/inactive/to-delete status, a deactivation with
      ``reason="EXPIRED"`` is queued (routes in ``ERROR`` are left alone);
    * otherwise, unless the route is ``EXPIRED``, ``route.check_sync()`` is
      called.

    Args:
        route_name: when given, restricts the routes with
            ``.filter(name=route_name)``.
        selected_routes: routes to check; all ``Route`` objects when empty
            or omitted.
    """
    from flowspec.models import Route
    if not selected_routes:
        routes = Route.objects.all()
    else:
        routes = selected_routes
    if route_name:
        routes = routes.filter(name=route_name)
    # Statuses for which an expired route needs no further expiry handling.
    already_handled = ('EXPIRED', 'ADMININACTIVE', 'INACTIVE', 'INACTIVE_TODELETE', 'PENDING_TODELETE')
    for route in routes:
        if route.has_expired() and route.status not in already_handled:
            if route.status != 'ERROR':
                logger.info('Expiring %s route %s' %(route.status, route.name))
                # No task named "delete" exists in this module; the task that
                # takes a primary key plus an optional "reason" is
                # deactivate_route, so queue that one with the route's pk.
                deactivate_route.delay(route.pk, reason="EXPIRED")
        else:
            if route.status != 'EXPIRED':
                route.check_sync()
@shared_task(ignore_result=True)
def notify_expired():
    """E-mail the applier of every route that is about to expire.

    Routes in an expired, inactive, to-delete or error status are skipped.
    For the others, the number of days until ``route.expires`` is computed;
    when it is below ``settings.EXPIRATION_NOTIFY_DAYS`` a mail rendered from
    ``rule_action.txt`` is sent to ``route.applier.email``.  Any exception
    raised while preparing or sending a mail is logged and the loop goes on
    with the next route.
    """
    from flowspec.models import Route
    from django.contrib.sites.models import Site
    logger.info('Initializing expiration notification')
    ignored_statuses = ['EXPIRED', 'ADMININACTIVE', 'INACTIVE', 'INACTIVE_TODELETE', 'PENDING_TODELETE', 'ERROR']
    for route in Route.objects.all():
        if route.status in ignored_statuses:
            continue
        expiration_days = (route.expires - datetime.date.today()).days
        if expiration_days < settings.EXPIRATION_NOTIFY_DAYS:
            try:
                fqdn = Site.objects.get_current().domain
                edit_path = "/edit/%s" % route.name
                admin_url = "https://%s%s" % (fqdn, edit_path)
                template_context = {
                    "route": route,
                    'expiration_days': expiration_days,
                    'action': 'expires',
                    'url': admin_url,
                }
                mail_body = render_to_string("rule_action.txt", template_context)
                # Subject wording: "in N days", "in 1 day" or just "today".
                if expiration_days == 0:
                    expiration_days_text = ''
                    days_num = ' today'
                else:
                    expiration_days_text = "%s %s" % ('in', expiration_days)
                    days_num = ' day' if expiration_days == 1 else ' days'
                logger.info('Route %s expires %s%s. Notifying %s (%s)' % (route.name, expiration_days_text, days_num, route.applier.username, route.applier.email))
                subject = settings.EMAIL_SUBJECT_PREFIX + "Rule %s expires %s%s" % (route.name, expiration_days_text, days_num)
                send_mail(subject, mail_body, settings.SERVER_EMAIL, [route.applier.email])
            except Exception as err:
                logger.info("Exception: %s" % err)
    logger.info('Expiration notification process finished')
##############################################################################
##############################################################################
# snmp task handling (including helper functions)
import os
import signal
def handleSIGCHLD(signal, frame):
    """SIGCHLD handler: reap every child process that has already exited.

    Several child exits may be delivered as a single SIGCHLD, so the handler
    keeps calling ``os.waitpid(-1, os.WNOHANG)`` until nothing more can be
    collected.  It never blocks and never raises ``ChildProcessError`` into
    the code that was interrupted by the signal.

    Args:
        signal: signal number passed by the interpreter (unused).
        frame: interrupted stack frame passed by the interpreter (unused).
    """
    logger.info("handleSIGCHLD(): reaping childs")
    while True:
        try:
            reaped_pid, _exit_status = os.waitpid(-1, os.WNOHANG)
        except ChildProcessError:
            # No child processes exist at all (already reaped elsewhere).
            return
        if reaped_pid == 0:
            # Children exist, but none of them has terminated yet.
            return
def snmp_lock_create(wait=0):
    """Try to take the SNMP poll lock by creating the lock directory.

    The lock is the directory ``settings.SNMP_POLL_LOCK``; creating it
    succeeds only when it does not exist yet.

    Args:
        wait: when falsy, a single attempt is made; when truthy, the attempt
            is repeated once per second until it succeeds.

    Returns:
        ``1`` when the directory was created, ``0`` when the single attempt
        failed.
    """
    while True:
        try:
            os.mkdir(settings.SNMP_POLL_LOCK)
        except OSError as err:
            logger.error("snmp_lock_create(): creating lock dir failed: OSError: "+str(err))
        except Exception as err:
            logger.error("snmp_lock_create(): Lock already exists")
            logger.error("snmp_lock_create(): creating lock dir failed: "+str(err))
        else:
            logger.error("snmp_lock_create(): creating lock dir succeeded")
            return 1
        if not wait:
            # Single-shot mode: report the failure to the caller.
            return 0
        time.sleep(1)
def snmp_lock_remove():
    """Release the SNMP poll lock by removing the lock directory.

    Best effort: any failure is logged at info level and not propagated.
    """
    try:
        lock_dir = settings.SNMP_POLL_LOCK
        os.rmdir(lock_dir)
    except Exception as err:
        logger.info("snmp_lock_remove(): failed "+str(err))
def exit_process():
    """Log the current pid and terminate the process via ``sys.exit()``.

    Raises:
        SystemExit: always.
    """
    import sys
    pid = os.getpid()
    logger.info("exit_process(): before exit in child process (pid="+str(pid)+")")
    # sys.exit() raises SystemExit, so nothing placed after it can run; the
    # former fallback chain (a second sys.exit() and an os._exit() call that
    # lacked its mandatory status argument) was unreachable and is removed.
    sys.exit()
#@shared_task(ignore_result=True, time_limit=580, soft_time_limit=550)
@shared_task(ignore_result=True, max_retries=0)
def poll_snmp_statistics():
    """Run one SNMP statistics poll in a forked child process.

    The poll is guarded by the directory lock from ``snmp_lock_create()``;
    when the lock is already held the task returns immediately.  The parent
    returns right after the fork.  The child runs
    ``snmpstats.poll_snmp_statistics()``, releases the lock and exits.

    Raises:
        OSError: the fork failed (the lock is released first).
        ValueError: the SIGCHLD handler could not be installed (the lock is
            released first).
    """
    from flowspec import snmpstats
    if not snmp_lock_create(0):
        return
    pid = os.getpid()
    try:
        signal.signal(signal.SIGCHLD, handleSIGCHLD)
        logger.info("poll_snmp_statistics(): before fork (pid="+str(pid)+")")
        # os.fork() reports failure by raising OSError; it never returns -1.
        npid = os.fork()
    except (OSError, ValueError) as e:
        # No child will ever release the lock, so release it here; otherwise
        # every later poll would be skipped.
        logger.error("poll_snmp_statistics(): fork setup failed (pid="+str(pid)+"): "+str(e))
        snmp_lock_remove()
        raise
    if npid > 0:
        logger.info("poll_snmp_statistics(): returning in parent process (pid="+str(pid)+", npid="+str(npid)+")")
        return

    # Child process from here on (npid == 0; pid still holds the parent's pid).
    logger.info("poll_snmp_statistics(): in child process (pid="+str(pid)+", npid="+str(npid)+")")
    try:
        snmpstats.poll_snmp_statistics()
    except Exception as e:
        logger.error("poll_snmp_statistics(): exception occured in snmp poll (pid="+str(pid)+", npid="+str(npid)+"): "+str(e))
    finally:
        snmp_lock_remove()
    logger.info("exit_process(): before exit in child process (pid="+str(pid)+", npid="+str(npid)+")")
    # exit() raises SystemExit; the statements that used to follow it could
    # never execute and were removed.
    exit()
@shared_task(ignore_result=True, max_retries=0)
def snmp_add_initial_zero_value(rule_id, zero_or_null=True):
    """Write the initial SNMP statistics value for a rule.

    Delegates to ``snmpstats.add_initial_zero_value(rule_id, zero_or_null)``.
    With the hard-coded ``use_fork = False`` the call is made in-process and
    its exceptions propagate to the caller.  The forking variant is kept for
    when the switch is flipped: there the child logs success or failure and
    then exits.

    Args:
        rule_id: identifier of the rule, passed through to ``snmpstats``.
        zero_or_null: passed through to ``snmpstats``.
    """
    from flowspec import snmpstats
    use_fork = False
    if not use_fork:
        snmpstats.add_initial_zero_value(rule_id, zero_or_null)
        return

    signal.signal(signal.SIGCHLD, handleSIGCHLD)
    pid = os.getpid()
    logger.info("snmp_add_initial_zero_value(): before fork (pid="+str(pid)+" rule_id="+str(rule_id)+","+str(zero_or_null)+")")
    # os.fork() raises OSError on failure; it never returns -1.
    npid = os.fork()
    if npid > 0:
        logger.info("snmp_add_initial_zero_value(): returning in parent process (pid="+str(pid)+", npid="+str(npid)+")")
        return

    # Child process from here on (npid == 0; pid still holds the parent's pid).
    logger.info("snmp_add_initial_zero_value(): in child process (pid="+str(pid)+", npid="+str(npid)+")")
    try:
        snmpstats.add_initial_zero_value(rule_id, zero_or_null)
        logger.info("snmp_add_initial_zero_value(): rule_id="+str(rule_id)+","+str(zero_or_null)+" sucesss")
    except Exception as e:
        logger.error("snmp_add_initial_zero_value(): rule_id="+str(rule_id)+","+str(zero_or_null)+" failed: "+str(e))
    logger.info("exit_process(): before exit in child process (pid="+str(pid)+", npid="+str(npid)+")")
    # exit() raises SystemExit; the statements that used to follow it were
    # unreachable (and referenced a `sys` name not imported here).
    exit()
@pytest.mark.skip
@shared_task(ignore_result=True,default_retry_delay=5,max_retries=2,autoretry_for=(TimeoutError,))
def testcelerytask():
    """Helper task that alternates between success and a retry request.

    It tries to create the directory ``/tmp/testlock``.  When that succeeds
    the task returns.  When the directory already exists it is removed and
    ``TimeoutError`` (listed in ``autoretry_for``) is raised.

    Raises:
        TimeoutError: the lock directory already existed.
    """
    lockname = "/tmp/testlock"
    try:
        os.mkdir(lockname)
    except FileExistsError:
        logger.info("testcelerytask: SKipping, raising exception for repeating")
        os.rmdir(lockname)
        raise TimeoutError
    else:
        logger.info("testcelerytask: Do something and return")
        return