- jobHandler := jobs.NewJobHandlerImpl("configs/type_config.json", retryClient, &http.Client{
+ jobsManager := jobs.NewJobsManagerImpl("configs/type_config.json", retryClient, configuration.DMaaPMRAddress, &http.Client{
- if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
+ if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
log.Debug("Starting DMaaP Mediator Producer")
go func() {
log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
log.Debug("Starting DMaaP Mediator Producer")
go func() {
log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
}()
log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
}()
if cert, err := tls.LoadX509KeyPair(configuration.ProducerCertPath, configuration.ProducerKeyPath); err == nil {
return &cert, nil
} else {
if cert, err := tls.LoadX509KeyPair(configuration.ProducerCertPath, configuration.ProducerKeyPath); err == nil {
return &cert, nil
} else {
- return nil, fmt.Errorf("cannot create x509 keypair from cert file %s and key file %s", configuration.ProducerCertPath, configuration.ProducerKeyPath)
+ return nil, fmt.Errorf("cannot create x509 keypair from cert file %s and key file %s due to: %v", configuration.ProducerCertPath, configuration.ProducerKeyPath, err)
-func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
+func registerTypesAndProducer(jobHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
if regErr := registrator.RegisterTypes(types); regErr != nil {
return fmt.Errorf("unable to register all types due to: %v", regErr)
}
if regErr := registrator.RegisterTypes(types); regErr != nil {
return fmt.Errorf("unable to register all types due to: %v", regErr)
}