3 # ------------------------------------------------
\r
4 # Copyright 2014 AT&T Intellectual Property
\r
5 # Licensed under the Apache License, Version 2.0 (the "License");
\r
6 # you may not use this file except in compliance with the License.
\r
7 # You may obtain a copy of the License at
\r
9 # http://www.apache.org/licenses/LICENSE-2.0
\r
11 # Unless required by applicable law or agreed to in writing, software
\r
12 # distributed under the License is distributed on an "AS IS" BASIS,
\r
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
14 # See the License for the specific language governing permissions and
\r
15 # limitations under the License.
\r
16 # -------------------------------------------
\r
18 # Implementation of GSHUB REST service
\r
19 # for announcement and discovery of gs instances, sources and sinks
\r
21 from http.server import BaseHTTPRequestHandler, HTTPServer
\r
22 from socketserver import ThreadingMixIn
\r
31 # list of URLs for all the REST calls we will serve
\r
32 DISCOVER_INSTANCE_URL = '/v1/discover-instance'
\r
33 DISCOVER_INITINSTANCE_URL = '/v1/discover-initialized-instance'
\r
34 DISCOVER_SOURCE_URL = '/v1/discover-source'
\r
35 DISCOVER_SINK_URL = '/v1/discover-sink'
\r
36 DISCOVER_STARTPROCESSING_URL = '/v1/discover-start-processing'
\r
37 ANNOUNCE_INSTANCE_URL = '/v1/announce-instance'
\r
38 ANNOUNCE_INITINSTANCE_URL = '/v1/announce-initialized-instance'
\r
39 ANNOUNCE_SOURCE_URL = '/v1/announce-source'
\r
40 ANNOUNCE_SINK_URL = '/v1/announce-sink'
\r
41 ANNOUNCE_STARTPROCESSING_URL = '/v1/announce-start-processing'
\r
42 ANNOUNCE_STREAM_SUBSCRIPTION = '/v1/announce-stream-subscription'
\r
43 ANNOUNCE_FTA_INSTANCE = '/v1/announce-fta-instance'
\r
44 ANNOUNCE_METRICS = '/v1/log-metrics'
\r
46 # gs instance endpoints
\r
49 # initialized gs instances
\r
50 gs_init_instances = {}
\r
52 # instances for which processing started
\r
53 gs_startprocessing_instances = {}
\r
62 # extract endpoint information from json data
\r
63 def extract_endpoint(data) :
\r
69 doc = json.loads(str(data, 'utf-8'))
\r
71 print ('Invalid json message ' + str(data, 'utf-8'))
\r
74 for key in doc.keys() :
\r
79 # validate ip address
\r
81 socket.inet_pton(socket.AF_INET, ip)
\r
83 print ('Invalid IPV4 address ' + ip)
\r
85 elif key == 'port' :
\r
86 # validate port number
\r
88 port = int(doc[key])
\r
90 print ('Invalid port number ' + doc[key])
\r
93 if name == '' or ip == '' or port == 0 :
\r
94 print ('Name, ip or port is missing from json message ' + str(doc))
\r
98 return [name, ip, port]
\r
101 # extract instance name from json data
\r
102 def extract_instance_name(data) :
\r
106 doc = json.loads(str(data, 'utf-8'))
\r
108 print ('Invalid json message ' + str(data, "utf-8"))
\r
111 for key in doc.keys() :
\r
116 print ('Name field is missing in json message ' + str(doc))
\r
117 elif (name in gs_instances) == False:
\r
118 print ('Attempt to announce the initialization or start of processing for unknown instance ' + name)
\r
123 # handler for HTTP requests. We will override do_POST and do_GET of BaseHTTPRequestHandler
\r
124 class HTTPRequestHandler(BaseHTTPRequestHandler):
\r
127 if re.search(ANNOUNCE_INSTANCE_URL, self.path) != None:
\r
128 if self.headers.get_content_type() == 'application/json' :
\r
129 # Find content length
\r
131 for i in range(len(self.headers.keys())):
\r
132 if self.headers.keys()[i] == 'Content-Length' :
\r
133 content_len = int (self.headers.values()[i])
\r
135 if content_len != 0 :
\r
136 # extract endpoint information
\r
137 endpoint = extract_endpoint(self.rfile.read(content_len))
\r
138 if endpoint == [] :
\r
139 self.send_response(400)
\r
141 self.send_response(200)
\r
142 gs_instances[endpoint[0]] = [endpoint[1], endpoint[2]]
\r
144 self.send_response(400)
\r
147 self.send_response(400)
\r
148 self.end_headers()
\r
150 elif re.search(ANNOUNCE_INITINSTANCE_URL, self.path) != None:
\r
151 if self.headers.get_content_type() == 'application/json' :
\r
152 # Find content length
\r
154 for i in range(len(self.headers.keys())):
\r
155 if self.headers.keys()[i] == 'Content-Length' :
\r
156 content_len = int (self.headers.values()[i])
\r
158 if content_len != 0 :
\r
159 # extract name of initialized gs instance
\r
160 name = extract_instance_name(self.rfile.read(content_len))
\r
162 self.send_response(400)
\r
164 self.send_response(200)
\r
165 gs_init_instances[name] = 1
\r
167 self.send_response(400)
\r
170 self.send_response(400)
\r
171 self.end_headers()
\r
173 elif re.search(ANNOUNCE_SOURCE_URL, self.path) != None:
\r
174 if self.headers.get_content_type() == 'application/json' :
\r
175 # Find content length
\r
177 for i in range(len(self.headers.keys())):
\r
178 if self.headers.keys()[i] == 'Content-Length' :
\r
179 content_len = int (self.headers.values()[i])
\r
181 if content_len != 0 :
\r
182 # extract endpoint information
\r
183 endpoint = extract_endpoint(self.rfile.read(content_len))
\r
184 if endpoint == [] :
\r
185 self.send_response(400)
\r
187 self.send_response(200)
\r
188 sources[endpoint[0]] = [endpoint[1], endpoint[2]]
\r
190 self.send_response(400)
\r
193 self.send_response(400)
\r
196 elif re.search(ANNOUNCE_SINK_URL, self.path) != None:
\r
197 if self.headers.get_content_type() == 'application/json' :
\r
198 # Find content length
\r
200 for i in range(len(self.headers.keys())):
\r
201 if self.headers.keys()[i] == 'Content-Length' :
\r
202 content_len = int (self.headers.values()[i])
\r
204 if content_len != 0 :
\r
205 # extract endpoint information
\r
206 endpoint = extract_endpoint(self.rfile.read(content_len))
\r
207 if endpoint == [] :
\r
208 self.send_response(400)
\r
210 self.send_response(200)
\r
211 sinks[endpoint[0]] = [endpoint[1], endpoint[2]]
\r
213 self.send_response(400)
\r
216 self.send_response(400)
\r
217 self.end_headers()
\r
219 elif re.search(ANNOUNCE_STARTPROCESSING_URL, self.path) != None:
\r
220 if self.headers.get_content_type() == 'application/json' :
\r
221 # Find content length
\r
223 for i in range(len(self.headers.keys())):
\r
224 if self.headers.keys()[i] == 'Content-Length' :
\r
225 content_len = int (self.headers.values()[i])
\r
227 if content_len != 0 :
\r
228 # extract name of initialized gs instance
\r
229 name = extract_instance_name(self.rfile.read(content_len))
\r
231 self.send_response(400)
\r
233 self.send_response(200)
\r
234 gs_startprocessing_instances[name] = 1
\r
236 self.send_response(400)
\r
239 self.send_response(400)
\r
242 # we do not do any processing for ANNOUNCE_STREAM_SUBSCRIPTION, ANNOUNCE_FTA_INSTANCE and ANNOUNCE_METRICS in gshub simulator
\r
243 elif (re.search(ANNOUNCE_STREAM_SUBSCRIPTION, self.path) != None) or (re.search(ANNOUNCE_FTA_INSTANCE, self.path) != None) or (re.search(ANNOUNCE_METRICS, self.path) != None):
\r
244 if self.headers.get_content_type() == 'application/json' :
\r
245 # Find content length
\r
247 for i in range(len(self.headers.keys())):
\r
248 if self.headers.keys()[i] == 'Content-Length' :
\r
249 content_len = int (self.headers.values()[i])
\r
251 if content_len != 0 :
\r
252 self.send_response(200)
\r
254 self.send_response(400)
\r
257 self.send_response(400)
\r
258 self.end_headers()
\r
261 self.send_response(404)
\r
266 if re.search(DISCOVER_INSTANCE_URL + '/*', self.path) != None:
\r
267 instance = self.path.split('/')[-1]
\r
268 # check if this instance is registered
\r
269 if instance in gs_instances :
\r
270 self.send_response(200)
\r
271 self.send_header('Content-Type', 'application/json')
\r
273 self.wfile.write(bytes("{\"ip\" : \"" + gs_instances[instance][0] + "\", \"port\": " + str(gs_instances[instance][1]) + "}", "utf-8"))
\r
275 self.send_response(400)
\r
279 elif re.search(DISCOVER_INITINSTANCE_URL + '/*', self.path) != None:
\r
280 instance = self.path.split('/')[-1]
\r
281 # check if this instance is initialized
\r
282 if instance in gs_init_instances :
\r
283 self.send_response(200)
\r
284 self.send_header('Content-Type', 'application/json')
\r
286 self.wfile.write(bytes("{\"ip\" : \"" + gs_instances[instance][0] + "\", \"port\": " + str(gs_instances[instance][1]) + "}", "utf-8"))
\r
288 self.send_response(400)
\r
291 elif re.search(DISCOVER_SOURCE_URL + '/*', self.path) != None:
\r
292 source = self.path.split('/')[-1]
\r
293 # check if it is a registered source
\r
294 if source in sources :
\r
295 self.send_response(200)
\r
296 self.send_header('Content-Type', 'application/json')
\r
298 self.wfile.write(bytes("{\"ip\" : \"" + sources[source][0] + "\", \"port\": " + str(sources[source][1]) + "}", "utf-8"))
\r
300 self.send_response(400)
\r
303 elif re.search(DISCOVER_SINK_URL + '/*', self.path) != None:
\r
304 sink = self.path.split('/')[-1]
\r
305 # check if it is a registered sink
\r
307 self.send_response(200)
\r
308 self.send_header('Content-Type', 'application/json')
\r
310 self.wfile.write(bytes("{\"ip\" : \"" + sinks[sink][0] + "\", \"port\": " + str(sinks[sink][1]) + "}", "utf-8"))
\r
312 self.send_response(400)
\r
315 elif re.search(DISCOVER_STARTPROCESSING_URL + '/*', self.path) != None:
\r
316 instance = self.path.split('/')[-1]
\r
317 # check if processing has started for this instance
\r
318 if instance in gs_startprocessing_instances :
\r
319 self.send_response(200)
\r
320 self.send_header('Content-Type', 'application/json')
\r
322 self.wfile.write(bytes("{}", "utf-8"))
\r
324 self.send_response(400)
\r
325 self.end_headers()
\r
327 self.send_response(404)
\r
332 # we will use standard python threaded HTTP server
\r
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTP server that dispatches every request to its own thread.

    Built by combining the standard library's ``ThreadingMixIn`` with
    ``HTTPServer``.
    """

    # Ask the socket server to set SO_REUSEADDR on the listening socket so
    # the address can be re-bound immediately after a restart.
    allow_reuse_address = True

    def shutdown(self):
        """Stop the ``serve_forever`` loop, then close the listening socket.

        The loop is stopped first so that it never polls or accepts on a
        socket that has already been closed. This call blocks until the
        loop has exited, so it must be invoked from a thread other than the
        one running ``serve_forever``.
        """
        HTTPServer.shutdown(self)
        self.socket.close()
\r
340 class SimpleHttpServer:
\r
341 def __init__(self, ip, port):
\r
342 self.server = ThreadedHTTPServer((ip,port), HTTPRequestHandler)
\r
345 self.server_thread = threading.Thread(target=self.server.serve_forever)
\r
346 self.server_thread.daemon = True
\r
347 self.server_thread.start()
\r
349 def waitForThread(self):
\r
350 self.server_thread.join()
\r
353 self.server.shutdown()
\r
354 self.waitForThread()
\r
357 # print usage instructions
\r
359 print ('./gshub.py [-p port]')
\r
364 # process command-line arguments
\r
366 opts, args = getopt.getopt(sys.argv[1:], "hp:v", ["help", "port="])
\r
367 except getopt.GetoptError as err:
\r
368 # print help information and exit:
\r
375 if o in ("-h", "--help"):
\r
378 elif o in ("-p", "--port"):
\r
381 print ('Unknown command-line option ' + o)
\r
383 # start HTTP server to serve REST calls
\r
384 server = SimpleHttpServer('127.0.0.1', port)
\r
386 # record HTTP server address in gshub.log
\r
387 f = open('gshub.log', 'w')
\r
388 f.write('127.0.0.1:' + str(server.server.server_port) + '\n')
\r
391 print ('GSHUB Running on port ' + str(server.server.server_port) + ' ...')
\r
393 server.waitForThread()
\r
396 if __name__ == "__main__":
\r