X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-mediator-producer%2Fmain.go;h=380087f3c495e185d6daeab9f9126b900a4cb050;hb=b3896f4ad7912be9e12c05e7d4770fa39752d797;hp=cdfde5798aeef719a2d8eb1bba21c26ad078c669;hpb=c4960f1dc6688b039d6ee29da03e0c0de47b6fbb;p=nonrtric.git diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go index cdfde579..380087f3 100644 --- a/dmaap-mediator-producer/main.go +++ b/dmaap-mediator-producer/main.go @@ -55,22 +55,21 @@ func main() { log.Fatalf("Stopping producer due to error: %v", err) } - jobHandler := jobs.NewJobHandlerImpl("configs/type_config.json", retryClient, &http.Client{ + jobsManager := jobs.NewJobsManagerImpl("configs/type_config.json", retryClient, configuration.DMaaPMRAddress, &http.Client{ Timeout: time.Second * 5, }) - if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil { + if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil { log.Fatalf("Stopping producer due to: %v", err) } + jobsManager.StartJobs() log.Debug("Starting DMaaP Mediator Producer") go func() { log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort) - r := server.NewRouter(jobHandler) + r := server.NewRouter(jobsManager) log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r)) }() - go jobHandler.RunJobs(configuration.DMaaPMRAddress) - keepProducerAlive() } @@ -88,7 +87,7 @@ func createClientCertificate() (*tls.Certificate, error) { if cert, err := tls.LoadX509KeyPair(configuration.ProducerCertPath, configuration.ProducerKeyPath); err == nil { return &cert, nil } else { - return nil, fmt.Errorf("cannot create x509 keypair from cert file %s and key file %s", configuration.ProducerCertPath, configuration.ProducerKeyPath) + return nil, fmt.Errorf("cannot create x509 keypair from cert file %s and key file %s due to: %v", configuration.ProducerCertPath, configuration.ProducerKeyPath, err) } } @@ -108,9 +107,9 @@ func createRetryClient(cert *tls.Certificate) *http.Client { return rawRetryClient.StandardClient() } -func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error { +func registerTypesAndProducer(jobHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error { registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client) - if types, err := jobHandler.GetTypes(); err == nil { + if types, err := jobHandler.LoadTypesFromConfiguration(); err == nil { if regErr := registrator.RegisterTypes(types); regErr != nil { return fmt.Errorf("unable to register all types due to: %v", regErr) }