Improvements to aggregation code and fucntion library
[com/gs-lite.git] / bin / gshub.py
1 #!/usr/bin/python
2 # ------------------------------------------------
3 #   Copyright 2014 AT&T Intellectual Property
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #   Unless required by applicable law or agreed to in writing, software
11 #   distributed under the License is distributed on an "AS IS" BASIS,
12 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #   See the License for the specific language governing permissions and
14 #   limitations under the License.
15 # -------------------------------------------
16
17 # Implementation of GSHUB REST service
18 # for announcement and discovery of gs instances, sources and sinks
19
20 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
21 import SocketServer
22 import json
23 import cgi
24 import threading
25 import getopt
26 import sys
27 import re
28 import cgi
29 import socket
30 import json
31
32 # lis of URLS for all the REST calls we will serve
33 DISCOVER_INSTANCE_URL = '/v1/discover-instance'
34 DISCOVER_INITINSTANCE_URL = '/v1/discover-initialized-instance'
35 DISCOVER_SOURCE_URL =  '/v1/discover-source'
36 DISCOVER_SINK_URL =  '/v1/discover-sink'
37 DISCOVER_STARTPROCESSING_URL = '/v1/discover-start-processing'
38 ANNOUNCE_INSTANCE_URL =  '/v1/announce-instance'
39 ANNOUNCE_INITINSTANCE_URL =  '/v1/announce-initialized-instance'
40 ANNOUNCE_SOURCE_URL =  '/v1/announce-source'
41 ANNOUNCE_SINK_URL =  '/v1/announce-sink'
42 ANNOUNCE_STARTPROCESSING_URL = '/v1/announce-start-processing'
43 ANNOUNCE_STREAM_SUBSCRIPTION = '/v1/announce-stream-subscription'
44 ANNOUNCE_FTA_INSTANCE = '/v1/announce-fta-instance'
45 ANNOUNCE_METRICS = '/v1/log-metrics'
46
47 # gs instance endpoints
48 gs_instances = {}
49
50 # initialized gs instances
51 gs_init_instances = {}
52
53 # instances for which processing started
54 gs_startprocessing_instances = {}
55
56 # source endpoints
57 sources = {}
58
59 # sink endpoints
60 sinks = {}
61
62
63 # exctract endpoint information from json data
64 def extract_endpoint(data) :
65         name = ''
66         ip = ''
67         port = 0
68                 
69         try :
70                 doc = json.loads(str(data))     
71         except :
72                 print ('Invalid json message ' + str(data))
73                 return []
74         
75         for key in doc.keys() :
76                 if key == 'name' :
77                         name = doc[key]
78                 elif key == 'ip' :
79                         ip = doc[key]
80                         # validate ip address           
81                         try :
82                                 socket.inet_pton(socket.AF_INET, ip)
83                         except :
84                                 print ('Invalid IPV4 address ' + ip)
85                                 ip = ''
86                 elif key == 'port' :
87                         # validate port number
88                         try :
89                                 port = int(doc[key])
90                         except :
91                                 print ('Invalid port number ' + doc[key])
92                                 port = 0
93         
94         if name == '' or ip == '' or port == 0 :
95                 print ('Name, ip or port is missing from json message ' + str(doc))
96                 return []
97         
98
99         return [name, ip, port]
100
101
102 # extract instance name from json data
103 def extract_instance_name(data) :
104         name = ''
105         
106         try :
107                 doc = json.loads(str(data))     
108         except :
109                 print ('Invalid json message ' + str(data))
110                 return ''
111         
112         for key in doc.keys() :
113                 if key == 'name' :
114                         name = doc[key]
115         
116         if name == '' :
117                 print ('Name field is missing in json message ' + str(doc))
118         elif (name in gs_instances) == False:
119                 print ('Attempt to announce the initialization or start of processing for unknown instance ' + name)
120                 name = ''
121                 
122         return name
123         
124 # handler for HTTP requests. We will override do_POST and do_GET of BaseHTTPRequestHandler
125 class Server(BaseHTTPRequestHandler) :
126
127         def do_POST(self):
128                 if re.search(ANNOUNCE_INSTANCE_URL, self.path) != None:         
129                         print self.path
130                         print self.headers
131                         if self.headers.getheader('content-type') == 'application/json' :
132                                 # Find content length
133                                 content_len = 0
134                                 for i in range(len(self.headers.keys())):
135                                         if self.headers.keys()[i] == 'content-length' :
136                                                 content_len = int (self.headers.values()[i])
137                                                 break
138                                 if content_len != 0 :
139                                         # extract endpoint information
140                                         endpoint = extract_endpoint(self.rfile.read(content_len))
141                                         if endpoint == [] :
142                                                 self.send_response(400)
143                                         else :
144                                                 self.send_response(200)
145                                                 gs_instances[endpoint[0]] = [endpoint[1], endpoint[2]]
146                                 else :
147                                         self.send_response(400)
148
149                         else:
150                                 self.send_response(400)
151                         self.end_headers()              
152                         
153                 elif re.search(ANNOUNCE_INITINSTANCE_URL, self.path) != None:
154                         if self.headers.getheader('content-type') == 'application/json' :
155                                 # Find content length
156                                 content_len = 0
157                                 for i in range(len(self.headers.keys())):
158                                         if self.headers.keys()[i] == 'content-length' :
159                                                 content_len = int (self.headers.values()[i])
160                                                 break
161                                 if content_len != 0 :
162                                         # extract name of initialized gs instance                               
163                                         name = extract_instance_name(self.rfile.read(content_len))
164                                         if name == '' :
165                                                 self.send_response(400)
166                                         else :
167                                                 self.send_response(200)
168                                                 gs_init_instances[name] = 1
169                                 else :
170                                         self.send_response(400)
171
172                         else:
173                                 self.send_response(400)
174                         self.end_headers()      
175                         
176                 elif re.search(ANNOUNCE_SOURCE_URL, self.path) != None:
177                         if self.headers.getheader('content-type') == 'application/json' :
178                                 # Find content length
179                                 content_len = 0
180                                 for i in range(len(self.headers.keys())):
181                                         if self.headers.keys()[i] == 'content-length' :
182                                                 content_len = int (self.headers.values()[i])
183                                                 break
184                                 if content_len != 0 :
185                                         # extract endpoint information                          
186                                         endpoint = extract_endpoint(self.rfile.read(content_len))
187                                         if endpoint == [] :
188                                                 self.send_response(400)
189                                         else :
190                                                 self.send_response(200)
191                                                 sources[endpoint[0]] = [endpoint[1], endpoint[2]]
192                                 else :
193                                         self.send_response(400)
194
195                         else:
196                                 self.send_response(400)
197                         self.end_headers()
198                         
199                 elif re.search(ANNOUNCE_SINK_URL, self.path) != None:
200                         if self.headers.getheader('content-type') == 'application/json' :
201                                 # Find content length
202                                 content_len = 0
203                                 for i in range(len(self.headers.keys())):
204                                         if self.headers.keys()[i] == 'content-length' :
205                                                 content_len = int (self.headers.values()[i])
206                                                 break
207                                 if content_len != 0 :
208                                         # extract endpoint information                          
209                                         endpoint = extract_endpoint(self.rfile.read(content_len))
210                                         if endpoint == [] :
211                                                 self.send_response(400)
212                                         else :
213                                                 self.send_response(200)
214                                                 sinks[endpoint[0]] = [endpoint[1], endpoint[2]]
215                                 else :
216                                         self.send_response(400)
217
218                         else:
219                                 self.send_response(400)
220                         self.end_headers()      
221                         
222                 elif re.search(ANNOUNCE_STARTPROCESSING_URL, self.path) != None:
223                         if self.headers.getheader('content-type') == 'application/json' :
224                                 # Find content length
225                                 content_len = 0
226                                 for i in range(len(self.headers.keys())):
227                                         if self.headers.keys()[i] == 'content-length' :
228                                                 content_len = int (self.headers.values()[i])
229                                                 break
230                                 if content_len != 0 :
231                                         # extract name of initialized gs instance                               
232                                         name = extract_instance_name(self.rfile.read(content_len))
233                                         if name == '' :
234                                                 self.send_response(400)
235                                         else :
236                                                 self.send_response(200)
237                                                 gs_startprocessing_instances[name] = 1
238                                 else :
239                                         self.send_response(400)
240
241                         else:
242                                 self.send_response(400)
243                         self.end_headers()
244                         
245                 # we do not do any processing for ANNOUNCE_STREAM_SUBSCRIPTION, ANNOUNCE_FTA_INSTANCE and ANNOUNCE_METRICS in gshub simulator           
246                 elif (re.search(ANNOUNCE_STREAM_SUBSCRIPTION, self.path) != None) or (re.search(ANNOUNCE_FTA_INSTANCE, self.path) != None) or (re.search(ANNOUNCE_METRICS, self.path) != None):
247                         if self.headers.getheader('content-type') == 'application/json' :
248                                 # Find content length
249                                 content_len = 0
250                                 for i in range(len(self.headers.keys())):
251                                         if self.headers.keys()[i] == 'content-length' :
252                                                 content_len = int (self.headers.values()[i])
253                                                 break
254                                 if content_len != 0 :                           
255                                         self.send_response(200)
256                                 else :
257                                         self.send_response(400)
258
259                         else:
260                                 self.send_response(400)
261                         self.end_headers()      
262                         
263                 else:
264                         self.send_response(404)
265                         self.end_headers()
266                 return
267                 
268         def do_GET(self):
269                 if re.search(DISCOVER_INSTANCE_URL + '/*', self.path) != None:
270                         instance = self.path.split('/')[-1]
271                         # check if this instance is registered
272                         if instance in gs_instances :
273                                 self.send_response(200)
274                                 self.send_header('Content-Type', 'application/json')
275                                 self.end_headers()
276                                 self.wfile.write(bytes("{\"ip\" : \"" + gs_instances[instance][0] + "\", \"port\": " + str(gs_instances[instance][1]) + "}"))
277                         else:
278                                 self.send_response(400)
279                                 self.end_headers()
280
281
282                 elif re.search(DISCOVER_INITINSTANCE_URL + '/*', self.path) != None:
283                         instance = self.path.split('/')[-1]
284                         # check if this instance is initialized
285                         if instance in gs_init_instances :
286                                 self.send_response(200)
287                                 self.send_header('Content-Type', 'application/json')
288                                 self.end_headers()
289                                 self.wfile.write(bytes("{\"ip\" : \"" + gs_instances[instance][0] + "\", \"port\": " + str(gs_instances[instance][1]) + "}"))
290                         else:
291                                 self.send_response(400)
292                                 self.end_headers()
293
294                 elif re.search(DISCOVER_SOURCE_URL + '/*', self.path) != None:
295                         source = self.path.split('/')[-1]
296                         # check if it is a registered source
297                         if source in sources :
298                                 self.send_response(200)
299                                 self.send_header('Content-Type', 'application/json')
300                                 self.end_headers()
301                                 self.wfile.write(bytes("{\"ip\" : \"" + sources[source][0] + "\", \"port\": " + str(sources[source][1]) + "}"))
302                         else:
303                                 self.send_response(400)
304                                 self.end_headers()
305                                 
306                 elif re.search(DISCOVER_SINK_URL + '/*', self.path) != None:
307                         sink = self.path.split('/')[-1]
308                         # check if it is a registered sink
309                         if sink in sinks :
310                                 self.send_response(200)
311                                 self.send_header('Content-Type', 'application/json')
312                                 self.end_headers()
313                                 self.wfile.write(bytes("{\"ip\" : \"" + sinks[sink][0] + "\", \"port\": " + str(sinks[sink][1]) + "}"))
314                         else:
315                                 self.send_response(400)
316                                 self.end_headers()
317                                 
318                 elif re.search(DISCOVER_STARTPROCESSING_URL + '/*', self.path) != None:
319                         instance = self.path.split('/')[-1]
320                         # check if this instance is initialized
321                         if instance in gs_startprocessing_instances :
322                                 self.send_response(200)
323                                 self.send_header('Content-Type', 'application/json')
324                                 self.end_headers()
325                                 self.wfile.write(bytes("{}"))
326                         else:
327                                 self.send_response(400)
328                                 self.end_headers()                              
329                 else:
330                         self.send_response(404)
331                         self.end_headers()
332                 return
333
334
335 # print usage instructions
336 def usage():
337         print ('./gshub.py [-p port]')
338         
339         
340
341 def main():
342         # process command-line arguments
343         try:
344                 opts, args = getopt.getopt(sys.argv[1:], "hp:v", ["help", "port="])
345         except getopt.GetoptError as err:
346                 # print help information and exit:
347                 print(str(err)) 
348                 usage()
349                 sys.exit(2)
350
351         port = 0
352         for o, a in opts:
353                 if o in ("-h", "--help"):
354                         usage()
355                         sys.exit(0)
356                 elif o in ("-p", "--port"):
357                         port = int(a)
358                 else:
359                         print ('Unknown command-line option ' + o)
360
361         # start HTTP server to serve REST calls
362         server_address = ('127.0.0.1', port)
363         httpd = HTTPServer(server_address, Server)
364
365         # record HTTP server address in gshub.log
366         f = open('gshub.log', 'w')
367         f.write(str(httpd.server_address[0]) + ':' + str(httpd.server_address[1]) + '\n')
368         f.close()
369                 
370         print ('GSHUB Running on port ' + str(httpd.server_address[1]) + ' ...')
371
372         httpd.serve_forever()
373
374         
375 if __name__ == "__main__":
376         main()