X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=dmaap-mediator-producer%2Finternal%2Fconfig%2Fconfig.go;h=7582e9cc7bf60526f053da8328c5a2925dfb863b;hb=df5eeb6e3fe42f87ac399f624edef20c87d1e475;hp=34b056d59ad806b47363a8bf943a647b2d41ad14;hpb=2389606af9e77879c76e2f87822b5b7d68920d19;p=nonrtric.git diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go index 34b056d5..7582e9cc 100644 --- a/dmaap-mediator-producer/internal/config/config.go +++ b/dmaap-mediator-producer/internal/config/config.go @@ -24,6 +24,7 @@ import ( "encoding/json" "fmt" "os" + "path/filepath" "strconv" log "github.com/sirupsen/logrus" @@ -35,6 +36,7 @@ type Config struct { InfoProducerPort int InfoCoordinatorAddress string DMaaPMRAddress string + KafkaBootstrapServers string ProducerCertPath string ProducerKeyPath string } @@ -43,8 +45,9 @@ func New() *Config { return &Config{ InfoProducerHost: getEnv("INFO_PRODUCER_HOST", ""), InfoProducerPort: getEnvAsInt("INFO_PRODUCER_PORT", 8085), - InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://enrichmentservice:8434"), + InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://informationservice:8434"), DMaaPMRAddress: getEnv("DMAAP_MR_ADDR", "https://message-router.onap:3905"), + KafkaBootstrapServers: getEnv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"), ProducerCertPath: getEnv("PRODUCER_CERT_PATH", "security/producer.crt"), ProducerKeyPath: getEnv("PRODUCER_KEY_PATH", "security/producer.key"), LogLevel: getLogLevel(), @@ -83,8 +86,8 @@ func getLogLevel() log.Level { } } -func GetJobTypesFromConfiguration(configFile string) ([]TypeDefinition, error) { - typeDefsByte, err := os.ReadFile(configFile) +func GetJobTypesFromConfiguration(configFolder string) ([]TypeDefinition, error) { + typeDefsByte, err := os.ReadFile(filepath.Join(configFolder, "type_config.json")) if err != nil { return nil, err } @@ -96,5 +99,35 @@ func GetJobTypesFromConfiguration(configFile string) ([]TypeDefinition, error) { return nil, err } + kafkaTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaKafka.json")) + if err != nil { + return nil, err + } + + dMaaPTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaDmaap.json")) + if err != nil { + return nil, err + } + + for i, typeDef := range typeDefs.Types { + if typeDef.IsKafkaType() { + typeDefs.Types[i].TypeSchema = kafkaTypeSchema + } else { + typeDefs.Types[i].TypeSchema = dMaaPTypeSchema + } + } return typeDefs.Types, nil } + +func getTypeSchema(schemaFile string) (interface{}, error) { + typeDefsByte, err := os.ReadFile(schemaFile) + if err != nil { + return nil, err + } + var schema interface{} + err = json.Unmarshal(typeDefsByte, &schema) + if err != nil { + return nil, err + } + return schema, nil +}