Add kafka jobs to DMaaP Mediator Producer
[nonrtric.git] / dmaap-mediator-producer / internal / config / config.go
index 9b7b1dd..7582e9c 100644 (file)
 package config
 
 import (
+       "encoding/json"
+       "fmt"
        "os"
+       "path/filepath"
        "strconv"
 
        log "github.com/sirupsen/logrus"
 )
 
 type Config struct {
-       LogLevel               string
+       LogLevel               log.Level
        InfoProducerHost       string
        InfoProducerPort       int
        InfoCoordinatorAddress string
-       MRHost                 string
-       MRPort                 int
-}
-
-type ProducerRegistrationInfo struct {
-       InfoProducerSupervisionCallbackUrl string   `json:"info_producer_supervision_callback_url"`
-       SupportedInfoTypes                 []string `json:"supported_info_types"`
-       InfoJobCallbackUrl                 string   `json:"info_job_callback_url"`
+       DMaaPMRAddress         string
+       KafkaBootstrapServers  string
+       ProducerCertPath       string
+       ProducerKeyPath        string
 }
 
 func New() *Config {
        return &Config{
-               LogLevel:               getEnv("LOG_LEVEL", "Info"),
                InfoProducerHost:       getEnv("INFO_PRODUCER_HOST", ""),
                InfoProducerPort:       getEnvAsInt("INFO_PRODUCER_PORT", 8085),
-               InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
-               MRHost:                 getEnv("MR_HOST", "http://message-router.onap"),
-               MRPort:                 getEnvAsInt("MR_PORT", 3904),
+               InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://informationservice:8434"),
+               DMaaPMRAddress:         getEnv("DMAAP_MR_ADDR", "https://message-router.onap:3905"),
+               KafkaBootstrapServers:  getEnv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
+               ProducerCertPath:       getEnv("PRODUCER_CERT_PATH", "security/producer.crt"),
+               ProducerKeyPath:        getEnv("PRODUCER_KEY_PATH", "security/producer.key"),
+               LogLevel:               getLogLevel(),
        }
 }
 
+func (c Config) String() string {
+       return fmt.Sprintf("InfoProducerHost: %v, InfoProducerPort: %v, InfoCoordinatorAddress: %v, DMaaPMRAddress: %v, ProducerCertPath: %v, ProducerKeyPath: %v, LogLevel: %v", c.InfoProducerHost, c.InfoProducerPort, c.InfoCoordinatorAddress, c.DMaaPMRAddress, c.ProducerCertPath, c.ProducerKeyPath, c.LogLevel)
+}
 func getEnv(key string, defaultVal string) string {
        if value, exists := os.LookupEnv(key); exists {
                return value
@@ -71,3 +75,59 @@ func getEnvAsInt(name string, defaultVal int) int {
 
        return defaultVal
 }
+
+func getLogLevel() log.Level {
+       logLevelStr := getEnv("LOG_LEVEL", "Info")
+       if loglevel, err := log.ParseLevel(logLevelStr); err == nil {
+               return loglevel
+       } else {
+               log.Warnf("Invalid log level: %v. Log level will be Info!", logLevelStr)
+               return log.InfoLevel
+       }
+}
+
+func GetJobTypesFromConfiguration(configFolder string) ([]TypeDefinition, error) {
+       typeDefsByte, err := os.ReadFile(filepath.Join(configFolder, "type_config.json"))
+       if err != nil {
+               return nil, err
+       }
+       typeDefs := struct {
+               Types []TypeDefinition `json:"types"`
+       }{}
+       err = json.Unmarshal(typeDefsByte, &typeDefs)
+       if err != nil {
+               return nil, err
+       }
+
+       kafkaTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaKafka.json"))
+       if err != nil {
+               return nil, err
+       }
+
+       dMaaPTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaDmaap.json"))
+       if err != nil {
+               return nil, err
+       }
+
+       for i, typeDef := range typeDefs.Types {
+               if typeDef.IsKafkaType() {
+                       typeDefs.Types[i].TypeSchema = kafkaTypeSchema
+               } else {
+                       typeDefs.Types[i].TypeSchema = dMaaPTypeSchema
+               }
+       }
+       return typeDefs.Types, nil
+}
+
+func getTypeSchema(schemaFile string) (interface{}, error) {
+       typeDefsByte, err := os.ReadFile(schemaFile)
+       if err != nil {
+               return nil, err
+       }
+       var schema interface{}
+       err = json.Unmarshal(typeDefsByte, &schema)
+       if err != nil {
+               return nil, err
+       }
+       return schema, nil
+}