+ jobHandler := jobs.NewJobHandlerImpl("configs/type_config.json", retryClient, &http.Client{
+ Timeout: time.Second * 5,
+ })
+ if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
+ log.Fatalf("Stopping producer due to: %v", err)
+ }
+
+ log.Debug("Starting DMaaP Mediator Producer")
+ go func() {
+ log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
+ r := server.NewRouter(jobHandler)
+ log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
+ }()
+
+ go jobHandler.RunJobs(configuration.DMaaPMRAddress)
+
+ keepProducerAlive()
+}
+
+func validateConfiguration(configuration *config.Config) error {