Add new udafs and RMR support to gsprintconsole_ves
[com/gs-lite.git] / bin / gshub.py
1 #!/usr/bin/python
2 # ------------------------------------------------
3 #   Copyright 2014 AT&T Intellectual Property
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #   Unless required by applicable law or agreed to in writing, software
11 #   distributed under the License is distributed on an "AS IS" BASIS,
12 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #   See the License for the specific language governing permissions and
14 #   limitations under the License.
15 # -------------------------------------------
16
17 # Implementation of GSHUB REST service
18 # for announcement and discovery of gs instances, sources and sinks
19
20 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
21 from SocketServer import ThreadingMixIn
22 import SocketServer
23 import json
24 import cgi
25 import threading
26 import getopt
27 import sys
28 import re
29 import cgi
30 import socket
31 import json
32
# List of URLs for all the REST calls we will serve.
# The request handlers test each constant against the request path with
# re.search, so a constant matches wherever it occurs in the path.

# Discovery calls, served by do_GET; the name being looked up is the last
# '/'-separated component of the request path.
DISCOVER_INSTANCE_URL = '/v1/discover-instance'
DISCOVER_INITINSTANCE_URL = '/v1/discover-initialized-instance'
DISCOVER_SOURCE_URL =  '/v1/discover-source'
DISCOVER_SINK_URL =  '/v1/discover-sink'
DISCOVER_STARTPROCESSING_URL = '/v1/discover-start-processing'

# Announcement calls, served by do_POST; each expects an application/json body.
ANNOUNCE_INSTANCE_URL =  '/v1/announce-instance'
ANNOUNCE_INITINSTANCE_URL =  '/v1/announce-initialized-instance'
ANNOUNCE_SOURCE_URL =  '/v1/announce-source'
ANNOUNCE_SINK_URL =  '/v1/announce-sink'
ANNOUNCE_STARTPROCESSING_URL = '/v1/announce-start-processing'

# The following three calls are acknowledged by do_POST but not processed.
ANNOUNCE_STREAM_SUBSCRIPTION = '/v1/announce-stream-subscription'
ANNOUNCE_FTA_INSTANCE = '/v1/announce-fta-instance'
ANNOUNCE_METRICS = '/v1/log-metrics'
47
# gs instance endpoints: instance name -> [ip, port],
# filled in by the announce-instance call
gs_instances = {}

# initialized gs instances: instance name -> 1,
# filled in by the announce-initialized-instance call
gs_init_instances = {}

# instances for which processing started: instance name -> 1,
# filled in by the announce-start-processing call
gs_startprocessing_instances = {}

# source endpoints: source name -> [ip, port],
# filled in by the announce-source call
sources = {}

# sink endpoints: sink name -> [ip, port],
# filled in by the announce-sink call
sinks = {}
62
63
def extract_endpoint(data):
    """Extract endpoint information from a JSON announcement.

    The message must be a JSON object with a non-empty string ``name``,
    an ``ip`` that is a valid dotted IPv4 address and a ``port`` that
    converts to an integer in the range 1..65535.

    Args:
        data: Raw JSON text of the announcement (anything ``str()`` can
            turn into JSON text).

    Returns:
        ``[name, ip, port]`` on success, or ``[]`` when the message is not
        valid JSON, is not a JSON object, or any of the three fields is
        missing or invalid.  A diagnostic is printed for every rejection;
        no exception is raised for malformed input.
    """
    # json.loads yields unicode strings under Python 2 and str under Python 3.
    text_types = (str, type(u''))

    try:
        doc = json.loads(str(data))
    except ValueError:
        print('Invalid json message ' + str(data))
        return []

    # Valid JSON that is not an object (a list, a number, ...) has no fields.
    if not isinstance(doc, dict):
        print('Invalid json message ' + str(data))
        return []

    name = doc.get('name', '')
    if not isinstance(name, text_types):
        # The name is used as a dictionary key by the callers, so a list or
        # object here would be unhashable; reject anything but a string.
        print('Invalid endpoint name %s' % (name,))
        name = ''

    ip = ''
    if 'ip' in doc:
        ip = doc['ip']
        # validate ip address; inet_pton raises TypeError for non-strings
        # and ValueError for strings it cannot encode
        try:
            socket.inet_pton(socket.AF_INET, ip)
        except (socket.error, TypeError, ValueError):
            print('Invalid IPV4 address %s' % (ip,))
            ip = ''

    port = 0
    if 'port' in doc:
        # validate port number; OverflowError covers a JSON "Infinity"
        try:
            port = int(doc['port'])
        except (TypeError, ValueError, OverflowError):
            print('Invalid port number %s' % (doc['port'],))
            port = 0
        else:
            if not 0 < port <= 65535:
                print('Invalid port number %s' % (doc['port'],))
                port = 0

    if name == '' or ip == '' or port == 0:
        print('Name, ip or port is missing from json message ' + str(doc))
        return []

    return [name, ip, port]
101
102
def extract_instance_name(data):
    """Extract the name of an already announced gs instance from JSON data.

    Args:
        data: Raw JSON text of the message (anything ``str()`` can turn
            into JSON text).

    Returns:
        The value of the ``name`` field when it is a non-empty string that
        is a key of ``gs_instances``; otherwise ``''``.  A diagnostic is
        printed for every rejection; no exception is raised for malformed
        input.
    """
    # json.loads yields unicode strings under Python 2 and str under Python 3.
    text_types = (str, type(u''))

    try:
        doc = json.loads(str(data))
    except ValueError:
        print('Invalid json message ' + str(data))
        return ''

    # Valid JSON that is not an object (a list, a number, ...) has no fields.
    if not isinstance(doc, dict):
        print('Invalid json message ' + str(data))
        return ''

    name = doc.get('name', '')
    # A non-string name can never match a registered instance and could not
    # be concatenated into the diagnostic below, so treat it as missing.
    if not isinstance(name, text_types) or name == '':
        print('Name field is missing in json message ' + str(doc))
        return ''

    if name not in gs_instances:
        print('Attempt to announce the initialization or start of processing for unknown instance ' + name)
        return ''

    return name
124         
# Handler for HTTP requests. We override do_POST and do_GET of BaseHTTPRequestHandler.
class Server(BaseHTTPRequestHandler):
    """REST handler for the GSHUB announce (POST) and discover (GET) calls.

    A route matches when its URL constant occurs anywhere in the request
    path; routes are tried in a fixed order and the first match wins.
    Announcements are recorded in the module-level registries
    (``gs_instances``, ``gs_init_instances``, ``gs_startprocessing_instances``,
    ``sources``, ``sinks``) and discovery calls read them back.

    Status codes: 200 on success, 400 for a malformed announcement or an
    unknown name in a discovery call, 404 for a path that matches no route.

    The header access uses the Python 2 ``getheader`` API, matching the
    ``BaseHTTPServer`` module this file imports.
    """

    def _reply(self, status):
        """Send a response that consists of the status line only."""
        self.send_response(status)
        self.end_headers()

    def _read_json_body(self):
        """Return the raw request body, or None when the request is unusable.

        The request is unusable when its content type is not exactly
        'application/json' or when Content-Length is missing, not a number,
        or not positive.
        """
        if self.headers.getheader('content-type') != 'application/json':
            return None
        try:
            content_len = int(self.headers.getheader('content-length', 0))
        except ValueError:
            # a malformed header is a client error, not a handler crash
            return None
        # A negative length would make rfile.read() wait for end of stream.
        if content_len <= 0:
            return None
        return self.rfile.read(content_len)

    def _announce_endpoint(self, registry):
        """Store the announced [ip, port] in registry under the announced name."""
        body = self._read_json_body()
        endpoint = extract_endpoint(body) if body is not None else []
        if not endpoint:
            self._reply(400)
            return
        name, ip, port = endpoint
        # Record the endpoint before the client is told the call succeeded.
        registry[name] = [ip, port]
        self._reply(200)

    def _announce_state(self, registry):
        """Flag the announced (already registered) instance name in registry."""
        body = self._read_json_body()
        name = extract_instance_name(body) if body is not None else ''
        if name == '':
            self._reply(400)
            return
        registry[name] = 1
        self._reply(200)

    def _send_json(self, text):
        """Send text as the body of a 200 application/json response."""
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(text.encode('utf-8'))

    def _send_endpoint(self, endpoint):
        """Send an [ip, port] pair as JSON, or 400 when endpoint is None."""
        if endpoint is None:
            self._reply(400)
            return
        self._send_json("{\"ip\" : \"" + endpoint[0] + "\", \"port\": " + str(endpoint[1]) + "}")

    def do_POST(self):
        """Serve the announce-* calls."""
        path = self.path
        if ANNOUNCE_INSTANCE_URL in path:
            print(self.path)
            print(self.headers)
            self._announce_endpoint(gs_instances)
        elif ANNOUNCE_INITINSTANCE_URL in path:
            self._announce_state(gs_init_instances)
        elif ANNOUNCE_SOURCE_URL in path:
            self._announce_endpoint(sources)
        elif ANNOUNCE_SINK_URL in path:
            self._announce_endpoint(sinks)
        elif ANNOUNCE_STARTPROCESSING_URL in path:
            self._announce_state(gs_startprocessing_instances)
        elif (ANNOUNCE_STREAM_SUBSCRIPTION in path
              or ANNOUNCE_FTA_INSTANCE in path
              or ANNOUNCE_METRICS in path):
            # we do not do any processing for these calls in gshub simulator;
            # the body is read and discarded, only the headers are checked
            if self._read_json_body() is None:
                self._reply(400)
            else:
                self._reply(200)
        else:
            self._reply(404)
        return

    def do_GET(self):
        """Serve the discover-* calls; the looked-up name is the last path component."""
        path = self.path
        key = path.split('/')[-1]
        if DISCOVER_INSTANCE_URL in path:
            self._send_endpoint(gs_instances.get(key))
        elif DISCOVER_INITINSTANCE_URL in path:
            # Only initialized instances are reported, but the address
            # itself is stored in gs_instances.
            if key in gs_init_instances:
                self._send_endpoint(gs_instances.get(key))
            else:
                self._reply(400)
        elif DISCOVER_SOURCE_URL in path:
            self._send_endpoint(sources.get(key))
        elif DISCOVER_SINK_URL in path:
            self._send_endpoint(sinks.get(key))
        elif DISCOVER_STARTPROCESSING_URL in path:
            if key in gs_startprocessing_instances:
                self._send_json("{}")
            else:
                self._reply(400)
        else:
            self._reply(404)
        return
334
335
def usage():
    """Print the command-line synopsis of the hub to standard output."""
    synopsis = './gshub.py [-p port]'
    print(synopsis)
339         
340
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTPServer combined with ThreadingMixIn.

    The mixin is listed first so that its request dispatch takes
    precedence over the one inherited from HTTPServer.
    """
343
def main():
    """Parse the command line and run the GSHUB REST server until stopped.

    Options:
        -h, --help       print usage and exit with status 0
        -p, --port PORT  TCP port to listen on; the default 0 passes port 0
                         to the server, and the port actually bound is the
                         one reported

    The server listens on 127.0.0.1.  The bound address is written to
    'gshub.log' in the current directory as 'ip:port' so that other
    processes can find the hub.  Exits with status 2 on a command-line
    error.
    """
    # process command-line arguments
    try:
        opts, args = getopt.getopt(sys.argv[1:], "hp:v", ["help", "port="])
    except getopt.GetoptError as err:
        # print help information and exit:
        print(str(err))
        usage()
        sys.exit(2)

    port = 0
    for o, a in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit(0)
        elif o in ("-p", "--port"):
            # Reject a bad port here rather than failing with a traceback
            # from int() or from the socket bind.
            try:
                port = int(a)
            except ValueError:
                port = -1
            if not 0 <= port <= 65535:
                print('Invalid port number ' + a)
                usage()
                sys.exit(2)
        else:
            print('Unknown command-line option ' + o)

    # start HTTP server to serve REST calls
    server_address = ('127.0.0.1', port)
    httpd = ThreadedHTTPServer(server_address, Server)

    try:
        # record HTTP server address in gshub.log
        with open('gshub.log', 'w') as f:
            f.write(str(httpd.server_address[0]) + ':' + str(httpd.server_address[1]) + '\n')

        print('GSHUB Running on port ' + str(httpd.server_address[1]) + ' ...')

        httpd.serve_forever()
    finally:
        # release the listening socket however the serve loop ends
        httpd.server_close()
376
377         
# Start the hub only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
        main()