2 // ========================LICENSE_START=================================
5 // Copyright (C) 2021: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
29 log "github.com/sirupsen/logrus"
30 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
31 "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
// TypeData pairs the identifier and DMaaP topic URL of one information type
// with the handler that owns the jobs registered for that type.
//
// Only TypeId and DMaaPTopicURL take part in JSON encoding; jobHandler is
// unexported and is therefore ignored by encoding/json.
type TypeData struct {
	TypeId        string `json:"id"`
	DMaaPTopicURL string `json:"dmaapTopicUrl"`
	jobHandler    *jobHandler
}
// JobInfo is the JSON representation of a single information job.
//
// InfoTypeIdentity selects the type (and so the handler) the job belongs to,
// InfoJobIdentity is the key under which the handler stores the job, and
// TargetUri is the address that polled messages are posted to. InfoJobData is
// kept as decoded by encoding/json and is not interpreted in this file.
type JobInfo struct {
	Owner            string      `json:"owner"`
	LastUpdated      string      `json:"last_updated"`
	InfoJobIdentity  string      `json:"info_job_identity"`
	TargetUri        string      `json:"target_uri"`
	InfoJobData      interface{} `json:"info_job_data"`
	InfoTypeIdentity string      `json:"info_type_identity"`
}
// JobTypesManager is the view of the job manager that deals with types: it
// loads the type definitions from configuration and reports the identifiers
// of the types that are available.
type JobTypesManager interface {
	LoadTypesFromConfiguration() ([]config.TypeDefinition, error)
	GetSupportedTypes() []string
}
54 type JobsManager interface {
56 DeleteJob(jobId string)
59 type JobsManagerImpl struct {
61 allTypes map[string]TypeData
62 pollClient restclient.HTTPClient
64 distributeClient restclient.HTTPClient
67 type jobHandler struct {
71 jobs map[string]JobInfo
73 deleteJobCh chan string
74 pollClient restclient.HTTPClient
75 distributeClient restclient.HTTPClient
78 func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
79 return &JobsManagerImpl{
80 configFile: typeConfigFilePath,
81 allTypes: make(map[string]TypeData),
82 pollClient: pollClient,
84 distributeClient: distributeClient,
88 func (jm *JobsManagerImpl) AddJob(ji JobInfo) error {
89 if err := jm.validateJobInfo(ji); err == nil {
90 typeData := jm.allTypes[ji.InfoTypeIdentity]
91 typeData.jobHandler.addJobCh <- ji
92 log.Debug("Added job: ", ji)
99 func (jm *JobsManagerImpl) DeleteJob(jobId string) {
100 for _, typeData := range jm.allTypes {
101 log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId)
102 typeData.jobHandler.deleteJobCh <- jobId
104 log.Debug("Deleted job: ", jobId)
107 func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
108 if _, ok := jm.allTypes[ji.InfoTypeIdentity]; !ok {
109 return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
111 if ji.InfoJobIdentity == "" {
112 return fmt.Errorf("missing required job identity: %v", ji)
114 // Temporary for when there are only REST callbacks needed
115 if ji.TargetUri == "" {
116 return fmt.Errorf("missing required target URI: %v", ji)
121 func (jm *JobsManagerImpl) LoadTypesFromConfiguration() ([]config.TypeDefinition, error) {
122 typeDefsByte, err := os.ReadFile(jm.configFile)
127 Types []config.TypeDefinition `json:"types"`
129 err = json.Unmarshal(typeDefsByte, &typeDefs)
133 for _, typeDef := range typeDefs.Types {
134 addCh := make(chan JobInfo)
135 deleteCh := make(chan string)
138 topicUrl: typeDef.DmaapTopicURL,
139 jobs: make(map[string]JobInfo),
141 deleteJobCh: deleteCh,
142 pollClient: jm.pollClient,
143 distributeClient: jm.distributeClient,
145 jm.allTypes[typeDef.Id] = TypeData{
147 DMaaPTopicURL: typeDef.DmaapTopicURL,
151 return typeDefs.Types, nil
154 func (jm *JobsManagerImpl) GetSupportedTypes() []string {
155 supportedTypes := []string{}
156 for k := range jm.allTypes {
157 supportedTypes = append(supportedTypes, k)
159 return supportedTypes
162 func (jm *JobsManagerImpl) StartJobs() {
163 for _, jobType := range jm.allTypes {
165 go jobType.jobHandler.start(jm.mrAddress)
170 func (jh *jobHandler) start(mRAddress string) {
173 jh.pollAndDistributeMessages(mRAddress)
179 jh.monitorManagementChannels()
184 func (jh *jobHandler) pollAndDistributeMessages(mRAddress string) {
187 log.Debugf("Processing jobs for type: %v", jh.typeId)
188 messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
190 log.Warnf("Error getting data from MR. Cause: %v", error)
192 log.Debugf("Received messages: %v", string(messagesBody))
193 jh.distributeMessages(messagesBody)
196 func (jh *jobHandler) distributeMessages(messages []byte) {
197 if len(messages) > 2 {
198 for _, jobInfo := range jh.jobs {
199 go jh.sendMessagesToConsumer(messages, jobInfo)
204 func (jh *jobHandler) sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
205 log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
206 if postErr := restclient.Post(jobInfo.TargetUri, messages, jh.distributeClient); postErr != nil {
207 log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
209 log.Debugf("Messages distributed to consumer: %v.", jobInfo.Owner)
212 func (jh *jobHandler) monitorManagementChannels() {
214 case addedJob := <-jh.addJobCh:
216 log.Debugf("received %v from addJobCh\n", addedJob)
217 jh.jobs[addedJob.InfoJobIdentity] = addedJob
219 case deletedJob := <-jh.deleteJobCh:
221 log.Debugf("received %v from deleteJobCh\n", deletedJob)
222 delete(jh.jobs, deletedJob)