Added quantiling UDAFs
[com/gs-lite.git] / bin / gshub.py
1 #!/usr/bin/python\r
2 # ------------------------------------------------\r
3 #   Copyright 2014 AT&T Intellectual Property\r
4 #   Licensed under the Apache License, Version 2.0 (the "License");\r
5 #   you may not use this file except in compliance with the License.\r
6 #   You may obtain a copy of the License at\r
7 #\r
8 #     http://www.apache.org/licenses/LICENSE-2.0\r
9 #\r
10 #   Unless required by applicable law or agreed to in writing, software\r
11 #   distributed under the License is distributed on an "AS IS" BASIS,\r
12 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
13 #   See the License for the specific language governing permissions and\r
14 #   limitations under the License.\r
15 # -------------------------------------------\r
16 \r
17 # Implementation of GSHUB REST service\r
18 # for announcement and discovery of gs instances, sources and sinks\r
19 \r
20 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer\r
21 import SocketServer\r
22 import json\r
23 import cgi\r
24 import threading\r
25 import getopt\r
26 import sys\r
27 import re\r
28 import cgi\r
29 import socket\r
30 import json\r
31 \r
32 # lis of URLS for all the REST calls we will serve\r
33 DISCOVER_INSTANCE_URL = '/v1/discover-instance'\r
34 DISCOVER_INITINSTANCE_URL = '/v1/discover-initialized-instance'\r
35 DISCOVER_SOURCE_URL =  '/v1/discover-source'\r
36 DISCOVER_SINK_URL =  '/v1/discover-sink'\r
37 DISCOVER_STARTPROCESSING_URL = '/v1/discover-start-processing'\r
38 ANNOUNCE_INSTANCE_URL =  '/v1/announce-instance'\r
39 ANNOUNCE_INITINSTANCE_URL =  '/v1/announce-initialized-instance'\r
40 ANNOUNCE_SOURCE_URL =  '/v1/announce-source'\r
41 ANNOUNCE_SINK_URL =  '/v1/announce-sink'\r
42 ANNOUNCE_STARTPROCESSING_URL = '/v1/announce-start-processing'\r
43 ANNOUNCE_STREAM_SUBSCRIPTION = '/v1/announce-stream-subscription'\r
44 ANNOUNCE_FTA_INSTANCE = '/v1/announce-fta-instance'\r
45 ANNOUNCE_METRICS = '/v1/log-metrics'\r
46 \r
47 # gs instance endpoints\r
48 gs_instances = {}\r
49 \r
50 # initialized gs instances\r
51 gs_init_instances = {}\r
52 \r
53 # instances for which processing started\r
54 gs_startprocessing_instances = {}\r
55 \r
56 # source endpoints\r
57 sources = {}\r
58 \r
59 # sink endpoints\r
60 sinks = {}\r
61 \r
62 \r
63 # exctract endpoint information from json data\r
64 def extract_endpoint(data) :\r
65         name = ''\r
66         ip = ''\r
67         port = 0\r
68                 \r
69         try :\r
70                 doc = json.loads(str(data))     \r
71         except :\r
72                 print ('Invalid json message ' + str(data))\r
73                 return []\r
74         \r
75         for key in doc.keys() :\r
76                 if key == 'name' :\r
77                         name = doc[key]\r
78                 elif key == 'ip' :\r
79                         ip = doc[key]\r
80                         # validate ip address           \r
81                         try :\r
82                                 socket.inet_pton(socket.AF_INET, ip)\r
83                         except :\r
84                                 print ('Invalid IPV4 address ' + ip)\r
85                                 ip = ''\r
86                 elif key == 'port' :\r
87                         # validate port number\r
88                         try :\r
89                                 port = int(doc[key])\r
90                         except :\r
91                                 print ('Invalid port number ' + doc[key])\r
92                                 port = 0\r
93         \r
94         if name == '' or ip == '' or port == 0 :\r
95                 print ('Name, ip or port is missing from json message ' + str(doc))\r
96                 return []\r
97         \r
98 \r
99         return [name, ip, port]\r
100 \r
101 \r
102 # extract instance name from json data\r
103 def extract_instance_name(data) :\r
104         name = ''\r
105         \r
106         try :\r
107                 doc = json.loads(str(data))     \r
108         except :\r
109                 print ('Invalid json message ' + str(data))\r
110                 return ''\r
111         \r
112         for key in doc.keys() :\r
113                 if key == 'name' :\r
114                         name = doc[key]\r
115         \r
116         if name == '' :\r
117                 print ('Name field is missing in json message ' + str(doc))\r
118         elif (name in gs_instances) == False:\r
119                 print ('Attempt to announce the initialization or start of processing for unknown instance ' + name)\r
120                 name = ''\r
121                 \r
122         return name\r
123         \r
124 # handler for HTTP requests. We will override do_POST and do_GET of BaseHTTPRequestHandler\r
125 class Server(BaseHTTPRequestHandler) :\r
126 \r
127         def do_POST(self):\r
128                 if re.search(ANNOUNCE_INSTANCE_URL, self.path) != None:         \r
129                         print self.path\r
130                         print self.headers\r
131                         if self.headers.getheader('content-type') == 'application/json' :\r
132                                 # Find content length\r
133                                 content_len = 0\r
134                                 for i in range(len(self.headers.keys())):\r
135                                         if self.headers.keys()[i] == 'content-length' :\r
136                                                 content_len = int (self.headers.values()[i])\r
137                                                 break\r
138                                 if content_len != 0 :\r
139                                         # extract endpoint information\r
140                                         endpoint = extract_endpoint(self.rfile.read(content_len))\r
141                                         if endpoint == [] :\r
142                                                 self.send_response(400)\r
143                                         else :\r
144                                                 self.send_response(200)\r
145                                                 gs_instances[endpoint[0]] = [endpoint[1], endpoint[2]]\r
146                                 else :\r
147                                         self.send_response(400)\r
148 \r
149                         else:\r
150                                 self.send_response(400)\r
151                         self.end_headers()              \r
152                         \r
153                 elif re.search(ANNOUNCE_INITINSTANCE_URL, self.path) != None:\r
154                         if self.headers.getheader('content-type') == 'application/json' :\r
155                                 # Find content length\r
156                                 content_len = 0\r
157                                 for i in range(len(self.headers.keys())):\r
158                                         if self.headers.keys()[i] == 'content-length' :\r
159                                                 content_len = int (self.headers.values()[i])\r
160                                                 break\r
161                                 if content_len != 0 :\r
162                                         # extract name of initialized gs instance                               \r
163                                         name = extract_instance_name(self.rfile.read(content_len))\r
164                                         if name == '' :\r
165                                                 self.send_response(400)\r
166                                         else :\r
167                                                 self.send_response(200)\r
168                                                 gs_init_instances[name] = 1\r
169                                 else :\r
170                                         self.send_response(400)\r
171 \r
172                         else:\r
173                                 self.send_response(400)\r
174                         self.end_headers()      \r
175                         \r
176                 elif re.search(ANNOUNCE_SOURCE_URL, self.path) != None:\r
177                         if self.headers.getheader('content-type') == 'application/json' :\r
178                                 # Find content length\r
179                                 content_len = 0\r
180                                 for i in range(len(self.headers.keys())):\r
181                                         if self.headers.keys()[i] == 'content-length' :\r
182                                                 content_len = int (self.headers.values()[i])\r
183                                                 break\r
184                                 if content_len != 0 :\r
185                                         # extract endpoint information                          \r
186                                         endpoint = extract_endpoint(self.rfile.read(content_len))\r
187                                         if endpoint == [] :\r
188                                                 self.send_response(400)\r
189                                         else :\r
190                                                 self.send_response(200)\r
191                                                 sources[endpoint[0]] = [endpoint[1], endpoint[2]]\r
192                                 else :\r
193                                         self.send_response(400)\r
194 \r
195                         else:\r
196                                 self.send_response(400)\r
197                         self.end_headers()\r
198                         \r
199                 elif re.search(ANNOUNCE_SINK_URL, self.path) != None:\r
200                         if self.headers.getheader('content-type') == 'application/json' :\r
201                                 # Find content length\r
202                                 content_len = 0\r
203                                 for i in range(len(self.headers.keys())):\r
204                                         if self.headers.keys()[i] == 'content-length' :\r
205                                                 content_len = int (self.headers.values()[i])\r
206                                                 break\r
207                                 if content_len != 0 :\r
208                                         # extract endpoint information                          \r
209                                         endpoint = extract_endpoint(self.rfile.read(content_len))\r
210                                         if endpoint == [] :\r
211                                                 self.send_response(400)\r
212                                         else :\r
213                                                 self.send_response(200)\r
214                                                 sinks[endpoint[0]] = [endpoint[1], endpoint[2]]\r
215                                 else :\r
216                                         self.send_response(400)\r
217 \r
218                         else:\r
219                                 self.send_response(400)\r
220                         self.end_headers()      \r
221                         \r
222                 elif re.search(ANNOUNCE_STARTPROCESSING_URL, self.path) != None:\r
223                         if self.headers.getheader('content-type') == 'application/json' :\r
224                                 # Find content length\r
225                                 content_len = 0\r
226                                 for i in range(len(self.headers.keys())):\r
227                                         if self.headers.keys()[i] == 'content-length' :\r
228                                                 content_len = int (self.headers.values()[i])\r
229                                                 break\r
230                                 if content_len != 0 :\r
231                                         # extract name of initialized gs instance                               \r
232                                         name = extract_instance_name(self.rfile.read(content_len))\r
233                                         if name == '' :\r
234                                                 self.send_response(400)\r
235                                         else :\r
236                                                 self.send_response(200)\r
237                                                 gs_startprocessing_instances[name] = 1\r
238                                 else :\r
239                                         self.send_response(400)\r
240 \r
241                         else:\r
242                                 self.send_response(400)\r
243                         self.end_headers()\r
244                         \r
245                 # we do not do any processing for ANNOUNCE_STREAM_SUBSCRIPTION, ANNOUNCE_FTA_INSTANCE and ANNOUNCE_METRICS in gshub simulator           \r
246                 elif (re.search(ANNOUNCE_STREAM_SUBSCRIPTION, self.path) != None) or (re.search(ANNOUNCE_FTA_INSTANCE, self.path) != None) or (re.search(ANNOUNCE_METRICS, self.path) != None):\r
247                         if self.headers.getheader('content-type') == 'application/json' :\r
248                                 # Find content length\r
249                                 content_len = 0\r
250                                 for i in range(len(self.headers.keys())):\r
251                                         if self.headers.keys()[i] == 'content-length' :\r
252                                                 content_len = int (self.headers.values()[i])\r
253                                                 break\r
254                                 if content_len != 0 :                           \r
255                                         self.send_response(200)\r
256                                 else :\r
257                                         self.send_response(400)\r
258 \r
259                         else:\r
260                                 self.send_response(400)\r
261                         self.end_headers()      \r
262                         \r
263                 else:\r
264                         self.send_response(404)\r
265                         self.end_headers()\r
266                 return\r
267                 \r
268         def do_GET(self):\r
269                 if re.search(DISCOVER_INSTANCE_URL + '/*', self.path) != None:\r
270                         instance = self.path.split('/')[-1]\r
271                         # check if this instance is registered\r
272                         if instance in gs_instances :\r
273                                 self.send_response(200)\r
274                                 self.send_header('Content-Type', 'application/json')\r
275                                 self.end_headers()\r
276                                 self.wfile.write(bytes("{\"ip\" : \"" + gs_instances[instance][0] + "\", \"port\": " + str(gs_instances[instance][1]) + "}"))\r
277                         else:\r
278                                 self.send_response(400)\r
279                                 self.end_headers()\r
280 \r
281 \r
282                 elif re.search(DISCOVER_INITINSTANCE_URL + '/*', self.path) != None:\r
283                         instance = self.path.split('/')[-1]\r
284                         # check if this instance is initialized\r
285                         if instance in gs_init_instances :\r
286                                 self.send_response(200)\r
287                                 self.send_header('Content-Type', 'application/json')\r
288                                 self.end_headers()\r
289                                 self.wfile.write(bytes("{\"ip\" : \"" + gs_instances[instance][0] + "\", \"port\": " + str(gs_instances[instance][1]) + "}"))\r
290                         else:\r
291                                 self.send_response(400)\r
292                                 self.end_headers()\r
293 \r
294                 elif re.search(DISCOVER_SOURCE_URL + '/*', self.path) != None:\r
295                         source = self.path.split('/')[-1]\r
296                         # check if it is a registered source\r
297                         if source in sources :\r
298                                 self.send_response(200)\r
299                                 self.send_header('Content-Type', 'application/json')\r
300                                 self.end_headers()\r
301                                 self.wfile.write(bytes("{\"ip\" : \"" + sources[source][0] + "\", \"port\": " + str(sources[source][1]) + "}"))\r
302                         else:\r
303                                 self.send_response(400)\r
304                                 self.end_headers()\r
305                                 \r
306                 elif re.search(DISCOVER_SINK_URL + '/*', self.path) != None:\r
307                         sink = self.path.split('/')[-1]\r
308                         # check if it is a registered sink\r
309                         if sink in sinks :\r
310                                 self.send_response(200)\r
311                                 self.send_header('Content-Type', 'application/json')\r
312                                 self.end_headers()\r
313                                 self.wfile.write(bytes("{\"ip\" : \"" + sinks[sink][0] + "\", \"port\": " + str(sinks[sink][1]) + "}"))\r
314                         else:\r
315                                 self.send_response(400)\r
316                                 self.end_headers()\r
317                                 \r
318                 elif re.search(DISCOVER_STARTPROCESSING_URL + '/*', self.path) != None:\r
319                         instance = self.path.split('/')[-1]\r
320                         # check if this instance is initialized\r
321                         if instance in gs_startprocessing_instances :\r
322                                 self.send_response(200)\r
323                                 self.send_header('Content-Type', 'application/json')\r
324                                 self.end_headers()\r
325                                 self.wfile.write(bytes("{}"))\r
326                         else:\r
327                                 self.send_response(400)\r
328                                 self.end_headers()                              \r
329                 else:\r
330                         self.send_response(404)\r
331                         self.end_headers()\r
332                 return\r
333 \r
334 \r
335 # print usage instructions\r
336 def usage():\r
337         print ('./gshub.py [-p port]')\r
338         \r
339         \r
340 \r
341 def main():\r
342         # process command-line arguments\r
343         try:\r
344                 opts, args = getopt.getopt(sys.argv[1:], "hp:v", ["help", "port="])\r
345         except getopt.GetoptError as err:\r
346                 # print help information and exit:\r
347                 print(str(err)) \r
348                 usage()\r
349                 sys.exit(2)\r
350 \r
351         port = 0\r
352         for o, a in opts:\r
353                 if o in ("-h", "--help"):\r
354                         usage()\r
355                         sys.exit(0)\r
356                 elif o in ("-p", "--port"):\r
357                         port = int(a)\r
358                 else:\r
359                         print ('Unknown command-line option ' + o)\r
360 \r
361         # start HTTP server to serve REST calls\r
362         server_address = ('127.0.0.1', port)\r
363         httpd = HTTPServer(server_address, Server)\r
364 \r
365         # record HTTP server address in gshub.log\r
366         f = open('gshub.log', 'w')\r
367         f.write(str(httpd.server_address[0]) + ':' + str(httpd.server_address[1]) + '\n')\r
368         f.close()\r
369                 \r
370         print ('GSHUB Running on port ' + str(httpd.server_address[1]) + ' ...')\r
371 \r
372         httpd.serve_forever()\r
373 \r
374         \r
375 if __name__ == "__main__":\r
376         main()\r