2 # ------------------------------------------------
3 # Copyright 2014 AT&T Intellectual Property
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # -------------------------------------------
17 # Implementation of GSHUB REST service
18 # for announcement and discovery of gs instances, sources and sinks
20 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
32 # lis of URLS for all the REST calls we will serve
33 DISCOVER_INSTANCE_URL = '/v1/discover-instance'
34 DISCOVER_INITINSTANCE_URL = '/v1/discover-initialized-instance'
35 DISCOVER_SOURCE_URL = '/v1/discover-source'
36 DISCOVER_SINK_URL = '/v1/discover-sink'
37 DISCOVER_STARTPROCESSING_URL = '/v1/discover-start-processing'
38 ANNOUNCE_INSTANCE_URL = '/v1/announce-instance'
39 ANNOUNCE_INITINSTANCE_URL = '/v1/announce-initialized-instance'
40 ANNOUNCE_SOURCE_URL = '/v1/announce-source'
41 ANNOUNCE_SINK_URL = '/v1/announce-sink'
42 ANNOUNCE_STARTPROCESSING_URL = '/v1/announce-start-processing'
43 ANNOUNCE_STREAM_SUBSCRIPTION = '/v1/announce-stream-subscription'
44 ANNOUNCE_FTA_INSTANCE = '/v1/announce-fta-instance'
45 ANNOUNCE_METRICS = '/v1/log-metrics'
47 # gs instance endpoints
50 # initialized gs instances
51 gs_init_instances = {}
53 # instances for which processing started
54 gs_startprocessing_instances = {}
63 # exctract endpoint information from json data
64 def extract_endpoint(data) :
70 doc = json.loads(str(data))
72 print ('Invalid json message ' + str(data))
75 for key in doc.keys() :
82 socket.inet_pton(socket.AF_INET, ip)
84 print ('Invalid IPV4 address ' + ip)
87 # validate port number
91 print ('Invalid port number ' + doc[key])
94 if name == '' or ip == '' or port == 0 :
95 print ('Name, ip or port is missing from json message ' + str(doc))
99 return [name, ip, port]
102 # extract instance name from json data
103 def extract_instance_name(data) :
107 doc = json.loads(str(data))
109 print ('Invalid json message ' + str(data))
112 for key in doc.keys() :
117 print ('Name field is missing in json message ' + str(doc))
118 elif (name in gs_instances) == False:
119 print ('Attempt to announce the initialization or start of processing for unknown instance ' + name)
124 # handler for HTTP requests. We will override do_POST and do_GET of BaseHTTPRequestHandler
125 class Server(BaseHTTPRequestHandler) :
128 if re.search(ANNOUNCE_INSTANCE_URL, self.path) != None:
131 if self.headers.getheader('content-type') == 'application/json' :
132 # Find content length
134 for i in range(len(self.headers.keys())):
135 if self.headers.keys()[i] == 'content-length' :
136 content_len = int (self.headers.values()[i])
138 if content_len != 0 :
139 # extract endpoint information
140 endpoint = extract_endpoint(self.rfile.read(content_len))
142 self.send_response(400)
144 self.send_response(200)
145 gs_instances[endpoint[0]] = [endpoint[1], endpoint[2]]
147 self.send_response(400)
150 self.send_response(400)
153 elif re.search(ANNOUNCE_INITINSTANCE_URL, self.path) != None:
154 if self.headers.getheader('content-type') == 'application/json' :
155 # Find content length
157 for i in range(len(self.headers.keys())):
158 if self.headers.keys()[i] == 'content-length' :
159 content_len = int (self.headers.values()[i])
161 if content_len != 0 :
162 # extract name of initialized gs instance
163 name = extract_instance_name(self.rfile.read(content_len))
165 self.send_response(400)
167 self.send_response(200)
168 gs_init_instances[name] = 1
170 self.send_response(400)
173 self.send_response(400)
176 elif re.search(ANNOUNCE_SOURCE_URL, self.path) != None:
177 if self.headers.getheader('content-type') == 'application/json' :
178 # Find content length
180 for i in range(len(self.headers.keys())):
181 if self.headers.keys()[i] == 'content-length' :
182 content_len = int (self.headers.values()[i])
184 if content_len != 0 :
185 # extract endpoint information
186 endpoint = extract_endpoint(self.rfile.read(content_len))
188 self.send_response(400)
190 self.send_response(200)
191 sources[endpoint[0]] = [endpoint[1], endpoint[2]]
193 self.send_response(400)
196 self.send_response(400)
199 elif re.search(ANNOUNCE_SINK_URL, self.path) != None:
200 if self.headers.getheader('content-type') == 'application/json' :
201 # Find content length
203 for i in range(len(self.headers.keys())):
204 if self.headers.keys()[i] == 'content-length' :
205 content_len = int (self.headers.values()[i])
207 if content_len != 0 :
208 # extract endpoint information
209 endpoint = extract_endpoint(self.rfile.read(content_len))
211 self.send_response(400)
213 self.send_response(200)
214 sinks[endpoint[0]] = [endpoint[1], endpoint[2]]
216 self.send_response(400)
219 self.send_response(400)
222 elif re.search(ANNOUNCE_STARTPROCESSING_URL, self.path) != None:
223 if self.headers.getheader('content-type') == 'application/json' :
224 # Find content length
226 for i in range(len(self.headers.keys())):
227 if self.headers.keys()[i] == 'content-length' :
228 content_len = int (self.headers.values()[i])
230 if content_len != 0 :
231 # extract name of initialized gs instance
232 name = extract_instance_name(self.rfile.read(content_len))
234 self.send_response(400)
236 self.send_response(200)
237 gs_startprocessing_instances[name] = 1
239 self.send_response(400)
242 self.send_response(400)
245 # we do not do any processing for ANNOUNCE_STREAM_SUBSCRIPTION, ANNOUNCE_FTA_INSTANCE and ANNOUNCE_METRICS in gshub simulator
246 elif (re.search(ANNOUNCE_STREAM_SUBSCRIPTION, self.path) != None) or (re.search(ANNOUNCE_FTA_INSTANCE, self.path) != None) or (re.search(ANNOUNCE_METRICS, self.path) != None):
247 if self.headers.getheader('content-type') == 'application/json' :
248 # Find content length
250 for i in range(len(self.headers.keys())):
251 if self.headers.keys()[i] == 'content-length' :
252 content_len = int (self.headers.values()[i])
254 if content_len != 0 :
255 self.send_response(200)
257 self.send_response(400)
260 self.send_response(400)
264 self.send_response(404)
269 if re.search(DISCOVER_INSTANCE_URL + '/*', self.path) != None:
270 instance = self.path.split('/')[-1]
271 # check if this instance is registered
272 if instance in gs_instances :
273 self.send_response(200)
274 self.send_header('Content-Type', 'application/json')
276 self.wfile.write(bytes("{\"ip\" : \"" + gs_instances[instance][0] + "\", \"port\": " + str(gs_instances[instance][1]) + "}"))
278 self.send_response(400)
282 elif re.search(DISCOVER_INITINSTANCE_URL + '/*', self.path) != None:
283 instance = self.path.split('/')[-1]
284 # check if this instance is initialized
285 if instance in gs_init_instances :
286 self.send_response(200)
287 self.send_header('Content-Type', 'application/json')
289 self.wfile.write(bytes("{\"ip\" : \"" + gs_instances[instance][0] + "\", \"port\": " + str(gs_instances[instance][1]) + "}"))
291 self.send_response(400)
294 elif re.search(DISCOVER_SOURCE_URL + '/*', self.path) != None:
295 source = self.path.split('/')[-1]
296 # check if it is a registered source
297 if source in sources :
298 self.send_response(200)
299 self.send_header('Content-Type', 'application/json')
301 self.wfile.write(bytes("{\"ip\" : \"" + sources[source][0] + "\", \"port\": " + str(sources[source][1]) + "}"))
303 self.send_response(400)
306 elif re.search(DISCOVER_SINK_URL + '/*', self.path) != None:
307 sink = self.path.split('/')[-1]
308 # check if it is a registered sink
310 self.send_response(200)
311 self.send_header('Content-Type', 'application/json')
313 self.wfile.write(bytes("{\"ip\" : \"" + sinks[sink][0] + "\", \"port\": " + str(sinks[sink][1]) + "}"))
315 self.send_response(400)
318 elif re.search(DISCOVER_STARTPROCESSING_URL + '/*', self.path) != None:
319 instance = self.path.split('/')[-1]
320 # check if this instance is initialized
321 if instance in gs_startprocessing_instances :
322 self.send_response(200)
323 self.send_header('Content-Type', 'application/json')
325 self.wfile.write(bytes("{}"))
327 self.send_response(400)
330 self.send_response(404)
335 # print usage instructions
337 print ('./gshub.py [-p port]')
342 # process command-line arguments
344 opts, args = getopt.getopt(sys.argv[1:], "hp:v", ["help", "port="])
345 except getopt.GetoptError as err:
346 # print help information and exit:
353 if o in ("-h", "--help"):
356 elif o in ("-p", "--port"):
359 print ('Unknown command-line option ' + o)
361 # start HTTP server to serve REST calls
362 server_address = ('127.0.0.1', port)
363 httpd = HTTPServer(server_address, Server)
365 # record HTTP server address in gshub.log
366 f = open('gshub.log', 'w')
367 f.write(str(httpd.server_address[0]) + ':' + str(httpd.server_address[1]) + '\n')
370 print ('GSHUB Running on port ' + str(httpd.server_address[1]) + ' ...')
372 httpd.serve_forever()
375 if __name__ == "__main__":