Update Dmaap topic
[nonrtric.git] / dmaap-mediator-producer / internal / config / config.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package config
22
23 import (
24         "encoding/json"
25         "fmt"
26         "os"
27         "path/filepath"
28         "strconv"
29
30         log "github.com/sirupsen/logrus"
31 )
32
33 type Config struct {
34         LogLevel               log.Level
35         InfoProducerHost       string
36         InfoProducerPort       int
37         InfoCoordinatorAddress string
38         DMaaPMRAddress         string
39         KafkaBootstrapServers  string
40         ProducerCertPath       string
41         ProducerKeyPath        string
42 }
43
44 func New() *Config {
45         return &Config{
46                 InfoProducerHost:       getEnv("INFO_PRODUCER_HOST", ""),
47                 InfoProducerPort:       getEnvAsInt("INFO_PRODUCER_PORT", 8085),
48                 InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://informationservice:8434"),
49                 DMaaPMRAddress:         getEnv("DMAAP_MR_ADDR", "https://message-router.onap:3905"),
50                 KafkaBootstrapServers:  getEnv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
51                 ProducerCertPath:       getEnv("PRODUCER_CERT_PATH", "security/producer.crt"),
52                 ProducerKeyPath:        getEnv("PRODUCER_KEY_PATH", "security/producer.key"),
53                 LogLevel:               getLogLevel(),
54         }
55 }
56
57 func (c Config) String() string {
58         return fmt.Sprintf("InfoProducerHost: %v, InfoProducerPort: %v, InfoCoordinatorAddress: %v, DMaaPMRAddress: %v, ProducerCertPath: %v, ProducerKeyPath: %v, LogLevel: %v", c.InfoProducerHost, c.InfoProducerPort, c.InfoCoordinatorAddress, c.DMaaPMRAddress, c.ProducerCertPath, c.ProducerKeyPath, c.LogLevel)
59 }
60 func getEnv(key string, defaultVal string) string {
61         if value, exists := os.LookupEnv(key); exists {
62                 return value
63         }
64
65         return defaultVal
66 }
67
68 func getEnvAsInt(name string, defaultVal int) int {
69         valueStr := getEnv(name, "")
70         if value, err := strconv.Atoi(valueStr); err == nil {
71                 return value
72         } else if valueStr != "" {
73                 log.Warnf("Invalid int value: %v for variable: %v. Default value: %v will be used", valueStr, name, defaultVal)
74         }
75
76         return defaultVal
77 }
78
79 func getLogLevel() log.Level {
80         logLevelStr := getEnv("LOG_LEVEL", "Info")
81         if loglevel, err := log.ParseLevel(logLevelStr); err == nil {
82                 return loglevel
83         } else {
84                 log.Warnf("Invalid log level: %v. Log level will be Info!", logLevelStr)
85                 return log.InfoLevel
86         }
87 }
88
89 func GetJobTypesFromConfiguration(configFolder string) ([]TypeDefinition, error) {
90         typeDefsByte, err := os.ReadFile(filepath.Join(configFolder, "type_config.json"))
91         if err != nil {
92                 return nil, err
93         }
94         typeDefs := struct {
95                 Types []TypeDefinition `json:"types"`
96         }{}
97         err = json.Unmarshal(typeDefsByte, &typeDefs)
98         if err != nil {
99                 return nil, err
100         }
101
102         kafkaTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaKafka.json"))
103         if err != nil {
104                 return nil, err
105         }
106
107         dMaaPTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaDmaap.json"))
108         if err != nil {
109                 return nil, err
110         }
111
112         for i, typeDef := range typeDefs.Types {
113                 if typeDef.IsKafkaType() {
114                         typeDefs.Types[i].TypeSchema = kafkaTypeSchema
115                 } else {
116                         typeDefs.Types[i].TypeSchema = dMaaPTypeSchema
117                 }
118         }
119         return typeDefs.Types, nil
120 }
121
122 func getTypeSchema(schemaFile string) (interface{}, error) {
123         typeDefsByte, err := os.ReadFile(schemaFile)
124         if err != nil {
125                 return nil, err
126         }
127         var schema interface{}
128         err = json.Unmarshal(typeDefsByte, &schema)
129         if err != nil {
130                 return nil, err
131         }
132         return schema, nil
133 }