-func main() {
- log.Debug("Starting DMaaP Mediator Producer")
- wg := new(sync.WaitGroup)
-
- // add two goroutines to `wg` WaitGroup, one for each avilable server
- wg.Add(3)
-
- log.Debugf("Starting status callback server at port %v", configuration.InfoProducerSupervisionCallbackPort)
- go func() {
- server := server.CreateServer(configuration.InfoProducerSupervisionCallbackPort, server.StatusHandler)
- log.Warn(server.ListenAndServe())
- wg.Done()
- }()
-
- go func() {
- server := server.CreateServer(configuration.InfoJobCallbackPort, server.CreateInfoJobHandler)
- log.Warn(server.ListenAndServe())
- wg.Done()
- }()
-
- go func() {
- jobs.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort))
- wg.Done()
- }()
-
- // wait until WaitGroup is done
- wg.Wait()
- log.Debug("Stopping DMaaP Mediator Producer")
+func startCallbackServer(jobsManager jobs.JobsManager, callbackAddress string) {
+ log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
+ r := server.NewRouter(jobsManager, statusHandler)
+ addSwaggerHandler(r)
+ if restclient.IsUrlSecure(callbackAddress) {
+ log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
+ } else {
+ log.Fatalf("Server stopped: %v", http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r))
+ }
+}
+
+type ProducerStatus struct {
+ // The registration status of the producer in Information Coordinator Service. Either `registered` or `not registered`
+ RegisteredStatus string `json:"registeredStatus" swaggertype:"string" example:"registered"`
+} // @name ProducerStatus
+
+// @Summary Get status
+// @Description Get the status of the producer. Will show if the producer has registered in ICS.
+// @Tags Data producer (callbacks)
+// @Produce json
+// @Success 200 {object} ProducerStatus
+// @Router /health_check [get]
+func statusHandler(w http.ResponseWriter, r *http.Request) {
+ status := ProducerStatus{
+ RegisteredStatus: "not registered",
+ }
+ if registered {
+ status.RegisteredStatus = "registered"
+ }
+ json.NewEncoder(w).Encode(status)
+}
+
+// @Summary Get Swagger Documentation
+// @Description Get the Swagger API documentation for the producer.
+// @Tags Admin
+// @Success 200
+// @Router /swagger [get]
+func addSwaggerHandler(r *mux.Router) {
+ r.PathPrefix("/swagger").Handler(httpSwagger.WrapHandler)
+}
+
+func keepProducerAlive() {
+ forever := make(chan int)
+ <-forever