2 # ------------------------------------------------
\r
3 # Copyright 2014 AT&T Intellectual Property
\r
4 # Licensed under the Apache License, Version 2.0 (the "License");
\r
5 # you may not use this file except in compliance with the License.
\r
6 # You may obtain a copy of the License at
\r
8 # http://www.apache.org/licenses/LICENSE-2.0
\r
10 # Unless required by applicable law or agreed to in writing, software
\r
11 # distributed under the License is distributed on an "AS IS" BASIS,
\r
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
13 # See the License for the specific language governing permissions and
\r
14 # limitations under the License.
\r
15 # -------------------------------------------
\r
17 # Implementation of GSHUB REST service
\r
18 # for announcement and discovery of gs instances, sources and sinks
\r
20 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
\r
32 # list of URLs for all the REST calls we will serve
\r
33 DISCOVER_INSTANCE_URL = '/v1/discover-instance'
\r
34 DISCOVER_INITINSTANCE_URL = '/v1/discover-initialized-instance'
\r
35 DISCOVER_SOURCE_URL = '/v1/discover-source'
\r
36 DISCOVER_SINK_URL = '/v1/discover-sink'
\r
37 DISCOVER_STARTPROCESSING_URL = '/v1/discover-start-processing'
\r
38 ANNOUNCE_INSTANCE_URL = '/v1/announce-instance'
\r
39 ANNOUNCE_INITINSTANCE_URL = '/v1/announce-initialized-instance'
\r
40 ANNOUNCE_SOURCE_URL = '/v1/announce-source'
\r
41 ANNOUNCE_SINK_URL = '/v1/announce-sink'
\r
42 ANNOUNCE_STARTPROCESSING_URL = '/v1/announce-start-processing'
\r
43 ANNOUNCE_STREAM_SUBSCRIPTION = '/v1/announce-stream-subscription'
\r
44 ANNOUNCE_FTA_INSTANCE = '/v1/announce-fta-instance'
\r
45 ANNOUNCE_METRICS = '/v1/log-metrics'
\r
47 # gs instance endpoints
\r
50 # initialized gs instances
\r
51 gs_init_instances = {}
\r
53 # instances for which processing started
\r
54 gs_startprocessing_instances = {}
\r
# extract endpoint information from json data
def extract_endpoint(data):
    """Parse an endpoint announcement and return it as [name, ip, port].

    data is the raw JSON request body.  It must decode to a JSON object
    with a 'name', an 'ip' (dotted IPv4 address) and a 'port' field; any
    other fields are ignored.

    Returns [name, ip, port] with port converted to int, or [] when the
    message is not valid JSON, is not a JSON object, or has a missing or
    invalid name, ip or port.  Every rejection is reported on stdout.
    """
    try:
        doc = json.loads(str(data))
    except ValueError:
        print ('Invalid json message ' + str(data))
        return []

    # valid JSON that is not an object (list, number, ...) carries no fields
    if not isinstance(doc, dict):
        print ('Invalid json message ' + str(data))
        return []

    name = doc.get('name', '')
    ip = doc.get('ip', '')
    port = 0

    if 'ip' in doc:
        # validate ip address
        try:
            socket.inet_pton(socket.AF_INET, ip)
        except (socket.error, TypeError, ValueError):
            # %s formatting keeps the report from failing on a non-string value
            print ('Invalid IPV4 address %s' % (ip,))
            ip = ''

    if 'port' in doc:
        # validate port number
        try:
            port = int(doc['port'])
        except (TypeError, ValueError, OverflowError):
            port = 0
        # 0 doubles as the "missing/invalid" marker checked below
        if not 0 < port < 65536:
            print ('Invalid port number %s' % (doc['port'],))
            port = 0

    if name == '' or ip == '' or port == 0:
        print ('Name, ip or port is missing from json message ' + str(doc))
        return []

    return [name, ip, port]
\r
# extract instance name from json data
def extract_instance_name(data):
    """Return the name of an already announced instance from a JSON message.

    data is the raw JSON request body; it must decode to a JSON object
    whose 'name' field is a key of the module-level gs_instances registry.

    Returns the name on success and '' otherwise (invalid JSON, not a JSON
    object, missing name, or name not present in gs_instances).  Every
    rejection is reported on stdout.

    NOTE(review): '' as the failure value follows the "== ''" missing-field
    convention used in this file - confirm against the callers.
    """
    try:
        doc = json.loads(str(data))
    except ValueError:
        print ('Invalid json message ' + str(data))
        return ''

    # valid JSON that is not an object (list, number, ...) carries no fields
    if not isinstance(doc, dict):
        print ('Invalid json message ' + str(data))
        return ''

    name = doc.get('name', '')
    if name == '':
        print ('Name field is missing in json message ' + str(doc))
        return ''

    try:
        known = name in gs_instances
    except TypeError:
        # an unhashable value (e.g. a JSON list) can never be a registered name
        known = False

    if not known:
        print ('Attempt to announce the initialization or start of processing for unknown instance %s' % (name,))
        return ''

    return name
\r
# handler for HTTP requests. We will override do_POST and do_GET of BaseHTTPRequestHandler
class Server(BaseHTTPRequestHandler):
    """HTTP handler for the GSHUB announce (POST) and discover (GET) calls.

    Announcements are stored in the module-level registries gs_instances,
    gs_init_instances, gs_startprocessing_instances, sources and sinks;
    discovery requests are answered from the same registries.
    """

    def _json_content_length(self):
        """Return the body length of a JSON request, or 0 to reject it.

        A request is rejected unless its content-type is exactly
        'application/json' and its content-length is a positive integer.
        """
        if self.headers.getheader('content-type') != 'application/json':
            return 0
        try:
            content_len = int(self.headers.getheader('content-length', 0))
        except ValueError:
            # malformed Content-Length header
            return 0
        # a negative length must never reach rfile.read()
        return content_len if content_len > 0 else 0

    def _respond(self, status, body=None):
        """Send a complete response; body, when given, is sent as JSON."""
        self.send_response(status)
        if body is not None:
            self.send_header('Content-Type', 'application/json')
        self.end_headers()
        if body is not None:
            self.wfile.write(bytes(body))

    def _announce_endpoint(self, registry):
        """Store the endpoint announced in the request body in registry.

        Replies 200 on success and 400 when the request or its JSON
        payload is rejected (extract_endpoint returns []).
        """
        content_len = self._json_content_length()
        endpoint = []
        if content_len:
            # extract endpoint information
            endpoint = extract_endpoint(self.rfile.read(content_len))
        if endpoint == []:
            self._respond(400)
            return
        registry[endpoint[0]] = [endpoint[1], endpoint[2]]
        self._respond(200)

    def _announce_state(self, registry):
        """Mark the instance named in the request body in registry.

        Replies 200 on success and 400 when the request is rejected or
        extract_instance_name does not return a usable name.
        """
        content_len = self._json_content_length()
        name = ''
        if content_len:
            # extract name of the gs instance
            name = extract_instance_name(self.rfile.read(content_len))
        if not name:
            self._respond(400)
            return
        registry[name] = 1
        self._respond(200)

    def _respond_endpoint(self, registry, name):
        """Reply 200 with the ip/port of registry[name], or 400 if unknown."""
        if name not in registry:
            self._respond(400)
            return
        entry = registry[name]
        self._respond(200, "{\"ip\" : \"" + entry[0] + "\", \"port\": " + str(entry[1]) + "}")

    def do_POST(self):
        """Serve the announce-* REST calls."""
        path = self.path
        if re.search(ANNOUNCE_INSTANCE_URL, path) is not None:
            self._announce_endpoint(gs_instances)
        elif re.search(ANNOUNCE_INITINSTANCE_URL, path) is not None:
            self._announce_state(gs_init_instances)
        elif re.search(ANNOUNCE_SOURCE_URL, path) is not None:
            self._announce_endpoint(sources)
        elif re.search(ANNOUNCE_SINK_URL, path) is not None:
            self._announce_endpoint(sinks)
        elif re.search(ANNOUNCE_STARTPROCESSING_URL, path) is not None:
            self._announce_state(gs_startprocessing_instances)
        elif (re.search(ANNOUNCE_STREAM_SUBSCRIPTION, path) is not None
              or re.search(ANNOUNCE_FTA_INSTANCE, path) is not None
              or re.search(ANNOUNCE_METRICS, path) is not None):
            # we do not do any processing for ANNOUNCE_STREAM_SUBSCRIPTION,
            # ANNOUNCE_FTA_INSTANCE and ANNOUNCE_METRICS in gshub simulator;
            # the request is only checked, its body is not read
            if self._json_content_length():
                self._respond(200)
            else:
                self._respond(400)
        else:
            self._respond(404)

    def do_GET(self):
        """Serve the discover-* REST calls.

        The looked-up name is the last '/'-separated component of the path.
        """
        path = self.path
        target = path.split('/')[-1]
        if re.search(DISCOVER_INSTANCE_URL + '/*', path) is not None:
            # check if this instance is registered
            self._respond_endpoint(gs_instances, target)
        elif re.search(DISCOVER_INITINSTANCE_URL + '/*', path) is not None:
            # check if this instance is initialized
            if target in gs_init_instances:
                self._respond_endpoint(gs_instances, target)
            else:
                self._respond(400)
        elif re.search(DISCOVER_SOURCE_URL + '/*', path) is not None:
            # check if it is a registered source
            self._respond_endpoint(sources, target)
        elif re.search(DISCOVER_SINK_URL + '/*', path) is not None:
            # check if it is a registered sink
            self._respond_endpoint(sinks, target)
        elif re.search(DISCOVER_STARTPROCESSING_URL + '/*', path) is not None:
            # check if this instance has started processing
            if target in gs_startprocessing_instances:
                self._respond(200, "{}")
            else:
                self._respond(400)
        else:
            self._respond(404)
\r
# print usage instructions
def usage():
    """Print the command-line synopsis."""
    print ('./gshub.py [-p port]')


def main():
    """Parse the command line, start the REST server and serve forever.

    The address the server actually bound to is written to gshub.log
    (one 'ip:port' line) and announced on stdout before serving starts.
    """
    # process command-line arguments
    try:
        opts, args = getopt.getopt(sys.argv[1:], "hp:v", ["help", "port="])
    except getopt.GetoptError as err:
        # print help information and exit:
        print (str(err))
        usage()
        sys.exit(2)

    # NOTE(review): default of 0 (port chosen by the OS) is assumed here;
    # the bound port is read back from httpd.server_address below - confirm
    port = 0
    for o, a in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit(0)
        elif o in ("-p", "--port"):
            try:
                port = int(a)
            except ValueError:
                print ('Invalid port number ' + a)
                usage()
                sys.exit(2)
        else:
            print ('Unknown command-line option ' + o)

    # start HTTP server to serve REST calls
    server_address = ('127.0.0.1', port)
    httpd = HTTPServer(server_address, Server)

    # record HTTP server address in gshub.log; the with-block closes the
    # file even if the write fails
    with open('gshub.log', 'w') as f:
        f.write(str(httpd.server_address[0]) + ':' + str(httpd.server_address[1]) + '\n')

    print ('GSHUB Running on port ' + str(httpd.server_address[1]) + ' ...')

    httpd.serve_forever()


if __name__ == "__main__":
    main()
\r