+
+func getLogLevel() log.Level {
+ logLevelStr := getEnv("LOG_LEVEL", "Info")
+ if loglevel, err := log.ParseLevel(logLevelStr); err == nil {
+ return loglevel
+ } else {
+ log.Warnf("Invalid log level: %v. Log level will be Info!", logLevelStr)
+ return log.InfoLevel
+ }
+}
+
+func GetJobTypesFromConfiguration(configFolder string) ([]TypeDefinition, error) {
+ typeDefsByte, err := os.ReadFile(filepath.Join(configFolder, "type_config.json"))
+ if err != nil {
+ return nil, err
+ }
+ typeDefs := struct {
+ Types []TypeDefinition `json:"types"`
+ }{}
+ err = json.Unmarshal(typeDefsByte, &typeDefs)
+ if err != nil {
+ return nil, err
+ }
+
+ kafkaTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaKafka.json"))
+ if err != nil {
+ return nil, err
+ }
+
+ dMaaPTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaDmaap.json"))
+ if err != nil {
+ return nil, err
+ }
+
+ for i, typeDef := range typeDefs.Types {
+ if typeDef.IsKafkaType() {
+ typeDefs.Types[i].TypeSchema = kafkaTypeSchema
+ } else {
+ typeDefs.Types[i].TypeSchema = dMaaPTypeSchema
+ }
+ }
+ return typeDefs.Types, nil
+}
+
+func getTypeSchema(schemaFile string) (interface{}, error) {
+ typeDefsByte, err := os.ReadFile(schemaFile)
+ if err != nil {
+ return nil, err
+ }
+ var schema interface{}
+ err = json.Unmarshal(typeDefsByte, &schema)
+ if err != nil {
+ return nil, err
+ }
+ return schema, nil
+}