Uplift from master branch
[nonrtric.git] / dmaap-mediator-producer / internal / config / config.go
index e03c40a..7582e9c 100644 (file)
@@ -24,6 +24,7 @@ import (
        "encoding/json"
        "fmt"
        "os"
+       "path/filepath"
        "strconv"
 
        log "github.com/sirupsen/logrus"
@@ -35,6 +36,7 @@ type Config struct {
        InfoProducerPort       int
        InfoCoordinatorAddress string
        DMaaPMRAddress         string
+       KafkaBootstrapServers  string
        ProducerCertPath       string
        ProducerKeyPath        string
 }
@@ -45,6 +47,7 @@ func New() *Config {
                InfoProducerPort:       getEnvAsInt("INFO_PRODUCER_PORT", 8085),
                InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://informationservice:8434"),
                DMaaPMRAddress:         getEnv("DMAAP_MR_ADDR", "https://message-router.onap:3905"),
+               KafkaBootstrapServers:  getEnv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
                ProducerCertPath:       getEnv("PRODUCER_CERT_PATH", "security/producer.crt"),
                ProducerKeyPath:        getEnv("PRODUCER_KEY_PATH", "security/producer.key"),
                LogLevel:               getLogLevel(),
@@ -83,8 +86,8 @@ func getLogLevel() log.Level {
        }
 }
 
-func GetJobTypesFromConfiguration(configFile string) ([]TypeDefinition, error) {
-       typeDefsByte, err := os.ReadFile(configFile)
+func GetJobTypesFromConfiguration(configFolder string) ([]TypeDefinition, error) {
+       typeDefsByte, err := os.ReadFile(filepath.Join(configFolder, "type_config.json"))
        if err != nil {
                return nil, err
        }
@@ -96,5 +99,35 @@ func GetJobTypesFromConfiguration(configFile string) ([]TypeDefinition, error) {
                return nil, err
        }
 
+       kafkaTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaKafka.json"))
+       if err != nil {
+               return nil, err
+       }
+
+       dMaaPTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaDmaap.json"))
+       if err != nil {
+               return nil, err
+       }
+
+       for i, typeDef := range typeDefs.Types {
+               if typeDef.IsKafkaType() {
+                       typeDefs.Types[i].TypeSchema = kafkaTypeSchema
+               } else {
+                       typeDefs.Types[i].TypeSchema = dMaaPTypeSchema
+               }
+       }
        return typeDefs.Types, nil
 }
+
+func getTypeSchema(schemaFile string) (interface{}, error) {
+       typeDefsByte, err := os.ReadFile(schemaFile)
+       if err != nil {
+               return nil, err
+       }
+       var schema interface{}
+       err = json.Unmarshal(typeDefsByte, &schema)
+       if err != nil {
+               return nil, err
+       }
+       return schema, nil
+}