2 // ========================LICENSE_START=================================
5 // Copyright (C) 2021: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
29 log "github.com/sirupsen/logrus"
30 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
31 "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
// TypeData holds what the handler keeps for a single type: its id, the DMaaP
// topic URL that is appended to the message router address when polling, and
// the jobs registered for the type.
34 type TypeData struct {
35 TypeId string `json:"id"`
36 DMaaPTopicURL string `json:"dmaapTopicUrl"`
// Jobs is keyed by job identity (JobInfo.InfoJobIdentity). It carries no JSON tag.
37 Jobs map[string]JobInfo
// Owner is logged as the consumer once messages have been posted for the job.
41 Owner string `json:"owner"`
42 LastUpdated string `json:"last_updated"`
// InfoJobIdentity is the key under which the job is stored and deleted.
// Validation rejects an empty value.
43 InfoJobIdentity string `json:"info_job_identity"`
// TargetUri is the address polled messages are POSTed to. Validation rejects
// an empty value.
44 TargetUri string `json:"target_uri"`
// InfoJobData carries an arbitrary JSON value.
45 InfoJobData interface{} `json:"info_job_data"`
// InfoTypeIdentity names the type the job belongs to. Validation rejects a
// value that is not a known type id.
46 InfoTypeIdentity string `json:"info_type_identity"`
// JobTypeHandler gives access to the types a handler knows about.
49 type JobTypeHandler interface {
// GetTypes returns the type definitions, or an error if they cannot be provided.
50 GetTypes() ([]config.TypeDefinition, error)
// GetSupportedTypes returns the ids of the supported types.
51 GetSupportedTypes() []string
// JobHandler is the interface for managing registered jobs.
54 type JobHandler interface {
// DeleteJob removes the job with the given id.
56 DeleteJob(jobId string)
// JobHandlerImpl keeps the known types together with their jobs, and the HTTP
// clients used to fetch messages and to deliver them to job targets.
59 type JobHandlerImpl struct {
// allTypes maps a type id to the data (topic URL and jobs) held for that type.
62 allTypes map[string]TypeData
// pollClient is used for the GET requests that fetch messages.
63 pollClient restclient.HTTPClient
// distributeClient is used for the POST requests that deliver messages.
64 distributeClient restclient.HTTPClient
// NewJobHandlerImpl creates a JobHandlerImpl that reads its type definitions
// from typeConfigFilePath, fetches messages with pollClient and delivers them
// with distributeClient. The handler starts with an empty type map; types are
// registered when GetTypes is called.
67 func NewJobHandlerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *JobHandlerImpl {
68 return &JobHandlerImpl{
69 configFile: typeConfigFilePath,
70 allTypes: make(map[string]TypeData),
71 pollClient: pollClient,
72 distributeClient: distributeClient,
// AddJob validates ji and, when validation succeeds, registers it under its
// type, keyed by its job identity. A job already registered for that type
// under the same identity is overwritten.
// NOTE(review): a failed validation is presumably returned to the caller as
// the error result — confirm.
76 func (jh *JobHandlerImpl) AddJob(ji JobInfo) error {
79 if err := jh.validateJobInfo(ji); err == nil {
// Validation has established that the type is known, so Jobs is expected to be
// the map allocated when the type was registered in GetTypes.
80 jobs := jh.allTypes[ji.InfoTypeIdentity].Jobs
81 jobs[ji.InfoJobIdentity] = ji
82 log.Debug("Added job: ", ji)
// DeleteJob removes the job with the given id from every known type, so the
// caller does not need to say which type the job belongs to. Deleting an id
// that is not registered has no effect.
89 func (jh *JobHandlerImpl) DeleteJob(jobId string) {
// typeData is a copy of the map value, but its Jobs map is shared with the
// entry in allTypes, so the delete is visible there.
92 for _, typeData := range jh.allTypes {
93 delete(typeData.Jobs, jobId)
95 log.Debug("Deleted job: ", jobId)
// validateJobInfo checks that ji refers to a known type and carries both a job
// identity and a target URI. The checks run in that order, and the returned
// error describes the first one that fails.
98 func (jh *JobHandlerImpl) validateJobInfo(ji JobInfo) error {
// The type must have been registered in allTypes.
99 if _, ok := jh.allTypes[ji.InfoTypeIdentity]; !ok {
100 return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
102 if ji.InfoJobIdentity == "" {
103 return fmt.Errorf("missing required job identity: %v", ji)
105 // Temporary for when there are only REST callbacks needed
106 if ji.TargetUri == "" {
107 return fmt.Errorf("missing required target URI: %v", ji)
// GetTypes reads the type definitions from the configured file, decodes them
// from JSON (a top-level "types" array), registers every definition in
// allTypes and returns the definitions.
// NOTE(review): errors from reading or decoding the file are presumably
// returned to the caller — confirm.
112 func (jh *JobHandlerImpl) GetTypes() ([]config.TypeDefinition, error) {
115 typeDefsByte, err := os.ReadFile(jh.configFile)
120 Types []config.TypeDefinition `json:"types"`
122 err = json.Unmarshal(typeDefsByte, &typeDefs)
// Each type is stored with a fresh, empty job map. This replaces any entry
// already present under the same id, including the jobs it held.
126 for _, typeDef := range typeDefs.Types {
127 jh.allTypes[typeDef.Id] = TypeData{
129 DMaaPTopicURL: typeDef.DmaapTopicURL,
130 Jobs: make(map[string]JobInfo),
133 return typeDefs.Types, nil
// GetSupportedTypes returns the ids of all types currently held in allTypes.
// The result is a non-nil slice even when no types are known. Its order
// follows map iteration and is therefore unspecified.
136 func (jh *JobHandlerImpl) GetSupportedTypes() []string {
139 supportedTypes := []string{}
140 for k := range jh.allTypes {
141 supportedTypes = append(supportedTypes, k)
143 return supportedTypes
// RunJobs fetches messages and distributes them to the registered jobs.
// mRAddress is the base address the poll requests are sent to.
146 func (jh *JobHandlerImpl) RunJobs(mRAddress string) {
148 jh.pollAndDistributeMessages(mRAddress)
// pollAndDistributeMessages goes through every known type, fetches its
// messages with a GET to mRAddress followed by the type's topic URL, and hands
// the response body to distributeMessages for delivery to that type's jobs.
152 func (jh *JobHandlerImpl) pollAndDistributeMessages(mRAddress string) {
155 for typeId, typeInfo := range jh.allTypes {
156 log.Debugf("Processing jobs for type: %v", typeId)
// NOTE(review): the local named "error" shadows the predeclared error type for
// the rest of its scope; a name such as err would avoid that.
157 messagesBody, error := restclient.Get(mRAddress+typeInfo.DMaaPTopicURL, jh.pollClient)
// A failed fetch is reported as a warning.
159 log.Warnf("Error getting data from MR. Cause: %v", error)
162 log.Debugf("Received messages: %v", string(messagesBody))
163 jh.distributeMessages(messagesBody, typeInfo)
// distributeMessages sends messages to every job registered for typeInfo,
// starting one goroutine per job. Nothing is sent when messages is two bytes
// long or shorter.
167 func (jh *JobHandlerImpl) distributeMessages(messages []byte, typeInfo TypeData) {
// NOTE(review): the length check presumably filters out an empty JSON array
// ("[]") returned when there are no new messages — confirm.
168 if len(messages) > 2 {
169 for _, jobInfo := range typeInfo.Jobs {
170 go jh.sendMessagesToConsumer(messages, jobInfo)
// sendMessagesToConsumer POSTs messages to the job's target URI using the
// distribute client. The function has no return value: a failed POST is only
// reported as a warning in the log.
175 func (jh *JobHandlerImpl) sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
176 log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
177 if postErr := restclient.Post(jobInfo.TargetUri, messages, jh.distributeClient); postErr != nil {
178 log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
180 log.Debugf("Messages distributed to consumer: %v.", jobInfo.Owner)
// clearAll forgets every known type, and with it every registered job, by
// replacing allTypes with a new empty map.
183 func (jh *JobHandlerImpl) clearAll() {
184 jh.allTypes = make(map[string]TypeData)