Skip to content
Snippets Groups Projects
Commit e4b3ad0b authored by Tomáš Čejka's avatar Tomáš Čejka
Browse files

check_sync: retry jobs in celery on error

Improved parameters of celery tasks, added testing task (without implicitly
enabling it by settigs.py)
parent eab8659f
Branches
Tags
No related merge requests found
...@@ -45,7 +45,7 @@ handler = logging.FileHandler(LOG_FILENAME) ...@@ -45,7 +45,7 @@ handler = logging.FileHandler(LOG_FILENAME)
handler.setFormatter(formatter) handler.setFormatter(formatter)
logger.addHandler(handler) logger.addHandler(handler)
@shared_task(ignore_result=True) @shared_task(ignore_result=True, autoretry_for=(TimeLimitExceeded, SoftTimeLimitExceeded), retry_backoff=True)
def add(routepk, callback=None): def add(routepk, callback=None):
from flowspec.models import Route from flowspec.models import Route
route = Route.objects.get(pk=routepk) route = Route.objects.get(pk=routepk)
...@@ -78,7 +78,7 @@ def add(routepk, callback=None): ...@@ -78,7 +78,7 @@ def add(routepk, callback=None):
announce("[%s] Rule add: %s - Result: %s" % (route.applier_username_nice, route.name, route.response), route.applier, route) announce("[%s] Rule add: %s - Result: %s" % (route.applier_username_nice, route.name, route.response), route.applier, route)
@shared_task(ignore_result=True) @shared_task(ignore_result=True, autoretry_for=(TimeLimitExceeded, SoftTimeLimitExceeded), retry_backoff=True)
def edit(routepk, callback=None): def edit(routepk, callback=None):
from flowspec.models import Route from flowspec.models import Route
route = Route.objects.get(pk=routepk) route = Route.objects.get(pk=routepk)
...@@ -117,7 +117,7 @@ def edit(routepk, callback=None): ...@@ -117,7 +117,7 @@ def edit(routepk, callback=None):
announce("[%s] Rule edit: %s - Result: %s" % (route.applier_username_nice, route.name, route.response), route.applier, route) announce("[%s] Rule edit: %s - Result: %s" % (route.applier_username_nice, route.name, route.response), route.applier, route)
@shared_task(ignore_result=True) @shared_task(ignore_result=True, autoretry_for=(TimeLimitExceeded, SoftTimeLimitExceeded), retry_backoff=True)
def delete(routepk, **kwargs): def delete(routepk, **kwargs):
from flowspec.models import Route from flowspec.models import Route
route = Route.objects.get(pk=routepk) route = Route.objects.get(pk=routepk)
...@@ -212,7 +212,7 @@ def announce(messg, user, route): ...@@ -212,7 +212,7 @@ def announce(messg, user, route):
logger.info("ANNOUNCE key " + key + " with lastid " + lastid.decode("utf-8")) logger.info("ANNOUNCE key " + key + " with lastid " + lastid.decode("utf-8"))
r.expire(key, settings.NOTIF_STREAM_MAXLIFE) r.expire(key, settings.NOTIF_STREAM_MAXLIFE)
@shared_task @shared_task(ignore_result=True,default_retry_delay=5,max_retries=2,autoretry_for=(TimeLimitExceeded, SoftTimeLimitExceeded))
def check_sync(route_name=None, selected_routes=[]): def check_sync(route_name=None, selected_routes=[]):
from flowspec.models import Route from flowspec.models import Route
if not selected_routes: if not selected_routes:
...@@ -380,3 +380,15 @@ def snmp_add_initial_zero_value(rule_id, zero_or_null=True): ...@@ -380,3 +380,15 @@ def snmp_add_initial_zero_value(rule_id, zero_or_null=True):
logger.info("exit_process(): before exit in child process (pid="+str(pid)+", npid="+str(npid)+"), after os._exit") logger.info("exit_process(): before exit in child process (pid="+str(pid)+", npid="+str(npid)+"), after os._exit")
@shared_task(ignore_result=True,default_retry_delay=5,max_retries=2,autoretry_for=(TimeoutError,))
def testcelerytask():
lockname = "/tmp/testlock"
try:
os.mkdir(lockname)
logger.info("testcelerytask: Do something and return")
return
except FileExistsError:
logger.info("testcelerytask: SKipping, raising exception for repeating")
os.rmdir(lockname)
raise TimeoutError
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment