diff --git a/flowspec/admin.py b/flowspec/admin.py index f6c3ad9e3078805419f162345a49e5042dab8c69..35052d16d695702d1c347342898056f231187dd9 100644 --- a/flowspec/admin.py +++ b/flowspec/admin.py @@ -5,7 +5,7 @@ from utils import proxy as PR class RouteAdmin(admin.ModelAdmin): actions = ['deactivate'] - + def deactivate(self, request, queryset): applier = PR.Applier(route_objects=queryset) commit, response = applier.apply(configuration=applier.delete_routes()) @@ -19,7 +19,7 @@ class RouteAdmin(admin.ModelAdmin): list_display = ('name', 'is_online', 'applier', 'get_match', 'get_then', 'response') fieldsets = [ - (None, {'fields': ['name',]}), + (None, {'fields': ['name','applier']}), ("Match", {'fields': ['source', 'sourceport', 'destination', 'destinationport', 'port']}), ('Advanced Match Statements', {'fields': ['dscp', 'fragmenttype', 'icmpcode', 'icmptype', 'packetlength', 'protocol', 'tcpflag'], 'classes': ['collapse']}), ("Then", {'fields': ['then' ]}), diff --git a/flowspec/forms.py b/flowspec/forms.py new file mode 100644 index 0000000000000000000000000000000000000000..bdd78a6941c311c61689e7a2bcd53cd2ecd69d8c --- /dev/null +++ b/flowspec/forms.py @@ -0,0 +1,57 @@ +from django import forms +from django.utils.safestring import mark_safe +from django.utils.translation import ugettext as _ +from django.utils.translation import ugettext_lazy +from django.template.defaultfilters import filesizeformat + +from flowspy.flowspec.models import * +from ipaddr import * + +class RouteForm(forms.ModelForm): +# name = forms.CharField(help_text=ugettext_lazy("A unique route name," +# " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False) +# source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation," +# " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False) +# source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False) +# destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation," +# " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False) +# destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False) +# ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False) + class Meta: + model = Route + + def clean_source(self): + data = self.cleaned_data['source'] + if data: + try: + address = IPNetwork(data) + return self.cleaned_data["source"] + except Exception: + raise forms.ValidationError('Invalid network address format') + + def clean_destination(self): + data = self.cleaned_data['destination'] + if data: + try: + address = IPNetwork(data) + return self.cleaned_data["destination"] + except Exception: + raise forms.ValidationError('Invalid network address format') + + def clean(self): + source = self.cleaned_data.get('source', None) + sourceports = self.cleaned_data.get('sourceport', None) + ports = self.cleaned_data.get('port', None) + destination = self.cleaned_data.get('destination', None) + destinationports = self.cleaned_data.get('destinationport', None) + if (sourceports and ports): + raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports') + if (destinationports and ports): + raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports') + if sourceports and not source: + raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address') + if destinationports and not destination: + raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address') + if not (source or sourceports or ports or destination or destinationports): + raise forms.ValidationError('Fill at least a Route Match Condition') + return self.cleaned_data \ No newline at end of file diff --git a/flowspec/models.py b/flowspec/models.py index b3f917b84d3926dd943a89303cad75ed52a7dd0b..257c66038b7ed851db8efc708e8716b50d63f850 100644 --- a/flowspec/models.py +++ b/flowspec/models.py @@ -8,6 +8,8 @@ from utils import proxy as PR from ipaddr import * from datetime import * import logging +from flowspec.tasks import * +from time import sleep FORMAT = '%(asctime)s %(levelname)s: %(message)s' logging.basicConfig(format=FORMAT) @@ -61,7 +63,7 @@ class ThenAction(models.Model): class Route(models.Model): name = models.CharField(max_length=128) - applier = models.ForeignKey(User) + applier = models.ForeignKey(User, blank=True, null=True) source = models.CharField(max_length=32, blank=True, null=True, help_text=u"Network address. Use address/CIDR notation", verbose_name="Source Address") sourceport = models.ManyToManyField(MatchPort, blank=True, null=True, related_name="matchSourcePort", verbose_name="Source Port") destination = models.CharField(max_length=32, blank=True, null=True, help_text=u"Network address. Use address/CIDR notation", verbose_name="Destination Address") @@ -79,7 +81,7 @@ class Route(models.Model): last_updated = models.DateTimeField(auto_now=True) is_online = models.BooleanField(default=False) is_active = models.BooleanField(default=False) - expires = models.DateField(default=days_offset) + expires = models.DateField(default=days_offset, blank=True, null=True,) response = models.CharField(max_length=512, blank=True, null=True) comments = models.TextField(null=True, blank=True, verbose_name="Comments") @@ -106,17 +108,24 @@ class Route(models.Model): except Exception: raise ValidationError('Invalid network address format at Source Field') - def save(self, *args, **kwargs): - applier = PR.Applier(route_object=self) - commit, response = applier.apply() - if commit: - self.is_online = True - self.is_active = True - self.response = response - else: - self.is_online = False - self.response = response - super(Route, self).save(*args, **kwargs) +# def save(self, *args, **kwargs): +# edit = False +# if self.pk: +# #This is an edit +# edit = True +# super(Route, self).save(*args, **kwargs) +# if not edit: +# response = add.delay(self) +# logger.info("Got save job id: %s" %response) + + def commit_add(self, *args, **kwargs): + response = add.delay(self) + logger.info("Got save job id: %s" %response) +# +# def delete(self, *args, **kwargs): +# response = delete.delay(self) +# logger.info("Got delete job id: %s" %response) + def is_synced(self): diff --git a/flowspec/tasks.py b/flowspec/tasks.py new file mode 100644 index 0000000000000000000000000000000000000000..3a997af23a09f64fdeed26f23f66c744e00560cf --- /dev/null +++ b/flowspec/tasks.py @@ -0,0 +1,40 @@ +from utils import proxy as PR +from celery.task import task + +@task +def add(route): + applier = PR.Applier(route_object=route) + commit, response = applier.apply() + if commit: + is_online = True + is_active = True + else: + is_online = False + is_active = True + route.is_online = is_online + route.is_active = is_active + route.response = response + route.save() +# +#@task +#def delete(route): +# +# applier = PR.Applier(route_object=route) +# commit, response = applier.apply(configuration=applier.delete_routes()) +# if commit: +# rows = queryset.update(is_online=False, is_active=False) +# queryset.update(response="Successfully removed route from network") +# self.message_user(request, "Successfully removed %s routes from network" % rows) +# else: +# self.message_user(request, "Could not remove routes from network") +# if commit: +# is_online = False +# is_active = False +# response = "Successfully removed route from network" +# else: +# is_online = False +# is_active = True +# route.is_online = is_online +# route.is_active = is_active +# route.response = response +# route.save() \ No newline at end of file diff --git a/flowspec/tests.py b/flowspec/tests.py deleted file mode 100644 index 2247054b354559ab535df60bb5dc65c2aa5be686..0000000000000000000000000000000000000000 --- a/flowspec/tests.py +++ /dev/null @@ -1,23 +0,0 @@ -""" -This file demonstrates two different styles of tests (one doctest and one -unittest). These will both pass when you run "manage.py test". - -Replace these with more appropriate tests for your application. -""" - -from django.test import TestCase - -class SimpleTest(TestCase): - def test_basic_addition(self): - """ - Tests that 1 + 1 always equals 2. - """ - self.failUnlessEqual(1 + 1, 2) - -__test__ = {"doctest": """ -Another way to test that 1 + 1 is equal to 2. - ->>> 1 + 1 == 2 -True -"""} - diff --git a/flowspec/views.py b/flowspec/views.py index 77ae75a27b8a40e630b85dcbb3cbe83f3e7a18cf..d8216fb08b3626f1da138cbb837fe53f4d503d6a 100644 --- a/flowspec/views.py +++ b/flowspec/views.py @@ -3,8 +3,8 @@ import urllib2 import re import socket from django import forms -from django.core.cache import cache from django.views.decorators.csrf import csrf_exempt +from django.core import urlresolvers from django.contrib.auth.decorators import login_required from django.http import HttpResponseRedirect, HttpResponseForbidden, HttpResponse from django.shortcuts import get_object_or_404, render_to_response @@ -15,14 +15,35 @@ from django.utils import simplejson from django.core.urlresolvers import reverse from django.contrib import messages +from flowspy.flowspec.forms import * from flowspy.flowspec.models import * +def days_offset(): return datetime.now() + timedelta(days = settings.EXPIRATION_DAYS_OFFSET) + def user_routes(request): if request.user.is_anonymous(): return HttpResponseRedirect(reverse('login')) user_routes = Route.objects.filter(applier=request.user) - print user_routes return render_to_response('user_routes.html', {'routes': user_routes}, context_instance=RequestContext(request)) +def add_route(request): + if request.method == "GET": + form = RouteForm() + return render_to_response('apply.html', {'form': form}, + context_instance=RequestContext(request)) + + else: + form = RouteForm(request.POST) + if form.is_valid(): + route=form.save(commit=False) + route.applier = request.user + route.expires = days_offset() + route.save() + form.save_m2m() + route.commit_add() + return HttpResponseRedirect(urlresolvers.reverse("user-routes")) + else: + return render_to_response('apply.html', {'form': form}, + context_instance=RequestContext(request)) diff --git a/flowspec_dev.db b/flowspec_dev.db index 9a32e30710d85f24c691b2c2a8e55e0697254adc..fdb41ff491466431e7a3eddb16d82718f09bcdc7 100644 Binary files a/flowspec_dev.db and b/flowspec_dev.db differ diff --git a/templates/apply.html b/templates/apply.html index 23b15f1b0bd5353cf5820b83200368cb66eed3f7..5c83c45d5075d85681aade07ee3cbee3ca95fe5f 100644 --- a/templates/apply.html +++ b/templates/apply.html @@ -1,7 +1,7 @@ {% extends "base.html" %} {% load i18n %} -{% block title %}{% trans "Create new Virtual Machine" %}{% endblock %} -{% block breadcrumbs %}:: {% trans "Create Instance" %}{% endblock %} +{% block title %}{% trans "Create new Route" %}{% endblock %} +{% block breadcrumbs %}:: {% trans "Create Route" %}{% endblock %} {% block content %} <style type="text/css"> th { @@ -17,80 +17,54 @@ th { </style> <div align="center"> -<h3>{% trans "Apply for a new instance" %}</h3> +<h3>{% trans "Apply for a new route" %}</h3> <form method="POST"> {% csrf_token %} +{% if form.non_field_errors %} +<p class="error">{{ form.non_field_errors|join:", "}}</p> +{% endif %} <fieldset> -<legend>{% trans "Instance information" %}</legend> + <legend>{% trans "Route Basic Info" %}</legend> <table> -<tr><th>{{ form.hostname.label_tag }}</th><td>{{ form.hostname }}<span class="error">{{ form.hostname.errors|join:", " }}</span></td></tr> -<tr class="help"><td></td><td>{{ form.hostname.help_text }}</td></tr> -<tr><th>{{ form.memory.label_tag }}</th><td>{{ form.memory }}<span class="error">{{ form.memory.errors|join:", " }}</span></td></tr> -<tr><th>{{ form.vcpus.label_tag }}</th><td>{{ form.vcpus }}<span class="error">{{ form.vcpus.errors|join:", " }}</span></td></tr> -<tr><th>{{ form.disk_size.label_tag }}</th><td>{{ form.disk_size }}<span class="error">{{ form.disk_size.errors|join:", " }}</span></td></tr> -<tr class="help"><td></td><td>{{ form.disk_size.help_text }}</td></tr> -<tr><th>{{ form.hosts_mail_server.label_tag }}</th><td>{{ form.hosts_mail_server }}<span class="error">{{ form.hosts_mail_server.errors|join:", " }}</span></td></tr> -<tr class="help"><td></td><td>{{ form.hosts_mail_server.help_text }}</td></tr> -<tr><th>{{ form.operating_system.label_tag }}</th><td>{{ form.operating_system }}<span class="error">{{ form.operating_system.errors|join:", " }}</span></td></tr> -{% if form.network %} -<tr><th>{{ form.network.label_tag }}</th><td>{{ form.network }}<span class="error">{{ form.network.errors|join:", " }}</span></td></tr> -<tr class="help"><td></td><td>{{ form.network.help_text|linebreaks }}</td></tr> -{% endif %} +<tr><th>{{ form.name.label_tag }}</th><td>{{ form.name }}<span class="error">{{ form.name.errors|join:", " }}</span></td></tr> +<tr class="help"><td></td><td>{{ form.name.help_text }}</td></tr> </table> </fieldset> <fieldset> -<legend>{% trans "Use/Comments" %}</legend> -{% blocktrans %} -<p>Give a short description of the intended use of this virtual machine, that justifies the parameter selection above. Feel free to include any additional comments.</p> -{% endblocktrans %} -<p>{{ form.comments }} -{% if form.errors %}<br /><span class="error">{{ form.comments.errors|join:", " }}</span>{% endif %} -</p> +<legend>{% trans "Route Match Conditions" %}</legend> +<table> +<tr><th>{{ form.source.label_tag }}</th><td>{{ form.source }}<span class="error">{{ form.source.errors|join:", " }}</span></td></tr> +<tr class="help"><td></td><td>{{ form.source.help_text }}</td></tr> +<tr><th>{{ form.sourceport.label_tag }}</th><td>{{ form.sourceport }}<span class="error">{{ form.sourceport.errors|join:", " }}</span></td></tr> +<tr class="help"><td></td><td>{{ form.sourceport.help_text }}</td></tr> +<tr><th>{{ form.destination.label_tag }}</th><td>{{ form.destination }}<span class="error">{{ form.destination.errors|join:", " }}</span></td></tr> +<tr class="help"><td></td><td>{{ form.destination.help_text }}</td></tr> +<tr><th>{{ form.destinationport.label_tag }}</th><td>{{ form.destinationport }}<span class="error">{{ form.destinationport.errors|join:", " }}</span></td></tr> +<tr class="help"><td></td><td>{{ form.destinationport.help_text }}</td></tr> +<tr><th>{{ form.port.label_tag }}</th><td>{{ form.port }}<span class="error">{{ form.port.errors|join:", " }}</span></td></tr> +<tr class="help"><td></td><td>{{ form.port.help_text }}</td></tr> +</table> </fieldset> - <fieldset> -<legend>{% trans "Administrative contact" %}</legend> -{% blocktrans %} -<p>If you are applying on behalf of a NOC under GRNET's constituency, please select the appropriate organization. Otherwise, fill-in the admin contact information below.</p> -{% endblocktrans %} - -{% if form.non_field_errors %} -<p class="error">{{ form.non_field_errors|join:", "}}</p> -{% endif %} - +<legend>{% trans "Route Actions" %}</legend> <table> -<tr><th>{{ form.organization.label_tag }}</th><td>{{ form.organization }}<span class="error">{{ form.organization.errors|join:", " }}</span></td></tr> - - -<tr><td colspan="3"><div align="center">{% trans "OR" %}</div></td></tr> - - -<tr><th colspan="3"><div align="center">{% trans "Administrative contact" %}</div></th></tr> -<tr><th>{% trans "Name" %}</th><td>{{ form.admin_contact_name }}<span class="error">{{ form.admin_contact_name.errors|join:", " }}</span></td></tr> -<tr><th>E-mail</th><td>{{ form.admin_contact_email }}<span class="error">{{ form.admin_contact_email.errors|join:", " }}</span></td></tr> -<tr><th>{% trans "Phone" %}</th><td>{{ form.admin_contact_phone }}<span class="error">{{ form.admin_contact_phone.errors|join:", " }}</span></td></tr> +<tr><th>{{ form.then.label_tag }}</th><td>{{ form.then }}<span class="error">{{ form.then.errors|join:", " }}</span></td></tr> +<tr class="help"><td></td><td>{{ form.then.help_text }}</td></tr> </table> </fieldset> - <fieldset> -<legend>{% trans "Miscellaneous" %}</legend> +<legend>{% trans "Use/Comments" %}</legend> {% blocktrans %} -<p>We kindly remind you of the following:</p> -<ul align="left"> -<li>You are solely responsible for the data on your VM. You have to take care of back-ups etc.</li> -<li>We reserve the right to temporarily suspend the operation of your VM in case it causes malfunctions to our infrastructure</li> -</ul> +<p>Give a short description of the intended use of this route, that justifies the parameter selection above. Feel free to include any additional comments.</p> {% endblocktrans %} -<p>{{ form.accept_tos }} {% trans "I have read the above and accept them, along with the" %} <a href="/about/terms-of-service/" target="_blank">{% trans "Terms of Service" %}</a></p> -{% if form.accept_tos.errors %} -<p class="error"> -{% trans "You must accept the terms of service before continuing." %} +<p>{{ form.comments }} +{% if form.errors %}<br /><span class="error">{{ form.comments.errors|join:", " }}</span>{% endif %} </p> -{% endif %} </fieldset> <p><input type="submit" value="{% trans "Apply" %}" /></p> </form> </div> + {% endblock %} diff --git a/templates/base.html b/templates/base.html index 19973f064439c1b1378b8d42be79bd080fec7dc0..9ce5e2e2859a281119d693096215c1d3dacd08b7 100644 --- a/templates/base.html +++ b/templates/base.html @@ -2,6 +2,8 @@ <html> <head> <title>GRNET's FoD :: {% block title %}{% endblock %} </title> +<META HTTP-EQUIV="Pragma" CONTENT="no-cache"> +<META HTTP-EQUIV="Expires" CONTENT="-1"> <script src="/static/js/jquery.min.js" type="text/javascript"></script> <link rel="stylesheet" type="text/css" href="/static/css/base.css"> <link rel="stylesheet" type="text/css" href="/static/css/smoothness/jquery-ui-1.8.13.custom.css"> diff --git a/templates/user_routes.html b/templates/user_routes.html index 6fe303d7083952855ba4a7c4cb4683f841f398d6..c29c4f7c675b7e73520c7ac1ae58b0e4e55d759b 100644 --- a/templates/user_routes.html +++ b/templates/user_routes.html @@ -4,6 +4,12 @@ <script type="text/javascript" src="/static/js/jquery.dataTables.js"></script> <script type="text/javascript"> $(document).ready( function(){ + $('#create_dialog').dialog({ + height: 400, + width: 500, + modal: true, + autoOpen: false, + }); $('#routes_table').dataTable( { "bJQueryUI": true, "oLanguage": { @@ -11,12 +17,26 @@ }, "iDisplayLength": 25, } ); + $( ".button_place #routebutton" ).button({ + icons: { + primary: "ui-icon-circle-plus" + }, + }); }); + + + + </script> {% endblock %} {% block title %}{% trans "My routes" %}{% endblock %} {% block content %} -<h3>{% trans "My routes" %}</h3> +<div style="float:left"> + <h3 style="margin-top: 0px;">{% trans "My routes" %}</h3> +</div> +<div class='button_place' style="float:right"> + <a href="{% url add-route %}" id="routebutton">Add Route</a> +</div> <table class="display" width="100%" id="routes_table"> <thead> @@ -27,6 +47,7 @@ <th style="text-align: center;">{% trans "Status" %}</th> {% comment %}<th style="text-align: center;">{% trans "Details" %}</th>{% endcomment %} <th style="text-align: center;">{% trans "Expires" %}</th> + <th style="text-align: center;">{% trans "Response" %}</th> </tr> </thead> @@ -40,10 +61,15 @@ <td style="text-align: center;">{% if route.is_online %}Online{% else %}Offline{% endif %}</td> {% comment %}<td style="text-align: center;">{{ route.response }}</td>{% endcomment %} <td style="text-align: center;">{{ route.expires }}</td> + <td style="text-align: center;">{{ route.response }}</td> </tr> {% endfor %} </tbody> </table> +<div id="create_dialog" title="Add a new Route"> + KOKO + </div> + {% endblock %} diff --git a/urls.py b/urls.py index 7fa854899c0acfdd1e6e0798617d1bce24524ec0..8cc86e6551b24da24e763a8395eab0b7950dd5c2 100644 --- a/urls.py +++ b/urls.py @@ -8,6 +8,7 @@ urlpatterns = patterns('', # Example: # (r'^flowspy/', include('flowspy.foo.urls')), url(r'^/?$', 'flowspy.flowspec.views.user_routes', name="user-routes"), + url(r'^add/?$', 'flowspy.flowspec.views.add_route', name="add-route"), url(r'^user/login/?', 'django.contrib.auth.views.login', {'template_name': 'login.html'}, name="login"), url(r'^user/logout/?', 'django.contrib.auth.views.logout', {'next_page': '/'}, name="logout"), (r'^setlang/?$', 'django.views.i18n.set_language'), diff --git a/utils/beanstalkc.py b/utils/beanstalkc.py new file mode 100644 index 0000000000000000000000000000000000000000..bb976df708e5eda11e96ae91c4925a45161b8826 --- /dev/null +++ b/utils/beanstalkc.py @@ -0,0 +1,328 @@ +#!/usr/bin/env python +"""beanstalkc - A beanstalkd Client Library for Python""" + +__license__ = ''' +Copyright (C) 2008-2010 Andreas Bolka + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +__version__ = '0.2.0' + +import logging +import socket +import re + + +DEFAULT_HOST = 'localhost' +DEFAULT_PORT = 11300 +DEFAULT_PRIORITY = 2**31 +DEFAULT_TTR = 120 +DEFAULT_TIMEOUT = 1 + + +class BeanstalkcException(Exception): pass +class UnexpectedResponse(BeanstalkcException): pass +class CommandFailed(BeanstalkcException): pass +class DeadlineSoon(BeanstalkcException): pass +class SocketError(BeanstalkcException): pass + + +class Connection(object): + def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, + connection_timeout=DEFAULT_TIMEOUT): + self._socket = None + self.host = host + self.port = port + self.connection_timeout = connection_timeout + self.connect() + + def connect(self): + """Connect to beanstalkd server, unless already connected.""" + if not self.closed: + return + try: + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.settimeout(self.connection_timeout) + self._socket.connect((self.host, self.port)) + self._socket.settimeout(None) + self._socket_file = self._socket.makefile('rb') + except socket.error, e: + self._socket = None + raise SocketError(e) + + def close(self): + """Close connection to server, if it is open.""" + if self.closed: + return + try: + self._socket.sendall('quit\r\n') + self._socket.close() + except socket.error: + pass + finally: + self._socket = None + + @property + def closed(self): + return self._socket is None + + def _interact(self, command, expected_ok, expected_err=[], size_field=None): + try: + self._socket.sendall(command) + status, results = self._read_response() + if status in expected_ok: + if size_field is not None: + results.append(self._read_body(int(results[size_field]))) + return results + elif status in expected_err: + raise CommandFailed(command.split()[0], status, results) + else: + raise UnexpectedResponse(command.split()[0], status, results) + except socket.error, e: + self.close() + raise SocketError(e) + + def _read_response(self): + line = self._socket_file.readline() + if not line: + raise socket.error('no data read') + response = line.split() + return response[0], response[1:] + + def _read_body(self, size): + body = self._socket_file.read(size) + self._socket_file.read(2) # trailing crlf + if size > 0 and not body: + raise socket.error('no data read') + return body + + def _interact_value(self, command, expected_ok, expected_err=[]): + return self._interact(command, expected_ok, expected_err)[0] + + def _interact_job(self, command, expected_ok, expected_err, reserved=True): + jid, _, body = self._interact(command, expected_ok, expected_err, + size_field=1) + return Job(self, int(jid), body, reserved) + + def _interact_yaml_dict(self, command, expected_ok, expected_err=[]): + _, body, = self._interact(command, expected_ok, expected_err, + size_field=0) + return parse_yaml_dict(body) + + def _interact_yaml_list(self, command, expected_ok, expected_err=[]): + _, body, = self._interact(command, expected_ok, expected_err, + size_field=0) + return parse_yaml_list(body) + + def _interact_peek(self, command): + try: + return self._interact_job(command, ['FOUND'], ['NOT_FOUND'], False) + except CommandFailed, (_, status, results): + return None + + # -- public interface -- + + def put(self, body, priority=DEFAULT_PRIORITY, delay=0, ttr=DEFAULT_TTR): + """Put a job into the current tube. Returns job id.""" + assert isinstance(body, str), 'Job body must be a str instance' + jid = self._interact_value( + 'put %d %d %d %d\r\n%s\r\n' % + (priority, delay, ttr, len(body), body), + ['INSERTED', 'BURIED'], ['JOB_TOO_BIG']) + return int(jid) + + def reserve(self, timeout=None): + """Reserve a job from one of the watched tubes, with optional timeout in + seconds. Returns a Job object, or None if the request times out.""" + if timeout is not None: + command = 'reserve-with-timeout %d\r\n' % timeout + else: + command = 'reserve\r\n' + try: + return self._interact_job(command, + ['RESERVED'], + ['DEADLINE_SOON', 'TIMED_OUT']) + except CommandFailed, (_, status, results): + if status == 'TIMED_OUT': + return None + elif status == 'DEADLINE_SOON': + raise DeadlineSoon(results) + + def kick(self, bound=1): + """Kick at most bound jobs into the ready queue.""" + return int(self._interact_value('kick %d\r\n' % bound, ['KICKED'])) + + def peek(self, jid): + """Peek at a job. Returns a Job, or None.""" + return self._interact_peek('peek %d\r\n' % jid) + + def peek_ready(self): + """Peek at next ready job. Returns a Job, or None.""" + return self._interact_peek('peek-ready\r\n') + + def peek_delayed(self): + """Peek at next delayed job. Returns a Job, or None.""" + return self._interact_peek('peek-delayed\r\n') + + def peek_buried(self): + """Peek at next buried job. Returns a Job, or None.""" + return self._interact_peek('peek-buried\r\n') + + def tubes(self): + """Return a list of all existing tubes.""" + return self._interact_yaml_list('list-tubes\r\n', ['OK']) + + def using(self): + """Return a list of all tubes currently being used.""" + return self._interact_value('list-tube-used\r\n', ['USING']) + + def use(self, name): + """Use a given tube.""" + return self._interact_value('use %s\r\n' % name, ['USING']) + + def watching(self): + """Return a list of all tubes being watched.""" + return self._interact_yaml_list('list-tubes-watched\r\n', ['OK']) + + def watch(self, name): + """Watch a given tube.""" + return int(self._interact_value('watch %s\r\n' % name, ['WATCHING'])) + + def ignore(self, name): + """Stop watching a given tube.""" + try: + return int(self._interact_value('ignore %s\r\n' % name, + ['WATCHING'], + ['NOT_IGNORED'])) + except CommandFailed: + return 1 + + def stats(self): + """Return a dict of beanstalkd statistics.""" + return self._interact_yaml_dict('stats\r\n', ['OK']) + + def stats_tube(self, name): + """Return a dict of stats about a given tube.""" + return self._interact_yaml_dict('stats-tube %s\r\n' % name, + ['OK'], + ['NOT_FOUND']) + + def pause_tube(self, name, delay): + """Pause a tube for a given delay time, in seconds.""" + self._interact('pause-tube %s %d\r\n' %(name, delay), + ['PAUSED'], + ['NOT_FOUND']) + + # -- job interactors -- + + def delete(self, jid): + """Delete a job, by job id.""" + self._interact('delete %d\r\n' % jid, ['DELETED'], ['NOT_FOUND']) + + def release(self, jid, priority=DEFAULT_PRIORITY, delay=0): + """Release a reserved job back into the ready queue.""" + self._interact('release %d %d %d\r\n' % (jid, priority, delay), + ['RELEASED', 'BURIED'], + ['NOT_FOUND']) + + def bury(self, jid, priority=DEFAULT_PRIORITY): + """Bury a job, by job id.""" + self._interact('bury %d %d\r\n' % (jid, priority), + ['BURIED'], + ['NOT_FOUND']) + + def touch(self, jid): + """Touch a job, by job id, requesting more time to work on a reserved + job before it expires.""" + self._interact('touch %d\r\n' % jid, ['TOUCHED'], ['NOT_FOUND']) + + def stats_job(self, jid): + """Return a dict of stats about a job, by job id.""" + return self._interact_yaml_dict('stats-job %d\r\n' % jid, + ['OK'], + ['NOT_FOUND']) + + +class Job(object): + def __init__(self, conn, jid, body, reserved=True): + self.conn = conn + self.jid = jid + self.body = body + self.reserved = reserved + + def _priority(self): + stats = self.stats() + if isinstance(stats, dict): + return stats['pri'] + return DEFAULT_PRIORITY + + # -- public interface -- + + def delete(self): + """Delete this job.""" + self.conn.delete(self.jid) + self.reserved = False + + def release(self, priority=None, delay=0): + """Release this job back into the ready queue.""" + if self.reserved: + self.conn.release(self.jid, priority or self._priority(), delay) + self.reserved = False + + def bury(self, priority=None): + """Bury this job.""" + if self.reserved: + self.conn.bury(self.jid, priority or self._priority()) + self.reserved = False + + def touch(self): + """Touch this reserved job, requesting more time to work on it before it + expires.""" + if self.reserved: + self.conn.touch(self.jid) + + def stats(self): + """Return a dict of stats about this job.""" + return self.conn.stats_job(self.jid) + +def parse_yaml_dict(yaml): + """Parse a YAML dict, in the form returned by beanstalkd.""" + dict = {} + for m in re.finditer(r'^\s*([^:\s]+)\s*:\s*([^\s]*)$', yaml, re.M): + key, val = m.group(1), m.group(2) + # Check the type of the value, and parse it. + if key == 'name' or key == 'tube' or key == 'version': + dict[key] = val # String, even if it looks like a number + elif re.match(r'^(0|-?[1-9][0-9]*)$', val) is not None: + dict[key] = int(val) # Integer value + elif re.match(r'^(-?\d+(\.\d+)?(e[-+]?[1-9][0-9]*)?)$', val) is not None: + dict[key] = float(val) # Float value + else: + dict[key] = val # String value + return dict + +def parse_yaml_list(yaml): + """Parse a YAML list, in the form returned by beanstalkd.""" + return re.findall(r'^- (.*)$', yaml, re.M) + +if __name__ == '__main__': + import doctest, os, signal + try: + pid = os.spawnlp(os.P_NOWAIT, + 'beanstalkd', + 'beanstalkd', '-l', '127.0.0.1', '-p', '14711') + doctest.testfile('TUTORIAL.md', optionflags=doctest.ELLIPSIS) + doctest.testfile('test/network.doctest', optionflags=doctest.ELLIPSIS) + finally: + os.kill(pid, signal.SIGTERM)