2 // ========================LICENSE_START=================================
5 // Copyright (C) 2021: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
31 log "github.com/sirupsen/logrus"
32 "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
// Fields of the Type values stored in allJobs.
// TypeId is read from and written to the JSON key "id"; getType uses it as the allJobs key.
36 TypeId string `json:"id"`
// DMaaPTopic is the topic name inserted into the URL that pollAndDistributeMessages fetches.
37 DMaaPTopic string `json:"dmaapTopic"`
// Schema holds the type's schema as JSON text (getType re-encodes it with json.Marshal).
38 Schema string `json:"schema"`
// Jobs holds the jobs registered for this type, keyed by JobInfo.InfoJobIdentity.
// It has no JSON tag.
39 Jobs map[string]JobInfo
// Fields of JobInfo. Each one maps to the snake_case JSON key given in its tag.
43 Owner string `json:"owner"`
44 LastUpdated string `json:"last_updated"`
// InfoJobIdentity must be non-empty (see validateJobInfo); AddJob uses it as
// the key in the owning type's Jobs map.
45 InfoJobIdentity string `json:"info_job_identity"`
// TargetUri is the address sendMessagesToConsumer posts messages to; it must
// be non-empty (see validateJobInfo).
46 TargetUri string `json:"target_uri"`
47 InfoJobData string `json:"info_job_data"`
// InfoTypeIdentity names the type the job belongs to; it must be a key of allJobs.
48 InfoTypeIdentity string `json:"info_type_identity"`
// JobHandler is the interface for adding jobs.
// NOTE(review): presumably the declared type of the package-level Handler
// variable, which the AddJob function delegates to — confirm.
51 type JobHandler interface {
// allJobs indexes every known Type by its TypeId. getType adds entries and
// AddJob attaches jobs to the entry named by JobInfo.InfoTypeIdentity.
59 allJobs = make(map[string]Type)
// Install the built-in jobHandlerImpl as the package-level Handler.
63 Handler = newJobHandlerImpl()
// jobHandlerImpl is the built-in job handler. It has no fields of its own;
// its AddJob method works on the package-level allJobs map.
66 type jobHandlerImpl struct{}
// newJobHandlerImpl returns a pointer to a new, empty jobHandlerImpl.
68 func newJobHandlerImpl() *jobHandlerImpl {
69 return &jobHandlerImpl{}
// AddJob stores ji in the Jobs map of the type named by ji.InfoTypeIdentity,
// keyed by ji.InfoJobIdentity, provided validateJobInfo reports no error.
// A job already stored under the same identity is replaced.
72 func (jh *jobHandlerImpl) AddJob(ji JobInfo) error {
75 if err := validateJobInfo(ji); err == nil {
// Jobs is a map, so writing through this local also updates the entry held in allJobs.
76 jobs := allJobs[ji.InfoTypeIdentity].Jobs
77 jobs[ji.InfoJobIdentity] = ji
// validateJobInfo checks that ji can be registered. It reports an error when,
// in this order, ji.InfoTypeIdentity is not a key of allJobs,
// ji.InfoJobIdentity is empty, or ji.TargetUri is empty.
84 func validateJobInfo(ji JobInfo) error {
// The job's type must already be known.
85 if _, ok := allJobs[ji.InfoTypeIdentity]; !ok {
86 return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
88 if ji.InfoJobIdentity == "" {
89 return fmt.Errorf("missing required job identity: %v", ji)
91 // Temporary for when there are only REST callbacks needed
92 if ji.TargetUri == "" {
93 return fmt.Errorf("missing required target URI: %v", ji)
// GetTypes walks the tree rooted at typeDir and collects the *Type produced
// by getType for each visited path that contains ".json". Only definitions
// that getType loads without error are appended to the result.
98 func GetTypes() ([]*Type, error) {
// Start with an empty slice of capacity one.
101 types := make([]*Type, 0, 1)
102 err := filepath.Walk(typeDir,
103 func(path string, info os.FileInfo, err error) error {
// NOTE(review): strings.Contains matches ".json" anywhere in the path,
// including directory names and names such as "x.json.bak"; comparing
// filepath.Ext(path) with ".json" would be stricter.
107 if strings.Contains(path, ".json") {
108 if jobType, err := getType(path); err == nil {
109 types = append(types, jobType)
// GetSupportedTypes returns the IDs of all registered types, i.e. the keys of
// allJobs. The result is an empty, non-nil slice when nothing is registered.
// The order follows map iteration and is therefore unspecified.
120 func GetSupportedTypes() []string {
123 supportedTypes := []string{}
124 for k := range allJobs {
125 supportedTypes = append(supportedTypes, k)
127 return supportedTypes
// AddJob registers job by delegating to the package-level Handler and returns
// whatever error Handler.AddJob reports.
130 func AddJob(job JobInfo) error {
131 return Handler.AddJob(job)
// getType reads the JSON type definition stored at path and builds a Type from
// its "id", "dmaapTopic" and "schema" members. The type is added to allJobs
// when no entry with the same TypeId exists yet. A JSON decoding failure is
// handed back to the caller as the error result.
134 func getType(path string) (*Type, error) {
135 if typeDefinition, err := os.ReadFile(path); err == nil {
136 var dat map[string]interface{}
137 if marshalError := json.Unmarshal(typeDefinition, &dat); marshalError == nil {
// The schema member is re-encoded to JSON text. The Marshal error is discarded.
138 schema, _ := json.Marshal(dat["schema"])
// NOTE(review): these single-value type assertions panic when "id" or
// "dmaapTopic" is absent or is not a JSON string.
140 TypeId: dat["id"].(string),
141 DMaaPTopic: dat["dmaapTopic"].(string),
142 Schema: string(schema),
// Every new Type gets an empty, non-nil Jobs map so that jobs can be stored in it.
143 Jobs: make(map[string]JobInfo),
// A type ID that is already registered keeps its existing entry.
145 if _, ok := allJobs[typeInfo.TypeId]; !ok {
146 allJobs[typeInfo.TypeId] = typeInfo
148 return &typeInfo, nil
150 return nil, marshalError
// RunJobs hands mRAddress to pollAndDistributeMessages, which uses it as the
// base of the URL it polls for each registered type.
157 func RunJobs(mRAddress string) {
159 pollAndDistributeMessages(mRAddress)
// pollAndDistributeMessages issues, for every type in allJobs, a GET against
// <mRAddress>/events/<DMaaPTopic>/users/dmaapmediatorproducer and passes the
// response body to distributeMessages. Failures to fetch are logged at
// warning level.
163 func pollAndDistributeMessages(mRAddress string) {
164 for typeId, typeInfo := range allJobs {
165 log.Debugf("Processing jobs for type: %v", typeId)
// NOTE(review): the local named "error" shadows the predeclared error type
// for the rest of its scope.
166 messagesBody, error := restclient.Get(fmt.Sprintf("%v/events/%v/users/dmaapmediatorproducer", mRAddress, typeInfo.DMaaPTopic))
168 log.Warnf("Error getting data from MR. Cause: %v", error)
171 distributeMessages(messagesBody, typeInfo)
// distributeMessages forwards messages to every job registered in
// typeInfo.Jobs, starting one goroutine per job.
175 func distributeMessages(messages []byte, typeInfo Type) {
// Payloads of two bytes or fewer are not forwarded.
// NOTE(review): presumably this filters out an empty JSON array "[]" — confirm.
176 if len(messages) > 2 {
178 for _, jobInfo := range typeInfo.Jobs {
// Each delivery runs concurrently with the caller; all goroutines receive the same messages slice.
179 go sendMessagesToConsumer(messages, jobInfo)
// sendMessagesToConsumer posts messages to jobInfo.TargetUri. A failed post is
// logged at warning level; the function has no result, so the caller is not
// informed of the failure.
185 func sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
186 log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
187 if postErr := restclient.Post(jobInfo.TargetUri, messages); postErr != nil {
188 log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
// Replace allJobs with a fresh, empty map, dropping every registered type
// together with its jobs.
193 allJobs = make(map[string]Type)