3 # ------------------------------------------------
4 # Copyright 2014 AT&T Intellectual Property
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # -------------------------------------------
18 # Implementation of GSHUB REST service
19 # for announcement and discovery of gs instances, sources and sinks
21 from http.server import BaseHTTPRequestHandler, HTTPServer
22 from socketserver import ThreadingMixIn
31 # lis of URLS for all the REST calls we will serve
32 DISCOVER_INSTANCE_URL = '/v1/discover-instance'
33 DISCOVER_INITINSTANCE_URL = '/v1/discover-initialized-instance'
34 DISCOVER_SOURCE_URL = '/v1/discover-source'
35 DISCOVER_SINK_URL = '/v1/discover-sink'
36 DISCOVER_STARTPROCESSING_URL = '/v1/discover-start-processing'
37 ANNOUNCE_INSTANCE_URL = '/v1/announce-instance'
38 ANNOUNCE_INITINSTANCE_URL = '/v1/announce-initialized-instance'
39 ANNOUNCE_SOURCE_URL = '/v1/announce-source'
40 ANNOUNCE_SINK_URL = '/v1/announce-sink'
41 ANNOUNCE_STARTPROCESSING_URL = '/v1/announce-start-processing'
42 ANNOUNCE_STREAM_SUBSCRIPTION = '/v1/announce-stream-subscription'
43 ANNOUNCE_FTA_INSTANCE = '/v1/announce-fta-instance'
44 ANNOUNCE_METRICS = '/v1/log-metrics'
46 # gs instance endpoints
49 # initialized gs instances
50 gs_init_instances = {}
52 # instances for which processing started
53 gs_startprocessing_instances = {}
62 # exctract endpoint information from json data
63 def extract_endpoint(data) :
69 doc = json.loads(str(data, 'utf-8'))
71 print ('Invalid json message ' + str(data, 'utf-8'))
74 for key in doc.keys() :
81 socket.inet_pton(socket.AF_INET, ip)
83 print ('Invalid IPV4 address ' + ip)
86 # validate port number
90 print ('Invalid port number ' + doc[key])
93 if name == '' or ip == '' or port == 0 :
94 print ('Name, ip or port is missing from json message ' + str(doc))
98 return [name, ip, port]
101 # extract instance name from json data
102 def extract_instance_name(data) :
106 doc = json.loads(str(data, 'utf-8'))
108 print ('Invalid json message ' + str(data, "utf-8"))
111 for key in doc.keys() :
116 print ('Name field is missing in json message ' + str(doc))
117 elif (name in gs_instances) == False:
118 print ('Attempt to announce the initialization or start of processing for unknown instance ' + name)
123 # handler for HTTP requests. We will override do_PORT and do_GET of BaseHTTPRequestHandler
124 class HTTPRequestHandler(BaseHTTPRequestHandler):
127 if re.search(ANNOUNCE_INSTANCE_URL, self.path) != None:
128 if self.headers.get_content_type() == 'application/json' :
129 # Find content length
131 for i in range(len(self.headers.keys())):
132 if self.headers.keys()[i] == 'Content-Length' :
133 content_len = int (self.headers.values()[i])
135 if content_len != 0 :
136 # extract endpoint information
137 endpoint = extract_endpoint(self.rfile.read(content_len))
139 self.send_response(400)
141 self.send_response(200)
142 gs_instances[endpoint[0]] = [endpoint[1], endpoint[2]]
144 self.send_response(400)
147 self.send_response(400)
150 elif re.search(ANNOUNCE_INITINSTANCE_URL, self.path) != None:
151 if self.headers.get_content_type() == 'application/json' :
152 # Find content length
154 for i in range(len(self.headers.keys())):
155 if self.headers.keys()[i] == 'Content-Length' :
156 content_len = int (self.headers.values()[i])
158 if content_len != 0 :
159 # extract name of initialized gs instance
160 name = extract_instance_name(self.rfile.read(content_len))
162 self.send_response(400)
164 self.send_response(200)
165 gs_init_instances[name] = 1
167 self.send_response(400)
170 self.send_response(400)
173 elif re.search(ANNOUNCE_SOURCE_URL, self.path) != None:
174 if self.headers.get_content_type() == 'application/json' :
175 # Find content length
177 for i in range(len(self.headers.keys())):
178 if self.headers.keys()[i] == 'Content-Length' :
179 content_len = int (self.headers.values()[i])
181 if content_len != 0 :
182 # extract endpoint information
183 endpoint = extract_endpoint(self.rfile.read(content_len))
185 self.send_response(400)
187 self.send_response(200)
188 sources[endpoint[0]] = [endpoint[1], endpoint[2]]
190 self.send_response(400)
193 self.send_response(400)
196 elif re.search(ANNOUNCE_SINK_URL, self.path) != None:
197 if self.headers.get_content_type() == 'application/json' :
198 # Find content length
200 for i in range(len(self.headers.keys())):
201 if self.headers.keys()[i] == 'Content-Length' :
202 content_len = int (self.headers.values()[i])
204 if content_len != 0 :
205 # extract endpoint information
206 endpoint = extract_endpoint(self.rfile.read(content_len))
208 self.send_response(400)
210 self.send_response(200)
211 sinks[endpoint[0]] = [endpoint[1], endpoint[2]]
213 self.send_response(400)
216 self.send_response(400)
219 elif re.search(ANNOUNCE_STARTPROCESSING_URL, self.path) != None:
220 if self.headers.get_content_type() == 'application/json' :
221 # Find content length
223 for i in range(len(self.headers.keys())):
224 if self.headers.keys()[i] == 'Content-Length' :
225 content_len = int (self.headers.values()[i])
227 if content_len != 0 :
228 # extract name of initialized gs instance
229 name = extract_instance_name(self.rfile.read(content_len))
231 self.send_response(400)
233 self.send_response(200)
234 gs_startprocessing_instances[name] = 1
236 self.send_response(400)
239 self.send_response(400)
242 # we do not do any processing for ANNOUNCE_STREAM_SUBSCRIPTION, ANNOUNCE_FTA_INSTANCE and ANNOUNCE_METRICS in gshub simulator
243 elif (re.search(ANNOUNCE_STREAM_SUBSCRIPTION, self.path) != None) or (re.search(ANNOUNCE_FTA_INSTANCE, self.path) != None) or (re.search(ANNOUNCE_METRICS, self.path) != None):
244 if self.headers.get_content_type() == 'application/json' :
245 # Find content length
247 for i in range(len(self.headers.keys())):
248 if self.headers.keys()[i] == 'Content-Length' :
249 content_len = int (self.headers.values()[i])
251 if content_len != 0 :
252 self.send_response(200)
254 self.send_response(400)
257 self.send_response(400)
261 self.send_response(404)
266 if re.search(DISCOVER_INSTANCE_URL + '/*', self.path) != None:
267 instance = self.path.split('/')[-1]
268 # check if this instance is registered
269 if instance in gs_instances :
270 self.send_response(200)
271 self.send_header('Content-Type', 'application/json')
273 self.wfile.write(bytes("{\"ip\" : \"" + gs_instances[instance][0] + "\", \"port\": " + str(gs_instances[instance][1]) + "}", "utf-8"))
275 self.send_response(400)
279 elif re.search(DISCOVER_INITINSTANCE_URL + '/*', self.path) != None:
280 instance = self.path.split('/')[-1]
281 # check if this instance is initialized
282 if instance in gs_init_instances :
283 self.send_response(200)
284 self.send_header('Content-Type', 'application/json')
286 self.wfile.write(bytes("{\"ip\" : \"" + gs_instances[instance][0] + "\", \"port\": " + str(gs_instances[instance][1]) + "}", "utf-8"))
288 self.send_response(400)
291 elif re.search(DISCOVER_SOURCE_URL + '/*', self.path) != None:
292 source = self.path.split('/')[-1]
293 # check if it is a registered source
294 if source in sources :
295 self.send_response(200)
296 self.send_header('Content-Type', 'application/json')
298 self.wfile.write(bytes("{\"ip\" : \"" + sources[source][0] + "\", \"port\": " + str(sources[source][1]) + "}", "utf-8"))
300 self.send_response(400)
303 elif re.search(DISCOVER_SINK_URL + '/*', self.path) != None:
304 sink = self.path.split('/')[-1]
305 # check if it is a registered sink
307 self.send_response(200)
308 self.send_header('Content-Type', 'application/json')
310 self.wfile.write(bytes("{\"ip\" : \"" + sinks[sink][0] + "\", \"port\": " + str(sinks[sink][1]) + "}", "utf-8"))
312 self.send_response(400)
315 elif re.search(DISCOVER_STARTPROCESSING_URL + '/*', self.path) != None:
316 instance = self.path.split('/')[-1]
317 # check if this instance is initialized
318 if instance in gs_startprocessing_instances :
319 self.send_response(200)
320 self.send_header('Content-Type', 'application/json')
322 self.wfile.write(bytes("{}", "utf-8"))
324 self.send_response(400)
327 self.send_response(404)
332 # we will use standard python threaded HTTP server
333 class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
334 allow_reuse_address = True
338 HTTPServer.shutdown(self)
340 class SimpleHttpServer:
341 def __init__(self, ip, port):
342 self.server = ThreadedHTTPServer((ip,port), HTTPRequestHandler)
345 self.server_thread = threading.Thread(target=self.server.serve_forever)
346 self.server_thread.daemon = True
347 self.server_thread.start()
349 def waitForThread(self):
350 self.server_thread.join()
353 self.server.shutdown()
357 # print usage instructions
359 print ('./gshub.py [-p port]')
364 # process command-line arguments
366 opts, args = getopt.getopt(sys.argv[1:], "hp:v", ["help", "port="])
367 except getopt.GetoptError as err:
368 # print help information and exit:
375 if o in ("-h", "--help"):
378 elif o in ("-p", "--port"):
381 print ('Unknown command-line option ' + o)
383 # start HTTP server to serve REST calls
384 server = SimpleHttpServer('127.0.0.1', port)
386 # record HTTP server address in gshub.log
387 f = open('gshub.log', 'w')
388 f.write('127.0.0.1:' + str(server.server.server_port) + '\n')
391 print ('GSHUB Running on port ' + str(server.server.server_port) + ' ...')
393 server.waitForThread()
396 if __name__ == "__main__":