2 // ========================LICENSE_START=================================
5 // Copyright (C) 2021: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
27 log "github.com/sirupsen/logrus"
28 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
29 "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
// TypeData describes one supported information type together with the
// handler that manages the jobs registered for that type.
32 type TypeData struct {
// TypeId is the identifier of the type; serialized as "id".
33 TypeId string `json:"id"`
// DMaaPTopicURL is serialized as "dmaapTopicUrl".
// NOTE(review): presumably the topic path that is appended to the MR address
// when polling — confirm.
34 DMaaPTopicURL string `json:"dmaapTopicUrl"`
// jobsHandler is unexported, so encoding/json leaves it out.
35 jobsHandler *jobsHandler
// NOTE(review): the declaration line that owns these fields is not visible
// here; the field names match how JobInfo values are used by validateJobInfo
// and job.sendMessagesToConsumer — confirm.
// Owner is logged as the consumer once messages have been distributed.
39 Owner string `json:"owner"`
40 LastUpdated string `json:"last_updated"`
// InfoJobIdentity must be non-empty (see validateJobInfo); it is the key under
// which the job is stored by its handler.
41 InfoJobIdentity string `json:"info_job_identity"`
// TargetUri must be non-empty (see validateJobInfo); messages are POSTed to it.
42 TargetUri string `json:"target_uri"`
43 InfoJobData interface{} `json:"info_job_data"`
// InfoTypeIdentity selects the entry of JobsManagerImpl.allTypes that the job
// belongs to.
44 InfoTypeIdentity string `json:"info_type_identity"`
// JobTypesManager is the interface for loading job types from configuration
// and for listing the identifiers of the supported types.
47 type JobTypesManager interface {
// LoadTypesFromConfiguration takes the configured type definitions and
// returns a slice of type definitions.
48 LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
// GetSupportedTypes returns the identifiers of the supported types.
49 GetSupportedTypes() []string
// JobsManager is the interface used to add and delete jobs that arrive as
// REST calls.
52 type JobsManager interface {
// AddJobFromRESTCall registers the given job; a non-nil error means the job
// was not accepted.
53 AddJobFromRESTCall(JobInfo) error
// DeleteJobFromRESTCall removes the job with the given identity.
54 DeleteJobFromRESTCall(jobId string)
// JobsManagerImpl keeps the supported types, keyed by type id, together with
// the HTTP clients that are handed to each type's jobs handler.
57 type JobsManagerImpl struct {
// allTypes maps a type id to its TypeData.
58 allTypes map[string]TypeData
// pollClient is passed to newJobsHandler as the client used for polling.
59 pollClient restclient.HTTPClient
// NOTE(review): StartJobsForAllTypes reads jm.mrAddress, but no such field is
// visible at this point — confirm it is declared in this struct.
// distributeClient is passed to newJobsHandler as the client used for
// distribution.
61 distributeClient restclient.HTTPClient
64 func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
65 return &JobsManagerImpl{
66 allTypes: make(map[string]TypeData),
67 pollClient: pollClient,
69 distributeClient: distributeClient,
73 func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
74 if err := jm.validateJobInfo(ji); err == nil {
75 typeData := jm.allTypes[ji.InfoTypeIdentity]
76 typeData.jobsHandler.addJobCh <- ji
77 log.Debug("Added job: ", ji)
84 func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
85 for _, typeData := range jm.allTypes {
86 log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId)
87 typeData.jobsHandler.deleteJobCh <- jobId
89 log.Debug("Deleted job: ", jobId)
92 func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
93 if _, ok := jm.allTypes[ji.InfoTypeIdentity]; !ok {
94 return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
96 if ji.InfoJobIdentity == "" {
97 return fmt.Errorf("missing required job identity: %v", ji)
99 // Temporary for when there are only REST callbacks needed
100 if ji.TargetUri == "" {
101 return fmt.Errorf("missing required target URI: %v", ji)
106 func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
107 for _, typeDef := range types {
108 jm.allTypes[typeDef.Id] = TypeData{
110 DMaaPTopicURL: typeDef.DmaapTopicURL,
111 jobsHandler: newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
117 func (jm *JobsManagerImpl) GetSupportedTypes() []string {
118 supportedTypes := []string{}
119 for k := range jm.allTypes {
120 supportedTypes = append(supportedTypes, k)
122 return supportedTypes
// StartJobsForAllTypes launches startPollingAndDistribution for the jobs
// handler of every registered type, each on its own goroutine, passing
// jm.mrAddress as the address to poll.
125 func (jm *JobsManagerImpl) StartJobsForAllTypes() {
126 for _, jobType := range jm.allTypes {
// The call is started with "go", so this loop does not wait for a handler.
128 go jobType.jobsHandler.startPollingAndDistribution(jm.mrAddress)
// jobsHandler manages the jobs of one type.
// NOTE(review): the handler's methods read jh.typeId, jh.topicUrl and jh.jobs,
// but those fields are not visible at this point — confirm they are declared
// in this struct.
133 type jobsHandler struct {
// addJobCh carries jobs to add; monitorManagementChannels receives from it.
138 addJobCh chan JobInfo
// deleteJobCh carries the identities of jobs to delete;
// monitorManagementChannels receives from it.
139 deleteJobCh chan string
// pollClient is the HTTP client used when fetching messages.
140 pollClient restclient.HTTPClient
// distributeClient is handed to each new job, which uses it to post messages
// to the job's target URI.
141 distributeClient restclient.HTTPClient
144 func newJobsHandler(typeId string, topicURL string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
148 jobs: make(map[string]job),
149 addJobCh: make(chan JobInfo),
150 deleteJobCh: make(chan string),
151 pollClient: pollClient,
152 distributeClient: distributeClient,
// startPollingAndDistribution runs the handler's two activities: polling and
// distributing messages using mRAddress, and serving the add/delete
// management channels.
// NOTE(review): the statements surrounding the two calls below are not
// visible here; presumably each call is repeated in a loop of its own —
// confirm.
156 func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) {
159 jh.pollAndDistributeMessages(mRAddress)
165 jh.monitorManagementChannels()
170 func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) {
171 log.Debugf("Processing jobs for type: %v", jh.typeId)
172 messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
174 log.Warn("Error getting data from MR. Cause: ", error)
176 log.Debug("Received messages: ", string(messagesBody))
177 jh.distributeMessages(messagesBody)
// distributeMessages offers the given payload to every job of this handler.
180 func (jh *jobsHandler) distributeMessages(messages []byte) {
// Payloads of two bytes or fewer are ignored.
// NOTE(review): presumably this filters out an empty JSON array "[]" — confirm.
181 if len(messages) > 2 {
// NOTE(review): statements between the check above and the loop below are not
// visible here — confirm whether access to jh.jobs is guarded.
184 for _, job := range jh.jobs {
// Send only while the job's buffered channel has free capacity; the check is
// meant to keep this send from blocking on a full queue.
185 if len(job.messagesChannel) < cap(job.messagesChannel) {
186 job.messagesChannel <- messages
// NOTE(review): presumably the alternative branch (its opening is not visible
// here): when the queue is full, the job's queued messages are discarded —
// confirm.
188 jh.emptyMessagesBuffer(job)
// emptyMessagesBuffer discards messages queued on the given job's
// messagesChannel.
// NOTE(review): the loop/select surrounding the receive below is not visible
// here — confirm that it stops once the channel is empty.
194 func (jh *jobsHandler) emptyMessagesBuffer(job job) {
195 log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
// The received value is dropped.
199 case <-job.messagesChannel:
206 func (jh *jobsHandler) monitorManagementChannels() {
208 case addedJob := <-jh.addJobCh:
210 case deletedJob := <-jh.deleteJobCh:
211 jh.deleteJob(deletedJob)
// addJob creates a job for addedJob that uses the handler's distributeClient
// and stores it in jh.jobs under the job's InfoJobIdentity, replacing any job
// already stored under that identity.
// NOTE(review): statements around the ones below are not visible here, so it
// cannot be seen where the new job is started or whether jh.jobs is guarded —
// confirm. Also confirm that a replaced job is stopped; otherwise its
// goroutine would linger.
215 func (jh *jobsHandler) addJob(addedJob JobInfo) {
217 log.Debug("Add job: ", addedJob)
// From here on the local variable shadows the package-level newJob function.
218 newJob := newJob(addedJob, jh.distributeClient)
220 jh.jobs[addedJob.InfoJobIdentity] = newJob
// deleteJob stops the job stored under the identity deletedJob and removes it
// from jh.jobs.
224 func (jh *jobsHandler) deleteJob(deletedJob string) {
226 log.Debug("Delete job: ", deletedJob)
227 j, exist := jh.jobs[deletedJob]
// NOTE(review): presumably the two statements below run only when exist is
// true (the check is not visible here) — confirm.
// controlChannel is unbuffered, so this send waits until the job's start loop
// receives the stop signal.
229 j.controlChannel <- struct{}{}
230 delete(jh.jobs, deletedJob)
// NOTE(review): the declaration line of this struct is not visible here, and
// the job methods read j.jobInfo, so a jobInfo field is presumably declared
// above these fields — confirm.
// client is the HTTP client used to post messages to the job's target URI.
237 client restclient.HTTPClient
// messagesChannel queues message payloads that await delivery; newJob creates
// it with a capacity of 10.
238 messagesChannel chan []byte
// controlChannel carries the signal that stops the job's start loop.
239 controlChannel chan struct{}
242 func newJob(j JobInfo, c restclient.HTTPClient) job {
246 messagesChannel: make(chan []byte, 10),
247 controlChannel: make(chan struct{}),
// start serves the job: a payload received on messagesChannel is sent to the
// consumer, and a value received on controlChannel stops distribution.
// NOTE(review): the enclosing loop/select and the statement that leaves it
// are not visible here — confirm.
251 func (j *job) start() {
// Stop request, sent by jobsHandler.deleteJob.
255 case <-j.controlChannel:
256 log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
// A queued payload is available; deliver it to the job's target.
258 case msg := <-j.messagesChannel:
259 j.sendMessagesToConsumer(msg)
264 func (j *job) sendMessagesToConsumer(messages []byte) {
265 log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
266 if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
267 log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
269 log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)