- registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress)
- if types, err := jobtypes.GetTypes(); err == nil {
- if regErr := registrator.RegisterTypes(types); regErr != nil {
- log.Fatalf("Unable to register all types due to: %v", regErr)
+ jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 10*time.Second))
+ if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
+ log.Fatalf("Stopping producer due to: %v", err)
+ }
+ jobsManager.StartJobsForAllTypes()
+
+ log.Debug("Starting DMaaP Mediator Producer")
+ go func() {
+ log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
+ r := server.NewRouter(jobsManager)
+ if restclient.IsUrlSecure(callbackAddress) {
+ log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
+ } else {
+ log.Fatalf("Server stopped: %v", http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r))