package jobs
import (
- "encoding/json"
"fmt"
- "os"
"sync"
log "github.com/sirupsen/logrus"
}
type JobTypesManager interface {
- LoadTypesFromConfiguration() ([]config.TypeDefinition, error)
+ LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
GetSupportedTypes() []string
}
}
type JobsManagerImpl struct {
- configFile string
allTypes map[string]TypeData
pollClient restclient.HTTPClient
mrAddress string
distributeClient restclient.HTTPClient
}
-func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
+func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
return &JobsManagerImpl{
- configFile: typeConfigFilePath,
allTypes: make(map[string]TypeData),
pollClient: pollClient,
mrAddress: mrAddr,
return nil
}
-func (jm *JobsManagerImpl) LoadTypesFromConfiguration() ([]config.TypeDefinition, error) {
- typeDefsByte, err := os.ReadFile(jm.configFile)
- if err != nil {
- return nil, err
- }
- typeDefs := struct {
- Types []config.TypeDefinition `json:"types"`
- }{}
- err = json.Unmarshal(typeDefsByte, &typeDefs)
- if err != nil {
- return nil, err
- }
- for _, typeDef := range typeDefs.Types {
+func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
+ for _, typeDef := range types {
jm.allTypes[typeDef.Id] = TypeData{
TypeId: typeDef.Id,
DMaaPTopicURL: typeDef.DmaapTopicURL,
jobsHandler: newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
}
}
- return typeDefs.Types, nil
+ return types
}
func (jm *JobsManagerImpl) GetSupportedTypes() []string {