log.Fatalf("Stopping producer due to error: %v", err)
}
- jobHandler := jobs.NewJobHandlerImpl("configs/type_config.json", retryClient, &http.Client{
+ jobsManager := jobs.NewJobsManagerImpl("configs/type_config.json", retryClient, configuration.DMaaPMRAddress, &http.Client{
Timeout: time.Second * 5,
})
- if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
+ if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
log.Fatalf("Stopping producer due to: %v", err)
}
+ jobsManager.StartJobs()
log.Debug("Starting DMaaP Mediator Producer")
go func() {
log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
- r := server.NewRouter(jobHandler)
+ r := server.NewRouter(jobsManager)
log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
}()
- go jobHandler.RunJobs(configuration.DMaaPMRAddress)
-
keepProducerAlive()
}
return rawRetryClient.StandardClient()
}
-func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
+func registerTypesAndProducer(jobHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
- if types, err := jobHandler.GetTypes(); err == nil {
+ if types, err := jobHandler.LoadTypesFromConfiguration(); err == nil {
if regErr := registrator.RegisterTypes(types); regErr != nil {
return fmt.Errorf("unable to register all types due to: %v", regErr)
}