Added quantiling UDAFs
[com/gs-lite.git] / bin / gshub3.py
index a68a1d7..73481a8 100755 (executable)
-#!/usr/bin/python3.6
-
-# ------------------------------------------------
-#   Copyright 2014 AT&T Intellectual Property
-#   Licensed under the Apache License, Version 2.0 (the "License");
-#   you may not use this file except in compliance with the License.
-#   You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-#   Unless required by applicable law or agreed to in writing, software
-#   distributed under the License is distributed on an "AS IS" BASIS,
-#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#   See the License for the specific language governing permissions and
-#   limitations under the License.
-# -------------------------------------------
-
-# Implementation of GSHUB REST service
-# for announcement and discovery of gs instances, sources and sinks
-
-from http.server import BaseHTTPRequestHandler, HTTPServer
-from socketserver import ThreadingMixIn
-import threading
-import getopt
-import sys
-import re
-import cgi
-import socket
-import json
-
-# lis of URLS for all the REST calls we will serve
-DISCOVER_INSTANCE_URL = '/v1/discover-instance'
-DISCOVER_INITINSTANCE_URL = '/v1/discover-initialized-instance'
-DISCOVER_SOURCE_URL =  '/v1/discover-source'
-DISCOVER_SINK_URL =  '/v1/discover-sink'
-DISCOVER_STARTPROCESSING_URL = '/v1/discover-start-processing'
-ANNOUNCE_INSTANCE_URL =  '/v1/announce-instance'
-ANNOUNCE_INITINSTANCE_URL =  '/v1/announce-initialized-instance'
-ANNOUNCE_SOURCE_URL =  '/v1/announce-source'
-ANNOUNCE_SINK_URL =  '/v1/announce-sink'
-ANNOUNCE_STARTPROCESSING_URL = '/v1/announce-start-processing'
-ANNOUNCE_STREAM_SUBSCRIPTION = '/v1/announce-stream-subscription'
-ANNOUNCE_FTA_INSTANCE = '/v1/announce-fta-instance'
-ANNOUNCE_METRICS = '/v1/log-metrics'
-
-# gs instance endpoints
-gs_instances = {}
-
-# initialized gs instances
-gs_init_instances = {}
-
-# instances for which processing started
-gs_startprocessing_instances = {}
-
-# source endpoints
-sources = {}
-
-# sink endpoints
-sinks = {}
-
-
-# exctract endpoint information from json data
-def extract_endpoint(data) :
-       name = ''
-       ip = ''
-       port = 0
-               
-       try :
-               doc = json.loads(str(data, 'utf-8'))    
-       except :
-               print ('Invalid json message ' + str(data, 'utf-8'))
-               return []
-       
-       for key in doc.keys() :
-               if key == 'name' :
-                       name = doc[key]
-               elif key == 'ip' :
-                       ip = doc[key]
-                       # validate ip address           
-                       try :
-                               socket.inet_pton(socket.AF_INET, ip)
-                       except :
-                               print ('Invalid IPV4 address ' + ip)
-                               ip = ''
-               elif key == 'port' :
-                       # validate port number
-                       try :
-                               port = int(doc[key])
-                       except :
-                               print ('Invalid port number ' + doc[key])
-                               port = 0
-       
-       if name == '' or ip == '' or port == 0 :
-               print ('Name, ip or port is missing from json message ' + str(doc))
-               return []
-       
-
-       return [name, ip, port]
-
-
-# extract instance name from json data
-def extract_instance_name(data) :
-       name = ''
-       
-       try :
-               doc = json.loads(str(data, 'utf-8'))    
-       except :
-               print ('Invalid json message ' + str(data, "utf-8"))
-               return ''
-       
-       for key in doc.keys() :
-               if key == 'name' :
-                       name = doc[key]
-       
-       if name == '' :
-               print ('Name field is missing in json message ' + str(doc))
-       elif (name in gs_instances) == False:
-               print ('Attempt to announce the initialization or start of processing for unknown instance ' + name)
-               name = ''
-               
-       return name
-       
-# handler for HTTP requests. We will override do_PORT and do_GET of BaseHTTPRequestHandler
-class HTTPRequestHandler(BaseHTTPRequestHandler):
-
-       def do_POST(self):
-               if re.search(ANNOUNCE_INSTANCE_URL, self.path) != None:         
-                       if self.headers.get_content_type() == 'application/json' :
-                               # Find content length
-                               content_len = 0
-                               for i in range(len(self.headers.keys())):
-                                       if self.headers.keys()[i] == 'Content-Length' :
-                                               content_len = int (self.headers.values()[i])
-                                               break
-                               if content_len != 0 :
-                                       # extract endpoint information
-                                       endpoint = extract_endpoint(self.rfile.read(content_len))
-                                       if endpoint == [] :
-                                               self.send_response(400)
-                                       else :
-                                               self.send_response(200)
-                                               gs_instances[endpoint[0]] = [endpoint[1], endpoint[2]]
-                               else :
-                                       self.send_response(400)
-
-                       else:
-                               self.send_response(400)
-                       self.end_headers()              
-                       
-               elif re.search(ANNOUNCE_INITINSTANCE_URL, self.path) != None:
-                       if self.headers.get_content_type() == 'application/json' :
-                               # Find content length
-                               content_len = 0
-                               for i in range(len(self.headers.keys())):
-                                       if self.headers.keys()[i] == 'Content-Length' :
-                                               content_len = int (self.headers.values()[i])
-                                               break
-                               if content_len != 0 :
-                                       # extract name of initialized gs instance                               
-                                       name = extract_instance_name(self.rfile.read(content_len))
-                                       if name == '' :
-                                               self.send_response(400)
-                                       else :
-                                               self.send_response(200)
-                                               gs_init_instances[name] = 1
-                               else :
-                                       self.send_response(400)
-
-                       else:
-                               self.send_response(400)
-                       self.end_headers()      
-                       
-               elif re.search(ANNOUNCE_SOURCE_URL, self.path) != None:
-                       if self.headers.get_content_type() == 'application/json' :
-                               # Find content length
-                               content_len = 0
-                               for i in range(len(self.headers.keys())):
-                                       if self.headers.keys()[i] == 'Content-Length' :
-                                               content_len = int (self.headers.values()[i])
-                                               break
-                               if content_len != 0 :
-                                       # extract endpoint information                          
-                                       endpoint = extract_endpoint(self.rfile.read(content_len))
-                                       if endpoint == [] :
-                                               self.send_response(400)
-                                       else :
-                                               self.send_response(200)
-                                               sources[endpoint[0]] = [endpoint[1], endpoint[2]]
-                               else :
-                                       self.send_response(400)
-
-                       else:
-                               self.send_response(400)
-                       self.end_headers()
-                       
-               elif re.search(ANNOUNCE_SINK_URL, self.path) != None:
-                       if self.headers.get_content_type() == 'application/json' :
-                               # Find content length
-                               content_len = 0
-                               for i in range(len(self.headers.keys())):
-                                       if self.headers.keys()[i] == 'Content-Length' :
-                                               content_len = int (self.headers.values()[i])
-                                               break
-                               if content_len != 0 :
-                                       # extract endpoint information                          
-                                       endpoint = extract_endpoint(self.rfile.read(content_len))
-                                       if endpoint == [] :
-                                               self.send_response(400)
-                                       else :
-                                               self.send_response(200)
-                                               sinks[endpoint[0]] = [endpoint[1], endpoint[2]]
-                               else :
-                                       self.send_response(400)
-
-                       else:
-                               self.send_response(400)
-                       self.end_headers()      
-                       
-               elif re.search(ANNOUNCE_STARTPROCESSING_URL, self.path) != None:
-                       if self.headers.get_content_type() == 'application/json' :
-                               # Find content length
-                               content_len = 0
-                               for i in range(len(self.headers.keys())):
-                                       if self.headers.keys()[i] == 'Content-Length' :
-                                               content_len = int (self.headers.values()[i])
-                                               break
-                               if content_len != 0 :
-                                       # extract name of initialized gs instance                               
-                                       name = extract_instance_name(self.rfile.read(content_len))
-                                       if name == '' :
-                                               self.send_response(400)
-                                       else :
-                                               self.send_response(200)
-                                               gs_startprocessing_instances[name] = 1
-                               else :
-                                       self.send_response(400)
-
-                       else:
-                               self.send_response(400)
-                       self.end_headers()
-                       
-               # we do not do any processing for ANNOUNCE_STREAM_SUBSCRIPTION, ANNOUNCE_FTA_INSTANCE and ANNOUNCE_METRICS in gshub simulator           
-               elif (re.search(ANNOUNCE_STREAM_SUBSCRIPTION, self.path) != None) or (re.search(ANNOUNCE_FTA_INSTANCE, self.path) != None) or (re.search(ANNOUNCE_METRICS, self.path) != None):
-                       if self.headers.get_content_type() == 'application/json' :
-                               # Find content length
-                               content_len = 0
-                               for i in range(len(self.headers.keys())):
-                                       if self.headers.keys()[i] == 'Content-Length' :
-                                               content_len = int (self.headers.values()[i])
-                                               break
-                               if content_len != 0 :                           
-                                       self.send_response(200)
-                               else :
-                                       self.send_response(400)
-
-                       else:
-                               self.send_response(400)
-                       self.end_headers()      
-                       
-               else:
-                       self.send_response(404)
-                       self.end_headers()
-               return
-
-       def do_GET(self):
-               if re.search(DISCOVER_INSTANCE_URL + '/*', self.path) != None:
-                       instance = self.path.split('/')[-1]
-                       # check if this instance is registered
-                       if instance in gs_instances :
-                               self.send_response(200)
-                               self.send_header('Content-Type', 'application/json')
-                               self.end_headers()
-                               self.wfile.write(bytes("{\"ip\" : \"" + gs_instances[instance][0] + "\", \"port\": " + str(gs_instances[instance][1]) + "}", "utf-8"))
-                       else:
-                               self.send_response(400)
-                               self.end_headers()
-
-
-               elif re.search(DISCOVER_INITINSTANCE_URL + '/*', self.path) != None:
-                       instance = self.path.split('/')[-1]
-                       # check if this instance is initialized
-                       if instance in gs_init_instances :
-                               self.send_response(200)
-                               self.send_header('Content-Type', 'application/json')
-                               self.end_headers()
-                               self.wfile.write(bytes("{\"ip\" : \"" + gs_instances[instance][0] + "\", \"port\": " + str(gs_instances[instance][1]) + "}", "utf-8"))
-                       else:
-                               self.send_response(400)
-                               self.end_headers()
-
-               elif re.search(DISCOVER_SOURCE_URL + '/*', self.path) != None:
-                       source = self.path.split('/')[-1]
-                       # check if it is a registered source
-                       if source in sources :
-                               self.send_response(200)
-                               self.send_header('Content-Type', 'application/json')
-                               self.end_headers()
-                               self.wfile.write(bytes("{\"ip\" : \"" + sources[source][0] + "\", \"port\": " + str(sources[source][1]) + "}", "utf-8"))
-                       else:
-                               self.send_response(400)
-                               self.end_headers()
-                               
-               elif re.search(DISCOVER_SINK_URL + '/*', self.path) != None:
-                       sink = self.path.split('/')[-1]
-                       # check if it is a registered sink
-                       if sink in sinks :
-                               self.send_response(200)
-                               self.send_header('Content-Type', 'application/json')
-                               self.end_headers()
-                               self.wfile.write(bytes("{\"ip\" : \"" + sinks[sink][0] + "\", \"port\": " + str(sinks[sink][1]) + "}", "utf-8"))
-                       else:
-                               self.send_response(400)
-                               self.end_headers()
-                               
-               elif re.search(DISCOVER_STARTPROCESSING_URL + '/*', self.path) != None:
-                       instance = self.path.split('/')[-1]
-                       # check if this instance is initialized
-                       if instance in gs_startprocessing_instances :
-                               self.send_response(200)
-                               self.send_header('Content-Type', 'application/json')
-                               self.end_headers()
-                               self.wfile.write(bytes("{}", "utf-8"))
-                       else:
-                               self.send_response(400)
-                               self.end_headers()                              
-               else:
-                       self.send_response(404)
-                       self.end_headers()
-               return
-
-
-# we will use standard python threaded HTTP server
-class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
-       allow_reuse_address = True
-
-       def shutdown(self):
-               self.socket.close()
-               HTTPServer.shutdown(self)
-
-class SimpleHttpServer:
-       def __init__(self, ip, port):
-               self.server = ThreadedHTTPServer((ip,port), HTTPRequestHandler)
-
-       def start(self):
-               self.server_thread = threading.Thread(target=self.server.serve_forever)
-               self.server_thread.daemon = True
-               self.server_thread.start()
-               
-       def waitForThread(self):
-               self.server_thread.join()
-
-       def stop(self):
-               self.server.shutdown()
-               self.waitForThread()
-
-
-# print usage instructions
-def usage():
-       print ('./gshub.py [-p port]')
-       
-       
-
-def main():
-       # process command-line arguments
-       try:
-               opts, args = getopt.getopt(sys.argv[1:], "hp:v", ["help", "port="])
-       except getopt.GetoptError as err:
-               # print help information and exit:
-               print(str(err)) 
-               usage()
-               sys.exit(2)
-
-       port = 0
-       for o, a in opts:
-               if o in ("-h", "--help"):
-                       usage()
-                       sys.exit(0)
-               elif o in ("-p", "--port"):
-                       port = int(a)
-               else:
-                       print ('Unknown command-line option ' + o)
-
-       # start HTTP server to serve REST calls
-       server = SimpleHttpServer('127.0.0.1', port)
-
-       # record HTTP server address in gshub.log
-       f = open('gshub.log', 'w')
-       f.write('127.0.0.1:' + str(server.server.server_port) + '\n')
-       f.close()
-               
-       print ('GSHUB Running on port ' + str(server.server.server_port) + ' ...')
-       server.start()
-       server.waitForThread()
-
-
-if __name__ == "__main__":
-       main()
-
+#!/usr/bin/python3.6\r
+\r
+# ------------------------------------------------\r
+#   Copyright 2014 AT&T Intellectual Property\r
+#   Licensed under the Apache License, Version 2.0 (the "License");\r
+#   you may not use this file except in compliance with the License.\r
+#   You may obtain a copy of the License at\r
+#\r
+#     http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#   Unless required by applicable law or agreed to in writing, software\r
+#   distributed under the License is distributed on an "AS IS" BASIS,\r
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#   See the License for the specific language governing permissions and\r
+#   limitations under the License.\r
+# -------------------------------------------\r
+\r
+# Implementation of GSHUB REST service\r
+# for announcement and discovery of gs instances, sources and sinks\r
+\r
+from http.server import BaseHTTPRequestHandler, HTTPServer\r
+from socketserver import ThreadingMixIn\r
+import threading\r
+import getopt\r
+import sys\r
+import re\r
+import cgi\r
+import socket\r
+import json\r
+\r
+# lis of URLS for all the REST calls we will serve\r
+DISCOVER_INSTANCE_URL = '/v1/discover-instance'\r
+DISCOVER_INITINSTANCE_URL = '/v1/discover-initialized-instance'\r
+DISCOVER_SOURCE_URL =  '/v1/discover-source'\r
+DISCOVER_SINK_URL =  '/v1/discover-sink'\r
+DISCOVER_STARTPROCESSING_URL = '/v1/discover-start-processing'\r
+ANNOUNCE_INSTANCE_URL =  '/v1/announce-instance'\r
+ANNOUNCE_INITINSTANCE_URL =  '/v1/announce-initialized-instance'\r
+ANNOUNCE_SOURCE_URL =  '/v1/announce-source'\r
+ANNOUNCE_SINK_URL =  '/v1/announce-sink'\r
+ANNOUNCE_STARTPROCESSING_URL = '/v1/announce-start-processing'\r
+ANNOUNCE_STREAM_SUBSCRIPTION = '/v1/announce-stream-subscription'\r
+ANNOUNCE_FTA_INSTANCE = '/v1/announce-fta-instance'\r
+ANNOUNCE_METRICS = '/v1/log-metrics'\r
+\r
+# gs instance endpoints\r
+gs_instances = {}\r
+\r
+# initialized gs instances\r
+gs_init_instances = {}\r
+\r
+# instances for which processing started\r
+gs_startprocessing_instances = {}\r
+\r
+# source endpoints\r
+sources = {}\r
+\r
+# sink endpoints\r
+sinks = {}\r
+\r
+\r
+# exctract endpoint information from json data\r
+def extract_endpoint(data) :\r
+       name = ''\r
+       ip = ''\r
+       port = 0\r
+               \r
+       try :\r
+               doc = json.loads(str(data, 'utf-8'))    \r
+       except :\r
+               print ('Invalid json message ' + str(data, 'utf-8'))\r
+               return []\r
+       \r
+       for key in doc.keys() :\r
+               if key == 'name' :\r
+                       name = doc[key]\r
+               elif key == 'ip' :\r
+                       ip = doc[key]\r
+                       # validate ip address           \r
+                       try :\r
+                               socket.inet_pton(socket.AF_INET, ip)\r
+                       except :\r
+                               print ('Invalid IPV4 address ' + ip)\r
+                               ip = ''\r
+               elif key == 'port' :\r
+                       # validate port number\r
+                       try :\r
+                               port = int(doc[key])\r
+                       except :\r
+                               print ('Invalid port number ' + doc[key])\r
+                               port = 0\r
+       \r
+       if name == '' or ip == '' or port == 0 :\r
+               print ('Name, ip or port is missing from json message ' + str(doc))\r
+               return []\r
+       \r
+\r
+       return [name, ip, port]\r
+\r
+\r
+# extract instance name from json data\r
+def extract_instance_name(data) :\r
+       name = ''\r
+       \r
+       try :\r
+               doc = json.loads(str(data, 'utf-8'))    \r
+       except :\r
+               print ('Invalid json message ' + str(data, "utf-8"))\r
+               return ''\r
+       \r
+       for key in doc.keys() :\r
+               if key == 'name' :\r
+                       name = doc[key]\r
+       \r
+       if name == '' :\r
+               print ('Name field is missing in json message ' + str(doc))\r
+       elif (name in gs_instances) == False:\r
+               print ('Attempt to announce the initialization or start of processing for unknown instance ' + name)\r
+               name = ''\r
+               \r
+       return name\r
+       \r
+# handler for HTTP requests. We will override do_PORT and do_GET of BaseHTTPRequestHandler\r
+class HTTPRequestHandler(BaseHTTPRequestHandler):\r
+\r
+       def do_POST(self):\r
+               if re.search(ANNOUNCE_INSTANCE_URL, self.path) != None:         \r
+                       if self.headers.get_content_type() == 'application/json' :\r
+                               # Find content length\r
+                               content_len = 0\r
+                               for i in range(len(self.headers.keys())):\r
+                                       if self.headers.keys()[i] == 'Content-Length' :\r
+                                               content_len = int (self.headers.values()[i])\r
+                                               break\r
+                               if content_len != 0 :\r
+                                       # extract endpoint information\r
+                                       endpoint = extract_endpoint(self.rfile.read(content_len))\r
+                                       if endpoint == [] :\r
+                                               self.send_response(400)\r
+                                       else :\r
+                                               self.send_response(200)\r
+                                               gs_instances[endpoint[0]] = [endpoint[1], endpoint[2]]\r
+                               else :\r
+                                       self.send_response(400)\r
+\r
+                       else:\r
+                               self.send_response(400)\r
+                       self.end_headers()              \r
+                       \r
+               elif re.search(ANNOUNCE_INITINSTANCE_URL, self.path) != None:\r
+                       if self.headers.get_content_type() == 'application/json' :\r
+                               # Find content length\r
+                               content_len = 0\r
+                               for i in range(len(self.headers.keys())):\r
+                                       if self.headers.keys()[i] == 'Content-Length' :\r
+                                               content_len = int (self.headers.values()[i])\r
+                                               break\r
+                               if content_len != 0 :\r
+                                       # extract name of initialized gs instance                               \r
+                                       name = extract_instance_name(self.rfile.read(content_len))\r
+                                       if name == '' :\r
+                                               self.send_response(400)\r
+                                       else :\r
+                                               self.send_response(200)\r
+                                               gs_init_instances[name] = 1\r
+                               else :\r
+                                       self.send_response(400)\r
+\r
+                       else:\r
+                               self.send_response(400)\r
+                       self.end_headers()      \r
+                       \r
+               elif re.search(ANNOUNCE_SOURCE_URL, self.path) != None:\r
+                       if self.headers.get_content_type() == 'application/json' :\r
+                               # Find content length\r
+                               content_len = 0\r
+                               for i in range(len(self.headers.keys())):\r
+                                       if self.headers.keys()[i] == 'Content-Length' :\r
+                                               content_len = int (self.headers.values()[i])\r
+                                               break\r
+                               if content_len != 0 :\r
+                                       # extract endpoint information                          \r
+                                       endpoint = extract_endpoint(self.rfile.read(content_len))\r
+                                       if endpoint == [] :\r
+                                               self.send_response(400)\r
+                                       else :\r
+                                               self.send_response(200)\r
+                                               sources[endpoint[0]] = [endpoint[1], endpoint[2]]\r
+                               else :\r
+                                       self.send_response(400)\r
+\r
+                       else:\r
+                               self.send_response(400)\r
+                       self.end_headers()\r
+                       \r
+               elif re.search(ANNOUNCE_SINK_URL, self.path) != None:\r
+                       if self.headers.get_content_type() == 'application/json' :\r
+                               # Find content length\r
+                               content_len = 0\r
+                               for i in range(len(self.headers.keys())):\r
+                                       if self.headers.keys()[i] == 'Content-Length' :\r
+                                               content_len = int (self.headers.values()[i])\r
+                                               break\r
+                               if content_len != 0 :\r
+                                       # extract endpoint information                          \r
+                                       endpoint = extract_endpoint(self.rfile.read(content_len))\r
+                                       if endpoint == [] :\r
+                                               self.send_response(400)\r
+                                       else :\r
+                                               self.send_response(200)\r
+                                               sinks[endpoint[0]] = [endpoint[1], endpoint[2]]\r
+                               else :\r
+                                       self.send_response(400)\r
+\r
+                       else:\r
+                               self.send_response(400)\r
+                       self.end_headers()      \r
+                       \r
+               elif re.search(ANNOUNCE_STARTPROCESSING_URL, self.path) != None:\r
+                       if self.headers.get_content_type() == 'application/json' :\r
+                               # Find content length\r
+                               content_len = 0\r
+                               for i in range(len(self.headers.keys())):\r
+                                       if self.headers.keys()[i] == 'Content-Length' :\r
+                                               content_len = int (self.headers.values()[i])\r
+                                               break\r
+                               if content_len != 0 :\r
+                                       # extract name of initialized gs instance                               \r
+                                       name = extract_instance_name(self.rfile.read(content_len))\r
+                                       if name == '' :\r
+                                               self.send_response(400)\r
+                                       else :\r
+                                               self.send_response(200)\r
+                                               gs_startprocessing_instances[name] = 1\r
+                               else :\r
+                                       self.send_response(400)\r
+\r
+                       else:\r
+                               self.send_response(400)\r
+                       self.end_headers()\r
+                       \r
+               # we do not do any processing for ANNOUNCE_STREAM_SUBSCRIPTION, ANNOUNCE_FTA_INSTANCE and ANNOUNCE_METRICS in gshub simulator           \r
+               elif (re.search(ANNOUNCE_STREAM_SUBSCRIPTION, self.path) != None) or (re.search(ANNOUNCE_FTA_INSTANCE, self.path) != None) or (re.search(ANNOUNCE_METRICS, self.path) != None):\r
+                       if self.headers.get_content_type() == 'application/json' :\r
+                               # Find content length\r
+                               content_len = 0\r
+                               for i in range(len(self.headers.keys())):\r
+                                       if self.headers.keys()[i] == 'Content-Length' :\r
+                                               content_len = int (self.headers.values()[i])\r
+                                               break\r
+                               if content_len != 0 :                           \r
+                                       self.send_response(200)\r
+                               else :\r
+                                       self.send_response(400)\r
+\r
+                       else:\r
+                               self.send_response(400)\r
+                       self.end_headers()      \r
+                       \r
+               else:\r
+                       self.send_response(404)\r
+                       self.end_headers()\r
+               return\r
+\r
+       def do_GET(self):\r
+               if re.search(DISCOVER_INSTANCE_URL + '/*', self.path) != None:\r
+                       instance = self.path.split('/')[-1]\r
+                       # check if this instance is registered\r
+                       if instance in gs_instances :\r
+                               self.send_response(200)\r
+                               self.send_header('Content-Type', 'application/json')\r
+                               self.end_headers()\r
+                               self.wfile.write(bytes("{\"ip\" : \"" + gs_instances[instance][0] + "\", \"port\": " + str(gs_instances[instance][1]) + "}", "utf-8"))\r
+                       else:\r
+                               self.send_response(400)\r
+                               self.end_headers()\r
+\r
+\r
+               elif re.search(DISCOVER_INITINSTANCE_URL + '/*', self.path) != None:\r
+                       instance = self.path.split('/')[-1]\r
+                       # check if this instance is initialized\r
+                       if instance in gs_init_instances :\r
+                               self.send_response(200)\r
+                               self.send_header('Content-Type', 'application/json')\r
+                               self.end_headers()\r
+                               self.wfile.write(bytes("{\"ip\" : \"" + gs_instances[instance][0] + "\", \"port\": " + str(gs_instances[instance][1]) + "}", "utf-8"))\r
+                       else:\r
+                               self.send_response(400)\r
+                               self.end_headers()\r
+\r
+               elif re.search(DISCOVER_SOURCE_URL + '/*', self.path) != None:\r
+                       source = self.path.split('/')[-1]\r
+                       # check if it is a registered source\r
+                       if source in sources :\r
+                               self.send_response(200)\r
+                               self.send_header('Content-Type', 'application/json')\r
+                               self.end_headers()\r
+                               self.wfile.write(bytes("{\"ip\" : \"" + sources[source][0] + "\", \"port\": " + str(sources[source][1]) + "}", "utf-8"))\r
+                       else:\r
+                               self.send_response(400)\r
+                               self.end_headers()\r
+                               \r
+               elif re.search(DISCOVER_SINK_URL + '/*', self.path) != None:\r
+                       sink = self.path.split('/')[-1]\r
+                       # check if it is a registered sink\r
+                       if sink in sinks :\r
+                               self.send_response(200)\r
+                               self.send_header('Content-Type', 'application/json')\r
+                               self.end_headers()\r
+                               self.wfile.write(bytes("{\"ip\" : \"" + sinks[sink][0] + "\", \"port\": " + str(sinks[sink][1]) + "}", "utf-8"))\r
+                       else:\r
+                               self.send_response(400)\r
+                               self.end_headers()\r
+                               \r
+               elif re.search(DISCOVER_STARTPROCESSING_URL + '/*', self.path) != None:\r
+                       instance = self.path.split('/')[-1]\r
+                       # check if this instance is initialized\r
+                       if instance in gs_startprocessing_instances :\r
+                               self.send_response(200)\r
+                               self.send_header('Content-Type', 'application/json')\r
+                               self.end_headers()\r
+                               self.wfile.write(bytes("{}", "utf-8"))\r
+                       else:\r
+                               self.send_response(400)\r
+                               self.end_headers()                              \r
+               else:\r
+                       self.send_response(404)\r
+                       self.end_headers()\r
+               return\r
+\r
+\r
+# we will use standard python threaded HTTP server\r
+class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):\r
+       allow_reuse_address = True\r
+\r
+       def shutdown(self):\r
+               self.socket.close()\r
+               HTTPServer.shutdown(self)\r
+\r
+class SimpleHttpServer:\r
+       def __init__(self, ip, port):\r
+               self.server = ThreadedHTTPServer((ip,port), HTTPRequestHandler)\r
+\r
+       def start(self):\r
+               self.server_thread = threading.Thread(target=self.server.serve_forever)\r
+               self.server_thread.daemon = True\r
+               self.server_thread.start()\r
+               \r
+       def waitForThread(self):\r
+               self.server_thread.join()\r
+\r
+       def stop(self):\r
+               self.server.shutdown()\r
+               self.waitForThread()\r
+\r
+\r
+# print usage instructions\r
+def usage():\r
+       print ('./gshub.py [-p port]')\r
+       \r
+       \r
+\r
+def main():\r
+       # process command-line arguments\r
+       try:\r
+               opts, args = getopt.getopt(sys.argv[1:], "hp:v", ["help", "port="])\r
+       except getopt.GetoptError as err:\r
+               # print help information and exit:\r
+               print(str(err)) \r
+               usage()\r
+               sys.exit(2)\r
+\r
+       port = 0\r
+       for o, a in opts:\r
+               if o in ("-h", "--help"):\r
+                       usage()\r
+                       sys.exit(0)\r
+               elif o in ("-p", "--port"):\r
+                       port = int(a)\r
+               else:\r
+                       print ('Unknown command-line option ' + o)\r
+\r
+       # start HTTP server to serve REST calls\r
+       server = SimpleHttpServer('127.0.0.1', port)\r
+\r
+       # record HTTP server address in gshub.log\r
+       f = open('gshub.log', 'w')\r
+       f.write('127.0.0.1:' + str(server.server.server_port) + '\n')\r
+       f.close()\r
+               \r
+       print ('GSHUB Running on port ' + str(server.server.server_port) + ' ...')\r
+       server.start()\r
+       server.waitForThread()\r
+\r
+\r
+if __name__ == "__main__":\r
+       main()\r
+\r