"oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
-func registerTypesAndProducer(jobTypesHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
+func registerTypesAndProducer(jobTypesManager jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
configTypes, err := config.GetJobTypesFromConfiguration("configs")
if err != nil {
return fmt.Errorf("unable to register all types due to: %v", err)
}
registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
configTypes, err := config.GetJobTypesFromConfiguration("configs")
if err != nil {
return fmt.Errorf("unable to register all types due to: %v", err)
}
if regErr != nil {
return fmt.Errorf("unable to register all types due to: %v", regErr)
}
producer := config.ProducerRegistrationInfo{
InfoProducerSupervisionCallbackUrl: callbackAddress + server.HealthCheckPath,
if regErr != nil {
return fmt.Errorf("unable to register all types due to: %v", regErr)
}
producer := config.ProducerRegistrationInfo{
InfoProducerSupervisionCallbackUrl: callbackAddress + server.HealthCheckPath,
InfoJobCallbackUrl: callbackAddress + server.AddJobPath,
}
if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
InfoJobCallbackUrl: callbackAddress + server.AddJobPath,
}
if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
func startCallbackServer(jobsManager jobs.JobsManager, callbackAddress string) {
log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
r := server.NewRouter(jobsManager, statusHandler)
func startCallbackServer(jobsManager jobs.JobsManager, callbackAddress string) {
log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
r := server.NewRouter(jobsManager, statusHandler)
if restclient.IsUrlSecure(callbackAddress) {
log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
} else {
if restclient.IsUrlSecure(callbackAddress) {
log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
} else {
func statusHandler(w http.ResponseWriter, r *http.Request) {
registeredStatus := "not registered"
if registered {
func statusHandler(w http.ResponseWriter, r *http.Request) {
registeredStatus := "not registered"
if registered {
+// @Summary Get Swagger Documentation
+// @Description Get the Swagger API documentation for the producer.
+// @Tags Admin
+// @Success 200
+// @Router /swagger [get]
+func addSwaggerHandler(r *mux.Router) {
+ r.PathPrefix("/swagger").Handler(httpSwagger.WrapHandler)
+}
+