2 # ------------------------------------------------
3 # Copyright 2014 AT&T Intellectual Property
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # -------------------------------------------
17 # Implementation of GSHUB REST service
18 # for announcement and discovery of gs instances, sources and sinks
20 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
21 from SocketServer import ThreadingMixIn
33 # list of URLs for all the REST calls we will serve
# Discovery endpoints: the Server handler matches these against self.path
# (with '/*' appended) and takes the last path component as the lookup name.
34 DISCOVER_INSTANCE_URL = '/v1/discover-instance'
35 DISCOVER_INITINSTANCE_URL = '/v1/discover-initialized-instance'
36 DISCOVER_SOURCE_URL = '/v1/discover-source'
37 DISCOVER_SINK_URL = '/v1/discover-sink'
38 DISCOVER_STARTPROCESSING_URL = '/v1/discover-start-processing'
# Announcement endpoints: the Server handler matches these against self.path
# and expects an 'application/json' request body.
39 ANNOUNCE_INSTANCE_URL = '/v1/announce-instance'
40 ANNOUNCE_INITINSTANCE_URL = '/v1/announce-initialized-instance'
41 ANNOUNCE_SOURCE_URL = '/v1/announce-source'
42 ANNOUNCE_SINK_URL = '/v1/announce-sink'
43 ANNOUNCE_STARTPROCESSING_URL = '/v1/announce-start-processing'
# The next three are accepted by the handler but trigger no processing.
44 ANNOUNCE_STREAM_SUBSCRIPTION = '/v1/announce-stream-subscription'
45 ANNOUNCE_FTA_INSTANCE = '/v1/announce-fta-instance'
46 ANNOUNCE_METRICS = '/v1/log-metrics'
48 # gs instance endpoints
# NOTE(review): the assignment this comment describes is not visible in this
# excerpt. The handler later stores gs_instances[name] = [ip, port], and also
# uses `sources` and `sinks` dicts whose definitions are likewise not visible.
51 # initialized gs instances
# Keyed by instance name; the handler stores 1 and only tests membership.
52 gs_init_instances = {}
54 # instances for which processing started
# Keyed by instance name; the handler stores 1 and only tests membership.
55 gs_startprocessing_instances = {}
64 # extract endpoint information from JSON data
# Parse a JSON announcement body and pull out an endpoint description.
# On the visible success path the result is the list [name, ip, port].
# NOTE(review): several lines of this function are not visible in this excerpt
# (the initial values of name/ip/port, the try/except lines around each parse,
# and the failure returns) -- confirm against the full file before editing.
65 def extract_endpoint(data) :
# The payload is coerced to str before being decoded as JSON.
71 doc = json.loads(str(data))
# Diagnostic for a payload that is not valid JSON (presumably printed from an
# except branch that is not visible here).
73 print ('Invalid json message ' + str(data))
76 for key in doc.keys() :
# inet_pton with AF_INET rejects anything that is not a valid IPv4 address,
# so only IPv4 endpoints can be announced.
83 socket.inet_pton(socket.AF_INET, ip)
85 print ('Invalid IPV4 address ' + ip)
88 # validate port number
# NOTE(review): doc[key] is concatenated to a string here; if the JSON value
# is not itself a string, this concatenation raises TypeError instead of
# printing the diagnostic.
92 print ('Invalid port number ' + doc[key])
# An empty name, an empty ip or a port of 0 is treated as "missing".
95 if name == '' or ip == '' or port == 0 :
96 print ('Name, ip or port is missing from json message ' + str(doc))
100 return [name, ip, port]
103 # extract instance name from json data
# Parse a JSON announcement body and pull out the gs instance name it refers to.
# NOTE(review): the lines that assign `name`, the try/except around the JSON
# parse and every return statement are not visible in this excerpt, so the
# exact return values cannot be stated from here.
104 def extract_instance_name(data) :
# The payload is coerced to str before being decoded as JSON.
108 doc = json.loads(str(data))
# Diagnostic for a payload that is not valid JSON (presumably printed from an
# except branch that is not visible here).
110 print ('Invalid json message ' + str(data))
113 for key in doc.keys() :
118 print ('Name field is missing in json message ' + str(doc))
# Equivalent to `name not in gs_instances`: the name must already have been
# announced as an instance before it can be marked initialized or started.
119 elif (name in gs_instances) == False:
120 print ('Attempt to announce the initialization or start of processing for unknown instance ' + name)
125 # handler for HTTP requests. We will override do_POST and do_GET of BaseHTTPRequestHandler
# Request handler for the GSHUB REST service: announce-* paths record state in
# the gs_instances / gs_init_instances / sources / sinks /
# gs_startprocessing_instances dicts, and discover-* paths read it back as JSON.
# NOTE(review): this excerpt has lost its indentation and a number of lines
# (the method `def` headers and several if/else lines are not visible), so the
# comments below describe only what the visible lines do.
126 class Server(BaseHTTPRequestHandler) :
# ---- announce-* handling (presumably the body of do_POST -- TODO confirm) ----
# Branch: register a gs instance endpoint.
129 if re.search(ANNOUNCE_INSTANCE_URL, self.path) != None:
# Only request bodies declared as application/json are considered.
132 if self.headers.getheader('content-type') == 'application/json' :
133 # Find content length
# Walk the header keys by index and parse the matching 'content-length'
# value as an int.
135 for i in range(len(self.headers.keys())):
136 if self.headers.keys()[i] == 'content-length' :
137 content_len = int (self.headers.values()[i])
139 if content_len != 0 :
140 # extract endpoint information
# Read exactly content_len bytes of body and parse them.
141 endpoint = extract_endpoint(self.rfile.read(content_len))
# NOTE(review): the condition choosing between 400 and 200 is not visible;
# presumably 400 is sent when extract_endpoint reports a failure.
143 self.send_response(400)
145 self.send_response(200)
# Store the endpoint as name -> [ip, port].
146 gs_instances[endpoint[0]] = [endpoint[1], endpoint[2]]
148 self.send_response(400)
151 self.send_response(400)
# Branch: mark an already announced instance as initialized.
154 elif re.search(ANNOUNCE_INITINSTANCE_URL, self.path) != None:
155 if self.headers.getheader('content-type') == 'application/json' :
156 # Find content length
158 for i in range(len(self.headers.keys())):
159 if self.headers.keys()[i] == 'content-length' :
160 content_len = int (self.headers.values()[i])
162 if content_len != 0 :
163 # extract name of initialized gs instance
164 name = extract_instance_name(self.rfile.read(content_len))
166 self.send_response(400)
168 self.send_response(200)
# The stored value 1 is only a presence marker; lookups test membership.
169 gs_init_instances[name] = 1
171 self.send_response(400)
174 self.send_response(400)
# Branch: register a source endpoint.
177 elif re.search(ANNOUNCE_SOURCE_URL, self.path) != None:
178 if self.headers.getheader('content-type') == 'application/json' :
179 # Find content length
181 for i in range(len(self.headers.keys())):
182 if self.headers.keys()[i] == 'content-length' :
183 content_len = int (self.headers.values()[i])
185 if content_len != 0 :
186 # extract endpoint information
187 endpoint = extract_endpoint(self.rfile.read(content_len))
189 self.send_response(400)
191 self.send_response(200)
# Store the source as name -> [ip, port].
192 sources[endpoint[0]] = [endpoint[1], endpoint[2]]
194 self.send_response(400)
197 self.send_response(400)
# Branch: register a sink endpoint.
200 elif re.search(ANNOUNCE_SINK_URL, self.path) != None:
201 if self.headers.getheader('content-type') == 'application/json' :
202 # Find content length
204 for i in range(len(self.headers.keys())):
205 if self.headers.keys()[i] == 'content-length' :
206 content_len = int (self.headers.values()[i])
208 if content_len != 0 :
209 # extract endpoint information
210 endpoint = extract_endpoint(self.rfile.read(content_len))
212 self.send_response(400)
214 self.send_response(200)
# Store the sink as name -> [ip, port].
215 sinks[endpoint[0]] = [endpoint[1], endpoint[2]]
217 self.send_response(400)
220 self.send_response(400)
# Branch: mark an already announced instance as having started processing.
223 elif re.search(ANNOUNCE_STARTPROCESSING_URL, self.path) != None:
224 if self.headers.getheader('content-type') == 'application/json' :
225 # Find content length
227 for i in range(len(self.headers.keys())):
228 if self.headers.keys()[i] == 'content-length' :
229 content_len = int (self.headers.values()[i])
231 if content_len != 0 :
232 # extract name of initialized gs instance
233 name = extract_instance_name(self.rfile.read(content_len))
235 self.send_response(400)
237 self.send_response(200)
# The stored value 1 is only a presence marker; lookups test membership.
238 gs_startprocessing_instances[name] = 1
240 self.send_response(400)
243 self.send_response(400)
246 # we do not do any processing for ANNOUNCE_STREAM_SUBSCRIPTION, ANNOUNCE_FTA_INSTANCE and ANNOUNCE_METRICS in gshub simulator
247 elif (re.search(ANNOUNCE_STREAM_SUBSCRIPTION, self.path) != None) or (re.search(ANNOUNCE_FTA_INSTANCE, self.path) != None) or (re.search(ANNOUNCE_METRICS, self.path) != None):
248 if self.headers.getheader('content-type') == 'application/json' :
249 # Find content length
251 for i in range(len(self.headers.keys())):
252 if self.headers.keys()[i] == 'content-length' :
253 content_len = int (self.headers.values()[i])
# The body is never read in this branch; a non-zero length is enough for 200.
255 if content_len != 0 :
256 self.send_response(200)
258 self.send_response(400)
261 self.send_response(400)
# Presumably the fall-through for a path that matches no announce URL.
265 self.send_response(404)
# ---- discover-* handling (presumably the body of do_GET -- TODO confirm) ----
# NOTE(review): in the patterns below '/*' is a regex meaning "zero or more
# slashes", so it does not by itself require a trailing name component.
270 if re.search(DISCOVER_INSTANCE_URL + '/*', self.path) != None:
# The last path component is taken as the instance name.
271 instance = self.path.split('/')[-1]
272 # check if this instance is registered
273 if instance in gs_instances :
274 self.send_response(200)
275 self.send_header('Content-Type', 'application/json')
# The JSON body {"ip": ..., "port": ...} is assembled by string concatenation.
277 self.wfile.write(bytes("{\"ip\" : \"" + gs_instances[instance][0] + "\", \"port\": " + str(gs_instances[instance][1]) + "}"))
# Presumably sent when the name is not registered (else line not visible).
279 self.send_response(400)
283 elif re.search(DISCOVER_INITINSTANCE_URL + '/*', self.path) != None:
284 instance = self.path.split('/')[-1]
285 # check if this instance is initialized
286 if instance in gs_init_instances :
287 self.send_response(200)
288 self.send_header('Content-Type', 'application/json')
# Membership is tested against gs_init_instances, but the endpoint itself is
# read from gs_instances.
290 self.wfile.write(bytes("{\"ip\" : \"" + gs_instances[instance][0] + "\", \"port\": " + str(gs_instances[instance][1]) + "}"))
292 self.send_response(400)
295 elif re.search(DISCOVER_SOURCE_URL + '/*', self.path) != None:
296 source = self.path.split('/')[-1]
297 # check if it is a registered source
298 if source in sources :
299 self.send_response(200)
300 self.send_header('Content-Type', 'application/json')
302 self.wfile.write(bytes("{\"ip\" : \"" + sources[source][0] + "\", \"port\": " + str(sources[source][1]) + "}"))
304 self.send_response(400)
307 elif re.search(DISCOVER_SINK_URL + '/*', self.path) != None:
308 sink = self.path.split('/')[-1]
309 # check if it is a registered sink
# NOTE(review): the membership test on `sinks` is not visible in this excerpt.
311 self.send_response(200)
312 self.send_header('Content-Type', 'application/json')
314 self.wfile.write(bytes("{\"ip\" : \"" + sinks[sink][0] + "\", \"port\": " + str(sinks[sink][1]) + "}"))
316 self.send_response(400)
319 elif re.search(DISCOVER_STARTPROCESSING_URL + '/*', self.path) != None:
320 instance = self.path.split('/')[-1]
321 # check if this instance is initialized
322 if instance in gs_startprocessing_instances :
323 self.send_response(200)
324 self.send_header('Content-Type', 'application/json')
# No endpoint data is returned here, only an empty JSON object.
326 self.wfile.write(bytes("{}"))
328 self.send_response(400)
# Presumably the fall-through for a path that matches no discover URL.
331 self.send_response(404)
336 # print usage instructions
# NOTE(review): the `def` line that owns this statement is not visible in this
# excerpt. The only option advertised is -p, although the getopt call in the
# entry code also accepts -h/--help, --port and -v.
338 print ('./gshub.py [-p port]')
# Threaded variant of HTTPServer used to serve the REST calls.
# ThreadingMixIn is listed first so that its request-dispatch override takes
# precedence over HTTPServer's in the method resolution order.
341 class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
342 """Handle requests in a separate thread."""
# Script entry code: parse options, start the HTTP server, record its address.
# NOTE(review): the enclosing `def` line, the `try:` before the getopt call and
# the bodies of several branches are not visible in this excerpt.
345 # process command-line arguments
# Short options: -h, -p <value>, -v; long options: --help, --port=<value>.
347 opts, args = getopt.getopt(sys.argv[1:], "hp:v", ["help", "port="])
348 except getopt.GetoptError as err:
349 # print help information and exit:
356 if o in ("-h", "--help"):
359 elif o in ("-p", "--port"):
# NOTE(review): "-v" is accepted by the getopt spec above, but no visible
# branch handles it -- presumably it ends up reported here; confirm.
362 print ('Unknown command-line option ' + o)
364 # start HTTP server to serve REST calls
# Bound to the loopback address only, so other hosts cannot reach the service.
365 server_address = ('127.0.0.1', port)
366 httpd = ThreadedHTTPServer(server_address, Server)
368 # record HTTP server address in gshub.log
# Opens gshub.log in the current working directory with mode 'w', which
# truncates any previous content. The address is read back from the server
# object rather than from `port` -- presumably so that an OS-assigned port is
# what gets recorded; confirm.
# NOTE(review): no close of `f` is visible in this excerpt.
369 f = open('gshub.log', 'w')
370 f.write(str(httpd.server_address[0]) + ':' + str(httpd.server_address[1]) + '\n')
373 print ('GSHUB Running on port ' + str(httpd.server_address[1]) + ' ...')
# Blocks here serving requests.
375 httpd.serve_forever()
# Entry guard; the statement it runs is not visible in this excerpt.
378 if __name__ == "__main__":