2 // ========================LICENSE_START=================================
5 // Copyright (C) 2021: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
29 log "github.com/sirupsen/logrus"
30 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
31 "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
34 type TypeData struct {
35 TypeId string `json:"id"`
36 DMaaPTopicURL string `json:"dmaapTopicUrl"`
37 jobsHandler *jobsHandler
41 Owner string `json:"owner"`
42 LastUpdated string `json:"last_updated"`
43 InfoJobIdentity string `json:"info_job_identity"`
44 TargetUri string `json:"target_uri"`
45 InfoJobData interface{} `json:"info_job_data"`
46 InfoTypeIdentity string `json:"info_type_identity"`
49 type JobTypesManager interface {
50 LoadTypesFromConfiguration() ([]config.TypeDefinition, error)
51 GetSupportedTypes() []string
54 type JobsManager interface {
55 AddJobFromRESTCall(JobInfo) error
56 DeleteJobFromRESTCall(jobId string)
59 type JobsManagerImpl struct {
61 allTypes map[string]TypeData
62 pollClient restclient.HTTPClient
64 distributeClient restclient.HTTPClient
67 func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
68 return &JobsManagerImpl{
69 configFile: typeConfigFilePath,
70 allTypes: make(map[string]TypeData),
71 pollClient: pollClient,
73 distributeClient: distributeClient,
77 func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
78 if err := jm.validateJobInfo(ji); err == nil {
79 typeData := jm.allTypes[ji.InfoTypeIdentity]
80 typeData.jobsHandler.addJobCh <- ji
81 log.Debug("Added job: ", ji)
88 func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
89 for _, typeData := range jm.allTypes {
90 log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId)
91 typeData.jobsHandler.deleteJobCh <- jobId
93 log.Debug("Deleted job: ", jobId)
96 func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
97 if _, ok := jm.allTypes[ji.InfoTypeIdentity]; !ok {
98 return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
100 if ji.InfoJobIdentity == "" {
101 return fmt.Errorf("missing required job identity: %v", ji)
103 // Temporary for when there are only REST callbacks needed
104 if ji.TargetUri == "" {
105 return fmt.Errorf("missing required target URI: %v", ji)
110 func (jm *JobsManagerImpl) LoadTypesFromConfiguration() ([]config.TypeDefinition, error) {
111 typeDefsByte, err := os.ReadFile(jm.configFile)
116 Types []config.TypeDefinition `json:"types"`
118 err = json.Unmarshal(typeDefsByte, &typeDefs)
122 for _, typeDef := range typeDefs.Types {
123 jm.allTypes[typeDef.Id] = TypeData{
125 DMaaPTopicURL: typeDef.DmaapTopicURL,
126 jobsHandler: newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
129 return typeDefs.Types, nil
132 func (jm *JobsManagerImpl) GetSupportedTypes() []string {
133 supportedTypes := []string{}
134 for k := range jm.allTypes {
135 supportedTypes = append(supportedTypes, k)
137 return supportedTypes
140 func (jm *JobsManagerImpl) StartJobsForAllTypes() {
141 for _, jobType := range jm.allTypes {
143 go jobType.jobsHandler.startPollingAndDistribution(jm.mrAddress)
148 type jobsHandler struct {
153 addJobCh chan JobInfo
154 deleteJobCh chan string
155 pollClient restclient.HTTPClient
156 distributeClient restclient.HTTPClient
159 func newJobsHandler(typeId string, topicURL string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
163 jobs: make(map[string]job),
164 addJobCh: make(chan JobInfo),
165 deleteJobCh: make(chan string),
166 pollClient: pollClient,
167 distributeClient: distributeClient,
171 func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) {
174 jh.pollAndDistributeMessages(mRAddress)
180 jh.monitorManagementChannels()
185 func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) {
186 log.Debugf("Processing jobs for type: %v", jh.typeId)
187 messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
189 log.Warn("Error getting data from MR. Cause: ", error)
191 log.Debug("Received messages: ", string(messagesBody))
192 jh.distributeMessages(messagesBody)
195 func (jh *jobsHandler) distributeMessages(messages []byte) {
196 if len(messages) > 2 {
199 for _, job := range jh.jobs {
200 if len(job.messagesChannel) < cap(job.messagesChannel) {
201 job.messagesChannel <- messages
203 jh.emptyMessagesBuffer(job)
209 func (jh *jobsHandler) emptyMessagesBuffer(job job) {
210 log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
214 case <-job.messagesChannel:
221 func (jh *jobsHandler) monitorManagementChannels() {
223 case addedJob := <-jh.addJobCh:
225 case deletedJob := <-jh.deleteJobCh:
226 jh.deleteJob(deletedJob)
230 func (jh *jobsHandler) addJob(addedJob JobInfo) {
232 log.Debug("Add job: ", addedJob)
233 newJob := newJob(addedJob, jh.distributeClient)
235 jh.jobs[addedJob.InfoJobIdentity] = newJob
239 func (jh *jobsHandler) deleteJob(deletedJob string) {
241 log.Debug("Delete job: ", deletedJob)
242 j, exist := jh.jobs[deletedJob]
244 j.controlChannel <- struct{}{}
245 delete(jh.jobs, deletedJob)
252 client restclient.HTTPClient
253 messagesChannel chan []byte
254 controlChannel chan struct{}
257 func newJob(j JobInfo, c restclient.HTTPClient) job {
261 messagesChannel: make(chan []byte, 10),
262 controlChannel: make(chan struct{}),
266 func (j *job) start() {
270 case <-j.controlChannel:
271 log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
273 case msg := <-j.messagesChannel:
274 j.sendMessagesToConsumer(msg)
279 func (j *job) sendMessagesToConsumer(messages []byte) {
280 log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
281 if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
282 log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
284 log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)