X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-mediator-producer%2Finternal%2Fconfig%2Fconfig.go;h=7582e9cc7bf60526f053da8328c5a2925dfb863b;hb=6f5d3d1eccb8a1857c645ba6bd0b5e1b89ca7088;hp=e03c40ac1405aba4df88a9e2002092c85f33a243;hpb=6b43426c2b52daa5ace5205b98d09e1871fa41d6;p=nonrtric.git diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go index e03c40ac..7582e9cc 100644 --- a/dmaap-mediator-producer/internal/config/config.go +++ b/dmaap-mediator-producer/internal/config/config.go @@ -24,6 +24,7 @@ import ( "encoding/json" "fmt" "os" + "path/filepath" "strconv" log "github.com/sirupsen/logrus" @@ -35,6 +36,7 @@ type Config struct { InfoProducerPort int InfoCoordinatorAddress string DMaaPMRAddress string + KafkaBootstrapServers string ProducerCertPath string ProducerKeyPath string } @@ -45,6 +47,7 @@ func New() *Config { InfoProducerPort: getEnvAsInt("INFO_PRODUCER_PORT", 8085), InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://informationservice:8434"), DMaaPMRAddress: getEnv("DMAAP_MR_ADDR", "https://message-router.onap:3905"), + KafkaBootstrapServers: getEnv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"), ProducerCertPath: getEnv("PRODUCER_CERT_PATH", "security/producer.crt"), ProducerKeyPath: getEnv("PRODUCER_KEY_PATH", "security/producer.key"), LogLevel: getLogLevel(), @@ -83,8 +86,8 @@ func getLogLevel() log.Level { } } -func GetJobTypesFromConfiguration(configFile string) ([]TypeDefinition, error) { - typeDefsByte, err := os.ReadFile(configFile) +func GetJobTypesFromConfiguration(configFolder string) ([]TypeDefinition, error) { + typeDefsByte, err := os.ReadFile(filepath.Join(configFolder, "type_config.json")) if err != nil { return nil, err } @@ -96,5 +99,35 @@ func GetJobTypesFromConfiguration(configFile string) ([]TypeDefinition, error) { return nil, err } + kafkaTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaKafka.json")) + if err != nil { + return nil, err + } + + dMaaPTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaDmaap.json")) + if err != nil { + return nil, err + } + + for i, typeDef := range typeDefs.Types { + if typeDef.IsKafkaType() { + typeDefs.Types[i].TypeSchema = kafkaTypeSchema + } else { + typeDefs.Types[i].TypeSchema = dMaaPTypeSchema + } + } return typeDefs.Types, nil } + +func getTypeSchema(schemaFile string) (interface{}, error) { + typeDefsByte, err := os.ReadFile(schemaFile) + if err != nil { + return nil, err + } + var schema interface{} + err = json.Unmarshal(typeDefsByte, &schema) + if err != nil { + return nil, err + } + return schema, nil +}