Add new udafs and RMR support to gsprintconsole_ves
[com/gs-lite.git] / bin / gshub3.py
1 #!/usr/bin/python3.6
2
3 # ------------------------------------------------
4 #   Copyright 2014 AT&T Intellectual Property
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #     http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # -------------------------------------------
17
18 # Implementation of GSHUB REST service
19 # for announcement and discovery of gs instances, sources and sinks
20
21 from http.server import BaseHTTPRequestHandler, HTTPServer
22 from socketserver import ThreadingMixIn
23 import threading
24 import getopt
25 import sys
26 import re
27 import cgi
28 import socket
29 import json
30
# List of URLs for all the REST calls we will serve.
# The request handler matches each of these against the request path
# with re.search(), so they are treated as regular expressions.
DISCOVER_INSTANCE_URL = '/v1/discover-instance'
DISCOVER_INITINSTANCE_URL = '/v1/discover-initialized-instance'
DISCOVER_SOURCE_URL =  '/v1/discover-source'
DISCOVER_SINK_URL =  '/v1/discover-sink'
DISCOVER_STARTPROCESSING_URL = '/v1/discover-start-processing'
ANNOUNCE_INSTANCE_URL =  '/v1/announce-instance'
ANNOUNCE_INITINSTANCE_URL =  '/v1/announce-initialized-instance'
ANNOUNCE_SOURCE_URL =  '/v1/announce-source'
ANNOUNCE_SINK_URL =  '/v1/announce-sink'
ANNOUNCE_STARTPROCESSING_URL = '/v1/announce-start-processing'
ANNOUNCE_STREAM_SUBSCRIPTION = '/v1/announce-stream-subscription'
ANNOUNCE_FTA_INSTANCE = '/v1/announce-fta-instance'
ANNOUNCE_METRICS = '/v1/log-metrics'

# gs instance endpoints: instance name -> [ip, port]
gs_instances = {}

# initialized gs instances: instance name -> 1 (used as a set)
gs_init_instances = {}

# instances for which processing started: instance name -> 1 (used as a set)
gs_startprocessing_instances = {}

# source endpoints: source name -> [ip, port]
sources = {}

# sink endpoints: sink name -> [ip, port]
sinks = {}
60
61
# extract endpoint information from json data
def extract_endpoint(data):
    """Parse an endpoint announcement and return ``[name, ip, port]``.

    ``data`` is the raw request body (bytes) expected to hold a UTF-8
    encoded JSON object with the keys ``name`` (non-empty string), ``ip``
    (dotted IPv4 address) and ``port`` (anything ``int()`` accepts, non-zero).

    Returns ``[name, ip, port]`` with ``port`` converted to ``int``.
    Returns an empty list, after printing a diagnostic, when the payload
    cannot be decoded, is not valid JSON, is not a JSON object, or when any
    of the three fields is missing or invalid.  Malformed input never
    raises.
    """
    # Decode separately from parsing so that the diagnostic for undecodable
    # bytes does not itself try to decode them again.
    try:
        text = str(data, 'utf-8')
    except (TypeError, ValueError):
        print('Invalid json message ' + repr(data))
        return []

    try:
        doc = json.loads(text)
    except ValueError:
        print('Invalid json message ' + text)
        return []

    # Valid JSON that is not an object (list, number, string, ...) carries
    # none of the required fields.
    if not isinstance(doc, dict):
        print('Name, ip or port is missing from json message ' + str(doc))
        return []

    # The name is used as a dictionary key and looked up from URL path
    # components, so only strings are meaningful.
    name = doc.get('name', '')
    if not isinstance(name, str):
        name = ''

    ip = ''
    if 'ip' in doc:
        ip = doc['ip']
        # validate ip address
        try:
            socket.inet_pton(socket.AF_INET, ip)
        except (OSError, TypeError, ValueError):
            print('Invalid IPV4 address ' + str(ip))
            ip = ''

    port = 0
    if 'port' in doc:
        # validate port number
        try:
            port = int(doc['port'])
        except (TypeError, ValueError, OverflowError):
            print('Invalid port number ' + str(doc['port']))
            port = 0

    if name == '' or ip == '' or port == 0:
        print('Name, ip or port is missing from json message ' + str(doc))
        return []

    return [name, ip, port]
99
100
101 # extract instance name from json data
def extract_instance_name(data):
    """Return the instance name announced in ``data``, or ``''`` if unusable.

    ``data`` is the raw request body (bytes) expected to hold a UTF-8
    encoded JSON object with a ``name`` key.  The name is only accepted when
    it is a non-empty string that is already registered in ``gs_instances``.

    In every other case (undecodable bytes, invalid JSON, non-object JSON,
    missing/empty/non-string name, unknown instance) a diagnostic is printed
    and the empty string is returned.  Malformed input never raises.
    """
    # Decode separately from parsing so that the diagnostic for undecodable
    # bytes does not itself try to decode them again.
    try:
        text = str(data, 'utf-8')
    except (TypeError, ValueError):
        print('Invalid json message ' + repr(data))
        return ''

    try:
        doc = json.loads(text)
    except ValueError:
        print('Invalid json message ' + text)
        return ''

    name = doc.get('name', '') if isinstance(doc, dict) else ''

    # Non-string names can neither be concatenated into the diagnostics nor
    # (if unhashable) be tested for membership, so reject them up front.
    if not isinstance(name, str) or name == '':
        print('Name field is missing in json message ' + str(doc))
        return ''

    if name not in gs_instances:
        print('Attempt to announce the initialization or start of processing for unknown instance ' + name)
        return ''

    return name
122         
# handler for HTTP requests. We will override do_POST and do_GET of BaseHTTPRequestHandler
class HTTPRequestHandler(BaseHTTPRequestHandler):
    """Request handler for the GSHUB announce (POST) and discover (GET) calls.

    POST requests must carry a non-empty ``application/json`` body; valid
    announcements update the module-level registries (``gs_instances``,
    ``gs_init_instances``, ``gs_startprocessing_instances``, ``sources``,
    ``sinks``).  GET requests look up the last path component in those
    registries.

    Status codes: 200 on success, 400 for an unusable request or an unknown
    name, 404 when the path matches none of the known URLs.
    """

    # ---- shared helpers -------------------------------------------------

    def _send_status(self, code):
        """Send a body-less response with the given status code."""
        self.send_response(code)
        self.end_headers()

    def _send_json(self, text):
        """Send a 200 response whose body is the JSON document ``text``."""
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(bytes(text, 'utf-8'))

    def _send_endpoint(self, endpoint):
        """Reply with ``endpoint`` ([ip, port]) as JSON, or 400 if it is None."""
        if endpoint is None:
            self._send_status(400)
            return
        self._send_json('{"ip" : "' + endpoint[0] + '", "port": ' + str(endpoint[1]) + '}')

    def _read_json_body(self):
        """Return the request body as bytes, or None when it is unusable.

        The body is unusable when the content type is not
        ``application/json`` or when Content-Length is missing, not an
        integer, or not positive.
        """
        if self.headers.get_content_type() != 'application/json':
            return None
        # Header lookup through the message object is case-insensitive, as
        # HTTP header names are.
        try:
            content_len = int(self.headers.get('Content-Length', 0))
        except ValueError:
            return None
        # A negative length would make rfile.read() read until end of stream.
        if content_len <= 0:
            return None
        return self.rfile.read(content_len)

    def _announce_endpoint(self, registry):
        """Record an announced [ip, port] endpoint in ``registry``.

        Returns the HTTP status code to send: 200 when the endpoint was
        stored, 400 otherwise.
        """
        body = self._read_json_body()
        if body is None:
            return 400
        endpoint = extract_endpoint(body)
        if endpoint == []:
            return 400
        registry[endpoint[0]] = [endpoint[1], endpoint[2]]
        return 200

    def _announce_instance_state(self, registry):
        """Mark an already announced instance in ``registry``.

        Returns the HTTP status code to send: 200 when the instance was
        marked, 400 otherwise.
        """
        body = self._read_json_body()
        if body is None:
            return 400
        name = extract_instance_name(body)
        if name == '':
            return 400
        registry[name] = 1
        return 200

    # ---- HTTP verbs -----------------------------------------------------

    def do_POST(self):
        """Serve the announce-* and log-metrics REST calls."""
        path = self.path
        if re.search(ANNOUNCE_INSTANCE_URL, path) is not None:
            status = self._announce_endpoint(gs_instances)
        elif re.search(ANNOUNCE_INITINSTANCE_URL, path) is not None:
            status = self._announce_instance_state(gs_init_instances)
        elif re.search(ANNOUNCE_SOURCE_URL, path) is not None:
            status = self._announce_endpoint(sources)
        elif re.search(ANNOUNCE_SINK_URL, path) is not None:
            status = self._announce_endpoint(sinks)
        elif re.search(ANNOUNCE_STARTPROCESSING_URL, path) is not None:
            status = self._announce_instance_state(gs_startprocessing_instances)
        elif (re.search(ANNOUNCE_STREAM_SUBSCRIPTION, path) is not None
              or re.search(ANNOUNCE_FTA_INSTANCE, path) is not None
              or re.search(ANNOUNCE_METRICS, path) is not None):
            # we do not do any processing for ANNOUNCE_STREAM_SUBSCRIPTION,
            # ANNOUNCE_FTA_INSTANCE and ANNOUNCE_METRICS in gshub simulator;
            # the payload is read and discarded, only its presence is checked
            status = 200 if self._read_json_body() is not None else 400
        else:
            status = 404

        self._send_status(status)

    def do_GET(self):
        """Serve the discover-* REST calls.

        The name being looked up is the last ``/``-separated component of
        the request path.
        """
        path = self.path
        key = path.split('/')[-1]

        # In the patterns below '/*' is a regular expression for "zero or
        # more slashes", so each pattern matches wherever its URL occurs in
        # the path.
        if re.search(DISCOVER_INSTANCE_URL + '/*', path) is not None:
            # check if this instance is registered
            self._send_endpoint(gs_instances.get(key))

        elif re.search(DISCOVER_INITINSTANCE_URL + '/*', path) is not None:
            # check if this instance is initialized; the endpoint itself is
            # kept in gs_instances
            endpoint = gs_instances.get(key) if key in gs_init_instances else None
            self._send_endpoint(endpoint)

        elif re.search(DISCOVER_SOURCE_URL + '/*', path) is not None:
            # check if it is a registered source
            self._send_endpoint(sources.get(key))

        elif re.search(DISCOVER_SINK_URL + '/*', path) is not None:
            # check if it is a registered sink
            self._send_endpoint(sinks.get(key))

        elif re.search(DISCOVER_STARTPROCESSING_URL + '/*', path) is not None:
            # check if processing was started for this instance
            if key in gs_startprocessing_instances:
                self._send_json('{}')
            else:
                self._send_status(400)

        else:
            self._send_status(404)
330
331
332 # we will use standard python threaded HTTP server
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTPServer that dispatches each request to its own thread."""

    allow_reuse_address = True

    def shutdown(self):
        """Stop the serve_forever() loop, then close the listening socket.

        The loop is stopped first so the socket is not closed while the
        serving thread is still polling it.  Like the base class method,
        this must be called from a thread other than the one running
        serve_forever().
        """
        HTTPServer.shutdown(self)
        self.socket.close()
339
class SimpleHttpServer:
    """Runs a ThreadedHTTPServer with HTTPRequestHandler on a background thread."""

    def __init__(self, ip, port):
        """Create the server bound to ``(ip, port)``; serving starts in start()."""
        address = (ip, port)
        self.server = ThreadedHTTPServer(address, HTTPRequestHandler)

    def start(self):
        """Launch serve_forever() on a daemon thread."""
        worker = threading.Thread(target=self.server.serve_forever, daemon=True)
        self.server_thread = worker
        worker.start()

    def waitForThread(self):
        """Block until the serving thread terminates."""
        self.server_thread.join()

    def stop(self):
        """Shut the server down and wait for the serving thread to finish."""
        self.server.shutdown()
        self.waitForThread()
355
356
357 # print usage instructions
def usage():
    """Print the command-line synopsis to standard output."""
    synopsis = './gshub.py [-p port]'
    print(synopsis)
360         
361         
362
def main():
    """Parse the command line, start the GSHUB HTTP server and serve forever.

    Options:
      -h, --help       print usage and exit with status 0
      -p, --port PORT  TCP port to listen on; 0 (the default) asks the
                       operating system to pick a free port
      -v               accepted for compatibility; has no effect

    The server listens on 127.0.0.1.  Its actual address is written to
    ``gshub.log`` in the current directory as ``127.0.0.1:<port>``.
    Invalid options or an invalid port terminate the process with status 2.
    """
    # process command-line arguments
    try:
        opts, _ = getopt.getopt(sys.argv[1:], "hp:v", ["help", "port="])
    except getopt.GetoptError as err:
        # print help information and exit:
        print(str(err))
        usage()
        sys.exit(2)

    port = 0
    for o, a in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit(0)
        elif o in ("-p", "--port"):
            # Reject non-numeric and out-of-range ports with a clear message
            # instead of an uncaught exception.
            try:
                port = int(a)
            except ValueError:
                port = -1
            if not 0 <= port <= 65535:
                print('Invalid port number ' + a)
                usage()
                sys.exit(2)
        elif o == "-v":
            # declared in the option string but not acted upon
            pass
        else:
            print('Unknown command-line option ' + o)

    # start HTTP server to serve REST calls
    server = SimpleHttpServer('127.0.0.1', port)
    bound_port = str(server.server.server_port)

    # record HTTP server address in gshub.log
    with open('gshub.log', 'w') as f:
        f.write('127.0.0.1:' + bound_port + '\n')

    print('GSHUB Running on port ' + bound_port + ' ...')
    server.start()
    server.waitForThread()
394
395
396 if __name__ == "__main__":
397         main()
398