Added quantiling UDAFs
[com/gs-lite.git] / bin / gshub3.py
1 #!/usr/bin/python3.6\r
2 \r
3 # ------------------------------------------------\r
4 #   Copyright 2014 AT&T Intellectual Property\r
5 #   Licensed under the Apache License, Version 2.0 (the "License");\r
6 #   you may not use this file except in compliance with the License.\r
7 #   You may obtain a copy of the License at\r
8 #\r
9 #     http://www.apache.org/licenses/LICENSE-2.0\r
10 #\r
11 #   Unless required by applicable law or agreed to in writing, software\r
12 #   distributed under the License is distributed on an "AS IS" BASIS,\r
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
14 #   See the License for the specific language governing permissions and\r
15 #   limitations under the License.\r
16 # -------------------------------------------\r
17 \r
18 # Implementation of GSHUB REST service\r
19 # for announcement and discovery of gs instances, sources and sinks\r
20 \r
21 from http.server import BaseHTTPRequestHandler, HTTPServer\r
22 from socketserver import ThreadingMixIn\r
23 import threading\r
24 import getopt\r
25 import sys\r
26 import re\r
27 import cgi\r
28 import socket\r
29 import json\r
30 \r
31 # lis of URLS for all the REST calls we will serve\r
32 DISCOVER_INSTANCE_URL = '/v1/discover-instance'\r
33 DISCOVER_INITINSTANCE_URL = '/v1/discover-initialized-instance'\r
34 DISCOVER_SOURCE_URL =  '/v1/discover-source'\r
35 DISCOVER_SINK_URL =  '/v1/discover-sink'\r
36 DISCOVER_STARTPROCESSING_URL = '/v1/discover-start-processing'\r
37 ANNOUNCE_INSTANCE_URL =  '/v1/announce-instance'\r
38 ANNOUNCE_INITINSTANCE_URL =  '/v1/announce-initialized-instance'\r
39 ANNOUNCE_SOURCE_URL =  '/v1/announce-source'\r
40 ANNOUNCE_SINK_URL =  '/v1/announce-sink'\r
41 ANNOUNCE_STARTPROCESSING_URL = '/v1/announce-start-processing'\r
42 ANNOUNCE_STREAM_SUBSCRIPTION = '/v1/announce-stream-subscription'\r
43 ANNOUNCE_FTA_INSTANCE = '/v1/announce-fta-instance'\r
44 ANNOUNCE_METRICS = '/v1/log-metrics'\r
45 \r
46 # gs instance endpoints\r
47 gs_instances = {}\r
48 \r
49 # initialized gs instances\r
50 gs_init_instances = {}\r
51 \r
52 # instances for which processing started\r
53 gs_startprocessing_instances = {}\r
54 \r
55 # source endpoints\r
56 sources = {}\r
57 \r
58 # sink endpoints\r
59 sinks = {}\r
60 \r
61 \r
62 # exctract endpoint information from json data\r
63 def extract_endpoint(data) :\r
64         name = ''\r
65         ip = ''\r
66         port = 0\r
67                 \r
68         try :\r
69                 doc = json.loads(str(data, 'utf-8'))    \r
70         except :\r
71                 print ('Invalid json message ' + str(data, 'utf-8'))\r
72                 return []\r
73         \r
74         for key in doc.keys() :\r
75                 if key == 'name' :\r
76                         name = doc[key]\r
77                 elif key == 'ip' :\r
78                         ip = doc[key]\r
79                         # validate ip address           \r
80                         try :\r
81                                 socket.inet_pton(socket.AF_INET, ip)\r
82                         except :\r
83                                 print ('Invalid IPV4 address ' + ip)\r
84                                 ip = ''\r
85                 elif key == 'port' :\r
86                         # validate port number\r
87                         try :\r
88                                 port = int(doc[key])\r
89                         except :\r
90                                 print ('Invalid port number ' + doc[key])\r
91                                 port = 0\r
92         \r
93         if name == '' or ip == '' or port == 0 :\r
94                 print ('Name, ip or port is missing from json message ' + str(doc))\r
95                 return []\r
96         \r
97 \r
98         return [name, ip, port]\r
99 \r
100 \r
101 # extract instance name from json data\r
102 def extract_instance_name(data) :\r
103         name = ''\r
104         \r
105         try :\r
106                 doc = json.loads(str(data, 'utf-8'))    \r
107         except :\r
108                 print ('Invalid json message ' + str(data, "utf-8"))\r
109                 return ''\r
110         \r
111         for key in doc.keys() :\r
112                 if key == 'name' :\r
113                         name = doc[key]\r
114         \r
115         if name == '' :\r
116                 print ('Name field is missing in json message ' + str(doc))\r
117         elif (name in gs_instances) == False:\r
118                 print ('Attempt to announce the initialization or start of processing for unknown instance ' + name)\r
119                 name = ''\r
120                 \r
121         return name\r
122         \r
123 # handler for HTTP requests. We will override do_PORT and do_GET of BaseHTTPRequestHandler\r
124 class HTTPRequestHandler(BaseHTTPRequestHandler):\r
125 \r
126         def do_POST(self):\r
127                 if re.search(ANNOUNCE_INSTANCE_URL, self.path) != None:         \r
128                         if self.headers.get_content_type() == 'application/json' :\r
129                                 # Find content length\r
130                                 content_len = 0\r
131                                 for i in range(len(self.headers.keys())):\r
132                                         if self.headers.keys()[i] == 'Content-Length' :\r
133                                                 content_len = int (self.headers.values()[i])\r
134                                                 break\r
135                                 if content_len != 0 :\r
136                                         # extract endpoint information\r
137                                         endpoint = extract_endpoint(self.rfile.read(content_len))\r
138                                         if endpoint == [] :\r
139                                                 self.send_response(400)\r
140                                         else :\r
141                                                 self.send_response(200)\r
142                                                 gs_instances[endpoint[0]] = [endpoint[1], endpoint[2]]\r
143                                 else :\r
144                                         self.send_response(400)\r
145 \r
146                         else:\r
147                                 self.send_response(400)\r
148                         self.end_headers()              \r
149                         \r
150                 elif re.search(ANNOUNCE_INITINSTANCE_URL, self.path) != None:\r
151                         if self.headers.get_content_type() == 'application/json' :\r
152                                 # Find content length\r
153                                 content_len = 0\r
154                                 for i in range(len(self.headers.keys())):\r
155                                         if self.headers.keys()[i] == 'Content-Length' :\r
156                                                 content_len = int (self.headers.values()[i])\r
157                                                 break\r
158                                 if content_len != 0 :\r
159                                         # extract name of initialized gs instance                               \r
160                                         name = extract_instance_name(self.rfile.read(content_len))\r
161                                         if name == '' :\r
162                                                 self.send_response(400)\r
163                                         else :\r
164                                                 self.send_response(200)\r
165                                                 gs_init_instances[name] = 1\r
166                                 else :\r
167                                         self.send_response(400)\r
168 \r
169                         else:\r
170                                 self.send_response(400)\r
171                         self.end_headers()      \r
172                         \r
173                 elif re.search(ANNOUNCE_SOURCE_URL, self.path) != None:\r
174                         if self.headers.get_content_type() == 'application/json' :\r
175                                 # Find content length\r
176                                 content_len = 0\r
177                                 for i in range(len(self.headers.keys())):\r
178                                         if self.headers.keys()[i] == 'Content-Length' :\r
179                                                 content_len = int (self.headers.values()[i])\r
180                                                 break\r
181                                 if content_len != 0 :\r
182                                         # extract endpoint information                          \r
183                                         endpoint = extract_endpoint(self.rfile.read(content_len))\r
184                                         if endpoint == [] :\r
185                                                 self.send_response(400)\r
186                                         else :\r
187                                                 self.send_response(200)\r
188                                                 sources[endpoint[0]] = [endpoint[1], endpoint[2]]\r
189                                 else :\r
190                                         self.send_response(400)\r
191 \r
192                         else:\r
193                                 self.send_response(400)\r
194                         self.end_headers()\r
195                         \r
196                 elif re.search(ANNOUNCE_SINK_URL, self.path) != None:\r
197                         if self.headers.get_content_type() == 'application/json' :\r
198                                 # Find content length\r
199                                 content_len = 0\r
200                                 for i in range(len(self.headers.keys())):\r
201                                         if self.headers.keys()[i] == 'Content-Length' :\r
202                                                 content_len = int (self.headers.values()[i])\r
203                                                 break\r
204                                 if content_len != 0 :\r
205                                         # extract endpoint information                          \r
206                                         endpoint = extract_endpoint(self.rfile.read(content_len))\r
207                                         if endpoint == [] :\r
208                                                 self.send_response(400)\r
209                                         else :\r
210                                                 self.send_response(200)\r
211                                                 sinks[endpoint[0]] = [endpoint[1], endpoint[2]]\r
212                                 else :\r
213                                         self.send_response(400)\r
214 \r
215                         else:\r
216                                 self.send_response(400)\r
217                         self.end_headers()      \r
218                         \r
219                 elif re.search(ANNOUNCE_STARTPROCESSING_URL, self.path) != None:\r
220                         if self.headers.get_content_type() == 'application/json' :\r
221                                 # Find content length\r
222                                 content_len = 0\r
223                                 for i in range(len(self.headers.keys())):\r
224                                         if self.headers.keys()[i] == 'Content-Length' :\r
225                                                 content_len = int (self.headers.values()[i])\r
226                                                 break\r
227                                 if content_len != 0 :\r
228                                         # extract name of initialized gs instance                               \r
229                                         name = extract_instance_name(self.rfile.read(content_len))\r
230                                         if name == '' :\r
231                                                 self.send_response(400)\r
232                                         else :\r
233                                                 self.send_response(200)\r
234                                                 gs_startprocessing_instances[name] = 1\r
235                                 else :\r
236                                         self.send_response(400)\r
237 \r
238                         else:\r
239                                 self.send_response(400)\r
240                         self.end_headers()\r
241                         \r
242                 # we do not do any processing for ANNOUNCE_STREAM_SUBSCRIPTION, ANNOUNCE_FTA_INSTANCE and ANNOUNCE_METRICS in gshub simulator           \r
243                 elif (re.search(ANNOUNCE_STREAM_SUBSCRIPTION, self.path) != None) or (re.search(ANNOUNCE_FTA_INSTANCE, self.path) != None) or (re.search(ANNOUNCE_METRICS, self.path) != None):\r
244                         if self.headers.get_content_type() == 'application/json' :\r
245                                 # Find content length\r
246                                 content_len = 0\r
247                                 for i in range(len(self.headers.keys())):\r
248                                         if self.headers.keys()[i] == 'Content-Length' :\r
249                                                 content_len = int (self.headers.values()[i])\r
250                                                 break\r
251                                 if content_len != 0 :                           \r
252                                         self.send_response(200)\r
253                                 else :\r
254                                         self.send_response(400)\r
255 \r
256                         else:\r
257                                 self.send_response(400)\r
258                         self.end_headers()      \r
259                         \r
260                 else:\r
261                         self.send_response(404)\r
262                         self.end_headers()\r
263                 return\r
264 \r
265         def do_GET(self):\r
266                 if re.search(DISCOVER_INSTANCE_URL + '/*', self.path) != None:\r
267                         instance = self.path.split('/')[-1]\r
268                         # check if this instance is registered\r
269                         if instance in gs_instances :\r
270                                 self.send_response(200)\r
271                                 self.send_header('Content-Type', 'application/json')\r
272                                 self.end_headers()\r
273                                 self.wfile.write(bytes("{\"ip\" : \"" + gs_instances[instance][0] + "\", \"port\": " + str(gs_instances[instance][1]) + "}", "utf-8"))\r
274                         else:\r
275                                 self.send_response(400)\r
276                                 self.end_headers()\r
277 \r
278 \r
279                 elif re.search(DISCOVER_INITINSTANCE_URL + '/*', self.path) != None:\r
280                         instance = self.path.split('/')[-1]\r
281                         # check if this instance is initialized\r
282                         if instance in gs_init_instances :\r
283                                 self.send_response(200)\r
284                                 self.send_header('Content-Type', 'application/json')\r
285                                 self.end_headers()\r
286                                 self.wfile.write(bytes("{\"ip\" : \"" + gs_instances[instance][0] + "\", \"port\": " + str(gs_instances[instance][1]) + "}", "utf-8"))\r
287                         else:\r
288                                 self.send_response(400)\r
289                                 self.end_headers()\r
290 \r
291                 elif re.search(DISCOVER_SOURCE_URL + '/*', self.path) != None:\r
292                         source = self.path.split('/')[-1]\r
293                         # check if it is a registered source\r
294                         if source in sources :\r
295                                 self.send_response(200)\r
296                                 self.send_header('Content-Type', 'application/json')\r
297                                 self.end_headers()\r
298                                 self.wfile.write(bytes("{\"ip\" : \"" + sources[source][0] + "\", \"port\": " + str(sources[source][1]) + "}", "utf-8"))\r
299                         else:\r
300                                 self.send_response(400)\r
301                                 self.end_headers()\r
302                                 \r
303                 elif re.search(DISCOVER_SINK_URL + '/*', self.path) != None:\r
304                         sink = self.path.split('/')[-1]\r
305                         # check if it is a registered sink\r
306                         if sink in sinks :\r
307                                 self.send_response(200)\r
308                                 self.send_header('Content-Type', 'application/json')\r
309                                 self.end_headers()\r
310                                 self.wfile.write(bytes("{\"ip\" : \"" + sinks[sink][0] + "\", \"port\": " + str(sinks[sink][1]) + "}", "utf-8"))\r
311                         else:\r
312                                 self.send_response(400)\r
313                                 self.end_headers()\r
314                                 \r
315                 elif re.search(DISCOVER_STARTPROCESSING_URL + '/*', self.path) != None:\r
316                         instance = self.path.split('/')[-1]\r
317                         # check if this instance is initialized\r
318                         if instance in gs_startprocessing_instances :\r
319                                 self.send_response(200)\r
320                                 self.send_header('Content-Type', 'application/json')\r
321                                 self.end_headers()\r
322                                 self.wfile.write(bytes("{}", "utf-8"))\r
323                         else:\r
324                                 self.send_response(400)\r
325                                 self.end_headers()                              \r
326                 else:\r
327                         self.send_response(404)\r
328                         self.end_headers()\r
329                 return\r
330 \r
331 \r
332 # we will use standard python threaded HTTP server\r
333 class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):\r
334         allow_reuse_address = True\r
335 \r
336         def shutdown(self):\r
337                 self.socket.close()\r
338                 HTTPServer.shutdown(self)\r
339 \r
340 class SimpleHttpServer:\r
341         def __init__(self, ip, port):\r
342                 self.server = ThreadedHTTPServer((ip,port), HTTPRequestHandler)\r
343 \r
344         def start(self):\r
345                 self.server_thread = threading.Thread(target=self.server.serve_forever)\r
346                 self.server_thread.daemon = True\r
347                 self.server_thread.start()\r
348                 \r
349         def waitForThread(self):\r
350                 self.server_thread.join()\r
351 \r
352         def stop(self):\r
353                 self.server.shutdown()\r
354                 self.waitForThread()\r
355 \r
356 \r
357 # print usage instructions\r
358 def usage():\r
359         print ('./gshub.py [-p port]')\r
360         \r
361         \r
362 \r
363 def main():\r
364         # process command-line arguments\r
365         try:\r
366                 opts, args = getopt.getopt(sys.argv[1:], "hp:v", ["help", "port="])\r
367         except getopt.GetoptError as err:\r
368                 # print help information and exit:\r
369                 print(str(err)) \r
370                 usage()\r
371                 sys.exit(2)\r
372 \r
373         port = 0\r
374         for o, a in opts:\r
375                 if o in ("-h", "--help"):\r
376                         usage()\r
377                         sys.exit(0)\r
378                 elif o in ("-p", "--port"):\r
379                         port = int(a)\r
380                 else:\r
381                         print ('Unknown command-line option ' + o)\r
382 \r
383         # start HTTP server to serve REST calls\r
384         server = SimpleHttpServer('127.0.0.1', port)\r
385 \r
386         # record HTTP server address in gshub.log\r
387         f = open('gshub.log', 'w')\r
388         f.write('127.0.0.1:' + str(server.server.server_port) + '\n')\r
389         f.close()\r
390                 \r
391         print ('GSHUB Running on port ' + str(server.server.server_port) + ' ...')\r
392         server.start()\r
393         server.waitForThread()\r
394 \r
395 \r
396 if __name__ == "__main__":\r
397         main()\r
398 \r