blob: a0d2a51166723ab36b9da99ab6e1ac4d7ec883ff [file] [log] [blame]
Azul683d50d2015-03-29 14:34:45 +01001#!/usr/bin/env python2
2
3__author__ = 'azul'
4
5from flask import Flask, request
6import redis
7import redispubsub
8
9
class Publisher(object):
    """Small wrapper around a Redis client used to publish messages."""

    def __init__(self):
        """
        Initialises a connection to Redis for the publisher

        ``redis.Redis()`` is created without arguments, so the client's
        default connection settings apply.
        """
        self.publisher = redis.Redis()

    def publish(self, topic, message):
        """
        Publishes a message to a redis topic

        This is a best-effort call: ordinary errors raised by the client are
        reported through the return value instead of being propagated.

        :param topic: name of the redis topic to publish to
        :param message: message to publish on the topic
        :return: True if the publish call completed, False if it raised
        """
        try:
            self.publisher.publish(topic, message)
        except Exception:
            # Deliberately not a bare ``except``: SystemExit and
            # KeyboardInterrupt must still be able to stop the process.
            return False
        return True
32
33
class Consumer(object):
    """Per-subscriber wrapper around a ``redispubsub.RedisPubsub`` object."""

    def __init__(self):
        """
        Opens a Redis connection and builds the pubsub consumer on top of it

        The consumer is created but not started; ``subscribe`` starts it on
        first use.
        """
        connection = redis.Redis()
        self.publisher = connection
        self.consumer = redispubsub.RedisPubsub(connection.pubsub())
        self.is_consumer_running = False

    def subscribe(self, topic):
        """
        Subscribes the consumer to a topic, starting it on the first call

        :param topic: name of the redis topic to subscribe to
        :return: True
        """
        # The subscription is registered first in every case; only the
        # start-up step depends on whether the consumer is already running.
        self.consumer.subscribe(topic)
        if self.is_consumer_running is False:
            self.consumer.start()
            self.is_consumer_running = True
        return True

    def unsubscribe(self, topic):
        """
        Unsubscribes the consumer from a topic

        :param topic: name of the redis topic to unsubscribe from
        :return: True
        """
        self.consumer.unsubscribe(topic)
        return True

    def read_next(self, topic):
        """
        Fetches the next message the consumer holds for a topic

        :param topic: name of the redis topic
        :return: the message's ``data`` converted with ``str``, or an empty
                 string when nothing was dequeued or the dequeued entry is a
                 ``subscribe`` notification
        """
        entry = self.consumer.dequeue(topic)
        if entry is None:
            return ''
        if entry['type'] == 'subscribe':
            # Subscription notifications carry no payload for the caller.
            return ''
        return str(entry['data'])
84
85
if __name__ == '__main__':

    publisher = Publisher()
    # Maps username -> Consumer. Entries are created on a user's first
    # subscription and are never removed while the process runs.
    users = {}

    app = Flask(__name__)

    @app.route('/<topic>', methods=['POST'])
    def publish(topic):
        """POST /<topic>: publish the raw request body to ``topic``."""
        # get_data() returns the raw body and caches it on the request, so
        # the same bytes are used for publishing and for the reply text.
        body = request.get_data()
        if publisher.publish(topic, body):
            return 'Publish succeeded : message %s published to topic %s' % (
                body, topic)
        return 'failed to publish message %s to topic %s' % (body, topic)

    @app.route('/<topic>/<username>', methods=['POST'])
    def subscribe(topic, username):
        """POST /<topic>/<username>: subscribe ``username`` to ``topic``."""
        if username not in users:
            users[username] = Consumer()
        if users[username].subscribe(topic):
            return 'Subscription succeeded : %s to topic %s' % (
                username, topic)
        # A Flask view must not return None, so a falsy subscribe result is
        # reported explicitly instead of falling off the end of the function.
        return 'Subscription failed : %s to topic %s' % (
            username, topic), 500

    @app.route('/<topic>/<username>', methods=['DELETE'])
    def unsubscribe(topic, username):
        """DELETE /<topic>/<username>: drop a user's subscription."""
        not_found = ('The subscription does not exist', 404)
        if username not in users:
            return not_found
        try:
            # Only the unsubscribe call is guarded; building the reply text
            # is kept outside the try so its errors are not reported as 404.
            removed = users[username].unsubscribe(topic)
        except Exception:
            return not_found
        if not removed:
            return not_found
        return "Unsubscribe succeeded : user %s from topic %s" % (
            username, topic)

    @app.route('/<topic>/<username>', methods=['GET'])
    def read_next(topic, username):
        """GET /<topic>/<username>: return the user's next message."""
        if username not in users:
            return 'The subscription does not exist', 404
        result = users[username].read_next(topic)
        if result == '':
            return \
                'There are no messages available for this topic on this user', 204
        return str(result)

    # NOTE(review): host='0.0.0.0' makes the server listen on every network
    # interface; confirm that this exposure is intended for the deployment.
    app.run(host='0.0.0.0')