Merge "Updates of function test for f-release"
[nonrtric.git] / dmaap-mediator-producer / main.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package main
22
23 import (
24         "crypto/tls"
25         "fmt"
26         "net/http"
27         "time"
28
29         log "github.com/sirupsen/logrus"
30         "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
31         "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
32         "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
33         "oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
34 )
35
36 var configuration *config.Config
37 var registered bool
38
39 func init() {
40         configuration = config.New()
41 }
42
43 func main() {
44         log.SetLevel(configuration.LogLevel)
45         log.Debug("Initializing DMaaP Mediator Producer")
46         log.Debug("Using configuration: ", configuration)
47         if err := validateConfiguration(configuration); err != nil {
48                 log.Fatalf("Stopping producer due to error: %v", err)
49         }
50         callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
51
52         var cert tls.Certificate
53         if c, err := restclient.CreateClientCertificate(configuration.ProducerCertPath, configuration.ProducerKeyPath); err == nil {
54                 cert = c
55         } else {
56                 log.Fatalf("Stopping producer due to error: %v", err)
57         }
58         retryClient := restclient.CreateRetryClient(cert)
59
60         jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 10*time.Second))
61         go startCallbackServer(jobsManager, callbackAddress)
62
63         if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
64                 log.Fatalf("Stopping producer due to: %v", err)
65         }
66         registered = true
67         jobsManager.StartJobsForAllTypes()
68
69         log.Debug("Starting DMaaP Mediator Producer")
70
71         keepProducerAlive()
72 }
73
74 func validateConfiguration(configuration *config.Config) error {
75         if configuration.InfoProducerHost == "" {
76                 return fmt.Errorf("missing INFO_PRODUCER_HOST")
77         }
78         if configuration.ProducerCertPath == "" || configuration.ProducerKeyPath == "" {
79                 return fmt.Errorf("missing PRODUCER_CERT and/or PRODUCER_KEY")
80         }
81         return nil
82 }
83 func registerTypesAndProducer(jobTypesHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
84         registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
85         configTypes, err := config.GetJobTypesFromConfiguration("configs/type_config.json")
86         if err != nil {
87                 return fmt.Errorf("unable to register all types due to: %v", err)
88         }
89         regErr := registrator.RegisterTypes(jobTypesHandler.LoadTypesFromConfiguration(configTypes))
90         if regErr != nil {
91                 return fmt.Errorf("unable to register all types due to: %v", regErr)
92         }
93
94         producer := config.ProducerRegistrationInfo{
95                 InfoProducerSupervisionCallbackUrl: callbackAddress + server.HealthCheckPath,
96                 SupportedInfoTypes:                 jobTypesHandler.GetSupportedTypes(),
97                 InfoJobCallbackUrl:                 callbackAddress + server.AddJobPath,
98         }
99         if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
100                 return fmt.Errorf("unable to register producer due to: %v", err)
101         }
102         return nil
103 }
104
105 func startCallbackServer(jobsManager jobs.JobsManager, callbackAddress string) {
106         log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
107         r := server.NewRouter(jobsManager, statusHandler)
108         if restclient.IsUrlSecure(callbackAddress) {
109                 log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
110         } else {
111                 log.Fatalf("Server stopped: %v", http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r))
112         }
113 }
114
115 func statusHandler(w http.ResponseWriter, r *http.Request) {
116         registeredStatus := "not registered"
117         if registered {
118                 registeredStatus = "registered"
119         }
120         fmt.Fprintf(w, `{"status": "%v"}`, registeredStatus)
121 }
122
123 func keepProducerAlive() {
124         forever := make(chan int)
125         <-forever
126 }