-func main() {
- log.Debug("Starting DMaaP Mediator Producer")
- wg := new(sync.WaitGroup)
-
- // add two goroutines to `wg` WaitGroup, one for each avilable server
- wg.Add(2)
-
- log.Debugf("Starting status callback server at port %v", configuration.InfoProducerSupervisionCallbackPort)
- go func() {
- server := server.CreateServer(configuration.InfoProducerSupervisionCallbackPort, server.StatusHandler)
- log.Warn(server.ListenAndServe())
- wg.Done()
- }()
-
- go func() {
- server := server.CreateServer(configuration.InfoJobCallbackPort, server.CreateInfoJobHandler)
- log.Warn(server.ListenAndServe())
- wg.Done()
- }()
-
- // wait until WaitGroup is done
- wg.Wait()
- log.Debug("Stopping DMaaP Mediator Producer")
+func startCallbackServer(jobsManager jobs.JobsManager, callbackAddress string) {
+ log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
+ r := server.NewRouter(jobsManager, statusHandler)
+ if restclient.IsUrlSecure(callbackAddress) {
+ log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
+ } else {
+ log.Fatalf("Server stopped: %v", http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r))
+ }
+}
+
+func statusHandler(w http.ResponseWriter, r *http.Request) {
+ registeredStatus := "not registered"
+ if registered {
+ registeredStatus = "registered"
+ }
+ fmt.Fprintf(w, `{"status": "%v"}`, registeredStatus)
+}
+
+func keepProducerAlive() {
+ forever := make(chan int)
+ <-forever