"encoding/json"
"fmt"
"os"
+ "path/filepath"
"strconv"
log "github.com/sirupsen/logrus"
InfoProducerPort int
InfoCoordinatorAddress string
DMaaPMRAddress string
+ KafkaBootstrapServers string
ProducerCertPath string
ProducerKeyPath string
}
InfoProducerPort: getEnvAsInt("INFO_PRODUCER_PORT", 8085),
InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://informationservice:8434"),
DMaaPMRAddress: getEnv("DMAAP_MR_ADDR", "https://message-router.onap:3905"),
+ KafkaBootstrapServers: getEnv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
ProducerCertPath: getEnv("PRODUCER_CERT_PATH", "security/producer.crt"),
ProducerKeyPath: getEnv("PRODUCER_KEY_PATH", "security/producer.key"),
LogLevel: getLogLevel(),
}
}
-func GetJobTypesFromConfiguration(configFile string) ([]TypeDefinition, error) {
- typeDefsByte, err := os.ReadFile(configFile)
+func GetJobTypesFromConfiguration(configFolder string) ([]TypeDefinition, error) {
+ typeDefsByte, err := os.ReadFile(filepath.Join(configFolder, "type_config.json"))
if err != nil {
return nil, err
}
return nil, err
}
+ kafkaTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaKafka.json"))
+ if err != nil {
+ return nil, err
+ }
+
+ dMaaPTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaDmaap.json"))
+ if err != nil {
+ return nil, err
+ }
+
+ for i, typeDef := range typeDefs.Types {
+ if typeDef.IsKafkaType() {
+ typeDefs.Types[i].TypeSchema = kafkaTypeSchema
+ } else {
+ typeDefs.Types[i].TypeSchema = dMaaPTypeSchema
+ }
+ }
return typeDefs.Types, nil
}
+
+func getTypeSchema(schemaFile string) (interface{}, error) {
+ typeDefsByte, err := os.ReadFile(schemaFile)
+ if err != nil {
+ return nil, err
+ }
+ var schema interface{}
+ err = json.Unmarshal(typeDefsByte, &schema)
+ if err != nil {
+ return nil, err
+ }
+ return schema, nil
+}