Update documentation for DMaaP Mediator Producer
[nonrtric.git] / dmaap-mediator-producer / main.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package main
22
23 import (
24         "crypto/tls"
25         "fmt"
26         "net/http"
27         "time"
28
29         "github.com/gorilla/mux"
30         log "github.com/sirupsen/logrus"
31         _ "oransc.org/nonrtric/dmaapmediatorproducer/docs"
32         "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
33         "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
34         "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
35         "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
36         "oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
37
38         httpSwagger "github.com/swaggo/http-swagger"
39 )
40
41 var configuration *config.Config
42 var registered bool
43
44 func init() {
45         configuration = config.New()
46 }
47
48 // @title DMaaP Mediator Producer
49 // @version 1.1.0
50
51 // @license.name  Apache 2.0
52 // @license.url   http://www.apache.org/licenses/LICENSE-2.0.html
53
54 func main() {
55         log.SetLevel(configuration.LogLevel)
56         log.Debug("Initializing DMaaP Mediator Producer")
57         log.Debug("Using configuration: ", configuration)
58         if err := validateConfiguration(configuration); err != nil {
59                 log.Fatalf("Stopping producer due to error: %v", err)
60         }
61         callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
62
63         var cert tls.Certificate
64         if c, err := restclient.CreateClientCertificate(configuration.ProducerCertPath, configuration.ProducerKeyPath); err == nil {
65                 cert = c
66         } else {
67                 log.Fatalf("Stopping producer due to error: %v", err)
68         }
69
70         retryClient := restclient.CreateRetryClient(cert)
71         kafkaFactory := kafkaclient.KafkaFactoryImpl{BootstrapServer: configuration.KafkaBootstrapServers}
72         distributionClient := restclient.CreateClientWithoutRetry(cert, 10*time.Second)
73
74         jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, kafkaFactory, distributionClient)
75         go startCallbackServer(jobsManager, callbackAddress)
76
77         if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
78                 log.Fatalf("Stopping producer due to: %v", err)
79         }
80         registered = true
81         jobsManager.StartJobsForAllTypes()
82
83         log.Debug("Starting DMaaP Mediator Producer")
84
85         keepProducerAlive()
86 }
87
88 func validateConfiguration(configuration *config.Config) error {
89         if configuration.InfoProducerHost == "" {
90                 return fmt.Errorf("missing INFO_PRODUCER_HOST")
91         }
92         if configuration.ProducerCertPath == "" || configuration.ProducerKeyPath == "" {
93                 return fmt.Errorf("missing PRODUCER_CERT and/or PRODUCER_KEY")
94         }
95         if configuration.DMaaPMRAddress == "" && configuration.KafkaBootstrapServers == "" {
96                 return fmt.Errorf("at least one of DMAAP_MR_ADDR or KAFKA_BOOTSRAP_SERVERS must be provided")
97         }
98         return nil
99 }
100 func registerTypesAndProducer(jobTypesManager jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
101         registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
102         configTypes, err := config.GetJobTypesFromConfiguration("configs")
103         if err != nil {
104                 return fmt.Errorf("unable to register all types due to: %v", err)
105         }
106         regErr := registrator.RegisterTypes(jobTypesManager.LoadTypesFromConfiguration(configTypes))
107         if regErr != nil {
108                 return fmt.Errorf("unable to register all types due to: %v", regErr)
109         }
110
111         producer := config.ProducerRegistrationInfo{
112                 InfoProducerSupervisionCallbackUrl: callbackAddress + server.HealthCheckPath,
113                 SupportedInfoTypes:                 jobTypesManager.GetSupportedTypes(),
114                 InfoJobCallbackUrl:                 callbackAddress + server.AddJobPath,
115         }
116         if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
117                 return fmt.Errorf("unable to register producer due to: %v", err)
118         }
119         return nil
120 }
121
122 func startCallbackServer(jobsManager jobs.JobsManager, callbackAddress string) {
123         log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
124         r := server.NewRouter(jobsManager, statusHandler)
125         addSwaggerHandler(r)
126         if restclient.IsUrlSecure(callbackAddress) {
127                 log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
128         } else {
129                 log.Fatalf("Server stopped: %v", http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r))
130         }
131 }
132
133 // @Summary Get status
134 // @Description Get the status of the producer. Will show if the producer has registered in ICS.
135 // @Tags Data producer (callbacks)
136 // @Success 200
137 // @Router /health_check [get]
138 func statusHandler(w http.ResponseWriter, r *http.Request) {
139         registeredStatus := "not registered"
140         if registered {
141                 registeredStatus = "registered"
142         }
143         fmt.Fprintf(w, `{"status": "%v"}`, registeredStatus)
144 }
145
146 // @Summary Get Swagger Documentation
147 // @Description Get the Swagger API documentation for the producer.
148 // @Tags Admin
149 // @Success 200
150 // @Router /swagger [get]
151 func addSwaggerHandler(r *mux.Router) {
152         r.PathPrefix("/swagger").Handler(httpSwagger.WrapHandler)
153 }
154
155 func keepProducerAlive() {
156         forever := make(chan int)
157         <-forever
158 }