+ log.Debug("Starting DMaaP Mediator Producer")
+ go func() {
+ log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
+ r := server.NewRouter(jobsManager)
+ if restclient.IsUrlSecure(callbackAddress) {
+ log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
+ } else {
+ log.Fatalf("Server stopped: %v", http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r))
+ }
+ }()
+
+ keepProducerAlive()
+}
+
+func validateConfiguration(configuration *config.Config) error {
+ if configuration.InfoProducerHost == "" {
+ return fmt.Errorf("missing INFO_PRODUCER_HOST")
+ }
+ if configuration.ProducerCertPath == "" || configuration.ProducerKeyPath == "" {
+ return fmt.Errorf("missing PRODUCER_CERT and/or PRODUCER_KEY")
+ }
+ return nil
+}
+func registerTypesAndProducer(jobTypesHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
+ registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
+ if types, err := jobTypesHandler.LoadTypesFromConfiguration(); err == nil {