X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-mediator-producer%2Finternal%2Fconfig%2Fconfig.go;h=7582e9cc7bf60526f053da8328c5a2925dfb863b;hb=6f5d3d1eccb8a1857c645ba6bd0b5e1b89ca7088;hp=9b7b1dd1544b184141abe4546a40a3430e09413e;hpb=b65d86fc9b02415e1adf2415f8c4a257378e9c09;p=nonrtric.git diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go index 9b7b1dd1..7582e9cc 100644 --- a/dmaap-mediator-producer/internal/config/config.go +++ b/dmaap-mediator-producer/internal/config/config.go @@ -21,38 +21,42 @@ package config import ( + "encoding/json" + "fmt" "os" + "path/filepath" "strconv" log "github.com/sirupsen/logrus" ) type Config struct { - LogLevel string + LogLevel log.Level InfoProducerHost string InfoProducerPort int InfoCoordinatorAddress string - MRHost string - MRPort int -} - -type ProducerRegistrationInfo struct { - InfoProducerSupervisionCallbackUrl string `json:"info_producer_supervision_callback_url"` - SupportedInfoTypes []string `json:"supported_info_types"` - InfoJobCallbackUrl string `json:"info_job_callback_url"` + DMaaPMRAddress string + KafkaBootstrapServers string + ProducerCertPath string + ProducerKeyPath string } func New() *Config { return &Config{ - LogLevel: getEnv("LOG_LEVEL", "Info"), InfoProducerHost: getEnv("INFO_PRODUCER_HOST", ""), InfoProducerPort: getEnvAsInt("INFO_PRODUCER_PORT", 8085), - InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"), - MRHost: getEnv("MR_HOST", "http://message-router.onap"), - MRPort: getEnvAsInt("MR_PORT", 3904), + InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://informationservice:8434"), + DMaaPMRAddress: getEnv("DMAAP_MR_ADDR", "https://message-router.onap:3905"), + KafkaBootstrapServers: getEnv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"), + ProducerCertPath: getEnv("PRODUCER_CERT_PATH", "security/producer.crt"), + ProducerKeyPath: getEnv("PRODUCER_KEY_PATH", "security/producer.key"), + LogLevel: getLogLevel(), } } +func (c Config) String() string { + return fmt.Sprintf("InfoProducerHost: %v, InfoProducerPort: %v, InfoCoordinatorAddress: %v, DMaaPMRAddress: %v, ProducerCertPath: %v, ProducerKeyPath: %v, LogLevel: %v", c.InfoProducerHost, c.InfoProducerPort, c.InfoCoordinatorAddress, c.DMaaPMRAddress, c.ProducerCertPath, c.ProducerKeyPath, c.LogLevel) +} func getEnv(key string, defaultVal string) string { if value, exists := os.LookupEnv(key); exists { return value @@ -71,3 +75,59 @@ func getEnvAsInt(name string, defaultVal int) int { return defaultVal } + +func getLogLevel() log.Level { + logLevelStr := getEnv("LOG_LEVEL", "Info") + if loglevel, err := log.ParseLevel(logLevelStr); err == nil { + return loglevel + } else { + log.Warnf("Invalid log level: %v. Log level will be Info!", logLevelStr) + return log.InfoLevel + } +} + +func GetJobTypesFromConfiguration(configFolder string) ([]TypeDefinition, error) { + typeDefsByte, err := os.ReadFile(filepath.Join(configFolder, "type_config.json")) + if err != nil { + return nil, err + } + typeDefs := struct { + Types []TypeDefinition `json:"types"` + }{} + err = json.Unmarshal(typeDefsByte, &typeDefs) + if err != nil { + return nil, err + } + + kafkaTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaKafka.json")) + if err != nil { + return nil, err + } + + dMaaPTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaDmaap.json")) + if err != nil { + return nil, err + } + + for i, typeDef := range typeDefs.Types { + if typeDef.IsKafkaType() { + typeDefs.Types[i].TypeSchema = kafkaTypeSchema + } else { + typeDefs.Types[i].TypeSchema = dMaaPTypeSchema + } + } + return typeDefs.Types, nil +} + +func getTypeSchema(schemaFile string) (interface{}, error) { + typeDefsByte, err := os.ReadFile(schemaFile) + if err != nil { + return nil, err + } + var schema interface{} + err = json.Unmarshal(typeDefsByte, &schema) + if err != nil { + return nil, err + } + return schema, nil +}