2 // ========================LICENSE_START=================================
5 // Copyright (C) 2021: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
29 log "github.com/sirupsen/logrus"
30 "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
33 type TypeDefinitions struct {
34 Types []TypeDefinition `json:"types"`
36 type TypeDefinition struct {
38 DmaapTopicURL string `json:"dmaapTopicUrl"`
41 type TypeData struct {
42 TypeId string `json:"id"`
43 DMaaPTopicURL string `json:"dmaapTopicUrl"`
44 Jobs map[string]JobInfo
48 Owner string `json:"owner"`
49 LastUpdated string `json:"last_updated"`
50 InfoJobIdentity string `json:"info_job_identity"`
51 TargetUri string `json:"target_uri"`
52 InfoJobData interface{} `json:"info_job_data"`
53 InfoTypeIdentity string `json:"info_type_identity"`
56 type JobHandler interface {
58 DeleteJob(jobId string)
63 configFile = "configs/type_config.json"
65 allTypes = make(map[string]TypeData)
69 Handler = newJobHandlerImpl()
72 type jobHandlerImpl struct{}
74 func newJobHandlerImpl() *jobHandlerImpl {
75 return &jobHandlerImpl{}
78 func (jh *jobHandlerImpl) AddJob(ji JobInfo) error {
81 if err := validateJobInfo(ji); err == nil {
82 jobs := allTypes[ji.InfoTypeIdentity].Jobs
83 jobs[ji.InfoJobIdentity] = ji
84 log.Debug("Added job: ", ji)
91 func (jh *jobHandlerImpl) DeleteJob(jobId string) {
94 for _, typeData := range allTypes {
95 delete(typeData.Jobs, jobId)
97 log.Debug("Deleted job: ", jobId)
100 func validateJobInfo(ji JobInfo) error {
101 if _, ok := allTypes[ji.InfoTypeIdentity]; !ok {
102 return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
104 if ji.InfoJobIdentity == "" {
105 return fmt.Errorf("missing required job identity: %v", ji)
107 // Temporary for when there are only REST callbacks needed
108 if ji.TargetUri == "" {
109 return fmt.Errorf("missing required target URI: %v", ji)
114 func GetTypes() ([]TypeData, error) {
117 types := make([]TypeData, 0, 1)
118 typeDefsByte, err := os.ReadFile(configFile)
122 typeDefs := TypeDefinitions{}
123 err = json.Unmarshal(typeDefsByte, &typeDefs)
127 for _, typeDef := range typeDefs.Types {
128 typeInfo := TypeData{
130 DMaaPTopicURL: typeDef.DmaapTopicURL,
131 Jobs: make(map[string]JobInfo),
133 if _, ok := allTypes[typeInfo.TypeId]; !ok {
134 allTypes[typeInfo.TypeId] = typeInfo
136 types = append(types, typeInfo)
141 func GetSupportedTypes() []string {
144 supportedTypes := []string{}
145 for k := range allTypes {
146 supportedTypes = append(supportedTypes, k)
148 return supportedTypes
151 func AddJob(job JobInfo) error {
152 return Handler.AddJob(job)
155 func DeleteJob(jobId string) {
156 Handler.DeleteJob(jobId)
159 func RunJobs(mRAddress string) {
161 pollAndDistributeMessages(mRAddress)
165 func pollAndDistributeMessages(mRAddress string) {
166 for typeId, typeInfo := range allTypes {
167 log.Debugf("Processing jobs for type: %v", typeId)
168 messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL))
170 log.Warnf("Error getting data from MR. Cause: %v", error)
173 distributeMessages(messagesBody, typeInfo)
177 func distributeMessages(messages []byte, typeInfo TypeData) {
178 if len(messages) > 2 {
180 for _, jobInfo := range typeInfo.Jobs {
181 go sendMessagesToConsumer(messages, jobInfo)
187 func sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
188 log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
189 if postErr := restclient.Post(jobInfo.TargetUri, messages); postErr != nil {
190 log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
195 allTypes = make(map[string]TypeData)