2 // ========================LICENSE_START=================================
5 // Copyright (C) 2021: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
29 log "github.com/sirupsen/logrus"
30 "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
// TypeDefinitions is the top-level shape decoded from the type configuration
// file: a JSON object whose "types" member holds the list of definitions.
33 type TypeDefinitions struct {
34 Types []TypeDefinition `json:"types"`
// TypeDefinition is one entry of the "types" list in the configuration file.
// NOTE(review): one line between the struct header and DmaapTopicURL is not
// visible in this view — confirm the full field list against the file.
36 type TypeDefinition struct {
// DmaapTopicURL is copied into TypeData.DMaaPTopicURL by GetTypes.
38 DmaapTopicURL string `json:"dmaapTopicUrl"`
// TypeData is the runtime record for one supported type; records are held by
// value in the package-level allTypes map.
41 type TypeData struct {
// TypeId is the key under which the record is stored in allTypes.
42 TypeId string `json:"id"`
// DMaaPTopicURL is appended to the MR address, separated by "/", when polling
// for messages of this type.
43 DMaaPTopicURL string `json:"dmaapTopicUrl"`
// Jobs holds the jobs registered for this type, keyed by InfoJobIdentity.
// It has no json tag, so encoding/json uses the field name "Jobs".
44 Jobs map[string]JobInfo
// Fields of the job description that is referred to elsewhere in this file as
// JobInfo. NOTE(review): the struct header is elided from this view.
// Owner is not read by any code visible in this view.
48 Owner string `json:"owner"`
// LastUpdated is not read by any code visible in this view.
49 LastUpdated string `json:"last_updated"`
// InfoJobIdentity is required (validateJobInfo rejects an empty value) and is
// the key under which the job is stored in its type's Jobs map.
50 InfoJobIdentity string `json:"info_job_identity"`
// TargetUri is required and is the address that polled messages are POSTed to.
51 TargetUri string `json:"target_uri"`
// InfoJobData is carried as decoded but not interpreted by any code visible in
// this view.
52 InfoJobData interface{} `json:"info_job_data"`
// InfoTypeIdentity must match the id of a type present in allTypes.
53 InfoTypeIdentity string `json:"info_type_identity"`
// JobHandler abstracts job registration.
// NOTE(review): the method set is elided from this view. The package-level
// AddJob calls Handler.AddJob(JobInfo) and returns its error, so the interface
// presumably declares that method — confirm against the file.
56 type JobHandler interface {
// configFile is the path of the JSON file listing the supported types; it is
// read by GetTypes. The path is relative, so it resolves against the process
// working directory.
62 configFile = "configs/type_config.json"
// allTypes is the registry of known types keyed by type id. GetTypes fills it,
// and the job functions in this file read it.
64 allTypes = make(map[string]TypeData)
// Handler is the handler that the package-level AddJob delegates to.
// NOTE(review): the lines between the allTypes declaration and this assignment
// are elided from this view, so the declaration of Handler and the function (if
// any) enclosing this assignment are not visible — confirm against the file.
68 Handler = newJobHandlerImpl()
// jobHandlerImpl is the field-less receiver type that carries the AddJob
// method; an instance of it is assigned to the package-level Handler.
type jobHandlerImpl struct{}

// newJobHandlerImpl returns a pointer to a fresh jobHandlerImpl.
func newJobHandlerImpl() *jobHandlerImpl {
	return new(jobHandlerImpl)
}
// AddJob registers ji under its info type when validateJobInfo accepts it. The
// job is stored in the Jobs map of the matching allTypes entry, keyed by
// ji.InfoJobIdentity, so an existing job with the same identity is replaced.
// NOTE(review): lines are elided from this view before the validation call and
// after the debug log; the return statements and the handling of a validation
// error are not visible here — confirm against the full file.
77 func (jh *jobHandlerImpl) AddJob(ji JobInfo) error {
80 if err := validateJobInfo(ji); err == nil {
// allTypes stores TypeData by value, but Jobs is a map, so writing through the
// copy obtained here updates the map held by the stored entry.
81 jobs := allTypes[ji.InfoTypeIdentity].Jobs
82 jobs[ji.InfoJobIdentity] = ji
83 log.Debug("Added job: ", ji)
90 func validateJobInfo(ji JobInfo) error {
91 if _, ok := allTypes[ji.InfoTypeIdentity]; !ok {
92 return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
94 if ji.InfoJobIdentity == "" {
95 return fmt.Errorf("missing required job identity: %v", ji)
97 // Temporary for when there are only REST callbacks needed
98 if ji.TargetUri == "" {
99 return fmt.Errorf("missing required target URI: %v", ji)
// GetTypes loads the type definitions from configFile and returns them as
// TypeData values, each with an empty Jobs map. A type whose id is not already
// a key in allTypes is also registered there; an id that is already registered
// is left untouched, so its existing Jobs map is not replaced.
// NOTE(review): several lines are elided from this view — the statements that
// follow os.ReadFile and json.Unmarshal (presumably the error checks), one
// field of the TypeData literal, and the final return — confirm against the
// file.
104 func GetTypes() ([]TypeData, error) {
107 types := make([]TypeData, 0, 1)
108 typeDefsByte, err := os.ReadFile(configFile)
112 typeDefs := TypeDefinitions{}
113 err = json.Unmarshal(typeDefsByte, &typeDefs)
117 for _, typeDef := range typeDefs.Types {
118 typeInfo := TypeData{
120 DMaaPTopicURL: typeDef.DmaapTopicURL,
121 Jobs: make(map[string]JobInfo),
// Register only ids that are not yet known.
123 if _, ok := allTypes[typeInfo.TypeId]; !ok {
124 allTypes[typeInfo.TypeId] = typeInfo
// NOTE(review): the line closing the if above is elided; presumably this
// append runs for every definition, known or not — confirm.
126 types = append(types, typeInfo)
// GetSupportedTypes returns the ids of all types currently in allTypes. The
// result is an empty, non-nil slice when nothing is registered, and its order
// follows Go's unspecified map iteration order.
// NOTE(review): two lines between the signature and the first statement are
// elided from this view — confirm what they do.
131 func GetSupportedTypes() []string {
134 supportedTypes := []string{}
135 for k := range allTypes {
136 supportedTypes = append(supportedTypes, k)
138 return supportedTypes
141 func AddJob(job JobInfo) error {
142 return Handler.AddJob(job)
// RunJobs polls for messages and distributes them to the registered jobs by
// calling pollAndDistributeMessages with mRAddress (presumably the base address
// of the DMaaP Message Router — confirm).
// NOTE(review): a line before the call is elided from this view, possibly a
// loop header; whether polling repeats is not visible here — confirm.
145 func RunJobs(mRAddress string) {
147 pollAndDistributeMessages(mRAddress)
// pollAndDistributeMessages makes one pass over allTypes: for each type it GETs
// "<mRAddress>/<DMaaPTopicURL>" through restclient and hands the response body
// to distributeMessages.
// NOTE(review): the lines around the Warnf call are elided from this view, so
// the condition guarding it and whether a failed GET skips distribution are not
// visible — confirm. The local named `error` also shadows the predeclared
// error type inside this function.
151 func pollAndDistributeMessages(mRAddress string) {
152 for typeId, typeInfo := range allTypes {
153 log.Debugf("Processing jobs for type: %v", typeId)
154 messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL))
156 log.Warnf("Error getting data from MR. Cause: %v", error)
159 distributeMessages(messagesBody, typeInfo)
// distributeMessages starts one goroutine per job registered for typeInfo, each
// sending the same messages payload to that job's consumer. No wait on those
// goroutines is visible in this view.
// NOTE(review): a line between the length check and the loop is elided from
// this view — confirm what it does.
163 func distributeMessages(messages []byte, typeInfo TypeData) {
// Payloads of two bytes or fewer are ignored — presumably to skip an empty JSON
// array "[]"; confirm.
164 if len(messages) > 2 {
166 for _, jobInfo := range typeInfo.Jobs {
167 go sendMessagesToConsumer(messages, jobInfo)
173 func sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
174 log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
175 if postErr := restclient.Post(jobInfo.TargetUri, messages); postErr != nil {
176 log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
// Replaces allTypes with a fresh empty map, dropping every registered type
// together with its jobs.
// NOTE(review): the header of the function enclosing this statement is elided
// from this view — confirm its name and callers against the file.
181 allTypes = make(map[string]TypeData)