Implement a server event queue using beanstalkd
This commit is contained in:
60
api/events.py
Normal file
60
api/events.py
Normal file
@@ -0,0 +1,60 @@
|
||||
from django.conf import settings
|
||||
from minecraft.models import Server
|
||||
from json import dumps, JSONEncoder
|
||||
import beanstalkc
|
||||
from django.contrib.auth.models import User
|
||||
|
||||
class EventEncoder(JSONEncoder):
|
||||
def default(self, obj):
|
||||
if isinstance(obj, Event):
|
||||
return {'type': obj.type, 'payload': obj.data}
|
||||
return super(EventEncoder, self).default(obj)
|
||||
|
||||
|
||||
class Event(object):
|
||||
def __init__(self, type, data):
|
||||
self.type = type
|
||||
self.data = data
|
||||
|
||||
class BroadcastEvent(Event):
|
||||
def __init__(self, message):
|
||||
super(BroadcastEvent, self).__init__(type='broadcast', data={'message':
|
||||
message})
|
||||
|
||||
class PlayerMessageEvent(Event):
|
||||
def __init__(self, user, message):
|
||||
super(PlayerMessageEvent, self).__init__(type='player-message',
|
||||
data={'message': message, 'player': user})
|
||||
|
||||
def server_queue(server, users=[]):
|
||||
queueName = 'caminus-broadcast-%s'%server.id
|
||||
queue = beanstalkc.Connection(host=settings.CAMINUS_BEANSTALKD_HOST,
|
||||
port=settings.CAMINUS_BEANSTALKD_PORT)
|
||||
queue.use(queueName)
|
||||
queue.watch(queueName)
|
||||
if len(users) > 0:
|
||||
for user in users:
|
||||
queue.watch("caminus-user-%s"%user)
|
||||
return queue
|
||||
|
||||
def send_server_event(server, event):
|
||||
if settings.CAMINUS_USE_BEANSTALKD:
|
||||
queue = server_queue(server)
|
||||
json = dumps(event, cls=EventEncoder)
|
||||
queue.put(json)
|
||||
|
||||
def server_broadcast(message, *args):
|
||||
message = message%args
|
||||
for server in Server.objects.all():
|
||||
event = BroadcastEvent(message)
|
||||
send_server_event(server, event)
|
||||
|
||||
def user_message(user, message, *args):
|
||||
player = user.minecraftprofile.mc_username
|
||||
player_message(player, message, *args)
|
||||
|
||||
def player_message(playername, message, *args):
|
||||
message = message%args
|
||||
for server in Server.objects.all():
|
||||
event = PlayerMessageEvent(playername, message)
|
||||
send_server_event(server, event)
|
@@ -9,6 +9,7 @@ from urllib2 import urlopen
|
||||
import json
|
||||
from datetime import datetime
|
||||
from models import cachePlayerList
|
||||
from servers import server_queue, user_queue
|
||||
|
||||
class MOTDHandler(AnonymousBaseHandler):
|
||||
allowed_methods = ('GET',)
|
||||
@@ -76,6 +77,23 @@ class ServerPingHandler(BaseHandler):
|
||||
def read(self, request):
|
||||
return {'identity': request.server}
|
||||
|
||||
class ServerEventHandler(BaseHandler):
|
||||
allowed_methods = ('GET', 'POST')
|
||||
|
||||
def read(self, request):
|
||||
queue = server_queue(request.server)
|
||||
queue.watch('caminus-broadcast-%s'%request.server.id)
|
||||
events = []
|
||||
job = queue.reserve(timeout=30)
|
||||
if job:
|
||||
events.append({'id': job.jid, 'event': json.loads(job.body)})
|
||||
return {'events': events}
|
||||
|
||||
def create(self, request):
|
||||
queue = server_queue(request.server)
|
||||
queue.delete(int(request.POST['job']))
|
||||
return {'result': 'success'}
|
||||
|
||||
class PollHandler(BaseHandler):
|
||||
allowed_methods = ('GET',)
|
||||
|
||||
|
0
api/management/__init__.py
Normal file
0
api/management/__init__.py
Normal file
0
api/management/commands/__init__.py
Normal file
0
api/management/commands/__init__.py
Normal file
36
api/tests.py
36
api/tests.py
@@ -5,6 +5,8 @@ from django.contrib.auth.models import User
|
||||
from minecraft.models import MinecraftProfile, Server, PlayerSession, MOTD, Ban
|
||||
from local.models import Quote
|
||||
import hashlib
|
||||
import events
|
||||
from django.conf import settings
|
||||
|
||||
class ServerPingTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
@@ -25,6 +27,40 @@ class ServerPingTest(unittest.TestCase):
|
||||
resp = self.client.get('/api/server/whoami', HTTP_AUTHORIZATION='X-Caminus %s'%(self.token))
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
|
||||
if settings.CAMINUS_USE_BEANSTALKD:
|
||||
class ServerEventTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.client = Client()
|
||||
self.user = User.objects.create_user('ValidUsername', 'test@example.com')
|
||||
self.user.minecraftprofile.mc_username = "ValidUsername"
|
||||
self.user.minecraftprofile.save()
|
||||
self.server = Server.objects.create(hostname='localhost', secret='secret')
|
||||
tokenHash = hashlib.sha1()
|
||||
tokenHash.update("%s%s%s"%('localhost', 0, 'secret'))
|
||||
self.token = "%s$%s$%s"%('localhost', 0, tokenHash.hexdigest())
|
||||
|
||||
def tearDown(self):
|
||||
self.user.delete()
|
||||
self.server.delete()
|
||||
|
||||
def testBroadcast(self):
|
||||
events.server_broadcast("Test message")
|
||||
response = json.loads(self.client.get('/api/server/events',
|
||||
HTTP_AUTHORIZATION='X-Caminus %s'%(self.token)).content)
|
||||
self.assertTrue(len(response['events']) > 0)
|
||||
response = json.loads(self.client.post('/api/server/events', {'job':response['events'][0]['id']},
|
||||
HTTP_AUTHORIZATION='X-Caminus %s'%(self.token)).content)
|
||||
self.assertEqual(response['result'], 'success')
|
||||
|
||||
def testUserMessage(self):
|
||||
events.user_message(self.user, "Test user message")
|
||||
response = json.loads(self.client.get('/api/server/events',
|
||||
HTTP_AUTHORIZATION='X-Caminus %s'%(self.token)).content)
|
||||
self.assertTrue(len(response['events']) > 0)
|
||||
response = json.loads(self.client.post('/api/server/events', {'job':response['events'][0]['id']},
|
||||
HTTP_AUTHORIZATION='X-Caminus %s'%(self.token)).content)
|
||||
self.assertEqual(response['result'], 'success')
|
||||
|
||||
class MOTDTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.client = Client()
|
||||
|
@@ -40,6 +40,7 @@ class ServerResource(Resource):
|
||||
urlpatterns = patterns('api',
|
||||
url(r'^motd/(?P<username>.*)$', motdHandler),
|
||||
url(r'^server/whoami$', ServerResource(handlers.ServerPingHandler)),
|
||||
url(r'^server/events$', ServerResource(handlers.ServerEventHandler)),
|
||||
url(r'^server/economy/(?P<playername>.*)$', ServerResource(handlers.EconomyHandler)),
|
||||
url(r'^server/session/(?P<playername>.*)/new$', ServerResource(handlers.NewPlayerSessionHandler)),
|
||||
url(r'^server/session/(?P<playername>.*)/close$', ServerResource(handlers.ClosePlayerSessionHandler)),
|
||||
|
Reference in New Issue
Block a user