2 // ========================LICENSE_START=================================
5 // Copyright (C) 2021: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
28 log "github.com/sirupsen/logrus"
29 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
30 "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
// TypeData describes one supported job type.
// TypeId and DMaaPTopicURL are exported with the JSON keys "id" and
// "dmaapTopicUrl". jobsHandler is unexported; it is the handler whose
// channels the manager uses to add and delete jobs of this type.
33 type TypeData struct {
34 TypeId string `json:"id"`
35 DMaaPTopicURL string `json:"dmaapTopicUrl"`
36 jobsHandler *jobsHandler
// Fields describing a single job; the struct tags give the JSON keys.
// NOTE(review): the declaration line of the enclosing struct is not visible
// here. Other code in this file reads these fields through the JobInfo type,
// so this is presumably the body of JobInfo — confirm.
// Owner is logged as the consumer when messages for the job are distributed.
40 Owner string `json:"owner"`
// LastUpdated is not read by any code visible in this file.
41 LastUpdated string `json:"last_updated"`
// InfoJobIdentity must be non-empty; it is the key under which the job is stored.
42 InfoJobIdentity string `json:"info_job_identity"`
// TargetUri must be non-empty; messages for the job are posted to it.
43 TargetUri string `json:"target_uri"`
// InfoJobData is not read by any code visible in this file.
44 InfoJobData interface{} `json:"info_job_data"`
// InfoTypeIdentity must name a type that has been loaded into the manager.
45 InfoTypeIdentity string `json:"info_type_identity"`
// JobTypesManager is the type-related view of the jobs manager: loading type
// definitions from configuration and listing the type ids that are known.
48 type JobTypesManager interface {
// LoadTypesFromConfiguration registers the given type definitions.
49 LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
// GetSupportedTypes returns the ids of the registered types.
50 GetSupportedTypes() []string
// JobsManager is the job-related view of the jobs manager, intended for the
// REST layer: adding a job from a JobInfo and deleting a job by id.
53 type JobsManager interface {
// AddJobFromRESTCall adds the described job, or returns an error if it is invalid.
54 AddJobFromRESTCall(JobInfo) error
// DeleteJobFromRESTCall removes the job with the given id.
55 DeleteJobFromRESTCall(jobId string)
// JobsManagerImpl keeps the registered job types and the HTTP clients that
// are handed to each type's jobsHandler.
// allTypes is keyed by type id. pollClient and distributeClient are passed
// unchanged to newJobsHandler when a type is loaded.
// NOTE(review): methods in this file also read jm.mrAddress, whose
// declaration is not visible among these lines.
58 type JobsManagerImpl struct {
59 allTypes map[string]TypeData
60 pollClient restclient.HTTPClient
62 distributeClient restclient.HTTPClient
// NewJobsManagerImpl returns a manager with an empty type map and the given
// poll and distribute clients.
// NOTE(review): mrAddr is not used on any line visible here.
// StartJobsForAllTypes reads jm.mrAddress, so it is presumably stored in
// that field — confirm.
65 func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
66 return &JobsManagerImpl{
67 allTypes: make(map[string]TypeData),
68 pollClient: pollClient,
70 distributeClient: distributeClient,
// AddJobFromRESTCall validates ji and, when it is valid, hands it to the
// jobsHandler of its type over addJobCh.
// addJobCh is created unbuffered, so the send blocks until the handler
// receives it. The "Added job" debug line is written after that hand-off,
// not after the handler has stored the job.
// NOTE(review): the return statements are not visible here; presumably the
// validation error is returned on failure and nil on success — confirm.
74 func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
75 if err := jm.validateJobInfo(ji); err == nil {
// validateJobInfo has already checked that this type id is in allTypes.
76 typeData := jm.allTypes[ji.InfoTypeIdentity]
77 typeData.jobsHandler.addJobCh <- ji
78 log.Debug("Added job: ", ji)
// DeleteJobFromRESTCall sends jobId on the deleteJobCh of every registered
// type. The manager does not look up which type owns the job; each handler
// receives the id.
// deleteJobCh is created unbuffered, so each send blocks until that type's
// handler receives it.
85 func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
86 for _, typeData := range jm.allTypes {
87 log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId)
88 typeData.jobsHandler.deleteJobCh <- jobId
90 log.Debug("Deleted job: ", jobId)
// validateJobInfo checks that ji can be accepted and returns an error for
// the first check that fails, in this order:
//   - ji.InfoTypeIdentity is a key in allTypes
//   - ji.InfoJobIdentity is not empty
//   - ji.TargetUri is not empty
// NOTE(review): the final return is not visible here; presumably nil when
// all checks pass — confirm.
93 func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
94 if _, ok := jm.allTypes[ji.InfoTypeIdentity]; !ok {
95 return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
97 if ji.InfoJobIdentity == "" {
98 return fmt.Errorf("missing required job identity: %v", ji)
100 // Temporary for when there are only REST callbacks needed
101 if ji.TargetUri == "" {
102 return fmt.Errorf("missing required target URI: %v", ji)
// LoadTypesFromConfiguration stores one TypeData per definition in allTypes,
// keyed by typeDef.Id, and creates a new jobsHandler for each using the
// manager's poll and distribute clients.
// An existing entry with the same id is overwritten by the map assignment.
// NOTE(review): the return statement is not visible here, nor is any
// assignment of the TypeId field — confirm both.
107 func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
108 for _, typeDef := range types {
109 jm.allTypes[typeDef.Id] = TypeData{
111 DMaaPTopicURL: typeDef.DmaapTopicURL,
112 jobsHandler: newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
// GetSupportedTypes returns the keys of allTypes, i.e. the ids of all loaded
// types. The result is an empty, non-nil slice when no types are loaded.
// The order follows map iteration and is therefore not stable between calls.
118 func (jm *JobsManagerImpl) GetSupportedTypes() []string {
119 supportedTypes := []string{}
120 for k := range jm.allTypes {
121 supportedTypes = append(supportedTypes, k)
123 return supportedTypes
// StartJobsForAllTypes starts, for every registered type, a goroutine that
// runs that type's startPollingAndDistribution with the manager's mrAddress.
// It does not wait for those goroutines.
126 func (jm *JobsManagerImpl) StartJobsForAllTypes() {
127 for _, jobType := range jm.allTypes {
129 go jobType.jobsHandler.startPollingAndDistribution(jm.mrAddress)
// jobsHandler manages the jobs of one type.
// addJobCh and deleteJobCh carry add and delete requests from the manager.
// pollClient is used to fetch messages; distributeClient is handed to each
// job that is created.
// NOTE(review): methods in this file also use jh.typeId, jh.topicUrl and
// jh.jobs, whose declarations are not visible among these lines.
134 type jobsHandler struct {
139 addJobCh chan JobInfo
140 deleteJobCh chan string
141 pollClient restclient.HTTPClient
142 distributeClient restclient.HTTPClient
// newJobsHandler builds a handler with an empty job map, unbuffered add and
// delete channels, and the given clients.
// NOTE(review): the lines that would use typeId and topicURL are not visible
// here; presumably they initialise the typeId and topicUrl fields — confirm.
145 func newJobsHandler(typeId string, topicURL string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
149 jobs: make(map[string]job),
150 addJobCh: make(chan JobInfo),
151 deleteJobCh: make(chan string),
152 pollClient: pollClient,
153 distributeClient: distributeClient,
// startPollingAndDistribution runs the two activities of the handler:
// polling and distributing messages from mRAddress, and serving the add and
// delete channels.
// NOTE(review): the control flow around the two calls is not visible here;
// presumably each is repeated in its own loop — confirm.
157 func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) {
160 jh.pollAndDistributeMessages(mRAddress)
166 jh.monitorManagementChannels()
// pollAndDistributeMessages performs one poll: it GETs mRAddress+jh.topicUrl
// with pollClient, logs the body, and passes it to distributeMessages.
// The failure path logs a warning and sleeps for one minute.
// The local variable named error shadows the predeclared error type inside
// this function.
// NOTE(review): the condition guarding the warning and sleep is not visible
// here; presumably a non-nil check on the returned error. It is also not
// visible whether the function continues to distributeMessages afterwards —
// confirm.
171 func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) {
172 log.Debugf("Processing jobs for type: %v", jh.typeId)
173 messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
175 log.Warn("Error getting data from MR. Cause: ", error)
176 time.Sleep(time.Minute) // Must wait before trying to call MR again
178 log.Debug("Received messages: ", string(messagesBody))
179 jh.distributeMessages(messagesBody)
// distributeMessages offers messages to every job of this handler, but only
// when the payload is longer than two bytes.
// A job whose messagesChannel has free capacity gets the payload enqueued.
// NOTE(review): presumably the two-byte limit skips an empty JSON array
// ("[]") — confirm.
// NOTE(review): the two lines after the length check and the line before
// emptyMessagesBuffer are not visible here; presumably emptyMessagesBuffer
// runs only when the channel is full — confirm.
182 func (jh *jobsHandler) distributeMessages(messages []byte) {
183 if len(messages) > 2 {
186 for _, job := range jh.jobs {
// len < cap means the buffered channel can take one more item without blocking.
187 if len(job.messagesChannel) < cap(job.messagesChannel) {
188 job.messagesChannel <- messages
190 jh.emptyMessagesBuffer(job)
// emptyMessagesBuffer logs the job id and receives from the job's
// messagesChannel, discarding what it receives.
// NOTE(review): the loop and the exit condition are not visible here;
// presumably it drains until the channel is empty — confirm.
196 func (jh *jobsHandler) emptyMessagesBuffer(job job) {
197 log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
201 case <-job.messagesChannel:
// monitorManagementChannels receives one request from either addJobCh or
// deleteJobCh. A received job id is passed to deleteJob.
// NOTE(review): the statement handling a received addedJob is not visible
// here; presumably it calls addJob — confirm.
208 func (jh *jobsHandler) monitorManagementChannels() {
210 case addedJob := <-jh.addJobCh:
212 case deletedJob := <-jh.deleteJobCh:
213 jh.deleteJob(deletedJob)
// addJob creates a job for addedJob with the handler's distributeClient and
// stores it in jh.jobs under addedJob.InfoJobIdentity. An existing entry
// with the same id is replaced by the map assignment.
// NOTE(review): three interior lines are not visible here, including
// whatever starts the new job — confirm.
217 func (jh *jobsHandler) addJob(addedJob JobInfo) {
219 log.Debug("Add job: ", addedJob)
// The local variable newJob shadows the newJob function from here on.
220 newJob := newJob(addedJob, jh.distributeClient)
222 jh.jobs[addedJob.InfoJobIdentity] = newJob
// deleteJob looks up deletedJob in jh.jobs, signals the job on its
// controlChannel, and removes it from the map.
// controlChannel is created unbuffered, so the send blocks until the job's
// start loop receives it.
// NOTE(review): the use of exist is not visible here; presumably the signal
// and delete happen only when the job was found — confirm.
226 func (jh *jobsHandler) deleteJob(deletedJob string) {
228 log.Debug("Delete job: ", deletedJob)
229 j, exist := jh.jobs[deletedJob]
231 j.controlChannel <- struct{}{}
232 delete(jh.jobs, deletedJob)
// Fields of a running job.
// NOTE(review): the declaration line of the enclosing struct and the jobInfo
// field that other code reads are not visible here; presumably this is the
// body of the job type — confirm.
// client is the HTTP client used to post messages to the job's target.
239 client restclient.HTTPClient
// messagesChannel queues payloads waiting to be sent to the consumer.
240 messagesChannel chan []byte
// controlChannel carries the stop signal for the job.
241 controlChannel chan struct{}
// newJob returns a job with a messagesChannel buffered for 10 payloads and
// an unbuffered controlChannel.
// NOTE(review): the lines that would use j and c are not visible here;
// presumably they initialise the jobInfo and client fields — confirm.
244 func newJob(j JobInfo, c restclient.HTTPClient) job {
248 messagesChannel: make(chan []byte, 10),
249 controlChannel: make(chan struct{}),
// start serves the job's two channels: a value on controlChannel logs that
// distribution stops, and a payload on messagesChannel is sent on with
// sendMessagesToConsumer.
// NOTE(review): the loop and the statement that leaves it after the stop
// signal are not visible here — confirm.
253 func (j *job) start() {
257 case <-j.controlChannel:
258 log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
260 case msg := <-j.messagesChannel:
261 j.sendMessagesToConsumer(msg)
// sendMessagesToConsumer posts messages to the job's TargetUri with the
// job's client. A failed post is logged as a warning and not returned to
// the caller. The final debug line logs the job id and owner.
// NOTE(review): the line after the warning is not visible here; presumably
// the function returns there so the final debug line is logged only on
// success — confirm.
266 func (j *job) sendMessagesToConsumer(messages []byte) {
267 log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
268 if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
269 log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
271 log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)