Skip to content
Snippets Groups Projects
Commit bf455d74 authored by Tomáš Čejka's avatar Tomáš Čejka
Browse files

polling: rewritten to use redis stream

Messages now remain longer on the server, so the user is able to get them even
after a reload, and multiple users with the same peer tag receive the same messages.
New variables were added to the settings to control the behavior of the message
storage (maximum size of the queue, maximum lifetime of the queue).

NOTIF_STREAM_MAXSIZE = 50
NOTIF_STREAM_MAXLIFE = 1800
parent ad8a79dd
No related branches found
No related tags found
No related merge requests found
......@@ -202,10 +202,12 @@ def announce(messg, user, route):
messg = str(messg)
logger.info("ANNOUNCE " + messg)
r = redis.StrictRedis()
key = "msg_%s" % username
logger.info("ANNOUNCE key " + key)
r.rpush(key, messg)
r.expire(key, 1800)
key = "notifstream_%s" % username
obj = {"m": messg, "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
logger.info("ANNOUNCE " + str(obj))
lastid = r.xadd(key, obj, maxlen=settings.NOTIF_STREAM_MAXSIZE, approximate=False)
logger.info("ANNOUNCE key " + key + " with lastid " + lastid.decode("utf-8"))
r.expire(key, settings.NOTIF_STREAM_MAXLIFE)
@shared_task
def check_sync(route_name=None, selected_routes=[]):
......
......@@ -493,5 +493,9 @@ SETTINGS_EXPORT = [
ENABLE_SETUP_VIEW = False
from flowspy.settings_local import *
# Maximum size of the stream of notification messages per peer tag
NOTIF_STREAM_MAXSIZE = 50
# Maximum time that notifications persist on the server in seconds
NOTIF_STREAM_MAXLIFE = 1800
from flowspy.settings_local import *
......@@ -71,5 +71,5 @@
-ENABLE_SETUP_VIEW = False
+ENABLE_SETUP_VIEW = True
from flowspy.settings_local import *
# Maximum size of stream of notification messages per each peer tag
NOTIF_STREAM_MAXSIZE = 50
......@@ -22,6 +22,8 @@ from poller import views
urlpatterns = [
    #'poller.views',
    url('^$', views.main),
    # 1st call to get all existing messages
    url(r'^message/existing/(?P<peer_id>[\w\-]+)/$', views.message_existing, name='fetch-existing'),
    # update - get new messages announced after last_id
    url(r'^message/updates/(?P<peer_id>[\w\-]+)/(?P<last_id>[\w\-]+)$', views.message_updates, name='fetch-updates'),
]
......@@ -34,6 +34,13 @@ import redis
import logging
import os
# This component retrieves the stream of notifications from the server into the browser;
# the notifications are "announced" by the flowspec/tasks.py announce() method;
# all notifications are passed via redis, the key is created as notifstream_%s, where %s is a peer tag.
# The key is used to store a stream of objects: {"m": "%s", "time": "timestamp"},
# where %s is a notification message, and timestamp is in "%Y-%m-%d %H:%M:%S"
# format.
LOG_FILENAME = os.path.join(settings.LOG_FILE_LOCATION, 'poller.log')
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)
......@@ -43,8 +50,16 @@ handler.setFormatter(formatter)
logger.addHandler(handler)
def create_message(message, user, time):
data = {'id': str(uuid.uuid4()), 'body': message, 'user':user, 'time':time}
def create_message(message, user, msgid, time):
    """Create the notification payload dict that is sent back to the client.

    Params:
        message: str text of the notification
        user: str username of the logged in user
        msgid: str message id from the redis stream (used as the client's cursor)
        time: str timestamp in "%Y-%m-%d %H:%M:%S" format
    Returns:
        dict with the following keys: id, body, user, time, html
        ('html' holds the message rendered via the 'poll_message.html' template)
    """
    data = {'id': msgid, 'body': message, 'user':user, 'time':time}
    data['html'] = render_to_string('poll_message.html', {'message': data})
    return data
......@@ -81,54 +96,16 @@ class Msgs(object):
def message_existing(self, request, peer_id):
    """Return every notification currently stored for the given peer.

    Fetching the existing messages is just an update poll with an empty
    last_id, so the whole redis stream for the peer is returned.

    Params:
        request: AJAX POST request of the logged in user
        peer_id: pk of the Peer whose peer_tag selects the stream key
    Returns:
        whatever message_updates() returns for an empty last_id, or a
        redirect to the routes page for non-AJAX access.
    """
    if request.is_ajax():
        logger.debug("Polling all existing notifications")
        # An empty last_id makes message_updates return the whole stream.
        return self.message_updates(request, peer_id, "")
    return HttpResponseRedirect(reverse('group-routes'))
def message_new(self, mesg=None):
    """Store a freshly announced notification in the per-user cache and wake pollers.

    Params:
        mesg: dict with keys 'message' (notification text) and 'username'
              (peer tag the notification belongs to); if falsy, nothing happens.
    Returns:
        JSON response with the created message dict, or None when mesg is falsy.
    """
    if not mesg:
        return None
    message = mesg['message']
    user = mesg['username']
    logger.info("from %s" % user)
    now = datetime.datetime.now()
    msg = create_message(message, user, now.strftime("%Y-%m-%d %H:%M:%S"))
    # setdefault replaces the former bare try/except key-existence checks.
    cache = self.user_cache.setdefault(user, [])
    cache.append(msg)
    # The cursor points at the penultimate entry so a poller re-fetches the
    # newest message; with a single entry it points at that entry itself.
    if len(cache) == 1:
        self.user_cursor[user] = cache[-1]['id']
    else:
        self.user_cursor[user] = cache[-2]['id']
    # Bound the per-user cache to cache_size most recent messages.
    if len(cache) > self.cache_size:
        self.user_cache[user] = cache[-self.cache_size:]
    # Pulse the event so long-polling clients waiting on this user wake up.
    event = self.new_message_user_event.setdefault(user, Event())
    event.set()
    event.clear()
    return json_response(msg)
def message_updates(self, request, peer_id, last_id=""):
    """Return notifications from the peer's redis stream newer than last_id.

    Params:
        request: AJAX POST request of the logged in user
        peer_id: pk of the Peer whose peer_tag selects the stream key
        last_id: redis stream id of the last message the client has already
                 seen; empty (or "null") means "return the whole stream".
    Returns:
        JSON response {'messages': [...]} with rendered message dicts,
        an empty 204 response when the stream is empty, False when the
        peer cannot be resolved, or a redirect for non-AJAX access.
    """
    if request.is_ajax():
        logger.debug("Polling update")
        if last_id:
            logger.debug("Polling updates of notifications since " + last_id)
            # redis-py returns and accepts stream ids as bytes.
            last_id = bytes(last_id, "utf-8")
        try:
            user = Peer.objects.get(pk=peer_id).peer_tag
            logger.debug("Polling by user %s", str(user))
        except Exception:
            # Unknown peer: keep the historical "return False" contract.
            return False
        r = redis.StrictRedis()
        key = "notifstream_%s" % user
        logger.debug(str((key, user)))
        if last_id and last_id != b"null":
            logger.debug("Polling from last_id %s", last_id)
            msgs = r.xrange(key, min=last_id)
        else:
            msgs = r.xrange(key)
        # xrange's min bound is inclusive, so skip the entry whose id the
        # client already has (it equals last_id).
        msglist = []
        for msgid, fields in msgs:
            if last_id != msgid:
                msglist.append(create_message(fields[b"m"].decode("utf-8"),
                                              request.user.username,
                                              msgid.decode("utf-8"),
                                              fields[b"time"].decode("utf-8")))
        logger.debug(str(msgs))
        if not msgs:
            # Empty stream: nothing to report.
            return HttpResponse(content='', content_type=None, status=204)
        return json_response({'messages': msglist})
    return HttpResponseRedirect(reverse('group-routes'))
......
......@@ -49,28 +49,27 @@ jQuery.fn.enable = function(opt_enable) {
return this;
};
var loads = 0;
var updater = {
errorSleepTime: 5000,
cursor: null,
last_id: null,
start: function() {
var date = new Date();
var timestamp = date.getTime();
{% for peer in user.userprofile.peers.all %}
$.ajax({url: "{% url 'fetch-existing' peer.pk %}?="+timestamp, type: "POST", dataType: "json", cache:false,
success: updater.onFetchExisting,
error: updater.onError});
{% endfor %}
},
console.log("Initial fetching of all notifications.");
{% for peer in user.userprofile.peers.all %}
$.ajax({url: "{% url 'fetch-existing' peer.pk %}", type: "POST", dataType: "json", cache:false,
success: updater.onFetchExisting,
error: updater.onError});
{% endfor %}
},
poll: function() {
{% if user.is_authenticated %}
console.log("Polling new notifications from", updater.last_id);
if (oTable) {
oTable.fnReloadAjax(refreshUrl);
}
timeout = {{timeout}};
var date = new Date();
var timestamp = date.getTime();
{% for peer in user.userprofile.peers.all %}
$.ajax({url: "{% url 'fetch-updates' peer.pk %}?="+timestamp, type: "POST", dataType: "json", cache:false,
$.ajax({url: "{% url 'fetch-updates' peer.pk 'PLACEHOLDER' %}".replace("PLACEHOLDER", updater.last_id), type: "POST", dataType: "json", cache:false,
success: updater.onSuccess,
timeout: timeout,
error: updater.onError});
......@@ -84,7 +83,6 @@ var updater = {
updater.onError();
return;
}
//updater.errorSleepTime = 500;
window.setTimeout(updater.poll, updater.errorSleepTime);
},
......@@ -99,15 +97,18 @@ var updater = {
},
onError: function(response, text) {
if (text == 'timeout'){
if (oTable) {
oTable.fnReloadAjax(refreshUrl);
}
}
//updater.errorSleepTime *= 2;
console.log("Poll error; sleeping for", updater.errorSleepTime, "ms");
window.setTimeout(updater.poll, updater.errorSleepTime);
if (text == 'timeout'){
if (oTable) {
oTable.fnReloadAjax(refreshUrl);
}
}
//updater.errorSleepTime *= 2;
console.log("Poll error; sleeping for", updater.errorSleepTime, "ms");
if (updater.last_id == null) {
window.setTimeout(updater.start, updater.errorSleepTime);
} else {
window.setTimeout(updater.poll, updater.errorSleepTime);
}
},
newMessages: function(response) {
......@@ -115,10 +116,9 @@ var updater = {
if (response.messages.length == 0){
return true;
}
updater.cursor = response.cursor;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
updater.last_id = messages[messages.length - 1]["id"];
console.log(messages.length, "new messages, last_id:", updater.last_id);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(messages[i]);
......@@ -134,10 +134,10 @@ var updater = {
if (response.messages.length == 0){
return true;
}
updater.cursor = response.cursor;
$("#inbox").empty();
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
var i = messages.length
updater.last_id = messages[messages.length - 1]["id"];
console.log("get ", i, " messages with last_id ", updater.last_id);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(messages[i]);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment